diff --git a/glidefs/src/block/api.rs b/glidefs/src/block/api.rs index eb30240..8d30e98 100644 --- a/glidefs/src/block/api.rs +++ b/glidefs/src/block/api.rs @@ -1005,18 +1005,20 @@ where ); } - // USER_COPY bounce pool: total acquires, fallbacks, and how - // many worker pools have been initialized. `exhaust_fallbacks` - // > 0 sustained means the pool is undersized for this - // workload's concurrent S3-await pattern and the structural - // RSS bound is being broken via the malloc fallback path. + // USER_COPY bounce pool diagnostics: total acquires, backpressure + // waits (transient exhaustion — pool full, futures park, RSS stays + // bounded), heap fallbacks (init OOM — pool couldn't be mmap'd, so + // the worker serves from elastic heap buffers and the RSS bound is + // broken until it recovers), and how many worker pools are live. use std::sync::atomic::Ordering; use crate::block::ublk::buffer_pool::{ - GLOBAL_ACQUIRES, GLOBAL_BACKPRESSURE_WAITS, GLOBAL_POOLS_INITIALIZED, + GLOBAL_ACQUIRES, GLOBAL_BACKPRESSURE_WAITS, GLOBAL_HEAP_FALLBACKS, + GLOBAL_POOLS_INITIALIZED, }; let acq = GLOBAL_ACQUIRES.load(Ordering::Relaxed); let waits = GLOBAL_BACKPRESSURE_WAITS.load(Ordering::Relaxed); let pools = GLOBAL_POOLS_INITIALIZED.load(Ordering::Relaxed); + let heap_fallbacks = GLOBAL_HEAP_FALLBACKS.load(Ordering::Relaxed); let _ = writeln!( output, "# HELP glidefs_ublk_buffer_pool_acquires_total USER_COPY bounce buffers acquired from per-worker pool" @@ -1038,6 +1040,12 @@ where ); let _ = writeln!(output, "# TYPE glidefs_ublk_buffer_pool_workers_initialized gauge"); let _ = writeln!(output, "glidefs_ublk_buffer_pool_workers_initialized {pools}"); + let _ = writeln!( + output, + "# HELP glidefs_ublk_buffer_pool_heap_fallbacks_total USER_COPY I/Os served from a heap buffer because the worker's pool could not be mmap'd (host OOM at worker init). Daemon stays up but RSS is unbounded for that worker. Sustained growth means a worker is stuck degraded — investigate host memory." + ); + let _ = writeln!(output, "# TYPE glidefs_ublk_buffer_pool_heap_fallbacks_total counter"); + let _ = writeln!(output, "glidefs_ublk_buffer_pool_heap_fallbacks_total {heap_fallbacks}"); } Response::builder() .status(StatusCode::OK) diff --git a/glidefs/src/block/ublk/buffer_pool.rs b/glidefs/src/block/ublk/buffer_pool.rs index 5754e88..5d3085f 100644 --- a/glidefs/src/block/ublk/buffer_pool.rs +++ b/glidefs/src/block/ublk/buffer_pool.rs @@ -13,12 +13,24 @@ //! The size 256-per-worker is sized for *concurrent buffer-holding futures*, //! not active CPU. Each worker hosts many queues (potentially hundreds of //! tags); those tags can be in `handle_io.await` (S3 fetch, write_cache -//! flush) holding a buffer. 256 covers the realistic burst before falling -//! back to the malloc path on pool exhaustion. +//! flush) holding a buffer. 256 covers the realistic burst before the pool +//! applies backpressure. //! -//! When the pool is exhausted, `acquire` returns `None` and the caller falls -//! back to `vec![0u8; len]`. The fallback is correct but defeats the RSS -//! bound — so we instrument it and alert on sustained exhaustion. +//! ## Two ways the bound bends — both stay alive +//! +//! 1. **Transient exhaustion** (all 256 slots in flight): [`WorkerBufferPool::acquire`] +//! parks the future on a FIFO waiter queue until a slot is released. RSS +//! stays flat; throughput drops gracefully. `backpressure_waits` exposes it. +//! +//! 2. **Init OOM** (the pool's one-time `mmap` fails because the host is out +//! of committable memory at worker startup): instead of aborting the +//! daemon, [`worker_pool`] returns `None` and [`acquire_io_buf`] hands back +//! a heap [`IoBuf::Heap`] for that I/O. The worker degrades to elastic +//! `vec`-backed bounce buffers — correct, but it defeats the RSS bound, so +//! `GLOBAL_HEAP_FALLBACKS` counts it for alerting. The init failure is *not* +//! cached: a later I/O retries the `mmap`, so a worker that lost the startup +//! race to a transient memory spike upgrades back to the bounded fast path +//! on its own once the host recovers. use std::cell::{OnceCell, RefCell}; use std::collections::VecDeque; @@ -34,6 +46,10 @@ use std::task::{Context, Poll, Waker}; pub static GLOBAL_ACQUIRES: AtomicU64 = AtomicU64::new(0); pub static GLOBAL_BACKPRESSURE_WAITS: AtomicU64 = AtomicU64::new(0); pub static GLOBAL_POOLS_INITIALIZED: AtomicU64 = AtomicU64::new(0); +/// Count of I/Os served from a heap fallback buffer because the calling +/// worker's pool could not be `mmap`'d (init OOM). Sustained non-zero growth +/// means a worker is stuck in the degraded, unbounded-RSS path — alert on it. +pub static GLOBAL_HEAP_FALLBACKS: AtomicU64 = AtomicU64::new(0); /// Slots per worker. 256 × 128 KB = 32 MB per worker. const POOL_SLOTS: usize = 256; @@ -244,25 +260,102 @@ impl Drop for PoolSlot { thread_local! { static WORKER_POOL: OnceCell> = const { OnceCell::new() }; + /// Tracks whether this worker last observed its pool in the degraded + /// (heap-fallback) state, so we log exactly one line per degrade→recover + /// transition instead of one per I/O while the host is starved. + static POOL_DEGRADED: std::cell::Cell = const { std::cell::Cell::new(false) }; } -/// Get (or lazily initialize) the calling worker thread's buffer pool. -/// Panics if mmap fails — at worker init time that means OOM and the -/// daemon cannot continue. -pub fn worker_pool() -> Rc { +/// Get the calling worker thread's buffer pool, lazily creating it on first +/// use. Returns `None` if the pool's `mmap` fails — at worker init time that +/// means the host is momentarily out of committable memory (`ENOMEM`). +/// +/// A failed init is deliberately **not** cached: the next call retries the +/// `mmap`, so a worker that hit a transient memory spike at startup upgrades +/// back to the bounded fast path on its own once memory frees. The cost while +/// degraded is one failed `mmap` syscall per I/O on this worker — acceptable, +/// since the worker is memory-starved regardless. Callers treat `None` as +/// "use a heap buffer" via [`acquire_io_buf`]; they must never abort on it. +pub fn worker_pool() -> Option> { WORKER_POOL.with(|cell| { - cell.get_or_init(|| { - let pool = Rc::new( - WorkerBufferPool::new() - .expect("worker buffer pool mmap failed — OOM at init"), - ); - GLOBAL_POOLS_INITIALIZED.fetch_add(1, Ordering::Relaxed); - pool - }) - .clone() + if let Some(pool) = cell.get() { + // Recovered (or never degraded). Log the upgrade once. + if POOL_DEGRADED.with(|d| d.replace(false)) { + tracing::info!( + "worker buffer pool recovered — back to bounded bounce buffers" + ); + } + return Some(pool.clone()); + } + match WorkerBufferPool::new() { + Ok(pool) => { + let pool = Rc::new(pool); + // First set wins. On a single thread a reentrant set can't + // race, but `set` is fallible by signature; ignore the Err. + let _ = cell.set(pool.clone()); + GLOBAL_POOLS_INITIALIZED.fetch_add(1, Ordering::Relaxed); + if POOL_DEGRADED.with(|d| d.replace(false)) { + tracing::info!( + "worker buffer pool recovered — back to bounded bounce buffers" + ); + } + Some(pool) + } + Err(e) => { + // Log only on the first I/O that observes degradation, not + // every I/O while starved. + if !POOL_DEGRADED.with(|d| d.replace(true)) { + tracing::error!( + error = %e, + "worker buffer pool mmap failed (OOM) — serving from heap \ + fallback buffers (unbounded RSS) until the host recovers", + ); + } + None + } + } }) } +/// A bounce buffer for one in-flight USER_COPY I/O: either a slot from the +/// per-worker pool (the bounded-RSS fast path) or a heap allocation used when +/// the worker's pool could not be `mmap`'d. The heap variant keeps the daemon +/// serving — degraded RSS, not a crash — and disappears once the worker's +/// pool recovers. Both variants expose the same `&mut [u8]` to the I/O path. +pub enum IoBuf { + Pooled(PoolSlot), + Heap(Vec), +} + +impl IoBuf { + /// The first `len` bytes of the buffer. `len` is bounded by `SLOT_SIZE` + /// upstream (a single ublk I/O never exceeds `IO_BUF_BYTES`). + #[inline] + pub fn as_mut_slice(&mut self, len: usize) -> &mut [u8] { + match self { + IoBuf::Pooled(slot) => slot.as_mut_slice(len), + IoBuf::Heap(v) => &mut v[..len], + } + } +} + +/// Acquire a `len`-byte bounce buffer for the calling worker. +/// +/// Fast path: a slot from this worker's pool, parking on backpressure if all +/// slots are momentarily in flight. Fallback: if the pool can't be created +/// (init OOM, see [`worker_pool`]), a heap buffer — counted in +/// `GLOBAL_HEAP_FALLBACKS` so sustained degradation is alertable. Never panics; +/// always yields a usable buffer. +pub async fn acquire_io_buf(len: usize) -> IoBuf { + match worker_pool() { + Some(pool) => IoBuf::Pooled(pool.acquire().await), + None => { + GLOBAL_HEAP_FALLBACKS.fetch_add(1, Ordering::Relaxed); + IoBuf::Heap(vec![0u8; len]) + } + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/glidefs/src/block/ublk/device.rs b/glidefs/src/block/ublk/device.rs index cb7aa6c..439c626 100644 --- a/glidefs/src/block/ublk/device.rs +++ b/glidefs/src/block/ublk/device.rs @@ -2095,7 +2095,6 @@ async fn io_task_user_copy( ) -> Result<(), UblkError> { let cdev_fd = q.dev.tgt.fds[0]; let qid = q.get_qid(); - let pool = super::buffer_pool::worker_pool(); // Initial fetch — no buffer attached, empty slice. q.submit_io_prep_cmd(tag, BufDesc::Slice(&[]), 0, None).await?; @@ -2120,14 +2119,16 @@ async fn io_task_user_copy( Box::pin(dispatch_cold(op, offset, length, fua, handler)).await } sys::UBLK_IO_OP_READ | sys::UBLK_IO_OP_WRITE => { - // Async-backpressure acquire: if pool is exhausted, this - // future parks until a slot is released. Pool size is a - // *true* ceiling on total bounce RSS — no malloc fallback, - // no possibility of breaking the structural bound under - // load. `backpressure_waits` metric exposes how often we - // parked, so pool undersizing is observable. - let mut slot = pool.acquire().await; - let buf: &mut [u8] = slot.as_mut_slice(length as usize); + // Acquire a bounce buffer for this I/O. Normal path: a slot + // from this worker's pool, parking on backpressure if all + // slots are momentarily in flight — pool size is a *true* + // ceiling on bounce RSS, and `backpressure_waits` exposes + // undersizing. Degraded path: if the pool couldn't be mmap'd + // (host OOM at worker init), this hands back a heap buffer so + // the daemon keeps serving instead of aborting; the worker + // upgrades back to the pool once memory recovers. + let mut iobuf = super::buffer_pool::acquire_io_buf(length as usize).await; + let buf: &mut [u8] = iobuf.as_mut_slice(length as usize); if op == sys::UBLK_IO_OP_WRITE { // Copy WRITE data out of the kernel cmd buffer into ours. @@ -2176,10 +2177,11 @@ async fn io_task_user_copy( res } } - // Slot drops here, releasing back to pool before the - // commit-and-fetch await — keeps pool buffers free for any - // other tag that wakes up first and triggers the FIFO - // waker for the oldest backpressure-parked future. + // `iobuf` drops here: a pool slot releases back to the pool + // before the commit-and-fetch await — keeping buffers free + // for another tag that wakes first and triggers the FIFO + // waker for the oldest backpressure-parked future — while a + // heap fallback buffer is simply freed. } _ => -libc::EINVAL, };