From 78c9c9dc41b0787035f4222ed1956ba4c0ec522e Mon Sep 17 00:00:00 2001 From: Hemkumar Chheda Date: Wed, 13 May 2026 16:40:27 +0530 Subject: [PATCH 01/11] fix(assets): release task_instance row lock before asset event emission Under high concurrency (80+ simultaneous task completions emitting asset events), the API server was OOMKilled due to idle-in-transaction DB lock pile-up. Root cause: ti_update_state held a SELECT...FOR UPDATE row lock on task_instance while AssetManager.register_asset_change() ran multiple slow queries, including an ORM .append() that lazy-loaded the entire asset_events collection (potentially thousands of rows). Two fixes: 1. In AssetManager.register_asset_change(), replace asset_alias_model.asset_events.append(asset_event) with a direct INSERT into asset_alias_asset_event. This avoids loading the full relationship collection while the row lock is held. 2. In ti_update_state(), add session.commit() after the TI state UPDATE and Log writes to release the task_instance row lock before running asset registration. Asset registration then runs outside the lock in a fresh implicit transaction. Registration failures are logged and swallowed -- the task state is already durable at that point. Note: session.commit() inside a session-parameter function is an intentional deviation from the CLAUDE.md convention. No code after the commit relies on rollback; the subsequent session.get() re-loads fresh state. Alternative approaches (second session, background task) were considered but have higher operational complexity for equivalent correctness. Production evidence: connections idle-in-transaction for 3+ minutes on asset_alias queries, blocking SELECT task_instance FOR UPDATE across 8 concurrent workers. Disabling the trigger DAGs dropped apiserver memory from 5Gi+ to MBs instantly. --- .../execution_api/routes/task_instances.py | 34 ++++++-- airflow-core/src/airflow/assets/manager.py | 16 +++- .../versions/head/test_task_instances.py | 48 +++++++++++ .../tests/unit/assets/test_manager.py | 85 +++++++++++++++++++ 4 files changed, 173 insertions(+), 10 deletions(-) diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py index 3deccf35be69e..ac0ea47390567 100644 --- a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py +++ b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py @@ -491,6 +491,12 @@ def ti_update_state( extra=json.dumps({"host_name": hostname}) if hostname else None, ) ) + # Commit the TI state update now to release the task_instance row lock before + # running asset-event queries. Asset registration can hold the lock for seconds + # under high concurrency (many aliases with large event histories), causing + # idle-in-transaction pile-up that exhausts API server memory and triggers OOMKill. + # The task outcome is durable from this point on. + session.commit() except DataError: # Let DataErrorHandler return a 422 (not the opaque 500 below). raise @@ -525,6 +531,26 @@ def ti_update_state( task_id=task_id, ) + # Asset registration runs outside the TI row lock. Failures here are logged and + # swallowed — the task state is already committed, so returning HTTP 500 would be + # misleading and would cause unnecessary worker retries. + if isinstance(ti_patch_payload, TISuccessStatePayload) and ti_patch_payload.task_outlets: + try: + ti_for_assets = session.get(TI, task_instance_id) + if ti_for_assets is not None: + TI.register_asset_changes_in_db( + ti_for_assets, + ti_patch_payload.task_outlets, + ti_patch_payload.outlet_events, + session, + ) + except Exception: + log.exception( + "Failed to register asset changes; task state is already committed", + task_instance_id=str(task_instance_id), + new_state=updated_state, + ) + def _emit_task_span(ti, state): # just to be safe @@ -634,13 +660,7 @@ def _create_ti_state_update_query_and_update_state( retry_reason=(ti_patch_payload.retry_reason[:500] if ti_patch_payload.retry_reason else None), ) elif isinstance(ti_patch_payload, TISuccessStatePayload): - if ti is not None: - TI.register_asset_changes_in_db( - ti, - ti_patch_payload.task_outlets, - ti_patch_payload.outlet_events, - session=session, - ) + pass # Asset registration happens after the TI state is committed; see ti_update_state. try: _emit_task_span(ti, state=updated_state) except Exception: diff --git a/airflow-core/src/airflow/assets/manager.py b/airflow-core/src/airflow/assets/manager.py index f72c533c5a0e7..7a85812eb74f0 100644 --- a/airflow-core/src/airflow/assets/manager.py +++ b/airflow-core/src/airflow/assets/manager.py @@ -22,7 +22,7 @@ from typing import TYPE_CHECKING import structlog -from sqlalchemy import exc, or_, select +from sqlalchemy import exc, insert, or_, select from sqlalchemy.orm import joinedload from airflow._shared.observability.metrics import stats @@ -41,6 +41,7 @@ DagScheduleAssetUriReference, PartitionedAssetKeyLog, TaskOutletAssetReference, + asset_alias_asset_event_association_table, ) from airflow.models.log import Log from airflow.timetables.base import compute_rollup_fingerprint @@ -356,8 +357,17 @@ def register_asset_change( ).unique() for asset_alias_model in asset_alias_models: - asset_alias_model.asset_events.append(asset_event) - session.add(asset_alias_model) + # Use a direct INSERT rather than ORM .append() to avoid lazy-loading the + # entire asset_events collection. On long-running deployments that collection + # can contain thousands of rows, and loading it while the task_instance row + # lock is held (from the calling ti_update_state handler) causes the DB + # connection to sit idle-in-transaction for minutes, blocking other workers. + session.execute( + insert(asset_alias_asset_event_association_table).values( + alias_id=asset_alias_model.id, + event_id=asset_event.id, + ) + ) dags_to_queue_from_asset_alias |= { alias_ref.dag diff --git a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py index f990b7008abdc..e25815a6e32c5 100644 --- a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py +++ b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py @@ -1408,6 +1408,54 @@ def test_ti_update_state_running_errors(self, client, session, create_task_insta assert response.status_code == 422 + def test_ti_update_state_to_success_asset_registration_failure_returns_204( + self, client, session, create_task_instance + ): + """Regression: asset registration failure after TI state commit must return 204, not 500. + + The TI state is committed (and the row lock released) before asset registration runs. + If registration fails at that point, the task outcome is already durable as SUCCESS, + so surfacing HTTP 500 would be misleading and cause unnecessary worker retries. + """ + asset = AssetModel( + id=42, + name="fail-asset", + uri="s3://bucket/fail-asset", + group="asset", + extra={}, + ) + asset_active = AssetActive.for_asset(asset) + session.add_all([asset, asset_active]) + + ti = create_task_instance( + task_id="test_asset_reg_failure", + start_date=DEFAULT_START_DATE, + state=State.RUNNING, + ) + session.commit() + + with mock.patch( + "airflow.models.taskinstance.TaskInstance.register_asset_changes_in_db", + side_effect=Exception("simulated DB explosion during asset registration"), + ): + response = client.patch( + f"/execution/task-instances/{ti.id}/state", + json={ + "state": "success", + "end_date": DEFAULT_END_DATE.isoformat(), + "task_outlets": [ + {"name": "fail-asset", "uri": "s3://bucket/fail-asset", "type": "Asset"} + ], + "outlet_events": [], + }, + ) + + assert response.status_code == 204, f"Expected 204, got {response.status_code}: {response.text}" + session.expire_all() + ti_db = session.get(TaskInstance, ti.id) + assert ti_db is not None + assert ti_db.state == TaskInstanceState.SUCCESS + def test_ti_update_state_database_error(self, client, session, create_task_instance): """ Test that a database error is handled correctly when updating the Task Instance state. diff --git a/airflow-core/tests/unit/assets/test_manager.py b/airflow-core/tests/unit/assets/test_manager.py index 687f4d178479d..9e72bd5481619 100644 --- a/airflow-core/tests/unit/assets/test_manager.py +++ b/airflow-core/tests/unit/assets/test_manager.py @@ -38,6 +38,7 @@ AssetPartitionDagRun, DagScheduleAssetAliasReference, DagScheduleAssetReference, + asset_alias_asset_event_association_table, ) from airflow.models.dag import DAG, DagModel from airflow.models.log import Log @@ -174,6 +175,90 @@ def test_register_asset_change_with_alias( ) assert session.scalar(select(func.count()).select_from(AssetDagRunQueue)) == 2 + def test_register_asset_change_with_alias_no_lazy_load( + self, session, mock_task_instance, testing_dag_bundle + ): + """Regression: alias-event association must use a direct INSERT, not ORM .append(). + + ORM .append() lazy-loads the entire asset_events collection before writing. + On long-running deployments with thousands of past events, this query runs + while the task_instance row lock is held in ti_update_state, causing idle-in-transaction + pile-up that exhausts API server memory and triggers OOMKill. + """ + from sqlalchemy import insert as sa_insert + + asm = AssetModel(uri="test://asset-nolazy/", name="test_nolazy_asset", group="asset") + session.add(asm) + asam = AssetAliasModel(name="test_nolazy_alias", group="test") + session.add(asam) + session.flush() + + # Pre-populate existing alias-event rows to simulate a long-running deployment. + # If .append() is used, SQLAlchemy will lazy-load ALL of these before inserting the new one. + existing_events = [AssetEvent(asset_id=asm.id, extra={}) for _ in range(5)] + session.add_all(existing_events) + session.flush() + for ev in existing_events: + session.execute( + sa_insert(asset_alias_asset_event_association_table).values(alias_id=asam.id, event_id=ev.id) + ) + session.flush() + + # Expire the alias so a lazy-load would have to hit the DB (no in-memory cache). + session.expire(asam) + + asset = Asset(uri="test://asset-nolazy", name="test_nolazy_asset") + asset_manager = AssetManager() + + lazy_load_selects: list[str] = [] + real_execute = session.execute + + def tracking_execute(stmt, *args, **kwargs): + try: + compiled = str(stmt.compile(compile_kwargs={"literal_binds": True})) + except Exception: + compiled = str(stmt) + # Detect a lazy-load SELECT joining asset_alias_asset_event with asset_event + if ( + "asset_alias_asset_event" in compiled.lower() + and "asset_event" in compiled.lower() + and compiled.strip().upper().startswith("SELECT") + ): + lazy_load_selects.append(compiled[:120]) + return real_execute(stmt, *args, **kwargs) + + with mock.patch.object(session, "execute", side_effect=tracking_execute): + asset_manager.register_asset_change( + task_instance=mock_task_instance, + asset=asset, + source_alias_names=["test_nolazy_alias"], + session=session, + ) + session.flush() + + # The new association row must exist + new_events = session.scalars( + select(AssetEvent).where( + AssetEvent.asset_id == asm.id, + AssetEvent.id.notin_([ev.id for ev in existing_events]), + ) + ).all() + assert len(new_events) == 1, "Expected exactly one new AssetEvent" + + row_count = session.scalar( + select(func.count()) + .select_from(asset_alias_asset_event_association_table) + .where( + asset_alias_asset_event_association_table.c.alias_id == asam.id, + asset_alias_asset_event_association_table.c.event_id == new_events[0].id, + ) + ) + assert row_count == 1, "Expected the alias-event association row to be written" + + assert lazy_load_selects == [], ( + f"Unexpected lazy-load SELECT on asset_alias_asset_event: {lazy_load_selects}" + ) + def test_register_asset_change_no_downstreams(self, session, mock_task_instance): asset_manager = AssetManager() From c226fa9dd945d1bd252fcf781c2b9a68838b1274 Mon Sep 17 00:00:00 2001 From: Hemkumar Chheda Date: Wed, 13 May 2026 16:56:10 +0530 Subject: [PATCH 02/11] chore: add newsfragment for PR #66854 --- airflow-core/newsfragments/66854.bugfix.rst | 1 + 1 file changed, 1 insertion(+) create mode 100644 airflow-core/newsfragments/66854.bugfix.rst diff --git a/airflow-core/newsfragments/66854.bugfix.rst b/airflow-core/newsfragments/66854.bugfix.rst new file mode 100644 index 0000000000000..178c8bd75c550 --- /dev/null +++ b/airflow-core/newsfragments/66854.bugfix.rst @@ -0,0 +1 @@ +Fix ``task_instance`` row-lock contention under high asset-event concurrency: asset events are now emitted after the TI state commit (releasing the PostgreSQL row lock), and alias-event associations are written via direct INSERT instead of ORM lazy-load. From 1c8cd00dfcbe9dc54700ff3afe62e0ccb8d9db26 Mon Sep 17 00:00:00 2001 From: Hemkumar Chheda Date: Wed, 13 May 2026 16:58:12 +0530 Subject: [PATCH 03/11] chore: remove newsfragment (internal bugfix, not a breaking change) --- airflow-core/newsfragments/66854.bugfix.rst | 1 - 1 file changed, 1 deletion(-) delete mode 100644 airflow-core/newsfragments/66854.bugfix.rst diff --git a/airflow-core/newsfragments/66854.bugfix.rst b/airflow-core/newsfragments/66854.bugfix.rst deleted file mode 100644 index 178c8bd75c550..0000000000000 --- a/airflow-core/newsfragments/66854.bugfix.rst +++ /dev/null @@ -1 +0,0 @@ -Fix ``task_instance`` row-lock contention under high asset-event concurrency: asset events are now emitted after the TI state commit (releasing the PostgreSQL row lock), and alias-event associations are written via direct INSERT instead of ORM lazy-load. From bfed2b16c2b906ae37b0f558bcc5bd12cd8cb951 Mon Sep 17 00:00:00 2001 From: Hemkumar Chheda Date: Tue, 19 May 2026 20:23:10 +0530 Subject: [PATCH 04/11] review: move import to top level and clarify commit/swallow comments Move `insert` from inline method import to the top-level sqlalchemy import block and drop the unnecessary `sa_insert` alias. Improve the session.commit() comment to explain why the early commit is still needed after the direct-INSERT alias-side fix, and clarify that the post-commit exception swallow is intentional. Co-Authored-By: Claude Sonnet 4.6 (1M context) --- .../execution_api/routes/task_instances.py | 15 +++++++++------ airflow-core/tests/unit/assets/test_manager.py | 6 ++---- 2 files changed, 11 insertions(+), 10 deletions(-) diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py index ac0ea47390567..81d1bf18ab475 100644 --- a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py +++ b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py @@ -492,9 +492,11 @@ def ti_update_state( ) ) # Commit the TI state update now to release the task_instance row lock before - # running asset-event queries. Asset registration can hold the lock for seconds - # under high concurrency (many aliases with large event histories), causing - # idle-in-transaction pile-up that exhausts API server memory and triggers OOMKill. + # running asset-event queries. The direct-INSERT fix in AssetManager removes + # the O(n) lazy-load on the alias-event table, but register_asset_changes_in_db + # also queries scheduled dags and inserts AssetDagRunQueue rows — all of which + # would otherwise hold the row lock and cause idle-in-transaction pile-up that + # exhausts API server memory and triggers OOMKill under high concurrency. # The task outcome is durable from this point on. session.commit() except DataError: @@ -531,9 +533,10 @@ def ti_update_state( task_id=task_id, ) - # Asset registration runs outside the TI row lock. Failures here are logged and - # swallowed — the task state is already committed, so returning HTTP 500 would be - # misleading and would cause unnecessary worker retries. + # Asset registration runs outside the TI row lock. The exception is intentionally + # swallowed after logging: the TI state is already committed above, so raising HTTP 500 + # here would be misleading (the task did succeed) and would cause the task-SDK worker + # to retry a state update for a task that has already completed. if isinstance(ti_patch_payload, TISuccessStatePayload) and ti_patch_payload.task_outlets: try: ti_for_assets = session.get(TI, task_instance_id) diff --git a/airflow-core/tests/unit/assets/test_manager.py b/airflow-core/tests/unit/assets/test_manager.py index 9e72bd5481619..1355a2b652b83 100644 --- a/airflow-core/tests/unit/assets/test_manager.py +++ b/airflow-core/tests/unit/assets/test_manager.py @@ -25,7 +25,7 @@ from unittest import mock import pytest -from sqlalchemy import delete, func, select +from sqlalchemy import delete, func, insert, select from sqlalchemy.orm import Session from airflow import settings @@ -185,8 +185,6 @@ def test_register_asset_change_with_alias_no_lazy_load( while the task_instance row lock is held in ti_update_state, causing idle-in-transaction pile-up that exhausts API server memory and triggers OOMKill. """ - from sqlalchemy import insert as sa_insert - asm = AssetModel(uri="test://asset-nolazy/", name="test_nolazy_asset", group="asset") session.add(asm) asam = AssetAliasModel(name="test_nolazy_alias", group="test") @@ -200,7 +198,7 @@ def test_register_asset_change_with_alias_no_lazy_load( session.flush() for ev in existing_events: session.execute( - sa_insert(asset_alias_asset_event_association_table).values(alias_id=asam.id, event_id=ev.id) + insert(asset_alias_asset_event_association_table).values(alias_id=asam.id, event_id=ev.id) ) session.flush() From 5d9771e910805776b08107b2ffadd4fe8035a1db Mon Sep 17 00:00:00 2001 From: Hemkumar Chheda Date: Wed, 27 May 2026 17:48:35 +0530 Subject: [PATCH 05/11] fix(assets): isolate asset registration transaction failures --- .../execution_api/routes/task_instances.py | 5 +- .../versions/head/test_task_instances.py | 124 ++++++++++++++++++ 2 files changed, 128 insertions(+), 1 deletion(-) diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py index 81d1bf18ab475..f943625e6c0fd 100644 --- a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py +++ b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py @@ -494,7 +494,7 @@ def ti_update_state( # Commit the TI state update now to release the task_instance row lock before # running asset-event queries. The direct-INSERT fix in AssetManager removes # the O(n) lazy-load on the alias-event table, but register_asset_changes_in_db - # also queries scheduled dags and inserts AssetDagRunQueue rows — all of which + # also queries scheduled dags and inserts AssetDagRunQueue rows - all of which # would otherwise hold the row lock and cause idle-in-transaction pile-up that # exhausts API server memory and triggers OOMKill under high concurrency. # The task outcome is durable from this point on. @@ -525,7 +525,9 @@ def ti_update_state( task_id=task_id, map_index=map_index, ) + session.commit() except Exception: + session.rollback() log.warning( "Failed to clear task state on success", dag_id=dag_id, @@ -548,6 +550,7 @@ def ti_update_state( session, ) except Exception: + session.rollback() log.exception( "Failed to register asset changes; task state is already committed", task_instance_id=str(task_instance_id), diff --git a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py index e25815a6e32c5..16faec047b041 100644 --- a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py +++ b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py @@ -1456,6 +1456,62 @@ def test_ti_update_state_to_success_asset_registration_failure_returns_204( assert ti_db is not None assert ti_db.state == TaskInstanceState.SUCCESS + def test_ti_update_state_rolls_back_partial_asset_registration_on_failure( + self, client, session, create_task_instance + ): + asset = AssetModel( + id=43, + name="partial-asset", + uri="s3://bucket/partial-asset", + group="asset", + extra={}, + ) + session.add_all([asset, AssetActive.for_asset(asset)]) + + ti = create_task_instance( + task_id="test_partial_asset_registration_failure", + start_date=DEFAULT_START_DATE, + state=State.RUNNING, + ) + session.commit() + + def add_event_then_fail(ti, task_outlets, outlet_events, session): + session.add( + AssetEvent( + asset_id=asset.id, + extra={"partial": True}, + source_task_id=ti.task_id, + source_dag_id=ti.dag_id, + source_run_id=ti.run_id, + source_map_index=ti.map_index, + ) + ) + session.flush() + raise RuntimeError("simulated failure after partial asset registration") + + with mock.patch( + "airflow.models.taskinstance.TaskInstance.register_asset_changes_in_db", + side_effect=add_event_then_fail, + ): + response = client.patch( + f"/execution/task-instances/{ti.id}/state", + json={ + "state": "success", + "end_date": DEFAULT_END_DATE.isoformat(), + "task_outlets": [ + {"name": "partial-asset", "uri": "s3://bucket/partial-asset", "type": "Asset"} + ], + "outlet_events": [], + }, + ) + + assert response.status_code == 204, f"Expected 204, got {response.status_code}: {response.text}" + session.expire_all() + ti_db = session.get(TaskInstance, ti.id) + assert ti_db is not None + assert ti_db.state == TaskInstanceState.SUCCESS + assert session.scalars(select(AssetEvent).where(AssetEvent.asset_id == asset.id)).all() == [] + def test_ti_update_state_database_error(self, client, session, create_task_instance): """ Test that a database error is handled correctly when updating the Task Instance state. @@ -2211,6 +2267,74 @@ def test_ti_update_state_to_success_clears_task_state(self, client, session, cre select(TaskStateStoreModel).where(TaskStateStoreModel.task_id == ti.task_id) ).all() + @pytest.mark.db_test + @conf_vars({("state_store", "clear_on_success"): "True"}) + def test_asset_registration_failure_does_not_rollback_successful_task_state_clear( + self, client, session, create_task_instance + ): + asset = AssetModel( + id=44, + name="partial-asset-with-state-clear", + uri="s3://bucket/partial-asset-with-state-clear", + group="asset", + extra={}, + ) + session.add_all([asset, AssetActive.for_asset(asset)]) + + ti = create_task_instance( + task_id="test_asset_failure_after_state_clear", + start_date=DEFAULT_START_DATE, + state=State.RUNNING, + ) + session.commit() + + backend = MetastoreStateBackend() + scope = TaskScope(dag_id=ti.dag_id, run_id=ti.run_id, task_id=ti.task_id, map_index=ti.map_index) + backend.set(scope, "job_id", "app_1234", session=session) + session.commit() + + def add_event_then_fail(ti, task_outlets, outlet_events, session): + session.add( + AssetEvent( + asset_id=asset.id, + extra={"partial": True}, + source_task_id=ti.task_id, + source_dag_id=ti.dag_id, + source_run_id=ti.run_id, + source_map_index=ti.map_index, + ) + ) + session.flush() + raise RuntimeError("simulated failure after state clear") + + with mock.patch( + "airflow.models.taskinstance.TaskInstance.register_asset_changes_in_db", + side_effect=add_event_then_fail, + ): + response = client.patch( + f"/execution/task-instances/{ti.id}/state", + json={ + "state": "success", + "end_date": DEFAULT_END_DATE.isoformat(), + "task_outlets": [ + { + "name": "partial-asset-with-state-clear", + "uri": "s3://bucket/partial-asset-with-state-clear", + "type": "Asset", + } + ], + "outlet_events": [], + }, + ) + + assert response.status_code == 204, f"Expected 204, got {response.status_code}: {response.text}" + session.expire_all() + ti_db = session.get(TaskInstance, ti.id) + assert ti_db is not None + assert ti_db.state == TaskInstanceState.SUCCESS + assert not session.scalars(select(TaskStateModel).where(TaskStateModel.task_id == ti.task_id)).all() + assert session.scalars(select(AssetEvent).where(AssetEvent.asset_id == asset.id)).all() == [] + @pytest.mark.db_test @conf_vars({("state_store", "clear_on_success"): "True"}) def test_ti_update_state_to_failed_does_not_clear_task_state(self, client, session, create_task_instance): From df3b3da78d96637fabb653a3e6a83acac23da834 Mon Sep 17 00:00:00 2001 From: Hemkumar Chheda Date: Wed, 27 May 2026 22:44:30 +0530 Subject: [PATCH 06/11] fix(assets): handle asset registration commit failures --- .../execution_api/routes/task_instances.py | 1 + airflow-core/src/airflow/assets/manager.py | 11 ++-- .../versions/head/test_task_instances.py | 56 +++++++++++++++++++ 3 files changed, 62 insertions(+), 6 deletions(-) diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py index f943625e6c0fd..ca594bebc10b3 100644 --- a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py +++ b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py @@ -549,6 +549,7 @@ def ti_update_state( ti_patch_payload.outlet_events, session, ) + session.commit() except Exception: session.rollback() log.exception( diff --git a/airflow-core/src/airflow/assets/manager.py b/airflow-core/src/airflow/assets/manager.py index 7a85812eb74f0..c9430bb54ff8a 100644 --- a/airflow-core/src/airflow/assets/manager.py +++ b/airflow-core/src/airflow/assets/manager.py @@ -359,9 +359,8 @@ def register_asset_change( for asset_alias_model in asset_alias_models: # Use a direct INSERT rather than ORM .append() to avoid lazy-loading the # entire asset_events collection. On long-running deployments that collection - # can contain thousands of rows, and loading it while the task_instance row - # lock is held (from the calling ti_update_state handler) causes the DB - # connection to sit idle-in-transaction for minutes, blocking other workers. + # can contain thousands of rows; loading it on the task-success hot path can + # leave DB connections idle-in-transaction for minutes, blocking other workers. session.execute( insert(asset_alias_asset_event_association_table).values( alias_id=asset_alias_model.id, @@ -518,9 +517,9 @@ def _queue_dagruns( # constraint violation. # # If we support it, use ON CONFLICT to do nothing, otherwise - # "fallback" to running this in a nested transaction. This is needed - # so that the adding of these rows happens in the same transaction - # where `ti.state` is changed. + # "fallback" to running this in a nested transaction. Some callers + # run this as part of a TI state transaction; the Execution API commits + # the TI state first, then runs asset registration in a separate transaction. if get_dialect_name(session) == "postgresql": return cls._queue_dagruns_nonpartitioned_postgres(asset_id, non_partitioned_dags, session) return cls._queue_dagruns_nonpartitioned_slow_path(asset_id, non_partitioned_dags, session) diff --git a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py index 16faec047b041..e3236405b98f7 100644 --- a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py +++ b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py @@ -1512,6 +1512,62 @@ def add_event_then_fail(ti, task_outlets, outlet_events, session): assert ti_db.state == TaskInstanceState.SUCCESS assert session.scalars(select(AssetEvent).where(AssetEvent.asset_id == asset.id)).all() == [] + def test_ti_update_state_swallow_asset_registration_commit_failure( + self, client, session, create_task_instance + ): + asset = AssetModel( + id=44, + name="commit-fail-asset", + uri="s3://bucket/commit-fail-asset", + group="asset", + extra={}, + ) + session.add_all([asset, AssetActive.for_asset(asset)]) + + ti = create_task_instance( + task_id="test_asset_registration_commit_failure", + start_date=DEFAULT_START_DATE, + state=State.RUNNING, + ) + session.commit() + + real_commit = Session.commit + commit_count = 0 + failed_asset_commit = False + + def fail_second_commit(session): + nonlocal commit_count, failed_asset_commit + commit_count += 1 + if commit_count == 2: + failed_asset_commit = True + raise RuntimeError("simulated asset registration commit failure") + return real_commit(session) + + with mock.patch("airflow.api_fastapi.common.db.common.Session.commit", fail_second_commit): + response = client.patch( + f"/execution/task-instances/{ti.id}/state", + json={ + "state": "success", + "end_date": DEFAULT_END_DATE.isoformat(), + "task_outlets": [ + { + "name": "commit-fail-asset", + "uri": "s3://bucket/commit-fail-asset", + "type": "Asset", + } + ], + "outlet_events": [], + }, + ) + + assert response.status_code == 204, f"Expected 204, got {response.status_code}: {response.text}" + assert failed_asset_commit + session.expire_all() + ti_db = session.get(TaskInstance, ti.id) + assert ti_db is not None + assert ti_db.state == TaskInstanceState.SUCCESS + assert session.scalars(select(AssetEvent).where(AssetEvent.asset_id == asset.id)).all() == [] + def test_ti_update_state_database_error(self, client, session, create_task_instance): """ Test that a database error is handled correctly when updating the Task Instance state. From 3aee336b56f8fdeb4b8d51bf246b7441337f4af2 Mon Sep 17 00:00:00 2001 From: Hemkumar Chheda Date: Thu, 4 Jun 2026 08:51:00 +0530 Subject: [PATCH 07/11] fix(assets): track post-commit registration failures --- .../execution_api/routes/task_instances.py | 12 +++-- airflow-core/src/airflow/assets/manager.py | 1 + .../versions/head/test_task_instances.py | 52 ++++++++++++++----- .../metrics/metrics_template.yaml | 6 +++ 4 files changed, 52 insertions(+), 19 deletions(-) diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py index ca594bebc10b3..e17266a571a42 100644 --- a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py +++ b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py @@ -40,6 +40,7 @@ from sqlalchemy.sql import select from structlog.contextvars import bind_contextvars +from airflow._shared.observability.metrics import stats from airflow._shared.observability.traces import override_ids from airflow._shared.state import TaskScope from airflow._shared.timezones import timezone @@ -535,10 +536,10 @@ def ti_update_state( task_id=task_id, ) - # Asset registration runs outside the TI row lock. The exception is intentionally - # swallowed after logging: the TI state is already committed above, so raising HTTP 500 - # here would be misleading (the task did succeed) and would cause the task-SDK worker - # to retry a state update for a task that has already completed. + # Asset registration runs outside the TI row lock. Failures are logged and counted; + # raising HTTP 500 here would be misleading because the task already succeeded and + # would make the worker retry a state update that has already completed. Durable + # retry/reconciliation for dropped asset events is out of scope for this hot-path fix. if isinstance(ti_patch_payload, TISuccessStatePayload) and ti_patch_payload.task_outlets: try: ti_for_assets = session.get(TI, task_instance_id) @@ -547,11 +548,12 @@ def ti_update_state( ti_for_assets, ti_patch_payload.task_outlets, ti_patch_payload.outlet_events, - session, + session=session, ) session.commit() except Exception: session.rollback() + stats.incr("asset.registration_failures") log.exception( "Failed to register asset changes; task state is already committed", task_instance_id=str(task_instance_id), diff --git a/airflow-core/src/airflow/assets/manager.py b/airflow-core/src/airflow/assets/manager.py index c9430bb54ff8a..8c7ac5de73a11 100644 --- a/airflow-core/src/airflow/assets/manager.py +++ b/airflow-core/src/airflow/assets/manager.py @@ -361,6 +361,7 @@ def register_asset_change( # entire asset_events collection. On long-running deployments that collection # can contain thousands of rows; loading it on the task-success hot path can # leave DB connections idle-in-transaction for minutes, blocking other workers. + # This intentionally leaves asset_alias_model.asset_events unsynced in-session. session.execute( insert(asset_alias_asset_event_association_table).values( alias_id=asset_alias_model.id, diff --git a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py index e3236405b98f7..55b6e2c8f8c29 100644 --- a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py +++ b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py @@ -1434,9 +1434,12 @@ def test_ti_update_state_to_success_asset_registration_failure_returns_204( ) session.commit() - with mock.patch( - "airflow.models.taskinstance.TaskInstance.register_asset_changes_in_db", - side_effect=Exception("simulated DB explosion during asset registration"), + with ( + mock.patch( + "airflow.models.taskinstance.TaskInstance.register_asset_changes_in_db", + side_effect=Exception("simulated DB explosion during asset registration"), + ), + mock.patch("airflow.api_fastapi.execution_api.routes.task_instances.stats.incr") as mock_incr, ): response = client.patch( f"/execution/task-instances/{ti.id}/state", @@ -1455,6 +1458,7 @@ def test_ti_update_state_to_success_asset_registration_failure_returns_204( ti_db = session.get(TaskInstance, ti.id) assert ti_db is not None assert ti_db.state == TaskInstanceState.SUCCESS + mock_incr.assert_any_call("asset.registration_failures") def test_ti_update_state_rolls_back_partial_asset_registration_on_failure( self, client, session, create_task_instance @@ -1489,9 +1493,12 @@ def add_event_then_fail(ti, task_outlets, outlet_events, session): session.flush() raise RuntimeError("simulated failure after partial asset registration") - with mock.patch( - "airflow.models.taskinstance.TaskInstance.register_asset_changes_in_db", - side_effect=add_event_then_fail, + with ( + mock.patch( + "airflow.models.taskinstance.TaskInstance.register_asset_changes_in_db", + side_effect=add_event_then_fail, + ), + mock.patch("airflow.api_fastapi.execution_api.routes.task_instances.stats.incr") as mock_incr, ): response = client.patch( f"/execution/task-instances/{ti.id}/state", @@ -1511,6 +1518,7 @@ def add_event_then_fail(ti, task_outlets, outlet_events, session): assert ti_db is not None assert ti_db.state == TaskInstanceState.SUCCESS assert session.scalars(select(AssetEvent).where(AssetEvent.asset_id == asset.id)).all() == [] + mock_incr.assert_any_call("asset.registration_failures") def test_ti_update_state_swallow_asset_registration_commit_failure( self, client, session, create_task_instance @@ -1531,19 +1539,34 @@ def test_ti_update_state_swallow_asset_registration_commit_failure( ) session.commit() + real_register_asset_changes_in_db = TaskInstance.register_asset_changes_in_db real_commit = Session.commit - commit_count = 0 + asset_registration_started = False failed_asset_commit = False - def fail_second_commit(session): - nonlocal commit_count, failed_asset_commit - commit_count += 1 - if commit_count == 2: + def register_asset_changes_then_mark_started(ti, task_outlets, outlet_events, *, session): + nonlocal asset_registration_started + real_register_asset_changes_in_db(ti, task_outlets, outlet_events, session=session) + asset_registration_started = True + + def fail_asset_registration_commit(session): + nonlocal failed_asset_commit + if asset_registration_started and not failed_asset_commit: failed_asset_commit = True raise RuntimeError("simulated asset registration commit failure") return real_commit(session) - with mock.patch("airflow.api_fastapi.common.db.common.Session.commit", fail_second_commit): + with ( + mock.patch( + "airflow.models.taskinstance.TaskInstance.register_asset_changes_in_db", + side_effect=register_asset_changes_then_mark_started, + ), + mock.patch( + "airflow.api_fastapi.common.db.common.Session.commit", + fail_asset_registration_commit, + ), + mock.patch("airflow.api_fastapi.execution_api.routes.task_instances.stats.incr") as mock_incr, + ): response = client.patch( f"/execution/task-instances/{ti.id}/state", json={ @@ -1567,6 +1590,7 @@ def fail_second_commit(session): assert ti_db is not None assert ti_db.state == TaskInstanceState.SUCCESS assert session.scalars(select(AssetEvent).where(AssetEvent.asset_id == asset.id)).all() == [] + mock_incr.assert_any_call("asset.registration_failures") def test_ti_update_state_database_error(self, client, session, create_task_instance): """ @@ -2344,7 +2368,7 @@ def test_asset_registration_failure_does_not_rollback_successful_task_state_clea ) session.commit() - backend = MetastoreStateBackend() + backend = MetastoreStoreBackend() scope = TaskScope(dag_id=ti.dag_id, run_id=ti.run_id, task_id=ti.task_id, map_index=ti.map_index) backend.set(scope, "job_id", "app_1234", session=session) session.commit() @@ -2388,7 +2412,7 @@ def add_event_then_fail(ti, task_outlets, outlet_events, session): ti_db = session.get(TaskInstance, ti.id) assert ti_db is not None assert ti_db.state == TaskInstanceState.SUCCESS - assert not session.scalars(select(TaskStateModel).where(TaskStateModel.task_id == ti.task_id)).all() + assert not session.scalars(select(TaskStoreModel).where(TaskStoreModel.task_id == ti.task_id)).all() assert session.scalars(select(AssetEvent).where(AssetEvent.asset_id == asset.id)).all() == [] @pytest.mark.db_test diff --git a/shared/observability/src/airflow_shared/observability/metrics/metrics_template.yaml b/shared/observability/src/airflow_shared/observability/metrics/metrics_template.yaml index 0ba1563116fe4..72bbe3d78e035 100644 --- a/shared/observability/src/airflow_shared/observability/metrics/metrics_template.yaml +++ b/shared/observability/src/airflow_shared/observability/metrics/metrics_template.yaml @@ -290,6 +290,12 @@ metrics: legacy_name: "-" name_variables: [] + - name: "asset.registration_failures" + description: "Number of task success asset registration failures after the task state was updated" + type: "counter" + legacy_name: "-" + name_variables: [] + - name: "asset.orphaned" description: "Number of assets marked as orphans because they are no longer referenced in Dag schedule parameters or task outlets" From 77f2e8eac9cc7a7969c316a447ecc836944d6194 Mon Sep 17 00:00:00 2001 From: Hemkumar Chheda Date: Thu, 4 Jun 2026 11:05:39 +0530 Subject: [PATCH 08/11] fix(ci): skip volatile dependency dirs in connection doc check --- .../ci/prek/check_connection_doc_labels.py | 18 ++++++-- .../prek/test_check_connection_doc_labels.py | 44 +++++++++++++++++++ 2 files changed, 58 insertions(+), 4 deletions(-) create mode 100644 scripts/tests/ci/prek/test_check_connection_doc_labels.py diff --git a/scripts/ci/prek/check_connection_doc_labels.py b/scripts/ci/prek/check_connection_doc_labels.py index 8dc16b346eaae..e3f0e714a2a35 100755 --- a/scripts/ci/prek/check_connection_doc_labels.py +++ b/scripts/ci/prek/check_connection_doc_labels.py @@ -39,6 +39,7 @@ import re import sys +from os import walk from pathlib import Path from rich.console import Console @@ -62,6 +63,7 @@ TOP_LEVEL_ANCHOR_RE = re.compile(r"^\.\.\s+_howto/connection:([a-zA-Z0-9_-]+):\s*$", re.MULTILINE) ANY_ANCHOR_RE = re.compile(r"^\.\.\s+_(howto/connection:[^\s]+?):\s*$", re.MULTILINE) REF_RE = re.compile(r":ref:`(?:[^`]*<(howto/connection:[^>]+)>|(howto/connection:[^`]+))`") +SKIP_SCAN_DIRS = frozenset({"node_modules", ".pnpm-store"}) def collect_connection_types() -> set[str]: @@ -72,18 +74,26 @@ def collect_connection_types() -> set[str]: return conn_types +def collect_files(root: Path, suffix: str) -> list[Path]: + files: list[Path] = [] + for current_root, dirnames, filenames in walk(root): + dirnames[:] = [dirname for dirname in dirnames if dirname not in SKIP_SCAN_DIRS] + files.extend(Path(current_root, filename) for filename in filenames if filename.endswith(suffix)) + return sorted(files) + + def collect_rst_files() -> list[Path]: - rst_files: list[Path] = list(AIRFLOW_PROVIDERS_ROOT_PATH.rglob("*.rst")) + rst_files: list[Path] = collect_files(AIRFLOW_PROVIDERS_ROOT_PATH, ".rst") core_docs = AIRFLOW_ROOT_PATH / "airflow-core" / "docs" if core_docs.is_dir(): - rst_files.extend(core_docs.rglob("*.rst")) + rst_files.extend(collect_files(core_docs, ".rst")) return rst_files def collect_python_files() -> list[Path]: - py_files: list[Path] = list(AIRFLOW_PROVIDERS_ROOT_PATH.rglob("*.py")) + py_files: list[Path] = collect_files(AIRFLOW_PROVIDERS_ROOT_PATH, ".py") if AIRFLOW_CORE_SOURCES_PATH.is_dir(): - py_files.extend(AIRFLOW_CORE_SOURCES_PATH.rglob("*.py")) + py_files.extend(collect_files(AIRFLOW_CORE_SOURCES_PATH, ".py")) return py_files diff --git a/scripts/tests/ci/prek/test_check_connection_doc_labels.py b/scripts/tests/ci/prek/test_check_connection_doc_labels.py new file mode 100644 index 0000000000000..8775e732451b8 --- /dev/null +++ b/scripts/tests/ci/prek/test_check_connection_doc_labels.py @@ -0,0 +1,44 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from check_connection_doc_labels import collect_files + + +def test_collect_files_skips_volatile_dependency_directories(tmp_path): + source_file = tmp_path / "provider" / "docs" / "connection.rst" + source_file.parent.mkdir(parents=True) + source_file.touch() + + node_modules_file = tmp_path / "ui" / "node_modules" / "package" / "docs" / "connection.rst" + node_modules_file.parent.mkdir(parents=True) + node_modules_file.touch() + + pnpm_store_file = tmp_path / "ui" / ".pnpm-store" / "package" / "docs" / "connection.rst" + pnpm_store_file.parent.mkdir(parents=True) + pnpm_store_file.touch() + + assert collect_files(tmp_path, ".rst") == [source_file] + + +def test_collect_files_matches_suffix(tmp_path): + python_file = tmp_path / "src" / "module.py" + python_file.parent.mkdir(parents=True) + python_file.touch() + (python_file.parent / "module.pyi").touch() + + assert collect_files(tmp_path, ".py") == [python_file] From fd147db4b0d14d3b8c4a95be3a0756a242cac125 Mon Sep 17 00:00:00 2001 From: Hemkumar Chheda Date: Tue, 9 Jun 2026 17:41:52 +0530 Subject: [PATCH 09/11] fix(assets): address asset registration review feedback --- .../execution_api/routes/task_instances.py | 6 +- .../airflow_local_settings.py | 169 ++++++++++-------- .../versions/head/test_task_instances.py | 56 ++++++ .../tests/unit/assets/test_manager.py | 27 ++- 4 files changed, 168 insertions(+), 90 deletions(-) diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py index e17266a571a42..02901104ad2b4 100644 --- a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py +++ b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py @@ -540,7 +540,11 @@ def ti_update_state( # raising HTTP 500 here would be misleading because the task already succeeded and # would make the worker retry a state update that has already completed. Durable # retry/reconciliation for dropped asset events is out of scope for this hot-path fix. - if isinstance(ti_patch_payload, TISuccessStatePayload) and ti_patch_payload.task_outlets: + if ( + updated_state == TaskInstanceState.SUCCESS + and isinstance(ti_patch_payload, TISuccessStatePayload) + and ti_patch_payload.task_outlets + ): try: ti_for_assets = session.get(TI, task_instance_id) if ti_for_assets is not None: diff --git a/airflow-core/src/airflow/config_templates/airflow_local_settings.py b/airflow-core/src/airflow/config_templates/airflow_local_settings.py index e30e68fb17984..4e561a1a26d67 100644 --- a/airflow-core/src/airflow/config_templates/airflow_local_settings.py +++ b/airflow-core/src/airflow/config_templates/airflow_local_settings.py @@ -174,16 +174,19 @@ def _default_conn_name_from(mod_path, hook_name): from airflow.providers.amazon.aws.log.s3_task_handler import S3RemoteLogIO _default_conn_name_from("airflow.providers.amazon.aws.hooks.s3", "S3Hook") - REMOTE_TASK_LOG = S3RemoteLogIO( - **cast( - "dict[str, Any]", - { - "base_log_folder": BASE_LOG_FOLDER, - "remote_base": remote_base_log_folder, - "delete_local_copy": delete_local_copy, - } - | _io_kwargs, - ) + REMOTE_TASK_LOG = cast( + "RemoteLogIO | RemoteLogStreamIO", + S3RemoteLogIO( + **cast( + "dict[str, Any]", + { + "base_log_folder": BASE_LOG_FOLDER, + "remote_base": remote_base_log_folder, + "delete_local_copy": delete_local_copy, + } + | _io_kwargs, + ) + ), ) elif remote_base_log_folder.startswith("cloudwatch://"): @@ -191,17 +194,20 @@ def _default_conn_name_from(mod_path, hook_name): _default_conn_name_from("airflow.providers.amazon.aws.hooks.logs", "AwsLogsHook") url_parts = urlsplit(remote_base_log_folder) - REMOTE_TASK_LOG = CloudWatchRemoteLogIO( - **cast( - "dict[str, Any]", - { - "base_log_folder": BASE_LOG_FOLDER, - "remote_base": remote_base_log_folder, - "delete_local_copy": delete_local_copy, - "log_group_arn": url_parts.netloc + url_parts.path, - } - | _io_kwargs, - ) + REMOTE_TASK_LOG = cast( + "RemoteLogIO | RemoteLogStreamIO", + CloudWatchRemoteLogIO( + **cast( + "dict[str, Any]", + { + "base_log_folder": BASE_LOG_FOLDER, + "remote_base": remote_base_log_folder, + "delete_local_copy": delete_local_copy, + "log_group_arn": url_parts.netloc + url_parts.path, + } + | _io_kwargs, + ) + ), ) elif remote_base_log_folder.startswith("gs://"): @@ -210,17 +216,20 @@ def _default_conn_name_from(mod_path, hook_name): _default_conn_name_from("airflow.providers.google.cloud.hooks.gcs", "GCSHook") key_path = conf.get_mandatory_value("logging", "google_key_path", fallback=None) - REMOTE_TASK_LOG = GCSRemoteLogIO( - **cast( - "dict[str, Any]", - { - "base_log_folder": BASE_LOG_FOLDER, - "remote_base": remote_base_log_folder, - "delete_local_copy": delete_local_copy, - "gcp_key_path": key_path, - } - | _io_kwargs, - ) + REMOTE_TASK_LOG = cast( + "RemoteLogIO | RemoteLogStreamIO", + GCSRemoteLogIO( + **cast( + "dict[str, Any]", + { + "base_log_folder": BASE_LOG_FOLDER, + "remote_base": remote_base_log_folder, + "delete_local_copy": delete_local_copy, + "gcp_key_path": key_path, + } + | _io_kwargs, + ) + ), ) elif remote_base_log_folder.startswith("wasb"): @@ -234,17 +243,20 @@ def _default_conn_name_from(mod_path, hook_name): # Handle both URI format (wasb://logs) and plain path (e.g., wasb-logs) wasb_remote_base = remote_base_log_folder.removeprefix("wasb://") - REMOTE_TASK_LOG = WasbRemoteLogIO( - **cast( - "dict[str, Any]", - { - "base_log_folder": BASE_LOG_FOLDER, - "remote_base": wasb_remote_base, - "delete_local_copy": delete_local_copy, - "wasb_container": wasb_log_container, - } - | _io_kwargs, - ) + REMOTE_TASK_LOG = cast( + "RemoteLogIO | RemoteLogStreamIO", + WasbRemoteLogIO( + **cast( + "dict[str, Any]", + { + "base_log_folder": BASE_LOG_FOLDER, + "remote_base": wasb_remote_base, + "delete_local_copy": delete_local_copy, + "wasb_container": wasb_log_container, + } + | _io_kwargs, + ) + ), ) elif remote_base_log_folder.startswith("stackdriver://"): @@ -254,17 +266,20 @@ def _default_conn_name_from(mod_path, hook_name): # stackdriver:///airflow-tasks => airflow-tasks log_name = urlsplit(remote_base_log_folder).path[1:] - REMOTE_TASK_LOG = StackdriverRemoteLogIO( - **cast( - "dict[str, Any]", - { - "base_log_folder": BASE_LOG_FOLDER, - "gcp_log_name": log_name, - "gcp_key_path": key_path, - "delete_local_copy": delete_local_copy, - } - | _io_kwargs, - ) + REMOTE_TASK_LOG = cast( + "RemoteLogIO | RemoteLogStreamIO", + StackdriverRemoteLogIO( + **cast( + "dict[str, Any]", + { + "base_log_folder": BASE_LOG_FOLDER, + "gcp_log_name": log_name, + "gcp_key_path": key_path, + "delete_local_copy": delete_local_copy, + } + | _io_kwargs, + ) + ), ) elif remote_base_log_folder.startswith("oss://"): @@ -272,16 +287,19 @@ def _default_conn_name_from(mod_path, hook_name): _default_conn_name_from("airflow.providers.alibaba.cloud.hooks.oss", "OSSHook") - REMOTE_TASK_LOG = OSSRemoteLogIO( - **cast( - "dict[str, Any]", - { - "base_log_folder": BASE_LOG_FOLDER, - "remote_base": remote_base_log_folder, - "delete_local_copy": delete_local_copy, - } - | _io_kwargs, - ) + REMOTE_TASK_LOG = cast( + "RemoteLogIO | RemoteLogStreamIO", + OSSRemoteLogIO( + **cast( + "dict[str, Any]", + { + "base_log_folder": BASE_LOG_FOLDER, + "remote_base": remote_base_log_folder, + "delete_local_copy": delete_local_copy, + } + | _io_kwargs, + ) + ), ) elif remote_base_log_folder.startswith("hdfs://"): @@ -289,16 +307,19 @@ def _default_conn_name_from(mod_path, hook_name): _default_conn_name_from("airflow.providers.apache.hdfs.hooks.webhdfs", "WebHDFSHook") - REMOTE_TASK_LOG = HdfsRemoteLogIO( - **cast( - "dict[str, Any]", - { - "base_log_folder": BASE_LOG_FOLDER, - "remote_base": urlsplit(remote_base_log_folder).path, - "delete_local_copy": delete_local_copy, - } - | _io_kwargs, - ) + REMOTE_TASK_LOG = cast( + "RemoteLogIO | RemoteLogStreamIO", + HdfsRemoteLogIO( + **cast( + "dict[str, Any]", + { + "base_log_folder": BASE_LOG_FOLDER, + "remote_base": urlsplit(remote_base_log_folder).path, + "delete_local_copy": delete_local_copy, + } + | _io_kwargs, + ) + ), ) elif ELASTICSEARCH_HOST: diff --git a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py index 55b6e2c8f8c29..5e887293d0c3e 100644 --- a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py +++ b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py @@ -1460,6 +1460,62 @@ def test_ti_update_state_to_success_asset_registration_failure_returns_204( assert ti_db.state == TaskInstanceState.SUCCESS mock_incr.assert_any_call("asset.registration_failures") + def test_ti_update_state_forced_failure_skips_asset_registration( + self, client, session, create_task_instance + ): + """A forced failure from the state-update path must not register success assets.""" + asset = AssetModel( + id=45, + name="forced-failure-asset", + uri="s3://bucket/forced-failure-asset", + group="asset", + extra={}, + ) + session.add_all([asset, AssetActive.for_asset(asset)]) + + ti = create_task_instance( + task_id="test_forced_failure_skips_asset_registration", + start_date=DEFAULT_START_DATE, + state=State.RUNNING, + ) + session.commit() + + with ( + mock.patch( + "airflow.api_fastapi.execution_api.routes.task_instances." + "_create_ti_state_update_query_and_update_state", + side_effect=RuntimeError("simulated state-update failure"), + ), + mock.patch( + "airflow.models.taskinstance.TaskInstance.register_asset_changes_in_db" + ) as mock_register_asset_changes, + mock.patch("airflow.api_fastapi.execution_api.routes.task_instances.stats.incr") as mock_incr, + ): + response = client.patch( + f"/execution/task-instances/{ti.id}/state", + json={ + "state": "success", + "end_date": DEFAULT_END_DATE.isoformat(), + "task_outlets": [ + { + "name": "forced-failure-asset", + "uri": "s3://bucket/forced-failure-asset", + "type": "Asset", + } + ], + "outlet_events": [], + }, + ) + + assert response.status_code == 204, f"Expected 204, got {response.status_code}: {response.text}" + session.expire_all() + ti_db = session.get(TaskInstance, ti.id) + assert ti_db is not None + assert ti_db.state == TaskInstanceState.FAILED + assert session.scalars(select(AssetEvent).where(AssetEvent.asset_id == asset.id)).all() == [] + mock_register_asset_changes.assert_not_called() + mock_incr.assert_not_called() + def test_ti_update_state_rolls_back_partial_asset_registration_on_failure( self, client, session, create_task_instance ): diff --git a/airflow-core/tests/unit/assets/test_manager.py b/airflow-core/tests/unit/assets/test_manager.py index 1355a2b652b83..633461e38bdf1 100644 --- a/airflow-core/tests/unit/assets/test_manager.py +++ b/airflow-core/tests/unit/assets/test_manager.py @@ -25,7 +25,7 @@ from unittest import mock import pytest -from sqlalchemy import delete, func, insert, select +from sqlalchemy import delete, event, func, insert, select from sqlalchemy.orm import Session from airflow import settings @@ -209,30 +209,27 @@ def test_register_asset_change_with_alias_no_lazy_load( asset_manager = AssetManager() lazy_load_selects: list[str] = [] - real_execute = session.execute - def tracking_execute(stmt, *args, **kwargs): - try: - compiled = str(stmt.compile(compile_kwargs={"literal_binds": True})) - except Exception: - compiled = str(stmt) - # Detect a lazy-load SELECT joining asset_alias_asset_event with asset_event + def track_sql(_conn, _cursor, statement, _parameters, _context, _executemany): if ( - "asset_alias_asset_event" in compiled.lower() - and "asset_event" in compiled.lower() - and compiled.strip().upper().startswith("SELECT") + "asset_alias_asset_event" in statement.lower() + and "asset_event" in statement.lower() + and statement.strip().upper().startswith("SELECT") ): - lazy_load_selects.append(compiled[:120]) - return real_execute(stmt, *args, **kwargs) + lazy_load_selects.append(statement[:120]) - with mock.patch.object(session, "execute", side_effect=tracking_execute): + assert session.bind is not None + event.listen(session.bind, "before_cursor_execute", track_sql) + try: asset_manager.register_asset_change( task_instance=mock_task_instance, asset=asset, source_alias_names=["test_nolazy_alias"], session=session, ) - session.flush() + session.flush() + finally: + event.remove(session.bind, "before_cursor_execute", track_sql) # The new association row must exist new_events = session.scalars( From 6956a8117494c40210d5567d99e22b23267d78f0 Mon Sep 17 00:00:00 2001 From: Hemkumar Chheda Date: Fri, 12 Jun 2026 16:42:30 +0530 Subject: [PATCH 10/11] chore(assets): remove unrelated CI script changes --- .../ci/prek/check_connection_doc_labels.py | 18 ++------ .../prek/test_check_connection_doc_labels.py | 44 ------------------- 2 files changed, 4 insertions(+), 58 deletions(-) delete mode 100644 scripts/tests/ci/prek/test_check_connection_doc_labels.py diff --git a/scripts/ci/prek/check_connection_doc_labels.py b/scripts/ci/prek/check_connection_doc_labels.py index e3f0e714a2a35..8dc16b346eaae 100755 --- a/scripts/ci/prek/check_connection_doc_labels.py +++ b/scripts/ci/prek/check_connection_doc_labels.py @@ -39,7 +39,6 @@ import re import sys -from os import walk from pathlib import Path from rich.console import Console @@ -63,7 +62,6 @@ TOP_LEVEL_ANCHOR_RE = re.compile(r"^\.\.\s+_howto/connection:([a-zA-Z0-9_-]+):\s*$", re.MULTILINE) ANY_ANCHOR_RE = re.compile(r"^\.\.\s+_(howto/connection:[^\s]+?):\s*$", re.MULTILINE) REF_RE = re.compile(r":ref:`(?:[^`]*<(howto/connection:[^>]+)>|(howto/connection:[^`]+))`") -SKIP_SCAN_DIRS = frozenset({"node_modules", ".pnpm-store"}) def collect_connection_types() -> set[str]: @@ -74,26 +72,18 @@ def collect_connection_types() -> set[str]: return conn_types -def collect_files(root: Path, suffix: str) -> list[Path]: - files: list[Path] = [] - for current_root, dirnames, filenames in walk(root): - dirnames[:] = [dirname for dirname in dirnames if dirname not in SKIP_SCAN_DIRS] - files.extend(Path(current_root, filename) for filename in filenames if filename.endswith(suffix)) - return sorted(files) - - def collect_rst_files() -> list[Path]: - rst_files: list[Path] = collect_files(AIRFLOW_PROVIDERS_ROOT_PATH, ".rst") + rst_files: list[Path] = list(AIRFLOW_PROVIDERS_ROOT_PATH.rglob("*.rst")) core_docs = AIRFLOW_ROOT_PATH / "airflow-core" / "docs" if core_docs.is_dir(): - rst_files.extend(collect_files(core_docs, ".rst")) + rst_files.extend(core_docs.rglob("*.rst")) return rst_files def collect_python_files() -> list[Path]: - py_files: list[Path] = collect_files(AIRFLOW_PROVIDERS_ROOT_PATH, ".py") + py_files: list[Path] = list(AIRFLOW_PROVIDERS_ROOT_PATH.rglob("*.py")) if AIRFLOW_CORE_SOURCES_PATH.is_dir(): - py_files.extend(collect_files(AIRFLOW_CORE_SOURCES_PATH, ".py")) + py_files.extend(AIRFLOW_CORE_SOURCES_PATH.rglob("*.py")) return py_files diff --git a/scripts/tests/ci/prek/test_check_connection_doc_labels.py b/scripts/tests/ci/prek/test_check_connection_doc_labels.py deleted file mode 100644 index 8775e732451b8..0000000000000 --- a/scripts/tests/ci/prek/test_check_connection_doc_labels.py +++ /dev/null @@ -1,44 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -from __future__ import annotations - -from check_connection_doc_labels import collect_files - - -def test_collect_files_skips_volatile_dependency_directories(tmp_path): - source_file = tmp_path / "provider" / "docs" / "connection.rst" - source_file.parent.mkdir(parents=True) - source_file.touch() - - node_modules_file = tmp_path / "ui" / "node_modules" / "package" / "docs" / "connection.rst" - node_modules_file.parent.mkdir(parents=True) - node_modules_file.touch() - - pnpm_store_file = tmp_path / "ui" / ".pnpm-store" / "package" / "docs" / "connection.rst" - pnpm_store_file.parent.mkdir(parents=True) - pnpm_store_file.touch() - - assert collect_files(tmp_path, ".rst") == [source_file] - - -def test_collect_files_matches_suffix(tmp_path): - python_file = tmp_path / "src" / "module.py" - python_file.parent.mkdir(parents=True) - python_file.touch() - (python_file.parent / "module.pyi").touch() - - assert collect_files(tmp_path, ".py") == [python_file] From 0ce69fefac1e635ac18a152f1c747f7a6d66f77c Mon Sep 17 00:00:00 2001 From: Hemkumar Chheda Date: Mon, 15 Jun 2026 21:11:14 +0530 Subject: [PATCH 11/11] chore(assets): remove unrelated local settings change --- .../airflow_local_settings.py | 169 ++++++++---------- .../versions/head/test_task_instances.py | 6 +- 2 files changed, 78 insertions(+), 97 deletions(-) diff --git a/airflow-core/src/airflow/config_templates/airflow_local_settings.py b/airflow-core/src/airflow/config_templates/airflow_local_settings.py index 4e561a1a26d67..e30e68fb17984 100644 --- a/airflow-core/src/airflow/config_templates/airflow_local_settings.py +++ b/airflow-core/src/airflow/config_templates/airflow_local_settings.py @@ -174,19 +174,16 @@ def _default_conn_name_from(mod_path, hook_name): from airflow.providers.amazon.aws.log.s3_task_handler import S3RemoteLogIO _default_conn_name_from("airflow.providers.amazon.aws.hooks.s3", "S3Hook") - REMOTE_TASK_LOG = cast( - "RemoteLogIO | RemoteLogStreamIO", - S3RemoteLogIO( - **cast( - "dict[str, Any]", - { - "base_log_folder": BASE_LOG_FOLDER, - "remote_base": remote_base_log_folder, - "delete_local_copy": delete_local_copy, - } - | _io_kwargs, - ) - ), + REMOTE_TASK_LOG = S3RemoteLogIO( + **cast( + "dict[str, Any]", + { + "base_log_folder": BASE_LOG_FOLDER, + "remote_base": remote_base_log_folder, + "delete_local_copy": delete_local_copy, + } + | _io_kwargs, + ) ) elif remote_base_log_folder.startswith("cloudwatch://"): @@ -194,20 +191,17 @@ def _default_conn_name_from(mod_path, hook_name): _default_conn_name_from("airflow.providers.amazon.aws.hooks.logs", "AwsLogsHook") url_parts = urlsplit(remote_base_log_folder) - REMOTE_TASK_LOG = cast( - "RemoteLogIO | RemoteLogStreamIO", - CloudWatchRemoteLogIO( - **cast( - "dict[str, Any]", - { - "base_log_folder": BASE_LOG_FOLDER, - "remote_base": remote_base_log_folder, - "delete_local_copy": delete_local_copy, - "log_group_arn": url_parts.netloc + url_parts.path, - } - | _io_kwargs, - ) - ), + REMOTE_TASK_LOG = CloudWatchRemoteLogIO( + **cast( + "dict[str, Any]", + { + "base_log_folder": BASE_LOG_FOLDER, + "remote_base": remote_base_log_folder, + "delete_local_copy": delete_local_copy, + "log_group_arn": url_parts.netloc + url_parts.path, + } + | _io_kwargs, + ) ) elif remote_base_log_folder.startswith("gs://"): @@ -216,20 +210,17 @@ def _default_conn_name_from(mod_path, hook_name): _default_conn_name_from("airflow.providers.google.cloud.hooks.gcs", "GCSHook") key_path = conf.get_mandatory_value("logging", "google_key_path", fallback=None) - REMOTE_TASK_LOG = cast( - "RemoteLogIO | RemoteLogStreamIO", - GCSRemoteLogIO( - **cast( - "dict[str, Any]", - { - "base_log_folder": BASE_LOG_FOLDER, - "remote_base": remote_base_log_folder, - "delete_local_copy": delete_local_copy, - "gcp_key_path": key_path, - } - | _io_kwargs, - ) - ), + REMOTE_TASK_LOG = GCSRemoteLogIO( + **cast( + "dict[str, Any]", + { + "base_log_folder": BASE_LOG_FOLDER, + "remote_base": remote_base_log_folder, + "delete_local_copy": delete_local_copy, + "gcp_key_path": key_path, + } + | _io_kwargs, + ) ) elif remote_base_log_folder.startswith("wasb"): @@ -243,20 +234,17 @@ def _default_conn_name_from(mod_path, hook_name): # Handle both URI format (wasb://logs) and plain path (e.g., wasb-logs) wasb_remote_base = remote_base_log_folder.removeprefix("wasb://") - REMOTE_TASK_LOG = cast( - "RemoteLogIO | RemoteLogStreamIO", - WasbRemoteLogIO( - **cast( - "dict[str, Any]", - { - "base_log_folder": BASE_LOG_FOLDER, - "remote_base": wasb_remote_base, - "delete_local_copy": delete_local_copy, - "wasb_container": wasb_log_container, - } - | _io_kwargs, - ) - ), + REMOTE_TASK_LOG = WasbRemoteLogIO( + **cast( + "dict[str, Any]", + { + "base_log_folder": BASE_LOG_FOLDER, + "remote_base": wasb_remote_base, + "delete_local_copy": delete_local_copy, + "wasb_container": wasb_log_container, + } + | _io_kwargs, + ) ) elif remote_base_log_folder.startswith("stackdriver://"): @@ -266,20 +254,17 @@ def _default_conn_name_from(mod_path, hook_name): # stackdriver:///airflow-tasks => airflow-tasks log_name = urlsplit(remote_base_log_folder).path[1:] - REMOTE_TASK_LOG = cast( - "RemoteLogIO | RemoteLogStreamIO", - StackdriverRemoteLogIO( - **cast( - "dict[str, Any]", - { - "base_log_folder": BASE_LOG_FOLDER, - "gcp_log_name": log_name, - "gcp_key_path": key_path, - "delete_local_copy": delete_local_copy, - } - | _io_kwargs, - ) - ), + REMOTE_TASK_LOG = StackdriverRemoteLogIO( + **cast( + "dict[str, Any]", + { + "base_log_folder": BASE_LOG_FOLDER, + "gcp_log_name": log_name, + "gcp_key_path": key_path, + "delete_local_copy": delete_local_copy, + } + | _io_kwargs, + ) ) elif remote_base_log_folder.startswith("oss://"): @@ -287,19 +272,16 @@ def _default_conn_name_from(mod_path, hook_name): _default_conn_name_from("airflow.providers.alibaba.cloud.hooks.oss", "OSSHook") - REMOTE_TASK_LOG = cast( - "RemoteLogIO | RemoteLogStreamIO", - OSSRemoteLogIO( - **cast( - "dict[str, Any]", - { - "base_log_folder": BASE_LOG_FOLDER, - "remote_base": remote_base_log_folder, - "delete_local_copy": delete_local_copy, - } - | _io_kwargs, - ) - ), + REMOTE_TASK_LOG = OSSRemoteLogIO( + **cast( + "dict[str, Any]", + { + "base_log_folder": BASE_LOG_FOLDER, + "remote_base": remote_base_log_folder, + "delete_local_copy": delete_local_copy, + } + | _io_kwargs, + ) ) elif remote_base_log_folder.startswith("hdfs://"): @@ -307,19 +289,16 @@ def _default_conn_name_from(mod_path, hook_name): _default_conn_name_from("airflow.providers.apache.hdfs.hooks.webhdfs", "WebHDFSHook") - REMOTE_TASK_LOG = cast( - "RemoteLogIO | RemoteLogStreamIO", - HdfsRemoteLogIO( - **cast( - "dict[str, Any]", - { - "base_log_folder": BASE_LOG_FOLDER, - "remote_base": urlsplit(remote_base_log_folder).path, - "delete_local_copy": delete_local_copy, - } - | _io_kwargs, - ) - ), + REMOTE_TASK_LOG = HdfsRemoteLogIO( + **cast( + "dict[str, Any]", + { + "base_log_folder": BASE_LOG_FOLDER, + "remote_base": urlsplit(remote_base_log_folder).path, + "delete_local_copy": delete_local_copy, + } + | _io_kwargs, + ) ) elif ELASTICSEARCH_HOST: diff --git a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py index 5e887293d0c3e..8c47d57195428 100644 --- a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py +++ b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py @@ -2424,7 +2424,7 @@ def test_asset_registration_failure_does_not_rollback_successful_task_state_clea ) session.commit() - backend = MetastoreStoreBackend() + backend = MetastoreBackend() scope = TaskScope(dag_id=ti.dag_id, run_id=ti.run_id, task_id=ti.task_id, map_index=ti.map_index) backend.set(scope, "job_id", "app_1234", session=session) session.commit() @@ -2468,7 +2468,9 @@ def add_event_then_fail(ti, task_outlets, outlet_events, session): ti_db = session.get(TaskInstance, ti.id) assert ti_db is not None assert ti_db.state == TaskInstanceState.SUCCESS - assert not session.scalars(select(TaskStoreModel).where(TaskStoreModel.task_id == ti.task_id)).all() + assert not session.scalars( + select(TaskStateStoreModel).where(TaskStateStoreModel.task_id == ti.task_id) + ).all() assert session.scalars(select(AssetEvent).where(AssetEvent.asset_id == asset.id)).all() == [] @pytest.mark.db_test