Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions include/silk/fibers/fiber.h
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ class FiberScheduler
uint8_t category = 0;
// Processor whose io_uring ring holds this SQE; cancelIo must submit
// the cancel to the same ring to avoid a cross-ring -ENOENT failure.
uint16_t processorNumber = INVALID_PROCESSOR_NUMBER;
uint16_t processorNumber = kInvalidProcessorNumber;
#if defined(__SANITIZE_MEMORY__)
// Used to mark the kernel-written bytes as initialized for MSan.
iovec * readIov = nullptr;
Expand Down Expand Up @@ -521,7 +521,7 @@ class FiberScheduler
StackEntry stackEntry;
TreeEntry treeEntry;
uint64_t deadlineCycles = 0;
uint16_t processorNumber = INVALID_PROCESSOR_NUMBER;
uint16_t processorNumber = kInvalidProcessorNumber;
std::atomic<uint32_t> state{};
};

Expand Down
8 changes: 4 additions & 4 deletions include/silk/util/bounded-queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,13 +127,13 @@ class BoundedQueue
}

private:
struct alignas(CACHELINE_SIZE) Slot
struct alignas(kCacheLineSize) Slot
{
std::atomic<uint64_t> sequence;
T value;
};

static_assert(sizeof(Slot) == CACHELINE_SIZE);
static_assert(sizeof(Slot) == kCacheLineSize);

// Match offsets and stride used by src/gdb/fiber.py::_walk_bounded_queue
static_assert(offsetof(Slot, sequence) == 0);
Expand All @@ -149,8 +149,8 @@ class BoundedQueue
// src/gdb/fiber.py::_walk_bounded_queue reads enqueuePos at offset 64 and
// dequeuePos at offset 128 (mask=8 bytes, slots=8 bytes, then 2 x cacheline).
// Reordering or inserting fields here requires updating that script.
alignas(CACHELINE_SIZE) std::atomic<uint64_t> enqueuePos{};
alignas(CACHELINE_SIZE) std::atomic<uint64_t> dequeuePos{};
alignas(kCacheLineSize) std::atomic<uint64_t> enqueuePos{};
alignas(kCacheLineSize) std::atomic<uint64_t> dequeuePos{};
};

} // namespace silk
14 changes: 11 additions & 3 deletions include/silk/util/platform.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,21 @@ namespace silk
{

/** System page size in bytes. */
static constexpr uint64_t PAGE_SIZE = 4096;
#if defined(PAGE_SIZE)
static constexpr uint64_t kPageSize = PAGE_SIZE;
#else
static constexpr uint64_t kPageSize = 4096;
#endif

/** Cache line size in bytes. */
static constexpr uint64_t CACHELINE_SIZE = 64;
#if defined(CACHE_LINESIZE)
static constexpr uint64_t kCacheLineSize = CACHE_LINESIZE;
#else
static constexpr uint64_t kCacheLineSize = 64;
#endif

/** Hard cap on CPU index (largest known socket: 384 cores). */
static constexpr uint16_t INVALID_PROCESSOR_NUMBER = (1 << 10);
static constexpr uint16_t kInvalidProcessorNumber = (1 << 10);

/** Round @p value up to the nearest multiple of @p align (must be a power of two). */
template <typename T>
Expand Down
4 changes: 2 additions & 2 deletions include/silk/util/queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,15 @@ class QueueBase
/**
* Queue node holding a pointer to the enqueued value.
*/
struct alignas(CACHELINE_SIZE) QueueNode
struct alignas(kCacheLineSize) QueueNode
{
StackEntry stackEntry;
std::atomic<QueueNode *> next;
std::atomic<void *> value;
// TODO(vskipin): we can store small data in-place
};

static_assert(sizeof(QueueNode) == CACHELINE_SIZE);
static_assert(sizeof(QueueNode) == kCacheLineSize);

// Match offsets used by src/gdb/fiber.py::_walk_queue
static_assert(offsetof(QueueNode, next) == 8);
Expand Down
2 changes: 1 addition & 1 deletion include/silk/util/sharded-stack.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ class ShardedStackBase
* objects or corruption. rseq guarantees the head update is atomic with
* respect to preemption; count is updated after the rseq commit.
*/
struct alignas(CACHELINE_SIZE) ProcessorState
struct alignas(kCacheLineSize) ProcessorState
{
std::atomic<StackEntry *> head{};
std::atomic<uint32_t> count{};
Expand Down
46 changes: 23 additions & 23 deletions src/fibers/fiber.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ class Fiber
// dispatch and every suspension. runFiber's full read/write set lives on
// this single line, so dispatch never pulls a second cache line on the
// common path.
struct alignas(CACHELINE_SIZE)
struct alignas(kCacheLineSize)
{
// Intrusive node for pool free-list and WaitStack membership.
StackEntry stackEntry;
Expand All @@ -150,10 +150,10 @@ class Fiber
bool inThreadMode = false;

// CPU this fiber is assigned to.
uint16_t processorNumber = INVALID_PROCESSOR_NUMBER;
uint16_t processorNumber = kInvalidProcessorNumber;

// Processor whose suspendedList this fiber is currently in.
uint16_t suspendedProcessorNumber = INVALID_PROCESSOR_NUMBER;
uint16_t suspendedProcessorNumber = kInvalidProcessorNumber;

// Suspend callback set by suspend, invoked by runFiber after the
// context switch back to the scheduler or thread worker.
Expand All @@ -173,7 +173,7 @@ class Fiber
// Cache line 1: context-switch state and profiler timestamps; touched on
// every dispatch. Per-fiber-once fields (fiberMain, parametersDtor,
// waitingFuture) piggyback for free.
struct alignas(CACHELINE_SIZE)
struct alignas(kCacheLineSize)
{
// mmap'd stack and fcontext handles for cooperative switching.
void * stack = nullptr;
Expand Down Expand Up @@ -266,7 +266,7 @@ Fiber::~Fiber() noexcept

if (stack)
{
int r = ::munmap(stack, FiberScheduler::getOptions().fiberStackSize + 2 * PAGE_SIZE);
int r = ::munmap(stack, FiberScheduler::getOptions().fiberStackSize + 2 * kPageSize);
SILK_ASSERT(!r);
}
}
Expand All @@ -277,8 +277,8 @@ bool Fiber::initialize(
state.store(FiberState::SUSPENDED, std::memory_order_relaxed);

inThreadMode = false;
processorNumber = INVALID_PROCESSOR_NUMBER;
suspendedProcessorNumber = INVALID_PROCESSOR_NUMBER;
processorNumber = kInvalidProcessorNumber;
suspendedProcessorNumber = kInvalidProcessorNumber;
suspendCallback = nullptr;
suspendContext = nullptr;
fiberId = fiberId_;
Expand All @@ -291,17 +291,17 @@ bool Fiber::initialize(

if (!stack)
{
stack = ::mmap(nullptr, fiberStackSize + 2 * PAGE_SIZE, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
stack = ::mmap(nullptr, fiberStackSize + 2 * kPageSize, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
if (stack == MAP_FAILED) [[unlikely]]
{
stack = nullptr;
return false;
}

int r = ::mprotect(stack, PAGE_SIZE, PROT_NONE);
int r = ::mprotect(stack, kPageSize, PROT_NONE);
SILK_ASSERT(!r);

r = ::mprotect(static_cast<uint8_t *>(stack) + PAGE_SIZE + fiberStackSize, PAGE_SIZE, PROT_NONE);
r = ::mprotect(static_cast<uint8_t *>(stack) + kPageSize + fiberStackSize, kPageSize, PROT_NONE);
SILK_ASSERT(!r);
}

Expand All @@ -317,7 +317,7 @@ bool Fiber::initialize(

fiberMain = fiberMain_;
parametersDtor = parametersDtor_;
fiberContext = make_fcontext(static_cast<uint8_t *>(stack) + PAGE_SIZE + fiberStackSize, fiberStackSize, fiberContextMain);
fiberContext = make_fcontext(static_cast<uint8_t *>(stack) + kPageSize + fiberStackSize, fiberStackSize, fiberContextMain);

return true;
}
Expand All @@ -342,7 +342,7 @@ void Fiber::switchToFiberContext() noexcept
#if defined(__SANITIZE_ADDRESS__)
void * schedulerFakeStack = nullptr;
__sanitizer_start_switch_fiber(
&schedulerFakeStack, static_cast<uint8_t *>(stack) + PAGE_SIZE, FiberScheduler::getOptions().fiberStackSize);
&schedulerFakeStack, static_cast<uint8_t *>(stack) + kPageSize, FiberScheduler::getOptions().fiberStackSize);
#endif

#if defined(__SANITIZE_THREAD__)
Expand Down Expand Up @@ -539,10 +539,10 @@ struct FiberScheduler::ProcessorState
void removeSuspended(Fiber * fiber) noexcept;

// Cache line 0: scheduling hot path.
struct alignas(CACHELINE_SIZE)
struct alignas(kCacheLineSize)
{
// CPU index this processor is pinned to.
uint16_t number = INVALID_PROCESSOR_NUMBER;
uint16_t number = kInvalidProcessorNumber;

// Set to true by runScheduler after initialization completes.
// The steal loop checks this before accessing the ring,
Expand Down Expand Up @@ -636,7 +636,7 @@ struct FiberScheduler::ProcessorState

void FiberScheduler::ProcessorState::initialize(uint16_t cpu) noexcept
{
SILK_ASSERT(cpu < INVALID_PROCESSOR_NUMBER);
SILK_ASSERT(cpu < kInvalidProcessorNumber);
number = cpu;

readyQueue.initialize(options.readyQueueCapacity);
Expand Down Expand Up @@ -1023,7 +1023,7 @@ void FiberScheduler::initialize(const Options * userOptions) noexcept
options = *userOptions;
}

SILK_ASSERT(options.fiberStackSize >= PAGE_SIZE && (options.fiberStackSize % PAGE_SIZE) == 0);
SILK_ASSERT(options.fiberStackSize >= kPageSize && (options.fiberStackSize % kPageSize) == 0);
SILK_ASSERT(options.readyQueueCapacity >= 2 && (options.readyQueueCapacity & (options.readyQueueCapacity - 1)) == 0);
SILK_ASSERT(options.ioUringQueueSize >= 2 && (options.ioUringQueueSize & (options.ioUringQueueSize - 1)) == 0);
SILK_ASSERT(options.ioUringFlushThreshold >= 1 && options.ioUringFlushThreshold <= options.ioUringQueueSize);
Expand Down Expand Up @@ -1096,7 +1096,7 @@ void FiberScheduler::buildStealCandidates() noexcept
for (uint16_t cpu = 0; cpu < scheduler->processorCount; ++cpu)
{
ProcessorState * processor = &scheduler->processorState[cpu];
if (processor->number == INVALID_PROCESSOR_NUMBER)
if (processor->number == kInvalidProcessorNumber)
{
continue;
}
Expand All @@ -1112,7 +1112,7 @@ void FiberScheduler::buildStealCandidates() noexcept
continue;
}
uint64_t cost = UINT64_MAX;
if (scheduler->processorState[other].number != INVALID_PROCESSOR_NUMBER)
if (scheduler->processorState[other].number != kInvalidProcessorNumber)
{
cost = topologyCostCycles(topologies[cpu], topologies[other]);
}
Expand Down Expand Up @@ -1156,7 +1156,7 @@ void FiberScheduler::destroy() noexcept
for (uint16_t cpu = 0; cpu < scheduler->processorCount; ++cpu)
{
ProcessorState * processor = &scheduler->processorState[cpu];
if (processor->number != INVALID_PROCESSOR_NUMBER)
if (processor->number != kInvalidProcessorNumber)
{
processor->wakeThread();
}
Expand Down Expand Up @@ -1273,7 +1273,7 @@ void FiberScheduler::enqueueReady(Fiber * fiber) noexcept

if (!fiber->inThreadMode)
{
if (fiber->processorNumber == INVALID_PROCESSOR_NUMBER)
if (fiber->processorNumber == kInvalidProcessorNumber)
{
fiber->processorNumber = getCurrentProcessor();
}
Expand Down Expand Up @@ -1472,7 +1472,7 @@ void FiberScheduler::cancelIo(IoFuture * future) noexcept
// it), io_uring returns -ENOENT and the original operation is never removed,
// leaving the caller's IoFuture::wait() blocked forever.
uint16_t processorNumber = future->processorNumber;
if (processorNumber == INVALID_PROCESSOR_NUMBER)
if (processorNumber == kInvalidProcessorNumber)
{
processorNumber = getCurrentProcessor();
}
Expand Down Expand Up @@ -1992,11 +1992,11 @@ void FiberScheduler::runFiber(Fiber * fiber, CpuTimer * timer) noexcept
// Maintain the per-CPU suspended list for GDB debuggability.
// suspendedLock and suspendedList are co-located in ProcessorState cache line 0.
// Benchmarking showed no net cost.
if (fiber->suspendedProcessorNumber != INVALID_PROCESSOR_NUMBER)
if (fiber->suspendedProcessorNumber != kInvalidProcessorNumber)
{
ProcessorState * processor = &scheduler->processorState[fiber->suspendedProcessorNumber];
processor->removeSuspended(fiber);
fiber->suspendedProcessorNumber = INVALID_PROCESSOR_NUMBER;
fiber->suspendedProcessorNumber = kInvalidProcessorNumber;
}

ProcessorState * processor = &scheduler->processorState[fiber->processorNumber];
Expand Down
4 changes: 2 additions & 2 deletions src/fibers/profiler.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@ class Profiler
// State.
//

alignas(CACHELINE_SIZE) std::atomic<uint64_t> writeIndex{};
alignas(CACHELINE_SIZE) std::atomic<uint64_t> readIndex{};
alignas(kCacheLineSize) std::atomic<uint64_t> writeIndex{};
alignas(kCacheLineSize) std::atomic<uint64_t> readIndex{};
std::atomic<uint64_t> events[RING_CAPACITY];
Histogram histograms[NUM_KINDS][NUM_CATEGORIES];
};
Expand Down
2 changes: 1 addition & 1 deletion src/fibers/tests/fiber-test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -885,7 +885,7 @@ TEST(Fiber, WorkStealing)

std::atomic<bool> started{false};
std::atomic<bool> stop{false};
std::atomic<uint16_t> blockerCpuAtom{INVALID_PROCESSOR_NUMBER};
std::atomic<uint16_t> blockerCpuAtom{kInvalidProcessorNumber};

FiberFuture blocker;
int r = FiberScheduler::run(BlockerParams::fiberMain, {&started, &stop, &blockerCpuAtom}, &blocker);
Expand Down
2 changes: 1 addition & 1 deletion src/gdb/fiber.py
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ def _walk_bounded_queue(val):

for i in range(num_items):
pos = dequeue_pos + i
slot_addr = slots_base + (pos & mask) * 64 # stride = CACHELINE_SIZE
slot_addr = slots_base + (pos & mask) * 64 # stride = kCacheLineSize
seq = _u64(slot_addr)
if seq == pos + 1: # slot has a valid value
value = _ptr(slot_addr + 8)
Expand Down
2 changes: 1 addition & 1 deletion src/util/benchmarks/platform-bench.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ BENCHMARK_F(PlatformBench, EventFdRoundTrip)(benchmark::State & state)
// pure MESI protocol traffic.
BENCHMARK_F(PlatformBench, CacheLineRoundTrip)(benchmark::State & state)
{
alignas(CACHELINE_SIZE) std::atomic<uint64_t> shared{};
alignas(kCacheLineSize) std::atomic<uint64_t> shared{};

std::thread helper(
[&]
Expand Down
2 changes: 1 addition & 1 deletion src/util/memory-pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ MemoryPoolBase::MemoryPoolBase(
uint32_t objectSize, uint32_t alignment, uint32_t stackEntryOffset, InitFn * initialize, DestroyFn * destroy) noexcept
: objectSize(alignUp(objectSize, alignment))
, alignment(alignment)
, chunkSize(alignUp<uint32_t>(slotsOffset() + MIN_BATCH_SIZE * this->objectSize, PAGE_SIZE))
, chunkSize(alignUp<uint32_t>(slotsOffset() + MIN_BATCH_SIZE * this->objectSize, kPageSize))
, stackEntryOffset(stackEntryOffset)
, initialize(initialize)
, destroy(destroy)
Expand Down
Loading