Skip to content

fix(assets): reduce task success asset registration lock contention#66854

Open
hkc-8010 wants to merge 11 commits into
apache:mainfrom
hkc-8010:fix/asset-event-lock-contention
Open

fix(assets): reduce task success asset registration lock contention#66854
hkc-8010 wants to merge 11 commits into
apache:mainfrom
hkc-8010:fix/asset-event-lock-contention

Conversation

@hkc-8010

@hkc-8010 hkc-8010 commented May 13, 2026

Copy link
Copy Markdown
Contributor

Closes #66853

Summary

  • Commit the task-instance state update and log entry before asset registration in ti_update_state(), releasing the task_instance row lock before asset scheduling work runs.
  • Replace the alias-event ORM relationship append with a direct association-table insert so long-running aliases do not lazy-load large asset_events collections on the task completion path.
  • Keep HTTP 204 after task success is durable if post-commit asset registration fails, but log the failure and increment asset.registration_failures so dropped registration work is observable. Durable retry/reconciliation is intentionally out of scope for this lock-contention fix.
  • Gate post-commit asset registration on the committed outcome, so a success payload that is forced into a durable FAILED task-instance state does not emit asset events.

Evidence

Sanitized measurements from the affected deployment showed the task completion path under load with 80+ concurrent successful task completions, asset-registration queries left idle in transaction for minutes, blocked SELECT ... FOR UPDATE calls on task_instance, and apiserver OOMKills while those requests piled up.

Changes

  • airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py: early task-state commit, post-commit asset registration, committed-state guard, and asset.registration_failures metric on registration failure.
  • airflow-core/src/airflow/assets/manager.py: direct alias association insert, with an explicit note that asset_alias_model.asset_events is intentionally left unsynced in the current session because this path does not read it again before commit.
  • airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py: regression coverage for 204 task-state durability, asset-registration failure metric emission, and the forced-failure path that commits FAILED and skips asset registration.
  • airflow-core/tests/unit/assets/test_manager.py: engine-level SQL listener coverage proving alias registration does not emit a lazy-load SELECT on the alias-event association path. This was red-checked locally against the old ORM .append() implementation.
  • shared/observability/src/airflow_shared/observability/metrics/metrics_template.yaml: metric registry entry for asset.registration_failures.
  • scripts/ci/prek/check_connection_doc_labels.py: skip volatile generated dependency directories while scanning source/docs for connection labels, so all-files static checks do not race with UI node_modules churn.

Validation

  • prek run --files airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py airflow-core/src/airflow/assets/manager.py airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py airflow-core/tests/unit/assets/test_manager.py scripts/ci/prek/check_connection_doc_labels.py scripts/tests/ci/prek/test_check_connection_doc_labels.py shared/observability/src/airflow_shared/observability/metrics/metrics_template.yaml airflow-core/src/airflow/config_templates/airflow_local_settings.py scripts/ci/prek/check_partition_mapper_defaults_in_sync.py
  • prek run --from-ref upstream/main --to-ref HEAD (substantive hooks passed; the workspace identity hook reports the checked file list as modified when run over this uncommitted PR range)
  • .venv/bin/python scripts/ci/prek/run_mypy_full_dist_local_venv_or_breeze_in_ci.py airflow-core
  • uv run --frozen --no-sync pytest --with-db-init airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py airflow-core/tests/unit/assets/test_manager.py -q
  • PYTHONPATH=scripts/ci/prek uv run --frozen --no-sync pytest scripts/tests/ci/prek/test_check_connection_doc_labels.py -q
  • uv run --no-sync scripts/ci/prek/check_connection_doc_labels.py
  • prek run check-connection-doc-labels --all-files
  • breeze testing core-tests --backend sqlite --python 3.10 --db-reset -- airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py airflow-core/tests/unit/assets/test_manager.py -q
  • breeze testing core-tests --backend postgres --python 3.10 --db-reset -- airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py airflow-core/tests/unit/assets/test_manager.py -q
  • breeze testing core-tests --backend postgres --python 3.10 --downgrade-pendulum --db-reset -- airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py airflow-core/tests/unit/assets/test_manager.py -q
  • breeze testing core-tests --backend sqlite --python 3.10 --force-lowest-dependencies --db-reset -- airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py airflow-core/tests/unit/assets/test_manager.py -q
  • git diff --check

PR Checklist

  • My PR is targeted at the main branch
  • Tests added/updated
  • Targeted prek, mypy, pytest, and Breeze validation passed locally

@boring-cyborg boring-cyborg Bot added area:API Airflow's REST/HTTP API area:task-sdk labels May 13, 2026
hkc-8010 added a commit to hkc-8010/my-airflow-repository that referenced this pull request May 13, 2026
@hkc-8010 hkc-8010 marked this pull request as ready for review May 13, 2026 11:41
@hkc-8010

Copy link
Copy Markdown
Contributor Author

The failing CI job (provider distributions tests / Compat 3.0.6:P3.10) is pre-existing on main and unrelated to this PR. It fails with:

ImportError: cannot import name 'Options' from 'jwt.types'

This is a flask_jwt_extended/PyJWT version incompatibility in providers/fab tests running against Airflow 3.0.6. Confirmed failing on main in run https://github.com/apache/airflow/actions/runs/25789005777 before this PR was opened.

All other CI checks pass.

Lee-W pushed a commit to hkc-8010/my-airflow-repository that referenced this pull request May 14, 2026
@Lee-W Lee-W force-pushed the fix/asset-event-lock-contention branch from 98668e8 to e8bca3b Compare May 14, 2026 09:07
@hkc-8010 hkc-8010 force-pushed the fix/asset-event-lock-contention branch from e8bca3b to c8086d2 Compare May 15, 2026 04:57
hkc-8010 added a commit to hkc-8010/my-airflow-repository that referenced this pull request May 15, 2026
@hkc-8010

hkc-8010 commented May 15, 2026

Copy link
Copy Markdown
Contributor Author

The CI failure in Integration and System Tests / Integration core otel (test_export_legacy_metric_names) is unrelated to this PR — it tests scheduler-side metric emission timing and has a history of flakiness (see #61070, #65867). This PR touches only task_instances.py and assets/manager.py; no OTEL or scheduler metric code is modified.

@choo121600 choo121600 added the ready for maintainer review Set after triaging when all criteria pass. label May 15, 2026
Comment thread airflow-core/tests/unit/assets/test_manager.py Outdated
@hkc-8010 hkc-8010 requested a review from Lee-W May 19, 2026 15:11
@potiuk potiuk removed the ready for maintainer review Set after triaging when all criteria pass. label May 24, 2026
@eladkal eladkal added this to the Airflow 3.2.3 milestone May 25, 2026
@potiuk potiuk marked this pull request as draft May 26, 2026 00:46
@potiuk

potiuk commented May 26, 2026

Copy link
Copy Markdown
Member

@hkc-8010 Converting to draft — this PR doesn't yet meet our Pull Request quality criteria.

  • Pre-commit / static checks. See docs.
  • Unresolved review comments: 2 thread(s). See docs.

See the linked criteria for how to fix each item, then mark the PR "Ready for review". This is not a rejection — just an invitation to bring the PR up to standard. No rush.


Note: This comment was drafted by an AI-assisted triage tool and may contain mistakes. Once you have addressed the points above, an Apache Airflow maintainer — a real person — will take the next look at your PR. We use this two-stage triage process so that our maintainers' limited time is spent where it matters most: the conversation with you.


Drafted-by: Claude Code (Opus 4.7); reviewed by @potiuk before posting

@hkc-8010 hkc-8010 force-pushed the fix/asset-event-lock-contention branch from f6486ea to 3cca00a Compare May 27, 2026 12:25
hkc-8010 added a commit to hkc-8010/my-airflow-repository that referenced this pull request May 27, 2026
@hkc-8010 hkc-8010 force-pushed the fix/asset-event-lock-contention branch from 3cca00a to 280a86a Compare May 27, 2026 18:11
hkc-8010 added a commit to hkc-8010/my-airflow-repository that referenced this pull request May 27, 2026
Comment thread airflow-core/src/airflow/assets/manager.py
@hkc-8010 hkc-8010 force-pushed the fix/asset-event-lock-contention branch from 280a86a to 4673ff3 Compare June 4, 2026 03:34
Comment thread airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py Outdated
Comment thread airflow-core/tests/unit/assets/test_manager.py Outdated
@potiuk

potiuk commented Jun 9, 2026

Copy link
Copy Markdown
Member

@hkc-8010 — There are 2 unresolved review thread(s) on this PR from @Lee-W, @kaxil. Could you either push a fix or reply in each thread explaining why the feedback doesn't apply? When you believe the feedback is addressed, please mark the threads as resolved and ping the reviewer (@Lee-W, @kaxil) for a final look. Thanks!

Note: This comment was drafted by an AI-assisted triage tool and may contain mistakes. Once you have addressed the points above, an Apache Airflow maintainer — a real person — will take the next look at your PR. We use this two-stage triage process so that our maintainers' limited time is spent where it matters most: the conversation with you.

@hkc-8010 hkc-8010 force-pushed the fix/asset-event-lock-contention branch from ab90c26 to c3e3e7c Compare June 9, 2026 12:19
hkc-8010 added a commit to hkc-8010/my-airflow-repository that referenced this pull request Jun 9, 2026
@hkc-8010 hkc-8010 requested a review from kaxil June 9, 2026 14:03
@hkc-8010

Copy link
Copy Markdown
Contributor Author

@potiuk thanks for the triage notes. The items called out in your comments should be addressed now:

  • The unresolved review threads from @Lee-W / @kaxil have been replied to and resolved.
  • The PR is no longer draft.
  • The latest pushed commit is c3e3e7cb98a, and the visible GitHub checks are green: 131 passing, 0 failing, 0 pending, 14 skipped.

Could you please take another look at the PR when you get a chance?

Comment thread airflow-core/src/airflow/config_templates/airflow_local_settings.py Outdated
Comment thread scripts/ci/prek/check_connection_doc_labels.py Outdated
Comment thread scripts/ci/prek/check_partition_mapper_defaults_in_sync.py Outdated
hkc-8010 added a commit to hkc-8010/my-airflow-repository that referenced this pull request Jun 12, 2026
@hkc-8010 hkc-8010 force-pushed the fix/asset-event-lock-contention branch from c3e3e7c to 4b38817 Compare June 12, 2026 11:17
hkc-8010 and others added 11 commits June 15, 2026 20:45
Under high concurrency (80+ simultaneous task completions emitting asset
events), the API server was OOMKilled due to idle-in-transaction DB
lock pile-up. Root cause: ti_update_state held a SELECT...FOR UPDATE row
lock on task_instance while AssetManager.register_asset_change() ran
multiple slow queries, including an ORM .append() that lazy-loaded the
entire asset_events collection (potentially thousands of rows).

Two fixes:

1. In AssetManager.register_asset_change(), replace
   asset_alias_model.asset_events.append(asset_event) with a direct
   INSERT into asset_alias_asset_event. This avoids loading the full
   relationship collection while the row lock is held.

2. In ti_update_state(), add session.commit() after the TI state UPDATE
   and Log writes to release the task_instance row lock before running
   asset registration. Asset registration then runs outside the lock in
   a fresh implicit transaction. Registration failures are logged and
   swallowed -- the task state is already durable at that point.

Note: session.commit() inside a session-parameter function is an
intentional deviation from the CLAUDE.md convention. No code after the
commit relies on rollback; the subsequent session.get() re-loads fresh
state. Alternative approaches (second session, background task) were
considered but have higher operational complexity for equivalent
correctness.

Production evidence: connections idle-in-transaction for 3+ minutes on
asset_alias queries, blocking SELECT task_instance FOR UPDATE across 8
concurrent workers. Disabling the trigger DAGs dropped apiserver memory
from 5Gi+ to MBs instantly.
Move `insert` from inline method import to the top-level sqlalchemy
import block and drop the unnecessary `sa_insert` alias. Improve the
session.commit() comment to explain why the early commit is still
needed after the direct-INSERT alias-side fix, and clarify that the
post-commit exception swallow is intentional.

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
@hkc-8010 hkc-8010 force-pushed the fix/asset-event-lock-contention branch from 4b38817 to 0ce69fe Compare June 15, 2026 15:47
@eladkal eladkal requested a review from Lee-W June 15, 2026 16:08
@potiuk potiuk added the ready for maintainer review Set after triaging when all criteria pass. label Jun 17, 2026
ti_patch_payload.outlet_events,
session=session,
)
pass # Asset registration happens after the TI state is committed; see ti_update_state.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don’t think the comment is needed. Anyone reading this part of the code would not assume asset registration should be done here; it was done here, but there’s no reason it should be.

(Honestly, it is unclear why a lot of the logic, such as creating triggers for TIDeferredStatePayload, should be done here in the first place. Maybe it should be moved out too. But this is a topic for another PR.)

Comment on lines +360 to +364
# Use a direct INSERT rather than ORM .append() to avoid lazy-loading the
# entire asset_events collection. On long-running deployments that collection
# can contain thousands of rows; loading it on the task-success hot path can
# leave DB connections idle-in-transaction for minutes, blocking other workers.
# This intentionally leaves asset_alias_model.asset_events unsynced in-session.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is too much text. Arguably only the first one and a half sentences are needed. The last sentence is also unsatisfying; why is it intentional? Does it not cause other issues? Why are those other issues not relevant here?

Comment on lines +539 to +542
# Asset registration runs outside the TI row lock. Failures are logged and counted;
# raising HTTP 500 here would be misleading because the task already succeeded and
# would make the worker retry a state update that has already completed. Durable
# retry/reconciliation for dropped asset events is out of scope for this hot-path fix.

@uranusjr uranusjr Jun 23, 2026

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Things mentioned in this comment are obvious to anyone who can reasonably read Python code. Useless.

Comment on lines +495 to +501
# Commit the TI state update now to release the task_instance row lock before
# running asset-event queries. The direct-INSERT fix in AssetManager removes
# the O(n) lazy-load on the alias-event table, but register_asset_changes_in_db
# also queries scheduled dags and inserts AssetDagRunQueue rows - all of which
# would otherwise hold the row lock and cause idle-in-transaction pile-up that
# exhausts API server memory and triggers OOMKill under high concurrency.
# The task outcome is durable from this point on.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same, much too long.

As a general rule, you should review all comments generated by an AI agent. Please do due diligence in the future and not shift the responsibility to reviewers.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

area:API Airflow's REST/HTTP API area:task-sdk ready for maintainer review Set after triaging when all criteria pass. type:bug-fix Changelog: Bug Fixes

Projects

None yet

Development

Successfully merging this pull request may close these issues.

API server OOMKill: task_instance row lock held during asset event emission under high concurrency

9 participants