Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
from sqlalchemy.sql import select
from structlog.contextvars import bind_contextvars

from airflow._shared.observability.metrics import stats
from airflow._shared.observability.traces import override_ids
from airflow._shared.state import TaskScope
from airflow._shared.timezones import timezone
Expand Down Expand Up @@ -491,6 +492,14 @@ def ti_update_state(
extra=json.dumps({"host_name": hostname}) if hostname else None,
)
)
# Commit the TI state update now to release the task_instance row lock before
# running asset-event queries. The direct-INSERT fix in AssetManager removes
# the O(n) lazy-load on the alias-event table, but register_asset_changes_in_db
# also queries scheduled dags and inserts AssetDagRunQueue rows - all of which
# would otherwise hold the row lock and cause idle-in-transaction pile-up that
# exhausts API server memory and triggers OOMKill under high concurrency.
# The task outcome is durable from this point on.
Comment on lines +495 to +501

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same, much too long.

As a general rule, you should review all comments generated by an AI agent. Please do due diligence in the future and not shift the responsibility to reviewers.

session.commit()
Comment thread
hkc-8010 marked this conversation as resolved.
except DataError:
# Let DataErrorHandler return a 422 (not the opaque 500 below).
raise
Expand All @@ -517,14 +526,44 @@ def ti_update_state(
task_id=task_id,
map_index=map_index,
)
session.commit()
except Exception:
session.rollback()
log.warning(
"Failed to clear task state on success",
dag_id=dag_id,
run_id=run_id,
task_id=task_id,
)

# Asset registration runs outside the TI row lock. Failures are logged and counted
# rather than raised: an HTTP 500 here would be misleading because the task already
# succeeded, and it would make the worker retry a state update that has already completed.
# Durable retry/reconciliation for dropped asset events is out of scope for this hot-path fix.
Comment on lines +539 to +542

@uranusjr uranusjr Jun 23, 2026

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Things mentioned in this comment are obvious to anyone who can reasonably read Python code. Useless.

if (
updated_state == TaskInstanceState.SUCCESS
and isinstance(ti_patch_payload, TISuccessStatePayload)
and ti_patch_payload.task_outlets
):
try:
ti_for_assets = session.get(TI, task_instance_id)
if ti_for_assets is not None:
TI.register_asset_changes_in_db(
ti_for_assets,
ti_patch_payload.task_outlets,
ti_patch_payload.outlet_events,
session=session,
)
session.commit()
except Exception:
session.rollback()
stats.incr("asset.registration_failures")
log.exception(
"Failed to register asset changes; task state is already committed",
task_instance_id=str(task_instance_id),
new_state=updated_state,
)


def _emit_task_span(ti, state):
# just to be safe
Expand Down Expand Up @@ -634,13 +673,7 @@ def _create_ti_state_update_query_and_update_state(
retry_reason=(ti_patch_payload.retry_reason[:500] if ti_patch_payload.retry_reason else None),
)
elif isinstance(ti_patch_payload, TISuccessStatePayload):
if ti is not None:
TI.register_asset_changes_in_db(
ti,
ti_patch_payload.task_outlets,
ti_patch_payload.outlet_events,
session=session,
)
pass # Asset registration happens after the TI state is committed; see ti_update_state.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don’t think this comment is needed. Nobody reading this part of the code would assume asset registration belongs here; it used to be done here, but there is no inherent reason it should be.

(Honestly, it is unclear why a lot of the logic, such as creating triggers for TIDeferredStatePayload, should be done here in the first place. Maybe it should be moved out too. But this is a topic for another PR.)

try:
_emit_task_span(ti, state=updated_state)
except Exception:
Expand Down
22 changes: 16 additions & 6 deletions airflow-core/src/airflow/assets/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from typing import TYPE_CHECKING

import structlog
from sqlalchemy import exc, or_, select
from sqlalchemy import exc, insert, or_, select
from sqlalchemy.orm import joinedload

from airflow._shared.observability.metrics import stats
Expand All @@ -41,6 +41,7 @@
DagScheduleAssetUriReference,
PartitionedAssetKeyLog,
TaskOutletAssetReference,
asset_alias_asset_event_association_table,
)
from airflow.models.log import Log
from airflow.timetables.base import compute_rollup_fingerprint
Expand Down Expand Up @@ -356,8 +357,17 @@ def register_asset_change(
).unique()

for asset_alias_model in asset_alias_models:
asset_alias_model.asset_events.append(asset_event)
session.add(asset_alias_model)
# Use a direct INSERT rather than ORM .append() to avoid lazy-loading the
# entire asset_events collection. On long-running deployments that collection
# can contain thousands of rows; loading it on the task-success hot path can
# leave DB connections idle-in-transaction for minutes, blocking other workers.
# This intentionally leaves asset_alias_model.asset_events unsynced in-session.
Comment on lines +360 to +364

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is too much text. Arguably only the first one and a half sentences are needed. The last sentence is also unsatisfying; why is it intentional? Does it not cause other issues? Why are those other issues not relevant here?

session.execute(
Comment thread
hkc-8010 marked this conversation as resolved.
insert(asset_alias_asset_event_association_table).values(
alias_id=asset_alias_model.id,
event_id=asset_event.id,
)
)

dags_to_queue_from_asset_alias |= {
alias_ref.dag
Expand Down Expand Up @@ -508,9 +518,9 @@ def _queue_dagruns(
# constraint violation.
#
# If we support it, use ON CONFLICT to do nothing, otherwise
# "fallback" to running this in a nested transaction. This is needed
# so that the adding of these rows happens in the same transaction
# where `ti.state` is changed.
# "fallback" to running this in a nested transaction. Some callers
# run this as part of a TI state transaction; the Execution API commits
# the TI state first, then runs asset registration in a separate transaction.
if get_dialect_name(session) == "postgresql":
return cls._queue_dagruns_nonpartitioned_postgres(asset_id, non_partitioned_dags, session)
return cls._queue_dagruns_nonpartitioned_slow_path(asset_id, non_partitioned_dags, session)
Expand Down
Loading
Loading