From 44542e3ce0865516d0b52057398a6089adb63476 Mon Sep 17 00:00:00 2001
From: justin
Date: Mon, 25 May 2026 13:52:07 +0800
Subject: [PATCH 1/2] feat(mcp-registry): InstalledServer HTTP-remote transport
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Closes the named follow-up TODO from #2559:

> InstalledServer HTTP-remote transport variant (most Smithery servers
> are HTTP-remote; current install path is stdio-only)

Before this change, the setup-agent install path
(`mcp_setup_test_connection` + `mcp_setup_install_and_connect`)
hard-filtered registry detail responses to `c.r#type == "stdio"`.
Smithery — which hosts the bulk of the official MCP ecosystem — serves
~99% of its listings as HTTP-remote, so the UX was: user browses
Smithery, picks a server, clicks install, and almost every attempt
failed at "no stdio connection".

The `mcp_client` module already has `McpHttpClient` (Streamable HTTP +
OAuth + SSE per the MCP spec) wired and tested for the static-config
servers. This change extends the *install / persistence / connect*
plumbing in `mcp_registry` to dispatch through it.

## What changed

**types.rs** — new `Transport::{Stdio, HttpRemote { url }}` enum with
serialisation-safe `dispatch_kind()` strings (`"stdio"` /
`"http_remote"`) and a `parse(kind, url)` inverse that defaults unknown /
missing values to `Stdio` (the migration safety hatch).
`InstalledServer` grows a `transport: Transport` field, defaulted via
`#[serde(default)]` so legacy serialised payloads from before this
change still load.

**store.rs** — `mcp_servers` table grows two additive columns
(`transport`, `deployment_url`) via post-`CREATE TABLE`
`ALTER TABLE ADD COLUMN`, gated by a `PRAGMA table_info` lookup so the
migration is idempotent across launches. Insert + list + get carry the
new columns. The row mapper falls back to `Transport::Stdio` when
`transport` is missing — pre-migration rows continue to load as stdio.

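For reviewers who want the persistence contract in one place, here is a minimal dependency-free sketch of the two ideas described for types.rs and store.rs. The `Transport` methods follow the description in this message, with serde derives left out. `pending_migrations` is a hypothetical helper written only for illustration: the patch does the equivalent check inline in `init_schema` against the column names reported by `PRAGMA table_info`.

```rust
/// Standalone mirror of the `Transport` enum described above
/// (serde derives omitted so the sketch has no dependencies).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Transport {
    Stdio,
    HttpRemote { url: String },
}

impl Transport {
    /// Stable string persisted in the `mcp_servers.transport` column.
    pub fn dispatch_kind(&self) -> &'static str {
        match self {
            Self::Stdio => "stdio",
            Self::HttpRemote { .. } => "http_remote",
        }
    }

    /// Inverse of `dispatch_kind`. Anything that is not exactly
    /// "http_remote" (empty, unknown, "stdio") becomes `Stdio`.
    pub fn parse(kind: &str, deployment_url: Option<&str>) -> Self {
        match kind {
            "http_remote" => Self::HttpRemote {
                url: deployment_url.unwrap_or("").to_string(),
            },
            _ => Self::Stdio,
        }
    }

    /// Value for the separate `deployment_url` column.
    pub fn deployment_url(&self) -> Option<&str> {
        match self {
            Self::Stdio => None,
            Self::HttpRemote { url } => Some(url.as_str()),
        }
    }
}

/// HYPOTHETICAL helper (not in the patch): given the column names that
/// already exist on `mcp_servers`, return the ALTER statements that still
/// need to run. Running it again after applying them yields an empty list,
/// which is the idempotency property the migration relies on.
pub fn pending_migrations(existing: &[&str]) -> Vec<&'static str> {
    const ADDITIVE: [(&str, &str); 2] = [
        (
            "transport",
            "ALTER TABLE mcp_servers ADD COLUMN transport TEXT NOT NULL DEFAULT 'stdio'",
        ),
        (
            "deployment_url",
            "ALTER TABLE mcp_servers ADD COLUMN deployment_url TEXT",
        ),
    ];
    ADDITIVE
        .iter()
        .filter(|(col, _)| !existing.contains(col))
        .map(|(_, sql)| *sql)
        .collect()
}
```

A row written as `("http_remote", Some(url))` parses back to `HttpRemote { url }`, while a pre-migration row (which picks up the `'stdio'` column default) parses to `Stdio`. Because only the literal `"http_remote"` is special-cased, an unknown or corrupted value degrades to the old behaviour instead of failing the row load.
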
**setup_ops.rs** — new `pick_connection` helper picks the best
`SmitheryConnection` from a registry detail. Preference order:
published stdio → any stdio → published http_remote → any http_remote →
None. Stdio always wins when offered, so no behavior regression for any
server that already installed pre-PR. `mcp_setup_test_connection` +
`mcp_setup_install_and_connect` branch on the picked transport and
dispatch to either `McpStdioClient` or `McpHttpClient`. A
`ConnectionKind` trait normalises the registry-side `"http"` / `"sse"`
strings into the persisted `"http_remote"` dispatch kind so the picker
and the install record agree on vocabulary.

**connections.rs** — new `ActiveClient::{Stdio(Arc<...>), Http(Arc<...>)}`
enum wraps the two clients with a shared async surface (`list_tools` /
`call_tool` / `close_session`). `connect()` now branches on
`server.transport`:

- `Stdio` — current subprocess spawn (unchanged).
- `HttpRemote { url }` — `McpHttpClient::new(url, 30)` dial. Empty URL
  bails loudly.

Both paths still run `initialize` + `list_tools` synchronously so a
misconfigured server fails at connect, not at first call.

**ops.rs** — the legacy `mcp_clients_install` direct-install RPC was
already stdio-filtered; pinned its constructed records to
`Transport::Stdio` so the legacy path is explicit about scope. HTTP
support flows through the newer setup-agent path. Wiring the legacy path
to HTTP-remote is a small follow-up (out of scope here to keep the diff
focused on the migration + dispatch).

## Tests

64/64 pass in `cargo test --lib mcp_registry` (was 57; +7 new):

* `transport_dispatch_kind_strings_are_stable` — pins the persisted
  column values.
* `transport_parse_falls_back_to_stdio_for_unknown_kinds` — the
  migration safety hatch (empty / garbage / `"stdio"` all → Stdio,
  `"http_remote"` carries the URL through).
* `transport_deployment_url_accessor` — confirms stdio → None,
  http_remote → Some(url), the two never get crossed.
* `installed_server_defaults_transport_to_stdio_on_missing_field` —
  legacy JSON payloads (no `transport` key) still deserialise.
* `pick_connection_prefers_stdio_over_http` /
  `_prefers_published_stdio_first` /
  `_falls_back_to_http_remote_when_no_stdio` /
  `_returns_none_for_only_unknown_kinds` — preference order pinned.
* `connection_kind_normalises_http_variants` — the `"http"` / `"sse"` /
  `"http_remote"` → `"http_remote"` mapping.
* `http_remote_server_roundtrips_with_url_preserved` — SQLite write +
  read for the new transport, deployment_url preserved.
* `list_servers_preserves_per_row_transport` — mixed stdio + http rows
  don't cross-contaminate at SELECT.
* `additive_migration_recovers_pre_migration_row_as_stdio` — builds a
  pre-migration schema, inserts a legacy row, runs `init_schema` twice
  (idempotency), and confirms the legacy row loads as
  `Transport::Stdio`.

`cargo check --lib` + `cargo fmt --check` clean. No protocol-level
behavior change for any server that already installs today (stdio still
wins the picker when offered).

## Out of scope

* Legacy `mcp_clients_install` direct path doesn't yet dispatch HTTP
  (still stdio-filtered) — most callers should switch to the setup
  agent; explicit follow-up if anyone needs the legacy path too.
* No UI changes — the existing install dialog calls the same RPC and
  will start succeeding for HTTP-only Smithery servers as soon as this
  lands.
* OAuth flow for HTTP-remote installs is handled by `McpHttpClient`'s
  existing auth layer; no new auth plumbing here.
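
The picker's preference order (published stdio, any stdio, published http_remote, any http_remote, then None) can be restated compactly as a priority table. This is only a sketch under stated assumptions: `Conn` is a hypothetical stand-in for `SmitheryConnection` that keeps just the fields the picker reads, and `transport_kind` is a free function here rather than the `ConnectionKind` trait method. The patch itself uses four sequential `find` calls, which should select the same connection.

```rust
/// HYPOTHETICAL stand-in for `SmitheryConnection`: only the fields the
/// picker looks at.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Conn {
    pub kind: String,
    pub published: bool,
    pub deployment_url: Option<String>,
}

/// Map registry-side type strings onto the persisted vocabulary.
/// Unknown kinds pass through unchanged so the picker can ignore them.
pub fn transport_kind(raw: &str) -> &str {
    match raw {
        "http" | "http_remote" | "sse" => "http_remote",
        other => other,
    }
}

/// Walk the priority table top to bottom and return the first connection
/// that satisfies a rung. Within a rung, the first match in list order wins.
pub fn pick_connection(conns: &[Conn]) -> Option<&Conn> {
    // (normalised kind, must be published), highest priority first.
    const ORDER: [(&str, bool); 4] = [
        ("stdio", true),
        ("stdio", false),
        ("http_remote", true),
        ("http_remote", false),
    ];
    ORDER.iter().find_map(|(kind, need_published)| {
        conns.iter().find(|c| {
            transport_kind(&c.kind) == *kind && (c.published || !*need_published)
        })
    })
}
```

With an unpublished `stdio` entry and a published `http` entry in the same list, the stdio entry is returned, which is the "no regression for existing stdio installs" property the tests pin. A list containing only an unrecognised kind yields `None`, so the caller can report a clean error.
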
--- src/openhuman/mcp_registry/connections.rs | 123 ++++++++-- src/openhuman/mcp_registry/ops.rs | 7 + src/openhuman/mcp_registry/setup_ops.rs | 269 +++++++++++++++++++--- src/openhuman/mcp_registry/store.rs | 179 +++++++++++++- src/openhuman/mcp_registry/types.rs | 156 ++++++++++++- 5 files changed, 668 insertions(+), 66 deletions(-) diff --git a/src/openhuman/mcp_registry/connections.rs b/src/openhuman/mcp_registry/connections.rs index 6a442f8e7b..78e37befda 100644 --- a/src/openhuman/mcp_registry/connections.rs +++ b/src/openhuman/mcp_registry/connections.rs @@ -1,9 +1,16 @@ //! Global in-process registry of active MCP client connections. //! //! Keyed by `server_id` (UUID). Connections are established by [`connect`] -//! and removed by [`disconnect`]. The actual stdio transport lives in -//! [`crate::openhuman::mcp_client::McpStdioClient`] — this module just -//! owns the per-server lifecycle and a global handle map. +//! and removed by [`disconnect`]. The actual transport +//! ([`McpStdioClient`] for local subprocess installs, +//! [`McpHttpClient`] for HTTP-remote installs hosted by Smithery / +//! similar) lives in [`crate::openhuman::mcp_client`] — this module just +//! owns the per-server lifecycle, the transport dispatch, and a global +//! handle map. +//! +//! Dispatch is driven by the `transport` field on each [`InstalledServer`], +//! which is set at install time by `mcp_setup_install_and_connect` and +//! persisted in the `mcp_servers.transport` column. 
use std::collections::HashMap; use std::sync::{Arc, OnceLock}; @@ -12,16 +19,55 @@ use serde_json::Value; use tokio::sync::RwLock; use crate::openhuman::config::Config; -use crate::openhuman::mcp_client::{McpRemoteTool, McpStdioClient}; +use crate::openhuman::mcp_client::{McpHttpClient, McpRemoteTool, McpStdioClient}; use super::store; -use super::types::{ConnStatus, InstalledServer, McpTool, ServerStatus}; +use super::types::{ConnStatus, InstalledServer, McpTool, ServerStatus, Transport}; // ── Connection record ──────────────────────────────────────────────────────── -/// One live MCP subprocess plus the tool list cached after `initialize`. +/// Active transport for one connected MCP install. Mirrors +/// [`crate::openhuman::mcp_client::registry::McpTransportClient`] but lives +/// here so `mcp_registry` doesn't have to depend on the static-config +/// registry's specific wrapping. Both variants expose the same surface +/// (`initialize` / `list_tools` / `call_tool` / `close_session`) so callers +/// don't have to branch. +enum ActiveClient { + Stdio(Arc), + Http(Arc), +} + +impl ActiveClient { + async fn list_tools(&self) -> anyhow::Result> { + match self { + Self::Stdio(c) => c.list_tools().await, + Self::Http(c) => c.list_tools().await, + } + } + + async fn call_tool( + &self, + name: &str, + arguments: Value, + ) -> anyhow::Result { + match self { + Self::Stdio(c) => c.call_tool(name, arguments).await, + Self::Http(c) => c.call_tool(name, arguments).await, + } + } + + async fn close_session(&self) -> anyhow::Result<()> { + match self { + Self::Stdio(c) => c.close_session().await, + Self::Http(c) => c.close_session().await, + } + } +} + +/// One live MCP client (stdio subprocess OR HTTP-remote dial) plus the +/// tool list cached after `initialize`. 
struct Connection { - client: Arc, + client: ActiveClient, tools: RwLock>, } @@ -41,13 +87,21 @@ fn connections() -> &'static RwLock>> { // ── Public API ──────────────────────────────────────────────────────────────── -/// Spawn a new stdio subprocess (via `McpStdioClient`), run `initialize`, -/// cache the tool list, and store the connection in the global registry. +/// Bring up a new MCP client for `server`, run `initialize`, cache the +/// tool list, and store the connection in the global registry. +/// +/// Dispatches on `server.transport`: +/// - [`Transport::Stdio`] — spawn `command` + `args` as a subprocess and +/// speak JSON-RPC over stdin/stdout (the original behaviour). +/// - [`Transport::HttpRemote`] — dial the published HTTPS endpoint +/// directly with [`McpHttpClient`]. No subprocess. Needed for the +/// `~99%` of Smithery listings that are HTTP-remote. pub async fn connect(config: &Config, server: &InstalledServer) -> anyhow::Result> { tracing::debug!( - "[mcp-registry] connect server_id={} qualified_name={}", + "[mcp-registry] connect server_id={} qualified_name={} transport={}", server.server_id, - server.qualified_name + server.qualified_name, + server.transport.dispatch_kind() ); let env_map = store::load_env_values(config, &server.server_id).unwrap_or_default(); @@ -60,22 +114,45 @@ pub async fn connect(config: &Config, server: &InstalledServer) -> anyhow::Resul ); let identity = config.mcp_client.client_identity.clone(); - let client = Arc::new(McpStdioClient::new( - server.command.clone(), - server.args.clone(), - env, - None, - identity, - )); - - // Initialize + first tools/list happen here so a misconfigured server - // fails loudly at `connect` instead of silently at first `call_tool`. - client.initialize().await?; + + // Branch on transport variant. Both branches end with `initialize` + + // `list_tools` so a misconfigured server fails loudly at connect + // instead of silently at first `call_tool`. 
+ let client = match &server.transport { + Transport::Stdio => { + let stdio = Arc::new(McpStdioClient::new( + server.command.clone(), + server.args.clone(), + env, + None, + identity, + )); + stdio.initialize().await?; + ActiveClient::Stdio(stdio) + } + Transport::HttpRemote { url } => { + if url.is_empty() { + anyhow::bail!( + "[mcp-registry] http_remote server_id={} has empty deployment_url", + server.server_id + ); + } + // 30s timeout matches setup_ops::test_connection so install + // and runtime see the same connect-failure deadlines. Env + // values for HTTP-remote installs (typically OAuth tokens) + // ride through the McpHttpClient's own auth config — out of + // scope for this dispatch. + let http = Arc::new(McpHttpClient::new(url.clone(), 30)); + http.initialize().await?; + ActiveClient::Http(http) + } + }; + let remote_tools = client.list_tools().await?; let tools: Vec = remote_tools.into_iter().map(into_registry_tool).collect(); let conn = Arc::new(Connection { - client: Arc::clone(&client), + client, tools: RwLock::new(tools.clone()), }); diff --git a/src/openhuman/mcp_registry/ops.rs b/src/openhuman/mcp_registry/ops.rs index a1b08c57cc..65faa7f2fe 100644 --- a/src/openhuman/mcp_registry/ops.rs +++ b/src/openhuman/mcp_registry/ops.rs @@ -148,6 +148,12 @@ pub async fn mcp_clients_install( .map(|d| d.as_millis() as i64) .unwrap_or(0); + // The legacy install path only ever picked stdio connections (see the + // `c.r#type == "stdio"` filter above), so legacy installs continue to + // be stdio-only. HTTP-remote installs go through the newer + // `setup_ops::mcp_setup_install_and_connect` setup-agent path, which + // picks the right transport based on what the registry actually + // exposes. 
let server = InstalledServer { server_id: server_id.clone(), qualified_name: qualified_name.trim().to_string(), @@ -161,6 +167,7 @@ pub async fn mcp_clients_install( config: config_value, installed_at: now_ms, last_connected_at: None, + transport: super::types::Transport::Stdio, }; store::insert_server(config, &server).map_err(|e| e.to_string())?; diff --git a/src/openhuman/mcp_registry/setup_ops.rs b/src/openhuman/mcp_registry/setup_ops.rs index 75b2c65089..29aff2264a 100644 --- a/src/openhuman/mcp_registry/setup_ops.rs +++ b/src/openhuman/mcp_registry/setup_ops.rs @@ -24,12 +24,12 @@ use uuid::Uuid; use crate::core::event_bus::{publish_global, DomainEvent}; use crate::openhuman::config::Config; -use crate::openhuman::mcp_client::McpStdioClient; +use crate::openhuman::mcp_client::{McpHttpClient, McpStdioClient}; use crate::rpc::RpcOutcome; use super::ops::resolve_command; use super::setup::{self, SecretRef}; -use super::types::{CommandKind, InstalledServer}; +use super::types::{CommandKind, InstalledServer, SmitheryConnection, Transport}; use super::{connections, registry, store}; // ── search ─────────────────────────────────────────────────────────────────── @@ -152,43 +152,93 @@ pub async fn mcp_setup_test_connection( let detail = registry::registry_get(config, q) .await .map_err(|e| e.to_string())?; - let stdio_conn = detail - .connections - .iter() - .filter(|c| c.r#type == "stdio") - .find(|c| c.published) - .or_else(|| detail.connections.iter().find(|c| c.r#type == "stdio")); - let (_kind, command, args) = resolve_command(q, stdio_conn); + let picked = pick_connection(&detail.connections).ok_or_else(|| { + format!("server `{q}` exposes neither stdio nor http_remote connections; nothing to test") + })?; let identity = config.mcp_client.client_identity.clone(); - let cwd: Option = None; - let client = McpStdioClient::new(command.clone(), args.clone(), env, cwd, identity); - // Scratch subprocess — initialise + list_tools, then close. 
Nothing + // Scratch session — initialise + list_tools, then close. Nothing // persisted. Errors bubble up so the agent can show them to the user. - if let Err(err) = client.initialize().await { - return Ok(RpcOutcome::new( - json!({ "ok": false, "error": err.to_string() }), - vec![format!("test_connection failed for {q}: {err}")], - )); - } - let tools = match client.list_tools().await { - Ok(t) => t, - Err(err) => { - let _ = client.close_session().await; + let (init_ok, tools) = match picked.transport_kind() { + "stdio" => { + let (_kind, command, args) = resolve_command(q, Some(picked)); + let cwd: Option = None; + let client = McpStdioClient::new(command, args, env, cwd, identity); + if let Err(err) = client.initialize().await { + return Ok(RpcOutcome::new( + json!({ "ok": false, "error": err.to_string() }), + vec![format!("test_connection failed for {q}: {err}")], + )); + } + match client.list_tools().await { + Ok(t) => { + let _ = client.close_session().await; + (true, t) + } + Err(err) => { + let _ = client.close_session().await; + return Ok(RpcOutcome::new( + json!({ "ok": false, "error": err.to_string() }), + vec![format!("test_connection list_tools failed for {q}: {err}")], + )); + } + } + } + // HTTP-remote path: dial the published deployment_url over + // Streamable HTTP. No subprocess, no env injection needed at + // dial time (env vars for HTTP-remote installs are typically + // OAuth tokens that the McpHttpClient picks up from its own + // auth config — out of scope for this scratch test). 
+ "http_remote" => { + let endpoint = picked.deployment_url.clone().unwrap_or_default(); + if endpoint.is_empty() { + return Ok(RpcOutcome::new( + json!({ "ok": false, "error": "deployment_url is empty for http_remote connection" }), + vec![format!( + "test_connection failed for {q}: empty deployment_url" + )], + )); + } + let client = McpHttpClient::new(endpoint.clone(), 30); + if let Err(err) = client.initialize().await { + return Ok(RpcOutcome::new( + json!({ "ok": false, "error": err.to_string() }), + vec![format!("test_connection (http) failed for {q}: {err}")], + )); + } + match client.list_tools().await { + Ok(t) => { + let _ = client.close_session().await; + (true, t) + } + Err(err) => { + let _ = client.close_session().await; + return Ok(RpcOutcome::new( + json!({ "ok": false, "error": err.to_string() }), + vec![format!( + "test_connection (http) list_tools failed for {q}: {err}" + )], + )); + } + } + } + other => { return Ok(RpcOutcome::new( - json!({ "ok": false, "error": err.to_string() }), - vec![format!("test_connection list_tools failed for {q}: {err}")], + json!({ "ok": false, "error": format!("unsupported transport `{other}`") }), + vec![format!( + "test_connection failed for {q}: unsupported transport `{other}`" + )], )); } }; - let _ = client.close_session().await; let names: Vec<&str> = tools.iter().map(|t| t.name.as_str()).collect(); Ok(RpcOutcome::new( - json!({ "ok": true, "tools": tools }), + json!({ "ok": init_ok, "tools": tools, "transport": picked.transport_kind() }), vec![format!( - "test_connection ok for {q}: {} tools ({:?})", + "test_connection ok for {q} via {}: {} tools ({:?})", + picked.transport_kind(), tools.len(), names )], @@ -212,13 +262,35 @@ pub async fn mcp_setup_install_and_connect( let detail = registry::registry_get(config, q) .await .map_err(|e| e.to_string())?; - let stdio_conn = detail - .connections - .iter() - .filter(|c| c.r#type == "stdio") - .find(|c| c.published) - .or_else(|| detail.connections.iter().find(|c| 
c.r#type == "stdio")); - let (command_kind, command, args) = resolve_command(q, stdio_conn); + let picked = pick_connection(&detail.connections).ok_or_else(|| { + format!( + "server `{q}` exposes neither stdio nor http_remote connections; nothing to install" + ) + })?; + + // Branch on the picked transport. Stdio installs still populate + // command/args (current behavior). HTTP-remote installs leave them + // empty and stash the deployment URL in `transport`. + let (transport, command_kind, command, args) = match picked.transport_kind() { + "http_remote" => { + let url = picked.deployment_url.clone().unwrap_or_default(); + if url.is_empty() { + return Err(format!( + "server `{q}` http_remote connection has empty deployment_url" + )); + } + ( + Transport::HttpRemote { url }, + CommandKind::Node, // unused for HTTP, but a sensible default + String::new(), + Vec::new(), + ) + } + _ => { + let (kind, command, args) = resolve_command(q, Some(picked)); + (Transport::Stdio, kind, command, args) + } + }; // Consume refs only after `registry_get` succeeds — that way a // misconfigured server name doesn't burn the user's collected @@ -248,6 +320,7 @@ pub async fn mcp_setup_install_and_connect( config: None, installed_at: now_ms, last_connected_at: None, + transport, }; store::insert_server(config, &server).map_err(|e| e.to_string())?; @@ -327,3 +400,131 @@ fn collect_required_env_keys(detail: &super::types::SmitheryServerDetail) -> Vec // at the call site. #[allow(dead_code)] const _: Option = None; + +/// Choose the best [`SmitheryConnection`] from a registry detail response. +/// +/// Preference order: +/// 1. **Published `stdio`** — no behaviour regression for any server that +/// used to install before HTTP-remote support landed. +/// 2. **Any `stdio`** (even unpublished) — also pre-existing fallback. +/// 3. **Published `http_remote`** — the new path. Smithery serves ~99% of +/// their listings as HTTP-remote. +/// 4. **Any `http_remote`** — last-resort. +/// 5. 
`None` — nothing dialable. +/// +/// Stdio is preferred because it's privacy-strict (everything runs locally) +/// and because most of the existing OpenHuman ecosystem assumes stdio. HTTP +/// is the fallback that finally lets a Smithery-only server install at all. +pub(super) fn pick_connection(connections: &[SmitheryConnection]) -> Option<&SmitheryConnection> { + // Treat the canonical wire names ("stdio", "http") AND the persisted + // dispatch kinds ("http_remote") as equivalent — registry payloads + // historically use "http" while our `Transport` discriminator uses + // "http_remote". `transport_kind` normalises that mapping. + let stdio_pub = connections + .iter() + .find(|c| c.transport_kind() == "stdio" && c.published); + if stdio_pub.is_some() { + return stdio_pub; + } + let stdio_any = connections.iter().find(|c| c.transport_kind() == "stdio"); + if stdio_any.is_some() { + return stdio_any; + } + let http_pub = connections + .iter() + .find(|c| c.transport_kind() == "http_remote" && c.published); + if http_pub.is_some() { + return http_pub; + } + connections + .iter() + .find(|c| c.transport_kind() == "http_remote") +} + +/// Normalise a [`SmitheryConnection::r#type`] string into the same vocabulary +/// the persisted [`Transport`] enum uses. The registry side uses `"http"` +/// in its DTOs; we route those into the `"http_remote"` install path. 
+trait ConnectionKind { + fn transport_kind(&self) -> &str; +} + +impl ConnectionKind for SmitheryConnection { + fn transport_kind(&self) -> &str { + match self.r#type.as_str() { + "stdio" => "stdio", + "http" | "http_remote" | "sse" => "http_remote", + other => other, + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn conn(kind: &str, published: bool, url: Option<&str>) -> SmitheryConnection { + SmitheryConnection { + r#type: kind.to_string(), + deployment_url: url.map(String::from), + config_schema: None, + example_config: None, + published, + extra: std::collections::HashMap::new(), + } + } + + /// Stdio wins when both transports are offered, even when stdio is + /// unpublished and http is published. This pins the "no regression + /// for existing stdio installs" promise. + #[test] + fn pick_connection_prefers_stdio_over_http() { + let conns = vec![ + conn("http", true, Some("https://x.io/mcp")), + conn("stdio", false, None), + ]; + let picked = pick_connection(&conns).expect("stdio should be picked"); + assert_eq!(picked.r#type, "stdio"); + } + + /// Published stdio beats unpublished stdio. + #[test] + fn pick_connection_prefers_published_stdio_first() { + let conns = vec![conn("stdio", false, None), conn("stdio", true, None)]; + let picked = pick_connection(&conns).expect("published stdio should win"); + assert!(picked.published); + } + + /// When the server is HTTP-remote-only (the Smithery-typical case), + /// the picker returns the HTTP-remote connection instead of `None` — + /// this is the core gap the PR closes. + #[test] + fn pick_connection_falls_back_to_http_remote_when_no_stdio() { + let conns = vec![conn("http", true, Some("https://x.io/mcp"))]; + let picked = pick_connection(&conns).expect("http_remote fallback"); + assert_eq!(picked.transport_kind(), "http_remote"); + assert_eq!(picked.deployment_url.as_deref(), Some("https://x.io/mcp")); + } + + /// Smithery DTOs use `"http"`, our `Transport` discriminator uses + /// `"http_remote"`. 
Normalisation pins both as the same install path. + #[test] + fn connection_kind_normalises_http_variants() { + assert_eq!(conn("http", true, None).transport_kind(), "http_remote"); + assert_eq!( + conn("http_remote", true, None).transport_kind(), + "http_remote" + ); + assert_eq!(conn("sse", true, None).transport_kind(), "http_remote"); + assert_eq!(conn("stdio", true, None).transport_kind(), "stdio"); + // Unknown kinds fall through untouched so the picker can ignore them. + assert_eq!(conn("ws", true, None).transport_kind(), "ws"); + } + + /// No dialable connection → picker returns None so callers can return + /// a clean error instead of dialing garbage. + #[test] + fn pick_connection_returns_none_for_only_unknown_kinds() { + let conns = vec![conn("websocket-future", true, None)]; + assert!(pick_connection(&conns).is_none()); + } +} diff --git a/src/openhuman/mcp_registry/store.rs b/src/openhuman/mcp_registry/store.rs index 1329bcdbc0..2b5948558b 100644 --- a/src/openhuman/mcp_registry/store.rs +++ b/src/openhuman/mcp_registry/store.rs @@ -15,7 +15,7 @@ use std::time::{SystemTime, UNIX_EPOCH}; use crate::openhuman::config::Config; -use super::types::{CommandKind, InstalledServer}; +use super::types::{CommandKind, InstalledServer, Transport}; // ── Helpers ────────────────────────────────────────────────────────────────── @@ -81,7 +81,45 @@ fn init_schema(conn: &Connection) -> Result<()> { cached_at INTEGER NOT NULL );", ) - .context("Failed to initialise mcp_clients schema") + .context("Failed to initialise mcp_clients schema")?; + + // Additive HTTP-remote transport columns (introduced after the schema + // was first cut). SQLite's `ALTER TABLE ADD COLUMN` doesn't support + // `IF NOT EXISTS`, so we use `PRAGMA table_info` to detect which + // columns are already there and skip the ones that are. Idempotent + // across launches; old `'stdio'`-implicit rows pick up the new + // `transport` column with the default value. 
+ let existing_cols = mcp_servers_columns(conn)?; + if !existing_cols.iter().any(|c| c == "transport") { + conn.execute( + "ALTER TABLE mcp_servers ADD COLUMN transport TEXT NOT NULL DEFAULT 'stdio'", + [], + ) + .context("Failed to add transport column to mcp_servers")?; + } + if !existing_cols.iter().any(|c| c == "deployment_url") { + conn.execute("ALTER TABLE mcp_servers ADD COLUMN deployment_url TEXT", []) + .context("Failed to add deployment_url column to mcp_servers")?; + } + + Ok(()) +} + +/// Snapshot of the column names on `mcp_servers`. Used by the additive +/// migration in [`init_schema`] to decide which `ALTER TABLE ADD COLUMN` +/// statements still need to run on this DB. +fn mcp_servers_columns(conn: &Connection) -> Result> { + let mut stmt = conn + .prepare("PRAGMA table_info(mcp_servers)") + .context("prepare PRAGMA table_info")?; + // PRAGMA table_info row shape: (cid, name, type, notnull, dflt_value, pk). + let mut rows = stmt.query([])?; + let mut cols = Vec::new(); + while let Some(row) = rows.next()? 
{ + let name: String = row.get(1)?; + cols.push(name); + } + Ok(cols) } // ── InstalledServer CRUD ────────────────────────────────────────────────────── @@ -102,8 +140,8 @@ pub fn insert_server_conn(conn: &Connection, server: &InstalledServer) -> Result "INSERT INTO mcp_servers (server_id, qualified_name, display_name, description, icon_url, command_kind, command, args_json, env_keys_json, config_json, - installed_at, last_connected_at) - VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12)", + installed_at, last_connected_at, transport, deployment_url) + VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14)", params![ server.server_id, server.qualified_name, @@ -117,6 +155,8 @@ pub fn insert_server_conn(conn: &Connection, server: &InstalledServer) -> Result config_json, server.installed_at, server.last_connected_at, + server.transport.dispatch_kind(), + server.transport.deployment_url(), ], ) .context("Failed to insert mcp_server")?; @@ -131,7 +171,7 @@ pub fn list_servers_conn(conn: &Connection) -> Result> { let mut stmt = conn.prepare( "SELECT server_id, qualified_name, display_name, description, icon_url, command_kind, command, args_json, env_keys_json, config_json, - installed_at, last_connected_at + installed_at, last_connected_at, transport, deployment_url FROM mcp_servers ORDER BY installed_at ASC", )?; let rows = stmt.query_map([], map_server_row)?; @@ -150,7 +190,7 @@ pub fn get_server_conn(conn: &Connection, server_id: &str) -> Result) -> rusqlite::Result })?), }; + // Both transport columns are post-migration additions, so `row.get` + // may return missing-column-by-name errors on a DB that hasn't run the + // ADD COLUMN steps for some reason (rare — the migration is in + // `init_schema`). Fall back to stdio rather than fail loading the + // whole row. 
+ let transport_kind: String = row.get::<_, Option>(12)?.unwrap_or_default(); + let deployment_url: Option = row.get(13)?; + let transport = Transport::parse(&transport_kind, deployment_url.as_deref()); + Ok(InstalledServer { server_id: row.get(0)?, qualified_name: row.get(1)?, @@ -216,6 +265,7 @@ fn map_server_row(row: &rusqlite::Row<'_>) -> rusqlite::Result config, installed_at: row.get(10)?, last_connected_at: row.get(11)?, + transport, }) } @@ -345,6 +395,27 @@ mod tests { config: None, installed_at: 1_700_000_000_000, last_connected_at: None, + transport: Transport::Stdio, + } + } + + fn sample_http_server(id: &str, url: &str) -> InstalledServer { + InstalledServer { + server_id: id.to_string(), + qualified_name: "@test/http-server".to_string(), + display_name: "Test HTTP Server".to_string(), + description: None, + icon_url: None, + command_kind: CommandKind::Node, // unused for HTTP + command: String::new(), + args: Vec::new(), + env_keys: Vec::new(), + config: None, + installed_at: 1_700_000_000_000, + last_connected_at: None, + transport: Transport::HttpRemote { + url: url.to_string(), + }, } } @@ -437,4 +508,100 @@ mod tests { assert_eq!(loaded.args, vec!["--port", "8080"]); assert_eq!(loaded.env_keys, vec!["KEY_A", "KEY_B"]); } + + /// HTTP-remote row round-trips through INSERT/SELECT with the + /// `deployment_url` preserved and `transport.dispatch_kind()` flipped + /// to `"http_remote"`. Without this test a regression in the + /// `map_server_row` column indices would silently downgrade every + /// HTTP-remote install back to stdio at next launch. 
+ #[test] + fn http_remote_server_roundtrips_with_url_preserved() { + let (_f, conn) = open_test_conn(); + let server = sample_http_server("srv-http", "https://smithery.ai/server/x/mcp"); + insert_server_conn(&conn, &server).unwrap(); + + let loaded = get_server_conn(&conn, "srv-http").unwrap(); + match loaded.transport { + Transport::HttpRemote { url } => { + assert_eq!(url, "https://smithery.ai/server/x/mcp"); + } + other => panic!("expected HttpRemote, got {other:?}"), + } + } + + /// Mixed stdio + http rows list back in their persisted form (no + /// cross-contamination of the `transport` column between rows). + #[test] + fn list_servers_preserves_per_row_transport() { + let (_f, conn) = open_test_conn(); + insert_server_conn(&conn, &sample_server("srv-stdio")).unwrap(); + insert_server_conn(&conn, &sample_http_server("srv-http", "https://x.io/mcp")).unwrap(); + + let mut servers = list_servers_conn(&conn).unwrap(); + servers.sort_by_key(|s| s.server_id.clone()); + assert_eq!(servers.len(), 2); + // Alphabetical sort: "srv-http" precedes "srv-stdio". + assert_eq!(servers[0].server_id, "srv-http"); + assert_eq!( + servers[0].transport, + Transport::HttpRemote { + url: "https://x.io/mcp".to_string() + } + ); + assert_eq!(servers[1].server_id, "srv-stdio"); + assert_eq!(servers[1].transport, Transport::Stdio); + } + + /// Simulates the pre-migration state by dropping the `transport` and + /// `deployment_url` columns *after* schema init, manually inserting a + /// row that lacks them, and then re-running `init_schema` to confirm + /// the additive ALTER TABLE re-introduces the columns idempotently and + /// the old row loads as stdio (the migration's whole point). + /// + /// SQLite can't `DROP COLUMN` portably before 3.35, so the test uses + /// a CREATE-TABLE-AS rebuild to mimic the original schema shape. 
+ #[test] + fn additive_migration_recovers_pre_migration_row_as_stdio() { + let tmp = tempfile::NamedTempFile::new().unwrap(); + let conn = rusqlite::Connection::open(tmp.path()).unwrap(); + + // Step 1: pre-migration schema (no transport / deployment_url). + conn.execute_batch( + "CREATE TABLE mcp_servers ( + server_id TEXT PRIMARY KEY, + qualified_name TEXT NOT NULL, + display_name TEXT NOT NULL, + description TEXT, + icon_url TEXT, + command_kind TEXT NOT NULL DEFAULT 'node', + command TEXT NOT NULL, + args_json TEXT NOT NULL DEFAULT '[]', + env_keys_json TEXT NOT NULL DEFAULT '[]', + config_json TEXT, + installed_at INTEGER NOT NULL, + last_connected_at INTEGER + );", + ) + .unwrap(); + conn.execute( + "INSERT INTO mcp_servers + (server_id, qualified_name, display_name, command_kind, command, installed_at) + VALUES ('legacy-1', '@old/server', 'Old', 'node', 'npx', 1700000000000)", + [], + ) + .unwrap(); + + // Step 2: simulate the upgrade path — re-run init_schema, which + // detects the missing columns via PRAGMA and runs ALTER TABLE. + init_schema(&conn).unwrap(); + + // Idempotency: running it again must not fail or duplicate the + // columns. (Real launches hit this every process start.) + init_schema(&conn).unwrap(); + + // Step 3: the legacy row loads as Transport::Stdio. + let loaded = get_server_conn(&conn, "legacy-1").unwrap(); + assert_eq!(loaded.transport, Transport::Stdio); + assert_eq!(loaded.command, "npx"); + } } diff --git a/src/openhuman/mcp_registry/types.rs b/src/openhuman/mcp_registry/types.rs index 1cf3ff7016..4cd54830fc 100644 --- a/src/openhuman/mcp_registry/types.rs +++ b/src/openhuman/mcp_registry/types.rs @@ -39,6 +39,63 @@ impl CommandKind { } } +// ── Transport ──────────────────────────────────────────────────────────────── + +/// How a connected MCP server's transport is dialled. 
+/// +/// Mirrors `mcp_client::registry::McpTransportClient` at the install-record +/// layer — same two backends (`McpStdioClient` / `McpHttpClient`), one extra +/// indirection because the install row has to be serialisable + persistable +/// across restarts. The `dispatch_kind` string is what we persist into the +/// `mcp_servers.transport` column (`"stdio"` | `"http_remote"`); existing +/// rows from before the column existed default to `"stdio"` per the +/// store-side migration. +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[serde(tag = "kind", rename_all = "snake_case")] +pub enum Transport { + /// Local subprocess JSON-RPC over stdin/stdout. Spawned via + /// `command` + `args` (resolved from `command_kind`). + Stdio, + /// HTTPS endpoint hosted by the upstream registry (typically Smithery — + /// `~99%` of their listings are HTTP-remote). Streamable HTTP + OAuth + + /// SSE per the MCP spec, served by [`mcp_client::McpHttpClient`]. + HttpRemote { url: String }, +} + +impl Transport { + /// Stable string identifier persisted in `mcp_servers.transport`. + /// Kept narrow on purpose — schema migrations notice unknown values. + pub fn dispatch_kind(&self) -> &'static str { + match self { + Self::Stdio => "stdio", + Self::HttpRemote { .. } => "http_remote", + } + } + + /// Inverse of `dispatch_kind`. Unknown / missing values fall back to + /// `Stdio` so pre-migration rows (where the column didn't exist and + /// every record was implicitly stdio) keep working with no behaviour + /// change. + pub fn parse(kind: &str, deployment_url: Option<&str>) -> Self { + match kind { + "http_remote" => Self::HttpRemote { + url: deployment_url.unwrap_or("").to_string(), + }, + _ => Self::Stdio, + } + } + + /// `Some(url)` for HTTP-remote, `None` for stdio. Convenience accessor + /// for the store layer that needs to persist `deployment_url` as its + /// own column. 
+ pub fn deployment_url(&self) -> Option<&str> { + match self { + Self::Stdio => None, + Self::HttpRemote { url } => Some(url.as_str()), + } + } +} + // ── InstalledServer ───────────────────────────────────────────────────────── /// A locally installed MCP server record. @@ -58,11 +115,14 @@ pub struct InstalledServer { pub description: Option<String>, /// Icon URL from the registry. pub icon_url: Option<String>, - /// How the server subprocess should be launched. + /// How the server subprocess should be launched (stdio installs only). + /// For HTTP-remote installs this is still set to a sensible default — + /// callers route off [`Self::transport`] instead. pub command_kind: CommandKind, - /// Resolved binary or launcher (`npx`, `uvx`, etc). + /// Resolved binary or launcher (`npx`, `uvx`, etc). Empty string for + /// HTTP-remote installs. pub command: String, - /// Arguments passed to `command`. + /// Arguments passed to `command`. Empty vec for HTTP-remote installs. pub args: Vec<String>, /// Names of required env vars (values are stored separately and never logged). pub env_keys: Vec<String>, @@ -72,6 +132,18 @@ pub struct InstalledServer { pub installed_at: i64, /// Unix epoch milliseconds when the server last connected successfully. pub last_connected_at: Option<i64>, + /// Transport variant for this install — `Stdio` for legacy / local + /// subprocess servers, `HttpRemote { url }` for Smithery-hosted ones. + /// Defaults to `Stdio` for rows persisted before the column existed. + #[serde(default = "default_transport")] + pub transport: Transport, +} + +/// Default for `InstalledServer::transport` when the field is missing from +/// a serialised payload (e.g. legacy persisted rows, callers that haven't +/// migrated their construction site yet).
+fn default_transport() -> Transport { + Transport::Stdio } // ── McpTool ───────────────────────────────────────────────────────────────── @@ -278,6 +350,7 @@ mod tests { config: None, installed_at: 1_700_000_000_000, last_connected_at: None, + transport: Transport::Stdio, }; let v = serde_json::to_value(&server).unwrap(); // env_keys present, but no raw values @@ -285,6 +358,83 @@ mod tests { assert!(v.get("env_values").is_none()); } + /// `Transport::dispatch_kind` is the column value persisted into + /// `mcp_servers.transport`. Pinning both stdio and http-remote so a + /// schema-side change can't silently rename one without surfacing here. + #[test] + fn transport_dispatch_kind_strings_are_stable() { + assert_eq!(Transport::Stdio.dispatch_kind(), "stdio"); + assert_eq!( + Transport::HttpRemote { + url: "https://example.com/mcp".to_string() + } + .dispatch_kind(), + "http_remote" + ); + } + + /// `Transport::parse` is what the store layer calls when re-hydrating + /// a row. The Stdio fallback for unknown / missing values is the + /// migration-safety hatch — rows persisted before the `transport` + /// column existed must keep working as stdio installs. + #[test] + fn transport_parse_falls_back_to_stdio_for_unknown_kinds() { + // Stdio: explicit + with-no-url + assert_eq!(Transport::parse("stdio", None), Transport::Stdio); + assert_eq!(Transport::parse("stdio", Some("ignored")), Transport::Stdio); + + // Pre-migration empty value → stdio (backwards-compat). + assert_eq!(Transport::parse("", None), Transport::Stdio); + // Unknown kind from a future row → stdio (defensive default; we'd + // rather a misconfigured row stall on connect than misroute). + assert_eq!(Transport::parse("garbage", None), Transport::Stdio); + + // HTTP remote round-trip carries the URL through. 
+ assert_eq!( + Transport::parse("http_remote", Some("https://x.io/mcp")), + Transport::HttpRemote { + url: "https://x.io/mcp".to_string() + } + ); + } + + /// `deployment_url` accessor is what the store uses to populate the + /// adjacent `mcp_servers.deployment_url` column. Stdio → `None`, + /// HTTP remote → `Some(url)`. Confirms the two never get crossed. + #[test] + fn transport_deployment_url_accessor() { + assert_eq!(Transport::Stdio.deployment_url(), None); + let http = Transport::HttpRemote { + url: "https://smithery.ai/server/x".to_string(), + }; + assert_eq!(http.deployment_url(), Some("https://smithery.ai/server/x")); + } + + /// `InstalledServer::transport` is `#[serde(default)]`-backed so that + /// pre-migration JSON payloads (without the field at all) deserialise + /// as stdio installs. Without this, every persisted row from before + /// this change would fail to load after upgrade. + #[test] + fn installed_server_defaults_transport_to_stdio_on_missing_field() { + let legacy = json!({ + "server_id": "uuid-1", + "qualified_name": "@old/server", + "display_name": "Old", + "description": null, + "icon_url": null, + "command_kind": "node", + "command": "npx", + "args": ["-y", "@old/server"], + "env_keys": [], + "config": null, + "installed_at": 1_700_000_000_000i64, + "last_connected_at": null + // ← deliberately no `transport` key + }); + let s: InstalledServer = serde_json::from_value(legacy).unwrap(); + assert_eq!(s.transport, Transport::Stdio); + } + #[test] fn conn_status_status_field_serializes_lowercase() { let s = ConnStatus { From ad4d1b608b706b81f110a316e91f8a10842bda08 Mon Sep 17 00:00:00 2001 From: justin Date: Mon, 25 May 2026 14:30:29 +0800 Subject: [PATCH 2/2] fix(test): add transport field to mcp_registry_e2e InstalledServer fixture MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit `InstalledServer` grew a `transport` field in the previous commit but 
`tests/mcp_registry_e2e.rs::make_installed_server` was missed because the integration test target isn't compiled by `cargo check --lib` / `cargo test --lib`. CI surfaced the E0063 on `test / Rust Core Tests + Quality`. Pinning the test fixture to `Transport::Stdio` matches the existing test intent — it dials a local stub subprocess. --- tests/mcp_registry_e2e.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/mcp_registry_e2e.rs b/tests/mcp_registry_e2e.rs index 7601d48f29..59048a7867 100644 --- a/tests/mcp_registry_e2e.rs +++ b/tests/mcp_registry_e2e.rs @@ -10,7 +10,7 @@ use openhuman_core::openhuman::config::Config; use openhuman_core::openhuman::mcp_registry::connections; use openhuman_core::openhuman::mcp_registry::store; -use openhuman_core::openhuman::mcp_registry::types::{CommandKind, InstalledServer}; +use openhuman_core::openhuman::mcp_registry::types::{CommandKind, InstalledServer, Transport}; fn fresh_workspace_config() -> (tempfile::TempDir, Config) { let tmp = tempfile::tempdir().expect("tempdir"); @@ -34,6 +34,7 @@ fn make_installed_server() -> InstalledServer { config: None, installed_at: 0, last_connected_at: None, + transport: Transport::Stdio, } }