-
Notifications
You must be signed in to change notification settings - Fork 17.3k
fix(assets): reduce task success asset registration lock contention #66854
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
78c9c9d
c226fa9
1c8cd00
bfed2b1
5d9771e
df3b3da
3aee336
77f2e8e
fd147db
6956a81
0ce69fe
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -40,6 +40,7 @@ | |
| from sqlalchemy.sql import select | ||
| from structlog.contextvars import bind_contextvars | ||
|
|
||
| from airflow._shared.observability.metrics import stats | ||
| from airflow._shared.observability.traces import override_ids | ||
| from airflow._shared.state import TaskScope | ||
| from airflow._shared.timezones import timezone | ||
|
|
@@ -491,6 +492,14 @@ def ti_update_state( | |
| extra=json.dumps({"host_name": hostname}) if hostname else None, | ||
| ) | ||
| ) | ||
| # Commit the TI state update now to release the task_instance row lock before | ||
| # running asset-event queries. The direct-INSERT fix in AssetManager removes | ||
| # the O(n) lazy-load on the alias-event table, but register_asset_changes_in_db | ||
| # also queries scheduled dags and inserts AssetDagRunQueue rows - all of which | ||
| # would otherwise hold the row lock and cause idle-in-transaction pile-up that | ||
| # exhausts API server memory and triggers OOMKill under high concurrency. | ||
| # The task outcome is durable from this point on. | ||
| session.commit() | ||
|
hkc-8010 marked this conversation as resolved.
|
||
| except DataError: | ||
| # Let DataErrorHandler return a 422 (not the opaque 500 below). | ||
| raise | ||
|
|
@@ -517,14 +526,44 @@ def ti_update_state( | |
| task_id=task_id, | ||
| map_index=map_index, | ||
| ) | ||
| session.commit() | ||
| except Exception: | ||
| session.rollback() | ||
| log.warning( | ||
| "Failed to clear task state on success", | ||
| dag_id=dag_id, | ||
| run_id=run_id, | ||
| task_id=task_id, | ||
| ) | ||
|
|
||
| # Asset registration runs outside the TI row lock. Failures are logged and counted; | ||
| # raising HTTP 500 here would be misleading because the task already succeeded and | ||
| # would make the worker retry a state update that has already completed. Durable | ||
| # retry/reconciliation for dropped asset events is out of scope for this hot-path fix. | ||
|
Comment on lines
+539
to
+542
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Things mentioned in this comment are obvious to anyone who can reasonably read Python code. Useless. |
||
| if ( | ||
| updated_state == TaskInstanceState.SUCCESS | ||
| and isinstance(ti_patch_payload, TISuccessStatePayload) | ||
| and ti_patch_payload.task_outlets | ||
| ): | ||
| try: | ||
| ti_for_assets = session.get(TI, task_instance_id) | ||
| if ti_for_assets is not None: | ||
| TI.register_asset_changes_in_db( | ||
| ti_for_assets, | ||
| ti_patch_payload.task_outlets, | ||
| ti_patch_payload.outlet_events, | ||
| session=session, | ||
| ) | ||
| session.commit() | ||
| except Exception: | ||
| session.rollback() | ||
| stats.incr("asset.registration_failures") | ||
| log.exception( | ||
| "Failed to register asset changes; task state is already committed", | ||
| task_instance_id=str(task_instance_id), | ||
| new_state=updated_state, | ||
| ) | ||
|
|
||
|
|
||
| def _emit_task_span(ti, state): | ||
| # just to be safe | ||
|
|
@@ -634,13 +673,7 @@ def _create_ti_state_update_query_and_update_state( | |
| retry_reason=(ti_patch_payload.retry_reason[:500] if ti_patch_payload.retry_reason else None), | ||
| ) | ||
| elif isinstance(ti_patch_payload, TISuccessStatePayload): | ||
| if ti is not None: | ||
| TI.register_asset_changes_in_db( | ||
| ti, | ||
| ti_patch_payload.task_outlets, | ||
| ti_patch_payload.outlet_events, | ||
| session=session, | ||
| ) | ||
| pass # Asset registration happens after the TI state is committed; see ti_update_state. | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don’t think the comment is needed. Anyone reading this part of the code would not assume asset registration should be done here; it was done here, but there’s no reason it should be. (Honestly, it is unclear why a lot of the logic, such as creating triggers for TIDeferredStatePayload, should be done here in the first place. Maybe it should be moved out too. But this is a topic for another PR.) |
||
| try: | ||
| _emit_task_span(ti, state=updated_state) | ||
| except Exception: | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -22,7 +22,7 @@ | |
| from typing import TYPE_CHECKING | ||
|
|
||
| import structlog | ||
| from sqlalchemy import exc, or_, select | ||
| from sqlalchemy import exc, insert, or_, select | ||
| from sqlalchemy.orm import joinedload | ||
|
|
||
| from airflow._shared.observability.metrics import stats | ||
|
|
@@ -41,6 +41,7 @@ | |
| DagScheduleAssetUriReference, | ||
| PartitionedAssetKeyLog, | ||
| TaskOutletAssetReference, | ||
| asset_alias_asset_event_association_table, | ||
| ) | ||
| from airflow.models.log import Log | ||
| from airflow.timetables.base import compute_rollup_fingerprint | ||
|
|
@@ -356,8 +357,17 @@ def register_asset_change( | |
| ).unique() | ||
|
|
||
| for asset_alias_model in asset_alias_models: | ||
| asset_alias_model.asset_events.append(asset_event) | ||
| session.add(asset_alias_model) | ||
| # Use a direct INSERT rather than ORM .append() to avoid lazy-loading the | ||
| # entire asset_events collection. On long-running deployments that collection | ||
| # can contain thousands of rows; loading it on the task-success hot path can | ||
| # leave DB connections idle-in-transaction for minutes, blocking other workers. | ||
| # This intentionally leaves asset_alias_model.asset_events unsynced in-session. | ||
|
Comment on lines
+360
to
+364
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is too much text. Arguably only the first one and a half sentences are needed. The last sentence is also unsatisfying; why is it intentional? Does it not cause other issues? Why are those other issues not relevant here? |
||
| session.execute( | ||
|
hkc-8010 marked this conversation as resolved.
|
||
| insert(asset_alias_asset_event_association_table).values( | ||
| alias_id=asset_alias_model.id, | ||
| event_id=asset_event.id, | ||
| ) | ||
| ) | ||
|
|
||
| dags_to_queue_from_asset_alias |= { | ||
| alias_ref.dag | ||
|
|
@@ -508,9 +518,9 @@ def _queue_dagruns( | |
| # constraint violation. | ||
| # | ||
| # If we support it, use ON CONFLICT to do nothing, otherwise | ||
| # "fallback" to running this in a nested transaction. This is needed | ||
| # so that the adding of these rows happens in the same transaction | ||
| # where `ti.state` is changed. | ||
| # "fallback" to running this in a nested transaction. Some callers | ||
| # run this as part of a TI state transaction; the Execution API commits | ||
| # the TI state first, then runs asset registration in a separate transaction. | ||
| if get_dialect_name(session) == "postgresql": | ||
| return cls._queue_dagruns_nonpartitioned_postgres(asset_id, non_partitioned_dags, session) | ||
| return cls._queue_dagruns_nonpartitioned_slow_path(asset_id, non_partitioned_dags, session) | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same, much too long.
As a general rule, you should review all comments generated by an AI agent. Please do due diligence in the future and not shift the responsibility to reviewers.