Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
646 changes: 646 additions & 0 deletions docs/pr_async_completion_and_notification.md

Large diffs are not rendered by default.

344 changes: 344 additions & 0 deletions docs/runtime_async.md

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
"""
Golden script for async_completion_demo.

Single-card / sim path keeps the original producer-consumer pipeline:
producer: out[i] = in[i] * 2.0
consumer: result[i] = out[i] + 1.0

Hardware 2-card path validates `out` and `result`:
each rank TGET_ASYNCs the peer rank's `in` into local `out`, then the
normal consumer computes `result = out + 1`.
"""

import ctypes
import torch

# Tensors compared against the golden values — presumably consumed by the
# test harness; confirm against the runner's golden-script contract.
__outputs__ = ["result", "out"]

# Relative/absolute tolerances for output comparison (harness convention,
# not read anywhere in this file — verify against the runner).
RTOL = 1e-5
ATOL = 1e-5


def generate_inputs(params: dict) -> list:
SIZE = 128 * 128

inp = torch.full((SIZE,), 3.0, dtype=torch.float32)
out = torch.zeros(SIZE, dtype=torch.float32)
result = torch.zeros(SIZE, dtype=torch.float32)
event_handle_output = torch.zeros(4, dtype=torch.int32)

return [
("in", inp),
("out", out),
("result", result),
("event_handle_output", event_handle_output),
("size_in", ctypes.c_int64(inp.nbytes)),
("size_out", ctypes.c_int64(out.nbytes)),
("size_result", ctypes.c_int64(result.nbytes)),
("size_event_handle_output", ctypes.c_int64(event_handle_output.nbytes)),
("SIZE", ctypes.c_int64(SIZE)),
]


def generate_distributed_inputs(rank: int, nranks: int, root: int,
                                comm_ctx=None) -> list:
    """Build per-rank buffers for the 2-card hardware path.

    Every rank receives the same deterministic ramp input
    ``in[i] = (i % 251) / 10.0``; `out` and `result` start zeroed.
    `rank`, `nranks`, `root`, and `comm_ctx` are interface-mandated and
    unused here.
    """
    count = 128 * 128
    ramp = [(i % 251) / 10.0 for i in range(count)]
    return [
        ("in", ramp),
        ("out", [0.0] * count),
        ("result", [0.0] * count),
    ]


def compute_golden(tensors: dict, params: dict) -> None:
    """Fill ``out`` and ``result`` in place with the expected outputs.

    Single-card / sim path (an ``"in"`` entry exists)::

        out    = in * 2.0
        result = in * 2.0 + 1.0

    Distributed 2-card path (no ``"in"`` entry): both ranks share the same
    deterministic ramp input (see ``generate_distributed_inputs``), so the
    expected peer data can be regenerated locally::

        out[i]    = (i % 251) / 10.0
        result[i] = out[i] + 1.0

    Args:
        tensors: name -> buffer mapping; ``out`` and ``result`` are mutated.
        params: unused, kept for the golden-script interface.

    Returns:
        None; results are written through the buffers in ``tensors``.
    """
    if "in" in tensors:
        inp = torch.as_tensor(tensors["in"])
        # Hoist the shared intermediate: both outputs derive from in * 2.0.
        doubled = inp * 2.0
        tensors["out"][:] = doubled
        tensors["result"][:] = doubled + 1.0
        return

    # Distributed fallback: whole-slice assignment instead of a manual
    # index loop; works for both plain lists and torch tensors.
    out = tensors["out"]
    result = tensors["result"]
    ramp = [float(i % 251) / 10.0 for i in range(len(out))]
    out[:] = ramp
    result[:] = [value + 1.0 for value in ramp]
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/**
* Async Completion Demo - Consumer Kernel (func_id=1)
*
* Implements: result[i] = src[i] + 1.0
*
* This kernel executes as a normal run-to-completion task. It depends on the
* producer's output tensor; the scheduler only dispatches it after the
* producer's deferred completion (event flag) is resolved.
*
* Kernel args layout (packed by scheduler):
* args[0] = &Tensor(src) — input tensor struct pointer (producer's output)
* args[1] = &Tensor(result) — output tensor struct pointer
*/

#include <cstdint>
#include <pto/pto-inst.hpp>

#include "tensor.h"

using namespace pto;

#ifndef __gm__
#define __gm__
#endif

#ifndef __aicore__
#define __aicore__ [aicore]
#endif

// Consumer entry point: result[i] = src[i] + 1.0 over one 128x128 fp32 tile.
// args are packed by the scheduler: [0] = src Tensor*, [1] = result Tensor*.
extern "C" __aicore__ __attribute__((always_inline)) void kernel_entry(__gm__ int64_t* args) {
    __gm__ Tensor* src_tensor = reinterpret_cast<__gm__ Tensor*>(args[0]);
    __gm__ Tensor* result_tensor = reinterpret_cast<__gm__ Tensor*>(args[1]);

    // start_offset is applied as float-element pointer arithmetic, not bytes.
    __gm__ float* src = reinterpret_cast<__gm__ float*>(src_tensor->buffer.addr) + src_tensor->start_offset;
    __gm__ float* result = reinterpret_cast<__gm__ float*>(result_tensor->buffer.addr) + result_tensor->start_offset;

    // Fixed 128x128 shape; the valid view (vRows x vCols) equals the tile shape.
    constexpr int kTRows_ = 128;
    constexpr int kTCols_ = 128;
    constexpr int vRows = 128;
    constexpr int vCols = 128;

    // Row-major 5-D view with the data carried in the two trailing dims.
    using DynShapeDim5 = Shape<1, 1, 1, vRows, vCols>;
    using DynStridDim5 = Stride<1, 1, 1, kTCols_, 1>;
    using GlobalData = GlobalTensor<float, DynShapeDim5, DynStridDim5>;
    using TileData = Tile<TileType::Vec, float, kTRows_, kTCols_, BLayout::RowMajor, -1, -1>;

    TileData srcTile(vRows, vCols);
    TileData dstTile(vRows, vCols);
    // Disjoint local-buffer base offsets for the two tiles (0x10000 apart).
    // Presumably UB addresses — confirm against the tile-allocation convention.
    TASSIGN(srcTile, 0x0);
    TASSIGN(dstTile, 0x10000);

    GlobalData srcGlobal(src);
    GlobalData dstGlobal(result);

    // Load src from GM; the set/wait pair orders the MTE2 load before the
    // vector pipe consumes the tile.
    TLOAD(srcTile, srcGlobal);
    set_flag(PIPE_MTE2, PIPE_V, EVENT_ID0);
    wait_flag(PIPE_MTE2, PIPE_V, EVENT_ID0);

    // dst = src + 1.0, then order the vector result before the MTE3 store.
    TADDS(dstTile, srcTile, 1.0f);
    set_flag(PIPE_V, PIPE_MTE3, EVENT_ID0);
    wait_flag(PIPE_V, PIPE_MTE3, EVENT_ID0);

    // Store to GM and drain the MTE3 pipe before returning.
    TSTORE(dstGlobal, dstTile);
    set_flag(PIPE_MTE3, PIPE_S, EVENT_ID7);
    wait_flag(PIPE_MTE3, PIPE_S, EVENT_ID7);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/**
* Async Completion Demo - Simulated Producer Kernel (func_id=0)
*
* Implements: out[i] = in[i] * 2.0
*
* After storing the output, writes 1 to a GM completion flag, then registers
* the completion via the CQ. The scheduler reads the CQ after this kernel
* returns and polls the flag address.
*
* Kernel args layout (packed by scheduler):
* args[0] = &Tensor(in) — input tensor struct pointer
* args[1] = &Tensor(out) — output tensor struct pointer
* args[2] = event_flag_gm_addr — GM flag addr (pre-allocated by golden.py)
* args[3] = cq_addr — completion queue (appended by submit_deferred)
*/

#include <cstdint>
#include <pto/pto-inst.hpp>

#include "tensor.h"

using namespace pto;

#ifndef __gm__
#define __gm__
#endif

#ifndef __aicore__
#define __aicore__ [aicore]
#endif

#include "pto_cq_types.h"
#include "pto_cq_kernel_api.h"

// Simulated producer entry point: out[i] = in[i] * 2.0 over one 128x128 fp32
// tile, followed by a deferred-completion handshake (GM flag + CQ entry).
// args: [0] = in Tensor*, [1] = out Tensor*, [2] = GM event-flag addr,
//       [3] = CQ addr (appended by submit_deferred).
extern "C" __aicore__ __attribute__((always_inline)) void kernel_entry(__gm__ int64_t* args) {
    __gm__ Tensor* in_tensor = reinterpret_cast<__gm__ Tensor*>(args[0]);
    __gm__ Tensor* out_tensor = reinterpret_cast<__gm__ Tensor*>(args[1]);
    uint64_t event_flag_addr = static_cast<uint64_t>(args[2]);
    uint64_t cq_addr = static_cast<uint64_t>(args[3]);

    // start_offset is applied as float-element pointer arithmetic, not bytes.
    __gm__ float* in_data = reinterpret_cast<__gm__ float*>(in_tensor->buffer.addr) + in_tensor->start_offset;
    __gm__ float* out_data = reinterpret_cast<__gm__ float*>(out_tensor->buffer.addr) + out_tensor->start_offset;

    // Fixed 128x128 shape; the valid view (vRows x vCols) equals the tile shape.
    constexpr int kTRows_ = 128;
    constexpr int kTCols_ = 128;
    constexpr int vRows = 128;
    constexpr int vCols = 128;

    // Row-major 5-D view with the data carried in the two trailing dims.
    using DynShapeDim5 = Shape<1, 1, 1, vRows, vCols>;
    using DynStridDim5 = Stride<1, 1, 1, kTCols_, 1>;
    using GlobalData = GlobalTensor<float, DynShapeDim5, DynStridDim5>;
    using TileData = Tile<TileType::Vec, float, kTRows_, kTCols_, BLayout::RowMajor, -1, -1>;

    TileData inTile(vRows, vCols);
    TileData outTile(vRows, vCols);
    // Disjoint local-buffer base offsets for the two tiles (0x10000 apart).
    // Presumably UB addresses — confirm against the tile-allocation convention.
    TASSIGN(inTile, 0x0);
    TASSIGN(outTile, 0x10000);

    GlobalData inGlobal(in_data);
    GlobalData outGlobal(out_data);

    // Load input from GM; order the MTE2 load before the vector pipe reads it.
    TLOAD(inTile, inGlobal);
    set_flag(PIPE_MTE2, PIPE_V, EVENT_ID0);
    wait_flag(PIPE_MTE2, PIPE_V, EVENT_ID0);

    // out = in + in = in * 2.0
    TADD(outTile, inTile, inTile);
    set_flag(PIPE_V, PIPE_MTE3, EVENT_ID0);
    wait_flag(PIPE_V, PIPE_MTE3, EVENT_ID0);

    // Store to GM and drain MTE3 so the data is visible before flag signaling.
    TSTORE(outGlobal, outTile);
    set_flag(PIPE_MTE3, PIPE_S, EVENT_ID7);
    wait_flag(PIPE_MTE3, PIPE_S, EVENT_ID7);

    // Signal async completion: write non-zero flag to GM
    volatile __gm__ int32_t* flag = reinterpret_cast<volatile __gm__ int32_t*>(
        static_cast<uintptr_t>(event_flag_addr));
#if defined(SINGLE_CACHE_LINE) && defined(DSB_DDR)
    // On hardware: invalidate, write, write back, then a data barrier so the
    // host-visible flag is not stuck in cache.
    dcci((__gm__ int32_t*)flag, SINGLE_CACHE_LINE);
    *flag = 1;
    dcci((__gm__ int32_t*)flag, SINGLE_CACHE_LINE);
    dsb(DSB_DDR);
#else
    // Sim path: a plain volatile store suffices.
    *flag = 1;
#endif

    // Register the deferred completion; the scheduler polls event_flag_addr
    // after this kernel returns (see file header).
    volatile __gm__ PTO2CompletionQueue* cq = pto2_cq_get(cq_addr);
    pto2_cq_reset(cq);
    pto2_save_expected_completion(PTO2_ENGINE_SDMA, cq, event_flag_addr);
    pto2_cq_flush(cq);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/**
* Async Completion Demo - Hardware 2P SDMA TGET Producer Kernel (func_id=2)
*
* Implements:
* 1. Read peer rank's input buffer via TGET_ASYNC into local out
* 2. Register the async event in the CQ
* 3. Return immediately so the runtime completes the task asynchronously
*
* This kernel is only compiled for real hardware (a2a3), not for simulation.
*
* Kernel args layout (packed by scheduler):
* args[0] = &Tensor(in) — input tensor struct pointer
* args[1] = &Tensor(out) — output tensor struct pointer
* args[2] = CommDeviceContext* — distributed communication context
* args[3] = sdma_context_addr — SDMA async context
* args[4] = cq_addr — completion queue (appended by submit_deferred)
*/

#include <cstdint>
#ifndef __gm__
#define __gm__
#endif

#ifndef __aicore__
#define __aicore__ [aicore]
#endif

#include <pto/pto-inst.hpp>
#include "pto/comm/pto_comm_inst.hpp"
#include "pto/npu/comm/async/sdma/sdma_types.hpp"
#include "pto/common/pto_tile.hpp"

#include "common/comm_context.h"
#include "tensor.h"

using namespace pto;

#include "pto_sq_kernel_api.h"

// Map a pointer inside this rank's communication window to the same byte
// offset inside `peer_rank`'s window.
template <typename T>
AICORE inline __gm__ T* CommRemotePtr(__gm__ CommDeviceContext* ctx, __gm__ T* local_ptr,
                                      int peer_rank) {
    const uint64_t my_window_base = ctx->windowsIn[ctx->rankId];
    const uint64_t byte_offset = (uint64_t)local_ptr - my_window_base;
    const uint64_t peer_window_base = ctx->windowsIn[peer_rank];
    return (__gm__ T*)(peer_window_base + byte_offset);
}

// Hardware 2P producer entry point: issue a TGET_ASYNC that reads the peer
// rank's `in` into local `out`, register the expected completion in the CQ,
// and return; the runtime resolves the completion asynchronously.
// args: [0] = in Tensor*, [1] = out Tensor*, [2] = CommDeviceContext*,
//       [3] = SDMA async context addr, [4] = CQ addr (appended by submit_deferred).
extern "C" __aicore__ __attribute__((always_inline)) void kernel_entry(__gm__ int64_t* args) {
    __gm__ Tensor* in_tensor = reinterpret_cast<__gm__ Tensor*>(args[0]);
    __gm__ Tensor* out_tensor = reinterpret_cast<__gm__ Tensor*>(args[1]);
    __gm__ CommDeviceContext* comm_ctx =
        reinterpret_cast<__gm__ CommDeviceContext*>(args[2]);
    uint64_t sdma_context = static_cast<uint64_t>(args[3]);
    uint64_t cq_addr = static_cast<uint64_t>(args[4]);

    // start_offset is applied as float-element pointer arithmetic, not bytes.
    __gm__ float* in_data = reinterpret_cast<__gm__ float*>(in_tensor->buffer.addr) + in_tensor->start_offset;
    __gm__ float* out_data = reinterpret_cast<__gm__ float*>(out_tensor->buffer.addr) + out_tensor->start_offset;
    // Reset the CQ up front so stale entries from a previous round are gone.
    volatile __gm__ PTO2CompletionQueue* cq = pto2_cq_get(cq_addr);
    pto2_cq_reset(cq);

    int my_rank = static_cast<int>(comm_ctx->rankId);
    int nranks = static_cast<int>(comm_ctx->rankNum);
    // This demo only supports exactly 2 ranks; bail out otherwise.
    // NOTE(review): on this early-return path the CQ was reset but never
    // flushed — confirm the scheduler treats that as "no deferred completion".
    if (nranks != 2) {
        pipe_barrier(PIPE_ALL);
        return;
    }
    // With nranks == 2, the peer is simply the other rank.
    int peer_rank = 1 - my_rank;

    constexpr int kTotalElems = 128 * 128;

    // Flat 1-D view of the whole 128*128 buffer for the SDMA transfer.
    using FlatShape = Shape<1, 1, 1, 1, kTotalElems>;
    using FlatStride = Stride<kTotalElems, kTotalElems, kTotalElems, kTotalElems, 1>;
    using FlatGlobalData = GlobalTensor<float, FlatShape, FlatStride>;
    FlatGlobalData outGlobalFlat(out_data);
    // Translate the local `in` pointer into the peer's window (same offset).
    __gm__ float* remote_in_data = CommRemotePtr(comm_ctx, in_data, peer_rank);
    FlatGlobalData remoteInGlobalFlat(remote_in_data);

    // Scratch tile used by the SDMA descriptor builder; 0x20000 keeps it clear
    // of other tile allocations — presumably a UB address, confirm convention.
    using ScratchTile = pto::Tile<pto::TileType::Vec, uint8_t, 1, pto::comm::sdma::UB_ALIGN_SIZE>;
    ScratchTile scratchTile;
    TASSIGN(scratchTile, 0x20000);

    __gm__ uint8_t* context = reinterpret_cast<__gm__ uint8_t*>(static_cast<uintptr_t>(sdma_context));

    // Build the TGET descriptor (remote in -> local out), submit it on the
    // SDMA engine, and record the returned tag as the expected completion.
    auto desc = pto2_sdma_tget_descriptor(outGlobalFlat, remoteInGlobalFlat, scratchTile, context);
    uint64_t tag = pto2_send_request_entry(PTO2_ENGINE_SDMA, PTO2_SQ_ID_AUTO, desc);
    pto2_save_expected_completion(PTO2_ENGINE_SDMA, cq, tag);

    pto2_cq_flush(cq);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
"""
Async Completion Demo - Kernel and Orchestration Configuration

Two hardware cards use the existing deferred-completion producer API to
demonstrate a real 2P TGET_ASYNC remote read. The legacy single-card / sim
path stays available for local debugging.
"""

import os
from pathlib import Path

# All kernel/orchestration sources live relative to this config file.
_KERNELS_ROOT = Path(__file__).parent

# AICPU-side orchestration program and its entry symbol.
ORCHESTRATION = {
    "source": str(_KERNELS_ROOT / "orchestration" / "async_demo_orchestration.cpp"),
    "function_name": "aicpu_orchestration_entry",
}

# Target platform; defaults to the simulator when PTO_PLATFORM is unset.
_platform = os.environ.get("PTO_PLATFORM", "a2a3sim")

# Kernels available on every platform: sim producer (0) and consumer (1).
KERNELS = [
    {"func_id": 0, "source": str(_KERNELS_ROOT / "aiv" / "kernel_producer.cpp"), "core_type": "aiv"},
    {"func_id": 1, "source": str(_KERNELS_ROOT / "aiv" / "kernel_consumer.cpp"), "core_type": "aiv"},
]

# The TGET_ASYNC producer (2) is only compiled for real hardware.
if _platform == "a2a3":
    KERNELS.append(
        {"func_id": 2, "source": str(_KERNELS_ROOT / "aiv" / "kernel_producer_async.cpp"), "core_type": "aiv"},
    )

RUNTIME_CONFIG = {
    "runtime": "tensormap_and_ringbuffer",
    "aicpu_thread_num": 4,
    "orch_thread_num": 1,
    "block_dim": 3,
    "rounds": 1,
}

# NOTE(review): RUNTIME_ENV only exists when PTO_PLATFORM == "a2a3"; consumers
# on other platforms must guard the lookup (getattr/hasattr) — confirm the
# runner does so.
if _platform == "a2a3":
    RUNTIME_ENV = {
        "PTO2_ENABLE_SDMA": "1",
    }

# 2-rank distributed layout: `in`/`out` live in the comm window so the peer
# can read them via TGET; `result` is plain device memory.
DISTRIBUTED_CONFIG = {
    "nranks": 2,
    "root": 0,
    "win_sync_prefix": 256,
    "buffers": [
        {"name": "in", "dtype": "float32", "count": 128 * 128, "placement": "window"},
        {"name": "out", "dtype": "float32", "count": 128 * 128, "placement": "window"},
        {"name": "result", "dtype": "float32", "count": 128 * 128, "placement": "device"},
    ],
    "inputs": ["in"],
    "outputs": ["out", "result"],
    "args": ["in", "out", "result", "deviceCtx"],
}
Loading