Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 14 additions & 12 deletions airflow-core/src/airflow/models/dagrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -1202,18 +1202,6 @@ def recalculate(self) -> _UnfinishedStates:
execute=execute_callbacks,
)

if dag.deadline:
# The dagrun has succeeded. If there were any Deadlines for it which were not breached, they are no longer needed.
deadline_alerts = [
DeadlineAlertModel.get_by_id(alert_id, session) for alert_id in dag.deadline
]

if any(
deadline_alert.reference_class in SerializedReferenceModels.TYPES.DAGRUN
for deadline_alert in deadline_alerts
):
Deadline.prune_deadlines(session=session, conditions={DagRun.id: self.id})

# if *all tasks* are deadlocked, the run failed
elif unfinished.should_schedule and not are_runnable_tasks:
self.log.error("Task deadlock (no runnable tasks); marking run %s failed", self)
Expand Down Expand Up @@ -1267,6 +1255,20 @@ def recalculate(self) -> _UnfinishedStates:
self.data_interval_start,
self.data_interval_end,
)

if dag.deadline:
# The dagrun has reached a terminal state. Prune any pending deadlines
# so they don't fire after the run is already finished.
deadline_alerts = [
DeadlineAlertModel.get_by_id(alert_id, session) for alert_id in dag.deadline
]

if any(
deadline_alert.reference_class in SerializedReferenceModels.TYPES.DAGRUN
for deadline_alert in deadline_alerts
Comment on lines +1262 to +1268

Copilot AI Apr 10, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

deadline_alerts = [DeadlineAlertModel.get_by_id(...)] issues one DB query per deadline ID. Since this code runs on every terminal DagRun (now including failures), consider fetching all referenced DeadlineAlert rows in a single query (id IN (...)) and then checking whether any reference_class is DagRun-related, to avoid N queries when multiple deadlines are configured.

Suggested change
deadline_alerts = [
DeadlineAlertModel.get_by_id(alert_id, session) for alert_id in dag.deadline
]
if any(
deadline_alert.reference_class in SerializedReferenceModels.TYPES.DAGRUN
for deadline_alert in deadline_alerts
deadline_alert_reference_classes = session.execute(
select(DeadlineAlertModel.reference_class).where(
DeadlineAlertModel.id.in_(set(dag.deadline))
)
).scalars()
if any(
reference_class in SerializedReferenceModels.TYPES.DAGRUN
for reference_class in deadline_alert_reference_classes

Copilot uses AI. Check for mistakes.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Honestly, I don't really think this is necessary and I'd just resolve it. Seems like over-optimizing around an edge case where a dagrun has many deadlines as opposed to just using the exiting helper that serves the same purpose.

):
Deadline.prune_deadlines(session=session, conditions={DagRun.id: self.id})

session.flush()
self._emit_dagrun_span(state=self.state)

Expand Down
30 changes: 30 additions & 0 deletions airflow-core/tests/unit/models/test_dagrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -1360,6 +1360,36 @@ def test_dagrun_success_handles_empty_deadline_list(self, mock_prune, dag_maker,
mock_prune.assert_not_called()
assert dag_run.state == DagRunState.SUCCESS

@mock.patch.object(Deadline, "prune_deadlines")
@mock.patch.object(DeadlineAlertModel, "get_by_id")
def test_dagrun_failure_prunes_dagrun_deadlines(
self, mock_get_by_id, mock_prune, session, deadline_test_dag
):
"""Deadlines should be pruned when a DAG run fails, not just on success."""
mock_deadline_alert = mock.MagicMock()

Copilot AI Apr 10, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mock.MagicMock() is created without a spec/autospec, which can hide attribute/typo mistakes in tests. Consider using a spec’d mock (or an autospecced patch) matching the DeadlineAlert model so the test fails if it relies on non-existent attributes.

Suggested change
mock_deadline_alert = mock.MagicMock()
mock_deadline_alert = mock.create_autospec(DeadlineAlertModel, instance=True)

Copilot uses AI. Check for mistakes.
mock_deadline_alert.reference_class = SerializedReferenceModels.FixedDatetimeDeadline
mock_get_by_id.return_value = mock_deadline_alert

scheduler_dag = deadline_test_dag()

deadline_ids = ["deadline-uuid-1", "deadline-uuid-2"]
scheduler_dag.deadline = deadline_ids

dag_run = self.create_dag_run(
dag=scheduler_dag,
task_states={"task_1": TaskInstanceState.SUCCESS, "task_2": TaskInstanceState.FAILED},
session=session,
)
dag_run.dag = scheduler_dag

dag_run.update_state(session=session)

assert mock_get_by_id.call_count == len(deadline_ids)
for deadline_id in deadline_ids:
mock_get_by_id.assert_any_call(deadline_id, session)
mock_prune.assert_called_once_with(session=session, conditions={DagRun.id: dag_run.id})
assert dag_run.state == DagRunState.FAILED


@pytest.mark.parametrize(
("run_type", "expected_tis"),
Expand Down
Loading