Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 14 additions & 6 deletions glidefs/src/block/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1005,18 +1005,20 @@ where
);
}

// USER_COPY bounce pool: total acquires, fallbacks, and how
// many worker pools have been initialized. `exhaust_fallbacks`
// > 0 sustained means the pool is undersized for this
// workload's concurrent S3-await pattern and the structural
// RSS bound is being broken via the malloc fallback path.
// USER_COPY bounce pool diagnostics: total acquires, backpressure
// waits (transient exhaustion — pool full, futures park, RSS stays
// bounded), heap fallbacks (init OOM — pool couldn't be mmap'd, so
// the worker serves from elastic heap buffers and the RSS bound is
// broken until it recovers), and how many worker pools are live.
use std::sync::atomic::Ordering;
use crate::block::ublk::buffer_pool::{
GLOBAL_ACQUIRES, GLOBAL_BACKPRESSURE_WAITS, GLOBAL_POOLS_INITIALIZED,
GLOBAL_ACQUIRES, GLOBAL_BACKPRESSURE_WAITS, GLOBAL_HEAP_FALLBACKS,
GLOBAL_POOLS_INITIALIZED,
};
let acq = GLOBAL_ACQUIRES.load(Ordering::Relaxed);
let waits = GLOBAL_BACKPRESSURE_WAITS.load(Ordering::Relaxed);
let pools = GLOBAL_POOLS_INITIALIZED.load(Ordering::Relaxed);
let heap_fallbacks = GLOBAL_HEAP_FALLBACKS.load(Ordering::Relaxed);
let _ = writeln!(
output,
"# HELP glidefs_ublk_buffer_pool_acquires_total USER_COPY bounce buffers acquired from per-worker pool"
Expand All @@ -1038,6 +1040,12 @@ where
);
let _ = writeln!(output, "# TYPE glidefs_ublk_buffer_pool_workers_initialized gauge");
let _ = writeln!(output, "glidefs_ublk_buffer_pool_workers_initialized {pools}");
let _ = writeln!(
output,
"# HELP glidefs_ublk_buffer_pool_heap_fallbacks_total USER_COPY I/Os served from a heap buffer because the worker's pool could not be mmap'd (host OOM at worker init). Daemon stays up but RSS is unbounded for that worker. Sustained growth means a worker is stuck degraded — investigate host memory."
);
let _ = writeln!(output, "# TYPE glidefs_ublk_buffer_pool_heap_fallbacks_total counter");
let _ = writeln!(output, "glidefs_ublk_buffer_pool_heap_fallbacks_total {heap_fallbacks}");
}
Response::builder()
.status(StatusCode::OK)
Expand Down
129 changes: 111 additions & 18 deletions glidefs/src/block/ublk/buffer_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,24 @@
//! The size 256-per-worker is sized for *concurrent buffer-holding futures*,
//! not active CPU. Each worker hosts many queues (potentially hundreds of
//! tags); those tags can be in `handle_io.await` (S3 fetch, write_cache
//! flush) holding a buffer. 256 covers the realistic burst before falling
//! back to the malloc path on pool exhaustion.
//! flush) holding a buffer. 256 covers the realistic burst before the pool
//! applies backpressure.
//!
//! When the pool is exhausted, `acquire` returns `None` and the caller falls
//! back to `vec![0u8; len]`. The fallback is correct but defeats the RSS
//! bound — so we instrument it and alert on sustained exhaustion.
//! ## Two ways the bound bends — both stay alive
//!
//! 1. **Transient exhaustion** (all 256 slots in flight): [`WorkerBufferPool::acquire`]
//! parks the future on a FIFO waiter queue until a slot is released. RSS
//! stays flat; throughput drops gracefully. `backpressure_waits` exposes it.
//!
//! 2. **Init OOM** (the pool's one-time `mmap` fails because the host is out
//! of committable memory at worker startup): instead of aborting the
//! daemon, [`worker_pool`] returns `None` and [`acquire_io_buf`] hands back
//! a heap [`IoBuf::Heap`] for that I/O. The worker degrades to elastic
//! `vec`-backed bounce buffers — correct, but it defeats the RSS bound, so
//! `GLOBAL_HEAP_FALLBACKS` counts it for alerting. The init failure is *not*
//! cached: a later I/O retries the `mmap`, so a worker that lost the startup
//! race to a transient memory spike upgrades back to the bounded fast path
//! on its own once the host recovers.

use std::cell::{OnceCell, RefCell};
use std::collections::VecDeque;
Expand All @@ -34,6 +46,10 @@ use std::task::{Context, Poll, Waker};
pub static GLOBAL_ACQUIRES: AtomicU64 = AtomicU64::new(0);
pub static GLOBAL_BACKPRESSURE_WAITS: AtomicU64 = AtomicU64::new(0);
pub static GLOBAL_POOLS_INITIALIZED: AtomicU64 = AtomicU64::new(0);
/// Count of I/Os served from a heap fallback buffer because the calling
/// worker's pool could not be `mmap`'d (init OOM). Sustained non-zero growth
/// means a worker is stuck in the degraded, unbounded-RSS path — alert on it.
pub static GLOBAL_HEAP_FALLBACKS: AtomicU64 = AtomicU64::new(0);

/// Slots per worker. 256 × 128 KB = 32 MB per worker.
const POOL_SLOTS: usize = 256;
Expand Down Expand Up @@ -244,25 +260,102 @@ impl Drop for PoolSlot {

thread_local! {
static WORKER_POOL: OnceCell<Rc<WorkerBufferPool>> = const { OnceCell::new() };
/// Tracks whether this worker last observed its pool in the degraded
/// (heap-fallback) state, so we log exactly one line per degrade→recover
/// transition instead of one per I/O while the host is starved.
static POOL_DEGRADED: std::cell::Cell<bool> = const { std::cell::Cell::new(false) };
}

/// Get (or lazily initialize) the calling worker thread's buffer pool.
/// Panics if mmap fails — at worker init time that means OOM and the
/// daemon cannot continue.
pub fn worker_pool() -> Rc<WorkerBufferPool> {
/// Get the calling worker thread's buffer pool, lazily creating it on first
/// use. Returns `None` if the pool's `mmap` fails — at worker init time that
/// means the host is momentarily out of committable memory (`ENOMEM`).
///
/// A failed init is deliberately **not** cached: the next call retries the
/// `mmap`, so a worker that hit a transient memory spike at startup upgrades
/// back to the bounded fast path on its own once memory frees. The cost while
/// degraded is one failed `mmap` syscall per I/O on this worker — acceptable,
/// since the worker is memory-starved regardless. Callers treat `None` as
/// "use a heap buffer" via [`acquire_io_buf`]; they must never abort on it.
pub fn worker_pool() -> Option<Rc<WorkerBufferPool>> {
WORKER_POOL.with(|cell| {
cell.get_or_init(|| {
let pool = Rc::new(
WorkerBufferPool::new()
.expect("worker buffer pool mmap failed — OOM at init"),
);
GLOBAL_POOLS_INITIALIZED.fetch_add(1, Ordering::Relaxed);
pool
})
.clone()
if let Some(pool) = cell.get() {
// Recovered (or never degraded). Log the upgrade once.
if POOL_DEGRADED.with(|d| d.replace(false)) {
tracing::info!(
"worker buffer pool recovered — back to bounded bounce buffers"
);
}
return Some(pool.clone());
}
match WorkerBufferPool::new() {
Ok(pool) => {
let pool = Rc::new(pool);
// First set wins. On a single thread a reentrant set can't
// race, but `set` is fallible by signature; ignore the Err.
let _ = cell.set(pool.clone());
GLOBAL_POOLS_INITIALIZED.fetch_add(1, Ordering::Relaxed);
if POOL_DEGRADED.with(|d| d.replace(false)) {
tracing::info!(
"worker buffer pool recovered — back to bounded bounce buffers"
);
}
Some(pool)
}
Err(e) => {
// Log only on the first I/O that observes degradation, not
// every I/O while starved.
if !POOL_DEGRADED.with(|d| d.replace(true)) {
tracing::error!(
error = %e,
"worker buffer pool mmap failed (OOM) — serving from heap \
fallback buffers (unbounded RSS) until the host recovers",
);
}
None
}
}
})
}

/// A bounce buffer for one in-flight USER_COPY I/O: either a slot from the
/// per-worker pool (the bounded-RSS fast path) or a heap allocation used when
/// the worker's pool could not be `mmap`'d. The heap variant keeps the daemon
/// serving — degraded RSS, not a crash — and disappears once the worker's
/// pool recovers. Both variants expose the same `&mut [u8]` to the I/O path.
pub enum IoBuf {
Pooled(PoolSlot),
Heap(Vec<u8>),
}

impl IoBuf {
/// The first `len` bytes of the buffer. `len` is bounded by `SLOT_SIZE`
/// upstream (a single ublk I/O never exceeds `IO_BUF_BYTES`).
#[inline]
pub fn as_mut_slice(&mut self, len: usize) -> &mut [u8] {
match self {
IoBuf::Pooled(slot) => slot.as_mut_slice(len),
IoBuf::Heap(v) => &mut v[..len],
}
}
}

/// Acquire a `len`-byte bounce buffer for the calling worker.
///
/// Fast path: a slot from this worker's pool, parking on backpressure if all
/// slots are momentarily in flight. Fallback: if the pool can't be created
/// (init OOM, see [`worker_pool`]), a heap buffer — counted in
/// `GLOBAL_HEAP_FALLBACKS` so sustained degradation is alertable. Never panics;
/// always yields a usable buffer.
pub async fn acquire_io_buf(len: usize) -> IoBuf {
match worker_pool() {
Some(pool) => IoBuf::Pooled(pool.acquire().await),
None => {
GLOBAL_HEAP_FALLBACKS.fetch_add(1, Ordering::Relaxed);
IoBuf::Heap(vec![0u8; len])
}
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
28 changes: 15 additions & 13 deletions glidefs/src/block/ublk/device.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2095,7 +2095,6 @@ async fn io_task_user_copy(
) -> Result<(), UblkError> {
let cdev_fd = q.dev.tgt.fds[0];
let qid = q.get_qid();
let pool = super::buffer_pool::worker_pool();

// Initial fetch — no buffer attached, empty slice.
q.submit_io_prep_cmd(tag, BufDesc::Slice(&[]), 0, None).await?;
Expand All @@ -2120,14 +2119,16 @@ async fn io_task_user_copy(
Box::pin(dispatch_cold(op, offset, length, fua, handler)).await
}
sys::UBLK_IO_OP_READ | sys::UBLK_IO_OP_WRITE => {
// Async-backpressure acquire: if pool is exhausted, this
// future parks until a slot is released. Pool size is a
// *true* ceiling on total bounce RSS — no malloc fallback,
// no possibility of breaking the structural bound under
// load. `backpressure_waits` metric exposes how often we
// parked, so pool undersizing is observable.
let mut slot = pool.acquire().await;
let buf: &mut [u8] = slot.as_mut_slice(length as usize);
// Acquire a bounce buffer for this I/O. Normal path: a slot
// from this worker's pool, parking on backpressure if all
// slots are momentarily in flight — pool size is a *true*
// ceiling on bounce RSS, and `backpressure_waits` exposes
// undersizing. Degraded path: if the pool couldn't be mmap'd
// (host OOM at worker init), this hands back a heap buffer so
// the daemon keeps serving instead of aborting; the worker
// upgrades back to the pool once memory recovers.
let mut iobuf = super::buffer_pool::acquire_io_buf(length as usize).await;
let buf: &mut [u8] = iobuf.as_mut_slice(length as usize);

if op == sys::UBLK_IO_OP_WRITE {
// Copy WRITE data out of the kernel cmd buffer into ours.
Expand Down Expand Up @@ -2176,10 +2177,11 @@ async fn io_task_user_copy(
res
}
}
// Slot drops here, releasing back to pool before the
// commit-and-fetch await — keeps pool buffers free for any
// other tag that wakes up first and triggers the FIFO
// waker for the oldest backpressure-parked future.
// `iobuf` drops here: a pool slot releases back to the pool
// before the commit-and-fetch await — keeping buffers free
// for another tag that wakes first and triggers the FIFO
// waker for the oldest backpressure-parked future — while a
// heap fallback buffer is simply freed.
}
_ => -libc::EINVAL,
};
Expand Down
Loading