Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -146,12 +146,14 @@ Async file I/O benchmark using io_uring.
| `--rw MODE [MODE ...]` | `randread` | Access mode(s): `randread`, `randwrite`, `seqread` |
| `--flamegraph` | | Profile and generate flamegraph SVG |
| `--print-counters` | | Print perf counters after each run |
| `--fixed-buffers` | | Use registered buffers (`IORING_OP_READ_FIXED` / `WRITE_FIXED`) |

```
./bb -b release file-perf
./bb -b release file-perf --bs 64k --size 4g
./bb -b release file-perf --numjobs 1 16 --iodepth 1 16
./bb -b release file-perf --rw randread randwrite
./bb -b release file-perf --fixed-buffers
./bb -b release file-perf --flamegraph
```

Expand Down
17 changes: 17 additions & 0 deletions bb
Original file line number Diff line number Diff line change
Expand Up @@ -741,6 +741,7 @@ class FilePerfParams:
rw: list[str] = field(default_factory=lambda: ["randread"])
flamegraph: bool = False
print_counters: bool = False
fixed_buffers: bool = False
timeout: int = 180


Expand Down Expand Up @@ -777,6 +778,7 @@ def cmd_file_perf(preset: str, params: FilePerfParams) -> None:
file_perf = os.path.join(ROOT, f"build/{preset}/bin/file-perf")
verbose_flag = ["--verbose"] if log.isEnabledFor(logging.DEBUG) else []
print_counters_flag = ["--print-counters"] if params.print_counters else []
fixed_buffers_flag = ["--fixed-buffers"] if params.fixed_buffers else []

try:
if params.flamegraph:
Expand All @@ -802,6 +804,7 @@ def cmd_file_perf(preset: str, params: FilePerfParams) -> None:
str(params.warmup),
"--filename",
params.file,
*fixed_buffers_flag,
*verbose_flag,
],
)
Expand All @@ -828,6 +831,7 @@ def cmd_file_perf(preset: str, params: FilePerfParams) -> None:
str(params.warmup),
"--filename",
params.file,
*fixed_buffers_flag,
*print_counters_flag,
*verbose_flag,
timeout=params.timeout or None,
Expand Down Expand Up @@ -1546,6 +1550,12 @@ def _build_parser() -> argparse.ArgumentParser:
metavar="DURATION",
help="warmup duration applied to every benchmark (e.g. 10s); per-binary defaults are used when omitted",
)
perf_parser.add_argument(
"--fixed-buffers",
dest="fixed_buffers",
action="store_true",
help="run file-perf with registered buffers (IORING_OP_READ_FIXED / WRITE_FIXED)",
)
perf_parser.add_argument(
"targets",
nargs="+",
Expand Down Expand Up @@ -1628,6 +1638,12 @@ def _build_parser() -> argparse.ArgumentParser:
action="store_true",
help="print perf counters after each run",
)
file_perf_parser.add_argument(
"--fixed-buffers",
dest="file_fixed_buffers",
action="store_true",
help="use registered buffers (IORING_OP_READ_FIXED / WRITE_FIXED)",
)
file_perf_parser.add_argument(
"--timeout",
dest="file_timeout",
Expand Down Expand Up @@ -2054,6 +2070,7 @@ def main() -> None:
iodepth=[1, 16],
rw=["randwrite", "randread"],
timeout=args.timeout,
fixed_buffers=args.fixed_buffers,
**timing_overrides,
)
if "file" in targets:
Expand Down
8 changes: 7 additions & 1 deletion docs/perf.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ The main tables are reproducible with `./bb -b release perf --duration 60s --war

## file-perf -- async file I/O

`/dev/shm` (tmpfs, in-memory), bs=4k, size=1 GiB, 60 s measurement, 10 s warmup. Uses `FiberScheduler::read`/`write` (`IORING_OP_READV` / `IORING_OP_WRITEV`). `numjobs` = concurrent worker fibers; `iodepth` = per-fiber async IO queue depth (ring of `IoFuture`s).
`/dev/shm` (tmpfs, in-memory), bs=4k, size=1 GiB, 60 s measurement, 10 s warmup. Uses `FiberScheduler::read`/`write` (`IORING_OP_READV` / `IORING_OP_WRITEV`). `numjobs` = concurrent worker fibers; `iodepth` = per-fiber async IO queue depth (ring of `IoFuture`s). Pass `--fixed-buffers` to switch to registered buffers (`IORING_OP_READ_FIXED` / `IORING_OP_WRITE_FIXED`) -- see the subsection below.

| numjobs | iodepth | mode | IOPS | BW | avg | p50 | p95 | p99 | p99.9 |
|---|---|---|---|---|---|---|---|---|---|
Expand All @@ -27,6 +27,12 @@ The main tables are reproducible with `./bb -b release perf --duration 60s --war

**Note on batching**: The default `Options::ioUringFlushThreshold = 64` defers `io_uring_submit` until the SQ ring has accumulated enough work to amortize the syscall -- the right trade for network/HTTP/S3 workloads where completion latency dwarfs the few-µs batching delay (see net-perf below for the resulting p99 win). On tmpfs the kernel completes reads inline at submit time, so any deferral pushes submissions off the inline-completion fast path. `file-perf` therefore initializes the scheduler with `ioUringFlushThreshold = 1`, equivalent to per-fiber submit. Measured under the default threshold (64), `16/1 randread` lands at ~1.6M IOPS and `16/16 randread` at ~4.2M -- the override recovers full throughput without any kernel or scheduler change.

### Registered buffers (`--fixed-buffers`)

`./bb -b release perf --duration 60s --warmup 10s file --fixed-buffers` reruns the same matrix with registered buffers: each worker registers one buffer (covering its whole `iodepth * blockSize` block) on every per-CPU ring via `FiberScheduler::registerBuffers`, then issues `readFixed`/`writeFixed` against it (see `docs/scheduler.md`). The kernel reuses the pre-pinned mapping and skips the per-IO page-pin and iovec import.

The win is largest where per-IO buffer setup is the dominant cost: high-concurrency writes (`16` jobs) gain the most IOPS and shed the most average and tail latency. Reads, already inline-completed on tmpfs, see smaller gains except at `16/16`.

---

## fio comparison (io_uring, /dev/shm, bs=4k, size=1 GiB)
Expand Down
11 changes: 11 additions & 0 deletions docs/scheduler.md
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,17 @@ All sleep deadlines are in TSC cycles. `Tsc::getCycles()` is `rdtsc` / `cntvct_e

Each of `read`, `write`, and `poll` has two overloads: a blocking form that submits an io_uring SQE and suspends the fiber until completion, and an async form that submits the SQE and returns immediately with an `IoFuture*` the caller waits on separately. `handleCompletionQueue` processes CQEs, extracts the `IoFuture*` from the CQE user data, and calls `future->set()` to wake the waiting fiber.

### Registered (fixed) buffers

`readFixed` / `writeFixed` are async-only counterparts to `read`/`write` that submit `IORING_OP_READ_FIXED` / `IORING_OP_WRITE_FIXED` against a buffer that was pre-registered with the kernel via `registerBuffers`. Instead of passing an iovec the kernel must pin per IO, the caller passes a `(buf, len)` plus the `bufIndex` of a previously registered buffer; the kernel reuses the pre-pinned page mapping and skips the per-IO page-pin and iovec import. `buf` must lie inside the registered buffer at `bufIndex`.

`registerBuffers(iovecs, count)` registers one buffer set on **every** per-CPU io_uring ring, so a fiber that is work-stolen to another CPU can still submit fixed IO referencing the same index. Buffers are addressable as `bufIndex` `0..count-1`. Constraints:

- Call once, after `initialize()` and before issuing any fixed-buffer IO. io_uring allows a single buffer set per ring with no way to undo or change it, so a second call fails (`-EBUSY`) and trips an assert.
- Each ring pins its own copy, so total locked memory is `(number of CPUs) * (size of all buffers)`. With many CPUs or large buffers this can exceed `RLIMIT_MEMLOCK`; registration then fails and trips an assert.

The underlying liburing helpers are `io_uring_register_buffers`, `io_uring_prep_read_fixed`, and `io_uring_prep_write_fixed`. `file-perf --fixed-buffers` exercises this path (see `docs/perf.md`).

---

## Sleep Cancellation
Expand Down
44 changes: 44 additions & 0 deletions include/silk/fibers/fiber.h
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,11 @@ class FiberScheduler
// Used to mark the kernel-written bytes as initialized for MSan.
iovec * readIov = nullptr;
uint32_t readIovLen = 0;
// Backing storage for the single contiguous region of a fixed read
// (readFixed api has no caller-owned iovec to point at).
// readIov is pointed here in that case. Needed for Memory sanitizer.
iovec readIovStorage{};

#endif
};

Expand Down Expand Up @@ -421,6 +426,45 @@ class FiberScheduler
*/
static void write(int fd, iovec * iov, uint32_t iov_len, uint64_t offset, uint64_t * bytesWritten, IoFuture * future) noexcept;

/**
* Async fixed-buffer read: submits IORING_OP_READ_FIXED against the buffer
* previously registered at @p bufIndex (see registerBuffers). @p buf must lie
* within that registered buffer. Single contiguous region (no iovec import);
* the kernel skips the per-IO page-pin by reusing the pre-pinned registration.
*
* @param fd File descriptor to read from.
* @param buf Destination, inside the registered buffer at @p bufIndex.
* @param len Number of bytes to read.
* @param offset Byte offset within the file.
* @param bufIndex Index into the registered buffer table.
* @param bytesRead If not null, receives the number of bytes read on success.
* @param future Completion handle.
*/
static void readFixed(int fd, void * buf, uint32_t len, uint64_t offset, int bufIndex, uint64_t * bytesRead, IoFuture * future) noexcept;

/**
* Async fixed-buffer write: IORING_OP_WRITE_FIXED. See readFixed.
*/
static void
writeFixed(int fd, const void * buf, uint32_t len, uint64_t offset, int bufIndex, uint64_t * bytesWritten, IoFuture * future) noexcept;

/**
* Register a fixed buffer set on every per-CPU io_uring ring, so a fiber that
* is work-stolen to another CPU can still submit READ_FIXED/WRITE_FIXED
* referencing the same index. Call once after initialize(), before issuing any
* fixed-buffer IO. @p count buffers are addressable as bufIndex 0..count-1.
*
* You can only call this once. io_uring allows one buffer set per ring, and
* there is no way to undo or change it, so a second call fails (-EBUSY) and
* trips an assert.
*
* Each ring pins its own copy of the buffers in memory, so the total locked
* memory is (number of CPUs) * (size of all buffers). With many CPUs or large
* buffers this can go over the RLIMIT_MEMLOCK limit; if it does, registration
* fails and trips an assert.
*/
static void registerBuffers(const iovec * iovecs, unsigned count) noexcept;

/**
* Blocking poll: suspend the calling fiber until one of the requested
* events becomes ready on @p fd.
Expand Down
34 changes: 34 additions & 0 deletions src/fibers/fiber.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1437,6 +1437,40 @@ void FiberScheduler::write(int fd, iovec * iov, uint32_t iov_len, uint64_t offse
enqueueIo(future, [=](io_uring_sqe * sqe) noexcept { ::io_uring_prep_writev(sqe, fd, iov, iov_len, offset); });
}

void FiberScheduler::readFixed(
int fd, void * buf, uint32_t len, uint64_t offset, int bufIndex, uint64_t * bytesRead, IoFuture * future) noexcept
{
future->result = bytesRead;
#if defined(__SANITIZE_MEMORY__)
future->readIovStorage = {buf, len};
future->readIov = &future->readIovStorage;
future->readIovLen = 1;
#endif
enqueueIo(future, [=](io_uring_sqe * sqe) noexcept { ::io_uring_prep_read_fixed(sqe, fd, buf, len, offset, bufIndex); });
}

void FiberScheduler::writeFixed(
int fd, const void * buf, uint32_t len, uint64_t offset, int bufIndex, uint64_t * bytesWritten, IoFuture * future) noexcept
{
future->result = bytesWritten;
enqueueIo(future, [=](io_uring_sqe * sqe) noexcept { ::io_uring_prep_write_fixed(sqe, fd, buf, len, offset, bufIndex); });
}

void FiberScheduler::registerBuffers(const iovec * iovecs, unsigned count) noexcept
{
SILK_ASSERT(scheduler, "registerBuffers called before initialize()");
for (uint32_t cpu = 0; cpu < scheduler->processorCount; ++cpu)
Comment thread
kavirajk marked this conversation as resolved.
{
ProcessorState * processor = &scheduler->processorState[cpu];
if (processor->number == INVALID_PROCESSOR_NUMBER)
{
continue;
}
int r = ::io_uring_register_buffers(&processor->ring, iovecs, count);

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is about NUMA awareness here? Allocate once and use on any CPU does not look optimal. Should we maintain separate buffers per-node (per-cpu)?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fair point 👍 This may require some changes on all three new apis (read_fixed/write_fixed/register_buffers) I think. Currently the buffers can physically live on one node and cores on other nodes have to pay remote-memory cost.

I'm thinking of having something simple struct

  struct FixedBuf
  {
      void *   base[SILK_MAX_NUMA_NODES];  // node-local pinned bases
      uint32_t index;                      // node-relative index
      uint32_t len;
  };

and make read_fixed and write_fixed apis accepts this FixedBuffer along with offset instead of plain void * pointer.

what do you think? May be it's complex? open to other ideas if you got any simpler approach (I'm not super familiar with NUMA in general :) )

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems silk just need to expose raw register-buffer API. It would be better to write client code first and then decide what can be pushed into silk.

SILK_ASSERT(r == 0, "io_uring_register_buffers failed: cpu=%u, ret=%d", cpu, r);
}
}

void FiberScheduler::poll(int fd, uint32_t events, uint64_t * triggeredEvents, IoFuture * future) noexcept
{
future->result = triggeredEvents;
Expand Down
92 changes: 92 additions & 0 deletions src/fibers/tests/io-fixed-test.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
#include <silk/fibers/fiber.h>

#include <gtest/gtest.h>

#include <cstdint>
#include <cstdlib>
#include <cstring>

#include <fcntl.h>
#include <unistd.h>

namespace silk
{

// writeFixed then readFixed against a registered buffer. It must test
// round-trip all the three apis, including reads into a non-base offset
// within the registered region.
TEST(IoFixed, writeReadRoundTrip)
{
static constexpr uint64_t BLOCK = 4096;
static constexpr uint64_t NBLOCKS = 2;
static constexpr uint64_t SIZE = BLOCK * NBLOCKS;

char tmpl[] = "/tmp/silk-io-fixed-XXXXXX";
int fd = ::mkstemp(tmpl);
ASSERT_GE(fd, 0) << std::strerror(errno);
::unlink(tmpl);
ASSERT_EQ(::ftruncate(fd, static_cast<off_t>(SIZE)), 0) << std::strerror(errno);

// Single contiguous registration covering the whole buffer; bufIndex 0.
char * buf = static_cast<char *>(std::malloc(SIZE));
ASSERT_NE(buf, nullptr);
iovec reg{buf, SIZE};
FiberScheduler::registerBuffers(&reg, 1);

struct Params
{
int fd;
char * buf;

static int fiberMain(Params * p) noexcept
{
// Fill block 0 with a known pattern and write it out via WRITE_FIXED.
for (uint64_t i = 0; i < BLOCK; ++i)
{
p->buf[i] = static_cast<char>((i * 7 + 1) & 0xFF);
}

uint64_t bytesWritten = 0;
FiberScheduler::IoFuture wf;
FiberScheduler::writeFixed(p->fd, p->buf, BLOCK, 0, 0, &bytesWritten, &wf);
EXPECT_EQ(wf.wait(), 0);
EXPECT_EQ(bytesWritten, BLOCK);

// Read back into the SECOND block: a non-base offset still inside the
// registered region (exercises the "buf within registered buffer"
// contract) that is deliberately left untouched by userspace. Under
// MSan it is poisoned, so the only thing that can mark it initialized
// is the kernel fill + readFixed's MSAN_UNPOISON. If readFixed forgot
// to unpoison, the comparison below endup as a use-of-uninitialized.
char * dst = p->buf + BLOCK;
uint64_t bytesRead = 0;
FiberScheduler::IoFuture rf;
FiberScheduler::readFixed(p->fd, dst, BLOCK, 0, 0, &bytesRead, &rf);
EXPECT_EQ(rf.wait(), 0);
EXPECT_EQ(bytesRead, BLOCK);

// The kernel-filled bytes must match what we wrote (and must be
// readable without tripping MSan).
for (uint64_t i = 0; i < BLOCK; ++i)
{
EXPECT_EQ(dst[i], static_cast<char>((i * 7 + 1) & 0xFF)) << "mismatch at byte " << i;
}

return 0;
}
};

EXPECT_EQ(FiberScheduler::run(Params::fiberMain, Params{fd, buf}), 0);

std::free(buf);
::close(fd);
}

// TODO(kavi): this test runs a single fiber on one CPU. The whole point of
// registering buffers on every ring is that a fiber can move to another CPU and
// still use the same bufIndex. We don't test that here because we can't reliably
// force a fiber to move to a specific CPU, so the test would be flaky. If we need
// to be sure this works, add a second test that forces the move and checks the
// fixed IO still works.

} // namespace silk
36 changes: 36 additions & 0 deletions src/perf/file-perf.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,8 @@ struct ClientConfig
uint64_t warmupNs = 2'000'000'000ULL;
bool direct = false;
bool printCounters = false;
// use registered buffers (IORING_OP_READ_FIXED / IORING_OP_WRITE_FIXED).
bool fixedBuffers = false;
};

class Benchmark
Expand Down Expand Up @@ -157,6 +159,7 @@ class Benchmark
std::unique_ptr<char, decltype(&std::free)> bufs{nullptr, std::free};
std::unique_ptr<Slot[]> slots;
uint32_t head = 0;
uint32_t index = 0;
};

//
Expand Down Expand Up @@ -199,6 +202,7 @@ void Benchmark::start()
uint32_t i = 0;
for (Job & job : jobs)
{
job.index = i;
job.strategy = OffsetStrategy(cfg.mode, cfg.fileSize, cfg.blockSize, i++, static_cast<uint32_t>(jobs.size()));

// O_DIRECT requires 512-byte-aligned buffers.
Expand All @@ -216,6 +220,19 @@ void Benchmark::start()
}
}

// register one fixed buffer per job (each covering its whole
// iodepth*blockSize block) on every per-CPU ring. bufIndex == job.index.
if (cfg.fixedBuffers)
{
std::vector<iovec> regBufs(jobs.size());
for (Job & job : jobs)
{
regBufs[job.index].iov_base = job.bufs.get();
regBufs[job.index].iov_len = static_cast<uint64_t>(cfg.iodepth) * cfg.blockSize;
}
silk::FiberScheduler::registerBuffers(regBufs.data(), static_cast<unsigned>(regBufs.size()));
}

for (Job & job : jobs)
{
int r = silk::FiberScheduler::run(workerFiberMain, {this, &job}, &job.future);
Expand Down Expand Up @@ -249,6 +266,23 @@ void Benchmark::submit(Job * job, Slot * slot)
{
slot->startCycles = silk::Tsc::getCycles();
uint64_t offset = job->strategy.next();

if (cfg.fixedBuffers)
{
// READ_FIXED / WRITE_FIXED against the buffer registered for this job.
int bufIndex = static_cast<int>(job->index);
if (cfg.mode == MODE_RANDWRITE)
{
silk::FiberScheduler::writeFixed(fd, slot->iov.iov_base, slot->iov.iov_len, offset, bufIndex, nullptr, &slot->future);
}
else
{
silk::FiberScheduler::readFixed(fd, slot->iov.iov_base, slot->iov.iov_len, offset, bufIndex, nullptr, &slot->future);
}
return;
}

// Baseline: vectored READV / WRITEV with a 1-element iovec.
if (cfg.mode == MODE_RANDWRITE)
{
silk::FiberScheduler::write(fd, &slot->iov, 1, offset, nullptr, &slot->future);
Expand Down Expand Up @@ -319,6 +353,7 @@ static void printJson(std::vector<uint64_t> & latNs, const ClientConfig & cfg)
printf(" \"iodepth\": %u,\n", cfg.iodepth);
printf(" \"block_size_bytes\": %u,\n", cfg.blockSize);
printf(" \"mode\": \"%s\",\n", modeName(cfg.mode));
printf(" \"fixed_buffers\": %s,\n", cfg.fixedBuffers ? "true" : "false");
printf(" \"file_size_bytes\": %lu,\n", cfg.fileSize);
printf(" \"duration_s\": %.3f,\n", durationS);
printf(" \"total\": %lu,\n", total);
Expand Down Expand Up @@ -362,6 +397,7 @@ int main(int argc, char ** argv)
("warmup", "warmup duration (e.g. 2s, 500ms)", cxxopts::value<std::string>(warmupStr))
("filename", "file path", cxxopts::value<std::string>(cfg.filename))
("direct", "use O_DIRECT (bypass page cache)", cxxopts::value<bool>(cfg.direct))
("fixed-buffers", "use registered buffers (IORING_OP_READ_FIXED / WRITE_FIXED)", cxxopts::value<bool>(cfg.fixedBuffers))
("print-counters", "enable per-CPU profiler and include counters in the JSON report", cxxopts::value<bool>(cfg.printCounters))
("v,verbose", "enable debug logging", cxxopts::value<bool>(verbose))
;
Expand Down
Loading