Skip to content

Commit

Permalink
[AIRFLOW-6516] BugFix: airflow.cfg does not exist in Volume Mounts
Browse files Browse the repository at this point in the history
  • Loading branch information
kaxil committed Jan 9, 2020
1 parent 550521f commit ba2217b
Show file tree
Hide file tree
Showing 2 changed files with 211 additions and 17 deletions.
43 changes: 27 additions & 16 deletions airflow/contrib/kubernetes/worker_configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,33 @@ def _construct_volume(name, claim, host):
'mode': 0o440
}

if self.kube_config.airflow_local_settings_configmap:
config_path = '{}/config/airflow_local_settings.py'.format(self.worker_airflow_home)

if self.kube_config.airflow_local_settings_configmap != self.kube_config.airflow_configmap:
config_volume_name = 'airflow-local-settings'
volumes[config_volume_name] = {
'name': config_volume_name,
'configMap': {
'name': self.kube_config.airflow_local_settings_configmap
}
}

volume_mounts[config_volume_name] = {
'name': config_volume_name,
'mountPath': config_path,
'subPath': 'airflow_local_settings.py',
'readOnly': True
}

else:
volume_mounts['airflow-local-settings'] = {
'name': 'airflow-config',
'mountPath': config_path,
'subPath': 'airflow_local_settings.py',
'readOnly': True
}

# Mount the airflow.cfg file via a configmap the user has specified
if self.kube_config.airflow_configmap:
config_volume_name = 'airflow-config'
Expand All @@ -328,22 +355,6 @@ def _construct_volume(name, claim, host):
'readOnly': True
}

if self.kube_config.airflow_local_settings_configmap:
config_volume_name = 'airflow-config'
config_path = '{}/config/airflow_local_settings.py'.format(self.worker_airflow_home)
volumes[config_volume_name] = {
'name': config_volume_name,
'configMap': {
'name': self.kube_config.airflow_local_settings_configmap
}
}
volume_mounts[config_volume_name] = {
'name': config_volume_name,
'mountPath': config_path,
'subPath': 'airflow_local_settings.py',
'readOnly': True
}

return volumes, volume_mounts

def generate_dag_volume_mount_path(self):
Expand Down
185 changes: 184 additions & 1 deletion tests/contrib/executors/test_kubernetes_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,13 +183,16 @@ def setUp(self):
self.kube_config.git_dags_folder_mount_point = None
self.kube_config.kube_labels = {'dag_id': 'original_dag_id', 'my_label': 'label_id'}

def tearDown(self):
self.kube_config = None

def test_worker_configuration_no_subpaths(self):
worker_config = WorkerConfiguration(self.kube_config)
volumes, volume_mounts = worker_config._get_volumes_and_mounts()
volumes_list = [value for value in volumes.values()]
volume_mounts_list = [value for value in volume_mounts.values()]
for volume_or_mount in volumes_list + volume_mounts_list:
if volume_or_mount['name'] != 'airflow-config':
if volume_or_mount['name'] not in ['airflow-config', 'airflow-local-settings']:
self.assertNotIn(
'subPath', volume_or_mount,
"subPath shouldn't be defined"
Expand Down Expand Up @@ -635,6 +638,186 @@ def test_worker_container_dags(self):
self.assertEqual(0, len(dag_volume_mount))
self.assertEqual(0, len(init_containers))

def test_set_airflow_config_configmap(self):
"""
Test that airflow.cfg can be set via configmap by
checking volume & volume-mounts are set correctly.
"""
self.kube_config.airflow_home = '/usr/local/airflow'
self.kube_config.airflow_configmap = 'airflow-configmap'
self.kube_config.airflow_local_settings_configmap = None
self.kube_config.dags_folder = '/workers/path/to/dags'

worker_config = WorkerConfiguration(self.kube_config)
kube_executor_config = KubernetesExecutorConfig(annotations=[],
volumes=[],
volume_mounts=[])

pod = worker_config.make_pod("default", str(uuid.uuid4()), "test_pod_id", "test_dag_id",
"test_task_id", str(datetime.utcnow()), 1, "bash -c 'ls /'",
kube_executor_config)

airflow_config_volume = [
volume for volume in pod.volumes if volume["name"] == 'airflow-config'
]
# Test that volume_name is found
self.assertEqual(1, len(airflow_config_volume))

# Test that config map exists
self.assertEqual(
{'configMap': {'name': 'airflow-configmap'}, 'name': 'airflow-config'},
airflow_config_volume[0]
)

# Test that only 1 Volume Mounts exists with 'airflow-config' name
# One for airflow.cfg
volume_mounts = [
volume_mount for volume_mount in pod.volume_mounts
if volume_mount['name'] == 'airflow-config'
]

self.assertEqual([
{
'mountPath': '/usr/local/airflow/airflow.cfg',
'name': 'airflow-config',
'readOnly': True,
'subPath': 'airflow.cfg',
}
],
volume_mounts
)

def test_set_airflow_local_settings_configmap(self):
"""
Test that airflow_local_settings.py can be set via configmap by
checking volume & volume-mounts are set correctly.
"""
self.kube_config.airflow_home = '/usr/local/airflow'
self.kube_config.airflow_configmap = 'airflow-configmap'
self.kube_config.airflow_local_settings_configmap = 'airflow-configmap'
self.kube_config.dags_folder = '/workers/path/to/dags'

worker_config = WorkerConfiguration(self.kube_config)
kube_executor_config = KubernetesExecutorConfig(annotations=[],
volumes=[],
volume_mounts=[])

pod = worker_config.make_pod("default", str(uuid.uuid4()), "test_pod_id", "test_dag_id",
"test_task_id", str(datetime.utcnow()), 1, "bash -c 'ls /'",
kube_executor_config)

airflow_config_volume = [
volume for volume in pod.volumes if volume["name"] == 'airflow-config'
]
# Test that volume_name is found
self.assertEqual(1, len(airflow_config_volume))

# Test that config map exists
self.assertEqual(
{'configMap': {'name': 'airflow-configmap'}, 'name': 'airflow-config'},
airflow_config_volume[0]
)

# Test that 2 Volume Mounts exists and has 2 different mount-paths
# One for airflow.cfg
# Second for airflow_local_settings.py
volume_mounts = [
volume_mount for volume_mount in pod.volume_mounts
if volume_mount['name'] == 'airflow-config'
]
self.assertEqual(2, len(volume_mounts))

self.assertCountEqual(
[
{
'mountPath': '/usr/local/airflow/airflow.cfg',
'name': 'airflow-config',
'readOnly': True,
'subPath': 'airflow.cfg',
},
{
'mountPath': '/usr/local/airflow/config/airflow_local_settings.py',
'name': 'airflow-config',
'readOnly': True,
'subPath': 'airflow_local_settings.py',
}
],
volume_mounts
)

def test_set_airflow_configmap_different_for_local_setting(self):
"""
Test that airflow_local_settings.py can be set via configmap by
checking volume & volume-mounts are set correctly.
"""
self.kube_config.airflow_home = '/usr/local/airflow'
self.kube_config.airflow_configmap = 'airflow-configmap'
self.kube_config.airflow_local_settings_configmap = 'airflow-ls-configmap'
self.kube_config.dags_folder = '/workers/path/to/dags'

worker_config = WorkerConfiguration(self.kube_config)
kube_executor_config = KubernetesExecutorConfig(annotations=[],
volumes=[],
volume_mounts=[])

pod = worker_config.make_pod("default", str(uuid.uuid4()), "test_pod_id", "test_dag_id",
"test_task_id", str(datetime.utcnow()), 1, "bash -c 'ls /'",
kube_executor_config)

airflow_local_settings_volume = [
volume for volume in pod.volumes if volume["name"] == 'airflow-local-settings'
]
# Test that volume_name is found
self.assertEqual(1, len(airflow_local_settings_volume))

# Test that config map exists
self.assertEqual(
[{'configMap': {'name': 'airflow-ls-configmap'}, 'name': 'airflow-local-settings'}],
airflow_local_settings_volume
)

# Test that 2 Volume Mounts exists and has 2 different mount-paths
# One for airflow.cfg
# Second for airflow_local_settings.py
airflow_cfg_volume_mount = [
volume_mount for volume_mount in pod.volume_mounts
if volume_mount['name'] == 'airflow-config'
]

local_setting_volume_mount = [
volume_mount for volume_mount in pod.volume_mounts
if volume_mount['name'] == 'airflow-local-settings'
]
self.assertEqual(1, len(airflow_cfg_volume_mount))
self.assertEqual(1, len(local_setting_volume_mount))

self.assertEqual(
[
{
'mountPath': '/usr/local/airflow/config/airflow_local_settings.py',
'name': 'airflow-local-settings',
'readOnly': True,
'subPath': 'airflow_local_settings.py',
}
],
local_setting_volume_mount
)

print(airflow_cfg_volume_mount)

self.assertEqual(
[
{
'mountPath': '/usr/local/airflow/airflow.cfg',
'name': 'airflow-config',
'readOnly': True,
'subPath': 'airflow.cfg',
}
],
airflow_cfg_volume_mount
)


def test_kubernetes_environment_variables(self):
# Tests the kubernetes environment variables get copied into the worker pods
input_environment = {
Expand Down

0 comments on commit ba2217b

Please sign in to comment.