From 2342c0389c227811dbe98c0f5a597dad266623d6 Mon Sep 17 00:00:00 2001 From: Sean Ghaeli Date: Tue, 23 Jun 2026 19:08:11 +0000 Subject: [PATCH] Resolve VariableInterval deadlines safely at DagRun creation A ``DeadlineAlert`` configured with a ``VariableInterval`` is resolved when the scheduler creates a DagRun, inside ``DAG._process_dagrun_deadline_alerts`` (which runs under the ``prohibit_commit`` guard). Two problems are fixed: 1. Resolution now goes through the full secrets chain (env vars, configured secrets backends, then the metadata DB) via a dedicated ``_resolve_variable_interval`` helper, rather than ``Variable.get`` / ``begin_nested``. ``Variable.get`` and a SAVEPOINT release both commit on the scheduler's session, tripping ``prohibit_commit`` ("UNEXPECTED COMMIT") and silently dropping deadlines for every scheduled DagRun. The helper passes the scheduler session through to the metastore backend so the DB read happens without committing, and reading via the secrets chain (not the variable table directly) means ``AIRFLOW_VAR_*`` env vars and secrets backends resolve too. 2. Each deadline alert is isolated with a plain ``try``/``except`` (NOT ``begin_nested``). Creating a deadline is auxiliary to creating the DagRun; a single bad alert -- a missing/invalid backing Variable, or an undecodable serialized blob -- must never abort the DagRun and stop the DAG scheduling. ``VariableInterval.resolve`` is split into ``resolve`` + ``coerce_to_timedelta`` so the scheduler-side reader reuses the exact same validation (including the OverflowError -> ValueError translation) without going through ``Variable.get``. 
Generated-by: Claude Code (Sonnet/Opus via Claude Code) on behalf of Sean Ghaeli --- airflow-core/newsfragments/68917.bugfix.rst | 1 + .../airflow/serialization/definitions/dag.py | 154 ++++++++++++----- airflow-core/tests/unit/models/test_dagrun.py | 156 ++++++++++++------ .../metrics/metrics_template.yaml | 6 + .../src/airflow/sdk/definitions/deadline.py | 25 ++- .../task_sdk/definitions/test_deadline.py | 15 ++ 6 files changed, 267 insertions(+), 90 deletions(-) create mode 100644 airflow-core/newsfragments/68917.bugfix.rst diff --git a/airflow-core/newsfragments/68917.bugfix.rst b/airflow-core/newsfragments/68917.bugfix.rst new file mode 100644 index 0000000000000..71cbdf9b05ef1 --- /dev/null +++ b/airflow-core/newsfragments/68917.bugfix.rst @@ -0,0 +1 @@ +Deadline alerts using a ``VariableInterval`` no longer risk aborting DagRun creation. The interval is now resolved through the full secrets chain (environment variables, configured secrets backends, then the metadata database) on the scheduler's own session, so ``AIRFLOW_VAR_*`` and secrets-backend-backed Variables resolve correctly and the read does not commit inside the scheduler's ``prohibit_commit`` guard. Each deadline alert is also isolated: a single unresolvable or undecodable alert is logged and skipped instead of preventing the DagRun from being created. 
diff --git a/airflow-core/src/airflow/serialization/definitions/dag.py b/airflow-core/src/airflow/serialization/definitions/dag.py index 9dc816eb9cc45..1cce22bb90b5f 100644 --- a/airflow-core/src/airflow/serialization/definitions/dag.py +++ b/airflow-core/src/airflow/serialization/definitions/dag.py @@ -715,51 +715,125 @@ def _process_dagrun_deadline_alerts( if not deadline_alert: continue - deserialized_deadline_alert = decode_deadline_alert( - { - Encoding.TYPE: DAT.DEADLINE_ALERT, - Encoding.VAR: { - DeadlineAlertFields.REFERENCE: deadline_alert.reference, - DeadlineAlertFields.INTERVAL: deadline_alert.interval, - DeadlineAlertFields.CALLBACK: deadline_alert.callback_def, - }, - } - ) + # Isolate each deadline alert: creating a deadline is auxiliary to creating the + # DagRun itself, and must never prevent the DagRun from being created. A single bad + # alert -- e.g. a ``VariableInterval`` whose backing Variable is missing / non-integer + # / <= 0 (``coerce_to_timedelta`` raises ``ValueError``), or a reference whose + # ``evaluate_with`` fails -- would otherwise propagate out of ``create_dagrun`` and + # abort the whole run, silently stopping the DAG from scheduling. + # + # Isolation is done with a plain ``try``/``except`` and MUST NOT use + # ``session.begin_nested()``: ``create_dagrun`` runs on the scheduler session inside + # the ``prohibit_commit`` guard, and releasing a SAVEPOINT issues a commit, which trips + # that guard (``RuntimeError("UNEXPECTED COMMIT ...")``) and -- because this very block + # swallows it -- silently skips deadline creation for *every* scheduled DagRun. The + # try/except alone is sufficient: the only DB mutation in the loop body is the final + # ``session.add`` (everything before it is a decode, an in-memory resolution, or a + # read-only ``evaluate_with`` query), so an exception leaves no partial state to undo, + # and the pending ``Deadline`` is persisted by the caller's outer transaction. 
+ try: + deserialized_deadline_alert = decode_deadline_alert( + { + Encoding.TYPE: DAT.DEADLINE_ALERT, + Encoding.VAR: { + DeadlineAlertFields.REFERENCE: deadline_alert.reference, + DeadlineAlertFields.INTERVAL: deadline_alert.interval, + DeadlineAlertFields.CALLBACK: deadline_alert.callback_def, + }, + } + ) - interval = deserialized_deadline_alert.interval + interval = deserialized_deadline_alert.interval - if isinstance(interval, VariableInterval): - interval = interval.resolve() + if isinstance(interval, VariableInterval): + interval = self._resolve_variable_interval(interval, session=session) - if isinstance(deserialized_deadline_alert.reference, SerializedReferenceModels.TYPES.DAGRUN): - deadline_time = deserialized_deadline_alert.reference.evaluate_with( - session=session, - interval=interval, - # TODO : Pretty sure we can drop these last two; verify after testing is complete - dag_id=self.dag_id, - run_id=orm_dagrun.run_id, - ) + if isinstance(deserialized_deadline_alert.reference, SerializedReferenceModels.TYPES.DAGRUN): + deadline_time = deserialized_deadline_alert.reference.evaluate_with( + session=session, + interval=interval, + # TODO : Pretty sure we can drop these last two; verify after testing is complete + dag_id=self.dag_id, + run_id=orm_dagrun.run_id, + ) - if deadline_time is not None: - session.add( - Deadline( - deadline_time=deadline_time, - callback=deserialized_deadline_alert.callback, - dagrun_id=orm_dagrun.id, - deadline_alert_id=deadline_alert.id, - dag_id=orm_dagrun.dag_id, - bundle_name=orm_dagrun.dag_model.bundle_name, + if deadline_time is not None: + session.add( + Deadline( + deadline_time=deadline_time, + callback=deserialized_deadline_alert.callback, + dagrun_id=orm_dagrun.id, + deadline_alert_id=deadline_alert.id, + dag_id=orm_dagrun.dag_id, + bundle_name=orm_dagrun.dag_model.bundle_name, + ) ) - ) - team_name = ( - DagModel.get_team_name(self.dag_id, session=session) - if airflow_conf.getboolean("core", "multi_team") - else 
None - ) - stats.incr( - "deadline_alerts.deadline_created", - tags=prune_dict({"dag_id": self.dag_id, "team_name": team_name}), - ) + team_name = ( + DagModel.get_team_name(self.dag_id, session=session) + if airflow_conf.getboolean("core", "multi_team") + else None + ) + stats.incr( + "deadline_alerts.deadline_created", + tags=prune_dict({"dag_id": self.dag_id, "team_name": team_name}), + ) + except Exception: + log.exception( + "Failed to create deadline for alert %s on DagRun %s (dag_id=%s); " + "skipping this deadline, the DagRun is unaffected", + getattr(deadline_alert, "id", ""), + orm_dagrun.run_id, + self.dag_id, + ) + stats.incr("deadline_alerts.deadline_creation_failed", tags={"dag_id": self.dag_id}) + + @staticmethod + def _resolve_variable_interval(interval: VariableInterval, *, session: Session) -> datetime.timedelta: + """ + Resolve a ``VariableInterval`` to a concrete ``timedelta`` at DagRun creation time. + + The backing Variable is read through the full secrets chain (env vars, configured + secrets backends, then the metadata DB) -- the same resolution order as + ``Variable.get`` -- rather than reading only the ``variable`` table. Reading the table + directly would bypass ``AIRFLOW_VAR_*`` env vars and secrets backends, so a Variable + that lives there would resolve to ``None`` and the deadline would be silently dropped. + + We must NOT call ``Variable.get`` / ``get_variable_from_secrets`` here: the metastore + backend's ``get_variable`` is ``@provide_session`` and, given no session, opens the + thread-local scoped session (the SAME one the scheduler holds) whose context-manager + exit COMMITS -- tripping the ``prohibit_commit`` guard ``create_dagrun`` runs under. + Instead we iterate the backends ourselves and pass the scheduler's ``session`` through + to the metastore backend (env / custom backends ignore it), so the DB read happens on + our session with no commit. + + :param interval: The ``VariableInterval`` to resolve. 
+ :param session: The scheduler session, passed through to the metastore backend. + :return: The resolved ``timedelta``. + :raises ValueError: If the Variable cannot be found in any backend, or its value + cannot be coerced to a positive ``timedelta``. + """ + from airflow._shared.secrets_backend.base import call_secrets_backend_method + from airflow.configuration import ensure_secrets_loaded + from airflow.secrets.metastore import MetastoreBackend + + var_val = None + for secrets_backend in ensure_secrets_loaded(): + kwargs = {"session": session} if isinstance(secrets_backend, MetastoreBackend) else {} + var_val = call_secrets_backend_method( + secrets_backend.get_variable, team_name=None, key=interval.key, **kwargs + ) + if var_val is not None: + break + + if var_val is None: + # Fail loudly (the per-alert ``except`` isolates it): the Variable does not exist in + # any backend, so we cannot resolve the interval. This is not a silent skip. + raise ValueError( + f"VariableInterval '{interval.key}' could not be resolved from any " + f"secrets backend, environment variable, or the metadata database" + ) + + return interval.coerce_to_timedelta(var_val) @provide_session def set_task_instance_state( diff --git a/airflow-core/tests/unit/models/test_dagrun.py b/airflow-core/tests/unit/models/test_dagrun.py index 290e616dd99ec..6b07eb80a69de 100644 --- a/airflow-core/tests/unit/models/test_dagrun.py +++ b/airflow-core/tests/unit/models/test_dagrun.py @@ -73,8 +73,6 @@ ) from airflow.sdk.definitions.callback import AsyncCallback from airflow.sdk.definitions.deadline import DeadlineAlert, DeadlineReference, VariableInterval -from airflow.sdk.definitions.variable import Variable -from airflow.sdk.exceptions import AirflowRuntimeError from airflow.serialization.definitions.deadline import SerializedReferenceModels from airflow.serialization.serialized_objects import LazyDeserializedDAG from airflow.settings import get_policy_plugin_manager @@ -1386,16 +1384,27 @@ def 
test_dag_run_dag_versions_with_null_created_dag_version(self, dag_maker, ses VariableInterval("my_key"), ], ) - @mock.patch.object(Variable, "get") @mock.patch.object(Deadline, "prune_deadlines") - def test_dagrun_success_deadline(self, _, mock_get, interval, session, deadline_test_dag): + def test_dagrun_success_deadline(self, _, interval, session, deadline_test_dag): def on_success_callable(context): assert context["dag_run"].dag_id == "test_dag" - future_date = datetime.datetime.now() + datetime.timedelta(days=365) - - # First value used during resolution - mock_get.return_value = "5" + # Fixed future date (repo standard: no datetime.now() in tests). Far enough ahead that the + # FIXED_DATETIME deadline never fires, but within MySQL's TIMESTAMP range (< 2038-01-19) so + # the deadline_time UtcDateTime column accepts it on all backends. + future_date = datetime.datetime(2037, 1, 1, tzinfo=datetime.timezone.utc) + + # Seed a REAL Variable row so the VariableInterval resolves through the actual + # secrets chain (metastore backend) on the scheduler's session -- the code under test no + # longer calls Variable.get, so mocking it would be a false green (see regression: reading + # only the variable table bypassed env/secrets resolution). + if isinstance(interval, VariableInterval): + # Seed via the metastore model (not the SDK Variable imported above, whose set() routes + # through SUPERVISOR_COMMS) so the row really lands in the variable table on this session. 
+ from airflow.models.variable import Variable as VariableModel + + VariableModel.set(key="my_key", value="5", session=session) + session.flush() scheduler_dag = deadline_test_dag( deadline=DeadlineAlert( @@ -1505,71 +1514,122 @@ def test_dagrun_success_handles_empty_deadline_list(self, mock_prune, dag_maker, mock_prune.assert_not_called() assert dag_run.state == DagRunState.SUCCESS - @mock.patch.object(Variable, "get") @mock.patch.object(Deadline, "prune_deadlines") - def test_dagrun_deadline_variable_interval_stable(self, _, mock_get, session, deadline_test_dag): - future_date = datetime.datetime.now() + datetime.timedelta(days=365) - - # First value used during resolution. - mock_get.return_value = "60" + def test_dagrun_deadline_variable_interval_missing_variable_is_isolated( + self, _, session, deadline_test_dag + ): + """A VariableInterval whose backing Variable is missing must NOT abort DagRun creation. + + Resolving a ``VariableInterval`` raises ``ValueError`` for a missing/invalid Variable. + That resolution happens inside ``_process_dagrun_deadline_alerts`` during + ``create_dagrun``; previously the error propagated out and aborted the whole run, + silently stopping the DAG from scheduling. The per-alert ``try``/``except`` now isolates + the failure: the DagRun is created, the bad deadline is skipped (logged), and no Deadline + row is written. (Isolation must NOT use ``begin_nested`` here -- ``create_dagrun`` runs + under the scheduler ``prohibit_commit`` guard, where a SAVEPOINT release would raise + ``UNEXPECTED COMMIT`` and skip every scheduled DagRun's deadlines.) + """ + # No Variable named "missing_key" is seeded in any backend, so the scheduler-side resolver + # returns None -> ValueError(...could not be resolved...), isolated by the per-alert except. + # Fixed future date (repo standard: no datetime.now() in tests). 
Far enough ahead that the + # FIXED_DATETIME deadline never fires, but within MySQL's TIMESTAMP range (< 2038-01-19) so + # the deadline_time UtcDateTime column accepts it on all backends. + future_date = datetime.datetime(2037, 1, 1, tzinfo=datetime.timezone.utc) scheduler_dag = deadline_test_dag( deadline=DeadlineAlert( reference=DeadlineReference.FIXED_DATETIME(future_date), - interval=VariableInterval("my_key"), + interval=VariableInterval("missing_key"), callback=AsyncCallback(empty_callback_for_deadline), ), ) + # DagRun creation must succeed despite the unresolvable VariableInterval. dag_run = self.create_dag_run( dag=scheduler_dag, - task_states={"task_1": TaskInstanceState.SUCCESS, "task_2": TaskInstanceState.SUCCESS}, + task_states={"task_1": TaskInstanceState.SUCCESS}, session=session, ) - dag_run.dag = scheduler_dag - - # First update resolve interval to "5". - dag_run.update_state(session=session) + assert dag_run is not None + # The bad deadline was skipped — no Deadline row created. deadline = session.execute(select(Deadline)).scalars().one_or_none() - first_deadline_time = deadline.deadline_time + assert deadline is None - # Change Variable value after resolution. - mock_get.return_value = "120" + @mock.patch.object(Deadline, "prune_deadlines") + def test_dagrun_deadline_variable_interval_resolves_from_env_var( + self, _, session, deadline_test_dag, monkeypatch + ): + """A VariableInterval backed by an ``AIRFLOW_VAR_*`` env var (no DB row) must resolve. - # Run again (This should not change existing deadline). - dag_run.update_state(session=session) + Regression guard: the scheduler-side resolver must go through the full secrets chain + (env vars + secrets backends + metadata DB), not read only the ``variable`` table. A + table-only read returns None for an env/secrets-backed Variable, and the per-alert + ``except`` then silently drops the deadline. 
Here the Variable lives ONLY in the + environment, so a correct resolver creates the Deadline and a regressed one drops it. + """ + # Variable exists only as an env var — never written to the variable table. + # VariableInterval values are interpreted as SECONDS (see coerce_to_timedelta). + monkeypatch.setenv("AIRFLOW_VAR_ENV_INTERVAL_KEY", "7") + future_date = datetime.datetime(2037, 1, 1, tzinfo=datetime.timezone.utc) + + scheduler_dag = deadline_test_dag( + deadline=DeadlineAlert( + reference=DeadlineReference.FIXED_DATETIME(future_date), + interval=VariableInterval("env_interval_key"), + callback=AsyncCallback(empty_callback_for_deadline), + ), + ) + + dag_run = self.create_dag_run( + dag=scheduler_dag, + task_states={"task_1": TaskInstanceState.SUCCESS}, + session=session, + ) + assert dag_run is not None + # The env-var-backed interval resolved (7 seconds) -> a Deadline row WAS created. deadline = session.execute(select(Deadline)).scalars().one_or_none() - assert deadline.deadline_time == first_deadline_time + assert deadline is not None + assert deadline.deadline_time == future_date + datetime.timedelta(seconds=7) @mock.patch.object(Deadline, "prune_deadlines") - def test_dagrun_deadline_variable_interval_missing_variable_fails(self, _, session, deadline_test_dag): - mock_err = mock.Mock() - mock_err.error.value = "MISSING_DEADLINE" - mock_err.detail = "missing deadline" + def test_dagrun_deadline_decode_failure_is_isolated(self, _, session, deadline_test_dag): + """A deadline alert that fails to *decode* must NOT abort DagRun creation either. + + ``decode_deadline_alert`` can raise for a malformed/legacy serialized blob (e.g. a + None interval after a partial downgrade, an invalid interval type, or a corrupt + reference dict). 
That decode happens inside the per-alert ``try``/``except``, so the + failure is isolated just like a resolve-time failure: the DagRun is created and the bad + deadline skipped, rather than the corrupt row taking down scheduling for the whole DAG. + """ + # Fixed future date (repo standard: no datetime.now() in tests). Far enough ahead that the + # FIXED_DATETIME deadline never fires, but within MySQL's TIMESTAMP range (< 2038-01-19) so + # the deadline_time UtcDateTime column accepts it on all backends. + future_date = datetime.datetime(2037, 1, 1, tzinfo=datetime.timezone.utc) + scheduler_dag = deadline_test_dag( + deadline=DeadlineAlert( + reference=DeadlineReference.FIXED_DATETIME(future_date), + interval=datetime.timedelta(hours=1), + callback=AsyncCallback(empty_callback_for_deadline), + ), + ) - with mock.patch.object( - Variable, - "get", - side_effect=AirflowRuntimeError(mock_err), + # Simulate a corrupt/legacy blob: decoding the alert raises. + with mock.patch( + "airflow.serialization.definitions.dag.decode_deadline_alert", + side_effect=ValueError("corrupt deadline alert blob"), ): - future_date = datetime.datetime.now() + datetime.timedelta(days=365) - - scheduler_dag = deadline_test_dag( - deadline=DeadlineAlert( - reference=DeadlineReference.FIXED_DATETIME(future_date), - interval=VariableInterval("missing_key"), - callback=AsyncCallback(empty_callback_for_deadline), - ), + dag_run = self.create_dag_run( + dag=scheduler_dag, + task_states={"task_1": TaskInstanceState.SUCCESS}, + session=session, ) - with pytest.raises(ValueError, match="not found"): - self.create_dag_run( - dag=scheduler_dag, - task_states={"task_1": TaskInstanceState.SUCCESS}, - session=session, - ) + assert dag_run is not None + # No Deadline row — the undecodable alert was skipped, not fatal. 
+ deadline = session.execute(select(Deadline)).scalars().one_or_none() + assert deadline is None @pytest.mark.parametrize( diff --git a/shared/observability/src/airflow_shared/observability/metrics/metrics_template.yaml b/shared/observability/src/airflow_shared/observability/metrics/metrics_template.yaml index 6c51e32ff7798..06cd7efcdd749 100644 --- a/shared/observability/src/airflow_shared/observability/metrics/metrics_template.yaml +++ b/shared/observability/src/airflow_shared/observability/metrics/metrics_template.yaml @@ -309,6 +309,12 @@ metrics: legacy_name: "-" name_variables: [] + - name: "deadline_alerts.deadline_creation_failed" + description: "Number of deadline alerts that could not be created (alert decoding or interval/reference resolution raised)" + type: "counter" + legacy_name: "-" + name_variables: [] + - name: "deadline_alerts.deadline_missed" description: "Number of deadline alerts that fired because a Dag run missed its deadline" type: "counter" diff --git a/task-sdk/src/airflow/sdk/definitions/deadline.py b/task-sdk/src/airflow/sdk/definitions/deadline.py index a9da3dd3d3ea2..a791dff85761b 100644 --- a/task-sdk/src/airflow/sdk/definitions/deadline.py +++ b/task-sdk/src/airflow/sdk/definitions/deadline.py @@ -389,9 +389,19 @@ def resolve(self) -> timedelta: value = Variable.get(self.key) except AirflowRuntimeError as e: raise ValueError(f"VariableInterval '{self.key}' not found") from e + return self.coerce_to_timedelta(value) + def coerce_to_timedelta(self, value: str | int | float | None) -> timedelta: + """ + Validate a raw Variable value and convert it into a ``timedelta``. + + Split out from :meth:`resolve` so callers that already hold the Variable value (e.g. a + scheduler-side reader that must fetch it on its own session — see + ``DAG._process_dagrun_deadline_alerts`` — to avoid committing inside ``prohibit_commit``) + can reuse the exact same validation without going through ``Variable.get``. 
+ """ try: - seconds = int(value) + seconds = int(value) # type: ignore[arg-type] # None/non-numeric handled by the except below except (TypeError, ValueError) as e: raise ValueError( f"VariableInterval '{self.key}' must be an integer (seconds), got: {value!r}" @@ -400,4 +410,15 @@ def resolve(self) -> timedelta: if seconds <= 0: raise ValueError(f"VariableInterval '{self.key}' must be > 0, got: {seconds}") - return timedelta(seconds=seconds) + try: + return timedelta(seconds=seconds) + except OverflowError as e: + # ``timedelta`` caps at ~999999999 days; a wildly out-of-range value (e.g. a + # fat-fingered ``deadline_seconds`` meant as ms) raises OverflowError, which is NOT + # a ValueError subclass and would otherwise escape this method's documented + # "raises ValueError on bad input" contract with a cryptic C-int message. Translate + # it to the same clean ValueError so callers (and the deadline-creation isolation) + # get a consistent, actionable error. + raise ValueError( + f"VariableInterval '{self.key}' is too large to be a valid interval: {seconds} seconds" + ) from e diff --git a/task-sdk/tests/task_sdk/definitions/test_deadline.py b/task-sdk/tests/task_sdk/definitions/test_deadline.py index b104980e4c986..8c24fb11b1524 100644 --- a/task-sdk/tests/task_sdk/definitions/test_deadline.py +++ b/task-sdk/tests/task_sdk/definitions/test_deadline.py @@ -212,3 +212,18 @@ def test_resolve_invalid(self, mocker, value, raise_runtime, match): with pytest.raises(ValueError, match=match): interval.resolve() + + @pytest.mark.parametrize( + ("value", "match"), + [ + ("abc", "must be an integer"), + ("", "must be an integer"), + (None, "must be an integer"), + ("0", "must be > 0"), + ("-5", "must be > 0"), + ("99999999999999", "too large to be a valid interval"), + ], + ) + def test_coerce_to_timedelta_invalid(self, value, match): + with pytest.raises(ValueError, match=match): + VariableInterval(key="k").coerce_to_timedelta(value)