Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions airflow-core/newsfragments/68919.bugfix.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fixed the deadline UI/API endpoints raising ``500`` errors on deadline alerts whose interval is stored as a serialized object rather than a plain number. ``DeadlineAlert.interval`` is a JSON column holding the Airflow-serialized interval (a fixed ``timedelta`` or a dynamic ``VariableInterval``); the ``/ui/dags/{dag_id}/deadlineAlerts`` response now coerces it to seconds for a fixed interval and to ``null`` for a dynamic one instead of failing to validate. Sorting deadline alerts by ``interval`` (which would sort by JSON structure, not duration) is no longer offered. Deserializing a corrupt or unrecognized deadline reference now raises a clear error instead of an opaque ``KeyError``, custom references that share a class name with a builtin are routed correctly via ``__class_path``, and ``Deadline.__repr__`` no longer raises when the ``dagrun`` relationship has been severed. ``prune_deadlines`` now explicitly skips deadlines already marked ``missed`` so their queued callbacks are never cascade-deleted.
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@

from collections.abc import Iterable
from datetime import datetime
from typing import Any
from uuid import UUID

from pydantic import AliasPath, Field
from pydantic import AliasPath, Field, field_validator

from airflow.api_fastapi.core_api.base import BaseModel

Expand Down Expand Up @@ -52,9 +53,42 @@ class DeadlineAlertResponse(BaseModel):
id: UUID
name: str | None = None
reference_type: str = Field(validation_alias=AliasPath("reference", "reference_type"))
interval: float = Field(description="Interval in seconds between deadline evaluations.")
interval: float | None = Field(
default=None,
description=(
"Interval in seconds between the reference time and the deadline. "
"Null for a dynamic interval (e.g. a VariableInterval) whose value is "
"only resolved at scheduler evaluation time."
),
)
created_at: datetime

@field_validator("interval", mode="before")
@classmethod
def coerce_interval_to_seconds(cls, value: Any) -> float | None:
"""
Coerce the stored ``interval`` into seconds.

``DeadlineAlert.interval`` is a JSON column holding the Airflow-serialized form
of the SDK interval, not a plain number. A fixed ``timedelta`` serializes to
``{"__classname__": "datetime.timedelta", "__data__": <seconds>}`` and a dynamic
``VariableInterval`` to ``{"__classname__": ".../VariableInterval", "__data__": {...}}``.
Without this coercion Pydantic cannot turn that dict into ``float`` and the
``/ui/dags/{dag_id}/deadlineAlerts`` endpoint raises a 500, which breaks the
run-page deadline status badge. Return the seconds for a fixed interval, or
``None`` for a dynamic one (resolved later by the scheduler).
"""
if value is None or isinstance(value, (int, float)):
return value
if isinstance(value, dict):
data = value.get("__data__")
# Fixed timedelta: __data__ is the total seconds as a number.
if isinstance(data, (int, float)):
return float(data)
# Dynamic interval (e.g. VariableInterval): no fixed seconds to report.
return None
return None


class DeadlineAlertCollectionResponse(BaseModel):
"""DeadlineAlert Collection serializer for responses."""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -894,13 +894,12 @@ paths:
type: string
description: 'Attributes to order by, multi criteria sort is supported.
Prefix with `-` for descending order. Supported attributes: `id, created_at,
name, interval`'
name`'
default:
- created_at
title: Order By
description: 'Attributes to order by, multi criteria sort is supported. Prefix
with `-` for descending order. Supported attributes: `id, created_at, name,
interval`'
with `-` for descending order. Supported attributes: `id, created_at, name`'
responses:
'200':
description: Successful Response
Expand Down Expand Up @@ -2515,9 +2514,13 @@ components:
type: string
title: Reference Type
interval:
type: number
anyOf:
- type: number
- type: 'null'
title: Interval
description: Interval in seconds between deadline evaluations.
description: Interval in seconds between the reference time and the deadline.
Null for a dynamic interval (e.g. a VariableInterval) whose value is only
resolved at scheduler evaluation time.
created_at:
type: string
format: date-time
Expand All @@ -2526,7 +2529,6 @@ components:
required:
- id
- reference_type
- interval
- created_at
title: DeadlineAlertResponse
description: DeadlineAlert serializer for responses.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,8 +165,16 @@ def get_dag_deadline_alerts(
order_by: Annotated[
SortParam,
Depends(
# NOTE: ``interval`` is intentionally NOT a sortable key. ``DeadlineAlert.interval`` is a
# JSON column holding the Airflow-serialized interval — a dict such as
# ``{"__classname__": "datetime.timedelta", "__data__": 300.0}`` for a fixed interval, or a
# structurally different dict for a ``VariableInterval``. Ordering by it at the DB level
# sorts by the JSON text/structure, not the duration, so the result is arbitrary and
# misleading (e.g. a dynamic VariableInterval sorts before/after fixed intervals by shape,
# and "300" vs "3600" compare lexicographically). Meaningful sorting would need a computed
# seconds column. Allow only columns that sort correctly.
SortParam(
["id", "created_at", "name", "interval"],
["id", "created_at", "name"],
DeadlineAlert,
).dynamic_depends(default="created_at")
),
Expand Down
18 changes: 15 additions & 3 deletions airflow-core/src/airflow/models/deadline.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,8 +147,13 @@ def __repr__(self):
def _determine_resource() -> tuple[str, str]:
"""Determine the type of resource based on which values are present."""
if self.dagrun_id:
# The deadline is for a Dag run:
return "DagRun", f"Dag: {self.dagrun.dag_id} Run: {self.dagrun_id}"
# The deadline is for a Dag run. Guard on the ``dagrun`` relationship (not just
# ``dagrun_id``): the FK can be set while the relationship resolves to None — e.g.
# the DagRun was deleted (ondelete=CASCADE) and this is a stale/expired in-memory
# Deadline. A __repr__ must never raise (it's used in logs, tracebacks, debuggers),
# so fall back to the id-only form rather than dereferencing ``self.dagrun.dag_id``.
dag_id = self.dagrun.dag_id if self.dagrun is not None else "<unknown>"
return "DagRun", f"Dag: {dag_id} Run: {self.dagrun_id}"

return "Unknown", ""

Expand Down Expand Up @@ -184,8 +189,15 @@ def prune_deadlines(cls, *, session: Session, conditions: dict[Mapped, Any]) ->

try:
# Get deadlines which match the provided conditions and their associated DagRuns.
# Exclude deadlines already marked ``missed``: once the scheduler has marked a
# deadline missed it has queued (and owns) that deadline's callback, so prune must
# never delete it (the cascade would drop the queued callback). Today this is also
# implied by the ``end_date <= deadline_time`` guard below — a missed deadline has
# ``deadline_time < now <= end_date`` so it can't satisfy that predicate — but making
# the ``~missed`` filter explicit keeps the "prune only handles on-time deadlines"
# invariant from depending on clock relationships and protects against future callers.
deadline_dagrun_pairs = session.execute(
select(Deadline, DagRun).join(DagRun).where(and_(*filter_conditions))
select(Deadline, DagRun).join(DagRun).where(and_(*filter_conditions)).where(~Deadline.missed)
).all()

except AttributeError as e:
Expand Down
15 changes: 13 additions & 2 deletions airflow-core/src/airflow/models/deadline_alert.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,22 @@ def __repr__(self):

interval_seconds = None

# ``interval`` is a JSON column holding the Airflow-serialized interval: a fixed timedelta is
# ``{"__classname__": "datetime.timedelta", "__data__": <seconds>}``, a dynamic
# ``VariableInterval`` is ``{"__classname__": ".../VariableInterval", "__data__": {...}}``, and
# a legacy pre-0117 blob may be a bare number. Extract seconds for the fixed/legacy cases and
# fall back to "dynamic" otherwise. (Note: the previous ``isinstance(self.interval,
# datetime.timedelta)`` branch was doubly broken — ``datetime`` here is the *class*
# ``datetime.datetime`` (``from datetime import datetime``), so ``datetime.timedelta`` raised
# ``AttributeError``, and ``interval`` is always a dict so that branch was also unreachable-by-type.
# A ``__repr__`` must never raise — it is used in logs, tracebacks, and debuggers.)
if isinstance(self.interval, (int, float)):
interval_seconds = int(self.interval)

elif isinstance(self.interval, datetime.timedelta):
interval_seconds = int(self.interval.total_seconds())
elif isinstance(self.interval, dict):
data = self.interval.get("__data__")
if isinstance(data, (int, float)):
interval_seconds = int(data)

if interval_seconds is None:
interval_display = "dynamic"
Expand Down
11 changes: 10 additions & 1 deletion airflow-core/src/airflow/serialization/decoders.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,16 @@ def decode_deadline_reference(reference_data: dict):
"""Decode a previously serialized deadline reference."""
ref_name = reference_data.get(SerializedReferenceModels.REFERENCE_TYPE_FIELD)

if ref_name and SerializedReferenceModels.is_builtin_reference(ref_name):
# ``__class_path`` is stamped by the encoder only for custom (non-builtin) references and is
# the authoritative discriminator. It must take precedence over the ``reference_type`` name:
# a user's custom reference may share a class name with a builtin (e.g. ``FixedDatetimeDeadline``),
# and routing by name alone would silently decode it as the builtin class — dropping the custom
# evaluation logic (or raising a spurious KeyError on builtin-only fields).
if "__class_path" in reference_data:
reference_class: type[SerializedReferenceModels.SerializedBaseDeadlineReference] = (
SerializedReferenceModels.SerializedCustomReference
)
elif ref_name and SerializedReferenceModels.is_builtin_reference(ref_name):
reference_class = SerializedReferenceModels.get_reference_class(ref_name)
else:
reference_class = SerializedReferenceModels.SerializedCustomReference
Expand Down
17 changes: 16 additions & 1 deletion airflow-core/src/airflow/serialization/definitions/deadline.py
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,22 @@ def serialize_reference(self) -> dict:
def deserialize_reference(cls, reference_data: dict):
from airflow.serialization.helpers import find_registered_custom_deadline_reference

custom_class = find_registered_custom_deadline_reference(reference_data["__class_path"])
# ``decode_deadline_reference`` also routes here for any reference it cannot otherwise
# classify — an unrecognized ``reference_type`` with no ``__class_path`` (corrupted /
# hand-edited rows, a blob from a newer version, or a legacy custom-ref whose plugin is
# gone). Without ``__class_path`` there is nothing to import, so a bare
# ``reference_data["__class_path"]`` would raise an opaque ``KeyError``. Surface a clear,
# actionable error instead (the deadline-creation isolation logs this and skips the
# alert rather than aborting the DagRun).
class_path = reference_data.get("__class_path")
if not class_path:
raise ValueError(
"Cannot deserialize deadline reference: unrecognized reference_type "
f"{reference_data.get(SerializedReferenceModels.REFERENCE_TYPE_FIELD)!r} with no "
"'__class_path' to import. The stored reference is corrupt, from a newer "
"Airflow version, or references a custom class whose plugin is no longer installed."
)
custom_class = find_registered_custom_deadline_reference(class_path)
inner_ref = custom_class.deserialize_reference(reference_data)
return cls(inner_ref)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

from __future__ import annotations

from datetime import timedelta

import pytest
from sqlalchemy import select

Expand All @@ -26,7 +28,7 @@
from airflow.models.serialized_dag import SerializedDagModel
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.sdk.definitions.callback import AsyncCallback
from airflow.sdk.definitions.deadline import DeadlineReference
from airflow.sdk.definitions.deadline import DeadlineReference, VariableInterval
from airflow.utils.state import DagRunState
from airflow.utils.types import DagRunTriggeredByType, DagRunType

Expand Down Expand Up @@ -507,10 +509,89 @@ def test_should_response_404_for_nonexistent_dag(self, test_client):
response = test_client.get("/dags/nonexistent_dag/deadlineAlerts")
assert response.status_code == 404

@pytest.mark.parametrize("order_by", ["interval", "-interval"])
def test_order_by_interval_is_rejected(self, test_client, order_by):
"""``interval`` is a serialized-JSON column (timedelta/VariableInterval dict), so DB-level
ordering by it is meaningless (sorts by JSON structure, not duration). It must NOT be an
allowed sort key — the endpoint should reject it with a 400 rather than silently returning
an arbitrarily-ordered list.
"""
response = test_client.get(f"/dags/{DAG_ID}/deadlineAlerts", params={"order_by": order_by})
assert response.status_code == 400
assert "interval" in response.json()["detail"]

def test_should_response_401(self, unauthenticated_test_client):
response = unauthenticated_test_client.get(f"/dags/{DAG_ID}/deadlineAlerts")
assert response.status_code == 401

def test_should_response_403(self, unauthorized_test_client):
response = unauthorized_test_client.get(f"/dags/{DAG_ID}/deadlineAlerts")
assert response.status_code == 403


class TestDeadlineAlertsIntervalSerialization:
"""Regression tests for the ``interval`` column shape on the deadlineAlerts endpoint.

``DeadlineAlert.interval`` is a JSON column that, in production, holds the
Airflow-*serialized* interval — ``encode_deadline_alert`` stores
``serialize(self.interval)``, not a plain number. A fixed ``timedelta`` becomes
``{"__classname__": "datetime.timedelta", "__version__": 2, "__data__": <seconds>}``
and a dynamic ``VariableInterval`` becomes
``{"__classname__": ".../VariableInterval", "__data__": {"key": ...}}``.

The response model originally typed ``interval`` as a bare ``float``, so Pydantic
raised on that dict and the endpoint returned 500 — which broke the run-page
``DeadlineStatus`` badge. The existing fixtures masked this by seeding a bare float
(``interval=3600.0``), a value the real creation path never produces. These tests
seed the realistic serialized forms.
"""

@pytest.fixture
def dag_with_serialized_intervals(self, dag_maker, session):
from airflow.sdk.serde import serialize

dag_id = "dag_serialized_interval"
with dag_maker(dag_id, serialized=True, session=session):
EmptyOperator(task_id="task")
dag_maker.sync_dagbag_to_db()
session.commit()

serialized_dag = session.scalar(select(SerializedDagModel).where(SerializedDagModel.dag_id == dag_id))
# Fixed interval: stored as the serialized timedelta dict, exactly as
# encode_deadline_alert would persist it.
session.add(
DeadlineAlert(
serialized_dag_id=serialized_dag.id,
name="fixed_interval_alert",
reference=DeadlineReference.DAGRUN_QUEUED_AT.serialize_reference(),
interval=serialize(timedelta(seconds=300)),
callback_def={"path": _CALLBACK_PATH},
)
)
# Dynamic interval: a VariableInterval serializes to a dict with no fixed
# seconds — the value is only resolved at scheduler evaluation time.
session.add(
DeadlineAlert(
serialized_dag_id=serialized_dag.id,
name="dynamic_interval_alert",
reference=DeadlineReference.DAGRUN_QUEUED_AT.serialize_reference(),
interval=serialize(VariableInterval("deadline_seconds")),
callback_def={"path": _CALLBACK_PATH},
)
)
session.commit()
return dag_id

def test_serialized_timedelta_interval_does_not_500(self, test_client, dag_with_serialized_intervals):
"""A fixed timedelta interval is coerced to its seconds value (not a 500)."""
response = test_client.get(f"/dags/{dag_with_serialized_intervals}/deadlineAlerts")
assert response.status_code == 200
alerts = {a["name"]: a for a in response.json()["deadline_alerts"]}
assert alerts["fixed_interval_alert"]["interval"] == 300.0

def test_dynamic_variable_interval_serializes_as_null(self, test_client, dag_with_serialized_intervals):
"""A dynamic VariableInterval has no fixed seconds, so interval is null (not a 500)."""
response = test_client.get(f"/dags/{dag_with_serialized_intervals}/deadlineAlerts")
assert response.status_code == 200
alerts = {a["name"]: a for a in response.json()["deadline_alerts"]}
assert alerts["dynamic_interval_alert"]["interval"] is None
17 changes: 17 additions & 0 deletions airflow-core/tests/unit/models/test_deadline.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,23 @@ def test_repr_without_callback_kwargs(self, dagrun, session):
assert f"needed by {DEFAULT_DATE}" in repr_str
assert TEST_CALLBACK_PATH in repr_str

def test_repr_with_dagrun_id_but_no_dagrun_relationship(self, deadline_orm):
"""__repr__ must NOT raise when dagrun_id is set but the dagrun relationship is None.

The FK (dagrun_id) can be set while the relationship resolves to None — e.g. the DagRun
was deleted (ondelete=CASCADE) and this is a stale/expired in-memory Deadline. A __repr__
that raised AttributeError here would break log lines, tracebacks, and debugger displays
exactly when something is already going wrong. The repr falls back to an id-only form.
"""
# Sever the relationship while keeping the FK id (simulates deleted/detached DagRun).
deadline_orm.dagrun = None
assert deadline_orm.dagrun_id is not None

repr_str = repr(deadline_orm) # must not raise
assert "[DagRun Deadline]" in repr_str
assert f"Run: {deadline_orm.dagrun_id}" in repr_str
assert "Dag: <unknown>" in repr_str

@pytest.mark.db_test
def test_bundle_name_propagated_to_callback(self, dagrun, session):
"""The bundle name is forwarded to the callback so the triggerer can resolve its team."""
Expand Down
Loading
Loading