Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 100 additions & 23 deletions src/openhuman/mcp_registry/connections.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,16 @@
//! Global in-process registry of active MCP client connections.
//!
//! Keyed by `server_id` (UUID). Connections are established by [`connect`]
//! and removed by [`disconnect`]. The actual stdio transport lives in
//! [`crate::openhuman::mcp_client::McpStdioClient`] — this module just
//! owns the per-server lifecycle and a global handle map.
//! and removed by [`disconnect`]. The actual transport
//! ([`McpStdioClient`] for local subprocess installs,
//! [`McpHttpClient`] for HTTP-remote installs hosted by Smithery /
//! similar) lives in [`crate::openhuman::mcp_client`] — this module just
//! owns the per-server lifecycle, the transport dispatch, and a global
//! handle map.
//!
//! Dispatch is driven by the `transport` field on each [`InstalledServer`],
//! which is set at install time by `mcp_setup_install_and_connect` and
//! persisted in the `mcp_servers.transport` column.

use std::collections::HashMap;
use std::sync::{Arc, OnceLock};
Expand All @@ -12,16 +19,55 @@ use serde_json::Value;
use tokio::sync::RwLock;

use crate::openhuman::config::Config;
use crate::openhuman::mcp_client::{McpRemoteTool, McpStdioClient};
use crate::openhuman::mcp_client::{McpHttpClient, McpRemoteTool, McpStdioClient};

use super::store;
use super::types::{ConnStatus, InstalledServer, McpTool, ServerStatus};
use super::types::{ConnStatus, InstalledServer, McpTool, ServerStatus, Transport};

// ── Connection record ────────────────────────────────────────────────────────

/// One live MCP subprocess plus the tool list cached after `initialize`.
/// Active transport for one connected MCP install. Mirrors
/// [`crate::openhuman::mcp_client::registry::McpTransportClient`] but lives
/// here so `mcp_registry` doesn't have to depend on the static-config
/// registry's specific wrapping. Both variants expose the same surface
/// (`initialize` / `list_tools` / `call_tool` / `close_session`) so callers
/// don't have to branch.
enum ActiveClient {
Stdio(Arc<McpStdioClient>),
Http(Arc<McpHttpClient>),
}

impl ActiveClient {
async fn list_tools(&self) -> anyhow::Result<Vec<McpRemoteTool>> {
match self {
Self::Stdio(c) => c.list_tools().await,
Self::Http(c) => c.list_tools().await,
}
}

async fn call_tool(
&self,
name: &str,
arguments: Value,
) -> anyhow::Result<crate::openhuman::mcp_client::McpServerToolResult> {
match self {
Self::Stdio(c) => c.call_tool(name, arguments).await,
Self::Http(c) => c.call_tool(name, arguments).await,
}
}

async fn close_session(&self) -> anyhow::Result<()> {
match self {
Self::Stdio(c) => c.close_session().await,
Self::Http(c) => c.close_session().await,
}
}
}

/// One live MCP client (stdio subprocess OR HTTP-remote dial) plus the
/// tool list cached after `initialize`.
struct Connection {
client: Arc<McpStdioClient>,
client: ActiveClient,
tools: RwLock<Vec<McpTool>>,
}

Expand All @@ -41,13 +87,21 @@ fn connections() -> &'static RwLock<HashMap<String, Arc<Connection>>> {

// ── Public API ────────────────────────────────────────────────────────────────

/// Spawn a new stdio subprocess (via `McpStdioClient`), run `initialize`,
/// cache the tool list, and store the connection in the global registry.
/// Bring up a new MCP client for `server`, run `initialize`, cache the
/// tool list, and store the connection in the global registry.
///
/// Dispatches on `server.transport`:
/// - [`Transport::Stdio`] — spawn `command` + `args` as a subprocess and
/// speak JSON-RPC over stdin/stdout (the original behaviour).
/// - [`Transport::HttpRemote`] — dial the published HTTPS endpoint
/// directly with [`McpHttpClient`]. No subprocess. Needed for the
/// `~99%` of Smithery listings that are HTTP-remote.
pub async fn connect(config: &Config, server: &InstalledServer) -> anyhow::Result<Vec<McpTool>> {
tracing::debug!(
"[mcp-registry] connect server_id={} qualified_name={}",
"[mcp-registry] connect server_id={} qualified_name={} transport={}",
server.server_id,
server.qualified_name
server.qualified_name,
server.transport.dispatch_kind()
);

let env_map = store::load_env_values(config, &server.server_id).unwrap_or_default();
Expand All @@ -60,22 +114,45 @@ pub async fn connect(config: &Config, server: &InstalledServer) -> anyhow::Resul
);

let identity = config.mcp_client.client_identity.clone();
let client = Arc::new(McpStdioClient::new(
server.command.clone(),
server.args.clone(),
env,
None,
identity,
));

// Initialize + first tools/list happen here so a misconfigured server
// fails loudly at `connect` instead of silently at first `call_tool`.
client.initialize().await?;

// Branch on transport variant. Both branches end with `initialize` +
// `list_tools` so a misconfigured server fails loudly at connect
// instead of silently at first `call_tool`.
let client = match &server.transport {
Transport::Stdio => {
let stdio = Arc::new(McpStdioClient::new(
server.command.clone(),
server.args.clone(),
env,
None,
identity,
));
stdio.initialize().await?;
ActiveClient::Stdio(stdio)
}
Transport::HttpRemote { url } => {
if url.is_empty() {
anyhow::bail!(
"[mcp-registry] http_remote server_id={} has empty deployment_url",
server.server_id
);
}
// 30s timeout matches setup_ops::test_connection so install
// and runtime see the same connect-failure deadlines. Env
// values for HTTP-remote installs (typically OAuth tokens)
// ride through the McpHttpClient's own auth config — out of
// scope for this dispatch.
let http = Arc::new(McpHttpClient::new(url.clone(), 30));
http.initialize().await?;
ActiveClient::Http(http)
}
};

let remote_tools = client.list_tools().await?;
let tools: Vec<McpTool> = remote_tools.into_iter().map(into_registry_tool).collect();

let conn = Arc::new(Connection {
client: Arc::clone(&client),
client,
tools: RwLock::new(tools.clone()),
});

Expand Down
7 changes: 7 additions & 0 deletions src/openhuman/mcp_registry/ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,12 @@ pub async fn mcp_clients_install(
.map(|d| d.as_millis() as i64)
.unwrap_or(0);

// The legacy install path only ever picked stdio connections (see the
// `c.r#type == "stdio"` filter above), so legacy installs continue to
// be stdio-only. HTTP-remote installs go through the newer
// `setup_ops::mcp_setup_install_and_connect` setup-agent path, which
// picks the right transport based on what the registry actually
// exposes.
let server = InstalledServer {
server_id: server_id.clone(),
qualified_name: qualified_name.trim().to_string(),
Expand All @@ -161,6 +167,7 @@ pub async fn mcp_clients_install(
config: config_value,
installed_at: now_ms,
last_connected_at: None,
transport: super::types::Transport::Stdio,
};

store::insert_server(config, &server).map_err(|e| e.to_string())?;
Expand Down
Loading
Loading