From 8ba1f4fac0fe61697671d26e187e1157db0eb969 Mon Sep 17 00:00:00 2001 From: MackJack023 <141124084+MackJack023@users.noreply.github.com> Date: Sun, 24 May 2026 21:17:23 +0800 Subject: [PATCH 1/5] feat(mcp): audit write tool calls --- src/core/all.rs | 3 + src/core/all_tests.rs | 20 ++ src/openhuman/mcp_audit/mod.rs | 12 ++ src/openhuman/mcp_audit/schemas.rs | 91 ++++++++ src/openhuman/mcp_audit/store.rs | 296 +++++++++++++++++++++++++++ src/openhuman/mcp_audit/types.rs | 41 ++++ src/openhuman/mcp_server/protocol.rs | 2 +- src/openhuman/mcp_server/tools.rs | 215 +++++++++++++++++-- src/openhuman/memory_tree/store.rs | 20 ++ src/openhuman/mod.rs | 1 + 10 files changed, 683 insertions(+), 18 deletions(-) create mode 100644 src/openhuman/mcp_audit/mod.rs create mode 100644 src/openhuman/mcp_audit/schemas.rs create mode 100644 src/openhuman/mcp_audit/store.rs create mode 100644 src/openhuman/mcp_audit/types.rs diff --git a/src/core/all.rs b/src/core/all.rs index e6cbebf682..26bd876af9 100644 --- a/src/core/all.rs +++ b/src/core/all.rs @@ -264,6 +264,9 @@ fn build_internal_only_controllers() -> Vec { // whatsapp_data ingest: scanner-side write path. Callable over RPC by the // Tauri scanner but excluded from agent-facing schema discovery. controllers.extend(crate::openhuman::whatsapp_data::all_whatsapp_data_internal_controllers()); + // MCP write audit list: internal-only so the desktop UI/CLI can inspect + // local write history without exposing cross-client history as an MCP tool. 
+ controllers.extend(crate::openhuman::mcp_audit::all_mcp_audit_internal_controllers()); controllers } diff --git a/src/core/all_tests.rs b/src/core/all_tests.rs index 140a037e43..3da14fb623 100644 --- a/src/core/all_tests.rs +++ b/src/core/all_tests.rs @@ -205,6 +205,26 @@ fn schema_for_rpc_method_finds_security_policy_info() { assert_eq!(s.function, "policy_info"); } +#[test] +fn schema_for_rpc_method_finds_internal_mcp_audit_list() { + let schema = schema_for_rpc_method("openhuman.mcp_audit_list"); + assert!( + schema.is_some(), + "mcp_audit.list should be internally routable" + ); + let s = schema.unwrap(); + assert_eq!(s.namespace, "mcp_audit"); + assert_eq!(s.function, "list"); +} + +#[test] +fn rpc_method_from_parts_does_not_expose_internal_mcp_audit_list() { + assert!( + rpc_method_from_parts("mcp_audit", "list").is_none(), + "internal MCP audit RPC must not appear in the public controller registry" + ); +} + #[test] fn schema_for_rpc_method_returns_none_for_unknown() { assert!(schema_for_rpc_method("openhuman.nonexistent_method_xyz").is_none()); diff --git a/src/openhuman/mcp_audit/mod.rs b/src/openhuman/mcp_audit/mod.rs new file mode 100644 index 0000000000..8cf7d001f9 --- /dev/null +++ b/src/openhuman/mcp_audit/mod.rs @@ -0,0 +1,12 @@ +//! Persistent audit log for MCP write-tool calls. +//! +//! The audit table is stored in the existing memory-tree SQLite database so +//! writes and their query surface reuse the same local workspace persistence. 
+ +mod schemas; +pub mod store; +pub mod types; + +pub use schemas::all_internal_controllers as all_mcp_audit_internal_controllers; +pub use store::{list_writes, record_write}; +pub use types::{McpWriteListQuery, McpWriteRecord, NewMcpWriteRecord}; diff --git a/src/openhuman/mcp_audit/schemas.rs b/src/openhuman/mcp_audit/schemas.rs new file mode 100644 index 0000000000..c9486fefcc --- /dev/null +++ b/src/openhuman/mcp_audit/schemas.rs @@ -0,0 +1,91 @@ +use serde_json::{Map, Value}; + +use crate::core::all::{ControllerFuture, RegisteredController}; +use crate::core::{ControllerSchema, FieldSchema, TypeSchema}; +use crate::openhuman::config::rpc as config_rpc; + +use super::store; +use super::types::McpWriteListQuery; + +pub fn all_internal_controllers() -> Vec { + vec![RegisteredController { + schema: schema(), + handler: handle_list, + }] +} + +fn schema() -> ControllerSchema { + ControllerSchema { + namespace: "mcp_audit", + function: "list", + description: "List MCP write-tool audit records from local workspace persistence.", + inputs: vec![ + FieldSchema { + name: "limit", + ty: TypeSchema::Option(Box::new(TypeSchema::U64)), + comment: "Maximum number of rows to return (default 50, max 500).", + required: false, + }, + FieldSchema { + name: "offset", + ty: TypeSchema::Option(Box::new(TypeSchema::U64)), + comment: "Number of rows to skip from the newest-first result set.", + required: false, + }, + FieldSchema { + name: "since_ms", + ty: TypeSchema::Option(Box::new(TypeSchema::U64)), + comment: "Only return rows at or after this Unix timestamp in milliseconds.", + required: false, + }, + FieldSchema { + name: "client_filter", + ty: TypeSchema::Option(Box::new(TypeSchema::String)), + comment: "Exact client_info filter, for example `mcp:claude-desktop`.", + required: false, + }, + FieldSchema { + name: "tool_filter", + ty: TypeSchema::Option(Box::new(TypeSchema::String)), + comment: "Exact tool_name filter, for example `memory.store`.", + required: false, + }, + 
FieldSchema { + name: "success_only", + ty: TypeSchema::Option(Box::new(TypeSchema::Bool)), + comment: "When true, only return successful writes.", + required: false, + }, + ], + outputs: vec![FieldSchema { + name: "records", + ty: TypeSchema::Array(Box::new(TypeSchema::Ref("McpWriteRecord"))), + comment: "MCP write audit records ordered by timestamp descending.", + required: true, + }], + } +} + +fn handle_list(params: Map) -> ControllerFuture { + Box::pin(async move { + let config = config_rpc::load_config_with_timeout().await?; + let query = serde_json::from_value::(Value::Object(params)) + .map_err(|err| format!("invalid params: {err}"))?; + let records = store::list_writes(&config, &query).map_err(|err| err.to_string())?; + serde_json::to_value(records).map_err(|err| err.to_string()) + }) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn internal_controller_registers_expected_rpc_name() { + let controllers = all_internal_controllers(); + assert_eq!(controllers.len(), 1); + assert_eq!(controllers[0].schema.namespace, "mcp_audit"); + assert_eq!(controllers[0].schema.function, "list"); + assert_eq!(controllers[0].rpc_method_name(), "openhuman.mcp_audit_list"); + } +} diff --git a/src/openhuman/mcp_audit/store.rs b/src/openhuman/mcp_audit/store.rs new file mode 100644 index 0000000000..a63eebae42 --- /dev/null +++ b/src/openhuman/mcp_audit/store.rs @@ -0,0 +1,296 @@ +use anyhow::{Context, Result}; +use rusqlite::{params, types::Type, Row, ToSql}; +use serde_json::Value; + +use crate::openhuman::config::Config; +use crate::openhuman::memory_tree; + +use super::types::{McpWriteListQuery, McpWriteRecord, NewMcpWriteRecord}; + +const DEFAULT_LIST_LIMIT: u64 = 50; +const MAX_LIST_LIMIT: u64 = 500; + +pub fn record_write(config: &Config, record: NewMcpWriteRecord) -> Result { + let args_summary = serde_json::to_string(&record.args_summary) + .context("failed to serialize mcp write args_summary")?; + memory_tree::store::with_connection(config, |conn| { + 
conn.execute( + "INSERT INTO mcp_writes ( + timestamp_ms, + client_info, + tool_name, + args_summary, + resulting_chunk_id, + success, + error_message + ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)", + params![ + record.timestamp_ms, + record.client_info, + record.tool_name, + args_summary, + record.resulting_chunk_id, + if record.success { 1_i64 } else { 0_i64 }, + record.error_message, + ], + ) + .context("failed to insert mcp_writes audit row")?; + Ok(conn.last_insert_rowid()) + }) +} + +pub fn list_writes(config: &Config, query: &McpWriteListQuery) -> Result> { + memory_tree::store::with_connection(config, |conn| { + let mut sql = String::from( + "SELECT + id, + timestamp_ms, + client_info, + tool_name, + args_summary, + resulting_chunk_id, + success, + error_message + FROM mcp_writes + WHERE 1=1", + ); + let mut bound: Vec> = Vec::new(); + + if let Some(since_ms) = query.since_ms { + sql.push_str(" AND timestamp_ms >= ?"); + bound.push(Box::new(u64_to_i64(since_ms, "since_ms")?)); + } + if let Some(client) = normalized_filter(query.client_filter.as_deref()) { + sql.push_str(" AND client_info = ?"); + bound.push(Box::new(client.to_string())); + } + if let Some(tool) = normalized_filter(query.tool_filter.as_deref()) { + sql.push_str(" AND tool_name = ?"); + bound.push(Box::new(tool.to_string())); + } + if query.success_only.unwrap_or(false) { + sql.push_str(" AND success = 1"); + } + + sql.push_str(" ORDER BY timestamp_ms DESC, id DESC LIMIT ? OFFSET ?"); + bound.push(Box::new(normalized_limit(query.limit)?)); + bound.push(Box::new(normalized_offset(query.offset)?)); + + let refs = bound + .iter() + .map(|value| value.as_ref() as &dyn ToSql) + .collect::>(); + let mut stmt = conn + .prepare(&sql) + .context("failed to prepare mcp_writes list query")?; + let rows = stmt + .query_map(refs.as_slice(), row_to_record) + .context("failed to query mcp_writes")? 
+ .collect::>>() + .context("failed to collect mcp_writes rows")?; + Ok(rows) + }) +} + +fn normalized_limit(limit: Option) -> Result { + u64_to_i64( + limit.unwrap_or(DEFAULT_LIST_LIMIT).min(MAX_LIST_LIMIT), + "limit", + ) +} + +fn normalized_offset(offset: Option) -> Result { + u64_to_i64(offset.unwrap_or(0), "offset") +} + +fn u64_to_i64(value: u64, field: &str) -> Result { + i64::try_from(value).with_context(|| format!("{field} is too large for SQLite INTEGER")) +} + +fn normalized_filter(value: Option<&str>) -> Option<&str> { + value.map(str::trim).filter(|value| !value.is_empty()) +} + +fn row_to_record(row: &Row<'_>) -> rusqlite::Result { + let args_summary_text: Option = row.get(4)?; + let args_summary = match args_summary_text { + Some(text) => serde_json::from_str::(&text).map_err(|err| { + rusqlite::Error::FromSqlConversionFailure(4, Type::Text, Box::new(err)) + })?, + None => Value::Null, + }; + let success: i64 = row.get(6)?; + Ok(McpWriteRecord { + id: row.get(0)?, + timestamp_ms: row.get(1)?, + client_info: row.get(2)?, + tool_name: row.get(3)?, + args_summary, + resulting_chunk_id: row.get(5)?, + success: success != 0, + error_message: row.get(7)?, + }) +} + +#[cfg(test)] +mod tests { + use super::*; + use serde_json::json; + use tempfile::TempDir; + + fn test_config(tmp: &TempDir) -> Config { + let mut config = Config::default(); + config.workspace_dir = tmp.path().join("workspace"); + std::fs::create_dir_all(&config.workspace_dir).unwrap(); + config + } + + fn record( + timestamp_ms: i64, + client_info: &str, + tool_name: &str, + success: bool, + ) -> NewMcpWriteRecord { + NewMcpWriteRecord { + timestamp_ms, + client_info: client_info.to_string(), + tool_name: tool_name.to_string(), + args_summary: json!({ "title": format!("record-{timestamp_ms}") }), + resulting_chunk_id: success.then(|| format!("chunk-{timestamp_ms}")), + success, + error_message: (!success).then(|| "write failed".to_string()), + } + } + + #[test] + fn 
record_write_inserts_success_and_failure_rows() { + let tmp = TempDir::new().unwrap(); + let config = test_config(&tmp); + + let success_id = record_write( + &config, + record(100, "mcp:claude-desktop", "memory.store", true), + ) + .unwrap(); + let failure_id = + record_write(&config, record(200, "mcp:cursor", "tree.tag", false)).unwrap(); + + let rows = list_writes(&config, &McpWriteListQuery::default()).unwrap(); + assert_eq!(rows.len(), 2); + assert_eq!(rows[0].id, failure_id); + assert!(!rows[0].success); + assert_eq!(rows[0].error_message.as_deref(), Some("write failed")); + assert_eq!(rows[1].id, success_id); + assert!(rows[1].success); + assert_eq!(rows[1].resulting_chunk_id.as_deref(), Some("chunk-100")); + } + + #[test] + fn list_writes_filters_by_client_tool_since_and_success() { + let tmp = TempDir::new().unwrap(); + let config = test_config(&tmp); + for row in [ + record(100, "mcp:claude-desktop", "memory.store", true), + record(200, "mcp:cursor", "memory.note", false), + record(300, "mcp:claude-desktop", "tree.tag", true), + record(400, "mcp:cursor", "tree.tag", true), + ] { + record_write(&config, row).unwrap(); + } + + let by_client = list_writes( + &config, + &McpWriteListQuery { + client_filter: Some("mcp:claude-desktop".into()), + ..Default::default() + }, + ) + .unwrap(); + assert_eq!(by_client.len(), 2); + assert!(by_client + .iter() + .all(|row| row.client_info == "mcp:claude-desktop")); + + let by_tool = list_writes( + &config, + &McpWriteListQuery { + tool_filter: Some("tree.tag".into()), + ..Default::default() + }, + ) + .unwrap(); + assert_eq!( + by_tool + .iter() + .map(|row| row.timestamp_ms) + .collect::>(), + vec![400, 300] + ); + + let since = list_writes( + &config, + &McpWriteListQuery { + since_ms: Some(250), + ..Default::default() + }, + ) + .unwrap(); + assert_eq!( + since.iter().map(|row| row.timestamp_ms).collect::>(), + vec![400, 300] + ); + + let success_only = list_writes( + &config, + &McpWriteListQuery { + success_only: 
Some(true), + ..Default::default() + }, + ) + .unwrap(); + assert_eq!(success_only.len(), 3); + assert!(success_only.iter().all(|row| row.success)); + } + + #[test] + fn list_writes_orders_newest_first_and_supports_limit_offset() { + let tmp = TempDir::new().unwrap(); + let config = test_config(&tmp); + for ts in [100, 200, 300, 400] { + record_write(&config, record(ts, "mcp", "memory.store", true)).unwrap(); + } + + let rows = list_writes( + &config, + &McpWriteListQuery { + limit: Some(2), + offset: Some(1), + ..Default::default() + }, + ) + .unwrap(); + assert_eq!( + rows.iter().map(|row| row.timestamp_ms).collect::>(), + vec![300, 200] + ); + } + + #[test] + fn list_writes_caps_limit_at_max() { + let tmp = TempDir::new().unwrap(); + let config = test_config(&tmp); + for ts in 0..505 { + record_write(&config, record(ts, "mcp", "memory.store", true)).unwrap(); + } + + let rows = list_writes( + &config, + &McpWriteListQuery { + limit: Some(MAX_LIST_LIMIT + 100), + ..Default::default() + }, + ) + .unwrap(); + assert_eq!(rows.len(), MAX_LIST_LIMIT as usize); + } +} diff --git a/src/openhuman/mcp_audit/types.rs b/src/openhuman/mcp_audit/types.rs new file mode 100644 index 0000000000..94ff14d6c0 --- /dev/null +++ b/src/openhuman/mcp_audit/types.rs @@ -0,0 +1,41 @@ +use serde::{Deserialize, Serialize}; +use serde_json::Value; + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct NewMcpWriteRecord { + pub timestamp_ms: i64, + pub client_info: String, + pub tool_name: String, + pub args_summary: Value, + pub resulting_chunk_id: Option, + pub success: bool, + pub error_message: Option, +} + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct McpWriteRecord { + pub id: i64, + pub timestamp_ms: i64, + pub client_info: String, + pub tool_name: String, + pub args_summary: Value, + pub resulting_chunk_id: Option, + pub success: bool, + pub error_message: Option, +} + +#[derive(Debug, Clone, Default, PartialEq, Eq, Deserialize)] +pub 
struct McpWriteListQuery { + #[serde(default)] + pub limit: Option, + #[serde(default)] + pub offset: Option, + #[serde(default)] + pub since_ms: Option, + #[serde(default)] + pub client_filter: Option, + #[serde(default)] + pub tool_filter: Option, + #[serde(default)] + pub success_only: Option, +} diff --git a/src/openhuman/mcp_server/protocol.rs b/src/openhuman/mcp_server/protocol.rs index 14158c85d7..d87393e436 100644 --- a/src/openhuman/mcp_server/protocol.rs +++ b/src/openhuman/mcp_server/protocol.rs @@ -174,7 +174,7 @@ async fn handle_request(id: Value, method: &str, params: Value, session: &mut Mc session.source_type(), object_keys(&arguments) ); - match tools::call_tool(&name, arguments).await { + match tools::call_tool(&name, arguments, session.source_type()).await { Ok(result) => { log::debug!( "[mcp_server] tools/call response id={} tool={} client_source_type={} is_error={}", diff --git a/src/openhuman/mcp_server/tools.rs b/src/openhuman/mcp_server/tools.rs index ee417d27df..11144198b7 100644 --- a/src/openhuman/mcp_server/tools.rs +++ b/src/openhuman/mcp_server/tools.rs @@ -4,8 +4,10 @@ use crate::core::all; use crate::openhuman::agent::harness::AgentDefinitionRegistry; use crate::openhuman::agent::Agent; use crate::openhuman::config::rpc as config_rpc; +use crate::openhuman::config::Config; use crate::openhuman::inference::provider::traits::build_tool_instructions_text; use crate::openhuman::integrations::searxng::MAX_RESULTS as SEARXNG_MAX_RESULTS; +use crate::openhuman::mcp_audit::{self, NewMcpWriteRecord}; use crate::openhuman::security::{SecurityPolicy, ToolOperation}; const DEFAULT_LIMIT: u64 = 10; @@ -543,13 +545,18 @@ fn list_tools_result_from_specs(specs: Vec) -> Value { json!({ "tools": tools }) } -pub async fn call_tool(name: &str, arguments: Value) -> Result { +pub async fn call_tool( + name: &str, + arguments: Value, + client_info: &str, +) -> Result { let spec = tool_specs() .into_iter() .find(|tool| tool.name == name) .ok_or_else(|| 
ToolCallError::InvalidParams(format!("unknown MCP tool `{name}`")))?; - let params = build_rpc_params(spec.name, arguments)?; + let audit_arguments = arguments.clone(); + let mut params = build_rpc_params(spec.name, arguments)?; match spec.name { "core.list_tools" => { reject_unexpected_arguments(¶ms, &[])?; @@ -571,9 +578,14 @@ pub async fn call_tool(name: &str, arguments: Value) -> Result { - enforce_write_policy(spec.name).await?; + let config = enforce_write_policy(spec.name).await?; + params.insert( + "source_type".to_string(), + Value::String(client_info.to_string()), + ); validate_controller_params(&spec, ¶ms)?; - return dispatch_write_tool(spec.name, ¶ms).await; + return dispatch_write_tool(spec.name, ¶ms, &audit_arguments, client_info, &config) + .await; } _ => {} } @@ -1146,7 +1158,7 @@ async fn enforce_act_policy(tool_name: &str) -> Result<(), ToolCallError> { /// Write operations use the same gate as Act — they are side-effecting and /// must not run in read-only mode. The separate function gives us a distinct /// log line so auditors can tell reads from writes at a glance. 
-async fn enforce_write_policy(tool_name: &str) -> Result<(), ToolCallError> { +async fn enforce_write_policy(tool_name: &str) -> Result { let config = match config_rpc::load_config_with_timeout().await { Ok(config) => config, Err(err) => { @@ -1161,7 +1173,8 @@ async fn enforce_write_policy(tool_name: &str) -> Result<(), ToolCallError> { let policy = SecurityPolicy::from_config(&config.autonomy, &config.workspace_dir); policy .enforce_tool_operation(ToolOperation::Act, tool_name) - .map_err(ToolCallError::InvalidParams) + .map_err(ToolCallError::InvalidParams)?; + Ok(config) } /// Dispatch a write tool to its underlying RPC method with provenance and @@ -1169,31 +1182,55 @@ async fn enforce_write_policy(tool_name: &str) -> Result<(), ToolCallError> { async fn dispatch_write_tool( tool_name: &str, params: &Map, + audit_arguments: &Value, + client_info: &str, + config: &Config, ) -> Result { let rpc_method = "openhuman.memory_doc_put"; tracing::info!( tool = tool_name, rpc_method = rpc_method, - client = "mcp", + client = client_info, "[mcp_server] write dispatch" ); match all::try_invoke_registered_rpc(rpc_method, params.clone()).await { Some(Ok(value)) => { - let document_id = value - .get("document_id") - .and_then(Value::as_str) - .unwrap_or(""); + let document_id = extract_document_id(&value); + audit_write( + config, + NewMcpWriteRecord { + timestamp_ms: now_ms(), + client_info: client_info.to_string(), + tool_name: tool_name.to_string(), + args_summary: summarize_write_args(tool_name, audit_arguments), + resulting_chunk_id: document_id.clone(), + success: true, + error_message: None, + }, + ); tracing::info!( tool = tool_name, - chunk_id = document_id, - client = "mcp", + chunk_id = document_id.as_deref().unwrap_or(""), + client = client_info, "[mcp_server] write success" ); Ok(tool_success(value)) } Some(Err(message)) => { + audit_write( + config, + NewMcpWriteRecord { + timestamp_ms: now_ms(), + client_info: client_info.to_string(), + tool_name: 
tool_name.to_string(), + args_summary: summarize_write_args(tool_name, audit_arguments), + resulting_chunk_id: None, + success: false, + error_message: Some(message.clone()), + }, + ); log::warn!( "[mcp_server] write handler error tool={} error={}", tool_name, @@ -1202,19 +1239,107 @@ async fn dispatch_write_tool( Ok(tool_error(format!("{} failed: {message}", tool_name))) } None => { + let message = format!("mapped RPC method `{rpc_method}` is not registered"); + audit_write( + config, + NewMcpWriteRecord { + timestamp_ms: now_ms(), + client_info: client_info.to_string(), + tool_name: tool_name.to_string(), + args_summary: summarize_write_args(tool_name, audit_arguments), + resulting_chunk_id: None, + success: false, + error_message: Some(message.clone()), + }, + ); log::error!( "[mcp_server] write mapping missing registered RPC method tool={} rpc_method={}", tool_name, rpc_method ); - Ok(tool_error(format!( - "{} is unavailable: mapped RPC method `{}` is not registered", - tool_name, rpc_method - ))) + Ok(tool_error(format!("{tool_name} is unavailable: {message}"))) } } } +fn audit_write(config: &Config, record: NewMcpWriteRecord) { + if let Err(err) = mcp_audit::record_write(config, record) { + log::warn!("[mcp_server] mcp write audit insert failed: {err}"); + } +} + +fn now_ms() -> i64 { + chrono::Utc::now().timestamp_millis() +} + +fn extract_document_id(value: &Value) -> Option { + value + .get("document_id") + .or_else(|| { + value + .get("result") + .and_then(|result| result.get("document_id")) + }) + .and_then(Value::as_str) + .map(str::to_string) +} + +fn summarize_write_args(tool_name: &str, arguments: &Value) -> Value { + let Some(args) = arguments.as_object() else { + return json!({}); + }; + match tool_name { + "memory.store" => json!({ + "title": args + .get("title") + .and_then(Value::as_str) + .map(|title| first_chars(title, 128)) + .unwrap_or_default(), + "namespace": args + .get("namespace") + .and_then(Value::as_str) + .unwrap_or("mcp"), + 
"tag_count": args + .get("tags") + .and_then(Value::as_array) + .map(|tags| tags.len()) + .unwrap_or(0), + }), + "memory.note" => json!({ + "chunk_id": args + .get("chunk_id") + .and_then(Value::as_str) + .unwrap_or_default(), + "note_text_length": args + .get("note_text") + .and_then(Value::as_str) + .map(|note| note.chars().count()) + .unwrap_or(0), + }), + "tree.tag" => json!({ + "chunk_id": args + .get("chunk_id") + .and_then(Value::as_str) + .unwrap_or_default(), + "tags": args + .get("tags") + .and_then(Value::as_array) + .map(|tags| { + tags.iter() + .filter_map(Value::as_str) + .map(str::to_string) + .collect::>() + }) + .unwrap_or_default(), + }), + _ => json!({}), + } +} + +fn first_chars(value: &str, max_chars: usize) -> String { + value.chars().take(max_chars).collect() +} + async fn load_config_and_init_registry() -> Result { let config = config_rpc::load_config_with_timeout() @@ -2245,6 +2370,62 @@ mod tests { ); } + // ── MCP write audit summary ──────────────────────────────────────── + + #[test] + fn summarize_write_args_omits_memory_store_content() { + let summary = summarize_write_args( + "memory.store", + &json!({ + "title": "A".repeat(140), + "content": "private body", + "namespace": "work", + "tags": ["project", "planning"] + }), + ); + assert_eq!(summary["title"].as_str().unwrap().chars().count(), 128); + assert_eq!(summary["namespace"], "work"); + assert_eq!(summary["tag_count"], 2); + assert!(summary.get("content").is_none()); + } + + #[test] + fn summarize_write_args_omits_memory_note_text() { + let summary = summarize_write_args( + "memory.note", + &json!({ "chunk_id": "chunk-42", "note_text": "Important context" }), + ); + assert_eq!(summary["chunk_id"], "chunk-42"); + assert_eq!( + summary["note_text_length"].as_u64(), + Some("Important context".chars().count() as u64) + ); + assert!(summary.get("note_text").is_none()); + } + + #[test] + fn summarize_write_args_keeps_tree_tag_labels() { + let summary = summarize_write_args( + 
"tree.tag", + &json!({ "chunk_id": "chunk-42", "tags": ["todo", "q3"] }), + ); + assert_eq!(summary["chunk_id"], "chunk-42"); + assert_eq!(summary["tags"], json!(["todo", "q3"])); + } + + #[test] + fn extract_document_id_reads_rpc_outcome_envelope() { + assert_eq!( + extract_document_id(&json!({"result": {"document_id": "doc-123"}, "logs": []})) + .as_deref(), + Some("doc-123") + ); + assert_eq!( + extract_document_id(&json!({"document_id": "doc-456"})).as_deref(), + Some("doc-456") + ); + } + // ── slug_from ───────────────────────────────────────────────────── #[test] diff --git a/src/openhuman/memory_tree/store.rs b/src/openhuman/memory_tree/store.rs index 1902b0b6d0..3d0104c650 100644 --- a/src/openhuman/memory_tree/store.rs +++ b/src/openhuman/memory_tree/store.rs @@ -342,6 +342,26 @@ CREATE TABLE IF NOT EXISTS mem_tree_ingested_sources ( ingested_at_ms INTEGER NOT NULL, PRIMARY KEY (source_kind, source_id) ); + +-- MCP write-tool audit trail (#2536). This intentionally stores compact +-- identifying metadata instead of duplicating the memory document body. +CREATE TABLE IF NOT EXISTS mcp_writes ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + timestamp_ms INTEGER NOT NULL, + client_info TEXT NOT NULL, + tool_name TEXT NOT NULL, + args_summary TEXT, + resulting_chunk_id TEXT, + success INTEGER NOT NULL, + error_message TEXT +); + +CREATE INDEX IF NOT EXISTS idx_mcp_writes_timestamp + ON mcp_writes(timestamp_ms DESC); +CREATE INDEX IF NOT EXISTS idx_mcp_writes_client + ON mcp_writes(client_info); +CREATE INDEX IF NOT EXISTS idx_mcp_writes_tool + ON mcp_writes(tool_name); "; /// Upsert a batch of chunks atomically. 
diff --git a/src/openhuman/mod.rs b/src/openhuman/mod.rs index 4a29ac5cf2..80c8689782 100644 --- a/src/openhuman/mod.rs +++ b/src/openhuman/mod.rs @@ -47,6 +47,7 @@ pub mod integrations; pub mod javascript; pub mod keyring; pub mod learning; +pub mod mcp_audit; pub mod mcp_client; pub mod mcp_registry; pub mod mcp_server; From 32ae7731be2de86ac1fd17148329c97b294cb89d Mon Sep 17 00:00:00 2001 From: MackJack023 <141124084+MackJack023@users.noreply.github.com> Date: Sun, 24 May 2026 22:11:47 +0800 Subject: [PATCH 2/5] fix(mcp): address write audit review feedback --- src/openhuman/mcp_audit/mod.rs | 7 +- src/openhuman/mcp_audit/schemas.rs | 124 +++++++++++- src/openhuman/mcp_audit/store.rs | 147 +++++++++++--- src/openhuman/mcp_server/tools.rs | 313 +++++++++++++++++++++++++++-- 4 files changed, 543 insertions(+), 48 deletions(-) diff --git a/src/openhuman/mcp_audit/mod.rs b/src/openhuman/mcp_audit/mod.rs index 8cf7d001f9..648d618752 100644 --- a/src/openhuman/mcp_audit/mod.rs +++ b/src/openhuman/mcp_audit/mod.rs @@ -7,6 +7,11 @@ mod schemas; pub mod store; pub mod types; -pub use schemas::all_internal_controllers as all_mcp_audit_internal_controllers; +pub use schemas::{ + all_controller_schemas as all_mcp_audit_controller_schemas, + all_internal_controllers as all_mcp_audit_internal_controllers, + all_registered_controllers as all_mcp_audit_registered_controllers, + schemas as mcp_audit_schemas, +}; pub use store::{list_writes, record_write}; pub use types::{McpWriteListQuery, McpWriteRecord, NewMcpWriteRecord}; diff --git a/src/openhuman/mcp_audit/schemas.rs b/src/openhuman/mcp_audit/schemas.rs index c9486fefcc..b4fc1f8ce5 100644 --- a/src/openhuman/mcp_audit/schemas.rs +++ b/src/openhuman/mcp_audit/schemas.rs @@ -7,9 +7,24 @@ use crate::openhuman::config::rpc as config_rpc; use super::store; use super::types::McpWriteListQuery; +pub fn schemas(function: &str) -> ControllerSchema { + match function { + "list" => schema(), + other => panic!("unknown mcp_audit 
controller schema `{other}`"), + } +} + +pub fn all_controller_schemas() -> Vec { + vec![schemas("list")] +} + +pub fn all_registered_controllers() -> Vec { + all_internal_controllers() +} + pub fn all_internal_controllers() -> Vec { vec![RegisteredController { - schema: schema(), + schema: schemas("list"), handler: handle_list, }] } @@ -68,17 +83,65 @@ fn schema() -> ControllerSchema { fn handle_list(params: Map) -> ControllerFuture { Box::pin(async move { - let config = config_rpc::load_config_with_timeout().await?; - let query = serde_json::from_value::(Value::Object(params)) - .map_err(|err| format!("invalid params: {err}"))?; - let records = store::list_writes(&config, &query).map_err(|err| err.to_string())?; - serde_json::to_value(records).map_err(|err| err.to_string()) + log::debug!("[mcp_audit] handle_list enter params={params:?}"); + log::trace!("[mcp_audit] handle_list loading config"); + let config = match config_rpc::load_config_with_timeout().await { + Ok(config) => { + log::trace!( + "[mcp_audit] handle_list config loaded workspace={}", + config.workspace_dir.display() + ); + config + } + Err(err) => { + log::warn!("[mcp_audit] handle_list config load failed error={err}"); + return Err(err); + } + }; + + let query = match serde_json::from_value::(Value::Object(params)) { + Ok(query) => { + log::trace!("[mcp_audit] handle_list parsed query={query:?}"); + query + } + Err(err) => { + log::warn!("[mcp_audit] handle_list invalid params error={err}"); + return Err(format!("invalid params: {err}")); + } + }; + + log::trace!( + "[mcp_audit] handle_list querying store workspace={} query={query:?}", + config.workspace_dir.display() + ); + let records = match store::list_writes(&config, &query) { + Ok(records) => { + log::trace!( + "[mcp_audit] handle_list store success records={}", + records.len() + ); + records + } + Err(err) => { + log::warn!("[mcp_audit] handle_list store failed query={query:?} error={err}"); + return Err(err.to_string()); + } + }; + + let 
count = records.len(); + let value = serde_json::to_value(records).map_err(|err| { + log::warn!("[mcp_audit] handle_list serialize response failed error={err}"); + err.to_string() + })?; + log::debug!("[mcp_audit] handle_list exit records={count}"); + Ok(value) }) } #[cfg(test)] mod tests { use super::*; + use serde_json::json; #[test] fn internal_controller_registers_expected_rpc_name() { @@ -88,4 +151,53 @@ mod tests { assert_eq!(controllers[0].schema.function, "list"); assert_eq!(controllers[0].rpc_method_name(), "openhuman.mcp_audit_list"); } + + #[test] + fn domain_schema_exports_match_internal_controller() { + let schemas = all_controller_schemas(); + let controllers = all_registered_controllers(); + + assert_eq!(schemas.len(), 1); + assert_eq!(schemas[0].namespace, "mcp_audit"); + assert_eq!(controllers.len(), 1); + assert_eq!(controllers[0].schema.function, schemas[0].function); + } + + #[tokio::test] + async fn handle_list_returns_persisted_audit_records() { + let _env_lock = crate::openhuman::config::TEST_ENV_LOCK + .lock() + .unwrap_or_else(|err| err.into_inner()); + let tmp = tempfile::tempdir().expect("tempdir"); + unsafe { + std::env::set_var("OPENHUMAN_WORKSPACE", tmp.path()); + } + + let config = config_rpc::load_config_with_timeout() + .await + .expect("config"); + store::record_write( + &config, + crate::openhuman::mcp_audit::NewMcpWriteRecord { + timestamp_ms: 10, + client_info: "mcp:test".into(), + tool_name: "memory.store".into(), + args_summary: json!({ "title": "safe" }), + resulting_chunk_id: Some("chunk-1".into()), + success: true, + error_message: None, + }, + ) + .expect("record write"); + + let value = handle_list(Map::new()).await.expect("handle list"); + let records = value.as_array().expect("records array"); + assert_eq!(records.len(), 1); + assert_eq!(records[0]["tool_name"], "memory.store"); + assert_eq!(records[0]["client_info"], "mcp:test"); + + unsafe { + std::env::remove_var("OPENHUMAN_WORKSPACE"); + } + } } diff --git 
a/src/openhuman/mcp_audit/store.rs b/src/openhuman/mcp_audit/store.rs index a63eebae42..bd31e1fe36 100644 --- a/src/openhuman/mcp_audit/store.rs +++ b/src/openhuman/mcp_audit/store.rs @@ -11,10 +11,41 @@ const DEFAULT_LIST_LIMIT: u64 = 50; const MAX_LIST_LIMIT: u64 = 500; pub fn record_write(config: &Config, record: NewMcpWriteRecord) -> Result { - let args_summary = serde_json::to_string(&record.args_summary) - .context("failed to serialize mcp write args_summary")?; - memory_tree::store::with_connection(config, |conn| { - conn.execute( + log::debug!( + "[mcp_audit] record_write enter tool={} client={} timestamp_ms={} success={} has_error={}", + record.tool_name, + record.client_info, + record.timestamp_ms, + record.success, + record.error_message.is_some() + ); + let args_summary = match serde_json::to_string(&record.args_summary) { + Ok(args_summary) => { + log::trace!( + "[mcp_audit] record_write args_summary serialized tool={} bytes={}", + record.tool_name, + args_summary.len() + ); + args_summary + } + Err(err) => { + log::warn!( + "[mcp_audit] record_write args_summary serialize failed tool={} client={} error={err}", + record.tool_name, + record.client_info + ); + return Err(anyhow::Error::new(err)) + .context("failed to serialize mcp write args_summary"); + } + }; + let result = memory_tree::store::with_connection(config, |conn| { + log::trace!( + "[mcp_audit] record_write inserting row tool={} client={} timestamp_ms={}", + record.tool_name, + record.client_info, + record.timestamp_ms + ); + match conn.execute( "INSERT INTO mcp_writes ( timestamp_ms, client_info, @@ -26,21 +57,55 @@ pub fn record_write(config: &Config, record: NewMcpWriteRecord) -> Result { ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)", params![ record.timestamp_ms, - record.client_info, - record.tool_name, - args_summary, - record.resulting_chunk_id, + &record.client_info, + &record.tool_name, + &args_summary, + record.resulting_chunk_id.as_deref(), if record.success { 1_i64 } else { 0_i64 }, - 
record.error_message, + record.error_message.as_deref(), ], - ) - .context("failed to insert mcp_writes audit row")?; + ) { + Ok(_) => {} + Err(err) => { + log::warn!( + "[mcp_audit] record_write insert failed tool={} client={} timestamp_ms={} error={err}", + record.tool_name, + record.client_info, + record.timestamp_ms + ); + return Err(anyhow::Error::new(err)) + .context("failed to insert mcp_writes audit row"); + } + } Ok(conn.last_insert_rowid()) - }) + }); + match &result { + Ok(row_id) => log::debug!( + "[mcp_audit] record_write exit row_id={} tool={} client={}", + row_id, + record.tool_name, + record.client_info + ), + Err(err) => log::warn!( + "[mcp_audit] record_write failed tool={} client={} error={err}", + record.tool_name, + record.client_info + ), + } + result } pub fn list_writes(config: &Config, query: &McpWriteListQuery) -> Result> { - memory_tree::store::with_connection(config, |conn| { + log::debug!( + "[mcp_audit] list_writes enter since_ms={:?} limit={:?} offset={:?} client_filter={:?} tool_filter={:?} success_only={:?}", + query.since_ms, + query.limit, + query.offset, + query.client_filter, + query.tool_filter, + query.success_only + ); + let result = memory_tree::store::with_connection(config, |conn| { let mut sql = String::from( "SELECT id, @@ -59,37 +124,71 @@ pub fn list_writes(config: &Config, query: &McpWriteListQuery) -> Result= ?"); bound.push(Box::new(u64_to_i64(since_ms, "since_ms")?)); + log::trace!("[mcp_audit] list_writes applied since_ms filter={since_ms}"); } if let Some(client) = normalized_filter(query.client_filter.as_deref()) { sql.push_str(" AND client_info = ?"); bound.push(Box::new(client.to_string())); + log::trace!("[mcp_audit] list_writes applied client filter={client}"); } if let Some(tool) = normalized_filter(query.tool_filter.as_deref()) { sql.push_str(" AND tool_name = ?"); bound.push(Box::new(tool.to_string())); + log::trace!("[mcp_audit] list_writes applied tool filter={tool}"); } if 
query.success_only.unwrap_or(false) { sql.push_str(" AND success = 1"); + log::trace!("[mcp_audit] list_writes applied success_only filter"); } sql.push_str(" ORDER BY timestamp_ms DESC, id DESC LIMIT ? OFFSET ?"); - bound.push(Box::new(normalized_limit(query.limit)?)); - bound.push(Box::new(normalized_offset(query.offset)?)); + let limit = normalized_limit(query.limit)?; + let offset = normalized_offset(query.offset)?; + bound.push(Box::new(limit)); + bound.push(Box::new(offset)); + log::trace!( + "[mcp_audit] list_writes sql={} bound_count={} limit={} offset={}", + sql, + bound.len(), + limit, + offset + ); let refs = bound .iter() .map(|value| value.as_ref() as &dyn ToSql) .collect::>(); - let mut stmt = conn - .prepare(&sql) - .context("failed to prepare mcp_writes list query")?; - let rows = stmt - .query_map(refs.as_slice(), row_to_record) - .context("failed to query mcp_writes")? - .collect::>>() - .context("failed to collect mcp_writes rows")?; + log::trace!("[mcp_audit] list_writes preparing query"); + let mut stmt = match conn.prepare(&sql) { + Ok(stmt) => stmt, + Err(err) => { + log::warn!("[mcp_audit] list_writes prepare failed error={err}"); + return Err(anyhow::Error::new(err)) + .context("failed to prepare mcp_writes list query"); + } + }; + log::trace!("[mcp_audit] list_writes executing query"); + let mapped = match stmt.query_map(refs.as_slice(), row_to_record) { + Ok(mapped) => mapped, + Err(err) => { + log::warn!("[mcp_audit] list_writes query failed error={err}"); + return Err(anyhow::Error::new(err)).context("failed to query mcp_writes"); + } + }; + let rows = match mapped.collect::>>() { + Ok(rows) => rows, + Err(err) => { + log::warn!("[mcp_audit] list_writes collect failed error={err}"); + return Err(anyhow::Error::new(err)).context("failed to collect mcp_writes rows"); + } + }; + log::debug!("[mcp_audit] list_writes exit rows={}", rows.len()); Ok(rows) - }) + }); + if let Err(err) = &result { + log::warn!("[mcp_audit] list_writes failed 
error={err}"); + } + result } fn normalized_limit(limit: Option) -> Result { diff --git a/src/openhuman/mcp_server/tools.rs b/src/openhuman/mcp_server/tools.rs index 11144198b7..c16d417291 100644 --- a/src/openhuman/mcp_server/tools.rs +++ b/src/openhuman/mcp_server/tools.rs @@ -556,7 +556,20 @@ pub async fn call_tool( .ok_or_else(|| ToolCallError::InvalidParams(format!("unknown MCP tool `{name}`")))?; let audit_arguments = arguments.clone(); - let mut params = build_rpc_params(spec.name, arguments)?; + let mut params = match build_rpc_params(spec.name, arguments) { + Ok(params) => params, + Err(err) => { + if is_write_tool(spec.name) { + audit_write_rejection_without_config( + spec.name, + &audit_arguments, + client_info, + err.message(), + ); + } + return Err(err); + } + }; match spec.name { "core.list_tools" => { reject_unexpected_arguments(&params, &[])?; @@ -578,12 +591,33 @@ pub async fn call_tool( return run_subagent_tool(&params).await; } "memory.store" | "memory.note" | "tree.tag" => { - let config = enforce_write_policy(spec.name).await?; + let config = load_write_config(spec.name).await?; + if let Err(err) = enforce_write_policy_for_config(spec.name, &config) { + audit_write_rejection( + &config, + spec.name, + &audit_arguments, + Some(&params), + client_info, + &err, + ); + return Err(err); + } params.insert( "source_type".to_string(), Value::String(client_info.to_string()), ); - validate_controller_params(&spec, &params)?; + if let Err(err) = validate_controller_params(&spec, &params) { + audit_write_rejection( + &config, + spec.name, + &audit_arguments, + Some(&params), + client_info, + &err, + ); + return Err(err); + } return dispatch_write_tool(spec.name, &params, &audit_arguments, client_info, &config) .await; } @@ -1155,11 +1189,8 @@ async fn enforce_act_policy(tool_name: &str) -> Result<(), ToolCallError> { .map_err(ToolCallError::InvalidParams) } -/// Write operations use the same gate as Act — they are side-effecting and -/// must not run in read-only mode. 
The separate function gives us a distinct -/// log line so auditors can tell reads from writes at a glance. -async fn enforce_write_policy(tool_name: &str) -> Result { - let config = match config_rpc::load_config_with_timeout().await { +async fn load_write_config(tool_name: &str) -> Result { + match config_rpc::load_config_with_timeout().await { Ok(config) => config, Err(err) => { log::warn!( @@ -1169,12 +1200,22 @@ async fn enforce_write_policy(tool_name: &str) -> Result "failed to load config: {err}" ))); } - }; + } +} + +fn enforce_write_policy_for_config(tool_name: &str, config: &Config) -> Result<(), ToolCallError> { let policy = SecurityPolicy::from_config(&config.autonomy, &config.workspace_dir); - policy - .enforce_tool_operation(ToolOperation::Act, tool_name) - .map_err(ToolCallError::InvalidParams)?; - Ok(config) + match policy.enforce_tool_operation(ToolOperation::Act, tool_name) { + Ok(()) => Ok(()), + Err(message) => { + log::debug!( + "[mcp_server] enforce_write_policy denied tool={} decision={}", + tool_name, + message + ); + Err(ToolCallError::InvalidParams(message)) + } + } } /// Dispatch a write tool to its underlying RPC method with provenance and @@ -1188,13 +1229,20 @@ async fn dispatch_write_tool( ) -> Result { let rpc_method = "openhuman.memory_doc_put"; - tracing::info!( + tracing::debug!( tool = tool_name, rpc_method = rpc_method, client = client_info, "[mcp_server] write dispatch" ); + tracing::trace!( + tool = tool_name, + rpc_method = rpc_method, + param_keys = ?params.keys().collect::>(), + "[mcp_server] write dispatch invoking rpc" + ); + match all::try_invoke_registered_rpc(rpc_method, params.clone()).await { Some(Ok(value)) => { let document_id = extract_document_id(&value); @@ -1210,7 +1258,7 @@ async fn dispatch_write_tool( error_message: None, }, ); - tracing::info!( + tracing::debug!( tool = tool_name, chunk_id = document_id.as_deref().unwrap_or(""), client = client_info, @@ -1263,11 +1311,120 @@ async fn dispatch_write_tool( } 
fn audit_write(config: &Config, record: NewMcpWriteRecord) { - if let Err(err) = mcp_audit::record_write(config, record) { - log::warn!("[mcp_server] mcp write audit insert failed: {err}"); + let config = config.clone(); + if let Ok(handle) = tokio::runtime::Handle::try_current() { + let _ = handle.spawn_blocking(move || { + if let Err(err) = mcp_audit::record_write(&config, record) { + log::warn!("[mcp_server] mcp write audit insert failed: {err}"); + } + }); + } else { + let _ = std::thread::spawn(move || { + if let Err(err) = mcp_audit::record_write(&config, record) { + log::warn!("[mcp_server] mcp write audit insert failed: {err}"); + } + }); } } +fn audit_write_rejection( + config: &Config, + tool_name: &str, + audit_arguments: &Value, + params: Option<&Map>, + client_info: &str, + err: &ToolCallError, +) { + log::debug!( + "[mcp_server] write rejected before dispatch tool={} client={} error={}", + tool_name, + client_info, + err.message() + ); + audit_write( + config, + NewMcpWriteRecord { + timestamp_ms: now_ms(), + client_info: client_info.to_string(), + tool_name: tool_name.to_string(), + args_summary: summarize_rejected_write_args(tool_name, audit_arguments, params), + resulting_chunk_id: None, + success: false, + error_message: Some(err.message().to_string()), + }, + ); +} + +fn audit_write_rejection_without_config( + tool_name: &str, + audit_arguments: &Value, + client_info: &str, + error_message: &str, +) { + log::debug!( + "[mcp_server] write rejected before config load tool={} client={} error={}", + tool_name, + client_info, + error_message + ); + + let tool_name = tool_name.to_string(); + let client_info = client_info.to_string(); + let error_message = error_message.to_string(); + let args_summary = summarize_write_args(&tool_name, audit_arguments); + match tokio::runtime::Handle::try_current() { + Ok(handle) => { + let _ = handle.spawn(async move { + match config_rpc::load_config_with_timeout().await { + Ok(config) => audit_write( + &config, + 
NewMcpWriteRecord { + timestamp_ms: now_ms(), + client_info, + tool_name, + args_summary, + resulting_chunk_id: None, + success: false, + error_message: Some(error_message), + }, + ), + Err(err) => log::warn!( + "[mcp_server] write rejection audit skipped tool={} config load failed error={}", + tool_name, + err + ), + } + }); + } + Err(err) => log::warn!( + "[mcp_server] write rejection audit skipped tool={} runtime unavailable error={}", + tool_name, + err + ), + } +} + +fn is_write_tool(tool_name: &str) -> bool { + matches!(tool_name, "memory.store" | "memory.note" | "tree.tag") +} + +fn summarize_rejected_write_args( + tool_name: &str, + audit_arguments: &Value, + params: Option<&Map>, +) -> Value { + let mut summary = summarize_write_args(tool_name, audit_arguments); + if let (Value::Object(summary), Some(params)) = (&mut summary, params) { + let mut param_keys = params.keys().cloned().collect::>(); + param_keys.sort(); + summary.insert( + "param_keys".to_string(), + Value::Array(param_keys.into_iter().map(Value::String).collect()), + ); + } + summary +} + fn now_ms() -> i64 { chrono::Utc::now().timestamp_millis() } @@ -2413,6 +2570,128 @@ mod tests { assert_eq!(summary["tags"], json!(["todo", "q3"])); } + #[test] + fn summarize_rejected_write_args_includes_param_keys_only() { + let mut params = Map::new(); + params.insert("content".into(), Value::String("private body".into())); + params.insert("source_type".into(), Value::String("mcp:test".into())); + params.insert("title".into(), Value::String("T".into())); + + let summary = summarize_rejected_write_args( + "memory.store", + &json!({ "title": "T", "content": "private body" }), + Some(&params), + ); + + assert_eq!( + summary["param_keys"], + json!(["content", "source_type", "title"]) + ); + assert!(summary.get("content").is_none()); + } + + #[test] + fn write_policy_logs_and_returns_denial() { + let tmp = tempfile::TempDir::new().unwrap(); + let mut config = Config::default(); + config.workspace_dir = 
tmp.path().join("workspace"); + config.autonomy.level = crate::openhuman::security::AutonomyLevel::ReadOnly; + + let err = enforce_write_policy_for_config("memory.store", &config) + .expect_err("read-only mode should deny writes"); + assert!(err.message().contains("read-only mode")); + } + + #[tokio::test] + async fn audit_write_rejection_records_failure_row() { + let tmp = tempfile::TempDir::new().unwrap(); + let mut config = Config::default(); + config.workspace_dir = tmp.path().join("workspace"); + std::fs::create_dir_all(&config.workspace_dir).unwrap(); + + let err = ToolCallError::InvalidParams("bad write request".into()); + audit_write_rejection( + &config, + "memory.store", + &json!({ "title": "T", "content": "private body" }), + None, + "mcp:test", + &err, + ); + + let mut rows = Vec::new(); + for _ in 0..50 { + rows = crate::openhuman::mcp_audit::list_writes( + &config, + &crate::openhuman::mcp_audit::McpWriteListQuery::default(), + ) + .expect("list writes"); + if rows.len() == 1 { + break; + } + tokio::time::sleep(std::time::Duration::from_millis(10)).await; + } + + assert_eq!(rows.len(), 1); + assert!(!rows[0].success); + assert_eq!(rows[0].tool_name, "memory.store"); + assert_eq!(rows[0].client_info, "mcp:test"); + assert_eq!(rows[0].error_message.as_deref(), Some("bad write request")); + assert!(rows[0].args_summary.get("content").is_none()); + } + + #[tokio::test] + async fn call_tool_records_write_argument_rejection() { + let _env_lock = crate::openhuman::config::TEST_ENV_LOCK + .lock() + .unwrap_or_else(|err| err.into_inner()); + let tmp = tempfile::tempdir().expect("tempdir"); + unsafe { + std::env::set_var("OPENHUMAN_WORKSPACE", tmp.path()); + } + let config = config_rpc::load_config_with_timeout() + .await + .expect("config"); + + let err = call_tool("memory.store", json!({ "title": "T" }), "mcp:test") + .await + .expect_err("missing content should reject"); + assert!( + err.message() + .contains("missing required argument `content`"), + "got: 
{}", + err.message() + ); + + let mut rows = Vec::new(); + for _ in 0..50 { + rows = crate::openhuman::mcp_audit::list_writes( + &config, + &crate::openhuman::mcp_audit::McpWriteListQuery::default(), + ) + .expect("list writes"); + if rows.len() == 1 { + break; + } + tokio::time::sleep(std::time::Duration::from_millis(10)).await; + } + + assert_eq!(rows.len(), 1); + assert!(!rows[0].success); + assert_eq!(rows[0].tool_name, "memory.store"); + assert_eq!(rows[0].client_info, "mcp:test"); + assert!(rows[0] + .error_message + .as_deref() + .unwrap_or_default() + .contains("missing required argument `content`")); + assert!(rows[0].args_summary.get("content").is_none()); + + unsafe { + std::env::remove_var("OPENHUMAN_WORKSPACE"); + } + } + #[test] fn extract_document_id_reads_rpc_outcome_envelope() { assert_eq!( From 503097182076267f4975fde28122bf3ca933f5e8 Mon Sep 17 00:00:00 2001 From: MackJack023 <141124084+MackJack023@users.noreply.github.com> Date: Mon, 25 May 2026 13:31:53 +0800 Subject: [PATCH 3/5] fix(mcp): harden audit error message storage --- src/openhuman/mcp_audit/store.rs | 41 ++++++++++++++++++++++++++++--- src/openhuman/mcp_server/tools.rs | 6 ++--- 2 files changed, 40 insertions(+), 7 deletions(-) diff --git a/src/openhuman/mcp_audit/store.rs b/src/openhuman/mcp_audit/store.rs index bd31e1fe36..91db3f3f4f 100644 --- a/src/openhuman/mcp_audit/store.rs +++ b/src/openhuman/mcp_audit/store.rs @@ -3,12 +3,13 @@ use rusqlite::{params, types::Type, Row, ToSql}; use serde_json::Value; use crate::openhuman::config::Config; -use crate::openhuman::memory_tree; +use crate::openhuman::memory_store::chunks::store as chunk_store; use super::types::{McpWriteListQuery, McpWriteRecord, NewMcpWriteRecord}; const DEFAULT_LIST_LIMIT: u64 = 50; const MAX_LIST_LIMIT: u64 = 500; +const ERROR_MESSAGE_MAX_BYTES: usize = 1024; pub fn record_write(config: &Config, record: NewMcpWriteRecord) -> Result { log::debug!( @@ -38,7 +39,8 @@ pub fn record_write(config: &Config, record: 
NewMcpWriteRecord) -> Result { .context("failed to serialize mcp write args_summary"); } }; - let result = memory_tree::store::with_connection(config, |conn| { + let error_message = truncate_error_message(record.error_message.as_deref()); + let result = chunk_store::with_connection(config, |conn| { log::trace!( "[mcp_audit] record_write inserting row tool={} client={} timestamp_ms={}", record.tool_name, @@ -62,7 +64,7 @@ pub fn record_write(config: &Config, record: NewMcpWriteRecord) -> Result { &args_summary, record.resulting_chunk_id.as_deref(), if record.success { 1_i64 } else { 0_i64 }, - record.error_message.as_deref(), + error_message.as_deref(), ], ) { Ok(_) => {} @@ -105,7 +107,7 @@ pub fn list_writes(config: &Config, query: &McpWriteListQuery) -> Result) -> Option<&str> { value.map(str::trim).filter(|value| !value.is_empty()) } +fn truncate_error_message(message: Option<&str>) -> Option { + let message = message?; + if message.len() <= ERROR_MESSAGE_MAX_BYTES { + return Some(message.to_string()); + } + + let mut end = ERROR_MESSAGE_MAX_BYTES; + while end > 0 && !message.is_char_boundary(end) { + end -= 1; + } + Some(message[..end].to_string()) +} + fn row_to_record(row: &Row<'_>) -> rusqlite::Result { let args_summary_text: Option = row.get(4)?; let args_summary = match args_summary_text { @@ -284,6 +299,24 @@ mod tests { assert_eq!(rows[1].resulting_chunk_id.as_deref(), Some("chunk-100")); } + #[test] + fn record_handles_multibyte_error_truncation_safely() { + let tmp = TempDir::new().unwrap(); + let config = test_config(&tmp); + let symbol = "\u{1F980}"; + let symbol_char = symbol.chars().next().unwrap(); + let mut failed = record(100, "mcp:claude-desktop", "memory.store", false); + failed.error_message = Some(symbol.repeat((ERROR_MESSAGE_MAX_BYTES / symbol.len()) + 2)); + + record_write(&config, failed).unwrap(); + + let rows = list_writes(&config, &McpWriteListQuery::default()).unwrap(); + let stored = rows[0].error_message.as_deref().expect("error 
message"); + assert!(stored.len() <= ERROR_MESSAGE_MAX_BYTES); + assert!(stored.is_char_boundary(stored.len())); + assert!(stored.chars().all(|ch| ch == symbol_char)); + } + #[test] fn list_writes_filters_by_client_tool_since_and_success() { let tmp = TempDir::new().unwrap(); diff --git a/src/openhuman/mcp_server/tools.rs b/src/openhuman/mcp_server/tools.rs index c16d417291..043e61ded2 100644 --- a/src/openhuman/mcp_server/tools.rs +++ b/src/openhuman/mcp_server/tools.rs @@ -1191,14 +1191,14 @@ async fn enforce_act_policy(tool_name: &str) -> Result<(), ToolCallError> { async fn load_write_config(tool_name: &str) -> Result { match config_rpc::load_config_with_timeout().await { - Ok(config) => config, + Ok(config) => Ok(config), Err(err) => { log::warn!( "[mcp_server] enforce_write_policy config load failed tool={tool_name} error={err}" ); - return Err(ToolCallError::Internal(format!( + Err(ToolCallError::Internal(format!( "failed to load config: {err}" - ))); + ))) } } } From 52dfff43011a77ddecd4c023a2ebce46d87461b8 Mon Sep 17 00:00:00 2001 From: MackJack023 <141124084+MackJack023@users.noreply.github.com> Date: Mon, 25 May 2026 13:33:59 +0800 Subject: [PATCH 4/5] docs(mcp): clarify audit write attempt records --- src/openhuman/mcp_audit/schemas.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/openhuman/mcp_audit/schemas.rs b/src/openhuman/mcp_audit/schemas.rs index b4fc1f8ce5..c666365075 100644 --- a/src/openhuman/mcp_audit/schemas.rs +++ b/src/openhuman/mcp_audit/schemas.rs @@ -33,7 +33,7 @@ fn schema() -> ControllerSchema { ControllerSchema { namespace: "mcp_audit", function: "list", - description: "List MCP write-tool audit records from local workspace persistence.", + description: "List MCP write-tool audit records, including successful writes and rejected or failed write attempts, from local workspace persistence.", inputs: vec![ FieldSchema { name: "limit", @@ -68,14 +68,14 @@ fn schema() -> ControllerSchema { FieldSchema { name: 
"success_only", ty: TypeSchema::Option(Box::new(TypeSchema::Bool)), - comment: "When true, only return successful writes.", + comment: "When true, only return rows where the write attempt succeeded.", required: false, }, ], outputs: vec![FieldSchema { name: "records", ty: TypeSchema::Array(Box::new(TypeSchema::Ref("McpWriteRecord"))), - comment: "MCP write audit records ordered by timestamp descending.", + comment: "MCP write attempt audit records ordered by timestamp descending.", required: true, }], } From fda912ae263b939789dafc7e20c2af4253d3e4c3 Mon Sep 17 00:00:00 2001 From: Steven Enamakel Date: Sun, 24 May 2026 22:55:47 -0700 Subject: [PATCH 5/5] fix(mcp_audit): wrap handle_list response in {"records": [...]} to match schema The ControllerSchema outputs define a "records" field, but handle_list was returning a bare JSON array. Clients relying on the schema contract would fail when accessing response["records"]. Updated the test assertion to match. Addresses @coderabbitai on schemas.rs:131-137 and schemas.rs:193-197. 
--- src/openhuman/mcp_audit/schemas.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/openhuman/mcp_audit/schemas.rs b/src/openhuman/mcp_audit/schemas.rs index c666365075..392c17473c 100644 --- a/src/openhuman/mcp_audit/schemas.rs +++ b/src/openhuman/mcp_audit/schemas.rs @@ -129,12 +129,12 @@ fn handle_list(params: Map) -> ControllerFuture { }; let count = records.len(); - let value = serde_json::to_value(records).map_err(|err| { + let records_value = serde_json::to_value(records).map_err(|err| { log::warn!("[mcp_audit] handle_list serialize response failed error={err}"); err.to_string() })?; log::debug!("[mcp_audit] handle_list exit records={count}"); - Ok(value) + Ok(serde_json::json!({ "records": records_value })) }) } @@ -191,7 +191,7 @@ mod tests { .expect("record write"); let value = handle_list(Map::new()).await.expect("handle list"); - let records = value.as_array().expect("records array"); + let records = value["records"].as_array().expect("records array"); assert_eq!(records.len(), 1); assert_eq!(records[0]["tool_name"], "memory.store"); assert_eq!(records[0]["client_info"], "mcp:test");