diff --git a/.github/workflows/coverage.yml b/.github/workflows/coverage.yml index 9acc80533c..f2d78fc8c9 100644 --- a/.github/workflows/coverage.yml +++ b/.github/workflows/coverage.yml @@ -73,6 +73,10 @@ jobs: image: ghcr.io/tinyhumansai/openhuman_ci:rust-1.93.0 env: CARGO_INCREMENTAL: '0' + # Coverage instrumentation makes each test binary link substantially + # heavier. Keep the core coverage job's linker work serialized to avoid + # intermittent rust-lld bus errors on hosted Linux runners. + CARGO_BUILD_JOBS: '1' # sccache is incompatible with `-C instrument-coverage` profiles, so we # skip it for coverage runs and rely on Swatinem/rust-cache for warmup. steps: diff --git a/app/src-tauri/src/core_process.rs b/app/src-tauri/src/core_process.rs index ec030cc65a..2dc2dec9de 100644 --- a/app/src-tauri/src/core_process.rs +++ b/app/src-tauri/src/core_process.rs @@ -367,15 +367,21 @@ impl CoreProcessHandle { let port_open = self.is_rpc_port_open().await; return Err(self - .cleanup_startup_timeout(received_ready, port_open) + .cleanup_startup_timeout(received_ready, port_open, startup_attempt + 1) .await); } let port_open = self.is_rpc_port_open().await; - Err(self.cleanup_startup_timeout(false, port_open).await) + Err(self.cleanup_startup_timeout(false, port_open, 2).await) } - async fn cleanup_startup_timeout(&self, received_ready: bool, port_open: bool) -> String { + async fn cleanup_startup_timeout( + &self, + received_ready: bool, + port_open: bool, + attempt: u8, + ) -> String { + let port = self.port(); let task_state = { let guard = self.task.lock().await; match guard.as_ref() { @@ -386,14 +392,16 @@ impl CoreProcessHandle { }; log::error!( "[core] startup timed out after {CORE_READY_TIMEOUT_MS}ms \ - (ready_signal={received_ready}, port_open={port_open}, task_state={task_state}); \ + (port={port}, ready_signal={received_ready}, port_open={port_open}, \ + task_state={task_state}, attempt={attempt}); \ aborting embedded startup task before retry" ); self.cancel_shutdown_token(" after startup timeout").await; self.abort_task(" after startup timeout").await; format!( "core process did not become ready within {CORE_READY_TIMEOUT_MS}ms \ - (ready_signal={received_ready}, port_open={port_open}, task_state={task_state})" + (port={port}, ready_signal={received_ready}, port_open={port_open}, \ + task_state={task_state}, attempt={attempt})" ) } diff --git a/app/src-tauri/src/core_process_tests.rs b/app/src-tauri/src/core_process_tests.rs index 45a956a793..5ec730ec26 100644 --- a/app/src-tauri/src/core_process_tests.rs +++ b/app/src-tauri/src/core_process_tests.rs @@ -543,7 +543,7 @@ fn startup_timeout_cleanup_aborts_task_and_clears_slot() { *guard = Some(task); } - let message = handle.cleanup_startup_timeout(false, false).await; + let message = handle.cleanup_startup_timeout(false, false, 2).await; assert!( message.contains("core process did not become ready within"), @@ -553,6 +553,10 @@ fn startup_timeout_cleanup_aborts_task_and_clears_slot() { message.contains("ready_signal=false"), "timeout message should include ready signal state: {message}" ); + assert!( + message.contains("port=19006"), + "timeout message should include RPC port: {message}" + ); assert!( message.contains("port_open=false"), "timeout message should include final port state: {message}" @@ -561,6 +565,10 @@ fn startup_timeout_cleanup_aborts_task_and_clears_slot() { message.contains("task_state=running"), "timeout message should include task state: {message}" ); + assert!( + message.contains("attempt=2"), + "timeout message should include startup attempt: {message}" + ); assert!( handle.task.lock().await.is_none(), "cleanup must clear the managed task slot so retry can spawn fresh" diff --git a/app/src/components/composio/toolkitMeta.test.tsx b/app/src/components/composio/toolkitMeta.test.tsx index b0ff99c24c..4f5d274c44 100644 --- a/app/src/components/composio/toolkitMeta.test.tsx +++ b/app/src/components/composio/toolkitMeta.test.tsx @@ -4,9 +4,10 @@ import { composioToolkitMeta, KNOWN_COMPOSIO_TOOLKITS } from './toolkitMeta'; describe('composioToolkitMeta', () => { it('ships the full Composio managed-auth catalog fallback', () => { - expect(KNOWN_COMPOSIO_TOOLKITS).toHaveLength(118); + expect(KNOWN_COMPOSIO_TOOLKITS).toHaveLength(119); expect(KNOWN_COMPOSIO_TOOLKITS).toContain('gmail'); expect(KNOWN_COMPOSIO_TOOLKITS).toContain('discord'); + expect(KNOWN_COMPOSIO_TOOLKITS).toContain('larksuite'); expect(KNOWN_COMPOSIO_TOOLKITS).toContain('supabase'); expect(KNOWN_COMPOSIO_TOOLKITS).toContain('zoom'); }); @@ -24,6 +25,18 @@ describe('composioToolkitMeta', () => { expect(calendar.logoUrl).toContain('/googlecalendar'); }); + it('normalizes Lark and Feishu aliases to the LarkSuite toolkit for Chinese workplace coverage (#2148)', () => { + const larksuite = composioToolkitMeta('larksuite'); + const lark = composioToolkitMeta('lark'); + const feishu = composioToolkitMeta('feishu'); + + expect(larksuite.name).toBe('Lark / Feishu'); + expect(larksuite.category).toBe('Chat'); + expect(larksuite.permissionLabel).toBe('Messages, channels, and communication data'); + expect(lark.slug).toBe('larksuite'); + expect(feishu.slug).toBe('larksuite'); + }); + it('documents Instagram Business account requirement and Meta 429 guidance', () => { const meta = composioToolkitMeta('instagram'); expect(meta.description).toMatch(/Business or Creator/i); diff --git a/app/src/components/composio/toolkitMeta.tsx b/app/src/components/composio/toolkitMeta.tsx index a4b4913c7e..93d824b844 100644 --- a/app/src/components/composio/toolkitMeta.tsx +++ b/app/src/components/composio/toolkitMeta.tsx @@ -8,8 +8,8 @@ * names, categories, descriptions, and logos for rendering. * * Source of truth for the managed-auth list: - * https://docs.composio.dev/toolkits/managed-auth (118 toolkits as of - * May 1, 2026). + * https://docs.composio.dev/toolkits/managed-auth plus OpenHuman's + * compatibility aliases (119 toolkits as of May 21, 2026). */ import { type ReactNode, useState } from 'react'; @@ -100,6 +100,7 @@ const MANAGED_COMPOSIO_TOOLKITS: readonly ManagedToolkitEntry[] = Object.freeze( { slug: 'intercom', name: 'Intercom' }, { slug: 'jira', name: 'Jira' }, { slug: 'kit', name: 'Kit' }, + { slug: 'larksuite', name: 'Lark / Feishu' }, { slug: 'linear', name: 'Linear' }, { slug: 'linkedin', name: 'LinkedIn' }, { slug: 'linkhut', name: 'Linkhut' }, @@ -163,7 +164,16 @@ const MANAGED_TOOLKIT_NAME_BY_SLUG = new Map( MANAGED_COMPOSIO_TOOLKITS.map(entry => [entry.slug, entry.name]) ); -const CHAT_KEYWORDS = ['discord', 'slack', 'teams', 'webex', 'whatsapp', 'dialpad']; +const CHAT_KEYWORDS = [ + 'discord', + 'slack', + 'teams', + 'webex', + 'whatsapp', + 'dialpad', + 'lark', + 'feishu', +]; const SOCIAL_KEYWORDS = [ 'facebook', 'instagram', diff --git a/app/src/hooks/useUsageState.test.ts b/app/src/hooks/useUsageState.test.ts index 352b7c0a46..4e0836d845 100644 --- a/app/src/hooks/useUsageState.test.ts +++ b/app/src/hooks/useUsageState.test.ts @@ -37,6 +37,21 @@ const ALL_OPENHUMAN_AI_SETTINGS = { }, }; +const ALL_LOCAL_AI_SETTINGS = { + cloudProviders: [], + routing: { + chat: { kind: 'local' as const, model: 'qwen3:8b' }, + reasoning: { kind: 'local' as const, model: 'qwen3:8b' }, + agentic: { kind: 'local' as const, model: 'qwen3:8b' }, + coding: { kind: 'local' as const, model: 'qwen3:8b' }, + memory: { kind: 'local' as const, model: 'nomic-embed-text' }, + embeddings: { kind: 'local' as const, model: 'nomic-embed-text' }, + heartbeat: { kind: 'local' as const, model: 'qwen3:8b' }, + learning: { kind: 'local' as const, model: 'qwen3:8b' }, + subconscious: { kind: 'local' as const, model: 'qwen3:8b' }, + }, +}; + interface BuildUsageOpts { remainingUsd?: number; cycleBudgetUsd?: number; @@ -475,6 +490,79 @@ describe('useUsageState', () => { expect(result.current.isAtLimit).toBe(false); }); + it('does not fetch billing usage when every workload routes away from OpenHuman (#2020)', async () => { + const { useUsageState } = await import('./useUsageState'); + + mockLoadAISettings.mockResolvedValue(ALL_LOCAL_AI_SETTINGS); + mockGetCurrentPlan.mockRejectedValue(new Error('billing plan should not be fetched')); + mockGetTeamUsage.mockRejectedValue(new Error('team usage should not be fetched')); + + const { result } = renderHook(() => useUsageState()); + + await waitFor(() => { + expect(result.current.isLoading).toBe(false); + }); + + expect(result.current.teamUsage).toBeNull(); + expect(result.current.currentPlan).toBeNull(); + expect(result.current.isFullyRoutedAway).toBe(true); + expect(mockGetCurrentPlan).not.toHaveBeenCalled(); + expect(mockGetTeamUsage).not.toHaveBeenCalled(); + }); + + it('rechecks routing before returning a warm billing cache (#2020)', async () => { + const { useUsageState } = await import('./useUsageState'); + + mockGetCurrentPlan.mockResolvedValue(basicPlan()); + mockGetTeamUsage.mockResolvedValue(buildUsage({ remainingUsd: 0, cycleBudgetUsd: 10 })); + mockLoadAISettings + .mockResolvedValueOnce(ALL_OPENHUMAN_AI_SETTINGS) + .mockResolvedValueOnce(ALL_LOCAL_AI_SETTINGS); + + const first = renderHook(() => useUsageState()); + await waitFor(() => { + expect(first.result.current.isLoading).toBe(false); + }); + expect(first.result.current.teamUsage).not.toBeNull(); + first.unmount(); + + mockGetCurrentPlan.mockClear(); + mockGetTeamUsage.mockClear(); + + const second = renderHook(() => useUsageState()); + await waitFor(() => { + expect(second.result.current.isLoading).toBe(false); + }); + + expect(second.result.current.teamUsage).toBeNull(); + expect(second.result.current.currentPlan).toBeNull(); + expect(second.result.current.isFullyRoutedAway).toBe(true); + expect(mockLoadAISettings).toHaveBeenCalledTimes(2); + expect(mockGetCurrentPlan).not.toHaveBeenCalled(); + expect(mockGetTeamUsage).not.toHaveBeenCalled(); + }); + + it('still fetches billing when a background workload remains on OpenHuman', async () => { + const { useUsageState } = await import('./useUsageState'); + + mockLoadAISettings.mockResolvedValue({ + ...ALL_LOCAL_AI_SETTINGS, + routing: { ...ALL_LOCAL_AI_SETTINGS.routing, memory: { kind: 'openhuman' as const } }, + }); + mockGetCurrentPlan.mockResolvedValue(freePlan()); + mockGetTeamUsage.mockResolvedValue(buildUsage()); + + const { result } = renderHook(() => useUsageState()); + + await waitFor(() => { + expect(result.current.isLoading).toBe(false); + }); + + expect(result.current.isFullyRoutedAway).toBe(true); + expect(mockGetCurrentPlan).toHaveBeenCalledTimes(1); + expect(mockGetTeamUsage).toHaveBeenCalledTimes(1); + }); + it('rethrows CoreRpcError(kind=auth_expired) from loadAISettings instead of swallowing it (graycyrus review on #2053)', async () => { // The two sibling fetches (getTeamUsage, getCurrentPlan) explicitly // re-throw auth_expired so coreRpcClient's global re-auth event fires. diff --git a/app/src/hooks/useUsageState.ts b/app/src/hooks/useUsageState.ts index 79d4602482..534c4ffbbd 100644 --- a/app/src/hooks/useUsageState.ts +++ b/app/src/hooks/useUsageState.ts @@ -1,6 +1,11 @@ import { useCallback, useEffect, useState } from 'react'; -import { type AISettings, CHAT_WORKLOADS, loadAISettings } from '../services/api/aiSettingsApi'; +import { + type AISettings, + ALL_WORKLOADS, + CHAT_WORKLOADS, + loadAISettings, +} from '../services/api/aiSettingsApi'; import { billingApi } from '../services/api/billingApi'; import { creditsApi, type TeamUsage } from '../services/api/creditsApi'; import { CoreRpcError } from '../services/coreRpcClient'; @@ -38,19 +43,45 @@ let _cache: { const USAGE_UNAVAILABLE = Symbol('usage-unavailable'); +function workloadsRoutedAway(aiSettings: AISettings, workloads: readonly string[]): boolean { + return workloads.every(w => { + const ref = aiSettings.routing[w as keyof AISettings['routing']]; + return ref !== undefined && ref.kind !== 'openhuman'; + }); +} + async function fetchUsageData(): Promise<{ teamUsage: TeamUsage | null; currentPlan: CurrentPlanData | null; aiSettings: AISettings | null; } | null> { + // Read routing first. If every workload is explicitly assigned to a local + // or user-supplied cloud provider, this session should not phone home to + // OpenHuman's billing/usage APIs at all (#2020). Missing/failed AI settings + // stay conservative and fall through to the existing billing path. + const aiSettings = await loadAISettings().catch(err => { + if (err instanceof CoreRpcError && err.kind === 'auth_expired') { + throw err; + } + return USAGE_UNAVAILABLE; + }); + if ( + aiSettings !== USAGE_UNAVAILABLE && + workloadsRoutedAway(aiSettings as AISettings, ALL_WORKLOADS) + ) { + return { teamUsage: null, currentPlan: null, aiSettings: aiSettings as AISettings }; + } if (_cache && Date.now() - _cache.fetchedAt < CACHE_TTL_MS) { - return _cache.data; + return { + ..._cache.data, + aiSettings: aiSettings === USAGE_UNAVAILABLE ? null : (aiSettings as AISettings), + }; } // Wrap each leg so a single failing call (e.g. /teams returning 401 after // session expiry) cannot reject the Promise.all microtask before the // sibling resolves — that race let the unhandled rejection leak to the // window's unhandledrejection trap and onward to Sentry (#1472). - const [teamUsage, currentPlan, aiSettings] = await Promise.all([ + const [teamUsage, currentPlan] = await Promise.all([ creditsApi.getTeamUsage().catch(err => { if (err instanceof CoreRpcError && err.kind === 'auth_expired') { throw err; @@ -63,19 +94,6 @@ async function fetchUsageData(): Promise<{ } return USAGE_UNAVAILABLE; }), - // AI settings drive the "routed away from openhuman" detection used to - // suppress the budget banner when the user supplied their own provider - // key (#2040 / #2041). Mirror the sibling fetches: re-throw - // CoreRpcError(kind='auth_expired') so the documented session-expired - // signal still reaches the global re-auth handler (graycyrus review on - // #2053). Other failures are treated as "unknown" — the budget gate - // stays in its conservative (banner-on) state. - loadAISettings().catch(err => { - if (err instanceof CoreRpcError && err.kind === 'auth_expired') { - throw err; - } - return USAGE_UNAVAILABLE; - }), ]); const data = { teamUsage: teamUsage === USAGE_UNAVAILABLE ? null : (teamUsage as TeamUsage), @@ -154,12 +172,7 @@ export function useUsageState(): UsageState { // user. Conservative on missing aiSettings (treat as still using // openhuman) so we never silently disable the gate after a transient // fetch failure (#2040, #2041). - const isFullyRoutedAway = aiSettings - ? CHAT_WORKLOADS.every(w => { - const ref = aiSettings.routing[w]; - return ref !== undefined && ref.kind !== 'openhuman'; - }) - : false; + const isFullyRoutedAway = aiSettings ? workloadsRoutedAway(aiSettings, CHAT_WORKLOADS) : false; const rawBudgetExhausted = teamUsage ? teamUsage.cycleBudgetUsd > 0.01 && teamUsage.remainingUsd <= 0.01 diff --git a/app/src/lib/composio/toolkitSlug.ts b/app/src/lib/composio/toolkitSlug.ts index 362fee8ef3..5dfcff36c1 100644 --- a/app/src/lib/composio/toolkitSlug.ts +++ b/app/src/lib/composio/toolkitSlug.ts @@ -1,7 +1,9 @@ const TOOLKIT_ALIASES: Record = { + feishu: 'larksuite', google_calendar: 'googlecalendar', google_drive: 'googledrive', google_sheets: 'googlesheets', + lark: 'larksuite', }; export function canonicalizeComposioToolkitSlug(slug: string): string { diff --git a/app/src/lib/i18n/chunks/de-5.ts b/app/src/lib/i18n/chunks/de-5.ts index b88de30cf6..2ce85a837c 100644 --- a/app/src/lib/i18n/chunks/de-5.ts +++ b/app/src/lib/i18n/chunks/de-5.ts @@ -217,25 +217,6 @@ const de5: TranslationMap = { 'settings.developerMenu.integrationTriggers.title': 'Integrationsauslöser', 'settings.developerMenu.integrationTriggers.desc': 'Konfiguriere KI-Triage-Einstellungen für Composio-Integrationsauslöser', - 'settings.mcpServer.title': 'MCP-Server', - 'settings.mcpServer.toolsSectionTitle': 'Verfügbare Werkzeuge', - 'settings.mcpServer.toolsSectionDesc': - 'Werkzeuge, die über den MCP-Stdio-Server beim Ausführen von openhuman-core mcp bereitgestellt werden', - 'settings.mcpServer.configSectionTitle': 'Client-Konfiguration', - 'settings.mcpServer.configSectionDesc': - 'Wähle deinen MCP-Client aus, um das richtige Konfigurations-Snippet zu generieren', - 'settings.mcpServer.copySnippet': 'In die Zwischenablage kopieren', - 'settings.mcpServer.copied': 'Kopiert!', - 'settings.mcpServer.openConfigFile': 'Konfigurationsdatei öffnen', - 'settings.mcpServer.binaryPathNotFound': - 'OpenHuman-Binärdatei nicht gefunden. Wenn du aus dem Quellcode arbeitest, baue sie mit: cargo build --bin openhuman-core', - 'settings.mcpServer.openConfigError': 'Konfigurationsdatei konnte nicht geöffnet werden', - 'settings.mcpServer.clientClaudeDesktop': 'Claude Desktop', - 'settings.mcpServer.clientCursor': 'Cursor', - 'settings.mcpServer.clientCodex': 'Codex', - 'settings.mcpServer.clientZed': 'Zed', - 'settings.mcpServer.configFilePath': 'Konfigurationsdatei', - 'settings.mcpServer.clientSelectorAriaLabel': 'MCP-Client-Auswahl', 'settings.appearance.menuDesc': 'Wähle hell, dunkel oder passend zu deinem Systemthema', 'settings.mascot.active': 'Aktiv', 'settings.mascot.characterDesc': 'Charakterbeschreibung', @@ -564,6 +545,26 @@ const de5: TranslationMap = { 'settings.appearance.tabBarAlwaysShowLabels': 'Beschriftungen immer anzeigen', 'settings.appearance.tabBarAlwaysShowLabelsDesc': 'Wenn diese Option deaktiviert ist, werden Beschriftungen nur beim Hover oder für die aktive Registerkarte angezeigt.', + 'settings.mcpServer.title': 'MCP Server', + 'settings.mcpServer.toolsSectionTitle': 'Verfügbare Werkzeuge', + 'settings.mcpServer.toolsSectionDesc': + 'Werkzeuge, die über den MCP-stdio-Server bereitgestellt werden, wenn openhuman-core mcp ausgeführt wird', + 'settings.mcpServer.configSectionTitle': 'Client-Konfiguration', + 'settings.mcpServer.configSectionDesc': + 'Wähle deinen MCP-Client aus, um den passenden Konfigurationsausschnitt zu erzeugen', + 'settings.mcpServer.copySnippet': 'In die Zwischenablage kopieren', + 'settings.mcpServer.copied': 'Kopiert!', + 'settings.mcpServer.openConfigFile': 'Konfigurationsdatei öffnen', + 'settings.mcpServer.binaryPathNotFound': + 'OpenHuman-Binärdatei nicht gefunden. Wenn du aus dem Quellcode arbeitest, baue sie mit: cargo build --bin openhuman-core', + 'settings.mcpServer.openConfigError': 'Konfigurationsdatei konnte nicht geöffnet werden', + 'settings.mcpServer.clientClaudeDesktop': 'Claude Desktop', + 'settings.mcpServer.clientCursor': 'Cursor', + 'settings.mcpServer.clientCodex': 'Codex', + 'settings.mcpServer.clientZed': 'Zed', + 'settings.mcpServer.configFilePath': 'Konfigurationsdatei', + 'settings.mcpServer.clientSelectorAriaLabel': 'MCP-Client-Auswahl', + 'common.breadcrumb': 'Breadcrumb', 'settings.betaBuild': 'Beta-Build – v{version}', 'migration.vendor.openclaw': 'OpenClaw', diff --git a/app/src/utils/tauriCommands/vault.ts b/app/src/utils/tauriCommands/vault.ts index 496ac97d11..52b848739d 100644 --- a/app/src/utils/tauriCommands/vault.ts +++ b/app/src/utils/tauriCommands/vault.ts @@ -8,6 +8,7 @@ export interface CoreVault { id: string; name: string; root_path: string; + host_os?: string | null; namespace: string; include_globs: string[]; exclude_globs: string[]; diff --git a/gitbooks/features/native-tools/integrations.md b/gitbooks/features/native-tools/integrations.md index d463b5d176..bf90996b4c 100644 --- a/gitbooks/features/native-tools/integrations.md +++ b/gitbooks/features/native-tools/integrations.md @@ -1,11 +1,11 @@ --- -description: The agent's view of the 118+ connected third-party services. +description: The agent's view of the 119+ connected third-party services. icon: plug --- # Third-party Integrations -OpenHuman's agent can call into [118+ third-party services](../integrations/README.md) - Gmail, Notion, GitHub, Slack, Stripe, Calendar, and the long tail - through a single proxied tool surface. +OpenHuman's agent can call into [119+ third-party services](../integrations/README.md) - Gmail, Notion, GitHub, Slack, Lark / Feishu, Stripe, Calendar, and the long tail - through a single proxied tool surface. ## How it shows up to the agent @@ -13,21 +13,23 @@ Once you've connected a service via OAuth, its actions become callable tools. Th A few examples of what becomes available: -* "Send a message to #engineering on Slack." -* "Create an issue in the openhuman repo." -* "What's on my calendar tomorrow?" -* "Pull the last 20 Stripe charges over $1000." +- "Send a message to #engineering on Slack." +- "Create an issue in the openhuman repo." +- "What's on my calendar tomorrow?" +- "Pull the last 20 Stripe charges over $1000." ## Native vs proxied Some services have **native providers** - Rust modules that know how to ingest the service into the [Memory Tree](../obsidian-wiki/memory-tree.md) directly (e.g. Gmail's native ingest path). Others are exposed as **proxied tools** only: the agent can call them, but there's no automatic ingest yet. New native providers are added as features land. +Lark / Feishu currently has two surfaces: a native real-time channel for message send/receive, and a Composio-proxied workspace toolkit entry for chat, docs, wiki, and meeting actions when the backend allowlist exposes it. Historical chat/doc backfill into the Memory Tree is not yet a native provider; track that separately from the live channel connector. + ## Privacy boundary -OpenHuman's core never calls any third-party API directly. All requests go through the OpenHuman backend, which handles OAuth tokens and rate limiting. Your tokens never sit on disk in plaintext on your machine, and the agent only sees the *results* of tool calls, not the credentials. +For Composio-proxied integrations, OpenHuman's core never calls any third-party API directly. Requests go through the OpenHuman backend, which handles OAuth tokens and rate limiting. Your tokens never sit on disk in plaintext on your machine, and the agent only sees the _results_ of tool calls, not the credentials. Native channels such as Lark / Feishu use their own local configuration and should be reviewed separately from the Composio OAuth boundary. ## See also -* [Third-party Integrations (catalog)](../integrations/README.md) - the user-facing pitch, OAuth flow, and connection management. -* [Auto-fetch](../obsidian-wiki/auto-fetch.md) - how connected services flow into the Memory Tree. -* [Privacy & Security](../privacy-and-security.md) - the full boundary. +- [Third-party Integrations (catalog)](../integrations/README.md) - the user-facing pitch, OAuth flow, and connection management. +- [Auto-fetch](../obsidian-wiki/auto-fetch.md) - how connected services flow into the Memory Tree. +- [Privacy & Security](../privacy-and-security.md) - the full boundary. diff --git a/src/core/auth.rs b/src/core/auth.rs index caa95e29d8..543859f529 100644 --- a/src/core/auth.rs +++ b/src/core/auth.rs @@ -19,6 +19,8 @@ //! Endpoints exempt from auth (checked by [`rpc_auth_middleware`]): //! - `GET /` — public info page //! - `GET /health` — liveness probe +//! - `GET /auth` — desktop login callback fallback; consumes only +//! one-time login tokens, never raw session JWTs //! - `GET /auth/telegram` — external browser callback (carries its own token) //! - `GET /schema` — read-only schema discovery //! - `GET /events` — SSE stream; browser `EventSource` cannot set headers @@ -65,6 +67,7 @@ static RPC_TOKEN: OnceLock = OnceLock::new(); const PUBLIC_PATHS: &[&str] = &[ "/", "/health", + "/auth", "/auth/telegram", "/schema", "/events", @@ -448,6 +451,11 @@ mod tests { ); } + #[test] + fn public_paths_include_desktop_auth_callback() { + assert!(PUBLIC_PATHS.contains(&"/auth")); + } + #[cfg(unix)] #[test] fn token_file_has_owner_only_permissions() { diff --git a/src/core/jsonrpc.rs b/src/core/jsonrpc.rs index 06417ef3ab..3a9dae2a06 100644 --- a/src/core/jsonrpc.rs +++ b/src/core/jsonrpc.rs @@ -399,8 +399,18 @@ struct TelegramAuthQuery { token: Option, } +/// Query parameters for the generic desktop auth callback. +#[derive(Debug, serde::Deserialize)] +struct DesktopAuthQuery { + /// One-time login token consumed through the backend. + token: Option, + /// Deprecated backend marker for direct session JWT callbacks. + key: Option, +} + /// Returns the HTML for a successful connection page. -fn success_html() -> String { +fn success_html(message: &str) -> String { + let escaped_message = escape_html(message); r#" @@ -420,11 +430,11 @@ fn success_html() -> String {

Connected!

-

Your Telegram account has been connected to OpenHuman. You can close this tab.

+

__MESSAGE__

"# - .to_string() + .replace("__MESSAGE__", &escaped_message) } /// Simple HTML escaping for error messages. @@ -466,6 +476,36 @@ fn error_html(message: &str) -> String { ) } +/// Require desktop `/auth` callbacks to be top-level document navigations when +/// browser fetch-metadata headers are present. +/// +/// The preferred Tauri loopback listener has a per-login state nonce. This +/// legacy core fallback cannot rely on that state, so it must reject embedded +/// resource loads (``, iframe, fetch, script) before token exchange. +fn desktop_callback_navigation_ok(headers: &axum::http::HeaderMap) -> Result<(), &'static str> { + let get_str = |name: &str| -> Option<&str> { + headers + .get(name) + .and_then(|v| v.to_str().ok()) + .map(|s| s.trim()) + .filter(|s| !s.is_empty()) + }; + + if let Some(mode) = get_str("sec-fetch-mode") { + if mode != "navigate" { + return Err("Sec-Fetch-Mode must be 'navigate'"); + } + } + + if let Some(dest) = get_str("sec-fetch-dest") { + if dest != "document" { + return Err("Sec-Fetch-Dest must be 'document'"); + } + } + + Ok(()) +} + /// Inspect the browser fetch-metadata + Referer/Origin headers and decide /// whether the inbound `/auth/telegram` request looks like a legitimate /// top-level redirect from Telegram, or a cross-site CSRF attempt. @@ -666,7 +706,129 @@ async fn telegram_auth_handler( } } - html_response(StatusCode::OK, success_html()) + html_response( + StatusCode::OK, + success_html( + "Your Telegram account has been connected to OpenHuman. You can close this tab.", + ), + ) +} + +/// Handles the generic desktop login callback fallback. +/// +/// The preferred path is the `openhuman://auth?...` deep link handled in the +/// renderer. On hosts where URL-scheme registration is broken, some login +/// flows can fall back to the local core callback (`/auth`). This route is +/// public because the callback carries its own one-time login token; raw +/// session JWT callbacks are intentionally rejected on this public surface. +async fn desktop_auth_handler( + headers: axum::http::HeaderMap, + Query(query): Query, +) -> impl IntoResponse { + let html_response = |status: StatusCode, body: String| -> Response { + ( + status, + [(header::CONTENT_TYPE, "text/html; charset=utf-8")], + body, + ) + .into_response() + }; + + if let Err(reason) = desktop_callback_navigation_ok(&headers) { + log::warn!("[auth:desktop] Rejected non-navigation callback: {reason}"); + return html_response( + StatusCode::BAD_REQUEST, + error_html("Sign-in callback must be opened as a browser page. Please try again."), + ); + } + + let token = match query + .token + .as_deref() + .map(str::trim) + .filter(|s| !s.is_empty()) + { + Some(t) => t.to_string(), + None => { + return html_response( + StatusCode::BAD_REQUEST, + error_html("Sign-in callback was missing a token. Please try again."), + ) + } + }; + + if query + .key + .as_deref() + .map(str::trim) + .filter(|key| !key.is_empty()) + .is_some() + { + log::warn!("[auth:desktop] Rejected deprecated direct session token callback"); + return html_response( + StatusCode::BAD_REQUEST, + error_html("This sign-in callback is no longer supported. Please start sign-in again."), + ); + } + + log::info!("[auth:desktop] Received desktop auth callback"); + + let config = match crate::openhuman::config::Config::load_or_init().await { + Ok(c) => c, + Err(e) => { + log::error!("[auth:desktop] Failed to load config: {e}"); + return html_response( + StatusCode::INTERNAL_SERVER_ERROR, + error_html("Internal error. Please try again."), + ); + } + }; + + let api_url = crate::api::config::effective_backend_api_url(&config.api_url); + let client = match crate::api::rest::BackendOAuthClient::new(&api_url) { + Ok(c) => c, + Err(e) => { + log::error!("[auth:desktop] Failed to create API client: {e}"); + return html_response( + StatusCode::INTERNAL_SERVER_ERROR, + error_html("Internal error. Please try again."), + ); + } + }; + + let jwt_token = match client.consume_login_token(&token).await { + Ok(jwt) => jwt, + Err(e) => { + log::warn!("[auth:desktop] Login token consumption failed: {e}"); + return html_response( + StatusCode::BAD_REQUEST, + error_html("This sign-in link has expired or was already used. Please try again."), + ); + } + }; + + match crate::openhuman::credentials::ops::store_session(&config, &jwt_token, None, None).await { + Ok(outcome) => { + for msg in &outcome.logs { + log::info!("[auth:desktop] {msg}"); + } + log::info!("[auth:desktop] Session stored successfully"); + } + Err(e) => { + log::error!("[auth:desktop] Failed to store session: {e}"); + return html_response( + StatusCode::INTERNAL_SERVER_ERROR, + error_html( + "Sign-in succeeded but OpenHuman could not save the session. Please try again.", + ), + ); + } + } + + html_response( + StatusCode::OK, + success_html("Sign-in completed. You can close this tab and return to OpenHuman."), + ) } /// WebSocket upgrade handler for streaming voice dictation. @@ -702,6 +864,7 @@ pub fn build_core_http_router(socketio_enabled: bool) -> Router { .route("/events/webhooks", get(webhook_events_handler)) .route("/rpc", post(rpc_handler)) .route("/ws/dictation", get(dictation_ws_handler)) + .route("/auth", get(desktop_auth_handler)) .route("/auth/telegram", get(telegram_auth_handler)) // OpenAI-compatible inference endpoint (/v1/chat/completions, /v1/models) .nest("/v1", crate::openhuman::inference::http::router()) diff --git a/src/core/jsonrpc_tests.rs b/src/core/jsonrpc_tests.rs index f1c5c5e37f..c02ad94107 100644 --- a/src/core/jsonrpc_tests.rs +++ b/src/core/jsonrpc_tests.rs @@ -1178,3 +1178,60 @@ async fn test_http_health_handler_returns_correct_status() { assert_eq!(status, expected_status); } + +#[tokio::test] +async fn desktop_auth_rejects_deprecated_direct_session_token_marker() { + use axum::body::to_bytes; + use axum::extract::Query; + use axum::http::{HeaderMap, StatusCode}; + use axum::response::IntoResponse; + + let resp = super::desktop_auth_handler( + HeaderMap::new(), + Query(super::DesktopAuthQuery { + token: Some("eyJ.attacker.session.jwt".to_string()), + key: Some(" auth ".to_string()), + }), + ) + .await + .into_response(); + + assert_eq!(resp.status(), StatusCode::BAD_REQUEST); + + let body = to_bytes(resp.into_body(), usize::MAX) + .await + .expect("response body"); + let body = String::from_utf8(body.to_vec()).expect("html body should be utf8"); + assert!(body.contains("no longer supported")); + assert!(!body.contains("Sign-in completed")); +} + +#[tokio::test] +async fn desktop_auth_rejects_embedded_fetch_metadata() { + use axum::body::to_bytes; + use axum::extract::Query; + use axum::http::{HeaderMap, HeaderValue, StatusCode}; + use axum::response::IntoResponse; + + let mut headers = HeaderMap::new(); + headers.insert("sec-fetch-mode", HeaderValue::from_static("no-cors")); + headers.insert("sec-fetch-dest", HeaderValue::from_static("image")); + + let resp = super::desktop_auth_handler( + headers, + Query(super::DesktopAuthQuery { + token: Some("one-time-login-token".to_string()), + key: None, + }), + ) + .await + .into_response(); + + assert_eq!(resp.status(), StatusCode::BAD_REQUEST); + + let body = to_bytes(resp.into_body(), usize::MAX) + .await + .expect("response body"); + let body = String::from_utf8(body.to_vec()).expect("html body should be utf8"); + assert!(body.contains("must be opened as a browser page")); +} diff --git a/src/openhuman/credentials/ops.rs b/src/openhuman/credentials/ops.rs index 11db7e50b2..8077661b9c 100644 --- a/src/openhuman/credentials/ops.rs +++ b/src/openhuman/credentials/ops.rs @@ -279,6 +279,21 @@ pub async fn store_session( logs.push("session stored".to_string()); + match crate::openhuman::memory::global::init(effective_config.workspace_dir.clone()) { + Ok(_) => logs.push(format!( + "memory client bound to workspace {}", + effective_config.workspace_dir.display() + )), + Err(e) => { + tracing::warn!(error = %e, "[credentials] failed to bind memory client after login"); + logs.push(format!("memory client bind warning: {e}")); + } + } + crate::openhuman::memory_conversations::register_conversation_persistence_subscriber( + effective_config.workspace_dir.clone(), + ); + logs.push("conversation persistence bound to active workspace".to_string()); + // Now that active_user.toml exists and config.workspace_dir resolves to // the per-user path, seed the subconscious defaults and spawn the // heartbeat loop. Idempotent — no-op on subsequent logins of the same diff --git a/src/openhuman/inference/provider/ops.rs b/src/openhuman/inference/provider/ops.rs index 5ff95a1c11..945e0a8eec 100644 --- a/src/openhuman/inference/provider/ops.rs +++ b/src/openhuman/inference/provider/ops.rs @@ -27,6 +27,17 @@ pub struct ModelInfo { pub async fn list_configured_models( provider_id: &str, +) -> Result, String> { + let config = crate::openhuman::config::Config::load_or_init() + .await + .map_err(|e| e.to_string())?; + + list_configured_models_from_config(provider_id, &config).await +} + +async fn list_configured_models_from_config( + provider_id: &str, + config: &crate::openhuman::config::Config, ) -> Result, String> { let provider_id = provider_id.trim().to_string(); if provider_id.is_empty() { @@ -35,10 +46,6 @@ pub async fn list_configured_models( log::debug!("[providers][list_models] provider_id={}", provider_id); - let config = crate::openhuman::config::Config::load_or_init() - .await - .map_err(|e| e.to_string())?; - let entry = config .cloud_providers .iter() @@ -56,7 +63,7 @@ pub async fn list_configured_models( ); let api_key = - crate::openhuman::inference::provider::factory::lookup_key_for_slug(&entry.slug, &config) + crate::openhuman::inference::provider::factory::lookup_key_for_slug(&entry.slug, config) .unwrap_or_default(); let api_key = api_key.trim().to_string(); @@ -952,33 +959,6 @@ mod tests { model_authorization: Arc>>>, } - struct WorkspaceEnvGuard { - prev: Option, - _lock: std::sync::MutexGuard<'static, ()>, - } - - impl Drop for WorkspaceEnvGuard { - fn drop(&mut self) { - unsafe { - match self.prev.take() { - Some(value) => std::env::set_var("OPENHUMAN_WORKSPACE", value), - None => std::env::remove_var("OPENHUMAN_WORKSPACE"), - } - } - } - } - - fn set_workspace_env(path: &std::path::Path) -> WorkspaceEnvGuard { - let lock = crate::openhuman::config::TEST_ENV_LOCK - .lock() - .unwrap_or_else(|e| e.into_inner()); - let prev = std::env::var_os("OPENHUMAN_WORKSPACE"); - unsafe { - std::env::set_var("OPENHUMAN_WORKSPACE", path); - } - WorkspaceEnvGuard { prev, _lock: lock } - } - async fn openrouter_key_handler( State(state): State, headers: HeaderMap, @@ -1162,11 +1142,10 @@ mod tests { #[tokio::test] async fn openrouter_invalid_key_fails_before_models_catalog_probe() { let tmp = tempfile::tempdir().expect("tempdir"); - let _env = set_workspace_env(tmp.path()); let (endpoint, state) = spawn_openrouter_probe_server(StatusCode::UNAUTHORIZED).await; - configure_openrouter_workspace(&tmp, endpoint, "bad-openrouter-key").await; + let config = configure_openrouter_workspace(&tmp, endpoint, "bad-openrouter-key").await; - let err = list_configured_models("openrouter") + let err = list_configured_models_from_config("openrouter", &config) .await .expect_err("invalid OpenRouter key must fail"); @@ -1185,11 +1164,10 @@ mod tests { #[tokio::test] async fn openrouter_valid_key_allows_models_catalog_probe() { let tmp = tempfile::tempdir().expect("tempdir"); - let _env = set_workspace_env(tmp.path()); let (endpoint, state) = spawn_openrouter_probe_server(StatusCode::OK).await; - configure_openrouter_workspace(&tmp, endpoint, "valid-openrouter-key").await; + let config = configure_openrouter_workspace(&tmp, endpoint, "valid-openrouter-key").await; - let outcome = list_configured_models("openrouter") + let outcome = list_configured_models_from_config("openrouter", &config) .await .expect("valid OpenRouter key should list models"); @@ -1201,11 +1179,11 @@ mod tests { #[tokio::test] async fn openrouter_key_is_trimmed_for_validation_and_catalog_probe() { let tmp = tempfile::tempdir().expect("tempdir"); - let _env = set_workspace_env(tmp.path()); let (endpoint, state) = spawn_openrouter_probe_server(StatusCode::OK).await; - configure_openrouter_workspace(&tmp, endpoint, " valid-openrouter-key\r\n").await; + let config = + configure_openrouter_workspace(&tmp, endpoint, " valid-openrouter-key\r\n").await; - list_configured_models("openrouter") + list_configured_models_from_config("openrouter", &config) .await .expect("trimmed OpenRouter key should list models"); diff --git a/src/openhuman/mcp_client/stdio.rs b/src/openhuman/mcp_client/stdio.rs index 67b192dc27..67c82eac1c 100644 --- a/src/openhuman/mcp_client/stdio.rs +++ b/src/openhuman/mcp_client/stdio.rs @@ -238,42 +238,3 @@ impl McpStdioClient { Ok(()) } } - -#[cfg(test)] -mod tests { - use super::*; - use std::path::PathBuf; - - fn openhuman_core_bin() -> PathBuf { - let status = std::process::Command::new("cargo") - .args(["build", "--quiet", "--bin", "openhuman-core"]) - .status() - .expect("spawn cargo build"); - assert!(status.success(), "cargo build --bin openhuman-core failed"); - - let exe = std::env::current_exe().expect("current_exe"); - let debug_dir = exe - .parent() - .and_then(|p| p.parent()) - .expect("target/debug dir"); - let bin = debug_dir.join("openhuman-core"); - assert!(bin.exists(), "expected openhuman-core at {}", bin.display()); - bin - } - - #[tokio::test] - async fn stdio_client_talks_to_openhuman_mcp_server() { - let client = McpStdioClient::new( - openhuman_core_bin().display().to_string(), - vec!["mcp".into()], - Vec::new(), - Some(std::env::current_dir().unwrap()), - McpClientIdentityConfig::default(), - ); - let init = client.initialize().await.expect("initialize"); - assert_eq!(init.protocol_version, LATEST_PROTOCOL_VERSION); - let tools = client.list_tools().await.expect("list_tools"); - assert!(tools.iter().any(|tool| tool.name == "memory.search")); - client.close_session().await.expect("close"); - } -} diff --git a/src/openhuman/memory/global.rs b/src/openhuman/memory/global.rs index e49d514995..508f70864f 100644 --- a/src/openhuman/memory/global.rs +++ b/src/openhuman/memory/global.rs @@ -17,34 +17,94 @@ //! ``` use std::path::PathBuf; -use std::sync::{Arc, OnceLock}; +use std::sync::{Arc, OnceLock, RwLock}; use crate::openhuman::memory_store::{MemoryClient, MemoryClientRef}; -/// The process-global memory client. -static GLOBAL_CLIENT: OnceLock = OnceLock::new(); +#[derive(Clone)] +struct GlobalMemoryClient { + workspace_dir: PathBuf, + client: MemoryClientRef, +} + +type GlobalClientSlot = RwLock>; -/// Initialise the global memory client from a workspace directory. +/// The process-global memory client slot. +static GLOBAL_CLIENT: OnceLock = OnceLock::new(); + +fn global_slot() -> &'static GlobalClientSlot { + GLOBAL_CLIENT.get_or_init(GlobalClientSlot::default) +} + +/// Initialise or re-bind the global memory client from a workspace directory. /// -/// Safe to call multiple times — only the first call takes effect. -/// Returns the (possibly pre-existing) client reference. +/// Safe to call multiple times. Calls for the same workspace return the +/// existing client; calls for a different workspace replace the global handle +/// so a post-login active-user switch does not keep writing to the pre-login +/// workspace. pub fn init(workspace_dir: PathBuf) -> Result { - if let Some(existing) = GLOBAL_CLIENT.get() { - log::debug!("[memory:global] already initialised, returning existing client"); - return Ok(Arc::clone(existing)); + init_in_slot(global_slot(), workspace_dir) +} + +fn init_in_slot( + slot: &GlobalClientSlot, + workspace_dir: PathBuf, +) -> Result { + if let Some(existing) = slot + .read() + .map_err(|e| format!("[memory:global] read lock poisoned: {e}"))? + .as_ref() + { + if existing.workspace_dir == workspace_dir { + log::debug!("[memory:global] already initialised for current workspace"); + return Ok(Arc::clone(&existing.client)); + } } log::info!( "[memory:global] initialising global MemoryClient workspace={}", workspace_dir.display() ); - let client = Arc::new(MemoryClient::from_workspace_dir(workspace_dir)?); + let client = match MemoryClient::from_workspace_dir(workspace_dir.clone()) { + Ok(client) => Arc::new(client), + Err(error) => { + let mut guard = slot + .write() + .map_err(|e| format!("[memory:global] write lock poisoned: {e}"))?; + if guard + .as_ref() + .is_some_and(|existing| existing.workspace_dir != workspace_dir) + { + log::warn!( + "[memory:global] clearing stale MemoryClient after failed rebind to {}", + workspace_dir.display() + ); + *guard = None; + } + return Err(error); + } + }; + + let mut guard = slot + .write() + .map_err(|e| format!("[memory:global] write lock poisoned: {e}"))?; + if let Some(existing) = guard.as_ref() { + if existing.workspace_dir == workspace_dir { + return Ok(Arc::clone(&existing.client)); + } - // OnceLock::set can fail if another thread raced us — that's fine, - // just return whichever won. - let _ = GLOBAL_CLIENT.set(Arc::clone(&client)); + log::info!( + "[memory:global] rebinding MemoryClient workspace {} -> {}", + existing.workspace_dir.display(), + workspace_dir.display() + ); + } - Ok(GLOBAL_CLIENT.get().cloned().unwrap_or(client)) + *guard = Some(GlobalMemoryClient { + workspace_dir, + client: Arc::clone(&client), + }); + Ok(client) } /// Initialise using the default `~/.openhuman/workspace` directory. @@ -66,28 +126,35 @@ pub fn init_default() -> Result { /// /// Returns `Err` if [`init`] has not yet been called. There is **no** lazy /// fallback: a fallback would pin the global to `~/.openhuman/workspace` on -/// the first stray call (test, early RPC, etc.), and `OnceLock::set` is -/// one-shot, so the real `init(custom_workspace)` would silently no-op -/// afterwards and every caller would get the wrong workspace. +/// the first stray call (test, early RPC, etc.). The explicit init/rebind path +/// keeps workspace ownership visible at startup and after login. /// /// Callers that can tolerate "not yet ready" should use /// [`client_if_ready`] instead. pub fn client() -> Result { - client_from(&GLOBAL_CLIENT) + client_from(global_slot()) } /// Implementation backing [`client`] — extracted so unit tests can pass a -/// freshly-constructed local `OnceLock` and assert the uninitialised-error +/// freshly-constructed local slot and assert the uninitialised-error /// contract without racing the process-global singleton. -fn client_from(slot: &OnceLock) -> Result { - slot.get().cloned().ok_or_else(|| { - "memory global accessed before init — call init(workspace) at startup".to_string() - }) +fn client_from(slot: &GlobalClientSlot) -> Result { + slot.read() + .map_err(|e| format!("[memory:global] read lock poisoned: {e}"))? + .as_ref() + .map(|entry| Arc::clone(&entry.client)) + .ok_or_else(|| { + "memory global accessed before init — call init(workspace) at startup".to_string() + }) } /// Returns the global client if already initialised, without lazy init. pub fn client_if_ready() -> Option { - GLOBAL_CLIENT.get().cloned() + global_slot() + .read() + .ok()? + .as_ref() + .map(|entry| Arc::clone(&entry.client)) } #[cfg(test)] @@ -95,10 +162,9 @@ mod tests { use super::*; use tempfile::TempDir; - /// All tests must contend with the fact that `GLOBAL_CLIENT` is a - /// process-wide `OnceLock` — once set, it stays set for the rest of - /// the test binary. We tolerate both branches so test ordering doesn't - /// flake the suite. + /// All tests that touch `GLOBAL_CLIENT` must contend with process-wide + /// state. We tolerate both branches so test ordering doesn't flake the + /// suite. #[tokio::test] async fn client_if_ready_is_some_after_init_or_remains_none() { let before = client_if_ready(); @@ -115,13 +181,45 @@ mod tests { #[tokio::test] async fn init_returns_existing_client_when_already_set() { + let slot = GlobalClientSlot::default(); + let tmp = TempDir::new().unwrap(); + let workspace = tmp.path().join("ws"); + + let first = init_in_slot(&slot, workspace.clone()).unwrap(); + let second = init_in_slot(&slot, workspace).unwrap(); + + assert!(Arc::ptr_eq(&first, &second)); + } + + #[tokio::test] + async fn init_rebinds_client_when_workspace_changes() { + let slot = GlobalClientSlot::default(); let tmp = TempDir::new().unwrap(); - let first = init(tmp.path().join("ws-a")); - let tmp2 = TempDir::new().unwrap(); - let second = init(tmp2.path().join("ws-b")); - assert!(first.is_ok() && second.is_ok()); - // Both refs point to the same global Arc — the second init is a no-op. - assert!(Arc::ptr_eq(&first.unwrap(), &second.unwrap())); + + let first = init_in_slot(&slot, tmp.path().join("ws-a")).unwrap(); + let second = init_in_slot(&slot, tmp.path().join("ws-b")).unwrap(); + let current = client_from(&slot).unwrap(); + + assert!(!Arc::ptr_eq(&first, &second)); + assert!(Arc::ptr_eq(&second, ¤t)); + } + + #[tokio::test] + async fn init_clears_existing_client_when_rebind_workspace_cannot_initialise() { + let slot = GlobalClientSlot::default(); + let tmp = TempDir::new().unwrap(); + + let _first = init_in_slot(&slot, tmp.path().join("ws-a")).unwrap(); + let file_path = tmp.path().join("not-a-directory"); + std::fs::write(&file_path, b"not a workspace").unwrap(); + + let err = match init_in_slot(&slot, file_path) { + Ok(_) => panic!("rebind to a file path must fail"), + Err(err) => err, + }; + + assert!(err.contains("Create workspace dir")); + assert!(client_from(&slot).is_err()); } #[tokio::test] @@ -142,7 +240,7 @@ mod tests { // other tests may have already called `init()` on the singleton, so // an `is_none`-gated check on `GLOBAL_CLIENT` would race / silently // skip. `client_from` lets us assert the contract deterministically. - let local: OnceLock = OnceLock::new(); + let local = GlobalClientSlot::default(); match client_from(&local) { Ok(_) => panic!("client_from(empty) must error"), Err(err) => assert!( diff --git a/src/openhuman/memory/ops/documents.rs b/src/openhuman/memory/ops/documents.rs index 7ff43481dc..8bd99e2dee 100644 --- a/src/openhuman/memory/ops/documents.rs +++ b/src/openhuman/memory/ops/documents.rs @@ -513,7 +513,8 @@ mod tests { } fn unique_namespace(prefix: &str) -> String { - format!("{prefix}-{}", uuid::Uuid::new_v4()) + let short = &uuid::Uuid::new_v4().as_simple().to_string()[..12]; + format!("{prefix}{short}") } fn sample_put(namespace: String, key: String, title: &str, content: &str) -> PutDocParams { @@ -536,7 +537,10 @@ mod tests { async fn direct_document_handlers_roundtrip_through_namespace() { ensure_memory_client(); let namespace = unique_namespace("memory-docs-direct"); - let key = format!("note-{}", uuid::Uuid::new_v4()); + let key = format!( + "note{}", + &uuid::Uuid::new_v4().as_simple().to_string()[..12] + ); let put = doc_put(sample_put( namespace.clone(), @@ -611,7 +615,7 @@ mod tests { async fn envelope_memory_handlers_report_counts_and_statuses() { ensure_memory_client(); let namespace = unique_namespace("memory-docs-envelope"); - let key = format!("env-{}", uuid::Uuid::new_v4()); + let key = format!("env{}", &uuid::Uuid::new_v4().as_simple().to_string()[..12]); let _ = memory_init(MemoryInitRequest { jwt_token: None }) .await diff --git a/src/openhuman/memory/ops/kv_graph.rs b/src/openhuman/memory/ops/kv_graph.rs index 0551f6662b..cc5c9d89db 100644 --- a/src/openhuman/memory/ops/kv_graph.rs +++ b/src/openhuman/memory/ops/kv_graph.rs @@ -157,14 +157,18 @@ mod tests { } fn unique_namespace(prefix: &str) -> String { - format!("{prefix}-{}", uuid::Uuid::new_v4()) + let short = &uuid::Uuid::new_v4().as_simple().to_string()[..12]; + format!("{prefix}{short}") } #[tokio::test] async fn kv_handlers_roundtrip_scoped_values() { ensure_memory_client(); let namespace = unique_namespace("kv-graph-kv"); - let key = format!("state-{}", uuid::Uuid::new_v4()); + let key = format!( + "state{}", + &uuid::Uuid::new_v4().as_simple().to_string()[..12] + ); let set = kv_set(KvSetParams { namespace: Some(namespace.clone()), @@ -214,7 +218,10 @@ mod tests { async fn graph_handlers_roundtrip_relation_rows() { ensure_memory_client(); let namespace = unique_namespace("kv-graph-rel"); - let subject = format!("alice-{}", uuid::Uuid::new_v4()); + let subject = format!( + "alice{}", + &uuid::Uuid::new_v4().as_simple().to_string()[..12] + ); let upsert = graph_upsert(GraphUpsertParams { namespace: Some(namespace.clone()), diff --git a/src/openhuman/memory/ops/learn.rs b/src/openhuman/memory/ops/learn.rs index 79b470e959..b4bcc83077 100644 --- a/src/openhuman/memory/ops/learn.rs +++ b/src/openhuman/memory/ops/learn.rs @@ -184,7 +184,9 @@ mod tests { impl WorkspaceEnvGuard { fn set(path: &std::path::Path) -> Self { - let lock = crate::openhuman::config::TEST_ENV_LOCK.lock().unwrap(); + let lock = crate::openhuman::config::TEST_ENV_LOCK + .lock() + .unwrap_or_else(|e| e.into_inner()); let previous = std::env::var_os("OPENHUMAN_WORKSPACE"); std::env::set_var("OPENHUMAN_WORKSPACE", path); Self { @@ -206,12 +208,13 @@ mod tests { async fn seed_namespace(prefix: &str) -> String { ensure_memory_client(); - let namespace = format!("{prefix}-{}", uuid::Uuid::new_v4()); + let short_id = &uuid::Uuid::new_v4().as_simple().to_string()[..12]; + let namespace = format!("{prefix}ns{short_id}"); let client = crate::openhuman::memory::global::client().expect("memory client"); client .put_doc_light(NamespaceDocumentInput { namespace: namespace.clone(), - key: format!("key-{}", uuid::Uuid::new_v4()), + key: format!("testkey{short_id}"), title: "Test".into(), content: "Seed content".into(), source_type: "doc".into(), @@ -230,13 +233,14 @@ mod tests { async fn write_config_with_runtime_enabled( workspace_root: &std::path::Path, runtime_enabled: bool, - ) { - let _guard = WorkspaceEnvGuard::set(workspace_root); + ) -> WorkspaceEnvGuard { + let guard = WorkspaceEnvGuard::set(workspace_root); let mut config = crate::openhuman::config::Config::load_or_init() .await .expect("load config"); config.local_ai.runtime_enabled = runtime_enabled; config.save().await.expect("save config"); + guard } #[tokio::test] @@ -255,7 +259,10 @@ mod tests { #[tokio::test] async fn memory_learn_all_is_noop_when_requested_namespaces_do_not_exist() { ensure_memory_client(); - let missing = format!("missing-{}", uuid::Uuid::new_v4()); + let missing = format!( + "missing{}", + &uuid::Uuid::new_v4().as_simple().to_string()[..12] + ); let outcome = memory_learn_all(LearnAllParams { namespaces: Some(vec![missing]), }) @@ -269,10 +276,12 @@ mod tests { async fn memory_learn_all_filters_missing_namespaces_and_dedupes_requested_order() { let namespace_a = seed_namespace("memory-learn-a").await; let namespace_b = seed_namespace("memory-learn-b").await; - let missing = format!("missing-{}", uuid::Uuid::new_v4()); + let missing = format!( + "missing{}", + &uuid::Uuid::new_v4().as_simple().to_string()[..12] + ); let tmp = TempDir::new().expect("tempdir"); - write_config_with_runtime_enabled(tmp.path(), true).await; - let _workspace = WorkspaceEnvGuard::set(tmp.path()); + let _workspace = write_config_with_runtime_enabled(tmp.path(), true).await; let outcome = memory_learn_all(LearnAllParams { namespaces: Some(vec![ @@ -297,8 +306,7 @@ mod tests { async fn memory_learn_all_requires_local_ai_once_existing_namespace_is_selected() { let namespace = seed_namespace("memory-learn-runtime").await; let tmp = TempDir::new().expect("tempdir"); - write_config_with_runtime_enabled(tmp.path(), false).await; - let _workspace = WorkspaceEnvGuard::set(tmp.path()); + let _workspace = write_config_with_runtime_enabled(tmp.path(), false).await; let err = memory_learn_all(LearnAllParams { namespaces: Some(vec![namespace]), @@ -314,8 +322,7 @@ mod tests { let namespace_a = seed_namespace("memory-learn-all-a").await; let namespace_b = seed_namespace("memory-learn-all-b").await; let tmp = TempDir::new().expect("tempdir"); - write_config_with_runtime_enabled(tmp.path(), true).await; - let _workspace = WorkspaceEnvGuard::set(tmp.path()); + let _workspace = write_config_with_runtime_enabled(tmp.path(), true).await; let outcome = memory_learn_all(LearnAllParams { namespaces: None }) .await diff --git a/src/openhuman/memory/ops/tool_memory.rs b/src/openhuman/memory/ops/tool_memory.rs index e3685efc2c..ea60613fbf 100644 --- a/src/openhuman/memory/ops/tool_memory.rs +++ b/src/openhuman/memory/ops/tool_memory.rs @@ -196,7 +196,8 @@ mod tests { } fn unique_tool_name() -> String { - format!("tool-memory-{}", uuid::Uuid::new_v4()) + let short = &uuid::Uuid::new_v4().as_simple().to_string()[..12]; + format!("toolmem{short}") } #[tokio::test] diff --git a/src/openhuman/memory_conversations/bus.rs b/src/openhuman/memory_conversations/bus.rs index 3877a38689..33b8e13f9b 100644 --- a/src/openhuman/memory_conversations/bus.rs +++ b/src/openhuman/memory_conversations/bus.rs @@ -3,7 +3,7 @@ //! etc.) persist alongside UI-driven threads. use std::path::{Path, PathBuf}; -use std::sync::{Arc, OnceLock}; +use std::sync::{Arc, OnceLock, RwLock}; use async_trait::async_trait; use chrono::Utc; @@ -18,6 +18,7 @@ use super::{ }; static CONVERSATION_PERSISTENCE_HANDLE: OnceLock = OnceLock::new(); +static CONVERSATION_PERSISTENCE_WORKSPACE: OnceLock>> = OnceLock::new(); const LOG_PREFIX: &str = "[memory:conversations:bus]"; @@ -26,12 +27,23 @@ const LOG_PREFIX: &str = "[memory:conversations:bus]"; /// This bridges typed channel events onto the workspace-backed JSONL /// conversation store so non-web channels persist alongside UI threads. pub fn register_conversation_persistence_subscriber(workspace_dir: PathBuf) { + let workspace = CONVERSATION_PERSISTENCE_WORKSPACE + .get_or_init(|| Arc::new(RwLock::new(workspace_dir.clone()))); + match workspace.write() { + Ok(mut guard) => { + *guard = workspace_dir; + } + Err(error) => { + log::warn!("{LOG_PREFIX} failed to update workspace binding: {error}"); + } + } + if CONVERSATION_PERSISTENCE_HANDLE.get().is_some() { return; } match crate::core::event_bus::subscribe_global(Arc::new( - ConversationPersistenceSubscriber::new(workspace_dir), + ConversationPersistenceSubscriber::new_shared(Arc::clone(workspace)), )) { Some(handle) => { let _ = CONVERSATION_PERSISTENCE_HANDLE.set(handle); @@ -45,13 +57,26 @@ pub fn register_conversation_persistence_subscriber(workspace_dir: PathBuf) { } pub struct ConversationPersistenceSubscriber { - workspace_dir: PathBuf, + workspace_dir: Arc>, } impl ConversationPersistenceSubscriber { pub fn new(workspace_dir: PathBuf) -> Self { + Self { + workspace_dir: Arc::new(RwLock::new(workspace_dir)), + } + } + + fn new_shared(workspace_dir: Arc>) -> Self { Self { workspace_dir } } + + fn workspace_dir_snapshot(&self) -> Result { + self.workspace_dir + .read() + .map(|guard| guard.clone()) + .map_err(|error| format!("workspace binding poisoned: {error}")) + } } #[async_trait] @@ -74,8 +99,15 @@ impl EventHandler for ConversationPersistenceSubscriber { content, thread_ts, } => { + let workspace_dir = match self.workspace_dir_snapshot() { + Ok(workspace_dir) => workspace_dir, + Err(error) => { + log::warn!("{LOG_PREFIX} failed to resolve workspace: {error}"); + return; + } + }; if let Err(error) = persist_channel_turn( - &self.workspace_dir, + &workspace_dir, ChannelTurnDescriptor { channel, message_id, @@ -108,8 +140,15 @@ impl EventHandler for ConversationPersistenceSubscriber { success, .. } => { + let workspace_dir = match self.workspace_dir_snapshot() { + Ok(workspace_dir) => workspace_dir, + Err(error) => { + log::warn!("{LOG_PREFIX} failed to resolve workspace: {error}"); + return; + } + }; if let Err(error) = persist_channel_turn( - &self.workspace_dir, + &workspace_dir, ChannelTurnDescriptor { channel, message_id, @@ -270,6 +309,19 @@ mod tests { use super::*; + #[test] + fn subscriber_reads_rebound_workspace_from_shared_handle() { + let tmp = tempfile::TempDir::new().unwrap(); + let first = tmp.path().join("first"); + let second = tmp.path().join("second"); + let shared = Arc::new(RwLock::new(first.clone())); + let subscriber = ConversationPersistenceSubscriber::new_shared(Arc::clone(&shared)); + + assert_eq!(subscriber.workspace_dir_snapshot().unwrap(), first); + *shared.write().unwrap() = second.clone(); + assert_eq!(subscriber.workspace_dir_snapshot().unwrap(), second); + } + #[tokio::test] async fn persists_inbound_and_processed_turns_into_workspace_thread() { let temp = TempDir::new().expect("tempdir"); diff --git a/src/openhuman/memory_sync/composio/providers/descriptions.rs b/src/openhuman/memory_sync/composio/providers/descriptions.rs index 0a0cf52a80..09468447e2 100644 --- a/src/openhuman/memory_sync/composio/providers/descriptions.rs +++ b/src/openhuman/memory_sync/composio/providers/descriptions.rs @@ -23,6 +23,9 @@ pub fn toolkit_description(slug: &str) -> &'static str { "google_sheets" => "Read, write, and manage Google Sheets spreadsheets", "outlook" => "Send, read, and manage emails in Microsoft Outlook", "microsoft_teams" => "Send messages and manage channels in Microsoft Teams", + "larksuite" => { + "Connect Lark / Feishu workspace chat, docs, wiki, and meetings via Composio" + } "linear" => { "Create, read, and manage issues, projects, and cycles in Linear; sync \ assigned issues into Memory Tree" diff --git a/src/openhuman/screen_intelligence/helpers.rs b/src/openhuman/screen_intelligence/helpers.rs index 7f0e277fdb..33b45ba61b 100644 --- a/src/openhuman/screen_intelligence/helpers.rs +++ b/src/openhuman/screen_intelligence/helpers.rs @@ -195,7 +195,7 @@ pub(crate) async fn persist_vision_summary( content.push_str(&format!("{}\n", summary.key_text)); } - let key = format!("screen_intelligence_{}", summary.id); + let key = vision_summary_memory_key(&summary); let document = NamespaceDocumentInput { namespace: VISION_MEMORY_NAMESPACE.to_string(), key: key.clone(), @@ -232,6 +232,23 @@ pub(crate) async fn persist_vision_summary( }) } +pub(crate) fn vision_summary_memory_key(summary: &VisionSummary) -> String { + format!( + "screen_intelligence_{}_{}", + summary.captured_at_ms, + stable_decimal_hash(&summary.id) + ) +} + +fn stable_decimal_hash(value: &str) -> u64 { + let mut hash = 0xcbf29ce484222325u64; + for byte in value.as_bytes() { + hash ^= u64::from(*byte); + hash = hash.wrapping_mul(0x100000001b3); + } + hash +} + pub(crate) fn truncate_tail(text: &str, max_chars: usize) -> String { let chars: Vec = text.chars().collect(); if chars.len() <= max_chars { diff --git a/src/openhuman/screen_intelligence/tests.rs b/src/openhuman/screen_intelligence/tests.rs index a19c6317e7..8f018c07ce 100644 --- a/src/openhuman/screen_intelligence/tests.rs +++ b/src/openhuman/screen_intelligence/tests.rs @@ -9,10 +9,11 @@ use image::{ImageBuffer, Rgb, RgbImage}; use tempfile::tempdir; use super::helpers::{ - generate_suggestions, parse_vision_summary_output, truncate_tail, validate_input_action, + generate_suggestions, parse_vision_summary_output, persist_vision_summary, truncate_tail, + validate_input_action, vision_summary_memory_key, }; use super::state::{AccessibilityEngine, EngineState}; -use super::types::{CaptureFrame, InputActionParams, StartSessionParams}; +use super::types::{CaptureFrame, InputActionParams, StartSessionParams, VisionSummary}; use crate::openhuman::accessibility::{parse_foreground_output, AppContext}; use crate::openhuman::config::{Config, ScreenIntelligenceConfig}; use crate::openhuman::embeddings::NoopEmbedding; @@ -775,7 +776,7 @@ async fn analyze_and_persist_frame_writes_unified_memory_document() { let documents = list["documents"] .as_array() .expect("documents array should exist"); - let key = format!("screen_intelligence_{}", summary.id); + let key = vision_summary_memory_key(&summary); assert!( documents .iter() @@ -784,6 +785,43 @@ async fn analyze_and_persist_frame_writes_unified_memory_document() { ); } +#[tokio::test] +async fn persist_vision_summary_uses_pii_safe_document_key() { + let _env_lock = screen_intelligence_env_lock(); + let tmp = tempdir().expect("tempdir"); + let _workspace = EnvVarGuard::set_to_path("OPENHUMAN_WORKSPACE", tmp.path()); + write_screen_intelligence_test_config(tmp.path(), true, "ollama"); + + let summary = VisionSummary { + id: "vision-1700000000300-abcde1234f".to_string(), + captured_at_ms: 1700000000300, + app_name: Some("PipelineApp".to_string()), + window_title: Some("Main.rs".to_string()), + ui_state: "editor".to_string(), + key_text: "fn main() {}".to_string(), + actionable_notes: "Rust source is open".to_string(), + confidence: 0.93, + }; + let raw_key = format!("screen_intelligence_{}", summary.id); + assert!( + crate::openhuman::memory_store::safety::pii::has_likely_pii(&raw_key), + "test fixture must resemble formatted PII before safe-key rewriting" + ); + + let persisted = persist_vision_summary(summary.clone()) + .await + .expect("internal screen-intelligence keys must not trip PII guards"); + assert_eq!(persisted.namespace, "background"); + assert!( + !crate::openhuman::memory_store::safety::pii::has_likely_pii(&persisted.key), + "rewritten memory key must stay below the PII boundary guard" + ); + assert_ne!( + persisted.key, raw_key, + "memory key should not embed the raw vision id when it can resemble formatted PII" + ); +} + #[tokio::test] async fn analyze_and_persist_frame_rejects_non_local_provider() { let _env_lock = screen_intelligence_env_lock(); diff --git a/src/openhuman/vault/ops.rs b/src/openhuman/vault/ops.rs index 774356f1d4..909004053a 100644 --- a/src/openhuman/vault/ops.rs +++ b/src/openhuman/vault/ops.rs @@ -52,6 +52,7 @@ pub async fn vault_create( .canonicalize() .map(|p| p.to_string_lossy().to_string()) .unwrap_or_else(|_| trimmed_root.to_string()), + host_os: Some(store::current_host_os().to_string()), namespace, include_globs, exclude_globs, @@ -90,6 +91,9 @@ pub async fn vault_files(config: &Config, id: &str) -> Result>> = OnceLock::new(); + pub(crate) fn with_connection( config: &Config, f: impl FnOnce(&Connection) -> Result, @@ -25,6 +31,7 @@ pub(crate) fn with_connection( id TEXT PRIMARY KEY, name TEXT NOT NULL, root_path TEXT NOT NULL, + host_os TEXT, namespace TEXT NOT NULL UNIQUE, include_globs TEXT NOT NULL DEFAULT '[]', exclude_globs TEXT NOT NULL DEFAULT '[]', @@ -47,18 +54,44 @@ pub(crate) fn with_connection( ) .context("Failed to initialize vault schema")?; + let migrated = MIGRATED_VAULT_DBS.get_or_init(|| Mutex::new(HashSet::new())); + let mut migrated_paths = migrated + .lock() + .map_err(|_| anyhow!("Failed to lock vault migration cache"))?; + if !migrated_paths.contains(&db_path) { + ensure_host_os_column(&conn).context("Failed to migrate vault schema")?; + migrated_paths.insert(db_path.clone()); + } + f(&conn) } pub fn insert_vault(config: &Config, vault: &Vault) -> Result<()> { + insert_vault_inner(config, vault, true) +} + +#[cfg(test)] +pub(crate) fn insert_vault_preserving_host_for_tests(config: &Config, vault: &Vault) -> Result<()> { + insert_vault_inner(config, vault, false) +} + +fn insert_vault_inner(config: &Config, vault: &Vault, stamp_current_host: bool) -> Result<()> { with_connection(config, |conn| { + let host_os = normalized_host_os(vault.host_os.as_deref()).or_else(|| { + if stamp_current_host { + Some(current_host_os()) + } else { + None + } + }); conn.execute( - "INSERT INTO vaults (id, name, root_path, namespace, include_globs, exclude_globs, created_at, last_synced_at) - VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)", + "INSERT INTO vaults (id, name, root_path, host_os, namespace, include_globs, exclude_globs, created_at, last_synced_at) + VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)", params![ vault.id, vault.name, vault.root_path, + host_os, vault.namespace, serde_json::to_string(&vault.include_globs)?, serde_json::to_string(&vault.exclude_globs)?, @@ -74,7 +107,7 @@ pub fn insert_vault(config: &Config, vault: &Vault) -> Result<()> { pub fn list_vaults(config: &Config) -> Result> { with_connection(config, |conn| { let mut stmt = conn.prepare( - "SELECT v.id, v.name, v.root_path, v.namespace, v.include_globs, v.exclude_globs, + "SELECT v.id, v.name, v.root_path, v.host_os, v.namespace, v.include_globs, v.exclude_globs, v.created_at, v.last_synced_at, (SELECT COUNT(*) FROM vault_files vf WHERE vf.vault_id = v.id AND vf.status = 'ok') FROM vaults v @@ -83,7 +116,16 @@ pub fn list_vaults(config: &Config) -> Result> { let rows = stmt.query_map([], row_to_vault)?; let mut out = Vec::new(); for row in rows { - out.push(row?); + let vault = row?; + if vault_belongs_to_current_host(&vault) { + out.push(vault); + } else { + log::debug!( + "[vault] hiding incompatible vault id={} host_os={:?}", + vault.id, + vault.host_os + ); + } } Ok(out) }) @@ -91,16 +133,18 @@ pub fn list_vaults(config: &Config) -> Result> { pub fn get_vault(config: &Config, id: &str) -> Result> { with_connection(config, |conn| { - conn.query_row( - "SELECT v.id, v.name, v.root_path, v.namespace, v.include_globs, v.exclude_globs, + let vault = conn + .query_row( + "SELECT v.id, v.name, v.root_path, v.host_os, v.namespace, v.include_globs, v.exclude_globs, v.created_at, v.last_synced_at, (SELECT COUNT(*) FROM vault_files vf WHERE vf.vault_id = v.id AND vf.status = 'ok') FROM vaults v WHERE v.id = ?1", - params![id], - row_to_vault, - ) - .optional() - .context("Failed to read vault") + params![id], + row_to_vault, + ) + .optional() + .context("Failed to read vault")?; + Ok(vault.filter(vault_belongs_to_current_host)) }) } @@ -176,16 +220,17 @@ pub fn delete_file(config: &Config, vault_id: &str, rel_path: &str) -> Result<() } fn row_to_vault(row: &rusqlite::Row<'_>) -> rusqlite::Result { - let include_raw: String = row.get(4)?; - let exclude_raw: String = row.get(5)?; - let created_raw: String = row.get(6)?; - let last_raw: Option = row.get(7)?; - let file_count: i64 = row.get(8)?; + let include_raw: String = row.get(5)?; + let exclude_raw: String = row.get(6)?; + let created_raw: String = row.get(7)?; + let last_raw: Option = row.get(8)?; + let file_count: i64 = row.get(9)?; Ok(Vault { id: row.get(0)?, name: row.get(1)?, root_path: row.get(2)?, - namespace: row.get(3)?, + host_os: row.get(3)?, + namespace: row.get(4)?, include_globs: serde_json::from_str(&include_raw).unwrap_or_default(), exclude_globs: serde_json::from_str(&exclude_raw).unwrap_or_default(), created_at: parse_dt(&created_raw), @@ -215,3 +260,80 @@ fn parse_dt(raw: &str) -> DateTime { .map(|t| t.with_timezone(&Utc)) .unwrap_or_else(|_| Utc::now()) } + +fn ensure_host_os_column(conn: &Connection) -> Result<()> { + let mut stmt = conn.prepare("PRAGMA table_info(vaults)")?; + let columns = stmt.query_map([], |row| row.get::<_, String>(1))?; + let mut has_host_os = false; + for column in columns { + if column?.eq_ignore_ascii_case("host_os") { + has_host_os = true; + break; + } + } + + if !has_host_os { + conn.execute("ALTER TABLE vaults ADD COLUMN host_os TEXT", [])?; + } + Ok(()) +} + +pub(crate) fn current_host_os() -> &'static str { + std::env::consts::OS +} + +pub(crate) fn path_looks_compatible_with_host_os(raw_path: &str, host_os: &str) -> bool { + let path = raw_path.trim(); + if path.is_empty() { + return false; + } + + if is_windows_host_os(host_os) { + return looks_like_windows_absolute_path(path); + } + + looks_like_unix_absolute_path(path) +} + +fn vault_belongs_to_current_host(vault: &Vault) -> bool { + let current = current_host_os(); + let Some(host_os) = normalized_host_os(vault.host_os.as_deref()) else { + return path_looks_compatible_with_host_os(&vault.root_path, current); + }; + + host_os.eq_ignore_ascii_case(current) + && path_looks_compatible_with_host_os(&vault.root_path, current) +} + +fn normalized_host_os(raw: Option<&str>) -> Option<&str> { + raw.map(str::trim).filter(|host_os| !host_os.is_empty()) +} + +fn is_windows_host_os(host_os: &str) -> bool { + host_os.eq_ignore_ascii_case("windows") || host_os.eq_ignore_ascii_case("win32") +} + +fn looks_like_windows_absolute_path(path: &str) -> bool { + looks_like_windows_drive_path(path) || looks_like_windows_unc_path(path) +} + +fn looks_like_windows_drive_path(path: &str) -> bool { + let bytes = path.as_bytes(); + bytes.len() >= 3 + && bytes[0].is_ascii_alphabetic() + && bytes[1] == b':' + && matches!(bytes[2], b'\\' | b'/') +} + +/// Only backslash-style UNC (`\\server\share`). Forward-slash `//…` is +/// POSIX-legal and must not be classified as Windows. +fn looks_like_windows_unc_path(path: &str) -> bool { + let bytes = path.as_bytes(); + bytes.len() >= 3 && bytes[0] == b'\\' && bytes[1] == b'\\' && !matches!(bytes[2], b'\\' | b'/') +} + +fn looks_like_unix_absolute_path(path: &str) -> bool { + path.starts_with('/') + && !looks_like_windows_drive_path(path) + && !looks_like_windows_unc_path(path) +} diff --git a/src/openhuman/vault/tests.rs b/src/openhuman/vault/tests.rs index ead1c2f1ee..cb2aea3b96 100644 --- a/src/openhuman/vault/tests.rs +++ b/src/openhuman/vault/tests.rs @@ -23,6 +23,7 @@ fn sample_vault(root: PathBuf) -> Vault { id: "vault-test-1".to_string(), name: "Test".to_string(), root_path: root.to_string_lossy().to_string(), + host_os: None, namespace: "vault:vault-test-1".to_string(), include_globs: vec![], exclude_globs: vec![], @@ -32,6 +33,100 @@ fn sample_vault(root: PathBuf) -> Vault { } } +fn incompatible_path_for_current_host() -> &'static str { + if cfg!(windows) { + "/home/leigh/OHvault" + } else { + r"C:\Users\leigh\OHvault" + } +} + +#[test] +fn path_compatibility_rejects_cross_platform_absolute_paths() { + assert!(store::path_looks_compatible_with_host_os( + r"C:\Users\leigh\OHvault", + "windows" + )); + assert!(store::path_looks_compatible_with_host_os( + r"\\server\share\OHvault", + "windows" + )); + // Forward-slash `//…` is POSIX-legal, not Windows UNC. + assert!(!store::path_looks_compatible_with_host_os( + "//server/share/OHvault", + "windows" + )); + assert!(!store::path_looks_compatible_with_host_os( + "/home/leigh/OHvault", + "windows" + )); + + assert!(store::path_looks_compatible_with_host_os( + "/home/leigh/OHvault", + "linux" + )); + assert!(store::path_looks_compatible_with_host_os( + "/Users/leigh/OHvault", + "macos" + )); + assert!(!store::path_looks_compatible_with_host_os( + r"C:\Users\leigh\OHvault", + "linux" + )); + assert!(!store::path_looks_compatible_with_host_os( + r"\\server\share\OHvault", + "macos" + )); + // Forward-slash `//…` is POSIX-legal — compatible with Unix hosts. + assert!(store::path_looks_compatible_with_host_os( + "//server/share/OHvault", + "macos" + )); + assert!(store::path_looks_compatible_with_host_os( + "//server/share/OHvault", + "linux" + )); +} + +#[test] +fn store_stamps_new_vaults_with_current_host_os() { + let tmp = TempDir::new().unwrap(); + let config = make_config(&tmp); + let vault = sample_vault(tmp.path().to_path_buf()); + + store::insert_vault(&config, &vault).unwrap(); + + let listed = store::list_vaults(&config).unwrap(); + assert_eq!(listed.len(), 1); + assert_eq!(listed[0].host_os.as_deref(), Some(std::env::consts::OS)); +} + +#[test] +fn store_filters_legacy_vaults_whose_path_belongs_to_another_host_family() { + let tmp = TempDir::new().unwrap(); + let config = make_config(&tmp); + let mut vault = sample_vault(PathBuf::from(incompatible_path_for_current_host())); + vault.host_os = None; + + store::insert_vault_preserving_host_for_tests(&config, &vault).unwrap(); + + assert!(store::list_vaults(&config).unwrap().is_empty()); + assert!(store::get_vault(&config, &vault.id).unwrap().is_none()); +} + +#[test] +fn store_filters_vaults_created_on_a_different_host_os() { + let tmp = TempDir::new().unwrap(); + let config = make_config(&tmp); + let mut vault = sample_vault(tmp.path().to_path_buf()); + vault.host_os = Some(if cfg!(windows) { "linux" } else { "windows" }.to_string()); + + store::insert_vault_preserving_host_for_tests(&config, &vault).unwrap(); + + assert!(store::list_vaults(&config).unwrap().is_empty()); + assert!(store::get_vault(&config, &vault.id).unwrap().is_none()); +} + #[test] fn supported_extension_accepts_md_and_code() { assert!(supported_extension("md")); @@ -231,6 +326,24 @@ fn state_update_progress_noop_on_missing() { // ops.rs — vault_sync_status RPC operation // --------------------------------------------------------------------------- +#[tokio::test] +async fn vault_create_returns_current_host_os() { + let tmp = TempDir::new().unwrap(); + let config = make_config(&tmp); + + let outcome = ops::vault_create( + &config, + "Test", + tmp.path().to_str().unwrap(), + vec![], + vec![], + ) + .await + .unwrap(); + + assert_eq!(outcome.value.host_os.as_deref(), Some(std::env::consts::OS)); +} + #[tokio::test] async fn vault_sync_status_returns_idle_for_unknown_vault() { let outcome = ops::vault_sync_status("__ops_status_unknown__") diff --git a/src/openhuman/vault/types.rs b/src/openhuman/vault/types.rs index c3d204653b..f73c25ce05 100644 --- a/src/openhuman/vault/types.rs +++ b/src/openhuman/vault/types.rs @@ -51,6 +51,8 @@ pub struct Vault { pub id: String, pub name: String, pub root_path: String, + #[serde(default)] + pub host_os: Option, pub namespace: String, pub include_globs: Vec, pub exclude_globs: Vec, diff --git a/tests/mcp_stdio_integration.rs b/tests/mcp_stdio_integration.rs new file mode 100644 index 0000000000..684a289757 --- /dev/null +++ b/tests/mcp_stdio_integration.rs @@ -0,0 +1,30 @@ +//! Integration coverage for the stdio MCP client against the real core binary. +//! +//! Keep this as an integration test so Cargo builds `openhuman-core` as part of +//! the test graph and exposes it through `CARGO_BIN_EXE_openhuman-core`. Running +//! a nested `cargo build` from a lib unit test is prone to CI disk exhaustion. + +use openhuman_core::openhuman::config::McpClientIdentityConfig; +use openhuman_core::openhuman::mcp_client::McpStdioClient; +use std::path::PathBuf; + +const LATEST_PROTOCOL_VERSION: &str = "2025-11-25"; + +#[tokio::test] +async fn stdio_client_talks_to_openhuman_mcp_server() { + let client = McpStdioClient::new( + env!("CARGO_BIN_EXE_openhuman-core").to_string(), + vec!["mcp".into()], + Vec::new(), + Some(PathBuf::from(env!("CARGO_MANIFEST_DIR"))), + McpClientIdentityConfig::default(), + ); + + let init = client.initialize().await.expect("initialize"); + assert_eq!(init.protocol_version, LATEST_PROTOCOL_VERSION); + + let tools = client.list_tools().await.expect("list_tools"); + assert!(tools.iter().any(|tool| tool.name == "memory.search")); + + client.close_session().await.expect("close"); +} diff --git a/tests/screen_intelligence_vision_e2e.rs b/tests/screen_intelligence_vision_e2e.rs index f0dc17de97..9728f15e11 100644 --- a/tests/screen_intelligence_vision_e2e.rs +++ b/tests/screen_intelligence_vision_e2e.rs @@ -88,6 +88,36 @@ fn env_lock() -> std::sync::MutexGuard<'static, ()> { } } +fn expected_vision_summary_memory_key_for_json(summary: &serde_json::Value) -> String { + expected_vision_summary_memory_key( + summary["id"].as_str().expect("summary id"), + summary["captured_at_ms"] + .as_i64() + .expect("summary captured_at_ms"), + ) +} + +fn expected_vision_summary_memory_key_for_summary(summary: &VisionSummary) -> String { + expected_vision_summary_memory_key(&summary.id, summary.captured_at_ms) +} + +fn expected_vision_summary_memory_key(id: &str, captured_at_ms: i64) -> String { + format!( + "screen_intelligence_{}_{}", + captured_at_ms, + stable_decimal_hash(id) + ) +} + +fn stable_decimal_hash(value: &str) -> u64 { + let mut hash = 0xcbf29ce484222325u64; + for byte in value.as_bytes() { + hash ^= u64::from(*byte); + hash = hash.wrapping_mul(0x100000001b3); + } + hash +} + // ── Helpers ────────────────────────────────────────────────────────── /// Create a synthetic PNG data-URI simulating a desktop screenshot. @@ -243,7 +273,7 @@ async fn vision_pipeline_compress_parse_persist() { // ── Step 4: Persist to memory ─────────────────────────────────── let mem = open_test_memory(tmp.path()); let content = serde_json::to_string(&summary).expect("serialize summary"); - let key = format!("screen_intelligence_{}", summary["id"].as_str().unwrap()); + let key = expected_vision_summary_memory_key_for_json(&summary); mem.upsert_document(NamespaceDocumentInput { namespace: "background".to_string(), key: key.clone(), @@ -317,7 +347,7 @@ async fn multiple_vision_summaries_persist_and_query() { }); let content = serde_json::to_string(&summary).expect("serialize"); - let key = format!("screen_intelligence_{}", summary["id"].as_str().unwrap()); + let key = expected_vision_summary_memory_key_for_json(&summary); mem.upsert_document(NamespaceDocumentInput { namespace: "background".to_string(), key, @@ -632,9 +662,9 @@ async fn vision_summary_struct_persist_and_deserialize_roundtrip() { ); // ── Step 2: persist to UnifiedMemory, verify queryable by key ───────── - // Matches exactly what persist_vision_summary() does (namespace, key format, tags). + // Mirrors the PII-safe key contract used by persist_vision_summary(). let mem = open_test_memory(tmp.path()); - let key = format!("screen_intelligence_{}", summary.id); + let key = expected_vision_summary_memory_key_for_summary(&summary); mem.upsert_document(NamespaceDocumentInput { namespace: "background".to_string(), key: key.clone(), @@ -696,7 +726,7 @@ async fn engine_pipeline_with_mocked_local_vision_persists_to_memory() { .as_array() .cloned() .expect("documents array"); - let key = format!("screen_intelligence_{}", summary.id); + let key = expected_vision_summary_memory_key_for_summary(&summary); assert!( docs.iter().any(|doc| doc["key"].as_str() == Some(&key)), "expected persisted summary key in memory: {key}" @@ -767,7 +797,7 @@ async fn macos_real_capture_cycle_persists_summary() { .as_array() .cloned() .expect("documents array"); - let key = format!("screen_intelligence_{}", summary.id); + let key = expected_vision_summary_memory_key_for_summary(&summary); assert!( docs.iter().any(|doc| doc["key"].as_str() == Some(&key)), "expected persisted summary key after real capture cycle: {key}"