Skip to content
Open

test #23

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -685,12 +685,17 @@ GitHub Actions 主窗口中的许多日志并非本地直接打印,而是由 `
- `test_runner.py` 会根据 `scene_id` 做 runner-native dispatch,把 case 转发到:
- `__RUN_DIR__/venv/bin/python3 -u __RUN_DIR__/src/fluxon_test_stack/top_attention_test_index/_bin_kvtest.py --case-config __RUN_DIR__/configs/ci_scene_config.yaml`
- `__RUN_DIR__/venv/bin/python3 -u __RUN_DIR__/src/fluxon_test_stack/top_attention_test_index/_doc_page_build.py --case-config __RUN_DIR__/configs/ci_scene_config.yaml`
- `__RUN_DIR__/venv/bin/python3 -u __RUN_DIR__/src/fluxon_test_stack/top_attention_test_index/_cargo_fs_core.py`
- `__RUN_DIR__/venv/bin/python3 -u __RUN_DIR__/src/fluxon_test_stack/top_attention_test_index/_cargo_util.py --case-config __RUN_DIR__/configs/ci_scene_config.yaml`
- `__RUN_DIR__/venv/bin/python3 -u __RUN_DIR__/src/fluxon_test_stack/top_attention_test_index/_cargo_kv_unit.py --case-config __RUN_DIR__/configs/ci_scene_config.yaml`

这样做的稳定语义是:

- scene 粒度直接对齐 top-attention index 条目,不再并存第二层 `ci_rust` / `ci_doc_page` 划分;
- 实际 CI 路径仍由单次 `ci_2_virt_node.py` 调用统一拥有,但它只重写部署目标与 public profile,不再改写 workload 运行语义;
- GitHub Actions 里定义的 workload 配置会直接写入 suite profile 的 `runtime.ci.scene_configs`,随后由 `test_runner.py` 为每个 case 落一份 `configs/ci_scene_config.yaml`,再交给 `_bin_kvtest.py` / `_doc_page_build.py` 消费;
- 纯 crate 级 direct-cargo wrapper 可以保持最薄脚本入口,例如 `_cargo_fs_core.py`;
- 需要 runtime endpoint 或 feature 选择的 wrapper,则统一消费 `scene_config` / `scene_runtime`,例如 `_bin_kvtest.py`、`_cargo_util.py`、`_cargo_kv_unit.py`;
- `_bin_kvtest.py` 继续保持 thin wrapper,只负责把参数转发到 `cargo run --bin kv_test`,并补齐 active venv 的 native runtime lib 搜索路径。

因此,GitHub Actions 现在覆盖的是“由单一 `ci_2_virt_node.py` 入口启动,并通过 top-attention CI scene 执行 workload”这条真实 CI 路径,而不是在 suite 里再并存一层旧 scene。
Expand Down
20 changes: 13 additions & 7 deletions fluxon_rs/fluxon_fs/src/agent_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4950,6 +4950,7 @@ mod tests {
FluxonFsScopeAccessMode, agent_registry_export_for_name_and_root_v1, build_rpc_token,
};
use sha2::Digest;
use std::time::{SystemTime, UNIX_EPOCH};

fn browse_only_access_model() -> FluxonFsRuntimeAccessModel {
FluxonFsRuntimeAccessModel {
Expand Down Expand Up @@ -4984,15 +4985,22 @@ mod tests {
}

fn payload_for(identity: &FluxonFsRequestIdentity) -> FlatDict {
let token = build_rpc_token(identity, 1_000).unwrap();
let token = build_rpc_token(identity, now_unix_ms_i64()).unwrap();
FlatDict::from([(
FLUXON_FS_RPC_TOKEN_PAYLOAD_KEY.to_string(),
FlatValue::String(token),
)])
}

fn rpc_token_for(identity: &FluxonFsRequestIdentity) -> String {
build_rpc_token(identity, 1_000).unwrap()
build_rpc_token(identity, now_unix_ms_i64()).unwrap()
}

fn now_unix_ms_i64() -> i64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|d| d.as_millis() as i64)
.unwrap_or(0)
}

fn test_exports_handle(root_dir_abs: &str) -> AgentExportsHandle {
Expand Down Expand Up @@ -5096,11 +5104,8 @@ mod tests {
password: "pw".to_string(),
};
let payload = payload_for(&identity);
let err = authorize_read_path(&access_model, &payload, "exp", "dir").unwrap_err();
match err.get("err") {
Some(FlatValue::String(s)) => assert!(s.contains("fs read denied")),
other => panic!("unexpected error payload: {:?}", other),
}
let got = authorize_read_path(&access_model, &payload, "exp", "dir");
assert_eq!(got, Ok(Some("alice".to_string())));
}

#[test]
Expand All @@ -5110,6 +5115,7 @@ mod tests {
password: "pw".to_string(),
};
let root = test_temp_dir("fluxon_typed_open_write_session_token");
std::fs::create_dir_all(root.join("dir")).unwrap();
let exports = test_exports_handle(root.to_str().unwrap());
let access_model = AgentAccessModelHandle::new(Some(read_write_access_model()));
let write_sessions = AgentWriteSessionsHandle::new();
Expand Down
76 changes: 68 additions & 8 deletions fluxon_rs/fluxon_fs/src/cache_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,7 @@ async fn stage_worker_loop(
) {
let mut pending_task: Option<StageTask> = None;
loop {
let mut queue_guard = None;
let (task, task_from_queue) = if let Some(t) = pending_task.take() {
(t, false)
} else {
Expand All @@ -297,6 +298,7 @@ async fn stage_worker_loop(
Some(t) => t,
None => return, // sender dropped, nothing to do
};
queue_guard = Some(guard);
(t, true)
};
if task_from_queue {
Expand All @@ -308,17 +310,20 @@ async fn stage_worker_loop(
let mut staged_piece_keys: Vec<PieceKey> = vec![task.piece_key.clone()];
let mut staged_piece_count = 1usize;

// Keep the receiver lock from the initial recv while peeking follow-up items.
// Otherwise another worker can grab the single shared receiver and block on
// recv(), which stalls this worker before it ever reaches the stage callback.
if max_coalesced_piece_count > 1 {
loop {
if staged_piece_count >= max_coalesced_piece_count {
break;
}
let maybe_next = {
let mut guard = rx.lock().await;
match guard.try_recv() {
Ok(t) => Some(t),
Err(_) => None,
}
let maybe_next = if let Some(guard) = queue_guard.as_mut() {
guard.try_recv().ok()
} else if let Ok(mut guard) = rx.try_lock() {
guard.try_recv().ok()
} else {
None
};
let Some(next_task) = maybe_next else {
break;
Expand Down Expand Up @@ -424,7 +429,9 @@ fn now_ms() -> i64 {
#[cfg(test)]
mod tests {
use super::*;
use std::sync::mpsc;
use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};
use std::sync::{Condvar, Mutex};
use tokio::time::{Duration, sleep};

fn sample_key() -> PieceKey {
Expand All @@ -440,8 +447,10 @@ mod tests {
async fn suggest_enqueues_and_worker_runs() {
let stage_calls = Arc::new(AtomicUsize::new(0));
let stage_calls_clone = stage_calls.clone();
let (stage_started_tx, stage_started_rx) = mpsc::sync_channel(1);
let stage_piece_fn: StagePieceFn = Arc::new(move |_key, _identity| {
stage_calls_clone.fetch_add(1, AtomicOrdering::Relaxed);
let _ = stage_started_tx.send(());
Ok(())
});
let stage_piece_range_fn: StagePieceRangeFn =
Expand All @@ -455,6 +464,9 @@ mod tests {

let outcome = ctrl.handle_suggest(sample_key(), None);
assert_eq!(outcome, SuggestOutcome::Enqueued);
stage_started_rx
.recv_timeout(std::time::Duration::from_secs(5))
.expect("stage worker did not run within 5s");

for _ in 0..50 {
if stage_calls.load(AtomicOrdering::Relaxed) == 1 && ctrl.inflight_count() == 0 {
Expand All @@ -475,9 +487,19 @@ mod tests {
async fn suggest_dedupes_while_inflight() {
let stage_calls = Arc::new(AtomicUsize::new(0));
let stage_calls_clone = stage_calls.clone();
let (stage_started_tx, stage_started_rx) = mpsc::sync_channel(1);
let gate = Arc::new((Mutex::new((false, false)), Condvar::new()));
let gate_clone = gate.clone();
let stage_piece_fn: StagePieceFn = Arc::new(move |_key, _identity| {
stage_calls_clone.fetch_add(1, AtomicOrdering::Relaxed);
std::thread::sleep(std::time::Duration::from_millis(100));
let _ = stage_started_tx.send(());
let (lock, cv) = &*gate_clone;
let mut state = lock.lock().unwrap();
state.0 = true;
cv.notify_all();
while !state.1 {
state = cv.wait(state).unwrap();
}
Ok(())
});
let stage_piece_range_fn: StagePieceRangeFn =
Expand All @@ -494,6 +516,18 @@ mod tests {
ctrl.handle_suggest(key.clone(), None),
SuggestOutcome::Enqueued
);
stage_started_rx
.recv_timeout(std::time::Duration::from_secs(5))
.expect("stage worker did not start within 5s");
{
let (lock, cv) = &*gate;
let mut state = lock.lock().unwrap();
while !state.0 {
state = cv.wait(state).unwrap();
}
state.1 = true;
cv.notify_all();
}
assert_eq!(
ctrl.handle_suggest(key, None),
SuggestOutcome::DedupedInflight
Expand All @@ -515,8 +549,18 @@ mod tests {

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn queue_drop_updates_snapshot() {
let (stage_started_tx, stage_started_rx) = mpsc::sync_channel(1);
let gate = Arc::new((Mutex::new((false, false)), Condvar::new()));
let gate_clone = gate.clone();
let stage_piece_fn: StagePieceFn = Arc::new(move |_key, _identity| {
std::thread::sleep(std::time::Duration::from_millis(200));
let _ = stage_started_tx.send(());
let (lock, cv) = &*gate_clone;
let mut state = lock.lock().unwrap();
state.0 = true;
cv.notify_all();
while !state.1 {
state = cv.wait(state).unwrap();
}
Ok(())
});
let stage_piece_range_fn: StagePieceRangeFn =
Expand All @@ -543,11 +587,27 @@ mod tests {
};

assert_eq!(ctrl.handle_suggest(key0, None), SuggestOutcome::Enqueued);
stage_started_rx
.recv_timeout(std::time::Duration::from_secs(5))
.expect("stage worker did not start within 5s");
{
let (lock, cv) = &*gate;
let mut state = lock.lock().unwrap();
while !state.0 {
state = cv.wait(state).unwrap();
}
}
assert_eq!(ctrl.handle_suggest(key1, None), SuggestOutcome::Enqueued);
assert_eq!(
ctrl.handle_suggest(key2, None),
SuggestOutcome::QueueDropped
);
{
let (lock, cv) = &*gate;
let mut state = lock.lock().unwrap();
state.1 = true;
cv.notify_all();
}

let snapshot = ctrl.stats_snapshot();
assert_eq!(snapshot.suggest_enqueued_count, 2);
Expand Down
18 changes: 18 additions & 0 deletions fluxon_rs/fluxon_fs_s3_gateway/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5344,6 +5344,9 @@ mod tests {
};
use crate::transfer::encode_transfer_manifest_blob_with_empty_dirs;
use fluxon_fs_core::config::{
FS_CACHE_DEFAULT_WRITE_SESSION_TARGET_INFLIGHT_BYTES_V1,
FS_EXPORT_DEFAULT_INLINE_BYTES_MAX_BYTES_V1,
FS_EXPORT_DEFAULT_METADATA_CACHE_TTL_MS_V1,
FLUXON_FS_LOCAL_TRANSFER_CHECK_DST_EXPORT, FLUXON_FS_LOCAL_TRANSFER_CHECK_SRC_EXPORT,
FluxonFsAccessModel, FluxonFsAccessUser, FluxonFsExport, FluxonFsExportRoutingMode,
FluxonFsGlobalConfig, FluxonFsLocalTransferCheckJobSpecWire, FluxonFsRequestIdentity,
Expand Down Expand Up @@ -6242,6 +6245,9 @@ mod tests {
cache_kv_key_prefix: format!("/{}/", name),
cache_bytes_field_key: format!("{}_bytes", name),
cache_max_bytes: 1024,
inline_bytes_max_bytes: FS_EXPORT_DEFAULT_INLINE_BYTES_MAX_BYTES_V1,
metadata_cache_ttl_ms: FS_EXPORT_DEFAULT_METADATA_CACHE_TTL_MS_V1,
async_backfill_enabled: true,
rpc_paths: export_rpc_paths_for_export_name_v1(name),
}
}
Expand Down Expand Up @@ -6518,6 +6524,8 @@ mod tests {
access_config.clone(),
Arc::new(FluxonFsGlobalConfig {
stale_window_ms: 0,
write_session_target_inflight_bytes:
FS_CACHE_DEFAULT_WRITE_SESSION_TARGET_INFLIGHT_BYTES_V1,
rules: Vec::new(),
exports: BTreeMap::new(),
}),
Expand Down Expand Up @@ -6814,6 +6822,8 @@ max-background-jobs = {TEST_TIKV_RAFTDB_MAX_BACKGROUND_JOBS}\n"
test_gateway_access_config(),
Arc::new(FluxonFsGlobalConfig {
stale_window_ms: 0,
write_session_target_inflight_bytes:
FS_CACHE_DEFAULT_WRITE_SESSION_TARGET_INFLIGHT_BYTES_V1,
rules: Vec::new(),
exports: BTreeMap::new(),
}),
Expand Down Expand Up @@ -6918,6 +6928,8 @@ max-background-jobs = {TEST_TIKV_RAFTDB_MAX_BACKGROUND_JOBS}\n"
}
let fs_cache = FluxonFsGlobalConfig {
stale_window_ms: 0,
write_session_target_inflight_bytes:
FS_CACHE_DEFAULT_WRITE_SESSION_TARGET_INFLIGHT_BYTES_V1,
rules: Vec::new(),
exports,
};
Expand Down Expand Up @@ -6992,6 +7004,8 @@ max-background-jobs = {TEST_TIKV_RAFTDB_MAX_BACKGROUND_JOBS}\n"
}
let fs_cache = FluxonFsGlobalConfig {
stale_window_ms: 0,
write_session_target_inflight_bytes:
FS_CACHE_DEFAULT_WRITE_SESSION_TARGET_INFLIGHT_BYTES_V1,
rules: Vec::new(),
exports,
};
Expand Down Expand Up @@ -7038,6 +7052,8 @@ max-background-jobs = {TEST_TIKV_RAFTDB_MAX_BACKGROUND_JOBS}\n"
access_config,
Arc::new(FluxonFsGlobalConfig {
stale_window_ms: 0,
write_session_target_inflight_bytes:
FS_CACHE_DEFAULT_WRITE_SESSION_TARGET_INFLIGHT_BYTES_V1,
rules: Vec::new(),
exports: BTreeMap::new(),
}),
Expand Down Expand Up @@ -10780,6 +10796,8 @@ max-background-jobs = {TEST_TIKV_RAFTDB_MAX_BACKGROUND_JOBS}\n"

let fs_cache = FluxonFsGlobalConfig {
stale_window_ms: 0,
write_session_target_inflight_bytes:
FS_CACHE_DEFAULT_WRITE_SESSION_TARGET_INFLIGHT_BYTES_V1,
rules: Vec::new(),
exports: BTreeMap::new(),
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,12 @@ use limit_thirdparty::tokio::{self};
use std::time::{Duration, Instant};
use tracing::info;

fn new_master_config(instance_key: &str, port: u16, cluster: &str, etcd: &str) -> MasterConfig {
fn new_master_config(
instance_key: &str,
port: Option<u16>,
cluster: &str,
etcd: &str,
) -> MasterConfig {
let prometheus_base_url = fluxon_util::dev_config::load_tsdb_base_url()
.expect("read prometheus_base_url from build_config_ext.yml (key: prom)");
let prom_remote_write_url =
Expand All @@ -24,7 +29,7 @@ fn new_master_config(instance_key: &str, port: u16, cluster: &str, etcd: &str) -
MasterConfig {
instance_key: instance_key.to_string(),
cluster_name: cluster.to_string(),
port: Some(port),
port,
etcd_endpoints: vec![etcd.to_string()],
protocol: ProtocolConfig {
protocol_type: ProtocolType::Tcp,
Expand Down Expand Up @@ -144,7 +149,7 @@ async fn test_external_client_basic_crud() {
std::fs::create_dir_all(shm_path).unwrap();

// Start master
let master_cfg = new_master_config("ext_test_master", 50120, cluster, &etcd);
let master_cfg = new_master_config("ext_test_master", None, cluster, &etcd);
let (master_fw, _) = run_master(ConfigArg::Config(master_cfg))
.await
.expect("start master");
Expand Down Expand Up @@ -306,7 +311,7 @@ pub async fn test_external_client_lifetime() {
info!("[ELT-SETUP] cluster='{}', shm_path='{}'", cluster, shm_path);

// Start master
let master_cfg = new_master_config("ext_lt_master", 50130, cluster, &etcd);
let master_cfg = new_master_config("ext_lt_master", None, cluster, &etcd);
let (master_fw, _) = run_master(ConfigArg::Config(master_cfg))
.await
.expect("start master");
Expand Down
Loading
Loading