diff --git a/crates/edda-ask/src/lib.rs b/crates/edda-ask/src/lib.rs index f923af4..85bbb17 100644 --- a/crates/edda-ask/src/lib.rs +++ b/crates/edda-ask/src/lib.rs @@ -179,7 +179,13 @@ pub fn ask( InputType::Domain(domain) => { let active = branch_filter( ledger - .active_decisions(Some(domain), None, after_ref, before_ref)? + .active_decisions_limited( + Some(domain), + None, + after_ref, + before_ref, + opts.limit, + )? .into_iter() .map(|r| to_decision_hit(&r)) .collect(), @@ -196,7 +202,7 @@ pub fn ask( InputType::Keyword(kw) => { let mut hits = branch_filter( ledger - .active_decisions(None, Some(kw), after_ref, before_ref)? + .active_decisions_limited(None, Some(kw), after_ref, before_ref, opts.limit)? .into_iter() .map(|r| to_decision_hit(&r)) .collect(), @@ -242,7 +248,7 @@ pub fn ask( InputType::Overview => { let active = branch_filter( ledger - .active_decisions(None, None, after_ref, before_ref)? + .active_decisions_limited(None, None, after_ref, before_ref, opts.limit)? .into_iter() .map(|r| to_decision_hit(&r)) .collect(), diff --git a/crates/edda-bridge-claude/src/bg_detect.rs b/crates/edda-bridge-claude/src/bg_detect.rs index 78c5ab7..39cce93 100644 --- a/crates/edda-bridge-claude/src/bg_detect.rs +++ b/crates/edda-bridge-claude/src/bg_detect.rs @@ -766,7 +766,10 @@ fn save_detect_result(project_id: &str, result: &DetectResult) -> Result<()> { fn append_audit_log(project_id: &str, entry: &AuditEntry) -> Result<()> { use std::io::Write; let path = audit_log_path(project_id); - fs::create_dir_all(path.parent().context("detect audit log path has no parent")?)?; + fs::create_dir_all( + path.parent() + .context("detect audit log path has no parent")?, + )?; let line = serde_json::to_string(entry)?; let mut file = fs::OpenOptions::new() .create(true) diff --git a/crates/edda-bridge-claude/src/bg_scan.rs b/crates/edda-bridge-claude/src/bg_scan.rs index 9862c26..07e153f 100644 --- a/crates/edda-bridge-claude/src/bg_scan.rs +++ b/crates/edda-bridge-claude/src/bg_scan.rs @@ -743,7 +743,7 @@ fn save_scan_state(project_id: &str, result: &ScanResult) -> Result<()> { status: "completed".to_string(), }; let path = scan_state_path(project_id); - fs::create_dir_all(path.parent().unwrap())?; + fs::create_dir_all(path.parent().context("scan state path has no parent")?)?; let json = serde_json::to_string_pretty(&state)?; fs::write(&path, json)?; Ok(()) @@ -751,7 +751,7 @@ fn save_scan_state(project_id: &str, result: &ScanResult) -> Result<()> { fn save_scan_drafts(project_id: &str, result: &ScanResult) -> Result<()> { let path = scan_draft_path(project_id, &result.scan_id); - fs::create_dir_all(path.parent().unwrap())?; + fs::create_dir_all(path.parent().context("scan draft path has no parent")?)?; let json = serde_json::to_string_pretty(result)?; fs::write(&path, json)?; Ok(()) @@ -760,7 +760,7 @@ fn save_scan_drafts(project_id: &str, result: &ScanResult) -> Result<()> { fn append_audit_log(project_id: &str, entry: &AuditEntry) -> Result<()> { use std::io::Write; let path = audit_log_path(project_id); - fs::create_dir_all(path.parent().unwrap())?; + fs::create_dir_all(path.parent().context("audit log path has no parent")?)?; let line = serde_json::to_string(entry)?; let mut file = fs::OpenOptions::new() .create(true) diff --git a/crates/edda-bridge-claude/src/issue_proposal.rs b/crates/edda-bridge-claude/src/issue_proposal.rs index e52fa79..38df2bb 100644 --- a/crates/edda-bridge-claude/src/issue_proposal.rs +++ b/crates/edda-bridge-claude/src/issue_proposal.rs @@ -74,7 +74,7 @@ pub fn new_proposal_id() -> String { /// Save an issue proposal to disk. pub fn save_proposal(project_id: &str, proposal: &IssueProposal) -> Result<()> { let path = proposal_path(project_id, &proposal.proposal_id); - fs::create_dir_all(path.parent().unwrap())?; + fs::create_dir_all(path.parent().context("proposal path has no parent")?)?; let json = serde_json::to_string_pretty(proposal)?; fs::write(&path, json)?; @@ -288,7 +288,7 @@ fn append_audit( ) -> Result<()> { use std::io::Write; let path = audit_log_path(project_id); - fs::create_dir_all(path.parent().unwrap())?; + fs::create_dir_all(path.parent().context("audit log path has no parent")?)?; let entry = AuditEntry { ts: now_rfc3339(), proposal_id: proposal_id.to_string(), diff --git a/crates/edda-ledger/Cargo.toml b/crates/edda-ledger/Cargo.toml index 391dc98..9bbb0fa 100644 --- a/crates/edda-ledger/Cargo.toml +++ b/crates/edda-ledger/Cargo.toml @@ -22,3 +22,4 @@ rusqlite = { version = "0.32", features = ["bundled"] } rand.workspace = true sha2.workspace = true hex.workspace = true +tracing.workspace = true diff --git a/crates/edda-ledger/src/ledger.rs b/crates/edda-ledger/src/ledger.rs index fc2ae3d..6620c4b 100644 --- a/crates/edda-ledger/src/ledger.rs +++ b/crates/edda-ledger/src/ledger.rs @@ -185,7 +185,20 @@ impl Ledger { before: Option<&str>, ) -> anyhow::Result> { self.sqlite - .active_decisions(domain, key_pattern, after, before) + .active_decisions(domain, key_pattern, after, before, None) + } + + /// Query active decisions with limit for hot path optimization. + pub fn active_decisions_limited( + &self, + domain: Option<&str>, + key_pattern: Option<&str>, + after: Option<&str>, + before: Option<&str>, + limit: usize, + ) -> anyhow::Result> { + self.sqlite + .active_decisions(domain, key_pattern, after, before, Some(limit)) } /// All decisions for a key (active + superseded), ordered by time. @@ -409,10 +422,12 @@ impl Ledger { &self, village_id: Option<&str>, engine_version: Option<&str>, + decision_type: Option<&str>, limit: usize, + offset: usize, ) -> anyhow::Result> { self.sqlite - .query_snapshots(village_id, engine_version, limit) + .query_snapshots(village_id, engine_version, decision_type, limit, offset) } /// Find all snapshots for a given context_hash. diff --git a/crates/edda-ledger/src/sqlite_store.rs b/crates/edda-ledger/src/sqlite_store.rs index 5df2272..3485fbe 100644 --- a/crates/edda-ledger/src/sqlite_store.rs +++ b/crates/edda-ledger/src/sqlite_store.rs @@ -6,6 +6,8 @@ use edda_core::types::{Digest, Event, Provenance, Refs}; use rusqlite::{params, Connection, OptionalExtension}; use std::path::Path; +use std::time::Instant; +use tracing::debug; const SCHEMA_SQL: &str = " PRAGMA journal_mode = WAL; @@ -156,6 +158,20 @@ CREATE INDEX IF NOT EXISTS idx_decisions_scope ON decisions(scope) WHERE scope ! CREATE INDEX IF NOT EXISTS idx_decisions_source ON decisions(source_project_id) WHERE source_project_id IS NOT NULL; "; +const SCHEMA_V9_SQL: &str = " +CREATE INDEX IF NOT EXISTS idx_decisions_active_domain_branch + ON decisions(is_active, domain, branch) WHERE is_active = TRUE; +CREATE INDEX IF NOT EXISTS idx_decisions_active_domain + ON decisions(is_active, domain) WHERE is_active = TRUE; +"; + +const SCHEMA_V10_SQL: &str = " +ALTER TABLE decide_snapshots ADD COLUMN decision_type TEXT NOT NULL DEFAULT 'general'; +CREATE INDEX IF NOT EXISTS idx_snapshots_village_created_type + ON decide_snapshots(village_id, created_at DESC, decision_type) + WHERE village_id IS NOT NULL; +"; + /// A row from the `decisions` table. #[derive(Debug, Clone)] pub struct DecisionRow { @@ -260,6 +276,7 @@ pub struct DecideSnapshotRow { pub event_id: String, pub context_hash: String, pub engine_version: String, + pub decision_type: String, pub schema_version: String, pub redaction_level: String, pub village_id: Option, @@ -389,6 +406,18 @@ impl SqliteStore { self.migrate_v7_to_v8()?; } + // Migrate to v9 if needed (hot path query optimization indexes) + let current = self.schema_version()?; + if current < 9 { + self.migrate_v8_to_v9()?; + } + + // Migrate to v10 if needed (snapshot hot path indexes + decision_type) + let current = self.schema_version()?; + if current < 10 { + self.migrate_v9_to_v10()?; + } + Ok(()) } @@ -460,8 +489,8 @@ impl SqliteStore { self.conn.execute( "INSERT OR IGNORE INTO decisions - (event_id, key, value, reason, domain, branch, supersedes_id, is_active, scope) - VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, TRUE, 'local')", + (event_id, key, value, reason, domain, branch, supersedes_id, is_active) + VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, TRUE)", params![event_id, key, value, reason, domain, branch, supersedes_id], )?; } @@ -659,6 +688,36 @@ impl SqliteStore { Ok(()) } + fn migrate_v8_to_v9(&self) -> anyhow::Result<()> { + self.conn.execute_batch(SCHEMA_V9_SQL)?; + self.set_schema_version(9)?; + Ok(()) + } + + fn migrate_v9_to_v10(&self) -> anyhow::Result<()> { + let has_decision_type = self + .conn + .prepare("PRAGMA table_info(decide_snapshots)")? + .query_map([], |row| row.get::<_, String>(1))? + .collect::, _>>()? + .iter() + .any(|name| name == "decision_type"); + + if has_decision_type { + self.conn.execute( + "CREATE INDEX IF NOT EXISTS idx_snapshots_village_created_type + ON decide_snapshots(village_id, created_at DESC, decision_type) + WHERE village_id IS NOT NULL", + [], + )?; + } else { + self.conn.execute_batch(SCHEMA_V10_SQL)?; + } + + self.set_schema_version(10)?; + Ok(()) + } + /// Backfill task brief updates from existing commit/note/merge events. fn backfill_task_brief_updates(&self) -> anyhow::Result<()> { let mut brief_stmt = self @@ -1396,13 +1455,16 @@ impl SqliteStore { /// Query active decisions, optionally filtered by domain or key prefix. /// `after`/`before` are optional ISO 8601 bounds for temporal filtering. + /// `limit` caps the result set to prevent full table scans on hot path. pub fn active_decisions( &self, domain: Option<&str>, key_pattern: Option<&str>, after: Option<&str>, before: Option<&str>, + limit: Option, ) -> anyhow::Result> { + let start = Instant::now(); let has_temporal = after.is_some() || before.is_some(); let mut sql = String::from( @@ -1444,13 +1506,30 @@ impl SqliteStore { sql.push_str(" ORDER BY d.domain, d.key"); } + if let Some(lim) = limit { + sql.push_str(&format!(" LIMIT {lim}")); + } + let mut stmt = self.conn.prepare(&sql)?; let params_ref: Vec<&dyn rusqlite::types::ToSql> = param_values.iter().map(|p| p.as_ref()).collect(); let rows = stmt.query_map(params_ref.as_slice(), map_decision_row)?; - rows.collect::, _>>() - .map_err(|e| anyhow::anyhow!("decision query failed: {e}")) + let result = rows + .collect::, _>>() + .map_err(|e| anyhow::anyhow!("decision query failed: {e}"))?; + + let elapsed = start.elapsed(); + debug!( + domain = domain, + key_pattern = key_pattern, + limit = limit, + result_count = result.len(), + elapsed_ms = elapsed.as_millis() as u64, + "active_decisions query completed" + ); + + Ok(result) } /// All decisions for a key (active + superseded), ordered by time. @@ -2303,13 +2382,14 @@ impl SqliteStore { pub fn insert_snapshot(&self, row: &DecideSnapshotRow) -> anyhow::Result<()> { self.conn.execute( "INSERT INTO decide_snapshots - (event_id, context_hash, engine_version, schema_version, + (event_id, context_hash, engine_version, decision_type, schema_version, redaction_level, village_id, cycle_id, has_blobs, created_at) - VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)", + VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)", params![ row.event_id, row.context_hash, row.engine_version, + row.decision_type, row.schema_version, row.redaction_level, row.village_id, @@ -2326,9 +2406,12 @@ impl SqliteStore { &self, village_id: Option<&str>, engine_version: Option<&str>, + decision_type: Option<&str>, limit: usize, + offset: usize, ) -> anyhow::Result> { - let base = "SELECT event_id, context_hash, engine_version, schema_version, + let start = Instant::now(); + let base = "SELECT event_id, context_hash, engine_version, decision_type, schema_version, redaction_level, village_id, cycle_id, has_blobs, created_at FROM decide_snapshots"; @@ -2343,16 +2426,21 @@ impl SqliteStore { conditions.push("engine_version = ?"); param_values.push(Box::new(e.to_string())); } + if let Some(dt) = decision_type { + conditions.push("decision_type = ?"); + param_values.push(Box::new(dt.to_string())); + } let sql = if conditions.is_empty() { - format!("{base} ORDER BY created_at DESC LIMIT ?") + format!("{base} ORDER BY created_at DESC LIMIT ? OFFSET ?") } else { format!( - "{base} WHERE {} ORDER BY created_at DESC LIMIT ?", + "{base} WHERE {} ORDER BY created_at DESC LIMIT ? OFFSET ?", conditions.join(" AND ") ) }; param_values.push(Box::new(limit as i64)); + param_values.push(Box::new(offset as i64)); let param_refs: Vec<&dyn rusqlite::types::ToSql> = param_values.iter().map(|p| p.as_ref()).collect(); @@ -2360,8 +2448,23 @@ impl SqliteStore { let mut stmt = self.conn.prepare(&sql)?; let rows = stmt.query_map(param_refs.as_slice(), map_snapshot_row)?; - rows.collect::, _>>() - .map_err(|e| anyhow::anyhow!("snapshot query failed: {e}")) + let result = rows + .collect::, _>>() + .map_err(|e| anyhow::anyhow!("snapshot query failed: {e}"))?; + + let elapsed = start.elapsed(); + debug!( + village_id = village_id, + engine_version = engine_version, + decision_type = decision_type, + limit = limit, + offset = offset, + result_count = result.len(), + elapsed_ms = elapsed.as_millis() as u64, + "snapshot query completed" + ); + + Ok(result) } /// Find all snapshots with a given context_hash (for version comparison). @@ -2370,7 +2473,7 @@ impl SqliteStore { context_hash: &str, ) -> anyhow::Result> { let mut stmt = self.conn.prepare( - "SELECT event_id, context_hash, engine_version, schema_version, + "SELECT event_id, context_hash, engine_version, decision_type, schema_version, redaction_level, village_id, cycle_id, has_blobs, created_at FROM decide_snapshots WHERE context_hash = ?1 @@ -2413,12 +2516,13 @@ fn map_snapshot_row(row: &rusqlite::Row<'_>) -> rusqlite::Result= 2); // Verify decisions table was populated by backfill - let active = store.active_decisions(None, None, None, None).unwrap(); + let active = store + .active_decisions(None, None, None, None, None) + .unwrap(); assert_eq!(active.len(), 1); assert_eq!(active[0].key, "db.engine"); assert_eq!(active[0].value, "postgres"); @@ -3369,7 +3483,9 @@ mod tests { let e = new_note_event("main", None, "system", "just a note", &["todo".into()]).unwrap(); store.append_event(&e).unwrap(); - let active = store.active_decisions(None, None, None, None).unwrap(); + let active = store + .active_decisions(None, None, None, None, None) + .unwrap(); assert!(active.is_empty()); drop(store); @@ -3652,7 +3768,129 @@ mod tests { assert!(tables.contains(&"task_briefs".to_string())); assert!(tables.contains(&"device_tokens".to_string())); assert!(tables.contains(&"decide_snapshots".to_string())); - assert_eq!(store.schema_version().unwrap(), 8); + assert_eq!(store.schema_version().unwrap(), 10); + drop(store); + let _ = std::fs::remove_dir_all(&dir); + } + + #[test] + fn query_snapshots_supports_decision_type_and_offset() { + let (dir, store) = tmp_db(); + + let e1 = make_decision_event("main", "snap.key1", "v1", None, None); + store.append_event(&e1).unwrap(); + store + .insert_snapshot(&DecideSnapshotRow { + event_id: e1.event_id.clone(), + context_hash: "ctx1".to_string(), + engine_version: "engine-a".to_string(), + decision_type: "chief".to_string(), + schema_version: "snapshot.v1".to_string(), + redaction_level: "full".to_string(), + village_id: Some("blog-village".to_string()), + cycle_id: None, + has_blobs: false, + created_at: "2026-03-17T10:00:00Z".to_string(), + }) + .unwrap(); + + let e2 = make_decision_event("main", "snap.key2", "v2", None, None); + store.append_event(&e2).unwrap(); + store + .insert_snapshot(&DecideSnapshotRow { + event_id: e2.event_id.clone(), + context_hash: "ctx2".to_string(), + engine_version: "engine-a".to_string(), + decision_type: "chief".to_string(), + schema_version: "snapshot.v1".to_string(), + redaction_level: "full".to_string(), + village_id: Some("blog-village".to_string()), + cycle_id: None, + has_blobs: false, + created_at: "2026-03-17T10:01:00Z".to_string(), + }) + .unwrap(); + + let e3 = make_decision_event("main", "snap.key3", "v3", None, None); + store.append_event(&e3).unwrap(); + store + .insert_snapshot(&DecideSnapshotRow { + event_id: e3.event_id.clone(), + context_hash: "ctx3".to_string(), + engine_version: "engine-a".to_string(), + decision_type: "general".to_string(), + schema_version: "snapshot.v1".to_string(), + redaction_level: "full".to_string(), + village_id: Some("blog-village".to_string()), + cycle_id: None, + has_blobs: false, + created_at: "2026-03-17T10:02:00Z".to_string(), + }) + .unwrap(); + + let chief = store + .query_snapshots(Some("blog-village"), None, Some("chief"), 20, 0) + .unwrap(); + assert_eq!(chief.len(), 2); + assert_eq!(chief[0].context_hash, "ctx2"); + assert_eq!(chief[1].context_hash, "ctx1"); + + let chief_offset = store + .query_snapshots(Some("blog-village"), None, Some("chief"), 1, 1) + .unwrap(); + assert_eq!(chief_offset.len(), 1); + assert_eq!(chief_offset[0].context_hash, "ctx1"); + + drop(store); + let _ = std::fs::remove_dir_all(&dir); + } + + #[test] + fn snapshots_hot_path_query_performance() { + use std::time::Instant; + + let (dir, store) = tmp_db(); + + for i in 0..300 { + let e = make_decision_event("main", &format!("snap.key{i}"), "value", None, None); + store.append_event(&e).unwrap(); + let decision_type = if i % 2 == 0 { "chief" } else { "general" }; + store + .insert_snapshot(&DecideSnapshotRow { + event_id: e.event_id.clone(), + context_hash: format!("ctx{i}"), + engine_version: "engine-a".to_string(), + decision_type: decision_type.to_string(), + schema_version: "snapshot.v1".to_string(), + redaction_level: "full".to_string(), + village_id: Some("blog-village".to_string()), + cycle_id: None, + has_blobs: false, + created_at: format!("2026-03-17T10:{:02}:00Z", i % 60), + }) + .unwrap(); + } + + let _ = store + .query_snapshots(Some("blog-village"), None, Some("chief"), 20, 0) + .unwrap(); + + let start = Instant::now(); + for _ in 0..100 { + let rows = store + .query_snapshots(Some("blog-village"), None, Some("chief"), 20, 0) + .unwrap(); + assert_eq!(rows.len(), 20); + } + let elapsed = start.elapsed(); + let avg_ms = elapsed.as_millis() as f64 / 100.0; + + assert!( + avg_ms < 100.0, + "snapshot hot path query avg {}ms exceeds 100ms threshold", + avg_ms + ); + drop(store); let _ = std::fs::remove_dir_all(&dir); } @@ -4686,7 +4924,7 @@ mod tests { .unwrap(); let results = store - .active_decisions(None, None, Some("2026-03-11T00:00:00Z"), None) + .active_decisions(None, None, Some("2026-03-11T00:00:00Z"), None, None) .unwrap(); assert_eq!(results.len(), 1); assert_eq!(results[0].key, "db.pool"); @@ -4716,7 +4954,7 @@ mod tests { .unwrap(); let results = store - .active_decisions(None, None, None, Some("2026-03-11T00:00:00Z")) + .active_decisions(None, None, None, Some("2026-03-11T00:00:00Z"), None) .unwrap(); assert_eq!(results.len(), 1); assert_eq!(results[0].key, "db.engine"); @@ -4759,6 +4997,7 @@ mod tests { None, Some("2026-03-09T00:00:00Z"), Some("2026-03-12T00:00:00Z"), + None, ) .unwrap(); assert_eq!(results.len(), 1); @@ -4797,7 +5036,7 @@ mod tests { .unwrap(); let results = store - .active_decisions(Some("db"), None, Some("2026-03-11T00:00:00Z"), None) + .active_decisions(Some("db"), None, Some("2026-03-11T00:00:00Z"), None, None) .unwrap(); assert_eq!(results.len(), 1); assert_eq!(results[0].key, "db.pool"); @@ -4827,7 +5066,13 @@ mod tests { .unwrap(); let results = store - .active_decisions(None, Some("engine"), Some("2026-03-11T00:00:00Z"), None) + .active_decisions( + None, + Some("engine"), + Some("2026-03-11T00:00:00Z"), + None, + None, + ) .unwrap(); assert_eq!(results.len(), 1); assert_eq!(results[0].key, "cache.engine"); @@ -4854,6 +5099,7 @@ mod tests { None, Some("2026-03-15T00:00:00Z"), Some("2026-03-05T00:00:00Z"), + None, ) .unwrap(); assert!(results.is_empty()); @@ -4882,7 +5128,9 @@ mod tests { )) .unwrap(); - let results = store.active_decisions(None, None, None, None).unwrap(); + let results = store + .active_decisions(None, None, None, None, None) + .unwrap(); assert_eq!(results.len(), 2); // Original sort order: domain, key (auth before db) assert_eq!(results[0].key, "auth.method"); @@ -4921,7 +5169,7 @@ mod tests { .unwrap(); let results = store - .active_decisions(None, None, Some("2026-03-07T00:00:00Z"), None) + .active_decisions(None, None, Some("2026-03-07T00:00:00Z"), None, None) .unwrap(); assert_eq!(results.len(), 3); assert_eq!(results[0].key, "c.third"); @@ -5174,4 +5422,72 @@ mod tests { drop(store); let _ = std::fs::remove_dir_all(&dir); } + + #[test] + fn hot_path_query_performance() { + use std::time::Instant; + + let (dir, store) = tmp_db(); + + // Seed with 500 decisions across 5 domains (typical workload) + let domains = ["db", "auth", "api", "cache", "infra"]; + for (i, dom) in domains.iter().enumerate() { + for j in 0..100 { + let key = format!("{}.key{}", dom, j); + let value = format!("value{}_{}", i, j); + let reason = format!("reason for {} decision", key); + let e = make_decision_event("main", &key, &value, Some(&reason), None); + store.append_event(&e).unwrap(); + } + } + + // Warm up + let _ = store + .active_decisions(None, None, None, None, Some(20)) + .unwrap(); + + // Benchmark: query all active decisions with limit (hot path) + let start = Instant::now(); + for _ in 0..100 { + let results = store + .active_decisions(None, None, None, None, Some(20)) + .unwrap(); + assert_eq!(results.len(), 20); + } + let elapsed_all = start.elapsed(); + let avg_ms_all = elapsed_all.as_millis() as f64 / 100.0; + + // Benchmark: query by domain (hot path) + let start = Instant::now(); + for _ in 0..100 { + let results = store + .active_decisions(Some("db"), None, None, None, Some(20)) + .unwrap(); + assert_eq!(results.len(), 20); + } + let elapsed_domain = start.elapsed(); + let avg_ms_domain = elapsed_domain.as_millis() as f64 / 100.0; + + // Verify hot path queries are under 100ms (requirement from GH-319) + // With index optimization, should be well under 10ms + assert!( + avg_ms_all < 100.0, + "hot path query (all) avg {}ms exceeds 100ms threshold", + avg_ms_all + ); + assert!( + avg_ms_domain < 100.0, + "hot path query (domain) avg {}ms exceeds 100ms threshold", + avg_ms_domain + ); + + // Log results for visibility + eprintln!( + "hot_path_query_performance: avg_all={:.2}ms, avg_domain={:.2}ms", + avg_ms_all, avg_ms_domain + ); + + drop(store); + let _ = std::fs::remove_dir_all(&dir); + } } diff --git a/crates/edda-serve/src/lib.rs b/crates/edda-serve/src/lib.rs index 0df21d3..f4a3d96 100644 --- a/crates/edda-serve/src/lib.rs +++ b/crates/edda-serve/src/lib.rs @@ -16,6 +16,7 @@ use axum::{Json, Router}; use edda_ledger::device_token::{generate_device_token, hash_token}; use serde::{Deserialize, Serialize}; use tower_http::cors::{AllowOrigin, CorsLayer}; +use tracing::debug; use edda_aggregate::aggregate::{ aggregate_decisions, aggregate_overview, per_project_metrics, DateRange, ProjectMetrics, @@ -50,6 +51,7 @@ struct AppState { repo_root: PathBuf, chronicle: Option, pending_pairings: Mutex>, + snapshot_cache: Mutex>, } struct PairingRequest { @@ -57,6 +59,11 @@ struct PairingRequest { expires_at: std::time::Instant, } +struct SnapshotCacheEntry { + expires_at: std::time::Instant, + snapshots: Vec, +} + struct ChronicleContext { _store_root: PathBuf, } @@ -149,6 +156,7 @@ pub async fn serve(repo_root: &Path, config: ServeConfig) -> anyhow::Result<()> repo_root: repo_root.to_path_buf(), chronicle, pending_pairings: Mutex::new(HashMap::new()), + snapshot_cache: Mutex::new(HashMap::new()), }); // Public routes (no auth required) @@ -263,6 +271,7 @@ fn router(repo_root: &Path) -> Router { repo_root: repo_root.to_path_buf(), chronicle, pending_pairings: Mutex::new(HashMap::new()), + snapshot_cache: Mutex::new(HashMap::new()), }); Router::new() .route("/api/health", get(health)) @@ -1812,6 +1821,8 @@ struct SnapshotBody { context: serde_json::Value, result: serde_json::Value, engine_version: String, + #[serde(default = "default_snapshot_decision_type")] + decision_type: String, #[serde(default = "default_snapshot_schema")] schema_version: String, context_hash: String, @@ -1825,6 +1836,10 @@ fn default_snapshot_schema() -> String { "snapshot.v1".to_string() } +fn default_snapshot_decision_type() -> String { + "general".to_string() +} + fn default_redaction_level() -> String { "full".to_string() } @@ -1883,6 +1898,7 @@ async fn post_snapshot( // Build event payload: metadata + inline or blob refs let mut payload = serde_json::json!({ "engine_version": body.engine_version, + "decision_type": body.decision_type, "schema_version": body.schema_version, "context_hash": body.context_hash, "redaction_level": body.redaction_level, @@ -1919,6 +1935,7 @@ async fn post_snapshot( event_id: event_id.clone(), context_hash: body.context_hash.clone(), engine_version: body.engine_version, + decision_type: body.decision_type, schema_version: body.schema_version, redaction_level: body.redaction_level, village_id: body.village_id, @@ -1942,23 +1959,89 @@ async fn post_snapshot( struct SnapshotsQuery { village_id: Option, engine_version: Option, + decision_type: Option, #[serde(default = "default_snapshot_limit")] limit: usize, + #[serde(default)] + offset: usize, } fn default_snapshot_limit() -> usize { 20 } +fn snapshots_cache_key(query: &SnapshotsQuery) -> String { + format!( + "v={:?}|e={:?}|d={:?}|l={}|o={}", + query.village_id, query.engine_version, query.decision_type, query.limit, query.offset + ) +} + +fn snapshots_cache_lookup( + state: &Arc, + key: &str, +) -> Result>, AppError> { + let mut cache = state + .snapshot_cache + .lock() + .map_err(|e| AppError::Internal(anyhow::anyhow!("lock poisoned: {e}")))?; + let now = std::time::Instant::now(); + cache.retain(|_, v| v.expires_at > now); + Ok(cache.get(key).map(|entry| entry.snapshots.clone())) +} + +fn snapshots_cache_store( + state: &Arc, + key: String, + snapshots: Vec, +) -> Result<(), AppError> { + let mut cache = state + .snapshot_cache + .lock() + .map_err(|e| AppError::Internal(anyhow::anyhow!("lock poisoned: {e}")))?; + cache.insert( + key, + SnapshotCacheEntry { + expires_at: std::time::Instant::now() + Duration::from_secs(300), + snapshots, + }, + ); + Ok(()) +} + async fn get_snapshots( State(state): State>, Query(query): Query, ) -> Result { + let start = std::time::Instant::now(); + let should_cache = query.village_id.is_some() && query.limit <= 100 && query.offset < 100; + let cache_key = snapshots_cache_key(&query); + + if should_cache { + if let Some(cached) = snapshots_cache_lookup(&state, &cache_key)? { + let elapsed = start.elapsed(); + debug!( + village_id = query.village_id.as_deref(), + engine_version = query.engine_version.as_deref(), + decision_type = query.decision_type.as_deref(), + limit = query.limit, + offset = query.offset, + result_count = cached.len(), + elapsed_ms = elapsed.as_millis() as u64, + cached = true, + "get_snapshots request completed" + ); + return Ok(Json(cached)); + } + } + let ledger = state.open_ledger()?; let rows = ledger.query_snapshots( query.village_id.as_deref(), query.engine_version.as_deref(), + query.decision_type.as_deref(), query.limit, + query.offset, )?; let mut snapshots = Vec::new(); @@ -1967,6 +2050,23 @@ async fn get_snapshots( snapshots.push(snapshot); } + if should_cache { + snapshots_cache_store(&state, cache_key, snapshots.clone())?; + } + + let elapsed = start.elapsed(); + debug!( + village_id = query.village_id.as_deref(), + engine_version = query.engine_version.as_deref(), + decision_type = query.decision_type.as_deref(), + limit = query.limit, + offset = query.offset, + result_count = snapshots.len(), + elapsed_ms = elapsed.as_millis() as u64, + cached = false, + "get_snapshots request completed" + ); + Ok(Json(snapshots)) } @@ -2035,6 +2135,7 @@ fn reconstruct_snapshot( "event_id": row.event_id, "context_hash": row.context_hash, "engine_version": row.engine_version, + "decision_type": row.decision_type, "schema_version": row.schema_version, "redaction_level": row.redaction_level, "village_id": row.village_id, @@ -5978,6 +6079,8 @@ actors: fn snapshot_json() -> serde_json::Value { serde_json::json!({ "engine_version": "claude-3.5", + "decision_type": "chief", + "village_id": "blog-village", "context_hash": "abc123def456", "context": {"files": ["main.rs"], "prompt": "test"}, "result": {"decisions": [{"key": "db.engine", "value": "sqlite"}]}, @@ -6192,6 +6295,74 @@ actors: assert_eq!(resp.status(), StatusCode::NOT_FOUND); } + #[tokio::test] + async fn get_snapshots_filters_decision_type_and_offset() { + let tmp = tempfile::tempdir().unwrap(); + setup_workspace(tmp.path()); + + // Seed snapshots with mixed decision_type and same village + let app = router(tmp.path()); + let body1 = serde_json::json!({ + "engine_version": "claude-3.5", + "decision_type": "chief", + "village_id": "blog-village", + "context_hash": "ctx-chief-1", + "context": {"files": ["a.rs"]}, + "result": {"ok": true} + }); + let body2 = serde_json::json!({ + "engine_version": "claude-3.5", + "decision_type": "chief", + "village_id": "blog-village", + "context_hash": "ctx-chief-2", + "context": {"files": ["b.rs"]}, + "result": {"ok": true} + }); + let body3 = serde_json::json!({ + "engine_version": "claude-3.5", + "decision_type": "general", + "village_id": "blog-village", + "context_hash": "ctx-general-1", + "context": {"files": ["c.rs"]}, + "result": {"ok": true} + }); + + for body in [body1, body2, body3] { + let resp = app + .clone() + .oneshot( + Request::builder() + .method("POST") + .uri("/api/snapshot") + .header("content-type", "application/json") + .body(Body::from(body.to_string())) + .unwrap(), + ) + .await + .unwrap(); + assert_eq!(resp.status(), StatusCode::CREATED); + } + + // Filter by decision_type=chief, limit=1, offset=1 + let resp = app + .oneshot( + Request::builder() + .uri("/api/snapshots?village_id=blog-village&decision_type=chief&limit=1&offset=1") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + + assert_eq!(resp.status(), StatusCode::OK); + let bytes = axum::body::to_bytes(resp.into_body(), usize::MAX) + .await + .unwrap(); + let json: Vec = serde_json::from_slice(&bytes).unwrap(); + assert_eq!(json.len(), 1); + assert_eq!(json[0]["decision_type"], "chief"); + } + // ── POST /api/decisions/batch tests ── #[tokio::test] @@ -6642,6 +6813,7 @@ actors: repo_root: repo_root.to_path_buf(), chronicle, pending_pairings: Mutex::new(HashMap::new()), + snapshot_cache: Mutex::new(HashMap::new()), }); let public_routes = Router::new().route("/api/health", get(health)); diff --git a/opencode.json b/opencode.json index 34b24f3..a0cfa03 100644 --- a/opencode.json +++ b/opencode.json @@ -6,10 +6,10 @@ "npm": "@ai-sdk/openai-compatible", "env": ["T8STAR_API_KEY"], "models": { - "gpt-5.3-codex-high": { - "name": "GPT 5.3 Codex High", + "gpt-5.3-codex-medium": { + "name": "GPT 5.3 Codex Medium", "tool_call": true, - "limit": { "context": 200000, "output": 16384 } + "limit": { "context": 200000, "output": 163840 } } }, "options": {