fix: skip transiently missing serialized DAGs instead of bulk-failing tasks#62878
YoannAbriel wants to merge 2 commits into apache:main
Conversation
Force-pushed c0bc313 to c38181d
potiuk left a comment:
LGTM. But I would like others who are better versed in scheduler to take a look @ashb @kaxil @ephraimbuddy
Force-pushed c38181d to 6202b33
```diff
-            .values(state=TaskInstanceState.FAILED)
-            .execution_options(synchronize_session="fetch")
-        )
+        starved_dags.add(dag_id)
```
If a DAG is permanently deleted (not just transiently missing), tasks will stay SCHEDULED forever and this warning will fire every scheduler iteration. Would it be worth tracking consecutive misses per dag_id and escalating to failure after N iterations? The issue thread mentioned this approach too. At minimum, this should probably be documented as a known limitation in the PR description so follow-up work can address it.
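One way the escalation suggested above could look, as a sketch: a per-scheduler counter of consecutive misses that resets as soon as the DAG is seen again. `MissTracker` and `MAX_CONSECUTIVE_MISSES` are invented names for illustration, not Airflow API.

```python
from collections import Counter

MAX_CONSECUTIVE_MISSES = 5  # hypothetical threshold; would need tuning per deployment


class MissTracker:
    """Track consecutive serialized-DAG misses per dag_id (sketch, not Airflow code)."""

    def __init__(self, max_misses: int = MAX_CONSECUTIVE_MISSES) -> None:
        self._misses = Counter()
        self._max = max_misses

    def record_miss(self, dag_id: str) -> bool:
        """Return True once dag_id has been missing for max_misses iterations in a row."""
        self._misses[dag_id] += 1
        return self._misses[dag_id] >= self._max

    def record_hit(self, dag_id: str) -> None:
        """DAG found again: reset the counter so transient blips never accumulate."""
        self._misses.pop(dag_id, None)
```

The scheduler loop would call `record_miss` where it currently logs the warning, and only escalate to the old bulk-fail path when it returns True.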
```diff
         if not serialized_dag:
-            self.log.error(
+            self.log.warning(
                 "DAG '%s' for task instance %s not found in serialized_dag table",
```
When a batch has multiple TIs for the same DAG, each one will hit get_dag_for_run and log this warning individually before the starved_dags filter kicks in on the next query iteration. Consider checking if dag_id in starved_dags: continue before entering the has_task_concurrency_limits block, both to avoid redundant DB lookups and to reduce log noise.
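The guard suggested here can be sketched as a standalone filter; `filter_executable` and its parameters are illustrative names, and the real loop in `_executable_task_instances_to_queued` carries much more state:

```python
def filter_executable(task_instances, starved_dags, get_dag_for_run):
    """Skip TIs of DAGs already known to be missing, doing at most one lookup per DAG."""
    executable = []
    for ti in task_instances:
        if ti.dag_id in starved_dags:
            continue  # already known missing this iteration: no DB lookup, no log line
        dag = get_dag_for_run(ti)
        if dag is None:
            starved_dags.add(ti.dag_id)  # first miss: warn once, remember, skip the rest
            continue
        executable.append(ti)
    return executable
```

With this ordering, a batch of N task instances for one missing DAG costs a single `get_dag_for_run` call and a single warning instead of N of each.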
```diff
-        assert all(ti.state == State.FAILED for ti in tis)
+        assert all(ti.state == State.SCHEDULED for ti in tis)

+    def test_missing_serialized_dag_does_not_bulk_fail_tasks(self, dag_maker, session):
```
This test covers the same scenario as test_queued_task_instances_skips_with_missing_dag above (mock get_dag_for_run to return None, assert tasks stay SCHEDULED). The only differences are the dag_id string, max_active_tis_per_dag value (1 vs 2), and the assertion style. I'd pick one or the other rather than keeping both.
```python
        self.job_runner = SchedulerJobRunner(job=scheduler_job)

        # Simulate serialized DAG being transiently missing
        self.job_runner.scheduler_dag_bag = mock.MagicMock()
```
mock.MagicMock() without spec won't catch typos if someone later renames get_dag_for_run. Consider mock.MagicMock(spec=DBDagBag) (same applies to the existing test on line 1813, but that's pre-existing).
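The difference a spec makes, in a self-contained sketch (the `DBDagBag` here is a minimal stand-in with only the one method, not the real Airflow class):

```python
from unittest import mock


class DBDagBag:
    """Stand-in exposing only the method the scheduler calls."""

    def get_dag_for_run(self, dag_run, session):
        raise NotImplementedError


# Without a spec, a typo'd attribute silently produces a fresh child mock:
loose = mock.MagicMock()
loose.get_dag_for_runn.return_value = None  # typo, but no error is raised

# With spec=DBDagBag, only attributes that exist on the class are permitted:
strict = mock.MagicMock(spec=DBDagBag)
strict.get_dag_for_run.return_value = None  # fine, the method exists
try:
    strict.get_dag_for_runn  # same typo now fails fast
    typo_caught = False
except AttributeError:
    typo_caught = True
```

Note that a plain `spec=` only restricts attribute names; call-signature checking additionally requires `mock.create_autospec` or `autospec=True`.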
Force-pushed 6202b33 to ca70fe5
static checks are failing
Force-pushed 4b2d28a to 4f3e5ae
Force-pushed ba501b0 to b1580dc
…ransiently missing

When the scheduler cannot find a DAG in the serialized_dag table while checking task concurrency limits, it previously set all SCHEDULED task instances for that DAG to FAILED via a bulk UPDATE. This caused intermittent mass task failures on multi-scheduler setups (e.g. MWAA) where the serialized DAG may be transiently absent during a DAG file parse cycle.

Instead, treat the missing serialized DAG as a transient condition: log a warning, add the dag_id to starved_dags so subsequent tasks for the same DAG are also skipped, and let the scheduler retry on the next iteration.

The existing test 'test_queued_task_instances_fails_with_missing_dag' has been updated to reflect the new expected behavior (tasks remain SCHEDULED). A new regression test 'test_missing_serialized_dag_does_not_bulk_fail_tasks' explicitly verifies no bulk-failure occurs.

Closes apache#62050
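The bulk UPDATE described in the commit message, reduced to a runnable sqlite3 sketch (table layout and column names are simplified; the real scheduler issues this through SQLAlchemy against the task_instance table):

```python
import sqlite3

con = sqlite3.connect(":memory:")
con.execute(
    "CREATE TABLE task_instance (id INTEGER PRIMARY KEY, dag_id TEXT, state TEXT)"
)
con.executemany(
    "INSERT INTO task_instance (dag_id, state) VALUES (?, ?)",
    [("example_dag", "scheduled")] * 3,
)

serialized_dag = None  # simulate get_dag_for_run() returning None during a parse cycle

# Old behavior: one transient miss bulk-fails EVERY scheduled TI of the DAG,
# even though the serialized DAG may reappear on the very next iteration.
if serialized_dag is None:
    con.execute(
        "UPDATE task_instance SET state = 'failed' "
        "WHERE dag_id = ? AND state = 'scheduled'",
        ("example_dag",),
    )
```

This is why a single parse-cycle race on a multi-scheduler deployment showed up as a mass failure rather than a one-task hiccup.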
Force-pushed b1580dc to d7c6064
Problem
On multi-scheduler Airflow deployments (e.g. AWS MWAA with 4+ schedulers), tasks intermittently fail in bulk with no apparent cause, but succeed on manual retry. The root issue is in _executable_task_instances_to_queued: when the scheduler checks task concurrency limits and cannot find the serialized DAG, it immediately bulk-fails all SCHEDULED task instances for that DAG via a raw SQL UPDATE. This is an overly aggressive response to what is often a transient race condition during DAG file parsing or serialization refresh cycles.

Root Cause
In scheduler_job_runner.py, the _executable_task_instances_to_queued method loads the serialized DAG only when checking per-task or per-dagrun concurrency limits. If scheduler_dag_bag.get_dag_for_run() returns None (serialized DAG transiently absent), the code executed a bulk UPDATE task_instance SET state='failed' for all SCHEDULED tasks of that DAG, instead of treating it as a transient miss.

Fix
Replace the bulk-fail with a graceful skip: when the serialized DAG is not found, log a WARNING (down from ERROR), add the dag_id to starved_dags so the rest of its tasks are also skipped in this iteration, and let the scheduler retry naturally on the next heartbeat.

Changes:
- airflow-core/src/airflow/jobs/scheduler_job_runner.py: removed the bulk UPDATE … SET state=FAILED block; replaced it with starved_dags.add(dag_id) plus a warning log.
- airflow-core/tests/unit/jobs/test_scheduler_job.py: renamed test_queued_task_instances_fails_with_missing_dag to test_queued_task_instances_skips_with_missing_dag and updated its assertions to expect SCHEDULED state (not FAILED); added test_missing_serialized_dag_does_not_bulk_fail_tasks (based on the reproducer from the issue), which explicitly asserts tasks remain SCHEDULED when the serialized DAG is transiently missing.

Both new/updated tests pass. The full test_executable_task_instances suite (24 tests) passes with no regressions.

Closes: #62050
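The described fix, simulated across two scheduler iterations in a minimal sketch (function and dict shapes are illustrative, not the real Airflow scheduler loop):

```python
def run_iteration(tis, serialized_dags, log):
    """One simplified scheduler pass: queue runnable TIs, skip starved DAGs."""
    starved_dags = set()  # rebuilt fresh every iteration, so misses never persist
    queued = []
    for ti in tis:
        if ti["state"] != "scheduled" or ti["dag_id"] in starved_dags:
            continue
        if ti["dag_id"] not in serialized_dags:
            log.append("WARNING: DAG %s transiently missing" % ti["dag_id"])
            starved_dags.add(ti["dag_id"])
            continue  # TI stays SCHEDULED; retried on the next heartbeat
        ti["state"] = "queued"
        queued.append(ti)
    return queued


tis = [{"dag_id": "d", "state": "scheduled"} for _ in range(3)]

log1, log2 = [], []
first = run_iteration(tis, serialized_dags=set(), log=log1)   # DAG absent this pass
second = run_iteration(tis, serialized_dags={"d"}, log=log2)  # DAG is back
```

The first pass queues nothing and emits one warning; the second pass queues all three tasks, which is exactly the recovery-on-retry behavior the PR is after.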
Was generative AI tooling used to co-author this PR?
Generated-by: Claude Code following the guidelines