Commit a4a3f79 (parent: 30fdeca)

Add: distributed worker runtime and host worker API

- Implement DistWorker scheduling, dependency tracking, scope lifetime, and fork/shm SubWorker dispatch
- Expose L3 HostWorker and Worker Python APIs through nanobind bindings
- Add unit and scene coverage for distributed scheduler, HostWorker, and multi-worker execution

46 files changed: 5020 additions & 8 deletions

docs/distributed_level_runtime.md (new file)

Lines changed: 309 additions & 0 deletions

# Distributed Level Runtime

## 1. Level Model

The runtime uses a 7-level hierarchy that mirrors the physical topology of Ascend NPU clusters:

```text
L6  CLOS2 / Cluster    ── full cluster (N6 super-nodes)
L5  CLOS1 / SuperNode  ── super-node (N5 pods)
L4  POD   / Pod        ── pod (4 hosts)
L3  HOST  / Node       ── single host machine (16 chips + M SubWorkers)
L2  CHIP  / Processor  ── one NPU chip (shared device memory)
L1  DIE   / L2Cache    ── chip die (hardware-managed)
L0  CORE  / AIV, AIC   ── individual compute core (hardware-managed)
```

**L2 is the boundary** between two worlds:

- **L0–L2** (on-device): AICPU scheduler, AICore/AIV workers, device Global Memory. Managed by the simpler runtime. Communication via shared GM with atomics and barriers (Tier 1).
- **L3–L6** (host/cluster): each level is a separate process. Communication via IPC — Unix sockets, TCP, or RDMA (Tier 3). L3↔L2 uses host-device DMA (Tier 2).

Every level from L3 upward runs the **same scheduling engine** (`DistWorker`). The only difference is what workers it manages:

| Level | Workers it contains | Process model |
| ----- | ------------------- | ------------- |
| L3 (Host) | ChipWorker ×N + DistSubWorker ×M | One process per host |
| L4 (Pod) | DistWorker(3) ×N (each is an L3 node) | One process per pod |
| L5 (SuperNode) | DistWorker(4) ×N | One process per super-node |
| L6 (Cluster) | DistWorker(5) ×N | One cluster process |

A `DistWorker` at any level implements `IWorker`, so a higher level treats it as just another worker — recursive composition. The scheduling engine, DAG tracking, and scope management are identical at every level.

## 2. One Level: Orchestrator / Scheduler / Worker

Within each level, three roles cooperate:

```text
Orch thread                            Scheduler thread               Worker threads
───────────                            ────────────────               ──────────────
User code ──► DistOrchestrator         DistScheduler
                  │                          │
  submit(callable, args, config)             │
    1. alloc ring slot                       │
    2. TensorMap: build deps                 │
    3. fanin wiring                          │
    4. if ready → push ready_queue ─────────►│
                                             │ pop ready_queue
                                             │ pick idle WorkerThread
                                             │ dispatch(payload) ──────► IWorker::run()
                                             │                             (blocking)
                                             │◄── worker_done(slot) ───── return
                                             │ on_task_complete:
                                             │   fanout release
                                             │   wake downstream tasks
                                             │   try_consume → ring release
                                             │
  drain() ◄── notify when all done ──────────│
```

**Orchestrator** (main thread, single-threaded):

- Owns TensorMap, Scope, and the Ring alloc side — no locks needed
- Builds the DAG: for each submit, looks up input tensors to find producers, wires fanin/fanout edges
- Pushes READY tasks to the ready queue

**Scheduler** (dedicated C++ thread):

- Pops tasks from the ready queue, finds idle WorkerThreads, dispatches
- Receives completion callbacks from WorkerThreads
- Releases fanout refs, wakes downstream consumers, retires consumed slots

**WorkerThread** (one per IWorker, dedicated thread):

- Wraps one `IWorker` (ChipWorker, DistSubWorker, or nested DistWorker)
- Calls `worker->run(payload)` synchronously — blocks until done
- Notifies the Scheduler via `worker_done(slot)`
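The dispatch loop above can be sketched in plain Python. This is an illustrative model, not the C++ implementation: `WorkerThread` and `scheduler` mirror the roles in the diagram, while the queue types and the `None` shutdown sentinel are simplifications of ours.

```python
import queue
import threading

class WorkerThread:
    """Wraps one blocking worker, mirroring the per-IWorker thread above."""
    def __init__(self, run_fn, completion_q):
        self.run_fn = run_fn                     # stands in for IWorker::run
        self.dispatch_q = queue.Queue()          # Scheduler → WorkerThread
        self.completion_q = completion_q         # WorkerThread → Scheduler
        threading.Thread(target=self._loop, daemon=True).start()

    def _loop(self):
        while True:
            slot = self.dispatch_q.get()
            if slot is None:                     # shutdown sentinel
                return
            self.run_fn(slot)                    # blocks until the task is done
            self.completion_q.put((self, slot))  # worker_done(slot)

def scheduler(ready_q, workers, completion_q, num_tasks):
    """Pop ready tasks, dispatch to idle workers, recycle workers on completion."""
    idle, done = list(workers), 0
    while done < num_tasks:
        while idle and not ready_q.empty():      # pick idle WorkerThread, dispatch
            idle.pop().dispatch_q.put(ready_q.get())
        worker, _slot = completion_q.get()       # wait for a worker_done callback
        idle.append(worker)                      # real engine also releases fanout refs here
        done += 1
    for w in workers:
        w.dispatch_q.put(None)

ran = []
completion_q, ready_q = queue.Queue(), queue.Queue()
for slot in range(6):
    ready_q.put(slot)                            # six READY tasks
workers = [WorkerThread(ran.append, completion_q) for _ in range(2)]
scheduler(ready_q, workers, completion_q, num_tasks=6)
print(sorted(ran))  # [0, 1, 2, 3, 4, 5]
```

Two workers drain six tasks; the scheduler never blocks on a busy worker, only on the completion queue, which is the same shape as the real `completion_queue_` handoff.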

## 3. How It Works: Scope, TensorMap, RingBuffer

### TensorMap — automatic dependency inference

TensorMap maps `tensor_base_ptr → producer_task_slot`. When a task is submitted:

```text
submit(inputs=[ptr_A, ptr_B], outputs=[ptr_C]):

  TensorMap.lookup(ptr_A) → slot 3 (producer)  → fanin edge: 3 → current
  TensorMap.lookup(ptr_B) → not found          → no dependency
  TensorMap.insert(ptr_C, current_slot)        → future consumers will depend on us
```

The user never explicitly declares "task X depends on task Y". Dependencies are inferred from which tasks produce/consume the same tensor addresses.
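The lookup/insert wiring fits in a few lines. A hypothetical Python model (the real map lives in C++ and its internals may differ):

```python
class TensorMap:
    def __init__(self):
        self.producer = {}                      # tensor_base_ptr → producer task slot

    def wire(self, slot, inputs, outputs):
        """Return fanin edges for a newly submitted task and record its outputs."""
        fanin = [self.producer[p] for p in inputs if p in self.producer]
        for p in outputs:
            self.producer[p] = slot             # future consumers will depend on `slot`
        return fanin

tm = TensorMap()
tm.wire(slot=3, inputs=[], outputs=[0x1000])    # task 3 produces ptr_A
deps = tm.wire(slot=7, inputs=[0x1000, 0x2000], outputs=[0x3000])
print(deps)  # [3]  (ptr_A has producer slot 3; ptr_B is unknown → no dependency)
```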

### RingBuffer — slot allocation with back-pressure

The ring manages a fixed window of task slots (`DIST_TASK_WINDOW_SIZE = 128`). The Orchestrator calls `alloc()` to claim the next slot. If all slots are occupied by in-flight tasks, `alloc()` blocks until a slot is freed — this is **back-pressure**, preventing the Orchestrator from running too far ahead of the Scheduler.

```text
alloc() ──► [slot 0][slot 1]...[slot 127] ──► release()
   ↑ blocks if full                              ↑ called when task CONSUMED
```
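A minimal sketch of the blocking allocator, using a semaphore to count free slots. This is our simplification (the real `DistRing` is C++, and it assumes slots are released in allocation order, which this toy does not enforce):

```python
import threading

class DistRingSketch:
    """Blocking slot allocator: alloc() stalls while all slots are in flight."""
    def __init__(self, window=128):
        self.free_slots = threading.Semaphore(window)  # counts free slots
        self.window = window
        self.next_slot = 0
        self.lock = threading.Lock()

    def alloc(self):
        self.free_slots.acquire()        # back-pressure: blocks when the window is full
        with self.lock:
            slot = self.next_slot % self.window
            self.next_slot += 1
            return slot

    def release(self, slot):
        self.free_slots.release()        # task reached CONSUMED; a slot is reusable

ring = DistRingSketch(window=4)
slots = [ring.alloc() for _ in range(4)]  # claims slots 0..3; window now full
ring.release(slots[0])                    # oldest task consumed
s = ring.alloc()                          # would have blocked before the release
print(slots, s)  # [0, 1, 2, 3] 0
```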

### Scope — intermediate tensor lifetime

Scopes group tasks whose intermediate outputs should be released together. Each task submitted inside a scope carries one extra "scope reference" in its fanout count. When `scope_end()` is called, that reference is released for every task in the scope, allowing completed tasks with no downstream consumers to reach CONSUMED and free their ring slot.

```python
with hw.scope():
    r1 = hw.submit(...)  # r1 gets scope ref (fanout_total += 1)
    r2 = hw.submit(...)  # r2 gets scope ref
# scope_end: release scope ref on r1 and r2
# if r1/r2 have no downstream consumers, they transition to CONSUMED
```

Without scopes, tasks with no downstream consumers would never be consumed (no one releases their fanout ref), eventually exhausting the ring.

### Task State Machine

```text
FREE ──► PENDING ──► READY ──► RUNNING ──► COMPLETED ──► CONSUMED
            │          │          │            │             │
        has fanin   fanin=0   Scheduler    worker(s)     all fanout
          deps     satisfied  dispatches      done      refs released
                                                       → ring slot freed
```

For group tasks, RUNNING → COMPLETED requires ALL N workers to finish (`sub_complete_count == group_size`).
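The transitions, the fanin/fanout counters, and the scope reference compose as below. The class and method names are ours, chosen to match the text; the real slot lives in C++:

```python
FREE, PENDING, READY, RUNNING, COMPLETED, CONSUMED = range(6)

class Slot:
    def __init__(self, fanin, fanout_total):
        self.fanin_left = fanin              # unmet upstream deps
        self.fanout_left = fanout_total      # downstream consumers + scope ref
        self.state = PENDING if fanin else READY

    def fanin_release(self):                 # an upstream producer completed
        self.fanin_left -= 1
        if self.fanin_left == 0 and self.state == PENDING:
            self.state = READY

    def complete(self):                      # worker(s) finished running
        self.state = COMPLETED
        self.try_consume()

    def fanout_release(self):                # a consumer (or scope_end) released us
        self.fanout_left -= 1
        self.try_consume()

    def try_consume(self):
        if self.state == COMPLETED and self.fanout_left == 0:
            self.state = CONSUMED            # ring slot can now be freed

s = Slot(fanin=1, fanout_total=1)            # one dep, plus only the scope ref
s.fanin_release()                            # dep satisfied → READY
s.state = RUNNING                            # Scheduler dispatches
s.complete()                                 # COMPLETED, but scope ref still held
s.fanout_release()                           # scope_end releases the ref
print(s.state == CONSUMED)  # True
```

Note how the last step is exactly the "without scopes" failure mode: drop the `fanout_release()` call and the slot is stuck in COMPLETED forever.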

## 4. Python/C++ Division and Process/Thread Model

### Division of Responsibility

```text
Python layer                           C++ layer
──────────────                         ──────────────
Worker / HostWorker                    DistWorker
- fork() SubWorker processes           - DistOrchestrator (DAG, TensorMap)
- register callables (before fork)     - DistScheduler (thread, dispatch)
- manage SharedMemory lifecycle        - DistRing (slot allocation)
- provide submit() / scope() API       - WorkerThread (per-worker thread)
- call drain() to wait                 - DistSubWorker (mailbox I/O)
                                       - ChipWorker (device runtime)
```

Python handles **process lifecycle** (fork, waitpid, SharedMemory alloc/unlink). C++ handles **scheduling and execution** (threads, atomics, condition variables).

### Process Model

```text
┌─────────────────────────────────────────────────────┐
│ Main process                                        │
│                                                     │
│  Python main thread (Orch)                          │
│    │                                                │
│    ├── C++ Scheduler thread                         │
│    ├── C++ WorkerThread[0] → ChipWorker[0]          │
│    ├── C++ WorkerThread[1] → ChipWorker[1]          │
│    ├── C++ WorkerThread[2] → DistSubWorker[0]       │
│    └── C++ WorkerThread[3] → DistSubWorker[1]       │
│                                                     │
└──────────────────────────┬──────────────────────────┘
                           │ fork() (before C++ threads start)
             ┌─────────────┴─────────────┐
             ▼                           ▼
    ┌─────────────────┐        ┌─────────────────┐
    │ Child process 0 │        │ Child process 1 │
    │ Python loop:    │        │ Python loop:    │
    │  poll mailbox   │        │  poll mailbox   │
    │  run callable   │        │  run callable   │
    └─────────────────┘        └─────────────────┘
```

**Fork ordering**: Python forks child processes FIRST, then creates C++ threads (`DistWorker.init()`). This avoids POSIX fork-in-multithreaded-process issues.
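The ordering constraint can be demonstrated in miniature (POSIX-only; `os.fork` stands in for the HostWorker fork and a trivial `threading.Thread` stands in for the C++ threads):

```python
import os
import threading

# Children are forked while the process is still single-threaded; only then are
# threads started, so no child ever inherits a lock held by a thread that does
# not exist in its copy of the process.
children = []
for _ in range(2):
    pid = os.fork()
    if pid == 0:           # child: would enter its mailbox polling loop here
        os._exit(0)
    children.append(pid)

# Safe to create threads now; DistWorker.init() plays this role in the runtime.
t = threading.Thread(target=lambda: None)
t.start()
t.join()

codes = [os.waitpid(pid, 0)[1] for pid in children]
print(codes)  # [0, 0]
```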

### Data Exchange

| Path | Mechanism | Data |
| ---- | --------- | ---- |
| Orch → Scheduler | `DistReadyQueue` (mutex + CV) | task slot index |
| Scheduler → WorkerThread | `WorkerThread.queue_` (mutex + CV) | `WorkerPayload` copy |
| WorkerThread → Scheduler | `completion_queue_` (mutex + CV) | task slot index |
| WorkerThread ↔ Child process | SharedMemory mailbox (256 bytes, acquire/release) | callable_id, state, error_code |
| Python ↔ ChipWorker | `WorkerPayload.callable` / `.args` (raw pointers) | ChipCallable buffer, TaskArgs |
| All tensors | `torch.share_memory_()` or host malloc | zero-copy shared address space |
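The mailbox row can be made concrete with a toy version of the protocol. The byte layout here is ours, for illustration only (byte 0 = state: 0 empty, 1 request, 2 done; byte 1 = callable_id; byte 2 = error_code), and plain polling with sleeps stands in for the real acquire/release atomics:

```python
import os
import time
from multiprocessing import shared_memory

shm = shared_memory.SharedMemory(create=True, size=256)
shm.buf[0] = 0                          # mailbox starts empty

pid = os.fork()
if pid == 0:                            # child: mailbox polling loop
    while shm.buf[0] != 1:              # wait for a dispatched request
        time.sleep(0.001)
    cid = shm.buf[1]                    # which registered callable to run
    shm.buf[2] = 0 if cid == 7 else 1   # "run" it and record an error_code
    shm.buf[0] = 2                      # state = done
    os._exit(0)

shm.buf[1] = 7                          # callable_id
shm.buf[0] = 1                          # state = request → child wakes up
while shm.buf[0] != 2:                  # WorkerThread blocks until worker_done
    time.sleep(0.001)
err = shm.buf[2]
os.waitpid(pid, 0)
shm.close()
shm.unlink()
print("error_code:", err)  # error_code: 0
```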

## 5. Unified Interface — Same API at Every Level

All levels share the same user-facing operations. An orchestration function written for L3 can run at L4 or L5 without modification — only the physical workers behind it change.

### Core Operations

```python
# At any level:
worker.submit(worker_type, payload, inputs=[...], outputs=[...])  # submit a task
worker.submit(..., args_list=[a0, a1, a2, a3])                    # submit a group task
with worker.scope():                                              # scope lifetime
    worker.submit(...)
worker.run(Task(orch=my_orch))                                    # run and drain
```

### L2 Usage — Single Chip

```python
w = Worker(level=2, device_id=0, platform="a2a3sim", runtime="tensormap_and_ringbuffer")
w.init()
w.run(chip_callable, chip_args, block_dim=24)
w.close()
```

### L3 Usage — Multiple Chips + SubWorkers

```python
w = Worker(level=3, device_ids=[0, 1], num_sub_workers=2,
           platform="a2a3sim", runtime="tensormap_and_ringbuffer")
cid = w.register(my_python_fn)  # register before init (inherited by fork)
w.init()

def my_orch(w, args):
    # Build callable and task args (same types as L2)
    chip_callable = ChipCallable.build(signature, func_name, orch_bin, children)
    task_args = ChipStorageTaskArgs()
    task_args.add_tensor(make_tensor_arg(input_tensor))
    task_args.add_tensor(make_tensor_arg(output_tensor))

    with w.scope():
        # ChipWorker task: runs kernel on NPU
        payload = WorkerPayload()
        payload.callable = chip_callable.buffer_ptr()
        payload.args = task_args.__ptr__()
        payload.block_dim = 24
        r = w.submit(WorkerType.CHIP, payload, outputs=[64])

        # SubWorker task: runs Python callable, depends on chip output
        sub_p = WorkerPayload()
        sub_p.callable_id = cid
        w.submit(WorkerType.SUB, sub_p, inputs=[r.outputs[0].ptr])

w.run(Task(orch=my_orch))
w.close()
```

### L3 Group Task — N Chips as One Logical Worker

```python
def my_orch(w, args):
    # Each chip gets its own args with rank-specific data
    args_list = []
    for rank in range(4):
        a = ChipStorageTaskArgs()
        a.add_tensor(make_tensor_arg(input))
        a.add_tensor(make_tensor_arg(output))
        a.add_scalar(rank)
        a.add_scalar(4)
        args_list.append(a.__ptr__())

    # 1 DAG node, 4 chips execute in parallel
    w.submit(WorkerType.CHIP, payload, args_list=args_list, outputs=[out_size])
```

### Why It's Uniform

The internal C++ interface is `IWorker::run(payload)` — one method, implemented by every worker type:

| Implementation | What `run()` does |
| -------------- | ----------------- |
| `ChipWorker` | Calls NPU runtime → device executes kernel |
| `DistSubWorker` | Writes shared-memory mailbox → forked child executes Python callable |
| `DistWorker` | Runs sub-engine (Orchestrator + Scheduler + workers) → drains |

An L4 Scheduler dispatches to L3 `DistWorker` instances by calling `run()`. It doesn't know or care what's inside — could be 1 chip or 100 chips with SubWorkers. This recursive composition makes the hierarchy arbitrarily deep with zero API changes.
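A toy model of that recursion (Python stand-ins for the C++ interface; the real engine dispatches through Scheduler threads rather than a sequential loop):

```python
class IWorker:
    def run(self, payload):
        raise NotImplementedError

class LeafWorker(IWorker):              # stands in for ChipWorker / DistSubWorker
    def run(self, payload):
        return [payload]                # "execute" and report one result

class DistWorker(IWorker):
    """A scheduling engine that is itself just another IWorker to its parent."""
    def __init__(self, workers):
        self.workers = workers          # leaves or nested DistWorkers; it cannot tell

    def run(self, payload):             # run the sub-engine to completion, then drain
        results = []
        for w in self.workers:
            results.extend(w.run(payload))
        return results

# An L4 node managing two L3 nodes, each wrapping two leaf workers:
l3a = DistWorker([LeafWorker(), LeafWorker()])
l3b = DistWorker([LeafWorker(), LeafWorker()])
l4 = DistWorker([l3a, l3b])
n = len(l4.run("task"))
print(n)  # 4 (one result per leaf; the depth is invisible to the L4 caller)
```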

## Architecture Diagram

```text
Python Application
 │
 └─► Worker / HostWorker          ← Python wrapper (lifecycle, fork management)
      │
      └── DistWorker(level=3)     ← C++ scheduling engine
           │
           ├── DistOrchestrator   ← submit(), TensorMap, Scope
           ├── DistScheduler      ← ready_queue → WorkerThread dispatch
           ├── DistRing           ← slot allocator with back-pressure
           ├── DistTensorMap      ← base_ptr → producer slot mapping
           ├── DistScope          ← scope lifetime management
           │
           ├── ChipWorker ×N      ← IWorker: NPU device execution
           │    └── DeviceRunner (thread_local)
           │
           └── DistSubWorker ×M   ← IWorker: fork/shm Python callable
                └── forked child process ← mailbox state machine
```

## Files

| File | Purpose |
| ---- | ------- |
| `src/common/distributed/dist_types.h/.cpp` | WorkerPayload, DistTaskSlotState, IWorker, DistReadyQueue |
| `src/common/distributed/dist_orchestrator.h/.cpp` | submit / submit_group, TensorMap wiring, scope |
| `src/common/distributed/dist_scheduler.h/.cpp` | Scheduler thread, WorkerThread, group dispatch/completion |
| `src/common/distributed/dist_worker.h/.cpp` | Top-level engine: composes all components |
| `src/common/distributed/dist_ring.h/.cpp` | Circular slot allocator with back-pressure |
| `src/common/distributed/dist_tensormap.h/.cpp` | base_ptr → producer slot mapping |
| `src/common/distributed/dist_scope.h/.cpp` | Scope depth tracking and ref management |
| `src/common/distributed/dist_sub_worker.h/.cpp` | fork/shm IWorker with mailbox protocol |
| `src/common/worker/chip_worker.h/.cpp` | L2 device execution, thread_local DeviceRunner |
| `python/host_worker/host_worker.py` | L3 Python wrapper, fork management, scope context manager |
| `python/worker.py` | Unified Worker factory (L2 + L3) |
| `python/bindings/dist_worker_bind.h` | nanobind bindings for distributed types |

python/bindings/CMakeLists.txt

Lines changed: 18 additions & 3 deletions
```diff
@@ -7,15 +7,28 @@
 # See LICENSE in the root of the software repository for the full text of the License.
 # -----------------------------------------------------------------------------------------------------------
 
-# nanobind Python bindings for task_interface
+# nanobind Python bindings for task_interface and distributed runtime
 
 set(BINDING_SOURCES
     task_interface.cpp
 )
 
 list(TRANSFORM BINDING_SOURCES PREPEND "${CMAKE_CURRENT_SOURCE_DIR}/")
 
-nanobind_add_module(_task_interface ${BINDING_SOURCES})
+set(DIST_SRC ${CMAKE_SOURCE_DIR}/src/common/distributed)
+
+set(DIST_SOURCES
+    ${DIST_SRC}/dist_types.cpp
+    ${DIST_SRC}/dist_tensormap.cpp
+    ${DIST_SRC}/dist_ring.cpp
+    ${DIST_SRC}/dist_scope.cpp
+    ${DIST_SRC}/dist_orchestrator.cpp
+    ${DIST_SRC}/dist_sub_worker.cpp
+    ${DIST_SRC}/dist_scheduler.cpp
+    ${DIST_SRC}/dist_worker.cpp
+)
+
+nanobind_add_module(_task_interface ${BINDING_SOURCES} ${DIST_SOURCES})
 
 target_sources(_task_interface PRIVATE
     ${CMAKE_SOURCE_DIR}/src/common/worker/chip_worker.cpp
@@ -24,9 +37,11 @@ target_sources(_task_interface PRIVATE
 target_include_directories(_task_interface PRIVATE
     ${CMAKE_SOURCE_DIR}/src/common/task_interface
     ${CMAKE_SOURCE_DIR}/src/common/worker
+    ${CMAKE_SOURCE_DIR}/src/common/distributed
+    ${CMAKE_CURRENT_SOURCE_DIR}
 )
 
-target_link_libraries(_task_interface PRIVATE ${CMAKE_DL_LIBS})
+target_link_libraries(_task_interface PRIVATE ${CMAKE_DL_LIBS} pthread)
 
 if(SKBUILD_MODE)
     install(TARGETS _task_interface DESTINATION .)
```