
Bugfix/fix dagrun starvation #64109

Open
Nataneljpwd wants to merge 12 commits into apache:main from Nataneljpwd:bugfix/fix-dagrun-starvation

Conversation

@Nataneljpwd
Contributor

We have been experiencing severe dagrun starvation in our cluster: when there were many dagruns and a low max_active_runs limit (hundreds to thousands of runs with a limit in the 10s), a lot of dags got stuck in the queued state without moving to running, causing those dagruns to time out.
After investigation, we found that the cause was the _start_queued_dagruns method, where the query was returning dagruns which cannot be set to running due to the max_active_runs limit, meaning that other dagruns were starved.

A similar issue occurs when new dagruns are created in large batches (due to the nulls-first ordering), but that is out of scope for this PR; I will submit an additional PR soon.
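The starvation mechanism can be sketched with a deliberately simplified model (this is not Airflow's actual code; all names and numbers below are illustrative):

```python
# Simplified model of the starvation: dag 'a' is already at its
# max_active_runs limit but has a huge queued backlog ahead of dag 'b'.
queued = [("a", i) for i in range(1000)] + [("b", i) for i in range(3)]
running = {"a": 10, "b": 0}            # dag 'a' is already at its limit
max_active_runs = {"a": 10, "b": 16}
LOOP_BUDGET = 5                        # dagruns examined per scheduler loop

# Before the fix: take the first LOOP_BUDGET queued runs, then filter them.
candidates = queued[:LOOP_BUDGET]
started = [r for r in candidates if running[r[0]] < max_active_runs[r[0]]]
print(len(started))  # → 0: the whole budget went to non-runnable 'a' runs

# After the fix: the capacity check happens before the limit is applied,
# so runs from 'b' are no longer hidden behind the 'a' backlog.
runnable = [r for r in queued if running[r[0]] < max_active_runs[r[0]]]
print(len(runnable[:LOOP_BUDGET]))  # → 3: all queued 'b' runs get started
```

With a "filter after LIMIT" query the scheduler can spend every loop examining only non-runnable runs from one dag, which is exactly the timeout behavior described above.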

closes #49508


Was generative AI tooling used to co-author this PR?
  • Yes (please specify the tool below)

  • Read the Pull Request Guidelines for more information. Note: commit author/co-author name and email in commits become permanently public when merged.
  • For fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
  • When adding dependency, check compliance with the ASF 3rd Party License Policy.
  • For significant user-facing changes create newsfragment: {pr_number}.significant.rst, in airflow-core/newsfragments. You can add this file in a follow-up commit after the PR is created so you know the PR number.

@boring-cyborg boring-cyborg bot added the area:Scheduler including HA (high availability) scheduler label Mar 23, 2026
@Nataneljpwd Nataneljpwd marked this pull request as draft March 23, 2026 16:48
@collinmcnulty
Contributor

I'm struggling to understand how the problem has been solved from reading the code. Can you explain your solution for preventing the starvation?

@Nataneljpwd
Contributor Author

> I'm struggling to understand how the problem has been solved from reading the code. Can you explain your solution for preventing the starvation?

Sure.
Instead of (as of now) querying the first N runs and then filtering on max active runs, we query the first N runs where we check the max active runs in SQL (before the limit is applied), and so we skip a lot of runs which cannot be scheduled.

Assume dags a and b:
a - 3 max active runs
b - no limit (defaults to 16 from config)
Suppose the query result looked like the following, where each row represents a run: a small letter is a queued run that cannot be set to running, a capital letter is a run that is schedulable according to max_active_runs, and the - marks the limit cutoff (all runs before the - are selected, all others are ignored). The max dagruns to schedule per loop (the limit) is 5.

A
A
A
a
a
-
B
B
B

Here (as of now) the last 3 dagruns are omitted and ignored (starving runs from b).

After the change it will look like so:

A
A
A
B
B
-
B

Now we schedule everything we queried, without dagruns from a limiting us (the limit now effectively becomes the max dagruns to schedule per loop configuration), and it is guaranteed that the runs queried will be able to run.

Hope this explains it; if anything is unclear feel free to let me know and I will write a better explanation.
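The walkthrough above can be reproduced with a short simulation (the dag names and limits come from the example; the selection logic is a simplification of the real query):

```python
# Simplified replay of the example: dag 'a' has max_active_runs = 3 with
# 5 queued runs; dag 'b' has 3 queued runs; the per-loop limit is 5.
queued = ["a"] * 5 + ["b"] * 3
capacity = {"a": 3, "b": 16}
LIMIT = 5

# Old behavior: take the first LIMIT rows, then drop what exceeds capacity.
old_selected = queued[:LIMIT]                    # A A A a a
old_started, seen = [], {}
for dag in old_selected:
    seen[dag] = seen.get(dag, 0) + 1
    if seen[dag] <= capacity[dag]:
        old_started.append(dag)
print(old_started)  # → ['a', 'a', 'a']: every 'b' run is starved this loop

# New behavior: cap candidates per dag at its capacity first (the role of
# the row_number() window), then apply the limit, so the whole budget is
# spent on runs that can actually start.
capped, seen = [], {}
for dag in queued:
    seen[dag] = seen.get(dag, 0) + 1
    if seen[dag] <= capacity[dag]:
        capped.append(dag)
print(capped[:LIMIT])  # → ['a', 'a', 'a', 'b', 'b']
```

The first print matches the first listing (only the three A runs start, the two a rows waste budget, and b is starved); the second matches the listing after the change.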

@Nataneljpwd Nataneljpwd force-pushed the bugfix/fix-dagrun-starvation branch from 6b09818 to fe5462d Compare March 23, 2026 19:21
@Nataneljpwd Nataneljpwd marked this pull request as ready for review March 23, 2026 21:01
@eladkal eladkal requested a review from kaxil March 24, 2026 10:05
Comment on lines -1916 to -1923
if dag_model.exceeds_max_non_backfill:
self.log.warning(
"Dag run cannot be created; max active runs exceeded.",
dag_id=dag_model.dag_id,
max_active_runs=dag_model.max_active_runs,
active_runs=active_runs_of_dags.get(dag_model.dag_id),
)
continue
Contributor

@eladkal eladkal Mar 24, 2026

is it right to remove this check?
Backfill has a special case as explained in the docs
https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/backfill.html#concurrency-control

You can set max_active_runs on a backfill and it will control how many Dag runs in the backfill can run concurrently. Backfill max_active_runs is applied independently of the Dag max_active_runs setting.

The idea is to give the backfill run the power to limit concurrency even if the scheduler can schedule it all at once.


Contributor Author

It is checked in the window query that was added, so by the time the data gets here the query has already validated the non-backfill runs.

@kaxil kaxil requested a review from Copilot March 24, 2026 12:15
@kaxil kaxil added this to the Airflow 3.2.1 milestone Mar 24, 2026
@kaxil
Member

kaxil commented Mar 24, 2026

This will have to wait until 3.2.0 -- This touches the core and I don't want to hurry until 3.2.0 is out.

We have 1200+ commits in 3.2.0

Contributor

Copilot AI left a comment

Pull request overview

This PR addresses scheduler DAG run starvation by changing how queued DagRuns are selected for transition to RUNNING when max_active_runs is low and there are large backlogs of queued runs.

Changes:

  • Update DagRun.get_queued_dag_runs_to_set_running() to limit queued candidates per DAG/backfill using a row_number() window so the scheduler doesn’t spend its per-loop budget examining non-runnable runs from a single DAG.
  • Adjust scheduler unit tests to validate the improved fairness/scheduling behavior under backlog conditions.
  • Remove a max-active guard/logging in _create_dag_runs() and add a TODO note around _set_exceeds_max_active_runs.
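The row_number() capping described in the first bullet can be sketched against an in-memory database. The schema below is a simplified stand-in, not Airflow's real tables, and the capacity arithmetic is illustrative:

```python
# Hypothetical, simplified schema to illustrate capping queued candidates
# per dag with a row_number() window *before* the LIMIT is applied.
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE dag (dag_id TEXT PRIMARY KEY, max_active_runs INTEGER);
CREATE TABLE dag_run (id INTEGER PRIMARY KEY, dag_id TEXT, state TEXT);
INSERT INTO dag VALUES ('a', 3), ('b', 16);
""")
# dag 'a' is saturated (3 running) with a 5-run queued backlog; 'b' has 3 queued
rows = [("a", "running")] * 3 + [("a", "queued")] * 5 + [("b", "queued")] * 3
conn.executemany("INSERT INTO dag_run (dag_id, state) VALUES (?, ?)", rows)

query = """
WITH running AS (
    SELECT dag_id, COUNT(*) AS n FROM dag_run
    WHERE state = 'running' GROUP BY dag_id
),
ranked AS (
    SELECT dr.id, dr.dag_id,
           ROW_NUMBER() OVER (PARTITION BY dr.dag_id ORDER BY dr.id) AS rn
    FROM dag_run dr WHERE dr.state = 'queued'
)
SELECT r.dag_id FROM ranked r
JOIN dag d ON d.dag_id = r.dag_id
LEFT JOIN running ru ON ru.dag_id = r.dag_id
WHERE r.rn <= d.max_active_runs - COALESCE(ru.n, 0)  -- capacity check pre-LIMIT
ORDER BY r.rn, r.dag_id
LIMIT 5
"""
promoted = [row[0] for row in conn.execute(query)]
print(promoted)  # → ['b', 'b', 'b']: dag 'a' backlog no longer eats the limit
```

Without the window filter, LIMIT 5 would return five queued 'a' rows, none of which could be promoted, and the 'b' runs would never be seen.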

Reviewed changes

Copilot reviewed 3 out of 3 changed files in this pull request and generated 3 comments.

File | Description
  • airflow-core/src/airflow/models/dagrun.py | Changes queued dagrun selection query to avoid starvation by limiting per-DAG/backfill candidates.
  • airflow-core/src/airflow/jobs/scheduler_job_runner.py | Removes a creation-time max-active skip and adds a TODO comment near _set_exceeds_max_active_runs.
  • airflow-core/tests/unit/jobs/test_scheduler_job.py | Updates/reshapes scheduling tests to assert non-starved behavior and updated run counts.

Comment on lines +5916 to +5921
# now we finish all lower priority backfill tasks, and observe new higher priority tasks are started
session.execute(
update(DagRun)
.where(DagRun.dag_id == "test_dag2", DagRun.state == DagRunState.RUNNING)
.values(state=DagRunState.SUCCESS)
)
Copilot AI Mar 24, 2026

This comment says you’re finishing “lower priority backfill tasks”, but the code updates DAG runs for test_dag2 (non-backfill) from RUNNING→SUCCESS. Please adjust the comment to match what the test is actually doing (finishing test_dag2 runs) to avoid confusion when maintaining the test expectations.

Comment on lines +664 to +667
.over(partition_by=[DagRun.dag_id, DagRun.backfill_id], order_by=DagRun.logical_date)
.label("rn"),
)
.where(DagRun.state == DagRunState.QUEUED)
Copilot AI Mar 24, 2026

available_dagruns_rn uses row_number(... order_by=DagRun.logical_date), but logical_date is nullable and does not reflect the scheduler’s actual priority ordering (run_after, and for backfills BackfillDagRun.sort_ordinal, which can differ e.g. reverse backfills). This can cause the rn-capacity filter to pick a non-runnable/future run_after row (or wrong backfill ordinal) as rn=1 and then filter out runnable candidates, reintroducing starvation or breaking backfill ordering. Consider basing the window order_by on run_after with a deterministic tiebreaker (e.g. id), and for backfills incorporate BackfillDagRun.sort_ordinal (likely via computing the row_number after joining that table), and/or apply run_after <= now() in the window subquery so rn is computed only among runnable queued runs.

Suggested change
.over(partition_by=[DagRun.dag_id, DagRun.backfill_id], order_by=DagRun.logical_date)
.label("rn"),
)
.where(DagRun.state == DagRunState.QUEUED)
.over(
partition_by=[DagRun.dag_id, DagRun.backfill_id],
order_by=[BackfillDagRun.sort_ordinal, DagRun.run_after, DagRun.id],
)
.label("rn"),
)
.join(
BackfillDagRun,
DagRun.backfill_id == BackfillDagRun.id,
isouter=True,
)
.where(
and_(
DagRun.state == DagRunState.QUEUED,
DagRun.run_after <= func.now(),
)
)
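The nullable-ordering concern can be demonstrated concretely. The sketch below uses SQLite, where NULLs sort first under ascending ORDER BY (PostgreSQL defaults to NULLs last, so the exact symptom is database-dependent), and an illustrative table rather than Airflow's schema:

```python
# Demonstrates the pitfall of ordering a row_number() window by a nullable
# column: in SQLite the NULL row (e.g. a run with no logical_date) gets rn = 1.
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE run (id INTEGER PRIMARY KEY, logical_date TEXT)")
conn.executemany(
    "INSERT INTO run (logical_date) VALUES (?)",
    [(None,), ("2026-03-01",), ("2026-03-02",)],
)
rows = conn.execute(
    "SELECT id, ROW_NUMBER() OVER (ORDER BY logical_date) AS rn "
    "FROM run ORDER BY rn"
).fetchall()
print(rows)  # → [(1, 1), (2, 2), (3, 3)]: the NULL row is ranked first
```

If the capacity filter keeps only rn <= k, a NULL-dated row can occupy a slot that a dated, runnable row should have received, which is why an order_by with a deterministic, non-null key (run_after plus id) is suggested above.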

Member

@kaxil kaxil left a comment

The core idea — using row_number() to cap how many queued runs per DAG/backfill pass through the query — is a reasonable approach to solving the starvation problem in get_queued_dag_runs_to_set_running. However, this PR bundles in unrelated and incorrect changes to the DagRun creation path (_create_dag_runs), which is a separate concern from the QUEUED→RUNNING promotion path.

See inline comments for details.

@Nataneljpwd
Contributor Author

> This will have to wait until 3.2.0 -- This touches the core and I don't want to hurry until 3.2.0 is out.
>
> We have 1200+ commits in 3.2.0

Sure, no problem

@eladkal eladkal added the type:bug-fix Changelog: Bug Fixes label Mar 24, 2026
@Nataneljpwd Nataneljpwd force-pushed the bugfix/fix-dagrun-starvation branch from a9c77cd to c47e216 Compare March 24, 2026 19:35
@Nataneljpwd
Contributor Author

@kaxil, @eladkal I have addressed the review comments and would appreciate another review

@Nataneljpwd Nataneljpwd requested review from eladkal and kaxil March 24, 2026 20:39

Labels

area:Scheduler including HA (high availability) scheduler type:bug-fix Changelog: Bug Fixes

Projects

None yet

Development

Successfully merging this pull request may close these issues.

DAG Runs from a single DAG can prevent scheduler from seeing other DAG's runs

5 participants