From f922f2b34a427da0a70a716930e63da7e58d5ab1 Mon Sep 17 00:00:00 2001
From: Nataneljpwd
Date: Mon, 23 Mar 2026 18:24:17 +0200
Subject: [PATCH 1/7] fixed dagrun starvation with the max_active_runs limit

---
 airflow-core/src/airflow/models/dagrun.py          | 25 +++++++--
 .../tests/unit/jobs/test_scheduler_job.py          | 53 ++++++++++---------
 2 files changed, 50 insertions(+), 28 deletions(-)

diff --git a/airflow-core/src/airflow/models/dagrun.py b/airflow-core/src/airflow/models/dagrun.py
index c37713da4d843..4ac98ab3f414d 100644
--- a/airflow-core/src/airflow/models/dagrun.py
+++ b/airflow-core/src/airflow/models/dagrun.py
@@ -649,9 +649,25 @@ def get_queued_dag_runs_to_set_running(cls, session: Session) -> ScalarResult[Da
             .subquery()
         )
 
+        available_dagruns_rn = (
+            select(
+                DagRun.dag_id,
+                DagRun.id,
+                func.row_number().over(partition_by=[DagRun.dag_id, DagRun.backfill_id]).label("rn"),
+            )
+            .where(DagRun.state == DagRunState.QUEUED)
+            .subquery()
+        )
+
         query = (
             select(cls)
-            .where(cls.state == DagRunState.QUEUED)
+            .join(
+                available_dagruns_rn,
+                and_(
+                    available_dagruns_rn.c.id == DagRun.id,
+                    available_dagruns_rn.c.dag_id == DagRun.dag_id,
+                ),
+            )
             .join(
                 DagModel,
                 and_(
@@ -683,8 +699,9 @@ def get_queued_dag_runs_to_set_running(cls, session: Session) -> ScalarResult[Da
                 # the one done in this query verifies that the dag is not maxed out
                 # it could return many more dag runs than runnable if there is even
                 # capacity for 1. this could be improved.
-                coalesce(running_drs.c.num_running, text("0"))
-                < coalesce(Backfill.max_active_runs, DagModel.max_active_runs),
+                available_dagruns_rn.c.rn
+                <= coalesce(Backfill.max_active_runs, DagModel.max_active_runs,999999)
+                - coalesce(running_drs.c.num_running, 0),
                 # don't set paused dag runs as running
                 not_(coalesce(cast("ColumnElement[bool]", Backfill.is_paused), False)),
             )
@@ -704,6 +721,8 @@ def get_queued_dag_runs_to_set_running(cls, session: Session) -> ScalarResult[Da
 
         query = query.where(DagRun.run_after <= func.now())
 
+        print(str(query))
+
         return session.scalars(with_row_locks(query, of=cls, session=session, skip_locked=True))
 
     @classmethod
diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py b/airflow-core/tests/unit/jobs/test_scheduler_job.py
index 997ebd24bcf0d..dea2a738af674 100644
--- a/airflow-core/tests/unit/jobs/test_scheduler_job.py
+++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py
@@ -5965,14 +5965,14 @@ def _running_counts():
             EmptyOperator(task_id="mytask")
 
         dr = dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED, state=State.QUEUED)
-        for _ in range(9):
+        for _ in range(29):
            dr = dag_maker.create_dagrun_after(dr, run_type=DagRunType.SCHEDULED, state=State.QUEUED)
 
         # initial state -- nothing is running
         assert dag1_non_b_running == 0
         assert dag1_b_running == 0
         assert total_running == 0
-        assert session.scalar(select(func.count(DagRun.id))) == 46
+        assert session.scalar(select(func.count(DagRun.id))) == 66
         assert session.scalar(select(func.count()).where(DagRun.dag_id == dag1_dag_id)) == 36
 
         # now let's run it once
@@ -5980,26 +5980,43 @@ def _running_counts():
         session.flush()
 
         # after running the scheduler one time, observe that only one dag run is started
-        # this is because there are 30 runs for dag 1 so neither the backfills nor
-        # any runs for dag2 get started
+        # for dag 1, along with 3 backfill dagruns
+        # this is because dag 1 has 30 queued non-backfill runs, most of which are filtered
+        # out by max_active_runs; with the default number of dagruns to examine, we look at
+        # the first 20 queued runs which CAN be run according to max_active_runs, meaning
+        # 3 backfill runs will start, 1 non-backfill run of dag 1, and all dagruns of dag2
         assert DagRun.DEFAULT_DAGRUNS_TO_EXAMINE == 20
 
         dag1_non_b_running, dag1_b_running, total_running = _running_counts()
         assert dag1_non_b_running == 1
-        assert dag1_b_running == 0
-        assert total_running == 1
-        assert session.scalar(select(func.count()).select_from(DagRun)) == 46
+        assert dag1_b_running == 3
+        assert total_running == 20
+        assert session.scalar(select(func.count()).select_from(DagRun)) == 66
         assert session.scalar(select(func.count()).where(DagRun.dag_id == dag1_dag_id)) == 36
 
+        # now we finish all of dag2's running dagruns, and observe that more queued runs are started
+        session.execute(
+            update(DagRun)
+            .where(
+                DagRun.dag_id == "test_dag2",
+                DagRun.state == DagRunState.RUNNING
+            )
+            .values(state=DagRunState.SUCCESS)
+        )
+        session.commit()
+        session.flush()
         # we run scheduler again and observe that now all the runs are created
+        # other than the dagruns of dag2 that were just finished
         # this must be because sorting is working
+        # the remaining queued runs of dag2 can now run, and so they are scheduled
         self.job_runner._start_queued_dagruns(session)
         session.flush()
 
         dag1_non_b_running, dag1_b_running, total_running = _running_counts()
         assert dag1_non_b_running == 1
         assert dag1_b_running == 3
-        assert total_running == 14
-        assert session.scalar(select(func.count()).select_from(DagRun)) == 46
+        assert total_running == 18
+        assert session.scalar(select(func.count()).select_from(DagRun)) == 66
         assert session.scalar(select(func.count()).where(DagRun.dag_id == dag1_dag_id)) == 36
 
         # run it a 3rd time and nothing changes
@@ -6009,8 +6026,8 @@ def _running_counts():
         dag1_non_b_running, dag1_b_running, total_running = _running_counts()
         assert dag1_non_b_running == 1
         assert dag1_b_running == 3
-        assert total_running == 14
-        assert session.scalar(select(func.count()).select_from(DagRun)) == 46
+        assert total_running == 18
+        assert session.scalar(select(func.count()).select_from(DagRun)) == 66
         assert session.scalar(select(func.count()).where(DagRun.dag_id == dag1_dag_id)) == 36
 
     def test_backfill_runs_are_started_with_lower_priority_catchup_false(self, dag_maker, session):
@@ -6230,25 +6247,11 @@ def _running_counts():
 
         assert dag1_non_b_running == 1
         assert dag1_b_running == 3
 
-        # this should be 14 but it is not. why?
-        # answer: because dag2 got starved out by dag1
-        # if we run the scheduler again, dag2 should get queued
-        assert total_running == 4
+        assert total_running == 14
         assert session.scalar(select(func.count()).select_from(DagRun)) == 46
         assert session.scalar(select(func.count()).where(DagRun.dag_id == dag1_dag_id)) == 36
 
-        # run scheduler a second time
-        self.job_runner._start_queued_dagruns(session)
-        session.flush()
-
-        dag1_non_b_running, dag1_b_running, total_running = _running_counts()
-        assert dag1_non_b_running == 1
-        assert dag1_b_running == 3
-
-        # on the second try, dag 2's 10 runs now start running
-        assert total_running == 14
-        assert session.scalar(select(func.count()).select_from(DagRun)) == 46
-
         assert session.scalar(select(func.count()).where(DagRun.dag_id == dag1_dag_id)) == 36
 

From 85e4c6a95d87eda7bbb7db61c723f0523fd5674c Mon Sep 17 00:00:00 2001
From: Nataneljpwd
Date: Mon, 23 Mar 2026 18:24:34 +0200
Subject: [PATCH 2/7] formatted files

---
 airflow-core/src/airflow/models/dagrun.py          | 2 +-
 airflow-core/tests/unit/jobs/test_scheduler_job.py | 5 +----
 2 files changed, 2 insertions(+), 5 deletions(-)

diff --git a/airflow-core/src/airflow/models/dagrun.py b/airflow-core/src/airflow/models/dagrun.py
index 4ac98ab3f414d..d02ca71446f62 100644
--- a/airflow-core/src/airflow/models/dagrun.py
+++ b/airflow-core/src/airflow/models/dagrun.py
@@ -700,7 +700,7 @@ def get_queued_dag_runs_to_set_running(cls, session: Session) -> ScalarResult[Da
                 # it could return many more dag runs than runnable if there is even
                 # capacity for 1. this could be improved.
                 available_dagruns_rn.c.rn
-                <= coalesce(Backfill.max_active_runs, DagModel.max_active_runs,999999)
+                <= coalesce(Backfill.max_active_runs, DagModel.max_active_runs, 999999)
                 - coalesce(running_drs.c.num_running, 0),
                 # don't set paused dag runs as running
                 not_(coalesce(cast("ColumnElement[bool]", Backfill.is_paused), False)),
diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py b/airflow-core/tests/unit/jobs/test_scheduler_job.py
index dea2a738af674..8f0e10f97d692 100644
--- a/airflow-core/tests/unit/jobs/test_scheduler_job.py
+++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py
@@ -5996,10 +5996,7 @@ def _running_counts():
         # now we finish all of dag2's running dagruns, and observe that more queued runs are started
         session.execute(
             update(DagRun)
-            .where(
-                DagRun.dag_id == "test_dag2",
-                DagRun.state == DagRunState.RUNNING
-            )
+            .where(DagRun.dag_id == "test_dag2", DagRun.state == DagRunState.RUNNING)
             .values(state=DagRunState.SUCCESS)
         )
         session.commit()

From dd6b74dc9db513082da0ac4893bb77b560eb66f2 Mon Sep 17 00:00:00 2001
From: Nataneljpwd
Date: Mon, 23 Mar 2026 18:26:34 +0200
Subject: [PATCH 3/7] removed print

---
 airflow-core/src/airflow/models/dagrun.py | 2 --
 1 file changed, 2 deletions(-)

diff --git a/airflow-core/src/airflow/models/dagrun.py b/airflow-core/src/airflow/models/dagrun.py
index d02ca71446f62..9a3142e728dfa 100644
--- a/airflow-core/src/airflow/models/dagrun.py
+++ b/airflow-core/src/airflow/models/dagrun.py
@@ -721,8 +721,6 @@ def get_queued_dag_runs_to_set_running(cls, session: Session) -> ScalarResult[Da
 
         query = query.where(DagRun.run_after <= func.now())
 
-        print(str(query))
-
         return session.scalars(with_row_locks(query, of=cls, session=session, skip_locked=True))
 
     @classmethod

From ecdb722987e5bcb4872e185629610f0edd4899a4 Mon Sep 17 00:00:00 2001
From: Nataneljpwd
Date: Mon, 23 Mar 2026 18:44:08 +0200
Subject: [PATCH 4/7] removed redundant tests

---
 .../src/airflow/jobs/scheduler_job_runner.py  |  9
+-- airflow-core/src/airflow/models/dagrun.py | 6 +- .../tests/unit/jobs/test_scheduler_job.py | 80 ------------------- 3 files changed, 6 insertions(+), 89 deletions(-) diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py b/airflow-core/src/airflow/jobs/scheduler_job_runner.py index c64bd166f1b3a..c8dda7f82baaa 100644 --- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py +++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py @@ -1905,14 +1905,6 @@ def _create_dag_runs(self, dag_models: Collection[DagModel], session: Session) - ) for dag_model in dag_models: - if dag_model.exceeds_max_non_backfill: - self.log.warning( - "Dag run cannot be created; max active runs exceeded.", - dag_id=dag_model.dag_id, - max_active_runs=dag_model.max_active_runs, - active_runs=active_runs_of_dags.get(dag_model.dag_id), - ) - continue if dag_model.timetable_partitioned is False: # non partition-aware Dags if dag_model.next_dagrun is None: @@ -3221,6 +3213,7 @@ def _try_to_load_executor( return executor + # TODO: remove as it moved to the get_queued_dag_runs_to_set_running method in dagrun.py def _set_exceeds_max_active_runs( self, *, diff --git a/airflow-core/src/airflow/models/dagrun.py b/airflow-core/src/airflow/models/dagrun.py index 9a3142e728dfa..889936ee625ff 100644 --- a/airflow-core/src/airflow/models/dagrun.py +++ b/airflow-core/src/airflow/models/dagrun.py @@ -700,7 +700,11 @@ def get_queued_dag_runs_to_set_running(cls, session: Session) -> ScalarResult[Da # it could return many more dag runs than runnable if there is even # capacity for 1. this could be improved. available_dagruns_rn.c.rn - <= coalesce(Backfill.max_active_runs, DagModel.max_active_runs, 999999) + <= coalesce( + Backfill.max_active_runs, + DagModel.max_active_runs, + airflow_conf.getint("core", "max_active_runs_per_dag"), + ) - coalesce(running_drs.c.num_running, 0), # don't set paused dag runs as running not_(coalesce(cast("ColumnElement[bool]", Backfill.is_paused), False)), diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py b/airflow-core/tests/unit/jobs/test_scheduler_job.py index 8f0e10f97d692..e7efc56d737ef 100644 --- a/airflow-core/tests/unit/jobs/test_scheduler_job.py +++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py @@ -3230,33 +3230,6 @@ def test_cleanup_methods_all_called_multiple_executors(self, mock_executors): for executor in self.job_runner.executors: executor.end.assert_called_once() - def test_queued_dagruns_stops_creating_when_max_active_is_reached(self, dag_maker, session): - """This tests that _create_dag_runs stops creating once max_active_runs is reached""" - with dag_maker(max_active_runs=10) as dag: - EmptyOperator(task_id="mytask") - - scheduler_job = Job() - self.job_runner = SchedulerJobRunner(job=scheduler_job, executors=[self.null_exec]) - - orm_dag = session.get(DagModel, dag.dag_id) - assert orm_dag is not None - for _num in range(20): - self.job_runner._create_dag_runs([orm_dag], session) - drs = session.scalars(select(DagRun)).all() - assert len(drs) == 10 - - for dr in drs: - dr.state = State.RUNNING - session.merge(dr) - session.commit() - assert session.scalar(select(func.count(DagRun.state)).where(DagRun.state == State.RUNNING)) == 10 - for _ in range(20): - self.job_runner._create_dag_runs([orm_dag], session) - assert session.scalar(select(func.count()).select_from(DagRun)) == 10 - assert session.scalar(select(func.count(DagRun.state)).where(DagRun.state == State.RUNNING)) == 10 - assert 
session.scalar(select(func.count(DagRun.state)).where(DagRun.state == State.QUEUED)) == 0 - assert orm_dag.next_dagrun_create_after is not None - def test_runs_are_created_after_max_active_runs_was_reached(self, dag_maker, session): """ Test that when creating runs once max_active_runs is reached the runs does not stick @@ -5525,59 +5498,6 @@ def test_do_schedule_max_active_runs_task_removed(self, session, dag_maker): ti.refresh_from_db(session=session) assert ti.state == State.QUEUED - def test_more_runs_are_not_created_when_max_active_runs_is_reached(self, dag_maker, caplog, session): - """ - This tests that when max_active_runs is reached, _create_dag_runs doesn't create - more dagruns - """ - # Explicitly set catchup=True as test specifically expects historical dates to be respected - with dag_maker(max_active_runs=1, catchup=True): - EmptyOperator(task_id="task") - scheduler_job = Job() - self.job_runner = SchedulerJobRunner(job=scheduler_job, executors=[MockExecutor(do_update=False)]) - - assert session.scalar(select(func.count()).select_from(DagRun)) == 0 - query, _ = DagModel.dags_needing_dagruns(session) - dag_models = query.all() - self.job_runner._create_dag_runs(dag_models, session) - dr = session.scalars(select(DagRun)).one() - dr.state == DagRunState.QUEUED - assert session.scalar(select(func.count()).select_from(DagRun)) == 1 - assert dag_maker.dag_model.next_dagrun_create_after == DEFAULT_DATE + timedelta(days=2) - assert dag_maker.dag_model.next_dagrun == DEFAULT_DATE + timedelta(days=1) - session.flush() - # dags_needing_dagruns query should not return any value - query, _ = DagModel.dags_needing_dagruns(session) - assert len(query.all()) == 0 - self.job_runner._create_dag_runs(dag_models, session) - assert session.scalar(select(func.count()).select_from(DagRun)) == 1 - assert dag_maker.dag_model.next_dagrun_create_after == DEFAULT_DATE + timedelta(days=2) - assert dag_maker.dag_model.next_dagrun == DEFAULT_DATE + timedelta(days=1) - # set dagrun to success - dr = session.scalars(select(DagRun)).one() - dr.state = DagRunState.SUCCESS - ti = dr.get_task_instance("task", session) - ti.state = TaskInstanceState.SUCCESS - session.merge(ti) - session.merge(dr) - session.flush() - # check that next_dagrun is set properly by Schedulerjob._update_dag_next_dagruns - self.job_runner._schedule_dag_run(dr, session) - session.flush() - query, _ = DagModel.dags_needing_dagruns(session) - assert len(query.all()) == 1 - # assert next_dagrun has been updated correctly - assert dag_maker.dag_model.next_dagrun == DEFAULT_DATE + timedelta(days=1) - # assert no dagruns is created yet - assert ( - session.scalar( - select(func.count()) - .select_from(DagRun) - .where(DagRun.state.in_([DagRunState.RUNNING, DagRunState.QUEUED])) - ) - == 0 - ) - def test_max_active_runs_creation_phasing(self, dag_maker, session): """ Test that when creating runs once max_active_runs is reached that the runs come in the right order From 8356843da35ece736644b4cc4b67fe354194c813 Mon Sep 17 00:00:00 2001 From: Nataneljpwd Date: Mon, 23 Mar 2026 21:20:38 +0200 Subject: [PATCH 5/7] fix mysql test --- airflow-core/src/airflow/models/dagrun.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/airflow-core/src/airflow/models/dagrun.py b/airflow-core/src/airflow/models/dagrun.py index 889936ee625ff..4a9e1b9796166 100644 --- a/airflow-core/src/airflow/models/dagrun.py +++ b/airflow-core/src/airflow/models/dagrun.py @@ -653,7 +653,9 @@ def get_queued_dag_runs_to_set_running(cls, session: Session) -> 
ScalarResult[Da select( DagRun.dag_id, DagRun.id, - func.row_number().over(partition_by=[DagRun.dag_id, DagRun.backfill_id]).label("rn"), + func.row_number() + .over(partition_by=[DagRun.dag_id, DagRun.backfill_id], uorder_by=DagRun.logical_date) + .label("rn"), ) .where(DagRun.state == DagRunState.QUEUED) .subquery() From 1c0c28f65bb76e6ea5ec77e77c71827462a8130a Mon Sep 17 00:00:00 2001 From: Nataneljpwd Date: Tue, 24 Mar 2026 21:35:20 +0200 Subject: [PATCH 6/7] address cr comments --- .../src/airflow/jobs/scheduler_job_runner.py | 9 +- airflow-core/src/airflow/models/dagrun.py | 3 +- .../tests/unit/jobs/test_scheduler_job.py | 123 ++++++++++++++++++ 3 files changed, 132 insertions(+), 3 deletions(-) diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py b/airflow-core/src/airflow/jobs/scheduler_job_runner.py index 4a464f10fa6b6..bda95bbf4b06a 100644 --- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py +++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py @@ -1913,6 +1913,14 @@ def _create_dag_runs(self, dag_models: Collection[DagModel], session: Session) - ) for dag_model in dag_models: + if dag_model.exceeds_max_non_backfill: + self.log.warning( + "Dag run cannot be created; max active runs exceeded.", + dag_id=dag_model.dag_id, + max_active_runs=dag_model.max_active_runs, + active_runs=active_runs_of_dags.get(dag_model.dag_id), + ) + continue if dag_model.timetable_partitioned is False: # non partition-aware Dags if dag_model.next_dagrun is None: @@ -3221,7 +3229,6 @@ def _try_to_load_executor( return executor - # TODO: remove as it moved to the get_queued_dag_runs_to_set_running method in dagrun.py def _set_exceeds_max_active_runs( self, *, diff --git a/airflow-core/src/airflow/models/dagrun.py b/airflow-core/src/airflow/models/dagrun.py index 0824f043b4fc5..2ae5a7af39abe 100644 --- a/airflow-core/src/airflow/models/dagrun.py +++ b/airflow-core/src/airflow/models/dagrun.py @@ -661,7 +661,7 @@ def get_queued_dag_runs_to_set_running(cls, session: Session) -> ScalarResult[Da DagRun.dag_id, DagRun.id, func.row_number() - .over(partition_by=[DagRun.dag_id, DagRun.backfill_id], uorder_by=DagRun.logical_date) + .over(partition_by=[DagRun.dag_id, DagRun.backfill_id], order_by=DagRun.logical_date) .label("rn"), ) .where(DagRun.state == DagRunState.QUEUED) @@ -712,7 +712,6 @@ def get_queued_dag_runs_to_set_running(cls, session: Session) -> ScalarResult[Da <= coalesce( Backfill.max_active_runs, DagModel.max_active_runs, - airflow_conf.getint("core", "max_active_runs_per_dag"), ) - coalesce(running_drs.c.num_running, 0), # don't set paused dag runs as running diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py b/airflow-core/tests/unit/jobs/test_scheduler_job.py index fe1933361c53b..1a7fca70abc68 100644 --- a/airflow-core/tests/unit/jobs/test_scheduler_job.py +++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py @@ -3230,6 +3230,33 @@ def test_cleanup_methods_all_called_multiple_executors(self, mock_executors): for executor in self.job_runner.executors: executor.end.assert_called_once() + def test_queued_dagruns_stops_creating_when_max_active_is_reached(self, dag_maker, session): + """This tests that _create_dag_runs stops creating once max_active_runs is reached""" + with dag_maker(max_active_runs=10) as dag: + EmptyOperator(task_id="mytask") + + scheduler_job = Job() + self.job_runner = SchedulerJobRunner(job=scheduler_job, executors=[self.null_exec]) + + orm_dag = session.get(DagModel, dag.dag_id) + assert orm_dag is not None + for _num in range(20): + 
self.job_runner._create_dag_runs([orm_dag], session) + drs = session.scalars(select(DagRun)).all() + assert len(drs) == 10 + + for dr in drs: + dr.state = State.RUNNING + session.merge(dr) + session.commit() + assert session.scalar(select(func.count(DagRun.state)).where(DagRun.state == State.RUNNING)) == 10 + for _ in range(20): + self.job_runner._create_dag_runs([orm_dag], session) + assert session.scalar(select(func.count()).select_from(DagRun)) == 10 + assert session.scalar(select(func.count(DagRun.state)).where(DagRun.state == State.RUNNING)) == 10 + assert session.scalar(select(func.count(DagRun.state)).where(DagRun.state == State.QUEUED)) == 0 + assert orm_dag.next_dagrun_create_after is not None + def test_runs_are_created_after_max_active_runs_was_reached(self, dag_maker, session): """ Test that when creating runs once max_active_runs is reached the runs does not stick @@ -3262,6 +3289,49 @@ def test_runs_are_created_after_max_active_runs_was_reached(self, dag_maker, ses dag_runs = DagRun.find(dag_id=dag.dag_id, session=session) assert len(dag_runs) == 2 + def test_runs_are_not_starved_by_max_active_runs_limit(self, dag_maker, session): + """ + Test that dagruns are not starved by max_active_runs + """ + scheduler_job = Job() + self.job_runner = SchedulerJobRunner(job=scheduler_job, executors=[self.null_exec]) + + dag_ids = ["dag1", "dag2", "dag3"] + + max_active_runs = 3 + + for dag_id in dag_ids: + with dag_maker( + dag_id=dag_id, + max_active_runs=max_active_runs, + session=session, + catchup=True, + schedule=timedelta(seconds=60), + start_date=DEFAULT_DATE, + ): + # Need to use something that doesn't immediately get marked as success by the scheduler + BashOperator(task_id="task", bash_command="true") + + dag_run = dag_maker.create_dagrun( + state=State.QUEUED, session=session, run_type=DagRunType.SCHEDULED + ) + + for _ in range(50): + # create a bunch of dagruns in queued state, to make sure they are filtered by max_active_runs + dag_run = dag_maker.create_dagrun_after( + dag_run, run_type=DagRunType.SCHEDULED, state=State.QUEUED + ) + # unique something + + self.job_runner._start_queued_dagruns(session) + session.flush() + + dagrun_count = session.scalar( + select(func.count()).select_from(DagRun).where(DagRun.state == DagRunState.RUNNING) + ) + + assert dagrun_count == max_active_runs * len(dag_ids) + def test_dagrun_timeout_verify_max_active_runs(self, dag_maker, session): """ Test if a dagrun will not be scheduled if max_dag_runs @@ -5498,6 +5568,59 @@ def test_do_schedule_max_active_runs_task_removed(self, session, dag_maker): ti.refresh_from_db(session=session) assert ti.state == State.QUEUED + def test_more_runs_are_not_created_when_max_active_runs_is_reached(self, dag_maker, caplog, session): + """ + This tests that when max_active_runs is reached, _create_dag_runs doesn't create + more dagruns + """ + # Explicitly set catchup=True as test specifically expects historical dates to be respected + with dag_maker(max_active_runs=1, catchup=True): + EmptyOperator(task_id="task") + scheduler_job = Job() + self.job_runner = SchedulerJobRunner(job=scheduler_job, executors=[MockExecutor(do_update=False)]) + + assert session.scalar(select(func.count()).select_from(DagRun)) == 0 + query, _ = DagModel.dags_needing_dagruns(session) + dag_models = query.all() + self.job_runner._create_dag_runs(dag_models, session) + dr = session.scalars(select(DagRun)).one() + dr.state == DagRunState.QUEUED + assert session.scalar(select(func.count()).select_from(DagRun)) == 1 + assert 
dag_maker.dag_model.next_dagrun_create_after == DEFAULT_DATE + timedelta(days=2)
+        assert dag_maker.dag_model.next_dagrun == DEFAULT_DATE + timedelta(days=1)
+        session.flush()
+        # dags_needing_dagruns query should not return any value
+        query, _ = DagModel.dags_needing_dagruns(session)
+        assert len(query.all()) == 0
+        self.job_runner._create_dag_runs(dag_models, session)
+        assert session.scalar(select(func.count()).select_from(DagRun)) == 1
+        assert dag_maker.dag_model.next_dagrun_create_after == DEFAULT_DATE + timedelta(days=2)
+        assert dag_maker.dag_model.next_dagrun == DEFAULT_DATE + timedelta(days=1)
+        # set dagrun to success
+        dr = session.scalars(select(DagRun)).one()
+        dr.state = DagRunState.SUCCESS
+        ti = dr.get_task_instance("task", session)
+        ti.state = TaskInstanceState.SUCCESS
+        session.merge(ti)
+        session.merge(dr)
+        session.flush()
+        # check that next_dagrun is set properly by Schedulerjob._update_dag_next_dagruns
+        self.job_runner._schedule_dag_run(dr, session)
+        session.flush()
+        query, _ = DagModel.dags_needing_dagruns(session)
+        assert len(query.all()) == 1
+        # assert next_dagrun has been updated correctly
+        assert dag_maker.dag_model.next_dagrun == DEFAULT_DATE + timedelta(days=1)
+        # assert no dagruns is created yet
+        assert (
+            session.scalar(
+                select(func.count())
+                .select_from(DagRun)
+                .where(DagRun.state.in_([DagRunState.RUNNING, DagRunState.QUEUED]))
+            )
+            == 0
+        )
+
     def test_max_active_runs_creation_phasing(self, dag_maker, session):
         """
         Test that when creating runs once max_active_runs is reached that the runs come in the right order

From 9c28d0dc1b3b30e75dd979cee669fadfef5421db Mon Sep 17 00:00:00 2001
From: Nataneljpwd
Date: Tue, 24 Mar 2026 21:52:52 +0200
Subject: [PATCH 7/7] added explicit test that having more running dagruns
 than the limit does not cause more to be set to running

---
 .../tests/unit/jobs/test_scheduler_job.py | 51 +++++++++++++++++--
 1 file changed, 48 insertions(+), 3 deletions(-)

diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py b/airflow-core/tests/unit/jobs/test_scheduler_job.py
index 1a7fca70abc68..f299665d18cdf 100644
--- a/airflow-core/tests/unit/jobs/test_scheduler_job.py
+++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py
@@ -3321,16 +3321,61 @@ def test_runs_are_not_starved_by_max_active_runs_limit(self, dag_maker, session)
             dag_run = dag_maker.create_dagrun_after(
                 dag_run, run_type=DagRunType.SCHEDULED, state=State.QUEUED
             )
-            # unique something
 
         self.job_runner._start_queued_dagruns(session)
         session.flush()
 
-        dagrun_count = session.scalar(
+        running_dagrun_count = session.scalar(
             select(func.count()).select_from(DagRun).where(DagRun.state == DagRunState.RUNNING)
         )
 
-        assert dagrun_count == max_active_runs * len(dag_ids)
+        assert running_dagrun_count == max_active_runs * len(dag_ids)
+
+    def test_no_more_dagruns_are_set_to_running_when_max_active_runs_exceeded(self, dag_maker, session):
+        """
+        Test that queued dagruns are not set to running when more dagruns than max_active_runs are already running
+        """
+        scheduler_job = Job()
+        self.job_runner = SchedulerJobRunner(job=scheduler_job, executors=[self.null_exec])
+
+        max_active_runs = 1
+        with dag_maker(
+            dag_id="test_dag",
+            max_active_runs=max_active_runs,
+            session=session,
+            catchup=True,
+            schedule=timedelta(seconds=60),
+            start_date=DEFAULT_DATE,
+        ):
+            # Need to use something that doesn't immediately get marked as success by the scheduler
+            BashOperator(task_id="task", bash_command="true")
+
+        dag_run = dag_maker.create_dagrun(
+            state=State.RUNNING, session=session,
+            run_type=DagRunType.SCHEDULED
+        )
+
+        for _ in range(5):
+            # create more dagruns in running state, putting the dag over its max_active_runs limit
+            dag_run = dag_maker.create_dagrun_after(
+                dag_run, run_type=DagRunType.SCHEDULED, state=State.RUNNING
+            )
+
+        running_dagruns_pre = session.scalar(
+            select(func.count()).select_from(DagRun).where(DagRun.state == DagRunState.RUNNING)
+        )
+
+        for _ in range(5):
+            # create a bunch of dagruns in queued state, to make sure they are filtered out by max_active_runs
+            dag_run = dag_maker.create_dagrun_after(
+                dag_run, run_type=DagRunType.SCHEDULED, state=State.QUEUED
+            )
+
+        self.job_runner._start_queued_dagruns(session)
+        session.flush()
+
+        running_dagruns_post = session.scalar(
+            select(func.count()).select_from(DagRun).where(DagRun.state == DagRunState.RUNNING)
+        )
+
+        assert running_dagruns_pre == running_dagruns_post
 
     def test_dagrun_timeout_verify_max_active_runs(self, dag_maker, session):
         """
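
For readers who want the mechanism in isolation: the old per-dag capacity predicate (num_running < max_active_runs) admits every queued run of a dag as soon as there is room for even one, so a single dag with many queued runs can eat the scheduler's whole per-iteration budget and starve other dags. The series replaces it with a ROW_NUMBER() ranking, so each dag can claim at most its remaining slots per iteration. The sketch below is illustrative only and is not Airflow code: the table, columns, and the denormalized max_active_runs field are invented for the example, and the real query additionally partitions by backfill_id, orders by logical_date, and takes row locks with skip_locked.

# starvation_sketch.py -- minimal, self-contained illustration of the capacity check
# (toy schema; not the Airflow models)
from sqlalchemy import Column, Integer, String, create_engine, func, select
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class Run(Base):
    __tablename__ = "run"
    id = Column(Integer, primary_key=True)
    dag_id = Column(String(50), nullable=False)
    state = Column(String(20), nullable=False)  # "queued" or "running"
    max_active_runs = Column(Integer, nullable=False)  # denormalized for brevity


engine = create_engine("sqlite://")  # needs SQLite >= 3.25 for window functions
Base.metadata.create_all(engine)

with Session(engine) as session:
    # dag "a": limit 2, one run already running, three queued -> only 1 more may start.
    # dag "b": limit 2, nothing running, three queued -> 2 may start.
    session.add_all(
        [Run(dag_id="a", state="running", max_active_runs=2)]
        + [Run(dag_id="a", state="queued", max_active_runs=2) for _ in range(3)]
        + [Run(dag_id="b", state="queued", max_active_runs=2) for _ in range(3)]
    )
    session.commit()

    # Rank queued runs per dag, oldest first (id stands in for logical_date here).
    ranked = (
        select(
            Run.id.label("id"),
            func.row_number().over(partition_by=Run.dag_id, order_by=Run.id).label("rn"),
        )
        .where(Run.state == "queued")
        .subquery()
    )
    # Count currently running runs per dag.
    running = (
        select(Run.dag_id.label("dag_id"), func.count().label("num_running"))
        .where(Run.state == "running")
        .group_by(Run.dag_id)
        .subquery()
    )
    # A queued run may start only if its rank fits within the dag's remaining
    # capacity. Because rn is bounded per dag, no single dag can monopolize the
    # per-iteration budget -- the starvation fix in miniature.
    startable = session.scalars(
        select(Run)
        .join(ranked, ranked.c.id == Run.id)
        .outerjoin(running, running.c.dag_id == Run.dag_id)
        .where(ranked.c.rn <= Run.max_active_runs - func.coalesce(running.c.num_running, 0))
    ).all()

    assert sorted(r.dag_id for r in startable) == ["a", "b", "b"]
    print("startable:", [(r.dag_id, r.id) for r in startable])

Running the sketch selects one run for dag "a" and two for dag "b": each dag is capped at its own remaining capacity in a single pass, which is the behavior the tests in patches 6 and 7 assert at the scheduler level.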