Add: distributed worker runtime, group task, and fork+shm chip isolation #444

hw-native-sys-bot wants to merge 1 commit into hw-native-sys:main from
Conversation
Code Review
This pull request implements a distributed runtime for L3 orchestration, introducing core C++ components such as the Orchestrator, Scheduler, and TensorMap, along with their corresponding Python bindings. It enables multi-process sub-worker management via fork and shared memory and updates the ChipWorker to support the new scheduling engine. Feedback highlights critical concurrency issues, specifically race conditions in task dependency tracking and data races within the DistTensorMap. Additionally, improvements are suggested for the DistRing retirement logic and the efficiency of the scheduler and sub-worker loops to reduce CPU overhead and latency.
```cpp
TaskState ps_state = ps.state.load(std::memory_order_acquire);
if (ps_state == TaskState::COMPLETED || ps_state == TaskState::CONSUMED) {
    // Producer already finished; no dependency needed.
    continue;
}
```
In submit(), producers in the COMPLETED state are skipped when adding dependencies. This creates a critical race condition: a producer can transition from COMPLETED to CONSUMED (and have its buffers freed) after the Orchestrator has looked it up but before the consumer task is even dispatched. This leads to use-after-free when the consumer eventually runs. Dependencies must be added for any producer that is not yet CONSUMED, regardless of whether it has already finished execution.
Suggested change:

```diff
-TaskState ps_state = ps.state.load(std::memory_order_acquire);
-if (ps_state == TaskState::COMPLETED || ps_state == TaskState::CONSUMED) {
-    // Producer already finished; no dependency needed.
-    continue;
-}
+TaskState ps_state = ps.state.load(std::memory_order_acquire);
+if (ps_state == TaskState::CONSUMED) {
+    // Producer already finished and released; no dependency needed.
+    continue;
+}
```
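The invariant behind this suggestion can be stated compactly: only CONSUMED is a terminal state, so any producer not yet CONSUMED must be tracked. A minimal Python sketch of that rule (the enum and function names here are illustrative, not the actual C++ API):

```python
from enum import Enum


class TaskState(Enum):
    PENDING = 0
    RUNNING = 1
    COMPLETED = 2
    CONSUMED = 3


def needs_dependency(producer_state: TaskState) -> bool:
    """Decide whether a consumer must register a dependency on a producer.

    Only CONSUMED is terminal: a COMPLETED producer can still transition
    to CONSUMED (and free its buffers) concurrently with submit(), so it
    must still be tracked as a dependency.
    """
    return producer_state is not TaskState.CONSUMED
```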
```cpp
TaskState cur = s.state.load(std::memory_order_acquire);
if (released >= total && (cur == TaskState::COMPLETED || cur == TaskState::RUNNING)) {
    on_consumed(slot);
}
```
The consumption logic in release_ref is incorrect and dangerous. First, it uses released >= total as a threshold, which ignores the implicit 'self' reference (the task must complete itself before being consumed), potentially leading to premature release if scope_end is called before the task finishes. Second, it allows a task to be consumed while in the RUNNING state. If a task is consumed while running, its output buffers are deleted (via DistTaskSlotState::reset), causing a use-after-free in the worker thread. Consumption should only occur when the task is COMPLETED and all references (consumers + scope) are released.
Suggested change:

```diff
-TaskState cur = s.state.load(std::memory_order_acquire);
-if (released >= total && (cur == TaskState::COMPLETED || cur == TaskState::RUNNING)) {
-    on_consumed(slot);
-}
+TaskState cur = s.state.load(std::memory_order_acquire);
+if (released >= total + 1 && cur == TaskState::COMPLETED) {
+    on_consumed(slot);
+}
```
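A toy Python model of the suggested rule, where completing the task releases the implicit 'self' reference and consumption fires only once the task is COMPLETED and all `total + 1` references are gone (class and method names are illustrative, not the C++ implementation):

```python
import threading


class SlotRefs:
    """Model of slot consumption: total = consumer + scope references;
    the task itself holds one implicit 'self' reference, released only
    when the task completes."""

    def __init__(self, total: int):
        self.total = total
        self.released = 0
        self.completed = False
        self.consumed = False
        self._lock = threading.Lock()

    def _maybe_consume(self) -> None:
        # Consume only when COMPLETED and all refs (incl. self) are released.
        if self.completed and self.released >= self.total + 1:
            self.consumed = True

    def complete(self) -> None:
        with self._lock:
            self.completed = True
            self.released += 1  # drop the implicit self reference
            self._maybe_consume()

    def release_ref(self) -> None:
        with self._lock:
            self.released += 1
            self._maybe_consume()
```

Note that calling `release_ref` (e.g. from `scope_end`) before the task finishes can never trigger consumption, which is exactly the premature-release hole the review points out.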
```cpp
void DistRing::release(DistTaskSlot slot) {
  // Derive which task_id this slot corresponds to.
  // last_alive tracks the highest released task_id (monotonically advancing).
  // We advance last_alive to at least the task_id that owns this slot.
  // Since slots are released roughly in order, this is safe.
  int32_t current = last_alive_.load(std::memory_order_acquire);
  // The slot belongs to some task_id; find the smallest task_id >= current+1
  // that maps to this slot.
  int32_t base = current + 1;
  int32_t offset = ((slot - base) & window_mask_);
  int32_t task_id = base + offset;

  int32_t expected = current;
  while (task_id > expected) {
    if (last_alive_.compare_exchange_weak(
            expected, task_id, std::memory_order_release, std::memory_order_relaxed)) {
      break;
    }
    // expected updated by CAS; retry if another thread advanced it past us
    if (expected >= task_id) break;
  }
  cv_.notify_all();
}
```
The release logic in DistRing is fundamentally broken for out-of-order task completion. It uses a monotonic last_alive_ counter and advances it to the task_id of the released slot even if older tasks are still active. This causes alloc() to incorrectly believe that older slots are available for reuse, leading to slot collisions and use-after-free bugs when tasks complete out of order (which is expected in a multi-worker scheduler). A robust implementation should use a free-list (e.g., a thread-safe queue of available slots) instead of a sliding window with monotonic retirement if out-of-order release is required.
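The free-list alternative is easy to sketch. The following Python model (class and method names are illustrative; the real fix would be C++) shows the key property: a slot becomes reusable exactly when it is returned, never implicitly because a newer task retired, so out-of-order completion is safe by construction:

```python
import queue


class SlotFreeList:
    """Thread-safe free-list of ring slots, tolerant of out-of-order release.

    alloc() blocks when every slot is in flight, which preserves the
    back-pressure behavior the sliding window was providing.
    """

    def __init__(self, num_slots: int):
        self._free = queue.Queue()
        for slot in range(num_slots):
            self._free.put(slot)

    def alloc(self, timeout=None) -> int:
        # Blocks until a slot is returned; no assumption about release order.
        return self._free.get(timeout=timeout)

    def release(self, slot: int) -> None:
        self._free.put(slot)
```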
```cpp
 * - Does not perform overlap detection (each base_ptr maps to one producer)
 * - Cleans up entries actively when a task is CONSUMED
 *
 * Owned exclusively by the Orchestrator (main thread); no locking required.
```
The documentation states that DistTensorMap is owned exclusively by the Orchestrator and requires no locking. However, DistWorker::on_consumed (called from the Scheduler thread) invokes orchestrator_.on_consumed, which calls tensormap_->erase_task_outputs. This results in concurrent access to the underlying std::unordered_map from both the Orchestrator and Scheduler threads, leading to data races and undefined behavior. A mutex must be added to protect DistTensorMap operations.
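A minimal Python model of the suggested fix: every map operation takes the same mutex, since both the Orchestrator and Scheduler threads now touch the map. (Python's GIL makes dict operations atomic anyway; this only models the C++ `std::mutex` pattern, and all names here are illustrative.)

```python
import threading


class LockedTensorMap:
    """Sketch: base_ptr -> producer task_id map guarded by one mutex."""

    def __init__(self):
        self._map = {}
        self._mutex = threading.Lock()

    def record(self, base_ptr: int, task_id: int) -> None:
        with self._mutex:
            self._map[base_ptr] = task_id

    def lookup(self, base_ptr: int):
        with self._mutex:
            return self._map.get(base_ptr)

    def erase_task_outputs(self, task_id: int) -> None:
        # May be called from the scheduler thread on CONSUMED, hence the lock.
        with self._mutex:
            self._map = {p: t for p, t in self._map.items() if t != task_id}
```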
```cpp
completion_cv_.wait_for(lk, std::chrono::milliseconds(1), [this] {
    return !completion_queue_.empty() || stop_requested_.load(std::memory_order_acquire);
});
}
```
The scheduler loop uses wait_for with a hardcoded 1ms timeout to check for new tasks in the ready_queue. This introduces unnecessary latency (up to 1ms per task submission when the scheduler is idle) and causes the scheduler thread to wake up frequently even when there is no work. The scheduler should wait on a condition variable that is notified by both the Orchestrator (when a task is pushed to ready_queue) and the WorkerThreads (when a task completes).
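The event-driven alternative can be sketched with a condition variable that both producers notify. This Python model (names are illustrative, not the C++ API) shows a scheduler that sleeps until work arrives or shutdown is requested, with no periodic 1 ms wakeups:

```python
import threading
from collections import deque


class NotifyingQueue:
    """Scheduler queue where push() wakes the waiter immediately.

    Both the Orchestrator (new ready task) and WorkerThreads (completion)
    would push/notify here instead of relying on a polling timeout.
    """

    def __init__(self):
        self._items = deque()
        self._cv = threading.Condition()
        self._stopped = False

    def push(self, item) -> None:
        with self._cv:
            self._items.append(item)
            self._cv.notify()  # wake the scheduler right away

    def stop(self) -> None:
        with self._cv:
            self._stopped = True
            self._cv.notify_all()

    def pop(self):
        with self._cv:
            # Sleeps until work arrives or shutdown; no spurious 1 ms wakeups.
            self._cv.wait_for(lambda: self._items or self._stopped)
            return self._items.popleft() if self._items else None
```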
```python
while True:
    state = struct.unpack_from("i", buf, _OFF_STATE)[0]

    if state == _TASK_READY:
        cid = struct.unpack_from("i", buf, _OFF_CALLABLE_ID)[0]
        fn = registry.get(cid)
        error = 0
        if fn is None:
            error = 1
        else:
            try:
                fn()
            except Exception:  # noqa: BLE001
                error = 2
        struct.pack_into("i", buf, _OFF_ERROR_CODE, error)
        # Release store: error_code written before state=TASK_DONE
        struct.pack_into("i", buf, _OFF_STATE, _TASK_DONE)

    elif state == _SHUTDOWN:
        break
    # Tight spin: same as L2 AICPU pattern (dedicated execution unit)
```
The _sub_worker_loop in the forked child process is a tight spin loop that polls the shared memory mailbox without any yielding or sleeping. While this minimizes latency, it will consume 100% of a CPU core per sub-worker even when idle. Given that these are host-side Python workers, adding a small time.sleep(0) or a micro-sleep when the state is _IDLE would significantly reduce CPU waste without a major impact on orchestration latency.
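A sketch of the suggested backoff (function names here are illustrative, not the PR's code): handle work as fast as it arrives, but yield or micro-sleep when the mailbox is idle, trading tens of microseconds of wakeup latency for near-zero idle CPU:

```python
import time


def poll_mailbox(read_state, handle_task, idle_sleep_s: float = 50e-6):
    """Poll a mailbox state word, sleeping briefly while idle.

    read_state / handle_task are stand-ins for the shared-memory unpack
    and callable dispatch in _sub_worker_loop.
    """
    _IDLE, _TASK_READY, _SHUTDOWN = 0, 1, 2
    while True:
        state = read_state()
        if state == _TASK_READY:
            handle_task()
        elif state == _SHUTDOWN:
            break
        else:
            # Idle: time.sleep(0) just yields the core; a small positive
            # sleep reduces CPU use further at a modest latency cost.
            time.sleep(idle_sleep_s)
```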
```cpp
WorkerThread *wt = pick_idle(s.payload.worker_type);
if (!wt) {
    cfg_.ready_queue->push(slot);
    break;
}
```
When all workers of a specific type are busy, dispatch_ready() pops a task from the ready_queue and immediately pushes it back to the end of the queue. This results in a busy-spin (cycling through the queue every 1ms) when the system is under load. Instead of popping and re-pushing, the scheduler should only pop a task when a compatible worker is known to be idle, or it should wait for a 'worker idle' signal to avoid redundant queue operations.
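One way to realize this is a per-worker-type ready queue paired with an idle count, so a task is popped only when a compatible worker is known to be free. A Python sketch under that assumption (all names are illustrative):

```python
from collections import deque


class TypedReadyQueues:
    """Per-worker-type ready queues: a slot is dequeued only when an idle
    worker of its type exists, so a saturated type never forces the
    pop/re-push cycling described above."""

    def __init__(self, worker_types):
        self._queues = {t: deque() for t in worker_types}
        self._idle = {t: 0 for t in worker_types}

    def worker_idle(self, wtype) -> None:
        # Called by a WorkerThread when it finishes a task.
        self._idle[wtype] += 1

    def submit(self, wtype, slot) -> None:
        self._queues[wtype].append(slot)

    def dispatch(self):
        """Yield (wtype, slot) pairs that can start right now."""
        for wtype, q in self._queues.items():
            while q and self._idle[wtype] > 0:
                self._idle[wtype] -= 1
                yield wtype, q.popleft()
```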
- Distributed scheduling engine (src/common/distributed/): TensorMap dependency tracking, ring-buffer back-pressure, scope lifetime, Orchestrator/Scheduler/WorkerThread model
- Group task support: submit N args for N workers on 1 DAG node, completion aggregation via sub_complete_count
- Fork+shm ChipWorker process isolation (DistChipProcess): each chip runs in its own forked process, eliminating sim global-state crashes when multiple chips execute concurrently
- Python scope context manager (with hw.scope():) replaces scope_begin/end
- DistSubWorker: fork/shm mailbox for GIL-free Python callable execution
- DeviceRunner changed to thread_local for multi-ChipWorker safety
- ChipWorker implements IWorker for uniform scheduling interface
- Python bindings (nanobind) and Worker/HostWorker wrappers
- Move tests/ut/*.py to tests/ut/py/ for consistent test layout
- docs/distributed_level_runtime.md: level model, scheduling, API
- docs/sim_multi_device_isolation.md: concurrency analysis and fix

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Summary

- Distributed scheduling engine (src/common/distributed/): TensorMap dependency tracking, ring-buffer back-pressure, scope lifetime, Orchestrator/Scheduler/WorkerThread model
- Group task support: submit N args for N workers on 1 DAG node, completion aggregation via sub_complete_count
- Fork+shm ChipWorker process isolation (DistChipProcess): each chip runs in its own forked process with isolated address space, eliminating sim global-state crashes when multiple chips execute concurrently
- Python scope context manager (with hw.scope():) replaces scope_begin()/scope_end()
- DistSubWorker: fork/shm mailbox for GIL-free Python callable execution
- DeviceRunner changed to thread_local for multi-ChipWorker thread safety
- ChipWorker implements IWorker for uniform scheduling interface
- Python bindings (nanobind) and Worker/HostWorker wrappers
- Move tests/ut/*.py to tests/ut/py/ for consistent test layout
- docs/distributed_level_runtime.md: level model, scheduling engine, process/thread model, unified API
- docs/sim_multi_device_isolation.md: root cause analysis of sim multi-device crashes and fix

Testing