From 31bd6d5e7640c0b5fc8518842331cbe7c6c69378 Mon Sep 17 00:00:00 2001 From: Sean Ghaeli Date: Tue, 23 Jun 2026 21:30:08 +0000 Subject: [PATCH] Make deadline reads and serialization robust to dynamic/malformed intervals Hardens the read and (de)serialization paths for deadline alerts so dynamic (``VariableInterval``) and malformed stored data no longer break the UI/API. - UI deadline-alert response: ``DeadlineAlert.interval`` is a JSON column holding the Airflow-serialized interval, not a plain number. Coerce it to seconds for a fixed ``timedelta`` and to ``None`` for a dynamic ``VariableInterval`` (resolved later by the scheduler), instead of letting Pydantic 500 on the dict. The ``interval`` field becomes ``float | None``. - Drop ``interval`` from the sortable columns of the deadline-alerts endpoint: ordering by a JSON column sorts by structure/text, not duration, so the result was arbitrary and misleading. - Deserialization: route by the encoder-stamped ``__class_path`` ahead of the ``reference_type`` name (a custom reference may share a class name with a builtin), and raise a clear error for a reference with no importable ``__class_path`` instead of an opaque ``KeyError``. - ``Deadline.__repr__`` / ``DeadlineAlert.__repr__`` no longer raise: guard the ``dagrun`` relationship (the FK can be set while the relationship is None after a cascade delete) and handle the dict-shaped JSON interval. A ``__repr__`` must never raise. - ``prune_deadlines`` explicitly excludes deadlines already marked ``missed`` so a missed deadline (whose callback is owned by the scheduler/triggerer) and its queued callback are never cascade-deleted. Generated-by: Claude Code (Opus via Claude Code) on behalf of Sean Ghaeli --- airflow-core/newsfragments/68919.bugfix.rst | 1 + .../core_api/datamodels/ui/deadline.py | 38 ++++- .../core_api/openapi/_private_ui.yaml | 14 +- .../core_api/routes/ui/deadlines.py | 10 +- airflow-core/src/airflow/models/deadline.py | 18 ++- .../src/airflow/models/deadline_alert.py | 15 +- .../src/airflow/serialization/decoders.py | 11 +- .../serialization/definitions/deadline.py | 17 ++- .../core_api/routes/ui/test_deadlines.py | 83 ++++++++++- .../tests/unit/models/test_deadline.py | 17 +++ .../tests/unit/models/test_deadline_alert.py | 31 ++++ .../tests/unit/models/test_prune_deadlines.py | 140 ++++++++++++++++++ .../test_deadline_reference_registry.py | 82 ++++++++++ 13 files changed, 460 insertions(+), 17 deletions(-) create mode 100644 airflow-core/newsfragments/68919.bugfix.rst create mode 100644 airflow-core/tests/unit/models/test_prune_deadlines.py diff --git a/airflow-core/newsfragments/68919.bugfix.rst b/airflow-core/newsfragments/68919.bugfix.rst new file mode 100644 index 0000000000000..4e938d68e5402 --- /dev/null +++ b/airflow-core/newsfragments/68919.bugfix.rst @@ -0,0 +1 @@ +Fixed the deadline UI/API endpoints raising ``500`` errors on deadline alerts whose interval is stored as a serialized object rather than a plain number. ``DeadlineAlert.interval`` is a JSON column holding the Airflow-serialized interval (a fixed ``timedelta`` or a dynamic ``VariableInterval``); the ``/ui/dags/{dag_id}/deadlineAlerts`` response now coerces it to seconds for a fixed interval and to ``null`` for a dynamic one instead of failing to validate. Sorting deadline alerts by ``interval`` (which would sort by JSON structure, not duration) is no longer offered. Deserializing a corrupt or unrecognized deadline reference now raises a clear error instead of an opaque ``KeyError``, custom references that share a class name with a builtin are routed correctly via ``__class_path``, and ``Deadline.__repr__`` no longer raises when the ``dagrun`` relationship has been severed. ``prune_deadlines`` now explicitly skips deadlines already marked ``missed`` so their queued callbacks are never cascade-deleted. diff --git a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/ui/deadline.py b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/ui/deadline.py index 6f9402f23603d..334d3523ac3e3 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/ui/deadline.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/ui/deadline.py @@ -19,9 +19,10 @@ from collections.abc import Iterable from datetime import datetime +from typing import Any from uuid import UUID -from pydantic import AliasPath, Field +from pydantic import AliasPath, Field, field_validator from airflow.api_fastapi.core_api.base import BaseModel @@ -52,9 +53,42 @@ class DeadlineAlertResponse(BaseModel): id: UUID name: str | None = None reference_type: str = Field(validation_alias=AliasPath("reference", "reference_type")) - interval: float = Field(description="Interval in seconds between deadline evaluations.") + interval: float | None = Field( + default=None, + description=( + "Interval in seconds between the reference time and the deadline. " + "Null for a dynamic interval (e.g. a VariableInterval) whose value is " + "only resolved at scheduler evaluation time." + ), + ) created_at: datetime + @field_validator("interval", mode="before") + @classmethod + def coerce_interval_to_seconds(cls, value: Any) -> float | None: + """ + Coerce the stored ``interval`` into seconds. + + ``DeadlineAlert.interval`` is a JSON column holding the Airflow-serialized form + of the SDK interval, not a plain number. A fixed ``timedelta`` serializes to + ``{"__classname__": "datetime.timedelta", "__data__": }`` and a dynamic + ``VariableInterval`` to ``{"__classname__": ".../VariableInterval", "__data__": {...}}``. + Without this coercion Pydantic cannot turn that dict into ``float`` and the + ``/ui/dags/{dag_id}/deadlineAlerts`` endpoint raises a 500, which breaks the + run-page deadline status badge. Return the seconds for a fixed interval, or + ``None`` for a dynamic one (resolved later by the scheduler). + """ + if value is None or isinstance(value, (int, float)): + return value + if isinstance(value, dict): + data = value.get("__data__") + # Fixed timedelta: __data__ is the total seconds as a number. + if isinstance(data, (int, float)): + return float(data) + # Dynamic interval (e.g. VariableInterval): no fixed seconds to report. + return None + return None + class DeadlineAlertCollectionResponse(BaseModel): """DeadlineAlert Collection serializer for responses.""" diff --git a/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml b/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml index d786e259b4e77..d7f20f888f9ad 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml +++ b/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml @@ -894,13 +894,12 @@ paths: type: string description: 'Attributes to order by, multi criteria sort is supported. Prefix with `-` for descending order. Supported attributes: `id, created_at, - name, interval`' + name`' default: - created_at title: Order By description: 'Attributes to order by, multi criteria sort is supported. Prefix - with `-` for descending order. Supported attributes: `id, created_at, name, - interval`' + with `-` for descending order. Supported attributes: `id, created_at, name`' responses: '200': description: Successful Response @@ -2515,9 +2514,13 @@ components: type: string title: Reference Type interval: - type: number + anyOf: + - type: number + - type: 'null' title: Interval - description: Interval in seconds between deadline evaluations. + description: Interval in seconds between the reference time and the deadline. + Null for a dynamic interval (e.g. a VariableInterval) whose value is only + resolved at scheduler evaluation time. created_at: type: string format: date-time @@ -2526,7 +2529,6 @@ components: required: - id - reference_type - - interval - created_at title: DeadlineAlertResponse description: DeadlineAlert serializer for responses. diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/deadlines.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/deadlines.py index d9cfea10d6d94..1f7076e554e01 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/deadlines.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/deadlines.py @@ -165,8 +165,16 @@ def get_dag_deadline_alerts( order_by: Annotated[ SortParam, Depends( + # NOTE: ``interval`` is intentionally NOT a sortable key. ``DeadlineAlert.interval`` is a + # JSON column holding the Airflow-serialized interval — a dict such as + # ``{"__classname__": "datetime.timedelta", "__data__": 300.0}`` for a fixed interval, or a + # structurally different dict for a ``VariableInterval``. Ordering by it at the DB level + # sorts by the JSON text/structure, not the duration, so the result is arbitrary and + # misleading (e.g. a dynamic VariableInterval sorts before/after fixed intervals by shape, + # and "300" vs "3600" compare lexicographically). Meaningful sorting would need a computed + # seconds column. Allow only columns that sort correctly. SortParam( - ["id", "created_at", "name", "interval"], + ["id", "created_at", "name"], DeadlineAlert, ).dynamic_depends(default="created_at") ), diff --git a/airflow-core/src/airflow/models/deadline.py b/airflow-core/src/airflow/models/deadline.py index f596c66301498..7747e8b290403 100644 --- a/airflow-core/src/airflow/models/deadline.py +++ b/airflow-core/src/airflow/models/deadline.py @@ -147,8 +147,13 @@ def __repr__(self): def _determine_resource() -> tuple[str, str]: """Determine the type of resource based on which values are present.""" if self.dagrun_id: - # The deadline is for a Dag run: - return "DagRun", f"Dag: {self.dagrun.dag_id} Run: {self.dagrun_id}" + # The deadline is for a Dag run. Guard on the ``dagrun`` relationship (not just + # ``dagrun_id``): the FK can be set while the relationship resolves to None — e.g. + # the DagRun was deleted (ondelete=CASCADE) and this is a stale/expired in-memory + # Deadline. A __repr__ must never raise (it's used in logs, tracebacks, debuggers), + # so fall back to the id-only form rather than dereferencing ``self.dagrun.dag_id``. + dag_id = self.dagrun.dag_id if self.dagrun is not None else "" + return "DagRun", f"Dag: {dag_id} Run: {self.dagrun_id}" return "Unknown", "" @@ -184,8 +189,15 @@ def prune_deadlines(cls, *, session: Session, conditions: dict[Mapped, Any]) -> try: # Get deadlines which match the provided conditions and their associated DagRuns. + # Exclude deadlines already marked ``missed``: once the scheduler has marked a + # deadline missed it has queued (and owns) that deadline's callback, so prune must + # never delete it (the cascade would drop the queued callback). Today this is also + # implied by the ``end_date <= deadline_time`` guard below — a missed deadline has + # ``deadline_time < now <= end_date`` so it can't satisfy that predicate — but making + # the ``~missed`` filter explicit keeps the "prune only handles on-time deadlines" + # invariant from depending on clock relationships and protects against future callers. deadline_dagrun_pairs = session.execute( - select(Deadline, DagRun).join(DagRun).where(and_(*filter_conditions)) + select(Deadline, DagRun).join(DagRun).where(and_(*filter_conditions)).where(~Deadline.missed) ).all() except AttributeError as e: diff --git a/airflow-core/src/airflow/models/deadline_alert.py b/airflow-core/src/airflow/models/deadline_alert.py index 20bfed459eef9..9f029fff5c15a 100644 --- a/airflow-core/src/airflow/models/deadline_alert.py +++ b/airflow-core/src/airflow/models/deadline_alert.py @@ -57,11 +57,22 @@ def __repr__(self): interval_seconds = None + # ``interval`` is a JSON column holding the Airflow-serialized interval: a fixed timedelta is + # ``{"__classname__": "datetime.timedelta", "__data__": }``, a dynamic + # ``VariableInterval`` is ``{"__classname__": ".../VariableInterval", "__data__": {...}}``, and + # a legacy pre-0117 blob may be a bare number. Extract seconds for the fixed/legacy cases and + # fall back to "dynamic" otherwise. (Note: the previous ``isinstance(self.interval, + # datetime.timedelta)`` branch was doubly broken — ``datetime`` here is the *class* + # ``datetime.datetime`` (``from datetime import datetime``), so ``datetime.timedelta`` raised + # ``AttributeError``, and ``interval`` is always a dict so that branch was also unreachable-by-type. + # A ``__repr__`` must never raise — it is used in logs, tracebacks, and debuggers.) if isinstance(self.interval, (int, float)): interval_seconds = int(self.interval) - elif isinstance(self.interval, datetime.timedelta): - interval_seconds = int(self.interval.total_seconds()) + elif isinstance(self.interval, dict): + data = self.interval.get("__data__") + if isinstance(data, (int, float)): + interval_seconds = int(data) if interval_seconds is None: interval_display = "dynamic" diff --git a/airflow-core/src/airflow/serialization/decoders.py b/airflow-core/src/airflow/serialization/decoders.py index 890a1296a4a00..dce9d8a9e5f06 100644 --- a/airflow-core/src/airflow/serialization/decoders.py +++ b/airflow-core/src/airflow/serialization/decoders.py @@ -148,7 +148,16 @@ def decode_deadline_reference(reference_data: dict): """Decode a previously serialized deadline reference.""" ref_name = reference_data.get(SerializedReferenceModels.REFERENCE_TYPE_FIELD) - if ref_name and SerializedReferenceModels.is_builtin_reference(ref_name): + # ``__class_path`` is stamped by the encoder only for custom (non-builtin) references and is + # the authoritative discriminator. It must take precedence over the ``reference_type`` name: + # a user's custom reference may share a class name with a builtin (e.g. ``FixedDatetimeDeadline``), + # and routing by name alone would silently decode it as the builtin class — dropping the custom + # evaluation logic (or raising a spurious KeyError on builtin-only fields). + if "__class_path" in reference_data: + reference_class: type[SerializedReferenceModels.SerializedBaseDeadlineReference] = ( + SerializedReferenceModels.SerializedCustomReference + ) + elif ref_name and SerializedReferenceModels.is_builtin_reference(ref_name): reference_class = SerializedReferenceModels.get_reference_class(ref_name) else: reference_class = SerializedReferenceModels.SerializedCustomReference diff --git a/airflow-core/src/airflow/serialization/definitions/deadline.py b/airflow-core/src/airflow/serialization/definitions/deadline.py index 89e231cba24dc..1226fec3c6c9c 100644 --- a/airflow-core/src/airflow/serialization/definitions/deadline.py +++ b/airflow-core/src/airflow/serialization/definitions/deadline.py @@ -311,7 +311,22 @@ def serialize_reference(self) -> dict: def deserialize_reference(cls, reference_data: dict): from airflow.serialization.helpers import find_registered_custom_deadline_reference - custom_class = find_registered_custom_deadline_reference(reference_data["__class_path"]) + # ``decode_deadline_reference`` also routes here for any reference it cannot otherwise + # classify — an unrecognized ``reference_type`` with no ``__class_path`` (corrupted / + # hand-edited rows, a blob from a newer version, or a legacy custom-ref whose plugin is + # gone). Without ``__class_path`` there is nothing to import, so a bare + # ``reference_data["__class_path"]`` would raise an opaque ``KeyError``. Surface a clear, + # actionable error instead (the deadline-creation isolation logs this and skips the + # alert rather than aborting the DagRun). + class_path = reference_data.get("__class_path") + if not class_path: + raise ValueError( + "Cannot deserialize deadline reference: unrecognized reference_type " + f"{reference_data.get(SerializedReferenceModels.REFERENCE_TYPE_FIELD)!r} with no " + "'__class_path' to import. The stored reference is corrupt, from a newer " + "Airflow version, or references a custom class whose plugin is no longer installed." + ) + custom_class = find_registered_custom_deadline_reference(class_path) inner_ref = custom_class.deserialize_reference(reference_data) return cls(inner_ref) diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_deadlines.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_deadlines.py index acadab3a6b16f..6c83044e17c3f 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_deadlines.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_deadlines.py @@ -17,6 +17,8 @@ from __future__ import annotations +from datetime import timedelta + import pytest from sqlalchemy import select @@ -26,7 +28,7 @@ from airflow.models.serialized_dag import SerializedDagModel from airflow.providers.standard.operators.empty import EmptyOperator from airflow.sdk.definitions.callback import AsyncCallback -from airflow.sdk.definitions.deadline import DeadlineReference +from airflow.sdk.definitions.deadline import DeadlineReference, VariableInterval from airflow.utils.state import DagRunState from airflow.utils.types import DagRunTriggeredByType, DagRunType @@ -507,6 +509,17 @@ def test_should_response_404_for_nonexistent_dag(self, test_client): response = test_client.get("/dags/nonexistent_dag/deadlineAlerts") assert response.status_code == 404 + @pytest.mark.parametrize("order_by", ["interval", "-interval"]) + def test_order_by_interval_is_rejected(self, test_client, order_by): + """``interval`` is a serialized-JSON column (timedelta/VariableInterval dict), so DB-level + ordering by it is meaningless (sorts by JSON structure, not duration). It must NOT be an + allowed sort key — the endpoint should reject it with a 400 rather than silently returning + an arbitrarily-ordered list. + """ + response = test_client.get(f"/dags/{DAG_ID}/deadlineAlerts", params={"order_by": order_by}) + assert response.status_code == 400 + assert "interval" in response.json()["detail"] + def test_should_response_401(self, unauthenticated_test_client): response = unauthenticated_test_client.get(f"/dags/{DAG_ID}/deadlineAlerts") assert response.status_code == 401 @@ -514,3 +527,71 @@ def test_should_response_401(self, unauthenticated_test_client): def test_should_response_403(self, unauthorized_test_client): response = unauthorized_test_client.get(f"/dags/{DAG_ID}/deadlineAlerts") assert response.status_code == 403 + + +class TestDeadlineAlertsIntervalSerialization: + """Regression tests for the ``interval`` column shape on the deadlineAlerts endpoint. + + ``DeadlineAlert.interval`` is a JSON column that, in production, holds the + Airflow-*serialized* interval — ``encode_deadline_alert`` stores + ``serialize(self.interval)``, not a plain number. A fixed ``timedelta`` becomes + ``{"__classname__": "datetime.timedelta", "__version__": 2, "__data__": }`` + and a dynamic ``VariableInterval`` becomes + ``{"__classname__": ".../VariableInterval", "__data__": {"key": ...}}``. + + The response model originally typed ``interval`` as a bare ``float``, so Pydantic + raised on that dict and the endpoint returned 500 — which broke the run-page + ``DeadlineStatus`` badge. The existing fixtures masked this by seeding a bare float + (``interval=3600.0``), a value the real creation path never produces. These tests + seed the realistic serialized forms. + """ + + @pytest.fixture + def dag_with_serialized_intervals(self, dag_maker, session): + from airflow.sdk.serde import serialize + + dag_id = "dag_serialized_interval" + with dag_maker(dag_id, serialized=True, session=session): + EmptyOperator(task_id="task") + dag_maker.sync_dagbag_to_db() + session.commit() + + serialized_dag = session.scalar(select(SerializedDagModel).where(SerializedDagModel.dag_id == dag_id)) + # Fixed interval: stored as the serialized timedelta dict, exactly as + # encode_deadline_alert would persist it. + session.add( + DeadlineAlert( + serialized_dag_id=serialized_dag.id, + name="fixed_interval_alert", + reference=DeadlineReference.DAGRUN_QUEUED_AT.serialize_reference(), + interval=serialize(timedelta(seconds=300)), + callback_def={"path": _CALLBACK_PATH}, + ) + ) + # Dynamic interval: a VariableInterval serializes to a dict with no fixed + # seconds — the value is only resolved at scheduler evaluation time. + session.add( + DeadlineAlert( + serialized_dag_id=serialized_dag.id, + name="dynamic_interval_alert", + reference=DeadlineReference.DAGRUN_QUEUED_AT.serialize_reference(), + interval=serialize(VariableInterval("deadline_seconds")), + callback_def={"path": _CALLBACK_PATH}, + ) + ) + session.commit() + return dag_id + + def test_serialized_timedelta_interval_does_not_500(self, test_client, dag_with_serialized_intervals): + """A fixed timedelta interval is coerced to its seconds value (not a 500).""" + response = test_client.get(f"/dags/{dag_with_serialized_intervals}/deadlineAlerts") + assert response.status_code == 200 + alerts = {a["name"]: a for a in response.json()["deadline_alerts"]} + assert alerts["fixed_interval_alert"]["interval"] == 300.0 + + def test_dynamic_variable_interval_serializes_as_null(self, test_client, dag_with_serialized_intervals): + """A dynamic VariableInterval has no fixed seconds, so interval is null (not a 500).""" + response = test_client.get(f"/dags/{dag_with_serialized_intervals}/deadlineAlerts") + assert response.status_code == 200 + alerts = {a["name"]: a for a in response.json()["deadline_alerts"]} + assert alerts["dynamic_interval_alert"]["interval"] is None diff --git a/airflow-core/tests/unit/models/test_deadline.py b/airflow-core/tests/unit/models/test_deadline.py index 3635bcadcbea5..d07f9f2006792 100644 --- a/airflow-core/tests/unit/models/test_deadline.py +++ b/airflow-core/tests/unit/models/test_deadline.py @@ -212,6 +212,23 @@ def test_repr_without_callback_kwargs(self, dagrun, session): assert f"needed by {DEFAULT_DATE}" in repr_str assert TEST_CALLBACK_PATH in repr_str + def test_repr_with_dagrun_id_but_no_dagrun_relationship(self, deadline_orm): + """__repr__ must NOT raise when dagrun_id is set but the dagrun relationship is None. + + The FK (dagrun_id) can be set while the relationship resolves to None — e.g. the DagRun + was deleted (ondelete=CASCADE) and this is a stale/expired in-memory Deadline. A __repr__ + that raised AttributeError here would break log lines, tracebacks, and debugger displays + exactly when something is already going wrong. The repr falls back to an id-only form. + """ + # Sever the relationship while keeping the FK id (simulates deleted/detached DagRun). + deadline_orm.dagrun = None + assert deadline_orm.dagrun_id is not None + + repr_str = repr(deadline_orm) # must not raise + assert "[DagRun Deadline]" in repr_str + assert f"Run: {deadline_orm.dagrun_id}" in repr_str + assert "Dag: " in repr_str + @pytest.mark.db_test def test_bundle_name_propagated_to_callback(self, dagrun, session): """The bundle name is forwarded to the callback so the triggerer can resolve its team.""" diff --git a/airflow-core/tests/unit/models/test_deadline_alert.py b/airflow-core/tests/unit/models/test_deadline_alert.py index a9b1854f6abce..1ea54bfd69d75 100644 --- a/airflow-core/tests/unit/models/test_deadline_alert.py +++ b/airflow-core/tests/unit/models/test_deadline_alert.py @@ -117,6 +117,37 @@ def test_deadline_alert_repr(self, deadline_alert_orm, deadline_reference): assert "interval=1m" in repr_str assert repr(deadline_alert_orm.callback_def) in repr_str + @pytest.mark.parametrize( + ("interval", "expected"), + [ + # The PRODUCTION shape after migration 0117: ``interval`` is a JSON dict holding the + # Airflow-serialized form, NOT a bare number. A fixed timedelta serializes to + # ``{"__classname__": "datetime.timedelta", "__data__": }``. + pytest.param( + {"__classname__": "datetime.timedelta", "__data__": 7200.0}, "interval=2h", id="timedelta_2h" + ), + # A corrupted dict without ``__data__`` must still render (no raise) as dynamic. + pytest.param({"unexpected": "shape"}, "interval=dynamic", id="corrupted_dict_dynamic"), + ], + ) + def test_deadline_alert_repr_does_not_raise_on_json_dict_interval( + self, deadline_alert_orm, interval, expected + ): + """``DeadlineAlert.__repr__`` must never raise for the PRODUCTION (JSON-dict) ``interval`` shape. + + Regression for a bug where ``__repr__`` did ``isinstance(self.interval, datetime.timedelta)`` + while ``datetime`` is imported as the *class* (``from datetime import datetime``), so + ``datetime.timedelta`` raised ``AttributeError`` — and since ``interval`` is *always* a JSON + dict in production (post-migration-0117), every real ``repr()`` hit that branch and raised + (a ``__repr__`` that raises breaks logs/tracebacks/debuggers — the same class as the + ``Deadline.__repr__`` None-deref fix). The old ``test_deadline_alert_repr`` used a *bare int* + fixture (``DEADLINE_INTERVAL = 60``) which only exercised the int/float branch and masked it. + """ + deadline_alert_orm.interval = interval + repr_str = repr(deadline_alert_orm) # must not raise + assert "[DeadlineAlert]" in repr_str + assert expected in repr_str + def test_deadline_alert_matches_definition(self, session, deadline_reference): alert1 = DeadlineAlert( serialized_dag_id=SERIALIZED_DAG_ID, diff --git a/airflow-core/tests/unit/models/test_prune_deadlines.py b/airflow-core/tests/unit/models/test_prune_deadlines.py new file mode 100644 index 0000000000000..ac657e4395358 --- /dev/null +++ b/airflow-core/tests/unit/models/test_prune_deadlines.py @@ -0,0 +1,140 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +""" +Adversarial QA coverage for ``Deadline.prune_deadlines``. + +``prune_deadlines`` is the batch-delete path the scheduler invokes (via +``DagRun.update_state`` -> ``dagrun.py:1237``) when a DagRun completes on time: +deadlines that no longer need to fire are removed. These tests drill into the +on-time/overdue/pending selection logic, the (lack of) batching, callback +cascade behaviour, and concurrent-mutation edge cases against a real DB. +""" + +from __future__ import annotations + +from datetime import timedelta +from typing import TYPE_CHECKING + +import pytest +import time_machine +from sqlalchemy import select + +from airflow.models import DagRun +from airflow.models.callback import Callback +from airflow.models.deadline import Deadline +from airflow.providers.standard.operators.empty import EmptyOperator +from airflow.sdk.definitions.callback import AsyncCallback +from airflow.utils.state import DagRunState + +from tests_common.test_utils import db +from unit.models import DEFAULT_DATE + +if TYPE_CHECKING: + from sqlalchemy.orm import Session + +DAG_ID = "qaw18_prune_dag" + + +async def _qaw18_callback(): + pass + + +CALLBACK_PATH = f"{__name__}.{_qaw18_callback.__name__}" + + +def _clean_db(): + db.clear_db_dags() + db.clear_db_runs() + db.clear_db_deadline() + + +@pytest.fixture +def dagrun(session, dag_maker): + with dag_maker(DAG_ID): + EmptyOperator(task_id="task_id") + with time_machine.travel(DEFAULT_DATE): + dag_maker.create_dagrun(state=DagRunState.QUEUED, logical_date=DEFAULT_DATE) + session.commit() + return session.scalars(select(DagRun)).one() + + +def _make_deadline(session: Session, *, dagrun_id: int, deadline_time, state=None) -> Deadline: + deadline = Deadline( + deadline_time=deadline_time, + callback=AsyncCallback(CALLBACK_PATH), + dagrun_id=dagrun_id, + dag_id=DAG_ID, + deadline_alert_id=None, + ) + session.add(deadline) + session.flush() + if state is not None: + deadline.callback.state = state + session.add(deadline.callback) + session.flush() + return deadline + + +@pytest.mark.db_test +class TestPruneDeadlines: + @staticmethod + def setup_method(): + _clean_db() + + @staticmethod + def teardown_method(): + _clean_db() + + def test_handle_miss_then_prune_does_not_delete_missed(self, dagrun, session): + """ + Inverse race: handle_miss marks the deadline missed and queues the callback + BEFORE the on-time prune runs. prune must NOT delete a deadline already marked + ``missed`` — its callback is owned by the scheduler/triggerer, and cascade-deleting + the deadline would silently drop that queued callback (a lost-callback window). + + prune_deadlines explicitly filters ``~Deadline.missed`` so a missed deadline (and its + queued callback) survives even if the DagRun's end_date would otherwise match the + on-time predicate. (In the real scheduler a missed deadline always has + ``deadline_time < now <= end_date`` so it can't match the on-time predicate anyway; + the explicit filter makes the invariant robust to future callers and clock skew.) + """ + d = _make_deadline(session, dagrun_id=dagrun.id, deadline_time=DEFAULT_DATE + timedelta(hours=1)) + # Reach the post-miss state directly: the deadline is marked ``missed`` and its callback + # is queued (owned by the scheduler/triggerer). We set this up explicitly rather than via + # ``handle_miss`` so the test pins the prune ``~missed`` guard in isolation, independent of + # how the missed state is produced. + d.callback.queue(session=session) + d.missed = True + session.add_all([d, d.callback]) + session.flush() + assert d.missed is True + deadline_id = d.id + callback_id = d.callback.id + + # DagRun reports on-time completion (end_date <= deadline_time) — the exact condition + # that would have pruned the row before the ~missed guard was added. + dagrun.end_date = DEFAULT_DATE + session.add(dagrun) + session.flush() + + deleted = Deadline.prune_deadlines(session=session, conditions={DagRun.id: dagrun.id}) + session.flush() + + # The missed deadline and its queued callback both survive. + assert deleted == 0 + assert session.get(Deadline, deadline_id) is not None + assert session.get(Callback, callback_id) is not None diff --git a/airflow-core/tests/unit/serialization/test_deadline_reference_registry.py b/airflow-core/tests/unit/serialization/test_deadline_reference_registry.py index feae74bf98043..ff54a6b2569a8 100644 --- a/airflow-core/tests/unit/serialization/test_deadline_reference_registry.py +++ b/airflow-core/tests/unit/serialization/test_deadline_reference_registry.py @@ -98,3 +98,85 @@ def test_serialized_custom_reference_rejects_unregistered(monkeypatch): SerializedReferenceModels.SerializedCustomReference.deserialize_reference( {"__class_path": "some.other.module.UnregisteredReference"} ) + + +@pytest.mark.parametrize( + "reference_data", + [ + pytest.param({"reference_type": "TotallyUnknownReference"}, id="unknown_type_no_class_path"), + pytest.param({"reference_type": "X", "__class_path": ""}, id="empty_class_path"), + ], +) +def test_serialized_custom_reference_missing_class_path_raises_clear_error(reference_data): + """A reference routed to SerializedCustomReference but lacking a usable ``__class_path`` + (corrupt / hand-edited row, blob from a newer version, or a custom ref whose plugin is gone) + must raise a clear ValueError — NOT a bare ``KeyError: '__class_path'``.""" + with pytest.raises(ValueError, match="unrecognized reference_type"): + SerializedReferenceModels.SerializedCustomReference.deserialize_reference(reference_data) + + +class FixedDatetimeDeadline(ReferenceModels.BaseDeadlineReference): + """Custom reference whose class name deliberately collides with a builtin reference name.""" + + required_kwargs: set[str] = set() + + def serialize_reference(self) -> dict: + return {"reference_type": self.reference_name, "marker": "i-am-custom"} + + def _evaluate_with(self, *, session, **kwargs): + raise AssertionError("custom evaluate should not be exercised in this test") + + +class DagRunLogicalDateDeadline(ReferenceModels.BaseDeadlineReference): + """Custom reference colliding with a builtin that has no required deserialize fields.""" + + required_kwargs: set[str] = set() + + def serialize_reference(self) -> dict: + return {"reference_type": self.reference_name} + + def _evaluate_with(self, *, session, **kwargs): + raise AssertionError("custom evaluate should not be exercised in this test") + + +_COLLIDING_REFS = { + f"{FixedDatetimeDeadline.__module__}.FixedDatetimeDeadline": FixedDatetimeDeadline, + f"{DagRunLogicalDateDeadline.__module__}.DagRunLogicalDateDeadline": DagRunLogicalDateDeadline, +} + + +@pytest.fixture +def colliding_plugin_registry(monkeypatch): + """Advertise custom references whose names collide with builtin reference names.""" + monkeypatch.setattr( + plugins_manager, + "get_deadline_references_plugins", + lambda: _COLLIDING_REFS, + ) + return _COLLIDING_REFS + + +@pytest.mark.parametrize( + "custom_cls", + [FixedDatetimeDeadline], +) +def test_custom_reference_name_collision_routes_to_custom(colliding_plugin_registry, custom_cls): + """ + A custom reference whose class name collides with a builtin must round-trip as the custom + class, not silently decode as the builtin (which loses the user's evaluation logic or raises + a spurious KeyError on builtin-only fields). + + Regression test: ``decode_deadline_reference`` previously routed solely by the + ``reference_type`` name string, ignoring the authoritative ``__class_path`` key. + """ + from airflow.serialization.decoders import decode_deadline_reference + from airflow.serialization.encoders import encode_deadline_reference + + encoded = encode_deadline_reference(custom_cls()) + # The encoder stamps __class_path for custom references regardless of name collision. + assert encoded["__class_path"] == f"{custom_cls.__module__}.{custom_cls.__name__}" + + decoded = decode_deadline_reference(encoded) + + assert isinstance(decoded, SerializedReferenceModels.SerializedCustomReference) + assert isinstance(decoded.inner_ref, custom_cls)