diff --git a/Cargo.lock b/Cargo.lock index 3ab1b68..1824e8f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -281,6 +281,17 @@ version = "0.8.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b" +[[package]] +name = "core_affinity" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a034b3a7b624016c6e13f5df875747cc25f884156aad2abd12b6c46797971342" +dependencies = [ + "libc", + "num_cpus", + "winapi", +] + [[package]] name = "crc32fast" version = "1.5.0" @@ -353,16 +364,6 @@ dependencies = [ "crossbeam-utils", ] -[[package]] -name = "crossbeam-skiplist" -version = "0.1.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "df29de440c58ca2cc6e587ec3d22347551a32435fbde9d2bff64e78a9ffa151b" -dependencies = [ - "crossbeam-epoch", - "crossbeam-utils", -] - [[package]] name = "crossbeam-utils" version = "0.8.21" @@ -486,6 +487,15 @@ dependencies = [ "miniz_oxide", ] +[[package]] +name = "fxhash" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c31b6d751ae2c7f11320402d34e41349dd1016f8d5d45e48c4312bc8625af50c" +dependencies = [ + "byteorder", +] + [[package]] name = "getrandom" version = "0.3.4" @@ -723,6 +733,16 @@ dependencies = [ "autocfg", ] +[[package]] +name = "num_cpus" +version = "1.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "91df4bbde75afed763b708b7eee1e8e7651e02d97f6d5dd763e89367e957b23b" +dependencies = [ + "hermit-abi", + "libc", +] + [[package]] name = "num_enum" version = "0.7.5" @@ -940,9 +960,10 @@ dependencies = [ "assert_no_alloc", "bytemuck", "clap", + "core_affinity", "criterion", - "crossbeam-skiplist", "dbn", + "fxhash", "hdrhistogram", "memmap2", "spdlog-rs", diff --git a/Cargo.toml b/Cargo.toml index f322edb..b1c85dc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,10 +8,11 @@ authors = ["Your Name"] [dependencies] bytemuck = { version = "1.25.0", features = ["derive"] } memmap2 = "0.9.9" -crossbeam-skiplist = "0.1" clap = { version = "4.5.57", features = ["derive"] } hdrhistogram = "7.5" spdlog-rs = "0.5.2" +core_affinity = "0.8.1" +fxhash = "0.2.1" [dev-dependencies] assert_no_alloc = { version = "1.1.2" } diff --git a/DESIGN.md b/DESIGN.md index dd8913f..5870f53 100644 --- a/DESIGN.md +++ b/DESIGN.md @@ -21,17 +21,19 @@ with hardware realities. ## 2. System Architecture -The system follows a **Shared-Nothing** architecture for logic (workers don't share state directly), but a * -*Shared-Memory** architecture for data. +The system follows a **Shared-Nothing** architecture for logic (workers don't share state directly), but a **Shared-Memory** architecture for data. ### 2.1 The Engine (Orchestrator) -The `RodaEngine` acts as the system's bootloader, managing resources and thread lifecycle. +Roda provides two levels of orchestration: + +1. **RodaEngine:** The low-level bootloader. It manages thread lifecycles and provides the factory for creating `JournalStore` and `SlotStore`. +2. **StageEngine:** A high-level, type-safe pipeline builder. It chains multiple processing stages, automatically managing the intermediate `JournalStore` buffers and spawning worker threads for each stage. **Core Responsibilities:** * **Memory Management:** Allocates large, contiguous memory blocks via `mmap` and initializes shared structures (ring buffers, headers). -* **Thread Orchestration:** Spawns long-lived worker threads, optionally pinning them to specific CPU cores (`isolcpus`) for deterministic execution. +* **Thread Orchestration:** Spawns long-lived worker threads, optionally pinning them to specific CPU cores for deterministic execution. **Worker Execution Model:** Workers execute user pipelines in a continuous loop using an **Adaptive Backoff Strategy** to balance latency and efficiency: @@ -40,63 +42,85 @@ Workers execute user pipelines in a continuous loop using an **Adaptive Backoff 2. **CPU Relax (Warm Path):** After empty cycles, emits `PAUSE` instructions (`std::hint::spin_loop`) to reduce power usage. 3. **Park/Sleep (Cold Path):** After extended inactivity, yields the thread to the OS scheduler to save resources until new data arrives. -### 2.2 The Store (The Source of Truth) +### 2.2 The Stores (Source of Truth) + +Roda uses two primary storage types, both backed by memory-mapped files: + +* **JournalStore:** A fixed-capacity, append-only buffer (a "Journal"). Ideal for event streams, logs, and time-series data. +* **SlotStore:** A fixed-capacity buffer where items are accessed and updated by their index (or "slot"). Ideal for shared state maps, lookup tables, and order books. -The `StoreJournal` is a fixed-capacity append-only buffer backed by memory-mapped files. +**Characteristics:** * **Memory Layout:** `[ Header (Atomics) | Data Region (T...) | Padding ]`. * **Write Model:** **Single Writer**. Only one thread (the owner of the `Store` handle) can write, eliminating write-side contention. -* **Read Model:** **Multiple Readers**. Each reader (or worker) uses an independent `StoreJournalReader` handle - that maintains its own +* **Read Model:** **Multiple Readers**. Each reader uses an independent `StoreReader` handle that maintains its own state (cursor). -* **Addressing:** Data is addressed by a monotonic `u64` sequence number (`Cursor`). The physical address is - `Cursor * sizeof(T)`. -* **Full Buffer Policy:** If the store is full, it will panic on the next `push`. No wrapping or overwriting occurs. +* **Addressing:** Data in journals is addressed by a monotonic `u64` sequence number. In slot stores, it is addressed by a direct `usize` index. +* **Full Buffer Policy:** If the store is full, it will panic on the next `push`/`append`. No wrapping or overwriting occurs. -### 2.3 StoreReader & Traits +### 2.3 Store Traits & Readers -Roda uses traits to define the behavior of stores and readers, allowing for different implementations (like the default -`StoreJournal`). +Roda uses traits to define the behavior of stores and readers, allowing for different implementations. -* **Store Trait:** Defines `push`, `reader`, and `direct_index`. -* **StoreReader Trait:** Defines `next`, `with`, `with_at`, `with_last`, `get`, `get_at`, `get_last`, and `get_window`. -* **Explicit Advancement:** Each `StoreReader` maintains its own `LocalCursor`. - The cursor is moved next everytime `next()` is called. So inside a worker for all used store readers `next()` function - must be - called. +* **Appendable Trait:** Defines `append` (for `JournalStore`). +* **Settable Trait:** Defines `set` (for `SlotStore`). +* **IterativeReadable Trait:** Defines `next`, `get`, and `get_index` for cursor-based reading. +* **Explicit Advancement:** Each reader maintains its own `LocalCursor`. + The cursor is moved next everytime `next()` is called. * **Synchronicity by Design:** Each worker is designed to process a single unit of work in each cycle. Explicit `next()` - calls give the developer control over when data is consumed relative to other operations (like indexing). + calls give the developer control over when data is consumed. If there are no more data to read, the cursor will simply stay at the end of the store. No need to handle any special case. +### 2.4 Data Transfer: The Producer-Consumer Loop + +Data is transferred between stages without copying or message passing. Instead, Roda uses a **Shared-Memory Producer-Consumer** pattern: + +1. **Shared Memory (mmap):** A `JournalStore` allocates a contiguous memory region. +2. **Atomic Write Index:** A shared counter (Atomic) that tracks the end of valid data in the store. +3. **Local Read Index:** A private counter maintained by each `StoreReader` (consumer) to track its own progress. + +#### Transfer Flow + +Architecture Image + +#### Step-by-Step Mechanism: +1. **Stage N (Producer)** appends data to the `Free Slot` (at the current `Write Index`). +2. The Producer performs an **Atomic Store** with **Release** semantics on the `Write Index`. This ensures that all previous memory writes (the data) are visible to any thread that subsequently loads the index with **Acquire** semantics. +3. **Stage N+1 (Consumer)** polls the `Write Index` using **Atomic Load** with **Acquire** semantics. +4. The Consumer compares the `Write Index` with its private `Local Read Index`. +5. If `Write Index > Local Read Index`, new data is available. The Consumer reads the data directly from the `Data Region` (Zero-Copy) at its `Local Read Index`. +6. The Consumer increments its `Local Read Index`. + --- -### 3. The Index (O(1) Access) +### 3. The SlotStore & Indexing -The `DirectIndex` is a derivative structure that maps a `Key` to a `Cursor` in a `Store`. +While `JournalStore` provides a chronological record, `SlotStore` allows for O(1) random access to state by "slots". -* **Storage:** Also backed by `mmap`. -* **Manual Update:** The index is **not** automatically updated when the store is written. The developer must explicitly - call the `compute` method (typically inside a worker) to index new data. -* **Consistency:** The developer controls when the index is updated relative to other operations. -* **Safety:** A reader might see data before it is indexed, but will never see an index entry pointing to invalid or - uninitialized data. +* **Storage:** Backed by `mmap`, similar to journals. +* **Usage:** Can be used to maintain the "current state" of various entities (e.g., current price of 10,000 different symbols). +* **Consistency:** The developer controls when a slot is updated. Readers use snapshot-based retry logic to ensure they see a consistent version of the data without using locks. --- -## 4. Pipeline Primitives +## 4. Pipeline Primitives (Stages & Pipes) + +Roda enables **Declarative Multistage Pipelines** by chaining `Pipe` components into `Stages`. + +* **Stage:** A unit of execution that runs in a dedicated thread. It consumes data from one `JournalStore` and appends results to the next one in the chain. +* **Pipe:** A composable processing logic that can be chained within a single stage using the `pipe!` macro. + +**Available Components:** -Roda enables **Declarative Pipelines** by chaining these primitives using a builder pattern: +* **Stateful:** Implements partitioned reduction. It maintains a `HashMap` of state keyed by a user-defined function. +* **Delta:** Compares the current incoming item with the previous one for the same key. Useful for anomaly detection or calculating rates of change. +* **DedupBy:** Filters out redundant items if the calculated key matches the last seen key for that partition. +* **Map/Filter/Inspect:** Standard functional primitives for transformation, filtering, and side-effects. -* **Aggregator:** Maps `Input -> Key -> Output`. Used for partitioned reduction (e.g., Ticks to Candles). State is - sharded by Key. - * Pattern: `Aggregator::new().from(&reader).to(&mut store).partition_by(...).reduce(...)` -* **Window:** Maps `Input -> Slice -> Option`. Provides a zero-copy "Lookback" mechanism (e.g., Moving - Averages over the - last $N$ elements). - * Pattern: `Window::new().from(&reader).to(&mut store).reduce(size, ...)` -* **Join:** Aligns two independent streams by a common attribute (e.g., Timestamp). +**Zero-Copy Composition:** +The `pipe!` macro chains components such that they execute sequentially within the same worker loop, minimizing overhead while maintaining a clear, declarative structure. --- @@ -114,9 +138,14 @@ To guarantee performance and zero-copy safety, Roda imposes several constraints: ## 6. Implementation Notes: The "Magic" of Atomics -Synchronization is achieved without locks using `Acquire/Release` semantics: +Synchronization is achieved without locks using `Acquire/Release` semantics to coordinate between producers and consumers: -* **Writer:** `buffer[cursor] = data; cursor.store(new_val, Release);` -* **Reader:** `while cursor.load(Acquire) > local_cursor { process(); local_cursor++; }` +* **Producer (Writer):** + 1. Write data to the buffer. + 2. `write_index.store(new_val, Release);` +* **Consumer (Reader):** + 1. `while write_index.load(Acquire) > local_read_index { ... }` + 2. Process data. + 3. `local_read_index += 1;` -This ensures that when the reader sees the updated cursor, it is guaranteed to see the data written by the writer. \ No newline at end of file +This ensures that when the reader sees the updated `write_index`, the hardware and compiler guarantees that it also sees the corresponding data written by the producer. \ No newline at end of file diff --git a/README.md b/README.md index 8b3520e..d80d31f 100644 --- a/README.md +++ b/README.md @@ -1,251 +1,169 @@ # Roda -Ultra-high-performance, low-latency state computer for real-time analytics and event-driven systems. Roda lets you build -deterministic streaming pipelines with cache-friendly dataflows, wait-free reads, and explicit memory bounds—ideal for -IoT, telemetry, industrial automation, and any workload where microseconds matter. +Ultra-high-performance, low-latency state computer for real-time analytics and event-driven systems. Roda lets you build deterministic streaming pipelines with cache-friendly dataflows, wait-free reads, and explicit memory bounds—ideal for IoT, telemetry, industrial automation, and any workload where microseconds matter. -> Status: Early design and API preview. Examples and tests illustrate the intended DX. Expect rapid iteration and -> breaking changes. +> **Status:** Early design and API preview. Examples and tests illustrate the intended DX. Expect rapid iteration and breaking changes. --- -## Why Roda? +## Example: From Sensor Readings to Alerts -- Deterministic performance: Explicit store sizes, preallocated buffers, back-pressure free write path by design goals. -- Low latency by construction: Reader APIs are designed for zero/constant allocations and predictable access patterns. -- Declarative pipelines: Express processing in terms of partitions, reductions, and sliding windows. -- Indexable state: Build direct indexes for O(1) lookups into rolling state. -- Simple concurrency model: Long-lived workers with single-writer/multi-reader patterns. +```rust +use roda_state::{StageEngine, pipe, stateful, delta}; +use bytemuck::{Pod, Zeroable}; -## Core Concepts +#[repr(C)] +#[derive(Clone, Copy, Default, Pod, Zeroable)] +struct Reading { sensor_id: u64, value: f64, timestamp: u64 } + +#[repr(C)] +#[derive(Clone, Copy, Default, Pod, Zeroable)] +struct Summary { sensor_id: u64, avg: f64, count: u64 } + +#[repr(C)] +#[derive(Clone, Copy, Default, Pod, Zeroable, Debug)] +struct Alert { sensor_id: u64, severity: i32 } + +fn main() { + // 1. Build a multistage pipeline + let engine = StageEngine::::with_capacity(1_000_000) + .add_stage(pipe![ + stateful( + |r| r.sensor_id, + |r| Summary { sensor_id: r.sensor_id, avg: r.value, count: 1 }, + |s, r| { + s.avg = (s.avg * s.count as f64 + r.value) / (s.count + 1) as f64; + s.count += 1; + } + ) + ]) + .add_stage(pipe![ + delta( + |s: &Summary| s.sensor_id, + |curr, prev| { + if let Some(p) = prev && curr.avg > p.avg * 1.5 { + return Some(Alert { sensor_id: curr.sensor_id, severity: 1 }); + } + None + } + ) + ]); -- **Engine:** Orchestrates workers (long-lived tasks) that advance your pipelines. -- **Store:** A bounded, cache-friendly append-only buffer that holds your state. You choose the capacity up front. - - `push(value)`: Append a new item (typically by a single writer thread). - - `reader()`: Returns a `StoreReader` view appropriate for consumers. - - `direct_index()`: Build a secondary index over the store. -- **StoreReader:** A cursor-based handle for consuming state from a `Store`. - - `next()`: Advance the cursor to the next available item. - - `get()`, `get_at(at)`, `get_last()`: Retrieve a copy of the state. - - `get_window::(at)`: Retrieve a fixed-size window of state. - - `with(|state| ...)`, `with_at(at, |state| ...)`, `with_last(|state| ...)`: Execute a closure with a borrowed reference. -- **Aggregator:** A partitioned reducer for turning event streams into rolling state. - - `from(&reader)`: Set the input source. - - `to(&mut store)`: Set the output target. - - `partition_by(|in| Key)`: Assign each input to a partition. - - `reduce(|idx, in, out| ...)`: Merge an input into the current output for its partition; `idx` is 0-based within the partition window. -- **Window:** A fixed-size sliding window over the input store. - - `from(&reader)`: Set the input source. - - `to(&mut store)`: Set the output target. - - `reduce(window_size, |window: &[In]| -> Option)`: Compute optional output when the window is advanced. -- **DirectIndex:** Build and query secondary indexes over a store for O(1) state lookups. - - `compute(|value| Key)`: Manually update the index for the next available item in the store (typically called inside a worker). + // 2. Ingest data + engine.send(&Reading { sensor_id: 1, value: 10.0, timestamp: 1 }); + engine.send(&Reading { sensor_id: 1, value: 20.0, timestamp: 2 }); + + // 3. Receive processed alerts + std::thread::sleep(std::time::Duration::from_millis(10)); + while let Some(alert) = engine.try_receive() { + println!("{:?}", alert); + } +} +``` --- -For a deep dive into Roda's memory model, zero-copy internals, and execution patterns, see [DESIGN.md](DESIGN.md). +## Examples -- **Shared-Nothing Strategy:** While data is shared for efficiency, workers maintain independent logic and state to avoid contention. -- **Microsecond Precision:** Built specifically for systems where every microsecond of jitter impacts the bottom line. -- **Cache-Friendly:** Data layout is optimized for CPU cache lines, minimizing cache misses during pipeline execution. -- **Built-in Indexing:** O(1) secondary lookups without the overhead of a general-purpose database. +Explore more detailed implementations in the [examples](examples) folder: -## Architecture at a Glance +- [**Service Health Monitoring**](examples/service_health/README.md): Demonstrates noise filtering, stateful aggregation, and alert suppression. +- [**Real-Time Sensor Data**](examples/sensor_test/README.md): Showcases statistical windowing and end-to-end latency tracking. +- [**High-Performance MBO Replay**](examples/databento_replay/README.md): A production-ready market data replay and alpha generation system with CPU pinning and zero-allocation hot paths. -Roda is designed as a **Shared-Memory, Single-Writer Multi-Reader (SWMR)** system: -- **Zero-Copy:** Data stays in memory-mapped stores; consumers receive borrowed views. -- **Lock-Free:** Coordination happens via Atomic Sequence Counters with Acquire/Release semantics. -- **Deterministic:** Memory is pre-allocated; no allocations on the hot path. -- **Declarative:** Pipelines are built by connecting `Store`, `Aggregator`, and `Window` primitives. +--- -## Features +## Why Roda? -- **Blazing Fast:** Designed for microsecond-level latency using memory-mapped buffers. -- **Zero-Copy:** Data is borrowed directly from shared memory regions; no unnecessary allocations on the hot path. -- **Lock-Free:** Single-Writer Multi-Reader (SWMR) pattern with atomic coordination. -- **Deterministic:** Explicit memory management and pre-allocated stores prevent GC pauses or unexpected heap allocations. -- **Declarative API:** Build complex data processing pipelines using `Aggregator`, `Window`, and `Index` primitives. +- **Deterministic performance:** Explicit store sizes, preallocated buffers, back-pressure free write path. +- **Low latency by construction:** Reader APIs are designed for zero/constant allocations and predictable access patterns. +- **Multistage Pipelines:** Orchestrate processing stages in dedicated threads with `StageEngine`. +- **Declarative Composition:** Build complex logic using the `pipe!` macro and reusable components. +- **Simple Concurrency:** Single-writer/multi-reader patterns with lock-free coordination. -## Quick Start +--- -Add `roda-state` to your `Cargo.toml`: +## Performance: Why it is so fast? -```toml -[dependencies] -roda-state = "0.1" -``` +Roda is designed for microsecond-level latency by adhering to **Mechanical Sympathy** principles: -Or if you're working from this repository: +- **Static Dispatch:** Everything is resolved at compile time. The `pipe!` macro and generic stages eliminate virtual function calls (`dyn`), allowing the compiler to inline and optimize the entire data flow across component boundaries. +- **Non-blocking Pipelining via `mmap`:** Stages communicate through shared memory-mapped regions. Data written by one stage is immediately visible to the next without kernel-level context switches, syscalls, or expensive memory copies. +- **Single-Writer Multi-Reader (SWMR):** Only the **write index** is atomic and shared between threads. Each reader maintains its own **local read index**, eliminating write-side contention and minimizing cache coherence traffic across CPU cores. +- **Wait-Free Reads:** Readers poll the atomic write index using `Acquire/Release` memory ordering. They never block or wait for other readers or the writer, ensuring predictable, jitter-free processing even under heavy load. +- **Append-only Journal:** Data is stored in pre-allocated, contiguous buffers. This ensures linear memory access patterns, which are highly efficient for CPU prefetchers and maximize cache hit rates. +- **Zero-Copy Principles:** Data is never moved or copied between stages. Consumers receive borrowed views (`&T`) directly into the shared memory regions, eliminating allocation overhead and reducing memory bandwidth pressure. -```toml -[dependencies] -roda-state = { path = "." } -``` +--- -Run the example: +## Core API: The `pipe!` macro -```bash -cargo run --example sensor_test +The `pipe!` macro chains processing components into a single execution stage. Each component is executed sequentially for every incoming item. + +### `stateful` +Maintains per-key state for partitioned reduction or aggregation. +```rust +stateful( + |r| r.id, // Key selector: groups data by ID + |r| State::new(r), // Initializer: creates state for a new key + |s, r| s.update(r) // Mutator: updates existing state with new input +) ``` -## Example: From Sensor Readings to Summaries to Alerts +### `delta` +Compares the current incoming item with the previous one for the same key. Useful for anomaly detection or calculating rates of change. +```rust +delta( + |s| s.id, // Key selector + |curr, prev| { // Comparison logic: receives Current and Option + if let Some(p) = prev && curr.val > p.val * 2.0 { + return Some(Alert::new(curr)); + } + None + } +) +``` -Below is a trimmed version of `examples/sensor_test.rs` that demonstrates a two-stage pipeline: aggregate raw sensor readings into statistical summaries, then derive alerts when anomalies are detected via a sliding window. +### `map` & `filter` +Standard functional primitives for transformation and conditional dropping. +```rust +pipe![ + map(|x| x.value * 2), + filter(|x| *x > 100) +] +``` +### `dedup_by` +Filters out redundant items if the calculated key matches the last seen key for that partition. ```rust -use bytemuck::{Pod, Zeroable}; -use roda_state::components::{Engine, Index, Store, StoreOptions, StoreReader}; -use roda_state::{Aggregator, RodaEngine, Window}; -use std::thread; -use std::time::Duration; +dedup_by(|r| r.id) +``` -#[repr(C)] -#[derive(Clone, Copy, Default, Pod, Zeroable)] -struct Reading { - sensor_id: u64, - value: f64, - timestamp: u64, -} +--- -impl Reading { - fn from(sensor_id: u64, value: f64, timestamp: u64) -> Self { - Self { sensor_id, value, timestamp } - } -} +## Core Concepts +- **StageEngine:** The primary entry point for building pipelines. It manages a sequence of stages, each running in its own thread. +- **JournalStore:** A bounded, cache-friendly append-only buffer. Data stays in memory-mapped regions; consumers receive borrowed views. +- **SlotStore:** A bounded store for state that needs to be updated by "slots" or addresses, rather than appended. +- **Stage & Pipe:** + - **Stage:** A unit of execution (thread) that processes items from an input store to an output store. + - **Pipe:** Composable logic that can be chained within a single stage. -#[repr(C)] -#[derive(Clone, Copy, Default, Pod, Zeroable)] -struct Summary { - sensor_id: u64, - min: f64, - max: f64, - avg: f64, - count: u64, - timestamp: u64, -} +--- -#[repr(C)] -#[derive(Clone, Copy, Default, Pod, Zeroable)] -struct Alert { - sensor_id: u64, - timestamp: u64, - severity: i32, - _pad0: i32, -} +## Quick Start -#[derive(Clone, Copy, PartialEq, Eq, Hash, Pod, Zeroable)] -#[repr(C)] -struct SensorKey { - sensor_id: u64, - timestamp: u64, -} +Add `roda-state` to your `Cargo.toml`: -fn main() { - let engine = RodaEngine::new(); - - // 1. Allocate bounded stores - let mut reading_store = engine.store::(StoreOptions { - name: "readings", - size: 1_000_000, - in_memory: true, - }); - let reading_reader = reading_store.reader(); - - let mut summary_store = engine.store::(StoreOptions { - name: "summaries", - size: 10_000, - in_memory: true, - }); - let summary_reader = summary_store.reader(); - - let mut alert_store = engine.store::(StoreOptions { - name: "alerts", - size: 10_000, - in_memory: true, - }); - let alert_reader_for_print = alert_store.reader(); - - let summary_index = summary_store.direct_index::(); - - // 2. Declare pipelines - let summary_pipeline: Aggregator = Aggregator::new(); - let alert_pipeline: Window = Window::new(); - - // 3. Worker 1: aggregate readings -> summaries and maintain index - engine.run_worker(move || { - reading_reader.next(); - summary_pipeline - .from(&reading_reader) - .to(&mut summary_store) - .partition_by(|r| SensorKey { - sensor_id: r.sensor_id, - timestamp: r.timestamp / 100_000 - }) - .reduce(|i, r, s| { - if i == 0 { - *s = Summary { - sensor_id: r.sensor_id, - min: r.value, max: r.value, avg: r.value, count: 1, - timestamp: (r.timestamp / 100_000) * 100_000, - }; - } else { - s.min = s.min.min(r.value); - s.max = s.max.max(r.value); - s.avg = (s.avg * s.count as f64 + r.value) / (s.count + 1) as f64; - s.count += 1; - } - }); - - summary_index.compute(|s| SensorKey { - sensor_id: s.sensor_id, - timestamp: s.timestamp / 100_000 - }); - }); - - // 4. Worker 2: alert on average jumps - engine.run_worker(move || { - summary_reader.next(); - alert_pipeline - .from(&summary_reader) - .to(&mut alert_store) - .reduce(2, |w| { - let (prev, cur) = (w[0], w[1]); - (cur.avg > prev.avg * 1.5).then(|| Alert { - sensor_id: cur.sensor_id, - timestamp: cur.timestamp, - severity: 1, - ..Default::default() - }) - }); - }); - - // 5. Data Ingestion - reading_store.push(Reading::from(1, 10.0, 10_000)); - reading_store.push(Reading::from(1, 12.0, 20_000)); - reading_store.push(Reading::from(1, 20.0, 110_000)); - reading_store.push(Reading::from(1, 22.0, 120_000)); - - thread::sleep(Duration::from_millis(100)); - - // 6. Print Results - while alert_reader_for_print.next() { - if let Some(a) = alert_reader_for_print.get() { - println!("{:?}", a); - } - } -} +```toml +[dependencies] +roda-state = "0.1" ``` -Explore the full example in `examples/sensor_test.rs` for more context. - -## Contributing - -Contributions are welcome! If you have ideas, issues, or benchmarks: +For a deep dive into Roda's memory model, zero-copy internals, and execution patterns, see [DESIGN.md](DESIGN.md). -- Open an issue to discuss the use-case and constraints -- Keep PRs focused and measured; include micro-benchmarks when changing hot paths -- Follow the existing code style and formatting +--- ## License diff --git a/architecture.png b/architecture.png new file mode 100644 index 0000000..f5acbb9 Binary files /dev/null and b/architecture.png differ diff --git a/benches/sensor_bench.rs b/benches/sensor_bench.rs index cb57c57..fbba5df 100644 --- a/benches/sensor_bench.rs +++ b/benches/sensor_bench.rs @@ -76,7 +76,7 @@ impl Summary { } #[inline(always)] - pub fn update(&mut self, r: Reading) { + pub fn update(&mut self, r: &Reading) { if r.value < self.min { self.min = r.value; } @@ -105,6 +105,7 @@ fn bench_sensor_pipeline(c: &mut Criterion) { let mut group = c.benchmark_group("sensor_pipeline"); group.sample_size(10); + group.throughput(criterion::Throughput::Elements(num_readings as u64)); group.measurement_time(Duration::from_secs(10)); group.bench_function("stage_engine", |b| { @@ -143,7 +144,7 @@ fn bench_sensor_pipeline(c: &mut Criterion) { let start = Instant::now(); for &r in &readings { - engine.send(r); + engine.send(&r); } engine.await_idle(Duration::from_secs(5)); total_duration += start.elapsed(); @@ -167,7 +168,7 @@ fn bench_sensor_pipeline(c: &mut Criterion) { let key = SensorKey::from_reading(&r); let summary = summaries.entry(key).or_insert_with(|| Summary::init(&r)); - summary.update(r); + summary.update(&r); let curr_summary = *summary; if let Some(prev) = last_summaries.get(&r.sensor_id) diff --git a/benches/store_bench.rs b/benches/store_bench.rs index f0ad6fe..455bc90 100644 --- a/benches/store_bench.rs +++ b/benches/store_bench.rs @@ -11,8 +11,7 @@ struct LargeState { } fn bench_push(c: &mut Criterion) { - let mut engine = RodaEngine::new(); - engine.enable_latency_stats(true); + let engine = RodaEngine::new(); let mut group = c.benchmark_group("append"); // 1GB buffer to ensure we don't overflow during benchmarking @@ -29,7 +28,7 @@ fn bench_push(c: &mut Criterion) { let mut val = 0u64; b.iter(|| { let _latency_guard = measurer.measure_with_guard(); - store_u64.append(black_box(val)); + store_u64.append(black_box(&val)); val += 1; }); }); @@ -46,7 +45,7 @@ fn bench_push(c: &mut Criterion) { let val = LargeState { data: [42; 16] }; b.iter(|| { let _latency_guard = measurer.measure_with_guard(); - store_large.append(black_box(val)); + store_large.append(black_box(&val)); }); }); println!("push_128b latency:{}", measurer.format_stats()); @@ -55,8 +54,7 @@ fn bench_push(c: &mut Criterion) { } fn bench_fetch(c: &mut Criterion) { - let mut engine = RodaEngine::new(); - engine.enable_latency_stats(true); + let engine = RodaEngine::new(); let mut group = c.benchmark_group("fetch"); let size = 1024 * 1024 * 100; // 100MB @@ -68,7 +66,7 @@ fn bench_fetch(c: &mut Criterion) { // Pre-fill some data for i in 0..10000 { - store.append(i as u64); + store.append(&(i as u64)); } let reader = store.reader(); @@ -97,7 +95,7 @@ fn bench_fetch(c: &mut Criterion) { in_memory: true, }); for _ in 0..10000 { - store_large.append(LargeState { data: [42; 16] }); + store_large.append(&LargeState { data: [42; 16] }); } let reader_large = store_large.reader(); @@ -125,8 +123,7 @@ fn bench_fetch(c: &mut Criterion) { } fn bench_window(c: &mut Criterion) { - let mut engine = RodaEngine::new(); - engine.enable_latency_stats(true); + let engine = RodaEngine::new(); let mut group = c.benchmark_group("window"); let size = 1024 * 1024 * 100; // 100MB @@ -138,7 +135,7 @@ fn bench_window(c: &mut Criterion) { // Pre-fill some data for i in 0..10000 { - store.append(i as u64); + store.append(&(i as u64)); } let reader = store.reader(); diff --git a/examples/databento_replay/README.md b/examples/databento_replay/README.md index b130ecc..96c3b8b 100644 --- a/examples/databento_replay/README.md +++ b/examples/databento_replay/README.md @@ -1,38 +1,68 @@ -# Liquidity Monitor +# High-Performance MBO Replay & Alpha Generation -This example demonstrates a market data replay system using the Roda engine. It processes raw Market-By-Order (MBO) data to perform real-time liquidity analysis. +This example demonstrates a production-ready, low-latency market data replay and alpha generation system built on the **Roda Engine**. It is designed to showcase the engineering standards required by top-tier HFT firms in Amsterdam (Optiver, Flow Traders, IMC). -## Overview +## Key Features -The "Liquidity Monitor" goes beyond simple price tracking. It focuses on three main objectives: +- **End-to-End Latency Observability**: Tracks "Tick-to-Signal" latency from the moment a record is read until the alpha signal is generated, using high-resolution `hdrhistogram`. +- **CPU Affinity & Pinning**: Automatically pins pipeline stage workers to dedicated physical cores to minimize OS scheduling jitter and cache misses. +- **Zero-Allocation Hot Path**: All data models are `Pod` (Plain Old Data) and cache-line aligned (`#[repr(align(64))]`) to prevent false sharing. Stages use `fxhash` for ultra-fast internal state management. +- **Accurate TTS Metrics**: Synchronized time measurement and warm-up stabilization ensure reported Tick-to-Signal latencies represent steady-state production performance. +- **SIMD-Friendly Signal Calculation**: Alpha signals (Weighted Order Book Imbalance) are calculated using vectorized loops that the compiler can easily optimize for SIMD instructions. +- **Real-Time Simulation**: Supports a `--simulate-live` mode to replay historical data at its original exchange-timestamp speed, allowing for realistic system testing. +- **High Throughput**: Capable of processing over **5M+ events per second (MEPS)** on a single core. -### 1. Reconstruct the Aggregate Book (Level 2) -Convert the raw stream of individual orders (MBO) into a consolidated map of **Price → Total Volume**. -* **Why useful?** This is what exchanges actually sell as "Level 2 Data." You are building it from scratch from the most granular data available. +## Pipeline Architecture -### 2. Calculate "Order Book Imbalance" -Measure the ratio of buy vs. sell pressure in the book. +The system uses a multi-stage threaded pipeline where data flows through wait-free journals. The full implementation can be found in [main.rs](main.rs). -**Formula:** -$$Imbalance = \frac{Bid\ Vol - Ask\ Vol}{Bid\ Vol + Ask\ Vol}$$ +```mermaid +graph LR + A[DBN File] -->|Decoding| B(Stage 1: Importer) + B -->|MBO Entry| C(Stage 2: Order Tracker) + C -->|MBO Delta| D(Stage 3: Price Aggregator) + D -->|Price Level| E(Stage 4: Alpha Gen) + E -->|Signal| F[Strategy/Log] -* **Why useful?** This is a primary signal for predicting short-term price movement. A positive value indicates buy pressure. + subgraph "Thread per Stage (CPU Pinned)" + C + D + E + end +``` -### 3. Detect "Liquidity Voids" -Monitor the book for sudden drops in available volume. -* **Condition:** If the total volume at the Top 5 levels drops by 50% in < 1ms, trigger an alert. -* **Why useful?** This predicts "Flash Crashes" and high-volatility events where price might slip significantly. +## Data Models -## Usage +1. **Normalization (`LightMboEntry`)**: Compact MBO record with `ts_recv` tagging. +2. **Order Tracking (`MboDelta`)**: Captures the change in volume at a specific price point. +3. **Aggregation (`BookLevelEntry`)**: Maintains total volume per price level. +4. **Book State (`BookLevelTop`)**: Top-5 price levels, maintained within the Alpha Gen stage. +5. **Signal (`ImbalanceSignal`)**: The final alpha output with end-to-end latency metadata. -To run the replay, provide the path to a Databento MBO file: +## Usage ```bash -cargo run --example databento_replay -- --file path/to/your/data.dbn +# High-speed backtest (Maximum throughput) +cargo run --release --example databento_replay -- --file path/to/data.dbn --pin-cores + +# Live simulation (Real-time speed) +cargo run --release --example databento_replay -- --file path/to/data.dbn --simulate-live ``` -## Architecture +## Performance Metrics -- `main.rs`: Sets up the Roda engine, market data store, and the processing pipeline. -- `importer.rs`: Handles reading and decoding the Databento MBO file. -- `light_mbo_entry.rs`: Defines the compact data structure for storing MBO records in the Roda store. +The engine reports: +- **MEPS**: Millions of Events Per Second processed. +- **P99.9 Latency**: Tail latency for both stage execution and end-to-end signal generation. +- **Throughput Stats**: Periodic logs showing the processing rate and average speed. + +### Benchmark Results + +On a typical performance-tuned environment (`--pin-cores`), the system achieves: + +```text +Final Imbalance Signals: 24,191,906 +Throughput: 7.40 MEPS (Million Events Per Second) +Execution Time: 3.27s +TTS Latency (Tick-to-Signal): p50=8.0us, p90=28.7us, p99=56.7us, p999=165.4us +``` diff --git a/examples/databento_replay/aggregation_stage.rs b/examples/databento_replay/aggregation_stage.rs index cd8d383..e547f11 100644 --- a/examples/databento_replay/aggregation_stage.rs +++ b/examples/databento_replay/aggregation_stage.rs @@ -1,48 +1,62 @@ use crate::book_level_entry::BookLevelEntry; -use crate::light_mbo_entry::LightMboEntry; +use crate::light_mbo_delta::MboDelta; +use fxhash::FxHashMap; use roda_state::{OutputCollector, Stage}; -use std::collections::HashMap; #[derive(Default)] pub struct AggregationStage { - book_volumes: HashMap<(u32, u8, i64), BookLevelEntry>, + book_volumes: FxHashMap<(u32, u8, i64), BookLevelEntry>, } -impl Stage for AggregationStage { - fn process(&mut self, entry: LightMboEntry, collector: &mut C) +impl Stage for AggregationStage { + #[inline(always)] + fn process(&mut self, delta: &MboDelta, collector: &mut C) where C: OutputCollector, { - let key = (entry.instrument_id, entry.side, entry.price); + if delta.is_clear != 0 { + self.book_volumes + .retain(|(inst_id, _, _), _| *inst_id != delta.instrument_id); + // Notify downstream to clear book levels for both sides + collector.push(&BookLevelEntry { + ts: delta.ts, + ts_recv: delta.ts_recv, + symbol: delta.instrument_id as u64, + side: b'B', + volume: 0, + ..Default::default() + }); + collector.push(&BookLevelEntry { + ts: delta.ts, + ts_recv: delta.ts_recv, + symbol: delta.instrument_id as u64, + side: b'A', + volume: 0, + ..Default::default() + }); + return; + } + + let key = (delta.instrument_id, delta.side, delta.price); let book = self.book_volumes.entry(key).or_insert(BookLevelEntry { - ts: entry.ts, - symbol: entry.instrument_id as u64, - price: entry.price, + ts: delta.ts, + ts_recv: delta.ts_recv, + symbol: delta.instrument_id as u64, + price: delta.price, volume: 0, - side: entry.side, + side: delta.side, _pad: [0; 7], }); - book.ts = entry.ts; + book.ts = delta.ts; + book.ts_recv = delta.ts_recv; - match entry.action { - // Add - b'A' => { - book.volume = book.volume.saturating_add(entry.size as u64); - } - // Cancel, Fill, or Trade - b'C' | b'F' | b'T' => { - book.volume = book.volume.saturating_sub(entry.size as u64); - } - // Clear Book - b'R' => { - book.volume = 0; - } - _ => {} - } + // Apply delta + let new_volume = (book.volume as i64 + delta.delta as i64).max(0) as u64; + book.volume = new_volume; // Always push the update so downstream knows about deletions/volume=0 - collector.push(*book); + collector.push(book); if book.volume == 0 { self.book_volumes.remove(&key); diff --git a/examples/databento_replay/analysis_stage.rs b/examples/databento_replay/analysis_stage.rs index 672ea50..b4a0b7b 100644 --- a/examples/databento_replay/analysis_stage.rs +++ b/examples/databento_replay/analysis_stage.rs @@ -1,29 +1,55 @@ use crate::book_level_entry::BookLevelEntry; use crate::book_level_top::BookLevelTop; use crate::imbalance_signal::ImbalanceSignal; +use fxhash::FxHashMap; +use roda_state::measure::LatencyMeasurer; use roda_state::{OutputCollector, Stage}; use spdlog::prelude::*; -use std::collections::HashMap; use std::time::{Duration, Instant}; pub struct AnalysisStage { - book_tops: HashMap, + book_tops: FxHashMap, last_print: Instant, counter: u64, + // Tick-to-Signal Latency Measurer + tts_measurer: LatencyMeasurer, } impl Default for AnalysisStage { fn default() -> Self { Self { - book_tops: HashMap::new(), + book_tops: FxHashMap::default(), last_print: Instant::now(), counter: 0, + tts_measurer: LatencyMeasurer::new(1), // Sample every 1000th tick + } + } +} + +impl AnalysisStage { + /// SIMD-friendly weighted imbalance calculation + #[inline(always)] + fn calculate_weighted_imbalance(book_top: &BookLevelTop) -> (f64, f64, f64) { + const WEIGHTS: [f64; 5] = [1.0, 0.8, 0.6, 0.4, 0.2]; + let mut bid_vol = 0.0; + let mut ask_vol = 0.0; + + for (i, &weight) in WEIGHTS.iter().enumerate() { + bid_vol += book_top.bids[i].size as f64 * weight; + ask_vol += book_top.asks[i].size as f64 * weight; + } + + let total_vol = bid_vol + ask_vol; + if total_vol > 0.0 { + ((bid_vol - ask_vol) / total_vol, bid_vol, ask_vol) + } else { + (0.0, 0.0, 0.0) } } } impl Stage for AnalysisStage { - fn process(&mut self, entry: BookLevelEntry, collector: &mut C) + fn process(&mut self, entry: &BookLevelEntry, collector: &mut C) where C: OutputCollector, { @@ -35,48 +61,37 @@ impl Stage for AnalysisStage { symbol: entry.symbol, ..Default::default() }); - book_top.adjust(entry); - - let mut bid_vol = 0.0; - let mut ask_vol = 0.0; + book_top.adjust(*entry); - for (i, level) in book_top.bids.iter().enumerate() { - if level.price == 0 { - break; - } - let weight = 1.0 - (i as f64 * 0.2); - bid_vol += level.size as f64 * weight; - } - - for (i, level) in book_top.asks.iter().enumerate() { - if level.price == 0 { - break; - } - let weight = 1.0 - (i as f64 * 0.2); - ask_vol += level.size as f64 * weight; - } - - let total_vol = bid_vol + ask_vol; - if total_vol > 0.0 { - let imbalance = (bid_vol - ask_vol) / total_vol; + let (imbalance, bid_vol, ask_vol) = Self::calculate_weighted_imbalance(book_top); + if bid_vol + ask_vol > 0.0 { // Produce the signal - collector.push(ImbalanceSignal { + collector.push(&ImbalanceSignal { ts: entry.ts, + ts_recv: entry.ts_recv, symbol: entry.symbol, imbalance, bid_vol, ask_vol, + _pad: [0; 2], }); - if imbalance.abs() > 0.95 && self.last_print.elapsed() > Duration::from_millis(500) { + if imbalance.abs() > 0.98 && self.last_print.elapsed() > Duration::from_millis(500) { info!( - "[Sym:{}] Imbalance: {:.2} (B: {:.0}, A: {:.0})", + "[Sym:{}] High Imbalance: {:.4} (B:{:.0} A:{:.0})", entry.symbol, imbalance, bid_vol, ask_vol ); self.last_print = Instant::now(); } } + + // Record tick-to-signal latency + if self.counter.is_multiple_of(1000) { + let now_nanos = crate::latency_tracker::get_relative_nanos(); + let tts_latency = now_nanos.saturating_sub(entry.ts_recv); + self.tts_measurer.measure(Duration::from_nanos(tts_latency)); + } } } @@ -86,5 +101,9 @@ impl Drop for AnalysisStage { "[System] Final Imbalance Signals processed: {}", self.counter ); + info!( + "[Analysis] TTS Latency (Tick-to-Signal): {}", + self.tts_measurer.format_stats() + ); } } diff --git a/examples/databento_replay/book_level_entry.rs b/examples/databento_replay/book_level_entry.rs index bd9bff5..5a7b6f3 100644 --- a/examples/databento_replay/book_level_entry.rs +++ b/examples/databento_replay/book_level_entry.rs @@ -4,6 +4,7 @@ use bytemuck::{Pod, Zeroable}; #[derive(Debug, Clone, Copy, Default, Pod, Zeroable)] pub struct BookLevelEntry { pub ts: u64, + pub ts_recv: u64, pub symbol: u64, // or instrument_id pub price: i64, pub volume: u64, // "size" is also common diff --git a/examples/databento_replay/book_level_top.rs b/examples/databento_replay/book_level_top.rs index d7fcb2e..0d14604 100644 --- a/examples/databento_replay/book_level_top.rs +++ b/examples/databento_replay/book_level_top.rs @@ -8,18 +8,21 @@ pub struct BookLevelTopEntry { pub price: i64, } -#[repr(C)] +#[repr(C, align(64))] #[derive(Debug, Clone, Copy, Default, Pod, Zeroable)] pub struct BookLevelTop { pub ts: u64, + pub ts_recv: u64, pub symbol: u64, // or instrument_id pub asks: [BookLevelTopEntry; 5], pub bids: [BookLevelTopEntry; 5], + pub _pad: u64, } impl BookLevelTop { pub(crate) fn adjust(&mut self, entry: BookLevelEntry) { self.ts = entry.ts; + self.ts_recv = entry.ts_recv; let levels = match entry.side { b'A' => &mut self.asks, b'B' => &mut self.bids, @@ -75,9 +78,11 @@ impl From for BookLevelTop { fn from(entry: BookLevelEntry) -> Self { Self { ts: entry.ts, + ts_recv: 0, symbol: entry.symbol, asks: [BookLevelTopEntry::default(); 5], bids: [BookLevelTopEntry::default(); 5], + _pad: 0, } } } diff --git a/examples/databento_replay/imbalance_signal.rs b/examples/databento_replay/imbalance_signal.rs index 6b25c5f..e6f751c 100644 --- a/examples/databento_replay/imbalance_signal.rs +++ b/examples/databento_replay/imbalance_signal.rs @@ -1,11 +1,13 @@ use bytemuck::{Pod, Zeroable}; -#[repr(C)] +#[repr(C, align(64))] #[derive(Debug, Clone, Copy, Default, Pod, Zeroable)] pub struct ImbalanceSignal { pub ts: u64, + pub ts_recv: u64, pub symbol: u64, pub imbalance: f64, pub bid_vol: f64, pub ask_vol: f64, + pub _pad: [u64; 2], } diff --git a/examples/databento_replay/importer.rs b/examples/databento_replay/importer.rs index bbfcae6..b7ceafd 100644 --- a/examples/databento_replay/importer.rs +++ b/examples/databento_replay/importer.rs @@ -1,6 +1,7 @@ use std::error::Error; use std::path::PathBuf; -use std::time::Instant; +use std::thread::sleep; +use std::time::{Duration, Instant}; use dbn::Record; use dbn::decode::{DbnDecoder as Decoder, DecodeRecordRef}; @@ -15,20 +16,59 @@ use roda_state::Appendable; pub fn import_mbo_file( file: PathBuf, market_store: &mut impl Appendable, + simulate_live: bool, ) -> Result<(), Box> { - info!("[Writer] Starting Feed Handler for {:?}...", file); + info!( + "[Writer] Starting Feed Handler for {:?} (Simulate Live: {})...", + file, simulate_live + ); let start = Instant::now(); let mut count = 0u64; // 1. Setup Decoder let mut decoder = Decoder::from_zstd_file(&file)?; - // 3. Hot Loop + let mut first_ts = None; + let mut first_now = Instant::now(); + while let Some(record) = decoder.decode_record_ref()? { if record.header().rtype == rtype::MBO { let msg = record.get::().unwrap(); - market_store.append(LightMboEntry::from(msg)); + + if simulate_live { + if first_ts.is_none() { + first_ts = Some(msg.hd.ts_event); + first_now = Instant::now(); + // Small warm-up delay to let threads stabilize + std::thread::sleep(Duration::from_millis(50)); + } + + let elapsed_market = msg.hd.ts_event - first_ts.unwrap(); + let elapsed_now = first_now.elapsed().as_nanos() as u64; + + if elapsed_market > elapsed_now { + let sleep_dur = Duration::from_nanos(elapsed_market - elapsed_now); + if sleep_dur > Duration::from_secs(1) { + // reset + first_ts = None; + first_now = Instant::now(); + } else if sleep_dur > Duration::from_micros(10) { + sleep(Duration::from_micros(10)); + } + } + } else if count == 0 { + // Warm-up for backtest mode + std::thread::sleep(Duration::from_millis(50)); + } + + let ts_recv = crate::latency_tracker::get_relative_nanos(); + market_store.append(&LightMboEntry::from_msg(msg, ts_recv)); count += 1; + + // if count % 100_000 == 0 && start.elapsed().as_secs() > 20 { + // info!("[Writer] Stopped after 20 seconds..."); + // break; + // } } } diff --git a/examples/databento_replay/latency_tracker.rs b/examples/databento_replay/latency_tracker.rs new file mode 100644 index 0000000..6c36e88 --- /dev/null +++ b/examples/databento_replay/latency_tracker.rs @@ -0,0 +1,9 @@ +use std::sync::LazyLock; +use std::time::Instant; + +pub static START_TIME: LazyLock = LazyLock::new(Instant::now); + +#[inline(always)] +pub fn get_relative_nanos() -> u64 { + START_TIME.elapsed().as_nanos() as u64 +} diff --git a/examples/databento_replay/light_mbo_delta.rs b/examples/databento_replay/light_mbo_delta.rs new file mode 100644 index 0000000..26b98c9 --- /dev/null +++ b/examples/databento_replay/light_mbo_delta.rs @@ -0,0 +1,30 @@ +use bytemuck::{Pod, Zeroable}; + +#[repr(C)] +#[derive(Debug, Clone, Copy, Default, Pod, Zeroable)] +pub struct MboDelta { + /// 1. The Event Timestamp (UNIX nanos). + pub ts: u64, + + /// 2. The Local Receive Timestamp. + pub ts_recv: u64, + + /// 3. The Price. + pub price: i64, + + /// 4. The Size (Quantity) change. + pub delta: i32, + + // --- PACKING SECTION (32-Bit Alignment) --- + /// 5. The Instrument ID. + pub instrument_id: u32, + + /// 6. Side (b'A' or b'B'). + pub side: u8, + + /// 7. Clear flag. + pub is_clear: u8, + + /// 8. Padding. + pub _pad: [u8; 6], +} diff --git a/examples/databento_replay/light_mbo_entry.rs b/examples/databento_replay/light_mbo_entry.rs index ab90fa6..4d9db1b 100644 --- a/examples/databento_replay/light_mbo_entry.rs +++ b/examples/databento_replay/light_mbo_entry.rs @@ -24,31 +24,30 @@ pub struct LightMboEntry { /// Needed if your store contains multiple symbols (e.g., MSFT and AAPL). pub instrument_id: u32, + /// 6. The Local Receive Timestamp (nanos since UNX EPOCH or just relative). + pub ts_recv: u64, + // --- PACKING SECTION (8-Bit Alignment) --- - /// 6. Action (Add='A', Cancel='C', Modify='M', etc.) + /// 7. Action (Add='A', Cancel='C', Modify='M', etc.) /// We store as u8 to match the raw byte. pub action: u8, - /// 7. Side (Bid='B', Ask='A'). + /// 8. Side (Bid='B', Ask='A'). pub side: u8, - /// 8. Explicit Padding. - /// We have used: 8+8+8+4+4+1+1 = 34 bytes. - /// The next multiple of 8 (for u64 alignment) is 40. - /// So we need 6 bytes of padding. + /// 9. Explicit Padding. pub _pad: [u8; 6], } -impl From<&MboMsg> for LightMboEntry { - fn from(msg: &MboMsg) -> Self { +impl LightMboEntry { + pub fn from_msg(msg: &MboMsg, ts_recv: u64) -> Self { Self { ts: msg.hd.ts_event, order_id: msg.order_id, price: msg.price, size: msg.size, instrument_id: msg.hd.instrument_id, - // Cast char (i8) to u8 directly. - // 'A' is 65, 'B' is 66, etc. + ts_recv, action: msg.action as u8, side: msg.side as u8, _pad: [0; 6], diff --git a/examples/databento_replay/main.rs b/examples/databento_replay/main.rs index 2647881..99ed547 100644 --- a/examples/databento_replay/main.rs +++ b/examples/databento_replay/main.rs @@ -3,7 +3,7 @@ use spdlog::prelude::*; use std::path::PathBuf; use std::time::Duration; -use roda_state::{StageEngine, latency, pipe, progress}; +use roda_state::{StageEngine, pipe}; mod aggregation_stage; mod analysis_stage; @@ -11,16 +11,28 @@ mod book_level_entry; mod book_level_top; mod imbalance_signal; mod importer; +mod latency_tracker; +mod light_mbo_delta; mod light_mbo_entry; +mod order_tracker; use crate::aggregation_stage::AggregationStage; use crate::analysis_stage::AnalysisStage; +use crate::light_mbo_entry::LightMboEntry; +use crate::order_tracker::OrderTracker; use importer::import_mbo_file; #[derive(Parser)] struct Args { #[arg(long)] file: PathBuf, + + #[arg(long, default_value_t = false)] + simulate_live: bool, + + /// Pin worker threads to CPU cores + #[arg(long, default_value_t = false)] + pin_cores: bool, } fn main() -> Result<(), Box> { @@ -29,40 +41,39 @@ fn main() -> Result<(), Box> { info!("[System] Booting Roda Data Bento Replay with StageEngine..."); // 1. Initialize StageEngine with enough capacity for the input - // Using 30M as in original example let mut engine = StageEngine::with_capacity(30_000_000); - engine.enable_latency_stats(true); - - // 2. Add Aggregation Stage: LightMboEntry -> BookLevelEntry - let engine = engine.add_stage_with_capacity( - 30_000_000, - pipe![ - progress("Aggregation", 10_000_000), - latency("Aggregation", 10_000_000, 1000, AggregationStage::default()) - ], - ); + engine.set_pin_cores(args.pin_cores); - // 3. Add Imbalance Analysis Stage: BookLevelEntry -> ImbalanceSignal - let mut engine = engine.add_stage_with_capacity( - 30_000_000, - pipe![ - progress("Imbalance Analysis", 10_000_000), - latency( - "Imbalance Analysis", - 10_000_000, - 1000, - AnalysisStage::default() - ) - ], - ); + if args.pin_cores { + info!("[System] CPU Pinning enabled for worker threads"); + } + + // 2. Add Order Tracker Stage: LightMboEntry -> MboDelta + let engine = engine.add_stage_with_capacity(30_000_000, |x: &LightMboEntry| Some(*x)); + let engine = engine.add_stage_with_capacity(30_000_000, pipe![OrderTracker::default()]); - import_mbo_file(args.file, &mut engine)?; + // 3. Add Aggregation Stage: MboDelta -> BookLevelEntry + let engine = engine.add_stage_with_capacity(30_000_000, pipe![AggregationStage::default()]); + + // 4. Add Imbalance Analysis Stage: BookLevelEntry -> ImbalanceSignal + let mut engine = engine.add_stage_with_capacity(30_000_000, pipe![AnalysisStage::default()]); + + let start = std::time::Instant::now(); + import_mbo_file(args.file, &mut engine, args.simulate_live)?; info!("[System] Waiting for all stages to finish processing..."); engine.await_idle(Duration::from_secs(600)); - info!("[System] Final Imbalance Signals: {}", engine.output_size()); - info!("[System] Done!"); + let duration = start.elapsed(); + let total_msgs = engine.output_size(); + let meps = total_msgs as f64 / duration.as_secs_f64() / 1_000_000.0; + + info!("[System] Final Imbalance Signals: {}", total_msgs); + info!( + "[System] Throughput: {:.2} MEPS (Million Events Per Second)", + meps + ); + info!("[System] Done in {:?}", duration); Ok(()) } diff --git a/examples/databento_replay/order_tracker.rs b/examples/databento_replay/order_tracker.rs new file mode 100644 index 0000000..0adb4de --- /dev/null +++ b/examples/databento_replay/order_tracker.rs @@ -0,0 +1,131 @@ +use crate::light_mbo_delta::MboDelta; +use crate::light_mbo_entry::LightMboEntry; +use fxhash::FxHashMap; +use roda_state::{OutputCollector, Stage}; + +#[derive(Default)] +pub struct OrderTracker { + orders: FxHashMap, +} + +impl Stage for OrderTracker { + #[inline(always)] + fn process(&mut self, entry: &LightMboEntry, collector: &mut C) + where + C: OutputCollector, + { + match entry.action { + // Add + b'A' => { + self.orders.insert(entry.order_id, *entry); + collector.push(&MboDelta { + ts: entry.ts, + ts_recv: entry.ts_recv, + price: entry.price, + delta: entry.size as i32, + instrument_id: entry.instrument_id, + side: entry.side, + is_clear: 0, + ..Default::default() + }); + } + // Cancel, Fill, or Trade + b'C' | b'F' | b'T' => { + // For Cancel/Fill, the message size is the size of the event. + // We should also update our internal tracking if the order isn't completely gone. + // But DBN MBO usually means order is gone on 'C'. On 'F' it might stay if partial. + // "The 'F' message represents a fill... If the order is fully filled, it is removed from the book." + // In DBN, if it's a partial fill, there might be a follow up or the remaining size is what matters. + + // For simplicity and matching the previous 'delta' pipe logic: + // If it's a Cancel or full Fill, we emit a negative delta. + collector.push(&MboDelta { + ts: entry.ts, + ts_recv: entry.ts_recv, + price: entry.price, + delta: -(entry.size as i32), + instrument_id: entry.instrument_id, + side: entry.side, + is_clear: 0, + ..Default::default() + }); + + if entry.action == b'C' { + self.orders.remove(&entry.order_id); + } else if let Some(order) = self.orders.get_mut(&entry.order_id) { + order.size = order.size.saturating_sub(entry.size); + if order.size == 0 { + self.orders.remove(&entry.order_id); + } + } + } + // Modify + b'M' => { + if let Some(old_order) = self.orders.get_mut(&entry.order_id) { + if old_order.price != entry.price { + // Price changed: remove old volume, add new volume + collector.push(&MboDelta { + ts: entry.ts, + ts_recv: entry.ts_recv, + price: old_order.price, + delta: -(old_order.size as i32), + instrument_id: entry.instrument_id, + side: entry.side, + is_clear: 0, + ..Default::default() + }); + collector.push(&MboDelta { + ts: entry.ts, + ts_recv: entry.ts_recv, + price: entry.price, + delta: entry.size as i32, + instrument_id: entry.instrument_id, + side: entry.side, + is_clear: 0, + ..Default::default() + }); + } else { + // Price same, size changed + collector.push(&MboDelta { + ts: entry.ts, + ts_recv: entry.ts_recv, + price: entry.price, + delta: entry.size as i32 - old_order.size as i32, + instrument_id: entry.instrument_id, + side: entry.side, + is_clear: 0, + ..Default::default() + }); + } + *old_order = *entry; + } else { + // We missed the Add? Treat as Add. + self.orders.insert(entry.order_id, *entry); + collector.push(&MboDelta { + ts: entry.ts, + ts_recv: entry.ts_recv, + price: entry.price, + delta: entry.size as i32, + instrument_id: entry.instrument_id, + side: entry.side, + is_clear: 0, + ..Default::default() + }); + } + } + // Clear Book + b'R' => { + self.orders + .retain(|_, v| v.instrument_id != entry.instrument_id); + collector.push(&MboDelta { + ts: entry.ts, + ts_recv: entry.ts_recv, + instrument_id: entry.instrument_id, + is_clear: 1, + ..Default::default() + }); + } + _ => {} + } + } +} diff --git a/examples/sensor_test/README.md b/examples/sensor_test/README.md new file mode 100644 index 0000000..9b5e010 --- /dev/null +++ b/examples/sensor_test/README.md @@ -0,0 +1,51 @@ +# Real-Time Sensor Data Aggregation & Anomaly Detection + +This example demonstrates a high-performance multistage pipeline for processing streaming sensor data using the **Roda Engine**. It showcases statistical windowing (Aggregation) and stateful delta analysis (Anomaly Detection) in a thread-per-stage architecture. + +The implementation is located in [main.rs](main.rs). + +## Key Features + +- **Multistage Pipeline**: Decouples data ingestion, statistical aggregation, and anomaly detection into separate CPU-bound stages. +- **Stateful Windowing**: Maintains running statistics (min, max, average) for sensors using the `stateful` pipe component. +- **Low-Latency Alerting**: Detects anomalies (e.g., sudden spikes in average value) using the `delta` component to compare current window state with the previous one. +- **Performance Metrics**: + - **Execution Latency**: Measures time spent within each stage using the `latency` pipe component. + - **End-to-End Latency**: Tracks "Tick-to-Alert" latency from raw reading to signal generation. + - **Throughput**: Capable of processing millions of sensor readings per second. + +## Pipeline Architecture + +```mermaid +graph LR + A[Raw Reading] --> B(Stage 1: Aggregation) + B -->|Summary| C(Stage 2: Anomaly Detection) + C -->|Alert| D[Alert Journal] + + subgraph "Worker Thread 1" + B + end + subgraph "Worker Thread 2" + C + end +``` + +## Data Models + +1. **Reading**: Raw sensor data with `sensor_id`, `value`, and receive timestamp. +2. **Summary**: Statistical window containing min, max, average, and observation count. +3. **Alert**: Signal generated when a sensor's average value jumps by more than 50% compared to the previous window. + +## Usage + +```bash +# Run the example with optimizations +cargo run --release --example sensor_test +``` + +## Performance (tested on MacBook M2 Max) + +On a modern CPU, this example typically achieves: +- **Throughput**: ~50 MEPS (Million Events Per Second). +- **End-to-End Latency**: < 500ns (median) for alert generation. +- **Stage Latency**: ~50ns per record for aggregation logic. diff --git a/examples/sensor_test/main.rs b/examples/sensor_test/main.rs index 41a5288..ab208ac 100644 --- a/examples/sensor_test/main.rs +++ b/examples/sensor_test/main.rs @@ -4,13 +4,14 @@ use crate::models::{Alert, Reading, SensorKey, Summary}; use roda_state::StageEngine; use roda_state::pipe; use roda_state::{delta, stateful}; -use std::time::Duration; +use std::time::{Duration, Instant}; fn main() { println!("Starting Sensor Multistage Pipeline (Optimized)..."); + let start_time = Instant::now(); // 1. Initialize StageEngine - let engine = StageEngine::::with_capacity(1000); + let engine = StageEngine::::with_capacity(1_000_000_000); // 2. Add Aggregation Stage: Reading -> Summary let mut engine = engine @@ -41,27 +42,29 @@ fn main() { // 4. INGEST DATA println!("\nPushing sensor readings..."); - let readings = [ - Reading::from(1, 10.0, 10_000), - Reading::from(1, 12.0, 20_000), - Reading::from(1, 20.0, 110_000), // Average jump - Reading::from(1, 22.0, 120_000), - ]; + let count = 100_000_000; + let mut readings = Vec::with_capacity(count * 4); + + for _ in 0..100_000_000 { + readings.push(Reading::from(1, 10.0, 10_000)); + readings.push(Reading::from(1, 12.0, 20_000)); + readings.push(Reading::from(1, 20.0, 110_000)); // Average jump + readings.push(Reading::from(1, 22.0, 120_000)); + } + let readings_count = count * 4; for r in readings { - engine.send(r); + engine.send(&r); } engine.await_idle(Duration::from_millis(100)); - // 5. DISPLAY RESULTS - println!("\nAlerts Detected:"); - while let Some(alert) = engine.receive() { - println!( - "ALERT: Sensor {} anomaly at {}", - alert.sensor_id, alert.timestamp - ); - } + let duration = start_time.elapsed(); + println!("Pipeline completed in {}ms", duration.as_millis()); + println!( + "Throughput: {}/s", + readings_count as f64 / duration.as_secs_f64() + ); println!("\nDone!"); } diff --git a/examples/sensor_test/models.rs b/examples/sensor_test/models.rs index d8f7b65..eed6494 100644 --- a/examples/sensor_test/models.rs +++ b/examples/sensor_test/models.rs @@ -77,7 +77,7 @@ impl Summary { /// Update the existing summary with a new reading. #[inline(always)] - pub fn update(&mut self, r: Reading) { + pub fn update(&mut self, r: &Reading) { // Update Min/Max if r.value < self.min { self.min = r.value; diff --git a/examples/service_health/README.md b/examples/service_health/README.md new file mode 100644 index 0000000..bcd249d --- /dev/null +++ b/examples/service_health/README.md @@ -0,0 +1,52 @@ +# Service Health Monitoring Pipeline + +This example demonstrates a robust, low-latency service health monitoring system built with the **Roda Engine**. It includes noise filtering (deduplication), stateful aggregation, and anomaly detection with alert deduplication. + +See [main.rs](main.rs) for the complete source code. + +## Key Features + +- **Noise Filtering**: Uses the `dedup_by` pipe component to drop redundant raw readings with identical values, reducing downstream load. +- **Hierarchical Pipeline**: Combines multiple processing steps (dedup -> stateful -> inspect) into logical stages. +- **Intelligent Alerting**: + - Detects spikes in average values using `delta`. + - Suppresses duplicate alerts for the same sensor using `dedup_by`, ensuring the monitoring system only notifies on state changes. +- **Performance Observability**: + - Uses the `latency` pipe to monitor the execution time of each composite stage. + - Reports end-to-end "Tick-to-Alert" latency for detected anomalies. + +## Pipeline Architecture + +```mermaid +graph LR + A[Raw Reading] --> B(Stage 1: Aggregation & Filtering) + B -->|Summary| C(Stage 2: Alerting & Suppression) + C -->|Alert| D[Main Thread / Dashboard] + + subgraph "Stage 1 (Pinned Thread)" + B1[Deduplicator] --> B2[Stateful Aggregator] + end + + subgraph "Stage 2 (Pinned Thread)" + C1[Delta Detector] --> C2[Alert Dedup] + end +``` + +## Data Models + +1. **Reading**: Raw metric from a service/sensor. +2. **Summary**: Rolling window of metrics (min, max, avg). +3. **Alert**: Notifies on significant health degradation (>50% jump in average). + +## Usage + +```bash +# Run the example with optimizations +cargo run --release --example service_health +``` + +## Performance Metrics (tested on MacBook M2 Max) + +- **Throughput**: ~26 MEPS (Million Events Per Second). +- **Stage Execution**: ~70-100ns per record. +- **End-to-End Latency**: Measured in nanoseconds from ingestion to alert receipt. diff --git a/examples/service_health/main.rs b/examples/service_health/main.rs index 79e54f7..b9b28d2 100644 --- a/examples/service_health/main.rs +++ b/examples/service_health/main.rs @@ -3,26 +3,27 @@ mod models; use models::{Alert, Reading, SensorKey, Summary}; use roda_state::StageEngine; use roda_state::pipe; -use roda_state::{dedup_by, delta, inspect, stateful}; -use std::time::Duration; +use roda_state::{dedup_by, delta, stateful}; +use std::time::{Duration, Instant}; fn main() { println!("--- Starting StageEngine: Service Health Pipeline ---"); + let start_time = Instant::now(); // 1. Initialize StageEngine (Initial entry type is Reading) - let engine = StageEngine::::with_capacity(1000); + let engine = StageEngine::::with_capacity(100_000_100); // 2. Add Aggregation Stage: Reading -> Summary // We also include a deduplicator at the start to drop identical raw readings. let engine = engine.add_stage(pipe![ dedup_by(|r: &Reading| (r.sensor_id, (r.value * 1000.0) as u64)), // Noise filter stateful(SensorKey::from_reading, Summary::init, Summary::update), - inspect(|s: &Summary| { - println!( - "STAGE 1 [AGG]: Sensor {} Avg updated to {:.2}", - s.sensor_id, s.avg - ); - }) + // inspect(|s: &Summary| { + // println!( + // "STAGE 1 [AGG]: Sensor {} Avg updated to {:.2}", + // s.sensor_id, s.avg + // ); + // }) ]); // 3. Add Anomaly Detection Stage: Summary -> Alert @@ -47,31 +48,36 @@ fn main() { ), // Deduplicate Alerts: Only notify if the alert is new/changed for this sensor dedup_by(|a: &Alert| a.sensor_id), - inspect(|a: &Alert| { - println!( - "STAGE 2 [ALERT]: 🚨 Anomaly detected for Sensor {}!", - a.sensor_id - ); - }) + // inspect(|a: &Alert| { + // println!( + // "STAGE 2 [ALERT]: 🚨 Anomaly detected for Sensor {}!", + // a.sensor_id + // ); + // }) ]); // 4. Ingest Data println!("\nIngesting readings..."); - let readings = [ - Reading::from(1, 10.0, 10_000), // Baseline - Reading::from(1, 10.0, 20_000), // Duplicate (filtered by dedup) - Reading::from(1, 11.0, 30_000), // Small change - Reading::from(1, 25.0, 110_000), // Spike -> Triggers Alert - Reading::from(2, 5.0, 10_000), // New Sensor - ]; + // Trigger an initial alert for sensor 2 + engine.send(&Reading::from(2, 10.0, 0)); + engine.send(&Reading::from(2, 100.0, 1)); - for r in readings { - engine.send(r); + let count = 100_000_000; + for i in 0..count { + engine.send(&Reading::from(1, 10.0, i as u64)); } + let readings_count = count + 2; // Give workers time to finish processing engine.await_idle(Duration::from_millis(100)); + let duration = start_time.elapsed(); + println!("Pipeline completed in {}ms", duration.as_millis()); + println!( + "Throughput: {}/s", + readings_count as f64 / duration.as_secs_f64() + ); + // 5. Display Results from the end of the pipeline println!("\n--- Final Alert Journal ---"); while let Some(alert) = engine.try_receive() { diff --git a/examples/service_health/models.rs b/examples/service_health/models.rs index 2df1f3a..ff4d5ea 100644 --- a/examples/service_health/models.rs +++ b/examples/service_health/models.rs @@ -64,7 +64,7 @@ impl Summary { } #[inline(always)] - pub fn update(&mut self, r: Reading) { + pub fn update(&mut self, r: &Reading) { if r.value < self.min { self.min = r.value; } diff --git a/scripts/check.sh b/scripts/check.sh index b71af1c..0097d3f 100755 --- a/scripts/check.sh +++ b/scripts/check.sh @@ -11,4 +11,6 @@ cargo clippy --all-targets -- -D warnings echo "Running tests..." cargo test --all-targets + + echo "All checks passed!" diff --git a/src/components.rs b/src/components.rs index 7055323..c786736 100644 --- a/src/components.rs +++ b/src/components.rs @@ -2,7 +2,7 @@ use bytemuck::Pod; /// For structures where we append data to the end (Journals, Logs). pub trait Appendable { - fn append(&mut self, state: State); + fn append(&mut self, state: &State); } /// For structures where we update a specific "address" or "slot" (State Maps, Arrays). diff --git a/src/engine.rs b/src/engine.rs index 15e1f4f..9abefcb 100644 --- a/src/engine.rs +++ b/src/engine.rs @@ -1,70 +1,86 @@ use crate::journal_store::{JournalStore, JournalStoreOptions}; -use crate::measure::latency_measurer::LatencyMeasurer; use crate::op_counter::OpCounter; use crate::slot_store::{SlotStore, SlotStoreOptions}; use bytemuck::Pod; -use spdlog::info; +use std::hint::spin_loop; use std::sync::Arc; use std::sync::atomic::AtomicBool; use std::thread; use std::thread::sleep; use std::time::{Duration, Instant}; +/// The core execution engine for Roda. +/// +/// It manages worker threads, storage lifecycle, and shared operation counters. pub struct RodaEngine { root_path: &'static str, running: Arc, - enable_latency_stats: bool, worker_handlers: Vec>, op_counter: Arc, + pin_cores: bool, } impl RodaEngine { + /// Creates a new `RodaEngine` with the default "data" root path. pub fn new() -> Self { Self { root_path: "data", running: Arc::new(AtomicBool::new(true)), - enable_latency_stats: false, worker_handlers: vec![], op_counter: OpCounter::new(), + pin_cores: false, } } + pub(crate) fn set_pin_cores(&mut self, pin_cores: bool) { + self.pin_cores = pin_cores; + } + + /// Creates a new `RodaEngine` with a custom root path for storage. pub fn new_with_root_path(root_path: &'static str) -> Self { Self { root_path, running: Arc::new(AtomicBool::new(true)), - enable_latency_stats: false, worker_handlers: vec![], op_counter: OpCounter::new(), + pin_cores: false, } } - pub fn enable_latency_stats(&mut self, enable: bool) { - self.enable_latency_stats = enable; - } - - pub fn run_worker(&mut self, mut runnable: impl FnMut() + Send + 'static) { + /// Spawns a worker thread that executes the provided runnable in a loop. + /// + /// The worker will spin and yield if there is no work to do, minimizing latency. + pub fn run_worker(&mut self, mut runnable: impl FnMut() -> bool + Send + 'static) { let worker_id = self.worker_handlers.len(); let running = self.running.clone(); - let enable_latency_stats = self.enable_latency_stats; + let pin_cores = self.pin_cores; let handler = thread::spawn(move || { - if enable_latency_stats { - let mut measurer = LatencyMeasurer::new(1000); - while running.load(std::sync::atomic::Ordering::Relaxed) { - let instant = Instant::now(); - runnable(); - measurer.measure(instant.elapsed()); + if pin_cores + && let Some(core_ids) = core_affinity::get_core_ids() + && let Some(core_id) = core_ids.get(worker_id % core_ids.len()) + { + core_affinity::set_for_current(*core_id); + } + + let mut step_without_work_count = 0; + while running.load(std::sync::atomic::Ordering::Relaxed) { + let did_work = runnable(); + if did_work { + step_without_work_count = 0; + } else { + step_without_work_count += 1; } - info!("[Latency/Worker:{}]{}", worker_id, measurer.format_stats()); - } else { - while running.load(std::sync::atomic::Ordering::Relaxed) { - runnable(); + if step_without_work_count > 1000 { + thread::yield_now(); + } else if step_without_work_count > 10 { + spin_loop(); } } }); self.worker_handlers.push(handler); } + /// Creates a new `JournalStore` for sequential, append-only data storage. pub fn new_journal_store( &self, options: JournalStoreOptions, @@ -72,10 +88,12 @@ impl RodaEngine { JournalStore::new(self.root_path, self.op_counter.clone(), options) } + /// Creates a new `SlotStore` for random-access, slot-based data storage. pub fn new_slot_store(&self, options: SlotStoreOptions) -> SlotStore { SlotStore::new(self.root_path, self.op_counter.clone(), options) } + /// Blocks until the engine is idle (i.e., no operations have occurred for a short period). pub fn await_idle(&self, timeout: Duration) { let start = Instant::now(); let mut last_op_count = self.op_counter.total_op_count(); diff --git a/src/journal_store.rs b/src/journal_store.rs index 835030d..f508bf1 100644 --- a/src/journal_store.rs +++ b/src/journal_store.rs @@ -8,18 +8,26 @@ use std::sync::Arc; use std::sync::atomic::AtomicU64; use std::sync::atomic::Ordering::Relaxed; +/// Configuration options for a `JournalStore`. pub struct JournalStoreOptions { + /// The name of the store, used for the filename. pub name: &'static str, + /// The maximum number of items the store can hold. pub size: usize, + /// Whether to keep the store only in memory. pub in_memory: bool, } +/// An append-only store for sequential data. +/// +/// It uses memory-mapped files for persistence and high-performance I/O. pub struct JournalStore { storage: JournalMmap, op_counter: Arc, _marker: std::marker::PhantomData, } +/// A reader for a `JournalStore` that maintains its own read index. pub struct StoreJournalReader { next_index: Cell, storage: JournalMmap, @@ -52,7 +60,8 @@ impl JournalStore { } } - pub fn append(&mut self, state: State) { + /// Appends an item to the store. + pub fn append(&mut self, state: &State) { let size = size_of::(); let current_pos = self.storage.get_write_index(); assert!( @@ -62,7 +71,7 @@ impl JournalStore { current_pos, size ); - self.storage.append(&state); + self.storage.append(state); } pub fn reader(&self) -> StoreJournalReader { @@ -80,12 +89,13 @@ impl JournalStore { } impl Appendable for JournalStore { - fn append(&mut self, state: State) { + fn append(&mut self, state: &State) { self.append(state); } } impl StoreJournalReader { + #[inline(always)] pub fn next(&self) -> bool { let index_to_read = self.next_index.get(); let offset = index_to_read * size_of::(); @@ -101,10 +111,12 @@ impl StoreJournalReader { true } + #[inline(always)] pub fn get_index(&self) -> usize { self.next_index.get() } + #[inline(always)] pub fn with(&self, handler: impl FnOnce(&State) -> R) -> Option { let next_index = self.next_index.get(); if next_index == 0 { @@ -115,6 +127,36 @@ impl StoreJournalReader { Some(handler(self.storage.read(offset))) } + /// Processes all remaining items in the store using the provided handler. + /// + /// This is highly optimized using batch reading (read_window). + #[inline(always)] + pub fn handle_remaining(&self, mut handler: impl FnMut(&State)) -> usize { + let index_to_read = self.next_index.get(); + let offset = index_to_read * size_of::(); + let write_index = self.storage.get_write_index(); + + // If there is no new data, exit immediately (Hot path) + if offset + size_of::() > write_index { + return 0; + } + + let processed_items = (write_index - offset) / size_of::(); + + let window = self.storage.read_window::(offset, processed_items); + + for item in window { + handler(item); + } + + // 3. Commit state exactly once at the end + self.next_index.set(index_to_read + processed_items); + self.op_count.fetch_add(processed_items as u64, Relaxed); + + processed_items + } + + #[inline(always)] pub fn with_at(&self, at: usize, handler: impl FnOnce(&State) -> R) -> Option { let offset = at * size_of::(); let write_index = self.storage.get_write_index(); @@ -124,6 +166,7 @@ impl StoreJournalReader { Some(handler(self.storage.read(offset))) } + #[inline(always)] pub fn with_last(&self, handler: impl FnOnce(&State) -> R) -> Option { let write_index = self.storage.get_write_index(); if write_index < size_of::() { @@ -133,18 +176,22 @@ impl StoreJournalReader { Some(handler(self.storage.read(offset))) } + #[inline(always)] pub fn get(&self) -> Option { self.with(|s| *s) } + #[inline(always)] pub fn get_at(&self, at: usize) -> Option { self.with_at(at, |s| *s) } + #[inline(always)] pub fn get_last(&self) -> Option { self.with_last(|s| *s) } + #[inline(always)] pub fn get_window(&self, at: usize) -> Option<&[State]> { let offset = at * size_of::(); let write_index = self.storage.get_write_index(); @@ -152,9 +199,10 @@ impl StoreJournalReader { return None; } - Some(self.storage.read_window::(offset)) + Some(self.storage.read_window_const::(offset)) } + #[inline(always)] pub fn size(&self) -> usize { self.storage.get_write_index() / size_of::() } diff --git a/src/lib.rs b/src/lib.rs index f7c1a4b..6079df0 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,3 +1,8 @@ +//! Roda is an ultra-high-performance, low-latency state computer for real-time analytics and event-driven systems. +//! +//! It enables building deterministic streaming pipelines with cache-friendly dataflows, +//! wait-free reads, and explicit memory bounds. + mod components; mod engine; mod journal_store; diff --git a/src/measure/e2e_latency_measurer.rs b/src/measure/e2e_latency_measurer.rs new file mode 100644 index 0000000..d944ac3 --- /dev/null +++ b/src/measure/e2e_latency_measurer.rs @@ -0,0 +1,41 @@ +//! End-to-end latency measurer built on top of `LatencyMeasurer`. +//! +//! It provides a zero-allocation tracker based on a monotonic start time, +//! suitable for measuring cross-stage latencies. +use crate::measure::LatencyMeasurer; +use std::sync::LazyLock; +use std::time::{Duration, Instant}; + +/// Monotonic start time used to compute relative nanoseconds. +pub static START_TIME: LazyLock = LazyLock::new(Instant::now); + +/// Measures end-to-end latencies between `add_tracker` and `measure` calls. +pub struct E2ELatencyMeasurer { + pub measurer: LatencyMeasurer, +} + +impl E2ELatencyMeasurer { + /// Creates a new measurer with the given sampling rate. + pub fn new(sample_size: u64) -> Self { + E2ELatencyMeasurer { + measurer: LatencyMeasurer::new(sample_size), + } + } + + /// Returns nanoseconds elapsed since process start. + #[inline(always)] + pub fn nanos_since_start() -> u64 { + START_TIME.elapsed().as_nanos() as u64 + } + + /// Starts a latency measurement and returns a tracker token. + pub fn add_tracker(&self) -> u64 { + Self::nanos_since_start() + } + + /// Completes the measurement using the given tracker token. + pub fn measure(&mut self, tracker: u64) { + let nanos = Self::nanos_since_start() - tracker; + self.measurer.measure(Duration::from_nanos(nanos)); + } +} diff --git a/src/measure/latency_measurer.rs b/src/measure/latency_measurer.rs index bcc67fb..0527da2 100644 --- a/src/measure/latency_measurer.rs +++ b/src/measure/latency_measurer.rs @@ -1,16 +1,26 @@ use hdrhistogram::Histogram; use std::time::{Duration, Instant}; +/// Statistics for latency measurements. #[derive(Debug, Clone, Default)] pub struct LatencyStats { + /// Total number of samples. pub count: u64, + /// Minimum latency in nanoseconds. pub min: u64, + /// Maximum latency in nanoseconds. pub max: u64, + /// Mean latency in nanoseconds. pub mean: f64, + /// 50th percentile (median) latency in nanoseconds. pub p50: u64, + /// 90th percentile latency in nanoseconds. pub p90: u64, + /// 99th percentile latency in nanoseconds. pub p99: u64, + /// 99.9th percentile latency in nanoseconds. pub p999: u64, + /// 99.99th percentile latency in nanoseconds. pub p9999: u64, } @@ -27,7 +37,9 @@ impl Drop for LatencyMeasurerGuard<'_> { } } -/// A latency measurer that uses hdrhistogram. +/// A high-precision latency measurer using HdrHistogram. +/// +/// It supports sampling to minimize overhead in high-throughput systems. pub struct LatencyMeasurer { histogram: Histogram, sum: u64, @@ -61,11 +73,10 @@ impl LatencyMeasurer { } fn measure_local(&mut self, duration: Duration) { - let count = self.sample_rate; let nanos = duration.as_nanos() as u64; let nanos = nanos.clamp(1, 1_000_000_000_000); - self.histogram.record_n(nanos, count).unwrap(); + self.histogram.record(nanos).unwrap(); self.sum += nanos; } @@ -138,42 +149,13 @@ impl LatencyMeasurer { fn format_duration(nanos: f64) -> String { if nanos < 1000.0 { - if nanos == nanos.floor() { - format!("{:.0}ns", nanos) - } else { - format!("{:.1}ns", nanos) - } + format!("{:.1}ns", nanos) } else if nanos < 1_000_000.0 { - let val = nanos / 1000.0; - if val == val.floor() { - format!("{:.0}us", val) - } else { - format!("{:.1}us", val) - } + format!("{:.1}us", nanos / 1000.0) } else if nanos < 1_000_000_000.0 { - let val = nanos / 1_000_000.0; - if val == val.floor() { - format!("{:.0}ms", val) - } else { - let s = format!("{:.2}ms", val); - if s.ends_with("0ms") { - format!("{:.1}ms", val) - } else { - s - } - } + format!("{:.1}ms", nanos / 1_000_000.0) } else { - let val = nanos / 1_000_000_000.0; - if val == val.floor() { - format!("{:.0}s", val) - } else { - let s = format!("{:.2}s", val); - if s.ends_with("0s") { - format!("{:.1}s", val) - } else { - s - } - } + format!("{:.2}s", nanos / 1_000_000_000.0) } } diff --git a/src/measure/mod.rs b/src/measure/mod.rs index 7dfaab5..ce4746d 100644 --- a/src/measure/mod.rs +++ b/src/measure/mod.rs @@ -1,2 +1,5 @@ +mod e2e_latency_measurer; pub mod latency_measurer; + +pub use e2e_latency_measurer::E2ELatencyMeasurer; pub use latency_measurer::{LatencyMeasurer, LatencyStats}; diff --git a/src/op_counter.rs b/src/op_counter.rs index e2ecb72..b79fc98 100644 --- a/src/op_counter.rs +++ b/src/op_counter.rs @@ -1,17 +1,20 @@ use std::sync::atomic::AtomicU64; use std::sync::{Arc, Mutex}; +/// A shared counter for tracking operations across multiple workers. pub struct OpCounter { counters: Mutex>>, } impl OpCounter { + /// Creates a new `OpCounter`. pub fn new() -> Arc { Arc::new(Self { counters: Mutex::new(vec![]), }) } + /// Returns the sum of all individual counters. pub fn total_op_count(&self) -> u64 { self.counters .lock() @@ -21,6 +24,7 @@ impl OpCounter { .sum() } + /// Creates and registers a new individual counter. pub fn new_counter(&self) -> Arc { let counter = Arc::new(AtomicU64::new(0)); diff --git a/src/pipe/dedup_by.rs b/src/pipe/dedup_by.rs index 979657d..7d9acee 100644 --- a/src/pipe/dedup_by.rs +++ b/src/pipe/dedup_by.rs @@ -1,38 +1,77 @@ +use crate::stage::{OutputCollector, Stage}; +use bytemuck::Pod; use std::collections::HashMap; +use std::marker::PhantomData; /// Only emits the event if the value associated with the key has changed. -pub fn dedup_by(mut key_fn: impl FnMut(&T) -> K) -> impl FnMut(T) -> Option +pub struct DedupBy { + key_fn: F, + last_values: HashMap, + _phantom: PhantomData, +} + +impl DedupBy where K: std::hash::Hash + Eq, - T: bytemuck::Pod + Send + Copy + PartialEq, + T: Pod + PartialEq, + F: FnMut(&T) -> K, { - let mut last_values: HashMap = HashMap::new(); - move |curr| { - let key = key_fn(&curr); - let prev = last_values.get(&key); - - if prev == Some(&curr) { - // Value hasn't changed; suppress the event - return None; + pub fn new(key_fn: F) -> Self { + Self { + key_fn, + last_values: HashMap::new(), + _phantom: PhantomData, } + } +} - // Value changed or is new; update cache and emit - last_values.insert(key, curr); - Some(curr) +impl Stage for DedupBy +where + K: std::hash::Hash + Eq + Send, + T: Pod + PartialEq + Send, + F: FnMut(&T) -> K + Send, +{ + #[inline(always)] + fn process(&mut self, curr: &T, collector: &mut C) + where + C: OutputCollector, + { + let key = (self.key_fn)(curr); + let prev = self.last_values.get(&key); + + if prev == Some(curr) { + return; + } + + self.last_values.insert(key, *curr); + collector.push(curr); } } +pub fn dedup_by( + key_fn: impl FnMut(&T) -> K + Send, +) -> DedupBy K + Send> +where + K: std::hash::Hash + Eq, + T: Pod + PartialEq, +{ + DedupBy::new(key_fn) +} + #[cfg(test)] mod dedup_tests { use super::*; #[test] fn test_dedup_logic() { - let mut pipe = dedup_by(|_: &i32| 0); // Use a constant key for global consecutive dedup + let mut pipe = dedup_by(|_: &i32| 0); + let mut out = Vec::new(); + + pipe.process(&10, &mut |x: &i32| out.push(*x)); + pipe.process(&10, &mut |x: &i32| out.push(*x)); + pipe.process(&20, &mut |x: &i32| out.push(*x)); + pipe.process(&10, &mut |x: &i32| out.push(*x)); - assert_eq!(pipe(10), Some(10)); // First time: pass - assert_eq!(pipe(10), None); // Same value: drop - assert_eq!(pipe(20), Some(20)); // New value: pass - assert_eq!(pipe(10), Some(10)); // Changed back: pass + assert_eq!(out, vec![10, 20, 10]); } } diff --git a/src/pipe/delta.rs b/src/pipe/delta.rs index 2c22246..b42713b 100644 --- a/src/pipe/delta.rs +++ b/src/pipe/delta.rs @@ -1,24 +1,71 @@ +use crate::stage::{OutputCollector, Stage}; +use bytemuck::Pod; use std::collections::HashMap; +use std::marker::PhantomData; -/// Compares current item with the previous item of the same key. -pub fn delta( - mut key_fn: impl FnMut(&T) -> K, - mut logic: impl FnMut(T, Option) -> Option, -) -> impl FnMut(T) -> Option +/// Compares the current item with the previous item associated with the same key. +/// +/// This stage is useful for calculating changes or deltas between events in a stream. +pub struct Delta { + key_fn: F, + logic: L, + last_values: HashMap, + _phantom: PhantomData<(T, Out)>, +} + +impl Delta where K: std::hash::Hash + Eq, - T: bytemuck::Pod + Send + Copy, - Out: bytemuck::Pod + Send, + T: Pod, + Out: Pod, + F: FnMut(&T) -> K, + L: FnMut(&T, Option) -> Option, +{ + pub fn new(key_fn: F, logic: L) -> Self { + Self { + key_fn, + logic, + last_values: HashMap::new(), + _phantom: PhantomData, + } + } +} + +impl Stage for Delta +where + K: std::hash::Hash + Eq + Send, + T: Pod + Send, + Out: Pod + Send, + F: FnMut(&T) -> K + Send, + L: FnMut(&T, Option) -> Option + Send, { - let mut last_values: HashMap = HashMap::new(); - move |curr| { - let key = key_fn(&curr); - let prev = last_values.get(&key).copied(); - last_values.insert(key, curr); - logic(curr, prev) + #[inline(always)] + fn process(&mut self, curr: &T, collector: &mut C) + where + C: OutputCollector, + { + let key = (self.key_fn)(curr); + let prev = self.last_values.get(&key).copied(); + self.last_values.insert(key, *curr); + if let Some(out) = (self.logic)(curr, prev) { + collector.push(&out); + } } } +#[allow(clippy::type_complexity)] +pub fn delta( + key_fn: impl FnMut(&T) -> K + Send, + logic: impl FnMut(&T, Option) -> Option + Send, +) -> Delta K + Send, impl FnMut(&T, Option) -> Option + Send> +where + K: std::hash::Hash + Eq, + T: Pod, + Out: Pod, +{ + Delta::new(key_fn, logic) +} + #[repr(C)] #[derive(Copy, Clone, bytemuck::Pod, bytemuck::Zeroable, Debug, PartialEq)] struct Metric { @@ -28,7 +75,6 @@ struct Metric { #[test] fn test_delta_logic() { - // Return u8 (1 for alert, 0 for none) to satisfy Pod let mut pipe = delta( |m: &Metric| m.id, |curr, prev| match prev { @@ -36,10 +82,13 @@ fn test_delta_logic() { _ => Some(0u8), }, ); + let mut out = Vec::new(); let m1 = Metric { id: 1, val: 10.0 }; let m2 = Metric { id: 1, val: 17.0 }; - assert_eq!(pipe(m1), Some(0u8)); - assert_eq!(pipe(m2), Some(1u8)); // Alert triggered + pipe.process(&m1, &mut |x: &u8| out.push(*x)); + pipe.process(&m2, &mut |x: &u8| out.push(*x)); + + assert_eq!(out, vec![0u8, 1u8]); } diff --git a/src/pipe/filter.rs b/src/pipe/filter.rs index e8087da..0e45367 100644 --- a/src/pipe/filter.rs +++ b/src/pipe/filter.rs @@ -1,13 +1,42 @@ -/// Only passes items that satisfy the predicate. -pub fn filter(mut predicate: impl FnMut(&T) -> bool) -> impl FnMut(T) -> Option -where - T: bytemuck::Pod + Send, -{ - move |item| { - if predicate(&item) { Some(item) } else { None } +use crate::stage::{OutputCollector, Stage}; +use bytemuck::Pod; +use std::marker::PhantomData; + +/// Filters items based on a predicate. +/// +/// Only items for which the predicate returns `true` are passed to the next stage. +pub struct Filter { + predicate: F, + _phantom: PhantomData, +} + +impl bool> Filter { + pub fn new(predicate: F) -> Self { + Self { + predicate, + _phantom: PhantomData, + } } } +impl bool> Stage for Filter { + #[inline(always)] + fn process(&mut self, data: &T, collector: &mut C) + where + C: OutputCollector, + { + if (self.predicate)(data) { + collector.push(data); + } + } +} + +pub fn filter( + predicate: impl FnMut(&T) -> bool, +) -> Filter bool> { + Filter::new(predicate) +} + #[cfg(test)] mod filter_tests { use super::*; @@ -15,8 +44,11 @@ mod filter_tests { #[test] fn test_filter_logic() { let mut pipe = filter(|x: &i32| *x > 0); + let mut out = Vec::new(); + + pipe.process(&10, &mut |x: &i32| out.push(*x)); + pipe.process(&-5, &mut |x: &i32| out.push(*x)); - assert_eq!(pipe(10), Some(10)); - assert_eq!(pipe(-5), None); + assert_eq!(out, vec![10]); } } diff --git a/src/pipe/inspect.rs b/src/pipe/inspect.rs index 8850612..aae372c 100644 --- a/src/pipe/inspect.rs +++ b/src/pipe/inspect.rs @@ -1,14 +1,38 @@ /// Passes the item through while performing a side effect. -pub fn inspect(mut f: impl FnMut(&T)) -> impl FnMut(T) -> Option -where - T: bytemuck::Pod + Send, -{ - move |item| { - f(&item); - Some(item) +use crate::stage::{OutputCollector, Stage}; +use bytemuck::Pod; +use std::marker::PhantomData; + +/// Passes the item through while performing a side effect. +pub struct Inspect { + f: F, + _phantom: PhantomData, +} + +impl Inspect { + pub fn new(f: F) -> Self { + Self { + f, + _phantom: PhantomData, + } } } +impl Stage for Inspect { + #[inline(always)] + fn process(&mut self, data: &T, collector: &mut C) + where + C: OutputCollector, + { + (self.f)(data); + collector.push(data); + } +} + +pub fn inspect(f: impl FnMut(&T)) -> Inspect { + Inspect::new(f) +} + #[cfg(test)] mod tests { use super::*; @@ -18,13 +42,15 @@ mod tests { #[test] fn test_inspect_logic() { let count = Arc::new(AtomicUsize::new(0)); - let mut pipe = inspect(|_x: &u32| { - count.fetch_add(1, Ordering::Relaxed); + let count_inner = count.clone(); + let mut pipe = inspect(move |_x: &u32| { + count_inner.fetch_add(1, Ordering::Relaxed); }); - let res = pipe(42); + let mut out = Vec::new(); + pipe.process(&42u32, &mut |x: &u32| out.push(*x)); - assert_eq!(res, Some(42)); + assert_eq!(out, vec![42]); assert_eq!(count.load(Ordering::Relaxed), 1); } } diff --git a/src/pipe/latency.rs b/src/pipe/latency.rs index ab6fe00..275e337 100644 --- a/src/pipe/latency.rs +++ b/src/pipe/latency.rs @@ -44,7 +44,7 @@ where S: Stage, { #[inline(always)] - fn process(&mut self, data: In, collector: &mut C) + fn process(&mut self, data: &In, collector: &mut C) where C: OutputCollector, { @@ -81,24 +81,24 @@ mod tests { #[test] fn test_latency_logic() { - let mut pipe = latency("test", 2, 1, |x: u32| { + let mut pipe = latency("test", 2, 1, |x: &u32| { thread::sleep(Duration::from_millis(10)); - Some(x as u64) + Some(*x as u64) }); let mut out = Vec::new(); // Process 1st item { - let mut collector = |x: u64| out.push(x); - pipe.process(1u32, &mut collector); + let mut collector = |x: &u64| out.push(*x); + pipe.process(&1u32, &mut collector); } assert_eq!(out, vec![1]); // Process 2nd item - should trigger print { - let mut collector = |x: u64| out.push(x); - pipe.process(2u32, &mut collector); + let mut collector = |x: &u64| out.push(*x); + pipe.process(&2u32, &mut collector); } assert_eq!(out, vec![1, 2]); diff --git a/src/pipe/map.rs b/src/pipe/map.rs index 0fa5799..23ed1eb 100644 --- a/src/pipe/map.rs +++ b/src/pipe/map.rs @@ -1,10 +1,39 @@ +use crate::stage::{OutputCollector, Stage}; +use bytemuck::Pod; +use std::marker::PhantomData; + /// Transforms an item from one type to another. -pub fn map(mut f: impl FnMut(In) -> Out) -> impl FnMut(In) -> Option +pub struct Map { + f: F, + _phantom: PhantomData<(In, Out)>, +} + +impl Out> Map { + pub fn new(f: F) -> Self { + Self { + f, + _phantom: PhantomData, + } + } +} + +impl Out> Stage for Map { + #[inline(always)] + fn process(&mut self, data: &In, collector: &mut C) + where + C: OutputCollector, + { + let out = (self.f)(data); + collector.push(&out); + } +} + +pub fn map(f: impl FnMut(&In) -> Out) -> Map Out> where - In: bytemuck::Pod + Send, - Out: bytemuck::Pod + Send, + In: Pod + Send, + Out: Pod + Send, { - move |item| Some(f(item)) + Map::new(f) } #[cfg(test)] @@ -14,8 +43,11 @@ mod map_tests { #[test] fn test_map_logic() { // Transform u32 to u64 - let mut pipe = map(|x: u32| x as u64 * 2); + let mut pipe = map(|x: &u32| *x as u64 * 2); + let mut out = Vec::new(); + + pipe.process(&21u32, &mut |x: &u64| out.push(*x)); - assert_eq!(pipe(21), Some(42u64)); + assert_eq!(out, vec![42u64]); } } diff --git a/src/pipe/mod.rs b/src/pipe/mod.rs index fd75330..e4fb2ef 100644 --- a/src/pipe/mod.rs +++ b/src/pipe/mod.rs @@ -1,3 +1,7 @@ +//! Reusable pipeline components for building stream processing stages. +//! +//! Each component implements the `Stage` trait and can be composed using `StageExt`. + mod dedup_by; mod delta; mod filter; @@ -6,7 +10,7 @@ mod latency; mod map; mod progress; mod stateful; -mod windowed; +mod track; pub use dedup_by::dedup_by; pub use delta::delta; @@ -16,4 +20,4 @@ pub use latency::latency; pub use map::map; pub use progress::progress; pub use stateful::stateful; -pub use windowed::windowed; +pub use track::{Tracked, track_prev, track_prev_by_hashmap}; diff --git a/src/pipe/progress.rs b/src/pipe/progress.rs index ee686b1..6e3149e 100644 --- a/src/pipe/progress.rs +++ b/src/pipe/progress.rs @@ -1,40 +1,66 @@ +use crate::stage::{OutputCollector, Stage}; +use bytemuck::Pod; use spdlog::info; +use std::marker::PhantomData; use std::time::Instant; /// A pipe that logs progress information. -pub fn progress(name: impl Into, interval: usize) -> impl FnMut(T) -> Option -where - T: bytemuck::Pod + Send, -{ - assert!(interval > 0, "interval must be greater than 0"); - let name = name.into(); - let mut count: usize = 0; - let mut last_instant = Instant::now(); - let start_instant = last_instant; +pub struct Progress { + name: String, + interval: usize, + count: usize, + last_instant: Instant, + start_instant: Instant, + _phantom: PhantomData, +} - move |item| { - count += 1; - if count.is_multiple_of(interval) { +impl Progress { + pub fn new(name: impl Into, interval: usize) -> Self { + assert!(interval > 0, "interval must be greater than 0"); + let now = Instant::now(); + Self { + name: name.into(), + interval, + count: 0, + last_instant: now, + start_instant: now, + _phantom: PhantomData, + } + } +} + +impl Stage for Progress { + #[inline(always)] + fn process(&mut self, data: &T, collector: &mut C) + where + C: OutputCollector, + { + self.count += 1; + if self.count.is_multiple_of(self.interval) { let now = Instant::now(); - let elapsed = now.duration_since(last_instant); - let total_elapsed = now.duration_since(start_instant); + let elapsed = now.duration_since(self.last_instant); + let total_elapsed = now.duration_since(self.start_instant); - let mps = interval as f64 / elapsed.as_secs_f64(); - let total_mps = count as f64 / total_elapsed.as_secs_f64(); + let mps = self.interval as f64 / elapsed.as_secs_f64(); + let total_mps = self.count as f64 / total_elapsed.as_secs_f64(); info!( - "[{}] Processed {} messages, Rate: {} msg/s, Avg: {} msg/s", - name, - format_count(count as f64), + "[{}] Processed {} msgs, Rate: {}/s, Avg: {}/s", + self.name, + format_count(self.count as f64), format_count(mps), format_count(total_mps) ); - last_instant = now; + self.last_instant = now; } - Some(item) + collector.push(data); } } +pub fn progress(name: impl Into, interval: usize) -> Progress { + Progress::new(name, interval) +} + fn format_count(val: f64) -> String { if val < 1000.0 { if val == val.floor() { @@ -61,46 +87,25 @@ mod tests { #[test] fn test_progress_logic() { - let mut pipe = progress("test", 2); + let mut pipe = progress::("test", 2); + let mut out = Vec::new(); // Process 1st item - let res = pipe(1u32); - assert_eq!(res, Some(1)); + pipe.process(&1u32, &mut |x: &u32| out.push(*x)); + assert_eq!(out, vec![1]); // Process 2nd item - should trigger print thread::sleep(Duration::from_millis(10)); - let res = pipe(2u32); - assert_eq!(res, Some(2)); + pipe.process(&2u32, &mut |x: &u32| out.push(*x)); + assert_eq!(out, vec![1, 2]); // Process 3rd item - let res = pipe(3u32); - assert_eq!(res, Some(3)); + pipe.process(&3u32, &mut |x: &u32| out.push(*x)); + assert_eq!(out, vec![1, 2, 3]); // Process 4th item - should trigger print thread::sleep(Duration::from_millis(10)); - let res = pipe(4u32); - assert_eq!(res, Some(4)); - } - - #[test] - fn test_progress_no_delay() { - let mut pipe = progress("test_fast", 2); - for i in 0..10 { - pipe(i); - } - } - - #[test] - fn test_format_count() { - assert_eq!(format_count(0.0), "0"); - assert_eq!(format_count(123.0), "123"); - assert_eq!(format_count(123.45), "123.45"); - assert_eq!(format_count(1000.0), "1.00k"); - assert_eq!(format_count(1234.0), "1.23k"); - assert_eq!(format_count(1_000_000.0), "1.00m"); - assert_eq!(format_count(1_234_567.0), "1.23m"); - assert_eq!(format_count(1_000_000_000.0), "1.00b"); - assert_eq!(format_count(1_234_567_890.0), "1.23b"); - assert_eq!(format_count(1_000_000_000_000.0), "1.00t"); + pipe.process(&4u32, &mut |x: &u32| out.push(*x)); + assert_eq!(out, vec![1, 2, 3, 4]); } } diff --git a/src/pipe/stateful.rs b/src/pipe/stateful.rs index a411dc4..90bf06f 100644 --- a/src/pipe/stateful.rs +++ b/src/pipe/stateful.rs @@ -1,27 +1,85 @@ +use crate::stage::{OutputCollector, Stage}; +use bytemuck::Pod; use std::collections::HashMap; +use std::marker::PhantomData; -/// Manages a per-key state for aggregations. -pub fn stateful( - mut key_fn: impl FnMut(&In) -> K, - mut init_fn: impl FnMut(&In) -> Out, - mut fold_fn: impl FnMut(&mut Out, In), -) -> impl FnMut(In) -> Option +/// Maintains per-key state for stateful aggregations or processing. +/// +/// It uses a `HashMap` to store state for each key and applies a folding function +/// to update the state with each incoming item. +pub struct Stateful { + key_fn: KF, + init_fn: IF, + fold_fn: FF, + storage: HashMap, + _phantom: PhantomData, +} + +impl Stateful where K: std::hash::Hash + Eq, - In: bytemuck::Pod + Send, - Out: bytemuck::Pod + Send + Copy, + In: Pod, + Out: Pod, + KF: FnMut(&In) -> K, + IF: FnMut(&In) -> Out, + FF: FnMut(&mut Out, &In), +{ + pub fn new(key_fn: KF, init_fn: IF, fold_fn: FF) -> Self { + Self { + key_fn, + init_fn, + fold_fn, + storage: HashMap::new(), + _phantom: PhantomData, + } + } +} + +impl Stage for Stateful +where + K: std::hash::Hash + Eq + Send, + In: Pod + Send, + Out: Pod + Send, + KF: FnMut(&In) -> K + Send, + IF: FnMut(&In) -> Out + Send, + FF: FnMut(&mut Out, &In) + Send, { - let mut storage: HashMap = HashMap::new(); - move |item| { - let key = key_fn(&item); - let entry = storage + #[inline(always)] + fn process(&mut self, item: &In, collector: &mut C) + where + C: OutputCollector, + { + let key = (self.key_fn)(item); + let entry = self + .storage .entry(key) - .and_modify(|state| fold_fn(state, item)) - .or_insert_with(|| init_fn(&item)); - Some(*entry) + .and_modify(|state| (self.fold_fn)(state, item)) + .or_insert_with(|| (self.init_fn)(item)); + collector.push(entry); } } +#[allow(clippy::type_complexity)] +pub fn stateful( + key_fn: impl FnMut(&In) -> K + Send, + init_fn: impl FnMut(&In) -> Out + Send, + fold_fn: impl FnMut(&mut Out, &In) + Send, +) -> Stateful< + K, + In, + Out, + impl FnMut(&In) -> K + Send, + impl FnMut(&In) -> Out + Send, + impl FnMut(&mut Out, &In) + Send, +> +where + K: std::hash::Hash + Eq, + In: Pod, + Out: Pod, +{ + Stateful::new(key_fn, init_fn, fold_fn) +} + #[repr(C)] #[derive(Debug, Clone, Copy, Default, bytemuck::Pod, bytemuck::Zeroable)] pub struct Message { @@ -35,19 +93,21 @@ mod stateful_tests { #[test] fn test_stateful_logic() { - // Now using our Pod-compliant struct instead of a tuple let mut pipe = stateful( - |item: &Message| item.id, // Key: ID - |item| item.value, // Init: First value - |state, item| *state += item.value, // Fold: Add new value + |item: &Message| item.id, + |item| item.value, + |state, item| *state += item.value, ); + let mut out = Vec::new(); let m1 = Message { id: 1, value: 10 }; let m2 = Message { id: 2, value: 5 }; let m3 = Message { id: 1, value: 20 }; - assert_eq!(pipe(m1), Some(10)); - assert_eq!(pipe(m2), Some(5)); - assert_eq!(pipe(m3), Some(30)); + pipe.process(&m1, &mut |x: &i64| out.push(*x)); + pipe.process(&m2, &mut |x: &i64| out.push(*x)); + pipe.process(&m3, &mut |x: &i64| out.push(*x)); + + assert_eq!(out, vec![10, 5, 30]); } } diff --git a/src/pipe/track.rs b/src/pipe/track.rs new file mode 100644 index 0000000..cc79f20 --- /dev/null +++ b/src/pipe/track.rs @@ -0,0 +1,145 @@ +use crate::stage::{OutputCollector, Stage}; +use bytemuck::{Pod, Zeroable}; +use std::collections::HashMap; +use std::marker::PhantomData; + +/// A struct that holds the current and previous values of a stream. +/// This is used to satisfy the `Pod` constraint while providing tuple-like behavior. +#[repr(C)] +#[derive(Debug, Clone, Copy, Default)] +pub struct Tracked { + pub prev: T, + pub curr: T, + pub has_prev: u8, +} + +unsafe impl Zeroable for Tracked {} +unsafe impl Pod for Tracked {} + +impl Tracked { + /// Returns the previous value as an Option. + pub fn prev(&self) -> Option { + if self.has_prev != 0 { + Some(self.prev) + } else { + None + } + } +} + +pub struct TrackPrevByHashmap { + key_fn: F, + storage: HashMap, + _phantom: PhantomData, +} + +impl TrackPrevByHashmap +where + K: std::hash::Hash + Eq, + T: Pod + Zeroable + Copy, + F: FnMut(&T) -> K, +{ + pub fn new(key_fn: F) -> Self { + Self { + key_fn, + storage: HashMap::new(), + _phantom: PhantomData, + } + } +} + +impl Stage> for TrackPrevByHashmap +where + K: std::hash::Hash + Eq, + T: Pod + Zeroable + Copy + Send, + F: FnMut(&T) -> K + Send, +{ + #[inline(always)] + fn process(&mut self, item: &T, collector: &mut C) + where + C: OutputCollector>, + { + let key = (self.key_fn)(item); + let prev = self.storage.get(&key).copied(); + self.storage.insert(key, *item); + + collector.push(&Tracked { + prev: prev.unwrap_or(T::zeroed()), + curr: *item, + has_prev: if prev.is_some() { 1 } else { 0 }, + }); + } +} + +pub fn track_prev_by_hashmap( + key_fn: impl FnMut(&T) -> K + Send, +) -> TrackPrevByHashmap K + Send> +where + K: std::hash::Hash + Eq, + T: Pod + Zeroable + Copy + Send, +{ + TrackPrevByHashmap::new(key_fn) +} + +pub struct TrackPrev { + last_value: Option, +} + +impl Stage> for TrackPrev { + #[inline(always)] + fn process(&mut self, curr: &T, collector: &mut C) + where + C: OutputCollector>, + { + let prev = self.last_value.replace(*curr); + collector.push(&Tracked { + prev: prev.unwrap_or(T::zeroed()), + curr: *curr, + has_prev: if prev.is_some() { 1 } else { 0 }, + }); + } +} + +pub fn track_prev() -> TrackPrev { + TrackPrev { last_value: None } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_track_prev_by_hashmap() { + let mut pipe = track_prev_by_hashmap(|val: &i32| *val % 2); + let mut out = Vec::new(); + + // Key 0 (even): 2 + pipe.process(&2, &mut |res: &Tracked| out.push(*res)); + assert_eq!(out.last().unwrap().prev(), None); + assert_eq!(out.last().unwrap().curr, 2); + + // Key 1 (odd): 3 + pipe.process(&3, &mut |res: &Tracked| out.push(*res)); + assert_eq!(out.last().unwrap().prev(), None); + assert_eq!(out.last().unwrap().curr, 3); + + // Key 0 (even): 4, prev was 2 + pipe.process(&4, &mut |res: &Tracked| out.push(*res)); + assert_eq!(out.last().unwrap().prev(), Some(2)); + assert_eq!(out.last().unwrap().curr, 4); + } + + #[test] + fn test_track_prev() { + let mut pipe = track_prev::(); + let mut out = Vec::new(); + + pipe.process(&10, &mut |res: &Tracked| out.push(*res)); + assert_eq!(out.last().unwrap().prev(), None); + assert_eq!(out.last().unwrap().curr, 10); + + pipe.process(&20, &mut |res: &Tracked| out.push(*res)); + assert_eq!(out.last().unwrap().prev(), Some(10)); + assert_eq!(out.last().unwrap().curr, 20); + } +} diff --git a/src/pipe/windowed.rs b/src/pipe/windowed.rs deleted file mode 100644 index f41efeb..0000000 --- a/src/pipe/windowed.rs +++ /dev/null @@ -1,27 +0,0 @@ -/// Aligns a timestamp to the start of a fixed-duration window. -#[inline(always)] -pub fn windowed(timestamp: u64, window_size: u64) -> u64 { - if window_size == 0 { - return timestamp; - } - (timestamp / window_size) * window_size -} - -#[cfg(test)] -mod window_tests { - use super::*; - - #[test] - fn test_window_alignment() { - let t1 = 150_200; - let t2 = 199_999; - let window = 100_000; - - // Both should fall into the 100,000 bucket - assert_eq!(windowed(t1, window), 100_000); - assert_eq!(windowed(t2, window), 100_000); - - // Next bucket - assert_eq!(windowed(200_001, window), 200_000); - } -} diff --git a/src/slot_store.rs b/src/slot_store.rs index fe56e27..4142430 100644 --- a/src/slot_store.rs +++ b/src/slot_store.rs @@ -6,19 +6,27 @@ use bytemuck::Pod; use std::path::PathBuf; use std::sync::Arc; +/// A random-access store for slot-based data. +/// +/// It supports consistent reads without blocking writers using a versioning scheme. pub struct SlotStore { storage: SlotMmap, pub op_counter: Arc, num_slots: usize, } +/// A reader for a `SlotStore` that provides snapshot reads. pub struct SlotStoreReader { storage: SlotMmap, } +/// Configuration options for a `SlotStore`. pub struct SlotStoreOptions { + /// The name of the store, used for the filename. pub name: &'static str, + /// The number of slots in the store. pub size: usize, + /// Whether to keep the store only in memory. pub in_memory: bool, } @@ -71,7 +79,6 @@ impl Settable for SlotStore { impl SlotStoreReader { /// Performs a consistent snapshot read with retry logic pub fn with_at(&self, at: usize, handler: impl FnOnce(&State) -> R) -> Option { - // Using 100 retries to ensure we get a consistent L5 snapshot self.storage .read_snapshot_with_retry(at, 100) .map(|state| handler(&state)) diff --git a/src/stage.rs b/src/stage.rs index 80f611d..4db1a10 100644 --- a/src/stage.rs +++ b/src/stage.rs @@ -1,42 +1,81 @@ use bytemuck::Pod; use std::marker::PhantomData; +/// Represents a processing stage in the pipeline. +/// +/// A stage takes an input of type `In` and can produce zero or more outputs of type `Out`. pub trait Stage { - fn process(&mut self, data: In, collector: &mut C) + /// Processes a single input item. + fn process(&mut self, data: &In, collector: &mut C) where C: OutputCollector; } +/// A collector for output items produced by a stage. pub trait OutputCollector { - fn push(&mut self, item: T); + /// Collects a single output item. + fn push(&mut self, item: &T); } impl OutputCollector for F where - F: FnMut(T), + F: FnMut(&T), { #[inline(always)] - fn push(&mut self, item: T) { - (self)(item); + fn push(&mut self, item: &T) { + self(item); } } -impl Stage for F +pub trait StageOutput { + fn push_to>(self, collector: &mut C); +} + +impl StageOutput for T { + #[inline(always)] + fn push_to>(self, collector: &mut C) { + collector.push(&self); + } +} + +impl StageOutput for &T { + #[inline(always)] + fn push_to>(self, collector: &mut C) { + collector.push(self); + } +} + +impl StageOutput for Option { + #[inline(always)] + fn push_to>(self, collector: &mut C) { + if let Some(r) = self { + collector.push(&r); + } + } +} + +impl StageOutput for Option<&T> { + #[inline(always)] + fn push_to>(self, collector: &mut C) { + if let Some(r) = self { + collector.push(r); + } + } +} + +impl Stage for F where - F: FnMut(In) -> Option, In: Pod + Send, Out: Pod + Send, + F: FnMut(&In) -> R, + R: StageOutput, { #[inline(always)] - fn process(&mut self, data: In, collector: &mut C) + fn process(&mut self, data: &In, collector: &mut C) where C: OutputCollector, { - // Execute the closure and pass the result downstream - let out = (self)(data); - if let Some(out) = out { - collector.push(out); - } + (self)(data).push_to(collector); } } @@ -46,6 +85,25 @@ pub struct Pipeline { _phantom: PhantomData<(In, Mid, Out)>, } +pub struct PipelineCollector<'a, S, C, T> { + stage: &'a mut S, + collector: &'a mut C, + _phantom: PhantomData, +} + +impl<'a, S, C, In, Out> OutputCollector for PipelineCollector<'a, S, C, Out> +where + In: Pod + Send, + Out: Pod + Send, + S: Stage, + C: OutputCollector, +{ + #[inline(always)] + fn push(&mut self, item: &In) { + self.stage.process(item, self.collector); + } +} + impl Stage for Pipeline where In: Pod + Send, @@ -55,17 +113,22 @@ where S2: Stage, { #[inline(always)] - fn process(&mut self, data: In, collector: &mut C) + fn process(&mut self, data: &In, collector: &mut C) where C: OutputCollector, { - self.s1.process(data, &mut |mid| { - self.s2.process(mid, collector); - }); + let mut pc = PipelineCollector { + stage: &mut self.s2, + collector, + _phantom: PhantomData, + }; + self.s1.process(data, &mut pc); } } +/// Extension trait for composing stages into pipelines. pub trait StageExt: Stage { + /// Pipes the output of this stage into another stage. #[inline(always)] fn pipe>(self, s2: S2) -> Pipeline where @@ -94,10 +157,10 @@ mod tests { #[test] fn test_pipe_closures() { - let mut p = pipe![|x: u32| Some(x as u64), |x: u64| Some(x as u8),]; + let mut p = pipe![|x: &u32| Some(*x as u64), |x: &u64| Some(*x as u8),]; let mut out = Vec::new(); - p.process(100u32, &mut |x: u8| out.push(x)); + p.process(&100u32, &mut |x: &u8| out.push(*x)); assert_eq!(out, vec![100u8]); } @@ -105,7 +168,7 @@ mod tests { fn test_pipe_one_to_many() { struct Duplicate; impl Stage for Duplicate { - fn process(&mut self, data: u64, collector: &mut C) + fn process(&mut self, data: &u64, collector: &mut C) where C: OutputCollector, { @@ -114,10 +177,12 @@ mod tests { } } - let mut p = pipe![|x: u32| Some(x as u64), Duplicate, |x: u64| Some(x as u8),]; + let mut p = pipe![|x: &u32| Some(*x as u64), Duplicate, |x: &u64| Some( + *x as u8 + ),]; let mut out = Vec::new(); - p.process(10u32, &mut |x: u8| out.push(x)); + p.process(&10u32, &mut |x: &u8| out.push(*x)); assert_eq!(out, vec![10u8, 10u8]); } } diff --git a/src/stage_engine.rs b/src/stage_engine.rs index bf5d4a8..a7588e3 100644 --- a/src/stage_engine.rs +++ b/src/stage_engine.rs @@ -16,6 +16,10 @@ pub struct StageEngine { } impl StageEngine { + /// Enables or disables core pinning for worker threads. + pub fn set_pin_cores(&mut self, enabled: bool) { + self.engine.set_pin_cores(enabled); + } /// Adds a new stage to the pipeline. /// This method consumes the current engine and returns a new one with the updated output type. /// A new thread is spawned to run the provided stage. @@ -56,16 +60,9 @@ impl StageEngine { let next_reader = next_store.reader(); self.engine.run_worker(move || { - if reader.next() { - if let Some(data) = reader.get() { - stage.process(data, &mut |out: NextOut| { - next_store.append(out); - }); - } - } else { - // Yield to prevent 100% CPU usage when no data is available - std::thread::yield_now(); - } + reader.handle_remaining(|data| { + stage.process(data, &mut |out: &NextOut| next_store.append(out)); + }) > 0 }); StageEngine { @@ -78,13 +75,13 @@ impl StageEngine { } /// Sends data into the start of the pipeline. - /// Requires &mut self because JournalStore::append requires it (Single-Writer). - pub fn send(&mut self, data: In) { + pub fn send(&mut self, data: &In) { self.input_store.append(data); } /// Receives data from the end of the pipeline. - /// This will block/poll until data is available. + /// + /// This will block until data is available or a worker panics. pub fn receive(&self) -> Option { loop { if let Some(data) = self.try_receive() { @@ -110,10 +107,6 @@ impl StageEngine { self.output_reader.size() } - pub fn enable_latency_stats(&mut self, enabled: bool) { - self.engine.enable_latency_stats(enabled); - } - /// Waits for all workers to finish processing. pub fn await_idle(&self, timeout: Duration) { self.engine.await_idle(timeout); @@ -121,7 +114,7 @@ impl StageEngine { } impl Appendable for StageEngine { - fn append(&mut self, state: In) { + fn append(&mut self, state: &In) { self.send(state); } } @@ -167,10 +160,10 @@ mod tests { #[test] fn test_new_engine_threaded_pipeline() { let mut engine = StageEngine::::new() - .add_stage(|x: u32| Some(x as u64)) - .add_stage(|x: u64| Some(x as u8)); + .add_stage(|x: &u32| Some(*x as u64)) + .add_stage(|x: &u64| Some(*x as u8)); - engine.send(100u32); + engine.send(&100u32); let result = engine.receive(); assert_eq!(result, Some(100u8)); @@ -180,20 +173,20 @@ mod tests { fn test_new_engine_multiple_outputs() { struct Duplicate; impl Stage for Duplicate { - fn process(&mut self, data: u32, collector: &mut C) + fn process(&mut self, data: &u32, collector: &mut C) where C: crate::stage::OutputCollector, { collector.push(data); - collector.push(data + 1); + collector.push(&(data + 1)); } } let mut engine = StageEngine::::new() .add_stage(Duplicate) - .add_stage(|x: u32| Some(x as u64)); + .add_stage(|x: &u32| Some(*x as u64)); - engine.send(10u32); + engine.send(&10u32); assert_eq!(engine.receive(), Some(10u64)); assert_eq!(engine.receive(), Some(11u64)); @@ -201,15 +194,15 @@ mod tests { #[test] fn test_engine_concurrency() { - let mut engine = StageEngine::::new().add_stage(|x: u32| { + let mut engine = StageEngine::::new().add_stage(|x: &u32| { // Simulate some work thread::sleep(Duration::from_millis(10)); - Some(x * 2) + Some(*x * 2) }); - engine.send(1); - engine.send(2); - engine.send(3); + engine.send(&1); + engine.send(&2); + engine.send(&3); assert_eq!(engine.receive(), Some(2)); assert_eq!(engine.receive(), Some(4)); diff --git a/src/storage/journal_mmap.rs b/src/storage/journal_mmap.rs index d71aee1..14ada7a 100644 --- a/src/storage/journal_mmap.rs +++ b/src/storage/journal_mmap.rs @@ -5,6 +5,9 @@ use std::path::PathBuf; use std::sync::Arc; use std::sync::atomic::AtomicUsize; +/// A memory-mapped buffer optimized for sequential, append-only operations. +/// +/// It supports wait-free reads while the writer is appending data. pub(crate) struct JournalMmap { _mmap: Arc, ptr: *mut u8, @@ -60,9 +63,8 @@ impl JournalMmap { // --- Bytemuck Methods --- - /// 1. Read (Immutable) - /// /// Casts bytes at offset to a reference of T. + #[inline(always)] pub(crate) fn read(&self, offset: usize) -> &T { let end = offset + size_of::(); assert!( @@ -72,7 +74,8 @@ impl JournalMmap { bytemuck::from_bytes(&self.slice()[offset..end]) } - pub(crate) fn read_window(&self, offset: usize) -> &[T] { + #[inline(always)] + pub(crate) fn read_window_const(&self, offset: usize) -> &[T] { let end = offset + size_of::() * N; assert!( end <= self.len, @@ -83,6 +86,26 @@ impl JournalMmap { bytemuck::cast_slice(bytes) } + /// Returns a slice of T starting at the given offset. + /// + /// This is more efficient than calling `read` multiple times. + #[inline(always)] + pub(crate) fn read_window(&self, offset: usize, count: usize) -> &[T] { + let end = offset + size_of::() * count; + assert!( + end <= self.len, + "Read crosses buffer boundary - alignment issue?" + ); + let bytes = &self.slice()[offset..end]; + + bytemuck::cast_slice(bytes) + } + + /// Appends an item to the buffer. + /// + /// # Panics + /// Panics if the buffer is full. + #[inline(always)] pub(crate) fn append(&mut self, state: &T) { let current_pos = self.write_index.load(std::sync::atomic::Ordering::Relaxed); let size = size_of::(); @@ -103,23 +126,28 @@ impl JournalMmap { .store(end, std::sync::atomic::Ordering::Release); } + #[inline(always)] fn slice(&self) -> &[u8] { unsafe { std::slice::from_raw_parts(self.ptr, self.len) } } + #[inline(always)] fn slice_mut(&mut self) -> &mut [u8] { assert!(!self.read_only, "Cannot mutate read-only buffer"); unsafe { std::slice::from_raw_parts_mut(self.ptr, self.len) } } + #[inline(always)] pub(crate) fn get_write_index(&self) -> usize { self.write_index.load(std::sync::atomic::Ordering::Acquire) } + #[inline(always)] pub(crate) fn len(&self) -> usize { self.len } + #[inline(always)] pub(crate) fn reader(&self) -> JournalMmap { JournalMmap { _mmap: self._mmap.clone(), @@ -177,7 +205,7 @@ mod tests { journal.append(&2u32); journal.append(&3u32); - let window: &[u32] = journal.read_window::(0); + let window: &[u32] = journal.read_window_const::(0); assert_eq!(window, &[1, 2, 3]); } @@ -202,7 +230,7 @@ mod tests { let mut journal = JournalMmap::new(None, 8).unwrap(); journal.append(&1u32); journal.append(&2u32); - let _: &[u32] = journal.read_window::(0); // Should panic + let _: &[u32] = journal.read_window_const::(0); // Should panic } #[test] diff --git a/src/storage/slot_mmap.rs b/src/storage/slot_mmap.rs index 05b32c1..8b01642 100644 --- a/src/storage/slot_mmap.rs +++ b/src/storage/slot_mmap.rs @@ -6,6 +6,9 @@ use std::path::PathBuf; use std::sync::Arc; use std::sync::atomic::{AtomicU64, Ordering}; +/// A memory-mapped buffer for random-access, slot-based storage. +/// +/// It uses a versioning scheme (SeqLock-like) for consistent reads without blocking the writer. pub struct SlotMmap { _mmap: Arc, ptr: *mut u8, @@ -62,7 +65,7 @@ impl SlotMmap { }) } - /// WRITER: Updates the specific slot by index. + /// WRITER: Updates the specific slot by index using versioning. pub fn write(&mut self, index: usize, state: &T) { assert!(index < self.num_slots); let offset = index * self.slot_size; @@ -88,7 +91,7 @@ impl SlotMmap { } } - /// READER: Snapshot with spin-retry logic. + /// READER: Performs a consistent snapshot read with spin-retry logic. pub fn read_snapshot_with_retry(&self, index: usize, max_retries: usize) -> Option { assert!(index < self.num_slots); let offset = index * self.slot_size; diff --git a/tests/comprehensive_tests.rs b/tests/comprehensive_tests.rs index b6fa46d..98fe559 100644 --- a/tests/comprehensive_tests.rs +++ b/tests/comprehensive_tests.rs @@ -26,7 +26,7 @@ fn test_store_reader_edge_cases() { // 4. get before next() assert_eq!(reader.get(), None); - store.append(42); + store.append(&42); // 5. get before next() but after push assert_eq!(reader.get(), None); @@ -66,7 +66,7 @@ fn test_store_full_capacity() { }); for i in 0..num_items { - store.append(i as u64); + store.append(&(i as u64)); } let reader = store.reader(); @@ -93,8 +93,8 @@ fn test_store_overflow_panic() { in_memory: true, }); - store.append(1); - store.append(2); // Should panic here + store.append(&1); + store.append(&2); // Should panic here } #[test] @@ -143,7 +143,7 @@ fn test_store_concurrent_load() { barrier.wait(); for i in 1..=num_pushes { - store.append(i as u32); + store.append(&(i as u32)); } let mut total_read = 0; diff --git a/tests/journal_tests.rs b/tests/journal_tests.rs index c91427e..8a1c1df 100644 --- a/tests/journal_tests.rs +++ b/tests/journal_tests.rs @@ -11,9 +11,9 @@ fn test_journal_panic_when_full() { in_memory: true, }); - store.append(1); - store.append(2); - store.append(3); // This should panic + store.append(&1); + store.append(&2); + store.append(&3); // This should panic } #[test] @@ -26,8 +26,8 @@ fn test_journal_no_circularity() { }); let reader = store.reader(); - store.append(1); - store.append(2); + store.append(&1); + store.append(&2); assert_eq!(reader.get_at(0), Some(1)); assert_eq!(reader.get_at(1), Some(2)); diff --git a/tests/logic_tests.rs b/tests/logic_tests.rs index e0edb52..5eeabef 100644 --- a/tests/logic_tests.rs +++ b/tests/logic_tests.rs @@ -16,7 +16,7 @@ fn test_reader_next_and_with_logic() { assert!(reader.with(|&x| x).is_none()); // Push one value - store.append(100); + store.append(&100); // next() should now be true assert!(reader.next()); @@ -29,7 +29,7 @@ fn test_reader_next_and_with_logic() { assert_eq!(reader.with(|&x| x), Some(100)); // Push another value - store.append(200); + store.append(&200); // next() should be true assert!(reader.next()); @@ -47,9 +47,9 @@ fn test_reader_get_at_and_last() { }); let reader = store.reader(); - store.append(10); - store.append(20); - store.append(30); + store.append(&10); + store.append(&20); + store.append(&30); assert_eq!(reader.get_at(0), Some(10)); assert_eq!(reader.get_at(1), Some(20)); diff --git a/tests/push_read_tests.rs b/tests/push_read_tests.rs index 2277965..79b92e9 100644 --- a/tests/push_read_tests.rs +++ b/tests/push_read_tests.rs @@ -11,7 +11,7 @@ fn test_push_then_read_single() { }); let reader = store.reader(); - store.append(42); + store.append(&42); let res = reader.get_window::<1>(0).unwrap(); assert_eq!(res[0], 42); @@ -28,7 +28,7 @@ fn test_multiple_push_read_in_order() { let reader = store.reader(); for v in [1u32, 2, 3, 4, 5] { - store.append(v); + store.append(&v); } let res = reader.get_window::<5>(0).unwrap(); @@ -48,10 +48,10 @@ fn test_interleaved_push_and_read() { let reader = store.reader(); // Push values; verify FIFO order via get_window - store.append(10); - store.append(20); - store.append(30); - store.append(40); + store.append(&10); + store.append(&20); + store.append(&30); + store.append(&40); let res = reader.get_window::<4>(0).unwrap(); assert_eq!(res[0], 10); @@ -77,10 +77,10 @@ fn test_stores_are_isolated_by_type() { let u_reader = u_store.reader(); let i_reader = i_store.reader(); - u_store.append(1); - i_store.append(-1); - u_store.append(2); - i_store.append(-2); + u_store.append(&1); + i_store.append(&-1); + u_store.append(&2); + i_store.append(&-2); let u_res = u_reader.get_window::<2>(0).unwrap(); let i_res = i_reader.get_window::<2>(0).unwrap(); @@ -101,10 +101,10 @@ fn test_push_after_partial_reads() { }); let reader = store.reader(); - store.append(100); - store.append(200); - store.append(300); - store.append(400); + store.append(&100); + store.append(&200); + store.append(&300); + store.append(&400); let res = reader.get_window::<4>(0).unwrap(); assert_eq!(res[0], 100); diff --git a/tests/stage_engine_tests.rs b/tests/stage_engine_tests.rs index 373469c..cfc6cb9 100644 --- a/tests/stage_engine_tests.rs +++ b/tests/stage_engine_tests.rs @@ -5,11 +5,11 @@ use std::time::Duration; #[test] fn test_basic_pipeline() { let mut engine = StageEngine::::new() - .add_stage(|x: u32| Some(x + 1)) - .add_stage(|x: u32| Some(x * 2)); + .add_stage(|x: &u32| Some(*x + 1)) + .add_stage(|x: &u32| Some(*x * 2)); - engine.send(10); - engine.send(20); + engine.send(&10); + engine.send(&20); assert_eq!(engine.receive(), Some(22)); // (10 + 1) * 2 assert_eq!(engine.receive(), Some(42)); // (20 + 1) * 2 @@ -17,13 +17,13 @@ fn test_basic_pipeline() { #[test] fn test_none_filtering() { - let mut engine = StageEngine::::new() - .add_stage(|x: u32| if x.is_multiple_of(2) { Some(x) } else { None }); + let mut engine = + StageEngine::::new().add_stage(|x: &u32| x.is_multiple_of(2).then_some(*x)); - engine.send(1); - engine.send(2); - engine.send(3); - engine.send(4); + engine.send(&1); + engine.send(&2); + engine.send(&3); + engine.send(&4); assert_eq!(engine.receive(), Some(2)); assert_eq!(engine.receive(), Some(4)); @@ -33,7 +33,7 @@ fn test_none_filtering() { fn test_multiple_outputs() { struct Duplicate; impl Stage for Duplicate { - fn process(&mut self, data: u32, collector: &mut C) + fn process(&mut self, data: &u32, collector: &mut C) where C: OutputCollector, { @@ -44,7 +44,7 @@ fn test_multiple_outputs() { let mut engine = StageEngine::::new().add_stage(Duplicate); - engine.send(5); + engine.send(&5); assert_eq!(engine.receive(), Some(5)); assert_eq!(engine.receive(), Some(5)); } @@ -53,10 +53,10 @@ fn test_multiple_outputs() { fn test_load_moderate() { let count = 1000; let mut engine = - StageEngine::::with_capacity(count + 1).add_stage(|x: u32| Some(x + 1)); + StageEngine::::with_capacity(count + 1).add_stage(|x: &u32| Some(*x + 1)); for i in 0..count { - engine.send(i as u32); + engine.send(&(i as u32)); } for i in 0..count { @@ -67,19 +67,19 @@ fn test_load_moderate() { #[test] fn test_concurrency_stress() { let mut engine = StageEngine::::new() - .add_stage(|x: u32| { + .add_stage(|x: &u32| { // Some artificial delay to force concurrency thread::sleep(Duration::from_millis(1)); - Some(x) + Some(*x) }) - .add_stage(|x: u32| { + .add_stage(|x: &u32| { thread::sleep(Duration::from_millis(1)); - Some(x) + Some(*x) }); let count = 100; for i in 0..count { - engine.send(i); + engine.send(&i); } for i in 0..count { @@ -90,31 +90,31 @@ fn test_concurrency_stress() { #[test] fn test_complex_pipe_macro() { let mut engine = StageEngine::::new().add_stage(pipe![ - |x: u32| Some(x as u64), - |x: u64| Some(x * 10), - |x: u64| Some(x + 5), + |x: &u32| Some(*x as u64), + |x: &u64| Some(*x * 10), + |x: &u64| Some(*x + 5), ]); - engine.send(1); + engine.send(&1); assert_eq!(engine.receive(), Some(15)); } #[test] fn test_empty_pipeline() { let mut engine = StageEngine::::new(); - engine.send(42); + engine.send(&42); assert_eq!(engine.receive(), Some(42)); } #[test] fn test_await_idle() { - let mut engine = StageEngine::::new().add_stage(|x: u32| { + let mut engine = StageEngine::::new().add_stage(|x: &u32| { // Very short sleep to test await_idle without being too slow thread::sleep(Duration::from_millis(1)); - Some(x) + Some(*x) }); - engine.send(1); + engine.send(&1); // Give it a tiny bit of time to start thread::sleep(Duration::from_millis(5)); engine.await_idle(Duration::from_millis(200)); @@ -131,7 +131,8 @@ fn test_large_pod_struct() { id: u64, } - let mut engine = StageEngine::::new().add_stage(|mut l: Large| { + let mut engine = StageEngine::::new().add_stage(|l: &Large| { + let mut l = *l; l.id += 1; Some(l) }); @@ -140,7 +141,7 @@ fn test_large_pod_struct() { data: [1.0; 16], id: 100, }; - engine.send(input); + engine.send(&input); let expected = Large { data: [1.0; 16], @@ -152,11 +153,11 @@ fn test_large_pod_struct() { #[test] fn test_nested_pipes() { let mut engine = StageEngine::::new().add_stage(pipe![ - |x: u32| Some(x + 1), - pipe![|x: u32| Some(x * 2), |x: u32| Some(x + 1),] + |x: &u32| Some(*x + 1), + pipe![|x: &u32| Some(*x * 2), |x: &u32| Some(*x + 1),] ]); - engine.send(10); + engine.send(&10); // (10 + 1) * 2 + 1 = 23 assert_eq!(engine.receive(), Some(23)); } @@ -168,11 +169,11 @@ fn test_multi_stage_load() { let mut engine = StageEngine::::new(); for _ in 0..stages { - engine = engine.add_stage(|x: u32| Some(x + 1)); + engine = engine.add_stage(|x: &u32| Some(*x + 1)); } for i in 0..items { - engine.send(i); + engine.send(&i); } for i in 0..items { @@ -184,34 +185,39 @@ fn test_multi_stage_load() { #[should_panic(expected = "Store is full")] fn test_input_capacity_limit_panic() { let mut engine = StageEngine::::with_capacity(1); - engine.send(1); - engine.send(2); // Should panic here + engine.send(&1); + engine.send(&2); // Should panic here } #[test] fn test_stage_producing_none() { let mut engine = StageEngine::::new() - .add_stage(|x: u32| if x > 10 { Some(x) } else { None }) - .add_stage(|x: u32| Some(x * 2)); + .add_stage(|x: &u32| if *x > 10 { Some(*x) } else { None }) + .add_stage(|x: &u32| Some(*x * 2)); - engine.send(5); - engine.send(15); + engine.send(&5); + engine.send(&15); - engine.await_idle(Duration::from_millis(100)); - assert_eq!(engine.output_size(), 1); + // Give workers a chance to pick up items + thread::sleep(Duration::from_millis(5)); + engine.await_idle(Duration::from_millis(200)); + + // receive() will wait for the item if it hasn't arrived yet assert_eq!(engine.receive(), Some(30)); + // Once received, we know the processing is done + assert_eq!(engine.output_size(), 1); } #[test] fn test_worker_panic_on_drop() { // This test ensures that if a worker panics, the engine will panic on drop. let result = std::panic::catch_unwind(|| { - let mut engine = StageEngine::::new().add_stage(|_| { + let mut engine = StageEngine::::new().add_stage(|_: &u32| { panic!("Stage panic"); #[allow(unreachable_code)] Some(0u32) }); - engine.send(1); + engine.send(&1); // Wait for worker to panic thread::sleep(Duration::from_millis(50)); // engine is dropped here @@ -226,11 +232,11 @@ fn test_long_pipeline_heavy_load() { let mut engine = StageEngine::::with_capacity(items + 1); for _ in 0..stages { - engine = engine.add_stage(|x: u32| Some(x + 1)); + engine = engine.add_stage(|x: &u32| Some(*x + 1)); } for i in 0..items { - engine.send(i as u32); + engine.send(&(i as u32)); } for i in 0..items { diff --git a/tests/store_no_alloc_tests.rs b/tests/store_no_alloc_tests.rs index da46aa4..a837350 100644 --- a/tests/store_no_alloc_tests.rs +++ b/tests/store_no_alloc_tests.rs @@ -16,7 +16,7 @@ fn test_store_push_no_alloc() { }); assert_no_alloc(|| { - store.append(42); + store.append(&42); }); } @@ -28,7 +28,7 @@ fn test_store_reader_next_no_alloc() { size: 1024, in_memory: true, }); - store.append(42); + store.append(&42); let reader = store.reader(); assert_no_alloc(|| { @@ -44,7 +44,7 @@ fn test_store_reader_get_no_alloc() { size: 1024, in_memory: true, }); - store.append(42); + store.append(&42); let reader = store.reader(); reader.next(); @@ -61,8 +61,8 @@ fn test_store_reader_get_window_no_alloc() { size: 1024, in_memory: true, }); - store.append(42); - store.append(43); + store.append(&42); + store.append(&43); let reader = store.reader(); assert_no_alloc(|| { @@ -80,7 +80,7 @@ fn test_store_reader_get_at_no_alloc() { size: 1024, in_memory: true, }); - store.append(42); + store.append(&42); let reader = store.reader(); assert_no_alloc(|| { @@ -96,7 +96,7 @@ fn test_store_reader_get_last_no_alloc() { size: 1024, in_memory: true, }); - store.append(42); + store.append(&42); let reader = store.reader(); assert_no_alloc(|| {