diff --git a/airflow-core/newsfragments/68917.bugfix.rst b/airflow-core/newsfragments/68917.bugfix.rst new file mode 100644 index 0000000000000..71cbdf9b05ef1 --- /dev/null +++ b/airflow-core/newsfragments/68917.bugfix.rst @@ -0,0 +1 @@ +Deadline alerts using a ``VariableInterval`` no longer risk aborting DagRun creation. The interval is now resolved through the full secrets chain (environment variables, configured secrets backends, then the metadata database) on the scheduler's own session, so ``AIRFLOW_VAR_*`` and secrets-backend-backed Variables resolve correctly and the read does not commit inside the scheduler's ``prohibit_commit`` guard. Each deadline alert is also isolated: a single unresolvable or undecodable alert is logged and skipped instead of preventing the DagRun from being created. diff --git a/airflow-core/src/airflow/serialization/definitions/dag.py b/airflow-core/src/airflow/serialization/definitions/dag.py index 9dc816eb9cc45..1cce22bb90b5f 100644 --- a/airflow-core/src/airflow/serialization/definitions/dag.py +++ b/airflow-core/src/airflow/serialization/definitions/dag.py @@ -715,51 +715,125 @@ def _process_dagrun_deadline_alerts( if not deadline_alert: continue - deserialized_deadline_alert = decode_deadline_alert( - { - Encoding.TYPE: DAT.DEADLINE_ALERT, - Encoding.VAR: { - DeadlineAlertFields.REFERENCE: deadline_alert.reference, - DeadlineAlertFields.INTERVAL: deadline_alert.interval, - DeadlineAlertFields.CALLBACK: deadline_alert.callback_def, - }, - } - ) + # Isolate each deadline alert: creating a deadline is auxiliary to creating the + # DagRun itself, and must never prevent the DagRun from being created. A single bad + # alert -- e.g. a ``VariableInterval`` whose backing Variable is missing / non-integer + # / <= 0 (``coerce_to_timedelta`` raises ``ValueError``), or a reference whose + # ``evaluate_with`` fails -- would otherwise propagate out of ``create_dagrun`` and + # abort the whole run, silently stopping the DAG from scheduling. + # + # Isolation is done with a plain ``try``/``except`` and MUST NOT use + # ``session.begin_nested()``: ``create_dagrun`` runs on the scheduler session inside + # the ``prohibit_commit`` guard, and releasing a SAVEPOINT issues a commit, which trips + # that guard (``RuntimeError("UNEXPECTED COMMIT ...")``) and -- because this very block + # swallows it -- silently skips deadline creation for *every* scheduled DagRun. The + # try/except alone is sufficient: the only DB mutation in the loop body is the final + # ``session.add`` (everything before it is a decode, an in-memory resolution, or a + # read-only ``evaluate_with`` query), so an exception leaves no partial state to undo, + # and the pending ``Deadline`` is persisted by the caller's outer transaction. + try: + deserialized_deadline_alert = decode_deadline_alert( + { + Encoding.TYPE: DAT.DEADLINE_ALERT, + Encoding.VAR: { + DeadlineAlertFields.REFERENCE: deadline_alert.reference, + DeadlineAlertFields.INTERVAL: deadline_alert.interval, + DeadlineAlertFields.CALLBACK: deadline_alert.callback_def, + }, + } + ) - interval = deserialized_deadline_alert.interval + interval = deserialized_deadline_alert.interval - if isinstance(interval, VariableInterval): - interval = interval.resolve() + if isinstance(interval, VariableInterval): + interval = self._resolve_variable_interval(interval, session=session) - if isinstance(deserialized_deadline_alert.reference, SerializedReferenceModels.TYPES.DAGRUN): - deadline_time = deserialized_deadline_alert.reference.evaluate_with( - session=session, - interval=interval, - # TODO : Pretty sure we can drop these last two; verify after testing is complete - dag_id=self.dag_id, - run_id=orm_dagrun.run_id, - ) + if isinstance(deserialized_deadline_alert.reference, SerializedReferenceModels.TYPES.DAGRUN): + deadline_time = deserialized_deadline_alert.reference.evaluate_with( + session=session, + interval=interval, + # TODO : Pretty sure we can drop these last two; verify after testing is complete + dag_id=self.dag_id, + run_id=orm_dagrun.run_id, + ) - if deadline_time is not None: - session.add( - Deadline( - deadline_time=deadline_time, - callback=deserialized_deadline_alert.callback, - dagrun_id=orm_dagrun.id, - deadline_alert_id=deadline_alert.id, - dag_id=orm_dagrun.dag_id, - bundle_name=orm_dagrun.dag_model.bundle_name, + if deadline_time is not None: + session.add( + Deadline( + deadline_time=deadline_time, + callback=deserialized_deadline_alert.callback, + dagrun_id=orm_dagrun.id, + deadline_alert_id=deadline_alert.id, + dag_id=orm_dagrun.dag_id, + bundle_name=orm_dagrun.dag_model.bundle_name, + ) ) - ) - team_name = ( - DagModel.get_team_name(self.dag_id, session=session) - if airflow_conf.getboolean("core", "multi_team") - else None - ) - stats.incr( - "deadline_alerts.deadline_created", - tags=prune_dict({"dag_id": self.dag_id, "team_name": team_name}), - ) + team_name = ( + DagModel.get_team_name(self.dag_id, session=session) + if airflow_conf.getboolean("core", "multi_team") + else None + ) + stats.incr( + "deadline_alerts.deadline_created", + tags=prune_dict({"dag_id": self.dag_id, "team_name": team_name}), + ) + except Exception: + log.exception( + "Failed to create deadline for alert %s on DagRun %s (dag_id=%s); " + "skipping this deadline, the DagRun is unaffected", + getattr(deadline_alert, "id", ""), + orm_dagrun.run_id, + self.dag_id, + ) + stats.incr("deadline_alerts.deadline_creation_failed", tags={"dag_id": self.dag_id}) + + @staticmethod + def _resolve_variable_interval(interval: VariableInterval, *, session: Session) -> datetime.timedelta: + """ + Resolve a ``VariableInterval`` to a concrete ``timedelta`` at DagRun creation time. + + The backing Variable is read through the full secrets chain (env vars, configured + secrets backends, then the metadata DB) -- the same resolution order as + ``Variable.get`` -- rather than reading only the ``variable`` table. Reading the table + directly would bypass ``AIRFLOW_VAR_*`` env vars and secrets backends, so a Variable + that lives there would resolve to ``None`` and the deadline would be silently dropped. + + We must NOT call ``Variable.get`` / ``get_variable_from_secrets`` here: the metastore + backend's ``get_variable`` is ``@provide_session`` and, given no session, opens the + thread-local scoped session (the SAME one the scheduler holds) whose context-manager + exit COMMITS -- tripping the ``prohibit_commit`` guard ``create_dagrun`` runs under. + Instead we iterate the backends ourselves and pass the scheduler's ``session`` through + to the metastore backend (env / custom backends ignore it), so the DB read happens on + our session with no commit. + + :param interval: The ``VariableInterval`` to resolve. + :param session: The scheduler session, passed through to the metastore backend. + :return: The resolved ``timedelta``. + :raises ValueError: If the Variable cannot be found in any backend, or its value + cannot be coerced to a positive ``timedelta``. + """ + from airflow._shared.secrets_backend.base import call_secrets_backend_method + from airflow.configuration import ensure_secrets_loaded + from airflow.secrets.metastore import MetastoreBackend + + var_val = None + for secrets_backend in ensure_secrets_loaded(): + kwargs = {"session": session} if isinstance(secrets_backend, MetastoreBackend) else {} + var_val = call_secrets_backend_method( + secrets_backend.get_variable, team_name=None, key=interval.key, **kwargs + ) + if var_val is not None: + break + + if var_val is None: + # Fail loudly (the per-alert ``except`` isolates it): the Variable does not exist in + # any backend, so we cannot resolve the interval. This is not a silent skip. + raise ValueError( + f"VariableInterval '{interval.key}' could not be resolved from any " + f"secrets backend, environment variable, or the metadata database" + ) + + return interval.coerce_to_timedelta(var_val) @provide_session def set_task_instance_state( diff --git a/airflow-core/tests/unit/models/test_dagrun.py b/airflow-core/tests/unit/models/test_dagrun.py index a2e852f583449..5a8f87c37f513 100644 --- a/airflow-core/tests/unit/models/test_dagrun.py +++ b/airflow-core/tests/unit/models/test_dagrun.py @@ -73,8 +73,6 @@ ) from airflow.sdk.definitions.callback import AsyncCallback from airflow.sdk.definitions.deadline import DeadlineAlert, DeadlineReference, VariableInterval -from airflow.sdk.definitions.variable import Variable -from airflow.sdk.exceptions import AirflowRuntimeError from airflow.serialization.definitions.deadline import SerializedReferenceModels from airflow.serialization.serialized_objects import LazyDeserializedDAG from airflow.settings import get_policy_plugin_manager @@ -1386,16 +1384,27 @@ def test_dag_run_dag_versions_with_null_created_dag_version(self, dag_maker, ses VariableInterval("my_key"), ], ) - @mock.patch.object(Variable, "get") @mock.patch.object(Deadline, "prune_deadlines") - def test_dagrun_success_deadline(self, _, mock_get, interval, session, deadline_test_dag): + def test_dagrun_success_deadline(self, _, interval, session, deadline_test_dag): def on_success_callable(context): assert context["dag_run"].dag_id == "test_dag" - future_date = datetime.datetime.now() + datetime.timedelta(days=365) - - # First value used during resolution - mock_get.return_value = "5" + # Fixed future date (repo standard: no datetime.now() in tests). Far enough ahead that the + # FIXED_DATETIME deadline never fires, but within MySQL's TIMESTAMP range (< 2038-01-19) so + # the deadline_time UtcDateTime column accepts it on all backends. + future_date = datetime.datetime(2037, 1, 1, tzinfo=datetime.timezone.utc) + + # Seed a REAL Variable row so the VariableInterval resolves through the actual + # secrets chain (metastore backend) on the scheduler's session -- the code under test no + # longer calls Variable.get, so mocking it would be a false green (see regression: reading + # only the variable table bypassed env/secrets resolution). + if isinstance(interval, VariableInterval): + # Seed via the metastore model (not the SDK Variable imported above, whose set() routes + # through SUPERVISOR_COMMS) so the row really lands in the variable table on this session. + from airflow.models.variable import Variable as VariableModel + + VariableModel.set(key="my_key", value="5", session=session) + session.flush() scheduler_dag = deadline_test_dag( deadline=DeadlineAlert( @@ -1505,71 +1514,122 @@ def test_dagrun_success_handles_empty_deadline_list(self, mock_prune, dag_maker, mock_prune.assert_not_called() assert dag_run.state == DagRunState.SUCCESS - @mock.patch.object(Variable, "get") @mock.patch.object(Deadline, "prune_deadlines") - def test_dagrun_deadline_variable_interval_stable(self, _, mock_get, session, deadline_test_dag): - future_date = datetime.datetime.now() + datetime.timedelta(days=365) - - # First value used during resolution. - mock_get.return_value = "60" + def test_dagrun_deadline_variable_interval_missing_variable_is_isolated( + self, _, session, deadline_test_dag + ): + """A VariableInterval whose backing Variable is missing must NOT abort DagRun creation. + + ``VariableInterval.resolve()`` raises ``ValueError`` for a missing/invalid Variable. + That resolution happens inside ``_process_dagrun_deadline_alerts`` during + ``create_dagrun``; previously the error propagated out and aborted the whole run, + silently stopping the DAG from scheduling. The per-alert ``try``/``except`` now isolates + the failure: the DagRun is created, the bad deadline is skipped (logged), and no Deadline + row is written. (Isolation must NOT use ``begin_nested`` here -- ``create_dagrun`` runs + under the scheduler ``prohibit_commit`` guard, where a SAVEPOINT release would raise + ``UNEXPECTED COMMIT`` and skip every scheduled DagRun's deadlines.) + """ + # No Variable named "missing_key" is seeded in any backend, so the scheduler-side resolver + # returns None -> ValueError(...could not be resolved...), isolated by the per-alert except. + # Fixed future date (repo standard: no datetime.now() in tests). Far enough ahead that the + # FIXED_DATETIME deadline never fires, but within MySQL's TIMESTAMP range (< 2038-01-19) so + # the deadline_time UtcDateTime column accepts it on all backends. + future_date = datetime.datetime(2037, 1, 1, tzinfo=datetime.timezone.utc) scheduler_dag = deadline_test_dag( deadline=DeadlineAlert( reference=DeadlineReference.FIXED_DATETIME(future_date), - interval=VariableInterval("my_key"), + interval=VariableInterval("missing_key"), callback=AsyncCallback(empty_callback_for_deadline), ), ) + # DagRun creation must succeed despite the unresolvable VariableInterval. dag_run = self.create_dag_run( dag=scheduler_dag, - task_states={"task_1": TaskInstanceState.SUCCESS, "task_2": TaskInstanceState.SUCCESS}, + task_states={"task_1": TaskInstanceState.SUCCESS}, session=session, ) - dag_run.dag = scheduler_dag - - # First update resolve interval to "5". - dag_run.update_state(session=session) + assert dag_run is not None + # The bad deadline was skipped — no Deadline row created. deadline = session.execute(select(Deadline)).scalars().one_or_none() - first_deadline_time = deadline.deadline_time + assert deadline is None - # Change Variable value after resolution. - mock_get.return_value = "120" + @mock.patch.object(Deadline, "prune_deadlines") + def test_dagrun_deadline_variable_interval_resolves_from_env_var( + self, _, session, deadline_test_dag, monkeypatch + ): + """A VariableInterval backed by an ``AIRFLOW_VAR_*`` env var (no DB row) must resolve. - # Run again (This should not change existing deadline). - dag_run.update_state(session=session) + Regression guard: the scheduler-side resolver must go through the full secrets chain + (env vars + secrets backends + metadata DB), not read only the ``variable`` table. A + table-only read returns None for an env/secrets-backed Variable, and the per-alert + ``except`` then silently drops the deadline. Here the Variable lives ONLY in the + environment, so a correct resolver creates the Deadline and a regressed one drops it. + """ + # Variable exists only as an env var — never written to the variable table. + # VariableInterval values are interpreted as SECONDS (see coerce_to_timedelta). + monkeypatch.setenv("AIRFLOW_VAR_ENV_INTERVAL_KEY", "7") + future_date = datetime.datetime(2037, 1, 1, tzinfo=datetime.timezone.utc) + + scheduler_dag = deadline_test_dag( + deadline=DeadlineAlert( + reference=DeadlineReference.FIXED_DATETIME(future_date), + interval=VariableInterval("env_interval_key"), + callback=AsyncCallback(empty_callback_for_deadline), + ), + ) + + dag_run = self.create_dag_run( + dag=scheduler_dag, + task_states={"task_1": TaskInstanceState.SUCCESS}, + session=session, + ) + assert dag_run is not None + # The env-var-backed interval resolved (7 seconds) -> a Deadline row WAS created. deadline = session.execute(select(Deadline)).scalars().one_or_none() - assert deadline.deadline_time == first_deadline_time + assert deadline is not None + assert deadline.deadline_time == future_date + datetime.timedelta(seconds=7) @mock.patch.object(Deadline, "prune_deadlines") - def test_dagrun_deadline_variable_interval_missing_variable_fails(self, _, session, deadline_test_dag): - mock_err = mock.Mock() - mock_err.error.value = "MISSING_DEADLINE" - mock_err.detail = "missing deadline" + def test_dagrun_deadline_decode_failure_is_isolated(self, _, session, deadline_test_dag): + """A deadline alert that fails to *decode* must NOT abort DagRun creation either. + + ``decode_deadline_alert`` can raise for a malformed/legacy serialized blob (e.g. a + None interval after a partial downgrade, an invalid interval type, or a corrupt + reference dict). That decode happens inside the per-alert ``try``/``except``, so the + failure is isolated just like a resolve-time failure: the DagRun is created and the bad + deadline skipped, rather than the corrupt row taking down scheduling for the whole DAG. + """ + # Fixed future date (repo standard: no datetime.now() in tests). Far enough ahead that the + # FIXED_DATETIME deadline never fires, but within MySQL's TIMESTAMP range (< 2038-01-19) so + # the deadline_time UtcDateTime column accepts it on all backends. + future_date = datetime.datetime(2037, 1, 1, tzinfo=datetime.timezone.utc) + scheduler_dag = deadline_test_dag( + deadline=DeadlineAlert( + reference=DeadlineReference.FIXED_DATETIME(future_date), + interval=datetime.timedelta(hours=1), + callback=AsyncCallback(empty_callback_for_deadline), + ), + ) - with mock.patch.object( - Variable, - "get", - side_effect=AirflowRuntimeError(mock_err), + # Simulate a corrupt/legacy blob: decoding the alert raises. + with mock.patch( + "airflow.serialization.definitions.dag.decode_deadline_alert", + side_effect=ValueError("corrupt deadline alert blob"), ): - future_date = datetime.datetime.now() + datetime.timedelta(days=365) - - scheduler_dag = deadline_test_dag( - deadline=DeadlineAlert( - reference=DeadlineReference.FIXED_DATETIME(future_date), - interval=VariableInterval("missing_key"), - callback=AsyncCallback(empty_callback_for_deadline), - ), + dag_run = self.create_dag_run( + dag=scheduler_dag, + task_states={"task_1": TaskInstanceState.SUCCESS}, + session=session, ) - with pytest.raises(ValueError, match="not found"): - self.create_dag_run( - dag=scheduler_dag, - task_states={"task_1": TaskInstanceState.SUCCESS}, - session=session, - ) + assert dag_run is not None + # No Deadline row — the undecodable alert was skipped, not fatal. + deadline = session.execute(select(Deadline)).scalars().one_or_none() + assert deadline is None @pytest.mark.parametrize( diff --git a/shared/observability/src/airflow_shared/observability/metrics/metrics_template.yaml b/shared/observability/src/airflow_shared/observability/metrics/metrics_template.yaml index 6c51e32ff7798..06cd7efcdd749 100644 --- a/shared/observability/src/airflow_shared/observability/metrics/metrics_template.yaml +++ b/shared/observability/src/airflow_shared/observability/metrics/metrics_template.yaml @@ -309,6 +309,12 @@ metrics: legacy_name: "-" name_variables: [] + - name: "deadline_alerts.deadline_creation_failed" + description: "Number of deadline alerts that could not be created (interval/reference resolution raised)" + type: "counter" + legacy_name: "-" + name_variables: [] + - name: "deadline_alerts.deadline_missed" description: "Number of deadline alerts that fired because a Dag run missed its deadline" type: "counter" diff --git a/task-sdk/src/airflow/sdk/definitions/deadline.py b/task-sdk/src/airflow/sdk/definitions/deadline.py index a9da3dd3d3ea2..a791dff85761b 100644 --- a/task-sdk/src/airflow/sdk/definitions/deadline.py +++ b/task-sdk/src/airflow/sdk/definitions/deadline.py @@ -389,9 +389,19 @@ def resolve(self) -> timedelta: value = Variable.get(self.key) except AirflowRuntimeError as e: raise ValueError(f"VariableInterval '{self.key}' not found") from e + return self.coerce_to_timedelta(value) + def coerce_to_timedelta(self, value: str | int | float | None) -> timedelta: + """ + Validate a raw Variable value and convert it into a ``timedelta``. + + Split out from :meth:`resolve` so callers that already hold the Variable value (e.g. a + scheduler-side reader that must fetch it on its own session — see + ``DAG._process_dagrun_deadline_alerts`` — to avoid committing inside ``prohibit_commit``) + can reuse the exact same validation without going through ``Variable.get``. + """ try: - seconds = int(value) + seconds = int(value) # type: ignore[arg-type] # None/non-numeric handled by the except below except (TypeError, ValueError) as e: raise ValueError( f"VariableInterval '{self.key}' must be an integer (seconds), got: {value!r}" @@ -400,4 +410,15 @@ def resolve(self) -> timedelta: if seconds <= 0: raise ValueError(f"VariableInterval '{self.key}' must be > 0, got: {seconds}") - return timedelta(seconds=seconds) + try: + return timedelta(seconds=seconds) + except OverflowError as e: + # ``timedelta`` caps at ~999999999 days; a wildly out-of-range value (e.g. a + # fat-fingered ``deadline_seconds`` meant as ms) raises OverflowError, which is NOT + # a ValueError subclass and would otherwise escape this method's documented + # "raises ValueError on bad input" contract with a cryptic C-int message. Translate + # it to the same clean ValueError so callers (and the deadline-creation isolation) + # get a consistent, actionable error. + raise ValueError( + f"VariableInterval '{self.key}' is too large to be a valid interval: {seconds} seconds" + ) from e diff --git a/task-sdk/tests/task_sdk/definitions/test_deadline.py b/task-sdk/tests/task_sdk/definitions/test_deadline.py index b104980e4c986..8c24fb11b1524 100644 --- a/task-sdk/tests/task_sdk/definitions/test_deadline.py +++ b/task-sdk/tests/task_sdk/definitions/test_deadline.py @@ -212,3 +212,18 @@ def test_resolve_invalid(self, mocker, value, raise_runtime, match): with pytest.raises(ValueError, match=match): interval.resolve() + + @pytest.mark.parametrize( + ("value", "match"), + [ + ("abc", "must be an integer"), + ("", "must be an integer"), + (None, "must be an integer"), + ("0", "must be > 0"), + ("-5", "must be > 0"), + ("99999999999999", "too large to be a valid interval"), + ], + ) + def test_coerce_to_timedelta_invalid(self, value, match): + with pytest.raises(ValueError, match=match): + VariableInterval(key="k").coerce_to_timedelta(value)