Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 2 additions & 7 deletions include/silk/fibers/fiber.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,6 @@ class Fiber;
static constexpr uint64_t FIBER_PARAMETERS_SIZE = 64;
static constexpr uint64_t FIBER_PARAMETERS_OFFSET = 224;

/**
* Hard cap on CPU index (largest known socket: 384 cores).
*/
static constexpr uint16_t INVALID_PROCESSOR_NUMBER = (1 << 10);

/**
* Fiber entry point signature. Returns an integer result code.
*/
Expand Down Expand Up @@ -349,7 +344,7 @@ class FiberScheduler
uint8_t category = 0;
// Processor whose io_uring ring holds this SQE; cancelIo must submit
// the cancel to the same ring to avoid a cross-ring -ENOENT failure.
uint32_t processorNumber = INVALID_PROCESSOR_NUMBER;
uint16_t processorNumber = INVALID_PROCESSOR_NUMBER;
#if defined(__SANITIZE_MEMORY__)
// Used to mark the kernel-written bytes as initialized for MSan.
iovec * readIov = nullptr;
Expand Down Expand Up @@ -526,7 +521,7 @@ class FiberScheduler
StackEntry stackEntry;
TreeEntry treeEntry;
uint64_t deadlineCycles = 0;
uint32_t processorNumber = INVALID_PROCESSOR_NUMBER;
uint16_t processorNumber = INVALID_PROCESSOR_NUMBER;
std::atomic<uint32_t> state{};
};

Expand Down
15 changes: 9 additions & 6 deletions include/silk/util/platform.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ static constexpr uint64_t PAGE_SIZE = 4096;
/** Cache line size in bytes. */
static constexpr uint64_t CACHELINE_SIZE = 64;

/** Hard cap on CPU index (largest known socket: 384 cores). */
static constexpr uint16_t INVALID_PROCESSOR_NUMBER = (1 << 10);

/** Round @p value up to the nearest multiple of @p align (must be a power of two). */
template <typename T>
static constexpr T alignUp(T value, T align) noexcept
Expand Down Expand Up @@ -82,21 +85,21 @@ static T * containerOf(M * member, M T::* memberPtr) noexcept
}

/** Return the total number of online processors. */
static inline uint32_t getProcessorCount() noexcept
static inline uint16_t getProcessorCount() noexcept
{
return static_cast<uint32_t>(sysconf(_SC_NPROCESSORS_ONLN));
return static_cast<uint16_t>(sysconf(_SC_NPROCESSORS_ONLN));
}

/** Return the number of processors available to the calling process (respects taskset/cgroup affinity). */
static inline uint32_t getAvailableProcessorCount() noexcept
static inline uint16_t getAvailableProcessorCount() noexcept
{
cpu_set_t cpuSet;
sched_getaffinity(0, sizeof(cpuSet), &cpuSet);
return static_cast<uint32_t>(CPU_COUNT(&cpuSet));
return static_cast<uint16_t>(CPU_COUNT(&cpuSet));
}

/** Return the index of the CPU the calling thread is currently running on. */
static inline uint32_t getCurrentProcessor() noexcept
static inline uint16_t getCurrentProcessor() noexcept
{
// The thread pointer is read through volatile asm rather than __builtin_thread_pointer(). A fiber
// may suspend on one OS thread and resume on another (work-stealing, or the SQ-ring-overflow yield
Expand All @@ -119,7 +122,7 @@ static inline uint32_t getCurrentProcessor() noexcept
// which is the same rseq read we do here. We skip the function call and the cpu_id >= 0
// fallback to vDSO/syscall -- safe on Linux 4.18+ / glibc 2.35+ where rseq is always registered.
struct rseq * rseq = reinterpret_cast<struct rseq *>(threadPointer + __rseq_offset);
return rseq->cpu_id;
return static_cast<uint16_t>(rseq->cpu_id);
}

/** Yield the calling thread's remaining timeslice to the OS scheduler. */
Expand Down
60 changes: 30 additions & 30 deletions src/fibers/fiber.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -499,7 +499,7 @@ struct FiberScheduler::CpuTimer
counterRunning = counter;
}

void reset(uint32_t counter, uint32_t cpu) noexcept
void reset(uint32_t counter, uint16_t cpu) noexcept
{
uint64_t now = Tsc::getCycles();
uint64_t elapsedNs = Tsc::cyclesToNanoseconds(now - startedCycles);
Expand All @@ -518,7 +518,7 @@ struct FiberScheduler::CpuTimer
*/
struct FiberScheduler::ProcessorState
{
void initialize(uint32_t cpu) noexcept;
void initialize(uint16_t cpu) noexcept;
void destroy() noexcept;

FiberId allocateFiberId(uint8_t category) noexcept;
Expand Down Expand Up @@ -634,10 +634,10 @@ struct FiberScheduler::ProcessorState
BoundedQueue<Fiber *> readyQueue;
};

void FiberScheduler::ProcessorState::initialize(uint32_t cpu) noexcept
void FiberScheduler::ProcessorState::initialize(uint16_t cpu) noexcept
{
SILK_ASSERT(cpu < INVALID_PROCESSOR_NUMBER);
number = static_cast<uint16_t>(cpu);
number = cpu;

readyQueue.initialize(options.readyQueueCapacity);

Expand Down Expand Up @@ -958,9 +958,9 @@ struct FiberScheduler::SchedulerState

std::atomic<bool> stopping{};

uint32_t processorCount = 0;
uint32_t schedulerThreadCount = 0;
uint32_t workerThreadCount = 0;
uint16_t processorCount = 0;
uint16_t schedulerThreadCount = 0;
uint16_t workerThreadCount = 0;

std::unique_ptr<ProcessorState[]> processorState;
std::unique_ptr<std::thread[]> schedulerThreads;
Expand Down Expand Up @@ -1042,22 +1042,22 @@ void FiberScheduler::initialize(const Options * userOptions) noexcept
CPU_ZERO(&processCpuSet);
sched_getaffinity(0, sizeof(processCpuSet), &processCpuSet);

scheduler->schedulerThreadCount = CPU_COUNT(&processCpuSet);
scheduler->schedulerThreadCount = static_cast<uint16_t>(CPU_COUNT(&processCpuSet));
scheduler->schedulerThreads = std::make_unique<std::thread[]>(scheduler->schedulerThreadCount);

for (uint32_t cpu = 0; cpu < scheduler->processorCount; ++cpu)
for (uint16_t cpu = 0; cpu < scheduler->processorCount; ++cpu)
{
if (CPU_ISSET(cpu, &processCpuSet))
{
ProcessorState * processor = &scheduler->processorState[cpu];
processor->number = static_cast<uint16_t>(cpu);
processor->number = cpu;
}
}

buildStealCandidates();

uint32_t threadIndex = 0;
for (uint32_t cpu = 0; cpu < scheduler->processorCount; ++cpu)
uint16_t threadIndex = 0;
for (uint16_t cpu = 0; cpu < scheduler->processorCount; ++cpu)
{
if (CPU_ISSET(cpu, &processCpuSet))
{
Expand All @@ -1066,7 +1066,7 @@ void FiberScheduler::initialize(const Options * userOptions) noexcept
}
}

for (uint32_t cpu = 0; cpu < scheduler->processorCount; ++cpu)
for (uint16_t cpu = 0; cpu < scheduler->processorCount; ++cpu)
{
if (CPU_ISSET(cpu, &processCpuSet))
{
Expand All @@ -1081,7 +1081,7 @@ void FiberScheduler::initialize(const Options * userOptions) noexcept
scheduler->workerThreadCount = scheduler->schedulerThreadCount;
scheduler->workerThreads = std::make_unique<std::thread[]>(scheduler->workerThreadCount);

for (uint32_t i = 0; i < scheduler->workerThreadCount; ++i)
for (uint16_t i = 0; i < scheduler->workerThreadCount; ++i)
{
scheduler->workerThreads[i] = std::thread(runThreadWorker);
}
Expand All @@ -1092,8 +1092,8 @@ void FiberScheduler::buildStealCandidates() noexcept
auto topologies = std::make_unique<CpuTopology[]>(scheduler->processorCount);
readCpuTopologies(topologies.get(), scheduler->processorCount);

uint32_t candidateCount = scheduler->processorCount - 1;
for (uint32_t cpu = 0; cpu < scheduler->processorCount; ++cpu)
uint16_t candidateCount = scheduler->processorCount - 1;
for (uint16_t cpu = 0; cpu < scheduler->processorCount; ++cpu)
{
ProcessorState * processor = &scheduler->processorState[cpu];
if (processor->number == INVALID_PROCESSOR_NUMBER)
Expand All @@ -1104,8 +1104,8 @@ void FiberScheduler::buildStealCandidates() noexcept
// Build an array of CPUs with the estimated stealing cost.
processor->stealCandidates = std::make_unique<StealCandidate[]>(candidateCount);

uint32_t i = 0;
for (uint32_t other = 0; other < scheduler->processorCount; ++other)
uint16_t i = 0;
for (uint16_t other = 0; other < scheduler->processorCount; ++other)
{
if (other == cpu)
{
Expand All @@ -1125,10 +1125,10 @@ void FiberScheduler::buildStealCandidates() noexcept
// deterministic rotation by cpu % groupSize. Avoids the thundering
// herd of every CPU racing the same first target while keeping the
// candidate order reproducible across runs.
for (uint32_t start = 0; start < candidateCount;)
for (uint16_t start = 0; start < candidateCount;)
{
uint64_t groupCost = processor->stealCandidates[start].costCycles;
uint32_t end = start;
uint16_t end = start;
while (end < candidateCount && processor->stealCandidates[end].costCycles == groupCost)
{
++end;
Expand All @@ -1153,7 +1153,7 @@ void FiberScheduler::destroy() noexcept

scheduler->stopping.store(true, std::memory_order_release);

for (uint32_t cpu = 0; cpu < scheduler->processorCount; ++cpu)
for (uint16_t cpu = 0; cpu < scheduler->processorCount; ++cpu)
{
ProcessorState * processor = &scheduler->processorState[cpu];
if (processor->number != INVALID_PROCESSOR_NUMBER)
Expand All @@ -1162,23 +1162,23 @@ void FiberScheduler::destroy() noexcept
}
}

for (uint32_t i = 0; i < scheduler->schedulerThreadCount; ++i)
for (uint16_t i = 0; i < scheduler->schedulerThreadCount; ++i)
{
scheduler->schedulerThreads[i].join();
}

for (uint32_t cpu = 0; cpu < scheduler->processorCount; ++cpu)
for (uint16_t cpu = 0; cpu < scheduler->processorCount; ++cpu)
{
ProcessorState * processor = &scheduler->processorState[cpu];
processor->destroy();
}

for (uint32_t i = 0; i < scheduler->workerThreadCount; ++i)
for (uint16_t i = 0; i < scheduler->workerThreadCount; ++i)
{
scheduler->wakeThread();
}

for (uint32_t i = 0; i < scheduler->workerThreadCount; ++i)
for (uint16_t i = 0; i < scheduler->workerThreadCount; ++i)
{
scheduler->workerThreads[i].join();
}
Expand Down Expand Up @@ -1275,7 +1275,7 @@ void FiberScheduler::enqueueReady(Fiber * fiber) noexcept
{
if (fiber->processorNumber == INVALID_PROCESSOR_NUMBER)
{
fiber->processorNumber = static_cast<uint16_t>(getCurrentProcessor());
fiber->processorNumber = getCurrentProcessor();
}

ProcessorState * processor = &scheduler->processorState[fiber->processorNumber];
Expand Down Expand Up @@ -1471,7 +1471,7 @@ void FiberScheduler::cancelIo(IoFuture * future) noexcept
// was work-stolen to another CPU between registering the poll and cancelling
// it), io_uring returns -ENOENT and the original operation is never removed,
// leaving the caller's IoFuture::wait() blocked forever.
uint32_t processorNumber = future->processorNumber;
uint16_t processorNumber = future->processorNumber;
if (processorNumber == INVALID_PROCESSOR_NUMBER)
{
processorNumber = getCurrentProcessor();
Expand Down Expand Up @@ -1547,7 +1547,7 @@ void FiberScheduler::cancelSleep(SleepFuture * future) noexcept
LatencyReport FiberScheduler::reportLatency(ProfileEventKind kind, uint8_t category) noexcept
{
Histogram merged;
for (uint32_t cpu = 0; cpu < scheduler->processorCount; ++cpu)
for (uint16_t cpu = 0; cpu < scheduler->processorCount; ++cpu)
{
ProcessorState * processor = &scheduler->processorState[cpu];
if (processor->profiler)
Expand Down Expand Up @@ -1664,8 +1664,8 @@ bool FiberScheduler::runStealLoop(ProcessorState * processor, uint64_t idleSince
uint64_t idleCycles = now - idleSinceCycles;
uint64_t deadlineCycles = now + idleCycles;

uint32_t candidateCount = scheduler->processorCount - 1;
for (uint32_t i = 0; i < candidateCount && now < deadlineCycles; ++i)
uint16_t candidateCount = scheduler->processorCount - 1;
for (uint16_t i = 0; i < candidateCount && now < deadlineCycles; ++i)
{
// Candidates are sorted cheapest first. Once the threshold exceeds our
// idle duration all remaining candidates are even more expensive.
Expand Down
4 changes: 2 additions & 2 deletions src/fibers/tests/fiber-test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -864,7 +864,7 @@ TEST(Fiber, WorkStealing)
{
std::atomic<bool> * started;
std::atomic<bool> * stop;
std::atomic<uint32_t> * cpu;
std::atomic<uint16_t> * cpu;

static int fiberMain(BlockerParams * p) noexcept
{
Expand All @@ -885,7 +885,7 @@ TEST(Fiber, WorkStealing)

std::atomic<bool> started{false};
std::atomic<bool> stop{false};
std::atomic<uint32_t> blockerCpuAtom{UINT32_MAX};
std::atomic<uint16_t> blockerCpuAtom{INVALID_PROCESSOR_NUMBER};

FiberFuture blocker;
int r = FiberScheduler::run(BlockerParams::fiberMain, {&started, &stop, &blockerCpuAtom}, &blocker);
Expand Down
Loading