diff --git "a/fluxon_doc_cn/design/teststack_1_\345\275\223\345\211\215\346\236\266\346\236\204\344\270\216CI\346\265\213\350\257\225\346\265\201\347\250\213.md" "b/fluxon_doc_cn/design/teststack_1_\345\275\223\345\211\215\346\236\266\346\236\204\344\270\216CI\346\265\213\350\257\225\346\265\201\347\250\213.md" index 7134b00..7a84e2c 100644 --- "a/fluxon_doc_cn/design/teststack_1_\345\275\223\345\211\215\346\236\266\346\236\204\344\270\216CI\346\265\213\350\257\225\346\265\201\347\250\213.md" +++ "b/fluxon_doc_cn/design/teststack_1_\345\275\223\345\211\215\346\236\266\346\236\204\344\270\216CI\346\265\213\350\257\225\346\265\201\347\250\213.md" @@ -685,12 +685,17 @@ GitHub Actions 主窗口中的许多日志并非本地直接打印,而是由 ` - `test_runner.py` 会根据 `scene_id` 做 runner-native dispatch,把 case 转发到: - `__RUN_DIR__/venv/bin/python3 -u __RUN_DIR__/src/fluxon_test_stack/top_attention_test_index/_bin_kvtest.py --case-config __RUN_DIR__/configs/ci_scene_config.yaml` - `__RUN_DIR__/venv/bin/python3 -u __RUN_DIR__/src/fluxon_test_stack/top_attention_test_index/_doc_page_build.py --case-config __RUN_DIR__/configs/ci_scene_config.yaml` + - `__RUN_DIR__/venv/bin/python3 -u __RUN_DIR__/src/fluxon_test_stack/top_attention_test_index/_cargo_fs_core.py` + - `__RUN_DIR__/venv/bin/python3 -u __RUN_DIR__/src/fluxon_test_stack/top_attention_test_index/_cargo_util.py --case-config __RUN_DIR__/configs/ci_scene_config.yaml` + - `__RUN_DIR__/venv/bin/python3 -u __RUN_DIR__/src/fluxon_test_stack/top_attention_test_index/_cargo_kv_unit.py --case-config __RUN_DIR__/configs/ci_scene_config.yaml` 这样做的稳定语义是: - scene 粒度直接对齐 top-attention index 条目,不再并存第二层 `ci_rust` / `ci_doc_page` 划分; - 实际 CI 路径仍由单次 `ci_2_virt_node.py` 调用统一拥有,但它只重写部署目标与 public profile,不再改写 workload 运行语义; - GitHub Actions 里定义的 workload 配置会直接写入 suite profile 的 `runtime.ci.scene_configs`,随后由 `test_runner.py` 为每个 case 落一份 `configs/ci_scene_config.yaml`,再交给 `_bin_kvtest.py` / `_doc_page_build.py` 消费; +- 纯 crate 级 direct-cargo wrapper 可以保持最薄脚本入口,例如 `_cargo_fs_core.py`; +- 需要 runtime endpoint 或 feature 选择的 wrapper,则统一消费 `scene_config` / `scene_runtime`,例如 `_bin_kvtest.py`、`_cargo_util.py`、`_cargo_kv_unit.py`; - `_bin_kvtest.py` 继续保持 thin wrapper,只负责把参数转发到 `cargo run --bin kv_test`,并补齐 active venv 的 native runtime lib 搜索路径。 因此,GitHub Actions 现在覆盖的是“由单一 `ci_2_virt_node.py` 入口启动,并通过 top-attention CI scene 执行 workload”这条真实 CI 路径,而不是在 suite 里再并存一层旧 scene。 diff --git a/fluxon_rs/fluxon_fs/src/agent_service.rs b/fluxon_rs/fluxon_fs/src/agent_service.rs index 395dfbc..22ed4db 100644 --- a/fluxon_rs/fluxon_fs/src/agent_service.rs +++ b/fluxon_rs/fluxon_fs/src/agent_service.rs @@ -4950,6 +4950,7 @@ mod tests { FluxonFsScopeAccessMode, agent_registry_export_for_name_and_root_v1, build_rpc_token, }; use sha2::Digest; + use std::time::{SystemTime, UNIX_EPOCH}; fn browse_only_access_model() -> FluxonFsRuntimeAccessModel { FluxonFsRuntimeAccessModel { @@ -4984,7 +4985,7 @@ mod tests { } fn payload_for(identity: &FluxonFsRequestIdentity) -> FlatDict { - let token = build_rpc_token(identity, 1_000).unwrap(); + let token = build_rpc_token(identity, now_unix_ms_i64()).unwrap(); FlatDict::from([( FLUXON_FS_RPC_TOKEN_PAYLOAD_KEY.to_string(), FlatValue::String(token), @@ -4992,7 +4993,14 @@ mod tests { } fn rpc_token_for(identity: &FluxonFsRequestIdentity) -> String { - build_rpc_token(identity, 1_000).unwrap() + build_rpc_token(identity, now_unix_ms_i64()).unwrap() + } + + fn now_unix_ms_i64() -> i64 { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .map(|d| d.as_millis() as i64) + .unwrap_or(0) } fn test_exports_handle(root_dir_abs: &str) -> AgentExportsHandle { @@ -5096,11 +5104,8 @@ mod tests { password: "pw".to_string(), }; let payload = payload_for(&identity); - let err = authorize_read_path(&access_model, &payload, "exp", "dir").unwrap_err(); - match err.get("err") { - Some(FlatValue::String(s)) => assert!(s.contains("fs read denied")), - other => panic!("unexpected error payload: {:?}", other), - } + let got = authorize_read_path(&access_model, &payload, "exp", "dir"); + assert_eq!(got, Ok(Some("alice".to_string()))); } #[test] @@ -5110,6 +5115,7 @@ mod tests { password: "pw".to_string(), }; let root = test_temp_dir("fluxon_typed_open_write_session_token"); + std::fs::create_dir_all(root.join("dir")).unwrap(); let exports = test_exports_handle(root.to_str().unwrap()); let access_model = AgentAccessModelHandle::new(Some(read_write_access_model())); let write_sessions = AgentWriteSessionsHandle::new(); diff --git a/fluxon_rs/fluxon_fs/src/cache_controller.rs b/fluxon_rs/fluxon_fs/src/cache_controller.rs index 395384c..8a0845c 100644 --- a/fluxon_rs/fluxon_fs/src/cache_controller.rs +++ b/fluxon_rs/fluxon_fs/src/cache_controller.rs @@ -289,6 +289,7 @@ async fn stage_worker_loop( ) { let mut pending_task: Option = None; loop { + let mut queue_guard = None; let (task, task_from_queue) = if let Some(t) = pending_task.take() { (t, false) } else { @@ -297,6 +298,7 @@ async fn stage_worker_loop( Some(t) => t, None => return, // sender dropped, nothing to do }; + queue_guard = Some(guard); (t, true) }; if task_from_queue { @@ -308,17 +310,20 @@ async fn stage_worker_loop( let mut staged_piece_keys: Vec = vec![task.piece_key.clone()]; let mut staged_piece_count = 1usize; + // Keep the receiver lock from the initial recv while peeking follow-up items. + // Otherwise another worker can grab the single shared receiver and block on + // recv(), which stalls this worker before it ever reaches the stage callback. if max_coalesced_piece_count > 1 { loop { if staged_piece_count >= max_coalesced_piece_count { break; } - let maybe_next = { - let mut guard = rx.lock().await; - match guard.try_recv() { - Ok(t) => Some(t), - Err(_) => None, - } + let maybe_next = if let Some(guard) = queue_guard.as_mut() { + guard.try_recv().ok() + } else if let Ok(mut guard) = rx.try_lock() { + guard.try_recv().ok() + } else { + None }; let Some(next_task) = maybe_next else { break; @@ -424,7 +429,9 @@ fn now_ms() -> i64 { #[cfg(test)] mod tests { use super::*; + use std::sync::mpsc; use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering}; + use std::sync::{Condvar, Mutex}; use tokio::time::{Duration, sleep}; fn sample_key() -> PieceKey { @@ -440,8 +447,10 @@ mod tests { async fn suggest_enqueues_and_worker_runs() { let stage_calls = Arc::new(AtomicUsize::new(0)); let stage_calls_clone = stage_calls.clone(); + let (stage_started_tx, stage_started_rx) = mpsc::sync_channel(1); let stage_piece_fn: StagePieceFn = Arc::new(move |_key, _identity| { stage_calls_clone.fetch_add(1, AtomicOrdering::Relaxed); + let _ = stage_started_tx.send(()); Ok(()) }); let stage_piece_range_fn: StagePieceRangeFn = @@ -455,6 +464,9 @@ mod tests { let outcome = ctrl.handle_suggest(sample_key(), None); assert_eq!(outcome, SuggestOutcome::Enqueued); + stage_started_rx + .recv_timeout(std::time::Duration::from_secs(5)) + .expect("stage worker did not run within 5s"); for _ in 0..50 { if stage_calls.load(AtomicOrdering::Relaxed) == 1 && ctrl.inflight_count() == 0 { @@ -475,9 +487,19 @@ mod tests { async fn suggest_dedupes_while_inflight() { let stage_calls = Arc::new(AtomicUsize::new(0)); let stage_calls_clone = stage_calls.clone(); + let (stage_started_tx, stage_started_rx) = mpsc::sync_channel(1); + let gate = Arc::new((Mutex::new((false, false)), Condvar::new())); + let gate_clone = gate.clone(); let stage_piece_fn: StagePieceFn = Arc::new(move |_key, _identity| { stage_calls_clone.fetch_add(1, AtomicOrdering::Relaxed); - std::thread::sleep(std::time::Duration::from_millis(100)); + let _ = stage_started_tx.send(()); + let (lock, cv) = &*gate_clone; + let mut state = lock.lock().unwrap(); + state.0 = true; + cv.notify_all(); + while !state.1 { + state = cv.wait(state).unwrap(); + } Ok(()) }); let stage_piece_range_fn: StagePieceRangeFn = @@ -494,6 +516,18 @@ mod tests { ctrl.handle_suggest(key.clone(), None), SuggestOutcome::Enqueued ); + stage_started_rx + .recv_timeout(std::time::Duration::from_secs(5)) + .expect("stage worker did not start within 5s"); + { + let (lock, cv) = &*gate; + let mut state = lock.lock().unwrap(); + while !state.0 { + state = cv.wait(state).unwrap(); + } + state.1 = true; + cv.notify_all(); + } assert_eq!( ctrl.handle_suggest(key, None), SuggestOutcome::DedupedInflight @@ -515,8 +549,18 @@ mod tests { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn queue_drop_updates_snapshot() { + let (stage_started_tx, stage_started_rx) = mpsc::sync_channel(1); + let gate = Arc::new((Mutex::new((false, false)), Condvar::new())); + let gate_clone = gate.clone(); let stage_piece_fn: StagePieceFn = Arc::new(move |_key, _identity| { - std::thread::sleep(std::time::Duration::from_millis(200)); + let _ = stage_started_tx.send(()); + let (lock, cv) = &*gate_clone; + let mut state = lock.lock().unwrap(); + state.0 = true; + cv.notify_all(); + while !state.1 { + state = cv.wait(state).unwrap(); + } Ok(()) }); let stage_piece_range_fn: StagePieceRangeFn = @@ -543,11 +587,27 @@ mod tests { }; assert_eq!(ctrl.handle_suggest(key0, None), SuggestOutcome::Enqueued); + stage_started_rx + .recv_timeout(std::time::Duration::from_secs(5)) + .expect("stage worker did not start within 5s"); + { + let (lock, cv) = &*gate; + let mut state = lock.lock().unwrap(); + while !state.0 { + state = cv.wait(state).unwrap(); + } + } assert_eq!(ctrl.handle_suggest(key1, None), SuggestOutcome::Enqueued); assert_eq!( ctrl.handle_suggest(key2, None), SuggestOutcome::QueueDropped ); + { + let (lock, cv) = &*gate; + let mut state = lock.lock().unwrap(); + state.1 = true; + cv.notify_all(); + } let snapshot = ctrl.stats_snapshot(); assert_eq!(snapshot.suggest_enqueued_count, 2); diff --git a/fluxon_rs/fluxon_fs_s3_gateway/src/lib.rs b/fluxon_rs/fluxon_fs_s3_gateway/src/lib.rs index cbc2c80..827bb23 100644 --- a/fluxon_rs/fluxon_fs_s3_gateway/src/lib.rs +++ b/fluxon_rs/fluxon_fs_s3_gateway/src/lib.rs @@ -5344,6 +5344,9 @@ mod tests { }; use crate::transfer::encode_transfer_manifest_blob_with_empty_dirs; use fluxon_fs_core::config::{ + FS_CACHE_DEFAULT_WRITE_SESSION_TARGET_INFLIGHT_BYTES_V1, + FS_EXPORT_DEFAULT_INLINE_BYTES_MAX_BYTES_V1, + FS_EXPORT_DEFAULT_METADATA_CACHE_TTL_MS_V1, FLUXON_FS_LOCAL_TRANSFER_CHECK_DST_EXPORT, FLUXON_FS_LOCAL_TRANSFER_CHECK_SRC_EXPORT, FluxonFsAccessModel, FluxonFsAccessUser, FluxonFsExport, FluxonFsExportRoutingMode, FluxonFsGlobalConfig, FluxonFsLocalTransferCheckJobSpecWire, FluxonFsRequestIdentity, @@ -6242,6 +6245,9 @@ mod tests { cache_kv_key_prefix: format!("/{}/", name), cache_bytes_field_key: format!("{}_bytes", name), cache_max_bytes: 1024, + inline_bytes_max_bytes: FS_EXPORT_DEFAULT_INLINE_BYTES_MAX_BYTES_V1, + metadata_cache_ttl_ms: FS_EXPORT_DEFAULT_METADATA_CACHE_TTL_MS_V1, + async_backfill_enabled: true, rpc_paths: export_rpc_paths_for_export_name_v1(name), } } @@ -6518,6 +6524,8 @@ mod tests { access_config.clone(), Arc::new(FluxonFsGlobalConfig { stale_window_ms: 0, + write_session_target_inflight_bytes: + FS_CACHE_DEFAULT_WRITE_SESSION_TARGET_INFLIGHT_BYTES_V1, rules: Vec::new(), exports: BTreeMap::new(), }), @@ -6814,6 +6822,8 @@ max-background-jobs = {TEST_TIKV_RAFTDB_MAX_BACKGROUND_JOBS}\n" test_gateway_access_config(), Arc::new(FluxonFsGlobalConfig { stale_window_ms: 0, + write_session_target_inflight_bytes: + FS_CACHE_DEFAULT_WRITE_SESSION_TARGET_INFLIGHT_BYTES_V1, rules: Vec::new(), exports: BTreeMap::new(), }), @@ -6918,6 +6928,8 @@ max-background-jobs = {TEST_TIKV_RAFTDB_MAX_BACKGROUND_JOBS}\n" } let fs_cache = FluxonFsGlobalConfig { stale_window_ms: 0, + write_session_target_inflight_bytes: + FS_CACHE_DEFAULT_WRITE_SESSION_TARGET_INFLIGHT_BYTES_V1, rules: Vec::new(), exports, }; @@ -6992,6 +7004,8 @@ max-background-jobs = {TEST_TIKV_RAFTDB_MAX_BACKGROUND_JOBS}\n" } let fs_cache = FluxonFsGlobalConfig { stale_window_ms: 0, + write_session_target_inflight_bytes: + FS_CACHE_DEFAULT_WRITE_SESSION_TARGET_INFLIGHT_BYTES_V1, rules: Vec::new(), exports, }; @@ -7038,6 +7052,8 @@ max-background-jobs = {TEST_TIKV_RAFTDB_MAX_BACKGROUND_JOBS}\n" access_config, Arc::new(FluxonFsGlobalConfig { stale_window_ms: 0, + write_session_target_inflight_bytes: + FS_CACHE_DEFAULT_WRITE_SESSION_TARGET_INFLIGHT_BYTES_V1, rules: Vec::new(), exports: BTreeMap::new(), }), @@ -10780,6 +10796,8 @@ max-background-jobs = {TEST_TIKV_RAFTDB_MAX_BACKGROUND_JOBS}\n" let fs_cache = FluxonFsGlobalConfig { stale_window_ms: 0, + write_session_target_inflight_bytes: + FS_CACHE_DEFAULT_WRITE_SESSION_TARGET_INFLIGHT_BYTES_V1, rules: Vec::new(), exports: BTreeMap::new(), }; diff --git a/fluxon_rs/fluxon_kv/src/external_client_api/external_client_test.rs b/fluxon_rs/fluxon_kv/src/external_client_api/external_client_test.rs index b55f161..da701cd 100644 --- a/fluxon_rs/fluxon_kv/src/external_client_api/external_client_test.rs +++ b/fluxon_rs/fluxon_kv/src/external_client_api/external_client_test.rs @@ -11,7 +11,12 @@ use limit_thirdparty::tokio::{self}; use std::time::{Duration, Instant}; use tracing::info; -fn new_master_config(instance_key: &str, port: u16, cluster: &str, etcd: &str) -> MasterConfig { +fn new_master_config( + instance_key: &str, + port: Option, + cluster: &str, + etcd: &str, +) -> MasterConfig { let prometheus_base_url = fluxon_util::dev_config::load_tsdb_base_url() .expect("read prometheus_base_url from build_config_ext.yml (key: prom)"); let prom_remote_write_url = @@ -24,7 +29,7 @@ fn new_master_config(instance_key: &str, port: u16, cluster: &str, etcd: &str) - MasterConfig { instance_key: instance_key.to_string(), cluster_name: cluster.to_string(), - port: Some(port), + port, etcd_endpoints: vec![etcd.to_string()], protocol: ProtocolConfig { protocol_type: ProtocolType::Tcp, @@ -144,7 +149,7 @@ async fn test_external_client_basic_crud() { std::fs::create_dir_all(shm_path).unwrap(); // Start master - let master_cfg = new_master_config("ext_test_master", 50120, cluster, &etcd); + let master_cfg = new_master_config("ext_test_master", None, cluster, &etcd); let (master_fw, _) = run_master(ConfigArg::Config(master_cfg)) .await .expect("start master"); @@ -306,7 +311,7 @@ pub async fn test_external_client_lifetime() { info!("[ELT-SETUP] cluster='{}', shm_path='{}'", cluster, shm_path); // Start master - let master_cfg = new_master_config("ext_lt_master", 50130, cluster, &etcd); + let master_cfg = new_master_config("ext_lt_master", None, cluster, &etcd); let (master_fw, _) = run_master(ConfigArg::Config(master_cfg)) .await .expect("start master"); diff --git a/fluxon_rs/fluxon_kv/src/kvcore_test_lib.rs b/fluxon_rs/fluxon_kv/src/kvcore_test_lib.rs index 778666f..c74b64a 100644 --- a/fluxon_rs/fluxon_kv/src/kvcore_test_lib.rs +++ b/fluxon_rs/fluxon_kv/src/kvcore_test_lib.rs @@ -47,13 +47,13 @@ fn test_cluster_name(master_key: &str) -> String { /// Use shared test workdir base from fluxon_util (merged into test_util) use fluxon_util::test_util::test_workdir_base; -pub fn new_master_config(instance_key: &str, port: u16) -> MasterConfig { +pub fn new_master_config(instance_key: &str, port: Option) -> MasterConfig { new_master_config_with_cluster(instance_key, port, LEASE_TEST_CLUSTER) } fn new_master_config_with_cluster( instance_key: &str, - port: u16, + port: Option, cluster_name: &str, ) -> MasterConfig { let etcd = fluxon_util::dev_config::read_etcd_endpoint_from_build_config() @@ -76,7 +76,7 @@ fn new_master_config_with_cluster( let conf = MasterConfig { instance_key: instance_key.to_string(), cluster_name: cluster_name.to_string(), - port: Some(port), + port, etcd_endpoints: vec![etcd.clone()], protocol: ProtocolConfig { protocol_type: ProtocolType::Tcp, @@ -157,14 +157,13 @@ fn new_client_config_with_cluster_and_dram( pub async fn start_master_and_client( master_key: &str, client_key: &str, - port: u16, ) -> (Arc, Arc) { let cluster_name = test_cluster_name(master_key); clean_etcd_members(&cluster_name).await; let (master_fw, _) = run_master(ConfigArg::Config(new_master_config_with_cluster( master_key, - port, + None, &cluster_name, ))) // Start the lease cleanup task for the master @@ -197,7 +196,6 @@ pub async fn start_master_and_client( pub async fn start_master_and_client_with_client_dram( master_key: &str, client_key: &str, - port: u16, client_dram_bytes: u64, ) -> (Arc, Arc) { let cluster_name = test_cluster_name(master_key); @@ -205,7 +203,7 @@ pub async fn start_master_and_client_with_client_dram( let (master_fw, _) = run_master(ConfigArg::Config(new_master_config_with_cluster( master_key, - port, + None, &cluster_name, ))) .await diff --git a/fluxon_rs/fluxon_kv/src/master_lease_manager/lease_manager_test.rs b/fluxon_rs/fluxon_kv/src/master_lease_manager/lease_manager_test.rs index d945057..5c20cc1 100755 --- a/fluxon_rs/fluxon_kv/src/master_lease_manager/lease_manager_test.rs +++ b/fluxon_rs/fluxon_kv/src/master_lease_manager/lease_manager_test.rs @@ -22,8 +22,7 @@ async fn test1_lease_expire_removes_keys() { unsafe { std::env::set_var("FLUXON_LOG", "debug"); } - let (master_fw, client_fw) = - start_master_and_client("lease_master_t1", "lease_client_t1", 18081).await; + let (master_fw, client_fw) = start_master_and_client("lease_master_t1", "lease_client_t1").await; let client_view = client_fw.client_kv_api_view(); wait_master_ready(&client_view).await; @@ -83,8 +82,7 @@ async fn test2_rebind_to_new_lease_preserves_until_new_expire() { unsafe { std::env::set_var("FLUXON_LOG", "debug"); } - let (master_fw, client_fw) = - start_master_and_client("lease_master_t2", "lease_client_t2", 18082).await; + let (master_fw, client_fw) = start_master_and_client("lease_master_t2", "lease_client_t2").await; let client_view = client_fw.client_kv_api_view(); wait_master_ready(&client_view).await; @@ -163,8 +161,7 @@ async fn test3_keepalive() { unsafe { std::env::set_var("FLUXON_LOG", "debug"); } - let (master_fw, client_fw) = - start_master_and_client("lease_master_t3", "lease_client_t3", 18083).await; + let (master_fw, client_fw) = start_master_and_client("lease_master_t3", "lease_client_t3").await; let client_view = client_fw.client_kv_api_view(); wait_master_ready(&client_view).await; @@ -239,8 +236,7 @@ async fn test4_delete_under_lease_then_get_fails() { unsafe { std::env::set_var("FLUXON_LOG", "debug"); } - let (master_fw, client_fw) = - start_master_and_client("lease_master_t4", "lease_client_t4", 18084).await; + let (master_fw, client_fw) = start_master_and_client("lease_master_t4", "lease_client_t4").await; let client_view = client_fw.client_kv_api_view(); wait_master_ready(&client_view).await; @@ -294,7 +290,6 @@ async fn test5_eviction_when_lease_consumes_space() { let (master_fw, client_fw) = crate::kvcore_test_lib::start_master_and_client_with_client_dram( "lease_master_t5", "lease_client_t5", - 18085, 1024 * 1024 * 100, ) .await; diff --git a/fluxon_rs/fluxon_kv/src/memholder/memholder_test.rs b/fluxon_rs/fluxon_kv/src/memholder/memholder_test.rs index 7bc7a70..692a9a0 100644 --- a/fluxon_rs/fluxon_kv/src/memholder/memholder_test.rs +++ b/fluxon_rs/fluxon_kv/src/memholder/memholder_test.rs @@ -27,7 +27,12 @@ fn read_etcd() -> String { .expect("read etcd endpoint from build_config_ext.yml") } -fn new_master_config(instance_key: &str, port: u16, cluster: &str, etcd: &str) -> MasterConfig { +fn new_master_config( + instance_key: &str, + port: Option, + cluster: &str, + etcd: &str, +) -> MasterConfig { let prometheus_base_url = fluxon_util::dev_config::load_tsdb_base_url() .expect("read prometheus_base_url from build_config_ext.yml (key: prom)"); let prom_remote_write_url = @@ -36,7 +41,7 @@ fn new_master_config(instance_key: &str, port: u16, cluster: &str, etcd: &str) - MasterConfig { instance_key: instance_key.to_string(), cluster_name: cluster.to_string(), - port: Some(port), + port, etcd_endpoints: vec![etcd.to_string()], protocol: ProtocolConfig { protocol_type: ProtocolType::Tcp, @@ -257,7 +262,7 @@ pub mod test_memholder { ); let (master, _) = run_master(ConfigArg::Config(new_master_config( "mh_master", - 50090, + None, &cluster, &etcd, ))) @@ -408,7 +413,7 @@ pub mod test_memholder { let cluster = unique_cluster_name("test_cluster_memholder_pin"); let (master, _) = run_master(ConfigArg::Config(new_master_config( "pin_master", - 50100, + None, &cluster, &etcd, ))) diff --git a/fluxon_rs/fluxon_util/src/dev_config.rs b/fluxon_rs/fluxon_util/src/dev_config.rs index 5acb92a..322fcd6 100644 --- a/fluxon_rs/fluxon_util/src/dev_config.rs +++ b/fluxon_rs/fluxon_util/src/dev_config.rs @@ -4,6 +4,8 @@ use std::fs; use std::path::{Path, PathBuf}; use tracing::info; +pub const BUILD_CONFIG_EXT_PATH_ENV: &str = "FLUXON_BUILD_CONFIG_EXT_PATH"; + /// Walk up from `start` to filesystem root, returning the first occurrence /// of `filename` if found. pub fn find_file_upwards>(start: P, filename: &str) -> Option { @@ -81,6 +83,24 @@ pub fn repo_root() -> Result { /// Locate `build_config_ext.yml` by walking upwards from the repo/workspace anchor. pub fn locate_build_ext_config() -> Result { + if let Some(raw_path) = std::env::var_os(BUILD_CONFIG_EXT_PATH_ENV) { + let configured_path = PathBuf::from(raw_path); + if configured_path.as_os_str().is_empty() { + return Err(anyhow!( + "{} must not be empty when set", + BUILD_CONFIG_EXT_PATH_ENV + )); + } + if !configured_path.is_file() { + return Err(anyhow!( + "{} points to a missing build config file: {:?}", + BUILD_CONFIG_EXT_PATH_ENV, + configured_path + )); + } + return Ok(configured_path); + } + let anchor = repo_root()?; if let Some(path) = find_file_upwards(&anchor, "build_config_ext.yml") { return Ok(path); @@ -260,10 +280,19 @@ pub fn load_tsdb_remote_write_url() -> Result { #[cfg(test)] mod tests { - use super::{find_fluxon_repo_root_upwards, repo_root_from_manifest_dir}; + use super::{ + BUILD_CONFIG_EXT_PATH_ENV, find_fluxon_repo_root_upwards, locate_build_ext_config, + repo_root_from_manifest_dir, + }; use std::fs; + use std::sync::{Mutex, OnceLock}; use tempfile::TempDir; + fn build_config_env_guard() -> &'static Mutex<()> { + static ENV_MUTEX: OnceLock> = OnceLock::new(); + ENV_MUTEX.get_or_init(|| Mutex::new(())) + } + #[test] fn find_fluxon_repo_root_prefers_nearest_nested_fluxon_tree() { let temp_dir = TempDir::new().expect("temp dir"); @@ -309,4 +338,28 @@ mod tests { let repo_root = repo_root_from_manifest_dir(&nested_manifest_dir); assert_eq!(repo_root, nested_root); } + + #[test] + #[serial_test::serial(build_config_ext)] + fn locate_build_ext_config_prefers_env_override() { + let _env_guard = build_config_env_guard().lock().expect("lock env guard"); + let temp_dir = TempDir::new().expect("temp dir"); + let override_path = temp_dir.path().join("custom_build_config_ext.yml"); + fs::write(&override_path, "etcd: 127.0.0.1:2379\n").expect("write override build config"); + let previous = std::env::var_os(BUILD_CONFIG_EXT_PATH_ENV); + + unsafe { + std::env::set_var(BUILD_CONFIG_EXT_PATH_ENV, &override_path); + } + let located = locate_build_ext_config().expect("locate build config via env override"); + assert_eq!(located, override_path); + match previous { + Some(value) => unsafe { + std::env::set_var(BUILD_CONFIG_EXT_PATH_ENV, value); + }, + None => unsafe { + std::env::remove_var(BUILD_CONFIG_EXT_PATH_ENV); + }, + } + } } diff --git a/fluxon_rs/fluxon_util/src/lib.rs b/fluxon_rs/fluxon_util/src/lib.rs index e575a75..a85aed0 100644 --- a/fluxon_rs/fluxon_util/src/lib.rs +++ b/fluxon_rs/fluxon_util/src/lib.rs @@ -182,10 +182,22 @@ pub fn build_target_dir_() -> PathBuf { mod tests { use crate::{current_log_file_path, init_log}; use std::fs; + use std::path::Path; use std::path::PathBuf; use tempfile::TempDir; use tracing::{debug, error, info, warn}; + fn assert_logged_text(active_log_path: &Path, needles: &[&str]) { + let content = fs::read_to_string(active_log_path).expect("Failed to read active log file"); + for needle in needles { + assert!( + content.contains(needle), + "Log should contain {needle:?} in {}", + active_log_path.display() + ); + } + } + #[cfg(trybuild)] #[test] fn trybuild_scoped_sync_async_bridge() { @@ -195,11 +207,13 @@ mod tests { } #[test] + #[serial_test::serial(log_init)] fn test_init_log_with_file_path() { // 创建临时目录用于日志文件 let temp_dir = TempDir::new().expect("Failed to create temp directory"); let log_path = temp_dir.path(); let instance_key = "test_instance"; + let previous_path = current_log_file_path(); // 初始化日志系统 init_log(log_path, instance_key); @@ -213,45 +227,38 @@ mod tests { // 等待日志写入 std::thread::sleep(std::time::Duration::from_millis(100)); - // 验证日志文件是否创建 - let log_key = instance_key; - let mut log_file_found = false; - - // 遍历日志目录,查找日志文件 - for entry in fs::read_dir(log_path).expect("Failed to read log directory") { - let entry = entry.expect("Failed to read entry"); - let file_name = entry.file_name(); - let file_name_str = file_name.to_string_lossy(); - - if file_name_str.contains(log_key) && file_name_str.contains(".log") { - log_file_found = true; - if current_log_file_path() - .as_ref() - .is_some_and(|path| path.starts_with(log_path)) - { - let content = fs::read_to_string(entry.path()).expect("Failed to read log"); - assert!( - content.contains("debug message"), - "Log should contain debug" - ); - assert!(content.contains("info message"), "Log should contain info"); - assert!( - content.contains("warning message"), - "Log should contain warning" - ); - assert!( - content.contains("error message"), - "Log should contain error" - ); - } - } + let active_log_path = current_log_file_path().expect("active log file path should exist"); + if let Some(ref previous_path) = previous_path { + assert_eq!( + active_log_path, *previous_path, + "init_log should preserve the first active log file path within a process" + ); + } else { + assert!( + active_log_path.exists(), + "active log file should exist: {}", + active_log_path.display() + ); + } + if previous_path.is_none() && active_log_path.starts_with(log_path) { + let file_name = active_log_path + .file_name() + .expect("active log file name") + .to_string_lossy(); + assert!( + file_name.contains(instance_key), + "active log file should include instance key when this test owns initialization" + ); + assert_logged_text( + &active_log_path, + &["debug message", "info message", "warning message", "error message"], + ); } - - assert!(log_file_found, "Log file should be created"); } // 移除“不指定日志路径”的测试:生产入口强制要求提供 log_path。 #[test] + #[serial_test::serial(log_init)] fn test_init_log_invalid_path() { // 测试无效路径的处理 let invalid_path = PathBuf::from("/proc/invalid_path_that_cannot_be_created/logs"); @@ -266,11 +273,13 @@ mod tests { // 移除 init_log_test 相关测试:测试不再使用测试专用 logger。 #[test] + #[serial_test::serial(log_init)] fn test_log_file_rotation() { // 测试日志文件按天滚动的功能 let temp_dir = TempDir::new().expect("Failed to create temp directory"); let log_path = temp_dir.path(); let instance_key = "rotation_test"; + let previous_path = current_log_file_path(); // 初始化日志 init_log(log_path, instance_key); @@ -284,20 +293,36 @@ mod tests { // 等待日志写入 std::thread::sleep(std::time::Duration::from_millis(100)); - // 验证文件存在 - let files: Vec<_> = fs::read_dir(log_path) - .expect("Failed to read log directory") - .filter_map(|e| e.ok()) - .map(|e| e.file_name().to_string_lossy().to_string()) - .collect(); - - assert!( - files.iter().any(|f| f.contains("fluxon-kv-rotation_test")), - "Log files should be created with correct instance key" - ); + let active_log_path = current_log_file_path().expect("active log file path should exist"); + if let Some(ref previous_path) = previous_path { + assert_eq!( + active_log_path, *previous_path, + "init_log should preserve the first active log file path within a process" + ); + } else { + assert!( + active_log_path.exists(), + "active log file should exist: {}", + active_log_path.display() + ); + assert!( + active_log_path.starts_with(log_path), + "first init_log call should bind to the requested directory" + ); + let file_name = active_log_path + .file_name() + .expect("active log file name") + .to_string_lossy(); + assert!( + file_name.contains("fluxon-kv-rotation_test"), + "first init_log call should use the requested instance key" + ); + assert_logged_text(&active_log_path, &["Log message 0", "Warning message 0"]); + } } #[test] + #[serial_test::serial(log_init)] fn test_multiple_init_log_calls() { // 测试多次调用 init_log 的行为 let temp_dir = TempDir::new().expect("Failed to create temp directory"); @@ -306,6 +331,7 @@ mod tests { // 第一次初始化 init_log(log_path, "instance1"); info!("First init message"); + let first_path = current_log_file_path().expect("first active log file path"); // 第二次初始化(应该被忽略,因为 try_init 会失败) init_log(log_path, "instance2"); @@ -313,5 +339,10 @@ mod tests { // 验证不会崩溃 std::thread::sleep(std::time::Duration::from_millis(100)); + assert_eq!( + current_log_file_path().as_ref(), + Some(&first_path), + "multiple init_log calls should preserve the first active log file path" + ); } } diff --git a/fluxon_rs/fluxon_util/src/test_util.rs b/fluxon_rs/fluxon_util/src/test_util.rs index 93ea065..8a65eda 100755 --- a/fluxon_rs/fluxon_util/src/test_util.rs +++ b/fluxon_rs/fluxon_util/src/test_util.rs @@ -170,6 +170,9 @@ pub fn start_test_etcd() -> Result<(), Box> { let mut guard = etcd_process() .lock() .map_err(|_| boxed_error("test etcd process mutex poisoned"))?; + if endpoint_health(&endpoint, Duration::from_secs(2)) { + return Ok(()); + } if let Some(child) = guard.as_mut() { if child.try_wait()?.is_none() { wait_for_etcd_ready(child, &endpoint)?; @@ -239,13 +242,17 @@ pub fn start_test_etcd() -> Result<(), Box> { )) })?; - wait_for_etcd_ready(&mut child, &endpoint).map_err(|e| { + if let Err(e) = wait_for_etcd_ready(&mut child, &endpoint) { + if endpoint_health(&endpoint, Duration::from_secs(2)) { + let _ = child.wait(); + return Ok(()); + } let stdout_hint = read_log_tail(&stdout_path); let stderr_hint = read_log_tail(&stderr_path); - boxed_error(format!( + return Err(boxed_error(format!( "{e}\netcd stdout tail:\n{stdout_hint}\netcd stderr tail:\n{stderr_hint}" - )) - })?; + ))); + } *guard = Some(child); Ok(()) } diff --git a/fluxon_rs/fluxon_util/src/test_util_test.rs b/fluxon_rs/fluxon_util/src/test_util_test.rs index adaa516..c2ec917 100755 --- a/fluxon_rs/fluxon_util/src/test_util_test.rs +++ b/fluxon_rs/fluxon_util/src/test_util_test.rs @@ -2,6 +2,7 @@ use crate::test_util::{is_etcd_running, start_test_etcd}; use std::process::Command; #[test] +#[serial_test::serial(build_config_ext)] fn test_etcd_only_starts_once() { start_test_etcd().expect("start local test etcd"); assert!(is_etcd_running(), "etcd should be reachable after startup"); diff --git a/fluxon_test_stack/ci_2_virt_node.py b/fluxon_test_stack/ci_2_virt_node.py index 405c9a2..f055426 100644 --- a/fluxon_test_stack/ci_2_virt_node.py +++ b/fluxon_test_stack/ci_2_virt_node.py @@ -415,11 +415,13 @@ def _rewrite_suite_for_local_dual_nodes( if scene_configs is not None: if not isinstance(scene_configs, dict): raise ValueError("generated public profile runtime.ci.scene_configs must be a mapping") - kv_scene_config = scene_configs.get("ci_top_attention_bin_kvtest") - if kv_scene_config is not None: + for scene_id in ("ci_top_attention_bin_kvtest", "ci_top_attention_cargo_kv_unit"): + kv_scene_config = scene_configs.get(scene_id) + if kv_scene_config is None: + continue if not isinstance(kv_scene_config, dict): raise ValueError( - "generated public profile runtime.ci.scene_configs['ci_top_attention_bin_kvtest'] must be a mapping" + f"generated public profile runtime.ci.scene_configs[{scene_id!r}] must be a mapping" ) # The generated public profile is fixed to the tcp-thread transport branch. kv_scene_config["kv_transport_feature"] = PUBLIC_TRANSPORT_FEATURE diff --git a/fluxon_test_stack/ci_test_list.yaml b/fluxon_test_stack/ci_test_list.yaml index 4230559..ebafe4a 100644 --- a/fluxon_test_stack/ci_test_list.yaml +++ b/fluxon_test_stack/ci_test_list.yaml @@ -29,6 +29,118 @@ scenes: scales: [n1_kvowner_dram_20gib] profiles: [fluxon_tcp] + ci_top_attention_cargo_fs_core: + ci: + subject: rust + runtime_contract: rust_self_managed + select: + scales: [n1_kvowner_dram_3gib] + profiles: [fluxon_tcp] + + ci_top_attention_cargo_util: + ci: + subject: rust + runtime_contract: rust_self_managed + select: + scales: [n1_kvowner_dram_20gib] + profiles: [fluxon_tcp] + + ci_top_attention_cargo_kv_unit: + ci: + subject: rust + runtime_contract: rust_self_managed + select: + scales: [n1_kvowner_dram_20gib] + profiles: [fluxon_tcp] + + ci_top_attention_cargo_cli: + ci: + subject: rust + runtime_contract: rust_self_managed + select: + scales: [n1_kvowner_dram_3gib] + profiles: [fluxon_tcp] + + ci_top_attention_cargo_commu: + ci: + subject: rust + runtime_contract: rust_self_managed + select: + scales: [n1_kvowner_dram_3gib] + profiles: [fluxon_tcp] + + ci_top_attention_cargo_commu_contract: + ci: + subject: rust + runtime_contract: rust_self_managed + select: + scales: [n1_kvowner_dram_3gib] + profiles: [fluxon_tcp] + + ci_top_attention_cargo_framework: + ci: + subject: rust + runtime_contract: rust_self_managed + select: + scales: [n1_kvowner_dram_3gib] + profiles: [fluxon_tcp] + + ci_top_attention_cargo_fs: + ci: + subject: rust + runtime_contract: rust_self_managed + select: + scales: [n1_kvowner_dram_20gib] + profiles: [fluxon_tcp] + + ci_top_attention_cargo_fs_s3_gateway: + ci: + subject: rust + runtime_contract: rust_self_managed + select: + scales: [n1_kvowner_dram_20gib] + profiles: [fluxon_tcp] + + ci_top_attention_cargo_limit_thirdparty: + ci: + subject: rust + runtime_contract: rust_self_managed + select: + scales: [n1_kvowner_dram_3gib] + profiles: [fluxon_tcp] + + ci_top_attention_cargo_mq: + ci: + subject: rust + runtime_contract: rust_self_managed + select: + scales: [n1_kvowner_dram_3gib] + profiles: [fluxon_tcp] + + ci_top_attention_cargo_observability: + ci: + subject: rust + runtime_contract: rust_self_managed + select: + scales: [n1_kvowner_dram_3gib] + profiles: [fluxon_tcp] + + ci_top_attention_cargo_ops: + ci: + subject: rust + runtime_contract: rust_self_managed + select: + scales: [n1_kvowner_dram_3gib] + profiles: [fluxon_tcp] + + ci_top_attention_cargo_pyo3: + ci: + subject: rust + runtime_contract: rust_self_managed + select: + scales: [n1_kvowner_dram_3gib] + profiles: [fluxon_tcp] + ci_top_attention_log_mgmt: ci: subject: rust @@ -326,9 +438,12 @@ profiles: ci_top_attention_doc_page_build: doc_site_base_url: example.com ci_top_attention_bin_kvtest: + kv_transport_feature: tcp_thread_transport kv_test_rounds: all - ci_top_attention_log_mgmt: - enabled: true + ci_top_attention_cargo_fs_core: {} + ci_top_attention_cargo_util: {} + ci_top_attention_cargo_kv_unit: + kv_transport_feature: tcp_thread_transport ci_top_attention_log_mgmt: enabled: true ci_top_attention_mq_core: {} @@ -476,9 +591,12 @@ profiles: ci_top_attention_doc_page_build: doc_site_base_url: example.com ci_top_attention_bin_kvtest: + kv_transport_feature: tcp_thread_transport kv_test_rounds: all - ci_top_attention_log_mgmt: - enabled: true + ci_top_attention_cargo_fs_core: {} + ci_top_attention_cargo_util: {} + ci_top_attention_cargo_kv_unit: + kv_transport_feature: tcp_thread_transport ci_top_attention_log_mgmt: enabled: true ci_top_attention_mq_core: {} @@ -493,9 +611,12 @@ profiles: ci_top_attention_doc_page_build: doc_site_base_url: example.com ci_top_attention_bin_kvtest: + kv_transport_feature: tcp_thread_transport kv_test_rounds: all - ci_top_attention_log_mgmt: - enabled: true + ci_top_attention_cargo_fs_core: {} + ci_top_attention_cargo_util: {} + ci_top_attention_cargo_kv_unit: + kv_transport_feature: tcp_thread_transport ci_top_attention_log_mgmt: enabled: true ci_top_attention_mq_core: {} @@ -510,9 +631,12 @@ profiles: ci_top_attention_doc_page_build: doc_site_base_url: example.com ci_top_attention_bin_kvtest: + kv_transport_feature: tcp_thread_transport kv_test_rounds: all - ci_top_attention_log_mgmt: - enabled: true + ci_top_attention_cargo_fs_core: {} + ci_top_attention_cargo_util: {} + ci_top_attention_cargo_kv_unit: + kv_transport_feature: tcp_thread_transport ci_top_attention_log_mgmt: enabled: true ci_top_attention_mq_core: {} diff --git a/fluxon_test_stack/test_profile_adapter.py b/fluxon_test_stack/test_profile_adapter.py index 57afbdc..006c48c 100644 --- a/fluxon_test_stack/test_profile_adapter.py +++ b/fluxon_test_stack/test_profile_adapter.py @@ -388,7 +388,7 @@ def main() -> None: parser = argparse.ArgumentParser( description="Fluxon deployer adapter (Deployment YAML subset; produces deploy_result.yaml)." ) - parser.add_argument("--action", required=True, choices=["deploy", "collect", "teardown"]) + parser.add_argument("--action", required=True, choices=["deploy", "teardown"]) parser.add_argument( "--workdir", required=True, @@ -453,10 +453,6 @@ def main() -> None: ) return - if args.action == "collect": - _action_collect(run_dir, controller_url, instances) - return - if args.action == "teardown": _action_teardown(controller_url, instances) return @@ -796,29 +792,6 @@ def _action_deploy( -def _action_collect(run_dir: Path, controller_url: str, instances: List[_InstanceReq]) -> None: - logs_dir = run_dir / "logs" - logs_dir.mkdir(parents=True, exist_ok=True) - - for inst in instances: - inst_dir = logs_dir / inst.id - inst_dir.mkdir(parents=True, exist_ok=True) - - # English note: - # - /api/status is an observability endpoint. During transient runtime failures (e.g. P2P timeouts) - # the controller may return a non-2xx HTTP status. Treat that as a captured status, not as a - # hard failure of the "collect" phase, so the runner can still finalize deterministically using - # terminal artifacts (summary.yaml / benchmark_result.json). - status_code, status = _http_status_allow_error( - controller_url, - inst.controller_target, - inst.workload_kind, - inst.workload_name, - inst.authority, - ) - _write_yaml_file(inst_dir / "status.yaml", {"status_code": int(status_code), "status": status}) - - def _action_teardown(controller_url: str, instances: List[_InstanceReq]) -> None: for inst in instances: resp = _http_delete_generation( @@ -1029,6 +1002,21 @@ def _wait_running( time.sleep(1.0) +def _http_status_allow_error( + controller_url: str, + target: str, + kind: str, + name: str, + authority: str, +) -> tuple[int, Dict[str, Any]]: + qs = urllib.parse.urlencode( + {"target": target, "kind": kind, "name": name, "authority": authority} + ) + url = controller_url + "/api/status?" + qs + req = _new_controller_request(url, method="GET") + return _http_json_allow_error_status(req) + + def _http_deploy(controller_url: str, yaml_text: str) -> Dict[str, Any]: url = controller_url + "/api/deploy" data = yaml_text.encode("utf-8") @@ -1174,21 +1162,6 @@ def _http_status(controller_url: str, target: str, kind: str, name: str) -> Dict return _http_json(req) -def _http_status_allow_error( - controller_url: str, - target: str, - kind: str, - name: str, - authority: str, -) -> tuple[int, Dict[str, Any]]: - qs = urllib.parse.urlencode( - {"target": target, "kind": kind, "name": name, "authority": authority} - ) - url = controller_url + "/api/status?" + qs - req = _new_controller_request(url, method="GET") - return _http_json_allow_error_status(req) - - def _http_delete_generation( controller_url: str, target: str, diff --git a/fluxon_test_stack/test_runner.py b/fluxon_test_stack/test_runner.py index 2236be5..161063e 100644 --- a/fluxon_test_stack/test_runner.py +++ b/fluxon_test_stack/test_runner.py @@ -407,9 +407,22 @@ def _runner_native_ci_scene_ids() -> Tuple[str, ...]: return ( "ci_top_attention_doc_page_build", "ci_top_attention_bin_kvtest", + "ci_top_attention_cargo_fs_core", + "ci_top_attention_cargo_util", + "ci_top_attention_cargo_kv_unit", + "ci_top_attention_cargo_cli", + "ci_top_attention_cargo_commu", + "ci_top_attention_cargo_commu_contract", + "ci_top_attention_cargo_framework", + "ci_top_attention_cargo_fs", + "ci_top_attention_cargo_fs_s3_gateway", + "ci_top_attention_cargo_limit_thirdparty", + "ci_top_attention_cargo_mq", + "ci_top_attention_cargo_observability", + "ci_top_attention_cargo_ops", + "ci_top_attention_cargo_pyo3", "ci_top_attention_log_mgmt", "ci_top_attention_mq_core", - "ci_top_attention_log_mgmt", ) @@ -550,7 +563,7 @@ def _redirect_process_stdio_to_log( - test_runner can run for hours under terminal/session wrappers that may disappear while the suite is still executing. - A deleted PTY turns ordinary `print(..., flush=True)` into `OSError(EIO)`, which aborts the - runner in collect/finalize paths and leaves case_runs.yaml stuck at a reserved run. + runner in shutdown/finalize paths and leaves case_runs.yaml stuck at a reserved run. - Use a deterministic per-workdir log sink for the whole process, including child subprocesses. """ global _RUNNER_STDIO_LOG_FP @@ -1001,10 +1014,6 @@ def main() -> None: ) _write_yaml_file(run_dir / "summary.yaml", summary) - _run_adapter_action( - resolved_case, run_dir=run_dir, action="collect" - ) - outcome = RUN_OUTCOME_SUCCESS @@ -3063,16 +3072,6 @@ def _deploy_runtime_phase( return _deploy_runtime_phase_after_stage(resolved_case, run_dir=run_dir, phase=phase) -def _collect_runtime_phase( - resolved_case: Dict[str, Any], - *, - run_dir: Path, - phase: _RuntimePhase, -) -> None: - _write_runtime_phase_inputs(resolved_case, run_dir=run_dir, phase=phase) - _run_adapter_action(resolved_case, run_dir=run_dir, action="collect") - - def _ci_cluster_runtime_stage(resolved_case: Dict[str, Any]) -> _RemoteRunDirStage: verify_relpaths = list(CI_CLUSTER_RUNTIME_REMOTE_STAGE_VERIFY_RELPATHS) if _ci_has_instance(resolved_case, instance_id="owner_0"): @@ -3126,12 +3125,6 @@ def _ci_runtime_phase(resolved_case: Dict[str, Any], phase_id: str) -> _RuntimeP write_ctx="CI", stage_run_dir=_ci_runner_runtime_stage(resolved_case), ), - "collect_all": _RuntimePhase( - phase_id="collect_all", - layer=RUNTIME_LAYER_CASE, - instance_ids=CI_RUNTIME_INSTANCE_IDS, - write_ctx="CI", - ), } try: return phases[phase_id] @@ -3183,24 +3176,6 @@ def _test_stack_runtime_phase( write_ctx="TEST_STACK", stage_run_dir=stage_run_dir, ) - if phase_id == "collect_nodes": - if node_ids is None or not node_ids: - raise ValueError("TEST_STACK collect_nodes phase requires non-empty node_ids") - return _RuntimePhase( - phase_id="collect_nodes", - layer=RUNTIME_LAYER_CASE, - instance_ids=node_ids, - write_ctx="TEST_STACK", - ) - if phase_id == "collect_coordinator": - if node_ids is not None: - raise ValueError("TEST_STACK collect_coordinator phase does not accept node_ids") - return _RuntimePhase( - phase_id="collect_coordinator", - layer=RUNTIME_LAYER_CASE, - instance_ids=("coordinator",), - write_ctx="TEST_STACK", - ) raise ValueError(f"unsupported TEST_STACK runtime phase: {phase_id}") @@ -3228,14 +3203,6 @@ def _compile_case_plan(resolved_case: Dict[str, Any]) -> _CasePlan: execute_phases=( _ci_runtime_phase(resolved_case, "ci_runner"), ), - collect_phases=( - _RuntimePhase( - phase_id="collect_all", - layer=RUNTIME_LAYER_CASE, - instance_ids=case_instance_ids, - write_ctx="CI", - ), - ), ) if case_family == CASE_FAMILY_BENCH: deploy = _require_dict(resolved_case.get("deploy"), "resolved_case.deploy") @@ -3292,15 +3259,6 @@ def _compile_case_plan(resolved_case: Dict[str, Any]) -> _CasePlan: include_stage_run_dir=False, ), ), - collect_phases=( - _test_stack_runtime_phase(phase_id="collect_nodes", node_ids=node_ids_tuple), - _RuntimePhase( - phase_id="collect_coordinator", - layer=RUNTIME_LAYER_CASE, - instance_ids=prepare_ids_tuple, - write_ctx="TEST_STACK", - ), - ), ) raise ValueError(f"unsupported case family for case plan: {case_family}") @@ -6933,6 +6891,162 @@ def _runner_native_ci_commands_for_case(case: _ResolvedCase, *, ctx: str) -> Lis "timeout_seconds": 21600, } ] + if scene_id == "ci_top_attention_cargo_fs_core": + return [ + { + "id": "top_attention_cargo_fs_core", + "command": ( + "__RUN_DIR__/venv/bin/python3 -u " + "__RUN_DIR__/src/fluxon_test_stack/top_attention_test_index/_cargo_fs_core.py" + ), + "timeout_seconds": 21600, + } + ] + if scene_id == "ci_top_attention_cargo_util": + return [ + { + "id": "top_attention_cargo_util", + "command": ( + "__RUN_DIR__/venv/bin/python3 -u " + "__RUN_DIR__/src/fluxon_test_stack/top_attention_test_index/_cargo_util.py " + "--case-config __RUN_DIR__/configs/ci_scene_config.yaml" + ), + "timeout_seconds": 21600, + } + ] + if scene_id == "ci_top_attention_cargo_kv_unit": + return [ + { + "id": "top_attention_cargo_kv_unit", + "command": ( + "__RUN_DIR__/venv/bin/python3 -u " + "__RUN_DIR__/src/fluxon_test_stack/top_attention_test_index/_cargo_kv_unit.py " + "--case-config __RUN_DIR__/configs/ci_scene_config.yaml" + ), + "timeout_seconds": 21600, + } + ] + if scene_id == "ci_top_attention_cargo_cli": + return [ + { + "id": "top_attention_cargo_cli", + "command": ( + "__RUN_DIR__/venv/bin/python3 -u " + "__RUN_DIR__/src/fluxon_test_stack/top_attention_test_index/_cargo_cli.py" + ), + "timeout_seconds": 21600, + } + ] + if scene_id == "ci_top_attention_cargo_commu": + return [ + { + "id": "top_attention_cargo_commu", + "command": ( + "__RUN_DIR__/venv/bin/python3 -u " + "__RUN_DIR__/src/fluxon_test_stack/top_attention_test_index/_cargo_commu.py" + ), + "timeout_seconds": 21600, + } + ] + if scene_id == "ci_top_attention_cargo_commu_contract": + return [ + { + "id": "top_attention_cargo_commu_contract", + "command": ( + "__RUN_DIR__/venv/bin/python3 -u " + "__RUN_DIR__/src/fluxon_test_stack/top_attention_test_index/_cargo_commu_contract.py" + ), + "timeout_seconds": 21600, + } + ] + if scene_id == "ci_top_attention_cargo_framework": + return [ + { + "id": "top_attention_cargo_framework", + "command": ( + "__RUN_DIR__/venv/bin/python3 -u " + "__RUN_DIR__/src/fluxon_test_stack/top_attention_test_index/_cargo_framework.py" + ), + "timeout_seconds": 21600, + } + ] + if scene_id == "ci_top_attention_cargo_fs": + return [ + { + "id": "top_attention_cargo_fs", + "command": ( + "__RUN_DIR__/venv/bin/python3 -u " + "__RUN_DIR__/src/fluxon_test_stack/top_attention_test_index/_cargo_fs.py" + ), + "timeout_seconds": 21600, + } + ] + if scene_id == "ci_top_attention_cargo_fs_s3_gateway": + return [ + { + "id": "top_attention_cargo_fs_s3_gateway", + "command": ( + "__RUN_DIR__/venv/bin/python3 -u " + "__RUN_DIR__/src/fluxon_test_stack/top_attention_test_index/_cargo_fs_s3_gateway.py" + ), + "timeout_seconds": 21600, + } + ] + if scene_id == "ci_top_attention_cargo_limit_thirdparty": + return [ + { + "id": "top_attention_cargo_limit_thirdparty", + "command": ( + "__RUN_DIR__/venv/bin/python3 -u " + "__RUN_DIR__/src/fluxon_test_stack/top_attention_test_index/_cargo_limit_thirdparty.py" + ), + "timeout_seconds": 21600, + } + ] + if scene_id == "ci_top_attention_cargo_mq": + return [ + { + "id": "top_attention_cargo_mq", + "command": ( + "__RUN_DIR__/venv/bin/python3 -u " + "__RUN_DIR__/src/fluxon_test_stack/top_attention_test_index/_cargo_mq.py" + ), + "timeout_seconds": 21600, + } + ] + if scene_id == "ci_top_attention_cargo_observability": + return [ + { + "id": "top_attention_cargo_observability", + "command": ( + "__RUN_DIR__/venv/bin/python3 -u " + "__RUN_DIR__/src/fluxon_test_stack/top_attention_test_index/_cargo_observability.py" + ), + "timeout_seconds": 21600, + } + ] + if scene_id == "ci_top_attention_cargo_ops": + return [ + { + "id": "top_attention_cargo_ops", + "command": ( + "__RUN_DIR__/venv/bin/python3 -u " + "__RUN_DIR__/src/fluxon_test_stack/top_attention_test_index/_cargo_ops.py" + ), + "timeout_seconds": 21600, + } + ] + if scene_id == "ci_top_attention_cargo_pyo3": + return [ + { + "id": "top_attention_cargo_pyo3", + "command": ( + "__RUN_DIR__/venv/bin/python3 -u " + "__RUN_DIR__/src/fluxon_test_stack/top_attention_test_index/_cargo_pyo3.py" + ), + "timeout_seconds": 21600, + } + ] if scene_id == "ci_top_attention_log_mgmt": return [ { @@ -11245,7 +11359,7 @@ def _run_adapter_action( run_dir: Path, action: str, ) -> Optional[Dict[str, Any]]: - if action not in ("deploy", "collect", "teardown"): + if action not in ("deploy", "teardown"): raise ValueError(f"invalid adapter action: {action}") deploy = _require_dict(resolved_case.get("deploy"), "resolved_case.deploy") @@ -11276,9 +11390,22 @@ def _run_adapter_action( def _run_subprocess(argv: List[str], *, cwd: str) -> None: print("RUN:", " ".join(_shell_quote(a) for a in argv), flush=True) - proc = subprocess.run(argv, cwd=cwd) + proc = subprocess.run(argv, cwd=cwd, capture_output=True, text=True) + if proc.stdout: + sys.stdout.write(proc.stdout) + if not proc.stdout.endswith("\n"): + sys.stdout.write("\n") + sys.stdout.flush() + if proc.stderr: + sys.stderr.write(proc.stderr) + if not proc.stderr.endswith("\n"): + sys.stderr.write("\n") + sys.stderr.flush() if proc.returncode != 0: - raise RuntimeError(f"command failed: rc={proc.returncode}") + raise RuntimeError( + "command failed: " + f"rc={proc.returncode} cwd={cwd} argv={' '.join(_shell_quote(a) for a in argv)}" + ) _SSH_TRANSPORT_TIMEOUT_SECONDS = 180.0 diff --git a/fluxon_test_stack/test_runner_ci_runtime.py b/fluxon_test_stack/test_runner_ci_runtime.py index bef19e2..9e89066 100644 --- a/fluxon_test_stack/test_runner_ci_runtime.py +++ b/fluxon_test_stack/test_runner_ci_runtime.py @@ -11,12 +11,45 @@ def _ci_runtime_python_executable() -> str: - python_bin = shutil.which(_CI_RUNTIME_PYTHON_BIN_NAME) - if python_bin is None: + candidates = [] + seen: set[str] = set() + for raw_candidate in ( + _CI_RUNTIME_PYTHON_BIN_NAME, + "python3", + "python", + ): + resolved = shutil.which(raw_candidate) + if resolved is None or resolved in seen: + continue + seen.add(resolved) + candidates.append(resolved) + if not candidates: raise ValueError( - "CI runtime requires python3.10 on PATH to create the offline-wheelhouse venv" + "CI runtime requires a Python 3.10 interpreter on PATH to create the offline-wheelhouse venv" ) - return python_bin + for python_bin in candidates: + if _python_executable_abi(python_bin) == _TEST_STACK_DEFAULT_PYTHON_ABI: + return python_bin + raise ValueError( + "CI runtime requires a Python 3.10 interpreter on PATH to create the offline-wheelhouse venv" + ) + + +def _python_executable_abi(python_bin: str) -> str: + try: + return subprocess.check_output( + [ + python_bin, + "-c", + ( + "import sys; " + "print(f'{sys.implementation.name}{sys.version_info[0]}.{sys.version_info[1]}')" + ), + ], + text=True, + ).strip() + except (OSError, subprocess.CalledProcessError) as exc: + raise ValueError(f"failed to probe python ABI for executable: {python_bin}") from exc def _ci_runtime_python_abi( @@ -67,7 +100,10 @@ def _create_ci_runtime_venv( if venv_dir.exists(): raise ValueError(f"venv dir already exists (no overwrite): {venv_dir}") python_bin = _ci_runtime_python_executable() - run_subprocess([python_bin, "-m", "venv", str(venv_dir)]) + # Create the CI venv without ensurepip. GitHub-hosted Python 3.10 can fail inside + # venv's implicit ensurepip step even though the interpreter already exposes pip + # through system site-packages. + run_subprocess([python_bin, "-m", "venv", "--system-site-packages", "--without-pip", str(venv_dir)]) venv_python = venv_dir / "bin" / "python3" if not venv_python.exists(): raise ValueError(f"venv python not found after creation: {venv_python}") diff --git a/fluxon_test_stack/test_runner_models.py b/fluxon_test_stack/test_runner_models.py index cb38467..dcb3a5c 100644 --- a/fluxon_test_stack/test_runner_models.py +++ b/fluxon_test_stack/test_runner_models.py @@ -85,7 +85,6 @@ class _CasePlan: case_family: str prepare_phases: Tuple[_RuntimePhase, ...] execute_phases: Tuple[_RuntimePhase, ...] - collect_phases: Tuple[_RuntimePhase, ...] @dataclass(frozen=True) diff --git a/fluxon_test_stack/test_runner_runtime_backend.py b/fluxon_test_stack/test_runner_runtime_backend.py index 14a85e4..9e35e73 100644 --- a/fluxon_test_stack/test_runner_runtime_backend.py +++ b/fluxon_test_stack/test_runner_runtime_backend.py @@ -394,8 +394,6 @@ def _execute_ci_case( counted=False, ci_out={"rc": rc}, ) - for phase in prepared_case.plan.collect_phases: - ctx._collect_runtime_phase(resolved_case, run_dir=run_dir, phase=phase) return ctx._ExecutedCase(outcome=outcome, summary=summary) @@ -414,7 +412,6 @@ def _execute_test_stack_case( outcome = ctx.RUN_OUTCOME_FAILED error_detail: Optional[str] = None - collect_error_detail: Optional[str] = None result_obj: Optional[Dict[str, Any]] = None try: @@ -445,12 +442,6 @@ def _execute_test_stack_case( outcome = ctx.RUN_OUTCOME_SUCCESS except Exception as exc: # noqa: BLE001 error_detail = f"{type(exc).__name__}: {exc}" - finally: - try: - for phase in prepared_case.plan.collect_phases: - ctx._collect_runtime_phase(resolved_case, run_dir=run_dir, phase=phase) - except Exception as exc: # noqa: BLE001 - collect_error_detail = f"{type(exc).__name__}: {exc}" summary = { "schema_version": ctx.SCHEMA_VERSION, @@ -472,7 +463,6 @@ def _execute_test_stack_case( "result_path": str(_require_test_stack_result_path(prepared_case.test_stack_result_path)), "result": result_obj, "error": error_detail, - "collect_error": collect_error_detail, }, } return ctx._ExecutedCase(outcome=outcome, summary=summary) diff --git a/fluxon_test_stack/tests/test_ci_2_virt_node_contract.py b/fluxon_test_stack/tests/test_ci_2_virt_node_contract.py index 6ebbecd..b861806 100644 --- a/fluxon_test_stack/tests/test_ci_2_virt_node_contract.py +++ b/fluxon_test_stack/tests/test_ci_2_virt_node_contract.py @@ -28,6 +28,18 @@ def _load_module(): class TestCi2VirtNodeContract(unittest.TestCase): _KVTEST_SCENE_ID = "ci_top_attention_bin_kvtest" + _CARGO_KV_UNIT_SCENE_ID = "ci_top_attention_cargo_kv_unit" + _CARGO_CLI_SCENE_ID = "ci_top_attention_cargo_cli" + _CARGO_COMMU_SCENE_ID = "ci_top_attention_cargo_commu" + _CARGO_COMMU_CONTRACT_SCENE_ID = "ci_top_attention_cargo_commu_contract" + _CARGO_FRAMEWORK_SCENE_ID = "ci_top_attention_cargo_framework" + _CARGO_FS_SCENE_ID = "ci_top_attention_cargo_fs" + _CARGO_FS_S3_GATEWAY_SCENE_ID = "ci_top_attention_cargo_fs_s3_gateway" + _CARGO_LIMIT_THIRDPARTY_SCENE_ID = "ci_top_attention_cargo_limit_thirdparty" + _CARGO_MQ_SCENE_ID = "ci_top_attention_cargo_mq" + _CARGO_OBSERVABILITY_SCENE_ID = "ci_top_attention_cargo_observability" + _CARGO_OPS_SCENE_ID = "ci_top_attention_cargo_ops" + _CARGO_PYO3_SCENE_ID = "ci_top_attention_cargo_pyo3" _DOC_SCENE_ID = "ci_top_attention_doc_page_build" _LOG_MGMT_SCENE_ID = "ci_top_attention_log_mgmt" _MQ_SCENE_ID = "ci_top_attention_mq_core" @@ -193,6 +205,62 @@ def test_generated_suite_preserves_source_scene_configs(self) -> None: "p2p_only", ) + def test_generated_suite_injects_public_transport_feature_for_cargo_kv_unit(self) -> None: + suite_cfg = _ENTRY._load_yaml_mapping(_ENTRY.DEFAULT_SUITE_PATH, ctx="suite") + generated = _ENTRY._rewrite_suite_for_local_dual_nodes( + suite_cfg=suite_cfg, + scene_ids=[self._CARGO_KV_UNIT_SCENE_ID], + primary_node_name="local-node-a", + secondary_node_name="local-node-b", + host_ip="10.1.1.119", + wheel_name="fluxon-0.2.1-cp38-abi3-manylinux_2_28_x86_64.whl", + controller_port=19080, + ) + + self.assertEqual( + generated["profiles"]["fluxon_tcp_thread"]["runtime"]["ci"]["scene_configs"][self._CARGO_KV_UNIT_SCENE_ID][ + "kv_transport_feature" + ], + "tcp_thread_transport", + ) + + def test_generated_suite_supports_additional_runner_native_cargo_scenes(self) -> None: + scene_ids = [ + self._CARGO_CLI_SCENE_ID, + self._CARGO_COMMU_SCENE_ID, + self._CARGO_COMMU_CONTRACT_SCENE_ID, + self._CARGO_FRAMEWORK_SCENE_ID, + self._CARGO_FS_SCENE_ID, + self._CARGO_FS_S3_GATEWAY_SCENE_ID, + self._CARGO_LIMIT_THIRDPARTY_SCENE_ID, + self._CARGO_MQ_SCENE_ID, + self._CARGO_OBSERVABILITY_SCENE_ID, + self._CARGO_OPS_SCENE_ID, + self._CARGO_PYO3_SCENE_ID, + ] + suite_cfg = _ENTRY._load_yaml_mapping(_ENTRY.DEFAULT_SUITE_PATH, ctx="suite") + generated = _ENTRY._rewrite_suite_for_local_dual_nodes( + suite_cfg=suite_cfg, + scene_ids=scene_ids, + primary_node_name="local-node-a", + secondary_node_name="local-node-b", + host_ip="10.1.1.119", + wheel_name="fluxon-0.2.1-cp38-abi3-manylinux_2_28_x86_64.whl", + controller_port=19080, + ) + + self.assertEqual(set(generated["scenes"].keys()), set(scene_ids)) + for scene_id in scene_ids: + self.assertEqual( + generated["scenes"][scene_id]["ci"]["runtime_contract"], + "rust_self_managed", + ) + self.assertEqual( + generated["scenes"][scene_id]["ci"]["subject"], + "rust", + ) + self.assertNotIn("commands", generated["scenes"][scene_id]["ci"]) + def test_generated_suite_supports_doc_page_ci_scene(self) -> None: suite_cfg = _ENTRY._load_yaml_mapping(_ENTRY.DEFAULT_SUITE_PATH, ctx="suite") generated = _ENTRY._rewrite_suite_for_local_dual_nodes( diff --git a/fluxon_test_stack/tests/test_runner_contract.py b/fluxon_test_stack/tests/test_runner_contract.py index f2e5a64..8fd293d 100644 --- a/fluxon_test_stack/tests/test_runner_contract.py +++ b/fluxon_test_stack/tests/test_runner_contract.py @@ -59,6 +59,14 @@ def _build_checks(selected_test_id: Optional[str]) -> List[Tuple[str, Callable[[ "ci_top_attention_doc_page_build_uses_online_docker_image", test_ci_top_attention_doc_page_build_uses_online_docker_image, ), + ( + "ci_top_attention_cargo_kv_unit_uses_rust_self_managed_runtime", + test_ci_top_attention_cargo_kv_unit_uses_rust_self_managed_runtime, + ), + ( + "ci_top_attention_additional_cargo_scenes_exist", + test_ci_top_attention_additional_cargo_scenes_exist, + ), ( "ci_top_attention_log_mgmt_scene_exists", test_ci_top_attention_log_mgmt_scene_exists, @@ -285,6 +293,65 @@ def test_ci_top_attention_log_mgmt_scene_exists() -> None: return print("PASS: test_ci_top_attention_log_mgmt_scene_exists") + +def test_ci_top_attention_additional_cargo_scenes_exist() -> None: + repo_root = Path(__file__).resolve().parents[2] + suite_cfg_path = repo_root / "fluxon_test_stack" / "ci_test_list.yaml" + suite_cfg = yaml.safe_load(suite_cfg_path.read_text(encoding="utf-8")) + if not isinstance(suite_cfg, dict): + print("FAIL: test_ci_top_attention_additional_cargo_scenes_exist - suite config is not a mapping") + return + + suite = _TEST_RUNNER._parse_suite_config(copy.deepcopy(suite_cfg)) + expected_scene_ids = { + "ci_top_attention_cargo_cli", + "ci_top_attention_cargo_commu", + "ci_top_attention_cargo_commu_contract", + "ci_top_attention_cargo_framework", + "ci_top_attention_cargo_fs", + "ci_top_attention_cargo_fs_s3_gateway", + "ci_top_attention_cargo_limit_thirdparty", + "ci_top_attention_cargo_mq", + "ci_top_attention_cargo_observability", + "ci_top_attention_cargo_ops", + "ci_top_attention_cargo_pyo3", + } + missing = sorted(scene_id for scene_id in expected_scene_ids if scene_id not in suite.scenes) + if missing: + print( + "FAIL: test_ci_top_attention_additional_cargo_scenes_exist - " + f"missing scenes: {missing!r}" + ) + return + for scene_id in sorted(expected_scene_ids): + scene = suite.scenes.get(scene_id) + if not isinstance(scene, dict): + print( + "FAIL: test_ci_top_attention_additional_cargo_scenes_exist - " + f"scene is not a mapping: {scene_id!r}" + ) + return + ci = scene.get("ci") + if not isinstance(ci, dict): + print( + "FAIL: test_ci_top_attention_additional_cargo_scenes_exist - " + f"scene.ci missing: {scene_id!r}" + ) + return + if ci.get("subject") != "rust": + print( + "FAIL: test_ci_top_attention_additional_cargo_scenes_exist - " + f"expected subject 'rust' for {scene_id!r}, got {ci.get('subject')!r}" + ) + return + if ci.get("runtime_contract") != "rust_self_managed": + print( + "FAIL: test_ci_top_attention_additional_cargo_scenes_exist - " + f"expected runtime_contract 'rust_self_managed' for {scene_id!r}, got {ci.get('runtime_contract')!r}" + ) + return + print("PASS: test_ci_top_attention_additional_cargo_scenes_exist") + def test_ci_top_attention_mq_core_uses_cluster_kv_owner_runtime() -> None: repo_root = Path(__file__).resolve().parents[2] suite_cfg_path = repo_root / "fluxon_test_stack" / "ci_test_list.yaml" @@ -339,5 +406,59 @@ def test_ci_top_attention_mq_core_uses_cluster_kv_owner_runtime() -> None: print("PASS: test_ci_top_attention_mq_core_uses_cluster_kv_owner_runtime") +def test_ci_top_attention_cargo_kv_unit_uses_rust_self_managed_runtime() -> None: + repo_root = Path(__file__).resolve().parents[2] + suite_cfg_path = repo_root / "fluxon_test_stack" / "ci_test_list.yaml" + suite_cfg = yaml.safe_load(suite_cfg_path.read_text(encoding="utf-8")) + if not isinstance(suite_cfg, dict): + print("FAIL: test_ci_top_attention_cargo_kv_unit_uses_rust_self_managed_runtime - suite config is not a mapping") + return + + suite_for_contract = copy.deepcopy(suite_cfg) + suite = _TEST_RUNNER._parse_suite_config(suite_for_contract) + cases = _TEST_RUNNER._expand_cases(suite) + case = next( + ( + item + for item in cases + if item.scene_id == "ci_top_attention_cargo_kv_unit" + and item.profile_id == "fluxon_tcp" + ), + None, + ) + if case is None: + print("FAIL: test_ci_top_attention_cargo_kv_unit_uses_rust_self_managed_runtime - missing cargo kv unit case") + return + planned = _TEST_RUNNER._build_ci_execution_plan(case, suite) + if len(planned) != 1: + print( + "FAIL: test_ci_top_attention_cargo_kv_unit_uses_rust_self_managed_runtime - " + f"expected one planned case, got {len(planned)}" + ) + return + commands = planned[0].ci_commands + if len(commands) != 1: + print( + "FAIL: test_ci_top_attention_cargo_kv_unit_uses_rust_self_managed_runtime - " + f"expected one command, got {len(commands)}" + ) + return + command = commands[0] + if command.get("id") != "top_attention_cargo_kv_unit": + print( + "FAIL: test_ci_top_attention_cargo_kv_unit_uses_rust_self_managed_runtime - " + f"unexpected command id: {command.get('id')!r}" + ) + return + command_text = command.get("command") + if not isinstance(command_text, str) or "_cargo_kv_unit.py --case-config __RUN_DIR__/configs/ci_scene_config.yaml" not in command_text: + print( + "FAIL: test_ci_top_attention_cargo_kv_unit_uses_rust_self_managed_runtime - " + f"unexpected command: {command_text!r}" + ) + return + print("PASS: test_ci_top_attention_cargo_kv_unit_uses_rust_self_managed_runtime") + + if __name__ == "__main__": raise SystemExit(main()) diff --git a/fluxon_test_stack/tests/test_test_runner_testbed_contract.py b/fluxon_test_stack/tests/test_test_runner_testbed_contract.py index 86f41cb..982378f 100644 --- a/fluxon_test_stack/tests/test_test_runner_testbed_contract.py +++ b/fluxon_test_stack/tests/test_test_runner_testbed_contract.py @@ -5,6 +5,7 @@ import importlib.util import json import os +import subprocess import sys import tarfile import tempfile @@ -35,6 +36,7 @@ def _load_module(): _RUNNER = _load_module() +_CI_RUNTIME_MOD = sys.modules["test_runner_ci_runtime"] class TestTestRunnerTestbedContract(unittest.TestCase): @@ -70,10 +72,23 @@ def test_write_ci_master_owner_configs_emits_owner_large_file_paths(self) -> Non def test_ci_runtime_python_executable_requires_python310_on_path(self) -> None: with mock.patch.object(_RUNNER.shutil, "which", return_value=None): - with self.assertRaisesRegex(ValueError, "requires python3.10 on PATH"): + with self.assertRaisesRegex(ValueError, "requires a Python 3.10 interpreter on PATH"): _RUNNER._ci_runtime_python_executable() - def test_create_ci_runtime_venv_uses_python310(self) -> None: + def test_ci_runtime_python_executable_accepts_python3_alias_when_it_is_python310(self) -> None: + with mock.patch.object( + _RUNNER.shutil, + "which", + side_effect=lambda name: { + "python3.10": None, + "python3": "/usr/bin/python3", + "python": "/usr/bin/python", + }.get(name), + ): + with mock.patch.object(_CI_RUNTIME_MOD, "_python_executable_abi", return_value="cpython3.10"): + self.assertEqual(_RUNNER._ci_runtime_python_executable(), "/usr/bin/python3") + + def test_create_ci_runtime_venv_uses_python310_abi_without_ensurepip(self) -> None: with tempfile.TemporaryDirectory() as td: run_dir = Path(td) venv_dir = (run_dir / "venv").resolve() @@ -82,21 +97,64 @@ def test_create_ci_runtime_venv_uses_python310(self) -> None: def _fake_create_venv(argv: list[str], *, cwd: str) -> None: self.assertEqual( argv, - ["/usr/bin/python3.10", "-m", "venv", str(venv_dir)], + [ + "/usr/bin/python3.10", + "-m", + "venv", + "--system-site-packages", + "--without-pip", + str(venv_dir), + ], ) self.assertEqual(cwd, str(run_dir)) expected_venv_python.parent.mkdir(parents=True, exist_ok=True) expected_venv_python.write_text("#!/bin/sh\n", encoding="utf-8") with mock.patch.object(_RUNNER.shutil, "which", return_value="/usr/bin/python3.10"): - with mock.patch.object(_RUNNER, "_run_subprocess", side_effect=_fake_create_venv) as run_subprocess_mock: - with mock.patch.object(_RUNNER, "_assert_ci_runtime_python_abi") as assert_python_abi: - venv_python = _RUNNER._create_ci_runtime_venv(run_dir=run_dir) + with mock.patch.object(_CI_RUNTIME_MOD, "_python_executable_abi", return_value="cpython3.10"): + with mock.patch.object(_RUNNER, "_run_subprocess", side_effect=_fake_create_venv) as run_subprocess_mock: + with mock.patch.object(_RUNNER, "_assert_ci_runtime_python_abi") as assert_python_abi: + venv_python = _RUNNER._create_ci_runtime_venv(run_dir=run_dir) self.assertEqual(venv_python, expected_venv_python) run_subprocess_mock.assert_called_once() assert_python_abi.assert_called_once_with(venv_python=expected_venv_python) + def test_runner_native_bin_kvtest_scene_stays_on_direct_wrapper_command(self) -> None: + suite = _RUNNER._parse_suite_config( + yaml.safe_load( + (REPO_ROOT / "fluxon_test_stack" / "ci_test_list.yaml").read_text(encoding="utf-8") + ) + ) + cases = _RUNNER._expand_cases(suite) + case = next(item for item in cases if item.scene_id == "ci_top_attention_bin_kvtest" and item.profile_id == "fluxon_tcp") + + planned = _RUNNER._build_ci_execution_plan(case, suite) + + self.assertEqual(len(planned), 1) + self.assertEqual(planned[0].ci_commands[0]["id"], "top_attention_bin_kvtest") + self.assertIn( + "fluxon_test_stack/top_attention_test_index/_bin_kvtest.py", + planned[0].ci_commands[0]["command"], + ) + + def test_run_subprocess_reports_cwd_and_argv_on_failure(self) -> None: + completed = subprocess.CompletedProcess( + args=["/usr/bin/python3", "-c", "raise SystemExit(2)"], + returncode=2, + stdout="", + stderr="boom\n", + ) + with mock.patch.object(_RUNNER.subprocess, "run", return_value=completed): + with self.assertRaisesRegex( + RuntimeError, + r"command failed: rc=2 cwd=/tmp argv=/usr/bin/python3 -c 'raise SystemExit\(2\)'", + ): + _RUNNER._run_subprocess( + ["/usr/bin/python3", "-c", "raise SystemExit(2)"], + cwd="/tmp", + ) + def test_assert_ci_runtime_python_abi_accepts_python310_venv(self) -> None: with mock.patch.object(_RUNNER.subprocess, "check_output", return_value="cpython3.10\n") as check_output_mock: _RUNNER._assert_ci_runtime_python_abi(venv_python=Path("/tmp/venv/bin/python3")) @@ -313,6 +371,77 @@ def test_top_attention_ci_execution_plan_is_runner_native(self) -> None: self.assertEqual(planned[0].ci_commands[0]["id"], "top_attention_bin_kvtest") self.assertIn("--case-config __RUN_DIR__/configs/ci_scene_config.yaml", planned[0].ci_commands[0]["command"]) + def test_top_attention_cargo_fs_core_ci_execution_plan_is_runner_native(self) -> None: + suite_cfg = yaml.safe_load((_RUNNER.RUNNER_REPO_ROOT / "fluxon_test_stack" / "ci_test_list.yaml").read_text(encoding="utf-8")) + suite = _RUNNER._parse_suite_config(suite_cfg) + cases = _RUNNER._expand_cases(suite) + case = next(item for item in cases if item.scene_id == "ci_top_attention_cargo_fs_core" and item.profile_id == "fluxon_tcp") + planned = _RUNNER._build_ci_execution_plan(case, suite) + self.assertEqual(len(planned), 1) + self.assertEqual(planned[0].ci_commands[0]["id"], "top_attention_cargo_fs_core") + self.assertIn( + "__RUN_DIR__/src/fluxon_test_stack/top_attention_test_index/_cargo_fs_core.py", + planned[0].ci_commands[0]["command"], + ) + self.assertNotIn("--case-config", planned[0].ci_commands[0]["command"]) + + def test_top_attention_cargo_util_ci_execution_plan_is_runner_native(self) -> None: + suite_cfg = yaml.safe_load((_RUNNER.RUNNER_REPO_ROOT / "fluxon_test_stack" / "ci_test_list.yaml").read_text(encoding="utf-8")) + suite = _RUNNER._parse_suite_config(suite_cfg) + cases = _RUNNER._expand_cases(suite) + case = next(item for item in cases if item.scene_id == "ci_top_attention_cargo_util" and item.profile_id == "fluxon_tcp") + planned = _RUNNER._build_ci_execution_plan(case, suite) + self.assertEqual(len(planned), 1) + self.assertEqual(planned[0].ci_commands[0]["id"], "top_attention_cargo_util") + self.assertIn( + "__RUN_DIR__/src/fluxon_test_stack/top_attention_test_index/_cargo_util.py", + planned[0].ci_commands[0]["command"], + ) + self.assertIn("--case-config __RUN_DIR__/configs/ci_scene_config.yaml", planned[0].ci_commands[0]["command"]) + + def test_top_attention_cargo_kv_unit_ci_execution_plan_is_runner_native(self) -> None: + suite_cfg = yaml.safe_load((_RUNNER.RUNNER_REPO_ROOT / "fluxon_test_stack" / "ci_test_list.yaml").read_text(encoding="utf-8")) + suite = _RUNNER._parse_suite_config(suite_cfg) + cases = _RUNNER._expand_cases(suite) + case = next(item for item in cases if item.scene_id == "ci_top_attention_cargo_kv_unit" and item.profile_id == "fluxon_tcp") + planned = _RUNNER._build_ci_execution_plan(case, suite) + self.assertEqual(len(planned), 1) + self.assertEqual(planned[0].ci_commands[0]["id"], "top_attention_cargo_kv_unit") + self.assertIn( + "__RUN_DIR__/src/fluxon_test_stack/top_attention_test_index/_cargo_kv_unit.py", + planned[0].ci_commands[0]["command"], + ) + self.assertIn("--case-config __RUN_DIR__/configs/ci_scene_config.yaml", planned[0].ci_commands[0]["command"]) + + def test_additional_top_attention_cargo_ci_execution_plans_are_runner_native(self) -> None: + suite_cfg = yaml.safe_load((_RUNNER.RUNNER_REPO_ROOT / "fluxon_test_stack" / "ci_test_list.yaml").read_text(encoding="utf-8")) + suite = _RUNNER._parse_suite_config(suite_cfg) + cases = _RUNNER._expand_cases(suite) + expected = { + "ci_top_attention_cargo_cli": ("top_attention_cargo_cli", "_cargo_cli.py"), + "ci_top_attention_cargo_commu": ("top_attention_cargo_commu", "_cargo_commu.py"), + "ci_top_attention_cargo_commu_contract": ("top_attention_cargo_commu_contract", "_cargo_commu_contract.py"), + "ci_top_attention_cargo_framework": ("top_attention_cargo_framework", "_cargo_framework.py"), + "ci_top_attention_cargo_fs": ("top_attention_cargo_fs", "_cargo_fs.py"), + "ci_top_attention_cargo_fs_s3_gateway": ("top_attention_cargo_fs_s3_gateway", "_cargo_fs_s3_gateway.py"), + "ci_top_attention_cargo_limit_thirdparty": ("top_attention_cargo_limit_thirdparty", "_cargo_limit_thirdparty.py"), + "ci_top_attention_cargo_mq": ("top_attention_cargo_mq", "_cargo_mq.py"), + "ci_top_attention_cargo_observability": ("top_attention_cargo_observability", "_cargo_observability.py"), + "ci_top_attention_cargo_ops": ("top_attention_cargo_ops", "_cargo_ops.py"), + "ci_top_attention_cargo_pyo3": ("top_attention_cargo_pyo3", "_cargo_pyo3.py"), + } + for scene_id, (command_id, script_name) in expected.items(): + with self.subTest(scene_id=scene_id): + case = next(item for item in cases if item.scene_id == scene_id and item.profile_id == "fluxon_tcp") + planned = _RUNNER._build_ci_execution_plan(case, suite) + self.assertEqual(len(planned), 1) + self.assertEqual(planned[0].ci_commands[0]["id"], command_id) + self.assertIn( + f"__RUN_DIR__/src/fluxon_test_stack/top_attention_test_index/{script_name}", + planned[0].ci_commands[0]["command"], + ) + self.assertNotIn("--case-config", planned[0].ci_commands[0]["command"]) + def test_top_attention_log_mgmt_ci_execution_plan_is_runner_native(self) -> None: suite_cfg = yaml.safe_load((_RUNNER.RUNNER_REPO_ROOT / "fluxon_test_stack" / "ci_test_list.yaml").read_text(encoding="utf-8")) artifact_sets = suite_cfg.get("artifact_sets") @@ -333,7 +462,6 @@ def test_top_attention_log_mgmt_ci_execution_plan_is_runner_native(self) -> None self.assertEqual(planned[0].ci_commands[0]["id"], "top_attention_log_mgmt") self.assertIn( "__RUN_DIR__/src/fluxon_test_stack/top_attention_test_index/_log_mgmt.py", - planned[0].ci_commands[0]["command"], ) self.assertIn("--case-config __RUN_DIR__/configs/ci_scene_config.yaml", planned[0].ci_commands[0]["command"]) @@ -352,6 +480,38 @@ def test_top_attention_mq_core_ci_execution_plan_is_runner_native(self) -> None: ) self.assertIn("--case-config __RUN_DIR__/configs/ci_scene_config.yaml", planned[0].ci_commands[0]["command"]) + def test_top_attention_mq_core_ci_plan_has_no_collect_phase(self) -> None: + resolved_case = { + "case": { + "family": "ci", + "case_id": "ci_top_attention_mq_core__n1_kvowner_dram_20gib__fluxon_tcp_thread", + }, + "scene": { + "ci": { + "runtime_contract": "cluster_kv_owner", + "subject": "mq", + }, + }, + "deploy": { + "instances": [ + {"id": "master"}, + {"id": "owner_0"}, + {"id": "ci_runner"}, + ], + }, + "runtime_model": { + "test_bed": {"kind": "ops"}, + "base_runtime": {}, + "case_runtime": {"instance_ids": ["master", "owner_0", "ci_runner"]}, + }, + } + case_plan = _RUNNER._compile_case_plan(resolved_case) + self.assertEqual( + tuple(case_plan.__dataclass_fields__.keys()), + ("case_family", "prepare_phases", "execute_phases"), + ) + self.assertEqual(case_plan.execute_phases[0].instance_ids, ("ci_runner",)) + def test_doc_page_ci_execution_plan_uses_online_docker_image(self) -> None: suite_cfg = yaml.safe_load((_RUNNER.RUNNER_REPO_ROOT / "fluxon_test_stack" / "ci_test_list.yaml").read_text(encoding="utf-8")) suite = _RUNNER._parse_suite_config(suite_cfg) diff --git a/fluxon_test_stack/tests/test_top_attention_bin_kvtest_contract.py b/fluxon_test_stack/tests/test_top_attention_bin_kvtest_contract.py index 4d9b39c..81a4e33 100644 --- a/fluxon_test_stack/tests/test_top_attention_bin_kvtest_contract.py +++ b/fluxon_test_stack/tests/test_top_attention_bin_kvtest_contract.py @@ -97,6 +97,10 @@ def test_main_writes_build_config_ext_and_calls_cargo(self) -> None: run_cargo.call_args.kwargs["env"]["FLUXON_KV_TEST_ROUNDS"], "p2p_only", ) + self.assertEqual( + run_cargo.call_args.kwargs["env"]["FLUXON_BUILD_CONFIG_EXT_PATH"], + str((src_dir / "build_config_ext.yml").resolve()), + ) if __name__ == "__main__": diff --git a/fluxon_test_stack/tests/test_top_attention_cargo_fs_core_contract.py b/fluxon_test_stack/tests/test_top_attention_cargo_fs_core_contract.py new file mode 100644 index 0000000..f1cddbe --- /dev/null +++ b/fluxon_test_stack/tests/test_top_attention_cargo_fs_core_contract.py @@ -0,0 +1,59 @@ +#!/usr/bin/env python3 + +from __future__ import annotations + +import importlib.util +import sys +import unittest +from pathlib import Path +from unittest import mock + + +REPO_ROOT = Path(__file__).resolve().parents[2] +MODULE_PATH = REPO_ROOT / "fluxon_test_stack" / "top_attention_test_index" / "_cargo_fs_core.py" + + +def _load_module(): + module_dir = MODULE_PATH.parent + sys.path.insert(0, str(module_dir)) + try: + spec = importlib.util.spec_from_file_location("fluxon_test_stack_top_attention_cargo_fs_core_contract", MODULE_PATH) + assert spec is not None and spec.loader is not None + mod = importlib.util.module_from_spec(spec) + sys.modules[spec.name] = mod + spec.loader.exec_module(mod) + return mod + finally: + if sys.path and sys.path[0] == str(module_dir): + sys.path.pop(0) + + +_ENTRY = _load_module() + + +class TestTopAttentionCargoFsCoreContract(unittest.TestCase): + def test_main_calls_cargo_test_for_fs_core_crate(self) -> None: + with mock.patch.object(_ENTRY, "run_cargo", return_value=0) as run_cargo: + with mock.patch.object(sys, "argv", [str(MODULE_PATH)]): + rc = _ENTRY.main() + + self.assertEqual(rc, 0) + self.assertEqual( + run_cargo.call_args.args[0], + [ + "test", + "--manifest-path", + str(REPO_ROOT / "fluxon_rs" / "fluxon_fs_core" / "Cargo.toml"), + ], + ) + + def test_main_rejects_pytest_style_passthrough_flags(self) -> None: + with mock.patch.object(sys, "argv", [str(MODULE_PATH), "-k", "lease"]): + with self.assertRaises(SystemExit) as cm: + _ENTRY.main() + + self.assertEqual(cm.exception.code, 2) + + +if __name__ == "__main__": + raise SystemExit(unittest.main()) diff --git a/fluxon_test_stack/tests/test_top_attention_cargo_kv_unit_contract.py b/fluxon_test_stack/tests/test_top_attention_cargo_kv_unit_contract.py new file mode 100644 index 0000000..3387ae2 --- /dev/null +++ b/fluxon_test_stack/tests/test_top_attention_cargo_kv_unit_contract.py @@ -0,0 +1,139 @@ +#!/usr/bin/env python3 + +from __future__ import annotations + +import importlib.util +import sys +import tempfile +import unittest +from pathlib import Path +from unittest import mock + +import yaml + + +REPO_ROOT = Path(__file__).resolve().parents[2] +MODULE_PATH = REPO_ROOT / "fluxon_test_stack" / "top_attention_test_index" / "_cargo_kv_unit.py" + + +def _load_module(): + module_dir = MODULE_PATH.parent + sys.path.insert(0, str(module_dir)) + try: + spec = importlib.util.spec_from_file_location("fluxon_test_stack_top_attention_cargo_kv_unit_contract", MODULE_PATH) + assert spec is not None and spec.loader is not None + mod = importlib.util.module_from_spec(spec) + sys.modules[spec.name] = mod + spec.loader.exec_module(mod) + return mod + finally: + if sys.path and sys.path[0] == str(module_dir): + sys.path.pop(0) + + +_ENTRY = _load_module() + + +class TestTopAttentionCargoKvUnitContract(unittest.TestCase): + def test_main_accepts_case_config_and_writes_build_config_ext(self) -> None: + with tempfile.TemporaryDirectory() as td: + run_dir = Path(td) + cfg_dir = run_dir / "configs" + cfg_dir.mkdir(parents=True) + src_dir = run_dir / "src" + src_dir.mkdir(parents=True) + case_cfg = cfg_dir / "ci_scene_config.yaml" + case_cfg.write_text( + yaml.safe_dump( + { + "case": { + "scene_id": "ci_top_attention_cargo_kv_unit", + "scale_id": "n1_kvowner_dram_20gib", + "profile_id": "fluxon_tcp", + "case_id": "ci_top_attention_cargo_kv_unit__n1_kvowner_dram_20gib__fluxon_tcp", + }, + "scene_config": { + "kv_transport_feature": "tcp_thread_transport", + }, + "scene_runtime": { + "etcd": {"ip": "127.0.0.1", "port": 19180}, + "greptime": {"ip": "127.0.0.1", "port": 19190}, + }, + }, + sort_keys=False, + ), + encoding="utf-8", + ) + + with mock.patch.object(_ENTRY, "run_cargo", return_value=0) as run_cargo: + with mock.patch.object( + sys, + "argv", + [str(MODULE_PATH), "--case-config", str(case_cfg), "--feature", "tcp_thread_transport"], + ): + rc = _ENTRY.main() + + self.assertEqual(rc, 0) + build_cfg = yaml.safe_load((src_dir / "build_config_ext.yml").read_text(encoding="utf-8")) + self.assertEqual( + build_cfg, + { + "etcd": "127.0.0.1:19180", + "prom": "http://127.0.0.1:19190/v1/prometheus", + "prom_remote_write_url": "http://127.0.0.1:19190/v1/prometheus/write", + }, + ) + self.assertEqual( + run_cargo.call_args.args[0], + [ + "test", + "--manifest-path", + str(REPO_ROOT / "fluxon_rs" / "fluxon_kv" / "Cargo.toml"), + "--no-default-features", + "--features", + "p2p_transfer,tcp_thread_transport", + ], + ) + self.assertEqual( + run_cargo.call_args.kwargs["env"]["FLUXON_BUILD_CONFIG_EXT_PATH"], + str((src_dir / "build_config_ext.yml").resolve()), + ) + + def test_main_rejects_feature_mismatch_when_case_config_is_present(self) -> None: + with tempfile.TemporaryDirectory() as td: + run_dir = Path(td) + cfg_dir = run_dir / "configs" + cfg_dir.mkdir(parents=True) + case_cfg = cfg_dir / "ci_scene_config.yaml" + case_cfg.write_text( + yaml.safe_dump( + { + "case": {"scene_id": "ci_top_attention_cargo_kv_unit"}, + "scene_config": {"kv_transport_feature": "tcp_thread_transport"}, + "scene_runtime": { + "etcd": {"ip": "127.0.0.1", "port": 19180}, + "greptime": {"ip": "127.0.0.1", "port": 19190}, + }, + }, + sort_keys=False, + ), + encoding="utf-8", + ) + with mock.patch.object( + sys, + "argv", + [str(MODULE_PATH), "--case-config", str(case_cfg), "--feature", "fastws_transport"], + ): + with self.assertRaisesRegex(ValueError, "must match scene_config.kv_transport_feature"): + _ENTRY.main() + + def test_main_rejects_pytest_style_passthrough_flags(self) -> None: + with mock.patch.object(sys, "argv", [str(MODULE_PATH), "-k", "lease"]): + with self.assertRaises(SystemExit) as cm: + _ENTRY.main() + + self.assertEqual(cm.exception.code, 2) + + +if __name__ == "__main__": + raise SystemExit(unittest.main()) diff --git a/fluxon_test_stack/tests/test_top_attention_cargo_tikv_contract.py b/fluxon_test_stack/tests/test_top_attention_cargo_tikv_contract.py new file mode 100644 index 0000000..63b9686 --- /dev/null +++ b/fluxon_test_stack/tests/test_top_attention_cargo_tikv_contract.py @@ -0,0 +1,72 @@ +#!/usr/bin/env python3 + +from __future__ import annotations + +import importlib.util +import sys +import unittest +from pathlib import Path +from unittest import mock + + +REPO_ROOT = Path(__file__).resolve().parents[2] +INDEX_DIR = REPO_ROOT / "fluxon_test_stack" / "top_attention_test_index" + +MODULE_SPECS = { + "_cargo_fs.py": "fluxon_rs/fluxon_fs/Cargo.toml", + "_cargo_fs_s3_gateway.py": "fluxon_rs/fluxon_fs_s3_gateway/Cargo.toml", +} + + +def _load_module(module_name: str): + module_path = INDEX_DIR / module_name + module_dir = module_path.parent + sys.path.insert(0, str(module_dir)) + try: + spec = importlib.util.spec_from_file_location( + f"fluxon_test_stack_{module_path.stem}_contract", + module_path, + ) + assert spec is not None and spec.loader is not None + mod = importlib.util.module_from_spec(spec) + sys.modules[spec.name] = mod + spec.loader.exec_module(mod) + return mod + finally: + if sys.path and sys.path[0] == str(module_dir): + sys.path.pop(0) + + +class TestTopAttentionCargoTikvContract(unittest.TestCase): + def test_main_calls_cargo_test_for_expected_manifest(self) -> None: + for module_name, manifest_relpath in MODULE_SPECS.items(): + with self.subTest(module_name=module_name): + entry = _load_module(module_name) + module_path = INDEX_DIR / module_name + with mock.patch.object(entry, "run_cargo", return_value=0) as run_cargo: + with mock.patch.object(sys, "argv", [str(module_path)]): + rc = entry.main() + + self.assertEqual(rc, 0) + self.assertEqual( + run_cargo.call_args.args[0], + [ + "test", + "--manifest-path", + str(REPO_ROOT / manifest_relpath), + ], + ) + + def test_main_rejects_pytest_style_passthrough_flags(self) -> None: + for module_name in MODULE_SPECS: + with self.subTest(module_name=module_name): + entry = _load_module(module_name) + module_path = INDEX_DIR / module_name + with mock.patch.object(sys, "argv", [str(module_path), "-k", "lease"]): + with self.assertRaises(SystemExit) as cm: + entry.main() + self.assertEqual(cm.exception.code, 2) + + +if __name__ == "__main__": + raise SystemExit(unittest.main()) diff --git a/fluxon_test_stack/tests/test_top_attention_cargo_util_contract.py b/fluxon_test_stack/tests/test_top_attention_cargo_util_contract.py new file mode 100644 index 0000000..84a26dd --- /dev/null +++ b/fluxon_test_stack/tests/test_top_attention_cargo_util_contract.py @@ -0,0 +1,102 @@ +#!/usr/bin/env python3 + +from __future__ import annotations + +import importlib.util +import sys +import tempfile +import unittest +from pathlib import Path +from unittest import mock + +import yaml + + +REPO_ROOT = Path(__file__).resolve().parents[2] +MODULE_PATH = REPO_ROOT / "fluxon_test_stack" / "top_attention_test_index" / "_cargo_util.py" + + +def _load_module(): + module_dir = MODULE_PATH.parent + sys.path.insert(0, str(module_dir)) + try: + spec = importlib.util.spec_from_file_location("fluxon_test_stack_top_attention_cargo_util_contract", MODULE_PATH) + assert spec is not None and spec.loader is not None + mod = importlib.util.module_from_spec(spec) + sys.modules[spec.name] = mod + spec.loader.exec_module(mod) + return mod + finally: + if sys.path and sys.path[0] == str(module_dir): + sys.path.pop(0) + + +_ENTRY = _load_module() + + +class TestTopAttentionCargoUtilContract(unittest.TestCase): + def test_main_accepts_case_config_and_writes_build_config_ext(self) -> None: + with tempfile.TemporaryDirectory() as td: + run_dir = Path(td) + cfg_dir = run_dir / "configs" + cfg_dir.mkdir(parents=True) + src_dir = run_dir / "src" + src_dir.mkdir(parents=True) + case_cfg = cfg_dir / "ci_scene_config.yaml" + case_cfg.write_text( + yaml.safe_dump( + { + "case": { + "scene_id": "ci_top_attention_cargo_util", + "scale_id": "n1_kvowner_dram_20gib", + "profile_id": "fluxon_tcp", + "case_id": "ci_top_attention_cargo_util__n1_kvowner_dram_20gib__fluxon_tcp", + }, + "scene_config": {}, + "scene_runtime": { + "etcd": {"ip": "127.0.0.1", "port": 19180}, + "greptime": {"ip": "127.0.0.1", "port": 19190}, + }, + }, + sort_keys=False, + ), + encoding="utf-8", + ) + + with mock.patch.object(_ENTRY, "run_cargo", return_value=0) as run_cargo: + with mock.patch.object(sys, "argv", [str(MODULE_PATH), "--case-config", str(case_cfg)]): + rc = _ENTRY.main() + + self.assertEqual(rc, 0) + build_cfg = yaml.safe_load((src_dir / "build_config_ext.yml").read_text(encoding="utf-8")) + self.assertEqual( + build_cfg, + { + "etcd": "127.0.0.1:19180", + "prom": "http://127.0.0.1:19190/v1/prometheus", + "prom_remote_write_url": "http://127.0.0.1:19190/v1/prometheus/write", + }, + ) + self.assertEqual( + run_cargo.call_args.args[0], + [ + "test", + "--manifest-path", + str(REPO_ROOT / "fluxon_rs" / "fluxon_util" / "Cargo.toml"), + ], + ) + self.assertEqual( + run_cargo.call_args.kwargs["env"]["FLUXON_BUILD_CONFIG_EXT_PATH"], + str((src_dir / "build_config_ext.yml").resolve()), + ) + + def test_main_rejects_pytest_style_passthrough_flags(self) -> None: + with mock.patch.object(sys, "argv", [str(MODULE_PATH), "-k", "lease"]): + with self.assertRaises(SystemExit) as cm: + _ENTRY.main() + + self.assertEqual(cm.exception.code, 2) + + +if __name__ == "__main__": + raise SystemExit(unittest.main()) diff --git a/fluxon_test_stack/tests/test_top_attention_cargo_workspace_contract.py b/fluxon_test_stack/tests/test_top_attention_cargo_workspace_contract.py new file mode 100644 index 0000000..66a600d --- /dev/null +++ b/fluxon_test_stack/tests/test_top_attention_cargo_workspace_contract.py @@ -0,0 +1,79 @@ +#!/usr/bin/env python3 + +from __future__ import annotations + +import importlib.util +import sys +import unittest +from pathlib import Path +from unittest import mock + + +REPO_ROOT = Path(__file__).resolve().parents[2] +INDEX_DIR = REPO_ROOT / "fluxon_test_stack" / "top_attention_test_index" + +MODULE_SPECS = { + "_cargo_cli.py": "fluxon_rs/fluxon_cli/Cargo.toml", + "_cargo_commu.py": "fluxon_rs/fluxon_commu/Cargo.toml", + "_cargo_commu_contract.py": "fluxon_rs/fluxon_commu_contract/Cargo.toml", + "_cargo_framework.py": "fluxon_rs/fluxon_framework/Cargo.toml", + "_cargo_limit_thirdparty.py": "fluxon_rs/limit_thirdparty/Cargo.toml", + "_cargo_mq.py": "fluxon_rs/fluxon_mq/Cargo.toml", + "_cargo_observability.py": "fluxon_rs/fluxon_observability/Cargo.toml", + "_cargo_ops.py": "fluxon_rs/fluxon_ops/Cargo.toml", + "_cargo_pyo3.py": "fluxon_rs/fluxon_pyo3/Cargo.toml", +} + + +def _load_module(module_name: str): + module_path = INDEX_DIR / module_name + module_dir = module_path.parent + sys.path.insert(0, str(module_dir)) + try: + spec = importlib.util.spec_from_file_location( + f"fluxon_test_stack_{module_path.stem}_contract", + module_path, + ) + assert spec is not None and spec.loader is not None + mod = importlib.util.module_from_spec(spec) + sys.modules[spec.name] = mod + spec.loader.exec_module(mod) + return mod + finally: + if sys.path and sys.path[0] == str(module_dir): + sys.path.pop(0) + + +class TestTopAttentionCargoWorkspaceContract(unittest.TestCase): + def test_main_calls_cargo_test_for_expected_manifest(self) -> None: + for module_name, manifest_relpath in MODULE_SPECS.items(): + with self.subTest(module_name=module_name): + entry = _load_module(module_name) + module_path = INDEX_DIR / module_name + with mock.patch.object(entry, "run_cargo", return_value=0) as run_cargo: + with mock.patch.object(sys, "argv", [str(module_path)]): + rc = entry.main() + + self.assertEqual(rc, 0) + self.assertEqual( + run_cargo.call_args.args[0], + [ + "test", + "--manifest-path", + str(REPO_ROOT / manifest_relpath), + ], + ) + + def test_main_rejects_pytest_style_passthrough_flags(self) -> None: + for module_name in MODULE_SPECS: + with self.subTest(module_name=module_name): + entry = _load_module(module_name) + module_path = INDEX_DIR / module_name + with mock.patch.object(sys, "argv", [str(module_path), "-k", "lease"]): + with self.assertRaises(SystemExit) as cm: + entry.main() + self.assertEqual(cm.exception.code, 2) + + +if __name__ == "__main__": + raise SystemExit(unittest.main()) diff --git a/fluxon_test_stack/tests/test_top_attention_common_contract.py b/fluxon_test_stack/tests/test_top_attention_common_contract.py index 924269d..ff5e442 100644 --- a/fluxon_test_stack/tests/test_top_attention_common_contract.py +++ b/fluxon_test_stack/tests/test_top_attention_common_contract.py @@ -27,7 +27,7 @@ def _load_module(): class TestTopAttentionCommonContract(unittest.TestCase): - def test_prepare_cargo_env_prefers_active_fluxon_pyo3_libs_dir(self) -> None: + def test_prepare_cargo_env_prefers_active_fluxon_pyo3_libs_dir_without_overriding_loader_path(self) -> None: with tempfile.TemporaryDirectory() as td: root = Path(td) active_site_packages = root / "venv" / "lib" / "python3.12" / "site-packages" @@ -46,18 +46,40 @@ def test_prepare_cargo_env_prefers_active_fluxon_pyo3_libs_dir(self) -> None: ): with mock.patch.object(_ENTRY.site, "getsitepackages", return_value=[str(stale_libs_dir.parent)]): with mock.patch.object(_ENTRY.site, "getusersitepackages", return_value=""): - prepared_env = _ENTRY._prepare_cargo_env( - { - "LD_LIBRARY_PATH": f"{stale_libs_dir}:/usr/lib:/opt/custom", - "PATH": "/usr/bin", - } - ) + with mock.patch.object(_ENTRY, "_resolve_repo_closed_sdk_root", return_value=None): + prepared_env = _ENTRY._prepare_cargo_env( + { + "LD_LIBRARY_PATH": f"{stale_libs_dir}:/usr/lib:/opt/custom", + "PATH": "/usr/bin", + } + ) assert prepared_env is not None self.assertEqual(prepared_env["FLUXON_PYO3_LIBS_DIR"], str(active_libs_dir.resolve())) + self.assertEqual(prepared_env["LD_LIBRARY_PATH"], f"{stale_libs_dir}:/usr/lib:/opt/custom") + self.assertEqual(prepared_env["PATH"], "/usr/bin") + + def test_prepare_cargo_env_prepends_repo_closed_sdk_runtime(self) -> None: + with tempfile.TemporaryDirectory() as td: + root = Path(td) + closed_sdk_root = root / "fluxon_release" / "closed_sdk" + (closed_sdk_root / "lib").mkdir(parents=True) + (closed_sdk_root / "manifest.json").write_text("{}", encoding="utf-8") + + with mock.patch.object(_ENTRY, "REPO_ROOT", root): + with mock.patch.object(_ENTRY, "_resolve_authoritative_fluxon_pyo3_libs_dir", return_value=None): + prepared_env = _ENTRY._prepare_cargo_env( + { + "LD_LIBRARY_PATH": "/usr/lib:/opt/custom", + "PATH": "/usr/bin", + } + ) + + assert prepared_env is not None + self.assertEqual(prepared_env["FLUXON_COMMU_CLOSED_SDK_ROOT"], str(closed_sdk_root.resolve())) self.assertEqual( prepared_env["LD_LIBRARY_PATH"], - f"{active_libs_dir.resolve()}:/usr/lib:/opt/custom", + f"{(closed_sdk_root / 'lib').resolve()}:/usr/lib:/opt/custom", ) self.assertEqual(prepared_env["PATH"], "/usr/bin") diff --git a/fluxon_test_stack/top_attention_test_index/README.md b/fluxon_test_stack/top_attention_test_index/README.md index e36b326..11c55c9 100644 --- a/fluxon_test_stack/top_attention_test_index/README.md +++ b/fluxon_test_stack/top_attention_test_index/README.md @@ -49,9 +49,20 @@ Entries: - `_deployment_codegen.py`: deployment code generation coverage - `_log_mgmt.py`: shared-supervisor ops log rolling plus Rust KV log sharding coverage. `ci_test_list.yaml` now exposes this wrapper as the formal `ci_top_attention_log_mgmt` scene, and `test_runner.py` dispatches to it from the runner-native `top_attention` CI execution model. - `_script_tools.py`: script utility coverage -- `_cargo_fs_core.py`: cargo tests for the Rust FS core crate -- `_cargo_util.py`: cargo tests for the Rust util crate -- `_cargo_kv_unit.py`: cargo tests for the Rust KV crate +- `_cargo_fs_core.py`: cargo tests for the Rust FS core crate. `ci_test_list.yaml` now exposes this wrapper as the formal `ci_top_attention_cargo_fs_core` runner-native scene. +- `_cargo_util.py`: cargo tests for the Rust util crate. `ci_test_list.yaml` now exposes this wrapper as the formal `ci_top_attention_cargo_util` runner-native scene, with runtime endpoints supplied through canonical `--case-config`. +- `_cargo_kv_unit.py`: cargo tests for the Rust KV crate. `ci_test_list.yaml` now exposes this wrapper as the formal `ci_top_attention_cargo_kv_unit` runner-native scene, with transport feature selection bounded by `scene_config.kv_transport_feature`. +- `_cargo_cli.py`: cargo tests for the Rust CLI crate +- `_cargo_commu.py`: cargo tests for the Rust communication facade crate +- `_cargo_commu_contract.py`: cargo tests for the Rust communication contract crate +- `_cargo_framework.py`: cargo tests for the Rust framework crate +- `_cargo_fs.py`: cargo tests for the Rust FS crate. This wrapper expects the prepared `fluxon_release/ext_images/tikv/*` runtime files. +- `_cargo_fs_s3_gateway.py`: cargo tests for the Rust FS S3 gateway crate. This wrapper expects the prepared `fluxon_release/ext_images/tikv/*` runtime files. +- `_cargo_limit_thirdparty.py`: cargo tests for the Rust third-party facade crate +- `_cargo_mq.py`: cargo tests for the Rust MQ crate +- `_cargo_observability.py`: cargo tests for the Rust observability crate +- `_cargo_ops.py`: cargo tests for the Rust ops crate +- `_cargo_pyo3.py`: cargo tests for the Rust PyO3 crate Operational note: @@ -60,6 +71,9 @@ Operational note: provide at least 308 common non-bastion deploy targets in `target_ip_map` for the default 300-producer/8-consumer topology; pass `--config` for the large cluster suite before running it. +- All `_cargo_*.py` wrappers are direct-process entrypoints. They do not forward + `pytest` selectors or `cargo test` passthrough flags unless the wrapper + explicitly defines that surface. Known gap: diff --git a/fluxon_test_stack/top_attention_test_index/_bin_kvtest.py b/fluxon_test_stack/top_attention_test_index/_bin_kvtest.py index faddb51..40ecb80 100644 --- a/fluxon_test_stack/top_attention_test_index/_bin_kvtest.py +++ b/fluxon_test_stack/top_attention_test_index/_bin_kvtest.py @@ -5,9 +5,13 @@ import os from pathlib import Path -import yaml - -from _common import REPO_ROOT, load_case_config_payload, run_cargo +from _common import ( + REPO_ROOT, + inject_build_config_ext_env, + load_case_config_payload, + run_cargo, + write_build_config_ext, +) TEST_REQUIREMENTS = ["cargo", "etcd", "ops", "submodules"] @@ -33,38 +37,6 @@ def _parse_kv_test_rounds(raw: object) -> str: return ",".join(rounds) -def _require_scene_runtime_endpoint(scene_runtime: object, *, service_id: str) -> tuple[str, int]: - if not isinstance(scene_runtime, dict): - raise ValueError("case config scene_runtime must be a mapping") - raw_service = scene_runtime.get(service_id) - if not isinstance(raw_service, dict): - raise ValueError(f"case config scene_runtime.{service_id} must be a mapping") - ip = str(raw_service.get("ip") or "").strip() - if not ip: - raise ValueError(f"case config scene_runtime.{service_id}.ip must be set") - port = raw_service.get("port") - if not isinstance(port, int): - raise ValueError(f"case config scene_runtime.{service_id}.port must be an int") - return ip, port - - -def _write_build_config_ext(case_cfg_path: Path, scene_runtime: dict) -> None: - etcd_ip, etcd_port = _require_scene_runtime_endpoint(scene_runtime, service_id="etcd") - greptime_ip, greptime_port = _require_scene_runtime_endpoint(scene_runtime, service_id="greptime") - out_path = case_cfg_path.resolve().parents[1] / "src" / "build_config_ext.yml" - out_path.write_text( - yaml.safe_dump( - { - "etcd": f"{etcd_ip}:{etcd_port}", - "prom": f"http://{greptime_ip}:{greptime_port}/v1/prometheus", - "prom_remote_write_url": f"http://{greptime_ip}:{greptime_port}/v1/prometheus/write", - }, - sort_keys=False, - ), - encoding="utf-8", - ) - - def main() -> int: parser = argparse.ArgumentParser( description="Flat index entry for the existing Rust kv_test binary." @@ -85,7 +57,7 @@ def main() -> int: scene_runtime = case_payload.get("scene_runtime") if not isinstance(scene_runtime, dict): raise ValueError("case config must define scene_runtime mapping") - _write_build_config_ext(case_cfg_path, scene_runtime) + build_config_ext_path = write_build_config_ext(case_cfg_path, scene_runtime=scene_runtime) cargo_args = [ "run", @@ -99,9 +71,11 @@ def main() -> int: ] if passthrough: cargo_args.extend(["--", *passthrough]) - env = None + env = inject_build_config_ext_env( + None, + build_config_ext_path=build_config_ext_path, + ) if rounds != "all": - env = os.environ.copy() env["FLUXON_KV_TEST_ROUNDS"] = rounds return run_cargo(cargo_args, env=env) diff --git a/fluxon_test_stack/top_attention_test_index/_cargo_cli.py b/fluxon_test_stack/top_attention_test_index/_cargo_cli.py new file mode 100644 index 0000000..cf56b5e --- /dev/null +++ b/fluxon_test_stack/top_attention_test_index/_cargo_cli.py @@ -0,0 +1,25 @@ +#!/usr/bin/env python3 +from __future__ import annotations + +import argparse + +from _common import REPO_ROOT, run_cargo + + +TEST_REQUIREMENTS = ["cargo", "ops", "submodules"] + + +def main() -> int: + parser = argparse.ArgumentParser( + description="Flat index entry for Rust CLI crate tests." + ) + parser.parse_args() + return run_cargo([ + "test", + "--manifest-path", + str(REPO_ROOT / "fluxon_rs" / "fluxon_cli" / "Cargo.toml"), + ]) + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/fluxon_test_stack/top_attention_test_index/_cargo_commu.py b/fluxon_test_stack/top_attention_test_index/_cargo_commu.py new file mode 100644 index 0000000..e1fd14c --- /dev/null +++ b/fluxon_test_stack/top_attention_test_index/_cargo_commu.py @@ -0,0 +1,25 @@ +#!/usr/bin/env python3 +from __future__ import annotations + +import argparse + +from _common import REPO_ROOT, run_cargo + + +TEST_REQUIREMENTS = ["cargo", "ops", "submodules"] + + +def main() -> int: + parser = argparse.ArgumentParser( + description="Flat index entry for Rust communication facade crate tests." + ) + parser.parse_args() + return run_cargo([ + "test", + "--manifest-path", + str(REPO_ROOT / "fluxon_rs" / "fluxon_commu" / "Cargo.toml"), + ]) + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/fluxon_test_stack/top_attention_test_index/_cargo_commu_contract.py b/fluxon_test_stack/top_attention_test_index/_cargo_commu_contract.py new file mode 100644 index 0000000..8e15c4f --- /dev/null +++ b/fluxon_test_stack/top_attention_test_index/_cargo_commu_contract.py @@ -0,0 +1,25 @@ +#!/usr/bin/env python3 +from __future__ import annotations + +import argparse + +from _common import REPO_ROOT, run_cargo + + +TEST_REQUIREMENTS = ["cargo", "ops", "submodules"] + + +def main() -> int: + parser = argparse.ArgumentParser( + description="Flat index entry for Rust communication contract crate tests." + ) + parser.parse_args() + return run_cargo([ + "test", + "--manifest-path", + str(REPO_ROOT / "fluxon_rs" / "fluxon_commu_contract" / "Cargo.toml"), + ]) + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/fluxon_test_stack/top_attention_test_index/_cargo_framework.py b/fluxon_test_stack/top_attention_test_index/_cargo_framework.py new file mode 100644 index 0000000..a6430de --- /dev/null +++ b/fluxon_test_stack/top_attention_test_index/_cargo_framework.py @@ -0,0 +1,25 @@ +#!/usr/bin/env python3 +from __future__ import annotations + +import argparse + +from _common import REPO_ROOT, run_cargo + + +TEST_REQUIREMENTS = ["cargo", "ops", "submodules"] + + +def main() -> int: + parser = argparse.ArgumentParser( + description="Flat index entry for Rust framework crate tests." + ) + parser.parse_args() + return run_cargo([ + "test", + "--manifest-path", + str(REPO_ROOT / "fluxon_rs" / "fluxon_framework" / "Cargo.toml"), + ]) + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/fluxon_test_stack/top_attention_test_index/_cargo_fs.py b/fluxon_test_stack/top_attention_test_index/_cargo_fs.py new file mode 100644 index 0000000..38eb589 --- /dev/null +++ b/fluxon_test_stack/top_attention_test_index/_cargo_fs.py @@ -0,0 +1,25 @@ +#!/usr/bin/env python3 +from __future__ import annotations + +import argparse + +from _common import REPO_ROOT, run_cargo + + +TEST_REQUIREMENTS = ["cargo", "fluxon-release", "ops", "submodules", "tikv"] + + +def main() -> int: + parser = argparse.ArgumentParser( + description="Flat index entry for Rust FS crate tests." + ) + parser.parse_args() + return run_cargo([ + "test", + "--manifest-path", + str(REPO_ROOT / "fluxon_rs" / "fluxon_fs" / "Cargo.toml"), + ]) + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/fluxon_test_stack/top_attention_test_index/_cargo_fs_core.py b/fluxon_test_stack/top_attention_test_index/_cargo_fs_core.py index cbca6f5..0af437c 100755 --- a/fluxon_test_stack/top_attention_test_index/_cargo_fs_core.py +++ b/fluxon_test_stack/top_attention_test_index/_cargo_fs_core.py @@ -1,6 +1,8 @@ #!/usr/bin/env python3 from __future__ import annotations +import argparse + from _common import REPO_ROOT, run_cargo @@ -8,6 +10,10 @@ def main() -> int: + parser = argparse.ArgumentParser( + description="Flat index entry for Rust FS core crate tests." + ) + parser.parse_args() return run_cargo([ "test", "--manifest-path", diff --git a/fluxon_test_stack/top_attention_test_index/_cargo_fs_s3_gateway.py b/fluxon_test_stack/top_attention_test_index/_cargo_fs_s3_gateway.py new file mode 100644 index 0000000..4b7d89e --- /dev/null +++ b/fluxon_test_stack/top_attention_test_index/_cargo_fs_s3_gateway.py @@ -0,0 +1,25 @@ +#!/usr/bin/env python3 +from __future__ import annotations + +import argparse + +from _common import REPO_ROOT, run_cargo + + +TEST_REQUIREMENTS = ["cargo", "fluxon-release", "ops", "submodules", "tikv"] + + +def main() -> int: + parser = argparse.ArgumentParser( + description="Flat index entry for Rust FS S3 gateway crate tests." + ) + parser.parse_args() + return run_cargo([ + "test", + "--manifest-path", + str(REPO_ROOT / "fluxon_rs" / "fluxon_fs_s3_gateway" / "Cargo.toml"), + ]) + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/fluxon_test_stack/top_attention_test_index/_cargo_kv_unit.py b/fluxon_test_stack/top_attention_test_index/_cargo_kv_unit.py index 36ae5ff..43ce921 100755 --- a/fluxon_test_stack/top_attention_test_index/_cargo_kv_unit.py +++ b/fluxon_test_stack/top_attention_test_index/_cargo_kv_unit.py @@ -3,32 +3,64 @@ import argparse import os +from pathlib import Path -from _common import REPO_ROOT, run_cargo +from _common import ( + REPO_ROOT, + inject_build_config_ext_env, + load_case_config_payload, + run_cargo, + write_build_config_ext, +) TEST_REQUIREMENTS = ["cargo", "etcd", "ops", "submodules"] +SCENE_ID = "ci_top_attention_cargo_kv_unit" def main() -> int: parser = argparse.ArgumentParser( description="Flat index entry for Rust KV crate unit tests." ) + parser.add_argument( + "--case-config", + help="Canonical CI case config YAML emitted by test_runner.", + ) parser.add_argument( "--feature", default=os.environ.get("FLUXON_KV_TEST_TRANSPORT_FEATURE", "tcp_thread_transport"), help="Transport feature appended to p2p_transfer.", ) - args, passthrough = parser.parse_known_args() + args = parser.parse_args() + feature = str(args.feature).strip() + env = None + if args.case_config: + case_cfg_path = Path(args.case_config).resolve() + case_payload = load_case_config_payload(case_cfg_path, expected_scene_id=SCENE_ID) + scene_config = case_payload["scene_config"] + configured_feature = str(scene_config.get("kv_transport_feature") or "").strip() + if not configured_feature: + raise ValueError("scene_config.kv_transport_feature must be set") + if feature != configured_feature: + raise ValueError( + f"--feature must match scene_config.kv_transport_feature when --case-config is set: {configured_feature!r}" + ) + scene_runtime = case_payload.get("scene_runtime") + if not isinstance(scene_runtime, dict): + raise ValueError("case config must define scene_runtime mapping") + build_config_ext_path = write_build_config_ext(case_cfg_path, scene_runtime=scene_runtime) + env = inject_build_config_ext_env( + env, + build_config_ext_path=build_config_ext_path, + ) return run_cargo([ "test", "--manifest-path", str(REPO_ROOT / "fluxon_rs" / "fluxon_kv" / "Cargo.toml"), "--no-default-features", "--features", - f"p2p_transfer,{args.feature}", - *passthrough, - ]) + f"p2p_transfer,{feature}", + ], env=env) if __name__ == "__main__": diff --git a/fluxon_test_stack/top_attention_test_index/_cargo_limit_thirdparty.py b/fluxon_test_stack/top_attention_test_index/_cargo_limit_thirdparty.py new file mode 100644 index 0000000..1ef196b --- /dev/null +++ b/fluxon_test_stack/top_attention_test_index/_cargo_limit_thirdparty.py @@ -0,0 +1,25 @@ +#!/usr/bin/env python3 +from __future__ import annotations + +import argparse + +from _common import REPO_ROOT, run_cargo + + +TEST_REQUIREMENTS = ["cargo", "ops", "submodules"] + + +def main() -> int: + parser = argparse.ArgumentParser( + description="Flat index entry for Rust limit_thirdparty crate tests." + ) + parser.parse_args() + return run_cargo([ + "test", + "--manifest-path", + str(REPO_ROOT / "fluxon_rs" / "limit_thirdparty" / "Cargo.toml"), + ]) + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/fluxon_test_stack/top_attention_test_index/_cargo_mq.py b/fluxon_test_stack/top_attention_test_index/_cargo_mq.py new file mode 100644 index 0000000..aab3ff5 --- /dev/null +++ b/fluxon_test_stack/top_attention_test_index/_cargo_mq.py @@ -0,0 +1,25 @@ +#!/usr/bin/env python3 +from __future__ import annotations + +import argparse + +from _common import REPO_ROOT, run_cargo + + +TEST_REQUIREMENTS = ["cargo", "ops", "submodules"] + + +def main() -> int: + parser = argparse.ArgumentParser( + description="Flat index entry for Rust MQ crate tests." + ) + parser.parse_args() + return run_cargo([ + "test", + "--manifest-path", + str(REPO_ROOT / "fluxon_rs" / "fluxon_mq" / "Cargo.toml"), + ]) + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/fluxon_test_stack/top_attention_test_index/_cargo_observability.py b/fluxon_test_stack/top_attention_test_index/_cargo_observability.py new file mode 100644 index 0000000..5a5ee96 --- /dev/null +++ b/fluxon_test_stack/top_attention_test_index/_cargo_observability.py @@ -0,0 +1,25 @@ +#!/usr/bin/env python3 +from __future__ import annotations + +import argparse + +from _common import REPO_ROOT, run_cargo + + +TEST_REQUIREMENTS = ["cargo", "ops", "submodules"] + + +def main() -> int: + parser = argparse.ArgumentParser( + description="Flat index entry for Rust observability crate tests." + ) + parser.parse_args() + return run_cargo([ + "test", + "--manifest-path", + str(REPO_ROOT / "fluxon_rs" / "fluxon_observability" / "Cargo.toml"), + ]) + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/fluxon_test_stack/top_attention_test_index/_cargo_ops.py b/fluxon_test_stack/top_attention_test_index/_cargo_ops.py new file mode 100644 index 0000000..ffd1a11 --- /dev/null +++ b/fluxon_test_stack/top_attention_test_index/_cargo_ops.py @@ -0,0 +1,25 @@ +#!/usr/bin/env python3 +from __future__ import annotations + +import argparse + +from _common import REPO_ROOT, run_cargo + + +TEST_REQUIREMENTS = ["cargo", "ops", "submodules"] + + +def main() -> int: + parser = argparse.ArgumentParser( + description="Flat index entry for Rust ops crate tests." + ) + parser.parse_args() + return run_cargo([ + "test", + "--manifest-path", + str(REPO_ROOT / "fluxon_rs" / "fluxon_ops" / "Cargo.toml"), + ]) + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/fluxon_test_stack/top_attention_test_index/_cargo_pyo3.py b/fluxon_test_stack/top_attention_test_index/_cargo_pyo3.py new file mode 100644 index 0000000..4ee9a4c --- /dev/null +++ b/fluxon_test_stack/top_attention_test_index/_cargo_pyo3.py @@ -0,0 +1,25 @@ +#!/usr/bin/env python3 +from __future__ import annotations + +import argparse + +from _common import REPO_ROOT, run_cargo + + +TEST_REQUIREMENTS = ["cargo", "ops", "submodules"] + + +def main() -> int: + parser = argparse.ArgumentParser( + description="Flat index entry for Rust PyO3 crate tests." + ) + parser.parse_args() + return run_cargo([ + "test", + "--manifest-path", + str(REPO_ROOT / "fluxon_rs" / "fluxon_pyo3" / "Cargo.toml"), + ]) + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/fluxon_test_stack/top_attention_test_index/_cargo_util.py b/fluxon_test_stack/top_attention_test_index/_cargo_util.py index 2e707c8..dc02589 100755 --- a/fluxon_test_stack/top_attention_test_index/_cargo_util.py +++ b/fluxon_test_stack/top_attention_test_index/_cargo_util.py @@ -1,18 +1,47 @@ #!/usr/bin/env python3 from __future__ import annotations -from _common import REPO_ROOT, run_cargo +import argparse +from pathlib import Path +from _common import ( + REPO_ROOT, + inject_build_config_ext_env, + load_case_config_payload, + run_cargo, + write_build_config_ext, +) TEST_REQUIREMENTS = ["cargo", "etcd", "ops", "submodules"] +SCENE_ID = "ci_top_attention_cargo_util" def main() -> int: + parser = argparse.ArgumentParser( + description="Flat index entry for Rust util crate tests." + ) + parser.add_argument( + "--case-config", + help="Canonical CI case config YAML emitted by test_runner.", + ) + args = parser.parse_args() + env = None + if args.case_config: + case_cfg_path = Path(args.case_config).resolve() + case_payload = load_case_config_payload(case_cfg_path, expected_scene_id=SCENE_ID) + scene_runtime = case_payload.get("scene_runtime") + if not isinstance(scene_runtime, dict): + raise ValueError("case config must define scene_runtime mapping") + build_config_ext_path = write_build_config_ext(case_cfg_path, scene_runtime=scene_runtime) + env = inject_build_config_ext_env( + env, + build_config_ext_path=build_config_ext_path, + ) return run_cargo([ "test", "--manifest-path", str(REPO_ROOT / "fluxon_rs" / "fluxon_util" / "Cargo.toml"), - ]) + ], env=env) if __name__ == "__main__": diff --git a/fluxon_test_stack/top_attention_test_index/_common.py b/fluxon_test_stack/top_attention_test_index/_common.py index c890584..e3f83a4 100755 --- a/fluxon_test_stack/top_attention_test_index/_common.py +++ b/fluxon_test_stack/top_attention_test_index/_common.py @@ -14,6 +14,7 @@ REPO_ROOT = Path(__file__).resolve().parents[2] TEST_REQUIREMENTS: list[str] = ["ops"] +BUILD_CONFIG_EXT_PATH_ENV = "FLUXON_BUILD_CONFIG_EXT_PATH" def call(cmd: Sequence[str], *, env: dict[str, str] | None = None) -> int: @@ -98,8 +99,49 @@ def load_case_config_payload(path: str | Path, *, expected_scene_id: str) -> dic return raw -def _path_contains_fluxon_pyo3_libs_dir(path: Path) -> bool: - return "fluxon_pyo3.libs" in path.parts +def _require_scene_runtime_endpoint(scene_runtime: object, *, service_id: str) -> tuple[str, int]: + if not isinstance(scene_runtime, dict): + raise ValueError("case config scene_runtime must be a mapping") + raw_service = scene_runtime.get(service_id) + if not isinstance(raw_service, dict): + raise ValueError(f"case config scene_runtime.{service_id} must be a mapping") + ip = str(raw_service.get("ip") or "").strip() + if not ip: + raise ValueError(f"case config scene_runtime.{service_id}.ip must be set") + port = raw_service.get("port") + if not isinstance(port, int): + raise ValueError(f"case config scene_runtime.{service_id}.port must be an int") + return ip, port + + +def write_build_config_ext(case_cfg_path: str | Path, *, scene_runtime: object) -> Path: + cfg_path = Path(case_cfg_path).resolve() + etcd_ip, etcd_port = _require_scene_runtime_endpoint(scene_runtime, service_id="etcd") + greptime_ip, greptime_port = _require_scene_runtime_endpoint(scene_runtime, service_id="greptime") + out_path = cfg_path.parents[1] / "src" / "build_config_ext.yml" + out_path.parent.mkdir(parents=True, exist_ok=True) + out_path.write_text( + yaml.safe_dump( + { + "etcd": f"{etcd_ip}:{etcd_port}", + "prom": f"http://{greptime_ip}:{greptime_port}/v1/prometheus", + "prom_remote_write_url": f"http://{greptime_ip}:{greptime_port}/v1/prometheus/write", + }, + sort_keys=False, + ), + encoding="utf-8", + ) + return out_path + + +def inject_build_config_ext_env( + env: dict[str, str] | None, + *, + build_config_ext_path: str | Path, +) -> dict[str, str]: + prepared_env = os.environ.copy() if env is None else dict(env) + prepared_env[BUILD_CONFIG_EXT_PATH_ENV] = str(Path(build_config_ext_path).resolve()) + return prepared_env def _iter_active_python_site_packages_roots() -> list[Path]: @@ -136,30 +178,68 @@ def _resolve_authoritative_fluxon_pyo3_libs_dir() -> Path | None: return None -def _prepare_cargo_env(env: dict[str, str] | None) -> dict[str, str] | None: - libs_dir = _resolve_authoritative_fluxon_pyo3_libs_dir() - if libs_dir is None: - return None if env is None else dict(env) +def _prepend_env_path_list( + prepared_env: dict[str, str], + *, + key: str, + entries: Sequence[str], +) -> None: + normalized_entries: list[str] = [] + seen_entries: set[str] = set() + for raw_entry in entries: + entry = raw_entry.strip() + if not entry: + continue + if entry in seen_entries: + continue + seen_entries.add(entry) + normalized_entries.append(entry) + current_value = prepared_env.get(key) + if current_value is None: + prepared_env[key] = ":".join(normalized_entries) + return + + for raw_entry in current_value.split(":"): + entry = raw_entry.strip() + if not entry: + continue + if entry in seen_entries: + continue + seen_entries.add(entry) + normalized_entries.append(entry) + prepared_env[key] = ":".join(normalized_entries) + + +def _resolve_repo_closed_sdk_root() -> Path | None: + closed_sdk_root = (REPO_ROOT / "fluxon_release" / "closed_sdk").resolve() + if not closed_sdk_root.is_dir(): + return None + manifest_path = closed_sdk_root / "manifest.json" + lib_dir = closed_sdk_root / "lib" + if not manifest_path.is_file() or not lib_dir.is_dir(): + return None + return closed_sdk_root + + +def _prepare_cargo_env(env: dict[str, str] | None) -> dict[str, str] | None: prepared_env = os.environ.copy() if env is None else dict(env) - authoritative_entry = str(libs_dir) - prepared_env["FLUXON_PYO3_LIBS_DIR"] = authoritative_entry - - sanitized_entries = [authoritative_entry] - seen_entries = {authoritative_entry} - current_ld_library_path = prepared_env.get("LD_LIBRARY_PATH") - if current_ld_library_path is not None: - for raw_entry in current_ld_library_path.split(":"): - entry = raw_entry.strip() - if not entry: - continue - if entry in seen_entries: - continue - if _path_contains_fluxon_pyo3_libs_dir(Path(entry)): - continue - seen_entries.add(entry) - sanitized_entries.append(entry) - prepared_env["LD_LIBRARY_PATH"] = ":".join(sanitized_entries) + + libs_dir = _resolve_authoritative_fluxon_pyo3_libs_dir() + if libs_dir is not None: + prepared_env["FLUXON_PYO3_LIBS_DIR"] = str(libs_dir) + + closed_sdk_root = _resolve_repo_closed_sdk_root() + if closed_sdk_root is not None: + prepared_env["FLUXON_COMMU_CLOSED_SDK_ROOT"] = str(closed_sdk_root) + _prepend_env_path_list( + prepared_env, + key="LD_LIBRARY_PATH", + entries=[str((closed_sdk_root / "lib").resolve())], + ) + + if env is None and libs_dir is None and closed_sdk_root is None: + return None return prepared_env