Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ jobs:
- name: Run on-device examples (a2a3)
run: |
export PATH="$HOME/.local/bin:$PATH"
source ${ASCEND_HOME_PATH}/bin/setenv.bash && python ci.py -p a2a3 -d ${DEVICE_RANGE} --parallel -c 882c4db -t 600 --clone-protocol https
source ${ASCEND_HOME_PATH}/bin/setenv.bash && python ci.py -p a2a3 -d ${DEVICE_RANGE} -c 882c4db -t 600 --clone-protocol https


# ---------- Detect A5 changes (runs on GitHub server, not A5 machine) ----------
Expand Down Expand Up @@ -290,4 +290,4 @@ jobs:
export PATH="$HOME/.local/bin:$PATH"
source ${ASCEND_HOME_PATH}/bin/setenv.bash
DEVICE_LIST=$(python -c "s,e='${DEVICE_RANGE}'.split('-'); print(','.join(str(i) for i in range(int(s),int(e)+1)))")
task-submit --device "$DEVICE_LIST" --run "python ci.py -p a5 -d ${DEVICE_RANGE} --parallel -c 882c4db -t 600 --clone-protocol https"
task-submit --device "$DEVICE_LIST" --run "python ci.py -p a5 -d ${DEVICE_RANGE} -c 882c4db -t 600 --clone-protocol https"
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ venv/
.claude/settings.json
.claude/settings.local.json
.claude/worktrees
.claude/plans

# Git cloned dependencies (not tracked in repo)
examples/scripts/_deps/
Expand Down
139 changes: 71 additions & 68 deletions ci.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
per device, reusing ChipWorker across tasks that share the same runtime.

Usage:
python tools/ci.py -p a2a3 -d 5-8 --parallel -c 6622890 -t 600
python tools/ci.py -p a2a3 -d 5-8 -c 6622890 -t 600
python tools/ci.py -p a2a3sim -r tensormap_and_ringbuffer -c 6622890 -t 600
"""

Expand Down Expand Up @@ -601,79 +601,80 @@ def device_worker(
# ---------------------------------------------------------------------------


def run_sim_tasks(compiled: list[CompiledTask], parallel: bool = False) -> list[TaskResult]:
"""Run simulation tasks with ChipWorker reuse per runtime group."""
groups = group_by_runtime(compiled)
def _run_sim_task_with_worker(ct: CompiledTask, worker: ChipWorker, max_attempts: int) -> list[TaskResult]:
"""Run a single sim task using *worker*, returning per-attempt results."""
results: list[TaskResult] = []
lock = Lock()
rt_bins = cast(RuntimeBinariesLike, ct.runtime_bins)

def _run_group(runtime_name: str, group_tasks: list[CompiledTask]):
worker = ChipWorker()
rt_bins = cast(RuntimeBinariesLike, group_tasks[0].runtime_bins)
for attempt in range(max_attempts):
start = time.monotonic()
try:
worker.init(0, str(rt_bins.host_path), rt_bins.aicpu_path.read_bytes(), rt_bins.aicore_path.read_bytes())
run_single_task(ct, worker, 0)
elapsed = time.monotonic() - start
logger.info(f"[sim] PASS: {ct.spec.name} ({elapsed:.1f}s)")
results.append(
TaskResult(
name=ct.spec.name,
platform=ct.spec.platform,
passed=True,
device="sim",
attempt=attempt,
elapsed_s=elapsed,
)
)
break
except Exception as e:
logger.error(f"[sim] Failed to init ChipWorker for {runtime_name}: {e}")
with lock:
results.extend(
TaskResult(
name=ct.spec.name,
platform=ct.spec.platform,
passed=False,
device="sim",
attempt=0,
elapsed_s=0,
error=str(e),
)
for ct in group_tasks
elapsed = time.monotonic() - start
logger.error(f"[sim] FAIL: {ct.spec.name} ({elapsed:.1f}s): {e}")
results.append(
TaskResult(
name=ct.spec.name,
platform=ct.spec.platform,
passed=False,
device="sim",
attempt=attempt,
elapsed_s=elapsed,
error=str(e),
)
return

try:
for ct in group_tasks:
start = time.monotonic()
try:
run_single_task(ct, worker, 0)
elapsed = time.monotonic() - start
logger.info(f"[sim] PASS: {ct.spec.name} ({elapsed:.1f}s)")
r = TaskResult(
name=ct.spec.name,
platform=ct.spec.platform,
passed=True,
device="sim",
attempt=0,
elapsed_s=elapsed,
)
except Exception as e:
elapsed = time.monotonic() - start
logger.error(f"[sim] FAIL: {ct.spec.name} ({elapsed:.1f}s): {e}")
r = TaskResult(
name=ct.spec.name,
platform=ct.spec.platform,
passed=False,
device="sim",
attempt=0,
elapsed_s=elapsed,
error=str(e),
)
with lock:
results.append(r)
)
if attempt + 1 >= max_attempts:
logger.warning(f"[sim] Exhausted retries on {ct.spec.name}")
finally:
worker.reset()
if worker.initialized:
worker.reset()

return results

if parallel:
threads = [Thread(target=_run_group, args=(rt_name, tasks)) for rt_name, tasks in groups.items()]
for t in threads:
t.start()
for t in threads:
t.join()
else:
for rt_name, tasks in groups.items():
_run_group(rt_name, tasks)

def run_sim_tasks(
    compiled: list[CompiledTask],
    max_attempts: int = MAX_RETRIES,
) -> list[TaskResult]:
    """Run every compiled simulation task sequentially in this process.

    A single ``ChipWorker`` instance is created up front and handed to
    ``_run_sim_task_with_worker`` for each task in turn.

    Args:
        compiled: Compiled tasks to execute, in the order they should run.
        max_attempts: Attempt budget forwarded unchanged to
            ``_run_sim_task_with_worker`` for each task.

    Returns:
        The concatenation, in task order, of the result lists returned by
        ``_run_sim_task_with_worker``.
    """
    shared_worker = ChipWorker()
    outcomes: list[TaskResult] = []
    for task in compiled:
        per_task = _run_sim_task_with_worker(task, shared_worker, max_attempts)
        outcomes.extend(per_task)
    return outcomes


def compile_and_run_sim_tasks(
    tasks: list[TaskSpec],
    args: argparse.Namespace,
    pto_isa_root: str,
) -> list[TaskResult]:
    """Compile the given simulation tasks, then run them in-process.

    Args:
        tasks: Task specifications to compile and execute.
        args: Parsed CLI namespace; ``build_runtime``, ``run_all_cases`` and
            ``max_attempts`` are read from it.
        pto_isa_root: PTO-ISA root path passed through to ``compile_all_tasks``.

    Returns:
        Whatever ``run_sim_tasks`` returns for the compiled tasks.
    """
    logger.info(f"Compiling {len(tasks)} sim task(s)...")

    # Gather the compile switches first so the call below stays on one line.
    build_runtime = args.build_runtime
    run_all_cases = args.run_all_cases
    compiled_tasks = compile_all_tasks(
        tasks,
        pto_isa_root,
        build_runtime=build_runtime,
        run_all_cases=run_all_cases,
    )

    attempt_budget = args.max_attempts
    return run_sim_tasks(compiled_tasks, max_attempts=attempt_budget)


def run_hw_tasks(
compiled: list[CompiledTask],
devices: list[int],
Expand Down Expand Up @@ -1094,7 +1095,6 @@ def parse_args() -> argparse.Namespace:
parser.add_argument("-c", "--pto-isa-commit", default=None)
parser.add_argument("-t", "--timeout", type=int, default=600)
parser.add_argument("--clone-protocol", choices=["ssh", "https"], default="ssh")
parser.add_argument("--parallel", action="store_true")
parser.add_argument("--all", dest="run_all_cases", action="store_true", help="Run all cases, not just DEFAULT_CASE")
parser.add_argument("--device-worker", action="store_true", help=argparse.SUPPRESS)
parser.add_argument("--max-attempts", type=int, default=MAX_RETRIES, help=argparse.SUPPRESS)
Expand Down Expand Up @@ -1150,10 +1150,12 @@ def _watchdog_handler(signum, frame):
return 0
logger.info(f"Discovered {len(tasks)} tasks")

# Step 2 & 3: Compile and run via subprocess-per-runtime-group
# Each subprocess loads exactly one host .so, avoiding RTLD_GLOBAL symbol conflicts.
# Step 2 & 3: Compile and run.
# Sim: in-process with RTLD_LOCAL isolation.
# HW: subprocess per device (-d).
if is_sim:
all_results = run_hw_tasks_subprocess(tasks, [0], args)
pto_isa_root = ensure_pto_isa(args.pto_isa_commit, args.clone_protocol)
all_results = compile_and_run_sim_tasks(tasks, args, pto_isa_root)
else:
all_results = run_hw_tasks_subprocess(tasks, args.devices, args)

Expand All @@ -1169,11 +1171,12 @@ def _watchdog_handler(signum, frame):
if failures and args.pto_isa_commit:
failed_names = {r.name for r in failures}
logger.info(f"[CI] {len(failures)} failure(s), retrying with pinned PTO-ISA {args.pto_isa_commit}")
reset_pto_isa(args.pto_isa_commit, args.clone_protocol)
retry_tasks = [task for task in tasks if task.name in failed_names]
if is_sim:
retry_results = run_hw_tasks_subprocess(retry_tasks, [0], args)
retry_pto_isa_root = reset_pto_isa(args.pto_isa_commit, args.clone_protocol)
retry_results = compile_and_run_sim_tasks(retry_tasks, args, retry_pto_isa_root)
else:
reset_pto_isa(args.pto_isa_commit, args.clone_protocol)
retry_results = run_hw_tasks_subprocess(retry_tasks, args.devices, args)

all_results.extend(retry_results)
Expand Down
Loading
Loading