diff --git a/apps/code/src/main/di/container.ts b/apps/code/src/main/di/container.ts index f45163c53..2a688ba2a 100644 --- a/apps/code/src/main/di/container.ts +++ b/apps/code/src/main/di/container.ts @@ -10,6 +10,7 @@ import { WorkspaceRepository } from "../db/repositories/workspace-repository"; import { WorktreeRepository } from "../db/repositories/worktree-repository"; import { DatabaseService } from "../db/service"; import { AgentAuthAdapter } from "../services/agent/auth-adapter"; +import { LocalCommandReceiver } from "../services/agent/local-command-receiver"; import { AgentService } from "../services/agent/service"; import { AppLifecycleService } from "../services/app-lifecycle/service"; import { ArchiveService } from "../services/archive/service"; @@ -64,6 +65,7 @@ container.bind(MAIN_TOKENS.ArchiveRepository).to(ArchiveRepository); container.bind(MAIN_TOKENS.SuspensionRepository).to(SuspensionRepositoryImpl); container.bind(MAIN_TOKENS.AgentAuthAdapter).to(AgentAuthAdapter); container.bind(MAIN_TOKENS.AgentService).to(AgentService); +container.bind(MAIN_TOKENS.LocalCommandReceiver).to(LocalCommandReceiver); container.bind(MAIN_TOKENS.AuthService).to(AuthService); container.bind(MAIN_TOKENS.AuthProxyService).to(AuthProxyService); container.bind(MAIN_TOKENS.ArchiveService).to(ArchiveService); diff --git a/apps/code/src/main/di/tokens.ts b/apps/code/src/main/di/tokens.ts index 27bdbcafc..5308d78f4 100644 --- a/apps/code/src/main/di/tokens.ts +++ b/apps/code/src/main/di/tokens.ts @@ -21,6 +21,7 @@ export const MAIN_TOKENS = Object.freeze({ // Services AgentAuthAdapter: Symbol.for("Main.AgentAuthAdapter"), AgentService: Symbol.for("Main.AgentService"), + LocalCommandReceiver: Symbol.for("Main.LocalCommandReceiver"), AuthService: Symbol.for("Main.AuthService"), AuthProxyService: Symbol.for("Main.AuthProxyService"), ArchiveService: Symbol.for("Main.ArchiveService"), diff --git a/apps/code/src/main/services/agent/local-command-receiver.ts b/apps/code/src/main/services/agent/local-command-receiver.ts new file mode 100644 index 000000000..8a3b1d186 --- /dev/null +++ b/apps/code/src/main/services/agent/local-command-receiver.ts @@ -0,0 +1,262 @@ +import { inject, injectable, preDestroy } from "inversify"; +import { MAIN_TOKENS } from "../../di/tokens"; +import { logger } from "../../utils/logger"; +import type { AuthService } from "../auth/service"; + +const log = logger.scope("local-command-receiver"); +const INITIAL_RECONNECT_DELAY_MS = 2000; +const MAX_RECONNECT_DELAY_MS = 30_000; +// After this many consecutive failures, assume Last-Event-ID is stale (event +// trimmed from the backend buffer) and fall back to a fresh connect with +// start=latest. Accepts that we may drop commands issued during the outage. +const STALE_EVENT_ID_THRESHOLD = 3; + +/** + * JSON-RPC envelope carried inside an `incoming_command` SSE event. The + * backend repackages whatever mobile POSTs to /command/ into this shape + * (see products/tasks/backend/api.py :: command). + */ +export interface IncomingCommandPayload { + jsonrpc: string; + method: string; + params?: { content?: string } & Record; + id?: string | number; +} + +interface SubscribeParams { + taskId: string; + taskRunId: string; + projectId: number; + apiHost: string; + onCommand: (payload: IncomingCommandPayload) => Promise; +} + +interface Subscription { + taskRunId: string; + controller: AbortController; +} + +/** + * Subscribes to the PostHog task-run SSE stream for a local run and + * delivers `incoming_command` events (published by the backend when mobile + * POSTs to /command/ on a run with environment=local) to a caller-supplied + * callback. + * + * One SSE connection per subscribed run. Reconnects with backoff on failure. + * Uses the `Last-Event-ID` header to resume from the last processed event + * so brief network blips don't drop commands. + */ +@injectable() +export class LocalCommandReceiver { + private readonly subs = new Map(); + + constructor( + @inject(MAIN_TOKENS.AuthService) + private readonly auth: AuthService, + ) {} + + subscribe(params: SubscribeParams): void { + if (this.subs.has(params.taskRunId)) { + log.debug("Already subscribed", { taskRunId: params.taskRunId }); + return; + } + const controller = new AbortController(); + this.subs.set(params.taskRunId, { + taskRunId: params.taskRunId, + controller, + }); + log.info("Subscribing to SSE stream", { taskRunId: params.taskRunId }); + void this.connectLoop(params, controller).catch((err) => { + if (controller.signal.aborted) return; + log.error("Connect loop exited unexpectedly", { + taskRunId: params.taskRunId, + error: err instanceof Error ? err.message : String(err), + }); + }); + } + + unsubscribe(taskRunId: string): void { + const sub = this.subs.get(taskRunId); + if (!sub) return; + sub.controller.abort(); + this.subs.delete(taskRunId); + log.info("Unsubscribed", { taskRunId }); + } + + @preDestroy() + async shutdown(): Promise { + // Abort before awaiting teardown — per async-cleanup-ordering guidance. + for (const sub of this.subs.values()) sub.controller.abort(); + this.subs.clear(); + } + + private async connectLoop( + params: SubscribeParams, + controller: AbortController, + ): Promise { + let lastEventId: string | undefined; + let consecutiveFailures = 0; + + while (!controller.signal.aborted) { + let streamOpened = false; + try { + const { accessToken } = await this.auth.getValidAccessToken(); + const url = new URL( + `${params.apiHost}/api/projects/${params.projectId}/tasks/${params.taskId}/runs/${params.taskRunId}/stream/`, + ); + if (!lastEventId) { + // Fresh connect: only care about events published from now on. + // On reconnect we use Last-Event-ID instead (see headers below). + url.searchParams.set("start", "latest"); + } + + const headers: Record = { + Authorization: `Bearer ${accessToken}`, + Accept: "text/event-stream", + }; + if (lastEventId) headers["Last-Event-ID"] = lastEventId; + + const response = await fetch(url.toString(), { + headers, + signal: controller.signal, + }); + if (!response.ok) { + const body = await response.text().catch(() => ""); + throw new Error( + `SSE HTTP ${response.status}${body ? `: ${body.slice(0, 200)}` : ""}`, + ); + } + + streamOpened = true; + consecutiveFailures = 0; + lastEventId = await this.readEventStream( + response.body, + params.onCommand, + controller.signal, + lastEventId, + ); + log.info("SSE stream ended cleanly", { + taskRunId: params.taskRunId, + }); + } catch (err) { + if (controller.signal.aborted) return; + if (!streamOpened) consecutiveFailures++; + if ( + consecutiveFailures >= STALE_EVENT_ID_THRESHOLD && + lastEventId !== undefined + ) { + log.warn( + "Dropping possibly-stale Last-Event-ID after repeated failures", + { + taskRunId: params.taskRunId, + consecutiveFailures, + }, + ); + lastEventId = undefined; + } + log.warn("SSE disconnected, will reconnect", { + taskRunId: params.taskRunId, + consecutiveFailures, + error: err instanceof Error ? err.message : String(err), + }); + } + if (controller.signal.aborted) return; + const delay = Math.min( + MAX_RECONNECT_DELAY_MS, + INITIAL_RECONNECT_DELAY_MS * 2 ** Math.max(0, consecutiveFailures - 1), + ); + await this.sleep(delay, controller.signal); + } + } + + private async readEventStream( + body: ReadableStream | null, + onCommand: SubscribeParams["onCommand"], + signal: AbortSignal, + seedLastEventId: string | undefined, + ): Promise { + if (!body) throw new Error("Missing SSE response body"); + const reader = body.getReader(); + const decoder = new TextDecoder(); + let buffer = ""; + let lastEventId = seedLastEventId; + + try { + while (!signal.aborted) { + const { done, value } = await reader.read(); + if (done) return lastEventId; + buffer += decoder.decode(value, { stream: true }); + + // SSE event blocks are separated by a blank line (\n\n). + while (true) { + const separator = buffer.indexOf("\n\n"); + if (separator === -1) break; + const rawEvent = buffer.slice(0, separator); + buffer = buffer.slice(separator + 2); + + let dataChunks = ""; + let eventId: string | undefined; + for (const line of rawEvent.split("\n")) { + if (line.startsWith("data: ")) { + dataChunks += line.slice(6); + } else if (line.startsWith("id: ")) { + eventId = line.slice(4); + } + // `event:` and comments are ignored — we route on data.type. + } + if (!dataChunks) continue; + + let parsed: unknown; + try { + parsed = JSON.parse(dataChunks); + } catch { + log.warn("Failed to parse SSE data chunk", { + preview: dataChunks.slice(0, 120), + }); + continue; + } + + if ( + typeof parsed === "object" && + parsed !== null && + (parsed as { type?: unknown }).type === "incoming_command" + ) { + const payload = (parsed as { payload?: unknown }).payload; + if (payload && typeof payload === "object") { + try { + await onCommand(payload as IncomingCommandPayload); + } catch (err) { + log.error("Incoming command handler threw", { + error: err instanceof Error ? err.message : String(err), + }); + } + } + } + + if (eventId) lastEventId = eventId; + } + } + return lastEventId; + } finally { + try { + await reader.cancel(); + } catch { + // Reader already closed or cancelled; nothing to do. + } + } + } + + private sleep(ms: number, signal: AbortSignal): Promise { + return new Promise((resolve) => { + const timer = setTimeout(resolve, ms); + signal.addEventListener( + "abort", + () => { + clearTimeout(timer); + resolve(); + }, + { once: true }, + ); + }); + } +} diff --git a/apps/code/src/main/services/agent/schemas.ts b/apps/code/src/main/services/agent/schemas.ts index 5ad875f8f..018af0556 100644 --- a/apps/code/src/main/services/agent/schemas.ts +++ b/apps/code/src/main/services/agent/schemas.ts @@ -16,24 +16,6 @@ export const credentialsSchema = z.object({ export type Credentials = z.infer; -// Session config schema -export const sessionConfigSchema = z.object({ - taskId: z.string(), - taskRunId: z.string(), - repoPath: z.string(), - credentials: credentialsSchema, - logUrl: z.string().optional(), - /** The agent's session ID (for resume - SDK session ID for Claude, Codex's session ID for Codex) */ - sessionId: z.string().optional(), - adapter: z.enum(["claude", "codex"]).optional(), - /** Additional directories Claude can access beyond cwd (for worktree support) */ - additionalDirectories: z.array(z.string()).optional(), - /** Permission mode to use for the session (e.g. "default", "acceptEdits", "plan", "bypassPermissions") */ - permissionMode: z.string().optional(), -}); - -export type SessionConfig = z.infer; - // Start session input/output export const startSessionInput = z.object({ @@ -173,6 +155,7 @@ export const reconnectSessionInput = z.object({ permissionMode: z.string().optional(), customInstructions: z.string().max(2000).optional(), effort: effortLevelSchema.optional(), + runMode: z.enum(["local", "cloud"]).optional(), }); export type ReconnectSessionInput = z.infer; @@ -198,6 +181,11 @@ export const recordActivityInput = z.object({ export const AgentServiceEvent = { SessionEvent: "session-event", PermissionRequest: "permission-request", + // Fires when a pending permission is resolved by anything other than the + // Electron UI (e.g. a mobile client calling permission_response). Renderer + // uses this to clear its own pendingPermissions copy in lockstep with the + // main-process map. + PermissionResolved: "permission-resolved", SessionsIdle: "sessions-idle", SessionIdleKilled: "session-idle-killed", AgentFileActivity: "agent-file-activity", @@ -226,9 +214,15 @@ export interface AgentFileActivityPayload { branchName: string | null; } +export interface PermissionResolvedPayload { + taskRunId: string; + toolCallId: string; +} + export interface AgentServiceEvents { [AgentServiceEvent.SessionEvent]: AgentSessionEventPayload; [AgentServiceEvent.PermissionRequest]: PermissionRequestPayload; + [AgentServiceEvent.PermissionResolved]: PermissionResolvedPayload; [AgentServiceEvent.SessionsIdle]: undefined; [AgentServiceEvent.SessionIdleKilled]: SessionIdleKilledPayload; [AgentServiceEvent.AgentFileActivity]: AgentFileActivityPayload; diff --git a/apps/code/src/main/services/agent/service.test.ts b/apps/code/src/main/services/agent/service.test.ts index 02e1642cc..8cc5789f7 100644 --- a/apps/code/src/main/services/agent/service.test.ts +++ b/apps/code/src/main/services/agent/service.test.ts @@ -176,6 +176,11 @@ function createMockDependencies() { notifyToolResult: vi.fn(), notifyToolCancelled: vi.fn(), }, + localCommandReceiver: { + subscribe: vi.fn(), + unsubscribe: vi.fn(), + shutdown: vi.fn().mockResolvedValue(undefined), + }, }; } @@ -189,11 +194,12 @@ const baseSessionParams = { describe("AgentService", () => { let service: AgentService; + let deps: ReturnType; beforeEach(() => { vi.clearAllMocks(); - const deps = createMockDependencies(); + deps = createMockDependencies(); service = new AgentService( deps.processTracking as never, deps.sleepService as never, @@ -201,6 +207,7 @@ describe("AgentService", () => { deps.posthogPluginService as never, deps.agentAuthAdapter as never, deps.mcpAppsService as never, + deps.localCommandReceiver as never, ); }); @@ -439,4 +446,118 @@ describe("AgentService", () => { ); }); }); + + describe("local runMode", () => { + it("subscribes to local command receiver when runMode is local", async () => { + await service.startSession({ + ...baseSessionParams, + runMode: "local", + }); + + expect(deps.localCommandReceiver.subscribe).toHaveBeenCalledTimes(1); + const call = deps.localCommandReceiver.subscribe.mock.calls[0][0]; + expect(call).toMatchObject({ + taskId: "task-1", + taskRunId: "run-1", + projectId: 1, + apiHost: "https://app.posthog.com", + }); + expect(typeof call.onCommand).toBe("function"); + }); + + it("does not subscribe when runMode is cloud", async () => { + await service.startSession({ + ...baseSessionParams, + runMode: "cloud", + }); + + expect(deps.localCommandReceiver.subscribe).not.toHaveBeenCalled(); + }); + + it("delivers user_message commands to the session via prompt", async () => { + const promptSpy = vi + .spyOn(service, "prompt") + .mockResolvedValue(undefined as never); + + await service.startSession({ + ...baseSessionParams, + runMode: "local", + }); + + const onCommand = deps.localCommandReceiver.subscribe.mock.calls[0][0] + .onCommand as (payload: unknown) => Promise; + + await onCommand({ + jsonrpc: "2.0", + method: "user_message", + params: { content: "hello from mobile" }, + }); + + expect(promptSpy).toHaveBeenCalledWith("run-1", [ + { type: "text", text: "hello from mobile" }, + ]); + }); + + it("ignores non-user_message commands", async () => { + const promptSpy = vi + .spyOn(service, "prompt") + .mockResolvedValue(undefined as never); + + await service.startSession({ + ...baseSessionParams, + runMode: "local", + }); + + const onCommand = deps.localCommandReceiver.subscribe.mock.calls[0][0] + .onCommand as (payload: unknown) => Promise; + + await onCommand({ + jsonrpc: "2.0", + method: "something_else", + params: { content: "ignored" }, + }); + + expect(promptSpy).not.toHaveBeenCalled(); + }); + + it("ignores commands with missing or non-string content", async () => { + const promptSpy = vi + .spyOn(service, "prompt") + .mockResolvedValue(undefined as never); + + await service.startSession({ + ...baseSessionParams, + runMode: "local", + }); + + const onCommand = deps.localCommandReceiver.subscribe.mock.calls[0][0] + .onCommand as (payload: unknown) => Promise; + + await onCommand({ jsonrpc: "2.0", method: "user_message", params: {} }); + await onCommand({ + jsonrpc: "2.0", + method: "user_message", + params: { content: "" }, + }); + + expect(promptSpy).not.toHaveBeenCalled(); + }); + + it("unsubscribes on session cleanup", async () => { + await service.startSession({ + ...baseSessionParams, + runMode: "local", + }); + + await ( + service as unknown as { + cleanupSession: (id: string) => Promise; + } + ).cleanupSession("run-1"); + + expect(deps.localCommandReceiver.unsubscribe).toHaveBeenCalledWith( + "run-1", + ); + }); + }); }); diff --git a/apps/code/src/main/services/agent/service.ts b/apps/code/src/main/services/agent/service.ts index b03024eb5..e15e68a57 100644 --- a/apps/code/src/main/services/agent/service.ts +++ b/apps/code/src/main/services/agent/service.ts @@ -51,6 +51,7 @@ import type { ProcessTrackingService } from "../process-tracking/service"; import type { SleepService } from "../sleep/service"; import type { AgentAuthAdapter } from "./auth-adapter"; import { discoverExternalPlugins } from "./discover-plugins"; +import type { LocalCommandReceiver } from "./local-command-receiver"; import { AgentServiceEvent, type AgentServiceEvents, @@ -255,6 +256,8 @@ interface SessionConfig { effort?: EffortLevel; /** Model to use for the session (e.g. "claude-sonnet-4-6") */ model?: string; + /** Whether this session runs locally on the desktop or in a cloud sandbox */ + runMode?: "local" | "cloud"; } interface ManagedSession { @@ -322,6 +325,7 @@ export class AgentService extends TypedEventEmitter { private posthogPluginService: PosthogPluginService; private agentAuthAdapter: AgentAuthAdapter; private mcpAppsService: McpAppsService; + private localCommandReceiver: LocalCommandReceiver; constructor( @inject(MAIN_TOKENS.ProcessTrackingService) @@ -336,6 +340,8 @@ export class AgentService extends TypedEventEmitter { agentAuthAdapter: AgentAuthAdapter, @inject(MAIN_TOKENS.McpAppsService) mcpAppsService: McpAppsService, + @inject(MAIN_TOKENS.LocalCommandReceiver) + localCommandReceiver: LocalCommandReceiver, ) { super(); this.processTracking = processTracking; @@ -344,6 +350,7 @@ export class AgentService extends TypedEventEmitter { this.posthogPluginService = posthogPluginService; this.agentAuthAdapter = agentAuthAdapter; this.mcpAppsService = mcpAppsService; + this.localCommandReceiver = localCommandReceiver; powerMonitor.on("resume", () => this.checkIdleDeadlines()); } @@ -388,6 +395,7 @@ export class AgentService extends TypedEventEmitter { }); this.pendingPermissions.delete(key); + this.emit(AgentServiceEvent.PermissionResolved, { taskRunId, toolCallId }); this.recordActivity(taskRunId); } @@ -416,6 +424,7 @@ export class AgentService extends TypedEventEmitter { }); this.pendingPermissions.delete(key); + this.emit(AgentServiceEvent.PermissionResolved, { taskRunId, toolCallId }); this.recordActivity(taskRunId); } @@ -809,6 +818,15 @@ When creating pull requests, add the following footer at the end of the PR descr this.sessions.set(taskRunId, session); this.recordActivity(taskRunId); + if (config.runMode === "local") { + this.ensureLocalCommandSubscription( + taskId, + taskRunId, + credentials.projectId, + credentials.apiHost, + ); + } + if (isRetry) { log.info("Session created after auth retry", { taskRunId }); } @@ -1148,7 +1166,110 @@ For git operations while detached: session.inFlightMcpToolCalls.clear(); } + /** + * Idempotently subscribe the LocalCommandReceiver to a task-run's SSE + * stream so mobile-originated /command/ calls reach this session. Called + * both on fresh session creation and when an existing session is reused + * — the receiver itself short-circuits duplicate subscribes, so multiple + * calls are safe. Without this, a session that existed before this code + * path shipped (or that had its subscription torn down by an earlier + * cleanup) would silently drop mobile commands. + */ + private ensureLocalCommandSubscription( + taskId: string, + taskRunId: string, + projectId: number, + apiHost: string, + ): void { + this.localCommandReceiver.subscribe({ + taskId, + taskRunId, + projectId, + apiHost, + onCommand: async (payload) => { + log.debug("Local command received", { + taskRunId, + method: payload.method, + }); + + // Mobile (or any external client) answering an outstanding + // requestPermission call. Route it directly to the pending + // promise rather than treating it as a new prompt — otherwise + // the agent stays blocked inside the current turn and the + // answer starts a second turn that can never run. + if (payload.method === "permission_response") { + const params = payload.params ?? {}; + const toolCallId = + typeof params.toolCallId === "string" + ? params.toolCallId + : undefined; + const optionId = + typeof params.optionId === "string" ? params.optionId : undefined; + const customInput = + typeof params.customInput === "string" + ? params.customInput + : undefined; + const rawAnswers = params.answers; + const answers = + rawAnswers && typeof rawAnswers === "object" + ? (rawAnswers as Record) + : undefined; + if (!toolCallId || !optionId) { + log.warn("Invalid permission_response from external client", { + taskRunId, + hasToolCallId: !!toolCallId, + hasOptionId: !!optionId, + }); + return; + } + try { + this.respondToPermission( + taskRunId, + toolCallId, + optionId, + customInput, + answers, + ); + } catch (err) { + log.error("Failed to apply external permission_response", { + taskRunId, + toolCallId, + error: err instanceof Error ? err.message : String(err), + }); + } + return; + } + + if (payload.method !== "user_message") { + log.debug("Ignoring non-user_message local command", { + method: payload.method, + taskRunId, + }); + return; + } + const content = payload.params?.content; + if (typeof content !== "string" || content.length === 0) { + log.warn("Local command missing content", { taskRunId }); + return; + } + try { + await this.prompt(taskRunId, [{ type: "text", text: content }]); + } catch (err) { + log.error("Failed to deliver local command to session", { + taskRunId, + error: err instanceof Error ? err.message : String(err), + }); + } + }, + }); + } + private async cleanupSession(taskRunId: string): Promise { + // Abort any outstanding SSE subscription for this run first — per + // async-cleanup-ordering guidance, we release external resources + // before awaiting anything that depends on the session being gone. + this.localCommandReceiver.unsubscribe(taskRunId); + const session = this.sessions.get(taskRunId); if (session) { this.cancelInFlightMcpToolCalls(session); @@ -1470,6 +1591,7 @@ For git operations while detached: "customInstructions" in params ? params.customInstructions : undefined, effort: "effort" in params ? params.effort : undefined, model: "model" in params ? params.model : undefined, + runMode: "runMode" in params ? params.runMode : undefined, }; } diff --git a/apps/code/src/main/trpc/routers/agent.ts b/apps/code/src/main/trpc/routers/agent.ts index 98c20a8ce..835845332 100644 --- a/apps/code/src/main/trpc/routers/agent.ts +++ b/apps/code/src/main/trpc/routers/agent.ts @@ -105,6 +105,26 @@ export const agentRouter = router({ } }), + // Permission resolved subscription - yields when a pending permission gets + // answered through a path other than the local UI (e.g. a mobile client). + // The renderer uses this to clear its mirror of pendingPermissions. + onPermissionResolved: publicProcedure + .input(subscribeSessionInput) + .subscription(async function* (opts) { + const service = getService(); + const targetTaskRunId = opts.input.taskRunId; + const iterable = service.toIterable( + AgentServiceEvent.PermissionResolved, + { signal: opts.signal }, + ); + + for await (const event of iterable) { + if (event.taskRunId === targetTaskRunId) { + yield event; + } + } + }), + // Respond to a permission request from the UI respondToPermission: publicProcedure .input(respondToPermissionInput) diff --git a/apps/code/src/renderer/features/sessions/service/service.test.ts b/apps/code/src/renderer/features/sessions/service/service.test.ts index 7305679de..dce667748 100644 --- a/apps/code/src/renderer/features/sessions/service/service.test.ts +++ b/apps/code/src/renderer/features/sessions/service/service.test.ts @@ -16,6 +16,9 @@ const mockTrpcAgent = vi.hoisted(() => ({ cancelPermission: { mutate: vi.fn() }, onSessionEvent: { subscribe: vi.fn() }, onPermissionRequest: { subscribe: vi.fn() }, + onPermissionResolved: { + subscribe: vi.fn(() => ({ unsubscribe: vi.fn() })), + }, onSessionIdleKilled: { subscribe: vi.fn(() => ({ unsubscribe: vi.fn() })) }, resetAll: { mutate: vi.fn().mockResolvedValue(undefined) }, })); diff --git a/apps/code/src/renderer/features/sessions/service/service.ts b/apps/code/src/renderer/features/sessions/service/service.ts index 12cfeb904..c1c87435a 100644 --- a/apps/code/src/renderer/features/sessions/service/service.ts +++ b/apps/code/src/renderer/features/sessions/service/service.ts @@ -118,6 +118,7 @@ export class SessionService { { event: { unsubscribe: () => void }; permission?: { unsubscribe: () => void }; + permissionResolved?: { unsubscribe: () => void }; } >(); /** Active cloud task watchers, keyed by taskId */ @@ -412,6 +413,7 @@ export class SessionService { adapter: resolvedAdapter, permissionMode: persistedMode, customInstructions: customInstructions || undefined, + runMode: "local", }); if (result) { @@ -586,6 +588,7 @@ export class SessionService { ? (reasoningLevel as EffortLevel) : undefined, model: preferredModel, + runMode: "local", }); const session = this.createBaseSession(taskRun.id, taskId, taskTitle); @@ -695,9 +698,32 @@ export class SessionService { }, ); + // Clears the local pendingPermissions mirror when a permission is + // resolved outside the Electron UI (e.g. a mobile client answers + // the question). Without this, the desktop card would stay visible + // indefinitely even though the agent has already moved on. + const permissionResolvedSubscription = + trpcClient.agent.onPermissionResolved.subscribe( + { taskRunId }, + { + onData: (payload) => { + const session = sessionStoreSetters.getSessions()[taskRunId]; + if (!session) return; + this.resolvePermission(session, payload.toolCallId); + }, + onError: (err) => { + log.error("Permission-resolved subscription error", { + taskRunId, + error: err, + }); + }, + }, + ); + this.subscriptions.set(taskRunId, { event: eventSubscription, permission: permissionSubscription, + permissionResolved: permissionResolvedSubscription, }); } @@ -705,6 +731,7 @@ export class SessionService { const subscription = this.subscriptions.get(taskRunId); subscription?.event.unsubscribe(); subscription?.permission?.unsubscribe(); + subscription?.permissionResolved?.unsubscribe(); this.subscriptions.delete(taskRunId); } diff --git a/apps/mobile/index.js b/apps/mobile/index.js new file mode 100644 index 000000000..80d3d998f --- /dev/null +++ b/apps/mobile/index.js @@ -0,0 +1 @@ +import "expo-router/entry"; diff --git a/apps/mobile/package.json b/apps/mobile/package.json index ee63d673a..233501cdb 100644 --- a/apps/mobile/package.json +++ b/apps/mobile/package.json @@ -1,7 +1,7 @@ { "name": "@posthog/mobile", "version": "1.0.0", - "main": "expo-router/entry", + "main": "./index.js", "scripts": { "start": "expo start", "start:clear": "expo start --clear", diff --git a/apps/mobile/src/app/task/[id].tsx b/apps/mobile/src/app/task/[id].tsx index bbd9b969d..25260a4f8 100644 --- a/apps/mobile/src/app/task/[id].tsx +++ b/apps/mobile/src/app/task/[id].tsx @@ -23,8 +23,13 @@ export default function TaskDetailScreen() { const [loading, setLoading] = useState(true); const [error, setError] = useState(null); - const { connectToTask, disconnectFromTask, sendPrompt, getSessionForTask } = - useTaskSessionStore(); + const { + connectToTask, + disconnectFromTask, + sendPrompt, + sendPermissionResponse, + getSessionForTask, + } = useTaskSessionStore(); const session = taskId ? getSessionForTask(taskId) : undefined; @@ -81,6 +86,16 @@ export default function TaskDetailScreen() { [taskId, sendPrompt], ); + const handleSendPermissionResponse = useCallback( + (args: Parameters[1]) => { + if (!taskId) return; + sendPermissionResponse(taskId, args).catch((err) => { + console.error("Failed to send permission response:", err); + }); + }, + [taskId, sendPermissionResponse], + ); + const handleOpenTask = useCallback( (newTaskId: string) => { router.push(`/task/${newTaskId}`); @@ -161,7 +176,7 @@ export default function TaskDetailScreen() { isConnecting={session?.isPromptPending ?? false} isThinking={session?.isPromptPending ?? false} onOpenTask={handleOpenTask} - onSendAnswer={handleSendPrompt} + onSendPermissionResponse={handleSendPermissionResponse} contentContainerStyle={{ paddingTop: 80 + insets.bottom, paddingBottom: 16, diff --git a/apps/mobile/src/features/tasks/components/QuestionCard.tsx b/apps/mobile/src/features/tasks/components/QuestionCard.tsx index af70b5f4c..6cca5c807 100644 --- a/apps/mobile/src/features/tasks/components/QuestionCard.tsx +++ b/apps/mobile/src/features/tasks/components/QuestionCard.tsx @@ -35,9 +35,17 @@ interface ToolData { result?: unknown; } +interface PermissionResponseArgs { + toolCallId: string; + optionId: string; + answers?: Record; + customInput?: string; + displayText: string; +} + interface QuestionCardProps { toolData: ToolData; - onSendAnswer?: (answer: string) => void; + onSendPermissionResponse?: (args: PermissionResponseArgs) => void; } function extractQuestions(args?: Record): QuestionItem[] { @@ -70,7 +78,10 @@ function extractAnswer(result: unknown): string | null { return null; } -export function QuestionCard({ toolData, onSendAnswer }: QuestionCardProps) { +export function QuestionCard({ + toolData, + onSendPermissionResponse, +}: QuestionCardProps) { const themeColors = useThemeColors(); const questions = extractQuestions(toolData.args); const isCompleted = @@ -115,16 +126,22 @@ export function QuestionCard({ toolData, onSendAnswer }: QuestionCardProps) { } return ( - + ); } function InteractiveQuestion({ questions, - onSendAnswer, + toolCallId, + onSendPermissionResponse, }: { questions: QuestionItem[]; - onSendAnswer?: (answer: string) => void; + toolCallId: string; + onSendPermissionResponse?: (args: PermissionResponseArgs) => void; }) { const themeColors = useThemeColors(); const [currentIndex, setCurrentIndex] = useState(0); @@ -195,18 +212,38 @@ function InteractiveQuestion({ for (const label of selected) { parts.push(label); } - if (otherText.trim()) { - parts.push(otherText.trim()); + const trimmedOther = otherText.trim(); + if (trimmedOther) { + parts.push(trimmedOther); } const answer = parts.join(", "); - if (isLastQuestion) { - if (answer && onSendAnswer) { - onSendAnswer(answer); - } - } else { + if (!isLastQuestion) { setCurrentIndex(currentIndex + 1); + return; } + + if (!answer || !onSendPermissionResponse) return; + + // Derive the ACP optionId the agent is expecting. Options are built + // server-side (buildQuestionOptions in packages/agent) as + // `${OPTION_PREFIX}${idx}` where OPTION_PREFIX is "option_". If the + // user only typed into "Other", fall back to option_0 — the answers + // map carries the actual content for the agent. + const firstSelectedLabel = parts[0]; + const selectedIdx = question.options.findIndex( + (o) => o.label === firstSelectedLabel, + ); + const optionIdx = selectedIdx >= 0 ? selectedIdx : 0; + const optionId = `option_${optionIdx}`; + + onSendPermissionResponse({ + toolCallId, + optionId, + answers: { [question.question]: answer }, + customInput: trimmedOther || undefined, + displayText: answer, + }); }; return ( diff --git a/apps/mobile/src/features/tasks/components/TaskSessionView.tsx b/apps/mobile/src/features/tasks/components/TaskSessionView.tsx index e81257c81..b09b3c1e1 100644 --- a/apps/mobile/src/features/tasks/components/TaskSessionView.tsx +++ b/apps/mobile/src/features/tasks/components/TaskSessionView.tsx @@ -1,9 +1,4 @@ -import { - ArrowDown, - Brain, - CaretRight, - Robot, -} from "phosphor-react-native"; +import { ArrowDown, Brain, CaretRight, Robot } from "phosphor-react-native"; import { useCallback, useEffect, useMemo, useRef, useState } from "react"; import { ActivityIndicator, @@ -24,12 +19,20 @@ import type { PlanEntry, SessionEvent, SessionNotification } from "../types"; import { PlanStatusBar } from "./PlanStatusBar"; import { QuestionCard } from "./QuestionCard"; +interface PermissionResponseArgs { + toolCallId: string; + optionId: string; + answers?: Record; + customInput?: string; + displayText: string; +} + interface TaskSessionViewProps { events: SessionEvent[]; isConnecting?: boolean; isThinking?: boolean; onOpenTask?: (taskId: string) => void; - onSendAnswer?: (answer: string) => void; + onSendPermissionResponse?: (args: PermissionResponseArgs) => void; contentContainerStyle?: object; } @@ -113,8 +116,7 @@ function parseSessionNotification( } case "tool_call": { const meta = update._meta?.claudeCode; - const isAgent = - meta?.toolName === "Agent" || meta?.toolName === "Task"; + const isAgent = meta?.toolName === "Agent" || meta?.toolName === "Task"; return { type: "tool", toolData: { @@ -387,7 +389,11 @@ function CollapsedThought({ content }: { content: string }) { function tryReassembleString(obj: Record): string | null { const numericKeys = Object.keys(obj).filter((k) => /^\d+$/.test(k)); if (numericKeys.length < 3) return null; - if (numericKeys.every((k) => typeof obj[k] === "string" && (obj[k] as string).length === 1)) { + if ( + numericKeys.every( + (k) => typeof obj[k] === "string" && (obj[k] as string).length === 1, + ) + ) { return numericKeys .sort((a, b) => Number(a) - Number(b)) .map((k) => obj[k]) @@ -410,7 +416,14 @@ function extractErrorText(result: unknown): string | null { if (reassembled) return reassembled; // Check simple string fields, recurse into nested objects - for (const key of ["error", "message", "stderr", "output", "text", "content"]) { + for (const key of [ + "error", + "message", + "stderr", + "output", + "text", + "content", + ]) { if (typeof obj[key] === "string") return obj[key] as string; if (obj[key] && typeof obj[key] === "object") { const nested = extractErrorText(obj[key]); @@ -439,11 +452,12 @@ function agentPromptSummary(args?: Record): string | null { : null; if (!prompt) return null; // Take the first meaningful line, truncated - const firstLine = prompt.split("\n").find((l) => l.trim())?.trim(); + const firstLine = prompt + .split("\n") + .find((l) => l.trim()) + ?.trim(); if (!firstLine) return null; - return firstLine.length > 120 - ? `${firstLine.slice(0, 120)}…` - : firstLine; + return firstLine.length > 120 ? `${firstLine.slice(0, 120)}…` : firstLine; } function AgentToolCard({ @@ -469,10 +483,7 @@ function AgentToolCard({ return ( {/* Header */} - setExpanded(!expanded)} - className="px-3 py-2" - > + setExpanded(!expanded)} className="px-3 py-2"> {isLoading ? ( @@ -583,7 +594,7 @@ function ThinkingIndicator() { } function ConnectingIndicator() { - const themeColors = useThemeColors(); + const _themeColors = useThemeColors(); const [dots, setDots] = useState(1); useEffect(() => { @@ -607,7 +618,7 @@ export function TaskSessionView({ isConnecting, isThinking, onOpenTask, - onSendAnswer, + onSendPermissionResponse, contentContainerStyle, }: TaskSessionViewProps) { const processorRef = useRef(createProcessorState()); @@ -679,14 +690,12 @@ export function TaskSessionView({ return ( ); } if (item.toolData.isAgent) { - return ( - - ); + return ; } return ( [...taskKeys.all, "list"] as const, - list: (filters?: { repository?: string; createdBy?: number }) => + list: (filters?: { repository?: string }) => [...taskKeys.lists(), filters] as const, details: () => [...taskKeys.all, "detail"] as const, detail: (id: string) => [...taskKeys.details(), id] as const, @@ -27,7 +27,6 @@ export function useTasks(filters?: { repository?: string }) { const queryFilters = { ...filters, - createdBy: currentUser?.id, }; const query = useQuery({ @@ -64,23 +63,15 @@ export function useTask(taskId: string) { export function useCreateTask() { const queryClient = useQueryClient(); - const { data: currentUser } = useUserQuery(); - const invalidateTasks = (newTask?: Task) => { - if (newTask && currentUser?.id) { - // Update the correct cache entry with the user's filter - const queryKey = taskKeys.list({ createdBy: currentUser.id }); - queryClient.setQueryData(queryKey, (old) => - old ? [newTask, ...old] : [newTask], - ); - } + const invalidateTasks = () => { queryClient.invalidateQueries({ queryKey: taskKeys.lists() }); }; const mutation = useMutation({ mutationFn: (options: CreateTaskOptions) => createTask(options), - onSuccess: (newTask) => { - invalidateTasks(newTask); + onSuccess: () => { + invalidateTasks(); }, onError: (error) => { console.error("Failed to create task:", error.message); diff --git a/apps/mobile/src/features/tasks/stores/taskSessionStore.ts b/apps/mobile/src/features/tasks/stores/taskSessionStore.ts index 57b478134..f899ca05b 100644 --- a/apps/mobile/src/features/tasks/stores/taskSessionStore.ts +++ b/apps/mobile/src/features/tasks/stores/taskSessionStore.ts @@ -93,6 +93,16 @@ interface TaskSessionStore { connectToTask: (task: Task) => Promise; disconnectFromTask: (taskId: string) => void; sendPrompt: (taskId: string, prompt: string) => Promise; + sendPermissionResponse: ( + taskId: string, + args: { + toolCallId: string; + optionId: string; + answers?: Record; + customInput?: string; + displayText: string; + }, + ) => Promise; cancelPrompt: (taskId: string) => Promise; getSessionForTask: (taskId: string) => TaskSession | undefined; @@ -124,7 +134,7 @@ export const useTaskSessionStore = create((set, get) => ({ const taskId = task.id; const latestRunId = task.latest_run?.id; const latestRunLogUrl = task.latest_run?.log_url; - const taskDescription = task.description; + const _taskDescription = task.description; if (connectAttempts.has(taskId)) { logger.debug("Connection already in progress", { taskId }); @@ -357,6 +367,84 @@ export const useTaskSessionStore = create((set, get) => ({ } }, + // Resolve an outstanding requestPermission on the desktop/agent side + // (e.g. AskUserQuestion). Unlike sendPrompt, this never queues — a + // permission reply only makes sense while the agent is paused inside + // requestPermission, and it completes an existing turn rather than + // starting a new one. + sendPermissionResponse: async (taskId, args) => { + const session = get().getSessionForTask(taskId); + if (!session) { + throw new Error("No active session for task"); + } + + const ts = Date.now(); + const userEvent: SessionEvent = { + type: "session_update", + ts, + notification: { + update: { + sessionUpdate: "user_message_chunk", + content: { type: "text", text: args.displayText }, + }, + }, + }; + + set((state) => { + const current = state.sessions[session.taskRunId]; + if (!current) return state; + const nextLocalEchoes = new Set(current.localUserEchoes ?? []); + nextLocalEchoes.add(args.displayText); + return { + sessions: { + ...state.sessions, + [session.taskRunId]: { + ...current, + events: [...current.events, userEvent], + localUserEchoes: nextLocalEchoes, + isPromptPending: true, + awaitingPing: true, + }, + }, + }; + }); + + try { + await sendCloudCommand(taskId, session.taskRunId, "permission_response", { + toolCallId: args.toolCallId, + optionId: args.optionId, + ...(args.answers ? { answers: args.answers } : {}), + ...(args.customInput ? { customInput: args.customInput } : {}), + }); + logger.debug("Sent permission_response", { + taskId, + runId: session.taskRunId, + toolCallId: args.toolCallId, + }); + } catch (err) { + logger.error("Failed to send permission_response", err); + // Roll back the optimistic state so the UI reflects reality. + set((state) => { + const current = state.sessions[session.taskRunId]; + if (!current) return state; + const nextLocalEchoes = new Set(current.localUserEchoes ?? []); + nextLocalEchoes.delete(args.displayText); + return { + sessions: { + ...state.sessions, + [session.taskRunId]: { + ...current, + events: current.events.filter((e) => e !== userEvent), + localUserEchoes: nextLocalEchoes, + isPromptPending: false, + }, + }, + }; + }); + throw err; + } + }, + cancelPrompt: async (taskId: string) => { const session = get().getSessionForTask(taskId); if (!session) return false; @@ -460,10 +548,7 @@ export const useTaskSessionStore = create((set, get) => ({ }, }; }); - if ( - shouldPing && - usePreferencesStore.getState().pingsEnabled - ) { + if (shouldPing && usePreferencesStore.getState().pingsEnabled) { playMeepSound().catch(() => {}); } } @@ -550,7 +635,13 @@ export const useTaskSessionStore = create((set, get) => ({ entry.type === "notification" && (entry.notification?.method === "_posthog/turn_complete" || entry.notification?.method === "_posthog/task_complete" || - entry.notification?.method === "_posthog/error") + entry.notification?.method === "_posthog/error" || + // Agent explicitly blocked on a user reply (e.g. a question + // tool invoked via requestPermission). Treat this as a + // turn boundary so the input UI unblocks — otherwise the + // user's answer would be stuck in the "queue while busy" + // path in sendPrompt. + entry.notification?.method === "_posthog/awaiting_user_input") ) { receivedAgentMessage = true; } diff --git a/packages/agent/src/acp-extensions.ts b/packages/agent/src/acp-extensions.ts index 62a2a1083..315b186e9 100644 --- a/packages/agent/src/acp-extensions.ts +++ b/packages/agent/src/acp-extensions.ts @@ -25,6 +25,14 @@ export const POSTHOG_NOTIFICATIONS = { /** Agent finished processing a turn (prompt returned, waiting for next input) */ TURN_COMPLETE: "_posthog/turn_complete", + /** + * Agent has stopped mid-turn and is blocked on a user reply (e.g. a + * question tool invoked via requestPermission). Emitted by permission + * handlers before they block; clients use it to unblock their input UI + * so the user's answer is sent directly instead of being queued. + */ + AWAITING_USER_INPUT: "_posthog/awaiting_user_input", + /** Error occurred during task execution */ ERROR: "_posthog/error", diff --git a/packages/agent/src/adapters/claude/permissions/permission-handlers.ts b/packages/agent/src/adapters/claude/permissions/permission-handlers.ts index 41aa80ab1..f48e4172a 100644 --- a/packages/agent/src/adapters/claude/permissions/permission-handlers.ts +++ b/packages/agent/src/adapters/claude/permissions/permission-handlers.ts @@ -3,6 +3,7 @@ import type { RequestPermissionResponse, } from "@agentclientprotocol/sdk"; import type { PermissionUpdate } from "@anthropic-ai/claude-agent-sdk"; +import { POSTHOG_NOTIFICATIONS } from "../../../acp-extensions"; import { text } from "../../../utils/acp-content"; import type { Logger } from "../../../utils/logger"; import { toolInfoFromToolUse } from "../conversion/tool-use-to-acp"; @@ -276,6 +277,25 @@ async function handleAskUserQuestionTool( input: toolInput, }); + // Tell any attached clients (including log readers like the mobile app) + // that the agent has stopped and is blocked on a user reply. Without this + // signal, clients reading the log-only stream have no reliable way to + // distinguish "tool running" from "tool waiting for a human" — and treat + // replies as queued-while-busy instead of sending them through. + try { + await client.extNotification(POSTHOG_NOTIFICATIONS.AWAITING_USER_INPUT, { + sessionId, + toolCallId: toolUseID, + }); + } catch (err) { + context.logger.warn( + "[AskUserQuestion] Failed to emit awaiting_user_input", + { + error: err, + }, + ); + } + const response = await client.requestPermission({ options, sessionId, diff --git a/packages/agent/src/server/agent-server.ts b/packages/agent/src/server/agent-server.ts index ebe4554a4..ad2775575 100644 --- a/packages/agent/src/server/agent-server.ts +++ b/packages/agent/src/server/agent-server.ts @@ -561,6 +561,38 @@ export class AgentServer { }; } + case "permission_response": { + // Cloud questions are not a blocking permission wait (the cloud + // permission handler returns "cancelled" with a wait hint so + // Claude ends its turn, then resumes on the next user_message). + // So a mobile permission_response for a cloud run is effectively + // a follow-up prompt — forward it through the user_message path + // using the user's answer as prompt text. + const customInput = + typeof params.customInput === "string" ? params.customInput : ""; + const rawAnswers = params.answers; + const answerValues = + rawAnswers && + typeof rawAnswers === "object" && + !Array.isArray(rawAnswers) + ? Object.values(rawAnswers as Record).filter( + (v): v is string => typeof v === "string", + ) + : []; + const answerText = customInput || answerValues.join(", "); + + if (!answerText) { + this.logger.warn("permission_response missing answer content", { + toolCallId: params.toolCallId, + }); + return { stopReason: "cancelled" }; + } + + return await this.executeCommand("user_message", { + content: answerText, + }); + } + case POSTHOG_NOTIFICATIONS.CANCEL: case "cancel": { this.logger.info("Cancel requested", {