
[Perf] Streams 3: Add qd.stream_parallel() context manager#409

Draft
hughperkins wants to merge 6 commits into hp/streams-quadrantsic-2-amdgpu-cpu from hp/streams-quadrantsic-3-stream-parallel

Conversation

@hughperkins hughperkins commented Mar 11, 2026

Introduces stream_parallel() for running top-level for-loop blocks on separate GPU streams. The AST transformer maps 'with qd.stream_parallel()' blocks to stream-parallel group IDs, which propagate through IR lowering and offloading to the CUDA/AMDGPU kernel launchers. Each unique group ID gets its own stream at launch time. Includes validation that all top-level kernel statements must be stream_parallel blocks (no mixing), and offline cache key support.

Diff: +377 / -161 lines (net +216)

Issue: #

@hughperkins hughperkins marked this pull request as draft March 11, 2026 23:54
…adrantsic-3-stream-parallel

# Conflicts:
#	python/quadrants/lang/stream.py
Prevents stale group IDs from leaking if insert_for is called after a
path that set a non-zero stream_parallel_group_id, matching the reset
pattern of all other ForLoopConfig fields.
Add an error check in begin_stream_parallel() to prevent nesting, which
would produce undefined group ID semantics.
…context safety

Add comments explaining that streams are created/destroyed per launch
(stream pooling as future optimization), and that RuntimeContext sharing
across concurrent streams is safe because kernels only read from it.
@hughperkins (Collaborator, Author) commented:

Review from Opus (predates last 5 commits above):

PR Review: qd.stream_parallel() context manager for implicit stream parallelism

Summary

This PR introduces a qd.stream_parallel() context manager that allows users to declare groups of for-loops within a kernel that should execute on separate GPU streams. The feature spans the full stack:

  1. Python API (stream.py): A @contextmanager no-op at runtime, intercepted at compile time by the AST transformer.
  2. AST transform (ast_transformer.py, function_def_transformer.py): Recognizes with qd.stream_parallel(): blocks, calls begin_stream_parallel()/end_stream_parallel() on the C++ ASTBuilder, and validates that all top-level kernel statements are stream_parallel blocks (or none are).
  3. IR propagation (frontend_ir.h/cpp, statements.h/cpp, lower_ast.cpp, offload.cpp): Threads a stream_parallel_group_id through ForLoopConfig -> FrontendForStmt -> RangeForStmt/StructForStmt -> OffloadedStmt -> OffloadedTask.
  4. Codegen (codegen_cuda.cpp, codegen_amdgpu.cpp): Copies stream_parallel_group_id onto the OffloadedTask.
  5. Runtime (kernel_launcher.cpp for CUDA and AMDGPU): Groups consecutive tasks with non-zero group IDs, creates one stream per unique group ID, launches them concurrently, then synchronizes and destroys the streams.

The design is clean: each with qd.stream_parallel(): block gets a monotonically increasing group ID, loops within the same block share a stream (serialized on that stream), while loops in different blocks get different streams (concurrent). On CPU/Metal, the group ID simply has no runtime effect, providing a natural serial fallback.
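The shape this gives user code can be sketched with a hypothetical stand-in (the real qd.stream_parallel() is intercepted by the AST transformer at compile time; the stand-in below only mimics its documented runtime no-op behavior):

```python
from contextlib import contextmanager

@contextmanager
def stream_parallel():
    # Stand-in for qd.stream_parallel(): a no-op yield at Python runtime.
    # The real one gains meaning only through the compile-time AST transform.
    yield

results = []
with stream_parallel():      # block 1 -> would map to group ID 1
    for i in range(4):
        results.append(("a", i))
with stream_parallel():      # block 2 -> group ID 2, a separate stream
    for i in range(4):
        results.append(("b", i))
```

Under the serial fallback described above, the two blocks simply run one after the other, which is exactly what this sketch does.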

Architecture & Design Feedback

Strengths:

  • The compile-time interception of a Python context manager is elegant -- stream_parallel() is a no-op yield at Python runtime, but the AST transformer gives it compile-time semantics.
  • The exclusivity validation (all top-level statements must be stream_parallel blocks if any are) is a pragmatic simplification that avoids complex interleaving semantics.
  • The group ID approach naturally handles multiple loops within one stream_parallel block (they share a stream and execute serially on it).

Concerns:

  1. Stream creation/destruction per launch. Every kernel invocation creates and destroys GPU streams. Stream creation is a driver-level synchronization point and can be expensive (~5-50us on CUDA). For kernels invoked in a loop, this overhead could negate the parallelism benefit. Consider a stream pool that reuses streams across invocations, or at least document this as a known limitation.

  2. Near-identical CUDA/AMDGPU launcher code. The stream-parallel dispatch logic in quadrants/runtime/cuda/kernel_launcher.cpp and quadrants/runtime/amdgpu/kernel_launcher.cpp is copy-pasted (~40 lines each). If any bug is found or behavior changes, both must be updated in lockstep. Consider extracting a shared template or helper function.
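The launcher-side grouping that both backends duplicate can be modeled in a few lines; a shared helper along these lines is the kind of extraction suggested above (all names here are illustrative, not the PR's actual C++ symbols):

```python
def plan_streams(tasks):
    # tasks: list of (kernel_name, stream_parallel_group_id) pairs.
    # A non-zero group ID maps to its own stream; 0 means the default
    # (serial) stream, matching the serial fallback on CPU/Metal.
    plan = {}
    for name, gid in tasks:
        key = "default" if gid == 0 else f"stream-{gid}"
        plan.setdefault(key, []).append(name)
    return plan
```

Tasks sharing a group ID land on the same stream (and so serialize on it), while distinct group IDs get distinct streams.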

Potential Bugs / Issues

  1. ForLoopDecoratorRecorder::reset() does not clear stream_parallel_group_id.
// quadrants/ir/frontend_ir.h:950-957
void reset() {
    config.is_bit_vectorized = false;
    config.num_cpu_threads = 0;
    config.uniform = false;
    config.mem_access_opt.clear();
    config.block_dim = 0;
    config.strictly_serialized = false;
}

Every other config field is reset here except stream_parallel_group_id. This works today because all begin_frontend_*_for methods explicitly stamp current_stream_parallel_group_id_ onto the config before use. However, ASTBuilder::insert_for creates a FrontendForStmt using for_loop_dec_.config without first setting stream_parallel_group_id:

// quadrants/ir/frontend_ir.cpp:1347-1359
void ASTBuilder::insert_for(const Expr &s,
                            const Expr &e,
                            const std::function<void(Expr)> &func) {
    auto i = Expr(std::make_shared<IdExpression>(get_next_id()));
    auto stmt_unique = std::make_unique<FrontendForStmt>(i, s, e, this->arch_,
                                                         for_loop_dec_.config);
    for_loop_dec_.reset();
    // ...
}

If insert_for were ever called after a path that set a non-zero stream_parallel_group_id, the stale value would leak. Add config.stream_parallel_group_id = 0; to reset() for safety.

  2. No exception safety on stream creation. In both CUDA and AMDGPU launchers, if a launch() call throws after some streams have been created, those streams leak. This is consistent with the existing error handling style in the codebase, but worth noting. A RAII wrapper or scope guard would make this robust.

  3. Shared RuntimeContext across concurrent streams. In the CUDA launcher, all parallel tasks share the same ctx.get_context() pointer:

// quadrants/runtime/cuda/kernel_launcher.cpp:170-177
for (size_t j = group_start; j < i; j++) {
    auto &t = offloaded_tasks[j];
    CUDAContext::get_instance().set_stream(
        stream_by_id[t.stream_parallel_group_id]);
    cuda_module->launch(t.name, t.grid_dim, t.block_dim,
                        t.dynamic_shared_array_bytes, {&ctx.get_context()},
                        {});
}

If any kernel writes to the RuntimeContext (e.g. result buffer), concurrent kernels sharing the same context could race. This is probably safe if the kernels only read from the context (args), but worth verifying that no kernel writes back into the context during execution. The same applies to AMDGPU where context_pointer is shared.
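The scope-guard idea from the exception-safety point can be illustrated with try/finally; a C++ RAII wrapper would play the same role (create, destroy, and launch are hypothetical callables standing in for the driver API):

```python
def launch_groups(group_ids, create, destroy, launch):
    # Ensure every stream created is destroyed even if a launch throws.
    streams = {}
    try:
        for gid in group_ids:
            streams[gid] = create(gid)
        for stream in streams.values():
            launch(stream)
    finally:
        for stream in streams.values():
            destroy(stream)
```

With this shape, a failure partway through launching still tears down every stream that was created, which is the property the current launchers lack.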

Edge Cases

  1. Empty stream_parallel blocks. A with qd.stream_parallel(): block with no for-loops inside would set a group ID but generate no tasks. The begin_stream_parallel()/end_stream_parallel() pair still increments the counter. This is harmless but untested.

  2. Nested stream_parallel blocks. The build_With handler doesn't prevent nesting:

with qd.stream_parallel():
    with qd.stream_parallel():  # nested -- what happens?
        for i in range(...):
            ...

The inner begin_stream_parallel() would overwrite current_stream_parallel_group_id_, and the outer's end_stream_parallel() would reset it to 0. The inner end_stream_parallel() would have already set it to 0 before the outer's. This seems like it would work "accidentally" but the semantics are undefined. Consider explicitly rejecting nested stream_parallel blocks.

  3. with statement in non-kernel qd.func. The build_With handler is registered on the generic AST transformer but the exclusivity validation in _validate_stream_parallel_exclusivity only runs for ctx.is_kernel. What happens if stream_parallel is used inside a @qd.func? The begin_stream_parallel()/end_stream_parallel() calls would still execute and tag loops with group IDs. If those loops get inlined into a kernel, they'd carry the group IDs. Consider whether this should be rejected for @qd.func.
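The nesting hazard above (and the guard one of the later commits adds) can be modeled with a toy counter; this is a sketch of the behavior described in the review, not the real C++ ASTBuilder:

```python
class ASTBuilderSketch:
    """Toy model of the stream-parallel group-ID counter with a
    nesting guard.  Names are illustrative."""
    def __init__(self):
        self._next_id = 1
        self.current_group_id = 0  # 0 = not inside a stream_parallel block

    def begin_stream_parallel(self):
        # Rejecting nesting keeps group-ID semantics well-defined:
        # an inner begin would otherwise overwrite the outer's ID.
        if self.current_group_id != 0:
            raise RuntimeError("stream_parallel blocks cannot be nested")
        self.current_group_id = self._next_id
        self._next_id += 1

    def end_stream_parallel(self):
        self.current_group_id = 0
```

Without the guard, the inner end_stream_parallel() would reset the ID to 0 while still inside the outer block, exactly the "accidentally works" ambiguity the review flags.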

Code Style

  1. The formatting-only changes (multi-line function signatures in ast_transformer.py) are fine but arguably belong in a separate commit to keep the diff focused.

  2. The test_stream_with_ndarray test was moved (not deleted) to the end of the file. This is fine but could cause confusion in the diff for reviewers.

Tests

The test coverage is good:

  • test_stream_parallel_basic -- correctness of two independent parallel blocks
  • test_stream_parallel_multiple_loops_per_stream -- multiple loops sharing a stream
  • test_stream_parallel_timing -- actual speedup measurement on GPU with serial fallback tolerance
  • test_stream_parallel_rejects_mixed_top_level -- validation error testing

Missing tests:

  • Empty stream_parallel block (no loops inside)
  • stream_parallel with struct-for loops (only range-for is tested)
  • stream_parallel with ndarray arguments

Suggestions Summary

Priority  Item
High      Add config.stream_parallel_group_id = 0; to ForLoopDecoratorRecorder::reset() (#3)
High      Verify RuntimeContext is safe to share across concurrent streams (#5)
Medium    Reject nested stream_parallel blocks explicitly (#7)
Medium    Consider rejecting stream_parallel in @qd.func (#8)
Low       Extract shared stream-dispatch logic from CUDA/AMDGPU launchers (#2)
Low       Consider stream pooling for repeated kernel launches (#1)

@hughperkins (Collaborator, Author) commented:

For the concern about stream creation/destruction overhead, added a 4th PR (#410) that introduces a stream pool.

@hughperkins hughperkins marked this pull request as ready for review March 12, 2026 01:38
@hughperkins hughperkins changed the title [Perf] Streams part 3: Add qd.stream_parallel() context manager [Perf] Streams 3: Add qd.stream_parallel() context manager Mar 12, 2026
@hughperkins hughperkins marked this pull request as draft March 12, 2026 04:59
Comment on lines +312 to +314
if len(stmt.items) != 1:
    return False
item = stmt.items[0]
Contributor:

What is items? Could you document, here or somewhere else, why the length can be 1 or more, and what it means in this context?

Comment on lines +327 to +330
"When using qd.stream_parallel(), all top-level statements "
"in the kernel must be 'with qd.stream_parallel():' blocks. "
"Move non-parallel code to a separate kernel."
)
Contributor:

I still don't understand why you are moving to the next line before you have to. This is weird to me. But I don't care much.

Comment on lines +321 to +322
has_sp = any(FunctionDefTransformer._is_stream_parallel_with(s, global_vars) for s in body)
if not has_sp:
Contributor:

I would rather do

# <Insert fancy comment explaining what this check is doing>
if not any(FunctionDefTransformer._is_stream_parallel_with(s, global_vars) for s in body):
    return

Comment on lines +1396 to +1398
if len(node.items) != 1:
    raise QuadrantsSyntaxError("'with' in Quadrants kernels only supports a single context manager")
item = node.items[0]
Contributor:

Same. It's not clear what items is.

@duburcqa (Contributor) commented Mar 15, 2026:

All this "code duplication" (or at least duplicated logic) is annoying, but if there is no better choice and it is what we have been doing so far, then it is OK.
