Avoid duplicate trigger-rule upstream-count queries per scheduling pass#67672
Open
kaxil wants to merge 3 commits into
TriggerRuleDep runs a `SELECT task_id, count(*) ... GROUP BY task_id` per downstream task to size its upstream set, but only when an upstream is mapped. When many downstreams share the same mapped upstream, each issues an identical query within the same scheduling pass. Memoize the result on DepContext (one scheduling pass, same lifetime as finished_tis), keyed by (dag_id, run_id, frozenset of direct-upstream task_ids). Only the simple case is cached, where the predicate is exactly `task_id IN (upstream_ids)`; downstreams inside a mapped task group keep their own per-instance map-index query. The cache is cleared in _get_ready_tis when a mapped task expands and changes its instance count.
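The memoization described above can be illustrated with a toy model. This is a minimal sketch, not Airflow's code: `PassContext`, `FakeDb`, and `upstream_counts` are hypothetical names, and the fake database only counts how many times the grouped count query would have been issued.

```python
from dataclasses import dataclass, field


@dataclass
class PassContext:
    """Hypothetical stand-in for the per-pass context (not Airflow's DepContext)."""

    upstream_count_memo: dict = field(default_factory=dict)


class FakeDb:
    """Hypothetical database stub that records how many count queries run."""

    def __init__(self, instances_per_task):
        self.instances_per_task = instances_per_task
        self.query_count = 0

    def count_upstream_instances(self, dag_id, run_id, upstream_ids):
        # Stands in for: SELECT task_id, count(*) ... GROUP BY task_id
        self.query_count += 1
        return {task_id: self.instances_per_task[task_id] for task_id in upstream_ids}


def upstream_counts(ctx, db, dag_id, run_id, upstream_ids):
    # Key on the run plus the *set* of direct upstream task ids, so every
    # downstream with the same upstreams shares one cached result.
    key = (dag_id, run_id, frozenset(upstream_ids))
    if key not in ctx.upstream_count_memo:
        ctx.upstream_count_memo[key] = db.count_upstream_instances(dag_id, run_id, upstream_ids)
    return ctx.upstream_count_memo[key]


db = FakeDb({"mapped_upstream": 5})
ctx = PassContext()  # one context per scheduling pass

# Sixty downstream tasks that all depend on the same mapped upstream.
for _ in range(60):
    counts = upstream_counts(ctx, db, "example_dag", "run_1", ["mapped_upstream"])

total = sum(counts.values())
```

Because the memo lives on the per-pass object, a fresh context in the next pass starts empty and the query runs again, which is what keeps the cache scoped to a single pass.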
_get_ready_tis already drops the trigger-rule upstream-count memo when a mapped task expands, but not when _revise_map_indexes_if_mapped grows an already-expanded task and adds new instances. A downstream evaluated later in the same pass could then read a stale, pre-grow count. Clear the memo in that case too, and add a regression test that drives _get_ready_tis with a fixed order so the count is recomputed after the grow (it asserts the query runs twice; fails without the clear).
Contributor
@kaxil static checks failing. Do we need this in 3.3.0?
uranusjr
reviewed
Jun 23, 2026
Comment on lines -1527 to +1538
- ready_tis.extend(
+ revised_tis = list(
      self._revise_map_indexes_if_mapped(
          schedulable.task, dag_version_id=schedulable.dag_version_id, session=session
      )
  )
+ ready_tis.extend(revised_tis)
  revised_map_index_task_ids.add(schedulable.task.task_id)
+ if revised_tis:
Member
Not sure if the size of revised_tis would be significant. We could do
    prev_ti_size = len(ready_tis)
    ready_tis.extend(self._revise_map_indexes_if_mapped(...))
    if len(ready_tis) > prev_ti_size:
        ...
to avoid building a list.
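The reviewer's idea can be tried in isolation. The sketch below is only an illustration under assumptions: `revise_map_indexes` is a hypothetical generator standing in for `_revise_map_indexes_if_mapped`, and `extend_and_detect_growth` is a made-up helper name.

```python
def revise_map_indexes(new_count):
    """Hypothetical generator standing in for _revise_map_indexes_if_mapped."""
    for i in range(new_count):
        yield f"ti_{i}"


def extend_and_detect_growth(ready_tis, new_items):
    """Extend ready_tis from an iterable and report whether anything was added."""
    prev_size = len(ready_tis)
    # list.extend consumes the generator directly; no intermediate list is built.
    ready_tis.extend(new_items)
    return len(ready_tis) > prev_size


ready = ["a"]
grew = extend_and_detect_growth(ready, revise_map_indexes(2))
unchanged = extend_and_detect_growth(ready, revise_map_indexes(0))
```

Comparing lengths before and after gives the same signal as `if revised_tis:` while letting `extend` drain the generator without a temporary list.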
uranusjr
approved these changes
Jun 23, 2026
Member
Author
It can wait for 3.3.1
Summary
`TriggerRuleDep` sizes a task's upstream set with a `SELECT task_id, count(*) ... GROUP BY task_id`, and it only runs that query when one of the upstreams is mapped. The problem: if several downstream tasks all depend on the same mapped upstream, each of them runs the identical query during the same scheduling pass. This caches the result on the `DepContext` for the duration of a pass, so the query runs once per distinct upstream set instead of once per downstream.
Impact
The win scales with how many downstream tasks sit behind a mapped upstream. Measured on a DAG with one mapped upstream (5 instances) feeding 60 plain downstream tasks, counting just this query during one scheduling pass:
That count recurs every `update_state` pass, so across a run it's roughly `downstreams x passes` queries collapsing to `passes`. Each query is individually cheap (sub-millisecond), so this is about cutting database round-trips, not query time. It happens in the per-run scheduling work, not the serialized critical section, so it's a latency win for mapping-heavy DAGs rather than a throughput-ceiling change.
How it works
The memo is keyed by `(dag_id, run_id, frozenset(upstream_task_ids))` and stored on `DepContext`, which is already built once per `_get_ready_tis` pass and reused for every TI in that pass (the same object that caches `finished_tis`).
Only the simple case is cached, where the predicate is exactly `task_id IN (upstream_ids)` and comes out the same for every downstream with the same upstreams. Tasks inside a mapped task group have per-map-index predicates, so they keep running their own query.
`upstream_setup` is still summed in the caller from the cached rows, so the setup count stays right per downstream.
Staleness
The memo has the same lifetime as `finished_tis` (a per-pass snapshot). Two things change a mapped task's instance count mid-pass, and the memo is dropped for both: expansion of an unexpanded task, and `_revise_map_indexes_if_mapped` growing an already-expanded one (both in `_get_ready_tis`). So a downstream evaluated later in the same pass recomputes against the current count. State-only changes (a task finishing, or instances marked REMOVED) don't change the row count the memo is keyed on, so they need no invalidation.
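The invalidation rule can be seen in a toy model. This is a sketch under assumptions, not the PR's implementation: `UpstreamCountMemo` and its `clear` method are hypothetical, and a plain dict stands in for the task-instance table.

```python
class UpstreamCountMemo:
    """Hypothetical per-pass memo of upstream instance counts."""

    def __init__(self, run_query):
        self._run_query = run_query
        self._memo = {}
        self.queries = 0  # how many times the underlying query actually ran

    def get(self, dag_id, run_id, upstream_ids):
        key = (dag_id, run_id, frozenset(upstream_ids))
        if key not in self._memo:
            self.queries += 1
            self._memo[key] = self._run_query(upstream_ids)
        return self._memo[key]

    def clear(self):
        # Called whenever a mapped task's instance count changes mid-pass.
        self._memo.clear()


# A plain dict stands in for the task-instance table (assumption for the demo).
instances = {"mapped": 3}
memo = UpstreamCountMemo(lambda ids: {task_id: instances[task_id] for task_id in ids})

before = memo.get("example_dag", "run_1", ["mapped"])["mapped"]

# The mapped task grows from 3 to 5 instances in the middle of the pass.
instances["mapped"] = 5
stale = memo.get("example_dag", "run_1", ["mapped"])["mapped"]  # served from the memo

memo.clear()  # the step this PR adds for the grow case
fresh = memo.get("example_dag", "run_1", ["mapped"])["mapped"]
```

Without the `clear()` call the second lookup keeps returning the pre-grow count; with it, the query runs a second time and the later downstream sees the current count.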