From 9aea8473fb105b299619fd79a60d802eaaa93749 Mon Sep 17 00:00:00 2001 From: Sathvik Chowdary Veerapaneni Date: Mon, 16 Mar 2026 00:25:33 -0400 Subject: [PATCH 01/16] Broadened exception handling in DagRunContext validator Previously only DetachedInstanceError was caught when accessing consumed_asset_events on ORM DagRun objects. Other SQLAlchemy exceptions (e.g. InvalidRequestError) crashed the scheduler. closes: #63374 --- .../src/airflow/callbacks/callback_requests.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/airflow-core/src/airflow/callbacks/callback_requests.py b/airflow-core/src/airflow/callbacks/callback_requests.py index ce48438f0e70a..25cb639534901 100644 --- a/airflow-core/src/airflow/callbacks/callback_requests.py +++ b/airflow-core/src/airflow/callbacks/callback_requests.py @@ -24,7 +24,6 @@ from sqlalchemy import inspect as sa_inspect from sqlalchemy.exc import NoInspectionAvailable from sqlalchemy.orm.attributes import set_committed_value -from sqlalchemy.orm.exc import DetachedInstanceError from airflow.api_fastapi.execution_api.datamodels import taskinstance as ti_datamodel # noqa: TC001 from airflow.utils.state import TaskInstanceState @@ -116,8 +115,9 @@ def _sanitize_consumed_asset_events(cls, values: Mapping[str, Any]) -> Mapping[s except NoInspectionAvailable: return values - # Relationship access may raise DetachedInstanceError; on that path, reload DagRun - # from the DB to avoid crashing the scheduler. + # Relationship access may raise DetachedInstanceError or other SQLAlchemy + # exceptions (e.g. InvalidRequestError when the session is closed); on that + # path, reload the DagRun from the DB to avoid crashing the scheduler. try: events = dag_run.consumed_asset_events set_committed_value( @@ -125,10 +125,10 @@ def _sanitize_consumed_asset_events(cls, values: Mapping[str, Any]) -> Mapping[s "consumed_asset_events", list(events) if events is not None else [], ) - except DetachedInstanceError: + except Exception: log.warning( - "DagRunContext encountered DetachedInstanceError while accessing " - "consumed_asset_events; reloading DagRun from DB." + "DagRunContext failed to access consumed_asset_events; reloading DagRun from DB.", + exc_info=True, ) from sqlalchemy import select from sqlalchemy.orm import selectinload @@ -137,8 +137,8 @@ def _sanitize_consumed_asset_events(cls, values: Mapping[str, Any]) -> Mapping[s from airflow.models.dagrun import DagRun from airflow.utils.session import create_session - # Defensive guardrail: reload DagRun with eager-loaded relationships on - # DetachedInstanceError to recover state without adding DB I/O to the hot path. + # Reload DagRun with eager-loaded relationships to recover state + # without adding DB I/O to the hot path. with create_session() as session: dag_run_reloaded = session.scalar( select(DagRun) From 07cef39d602eb90b5b7ec589d2894b2fe6657b1e Mon Sep 17 00:00:00 2001 From: Sathvik Chowdary Veerapaneni Date: Mon, 16 Mar 2026 00:25:49 -0400 Subject: [PATCH 02/16] Made produce_dag_callback resilient to DagRunContext failures DagRunContext creation could crash when ORM relationship access failed, preventing the callback from being produced entirely. The callback is now sent with minimal context on failure. --- airflow-core/src/airflow/models/dagrun.py | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/airflow-core/src/airflow/models/dagrun.py b/airflow-core/src/airflow/models/dagrun.py index 80e279f149936..31580ea2b8258 100644 --- a/airflow-core/src/airflow/models/dagrun.py +++ b/airflow-core/src/airflow/models/dagrun.py @@ -1391,16 +1391,26 @@ def produce_dag_callback( ) relevant_ti = None if not execute: + try: + context_from_server = DagRunContext( + dag_run=self, + last_ti=relevant_ti, + ) + except Exception: + self.log.exception( + "Failed to build DagRunContext for dag_id=%s run_id=%s; " + "sending callback with minimal context", + self.dag_id, + self.run_id, + ) + context_from_server = None return DagCallbackRequest( filepath=self.dag_model.relative_fileloc, dag_id=self.dag_id, run_id=self.run_id, bundle_name=self.dag_model.bundle_name, bundle_version=self.bundle_version, - context_from_server=DagRunContext( - dag_run=self, - last_ti=relevant_ti, - ), + context_from_server=context_from_server, is_failure_callback=(not success), msg=reason, ) From 621591880ee86067def89def7d10237aa91fb831 Mon Sep 17 00:00:00 2001 From: Sathvik Chowdary Veerapaneni Date: Mon, 16 Mar 2026 00:26:03 -0400 Subject: [PATCH 03/16] Added warning log for callbacks with mismatched bundle_name --- airflow-core/src/airflow/dag_processing/manager.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/airflow-core/src/airflow/dag_processing/manager.py b/airflow-core/src/airflow/dag_processing/manager.py index 783de1747585c..9394997a1a979 100644 --- a/airflow-core/src/airflow/dag_processing/manager.py +++ b/airflow-core/src/airflow/dag_processing/manager.py @@ -662,6 +662,15 @@ def _fetch_callbacks_from_db( ] for callback in callbacks: req = callback.get_callback_request() + if req.bundle_name not in bundle_names: + self.log.warning( + "Callback for dag_id=%s has bundle_name=%r which is not served by " + "this DAG processor (serving bundles: %s). Skipping.", + getattr(req, "dag_id", "unknown"), + req.bundle_name, + bundle_names, + ) + continue try: callback_queue.append(req) session.delete(callback) From cd44d96b30ee97a8ecd40eaef5a934787da7ebeb Mon Sep 17 00:00:00 2001 From: Sathvik Chowdary Veerapaneni Date: Mon, 16 Mar 2026 00:26:18 -0400 Subject: [PATCH 04/16] Added info log when DAG callback is sent to processor --- airflow-core/src/airflow/jobs/scheduler_job_runner.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py b/airflow-core/src/airflow/jobs/scheduler_job_runner.py index 21ce3f3c582b3..97d7cf607557a 100644 --- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py +++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py @@ -2878,6 +2878,12 @@ def _send_dag_callbacks_to_processor( callback: DagCallbackRequest | None = None, ) -> None: if callback: + self.log.info( + "Sending %s callback request for dag_id=%s, run_id=%s to DAG Processor", + "failure" if callback.is_failure_callback else "success", + callback.dag_id, + callback.run_id, + ) self.executor.send_callback(callback) else: self.log.debug("callback is empty") From 0f79160accefe5c12316c749b6cb68dc833bd2b1 Mon Sep 17 00:00:00 2001 From: Sathvik Chowdary Veerapaneni Date: Mon, 16 Mar 2026 00:26:33 -0400 Subject: [PATCH 05/16] Added tests for DAG callback resilience and bundle mismatch warning --- .../tests/unit/dag_processing/test_manager.py | 7 ++-- airflow-core/tests/unit/models/test_dagrun.py | 40 +++++++++++++++++++ 2 files changed, 43 insertions(+), 4 deletions(-) diff --git a/airflow-core/tests/unit/dag_processing/test_manager.py b/airflow-core/tests/unit/dag_processing/test_manager.py index 17795bcc8870c..6f2e2cd33fb01 100644 --- a/airflow-core/tests/unit/dag_processing/test_manager.py +++ b/airflow-core/tests/unit/dag_processing/test_manager.py @@ -1722,8 +1722,8 @@ def test_fetch_callbacks_from_database_max_per_loop(self, tmp_path, configure_te assert len(session.scalars(select(DbCallbackRequest)).all()) == 1 @conf_vars({("core", "load_examples"): "False"}) - def test_fetch_callbacks_ignores_other_bundles(self, configure_testing_dag_bundle): - """Ensure callbacks for bundles not owned by current dag processor manager are ignored and not deleted.""" + def test_fetch_callbacks_skips_other_bundles_with_warning(self, configure_testing_dag_bundle): + """Callbacks for bundles not served by this processor are skipped with a warning log.""" dag_filepath = TEST_DAG_FOLDER / "test_on_failure_callback_dag.py" @@ -1759,10 +1759,9 @@ def test_fetch_callbacks_ignores_other_bundles(self, configure_testing_dag_bundl # Only the matching callback should be returned assert [c.run_id for c in callbacks] == ["match"] - # The non-matching callback should remain in the DB + # The non-matching callback should remain in the DB for the correct processor remaining = session.scalars(select(DbCallbackRequest)).all() assert len(remaining) == 1 - # Decode remaining request and verify it's for the other bundle remaining_req = remaining[0].get_callback_request() assert remaining_req.bundle_name == "other-bundle" diff --git a/airflow-core/tests/unit/models/test_dagrun.py b/airflow-core/tests/unit/models/test_dagrun.py index a8a831fb6faad..1fc7d4a164054 100644 --- a/airflow-core/tests/unit/models/test_dagrun.py +++ b/airflow-core/tests/unit/models/test_dagrun.py @@ -672,6 +672,46 @@ def on_failure_callable(context): ), ) + def test_produce_dag_callback_resilient_to_context_failure(self, testing_dag_bundle, dag_maker, session): + """produce_dag_callback should still return a callback even when DagRunContext creation fails.""" + + def on_failure_callable(context): + pass + + relative_fileloc = "test_produce_dag_callback_resilient.py" + with dag_maker( + dag_id="test_produce_dag_callback_resilient", + schedule=datetime.timedelta(days=1), + start_date=datetime.datetime(2017, 1, 1), + on_failure_callback=on_failure_callable, + ) as dag: + EmptyOperator(task_id="task1") + dm = DagModel.get_dagmodel(dag.dag_id, session=session) + dm.relative_fileloc = relative_fileloc + session.merge(dm) + session.commit() + + initial_task_states = {"task1": TaskInstanceState.FAILED} + dag.relative_fileloc = relative_fileloc + SerializedDagModel.write_dag(LazyDeserializedDAG.from_dag(dag), bundle_name="dag_maker") + session.commit() + + dag_run = self.create_dag_run(dag=dag, task_states=initial_task_states, session=session) + dag_run.dag_model = dm + + # Patch DagRunContext to raise an exception during construction + with mock.patch( + "airflow.models.dagrun.DagRunContext", side_effect=RuntimeError("Simulated context failure") + ): + _, callback = dag_run.update_state(execute_callbacks=False) + + assert dag_run.state == DagRunState.FAILED + # Callback should still be produced with context_from_server=None + assert callback is not None + assert callback.dag_id == "test_produce_dag_callback_resilient" + assert callback.is_failure_callback is True + assert callback.context_from_server is None + def test_dagrun_set_state_end_date(self, dag_maker, session): with dag_maker(schedule=datetime.timedelta(days=1), start_date=DEFAULT_DATE): pass From dd65357034c90d61a3a5a860bfdfb2fda9a10750 Mon Sep 17 00:00:00 2001 From: Sathvik Chowdary Veerapaneni Date: Mon, 16 Mar 2026 00:30:36 -0400 Subject: [PATCH 06/16] Added newsfragment for #63374 --- airflow-core/newsfragments/63374.bugfix.rst | 1 + 1 file changed, 1 insertion(+) create mode 100644 airflow-core/newsfragments/63374.bugfix.rst diff --git a/airflow-core/newsfragments/63374.bugfix.rst b/airflow-core/newsfragments/63374.bugfix.rst new file mode 100644 index 0000000000000..2df4cc7ddd36d --- /dev/null +++ b/airflow-core/newsfragments/63374.bugfix.rst @@ -0,0 +1 @@ +Fixed DAG-level on_failure_callback not firing when DagRunContext creation failed due to ORM session errors. From 4ef73605398992b11d4f23f8ead05b28743284b1 Mon Sep 17 00:00:00 2001 From: Sathvik Chowdary Veerapaneni Date: Mon, 16 Mar 2026 00:43:14 -0400 Subject: [PATCH 07/16] Renamed newsfragment to match PR number --- airflow-core/newsfragments/{63374.bugfix.rst => 63692.bugfix.rst} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename airflow-core/newsfragments/{63374.bugfix.rst => 63692.bugfix.rst} (100%) diff --git a/airflow-core/newsfragments/63374.bugfix.rst b/airflow-core/newsfragments/63692.bugfix.rst similarity index 100% rename from airflow-core/newsfragments/63374.bugfix.rst rename to airflow-core/newsfragments/63692.bugfix.rst From 5bc2d48a96fafc82859d72c1f06ad25aa11015ea Mon Sep 17 00:00:00 2001 From: Sathvik Chowdary Veerapaneni Date: Fri, 3 Apr 2026 14:06:12 -0400 Subject: [PATCH 08/16] Narrow DagRunContext exception catch to SQLAlchemyError --- airflow-core/src/airflow/callbacks/callback_requests.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/airflow-core/src/airflow/callbacks/callback_requests.py b/airflow-core/src/airflow/callbacks/callback_requests.py index 25cb639534901..0d5f7869e8070 100644 --- a/airflow-core/src/airflow/callbacks/callback_requests.py +++ b/airflow-core/src/airflow/callbacks/callback_requests.py @@ -22,7 +22,7 @@ import structlog from pydantic import BaseModel, Field, model_validator from sqlalchemy import inspect as sa_inspect -from sqlalchemy.exc import NoInspectionAvailable +from sqlalchemy.exc import NoInspectionAvailable, SQLAlchemyError from sqlalchemy.orm.attributes import set_committed_value from airflow.api_fastapi.execution_api.datamodels import taskinstance as ti_datamodel # noqa: TC001 @@ -125,7 +125,7 @@ def _sanitize_consumed_asset_events(cls, values: Mapping[str, Any]) -> Mapping[s "consumed_asset_events", list(events) if events is not None else [], ) - except Exception: + except SQLAlchemyError: log.warning( "DagRunContext failed to access consumed_asset_events; reloading DagRun from DB.", exc_info=True, From f87af36a1bbd2db21be3a7f6662e2e817b5facc5 Mon Sep 17 00:00:00 2001 From: Sathvik Chowdary Veerapaneni Date: Fri, 3 Apr 2026 18:16:06 -0400 Subject: [PATCH 09/16] Remove newsfragment per reviewer request --- airflow-core/newsfragments/63692.bugfix.rst | 1 - 1 file changed, 1 deletion(-) delete mode 100644 airflow-core/newsfragments/63692.bugfix.rst diff --git a/airflow-core/newsfragments/63692.bugfix.rst b/airflow-core/newsfragments/63692.bugfix.rst deleted file mode 100644 index 2df4cc7ddd36d..0000000000000 --- a/airflow-core/newsfragments/63692.bugfix.rst +++ /dev/null @@ -1 +0,0 @@ -Fixed DAG-level on_failure_callback not firing when DagRunContext creation failed due to ORM session errors. From eb038b272c230c114d0365b529cd53a596fe0ef0 Mon Sep 17 00:00:00 2001 From: Sathvik Chowdary Veerapaneni Date: Wed, 13 May 2026 12:56:11 -0400 Subject: [PATCH 10/16] Removed redundant callback bundle guard - Removed duplicate bundle-name guard after scoped callback fetch --- airflow-core/src/airflow/dag_processing/manager.py | 9 --------- 1 file changed, 9 deletions(-) diff --git a/airflow-core/src/airflow/dag_processing/manager.py b/airflow-core/src/airflow/dag_processing/manager.py index 9394997a1a979..783de1747585c 100644 --- a/airflow-core/src/airflow/dag_processing/manager.py +++ b/airflow-core/src/airflow/dag_processing/manager.py @@ -662,15 +662,6 @@ def _fetch_callbacks_from_db( ] for callback in callbacks: req = callback.get_callback_request() - if req.bundle_name not in bundle_names: - self.log.warning( - "Callback for dag_id=%s has bundle_name=%r which is not served by " - "this DAG processor (serving bundles: %s). Skipping.", - getattr(req, "dag_id", "unknown"), - req.bundle_name, - bundle_names, - ) - continue try: callback_queue.append(req) session.delete(callback) From ba34fc9112eae5e1e61bccaaed270699e06a6a32 Mon Sep 17 00:00:00 2001 From: Sathvik Chowdary Veerapaneni Date: Wed, 13 May 2026 12:56:20 -0400 Subject: [PATCH 11/16] Changed callback request log level - Updated scheduler callback request log from info to debug --- airflow-core/src/airflow/jobs/scheduler_job_runner.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py b/airflow-core/src/airflow/jobs/scheduler_job_runner.py index 97d7cf607557a..e6c214b476b2a 100644 --- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py +++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py @@ -2878,7 +2878,7 @@ def _send_dag_callbacks_to_processor( callback: DagCallbackRequest | None = None, ) -> None: if callback: - self.log.info( + self.log.debug( "Sending %s callback request for dag_id=%s, run_id=%s to DAG Processor", "failure" if callback.is_failure_callback else "success", callback.dag_id, From 354caea9717cb518eae3ebbe6f8d05d7493f3226 Mon Sep 17 00:00:00 2001 From: Sathvik Chowdary Veerapaneni Date: Wed, 13 May 2026 12:56:26 -0400 Subject: [PATCH 12/16] Removed partial DAG callback context fallback - Removed broad fallback around DagRunContext creation --- airflow-core/src/airflow/models/dagrun.py | 18 ++++-------------- 1 file changed, 4 insertions(+), 14 deletions(-) diff --git a/airflow-core/src/airflow/models/dagrun.py b/airflow-core/src/airflow/models/dagrun.py index 31580ea2b8258..80e279f149936 100644 --- a/airflow-core/src/airflow/models/dagrun.py +++ b/airflow-core/src/airflow/models/dagrun.py @@ -1391,26 +1391,16 @@ def produce_dag_callback( ) relevant_ti = None if not execute: - try: - context_from_server = DagRunContext( - dag_run=self, - last_ti=relevant_ti, - ) - except Exception: - self.log.exception( - "Failed to build DagRunContext for dag_id=%s run_id=%s; " - "sending callback with minimal context", - self.dag_id, - self.run_id, - ) - context_from_server = None return DagCallbackRequest( filepath=self.dag_model.relative_fileloc, dag_id=self.dag_id, run_id=self.run_id, bundle_name=self.dag_model.bundle_name, bundle_version=self.bundle_version, - context_from_server=context_from_server, + context_from_server=DagRunContext( + dag_run=self, + last_ti=relevant_ti, + ), is_failure_callback=(not success), msg=reason, ) From f8a03768fe8527a67ba610583552aeec457755cf Mon Sep 17 00:00:00 2001 From: Sathvik Chowdary Veerapaneni Date: Wed, 13 May 2026 12:56:36 -0400 Subject: [PATCH 13/16] Updated callback bundle filter test wording - Restored test name and comments for existing bundle filtering behavior --- airflow-core/tests/unit/dag_processing/test_manager.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/airflow-core/tests/unit/dag_processing/test_manager.py b/airflow-core/tests/unit/dag_processing/test_manager.py index 6f2e2cd33fb01..17795bcc8870c 100644 --- a/airflow-core/tests/unit/dag_processing/test_manager.py +++ b/airflow-core/tests/unit/dag_processing/test_manager.py @@ -1722,8 +1722,8 @@ def test_fetch_callbacks_from_database_max_per_loop(self, tmp_path, configure_te assert len(session.scalars(select(DbCallbackRequest)).all()) == 1 @conf_vars({("core", "load_examples"): "False"}) - def test_fetch_callbacks_skips_other_bundles_with_warning(self, configure_testing_dag_bundle): - """Callbacks for bundles not served by this processor are skipped with a warning log.""" + def test_fetch_callbacks_ignores_other_bundles(self, configure_testing_dag_bundle): + """Ensure callbacks for bundles not owned by current dag processor manager are ignored and not deleted.""" dag_filepath = TEST_DAG_FOLDER / "test_on_failure_callback_dag.py" @@ -1759,9 +1759,10 @@ def test_fetch_callbacks_skips_other_bundles_with_warning(self, configure_testin # Only the matching callback should be returned assert [c.run_id for c in callbacks] == ["match"] - # The non-matching callback should remain in the DB for the correct processor + # The non-matching callback should remain in the DB remaining = session.scalars(select(DbCallbackRequest)).all() assert len(remaining) == 1 + # Decode remaining request and verify it's for the other bundle remaining_req = remaining[0].get_callback_request() assert remaining_req.bundle_name == "other-bundle" From a757125ddda41d2c09624c45cefcbd0c5fe55c54 Mon Sep 17 00:00:00 2001 From: Sathvik Chowdary Veerapaneni Date: Wed, 13 May 2026 12:56:41 -0400 Subject: [PATCH 14/16] Removed partial context fallback test - Removed regression test for dropped minimal-context callback fallback --- airflow-core/tests/unit/models/test_dagrun.py | 40 ------------------- 1 file changed, 40 deletions(-) diff --git a/airflow-core/tests/unit/models/test_dagrun.py b/airflow-core/tests/unit/models/test_dagrun.py index 1fc7d4a164054..a8a831fb6faad 100644 --- a/airflow-core/tests/unit/models/test_dagrun.py +++ b/airflow-core/tests/unit/models/test_dagrun.py @@ -672,46 +672,6 @@ def on_failure_callable(context): ), ) - def test_produce_dag_callback_resilient_to_context_failure(self, testing_dag_bundle, dag_maker, session): - """produce_dag_callback should still return a callback even when DagRunContext creation fails.""" - - def on_failure_callable(context): - pass - - relative_fileloc = "test_produce_dag_callback_resilient.py" - with dag_maker( - dag_id="test_produce_dag_callback_resilient", - schedule=datetime.timedelta(days=1), - start_date=datetime.datetime(2017, 1, 1), - on_failure_callback=on_failure_callable, - ) as dag: - EmptyOperator(task_id="task1") - dm = DagModel.get_dagmodel(dag.dag_id, session=session) - dm.relative_fileloc = relative_fileloc - session.merge(dm) - session.commit() - - initial_task_states = {"task1": TaskInstanceState.FAILED} - dag.relative_fileloc = relative_fileloc - SerializedDagModel.write_dag(LazyDeserializedDAG.from_dag(dag), bundle_name="dag_maker") - session.commit() - - dag_run = self.create_dag_run(dag=dag, task_states=initial_task_states, session=session) - dag_run.dag_model = dm - - # Patch DagRunContext to raise an exception during construction - with mock.patch( - "airflow.models.dagrun.DagRunContext", side_effect=RuntimeError("Simulated context failure") - ): - _, callback = dag_run.update_state(execute_callbacks=False) - - assert dag_run.state == DagRunState.FAILED - # Callback should still be produced with context_from_server=None - assert callback is not None - assert callback.dag_id == "test_produce_dag_callback_resilient" - assert callback.is_failure_callback is True - assert callback.context_from_server is None - def test_dagrun_set_state_end_date(self, dag_maker, session): with dag_maker(schedule=datetime.timedelta(days=1), start_date=DEFAULT_DATE): pass From 1b9eda206623865c3c1f9040c8cef50971e026e3 Mon Sep 17 00:00:00 2001 From: Sathvik Chowdary Veerapaneni Date: Thu, 11 Jun 2026 22:06:03 -0400 Subject: [PATCH 15/16] Hardened DagRun reload in DAG callback context recovery - Read primary key from instance identity to avoid re-raising on expired attributes - Raised original error when reload returns no row - Passed reloaded DagRun to validation instead of patching the broken instance --- .../airflow/callbacks/callback_requests.py | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/airflow-core/src/airflow/callbacks/callback_requests.py b/airflow-core/src/airflow/callbacks/callback_requests.py index 0d5f7869e8070..3aba464f520c7 100644 --- a/airflow-core/src/airflow/callbacks/callback_requests.py +++ b/airflow-core/src/airflow/callbacks/callback_requests.py @@ -137,26 +137,31 @@ def _sanitize_consumed_asset_events(cls, values: Mapping[str, Any]) -> Mapping[s from airflow.models.dagrun import DagRun from airflow.utils.session import create_session + # Read the primary key via the instance identity: attribute access on + # an expired instance would hit the same broken session and re-raise. + identity = sa_inspect(dag_run).identity + if identity is None: + raise + # Reload DagRun with eager-loaded relationships to recover state # without adding DB I/O to the hot path. with create_session() as session: dag_run_reloaded = session.scalar( select(DagRun) - .where(DagRun.id == dag_run.id) + .where(DagRun.id == identity[0]) .options( selectinload(DagRun.consumed_asset_events).selectinload(AssetEvent.asset), selectinload(DagRun.consumed_asset_events).selectinload(AssetEvent.source_aliases), ) ) - # DagRun exists; reload is expected to succeed. + if dag_run_reloaded is None: + raise dag_run_reloaded = cast("DagRun", dag_run_reloaded) - reloaded_events = dag_run_reloaded.consumed_asset_events - # Install DB-backed relationship state on the detached instance. - set_committed_value( - dag_run, "consumed_asset_events", list(reloaded_events) if reloaded_events is not None else [] - ) + # Validate the reloaded instance: attribute reads on the original + # would hit the broken session again. + return {**values, "dag_run": dag_run_reloaded} return values From 91b5209d3fb37a0ab7fe0d052d8b2a6244f1e16c Mon Sep 17 00:00:00 2001 From: Sathvik Chowdary Veerapaneni Date: Thu, 11 Jun 2026 22:06:03 -0400 Subject: [PATCH 16/16] Added test for DAG callback context with invalidated session transaction - Covered PendingRollbackError path that requires the broad SQLAlchemyError reload --- .../unit/callbacks/test_callback_requests.py | 49 +++++++++++++++++++ 1 file changed, 49 insertions(+) diff --git a/airflow-core/tests/unit/callbacks/test_callback_requests.py b/airflow-core/tests/unit/callbacks/test_callback_requests.py index 434223e37475f..471054a3b74e0 100644 --- a/airflow-core/tests/unit/callbacks/test_callback_requests.py +++ b/airflow-core/tests/unit/callbacks/test_callback_requests.py @@ -20,6 +20,8 @@ import pytest from pydantic import TypeAdapter +from sqlalchemy.exc import IntegrityError +from sqlalchemy.orm import sessionmaker from airflow._shared.timezones import timezone from airflow.api_fastapi.execution_api.datamodels.taskinstance import ( @@ -259,6 +261,53 @@ def test_dagrun_context_attached_consumed_asset_events(self, session): assert events is not None assert isinstance(events, list) + def test_dagrun_context_consumed_asset_events_with_invalidated_transaction(self, session): + """ + DagRunContext should not fail when the DagRun is attached to a session whose + transaction has been invalidated by a prior database error. + + Lazy-loading a relationship on such a session raises PendingRollbackError, + which is a SQLAlchemyError but not a DetachedInstanceError, so the reload + fallback must catch SQLAlchemyError broadly to produce the context. + """ + independent_session = sessionmaker(bind=session.get_bind(), expire_on_commit=False)() + current_time = timezone.utcnow() + dag_run = DagRun( + dag_id="test_dag", + run_id="test_run_invalidated_transaction", + logical_date=current_time, + state="running", + run_type="manual", + ) + independent_session.add(dag_run) + independent_session.commit() + + try: + # A failed flush (duplicate dag_id/run_id) poisons the session; further + # SQL raises PendingRollbackError until rollback. + independent_session.add( + DagRun( + dag_id="test_dag", + run_id="test_run_invalidated_transaction", + logical_date=current_time, + state="running", + run_type="manual", + ) + ) + with pytest.raises(IntegrityError): + independent_session.flush() + + context = DagRunContext(dag_run=dag_run, last_ti=None) + + events = context.dag_run.consumed_asset_events + + # Relationship should be normalized to a safe iterable. + assert events is not None + assert isinstance(events, list) + finally: + independent_session.rollback() + independent_session.close() + class TestDagCallbackRequestWithContext: def test_dag_callback_request_with_context_from_server(self):