Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/openhuman/mcp_server/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ async fn handle_request(id: Value, method: &str, params: Value, session: &mut Mc
session.source_type(),
object_keys(&arguments)
);
match tools::call_tool(&name, arguments).await {
match tools::call_tool_with_session(&name, arguments, session.source_type()).await {
Ok(result) => {
log::debug!(
"[mcp_server] tools/call response id={} tool={} client_source_type={} is_error={}",
Expand Down
191 changes: 175 additions & 16 deletions src/openhuman/mcp_server/tools.rs
Original file line number Diff line number Diff line change
Expand Up @@ -544,6 +544,25 @@ fn list_tools_result_from_specs(specs: Vec<McpToolSpec>) -> Value {
}

/// Back-compat entry point that keeps the pre-#2536 `call_tool` signature.
///
/// Forwards to `call_tool_with_session` with the bare `"mcp"` client source
/// type. Per the original note, this value is meant to match the fallback
/// `McpSession` uses when an MCP client omits `clientInfo.name` during
/// `initialize`, so audit rows written through this path cannot be told
/// apart from "no client identity" sessions.
pub async fn call_tool(name: &str, arguments: Value) -> Result<Value, ToolCallError> {
    // No session is available on this path, so no per-client suffix can be
    // attached to the source type.
    let fallback_source_type = "mcp";
    call_tool_with_session(name, arguments, fallback_source_type).await
}

/// `call_tool` variant that threads the per-session `client_source_type`
/// (e.g. `"mcp:claude-desktop"`) into the write-dispatch path so the
/// audit log (#2536) can record *which* MCP client made each write.
///
/// Callers with a live `McpSession` should pass
/// `session.source_type()` here; the bare `mcp` fallback in `call_tool`
/// is only for code paths that haven't been threaded yet.
pub async fn call_tool_with_session(
name: &str,
arguments: Value,
client_source_type: &str,
) -> Result<Value, ToolCallError> {
let spec = tool_specs()
.into_iter()
.find(|tool| tool.name == name)
Expand Down Expand Up @@ -573,7 +592,7 @@ pub async fn call_tool(name: &str, arguments: Value) -> Result<Value, ToolCallEr
"memory.store" | "memory.note" | "tree.tag" => {
enforce_write_policy(spec.name).await?;
validate_controller_params(&spec, &params)?;
return dispatch_write_tool(spec.name, &params).await;
return dispatch_write_tool(spec.name, &params, client_source_type).await;
}
_ => {}
}
Expand Down Expand Up @@ -1169,49 +1188,189 @@ async fn enforce_write_policy(tool_name: &str) -> Result<(), ToolCallError> {
/// Dispatches an MCP write tool to the `openhuman.memory_doc_put` RPC and
/// then records a best-effort audit row for the outcome.
///
/// - `tool_name`: used in log lines, in the error text returned to the
///   client, and in the audit record.
/// - `params`: cloned into the RPC invocation and also passed to the audit
///   helper.
/// - `client_source_type`: logged as `client` and forwarded to the audit
///   helper.
///
/// Every arm of the outcome match yields `Ok(..)`: a handler error or a
/// missing RPC registration is reported as a `tool_error` payload rather
/// than as an `Err`.
///
// NOTE(review): this span is diff text and shows removed and added lines
// side by side (e.g. both `client = "mcp"` and `client = client_source_type`),
// so it does not compile as written — confirm against the real file.
async fn dispatch_write_tool(
tool_name: &str,
params: &Map<String, Value>,
client_source_type: &str,
) -> Result<Value, ToolCallError> {
let rpc_method = "openhuman.memory_doc_put";

tracing::info!(
tool = tool_name,
rpc_method = rpc_method,
client = "mcp",
client = client_source_type,
"[mcp_server] write dispatch"
);

match all::try_invoke_registered_rpc(rpc_method, params.clone()).await {
let outcome = all::try_invoke_registered_rpc(rpc_method, params.clone()).await;
let (rpc_result, audit_chunk_id, audit_success, audit_error) = match &outcome {
Some(Ok(value)) => {
let document_id = value
.get("document_id")
.and_then(Value::as_str)
.unwrap_or("<unknown>");
let document_id = value.get("document_id").and_then(Value::as_str);
tracing::info!(
tool = tool_name,
chunk_id = document_id,
client = "mcp",
chunk_id = document_id.unwrap_or("<unknown>"),
client = client_source_type,
"[mcp_server] write success"
);
Ok(tool_success(value))
(
Ok(tool_success(value.clone())),
document_id.map(str::to_string),
true,
None,
)
}
Some(Err(message)) => {
log::warn!(
"[mcp_server] write handler error tool={} error={}",
"[mcp_server] write handler error tool={} client={} error={}",
tool_name,
client_source_type,
message
);
Ok(tool_error(format!("{} failed: {message}", tool_name)))
(
Ok(tool_error(format!("{} failed: {message}", tool_name))),
None,
false,
Some(message.clone()),
)
}
None => {
log::error!(
"[mcp_server] write mapping missing registered RPC method tool={} rpc_method={}",
tool_name,
rpc_method
);
Ok(tool_error(format!(
"{} is unavailable: mapped RPC method `{}` is not registered",
tool_name, rpc_method
)))
(
Ok(tool_error(format!(
"{} is unavailable: mapped RPC method `{}` is not registered",
tool_name, rpc_method
))),
None,
false,
Some(format!("rpc method `{rpc_method}` not registered")),
)
}
};

// Best-effort audit log (#2536 Q2=A) — never fails the write.
// Failed RPC dispatches are also recorded so abuse-detection
// signals show up alongside success rows.
record_audit_best_effort(
tool_name,
params,
client_source_type,
audit_chunk_id,
audit_success,
audit_error,
)
.await;

rpc_result
}

/// Best-effort audit insert for the MCP write path (#2536).
///
/// Loads the config through `config_rpc::load_config_with_timeout`, builds
/// an `McpWriteRecord` stamped with the current wall-clock time, and awaits
/// `record_write`. A failure in either step is logged at `warn` level and
/// swallowed: this function returns nothing, so the caller's write outcome
/// is never affected by audit-table problems.
///
/// # Parameters
///
/// - `tool_name`: MCP tool that performed the write; stored on the record
///   and used to pick the `args_summary` extractor.
/// - `params`: the tool arguments; only the slim summary produced by
///   `summarize_write_args` is stored.
/// - `client_source_type`: identity of the calling MCP client.
/// - `resulting_chunk_id`: id of the written document, when one is known.
/// - `success` / `error_message`: outcome of the underlying write.
async fn record_audit_best_effort(
    tool_name: &str,
    params: &Map<String, Value>,
    client_source_type: &str,
    resulting_chunk_id: Option<String>,
    success: bool,
    error_message: Option<String>,
) {
    use crate::openhuman::memory::mcp_audit::{record_write, McpWriteRecord};
    use std::time::{SystemTime, UNIX_EPOCH};

    let config = match config_rpc::load_config_with_timeout().await {
        Ok(config) => config,
        Err(err) => {
            log::warn!(
                "[mcp_server] audit-log config load failed tool={} client={} error={}; skipping audit insert",
                tool_name, client_source_type, err
            );
            return;
        }
    };

    // Milliseconds since the Unix epoch. A clock set before the epoch maps
    // to 0. `as_millis` yields a `u128`, so clamp to `i64::MAX` before the
    // cast: a bare `as i64` would silently wrap an out-of-range value.
    let timestamp_ms = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|elapsed| elapsed.as_millis().min(i64::MAX as u128) as i64)
        .unwrap_or(0);

    let record = McpWriteRecord {
        timestamp_ms,
        client_source_type: client_source_type.to_string(),
        tool_name: tool_name.to_string(),
        args_summary: summarize_write_args(tool_name, params),
        resulting_chunk_id,
        success,
        error_message,
    };

    // The insert is awaited, but its error is only logged — never returned.
    if let Err(err) = record_write(&config, record).await {
        log::warn!(
            "[mcp_server] audit-log insert failed tool={} client={} error={:#}",
            tool_name,
            client_source_type,
            err
        );
    }
}

/// Produce the compact `args_summary` JSON string stored on an audit row.
///
/// Only identifying metadata is captured — the document content itself is
/// never copied into the summary:
///
/// - `memory.store`: `title` (first 128 `char`s), `namespace`, and
///   `tag_count` (length of the `tags` array).
/// - `memory.note`: `chunk_id` (from `metadata.annotates_chunk_id`) and
///   `note_text_length` (byte length of `content`).
/// - `tree.tag`: `chunk_id` (from `metadata.tags_for_chunk_id`) and `tags`
///   (a copy of `metadata.applied_tags`).
///
/// Returns `None` when nothing recordable was found (including for any
/// other tool name) or when JSON serialization fails, so the caller can
/// still insert the row with a NULL `args_summary`.
fn summarize_write_args(tool_name: &str, params: &Map<String, Value>) -> Option<String> {
    /// Look up `key` in `map` and return it only if it holds a JSON string.
    fn str_field<'a>(map: &'a Map<String, Value>, key: &str) -> Option<&'a str> {
        map.get(key).and_then(Value::as_str)
    }

    // Both annotation-style tools carry their identifiers under `metadata`.
    let metadata = params.get("metadata").and_then(Value::as_object);

    let mut fields = Map::new();
    match tool_name {
        "memory.store" => {
            if let Some(title) = str_field(params, "title") {
                // Keep at most 128 chars so an oversized title cannot bloat
                // the audit table; the untruncated title lives in the
                // stored document.
                let capped = title.chars().take(128).collect::<String>();
                fields.insert("title".to_string(), Value::String(capped));
            }
            if let Some(namespace) = str_field(params, "namespace") {
                fields.insert(
                    "namespace".to_string(),
                    Value::String(namespace.to_string()),
                );
            }
            if let Some(tags) = params.get("tags").and_then(Value::as_array) {
                fields.insert("tag_count".to_string(), Value::from(tags.len()));
            }
        }
        "memory.note" => {
            let annotated = metadata.and_then(|meta| str_field(meta, "annotates_chunk_id"));
            if let Some(chunk_id) = annotated {
                fields.insert("chunk_id".to_string(), Value::String(chunk_id.to_string()));
            }
            if let Some(content) = str_field(params, "content") {
                // Record only the size of the note, never its text.
                fields.insert("note_text_length".to_string(), Value::from(content.len()));
            }
        }
        "tree.tag" => {
            if let Some(meta) = metadata {
                if let Some(chunk_id) = str_field(meta, "tags_for_chunk_id") {
                    fields.insert("chunk_id".to_string(), Value::String(chunk_id.to_string()));
                }
                if let Some(tags) = meta.get("applied_tags").and_then(Value::as_array) {
                    fields.insert("tags".to_string(), Value::Array(tags.clone()));
                }
            }
        }
        _ => {}
    }

    if fields.is_empty() {
        return None;
    }
    serde_json::to_string(&Value::Object(fields)).ok()
}

Expand Down
32 changes: 32 additions & 0 deletions src/openhuman/memory/mcp_audit/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
//! Persistent audit log for MCP write tool invocations
//! (`memory.store`, `memory.note`, `tree.tag`).
//!
//! Closes out Q4 of the Phase 3 RFC (#2269) — replaces the
//! `tracing::info!`-only audit trail with a queryable SQLite-backed
//! `mcp_writes` table colocated in the existing memory-tree DB.
//!
//! Issue: #2536.
//!
//! ## V1 scope (this module)
//!
//! - Schema (`mcp_writes`) and migration (additive `CREATE TABLE IF NOT EXISTS`
//! in `tree::store::SCHEMA`).
//! - `McpWriteRecord` struct + `record_write` insert helper.
//! - Best-effort coupling to the MCP write dispatch path
//! (`mcp_server::tools::dispatch_write_tool`) — audit insert failures are
//! logged but never abort the underlying write (Q2=A in #2536).
//!
//! ## Out of scope (follow-ups once Q4 is ratified)
//!
//! - Query RPC (`openhuman.mcp_audit_list`).
//! - MCP-client-side exposure of the audit log.
//! - UI surface for browsing the audit history.
//! - Retention / pruning policy.

mod store;
#[cfg(test)]
mod tests;
mod types;

pub use store::record_write;
pub use types::McpWriteRecord;
Loading
Loading