From 0707de26f8cbd4cb36a71098b841a235827c660b Mon Sep 17 00:00:00 2001 From: ActivePeter <1020401660@qq.com> Date: Wed, 24 Jun 2026 16:44:26 +0800 Subject: [PATCH 1/5] test --- ...13\350\257\225\346\265\201\347\250\213.md" | 5 + fluxon_test_stack/ci_2_virt_node.py | 8 +- fluxon_test_stack/ci_test_list.yaml | 44 ++++++ fluxon_test_stack/test_runner.py | 38 +++++ .../tests/test_ci_2_virt_node_contract.py | 20 +++ .../tests/test_runner_contract.py | 58 ++++++++ .../test_test_runner_testbed_contract.py | 42 ++++++ ...st_top_attention_cargo_fs_core_contract.py | 59 ++++++++ ...st_top_attention_cargo_kv_unit_contract.py | 135 ++++++++++++++++++ .../test_top_attention_cargo_util_contract.py | 98 +++++++++++++ .../top_attention_test_index/README.md | 6 +- .../top_attention_test_index/_bin_kvtest.py | 38 +---- .../_cargo_fs_core.py | 6 + .../_cargo_kv_unit.py | 29 +++- .../top_attention_test_index/_cargo_util.py | 20 ++- .../top_attention_test_index/_common.py | 35 +++++ 16 files changed, 594 insertions(+), 47 deletions(-) create mode 100644 fluxon_test_stack/tests/test_top_attention_cargo_fs_core_contract.py create mode 100644 fluxon_test_stack/tests/test_top_attention_cargo_kv_unit_contract.py create mode 100644 fluxon_test_stack/tests/test_top_attention_cargo_util_contract.py diff --git "a/fluxon_doc_cn/design/teststack_1_\345\275\223\345\211\215\346\236\266\346\236\204\344\270\216CI\346\265\213\350\257\225\346\265\201\347\250\213.md" "b/fluxon_doc_cn/design/teststack_1_\345\275\223\345\211\215\346\236\266\346\236\204\344\270\216CI\346\265\213\350\257\225\346\265\201\347\250\213.md" index 823a4be..0432a73 100644 --- "a/fluxon_doc_cn/design/teststack_1_\345\275\223\345\211\215\346\236\266\346\236\204\344\270\216CI\346\265\213\350\257\225\346\265\201\347\250\213.md" +++ "b/fluxon_doc_cn/design/teststack_1_\345\275\223\345\211\215\346\236\266\346\236\204\344\270\216CI\346\265\213\350\257\225\346\265\201\347\250\213.md" @@ -661,12 +661,17 @@ GitHub Actions 主窗口中的许多日志并非本地直接打印,而是由 ` - `test_runner.py` 会根据 `scene_id` 做 runner-native dispatch,把 case 转发到: - `__RUN_DIR__/venv/bin/python3 -u __RUN_DIR__/src/fluxon_test_stack/top_attention_test_index/_bin_kvtest.py --case-config __RUN_DIR__/configs/ci_scene_config.yaml` - `__RUN_DIR__/venv/bin/python3 -u __RUN_DIR__/src/fluxon_test_stack/top_attention_test_index/_doc_page_build.py --case-config __RUN_DIR__/configs/ci_scene_config.yaml` + - `__RUN_DIR__/venv/bin/python3 -u __RUN_DIR__/src/fluxon_test_stack/top_attention_test_index/_cargo_fs_core.py` + - `__RUN_DIR__/venv/bin/python3 -u __RUN_DIR__/src/fluxon_test_stack/top_attention_test_index/_cargo_util.py --case-config __RUN_DIR__/configs/ci_scene_config.yaml` + - `__RUN_DIR__/venv/bin/python3 -u __RUN_DIR__/src/fluxon_test_stack/top_attention_test_index/_cargo_kv_unit.py --case-config __RUN_DIR__/configs/ci_scene_config.yaml` 这样做的稳定语义是: - scene 粒度直接对齐 top-attention index 条目,不再并存第二层 `ci_rust` / `ci_doc_page` 划分; - 实际 CI 路径仍由单次 `ci_2_virt_node.py` 调用统一拥有,但它只重写部署目标与 public profile,不再改写 workload 运行语义; - GitHub Actions 里定义的 workload 配置会直接写入 suite profile 的 `runtime.ci.scene_configs`,随后由 `test_runner.py` 为每个 case 落一份 `configs/ci_scene_config.yaml`,再交给 `_bin_kvtest.py` / `_doc_page_build.py` 消费; +- 纯 crate 级 direct-cargo wrapper 可以保持最薄脚本入口,例如 `_cargo_fs_core.py`; +- 需要 runtime endpoint 或 feature 选择的 wrapper,则统一消费 `scene_config` / `scene_runtime`,例如 `_bin_kvtest.py`、`_cargo_util.py`、`_cargo_kv_unit.py`; - `_bin_kvtest.py` 继续保持 thin wrapper,只负责把参数转发到 `cargo run --bin kv_test`,并补齐 active venv 的 native runtime lib 搜索路径。 因此,GitHub Actions 现在覆盖的是“由单一 `ci_2_virt_node.py` 入口启动,并通过 top-attention CI scene 执行 workload”这条真实 CI 路径,而不是在 suite 里再并存一层旧 scene。 diff --git a/fluxon_test_stack/ci_2_virt_node.py b/fluxon_test_stack/ci_2_virt_node.py index 28e9b82..4a74f28 100644 --- a/fluxon_test_stack/ci_2_virt_node.py +++ b/fluxon_test_stack/ci_2_virt_node.py @@ -415,11 +415,13 @@ def _rewrite_suite_for_local_dual_nodes( if scene_configs is not None: if not isinstance(scene_configs, dict): raise ValueError("generated public profile runtime.ci.scene_configs must be a mapping") - kv_scene_config = scene_configs.get("ci_top_attention_bin_kvtest") - if kv_scene_config is not None: + for scene_id in ("ci_top_attention_bin_kvtest", "ci_top_attention_cargo_kv_unit"): + kv_scene_config = scene_configs.get(scene_id) + if kv_scene_config is None: + continue if not isinstance(kv_scene_config, dict): raise ValueError( - "generated public profile runtime.ci.scene_configs['ci_top_attention_bin_kvtest'] must be a mapping" + f"generated public profile runtime.ci.scene_configs[{scene_id!r}] must be a mapping" ) # The generated public profile is fixed to the tcp-thread transport branch. kv_scene_config["kv_transport_feature"] = PUBLIC_TRANSPORT_FEATURE diff --git a/fluxon_test_stack/ci_test_list.yaml b/fluxon_test_stack/ci_test_list.yaml index 6a3c56b..a023367 100644 --- a/fluxon_test_stack/ci_test_list.yaml +++ b/fluxon_test_stack/ci_test_list.yaml @@ -29,6 +29,30 @@ scenes: scales: [n1_kvowner_dram_20gib] profiles: [fluxon_tcp] + ci_top_attention_cargo_fs_core: + ci: + subject: rust + runtime_contract: rust_self_managed + select: + scales: [n1_kvowner_dram_3gib] + profiles: [fluxon_tcp] + + ci_top_attention_cargo_util: + ci: + subject: rust + runtime_contract: rust_self_managed + select: + scales: [n1_kvowner_dram_20gib] + profiles: [fluxon_tcp] + + ci_top_attention_cargo_kv_unit: + ci: + subject: rust + runtime_contract: rust_self_managed + select: + scales: [n1_kvowner_dram_20gib] + profiles: [fluxon_tcp] + ci_top_attention_mq_core: ci: subject: mq @@ -318,7 +342,12 @@ profiles: ci_top_attention_doc_page_build: doc_site_base_url: example.com ci_top_attention_bin_kvtest: + kv_transport_feature: tcp_thread_transport kv_test_rounds: all + ci_top_attention_cargo_fs_core: {} + ci_top_attention_cargo_util: {} + ci_top_attention_cargo_kv_unit: + kv_transport_feature: tcp_thread_transport ci_top_attention_mq_core: {} runtime_contracts: cluster_kv_owner: &cluster_kv_owner_runtime @@ -464,7 +493,12 @@ profiles: ci_top_attention_doc_page_build: doc_site_base_url: example.com ci_top_attention_bin_kvtest: + kv_transport_feature: tcp_thread_transport kv_test_rounds: all + ci_top_attention_cargo_fs_core: {} + ci_top_attention_cargo_util: {} + ci_top_attention_cargo_kv_unit: + kv_transport_feature: tcp_thread_transport ci_top_attention_mq_core: {} test_stack: <<: *common_test_stack_runtime @@ -477,7 +511,12 @@ profiles: ci_top_attention_doc_page_build: doc_site_base_url: example.com ci_top_attention_bin_kvtest: + kv_transport_feature: tcp_thread_transport kv_test_rounds: all + ci_top_attention_cargo_fs_core: {} + ci_top_attention_cargo_util: {} + ci_top_attention_cargo_kv_unit: + kv_transport_feature: tcp_thread_transport ci_top_attention_mq_core: {} test_stack: <<: *common_test_stack_runtime @@ -490,7 +529,12 @@ profiles: ci_top_attention_doc_page_build: doc_site_base_url: example.com ci_top_attention_bin_kvtest: + kv_transport_feature: tcp_thread_transport kv_test_rounds: all + ci_top_attention_cargo_fs_core: {} + ci_top_attention_cargo_util: {} + ci_top_attention_cargo_kv_unit: + kv_transport_feature: tcp_thread_transport ci_top_attention_mq_core: {} test_stack: <<: *common_test_stack_runtime diff --git a/fluxon_test_stack/test_runner.py b/fluxon_test_stack/test_runner.py index d8cd1c9..ca65e15 100644 --- a/fluxon_test_stack/test_runner.py +++ b/fluxon_test_stack/test_runner.py @@ -401,6 +401,9 @@ def _runner_native_ci_scene_ids() -> Tuple[str, ...]: return ( "ci_top_attention_doc_page_build", "ci_top_attention_bin_kvtest", + "ci_top_attention_cargo_fs_core", + "ci_top_attention_cargo_util", + "ci_top_attention_cargo_kv_unit", "ci_top_attention_mq_core", ) @@ -7027,6 +7030,41 @@ def _runner_native_ci_commands_for_case(case: _ResolvedCase, *, ctx: str) -> Lis "timeout_seconds": 21600, } ] + if scene_id == "ci_top_attention_cargo_fs_core": + return [ + { + "id": "top_attention_cargo_fs_core", + "command": ( + "__RUN_DIR__/venv/bin/python3 -u " + "__RUN_DIR__/src/fluxon_test_stack/top_attention_test_index/_cargo_fs_core.py" + ), + "timeout_seconds": 21600, + } + ] + if scene_id == "ci_top_attention_cargo_util": + return [ + { + "id": "top_attention_cargo_util", + "command": ( + "__RUN_DIR__/venv/bin/python3 -u " + "__RUN_DIR__/src/fluxon_test_stack/top_attention_test_index/_cargo_util.py " + "--case-config __RUN_DIR__/configs/ci_scene_config.yaml" + ), + "timeout_seconds": 21600, + } + ] + if scene_id == "ci_top_attention_cargo_kv_unit": + return [ + { + "id": "top_attention_cargo_kv_unit", + "command": ( + "__RUN_DIR__/venv/bin/python3 -u " + "__RUN_DIR__/src/fluxon_test_stack/top_attention_test_index/_cargo_kv_unit.py " + "--case-config __RUN_DIR__/configs/ci_scene_config.yaml" + ), + "timeout_seconds": 21600, + } + ] if scene_id == "ci_top_attention_mq_core": return [ { diff --git a/fluxon_test_stack/tests/test_ci_2_virt_node_contract.py b/fluxon_test_stack/tests/test_ci_2_virt_node_contract.py index 4392be6..9becc4c 100644 --- a/fluxon_test_stack/tests/test_ci_2_virt_node_contract.py +++ b/fluxon_test_stack/tests/test_ci_2_virt_node_contract.py @@ -28,6 +28,7 @@ def _load_module(): class TestCi2VirtNodeContract(unittest.TestCase): _KVTEST_SCENE_ID = "ci_top_attention_bin_kvtest" + _CARGO_KV_UNIT_SCENE_ID = "ci_top_attention_cargo_kv_unit" _DOC_SCENE_ID = "ci_top_attention_doc_page_build" _MQ_SCENE_ID = "ci_top_attention_mq_core" @@ -169,6 +170,25 @@ def test_generated_suite_preserves_source_scene_configs(self) -> None: "p2p_only", ) + def test_generated_suite_injects_public_transport_feature_for_cargo_kv_unit(self) -> None: + suite_cfg = _ENTRY._load_yaml_mapping(_ENTRY.DEFAULT_SUITE_PATH, ctx="suite") + generated = _ENTRY._rewrite_suite_for_local_dual_nodes( + suite_cfg=suite_cfg, + scene_ids=[self._CARGO_KV_UNIT_SCENE_ID], + primary_node_name="local-node-a", + secondary_node_name="local-node-b", + host_ip="10.1.1.119", + wheel_name="fluxon-0.2.1-cp38-abi3-manylinux_2_28_x86_64.whl", + controller_port=19080, + ) + + self.assertEqual( + generated["profiles"]["fluxon_tcp_thread"]["runtime"]["ci"]["scene_configs"][self._CARGO_KV_UNIT_SCENE_ID][ + "kv_transport_feature" + ], + "tcp_thread_transport", + ) + def test_generated_suite_supports_doc_page_ci_scene(self) -> None: suite_cfg = _ENTRY._load_yaml_mapping(_ENTRY.DEFAULT_SUITE_PATH, ctx="suite") generated = _ENTRY._rewrite_suite_for_local_dual_nodes( diff --git a/fluxon_test_stack/tests/test_runner_contract.py b/fluxon_test_stack/tests/test_runner_contract.py index 7c8fddd..1c3ff12 100644 --- a/fluxon_test_stack/tests/test_runner_contract.py +++ b/fluxon_test_stack/tests/test_runner_contract.py @@ -59,6 +59,10 @@ def _build_checks(selected_test_id: Optional[str]) -> List[Tuple[str, Callable[[ "ci_top_attention_doc_page_build_uses_online_docker_image", test_ci_top_attention_doc_page_build_uses_online_docker_image, ), + ( + "ci_top_attention_cargo_kv_unit_uses_rust_self_managed_runtime", + test_ci_top_attention_cargo_kv_unit_uses_rust_self_managed_runtime, + ), ( "ci_top_attention_mq_core_uses_cluster_kv_owner_runtime", test_ci_top_attention_mq_core_uses_cluster_kv_owner_runtime, @@ -291,5 +295,59 @@ def test_ci_top_attention_mq_core_uses_cluster_kv_owner_runtime() -> None: print("PASS: test_ci_top_attention_mq_core_uses_cluster_kv_owner_runtime") +def test_ci_top_attention_cargo_kv_unit_uses_rust_self_managed_runtime() -> None: + repo_root = Path(__file__).resolve().parents[2] + suite_cfg_path = repo_root / "fluxon_test_stack" / "ci_test_list.yaml" + suite_cfg = yaml.safe_load(suite_cfg_path.read_text(encoding="utf-8")) + if not isinstance(suite_cfg, dict): + print("FAIL: test_ci_top_attention_cargo_kv_unit_uses_rust_self_managed_runtime - suite config is not a mapping") + return + + suite_for_contract = copy.deepcopy(suite_cfg) + suite = _TEST_RUNNER._parse_suite_config(suite_for_contract) + cases = _TEST_RUNNER._expand_cases(suite) + case = next( + ( + item + for item in cases + if item.scene_id == "ci_top_attention_cargo_kv_unit" + and item.profile_id == "fluxon_tcp" + ), + None, + ) + if case is None: + print("FAIL: test_ci_top_attention_cargo_kv_unit_uses_rust_self_managed_runtime - missing cargo kv unit case") + return + planned = _TEST_RUNNER._build_ci_execution_plan(case, suite) + if len(planned) != 1: + print( + "FAIL: test_ci_top_attention_cargo_kv_unit_uses_rust_self_managed_runtime - " + f"expected one planned case, got {len(planned)}" + ) + return + commands = planned[0].ci_commands + if len(commands) != 1: + print( + "FAIL: test_ci_top_attention_cargo_kv_unit_uses_rust_self_managed_runtime - " + f"expected one command, got {len(commands)}" + ) + return + command = commands[0] + if command.get("id") != "top_attention_cargo_kv_unit": + print( + "FAIL: test_ci_top_attention_cargo_kv_unit_uses_rust_self_managed_runtime - " + f"unexpected command id: {command.get('id')!r}" + ) + return + command_text = command.get("command") + if not isinstance(command_text, str) or "_cargo_kv_unit.py --case-config __RUN_DIR__/configs/ci_scene_config.yaml" not in command_text: + print( + "FAIL: test_ci_top_attention_cargo_kv_unit_uses_rust_self_managed_runtime - " + f"unexpected command: {command_text!r}" + ) + return + print("PASS: test_ci_top_attention_cargo_kv_unit_uses_rust_self_managed_runtime") + + if __name__ == "__main__": raise SystemExit(main()) diff --git a/fluxon_test_stack/tests/test_test_runner_testbed_contract.py b/fluxon_test_stack/tests/test_test_runner_testbed_contract.py index 34bf640..b8c8f3c 100644 --- a/fluxon_test_stack/tests/test_test_runner_testbed_contract.py +++ b/fluxon_test_stack/tests/test_test_runner_testbed_contract.py @@ -224,6 +224,48 @@ def test_top_attention_ci_execution_plan_is_runner_native(self) -> None: self.assertEqual(planned[0].ci_commands[0]["id"], "top_attention_bin_kvtest") self.assertIn("--case-config __RUN_DIR__/configs/ci_scene_config.yaml", planned[0].ci_commands[0]["command"]) + def test_top_attention_cargo_fs_core_ci_execution_plan_is_runner_native(self) -> None: + suite_cfg = yaml.safe_load((_RUNNER.RUNNER_REPO_ROOT / "fluxon_test_stack" / "ci_test_list.yaml").read_text(encoding="utf-8")) + suite = _RUNNER._parse_suite_config(suite_cfg) + cases = _RUNNER._expand_cases(suite) + case = next(item for item in cases if item.scene_id == "ci_top_attention_cargo_fs_core" and item.profile_id == "fluxon_tcp") + planned = _RUNNER._build_ci_execution_plan(case, suite) + self.assertEqual(len(planned), 1) + self.assertEqual(planned[0].ci_commands[0]["id"], "top_attention_cargo_fs_core") + self.assertIn( + "__RUN_DIR__/src/fluxon_test_stack/top_attention_test_index/_cargo_fs_core.py", + planned[0].ci_commands[0]["command"], + ) + self.assertNotIn("--case-config", planned[0].ci_commands[0]["command"]) + + def test_top_attention_cargo_util_ci_execution_plan_is_runner_native(self) -> None: + suite_cfg = yaml.safe_load((_RUNNER.RUNNER_REPO_ROOT / "fluxon_test_stack" / "ci_test_list.yaml").read_text(encoding="utf-8")) + suite = _RUNNER._parse_suite_config(suite_cfg) + cases = _RUNNER._expand_cases(suite) + case = next(item for item in cases if item.scene_id == "ci_top_attention_cargo_util" and item.profile_id == "fluxon_tcp") + planned = _RUNNER._build_ci_execution_plan(case, suite) + self.assertEqual(len(planned), 1) + self.assertEqual(planned[0].ci_commands[0]["id"], "top_attention_cargo_util") + self.assertIn( + "__RUN_DIR__/src/fluxon_test_stack/top_attention_test_index/_cargo_util.py", + planned[0].ci_commands[0]["command"], + ) + self.assertIn("--case-config __RUN_DIR__/configs/ci_scene_config.yaml", planned[0].ci_commands[0]["command"]) + + def test_top_attention_cargo_kv_unit_ci_execution_plan_is_runner_native(self) -> None: + suite_cfg = yaml.safe_load((_RUNNER.RUNNER_REPO_ROOT / "fluxon_test_stack" / "ci_test_list.yaml").read_text(encoding="utf-8")) + suite = _RUNNER._parse_suite_config(suite_cfg) + cases = _RUNNER._expand_cases(suite) + case = next(item for item in cases if item.scene_id == "ci_top_attention_cargo_kv_unit" and item.profile_id == "fluxon_tcp") + planned = _RUNNER._build_ci_execution_plan(case, suite) + self.assertEqual(len(planned), 1) + self.assertEqual(planned[0].ci_commands[0]["id"], "top_attention_cargo_kv_unit") + self.assertIn( + "__RUN_DIR__/src/fluxon_test_stack/top_attention_test_index/_cargo_kv_unit.py", + planned[0].ci_commands[0]["command"], + ) + self.assertIn("--case-config __RUN_DIR__/configs/ci_scene_config.yaml", planned[0].ci_commands[0]["command"]) + def test_top_attention_mq_core_ci_execution_plan_is_runner_native(self) -> None: suite_cfg = yaml.safe_load((_RUNNER.RUNNER_REPO_ROOT / "fluxon_test_stack" / "ci_test_list.yaml").read_text(encoding="utf-8")) suite = _RUNNER._parse_suite_config(suite_cfg) diff --git a/fluxon_test_stack/tests/test_top_attention_cargo_fs_core_contract.py b/fluxon_test_stack/tests/test_top_attention_cargo_fs_core_contract.py new file mode 100644 index 0000000..f1cddbe --- /dev/null +++ b/fluxon_test_stack/tests/test_top_attention_cargo_fs_core_contract.py @@ -0,0 +1,59 @@ +#!/usr/bin/env python3 + +from __future__ import annotations + +import importlib.util +import sys +import unittest +from pathlib import Path +from unittest import mock + + +REPO_ROOT = Path(__file__).resolve().parents[2] +MODULE_PATH = REPO_ROOT / "fluxon_test_stack" / "top_attention_test_index" / "_cargo_fs_core.py" + + +def _load_module(): + module_dir = MODULE_PATH.parent + sys.path.insert(0, str(module_dir)) + try: + spec = importlib.util.spec_from_file_location("fluxon_test_stack_top_attention_cargo_fs_core_contract", MODULE_PATH) + assert spec is not None and spec.loader is not None + mod = importlib.util.module_from_spec(spec) + sys.modules[spec.name] = mod + spec.loader.exec_module(mod) + return mod + finally: + if sys.path and sys.path[0] == str(module_dir): + sys.path.pop(0) + + +_ENTRY = _load_module() + + +class TestTopAttentionCargoFsCoreContract(unittest.TestCase): + def test_main_calls_cargo_test_for_fs_core_crate(self) -> None: + with mock.patch.object(_ENTRY, "run_cargo", return_value=0) as run_cargo: + with mock.patch.object(sys, "argv", [str(MODULE_PATH)]): + rc = _ENTRY.main() + + self.assertEqual(rc, 0) + self.assertEqual( + run_cargo.call_args.args[0], + [ + "test", + "--manifest-path", + str(REPO_ROOT / "fluxon_rs" / "fluxon_fs_core" / "Cargo.toml"), + ], + ) + + def test_main_rejects_pytest_style_passthrough_flags(self) -> None: + with mock.patch.object(sys, "argv", [str(MODULE_PATH), "-k", "lease"]): + with self.assertRaises(SystemExit) as cm: + _ENTRY.main() + + self.assertEqual(cm.exception.code, 2) + + +if __name__ == "__main__": + raise SystemExit(unittest.main()) diff --git a/fluxon_test_stack/tests/test_top_attention_cargo_kv_unit_contract.py b/fluxon_test_stack/tests/test_top_attention_cargo_kv_unit_contract.py new file mode 100644 index 0000000..bfc189f --- /dev/null +++ b/fluxon_test_stack/tests/test_top_attention_cargo_kv_unit_contract.py @@ -0,0 +1,135 @@ +#!/usr/bin/env python3 + +from __future__ import annotations + +import importlib.util +import sys +import tempfile +import unittest +from pathlib import Path +from unittest import mock + +import yaml + + +REPO_ROOT = Path(__file__).resolve().parents[2] +MODULE_PATH = REPO_ROOT / "fluxon_test_stack" / "top_attention_test_index" / "_cargo_kv_unit.py" + + +def _load_module(): + module_dir = MODULE_PATH.parent + sys.path.insert(0, str(module_dir)) + try: + spec = importlib.util.spec_from_file_location("fluxon_test_stack_top_attention_cargo_kv_unit_contract", MODULE_PATH) + assert spec is not None and spec.loader is not None + mod = importlib.util.module_from_spec(spec) + sys.modules[spec.name] = mod + spec.loader.exec_module(mod) + return mod + finally: + if sys.path and sys.path[0] == str(module_dir): + sys.path.pop(0) + + +_ENTRY = _load_module() + + +class TestTopAttentionCargoKvUnitContract(unittest.TestCase): + def test_main_accepts_case_config_and_writes_build_config_ext(self) -> None: + with tempfile.TemporaryDirectory() as td: + run_dir = Path(td) + cfg_dir = run_dir / "configs" + cfg_dir.mkdir(parents=True) + src_dir = run_dir / "src" + src_dir.mkdir(parents=True) + case_cfg = cfg_dir / "ci_scene_config.yaml" + case_cfg.write_text( + yaml.safe_dump( + { + "case": { + "scene_id": "ci_top_attention_cargo_kv_unit", + "scale_id": "n1_kvowner_dram_20gib", + "profile_id": "fluxon_tcp", + "case_id": "ci_top_attention_cargo_kv_unit__n1_kvowner_dram_20gib__fluxon_tcp", + }, + "scene_config": { + "kv_transport_feature": "tcp_thread_transport", + }, + "scene_runtime": { + "etcd": {"ip": "127.0.0.1", "port": 19180}, + "greptime": {"ip": "127.0.0.1", "port": 19190}, + }, + }, + sort_keys=False, + ), + encoding="utf-8", + ) + + with mock.patch.object(_ENTRY, "run_cargo", return_value=0) as run_cargo: + with mock.patch.object( + sys, + "argv", + [str(MODULE_PATH), "--case-config", str(case_cfg), "--feature", "tcp_thread_transport"], + ): + rc = _ENTRY.main() + + self.assertEqual(rc, 0) + build_cfg = yaml.safe_load((src_dir / "build_config_ext.yml").read_text(encoding="utf-8")) + self.assertEqual( + build_cfg, + { + "etcd": "127.0.0.1:19180", + "prom": "http://127.0.0.1:19190/v1/prometheus", + "prom_remote_write_url": "http://127.0.0.1:19190/v1/prometheus/write", + }, + ) + self.assertEqual( + run_cargo.call_args.args[0], + [ + "test", + "--manifest-path", + str(REPO_ROOT / "fluxon_rs" / "fluxon_kv" / "Cargo.toml"), + "--no-default-features", + "--features", + "p2p_transfer,tcp_thread_transport", + ], + ) + + def test_main_rejects_feature_mismatch_when_case_config_is_present(self) -> None: + with tempfile.TemporaryDirectory() as td: + run_dir = Path(td) + cfg_dir = run_dir / "configs" + cfg_dir.mkdir(parents=True) + case_cfg = cfg_dir / "ci_scene_config.yaml" + case_cfg.write_text( + yaml.safe_dump( + { + "case": {"scene_id": "ci_top_attention_cargo_kv_unit"}, + "scene_config": {"kv_transport_feature": "tcp_thread_transport"}, + "scene_runtime": { + "etcd": {"ip": "127.0.0.1", "port": 19180}, + "greptime": {"ip": "127.0.0.1", "port": 19190}, + }, + }, + sort_keys=False, + ), + encoding="utf-8", + ) + with mock.patch.object( + sys, + "argv", + [str(MODULE_PATH), "--case-config", str(case_cfg), "--feature", "fastws_transport"], + ): + with self.assertRaisesRegex(ValueError, "must match scene_config.kv_transport_feature"): + _ENTRY.main() + + def test_main_rejects_pytest_style_passthrough_flags(self) -> None: + with mock.patch.object(sys, "argv", [str(MODULE_PATH), "-k", "lease"]): + with self.assertRaises(SystemExit) as cm: + _ENTRY.main() + + self.assertEqual(cm.exception.code, 2) + + +if __name__ == "__main__": + raise SystemExit(unittest.main()) diff --git a/fluxon_test_stack/tests/test_top_attention_cargo_util_contract.py b/fluxon_test_stack/tests/test_top_attention_cargo_util_contract.py new file mode 100644 index 0000000..52b5e72 --- /dev/null +++ b/fluxon_test_stack/tests/test_top_attention_cargo_util_contract.py @@ -0,0 +1,98 @@ +#!/usr/bin/env python3 + +from __future__ import annotations + +import importlib.util +import sys +import tempfile +import unittest +from pathlib import Path +from unittest import mock + +import yaml + + +REPO_ROOT = Path(__file__).resolve().parents[2] +MODULE_PATH = REPO_ROOT / "fluxon_test_stack" / "top_attention_test_index" / "_cargo_util.py" + + +def _load_module(): + module_dir = MODULE_PATH.parent + sys.path.insert(0, str(module_dir)) + try: + spec = importlib.util.spec_from_file_location("fluxon_test_stack_top_attention_cargo_util_contract", MODULE_PATH) + assert spec is not None and spec.loader is not None + mod = importlib.util.module_from_spec(spec) + sys.modules[spec.name] = mod + spec.loader.exec_module(mod) + return mod + finally: + if sys.path and sys.path[0] == str(module_dir): + sys.path.pop(0) + + +_ENTRY = _load_module() + + +class TestTopAttentionCargoUtilContract(unittest.TestCase): + def test_main_accepts_case_config_and_writes_build_config_ext(self) -> None: + with tempfile.TemporaryDirectory() as td: + run_dir = Path(td) + cfg_dir = run_dir / "configs" + cfg_dir.mkdir(parents=True) + src_dir = run_dir / "src" + src_dir.mkdir(parents=True) + case_cfg = cfg_dir / "ci_scene_config.yaml" + case_cfg.write_text( + yaml.safe_dump( + { + "case": { + "scene_id": "ci_top_attention_cargo_util", + "scale_id": "n1_kvowner_dram_20gib", + "profile_id": "fluxon_tcp", + "case_id": "ci_top_attention_cargo_util__n1_kvowner_dram_20gib__fluxon_tcp", + }, + "scene_config": {}, + "scene_runtime": { + "etcd": {"ip": "127.0.0.1", "port": 19180}, + "greptime": {"ip": "127.0.0.1", "port": 19190}, + }, + }, + sort_keys=False, + ), + encoding="utf-8", + ) + + with mock.patch.object(_ENTRY, "run_cargo", return_value=0) as run_cargo: + with mock.patch.object(sys, "argv", [str(MODULE_PATH), "--case-config", str(case_cfg)]): + rc = _ENTRY.main() + + self.assertEqual(rc, 0) + build_cfg = yaml.safe_load((src_dir / "build_config_ext.yml").read_text(encoding="utf-8")) + self.assertEqual( + build_cfg, + { + "etcd": "127.0.0.1:19180", + "prom": "http://127.0.0.1:19190/v1/prometheus", + "prom_remote_write_url": "http://127.0.0.1:19190/v1/prometheus/write", + }, + ) + self.assertEqual( + run_cargo.call_args.args[0], + [ + "test", + "--manifest-path", + str(REPO_ROOT / "fluxon_rs" / "fluxon_util" / "Cargo.toml"), + ], + ) + + def test_main_rejects_pytest_style_passthrough_flags(self) -> None: + with mock.patch.object(sys, "argv", [str(MODULE_PATH), "-k", "lease"]): + with self.assertRaises(SystemExit) as cm: + _ENTRY.main() + + self.assertEqual(cm.exception.code, 2) + + +if __name__ == "__main__": + raise SystemExit(unittest.main()) diff --git a/fluxon_test_stack/top_attention_test_index/README.md b/fluxon_test_stack/top_attention_test_index/README.md index 516c07e..e601dd5 100644 --- a/fluxon_test_stack/top_attention_test_index/README.md +++ b/fluxon_test_stack/top_attention_test_index/README.md @@ -48,9 +48,9 @@ Entries: - `_test_stack_contract.py`: test-stack runner contract coverage - `_deployment_codegen.py`: deployment code generation coverage - `_script_tools.py`: script utility coverage -- `_cargo_fs_core.py`: cargo tests for the Rust FS core crate -- `_cargo_util.py`: cargo tests for the Rust util crate -- `_cargo_kv_unit.py`: cargo tests for the Rust KV crate +- `_cargo_fs_core.py`: cargo tests for the Rust FS core crate. `ci_test_list.yaml` now exposes this wrapper as the formal `ci_top_attention_cargo_fs_core` runner-native scene. +- `_cargo_util.py`: cargo tests for the Rust util crate. `ci_test_list.yaml` now exposes this wrapper as the formal `ci_top_attention_cargo_util` runner-native scene, with runtime endpoints supplied through canonical `--case-config`. +- `_cargo_kv_unit.py`: cargo tests for the Rust KV crate. `ci_test_list.yaml` now exposes this wrapper as the formal `ci_top_attention_cargo_kv_unit` runner-native scene, with transport feature selection bounded by `scene_config.kv_transport_feature`. Operational note: diff --git a/fluxon_test_stack/top_attention_test_index/_bin_kvtest.py b/fluxon_test_stack/top_attention_test_index/_bin_kvtest.py index faddb51..c563c14 100644 --- a/fluxon_test_stack/top_attention_test_index/_bin_kvtest.py +++ b/fluxon_test_stack/top_attention_test_index/_bin_kvtest.py @@ -5,9 +5,7 @@ import os from pathlib import Path -import yaml - -from _common import REPO_ROOT, load_case_config_payload, run_cargo +from _common import REPO_ROOT, load_case_config_payload, run_cargo, write_build_config_ext TEST_REQUIREMENTS = ["cargo", "etcd", "ops", "submodules"] @@ -33,38 +31,6 @@ def _parse_kv_test_rounds(raw: object) -> str: return ",".join(rounds) -def _require_scene_runtime_endpoint(scene_runtime: object, *, service_id: str) -> tuple[str, int]: - if not isinstance(scene_runtime, dict): - raise ValueError("case config scene_runtime must be a mapping") - raw_service = scene_runtime.get(service_id) - if not isinstance(raw_service, dict): - raise ValueError(f"case config scene_runtime.{service_id} must be a mapping") - ip = str(raw_service.get("ip") or "").strip() - if not ip: - raise ValueError(f"case config scene_runtime.{service_id}.ip must be set") - port = raw_service.get("port") - if not isinstance(port, int): - raise ValueError(f"case config scene_runtime.{service_id}.port must be an int") - return ip, port - - -def _write_build_config_ext(case_cfg_path: Path, scene_runtime: dict) -> None: - etcd_ip, etcd_port = _require_scene_runtime_endpoint(scene_runtime, service_id="etcd") - greptime_ip, greptime_port = _require_scene_runtime_endpoint(scene_runtime, service_id="greptime") - out_path = case_cfg_path.resolve().parents[1] / "src" / "build_config_ext.yml" - out_path.write_text( - yaml.safe_dump( - { - "etcd": f"{etcd_ip}:{etcd_port}", - "prom": f"http://{greptime_ip}:{greptime_port}/v1/prometheus", - "prom_remote_write_url": f"http://{greptime_ip}:{greptime_port}/v1/prometheus/write", - }, - sort_keys=False, - ), - encoding="utf-8", - ) - - def main() -> int: parser = argparse.ArgumentParser( description="Flat index entry for the existing Rust kv_test binary." @@ -85,7 +51,7 @@ def main() -> int: scene_runtime = case_payload.get("scene_runtime") if not isinstance(scene_runtime, dict): raise ValueError("case config must define scene_runtime mapping") - _write_build_config_ext(case_cfg_path, scene_runtime) + write_build_config_ext(case_cfg_path, scene_runtime=scene_runtime) cargo_args = [ "run", diff --git a/fluxon_test_stack/top_attention_test_index/_cargo_fs_core.py b/fluxon_test_stack/top_attention_test_index/_cargo_fs_core.py index cbca6f5..0af437c 100755 --- a/fluxon_test_stack/top_attention_test_index/_cargo_fs_core.py +++ b/fluxon_test_stack/top_attention_test_index/_cargo_fs_core.py @@ -1,6 +1,8 @@ #!/usr/bin/env python3 from __future__ import annotations +import argparse + from _common import REPO_ROOT, run_cargo @@ -8,6 +10,10 @@ def main() -> int: + parser = argparse.ArgumentParser( + description="Flat index entry for Rust FS core crate tests." + ) + parser.parse_args() return run_cargo([ "test", "--manifest-path", diff --git a/fluxon_test_stack/top_attention_test_index/_cargo_kv_unit.py b/fluxon_test_stack/top_attention_test_index/_cargo_kv_unit.py index 36ae5ff..576ba44 100755 --- a/fluxon_test_stack/top_attention_test_index/_cargo_kv_unit.py +++ b/fluxon_test_stack/top_attention_test_index/_cargo_kv_unit.py @@ -3,31 +3,52 @@ import argparse import os +from pathlib import Path -from _common import REPO_ROOT, run_cargo +from _common import REPO_ROOT, load_case_config_payload, run_cargo, write_build_config_ext TEST_REQUIREMENTS = ["cargo", "etcd", "ops", "submodules"] +SCENE_ID = "ci_top_attention_cargo_kv_unit" def main() -> int: parser = argparse.ArgumentParser( description="Flat index entry for Rust KV crate unit tests." ) + parser.add_argument( + "--case-config", + help="Canonical CI case config YAML emitted by test_runner.", + ) parser.add_argument( "--feature", default=os.environ.get("FLUXON_KV_TEST_TRANSPORT_FEATURE", "tcp_thread_transport"), help="Transport feature appended to p2p_transfer.", ) - args, passthrough = parser.parse_known_args() + args = parser.parse_args() + feature = str(args.feature).strip() + if args.case_config: + case_cfg_path = Path(args.case_config).resolve() + case_payload = load_case_config_payload(case_cfg_path, expected_scene_id=SCENE_ID) + scene_config = case_payload["scene_config"] + configured_feature = str(scene_config.get("kv_transport_feature") or "").strip() + if not configured_feature: + raise ValueError("scene_config.kv_transport_feature must be set") + if feature != configured_feature: + raise ValueError( + f"--feature must match scene_config.kv_transport_feature when --case-config is set: {configured_feature!r}" + ) + scene_runtime = case_payload.get("scene_runtime") + if not isinstance(scene_runtime, dict): + raise ValueError("case config must define scene_runtime mapping") + write_build_config_ext(case_cfg_path, scene_runtime=scene_runtime) return run_cargo([ "test", "--manifest-path", str(REPO_ROOT / "fluxon_rs" / "fluxon_kv" / "Cargo.toml"), "--no-default-features", "--features", - f"p2p_transfer,{args.feature}", - *passthrough, + f"p2p_transfer,{feature}", ]) diff --git a/fluxon_test_stack/top_attention_test_index/_cargo_util.py b/fluxon_test_stack/top_attention_test_index/_cargo_util.py index 2e707c8..658ea4a 100755 --- a/fluxon_test_stack/top_attention_test_index/_cargo_util.py +++ b/fluxon_test_stack/top_attention_test_index/_cargo_util.py @@ -1,13 +1,31 @@ #!/usr/bin/env python3 from __future__ import annotations -from _common import REPO_ROOT, run_cargo +import argparse +from pathlib import Path +from _common import REPO_ROOT, load_case_config_payload, run_cargo, write_build_config_ext TEST_REQUIREMENTS = ["cargo", "etcd", "ops", "submodules"] +SCENE_ID = "ci_top_attention_cargo_util" def main() -> int: + parser = argparse.ArgumentParser( + description="Flat index entry for Rust util crate tests." + ) + parser.add_argument( + "--case-config", + help="Canonical CI case config YAML emitted by test_runner.", + ) + args = parser.parse_args() + if args.case_config: + case_cfg_path = Path(args.case_config).resolve() + case_payload = load_case_config_payload(case_cfg_path, expected_scene_id=SCENE_ID) + scene_runtime = case_payload.get("scene_runtime") + if not isinstance(scene_runtime, dict): + raise ValueError("case config must define scene_runtime mapping") + write_build_config_ext(case_cfg_path, scene_runtime=scene_runtime) return run_cargo([ "test", "--manifest-path", diff --git a/fluxon_test_stack/top_attention_test_index/_common.py b/fluxon_test_stack/top_attention_test_index/_common.py index 3991aa7..204cb12 100755 --- a/fluxon_test_stack/top_attention_test_index/_common.py +++ b/fluxon_test_stack/top_attention_test_index/_common.py @@ -85,6 +85,41 @@ def load_case_config_payload(path: str | Path, *, expected_scene_id: str) -> dic return raw +def _require_scene_runtime_endpoint(scene_runtime: object, *, service_id: str) -> tuple[str, int]: + if not isinstance(scene_runtime, dict): + raise ValueError("case config scene_runtime must be a mapping") + raw_service = scene_runtime.get(service_id) + if not isinstance(raw_service, dict): + raise ValueError(f"case config scene_runtime.{service_id} must be a mapping") + ip = str(raw_service.get("ip") or "").strip() + if not ip: + raise ValueError(f"case config scene_runtime.{service_id}.ip must be set") + port = raw_service.get("port") + if not isinstance(port, int): + raise ValueError(f"case config scene_runtime.{service_id}.port must be an int") + return ip, port + + +def write_build_config_ext(case_cfg_path: str | Path, *, scene_runtime: object) -> Path: + cfg_path = Path(case_cfg_path).resolve() + etcd_ip, etcd_port = _require_scene_runtime_endpoint(scene_runtime, service_id="etcd") + greptime_ip, greptime_port = _require_scene_runtime_endpoint(scene_runtime, service_id="greptime") + out_path = cfg_path.parents[1] / "src" / "build_config_ext.yml" + out_path.parent.mkdir(parents=True, exist_ok=True) + out_path.write_text( + yaml.safe_dump( + { + "etcd": f"{etcd_ip}:{etcd_port}", + "prom": f"http://{greptime_ip}:{greptime_port}/v1/prometheus", + "prom_remote_write_url": f"http://{greptime_ip}:{greptime_port}/v1/prometheus/write", + }, + sort_keys=False, + ), + encoding="utf-8", + ) + return out_path + + def _path_contains_fluxon_pyo3_libs_dir(path: Path) -> bool: return "fluxon_pyo3.libs" in path.parts From a691d7f75bee11671674fb9b48ec58c9a4bc887b Mon Sep 17 00:00:00 2001 From: ActivePeter <1020401660@qq.com> Date: Thu, 25 Jun 2026 02:26:16 +0800 Subject: [PATCH 2/5] test --- fluxon_test_stack/test_runner.py | 17 ++++- fluxon_test_stack/test_runner_ci_runtime.py | 46 ++++++++++-- .../test_test_runner_testbed_contract.py | 70 +++++++++++++++++-- 3 files changed, 120 insertions(+), 13 deletions(-) diff --git a/fluxon_test_stack/test_runner.py b/fluxon_test_stack/test_runner.py index 5275fe9..bcef3a5 100644 --- a/fluxon_test_stack/test_runner.py +++ b/fluxon_test_stack/test_runner.py @@ -11445,9 +11445,22 @@ def _run_adapter_action( def _run_subprocess(argv: List[str], *, cwd: str) -> None: print("RUN:", " ".join(_shell_quote(a) for a in argv), flush=True) - proc = subprocess.run(argv, cwd=cwd) + proc = subprocess.run(argv, cwd=cwd, capture_output=True, text=True) + if proc.stdout: + sys.stdout.write(proc.stdout) + if not proc.stdout.endswith("\n"): + sys.stdout.write("\n") + sys.stdout.flush() + if proc.stderr: + sys.stderr.write(proc.stderr) + if not proc.stderr.endswith("\n"): + sys.stderr.write("\n") + sys.stderr.flush() if proc.returncode != 0: - raise RuntimeError(f"command failed: rc={proc.returncode}") + raise RuntimeError( + "command failed: " + f"rc={proc.returncode} cwd={cwd} argv={' '.join(_shell_quote(a) for a in argv)}" + ) _SSH_TRANSPORT_TIMEOUT_SECONDS = 180.0 diff --git a/fluxon_test_stack/test_runner_ci_runtime.py b/fluxon_test_stack/test_runner_ci_runtime.py index bef19e2..9e89066 100644 --- a/fluxon_test_stack/test_runner_ci_runtime.py +++ b/fluxon_test_stack/test_runner_ci_runtime.py @@ -11,12 +11,45 @@ def _ci_runtime_python_executable() -> str: - python_bin = shutil.which(_CI_RUNTIME_PYTHON_BIN_NAME) - if python_bin is None: + candidates = [] + seen: set[str] = set() + for raw_candidate in ( + _CI_RUNTIME_PYTHON_BIN_NAME, + "python3", + "python", + ): + resolved = shutil.which(raw_candidate) + if resolved is None or resolved in seen: + continue + seen.add(resolved) + candidates.append(resolved) + if not candidates: raise ValueError( - "CI runtime requires python3.10 on PATH to create the offline-wheelhouse venv" + "CI runtime requires a Python 3.10 interpreter on PATH to create the offline-wheelhouse venv" ) - return python_bin + for python_bin in candidates: + if _python_executable_abi(python_bin) == _TEST_STACK_DEFAULT_PYTHON_ABI: + return python_bin + raise ValueError( + "CI runtime requires a Python 3.10 interpreter on PATH to create the offline-wheelhouse venv" + ) + + +def _python_executable_abi(python_bin: str) -> str: + try: + return subprocess.check_output( + [ + python_bin, + "-c", + ( + "import sys; " + "print(f'{sys.implementation.name}{sys.version_info[0]}.{sys.version_info[1]}')" + ), + ], + text=True, + ).strip() + except (OSError, subprocess.CalledProcessError) as exc: + raise ValueError(f"failed to probe python ABI for executable: {python_bin}") from exc def _ci_runtime_python_abi( @@ -67,7 +100,10 @@ def _create_ci_runtime_venv( if venv_dir.exists(): raise ValueError(f"venv dir already exists (no overwrite): {venv_dir}") python_bin = _ci_runtime_python_executable() - run_subprocess([python_bin, "-m", "venv", str(venv_dir)]) + # Create the CI venv without ensurepip. GitHub-hosted Python 3.10 can fail inside + # venv's implicit ensurepip step even though the interpreter already exposes pip + # through system site-packages. + run_subprocess([python_bin, "-m", "venv", "--system-site-packages", "--without-pip", str(venv_dir)]) venv_python = venv_dir / "bin" / "python3" if not venv_python.exists(): raise ValueError(f"venv python not found after creation: {venv_python}") diff --git a/fluxon_test_stack/tests/test_test_runner_testbed_contract.py b/fluxon_test_stack/tests/test_test_runner_testbed_contract.py index 7d93f09..dab98bf 100644 --- a/fluxon_test_stack/tests/test_test_runner_testbed_contract.py +++ b/fluxon_test_stack/tests/test_test_runner_testbed_contract.py @@ -5,6 +5,7 @@ import importlib.util import json import os +import subprocess import sys import tarfile import tempfile @@ -35,6 +36,7 @@ def _load_module(): _RUNNER = _load_module() +_CI_RUNTIME_MOD = sys.modules["test_runner_ci_runtime"] class TestTestRunnerTestbedContract(unittest.TestCase): @@ -70,10 +72,23 @@ def test_write_ci_master_owner_configs_emits_owner_large_file_paths(self) -> Non def test_ci_runtime_python_executable_requires_python310_on_path(self) -> None: with mock.patch.object(_RUNNER.shutil, "which", return_value=None): - with self.assertRaisesRegex(ValueError, "requires python3.10 on PATH"): + with self.assertRaisesRegex(ValueError, "requires a Python 3.10 interpreter on PATH"): _RUNNER._ci_runtime_python_executable() - def test_create_ci_runtime_venv_uses_python310(self) -> None: + def test_ci_runtime_python_executable_accepts_python3_alias_when_it_is_python310(self) -> None: + with mock.patch.object( + _RUNNER.shutil, + "which", + side_effect=lambda name: { + "python3.10": None, + "python3": "/usr/bin/python3", + "python": "/usr/bin/python", + }.get(name), + ): + with mock.patch.object(_CI_RUNTIME_MOD, "_python_executable_abi", return_value="cpython3.10"): + self.assertEqual(_RUNNER._ci_runtime_python_executable(), "/usr/bin/python3") + + def test_create_ci_runtime_venv_uses_python310_abi_without_ensurepip(self) -> None: with tempfile.TemporaryDirectory() as td: run_dir = Path(td) venv_dir = (run_dir / "venv").resolve() @@ -82,21 +97,64 @@ def test_create_ci_runtime_venv_uses_python310(self) -> None: def _fake_create_venv(argv: list[str], *, cwd: str) -> None: self.assertEqual( argv, - ["/usr/bin/python3.10", "-m", "venv", str(venv_dir)], + [ + "/usr/bin/python3.10", + "-m", + "venv", + "--system-site-packages", + "--without-pip", + str(venv_dir), + ], ) self.assertEqual(cwd, str(run_dir)) expected_venv_python.parent.mkdir(parents=True, exist_ok=True) expected_venv_python.write_text("#!/bin/sh\n", encoding="utf-8") with mock.patch.object(_RUNNER.shutil, "which", return_value="/usr/bin/python3.10"): - with mock.patch.object(_RUNNER, "_run_subprocess", side_effect=_fake_create_venv) as run_subprocess_mock: - with mock.patch.object(_RUNNER, "_assert_ci_runtime_python_abi") as assert_python_abi: - venv_python = _RUNNER._create_ci_runtime_venv(run_dir=run_dir) + with mock.patch.object(_CI_RUNTIME_MOD, "_python_executable_abi", return_value="cpython3.10"): + with mock.patch.object(_RUNNER, "_run_subprocess", side_effect=_fake_create_venv) as run_subprocess_mock: + with mock.patch.object(_RUNNER, "_assert_ci_runtime_python_abi") as assert_python_abi: + venv_python = _RUNNER._create_ci_runtime_venv(run_dir=run_dir) self.assertEqual(venv_python, expected_venv_python) run_subprocess_mock.assert_called_once() assert_python_abi.assert_called_once_with(venv_python=expected_venv_python) + def test_runner_native_bin_kvtest_scene_stays_on_direct_wrapper_command(self) -> None: + suite = _RUNNER._parse_suite_config( + yaml.safe_load( + (REPO_ROOT / "fluxon_test_stack" / "ci_test_list.yaml").read_text(encoding="utf-8") + ) + ) + cases = _RUNNER._expand_cases(suite) + case = next(item for item in cases if item.scene_id == "ci_top_attention_bin_kvtest" and item.profile_id == "fluxon_tcp") + + planned = _RUNNER._build_ci_execution_plan(case, suite) + + self.assertEqual(len(planned), 1) + self.assertEqual(planned[0].ci_commands[0]["id"], "top_attention_bin_kvtest") + self.assertIn( + "fluxon_test_stack/top_attention_test_index/_bin_kvtest.py", + planned[0].ci_commands[0]["command"], + ) + + def test_run_subprocess_reports_cwd_and_argv_on_failure(self) -> None: + completed = subprocess.CompletedProcess( + args=["/usr/bin/python3", "-c", "raise SystemExit(2)"], + returncode=2, + stdout="", + stderr="boom\n", + ) + with mock.patch.object(_RUNNER.subprocess, "run", return_value=completed): + with self.assertRaisesRegex( + RuntimeError, + r"command failed: rc=2 cwd=/tmp argv=/usr/bin/python3 -c 'raise SystemExit\(2\)'", + ): + _RUNNER._run_subprocess( + ["/usr/bin/python3", "-c", "raise SystemExit(2)"], + cwd="/tmp", + ) + def test_assert_ci_runtime_python_abi_accepts_python310_venv(self) -> None: with mock.patch.object(_RUNNER.subprocess, "check_output", return_value="cpython3.10\n") as check_output_mock: _RUNNER._assert_ci_runtime_python_abi(venv_python=Path("/tmp/venv/bin/python3")) From 11db4def6d62656b5495073e6cf127d6abcb3a94 Mon Sep 17 00:00:00 2001 From: ActivePeter <1020401660@qq.com> Date: Thu, 25 Jun 2026 11:28:11 +0800 Subject: [PATCH 3/5] test --- fluxon_test_stack/test_profile_adapter.py | 44 +------------- fluxon_test_stack/test_runner.py | 59 +------------------ fluxon_test_stack/test_runner_models.py | 1 - .../test_runner_runtime_backend.py | 10 ---- .../test_test_runner_testbed_contract.py | 32 ++++++++++ 5 files changed, 35 insertions(+), 111 deletions(-) diff --git a/fluxon_test_stack/test_profile_adapter.py b/fluxon_test_stack/test_profile_adapter.py index 57afbdc..b522810 100644 --- a/fluxon_test_stack/test_profile_adapter.py +++ b/fluxon_test_stack/test_profile_adapter.py @@ -388,7 +388,7 @@ def main() -> None: parser = argparse.ArgumentParser( description="Fluxon deployer adapter (Deployment YAML subset; produces deploy_result.yaml)." ) - parser.add_argument("--action", required=True, choices=["deploy", "collect", "teardown"]) + parser.add_argument("--action", required=True, choices=["deploy", "teardown"]) parser.add_argument( "--workdir", required=True, @@ -453,10 +453,6 @@ def main() -> None: ) return - if args.action == "collect": - _action_collect(run_dir, controller_url, instances) - return - if args.action == "teardown": _action_teardown(controller_url, instances) return @@ -796,29 +792,6 @@ def _action_deploy( -def _action_collect(run_dir: Path, controller_url: str, instances: List[_InstanceReq]) -> None: - logs_dir = run_dir / "logs" - logs_dir.mkdir(parents=True, exist_ok=True) - - for inst in instances: - inst_dir = logs_dir / inst.id - inst_dir.mkdir(parents=True, exist_ok=True) - - # English note: - # - /api/status is an observability endpoint. During transient runtime failures (e.g. P2P timeouts) - # the controller may return a non-2xx HTTP status. Treat that as a captured status, not as a - # hard failure of the "collect" phase, so the runner can still finalize deterministically using - # terminal artifacts (summary.yaml / benchmark_result.json). - status_code, status = _http_status_allow_error( - controller_url, - inst.controller_target, - inst.workload_kind, - inst.workload_name, - inst.authority, - ) - _write_yaml_file(inst_dir / "status.yaml", {"status_code": int(status_code), "status": status}) - - def _action_teardown(controller_url: str, instances: List[_InstanceReq]) -> None: for inst in instances: resp = _http_delete_generation( @@ -1174,21 +1147,6 @@ def _http_status(controller_url: str, target: str, kind: str, name: str) -> Dict return _http_json(req) -def _http_status_allow_error( - controller_url: str, - target: str, - kind: str, - name: str, - authority: str, -) -> tuple[int, Dict[str, Any]]: - qs = urllib.parse.urlencode( - {"target": target, "kind": kind, "name": name, "authority": authority} - ) - url = controller_url + "/api/status?" + qs - req = _new_controller_request(url, method="GET") - return _http_json_allow_error_status(req) - - def _http_delete_generation( controller_url: str, target: str, diff --git a/fluxon_test_stack/test_runner.py b/fluxon_test_stack/test_runner.py index bcef3a5..161063e 100644 --- a/fluxon_test_stack/test_runner.py +++ b/fluxon_test_stack/test_runner.py @@ -563,7 +563,7 @@ def _redirect_process_stdio_to_log( - test_runner can run for hours under terminal/session wrappers that may disappear while the suite is still executing. - A deleted PTY turns ordinary `print(..., flush=True)` into `OSError(EIO)`, which aborts the - runner in collect/finalize paths and leaves case_runs.yaml stuck at a reserved run. + runner in shutdown/finalize paths and leaves case_runs.yaml stuck at a reserved run. - Use a deterministic per-workdir log sink for the whole process, including child subprocesses. """ global _RUNNER_STDIO_LOG_FP @@ -1014,10 +1014,6 @@ def main() -> None: ) _write_yaml_file(run_dir / "summary.yaml", summary) - _run_adapter_action( - resolved_case, run_dir=run_dir, action="collect" - ) - outcome = RUN_OUTCOME_SUCCESS @@ -3076,16 +3072,6 @@ def _deploy_runtime_phase( return _deploy_runtime_phase_after_stage(resolved_case, run_dir=run_dir, phase=phase) -def _collect_runtime_phase( - resolved_case: Dict[str, Any], - *, - run_dir: Path, - phase: _RuntimePhase, -) -> None: - _write_runtime_phase_inputs(resolved_case, run_dir=run_dir, phase=phase) - _run_adapter_action(resolved_case, run_dir=run_dir, action="collect") - - def _ci_cluster_runtime_stage(resolved_case: Dict[str, Any]) -> _RemoteRunDirStage: verify_relpaths = list(CI_CLUSTER_RUNTIME_REMOTE_STAGE_VERIFY_RELPATHS) if _ci_has_instance(resolved_case, instance_id="owner_0"): @@ -3139,12 +3125,6 @@ def _ci_runtime_phase(resolved_case: Dict[str, Any], phase_id: str) -> _RuntimeP write_ctx="CI", stage_run_dir=_ci_runner_runtime_stage(resolved_case), ), - "collect_all": _RuntimePhase( - phase_id="collect_all", - layer=RUNTIME_LAYER_CASE, - instance_ids=CI_RUNTIME_INSTANCE_IDS, - write_ctx="CI", - ), } try: return phases[phase_id] @@ -3196,24 +3176,6 @@ def _test_stack_runtime_phase( write_ctx="TEST_STACK", stage_run_dir=stage_run_dir, ) - if phase_id == "collect_nodes": - if node_ids is None or not node_ids: - raise ValueError("TEST_STACK collect_nodes phase requires non-empty node_ids") - return _RuntimePhase( - phase_id="collect_nodes", - layer=RUNTIME_LAYER_CASE, - instance_ids=node_ids, - write_ctx="TEST_STACK", - ) - if phase_id == "collect_coordinator": - if node_ids is not None: - raise ValueError("TEST_STACK collect_coordinator phase does not accept node_ids") - return _RuntimePhase( - phase_id="collect_coordinator", - layer=RUNTIME_LAYER_CASE, - instance_ids=("coordinator",), - write_ctx="TEST_STACK", - ) raise ValueError(f"unsupported TEST_STACK runtime phase: {phase_id}") @@ -3241,14 +3203,6 @@ def _compile_case_plan(resolved_case: Dict[str, Any]) -> _CasePlan: execute_phases=( _ci_runtime_phase(resolved_case, "ci_runner"), ), - collect_phases=( - _RuntimePhase( - phase_id="collect_all", - layer=RUNTIME_LAYER_CASE, - instance_ids=case_instance_ids, - write_ctx="CI", - ), - ), ) if case_family == CASE_FAMILY_BENCH: deploy = _require_dict(resolved_case.get("deploy"), "resolved_case.deploy") @@ -3305,15 +3259,6 @@ def _compile_case_plan(resolved_case: Dict[str, Any]) -> _CasePlan: include_stage_run_dir=False, ), ), - collect_phases=( - _test_stack_runtime_phase(phase_id="collect_nodes", node_ids=node_ids_tuple), - _RuntimePhase( - phase_id="collect_coordinator", - layer=RUNTIME_LAYER_CASE, - instance_ids=prepare_ids_tuple, - write_ctx="TEST_STACK", - ), - ), ) raise ValueError(f"unsupported case family for case plan: {case_family}") @@ -11414,7 +11359,7 @@ def _run_adapter_action( run_dir: Path, action: str, ) -> Optional[Dict[str, Any]]: - if action not in ("deploy", "collect", "teardown"): + if action not in ("deploy", "teardown"): raise ValueError(f"invalid adapter action: {action}") deploy = _require_dict(resolved_case.get("deploy"), "resolved_case.deploy") diff --git a/fluxon_test_stack/test_runner_models.py b/fluxon_test_stack/test_runner_models.py index cb38467..dcb3a5c 100644 --- a/fluxon_test_stack/test_runner_models.py +++ b/fluxon_test_stack/test_runner_models.py @@ -85,7 +85,6 @@ class _CasePlan: case_family: str prepare_phases: Tuple[_RuntimePhase, ...] execute_phases: Tuple[_RuntimePhase, ...] - collect_phases: Tuple[_RuntimePhase, ...] @dataclass(frozen=True) diff --git a/fluxon_test_stack/test_runner_runtime_backend.py b/fluxon_test_stack/test_runner_runtime_backend.py index 14a85e4..9e35e73 100644 --- a/fluxon_test_stack/test_runner_runtime_backend.py +++ b/fluxon_test_stack/test_runner_runtime_backend.py @@ -394,8 +394,6 @@ def _execute_ci_case( counted=False, ci_out={"rc": rc}, ) - for phase in prepared_case.plan.collect_phases: - ctx._collect_runtime_phase(resolved_case, run_dir=run_dir, phase=phase) return ctx._ExecutedCase(outcome=outcome, summary=summary) @@ -414,7 +412,6 @@ def _execute_test_stack_case( outcome = ctx.RUN_OUTCOME_FAILED error_detail: Optional[str] = None - collect_error_detail: Optional[str] = None result_obj: Optional[Dict[str, Any]] = None try: @@ -445,12 +442,6 @@ def _execute_test_stack_case( outcome = ctx.RUN_OUTCOME_SUCCESS except Exception as exc: # noqa: BLE001 error_detail = f"{type(exc).__name__}: {exc}" - finally: - try: - for phase in prepared_case.plan.collect_phases: - ctx._collect_runtime_phase(resolved_case, run_dir=run_dir, phase=phase) - except Exception as exc: # noqa: BLE001 - collect_error_detail = f"{type(exc).__name__}: {exc}" summary = { "schema_version": ctx.SCHEMA_VERSION, @@ -472,7 +463,6 @@ def _execute_test_stack_case( "result_path": str(_require_test_stack_result_path(prepared_case.test_stack_result_path)), "result": result_obj, "error": error_detail, - "collect_error": collect_error_detail, }, } return ctx._ExecutedCase(outcome=outcome, summary=summary) diff --git a/fluxon_test_stack/tests/test_test_runner_testbed_contract.py b/fluxon_test_stack/tests/test_test_runner_testbed_contract.py index dab98bf..982378f 100644 --- a/fluxon_test_stack/tests/test_test_runner_testbed_contract.py +++ b/fluxon_test_stack/tests/test_test_runner_testbed_contract.py @@ -480,6 +480,38 @@ def test_top_attention_mq_core_ci_execution_plan_is_runner_native(self) -> None: ) self.assertIn("--case-config __RUN_DIR__/configs/ci_scene_config.yaml", planned[0].ci_commands[0]["command"]) + def test_top_attention_mq_core_ci_plan_has_no_collect_phase(self) -> None: + resolved_case = { + "case": { + "family": "ci", + "case_id": "ci_top_attention_mq_core__n1_kvowner_dram_20gib__fluxon_tcp_thread", + }, + "scene": { + "ci": { + "runtime_contract": "cluster_kv_owner", + "subject": "mq", + }, + }, + "deploy": { + "instances": [ + {"id": "master"}, + {"id": "owner_0"}, + {"id": "ci_runner"}, + ], + }, + "runtime_model": { + "test_bed": {"kind": "ops"}, + "base_runtime": {}, + "case_runtime": {"instance_ids": ["master", "owner_0", "ci_runner"]}, + }, + } + case_plan = _RUNNER._compile_case_plan(resolved_case) + self.assertEqual( + tuple(case_plan.__dataclass_fields__.keys()), + ("case_family", "prepare_phases", "execute_phases"), + ) + self.assertEqual(case_plan.execute_phases[0].instance_ids, ("ci_runner",)) + def test_doc_page_ci_execution_plan_uses_online_docker_image(self) -> None: suite_cfg = yaml.safe_load((_RUNNER.RUNNER_REPO_ROOT / "fluxon_test_stack" / "ci_test_list.yaml").read_text(encoding="utf-8")) suite = _RUNNER._parse_suite_config(suite_cfg) From c3bc48d04067e4008c2a2a35177e469bb268149b Mon Sep 17 00:00:00 2001 From: ActivePeter <1020401660@qq.com> Date: Thu, 25 Jun 2026 12:28:38 +0800 Subject: [PATCH 4/5] test --- fluxon_test_stack/test_profile_adapter.py | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/fluxon_test_stack/test_profile_adapter.py b/fluxon_test_stack/test_profile_adapter.py index b522810..006c48c 100644 --- a/fluxon_test_stack/test_profile_adapter.py +++ b/fluxon_test_stack/test_profile_adapter.py @@ -1002,6 +1002,21 @@ def _wait_running( time.sleep(1.0) +def _http_status_allow_error( + controller_url: str, + target: str, + kind: str, + name: str, + authority: str, +) -> tuple[int, Dict[str, Any]]: + qs = urllib.parse.urlencode( + {"target": target, "kind": kind, "name": name, "authority": authority} + ) + url = controller_url + "/api/status?" + qs + req = _new_controller_request(url, method="GET") + return _http_json_allow_error_status(req) + + def _http_deploy(controller_url: str, yaml_text: str) -> Dict[str, Any]: url = controller_url + "/api/deploy" data = yaml_text.encode("utf-8") From c048a6ccb7669c4999d6e85b2ffbb3dc1538f0c0 Mon Sep 17 00:00:00 2001 From: ActivePeter <1020401660@qq.com> Date: Thu, 25 Jun 2026 18:05:58 +0800 Subject: [PATCH 5/5] test --- ...13\350\257\225\346\265\201\347\250\213.md" | 75 +++++++------- fluxon_test_stack/test_runner.py | 32 +++--- fluxon_test_stack/test_runner_ci_runtime.py | 9 +- .../tests/test_runner_contract.py | 66 ++++++++++++- .../test_test_runner_testbed_contract.py | 97 +++++++++++++++---- ...st_top_attention_cargo_kv_unit_contract.py | 24 +++-- .../test_top_attention_common_contract.py | 43 +++++++- .../top_attention_test_index/README.md | 2 +- .../_cargo_kv_unit.py | 42 +++----- .../top_attention_test_index/_common.py | 72 +++++++------- 10 files changed, 306 insertions(+), 156 deletions(-) diff --git "a/fluxon_doc_cn/design/teststack_1_\345\275\223\345\211\215\346\236\266\346\236\204\344\270\216CI\346\265\213\350\257\225\346\265\201\347\250\213.md" "b/fluxon_doc_cn/design/teststack_1_\345\275\223\345\211\215\346\236\266\346\236\204\344\270\216CI\346\265\213\350\257\225\346\265\201\347\250\213.md" index 7a84e2c..1b542a6 100644 --- "a/fluxon_doc_cn/design/teststack_1_\345\275\223\345\211\215\346\236\266\346\236\204\344\270\216CI\346\265\213\350\257\225\346\265\201\347\250\213.md" +++ "b/fluxon_doc_cn/design/teststack_1_\345\275\223\345\211\215\346\236\266\346\236\204\344\270\216CI\346\265\213\350\257\225\346\265\201\347\250\213.md" @@ -10,8 +10,8 @@ - `teststack` 由三层组成: - **上层:suite 编译层**:将 `scene × scale × profile` 组合成可执行 case; - - **中层:统一 case plan / dispatch 层**:把编译结果收敛成统一的 `prepare / execute / collect / finalize` 外壳,并按 runtime backend 分发; - - **下层:runtime backend 执行层**:分别承接 `CI` backend 和 `TEST_STACK` backend 的具体 prepare、execute、collect、finalize 实现。 + - **中层:统一 case plan / dispatch 层**:把编译结果收敛成统一的 `prepare / execute` 外壳,并按 runtime backend 分发;结果观测和终态落盘放在 execute / finalize 两段里完成; + - **下层:runtime backend 执行层**:分别承接 `CI` backend 和 `TEST_STACK` backend 的具体 prepare、execute、finalize 实现。 - `test_runner.py` 是统一执行器,覆盖 `CI` case、`TEST_STACK` benchmark case,以及 UI / GitOps 集成入口。 - `test_runner.py` 当前主要承载上层和中层;`test_runner_runtime_backend.py` 承载下层 runtime backend 实现。 - `start_test_bed.py` 只负责共享 testbed 的启动与 controller 侧 apply 编排,不承担通用测试执行职责。 @@ -31,7 +31,7 @@ | 模块 / 文件 | 职责 | 不负责什么 | | --- | --- | --- | | `fluxon_test_stack/ci_test_list.yaml` | 定义 suite:`run`、`scenes`、`scales`、`artifact_sets`、`profiles` | 不直接执行任何 case | -| `fluxon_test_stack/test_runner.py` | 统一 runner。负责解析 suite、展开 case、生成 `resolved_case`、驱动 prepare / execute / collect / finalize | 不直接拥有共享 testbed 的长期生命周期 | +| `fluxon_test_stack/test_runner.py` | 统一 runner。负责解析 suite、展开 case、生成 `resolved_case`、驱动 prepare / execute,并在 finalize 路径完成收尾 | 不直接拥有共享 testbed 的长期生命周期 | | `fluxon_test_stack/start_test_bed.py` | 共享 testbed 启动协调器;负责 bare bootstrap 和 controller apply 顺序 | 不负责按 case 执行测试命令 | | `fluxon_test_stack/start_test_bed.yaml` | testbed 启动契约;描述 bootstrap phases、controller、UI、deploy_workloads | 不定义单个 case 的测试命令 | | `fluxon_test_stack/ci_2_virt_node.py` | 双逻辑节点 CI 封装;生成本地化 deployconf / start_test_bed 配置,并串起整条 CI 流程 | 不替代 `test_runner.py` 的 case 执行逻辑 | @@ -78,7 +78,7 @@ flowchart TD 这里的关键点是: - **上层统一的是 schema 和 case 编译模型**; -- **中层统一的是 `prepare / execute / collect / finalize` 的外壳**; +- **中层统一的是 `prepare / execute` 的外壳**;结果观测和 finalize 是 runner 级收尾,不是 `_CasePlan` 的 phase; - **下层不再按 `scene/scale/profile` 切分,而是按 runtime backend 切分**。 这意味着: @@ -162,12 +162,11 @@ scene / scale / profile 本层由 `test_runner.py` 驱动。 -它对每个 case 做四类动作: +它对每个 case 做三类动作: 1. 准备输入:release、test_rsc、运行时配置、远端 run_dir; -2. 执行主体:远端 executor、benchmark node 或场景专用 workload; -3. collect:收集日志和结果; -4. finalize:回收 runtime、更新 `summary.yaml`、更新 `case_runs.yaml`。 +2. 执行主体并完成结果观测 / 落盘:远端 executor、benchmark node 或场景专用 workload; +3. finalize:回收 runtime、更新 `summary.yaml`、更新 `case_runs.yaml`。 **核心事实:** @@ -217,7 +216,7 @@ suite 中有两大类场景: - 让 controller / deployer 回到可接单状态; - 不运行单个测试 case。 2. `test_runner.py` - - 解决 suite 下每个 case 怎么编译、怎么执行、怎么收集、怎么收尾; + - 解决 suite 下每个 case 怎么编译、怎么执行、怎么收尾; - 它依赖 testbed 已经存在,或者在 controller 离线时尝试触发一次 bootstrap。 这两个步骤描述职责分离;testbed 仍可包含 UI 或 GitOps 相关工作负载。 @@ -401,7 +400,7 @@ deploy.instances 不写死在 suite 中。Runner 会结合 scale、profile 和 - `scale.targets` - profile 中的场景 runtime 模板 -生成后的 deploy.instances 是后续 prepare / execute / collect phase 的部署输入。实例集合和顺序必须稳定,因为后续 phase 规划会依赖它们。 +生成后的 deploy.instances 是后续 prepare / execute 输入的部署基础。实例集合和顺序必须稳定,因为后续执行计划会依赖它们。 ### 7.7 `CI` 特化编译逻辑 @@ -478,18 +477,18 @@ sequenceDiagram R->>R: parse suite + expand cases + build resolved_case R->>R: materialize release/test_rsc - R->>R: plan prepare / execute / collect phases + R->>R: plan prepare / execute phases R->>C: deploy phase workloads C->>N: start remote workloads N->>N: run scene-specific workload R->>N: observe logs / status / result markers - R->>C: collect all instances + R->>C: finalize runtime cleanup R->>R: write summary.yaml + update case_runs.yaml ``` ### 8.2 phase 规划 -`test_runner.py` 会先把每个 case 编译成 `_CasePlan`。这里有一个通用骨架:所有 case 都分成 `prepare_phases / execute_phases / collect_phases` 三段。不同场景的差异不在“三段结构本身”,而在于每段里放哪些 runtime phase、每个 phase 覆盖哪些 instance,以及 run_dir 怎样 staging。 +`test_runner.py` 会先把每个 case 编译成 `_CasePlan`。这里有一个通用骨架:所有 case 都分成 `prepare_phases / execute_phases` 两段。不同场景的差异不在“两段结构本身”,而在于每段里放哪些 runtime phase、每个 phase 覆盖哪些 instance,以及 run_dir 怎样 staging。结果观测和 finalize 不属于 `_CasePlan`。 这里要明确: @@ -500,16 +499,18 @@ sequenceDiagram 通用语义如下: - prepare phase 先准备场景依赖的 runtime、配置、脚本和共享目录; -- execute phase 执行场景主体 workload; -- collect phase 汇总 deploy 侧运行结果和日志; +- execute phase 执行场景主体 workload,并在需要时观测结果、写回摘要; +- finalize 路径做 runtime cleanup,并更新 `summary.yaml` / `case_runs.yaml`; - phase 输入来自 `resolved_case.yaml`,完整视图保存在 `resolved_case_full.yaml`。 当前两类场景的 `_CasePlan` 形状如下: -| 场景 | prepare_phases | execute_phases | collect_phases | -| --- | --- | --- | --- | -| `CI` | `cluster_runtime` | `ci_runner` | `collect_all` | -| `TEST_STACK` / bench | `coordinator`、`node_runtime` | `nodes` | `collect_nodes`、`collect_coordinator` | +| 场景 | prepare_phases | execute_phases | +| --- | --- | --- | +| `CI` | `cluster_runtime` | `ci_runner` | +| `TEST_STACK` / bench | `coordinator`、`node_runtime` | `nodes` | + +`CI` 和 `TEST_STACK` 的结果观测、摘要写回和清理都在 execute / finalize 路径里完成,不再单独拆出额外的收尾阶段。 ### 8.3 远端 run_dir staging @@ -528,19 +529,18 @@ staging 内容由场景和 phase 决定,通常包括: - deployer adapter 每次只消费该 phase 需要的 instance 子集; - 完整的 case 视图另存为 `resolved_case_full.yaml`。 -### 8.4 观测、collect 与 finalize +### 8.4 观测、结果写回与 finalize `test_runner.py` 是 case 执行的观测者和收敛者。它会根据场景定义的日志、状态和结果标记判断执行是否完成。 当场景主体 workload 返回终态后,`test_runner.py` 继续执行两类动作: -1. `collect` - - 对 phase / instance 做 collect; - - 把 deploy 侧运行结果和日志汇总回来。 +1. 结果观测与摘要写回 + - 读取 exit code、result file 或其他终态标记; + - 把 run 结果写入 `summary.yaml`。 2. `finalize` - - 更新 `summary.yaml` - - 更新 `case_runs.yaml` - - 做 runtime cleanup + - 更新 `case_runs.yaml`; + - 做 runtime cleanup。 需要区分两个对象: @@ -551,14 +551,14 @@ staging 内容由场景和 phase 决定,通常包括: ### 8.5 `CI` 特化:每段里放什么 -`CI` 的特化点不是“三段结构本身”,而是三段里放的 phase 比较固定: +`CI` 的特化点不是“两段结构本身”,而是两段里放的 phase 比较固定: - prepare_phases - `cluster_runtime` - execute_phases - `ci_runner` -- collect_phases - - `collect_all` + +CI 的结果观测靠 `ci_runner` 退出码和 runner 的 summary 写回完成,不再单独拆出额外的收尾阶段。 其中 prepare 阶段只负责 cluster runtime: @@ -580,16 +580,15 @@ staging 内容由场景和 phase 决定,通常包括: ### 8.6 `TEST_STACK` / bench 对照:每段里放什么 -`TEST_STACK` / bench 同样走 `prepare / execute / collect` 三段骨架,但段内 phase 不同: +`TEST_STACK` / bench 同样走 `prepare / execute` 两段骨架,但段内 phase 不同: - prepare_phases - `coordinator` - `node_runtime` - execute_phases - `nodes` -- collect_phases - - `collect_nodes` - - `collect_coordinator` + +`TEST_STACK` 的结果观测靠 benchmark result file 完成,finalize 负责收尾和清理,不再单独拆出额外的收尾阶段。 这些 phase 的职责分别是: @@ -598,16 +597,12 @@ staging 内容由场景和 phase 决定,通常包括: - `node_runtime` - 把 benchmark config 和 runtime bundle staging 到各个 benchmark node; - `nodes` - - 真正启动 job 型 benchmark node workload; -- `collect_nodes` - - 汇总各个 benchmark node 的结果和日志; -- `collect_coordinator` - - 再收 coordinator 侧的汇总结果。 + - 真正启动 job 型 benchmark node workload,并等待结果文件就绪。 -所以 `bench` 也是三段。它和 `CI` 共用同一个 `_CasePlan` 外壳;真正的特化点是: +所以 `bench` 也是两段。它和 `CI` 共用同一个 `_CasePlan` 外壳;真正的特化点是: - `CI` 用单个 `ci_runner` job 串行执行命令列表; -- `TEST_STACK` / bench 用 `coordinator + node runtime + node jobs` 的多 phase 结构展开。 +- `TEST_STACK` / bench 用 `coordinator + node runtime + node jobs` 的多 phase 结构展开,结果观测和收尾由 execute / finalize 路径承担。 ### 8.7 `CI` 特化:prepare 子步骤 diff --git a/fluxon_test_stack/test_runner.py b/fluxon_test_stack/test_runner.py index 161063e..f187678 100644 --- a/fluxon_test_stack/test_runner.py +++ b/fluxon_test_stack/test_runner.py @@ -1085,11 +1085,17 @@ def main() -> None: "ERROR: teardown failed; stopping after finalize (no fallback). " f"case_id={case.case_id} err={finalize_error}" ) - if case_family == CASE_FAMILY_BENCH and outcome == RUN_OUTCOME_SUCCESS: - print( - "WARN: TEST_STACK finalize failed after terminal benchmark success; " - f"preserving SUCCESS outcome for case_id={case.case_id} finalize_err={finalize_error}" - ) + if _preserve_success_after_finalize_error(case_family=case_family, outcome=outcome): + if case_family == CASE_FAMILY_BENCH: + print( + "WARN: TEST_STACK finalize failed after terminal benchmark success; " + f"preserving SUCCESS outcome for case_id={case.case_id} finalize_err={finalize_error}" + ) + else: + print( + "WARN: CI finalize failed after terminal ci_runner success; " + f"preserving SUCCESS outcome for case_id={case.case_id} finalize_err={finalize_error}" + ) else: outcome = RUN_OUTCOME_FAILED if suite.run_mode == RUN_MODE_DEBUG_ONE_BY_ONE and outcome != RUN_OUTCOME_SUCCESS: @@ -11390,17 +11396,7 @@ def _run_adapter_action( def _run_subprocess(argv: List[str], *, cwd: str) -> None: print("RUN:", " ".join(_shell_quote(a) for a in argv), flush=True) - proc = subprocess.run(argv, cwd=cwd, capture_output=True, text=True) - if proc.stdout: - sys.stdout.write(proc.stdout) - if not proc.stdout.endswith("\n"): - sys.stdout.write("\n") - sys.stdout.flush() - if proc.stderr: - sys.stderr.write(proc.stderr) - if not proc.stderr.endswith("\n"): - sys.stderr.write("\n") - sys.stderr.flush() + proc = subprocess.run(argv, cwd=cwd) if proc.returncode != 0: raise RuntimeError( "command failed: " @@ -11408,6 +11404,10 @@ def _run_subprocess(argv: List[str], *, cwd: str) -> None: ) +def _preserve_success_after_finalize_error(*, case_family: str, outcome: str) -> bool: + return outcome == RUN_OUTCOME_SUCCESS and case_family in (CASE_FAMILY_BENCH, CASE_FAMILY_CI) + + _SSH_TRANSPORT_TIMEOUT_SECONDS = 180.0 _SSH_TRANSPORT_ARCHIVE_TRANSFER_TIMEOUT_SECONDS = 1800.0 _SSH_TRANSPORT_MAX_ATTEMPTS = 10 diff --git a/fluxon_test_stack/test_runner_ci_runtime.py b/fluxon_test_stack/test_runner_ci_runtime.py index 9e89066..281843f 100644 --- a/fluxon_test_stack/test_runner_ci_runtime.py +++ b/fluxon_test_stack/test_runner_ci_runtime.py @@ -100,12 +100,13 @@ def _create_ci_runtime_venv( if venv_dir.exists(): raise ValueError(f"venv dir already exists (no overwrite): {venv_dir}") python_bin = _ci_runtime_python_executable() - # Create the CI venv without ensurepip. GitHub-hosted Python 3.10 can fail inside - # venv's implicit ensurepip step even though the interpreter already exposes pip - # through system site-packages. - run_subprocess([python_bin, "-m", "venv", "--system-site-packages", "--without-pip", str(venv_dir)]) + # Skip venv's implicit ensurepip step, then seed pip explicitly so the venv stays + # self-contained and does not depend on host site-packages. + run_subprocess([python_bin, "-m", "venv", "--without-pip", str(venv_dir)]) venv_python = venv_dir / "bin" / "python3" if not venv_python.exists(): raise ValueError(f"venv python not found after creation: {venv_python}") + run_subprocess([str(venv_python), "-m", "ensurepip", "--upgrade", "--default-pip"]) + run_subprocess([str(venv_python), "-m", "pip", "--version"]) assert_python_abi(venv_python) return venv_python diff --git a/fluxon_test_stack/tests/test_runner_contract.py b/fluxon_test_stack/tests/test_runner_contract.py index 8fd293d..f5901bb 100644 --- a/fluxon_test_stack/tests/test_runner_contract.py +++ b/fluxon_test_stack/tests/test_runner_contract.py @@ -450,11 +450,71 @@ def test_ci_top_attention_cargo_kv_unit_uses_rust_self_managed_runtime() -> None f"unexpected command id: {command.get('id')!r}" ) return - command_text = command.get("command") - if not isinstance(command_text, str) or "_cargo_kv_unit.py --case-config __RUN_DIR__/configs/ci_scene_config.yaml" not in command_text: + scene = suite.scenes.get("ci_top_attention_cargo_kv_unit") + if not isinstance(scene, dict): print( "FAIL: test_ci_top_attention_cargo_kv_unit_uses_rust_self_managed_runtime - " - f"unexpected command: {command_text!r}" + "missing cargo kv unit scene" + ) + return + ci = scene.get("ci") + if not isinstance(ci, dict): + print( + "FAIL: test_ci_top_attention_cargo_kv_unit_uses_rust_self_managed_runtime - " + "scene.ci missing" + ) + return + if ci.get("subject") != "rust": + print( + "FAIL: test_ci_top_attention_cargo_kv_unit_uses_rust_self_managed_runtime - " + f"expected subject 'rust', got {ci.get('subject')!r}" + ) + return + if ci.get("runtime_contract") != "rust_self_managed": + print( + "FAIL: test_ci_top_attention_cargo_kv_unit_uses_rust_self_managed_runtime - " + f"expected runtime_contract 'rust_self_managed', got {ci.get('runtime_contract')!r}" + ) + return + profile = suite.profiles.get("fluxon_tcp") + if not isinstance(profile, dict): + print( + "FAIL: test_ci_top_attention_cargo_kv_unit_uses_rust_self_managed_runtime - " + "missing fluxon_tcp profile" + ) + return + runtime = profile.get("runtime") + if not isinstance(runtime, dict): + print( + "FAIL: test_ci_top_attention_cargo_kv_unit_uses_rust_self_managed_runtime - " + "profile.runtime missing" + ) + return + profile_ci = runtime.get("ci") + if not isinstance(profile_ci, dict): + print( + "FAIL: test_ci_top_attention_cargo_kv_unit_uses_rust_self_managed_runtime - " + "profile.runtime.ci missing" + ) + return + scene_configs = profile_ci.get("scene_configs") + if not isinstance(scene_configs, dict): + print( + "FAIL: test_ci_top_attention_cargo_kv_unit_uses_rust_self_managed_runtime - " + "profile.runtime.ci.scene_configs missing" + ) + return + cargo_scene_config = scene_configs.get("ci_top_attention_cargo_kv_unit") + if not isinstance(cargo_scene_config, dict): + print( + "FAIL: test_ci_top_attention_cargo_kv_unit_uses_rust_self_managed_runtime - " + "missing ci_top_attention_cargo_kv_unit scene config" + ) + return + if cargo_scene_config.get("kv_transport_feature") != "tcp_thread_transport": + print( + "FAIL: test_ci_top_attention_cargo_kv_unit_uses_rust_self_managed_runtime - " + f"unexpected kv_transport_feature: {cargo_scene_config.get('kv_transport_feature')!r}" ) return print("PASS: test_ci_top_attention_cargo_kv_unit_uses_rust_self_managed_runtime") diff --git a/fluxon_test_stack/tests/test_test_runner_testbed_contract.py b/fluxon_test_stack/tests/test_test_runner_testbed_contract.py index 982378f..48c2279 100644 --- a/fluxon_test_stack/tests/test_test_runner_testbed_contract.py +++ b/fluxon_test_stack/tests/test_test_runner_testbed_contract.py @@ -88,27 +88,54 @@ def test_ci_runtime_python_executable_accepts_python3_alias_when_it_is_python310 with mock.patch.object(_CI_RUNTIME_MOD, "_python_executable_abi", return_value="cpython3.10"): self.assertEqual(_RUNNER._ci_runtime_python_executable(), "/usr/bin/python3") - def test_create_ci_runtime_venv_uses_python310_abi_without_ensurepip(self) -> None: + def test_create_ci_runtime_venv_uses_python310_abi_and_seeds_pip(self) -> None: with tempfile.TemporaryDirectory() as td: run_dir = Path(td) venv_dir = (run_dir / "venv").resolve() expected_venv_python = (venv_dir / "bin" / "python3").resolve() + observed_calls: list[list[str]] = [] def _fake_create_venv(argv: list[str], *, cwd: str) -> None: - self.assertEqual( - argv, - [ - "/usr/bin/python3.10", - "-m", - "venv", - "--system-site-packages", - "--without-pip", - str(venv_dir), - ], - ) + observed_calls.append(argv) self.assertEqual(cwd, str(run_dir)) - expected_venv_python.parent.mkdir(parents=True, exist_ok=True) - expected_venv_python.write_text("#!/bin/sh\n", encoding="utf-8") + if len(observed_calls) == 1: + self.assertEqual( + argv, + [ + "/usr/bin/python3.10", + "-m", + "venv", + "--without-pip", + str(venv_dir), + ], + ) + expected_venv_python.parent.mkdir(parents=True, exist_ok=True) + expected_venv_python.write_text("#!/bin/sh\n", encoding="utf-8") + return + if len(observed_calls) == 2: + self.assertEqual( + argv, + [ + str(expected_venv_python), + "-m", + "ensurepip", + "--upgrade", + "--default-pip", + ], + ) + return + if len(observed_calls) == 3: + self.assertEqual( + argv, + [ + str(expected_venv_python), + "-m", + "pip", + "--version", + ], + ) + return + self.fail(f"unexpected _run_subprocess call: argv={argv!r}") with mock.patch.object(_RUNNER.shutil, "which", return_value="/usr/bin/python3.10"): with mock.patch.object(_CI_RUNTIME_MOD, "_python_executable_abi", return_value="cpython3.10"): @@ -117,7 +144,15 @@ def _fake_create_venv(argv: list[str], *, cwd: str) -> None: venv_python = _RUNNER._create_ci_runtime_venv(run_dir=run_dir) self.assertEqual(venv_python, expected_venv_python) - run_subprocess_mock.assert_called_once() + self.assertEqual( + observed_calls, + [ + ["/usr/bin/python3.10", "-m", "venv", "--without-pip", str(venv_dir)], + [str(expected_venv_python), "-m", "ensurepip", "--upgrade", "--default-pip"], + [str(expected_venv_python), "-m", "pip", "--version"], + ], + ) + self.assertEqual(run_subprocess_mock.call_count, 3) assert_python_abi.assert_called_once_with(venv_python=expected_venv_python) def test_runner_native_bin_kvtest_scene_stays_on_direct_wrapper_command(self) -> None: @@ -256,6 +291,32 @@ def test_finalize_ci_case_runtime_preserves_structured_instance_ids(self) -> Non }, ) + def test_finalize_error_preserves_success_for_ci_and_bench(self) -> None: + self.assertTrue( + _RUNNER._preserve_success_after_finalize_error( + case_family=_RUNNER.CASE_FAMILY_CI, + outcome=_RUNNER.RUN_OUTCOME_SUCCESS, + ) + ) + self.assertTrue( + _RUNNER._preserve_success_after_finalize_error( + case_family=_RUNNER.CASE_FAMILY_BENCH, + outcome=_RUNNER.RUN_OUTCOME_SUCCESS, + ) + ) + self.assertFalse( + _RUNNER._preserve_success_after_finalize_error( + case_family=_RUNNER.CASE_FAMILY_CI, + outcome=_RUNNER.RUN_OUTCOME_FAILED, + ) + ) + self.assertFalse( + _RUNNER._preserve_success_after_finalize_error( + case_family=_RUNNER.CASE_FAMILY_INFER, + outcome=_RUNNER.RUN_OUTCOME_SUCCESS, + ) + ) + def test_write_ci_scene_config_yaml_emits_structured_scene_config(self) -> None: with tempfile.TemporaryDirectory() as td: run_dir = Path(td) @@ -506,10 +567,8 @@ def test_top_attention_mq_core_ci_plan_has_no_collect_phase(self) -> None: }, } case_plan = _RUNNER._compile_case_plan(resolved_case) - self.assertEqual( - tuple(case_plan.__dataclass_fields__.keys()), - ("case_family", "prepare_phases", "execute_phases"), - ) + self.assertEqual(tuple(phase.phase_id for phase in case_plan.prepare_phases), ("cluster_runtime",)) + self.assertEqual(tuple(phase.phase_id for phase in case_plan.execute_phases), ("ci_runner",)) self.assertEqual(case_plan.execute_phases[0].instance_ids, ("ci_runner",)) def test_doc_page_ci_execution_plan_uses_online_docker_image(self) -> None: diff --git a/fluxon_test_stack/tests/test_top_attention_cargo_kv_unit_contract.py b/fluxon_test_stack/tests/test_top_attention_cargo_kv_unit_contract.py index 3387ae2..03203c6 100644 --- a/fluxon_test_stack/tests/test_top_attention_cargo_kv_unit_contract.py +++ b/fluxon_test_stack/tests/test_top_attention_cargo_kv_unit_contract.py @@ -3,6 +3,7 @@ from __future__ import annotations import importlib.util +import os import sys import tempfile import unittest @@ -35,7 +36,7 @@ def _load_module(): class TestTopAttentionCargoKvUnitContract(unittest.TestCase): - def test_main_accepts_case_config_and_writes_build_config_ext(self) -> None: + def test_main_accepts_case_config_and_uses_scene_config_feature(self) -> None: with tempfile.TemporaryDirectory() as td: run_dir = Path(td) cfg_dir = run_dir / "configs" @@ -65,13 +66,14 @@ def test_main_accepts_case_config_and_writes_build_config_ext(self) -> None: encoding="utf-8", ) - with mock.patch.object(_ENTRY, "run_cargo", return_value=0) as run_cargo: - with mock.patch.object( - sys, - "argv", - [str(MODULE_PATH), "--case-config", str(case_cfg), "--feature", "tcp_thread_transport"], - ): - rc = _ENTRY.main() + with mock.patch.dict(os.environ, {"FLUXON_KV_TEST_TRANSPORT_FEATURE": "fastws_transport"}, clear=False): + with mock.patch.object(_ENTRY, "run_cargo", return_value=0) as run_cargo: + with mock.patch.object( + sys, + "argv", + [str(MODULE_PATH), "--case-config", str(case_cfg)], + ): + rc = _ENTRY.main() self.assertEqual(rc, 0) build_cfg = yaml.safe_load((src_dir / "build_config_ext.yml").read_text(encoding="utf-8")) @@ -99,7 +101,7 @@ def test_main_accepts_case_config_and_writes_build_config_ext(self) -> None: str((src_dir / "build_config_ext.yml").resolve()), ) - def test_main_rejects_feature_mismatch_when_case_config_is_present(self) -> None: + def test_main_rejects_feature_override_flag(self) -> None: with tempfile.TemporaryDirectory() as td: run_dir = Path(td) cfg_dir = run_dir / "configs" @@ -124,9 +126,11 @@ def test_main_rejects_feature_mismatch_when_case_config_is_present(self) -> None "argv", [str(MODULE_PATH), "--case-config", str(case_cfg), "--feature", "fastws_transport"], ): - with self.assertRaisesRegex(ValueError, "must match scene_config.kv_transport_feature"): + with self.assertRaises(SystemExit) as cm: _ENTRY.main() + self.assertEqual(cm.exception.code, 2) + def test_main_rejects_pytest_style_passthrough_flags(self) -> None: with mock.patch.object(sys, "argv", [str(MODULE_PATH), "-k", "lease"]): with self.assertRaises(SystemExit) as cm: diff --git a/fluxon_test_stack/tests/test_top_attention_common_contract.py b/fluxon_test_stack/tests/test_top_attention_common_contract.py index ff5e442..e66b313 100644 --- a/fluxon_test_stack/tests/test_top_attention_common_contract.py +++ b/fluxon_test_stack/tests/test_top_attention_common_contract.py @@ -27,7 +27,7 @@ def _load_module(): class TestTopAttentionCommonContract(unittest.TestCase): - def test_prepare_cargo_env_prefers_active_fluxon_pyo3_libs_dir_without_overriding_loader_path(self) -> None: + def test_prepare_cargo_env_prefers_active_fluxon_pyo3_libs_dir_and_sanitizes_loader_path(self) -> None: with tempfile.TemporaryDirectory() as td: root = Path(td) active_site_packages = root / "venv" / "lib" / "python3.12" / "site-packages" @@ -56,7 +56,46 @@ def test_prepare_cargo_env_prefers_active_fluxon_pyo3_libs_dir_without_overridin assert prepared_env is not None self.assertEqual(prepared_env["FLUXON_PYO3_LIBS_DIR"], str(active_libs_dir.resolve())) - self.assertEqual(prepared_env["LD_LIBRARY_PATH"], f"{stale_libs_dir}:/usr/lib:/opt/custom") + self.assertEqual(prepared_env["LD_LIBRARY_PATH"], f"{active_libs_dir.resolve()}:/usr/lib:/opt/custom") + self.assertEqual(prepared_env["PATH"], "/usr/bin") + + def test_prepare_cargo_env_places_authoritative_fluxon_root_before_closed_sdk_runtime(self) -> None: + with tempfile.TemporaryDirectory() as td: + root = Path(td) + active_site_packages = root / "venv" / "lib" / "python3.12" / "site-packages" + active_libs_dir = active_site_packages / "fluxon_pyo3.libs" + active_libs_dir.mkdir(parents=True) + closed_sdk_root = root / "fluxon_release" / "closed_sdk" + (closed_sdk_root / "lib").mkdir(parents=True) + (closed_sdk_root / "manifest.json").write_text("{}", encoding="utf-8") + stale_libs_dir = root / "stale" / "site-packages" / "fluxon_pyo3.libs" + stale_libs_dir.mkdir(parents=True) + + with mock.patch.object( + _ENTRY.sysconfig, + "get_paths", + return_value={ + "platlib": str(active_site_packages), + "purelib": str(active_site_packages), + }, + ): + with mock.patch.object(_ENTRY.site, "getsitepackages", return_value=[str(stale_libs_dir.parent)]): + with mock.patch.object(_ENTRY.site, "getusersitepackages", return_value=""): + with mock.patch.object(_ENTRY, "REPO_ROOT", root): + prepared_env = _ENTRY._prepare_cargo_env( + { + "LD_LIBRARY_PATH": f"{stale_libs_dir}:/usr/lib:/opt/custom", + "PATH": "/usr/bin", + } + ) + + assert prepared_env is not None + self.assertEqual(prepared_env["FLUXON_PYO3_LIBS_DIR"], str(active_libs_dir.resolve())) + self.assertEqual(prepared_env["FLUXON_COMMU_CLOSED_SDK_ROOT"], str(closed_sdk_root.resolve())) + self.assertEqual( + prepared_env["LD_LIBRARY_PATH"], + f"{active_libs_dir.resolve()}:{(closed_sdk_root / 'lib').resolve()}:/usr/lib:/opt/custom", + ) self.assertEqual(prepared_env["PATH"], "/usr/bin") def test_prepare_cargo_env_prepends_repo_closed_sdk_runtime(self) -> None: diff --git a/fluxon_test_stack/top_attention_test_index/README.md b/fluxon_test_stack/top_attention_test_index/README.md index 11c55c9..19069f2 100644 --- a/fluxon_test_stack/top_attention_test_index/README.md +++ b/fluxon_test_stack/top_attention_test_index/README.md @@ -51,7 +51,7 @@ Entries: - `_script_tools.py`: script utility coverage - `_cargo_fs_core.py`: cargo tests for the Rust FS core crate. `ci_test_list.yaml` now exposes this wrapper as the formal `ci_top_attention_cargo_fs_core` runner-native scene. - `_cargo_util.py`: cargo tests for the Rust util crate. `ci_test_list.yaml` now exposes this wrapper as the formal `ci_top_attention_cargo_util` runner-native scene, with runtime endpoints supplied through canonical `--case-config`. -- `_cargo_kv_unit.py`: cargo tests for the Rust KV crate. `ci_test_list.yaml` now exposes this wrapper as the formal `ci_top_attention_cargo_kv_unit` runner-native scene, with transport feature selection bounded by `scene_config.kv_transport_feature`. +- `_cargo_kv_unit.py`: cargo tests for the Rust KV crate. `ci_test_list.yaml` now exposes this wrapper as the formal `ci_top_attention_cargo_kv_unit` runner-native scene, with transport feature selection sourced only from canonical `--case-config` (`scene_config.kv_transport_feature`). - `_cargo_cli.py`: cargo tests for the Rust CLI crate - `_cargo_commu.py`: cargo tests for the Rust communication facade crate - `_cargo_commu_contract.py`: cargo tests for the Rust communication contract crate diff --git a/fluxon_test_stack/top_attention_test_index/_cargo_kv_unit.py b/fluxon_test_stack/top_attention_test_index/_cargo_kv_unit.py index 43ce921..56dfc81 100755 --- a/fluxon_test_stack/top_attention_test_index/_cargo_kv_unit.py +++ b/fluxon_test_stack/top_attention_test_index/_cargo_kv_unit.py @@ -2,7 +2,6 @@ from __future__ import annotations import argparse -import os from pathlib import Path from _common import ( @@ -24,35 +23,24 @@ def main() -> int: ) parser.add_argument( "--case-config", + required=True, help="Canonical CI case config YAML emitted by test_runner.", ) - parser.add_argument( - "--feature", - default=os.environ.get("FLUXON_KV_TEST_TRANSPORT_FEATURE", "tcp_thread_transport"), - help="Transport feature appended to p2p_transfer.", - ) args = parser.parse_args() - feature = str(args.feature).strip() - env = None - if args.case_config: - case_cfg_path = Path(args.case_config).resolve() - case_payload = load_case_config_payload(case_cfg_path, expected_scene_id=SCENE_ID) - scene_config = case_payload["scene_config"] - configured_feature = str(scene_config.get("kv_transport_feature") or "").strip() - if not configured_feature: - raise ValueError("scene_config.kv_transport_feature must be set") - if feature != configured_feature: - raise ValueError( - f"--feature must match scene_config.kv_transport_feature when --case-config is set: {configured_feature!r}" - ) - scene_runtime = case_payload.get("scene_runtime") - if not isinstance(scene_runtime, dict): - raise ValueError("case config must define scene_runtime mapping") - build_config_ext_path = write_build_config_ext(case_cfg_path, scene_runtime=scene_runtime) - env = inject_build_config_ext_env( - env, - build_config_ext_path=build_config_ext_path, - ) + case_cfg_path = Path(args.case_config).resolve() + case_payload = load_case_config_payload(case_cfg_path, expected_scene_id=SCENE_ID) + scene_config = case_payload["scene_config"] + feature = str(scene_config.get("kv_transport_feature") or "").strip() + if not feature: + raise ValueError("scene_config.kv_transport_feature must be set") + scene_runtime = case_payload.get("scene_runtime") + if not isinstance(scene_runtime, dict): + raise ValueError("case config must define scene_runtime mapping") + build_config_ext_path = write_build_config_ext(case_cfg_path, scene_runtime=scene_runtime) + env = inject_build_config_ext_env( + None, + build_config_ext_path=build_config_ext_path, + ) return run_cargo([ "test", "--manifest-path", diff --git a/fluxon_test_stack/top_attention_test_index/_common.py b/fluxon_test_stack/top_attention_test_index/_common.py index e3f83a4..1b07b46 100755 --- a/fluxon_test_stack/top_attention_test_index/_common.py +++ b/fluxon_test_stack/top_attention_test_index/_common.py @@ -178,37 +178,35 @@ def _resolve_authoritative_fluxon_pyo3_libs_dir() -> Path | None: return None -def _prepend_env_path_list( - prepared_env: dict[str, str], +def _path_contains_fluxon_pyo3_libs_dir(path: Path) -> bool: + return "fluxon_pyo3.libs" in path.parts + + +def _sanitize_cargo_ld_library_path( *, - key: str, - entries: Sequence[str], -) -> None: - normalized_entries: list[str] = [] + authoritative_entries: Sequence[str], + current_value: str | None, +) -> str: + # Keep the authoritative loader roots first, then retain only non-fluxon entries from the parent env. + sanitized_entries: list[str] = [] seen_entries: set[str] = set() - for raw_entry in entries: + for raw_entry in authoritative_entries: entry = raw_entry.strip() - if not entry: - continue - if entry in seen_entries: + if not entry or entry in seen_entries: continue seen_entries.add(entry) - normalized_entries.append(entry) + sanitized_entries.append(entry) - current_value = prepared_env.get(key) - if current_value is None: - prepared_env[key] = ":".join(normalized_entries) - return - - for raw_entry in current_value.split(":"): - entry = raw_entry.strip() - if not entry: - continue - if entry in seen_entries: - continue - seen_entries.add(entry) - normalized_entries.append(entry) - prepared_env[key] = ":".join(normalized_entries) + if current_value is not None: + for raw_entry in current_value.split(":"): + entry = raw_entry.strip() + if not entry or entry in seen_entries: + continue + if _path_contains_fluxon_pyo3_libs_dir(Path(entry)): + continue + seen_entries.add(entry) + sanitized_entries.append(entry) + return ":".join(sanitized_entries) def _resolve_repo_closed_sdk_root() -> Path | None: @@ -223,23 +221,29 @@ def _resolve_repo_closed_sdk_root() -> Path | None: def _prepare_cargo_env(env: dict[str, str] | None) -> dict[str, str] | None: + libs_dir = _resolve_authoritative_fluxon_pyo3_libs_dir() + closed_sdk_root = _resolve_repo_closed_sdk_root() + if env is None and libs_dir is None and closed_sdk_root is None: + return None + prepared_env = os.environ.copy() if env is None else dict(env) + authoritative_entries: list[str] = [] - libs_dir = _resolve_authoritative_fluxon_pyo3_libs_dir() if libs_dir is not None: - prepared_env["FLUXON_PYO3_LIBS_DIR"] = str(libs_dir) + authoritative_entry = str(libs_dir) + prepared_env["FLUXON_PYO3_LIBS_DIR"] = authoritative_entry + authoritative_entries.append(authoritative_entry) - closed_sdk_root = _resolve_repo_closed_sdk_root() if closed_sdk_root is not None: prepared_env["FLUXON_COMMU_CLOSED_SDK_ROOT"] = str(closed_sdk_root) - _prepend_env_path_list( - prepared_env, - key="LD_LIBRARY_PATH", - entries=[str((closed_sdk_root / "lib").resolve())], + authoritative_entries.append(str((closed_sdk_root / "lib").resolve())) + + if authoritative_entries: + prepared_env["LD_LIBRARY_PATH"] = _sanitize_cargo_ld_library_path( + authoritative_entries=authoritative_entries, + current_value=prepared_env.get("LD_LIBRARY_PATH"), ) - if env is None and libs_dir is None and closed_sdk_root is None: - return None return prepared_env