Skip to content

[pipeline] Generalize _Node queue handling with input/output queues#1325

Merged
mthrok merged 1 commit intomainfrom
refactor3
Apr 8, 2026
Merged

[pipeline] Generalize _Node queue handling with input/output queues#1325
mthrok merged 1 commit intomainfrom
refactor3

Conversation

@mthrok
Copy link
Copy Markdown
Collaborator

@mthrok mthrok commented Apr 7, 2026

Replace the single queue field on _Node with explicit input_queues
and output_queues sequences, assigned at construction time.

Previously, nodes implicitly derived their input queue from
upstream[0].queue at coroutine-build time. This works for linear
pipelines but breaks down for fan-out patterns (PathVariants) where:

  • A router writes to multiple output queues (one per path)
  • A path's first pipe reads from a dedicated queue, not upstream's output

With explicit queue fields, the data-flow wiring is visible in the node
graph structure rather than scattered across special-case logic in
_build_node.

Queue assignments by node type:

  • Source:
    input_queues=[],
    output_queues=[out_q]
  • Pipe/Aggregate/Disaggregate:
    input_queues=[upstream.output_queues[0]],
    output_queues=[out_q]
  • Merge:
    input_queues=[n.output_queues[0] for n in upstream],
    output_queues=[out_q]
  • Sink:
    input_queues=[upstream.output_queues[0]],
    output_queues=[out_q]

@meta-cla meta-cla bot added the CLA Signed This label is managed by the Meta Open Source bot. label Apr 7, 2026
@meta-codesync
Copy link
Copy Markdown
Contributor

meta-codesync bot commented Apr 7, 2026

@facebook-github-bot has imported this pull request. If you are a Meta employee, you can view this in D99880597. (Because this pull request was imported automatically, there will not be any future comments.)

…ueues

Replace the single `queue` field on `_Node` with explicit `input_queues`
and `output_queues` sequences, assigned at construction time.

Previously, nodes implicitly derived their input queue from
`upstream[0].queue` at coroutine-build time. This works for linear
pipelines but breaks down for fan-out patterns (PathVariants) where:
- A router writes to multiple output queues (one per path)
- A path's first pipe reads from a dedicated queue, not upstream's output

With explicit queue fields, the data-flow wiring is visible in the node
graph structure rather than scattered across special-case logic in
`_build_node`.

Queue assignments by node type:
- Source:
  `input_queues=[]`,
  `output_queues=[out_q]`
- Pipe/Aggregate/Disaggregate:
  `input_queues=[upstream.output_queues[0]]`,
  `output_queues=[out_q]`
- Merge:
  `input_queues=[n.output_queues[0] for n in upstream]`,
  `output_queues=[out_q]`
- Sink:
  `input_queues=[upstream.output_queues[0]]`,
  `output_queues=[out_q]`
@mthrok mthrok marked this pull request as ready for review April 8, 2026 01:05
@mthrok mthrok enabled auto-merge (squash) April 8, 2026 01:05
@mthrok mthrok merged commit 9d3dc38 into main Apr 8, 2026
37 of 38 checks passed
@mthrok mthrok deleted the refactor3 branch April 8, 2026 01:05
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

CLA Signed This label is managed by the Meta Open Source bot.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant