Skip to content

Avoid duplicate trigger-rule upstream-count queries per scheduling pass#67672

Open
kaxil wants to merge 3 commits into
apache:mainfrom
astronomer:trigger-rule-upstream-count-memo
Open

Avoid duplicate trigger-rule upstream-count queries per scheduling pass#67672
kaxil wants to merge 3 commits into
apache:mainfrom
astronomer:trigger-rule-upstream-count-memo

Conversation

@kaxil

@kaxil kaxil commented May 29, 2026

Copy link
Copy Markdown
Member

Summary

TriggerRuleDep sizes a task's upstream set with a SELECT task_id, count(*) ... GROUP BY task_id, and it only runs that when one of the upstreams is mapped. The problem: if several downstream tasks all depend on the same mapped upstream, each of them runs the identical query during the same scheduling pass. This caches the result on the DepContext for the duration of a pass, so it runs once per distinct upstream set instead of once per downstream.

Impact

The win scales with how many downstream tasks sit behind a mapped upstream. Measured on a DAG with one mapped upstream (5 instances) feeding 60 plain downstream tasks, counting just this query during one scheduling pass:

upstream-count queries per pass
before 60
after 1

That count recurs every update_state pass, so across a run it's roughly downstreams x passes queries collapsing to passes. Each query is individually cheap (sub-millisecond), so this is about cutting database round-trips, not query time. It happens in the per-run scheduling work, not the serialized critical section, so it's a latency win for mapping-heavy DAGs rather than a throughput-ceiling change.

How it works

  • Key is (dag_id, run_id, frozenset(upstream_task_ids)), stored on DepContext, which is already built once per _get_ready_tis pass and reused for every TI in that pass (the same object that caches finished_tis).
  • It only kicks in when the task isn't inside a mapped task group. That's the branch where the predicate is plain task_id IN (upstream_ids) and comes out the same for every downstream with the same upstreams. Tasks inside a mapped task group have per map-index predicates, so they keep running their own query.
  • upstream_setup is still summed in the caller from the cached rows, so the setup count stays right per downstream.

Staleness

It follows finished_tis (a per-pass snapshot). Two things change a mapped task's instance count mid-pass and the memo is dropped for both: expansion of an unexpanded task, and _revise_map_indexes_if_mapped growing an already-expanded one (both in _get_ready_tis). So a downstream evaluated later in the same pass recomputes against the current count. State-only changes (a task finishing, or instances marked REMOVED) don't change the row count the memo is keyed on, so they need no invalidation.

TriggerRuleDep runs a `SELECT task_id, count(*) ... GROUP BY task_id` per
downstream task to size its upstream set, but only when an upstream is mapped.
When many downstreams share the same mapped upstream, each issues an identical
query within the same scheduling pass.

Memoize the result on DepContext (one scheduling pass, same lifetime as
finished_tis), keyed by (dag_id, run_id, frozenset of direct-upstream task_ids).
Only the simple case is cached, where the predicate is exactly
`task_id IN (upstream_ids)`; downstreams inside a mapped task group keep their
own per-instance map-index query. The cache is cleared in _get_ready_tis when a
mapped task expands and changes its instance count.
@kaxil kaxil added the full tests needed We need to run full set of tests for this PR to merge label May 29, 2026
@kaxil kaxil added this to the Airflow 3.3.0 milestone May 29, 2026
@kaxil kaxil requested a review from uranusjr May 29, 2026 00:53
@kaxil kaxil marked this pull request as ready for review May 30, 2026 00:47
@kaxil kaxil requested review from XD-DENG and ashb as code owners May 30, 2026 00:47
@kaxil kaxil requested a review from gopidesupavan June 2, 2026 08:58
_get_ready_tis already drops the trigger-rule upstream-count memo when a mapped
task expands, but not when _revise_map_indexes_if_mapped grows an already-expanded
task and adds new instances. A downstream evaluated later in the same pass could
then read a stale, pre-grow count. Clear the memo in that case too, and add a
regression test that drives _get_ready_tis with a fixed order so the count is
recomputed after the grow (it asserts the query runs twice; fails without the clear).
@vatsrahul1001

Copy link
Copy Markdown
Contributor

@kaxil static checks failing. Do we need this in 3.3.0?

Comment on lines -1527 to +1538
ready_tis.extend(
revised_tis = list(
self._revise_map_indexes_if_mapped(
schedulable.task, dag_version_id=schedulable.dag_version_id, session=session
)
)
ready_tis.extend(revised_tis)
revised_map_index_task_ids.add(schedulable.task.task_id)
if revised_tis:

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if the size of revised_tis would be significant. We could do

prev_ti_size = len(ready_tis)
ready_tis.extend(self._revise_map_indexes_if_mapped(...)
if len(ready_tis) > prev_ti_size:
    ...

to avoid building a list.

@kaxil kaxil modified the milestones: Airflow 3.3.0, Airflow 3.3.1 Jun 23, 2026
@kaxil

kaxil commented Jun 23, 2026

Copy link
Copy Markdown
Member Author

@kaxil static checks failing. Do we need this in 3.3.0?

It can wait for 3.3.1

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

full tests needed We need to run full set of tests for this PR to merge

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants