Resolve VariableInterval deadlines safely at DagRun creation#68917
Open
seanghaeli wants to merge 2 commits into
Open
Resolve VariableInterval deadlines safely at DagRun creation#68917seanghaeli wants to merge 2 commits into
seanghaeli wants to merge 2 commits into
Conversation
7aee60c to
94825ec
Compare
A ``DeadlineAlert`` configured with a ``VariableInterval`` is resolved when the
scheduler creates a DagRun, inside ``DAG._process_dagrun_deadline_alerts`` (which
runs under the ``prohibit_commit`` guard). Two problems are fixed:
1. Resolution now goes through the full secrets chain (env vars, configured
secrets backends, then the metadata DB) via a dedicated
``_resolve_variable_interval`` helper, rather than ``Variable.get`` /
``begin_nested``. ``Variable.get`` and a SAVEPOINT release both commit on the
scheduler's session, tripping ``prohibit_commit`` ("UNEXPECTED COMMIT") and
silently dropping deadlines for every scheduled DagRun. The helper passes the
scheduler session through to the metastore backend so the DB read happens
without committing, and reading via the secrets chain (not the variable table
directly) means ``AIRFLOW_VAR_*`` env vars and secrets backends resolve too.
2. Each deadline alert is isolated with a plain ``try``/``except`` (NOT
``begin_nested``). Creating a deadline is auxiliary to creating the DagRun; a
single bad alert -- a missing/invalid backing Variable, or an undecodable
serialized blob -- must never abort the DagRun and stop the DAG scheduling.
``VariableInterval.resolve`` is split into ``resolve`` + ``coerce_to_timedelta``
so the scheduler-side reader reuses the exact same validation (including the
OverflowError -> ValueError translation) without going through ``Variable.get``.
Generated-by: Claude Code (Sonnet/Opus via Claude Code) on behalf of Sean Ghaeli
94825ec to
2342c03
Compare
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Why
This re-introduces the
VariableIntervalresolution portion of #66608.A
DeadlineAlertconfigured with aVariableIntervalis resolved when the scheduler creates a DagRun, insideDAG._process_dagrun_deadline_alerts(which runs under theprohibit_commitguard). Two problems are fixed:Resolution goes through the full secrets chain (env vars, configured secrets backends, then the metadata DB) via a dedicated
_resolve_variable_intervalhelper, rather thanVariable.get/begin_nested.Variable.getand a SAVEPOINT release both commit on the scheduler's session, trippingprohibit_commit(UNEXPECTED COMMIT) and silently dropping deadlines for every scheduled DagRun. The helper passes the scheduler session through to the metastore backend so the DB read does not commit, and reading via the secrets chain (not thevariabletable directly) meansAIRFLOW_VAR_*env vars and secrets-backend-backed Variables resolve too.Each deadline alert is isolated with a plain
try/except(deliberately notbegin_nested, which would commit a SAVEPOINT and trip the same guard). Creating a deadline is auxiliary to creating the DagRun; a single bad alert — a missing/invalid backing Variable, or an undecodable serialized blob — must never abort the DagRun and stop the DAG from scheduling.VariableInterval.resolveis split intoresolve+coerce_to_timedeltaso the scheduler-side reader reuses the exact same validation (including theOverflowError->ValueErrortranslation) without going throughVariable.get.Tests
airflow-core/tests/unit/models/test_dagrun.py: VariableInterval resolves from a real Variable row and from anAIRFLOW_VAR_*env var; a missing Variable and an undecodable alert are isolated (DagRun still created, no Deadline row, error logged).task-sdk/tests/task_sdk/definitions/test_deadline.py:coerce_to_timedeltavalidation (non-integer,<= 0, overflow).Verified locally in Breeze: 8 passed (dagrun deadline/variable) + 14 passed (SDK
TestVariableInterval).Generated-by: Claude Code (Opus via Claude Code) on behalf of Sean Ghaeli