
fix: skip transiently missing serialized DAGs instead of bulk-failing tasks #62878

Open

YoannAbriel wants to merge 2 commits into apache:main from YoannAbriel:fix/issue-62050

Conversation

@YoannAbriel
Contributor

Problem

On multi-scheduler Airflow deployments (e.g. AWS MWAA with 4+ schedulers), tasks intermittently fail in bulk with no apparent cause, but succeed on manual retry. The root issue is in _executable_task_instances_to_queued: when the scheduler checks task concurrency limits and cannot find the serialized DAG, it immediately bulk-fails all SCHEDULED task instances for that DAG via a raw SQL UPDATE. This is an overly aggressive response to what is often a transient race condition during DAG file parsing or serialization refresh cycles.

Root Cause

In scheduler_job_runner.py, the _executable_task_instances_to_queued method loads the serialized DAG only when checking per-task or per-dagrun concurrency limits. If scheduler_dag_bag.get_dag_for_run() returns None (serialized DAG transiently absent), the code would execute a bulk UPDATE task_instance SET state='failed' for all SCHEDULED tasks of that DAG, instead of treating it as a transient miss.
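For contrast, a toy model of the pre-fix behavior described above. Plain dicts stand in for task_instance rows and a dict lookup stands in for the serialized-DAG table; the real code issued a raw SQL UPDATE, so this is only an illustration:

```python
# Toy model of the pre-fix behavior: one missing serialized DAG bulk-fails
# every SCHEDULED task instance of that DAG. Dicts stand in for TI rows;
# the real implementation used a raw SQLAlchemy UPDATE.
def old_behavior(task_instances, serialized_dags, dag_id):
    if serialized_dags.get(dag_id) is None:  # transient miss during a parse cycle
        for ti in task_instances:
            if ti["dag_id"] == dag_id and ti["state"] == "scheduled":
                ti["state"] = "failed"  # mass failure with no apparent cause

tis = [{"dag_id": "etl", "task_id": f"t{i}", "state": "scheduled"} for i in range(3)]
old_behavior(tis, {}, "etl")  # serialized DAG absent for a moment
assert all(ti["state"] == "failed" for ti in tis)  # all three tasks failed
```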

Fix

Replace the bulk-fail with a graceful skip: when the serialized DAG is not found, log a WARNING (down from ERROR), add the dag_id to starved_dags so the rest of its tasks are also skipped in this iteration, and let the scheduler retry naturally on the next heartbeat.
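The skip path described above can be sketched as a minimal runnable loop. Plain dicts stand in for task-instance rows and a dict lookup stands in for scheduler_dag_bag.get_dag_for_run; any names beyond those mentioned in this PR are illustrative:

```python
# Simplified sketch of one scheduling iteration with the graceful skip:
# a missing serialized DAG starves the DAG instead of failing its tasks.
import logging

log = logging.getLogger("scheduler")

def select_executable(task_instances, serialized_dags):
    starved_dags: set[str] = set()
    executable = []
    for ti in task_instances:
        dag_id = ti["dag_id"]
        if serialized_dags.get(dag_id) is None:  # stand-in for get_dag_for_run
            # Transient miss: warn, starve the DAG for this iteration, and
            # leave its tasks SCHEDULED so the next heartbeat retries them.
            log.warning("DAG %r not found in serialized_dag table; skipping", dag_id)
            starved_dags.add(dag_id)
            continue
        executable.append(ti)
    return executable, starved_dags

tis = [
    {"dag_id": "etl", "task_id": "t1", "state": "scheduled"},
    {"dag_id": "report", "task_id": "t2", "state": "scheduled"},
]
picked, starved = select_executable(tis, {"etl": object()})  # "report" is missing
assert [ti["task_id"] for ti in picked] == ["t1"]
assert starved == {"report"}
assert all(ti["state"] == "scheduled" for ti in tis)  # nothing was failed
```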

Changes:

  • airflow-core/src/airflow/jobs/scheduler_job_runner.py: removed the bulk UPDATE … SET state=FAILED block; replaced with starved_dags.add(dag_id) + warning log.
  • airflow-core/tests/unit/jobs/test_scheduler_job.py:
    • Renamed test_queued_task_instances_fails_with_missing_dag to test_queued_task_instances_skips_with_missing_dag and updated assertions to expect SCHEDULED state (not FAILED).
    • Added new regression test test_missing_serialized_dag_does_not_bulk_fail_tasks (based on the reproducer from the issue) that explicitly asserts tasks remain SCHEDULED when the serialized DAG is transiently missing.

Both new/updated tests pass. The full test_executable_task_instances suite (24 tests) passes with no regressions.

Closes: #62050



Was generative AI tooling used to co-author this PR?
  • Yes (Claude Code)

Generated-by: Claude Code following the guidelines


  • Read the Pull Request Guidelines for more information. Note: commit author/co-author name and email in commits become permanently public when merged.
  • For fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
  • When adding dependency, check compliance with the ASF 3rd Party License Policy.
  • For significant user-facing changes create newsfragment: {pr_number}.significant.rst or {issue_number}.significant.rst, in airflow-core/newsfragments.

@YoannAbriel YoannAbriel requested review from XD-DENG and ashb as code owners March 4, 2026 15:25
@boring-cyborg boring-cyborg bot added the area:Scheduler including HA (high availability) scheduler label Mar 4, 2026
@YoannAbriel YoannAbriel force-pushed the fix/issue-62050 branch 4 times, most recently from c0bc313 to c38181d Compare March 8, 2026 19:04
Member

@potiuk potiuk left a comment

LGTM. But I would like others who are better versed in scheduler to take a look @ashb @kaxil @ephraimbuddy

-               .values(state=TaskInstanceState.FAILED)
-               .execution_options(synchronize_session="fetch")
-           )
+           starved_dags.add(dag_id)

Member

If a DAG is permanently deleted (not just transiently missing), tasks will stay SCHEDULED forever and this warning will fire every scheduler iteration. Would it be worth tracking consecutive misses per dag_id and escalating to failure after N iterations? The issue thread mentioned this approach too. At minimum, this should probably be documented as a known limitation in the PR description so follow-up work can address it.
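The escalation idea floated above can be sketched with a small per-DAG miss counter. The class name, threshold, and method names here are invented for illustration and are not part of this PR:

```python
# Hypothetical tracker for "escalate to failure after N consecutive misses".
from collections import defaultdict

MAX_CONSECUTIVE_MISSES = 5  # assumed threshold, not taken from the PR

class MissTracker:
    """Track consecutive serialized-DAG misses per dag_id (illustrative)."""

    def __init__(self) -> None:
        self._misses: dict[str, int] = defaultdict(int)

    def record_miss(self, dag_id: str) -> bool:
        """Return True once the DAG should be escalated to failure."""
        self._misses[dag_id] += 1
        return self._misses[dag_id] >= MAX_CONSECUTIVE_MISSES

    def record_hit(self, dag_id: str) -> None:
        # Serialized DAG reappeared, so the misses were transient: reset.
        self._misses.pop(dag_id, None)

tracker = MissTracker()
assert not any([tracker.record_miss("gone_dag") for _ in range(4)])
assert tracker.record_miss("gone_dag")  # 5th consecutive miss: escalate
tracker.record_miss("flaky_dag")
tracker.record_hit("flaky_dag")  # DAG reappeared: counter resets
assert not tracker.record_miss("flaky_dag")
```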

    if not serialized_dag:
-       self.log.error(
-           "DAG '%s' for task instance %s not found in serialized_dag table",
+       self.log.warning(

Member

When a batch has multiple TIs for the same DAG, each one will hit get_dag_for_run and log this warning individually before the starved_dags filter kicks in on the next query iteration. Consider checking if dag_id in starved_dags: continue before entering the has_task_concurrency_limits block, both to avoid redundant DB lookups and to reduce log noise.
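The reviewer's suggestion, sketched on a toy version of the loop: short-circuit on already-starved DAGs before the (potentially expensive) serialized-DAG lookup. Names mirror the toy model, not the real Airflow method:

```python
# Count lookups to show the early starved_dags check avoids redundant work.
lookups = {"count": 0}

def get_dag(serialized_dags, dag_id):
    lookups["count"] += 1  # models the per-TI DB lookup
    return serialized_dags.get(dag_id)

def select_executable(task_instances, serialized_dags):
    starved_dags, executable = set(), []
    for ti in task_instances:
        if ti["dag_id"] in starved_dags:
            continue  # skip the redundant lookup and the duplicate warning
        if get_dag(serialized_dags, ti["dag_id"]) is None:
            starved_dags.add(ti["dag_id"])
            continue
        executable.append(ti)
    return executable

tis = [{"dag_id": "missing", "task_id": f"t{i}"} for i in range(10)]
assert select_executable(tis, {}) == []
assert lookups["count"] == 1  # without the early check this would be 10
```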

-       assert all(ti.state == State.FAILED for ti in tis)
+       assert all(ti.state == State.SCHEDULED for ti in tis)

+   def test_missing_serialized_dag_does_not_bulk_fail_tasks(self, dag_maker, session):

Member

This test covers the same scenario as test_queued_task_instances_skips_with_missing_dag above (mock get_dag_for_run to return None, assert tasks stay SCHEDULED). The only differences are the dag_id string, max_active_tis_per_dag value (1 vs 2), and the assertion style. I'd pick one or the other rather than keeping both.

+       self.job_runner = SchedulerJobRunner(job=scheduler_job)
+
+       # Simulate serialized DAG being transiently missing
+       self.job_runner.scheduler_dag_bag = mock.MagicMock()

Member

mock.MagicMock() without spec won't catch typos if someone later renames get_dag_for_run. Consider mock.MagicMock(spec=DBDagBag) (same applies to the existing test on line 1813, but that's pre-existing).
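The spec behavior suggested above can be shown in a few lines; DBDagBag here is a toy stand-in for Airflow's class of the same name:

```python
from unittest import mock

class DBDagBag:  # toy stand-in for Airflow's DBDagBag
    def get_dag_for_run(self, dag_run, session=None):
        return None

loose = mock.MagicMock()
loose.get_dag_for_runn  # typo: silently returns yet another mock

strict = mock.MagicMock(spec=DBDagBag)
strict.get_dag_for_run(None)  # fine: attribute exists on the spec
try:
    strict.get_dag_for_runn  # typo now raises immediately
    raised = False
except AttributeError:
    raised = True
assert raised
```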

@eladkal
Contributor

eladkal commented Mar 15, 2026

static checks are failing

@eladkal eladkal added this to the Airflow 3.1.9 milestone Mar 15, 2026
@eladkal eladkal added type:bug-fix Changelog: Bug Fixes backport-to-v3-1-test Mark PR with this label to backport to v3-1-test branch labels Mar 15, 2026
@YoannAbriel YoannAbriel force-pushed the fix/issue-62050 branch 6 times, most recently from 4b2d28a to 4f3e5ae Compare March 16, 2026 16:09
@YoannAbriel YoannAbriel force-pushed the fix/issue-62050 branch 2 times, most recently from ba501b0 to b1580dc Compare March 23, 2026 19:05
…ransiently missing

When the scheduler cannot find a DAG in the serialized_dag table while checking
task concurrency limits, it previously set all SCHEDULED task instances for that
DAG to FAILED via a bulk UPDATE. This caused intermittent mass task failures on
multi-scheduler setups (e.g. MWAA) where the serialized DAG may be transiently
absent during a DAG file parse cycle.

Instead, treat the missing serialized DAG as a transient condition: log a warning,
add the dag_id to starved_dags so subsequent tasks for the same DAG are also
skipped, and let the scheduler retry on the next iteration.

The existing test 'test_queued_task_instances_fails_with_missing_dag' has been
updated to reflect the new expected behavior (tasks remain SCHEDULED). A new
regression test 'test_missing_serialized_dag_does_not_bulk_fail_tasks' explicitly
verifies no bulk-failure occurs.

Closes apache#62050

Labels

  • area:Scheduler - including HA (high availability) scheduler
  • backport-to-v3-1-test - Mark PR with this label to backport to v3-1-test branch
  • type:bug-fix - Changelog: Bug Fixes

Development

Successfully merging this pull request may close these issues.

Scheduler bulk-fails all scheduled tasks when serialized DAG is transiently missing

5 participants