diff --git a/README.md b/README.md index de45a31..334df6f 100644 --- a/README.md +++ b/README.md @@ -146,12 +146,14 @@ Async file I/O benchmark using io_uring. | `--rw MODE [MODE ...]` | `randread` | Access mode(s): `randread`, `randwrite`, `seqread` | | `--flamegraph` | | Profile and generate flamegraph SVG | | `--print-counters` | | Print perf counters after each run | +| `--fixed-buffers` | | Use registered buffers (`IORING_OP_READ_FIXED` / `WRITE_FIXED`) | ``` ./bb -b release file-perf ./bb -b release file-perf --bs 64k --size 4g ./bb -b release file-perf --numjobs 1 16 --iodepth 1 16 ./bb -b release file-perf --rw randread randwrite +./bb -b release file-perf --fixed-buffers ./bb -b release file-perf --flamegraph ``` diff --git a/bb b/bb index 332e9cc..2472440 100755 --- a/bb +++ b/bb @@ -741,6 +741,7 @@ class FilePerfParams: rw: list[str] = field(default_factory=lambda: ["randread"]) flamegraph: bool = False print_counters: bool = False + fixed_buffers: bool = False timeout: int = 180 @@ -777,6 +778,7 @@ def cmd_file_perf(preset: str, params: FilePerfParams) -> None: file_perf = os.path.join(ROOT, f"build/{preset}/bin/file-perf") verbose_flag = ["--verbose"] if log.isEnabledFor(logging.DEBUG) else [] print_counters_flag = ["--print-counters"] if params.print_counters else [] + fixed_buffers_flag = ["--fixed-buffers"] if params.fixed_buffers else [] try: if params.flamegraph: @@ -802,6 +804,7 @@ def cmd_file_perf(preset: str, params: FilePerfParams) -> None: str(params.warmup), "--filename", params.file, + *fixed_buffers_flag, *verbose_flag, ], ) @@ -828,6 +831,7 @@ def cmd_file_perf(preset: str, params: FilePerfParams) -> None: str(params.warmup), "--filename", params.file, + *fixed_buffers_flag, *print_counters_flag, *verbose_flag, timeout=params.timeout or None, @@ -1546,6 +1550,12 @@ def _build_parser() -> argparse.ArgumentParser: metavar="DURATION", help="warmup duration applied to every benchmark (e.g. 10s); per-binary defaults are used when omitted", ) + perf_parser.add_argument( + "--fixed-buffers", + dest="fixed_buffers", + action="store_true", + help="run file-perf with registered buffers (IORING_OP_READ_FIXED / WRITE_FIXED)", + ) perf_parser.add_argument( "targets", nargs="+", @@ -1628,6 +1638,12 @@ def _build_parser() -> argparse.ArgumentParser: action="store_true", help="print perf counters after each run", ) + file_perf_parser.add_argument( + "--fixed-buffers", + dest="file_fixed_buffers", + action="store_true", + help="use registered buffers (IORING_OP_READ_FIXED / WRITE_FIXED)", + ) file_perf_parser.add_argument( "--timeout", dest="file_timeout", @@ -2054,6 +2070,7 @@ def main() -> None: iodepth=[1, 16], rw=["randwrite", "randread"], timeout=args.timeout, + fixed_buffers=args.fixed_buffers, **timing_overrides, ) if "file" in targets: diff --git a/docs/perf.md b/docs/perf.md index 2b7cdfc..137f13e 100644 --- a/docs/perf.md +++ b/docs/perf.md @@ -8,7 +8,7 @@ The main tables are reproducible with `./bb -b release perf --duration 60s --war ## file-perf -- async file I/O -`/dev/shm` (tmpfs, in-memory), bs=4k, size=1 GiB, 60 s measurement, 10 s warmup. Uses `FiberScheduler::read`/`write` (`IORING_OP_READV` / `IORING_OP_WRITEV`). `numjobs` = concurrent worker fibers; `iodepth` = per-fiber async IO queue depth (ring of `IoFuture`s). +`/dev/shm` (tmpfs, in-memory), bs=4k, size=1 GiB, 60 s measurement, 10 s warmup. Uses `FiberScheduler::read`/`write` (`IORING_OP_READV` / `IORING_OP_WRITEV`). `numjobs` = concurrent worker fibers; `iodepth` = per-fiber async IO queue depth (ring of `IoFuture`s). Pass `--fixed-buffers` to switch to registered buffers (`IORING_OP_READ_FIXED` / `IORING_OP_WRITE_FIXED`) -- see the subsection below. | numjobs | iodepth | mode | IOPS | BW | avg | p50 | p95 | p99 | p99.9 | |---|---|---|---|---|---|---|---|---|---| @@ -27,6 +27,12 @@ The main tables are reproducible with `./bb -b release perf --duration 60s --war **Note on batching**: The default `Options::ioUringFlushThreshold = 64` defers `io_uring_submit` until the SQ ring has accumulated enough work to amortize the syscall -- the right trade for network/HTTP/S3 workloads where completion latency dwarfs the few-µs batching delay (see net-perf below for the resulting p99 win). On tmpfs the kernel completes reads inline at submit time, so any deferral pushes submissions off the inline-completion fast path. `file-perf` therefore initializes the scheduler with `ioUringFlushThreshold = 1`, equivalent to per-fiber submit. Measured under the default threshold (64), `16/1 randread` lands at ~1.6M IOPS and `16/16 randread` at ~4.2M -- the override recovers full throughput without any kernel or scheduler change. +### Registered buffers (`--fixed-buffers`) + +`./bb -b release perf --duration 60s --warmup 10s file --fixed-buffers` reruns the same matrix with registered buffers: each worker registers one buffer (covering its whole `iodepth * blockSize` block) on every per-CPU ring via `FiberScheduler::registerBuffers`, then issues `readFixed`/`writeFixed` against it (see `docs/scheduler.md`). The kernel reuses the pre-pinned mapping and skips the per-IO page-pin and iovec import. + +The win is largest where per-IO buffer setup is the dominant cost: high-concurrency writes (`16` jobs) gain the most IOPS and shed the most average and tail latency. Reads, already inline-completed on tmpfs, see smaller gains except at `16/16`. + --- ## fio comparison (io_uring, /dev/shm, bs=4k, size=1 GiB) diff --git a/docs/scheduler.md b/docs/scheduler.md index d7bccdf..8587e3a 100644 --- a/docs/scheduler.md +++ b/docs/scheduler.md @@ -103,6 +103,17 @@ All sleep deadlines are in TSC cycles. `Tsc::getCycles()` is `rdtsc` / `cntvct_e Each of `read`, `write`, and `poll` has two overloads: a blocking form that submits an io_uring SQE and suspends the fiber until completion, and an async form that submits the SQE and returns immediately with an `IoFuture*` the caller waits on separately. `handleCompletionQueue` processes CQEs, extracts the `IoFuture*` from the CQE user data, and calls `future->set()` to wake the waiting fiber. +### Registered (fixed) buffers + +`readFixed` / `writeFixed` are async-only counterparts to `read`/`write` that submit `IORING_OP_READ_FIXED` / `IORING_OP_WRITE_FIXED` against a buffer that was pre-registered with the kernel via `registerBuffers`. Instead of passing an iovec the kernel must pin per IO, the caller passes a `(buf, len)` plus the `bufIndex` of a previously registered buffer; the kernel reuses the pre-pinned page mapping and skips the per-IO page-pin and iovec import. `buf` must lie inside the registered buffer at `bufIndex`. + +`registerBuffers(iovecs, count)` registers one buffer set on **every** per-CPU io_uring ring, so a fiber that is work-stolen to another CPU can still submit fixed IO referencing the same index. Buffers are addressable as `bufIndex` `0..count-1`. Constraints: + +- Call once, after `initialize()` and before issuing any fixed-buffer IO. io_uring allows a single buffer set per ring with no way to undo or change it, so a second call fails (`-EBUSY`) and trips an assert. +- Each ring pins its own copy, so total locked memory is `(number of CPUs) * (size of all buffers)`. With many CPUs or large buffers this can exceed `RLIMIT_MEMLOCK`; registration then fails and trips an assert. + +The underlying liburing helpers are `io_uring_register_buffers`, `io_uring_prep_read_fixed`, and `io_uring_prep_write_fixed`. `file-perf --fixed-buffers` exercises this path (see `docs/perf.md`). + --- ## Sleep Cancellation diff --git a/include/silk/fibers/fiber.h b/include/silk/fibers/fiber.h index fb0e05c..1c03e06 100644 --- a/include/silk/fibers/fiber.h +++ b/include/silk/fibers/fiber.h @@ -354,6 +354,11 @@ class FiberScheduler // Used to mark the kernel-written bytes as initialized for MSan. iovec * readIov = nullptr; uint32_t readIovLen = 0; + // Backing storage for the single contiguous region of a fixed read + // (readFixed api has no caller-owned iovec to point at). + // readIov is pointed here in that case. Needed for Memory sanitizer. + iovec readIovStorage{}; + #endif }; @@ -421,6 +426,45 @@ class FiberScheduler */ static void write(int fd, iovec * iov, uint32_t iov_len, uint64_t offset, uint64_t * bytesWritten, IoFuture * future) noexcept; + /** + * Async fixed-buffer read: submits IORING_OP_READ_FIXED against the buffer + * previously registered at @p bufIndex (see registerBuffers). @p buf must lie + * within that registered buffer. Single contiguous region (no iovec import); + * the kernel skips the per-IO page-pin by reusing the pre-pinned registration. + * + * @param fd File descriptor to read from. + * @param buf Destination, inside the registered buffer at @p bufIndex. + * @param len Number of bytes to read. + * @param offset Byte offset within the file. + * @param bufIndex Index into the registered buffer table. + * @param bytesRead If not null, receives the number of bytes read on success. + * @param future Completion handle. + */ + static void readFixed(int fd, void * buf, uint32_t len, uint64_t offset, int bufIndex, uint64_t * bytesRead, IoFuture * future) noexcept; + + /** + * Async fixed-buffer write: IORING_OP_WRITE_FIXED. See readFixed. + */ + static void + writeFixed(int fd, const void * buf, uint32_t len, uint64_t offset, int bufIndex, uint64_t * bytesWritten, IoFuture * future) noexcept; + + /** + * Register a fixed buffer set on every per-CPU io_uring ring, so a fiber that + * is work-stolen to another CPU can still submit READ_FIXED/WRITE_FIXED + * referencing the same index. Call once after initialize(), before issuing any + * fixed-buffer IO. @p count buffers are addressable as bufIndex 0..count-1. + * + * You can only call this once. io_uring allows one buffer set per ring, and + * there is no way to undo or change it, so a second call fails (-EBUSY) and + * trips an assert. + * + * Each ring pins its own copy of the buffers in memory, so the total locked + * memory is (number of CPUs) * (size of all buffers). With many CPUs or large + * buffers this can go over the RLIMIT_MEMLOCK limit; if it does, registration + * fails and trips an assert. + */ + static void registerBuffers(const iovec * iovecs, unsigned count) noexcept; + /** * Blocking poll: suspend the calling fiber until one of the requested * events becomes ready on @p fd. diff --git a/src/fibers/fiber.cpp b/src/fibers/fiber.cpp index abf8dfe..89b419a 100644 --- a/src/fibers/fiber.cpp +++ b/src/fibers/fiber.cpp @@ -1437,6 +1437,40 @@ void FiberScheduler::write(int fd, iovec * iov, uint32_t iov_len, uint64_t offse enqueueIo(future, [=](io_uring_sqe * sqe) noexcept { ::io_uring_prep_writev(sqe, fd, iov, iov_len, offset); }); } +void FiberScheduler::readFixed( + int fd, void * buf, uint32_t len, uint64_t offset, int bufIndex, uint64_t * bytesRead, IoFuture * future) noexcept +{ + future->result = bytesRead; +#if defined(__SANITIZE_MEMORY__) + future->readIovStorage = {buf, len}; + future->readIov = &future->readIovStorage; + future->readIovLen = 1; +#endif + enqueueIo(future, [=](io_uring_sqe * sqe) noexcept { ::io_uring_prep_read_fixed(sqe, fd, buf, len, offset, bufIndex); }); +} + +void FiberScheduler::writeFixed( + int fd, const void * buf, uint32_t len, uint64_t offset, int bufIndex, uint64_t * bytesWritten, IoFuture * future) noexcept +{ + future->result = bytesWritten; + enqueueIo(future, [=](io_uring_sqe * sqe) noexcept { ::io_uring_prep_write_fixed(sqe, fd, buf, len, offset, bufIndex); }); +} + +void FiberScheduler::registerBuffers(const iovec * iovecs, unsigned count) noexcept +{ + SILK_ASSERT(scheduler, "registerBuffers called before initialize()"); + for (uint32_t cpu = 0; cpu < scheduler->processorCount; ++cpu) + { + ProcessorState * processor = &scheduler->processorState[cpu]; + if (processor->number == INVALID_PROCESSOR_NUMBER) + { + continue; + } + int r = ::io_uring_register_buffers(&processor->ring, iovecs, count); + SILK_ASSERT(r == 0, "io_uring_register_buffers failed: cpu=%u, ret=%d", cpu, r); + } +} + void FiberScheduler::poll(int fd, uint32_t events, uint64_t * triggeredEvents, IoFuture * future) noexcept { future->result = triggeredEvents; diff --git a/src/fibers/tests/io-fixed-test.cpp b/src/fibers/tests/io-fixed-test.cpp new file mode 100644 index 0000000..5cf7ab0 --- /dev/null +++ b/src/fibers/tests/io-fixed-test.cpp @@ -0,0 +1,92 @@ +#include + +#include + +#include +#include +#include + +#include +#include + +namespace silk +{ + +// writeFixed then readFixed against a registered buffer. It must test +// round-trip all the three apis, including reads into a non-base offset +// within the registered region. +TEST(IoFixed, writeReadRoundTrip) +{ + static constexpr uint64_t BLOCK = 4096; + static constexpr uint64_t NBLOCKS = 2; + static constexpr uint64_t SIZE = BLOCK * NBLOCKS; + + char tmpl[] = "/tmp/silk-io-fixed-XXXXXX"; + int fd = ::mkstemp(tmpl); + ASSERT_GE(fd, 0) << std::strerror(errno); + ::unlink(tmpl); + ASSERT_EQ(::ftruncate(fd, static_cast(SIZE)), 0) << std::strerror(errno); + + // Single contiguous registration covering the whole buffer; bufIndex 0. + char * buf = static_cast(std::malloc(SIZE)); + ASSERT_NE(buf, nullptr); + iovec reg{buf, SIZE}; + FiberScheduler::registerBuffers(®, 1); + + struct Params + { + int fd; + char * buf; + + static int fiberMain(Params * p) noexcept + { + // Fill block 0 with a known pattern and write it out via WRITE_FIXED. + for (uint64_t i = 0; i < BLOCK; ++i) + { + p->buf[i] = static_cast((i * 7 + 1) & 0xFF); + } + + uint64_t bytesWritten = 0; + FiberScheduler::IoFuture wf; + FiberScheduler::writeFixed(p->fd, p->buf, BLOCK, 0, 0, &bytesWritten, &wf); + EXPECT_EQ(wf.wait(), 0); + EXPECT_EQ(bytesWritten, BLOCK); + + // Read back into the SECOND block: a non-base offset still inside the + // registered region (exercises the "buf within registered buffer" + // contract) that is deliberately left untouched by userspace. Under + // MSan it is poisoned, so the only thing that can mark it initialized + // is the kernel fill + readFixed's MSAN_UNPOISON. If readFixed forgot + // to unpoison, the comparison below endup as a use-of-uninitialized. + char * dst = p->buf + BLOCK; + uint64_t bytesRead = 0; + FiberScheduler::IoFuture rf; + FiberScheduler::readFixed(p->fd, dst, BLOCK, 0, 0, &bytesRead, &rf); + EXPECT_EQ(rf.wait(), 0); + EXPECT_EQ(bytesRead, BLOCK); + + // The kernel-filled bytes must match what we wrote (and must be + // readable without tripping MSan). + for (uint64_t i = 0; i < BLOCK; ++i) + { + EXPECT_EQ(dst[i], static_cast((i * 7 + 1) & 0xFF)) << "mismatch at byte " << i; + } + + return 0; + } + }; + + EXPECT_EQ(FiberScheduler::run(Params::fiberMain, Params{fd, buf}), 0); + + std::free(buf); + ::close(fd); +} + +// TODO(kavi): this test runs a single fiber on one CPU. The whole point of +// registering buffers on every ring is that a fiber can move to another CPU and +// still use the same bufIndex. We don't test that here because we can't reliably +// force a fiber to move to a specific CPU, so the test would be flaky. If we need +// to be sure this works, add a second test that forces the move and checks the +// fixed IO still works. + +} // namespace silk diff --git a/src/perf/file-perf.cpp b/src/perf/file-perf.cpp index 328d8dd..6ba4c29 100644 --- a/src/perf/file-perf.cpp +++ b/src/perf/file-perf.cpp @@ -129,6 +129,8 @@ struct ClientConfig uint64_t warmupNs = 2'000'000'000ULL; bool direct = false; bool printCounters = false; + // use registered buffers (IORING_OP_READ_FIXED / IORING_OP_WRITE_FIXED). + bool fixedBuffers = false; }; class Benchmark @@ -157,6 +159,7 @@ class Benchmark std::unique_ptr bufs{nullptr, std::free}; std::unique_ptr slots; uint32_t head = 0; + uint32_t index = 0; }; // @@ -199,6 +202,7 @@ void Benchmark::start() uint32_t i = 0; for (Job & job : jobs) { + job.index = i; job.strategy = OffsetStrategy(cfg.mode, cfg.fileSize, cfg.blockSize, i++, static_cast(jobs.size())); // O_DIRECT requires 512-byte-aligned buffers. @@ -216,6 +220,19 @@ void Benchmark::start() } } + // register one fixed buffer per job (each covering its whole + // iodepth*blockSize block) on every per-CPU ring. bufIndex == job.index. + if (cfg.fixedBuffers) + { + std::vector regBufs(jobs.size()); + for (Job & job : jobs) + { + regBufs[job.index].iov_base = job.bufs.get(); + regBufs[job.index].iov_len = static_cast(cfg.iodepth) * cfg.blockSize; + } + silk::FiberScheduler::registerBuffers(regBufs.data(), static_cast(regBufs.size())); + } + for (Job & job : jobs) { int r = silk::FiberScheduler::run(workerFiberMain, {this, &job}, &job.future); @@ -249,6 +266,23 @@ void Benchmark::submit(Job * job, Slot * slot) { slot->startCycles = silk::Tsc::getCycles(); uint64_t offset = job->strategy.next(); + + if (cfg.fixedBuffers) + { + // READ_FIXED / WRITE_FIXED against the buffer registered for this job. + int bufIndex = static_cast(job->index); + if (cfg.mode == MODE_RANDWRITE) + { + silk::FiberScheduler::writeFixed(fd, slot->iov.iov_base, slot->iov.iov_len, offset, bufIndex, nullptr, &slot->future); + } + else + { + silk::FiberScheduler::readFixed(fd, slot->iov.iov_base, slot->iov.iov_len, offset, bufIndex, nullptr, &slot->future); + } + return; + } + + // Baseline: vectored READV / WRITEV with a 1-element iovec. if (cfg.mode == MODE_RANDWRITE) { silk::FiberScheduler::write(fd, &slot->iov, 1, offset, nullptr, &slot->future); @@ -319,6 +353,7 @@ static void printJson(std::vector & latNs, const ClientConfig & cfg) printf(" \"iodepth\": %u,\n", cfg.iodepth); printf(" \"block_size_bytes\": %u,\n", cfg.blockSize); printf(" \"mode\": \"%s\",\n", modeName(cfg.mode)); + printf(" \"fixed_buffers\": %s,\n", cfg.fixedBuffers ? "true" : "false"); printf(" \"file_size_bytes\": %lu,\n", cfg.fileSize); printf(" \"duration_s\": %.3f,\n", durationS); printf(" \"total\": %lu,\n", total); @@ -362,6 +397,7 @@ int main(int argc, char ** argv) ("warmup", "warmup duration (e.g. 2s, 500ms)", cxxopts::value(warmupStr)) ("filename", "file path", cxxopts::value(cfg.filename)) ("direct", "use O_DIRECT (bypass page cache)", cxxopts::value(cfg.direct)) + ("fixed-buffers", "use registered buffers (IORING_OP_READ_FIXED / WRITE_FIXED)", cxxopts::value(cfg.fixedBuffers)) ("print-counters", "enable per-CPU profiler and include counters in the JSON report", cxxopts::value(cfg.printCounters)) ("v,verbose", "enable debug logging", cxxopts::value(verbose)) ;