Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@ Under identical, mathematically verified logical execution constraints (512x512
| **Maximum Speedup** | ![Speedup Max](https://img.shields.io/badge/dynamic/json?url=https%3A%2F%2Fraw.githubusercontent.com%2FDigitalServerHost%2FORCHID%2Fmain%2Fevidence%2Freproduced%2Fspeedups.json&query=%24.max&label=Speedup%20Max&color=brightgreen) |
| **Mean Speedup** | ![Speedup Mean](https://img.shields.io/badge/dynamic/json?url=https%3A%2F%2Fraw.githubusercontent.com%2FDigitalServerHost%2FORCHID%2Fmain%2Fevidence%2Freproduced%2Fspeedups.json&query=%24.mean&label=Speedup%20Mean&color=orange) |

> [!NOTE]
> **Understanding the Speedup Profiles:**
> - **Physical Cache Locality (C Harness)**: The dynamic badges above measure the hardware execution speedup of cache-blocked, locality-aligned loops (matrix multiplication) over flat baselines, yielding approximately **3.5x - 3.9x** actual hardware speedups (per the current `evidence/reproduced/speedups.json`).
> - **Parallel Memory Scheduler (Go Simulator)**: The scheduler unit tests (`TestBankedSchedulerTriad`) run a software-simulated queue model (STREAM-Triad) to measure bank serialization and parallel role routing. Because STREAM-Triad partitions requests into 3 distinct logical data streams (B-read, C-read, A-write), mapping them to 3 independent memory banks gives a theoretical parallel speedup limit of **3.0x**, and the Go scheduler reaches that limit with a measured **3.000x** cycle reduction.

---

## 🏛️ Centralized Architectural Design & Blueprint
Expand Down
10 changes: 5 additions & 5 deletions evidence/reproduced/speedups.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"max": "3.969x",
"mean": "3.756x",
"median": "3.721x",
"min": "3.617x"
}
"min": "3.546x",
"median": "3.564x",
"max": "3.895x",
"mean": "3.608x"
}
205 changes: 168 additions & 37 deletions scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,40 @@ package scheduler

import (
"errors"
"runtime"
"sync"
"sync/atomic"
"syscall"
"unsafe"
)

/**
 * @struct QueueItem
 * @brief One slot of a memory bank's ring buffer: a pending request plus its wake-up channel.
 *
 * A slot is addressed by (ticket & mask). Its State field is the handshake
 * between the goroutine waiting on the slot and the goroutine releasing the
 * previous ticket, so it is only read and written through sync/atomic.
 */
type QueueItem struct {
	State    uint32        ///< 0: Idle, 1: Enqueued/Waiting, 2: Processing (accessed atomically)
	Role     string        ///< Memory role ('A', 'B', or 'C')
	Kind     string        ///< Access type ('READ' or 'WRITE')
	Index    int           ///< Index inside the stream vector
	Earliest uint64        ///< The earliest cycle this request was logically ready
	EndCycle uint64        ///< Scheduled completion cycle; NOTE(review): no visible code assigns this field — confirm it is still needed
	sem      chan struct{} ///< Channel on which the waiting goroutine parks until its predecessor sends a token
}

/**
 * @struct BankQueue
 * @brief Bounded ring-buffer ticket queue that serializes access to one memory bank.
 *
 * Callers claim a ticket by atomically incrementing tail, and the holder of
 * ticket t runs once head == t; on release it stores head = t + 1.
 *
 * The 64-bit cursors are declared first because they are accessed through sync/atomic.
 * NOTE(review): on 32-bit platforms sync/atomic only guarantees 64-bit alignment
 * for the first word of an allocated struct, array, or slice, so elements after
 * the first in a []BankQueue may not be 8-byte aligned there — confirm the
 * supported targets (or switch to atomic.Uint64 fields).
 */
type BankQueue struct {
	head uint64      ///< Read cursor: ticket currently allowed to run; advanced to ticket+1 on release
	tail uint64      ///< Write cursor: next ticket to hand out; incremented atomically to claim a slot
	mask uint64      ///< Bitmask for circular wrapping (ticket & mask yields the ring index)
	ring []QueueItem ///< The slice backing the ring buffer
}

/**
* @struct AccessEvent
* @brief Holds detail logging metrics for a scheduled memory request.
Expand All @@ -45,7 +73,7 @@ type MemoryScheduler struct {
serviceCycles uint64 ///< Service latency in CPU cycles per request
freeAt []uint64 ///< Slice of cycle clocks when each bank will become free
requests []uint64 ///< Request counters per bank
bankLocks []sync.Mutex ///< Mutex fences protecting each physical memory bank
bankQueues []BankQueue ///< Lock-free ring buffer queues protecting each bank
trace []AccessEvent ///< Log trace of scheduled events
traceLimit int ///< Maximum event log tracing threshold
traceMu sync.Mutex ///< Mutex protecting logging trace slices
Expand All @@ -68,23 +96,124 @@ func NewMemoryScheduler(bankCount int, serviceCycles uint64, traceLimit int) (*M
return nil, errors.New("bankCount and serviceCycles must be greater than or equal to 1")
}

// Bounded ring-buffer queue size per bank (must be a power of two)
// Set to 65536 to guarantee ticket slots never wrap around for active concurrent requests.
const queueSize = 65536
const queueMask = queueSize - 1

bankQueues := make([]BankQueue, bankCount)
for i := 0; i < bankCount; i++ {
ring := make([]QueueItem, queueSize)
for j := 0; j < queueSize; j++ {
ring[j] = QueueItem{
State: 0,
sem: make(chan struct{}, 1),
}
}
bankQueues[i] = BankQueue{
head: 0,
tail: 0,
mask: queueMask,
ring: ring,
}
}

return &MemoryScheduler{
bankCount: bankCount,
serviceCycles: serviceCycles,
freeAt: make([]uint64, bankCount),
requests: make([]uint64, bankCount),
bankLocks: make([]sync.Mutex, bankCount),
bankQueues: bankQueues,
traceLimit: traceLimit,
trace: make([]AccessEvent, 0, traceLimit),
numaBankMap: make(map[int]int),
numaBuffers: make(map[int][]byte),
}, nil
}

/**
 * @brief Blocks the caller until its ticket may enter the bank's serialized section.
 *
 * Fast path: if head already equals the ticket and the caller wins the
 * State 1 -> 2 transition, it proceeds without touching the channel.
 * In every other case (head has not reached the ticket yet, or the releasing
 * goroutine won the transition and is handing over a token) the caller
 * receives from the slot's semaphore channel.
 *
 * @param ticket The sequence number claimed by the caller.
 * @param idx The ring slot index that belongs to the ticket.
 */
func (q *BankQueue) waitTurn(ticket uint64, idx uint64) {
	slot := &q.ring[idx]

	// The CAS is attempted only when head has already reached our ticket.
	if atomic.LoadUint64(&q.head) == ticket &&
		atomic.CompareAndSwapUint32(&slot.State, 1, 2) {
		return
	}

	// Either it is not our turn yet, or the previous holder claimed the
	// transition on our behalf: consume the token it sends.
	<-slot.sem
}

/**
 * @brief Inspects the next ticket's slot and wakes its holder when it is parked.
 *
 * @param nextTicket The next ticket sequence number in the queue.
 * @param nextIdx The ring buffer index that nextTicket maps to.
 * @return True when nothing more has to be done for the next ticket (it has
 *         already completed, was signaled, or bypassed on its own); false when
 *         the slot is not yet marked as enqueued and the caller should retry.
 */
func (q *BankQueue) checkAndSignal(nextTicket, nextIdx uint64) bool {
	// head beyond nextTicket means that holder already ran and released.
	if atomic.LoadUint64(&q.head) > nextTicket {
		return true
	}

	slot := &q.ring[nextIdx]
	switch atomic.LoadUint32(&slot.State) {
	case 1:
		// Waiting: whoever wins the 1 -> 2 transition decides how the holder
		// proceeds. If we win, hand over the token; if we lose, the holder
		// took the bypass path itself.
		if atomic.CompareAndSwapUint32(&slot.State, 1, 2) {
			slot.sem <- struct{}{}
		}
		return true
	case 2:
		// The next holder already bypassed and is running.
		return true
	default:
		return false
	}
}

/**
 * @brief Hands the bank over to the holder of the following ticket, if one was claimed.
 *
 * Returns immediately when no ticket after the given one has been issued.
 * Otherwise it yields the processor between attempts until checkAndSignal
 * reports that the successor has been dealt with.
 *
 * @param ticket The ticket that has just been released.
 */
func (q *BankQueue) signalNext(ticket uint64) {
	successor := ticket + 1
	if successor >= atomic.LoadUint64(&q.tail) {
		return
	}

	slotIdx := successor & q.mask
	// The successor may have claimed its ticket without having marked its
	// slot as enqueued yet; retry until its state becomes visible.
	for !q.checkAndSignal(successor, slotIdx) {
		runtime.Gosched()
	}
}

/**
 * @brief Appends one scheduled access to the trace buffer while holding traceMu.
 *
 * The event is dropped once the trace already holds traceLimit entries.
 *
 * @param role The memory stream role identifier.
 * @param kind The access type.
 * @param index The index inside the stream vector.
 * @param bank The bank the request was scheduled on.
 * @param earliest The earliest cycle the request was ready.
 * @param start The cycle at which service starts.
 * @param end The cycle at which service completes.
 */
func (ms *MemoryScheduler) logTrace(role, kind string, index, bank int, earliest, start, end uint64) {
	ms.traceMu.Lock()
	defer ms.traceMu.Unlock()

	if len(ms.trace) >= ms.traceLimit {
		return
	}

	event := AccessEvent{
		Role:     role,
		Kind:     kind,
		Index:    index,
		Bank:     bank,
		Earliest: earliest,
		Start:    start,
		End:      end,
	}
	ms.trace = append(ms.trace, event)
}

/**
* @brief Request thread-safe access to a specific memory bank.
*
* Enforces bank-level serialization using active bankLocks, updates the bank's
* Enforces bank-level serialization using active lock-free queues, updates the bank's
* availability register, and increments hardware request counters using atomics.
*
* @param role The memory stream role identifier ('A', 'B', or 'C').
Expand All @@ -99,38 +228,46 @@ func (ms *MemoryScheduler) Access(role string, kind string, index int, bank int,
return 0, errors.New("requested memory bank index out of bounds")
}

// Acquire lock fence for the targeted physical memory bank
ms.bankLocks[bank].Lock()
defer ms.bankLocks[bank].Unlock()
q := &ms.bankQueues[bank]

// 1. Claim a ticket in the ring buffer.
ticket := atomic.AddUint64(&q.tail, 1) - 1

// 2. Write the request data into the acquired slot.
idx := ticket & q.mask
q.ring[idx].Role = role
q.ring[idx].Kind = kind
q.ring[idx].Index = index
q.ring[idx].Earliest = earliest

// Calculate scheduling start and end cycles
currentFree := ms.freeAt[bank]
// Mark slot as enqueued/waiting.
atomic.StoreUint32(&q.ring[idx].State, 1)

// 3. Wait until head pointer reaches our ticket (our turn).
q.waitTurn(ticket, idx)

// 4. Execute the bank allocation cycle update (user-space serialized section).
currentFree := atomic.LoadUint64(&ms.freeAt[bank])
startCycle := earliest
if currentFree > startCycle {
startCycle = currentFree
}
endCycle := startCycle + ms.serviceCycles

// Update bank availability register
ms.freeAt[bank] = endCycle
// Update bank availability register clock.
atomic.StoreUint64(&ms.freeAt[bank], endCycle)

// Atomically increment access metrics
// Increment requests metric atomically.
atomic.AddUint64(&ms.requests[bank], 1)

// Log event to trace buffer if within limit
ms.traceMu.Lock()
if len(ms.trace) < ms.traceLimit {
ms.trace = append(ms.trace, AccessEvent{
Role: role,
Kind: kind,
Index: index,
Bank: bank,
Earliest: earliest,
Start: startCycle,
End: endCycle,
})
}
ms.traceMu.Unlock()
// Log event to trace buffer.
ms.logTrace(role, kind, index, bank, earliest, startCycle, endCycle)

// 5. Release our ticket and signal the next holder if present.
atomic.StoreUint32(&q.ring[idx].State, 0)
atomic.StoreUint64(&q.head, ticket+1)

q.signalNext(ticket)

return endCycle, nil
}
Expand All @@ -145,12 +282,10 @@ func (ms *MemoryScheduler) Access(role string, kind string, index int, bank int,
func (ms *MemoryScheduler) TotalCycles() uint64 {
var maxCycles uint64
for i := 0; i < ms.bankCount; i++ {
ms.bankLocks[i].Lock()
val := ms.freeAt[i]
val := atomic.LoadUint64(&ms.freeAt[i])
if val > maxCycles {
maxCycles = val
}
ms.bankLocks[i].Unlock()
}
return maxCycles
}
Expand Down Expand Up @@ -183,17 +318,13 @@ func (ms *MemoryScheduler) GetTrace() []AccessEvent {
}

/**
* @brief Configures and allocates physical NUMA-bound memory buffers for each bank.
* @brief Allocates a single memory buffer and binds it to a target physical NUMA node.
*
* Leverages explicit memory-mapped file/anonymous nodes (mmap with MAP_POPULATE)
* and the Linux mbind(2) system call to bind virtual memory ranges to host physical sockets.
* This directly demonstrates physical CADENCE memory role isolation.
*
* @param bankToNode A map linking each bank ID to its target physical NUMA node.
* @param bankSize The size in bytes of the buffer to allocate per bank.
* @return An error if allocations fail, or nil on success.
* @param bank The physical memory bank index.
* @param node The physical NUMA node ID to bind the memory to.
* @param bankSize The size in bytes of the buffer to allocate.
* @return The allocated byte slice buffer, or an error on failure.
*/
// allocateAndBindBank handles a single bank allocation and NUMA mbind syscall mapping.
func (ms *MemoryScheduler) allocateAndBindBank(bank, node, bankSize int) ([]byte, error) {
if bank < 0 || bank >= ms.bankCount {
return nil, errors.New("bank index out of range for scheduler configurations")
Expand Down
Loading