Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
16 commits
Select commit Hold shift + click to select a range
9aea847
Broadened exception handling in DagRunContext validator
Sathvik-Chowdary-Veerapaneni Mar 16, 2026
07cef39
Made produce_dag_callback resilient to DagRunContext failures
Sathvik-Chowdary-Veerapaneni Mar 16, 2026
6215918
Added warning log for callbacks with mismatched bundle_name
Sathvik-Chowdary-Veerapaneni Mar 16, 2026
cd44d96
Added info log when DAG callback is sent to processor
Sathvik-Chowdary-Veerapaneni Mar 16, 2026
0f79160
Added tests for DAG callback resilience and bundle mismatch warning
Sathvik-Chowdary-Veerapaneni Mar 16, 2026
dd65357
Added newsfragment for #63374
Sathvik-Chowdary-Veerapaneni Mar 16, 2026
4ef7360
Renamed newsfragment to match PR number
Sathvik-Chowdary-Veerapaneni Mar 16, 2026
5bc2d48
Narrow DagRunContext exception catch to SQLAlchemyError
Sathvik-Chowdary-Veerapaneni Apr 3, 2026
f87af36
Remove newsfragment per reviewer request
Sathvik-Chowdary-Veerapaneni Apr 3, 2026
eb038b2
Removed redundant callback bundle guard
Sathvik-Chowdary-Veerapaneni May 13, 2026
ba34fc9
Changed callback request log level
Sathvik-Chowdary-Veerapaneni May 13, 2026
354caea
Removed partial DAG callback context fallback
Sathvik-Chowdary-Veerapaneni May 13, 2026
f8a0376
Updated callback bundle filter test wording
Sathvik-Chowdary-Veerapaneni May 13, 2026
a757125
Removed partial context fallback test
Sathvik-Chowdary-Veerapaneni May 13, 2026
1b9eda2
Hardened DagRun reload in DAG callback context recovery
Sathvik-Chowdary-Veerapaneni Jun 12, 2026
91b5209
Added test for DAG callback context with invalidated session transaction
Sathvik-Chowdary-Veerapaneni Jun 12, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 21 additions & 16 deletions airflow-core/src/airflow/callbacks/callback_requests.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,8 @@
import structlog
from pydantic import BaseModel, Field, model_validator
from sqlalchemy import inspect as sa_inspect
from sqlalchemy.exc import NoInspectionAvailable
from sqlalchemy.exc import NoInspectionAvailable, SQLAlchemyError
from sqlalchemy.orm.attributes import set_committed_value
from sqlalchemy.orm.exc import DetachedInstanceError

from airflow.api_fastapi.execution_api.datamodels import taskinstance as ti_datamodel # noqa: TC001
from airflow.utils.state import TaskInstanceState
Expand Down Expand Up @@ -116,19 +115,20 @@ def _sanitize_consumed_asset_events(cls, values: Mapping[str, Any]) -> Mapping[s
except NoInspectionAvailable:
return values

# Relationship access may raise DetachedInstanceError; on that path, reload DagRun
# from the DB to avoid crashing the scheduler.
# Relationship access may raise DetachedInstanceError or other SQLAlchemy
# exceptions (e.g. InvalidRequestError when the session is closed); on that
# path, reload the DagRun from the DB to avoid crashing the scheduler.
try:
events = dag_run.consumed_asset_events
set_committed_value(
dag_run,
"consumed_asset_events",
list(events) if events is not None else [],
)
except DetachedInstanceError:
except SQLAlchemyError:
log.warning(
"DagRunContext encountered DetachedInstanceError while accessing "
"consumed_asset_events; reloading DagRun from DB."
"DagRunContext failed to access consumed_asset_events; reloading DagRun from DB.",
exc_info=True,
)
from sqlalchemy import select
from sqlalchemy.orm import selectinload
Expand All @@ -137,26 +137,31 @@ def _sanitize_consumed_asset_events(cls, values: Mapping[str, Any]) -> Mapping[s
from airflow.models.dagrun import DagRun
from airflow.utils.session import create_session

# Defensive guardrail: reload DagRun with eager-loaded relationships on
# DetachedInstanceError to recover state without adding DB I/O to the hot path.
# Read the primary key via the instance identity: attribute access on
# an expired instance would hit the same broken session and re-raise.
identity = sa_inspect(dag_run).identity
if identity is None:
raise

# Reload DagRun with eager-loaded relationships to recover state
# without adding DB I/O to the hot path.
with create_session() as session:
dag_run_reloaded = session.scalar(
select(DagRun)
.where(DagRun.id == dag_run.id)
.where(DagRun.id == identity[0])
.options(
selectinload(DagRun.consumed_asset_events).selectinload(AssetEvent.asset),
selectinload(DagRun.consumed_asset_events).selectinload(AssetEvent.source_aliases),
)
)

# DagRun exists; reload is expected to succeed.
if dag_run_reloaded is None:
raise
dag_run_reloaded = cast("DagRun", dag_run_reloaded)
reloaded_events = dag_run_reloaded.consumed_asset_events

# Install DB-backed relationship state on the detached instance.
set_committed_value(
dag_run, "consumed_asset_events", list(reloaded_events) if reloaded_events is not None else []
)
# Validate the reloaded instance: attribute reads on the original
# would hit the broken session again.
return {**values, "dag_run": dag_run_reloaded}

return values

Expand Down
6 changes: 6 additions & 0 deletions airflow-core/src/airflow/jobs/scheduler_job_runner.py

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want this logged at info level? I wonder whether debug would be enough.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed, changed to debug in fd432e3.

Original file line number Diff line number Diff line change
Expand Up @@ -2878,6 +2878,12 @@ def _send_dag_callbacks_to_processor(
callback: DagCallbackRequest | None = None,
) -> None:
if callback:
self.log.debug(
"Sending %s callback request for dag_id=%s, run_id=%s to DAG Processor",
"failure" if callback.is_failure_callback else "success",
callback.dag_id,
callback.run_id,
)
self.executor.send_callback(callback)
else:
self.log.debug("callback is empty")
Expand Down
49 changes: 49 additions & 0 deletions airflow-core/tests/unit/callbacks/test_callback_requests.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

import pytest
from pydantic import TypeAdapter
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import sessionmaker

from airflow._shared.timezones import timezone
from airflow.api_fastapi.execution_api.datamodels.taskinstance import (
Expand Down Expand Up @@ -259,6 +261,53 @@ def test_dagrun_context_attached_consumed_asset_events(self, session):
assert events is not None
assert isinstance(events, list)

def test_dagrun_context_consumed_asset_events_with_invalidated_transaction(self, session):
    """
    Building a DagRunContext must succeed for a DagRun bound to a poisoned session.

    A failed flush invalidates the owning session's transaction. Lazy-loading a
    relationship on such a session raises PendingRollbackError, which is a
    SQLAlchemyError but not a DetachedInstanceError, so the reload fallback has
    to catch SQLAlchemyError broadly in order to produce the context.
    """
    # Separate session on the fixture's bind; this is the one invalidated below.
    session_factory = sessionmaker(bind=session.get_bind(), expire_on_commit=False)
    poisoned_session = session_factory()
    now = timezone.utcnow()

    def make_dag_run():
        # Same dag_id/run_id every time, so a second instance collides on flush.
        return DagRun(
            dag_id="test_dag",
            run_id="test_run_invalidated_transaction",
            logical_date=now,
            state="running",
            run_type="manual",
        )

    dag_run = make_dag_run()
    poisoned_session.add(dag_run)
    poisoned_session.commit()

    try:
        # Flushing a duplicate (dag_id, run_id) fails and poisons the session:
        # further SQL raises PendingRollbackError until rollback.
        duplicate = make_dag_run()
        poisoned_session.add(duplicate)
        with pytest.raises(IntegrityError):
            poisoned_session.flush()

        context = DagRunContext(dag_run=dag_run, last_ti=None)
        events = context.dag_run.consumed_asset_events

        # The relationship should have been normalized to a safe iterable.
        assert events is not None
        assert isinstance(events, list)
    finally:
        # Always restore and release the private session, even on failure.
        poisoned_session.rollback()
        poisoned_session.close()


class TestDagCallbackRequestWithContext:
def test_dag_callback_request_with_context_from_server(self):
Expand Down
Loading