diff --git a/quadrants/program/program.cpp b/quadrants/program/program.cpp index be152d02d..8bab1d30f 100644 --- a/quadrants/program/program.cpp +++ b/quadrants/program/program.cpp @@ -25,6 +25,11 @@ #include "quadrants/rhi/cuda/cuda_context.h" #endif +#ifdef QD_WITH_AMDGPU +#include "quadrants/rhi/amdgpu/amdgpu_driver.h" +#include "quadrants/rhi/amdgpu/amdgpu_context.h" +#endif + #ifdef QD_WITH_VULKAN #include "quadrants/runtime/program_impls/vulkan/vulkan_program.h" #include "quadrants/rhi/vulkan/vulkan_loader.h" @@ -493,6 +498,13 @@ uint64 Program::stream_create() { CUDADriver::get_instance().stream_create(&stream, 0 /*flags*/); return reinterpret_cast<uint64>(stream); } +#endif +#ifdef QD_WITH_AMDGPU + if (compile_config().arch == Arch::amdgpu) { + void *stream = nullptr; + AMDGPUDriver::get_instance().stream_create(&stream, 0 /*flags*/); + return reinterpret_cast<uint64>(stream); + } #endif return 0; } @@ -504,6 +516,12 @@ void Program::stream_destroy(uint64 stream_handle) { reinterpret_cast<void *>(stream_handle)); } #endif +#ifdef QD_WITH_AMDGPU + if (compile_config().arch == Arch::amdgpu && stream_handle != 0) { + AMDGPUDriver::get_instance().stream_destroy( + reinterpret_cast<void *>(stream_handle)); + } +#endif } void Program::stream_synchronize(uint64 stream_handle) { @@ -513,6 +531,12 @@ void Program::stream_synchronize(uint64 stream_handle) { reinterpret_cast<void *>(stream_handle)); } #endif +#ifdef QD_WITH_AMDGPU + if (compile_config().arch == Arch::amdgpu && stream_handle != 0) { + AMDGPUDriver::get_instance().stream_synchronize( + reinterpret_cast<void *>(stream_handle)); + } +#endif } void Program::set_current_cuda_stream(uint64 stream_handle) { @@ -522,6 +546,12 @@ void Program::set_current_cuda_stream(uint64 stream_handle) { reinterpret_cast<void *>(stream_handle)); } #endif +#ifdef QD_WITH_AMDGPU + if (compile_config().arch == Arch::amdgpu) { + AMDGPUContext::get_instance().set_stream( + reinterpret_cast<void *>(stream_handle)); + } +#endif } uint64 Program::event_create() { @@ -532,6 +562,14 @@ uint64
Program::event_create() { 0x02 /*CU_EVENT_DISABLE_TIMING*/); return reinterpret_cast<uint64>(event); } +#endif +#ifdef QD_WITH_AMDGPU + if (compile_config().arch == Arch::amdgpu) { + void *event = nullptr; + AMDGPUDriver::get_instance().event_create(&event, + 0x02 /*hipEventDisableTiming*/); + return reinterpret_cast<uint64>(event); + } #endif return 0; } @@ -543,6 +581,12 @@ void Program::event_destroy(uint64 event_handle) { reinterpret_cast<void *>(event_handle)); } #endif +#ifdef QD_WITH_AMDGPU + if (compile_config().arch == Arch::amdgpu && event_handle != 0) { + AMDGPUDriver::get_instance().event_destroy( + reinterpret_cast<void *>(event_handle)); + } +#endif } void Program::event_record(uint64 event_handle, uint64 stream_handle) { @@ -553,6 +597,13 @@ void Program::event_record(uint64 event_handle, uint64 stream_handle) { reinterpret_cast<void *>(stream_handle)); } #endif +#ifdef QD_WITH_AMDGPU + if (compile_config().arch == Arch::amdgpu && event_handle != 0) { + AMDGPUDriver::get_instance().event_record( + reinterpret_cast<void *>(event_handle), + reinterpret_cast<void *>(stream_handle)); + } +#endif } void Program::event_synchronize(uint64 event_handle) { @@ -562,6 +613,12 @@ void Program::event_synchronize(uint64 event_handle) { reinterpret_cast<void *>(event_handle)); } #endif +#ifdef QD_WITH_AMDGPU + if (compile_config().arch == Arch::amdgpu && event_handle != 0) { + AMDGPUDriver::get_instance().event_synchronize( + reinterpret_cast<void *>(event_handle)); + } +#endif } void Program::stream_wait_event(uint64 stream_handle, uint64 event_handle) { @@ -572,6 +629,13 @@ void Program::stream_wait_event(uint64 stream_handle, uint64 event_handle) { reinterpret_cast<void *>(event_handle), 0 /*flags*/); } #endif +#ifdef QD_WITH_AMDGPU + if (compile_config().arch == Arch::amdgpu && event_handle != 0) { + AMDGPUDriver::get_instance().stream_wait_event( + reinterpret_cast<void *>(stream_handle), + reinterpret_cast<void *>(event_handle), 0 /*flags*/); + } +#endif } } // namespace quadrants::lang diff --git a/quadrants/rhi/amdgpu/amdgpu_context.cpp
b/quadrants/rhi/amdgpu/amdgpu_context.cpp index 22f55339e..24d924ed0 100644 --- a/quadrants/rhi/amdgpu/amdgpu_context.cpp +++ b/quadrants/rhi/amdgpu/amdgpu_context.cpp @@ -13,6 +13,8 @@ namespace quadrants { namespace lang { +thread_local void *AMDGPUContext::stream_ = nullptr; + AMDGPUContext::AMDGPUContext() : driver_(AMDGPUDriver::get_instance_without_context()) { dev_count_ = 0; @@ -188,7 +190,7 @@ void AMDGPUContext::launch(void *func, void *config[] = {(void *)0x01, (void *)packed_arg, (void *)0x02, (void *)&pack_size, (void *)0x03}; driver_.launch_kernel(func, grid_dim, 1, 1, block_dim, 1, 1, - dynamic_shared_mem_bytes, nullptr, nullptr, + dynamic_shared_mem_bytes, stream_, nullptr, reinterpret_cast<void **>(&config)); } std::free(packed_arg); @@ -197,7 +199,7 @@ void AMDGPUContext::launch(void *func, profiler_->stop(task_handle); if (debug_) { - driver_.stream_synchronize(nullptr); + driver_.stream_synchronize(stream_); } } diff --git a/quadrants/rhi/amdgpu/amdgpu_context.h b/quadrants/rhi/amdgpu/amdgpu_context.h index 9529953bf..4fc7c8328 100644 --- a/quadrants/rhi/amdgpu/amdgpu_context.h +++ b/quadrants/rhi/amdgpu/amdgpu_context.h @@ -23,6 +23,7 @@ class AMDGPUContext { KernelProfilerBase *profiler_{nullptr}; AMDGPUDriver &driver_; bool debug_{false}; + static thread_local void *stream_; std::vector<void *> kernel_arg_pointer_; public: @@ -116,6 +117,14 @@ class AMDGPUContext { return std::unique_lock<std::mutex>(lock_); } + void set_stream(void *stream) { + stream_ = stream; + } + + void *get_stream() const { + return stream_; + } + static AMDGPUContext &get_instance(); }; diff --git a/quadrants/rhi/amdgpu/amdgpu_driver_functions.inc.h b/quadrants/rhi/amdgpu/amdgpu_driver_functions.inc.h index dbb3612c8..25e33774e 100644 --- a/quadrants/rhi/amdgpu/amdgpu_driver_functions.inc.h +++ b/quadrants/rhi/amdgpu/amdgpu_driver_functions.inc.h @@ -26,6 +26,7 @@ PER_AMDGPU_FUNCTION(context_get_current, hipCtxGetCurrent, void **); // Stream management PER_AMDGPU_FUNCTION(stream_create,
hipStreamCreate, void **, uint32); +PER_AMDGPU_FUNCTION(stream_destroy, hipStreamDestroy, void *); // Memory management PER_AMDGPU_FUNCTION(memcpy_host_to_device, @@ -69,6 +70,8 @@ PER_AMDGPU_FUNCTION(memcpy_device_to_host_async, std::size_t, void *); PER_AMDGPU_FUNCTION(malloc, hipMalloc, void **, std::size_t); +// hipMallocAsync/hipFreeAsync require ROCm >= 5.4 +PER_AMDGPU_FUNCTION(malloc_async, hipMallocAsync, void **, std::size_t, void *); PER_AMDGPU_FUNCTION(malloc_managed, hipMallocManaged, void **, @@ -76,6 +79,7 @@ PER_AMDGPU_FUNCTION(malloc_managed, uint32); PER_AMDGPU_FUNCTION(memset, hipMemset, void *, uint8, std::size_t); PER_AMDGPU_FUNCTION(mem_free, hipFree, void *); +PER_AMDGPU_FUNCTION(mem_free_async, hipFreeAsync, void *, void *); PER_AMDGPU_FUNCTION(mem_get_info, hipMemGetInfo, std::size_t *, std::size_t *); PER_AMDGPU_FUNCTION(mem_get_attribute, hipPointerGetAttribute, @@ -121,6 +125,11 @@ PER_AMDGPU_FUNCTION(kernel_get_occupancy, // Stream management PER_AMDGPU_FUNCTION(stream_synchronize, hipStreamSynchronize, void *); +PER_AMDGPU_FUNCTION(stream_wait_event, + hipStreamWaitEvent, + void *, + void *, + uint32); // Event management PER_AMDGPU_FUNCTION(event_create, hipEventCreateWithFlags, void **, uint32); diff --git a/quadrants/runtime/amdgpu/kernel_launcher.cpp b/quadrants/runtime/amdgpu/kernel_launcher.cpp index 6ef0b0e0e..f772fc7b5 100644 --- a/quadrants/runtime/amdgpu/kernel_launcher.cpp +++ b/quadrants/runtime/amdgpu/kernel_launcher.cpp @@ -1,5 +1,6 @@ #include "quadrants/runtime/amdgpu/kernel_launcher.h" #include "quadrants/rhi/amdgpu/amdgpu_context.h" +#include "quadrants/rhi/amdgpu/amdgpu_driver.h" #include "quadrants/program/launch_context_builder.h" namespace quadrants::lang { @@ -32,18 +33,14 @@ void KernelLauncher::launch_llvm_kernel(Handle handle, transfers; std::unordered_map device_ptrs; + auto *active_stream = AMDGPUContext::get_instance().get_stream(); + char *device_result_buffer{nullptr}; - // Here we have to guarantee the 
result_result_buffer isn't nullptr - // It is interesting - The code following - // L60: DeviceAllocation devalloc = - // executor->allocate_memory_on_device( call another kernel and it will result - // in - // Memory access fault by GPU node-1 (Agent handle: 0xeda5ca0) on address - // (nil). Reason: Page not present or supervisor privilege. - // if you don't allocate it. - AMDGPUDriver::get_instance().malloc( + // Must always allocate device_result_buffer (even when result_buffer_size + // is 0) to avoid memory access faults from allocate_memory_on_device below. + AMDGPUDriver::get_instance().malloc_async( (void **)&device_result_buffer, - std::max(ctx.result_buffer_size, sizeof(uint64))); + std::max(ctx.result_buffer_size, sizeof(uint64)), active_stream); for (int i = 0; i < (int)parameters.size(); i++) { const auto &kv = parameters[i]; @@ -69,8 +66,9 @@ void KernelLauncher::launch_llvm_kernel(Handle handle, executor->get_device_alloc_info_ptr(devalloc); transfers[data_ptr_idx] = {data_ptr, devalloc}; - AMDGPUDriver::get_instance().memcpy_host_to_device( - (void *)device_ptrs[data_ptr_idx], data_ptr, arr_sz); + AMDGPUDriver::get_instance().memcpy_host_to_device_async( + (void *)device_ptrs[data_ptr_idx], data_ptr, arr_sz, + active_stream); } ctx.set_ndarray_ptrs(arg_id, (uint64)device_ptrs[data_ptr_idx], (uint64)ctx.array_ptrs[grad_ptr_idx]); @@ -86,27 +84,28 @@ void KernelLauncher::launch_llvm_kernel(Handle handle, } } if (transfers.size() > 0) { - AMDGPUDriver::get_instance().stream_synchronize(nullptr); + AMDGPUDriver::get_instance().stream_synchronize(active_stream); } char *host_result_buffer = (char *)ctx.get_context().result_buffer; if (ctx.result_buffer_size > 0) { - // Malloc_Async and Free_Async are available after ROCm 5.4 ctx.get_context().result_buffer = (uint64 *)device_result_buffer; } char *device_arg_buffer = nullptr; if (ctx.arg_buffer_size > 0) { - AMDGPUDriver::get_instance().malloc((void **)&device_arg_buffer, - ctx.arg_buffer_size); - 
AMDGPUDriver::get_instance().memcpy_host_to_device( - device_arg_buffer, ctx.get_context().arg_buffer, ctx.arg_buffer_size); + AMDGPUDriver::get_instance().malloc_async( + (void **)&device_arg_buffer, ctx.arg_buffer_size, active_stream); + AMDGPUDriver::get_instance().memcpy_host_to_device_async( + device_arg_buffer, ctx.get_context().arg_buffer, ctx.arg_buffer_size, + active_stream); ctx.get_context().arg_buffer = device_arg_buffer; } void *context_pointer; int arg_size = sizeof(RuntimeContext *); - AMDGPUDriver::get_instance().malloc((void **)&context_pointer, - sizeof(RuntimeContext)); - AMDGPUDriver::get_instance().memcpy_host_to_device( - context_pointer, &ctx.get_context(), sizeof(RuntimeContext)); + AMDGPUDriver::get_instance().malloc_async( + (void **)&context_pointer, sizeof(RuntimeContext), active_stream); + AMDGPUDriver::get_instance().memcpy_host_to_device_async( + context_pointer, &ctx.get_context(), sizeof(RuntimeContext), + active_stream); AMDGPUContext::get_instance().push_back_kernel_arg_pointer(context_pointer); @@ -119,13 +118,18 @@ void KernelLauncher::launch_llvm_kernel(Handle handle, } QD_TRACE("Launching kernel"); if (ctx.arg_buffer_size > 0) { - AMDGPUDriver::get_instance().mem_free(device_arg_buffer); + AMDGPUDriver::get_instance().mem_free_async(device_arg_buffer, + active_stream); } if (ctx.result_buffer_size > 0) { - AMDGPUDriver::get_instance().memcpy_device_to_host( - host_result_buffer, device_result_buffer, ctx.result_buffer_size); + AMDGPUDriver::get_instance().memcpy_device_to_host_async( + host_result_buffer, device_result_buffer, ctx.result_buffer_size, + active_stream); } + AMDGPUDriver::get_instance().mem_free_async(device_result_buffer, + active_stream); if (transfers.size()) { + AMDGPUDriver::get_instance().stream_synchronize(active_stream); for (auto itr = transfers.begin(); itr != transfers.end(); itr++) { auto &idx = itr->first; auto arg_id = idx.arg_id; @@ -135,8 +139,6 @@ void KernelLauncher::launch_llvm_kernel(Handle 
handle, executor->deallocate_memory_on_device(itr->second.second); } } - // Since we always allocating above then we should always free - AMDGPUDriver::get_instance().mem_free(device_result_buffer); } KernelLauncher::Handle KernelLauncher::register_llvm_kernel( diff --git a/tests/python/test_streams.py b/tests/python/test_streams.py index fabc217e9..073d383c2 100644 --- a/tests/python/test_streams.py +++ b/tests/python/test_streams.py @@ -8,7 +8,7 @@ from tests import test_utils -@test_utils.test(arch=[qd.cuda]) +@test_utils.test(arch=[qd.cuda, qd.amdgpu]) def test_create_and_destroy_stream(): s = qd.create_stream() assert isinstance(s, Stream) @@ -17,7 +17,7 @@ def test_create_and_destroy_stream(): assert s.handle == 0 -@test_utils.test(arch=[qd.cuda]) +@test_utils.test(arch=[qd.cuda, qd.amdgpu]) def test_create_and_destroy_event(): e = qd.create_event() assert isinstance(e, Event) @@ -195,3 +195,83 @@ def fill(arr: qd.types.ndarray(dtype=qd.f32, ndim=1)): s.synchronize() assert np.allclose(arr.to_numpy(), 99.0) s.destroy() + + +@test_utils.test() +def test_concurrent_streams_with_events(): + """Two slow kernels on separate streams run concurrently (~1s on GPU), + serial fallback on CPU/Metal.""" + SPIN_ITERS = 5_000_000 + + @qd.kernel + def slow_fill( + a: qd.types.ndarray(dtype=qd.f32, ndim=1), + lcg_state: qd.types.ndarray(dtype=qd.i32, ndim=1), + index: qd.i32, + value: qd.f32, + ): + qd.loop_config(block_dim=1) + for _ in range(1): + x = lcg_state[index] + for _j in range(SPIN_ITERS): + x = (1664525 * x + 1013904223) % 2147483647 + lcg_state[index] = x + a[index] = value + + @qd.kernel + def add_first_two(a: qd.types.ndarray(dtype=qd.f32, ndim=1)): + qd.loop_config(block_dim=1) + for _ in range(1): + a[2] = a[0] + a[1] + + import time + + # Warm up JIT + a_warmup = qd.ndarray(qd.f32, shape=(3,)) + lcg_warmup = qd.ndarray(qd.i32, shape=(3,)) + slow_fill(a_warmup, lcg_warmup, 0, 0.0) + add_first_two(a_warmup) + qd.sync() + + # Serial baseline + a = 
qd.ndarray(qd.f32, shape=(3,)) + lcg = qd.ndarray(qd.i32, shape=(3,)) + qd.sync() + t0 = time.perf_counter() + slow_fill(a, lcg, 0, 5.0) + slow_fill(a, lcg, 1, 7.0) + add_first_two(a) + qd.sync() + serial_time = time.perf_counter() - t0 + assert np.isclose(a.to_numpy()[2], 12.0) + + # Streams + a = qd.ndarray(qd.f32, shape=(3,)) + lcg = qd.ndarray(qd.i32, shape=(3,)) + s1 = qd.create_stream() + s2 = qd.create_stream() + e1 = qd.create_event() + e2 = qd.create_event() + qd.sync() + t0 = time.perf_counter() + slow_fill(a, lcg, 0, 5.0, qd_stream=s1) + slow_fill(a, lcg, 1, 7.0, qd_stream=s2) + e1.record(s1) + e2.record(s2) + e1.wait() + e2.wait() + add_first_two(a) + qd.sync() + stream_time = time.perf_counter() - t0 + assert np.isclose(a.to_numpy()[2], 12.0) + + speedup = serial_time / stream_time + if qd.lang.impl.current_cfg().arch in (qd.cuda, qd.amdgpu): + assert speedup > 1.5, f"Expected >1.5x speedup, got {speedup:.2f}x" + else: + assert speedup > 0.75, f"Expected >=0.75x (serial fallback), got {speedup:.2f}x" + + s1.destroy() + s2.destroy() + e1.destroy() + e2.destroy()