diff --git a/docs/sphinx/api/qec/cpp_api.rst b/docs/sphinx/api/qec/cpp_api.rst
index 2808234c..c5075c7d 100644
--- a/docs/sphinx/api/qec/cpp_api.rst
+++ b/docs/sphinx/api/qec/cpp_api.rst
@@ -68,6 +68,8 @@ Real-Time Decoding
 
 .. include:: cpp_realtime_decoding_api.rst
 
+.. include:: realtime_pipeline_api.rst
+
 .. _parity_check_matrix_utilities:
 
 Parity Check Matrix Utilities
diff --git a/docs/sphinx/api/qec/realtime_pipeline_api.rst b/docs/sphinx/api/qec/realtime_pipeline_api.rst
new file mode 100644
index 00000000..3e2733e2
--- /dev/null
+++ b/docs/sphinx/api/qec/realtime_pipeline_api.rst
@@ -0,0 +1,72 @@
+.. _realtime_pipeline_api:
+
+Realtime Pipeline API
+=====================
+
+The realtime pipeline API provides the reusable host-side runtime for
+low-latency QEC pipelines that combine GPU inference with optional CPU
+post-processing. The published reference is generated from
+``cudaq/qec/realtime/pipeline.h``.
+
+.. note::
+
+   This API is experimental and subject to change.
+
+
+Configuration
+-------------
+
+.. doxygenstruct:: cudaq::qec::realtime::experimental::core_pinning
+   :members:
+
+.. doxygenstruct:: cudaq::qec::realtime::experimental::pipeline_stage_config
+   :members:
+
+
+GPU Stage
+---------
+
+.. doxygenstruct:: cudaq::qec::realtime::experimental::gpu_worker_resources
+   :members:
+
+.. doxygentypedef:: cudaq::qec::realtime::experimental::gpu_stage_factory
+
+
+CPU Stage
+---------
+
+.. doxygenstruct:: cudaq::qec::realtime::experimental::cpu_stage_context
+   :members:
+
+.. doxygentypedef:: cudaq::qec::realtime::experimental::cpu_stage_callback
+
+.. doxygenvariable:: cudaq::qec::realtime::experimental::DEFERRED_COMPLETION
+
+
+Completion
+----------
+
+.. doxygenstruct:: cudaq::qec::realtime::experimental::completion
+   :members:
+
+.. doxygentypedef:: cudaq::qec::realtime::experimental::completion_callback
+
+
+Ring Buffer Injector
+--------------------
+
+.. doxygenclass:: cudaq::qec::realtime::experimental::ring_buffer_injector
+   :members:
+
+
+Pipeline
+--------
+
+.. doxygenclass:: cudaq::qec::realtime::experimental::realtime_pipeline
+   :members:
+
+.. doxygenstruct:: cudaq::qec::realtime::experimental::realtime_pipeline::Stats
+   :members:
+
+.. doxygenstruct:: cudaq::qec::realtime::experimental::realtime_pipeline::ring_buffer_bases
+   :members:
diff --git a/docs/sphinx/components/qec/introduction.rst b/docs/sphinx/components/qec/introduction.rst
index 3297ed76..aa761d93 100644
--- a/docs/sphinx/components/qec/introduction.rst
+++ b/docs/sphinx/components/qec/introduction.rst
@@ -861,6 +861,9 @@ Additional quantum gates can be applied, and only when `get_corrections` is call
 For detailed information on real-time decoding, see:
 
 * :doc:`/examples_rst/qec/realtime_decoding` - Complete Guide with Examples
+* :doc:`/examples_rst/qec/realtime_predecoder_pymatching` - Realtime AI Predecoder Pipeline
+* :doc:`/examples_rst/qec/realtime_predecoder_fpga` - Realtime AI Predecoder Pipeline with FPGA
+* :ref:`realtime_pipeline_api` - Realtime Pipeline C++ API
 * :doc:`/api/qec/cpp_api` - C++ API Reference (see Real-Time Decoding section)
 * :doc:`/api/qec/python_api` - Python API Reference (see Real-Time Decoding section)
diff --git a/docs/sphinx/examples_rst/qec/examples.rst b/docs/sphinx/examples_rst/qec/examples.rst
index 79247213..f9326da9 100644
--- a/docs/sphinx/examples_rst/qec/examples.rst
+++ b/docs/sphinx/examples_rst/qec/examples.rst
@@ -10,4 +10,5 @@ Examples that illustrate how to use CUDA-QX for application development are avai
     Code-Capacity QEC
     Circuit-Level QEC
     Decoders
-    Real-Time Decoding
\ No newline at end of file
+    Real-Time Decoding
+    Realtime AI Predecoder Pipeline
\ No newline at end of file
diff --git a/docs/sphinx/examples_rst/qec/realtime_predecoder_pymatching.rst b/docs/sphinx/examples_rst/qec/realtime_predecoder_pymatching.rst
new file mode 100644
index 00000000..c42ba937
--- /dev/null
+++ b/docs/sphinx/examples_rst/qec/realtime_predecoder_pymatching.rst
@@ -0,0 +1,310 @@
+Realtime AI Predecoder Pipeline
+================================
+
+.. note::
+
+   The following information is about a C++ demonstration that must be built
+   from source and is not part of any distributed CUDA-Q QEC binaries.
+
+This guide explains how to build and run the hybrid AI predecoder + PyMatching
+streaming benchmark. The benchmark uses a TensorRT-accelerated neural network
+(the *predecoder*) to reduce syndrome density on the GPU, then feeds the
+residual detectors to a pool of PyMatching MWPM decoders on the CPU. A
+software data injector streams pre-generated syndrome shots through the
+``realtime_pipeline`` at a configurable rate and collects latency, throughput,
+syndrome density, and logical error rate statistics.
+
+The benchmark binary is
+``test_realtime_predecoder_w_pymatching``, built from
+`libs/qec/unittests/realtime/test_realtime_predecoder_w_pymatching.cpp
+`_.
+
+
+Prerequisites
+-------------
+
+Hardware
+^^^^^^^^
+
+- CUDA-capable GPU (NVIDIA Grace Blackwell / GB200 recommended)
+- Sufficient GPU memory for the TensorRT engine (the d13_r104 model requires
+  approximately 1 GB per predecoder instance)
+
+Software
+^^^^^^^^
+
+- **CUDA Toolkit** 12.6 or later
+- **TensorRT** 10.x (headers and libraries)
+- **CUDA-Q SDK** pre-installed (provides ``libcudaq``, ``libnvqir``, ``nvq++``)
+- **CUDA-Q Realtime** libraries (``libcudaq-realtime``,
+  ``libcudaq-realtime-dispatch``, ``libcudaq-realtime-host-dispatch``) built
+  and installed to a known prefix (e.g. ``/tmp/cudaq-realtime``)
+
+Additional inputs:
+
+- **Predecoder ONNX model** (e.g. ``predecoder_memory_d13_T104_X.onnx``)
+  placed under ``libs/qec/lib/realtime/``. A cached TensorRT ``.engine`` file
+  with the same base name is loaded automatically if present; otherwise the
+  engine is built from the ONNX file on first run (this can take 1--2 minutes
+  for large models).
+- **Syndrome data directory** containing pre-generated detector samples,
+  observables, and matching graph data (see `Data Directory Layout`_).
+
+
+Data Directory Layout
+---------------------
+
+The ``--data-dir`` flag points to a directory with the following files.
+All binary files use little-endian format.
+
+.. list-table::
+   :header-rows: 1
+   :widths: 30 70
+
+   * - File
+     - Description
+   * - ``detectors.bin``
+     - Detector samples. Header: ``uint32 num_samples``, ``uint32 num_detectors``;
+       body: ``int32[num_samples * num_detectors]``.
+   * - ``observables.bin``
+     - Observable ground-truth labels. Header: ``uint32 num_samples``,
+       ``uint32 num_observables``; body: ``int32[num_samples * num_observables]``.
+   * - ``H_csr.bin``
+     - Sparse CSR parity check matrix. Header: ``uint32 nrows``,
+       ``uint32 ncols``, ``uint32 nnz``; body: ``int32 indptr[nrows+1]``,
+       ``int32 indices[nnz]``.
+   * - ``O_csr.bin``
+     - Sparse CSR observables matrix (same format as ``H_csr.bin``).
+   * - ``priors.bin``
+     - Per-edge error probabilities. Header: ``uint32 num_edges``; body:
+       ``float64[num_edges]``.
+   * - ``metadata.txt``
+     - Human-readable parameters (``distance``, ``n_rounds``, ``p_error``,
+       etc.). Not read by the binary; included for reference.
+
+
+Building
+--------
+
+The benchmark requires two CMake targets:
+
+- ``test_realtime_predecoder_w_pymatching`` -- the benchmark binary
+- ``cudaq-qec-pymatching`` -- the PyMatching decoder plugin (loaded at runtime)
+
+Configure and build:
+
+.. code-block:: bash
+
+   cd /path/to/cudaqx
+
+   cmake -S . -B build \
+     -DCMAKE_BUILD_TYPE=Release \
+     -DCMAKE_CUDA_COMPILER=/usr/local/cuda-13.0/bin/nvcc \
+     -DCUDAQ_DIR=/usr/local/cudaq/lib/cmake/cudaq \
+     -DCUDAQ_REALTIME_ROOT=/tmp/cudaq-realtime \
+     -DCUDAQ_QEC_BUILD_TRT_DECODER=ON \
+     -DCUDAQX_ENABLE_LIBS=qec \
+     -DCUDAQX_INCLUDE_TESTS=ON \
+     -DCUDAQX_QEC_INCLUDE_TESTS=ON
+
+   cmake --build build -j$(nproc) --target \
+     test_realtime_predecoder_w_pymatching \
+     cudaq-qec-pymatching
+
+.. note::
+
+   The ``test_realtime_predecoder_w_pymatching`` target requires TensorRT
+   headers and libraries to be discoverable. CMake searches standard system
+   paths (e.g. ``/usr/include/aarch64-linux-gnu``,
+   ``/usr/lib/aarch64-linux-gnu``). If TensorRT is installed elsewhere, set
+   ``-DTENSORRT_ROOT=/path/to/tensorrt``.
+
+   The ``cudaq-qec-pymatching`` shared library is written to
+   ``build/lib/decoder-plugins/``. If the benchmark fails with
+   ``invalid decoder requested: pymatching``, verify that this file exists.
+
+
+Running
+-------
+
+.. code-block:: text
+
+   test_realtime_predecoder_w_pymatching [config] [rate_us] [duration_s] [flags]
+
+Positional Arguments
+^^^^^^^^^^^^^^^^^^^^
+
+.. list-table::
+   :header-rows: 1
+   :widths: 15 55 15
+
+   * - Argument
+     - Description
+     - Default
+   * - ``config``
+     - Pipeline configuration name (see table below)
+     - ``d7``
+   * - ``rate_us``
+     - Inter-arrival time in microseconds. ``0`` runs open-loop (as fast as
+       possible).
+     - ``0``
+   * - ``duration_s``
+     - Test duration in seconds
+     - ``5``
+
+Named Flags
+^^^^^^^^^^^
+
+.. list-table::
+   :header-rows: 1
+   :widths: 30 70
+
+   * - Flag
+     - Description
+   * - ``--data-dir <path>``
+     - Path to syndrome data directory (see `Data Directory Layout`_). When
+       omitted, random syndromes with 1% error rate are generated.
+   * - ``--num-gpus <n>``
+     - Number of GPUs to use. Currently clamped to 1 (multi-GPU dispatch is
+       not yet supported).
+
+Pipeline Configurations
+^^^^^^^^^^^^^^^^^^^^^^^
+
+.. list-table::
+   :header-rows: 1
+   :widths: 12 10 10 38 10 10 10
+
+   * - Config
+     - Distance
+     - Rounds
+     - ONNX Model
+     - Pre-decoders
+     - Workers
+     - Decode Workers
+   * - ``d13_r104``
+     - 13
+     - 104
+     - ``predecoder_memory_d13_T104_X.onnx``
+     - 8
+     - 8
+     - 16
+
+Example
+^^^^^^^
+
+Run the d13_r104 configuration at 500 req/s for 2 minutes with real syndrome
+data:
+
+.. code-block:: bash
+
+   ./build/libs/qec/unittests/realtime/test_realtime_predecoder_w_pymatching \
+     d13_r104 2000 120 \
+     --data-dir /path/to/syndrome_data/p0.003
+
+
+Changing the Predecoder Model
+-----------------------------
+
+The ONNX model file for each configuration is set in the ``PipelineConfig``
+factory methods in
+``libs/qec/unittests/realtime/predecoder_pipeline_common.h``. To use a
+different model, edit the ``onnx_filename`` field and rebuild.
+
+.. code-block:: cpp
+
+   static PipelineConfig d13_r104() {
+     return {
+         "d13_r104_X", 13, 104,
+         "predecoder_memory_model_4_d13_T104_X.onnx", // changed model
+         8, 8, 16};
+   }
+
+Then rebuild:
+
+.. code-block:: bash
+
+   cmake --build build -j$(nproc) --target test_realtime_predecoder_w_pymatching
+
+ONNX model files and their corresponding ``.engine`` caches live in
+``libs/qec/lib/realtime/``. If a cached engine exists with the same base name
+as the ONNX file, TensorRT loads it directly. Otherwise, the engine is built
+from the ONNX file on the first run.
+
+
+Reading the Output
+------------------
+
+The benchmark prints a structured report after the streaming run completes.
+
+Throughput and Timing
+^^^^^^^^^^^^^^^^^^^^^
+
+.. code-block:: text
+
+   Submitted:           60001
+   Completed:           60001
+   Throughput:          500.0 req/s
+   Backpressure stalls: 0
+
+``Backpressure stalls`` counts how many times the producer had to spin because
+all pipeline slots were occupied. Zero stalls means the pipeline kept up with
+the injection rate.
+
+Latency Distribution
+^^^^^^^^^^^^^^^^^^^^
+
+.. code-block:: text
+
+   Latency (us) [steady-state, 59981 requests after 20 warmup]
+     min  = 154.8
+     p50  = 203.9
+     mean = 215.5
+     p99  = 363.4
+
+End-to-end latency measured from ``injector.submit()`` to the completion
+callback. Includes GPU inference, CPU-side PyMatching decode, and all pipeline
+overhead. The first 20 requests are excluded as warmup.
+
+PyMatching Average Time
+^^^^^^^^^^^^^^^^^^^^^^^
+
+.. code-block:: text
+
+   PyMatching decode: 75.6 us
+
+Average time for the PyMatching MWPM decoder to process a single residual
+syndrome.
+
+Syndrome Density
+^^^^^^^^^^^^^^^^
+
+.. code-block:: text
+
+   Input:     931.0 / 17472 (0.0533)
+   Output:     16.0 / 17472 (0.0009)
+   Reduction: 98.3%
+
+Average nonzero detectors before the predecoder (input) and after (residual
+output). Higher reduction means the predecoder is removing more syndrome
+weight, which reduces PyMatching decode time.
+
+Correctness Verification
+^^^^^^^^^^^^^^^^^^^^^^^^
+
+Printed only when ``--data-dir`` is provided:
+
+.. code-block:: text
+
+   Pipeline (pred+pymatch)  mismatches: 108  LER: 0.0018
+
+- **Pipeline LER**: logical error rate of the full predecoder + PyMatching
+  chain compared to ground-truth observables.
+
+.. note::
+
+   Syndrome samples are cycled when the run exceeds the dataset size.
+   For example, if the dataset has 10,000 shots and the test runs 60,000
+   requests, each shot is replayed approximately 6 times. Correctness
+   verification still compares against the correct ground truth for each
+   replayed shot.
diff --git a/libs/qec/include/cudaq/qec/realtime/pipeline.h b/libs/qec/include/cudaq/qec/realtime/pipeline.h
index bb968eef..9f4fb35f 100644
--- a/libs/qec/include/cudaq/qec/realtime/pipeline.h
+++ b/libs/qec/include/cudaq/qec/realtime/pipeline.h
@@ -8,6 +8,13 @@
 
 #pragma once
 
+/// @file pipeline.h
+/// @brief Public API for the realtime decoding pipeline.
+///
+/// Provides configuration structs, callback types, a software ring buffer
+/// injector, and the realtime_pipeline class that orchestrates GPU
+/// inference and CPU post-processing for low-latency QEC decoding.
+
 #include 
 #include 
 #include 
@@ -21,21 +28,31 @@ namespace cudaq::qec::realtime::experimental {
 // Configuration
 // ---------------------------------------------------------------------------
 
+/// @brief CPU core affinity settings for pipeline threads.
 struct core_pinning {
-  int dispatcher = -1; // -1 = no pinning
+  /// @brief Core for the host dispatcher thread. -1 disables pinning.
+  int dispatcher = -1;
+  /// @brief Core for the consumer (completion) thread. -1 disables pinning.
   int consumer = -1;
-  int worker_base = -1; // workers pin to base, base+1, ...
+  /// @brief Base core for worker threads. Workers pin to base, base+1, etc.
+  /// -1 disables pinning.
+  int worker_base = -1;
 };
 
+/// @brief Configuration for a single pipeline stage.
 struct pipeline_stage_config {
+  /// @brief Number of GPU worker threads (max 64).
   int num_workers = 8;
+  /// @brief Number of ring buffer slots.
   int num_slots = 32;
+  /// @brief Size of each ring buffer slot in bytes.
   size_t slot_size = 16384;
+  /// @brief CPU core affinity settings.
   core_pinning cores;
 
-  /// When non-null, the pipeline uses this caller-owned ring buffer
-  /// (cudaq_ringbuffer_t*) instead of allocating its own. The caller is
-  /// responsible for lifetime. ring_buffer_injector is unavailable in
+  /// @brief When non-null, the pipeline uses this caller-owned ring buffer
+  /// (cudaq_ringbuffer_t*) instead of allocating its own. The caller is
+  /// responsible for lifetime. ring_buffer_injector is unavailable in
   /// this mode (the FPGA/emulator owns the producer side).
   void *external_ringbuffer = nullptr;
 };
@@ -44,47 +61,72 @@ struct pipeline_stage_config {
 // ---------------------------------------------------------------------------
 // GPU Stage Factory
 // ---------------------------------------------------------------------------
 
+/// @brief Per-worker GPU resources returned by the gpu_stage_factory.
+///
+/// Each worker owns a captured CUDA graph, a dedicated stream, and optional
+/// pre/post launch callbacks for DMA staging or result extraction.
 struct gpu_worker_resources {
+  /// @brief Instantiated CUDA graph for this worker.
   cudaGraphExec_t graph_exec = nullptr;
+  /// @brief Dedicated CUDA stream for graph launches.
   cudaStream_t stream = nullptr;
+  /// @brief Optional callback invoked before graph launch (e.g. DMA copy).
   void (*pre_launch_fn)(void *user_data, void *slot_dev,
                         cudaStream_t stream) = nullptr;
+  /// @brief Opaque user data passed to @p pre_launch_fn.
   void *pre_launch_data = nullptr;
+  /// @brief Optional callback invoked after graph launch.
   void (*post_launch_fn)(void *user_data, void *slot_dev,
                         cudaStream_t stream) = nullptr;
+  /// @brief Opaque user data passed to @p post_launch_fn.
   void *post_launch_data = nullptr;
+  /// @brief RPC function ID that this worker handles.
   uint32_t function_id = 0;
+  /// @brief Opaque user context passed to cpu_stage_callback.
   void *user_context = nullptr;
 };
 
-/// Called once per worker during start(). Returns GPU resources for that
-/// worker.
+/// @brief Factory called once per worker during start().
+/// @param worker_id Zero-based worker index assigned by the pipeline.
+/// @return GPU resources for the given worker. Any handles, callbacks, and
+/// user data returned here must remain valid until the pipeline stops.
 using gpu_stage_factory = std::function;
 
 // ---------------------------------------------------------------------------
 // CPU Stage Callback
 // ---------------------------------------------------------------------------
 
-/// Passed to the user's CPU stage callback on each completed GPU workload.
-/// The user reads gpu_output, does post-processing, and writes the
-/// result into response_buffer. No atomics are exposed.
+/// @brief Context passed to the CPU stage callback for each completed GPU
+/// workload.
+///
+/// The callback reads @p gpu_output, performs post-processing (e.g. MWPM
+/// decoding), and writes the result into @p response_buffer.
 struct cpu_stage_context {
+  /// @brief Index of the worker thread invoking this callback.
   int worker_id;
+  /// @brief Ring buffer slot that originated this request.
   int origin_slot;
+  /// @brief Pointer to GPU inference output (nullptr in poll mode).
   const void *gpu_output;
+  /// @brief Size of GPU output in bytes.
   size_t gpu_output_size;
+  /// @brief Destination buffer for the RPC response.
   void *response_buffer;
+  /// @brief Maximum number of bytes that can be written to @p response_buffer.
   size_t max_response_size;
+  /// @brief Opaque user context from gpu_worker_resources::user_context.
   void *user_context;
 };
 
-/// Returns the number of bytes written into response_buffer.
+/// @brief CPU stage callback type.
+/// @param ctx Poll-mode view of the current worker state and response buffer.
+/// @return Number of bytes written into @p ctx.response_buffer.
 /// Return 0 if no GPU result is ready yet (poll again).
 /// Return DEFERRED_COMPLETION to release the worker immediately while
 /// deferring slot completion to a later complete_deferred() call.
 using cpu_stage_callback = std::function;
 
-/// Sentinel return value from cpu_stage_callback: release the worker
+/// @brief Sentinel return value from cpu_stage_callback: release the worker
 /// (idle_mask) but do NOT signal slot completion (tx_flags). The caller
 /// is responsible for calling realtime_pipeline::complete_deferred(slot)
 /// once the deferred work (e.g. a separate decode thread) finishes.
@@ -94,41 +136,63 @@ static constexpr size_t DEFERRED_COMPLETION = SIZE_MAX;
 // ---------------------------------------------------------------------------
 // Completion Callback
 // ---------------------------------------------------------------------------
 
+/// @brief Metadata for a completed (or errored) pipeline request.
 struct completion {
+  /// @brief Original request ID from the RPC header.
   uint64_t request_id;
+  /// @brief Ring buffer slot that held this request.
   int slot;
+  /// @brief True if the request completed without CUDA errors.
   bool success;
-  int cuda_error; // 0 on success
+  /// @brief CUDA error code (0 on success).
+  int cuda_error;
 };
 
-/// Called by the consumer thread for each completed (or errored) request.
+/// @brief Callback invoked by the consumer thread for each completed request.
+/// @param c Metadata for the completed or errored request.
 using completion_callback = std::function;
 
 // ---------------------------------------------------------------------------
 // Ring Buffer Injector (software-only test/replay data source)
 // ---------------------------------------------------------------------------
 
-/// Writes RPC-framed requests into the pipeline's ring buffer, simulating
-/// FPGA DMA deposits. Created via realtime_pipeline::create_injector().
-/// The parent realtime_pipeline must outlive the injector.
+/// @brief Writes RPC-framed requests into the pipeline's ring buffer,
+/// simulating FPGA DMA deposits.
+///
+/// Created via realtime_pipeline::create_injector(). The parent
+/// realtime_pipeline must outlive the injector. Not available when the
+/// pipeline is configured with an external ring buffer.
 class ring_buffer_injector {
 public:
+  /// @brief Destroy the injector state.
   ~ring_buffer_injector();
+  /// @brief Move-construct an injector.
   ring_buffer_injector(ring_buffer_injector &&) noexcept;
+  /// @brief Move-assign an injector.
   ring_buffer_injector &operator=(ring_buffer_injector &&) noexcept;
   ring_buffer_injector(const ring_buffer_injector &) = delete;
   ring_buffer_injector &operator=(const ring_buffer_injector &) = delete;
 
-  /// Try to submit a request. Returns true if accepted, false if
-  /// backpressure (all slots busy). Non-blocking. Thread-safe.
+  /// @brief Try to submit a request without blocking.
+  /// @param function_id RPC function identifier.
+  /// @param payload Pointer to the payload data.
+  /// @param payload_size Size of the payload in bytes.
+  /// @param request_id Caller-assigned request identifier.
+  /// @return True if accepted, false if all slots are busy (backpressure).
   bool try_submit(uint32_t function_id, const void *payload,
                   size_t payload_size, uint64_t request_id);
 
-  /// Blocking submit: spins until a slot becomes available.
+  /// @brief Submit a request, spinning until a slot becomes available.
+  /// @param function_id RPC function identifier.
+  /// @param payload Pointer to the payload data.
+  /// @param payload_size Size of the payload in bytes.
+  /// @param request_id Caller-assigned request identifier.
   void submit(uint32_t function_id, const void *payload, size_t payload_size,
               uint64_t request_id);
 
+  /// @brief Return the cumulative number of backpressure stalls.
+  /// @return Number of times submit() had to spin-wait for a free slot.
   uint64_t backpressure_stalls() const;
 
 private:
@@ -142,56 +206,95 @@ class ring_buffer_injector {
 // ---------------------------------------------------------------------------
 // Pipeline
 // ---------------------------------------------------------------------------
 
+/// @brief Orchestrates GPU inference and CPU post-processing for low-latency
+/// realtime QEC decoding.
+///
+/// The pipeline manages a ring buffer, a host dispatcher thread, per-worker
+/// GPU streams with captured CUDA graphs, optional CPU worker threads for
+/// post-processing (e.g. PyMatching), and a consumer thread for completion
+/// signaling. It supports both an internal ring buffer (for software testing
+/// via ring_buffer_injector) and an external ring buffer (for FPGA RDMA).
 class realtime_pipeline {
 public:
+  /// @brief Construct a pipeline and allocate ring buffer resources.
+  /// @param config Stage configuration (slots, slot size, workers, etc.).
+  /// @note Construction allocates the backing ring buffer or binds the
+  /// caller-provided external ring so @ref ringbuffer_bases can be queried
+  /// before @ref start.
   explicit realtime_pipeline(const pipeline_stage_config &config);
 
+  /// @brief Stop the pipeline if needed and release owned resources.
   ~realtime_pipeline();
 
   realtime_pipeline(const realtime_pipeline &) = delete;
   realtime_pipeline &operator=(const realtime_pipeline &) = delete;
 
-  /// Register the GPU stage factory (called before start).
+  /// @brief Register the GPU stage factory. Must be called before start().
+  /// @param factory Callback that returns gpu_worker_resources per worker.
   void set_gpu_stage(gpu_stage_factory factory);
 
-  /// Register the CPU worker callback (called before start).
+  /// @brief Register the CPU worker callback. Must be called before start().
+  /// @param callback Function invoked by each worker thread to poll for and
+  /// process completed GPU workloads. If not set, the pipeline operates in
+  /// GPU-only mode with completion signaled via cudaLaunchHostFunc.
   void set_cpu_stage(cpu_stage_callback callback);
 
-  /// Register the completion callback (called before start).
+  /// @brief Register the completion callback. Must be called before start().
+  /// @param handler Function invoked by the consumer thread for each
+  /// completed or errored request.
   void set_completion_handler(completion_callback handler);
 
-  /// Allocate resources, build dispatcher config, spawn all threads.
+  /// @brief Allocate resources, build dispatcher config, and spawn all threads.
+  /// @throws std::logic_error If the GPU stage factory was not registered.
+  /// @throws std::logic_error If GPU-only mode is requested with an external
+  /// ring buffer.
   void start();
 
-  /// Signal shutdown, join all threads, free resources.
+  /// @brief Signal shutdown, join all threads, free resources.
+  /// @note Safe to call multiple times. Subsequent calls are no-ops once the
+  /// pipeline has fully stopped.
   void stop();
 
-  /// Create a software injector for testing without FPGA hardware.
-  /// The pipeline must be constructed but need not be started yet.
+  /// @brief Create a software injector for testing without FPGA hardware.
+  /// @return A ring_buffer_injector bound to this pipeline's ring buffer.
+  /// @throws std::logic_error if the pipeline uses an external ring buffer.
   ring_buffer_injector create_injector();
 
+  /// @brief Pipeline throughput and backpressure statistics.
   struct Stats {
+    /// @brief Total requests submitted to the ring buffer.
    uint64_t submitted;
+    /// @brief Total requests that completed (success or error).
    uint64_t completed;
+    /// @brief Total packets dispatched by the host dispatcher.
    uint64_t dispatched;
+    /// @brief Cumulative producer backpressure stalls.
    uint64_t backpressure_stalls;
  };
 
-  /// Thread-safe, lock-free stats snapshot.
+  /// @brief Thread-safe, lock-free stats snapshot.
+  /// @return Current pipeline statistics.
   Stats stats() const;
 
-  /// Signal that deferred processing for a slot is complete.
-  /// Call this from any thread after the cpu_stage callback returned
+  /// @brief Signal that deferred processing for a slot is complete.
+  ///
+  /// Call from any thread after the cpu_stage callback returned
   /// DEFERRED_COMPLETION and the deferred work has finished writing the
   /// response into the slot's ring buffer area.
+  /// @param slot Ring buffer slot index to complete.
   void complete_deferred(int slot);
 
+  /// @brief Host and device base addresses of the RX data ring.
   struct ring_buffer_bases {
+    /// @brief Host-mapped base pointer for the RX data ring.
    uint8_t *rx_data_host;
+    /// @brief Device-mapped base pointer for the RX data ring.
    uint8_t *rx_data_dev;
  };
 
-  /// Return the host and device base addresses of the RX data ring.
-  /// Useful for pre_launch callbacks that need to convert between the two.
+  /// @brief Return the host and device base addresses of the RX data ring.
+  /// @return Struct containing both base pointers.
+  /// @note In external-ring mode these pointers are the caller-provided ring
+  /// addresses. In internal mode they refer to the owned mapped ring buffer.
   ring_buffer_bases ringbuffer_bases() const;
 
 private:
diff --git a/libs/qec/lib/realtime/realtime_pipeline.cu b/libs/qec/lib/realtime/realtime_pipeline.cu
index a10f16fb..6858be65 100644
--- a/libs/qec/lib/realtime/realtime_pipeline.cu
+++ b/libs/qec/lib/realtime/realtime_pipeline.cu
@@ -6,6 +6,12 @@
  * the terms of the Apache License 2.0 which accompanies this distribution.
  ******************************************************************************/
 
+// Realtime pipeline implementation.
+//
+// Implements the mapped ring buffer, host dispatcher integration, GPU-only
+// completion path, CPU polling worker threads, and consumer-side completion
+// harvesting for realtime_pipeline.
+
 #include "cudaq/qec/realtime/nvtx_helpers.h"
 #include "cudaq/qec/realtime/pipeline.h"
 #include "cudaq/realtime/daemon/dispatcher/cudaq_realtime.h"
@@ -46,6 +52,7 @@ using atomic_int_sys = cuda::std::atomic;
     }                                                                        \
   } while (0)
 
+// Pin a thread to a specific CPU core when requested.
 static void pin_thread(std::thread &t, int core) {
   if (core < 0)
     return;
@@ -59,6 +66,10 @@ static void pin_thread(std::thread &t, int core) {
 // GPU-only mode: completion signaling via cudaLaunchHostFunc
 // ---------------------------------------------------------------------------
 
+// Per-worker state for GPU-only completion signaling. When no cpu_stage
+// callback is installed, the pipeline uses cudaLaunchHostFunc to mark
+// completion on the worker's CUDA stream and then release the worker back to
+// the dispatcher.
 struct GpuOnlyWorkerCtx {
   atomic_uint64_sys *tx_flags;
   atomic_uint64_sys *idle_mask;
@@ -73,6 +84,7 @@ struct GpuOnlyWorkerCtx {
   uint64_t tx_value;
 };
 
+// Host callback launched on the worker CUDA stream in GPU-only mode.
 static void gpu_only_host_callback(void *user_data) {
   auto *ctx = static_cast(user_data);
   ctx->tx_flags[ctx->origin_slot].store(ctx->tx_value,
                                         cuda::std::memory_order_release);
                                         cuda::std::memory_order_release);
 }
 
+// Post-launch hook that chains user callbacks and schedules GPU-only
+// completion signaling.
 static void gpu_only_post_launch(void *user_data, void *slot_dev,
                                  cudaStream_t stream) {
   NVTX_PUSH("GPUPostLaunch");
@@ -102,8 +116,13 @@ static void gpu_only_post_launch(void *user_data, void *slot_dev,
 // RingBufferManager
 // ---------------------------------------------------------------------------
 
+// Manage a pinned, GPU-mapped ring buffer for host-device communication.
+//
+// This allocates rx/tx flag arrays and a data region using cudaHostAllocMapped
+// so both CPU and GPU can access them via mapped pointers.
 class RingBufferManager {
 public:
+  // Allocate a ring buffer with the given slot count and size.
   RingBufferManager(size_t num_slots, size_t slot_size)
       : num_slots_(num_slots), slot_size_(slot_size) {
     PIPELINE_CUDA_CHECK(cudaHostAlloc(
@@ -151,11 +170,13 @@ public:
     cudaFreeHost(rx_data_host_);
   }
 
+  // Check whether a slot's rx_flag is zero and therefore available.
   bool slot_available(uint32_t slot) const {
    auto *flags = reinterpret_cast(rx_flags_);
    return __atomic_load_n(&flags[slot], __ATOMIC_ACQUIRE) == 0;
  }
 
+  // Write an RPC request into a slot and signal the dispatcher.
   void write_and_signal(uint32_t slot, uint32_t function_id,
                         const void *payload, uint32_t payload_len,
                         uint32_t request_id = 0, uint64_t ptp_timestamp = 0) {
@@ -165,21 +186,30 @@ public:
     cudaq_host_ringbuffer_signal_slot(&rb_, slot);
   }
 
+  // Poll the TX flag for a slot to check completion status.
   cudaq_tx_status_t poll_tx(uint32_t slot, int *cuda_error) const {
     return cudaq_host_ringbuffer_poll_tx_flag(&rb_, slot, cuda_error);
   }
 
+  // Clear a slot's rx and tx flags after completion.
   void clear_slot(uint32_t slot) {
     cudaq_host_ringbuffer_clear_slot(&rb_, slot);
   }
 
+  // Return the number of slots.
   size_t num_slots() const { return num_slots_; }
+  // Return the slot size in bytes.
   size_t slot_size() const { return slot_size_; }
+  // Return the host-side RX flag array.
   atomic_uint64_sys *rx_flags() { return rx_flags_; }
+  // Return the host-side TX flag array.
   atomic_uint64_sys *tx_flags() { return tx_flags_; }
+  // Return the host-mapped RX data base pointer.
   uint8_t *rx_data_host() { return rx_data_host_; }
+  // Return the device-mapped RX data base pointer.
   uint8_t *rx_data_dev() { return rx_data_dev_; }
+  // Return a const reference to the underlying cudaq_ringbuffer_t.
   const cudaq_ringbuffer_t &ringbuffer() const { return rb_; }
 
 private:
@@ -200,6 +230,9 @@ private:
 // Impl
 // ---------------------------------------------------------------------------
 
+// PIMPL implementation backing realtime_pipeline. Owns the
+// dispatcher-facing ring buffer state, worker resources, completion
+// accounting, and thread lifecycle machinery hidden from the public header.
 struct realtime_pipeline::Impl {
   pipeline_stage_config config;
 
@@ -258,6 +291,7 @@ struct realtime_pipeline::Impl {
   // Lifecycle
   // -----------------------------------------------------------------------
 
+  // Allocate mapped ring state and mailbox storage for the pipeline.
   void allocate(const pipeline_stage_config &cfg) {
     if (cfg.num_workers > 64) {
       throw std::invalid_argument("num_workers (" +
@@ -288,6 +322,8 @@ struct realtime_pipeline::Impl {
     slot_occupied.resize(cfg.num_slots, 0);
   }
 
+  // Build worker resources, configure the host dispatcher, and spawn all
+  // runtime threads.
   void start_threads() {
     if (!gpu_factory) {
       throw std::logic_error("gpu_factory must be set before calling start()");
@@ -409,6 +445,8 @@ struct realtime_pipeline::Impl {
     started = true;
   }
 
+  // Stop the dispatcher, worker, and consumer threads in dependency
+  // order.
   void stop_all() {
     if (!started)
       return;
@@ -445,6 +483,7 @@ struct realtime_pipeline::Impl {
     started = false;
   }
 
+  // Release owned mapped-memory allocations after shutdown.
  void free_resources() {
    ring.reset();
    if (h_mailbox_bank) {
@@ -457,6 +496,11 @@
  // Worker loop (one per worker thread)
  // -----------------------------------------------------------------------

+  /// @brief Poll-mode CPU worker loop.
+  /// @param worker_id Zero-based worker index.
+  /// @details Each worker repeatedly polls its user-supplied cpu_stage
+  /// callback, writes the response status into the ring buffer when work
+  /// completes, and returns itself to the dispatcher's idle mask.
  void worker_loop(int worker_id) {
    auto *wr = &worker_resources[worker_id];
@@ -507,6 +551,11 @@
  // Consumer loop
  // -----------------------------------------------------------------------

+  /// @brief Harvest completed ring-buffer slots and invoke the completion
+  /// handler.
+  /// @details The consumer owns slot completion accounting and the ordering
+  /// required when clearing @c slot_occupied before resetting the shared ring
+  /// buffer flags on ARM and x86 hosts.
  void consumer_loop() {
    const uint32_t ns = static_cast<uint32_t>(config.num_slots);
@@ -584,37 +633,55 @@
// realtime_pipeline public API
// ---------------------------------------------------------------------------

+// Construction eagerly binds the active ring buffer and allocates the mapped
+// mailbox bank so callers can inspect ring addresses before start().
realtime_pipeline::realtime_pipeline(const pipeline_stage_config &config)
    : impl_(std::make_unique<Impl>()) {
  impl_->allocate(config);
}

+// Destruction is shutdown-safe: any still-running threads are joined before
+// mapped host/device resources are released.
realtime_pipeline::~realtime_pipeline() {
  if (impl_->started)
    impl_->stop_all();
  impl_->free_resources();
}

+// The factory is stored and invoked later during start() so each worker can
+// build stream-local graph state exactly once.
void realtime_pipeline::set_gpu_stage(gpu_stage_factory factory) { impl_->gpu_factory = std::move(factory); } +// Installing a CPU stage switches the pipeline into deferred completion mode, +// where worker threads poll for GPU readiness and decide when to publish +// tx_flags. void realtime_pipeline::set_cpu_stage(cpu_stage_callback callback) { impl_->cpu_stage = std::move(callback); } +// The completion handler runs on the dedicated consumer thread after the +// shared ring buffer indicates either success or CUDA error completion. void realtime_pipeline::set_completion_handler(completion_callback handler) { impl_->completion_handler = std::move(handler); } +// Repeated calls after a successful start are ignored so callers can treat +// start() as idempotent during setup sequences. void realtime_pipeline::start() { if (impl_->started) return; impl_->start_threads(); } +// stop() delegates to the internal shutdown path, which first allows +// in-flight requests to drain and then tears down the dispatcher and worker +// threads. void realtime_pipeline::stop() { impl_->stop_all(); } +// The returned counters are sampled lock-free and may race with live updates, +// but each field is individually coherent. realtime_pipeline::Stats realtime_pipeline::stats() const { return {impl_->total_submitted.load(std::memory_order_relaxed), impl_->total_completed.load(std::memory_order_relaxed), @@ -622,11 +689,16 @@ realtime_pipeline::Stats realtime_pipeline::stats() const { impl_->backpressure_stalls.load(std::memory_order_relaxed)}; } +// In external-ring mode this exposes the caller-owned ring pointers; otherwise +// it returns the internally allocated mapped ring buffer bases. 
realtime_pipeline::ring_buffer_bases
realtime_pipeline::ringbuffer_bases() const {
  return {impl_->active_rb_.rx_data_host, impl_->active_rb_.rx_data};
}

+// Deferred completions publish the slot host pointer into the tx_flags array
+// using release ordering so the consumer can safely observe the completed
+// response payload.
void realtime_pipeline::complete_deferred(int slot) {
  uint8_t *slot_host = impl_->active_rb_.rx_data_host +
                       static_cast<size_t>(slot) * impl_->config.slot_size;
@@ -650,6 +722,8 @@ struct ring_buffer_injector::State {
  std::atomic<uint32_t> next_slot{0};
};

+// The injector captures pointers into the pipeline's submission and
+// bookkeeping state so software tests can emulate FPGA DMA writes.
ring_buffer_injector realtime_pipeline::create_injector() {
  if (impl_->external_ring_) {
    throw std::logic_error(
@@ -667,15 +741,25 @@
  return ring_buffer_injector(std::move(s));
}

+// Ownership of the shared injector state transfers with the move.
ring_buffer_injector::ring_buffer_injector(std::unique_ptr<State> s)
    : state_(std::move(s)) {}

+// Destruction is trivial because the parent pipeline owns the ring buffer and
+// completion bookkeeping.
ring_buffer_injector::~ring_buffer_injector() = default;

+// Moving an injector transfers the submission cursor and shared state handle
+// without touching the underlying ring buffer.
ring_buffer_injector::ring_buffer_injector(ring_buffer_injector &&) noexcept =
    default;

+// Moving an injector transfers the submission cursor and shared state handle
+// without touching the underlying ring buffer.
ring_buffer_injector &
ring_buffer_injector::operator=(ring_buffer_injector &&) noexcept = default;

+// try_submit() attempts a single-slot claim using the shared round-robin
+// cursor and returns immediately if the chosen slot is still occupied or
+// another thread wins the cursor race.
bool ring_buffer_injector::try_submit(uint32_t function_id, const void *payload, size_t payload_size, uint64_t request_id) { @@ -700,6 +784,8 @@ bool ring_buffer_injector::try_submit(uint32_t function_id, const void *payload, return true; } +// submit() spin-waits with CUDAQ_REALTIME_CPU_RELAX until a slot becomes +// available or the producer stop flag is raised during shutdown. void ring_buffer_injector::submit(uint32_t function_id, const void *payload, size_t payload_size, uint64_t request_id) { while (!try_submit(function_id, payload, payload_size, request_id)) { @@ -711,6 +797,8 @@ void ring_buffer_injector::submit(uint32_t function_id, const void *payload, } } +// This mirrors the pipeline-wide counter used for throughput and backpressure +// reporting. uint64_t ring_buffer_injector::backpressure_stalls() const { return state_->backpressure_stalls->load(std::memory_order_relaxed); } diff --git a/libs/qec/unittests/realtime/predecoder_pipeline_common.cpp b/libs/qec/unittests/realtime/predecoder_pipeline_common.cpp index 9f14ce11..4fbc7321 100644 --- a/libs/qec/unittests/realtime/predecoder_pipeline_common.cpp +++ b/libs/qec/unittests/realtime/predecoder_pipeline_common.cpp @@ -6,6 +6,12 @@ * the terms of the Apache License 2.0 which accompanies this distribution. * ******************************************************************************/ +// Implementation file for shared predecoder pipeline utilities. +// +// Provides the runtime behavior behind the helper types declared in +// predecoder_pipeline_common.h, including the TensorRT pre-launch staging +// callback, the deferred PyMatching job queue, and binary dataset loaders. 
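The round-robin claim protocol that `try_submit()`/`submit()` implement above can be
sketched as a small host-only model. Everything here — `Injector`, `SLOTS`,
`complete()` — is an illustrative stand-in for the real pipeline types, which keep
the flag arrays in pinned, GPU-mapped memory and release slots from the consumer
thread:

```cpp
#include <array>
#include <atomic>
#include <cassert>
#include <cstdint>
#include <thread>

// Hypothetical host-only model of the injector's slot-claim protocol: a shared
// cursor picks a slot, a CAS on that slot's rx flag claims it, and submit()
// spins (counting backpressure stalls) until a slot frees up.
struct Injector {
  static constexpr uint32_t SLOTS = 4;
  std::array<std::atomic<uint64_t>, SLOTS> rx_flags;
  std::atomic<uint32_t> next_slot{0};
  std::atomic<uint64_t> backpressure_stalls{0};

  Injector() {
    for (auto &f : rx_flags)
      f.store(0, std::memory_order_relaxed); // all slots start free
  }

  // Claim the slot under the shared cursor; fail fast if it is still occupied
  // or another producer already won the CAS on that slot.
  bool try_submit(uint64_t request_id) {
    uint32_t cursor = next_slot.load(std::memory_order_relaxed);
    uint32_t slot = cursor % SLOTS;
    uint64_t expected = 0;
    // A nonzero flag marks the slot occupied (here: request id + 1).
    if (!rx_flags[slot].compare_exchange_strong(expected, request_id + 1,
                                                std::memory_order_acq_rel))
      return false;
    // Advance the shared round-robin cursor for the next producer.
    next_slot.compare_exchange_strong(cursor, cursor + 1,
                                      std::memory_order_relaxed);
    return true;
  }

  // Spin until a slot becomes available, counting each stall.
  void submit(uint64_t request_id) {
    while (!try_submit(request_id)) {
      backpressure_stalls.fetch_add(1, std::memory_order_relaxed);
      std::this_thread::yield(); // stand-in for CUDAQ_REALTIME_CPU_RELAX
    }
  }

  // Consumer side: clearing the flag returns the slot to the pool.
  void complete(uint32_t slot) {
    rx_flags[slot].store(0, std::memory_order_release);
  }
};
```

The model only captures the claim/advance/release ordering; it omits the payload
copy, RPC header framing, and stop-flag handling of the real implementation.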
+ #include "predecoder_pipeline_common.h" #include @@ -15,6 +21,10 @@ // Pre-launch DMA copy callback // ============================================================================= +// Store the most recent slot pointer in the mailbox bank so the CPU polling +// path can map TensorRT completion back to the original ring slot, then issue +// an asynchronous host-to-device copy of the detector payload located +// immediately after the RPC header. void pre_launch_input_copy(void *user_data, void *slot_dev, cudaStream_t stream) { NVTX_PUSH("PreLaunchCopy"); @@ -33,6 +43,8 @@ void pre_launch_input_copy(void *user_data, void *slot_dev, // PyMatchQueue // ============================================================================= +// Keep the critical section short so predecoder workers can hand off jobs to +// the PyMatching pool without stalling the realtime pipeline. void PyMatchQueue::push(PyMatchJob &&j) { { std::lock_guard lk(mtx_); @@ -41,6 +53,8 @@ void PyMatchQueue::push(PyMatchJob &&j) { cv_.notify_one(); } +// Workers sleep until a new job arrives or shutdown() is called. Returning +// false signals that the worker should exit. bool PyMatchQueue::pop(PyMatchJob &out) { std::unique_lock lk(mtx_); cv_.wait(lk, [&] { return !jobs_.empty() || stop_; }); @@ -51,6 +65,8 @@ bool PyMatchQueue::pop(PyMatchJob &out) { return true; } +// Wake all waiting workers so they can observe stop_ and exit once the queue +// has drained. void PyMatchQueue::shutdown() { { std::lock_guard lk(mtx_); @@ -63,6 +79,8 @@ void PyMatchQueue::shutdown() { // SparseCSR // ============================================================================= +// Expand the CSR representation into a dense row-major tensor so helper code +// can build decoders using the regular tensor-based APIs. 
cudaqx::tensor<uint8_t> SparseCSR::to_dense() const {
  cudaqx::tensor<uint8_t> T;
  std::vector<uint8_t> data(static_cast<size_t>(nrows) * ncols, 0);
@@ -73,6 +91,8 @@
  return T;
}

+// Primarily used to fetch the first observable row for projecting residual
+// corrections onto logical parity bits.
std::vector<uint8_t> SparseCSR::row_dense(uint32_t r) const {
  std::vector<uint8_t> row(ncols, 0);
  for (int32_t j = indptr[r]; j < indptr[r + 1]; ++j)
@@ -84,6 +104,8 @@
// =============================================================================
// Test data loaders
// =============================================================================

+// The binary format starts with a uint32 row/column header followed by
+// row-major int32 payload data.
bool load_binary_file(const std::string &path, uint32_t &out_rows,
                      uint32_t &out_cols, std::vector<int32_t> &data) {
  std::ifstream f(path, std::ios::binary);
@@ -97,6 +119,8 @@
  return f.good();
}

+// Return a default-constructed TestData on any load or consistency failure so
+// callers can use TestData::loaded() as the success check.
TestData load_test_data(const std::string &data_dir) {
  TestData td;
  std::string det_path = data_dir + "/detectors.bin";
@@ -126,6 +150,7 @@
  return td;
}

+// The file layout is [nrows, ncols, nnz, indptr..., indices...].
bool load_csr(const std::string &path, SparseCSR &out) {
  std::ifstream f(path, std::ios::binary);
  if (!f.good())
@@ -142,6 +167,8 @@
  return f.good();
}

+// Missing optional files leave the corresponding members empty while still
+// allowing the benchmark to proceed with the available data.
StimData load_stim_data(const std::string &data_dir) { StimData sd; diff --git a/libs/qec/unittests/realtime/predecoder_pipeline_common.h b/libs/qec/unittests/realtime/predecoder_pipeline_common.h index 34a65a38..6044ea75 100644 --- a/libs/qec/unittests/realtime/predecoder_pipeline_common.h +++ b/libs/qec/unittests/realtime/predecoder_pipeline_common.h @@ -11,7 +11,9 @@ /// /// Used by both the software-only benchmark /// (test_realtime_predecoder_w_pymatching.cpp) and the FPGA bridge -/// (hololink_predecoder_bridge.cpp). +/// (hololink_predecoder_bridge.cpp). These helpers are example and test support +/// code rather than part of the stable library API, but documenting them keeps +/// the benchmark and bridge configuration visible from the generated docs. #pragma once @@ -62,21 +64,41 @@ namespace rt_pipeline = cudaq::qec::realtime::experimental; // Pipeline Configuration // ============================================================================= +/// @brief Maximum number of pipeline ring buffer slots. +/// @details Shared by the benchmark and bridge so both data sources use the +/// same ring depth assumptions when staging requests into the realtime +/// pipeline. constexpr size_t NUM_SLOTS = 16; +/// @brief Named configuration for a predecoder pipeline instance. +/// +/// Each factory method returns a preset combining QEC code parameters +/// (distance, rounds) with an ONNX model filename and thread/worker counts +/// tuned for that model size. struct PipelineConfig { + /// @brief Human-readable label (e.g. "d13_r104_X"). std::string label; + /// @brief QEC surface code distance. int distance; + /// @brief Number of QEC syndrome measurement rounds. int num_rounds; + /// @brief ONNX model filename (looked up under ONNX_MODEL_DIR). std::string onnx_filename; + /// @brief Number of parallel TensorRT predecoder instances. int num_predecoders; + /// @brief Number of pipeline GPU worker threads. 
int num_workers; + /// @brief Number of PyMatching decode worker threads. int num_decode_workers; + /// @brief Full path to the ONNX model file. + /// @return Concatenation of ONNX_MODEL_DIR and @p onnx_filename. std::string onnx_path() const { return std::string(ONNX_MODEL_DIR) + "/" + onnx_filename; } + /// @brief Full path to the cached TensorRT engine file. + /// @return Same as onnx_path() but with a .engine extension. std::string engine_path() const { std::string name = onnx_filename; auto dot = name.rfind('.'); @@ -85,31 +107,47 @@ struct PipelineConfig { return std::string(ONNX_MODEL_DIR) + "/" + name + ".engine"; } + /// @brief Distance-7, 7-round Z-basis config. static PipelineConfig d7_r7() { return {"d7_r7_Z", 7, 7, "model1_d7_r7_unified_Z_batch1.onnx", 16, 16, 32}; } + /// @brief Distance-13, 13-round X-basis config. static PipelineConfig d13_r13() { return {"d13_r13_X", 13, 13, "predecoder_memory_d13_T13_X.onnx", 16, 16, 32}; } + /// @brief Distance-13, 104-round X-basis config. static PipelineConfig d13_r104() { return { "d13_r104_X", 13, 104, "predecoder_memory_d13_T104_X.onnx", 8, 8, 16}; } + /// @brief Distance-21, 21-round Z-basis config. static PipelineConfig d21_r21() { return {"d21_r21_Z", 21, 21, "model1_d21_r21_unified_X_batch1.onnx", 16, 16, 32}; } + /// @brief Distance-21, 42-round X-basis config. + static PipelineConfig d21_r42() { + return {"d21_r42_X", 21, 42, "predecoder_memory_model_1_d21_T42_X.onnx", + 8, 8, 16}; + } + + /// @brief Distance-31, 31-round Z-basis config. static PipelineConfig d31_r31() { return {"d31_r31_Z", 31, 31, "model1_d31_r31_unified_Z_batch1.onnx", 16, 16, 32}; } }; +/// @brief Round a value up to the next power of two. +/// @param v Input value (must be > 0). +/// @return Smallest power of two >= @p v. +/// @details Used when sizing buffers that must satisfy alignment or TensorRT +/// engine requirements. 
inline size_t round_up_pow2(size_t v) {
  v--;
  v |= v >> 1;
@@ -125,28 +163,51 @@
// Decoder Context (PyMatching worker pool)
// =============================================================================

+/// @brief Shared state for a pool of PyMatching MWPM decoder instances.
+///
+/// Each decode worker thread acquires its own decoder via acquire_decoder()
+/// and accumulates timing and syndrome density statistics atomically. The
+/// context also carries shape information needed to interpret the predecoder
+/// residual syndrome output.
struct DecoderContext {
+  /// @brief Pool of decoder instances (one per decode worker thread).
  std::vector<std::unique_ptr<cudaq::qec::decoder>> decoders;
+  /// @brief Atomic counter for round-robin decoder assignment.
  std::atomic<int> next_decoder_idx{0};
+  /// @brief Number of Z stabilizers (for spatial slice decoding).
  int z_stabilizers = 0;
+  /// @brief Number of spatial slices for per-slice decoding.
  int spatial_slices = 0;
+  /// @brief Number of residual detectors in the predecoder output.
  int num_residual_detectors = 0;
+  /// @brief If true, use the full H matrix for decoding (not per-slice).
  bool use_full_H = false;

+  /// @brief Acquire a thread-local decoder from the pool.
+  /// @return Pointer to the decoder assigned to the calling thread.
+  /// @details The first call on each thread claims a stable decoder index via
+  /// @c next_decoder_idx so repeated calls from that thread reuse the same
+  /// decoder instance.
  cudaq::qec::decoder *acquire_decoder() {
    thread_local int my_idx =
        next_decoder_idx.fetch_add(1, std::memory_order_relaxed);
    return decoders[my_idx % decoders.size()].get();
  }

+  /// @brief Cumulative PyMatching decode time in microseconds.
  std::atomic<uint64_t> total_decode_us{0};
+  /// @brief Cumulative total worker time in microseconds.
  std::atomic<uint64_t> total_worker_us{0};
+  /// @brief Number of decode operations completed (for averaging).
  std::atomic<uint64_t> decode_count{0};

+  /// @brief Number of input detectors (before predecoder).
  int num_input_detectors = 0;
+  /// @brief Cumulative nonzero count in input syndromes.
  std::atomic<uint64_t> total_input_nonzero{0};
+  /// @brief Cumulative nonzero count in output (residual) syndromes.
  std::atomic<uint64_t> total_output_nonzero{0};
};

@@ -154,14 +215,29 @@
// Pre-launch DMA copy callback
// =============================================================================

+/// @brief Context for the pre-launch callback that copies ring buffer data
+/// into the TensorRT input buffer before graph launch.
struct PreLaunchCopyCtx {
+  /// @brief Device pointer to the TensorRT input tensor.
  void *d_trt_input;
+  /// @brief Size of the input tensor in bytes.
  size_t input_size;
+  /// @brief Host mailbox bank written by the dispatcher callback.
+  /// @details The current slot device pointer is published here so the CPU
+  /// polling path can recover the originating ring slot after TensorRT
+  /// inference completes.
  void **h_ring_ptrs;
+  /// @brief Device-mapped base of the RX data ring.
  uint8_t *rx_data_dev_base;
+  /// @brief Host-mapped base of the RX data ring.
  uint8_t *rx_data_host_base;
};

+/// @brief Pre-launch callback: async-copies detector data from the ring buffer
+/// slot into the TensorRT input tensor.
+/// @param user_data Pointer to a PreLaunchCopyCtx.
+/// @param slot_dev Device pointer to the ring buffer slot.
+/// @param stream CUDA stream for the async copy.
void pre_launch_input_copy(void *user_data, void *slot_dev,
                           cudaStream_t stream);

@@ -169,18 +245,36 @@
// Worker context
// =============================================================================

+/// @brief Per-worker context passed through gpu_worker_resources::user_context.
+/// @details Bridges the GPU predecoder worker with the downstream PyMatching
+/// decode pool and correctness-tracking arrays used by the benchmark.
struct WorkerCtx { + /// @brief Pointer to the AI predecoder service for this worker. ai_predecoder_service *predecoder; + /// @brief Shared decoder context (PyMatching pool and statistics). DecoderContext *decoder_ctx; + /// @brief Array to store per-request total correction counts. + /// Indexed by request_id when correctness checking is enabled. int32_t *decode_corrections = nullptr; + /// @brief Array to store per-request logical prediction parity. + /// Indexed by request_id when correctness checking is enabled. int32_t *decode_logical_pred = nullptr; + /// @brief Maximum number of requests to track. int max_requests = 0; + /// @brief Observable row from the O matrix for logical projection. const uint8_t *obs_row = nullptr; + /// @brief Length of the observable row. size_t obs_row_size = 0; }; +/// @brief Packed RPC response containing decode results. +/// @details This payload is written directly into the pipeline response buffer +/// after the RPC header, so the struct remains packed to match the transport +/// framing exactly. struct __attribute__((packed)) DecodeResponse { + /// @brief Total number of corrections applied (predecoder + PyMatching). int32_t total_corrections; + /// @brief Whether the PyMatching decoder converged (1 = yes). int32_t converged; }; @@ -188,16 +282,33 @@ struct __attribute__((packed)) DecodeResponse { // PyMatching work queue // ============================================================================= +/// @brief A single PyMatching decode job queued from the CPU stage. struct PyMatchJob { + /// @brief Pipeline slot that originated this job. int origin_slot; + /// @brief RPC request ID from the header. uint64_t request_id; + /// @brief Pointer to the ring buffer data for this slot. void *ring_buffer_ptr; }; +/// @brief Thread-safe queue for dispatching PyMatching decode jobs. 
+/// @details The queue connects the pipeline CPU stage, which only harvests
+/// completed predecoder outputs, to a separate pool of PyMatching workers that
+/// can finish decoding asynchronously and later call
+/// @ref cudaq::qec::realtime::experimental::realtime_pipeline::complete_deferred.
class PyMatchQueue {
public:
+  /// @brief Enqueue a job and wake one waiting worker.
+  /// @param j The job to enqueue (moved).
  void push(PyMatchJob &&j);
+
+  /// @brief Dequeue a job, blocking until one is available or shutdown.
+  /// @param out Output: the dequeued job.
+  /// @return True if a job was dequeued, false if the queue is shut down.
  bool pop(PyMatchJob &out);
+
+  /// @brief Signal all waiting workers to drain and exit.
  void shutdown();

private:
@@ -211,20 +322,39 @@
// Test data (pre-generated from Stim, or random)
// =============================================================================

+/// @brief Container for pre-generated syndrome test data.
+///
+/// Loaded from binary files (detectors.bin, observables.bin) and accessed
+/// by sample index with automatic wraparound. This lets the benchmark replay a
+/// fixed corpus indefinitely at a target request rate.
struct TestData {
+  /// @brief Flattened detector samples (num_samples * num_detectors).
  std::vector<int32_t> detectors;
+  /// @brief Flattened observable labels (num_samples * num_observables).
  std::vector<int32_t> observables;
+  /// @brief Number of syndrome samples in the dataset.
  uint32_t num_samples = 0;
+  /// @brief Number of detectors per sample.
  uint32_t num_detectors = 0;
+  /// @brief Number of observables per sample.
  uint32_t num_observables = 0;

+  /// @brief Check whether data was successfully loaded.
+  /// @return True if samples and detectors are present.
  bool loaded() const { return num_samples > 0 && num_detectors > 0; }

+  /// @brief Return a pointer to the detector array for a given sample.
+  /// @param idx Sample index (wraps modulo num_samples).
+  /// @return Pointer to num_detectors int32 values.
  const int32_t *sample(int idx) const {
    return detectors.data() +
           (static_cast<size_t>(idx % num_samples) * num_detectors);
  }

+  /// @brief Return a single observable value for a given sample.
+  /// @param idx Sample index (wraps modulo num_samples).
+  /// @param obs Observable index within the sample.
+  /// @return The observable value (0 or 1).
  int32_t observable(int idx, int obs = 0) const {
    return observables[static_cast<size_t>(idx % num_samples) *
                           num_observables +
@@ -232,31 +362,74 @@
  }
};

+/// @brief Load a binary file with a (rows, cols) header and int32 data.
+/// @param path File path.
+/// @param out_rows Output: number of rows.
+/// @param out_cols Output: number of columns.
+/// @param data Output: flattened row-major data.
+/// @return True on success.
bool load_binary_file(const std::string &path, uint32_t &out_rows,
                      uint32_t &out_cols, std::vector<int32_t> &data);

+/// @brief Load test data (detectors + observables) from a directory.
+/// @param data_dir Directory containing detectors.bin and observables.bin.
+/// @return Populated TestData (check loaded() for success).
TestData load_test_data(const std::string &data_dir);

// =============================================================================
// Stim-derived parity check matrix loader (CSR sparse -> dense tensor)
// =============================================================================

+/// @brief Sparse matrix in Compressed Sparse Row (CSR) format.
+///
+/// Used to store parity check matrices (H) and observable matrices (O)
+/// loaded from binary files before converting them into the dense tensor form
+/// expected by the decoder construction helpers.
struct SparseCSR {
-  uint32_t nrows = 0, ncols = 0, nnz = 0;
+  /// @brief Number of rows.
+  uint32_t nrows = 0;
+  /// @brief Number of columns.
+  uint32_t ncols = 0;
+  /// @brief Number of nonzero entries.
+  uint32_t nnz = 0;
+  /// @brief Row pointer array (size nrows+1).
  std::vector<int32_t> indptr;
+  /// @brief Column index array (size nnz).
  std::vector<int32_t> indices;

+  /// @brief Check whether the matrix was loaded.
+  /// @return True if dimensions are nonzero.
  bool loaded() const { return nrows > 0 && ncols > 0; }

+  /// @brief Convert to a dense uint8 tensor.
+  /// @return Dense tensor of shape [nrows, ncols] with 0/1 entries.
  cudaqx::tensor<uint8_t> to_dense() const;
+
+  /// @brief Extract a single row as a dense vector.
+  /// @param r Row index.
+  /// @return Dense vector of length ncols with 0/1 entries.
  std::vector<uint8_t> row_dense(uint32_t r) const;
};

+/// @brief Collection of Stim-derived data for configuring PyMatching.
+/// @details Bundles the parity check matrix, observable projection matrix, and
+/// optional edge priors emitted by the offline Stim export step.
struct StimData {
+  /// @brief Parity check matrix (H) in CSR format.
  SparseCSR H;
+  /// @brief Observables matrix (O) in CSR format.
  SparseCSR O;
+  /// @brief Per-edge error probabilities.
  std::vector<double> priors;
};

+/// @brief Load a sparse CSR matrix from a binary file.
+/// @param path File path (expects nrows, ncols, nnz header + indptr + indices).
+/// @param out Output SparseCSR struct.
+/// @return True on success.
bool load_csr(const std::string &path, SparseCSR &out);
+
+/// @brief Load Stim-derived data (H, O, priors) from a directory.
+/// @param data_dir Directory containing H_csr.bin, O_csr.bin, priors.bin.
+/// @return Populated StimData (check H.loaded() for success).
StimData load_stim_data(const std::string &data_dir);