Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 19 additions & 10 deletions src/fibers/fiber.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -703,6 +703,12 @@ void FiberScheduler::ProcessorState::profileEvent(ProfileEventKind kind, uint8_t

void FiberScheduler::ProcessorState::wakeThread() noexcept
{
// seq_cst fence pairs with the one in parkThread to prevent the store-buffering race on weak
// memory models (e.g. arm64): the producer's prior queue store and this sleeping load must not
// reorder, so either we observe sleeping=true here (and post the eventfd) or parkThread's
// hasWork() re-check observes our enqueued work. Same pattern as enqueueWaiter / releaseWaiters.
std::atomic_thread_fence(std::memory_order_seq_cst);

if (sleeping.load(std::memory_order_acquire))
{
Perf::getSimpleCounter(simpleCounters[SCHEDULER_THREAD_WAKED], number).increment();
Expand All @@ -714,22 +720,25 @@ void FiberScheduler::ProcessorState::wakeThread() noexcept

void FiberScheduler::ProcessorState::parkThread(uint64_t waitNs, CpuTimer * timer) noexcept
{
__kernel_timespec ts;
ts.tv_sec = static_cast<int64_t>(waitNs / 1'000'000'000);
ts.tv_nsec = static_cast<int64_t>(waitNs % 1'000'000'000);

io_uring_getevents_arg arg{};
arg.ts = reinterpret_cast<uint64_t>(&ts);

// Announce that we are about to park. Release ordering ensures that any
// concurrent wakeThread() that observes sleeping=true also observes all prior
// queue stores, guaranteeing the eventfd_write reaches us.
// Announce that we are about to park, then a seq_cst fence pairing with the one in wakeThread:
// release alone is not a StoreLoad barrier, so without this the store could reorder past the
// hasWork() re-check below while a concurrent wakeThread reads sleeping=false - both miss, and
// the wakeup is lost on weak memory models (e.g. arm64). Same pattern as enqueueWaiter.
sleeping.store(true, std::memory_order_release);

std::atomic_thread_fence(std::memory_order_seq_cst);

// Double-check: work may have arrived between the last drain and here.
// If so, skip the park entirely so that work is not delayed by waitNs.
if (!hasWork())
{
__kernel_timespec ts;
ts.tv_sec = static_cast<int64_t>(waitNs / 1'000'000'000);
ts.tv_nsec = static_cast<int64_t>(waitNs % 1'000'000'000);

io_uring_getevents_arg arg{};
arg.ts = reinterpret_cast<uint64_t>(&ts);

Perf::getSimpleCounter(simpleCounters[SCHEDULER_THREAD_PARKED], number).increment();

timer->reset(simpleCounters[SCHEDULER_IDLE_TIME], number);
Expand Down
Loading