diff --git a/.github/workflows/all_test.yml b/.github/workflows/all_test.yml index 183b592..9e729ef 100644 --- a/.github/workflows/all_test.yml +++ b/.github/workflows/all_test.yml @@ -65,10 +65,15 @@ jobs: # Scene selection: # - ci_top_attention_doc_page_build validates doc build through the prebuilt Docker image. # - ci_top_attention_bin_kvtest keeps the Rust kv_test entry under the testbed scene contract. + # - ci_top_attention_mq_core keeps MQ correctness coverage inside the same CI testbed contract. suite["scenes"] = { key: value for key, value in suite["scenes"].items() - if key in ("ci_top_attention_doc_page_build", "ci_top_attention_bin_kvtest") + if key in ( + "ci_top_attention_doc_page_build", + "ci_top_attention_bin_kvtest", + "ci_top_attention_mq_core", + ) } # Profile selection: @@ -91,6 +96,7 @@ jobs: # - Keep the original per-scene scales from ci_test_list.yaml. # - ci_top_attention_doc_page_build stays on n1_kvowner_dram_3gib. # - ci_top_attention_bin_kvtest stays on n1_kvowner_dram_20gib. + # - ci_top_attention_mq_core stays on n1_kvowner_dram_20gib. out_path.write_text( yaml.safe_dump(suite, sort_keys=False, allow_unicode=False), diff --git a/AGENTS.md b/AGENTS.md index 61fc7d4..e001b10 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -9,6 +9,9 @@ Keep this document concise. - Git operations are limited to basic `stage`, `unstage`, `commit`, and `push`. Do not use other Git operations. - Prefer contraction over compatibility by default. Do not add compatibility layers, deprecated paths, or aliases unless the task explicitly requires them. - Prefer one canonical name for one concept. Avoid synonym parameters, duplicated entrypoints, and parallel config surfaces. +- For test entrypoints, match the real execution model directly. If a test is a standalone script/process test, invoke it as a script/process; do not wrap it in `pytest` just for uniformity. +- Do not forward pytest-style flags (`-k`, `-q`, node selectors, etc.) through direct-process test wrappers unless the wrapper explicitly implements and documents that selector surface. +- For new integration or process-lifecycle tests, prefer direct process startup with explicit arguments and explicit exit-code checks over adding new pytest-only wrappers. - Control branching deliberately. Prefer a small, explicit, enumerated set of supported branches in the style of a Rust enum over open-ended proliferation of near-duplicate cases. - When extending a surface, prefer folding the new case into an existing finite branch set. If a new branch is unavoidable, make it explicit, bounded, and easy to list exhaustively. - Names for testbed-scoped concepts should say `testbed` explicitly. Avoid generic names for testbed-only modes, ports, roots, workdirs, and other testbed-scoped settings. diff --git a/AGENTS_CN.md b/AGENTS_CN.md index 1d0f86e..3e3c815 100644 --- a/AGENTS_CN.md +++ b/AGENTS_CN.md @@ -9,6 +9,9 @@ - Git 操作仅限基础的 `stage`、`unstage`、`commit` 和 `push`。不要使用其他 Git 操作 - 默认优先收束而不是兼容。除非任务明确要求,否则不要添加兼容层、废弃路径或别名 - 一个概念优先只保留一个正式名字。避免同义参数、重复入口和并行配置面 +- 对测试入口,要直接匹配真实执行模型。如果测试本质上是独立脚本 / 独立进程测试,就按脚本 / 进程直接启动;不要为了表面统一再额外包一层 `pytest` +- 对直接启动进程的测试包装器,不要透传 `-k`、`-q`、node selector 等 pytest 风格参数,除非该包装器显式实现并文档化了这组筛选接口 +- 新增集成测试或进程生命周期测试时,优先采用“直接启动进程 + 显式参数 + 显式检查退出码”的模式,而不是继续新增 pytest 专用包装层 - 有意识地控制分支。优先采用类似 Rust enum 的小而显式、可穷举罗列的有限分支集合,而不是开放式扩散出一批近似重复分支 - 扩展一个 surface 时,优先把新情况折叠进已有的有限分支集合;如果确实必须新增分支,就让它保持显式、边界清楚、易于完整罗列 - testbed 作用域内的概念,命名里应显式带上 `testbed`。对仅属于 testbed 的 mode、port、root、workdir 等设置,避免使用过于泛化的名字 diff --git "a/fluxon_doc_cn/design/teststack_1_\345\275\223\345\211\215\346\236\266\346\236\204\344\270\216CI\346\265\213\350\257\225\346\265\201\347\250\213.md" "b/fluxon_doc_cn/design/teststack_1_\345\275\223\345\211\215\346\236\266\346\236\204\344\270\216CI\346\265\213\350\257\225\346\265\201\347\250\213.md" index ce767f7..823a4be 100644 --- "a/fluxon_doc_cn/design/teststack_1_\345\275\223\345\211\215\346\236\266\346\236\204\344\270\216CI\346\265\213\350\257\225\346\265\201\347\250\213.md" +++ "b/fluxon_doc_cn/design/teststack_1_\345\275\223\345\211\215\346\236\266\346\236\204\344\270\216CI\346\265\213\350\257\225\346\265\201\347\250\213.md" @@ -9,10 +9,11 @@ **稳定结论:** - `teststack` 由三层组成: - - **suite 编译层**:将 `scene × scale × profile` 组合成可执行 case; - - **testbed 编排层**:拉起共享 testbed,保持 controller 在线,并基于 `fluxon_ops` / ops 接口调度部署 workload; - - **case 执行层**:处理每个 case 的 prepare、execute、collect、finalize。 + - **上层:suite 编译层**:将 `scene × scale × profile` 组合成可执行 case; + - **中层:统一 case plan / dispatch 层**:把编译结果收敛成统一的 `prepare / execute / collect / finalize` 外壳,并按 runtime backend 分发; + - **下层:runtime backend 执行层**:分别承接 `CI` backend 和 `TEST_STACK` backend 的具体 prepare、execute、collect、finalize 实现。 - `test_runner.py` 是统一执行器,覆盖 `CI` case、`TEST_STACK` benchmark case,以及 UI / GitOps 集成入口。 +- `test_runner.py` 当前主要承载上层和中层;`test_runner_runtime_backend.py` 承载下层 runtime backend 实现。 - `start_test_bed.py` 只负责共享 testbed 的启动与 controller 侧 apply 编排,不承担通用测试执行职责。 - `ci_2_virt_node.py` 是 GitHub Actions / 本地双逻辑节点 CI 的封装入口,负责串联打包、dispatch、拉起 testbed、运行 runner、构建文档等步骤。 @@ -56,6 +57,46 @@ flowchart TD B --> I[test_runner UI + GitOps] ``` +### 4.0 `test_runner` 内部的上中下分层 + +这里要把“teststack 三层”与“`test_runner` 内部三层”区分开看。 + +`teststack` 整体上仍然是: + +- suite 编译层 +- testbed 编排层 +- case 执行层 + +但在 `test_runner` 自身内部,当前稳定实现已经进一步分成三层: + +| 层级 | 作用 | 当前主要落点 | +| --- | --- | --- | +| 上层 | 解析 suite、selector、`scene/scale/profile`,并 materialize `resolved_case` | `test_runner.py` | +| 中层 | 将不同 case family 收敛成统一 `_CasePlan` 外壳,并负责统一 dispatch | `test_runner.py` | +| 下层 | 按 runtime backend 执行具体 runtime 逻辑 | `test_runner_runtime_backend.py` | + +这里的关键点是: + +- **上层统一的是 schema 和 case 编译模型**; +- **中层统一的是 `prepare / execute / collect / finalize` 的外壳**; +- **下层不再按 `scene/scale/profile` 切分,而是按 runtime backend 切分**。 + +这意味着: + +- `scene / scale / profile` 仍是一套统一输入模型; +- `CI` 与 `TEST_STACK` 的差异,主要落在下层 runtime backend,而不是上层 schema。 + +当前对应关系可以简化理解为: + +```text +scene / scale / profile + -> resolved_case + -> _CasePlan + -> runtime backend dispatch + -> CI backend + -> TEST_STACK backend +``` + ### 4.1 suite 编译层 本层输入为 `ci_test_list.yaml`,主要定义三类核心对象与一类产物注册表: @@ -135,6 +176,25 @@ flowchart TD - `summary.yaml` 是单次 run_dir 的终态摘要; - `resolved_case.yaml` / `resolved_case_full.yaml` 是单次 run 的编译产物。 +### 4.4 `test_runner.py` 与 `test_runner_runtime_backend.py` 的边界 + +当前 repo 内已经开始把 `test_runner` 主体按“统一编译/分发”和“runtime backend 执行”拆开。 + +稳定边界如下: + +| 文件 | 主要职责 | 不负责什么 | +| --- | --- | --- | +| `fluxon_test_stack/test_runner.py` | 上层 suite/schema/case 编译;中层 `_CasePlan` 编译与统一 dispatch;runner 入口、workdir 历史、通用 util | 不再直接承载大段 `CI` / `TEST_STACK` backend 细节 | +| `fluxon_test_stack/test_runner_runtime_backend.py` | 下层 backend 运行逻辑:`_prepare_ci_case`、`_execute_ci_case`、`_prepare_test_stack_case`、`_execute_test_stack_case`、对应 finalize / result wait | 不解析 suite,不决定 `scene × scale × profile` 的组合空间 | + +这层拆分的目的不是制造第二套 case 模型,而是把: + +- **统一 case schema** +- **统一 `_CasePlan` 外壳** +- **不同 runtime backend 实现** + +三者分开,避免把所有逻辑继续堆在一个 `test_runner.py` 里。 + ## 5. teststack 的公共契约 ### 5.1 两类场景 @@ -407,6 +467,12 @@ sequenceDiagram `test_runner.py` 会先把每个 case 编译成 `_CasePlan`。这里有一个通用骨架:所有 case 都分成 `prepare_phases / execute_phases / collect_phases` 三段。不同场景的差异不在“三段结构本身”,而在于每段里放哪些 runtime phase、每个 phase 覆盖哪些 instance,以及 run_dir 怎样 staging。 +这里要明确: + +- `_CasePlan` 属于中层统一外壳; +- `CI` 和 `TEST_STACK` 都要先落到 `_CasePlan`; +- 真正的 backend 差异,延后到下层 runtime backend 才展开。 + 通用语义如下: - prepare phase 先准备场景依赖的 runtime、配置、脚本和共享目录; diff --git a/fluxon_py/tests/test_config.yaml b/fluxon_py/tests/test_config.yaml index a484320..d47332c 100644 --- a/fluxon_py/tests/test_config.yaml +++ b/fluxon_py/tests/test_config.yaml @@ -1,2 +1,5 @@ -deployconf_path: ../../deployment/deployconf.yaml kv_svc_type: fluxon +etcd_address: 127.0.0.1:2379 +cluster_name: fluxon-example-cluster +shared_memory_path: /tmp/fluxon-example-cluster/shm +shared_file_path: /tmp/fluxon-example-cluster/share diff --git a/fluxon_py/tests/test_lib.py b/fluxon_py/tests/test_lib.py index 7f1d511..a246e24 100644 --- a/fluxon_py/tests/test_lib.py +++ b/fluxon_py/tests/test_lib.py @@ -32,12 +32,11 @@ from setup_and_pack.utils.repo_config_utils import ( _verify_host_port, _verify_url, - load_deployconf_etcd_address, - load_deployconf_fluxon_cluster_name, - load_deployconf_fluxon_shared_file_path, - load_deployconf_fluxon_shared_memory_path, load_test_config_mapping, - load_test_deployconf_path, + load_test_etcd_address_from_test_config, + load_test_fluxon_cluster_name_from_test_config, + load_test_fluxon_shared_file_path_from_test_config, + load_test_fluxon_shared_memory_path_from_test_config, load_test_kv_svc_type_from_test_config, ) @@ -50,10 +49,9 @@ def load_test_kv_svc_type(*, config_path: Optional[Path] = None) -> str: def load_test_kv_svc_ip(*, config_path: Optional[Path] = None) -> str: - """Load test backend host from the shared deployconf.""" - deployconf_path = load_test_deployconf_path(config_path=config_path) - etcd_addr = load_deployconf_etcd_address(config_path=deployconf_path) - s, _port = _verify_host_port(etcd_addr, field="deployconf.global_envs.ETCD_FULL_ADDRESS") + """Load test backend host from test_config.yaml.""" + etcd_addr = load_test_etcd_address_from_test_config(config_path=config_path) + s, _port = _verify_host_port(etcd_addr, field="test_config.yaml.etcd_address") if "://" in s or not s: raise ValueError("test backend host should be a host or IP without scheme, e.g. 127.0.0.1") return s @@ -81,21 +79,18 @@ def load_test_mooncake_master_server_address(*, config_path: Optional[Path] = No def load_test_fluxon_cluster_name(*, config_path: Optional[Path] = None) -> str: - """Load required fluxon cluster name from the shared deployconf.""" - deployconf_path = load_test_deployconf_path(config_path=config_path) - return load_deployconf_fluxon_cluster_name(config_path=deployconf_path) + """Load required Fluxon cluster name from test_config.yaml.""" + return load_test_fluxon_cluster_name_from_test_config(config_path=config_path) def load_test_fluxon_share_mem_path(*, config_path: Optional[Path] = None) -> str: - """Load required fluxon shared memory path from the shared deployconf.""" - deployconf_path = load_test_deployconf_path(config_path=config_path) - return load_deployconf_fluxon_shared_memory_path(config_path=deployconf_path) + """Load required Fluxon shared-memory path from test_config.yaml.""" + return load_test_fluxon_shared_memory_path_from_test_config(config_path=config_path) def load_test_fluxon_share_file_path(*, config_path: Optional[Path] = None) -> str: - """Load required fluxon shared file path from the shared deployconf.""" - deployconf_path = load_test_deployconf_path(config_path=config_path) - return load_deployconf_fluxon_shared_file_path(config_path=deployconf_path) + """Load required Fluxon shared-file path from test_config.yaml.""" + return load_test_fluxon_shared_file_path_from_test_config(config_path=config_path) def load_test_chan_config(*, config_path: Optional[Path] = None) -> Dict[str, int]: @@ -105,10 +100,9 @@ def load_test_chan_config(*, config_path: Optional[Path] = None) -> Dict[str, in """ return {"capacity": 10, "ttl_seconds": 90, "weight": 1} -# Resolve ETCD host/port and test configuration via config utils (no direct field access) -_TEST_DEPLOYCONF_PATH = load_test_deployconf_path() -_ETCD_ADDRESS = load_deployconf_etcd_address(config_path=_TEST_DEPLOYCONF_PATH) -ETCD_HOST, _ETCD_PORT = _verify_host_port(_ETCD_ADDRESS, field="deployconf.global_envs.ETCD_FULL_ADDRESS") +# Resolve ETCD host/port and test configuration via test_config.yaml (single explicit authority) +_ETCD_ADDRESS = load_test_etcd_address_from_test_config() +ETCD_HOST, _ETCD_PORT = _verify_host_port(_ETCD_ADDRESS, field="test_config.yaml.etcd_address") ETCD_PORT = int(_ETCD_PORT) KV_SVC_TYPE = load_test_kv_svc_type() KV_SVC_IP = load_test_kv_svc_ip() diff --git a/fluxon_release/test_rsc/source/prepare.yaml b/fluxon_release/test_rsc/source/prepare.yaml index ae542c9..fb0c5ac 100644 --- a/fluxon_release/test_rsc/source/prepare.yaml +++ b/fluxon_release/test_rsc/source/prepare.yaml @@ -19,6 +19,8 @@ python_runtime: source: wheel - pinned: readerwriterlock==1.0.9 source: wheel + - pinned: pytest==8.3.5 + source: wheel zerorpc: requirements: - pinned: zerorpc==0.6.3 diff --git a/fluxon_test_stack/benchmark_full_matrix.yaml b/fluxon_test_stack/benchmark_full_matrix.yaml index ae59bd7..ffa1f2b 100644 --- a/fluxon_test_stack/benchmark_full_matrix.yaml +++ b/fluxon_test_stack/benchmark_full_matrix.yaml @@ -193,8 +193,7 @@ artifact_sets: region: us-east-1 key_prefix: profiles/fluxon_fastws release_artifacts: - python_wheel: fluxon-0.2.1-py3-none-any.whl - pyo3_wheel: fluxon_pyo3-0.2.1-cp38-abi3-manylinux_2_28_x86_64.whl + wheel: fluxon-0.2.1-py3-none-any.whl test_rsc_source: &test_rsc_source_fastws kind: FLUXON_OPS_FS_S3 bucket: fluxon-release @@ -215,8 +214,7 @@ artifact_sets: region: us-east-1 key_prefix: profiles/fluxon_tquic release_artifacts: - python_wheel: fluxon-0.2.1-py3-none-any.whl - pyo3_wheel: fluxon_pyo3-0.2.1-cp38-abi3-manylinux_2_28_x86_64.whl + wheel: fluxon-0.2.1-py3-none-any.whl test_rsc_source: &test_rsc_source_tquic kind: FLUXON_OPS_FS_S3 bucket: fluxon-release @@ -237,8 +235,7 @@ artifact_sets: region: us-east-1 key_prefix: profiles/fluxon_sockudo_ws release_artifacts: - python_wheel: fluxon-0.2.1-py3-none-any.whl - pyo3_wheel: fluxon_pyo3-0.2.1-cp38-abi3-manylinux_2_28_x86_64.whl + wheel: fluxon-0.2.1-py3-none-any.whl test_rsc_source: &test_rsc_source_sockudo_ws kind: FLUXON_OPS_FS_S3 bucket: fluxon-release @@ -259,8 +256,7 @@ artifact_sets: region: us-east-1 key_prefix: profiles/fluxon_tcp release_artifacts: - python_wheel: fluxon-0.2.1-py3-none-any.whl - pyo3_wheel: fluxon_pyo3-0.2.1-cp38-abi3-manylinux_2_28_x86_64.whl + wheel: fluxon-0.2.1-py3-none-any.whl test_rsc_source: &test_rsc_source_tcp kind: FLUXON_OPS_FS_S3 bucket: fluxon-release diff --git a/fluxon_test_stack/ci_test_list.yaml b/fluxon_test_stack/ci_test_list.yaml index 482cb79..6a3c56b 100644 --- a/fluxon_test_stack/ci_test_list.yaml +++ b/fluxon_test_stack/ci_test_list.yaml @@ -29,6 +29,14 @@ scenes: scales: [n1_kvowner_dram_20gib] profiles: [fluxon_tcp] + ci_top_attention_mq_core: + ci: + subject: mq + runtime_contract: cluster_kv_owner + select: + scales: [n1_kvowner_dram_20gib] + profiles: [fluxon_tcp] + kv_read_heavy_zipf: test_stack: mode: KVSTORE @@ -221,8 +229,7 @@ artifact_sets: region: us-east-1 key_prefix: profiles/fluxon_fastws release_artifacts: - python_wheel: fluxon-0.2.1-py3-none-any.whl - pyo3_wheel: fluxon_pyo3-0.2.1-cp38-abi3-manylinux_2_28_x86_64.whl + wheel: fluxon-0.2.1-py3-none-any.whl test_rsc_source: &test_rsc_source_fastws kind: FLUXON_OPS_FS_S3 bucket: fluxon-release @@ -242,8 +249,7 @@ artifact_sets: region: us-east-1 key_prefix: profiles/fluxon_tquic release_artifacts: - python_wheel: fluxon-0.2.1-py3-none-any.whl - pyo3_wheel: fluxon_pyo3-0.2.1-cp38-abi3-manylinux_2_28_x86_64.whl + wheel: fluxon-0.2.1-py3-none-any.whl test_rsc_source: &test_rsc_source_tquic kind: FLUXON_OPS_FS_S3 bucket: fluxon-release @@ -263,8 +269,7 @@ artifact_sets: region: us-east-1 key_prefix: profiles/fluxon_sockudo_ws release_artifacts: - python_wheel: fluxon-0.2.1-py3-none-any.whl - pyo3_wheel: fluxon_pyo3-0.2.1-cp38-abi3-manylinux_2_28_x86_64.whl + wheel: fluxon-0.2.1-py3-none-any.whl test_rsc_source: &test_rsc_source_sockudo_ws kind: FLUXON_OPS_FS_S3 bucket: fluxon-release @@ -284,8 +289,7 @@ artifact_sets: region: us-east-1 key_prefix: profiles/fluxon_tcp release_artifacts: - python_wheel: fluxon-0.2.1-py3-none-any.whl - pyo3_wheel: fluxon_pyo3-0.2.1-cp38-abi3-manylinux_2_28_x86_64.whl + wheel: fluxon-0.2.1-py3-none-any.whl test_rsc_source: &test_rsc_source_tcp kind: FLUXON_OPS_FS_S3 bucket: fluxon-release @@ -315,6 +319,7 @@ profiles: doc_site_base_url: example.com ci_top_attention_bin_kvtest: kv_test_rounds: all + ci_top_attention_mq_core: {} runtime_contracts: cluster_kv_owner: &cluster_kv_owner_runtime base_runtime: @@ -460,6 +465,7 @@ profiles: doc_site_base_url: example.com ci_top_attention_bin_kvtest: kv_test_rounds: all + ci_top_attention_mq_core: {} test_stack: <<: *common_test_stack_runtime fluxon_sockudo_ws: @@ -472,6 +478,7 @@ profiles: doc_site_base_url: example.com ci_top_attention_bin_kvtest: kv_test_rounds: all + ci_top_attention_mq_core: {} test_stack: <<: *common_test_stack_runtime fluxon_tcp: @@ -484,6 +491,7 @@ profiles: doc_site_base_url: example.com ci_top_attention_bin_kvtest: kv_test_rounds: all + ci_top_attention_mq_core: {} test_stack: <<: *common_test_stack_runtime redis_sharded: diff --git a/fluxon_test_stack/test_runner.py b/fluxon_test_stack/test_runner.py index f70618a..d8cd1c9 100644 --- a/fluxon_test_stack/test_runner.py +++ b/fluxon_test_stack/test_runner.py @@ -51,6 +51,52 @@ run_top_attention_entries, select_top_attention_entries, ) +from test_runner_ci_runtime import ( + _assert_ci_runtime_python_abi as _assert_ci_runtime_python_abi_impl, + _ci_runtime_python_abi as _ci_runtime_python_abi_impl, + _ci_runtime_python_executable as _ci_runtime_python_executable_impl, + _create_ci_runtime_venv as _create_ci_runtime_venv_impl, +) +from test_runner_models import ( + _CasePlan, + _CaseRuntimeTracking, + _ExecutedCase, + _ObservedFileState, + _PlannedCase, + _PreparedCase, + _RemoteRunDirStage, + _ResolvedCase, + _RetryableControllerStatusError, + _RunSelectors, + _RunSlot, + _RuntimePhase, + _Suite, +) +from test_runner_runtime_backend import ( + _execute_ci_case as _execute_ci_case_impl, + _execute_test_stack_case as _execute_test_stack_case_impl, + _finalize_case_runtime as _finalize_case_runtime_impl, + _finalize_ci_case_runtime as _finalize_ci_case_runtime_impl, + _finalize_test_stack_case_runtime as _finalize_test_stack_case_runtime_impl, + _prepare_ci_case as _prepare_ci_case_impl, + _prepare_test_stack_case as _prepare_test_stack_case_impl, + _require_ci_runner_exit_code_baseline as _require_ci_runner_exit_code_baseline_impl, + _require_test_stack_result_path as _require_test_stack_result_path_impl, + _require_test_stack_result_timeout as _require_test_stack_result_timeout_impl, + _test_stack_result_timeout_seconds as _test_stack_result_timeout_seconds_impl, + _wait_and_load_test_stack_benchmark_result_json as _wait_and_load_test_stack_benchmark_result_json_impl, +) +from test_runner_ui_runtime import ( + _ci_log_prefix_lines as _ci_log_prefix_lines_impl, + _ci_log_timestamp_prefix as _ci_log_timestamp_prefix_impl, + _load_gitops_ctx_for_ui as _load_gitops_ctx_for_ui_impl, + _redirect_process_stdio_to_log as _redirect_process_stdio_to_log_impl, + _resolve_history_roots_cli_paths as _resolve_history_roots_cli_paths_impl, + _resolve_repo_root_cli_path as _resolve_repo_root_cli_path_impl, + _runner_stdio_mirror_enabled as _runner_stdio_mirror_enabled_impl, + _start_runner_stdio_log_mirror as _start_runner_stdio_log_mirror_impl, + run_ui_service as run_ui_service_impl, +) # NOTE: This project uses multiple schemas: @@ -171,12 +217,11 @@ def _resolve_repo_root_cli_path(*, raw_path: Path, field_name: str) -> Path: - if raw_path.is_absolute(): - return raw_path.resolve() - resolved = (REPO_ROOT / raw_path).resolve() - if not resolved: - raise RuntimeError(f"failed to resolve {field_name} against repo root: raw={raw_path}") - return resolved + return _resolve_repo_root_cli_path_impl( + repo_root=REPO_ROOT, + raw_path=raw_path, + field_name=field_name, + ) def _json_canonicalize(value: Any) -> Any: @@ -356,6 +401,7 @@ def _runner_native_ci_scene_ids() -> Tuple[str, ...]: return ( "ci_top_attention_doc_page_build", "ci_top_attention_bin_kvtest", + "ci_top_attention_mq_core", ) @@ -422,58 +468,23 @@ def _scene_id_uses_runner_native_ci_commands(scene_id: str) -> bool: def _runner_stdio_mirror_enabled() -> bool: - return os.environ.get("GITHUB_ACTIONS", "").strip().lower() == "true" + return _runner_stdio_mirror_enabled_impl() def _ci_log_timestamp_prefix(now: Optional[float] = None) -> str: - ts = datetime.datetime.fromtimestamp( - time.time() if now is None else float(now), - tz=datetime.timezone.utc, - ) - return ts.strftime("[%Y-%m-%d %H:%M:%S UTC]") + return _ci_log_timestamp_prefix_impl(now) def _ci_log_prefix_lines(text: str, *, now: Optional[float] = None) -> str: - if not text: - return "" - prefix = _ci_log_timestamp_prefix(now) - lines = text.splitlines(keepends=True) - return "".join(f"{prefix} {line}" if line.strip() else line for line in lines) + return _ci_log_prefix_lines_impl(text, now=now) def _start_runner_stdio_log_mirror(*, log_path: Path, stdout_fd: int) -> None: - def _mirror_loop() -> None: - offset = 0 - while True: - try: - if log_path.exists(): - size = log_path.stat().st_size - if size < offset: - offset = 0 - if size > offset: - with log_path.open("r", encoding="utf-8", errors="replace") as fp: - fp.seek(offset) - chunk = fp.read() - offset = fp.tell() - if chunk: - data = _ci_log_prefix_lines(chunk).encode("utf-8", errors="replace") - if stdout_fd >= 0: - try: - os.write(stdout_fd, data) - except OSError: - pass - time.sleep(0.2) - except Exception: - time.sleep(0.5) - - mirror = threading.Thread( - target=_mirror_loop, - name="test-runner-stdio-log-mirror", - daemon=True, - ) - mirror.start() global _RUNNER_STDIO_MIRROR_THREAD - _RUNNER_STDIO_MIRROR_THREAD = mirror + _RUNNER_STDIO_MIRROR_THREAD = _start_runner_stdio_log_mirror_impl( + log_path=log_path, + stdout_fd=stdout_fd, + ) def _redirect_process_stdio_to_log(workdir_root: Path) -> None: @@ -488,62 +499,20 @@ def _redirect_process_stdio_to_log(workdir_root: Path) -> None: """ global _RUNNER_STDIO_LOG_FP global _RUNNER_STDIO_KEEPALIVE_FDS - if _RUNNER_STDIO_LOG_FP is not None: - return - - log_path = (workdir_root / RUNNER_STDIO_LOG_FILENAME).resolve() - log_fp = log_path.open("a", encoding="utf-8", buffering=1) - banner = ( - f"{_ci_log_timestamp_prefix()} [test_runner] redirecting process stdio to stable log: {log_path}\n" + _RUNNER_STDIO_LOG_FP, _RUNNER_STDIO_KEEPALIVE_FDS = _redirect_process_stdio_to_log_impl( + workdir_root=workdir_root, + runner_stdio_log_filename=RUNNER_STDIO_LOG_FILENAME, + stdio_log_fp=_RUNNER_STDIO_LOG_FP, + stdio_keepalive_fds=_RUNNER_STDIO_KEEPALIVE_FDS, + start_mirror=_start_runner_stdio_log_mirror, ) - try: - sys.stdout.write(banner) - sys.stdout.flush() - except OSError: - pass - - try: - sys.stdout.flush() - except OSError: - pass - try: - sys.stderr.flush() - except OSError: - pass - - # English note: - # - Some process supervisors (including CI harnesses) treat "stdout/stderr pipe closed" as a signal - # to reap the whole process tree, even if the process is still alive. - # - `dup2(log_fp, stdout_fd)` closes the original stdout pipe. Keep an extra dup() of the original - # pipe write end open so such harnesses do not mis-detect process exit. - if _RUNNER_STDIO_KEEPALIVE_FDS is None: - try: - out_fd = os.dup(sys.stdout.fileno()) - err_fd = os.dup(sys.stderr.fileno()) - os.set_inheritable(out_fd, False) - os.set_inheritable(err_fd, False) - _RUNNER_STDIO_KEEPALIVE_FDS = (out_fd, err_fd) - except OSError: - _RUNNER_STDIO_KEEPALIVE_FDS = (-1, -1) - - os.dup2(log_fp.fileno(), sys.stdout.fileno()) - os.dup2(log_fp.fileno(), sys.stderr.fileno()) - sys.stdout = os.fdopen(sys.stdout.fileno(), "w", encoding="utf-8", buffering=1, closefd=False) - sys.stderr = os.fdopen(sys.stderr.fileno(), "w", encoding="utf-8", buffering=1, closefd=False) - _RUNNER_STDIO_LOG_FP = log_fp - if _runner_stdio_mirror_enabled(): - keepalive = _RUNNER_STDIO_KEEPALIVE_FDS or (-1, -1) - _start_runner_stdio_log_mirror( - log_path=log_path, - stdout_fd=int(keepalive[0]), - ) def _resolve_history_roots_cli_paths(raw_paths: List[str]) -> List[Path]: - return [ - _resolve_repo_root_cli_path(raw_path=Path(path), field_name="history_root") - for path in raw_paths - ] + return _resolve_history_roots_cli_paths_impl( + repo_root=REPO_ROOT, + raw_paths=raw_paths, + ) def _load_gitops_ctx_for_ui( @@ -551,26 +520,10 @@ def _load_gitops_ctx_for_ui( workdir_root: Path, gitops_config_path: Optional[Path], ) -> Optional[gitops_lib.GitOpsContext]: - if gitops_config_path is None: - return None - gitops_workdir = gitops_lib.default_runtime_root(workdir_root) - gitops_ctx = gitops_lib.load_context( - config_path=gitops_config_path, - workdir=gitops_workdir, - ) - gitops_desc = gitops_lib.describe_context(gitops_ctx) - print( - "INFO: test_runner GitOps integrated: " - f"config={gitops_desc['config_path']} workdir={gitops_desc['workdir']} interval={gitops_desc['interval']}s repos={gitops_desc['repo_count']}", - flush=True, + return _load_gitops_ctx_for_ui_impl( + workdir_root=workdir_root, + gitops_config_path=gitops_config_path, ) - threading.Thread( - target=gitops_lib.poll_forever, - args=(gitops_ctx,), - kwargs={"stop_event": None}, - daemon=True, - ).start() - return gitops_ctx def run_ui_service( @@ -582,25 +535,15 @@ def run_ui_service( extra_history_roots: Optional[List[Path]], gitops_config_path: Optional[Path], ) -> None: - workdir_root = workdir_root.resolve() - if workdir_root.exists(): - if not workdir_root.is_dir(): - raise ValueError(f"ui workdir is not a directory: {workdir_root}") - else: - workdir_root.mkdir(parents=True, exist_ok=True) - ui_lock = _acquire_ui_service_lock(workdir_root=workdir_root) - _ = ui_lock - gitops_ctx = _load_gitops_ctx_for_ui( - workdir_root=workdir_root, - gitops_config_path=gitops_config_path, - ) - _serve_test_runner_ui( + run_ui_service_impl( workdir_root=workdir_root, - host=str(host), - port=int(port), - lookback_days=int(lookback_days), + host=host, + port=port, + lookback_days=lookback_days, extra_history_roots=extra_history_roots, - gitops_ctx=gitops_ctx, + gitops_config_path=gitops_config_path, + acquire_ui_service_lock=_acquire_ui_service_lock, + serve_test_runner_ui=_serve_test_runner_ui, ) @@ -1167,116 +1110,6 @@ def main() -> None: raise SystemExit(1) -@dataclass(frozen=True) -class _Suite: - run_mode: str - run_selectors: "_RunSelectors" - scenes: Dict[str, Dict[str, Any]] - scales: Dict[str, Dict[str, Any]] - artifact_sets: Dict[str, Dict[str, Any]] - profiles: Dict[str, Dict[str, Any]] - - -@dataclass(frozen=True) -class _RunSelectors: - case_ids: Optional[Tuple[str, ...]] - profile_ids: Tuple[str, ...] - command_ids: Optional[Tuple[str, ...]] - test_ids: Optional[Tuple[str, ...]] - - -@dataclass(frozen=True) -class _ResolvedCase: - scene_id: str - scale_id: str - profile_id: str - case_id: str - case_key: str - - -@dataclass -class _RunSlot: - case_key: str - case_id: str - run_index: int - rec: Dict[str, Any] - - -@dataclass(frozen=True) -class _PlannedCase: - case: _ResolvedCase - ci_commands: Optional[List[Dict[str, Any]]] - ci_prepare_steps: Optional[List[Dict[str, Any]]] - label: str - command_id: Optional[str] - test_id: Optional[str] - counted: bool - - -@dataclass(frozen=True) -class _ObservedFileState: - size: int - mtime_ns: int - - -@dataclass(frozen=True) -class _RemoteRunDirStage: - archive_prefix: str - stage_prefix: str - verify_relpaths: Tuple[str, ...] - ctx: str - sync_mode: str - include_relpaths: Optional[Tuple[str, ...]] = None - - -@dataclass(frozen=True) -class _RuntimePhase: - phase_id: str - layer: str - instance_ids: Tuple[str, ...] - write_ctx: str - stage_run_dir: Optional[_RemoteRunDirStage] = None - - -class _RetryableControllerStatusError(RuntimeError): - pass - - -@dataclass(frozen=True) -class _CasePlan: - case_family: str - prepare_phases: Tuple[_RuntimePhase, ...] - execute_phases: Tuple[_RuntimePhase, ...] - collect_phases: Tuple[_RuntimePhase, ...] - - -@dataclass(frozen=True) -class _PreparedCase: - plan: _CasePlan - ci_runner_exit_code_baseline: Optional[_ObservedFileState] = None - test_stack_result_path: Optional[Path] = None - test_stack_coordinator_addr: Optional[str] = None - test_stack_result_timeout_s: Optional[int] = None - - -@dataclass(frozen=True) -class _ExecutedCase: - outcome: str - summary: Dict[str, Any] - - -@dataclass -class _CaseRuntimeTracking: - ci_lock_fp: Optional[Any] = None - controller_lock_fp: Optional[Any] = None - ci_attempted_instance_ids: List[str] = field(default_factory=list) - ci_apply_ids: Dict[str, str] = field(default_factory=dict) - ts_coord_deploy_attempted: bool = False - ts_nodes_deploy_attempted: bool = False - ts_coord_apply_id: Optional[str] = None - ts_nodes_apply_id: Optional[str] = None - - def _load_yaml_file(path: Path) -> Any: with path.open("r", encoding="utf-8") as f: return yaml.safe_load(f) @@ -1503,6 +1336,38 @@ def _load_source_stack_contract() -> Dict[str, Any]: } +def _write_ci_runtime_test_config( + *, + src_root: Path, + etcd_address: str, + cluster_name: str, + shared_memory_path: str, + shared_file_path: str, +) -> Path: + """Materialize the single CI test authority consumed by fluxon_py integration tests. + + English note: + - CI cases under cluster_kv_owner start their own master/owner instances from test_runner. + - The test layer therefore must not read repo example deployconf or testbed deployconf as an + indirect authority for case-local runtime wiring. + - Keep one explicit contract only: write the case-scoped etcd/cluster/shared-bundle values that + the downstream tests actually need. + """ + test_cfg_path = (src_root / "fluxon_py" / "tests" / "test_config.yaml").resolve() + test_cfg_path.parent.mkdir(parents=True, exist_ok=True) + _write_yaml_file( + test_cfg_path, + { + "kv_svc_type": "fluxon", + "etcd_address": str(etcd_address), + "cluster_name": str(cluster_name), + "shared_memory_path": str(shared_memory_path), + "shared_file_path": str(shared_file_path), + }, + ) + return test_cfg_path + + def _discover_test_bed_bootstrap_config_override_opt(*, anchor_paths: Tuple[Path, ...]) -> Optional[Path]: seen_roots: set[Path] = set() for anchor_path in anchor_paths: @@ -3582,136 +3447,14 @@ def _prepare_ci_case( case_plan: _CasePlan, runtime_tracking: _CaseRuntimeTracking, ) -> _PreparedCase: - deploy = _require_dict(resolved_case.get("deploy"), "resolved_case.deploy") - ci_checkout_root = _runner_repo_root() - runtime_tracking.ci_lock_fp = _acquire_ci_lock() - _ensure_deployer_online(resolved_case) - out_cluster_name = _ci_cluster_name(resolved_case) - - scale = _require_dict(resolved_case.get("scale"), "resolved_case.scale") - owner_scale = _require_dict(scale.get("owner"), "resolved_case.scale.owner") - owner_count = _require_int(owner_scale.get("owner_count"), "scale.owner.owner_count", min_v=1) - if owner_count != 1: - raise ValueError("CI currently supports only owner_count=1") - owner_dram_bytes = _require_int( - owner_scale.get("owner_dram_bytes"), "scale.owner.owner_dram_bytes", min_v=16777216 - ) - if owner_dram_bytes % 16777216 != 0: - raise ValueError("scale.owner.owner_dram_bytes must be 16MiB aligned") - - release_root = _resolved_case_release_root(resolved_case) - if not release_root.exists(): - raise ValueError(f"materialized case release_root is missing: {release_root}") - - if _ci_has_instance(resolved_case, instance_id="owner_0"): - owner0 = _find_deploy_instance(resolved_case, instance_id="owner_0") - ci_runner = _find_deploy_instance(resolved_case, instance_id="ci_runner") - owner0_target = _require_str( - _require_dict(owner0.get("deployer"), "owner_0.deployer").get("target"), - "owner_0.target", - ) - ci_target = _require_str( - _require_dict(ci_runner.get("deployer"), "ci_runner.deployer").get("target"), - "ci_runner.target", - ) - if owner0_target != ci_target: - raise ValueError("ci_runner must run on the same target as owner_0") - - _ci_cleanup_runtime(resolved_case, timeout_s=120) - _cleanup_previous_failed_ci_runtime( - resolved_case, - run_dir=run_dir, - run_index=run_index, - ) - _ci_assert_ports_free(resolved_case) - _wait_ci_base_runtime_ready(resolved_case) - - services_root = (run_dir / "services").resolve() - services_root.mkdir(parents=True, exist_ok=True) - # Keep this directory as a stable staging anchor for CI runtime bundles. - # The actual shared_memory_path used by iceoryx2 must NOT embed run_dir (path length limit risk). - (services_root / "share_mem").mkdir(parents=True, exist_ok=True) - share_mem_path = _ci_shared_memory_path(resolved_case, run_dir=run_dir) - share_file_path = _ci_shared_file_path(resolved_case, run_dir=run_dir) - # Create the directory on the local host to make the config deterministic for same-host runs. - # Remote targets (via deployer) run on their own host filesystem and will create the directory - # as needed when initializing the KV client. - Path(share_mem_path).mkdir(parents=True, exist_ok=True) - Path(share_file_path).mkdir(parents=True, exist_ok=True) - - venv_dir = (run_dir / "venv").resolve() - if venv_dir.exists(): - raise ValueError(f"venv dir already exists (no overwrite): {venv_dir}") - _run_subprocess([sys.executable, "-m", "venv", str(venv_dir)], cwd=str(run_dir)) - venv_python = venv_dir / "bin" / "python3" - if not venv_python.exists(): - raise ValueError(f"venv python not found after creation: {venv_python}") - - src_root = (run_dir / "src").resolve() - _ci_prepare_run_inputs( + return _prepare_ci_case_impl( + ctx=sys.modules[__name__], + planned_case=planned_case, resolved_case=resolved_case, - source_root=ci_checkout_root, - release_root=release_root, - test_rsc_root=_resolved_case_test_rsc_root(resolved_case), - src_root=src_root, - venv_python=venv_python, - ci_commands=planned_case.ci_commands, - overlay_live_checkout=True, - ) - - prepare_env_exports = _run_ci_prepare_steps( - resolved_case=resolved_case, - run_dir=run_dir, - src_root=src_root, - ) - if prepare_env_exports: - _write_ci_prepare_env_script(run_dir=run_dir, exports=prepare_env_exports) - - profile = _require_dict(resolved_case.get("profile"), "resolved_case.profile") - profile_ci = _require_dict(profile.get("ci"), "resolved_case.profile.ci") - if profile_ci.get("scene_config") is not None: - _write_ci_scene_config_yaml( - resolved_case, - run_dir=run_dir, - ) - if _ci_cluster_runtime_instance_ids(resolved_case): - _write_ci_master_owner_configs( - resolved_case, - run_dir=run_dir, - cluster_name=out_cluster_name, - share_mem_path=share_mem_path, - share_file_path=share_file_path, - owner_dram_bytes=owner_dram_bytes, - ) - _ = _write_ci_runner_script( - resolved_case, run_dir=run_dir, - src_root=src_root, - share_mem_path=share_mem_path, - share_file_path=share_file_path, - ) - ci_runner_exit_code_path = (run_dir / "logs" / "ci_runner" / "exit_code.txt").resolve() - ci_runner_exit_code_baseline = _observe_file_state(ci_runner_exit_code_path) - - for cluster_runtime_phase in case_plan.prepare_phases: - cluster_runtime_deploy_result = _deploy_runtime_phase( - resolved_case, - run_dir=run_dir, - phase=cluster_runtime_phase, - ) - for instance_id in cluster_runtime_phase.instance_ids: - _record_ci_apply_id( - runtime_tracking.ci_attempted_instance_ids, - runtime_tracking.ci_apply_ids, - instance_id=instance_id, - deploy_result=cluster_runtime_deploy_result, - ctx=f"CI cluster_runtime deploy_result[{instance_id}]", - ) - for instance_id in cluster_runtime_phase.instance_ids: - _wait_ci_instance_ready(resolved_case, instance_id=instance_id) - return _PreparedCase( - plan=case_plan, - ci_runner_exit_code_baseline=ci_runner_exit_code_baseline, + run_index=run_index, + case_plan=case_plan, + runtime_tracking=runtime_tracking, ) @@ -3723,204 +3466,13 @@ def _prepare_test_stack_case( test_stack_meta: Dict[str, Any], runtime_tracking: _CaseRuntimeTracking, ) -> _PreparedCase: - _ensure_deployer_online(resolved_case) - deploy = _require_dict(resolved_case.get("deploy"), "resolved_case.deploy") - controller_url = _require_str(deploy.get("controller_url"), "resolved_case.deploy.controller_url").rstrip("/") - case_obj = _require_dict(resolved_case.get("case"), "resolved_case.case") - case_id = _require_str(case_obj.get("case_id"), "resolved_case.case.case_id") - _cleanup_skipped_case_desired_applies(controller_url=controller_url, case_id=case_id) - _write_deployer_manifests(resolved_case, run_dir, allow_overwrite=False) - - scale = _require_dict(resolved_case.get("scale"), "resolved_case.scale") - max_secs = _require_int(scale.get("duration_seconds"), "scale.duration_seconds", min_v=1) - benchmark_scale = _require_dict( - scale.get("benchmark"), - "resolved_case.scale.benchmark", - ) - metric_warmup_seconds = _require_number( - benchmark_scale.get("metric_warmup_seconds"), - "resolved_case.scale.benchmark.metric_warmup_seconds", - ) - profile = _require_dict(resolved_case.get("profile"), "resolved_case.profile") - profile_test_stack = _require_dict(profile.get("test_stack"), "resolved_case.profile.test_stack") - coordinator_ready_timeout_seconds = _require_int( - profile_test_stack.get("coordinator_ready_timeout_seconds"), - "resolved_case.profile.test_stack.coordinator_ready_timeout_seconds", - min_v=1, - ) - - coordinator_addr = _require_str(test_stack_meta.get("coordinator_addr"), "test_stack_meta.coordinator_addr") - if ":" not in coordinator_addr: - raise ValueError(f"invalid coordinator_addr: {coordinator_addr!r}") - coord_host, coord_port_s = coordinator_addr.rsplit(":", 1) - coord_port = int(coord_port_s) - coordinator_phase = _require_runtime_phase_by_id( - case_plan.prepare_phases, - phase_id="coordinator", - ctx="TEST_STACK prepare", - ) - # Fluxon external benchmark nodes bootstrap from a local KV owner shared bundle on each host. - # Clean stale owner-published files before deploying the coordinator phase so owners cannot - # race the delete verification by recreating shared.json/mmap.file during startup. - scene = _require_dict(resolved_case.get("scene"), "resolved_case.scene") - ts_scene = _require_dict(scene.get("test_stack"), "resolved_case.scene.test_stack") - mode = _require_str(ts_scene.get("mode"), "scene.test_stack.mode") - backend_kind = _require_test_stack_backend_kind( - profile_test_stack.get("kind"), - "resolved_case.profile.test_stack.kind", - ) - owner_instance_ids: List[str] = [] - shared_memory_path: Optional[str] = None - shared_file_path: Optional[str] = None - stack_cluster_name: Optional[str] = None - if _test_stack_backend_uses_dedicated_kv_owners(backend_kind=backend_kind, mode=mode): - runtime = _require_dict(resolved_case.get("runtime"), "resolved_case.runtime") - owner_instance_ids = [ - iid - for iid in coordinator_phase.instance_ids - if isinstance(iid, str) and iid.startswith(TEST_STACK_KV_OWNER_INSTANCE_ID_PREFIX) - ] - if not owner_instance_ids: - raise ValueError( - f"{mode} requires dedicated KV owner instances (missing kv_owner_* in prepare phase)" - ) - if _test_stack_backend_uses_external_fluxon_kv(backend_kind=backend_kind, mode=mode): - stack_identity = _require_dict(runtime.get("stack_identity"), "resolved_case.runtime.stack_identity") - stack_cluster_name = _require_str( - stack_identity.get("cluster_name"), - "runtime.stack_identity.cluster_name", - ) - shared_memory_path = _require_str( - stack_identity.get("shared_memory_path"), - "runtime.stack_identity.shared_memory_path", - ) - shared_file_path = _require_str( - stack_identity.get("shared_file_path"), - "runtime.stack_identity.shared_file_path", - ) - _converge_test_stack_external_owner_shared_bundle_cleanup( - resolved_case, - controller_url=controller_url, - owner_instance_ids=owner_instance_ids, - ) - _stage_runtime_phase_run_dir(resolved_case, run_dir=run_dir, phase=coordinator_phase) - _ensure_test_stack_runtime_env_ready_for_instance_ids( - resolved_case, - run_dir=run_dir, - instance_ids=coordinator_phase.instance_ids, - ) - runtime_tracking.ts_coord_deploy_attempted = True - coord_deploy_result = _deploy_runtime_phase_after_stage( - resolved_case, - run_dir=run_dir, - phase=coordinator_phase, - ) - runtime_tracking.ts_coord_apply_id = _deploy_result_history_id( - coord_deploy_result, - ctx="TEST_STACK coordinator deploy_result", - ) - _wait_instance_running(resolved_case, instance_id="coordinator", timeout_s=30) - _wait_instance_tcp_ready( - resolved_case, - instance_id="coordinator", - host=coord_host, - port=coord_port, - timeout_s=coordinator_ready_timeout_seconds, - ) - if backend_kind == TEST_STACK_BACKEND_MOONCAKE: - bench_cfg = _load_test_stack_benchmark_config(run_dir) - run_kv_base = _require_dict(bench_cfg.get("kv_base"), "benchmark_config.CONFIG.kv_base") - run_mooncake_spec = _require_dict( - run_kv_base.get("mooncake_spec"), - "benchmark_config.CONFIG.kv_base.mooncake_spec", - ) - metadata_server = _require_str( - run_mooncake_spec.get("metadata_server"), - "benchmark_config.CONFIG.kv_base.mooncake_spec.metadata_server", - ) - master_server_address = _require_str( - run_mooncake_spec.get("master_server_address"), - "benchmark_config.CONFIG.kv_base.mooncake_spec.master_server_address", - ) - metadata_host_port = urllib.parse.urlparse(metadata_server) - metadata_port = int(metadata_host_port.port or 0) - if metadata_port <= 0: - raise ValueError(f"invalid TEST_STACK Mooncake metadata_server port: {metadata_server!r}") - if ":" not in master_server_address: - raise ValueError(f"invalid TEST_STACK Mooncake master_server_address: {master_server_address!r}") - _, rpc_port_s = master_server_address.rsplit(":", 1) - rpc_port = int(rpc_port_s) - _wait_instance_running( - resolved_case, - instance_id=TEST_STACK_MOONCAKE_MASTER_INSTANCE_ID, - timeout_s=60, - ) - _wait_instance_tcp_ready( - resolved_case, - instance_id=TEST_STACK_MOONCAKE_MASTER_INSTANCE_ID, - host=coord_host, - port=rpc_port, - timeout_s=coordinator_ready_timeout_seconds, - ) - _wait_instance_tcp_ready( - resolved_case, - instance_id=TEST_STACK_MOONCAKE_MASTER_INSTANCE_ID, - host=coord_host, - port=metadata_port, - timeout_s=coordinator_ready_timeout_seconds, - ) - - node_runtime_phase = _require_runtime_phase_by_id( - case_plan.prepare_phases, - phase_id="node_runtime", - ctx="TEST_STACK prepare", - ) - if _test_stack_backend_uses_external_fluxon_kv(backend_kind=backend_kind, mode=mode): - if shared_memory_path is None or shared_file_path is None or stack_cluster_name is None: - raise ValueError( - "internal error: TEST_STACK shared bundle identity is missing after pre-deploy cleanup" - ) - if "master" in set(coordinator_phase.instance_ids): - _wait_instance_running(resolved_case, instance_id="master", timeout_s=60) - bench = _require_dict(_require_dict(resolved_case.get("scale"), "resolved_case.scale").get("benchmark"), "scale.benchmark") - cluster_ready_timeout_seconds = _require_int( - bench.get("cluster_ready_timeout_seconds"), - "scale.benchmark.cluster_ready_timeout_seconds", - min_v=1, - ) - for owner_id in owner_instance_ids: - owner_target = _instance_target_name(resolved_case, instance_id=owner_id) - shared_bundle_paths = _test_stack_external_owner_shared_bundle_paths( - resolved_case, - owner_target=owner_target, - ) - _wait_instance_running(resolved_case, instance_id=owner_id, timeout_s=60) - _wait_instance_files_present( - resolved_case, - instance_id=owner_id, - paths=shared_bundle_paths, - timeout_s=int(cluster_ready_timeout_seconds), - ctx="TEST_STACK owner shared bundle", - ) - elif _test_stack_backend_uses_dedicated_kv_owners(backend_kind=backend_kind, mode=mode): - for owner_id in owner_instance_ids: - _wait_instance_running(resolved_case, instance_id=owner_id, timeout_s=60) - _stage_runtime_phase_run_dir(resolved_case, run_dir=run_dir, phase=node_runtime_phase) - _ensure_test_stack_runtime_env_ready_for_instance_ids( - resolved_case, + return _prepare_test_stack_case_impl( + ctx=sys.modules[__name__], + resolved_case=resolved_case, run_dir=run_dir, - instance_ids=node_runtime_phase.instance_ids, - ) - return _PreparedCase( - plan=case_plan, - test_stack_result_path=Path( - _require_str(test_stack_meta.get("result_path"), "test_stack_meta.result_path") - ), - test_stack_coordinator_addr=coordinator_addr, - test_stack_result_timeout_s=_test_stack_result_timeout_seconds( - max_benchmark_seconds=int(max_secs), - metric_warmup_seconds=float(metric_warmup_seconds), - ), + case_plan=case_plan, + test_stack_meta=test_stack_meta, + runtime_tracking=runtime_tracking, ) @@ -3934,49 +3486,16 @@ def _execute_ci_case( prepared_case: _PreparedCase, runtime_tracking: _CaseRuntimeTracking, ) -> _ExecutedCase: - ci_runner_exit_timeout_s = _ci_runner_exit_code_timeout_seconds(resolved_case) - ci_runner_phase = prepared_case.plan.execute_phases[0] - ci_runner_deploy_result = _deploy_runtime_phase( - resolved_case, - run_dir=run_dir, - phase=ci_runner_phase, - ) - _record_ci_apply_id( - runtime_tracking.ci_attempted_instance_ids, - runtime_tracking.ci_apply_ids, - instance_id="ci_runner", - deploy_result=ci_runner_deploy_result, - ctx="CI ci_runner deploy_result", - ) - _wait_ci_instance_ready(resolved_case, instance_id="ci_runner") - rc = _wait_ci_runner_exit_code( + return _execute_ci_case_impl( + ctx=sys.modules[__name__], + planned_case=planned_case, resolved_case=resolved_case, run_dir=run_dir, - timeout_s=ci_runner_exit_timeout_s, - baseline_state=_require_ci_runner_exit_code_baseline( - prepared_case.ci_runner_exit_code_baseline, - ), - ) - outcome = RUN_OUTCOME_SUCCESS if rc == 0 else RUN_OUTCOME_FAILED - if outcome == RUN_OUTCOME_SUCCESS and runtime_tracking.ci_apply_ids.get("ci_runner") is not None: - _delete_apply_id( - resolved_case, - apply_id=_require_str(runtime_tracking.ci_apply_ids.get("ci_runner"), "CI ci_runner apply_id"), - ctx="CI ci_runner apply", - ) - del runtime_tracking.ci_apply_ids["ci_runner"] - summary = _build_ci_summary_yaml( - resolved_case, run_index=run_index, - started_at_unix_s=started_at, - finished_at_unix_s=int(time.time()), - outcome=outcome, - counted=False, - ci_out={"rc": rc}, + started_at=started_at, + prepared_case=prepared_case, + runtime_tracking=runtime_tracking, ) - for phase in prepared_case.plan.collect_phases: - _collect_runtime_phase(resolved_case, run_dir=run_dir, phase=phase) - return _ExecutedCase(outcome=outcome, summary=summary) def _execute_test_stack_case( @@ -3988,81 +3507,15 @@ def _execute_test_stack_case( prepared_case: _PreparedCase, runtime_tracking: _CaseRuntimeTracking, ) -> _ExecutedCase: - case_obj = _require_dict(resolved_case.get("case"), "resolved_case.case") - case_id = _require_str(case_obj.get("case_id"), "case.case_id") - - # Always return a summary (including FAIL), so every run_dir has a terminal artifact. - # This is required for FULL_ONCE resume and for postmortem without log spelunking. - outcome = RUN_OUTCOME_FAILED - error_detail: Optional[str] = None - collect_error_detail: Optional[str] = None - result_obj: Optional[Dict[str, Any]] = None - - try: - node_phase = prepared_case.plan.execute_phases[0] - runtime_tracking.ts_nodes_deploy_attempted = True - node_deploy_result = _deploy_runtime_phase( - resolved_case, - run_dir=run_dir, - phase=node_phase, - ) - runtime_tracking.ts_nodes_apply_id = _deploy_result_history_id( - node_deploy_result, - ctx="TEST_STACK node deploy_result", - ) - - result_path = _require_test_stack_result_path(prepared_case.test_stack_result_path) - timeout_s = _require_test_stack_result_timeout(prepared_case.test_stack_result_timeout_s) - result_obj = _wait_and_load_test_stack_benchmark_result_json( - resolved_case, - result_path, - timeout_s=timeout_s, - case_id=case_id, - writer_instance_id="coordinator", - ) - - # The result file is a terminal artifact even when the benchmark failed. - # Therefore: - # - wait for it to become readable/structurally complete (shared-fs races) - # - validate once to decide SUCCESS/FAILED deterministically (do not "wait for success") - _validate_test_stack_benchmark_result(result_obj, case_id=case_id) - outcome = RUN_OUTCOME_SUCCESS - except Exception as exc: # noqa: BLE001 - error_detail = f"{type(exc).__name__}: {exc}" - finally: - # Collect is best-effort: benchmark_result.json is the terminal artifact for pass/fail. - # If /api/status is flaky (e.g. 502), we still want deterministic case outcome and - # keep the collect error visible in summary.yaml. - try: - for phase in prepared_case.plan.collect_phases: - _collect_runtime_phase(resolved_case, run_dir=run_dir, phase=phase) - except Exception as exc: # noqa: BLE001 - collect_error_detail = f"{type(exc).__name__}: {exc}" - - summary = { - "schema_version": SCHEMA_VERSION, - "case_id": case_id, - "case_key": _require_str(case_obj.get("case_key"), "case.case_key"), - "run_index": int(run_index), - "outcome": outcome, - "counted": False, - "timing": { - "started_at_unix_s": int(started_at), - "finished_at_unix_s": int(time.time()), - }, - "test_stack": { - "coordinator_addr": _require_str( - prepared_case.test_stack_coordinator_addr, - "prepared_case.test_stack_coordinator_addr", - ), - "completion_signal": "benchmark_result_json", - "result_path": str(_require_test_stack_result_path(prepared_case.test_stack_result_path)), - "result": result_obj, - "error": error_detail, - "collect_error": collect_error_detail, - }, - } - return _ExecutedCase(outcome=outcome, summary=summary) + return _execute_test_stack_case_impl( + ctx=sys.modules[__name__], + resolved_case=resolved_case, + run_dir=run_dir, + run_index=run_index, + started_at=started_at, + prepared_case=prepared_case, + runtime_tracking=runtime_tracking, + ) def _wait_and_load_test_stack_benchmark_result_json( @@ -4073,52 +3526,14 @@ def _wait_and_load_test_stack_benchmark_result_json( case_id: str, writer_instance_id: str, ) -> Dict[str, Any]: - """Wait until benchmark_result.json becomes readable and structurally complete. - - English note: - - In the self-host test stack, the benchmark coordinator may run on a remote node and write - `benchmark_result.json` only to that host-local staged run_dir. - - Therefore the runner must observe the result through the coordinator instance, not only via - its own local filesystem view. - - Previous runs also observed transient partial reads right after the file appeared. - - Keep convergence deterministic: - - retry until the JSON parses AND the expected top-level structure exists - - do NOT retry waiting for a failed benchmark to "turn into" SUCCESS; validation happens once - in the caller and the run is finalized immediately. - """ - deadline = time.time() + float(timeout_s) - last_err: Optional[str] = None - while True: - try: - raw = _instance_read_text_if_present( - resolved_case, - instance_id=writer_instance_id, - path=result_path, - ) - if raw is None: - raise FileNotFoundError( - f"result file is not present yet: instance_id={writer_instance_id} path={result_path}" - ) - parsed = json.loads(raw) - result_obj = _require_dict(parsed, "test_stack.benchmark_result") - # Structural checks only (avoid waiting on stable FAIL status). - runs = _require_list(result_obj.get("runs"), "benchmark_result.runs") - if not runs: - raise ValueError("benchmark_result.runs is empty") - run0 = _require_dict(runs[0], "benchmark_result.runs[0]") - completion = _require_dict(run0.get("completion"), "benchmark_result.runs[0].completion") - _ = _require_str(completion.get("status"), "benchmark_result.runs[0].completion.status") - if not result_path.exists() or result_path.read_text(encoding="utf-8") != raw: - result_path.write_text(raw, encoding="utf-8") - return result_obj - except Exception as exc: # noqa: BLE001 - last_err = f"{type(exc).__name__}: {exc}" - if time.time() >= deadline: - raise ValueError( - f"benchmark result json did not become readable/valid within timeout: " - f"case_id={case_id} path={result_path} timeout_s={timeout_s} last_err={last_err}" - ) from exc - time.sleep(0.5) + return _wait_and_load_test_stack_benchmark_result_json_impl( + ctx=sys.modules[__name__], + resolved_case=resolved_case, + result_path=result_path, + timeout_s=timeout_s, + case_id=case_id, + writer_instance_id=writer_instance_id, + ) def _finalize_case_runtime( @@ -4129,22 +3544,14 @@ def _finalize_case_runtime( runtime_tracking: _CaseRuntimeTracking, outcome: str, ) -> None: - if case_plan.case_family == CASE_FAMILY_CI: - _finalize_ci_case_runtime( - resolved_case, - run_dir=run_dir, - runtime_tracking=runtime_tracking, - outcome=outcome, - ) - return - if case_plan.case_family == CASE_FAMILY_BENCH: - _finalize_test_stack_case_runtime( - resolved_case, - runtime_tracking=runtime_tracking, - outcome=outcome, - ) - return - raise ValueError(f"unsupported case family for finalize_case_runtime: {case_plan.case_family}") + _finalize_case_runtime_impl( + ctx=sys.modules[__name__], + resolved_case=resolved_case, + run_dir=run_dir, + case_plan=case_plan, + runtime_tracking=runtime_tracking, + outcome=outcome, + ) def _finalize_ci_case_runtime( @@ -4154,49 +3561,12 @@ def _finalize_ci_case_runtime( runtime_tracking: _CaseRuntimeTracking, outcome: str, ) -> None: - case = _require_dict(resolved_case.get("case"), "resolved_case.case") - run_mode = _require_str(case.get("run_mode"), "resolved_case.case.run_mode") - ci_preserved_apply_ids: list[dict[str, str]] = [] - for instance_id in runtime_tracking.ci_attempted_instance_ids: - apply_id = runtime_tracking.ci_apply_ids.get(instance_id) - if apply_id is None: - continue - ci_preserved_apply_ids.append( - { - "instance_id": instance_id, - "apply_id": _require_str(apply_id, f"CI {instance_id} apply_id"), - } - ) - # In FULL_ONCE runs, always teardown to keep the shared test bed clean for the next case. - # Debug runs preserve failed runtime for interactive inspection. - should_teardown = outcome == RUN_OUTCOME_SUCCESS or run_mode == RUN_MODE_FULL_ONCE - if should_teardown: - (run_dir / CI_PRESERVED_APPLY_IDS_FILENAME).unlink(missing_ok=True) - for instance_id in reversed(runtime_tracking.ci_attempted_instance_ids): - apply_id = runtime_tracking.ci_apply_ids.get(instance_id) - if apply_id is None: - continue - _delete_apply_id( - resolved_case, - apply_id=_require_str(apply_id, f"CI {instance_id} apply_id"), - ctx=f"CI {instance_id} apply", - ) - _ci_cleanup_runtime(resolved_case, timeout_s=120) - return - if not ci_preserved_apply_ids: - return - _write_yaml_file( - run_dir / CI_PRESERVED_APPLY_IDS_FILENAME, - { - "schema_version": CI_PRESERVED_APPLY_IDS_SCHEMA_VERSION, - "apply_ids": ci_preserved_apply_ids, - }, - ) - print( - "[CI preserve_runtime] " - "case_id=" - f"{_resolved_case_case_id(resolved_case)} outcome={outcome} apply_ids=" - + ", ".join(f"{entry['instance_id']}={entry['apply_id']}" for entry in ci_preserved_apply_ids) + _finalize_ci_case_runtime_impl( + ctx=sys.modules[__name__], + resolved_case=resolved_case, + run_dir=run_dir, + runtime_tracking=runtime_tracking, + outcome=outcome, ) @@ -4206,58 +3576,26 @@ def _finalize_test_stack_case_runtime( runtime_tracking: _CaseRuntimeTracking, outcome: str, ) -> None: - case = _require_dict(resolved_case.get("case"), "resolved_case.case") - run_mode = _require_str(case.get("run_mode"), "resolved_case.case.run_mode") - ts_preserved_apply_ids: list[str] = [] - if runtime_tracking.ts_nodes_deploy_attempted and runtime_tracking.ts_nodes_apply_id is not None: - ts_preserved_apply_ids.append( - f"nodes={_require_str(runtime_tracking.ts_nodes_apply_id, 'TEST_STACK node apply_id')}" - ) - if runtime_tracking.ts_coord_deploy_attempted and runtime_tracking.ts_coord_apply_id is not None: - ts_preserved_apply_ids.append( - f"coordinator={_require_str(runtime_tracking.ts_coord_apply_id, 'TEST_STACK coordinator apply_id')}" - ) - # In FULL_ONCE runs, always teardown to keep the shared test bed clean for the next case. - # Debug runs preserve failed runtime for interactive inspection. - should_teardown = outcome == RUN_OUTCOME_SUCCESS or run_mode == RUN_MODE_FULL_ONCE - if should_teardown: - if runtime_tracking.ts_nodes_deploy_attempted and runtime_tracking.ts_nodes_apply_id is not None: - _delete_apply_id( - resolved_case, - apply_id=_require_str(runtime_tracking.ts_nodes_apply_id, "TEST_STACK node apply_id"), - ctx="TEST_STACK node apply", - ) - if runtime_tracking.ts_coord_deploy_attempted and runtime_tracking.ts_coord_apply_id is not None: - _delete_apply_id( - resolved_case, - apply_id=_require_str(runtime_tracking.ts_coord_apply_id, "TEST_STACK coordinator apply_id"), - ctx="TEST_STACK coordinator apply", - ) - return - if not ts_preserved_apply_ids: - return - print( - "[TEST_STACK preserve_runtime] " - f"case_id={_resolved_case_case_id(resolved_case)} outcome={outcome} apply_ids={', '.join(ts_preserved_apply_ids)}" + _finalize_test_stack_case_runtime_impl( + ctx=sys.modules[__name__], + resolved_case=resolved_case, + runtime_tracking=runtime_tracking, + outcome=outcome, ) def _require_ci_runner_exit_code_baseline( baseline_state: Optional[_ObservedFileState], ) -> Optional[_ObservedFileState]: - return baseline_state + return _require_ci_runner_exit_code_baseline_impl(baseline_state) def _require_test_stack_result_path(result_path: Optional[Path]) -> Path: - if result_path is None: - raise ValueError("prepared_case.test_stack_result_path is missing") - return result_path + return _require_test_stack_result_path_impl(result_path) def _require_test_stack_result_timeout(timeout_s: Optional[int]) -> int: - if timeout_s is None or timeout_s < 1: - raise ValueError("prepared_case.test_stack_result_timeout_s must be positive") - return int(timeout_s) + return _require_test_stack_result_timeout_impl(timeout_s) def _test_stack_result_timeout_seconds( @@ -4265,19 +3603,9 @@ def _test_stack_result_timeout_seconds( max_benchmark_seconds: int, metric_warmup_seconds: float, ) -> int: - """Keep runner-side result waiting aligned with coordinator semantics.""" - if max_benchmark_seconds <= 0: - raise ValueError( - f"max_benchmark_seconds must be > 0, got: {max_benchmark_seconds}" - ) - if metric_warmup_seconds < 0.0: - raise ValueError( - f"metric_warmup_seconds must be >= 0, got: {metric_warmup_seconds}" - ) - return int( - float(max_benchmark_seconds) - + float(metric_warmup_seconds) - + 600.0 + return _test_stack_result_timeout_seconds_impl( + max_benchmark_seconds=max_benchmark_seconds, + metric_warmup_seconds=metric_warmup_seconds, ) @@ -7699,6 +7027,18 @@ def _runner_native_ci_commands_for_case(case: _ResolvedCase, *, ctx: str) -> Lis "timeout_seconds": 21600, } ] + if scene_id == "ci_top_attention_mq_core": + return [ + { + "id": "top_attention_mq_core", + "command": ( + "__RUN_DIR__/venv/bin/python3 -u " + "__RUN_DIR__/src/fluxon_test_stack/top_attention_test_index/_mq_core.py " + "--case-config __RUN_DIR__/configs/ci_scene_config.yaml" + ), + "timeout_seconds": 21600, + } + ] raise ValueError(f"{ctx} unsupported runner-native CI scene: {scene_id!r}") @@ -11459,6 +10799,24 @@ def _record_ci_apply_id( ci_apply_ids[instance_id] = _deploy_result_history_id(deploy_result, ctx=ctx) +def _ci_runtime_tracked_apply_entries(runtime_tracking: _CaseRuntimeTracking) -> List[Dict[str, Any]]: + entries: List[Dict[str, Any]] = [] + by_apply_id: Dict[str, Dict[str, Any]] = {} + for instance_id in runtime_tracking.ci_attempted_instance_ids: + apply_id = runtime_tracking.ci_apply_ids.get(instance_id) + if apply_id is None: + continue + entry = by_apply_id.get(apply_id) + if entry is None: + entry = {"apply_id": apply_id, "instance_ids": []} + by_apply_id[apply_id] = entry + entries.append(entry) + instance_ids = _require_list(entry.get("instance_ids"), "ci tracked apply entry.instance_ids") + if instance_id not in instance_ids: + instance_ids.append(instance_id) + return entries + + def _delete_apply_id(resolved_case: Dict[str, Any], *, apply_id: str, ctx: str) -> None: deploy = _require_dict(resolved_case.get("deploy"), "resolved_case.deploy") controller_url = _require_str(deploy.get("controller_url"), "deploy.controller_url").rstrip("/") @@ -11701,18 +11059,22 @@ def _cleanup_previous_failed_ci_runtime( raw_apply_ids = _require_list(payload.get("apply_ids"), f"{preserved_path}.apply_ids") for index, raw in enumerate(raw_apply_ids): entry = _require_dict(raw, f"{preserved_path}.apply_ids[{index}]") - instance_id = _require_str(entry.get("instance_id"), f"{preserved_path}.apply_ids[{index}].instance_id") + instance_ids = _require_list(entry.get("instance_ids"), f"{preserved_path}.apply_ids[{index}].instance_ids") + instance_id_text = ",".join( + _require_str(raw_instance_id, f"{preserved_path}.apply_ids[{index}].instance_ids[]") + for raw_instance_id in instance_ids + ) apply_id = _require_str(entry.get("apply_id"), f"{preserved_path}.apply_ids[{index}].apply_id") if apply_id in deleted_apply_ids: continue print( - f"[CI cleanup_previous_failed_runtime] run_dir={previous_run_dir} instance_id={instance_id} apply_id={apply_id}", + f"[CI cleanup_previous_failed_runtime] run_dir={previous_run_dir} instance_ids={instance_id_text} apply_id={apply_id}", flush=True, ) _delete_apply_id( previous_cleanup_case, apply_id=apply_id, - ctx=f"CI cleanup previous failed runtime {previous_run_dir.name} {instance_id}", + ctx=f"CI cleanup previous failed runtime {previous_run_dir.name} {instance_id_text}", ) preserved_path.unlink(missing_ok=True) @@ -12661,58 +12023,53 @@ def _manifest_integrity_stat_fingerprint( return tuple(fingerprint) -def _test_stack_runtime_offline_dependency_requirements_for_resolved_case( - resolved_case: Dict[str, Any], +def _offline_dependency_requirements_from_test_rsc_root( + *, + test_rsc_root: Path, + dependency_set_ids: Tuple[str, ...], + ctx: str, ) -> Tuple[str, ...]: - dependency_set_ids = ["base"] - scene = _require_dict(resolved_case.get("scene"), "resolved_case.scene") - scene_ts_raw = scene.get("test_stack") - if isinstance(scene_ts_raw, dict): - rpc_backend_kind = str(scene_ts_raw.get("rpc_backend_kind", "")).strip().upper() - if rpc_backend_kind == TEST_STACK_RPC_BACKEND_ZERORPC: - dependency_set_ids.append("zerorpc") - test_rsc_root = _resolved_case_test_rsc_root(resolved_case) prepare_cfg_path = (test_rsc_root / _TEST_STACK_RUNTIME_PREPARE_CONFIG_NAME).resolve() - prepare_cfg = _require_dict(_load_yaml_file(prepare_cfg_path), f"TEST_STACK runtime prepare config {prepare_cfg_path}") + prepare_cfg = _require_dict(_load_yaml_file(prepare_cfg_path), f"{ctx} prepare config {prepare_cfg_path}") python_runtime_cfg = _require_dict( prepare_cfg.get("python_runtime"), - f"TEST_STACK runtime prepare config python_runtime {prepare_cfg_path}", + f"{ctx} prepare config python_runtime {prepare_cfg_path}", ) dependency_sets_cfg = _require_dict( python_runtime_cfg.get("dependency_sets"), - f"TEST_STACK runtime prepare config python_runtime.dependency_sets {prepare_cfg_path}", + f"{ctx} prepare config python_runtime.dependency_sets {prepare_cfg_path}", ) out: List[str] = [] seen: set[str] = set() for set_id in dependency_set_ids: set_cfg = _require_dict( dependency_sets_cfg.get(set_id), - f"TEST_STACK runtime prepare config python_runtime.dependency_sets.{set_id} {prepare_cfg_path}", + f"{ctx} prepare config python_runtime.dependency_sets.{set_id} {prepare_cfg_path}", ) requirements = set_cfg.get("requirements") if not isinstance(requirements, list): raise ValueError( - "TEST_STACK runtime prepare config dependency set requirements must be a list: " + f"{ctx} prepare config dependency set requirements must be a list: " f"path={prepare_cfg_path} set_id={set_id}" ) for index, raw_item in enumerate(requirements): requirement_cfg = _require_dict( raw_item, ( - "TEST_STACK runtime prepare config " + f"{ctx} prepare config " f"python_runtime.dependency_sets.{set_id}.requirements[{index}]" ), ) requirement = _require_str( requirement_cfg.get("pinned"), ( - "TEST_STACK runtime prepare config " + f"{ctx} prepare config " f"python_runtime.dependency_sets.{set_id}.requirements[{index}].pinned" ), ).strip() if not requirement: raise ValueError( - "TEST_STACK runtime prepare config dependency requirement must be non-empty: " + f"{ctx} prepare config dependency requirement must be non-empty: " f"path={prepare_cfg_path} set_id={set_id} index={index}" ) if requirement in seen: @@ -12722,6 +12079,32 @@ def _test_stack_runtime_offline_dependency_requirements_for_resolved_case( return tuple(out) +def _test_stack_runtime_offline_dependency_requirements_for_resolved_case( + resolved_case: Dict[str, Any], +) -> Tuple[str, ...]: + dependency_set_ids: List[str] = ["base"] + scene = _require_dict(resolved_case.get("scene"), "resolved_case.scene") + scene_ts_raw = scene.get("test_stack") + if isinstance(scene_ts_raw, dict): + rpc_backend_kind = str(scene_ts_raw.get("rpc_backend_kind", "")).strip().upper() + if rpc_backend_kind == TEST_STACK_RPC_BACKEND_ZERORPC: + dependency_set_ids.append("zerorpc") + test_rsc_root = _resolved_case_test_rsc_root(resolved_case) + return _offline_dependency_requirements_from_test_rsc_root( + test_rsc_root=test_rsc_root, + dependency_set_ids=tuple(dependency_set_ids), + ctx="TEST_STACK runtime", + ) + + +def _ci_runtime_offline_dependency_requirements(*, test_rsc_root: Path) -> Tuple[str, ...]: + return _offline_dependency_requirements_from_test_rsc_root( + test_rsc_root=test_rsc_root, + dependency_set_ids=("base",), + ctx="CI runtime", + ) + + def _test_stack_runtime_wheelhouse_root( *, test_rsc_root: Path, @@ -12733,6 +12116,72 @@ def _test_stack_runtime_wheelhouse_root( / python_abi / _TEST_STACK_RUNTIME_PYTHON_RUNTIME_WHEELHOUSE_DIRNAME ).resolve() + + +def _ci_runtime_wheelhouse_root(*, test_rsc_root: Path) -> Path: + return _test_stack_runtime_wheelhouse_root( + test_rsc_root=test_rsc_root, + python_abi=_TEST_STACK_DEFAULT_PYTHON_ABI, + ) + + +def _ci_runtime_python_executable() -> str: + return _ci_runtime_python_executable_impl() + + +def _ci_runtime_python_abi(*, venv_python: Path) -> str: + return _ci_runtime_python_abi_impl( + venv_python=venv_python, + normalize_python_abi=_test_stack_normalize_python_abi, + ) + + +def _assert_ci_runtime_python_abi(*, venv_python: Path) -> None: + _assert_ci_runtime_python_abi_impl( + venv_python=venv_python, + normalize_python_abi=_test_stack_normalize_python_abi, + ) + + +def _create_ci_runtime_venv(*, run_dir: Path) -> Path: + return _create_ci_runtime_venv_impl( + run_dir=run_dir, + run_subprocess=lambda argv: _run_subprocess(argv, cwd=str(run_dir)), + assert_python_abi=lambda venv_python: _assert_ci_runtime_python_abi(venv_python=venv_python), + ) + + +def _prepare_ci_runtime_python_env( + *, + test_rsc_root: Path, + venv_python: Path, + src_root: Path, +) -> None: + _assert_ci_runtime_python_abi(venv_python=venv_python) + wheelhouse_root = _ci_runtime_wheelhouse_root(test_rsc_root=test_rsc_root) + if not wheelhouse_root.exists() or not wheelhouse_root.is_dir(): + raise ValueError( + "CI runtime offline wheelhouse is missing from test_rsc: " + f"{wheelhouse_root}" + ) + offline_dependency_requirements = _ci_runtime_offline_dependency_requirements( + test_rsc_root=test_rsc_root + ) + if not offline_dependency_requirements: + raise ValueError("CI runtime offline dependency requirements must be non-empty") + _run_subprocess( + [ + str(venv_python), + "-m", + "pip", + "install", + "--no-index", + "--find-links", + str(wheelhouse_root), + *offline_dependency_requirements, + ], + cwd=str(src_root), + ) _TEST_STACK_RUNTIME_BACKEND_DIRNAME = "backend" _TEST_STACK_RUNTIME_SOURCE_IGNORE_NAMES: Tuple[str, ...] = ( ".dever", @@ -14333,6 +13782,10 @@ def _ci_prepare_run_inputs( venv_python: Path, ci_commands: Optional[List[Dict[str, str]]], overlay_live_checkout: bool, + etcd_address: str, + cluster_name: str, + shared_memory_path: str, + shared_file_path: str, ) -> None: """Materialize CI run inputs from the case release into an isolated run_dir. @@ -14403,6 +13856,13 @@ def _ci_prepare_run_inputs( build_config_ext_path = src_root / "build_config_ext.yml" if not build_config_ext_path.exists(): build_config_ext_path.write_text("", encoding="utf-8") + _write_ci_runtime_test_config( + src_root=src_root, + etcd_address=etcd_address, + cluster_name=cluster_name, + shared_memory_path=shared_memory_path, + shared_file_path=shared_file_path, + ) release_link_path = src_root / "fluxon_release" _materialize_ci_runtime_release_view( release_root=release_root, @@ -14410,6 +13870,12 @@ def _ci_prepare_run_inputs( release_view_root=release_link_path, ) + _prepare_ci_runtime_python_env( + test_rsc_root=test_rsc_root, + venv_python=venv_python, + src_root=src_root, + ) + wheel = release_root / wheel_name _run_subprocess( [ diff --git a/fluxon_test_stack/test_runner_ci_runtime.py b/fluxon_test_stack/test_runner_ci_runtime.py new file mode 100644 index 0000000..bef19e2 --- /dev/null +++ b/fluxon_test_stack/test_runner_ci_runtime.py @@ -0,0 +1,75 @@ +from __future__ import annotations + +import shutil +import subprocess +from pathlib import Path +from typing import Callable + + +_TEST_STACK_DEFAULT_PYTHON_ABI = "cpython3.10" +_CI_RUNTIME_PYTHON_BIN_NAME = "python3.10" + + +def _ci_runtime_python_executable() -> str: + python_bin = shutil.which(_CI_RUNTIME_PYTHON_BIN_NAME) + if python_bin is None: + raise ValueError( + "CI runtime requires python3.10 on PATH to create the offline-wheelhouse venv" + ) + return python_bin + + +def _ci_runtime_python_abi( + *, + venv_python: Path, + normalize_python_abi: Callable[[str], str], +) -> str: + try: + raw = subprocess.check_output( + [ + str(venv_python), + "-c", + ( + "import sys; " + "print(f'{sys.implementation.name}{sys.version_info[0]}.{sys.version_info[1]}')" + ), + ], + text=True, + ).strip() + except (OSError, subprocess.CalledProcessError) as exc: + raise ValueError(f"failed to probe CI runtime venv python ABI: python={venv_python}") from exc + return normalize_python_abi(raw) + + +def _assert_ci_runtime_python_abi( + *, + venv_python: Path, + normalize_python_abi: Callable[[str], str], +) -> None: + got_python_abi = _ci_runtime_python_abi( + venv_python=venv_python, + normalize_python_abi=normalize_python_abi, + ) + if got_python_abi != _TEST_STACK_DEFAULT_PYTHON_ABI: + raise ValueError( + "CI runtime venv python ABI must match the prepared offline wheelhouse: " + f"expected={_TEST_STACK_DEFAULT_PYTHON_ABI} got={got_python_abi} python={venv_python}" + ) + + +def _create_ci_runtime_venv( + *, + run_dir: Path, + run_subprocess: Callable[[list[str]], None], + assert_python_abi: Callable[[Path], None], +) -> Path: + venv_dir = (run_dir / "venv").resolve() + if venv_dir.exists(): + raise ValueError(f"venv dir already exists (no overwrite): {venv_dir}") + python_bin = _ci_runtime_python_executable() + run_subprocess([python_bin, "-m", "venv", str(venv_dir)]) + venv_python = venv_dir / "bin" / "python3" + if not venv_python.exists(): + raise ValueError(f"venv python not found after creation: {venv_python}") + assert_python_abi(venv_python) + return venv_python diff --git a/fluxon_test_stack/test_runner_models.py b/fluxon_test_stack/test_runner_models.py new file mode 100644 index 0000000..cb38467 --- /dev/null +++ b/fluxon_test_stack/test_runner_models.py @@ -0,0 +1,115 @@ +from __future__ import annotations + +from dataclasses import dataclass, field +from pathlib import Path +from typing import Any, Dict, List, Optional, Tuple + + +@dataclass(frozen=True) +class _Suite: + run_mode: str + run_selectors: "_RunSelectors" + scenes: Dict[str, Dict[str, Any]] + scales: Dict[str, Dict[str, Any]] + artifact_sets: Dict[str, Dict[str, Any]] + profiles: Dict[str, Dict[str, Any]] + + +@dataclass(frozen=True) +class _RunSelectors: + case_ids: Optional[Tuple[str, ...]] + profile_ids: Tuple[str, ...] + command_ids: Optional[Tuple[str, ...]] + test_ids: Optional[Tuple[str, ...]] + + +@dataclass(frozen=True) +class _ResolvedCase: + scene_id: str + scale_id: str + profile_id: str + case_id: str + case_key: str + + +@dataclass +class _RunSlot: + case_key: str + case_id: str + run_index: int + rec: Dict[str, Any] + + +@dataclass(frozen=True) +class _PlannedCase: + case: _ResolvedCase + ci_commands: Optional[List[Dict[str, Any]]] + ci_prepare_steps: Optional[List[Dict[str, Any]]] + label: str + command_id: Optional[str] + test_id: Optional[str] + counted: bool + + +@dataclass(frozen=True) +class _ObservedFileState: + size: int + mtime_ns: int + + +@dataclass(frozen=True) +class _RemoteRunDirStage: + archive_prefix: str + stage_prefix: str + verify_relpaths: Tuple[str, ...] + ctx: str + sync_mode: str + include_relpaths: Optional[Tuple[str, ...]] = None + + +@dataclass(frozen=True) +class _RuntimePhase: + phase_id: str + layer: str + instance_ids: Tuple[str, ...] + write_ctx: str + stage_run_dir: Optional[_RemoteRunDirStage] = None + + +class _RetryableControllerStatusError(RuntimeError): + pass + + +@dataclass(frozen=True) +class _CasePlan: + case_family: str + prepare_phases: Tuple[_RuntimePhase, ...] + execute_phases: Tuple[_RuntimePhase, ...] + collect_phases: Tuple[_RuntimePhase, ...] + + +@dataclass(frozen=True) +class _PreparedCase: + plan: _CasePlan + ci_runner_exit_code_baseline: Optional[_ObservedFileState] = None + test_stack_result_path: Optional[Path] = None + test_stack_coordinator_addr: Optional[str] = None + test_stack_result_timeout_s: Optional[int] = None + + +@dataclass(frozen=True) +class _ExecutedCase: + outcome: str + summary: Dict[str, Any] + + +@dataclass +class _CaseRuntimeTracking: + ci_lock_fp: Optional[Any] = None + controller_lock_fp: Optional[Any] = None + ci_attempted_instance_ids: List[str] = field(default_factory=list) + ci_apply_ids: Dict[str, str] = field(default_factory=dict) + ts_coord_deploy_attempted: bool = False + ts_nodes_deploy_attempted: bool = False + ts_coord_apply_id: Optional[str] = None + ts_nodes_apply_id: Optional[str] = None diff --git a/fluxon_test_stack/test_runner_runtime_backend.py b/fluxon_test_stack/test_runner_runtime_backend.py new file mode 100644 index 0000000..bc46a76 --- /dev/null +++ b/fluxon_test_stack/test_runner_runtime_backend.py @@ -0,0 +1,702 @@ +from __future__ import annotations + +import json +import time +import urllib.parse +from pathlib import Path +from typing import Any, Dict, List, Optional + + +def _prepare_ci_case( + *, + ctx: Any, + planned_case: Any, + resolved_case: Dict[str, Any], + run_dir: Path, + run_index: int, + case_plan: Any, + runtime_tracking: Any, +) -> Any: + _ = ctx._require_dict(resolved_case.get("deploy"), "resolved_case.deploy") + ci_checkout_root = ctx._runner_repo_root() + runtime_tracking.ci_lock_fp = ctx._acquire_ci_lock() + ctx._ensure_deployer_online(resolved_case) + out_cluster_name = ctx._ci_cluster_name(resolved_case) + + scale = ctx._require_dict(resolved_case.get("scale"), "resolved_case.scale") + owner_scale = ctx._require_dict(scale.get("owner"), "resolved_case.scale.owner") + owner_count = ctx._require_int(owner_scale.get("owner_count"), "scale.owner.owner_count", min_v=1) + if owner_count != 1: + raise ValueError("CI currently supports only owner_count=1") + owner_dram_bytes = ctx._require_int( + owner_scale.get("owner_dram_bytes"), "scale.owner.owner_dram_bytes", min_v=16777216 + ) + if owner_dram_bytes % 16777216 != 0: + raise ValueError("scale.owner.owner_dram_bytes must be 16MiB aligned") + + release_root = ctx._resolved_case_release_root(resolved_case) + if not release_root.exists(): + raise ValueError(f"materialized case release_root is missing: {release_root}") + + if ctx._ci_has_instance(resolved_case, instance_id="owner_0"): + owner0 = ctx._find_deploy_instance(resolved_case, instance_id="owner_0") + ci_runner = ctx._find_deploy_instance(resolved_case, instance_id="ci_runner") + owner0_target = ctx._require_str( + ctx._require_dict(owner0.get("deployer"), "owner_0.deployer").get("target"), + "owner_0.target", + ) + ci_target = ctx._require_str( + ctx._require_dict(ci_runner.get("deployer"), "ci_runner.deployer").get("target"), + "ci_runner.target", + ) + if owner0_target != ci_target: + raise ValueError("ci_runner must run on the same target as owner_0") + + ctx._ci_cleanup_runtime(resolved_case, timeout_s=120) + ctx._cleanup_previous_failed_ci_runtime( + resolved_case, + run_dir=run_dir, + run_index=run_index, + ) + ctx._ci_assert_ports_free(resolved_case) + ctx._wait_ci_base_runtime_ready(resolved_case) + + services_root = (run_dir / "services").resolve() + services_root.mkdir(parents=True, exist_ok=True) + (services_root / "share_mem").mkdir(parents=True, exist_ok=True) + share_mem_path = ctx._ci_shared_memory_path(resolved_case, run_dir=run_dir) + share_file_path = ctx._ci_shared_file_path(resolved_case, run_dir=run_dir) + Path(share_mem_path).mkdir(parents=True, exist_ok=True) + Path(share_file_path).mkdir(parents=True, exist_ok=True) + + venv_python = ctx._create_ci_runtime_venv(run_dir=run_dir) + + src_root = (run_dir / "src").resolve() + ctx._ci_prepare_run_inputs( + resolved_case=resolved_case, + source_root=ci_checkout_root, + release_root=release_root, + test_rsc_root=ctx._resolved_case_test_rsc_root(resolved_case), + src_root=src_root, + venv_python=venv_python, + ci_commands=planned_case.ci_commands, + overlay_live_checkout=True, + etcd_address=f"{ctx._ci_base_runtime_service_target_ip(resolved_case, service_id='etcd')}:{ctx._ci_base_runtime_service_port(resolved_case, service_id='etcd')}", + cluster_name=out_cluster_name, + shared_memory_path=share_mem_path, + shared_file_path=share_file_path, + ) + + prepare_env_exports = ctx._run_ci_prepare_steps( + resolved_case=resolved_case, + run_dir=run_dir, + src_root=src_root, + ) + if prepare_env_exports: + ctx._write_ci_prepare_env_script(run_dir=run_dir, exports=prepare_env_exports) + + profile = ctx._require_dict(resolved_case.get("profile"), "resolved_case.profile") + profile_ci = ctx._require_dict(profile.get("ci"), "resolved_case.profile.ci") + if profile_ci.get("scene_config") is not None: + ctx._write_ci_scene_config_yaml( + resolved_case, + run_dir=run_dir, + ) + if ctx._ci_cluster_runtime_instance_ids(resolved_case): + ctx._write_ci_master_owner_configs( + resolved_case, + run_dir=run_dir, + cluster_name=out_cluster_name, + share_mem_path=share_mem_path, + share_file_path=share_file_path, + owner_dram_bytes=owner_dram_bytes, + ) + _ = ctx._write_ci_runner_script( + resolved_case, + run_dir=run_dir, + src_root=src_root, + share_mem_path=share_mem_path, + share_file_path=share_file_path, + ) + ci_runner_exit_code_path = (run_dir / "logs" / "ci_runner" / "exit_code.txt").resolve() + ci_runner_exit_code_baseline = ctx._observe_file_state(ci_runner_exit_code_path) + + for cluster_runtime_phase in case_plan.prepare_phases: + cluster_runtime_deploy_result = ctx._deploy_runtime_phase( + resolved_case, + run_dir=run_dir, + phase=cluster_runtime_phase, + ) + for instance_id in cluster_runtime_phase.instance_ids: + ctx._record_ci_apply_id( + runtime_tracking.ci_attempted_instance_ids, + runtime_tracking.ci_apply_ids, + instance_id=instance_id, + deploy_result=cluster_runtime_deploy_result, + ctx=f"CI cluster_runtime deploy_result[{instance_id}]", + ) + for instance_id in cluster_runtime_phase.instance_ids: + ctx._wait_ci_instance_ready(resolved_case, instance_id=instance_id) + return ctx._PreparedCase( + plan=case_plan, + ci_runner_exit_code_baseline=ci_runner_exit_code_baseline, + ) + + +def _prepare_test_stack_case( + *, + ctx: Any, + resolved_case: Dict[str, Any], + run_dir: Path, + case_plan: Any, + test_stack_meta: Dict[str, Any], + runtime_tracking: Any, +) -> Any: + ctx._ensure_deployer_online(resolved_case) + deploy = ctx._require_dict(resolved_case.get("deploy"), "resolved_case.deploy") + controller_url = ctx._require_str(deploy.get("controller_url"), "resolved_case.deploy.controller_url").rstrip("/") + case_obj = ctx._require_dict(resolved_case.get("case"), "resolved_case.case") + case_id = ctx._require_str(case_obj.get("case_id"), "resolved_case.case.case_id") + ctx._cleanup_skipped_case_desired_applies(controller_url=controller_url, case_id=case_id) + ctx._write_deployer_manifests(resolved_case, run_dir, allow_overwrite=False) + + scale = ctx._require_dict(resolved_case.get("scale"), "resolved_case.scale") + max_secs = ctx._require_int(scale.get("duration_seconds"), "scale.duration_seconds", min_v=1) + benchmark_scale = ctx._require_dict( + scale.get("benchmark"), + "resolved_case.scale.benchmark", + ) + metric_warmup_seconds = ctx._require_number( + benchmark_scale.get("metric_warmup_seconds"), + "resolved_case.scale.benchmark.metric_warmup_seconds", + ) + profile = ctx._require_dict(resolved_case.get("profile"), "resolved_case.profile") + profile_test_stack = ctx._require_dict(profile.get("test_stack"), "resolved_case.profile.test_stack") + coordinator_ready_timeout_seconds = ctx._require_int( + profile_test_stack.get("coordinator_ready_timeout_seconds"), + "resolved_case.profile.test_stack.coordinator_ready_timeout_seconds", + min_v=1, + ) + + coordinator_addr = ctx._require_str(test_stack_meta.get("coordinator_addr"), "test_stack_meta.coordinator_addr") + if ":" not in coordinator_addr: + raise ValueError(f"invalid coordinator_addr: {coordinator_addr!r}") + coord_host, coord_port_s = coordinator_addr.rsplit(":", 1) + coord_port = int(coord_port_s) + coordinator_phase = ctx._require_runtime_phase_by_id( + case_plan.prepare_phases, + phase_id="coordinator", + ctx="TEST_STACK prepare", + ) + scene = ctx._require_dict(resolved_case.get("scene"), "resolved_case.scene") + ts_scene = ctx._require_dict(scene.get("test_stack"), "resolved_case.scene.test_stack") + mode = ctx._require_str(ts_scene.get("mode"), "scene.test_stack.mode") + backend_kind = ctx._require_test_stack_backend_kind( + profile_test_stack.get("kind"), + "resolved_case.profile.test_stack.kind", + ) + owner_instance_ids: List[str] = [] + shared_memory_path: Optional[str] = None + shared_file_path: Optional[str] = None + stack_cluster_name: Optional[str] = None + if ctx._test_stack_backend_uses_dedicated_kv_owners(backend_kind=backend_kind, mode=mode): + runtime = ctx._require_dict(resolved_case.get("runtime"), "resolved_case.runtime") + owner_instance_ids = [ + iid + for iid in coordinator_phase.instance_ids + if isinstance(iid, str) and iid.startswith(ctx.TEST_STACK_KV_OWNER_INSTANCE_ID_PREFIX) + ] + if not owner_instance_ids: + raise ValueError( + f"{mode} requires dedicated KV owner instances (missing kv_owner_* in prepare phase)" + ) + if ctx._test_stack_backend_uses_external_fluxon_kv(backend_kind=backend_kind, mode=mode): + stack_identity = ctx._require_dict(runtime.get("stack_identity"), "resolved_case.runtime.stack_identity") + stack_cluster_name = ctx._require_str( + stack_identity.get("cluster_name"), + "runtime.stack_identity.cluster_name", + ) + shared_memory_path = ctx._require_str( + stack_identity.get("shared_memory_path"), + "runtime.stack_identity.shared_memory_path", + ) + shared_file_path = ctx._require_str( + stack_identity.get("shared_file_path"), + "runtime.stack_identity.shared_file_path", + ) + ctx._converge_test_stack_external_owner_shared_bundle_cleanup( + resolved_case, + controller_url=controller_url, + owner_instance_ids=owner_instance_ids, + ) + ctx._stage_runtime_phase_run_dir(resolved_case, run_dir=run_dir, phase=coordinator_phase) + ctx._ensure_test_stack_runtime_env_ready_for_instance_ids( + resolved_case, + run_dir=run_dir, + instance_ids=coordinator_phase.instance_ids, + ) + runtime_tracking.ts_coord_deploy_attempted = True + coord_deploy_result = ctx._deploy_runtime_phase_after_stage( + resolved_case, + run_dir=run_dir, + phase=coordinator_phase, + ) + runtime_tracking.ts_coord_apply_id = ctx._deploy_result_history_id( + coord_deploy_result, + ctx="TEST_STACK coordinator deploy_result", + ) + ctx._wait_instance_running(resolved_case, instance_id="coordinator", timeout_s=30) + ctx._wait_instance_tcp_ready( + resolved_case, + instance_id="coordinator", + host=coord_host, + port=coord_port, + timeout_s=coordinator_ready_timeout_seconds, + ) + if backend_kind == ctx.TEST_STACK_BACKEND_MOONCAKE: + bench_cfg = ctx._load_test_stack_benchmark_config(run_dir) + run_kv_base = ctx._require_dict(bench_cfg.get("kv_base"), "benchmark_config.CONFIG.kv_base") + run_mooncake_spec = ctx._require_dict( + run_kv_base.get("mooncake_spec"), + "benchmark_config.CONFIG.kv_base.mooncake_spec", + ) + metadata_server = ctx._require_str( + run_mooncake_spec.get("metadata_server"), + "benchmark_config.CONFIG.kv_base.mooncake_spec.metadata_server", + ) + master_server_address = ctx._require_str( + run_mooncake_spec.get("master_server_address"), + "benchmark_config.CONFIG.kv_base.mooncake_spec.master_server_address", + ) + metadata_host_port = urllib.parse.urlparse(metadata_server) + metadata_port = int(metadata_host_port.port or 0) + if metadata_port <= 0: + raise ValueError(f"invalid TEST_STACK Mooncake metadata_server port: {metadata_server!r}") + if ":" not in master_server_address: + raise ValueError(f"invalid TEST_STACK Mooncake master_server_address: {master_server_address!r}") + _, rpc_port_s = master_server_address.rsplit(":", 1) + rpc_port = int(rpc_port_s) + ctx._wait_instance_running( + resolved_case, + instance_id=ctx.TEST_STACK_MOONCAKE_MASTER_INSTANCE_ID, + timeout_s=60, + ) + ctx._wait_instance_tcp_ready( + resolved_case, + instance_id=ctx.TEST_STACK_MOONCAKE_MASTER_INSTANCE_ID, + host=coord_host, + port=rpc_port, + timeout_s=coordinator_ready_timeout_seconds, + ) + ctx._wait_instance_tcp_ready( + resolved_case, + instance_id=ctx.TEST_STACK_MOONCAKE_MASTER_INSTANCE_ID, + host=coord_host, + port=metadata_port, + timeout_s=coordinator_ready_timeout_seconds, + ) + + node_runtime_phase = ctx._require_runtime_phase_by_id( + case_plan.prepare_phases, + phase_id="node_runtime", + ctx="TEST_STACK prepare", + ) + if ctx._test_stack_backend_uses_external_fluxon_kv(backend_kind=backend_kind, mode=mode): + if shared_memory_path is None or shared_file_path is None or stack_cluster_name is None: + raise ValueError( + "internal error: TEST_STACK shared bundle identity is missing after pre-deploy cleanup" + ) + if "master" in set(coordinator_phase.instance_ids): + ctx._wait_instance_running(resolved_case, instance_id="master", timeout_s=60) + bench = ctx._require_dict( + ctx._require_dict(resolved_case.get("scale"), "resolved_case.scale").get("benchmark"), + "scale.benchmark", + ) + cluster_ready_timeout_seconds = ctx._require_int( + bench.get("cluster_ready_timeout_seconds"), + "scale.benchmark.cluster_ready_timeout_seconds", + min_v=1, + ) + for owner_id in owner_instance_ids: + owner_target = ctx._instance_target_name(resolved_case, instance_id=owner_id) + shared_bundle_paths = ctx._test_stack_external_owner_shared_bundle_paths( + resolved_case, + owner_target=owner_target, + ) + ctx._wait_instance_running(resolved_case, instance_id=owner_id, timeout_s=60) + ctx._wait_instance_files_present( + resolved_case, + instance_id=owner_id, + paths=shared_bundle_paths, + timeout_s=int(cluster_ready_timeout_seconds), + ctx="TEST_STACK owner shared bundle", + ) + elif ctx._test_stack_backend_uses_dedicated_kv_owners(backend_kind=backend_kind, mode=mode): + for owner_id in owner_instance_ids: + ctx._wait_instance_running(resolved_case, instance_id=owner_id, timeout_s=60) + ctx._stage_runtime_phase_run_dir(resolved_case, run_dir=run_dir, phase=node_runtime_phase) + ctx._ensure_test_stack_runtime_env_ready_for_instance_ids( + resolved_case, + run_dir=run_dir, + instance_ids=node_runtime_phase.instance_ids, + ) + return ctx._PreparedCase( + plan=case_plan, + test_stack_result_path=Path( + ctx._require_str(test_stack_meta.get("result_path"), "test_stack_meta.result_path") + ), + test_stack_coordinator_addr=coordinator_addr, + test_stack_result_timeout_s=_test_stack_result_timeout_seconds( + max_benchmark_seconds=int(max_secs), + metric_warmup_seconds=float(metric_warmup_seconds), + ), + ) + + +def _execute_ci_case( + *, + ctx: Any, + planned_case: Any, + resolved_case: Dict[str, Any], + run_dir: Path, + run_index: int, + started_at: int, + prepared_case: Any, + runtime_tracking: Any, +) -> Any: + ci_runner_exit_timeout_s = ctx._ci_runner_exit_code_timeout_seconds(resolved_case) + ci_runner_phase = prepared_case.plan.execute_phases[0] + ci_runner_deploy_result = ctx._deploy_runtime_phase( + resolved_case, + run_dir=run_dir, + phase=ci_runner_phase, + ) + ctx._record_ci_apply_id( + runtime_tracking.ci_attempted_instance_ids, + runtime_tracking.ci_apply_ids, + instance_id="ci_runner", + deploy_result=ci_runner_deploy_result, + ctx="CI ci_runner deploy_result", + ) + ctx._wait_ci_instance_ready(resolved_case, instance_id="ci_runner") + rc = ctx._wait_ci_runner_exit_code( + resolved_case=resolved_case, + run_dir=run_dir, + timeout_s=ci_runner_exit_timeout_s, + baseline_state=_require_ci_runner_exit_code_baseline( + prepared_case.ci_runner_exit_code_baseline, + ), + ) + outcome = ctx.RUN_OUTCOME_SUCCESS if rc == 0 else ctx.RUN_OUTCOME_FAILED + if outcome == ctx.RUN_OUTCOME_SUCCESS and runtime_tracking.ci_apply_ids.get("ci_runner") is not None: + ctx._delete_apply_id( + resolved_case, + apply_id=ctx._require_str(runtime_tracking.ci_apply_ids.get("ci_runner"), "CI ci_runner apply_id"), + ctx="CI ci_runner apply", + ) + del runtime_tracking.ci_apply_ids["ci_runner"] + summary = ctx._build_ci_summary_yaml( + resolved_case, + run_index=run_index, + started_at_unix_s=started_at, + finished_at_unix_s=int(time.time()), + outcome=outcome, + counted=False, + ci_out={"rc": rc}, + ) + for phase in prepared_case.plan.collect_phases: + ctx._collect_runtime_phase(resolved_case, run_dir=run_dir, phase=phase) + return ctx._ExecutedCase(outcome=outcome, summary=summary) + + +def _execute_test_stack_case( + *, + ctx: Any, + resolved_case: Dict[str, Any], + run_dir: Path, + run_index: int, + started_at: int, + prepared_case: Any, + runtime_tracking: Any, +) -> Any: + case_obj = ctx._require_dict(resolved_case.get("case"), "resolved_case.case") + case_id = ctx._require_str(case_obj.get("case_id"), "case.case_id") + + outcome = ctx.RUN_OUTCOME_FAILED + error_detail: Optional[str] = None + collect_error_detail: Optional[str] = None + result_obj: Optional[Dict[str, Any]] = None + + try: + node_phase = prepared_case.plan.execute_phases[0] + runtime_tracking.ts_nodes_deploy_attempted = True + node_deploy_result = ctx._deploy_runtime_phase( + resolved_case, + run_dir=run_dir, + phase=node_phase, + ) + runtime_tracking.ts_nodes_apply_id = ctx._deploy_result_history_id( + node_deploy_result, + ctx="TEST_STACK node deploy_result", + ) + + result_path = _require_test_stack_result_path(prepared_case.test_stack_result_path) + timeout_s = _require_test_stack_result_timeout(prepared_case.test_stack_result_timeout_s) + result_obj = _wait_and_load_test_stack_benchmark_result_json( + ctx=ctx, + resolved_case=resolved_case, + result_path=result_path, + timeout_s=timeout_s, + case_id=case_id, + writer_instance_id="coordinator", + ) + + ctx._validate_test_stack_benchmark_result(result_obj, case_id=case_id) + outcome = ctx.RUN_OUTCOME_SUCCESS + except Exception as exc: # noqa: BLE001 + error_detail = f"{type(exc).__name__}: {exc}" + finally: + try: + for phase in prepared_case.plan.collect_phases: + ctx._collect_runtime_phase(resolved_case, run_dir=run_dir, phase=phase) + except Exception as exc: # noqa: BLE001 + collect_error_detail = f"{type(exc).__name__}: {exc}" + + summary = { + "schema_version": ctx.SCHEMA_VERSION, + "case_id": case_id, + "case_key": ctx._require_str(case_obj.get("case_key"), "case.case_key"), + "run_index": int(run_index), + "outcome": outcome, + "counted": False, + "timing": { + "started_at_unix_s": int(started_at), + "finished_at_unix_s": int(time.time()), + }, + "test_stack": { + "coordinator_addr": ctx._require_str( + prepared_case.test_stack_coordinator_addr, + "prepared_case.test_stack_coordinator_addr", + ), + "completion_signal": "benchmark_result_json", + "result_path": str(_require_test_stack_result_path(prepared_case.test_stack_result_path)), + "result": result_obj, + "error": error_detail, + "collect_error": collect_error_detail, + }, + } + return ctx._ExecutedCase(outcome=outcome, summary=summary) + + +def _wait_and_load_test_stack_benchmark_result_json( + *, + ctx: Any, + resolved_case: Dict[str, Any], + result_path: Path, + timeout_s: int, + case_id: str, + writer_instance_id: str, +) -> Dict[str, Any]: + deadline = time.time() + float(timeout_s) + last_err: Optional[str] = None + while True: + try: + raw = ctx._instance_read_text_if_present( + resolved_case, + instance_id=writer_instance_id, + path=result_path, + ) + if raw is None: + raise FileNotFoundError( + f"result file is not present yet: instance_id={writer_instance_id} path={result_path}" + ) + parsed = json.loads(raw) + result_obj = ctx._require_dict(parsed, "test_stack.benchmark_result") + runs = ctx._require_list(result_obj.get("runs"), "benchmark_result.runs") + if not runs: + raise ValueError("benchmark_result.runs is empty") + run0 = ctx._require_dict(runs[0], "benchmark_result.runs[0]") + completion = ctx._require_dict(run0.get("completion"), "benchmark_result.runs[0].completion") + _ = ctx._require_str(completion.get("status"), "benchmark_result.runs[0].completion.status") + if not result_path.exists() or result_path.read_text(encoding="utf-8") != raw: + result_path.write_text(raw, encoding="utf-8") + return result_obj + except Exception as exc: # noqa: BLE001 + last_err = f"{type(exc).__name__}: {exc}" + if time.time() >= deadline: + raise ValueError( + f"benchmark result json did not become readable/valid within timeout: " + f"case_id={case_id} path={result_path} timeout_s={timeout_s} last_err={last_err}" + ) from exc + time.sleep(0.5) + + +def _finalize_case_runtime( + *, + ctx: Any, + resolved_case: Dict[str, Any], + run_dir: Path, + case_plan: Any, + runtime_tracking: Any, + outcome: str, +) -> None: + if case_plan.case_family == ctx.CASE_FAMILY_CI: + _finalize_ci_case_runtime( + ctx=ctx, + resolved_case=resolved_case, + run_dir=run_dir, + runtime_tracking=runtime_tracking, + outcome=outcome, + ) + return + if case_plan.case_family == ctx.CASE_FAMILY_BENCH: + _finalize_test_stack_case_runtime( + ctx=ctx, + resolved_case=resolved_case, + runtime_tracking=runtime_tracking, + outcome=outcome, + ) + return + raise ValueError(f"unsupported case family for finalize_case_runtime: {case_plan.case_family}") + + +def _finalize_ci_case_runtime( + *, + ctx: Any, + resolved_case: Dict[str, Any], + run_dir: Path, + runtime_tracking: Any, + outcome: str, +) -> None: + case = ctx._require_dict(resolved_case.get("case"), "resolved_case.case") + run_mode = ctx._require_str(case.get("run_mode"), "resolved_case.case.run_mode") + ci_preserved_apply_ids: list[dict[str, str]] = [] + tracked_apply_entries = ctx._ci_runtime_tracked_apply_entries(runtime_tracking) + for entry in tracked_apply_entries: + apply_id = ctx._require_str(entry.get("apply_id"), "ci tracked apply entry.apply_id") + instance_ids = ctx._require_list(entry.get("instance_ids"), "ci tracked apply entry.instance_ids") + if not instance_ids: + continue + ci_preserved_apply_ids.append( + { + "instance_ids": [ + ctx._require_str(raw_instance_id, "ci tracked apply entry.instance_ids[]") + for raw_instance_id in instance_ids + ], + "apply_id": apply_id, + } + ) + should_teardown = outcome == ctx.RUN_OUTCOME_SUCCESS or run_mode == ctx.RUN_MODE_FULL_ONCE + if should_teardown: + (run_dir / ctx.CI_PRESERVED_APPLY_IDS_FILENAME).unlink(missing_ok=True) + for entry in reversed(tracked_apply_entries): + apply_id = ctx._require_str(entry.get("apply_id"), "ci tracked apply entry.apply_id") + instance_ids = ctx._require_list(entry.get("instance_ids"), "ci tracked apply entry.instance_ids") + instance_id_text = ",".join( + ctx._require_str(raw_instance_id, "ci tracked apply entry.instance_ids[]") + for raw_instance_id in instance_ids + ) + ctx._delete_apply_id( + resolved_case, + apply_id=apply_id, + ctx=f"CI {instance_id_text} apply", + ) + ctx._ci_cleanup_runtime(resolved_case, timeout_s=120) + return + if not ci_preserved_apply_ids: + return + ctx._write_yaml_file( + run_dir / ctx.CI_PRESERVED_APPLY_IDS_FILENAME, + { + "schema_version": ctx.CI_PRESERVED_APPLY_IDS_SCHEMA_VERSION, + "apply_ids": ci_preserved_apply_ids, + }, + ) + print( + "[CI preserve_runtime] " + "case_id=" + f"{ctx._resolved_case_case_id(resolved_case)} outcome={outcome} apply_ids=" + + ", ".join( + f"{','.join(ctx._require_list(entry.get('instance_ids'), 'ci preserved apply entry.instance_ids'))}={entry['apply_id']}" + for entry in ci_preserved_apply_ids + ) + ) + + +def _finalize_test_stack_case_runtime( + *, + ctx: Any, + resolved_case: Dict[str, Any], + runtime_tracking: Any, + outcome: str, +) -> None: + case = ctx._require_dict(resolved_case.get("case"), "resolved_case.case") + run_mode = ctx._require_str(case.get("run_mode"), "resolved_case.case.run_mode") + ts_preserved_apply_ids: list[str] = [] + if runtime_tracking.ts_nodes_deploy_attempted and runtime_tracking.ts_nodes_apply_id is not None: + ts_preserved_apply_ids.append( + f"nodes={ctx._require_str(runtime_tracking.ts_nodes_apply_id, 'TEST_STACK node apply_id')}" + ) + if runtime_tracking.ts_coord_deploy_attempted and runtime_tracking.ts_coord_apply_id is not None: + ts_preserved_apply_ids.append( + f"coordinator={ctx._require_str(runtime_tracking.ts_coord_apply_id, 'TEST_STACK coordinator apply_id')}" + ) + should_teardown = outcome == ctx.RUN_OUTCOME_SUCCESS or run_mode == ctx.RUN_MODE_FULL_ONCE + if should_teardown: + if runtime_tracking.ts_nodes_deploy_attempted and runtime_tracking.ts_nodes_apply_id is not None: + ctx._delete_apply_id( + resolved_case, + apply_id=ctx._require_str(runtime_tracking.ts_nodes_apply_id, "TEST_STACK node apply_id"), + ctx="TEST_STACK node apply", + ) + if runtime_tracking.ts_coord_deploy_attempted and runtime_tracking.ts_coord_apply_id is not None: + ctx._delete_apply_id( + resolved_case, + apply_id=ctx._require_str(runtime_tracking.ts_coord_apply_id, "TEST_STACK coordinator apply_id"), + ctx="TEST_STACK coordinator apply", + ) + return + if not ts_preserved_apply_ids: + return + print( + "[TEST_STACK preserve_runtime] " + f"case_id={ctx._resolved_case_case_id(resolved_case)} outcome={outcome} apply_ids={', '.join(ts_preserved_apply_ids)}" + ) + + +def _require_ci_runner_exit_code_baseline( + baseline_state: Any, +) -> Any: + return baseline_state + + +def _require_test_stack_result_path(result_path: Optional[Path]) -> Path: + if result_path is None: + raise ValueError("prepared_case.test_stack_result_path is missing") + return result_path + + +def _require_test_stack_result_timeout(timeout_s: Optional[int]) -> int: + if timeout_s is None or timeout_s < 1: + raise ValueError("prepared_case.test_stack_result_timeout_s must be positive") + return int(timeout_s) + + +def _test_stack_result_timeout_seconds( + *, + max_benchmark_seconds: int, + metric_warmup_seconds: float, +) -> int: + if max_benchmark_seconds <= 0: + raise ValueError( + f"max_benchmark_seconds must be > 0, got: {max_benchmark_seconds}" + ) + if metric_warmup_seconds < 0.0: + raise ValueError( + f"metric_warmup_seconds must be >= 0, got: {metric_warmup_seconds}" + ) + return int( + float(max_benchmark_seconds) + + float(metric_warmup_seconds) + + 600.0 + ) diff --git a/fluxon_test_stack/test_runner_ui_runtime.py b/fluxon_test_stack/test_runner_ui_runtime.py new file mode 100644 index 0000000..b9873f0 --- /dev/null +++ b/fluxon_test_stack/test_runner_ui_runtime.py @@ -0,0 +1,196 @@ +from __future__ import annotations + +import datetime +import os +import sys +import threading +import time +from pathlib import Path +from typing import Any, Dict, List, Optional, Tuple + +from gitops import gitops_lib + + +def _resolve_repo_root_cli_path(*, repo_root: Path, raw_path: Path, field_name: str) -> Path: + if raw_path.is_absolute(): + return raw_path.resolve() + resolved = (repo_root / raw_path).resolve() + if not resolved: + raise RuntimeError(f"failed to resolve {field_name} against repo root: raw={raw_path}") + return resolved + + +def _runner_stdio_mirror_enabled() -> bool: + return os.environ.get("GITHUB_ACTIONS", "").strip().lower() == "true" + + +def _ci_log_timestamp_prefix(now: Optional[float] = None) -> str: + ts = datetime.datetime.fromtimestamp( + time.time() if now is None else float(now), + tz=datetime.timezone.utc, + ) + return ts.strftime("[%Y-%m-%d %H:%M:%S UTC]") + + +def _ci_log_prefix_lines(text: str, *, now: Optional[float] = None) -> str: + if not text: + return "" + prefix = _ci_log_timestamp_prefix(now) + lines = text.splitlines(keepends=True) + return "".join(f"{prefix} {line}" if line.strip() else line for line in lines) + + +def _start_runner_stdio_log_mirror(*, log_path: Path, stdout_fd: int) -> threading.Thread: + def _mirror_loop() -> None: + offset = 0 + while True: + try: + if log_path.exists(): + size = log_path.stat().st_size + if size < offset: + offset = 0 + if size > offset: + with log_path.open("r", encoding="utf-8", errors="replace") as fp: + fp.seek(offset) + chunk = fp.read() + offset = fp.tell() + if chunk: + data = _ci_log_prefix_lines(chunk).encode("utf-8", errors="replace") + if stdout_fd >= 0: + try: + os.write(stdout_fd, data) + except OSError: + pass + time.sleep(0.2) + except Exception: + time.sleep(0.5) + + mirror = threading.Thread( + target=_mirror_loop, + name="test-runner-stdio-log-mirror", + daemon=True, + ) + mirror.start() + return mirror + + +def _redirect_process_stdio_to_log( + *, + workdir_root: Path, + runner_stdio_log_filename: str, + stdio_log_fp: Optional[Any], + stdio_keepalive_fds: Optional[Tuple[int, int]], + start_mirror, +) -> Tuple[Any, Optional[Tuple[int, int]]]: + """Route runner stdio to a stable workdir log so long suites survive PTY loss.""" + if stdio_log_fp is not None: + return stdio_log_fp, stdio_keepalive_fds + + log_path = (workdir_root / runner_stdio_log_filename).resolve() + log_fp = log_path.open("a", encoding="utf-8", buffering=1) + banner = ( + f"{_ci_log_timestamp_prefix()} [test_runner] redirecting process stdio to stable log: {log_path}\n" + ) + try: + sys.stdout.write(banner) + sys.stdout.flush() + except OSError: + pass + + try: + sys.stdout.flush() + except OSError: + pass + try: + sys.stderr.flush() + except OSError: + pass + + if stdio_keepalive_fds is None: + try: + out_fd = os.dup(sys.stdout.fileno()) + err_fd = os.dup(sys.stderr.fileno()) + os.set_inheritable(out_fd, False) + os.set_inheritable(err_fd, False) + stdio_keepalive_fds = (out_fd, err_fd) + except OSError: + stdio_keepalive_fds = (-1, -1) + + os.dup2(log_fp.fileno(), sys.stdout.fileno()) + os.dup2(log_fp.fileno(), sys.stderr.fileno()) + sys.stdout = os.fdopen(sys.stdout.fileno(), "w", encoding="utf-8", buffering=1, closefd=False) + sys.stderr = os.fdopen(sys.stderr.fileno(), "w", encoding="utf-8", buffering=1, closefd=False) + if _runner_stdio_mirror_enabled(): + keepalive = stdio_keepalive_fds or (-1, -1) + start_mirror( + log_path=log_path, + stdout_fd=int(keepalive[0]), + ) + return log_fp, stdio_keepalive_fds + + +def _resolve_history_roots_cli_paths(*, repo_root: Path, raw_paths: List[str]) -> List[Path]: + return [ + _resolve_repo_root_cli_path(repo_root=repo_root, raw_path=Path(path), field_name="history_root") + for path in raw_paths + ] + + +def _load_gitops_ctx_for_ui( + *, + workdir_root: Path, + gitops_config_path: Optional[Path], +) -> Optional[gitops_lib.GitOpsContext]: + if gitops_config_path is None: + return None + gitops_workdir = gitops_lib.default_runtime_root(workdir_root) + gitops_ctx = gitops_lib.load_context( + config_path=gitops_config_path, + workdir=gitops_workdir, + ) + gitops_desc = gitops_lib.describe_context(gitops_ctx) + print( + "INFO: test_runner GitOps integrated: " + f"config={gitops_desc['config_path']} workdir={gitops_desc['workdir']} interval={gitops_desc['interval']}s repos={gitops_desc['repo_count']}", + flush=True, + ) + threading.Thread( + target=gitops_lib.poll_forever, + args=(gitops_ctx,), + kwargs={"stop_event": None}, + daemon=True, + ).start() + return gitops_ctx + + +def run_ui_service( + *, + workdir_root: Path, + host: str, + port: int, + lookback_days: int, + extra_history_roots: Optional[List[Path]], + gitops_config_path: Optional[Path], + acquire_ui_service_lock, + serve_test_runner_ui, +) -> None: + workdir_root = workdir_root.resolve() + if workdir_root.exists(): + if not workdir_root.is_dir(): + raise ValueError(f"ui workdir is not a directory: {workdir_root}") + else: + workdir_root.mkdir(parents=True, exist_ok=True) + ui_lock = acquire_ui_service_lock(workdir_root=workdir_root) + _ = ui_lock + gitops_ctx = _load_gitops_ctx_for_ui( + workdir_root=workdir_root, + gitops_config_path=gitops_config_path, + ) + serve_test_runner_ui( + workdir_root=workdir_root, + host=str(host), + port=int(port), + lookback_days=int(lookback_days), + extra_history_roots=extra_history_roots, + gitops_ctx=gitops_ctx, + ) diff --git a/fluxon_test_stack/tests/test_ci_2_virt_node_contract.py b/fluxon_test_stack/tests/test_ci_2_virt_node_contract.py index 667c00b..4392be6 100644 --- a/fluxon_test_stack/tests/test_ci_2_virt_node_contract.py +++ b/fluxon_test_stack/tests/test_ci_2_virt_node_contract.py @@ -29,6 +29,7 @@ def _load_module(): class TestCi2VirtNodeContract(unittest.TestCase): _KVTEST_SCENE_ID = "ci_top_attention_bin_kvtest" _DOC_SCENE_ID = "ci_top_attention_doc_page_build" + _MQ_SCENE_ID = "ci_top_attention_mq_core" def test_generated_suite_is_public_dual_local_nodes_ci_only(self) -> None: suite_cfg = _ENTRY._load_yaml_mapping(_ENTRY.DEFAULT_SUITE_PATH, ctx="suite") @@ -119,6 +120,34 @@ def test_generated_suite_is_public_dual_local_nodes_ci_only(self) -> None: ) self.assertNotIn("commands", generated["scenes"][self._KVTEST_SCENE_ID]["ci"]) + def test_generated_suite_supports_mq_core_ci_scene(self) -> None: + suite_cfg = _ENTRY._load_yaml_mapping(_ENTRY.DEFAULT_SUITE_PATH, ctx="suite") + generated = _ENTRY._rewrite_suite_for_local_dual_nodes( + suite_cfg=suite_cfg, + scene_ids=[self._MQ_SCENE_ID], + primary_node_name="local-node-a", + secondary_node_name="local-node-b", + host_ip="10.1.1.119", + wheel_name="fluxon-0.2.1-cp38-abi3-manylinux_2_28_x86_64.whl", + controller_port=19080, + ) + + self.assertEqual(set(generated["scenes"].keys()), {self._MQ_SCENE_ID}) + self.assertEqual( + generated["scenes"][self._MQ_SCENE_ID]["ci"]["runtime_contract"], + "cluster_kv_owner", + ) + self.assertEqual( + generated["scenes"][self._MQ_SCENE_ID]["ci"]["subject"], + "mq", + ) + self.assertNotIn("commands", generated["scenes"][self._MQ_SCENE_ID]["ci"]) + self.assertEqual( + generated["scenes"][self._MQ_SCENE_ID]["select"]["scales"], + ["n1_kvowner_dram_20gib"], + ) + self.assertEqual(set(generated["scales"].keys()), {"n1_kvowner_dram_20gib"}) + def test_generated_suite_preserves_source_scene_configs(self) -> None: suite_cfg = _ENTRY._load_yaml_mapping(_ENTRY.DEFAULT_SUITE_PATH, ctx="suite") suite_cfg["profiles"]["fluxon_tcp"]["runtime"]["ci"]["scene_configs"][self._KVTEST_SCENE_ID]["kv_test_rounds"] = "p2p_only" @@ -412,7 +441,7 @@ def test_main_supports_explicit_suite_path(self) -> None: suite_cfg["scenes"] = { key: value for key, value in suite_cfg["scenes"].items() - if key in (self._DOC_SCENE_ID, self._KVTEST_SCENE_ID) + if key in (self._DOC_SCENE_ID, self._KVTEST_SCENE_ID, self._MQ_SCENE_ID) } suite_cfg["profiles"] = {"fluxon_tcp": suite_cfg["profiles"]["fluxon_tcp"]} suite_cfg["run"]["selectors"]["profile_ids"] = ["fluxon_tcp"] @@ -456,7 +485,10 @@ def test_main_supports_explicit_suite_path(self) -> None: workdir / "generated" / "ci_test_list.local.yaml", ctx="generated suite", ) - self.assertEqual(set(generated_suite["scenes"].keys()), {self._DOC_SCENE_ID, self._KVTEST_SCENE_ID}) + self.assertEqual( + set(generated_suite["scenes"].keys()), + {self._DOC_SCENE_ID, self._KVTEST_SCENE_ID, self._MQ_SCENE_ID}, + ) self.assertEqual( generated_suite["profiles"]["fluxon_tcp_thread"]["runtime"]["ci"]["scene_configs"][self._KVTEST_SCENE_ID][ "kv_test_rounds" @@ -469,6 +501,10 @@ def test_main_supports_explicit_suite_path(self) -> None: ], "tele-ai.github.io/Fluxon", ) + self.assertEqual( + generated_suite["profiles"]["fluxon_tcp_thread"]["runtime"]["ci"]["scene_configs"][self._MQ_SCENE_ID], + {}, + ) def test_main_same_host_generated_configs_use_non_loopback_host_ip(self) -> None: with tempfile.TemporaryDirectory() as td: diff --git a/fluxon_test_stack/tests/test_runner_contract.py b/fluxon_test_stack/tests/test_runner_contract.py index e4948cb..7c8fddd 100644 --- a/fluxon_test_stack/tests/test_runner_contract.py +++ b/fluxon_test_stack/tests/test_runner_contract.py @@ -59,6 +59,10 @@ def _build_checks(selected_test_id: Optional[str]) -> List[Tuple[str, Callable[[ "ci_top_attention_doc_page_build_uses_online_docker_image", test_ci_top_attention_doc_page_build_uses_online_docker_image, ), + ( + "ci_top_attention_mq_core_uses_cluster_kv_owner_runtime", + test_ci_top_attention_mq_core_uses_cluster_kv_owner_runtime, + ), ] if selected_test_id is None: return checks @@ -150,19 +154,6 @@ def test_suite_requires_benchmark_bundle_only_for_bench_cases() -> None: return suite_for_contract = copy.deepcopy(suite_cfg) - artifact_sets = suite_for_contract.get("artifact_sets") - if not isinstance(artifact_sets, dict): - print("FAIL: test_suite_requires_benchmark_bundle_only_for_bench_cases - artifact_sets is not a mapping") - return - for artifact_set in artifact_sets.values(): - if not isinstance(artifact_set, dict): - continue - release_artifacts = artifact_set.get("release_artifacts") - if isinstance(release_artifacts, dict): - python_wheel = release_artifacts.get("python_wheel") - if isinstance(python_wheel, str) and python_wheel.strip(): - artifact_set["release_artifacts"] = {"wheel": python_wheel} - suite_with_bench = _TEST_RUNNER._parse_suite_config(copy.deepcopy(suite_for_contract)) resolved_with_bench = _TEST_RUNNER._expand_cases(suite_with_bench) if not _TEST_RUNNER._suite_requires_benchmark_bundle( @@ -208,19 +199,6 @@ def test_ci_top_attention_doc_page_build_uses_online_docker_image() -> None: return suite_for_contract = copy.deepcopy(suite_cfg) - artifact_sets = suite_for_contract.get("artifact_sets") - if not isinstance(artifact_sets, dict): - print("FAIL: test_ci_top_attention_doc_page_build_uses_online_docker_image - artifact_sets is not a mapping") - return - for artifact_set in artifact_sets.values(): - if not isinstance(artifact_set, dict): - continue - release_artifacts = artifact_set.get("release_artifacts") - if isinstance(release_artifacts, dict): - python_wheel = release_artifacts.get("python_wheel") - if isinstance(python_wheel, str) and python_wheel.strip(): - artifact_set["release_artifacts"] = {"wheel": python_wheel} - suite = _TEST_RUNNER._parse_suite_config(suite_for_contract) cases = _TEST_RUNNER._expand_cases(suite) case = next( @@ -259,5 +237,59 @@ def test_ci_top_attention_doc_page_build_uses_online_docker_image() -> None: print("PASS: test_ci_top_attention_doc_page_build_uses_online_docker_image") +def test_ci_top_attention_mq_core_uses_cluster_kv_owner_runtime() -> None: + repo_root = Path(__file__).resolve().parents[2] + suite_cfg_path = repo_root / "fluxon_test_stack" / "ci_test_list.yaml" + suite_cfg = yaml.safe_load(suite_cfg_path.read_text(encoding="utf-8")) + if not isinstance(suite_cfg, dict): + print("FAIL: test_ci_top_attention_mq_core_uses_cluster_kv_owner_runtime - suite config is not a mapping") + return + + suite_for_contract = copy.deepcopy(suite_cfg) + suite = _TEST_RUNNER._parse_suite_config(suite_for_contract) + cases = _TEST_RUNNER._expand_cases(suite) + case = next( + ( + item + for item in cases + if item.scene_id == "ci_top_attention_mq_core" + and item.profile_id == "fluxon_tcp" + ), + None, + ) + if case is None: + print("FAIL: test_ci_top_attention_mq_core_uses_cluster_kv_owner_runtime - missing mq core case") + return + planned = _TEST_RUNNER._build_ci_execution_plan(case, suite) + if len(planned) != 1: + print( + "FAIL: test_ci_top_attention_mq_core_uses_cluster_kv_owner_runtime - " + f"expected one planned case, got {len(planned)}" + ) + return + commands = planned[0].ci_commands + if len(commands) != 1: + print( + "FAIL: test_ci_top_attention_mq_core_uses_cluster_kv_owner_runtime - " + f"expected one command, got {len(commands)}" + ) + return + command = commands[0] + if command.get("id") != "top_attention_mq_core": + print( + "FAIL: test_ci_top_attention_mq_core_uses_cluster_kv_owner_runtime - " + f"unexpected command id: {command.get('id')!r}" + ) + return + command_text = command.get("command") + if not isinstance(command_text, str) or "_mq_core.py --case-config __RUN_DIR__/configs/ci_scene_config.yaml" not in command_text: + print( + "FAIL: test_ci_top_attention_mq_core_uses_cluster_kv_owner_runtime - " + f"unexpected command: {command_text!r}" + ) + return + print("PASS: test_ci_top_attention_mq_core_uses_cluster_kv_owner_runtime") + + if __name__ == "__main__": raise SystemExit(main()) diff --git a/fluxon_test_stack/tests/test_test_runner_testbed_contract.py b/fluxon_test_stack/tests/test_test_runner_testbed_contract.py index 6910acd..34bf640 100644 --- a/fluxon_test_stack/tests/test_test_runner_testbed_contract.py +++ b/fluxon_test_stack/tests/test_test_runner_testbed_contract.py @@ -38,6 +38,136 @@ def _load_module(): class TestTestRunnerTestbedContract(unittest.TestCase): + def test_ci_runtime_python_executable_requires_python310_on_path(self) -> None: + with mock.patch.object(_RUNNER.shutil, "which", return_value=None): + with self.assertRaisesRegex(ValueError, "requires python3.10 on PATH"): + _RUNNER._ci_runtime_python_executable() + + def test_create_ci_runtime_venv_uses_python310(self) -> None: + with tempfile.TemporaryDirectory() as td: + run_dir = Path(td) + venv_dir = (run_dir / "venv").resolve() + expected_venv_python = (venv_dir / "bin" / "python3").resolve() + + def _fake_create_venv(argv: list[str], *, cwd: str) -> None: + self.assertEqual( + argv, + ["/usr/bin/python3.10", "-m", "venv", str(venv_dir)], + ) + self.assertEqual(cwd, str(run_dir)) + expected_venv_python.parent.mkdir(parents=True, exist_ok=True) + expected_venv_python.write_text("#!/bin/sh\n", encoding="utf-8") + + with mock.patch.object(_RUNNER.shutil, "which", return_value="/usr/bin/python3.10"): + with mock.patch.object(_RUNNER, "_run_subprocess", side_effect=_fake_create_venv) as run_subprocess_mock: + with mock.patch.object(_RUNNER, "_assert_ci_runtime_python_abi") as assert_python_abi: + venv_python = _RUNNER._create_ci_runtime_venv(run_dir=run_dir) + + self.assertEqual(venv_python, expected_venv_python) + run_subprocess_mock.assert_called_once() + assert_python_abi.assert_called_once_with(venv_python=expected_venv_python) + + def test_assert_ci_runtime_python_abi_accepts_python310_venv(self) -> None: + with mock.patch.object(_RUNNER.subprocess, "check_output", return_value="cpython3.10\n") as check_output_mock: + _RUNNER._assert_ci_runtime_python_abi(venv_python=Path("/tmp/venv/bin/python3")) + + check_output_mock.assert_called_once() + + def test_assert_ci_runtime_python_abi_rejects_non_python310_venv(self) -> None: + with mock.patch.object(_RUNNER.subprocess, "check_output", return_value="cpython3.11\n"): + with self.assertRaisesRegex(ValueError, "must match the prepared offline wheelhouse"): + _RUNNER._assert_ci_runtime_python_abi(venv_python=Path("/tmp/venv/bin/python3")) + + def test_ci_runtime_tracked_apply_entries_groups_shared_apply_id(self) -> None: + tracking = _RUNNER._CaseRuntimeTracking( + ci_attempted_instance_ids=["master", "owner_0", "ci_runner"], + ci_apply_ids={ + "master": "apply-cluster", + "owner_0": "apply-cluster", + "ci_runner": "apply-runner", + }, + ) + + entries = _RUNNER._ci_runtime_tracked_apply_entries(tracking) + + self.assertEqual( + entries, + [ + {"apply_id": "apply-cluster", "instance_ids": ["master", "owner_0"]}, + {"apply_id": "apply-runner", "instance_ids": ["ci_runner"]}, + ], + ) + + def test_finalize_ci_case_runtime_deletes_each_apply_id_once(self) -> None: + with tempfile.TemporaryDirectory() as td: + run_dir = Path(td) + tracking = _RUNNER._CaseRuntimeTracking( + ci_attempted_instance_ids=["master", "owner_0", "ci_runner"], + ci_apply_ids={ + "master": "apply-cluster", + "owner_0": "apply-cluster", + "ci_runner": "apply-runner", + }, + ) + resolved_case = { + "case": { + "run_mode": _RUNNER.RUN_MODE_FULL_ONCE, + "case_id": "ci_top_attention_mq_core__n1_kvowner_dram_20gib__fluxon_tcp_thread", + } + } + + with mock.patch.object(_RUNNER, "_delete_apply_id") as delete_apply: + with mock.patch.object(_RUNNER, "_ci_cleanup_runtime") as cleanup_runtime: + _RUNNER._finalize_ci_case_runtime( + resolved_case, + run_dir=run_dir, + runtime_tracking=tracking, + outcome=_RUNNER.RUN_OUTCOME_SUCCESS, + ) + + self.assertEqual( + [call.kwargs["apply_id"] for call in delete_apply.call_args_list], + ["apply-runner", "apply-cluster"], + ) + cleanup_runtime.assert_called_once_with(resolved_case, timeout_s=120) + + def test_finalize_ci_case_runtime_preserves_structured_instance_ids(self) -> None: + with tempfile.TemporaryDirectory() as td: + run_dir = Path(td) + tracking = _RUNNER._CaseRuntimeTracking( + ci_attempted_instance_ids=["master", "owner_0", "ci_runner"], + ci_apply_ids={ + "master": "apply-cluster", + "owner_0": "apply-cluster", + "ci_runner": "apply-runner", + }, + ) + resolved_case = { + "case": { + "run_mode": _RUNNER.RUN_MODE_DEBUG_ONE_BY_ONE, + "case_id": "ci_top_attention_mq_core__n1_kvowner_dram_20gib__fluxon_tcp_thread", + } + } + + _RUNNER._finalize_ci_case_runtime( + resolved_case, + run_dir=run_dir, + runtime_tracking=tracking, + outcome=_RUNNER.RUN_OUTCOME_FAILED, + ) + + payload = yaml.safe_load((run_dir / _RUNNER.CI_PRESERVED_APPLY_IDS_FILENAME).read_text(encoding="utf-8")) + self.assertEqual( + payload, + { + "schema_version": _RUNNER.CI_PRESERVED_APPLY_IDS_SCHEMA_VERSION, + "apply_ids": [ + {"instance_ids": ["master", "owner_0"], "apply_id": "apply-cluster"}, + {"instance_ids": ["ci_runner"], "apply_id": "apply-runner"}, + ], + }, + ) + def test_write_ci_scene_config_yaml_emits_structured_scene_config(self) -> None: with tempfile.TemporaryDirectory() as td: run_dir = Path(td) @@ -86,16 +216,6 @@ def test_ci_source_overlay_includes_fluxon_test_stack(self) -> None: def test_top_attention_ci_execution_plan_is_runner_native(self) -> None: suite_cfg = yaml.safe_load((_RUNNER.RUNNER_REPO_ROOT / "fluxon_test_stack" / "ci_test_list.yaml").read_text(encoding="utf-8")) - artifact_sets = suite_cfg.get("artifact_sets") - if isinstance(artifact_sets, dict): - for artifact_set in artifact_sets.values(): - if not isinstance(artifact_set, dict): - continue - release_artifacts = artifact_set.get("release_artifacts") - if isinstance(release_artifacts, dict): - python_wheel = release_artifacts.get("python_wheel") - if isinstance(python_wheel, str) and python_wheel.strip(): - artifact_set["release_artifacts"] = {"wheel": python_wheel} suite = _RUNNER._parse_suite_config(suite_cfg) cases = _RUNNER._expand_cases(suite) case = next(item for item in cases if item.scene_id == "ci_top_attention_bin_kvtest" and item.profile_id == "fluxon_tcp") @@ -104,18 +224,22 @@ def test_top_attention_ci_execution_plan_is_runner_native(self) -> None: self.assertEqual(planned[0].ci_commands[0]["id"], "top_attention_bin_kvtest") self.assertIn("--case-config __RUN_DIR__/configs/ci_scene_config.yaml", planned[0].ci_commands[0]["command"]) + def test_top_attention_mq_core_ci_execution_plan_is_runner_native(self) -> None: + suite_cfg = yaml.safe_load((_RUNNER.RUNNER_REPO_ROOT / "fluxon_test_stack" / "ci_test_list.yaml").read_text(encoding="utf-8")) + suite = _RUNNER._parse_suite_config(suite_cfg) + cases = _RUNNER._expand_cases(suite) + case = next(item for item in cases if item.scene_id == "ci_top_attention_mq_core" and item.profile_id == "fluxon_tcp") + planned = _RUNNER._build_ci_execution_plan(case, suite) + self.assertEqual(len(planned), 1) + self.assertEqual(planned[0].ci_commands[0]["id"], "top_attention_mq_core") + self.assertIn( + "__RUN_DIR__/src/fluxon_test_stack/top_attention_test_index/_mq_core.py", + planned[0].ci_commands[0]["command"], + ) + self.assertIn("--case-config __RUN_DIR__/configs/ci_scene_config.yaml", planned[0].ci_commands[0]["command"]) + def test_doc_page_ci_execution_plan_uses_online_docker_image(self) -> None: suite_cfg = yaml.safe_load((_RUNNER.RUNNER_REPO_ROOT / "fluxon_test_stack" / "ci_test_list.yaml").read_text(encoding="utf-8")) - artifact_sets = suite_cfg.get("artifact_sets") - if isinstance(artifact_sets, dict): - for artifact_set in artifact_sets.values(): - if not isinstance(artifact_set, dict): - continue - release_artifacts = artifact_set.get("release_artifacts") - if isinstance(release_artifacts, dict): - python_wheel = release_artifacts.get("python_wheel") - if isinstance(python_wheel, str) and python_wheel.strip(): - artifact_set["release_artifacts"] = {"wheel": python_wheel} suite = _RUNNER._parse_suite_config(suite_cfg) cases = _RUNNER._expand_cases(suite) case = next(item for item in cases if item.scene_id == "ci_top_attention_doc_page_build" and item.profile_id == "fluxon_tcp") @@ -138,6 +262,21 @@ def test_ci_prepare_run_inputs_rebuilds_release_view_without_reusing_source_test source_root = root / "source_root" source_root.mkdir() (source_root / "README.md").write_text("repo\n", encoding="utf-8") + source_test_cfg = source_root / "fluxon_py" / "tests" / "test_config.yaml" + source_test_cfg.parent.mkdir(parents=True, exist_ok=True) + source_test_cfg.write_text( + "\n".join( + [ + "kv_svc_type: fluxon", + "etcd_address: 127.0.0.1:2379", + "cluster_name: fluxon-example-cluster", + "shared_memory_path: /tmp/fluxon-example-cluster/shm", + "shared_file_path: /tmp/fluxon-example-cluster/share", + "", + ] + ), + encoding="utf-8", + ) release_root = root / "release_root" release_root.mkdir() @@ -152,6 +291,23 @@ def test_ci_prepare_run_inputs_rebuilds_release_view_without_reusing_source_test test_rsc_root = root / "test_rsc_root" test_rsc_root.mkdir() (test_rsc_root / "from_case.txt").write_text("case\n", encoding="utf-8") + (test_rsc_root / "prepare.yaml").write_text( + "\n".join( + [ + "python_runtime:", + " dependency_sets:", + " base:", + " requirements:", + " - pinned: pytest==8.3.5", + " source: wheel", + "", + ] + ), + encoding="utf-8", + ) + wheelhouse_root = test_rsc_root / "python_runtime" / "cpython3.10" / "wheels" + wheelhouse_root.mkdir(parents=True, exist_ok=True) + (wheelhouse_root / "pytest-8.3.5-py3-none-any.whl").write_text("wheel\n", encoding="utf-8") ci_src_archive_path = test_rsc_root / "src_ci.tar.gz" with tarfile.open(ci_src_archive_path, "w:gz") as tf: @@ -168,6 +324,10 @@ def test_ci_prepare_run_inputs_rebuilds_release_view_without_reusing_source_test ) test_rsc_manifest = { "src_ci.tar.gz": _RUNNER._sha256_file(ci_src_archive_path), + "prepare.yaml": _RUNNER._sha256_file(test_rsc_root / "prepare.yaml"), + "python_runtime/cpython3.10/wheels/pytest-8.3.5-py3-none-any.whl": _RUNNER._sha256_file( + wheelhouse_root / "pytest-8.3.5-py3-none-any.whl" + ), } (test_rsc_root / "fluxon_test_rsc.sha256").write_text( "".join(f"{digest} {name}\n" for name, digest in test_rsc_manifest.items()), @@ -180,6 +340,45 @@ def test_ci_prepare_run_inputs_rebuilds_release_view_without_reusing_source_test venv_python = run_dir / "venv" / "bin" / "python3" venv_python.parent.mkdir(parents=True, exist_ok=True) venv_python.write_text("#!/bin/sh\n", encoding="utf-8") + testbed_bundle_root = root / "testbed_bundle" + testbed_bundle_root.mkdir() + start_cfg = testbed_bundle_root / "start_test_bed.runner.yaml" + deployconf_path = testbed_bundle_root / "deployconf.yaml" + start_cfg.write_text( + "\n".join( + [ + "schema_version: 6", + "deployconf_path: ./deployconf.yaml", + "controller_url: http://127.0.0.1:19080/r/ops/fluxon_testbed", + "controller_basic_auth:", + " username: ops_admin", + " password: ops_password", + "", + ] + ), + encoding="utf-8", + ) + deployconf_path.write_text( + "\n".join( + [ + "global_envs:", + " FLUXON_CLUSTER_NAME: fluxon_testbed", + " FLUXON_SHARED_MEM: ${HOSTWORKDIR}/shm1", + " FLUXON_SHARED_MEM2: ${HOSTWORKDIR}/shm2_files", + "cluster_nodes:", + " - hostname: logic-a", + " ip: 127.0.0.1", + " hostworkdir: /tmp/fluxon_testbed/a", + " execution_mode: local", + "service:", + " ops_controller:", + " node_bind:", + " node: [logic-a]", + "", + ] + ), + encoding="utf-8", + ) resolved_case = { "artifact_set": { @@ -191,17 +390,24 @@ def test_ci_prepare_run_inputs_rebuilds_release_view_without_reusing_source_test } } - with mock.patch.object(_RUNNER, "_run_subprocess") as run_subprocess_mock: - _RUNNER._ci_prepare_run_inputs( - resolved_case=resolved_case, - source_root=source_root, - release_root=release_root, - test_rsc_root=test_rsc_root, - src_root=src_root, - venv_python=venv_python, - ci_commands=None, - overlay_live_checkout=False, - ) + env = {**os.environ, _RUNNER.TEST_STACK_START_TEST_BED_CONFIG_ENV: str(start_cfg)} + with mock.patch.dict(os.environ, env, clear=True): + with mock.patch.object(_RUNNER, "_assert_ci_runtime_python_abi") as assert_python_abi: + with mock.patch.object(_RUNNER, "_run_subprocess") as run_subprocess_mock: + _RUNNER._ci_prepare_run_inputs( + resolved_case=resolved_case, + source_root=source_root, + release_root=release_root, + test_rsc_root=test_rsc_root, + src_root=src_root, + venv_python=venv_python, + ci_commands=None, + overlay_live_checkout=False, + etcd_address="127.0.0.1:32579", + cluster_name="ci_case_cluster", + shared_memory_path="/tmp/ci_case_cluster/shm", + shared_file_path="/tmp/ci_case_cluster/share", + ) release_view_root = src_root / "fluxon_release" self.assertTrue(release_view_root.is_dir()) @@ -212,7 +418,49 @@ def test_ci_prepare_run_inputs_rebuilds_release_view_without_reusing_source_test self.assertFalse((release_view_root / "from_release.txt").exists()) self.assertTrue((release_view_root / "test_rsc" / "from_case.txt").exists()) self.assertTrue((src_root / "payload.txt").is_file()) - run_subprocess_mock.assert_called_once() + rendered_test_cfg = yaml.safe_load((src_root / "fluxon_py" / "tests" / "test_config.yaml").read_text(encoding="utf-8")) + self.assertEqual( + rendered_test_cfg, + { + "kv_svc_type": "fluxon", + "etcd_address": "127.0.0.1:32579", + "cluster_name": "ci_case_cluster", + "shared_memory_path": "/tmp/ci_case_cluster/shm", + "shared_file_path": "/tmp/ci_case_cluster/share", + }, + ) + assert_python_abi.assert_called_once_with(venv_python=venv_python) + self.assertEqual(run_subprocess_mock.call_count, 2) + first_call = run_subprocess_mock.call_args_list[0] + second_call = run_subprocess_mock.call_args_list[1] + self.assertEqual( + first_call.kwargs["cwd"], + str(src_root), + ) + self.assertEqual( + first_call.args[0], + [ + str(venv_python), + "-m", + "pip", + "install", + "--no-index", + "--find-links", + str(wheelhouse_root), + "pytest==8.3.5", + ], + ) + self.assertEqual( + second_call.args[0], + [ + str(venv_python), + "-m", + "pip", + "install", + "--force-reinstall", + str(release_root / wheel_name), + ], + ) def test_ci_runner_script_sources_prepare_env_when_present(self) -> None: with tempfile.TemporaryDirectory() as td: diff --git a/fluxon_test_stack/tests/test_top_attention_mq_core_contract.py b/fluxon_test_stack/tests/test_top_attention_mq_core_contract.py new file mode 100644 index 0000000..3b0925f --- /dev/null +++ b/fluxon_test_stack/tests/test_top_attention_mq_core_contract.py @@ -0,0 +1,130 @@ +#!/usr/bin/env python3 + +from __future__ import annotations + +import importlib.util +import sys +import tempfile +import unittest +from pathlib import Path +from unittest import mock + +import yaml + + +REPO_ROOT = Path(__file__).resolve().parents[2] +MODULE_PATH = REPO_ROOT / "fluxon_test_stack" / "top_attention_test_index" / "_mq_core.py" + + +def _load_module(): + module_dir = MODULE_PATH.parent + sys.path.insert(0, str(module_dir)) + try: + spec = importlib.util.spec_from_file_location("fluxon_test_stack_top_attention_mq_core_contract", MODULE_PATH) + assert spec is not None and spec.loader is not None + mod = importlib.util.module_from_spec(spec) + sys.modules[spec.name] = mod + spec.loader.exec_module(mod) + return mod + finally: + if sys.path and sys.path[0] == str(module_dir): + sys.path.pop(0) + + +_ENTRY = _load_module() + + +class TestTopAttentionMqCoreContract(unittest.TestCase): + def test_main_accepts_case_config_and_runs_mq_scripts_in_order(self) -> None: + with tempfile.TemporaryDirectory() as td: + run_dir = Path(td) + cfg_dir = run_dir / "configs" + cfg_dir.mkdir(parents=True) + case_cfg = cfg_dir / "ci_scene_config.yaml" + case_cfg.write_text( + yaml.safe_dump( + { + "case": { + "scene_id": "ci_top_attention_mq_core", + "scale_id": "n1_kvowner_dram_20gib", + "profile_id": "fluxon_tcp_thread", + "case_id": "ci_top_attention_mq_core__n1_kvowner_dram_20gib__fluxon_tcp_thread", + }, + "scene_config": {}, + "scene_runtime": { + "etcd": {"ip": "127.0.0.1", "port": 19180}, + }, + }, + sort_keys=False, + ), + encoding="utf-8", + ) + + with mock.patch.object(_ENTRY, "call", side_effect=[0, 0]) as call: + with mock.patch.object( + sys, + "argv", + [str(MODULE_PATH), "--python", "/tmp/venv/bin/python3", "--case-config", str(case_cfg)], + ): + rc = _ENTRY.main() + + self.assertEqual(rc, 0) + self.assertEqual(call.call_count, 2) + self.assertEqual( + call.call_args_list[0].args[0], + [ + "/tmp/venv/bin/python3", + "-u", + str(REPO_ROOT / "fluxon_py/tests/test_mq/test_capacity_and_auto_clean.py"), + ], + ) + self.assertEqual( + call.call_args_list[1].args[0], + [ + "/tmp/venv/bin/python3", + "-u", + str(REPO_ROOT / "fluxon_py/tests/test_mq/test_payload_lease_error.py"), + ], + ) + + def test_main_without_case_config_runs_scripts_without_extra_args(self) -> None: + with mock.patch.object(_ENTRY, "call", side_effect=[0, 0]) as call: + with mock.patch.object(sys, "argv", [str(MODULE_PATH), "--python", "/tmp/venv/bin/python3"]): + rc = _ENTRY.main() + + self.assertEqual( + call.call_args_list[0].args[0], + [ + "/tmp/venv/bin/python3", + "-u", + str(REPO_ROOT / "fluxon_py/tests/test_mq/test_capacity_and_auto_clean.py"), + ], + ) + self.assertEqual( + call.call_args_list[1].args[0], + [ + "/tmp/venv/bin/python3", + "-u", + str(REPO_ROOT / "fluxon_py/tests/test_mq/test_payload_lease_error.py"), + ], + ) + self.assertEqual(rc, 0) + + def test_main_returns_first_non_zero_script_exit_code(self) -> None: + with mock.patch.object(_ENTRY, "call", side_effect=[7]) as call: + with mock.patch.object(sys, "argv", [str(MODULE_PATH), "--python", "/tmp/venv/bin/python3"]): + rc = _ENTRY.main() + + self.assertEqual(rc, 7) + self.assertEqual(call.call_count, 1) + + def test_main_rejects_pytest_style_passthrough_flags(self) -> None: + with mock.patch.object(sys, "argv", [str(MODULE_PATH), "--python", "/tmp/venv/bin/python3", "-k", "payload"]): + with self.assertRaises(SystemExit) as cm: + _ENTRY.main() + + self.assertEqual(cm.exception.code, 2) + + +if __name__ == "__main__": + raise SystemExit(unittest.main()) diff --git a/fluxon_test_stack/top_attention_test_index/_mq_core.py b/fluxon_test_stack/top_attention_test_index/_mq_core.py index 4da1b14..12a61cc 100755 --- a/fluxon_test_stack/top_attention_test_index/_mq_core.py +++ b/fluxon_test_stack/top_attention_test_index/_mq_core.py @@ -1,20 +1,43 @@ #!/usr/bin/env python3 from __future__ import annotations -from _common import run_pytest +import argparse +import os +import sys +from pathlib import Path + +from _common import call, load_case_config_payload TEST_REQUIREMENTS = ["etcd", "fluxon-pyo3", "kv-cluster", "ops", "submodules"] +SCENE_ID = "ci_top_attention_mq_core" +TEST_PATHS = [ + "fluxon_py/tests/test_mq/test_capacity_and_auto_clean.py", + "fluxon_py/tests/test_mq/test_payload_lease_error.py", +] def main() -> int: - return run_pytest( - "Flat index entry for non-Ctrl-C MQ tests.", - [ - "fluxon_py/tests/test_mq/test_capacity_and_auto_clean.py", - "fluxon_py/tests/test_mq/test_payload_lease_error.py", - ], + parser = argparse.ArgumentParser( + description="Flat index entry for non-Ctrl-C MQ script tests." + ) + parser.add_argument( + "--python", + default=os.environ.get("PYTHON", sys.executable), + help="Python executable used for the delegated command.", + ) + parser.add_argument( + "--case-config", + help="Canonical CI case config YAML emitted by test_runner.", ) + args = parser.parse_args() + if args.case_config: + load_case_config_payload(Path(args.case_config).resolve(), expected_scene_id=SCENE_ID) + for test_path in TEST_PATHS: + rc = call([args.python, "-u", str((Path(__file__).resolve().parents[2] / test_path))]) + if rc != 0: + return rc + return 0 if __name__ == "__main__": diff --git a/setup_and_pack/tests/test_doc_site_builder_image_workflow.py b/setup_and_pack/tests/test_doc_site_builder_image_workflow.py index 9a40059..e91244c 100644 --- a/setup_and_pack/tests/test_doc_site_builder_image_workflow.py +++ b/setup_and_pack/tests/test_doc_site_builder_image_workflow.py @@ -37,13 +37,17 @@ def test_workflow_builds_exports_and_smokes_image_without_testbed(self) -> None: self.assertNotIn("ci_2_virt_node.py", workflow_text) self.assertNotIn("fluxon_test_stack/", workflow_text) - def test_main_testbed_workflow_does_not_select_doc_page_scene(self) -> None: + def test_main_testbed_workflow_keeps_suite_generation_in_workflow(self) -> None: workflow_text = ALL_TEST_WORKFLOW_PATH.read_text(encoding="utf-8") yaml.load(workflow_text, Loader=yaml.BaseLoader) + self.assertIn("fluxon_test_stack/ci_2_virt_node.py", workflow_text) + self.assertIn("Write ci_2_virt_node suite", workflow_text) self.assertIn("ci_top_attention_bin_kvtest", workflow_text) self.assertIn("ci_top_attention_doc_page_build", workflow_text) + self.assertIn("ci_top_attention_mq_core", workflow_text) self.assertIn("doc_site_base_url", workflow_text) + self.assertIn("rather_no_git_submodule.py", workflow_text) def test_docs_pages_uses_container_entrypoint(self) -> None: workflow_text = DOCS_PAGES_WORKFLOW_PATH.read_text(encoding="utf-8") diff --git a/setup_and_pack/utils/repo_config_utils.py b/setup_and_pack/utils/repo_config_utils.py index 6f240d1..46f4686 100644 --- a/setup_and_pack/utils/repo_config_utils.py +++ b/setup_and_pack/utils/repo_config_utils.py @@ -32,8 +32,11 @@ "load_tsdb_remote_write_url", "_load_yaml_mapping", "load_test_config_mapping", - "load_test_deployconf_path", "load_test_kv_svc_type_from_test_config", + "load_test_etcd_address_from_test_config", + "load_test_fluxon_cluster_name_from_test_config", + "load_test_fluxon_shared_memory_path_from_test_config", + "load_test_fluxon_shared_file_path_from_test_config", "load_deployconf_mapping", "load_deployconf_resolved_global_envs", "load_deployconf_etcd_address", @@ -338,22 +341,6 @@ def load_test_config_mapping(*, config_path: Optional[Path] = None) -> Dict[str, return _load_yaml_mapping(Path(config_path), label="test_config.yaml") -def load_test_deployconf_path(*, config_path: Optional[Path] = None) -> Path: - """Load the shared deployconf path referenced by fluxon_py test_config.yaml.""" - test_cfg_path = Path(config_path) if config_path is not None else None - test_cfg = load_test_config_mapping(config_path=test_cfg_path) - raw = test_cfg.get("deployconf_path") - if not isinstance(raw, str) or not raw.strip(): - raise ValueError("test_config.yaml must define non-empty deployconf_path") - deployconf_path = Path(raw.strip()) - base_dir = test_cfg_path.parent if test_cfg_path is not None else (Path(__file__).resolve().parents[2] / "fluxon_py" / "tests") - if not deployconf_path.is_absolute(): - deployconf_path = (base_dir / deployconf_path).resolve() - if not deployconf_path.exists(): - raise FileNotFoundError(f"deployconf_path from test_config.yaml does not exist: {deployconf_path}") - return deployconf_path - - def load_test_kv_svc_type_from_test_config(*, config_path: Optional[Path] = None) -> str: """Load kv_svc_type from test_config.yaml and validate it against KvClientType.""" test_cfg = load_test_config_mapping(config_path=config_path) @@ -367,3 +354,40 @@ def load_test_kv_svc_type_from_test_config(*, config_path: Optional[Path] = None if value not in allowed: raise ValueError(f"test_config.yaml kv_svc_type must be one of {sorted(allowed)}") return value + + +def load_test_etcd_address_from_test_config(*, config_path: Optional[Path] = None) -> str: + """Load etcd address from test_config.yaml as the single test authority.""" + test_cfg = load_test_config_mapping(config_path=config_path) + raw = test_cfg.get("etcd_address") + if not isinstance(raw, str) or not raw.strip(): + raise ValueError("test_config.yaml must define non-empty etcd_address") + host, port = _verify_host_port(raw.strip(), field="test_config.yaml.etcd_address") + return f"{host}:{port}" + + +def load_test_fluxon_cluster_name_from_test_config(*, config_path: Optional[Path] = None) -> str: + """Load Fluxon cluster name from test_config.yaml as the single test authority.""" + test_cfg = load_test_config_mapping(config_path=config_path) + raw = test_cfg.get("cluster_name") + if not isinstance(raw, str) or not raw.strip(): + raise ValueError("test_config.yaml must define non-empty cluster_name") + return raw.strip() + + +def load_test_fluxon_shared_memory_path_from_test_config(*, config_path: Optional[Path] = None) -> str: + """Load Fluxon shared-memory root from test_config.yaml as the single test authority.""" + test_cfg = load_test_config_mapping(config_path=config_path) + raw = test_cfg.get("shared_memory_path") + if not isinstance(raw, str) or not raw.strip(): + raise ValueError("test_config.yaml must define non-empty shared_memory_path") + return raw.strip() + + +def load_test_fluxon_shared_file_path_from_test_config(*, config_path: Optional[Path] = None) -> str: + """Load Fluxon shared-file root from test_config.yaml as the single test authority.""" + test_cfg = load_test_config_mapping(config_path=config_path) + raw = test_cfg.get("shared_file_path") + if not isinstance(raw, str) or not raw.strip(): + raise ValueError("test_config.yaml must define non-empty shared_file_path") + return raw.strip()