Fix labels used to find queued KubeExecutor pods (apache#19904)
We need to use the job_id that was used to queue the TI, not the current
scheduler's job_id. These can differ naturally with HA schedulers and
with scheduler restarts (clearing of "queued but not launched" TIs happens
before adoption).

(cherry picked from commit b80084a)
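
To illustrate the failure mode, here is a minimal, hypothetical sketch (not Airflow code; build_selector, the DAG/task names, and the job IDs are made up for illustration):

# Pods launched by the KubernetesExecutor carry an "airflow-worker" label set to
# the job_id of the scheduler that queued the task, so a selector built from the
# *current* scheduler's job_id can miss a perfectly healthy pod.
def build_selector(dag_id: str, task_id: str, worker_job_id: int) -> str:
    # Made-up helper mirroring the label-selector format used in the diff below.
    return f"dag_id={dag_id},task_id={task_id},airflow-worker={worker_job_id}"

queued_by_job_id = 1      # scheduler that queued the TI and labelled the pod
current_scheduler_id = 2  # scheduler now running the "queued but not launched" check

pod_labels = build_selector("example_dag", "example_task", queued_by_job_id)

# Before the fix: the lookup used the current scheduler's job_id, found no pod,
# and wrongly reset the TI even though its pod existed.
assert build_selector("example_dag", "example_task", current_scheduler_id) != pod_labels

# After the fix: the lookup uses task.queued_by_job_id, so it matches the pod.
assert build_selector("example_dag", "example_task", queued_by_job_id) == pod_labels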
jedcunningham committed Dec 1, 2021
1 parent b9581c0 commit 85cad14
Showing 2 changed files with 56 additions and 2 deletions.
3 changes: 1 addition & 2 deletions airflow/executors/kubernetes_executor.py
@@ -463,7 +463,6 @@ def clear_not_launched_queued_tasks(self, session=None) -> None:
                 del self.last_handled[key]
 
         for task in queued_tasks:
-
             self.log.debug("Checking task %s", task)
 
             # Check to see if we've handled it ourselves recently
@@ -474,7 +473,7 @@ def clear_not_launched_queued_tasks(self, session=None) -> None:
             dict_string = "dag_id={},task_id={},airflow-worker={}".format(
                 pod_generator.make_safe_label_value(task.dag_id),
                 pod_generator.make_safe_label_value(task.task_id),
-                pod_generator.make_safe_label_value(str(self.scheduler_job_id)),
+                pod_generator.make_safe_label_value(str(task.queued_by_job_id)),
             )
             kwargs = dict(label_selector=dict_string)
             if self.kube_config.kube_client_request_args:
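
For context (not part of the diff), a hedged sketch of how a selector built this way is typically consumed against the Kubernetes API; the namespace, IDs, and names are illustrative, and the actual call sits in executor code outside this hunk:

from kubernetes import client, config

from airflow.kubernetes import pod_generator

config.load_kube_config()
kube_client = client.CoreV1Api()

# Build the same style of selector as the executor, but from the job that queued
# the TI (task.queued_by_job_id) rather than the current scheduler's job_id.
label_selector = "dag_id={},task_id={},airflow-worker={}".format(
    pod_generator.make_safe_label_value("example_dag"),
    pod_generator.make_safe_label_value("example_task"),
    pod_generator.make_safe_label_value("1"),  # str(task.queued_by_job_id)
)
pods = kube_client.list_namespaced_pod("default", label_selector=label_selector)
if not pods.items:
    # No pod matched the queued TI: the executor treats it as never launched and
    # resets it so it can be queued again.
    print("no matching pod; TI would be reset")
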
55 changes: 55 additions & 0 deletions tests/executors/test_kubernetes_executor.py
@@ -660,6 +660,61 @@ def test_pending_pod_timeout_multi_namespace_mode(
         )
         mock_delete_pod.assert_called_once_with('foo90', 'anothernamespace')
 
+    def test_clear_not_launched_queued_tasks_not_launched(self, dag_maker, create_dummy_dag, session):
+        """If a pod isn't found for a TI, reset the state to scheduled"""
+        mock_kube_client = mock.MagicMock()
+        mock_kube_client.list_namespaced_pod.return_value = k8s.V1PodList(items=[])
+
+        create_dummy_dag(dag_id="test_clear", task_id="task1", with_dagrun_type=None)
+        dag_run = dag_maker.create_dagrun()
+
+        ti = dag_run.task_instances[0]
+        ti.state = State.QUEUED
+        ti.queued_by_job_id = 1
+        session.flush()
+
+        executor = self.kubernetes_executor
+        executor.kube_client = mock_kube_client
+        executor.clear_not_launched_queued_tasks(session=session)
+
+        ti.refresh_from_db()
+        assert ti.state == State.SCHEDULED
+        assert mock_kube_client.list_namespaced_pod.call_count == 2
+        mock_kube_client.list_namespaced_pod.assert_any_call(
+            "default", label_selector="dag_id=test_clear,task_id=task1,airflow-worker=1,run_id=test"
+        )
+        # also check that we fall back to execution_date if we didn't find the pod with run_id
+        execution_date_label = pod_generator.datetime_to_label_safe_datestring(ti.execution_date)
+        mock_kube_client.list_namespaced_pod.assert_called_with(
+            "default",
+            label_selector=(
+                f"dag_id=test_clear,task_id=task1,airflow-worker=1,execution_date={execution_date_label}"
+            ),
+        )
+
+    def test_clear_not_launched_queued_tasks_launched(self, dag_maker, create_dummy_dag, session):
+        """Leave the state alone if a pod already exists"""
+        mock_kube_client = mock.MagicMock()
+        mock_kube_client.list_namespaced_pod.return_value = k8s.V1PodList(items=["something"])
+
+        create_dummy_dag(dag_id="test_clear", task_id="task1", with_dagrun_type=None)
+        dag_run = dag_maker.create_dagrun()
+
+        ti = dag_run.task_instances[0]
+        ti.state = State.QUEUED
+        ti.queued_by_job_id = 1
+        session.flush()
+
+        executor = self.kubernetes_executor
+        executor.kube_client = mock_kube_client
+        executor.clear_not_launched_queued_tasks(session=session)
+
+        ti.refresh_from_db()
+        assert ti.state == State.QUEUED
+        mock_kube_client.list_namespaced_pod.assert_called_once_with(
+            "default", label_selector="dag_id=test_clear,task_id=task1,airflow-worker=1,run_id=test"
+        )
+
 
 class TestKubernetesJobWatcher(unittest.TestCase):
     def setUp(self):
