diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py b/airflow-core/src/airflow/jobs/scheduler_job_runner.py index bda95bbf4b06a..e21cf7e4d13f8 100644 --- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py +++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py @@ -769,19 +769,17 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session) - serialized_dag = self.scheduler_dag_bag.get_dag_for_run( dag_run=task_instance.dag_run, session=session ) - # If the dag is missing, fail the task and continue to the next task. + # If the dag is transiently missing, skip scheduling it this iteration + # and try again next time instead of bulk-failing all scheduled tasks. + # See: https://github.com/apache/airflow/issues/62050 if not serialized_dag: - self.log.error( - "DAG '%s' for task instance %s not found in serialized_dag table", + self.log.warning( + "DAG '%s' for task instance %s not found in serialized_dag table, " + "skipping scheduling for this iteration and will retry next time", dag_id, task_instance, ) - session.execute( - update(TI) - .where(TI.dag_id == dag_id, TI.state == TaskInstanceState.SCHEDULED) - .values(state=TaskInstanceState.FAILED) - .execution_options(synchronize_session="fetch") - ) + starved_dags.add(dag_id) continue task_concurrency_limit: int | None = None diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py b/airflow-core/tests/unit/jobs/test_scheduler_job.py index f9ae60a9c3583..9e10a45ea1c35 100644 --- a/airflow-core/tests/unit/jobs/test_scheduler_job.py +++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py @@ -69,6 +69,7 @@ from airflow.models.callback import ExecutorCallback from airflow.models.dag import DagModel, get_last_dagrun, infer_automated_data_interval from airflow.models.dag_version import DagVersion +from airflow.models.dagbag import DBDagBag from airflow.models.dagbundle import DagBundleModel from airflow.models.dagrun import DagRun from airflow.models.dagwarning import DagWarning @@ -1806,8 +1807,14 @@ def test_find_executable_task_instances_in_default_pool(self, dag_maker, mock_ex session.rollback() session.close() - def test_queued_task_instances_fails_with_missing_dag(self, dag_maker, session): - """Check that task instances of missing DAGs are failed""" + def test_queued_task_instances_skips_with_missing_dag(self, dag_maker, session): + """Check that task instances of transiently missing DAGs are skipped, not bulk-failed. + + When the serialized DAG is transiently missing (e.g. during a DAG file parse cycle), + the scheduler should skip scheduling for that DAG in the current iteration and retry + next time, rather than bulk-failing all SCHEDULED task instances. + See: https://github.com/apache/airflow/issues/62050 + """ dag_id = "SchedulerJobTest.test_find_executable_task_instances_not_in_dagbag" task_id_1 = "dummy" task_id_2 = "dummydummy" @@ -1819,7 +1826,7 @@ def test_queued_task_instances_fails_with_missing_dag(self, dag_maker, session): scheduler_job = Job() self.job_runner = SchedulerJobRunner(job=scheduler_job) - self.job_runner.scheduler_dag_bag = mock.MagicMock() + self.job_runner.scheduler_dag_bag = mock.MagicMock(spec=DBDagBag) self.job_runner.scheduler_dag_bag.get_dag_for_run.return_value = None dr = dag_maker.create_dagrun(state=DagRunState.RUNNING) @@ -1831,10 +1838,12 @@ def test_queued_task_instances_fails_with_missing_dag(self, dag_maker, session): session.flush() res = self.job_runner._executable_task_instances_to_queued(max_tis=32, session=session) session.flush() + # No tasks should be queued assert len(res) == 0 + # Tasks should remain SCHEDULED (not be bulk-failed) tis = dr.get_task_instances(session=session) assert len(tis) == 2 - assert all(ti.state == State.FAILED for ti in tis) + assert all(ti.state == State.SCHEDULED for ti in tis) def test_nonexistent_pool(self, dag_maker): dag_id = "SchedulerJobTest.test_nonexistent_pool"