fix(assets): reduce task success asset registration lock contention#66854
fix(assets): reduce task success asset registration lock contention#66854hkc-8010 wants to merge 11 commits into
Conversation
|
The failing CI job ( This is a All other CI checks pass. |
98668e8 to
e8bca3b
Compare
e8bca3b to
c8086d2
Compare
|
The CI failure in |
|
@hkc-8010 Converting to draft — this PR doesn't yet meet our Pull Request quality criteria. See the linked criteria for how to fix each item, then mark the PR "Ready for review". This is not a rejection — just an invitation to bring the PR up to standard. No rush. Note: This comment was drafted by an AI-assisted triage tool and may contain mistakes. Once you have addressed the points above, an Apache Airflow maintainer — a real person — will take the next look at your PR. We use this two-stage triage process so that our maintainers' limited time is spent where it matters most: the conversation with you. Drafted-by: Claude Code (Opus 4.7); reviewed by @potiuk before posting |
f6486ea to
3cca00a
Compare
3cca00a to
280a86a
Compare
280a86a to
4673ff3
Compare
|
@hkc-8010 — There are 2 unresolved review thread(s) on this PR from @Lee-W, @kaxil. Could you either push a fix or reply in each thread explaining why the feedback doesn't apply? When you believe the feedback is addressed, please mark the threads as resolved and ping the reviewer (@Lee-W, @kaxil) for a final look. Thanks! Note: This comment was drafted by an AI-assisted triage tool and may contain mistakes. Once you have addressed the points above, an Apache Airflow maintainer — a real person — will take the next look at your PR. We use this two-stage triage process so that our maintainers' limited time is spent where it matters most: the conversation with you. |
ab90c26 to
c3e3e7c
Compare
|
@potiuk thanks for the triage notes. The items called out in your comments should be addressed now:
Could you please take another look at the PR when you get a chance? |
c3e3e7c to
4b38817
Compare
Under high concurrency (80+ simultaneous task completions emitting asset events), the API server was OOMKilled due to idle-in-transaction DB lock pile-up. Root cause: ti_update_state held a SELECT...FOR UPDATE row lock on task_instance while AssetManager.register_asset_change() ran multiple slow queries, including an ORM .append() that lazy-loaded the entire asset_events collection (potentially thousands of rows). Two fixes: 1. In AssetManager.register_asset_change(), replace asset_alias_model.asset_events.append(asset_event) with a direct INSERT into asset_alias_asset_event. This avoids loading the full relationship collection while the row lock is held. 2. In ti_update_state(), add session.commit() after the TI state UPDATE and Log writes to release the task_instance row lock before running asset registration. Asset registration then runs outside the lock in a fresh implicit transaction. Registration failures are logged and swallowed -- the task state is already durable at that point. Note: session.commit() inside a session-parameter function is an intentional deviation from the CLAUDE.md convention. No code after the commit relies on rollback; the subsequent session.get() re-loads fresh state. Alternative approaches (second session, background task) were considered but have higher operational complexity for equivalent correctness. Production evidence: connections idle-in-transaction for 3+ minutes on asset_alias queries, blocking SELECT task_instance FOR UPDATE across 8 concurrent workers. Disabling the trigger DAGs dropped apiserver memory from 5Gi+ to MBs instantly.
Move `insert` from inline method import to the top-level sqlalchemy import block and drop the unnecessary `sa_insert` alias. Improve the session.commit() comment to explain why the early commit is still needed after the direct-INSERT alias-side fix, and clarify that the post-commit exception swallow is intentional. Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
4b38817 to
0ce69fe
Compare
| ti_patch_payload.outlet_events, | ||
| session=session, | ||
| ) | ||
| pass # Asset registration happens after the TI state is committed; see ti_update_state. |
There was a problem hiding this comment.
I don’t think the comment is needed. Anyone reading this part of the code would not assume asset registration should be done here; it was done here, but there’s no reason it should be.
(Honestly, it is unclear why a lot of the logic, such as creating triggers for TIDeferredStatePayload, should be done here in the first place. Maybe it should be moved out too. But this is a topic for another PR.)
| # Use a direct INSERT rather than ORM .append() to avoid lazy-loading the | ||
| # entire asset_events collection. On long-running deployments that collection | ||
| # can contain thousands of rows; loading it on the task-success hot path can | ||
| # leave DB connections idle-in-transaction for minutes, blocking other workers. | ||
| # This intentionally leaves asset_alias_model.asset_events unsynced in-session. |
There was a problem hiding this comment.
This is too much text. Arguably only the first one and a half sentences are needed. The last sentence is also unsatisfying; why is it intentional? Does it not cause other issues? Why are those other issues not relevant here?
| # Asset registration runs outside the TI row lock. Failures are logged and counted; | ||
| # raising HTTP 500 here would be misleading because the task already succeeded and | ||
| # would make the worker retry a state update that has already completed. Durable | ||
| # retry/reconciliation for dropped asset events is out of scope for this hot-path fix. |
There was a problem hiding this comment.
Things mentioned in this comment are obvious to anyone who can reasonably read Python code. Useless.
| # Commit the TI state update now to release the task_instance row lock before | ||
| # running asset-event queries. The direct-INSERT fix in AssetManager removes | ||
| # the O(n) lazy-load on the alias-event table, but register_asset_changes_in_db | ||
| # also queries scheduled dags and inserts AssetDagRunQueue rows - all of which | ||
| # would otherwise hold the row lock and cause idle-in-transaction pile-up that | ||
| # exhausts API server memory and triggers OOMKill under high concurrency. | ||
| # The task outcome is durable from this point on. |
There was a problem hiding this comment.
Same, much too long.
As a general rule, you should review all comments generated by an AI agent. Please do due diligence in the future and not shift the responsibility to reviewers.
Closes #66853
Summary
ti_update_state(), releasing thetask_instancerow lock before asset scheduling work runs.asset_eventscollections on the task completion path.asset.registration_failuresso dropped registration work is observable. Durable retry/reconciliation is intentionally out of scope for this lock-contention fix.FAILEDtask-instance state does not emit asset events.Evidence
Sanitized measurements from the affected deployment showed the task completion path under load with 80+ concurrent successful task completions, asset-registration queries left idle in transaction for minutes, blocked
SELECT ... FOR UPDATEcalls ontask_instance, and apiserver OOMKills while those requests piled up.Changes
airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py: early task-state commit, post-commit asset registration, committed-state guard, andasset.registration_failuresmetric on registration failure.airflow-core/src/airflow/assets/manager.py: direct alias association insert, with an explicit note thatasset_alias_model.asset_eventsis intentionally left unsynced in the current session because this path does not read it again before commit.airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py: regression coverage for 204 task-state durability, asset-registration failure metric emission, and the forced-failure path that commitsFAILEDand skips asset registration.airflow-core/tests/unit/assets/test_manager.py: engine-level SQL listener coverage proving alias registration does not emit a lazy-loadSELECTon the alias-event association path. This was red-checked locally against the old ORM.append()implementation.shared/observability/src/airflow_shared/observability/metrics/metrics_template.yaml: metric registry entry forasset.registration_failures.scripts/ci/prek/check_connection_doc_labels.py: skip volatile generated dependency directories while scanning source/docs for connection labels, so all-files static checks do not race with UInode_moduleschurn.Validation
prek run --files airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py airflow-core/src/airflow/assets/manager.py airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py airflow-core/tests/unit/assets/test_manager.py scripts/ci/prek/check_connection_doc_labels.py scripts/tests/ci/prek/test_check_connection_doc_labels.py shared/observability/src/airflow_shared/observability/metrics/metrics_template.yaml airflow-core/src/airflow/config_templates/airflow_local_settings.py scripts/ci/prek/check_partition_mapper_defaults_in_sync.pyprek run --from-ref upstream/main --to-ref HEAD(substantive hooks passed; the workspaceidentityhook reports the checked file list as modified when run over this uncommitted PR range).venv/bin/python scripts/ci/prek/run_mypy_full_dist_local_venv_or_breeze_in_ci.py airflow-coreuv run --frozen --no-sync pytest --with-db-init airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py airflow-core/tests/unit/assets/test_manager.py -qPYTHONPATH=scripts/ci/prek uv run --frozen --no-sync pytest scripts/tests/ci/prek/test_check_connection_doc_labels.py -quv run --no-sync scripts/ci/prek/check_connection_doc_labels.pyprek run check-connection-doc-labels --all-filesbreeze testing core-tests --backend sqlite --python 3.10 --db-reset -- airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py airflow-core/tests/unit/assets/test_manager.py -qbreeze testing core-tests --backend postgres --python 3.10 --db-reset -- airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py airflow-core/tests/unit/assets/test_manager.py -qbreeze testing core-tests --backend postgres --python 3.10 --downgrade-pendulum --db-reset -- airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py airflow-core/tests/unit/assets/test_manager.py -qbreeze testing core-tests --backend sqlite --python 3.10 --force-lowest-dependencies --db-reset -- airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py airflow-core/tests/unit/assets/test_manager.py -qgit diff --checkPR Checklist
mainbranchprek, mypy, pytest, and Breeze validation passed locally