From f8c20ca6c54b976eff61826a33b24ac5d4826f2f Mon Sep 17 00:00:00 2001 From: Jacob Magar Date: Mon, 25 May 2026 03:25:32 -0400 Subject: [PATCH] feat(mcp): add token-count telemetry to code_*/scout/invoke logs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Surface input_tokens and output_tokens fields on every dispatch event for the five gateway meta-tools — code_search, code_schema, code_execute, scout, and invoke. The fields complement the existing elapsed_ms and make it possible to size LLM context budgets and spot ballooning responses from log analytics. Uses a simple ~4-bytes-per-token estimator (UTF-8 byte length of the serialized JSON, div_ceil 4; identical to chars/4 for ASCII payloads) — dependency-free and accurate enough for capacity tracking. Pull in tiktoken-rs later if exact counts are required. Input tokens are computed once at handler entry from the MCP arguments map. Output tokens are computed before the success log emits, against the serialized result. Failure paths only emit input_tokens (no useful output to size). Refs: timing already existed inline; this commit adds token accounting on the same boundary. New estimator helpers covered by 3 unit tests. 
--- crates/lab/src/mcp/server.rs | 144 ++++++++++++++++++++++++++++------- 1 file changed, 118 insertions(+), 26 deletions(-) diff --git a/crates/lab/src/mcp/server.rs b/crates/lab/src/mcp/server.rs index 43e603e4..d2361d96 100644 --- a/crates/lab/src/mcp/server.rs +++ b/crates/lab/src/mcp/server.rs @@ -1317,6 +1317,7 @@ impl ServerHandler for LabMcpServer { let svc = self.registry.services().iter().find(|s| s.name == service); if service == CODE_SEARCH_TOOL_NAME && self.gateway_code_mode_enabled().await { let started = Instant::now(); + let input_tokens = estimate_tokens_args(&args); let subject = self.request_subject_log_tag(&context); let auth = auth_context_from_extensions(&context.extensions); if !tool_search_scope_allowed(auth) { @@ -1326,6 +1327,7 @@ impl ServerHandler for LabMcpServer { action = "call_tool", subject, elapsed_ms = started.elapsed().as_millis(), + input_tokens, kind = "forbidden", "gateway code search denied by scope" ); @@ -1362,6 +1364,7 @@ impl ServerHandler for LabMcpServer { subject, code_hash = %code_hash, code_len = code.len(), + input_tokens, "gateway code search start" ); let broker = CodeModeBroker::new(&self.registry, Some(manager)); @@ -1376,6 +1379,8 @@ impl ServerHandler for LabMcpServer { .await { Ok(response) => { + let output = serde_json::to_string(&response).unwrap_or_else(|_| "null".to_string()); + let output_tokens = estimate_tokens(&output); tracing::info!( surface = "mcp", service = "code_search", @@ -1384,11 +1389,11 @@ impl ServerHandler for LabMcpServer { code_hash = %code_hash, code_len = code.len(), elapsed_ms = started.elapsed().as_millis(), + input_tokens, + output_tokens, "gateway code search ok" ); - Ok(CallToolResult::success(vec![Content::text( - serde_json::to_string(&response).unwrap_or_else(|_| "null".to_string()), - )])) + Ok(CallToolResult::success(vec![Content::text(output)])) } Err(err) => { tracing::warn!( @@ -1399,6 +1404,7 @@ impl ServerHandler for LabMcpServer { code_hash = %code_hash, code_len = 
code.len(), elapsed_ms = started.elapsed().as_millis(), + input_tokens, kind = err.kind(), error = %err, "gateway code search failed" @@ -1410,6 +1416,7 @@ impl ServerHandler for LabMcpServer { } if service == CODE_EXECUTE_TOOL_NAME && self.gateway_code_mode_enabled().await { let started = Instant::now(); + let input_tokens = estimate_tokens_args(&args); let subject = self.request_subject_log_tag(&context); let auth = auth_context_from_extensions(&context.extensions); if !tool_execute_scope_allowed(auth) { @@ -1419,6 +1426,7 @@ impl ServerHandler for LabMcpServer { action = "call_tool", subject, elapsed_ms = started.elapsed().as_millis(), + input_tokens, kind = "forbidden", "gateway code execute denied by scope" ); @@ -1490,6 +1498,7 @@ impl ServerHandler for LabMcpServer { subject, code_hash = %code_hash, max_tool_calls = requested_max_tool_calls, + input_tokens, "gateway code execute start" ); let broker = CodeModeBroker::new(&self.registry, Some(manager)); @@ -1522,6 +1531,8 @@ impl ServerHandler for LabMcpServer { return Ok(CallToolResult::error(vec![Content::text(env.to_string())])); } }; + let output = serde_json::to_string(&response).unwrap_or_else(|_| "{}".to_string()); + let output_tokens = estimate_tokens(&output); tracing::info!( surface = "mcp", service = "code_execute", @@ -1530,14 +1541,15 @@ impl ServerHandler for LabMcpServer { code_hash = %code_hash, call_count = response.calls.len(), elapsed_ms = started.elapsed().as_millis(), + input_tokens, + output_tokens, "gateway code execute ok" ); - return Ok(CallToolResult::success(vec![Content::text( - serde_json::to_string(&response).unwrap_or_else(|_| "{}".to_string()), - )])); + return Ok(CallToolResult::success(vec![Content::text(output)])); } if service == TOOL_SEARCH_TOOL_NAME || service == LEGACY_TOOL_SEARCH_TOOL_NAME { let started = Instant::now(); + let input_tokens = estimate_tokens_args(&args); let subject = self.request_subject_log_tag(&context); let auth = 
auth_context_from_extensions(&context.extensions); if !tool_search_scope_allowed(auth) { @@ -1547,6 +1559,7 @@ impl ServerHandler for LabMcpServer { action = "call_tool", subject, elapsed_ms = started.elapsed().as_millis(), + input_tokens, kind = "forbidden", "gateway tool search denied by scope" ); @@ -1598,6 +1611,7 @@ impl ServerHandler for LabMcpServer { query_len = query.len(), top_k, include_schema, + input_tokens, "gateway tool search start" ); let score_floor_fraction = manager.tool_search_config().await.score_floor_fraction; @@ -1608,6 +1622,8 @@ impl ServerHandler for LabMcpServer { Ok(upstream_results) => { let results = merge_tool_search_results(builtin_results, upstream_results, top_k); + let output = serde_json::to_string(&results).unwrap_or_else(|_| "[]".to_string()); + let output_tokens = estimate_tokens(&output); tracing::info!( surface = "mcp", service = "tool_search", @@ -1619,15 +1635,18 @@ impl ServerHandler for LabMcpServer { include_schema, result_count = results.len(), elapsed_ms = started.elapsed().as_millis(), + input_tokens, + output_tokens, "gateway tool search ok" ); - Ok(CallToolResult::success(vec![Content::text( - serde_json::to_string(&results).unwrap_or_else(|_| "[]".to_string()), - )])) + Ok(CallToolResult::success(vec![Content::text(output)])) } Err(err) => { let kind = err.kind(); if kind == "index_warming" && !builtin_results.is_empty() { + let output = serde_json::to_string(&builtin_results) + .unwrap_or_else(|_| "[]".to_string()); + let output_tokens = estimate_tokens(&output); tracing::info!( surface = "mcp", service = "tool_search", @@ -1639,13 +1658,12 @@ impl ServerHandler for LabMcpServer { include_schema, result_count = builtin_results.len(), elapsed_ms = started.elapsed().as_millis(), + input_tokens, + output_tokens, upstream_kind = kind, "gateway tool search ok" ); - return Ok(CallToolResult::success(vec![Content::text( - serde_json::to_string(&builtin_results) - .unwrap_or_else(|_| "[]".to_string()), - )])); + return 
Ok(CallToolResult::success(vec![Content::text(output)])); } tracing::warn!( surface = "mcp", @@ -1657,6 +1675,7 @@ impl ServerHandler for LabMcpServer { top_k, include_schema, elapsed_ms = started.elapsed().as_millis(), + input_tokens, kind, error = %err, "gateway tool search failed" @@ -1684,6 +1703,7 @@ impl ServerHandler for LabMcpServer { TOOL_EXECUTE_TOOL_NAME | LEGACY_TOOL_EXECUTE_TOOL_NAME | LEGACY_TOOL_INVOKE_TOOL_NAME ) { let started = Instant::now(); + let input_tokens = estimate_tokens_args(&args); let tool_name = args .get("name") .and_then(Value::as_str) @@ -1711,6 +1731,7 @@ impl ServerHandler for LabMcpServer { upstream_tool = %tool_name, arguments_hash = %arguments_hash, elapsed_ms = started.elapsed().as_millis(), + input_tokens, kind = "forbidden", "gateway tool execute denied by scope" ); @@ -1750,6 +1771,7 @@ impl ServerHandler for LabMcpServer { upstream_tool = %tool_name, arguments_hash = %arguments_hash, elapsed_ms = started.elapsed().as_millis(), + input_tokens, kind = "not_found", "gateway tool execute failed" ); @@ -1782,6 +1804,7 @@ impl ServerHandler for LabMcpServer { upstream_tool = %tool_name, arguments_hash = %arguments_hash, elapsed_ms = started.elapsed().as_millis(), + input_tokens, kind = "unknown_action", "gateway tool execute failed" ); @@ -1820,6 +1843,7 @@ impl ServerHandler for LabMcpServer { builtin_action = %builtin_action, arguments_hash = %arguments_hash, elapsed_ms = started.elapsed().as_millis(), + input_tokens, kind = "forbidden", "gateway tool execute denied by built-in action scope" ); @@ -1907,18 +1931,23 @@ impl ServerHandler for LabMcpServer { .map_err(|te| anyhow::Error::from(DispatchError::from(te))); let elapsed_ms = started.elapsed().as_millis(); match &result { - Ok(_) => tracing::info!( - surface = "mcp", - service = %service, - action = "call_tool", - subject, - upstream = "lab", - upstream_tool = %tool_name, - builtin_action = %builtin_action, - arguments_hash = %arguments_hash, - elapsed_ms, - "gateway 
tool execute ok" - ), + Ok(value) => { + let output_tokens = estimate_tokens_value(value); + tracing::info!( + surface = "mcp", + service = %service, + action = "call_tool", + subject, + upstream = "lab", + upstream_tool = %tool_name, + builtin_action = %builtin_action, + arguments_hash = %arguments_hash, + elapsed_ms, + input_tokens, + output_tokens, + "gateway tool execute ok" + ); + } Err(err) => { let (kind, _, _) = extract_error_info(err); tracing::warn!( @@ -1931,6 +1960,7 @@ impl ServerHandler for LabMcpServer { builtin_action = %builtin_action, arguments_hash = %arguments_hash, elapsed_ms, + input_tokens, kind, "gateway tool execute failed" ); @@ -1969,6 +1999,7 @@ impl ServerHandler for LabMcpServer { requested_upstream = requested_upstream.as_deref().unwrap_or(""), arguments_hash = %arguments_hash, elapsed_ms = started.elapsed().as_millis(), + input_tokens, kind = "ambiguous_tool", valid_count = valid.len(), "gateway tool execute failed" @@ -2001,6 +2032,7 @@ impl ServerHandler for LabMcpServer { requested_upstream = requested_upstream.as_deref().unwrap_or(""), arguments_hash = %arguments_hash, elapsed_ms = started.elapsed().as_millis(), + input_tokens, kind, error = %err, "gateway tool execute failed" @@ -2034,6 +2066,7 @@ impl ServerHandler for LabMcpServer { upstream = %upstream_name, upstream_tool = %upstream_tool_name, arguments_hash = %arguments_hash, + input_tokens, "gateway tool execute start" ); let mut upstream_params = CallToolRequestParams::new(upstream_tool_name.clone()); @@ -2043,6 +2076,9 @@ impl ServerHandler for LabMcpServer { }); match pool.call_tool(&upstream_name, upstream_params).await { Some(Ok(result)) => { + let output_tokens = estimate_tokens( + &serde_json::to_string(&result).unwrap_or_default(), + ); tracing::info!( surface = "mcp", service = %service, @@ -2052,6 +2088,8 @@ impl ServerHandler for LabMcpServer { upstream_tool = %upstream_tool_name, arguments_hash = %arguments_hash, elapsed_ms = started.elapsed().as_millis(), + 
input_tokens, + output_tokens, "gateway tool execute ok" ); return Ok(result); @@ -2066,6 +2104,7 @@ impl ServerHandler for LabMcpServer { upstream_tool = %upstream_tool_name, arguments_hash = %arguments_hash, elapsed_ms = started.elapsed().as_millis(), + input_tokens, kind = "upstream_error", error = %e, "gateway tool execute failed" @@ -2083,6 +2122,7 @@ impl ServerHandler for LabMcpServer { upstream_tool = %upstream_tool_name, arguments_hash = %arguments_hash, elapsed_ms = started.elapsed().as_millis(), + input_tokens, kind = "upstream_error", "gateway tool execute upstream disconnected" ); @@ -2111,6 +2151,7 @@ impl ServerHandler for LabMcpServer { upstream_tool = %upstream_tool_name, arguments_hash = %arguments_hash, elapsed_ms = started.elapsed().as_millis(), + input_tokens, kind = "upstream_error", "gateway tool execute dispatched without upstream pool" ); @@ -3012,6 +3053,25 @@ fn hash_arguments(arguments: &Value) -> String { hex::encode(Sha256::digest(bytes)) } +/// Rough char-based token estimator for gateway telemetry logs. +/// +/// Uses the conventional ~4 chars-per-token heuristic. Cheap, dependency-free, +/// and accurate enough for capacity tracking; do NOT use for LLM budget +/// enforcement — pull in `tiktoken-rs` if exact counts are required. +fn estimate_tokens(s: &str) -> usize { + s.len().div_ceil(4) +} + +/// Token count of a JSON value, computed against its serialized form. +fn estimate_tokens_value(value: &Value) -> usize { + estimate_tokens(&serde_json::to_string(value).unwrap_or_default()) +} + +/// Token count of an MCP arguments map (`request.arguments.unwrap_or_default()`). +fn estimate_tokens_args(arguments: &serde_json::Map) -> usize { + estimate_tokens(&serde_json::to_string(arguments).unwrap_or_default()) +} + /// Format the result of a dispatch operation into an MCP `CallToolResult`. 
fn format_dispatch_result( result: Result, @@ -3219,7 +3279,10 @@ pub fn extract_error_info(e: &anyhow::Error) -> (&'static str, String, Option = serde_json::Map::new(); + // "{}" → 2 chars → 1 token. + assert_eq!(estimate_tokens_args(&empty), 1); + + let mut populated = serde_json::Map::new(); + populated.insert("name".into(), Value::String("scout".into())); + // `{"name":"scout"}` is 16 chars → 4 tokens. + assert_eq!(estimate_tokens_args(&populated), 4); + } + #[tokio::test] async fn extract_error_info_preserves_unknown_action_from_real_dispatch_downcast() { let err = crate::dispatch::lab_admin::dispatch("definitely.unknown", serde_json::json!({}))