From e9f98c645671a2a9b5ee3cae915c31a852053cf4 Mon Sep 17 00:00:00 2001
From: Hugh Perkins <hughperkins@gmail.com>
Date: Wed, 11 Mar 2026 18:33:27 -0700
Subject: [PATCH 1/3] Add stream pool to reuse GPU streams across kernel
 launches

Replace per-launch stream_create/stream_destroy with acquire_stream/
release_stream on CUDAContext and AMDGPUContext. Streams are cached in a
pool and reused across invocations, avoiding the driver-level overhead
of stream creation (~5-50us) on every kernel launch in hot loops.
---
 quadrants/rhi/amdgpu/amdgpu_context.h        | 18 ++++++++++++++++++
 quadrants/rhi/cuda/cuda_context.h            | 19 +++++++++++++++++++
 quadrants/runtime/amdgpu/kernel_launcher.cpp |  8 ++------
 quadrants/runtime/cuda/kernel_launcher.cpp   |  8 ++------
 4 files changed, 41 insertions(+), 12 deletions(-)

diff --git a/quadrants/rhi/amdgpu/amdgpu_context.h b/quadrants/rhi/amdgpu/amdgpu_context.h
index 4fc7c8328..dd99e4fd3 100644
--- a/quadrants/rhi/amdgpu/amdgpu_context.h
+++ b/quadrants/rhi/amdgpu/amdgpu_context.h
@@ -24,6 +24,7 @@ class AMDGPUContext {
   AMDGPUDriver &driver_;
   bool debug_{false};
   static thread_local void *stream_;
+  std::vector<void *> stream_pool_;
   std::vector<void *> kernel_arg_pointer_;
 
 public:
@@ -125,6 +126,23 @@ class AMDGPUContext {
     return stream_;
   }
 
+  // Pop a cached stream if one is available; otherwise create a new one.
+  void *acquire_stream() {
+    std::lock_guard<std::mutex> _(lock_);
+    if (!stream_pool_.empty()) {
+      auto s = stream_pool_.back();
+      stream_pool_.pop_back();
+      return s;
+    }
+    void *s = nullptr;
+    AMDGPUDriver::get_instance().stream_create(&s, 0);
+    return s;
+  }
+
+  // Return a stream to the pool for reuse. Caller must synchronize first.
+  void release_stream(void *s) {
+    std::lock_guard<std::mutex> _(lock_);
+    stream_pool_.push_back(s);
+  }
+
   static AMDGPUContext &get_instance();
 };
 
diff --git a/quadrants/rhi/cuda/cuda_context.h b/quadrants/rhi/cuda/cuda_context.h
index ba891644a..b4a480961 100644
--- a/quadrants/rhi/cuda/cuda_context.h
+++ b/quadrants/rhi/cuda/cuda_context.h
@@ -3,6 +3,7 @@
 #include <mutex>
 #include <unordered_map>
 #include <thread>
+#include <vector>
 
 #include "quadrants/program/kernel_profiler.h"
 #include "quadrants/rhi/cuda/cuda_driver.h"
@@ -31,6 +32,7 @@ class CUDAContext {
   bool debug_;
   bool supports_mem_pool_;
   static thread_local void *stream_;
+  std::vector<void *> stream_pool_;
 
 public:
   CUDAContext();
@@ -120,6 +122,23 @@ class CUDAContext {
   void *get_stream() const {
     return stream_;
   }
+
+  // Pop a cached stream if one is available; otherwise create a new one.
+  void *acquire_stream() {
+    std::lock_guard<std::mutex> _(lock_);
+    if (!stream_pool_.empty()) {
+      auto s = stream_pool_.back();
+      stream_pool_.pop_back();
+      return s;
+    }
+    void *s = nullptr;
+    CUDADriver::get_instance().stream_create(&s, 0);
+    return s;
+  }
+
+  // Return a stream to the pool for reuse. Caller must synchronize first.
+  void release_stream(void *s) {
+    std::lock_guard<std::mutex> _(lock_);
+    stream_pool_.push_back(s);
+  }
 };
 
 }  // namespace quadrants::lang
diff --git a/quadrants/runtime/amdgpu/kernel_launcher.cpp b/quadrants/runtime/amdgpu/kernel_launcher.cpp
index 6abd0778e..88a357092 100644
--- a/quadrants/runtime/amdgpu/kernel_launcher.cpp
+++ b/quadrants/runtime/amdgpu/kernel_launcher.cpp
@@ -127,15 +127,11 @@ void KernelLauncher::launch_llvm_kernel(Handle handle,
       i++;
     }
 
-    // Create one stream per unique group ID. Streams are created/destroyed
-    // per launch; a stream pool could reduce overhead for hot loops.
     std::map<int, void *> stream_by_id;
     for (size_t j = group_start; j < i; j++) {
       int sid = offloaded_tasks[j].stream_parallel_group_id;
       if (stream_by_id.find(sid) == stream_by_id.end()) {
-        void *s = nullptr;
-        AMDGPUDriver::get_instance().stream_create(&s, 0);
-        stream_by_id[sid] = s;
+        stream_by_id[sid] = AMDGPUContext::get_instance().acquire_stream();
       }
     }
 
@@ -155,7 +151,7 @@ void KernelLauncher::launch_llvm_kernel(Handle handle,
       AMDGPUDriver::get_instance().stream_synchronize(s);
     }
     for (auto &[sid, s] : stream_by_id) {
-      AMDGPUDriver::get_instance().stream_destroy(s);
+      AMDGPUContext::get_instance().release_stream(s);
     }
 
     AMDGPUContext::get_instance().set_stream(active_stream);
diff --git a/quadrants/runtime/cuda/kernel_launcher.cpp b/quadrants/runtime/cuda/kernel_launcher.cpp
index 9cf24915a..6743d7c29 100644
--- a/quadrants/runtime/cuda/kernel_launcher.cpp
+++ b/quadrants/runtime/cuda/kernel_launcher.cpp
@@ -159,15 +159,11 @@ void KernelLauncher::launch_llvm_kernel(Handle handle,
       i++;
     }
 
-    // Create one stream per unique group ID. Streams are created/destroyed
-    // per launch; a stream pool could reduce overhead for hot loops.
     std::map<int, void *> stream_by_id;
     for (size_t j = group_start; j < i; j++) {
      int sid = offloaded_tasks[j].stream_parallel_group_id;
       if (stream_by_id.find(sid) == stream_by_id.end()) {
-        void *s = nullptr;
-        CUDADriver::get_instance().stream_create(&s, 0);
-        stream_by_id[sid] = s;
+        stream_by_id[sid] = CUDAContext::get_instance().acquire_stream();
       }
     }
 
@@ -187,7 +183,7 @@ void KernelLauncher::launch_llvm_kernel(Handle handle,
       CUDADriver::get_instance().stream_synchronize(s);
     }
     for (auto &[sid, s] : stream_by_id) {
-      CUDADriver::get_instance().stream_destroy(s);
+      CUDAContext::get_instance().release_stream(s);
     }
 
     CUDAContext::get_instance().set_stream(active_stream);

From 65a7967ca88aa33f62b3de4411cd3f51f870ed5f Mon Sep 17 00:00:00 2001
From: Hugh Perkins <hughperkins@gmail.com>
Date: Wed, 11 Mar 2026 18:37:11 -0700
Subject: [PATCH 2/3] Add test for stream pool reuse across repeated kernel
 launches

Calls a stream_parallel kernel 5 times in a loop to verify that pooled
streams are correctly reused with correct results each iteration.
---
 tests/python/test_streams.py | 28 ++++++++++++++++++++++++++++
 1 file changed, 28 insertions(+)

diff --git a/tests/python/test_streams.py b/tests/python/test_streams.py
index 4c28b6f58..86568c4e1 100644
--- a/tests/python/test_streams.py
+++ b/tests/python/test_streams.py
@@ -419,3 +419,31 @@ def fill(arr: qd.types.ndarray(dtype=qd.f32, ndim=1)):
     s.synchronize()
     assert np.allclose(arr.to_numpy(), 99.0)
     s.destroy()
+
+
+@test_utils.test()
+def test_stream_pool_reuse():
+    """Repeated stream_parallel invocations reuse pooled streams correctly."""
+    N = 128
+    a = qd.ndarray(qd.f32, shape=(N,))
+    b = qd.ndarray(qd.f32, shape=(N,))
+
+    @qd.kernel
+    def parallel_fill(
+        x: qd.types.ndarray(dtype=qd.f32, ndim=1),
+        y: qd.types.ndarray(dtype=qd.f32, ndim=1),
+        val: qd.f32,
+    ):
+        with qd.stream_parallel():
+            for i in range(N):
+                x[i] = val
+        with qd.stream_parallel():
+            for i in range(N):
+                y[i] = val * 2.0
+
+    for iteration in range(5):
+        v = float(iteration + 1)
+        parallel_fill(a, b, v)
+        qd.sync()
+        assert np.allclose(a.to_numpy(), v), f"iteration {iteration}"
+        assert np.allclose(b.to_numpy(), v * 2.0), f"iteration {iteration}"

From 5393d04c8d210edf8fe7d0301ae6f68e22e56b8f Mon Sep 17 00:00:00 2001
From: Hugh Perkins <hughperkins@gmail.com>
Date: Wed, 11 Mar 2026 18:51:33 -0700
Subject: [PATCH 3/3] Destroy pooled streams in CUDAContext and AMDGPUContext
 destructors
---
 quadrants/rhi/amdgpu/amdgpu_context.cpp |  4 ++++
 quadrants/rhi/cuda/cuda_context.cpp     | 11 ++++-------
 2 files changed, 8 insertions(+), 7 deletions(-)

diff --git a/quadrants/rhi/amdgpu/amdgpu_context.cpp b/quadrants/rhi/amdgpu/amdgpu_context.cpp
index 24d924ed0..7163431e3 100644
--- a/quadrants/rhi/amdgpu/amdgpu_context.cpp
+++ b/quadrants/rhi/amdgpu/amdgpu_context.cpp
@@ -204,6 +204,10 @@ void AMDGPUContext::launch(void *func,
 }
 
 AMDGPUContext::~AMDGPUContext() {
+  for (auto *s : stream_pool_) {
+    driver_.stream_destroy(s);
+  }
+  stream_pool_.clear();
   if (context_) {
     driver_.device_primary_ctx_release(device_);
   }
diff --git a/quadrants/rhi/cuda/cuda_context.cpp b/quadrants/rhi/cuda/cuda_context.cpp
index 23399649a..286c4eb3b 100644
--- a/quadrants/rhi/cuda/cuda_context.cpp
+++ b/quadrants/rhi/cuda/cuda_context.cpp
@@ -180,13 +180,10 @@ void CUDAContext::launch(void *func,
 }
 
 CUDAContext::~CUDAContext() {
-  // TODO: restore these?
-  /*
-  CUDADriver::get_instance().cuMemFree(context_buffer);
-  for (auto cudaModule: cudaModules)
-    CUDADriver::get_instance().cuModuleUnload(cudaModule);
-  CUDADriver::get_instance().cuCtxDestroy(context);
-  */
+  for (auto *s : stream_pool_) {
+    driver_.stream_destroy(s);
+  }
+  stream_pool_.clear();
 }
 
 CUDAContext &CUDAContext::get_instance() {