Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions airflow-core/newsfragments/68917.bugfix.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Deadline alerts using a ``VariableInterval`` no longer risk aborting DagRun creation. The interval is now resolved through the full secrets chain (environment variables, configured secrets backends, then the metadata database) on the scheduler's own session, so ``AIRFLOW_VAR_*`` and secrets-backend-backed Variables resolve correctly and the read does not commit inside the scheduler's ``prohibit_commit`` guard. Each deadline alert is also isolated: a single unresolvable or undecodable alert is logged and skipped instead of preventing the DagRun from being created.
154 changes: 114 additions & 40 deletions airflow-core/src/airflow/serialization/definitions/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -715,51 +715,125 @@ def _process_dagrun_deadline_alerts(
if not deadline_alert:
continue

deserialized_deadline_alert = decode_deadline_alert(
{
Encoding.TYPE: DAT.DEADLINE_ALERT,
Encoding.VAR: {
DeadlineAlertFields.REFERENCE: deadline_alert.reference,
DeadlineAlertFields.INTERVAL: deadline_alert.interval,
DeadlineAlertFields.CALLBACK: deadline_alert.callback_def,
},
}
)
# Isolate each deadline alert: creating a deadline is auxiliary to creating the
# DagRun itself, and must never prevent the DagRun from being created. A single bad
# alert -- e.g. a ``VariableInterval`` whose backing Variable is missing / non-integer
# / <= 0 (``coerce_to_timedelta`` raises ``ValueError``), or a reference whose
# ``evaluate_with`` fails -- would otherwise propagate out of ``create_dagrun`` and
# abort the whole run, silently stopping the DAG from scheduling.
#
# Isolation is done with a plain ``try``/``except`` and MUST NOT use
# ``session.begin_nested()``: ``create_dagrun`` runs on the scheduler session inside
# the ``prohibit_commit`` guard, and releasing a SAVEPOINT issues a commit, which trips
# that guard (``RuntimeError("UNEXPECTED COMMIT ...")``) and -- because this very block
# swallows it -- silently skips deadline creation for *every* scheduled DagRun. The
# try/except alone is sufficient: the only DB mutation in the loop body is the final
# ``session.add`` (everything before it is a decode, an in-memory resolution, or a
# read-only ``evaluate_with`` query), so an exception leaves no partial state to undo,
# and the pending ``Deadline`` is persisted by the caller's outer transaction.
try:
deserialized_deadline_alert = decode_deadline_alert(
{
Encoding.TYPE: DAT.DEADLINE_ALERT,
Encoding.VAR: {
DeadlineAlertFields.REFERENCE: deadline_alert.reference,
DeadlineAlertFields.INTERVAL: deadline_alert.interval,
DeadlineAlertFields.CALLBACK: deadline_alert.callback_def,
},
}
)

interval = deserialized_deadline_alert.interval
interval = deserialized_deadline_alert.interval

if isinstance(interval, VariableInterval):
interval = interval.resolve()
if isinstance(interval, VariableInterval):
interval = self._resolve_variable_interval(interval, session=session)

if isinstance(deserialized_deadline_alert.reference, SerializedReferenceModels.TYPES.DAGRUN):
deadline_time = deserialized_deadline_alert.reference.evaluate_with(
session=session,
interval=interval,
# TODO : Pretty sure we can drop these last two; verify after testing is complete
dag_id=self.dag_id,
run_id=orm_dagrun.run_id,
)
if isinstance(deserialized_deadline_alert.reference, SerializedReferenceModels.TYPES.DAGRUN):
deadline_time = deserialized_deadline_alert.reference.evaluate_with(
session=session,
interval=interval,
# TODO : Pretty sure we can drop these last two; verify after testing is complete
dag_id=self.dag_id,
run_id=orm_dagrun.run_id,
)

if deadline_time is not None:
session.add(
Deadline(
deadline_time=deadline_time,
callback=deserialized_deadline_alert.callback,
dagrun_id=orm_dagrun.id,
deadline_alert_id=deadline_alert.id,
dag_id=orm_dagrun.dag_id,
bundle_name=orm_dagrun.dag_model.bundle_name,
if deadline_time is not None:
session.add(
Deadline(
deadline_time=deadline_time,
callback=deserialized_deadline_alert.callback,
dagrun_id=orm_dagrun.id,
deadline_alert_id=deadline_alert.id,
dag_id=orm_dagrun.dag_id,
bundle_name=orm_dagrun.dag_model.bundle_name,
)
)
)
team_name = (
DagModel.get_team_name(self.dag_id, session=session)
if airflow_conf.getboolean("core", "multi_team")
else None
)
stats.incr(
"deadline_alerts.deadline_created",
tags=prune_dict({"dag_id": self.dag_id, "team_name": team_name}),
)
team_name = (
DagModel.get_team_name(self.dag_id, session=session)
if airflow_conf.getboolean("core", "multi_team")
else None
)
stats.incr(
"deadline_alerts.deadline_created",
tags=prune_dict({"dag_id": self.dag_id, "team_name": team_name}),
)
except Exception:
log.exception(
"Failed to create deadline for alert %s on DagRun %s (dag_id=%s); "
"skipping this deadline, the DagRun is unaffected",
getattr(deadline_alert, "id", "<unknown>"),
orm_dagrun.run_id,
self.dag_id,
)
stats.incr("deadline_alerts.deadline_creation_failed", tags={"dag_id": self.dag_id})

@staticmethod
def _resolve_variable_interval(interval: VariableInterval, *, session: Session) -> datetime.timedelta:
"""
Resolve a ``VariableInterval`` to a concrete ``timedelta`` at DagRun creation time.

The backing Variable is read through the full secrets chain (env vars, configured
secrets backends, then the metadata DB) -- the same resolution order as
``Variable.get`` -- rather than reading only the ``variable`` table. Reading the table
directly would bypass ``AIRFLOW_VAR_*`` env vars and secrets backends, so a Variable
that lives there would resolve to ``None`` and the deadline would be silently dropped.

We must NOT call ``Variable.get`` / ``get_variable_from_secrets`` here: the metastore
backend's ``get_variable`` is ``@provide_session`` and, given no session, opens the
thread-local scoped session (the SAME one the scheduler holds) whose context-manager
exit COMMITS -- tripping the ``prohibit_commit`` guard ``create_dagrun`` runs under.
Instead we iterate the backends ourselves and pass the scheduler's ``session`` through
to the metastore backend (env / custom backends ignore it), so the DB read happens on
our session with no commit.

:param interval: The ``VariableInterval`` to resolve.
:param session: The scheduler session, passed through to the metastore backend.
:return: The resolved ``timedelta``.
:raises ValueError: If the Variable cannot be found in any backend, or its value
cannot be coerced to a positive ``timedelta``.
"""
from airflow._shared.secrets_backend.base import call_secrets_backend_method
from airflow.configuration import ensure_secrets_loaded
from airflow.secrets.metastore import MetastoreBackend

var_val = None
for secrets_backend in ensure_secrets_loaded():
kwargs = {"session": session} if isinstance(secrets_backend, MetastoreBackend) else {}
var_val = call_secrets_backend_method(
secrets_backend.get_variable, team_name=None, key=interval.key, **kwargs
)
if var_val is not None:
break

if var_val is None:
# Fail loudly (the per-alert ``except`` isolates it): the Variable does not exist in
# any backend, so we cannot resolve the interval. This is not a silent skip.
raise ValueError(
f"VariableInterval '{interval.key}' could not be resolved from any "
f"secrets backend, environment variable, or the metadata database"
)

return interval.coerce_to_timedelta(var_val)

@provide_session
def set_task_instance_state(
Expand Down
Loading
Loading