Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions airflow-core/newsfragments/62561.bugfix.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix backfill marked complete before DagRuns are created; add age-based cleanup for orphaned backfills.
11 changes: 10 additions & 1 deletion airflow-core/src/airflow/jobs/scheduler_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@
TaskInletAssetReference,
TaskOutletAssetReference,
)
from airflow.models.backfill import Backfill
from airflow.models.backfill import Backfill, BackfillDagRun
from airflow.models.callback import Callback, CallbackType, ExecutorCallback
from airflow.models.dag import DagModel
from airflow.models.dag_version import DagVersion
Expand Down Expand Up @@ -1861,8 +1861,17 @@ def _mark_backfills_complete(self, session: Session = NEW_SESSION) -> None:
unfinished_states = (DagRunState.RUNNING, DagRunState.QUEUED)
now = timezone.utcnow()
# todo: AIP-78 simplify this function to an update statement
initializing_cutoff = now - timedelta(minutes=2)
query = select(Backfill).where(
Backfill.completed_at.is_(None),
# Guard: backfill must have at least one association,
# otherwise it is still being set up (see #61375).
# Allow cleanup of orphaned backfills older than 2 minutes
# that failed during initialization and never got any associations.
or_(
exists(select(BackfillDagRun.id).where(BackfillDagRun.backfill_id == Backfill.id)),
Backfill.created_at < initializing_cutoff,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added this guard, which will clean up any backfills that don't have associated DAG runs after 2 minutes. Therefore, we won't have any stuck backfills.

),
~exists(
select(DagRun.id).where(
and_(DagRun.backfill_id == Backfill.id, DagRun.state.in_(unfinished_states))
Expand Down
259 changes: 258 additions & 1 deletion airflow-core/tests/unit/jobs/test_scheduler_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@
AssetPartitionDagRun,
PartitionedAssetKeyLog,
)
from airflow.models.backfill import Backfill, _create_backfill
from airflow.models.backfill import Backfill, BackfillDagRun, ReprocessBehavior, _create_backfill
from airflow.models.callback import ExecutorCallback
from airflow.models.dag import DagModel, get_last_dagrun, infer_automated_data_interval
from airflow.models.dag_version import DagVersion
Expand Down Expand Up @@ -8877,6 +8877,263 @@ def test_mark_backfills_completed(dag_maker, session):
assert b.completed_at.timestamp() > 0


def test_mark_backfills_complete_skips_initializing_backfill(dag_maker, session):
    """A backfill with no BackfillDagRun associations (still initializing) is skipped;
    once a finished run and its association exist, it gets marked complete."""
    clear_db_backfills()
    dag_id = "test_backfill_race_lifecycle"
    with dag_maker(serialized=True, dag_id=dag_id, schedule="@daily"):
        BashOperator(task_id="hi", bash_command="echo hi")

    backfill = Backfill(
        dag_id=dag_id,
        from_date=pendulum.parse("2021-01-01"),
        to_date=pendulum.parse("2021-01-03"),
        max_active_runs=10,
        dag_run_conf={},
        reprocess_behavior=ReprocessBehavior.NONE,
    )
    session.add(backfill)
    session.commit()
    backfill_id = backfill.id
    session.expunge_all()

    runner = SchedulerJobRunner(
        job=Job(job_type=SchedulerJobRunner.job_type), executors=[MockExecutor(do_update=False)]
    )

    # No association rows yet: the backfill is still being set up and must be left alone.
    runner._mark_backfills_complete()
    assert session.get(Backfill, backfill_id).completed_at is None
    session.expunge_all()

    # Attach one finished DagRun together with its BackfillDagRun association.
    interval_start = pendulum.parse("2021-01-01")
    interval_end = pendulum.parse("2021-01-02")
    run = DagRun(
        dag_id=dag_id,
        run_id="backfill__2021-01-01T00:00:00+00:00",
        run_type=DagRunType.BACKFILL_JOB,
        logical_date=interval_start,
        data_interval=(interval_start, interval_end),
        run_after=interval_end,
        state=DagRunState.SUCCESS,
        backfill_id=backfill_id,
    )
    session.add(run)
    session.flush()
    session.add(
        BackfillDagRun(
            backfill_id=backfill_id,
            dag_run_id=run.id,
            logical_date=interval_start,
            sort_ordinal=1,
        )
    )
    session.commit()
    session.expunge_all()

    # With an association present and no unfinished runs, completion should happen now.
    runner._mark_backfills_complete()
    assert session.get(Backfill, backfill_id).completed_at is not None


def test_mark_backfills_complete_cleans_orphan_after_cutoff(dag_maker, session):
    """An orphaned backfill (no BackfillDagRun rows) older than the 2-minute
    initialization cutoff should be auto-completed by the scheduler."""
    clear_db_backfills()
    dag_id = "test_backfill_orphan_cleanup"
    with dag_maker(serialized=True, dag_id=dag_id, schedule="@daily"):
        BashOperator(task_id="hi", bash_command="echo hi")

    orphan = Backfill(
        dag_id=dag_id,
        from_date=pendulum.parse("2021-01-01"),
        to_date=pendulum.parse("2021-01-03"),
        max_active_runs=10,
        dag_run_conf={},
        reprocess_behavior=ReprocessBehavior.NONE,
    )
    session.add(orphan)
    session.commit()
    backfill_id = orphan.id
    session.expunge_all()

    # Jump 3 minutes ahead so the backfill's age exceeds the 2-minute cutoff.
    with time_machine.travel(pendulum.now("UTC").add(minutes=3), tick=False):
        runner = SchedulerJobRunner(
            job=Job(job_type=SchedulerJobRunner.job_type), executors=[MockExecutor(do_update=False)]
        )
        runner._mark_backfills_complete()

    assert session.get(Backfill, backfill_id).completed_at is not None


def test_mark_backfills_complete_keeps_old_backfill_with_running_dagruns(dag_maker, session):
    """A backfill past the 2-minute cutoff that still has RUNNING DagRuns
    must NOT be marked complete — age alone is not enough."""
    clear_db_backfills()
    dag_id = "test_backfill_old_with_runs"
    with dag_maker(serialized=True, dag_id=dag_id, schedule="@daily"):
        BashOperator(task_id="hi", bash_command="echo hi")

    backfill = Backfill(
        dag_id=dag_id,
        from_date=pendulum.parse("2021-01-01"),
        to_date=pendulum.parse("2021-01-03"),
        max_active_runs=10,
        dag_run_conf={},
        reprocess_behavior=ReprocessBehavior.NONE,
    )
    session.add(backfill)
    session.commit()
    backfill_id = backfill.id

    interval_start = pendulum.parse("2021-01-01")
    interval_end = pendulum.parse("2021-01-02")
    active_run = DagRun(
        dag_id=dag_id,
        run_id="backfill__2021-01-01T00:00:00+00:00",
        run_type=DagRunType.BACKFILL_JOB,
        logical_date=interval_start,
        data_interval=(interval_start, interval_end),
        run_after=interval_end,
        state=DagRunState.RUNNING,
        backfill_id=backfill_id,
    )
    session.add(active_run)
    session.flush()
    session.add(
        BackfillDagRun(
            backfill_id=backfill_id,
            dag_run_id=active_run.id,
            logical_date=interval_start,
            sort_ordinal=1,
        )
    )
    session.commit()
    session.expunge_all()

    # Even well past the cutoff, the RUNNING DagRun keeps the backfill open.
    with time_machine.travel(pendulum.now("UTC").add(minutes=3), tick=False):
        runner = SchedulerJobRunner(
            job=Job(job_type=SchedulerJobRunner.job_type), executors=[MockExecutor(do_update=False)]
        )
        runner._mark_backfills_complete()

    assert session.get(Backfill, backfill_id).completed_at is None


def test_mark_backfills_complete_young_backfill_with_finished_runs(dag_maker, session):
    """A just-created backfill (<2 min old) whose DagRuns are all SUCCESS
    completes immediately — the cutoff only delays orphans, not finished work."""
    clear_db_backfills()
    dag_id = "test_backfill_young_finished"
    with dag_maker(serialized=True, dag_id=dag_id, schedule="@daily"):
        BashOperator(task_id="hi", bash_command="echo hi")

    backfill = Backfill(
        dag_id=dag_id,
        from_date=pendulum.parse("2021-01-01"),
        to_date=pendulum.parse("2021-01-03"),
        max_active_runs=10,
        dag_run_conf={},
        reprocess_behavior=ReprocessBehavior.NONE,
    )
    session.add(backfill)
    session.commit()
    backfill_id = backfill.id

    interval_start = pendulum.parse("2021-01-01")
    interval_end = pendulum.parse("2021-01-02")
    finished_run = DagRun(
        dag_id=dag_id,
        run_id="backfill__2021-01-01T00:00:00+00:00",
        run_type=DagRunType.BACKFILL_JOB,
        logical_date=interval_start,
        data_interval=(interval_start, interval_end),
        run_after=interval_end,
        state=DagRunState.SUCCESS,
        backfill_id=backfill_id,
    )
    session.add(finished_run)
    session.flush()
    session.add(
        BackfillDagRun(
            backfill_id=backfill_id,
            dag_run_id=finished_run.id,
            logical_date=interval_start,
            sort_ordinal=1,
        )
    )
    session.commit()
    session.expunge_all()

    # No time travel: freshly created, but all runs are finished, so it completes.
    runner = SchedulerJobRunner(
        job=Job(job_type=SchedulerJobRunner.job_type), executors=[MockExecutor(do_update=False)]
    )
    runner._mark_backfills_complete()

    assert session.get(Backfill, backfill_id).completed_at is not None


def test_mark_backfills_complete_multiple_independent(dag_maker, session):
    """Backfills are evaluated independently: given one finished and one still
    running, only the finished backfill is marked complete."""
    clear_db_backfills()
    for name in ("dag_finished", "dag_running"):
        with dag_maker(serialized=True, dag_id=name, schedule="@daily"):
            BashOperator(task_id="hi", bash_command="echo hi")

    def _make_backfill(dag_id):
        # One backfill per DAG with identical settings.
        return Backfill(
            dag_id=dag_id,
            from_date=pendulum.parse("2021-01-01"),
            to_date=pendulum.parse("2021-01-03"),
            max_active_runs=10,
            dag_run_conf={},
            reprocess_behavior=ReprocessBehavior.NONE,
        )

    b_finished = _make_backfill("dag_finished")
    b_running = _make_backfill("dag_running")
    session.add_all([b_finished, b_running])
    session.commit()
    finished_id = b_finished.id
    running_id = b_running.id

    interval_start = pendulum.parse("2021-01-01")
    interval_end = pendulum.parse("2021-01-02")

    def _attach_run(dag_id, backfill_id, state):
        # Create one DagRun in the given state plus its association row.
        run = DagRun(
            dag_id=dag_id,
            run_id="backfill__2021-01-01T00:00:00+00:00",
            run_type=DagRunType.BACKFILL_JOB,
            logical_date=interval_start,
            data_interval=(interval_start, interval_end),
            run_after=interval_end,
            state=state,
            backfill_id=backfill_id,
        )
        session.add(run)
        session.flush()
        session.add(
            BackfillDagRun(
                backfill_id=backfill_id,
                dag_run_id=run.id,
                logical_date=interval_start,
                sort_ordinal=1,
            )
        )

    _attach_run("dag_finished", finished_id, DagRunState.SUCCESS)
    _attach_run("dag_running", running_id, DagRunState.RUNNING)
    session.commit()
    session.expunge_all()

    runner = SchedulerJobRunner(
        job=Job(job_type=SchedulerJobRunner.job_type), executors=[MockExecutor(do_update=False)]
    )
    runner._mark_backfills_complete()

    assert session.get(Backfill, finished_id).completed_at is not None
    assert session.get(Backfill, running_id).completed_at is None


class Key1Mapper(CorePartitionMapper):
"""Partition Mapper that returns only key-1 as downstream key"""

Expand Down
Loading