From 47bb09fa6488acc878a007c9d5c7a62460d67d4b Mon Sep 17 00:00:00 2001 From: Shivam Rastogi <6463385+shivaam@users.noreply.github.com> Date: Fri, 27 Feb 2026 01:48:56 -0800 Subject: [PATCH 1/2] Fix backfill marked complete before DagRuns are created (#61375) The scheduler's _mark_backfills_complete() could mark a backfill as completed during the window between the Backfill row commit and DagRun creation. Add an EXISTS guard on BackfillDagRun so backfills still being initialized are skipped. --- .../src/airflow/jobs/scheduler_job_runner.py | 5 +- .../tests/unit/jobs/test_scheduler_job.py | 53 ++++++++++++++++++- 2 files changed, 56 insertions(+), 2 deletions(-) diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py b/airflow-core/src/airflow/jobs/scheduler_job_runner.py index c64bd166f1b3a..0e2f45d98b6f0 100644 --- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py +++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py @@ -71,7 +71,7 @@ TaskInletAssetReference, TaskOutletAssetReference, ) -from airflow.models.backfill import Backfill +from airflow.models.backfill import Backfill, BackfillDagRun from airflow.models.callback import Callback, CallbackType, ExecutorCallback from airflow.models.dag import DagModel from airflow.models.dag_version import DagVersion @@ -1855,6 +1855,9 @@ def _mark_backfills_complete(self, session: Session = NEW_SESSION) -> None: # todo: AIP-78 simplify this function to an update statement query = select(Backfill).where( Backfill.completed_at.is_(None), + # Guard: backfill must have at least one BackfillDagRun association, + # otherwise it is still being set up (see #61375). 
+ exists(select(BackfillDagRun.id).where(BackfillDagRun.backfill_id == Backfill.id)), ~exists( select(DagRun.id).where( and_(DagRun.backfill_id == Backfill.id, DagRun.state.in_(unfinished_states)) diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py b/airflow-core/tests/unit/jobs/test_scheduler_job.py index 997ebd24bcf0d..3f42ab56d542e 100644 --- a/airflow-core/tests/unit/jobs/test_scheduler_job.py +++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py @@ -65,7 +65,7 @@ AssetPartitionDagRun, PartitionedAssetKeyLog, ) -from airflow.models.backfill import Backfill, _create_backfill +from airflow.models.backfill import Backfill, BackfillDagRun, ReprocessBehavior, _create_backfill from airflow.models.callback import ExecutorCallback from airflow.models.dag import DagModel, get_last_dagrun, infer_automated_data_interval from airflow.models.dag_version import DagVersion @@ -8877,6 +8877,57 @@ def test_mark_backfills_completed(dag_maker, session): assert b.completed_at.timestamp() > 0 +def test_mark_backfills_complete_skips_initializing_backfill(dag_maker, session): + clear_db_backfills() + dag_id = "test_backfill_race_lifecycle" + with dag_maker(serialized=True, dag_id=dag_id, schedule="@daily"): + BashOperator(task_id="hi", bash_command="echo hi") + b = Backfill( + dag_id=dag_id, + from_date=pendulum.parse("2021-01-01"), + to_date=pendulum.parse("2021-01-03"), + max_active_runs=10, + dag_run_conf={}, + reprocess_behavior=ReprocessBehavior.NONE, + ) + session.add(b) + session.commit() + backfill_id = b.id + session.expunge_all() + runner = SchedulerJobRunner( + job=Job(job_type=SchedulerJobRunner.job_type), executors=[MockExecutor(do_update=False)] + ) + runner._mark_backfills_complete() + b = session.get(Backfill, backfill_id) + assert b.completed_at is None + session.expunge_all() + dr = DagRun( + dag_id=dag_id, + run_id="backfill__2021-01-01T00:00:00+00:00", + run_type=DagRunType.BACKFILL_JOB, + logical_date=pendulum.parse("2021-01-01"), + 
data_interval=(pendulum.parse("2021-01-01"), pendulum.parse("2021-01-02")), + run_after=pendulum.parse("2021-01-02"), + state=DagRunState.SUCCESS, + backfill_id=backfill_id, + ) + session.add(dr) + session.flush() + session.add( + BackfillDagRun( + backfill_id=backfill_id, + dag_run_id=dr.id, + logical_date=pendulum.parse("2021-01-01"), + sort_ordinal=1, + ) + ) + session.commit() + session.expunge_all() + runner._mark_backfills_complete() + b = session.get(Backfill, backfill_id) + assert b.completed_at is not None + + class Key1Mapper(CorePartitionMapper): """Partition Mapper that returns only key-1 as downstream key""" From 2eb03141b1cfcd31df799549d701e70186ecc90a Mon Sep 17 00:00:00 2001 From: Shivam Rastogi <6463385+shivaam@users.noreply.github.com> Date: Sun, 22 Mar 2026 21:17:59 -0700 Subject: [PATCH 2/2] Add age-based cleanup for orphaned backfills and edge case tests Add a 2-minute age-based fallback to the backfill completion guard so orphaned backfills (those that failed during initialization and never got any BackfillDagRun associations) are auto-completed instead of permanently blocking new backfills for the same DAG. Also adds tests for: orphan cleanup after cutoff, old backfill with running DagRuns stays open, young backfill with finished runs completes immediately, and multiple backfills processed independently. --- airflow-core/newsfragments/62561.bugfix.rst | 1 + .../src/airflow/jobs/scheduler_job_runner.py | 10 +- .../tests/unit/jobs/test_scheduler_job.py | 206 ++++++++++++++++++ 3 files changed, 215 insertions(+), 2 deletions(-) create mode 100644 airflow-core/newsfragments/62561.bugfix.rst diff --git a/airflow-core/newsfragments/62561.bugfix.rst b/airflow-core/newsfragments/62561.bugfix.rst new file mode 100644 index 0000000000000..9b40b89e21177 --- /dev/null +++ b/airflow-core/newsfragments/62561.bugfix.rst @@ -0,0 +1 @@ +Fix backfill marked complete before DagRuns are created; add age-based cleanup for orphaned backfills. 
diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py b/airflow-core/src/airflow/jobs/scheduler_job_runner.py index 0e2f45d98b6f0..0f3202f02d089 100644 --- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py +++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py @@ -1853,11 +1853,17 @@ def _mark_backfills_complete(self, session: Session = NEW_SESSION) -> None: unfinished_states = (DagRunState.RUNNING, DagRunState.QUEUED) now = timezone.utcnow() # todo: AIP-78 simplify this function to an update statement + initializing_cutoff = now - timedelta(minutes=2) query = select(Backfill).where( Backfill.completed_at.is_(None), - # Guard: backfill must have at least one BackfillDagRun association, + # Guard: backfill must have at least one BackfillDagRun association, # otherwise it is still being set up (see #61375). - exists(select(BackfillDagRun.id).where(BackfillDagRun.backfill_id == Backfill.id)), + # Allow cleanup of orphaned backfills older than 2 minutes + # that failed during initialization and never got any associations. 
+ or_( + exists(select(BackfillDagRun.id).where(BackfillDagRun.backfill_id == Backfill.id)), + Backfill.created_at < initializing_cutoff, + ), ~exists( select(DagRun.id).where( and_(DagRun.backfill_id == Backfill.id, DagRun.state.in_(unfinished_states)) diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py b/airflow-core/tests/unit/jobs/test_scheduler_job.py index 3f42ab56d542e..a4d8dd1f964a3 100644 --- a/airflow-core/tests/unit/jobs/test_scheduler_job.py +++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py @@ -8928,6 +8928,212 @@ def test_mark_backfills_complete_skips_initializing_backfill(dag_maker, session) assert b.completed_at is not None +def test_mark_backfills_complete_cleans_orphan_after_cutoff(dag_maker, session): + """Backfill with no BackfillDagRun rows older than 2 minutes should be auto-completed.""" + clear_db_backfills() + dag_id = "test_backfill_orphan_cleanup" + with dag_maker(serialized=True, dag_id=dag_id, schedule="@daily"): + BashOperator(task_id="hi", bash_command="echo hi") + b = Backfill( + dag_id=dag_id, + from_date=pendulum.parse("2021-01-01"), + to_date=pendulum.parse("2021-01-03"), + max_active_runs=10, + dag_run_conf={}, + reprocess_behavior=ReprocessBehavior.NONE, + ) + session.add(b) + session.commit() + backfill_id = b.id + session.expunge_all() + # Travel 3 minutes into the future so the backfill is past the 2-minute cutoff + with time_machine.travel(pendulum.now("UTC").add(minutes=3), tick=False): + runner = SchedulerJobRunner( + job=Job(job_type=SchedulerJobRunner.job_type), executors=[MockExecutor(do_update=False)] + ) + runner._mark_backfills_complete() + b = session.get(Backfill, backfill_id) + assert b.completed_at is not None + + +def test_mark_backfills_complete_keeps_old_backfill_with_running_dagruns(dag_maker, session): + """Old backfill (>2 min) with running DagRuns must NOT be marked complete.""" + clear_db_backfills() + dag_id = "test_backfill_old_with_runs" + with dag_maker(serialized=True, 
dag_id=dag_id, schedule="@daily"): + BashOperator(task_id="hi", bash_command="echo hi") + b = Backfill( + dag_id=dag_id, + from_date=pendulum.parse("2021-01-01"), + to_date=pendulum.parse("2021-01-03"), + max_active_runs=10, + dag_run_conf={}, + reprocess_behavior=ReprocessBehavior.NONE, + ) + session.add(b) + session.commit() + backfill_id = b.id + dr = DagRun( + dag_id=dag_id, + run_id="backfill__2021-01-01T00:00:00+00:00", + run_type=DagRunType.BACKFILL_JOB, + logical_date=pendulum.parse("2021-01-01"), + data_interval=(pendulum.parse("2021-01-01"), pendulum.parse("2021-01-02")), + run_after=pendulum.parse("2021-01-02"), + state=DagRunState.RUNNING, + backfill_id=backfill_id, + ) + session.add(dr) + session.flush() + session.add( + BackfillDagRun( + backfill_id=backfill_id, + dag_run_id=dr.id, + logical_date=pendulum.parse("2021-01-01"), + sort_ordinal=1, + ) + ) + session.commit() + session.expunge_all() + # Travel 3 minutes into the future; backfill is old but has a RUNNING DagRun + with time_machine.travel(pendulum.now("UTC").add(minutes=3), tick=False): + runner = SchedulerJobRunner( + job=Job(job_type=SchedulerJobRunner.job_type), executors=[MockExecutor(do_update=False)] + ) + runner._mark_backfills_complete() + b = session.get(Backfill, backfill_id) + assert b.completed_at is None + + +def test_mark_backfills_complete_young_backfill_with_finished_runs(dag_maker, session): + """Young backfill (<2 min) with all SUCCESS DagRuns completes immediately.""" + clear_db_backfills() + dag_id = "test_backfill_young_finished" + with dag_maker(serialized=True, dag_id=dag_id, schedule="@daily"): + BashOperator(task_id="hi", bash_command="echo hi") + b = Backfill( + dag_id=dag_id, + from_date=pendulum.parse("2021-01-01"), + to_date=pendulum.parse("2021-01-03"), + max_active_runs=10, + dag_run_conf={}, + reprocess_behavior=ReprocessBehavior.NONE, + ) + session.add(b) + session.commit() + backfill_id = b.id + dr = DagRun( + dag_id=dag_id, + 
run_id="backfill__2021-01-01T00:00:00+00:00", + run_type=DagRunType.BACKFILL_JOB, + logical_date=pendulum.parse("2021-01-01"), + data_interval=(pendulum.parse("2021-01-01"), pendulum.parse("2021-01-02")), + run_after=pendulum.parse("2021-01-02"), + state=DagRunState.SUCCESS, + backfill_id=backfill_id, + ) + session.add(dr) + session.flush() + session.add( + BackfillDagRun( + backfill_id=backfill_id, + dag_run_id=dr.id, + logical_date=pendulum.parse("2021-01-01"), + sort_ordinal=1, + ) + ) + session.commit() + session.expunge_all() + # No time travel — backfill was just created, should still complete + runner = SchedulerJobRunner( + job=Job(job_type=SchedulerJobRunner.job_type), executors=[MockExecutor(do_update=False)] + ) + runner._mark_backfills_complete() + b = session.get(Backfill, backfill_id) + assert b.completed_at is not None + + +def test_mark_backfills_complete_multiple_independent(dag_maker, session): + """Two backfills: one finished, one running — only the finished one completes.""" + clear_db_backfills() + with dag_maker(serialized=True, dag_id="dag_finished", schedule="@daily"): + BashOperator(task_id="hi", bash_command="echo hi") + with dag_maker(serialized=True, dag_id="dag_running", schedule="@daily"): + BashOperator(task_id="hi", bash_command="echo hi") + b_finished = Backfill( + dag_id="dag_finished", + from_date=pendulum.parse("2021-01-01"), + to_date=pendulum.parse("2021-01-03"), + max_active_runs=10, + dag_run_conf={}, + reprocess_behavior=ReprocessBehavior.NONE, + ) + b_running = Backfill( + dag_id="dag_running", + from_date=pendulum.parse("2021-01-01"), + to_date=pendulum.parse("2021-01-03"), + max_active_runs=10, + dag_run_conf={}, + reprocess_behavior=ReprocessBehavior.NONE, + ) + session.add_all([b_finished, b_running]) + session.commit() + finished_id = b_finished.id + running_id = b_running.id + # Finished backfill: SUCCESS DagRun + dr1 = DagRun( + dag_id="dag_finished", + run_id="backfill__2021-01-01T00:00:00+00:00", + 
run_type=DagRunType.BACKFILL_JOB, + logical_date=pendulum.parse("2021-01-01"), + data_interval=(pendulum.parse("2021-01-01"), pendulum.parse("2021-01-02")), + run_after=pendulum.parse("2021-01-02"), + state=DagRunState.SUCCESS, + backfill_id=finished_id, + ) + session.add(dr1) + session.flush() + session.add( + BackfillDagRun( + backfill_id=finished_id, + dag_run_id=dr1.id, + logical_date=pendulum.parse("2021-01-01"), + sort_ordinal=1, + ) + ) + # Running backfill: RUNNING DagRun + dr2 = DagRun( + dag_id="dag_running", + run_id="backfill__2021-01-01T00:00:00+00:00", + run_type=DagRunType.BACKFILL_JOB, + logical_date=pendulum.parse("2021-01-01"), + data_interval=(pendulum.parse("2021-01-01"), pendulum.parse("2021-01-02")), + run_after=pendulum.parse("2021-01-02"), + state=DagRunState.RUNNING, + backfill_id=running_id, + ) + session.add(dr2) + session.flush() + session.add( + BackfillDagRun( + backfill_id=running_id, + dag_run_id=dr2.id, + logical_date=pendulum.parse("2021-01-01"), + sort_ordinal=1, + ) + ) + session.commit() + session.expunge_all() + runner = SchedulerJobRunner( + job=Job(job_type=SchedulerJobRunner.job_type), executors=[MockExecutor(do_update=False)] + ) + runner._mark_backfills_complete() + b_finished = session.get(Backfill, finished_id) + b_running = session.get(Backfill, running_id) + assert b_finished.completed_at is not None + assert b_running.completed_at is None + + class Key1Mapper(CorePartitionMapper): """Partition Mapper that returns only key-1 as downstream key"""