From c5efd71822541f8d596658b4bdf7dc9713edc601 Mon Sep 17 00:00:00 2001 From: Yoann Abriel Date: Wed, 4 Mar 2026 16:23:41 +0100 Subject: [PATCH 1/2] fix: skip scheduling instead of bulk-failing when serialized DAG is transiently missing When the scheduler cannot find a DAG in the serialized_dag table while checking task concurrency limits, it previously set all SCHEDULED task instances for that DAG to FAILED via a bulk UPDATE. This caused intermittent mass task failures on multi-scheduler setups (e.g. MWAA) where the serialized DAG may be transiently absent during a DAG file parse cycle. Instead, treat the missing serialized DAG as a transient condition: log a warning, add the dag_id to starved_dags so subsequent tasks for the same DAG are also skipped, and let the scheduler retry on the next iteration. The existing test 'test_queued_task_instances_fails_with_missing_dag' has been renamed to 'test_queued_task_instances_skips_with_missing_dag' and updated to reflect the new expected behavior (tasks remain SCHEDULED). A new regression test 'test_missing_serialized_dag_does_not_bulk_fail_tasks' explicitly verifies no bulk-failure occurs. Closes #62050 --- .../src/airflow/jobs/scheduler_job_runner.py | 16 +++--- .../tests/unit/jobs/test_scheduler_job.py | 55 ++++++++++++++++++- 2 files changed, 59 insertions(+), 12 deletions(-) diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py b/airflow-core/src/airflow/jobs/scheduler_job_runner.py index bda95bbf4b06a..e21cf7e4d13f8 100644 --- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py +++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py @@ -769,19 +769,17 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session) - serialized_dag = self.scheduler_dag_bag.get_dag_for_run( dag_run=task_instance.dag_run, session=session ) - # If the dag is missing, fail the task and continue to the next task. + # If the dag is transiently missing, skip scheduling it this iteration + # and try again next time instead of bulk-failing all scheduled tasks. 
+ # See: https://github.com/apache/airflow/issues/62050 if not serialized_dag: - self.log.error( - "DAG '%s' for task instance %s not found in serialized_dag table", + self.log.warning( + "DAG '%s' for task instance %s not found in serialized_dag table, " + "skipping scheduling for this iteration and will retry next time", dag_id, task_instance, ) - session.execute( - update(TI) - .where(TI.dag_id == dag_id, TI.state == TaskInstanceState.SCHEDULED) - .values(state=TaskInstanceState.FAILED) - .execution_options(synchronize_session="fetch") - ) + starved_dags.add(dag_id) continue task_concurrency_limit: int | None = None diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py b/airflow-core/tests/unit/jobs/test_scheduler_job.py index f9ae60a9c3583..12f5d0a5f34fa 100644 --- a/airflow-core/tests/unit/jobs/test_scheduler_job.py +++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py @@ -1806,8 +1806,14 @@ def test_find_executable_task_instances_in_default_pool(self, dag_maker, mock_ex session.rollback() session.close() - def test_queued_task_instances_fails_with_missing_dag(self, dag_maker, session): - """Check that task instances of missing DAGs are failed""" + def test_queued_task_instances_skips_with_missing_dag(self, dag_maker, session): + """Check that task instances of transiently missing DAGs are skipped, not bulk-failed. + + When the serialized DAG is transiently missing (e.g. during a DAG file parse cycle), + the scheduler should skip scheduling for that DAG in the current iteration and retry + next time, rather than bulk-failing all SCHEDULED task instances. 
+ See: https://github.com/apache/airflow/issues/62050 + """ dag_id = "SchedulerJobTest.test_find_executable_task_instances_not_in_dagbag" task_id_1 = "dummy" task_id_2 = "dummydummy" @@ -1831,10 +1837,53 @@ def test_queued_task_instances_fails_with_missing_dag(self, dag_maker, session): session.flush() res = self.job_runner._executable_task_instances_to_queued(max_tis=32, session=session) session.flush() + # No tasks should be queued assert len(res) == 0 + # Tasks should remain SCHEDULED (not be bulk-failed) tis = dr.get_task_instances(session=session) assert len(tis) == 2 - assert all(ti.state == State.FAILED for ti in tis) + assert all(ti.state == State.SCHEDULED for ti in tis) + + def test_missing_serialized_dag_does_not_bulk_fail_tasks(self, dag_maker, session): + """Regression test for https://github.com/apache/airflow/issues/62050 + + When the serialized DAG is transiently missing, all SCHEDULED task instances for the DAG + should remain SCHEDULED so the scheduler can pick them up in the next iteration. + Previously, the scheduler would bulk-fail all SCHEDULED tasks when it couldn't find the + serialized DAG in the DagBag. 
+ """ + dag_id = "SchedulerJobTest.test_missing_serialized_dag_bulk_fails" + + with dag_maker(dag_id=dag_id, session=session, default_args={"max_active_tis_per_dag": 2}): + EmptyOperator(task_id="task_a") + EmptyOperator(task_id="task_b") + + scheduler_job = Job() + self.job_runner = SchedulerJobRunner(job=scheduler_job) + + # Simulate serialized DAG being transiently missing + self.job_runner.scheduler_dag_bag = mock.MagicMock() + self.job_runner.scheduler_dag_bag.get_dag_for_run.return_value = None + + dr = dag_maker.create_dagrun(state=DagRunState.RUNNING) + for ti in dr.task_instances: + ti.state = State.SCHEDULED + session.merge(ti) + session.flush() + + res = self.job_runner._executable_task_instances_to_queued(max_tis=32, session=session) + session.flush() + + # No tasks should be queued since serialized DAG is missing + assert len(res) == 0 + # All tasks should remain SCHEDULED — not FAILED + tis = dr.get_task_instances(session=session) + assert len(tis) == 2 + for ti in tis: + assert ti.state == State.SCHEDULED, ( + f"Task {ti.task_id} was {ti.state} but expected SCHEDULED. " + "The scheduler should not bulk-fail tasks when serialized DAG is transiently missing." 
+ ) def test_nonexistent_pool(self, dag_maker): dag_id = "SchedulerJobTest.test_nonexistent_pool" From d7c6064f0718ddbb45cbd680a43a0ebc15eeff53 Mon Sep 17 00:00:00 2001 From: Yoann Abriel Date: Tue, 24 Mar 2026 23:03:52 +0100 Subject: [PATCH 2/2] Address review: add spec to MagicMock, remove redundant test --- .../tests/unit/jobs/test_scheduler_job.py | 44 +------------------ 1 file changed, 2 insertions(+), 42 deletions(-) diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py b/airflow-core/tests/unit/jobs/test_scheduler_job.py index 12f5d0a5f34fa..9e10a45ea1c35 100644 --- a/airflow-core/tests/unit/jobs/test_scheduler_job.py +++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py @@ -69,6 +69,7 @@ from airflow.models.callback import ExecutorCallback from airflow.models.dag import DagModel, get_last_dagrun, infer_automated_data_interval from airflow.models.dag_version import DagVersion +from airflow.models.dagbag import DBDagBag from airflow.models.dagbundle import DagBundleModel from airflow.models.dagrun import DagRun from airflow.models.dagwarning import DagWarning @@ -1825,7 +1826,7 @@ def test_queued_task_instances_skips_with_missing_dag(self, dag_maker, session): scheduler_job = Job() self.job_runner = SchedulerJobRunner(job=scheduler_job) - self.job_runner.scheduler_dag_bag = mock.MagicMock() + self.job_runner.scheduler_dag_bag = mock.MagicMock(spec=DBDagBag) self.job_runner.scheduler_dag_bag.get_dag_for_run.return_value = None dr = dag_maker.create_dagrun(state=DagRunState.RUNNING) @@ -1844,47 +1845,6 @@ def test_queued_task_instances_skips_with_missing_dag(self, dag_maker, session): assert len(tis) == 2 assert all(ti.state == State.SCHEDULED for ti in tis) - def test_missing_serialized_dag_does_not_bulk_fail_tasks(self, dag_maker, session): - """Regression test for https://github.com/apache/airflow/issues/62050 - - When the serialized DAG is transiently missing, all SCHEDULED task instances for the DAG - should remain SCHEDULED so the scheduler 
can pick them up in the next iteration. - Previously, the scheduler would bulk-fail all SCHEDULED tasks when it couldn't find the - serialized DAG in the DagBag. - """ - dag_id = "SchedulerJobTest.test_missing_serialized_dag_bulk_fails" - - with dag_maker(dag_id=dag_id, session=session, default_args={"max_active_tis_per_dag": 2}): - EmptyOperator(task_id="task_a") - EmptyOperator(task_id="task_b") - - scheduler_job = Job() - self.job_runner = SchedulerJobRunner(job=scheduler_job) - - # Simulate serialized DAG being transiently missing - self.job_runner.scheduler_dag_bag = mock.MagicMock() - self.job_runner.scheduler_dag_bag.get_dag_for_run.return_value = None - - dr = dag_maker.create_dagrun(state=DagRunState.RUNNING) - for ti in dr.task_instances: - ti.state = State.SCHEDULED - session.merge(ti) - session.flush() - - res = self.job_runner._executable_task_instances_to_queued(max_tis=32, session=session) - session.flush() - - # No tasks should be queued since serialized DAG is missing - assert len(res) == 0 - # All tasks should remain SCHEDULED — not FAILED - tis = dr.get_task_instances(session=session) - assert len(tis) == 2 - for ti in tis: - assert ti.state == State.SCHEDULED, ( - f"Task {ti.task_id} was {ti.state} but expected SCHEDULED. " - "The scheduler should not bulk-fail tasks when serialized DAG is transiently missing." - ) - def test_nonexistent_pool(self, dag_maker): dag_id = "SchedulerJobTest.test_nonexistent_pool" with dag_maker(dag_id=dag_id, max_active_tasks=16):