-
Notifications
You must be signed in to change notification settings - Fork 4
feat(io_uring): Add support for registered buffers #72
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
931fad8
91a2952
9b64f36
c332ab2
55ce17f
c87f30b
cc94911
c8ee4a7
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -1437,6 +1437,40 @@ void FiberScheduler::write(int fd, iovec * iov, uint32_t iov_len, uint64_t offse | |
| enqueueIo(future, [=](io_uring_sqe * sqe) noexcept { ::io_uring_prep_writev(sqe, fd, iov, iov_len, offset); }); | ||
| } | ||
|
|
||
| void FiberScheduler::readFixed( | ||
| int fd, void * buf, uint32_t len, uint64_t offset, int bufIndex, uint64_t * bytesRead, IoFuture * future) noexcept | ||
| { | ||
| future->result = bytesRead; | ||
| #if defined(__SANITIZE_MEMORY__) | ||
| future->readIovStorage = {buf, len}; | ||
| future->readIov = &future->readIovStorage; | ||
| future->readIovLen = 1; | ||
| #endif | ||
| enqueueIo(future, [=](io_uring_sqe * sqe) noexcept { ::io_uring_prep_read_fixed(sqe, fd, buf, len, offset, bufIndex); }); | ||
| } | ||
|
|
||
| void FiberScheduler::writeFixed( | ||
| int fd, const void * buf, uint32_t len, uint64_t offset, int bufIndex, uint64_t * bytesWritten, IoFuture * future) noexcept | ||
| { | ||
| future->result = bytesWritten; | ||
| enqueueIo(future, [=](io_uring_sqe * sqe) noexcept { ::io_uring_prep_write_fixed(sqe, fd, buf, len, offset, bufIndex); }); | ||
| } | ||
|
|
||
| void FiberScheduler::registerBuffers(const iovec * iovecs, unsigned count) noexcept | ||
| { | ||
| SILK_ASSERT(scheduler, "registerBuffers called before initialize()"); | ||
| for (uint32_t cpu = 0; cpu < scheduler->processorCount; ++cpu) | ||
| { | ||
| ProcessorState * processor = &scheduler->processorState[cpu]; | ||
| if (processor->number == INVALID_PROCESSOR_NUMBER) | ||
| { | ||
| continue; | ||
| } | ||
| int r = ::io_uring_register_buffers(&processor->ring, iovecs, count); | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What is about NUMA awareness here? Allocate once and use on any CPU does not look optimal. Should we maintain separate buffers per-node (per-cpu)?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fair point 👍 This may require some changes on all three new apis (read_fixed/write_fixed/register_buffers) I think. Currently the buffers can physically live on one node and cores on other nodes have to pay remote-memory cost. I'm thinking of having something simple struct struct FixedBuf
{
void * base[SILK_MAX_NUMA_NODES]; // node-local pinned bases
uint32_t index; // node-relative index
uint32_t len;
};and make what do you think? May be it's complex? open to other ideas if you got any simpler approach (I'm not super familiar with NUMA in general :) )
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It seems silk just need to expose raw register-buffer API. It would be better to write client code first and then decide what can be pushed into silk. |
||
| SILK_ASSERT(r == 0, "io_uring_register_buffers failed: cpu=%u, ret=%d", cpu, r); | ||
| } | ||
| } | ||
|
|
||
| void FiberScheduler::poll(int fd, uint32_t events, uint64_t * triggeredEvents, IoFuture * future) noexcept | ||
| { | ||
| future->result = triggeredEvents; | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,92 @@ | ||
| #include <silk/fibers/fiber.h> | ||
|
|
||
| #include <gtest/gtest.h> | ||
|
|
||
| #include <cstdint> | ||
| #include <cstdlib> | ||
| #include <cstring> | ||
|
|
||
| #include <fcntl.h> | ||
| #include <unistd.h> | ||
|
|
||
| namespace silk | ||
| { | ||
|
|
||
| // writeFixed then readFixed against a registered buffer. It must test | ||
| // round-trip all the three apis, including reads into a non-base offset | ||
| // within the registered region. | ||
| TEST(IoFixed, writeReadRoundTrip) | ||
| { | ||
| static constexpr uint64_t BLOCK = 4096; | ||
| static constexpr uint64_t NBLOCKS = 2; | ||
| static constexpr uint64_t SIZE = BLOCK * NBLOCKS; | ||
|
|
||
| char tmpl[] = "/tmp/silk-io-fixed-XXXXXX"; | ||
| int fd = ::mkstemp(tmpl); | ||
| ASSERT_GE(fd, 0) << std::strerror(errno); | ||
| ::unlink(tmpl); | ||
| ASSERT_EQ(::ftruncate(fd, static_cast<off_t>(SIZE)), 0) << std::strerror(errno); | ||
|
|
||
| // Single contiguous registration covering the whole buffer; bufIndex 0. | ||
| char * buf = static_cast<char *>(std::malloc(SIZE)); | ||
| ASSERT_NE(buf, nullptr); | ||
| iovec reg{buf, SIZE}; | ||
| FiberScheduler::registerBuffers(®, 1); | ||
|
|
||
| struct Params | ||
| { | ||
| int fd; | ||
| char * buf; | ||
|
|
||
| static int fiberMain(Params * p) noexcept | ||
| { | ||
| // Fill block 0 with a known pattern and write it out via WRITE_FIXED. | ||
| for (uint64_t i = 0; i < BLOCK; ++i) | ||
| { | ||
| p->buf[i] = static_cast<char>((i * 7 + 1) & 0xFF); | ||
| } | ||
|
|
||
| uint64_t bytesWritten = 0; | ||
| FiberScheduler::IoFuture wf; | ||
| FiberScheduler::writeFixed(p->fd, p->buf, BLOCK, 0, 0, &bytesWritten, &wf); | ||
| EXPECT_EQ(wf.wait(), 0); | ||
| EXPECT_EQ(bytesWritten, BLOCK); | ||
|
|
||
| // Read back into the SECOND block: a non-base offset still inside the | ||
| // registered region (exercises the "buf within registered buffer" | ||
| // contract) that is deliberately left untouched by userspace. Under | ||
| // MSan it is poisoned, so the only thing that can mark it initialized | ||
| // is the kernel fill + readFixed's MSAN_UNPOISON. If readFixed forgot | ||
| // to unpoison, the comparison below endup as a use-of-uninitialized. | ||
| char * dst = p->buf + BLOCK; | ||
| uint64_t bytesRead = 0; | ||
| FiberScheduler::IoFuture rf; | ||
| FiberScheduler::readFixed(p->fd, dst, BLOCK, 0, 0, &bytesRead, &rf); | ||
| EXPECT_EQ(rf.wait(), 0); | ||
| EXPECT_EQ(bytesRead, BLOCK); | ||
|
|
||
| // The kernel-filled bytes must match what we wrote (and must be | ||
| // readable without tripping MSan). | ||
| for (uint64_t i = 0; i < BLOCK; ++i) | ||
| { | ||
| EXPECT_EQ(dst[i], static_cast<char>((i * 7 + 1) & 0xFF)) << "mismatch at byte " << i; | ||
| } | ||
|
|
||
| return 0; | ||
| } | ||
| }; | ||
|
|
||
| EXPECT_EQ(FiberScheduler::run(Params::fiberMain, Params{fd, buf}), 0); | ||
|
|
||
| std::free(buf); | ||
| ::close(fd); | ||
| } | ||
|
|
||
| // TODO(kavi): this test runs a single fiber on one CPU. The whole point of | ||
| // registering buffers on every ring is that a fiber can move to another CPU and | ||
| // still use the same bufIndex. We don't test that here because we can't reliably | ||
| // force a fiber to move to a specific CPU, so the test would be flaky. If we need | ||
| // to be sure this works, add a second test that forces the move and checks the | ||
| // fixed IO still works. | ||
|
|
||
| } // namespace silk |
Uh oh!
There was an error while loading. Please reload this page.