diff --git a/src/openhuman/mcp_server/protocol.rs b/src/openhuman/mcp_server/protocol.rs index 14158c85d7..73c560f6c2 100644 --- a/src/openhuman/mcp_server/protocol.rs +++ b/src/openhuman/mcp_server/protocol.rs @@ -174,7 +174,7 @@ async fn handle_request(id: Value, method: &str, params: Value, session: &mut Mc session.source_type(), object_keys(&arguments) ); - match tools::call_tool(&name, arguments).await { + match tools::call_tool_with_session(&name, arguments, session.source_type()).await { Ok(result) => { log::debug!( "[mcp_server] tools/call response id={} tool={} client_source_type={} is_error={}", diff --git a/src/openhuman/mcp_server/tools.rs b/src/openhuman/mcp_server/tools.rs index ee417d27df..03bb4c34ea 100644 --- a/src/openhuman/mcp_server/tools.rs +++ b/src/openhuman/mcp_server/tools.rs @@ -544,6 +544,25 @@ fn list_tools_result_from_specs(specs: Vec) -> Value { } pub async fn call_tool(name: &str, arguments: Value) -> Result { + // Back-compat wrapper for the pre-#2536 signature. The `mcp:` default + // matches the fallback `McpSession` uses when an MCP client omits + // `clientInfo.name` during `initialize`, so audit rows from this path + // are indistinguishable from "no client identity" sessions. + call_tool_with_session(name, arguments, "mcp").await +} + +/// `call_tool` variant that threads the per-session `client_source_type` +/// (e.g. `"mcp:claude-desktop"`) into the write-dispatch path so the +/// audit log (#2536) can record *which* MCP client made each write. +/// +/// Callers with a live `McpSession` should pass +/// `session.source_type()` here; the bare `mcp` fallback in `call_tool` +/// is only for code paths that haven't been threaded yet. +pub async fn call_tool_with_session( + name: &str, + arguments: Value, + client_source_type: &str, +) -> Result { let spec = tool_specs() .into_iter() .find(|tool| tool.name == name) @@ -573,7 +592,7 @@ pub async fn call_tool(name: &str, arguments: Value) -> Result { enforce_write_policy(spec.name).await?; validate_controller_params(&spec, ¶ms)?; - return dispatch_write_tool(spec.name, ¶ms).await; + return dispatch_write_tool(spec.name, ¶ms, client_source_type).await; } _ => {} } @@ -1169,37 +1188,47 @@ async fn enforce_write_policy(tool_name: &str) -> Result<(), ToolCallError> { async fn dispatch_write_tool( tool_name: &str, params: &Map, + client_source_type: &str, ) -> Result { let rpc_method = "openhuman.memory_doc_put"; tracing::info!( tool = tool_name, rpc_method = rpc_method, - client = "mcp", + client = client_source_type, "[mcp_server] write dispatch" ); - match all::try_invoke_registered_rpc(rpc_method, params.clone()).await { + let outcome = all::try_invoke_registered_rpc(rpc_method, params.clone()).await; + let (rpc_result, audit_chunk_id, audit_success, audit_error) = match &outcome { Some(Ok(value)) => { - let document_id = value - .get("document_id") - .and_then(Value::as_str) - .unwrap_or(""); + let document_id = value.get("document_id").and_then(Value::as_str); tracing::info!( tool = tool_name, - chunk_id = document_id, - client = "mcp", + chunk_id = document_id.unwrap_or(""), + client = client_source_type, "[mcp_server] write success" ); - Ok(tool_success(value)) + ( + Ok(tool_success(value.clone())), + document_id.map(str::to_string), + true, + None, + ) } Some(Err(message)) => { log::warn!( - "[mcp_server] write handler error tool={} error={}", + "[mcp_server] write handler error tool={} client={} error={}", tool_name, + client_source_type, message ); - Ok(tool_error(format!("{} failed: {message}", tool_name))) + ( + Ok(tool_error(format!("{} failed: {message}", tool_name))), + None, + false, + Some(message.clone()), + ) } None => { log::error!( @@ -1207,11 +1236,141 @@ async fn dispatch_write_tool( tool_name, rpc_method ); - Ok(tool_error(format!( - "{} is unavailable: mapped RPC method `{}` is not registered", - tool_name, rpc_method - ))) + ( + Ok(tool_error(format!( + "{} is unavailable: mapped RPC method `{}` is not registered", + tool_name, rpc_method + ))), + None, + false, + Some(format!("rpc method `{rpc_method}` not registered")), + ) + } + }; + + // Best-effort audit log (#2536 Q2=A) — never fails the write. + // Failed RPC dispatches are also recorded so abuse-detection + // signals show up alongside success rows. + record_audit_best_effort( + tool_name, + params, + client_source_type, + audit_chunk_id, + audit_success, + audit_error, + ) + .await; + + rpc_result +} + +/// Best-effort audit insert for the MCP write path (#2536). +/// +/// Loads the live config (cheap, cached) and dispatches a non-blocking +/// `record_write`. Any failure is logged at `warn` level and swallowed +/// — the write itself has already succeeded or failed independently, +/// so the user's request outcome is not affected by audit-table issues. +async fn record_audit_best_effort( + tool_name: &str, + params: &Map, + client_source_type: &str, + resulting_chunk_id: Option, + success: bool, + error_message: Option, +) { + use crate::openhuman::memory::mcp_audit::{record_write, McpWriteRecord}; + use std::time::{SystemTime, UNIX_EPOCH}; + + let config = match config_rpc::load_config_with_timeout().await { + Ok(config) => config, + Err(err) => { + log::warn!( + "[mcp_server] audit-log config load failed tool={} client={} error={}; skipping audit insert", + tool_name, client_source_type, err + ); + return; + } + }; + + let timestamp_ms = SystemTime::now() + .duration_since(UNIX_EPOCH) + .map(|d| d.as_millis() as i64) + .unwrap_or(0); + + let record = McpWriteRecord { + timestamp_ms, + client_source_type: client_source_type.to_string(), + tool_name: tool_name.to_string(), + args_summary: summarize_write_args(tool_name, params), + resulting_chunk_id, + success, + error_message, + }; + + if let Err(err) = record_write(&config, record).await { + log::warn!( + "[mcp_server] audit-log insert failed tool={} client={} error={:#}", + tool_name, + client_source_type, + err + ); + } +} + +/// Build the slim per-tool `args_summary` JSON string for the audit +/// row. Stores **identifying metadata only** — never the document +/// content itself, which lives in the memory tree via `doc_put`. +/// +/// Returns `None` when the per-tool extractor produced no recordable +/// fields; the audit row still goes in with `args_summary` NULL so +/// the row count itself remains an honest write counter. +fn summarize_write_args(tool_name: &str, params: &Map) -> Option { + let mut summary = Map::new(); + match tool_name { + "memory.store" => { + if let Some(title) = params.get("title").and_then(Value::as_str) { + // Cap at 128 chars so a 10 KiB title doesn't bloat + // the audit table. Truncation is identifier-only; + // the full title is already in the chunk doc. + let truncated: String = title.chars().take(128).collect(); + summary.insert("title".to_string(), Value::String(truncated)); + } + if let Some(namespace) = params.get("namespace").and_then(Value::as_str) { + summary.insert( + "namespace".to_string(), + Value::String(namespace.to_string()), + ); + } + if let Some(tags) = params.get("tags").and_then(Value::as_array) { + summary.insert("tag_count".to_string(), Value::from(tags.len())); + } + } + "memory.note" => { + if let Some(meta) = params.get("metadata").and_then(Value::as_object) { + if let Some(chunk_id) = meta.get("annotates_chunk_id").and_then(Value::as_str) { + summary.insert("chunk_id".to_string(), Value::String(chunk_id.to_string())); + } + } + if let Some(content) = params.get("content").and_then(Value::as_str) { + summary.insert("note_text_length".to_string(), Value::from(content.len())); + } + } + "tree.tag" => { + if let Some(meta) = params.get("metadata").and_then(Value::as_object) { + if let Some(chunk_id) = meta.get("tags_for_chunk_id").and_then(Value::as_str) { + summary.insert("chunk_id".to_string(), Value::String(chunk_id.to_string())); + } + if let Some(tags) = meta.get("applied_tags").and_then(Value::as_array) { + summary.insert("tags".to_string(), Value::Array(tags.clone())); + } + } } + _ => {} + } + if summary.is_empty() { + None + } else { + serde_json::to_string(&Value::Object(summary)).ok() } } diff --git a/src/openhuman/memory/mcp_audit/mod.rs b/src/openhuman/memory/mcp_audit/mod.rs new file mode 100644 index 0000000000..a746e2495b --- /dev/null +++ b/src/openhuman/memory/mcp_audit/mod.rs @@ -0,0 +1,32 @@ +//! Persistent audit log for MCP write tool invocations +//! (`memory.store`, `memory.note`, `tree.tag`). +//! +//! Closes out Q4 of the Phase 3 RFC (#2269) — replaces the +//! `tracing::info!`-only audit trail with a queryable SQLite-backed +//! `mcp_writes` table colocated in the existing memory-tree DB. +//! +//! Issue: #2536. +//! +//! ## V1 scope (this module) +//! +//! - Schema (`mcp_writes`) and migration (additive `CREATE TABLE IF NOT EXISTS` +//! in `tree::store::SCHEMA`). +//! - `McpWriteRecord` struct + `record_write` insert helper. +//! - Best-effort coupling to the MCP write dispatch path +//! (`mcp_server::tools::dispatch_write_tool`) — audit insert failures are +//! logged but never abort the underlying write (Q2=A in #2536). +//! +//! ## Out of scope (follow-ups once Q4 is ratified) +//! +//! - Query RPC (`openhuman.mcp_audit_list`). +//! - MCP-client-side exposure of the audit log. +//! - UI surface for browsing the audit history. +//! - Retention / pruning policy. + +mod store; +#[cfg(test)] +mod tests; +mod types; + +pub use store::record_write; +pub use types::McpWriteRecord; diff --git a/src/openhuman/memory/mcp_audit/store.rs b/src/openhuman/memory/mcp_audit/store.rs new file mode 100644 index 0000000000..faabca35f4 --- /dev/null +++ b/src/openhuman/memory/mcp_audit/store.rs @@ -0,0 +1,127 @@ +//! SQLite insert helper for `mcp_writes`. +//! +//! Writes go through the existing memory-tree connection cache +//! (`tree::store::with_connection`) — Q1 = A per #2536 (colocated +//! storage). This keeps the cache, breaker, and migration story +//! aligned with the rest of the memory subsystem and avoids opening +//! a second SQLite handle just for audit rows. +//! +//! The actual `CREATE TABLE IF NOT EXISTS mcp_writes (...)` lives in +//! `tree::store::SCHEMA` so the table is initialised once per +//! connection (alongside the chunk / summary / embedding tables) +//! rather than re-checked on every audit insert. + +use anyhow::{Context, Result}; +use rusqlite::params; + +use super::types::McpWriteRecord; +use crate::openhuman::config::Config; +use crate::openhuman::memory_store::chunks::store as tree_store; + +/// Cap on `error_message` length at insert time. A runaway error stack +/// shouldn't bloat the audit table; 1 KiB is enough for any meaningful +/// upstream error (typical `doc_put` errors are < 200 chars). +const ERROR_MESSAGE_MAX_BYTES: usize = 1024; + +/// Best-effort insert: returns an error rather than panicking so the +/// caller can `let _ = record_write(...).await` and keep going if the +/// audit insert fails (Q2 = A in #2536 — write availability is not +/// degraded by the audit subsystem). +/// +/// The function is **synchronous** internally (SQLite-bound), but +/// exposed via `tokio::task::spawn_blocking` so the async caller doesn't +/// block its runtime thread on disk I/O — same pattern the rest of +/// `memory_store::chunks::store` follows for its sync rusqlite operations. +pub async fn record_write(config: &Config, record: McpWriteRecord) -> Result<()> { + // Cap error_message before crossing the await boundary so the + // serialised form going to the worker thread is already bounded. + let mut record = record; + if let Some(ref msg) = record.error_message { + if msg.len() > ERROR_MESSAGE_MAX_BYTES { + // Truncate at a UTF-8 char boundary to keep the resulting + // string valid even if the slice point lands mid-codepoint. + let mut end = ERROR_MESSAGE_MAX_BYTES; + while end > 0 && !msg.is_char_boundary(end) { + end -= 1; + } + record.error_message = Some(msg[..end].to_string()); + } + } + + let config = config.clone(); + tokio::task::spawn_blocking(move || insert_blocking(&config, &record)) + .await + .context("audit insert task panicked")? +} + +fn insert_blocking(config: &Config, record: &McpWriteRecord) -> Result<()> { + tree_store::with_connection::<()>(config, |conn| { + conn.execute( + "INSERT INTO mcp_writes ( + timestamp_ms, + client_source_type, + tool_name, + args_summary, + resulting_chunk_id, + success, + error_message + ) VALUES (?, ?, ?, ?, ?, ?, ?)", + params![ + record.timestamp_ms, + record.client_source_type, + record.tool_name, + record.args_summary, + record.resulting_chunk_id, + record.success as i32, + record.error_message, + ], + ) + .context("INSERT INTO mcp_writes failed")?; + Ok(()) + }) +} + +/// Read-side helper used by tests (and, eventually, the +/// `openhuman.mcp_audit_list` RPC once #2536 Q4 is ratified). Returns +/// the most recent N rows ordered by `timestamp_ms DESC`. +/// +/// Kept `pub(crate)` for now — once the RPC lands this graduates to +/// `pub` and the RPC handler wraps it. +#[cfg(test)] +pub(crate) async fn recent(config: &Config, limit: u32) -> Result> { + let config = config.clone(); + tokio::task::spawn_blocking(move || recent_blocking(&config, limit)) + .await + .context("audit recent task panicked")? +} + +#[cfg(test)] +fn recent_blocking(config: &Config, limit: u32) -> Result> { + tree_store::with_connection(config, |conn| { + let mut stmt = conn + .prepare( + "SELECT timestamp_ms, client_source_type, tool_name, + args_summary, resulting_chunk_id, success, error_message + FROM mcp_writes + ORDER BY timestamp_ms DESC + LIMIT ?", + ) + .context("prepare SELECT mcp_writes")?; + let rows = stmt + .query_map(params![limit], |row| { + Ok(McpWriteRecord { + timestamp_ms: row.get(0)?, + client_source_type: row.get(1)?, + tool_name: row.get(2)?, + args_summary: row.get(3)?, + resulting_chunk_id: row.get(4)?, + success: row.get::<_, i32>(5)? != 0, + error_message: row.get(6)?, + }) + }) + .context("query mcp_writes")? + .collect::>>() + .context("collect mcp_writes rows")?; + Ok(rows) + }) +} diff --git a/src/openhuman/memory/mcp_audit/tests.rs b/src/openhuman/memory/mcp_audit/tests.rs new file mode 100644 index 0000000000..ffe6f30440 --- /dev/null +++ b/src/openhuman/memory/mcp_audit/tests.rs @@ -0,0 +1,158 @@ +//! Unit tests for the MCP audit log insert + readback round-trip. + +use std::sync::Mutex; +use tempfile::TempDir; + +use super::store::{recent, record_write}; +use super::types::McpWriteRecord; +use crate::openhuman::config::Config; + +// `tree::store::with_connection` keys its connection cache by DB path, +// so concurrent tests with different `tempdir` paths run independently +// — but within a single test we still want one workspace for clarity. +// The mutex is a defensive guard against cargo running tests in parallel +// against the same Config workspace_dir if a test forgets to use TempDir. +static TEST_GUARD: Mutex<()> = Mutex::new(()); + +fn test_config() -> (Config, TempDir) { + let tmp = TempDir::new().expect("tempdir"); + let mut config = Config::default(); + config.workspace_dir = tmp.path().to_path_buf(); + (config, tmp) +} + +fn sample_record(client: &str, tool: &str, success: bool) -> McpWriteRecord { + McpWriteRecord { + timestamp_ms: 1_700_000_000_000, + client_source_type: client.to_string(), + tool_name: tool.to_string(), + args_summary: Some(format!(r#"{{"sample":"{tool}"}}"#)), + resulting_chunk_id: success.then(|| "doc-abc-123".to_string()), + success, + error_message: if success { + None + } else { + Some("doc_put validation failed: missing 'title'".to_string()) + }, + } +} + +#[tokio::test] +async fn record_then_recent_round_trips_a_success_row() { + let _g = TEST_GUARD.lock().unwrap(); + let (config, _tmp) = test_config(); + + let record = sample_record("mcp:claude-desktop", "memory.store", true); + record_write(&config, record.clone()) + .await + .expect("insert ok"); + + let rows = recent(&config, 10).await.expect("query ok"); + assert_eq!(rows.len(), 1); + assert_eq!(rows[0], record); +} + +#[tokio::test] +async fn record_preserves_failure_rows_with_error_message() { + let _g = TEST_GUARD.lock().unwrap(); + let (config, _tmp) = test_config(); + + let record = sample_record("mcp:cursor", "tree.tag", false); + record_write(&config, record.clone()) + .await + .expect("insert ok"); + + let rows = recent(&config, 10).await.expect("query ok"); + assert_eq!(rows.len(), 1); + assert!(!rows[0].success); + assert!(rows[0] + .error_message + .as_deref() + .expect("failure row has error_message") + .contains("doc_put validation failed")); + assert!(rows[0].resulting_chunk_id.is_none()); +} + +#[tokio::test] +async fn recent_orders_by_timestamp_descending() { + let _g = TEST_GUARD.lock().unwrap(); + let (config, _tmp) = test_config(); + + let mut newer = sample_record("mcp", "memory.store", true); + newer.timestamp_ms = 2_000_000_000_000; + let mut older = sample_record("mcp", "memory.note", true); + older.timestamp_ms = 1_000_000_000_000; + + // Insert older first so any natural insertion order can't masquerade + // as correct sorting. + record_write(&config, older.clone()).await.expect("ok"); + record_write(&config, newer.clone()).await.expect("ok"); + + let rows = recent(&config, 10).await.expect("query ok"); + assert_eq!(rows.len(), 2); + assert_eq!(rows[0].timestamp_ms, 2_000_000_000_000); + assert_eq!(rows[1].timestamp_ms, 1_000_000_000_000); +} + +#[tokio::test] +async fn recent_honours_limit() { + let _g = TEST_GUARD.lock().unwrap(); + let (config, _tmp) = test_config(); + + for i in 0..5 { + let mut r = sample_record("mcp", "memory.store", true); + r.timestamp_ms = 1_000_000_000_000 + i; + record_write(&config, r).await.expect("ok"); + } + + let rows = recent(&config, 3).await.expect("query ok"); + assert_eq!(rows.len(), 3); + // Most recent three only — timestamps 4, 3, 2. + assert_eq!(rows[0].timestamp_ms, 1_000_000_000_004); + assert_eq!(rows[2].timestamp_ms, 1_000_000_000_002); +} + +#[tokio::test] +async fn record_truncates_oversize_error_message_at_char_boundary() { + let _g = TEST_GUARD.lock().unwrap(); + let (config, _tmp) = test_config(); + + // 2 KiB error message — must be capped at 1 KiB on insert. + let oversize_error = "a".repeat(2048); + let mut record = sample_record("mcp", "memory.note", false); + record.error_message = Some(oversize_error); + + record_write(&config, record).await.expect("insert ok"); + let rows = recent(&config, 1).await.expect("query ok"); + assert_eq!(rows.len(), 1); + let stored = rows[0] + .error_message + .as_ref() + .expect("error_message stored"); + assert!(stored.len() <= 1024, "got {} bytes", stored.len()); +} + +#[tokio::test] +async fn record_handles_multibyte_truncation_safely() { + let _g = TEST_GUARD.lock().unwrap(); + let (config, _tmp) = test_config(); + + // A 4-byte UTF-8 char repeated past the 1024-byte cap. Naive byte + // truncation would slice mid-codepoint and produce invalid UTF-8; + // our `is_char_boundary` walk-back guards against that. + let multibyte_char = "🦀"; // 4 bytes + let oversize_error = multibyte_char.repeat(300); // 1200 bytes, well past cap + let mut record = sample_record("mcp", "memory.note", false); + record.error_message = Some(oversize_error); + + record_write(&config, record).await.expect("insert ok"); + let rows = recent(&config, 1).await.expect("query ok"); + let stored = rows[0] + .error_message + .as_ref() + .expect("error_message stored"); + // Must round-trip as valid UTF-8 — implicit because `String` was + // built from a `str` slice on a char boundary. + assert!(stored.is_char_boundary(stored.len())); + assert!(stored.len() <= 1024); +} diff --git a/src/openhuman/memory/mcp_audit/types.rs b/src/openhuman/memory/mcp_audit/types.rs new file mode 100644 index 0000000000..6c7f1d39b1 --- /dev/null +++ b/src/openhuman/memory/mcp_audit/types.rs @@ -0,0 +1,51 @@ +//! Types for the MCP write audit log. + +use serde::{Deserialize, Serialize}; + +/// One row of the `mcp_writes` audit table. +/// +/// Recorded per successful **and** failed MCP write tool invocation — +/// both signals matter: +/// +/// - **Success rows** answer "what did Claude Desktop write into my +/// memory this week?" for accountability / compliance. +/// - **Failure rows** are the abuse-detection signal: a misbehaving +/// client repeatedly hitting the write surface but bouncing off the +/// policy gate or `doc_put` validation will show up as a burst of +/// `success = false` rows. +/// +/// `args_summary` deliberately stores **identifying metadata only** +/// (not the document content itself) — the content lives in the +/// memory tree via `doc_put`, so duplicating it here would bloat the +/// audit table without adding information. +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct McpWriteRecord { + /// Wall-clock UNIX time in milliseconds. + pub timestamp_ms: i64, + /// Provenance string captured from `McpSession::source_type()` — + /// `"mcp:claude-desktop"` / `"mcp:cursor"` / fallback `"mcp"` when + /// the MCP client didn't supply `clientInfo.name` during + /// `initialize`. Indexed for fast per-client queries. + pub client_source_type: String, + /// MCP tool name that produced the write + /// (`"memory.store"` / `"memory.note"` / `"tree.tag"`). Indexed + /// for "show me all my tag writes" queries. + pub tool_name: String, + /// Slim JSON object capturing identifying args without duplicating + /// document content. Shape varies per tool — see #2536 body for + /// the per-tool schema. `None` when args summarisation produced + /// no recordable fields (treated as a non-fatal soft-failure). + pub args_summary: Option, + /// `document_id` returned by `memory_doc_put` on the success path. + /// `None` on failure rows or when the upstream RPC reply didn't + /// include a document id. + pub resulting_chunk_id: Option, + /// `true` when the underlying RPC returned `Ok`; `false` when it + /// returned `Err` or the RPC was not registered. + pub success: bool, + /// Populated only when `success == false` — the upstream error + /// message at the `dispatch_write_tool` boundary. Truncated to + /// 1 KiB at insert time so a runaway error stack doesn't bloat + /// the table. + pub error_message: Option, +} diff --git a/src/openhuman/memory/mod.rs b/src/openhuman/memory/mod.rs index 8cf55dab18..435ea557a5 100644 --- a/src/openhuman/memory/mod.rs +++ b/src/openhuman/memory/mod.rs @@ -12,6 +12,7 @@ // Legacy memory modules pub mod global; pub mod ingestion; +pub mod mcp_audit; pub mod ops; pub mod preferences; pub mod rpc_models; diff --git a/src/openhuman/memory_store/chunks/store.rs b/src/openhuman/memory_store/chunks/store.rs index 39c77c39cf..56b7022784 100644 --- a/src/openhuman/memory_store/chunks/store.rs +++ b/src/openhuman/memory_store/chunks/store.rs @@ -342,6 +342,25 @@ CREATE TABLE IF NOT EXISTS mem_tree_ingested_sources ( ingested_at_ms INTEGER NOT NULL, PRIMARY KEY (source_kind, source_id) ); + +-- #2536 (Phase 3 RFC Q4): persistent audit log for MCP write tool +-- invocations (memory.store / memory.note / tree.tag). Co-located in +-- this DB rather than a separate file per Q1=A — shares the existing +-- connection cache + circuit breaker without a second SQLite handle. +-- Insert helper lives in `memory::mcp_audit::store::record_write`. +CREATE TABLE IF NOT EXISTS mcp_writes ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + timestamp_ms INTEGER NOT NULL, + client_source_type TEXT NOT NULL, + tool_name TEXT NOT NULL, + args_summary TEXT, + resulting_chunk_id TEXT, + success INTEGER NOT NULL, + error_message TEXT +); +CREATE INDEX IF NOT EXISTS idx_mcp_writes_timestamp ON mcp_writes (timestamp_ms DESC); +CREATE INDEX IF NOT EXISTS idx_mcp_writes_client ON mcp_writes (client_source_type); +CREATE INDEX IF NOT EXISTS idx_mcp_writes_tool ON mcp_writes (tool_name); "; /// Upsert a batch of chunks atomically.