Skip to content
28 changes: 28 additions & 0 deletions app/test/e2e/specs/mega-flow.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,20 @@ async function waitForMockRequest(
return undefined;
}

async function waitForBackendSession(label: string, timeoutMs = 15_000): Promise<void> {
const deadline = Date.now() + timeoutMs;
let lastProbe: unknown = undefined;

while (Date.now() < deadline) {
const probe = await callOpenhumanRpc('openhuman.composio_list_triggers', {});
if (probe.ok) return;
lastProbe = probe;
await browser.pause(500);
}

throw new Error(`${LOG} ${label}: backend session not ready: ${JSON.stringify(lastProbe)}`);
}

async function resetEverything(label: string): Promise<void> {
console.log(`${LOG} reset (${label}) — admin reset only (skip destructive core reset)`);
// Mock-side reset is enough to give each scenario a clean slate for the
Expand Down Expand Up @@ -237,6 +251,7 @@ describe('Mega flow — login + Gmail OAuth + Composio in one session', () => {
// Re-login since reset wipes the session.
await triggerDeepLink('openhuman://auth?token=mega-composio-token');
await waitForMockRequest('POST', '/telegram/login-tokens/', 15_000);
await waitForBackendSession('Scenario 4 auth');

// Seed connections + available triggers; start with an empty active list.
setMockBehaviors({
Expand All @@ -248,6 +263,9 @@ describe('Mega flow — login + Gmail OAuth + Composio in one session', () => {
});

const before = await callOpenhumanRpc('openhuman.composio_list_triggers', {});
if (!before.ok) {
console.log(`${LOG} composio: list before enable failed`, before);
}
expect(before.ok).toBe(true);
// list_triggers always emits a log line → RpcOutcome wraps in {result, logs}.
// JSON-RPC result shape: { result: { triggers: [...] }, logs: [...] }
Expand All @@ -262,9 +280,15 @@ describe('Mega flow — login + Gmail OAuth + Composio in one session', () => {
connection_id: 'c1',
slug: 'GMAIL_NEW_GMAIL_MESSAGE',
});
if (!enable.ok) {
console.log(`${LOG} composio: enable failed`, enable);
}
expect(enable.ok).toBe(true);

const after = await callOpenhumanRpc('openhuman.composio_list_triggers', {});
if (!after.ok) {
console.log(`${LOG} composio: list after enable failed`, after);
}
expect(after.ok).toBe(true);
const afterList = (after.result?.result?.triggers ?? after.result?.triggers ?? []) as unknown[];
expect(afterList.length).toBeGreaterThan(0);
Expand Down Expand Up @@ -561,6 +585,7 @@ describe('Mega flow — login + Gmail OAuth + Composio in one session', () => {

await triggerDeepLink('openhuman://auth?token=mega-composio-webhook-token');
await waitForMockRequest('POST', '/telegram/login-tokens/', 15_000);
await waitForBackendSession('Scenario 11 auth');
clearRequestLog();

// Seed composio state.
Expand All @@ -577,6 +602,9 @@ describe('Mega flow — login + Gmail OAuth + Composio in one session', () => {
connection_id: 'c2',
slug: 'GITHUB_PULL_REQUEST_EVENT',
});
if (!enable.ok) {
console.log(`${LOG} composio+webhook: enable failed`, enable);
}
expect(enable.ok).toBe(true);
console.log(`${LOG} composio+webhook: trigger enabled`);

Expand Down
12 changes: 6 additions & 6 deletions src/openhuman/config/schema/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,12 @@ pub use storage_memory::{
};
pub use tools::{
BrowserComputerUseConfig, BrowserConfig, ComposioConfig, ComputerControlConfig, CurlConfig,
GitbooksConfig, HttpRequestConfig, IntegrationToggle, IntegrationsConfig, McpAuthConfig,
McpClientConfig, McpClientIdentityConfig, McpServerConfig, MultimodalConfig,
PolymarketClobCredentials, PolymarketConfig, SearchConfig, SearchEngine,
SearchEngineCredentials, SearxngConfig, SecretsConfig, SeltzConfig, WebSearchConfig,
COMPOSIO_MODE_BACKEND, COMPOSIO_MODE_DIRECT, SEARCH_ENGINE_BRAVE, SEARCH_ENGINE_MANAGED,
SEARCH_ENGINE_PARALLEL,
ExternalCapabilityProviderConfig, ExternalCapabilityProvidersConfig, GitbooksConfig,
HttpRequestConfig, IntegrationToggle, IntegrationsConfig, McpAuthConfig, McpClientConfig,
McpClientIdentityConfig, McpServerConfig, MultimodalConfig, PolymarketClobCredentials,
PolymarketConfig, SearchConfig, SearchEngine, SearchEngineCredentials, SearxngConfig,
SecretsConfig, SeltzConfig, WebSearchConfig, COMPOSIO_MODE_BACKEND, COMPOSIO_MODE_DIRECT,
SEARCH_ENGINE_BRAVE, SEARCH_ENGINE_MANAGED, SEARCH_ENGINE_PARALLEL,
};
pub use update::{UpdateConfig, UpdateRestartStrategy};
mod voice_server;
Expand Down
3 changes: 3 additions & 0 deletions src/openhuman/config/schema/tools.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
//! Tool-related config: browser, HTTP, web search, composio, secrets, multimodal.

use super::defaults;
pub use crate::openhuman::external_capabilities::{
ExternalCapabilityProviderConfig, ExternalCapabilityProvidersConfig,
};
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
Expand Down
4 changes: 4 additions & 0 deletions src/openhuman/config/schema/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,9 @@ pub struct Config {
#[serde(default)]
pub mcp_client: McpClientConfig,

#[serde(default)]
pub external_capability_providers: ExternalCapabilityProvidersConfig,

#[serde(default)]
pub multimodal: MultimodalConfig,

Expand Down Expand Up @@ -612,6 +615,7 @@ impl Default for Config {
storage: StorageConfig::default(),
composio: ComposioConfig::default(),
secrets: SecretsConfig::default(),
external_capability_providers: ExternalCapabilityProvidersConfig::default(),
browser: BrowserConfig::default(),
http_request: HttpRequestConfig::default(),
curl: CurlConfig::default(),
Expand Down
14 changes: 14 additions & 0 deletions src/openhuman/external_capabilities/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
//! Registry for external capability providers.
//!
//! This module keeps provider identity and trust metadata generic. It does not
//! know how any provider packages, loads, or executes capabilities; it only
//! normalizes the provider records OpenHuman can use for admission, policy, and
//! diagnostics.

mod registry;
mod types;

pub use registry::{normalize_provider_id, ExternalCapabilityProviderRegistry};
pub use types::{
ExternalCapabilityProvider, ExternalCapabilityProviderConfig, ExternalCapabilityProvidersConfig,
};
258 changes: 258 additions & 0 deletions src/openhuman/external_capabilities/registry.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,258 @@
use std::collections::BTreeMap;

use super::types::{
ExternalCapabilityProvider, ExternalCapabilityProviderConfig, ExternalCapabilityProvidersConfig,
};

impl ExternalCapabilityProvider {
pub(crate) fn from_config(config: &ExternalCapabilityProviderConfig) -> Result<Self, String> {
let id = normalize_provider_id(&config.id)
.ok_or_else(|| format!("invalid external capability provider id `{}`", config.id))?;
let name = config.name.trim();
if name.is_empty() {
return Err(format!(
"external capability provider `{id}` name must be non-empty"
));
}

Ok(Self {
id,
name: name.to_string(),
source_uri: trim_optional(&config.source_uri),
source_digest: trim_optional(&config.source_digest),
trusted: config.trusted,
enabled: config.enabled,
})
}
}

/// Lookup table for normalized external capability providers.
#[derive(Debug, Clone, Default)]
pub struct ExternalCapabilityProviderRegistry {
providers: BTreeMap<String, ExternalCapabilityProvider>,
errors: Vec<String>,
}

impl ExternalCapabilityProviderRegistry {
/// Build a registry from config, collecting invalid records as errors.
pub fn from_config(config: &ExternalCapabilityProvidersConfig) -> Self {
let total_providers = config.providers.len();
log::debug!(
"[external_capability][registry] build_start total_providers={}",
total_providers
);
let mut providers = BTreeMap::new();
let mut errors = Vec::new();
let mut accepted_count = 0usize;
let mut rejected_count = 0usize;
let mut duplicate_count = 0usize;

for provider in &config.providers {
match ExternalCapabilityProvider::from_config(provider) {
Ok(provider) => {
if providers.contains_key(&provider.id) {
duplicate_count += 1;
rejected_count += 1;
log::debug!(
"[external_capability][registry] provider_duplicate provider_id={} total_providers={} accepted_count={} rejected_count={} duplicate_count={}",
provider.id,
total_providers,
accepted_count,
rejected_count,
duplicate_count
);
errors.push(format!(
"duplicate external capability provider id `{}`",
provider.id
));
} else {
accepted_count += 1;
log::debug!(
"[external_capability][registry] provider_accepted provider_id={} trusted={} enabled={} total_providers={} accepted_count={} rejected_count={} duplicate_count={}",
provider.id,
provider.trusted,
provider.enabled,
total_providers,
accepted_count,
rejected_count,
duplicate_count
);
providers.insert(provider.id.clone(), provider);
}
}
Err(err) => {
rejected_count += 1;
log::debug!(
"[external_capability][registry] provider_rejected provider_config_id={} error={} total_providers={} accepted_count={} rejected_count={} duplicate_count={}",
provider.id,
err,
total_providers,
accepted_count,
rejected_count,
duplicate_count
);
errors.push(err);
}
}
}

let provider_ids = providers.keys().cloned().collect::<Vec<_>>();
log::debug!(
"[external_capability][registry] build_end total_providers={} accepted_count={} rejected_count={} duplicate_count={} error_count={} provider_ids={:?} errors={:?}",
total_providers,
accepted_count,
rejected_count,
duplicate_count,
errors.len(),
provider_ids,
errors
);

Self { providers, errors }
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.

/// Whether the registry has no valid providers.
pub fn is_empty(&self) -> bool {
self.providers.is_empty()
}

/// List valid providers in normalized id order.
pub fn list(&self) -> Vec<&ExternalCapabilityProvider> {
self.providers.values().collect()
}

/// Get a provider by raw or normalized id.
pub fn get(&self, provider_id: &str) -> Option<&ExternalCapabilityProvider> {
normalize_provider_id(provider_id).and_then(|id| self.providers.get(&id))
}

/// Whether a provider is known, enabled, and trusted.
pub fn can_register_tools(&self, provider_id: &str) -> bool {
self.get(provider_id)
.map(ExternalCapabilityProvider::can_register_tools)
.unwrap_or(false)
}

/// Config load errors for invalid or duplicate provider records.
pub fn errors(&self) -> &[String] {
&self.errors
}
}

/// Normalize and validate an external capability provider id.
pub fn normalize_provider_id(value: &str) -> Option<String> {
let normalized = value.trim().to_ascii_lowercase();
if normalized.is_empty() {
return None;
}
let valid = normalized
.chars()
.all(|ch| ch.is_ascii_lowercase() || ch.is_ascii_digit() || matches!(ch, '-' | '_' | '.'));
if !valid {
return None;
}
let starts_or_ends_with_sep = normalized
.chars()
.next()
.zip(normalized.chars().last())
.map(|(first, last)| is_separator(first) || is_separator(last))
.unwrap_or(true);
if starts_or_ends_with_sep {
return None;
}
Some(normalized)
}

fn is_separator(ch: char) -> bool {
matches!(ch, '-' | '_' | '.')
}

fn trim_optional(value: &Option<String>) -> Option<String> {
value
.as_deref()
.map(str::trim)
.filter(|value| !value.is_empty())
.map(str::to_string)
}

#[cfg(test)]
mod tests {
use super::*;

fn config(id: &str) -> ExternalCapabilityProviderConfig {
ExternalCapabilityProviderConfig {
id: id.to_string(),
name: "Local Runtime".to_string(),
source_uri: Some(" file:///runtime ".to_string()),
source_digest: Some(" sha256:abc ".to_string()),
trusted: true,
enabled: true,
}
}

#[test]
fn normalizes_valid_provider_ids() {
assert_eq!(
normalize_provider_id(" Local.Runtime_1 "),
Some("local.runtime_1".to_string())
);
assert_eq!(
normalize_provider_id("provider-1"),
Some("provider-1".to_string())
);
}

#[test]
fn rejects_invalid_provider_ids() {
assert_eq!(normalize_provider_id(""), None);
assert_eq!(normalize_provider_id(".provider"), None);
assert_eq!(normalize_provider_id("provider."), None);
assert_eq!(normalize_provider_id("provider id"), None);
assert_eq!(normalize_provider_id("provider/id"), None);
}

#[test]
fn registry_loads_trusted_enabled_provider() {
let registry =
ExternalCapabilityProviderRegistry::from_config(&ExternalCapabilityProvidersConfig {
providers: vec![config("runtime.local")],
});

assert!(registry.errors().is_empty());
assert_eq!(registry.list().len(), 1);
assert!(registry.can_register_tools("RUNTIME.LOCAL"));
let provider = registry.get("runtime.local").unwrap();
assert_eq!(provider.source_uri.as_deref(), Some("file:///runtime"));
assert_eq!(provider.source_digest.as_deref(), Some("sha256:abc"));
}

#[test]
fn disabled_or_untrusted_provider_cannot_register_tools() {
let mut disabled = config("disabled.runtime");
disabled.enabled = false;
let mut untrusted = config("untrusted.runtime");
untrusted.trusted = false;
let registry =
ExternalCapabilityProviderRegistry::from_config(&ExternalCapabilityProvidersConfig {
providers: vec![disabled, untrusted],
});

assert!(!registry.can_register_tools("disabled.runtime"));
assert!(!registry.can_register_tools("untrusted.runtime"));
}

#[test]
fn registry_reports_duplicates_and_invalid_records() {
let mut unnamed = config("unnamed.runtime");
unnamed.name = " ".to_string();
let registry =
ExternalCapabilityProviderRegistry::from_config(&ExternalCapabilityProvidersConfig {
providers: vec![config("runtime.local"), config("RUNTIME.LOCAL"), unnamed],
});

assert_eq!(registry.list().len(), 1);
assert_eq!(registry.errors().len(), 2);
assert!(registry.errors()[0].contains("duplicate"));
assert!(registry.errors()[1].contains("name must be non-empty"));
}
}
Loading
Loading