-
Notifications
You must be signed in to change notification settings - Fork 16.8k
Bugfix/fix dagrun starvation #64109
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Bugfix/fix dagrun starvation #64109
Changes from all commits
f922f2b
85e4c6a
dd6b74d
ecdb722
8356843
fe5462d
1c0c28f
c47e216
9c28d0d
9992724
1923f9f
c78e0a8
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -3289,6 +3289,94 @@ def test_runs_are_created_after_max_active_runs_was_reached(self, dag_maker, ses | |
| dag_runs = DagRun.find(dag_id=dag.dag_id, session=session) | ||
| assert len(dag_runs) == 2 | ||
|
|
||
| def test_runs_are_not_starved_by_max_active_runs_limit(self, dag_maker, session): | ||
| """ | ||
| Test that dagruns are not starved by max_active_runs | ||
| """ | ||
| scheduler_job = Job() | ||
| self.job_runner = SchedulerJobRunner(job=scheduler_job, executors=[self.null_exec]) | ||
|
|
||
| dag_ids = ["dag1", "dag2", "dag3"] | ||
|
|
||
| max_active_runs = 3 | ||
|
|
||
| for dag_id in dag_ids: | ||
| with dag_maker( | ||
| dag_id=dag_id, | ||
| max_active_runs=max_active_runs, | ||
| session=session, | ||
| catchup=True, | ||
| schedule=timedelta(seconds=60), | ||
| start_date=DEFAULT_DATE, | ||
| ): | ||
| # Need to use something that doesn't immediately get marked as success by the scheduler | ||
| BashOperator(task_id="task", bash_command="true") | ||
|
|
||
| dag_run = dag_maker.create_dagrun( | ||
| state=State.QUEUED, session=session, run_type=DagRunType.SCHEDULED | ||
| ) | ||
|
|
||
| for _ in range(50): | ||
| # create a bunch of dagruns in queued state, to make sure they are filtered by max_active_runs | ||
| dag_run = dag_maker.create_dagrun_after( | ||
| dag_run, run_type=DagRunType.SCHEDULED, state=State.QUEUED | ||
| ) | ||
|
|
||
| self.job_runner._start_queued_dagruns(session) | ||
| session.flush() | ||
|
|
||
| running_dagrun_count = session.scalar( | ||
| select(func.count()).select_from(DagRun).where(DagRun.state == DagRunState.RUNNING) | ||
| ) | ||
|
|
||
| assert running_dagrun_count == max_active_runs * len(dag_ids) | ||
|
|
||
| def test_no_more_dagruns_are_set_to_running_when_max_active_runs_exceeded(self, dag_maker, session): | ||
| """ | ||
| Test that dagruns are not moved to running if there are more than the max_active_runs running dagruns | ||
| """ | ||
| scheduler_job = Job() | ||
| self.job_runner = SchedulerJobRunner(job=scheduler_job, executors=[self.null_exec]) | ||
|
|
||
| max_active_runs = 1 | ||
| with dag_maker( | ||
| dag_id="test_dag", | ||
| max_active_runs=max_active_runs, | ||
| session=session, | ||
| catchup=True, | ||
| schedule=timedelta(seconds=60), | ||
| start_date=DEFAULT_DATE, | ||
| ): | ||
| # Need to use something that doesn't immediately get marked as success by the scheduler | ||
| BashOperator(task_id="task", bash_command="true") | ||
|
|
||
| dag_run = dag_maker.create_dagrun(state=State.RUNNING, session=session, run_type=DagRunType.SCHEDULED) | ||
|
|
||
| for _ in range(5): | ||
| # create a bunch of dagruns in queued state, to make sure they are filtered by max_active_runs | ||
| dag_run = dag_maker.create_dagrun_after( | ||
| dag_run, run_type=DagRunType.SCHEDULED, state=State.RUNNING | ||
| ) | ||
|
|
||
| running_dagruns_pre = session.scalar( | ||
| select(func.count()).select_from(DagRun).where(DagRun.state == DagRunState.RUNNING) | ||
| ) | ||
|
|
||
| for _ in range(5): | ||
| # create a bunch of dagruns in queued state, to make sure they are filtered by max_active_runs | ||
| dag_run = dag_maker.create_dagrun_after( | ||
| dag_run, run_type=DagRunType.SCHEDULED, state=State.QUEUED | ||
| ) | ||
|
|
||
| self.job_runner._start_queued_dagruns(session) | ||
| session.flush() | ||
|
|
||
| running_dagruns_post = session.scalar( | ||
| select(func.count()).select_from(DagRun).where(DagRun.state == DagRunState.RUNNING) | ||
| ) | ||
|
|
||
| assert running_dagruns_pre == running_dagruns_post | ||
|
|
||
| def test_dagrun_timeout_verify_max_active_runs(self, dag_maker, session): | ||
| """ | ||
| Test if a dagrun will not be scheduled if max_dag_runs | ||
|
|
@@ -5965,41 +6053,55 @@ def _running_counts(): | |
| EmptyOperator(task_id="mytask") | ||
|
|
||
| dr = dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED, state=State.QUEUED) | ||
| for _ in range(9): | ||
| for _ in range(29): | ||
| dr = dag_maker.create_dagrun_after(dr, run_type=DagRunType.SCHEDULED, state=State.QUEUED) | ||
|
|
||
| # initial state -- nothing is running | ||
| assert dag1_non_b_running == 0 | ||
| assert dag1_b_running == 0 | ||
| assert total_running == 0 | ||
| assert session.scalar(select(func.count(DagRun.id))) == 46 | ||
| assert session.scalar(select(func.count(DagRun.id))) == 66 | ||
| assert session.scalar(select(func.count()).where(DagRun.dag_id == dag1_dag_id)) == 36 | ||
|
|
||
| # now let's run it once | ||
| self.job_runner._start_queued_dagruns(session) | ||
| session.flush() | ||
|
|
||
| # after running the scheduler one time, observe that only one dag run is started | ||
| # this is because there are 30 runs for dag 1 so neither the backfills nor | ||
| # and 3 backfill dagruns are started | ||
| # this is because there are 30 dags, most of which get filtered due to max_active_runs | ||
| # and so due to the default dagruns to examine, we look at the first 20 dags which CAN be run | ||
| # according to the max_active_runs parameter, meaning 3 backfill runs will start, 1 non backfill and | ||
| # all dagruns of dag2 | ||
| # any runs for dag2 get started | ||
| assert DagRun.DEFAULT_DAGRUNS_TO_EXAMINE == 20 | ||
| dag1_non_b_running, dag1_b_running, total_running = _running_counts() | ||
| assert dag1_non_b_running == 1 | ||
| assert dag1_b_running == 0 | ||
| assert total_running == 1 | ||
| assert session.scalar(select(func.count()).select_from(DagRun)) == 46 | ||
| assert dag1_b_running == 3 | ||
| assert total_running == 20 | ||
| assert session.scalar(select(func.count()).select_from(DagRun)) == 66 | ||
| assert session.scalar(select(func.count()).where(DagRun.dag_id == dag1_dag_id)) == 36 | ||
| # now we finish all lower priority backfill tasks, and observe new higher priority tasks are started | ||
| session.execute( | ||
| update(DagRun) | ||
| .where(DagRun.dag_id == "test_dag2", DagRun.state == DagRunState.RUNNING) | ||
| .values(state=DagRunState.SUCCESS) | ||
| ) | ||
|
Comment on lines
Comment on lines +6084 to +6089
||
| session.commit() | ||
| session.flush() | ||
|
|
||
| # we run scheduler again and observe that now all the runs are created | ||
| # other than the finished runs of the backfill | ||
| # this must be because sorting is working | ||
| # new tasks from test dag 2 should run, and so they are scheduled | ||
| self.job_runner._start_queued_dagruns(session) | ||
| session.flush() | ||
|
|
||
| dag1_non_b_running, dag1_b_running, total_running = _running_counts() | ||
| assert dag1_non_b_running == 1 | ||
| assert dag1_b_running == 3 | ||
| assert total_running == 14 | ||
| assert session.scalar(select(func.count()).select_from(DagRun)) == 46 | ||
| assert total_running == 18 | ||
| assert session.scalar(select(func.count()).select_from(DagRun)) == 66 | ||
| assert session.scalar(select(func.count()).where(DagRun.dag_id == dag1_dag_id)) == 36 | ||
|
|
||
| # run it a 3rd time and nothing changes | ||
|
|
@@ -6009,8 +6111,8 @@ def _running_counts(): | |
| dag1_non_b_running, dag1_b_running, total_running = _running_counts() | ||
| assert dag1_non_b_running == 1 | ||
| assert dag1_b_running == 3 | ||
| assert total_running == 14 | ||
| assert session.scalar(select(func.count()).select_from(DagRun)) == 46 | ||
| assert total_running == 18 | ||
| assert session.scalar(select(func.count()).select_from(DagRun)) == 66 | ||
| assert session.scalar(select(func.count()).where(DagRun.dag_id == dag1_dag_id)) == 36 | ||
|
|
||
| def test_backfill_runs_are_started_with_lower_priority_catchup_false(self, dag_maker, session): | ||
|
|
@@ -6230,25 +6332,11 @@ def _running_counts(): | |
| assert dag1_non_b_running == 1 | ||
| assert dag1_b_running == 3 | ||
|
|
||
| # this should be 14 but it is not. why? | ||
| # answer: because dag2 got starved out by dag1 | ||
| # if we run the scheduler again, dag2 should get queued | ||
| assert total_running == 4 | ||
| assert total_running == 14 | ||
|
|
||
| assert session.scalar(select(func.count()).select_from(DagRun)) == 46 | ||
| assert session.scalar(select(func.count()).where(DagRun.dag_id == dag1_dag_id)) == 36 | ||
|
|
||
| # run scheduler a second time | ||
| self.job_runner._start_queued_dagruns(session) | ||
| session.flush() | ||
|
|
||
| dag1_non_b_running, dag1_b_running, total_running = _running_counts() | ||
| assert dag1_non_b_running == 1 | ||
| assert dag1_b_running == 3 | ||
|
|
||
| # on the second try, dag 2's 10 runs now start running | ||
| assert total_running == 14 | ||
|
|
||
| assert session.scalar(select(func.count()).select_from(DagRun)) == 46 | ||
| assert session.scalar(select(func.count()).where(DagRun.dag_id == dag1_dag_id)) == 36 | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
`available_dagruns_rn` uses `row_number(... order_by=DagRun.logical_date)`, but `logical_date` is nullable and does not reflect the scheduler's actual priority ordering (`run_after`, and for backfills `BackfillDagRun.sort_ordinal`, which can differ — e.g. reverse backfills). This can cause the rn-capacity filter to pick a non-runnable/future-`run_after` row (or one with the wrong backfill ordinal) as `rn=1` and then filter out runnable candidates, reintroducing starvation or breaking backfill ordering. Consider basing the window `order_by` on `run_after` with a deterministic tiebreaker (e.g. `id`); for backfills, incorporate `BackfillDagRun.sort_ordinal` (likely by computing the `row_number` after joining that table); and/or apply `run_after <= now()` in the window subquery so `rn` is computed only among runnable queued runs.