Commit 6202b33
fix: skip scheduling instead of bulk-failing when serialized DAG is transiently missing
When the scheduler cannot find a DAG in the serialized_dag table while checking task concurrency limits, it previously set all SCHEDULED task instances for that DAG to FAILED via a bulk UPDATE. This caused intermittent mass task failures on multi-scheduler setups (e.g. MWAA), where the serialized DAG may be transiently absent during a DAG file parse cycle.

Instead, treat the missing serialized DAG as a transient condition: log a warning, add the dag_id to starved_dags so subsequent task instances of the same DAG are also skipped this iteration, and let the scheduler retry on the next iteration.

The existing test 'test_queued_task_instances_fails_with_missing_dag' has been renamed to 'test_queued_task_instances_skips_with_missing_dag' and updated to reflect the new expected behavior (tasks remain SCHEDULED). A new regression test 'test_missing_serialized_dag_does_not_bulk_fail_tasks' explicitly verifies that no bulk failure occurs.

Closes #62050
Parent: 75386ca

2 files changed: 59 additions & 12 deletions
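The root cause is easiest to see in isolation: the removed branch issued a bulk UPDATE keyed only on dag_id and the SCHEDULED state, so one transiently missing serialized DAG failed every scheduled task instance of that DAG at once. Below is a minimal sketch of that removed path; the function wrapper and import paths are assumptions added for illustration, and the exact statement appears in the diff further down.

```python
# Sketch of the *removed* behavior: the UPDATE is scoped only by dag_id and
# SCHEDULED state, so it fails every scheduled task instance of the DAG, not
# just the one task instance the scheduler was examining.
from sqlalchemy import update
from sqlalchemy.orm import Session

from airflow.models.taskinstance import TaskInstance as TI
from airflow.utils.state import TaskInstanceState


def bulk_fail_scheduled_tis(session: Session, dag_id: str) -> None:
    # Hypothetical wrapper around the statement the scheduler used to run.
    session.execute(
        update(TI)
        .where(TI.dag_id == dag_id, TI.state == TaskInstanceState.SCHEDULED)
        .values(state=TaskInstanceState.FAILED)
        .execution_options(synchronize_session="fetch")
    )
```

Because nothing scoped the statement to the single task instance being examined, any scheduler that hit the race during a DAG file parse cycle could fail an entire DAG's scheduled tasks in one pass.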

airflow-core/src/airflow/jobs/scheduler_job_runner.py

Lines changed: 7 additions & 9 deletions
@@ -770,19 +770,17 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session) -
                 serialized_dag = self.scheduler_dag_bag.get_dag_for_run(
                     dag_run=task_instance.dag_run, session=session
                 )
-                # If the dag is missing, fail the task and continue to the next task.
+                # If the dag is transiently missing, skip scheduling it this iteration
+                # and try again next time instead of bulk-failing all scheduled tasks.
+                # See: https://github.com/apache/airflow/issues/62050
                 if not serialized_dag:
-                    self.log.error(
-                        "DAG '%s' for task instance %s not found in serialized_dag table",
+                    self.log.warning(
+                        "DAG '%s' for task instance %s not found in serialized_dag table, "
+                        "skipping scheduling for this iteration and will retry next time",
                         dag_id,
                         task_instance,
                     )
-                    session.execute(
-                        update(TI)
-                        .where(TI.dag_id == dag_id, TI.state == TaskInstanceState.SCHEDULED)
-                        .values(state=TaskInstanceState.FAILED)
-                        .execution_options(synchronize_session="fetch")
-                    )
+                    starved_dags.add(dag_id)
                     continue

                 task_concurrency_limit: int | None = None
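The starved_dags set is what makes the skip cover the whole DAG for the rest of the pass: once a dag_id is added, later task instances of that DAG in the same iteration are passed over without any state change, so the next scheduler loop can pick them up once the serialized DAG reappears. A simplified sketch of that control flow follows; candidate_tis and get_serialized_dag are illustrative stand-ins, not the scheduler's real internals.

```python
# Simplified sketch of the skip-instead-of-fail flow. `candidate_tis` and
# `get_serialized_dag` are illustrative stand-ins for the scheduler's real
# data structures; only the starved_dags pattern mirrors the actual change.
def select_queueable_tis(candidate_tis, get_serialized_dag):
    starved_dags: set[str] = set()
    queueable = []
    for ti in candidate_tis:
        if ti.dag_id in starved_dags:
            # A DAG already seen as missing this iteration: skip its other TIs too.
            continue
        if get_serialized_dag(ti) is None:
            # Transiently missing serialized DAG: skip for now, leave state
            # untouched, and retry on the next scheduler iteration.
            starved_dags.add(ti.dag_id)
            continue
        queueable.append(ti)
    return queueable
```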

airflow-core/tests/unit/jobs/test_scheduler_job.py

Lines changed: 52 additions & 3 deletions
@@ -1797,8 +1797,14 @@ def test_find_executable_task_instances_in_default_pool(self, dag_maker, mock_ex
         session.rollback()
         session.close()

-    def test_queued_task_instances_fails_with_missing_dag(self, dag_maker, session):
-        """Check that task instances of missing DAGs are failed"""
+    def test_queued_task_instances_skips_with_missing_dag(self, dag_maker, session):
+        """Check that task instances of transiently missing DAGs are skipped, not bulk-failed.
+
+        When the serialized DAG is transiently missing (e.g. during a DAG file parse cycle),
+        the scheduler should skip scheduling for that DAG in the current iteration and retry
+        next time, rather than bulk-failing all SCHEDULED task instances.
+        See: https://github.com/apache/airflow/issues/62050
+        """
         dag_id = "SchedulerJobTest.test_find_executable_task_instances_not_in_dagbag"
         task_id_1 = "dummy"
         task_id_2 = "dummydummy"
@@ -1822,10 +1828,53 @@ def test_queued_task_instances_fails_with_missing_dag(self, dag_maker, session):
         session.flush()
         res = self.job_runner._executable_task_instances_to_queued(max_tis=32, session=session)
         session.flush()
+        # No tasks should be queued
         assert len(res) == 0
+        # Tasks should remain SCHEDULED (not be bulk-failed)
         tis = dr.get_task_instances(session=session)
         assert len(tis) == 2
-        assert all(ti.state == State.FAILED for ti in tis)
+        assert all(ti.state == State.SCHEDULED for ti in tis)
+
+    def test_missing_serialized_dag_does_not_bulk_fail_tasks(self, dag_maker, session):
+        """Regression test for https://github.com/apache/airflow/issues/62050
+
+        When the serialized DAG is transiently missing, all SCHEDULED task instances for the DAG
+        should remain SCHEDULED so the scheduler can pick them up in the next iteration.
+        Previously, the scheduler would bulk-fail all SCHEDULED tasks when it couldn't find the
+        serialized DAG in the DagBag.
+        """
+        dag_id = "SchedulerJobTest.test_missing_serialized_dag_bulk_fails"
+
+        with dag_maker(dag_id=dag_id, session=session, default_args={"max_active_tis_per_dag": 2}):
+            EmptyOperator(task_id="task_a")
+            EmptyOperator(task_id="task_b")
+
+        scheduler_job = Job()
+        self.job_runner = SchedulerJobRunner(job=scheduler_job)
+
+        # Simulate serialized DAG being transiently missing
+        self.job_runner.scheduler_dag_bag = mock.MagicMock()
+        self.job_runner.scheduler_dag_bag.get_dag_for_run.return_value = None
+
+        dr = dag_maker.create_dagrun(state=DagRunState.RUNNING)
+        for ti in dr.task_instances:
+            ti.state = State.SCHEDULED
+            session.merge(ti)
+        session.flush()
+
+        res = self.job_runner._executable_task_instances_to_queued(max_tis=32, session=session)
+        session.flush()
+
+        # No tasks should be queued since serialized DAG is missing
+        assert len(res) == 0
+        # All tasks should remain SCHEDULED, not FAILED
+        tis = dr.get_task_instances(session=session)
+        assert len(tis) == 2
+        for ti in tis:
+            assert ti.state == State.SCHEDULED, (
+                f"Task {ti.task_id} was {ti.state} but expected SCHEDULED. "
+                "The scheduler should not bulk-fail tasks when serialized DAG is transiently missing."
+            )

     def test_nonexistent_pool(self, dag_maker):
         dag_id = "SchedulerJobTest.test_nonexistent_pool"
