diff --git a/.cargo/config.toml b/.cargo/config.toml index 7f6b44b82..8ebc87775 100644 --- a/.cargo/config.toml +++ b/.cargo/config.toml @@ -17,3 +17,11 @@ # incremental=true for normal dev; this file overrides it for developers who # have sccache configured. See docs/RUST.md §Repo-specific overrides. incremental = false + +[profile.dev] +# The all-features labby test harness is large enough that LLVM can exhaust +# memory on this host when the global Cargo config forces debug info on. +debug = 0 + +[profile.test] +debug = 0 diff --git a/crates/lab/Cargo.toml b/crates/lab/Cargo.toml index 5cda790b7..c9e852222 100644 --- a/crates/lab/Cargo.toml +++ b/crates/lab/Cargo.toml @@ -14,6 +14,7 @@ categories = ["command-line-utilities", "development-tools"] [[bin]] name = "labby" path = "src/main.rs" +test = false [dependencies] lab-apis = { path = "../lab-apis", default-features = false } diff --git a/crates/lab/src/api/services/gateway.rs b/crates/lab/src/api/services/gateway.rs index c47775d9e..f5600e898 100644 --- a/crates/lab/src/api/services/gateway.rs +++ b/crates/lab/src/api/services/gateway.rs @@ -334,7 +334,7 @@ mod tests { async fn gateway_test_accepts_proposed_spec() { let response = post_gateway_fresh(json!({ "action":"gateway.test", - "params":{"spec":{"name":"fixture-stdio","command":"echo","args":["hello"]}} + "params":{"allow_stdio":true,"spec":{"name":"fixture-stdio","command":"echo","args":["hello"]}} })) .await; assert_eq!(response.status(), StatusCode::OK); diff --git a/crates/lab/src/cli/gateway.rs b/crates/lab/src/cli/gateway.rs index 05a03b924..d0d368e0a 100644 --- a/crates/lab/src/cli/gateway.rs +++ b/crates/lab/src/cli/gateway.rs @@ -12,6 +12,9 @@ use crate::cli::helpers::{run_action_command, run_confirmable_action_command}; use crate::config::{LabConfig, ProtectedMcpRouteConfig, config_toml_path}; use crate::dispatch::clients::SharedServiceClients; use crate::dispatch::gateway::SHARED_GATEWAY_OAUTH_SUBJECT; +use 
crate::dispatch::gateway::code_mode::{ + CodeModeBroker, CodeModeCaller, CodeModeSurface, CodeModeToolId, CodeModeToolRef, +}; use crate::dispatch::gateway::install_gateway_manager; use crate::dispatch::gateway::manager::{GatewayManager, GatewayRuntimeHandle}; use crate::dispatch::upstream::pool::UpstreamPool; @@ -45,6 +48,33 @@ pub enum GatewayCommand { Pending(GatewayPendingArgs), /// Show resolved public URL configuration (app and MCP gateway) PublicUrls, + /// Search, inspect, and execute Code Mode snippets through dispatch + Code(GatewayCodeArgs), +} + +#[derive(Debug, Args)] +pub struct GatewayCodeArgs { + #[command(subcommand)] + pub command: GatewayCodeCommand, +} + +#[derive(Debug, Subcommand)] +pub enum GatewayCodeCommand { + /// Search Code Mode tool IDs by natural-language query + Search { + query: String, + #[arg(long, default_value_t = 10)] + top_k: usize, + }, + /// Show the schema and generated bindings for one Code Mode tool ID + Schema { id: String }, + /// Execute a sandboxed JavaScript snippet that calls callTool(id, params) + Exec { + #[arg(long, conflicts_with = "file")] + code: Option, + #[arg(long)] + file: Option, + }, } #[derive(Debug, Args)] @@ -387,7 +417,13 @@ pub async fn run(args: GatewayArgs, format: OutputFormat, config: &LabConfig) -> command: GatewayMcpAuthCommand::Status(_) | GatewayMcpAuthCommand::Clear(_), }), }) - ) || matches!(&args.command, GatewayCommand::ProtectedRoute(_))); + ) || matches!(&args.command, GatewayCommand::ProtectedRoute(_))) + && !matches!( + &args.command, + GatewayCommand::Code(GatewayCodeArgs { + command: GatewayCodeCommand::Schema { id }, + }) if code_mode_schema_is_builtin(id) + ); let manager = build_manager(config, discover_upstreams).await; let cli_origin = format!("cli:{}", std::process::id()); let cli_owner = json!({ @@ -506,6 +542,9 @@ pub async fn run(args: GatewayArgs, format: OutputFormat, config: &LabConfig) -> } }, command => { + if let GatewayCommand::Code(args) = command { + return 
run_gateway_code(manager, args, format).await; + } let mut confirmed = true; let mut dry_run = false; let (action, params) = match command { @@ -663,6 +702,7 @@ pub async fn run(args: GatewayArgs, format: OutputFormat, config: &LabConfig) -> }, GatewayCommand::PublicUrls => ("gateway.public_urls.get".to_string(), json!({})), GatewayCommand::Mcp(_) => unreachable!("handled above"), + GatewayCommand::Code(_) => unreachable!("handled above"), }; if dry_run { @@ -686,6 +726,64 @@ pub async fn run(args: GatewayArgs, format: OutputFormat, config: &LabConfig) -> } } +fn code_mode_schema_is_builtin(id: &str) -> bool { + matches!( + CodeModeToolId::parse(id), + Ok(CodeModeToolId { + reference: CodeModeToolRef::LabAction { .. }, + .. + }) + ) +} + +async fn run_gateway_code( + manager: Arc, + args: GatewayCodeArgs, + format: OutputFormat, +) -> Result { + const CODE_MODE_CLI_MAX_SOURCE_BYTES: u64 = 20 * 1024; + + let registry = manager.builtin_service_registry(); + let broker = CodeModeBroker::new(®istry, Some(manager.as_ref())); + let caller = CodeModeCaller::TrustedLocal; + let surface = CodeModeSurface::Cli; + + match args.command { + GatewayCodeCommand::Search { query, top_k } => { + let candidates = broker.search(&query, top_k, caller, surface).await?; + crate::output::print(&candidates, format)?; + } + GatewayCodeCommand::Schema { id } => { + let schema = broker.schema(&id, caller, surface).await?; + crate::output::print(&schema, format)?; + } + GatewayCodeCommand::Exec { code, file } => { + let code = match (code, file) { + (Some(code), None) => code, + (None, Some(path)) => { + let metadata = std::fs::metadata(&path)?; + if metadata.len() > CODE_MODE_CLI_MAX_SOURCE_BYTES { + anyhow::bail!("Code Mode source file exceeds 20480 bytes"); + } + std::fs::read_to_string(path)? 
+ } + _ => anyhow::bail!("provide exactly one of --code or --file"), + }; + if code.len() as u64 > CODE_MODE_CLI_MAX_SOURCE_BYTES { + anyhow::bail!("Code Mode source exceeds 20480 bytes"); + } + let config = manager.code_mode_config().await; + let max_tool_calls = config.max_tool_calls; + let response = broker + .execute(&code, max_tool_calls, caller, surface, config) + .await?; + crate::output::print(&response, format)?; + } + } + + Ok(ExitCode::SUCCESS) +} + async fn run_gateway_oauth_start( manager: Arc, args: GatewayOauthUpstreamArgs, @@ -902,5 +1000,31 @@ mod tests { ]) .is_ok() ); + assert!(Cli::try_parse_from(["lab", "gateway", "code", "search", "movie.search"]).is_ok()); + assert!( + Cli::try_parse_from([ + "lab", + "gateway", + "code", + "schema", + "lab::radarr.movie.search", + ]) + .is_ok() + ); + assert!( + Cli::try_parse_from([ + "lab", + "gateway", + "code", + "exec", + "--code", + "await callTool(\"lab::gateway.gateway.servers\", {})", + ]) + .is_ok() + ); + assert!( + Cli::try_parse_from(["lab", "gateway", "code", "exec", "--file", "snippet.js",]) + .is_ok() + ); } } diff --git a/crates/lab/src/config.rs b/crates/lab/src/config.rs index fa129ff65..68b740169 100644 --- a/crates/lab/src/config.rs +++ b/crates/lab/src/config.rs @@ -539,14 +539,11 @@ impl CodeModeConfig { impl ToolSearchConfig { /// Resolve Qdrant URL: config field → `QDRANT_URL` env var → None. pub fn resolved_qdrant_url(&self) -> Option { - resolve_container_service_url( - self.qdrant_url - .clone() - .filter(|s| !s.is_empty()) - .or_else(|| std::env::var("QDRANT_URL").ok().filter(|s| !s.is_empty())), - "axon-qdrant", - 6333, - ) + self.qdrant_url + .clone() + .filter(|s| !s.is_empty()) + .or_else(|| std::env::var("QDRANT_URL").ok().filter(|s| !s.is_empty())) + .map(|raw| normalize_container_loopback_url(&raw, "axon-qdrant", 6333)) } /// Resolve Qdrant API key: config field → `QDRANT_API_KEY` env var → None. 
@@ -564,14 +561,11 @@ impl ToolSearchConfig { /// Resolve TEI URL: config field → `TEI_URL` env var → None. pub fn resolved_tei_url(&self) -> Option { - resolve_container_service_url( - self.tei_url - .clone() - .filter(|s| !s.is_empty()) - .or_else(|| std::env::var("TEI_URL").ok().filter(|s| !s.is_empty())), - "axon-tei", - 80, - ) + self.tei_url + .clone() + .filter(|s| !s.is_empty()) + .or_else(|| std::env::var("TEI_URL").ok().filter(|s| !s.is_empty())) + .map(|raw| normalize_container_loopback_url(&raw, "axon-tei", 80)) } /// Resolve TEI API key: config field → `TEI_API_KEY` env var → None. @@ -603,14 +597,6 @@ impl ToolSearchConfig { } } -fn resolve_container_service_url( - configured: Option, - docker_host: &str, - docker_port: u16, -) -> Option { - configured.map(|raw| normalize_container_loopback_url(&raw, docker_host, docker_port)) -} - fn normalize_container_loopback_url(raw: &str, docker_host: &str, docker_port: u16) -> String { normalize_container_loopback_url_for_runtime( raw, diff --git a/crates/lab/src/dispatch/gateway/code_mode.rs b/crates/lab/src/dispatch/gateway/code_mode.rs index 0efffcd29..91d2e453c 100644 --- a/crates/lab/src/dispatch/gateway/code_mode.rs +++ b/crates/lab/src/dispatch/gateway/code_mode.rs @@ -1,18 +1,28 @@ use std::cell::RefCell; +use std::cmp::Ordering as CmpOrdering; use std::collections::HashMap; use std::io::{self, BufRead, BufReader, BufWriter, Write}; use std::process::ExitCode; +use std::process::Stdio; +use std::time::Duration; use boa_engine::builtins::promise::{PromiseState, ResolvingFunctions}; use boa_engine::object::builtins::JsPromise; use boa_engine::{ Context, JsArgs, JsError, JsNativeError, JsResult, JsValue, NativeFunction, Source, js_string, }; +use futures::{FutureExt, StreamExt, stream::FuturesUnordered}; use lab_apis::core::action::{ActionSpec, ParamSpec}; +use rmcp::model::CallToolRequestParams; use serde::{Deserialize, Serialize}; use serde_json::{Map, Value, json}; +use tempfile::TempDir; +use 
tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader as TokioBufReader}; +use tokio::process::{Child, ChildStdin, Command}; use crate::dispatch::error::ToolError; +use crate::dispatch::gateway::manager::GatewayManager; +use crate::registry::{RegisteredService, ToolRegistry}; #[derive(Debug, Clone, PartialEq, Eq)] pub struct CodeModeToolId { @@ -160,6 +170,757 @@ pub struct CodeModeExecutedCall { pub result: Value, } +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum CodeModeCaller { + TrustedLocal, + Scoped { + scopes: Vec, + subject: Option, + }, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum CodeModeSurface { + Mcp { + expose_builtin_services: bool, + allow_destructive_actions: bool, + }, + Cli, +} + +impl CodeModeCaller { + #[must_use] + pub fn can_read(&self) -> bool { + match self { + Self::TrustedLocal => true, + Self::Scoped { scopes, .. } => scopes + .iter() + .any(|scope| matches!(scope.as_str(), "lab:read" | "lab" | "lab:admin")), + } + } + + #[must_use] + pub fn can_execute(&self) -> bool { + match self { + Self::TrustedLocal => true, + Self::Scoped { scopes, .. } => scopes + .iter() + .any(|scope| matches!(scope.as_str(), "lab" | "lab:admin")), + } + } + + #[must_use] + pub fn can_execute_action(&self, entry: &RegisteredService, action: &str) -> bool { + if !builtin_action_requires_admin(entry, action) { + return self.can_execute(); + } + match self { + Self::TrustedLocal => true, + Self::Scoped { scopes, .. } => scopes.iter().any(|scope| scope == "lab:admin"), + } + } + + #[must_use] + pub fn subject(&self) -> Option<&str> { + match self { + Self::TrustedLocal => None, + Self::Scoped { subject, .. 
} => subject.as_deref(), + } + } +} + +pub struct CodeModeBroker<'a> { + registry: &'a ToolRegistry, + gateway_manager: Option<&'a GatewayManager>, +} + +impl<'a> CodeModeBroker<'a> { + #[must_use] + pub fn new(registry: &'a ToolRegistry, gateway_manager: Option<&'a GatewayManager>) -> Self { + Self { + registry, + gateway_manager, + } + } + + pub async fn search( + &self, + query: &str, + top_k: usize, + caller: CodeModeCaller, + surface: CodeModeSurface, + ) -> Result, ToolError> { + if !caller.can_read() { + return Err(ToolError::Sdk { + sdk_kind: "forbidden".to_string(), + message: "code_search requires one of scopes: lab:read, lab, lab:admin".to_string(), + }); + } + + let score_floor_fraction = match self.gateway_manager { + Some(manager) => manager.tool_search_config().await.score_floor_fraction, + None => 0.0, + }; + let mut candidates = self + .search_builtin_candidates(query, top_k, score_floor_fraction, surface) + .await; + + if let Some(manager) = self.gateway_manager { + match manager.search_tools(query, top_k, true).await { + Ok(upstream_results) => { + candidates.extend(upstream_results.into_iter().map(|result| { + CodeModeSearchCandidate::upstream_tool( + &result.upstream, + &result.name, + &result.description, + result.score, + result.input_schema, + ) + })); + } + Err(err) if err.kind() == "index_warming" && !candidates.is_empty() => {} + Err(err) => return Err(err), + } + } + + candidates.sort_by(compare_code_mode_search_candidates); + candidates.truncate(top_k.max(1).min(50)); + Ok(candidates) + } + + pub async fn schema( + &self, + id: &str, + caller: CodeModeCaller, + surface: CodeModeSurface, + ) -> Result { + if !caller.can_execute() { + return Err(ToolError::Sdk { + sdk_kind: "forbidden".to_string(), + message: "code_schema requires one of scopes: lab, lab:admin".to_string(), + }); + } + let parsed = CodeModeToolId::parse(id)?; + match parsed.reference { + CodeModeToolRef::LabAction { service, action } => { + 
self.schema_for_lab_action(&parsed.raw, &service, &action, caller, surface) + .await + } + CodeModeToolRef::UpstreamTool { upstream, tool } => { + self.schema_for_upstream_tool(&parsed.raw, &upstream, &tool) + .await + } + } + } + + pub async fn execute( + &self, + code: &str, + max_tool_calls: usize, + caller: CodeModeCaller, + surface: CodeModeSurface, + config: crate::config::CodeModeConfig, + ) -> Result { + if !config.enabled { + return Err(ToolError::Sdk { + sdk_kind: "code_mode_disabled".to_string(), + message: + "Code Mode execution is disabled; set [code_mode].enabled = true to enable it" + .to_string(), + }); + } + if !caller.can_execute() { + return Err(ToolError::Sdk { + sdk_kind: "forbidden".to_string(), + message: "code_execute requires one of scopes: lab, lab:admin".to_string(), + }); + } + self.execute_sandboxed( + code, + max_tool_calls.max(1).min(config.max_tool_calls.max(1)), + Duration::from_millis(config.timeout_ms.max(1)), + caller, + surface, + ) + .await + } + + async fn search_builtin_candidates( + &self, + query: &str, + top_k: usize, + score_floor_fraction: f32, + surface: CodeModeSurface, + ) -> Vec { + let needle = query.trim().to_ascii_lowercase(); + if needle.is_empty() || needle.len() > 500 { + return Vec::new(); + } + + let mut candidates = Vec::new(); + for service in self.registry.services() { + if !self.service_visible(service.name, surface).await { + continue; + } + for action in self.searchable_builtin_actions(service, surface).await { + let haystack = format!( + "{}\n{}\n{}\n{}", + service.name, service.description, action.name, action.description + ) + .to_ascii_lowercase(); + let score = crate::dispatch::gateway::score_name_haystack( + &needle, + &action.name.to_ascii_lowercase(), + &haystack, + ); + if score > 0.0 { + candidates.push(CodeModeSearchCandidate::lab_action( + service.name, + action.name, + action.description, + score, + )); + } + } + } + + candidates.sort_by(compare_code_mode_search_candidates); + if 
score_floor_fraction > 0.0 + && let Some(top) = candidates.first() + { + let floor = top.score * score_floor_fraction; + candidates.retain(|candidate| candidate.score >= floor); + } + candidates.truncate(top_k.max(1).min(50)); + candidates + } + + async fn schema_for_lab_action( + &self, + id: &str, + service_name: &str, + action_name: &str, + caller: CodeModeCaller, + surface: CodeModeSurface, + ) -> Result { + let Some(entry) = self + .registry + .services() + .iter() + .find(|entry| entry.name == service_name) + else { + return Err(ToolError::Sdk { + sdk_kind: "not_found".to_string(), + message: format!("Lab service `{service_name}` was not found"), + }); + }; + if !self.service_visible(entry.name, surface).await + || !self.action_allowed(entry.name, action_name, surface).await + { + return Err(ToolError::Sdk { + sdk_kind: "not_found".to_string(), + message: format!( + "Lab action `{service_name}.{action_name}` is not exposed on this surface" + ), + }); + } + let action = entry + .actions + .iter() + .find(|action| action.name == action_name) + .ok_or_else(|| ToolError::Sdk { + sdk_kind: "not_found".to_string(), + message: format!("Lab action `{service_name}.{action_name}` was not found"), + })?; + if !caller.can_execute_action(entry, action_name) { + return Err(ToolError::Sdk { + sdk_kind: "forbidden".to_string(), + message: format!( + "action `{action_name}` for service `{}` requires `lab:admin` scope", + entry.name + ), + }); + } + let input_schema = action_input_schema(action); + crate::dispatch::helpers::action_schema(entry.actions, action_name).map(|schema| { + CodeModeSchemaResponse::lab_action_with_input_schema( + id, + action_name, + schema, + input_schema, + ) + }) + } + + async fn schema_for_upstream_tool( + &self, + id: &str, + upstream: &str, + tool: &str, + ) -> Result { + let Some(manager) = self.gateway_manager else { + return Err(ToolError::Sdk { + sdk_kind: "upstream_error".to_string(), + message: "gateway manager is unavailable".to_string(), + 
}); + }; + let candidate = manager + .resolve_code_mode_upstream_tool(upstream, tool) + .await?; + let Some(schema) = sanitize_code_mode_schema(candidate.input_schema) else { + return Err(ToolError::Sdk { + sdk_kind: "schema_unavailable".to_string(), + message: format!( + "upstream tool `{upstream}::{tool}` schema is unavailable or exceeds the safe return size" + ), + }); + }; + Ok(CodeModeSchemaResponse::upstream_tool( + id, upstream, tool, schema, + )) + } + + async fn execute_sandboxed( + &self, + code: &str, + max_tool_calls: usize, + timeout: Duration, + caller: CodeModeCaller, + surface: CodeModeSurface, + ) -> Result { + let exe = std::env::current_exe().map_err(|err| ToolError::Sdk { + sdk_kind: "internal_error".to_string(), + message: format!("failed to locate current executable for Code Mode runner: {err}"), + })?; + let temp_dir = TempDir::new().map_err(|err| ToolError::Sdk { + sdk_kind: "internal_error".to_string(), + message: format!("failed to create Code Mode sandbox directory: {err}"), + })?; + let mut child = Command::new(exe) + .args(["internal", "code-mode-runner"]) + .current_dir(temp_dir.path()) + .env_clear() + .kill_on_drop(true) + .stdin(Stdio::piped()) + .stdout(Stdio::piped()) + .stderr(Stdio::null()) + .spawn() + .map_err(|err| ToolError::Sdk { + sdk_kind: "internal_error".to_string(), + message: format!("failed to spawn Code Mode runner: {err}"), + })?; + + let mut stdin = child.stdin.take().ok_or_else(|| ToolError::Sdk { + sdk_kind: "internal_error".to_string(), + message: "Code Mode runner stdin was not available".to_string(), + })?; + let stdout = child.stdout.take().ok_or_else(|| ToolError::Sdk { + sdk_kind: "internal_error".to_string(), + message: "Code Mode runner stdout was not available".to_string(), + })?; + write_runner_input( + &mut stdin, + &CodeModeRunnerInput::Start { + code: code.to_string(), + }, + ) + .await?; + + let mut lines = TokioBufReader::new(stdout).lines(); + let mut calls = Vec::new(); + let mut 
pending_tool_calls = FuturesUnordered::new(); + let mut started_tool_calls = 0usize; + let deadline = tokio::time::Instant::now() + timeout; + + loop { + tokio::select! { + line = tokio::time::timeout_at(deadline, lines.next_line()) => { + let line = match line { + Ok(line) => line, + Err(_) => { + terminate_code_mode_runner(&mut child).await; + return Err(ToolError::Sdk { + sdk_kind: "timeout".to_string(), + message: "Code Mode execution timed out".to_string(), + }); + } + }; + let Some(line) = line.map_err(|err| ToolError::Sdk { + sdk_kind: "internal_error".to_string(), + message: format!("failed to read Code Mode runner output: {err}"), + })? + else { + let status = child.wait().await.map_err(|err| ToolError::Sdk { + sdk_kind: "internal_error".to_string(), + message: format!("failed to wait for Code Mode runner: {err}"), + })?; + return Err(ToolError::Sdk { + sdk_kind: "code_execution_failed".to_string(), + message: format!( + "Code Mode runner exited before completion with status {status}" + ), + }); + }; + match serde_json::from_str::(&line).map_err(|err| { + ToolError::Sdk { + sdk_kind: "internal_error".to_string(), + message: format!("Code Mode runner emitted invalid protocol JSON: {err}"), + } + })? 
{ + CodeModeRunnerOutput::ToolCall { seq, id, params } => { + if started_tool_calls >= max_tool_calls { + terminate_code_mode_runner(&mut child).await; + return Err(ToolError::Sdk { + sdk_kind: "tool_call_limit_exceeded".to_string(), + message: format!( + "Code Mode execution exceeded max_tool_calls={max_tool_calls}" + ), + }); + } + started_tool_calls += 1; + let call_id = id.clone(); + let caller = caller.clone(); + pending_tool_calls.push( + async move { + let result = self + .call_tool_id_before_deadline( + &id, params, deadline, caller, surface, + ) + .await; + (seq, call_id, result) + } + .boxed(), + ); + } + CodeModeRunnerOutput::Done => { + if !pending_tool_calls.is_empty() { + terminate_code_mode_runner(&mut child).await; + return Err(ToolError::Sdk { + sdk_kind: "code_execution_failed".to_string(), + message: "Code Mode runner completed with pending tool calls".to_string(), + }); + } + if calls.is_empty() { + terminate_code_mode_runner(&mut child).await; + return Err(ToolError::Sdk { + sdk_kind: "invalid_param".to_string(), + message: + "Code Mode snippet must call callTool(id, params) at least once" + .to_string(), + }); + } + let status = child.wait().await.map_err(|err| ToolError::Sdk { + sdk_kind: "internal_error".to_string(), + message: format!("failed to wait for Code Mode runner: {err}"), + })?; + if !status.success() { + return Err(ToolError::Sdk { + sdk_kind: "code_execution_failed".to_string(), + message: format!("Code Mode runner exited with status {status}"), + }); + } + calls.sort_by_key(|(seq, _)| *seq); + return Ok(CodeModeExecutionResponse { + calls: calls.into_iter().map(|(_, call)| call).collect(), + }); + } + CodeModeRunnerOutput::Error { kind, message } => { + drop(child.wait().await); + return Err(ToolError::Sdk { + sdk_kind: kind, + message, + }); + } + } + } + completed = pending_tool_calls.next(), if !pending_tool_calls.is_empty() => { + let Some((seq, id, result)) = completed else { + continue; + }; + let result = match result { + 
Ok(result) => result, + Err(err) => { + drop( + write_runner_input( + &mut stdin, + &CodeModeRunnerInput::ToolError { + seq, + kind: match &err { + ToolError::Sdk { sdk_kind, .. } => { + sdk_kind.as_str() + } + other => other.kind(), + } + .to_string(), + message: err.to_string(), + }, + ) + .await, + ); + terminate_code_mode_runner(&mut child).await; + return Err(err); + } + }; + calls.push((seq, CodeModeExecutedCall { + id: id.clone(), + result: result.clone(), + })); + write_runner_input( + &mut stdin, + &CodeModeRunnerInput::ToolResult { seq, result }, + ) + .await?; + } + } + } + } + + pub(crate) async fn call_tool_id_before_deadline( + &self, + id: &str, + params: Value, + deadline: tokio::time::Instant, + caller: CodeModeCaller, + surface: CodeModeSurface, + ) -> Result { + match tokio::time::timeout_at(deadline, self.call_tool_id(id, params, caller, surface)) + .await + { + Ok(result) => result, + Err(_) => Err(ToolError::Sdk { + sdk_kind: "timeout".to_string(), + message: "Code Mode execution timed out".to_string(), + }), + } + } + + pub(crate) async fn call_tool_id( + &self, + id: &str, + params: Value, + caller: CodeModeCaller, + surface: CodeModeSurface, + ) -> Result { + let parsed = CodeModeToolId::parse(id)?; + match parsed.reference { + CodeModeToolRef::LabAction { service, action } => { + self.call_lab_action(&service, &action, params, caller, surface) + .await + } + CodeModeToolRef::UpstreamTool { upstream, tool } => { + self.call_upstream_tool(&upstream, &tool, params).await + } + } + } + + async fn call_lab_action( + &self, + service_name: &str, + action_name: &str, + params: Value, + caller: CodeModeCaller, + surface: CodeModeSurface, + ) -> Result { + let Some(entry) = self + .registry + .services() + .iter() + .find(|entry| entry.name == service_name) + else { + return Err(ToolError::Sdk { + sdk_kind: "not_found".to_string(), + message: format!("Lab service `{service_name}` was not found"), + }); + }; + if !self.service_visible(entry.name, 
surface).await + || !self.action_allowed(entry.name, action_name, surface).await + { + return Err(ToolError::Sdk { + sdk_kind: "not_found".to_string(), + message: format!( + "Lab action `{service_name}.{action_name}` is not exposed on this surface" + ), + }); + } + if !caller.can_execute_action(entry, action_name) { + return Err(ToolError::Sdk { + sdk_kind: "forbidden".to_string(), + message: format!( + "action `{action_name}` for service `{}` requires `lab:admin` scope", + entry.name + ), + }); + } + let is_destructive = entry + .actions + .iter() + .any(|action| action.name == action_name && action.destructive); + let confirmed = params.get("confirm").and_then(Value::as_bool) == Some(true); + if is_destructive && !confirmed { + return Err(ToolError::Sdk { + sdk_kind: "confirmation_required".to_string(), + message: format!( + "action `{action_name}` is destructive - pass {{\"confirm\":true}} in params" + ), + }); + } + if is_destructive && !surface.allows_destructive_actions() { + return Err(ToolError::Sdk { + sdk_kind: "confirmation_required".to_string(), + message: format!( + "action `{action_name}` is destructive - pass {{\"confirm\":true}} to code_execute and to the tool params" + ), + }); + } + let params = strip_code_mode_control_params(params); + let params = if entry.name == "gateway" { + inject_gateway_origin_param(params, caller.subject(), surface) + } else { + params + }; + (entry.dispatch)(action_name.to_string(), params).await + } + + async fn call_upstream_tool( + &self, + upstream: &str, + tool: &str, + params: Value, + ) -> Result { + let Some(manager) = self.gateway_manager else { + return Err(ToolError::Sdk { + sdk_kind: "upstream_error".to_string(), + message: "gateway manager is unavailable".to_string(), + }); + }; + manager + .resolve_code_mode_upstream_tool(upstream, tool) + .await?; + let Some(pool) = manager.current_pool().await else { + return Err(ToolError::Sdk { + sdk_kind: "upstream_error".to_string(), + message: "gateway upstream pool 
is unavailable".to_string(), + }); + }; + let mut upstream_params = CallToolRequestParams::new(tool.to_string()); + upstream_params.arguments = Some(match params { + Value::Object(map) => map, + _ => Map::new(), + }); + match pool.call_tool(upstream, upstream_params).await { + Some(Ok(result)) => { + if result.is_error == Some(true) { + let error_text = result + .content + .first() + .and_then(|content| content.as_text()) + .map(|content| content.text.as_str()); + let (kind, message, counts_as_failure) = + code_mode_upstream_error_info(error_text); + if counts_as_failure { + pool.record_failure(upstream, message.clone()).await; + } else { + pool.record_success(upstream).await; + } + return Err(ToolError::Sdk { + sdk_kind: kind.to_string(), + message, + }); + } + pool.record_success(upstream).await; + serde_json::to_value(result).map_err(|err| ToolError::Sdk { + sdk_kind: "internal_error".to_string(), + message: format!("failed to serialize upstream tool result: {err}"), + }) + } + Some(Err(err)) => { + pool.record_failure(upstream, err.clone()).await; + Err(ToolError::Sdk { + sdk_kind: "upstream_error".to_string(), + message: err, + }) + } + None => { + pool.record_failure(upstream, format!("upstream `{upstream}` is not connected")) + .await; + Err(ToolError::Sdk { + sdk_kind: "not_found".to_string(), + message: format!("upstream tool `{upstream}::{tool}` was not found"), + }) + } + } + } + + async fn searchable_builtin_actions<'b>( + &self, + service: &'b RegisteredService, + surface: CodeModeSurface, + ) -> Vec<&'b ActionSpec> { + let mut actions = service.actions.iter().collect::>(); + if let Some(allowed_actions) = self.allowed_actions(service.name, surface).await + && !allowed_actions.is_empty() + { + actions.retain(|action| allowed_actions.iter().any(|allowed| allowed == action.name)); + } + actions + } + + async fn service_visible(&self, service: &str, surface: CodeModeSurface) -> bool { + match (surface, self.gateway_manager) { + ( + CodeModeSurface::Mcp { 
+ expose_builtin_services: false, + .. + }, + _, + ) => false, + (CodeModeSurface::Mcp { .. }, Some(manager)) => { + manager.surface_enabled_for_service(service, "mcp").await + } + (CodeModeSurface::Cli, Some(manager)) => { + manager.surface_enabled_for_service(service, "cli").await + } + _ => true, + } + } + + async fn action_allowed(&self, service: &str, action: &str, surface: CodeModeSurface) -> bool { + match (surface, self.gateway_manager) { + (CodeModeSurface::Mcp { .. }, Some(manager)) => { + manager + .mcp_action_allowed_for_service(service, action) + .await + } + _ => true, + } + } + + async fn allowed_actions( + &self, + service: &str, + surface: CodeModeSurface, + ) -> Option> { + match (surface, self.gateway_manager) { + (CodeModeSurface::Mcp { .. }, Some(manager)) => { + manager.allowed_mcp_actions_for_service(service).await + } + _ => None, + } + } +} + +impl CodeModeSurface { + fn allows_destructive_actions(self) -> bool { + match self { + Self::Cli => true, + Self::Mcp { + allow_destructive_actions, + .. 
+ } => allow_destructive_actions, + } + } +} + #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] #[serde(tag = "type", rename_all = "snake_case")] pub enum CodeModeRunnerInput { @@ -252,6 +1013,149 @@ pub fn invalid_code_mode_id(message: impl Into) -> ToolError { } } +fn compare_code_mode_search_candidates( + a: &CodeModeSearchCandidate, + b: &CodeModeSearchCandidate, +) -> CmpOrdering { + b.score + .partial_cmp(&a.score) + .unwrap_or(CmpOrdering::Equal) + .then_with(|| a.id.cmp(&b.id)) +} + +async fn write_runner_input( + stdin: &mut ChildStdin, + input: &CodeModeRunnerInput, +) -> Result<(), ToolError> { + let mut line = serde_json::to_vec(input).map_err(|err| ToolError::Sdk { + sdk_kind: "internal_error".to_string(), + message: format!("failed to encode Code Mode runner input: {err}"), + })?; + line.push(b'\n'); + stdin.write_all(&line).await.map_err(|err| ToolError::Sdk { + sdk_kind: "internal_error".to_string(), + message: format!("failed to write Code Mode runner input: {err}"), + })?; + stdin.flush().await.map_err(|err| ToolError::Sdk { + sdk_kind: "internal_error".to_string(), + message: format!("failed to flush Code Mode runner input: {err}"), + }) +} + +async fn terminate_code_mode_runner(child: &mut Child) { + drop(child.kill().await); + drop(child.wait().await); +} + +fn strip_code_mode_control_params(mut params: Value) -> Value { + if let Value::Object(map) = &mut params { + map.remove("confirm"); + } + params +} + +fn inject_gateway_origin_param( + params: Value, + subject: Option<&str>, + surface: CodeModeSurface, +) -> Value { + let surface_label = match surface { + CodeModeSurface::Mcp { .. 
} => "mcp", + CodeModeSurface::Cli => "cli", + }; + let raw = subject + .map(|value| format!("{surface_label}:{value}")) + .unwrap_or_else(|| format!("{surface_label}:anonymous")); + let Some(mut object) = params.as_object().cloned() else { + return params; + }; + object.insert( + "owner".to_string(), + json!({ + "surface": surface_label, + "subject": subject, + "raw": raw, + }), + ); + object.insert("origin".to_string(), Value::String(raw)); + Value::Object(object) +} + +fn builtin_action_requires_admin(entry: &RegisteredService, action: &str) -> bool { + if entry.name == "gateway" { + return !matches!( + action, + "help" | "schema" | "gateway.help" | "gateway.schema" + ); + } + entry.name == "setup" + && entry + .actions + .iter() + .any(|spec| spec.name == action && spec.destructive) +} + +fn code_mode_canonical_error_kind(s: &str) -> &'static str { + match s { + "unknown_action" => "unknown_action", + "unknown_subaction" => "unknown_subaction", + "missing_param" => "missing_param", + "invalid_param" => "invalid_param", + "unknown_instance" => "unknown_instance", + "confirmation_required" => "confirmation_required", + "conflict" => "conflict", + "auth_failed" => "auth_failed", + "not_found" => "not_found", + "rate_limited" => "rate_limited", + "validation_failed" => "validation_failed", + "network_error" => "network_error", + "server_error" => "server_error", + "decode_error" => "decode_error", + "internal_error" => "internal_error", + "upstream_error" => "upstream_error", + _ => "internal_error", + } +} + +fn code_mode_upstream_error_info(text: Option<&str>) -> (&'static str, String, bool) { + let Some(text) = text else { + return ( + "upstream_error", + "upstream returned a non-text error payload".to_string(), + true, + ); + }; + + let Ok(parsed) = serde_json::from_str::(text) else { + return ("upstream_error", text.to_string(), true); + }; + + let error_obj = parsed + .get("error") + .and_then(Value::as_object) + .or_else(|| parsed.as_object()); + let 
Some(error_obj) = error_obj else { + return ("upstream_error", text.to_string(), true); + }; + + let kind = error_obj + .get("kind") + .and_then(Value::as_str) + .map(code_mode_canonical_error_kind) + .unwrap_or("upstream_error"); + let message = error_obj + .get("message") + .and_then(Value::as_str) + .unwrap_or(text) + .to_string(); + let counts_as_failure = matches!( + kind, + "upstream_error" | "network_error" | "server_error" | "decode_error" | "internal_error" + ); + + (kind, message, counts_as_failure) +} + pub fn run_code_mode_runner_stdio() -> ExitCode { RUNNER_STATE.with(|state| { *state.borrow_mut() = Some(CodeModeRunnerState { @@ -623,11 +1527,16 @@ fn typescript_property_name(name: &str) -> String { mod tests { use boa_engine::{Context, Source}; use serde_json::json; + use std::future::Future; + use std::pin::Pin; use super::{ CodeModeSchemaResponse, CodeModeSearchCandidate, CodeModeToolId, CodeModeToolRef, - action_input_schema, configure_code_mode_runtime_limits, sanitize_code_mode_schema, + action_input_schema, code_mode_upstream_error_info, configure_code_mode_runtime_limits, + sanitize_code_mode_schema, }; + use crate::dispatch::error::ToolError; + use crate::registry::{RegisteredService, RegisteredServiceKind, ToolRegistry}; use lab_apis::core::action::{ActionSpec, ParamSpec}; #[test] @@ -673,6 +1582,127 @@ mod tests { } } + #[test] + fn upstream_error_info_preserves_user_error_kinds() { + let text = json!({ + "error": { + "kind": "missing_param", + "message": "query is required", + "param": "query" + } + }) + .to_string(); + + let (kind, message, counts_as_failure) = code_mode_upstream_error_info(Some(&text)); + + assert_eq!(kind, "missing_param"); + assert_eq!(message, "query is required"); + assert!(!counts_as_failure); + } + + const DESTRUCTIVE_ACTIONS: &[ActionSpec] = &[ActionSpec { + name: "danger", + description: "Dangerous test action", + destructive: true, + params: &[], + returns: "object", + }]; + + fn echo_dispatch( + _action: String, 
+ params: serde_json::Value, + ) -> Pin<Box<dyn Future<Output = Result<serde_json::Value, ToolError>> + Send>> { + Box::pin(async move { Ok(params) }) + } + + fn destructive_test_registry() -> ToolRegistry { + let mut registry = ToolRegistry::new(); + registry.register(RegisteredService { + name: "gateway", + description: "Gateway", + category: "bootstrap", + kind: RegisteredServiceKind::BootstrapOperator, + status: "available", + actions: DESTRUCTIVE_ACTIONS, + dispatch: echo_dispatch, + }); + registry + } + + #[tokio::test] + async fn mcp_code_mode_requires_top_level_confirmation_for_destructive_actions() { + let registry = destructive_test_registry(); + let broker = super::CodeModeBroker::new(&registry, None); + + let err = broker + .call_tool_id( + "lab::gateway.danger", + json!({"confirm": true}), + super::CodeModeCaller::TrustedLocal, + super::CodeModeSurface::Mcp { + expose_builtin_services: true, + allow_destructive_actions: false, + }, + ) + .await + .expect_err("mcp destructive action should require top-level code_execute confirm"); + + assert_eq!(err.kind(), "confirmation_required"); + } + + #[tokio::test] + async fn code_mode_schema_requires_admin_for_admin_only_actions() { + let registry = destructive_test_registry(); + let broker = super::CodeModeBroker::new(&registry, None); + + let err = broker + .schema( + "lab::gateway.danger", + super::CodeModeCaller::Scoped { + scopes: vec!["lab".to_string()], + subject: Some("subject-1".to_string()), + }, + super::CodeModeSurface::Mcp { + expose_builtin_services: true, + allow_destructive_actions: true, + }, + ) + .await + .expect_err("schema for admin-only action must require admin scope"); + + assert_eq!(err.kind(), "forbidden"); + } + + #[tokio::test] + async fn code_mode_overwrites_gateway_provenance_fields() { + let registry = destructive_test_registry(); + let broker = super::CodeModeBroker::new(&registry, None); + + let result = broker + .call_tool_id( + "lab::gateway.danger", + json!({ + "confirm": true, + "origin": "spoofed", + "owner": {"raw": "spoofed"} + }), +
super::CodeModeCaller::Scoped { + scopes: vec!["lab:admin".to_string()], + subject: Some("subject-1".to_string()), + }, + super::CodeModeSurface::Mcp { + expose_builtin_services: true, + allow_destructive_actions: true, + }, + ) + .await + .unwrap(); + + assert_eq!(result.pointer("/origin"), Some(&json!("mcp:subject-1"))); + assert_eq!(result.pointer("/owner/raw"), Some(&json!("mcp:subject-1"))); + assert_eq!(result.pointer("/owner/surface"), Some(&json!("mcp"))); + } + #[test] fn builds_search_candidate_for_lab_action() { let candidate = CodeModeSearchCandidate::lab_action( diff --git a/crates/lab/src/lib.rs b/crates/lab/src/lib.rs index f095156e4..c097114ed 100644 --- a/crates/lab/src/lib.rs +++ b/crates/lab/src/lib.rs @@ -11,6 +11,9 @@ pub mod config; pub mod dispatch; #[allow(unreachable_pub)] pub mod docs; +#[cfg(test)] +#[allow(dead_code)] +pub mod log_fmt; #[allow(unreachable_pub)] pub mod mcp; pub mod net; diff --git a/crates/lab/src/main.rs b/crates/lab/src/main.rs index 844108e2a..2eb70fe1d 100644 --- a/crates/lab/src/main.rs +++ b/crates/lab/src/main.rs @@ -165,8 +165,14 @@ async fn main() -> ExitCode { // by default — upstream connect/discovery events would otherwise flood // ordinary commands like `gateway list`. LAB_LOG still wins when set. 
let log_filter_override: Option<String> = match &cli.command { - cli::Command::Serve(args) => args.log_level.as_ref().map(|level| format!("labby={level},warn")), - cli::Command::Mcp(args) => args.log_level.as_ref().map(|level| format!("labby={level},warn")), + cli::Command::Serve(args) => args + .log_level + .as_ref() + .map(|level| format!("labby={level},warn")), + cli::Command::Mcp(args) => args + .log_level + .as_ref() + .map(|level| format!("labby={level},warn")), _ if std::env::var_os("LAB_LOG").is_none() => { Some("labby=warn,lab_apis=warn,rmcp=warn".to_string()) } diff --git a/crates/lab/src/mcp/CLAUDE.md b/crates/lab/src/mcp/CLAUDE.md index 0001add2d..ac73daa0d 100644 --- a/crates/lab/src/mcp/CLAUDE.md +++ b/crates/lab/src/mcp/CLAUDE.md @@ -34,14 +34,11 @@ For normal services, `dispatch/<service>/dispatch.rs` owns action routing, catal not add `dispatch/gateway-scout/` unless a second surface consumer is confirmed. - `code_search`, `code_schema`, and `code_execute` are registered - directly in `mcp/server.rs` as gateway Code Mode meta-tools. They are - MCP-only because their protocol contract is schema-first tool - discovery plus a child-process JavaScript runner, not Lab's - action+params service shape. Keep reusable schema, id, runner, and - sandbox helpers in `dispatch/gateway/code_mode.rs`, and keep upstream - visibility policy in `GatewayManager`. If a second surface needs Code - Mode, move the orchestration out of `mcp/server.rs` before adding that - surface. + directly in `mcp/server.rs` as gateway Code Mode meta-tools. MCP owns + tool registration, scope extraction, MCP request parsing, and + `CallToolResult` envelope conversion. Code Mode business logic lives + in `dispatch/gateway/code_mode.rs` so the native CLI can call the same + broker without routing through MCP. **No business logic anywhere in `mcp/`.** If you find yourself calling `reqwest`, parsing JSON beyond param extraction, or retrying, move it to `lab-apis/src/<service>/client.rs`.
diff --git a/crates/lab/src/mcp/server.rs b/crates/lab/src/mcp/server.rs index a6d557ba4..1d7e9e560 100644 --- a/crates/lab/src/mcp/server.rs +++ b/crates/lab/src/mcp/server.rs @@ -6,14 +6,11 @@ use sha2::{Digest, Sha256}; use std::borrow::Cow; use std::cmp::Ordering as CmpOrdering; -use std::collections::HashMap; -use std::process::Stdio; use std::sync::Arc; use std::sync::atomic::{AtomicU8, Ordering}; -use std::time::{Duration, Instant}; +use std::time::Instant; use axum::http::{self, request::Parts}; -use futures::{FutureExt, StreamExt, stream::FuturesUnordered}; use rmcp::model::{ AnnotateAble, CallToolRequestParams, CallToolResult, CompleteRequestParams, CompleteResult, CompletionInfo, Content, GetPromptRequestParams, GetPromptResult, ListPromptsResult, @@ -24,19 +21,12 @@ use rmcp::model::{ use rmcp::service::{NotificationContext, Peer, RequestContext}; use rmcp::{ErrorData, RoleServer, ServerHandler}; use serde_json::Value; -use tempfile::TempDir; -use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; -use tokio::process::{Child, ChildStdin, Command}; use tokio::sync::RwLock; use crate::config::NodeRole; use crate::dispatch::error::ToolError as DispatchToolError; use crate::dispatch::gateway::SHARED_GATEWAY_OAUTH_SUBJECT; -use crate::dispatch::gateway::code_mode::{ - CodeModeExecutedCall, CodeModeExecutionResponse, CodeModeRunnerInput, CodeModeRunnerOutput, - CodeModeSchemaResponse, CodeModeSearchCandidate, CodeModeToolId, CodeModeToolRef, - action_input_schema, sanitize_code_mode_schema, -}; +use crate::dispatch::gateway::code_mode::{CodeModeBroker, CodeModeCaller, CodeModeSurface}; use crate::dispatch::gateway::manager::{GatewayManager, GatewayToolSearchResult}; use crate::mcp::catalog::{ CODE_EXECUTE_TOOL_NAME, CODE_SCHEMA_TOOL_NAME, CODE_SEARCH_TOOL_NAME, @@ -1179,6 +1169,10 @@ impl ServerHandler for LabMcpServer { "type": "integer", "minimum": 1, "maximum": 50 + }, + "confirm": { + "type": "boolean", + "description": "Required at the top level, in 
addition to per-call params.confirm, before Code Mode may execute destructive Lab actions." } }, "required": ["code"] @@ -1378,7 +1372,6 @@ impl ServerHandler for LabMcpServer { } .max(1) .min(50); - let score_floor_fraction = manager.tool_search_config().await.score_floor_fraction; tracing::info!( surface = "mcp", service = "code_search", @@ -1389,27 +1382,18 @@ impl ServerHandler for LabMcpServer { top_k, "gateway code search start" ); - let mut candidates = self - .search_builtin_code_mode_candidates(&query, top_k, score_floor_fraction) - .await; - return match manager.search_tools(&query, top_k, true).await { - Ok(upstream_results) => { - candidates = merge_code_mode_search_candidates( - candidates, - upstream_results - .into_iter() - .map(|result| { - CodeModeSearchCandidate::upstream_tool( - &result.upstream, - &result.name, - &result.description, - result.score, - result.input_schema, - ) - }) - .collect(), - top_k, - ); + let broker = CodeModeBroker::new(&self.registry, Some(manager)); + let caller = auth.map_or(CodeModeCaller::TrustedLocal, |auth| { + CodeModeCaller::Scoped { + scopes: auth.scopes.clone(), + subject: self.request_subject(&context).map(ToOwned::to_owned), + } + }); + return match broker + .search(&query, top_k, caller, self.code_mode_surface(false)) + .await + { + Ok(candidates) => { tracing::info!( surface = "mcp", service = "code_search", @@ -1427,27 +1411,6 @@ impl ServerHandler for LabMcpServer { )])) } Err(err) => { - let kind = err.kind(); - if kind == "index_warming" && !candidates.is_empty() { - candidates.sort_by(compare_code_mode_search_candidates); - candidates.truncate(top_k); - tracing::info!( - surface = "mcp", - service = "code_search", - action = "call_tool", - subject, - query_hash = %query_hash, - query_len = query.len(), - top_k, - result_count = candidates.len(), - elapsed_ms = started.elapsed().as_millis(), - upstream_kind = kind, - "gateway code search ok" - ); - return Ok(CallToolResult::success(vec![Content::text( - 
serde_json::to_string(&candidates).unwrap_or_else(|_| "[]".to_string()), - )])); - } tracing::warn!( surface = "mcp", service = "code_search", @@ -1457,24 +1420,11 @@ impl ServerHandler for LabMcpServer { query_len = query.len(), top_k, elapsed_ms = started.elapsed().as_millis(), - kind, + kind = err.kind(), error = %err, "gateway code search failed" ); - let mut extra = serde_json::Map::new(); - if kind == "index_warming" { - extra.insert("retry_after_ms".to_string(), serde_json::json!(2000)); - } - if kind == "invalid_param" { - extra.insert("param".to_string(), serde_json::json!("query")); - } - let env = build_error_extra( - &service, - "call_tool", - kind, - &err.to_string(), - &Value::Object(extra), - ); + let env = tool_error_envelope(&service, "call_tool", &err); Ok(CallToolResult::error(vec![Content::text(env.to_string())])) } }; @@ -1502,7 +1452,7 @@ impl ServerHandler for LabMcpServer { ); return Ok(CallToolResult::error(vec![Content::text(env.to_string())])); } - if self.gateway_manager.is_none() { + let Some(manager) = &self.gateway_manager else { let envelope = build_error( &service, "call_tool", @@ -1512,20 +1462,13 @@ impl ServerHandler for LabMcpServer { return Ok(CallToolResult::error(vec![Content::text( envelope.to_string(), )])); - } + }; let id = args .get("id") .and_then(Value::as_str) .unwrap_or_default() .to_string(); - let parsed = match CodeModeToolId::parse(&id) { - Ok(parsed) => parsed, - Err(err) => { - let env = tool_error_envelope(&service, "call_tool", &err); - return Ok(CallToolResult::error(vec![Content::text(env.to_string())])); - } - }; - let id_hash = hash_arguments(&Value::String(parsed.raw.clone())); + let id_hash = hash_arguments(&Value::String(id.clone())); tracing::info!( surface = "mcp", service = "code_schema", @@ -1534,7 +1477,17 @@ impl ServerHandler for LabMcpServer { id_hash = %id_hash, "gateway code schema start" ); - return match self.code_mode_schema_response(parsed).await { + let broker = 
CodeModeBroker::new(&self.registry, Some(manager)); + let caller = auth.map_or(CodeModeCaller::TrustedLocal, |auth| { + CodeModeCaller::Scoped { + scopes: auth.scopes.clone(), + subject: self.request_subject(&context).map(ToOwned::to_owned), + } + }); + return match broker + .schema(&id, caller, self.code_mode_surface(false)) + .await + { Ok(response) => { tracing::info!( surface = "mcp", @@ -1627,6 +1580,8 @@ impl ServerHandler for LabMcpServer { .unwrap_or(config.max_tool_calls) .max(1) .min(config.max_tool_calls); + let allow_destructive_actions = + args.get("confirm").and_then(Value::as_bool) == Some(true); let code_hash = hash_arguments(&Value::String(code.to_string())); tracing::info!( surface = "mcp", @@ -1637,19 +1592,32 @@ impl ServerHandler for LabMcpServer { max_tool_calls = requested_max_tool_calls, "gateway code execute start" ); - let subject_raw = self.request_subject(&context); - let response = match self - .execute_code_mode_sandboxed( + let broker = CodeModeBroker::new(&self.registry, Some(manager)); + let caller = auth.map_or(CodeModeCaller::TrustedLocal, |auth| { + CodeModeCaller::Scoped { + scopes: auth.scopes.clone(), + subject: self.request_subject(&context).map(ToOwned::to_owned), + } + }); + let before = self.snapshot_catalog().await; + let response = match broker + .execute( code, requested_max_tool_calls, - Duration::from_millis(config.timeout_ms), - auth, - subject_raw, + caller, + self.code_mode_surface(allow_destructive_actions), + config, ) .await { - Ok(response) => response, + Ok(response) => { + let after = self.snapshot_catalog().await; + self.notify_catalog_changes(&before, &after).await; + response + } Err(err) => { + let after = self.snapshot_catalog().await; + self.notify_catalog_changes(&before, &after).await; let env = tool_error_envelope(&service, "call_tool", &err); return Ok(CallToolResult::error(vec![Content::text(env.to_string())])); } @@ -2781,6 +2749,13 @@ fn redact_subject_for_logging(subject: &str) -> String { } 
impl LabMcpServer { + fn code_mode_surface(&self, allow_destructive_actions: bool) -> CodeModeSurface { + CodeModeSurface::Mcp { + expose_builtin_services: !matches!(self.node_role, Some(NodeRole::NonMaster)), + allow_destructive_actions, + } + } + fn request_subject<'a>(&self, context: &'a RequestContext) -> Option<&'a str> { subject_from_extensions(&context.extensions) } @@ -2955,516 +2930,6 @@ impl LabMcpServer { results } - async fn search_builtin_code_mode_candidates( - &self, - query: &str, - top_k: usize, - score_floor_fraction: f32, - ) -> Vec { - let needle = query.trim().to_ascii_lowercase(); - if needle.is_empty() || needle.len() > 500 { - return Vec::new(); - } - - let mut candidates = Vec::new(); - - for service in self.registry.services() { - if !self.service_visible_on_mcp(service.name).await { - continue; - } - for action in self.searchable_builtin_actions(service).await { - let haystack = format!( - "{}\n{}\n{}\n{}", - service.name, service.description, action.name, action.description - ) - .to_ascii_lowercase(); - let score = crate::dispatch::gateway::score_name_haystack( - &needle, - &action.name.to_ascii_lowercase(), - &haystack, - ); - if score > 0.0 { - candidates.push(CodeModeSearchCandidate::lab_action( - service.name, - action.name, - action.description, - score, - )); - } - } - } - - candidates.sort_by(compare_code_mode_search_candidates); - - if score_floor_fraction > 0.0 { - if let Some(top) = candidates.first() { - let floor = top.score * score_floor_fraction; - candidates.retain(|candidate| candidate.score >= floor); - } - } - - candidates.truncate(top_k.max(1).min(50)); - candidates - } - - async fn code_mode_schema_for_lab_action( - &self, - id: &str, - service_name: &str, - action_name: &str, - ) -> Result { - let Some(entry) = self - .registry - .services() - .iter() - .find(|entry| entry.name == service_name) - else { - return Err(DispatchToolError::Sdk { - sdk_kind: "not_found".to_string(), - message: format!("Lab service 
`{service_name}` was not found"), - }); - }; - if !self.service_visible_on_mcp(entry.name).await - || !self.action_allowed_on_mcp(entry.name, action_name).await - { - return Err(DispatchToolError::Sdk { - sdk_kind: "not_found".to_string(), - message: format!( - "Lab action `{service_name}.{action_name}` is not exposed on the mcp surface" - ), - }); - } - let action = entry - .actions - .iter() - .find(|action| action.name == action_name) - .ok_or_else(|| DispatchToolError::Sdk { - sdk_kind: "not_found".to_string(), - message: format!("Lab action `{service_name}.{action_name}` was not found"), - })?; - let input_schema = action_input_schema(action); - crate::dispatch::helpers::action_schema(entry.actions, action_name).map(|schema| { - CodeModeSchemaResponse::lab_action_with_input_schema( - id, - action_name, - schema, - input_schema, - ) - }) - } - - async fn code_mode_schema_for_upstream_tool( - &self, - id: &str, - upstream: &str, - tool: &str, - ) -> Result { - let Some(manager) = self.gateway_manager.as_ref() else { - return Err(DispatchToolError::Sdk { - sdk_kind: "upstream_error".to_string(), - message: "gateway manager is unavailable".to_string(), - }); - }; - let candidate = manager - .resolve_code_mode_upstream_tool(upstream, tool) - .await?; - let Some(schema) = sanitize_code_mode_schema(candidate.input_schema) else { - return Err(DispatchToolError::Sdk { - sdk_kind: "schema_unavailable".to_string(), - message: format!( - "upstream tool `{upstream}::{tool}` schema is unavailable or exceeds the safe return size" - ), - }); - }; - Ok(CodeModeSchemaResponse::upstream_tool( - id, upstream, tool, schema, - )) - } - - async fn code_mode_schema_response( - &self, - parsed: CodeModeToolId, - ) -> Result { - match parsed.reference { - CodeModeToolRef::LabAction { service, action } => { - self.code_mode_schema_for_lab_action(&parsed.raw, &service, &action) - .await - } - CodeModeToolRef::UpstreamTool { upstream, tool } => { - 
self.code_mode_schema_for_upstream_tool(&parsed.raw, &upstream, &tool) - .await - } - } - } - - async fn execute_code_mode_sandboxed( - &self, - code: &str, - max_tool_calls: usize, - timeout: Duration, - auth: Option<&crate::api::oauth::AuthContext>, - subject: Option<&str>, - ) -> Result { - let exe = std::env::current_exe().map_err(|err| DispatchToolError::Sdk { - sdk_kind: "internal_error".to_string(), - message: format!("failed to locate current executable for Code Mode runner: {err}"), - })?; - let temp_dir = TempDir::new().map_err(|err| DispatchToolError::Sdk { - sdk_kind: "internal_error".to_string(), - message: format!("failed to create Code Mode sandbox directory: {err}"), - })?; - let mut child = Command::new(exe) - .args(["internal", "code-mode-runner"]) - .current_dir(temp_dir.path()) - .env_clear() - .stdin(Stdio::piped()) - .stdout(Stdio::piped()) - .stderr(Stdio::piped()) - .spawn() - .map_err(|err| DispatchToolError::Sdk { - sdk_kind: "internal_error".to_string(), - message: format!("failed to spawn Code Mode runner: {err}"), - })?; - - let mut stdin = child.stdin.take().ok_or_else(|| DispatchToolError::Sdk { - sdk_kind: "internal_error".to_string(), - message: "Code Mode runner stdin was not available".to_string(), - })?; - let stdout = child.stdout.take().ok_or_else(|| DispatchToolError::Sdk { - sdk_kind: "internal_error".to_string(), - message: "Code Mode runner stdout was not available".to_string(), - })?; - write_runner_input( - &mut stdin, - &CodeModeRunnerInput::Start { - code: code.to_string(), - }, - ) - .await?; - - let mut lines = BufReader::new(stdout).lines(); - let mut calls = Vec::new(); - let mut pending_tool_calls = FuturesUnordered::new(); - let mut started_tool_calls = 0usize; - let deadline = tokio::time::Instant::now() + timeout; - - loop { - tokio::select! 
{ - line = tokio::time::timeout_at(deadline, lines.next_line()) => { - let line = match line { - Ok(line) => line, - Err(_) => { - terminate_code_mode_runner(&mut child).await; - return Err(DispatchToolError::Sdk { - sdk_kind: "timeout".to_string(), - message: "Code Mode execution timed out".to_string(), - }); - } - }; - let Some(line) = line.map_err(|err| DispatchToolError::Sdk { - sdk_kind: "internal_error".to_string(), - message: format!("failed to read Code Mode runner output: {err}"), - })? - else { - let status = child.wait().await.map_err(|err| DispatchToolError::Sdk { - sdk_kind: "internal_error".to_string(), - message: format!("failed to wait for Code Mode runner: {err}"), - })?; - return Err(DispatchToolError::Sdk { - sdk_kind: "code_execution_failed".to_string(), - message: format!( - "Code Mode runner exited before completion with status {status}" - ), - }); - }; - match serde_json::from_str::(&line).map_err(|err| { - DispatchToolError::Sdk { - sdk_kind: "internal_error".to_string(), - message: format!("Code Mode runner emitted invalid protocol JSON: {err}"), - } - })? 
{ - CodeModeRunnerOutput::ToolCall { seq, id, params } => { - if started_tool_calls >= max_tool_calls { - terminate_code_mode_runner(&mut child).await; - return Err(DispatchToolError::Sdk { - sdk_kind: "tool_call_limit_exceeded".to_string(), - message: format!( - "Code Mode execution exceeded max_tool_calls={max_tool_calls}" - ), - }); - } - started_tool_calls += 1; - let call_id = id.clone(); - pending_tool_calls.push( - async move { - let result = self - .code_mode_call_tool_id_before_deadline( - &id, params, deadline, auth, subject, - ) - .await; - (seq, call_id, result) - } - .boxed(), - ); - } - CodeModeRunnerOutput::Done => { - if !pending_tool_calls.is_empty() { - terminate_code_mode_runner(&mut child).await; - return Err(DispatchToolError::Sdk { - sdk_kind: "code_execution_failed".to_string(), - message: "Code Mode runner completed with pending tool calls".to_string(), - }); - } - if calls.is_empty() { - terminate_code_mode_runner(&mut child).await; - return Err(DispatchToolError::Sdk { - sdk_kind: "invalid_param".to_string(), - message: - "Code Mode snippet must call callTool(id, params) at least once" - .to_string(), - }); - } - let status = child.wait().await.map_err(|err| DispatchToolError::Sdk { - sdk_kind: "internal_error".to_string(), - message: format!("failed to wait for Code Mode runner: {err}"), - })?; - if !status.success() { - return Err(DispatchToolError::Sdk { - sdk_kind: "code_execution_failed".to_string(), - message: format!("Code Mode runner exited with status {status}"), - }); - } - calls.sort_by_key(|(seq, _)| *seq); - return Ok(CodeModeExecutionResponse { - calls: calls.into_iter().map(|(_, call)| call).collect(), - }); - } - CodeModeRunnerOutput::Error { kind, message } => { - drop(child.wait().await); - return Err(DispatchToolError::Sdk { - sdk_kind: kind, - message, - }); - } - } - } - completed = pending_tool_calls.next(), if !pending_tool_calls.is_empty() => { - let Some((seq, id, result)) = completed else { - continue; - }; - let 
result = match result { - Ok(result) => result, - Err(err) => { - drop( - write_runner_input( - &mut stdin, - &CodeModeRunnerInput::ToolError { - seq, - kind: match &err { - DispatchToolError::Sdk { sdk_kind, .. } => { - sdk_kind.as_str() - } - other => other.kind(), - } - .to_string(), - message: err.to_string(), - }, - ) - .await, - ); - terminate_code_mode_runner(&mut child).await; - return Err(err); - } - }; - calls.push((seq, CodeModeExecutedCall { - id: id.clone(), - result: result.clone(), - })); - write_runner_input( - &mut stdin, - &CodeModeRunnerInput::ToolResult { seq, result }, - ) - .await?; - } - } - } - } - - async fn code_mode_call_tool_id_before_deadline( - &self, - id: &str, - params: Value, - deadline: tokio::time::Instant, - auth: Option<&crate::api::oauth::AuthContext>, - subject: Option<&str>, - ) -> Result { - match tokio::time::timeout_at( - deadline, - self.code_mode_call_tool_id(id, params, auth, subject), - ) - .await - { - Ok(result) => result, - Err(_) => Err(DispatchToolError::Sdk { - sdk_kind: "timeout".to_string(), - message: "Code Mode execution timed out".to_string(), - }), - } - } - - async fn code_mode_call_tool_id( - &self, - id: &str, - params: Value, - auth: Option<&crate::api::oauth::AuthContext>, - subject: Option<&str>, - ) -> Result { - let parsed = CodeModeToolId::parse(id)?; - match parsed.reference { - CodeModeToolRef::LabAction { service, action } => { - self.code_mode_call_lab_action(&service, &action, params, auth, subject) - .await - } - CodeModeToolRef::UpstreamTool { upstream, tool } => { - self.code_mode_call_upstream_tool(&upstream, &tool, params) - .await - } - } - } - - async fn code_mode_call_lab_action( - &self, - service_name: &str, - action_name: &str, - params: Value, - auth: Option<&crate::api::oauth::AuthContext>, - subject: Option<&str>, - ) -> Result { - let Some(entry) = self - .registry - .services() - .iter() - .find(|entry| entry.name == service_name) - else { - return Err(DispatchToolError::Sdk { 
- sdk_kind: "not_found".to_string(), - message: format!("Lab service `{service_name}` was not found"), - }); - }; - if !self.service_visible_on_mcp(entry.name).await - || !self.action_allowed_on_mcp(entry.name, action_name).await - { - return Err(DispatchToolError::Sdk { - sdk_kind: "not_found".to_string(), - message: format!( - "Lab action `{service_name}.{action_name}` is not exposed on the mcp surface" - ), - }); - } - if !tool_execute_builtin_action_allowed(entry, action_name, auth) { - return Err(DispatchToolError::Sdk { - sdk_kind: "forbidden".to_string(), - message: format!( - "action `{action_name}` for service `{}` requires `lab:admin` scope", - entry.name - ), - }); - } - let is_destructive = entry - .actions - .iter() - .any(|action| action.name == action_name && action.destructive); - if is_destructive && params.get("confirm").and_then(Value::as_bool) != Some(true) { - return Err(DispatchToolError::Sdk { - sdk_kind: "confirmation_required".to_string(), - message: format!( - "action `{action_name}` is destructive — pass {{\"confirm\":true}} in params" - ), - }); - } - let params = if entry.name == "gateway" { - inject_gateway_origin_param(params, subject) - } else { - params - }; - (entry.dispatch)(action_name.to_string(), params).await - } - - async fn code_mode_call_upstream_tool( - &self, - upstream: &str, - tool: &str, - params: Value, - ) -> Result { - let Some(manager) = self.gateway_manager.as_ref() else { - return Err(DispatchToolError::Sdk { - sdk_kind: "upstream_error".to_string(), - message: "gateway manager is unavailable".to_string(), - }); - }; - manager - .resolve_code_mode_upstream_tool(upstream, tool) - .await?; - let Some(pool) = manager.current_pool().await else { - return Err(DispatchToolError::Sdk { - sdk_kind: "upstream_error".to_string(), - message: "gateway upstream pool is unavailable".to_string(), - }); - }; - let before = self.snapshot_catalog().await; - let mut upstream_params = CallToolRequestParams::new(tool.to_string()); - 
upstream_params.arguments = Some(match params { - Value::Object(map) => map, - _ => serde_json::Map::new(), - }); - match pool.call_tool(upstream, upstream_params).await { - Some(Ok(result)) => { - let (result, kind, counts_as_failure) = - normalize_upstream_result(tool, "call_tool", result); - if counts_as_failure { - pool.record_failure( - upstream, - format!("upstream `{upstream}` returned `{kind}`"), - ) - .await; - } else { - pool.record_success(upstream).await; - } - let after = self.snapshot_catalog().await; - self.notify_catalog_changes(&before, &after).await; - if kind != "ok" { - return Err(DispatchToolError::Sdk { - sdk_kind: kind.to_string(), - message: call_tool_result_message(&result), - }); - } - serde_json::to_value(result).map_err(|err| DispatchToolError::Sdk { - sdk_kind: "internal_error".to_string(), - message: format!("failed to serialize upstream tool result: {err}"), - }) - } - Some(Err(err)) => { - pool.record_failure(upstream, err.clone()).await; - let after = self.snapshot_catalog().await; - self.notify_catalog_changes(&before, &after).await; - Err(DispatchToolError::Sdk { - sdk_kind: "upstream_error".to_string(), - message: err, - }) - } - None => { - pool.record_failure(upstream, format!("upstream `{upstream}` is not connected")) - .await; - let after = self.snapshot_catalog().await; - self.notify_catalog_changes(&before, &after).await; - Err(DispatchToolError::Sdk { - sdk_kind: "not_found".to_string(), - message: format!("upstream tool `{upstream}::{tool}` was not found"), - }) - } - } - } - async fn searchable_builtin_actions<'a>( &self, service: &'a crate::registry::RegisteredService, @@ -3479,198 +2944,34 @@ impl LabMcpServer { } } -fn compare_code_mode_search_candidates( - a: &CodeModeSearchCandidate, - b: &CodeModeSearchCandidate, -) -> CmpOrdering { - b.score - .partial_cmp(&a.score) - .unwrap_or(CmpOrdering::Equal) - .then_with(|| a.id.cmp(&b.id)) -} - -const SEARCH_RRF_K: f32 = 60.0; -const BUILTIN_SEARCH_WEIGHT: f32 = 1.0; 
-const GATEWAY_SEARCH_WEIGHT: f32 = 2.0; - -fn rank_fusion_score(rank: usize, weight: f32) -> f32 { - #[allow(clippy::cast_precision_loss)] - let rank = rank as f32; - weight / (SEARCH_RRF_K + rank) -} - -fn merge_code_mode_search_candidates( - left: Vec, - right: Vec, - top_k: usize, -) -> Vec { - #[derive(Debug)] - struct FusedCandidate { - candidate: CodeModeSearchCandidate, - score: f32, - best_source_weight: f32, - } - - let mut fused: HashMap = HashMap::new(); - for (rank, mut candidate) in left.into_iter().enumerate() { - let score = rank_fusion_score(rank, BUILTIN_SEARCH_WEIGHT); - candidate.score = score; - fused.insert( - candidate.id.clone(), - FusedCandidate { - candidate, - score, - best_source_weight: BUILTIN_SEARCH_WEIGHT, - }, - ); - } - for (rank, mut candidate) in right.into_iter().enumerate() { - let score = rank_fusion_score(rank, GATEWAY_SEARCH_WEIGHT); - candidate.score = score; - fused - .entry(candidate.id.clone()) - .and_modify(|existing| { - existing.score += score; - existing.best_source_weight = - existing.best_source_weight.max(GATEWAY_SEARCH_WEIGHT); - existing.candidate.score = existing.score; - existing.candidate.schema_available |= candidate.schema_available; - }) - .or_insert(FusedCandidate { - candidate, - score, - best_source_weight: GATEWAY_SEARCH_WEIGHT, - }); - } - - let mut results = fused.into_values().collect::>(); - results.sort_by(|a, b| { - b.score - .partial_cmp(&a.score) - .unwrap_or(CmpOrdering::Equal) - .then_with(|| { - b.best_source_weight - .partial_cmp(&a.best_source_weight) - .unwrap_or(CmpOrdering::Equal) - }) - .then_with(|| a.candidate.id.cmp(&b.candidate.id)) - }); - results.truncate(top_k.max(1).min(50)); - results - .into_iter() - .map(|mut fused| { - fused.candidate.score = fused.score; - fused.candidate - }) - .collect() -} - -async fn write_runner_input( - stdin: &mut ChildStdin, - input: &CodeModeRunnerInput, -) -> Result<(), DispatchToolError> { - let mut line = serde_json::to_vec(input).map_err(|err| 
DispatchToolError::Sdk { - sdk_kind: "internal_error".to_string(), - message: format!("failed to encode Code Mode runner input: {err}"), - })?; - line.push(b'\n'); - stdin - .write_all(&line) - .await - .map_err(|err| DispatchToolError::Sdk { - sdk_kind: "internal_error".to_string(), - message: format!("failed to write Code Mode runner input: {err}"), - })?; - stdin.flush().await.map_err(|err| DispatchToolError::Sdk { - sdk_kind: "internal_error".to_string(), - message: format!("failed to flush Code Mode runner input: {err}"), - }) -} - -async fn terminate_code_mode_runner(child: &mut Child) { - if let Ok(Some(_)) = child.try_wait() { - return; - } - drop(child.kill().await); - drop(child.wait().await); -} - fn tool_error_envelope(service: &str, action: &str, err: &DispatchToolError) -> Value { - match err { - DispatchToolError::Sdk { sdk_kind, message } => { - build_error(service, action, sdk_kind, message) - } - other => build_error(service, action, other.kind(), &other.to_string()), + let Ok(Value::Object(mut serialized)) = serde_json::to_value(err) else { + return build_error(service, action, err.kind(), &err.to_string()); + }; + let kind = serialized + .remove("kind") + .and_then(|value| value.as_str().map(ToOwned::to_owned)) + .unwrap_or_else(|| err.kind().to_string()); + let message = serialized + .remove("message") + .and_then(|value| value.as_str().map(ToOwned::to_owned)) + .unwrap_or_else(|| err.to_string()); + if serialized.is_empty() { + build_error(service, action, &kind, &message) + } else { + build_error_extra(service, action, &kind, &message, &Value::Object(serialized)) } } fn merge_tool_search_results( - left: Vec, + mut left: Vec, right: Vec, top_k: usize, ) -> Vec { - #[derive(Debug)] - struct FusedResult { - result: GatewayToolSearchResult, - score: f32, - best_source_weight: f32, - } - - let mut fused: HashMap<(String, String), FusedResult> = HashMap::new(); - for (rank, mut result) in left.into_iter().enumerate() { - let score = 
rank_fusion_score(rank, BUILTIN_SEARCH_WEIGHT); - result.score = score; - fused.insert( - (result.upstream.clone(), result.name.clone()), - FusedResult { - result, - score, - best_source_weight: BUILTIN_SEARCH_WEIGHT, - }, - ); - } - for (rank, mut result) in right.into_iter().enumerate() { - let score = rank_fusion_score(rank, GATEWAY_SEARCH_WEIGHT); - result.score = score; - fused - .entry((result.upstream.clone(), result.name.clone())) - .and_modify(|existing| { - existing.score += score; - existing.best_source_weight = - existing.best_source_weight.max(GATEWAY_SEARCH_WEIGHT); - existing.result.score = existing.score; - if existing.result.input_schema.is_none() { - existing.result.input_schema = result.input_schema.take(); - } - }) - .or_insert(FusedResult { - result, - score, - best_source_weight: GATEWAY_SEARCH_WEIGHT, - }); - } - - let mut results = fused.into_values().collect::>(); - results.sort_by(|a, b| { - b.score - .partial_cmp(&a.score) - .unwrap_or(CmpOrdering::Equal) - .then_with(|| { - b.best_source_weight - .partial_cmp(&a.best_source_weight) - .unwrap_or(CmpOrdering::Equal) - }) - .then_with(|| a.result.name.cmp(&b.result.name)) - .then_with(|| a.result.upstream.cmp(&b.result.upstream)) - }); - results.truncate(top_k.max(1).min(50)); - results - .into_iter() - .map(|mut fused| { - fused.result.score = fused.score; - fused.result - }) - .collect() + left.extend(right); + left.sort_by(compare_tool_search_results); + left.truncate(top_k.max(1).min(50)); + left } fn compare_tool_search_results( @@ -3970,15 +3271,6 @@ fn normalize_upstream_result( ) } -fn call_tool_result_message(result: &CallToolResult) -> String { - result - .content - .first() - .and_then(|content| content.as_text()) - .map(|content| content.text.to_string()) - .unwrap_or_else(|| "upstream tool returned an error".to_string()) -} - /// Recover a stable kind tag and message from an `anyhow::Error`. 
/// /// Priority: @@ -4028,13 +3320,9 @@ pub fn extract_error_info(e: &anyhow::Error) -> (&'static str, String, Option 0.03 && result.score < 0.04) - ); - } - - #[test] - fn tool_search_merges_gateway_results_by_rank_not_raw_score() { - let results = merge_tool_search_results( - vec![GatewayToolSearchResult { - name: "extract".to_string(), - description: "Host extraction".to_string(), - upstream: "lab".to_string(), - score: 22.0, - input_schema: None, - }], - vec![GatewayToolSearchResult { - name: "list_containers".to_string(), - description: "List Docker containers".to_string(), - upstream: "dozzle".to_string(), - score: 0.033, - input_schema: Some(serde_json::json!({"type": "object"})), - }], - 2, - ); - - let first = results.first().expect("merged result"); - assert_eq!(first.upstream, "dozzle"); - assert_eq!(first.name, "list_containers"); - assert!( - first.score > 0.03 && first.score < 0.04, - "score should be normalized to cross-source rank fusion" - ); - } - #[tokio::test] async fn code_mode_brokers_lab_action_by_stable_id() { - let server = super::LabMcpServer { - registry: std::sync::Arc::new(completion_test_registry()), - gateway_manager: None, - node_role: None, - peers: std::sync::Arc::new(tokio::sync::RwLock::new(Vec::new())), - logging_level: std::sync::Arc::new(std::sync::atomic::AtomicU8::new( - logging_level_rank(rmcp::model::LoggingLevel::Info), - )), - }; + let registry = completion_test_registry(); + let broker = CodeModeBroker::new(®istry, None); - let result = server - .code_mode_call_tool_id( + let result = broker + .call_tool_id( "lab::radarr.movie.search", serde_json::json!({"query": "Alien"}), - None, - None, + CodeModeCaller::TrustedLocal, + CodeModeSurface::Cli, ) .await .unwrap(); @@ -4496,24 +3735,16 @@ mod tests { actions: SLOW_ACTIONS, dispatch: slow_dispatch, }); - let server = super::LabMcpServer { - registry: std::sync::Arc::new(registry), - gateway_manager: None, - node_role: None, - peers: 
std::sync::Arc::new(tokio::sync::RwLock::new(Vec::new())), - logging_level: std::sync::Arc::new(std::sync::atomic::AtomicU8::new( - logging_level_rank(rmcp::model::LoggingLevel::Info), - )), - }; + let broker = CodeModeBroker::new(®istry, None); let started = std::time::Instant::now(); - let err = server - .code_mode_call_tool_id_before_deadline( + let err = broker + .call_tool_id_before_deadline( "lab::slow.wait", serde_json::json!({}), tokio::time::Instant::now() + Duration::from_millis(50), - None, - None, + CodeModeCaller::TrustedLocal, + CodeModeSurface::Cli, ) .await .expect_err("brokered tool call should be bounded by Code Mode timeout"); diff --git a/crates/lab/src/node/ws_client.rs b/crates/lab/src/node/ws_client.rs index a9b3bdf29..59579bbfc 100644 --- a/crates/lab/src/node/ws_client.rs +++ b/crates/lab/src/node/ws_client.rs @@ -821,6 +821,14 @@ struct InstallDecodeError { /// the node or lock up a worker permit. fn decode_component_files( files: Option<&Vec>, +) -> Result)>, InstallDecodeError> { + decode_component_files_with_limits(files, MAX_COMPONENT_FILE_SIZE, MAX_COMPONENT_AGGREGATE_SIZE) +} + +fn decode_component_files_with_limits( + files: Option<&Vec>, + max_file_size: usize, + max_aggregate_size: usize, ) -> Result)>, InstallDecodeError> { use base64::Engine as _; let Some(files) = files else { @@ -870,24 +878,21 @@ fn decode_component_files( }); } }; - if decoded.len() > MAX_COMPONENT_FILE_SIZE { + if decoded.len() > max_file_size { return Err(InstallDecodeError { kind: "content_too_large", message: format!( "component `{path}` is {} bytes; per-file limit is {} bytes", decoded.len(), - MAX_COMPONENT_FILE_SIZE + max_file_size ), }); } aggregate = aggregate.saturating_add(decoded.len()); - if aggregate > MAX_COMPONENT_AGGREGATE_SIZE { + if aggregate > max_aggregate_size { return Err(InstallDecodeError { kind: "content_too_large", - message: format!( - "aggregate component payload exceeds {} bytes", - MAX_COMPONENT_AGGREGATE_SIZE - ), + message: 
format!("aggregate component payload exceeds {max_aggregate_size} bytes"), }); } out.push((path, decoded)); @@ -1873,10 +1878,9 @@ mod tests { #[test] fn decode_component_files_rejects_per_file_oversize() { - // 6 MB of 'a' > 5 MB per-file cap. - let big = "a".repeat(6 * 1024 * 1024); + let big = "a".repeat(17); let files = vec![json!({ "path": "big.bin", "content": big, "encoding": "utf8" })]; - let err = decode_component_files(Some(&files)) + let err = decode_component_files_with_limits(Some(&files), 16, 64) .err() .expect("must reject"); assert_eq!(err.kind, "content_too_large"); @@ -1884,13 +1888,13 @@ mod tests { #[test] fn decode_component_files_rejects_aggregate_oversize() { - // 7 × 5 MB = 35 MB > 32 MB aggregate cap. Individual files fit under - // the per-file cap so the aggregate check is exercised. - let chunk = "a".repeat(5 * 1024 * 1024 - 1024); + // 7 x 10 bytes = 70 bytes > 64 byte aggregate cap. Individual files fit + // under the per-file cap so the aggregate check is exercised. + let chunk = "a".repeat(10); let files: Vec = (0..7) .map(|i| json!({ "path": format!("f{i}.bin"), "content": chunk, "encoding": "utf8" })) .collect(); - let err = decode_component_files(Some(&files)) + let err = decode_component_files_with_limits(Some(&files), 16, 64) .err() .expect("must reject"); assert_eq!(err.kind, "content_too_large"); diff --git a/docs/dev/DISPATCH.md b/docs/dev/DISPATCH.md index c78c67227..9f8057598 100644 --- a/docs/dev/DISPATCH.md +++ b/docs/dev/DISPATCH.md @@ -73,6 +73,11 @@ It does not own: - axum response types - table rendering +Synthetic gateway workflows such as Code Mode still follow this ownership rule: +the schema-first IDs, search/schema composition, sandbox parent broker, and +tool-call dispatch live in `dispatch/gateway/`; MCP and CLI only adapt their +surface-specific inputs and outputs. + ### Surface Adapters The three product surfaces are adapters over `dispatch`. 
diff --git a/docs/generated/action-catalog.json b/docs/generated/action-catalog.json index 344ecbb33..af1f703e5 100644 --- a/docs/generated/action-catalog.json +++ b/docs/generated/action-catalog.json @@ -1106,6 +1106,105 @@ "inventory_scope": "global_inventory_not_active_runtime_exposure", "builtin": false }, + { + "service": "fs", + "action": "fs.list", + "description": "List immediate entries of a directory inside the configured workspace root", + "destructive": false, + "params": [ + { + "name": "path", + "ty": "string", + "required": false, + "description": "Workspace-relative path to list; empty or omitted means the workspace root" + } + ], + "returns": "{entries: [{name, path, kind, size, modified, accessible}], truncated: bool}", + "surface_availability": { + "cli": false, + "mcp": true, + "api": true, + "web_ui": true + }, + "requires_http_subject": false, + "auth_posture": "uses the selected transport auth and gateway visibility policy", + "inventory_scope": "global_inventory_not_active_runtime_exposure", + "builtin": false + }, + { + "service": "fs", + "action": "fs.preview", + "description": "Stream a capped byte window from a workspace file (HTTP-only, admin-session gated)", + "destructive": false, + "params": [ + { + "name": "path", + "ty": "string", + "required": true, + "description": "Workspace-relative path of the file to preview" + }, + { + "name": "max_bytes", + "ty": "integer", + "required": false, + "description": "Upper bound on bytes returned; server cap of 2 MiB always wins" + } + ], + "returns": "binary (streamed); mime from safe-MIME whitelist or application/octet-stream", + "surface_availability": { + "cli": false, + "mcp": false, + "api": true, + "web_ui": true + }, + "requires_http_subject": true, + "auth_posture": "HTTP-only admin/browser session path; intentionally unavailable on MCP", + "inventory_scope": "global_inventory_not_active_runtime_exposure", + "builtin": false + }, + { + "service": "fs", + "action": "help", + 
"description": "Show service actions", + "destructive": false, + "params": [], + "returns": "HelpPayload", + "surface_availability": { + "cli": false, + "mcp": true, + "api": true, + "web_ui": true + }, + "requires_http_subject": false, + "auth_posture": "uses the selected transport auth and gateway visibility policy", + "inventory_scope": "global_inventory_not_active_runtime_exposure", + "builtin": true + }, + { + "service": "fs", + "action": "schema", + "description": "Show the schema for a specific action", + "destructive": false, + "params": [ + { + "name": "action", + "ty": "string", + "required": true, + "description": "Action name to describe" + } + ], + "returns": "ActionSpec", + "surface_availability": { + "cli": false, + "mcp": true, + "api": true, + "web_ui": true + }, + "requires_http_subject": false, + "auth_posture": "uses the selected transport auth and gateway visibility policy", + "inventory_scope": "global_inventory_not_active_runtime_exposure", + "builtin": true + }, { "service": "gateway", "action": "gateway.add", diff --git a/docs/generated/action-catalog.md b/docs/generated/action-catalog.md index a167af97c..d8769a64c 100644 --- a/docs/generated/action-catalog.md +++ b/docs/generated/action-catalog.md @@ -45,6 +45,10 @@ This is a global inventory, not the active runtime exposure or authorization pol | `extract` | `list_hosts` | false | false | | `string[]` | cli, mcp, api | | `extract` | `scan` | false | false | `uri: string`
`hosts: string[]`
`redact_secrets: bool` | `DiscoveredService[]` | cli, mcp, api | | `extract` | `schema` | false | false | `action*: string` | `Schema` | cli, mcp, api | +| `fs` | `fs.list` | false | false | `path: string` | `{entries: [{name, path, kind, size, modified, accessible}], truncated: bool}` | mcp, api, web | +| `fs` | `fs.preview` | false | false | `path*: string`
`max_bytes: integer` | `binary (streamed); mime from safe-MIME whitelist or application/octet-stream` | api, web | +| `fs` | `help` | true | false | | `HelpPayload` | mcp, api, web | +| `fs` | `schema` | true | false | `action*: string` | `ActionSpec` | mcp, api, web | | `gateway` | `gateway.add` | false | true | `spec*: json`
`bearer_token_value: string`
`allow_stdio: boolean` | `GatewayView` | cli, mcp, api, web | | `gateway` | `gateway.client_config.get` | false | false | `name*: string` | `McpClientConfigView` | cli, mcp, api, web | | `gateway` | `gateway.discover` | false | false | `clients: string[]`
`include_existing: boolean` | `DiscoveredServerView[]` | cli, mcp, api, web | diff --git a/docs/generated/api-routes.json b/docs/generated/api-routes.json index 3e9330e4b..edc2e1eb2 100644 --- a/docs/generated/api-routes.json +++ b/docs/generated/api-routes.json @@ -511,6 +511,22 @@ "cache_posture": "upgrade, not cacheable", "notes": "legacy websocket alias" }, + { + "method": "POST", + "path": "/v1/fs", + "surface": "api", + "handler_group": "services", + "feature": "fs", + "runtime_condition": "mounted only when fs is enabled and /v1 auth is configured if LAB_WEB_UI_AUTH_DISABLED=true", + "auth_required": true, + "bearer_only": false, + "session_cookie_allowed": true, + "csrf_required": true, + "host_validation": false, + "master_only": true, + "cache_posture": "not cacheable", + "notes": "service action dispatch" + }, { "method": "POST", "path": "/v1/gateway", diff --git a/docs/generated/api-routes.md b/docs/generated/api-routes.md index e58f61510..0e6df28eb 100644 --- a/docs/generated/api-routes.md +++ b/docs/generated/api-routes.md @@ -36,6 +36,7 @@ Generated by `labby docs generate`. Do not edit by hand. 
| POST | `/v1/extract` | true | true | true | true | true | extract | extract action dispatch | | POST | `/v1/fleet/hello` | false | false | false | false | false | nodes | legacy node self-registration alias | | GET | `/v1/fleet/ws` | false | false | false | false | false | nodes | legacy websocket alias | +| POST | `/v1/fs` | true | true | true | false | true | services | service action dispatch | | POST | `/v1/gateway` | true | true | true | false | true | gateway | gateway action dispatch | | POST | `/v1/gateway` | true | true | true | false | true | services | service action dispatch | | POST | `/v1/gateway/oauth/cancel` | true | true | true | false | true | upstream_oauth | cancel upstream OAuth flow | diff --git a/docs/generated/cli-help.md b/docs/generated/cli-help.md index 34cce836b..30e282326 100644 --- a/docs/generated/cli-help.md +++ b/docs/generated/cli-help.md @@ -988,6 +988,7 @@ Commands: import Import discovered MCP servers into the gateway (disabled by default) pending Manage pending discovered servers waiting for approval public-urls Show resolved public URL configuration (app and MCP gateway) + code Search, inspect, and execute Code Mode snippets through dispatch Options: --json @@ -2066,6 +2067,112 @@ Options: Print help ``` +## `labby gateway code` + +```text +Search, inspect, and execute Code Mode snippets through dispatch + +Usage: code [OPTIONS] + +Commands: + search Search Code Mode tool IDs by natural-language query + schema Show the schema and generated bindings for one Code Mode tool ID + exec Execute a sandboxed JavaScript snippet that calls callTool(id, params) + +Options: + --json + Emit JSON instead of human-readable tables + + --color + Control human-readable CLI styling + + [default: auto] + [possible values: auto, plain, color] + + -h, --help + Print help +``` + +## `labby gateway code search` + +```text +Search Code Mode tool IDs by natural-language query + +Usage: search [OPTIONS] + +Arguments: + + + +Options: + --json + Emit 
JSON instead of human-readable tables + + --top-k + [default: 10] + + --color + Control human-readable CLI styling + + [default: auto] + [possible values: auto, plain, color] + + -h, --help + Print help +``` + +## `labby gateway code schema` + +```text +Show the schema and generated bindings for one Code Mode tool ID + +Usage: schema [OPTIONS] + +Arguments: + + + +Options: + --json + Emit JSON instead of human-readable tables + + --color + Control human-readable CLI styling + + [default: auto] + [possible values: auto, plain, color] + + -h, --help + Print help +``` + +## `labby gateway code exec` + +```text +Execute a sandboxed JavaScript snippet that calls callTool(id, params) + +Usage: exec [OPTIONS] + +Options: + --code + + + --json + Emit JSON instead of human-readable tables + + --color + Control human-readable CLI styling + + [default: auto] + [possible values: auto, plain, color] + + --file + + + -h, --help + Print help +``` + ## `labby oauth` ```text diff --git a/docs/generated/mcp-help.json b/docs/generated/mcp-help.json index e3f97b2d2..17f1427b7 100644 --- a/docs/generated/mcp-help.json +++ b/docs/generated/mcp-help.json @@ -3178,6 +3178,29 @@ "returns": "AuditReport" } ] + }, + { + "name": "fs", + "description": "Workspace filesystem browser (read-only, deny-listed)", + "category": "bootstrap", + "status": "available", + "requires_http_subject": false, + "actions": [ + { + "name": "fs.list", + "description": "List immediate entries of a directory inside the configured workspace root", + "destructive": false, + "params": [ + { + "name": "path", + "ty": "string", + "required": false, + "description": "Workspace-relative path to list; empty or omitted means the workspace root" + } + ], + "returns": "{entries: [{name, path, kind, size, modified, accessible}], truncated: bool}" + } + ] } ] } diff --git a/docs/generated/mcp-help.md b/docs/generated/mcp-help.md index bd5c872fc..868c1180c 100644 --- a/docs/generated/mcp-help.md +++ b/docs/generated/mcp-help.md 
@@ -184,3 +184,4 @@ Generated by `labby docs generate`. Do not edit by hand. | `lab_admin` | bootstrap | available | `help` | false | | `Catalog` | | `lab_admin` | bootstrap | available | `schema` | false | `action*: string` | `Schema` | | `lab_admin` | bootstrap | available | `onboarding.audit` | false | `services*: string[]` | `AuditReport` | +| `fs` | bootstrap | available | `fs.list` | false | `path: string` | `{entries: [{name, path, kind, size, modified, accessible}], truncated: bool}` | diff --git a/docs/generated/openapi.json b/docs/generated/openapi.json index 08535da01..fe9cd337a 100644 --- a/docs/generated/openapi.json +++ b/docs/generated/openapi.json @@ -388,6 +388,72 @@ ] } }, + "/v1/fs": { + "post": { + "tags": [ + "fs" + ], + "summary": "Dispatch action to fs", + "description": "Execute an action on the fs service. Use `action: \"help\"` to list available actions.", + "requestBody": { + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ActionRequest" + } + } + }, + "required": true + }, + "responses": { + "200": { + "description": "Successful action response", + "content": { + "application/json": { + "schema": { + "type": "object" + } + } + } + }, + "400": { + "description": "Bad request (unknown action, confirmation required)", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ErrorUnknownAction" + } + } + } + }, + "401": { + "description": "Authentication failed", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ErrorSdk" + } + } + } + }, + "422": { + "description": "Validation error (missing or invalid param)", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ErrorMissingParam" + } + } + } + } + }, + "security": [ + { + "bearer_auth": [] + } + ] + } + }, "/v1/gateway": { "post": { "tags": [ @@ -1365,6 +1431,14 @@ } } }, + "FsFsListParams": { + "type": "object", + "properties": { + "path": { + "type": "string" 
+ } + } + }, "GatewayGatewayAddParams": { "type": "object", "required": [ diff --git a/docs/generated/service-catalog.json b/docs/generated/service-catalog.json index a8674fd73..4a6f97815 100644 --- a/docs/generated/service-catalog.json +++ b/docs/generated/service-catalog.json @@ -125,6 +125,27 @@ "supports_multi_instance": false, "metadata_source": "registry + PluginMeta" }, + { + "name": "fs", + "display_name": "fs", + "description": "Workspace filesystem browser (read-only, deny-listed)", + "category": "bootstrap", + "status": "available", + "feature": "fs", + "exposure": "feature_gated", + "surfaces": { + "cli": false, + "mcp": true, + "api": true, + "web_ui": true + }, + "default_port": null, + "docs_url": null, + "coverage_doc": null, + "upstream_doc": null, + "supports_multi_instance": false, + "metadata_source": "registry synthetic metadata" + }, { "name": "gateway", "display_name": "gateway", diff --git a/docs/generated/service-catalog.md b/docs/generated/service-catalog.md index 75b490029..bd93a9735 100644 --- a/docs/generated/service-catalog.md +++ b/docs/generated/service-catalog.md @@ -10,6 +10,7 @@ Generated by `labby docs generate`. Do not edit by hand. 
| `device` | available | AlwaysOn | - | bootstrap | mcp, api, web | registry synthetic metadata | | `doctor` | available | AlwaysOn | - | bootstrap | cli, mcp, api | registry + PluginMeta | | `extract` | available | FeatureGated | extract | bootstrap | cli, mcp, api | registry + PluginMeta | +| `fs` | available | FeatureGated | fs | bootstrap | mcp, api, web | registry synthetic metadata | | `gateway` | available | AlwaysOn | - | bootstrap | cli, mcp, api, web | registry synthetic metadata | | `lab_admin` | available | RuntimeConditional | - | bootstrap | cli, mcp | registry synthetic metadata | | `logs` | available | AlwaysOn | - | bootstrap | cli, mcp, api, web | registry synthetic metadata | diff --git a/docs/services/GATEWAY.md b/docs/services/GATEWAY.md index bbaf3df42..c0fbc71b0 100644 --- a/docs/services/GATEWAY.md +++ b/docs/services/GATEWAY.md @@ -139,7 +139,13 @@ Invoke call shape on the MCP surface: { "name": "search_issues", "arguments": { "query": "repo:jmagar/lab tool_search" } } ``` -Code Mode is schema-first discovery plus opt-in sandboxed execution. +Code Mode is schema-first discovery plus opt-in sandboxed execution. The +business logic lives in gateway dispatch and is exposed through two adapters: +the MCP meta-tools (`code_search`, `code_schema`, `code_execute`) and the native +CLI (`labby gateway code search|schema|exec`). Both adapters use the same stable +ids, schema response types, sandbox runner, visibility checks, and brokered +execution path. 
+ `code_search` returns stable ids for Lab actions and upstream tools: ```json @@ -161,6 +167,14 @@ Example candidate ids: { "id": "lab::gateway.gateway.schema" } ``` +Native CLI equivalents: + +```bash +labby gateway code search "gateway servers" --json +labby gateway code schema "lab::gateway.gateway.servers" --json +labby gateway code exec --code 'await callTool("lab::gateway.gateway.servers", {})' --json +``` + Lab ids use `lab::.` and return an `ActionSpec`-derived contract (`schema_format: "lab_action_spec"`). Upstream ids use `upstream::::` and return the upstream JSON Schema @@ -200,7 +214,9 @@ Rules: - `include_schema` defaults to `false`; schemas are sanitized before return when requested - `code_search` is read-only discovery and accepts `lab:read`, `lab`, or `lab:admin` - `code_schema` exposes full schemas and requires `lab` or `lab:admin` -- `code_execute` requires `lab` or `lab:admin`, is disabled unless `[code_mode].enabled = true`, and brokers calls through the same gateway visibility and destructive-action checks as `invoke` +- `code_execute` requires `lab` or `lab:admin`, is disabled unless `[code_mode].enabled = true`, and brokers calls through the same gateway visibility checks as `invoke` +- destructive Lab actions in MCP Code Mode require both top-level `code_execute.confirm = true` and per-call `params.confirm = true`; Code Mode does not prompt per action via MCP elicitation +- gateway action provenance fields (`origin` and `owner`) are reserved in Code Mode and are overwritten by the broker - `code_execute` enforces `timeout_ms` by killing the child process and enforces `max_tool_calls` in the parent before brokering each call - invalid Code Mode ids return `invalid_code_mode_id` - unavailable or overlarge upstream schemas return `schema_unavailable`