Conversation
I'm struggling to understand how the problem has been solved from reading the code. Can you explain your solution for preventing the starvation?
Sure, Assume dags a, b A
Force-pushed from 6b09818 to fe5462d
```python
if dag_model.exceeds_max_non_backfill:
    self.log.warning(
        "Dag run cannot be created; max active runs exceeded.",
        dag_id=dag_model.dag_id,
        max_active_runs=dag_model.max_active_runs,
        active_runs=active_runs_of_dags.get(dag_model.dag_id),
    )
    continue
```
is it right to remove this check?
Backfill has a special case as explained in the docs
https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/backfill.html#concurrency-control
You can set max_active_runs on a backfill and it will control how many Dag runs in the backfill can run concurrently. Backfill max_active_runs is applied independently of the Dag max_active_runs setting.
The idea is to give the backfill run the power to limit concurrency even if the scheduler can schedule it all at once.
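As a hypothetical illustration of the independence described above (the function name and signature below are illustrative, not Airflow's implementation), the backfill's own limit caps its concurrency regardless of the dag-level setting:

```python
# Hypothetical sketch: a backfill's own max_active_runs caps how many of its
# runs may be active at once, independently of the dag-level max_active_runs.
def runnable_backfill_slots(backfill_max_active_runs: int, active_backfill_runs: int) -> int:
    """How many more runs this backfill may start, per its own limit only."""
    return max(0, backfill_max_active_runs - active_backfill_runs)

# Even if the dag-level max_active_runs is 16, a backfill created with
# max_active_runs=2 never has more than 2 of its runs active at once:
print(runnable_backfill_slots(2, 2))  # 0 -> the scheduler must not start more
print(runnable_backfill_slots(2, 1))  # 1
```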
It is checked in the window query that was added, so the rows that reach this point have already been validated against the non-backfill max_active_runs limit by the query.
This will have to wait until 3.2.0 -- this touches the core and I don't want to hurry it in before 3.2.0 is out. We have 1200+ commits in 3.2.0.
Pull request overview
This PR addresses scheduler DAG run starvation by changing how queued DagRuns are selected for transition to RUNNING when max_active_runs is low and there are large backlogs of queued runs.
Changes:
- Update `DagRun.get_queued_dag_runs_to_set_running()` to limit queued candidates per DAG/backfill using a `row_number()` window so the scheduler doesn't spend its per-loop budget examining non-runnable runs from a single DAG.
- Adjust scheduler unit tests to validate the improved fairness/scheduling behavior under backlog conditions.
- Remove a max-active guard/logging in `_create_dag_runs()` and add a TODO note around `_set_exceeds_max_active_runs`.
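The per-partition capping described above can be sketched with a `row_number()` window. This is an illustrative reconstruction using sqlite3 with a simplified schema, not the PR's actual SQLAlchemy query or Airflow's real tables:

```python
# Sketch of the core idea: cap how many queued runs per dag_id partition the
# scheduler even considers, so one dag's backlog can't consume the whole
# per-loop budget. Schema, data, and the budget value are all illustrative.
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE dag_run (id INTEGER PRIMARY KEY, dag_id TEXT, state TEXT, logical_date TEXT)"
)
rows = [("dag_a", "queued", f"2024-01-{d:02d}") for d in range(1, 21)]  # big backlog
rows += [("dag_b", "queued", "2024-01-01")]                            # single queued run
conn.executemany("INSERT INTO dag_run (dag_id, state, logical_date) VALUES (?, ?, ?)", rows)

MAX_ACTIVE_RUNS = 2  # pretend every dag has a low max_active_runs
candidates = conn.execute(
    """
    SELECT dag_id, logical_date FROM (
        SELECT dag_id, logical_date,
               row_number() OVER (PARTITION BY dag_id ORDER BY logical_date) AS rn
        FROM dag_run WHERE state = 'queued'
    ) WHERE rn <= ?
    ORDER BY logical_date
    LIMIT 10  -- the scheduler's per-loop budget
    """,
    (MAX_ACTIVE_RUNS,),
).fetchall()

# dag_b is no longer crowded out of the per-loop budget by dag_a's backlog:
print(sorted({dag_id for dag_id, _ in candidates}))  # ['dag_a', 'dag_b']
```

Without the `rn <= max_active_runs` filter, the `ORDER BY logical_date LIMIT 10` alone could be filled entirely by dag_a's oldest queued runs.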
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| airflow-core/src/airflow/models/dagrun.py | Changes queued dagrun selection query to avoid starvation by limiting per DAG/backfill candidates. |
| airflow-core/src/airflow/jobs/scheduler_job_runner.py | Removes a creation-time max-active skip and adds a TODO comment near `_set_exceeds_max_active_runs`. |
| airflow-core/tests/unit/jobs/test_scheduler_job.py | Updates/reshapes scheduling tests to assert non-starved behavior and updated run counts. |
```python
# now we finish all lower priority backfill tasks, and observe new higher priority tasks are started
session.execute(
    update(DagRun)
    .where(DagRun.dag_id == "test_dag2", DagRun.state == DagRunState.RUNNING)
    .values(state=DagRunState.SUCCESS)
)
```
This comment says you’re finishing “lower priority backfill tasks”, but the code updates DAG runs for test_dag2 (non-backfill) from RUNNING→SUCCESS. Please adjust the comment to match what the test is actually doing (finishing test_dag2 runs) to avoid confusion when maintaining the test expectations.
```python
.over(partition_by=[DagRun.dag_id, DagRun.backfill_id], order_by=DagRun.logical_date)
.label("rn"),
)
.where(DagRun.state == DagRunState.QUEUED)
```
`available_dagruns_rn` uses `row_number(... order_by=DagRun.logical_date)`, but `logical_date` is nullable and does not reflect the scheduler's actual priority ordering (`run_after`, and for backfills `BackfillDagRun.sort_ordinal`, which can differ, e.g. reverse backfills). This can cause the rn-capacity filter to pick a non-runnable row with a future `run_after` (or the wrong backfill ordinal) as rn=1 and then filter out runnable candidates, reintroducing starvation or breaking backfill ordering. Consider basing the window's order_by on `run_after` with a deterministic tiebreaker (e.g. `id`); for backfills, incorporate `BackfillDagRun.sort_ordinal` (likely by computing the row_number after joining that table), and/or apply `run_after <= now()` in the window subquery so rn is computed only among runnable queued runs.
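To see why ordering the window by a nullable column is risky: NULLs sort first (or last) deterministically but not meaningfully, so a run without a `logical_date` can claim rn=1. A small sqlite3 illustration with a simplified schema (not Airflow's real tables):

```python
# Demonstrates the reviewer's concern: with ORDER BY on a nullable column,
# the NULL-dated row outranks every dated row in the row_number() window.
# SQLite sorts NULLs first in ascending order by default.
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE dag_run (id INTEGER PRIMARY KEY, dag_id TEXT, logical_date TEXT)")
conn.executemany(
    "INSERT INTO dag_run (dag_id, logical_date) VALUES (?, ?)",
    [("dag_a", "2024-01-02"), ("dag_a", None), ("dag_a", "2024-01-01")],
)
rows = conn.execute(
    """
    SELECT logical_date,
           row_number() OVER (PARTITION BY dag_id ORDER BY logical_date) AS rn
    FROM dag_run
    ORDER BY rn
    """
).fetchall()
print(rows[0])  # (None, 1): the run without a logical_date wins rn = 1
```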
Suggested change:

```diff
-.over(partition_by=[DagRun.dag_id, DagRun.backfill_id], order_by=DagRun.logical_date)
-.label("rn"),
-)
-.where(DagRun.state == DagRunState.QUEUED)
+.over(
+    partition_by=[DagRun.dag_id, DagRun.backfill_id],
+    order_by=[BackfillDagRun.sort_ordinal, DagRun.run_after, DagRun.id],
+)
+.label("rn"),
+)
+.join(
+    BackfillDagRun,
+    DagRun.backfill_id == BackfillDagRun.id,
+    isouter=True,
+)
+.where(
+    and_(
+        DagRun.state == DagRunState.QUEUED,
+        DagRun.run_after <= func.now(),
+    )
+)
```
kaxil left a comment:
The core idea — using row_number() to cap how many queued runs per DAG/backfill pass through the query — is a reasonable approach to solving the starvation problem in get_queued_dag_runs_to_set_running. However, this PR bundles in unrelated and incorrect changes to the DagRun creation path (_create_dag_runs), which is a separate concern from the QUEUED→RUNNING promotion path.
See inline comments for details.
Sure, no problem
Force-pushed from a9c77cd to c47e216
We have been experiencing severe dagrun starvation in our cluster: when there were many dagruns and a low max_active_runs limit (hundreds to thousands of runs with a limit in the 10s), many dags got stuck in the queued state without moving to running, causing those dagruns to time out.
After investigation, we found the cause in the _start_queued_dagruns method: its query was returning dagruns which cannot be set to running due to the max_active_runs limit, so other dagruns were starved.
A similar issue occurs when new dagruns are created in large batches (due to the nulls-first ordering), but that is out of scope for this PR; I will submit an additional PR soon.
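The starvation mechanism described above can be reproduced with a toy model (all names and the budget value below are hypothetical, not Airflow's code): a naive "take the N oldest queued runs" selection spends its whole budget on one dag's non-runnable backlog and never reaches other dags.

```python
# Toy reproduction of the starvation: busy_dag has a huge queued backlog but is
# already at max_active_runs, so a naive oldest-first scan of the queue burns
# the entire per-loop budget without starting anything.
from collections import Counter

queued = [("busy_dag", i) for i in range(1000)] + [("starved_dag", 0)]
active = Counter({"busy_dag": 1})          # busy_dag is at its limit already
max_active_runs = {"busy_dag": 1, "starved_dag": 1}
BUDGET = 32                                # runs examined per scheduler loop

started = []
for dag_id, _ in queued[:BUDGET]:          # naive selection: oldest first
    if active[dag_id] < max_active_runs[dag_id]:
        active[dag_id] += 1
        started.append(dag_id)

print(started)  # []: the whole budget went to busy_dag's non-runnable runs
```

Capping candidates per dag before applying the budget (as the PR's window query does) would let starved_dag reach the loop.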
closes #49508
Was generative AI tooling used to co-author this PR?
Add a newsfragment named {pr_number}.significant.rst in airflow-core/newsfragments. You can add this file in a follow-up commit after the PR is created, so you know the PR number.