diff --git a/src/core/all.rs b/src/core/all.rs index 40ab60fa2c..c43faa68e7 100644 --- a/src/core/all.rs +++ b/src/core/all.rs @@ -268,6 +268,9 @@ fn build_internal_only_controllers() -> Vec { // whatsapp_data ingest: scanner-side write path. Callable over RPC by the // Tauri scanner but excluded from agent-facing schema discovery. controllers.extend(crate::openhuman::whatsapp_data::all_whatsapp_data_internal_controllers()); + // MCP write audit list: internal-only so the desktop UI/CLI can inspect + // local write history without exposing cross-client history as an MCP tool. + controllers.extend(crate::openhuman::mcp_audit::all_mcp_audit_internal_controllers()); controllers } diff --git a/src/core/all_tests.rs b/src/core/all_tests.rs index 140a037e43..3da14fb623 100644 --- a/src/core/all_tests.rs +++ b/src/core/all_tests.rs @@ -205,6 +205,26 @@ fn schema_for_rpc_method_finds_security_policy_info() { assert_eq!(s.function, "policy_info"); } +#[test] +fn schema_for_rpc_method_finds_internal_mcp_audit_list() { + let schema = schema_for_rpc_method("openhuman.mcp_audit_list"); + assert!( + schema.is_some(), + "mcp_audit.list should be internally routable" + ); + let s = schema.unwrap(); + assert_eq!(s.namespace, "mcp_audit"); + assert_eq!(s.function, "list"); +} + +#[test] +fn rpc_method_from_parts_does_not_expose_internal_mcp_audit_list() { + assert!( + rpc_method_from_parts("mcp_audit", "list").is_none(), + "internal MCP audit RPC must not appear in the public controller registry" + ); +} + #[test] fn schema_for_rpc_method_returns_none_for_unknown() { assert!(schema_for_rpc_method("openhuman.nonexistent_method_xyz").is_none()); diff --git a/src/openhuman/mcp_audit/mod.rs b/src/openhuman/mcp_audit/mod.rs new file mode 100644 index 0000000000..648d618752 --- /dev/null +++ b/src/openhuman/mcp_audit/mod.rs @@ -0,0 +1,17 @@ +//! Persistent audit log for MCP write-tool calls. +//! +//! The audit table is stored in the existing memory-tree SQLite database so +//! writes and their query surface reuse the same local workspace persistence. + +mod schemas; +pub mod store; +pub mod types; + +pub use schemas::{ + all_controller_schemas as all_mcp_audit_controller_schemas, + all_internal_controllers as all_mcp_audit_internal_controllers, + all_registered_controllers as all_mcp_audit_registered_controllers, + schemas as mcp_audit_schemas, +}; +pub use store::{list_writes, record_write}; +pub use types::{McpWriteListQuery, McpWriteRecord, NewMcpWriteRecord}; diff --git a/src/openhuman/mcp_audit/schemas.rs b/src/openhuman/mcp_audit/schemas.rs new file mode 100644 index 0000000000..392c17473c --- /dev/null +++ b/src/openhuman/mcp_audit/schemas.rs @@ -0,0 +1,203 @@ +use serde_json::{Map, Value}; + +use crate::core::all::{ControllerFuture, RegisteredController}; +use crate::core::{ControllerSchema, FieldSchema, TypeSchema}; +use crate::openhuman::config::rpc as config_rpc; + +use super::store; +use super::types::McpWriteListQuery; + +pub fn schemas(function: &str) -> ControllerSchema { + match function { + "list" => schema(), + other => panic!("unknown mcp_audit controller schema `{other}`"), + } +} + +pub fn all_controller_schemas() -> Vec { + vec![schemas("list")] +} + +pub fn all_registered_controllers() -> Vec { + all_internal_controllers() +} + +pub fn all_internal_controllers() -> Vec { + vec![RegisteredController { + schema: schemas("list"), + handler: handle_list, + }] +} + +fn schema() -> ControllerSchema { + ControllerSchema { + namespace: "mcp_audit", + function: "list", + description: "List MCP write-tool audit records, including successful writes and rejected or failed write attempts, from local workspace persistence.", + inputs: vec![ + FieldSchema { + name: "limit", + ty: TypeSchema::Option(Box::new(TypeSchema::U64)), + comment: "Maximum number of rows to return (default 50, max 500).", + required: false, + }, + FieldSchema { + name: "offset", + ty: TypeSchema::Option(Box::new(TypeSchema::U64)), + comment: "Number of rows to skip from the newest-first result set.", + required: false, + }, + FieldSchema { + name: "since_ms", + ty: TypeSchema::Option(Box::new(TypeSchema::U64)), + comment: "Only return rows at or after this Unix timestamp in milliseconds.", + required: false, + }, + FieldSchema { + name: "client_filter", + ty: TypeSchema::Option(Box::new(TypeSchema::String)), + comment: "Exact client_info filter, for example `mcp:claude-desktop`.", + required: false, + }, + FieldSchema { + name: "tool_filter", + ty: TypeSchema::Option(Box::new(TypeSchema::String)), + comment: "Exact tool_name filter, for example `memory.store`.", + required: false, + }, + FieldSchema { + name: "success_only", + ty: TypeSchema::Option(Box::new(TypeSchema::Bool)), + comment: "When true, only return rows where the write attempt succeeded.", + required: false, + }, + ], + outputs: vec![FieldSchema { + name: "records", + ty: TypeSchema::Array(Box::new(TypeSchema::Ref("McpWriteRecord"))), + comment: "MCP write attempt audit records ordered by timestamp descending.", + required: true, + }], + } +} + +fn handle_list(params: Map) -> ControllerFuture { + Box::pin(async move { + log::debug!("[mcp_audit] handle_list enter params={params:?}"); + log::trace!("[mcp_audit] handle_list loading config"); + let config = match config_rpc::load_config_with_timeout().await { + Ok(config) => { + log::trace!( + "[mcp_audit] handle_list config loaded workspace={}", + config.workspace_dir.display() + ); + config + } + Err(err) => { + log::warn!("[mcp_audit] handle_list config load failed error={err}"); + return Err(err); + } + }; + + let query = match serde_json::from_value::(Value::Object(params)) { + Ok(query) => { + log::trace!("[mcp_audit] handle_list parsed query={query:?}"); + query + } + Err(err) => { + log::warn!("[mcp_audit] handle_list invalid params error={err}"); + return Err(format!("invalid params: {err}")); + } + }; + + log::trace!( + "[mcp_audit] handle_list querying store workspace={} query={query:?}", + config.workspace_dir.display() + ); + let records = match store::list_writes(&config, &query) { + Ok(records) => { + log::trace!( + "[mcp_audit] handle_list store success records={}", + records.len() + ); + records + } + Err(err) => { + log::warn!("[mcp_audit] handle_list store failed query={query:?} error={err}"); + return Err(err.to_string()); + } + }; + + let count = records.len(); + let records_value = serde_json::to_value(records).map_err(|err| { + log::warn!("[mcp_audit] handle_list serialize response failed error={err}"); + err.to_string() + })?; + log::debug!("[mcp_audit] handle_list exit records={count}"); + Ok(serde_json::json!({ "records": records_value })) + }) +} + +#[cfg(test)] +mod tests { + use super::*; + use serde_json::json; + + #[test] + fn internal_controller_registers_expected_rpc_name() { + let controllers = all_internal_controllers(); + assert_eq!(controllers.len(), 1); + assert_eq!(controllers[0].schema.namespace, "mcp_audit"); + assert_eq!(controllers[0].schema.function, "list"); + assert_eq!(controllers[0].rpc_method_name(), "openhuman.mcp_audit_list"); + } + + #[test] + fn domain_schema_exports_match_internal_controller() { + let schemas = all_controller_schemas(); + let controllers = all_registered_controllers(); + + assert_eq!(schemas.len(), 1); + assert_eq!(schemas[0].namespace, "mcp_audit"); + assert_eq!(controllers.len(), 1); + assert_eq!(controllers[0].schema.function, schemas[0].function); + } + + #[tokio::test] + async fn handle_list_returns_persisted_audit_records() { + let _env_lock = crate::openhuman::config::TEST_ENV_LOCK + .lock() + .unwrap_or_else(|err| err.into_inner()); + let tmp = tempfile::tempdir().expect("tempdir"); + unsafe { + std::env::set_var("OPENHUMAN_WORKSPACE", tmp.path()); + } + + let config = config_rpc::load_config_with_timeout() + .await + .expect("config"); + store::record_write( + &config, + crate::openhuman::mcp_audit::NewMcpWriteRecord { + timestamp_ms: 10, + client_info: "mcp:test".into(), + tool_name: "memory.store".into(), + args_summary: json!({ "title": "safe" }), + resulting_chunk_id: Some("chunk-1".into()), + success: true, + error_message: None, + }, + ) + .expect("record write"); + + let value = handle_list(Map::new()).await.expect("handle list"); + let records = value["records"].as_array().expect("records array"); + assert_eq!(records.len(), 1); + assert_eq!(records[0]["tool_name"], "memory.store"); + assert_eq!(records[0]["client_info"], "mcp:test"); + + unsafe { + std::env::remove_var("OPENHUMAN_WORKSPACE"); + } + } +} diff --git a/src/openhuman/mcp_audit/store.rs b/src/openhuman/mcp_audit/store.rs new file mode 100644 index 0000000000..91db3f3f4f --- /dev/null +++ b/src/openhuman/mcp_audit/store.rs @@ -0,0 +1,428 @@ +use anyhow::{Context, Result}; +use rusqlite::{params, types::Type, Row, ToSql}; +use serde_json::Value; + +use crate::openhuman::config::Config; +use crate::openhuman::memory_store::chunks::store as chunk_store; + +use super::types::{McpWriteListQuery, McpWriteRecord, NewMcpWriteRecord}; + +const DEFAULT_LIST_LIMIT: u64 = 50; +const MAX_LIST_LIMIT: u64 = 500; +const ERROR_MESSAGE_MAX_BYTES: usize = 1024; + +pub fn record_write(config: &Config, record: NewMcpWriteRecord) -> Result { + log::debug!( + "[mcp_audit] record_write enter tool={} client={} timestamp_ms={} success={} has_error={}", + record.tool_name, + record.client_info, + record.timestamp_ms, + record.success, + record.error_message.is_some() + ); + let args_summary = match serde_json::to_string(&record.args_summary) { + Ok(args_summary) => { + log::trace!( + "[mcp_audit] record_write args_summary serialized tool={} bytes={}", + record.tool_name, + args_summary.len() + ); + args_summary + } + Err(err) => { + log::warn!( + "[mcp_audit] record_write args_summary serialize failed tool={} client={} error={err}", + record.tool_name, + record.client_info + ); + return Err(anyhow::Error::new(err)) + .context("failed to serialize mcp write args_summary"); + } + }; + let error_message = truncate_error_message(record.error_message.as_deref()); + let result = chunk_store::with_connection(config, |conn| { + log::trace!( + "[mcp_audit] record_write inserting row tool={} client={} timestamp_ms={}", + record.tool_name, + record.client_info, + record.timestamp_ms + ); + match conn.execute( + "INSERT INTO mcp_writes ( + timestamp_ms, + client_info, + tool_name, + args_summary, + resulting_chunk_id, + success, + error_message + ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)", + params![ + record.timestamp_ms, + &record.client_info, + &record.tool_name, + &args_summary, + record.resulting_chunk_id.as_deref(), + if record.success { 1_i64 } else { 0_i64 }, + error_message.as_deref(), + ], + ) { + Ok(_) => {} + Err(err) => { + log::warn!( + "[mcp_audit] record_write insert failed tool={} client={} timestamp_ms={} error={err}", + record.tool_name, + record.client_info, + record.timestamp_ms + ); + return Err(anyhow::Error::new(err)) + .context("failed to insert mcp_writes audit row"); + } + } + Ok(conn.last_insert_rowid()) + }); + match &result { + Ok(row_id) => log::debug!( + "[mcp_audit] record_write exit row_id={} tool={} client={}", + row_id, + record.tool_name, + record.client_info + ), + Err(err) => log::warn!( + "[mcp_audit] record_write failed tool={} client={} error={err}", + record.tool_name, + record.client_info + ), + } + result +} + +pub fn list_writes(config: &Config, query: &McpWriteListQuery) -> Result> { + log::debug!( + "[mcp_audit] list_writes enter since_ms={:?} limit={:?} offset={:?} client_filter={:?} tool_filter={:?} success_only={:?}", + query.since_ms, + query.limit, + query.offset, + query.client_filter, + query.tool_filter, + query.success_only + ); + let result = chunk_store::with_connection(config, |conn| { + let mut sql = String::from( + "SELECT + id, + timestamp_ms, + client_info, + tool_name, + args_summary, + resulting_chunk_id, + success, + error_message + FROM mcp_writes + WHERE 1=1", + ); + let mut bound: Vec> = Vec::new(); + + if let Some(since_ms) = query.since_ms { + sql.push_str(" AND timestamp_ms >= ?"); + bound.push(Box::new(u64_to_i64(since_ms, "since_ms")?)); + log::trace!("[mcp_audit] list_writes applied since_ms filter={since_ms}"); + } + if let Some(client) = normalized_filter(query.client_filter.as_deref()) { + sql.push_str(" AND client_info = ?"); + bound.push(Box::new(client.to_string())); + log::trace!("[mcp_audit] list_writes applied client filter={client}"); + } + if let Some(tool) = normalized_filter(query.tool_filter.as_deref()) { + sql.push_str(" AND tool_name = ?"); + bound.push(Box::new(tool.to_string())); + log::trace!("[mcp_audit] list_writes applied tool filter={tool}"); + } + if query.success_only.unwrap_or(false) { + sql.push_str(" AND success = 1"); + log::trace!("[mcp_audit] list_writes applied success_only filter"); + } + + sql.push_str(" ORDER BY timestamp_ms DESC, id DESC LIMIT ? OFFSET ?"); + let limit = normalized_limit(query.limit)?; + let offset = normalized_offset(query.offset)?; + bound.push(Box::new(limit)); + bound.push(Box::new(offset)); + log::trace!( + "[mcp_audit] list_writes sql={} bound_count={} limit={} offset={}", + sql, + bound.len(), + limit, + offset + ); + + let refs = bound + .iter() + .map(|value| value.as_ref() as &dyn ToSql) + .collect::>(); + log::trace!("[mcp_audit] list_writes preparing query"); + let mut stmt = match conn.prepare(&sql) { + Ok(stmt) => stmt, + Err(err) => { + log::warn!("[mcp_audit] list_writes prepare failed error={err}"); + return Err(anyhow::Error::new(err)) + .context("failed to prepare mcp_writes list query"); + } + }; + log::trace!("[mcp_audit] list_writes executing query"); + let mapped = match stmt.query_map(refs.as_slice(), row_to_record) { + Ok(mapped) => mapped, + Err(err) => { + log::warn!("[mcp_audit] list_writes query failed error={err}"); + return Err(anyhow::Error::new(err)).context("failed to query mcp_writes"); + } + }; + let rows = match mapped.collect::>>() { + Ok(rows) => rows, + Err(err) => { + log::warn!("[mcp_audit] list_writes collect failed error={err}"); + return Err(anyhow::Error::new(err)).context("failed to collect mcp_writes rows"); + } + }; + log::debug!("[mcp_audit] list_writes exit rows={}", rows.len()); + Ok(rows) + }); + if let Err(err) = &result { + log::warn!("[mcp_audit] list_writes failed error={err}"); + } + result +} + +fn normalized_limit(limit: Option) -> Result { + u64_to_i64( + limit.unwrap_or(DEFAULT_LIST_LIMIT).min(MAX_LIST_LIMIT), + "limit", + ) +} + +fn normalized_offset(offset: Option) -> Result { + u64_to_i64(offset.unwrap_or(0), "offset") +} + +fn u64_to_i64(value: u64, field: &str) -> Result { + i64::try_from(value).with_context(|| format!("{field} is too large for SQLite INTEGER")) +} + +fn normalized_filter(value: Option<&str>) -> Option<&str> { + value.map(str::trim).filter(|value| !value.is_empty()) +} + +fn truncate_error_message(message: Option<&str>) -> Option { + let message = message?; + if message.len() <= ERROR_MESSAGE_MAX_BYTES { + return Some(message.to_string()); + } + + let mut end = ERROR_MESSAGE_MAX_BYTES; + while end > 0 && !message.is_char_boundary(end) { + end -= 1; + } + Some(message[..end].to_string()) +} + +fn row_to_record(row: &Row<'_>) -> rusqlite::Result { + let args_summary_text: Option = row.get(4)?; + let args_summary = match args_summary_text { + Some(text) => serde_json::from_str::(&text).map_err(|err| { + rusqlite::Error::FromSqlConversionFailure(4, Type::Text, Box::new(err)) + })?, + None => Value::Null, + }; + let success: i64 = row.get(6)?; + Ok(McpWriteRecord { + id: row.get(0)?, + timestamp_ms: row.get(1)?, + client_info: row.get(2)?, + tool_name: row.get(3)?, + args_summary, + resulting_chunk_id: row.get(5)?, + success: success != 0, + error_message: row.get(7)?, + }) +} + +#[cfg(test)] +mod tests { + use super::*; + use serde_json::json; + use tempfile::TempDir; + + fn test_config(tmp: &TempDir) -> Config { + let mut config = Config::default(); + config.workspace_dir = tmp.path().join("workspace"); + std::fs::create_dir_all(&config.workspace_dir).unwrap(); + config + } + + fn record( + timestamp_ms: i64, + client_info: &str, + tool_name: &str, + success: bool, + ) -> NewMcpWriteRecord { + NewMcpWriteRecord { + timestamp_ms, + client_info: client_info.to_string(), + tool_name: tool_name.to_string(), + args_summary: json!({ "title": format!("record-{timestamp_ms}") }), + resulting_chunk_id: success.then(|| format!("chunk-{timestamp_ms}")), + success, + error_message: (!success).then(|| "write failed".to_string()), + } + } + + #[test] + fn record_write_inserts_success_and_failure_rows() { + let tmp = TempDir::new().unwrap(); + let config = test_config(&tmp); + + let success_id = record_write( + &config, + record(100, "mcp:claude-desktop", "memory.store", true), + ) + .unwrap(); + let failure_id = + record_write(&config, record(200, "mcp:cursor", "tree.tag", false)).unwrap(); + + let rows = list_writes(&config, &McpWriteListQuery::default()).unwrap(); + assert_eq!(rows.len(), 2); + assert_eq!(rows[0].id, failure_id); + assert!(!rows[0].success); + assert_eq!(rows[0].error_message.as_deref(), Some("write failed")); + assert_eq!(rows[1].id, success_id); + assert!(rows[1].success); + assert_eq!(rows[1].resulting_chunk_id.as_deref(), Some("chunk-100")); + } + + #[test] + fn record_handles_multibyte_error_truncation_safely() { + let tmp = TempDir::new().unwrap(); + let config = test_config(&tmp); + let symbol = "\u{1F980}"; + let symbol_char = symbol.chars().next().unwrap(); + let mut failed = record(100, "mcp:claude-desktop", "memory.store", false); + failed.error_message = Some(symbol.repeat((ERROR_MESSAGE_MAX_BYTES / symbol.len()) + 2)); + + record_write(&config, failed).unwrap(); + + let rows = list_writes(&config, &McpWriteListQuery::default()).unwrap(); + let stored = rows[0].error_message.as_deref().expect("error message"); + assert!(stored.len() <= ERROR_MESSAGE_MAX_BYTES); + assert!(stored.is_char_boundary(stored.len())); + assert!(stored.chars().all(|ch| ch == symbol_char)); + } + + #[test] + fn list_writes_filters_by_client_tool_since_and_success() { + let tmp = TempDir::new().unwrap(); + let config = test_config(&tmp); + for row in [ + record(100, "mcp:claude-desktop", "memory.store", true), + record(200, "mcp:cursor", "memory.note", false), + record(300, "mcp:claude-desktop", "tree.tag", true), + record(400, "mcp:cursor", "tree.tag", true), + ] { + record_write(&config, row).unwrap(); + } + + let by_client = list_writes( + &config, + &McpWriteListQuery { + client_filter: Some("mcp:claude-desktop".into()), + ..Default::default() + }, + ) + .unwrap(); + assert_eq!(by_client.len(), 2); + assert!(by_client + .iter() + .all(|row| row.client_info == "mcp:claude-desktop")); + + let by_tool = list_writes( + &config, + &McpWriteListQuery { + tool_filter: Some("tree.tag".into()), + ..Default::default() + }, + ) + .unwrap(); + assert_eq!( + by_tool + .iter() + .map(|row| row.timestamp_ms) + .collect::>(), + vec![400, 300] + ); + + let since = list_writes( + &config, + &McpWriteListQuery { + since_ms: Some(250), + ..Default::default() + }, + ) + .unwrap(); + assert_eq!( + since.iter().map(|row| row.timestamp_ms).collect::>(), + vec![400, 300] + ); + + let success_only = list_writes( + &config, + &McpWriteListQuery { + success_only: Some(true), + ..Default::default() + }, + ) + .unwrap(); + assert_eq!(success_only.len(), 3); + assert!(success_only.iter().all(|row| row.success)); + } + + #[test] + fn list_writes_orders_newest_first_and_supports_limit_offset() { + let tmp = TempDir::new().unwrap(); + let config = test_config(&tmp); + for ts in [100, 200, 300, 400] { + record_write(&config, record(ts, "mcp", "memory.store", true)).unwrap(); + } + + let rows = list_writes( + &config, + &McpWriteListQuery { + limit: Some(2), + offset: Some(1), + ..Default::default() + }, + ) + .unwrap(); + assert_eq!( + rows.iter().map(|row| row.timestamp_ms).collect::>(), + vec![300, 200] + ); + } + + #[test] + fn list_writes_caps_limit_at_max() { + let tmp = TempDir::new().unwrap(); + let config = test_config(&tmp); + for ts in 0..505 { + record_write(&config, record(ts, "mcp", "memory.store", true)).unwrap(); + } + + let rows = list_writes( + &config, + &McpWriteListQuery { + limit: Some(MAX_LIST_LIMIT + 100), + ..Default::default() + }, + ) + .unwrap(); + assert_eq!(rows.len(), MAX_LIST_LIMIT as usize); + } +} diff --git a/src/openhuman/mcp_audit/types.rs b/src/openhuman/mcp_audit/types.rs new file mode 100644 index 0000000000..94ff14d6c0 --- /dev/null +++ b/src/openhuman/mcp_audit/types.rs @@ -0,0 +1,41 @@ +use serde::{Deserialize, Serialize}; +use serde_json::Value; + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct NewMcpWriteRecord { + pub timestamp_ms: i64, + pub client_info: String, + pub tool_name: String, + pub args_summary: Value, + pub resulting_chunk_id: Option, + pub success: bool, + pub error_message: Option, +} + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct McpWriteRecord { + pub id: i64, + pub timestamp_ms: i64, + pub client_info: String, + pub tool_name: String, + pub args_summary: Value, + pub resulting_chunk_id: Option, + pub success: bool, + pub error_message: Option, +} + +#[derive(Debug, Clone, Default, PartialEq, Eq, Deserialize)] +pub struct McpWriteListQuery { + #[serde(default)] + pub limit: Option, + #[serde(default)] + pub offset: Option, + #[serde(default)] + pub since_ms: Option, + #[serde(default)] + pub client_filter: Option, + #[serde(default)] + pub tool_filter: Option, + #[serde(default)] + pub success_only: Option, +} diff --git a/src/openhuman/mcp_server/protocol.rs b/src/openhuman/mcp_server/protocol.rs index 14158c85d7..d87393e436 100644 --- a/src/openhuman/mcp_server/protocol.rs +++ b/src/openhuman/mcp_server/protocol.rs @@ -174,7 +174,7 @@ async fn handle_request(id: Value, method: &str, params: Value, session: &mut Mc session.source_type(), object_keys(&arguments) ); - match tools::call_tool(&name, arguments).await { + match tools::call_tool(&name, arguments, session.source_type()).await { Ok(result) => { log::debug!( "[mcp_server] tools/call response id={} tool={} client_source_type={} is_error={}", diff --git a/src/openhuman/mcp_server/tools.rs b/src/openhuman/mcp_server/tools.rs index ee417d27df..043e61ded2 100644 --- a/src/openhuman/mcp_server/tools.rs +++ b/src/openhuman/mcp_server/tools.rs @@ -4,8 +4,10 @@ use crate::core::all; use crate::openhuman::agent::harness::AgentDefinitionRegistry; use crate::openhuman::agent::Agent; use crate::openhuman::config::rpc as config_rpc; +use crate::openhuman::config::Config; use crate::openhuman::inference::provider::traits::build_tool_instructions_text; use crate::openhuman::integrations::searxng::MAX_RESULTS as SEARXNG_MAX_RESULTS; +use crate::openhuman::mcp_audit::{self, NewMcpWriteRecord}; use crate::openhuman::security::{SecurityPolicy, ToolOperation}; const DEFAULT_LIMIT: u64 = 10; @@ -543,13 +545,31 @@ fn list_tools_result_from_specs(specs: Vec) -> Value { json!({ "tools": tools }) } -pub async fn call_tool(name: &str, arguments: Value) -> Result { +pub async fn call_tool( + name: &str, + arguments: Value, + client_info: &str, +) -> Result { let spec = tool_specs() .into_iter() .find(|tool| tool.name == name) .ok_or_else(|| ToolCallError::InvalidParams(format!("unknown MCP tool `{name}`")))?; - let params = build_rpc_params(spec.name, arguments)?; + let audit_arguments = arguments.clone(); + let mut params = match build_rpc_params(spec.name, arguments) { + Ok(params) => params, + Err(err) => { + if is_write_tool(spec.name) { + audit_write_rejection_without_config( + spec.name, + &audit_arguments, + client_info, + err.message(), + ); + } + return Err(err); + } + }; match spec.name { "core.list_tools" => { reject_unexpected_arguments(¶ms, &[])?; @@ -571,9 +591,35 @@ pub async fn call_tool(name: &str, arguments: Value) -> Result { - enforce_write_policy(spec.name).await?; - validate_controller_params(&spec, ¶ms)?; - return dispatch_write_tool(spec.name, ¶ms).await; + let config = load_write_config(spec.name).await?; + if let Err(err) = enforce_write_policy_for_config(spec.name, &config) { + audit_write_rejection( + &config, + spec.name, + &audit_arguments, + Some(¶ms), + client_info, + &err, + ); + return Err(err); + } + params.insert( + "source_type".to_string(), + Value::String(client_info.to_string()), + ); + if let Err(err) = validate_controller_params(&spec, ¶ms) { + audit_write_rejection( + &config, + spec.name, + &audit_arguments, + Some(¶ms), + client_info, + &err, + ); + return Err(err); + } + return dispatch_write_tool(spec.name, ¶ms, &audit_arguments, client_info, &config) + .await; } _ => {} } @@ -1143,25 +1189,33 @@ async fn enforce_act_policy(tool_name: &str) -> Result<(), ToolCallError> { .map_err(ToolCallError::InvalidParams) } -/// Write operations use the same gate as Act — they are side-effecting and -/// must not run in read-only mode. The separate function gives us a distinct -/// log line so auditors can tell reads from writes at a glance. -async fn enforce_write_policy(tool_name: &str) -> Result<(), ToolCallError> { - let config = match config_rpc::load_config_with_timeout().await { - Ok(config) => config, +async fn load_write_config(tool_name: &str) -> Result { + match config_rpc::load_config_with_timeout().await { + Ok(config) => Ok(config), Err(err) => { log::warn!( "[mcp_server] enforce_write_policy config load failed tool={tool_name} error={err}" ); - return Err(ToolCallError::Internal(format!( + Err(ToolCallError::Internal(format!( "failed to load config: {err}" - ))); + ))) } - }; + } +} + +fn enforce_write_policy_for_config(tool_name: &str, config: &Config) -> Result<(), ToolCallError> { let policy = SecurityPolicy::from_config(&config.autonomy, &config.workspace_dir); - policy - .enforce_tool_operation(ToolOperation::Act, tool_name) - .map_err(ToolCallError::InvalidParams) + match policy.enforce_tool_operation(ToolOperation::Act, tool_name) { + Ok(()) => Ok(()), + Err(message) => { + log::debug!( + "[mcp_server] enforce_write_policy denied tool={} decision={}", + tool_name, + message + ); + Err(ToolCallError::InvalidParams(message)) + } + } } /// Dispatch a write tool to its underlying RPC method with provenance and @@ -1169,31 +1223,62 @@ async fn enforce_write_policy(tool_name: &str) -> Result<(), ToolCallError> { async fn dispatch_write_tool( tool_name: &str, params: &Map, + audit_arguments: &Value, + client_info: &str, + config: &Config, ) -> Result { let rpc_method = "openhuman.memory_doc_put"; - tracing::info!( + tracing::debug!( tool = tool_name, rpc_method = rpc_method, - client = "mcp", + client = client_info, "[mcp_server] write dispatch" ); + tracing::trace!( + tool = tool_name, + rpc_method = rpc_method, + param_keys = ?params.keys().collect::>(), + "[mcp_server] write dispatch invoking rpc" + ); + match all::try_invoke_registered_rpc(rpc_method, params.clone()).await { Some(Ok(value)) => { - let document_id = value - .get("document_id") - .and_then(Value::as_str) - .unwrap_or(""); - tracing::info!( + let document_id = extract_document_id(&value); + audit_write( + config, + NewMcpWriteRecord { + timestamp_ms: now_ms(), + client_info: client_info.to_string(), + tool_name: tool_name.to_string(), + args_summary: summarize_write_args(tool_name, audit_arguments), + resulting_chunk_id: document_id.clone(), + success: true, + error_message: None, + }, + ); + tracing::debug!( tool = tool_name, - chunk_id = document_id, - client = "mcp", + chunk_id = document_id.as_deref().unwrap_or(""), + client = client_info, "[mcp_server] write success" ); Ok(tool_success(value)) } Some(Err(message)) => { + audit_write( + config, + NewMcpWriteRecord { + timestamp_ms: now_ms(), + client_info: client_info.to_string(), + tool_name: tool_name.to_string(), + args_summary: summarize_write_args(tool_name, audit_arguments), + resulting_chunk_id: None, + success: false, + error_message: Some(message.clone()), + }, + ); log::warn!( "[mcp_server] write handler error tool={} error={}", tool_name, @@ -1202,19 +1287,216 @@ async fn dispatch_write_tool( Ok(tool_error(format!("{} failed: {message}", tool_name))) } None => { + let message = format!("mapped RPC method `{rpc_method}` is not registered"); + audit_write( + config, + NewMcpWriteRecord { + timestamp_ms: now_ms(), + client_info: client_info.to_string(), + tool_name: tool_name.to_string(), + args_summary: summarize_write_args(tool_name, audit_arguments), + resulting_chunk_id: None, + success: false, + error_message: Some(message.clone()), + }, + ); log::error!( "[mcp_server] write mapping missing registered RPC method tool={} rpc_method={}", tool_name, rpc_method ); - Ok(tool_error(format!( - "{} is unavailable: mapped RPC method `{}` is not registered", - tool_name, rpc_method - ))) + Ok(tool_error(format!("{tool_name} is unavailable: {message}"))) + } + } +} + +fn audit_write(config: &Config, record: NewMcpWriteRecord) { + let config = config.clone(); + if let Ok(handle) = tokio::runtime::Handle::try_current() { + let _ = handle.spawn_blocking(move || { + if let Err(err) = mcp_audit::record_write(&config, record) { + log::warn!("[mcp_server] mcp write audit insert failed: {err}"); + } + }); + } else { + let _ = std::thread::spawn(move || { + if let Err(err) = mcp_audit::record_write(&config, record) { + log::warn!("[mcp_server] mcp write audit insert failed: {err}"); + } + }); + } +} + +fn audit_write_rejection( + config: &Config, + tool_name: &str, + audit_arguments: &Value, + params: Option<&Map>, + client_info: &str, + err: &ToolCallError, +) { + log::debug!( + "[mcp_server] write rejected before dispatch tool={} client={} error={}", + tool_name, + client_info, + err.message() + ); + audit_write( + config, + NewMcpWriteRecord { + timestamp_ms: now_ms(), + client_info: client_info.to_string(), + tool_name: tool_name.to_string(), + args_summary: summarize_rejected_write_args(tool_name, audit_arguments, params), + resulting_chunk_id: None, + success: false, + error_message: Some(err.message().to_string()), + }, + ); +} + +fn audit_write_rejection_without_config( + tool_name: &str, + audit_arguments: &Value, + client_info: &str, + error_message: &str, +) { + log::debug!( + "[mcp_server] write rejected before config load tool={} client={} error={}", + tool_name, + client_info, + error_message + ); + + let tool_name = tool_name.to_string(); + let client_info = client_info.to_string(); + let error_message = error_message.to_string(); + let args_summary = summarize_write_args(&tool_name, audit_arguments); + match tokio::runtime::Handle::try_current() { + Ok(handle) => { + let _ = handle.spawn(async move { + match config_rpc::load_config_with_timeout().await { + Ok(config) => audit_write( + &config, + NewMcpWriteRecord { + timestamp_ms: now_ms(), + client_info, + tool_name, + args_summary, + resulting_chunk_id: None, + success: false, + error_message: Some(error_message), + }, + ), + Err(err) => log::warn!( + "[mcp_server] write rejection audit skipped tool={} config load failed error={}", + tool_name, + err + ), + } + }); } + Err(err) => log::warn!( + "[mcp_server] write rejection audit skipped tool={} runtime unavailable error={}", + tool_name, + err + ), + } +} + +fn is_write_tool(tool_name: &str) -> bool { + matches!(tool_name, "memory.store" | "memory.note" | "tree.tag") +} + +fn summarize_rejected_write_args( + tool_name: &str, + audit_arguments: &Value, + params: Option<&Map>, +) -> Value { + let mut summary = summarize_write_args(tool_name, audit_arguments); + if let (Value::Object(summary), Some(params)) = (&mut summary, params) { + let mut param_keys = params.keys().cloned().collect::>(); + param_keys.sort(); + summary.insert( + "param_keys".to_string(), + Value::Array(param_keys.into_iter().map(Value::String).collect()), + ); + } + summary +} + +fn now_ms() -> i64 { + chrono::Utc::now().timestamp_millis() +} + +fn extract_document_id(value: &Value) -> Option { + value + .get("document_id") + .or_else(|| { + value + .get("result") + .and_then(|result| result.get("document_id")) + }) + .and_then(Value::as_str) + .map(str::to_string) +} + +fn summarize_write_args(tool_name: &str, arguments: &Value) -> Value { + let Some(args) = arguments.as_object() else { + return json!({}); + }; + match tool_name { + "memory.store" => json!({ + "title": args + .get("title") + .and_then(Value::as_str) + .map(|title| first_chars(title, 128)) + .unwrap_or_default(), + "namespace": args + .get("namespace") + .and_then(Value::as_str) + .unwrap_or("mcp"), + "tag_count": args + .get("tags") + .and_then(Value::as_array) + .map(|tags| tags.len()) + .unwrap_or(0), + }), + "memory.note" => json!({ + "chunk_id": args + .get("chunk_id") + .and_then(Value::as_str) + .unwrap_or_default(), + "note_text_length": args + .get("note_text") + .and_then(Value::as_str) + .map(|note| note.chars().count()) + .unwrap_or(0), + }), + "tree.tag" => json!({ + "chunk_id": args + .get("chunk_id") + .and_then(Value::as_str) + .unwrap_or_default(), + "tags": args + .get("tags") + .and_then(Value::as_array) + .map(|tags| { + tags.iter() + .filter_map(Value::as_str) + .map(str::to_string) + .collect::>() + }) + .unwrap_or_default(), + }), + _ => json!({}), } } +fn first_chars(value: &str, max_chars: usize) -> String { + value.chars().take(max_chars).collect() +} + async fn load_config_and_init_registry() -> Result { let config = config_rpc::load_config_with_timeout() @@ -2245,6 +2527,184 @@ mod tests { ); } + // ── MCP write audit summary ──────────────────────────────────────── + + #[test] + fn summarize_write_args_omits_memory_store_content() { + let summary = summarize_write_args( + "memory.store", + &json!({ + "title": "A".repeat(140), + "content": "private body", + "namespace": "work", + "tags": ["project", "planning"] + }), + ); + assert_eq!(summary["title"].as_str().unwrap().chars().count(), 128); + assert_eq!(summary["namespace"], "work"); + assert_eq!(summary["tag_count"], 2); + assert!(summary.get("content").is_none()); + } + + #[test] + fn summarize_write_args_omits_memory_note_text() { + let summary = summarize_write_args( + "memory.note", + &json!({ "chunk_id": "chunk-42", "note_text": "Important context" }), + ); + assert_eq!(summary["chunk_id"], "chunk-42"); + assert_eq!( + summary["note_text_length"].as_u64(), + Some("Important context".chars().count() as u64) + ); + assert!(summary.get("note_text").is_none()); + } + + #[test] + fn summarize_write_args_keeps_tree_tag_labels() { + let summary = summarize_write_args( + "tree.tag", + &json!({ "chunk_id": "chunk-42", "tags": ["todo", "q3"] }), + ); + assert_eq!(summary["chunk_id"], "chunk-42"); + assert_eq!(summary["tags"], json!(["todo", "q3"])); + } + + #[test] + fn summarize_rejected_write_args_includes_param_keys_only() { + let mut params = Map::new(); + params.insert("content".into(), Value::String("private body".into())); + params.insert("source_type".into(), Value::String("mcp:test".into())); + params.insert("title".into(), Value::String("T".into())); + + let summary = summarize_rejected_write_args( + "memory.store", + &json!({ "title": "T", "content": "private body" }), + Some(¶ms), + ); + + assert_eq!( + summary["param_keys"], + json!(["content", "source_type", "title"]) + ); + assert!(summary.get("content").is_none()); + } + + #[test] + fn write_policy_logs_and_returns_denial() { + let tmp = tempfile::TempDir::new().unwrap(); + let mut config = Config::default(); + config.workspace_dir = tmp.path().join("workspace"); + config.autonomy.level = crate::openhuman::security::AutonomyLevel::ReadOnly; + + let err = enforce_write_policy_for_config("memory.store", &config) + .expect_err("read-only mode should deny writes"); + assert!(err.message().contains("read-only mode")); + } + + #[tokio::test] + async fn audit_write_rejection_records_failure_row() { + let tmp = tempfile::TempDir::new().unwrap(); + let mut config = Config::default(); + config.workspace_dir = tmp.path().join("workspace"); + std::fs::create_dir_all(&config.workspace_dir).unwrap(); + + let err = ToolCallError::InvalidParams("bad write request".into()); + audit_write_rejection( + &config, + "memory.store", + &json!({ "title": "T", "content": "private body" }), + None, + "mcp:test", + &err, + ); + + let mut rows = Vec::new(); + for _ in 0..50 { + rows = crate::openhuman::mcp_audit::list_writes( + &config, + &crate::openhuman::mcp_audit::McpWriteListQuery::default(), + ) + .expect("list writes"); + if rows.len() == 1 { + break; + } + tokio::time::sleep(std::time::Duration::from_millis(10)).await; + } + + assert_eq!(rows.len(), 1); + assert!(!rows[0].success); + assert_eq!(rows[0].tool_name, "memory.store"); + assert_eq!(rows[0].client_info, "mcp:test"); + assert_eq!(rows[0].error_message.as_deref(), Some("bad write request")); + assert!(rows[0].args_summary.get("content").is_none()); + } + + #[tokio::test] + async fn call_tool_records_write_argument_rejection() { + let _env_lock = crate::openhuman::config::TEST_ENV_LOCK + .lock() + .unwrap_or_else(|err| err.into_inner()); + let tmp = tempfile::tempdir().expect("tempdir"); + unsafe { + std::env::set_var("OPENHUMAN_WORKSPACE", tmp.path()); + } + let config = config_rpc::load_config_with_timeout() + .await + .expect("config"); + + let err = call_tool("memory.store", json!({ "title": "T" }), "mcp:test") + .await + .expect_err("missing content should reject"); + assert!( + err.message() + .contains("missing required argument `content`"), + "got: {}", + err.message() + ); + + let mut rows = Vec::new(); + for _ in 0..50 { + rows = crate::openhuman::mcp_audit::list_writes( + &config, + &crate::openhuman::mcp_audit::McpWriteListQuery::default(), + ) + .expect("list writes"); + if rows.len() == 1 { + break; + } + tokio::time::sleep(std::time::Duration::from_millis(10)).await; + } + + assert_eq!(rows.len(), 1); + assert!(!rows[0].success); + assert_eq!(rows[0].tool_name, "memory.store"); + assert_eq!(rows[0].client_info, "mcp:test"); + assert!(rows[0] + .error_message + .as_deref() + .unwrap_or_default() + .contains("missing required argument `content`")); + assert!(rows[0].args_summary.get("content").is_none()); + + unsafe { + std::env::remove_var("OPENHUMAN_WORKSPACE"); + } + } + + #[test] + fn extract_document_id_reads_rpc_outcome_envelope() { + assert_eq!( + extract_document_id(&json!({"result": {"document_id": "doc-123"}, "logs": []})) + .as_deref(), + Some("doc-123") + ); + assert_eq!( + extract_document_id(&json!({"document_id": "doc-456"})).as_deref(), + Some("doc-456") + ); + } + // ── slug_from ───────────────────────────────────────────────────── #[test] diff --git a/src/openhuman/memory_store/chunks/store.rs b/src/openhuman/memory_store/chunks/store.rs index 352f0794ff..e42bd5071f 100644 --- a/src/openhuman/memory_store/chunks/store.rs +++ b/src/openhuman/memory_store/chunks/store.rs @@ -342,6 +342,26 @@ CREATE TABLE IF NOT EXISTS mem_tree_ingested_sources ( ingested_at_ms INTEGER NOT NULL, PRIMARY KEY (source_kind, source_id) ); + +-- MCP write-tool audit trail (#2536). This intentionally stores compact +-- identifying metadata instead of duplicating the memory document body. +CREATE TABLE IF NOT EXISTS mcp_writes ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + timestamp_ms INTEGER NOT NULL, + client_info TEXT NOT NULL, + tool_name TEXT NOT NULL, + args_summary TEXT, + resulting_chunk_id TEXT, + success INTEGER NOT NULL, + error_message TEXT +); + +CREATE INDEX IF NOT EXISTS idx_mcp_writes_timestamp + ON mcp_writes(timestamp_ms DESC); +CREATE INDEX IF NOT EXISTS idx_mcp_writes_client + ON mcp_writes(client_info); +CREATE INDEX IF NOT EXISTS idx_mcp_writes_tool + ON mcp_writes(tool_name); "; /// Upsert a batch of chunks atomically. diff --git a/src/openhuman/mod.rs b/src/openhuman/mod.rs index 503b3e6300..8a4f8d658e 100644 --- a/src/openhuman/mod.rs +++ b/src/openhuman/mod.rs @@ -47,6 +47,7 @@ pub mod integrations; pub mod javascript; pub mod keyring; pub mod learning; +pub mod mcp_audit; pub mod mcp_client; pub mod mcp_registry; pub mod mcp_server;