diff --git a/airflow-core/newsfragments/62561.bugfix.rst b/airflow-core/newsfragments/62561.bugfix.rst new file mode 100644 index 0000000000000..9b40b89e21177 --- /dev/null +++ b/airflow-core/newsfragments/62561.bugfix.rst @@ -0,0 +1 @@ +Fix backfill marked complete before DagRuns are created; add age-based cleanup for orphaned backfills. diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py b/airflow-core/src/airflow/jobs/scheduler_job_runner.py index bda95bbf4b06a..67006a7d8ee27 100644 --- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py +++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py @@ -71,7 +71,7 @@ TaskInletAssetReference, TaskOutletAssetReference, ) -from airflow.models.backfill import Backfill +from airflow.models.backfill import Backfill, BackfillDagRun from airflow.models.callback import Callback, CallbackType, ExecutorCallback from airflow.models.dag import DagModel from airflow.models.dag_version import DagVersion @@ -1861,8 +1861,17 @@ def _mark_backfills_complete(self, session: Session = NEW_SESSION) -> None: unfinished_states = (DagRunState.RUNNING, DagRunState.QUEUED) now = timezone.utcnow() # todo: AIP-78 simplify this function to an update statement + initializing_cutoff = now - timedelta(minutes=2) query = select(Backfill).where( Backfill.completed_at.is_(None), + # Guard: backfill must have at least one association, + # otherwise it is still being set up (see #61375). + # Allow cleanup of orphaned backfills older than 2 minutes + # that failed during initialization and never got any associations. 
+ or_( + exists(select(BackfillDagRun.id).where(BackfillDagRun.backfill_id == Backfill.id)), + Backfill.created_at < initializing_cutoff, + ), ~exists( select(DagRun.id).where( and_(DagRun.backfill_id == Backfill.id, DagRun.state.in_(unfinished_states)) diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py b/airflow-core/tests/unit/jobs/test_scheduler_job.py index f9ae60a9c3583..76eeba8f29d52 100644 --- a/airflow-core/tests/unit/jobs/test_scheduler_job.py +++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py @@ -65,7 +65,7 @@ AssetPartitionDagRun, PartitionedAssetKeyLog, ) -from airflow.models.backfill import Backfill, _create_backfill +from airflow.models.backfill import Backfill, BackfillDagRun, ReprocessBehavior, _create_backfill from airflow.models.callback import ExecutorCallback from airflow.models.dag import DagModel, get_last_dagrun, infer_automated_data_interval from airflow.models.dag_version import DagVersion @@ -8877,6 +8877,263 @@ def test_mark_backfills_completed(dag_maker, session): assert b.completed_at.timestamp() > 0 +def test_mark_backfills_complete_skips_initializing_backfill(dag_maker, session): + clear_db_backfills() + dag_id = "test_backfill_race_lifecycle" + with dag_maker(serialized=True, dag_id=dag_id, schedule="@daily"): + BashOperator(task_id="hi", bash_command="echo hi") + b = Backfill( + dag_id=dag_id, + from_date=pendulum.parse("2021-01-01"), + to_date=pendulum.parse("2021-01-03"), + max_active_runs=10, + dag_run_conf={}, + reprocess_behavior=ReprocessBehavior.NONE, + ) + session.add(b) + session.commit() + backfill_id = b.id + session.expunge_all() + runner = SchedulerJobRunner( + job=Job(job_type=SchedulerJobRunner.job_type), executors=[MockExecutor(do_update=False)] + ) + runner._mark_backfills_complete() + b = session.get(Backfill, backfill_id) + assert b.completed_at is None + session.expunge_all() + dr = DagRun( + dag_id=dag_id, + run_id="backfill__2021-01-01T00:00:00+00:00", + run_type=DagRunType.BACKFILL_JOB, + 
+ logical_date=pendulum.parse("2021-01-01"), + data_interval=(pendulum.parse("2021-01-01"), pendulum.parse("2021-01-02")), + run_after=pendulum.parse("2021-01-02"), + state=DagRunState.SUCCESS, + backfill_id=backfill_id, + ) + session.add(dr) + session.flush() + session.add( + BackfillDagRun( + backfill_id=backfill_id, + dag_run_id=dr.id, + logical_date=pendulum.parse("2021-01-01"), + sort_ordinal=1, + ) + ) + session.commit() + session.expunge_all() + runner._mark_backfills_complete() + b = session.get(Backfill, backfill_id) + assert b.completed_at is not None + + +def test_mark_backfills_complete_cleans_orphan_after_cutoff(dag_maker, session): + """Backfill older than 2 minutes with no BackfillDagRun rows should be auto-completed.""" + clear_db_backfills() + dag_id = "test_backfill_orphan_cleanup" + with dag_maker(serialized=True, dag_id=dag_id, schedule="@daily"): + BashOperator(task_id="hi", bash_command="echo hi") + b = Backfill( + dag_id=dag_id, + from_date=pendulum.parse("2021-01-01"), + to_date=pendulum.parse("2021-01-03"), + max_active_runs=10, + dag_run_conf={}, + reprocess_behavior=ReprocessBehavior.NONE, + ) + session.add(b) + session.commit() + backfill_id = b.id + session.expunge_all() + # Travel 3 minutes into the future so the backfill is past the 2-minute cutoff + with time_machine.travel(pendulum.now("UTC").add(minutes=3), tick=False): + runner = SchedulerJobRunner( + job=Job(job_type=SchedulerJobRunner.job_type), executors=[MockExecutor(do_update=False)] + ) + runner._mark_backfills_complete() + b = session.get(Backfill, backfill_id) + assert b.completed_at is not None + + +def test_mark_backfills_complete_keeps_old_backfill_with_running_dagruns(dag_maker, session): + """Old backfill (>2 min) with running DagRuns must NOT be marked complete.""" + clear_db_backfills() + dag_id = "test_backfill_old_with_runs" + with dag_maker(serialized=True, dag_id=dag_id, schedule="@daily"): + BashOperator(task_id="hi", bash_command="echo hi") + b = Backfill( 
dag_id=dag_id, + from_date=pendulum.parse("2021-01-01"), + to_date=pendulum.parse("2021-01-03"), + max_active_runs=10, + dag_run_conf={}, + reprocess_behavior=ReprocessBehavior.NONE, + ) + session.add(b) + session.commit() + backfill_id = b.id + dr = DagRun( + dag_id=dag_id, + run_id="backfill__2021-01-01T00:00:00+00:00", + run_type=DagRunType.BACKFILL_JOB, + logical_date=pendulum.parse("2021-01-01"), + data_interval=(pendulum.parse("2021-01-01"), pendulum.parse("2021-01-02")), + run_after=pendulum.parse("2021-01-02"), + state=DagRunState.RUNNING, + backfill_id=backfill_id, + ) + session.add(dr) + session.flush() + session.add( + BackfillDagRun( + backfill_id=backfill_id, + dag_run_id=dr.id, + logical_date=pendulum.parse("2021-01-01"), + sort_ordinal=1, + ) + ) + session.commit() + session.expunge_all() + # Travel 3 minutes into the future; backfill is old but has a RUNNING DagRun + with time_machine.travel(pendulum.now("UTC").add(minutes=3), tick=False): + runner = SchedulerJobRunner( + job=Job(job_type=SchedulerJobRunner.job_type), executors=[MockExecutor(do_update=False)] + ) + runner._mark_backfills_complete() + b = session.get(Backfill, backfill_id) + assert b.completed_at is None + + +def test_mark_backfills_complete_young_backfill_with_finished_runs(dag_maker, session): + """Young backfill (<2 min) with all SUCCESS DagRuns completes immediately.""" + clear_db_backfills() + dag_id = "test_backfill_young_finished" + with dag_maker(serialized=True, dag_id=dag_id, schedule="@daily"): + BashOperator(task_id="hi", bash_command="echo hi") + b = Backfill( + dag_id=dag_id, + from_date=pendulum.parse("2021-01-01"), + to_date=pendulum.parse("2021-01-03"), + max_active_runs=10, + dag_run_conf={}, + reprocess_behavior=ReprocessBehavior.NONE, + ) + session.add(b) + session.commit() + backfill_id = b.id + dr = DagRun( + dag_id=dag_id, + run_id="backfill__2021-01-01T00:00:00+00:00", + run_type=DagRunType.BACKFILL_JOB, + logical_date=pendulum.parse("2021-01-01"), + 
data_interval=(pendulum.parse("2021-01-01"), pendulum.parse("2021-01-02")), + run_after=pendulum.parse("2021-01-02"), + state=DagRunState.SUCCESS, + backfill_id=backfill_id, + ) + session.add(dr) + session.flush() + session.add( + BackfillDagRun( + backfill_id=backfill_id, + dag_run_id=dr.id, + logical_date=pendulum.parse("2021-01-01"), + sort_ordinal=1, + ) + ) + session.commit() + session.expunge_all() + # No time travel — backfill was just created, should still complete + runner = SchedulerJobRunner( + job=Job(job_type=SchedulerJobRunner.job_type), executors=[MockExecutor(do_update=False)] + ) + runner._mark_backfills_complete() + b = session.get(Backfill, backfill_id) + assert b.completed_at is not None + + +def test_mark_backfills_complete_multiple_independent(dag_maker, session): + """Two backfills: one finished, one running — only the finished one completes.""" + clear_db_backfills() + with dag_maker(serialized=True, dag_id="dag_finished", schedule="@daily"): + BashOperator(task_id="hi", bash_command="echo hi") + with dag_maker(serialized=True, dag_id="dag_running", schedule="@daily"): + BashOperator(task_id="hi", bash_command="echo hi") + b_finished = Backfill( + dag_id="dag_finished", + from_date=pendulum.parse("2021-01-01"), + to_date=pendulum.parse("2021-01-03"), + max_active_runs=10, + dag_run_conf={}, + reprocess_behavior=ReprocessBehavior.NONE, + ) + b_running = Backfill( + dag_id="dag_running", + from_date=pendulum.parse("2021-01-01"), + to_date=pendulum.parse("2021-01-03"), + max_active_runs=10, + dag_run_conf={}, + reprocess_behavior=ReprocessBehavior.NONE, + ) + session.add_all([b_finished, b_running]) + session.commit() + finished_id = b_finished.id + running_id = b_running.id + # Finished backfill: SUCCESS DagRun + dr1 = DagRun( + dag_id="dag_finished", + run_id="backfill__2021-01-01T00:00:00+00:00", + run_type=DagRunType.BACKFILL_JOB, + logical_date=pendulum.parse("2021-01-01"), + data_interval=(pendulum.parse("2021-01-01"), 
pendulum.parse("2021-01-02")), + run_after=pendulum.parse("2021-01-02"), + state=DagRunState.SUCCESS, + backfill_id=finished_id, + ) + session.add(dr1) + session.flush() + session.add( + BackfillDagRun( + backfill_id=finished_id, + dag_run_id=dr1.id, + logical_date=pendulum.parse("2021-01-01"), + sort_ordinal=1, + ) + ) + # Running backfill: RUNNING DagRun + dr2 = DagRun( + dag_id="dag_running", + run_id="backfill__2021-01-01T00:00:00+00:00", + run_type=DagRunType.BACKFILL_JOB, + logical_date=pendulum.parse("2021-01-01"), + data_interval=(pendulum.parse("2021-01-01"), pendulum.parse("2021-01-02")), + run_after=pendulum.parse("2021-01-02"), + state=DagRunState.RUNNING, + backfill_id=running_id, + ) + session.add(dr2) + session.flush() + session.add( + BackfillDagRun( + backfill_id=running_id, + dag_run_id=dr2.id, + logical_date=pendulum.parse("2021-01-01"), + sort_ordinal=1, + ) + ) + session.commit() + session.expunge_all() + runner = SchedulerJobRunner( + job=Job(job_type=SchedulerJobRunner.job_type), executors=[MockExecutor(do_update=False)] + ) + runner._mark_backfills_complete() + b_finished = session.get(Backfill, finished_id) + b_running = session.get(Backfill, running_id) + assert b_finished.completed_at is not None + assert b_running.completed_at is None + + class Key1Mapper(CorePartitionMapper): """Partition Mapper that returns only key-1 as downstream key"""