Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 44 additions & 12 deletions src/fibers/fiber.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -524,6 +524,7 @@ struct FiberScheduler::ProcessorState
FiberId allocateFiberId(uint8_t category) noexcept;
void profileEvent(ProfileEventKind kind, uint8_t category, uint64_t durationCycles) noexcept;

void enqueueDoorbell() noexcept;
void wakeThread() noexcept;
void parkThread(uint64_t waitNs, CpuTimer * timer) noexcept;

Expand Down Expand Up @@ -650,16 +651,9 @@ void FiberScheduler::ProcessorState::initialize(uint16_t cpu) noexcept

SILK_ASSERT(params.features & IORING_FEAT_NODROP);

// Submit a persistent poll for the wakeup fd. IORING_POLL_ADD_MULTI keeps
// the SQE active indefinitely; each eventfd_write produces one CQE.
io_uring_sqe * sqe = ::io_uring_get_sqe(&ring);
SILK_ASSERT(sqe);

::io_uring_prep_poll_multishot(sqe, eventFd, POLLIN);
::io_uring_sqe_set_data64(sqe, CQE_TAG_DOORBELL);

r = ::io_uring_submit(&ring);
SILK_ASSERT(r >= 0);
// Arm the wakeup doorbell. The kernel can end the multishot poll on CQ overflow,
// so handleCompletionQueueSlow re-arms it through the same path on F_MORE loss.
enqueueDoorbell();

if (options.enableProfiler)
{
Expand Down Expand Up @@ -832,6 +826,32 @@ void FiberScheduler::ProcessorState::enqueueWakeup() noexcept
}
}

void FiberScheduler::ProcessorState::enqueueDoorbell() noexcept
{
// (Re-)arm the wakeup doorbell: a persistent multishot poll on eventFd posts a
// CQE each time wakeThread writes to it. The kernel ends the poll on CQ overflow
// (a CQE without IORING_CQE_F_MORE), so the completion drain re-arms it here. A
// missed re-arm leaves the doorbell deaf forever, so retry through a full SQ ring
// rather than skip it the way enqueueWakeup does.
for (;;)
{
bool enqueued = enqueueIo(
nullptr,
[this](io_uring_sqe * sqe) noexcept
{
::io_uring_prep_poll_multishot(sqe, eventFd, POLLIN);
::io_uring_sqe_set_data64(sqe, CQE_TAG_DOORBELL);
});

submitIo(true);

if (enqueued)
{
break;
}
}
}

template <typename Setup>
bool FiberScheduler::ProcessorState::enqueueIo(IoFuture * future, Setup && setup) noexcept
{
Expand Down Expand Up @@ -1749,6 +1769,7 @@ bool FiberScheduler::handleCompletionQueue(ProcessorState * processor) noexcept
__attribute__((noinline)) bool FiberScheduler::handleCompletionQueueSlow(ProcessorState * processor) noexcept
{
bool didWork = false;
bool rearmDoorbell = false;

// TSan needs an explicit barrier between submission/completion.
TSAN_ACQUIRE(processor);
Expand Down Expand Up @@ -1786,10 +1807,16 @@ __attribute__((noinline)) bool FiberScheduler::handleCompletionQueueSlow(Process

if (tag == CQE_TAG_DOORBELL)
{
// Wakeup doorbell: drain the eventfd counter and keep the
// POLL_ADD_MULTI SQE active for the next write.
// Wakeup doorbell: drain the eventfd counter. The kernel ends the
// multishot poll on CQ overflow - a CQE without IORING_CQE_F_MORE -
// so re-arm it after the drain or wakeThread can no longer wake us.
eventfd_t val;
::eventfd_read(processor->eventFd, &val);

if (!(cqe->flags & IORING_CQE_F_MORE))
{
rearmDoorbell = true;
}
continue;
}

Expand Down Expand Up @@ -1852,6 +1879,11 @@ __attribute__((noinline)) bool FiberScheduler::handleCompletionQueueSlow(Process
SILK_ASSERT(r >= 0);
}

if (rearmDoorbell)
{
processor->enqueueDoorbell();
}

if (didWork && processor->profiler)
{
processor->profileEvent(ProfileEventKind::CQ_WAIT, 0, entryCycles - processor->lastCqDrainCycles);
Expand Down
Loading