diff --git a/.claude/settings.json b/.claude/settings.json index a98a38348..e73b7b901 100644 --- a/.claude/settings.json +++ b/.claude/settings.json @@ -4,10 +4,7 @@ "deny": [ "Read(./apps/cli/**)", "Edit(./apps/cli/**)", - "Write(./apps/cli/**)", - "Read(./apps/mobile/**)", - "Edit(./apps/mobile/**)", - "Write(./apps/mobile/**)" + "Write(./apps/cli/**)" ] } } diff --git a/apps/code/src/main/di/container.ts b/apps/code/src/main/di/container.ts index f45163c53..2a688ba2a 100644 --- a/apps/code/src/main/di/container.ts +++ b/apps/code/src/main/di/container.ts @@ -10,6 +10,7 @@ import { WorkspaceRepository } from "../db/repositories/workspace-repository"; import { WorktreeRepository } from "../db/repositories/worktree-repository"; import { DatabaseService } from "../db/service"; import { AgentAuthAdapter } from "../services/agent/auth-adapter"; +import { LocalCommandReceiver } from "../services/agent/local-command-receiver"; import { AgentService } from "../services/agent/service"; import { AppLifecycleService } from "../services/app-lifecycle/service"; import { ArchiveService } from "../services/archive/service"; @@ -64,6 +65,7 @@ container.bind(MAIN_TOKENS.ArchiveRepository).to(ArchiveRepository); container.bind(MAIN_TOKENS.SuspensionRepository).to(SuspensionRepositoryImpl); container.bind(MAIN_TOKENS.AgentAuthAdapter).to(AgentAuthAdapter); container.bind(MAIN_TOKENS.AgentService).to(AgentService); +container.bind(MAIN_TOKENS.LocalCommandReceiver).to(LocalCommandReceiver); container.bind(MAIN_TOKENS.AuthService).to(AuthService); container.bind(MAIN_TOKENS.AuthProxyService).to(AuthProxyService); container.bind(MAIN_TOKENS.ArchiveService).to(ArchiveService); diff --git a/apps/code/src/main/di/tokens.ts b/apps/code/src/main/di/tokens.ts index 27bdbcafc..5308d78f4 100644 --- a/apps/code/src/main/di/tokens.ts +++ b/apps/code/src/main/di/tokens.ts @@ -21,6 +21,7 @@ export const MAIN_TOKENS = Object.freeze({ // Services AgentAuthAdapter: Symbol.for("Main.AgentAuthAdapter"), AgentService: Symbol.for("Main.AgentService"), + LocalCommandReceiver: Symbol.for("Main.LocalCommandReceiver"), AuthService: Symbol.for("Main.AuthService"), AuthProxyService: Symbol.for("Main.AuthProxyService"), ArchiveService: Symbol.for("Main.ArchiveService"), diff --git a/apps/code/src/main/services/agent/local-command-receiver.ts b/apps/code/src/main/services/agent/local-command-receiver.ts new file mode 100644 index 000000000..8a3b1d186 --- /dev/null +++ b/apps/code/src/main/services/agent/local-command-receiver.ts @@ -0,0 +1,262 @@ +import { inject, injectable, preDestroy } from "inversify"; +import { MAIN_TOKENS } from "../../di/tokens"; +import { logger } from "../../utils/logger"; +import type { AuthService } from "../auth/service"; + +const log = logger.scope("local-command-receiver"); +const INITIAL_RECONNECT_DELAY_MS = 2000; +const MAX_RECONNECT_DELAY_MS = 30_000; +// After this many consecutive failures, assume Last-Event-ID is stale (event +// trimmed from the backend buffer) and fall back to a fresh connect with +// start=latest. Accepts that we may drop commands issued during the outage. +const STALE_EVENT_ID_THRESHOLD = 3; + +/** + * JSON-RPC envelope carried inside an `incoming_command` SSE event. The + * backend repackages whatever mobile POSTs to /command/ into this shape + * (see products/tasks/backend/api.py :: command). + */ +export interface IncomingCommandPayload { + jsonrpc: string; + method: string; + params?: { content?: string } & Record; + id?: string | number; +} + +interface SubscribeParams { + taskId: string; + taskRunId: string; + projectId: number; + apiHost: string; + onCommand: (payload: IncomingCommandPayload) => Promise; +} + +interface Subscription { + taskRunId: string; + controller: AbortController; +} + +/** + * Subscribes to the PostHog task-run SSE stream for a local run and + * delivers `incoming_command` events (published by the backend when mobile + * POSTs to /command/ on a run with environment=local) to a caller-supplied + * callback. + * + * One SSE connection per subscribed run. Reconnects with backoff on failure. + * Uses the `Last-Event-ID` header to resume from the last processed event + * so brief network blips don't drop commands. + */ +@injectable() +export class LocalCommandReceiver { + private readonly subs = new Map(); + + constructor( + @inject(MAIN_TOKENS.AuthService) + private readonly auth: AuthService, + ) {} + + subscribe(params: SubscribeParams): void { + if (this.subs.has(params.taskRunId)) { + log.debug("Already subscribed", { taskRunId: params.taskRunId }); + return; + } + const controller = new AbortController(); + this.subs.set(params.taskRunId, { + taskRunId: params.taskRunId, + controller, + }); + log.info("Subscribing to SSE stream", { taskRunId: params.taskRunId }); + void this.connectLoop(params, controller).catch((err) => { + if (controller.signal.aborted) return; + log.error("Connect loop exited unexpectedly", { + taskRunId: params.taskRunId, + error: err instanceof Error ? err.message : String(err), + }); + }); + } + + unsubscribe(taskRunId: string): void { + const sub = this.subs.get(taskRunId); + if (!sub) return; + sub.controller.abort(); + this.subs.delete(taskRunId); + log.info("Unsubscribed", { taskRunId }); + } + + @preDestroy() + async shutdown(): Promise { + // Abort before awaiting teardown — per async-cleanup-ordering guidance. + for (const sub of this.subs.values()) sub.controller.abort(); + this.subs.clear(); + } + + private async connectLoop( + params: SubscribeParams, + controller: AbortController, + ): Promise { + let lastEventId: string | undefined; + let consecutiveFailures = 0; + + while (!controller.signal.aborted) { + let streamOpened = false; + try { + const { accessToken } = await this.auth.getValidAccessToken(); + const url = new URL( + `${params.apiHost}/api/projects/${params.projectId}/tasks/${params.taskId}/runs/${params.taskRunId}/stream/`, + ); + if (!lastEventId) { + // Fresh connect: only care about events published from now on. + // On reconnect we use Last-Event-ID instead (see headers below). + url.searchParams.set("start", "latest"); + } + + const headers: Record = { + Authorization: `Bearer ${accessToken}`, + Accept: "text/event-stream", + }; + if (lastEventId) headers["Last-Event-ID"] = lastEventId; + + const response = await fetch(url.toString(), { + headers, + signal: controller.signal, + }); + if (!response.ok) { + const body = await response.text().catch(() => ""); + throw new Error( + `SSE HTTP ${response.status}${body ? `: ${body.slice(0, 200)}` : ""}`, + ); + } + + streamOpened = true; + consecutiveFailures = 0; + lastEventId = await this.readEventStream( + response.body, + params.onCommand, + controller.signal, + lastEventId, + ); + log.info("SSE stream ended cleanly", { + taskRunId: params.taskRunId, + }); + } catch (err) { + if (controller.signal.aborted) return; + if (!streamOpened) consecutiveFailures++; + if ( + consecutiveFailures >= STALE_EVENT_ID_THRESHOLD && + lastEventId !== undefined + ) { + log.warn( + "Dropping possibly-stale Last-Event-ID after repeated failures", + { + taskRunId: params.taskRunId, + consecutiveFailures, + }, + ); + lastEventId = undefined; + } + log.warn("SSE disconnected, will reconnect", { + taskRunId: params.taskRunId, + consecutiveFailures, + error: err instanceof Error ? err.message : String(err), + }); + } + if (controller.signal.aborted) return; + const delay = Math.min( + MAX_RECONNECT_DELAY_MS, + INITIAL_RECONNECT_DELAY_MS * 2 ** Math.max(0, consecutiveFailures - 1), + ); + await this.sleep(delay, controller.signal); + } + } + + private async readEventStream( + body: ReadableStream | null, + onCommand: SubscribeParams["onCommand"], + signal: AbortSignal, + seedLastEventId: string | undefined, + ): Promise { + if (!body) throw new Error("Missing SSE response body"); + const reader = body.getReader(); + const decoder = new TextDecoder(); + let buffer = ""; + let lastEventId = seedLastEventId; + + try { + while (!signal.aborted) { + const { done, value } = await reader.read(); + if (done) return lastEventId; + buffer += decoder.decode(value, { stream: true }); + + // SSE event blocks are separated by a blank line (\n\n). + while (true) { + const separator = buffer.indexOf("\n\n"); + if (separator === -1) break; + const rawEvent = buffer.slice(0, separator); + buffer = buffer.slice(separator + 2); + + let dataChunks = ""; + let eventId: string | undefined; + for (const line of rawEvent.split("\n")) { + if (line.startsWith("data: ")) { + dataChunks += line.slice(6); + } else if (line.startsWith("id: ")) { + eventId = line.slice(4); + } + // `event:` and comments are ignored — we route on data.type. + } + if (!dataChunks) continue; + + let parsed: unknown; + try { + parsed = JSON.parse(dataChunks); + } catch { + log.warn("Failed to parse SSE data chunk", { + preview: dataChunks.slice(0, 120), + }); + continue; + } + + if ( + typeof parsed === "object" && + parsed !== null && + (parsed as { type?: unknown }).type === "incoming_command" + ) { + const payload = (parsed as { payload?: unknown }).payload; + if (payload && typeof payload === "object") { + try { + await onCommand(payload as IncomingCommandPayload); + } catch (err) { + log.error("Incoming command handler threw", { + error: err instanceof Error ? err.message : String(err), + }); + } + } + } + + if (eventId) lastEventId = eventId; + } + } + return lastEventId; + } finally { + try { + await reader.cancel(); + } catch { + // Reader already closed or cancelled; nothing to do. + } + } + } + + private sleep(ms: number, signal: AbortSignal): Promise { + return new Promise((resolve) => { + const timer = setTimeout(resolve, ms); + signal.addEventListener( + "abort", + () => { + clearTimeout(timer); + resolve(); + }, + { once: true }, + ); + }); + } +} diff --git a/apps/code/src/main/services/agent/schemas.ts b/apps/code/src/main/services/agent/schemas.ts index 3ead6cf15..de161a862 100644 --- a/apps/code/src/main/services/agent/schemas.ts +++ b/apps/code/src/main/services/agent/schemas.ts @@ -16,24 +16,6 @@ export const credentialsSchema = z.object({ export type Credentials = z.infer; -// Session config schema -export const sessionConfigSchema = z.object({ - taskId: z.string(), - taskRunId: z.string(), - repoPath: z.string(), - credentials: credentialsSchema, - logUrl: z.string().optional(), - /** The agent's session ID (for resume - SDK session ID for Claude, Codex's session ID for Codex) */ - sessionId: z.string().optional(), - adapter: z.enum(["claude", "codex"]).optional(), - /** Additional directories Claude can access beyond cwd (for worktree support) */ - additionalDirectories: z.array(z.string()).optional(), - /** Permission mode to use for the session (e.g. "default", "acceptEdits", "plan", "bypassPermissions") */ - permissionMode: z.string().optional(), -}); - -export type SessionConfig = z.infer; - // Start session input/output export const startSessionInput = z.object({ @@ -175,6 +157,7 @@ export const reconnectSessionInput = z.object({ customInstructions: z.string().max(2000).optional(), effort: effortLevelSchema.optional(), jsonSchema: z.record(z.string(), z.unknown()).nullish(), + runMode: z.enum(["local", "cloud"]).optional(), }); export type ReconnectSessionInput = z.infer; @@ -200,6 +183,11 @@ export const recordActivityInput = z.object({ export const AgentServiceEvent = { SessionEvent: "session-event", PermissionRequest: "permission-request", + // Fires when a pending permission is resolved by anything other than the + // Electron UI (e.g. a mobile client calling permission_response). Renderer + // uses this to clear its own pendingPermissions copy in lockstep with the + // main-process map. + PermissionResolved: "permission-resolved", SessionsIdle: "sessions-idle", SessionIdleKilled: "session-idle-killed", AgentFileActivity: "agent-file-activity", @@ -228,9 +216,15 @@ export interface AgentFileActivityPayload { branchName: string | null; } +export interface PermissionResolvedPayload { + taskRunId: string; + toolCallId: string; +} + export interface AgentServiceEvents { [AgentServiceEvent.SessionEvent]: AgentSessionEventPayload; [AgentServiceEvent.PermissionRequest]: PermissionRequestPayload; + [AgentServiceEvent.PermissionResolved]: PermissionResolvedPayload; [AgentServiceEvent.SessionsIdle]: undefined; [AgentServiceEvent.SessionIdleKilled]: SessionIdleKilledPayload; [AgentServiceEvent.AgentFileActivity]: AgentFileActivityPayload; diff --git a/apps/code/src/main/services/agent/service.test.ts b/apps/code/src/main/services/agent/service.test.ts index 02e1642cc..8cc5789f7 100644 --- a/apps/code/src/main/services/agent/service.test.ts +++ b/apps/code/src/main/services/agent/service.test.ts @@ -176,6 +176,11 @@ function createMockDependencies() { notifyToolResult: vi.fn(), notifyToolCancelled: vi.fn(), }, + localCommandReceiver: { + subscribe: vi.fn(), + unsubscribe: vi.fn(), + shutdown: vi.fn().mockResolvedValue(undefined), + }, }; } @@ -189,11 +194,12 @@ const baseSessionParams = { describe("AgentService", () => { let service: AgentService; + let deps: ReturnType; beforeEach(() => { vi.clearAllMocks(); - const deps = createMockDependencies(); + deps = createMockDependencies(); service = new AgentService( deps.processTracking as never, deps.sleepService as never, @@ -201,6 +207,7 @@ describe("AgentService", () => { deps.posthogPluginService as never, deps.agentAuthAdapter as never, deps.mcpAppsService as never, + deps.localCommandReceiver as never, ); }); @@ -439,4 +446,118 @@ describe("AgentService", () => { ); }); }); + + describe("local runMode", () => { + it("subscribes to local command receiver when runMode is local", async () => { + await service.startSession({ + ...baseSessionParams, + runMode: "local", + }); + + expect(deps.localCommandReceiver.subscribe).toHaveBeenCalledTimes(1); + const call = deps.localCommandReceiver.subscribe.mock.calls[0][0]; + expect(call).toMatchObject({ + taskId: "task-1", + taskRunId: "run-1", + projectId: 1, + apiHost: "https://app.posthog.com", + }); + expect(typeof call.onCommand).toBe("function"); + }); + + it("does not subscribe when runMode is cloud", async () => { + await service.startSession({ + ...baseSessionParams, + runMode: "cloud", + }); + + expect(deps.localCommandReceiver.subscribe).not.toHaveBeenCalled(); + }); + + it("delivers user_message commands to the session via prompt", async () => { + const promptSpy = vi + .spyOn(service, "prompt") + .mockResolvedValue(undefined as never); + + await service.startSession({ + ...baseSessionParams, + runMode: "local", + }); + + const onCommand = deps.localCommandReceiver.subscribe.mock.calls[0][0] + .onCommand as (payload: unknown) => Promise; + + await onCommand({ + jsonrpc: "2.0", + method: "user_message", + params: { content: "hello from mobile" }, + }); + + expect(promptSpy).toHaveBeenCalledWith("run-1", [ + { type: "text", text: "hello from mobile" }, + ]); + }); + + it("ignores non-user_message commands", async () => { + const promptSpy = vi + .spyOn(service, "prompt") + .mockResolvedValue(undefined as never); + + await service.startSession({ + ...baseSessionParams, + runMode: "local", + }); + + const onCommand = deps.localCommandReceiver.subscribe.mock.calls[0][0] + .onCommand as (payload: unknown) => Promise; + + await onCommand({ + jsonrpc: "2.0", + method: "something_else", + params: { content: "ignored" }, + }); + + expect(promptSpy).not.toHaveBeenCalled(); + }); + + it("ignores commands with missing or non-string content", async () => { + const promptSpy = vi + .spyOn(service, "prompt") + .mockResolvedValue(undefined as never); + + await service.startSession({ + ...baseSessionParams, + runMode: "local", + }); + + const onCommand = deps.localCommandReceiver.subscribe.mock.calls[0][0] + .onCommand as (payload: unknown) => Promise; + + await onCommand({ jsonrpc: "2.0", method: "user_message", params: {} }); + await onCommand({ + jsonrpc: "2.0", + method: "user_message", + params: { content: "" }, + }); + + expect(promptSpy).not.toHaveBeenCalled(); + }); + + it("unsubscribes on session cleanup", async () => { + await service.startSession({ + ...baseSessionParams, + runMode: "local", + }); + + await ( + service as unknown as { + cleanupSession: (id: string) => Promise; + } + ).cleanupSession("run-1"); + + expect(deps.localCommandReceiver.unsubscribe).toHaveBeenCalledWith( + "run-1", + ); + }); + }); }); diff --git a/apps/code/src/main/services/agent/service.ts b/apps/code/src/main/services/agent/service.ts index d2fb6eed3..ad2cadd40 100644 --- a/apps/code/src/main/services/agent/service.ts +++ b/apps/code/src/main/services/agent/service.ts @@ -51,6 +51,7 @@ import type { ProcessTrackingService } from "../process-tracking/service"; import type { SleepService } from "../sleep/service"; import type { AgentAuthAdapter } from "./auth-adapter"; import { discoverExternalPlugins } from "./discover-plugins"; +import type { LocalCommandReceiver } from "./local-command-receiver"; import { AgentServiceEvent, type AgentServiceEvents, @@ -257,6 +258,8 @@ interface SessionConfig { model?: string; /** JSON Schema for structured task output — when set, the agent gets a create_output tool */ jsonSchema?: Record | null; + /** Whether this session runs locally on the desktop or in a cloud sandbox */ + runMode?: "local" | "cloud"; } interface ManagedSession { @@ -324,6 +327,7 @@ export class AgentService extends TypedEventEmitter { private posthogPluginService: PosthogPluginService; private agentAuthAdapter: AgentAuthAdapter; private mcpAppsService: McpAppsService; + private localCommandReceiver: LocalCommandReceiver; constructor( @inject(MAIN_TOKENS.ProcessTrackingService) @@ -338,6 +342,8 @@ export class AgentService extends TypedEventEmitter { agentAuthAdapter: AgentAuthAdapter, @inject(MAIN_TOKENS.McpAppsService) mcpAppsService: McpAppsService, + @inject(MAIN_TOKENS.LocalCommandReceiver) + localCommandReceiver: LocalCommandReceiver, ) { super(); this.processTracking = processTracking; @@ -346,6 +352,7 @@ export class AgentService extends TypedEventEmitter { this.posthogPluginService = posthogPluginService; this.agentAuthAdapter = agentAuthAdapter; this.mcpAppsService = mcpAppsService; + this.localCommandReceiver = localCommandReceiver; powerMonitor.on("resume", () => this.checkIdleDeadlines()); } @@ -390,6 +397,7 @@ export class AgentService extends TypedEventEmitter { }); this.pendingPermissions.delete(key); + this.emit(AgentServiceEvent.PermissionResolved, { taskRunId, toolCallId }); this.recordActivity(taskRunId); } @@ -418,6 +426,7 @@ export class AgentService extends TypedEventEmitter { }); this.pendingPermissions.delete(key); + this.emit(AgentServiceEvent.PermissionResolved, { taskRunId, toolCallId }); this.recordActivity(taskRunId); } @@ -822,6 +831,15 @@ When creating pull requests, add the following footer at the end of the PR descr this.sessions.set(taskRunId, session); this.recordActivity(taskRunId); + if (config.runMode === "local") { + this.ensureLocalCommandSubscription( + taskId, + taskRunId, + credentials.projectId, + credentials.apiHost, + ); + } + if (isRetry) { log.info("Session created after auth retry", { taskRunId }); } @@ -1161,7 +1179,110 @@ For git operations while detached: session.inFlightMcpToolCalls.clear(); } + /** + * Idempotently subscribe the LocalCommandReceiver to a task-run's SSE + * stream so mobile-originated /command/ calls reach this session. Called + * both on fresh session creation and when an existing session is reused + * — the receiver itself short-circuits duplicate subscribes, so multiple + * calls are safe. Without this, a session that existed before this code + * path shipped (or that had its subscription torn down by an earlier + * cleanup) would silently drop mobile commands. + */ + private ensureLocalCommandSubscription( + taskId: string, + taskRunId: string, + projectId: number, + apiHost: string, + ): void { + this.localCommandReceiver.subscribe({ + taskId, + taskRunId, + projectId, + apiHost, + onCommand: async (payload) => { + log.debug("Local command received", { + taskRunId, + method: payload.method, + }); + + // Mobile (or any external client) answering an outstanding + // requestPermission call. Route it directly to the pending + // promise rather than treating it as a new prompt — otherwise + // the agent stays blocked inside the current turn and the + // answer starts a second turn that can never run. + if (payload.method === "permission_response") { + const params = payload.params ?? {}; + const toolCallId = + typeof params.toolCallId === "string" + ? params.toolCallId + : undefined; + const optionId = + typeof params.optionId === "string" ? params.optionId : undefined; + const customInput = + typeof params.customInput === "string" + ? params.customInput + : undefined; + const rawAnswers = params.answers; + const answers = + rawAnswers && typeof rawAnswers === "object" + ? (rawAnswers as Record) + : undefined; + if (!toolCallId || !optionId) { + log.warn("Invalid permission_response from external client", { + taskRunId, + hasToolCallId: !!toolCallId, + hasOptionId: !!optionId, + }); + return; + } + try { + this.respondToPermission( + taskRunId, + toolCallId, + optionId, + customInput, + answers, + ); + } catch (err) { + log.error("Failed to apply external permission_response", { + taskRunId, + toolCallId, + error: err instanceof Error ? err.message : String(err), + }); + } + return; + } + + if (payload.method !== "user_message") { + log.debug("Ignoring non-user_message local command", { + method: payload.method, + taskRunId, + }); + return; + } + const content = payload.params?.content; + if (typeof content !== "string" || content.length === 0) { + log.warn("Local command missing content", { taskRunId }); + return; + } + try { + await this.prompt(taskRunId, [{ type: "text", text: content }]); + } catch (err) { + log.error("Failed to deliver local command to session", { + taskRunId, + error: err instanceof Error ? err.message : String(err), + }); + } + }, + }); + } + private async cleanupSession(taskRunId: string): Promise { + // Abort any outstanding SSE subscription for this run first — per + // async-cleanup-ordering guidance, we release external resources + // before awaiting anything that depends on the session being gone. + this.localCommandReceiver.unsubscribe(taskRunId); + const session = this.sessions.get(taskRunId); if (session) { this.cancelInFlightMcpToolCalls(session); @@ -1484,6 +1605,7 @@ For git operations while detached: effort: "effort" in params ? params.effort : undefined, model: "model" in params ? params.model : undefined, jsonSchema: "jsonSchema" in params ? params.jsonSchema : undefined, + runMode: "runMode" in params ? params.runMode : undefined, }; } diff --git a/apps/code/src/main/trpc/routers/agent.ts b/apps/code/src/main/trpc/routers/agent.ts index 98c20a8ce..835845332 100644 --- a/apps/code/src/main/trpc/routers/agent.ts +++ b/apps/code/src/main/trpc/routers/agent.ts @@ -105,6 +105,26 @@ export const agentRouter = router({ } }), + // Permission resolved subscription - yields when a pending permission gets + // answered through a path other than the local UI (e.g. a mobile client). + // The renderer uses this to clear its mirror of pendingPermissions. + onPermissionResolved: publicProcedure + .input(subscribeSessionInput) + .subscription(async function* (opts) { + const service = getService(); + const targetTaskRunId = opts.input.taskRunId; + const iterable = service.toIterable( + AgentServiceEvent.PermissionResolved, + { signal: opts.signal }, + ); + + for await (const event of iterable) { + if (event.taskRunId === targetTaskRunId) { + yield event; + } + } + }), + // Respond to a permission request from the UI respondToPermission: publicProcedure .input(respondToPermissionInput) diff --git a/apps/code/src/renderer/features/sessions/service/service.test.ts b/apps/code/src/renderer/features/sessions/service/service.test.ts index a47d7ee56..e7d235e8a 100644 --- a/apps/code/src/renderer/features/sessions/service/service.test.ts +++ b/apps/code/src/renderer/features/sessions/service/service.test.ts @@ -16,6 +16,9 @@ const mockTrpcAgent = vi.hoisted(() => ({ cancelPermission: { mutate: vi.fn() }, onSessionEvent: { subscribe: vi.fn() }, onPermissionRequest: { subscribe: vi.fn() }, + onPermissionResolved: { + subscribe: vi.fn(() => ({ unsubscribe: vi.fn() })), + }, onSessionIdleKilled: { subscribe: vi.fn(() => ({ unsubscribe: vi.fn() })) }, resetAll: { mutate: vi.fn().mockResolvedValue(undefined) }, })); diff --git a/apps/code/src/renderer/features/sessions/service/service.ts b/apps/code/src/renderer/features/sessions/service/service.ts index 75d319dd3..dc5b8e5a6 100644 --- a/apps/code/src/renderer/features/sessions/service/service.ts +++ b/apps/code/src/renderer/features/sessions/service/service.ts @@ -146,6 +146,7 @@ export class SessionService { { event: { unsubscribe: () => void }; permission?: { unsubscribe: () => void }; + permissionResolved?: { unsubscribe: () => void }; } >(); /** Active cloud task watchers, keyed by taskId */ @@ -442,6 +443,7 @@ export class SessionService { adapter: resolvedAdapter, permissionMode: persistedMode, customInstructions: customInstructions || undefined, + runMode: "local", }); if (result) { @@ -616,6 +618,7 @@ export class SessionService { ? (reasoningLevel as EffortLevel) : undefined, model: preferredModel, + runMode: "local", }); const session = this.createBaseSession(taskRun.id, taskId, taskTitle); @@ -725,9 +728,32 @@ export class SessionService { }, ); + // Clears the local pendingPermissions mirror when a permission is + // resolved outside the Electron UI (e.g. a mobile client answers + // the question). Without this, the desktop card would stay visible + // indefinitely even though the agent has already moved on. + const permissionResolvedSubscription = + trpcClient.agent.onPermissionResolved.subscribe( + { taskRunId }, + { + onData: (payload) => { + const session = sessionStoreSetters.getSessions()[taskRunId]; + if (!session) return; + this.resolvePermission(session, payload.toolCallId); + }, + onError: (err) => { + log.error("Permission-resolved subscription error", { + taskRunId, + error: err, + }); + }, + }, + ); + this.subscriptions.set(taskRunId, { event: eventSubscription, permission: permissionSubscription, + permissionResolved: permissionResolvedSubscription, }); } @@ -735,6 +761,7 @@ export class SessionService { const subscription = this.subscriptions.get(taskRunId); subscription?.event.unsubscribe(); subscription?.permission?.unsubscribe(); + subscription?.permissionResolved?.unsubscribe(); this.subscriptions.delete(taskRunId); } diff --git a/apps/mobile/app.json b/apps/mobile/app.json index cb58a4bfc..bfc669988 100644 --- a/apps/mobile/app.json +++ b/apps/mobile/app.json @@ -19,6 +19,7 @@ "bundleIdentifier": "com.posthog.mobile", "infoPlist": { "NSMicrophoneUsageDescription": "Allow PostHog to use your microphone for voice-to-text input", + "NSSpeechRecognitionUsageDescription": "Allow PostHog to transcribe your voice input on-device", "ITSAppUsesNonExemptEncryption": false } }, @@ -152,7 +153,14 @@ } } ], - "expo-localization" + "expo-localization", + [ + "expo-speech-recognition", + { + "microphonePermission": "Allow PostHog to use your microphone for voice-to-text input", + "speechRecognitionPermission": "Allow PostHog to transcribe your voice input on-device" + } + ] ], "extra": { "router": {}, diff --git a/apps/mobile/assets/sounds/meep.mp3 b/apps/mobile/assets/sounds/meep.mp3 new file mode 100644 index 000000000..fd7b4cf7e Binary files /dev/null and b/apps/mobile/assets/sounds/meep.mp3 differ diff --git a/apps/mobile/index.js b/apps/mobile/index.js new file mode 100644 index 000000000..80d3d998f --- /dev/null +++ b/apps/mobile/index.js @@ -0,0 +1 @@ +import "expo-router/entry"; diff --git a/apps/mobile/package.json b/apps/mobile/package.json index 2652f9df1..233501cdb 100644 --- a/apps/mobile/package.json +++ b/apps/mobile/package.json @@ -1,7 +1,7 @@ { "name": "@posthog/mobile", "version": "1.0.0", - "main": "expo-router/entry", + "main": "./index.js", "scripts": { "start": "expo start", "start:clear": "expo start --clear", @@ -42,6 +42,7 @@ "expo-localization": "~17.0.8", "expo-router": "~6.0.17", "expo-secure-store": "^15.0.8", + "expo-speech-recognition": "^3.1.2", "expo-splash-screen": "~31.0.12", "expo-status-bar": "~3.0.9", "expo-system-ui": "~6.0.9", diff --git a/apps/mobile/src/app/(tabs)/_layout.tsx b/apps/mobile/src/app/(tabs)/_layout.tsx index 9641fa233..1e4bc6033 100644 --- a/apps/mobile/src/app/(tabs)/_layout.tsx +++ b/apps/mobile/src/app/(tabs)/_layout.tsx @@ -1,9 +1,11 @@ import { Icon, Label, NativeTabs } from "expo-router/unstable-native-tabs"; import { DynamicColorIOS, Platform } from "react-native"; +import { usePreferencesStore } from "@/features/preferences/stores/preferencesStore"; import { useThemeColors } from "@/lib/theme"; export default function TabsLayout() { const themeColors = useThemeColors(); + const aiChatEnabled = usePreferencesStore((s) => s.aiChatEnabled); // Dynamic colors for liquid glass effect on iOS const dynamicTextColor = @@ -30,21 +32,23 @@ export default function TabsLayout() { tintColor={dynamicTintColor} minimizeBehavior="onScrollDown" > - {/* Conversations - First Tab (default landing) */} - - - - + {/* Conversations - Chats tab, hidden by default to focus on Code */} + {aiChatEnabled && ( + + + + + )} - {/* Tasks Tab */} + {/* Code tab (task list for PostHog Code) */} - + s.aiChatEnabled); + + if (!aiChatEnabled) { + return ; + } const handleConversationPress = (conversation: ConversationDetail) => { router.push(`/chat/${conversation.id}`); diff --git a/apps/mobile/src/app/(tabs)/settings.tsx b/apps/mobile/src/app/(tabs)/settings.tsx index 5c3907b3f..df35d9eb4 100644 --- a/apps/mobile/src/app/(tabs)/settings.tsx +++ b/apps/mobile/src/app/(tabs)/settings.tsx @@ -2,15 +2,21 @@ import { router } from "expo-router"; import { Linking, ScrollView, + Switch, Text, TouchableOpacity, View, } from "react-native"; import { useAuthStore, useUserQuery } from "@/features/auth"; +import { usePreferencesStore } from "@/features/preferences/stores/preferencesStore"; export default function SettingsScreen() { const { logout, cloudRegion, getCloudUrlFromRegion } = useAuthStore(); const { data: userData } = useUserQuery(); + const aiChatEnabled = usePreferencesStore((s) => s.aiChatEnabled); + const setAiChatEnabled = usePreferencesStore((s) => s.setAiChatEnabled); + const pingsEnabled = usePreferencesStore((s) => s.pingsEnabled); + const setPingsEnabled = usePreferencesStore((s) => s.setPingsEnabled); const handleLogout = async () => { await logout(); @@ -88,6 +94,36 @@ export default function SettingsScreen() { + {/* Labs */} + + Labs + + Experimental features + + + + + PostHog AI chat + + + Show the Chats tab for PostHog AI conversations + + + + + + + + Enable pings + + + Play a sound when a task completes + + + + + + {/* All Settings Button */} { + const handle = InteractionManager.runAfterInteractions(() => { + readyRef.current = true; + }); + return () => { + readyRef.current = false; + handle.cancel(); + }; + }, []), + ); const handleCreateTask = () => { router.push("/task"); }; - const handleTaskPress = (taskId: string) => { - router.push(`/task/${taskId}`); - }; + const handleTaskPress = useCallback( + (taskId: string) => { + if (!readyRef.current) return; + readyRef.current = false; + router.push(`/task/${taskId}`); + }, + [router], + ); return ( @@ -20,8 +43,10 @@ export default function TasksScreen() { - Tasks - Your PostHog tasks + Code + + Your PostHog Code sessions + s.aiChatEnabled); const themeColors = useThemeColors(); useScreenTracking(); @@ -47,25 +49,29 @@ function RootLayoutNav() { - {/* Chat routes - regular stack navigation */} - - + {/* Chat routes - only registered when AI chat feature is enabled */} + {aiChatEnabled && ( + <> + + + + )} {/* Task routes - modal presentation */} ("us"); const [isLoading, setIsLoading] = useState(false); const [error, setError] = useState(null); + const [devToken, setDevToken] = useState(""); + const [devProjectId, setDevProjectId] = useState(""); - const { loginWithOAuth } = useAuthStore(); + const { loginWithOAuth, loginWithPersonalApiKey } = useAuthStore(); + + const handleDevSignIn = async () => { + setIsLoading(true); + setError(null); + try { + const projectIdNum = Number(devProjectId); + if (!Number.isFinite(projectIdNum) || projectIdNum <= 0) { + throw new Error("Project ID must be a positive number"); + } + await loginWithPersonalApiKey({ + token: devToken, + projectId: projectIdNum, + region: selectedRegion, + }); + router.replace("/(tabs)"); + } catch (err) { + setError(err instanceof Error ? err.message : "Failed to sign in"); + } finally { + setIsLoading(false); + } + }; const handleSignIn = async () => { setIsLoading(true); @@ -57,7 +87,11 @@ export default function AuthScreen() { return ( - + {/* Header */} @@ -131,8 +165,52 @@ export default function AuthScreen() { )} + + {__DEV__ && ( + + + Dev sign-in (personal API key) + + + Skips OAuth. Create a personal API key at Settings → User API + keys with scopes: user:read, project:read, task:write, + integration:read, conversation:write, query:read. + + + + + + Dev sign in + + + + )} - + ); } diff --git a/apps/mobile/src/app/task/[id].tsx b/apps/mobile/src/app/task/[id].tsx index 35a1aa2d7..5372a88cb 100644 --- a/apps/mobile/src/app/task/[id].tsx +++ b/apps/mobile/src/app/task/[id].tsx @@ -8,6 +8,7 @@ import { useSafeAreaInsets } from "react-native-safe-area-context"; import { Composer } from "@/features/chat"; import { getTask, + runTaskInCloud, type Task, TaskSessionView, useTaskSessionStore, @@ -22,9 +23,15 @@ export default function TaskDetailScreen() { const [task, setTask] = useState(null); const [loading, setLoading] = useState(true); const [error, setError] = useState(null); + const [retrying, setRetrying] = useState(false); - const { connectToTask, disconnectFromTask, sendPrompt, getSessionForTask } = - useTaskSessionStore(); + const { + connectToTask, + disconnectFromTask, + sendPrompt, + sendPermissionResponse, + getSessionForTask, + } = useTaskSessionStore(); const session = taskId ? getSessionForTask(taskId) : undefined; @@ -47,27 +54,60 @@ export default function TaskDetailScreen() { useEffect(() => { if (!taskId) return; + let cancelled = false; setLoading(true); setError(null); getTask(taskId) .then((fetchedTask) => { + if (cancelled) return; setTask(fetchedTask); return connectToTask(fetchedTask); }) + .then(() => { + if (cancelled) return; + // Brief delay for FlatList to render its initial batch behind + // the loading overlay before revealing. + setTimeout(() => setLoading(false), 150); + }) .catch((err) => { + if (cancelled) return; console.error("Failed to load task:", err); setError("Failed to load task"); - }) - .finally(() => { setLoading(false); }); return () => { + cancelled = true; disconnectFromTask(taskId); }; }, [taskId, connectToTask, disconnectFromTask]); + // Auto-reconnect if the session disappears while the screen is active + // (e.g., cloud sandbox expired and the session was cleaned up). + // Re-fetches the task to get a fresh S3 presigned URL. + useEffect(() => { + if (!taskId || !task || loading) return; + if (session) return; + if (retrying) return; + + let cancelled = false; + getTask(taskId) + .then((freshTask) => { + if (cancelled) return; + setTask(freshTask); + return connectToTask(freshTask); + }) + .catch((err) => { + if (cancelled) return; + console.error("Failed to reconnect to task:", err); + }); + + return () => { + cancelled = true; + }; + }, [taskId, task, loading, session, connectToTask, retrying]); + const handleSendPrompt = useCallback( (text: string) => { if (!taskId) return; @@ -78,35 +118,51 @@ export default function TaskDetailScreen() { [taskId, sendPrompt], ); + const handleRetry = useCallback(async () => { + if (!taskId || !task) return; + try { + setRetrying(true); + disconnectFromTask(taskId); + + const updatedTask = await runTaskInCloud(taskId, { + resumeFromRunId: task.latest_run?.id, + }); + setTask(updatedTask); + await connectToTask(updatedTask); + // Don't clear retrying here — the effect below clears it + // once the session shows meaningful state (thinking or terminal). + } catch (err) { + console.error("Failed to retry task:", err); + setRetrying(false); + } + }, [taskId, task, disconnectFromTask, connectToTask]); + + // Clear retrying once the agent finishes a turn or the run terminates. + useEffect(() => { + if (!retrying || !session) return; + if (!session.isPromptPending || session.terminalStatus) { + setRetrying(false); + } + }, [retrying, session]); + + const handleSendPermissionResponse = useCallback( + (args: Parameters[1]) => { + if (!taskId) return; + sendPermissionResponse(taskId, args).catch((err) => { + console.error("Failed to send permission response:", err); + }); + }, + [taskId, sendPermissionResponse], + ); + const handleOpenTask = useCallback( (newTaskId: string) => { - router.push(`/task/${newTaskId}`); + router.replace(`/task/${newTaskId}`); }, [router], ); - if (loading) { - return ( - <> - - - - Loading task... - - - ); - } - - if (error || !task) { + if (error || (!task && !loading)) { return ( <> ( + + + {environment === "cloud" ? "Cloud" : "Local"} + + + ) + : undefined, }} /> + {/* Always render TaskSessionView so the FlatList can layout behind + the loading overlay. This prevents the "flash of messages" when + switching from loading spinner to rendered content. */} 0 + } + terminalStatus={retrying ? undefined : session?.terminalStatus} + lastError={retrying ? undefined : session?.lastError} + onRetry={ + !retrying && session?.terminalStatus ? handleRetry : undefined + } onOpenTask={handleOpenTask} + onSendPermissionResponse={handleSendPermissionResponse} contentContainerStyle={{ - paddingTop: 80 + insets.bottom, + paddingTop: + session?.terminalStatus && !retrying ? 16 : 80 + insets.bottom, paddingBottom: 16, }} /> - {/* Fixed input at bottom */} - - - + {/* Loading overlay — covers the list while it does initial layout */} + {loading && ( + + + + {task?.latest_run ? "Connecting..." : "Loading task..."} + + + )} + + {/* Fixed input at bottom — hidden when run is terminal */} + {!session?.terminalStatus && ( + + + + )} ); diff --git a/apps/mobile/src/app/task/index.tsx b/apps/mobile/src/app/task/index.tsx index 42e535e15..1f4aaf2db 100644 --- a/apps/mobile/src/app/task/index.tsx +++ b/apps/mobile/src/app/task/index.tsx @@ -1,7 +1,7 @@ import { Text } from "@components/text"; import { Stack, useRouter } from "expo-router"; import * as WebBrowser from "expo-web-browser"; -import { useCallback, useEffect, useState } from "react"; +import { useCallback, useEffect, useMemo, useState } from "react"; import { ActivityIndicator, FlatList, @@ -79,10 +79,17 @@ export default function NewTaskScreen() { const [integrations, setIntegrations] = useState([]); const [repositories, setRepositories] = useState([]); const [selectedRepo, setSelectedRepo] = useState(null); + const [repoSearch, setRepoSearch] = useState(""); const [prompt, setPrompt] = useState(""); const [creating, setCreating] = useState(false); const [loadingRepos, setLoadingRepos] = useState(true); + const filteredRepositories = useMemo(() => { + const query = repoSearch.trim().toLowerCase(); + if (!query) return repositories; + return repositories.filter((repo) => repo.toLowerCase().includes(query)); + }, [repositories, repoSearch]); + const loadIntegrations = useCallback(async () => { try { setLoadingRepos(true); @@ -116,14 +123,21 @@ export default function NewTaskScreen() { try { const githubIntegration = integrations.find((i) => i.kind === "github"); + const trimmedPrompt = prompt.trim(); const task = await createTask({ - description: prompt.trim(), - title: prompt.trim().slice(0, 100), + description: trimmedPrompt, + title: trimmedPrompt.slice(0, 100), repository: selectedRepo, github_integration: githubIntegration?.id, }); - await runTaskInCloud(task.id); + // Pass the prompt as pending_user_message so the cloud agent has + // something to process on start — matches how the desktop launches + // new cloud runs. Without this the sandbox starts idle and the UI + // stays stuck on "Thinking...". + await runTaskInCloud(task.id, { + pendingUserMessage: trimmedPrompt, + }); // Navigate to task detail (replaces current modal) router.replace(`/task/${task.id}`); @@ -161,29 +175,50 @@ export default function NewTaskScreen() { ) : ( <> Repository + - item} - renderItem={({ item }) => ( - setSelectedRepo(item)} - className={`border-gray-6 border-b px-3 py-3 ${ - selectedRepo === item ? "bg-accent-3" : "" - }`} - > - + + {repoSearch + ? `No repositories match "${repoSearch}"` + : "No repositories available"} + + + ) : ( + item} + keyboardShouldPersistTaps="handled" + renderItem={({ item }) => ( + setSelectedRepo(item)} + className={`border-gray-6 border-b px-3 py-3 ${ + selectedRepo === item ? "bg-accent-3" : "" }`} > - {item} - - - )} - /> + + {item} + + + )} + /> + )} Task description diff --git a/apps/mobile/src/features/auth/stores/authStore.ts b/apps/mobile/src/features/auth/stores/authStore.ts index a9d105dbe..79a029415 100644 --- a/apps/mobile/src/features/auth/stores/authStore.ts +++ b/apps/mobile/src/features/auth/stores/authStore.ts @@ -29,6 +29,11 @@ interface AuthState { // Methods loginWithOAuth: (region: CloudRegion) => Promise; + loginWithPersonalApiKey: (params: { + token: string; + projectId: number; + region: CloudRegion; + }) => Promise; refreshAccessToken: () => Promise; scheduleTokenRefresh: () => void; initializeAuth: () => Promise; @@ -96,6 +101,40 @@ export const useAuthStore = create()( get().scheduleTokenRefresh(); }, + loginWithPersonalApiKey: async ({ token, projectId, region }) => { + if (!__DEV__) { + throw new Error( + "Dev sign-in is only available in development builds", + ); + } + const trimmed = token.trim(); + if (!trimmed) { + throw new Error("Personal API key is required"); + } + if (!Number.isFinite(projectId) || projectId <= 0) { + throw new Error("Valid project ID is required"); + } + + const storedTokens: StoredTokens = { + accessToken: trimmed, + refreshToken: "", + expiresAt: Number.MAX_SAFE_INTEGER, + cloudRegion: region, + scopedTeams: [projectId], + }; + + await saveTokens(storedTokens); + + set({ + oauthAccessToken: trimmed, + oauthRefreshToken: null, + tokenExpiry: null, + cloudRegion: region, + projectId, + isAuthenticated: true, + }); + }, + refreshAccessToken: async () => { const state = get(); @@ -140,7 +179,8 @@ export const useAuthStore = create()( refreshTimeoutId = null; } - if (!state.tokenExpiry) { + // Personal API key sessions have no refresh token — nothing to schedule. + if (!state.tokenExpiry || !state.oauthRefreshToken) { return; } diff --git a/apps/mobile/src/features/chat/components/AgentMessage.tsx b/apps/mobile/src/features/chat/components/AgentMessage.tsx index 96f3f1bc8..8a5e86042 100644 --- a/apps/mobile/src/features/chat/components/AgentMessage.tsx +++ b/apps/mobile/src/features/chat/components/AgentMessage.tsx @@ -5,6 +5,7 @@ import { useThemeColors } from "@/lib/theme"; import { usePeriodicRerender } from "../hooks/usePeriodicRerender"; import type { AssistantToolCall } from "../types"; import { getRandomThinkingMessage } from "../utils/thinkingMessages"; +import { MarkdownText } from "./MarkdownText"; import { ToolMessage } from "./ToolMessage"; interface AgentMessageProps { @@ -107,9 +108,7 @@ export function AgentMessage({ {/* Show final content */} {content && ( - - {content} - + )} diff --git a/apps/mobile/src/features/chat/components/Composer.tsx b/apps/mobile/src/features/chat/components/Composer.tsx index d7f5bb9ef..0a1104389 100644 --- a/apps/mobile/src/features/chat/components/Composer.tsx +++ b/apps/mobile/src/features/chat/components/Composer.tsx @@ -1,8 +1,10 @@ import { GlassContainer, GlassView } from "expo-glass-effect"; import { ArrowUp, Microphone, Stop } from "phosphor-react-native"; -import { useState } from "react"; +import { useEffect, useRef, useState } from "react"; import { ActivityIndicator, + Animated, + Easing, Platform, TextInput, TouchableOpacity, @@ -15,12 +17,76 @@ interface ComposerProps { onSend: (message: string) => void; disabled?: boolean; placeholder?: string; + isUserTurn?: boolean; + queuedCount?: number; +} + +function PulsingBorder({ + active, + color, +}: { + active: boolean; + color: string; +}) { + const opacity = useRef(new Animated.Value(0)).current; + const animRef = useRef(null); + + useEffect(() => { + if (active) { + opacity.setValue(0); + animRef.current = Animated.loop( + Animated.sequence([ + Animated.timing(opacity, { + toValue: 1, + duration: 1500, + easing: Easing.inOut(Easing.ease), + useNativeDriver: true, + }), + Animated.timing(opacity, { + toValue: 0, + duration: 1500, + easing: Easing.inOut(Easing.ease), + useNativeDriver: true, + }), + ]), + ); + animRef.current.start(); + } else { + animRef.current?.stop(); + animRef.current = null; + opacity.setValue(0); + } + return () => { + animRef.current?.stop(); + }; + }, [active, opacity]); + + if (!active) return null; + + return ( + + ); } export function Composer({ onSend, disabled = false, placeholder = "Ask a question", + isUserTurn = false, + queuedCount = 0, }: ComposerProps) { const themeColors = useThemeColors(); const [message, setMessage] = useState(""); @@ -55,6 +121,11 @@ export function Composer({ }; const canSend = message.trim().length > 0 && !disabled && !isRecording; + const effectivePlaceholder = queuedCount > 0 + ? `${queuedCount} message${queuedCount > 1 ? "s" : ""} queued...` + : !isUserTurn && !disabled + ? "Message will be queued..." + : placeholder; if (Platform.OS === "ios") { return ( @@ -85,40 +156,46 @@ export function Composer({ gap: 8, }} > - {/* Input field with rounded glass background */} - - + + - + isInteractive + > + + + {/* Mic / Send button */} cell.trim()); + // Skip the separator row + if (!/^[\s-:|]+$/.test(lines[i].replace(/\|/g, ""))) { + rows.push(row); + } + i++; + } + if (rows.length > 0) { + blocks.push({ type: "table", content: "", rows }); + } + continue; + } + + // Empty line + if (line.trim() === "") { + i++; + continue; + } + + // Paragraph: collect consecutive non-special lines + const paraLines: string[] = []; + while ( + i < lines.length && + lines[i].trim() !== "" && + !lines[i].startsWith("```") && + !lines[i].match(/^#{1,6}\s/) && + !/^\s*[-*]\s/.test(lines[i]) && + !/^\s*\d+[.)]\s/.test(lines[i]) && + !( + lines[i].includes("|") && + i + 1 < lines.length && + /^\s*\|?[\s-:|]+\|/.test(lines[i + 1]) + ) + ) { + paraLines.push(lines[i]); + i++; + } + if (paraLines.length > 0) { + blocks.push({ type: "paragraph", content: paraLines.join("\n") }); + } + } + + return blocks; +} + +function renderInline(text: string): React.ReactNode[] { + const nodes: React.ReactNode[] = []; + const pattern = /(\*\*(.+?)\*\*|\*(.+?)\*|`([^`]+)`)/g; + let lastIndex = 0; + let match: RegExpExecArray | null = null; + + // biome-ignore lint/suspicious/noAssignInExpressions: regex exec loop + while ((match = pattern.exec(text)) !== null) { + if (match.index > lastIndex) { + nodes.push(text.slice(lastIndex, match.index)); + } + + if (match[2]) { + nodes.push( + + {match[2]} + , + ); + } else if (match[3]) { + nodes.push( + + {match[3]} + , + ); + } else if (match[4]) { + nodes.push( + + {match[4]} + , + ); + } + + lastIndex = match.index + match[0].length; + } + + if (lastIndex < text.length) { + nodes.push(text.slice(lastIndex)); + } + + return nodes.length > 0 ? nodes : [text]; +} + +export function MarkdownText({ content }: MarkdownTextProps) { + const blocks = parseBlocks(content); + + return ( + + {blocks.map((block, i) => { + const key = `block-${i}`; + + switch (block.type) { + case "code": + return ( + + + {block.content} + + + ); + + case "heading": + return ( + + {renderInline(block.content)} + + ); + + case "list": + return ( + + {block.items?.map((item, idx) => ( + + + {block.ordered ? `${idx + 1}.` : "•"} + + + {renderInline(item)} + + + ))} + + ); + + case "table": { + const rows = block.rows ?? []; + const header = rows[0]; + const body = rows.slice(1); + return ( + + + {header && ( + + {header.map((cell, col) => { + const colKey = `${key}-h${col}-${cell}`; + return ( + 0 + ? { + borderLeftWidth: 1, + borderLeftColor: "#3333", + } + : undefined + } + > + + {renderInline(cell)} + + + ); + })} + + )} + {body.map((row, ri) => { + const rowKey = `${key}-r${ri}`; + return ( + + {row.map((cell, col) => { + const cellKey = `${rowKey}-c${col}-${cell}`; + return ( + 0 + ? { + borderLeftWidth: 1, + borderLeftColor: "#3333", + } + : undefined + } + > + + {renderInline(cell)} + + + ); + })} + + ); + })} + + + ); + } + + default: + return ( + + {renderInline(block.content)} + + ); + } + })} + + ); +} diff --git a/apps/mobile/src/features/chat/components/ToolMessage.tsx b/apps/mobile/src/features/chat/components/ToolMessage.tsx index ee6085fcd..a5ab7f8a8 100644 --- a/apps/mobile/src/features/chat/components/ToolMessage.tsx +++ b/apps/mobile/src/features/chat/components/ToolMessage.tsx @@ -54,6 +54,81 @@ const kindIcons: Record = { other: Wrench, }; +export function deriveToolKind(toolName: string): ToolKind { + // Agent titles can include file paths, e.g. "Edit `src/foo.ts`" or + // "Read 200 lines in `bar.ts`", so match on prefix / keyword. + const name = toolName.toLowerCase(); + if (name.startsWith("read") || name === "read_file") return "read"; + if ( + name.startsWith("edit") || + name.startsWith("write") || + name.startsWith("multiedit") || + name.startsWith("multi_edit") || + name === "search_replace" + ) + return "edit"; + if (name.startsWith("delete")) return "delete"; + if ( + name.startsWith("grep") || + name.startsWith("search") || + name.startsWith("glob") || + name.startsWith("find") || + name.startsWith("list") + ) + return "search"; + if ( + name.startsWith("bash") || + name.startsWith("execute") || + name.startsWith("terminal") + ) + return "execute"; + if (name.startsWith("think")) return "think"; + if (name.startsWith("webfetch") || name.startsWith("fetch")) return "fetch"; + if (name === "create_task") return "create_task"; + return "other"; +} + +export function getToolSubtitle( + toolName: string, + args?: Record, +): string | null { + if (!args) return null; + const kind = deriveToolKind(toolName); + + switch (kind) { + case "read": + case "edit": + case "delete": + case "move": + if (typeof args.file_path === "string") + return shortenPath(args.file_path); + if (typeof args.target_file === "string") + return shortenPath(args.target_file); + return null; + case "search": + if (typeof args.pattern === "string") return `"${args.pattern}"`; + return null; + case "execute": + if (typeof args.command === "string") + return args.command.length > 60 + ? `${args.command.slice(0, 60)}...` + : args.command; + return null; + case "fetch": + if (typeof args.url === "string") + return args.url.length > 60 ? `${args.url.slice(0, 60)}...` : args.url; + return null; + case "think": + if (typeof args.content === "string") + return args.content.length > 60 + ? `${args.content.slice(0, 60)}...` + : args.content; + return null; + default: + return null; + } +} + interface CreateTaskArgs { title?: string; description?: string; @@ -93,6 +168,403 @@ export function formatToolTitle( return toolName; } +// Shape guards for file-editing tool args. The agent forwards Claude's raw +// tool input through the ACP `rawInput` field, so we can detect Edit / Write / +// MultiEdit by the keys present in args. +interface EditArgs { + file_path: string; + old_string: string; + new_string: string; +} + +interface MultiEditArgs { + file_path: string; + edits: Array<{ old_string: string; new_string: string }>; +} + +interface WriteArgs { + file_path: string; + content: string; +} + +function asEditArgs( + args: Record | undefined, +): EditArgs | null { + if (!args) return null; + if ( + typeof args.file_path === "string" && + typeof args.old_string === "string" && + typeof args.new_string === "string" + ) { + return { + file_path: args.file_path, + old_string: args.old_string, + new_string: args.new_string, + }; + } + return null; +} + +function asMultiEditArgs( + args: Record | undefined, +): MultiEditArgs | null { + if (!args || typeof args.file_path !== "string") return null; + if (!Array.isArray(args.edits)) return null; + const edits: MultiEditArgs["edits"] = []; + for (const raw of args.edits) { + if ( + raw && + typeof raw === "object" && + typeof (raw as Record).old_string === "string" && + typeof (raw as Record).new_string === "string" + ) { + edits.push({ + old_string: (raw as Record).old_string as string, + new_string: (raw as Record).new_string as string, + }); + } + } + if (edits.length === 0) return null; + return { file_path: args.file_path, edits }; +} + +function asWriteArgs( + args: Record | undefined, +): WriteArgs | null { + if (!args) return null; + if ( + typeof args.file_path === "string" && + typeof args.content === "string" && + args.old_string === undefined + ) { + return { file_path: args.file_path, content: args.content }; + } + return null; +} + +// Strip ANSI escape codes from terminal output +function stripAnsi(text: string): string { + // biome-ignore lint/suspicious/noControlCharactersInRegex: stripping ANSI codes requires matching control chars + return text.replace(/\x1b\[[0-9;]*[a-zA-Z]/g, ""); +} + +function extractResultText(result: unknown): string | null { + if (typeof result === "string") return result; + if (result && typeof result === "object") { + const obj = result as Record; + for (const key of ["stdout", "output", "text", "content"] as const) { + if (typeof obj[key] === "string") return obj[key] as string; + } + } + return null; +} + +function countDiffLines( + editArgs: EditArgs | null, + multiEditArgs: MultiEditArgs | null, + writeArgs: WriteArgs | null, +): { added: number; removed: number } { + let added = 0; + let removed = 0; + + const countFromDiff = (oldText: string, newText: string) => { + const lines = computeLineDiff(oldText, newText, Number.MAX_SAFE_INTEGER); + for (const line of lines) { + if (line.kind === "added") added++; + else if (line.kind === "removed") removed++; + } + }; + + if (editArgs) { + countFromDiff(editArgs.old_string ?? "", editArgs.new_string ?? ""); + } else if (multiEditArgs) { + for (const edit of multiEditArgs.edits) { + countFromDiff(edit.old_string ?? "", edit.new_string ?? ""); + } + } else if (writeArgs) { + added = writeArgs.content ? writeArgs.content.split("\n").length : 0; + } + + return { added, removed }; +} + +// Extract a file path from agent tool titles like "Read `src/foo.ts`" or +// "Read 200 lines in `bar.ts`" when rawInput/args are unavailable. +function extractPathFromTitle(title: string): string | null { + const backtickMatch = title.match(/`([^`]+)`/); + if (backtickMatch) return backtickMatch[1]; + // Fallback: strip common prefixes like "Read file", "Read 200 lines in" + const stripped = title + .replace(/^read\s+/i, "") + .replace(/^file\s*/i, "") + .replace(/^\d+\s+lines?\s+in\s+/i, "") + .trim(); + // Only treat the remainder as a path if it looks like one + if (stripped.includes("/") || stripped.includes(".")) return stripped; + return null; +} + +function shortenPath(path: string, maxLen = 48): string { + if (path.length <= maxLen) return path; + const parts = path.split("/"); + if (parts.length <= 2) return `…${path.slice(-(maxLen - 1))}`; + return `…/${parts.slice(-2).join("/")}`; +} + +// Unified diff support — detects and renders `git diff` output when the agent +// runs commands like `git diff` through the Bash tool and the result comes +// back as stdout rather than a structured tool content block. +type UnifiedDiffLine = + | { kind: "file"; text: string } + | { kind: "hunk"; text: string } + | { kind: "added"; text: string } + | { kind: "removed"; text: string } + | { kind: "context"; text: string } + | { kind: "meta"; text: string }; + +function looksLikeUnifiedDiff(text: string): boolean { + if (!text) return false; + if (/(^|\n)diff --git /.test(text)) return true; + return /(^|\n)--- /.test(text) && /(^|\n)\+\+\+ /.test(text); +} + +function extractDiffFromResult(result: unknown): string | null { + if (typeof result === "string") { + return looksLikeUnifiedDiff(result) ? result : null; + } + if (result && typeof result === "object") { + const obj = result as Record; + for (const key of ["stdout", "output", "text", "content"] as const) { + const value = obj[key]; + if (typeof value === "string" && looksLikeUnifiedDiff(value)) { + return value; + } + } + } + return null; +} + +function parseUnifiedDiff(text: string): UnifiedDiffLine[] { + const result: UnifiedDiffLine[] = []; + for (const line of text.split("\n")) { + if ( + line.startsWith("diff --git ") || + line.startsWith("--- ") || + line.startsWith("+++ ") || + line.startsWith("index ") || + line.startsWith("new file mode") || + line.startsWith("deleted file mode") || + line.startsWith("similarity index") || + line.startsWith("rename ") + ) { + result.push({ kind: "file", text: line }); + } else if (line.startsWith("@@")) { + result.push({ kind: "hunk", text: line }); + } else if (line.startsWith("+")) { + result.push({ kind: "added", text: line }); + } else if (line.startsWith("-")) { + result.push({ kind: "removed", text: line }); + } else if (line.startsWith(" ")) { + result.push({ kind: "context", text: line }); + } else { + result.push({ kind: "meta", text: line }); + } + } + return result; +} + +interface UnifiedDiffBlockProps { + diffText: string; + maxLines?: number; +} + +function UnifiedDiffBlock({ diffText, maxLines = 120 }: UnifiedDiffBlockProps) { + const allLines = parseUnifiedDiff(diffText); + const truncated = allLines.length > maxLines; + const lines = truncated ? allLines.slice(0, maxLines) : allLines; + + return ( + + {lines.map((line, i) => { + let cls = "font-mono text-[11px] leading-4 text-gray-11 px-2"; + if (line.kind === "file") { + cls += " text-gray-9"; + } else if (line.kind === "hunk") { + cls += " bg-accent-3 text-accent-11"; + } else if (line.kind === "added") { + cls += " bg-status-success/10 text-status-success"; + } else if (line.kind === "removed") { + cls += " bg-status-error/10 text-status-error"; + } else if (line.kind === "context") { + cls += " text-gray-11"; + } else { + cls += " text-gray-9"; + } + return ( + + {line.text || " "} + + ); + })} + {truncated && ( + + … {allLines.length - maxLines} more lines + + )} + + ); +} + +// LCS-based line diff: correctly identifies unchanged lines even when +// changes are scattered throughout the block, then collapses distant +// context into separators. +type DiffLine = + | { kind: "context"; text: string } + | { kind: "added"; text: string } + | { kind: "removed"; text: string } + | { kind: "separator" }; + +// O(n*m) LCS — fine for typical edit blocks (< 200 lines). +function lcsBacktrack(a: string[], b: string[]): DiffLine[] { + const m = a.length; + const n = b.length; + + // Build LCS table + const dp: number[][] = []; + for (let i = 0; i <= m; i++) { + dp[i] = new Array(n + 1).fill(0); + } + for (let i = 1; i <= m; i++) { + for (let j = 1; j <= n; j++) { + dp[i][j] = + a[i - 1] === b[j - 1] + ? dp[i - 1][j - 1] + 1 + : Math.max(dp[i - 1][j], dp[i][j - 1]); + } + } + + // Backtrack to produce diff + const result: DiffLine[] = []; + let i = m; + let j = n; + while (i > 0 || j > 0) { + if (i > 0 && j > 0 && a[i - 1] === b[j - 1]) { + result.push({ kind: "context", text: a[i - 1] }); + i--; + j--; + } else if (j > 0 && (i === 0 || dp[i][j - 1] >= dp[i - 1][j])) { + result.push({ kind: "added", text: b[j - 1] }); + j--; + } else { + result.push({ kind: "removed", text: a[i - 1] }); + i--; + } + } + result.reverse(); + return result; +} + +// Collapse context lines far from changes into separators. +function collapseContext(lines: DiffLine[], contextLines: number): DiffLine[] { + // Mark which lines are near a change + const isChange = lines.map((l) => l.kind === "added" || l.kind === "removed"); + const nearChange = new Array(lines.length).fill(false); + for (let i = 0; i < lines.length; i++) { + if (isChange[i]) { + for ( + let k = Math.max(0, i - contextLines); + k <= Math.min(lines.length - 1, i + contextLines); + k++ + ) { + nearChange[k] = true; + } + } + } + + const result: DiffLine[] = []; + let inSkip = false; + for (let i = 0; i < lines.length; i++) { + if (nearChange[i] || isChange[i]) { + inSkip = false; + result.push(lines[i]); + } else if (!inSkip) { + inSkip = true; + result.push({ kind: "separator" }); + } + } + return result; +} + +function computeLineDiff( + oldText: string, + newText: string, + contextLines = 2, +): DiffLine[] { + const oldLines = oldText.length > 0 ? oldText.split("\n") : []; + const newLines = newText.length > 0 ? newText.split("\n") : []; + + if (oldLines.length === 0) { + return newLines.map((l) => ({ kind: "added" as const, text: l })); + } + if (newLines.length === 0) { + return oldLines.map((l) => ({ kind: "removed" as const, text: l })); + } + + const raw = lcsBacktrack(oldLines, newLines); + return collapseContext(raw, contextLines); +} + +interface DiffBlockProps { + oldText: string; + newText: string; + maxLines?: number; +} + +function DiffBlock({ oldText, newText, maxLines = 60 }: DiffBlockProps) { + const allLines = computeLineDiff(oldText, newText); + const truncated = allLines.length > maxLines; + const lines = truncated ? allLines.slice(0, maxLines) : allLines; + + return ( + + {lines.map((line, i) => { + const key = `${line.kind}-${i}`; + if (line.kind === "separator") { + return ( + + ··· + + ); + } + let cls = "font-mono text-[11px] leading-4 px-2"; + if (line.kind === "added") { + cls += " bg-status-success/10 text-status-success"; + } else if (line.kind === "removed") { + cls += " bg-status-error/10 text-status-error"; + } else { + cls += " text-gray-11"; + } + const prefix = + line.kind === "added" ? "+ " : line.kind === "removed" ? "- " : " "; + return ( + + {prefix} + {line.text || " "} + + ); + })} + {truncated && ( + + … {allLines.length - maxLines} more lines + + )} + + ); +} + function CreateTaskPreview({ args, showAction, @@ -234,13 +706,117 @@ export function ToolMessage({ const isLoading = status === "pending" || status === "running"; const isFailed = status === "error"; - const hasDetails = args || result !== undefined; const displayTitle = formatToolTitle(toolName, args); const KindIcon = kind ? kindIcons[kind] : Wrench; const isCreateTask = toolName.toLowerCase() === "create_task" || kind === "create_task"; + // File-editing tools get a proper diff view using the rawInput we already + // receive on the wire. Detection is by shape, not tool name, so it works + // regardless of how the agent labels the tool. + const editArgs = asEditArgs(args); + const multiEditArgs = !editArgs ? asMultiEditArgs(args) : null; + const writeArgs = !editArgs && !multiEditArgs ? asWriteArgs(args) : null; + const fileToolArgs = editArgs ?? multiEditArgs ?? writeArgs; + + // Unified-diff-in-result: when the agent runs commands like `git diff` + // via the Bash tool, the result comes back as stdout containing a unified + // diff string. Detect that and render it as a real diff view. + const unifiedDiffText = !fileToolArgs ? extractDiffFromResult(result) : null; + + if (fileToolArgs && !isCreateTask) { + const stats = countDiffLines(editArgs, multiEditArgs, writeArgs); + // Collapse diffs for failed edits (retries make them noise) + const showDiff = !isFailed || isOpen; + + return ( + + {/* Header row */} + isFailed && setIsOpen(!isOpen)} + className="flex-row items-center gap-2" + disabled={!isFailed} + > + {isLoading ? ( + + ) : ( + + )} + + {shortenPath(fileToolArgs.file_path)} + + {stats.added > 0 && !isFailed && ( + + +{stats.added} + + )} + {stats.removed > 0 && !isFailed && ( + + -{stats.removed} + + )} + {isFailed && ( + + Failed + + )} + + + {/* Diff content — collapsed when failed */} + {showDiff && ( + <> + {editArgs && ( + + )} + {multiEditArgs?.edits.map((edit, i) => ( + + ))} + {writeArgs && } + + )} + + ); + } + + // Unified-diff-in-result renderer (e.g. `git diff` via Bash) + if (unifiedDiffText && !isCreateTask) { + return ( + + + {isLoading ? ( + + ) : ( + + )} + + {displayTitle} + + {isFailed && ( + (Failed) + )} + + + + ); + } + // For create_task, show rich preview instead of expandable if (isCreateTask && args) { return ( @@ -264,18 +840,169 @@ export function ToolMessage({ ); } + const resolvedKind = kind ?? deriveToolKind(toolName); + const isPending = status === "pending"; + const isRunning = status === "running"; + const isCompleted = status === "completed"; + const resultText = extractResultText(result); + + // Execute/Bash: show description + command subtitle + expandable output + if (resolvedKind === "execute") { + const command = typeof args?.command === "string" ? args.command : null; + const description = + typeof args?.description === "string" ? args.description : null; + const outputText = resultText ? stripAnsi(resultText) : null; + const hasOutput = outputText && outputText.trim().length > 0; + + return ( + + {/* Header */} + hasOutput && setIsOpen(!isOpen)} + className="flex-row items-center gap-2" + disabled={!hasOutput} + > + {isLoading ? ( + + ) : ( + + )} + + {description ?? displayTitle} + + {isFailed && ( + + Failed + + )} + + + {/* Command as subtitle line */} + {command && ( + + $ {command} + + )} + + {/* Output */} + {isOpen && hasOutput && ( + + + {outputText} + + + )} + + ); + } + + // Read: show file path, line range, and expandable content preview + if (resolvedKind === "read") { + // Try args first, then extract a path from the tool title (e.g. + // "Read `src/foo.ts`" or "Read 200 lines in `bar.ts`"). + const filePath = + typeof args?.file_path === "string" + ? args.file_path + : typeof args?.target_file === "string" + ? args.target_file + : extractPathFromTitle(toolName); + const hasContent = resultText && resultText.trim().length > 0; + const lineCount = hasContent ? resultText.split("\n").length : null; + const offset = typeof args?.offset === "number" ? args.offset : null; + const limit = typeof args?.limit === "number" ? args.limit : null; + const lineRange = offset + ? `lines ${offset}–${offset + (limit ?? lineCount ?? 0)}` + : lineCount + ? `${lineCount} lines` + : null; + + return ( + + hasContent && setIsOpen(!isOpen)} + className="flex-row items-center gap-2" + disabled={!hasContent} + > + {isLoading ? ( + + ) : ( + + )} + + Read + + {filePath ? ( + + {shortenPath(filePath, 36)} + + ) : null} + {lineRange && isCompleted && ( + + {lineRange} + + )} + {isFailed && ( + + Failed + + )} + + + {/* Content preview */} + {isOpen && hasContent && ( + + + {resultText} + + + )} + + ); + } + + // Default: all other tools (search, think, fetch, etc.) + const subtitle = getToolSubtitle(toolName, args); + return ( - - hasDetails && setIsOpen(!isOpen)} - className="flex-row items-center gap-2" - disabled={!hasDetails} - > + + {/* Status indicator */} {isLoading ? ( ) : ( - + )} {/* Tool name */} @@ -283,45 +1010,28 @@ export function ToolMessage({ {displayTitle} + {/* Queued label */} + {isPending && ( + Queued + )} + {/* Failed indicator */} {isFailed && ( - (Failed) + + Failed + )} - - - {/* Expanded content */} - {isOpen && hasDetails && ( - - {args && ( - - - Arguments - - - - {JSON.stringify(args, null, 2)} - - - - )} - {result !== undefined && ( - - - Result - - - - {typeof result === "string" - ? result - : JSON.stringify(result, null, 2)} - - - - )} - + + + {/* Contextual subtitle */} + {subtitle && !isPending && ( + + {subtitle} + )} ); diff --git a/apps/mobile/src/features/chat/hooks/useVoiceRecording.ts b/apps/mobile/src/features/chat/hooks/useVoiceRecording.ts index 5683fbe39..e1302b142 100644 --- a/apps/mobile/src/features/chat/hooks/useVoiceRecording.ts +++ b/apps/mobile/src/features/chat/hooks/useVoiceRecording.ts @@ -1,7 +1,5 @@ -import { Audio } from "expo-av"; -import { File } from "expo-file-system"; +import { ExpoSpeechRecognitionModule } from "expo-speech-recognition"; import { useCallback, useRef, useState } from "react"; -import { useAuthStore } from "@/features/auth"; type RecordingStatus = "idle" | "recording" | "transcribing" | "error"; @@ -16,148 +14,128 @@ interface UseVoiceRecordingReturn { export function useVoiceRecording(): UseVoiceRecordingReturn { const [status, setStatus] = useState("idle"); const [error, setError] = useState(null); - const recordingRef = useRef(null); + const transcriptRef = useRef(""); + const resolveRef = useRef<((text: string | null) => void) | null>(null); + const listenersRef = useRef<(() => void)[]>([]); + + const cleanup = useCallback(() => { + for (const remove of listenersRef.current) { + remove(); + } + listenersRef.current = []; + resolveRef.current = null; + transcriptRef.current = ""; + }, []); const startRecording = useCallback(async () => { try { setError(null); + transcriptRef.current = ""; - // Request permissions - const { granted } = await Audio.requestPermissionsAsync(); - if (!granted) { - setError("Microphone permission is required"); + if (!ExpoSpeechRecognitionModule.isRecognitionAvailable()) { + setError("Speech recognition is not available on this device"); setStatus("error"); return; } - // Configure audio mode for recording - await Audio.setAudioModeAsync({ - allowsRecordingIOS: true, - playsInSilentModeIOS: true, - }); - - // Create and start recording - const recording = new Audio.Recording(); - await recording.prepareToRecordAsync( - Audio.RecordingOptionsPresets.HIGH_QUALITY, - ); - await recording.startAsync(); - recordingRef.current = recording; - setStatus("recording"); - } catch (err) { - console.error("Failed to start recording:", err); - setError("Failed to start recording"); - setStatus("error"); - } - }, []); - - const stopRecording = useCallback(async (): Promise => { - if (!recordingRef.current) { - return null; - } - - try { - setStatus("transcribing"); - - // Stop recording and get URI - await recordingRef.current.stopAndUnloadAsync(); - const uri = recordingRef.current.getURI(); - recordingRef.current = null; - - // Reset audio mode - await Audio.setAudioModeAsync({ - allowsRecordingIOS: false, - }); - - if (!uri) { - setError("No recording found"); + const { granted } = + await ExpoSpeechRecognitionModule.requestPermissionsAsync(); + if (!granted) { + setError("Speech recognition permission is required"); setStatus("error"); - return null; + return; } - const { - oauthAccessToken, - cloudRegion, - projectId, - getCloudUrlFromRegion, - } = useAuthStore.getState(); - - if (!oauthAccessToken || !cloudRegion || !projectId) { - setError("Not authenticated"); - setStatus("error"); - return null; - } + // Listen for results — accumulate the latest transcript + const resultSub = ExpoSpeechRecognitionModule.addListener( + "result", + (event) => { + const best = event.results[0]?.transcript; + if (best) { + transcriptRef.current = best; + } + if (event.isFinal && resolveRef.current) { + resolveRef.current(transcriptRef.current || null); + cleanup(); + setStatus("idle"); + } + }, + ); - const cloudUrl = getCloudUrlFromRegion(cloudRegion); - - // Create form data with the recording file - const formData = new FormData(); - formData.append("file", { - uri, - type: "audio/mp4", - name: "recording.m4a", - } as unknown as Blob); - - // Call PostHog LLM Gateway transcription API - const response = await fetch( - `${cloudUrl}/api/projects/${projectId}/llm_gateway/v1/audio/transcriptions`, - { - method: "POST", - headers: { - Authorization: `Bearer ${oauthAccessToken}`, - }, - body: formData, + const errorSub = ExpoSpeechRecognitionModule.addListener( + "error", + (event) => { + // "no-speech" is not a real error — just means the user didn't say anything + if (event.error === "no-speech") { + if (resolveRef.current) { + resolveRef.current(null); + } + cleanup(); + setStatus("idle"); + return; + } + setError(event.message || "Speech recognition failed"); + if (resolveRef.current) { + resolveRef.current(null); + } + cleanup(); + setStatus("error"); }, ); - // Clean up the temp file - const recordingFile = new File(uri); - if (recordingFile.exists) { - await recordingFile.delete(); - } + // If recognition ends without a final result (e.g. silence timeout) + const endSub = ExpoSpeechRecognitionModule.addListener("end", () => { + if (resolveRef.current) { + resolveRef.current(transcriptRef.current || null); + cleanup(); + setStatus("idle"); + } + }); - if (!response.ok) { - const errorData = await response.text(); - throw new Error(`Transcription failed: ${errorData}`); - } + listenersRef.current = [ + () => resultSub.remove(), + () => errorSub.remove(), + () => endSub.remove(), + ]; + + const useOnDevice = + ExpoSpeechRecognitionModule.supportsOnDeviceRecognition(); + + ExpoSpeechRecognitionModule.start({ + lang: "en-US", + interimResults: true, + requiresOnDeviceRecognition: useOnDevice, + addsPunctuation: true, + }); - const data = await response.json(); - setStatus("idle"); - return data.text; + setStatus("recording"); } catch (err) { - console.error("Failed to transcribe:", err); - const errorMessage = - err instanceof Error ? err.message : "Transcription failed"; - setError(errorMessage); + console.error("Failed to start speech recognition:", err); + setError("Failed to start speech recognition"); setStatus("error"); - return null; } - }, []); + }, [cleanup]); - const cancelRecording = useCallback(async () => { - if (recordingRef.current) { - try { - await recordingRef.current.stopAndUnloadAsync(); - const uri = recordingRef.current.getURI(); - if (uri) { - const file = new File(uri); - if (file.exists) { - await file.delete(); - } - } - } catch { - // Ignore cleanup errors - } - recordingRef.current = null; + const stopRecording = useCallback(async (): Promise => { + if (status !== "recording") { + return null; } - await Audio.setAudioModeAsync({ - allowsRecordingIOS: false, + setStatus("transcribing"); + + return new Promise((resolve) => { + resolveRef.current = resolve; + // stop() asks the recognizer to deliver a final result then end + ExpoSpeechRecognitionModule.stop(); }); + }, [status]); + const cancelRecording = useCallback(async () => { + ExpoSpeechRecognitionModule.abort(); + cleanup(); setStatus("idle"); setError(null); - }, []); + }, [cleanup]); return { status, diff --git a/apps/mobile/src/features/chat/index.ts b/apps/mobile/src/features/chat/index.ts index 3ca5c7509..cc4d7c8d1 100644 --- a/apps/mobile/src/features/chat/index.ts +++ b/apps/mobile/src/features/chat/index.ts @@ -11,7 +11,7 @@ export type { ToolMessageProps, ToolStatus, } from "./components/ToolMessage"; -export { ToolMessage } from "./components/ToolMessage"; +export { deriveToolKind, ToolMessage } from "./components/ToolMessage"; export { VisualizationArtifact } from "./components/VisualizationArtifact"; // Hooks diff --git a/apps/mobile/src/features/preferences/stores/preferencesStore.ts b/apps/mobile/src/features/preferences/stores/preferencesStore.ts new file mode 100644 index 000000000..1e44c1cce --- /dev/null +++ b/apps/mobile/src/features/preferences/stores/preferencesStore.ts @@ -0,0 +1,29 @@ +import AsyncStorage from "@react-native-async-storage/async-storage"; +import { create } from "zustand"; +import { createJSONStorage, persist } from "zustand/middleware"; + +interface PreferencesState { + aiChatEnabled: boolean; + setAiChatEnabled: (enabled: boolean) => void; + pingsEnabled: boolean; + setPingsEnabled: (enabled: boolean) => void; +} + +export const usePreferencesStore = create()( + persist( + (set) => ({ + aiChatEnabled: false, + setAiChatEnabled: (enabled) => set({ aiChatEnabled: enabled }), + pingsEnabled: true, + setPingsEnabled: (enabled) => set({ pingsEnabled: enabled }), + }), + { + name: "posthog-preferences", + storage: createJSONStorage(() => AsyncStorage), + partialize: (state) => ({ + aiChatEnabled: state.aiChatEnabled, + pingsEnabled: state.pingsEnabled, + }), + }, + ), +); diff --git a/apps/mobile/src/features/tasks/api.ts b/apps/mobile/src/features/tasks/api.ts index 10f023349..7e35ce882 100644 --- a/apps/mobile/src/features/tasks/api.ts +++ b/apps/mobile/src/features/tasks/api.ts @@ -160,16 +160,52 @@ export async function deleteTask(taskId: string): Promise { } } -export async function runTaskInCloud(taskId: string): Promise { +export interface RunTaskInCloudOptions { + branch?: string | null; + resumeFromRunId?: string; + pendingUserMessage?: string; + mode?: "interactive" | "background"; +} + +export async function runTaskInCloud( + taskId: string, + options?: RunTaskInCloudOptions, +): Promise { const baseUrl = getBaseUrl(); const projectId = getProjectId(); const headers = getHeaders(); + // Only serialize a body when we have options to send. Sending an empty + // or minimal body on the initial run historically changed backend + // behavior, so we preserve the "no body" path for the common case. + const hasOptions = + !!options && + (options.branch !== undefined || + options.resumeFromRunId !== undefined || + options.pendingUserMessage !== undefined || + options.mode !== undefined); + + let body: string | undefined; + if (hasOptions) { + const payload: Record = { + mode: options?.mode ?? "interactive", + }; + if (options?.branch) payload.branch = options.branch; + if (options?.resumeFromRunId) { + payload.resume_from_run_id = options.resumeFromRunId; + } + if (options?.pendingUserMessage) { + payload.pending_user_message = options.pendingUserMessage; + } + body = JSON.stringify(payload); + } + const response = await fetch( `${baseUrl}/api/projects/${projectId}/tasks/${taskId}/run/`, { method: "POST", headers, + body, }, ); @@ -228,10 +264,113 @@ export async function appendTaskRunLog( ); } +/** + * Structured error thrown by `sendCloudCommand`. Exposes the HTTP status and + * the backend error payload so callers can branch on specific failure modes + * (e.g. "No active sandbox for this task run" → trigger a resume flow). + */ +export class CloudCommandError extends Error { + readonly status: number; + readonly backendError: string | null; + readonly method: string; + + constructor( + method: string, + status: number, + backendError: string | null, + message: string, + ) { + super(message); + this.name = "CloudCommandError"; + this.method = method; + this.status = status; + this.backendError = backendError; + } + + /** True when the cloud sandbox for this run has terminated. */ + isSandboxInactive(): boolean { + return ( + !!this.backendError?.includes("No active sandbox") || + !!this.backendError?.includes("returned 404") || + this.status === 404 + ); + } +} + +/** + * Sends a JSON-RPC command to a running cloud task. This is the correct path + * for delivering follow-up user prompts to the agent — it gets translated into + * `session/prompt` on the agent side. Note: `appendTaskRunLog` only writes to + * S3 for display; it does NOT notify the agent. + */ +export async function sendCloudCommand( + taskId: string, + runId: string, + method: string, + params: Record = {}, +): Promise { + const baseUrl = getBaseUrl(); + const projectId = getProjectId(); + const headers = getHeaders(); + + const body = { + jsonrpc: "2.0", + method, + params, + id: `posthog-mobile-${Date.now()}`, + }; + + const response = await fetch( + `${baseUrl}/api/projects/${projectId}/tasks/${taskId}/runs/${runId}/command/`, + { + method: "POST", + headers, + body: JSON.stringify(body), + }, + ); + + if (!response.ok) { + const text = await response.text().catch(() => ""); + let backendError: string | null = null; + try { + const parsed = JSON.parse(text); + backendError = + typeof parsed?.error === "string" + ? parsed.error + : (parsed?.error?.message ?? null); + } catch { + backendError = text || null; + } + throw new CloudCommandError( + method, + response.status, + backendError, + `Cloud command '${method}' failed: ${response.status} ${response.statusText} ${text}`, + ); + } + + const data = await response.json(); + if (data?.error) { + const message = + typeof data.error === "string" + ? data.error + : (data.error.message ?? JSON.stringify(data.error)); + throw new CloudCommandError( + method, + 200, + message, + `Cloud command '${method}' error: ${message}`, + ); + } + return data?.result; +} + export async function fetchS3Logs(logUrl: string): Promise { return withRetry( async () => { - const response = await fetch(logUrl); + const response = await fetch(logUrl, { + signal: AbortSignal.timeout(10_000), + }); if (!response.ok) { if (response.status === 404) { @@ -281,17 +420,13 @@ export async function getGithubRepositories( } const data = await response.json(); - - const integrations = await getIntegrations(); - const integration = integrations.find((i) => i.id === integrationId); - const organization = - integration?.display_name || - integration?.config?.account?.login || - "unknown"; - - const repoNames = data.repositories ?? data.results ?? data ?? []; - return repoNames.map( - (repoName: string) => - `${organization.toLowerCase()}/${repoName.toLowerCase()}`, - ); + const repos: Array = + data.repositories ?? data.results ?? data ?? []; + + return repos + .map((repo) => { + if (typeof repo === "string") return repo.toLowerCase(); + return (repo.full_name ?? repo.name ?? "").toLowerCase(); + }) + .filter((name) => name.length > 0); } diff --git a/apps/mobile/src/features/tasks/components/PlanStatusBar.tsx b/apps/mobile/src/features/tasks/components/PlanStatusBar.tsx new file mode 100644 index 000000000..2c8633cc5 --- /dev/null +++ b/apps/mobile/src/features/tasks/components/PlanStatusBar.tsx @@ -0,0 +1,88 @@ +import { CheckCircle, CircleDashed, XCircle } from "phosphor-react-native"; +import { useMemo, useState } from "react"; +import { ActivityIndicator, Pressable, Text, View } from "react-native"; +import { useThemeColors } from "@/lib/theme"; +import type { PlanEntry } from "../types"; + +interface PlanStatusBarProps { + plan: PlanEntry[] | null; +} + +function StatusIcon({ status }: { status: string }) { + const themeColors = useThemeColors(); + + switch (status) { + case "completed": + return ; + case "in_progress": + return ; + case "failed": + return ; + default: + return ; + } +} + +export function PlanStatusBar({ plan }: PlanStatusBarProps) { + const [isExpanded, setIsExpanded] = useState(false); + const themeColors = useThemeColors(); + + const stats = useMemo(() => { + if (!plan?.length) return null; + + const completed = plan.filter((e) => e.status === "completed").length; + const total = plan.length; + const inProgress = plan.find((e) => e.status === "in_progress"); + const allCompleted = completed === total; + + return { completed, total, inProgress, allCompleted }; + }, [plan]); + + if (!stats || stats.allCompleted) return null; + + return ( + + setIsExpanded(!isExpanded)} + className="flex-row items-center gap-2 px-4 py-2.5" + > + + {stats.completed}/{stats.total} completed + + {stats.inProgress && ( + <> + · + + + {stats.inProgress.content} + + + )} + + + {isExpanded && plan && ( + + {plan.map((entry) => ( + + + + {entry.content} + + + ))} + + )} + + ); +} diff --git a/apps/mobile/src/features/tasks/components/QuestionCard.tsx b/apps/mobile/src/features/tasks/components/QuestionCard.tsx new file mode 100644 index 000000000..6cca5c807 --- /dev/null +++ b/apps/mobile/src/features/tasks/components/QuestionCard.tsx @@ -0,0 +1,378 @@ +import { + ChatCircle, + CheckCircle, + CircleDashed, + RadioButton, +} from "phosphor-react-native"; +import { useState } from "react"; +import { + Pressable, + Text, + TextInput, + TouchableOpacity, + View, +} from "react-native"; +import type { ToolStatus } from "@/features/chat"; +import { useThemeColors } from "@/lib/theme"; + +interface QuestionOption { + label: string; + description?: string; +} + +interface QuestionItem { + question: string; + header?: string; + options: QuestionOption[]; + multiSelect?: boolean; +} + +interface ToolData { + toolName: string; + toolCallId: string; + status: ToolStatus; + args?: Record; + result?: unknown; +} + +interface PermissionResponseArgs { + toolCallId: string; + optionId: string; + answers?: Record; + customInput?: string; + displayText: string; +} + +interface QuestionCardProps { + toolData: ToolData; + onSendPermissionResponse?: (args: PermissionResponseArgs) => void; +} + +function extractQuestions(args?: Record): QuestionItem[] { + if (!args) return []; + // Questions may be at top level or nested under input + const raw = + args.questions ?? (args.input as Record)?.questions; + if (!Array.isArray(raw)) return []; + return raw.filter( + (q): q is QuestionItem => + q != null && + typeof q === "object" && + typeof (q as QuestionItem).question === "string" && + Array.isArray((q as QuestionItem).options), + ); +} + +function extractAnswer(result: unknown): string | null { + if (typeof result === "string") return result; + if (result && typeof result === "object") { + const obj = result as Record; + if (typeof obj.answer === "string") return obj.answer; + if (typeof obj.answers === "object" && obj.answers) { + const answers = obj.answers as Record; + return Object.values(answers).join(", "); + } + if (typeof obj.text === "string") return obj.text; + if (typeof obj.content === "string") return obj.content; + } + return null; +} + +export function QuestionCard({ + toolData, + onSendPermissionResponse, +}: QuestionCardProps) { + const themeColors = useThemeColors(); + const questions = extractQuestions(toolData.args); + const isCompleted = + toolData.status === "completed" || toolData.status === "error"; + + if (questions.length === 0) { + return null; + } + + if (isCompleted) { + const answer = extractAnswer(toolData.result); + return ( + + + + + {questions[0]?.header ?? "Question"} + + + + + {questions[0]?.question} + + {answer && ( + + + + {answer} + + + )} + + + ); + } + + return ( + + ); +} + +function InteractiveQuestion({ + questions, + toolCallId, + onSendPermissionResponse, +}: { + questions: QuestionItem[]; + toolCallId: string; + onSendPermissionResponse?: (args: PermissionResponseArgs) => void; +}) { + const themeColors = useThemeColors(); + const [currentIndex, setCurrentIndex] = useState(0); + const [selectedOptions, setSelectedOptions] = useState< + Map> + >(new Map()); + const [otherTexts, setOtherTexts] = useState>(new Map()); + const [showOtherInput, setShowOtherInput] = useState>( + new Map(), + ); + + const question = questions[currentIndex]; + if (!question) return null; + + const isMultiSelect = question.multiSelect ?? false; + const isLastQuestion = currentIndex === questions.length - 1; + const selected = selectedOptions.get(currentIndex) ?? new Set(); + const otherText = otherTexts.get(currentIndex) ?? ""; + const isOtherShown = showOtherInput.get(currentIndex) ?? false; + const hasSelection = selected.size > 0 || otherText.trim().length > 0; + + const toggleOption = (label: string) => { + const newSelected = new Map(selectedOptions); + const current = new Set(selected); + + if (isMultiSelect) { + if (current.has(label)) { + current.delete(label); + } else { + current.add(label); + } + } else { + if (current.has(label)) { + current.clear(); + } else { + current.clear(); + current.add(label); + } + // Clear "Other" when selecting a preset option + const newOther = new Map(showOtherInput); + newOther.set(currentIndex, false); + setShowOtherInput(newOther); + const newTexts = new Map(otherTexts); + newTexts.set(currentIndex, ""); + setOtherTexts(newTexts); + } + + newSelected.set(currentIndex, current); + setSelectedOptions(newSelected); + }; + + const toggleOther = () => { + const newOther = new Map(showOtherInput); + const isNowShown = !isOtherShown; + newOther.set(currentIndex, isNowShown); + setShowOtherInput(newOther); + + if (!isMultiSelect && isNowShown) { + // Clear preset selections when choosing "Other" in single-select + const newSelected = new Map(selectedOptions); + newSelected.set(currentIndex, new Set()); + setSelectedOptions(newSelected); + } + }; + + const handleSubmit = () => { + const parts: string[] = []; + for (const label of selected) { + parts.push(label); + } + const trimmedOther = otherText.trim(); + if (trimmedOther) { + parts.push(trimmedOther); + } + const answer = parts.join(", "); + + if (!isLastQuestion) { + setCurrentIndex(currentIndex + 1); + return; + } + + if (!answer || !onSendPermissionResponse) return; + + // Derive the ACP optionId the agent is expecting. Options are built + // server-side (buildQuestionOptions in packages/agent) as + // `${OPTION_PREFIX}${idx}` where OPTION_PREFIX is "option_". If the + // user only typed into "Other", fall back to option_0 — the answers + // map carries the actual content for the agent. + const firstSelectedLabel = parts[0]; + const selectedIdx = question.options.findIndex( + (o) => o.label === firstSelectedLabel, + ); + const optionIdx = selectedIdx >= 0 ? selectedIdx : 0; + const optionId = `option_${optionIdx}`; + + onSendPermissionResponse({ + toolCallId, + optionId, + answers: { [question.question]: answer }, + customInput: trimmedOther || undefined, + displayText: answer, + }); + }; + + return ( + + {/* Header */} + + + + {question.header ?? "Question"} + + {questions.length > 1 && ( + + {currentIndex + 1}/{questions.length} + + )} + + + {/* Question text */} + + + {question.question} + + + + {/* Options */} + + {question.options.map((option) => { + const isSelected = selected.has(option.label); + return ( + toggleOption(option.label)} + className={`mb-1.5 rounded-lg border px-3 py-2.5 ${ + isSelected + ? "border-accent-8 bg-accent-3" + : "border-gray-6 bg-gray-3" + }`} + > + + {isMultiSelect ? ( + isSelected ? ( + + ) : ( + + ) + ) : isSelected ? ( + + ) : ( + + )} + + {option.label} + + + {option.description && ( + + {option.description} + + )} + + ); + })} + + {/* Other option */} + + + Other... + + + + {isOtherShown && ( + { + const newTexts = new Map(otherTexts); + newTexts.set(currentIndex, text); + setOtherTexts(newTexts); + }} + multiline + autoFocus + /> + )} + + + {/* Submit */} + + + + {isLastQuestion ? "Submit" : "Next"} + + + + + ); +} diff --git a/apps/mobile/src/features/tasks/components/SwipeableTaskItem.tsx b/apps/mobile/src/features/tasks/components/SwipeableTaskItem.tsx new file mode 100644 index 000000000..256fdbcb8 --- /dev/null +++ b/apps/mobile/src/features/tasks/components/SwipeableTaskItem.tsx @@ -0,0 +1,143 @@ +import { Archive, ArrowCounterClockwise } from "phosphor-react-native"; +import { useEffect, useRef } from "react"; +import { + Animated, + Easing, + LayoutAnimation, + PanResponder, + Text, + View, +} from "react-native"; +import { useThemeColors } from "@/lib/theme"; +import type { Task } from "../types"; +import { TaskItem } from "./TaskItem"; + +const SWIPE_THRESHOLD = 60; + +interface SwipeableTaskItemProps { + task: Task; + isArchived: boolean; + onPress: (task: Task) => void; + onArchive: (taskId: string) => void; + onUnarchive: (taskId: string) => void; + onSwipeStart?: () => void; + onSwipeEnd?: () => void; +} + +export function SwipeableTaskItem({ + task, + isArchived, + onPress, + onArchive, + onUnarchive, + onSwipeStart, + onSwipeEnd, +}: SwipeableTaskItemProps) { + const themeColors = useThemeColors(); + const translateX = useRef(new Animated.Value(0)).current; + const actionTriggeredRef = useRef(false); + + // Reset position when the item reappears (e.g. moved between sections) + useEffect(() => { + translateX.setValue(0); + actionTriggeredRef.current = false; + }, [isArchived, translateX]); + + const panResponder = useRef( + PanResponder.create({ + // Start tracking immediately on horizontal movement + onStartShouldSetPanResponder: () => false, + onMoveShouldSetPanResponder: (_, gesture) => + Math.abs(gesture.dx) > 5 && + Math.abs(gesture.dx) > Math.abs(gesture.dy) && + gesture.dx < 0, + // Capture before children so FlatList doesn't steal + onMoveShouldSetPanResponderCapture: (_, gesture) => + Math.abs(gesture.dx) > 8 && + Math.abs(gesture.dx) > Math.abs(gesture.dy * 1.2) && + gesture.dx < 0, + // Never let go once we have the gesture + onPanResponderTerminationRequest: () => false, + onShouldBlockNativeResponder: () => true, + onPanResponderGrant: () => { + actionTriggeredRef.current = false; + onSwipeStart?.(); + }, + onPanResponderMove: Animated.event( + [null, { dx: translateX }], + { + useNativeDriver: false, + listener: (_: unknown, gesture: { dx: number }) => { + // Clamp to left-only + if (gesture.dx > 0) translateX.setValue(0); + }, + }, + ), + onPanResponderRelease: (_, gesture) => { + onSwipeEnd?.(); + if (gesture.dx < -SWIPE_THRESHOLD && !actionTriggeredRef.current) { + actionTriggeredRef.current = true; + Animated.timing(translateX, { + toValue: -400, + duration: 150, + easing: Easing.in(Easing.ease), + useNativeDriver: true, + }).start(() => { + LayoutAnimation.configureNext( + LayoutAnimation.Presets.easeInEaseOut, + ); + if (isArchived) { + onUnarchive(task.id); + } else { + onArchive(task.id); + } + }); + } else { + Animated.spring(translateX, { + toValue: 0, + useNativeDriver: true, + tension: 40, + friction: 8, + }).start(); + } + }, + onPanResponderTerminate: () => { + onSwipeEnd?.(); + Animated.spring(translateX, { + toValue: 0, + useNativeDriver: true, + }).start(); + }, + }), + ).current; + + const actionBg = isArchived ? themeColors.accent[9] : themeColors.gray[8]; + const ActionIcon = isArchived ? ArrowCounterClockwise : Archive; + const actionLabel = isArchived ? "Restore" : "Archive"; + + return ( + + {/* Action revealed behind the row */} + + + + {actionLabel} + + + + {/* Sliding task row */} + + + + + ); +} diff --git a/apps/mobile/src/features/tasks/components/TaskItem.tsx b/apps/mobile/src/features/tasks/components/TaskItem.tsx index 6b92195d7..e067abda7 100644 --- a/apps/mobile/src/features/tasks/components/TaskItem.tsx +++ b/apps/mobile/src/features/tasks/components/TaskItem.tsx @@ -36,7 +36,7 @@ function TaskItemComponent({ task, onPress }: TaskItemProps) { const prUrl = task.latest_run?.output?.pr_url as string | undefined; const hasPR = !!prUrl; const status = hasPR ? "completed" : task.latest_run?.status || "backlog"; - const isCloudTask = task.latest_run?.environment === "cloud"; + const environment = task.latest_run?.environment; const statusColors = statusColorMap[status] || statusColorMap.backlog; @@ -56,10 +56,15 @@ function TaskItemComponent({ task, onPress }: TaskItemProps) { - {/* Cloud indicator */} - {isCloudTask && ( - - ☁️ + {/* Environment badge */} + {environment === "cloud" && ( + + Cloud + + )} + {environment === "local" && ( + + Local )} diff --git a/apps/mobile/src/features/tasks/components/TaskList.tsx b/apps/mobile/src/features/tasks/components/TaskList.tsx index 88e91a10f..fbcd8c9dc 100644 --- a/apps/mobile/src/features/tasks/components/TaskList.tsx +++ b/apps/mobile/src/features/tasks/components/TaskList.tsx @@ -1,5 +1,7 @@ import { Text } from "@components/text"; import * as WebBrowser from "expo-web-browser"; +import { CaretRight } from "phosphor-react-native"; +import { useMemo, useState } from "react"; import { ActivityIndicator, FlatList, @@ -11,8 +13,9 @@ import { useAuthStore } from "@/features/auth"; import { useThemeColors } from "@/lib/theme"; import { useIntegrations } from "../hooks/useIntegrations"; import { useTasks } from "../hooks/useTasks"; +import { useArchivedTasksStore } from "../stores/archivedTasksStore"; import type { Task } from "../types"; -import { TaskItem } from "./TaskItem"; +import { SwipeableTaskItem } from "./SwipeableTaskItem"; interface TaskListProps { onTaskPress?: (taskId: string) => void; @@ -108,11 +111,18 @@ function CreateTaskEmptyState({ onCreateTask }: CreateTaskEmptyStateProps) { ); } +type ListItem = + | { type: "task"; task: Task; isArchived: boolean } + | { type: "archived-header"; count: number; expanded: boolean }; + export function TaskList({ onTaskPress, onCreateTask }: TaskListProps) { const { tasks, isLoading, error, refetch } = useTasks(); const { hasGithubIntegration, refetch: refetchIntegrations } = useIntegrations(); const themeColors = useThemeColors(); + const { archivedTasks, archive, unarchive } = useArchivedTasksStore(); + const [archivedExpanded, setArchivedExpanded] = useState(false); + const [scrollEnabled, setScrollEnabled] = useState(true); const handleTaskPress = (task: Task) => { onTaskPress?.(task.id); @@ -122,6 +132,46 @@ export function TaskList({ onTaskPress, onCreateTask }: TaskListProps) { await Promise.all([refetch(), refetchIntegrations()]); }; + const listItems = useMemo((): ListItem[] => { + const active: Task[] = []; + const archived: Task[] = []; + + for (const task of tasks) { + if (task.id in archivedTasks) { + archived.push(task); + } else { + active.push(task); + } + } + + // Sort archived by FIFO (earliest archived first) + archived.sort( + (a, b) => (archivedTasks[a.id] ?? 0) - (archivedTasks[b.id] ?? 0), + ); + + const items: ListItem[] = active.map((task) => ({ + type: "task", + task, + isArchived: false, + })); + + if (archived.length > 0) { + items.push({ + type: "archived-header", + count: archived.length, + expanded: archivedExpanded, + }); + + if (archivedExpanded) { + for (const task of archived) { + items.push({ type: "task", task, isArchived: true }); + } + } + } + + return items; + }, [tasks, archivedTasks, archivedExpanded]); + if (error) { return ( @@ -160,14 +210,51 @@ export function TaskList({ onTaskPress, onCreateTask }: TaskListProps) { return ; } - // Has tasks - show the list (regardless of GitHub connection status) return ( item.id} - renderItem={({ item }) => ( - - )} + scrollEnabled={scrollEnabled} + data={listItems} + keyExtractor={(item) => + item.type === "archived-header" + ? "__archived_header__" + : `${item.task.id}-${item.isArchived ? "a" : "v"}` + } + renderItem={({ item }) => { + if (item.type === "archived-header") { + return ( + setArchivedExpanded(!item.expanded)} + className="flex-row items-center gap-2 border-gray-6 border-t bg-gray-2 px-3 py-2.5" + > + + + Archived + + {item.count} + + ); + } + + return ( + setScrollEnabled(false)} + onSwipeEnd={() => setScrollEnabled(true)} + /> + ); + }} refreshControl={ ; + customInput?: string; + displayText: string; +} interface TaskSessionViewProps { events: SessionEvent[]; - isPromptPending: boolean; + isConnecting?: boolean; + isThinking?: boolean; + terminalStatus?: "failed" | "completed"; + lastError?: string | null; + onRetry?: () => void; onOpenTask?: (taskId: string) => void; + onSendPermissionResponse?: (args: PermissionResponseArgs) => void; contentContainerStyle?: object; } @@ -22,13 +45,16 @@ interface ToolData { status: ToolStatus; args?: Record; result?: unknown; + isAgent?: boolean; + parentToolCallId?: string; } interface ParsedMessage { id: string; - type: "user" | "agent" | "tool"; + type: "user" | "agent" | "thought" | "tool" | "connecting" | "thinking"; content: string; toolData?: ToolData; + children?: ParsedMessage[]; } function mapToolStatus( @@ -48,11 +74,14 @@ function mapToolStatus( } } -function parseSessionNotification(notification: SessionNotification): { - type: "user" | "agent" | "tool" | "tool_update"; - content?: string; - toolData?: ToolData; -} | null { +type ParsedNotification = + | { type: "user" | "agent" | "agent_complete" | "thought"; content: string } + | { type: "tool" | "tool_update"; toolData: ToolData } + | { type: "plan"; entries: PlanEntry[] }; + +function parseSessionNotification( + notification: SessionNotification, +): ParsedNotification | null { const { update } = notification; if (!update?.sessionUpdate) { return null; @@ -70,7 +99,27 @@ function parseSessionNotification(notification: SessionNotification): { } return null; } + // `agent_message` is the aggregated final message emitted by the server + // once a response is complete. If we already received streaming chunks, + // this is a duplicate — replace pending text instead of appending. + case "agent_message": { + if (update.content?.type === "text") { + return { + type: "agent_complete" as const, + content: update.content.text, + }; + } + return null; + } + case "agent_thought_chunk": { + if (update.content?.type === "text") { + return { type: "thought", content: update.content.text }; + } + return null; + } case "tool_call": { + const meta = update._meta?.claudeCode; + const isAgent = meta?.toolName === "Agent" || meta?.toolName === "Task"; return { type: "tool", toolData: { @@ -78,10 +127,13 @@ function parseSessionNotification(notification: SessionNotification): { toolCallId: update.toolCallId ?? "", status: mapToolStatus(update.status), args: update.rawInput, + isAgent, + parentToolCallId: meta?.parentToolCallId, }, }; } case "tool_call_update": { + const meta = update._meta?.claudeCode; return { type: "tool_update", toolData: { @@ -90,31 +142,124 @@ function parseSessionNotification(notification: SessionNotification): { status: mapToolStatus(update.status), args: update.rawInput, result: update.rawOutput, + parentToolCallId: meta?.parentToolCallId, }, }; } + case "plan": { + if (Array.isArray(update.entries)) { + return { type: "plan", entries: update.entries }; + } + return null; + } default: return null; } } -function processEvents(events: SessionEvent[]): ParsedMessage[] { - const messages: ParsedMessage[] = []; - let pendingAgentText = ""; - let agentMessageCount = 0; - const toolMessages = new Map(); +interface ProcessedEvents { + messages: ParsedMessage[]; + plan: PlanEntry[] | null; +} + +function isQuestionTool(toolData?: ToolData): boolean { + if (!toolData) return false; + if (toolData.toolName.toLowerCase().includes("question")) return true; + if (Array.isArray(toolData.args?.questions)) return true; + return false; +} + +// Mutable processor state persisted across renders via useRef. +// Only new events (past processedIdx) are processed on each call. +interface EventProcessorState { + messages: ParsedMessage[]; + plan: PlanEntry[] | null; + pendingAgentText: string; + pendingThoughtText: string; + lastAgentMsgIdx: number | null; + agentMessageCount: number; + thoughtMessageCount: number; + userMessageCount: number; + toolMessages: Map; + // Maps agent toolCallId → agent ParsedMessage for nesting children + agentTools: Map; + processedIdx: number; + // Snapshot tracking: only create a new array ref when messages grow. + // Mutations (tool_update, agent_complete replacing content) reuse the + // same snapshot so FlatList doesn't re-layout and reset scroll position. + lastSnapshot: ParsedMessage[]; + lastSnapshotLength: number; +} + +function createProcessorState(): EventProcessorState { + return { + messages: [], + plan: null, + pendingAgentText: "", + pendingThoughtText: "", + lastAgentMsgIdx: null, + agentMessageCount: 0, + thoughtMessageCount: 0, + userMessageCount: 0, + toolMessages: new Map(), + agentTools: new Map(), + processedIdx: 0, + lastSnapshot: [], + lastSnapshotLength: 0, + }; +} + +function processNewEvents( + state: EventProcessorState, + events: SessionEvent[], +): ProcessedEvents { + // If events shrank (e.g. session reset), start fresh + if (events.length < state.processedIdx) { + Object.assign(state, createProcessorState()); + } + + // Nothing new to process + if (events.length === state.processedIdx) { + return { messages: state.messages, plan: state.plan }; + } const flushAgentText = () => { - if (!pendingAgentText) return; - messages.push({ - id: `agent-${agentMessageCount++}`, + if (!state.pendingAgentText) return; + const msg: ParsedMessage = { + id: `agent-${state.agentMessageCount++}`, type: "agent", - content: pendingAgentText, - }); - pendingAgentText = ""; + content: state.pendingAgentText, + }; + state.messages.push(msg); + state.lastAgentMsgIdx = state.messages.length - 1; + state.pendingAgentText = ""; + }; + + const flushThoughtText = () => { + if (!state.pendingThoughtText) return; + // Merge consecutive thoughts into one message instead of many rows + const lastMsg = state.messages[state.messages.length - 1]; + if (lastMsg?.type === "thought") { + lastMsg.content += state.pendingThoughtText; + } else { + state.messages.push({ + id: `thought-${state.thoughtMessageCount++}`, + type: "thought", + content: state.pendingThoughtText, + }); + } + state.pendingThoughtText = ""; + }; + + const flushPending = () => { + flushThoughtText(); + flushAgentText(); }; - for (const event of events) { + let hasItemMutation = false; + + for (let i = state.processedIdx; i < events.length; i++) { + const event = events[i]; if (event.type !== "session_update") continue; const parsed = parseSessionNotification(event.notification); @@ -122,53 +267,417 @@ function processEvents(events: SessionEvent[]): ParsedMessage[] { switch (parsed.type) { case "user": - flushAgentText(); - messages.push({ - id: `user-${event.ts}`, + flushPending(); + state.messages.push({ + id: `user-${state.userMessageCount++}`, type: "user", content: parsed.content ?? "", }); + state.lastAgentMsgIdx = null; break; case "agent": - pendingAgentText += parsed.content ?? ""; + flushThoughtText(); + state.pendingAgentText += parsed.content ?? ""; break; - case "tool": + case "agent_complete": + flushThoughtText(); + // If we already flushed an agent message from chunks, replace it + if ( + state.lastAgentMsgIdx !== null && + state.messages[state.lastAgentMsgIdx]?.type === "agent" + ) { + state.messages[state.lastAgentMsgIdx].content = parsed.content ?? ""; + state.pendingAgentText = ""; + } else { + state.pendingAgentText = parsed.content ?? ""; + } + break; + case "thought": flushAgentText(); + state.pendingThoughtText += parsed.content ?? ""; + break; + case "plan": + state.plan = parsed.entries; + break; + case "tool": + flushPending(); if (parsed.toolData) { - const msg: ParsedMessage = { - id: `tool-${parsed.toolData.toolCallId}`, - type: "tool", - content: "", - toolData: parsed.toolData, - }; - toolMessages.set(parsed.toolData.toolCallId, msg); - messages.push(msg); + const existing = state.toolMessages.get(parsed.toolData.toolCallId); + if (existing?.toolData) { + existing.toolData = { + ...existing.toolData, + ...parsed.toolData, + }; + } else { + const msg: ParsedMessage = { + id: `tool-${parsed.toolData.toolCallId}`, + type: "tool", + content: "", + toolData: parsed.toolData, + children: parsed.toolData.isAgent ? [] : undefined, + }; + state.toolMessages.set(parsed.toolData.toolCallId, msg); + + // Agent tools: register for child nesting + if (parsed.toolData.isAgent) { + state.agentTools.set(parsed.toolData.toolCallId, msg); + } + + // Child tools: nest under parent agent instead of top-level + const parentId = parsed.toolData.parentToolCallId; + const parent = parentId + ? state.agentTools.get(parentId) + : undefined; + if (parent?.children) { + parent.children.push(msg); + hasItemMutation = true; + } else { + state.messages.push(msg); + } + } } + state.lastAgentMsgIdx = null; break; case "tool_update": if (parsed.toolData) { - const existing = toolMessages.get(parsed.toolData.toolCallId); + const existing = state.toolMessages.get(parsed.toolData.toolCallId); if (existing?.toolData) { existing.toolData.status = parsed.toolData.status; existing.toolData.result = parsed.toolData.result; + if (parsed.toolData.args) { + existing.toolData.args = parsed.toolData.args; + } + hasItemMutation = true; } } break; } } - flushAgentText(); - return messages; + flushPending(); + state.processedIdx = events.length; + + // Create a new array reference when messages were added or when a tool + // received args for the first time (so the diff view can render). + // Pure status/text mutations reuse the prior snapshot to avoid jumps. + if (state.messages.length !== state.lastSnapshotLength || hasItemMutation) { + state.lastSnapshot = [...state.messages]; + state.lastSnapshotLength = state.messages.length; + } + + return { messages: state.lastSnapshot, plan: state.plan }; +} + +function CollapsedThought({ content }: { content: string }) { + const themeColors = useThemeColors(); + const [expanded, setExpanded] = useState(false); + + return ( + setExpanded(!expanded)} className="px-4 py-0.5"> + + + Thought + + {expanded && ( + + {content} + + )} + + ); +} + +// Detect objects like {"0":"E","1":"r","2":"r",...,"isError":true} — a string +// serialized as char-per-key (possibly with extra metadata keys mixed in). +function tryReassembleString(obj: Record): string | null { + const numericKeys = Object.keys(obj).filter((k) => /^\d+$/.test(k)); + if (numericKeys.length < 3) return null; + if ( + numericKeys.every( + (k) => typeof obj[k] === "string" && (obj[k] as string).length === 1, + ) + ) { + return numericKeys + .sort((a, b) => Number(a) - Number(b)) + .map((k) => obj[k]) + .join(""); + } + return null; +} + +function extractErrorText(result: unknown): string | null { + if (typeof result === "string") return result; + if (Array.isArray(result)) { + const texts = result.map(extractErrorText).filter(Boolean); + return texts.length > 0 ? texts.join("\n") : null; + } + if (!result || typeof result !== "object") return null; + const obj = result as Record; + + // Reassemble char-per-key strings: {"0":"E","1":"r",...} + const reassembled = tryReassembleString(obj); + if (reassembled) return reassembled; + + // Check simple string fields, recurse into nested objects + for (const key of [ + "error", + "message", + "stderr", + "output", + "text", + "content", + ]) { + if (typeof obj[key] === "string") return obj[key] as string; + if (obj[key] && typeof obj[key] === "object") { + const nested = extractErrorText(obj[key]); + if (nested) return nested; + } + } + + // Last resort: stringify the result so *something* shows + try { + const str = JSON.stringify(result, null, 2); + if (str && str !== "{}") return str; + } catch { + // ignore + } + + return null; +} + +function agentPromptSummary(args?: Record): string | null { + if (!args) return null; + const prompt = + typeof args.prompt === "string" + ? args.prompt + : typeof args.description === "string" + ? args.description + : null; + if (!prompt) return null; + // Take the first meaningful line, truncated + const firstLine = prompt + .split("\n") + .find((l) => l.trim()) + ?.trim(); + if (!firstLine) return null; + return firstLine.length > 120 ? `${firstLine.slice(0, 120)}…` : firstLine; +} + +function AgentToolCard({ + item, + onOpenTask, +}: { + item: ParsedMessage; + onOpenTask?: (taskId: string) => void; +}) { + const themeColors = useThemeColors(); + const [expanded, setExpanded] = useState(false); + const toolData = item.toolData; + const children = item.children ?? []; + if (!toolData) return null; + + const isLoading = + toolData.status === "pending" || toolData.status === "running"; + const isFailed = toolData.status === "error"; + const childCount = children.length; + const subtitle = agentPromptSummary(toolData.args); + const errorText = isFailed ? extractErrorText(toolData.result) : null; + + return ( + + {/* Header */} + setExpanded(!expanded)} className="px-3 py-2"> + + {isLoading ? ( + + ) : ( + + )} + + {toolData.toolName} + + {childCount > 0 && ( + + {childCount} {childCount === 1 ? "tool" : "tools"} + + )} + {isFailed && ( + + Failed + + )} + + + {subtitle && ( + + {subtitle} + + )} + + + {/* Error message + nested tool calls */} + {expanded && ( + + {errorText && ( + + + {errorText} + + + )} + {children.map((child) => { + if (!child.toolData) return null; + return ( + + ); + })} + + )} + + ); +} + +function formatElapsed(seconds: number): string { + const m = Math.floor(seconds / 60); + const s = seconds % 60; + return m > 0 ? `${m}m ${s}s` : `${s}s`; +} + +function useElapsedTimer() { + const [elapsed, setElapsed] = useState(0); + useEffect(() => { + setElapsed(0); + const interval = setInterval(() => { + setElapsed((e) => e + 1); + }, 1000); + return () => clearInterval(interval); + }, []); + return elapsed; +} + +function ThinkingIndicator() { + const [dots, setDots] = useState(1); + const elapsed = useElapsedTimer(); + + useEffect(() => { + const interval = setInterval(() => { + setDots((d) => (d % 3) + 1); + }, 500); + return () => clearInterval(interval); + }, []); + + return ( + + + Thinking{".".repeat(dots)} + + {formatElapsed(elapsed)} + + ); +} + +function ConnectingIndicator() { + const [dots, setDots] = useState(1); + const elapsed = useElapsedTimer(); + + useEffect(() => { + const interval = setInterval(() => { + setDots((d) => (d % 3) + 1); + }, 500); + return () => clearInterval(interval); + }, []); + + return ( + + + Connecting{".".repeat(dots)} + + {formatElapsed(elapsed)} + + ); } export function TaskSessionView({ events, - isPromptPending, + isConnecting, + isThinking, + terminalStatus, + lastError, + onRetry, onOpenTask, + onSendPermissionResponse, contentContainerStyle, }: TaskSessionViewProps) { - const messages = useMemo(() => processEvents(events), [events]); + const processorRef = useRef(createProcessorState()); + const prevEventsRef = useRef(events); + // Reset processor when events array shrinks or changes identity completely + // (e.g., navigating between tasks while Expo Router reuses the component). + if ( + events.length === 0 || + (events !== prevEventsRef.current && events[0] !== prevEventsRef.current[0]) + ) { + processorRef.current = createProcessorState(); + } + prevEventsRef.current = events; + const { messages, plan } = useMemo( + () => processNewEvents(processorRef.current, events), + [events], + ); + // Inverted FlatList renders data[0] at the visual bottom. + // Reverse so newest messages are at index 0 = bottom. + const reversedMessages = useMemo(() => [...messages].reverse(), [messages]); const themeColors = useThemeColors(); + const flatListRef = useRef(null); + const buttonRef = useRef(null); + const isScrolledRef = useRef(false); + + const scrollToBottom = useCallback(() => { + flatListRef.current?.scrollToOffset({ offset: 0, animated: true }); + }, []); + + const handleScroll = useCallback( + (e: { nativeEvent: { contentOffset: { y: number } } }) => { + const scrolled = e.nativeEvent.contentOffset.y > 0; + if (scrolled !== isScrolledRef.current) { + isScrolledRef.current = scrolled; + buttonRef.current?.setNativeProps({ + style: { + opacity: scrolled ? 1 : 0, + pointerEvents: scrolled ? "auto" : "none", + }, + }); + } + }, + [], + ); const renderMessage = useCallback( ({ item }: { item: ParsedMessage }) => { @@ -179,46 +688,132 @@ export function TaskSessionView({ return ( ); + case "thought": + return ; case "tool": - return item.toolData ? ( + if (!item.toolData) return null; + if (isQuestionTool(item.toolData)) { + return ( + + ); + } + if (item.toolData.isAgent) { + return ; + } + return ( - ) : null; + ); default: return null; } }, - [onOpenTask], + [onOpenTask, onSendPermissionResponse], ); return ( - item.id} - inverted - contentContainerStyle={{ - flexDirection: "column-reverse", - ...contentContainerStyle, - }} - keyboardDismissMode="interactive" - keyboardShouldPersistTaps="handled" - showsVerticalScrollIndicator={false} - ListHeaderComponent={ - isPromptPending ? ( - - - - Thinking... - - - ) : null - } - /> + + + item.id} + inverted + contentContainerStyle={contentContainerStyle} + keyboardDismissMode="interactive" + keyboardShouldPersistTaps="handled" + showsVerticalScrollIndicator + onScroll={handleScroll} + scrollEventThrottle={100} + maxToRenderPerBatch={15} + windowSize={21} + initialNumToRender={30} + ListHeaderComponent={ + terminalStatus ? ( + + + {terminalStatus === "failed" ? "Run failed" : "Run completed"} + + {lastError && ( + {lastError} + )} + {onRetry && ( + + + {terminalStatus === "failed" ? "Retry" : "Continue"} + + + )} + + ) : null + } + /> + {/* Thinking/connecting indicators absolutely positioned above the Composer area. + Rendered outside FlatList to avoid inverted-list double-mount bugs. */} + {(isConnecting || isThinking) && ( + + {isConnecting ? ( + + ) : isThinking ? ( + + ) : null} + + )} + + + + + + ); } diff --git a/apps/mobile/src/features/tasks/hooks/useTasks.ts b/apps/mobile/src/features/tasks/hooks/useTasks.ts index c0bb1f812..63727f1d8 100644 --- a/apps/mobile/src/features/tasks/hooks/useTasks.ts +++ b/apps/mobile/src/features/tasks/hooks/useTasks.ts @@ -14,7 +14,7 @@ import type { CreateTaskOptions, Task } from "../types"; export const taskKeys = { all: ["tasks"] as const, lists: () => [...taskKeys.all, "list"] as const, - list: (filters?: { repository?: string; createdBy?: number }) => + list: (filters?: { repository?: string }) => [...taskKeys.lists(), filters] as const, details: () => [...taskKeys.all, "detail"] as const, detail: (id: string) => [...taskKeys.details(), id] as const, @@ -27,7 +27,6 @@ export function useTasks(filters?: { repository?: string }) { const queryFilters = { ...filters, - createdBy: currentUser?.id, }; const query = useQuery({ @@ -64,23 +63,15 @@ export function useTask(taskId: string) { export function useCreateTask() { const queryClient = useQueryClient(); - const { data: currentUser } = useUserQuery(); - const invalidateTasks = (newTask?: Task) => { - if (newTask && currentUser?.id) { - // Update the correct cache entry with the user's filter - const queryKey = taskKeys.list({ createdBy: currentUser.id }); - queryClient.setQueryData(queryKey, (old) => - old ? [newTask, ...old] : [newTask], - ); - } + const invalidateTasks = () => { queryClient.invalidateQueries({ queryKey: taskKeys.lists() }); }; const mutation = useMutation({ mutationFn: (options: CreateTaskOptions) => createTask(options), - onSuccess: (newTask) => { - invalidateTasks(newTask); + onSuccess: () => { + invalidateTasks(); }, onError: (error) => { console.error("Failed to create task:", error.message); diff --git a/apps/mobile/src/features/tasks/stores/archivedTasksStore.ts b/apps/mobile/src/features/tasks/stores/archivedTasksStore.ts new file mode 100644 index 000000000..8a2def9f4 --- /dev/null +++ b/apps/mobile/src/features/tasks/stores/archivedTasksStore.ts @@ -0,0 +1,40 @@ +import AsyncStorage from "@react-native-async-storage/async-storage"; +import { create } from "zustand"; +import { persist, createJSONStorage } from "zustand/middleware"; + +interface ArchivedTasksState { + // taskId → timestamp (ms) for FIFO ordering + archivedTasks: Record; + archive: (taskId: string) => void; + unarchive: (taskId: string) => void; + isArchived: (taskId: string) => boolean; +} + +export const useArchivedTasksStore = create()( + persist( + (set, get) => ({ + archivedTasks: {}, + + archive: (taskId: string) => + set((state) => ({ + archivedTasks: { + ...state.archivedTasks, + [taskId]: Date.now(), + }, + })), + + unarchive: (taskId: string) => + set((state) => { + const { [taskId]: _, ...rest } = state.archivedTasks; + return { archivedTasks: rest }; + }), + + isArchived: (taskId: string) => taskId in get().archivedTasks, + }), + { + name: "archived-tasks", + storage: createJSONStorage(() => AsyncStorage), + partialize: (state) => ({ archivedTasks: state.archivedTasks }), + }, + ), +); diff --git a/apps/mobile/src/features/tasks/stores/taskSessionStore.ts b/apps/mobile/src/features/tasks/stores/taskSessionStore.ts index a11ac1383..994ec81d0 100644 --- a/apps/mobile/src/features/tasks/stores/taskSessionStore.ts +++ b/apps/mobile/src/features/tasks/stores/taskSessionStore.ts @@ -1,6 +1,14 @@ import { create } from "zustand"; +import { usePreferencesStore } from "@/features/preferences/stores/preferencesStore"; import { logger } from "@/lib/logger"; -import { appendTaskRunLog, fetchS3Logs, runTaskInCloud } from "../api"; +import { + CloudCommandError, + fetchS3Logs, + getTask, + getTaskRun, + runTaskInCloud, + sendCloudCommand, +} from "../api"; import type { SessionEvent, SessionNotification, @@ -11,6 +19,46 @@ import { convertRawEntriesToEvents, parseSessionLogs, } from "../utils/parseSessionLogs"; +import { playMeepSound } from "../utils/sounds"; + +// Infer whether the agent is actively working or idle (waiting for user input). +// Primary signal: _posthog/turn_complete or _posthog/task_complete in raw log +// entries. Fallback: session update notification heuristic for older logs. +function inferAgentIsIdle( + rawEntries: StoredLogEntry[], + notifications: SessionNotification[], +): boolean { + // Check raw entries for explicit turn/task completion signals + for (let i = rawEntries.length - 1; i >= 0; i--) { + const method = rawEntries[i].notification?.method; + if ( + method === "_posthog/turn_complete" || + method === "_posthog/task_complete" + ) { + return true; + } + // If we hit a client-direction entry (user message), the agent hasn't + // completed a turn since the last user input. + if (rawEntries[i].direction === "client") break; + } + + // Fallback: check session update notifications for agent responses + for (let i = notifications.length - 1; i >= 0; i--) { + const su = notifications[i].update?.sessionUpdate; + if (su === "agent_message" || su === "agent_message_chunk") { + return true; + } + if ( + su === "user_message_chunk" || + su === "tool_call" || + su === "tool_call_update" || + su === "agent_thought_chunk" + ) { + return false; + } + } + return false; +} const CLOUD_POLLING_INTERVAL_MS = 500; @@ -23,6 +71,20 @@ export interface TaskSession { logUrl: string; processedLineCount: number; processedHashes?: Set; + // Content of user prompts echoed locally (before the agent writes them to + // the log). Used by polling to dedup the canonical copy against the echo. + localUserEchoes?: Set; + // Terminal backend status for this run, populated by the status-check + // poller so the UI can surface "Run failed" / "Run completed". + terminalStatus?: "failed" | "completed"; + lastError?: string | null; + // True when the user initiated work (new task, sendPrompt, resume) and + // we should play a sound when control returns. False when reconnecting + // to an already-running task to avoid spurious pings. + awaitingPing?: boolean; + // Messages queued while the agent is working. Auto-sent when control + // returns (isPromptPending flips to false). + messageQueue?: string[]; } interface TaskSessionStore { @@ -31,16 +93,42 @@ interface TaskSessionStore { connectToTask: (task: Task) => Promise; disconnectFromTask: (taskId: string) => void; sendPrompt: (taskId: string, prompt: string) => Promise; + sendPermissionResponse: ( + taskId: string, + args: { + toolCallId: string; + optionId: string; + answers?: Record; + customInput?: string; + displayText: string; + }, + ) => Promise; cancelPrompt: (taskId: string) => Promise; getSessionForTask: (taskId: string) => TaskSession | undefined; _handleEvent: (taskRunId: string, event: SessionEvent) => void; _startCloudPolling: (taskRunId: string, logUrl: string) => void; _stopCloudPolling: (taskRunId: string) => void; + _resumeCloudRun: ( + taskId: string, + previousRunId: string, + prompt: string, + ) => Promise; } const cloudPollers = new Map>(); const connectAttempts = new Set(); +// Guard against overlapping poll ticks — if a fetch takes >500ms, the next +// interval fires while the previous is still running, causing both to read +// the same processedLineCount and produce duplicate events. +const pollInFlight = new Set(); +// Timestamps for when each poll tick started — used to force-clear stuck ticks. +const pollInFlightSince = new Map(); +const POLL_IN_FLIGHT_TIMEOUT_MS = 30_000; +// Tick counts per task run used to throttle backend task-run status polling. +const pollTicks = new Map(); +// How many S3 polling ticks between each backend task-run status check. +const STATUS_CHECK_TICK_INTERVAL = 5; export const useTaskSessionStore = create((set, get) => ({ sessions: {}, @@ -49,7 +137,7 @@ export const useTaskSessionStore = create((set, get) => ({ const taskId = task.id; const latestRunId = task.latest_run?.id; const latestRunLogUrl = task.latest_run?.log_url; - const taskDescription = task.description; + const _taskDescription = task.description; if (connectAttempts.has(taskId)) { logger.debug("Connection already in progress", { taskId }); @@ -82,24 +170,12 @@ export const useTaskSessionStore = create((set, get) => ({ [newRunId]: { taskRunId: newRunId, taskId, - events: taskDescription - ? [ - { - type: "session_update" as const, - ts: Date.now(), - notification: { - update: { - sessionUpdate: "user_message_chunk", - content: { type: "text", text: taskDescription }, - }, - }, - }, - ] - : [], + events: [], status: "connected", - isPromptPending: true, // Agent is processing initial task + isPromptPending: true, logUrl: newLogUrl, processedLineCount: 0, + awaitingPing: true, }, }, })); @@ -121,29 +197,26 @@ export const useTaskSessionStore = create((set, get) => ({ logger.debug("Loaded cloud historical logs", { notifications: notifications.length, rawEntries: rawEntries.length, + backendStatus: task.latest_run?.status, }); const historicalEvents = convertRawEntriesToEvents( rawEntries, notifications, - taskDescription, ); - // Check if agent is still processing by looking at the last entry - // If the last non-client entry is a user message, agent is likely still working - const lastAgentEntry = [...rawEntries] - .reverse() - .find((e) => e.direction !== "client"); - // biome-ignore lint/suspicious/noExplicitAny: Entry structure varies - const lastUpdate = (lastAgentEntry?.notification as any)?.params?.update - ?.sessionUpdate; - const isAgentResponding = - lastUpdate === "agent_message_chunk" || - lastUpdate === "agent_thought_chunk" || - lastUpdate === "tool_call" || - lastUpdate === "tool_call_update"; - // If we have entries but the last one isn't an agent response, agent may still be processing - const isPromptPending = rawEntries.length > 0 && !isAgentResponding; + // Terminal runs (completed/failed) always clear isPromptPending. + // For non-terminal runs we infer idle vs working from the log shape + // because the backend has no "waiting_for_input" status. + const backendStatus = task.latest_run?.status; + const isTerminal = + backendStatus === "completed" || backendStatus === "failed"; + const terminalStatus: "completed" | "failed" | undefined = isTerminal + ? (backendStatus as "completed" | "failed") + : undefined; + const lastError = isTerminal + ? (task.latest_run?.error_message ?? null) + : null; set((state) => ({ sessions: { @@ -153,15 +226,24 @@ export const useTaskSessionStore = create((set, get) => ({ taskId, events: historicalEvents, status: "connected", - isPromptPending, + isPromptPending: isTerminal + ? false + : !inferAgentIsIdle(rawEntries, notifications), logUrl: latestRunLogUrl, processedLineCount: rawEntries.length, + terminalStatus, + lastError, }, }, })); get()._startCloudPolling(latestRunId, latestRunLogUrl); - logger.debug("Connected to cloud session", { taskId, latestRunId }); + logger.debug("Connected to cloud session", { + taskId, + latestRunId, + backendStatus, + isTerminal, + }); } catch (error) { logger.error("Failed to connect to task", error); } finally { @@ -188,67 +270,191 @@ export const useTaskSessionStore = create((set, get) => ({ throw new Error("No active session for task"); } - const notification: StoredLogEntry = { - type: "notification", - timestamp: new Date().toISOString(), - direction: "client", - notification: { - method: "session/update", - params: { - update: { - sessionUpdate: "user_message_chunk", - content: { type: "text", text: prompt }, + // If the agent is still working, queue the message for later. + if (session.isPromptPending) { + logger.debug("Agent busy, queuing message", { taskId }); + set((state) => { + const current = state.sessions[session.taskRunId]; + if (!current) return state; + return { + sessions: { + ...state.sessions, + [session.taskRunId]: { + ...current, + messageQueue: [...(current.messageQueue ?? []), prompt], + }, }, + }; + }); + return; + } + + // Local echo for immediate UX feedback — polling will re-surface the + // canonical copy once the agent writes it to the log; any duplicate is + // removed by content-based dedup in the polling loop below. + const ts = Date.now(); + const userEvent: SessionEvent = { + type: "session_update", + ts, + notification: { + update: { + sessionUpdate: "user_message_chunk", + content: { type: "text", text: prompt }, }, }, }; - await appendTaskRunLog(taskId, session.taskRunId, [notification]); - logger.debug("Sent cloud message via S3", { - taskId, - runId: session.taskRunId, + set((state) => { + const current = state.sessions[session.taskRunId]; + const nextLocalEchoes = new Set(current.localUserEchoes ?? []); + nextLocalEchoes.add(prompt); + return { + sessions: { + ...state.sessions, + [session.taskRunId]: { + ...current, + events: [...current.events, userEvent], + localUserEchoes: nextLocalEchoes, + isPromptPending: true, + awaitingPing: true, + }, + }, + }; }); + try { + await sendCloudCommand(taskId, session.taskRunId, "user_message", { + content: prompt, + }); + logger.debug("Sent cloud command user_message", { + taskId, + runId: session.taskRunId, + }); + } catch (err) { + // Sandbox for this run has shut down — create a resume run on the + // backend and swap the local session to the new run id. + let rollbackError: unknown = err; + if (err instanceof CloudCommandError && err.isSandboxInactive()) { + logger.info("Sandbox inactive, creating resume run", { + taskId, + previousRunId: session.taskRunId, + }); + try { + await get()._resumeCloudRun(taskId, session.taskRunId, prompt); + return; + } catch (resumeErr) { + logger.error("Failed to resume cloud run", resumeErr); + rollbackError = resumeErr; + } + } + + // Roll back the local echo + pending state so the user can retry. + set((state) => { + const current = state.sessions[session.taskRunId]; + if (!current) return state; + const nextLocalEchoes = new Set(current.localUserEchoes ?? []); + nextLocalEchoes.delete(prompt); + return { + sessions: { + ...state.sessions, + [session.taskRunId]: { + ...current, + events: current.events.filter((e) => e !== userEvent), + localUserEchoes: nextLocalEchoes, + isPromptPending: false, + }, + }, + }; + }); + throw rollbackError; + } + }, + + // Resolve an outstanding requestPermission on the desktop/agent side + // (e.g. AskUserQuestion). Unlike sendPrompt, this never queues — a + // permission reply only makes sense while the agent is paused inside + // requestPermission, and it completes an existing turn rather than + // starting a new one. + sendPermissionResponse: async (taskId, args) => { + const session = get().getSessionForTask(taskId); + if (!session) { + throw new Error("No active session for task"); + } + const ts = Date.now(); const userEvent: SessionEvent = { type: "session_update", ts, - notification: notification.notification?.params as SessionNotification, + notification: { + update: { + sessionUpdate: "user_message_chunk", + content: { type: "text", text: args.displayText }, + }, + }, }; - set((state) => ({ - sessions: { - ...state.sessions, - [session.taskRunId]: { - ...state.sessions[session.taskRunId], - events: [...state.sessions[session.taskRunId].events, userEvent], - processedLineCount: - (state.sessions[session.taskRunId].processedLineCount ?? 0) + 1, - isPromptPending: true, + set((state) => { + const current = state.sessions[session.taskRunId]; + if (!current) return state; + const nextLocalEchoes = new Set(current.localUserEchoes ?? []); + nextLocalEchoes.add(args.displayText); + return { + sessions: { + ...state.sessions, + [session.taskRunId]: { + ...current, + events: [...current.events, userEvent], + localUserEchoes: nextLocalEchoes, + isPromptPending: true, + awaitingPing: true, + }, }, - }, - })); + }; + }); + + try { + await sendCloudCommand(taskId, session.taskRunId, "permission_response", { + toolCallId: args.toolCallId, + optionId: args.optionId, + ...(args.answers ? { answers: args.answers } : {}), + ...(args.customInput ? { customInput: args.customInput } : {}), + }); + logger.debug("Sent permission_response", { + taskId, + runId: session.taskRunId, + toolCallId: args.toolCallId, + }); + } catch (err) { + logger.error("Failed to send permission_response", err); + // Roll back the optimistic state so the UI reflects reality. + set((state) => { + const current = state.sessions[session.taskRunId]; + if (!current) return state; + const nextLocalEchoes = new Set(current.localUserEchoes ?? []); + nextLocalEchoes.delete(args.displayText); + return { + sessions: { + ...state.sessions, + [session.taskRunId]: { + ...current, + events: current.events.filter((e) => e !== userEvent), + localUserEchoes: nextLocalEchoes, + isPromptPending: false, + }, + }, + }; + }); + throw err; + } }, cancelPrompt: async (taskId: string) => { const session = get().getSessionForTask(taskId); if (!session) return false; - const cancelNotification: StoredLogEntry = { - type: "notification", - timestamp: new Date().toISOString(), - direction: "client", - notification: { - method: "session/cancel", - params: { - sessionId: session.taskRunId, - }, - }, - }; - try { - await appendTaskRunLog(taskId, session.taskRunId, [cancelNotification]); - logger.debug("Sent cancel request via S3", { + await sendCloudCommand(taskId, session.taskRunId, "cancel"); + logger.debug("Sent cancel command", { taskId, runId: session.taskRunId, }); @@ -295,6 +501,17 @@ export const useTaskSessionStore = create((set, get) => ({ logger.debug("Starting cloud S3 polling", { taskRunId }); const pollS3 = async () => { + // Skip if previous tick is still in flight — but force-clear if stuck + if (pollInFlight.has(taskRunId)) { + const startedAt = pollInFlightSince.get(taskRunId) ?? 0; + if (Date.now() - startedAt < POLL_IN_FLIGHT_TIMEOUT_MS) return; + logger.warn("Force-clearing stuck pollInFlight", { taskRunId }); + pollInFlight.delete(taskRunId); + pollInFlightSince.delete(taskRunId); + } + pollInFlight.add(taskRunId); + pollInFlightSince.set(taskRunId, Date.now()); + try { const session = get().sessions[taskRunId]; if (!session) { @@ -302,6 +519,56 @@ export const useTaskSessionStore = create((set, get) => ({ return; } + // Check backend status periodically, or every tick while the agent + // is pending (so "Thinking..." clears promptly when the run finishes). + const tick = (pollTicks.get(taskRunId) ?? 0) + 1; + pollTicks.set(taskRunId, tick); + const shouldCheckStatus = + session.isPromptPending || tick % STATUS_CHECK_TICK_INTERVAL === 0; + if (shouldCheckStatus) { + try { + const run = await getTaskRun(session.taskId, taskRunId); + logger.debug("Status check", { + taskRunId, + status: run.status, + error: run.error_message, + }); + if (run.status === "failed" || run.status === "completed") { + logger.debug("Backend run reached terminal status", { + taskRunId, + status: run.status, + error: run.error_message, + }); + const shouldPing = + get().sessions[taskRunId]?.awaitingPing ?? false; + set((state) => { + const current = state.sessions[taskRunId]; + if (!current) return state; + return { + sessions: { + ...state.sessions, + [taskRunId]: { + ...current, + isPromptPending: false, + terminalStatus: run.status as "failed" | "completed", + lastError: run.error_message, + awaitingPing: false, + messageQueue: undefined, + }, + }, + }; + }); + if (shouldPing && usePreferencesStore.getState().pingsEnabled) { + playMeepSound().catch(() => {}); + } + } + } catch (statusErr) { + logger.warn("Failed to fetch task run status", { + error: statusErr, + }); + } + } + const text = await fetchS3Logs(logUrl); if (!text) return; @@ -310,9 +577,20 @@ export const useTaskSessionStore = create((set, get) => ({ if (lines.length > processedCount) { const newLines = lines.slice(processedCount); + logger.debug("Poll picked up new log lines", { + taskRunId, + newLineCount: newLines.length, + totalLines: lines.length, + }); const currentHashes = new Set(session.processedHashes ?? []); - + const remainingLocalEchoes = new Set(session.localUserEchoes ?? []); + // Collect all new events in a batch, then do a single store + // update. This prevents N re-renders per poll tick. + const batchedEvents: SessionEvent[] = []; let receivedAgentMessage = false; + // Track when a user_message_chunk arrives that wasn't sent from + // this device — means someone prompted from the desktop app. + let receivedExternalUserMessage = false; for (const line of newLines) { try { @@ -321,45 +599,83 @@ export const useTaskSessionStore = create((set, get) => ({ ? new Date(entry.timestamp).getTime() : Date.now(); - const hash = `${entry.timestamp ?? ""}-${entry.notification?.method ?? ""}-${entry.direction ?? ""}`; + // Build a dedup hash specific enough to distinguish different + // events at the same timestamp. For session/update entries, + // include the update type, toolCallId, and status so that a + // tool_call and its tool_call_update don't collide. + const params = entry.notification?.params; + const suDetail = params?.update + ? `-${params.update.sessionUpdate ?? ""}-${params.update.toolCallId ?? ""}-${params.update.status ?? ""}` + : `-${entry.direction ?? ""}`; + const hash = `${entry.timestamp ?? ""}-${entry.notification?.method ?? ""}${suDetail}`; if (currentHashes.has(hash)) { continue; } currentHashes.add(hash); - const isClientMessage = entry.direction === "client"; - if (isClientMessage) { - continue; + // Check for local echo dedup BEFORE pushing any events for + // this entry — otherwise the acp_message duplicate gets in. + if ( + entry.type === "notification" && + entry.notification?.method === "session/update" && + entry.notification?.params + ) { + const params = entry.notification.params as SessionNotification; + const sessionUpdate = params?.update?.sessionUpdate; + + if (sessionUpdate === "user_message_chunk") { + const text = params?.update?.content?.text; + if (text && remainingLocalEchoes.has(text)) { + remainingLocalEchoes.delete(text); + continue; + } + // User message not from this device (e.g. desktop app) + receivedExternalUserMessage = true; + } } - const acpEvent: SessionEvent = { + batchedEvents.push({ type: "acp_message", direction: entry.direction ?? "agent", ts, message: entry.notification, - }; - get()._handleEvent(taskRunId, acpEvent); + }); + + if ( + entry.type === "notification" && + (entry.notification?.method === "_posthog/turn_complete" || + entry.notification?.method === "_posthog/task_complete" || + entry.notification?.method === "_posthog/error" || + // Agent explicitly blocked on a user reply (e.g. a question + // tool invoked via requestPermission). Treat this as a + // turn boundary so the input UI unblocks — otherwise the + // user's answer would be stuck in the "queue while busy" + // path in sendPrompt. + entry.notification?.method === "_posthog/awaiting_user_input") + ) { + receivedAgentMessage = true; + } if ( entry.type === "notification" && entry.notification?.method === "session/update" && entry.notification?.params ) { - const sessionUpdateEvent: SessionEvent = { + const params = entry.notification.params as SessionNotification; + const sessionUpdate = params?.update?.sessionUpdate; + + batchedEvents.push({ type: "session_update", ts, - notification: entry.notification - .params as SessionNotification, - }; - get()._handleEvent(taskRunId, sessionUpdateEvent); - - // Check if this is an agent message - means agent is responding - const sessionUpdate = - entry.notification?.params?.update?.sessionUpdate; - if ( - sessionUpdate === "agent_message_chunk" || - sessionUpdate === "agent_thought_chunk" - ) { + notification: params, + }); + + // agent_message (finalized, non-chunk) is a reasonable proxy + // for turn completion — it's emitted once the full response + // is assembled. Chunks and thoughts fire mid-turn and are NOT + // reliable. The proper signal is _posthog/turn_complete but + // it's not yet written to S3 logs by the server. + if (sessionUpdate === "agent_message") { receivedAgentMessage = true; } } @@ -368,23 +684,65 @@ export const useTaskSessionStore = create((set, get) => ({ } } - set((state) => ({ - sessions: { - ...state.sessions, - [taskRunId]: { - ...state.sessions[taskRunId], - processedLineCount: lines.length, - processedHashes: currentHashes, - // Clear pending state when we receive agent response - isPromptPending: receivedAgentMessage - ? false - : (state.sessions[taskRunId]?.isPromptPending ?? false), + // Determine if we should ping. If an external user message armed + // the ping in this same batch, honour it even though the store + // hasn't updated yet. + const wasAwaitingPing = + get().sessions[taskRunId]?.awaitingPing ?? false; + const shouldPingAfterBatch = + receivedAgentMessage && + (wasAwaitingPing || receivedExternalUserMessage); + set((state) => { + const current = state.sessions[taskRunId]; + if (!current) return state; + + // Determine isPromptPending: external user message starts work, + // turn/task completion ends it. + let nextIsPromptPending = current.isPromptPending; + if (receivedExternalUserMessage) nextIsPromptPending = true; + if (receivedAgentMessage) nextIsPromptPending = false; + + // awaitingPing: arm when work starts (even from another device), + // disarm when it completes and the ping fires. + let nextAwaitingPing = current.awaitingPing; + if (receivedExternalUserMessage && !current.awaitingPing) { + nextAwaitingPing = true; + } + if (receivedAgentMessage) nextAwaitingPing = false; + + return { + sessions: { + ...state.sessions, + [taskRunId]: { + ...current, + events: + batchedEvents.length > 0 + ? [...current.events, ...batchedEvents] + : current.events, + processedLineCount: lines.length, + processedHashes: currentHashes, + localUserEchoes: + remainingLocalEchoes.size > 0 + ? remainingLocalEchoes + : undefined, + isPromptPending: nextIsPromptPending, + awaitingPing: nextAwaitingPing, + }, }, - }, - })); + }; + }); + if ( + shouldPingAfterBatch && + usePreferencesStore.getState().pingsEnabled + ) { + playMeepSound().catch(() => {}); + } } } catch (err) { logger.warn("Cloud polling error", { error: err }); + } finally { + pollInFlight.delete(taskRunId); + pollInFlightSince.delete(taskRunId); } }; @@ -398,7 +756,111 @@ export const useTaskSessionStore = create((set, get) => ({ if (interval) { clearInterval(interval); cloudPollers.delete(taskRunId); + pollTicks.delete(taskRunId); logger.debug("Stopped cloud S3 polling", { taskRunId }); } }, + + _resumeCloudRun: async ( + taskId: string, + previousRunId: string, + prompt: string, + ) => { + // Fetch the latest task to pick up the branch the previous run was using — + // otherwise the backend would create a new branch and we'd lose working + // tree context. + const freshTask = await getTask(taskId); + const previousBranch = freshTask.latest_run?.branch ?? null; + + const updatedTask = await runTaskInCloud(taskId, { + branch: previousBranch, + resumeFromRunId: previousRunId, + pendingUserMessage: prompt, + }); + + const newRun = updatedTask.latest_run; + if (!newRun?.id || !newRun.log_url) { + throw new Error("Resume run was created but has no id or log_url"); + } + + // Stop polling the dead run and swap the session over to the new run id. + // Read the CURRENT session state to preserve the local echo that was + // just added in sendPrompt (the captured `session` variable in the + // caller is stale). + get()._stopCloudPolling(previousRunId); + + set((state) => { + const previousSession = state.sessions[previousRunId]; + if (!previousSession) return state; + const { [previousRunId]: _old, ...rest } = state.sessions; + return { + sessions: { + ...rest, + [newRun.id]: { + ...previousSession, + taskRunId: newRun.id, + logUrl: newRun.log_url, + status: "connected", + isPromptPending: true, + processedLineCount: 0, + processedHashes: new Set(), + awaitingPing: true, + }, + }, + }; + }); + + get()._startCloudPolling(newRun.id, newRun.log_url); + logger.debug("Swapped to resume run", { + taskId, + previousRunId, + newRunId: newRun.id, + }); + }, })); + +// Watch for isPromptPending transitions (true → false) and auto-send the +// next queued message. Uses setTimeout so state is fully settled before +// sendPrompt re-enters the store. +const drainInFlight = new Set(); +useTaskSessionStore.subscribe((state, prev) => { + for (const [runId, session] of Object.entries(state.sessions)) { + const prevSession = prev.sessions[runId]; + if ( + prevSession?.isPromptPending && + !session.isPromptPending && + !session.terminalStatus && + session.messageQueue?.length && + !drainInFlight.has(runId) + ) { + drainInFlight.add(runId); + setTimeout(async () => { + try { + const current = useTaskSessionStore.getState().sessions[runId]; + if (!current?.messageQueue?.length) return; + + const [next, ...rest] = current.messageQueue; + useTaskSessionStore.setState((s) => { + const sess = s.sessions[runId]; + if (!sess) return s; + return { + sessions: { + ...s.sessions, + [runId]: { + ...sess, + messageQueue: rest.length > 0 ? rest : undefined, + }, + }, + }; + }); + + await useTaskSessionStore.getState().sendPrompt(current.taskId, next); + } catch (err) { + logger.warn("Failed to send queued message", { runId, error: err }); + } finally { + drainInFlight.delete(runId); + } + }, 50); + } + } +}); diff --git a/apps/mobile/src/features/tasks/types.ts b/apps/mobile/src/features/tasks/types.ts index 4c1578a4b..4ee8bb75e 100644 --- a/apps/mobile/src/features/tasks/types.ts +++ b/apps/mobile/src/features/tasks/types.ts @@ -51,9 +51,22 @@ export interface SessionNotification { status?: "pending" | "in_progress" | "completed" | "failed" | null; rawInput?: Record; rawOutput?: unknown; + entries?: PlanEntry[]; + _meta?: { + claudeCode?: { + toolName?: string; + parentToolCallId?: string; + }; + }; }; } +export interface PlanEntry { + content: string; + status: "pending" | "in_progress" | "completed"; + priority: string; +} + export interface AcpMessage { type: "acp_message"; direction: "client" | "agent"; diff --git a/apps/mobile/src/features/tasks/utils/parseSessionLogs.ts b/apps/mobile/src/features/tasks/utils/parseSessionLogs.ts index fb405b1a6..307efca93 100644 --- a/apps/mobile/src/features/tasks/utils/parseSessionLogs.ts +++ b/apps/mobile/src/features/tasks/utils/parseSessionLogs.ts @@ -56,27 +56,10 @@ export function parseSessionLogs(content: string): ParsedSessionLogs { export function convertRawEntriesToEvents( rawEntries: StoredLogEntry[], notifications: SessionNotification[], - taskDescription?: string, ): SessionEvent[] { const events: SessionEvent[] = []; let notificationIdx = 0; - if (taskDescription) { - const startTs = rawEntries[0]?.timestamp - ? new Date(rawEntries[0].timestamp).getTime() - 1 - : Date.now(); - events.push({ - type: "session_update", - ts: startTs, - notification: { - update: { - sessionUpdate: "user_message_chunk", - content: { type: "text", text: taskDescription }, - }, - }, - }); - } - for (const entry of rawEntries) { const ts = entry.timestamp ? new Date(entry.timestamp).getTime() diff --git a/apps/mobile/src/features/tasks/utils/sounds.ts b/apps/mobile/src/features/tasks/utils/sounds.ts new file mode 100644 index 000000000..8460a0cb3 --- /dev/null +++ b/apps/mobile/src/features/tasks/utils/sounds.ts @@ -0,0 +1,23 @@ +import { Audio } from "expo-av"; + +// eslint-disable-next-line @typescript-eslint/no-require-imports +const meepAsset = require("../../../../assets/sounds/meep.mp3"); + +let audioModeConfigured = false; + +export async function playMeepSound(): Promise { + if (!audioModeConfigured) { + await Audio.setAudioModeAsync({ + playsInSilentModeIOS: true, + }); + audioModeConfigured = true; + } + const { sound } = await Audio.Sound.createAsync(meepAsset, { + shouldPlay: true, + }); + sound.setOnPlaybackStatusUpdate((status) => { + if (status.isLoaded && status.didJustFinish) { + sound.unloadAsync(); + } + }); +} diff --git a/packages/agent/src/acp-extensions.ts b/packages/agent/src/acp-extensions.ts index 3cfeba297..722c3fab9 100644 --- a/packages/agent/src/acp-extensions.ts +++ b/packages/agent/src/acp-extensions.ts @@ -25,6 +25,14 @@ export const POSTHOG_NOTIFICATIONS = { /** Agent finished processing a turn (prompt returned, waiting for next input) */ TURN_COMPLETE: "_posthog/turn_complete", + /** + * Agent has stopped mid-turn and is blocked on a user reply (e.g. a + * question tool invoked via requestPermission). Emitted by permission + * handlers before they block; clients use it to unblock their input UI + * so the user's answer is sent directly instead of being queued. + */ + AWAITING_USER_INPUT: "_posthog/awaiting_user_input", + /** Error occurred during task execution */ ERROR: "_posthog/error", diff --git a/packages/agent/src/adapters/claude/permissions/permission-handlers.ts b/packages/agent/src/adapters/claude/permissions/permission-handlers.ts index b97ab8fdb..6315b22d8 100644 --- a/packages/agent/src/adapters/claude/permissions/permission-handlers.ts +++ b/packages/agent/src/adapters/claude/permissions/permission-handlers.ts @@ -3,6 +3,7 @@ import type { RequestPermissionResponse, } from "@agentclientprotocol/sdk"; import type { PermissionUpdate } from "@anthropic-ai/claude-agent-sdk"; +import { POSTHOG_NOTIFICATIONS } from "../../../acp-extensions"; import { text } from "../../../utils/acp-content"; import type { Logger } from "../../../utils/logger"; import { toolInfoFromToolUse } from "../conversion/tool-use-to-acp"; @@ -276,6 +277,25 @@ async function handleAskUserQuestionTool( input: toolInput, }); + // Tell any attached clients (including log readers like the mobile app) + // that the agent has stopped and is blocked on a user reply. Without this + // signal, clients reading the log-only stream have no reliable way to + // distinguish "tool running" from "tool waiting for a human" — and treat + // replies as queued-while-busy instead of sending them through. + try { + await client.extNotification(POSTHOG_NOTIFICATIONS.AWAITING_USER_INPUT, { + sessionId, + toolCallId: toolUseID, + }); + } catch (err) { + context.logger.warn( + "[AskUserQuestion] Failed to emit awaiting_user_input", + { + error: err, + }, + ); + } + const response = await client.requestPermission({ options, sessionId, @@ -291,10 +311,23 @@ async function handleAskUserQuestionTool( }, }); - if (context.signal?.aborted || response.outcome?.outcome === "cancelled") { + if (context.signal?.aborted) { throw new Error("Tool use aborted"); } + if (response.outcome?.outcome === "cancelled") { + const cancelMessage = ( + response._meta as Record | undefined + )?.message; + return { + behavior: "deny", + message: + typeof cancelMessage === "string" + ? cancelMessage + : "User cancelled the questions", + }; + } + if (response.outcome?.outcome !== "selected") { const customMessage = ( response._meta as Record | undefined diff --git a/packages/agent/src/server/agent-server.ts b/packages/agent/src/server/agent-server.ts index 69f9dd620..4a4ce0705 100644 --- a/packages/agent/src/server/agent-server.ts +++ b/packages/agent/src/server/agent-server.ts @@ -599,6 +599,38 @@ export class AgentServer { }; } + case "permission_response": { + // Cloud questions are not a blocking permission wait (the cloud + // permission handler returns "cancelled" with a wait hint so + // Claude ends its turn, then resumes on the next user_message). + // So a mobile permission_response for a cloud run is effectively + // a follow-up prompt — forward it through the user_message path + // using the user's answer as prompt text. + const customInput = + typeof params.customInput === "string" ? params.customInput : ""; + const rawAnswers = params.answers; + const answerValues = + rawAnswers && + typeof rawAnswers === "object" && + !Array.isArray(rawAnswers) + ? Object.values(rawAnswers as Record).filter( + (v): v is string => typeof v === "string", + ) + : []; + const answerText = customInput || answerValues.join(", "); + + if (!answerText) { + this.logger.warn("permission_response missing answer content", { + toolCallId: params.toolCallId, + }); + return { stopReason: "cancelled" }; + } + + return await this.executeCommand("user_message", { + content: answerText, + }); + } + case POSTHOG_NOTIFICATIONS.CANCEL: case "cancel": { this.logger.info("Cancel requested", { @@ -1579,6 +1611,88 @@ ${attributionInstructions} } } + // Interactive sessions (mobile/web): don't auto-approve questions. + // Log the question as in-progress so the client renders it interactively, + // then return cancelled with a wait message so the agent waits for the + // user's reply (same pattern as the Slack relay above). + if ( + mode === "interactive" && + codeToolKind === "question" && + interactionOrigin !== "slack" && + params.toolCall?._meta + ) { + const questionMeta = params.toolCall._meta; + const toolCallId = `question-${Date.now()}`; + + const questionNotification = { + jsonrpc: "2.0", + method: "session/update", + params: { + update: { + sessionUpdate: "tool_call", + title: "AskUserQuestion", + toolCallId, + status: "in_progress", + rawInput: questionMeta, + }, + }, + }; + + this.broadcastEvent({ + type: "notification", + timestamp: new Date().toISOString(), + notification: questionNotification, + }); + + this.session?.logWriter.appendRawLine( + payload.run_id, + JSON.stringify(questionNotification), + ); + + return { + outcome: { outcome: "cancelled" as const }, + _meta: { + message: + "This question has been sent to the user's device. " + + "The user will reply with their selection. Do NOT re-ask the question or pick an answer yourself. " + + "Wait for the user's reply.", + }, + }; + } + + // Background mode or non-question permissions: log as completed and auto-approve + if (codeToolKind === "question" && params.toolCall?._meta) { + const questionMeta = params.toolCall._meta; + const toolCallId = `question-${Date.now()}`; + const selectedOption = allowOption?.name ?? "Auto-approved"; + + const questionNotification = { + jsonrpc: "2.0", + method: "session/update", + params: { + update: { + sessionUpdate: "tool_call", + title: "AskUserQuestion", + toolCallId, + status: "completed", + rawInput: questionMeta, + rawOutput: { answer: selectedOption }, + }, + }, + }; + + this.broadcastEvent({ + type: "notification", + timestamp: new Date().toISOString(), + notification: questionNotification, + }); + + this.session?.logWriter.appendRawLine( + payload.run_id, + JSON.stringify(questionNotification), + ); + } + // Relay permission requests to the desktop app when: // - Questions: always relay (need human answers regardless of mode) // - Plan approvals: always relay @@ -1953,18 +2067,27 @@ ${attributionInstructions} private broadcastTurnComplete(stopReason: string): void { if (!this.session) return; + const notification = { + jsonrpc: "2.0" as const, + method: POSTHOG_NOTIFICATIONS.TURN_COMPLETE, + params: { + sessionId: this.session.acpSessionId, + stopReason, + }, + }; + this.broadcastEvent({ type: "notification", timestamp: new Date().toISOString(), - notification: { - jsonrpc: "2.0", - method: POSTHOG_NOTIFICATIONS.TURN_COMPLETE, - params: { - sessionId: this.session.acpSessionId, - stopReason, - }, - }, + notification, }); + + // Persist to S3 log so mobile clients (which poll S3) can detect + // turn completion and trigger "your turn" indicators / sounds. + this.session.logWriter.appendRawLine( + this.session.payload.run_id, + JSON.stringify(notification), + ); } private broadcastEvent(event: Record): void { diff --git a/packages/agent/src/server/question-relay.test.ts b/packages/agent/src/server/question-relay.test.ts index e3865e309..d56233ecf 100644 --- a/packages/agent/src/server/question-relay.test.ts +++ b/packages/agent/src/server/question-relay.test.ts @@ -225,7 +225,7 @@ describe("Question relay", () => { delete process.env.POSTHOG_CODE_INTERACTION_ORIGIN; }); - it("auto-approves question tools (no Slack relay)", async () => { + it("returns cancelled with wait message for interactive question tools", async () => { const client = server.createCloudClient(TEST_PAYLOAD); const result = await client.requestPermission({ @@ -233,7 +233,10 @@ describe("Question relay", () => { toolCall: { _meta: QUESTION_META }, }); - expect(result.outcome.outcome).toBe("selected"); + expect(result.outcome.outcome).toBe("cancelled"); + expect(result._meta?.message).toContain( + "sent to the user's device", + ); }); it("keeps auto-approving permissions after SSE send failures", async () => { diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index e2364f028..7800234b6 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -566,6 +566,9 @@ importers: expo-secure-store: specifier: ^15.0.8 version: 15.0.8(expo@54.0.33) + expo-speech-recognition: + specifier: ^3.1.2 + version: 3.1.2(expo@54.0.33)(react-native@0.81.5(@babel/core@7.29.0)(@types/react@19.2.11)(react@19.1.0))(react@19.1.0) expo-splash-screen: specifier: ~31.0.12 version: 31.0.13(expo@54.0.33) @@ -7012,6 +7015,13 @@ packages: resolution: {integrity: sha512-IGR++flYH70rhLyeXF0Phle56/k4cee87WeQ4mamS+MkVAVP+dDlOHf2nN06Z9Y2KhU0Gp1k+y61KkghF7HdhA==} engines: {node: '>=20.16.0'} + expo-speech-recognition@3.1.2: + resolution: {integrity: sha512-yaXy+6w218Urdshits2KsfLjXNCnGNlXzUxEP4BVehKEbiIPAeUKBzuicCeELU5H2zTLwL9u+RjbFAUom4LiYQ==} + peerDependencies: + expo: '*' + react: '*' + react-native: '*' + expo-splash-screen@31.0.13: resolution: {integrity: sha512-1epJLC1cDlwwj089R2h8cxaU5uk4ONVAC+vzGiTZH4YARQhL4Stlz1MbR6yAS173GMosvkE6CAeihR7oIbCkDA==} peerDependencies: @@ -18802,6 +18812,12 @@ snapshots: expo-server@1.0.5: {} + expo-speech-recognition@3.1.2(expo@54.0.33)(react-native@0.81.5(@babel/core@7.29.0)(@types/react@19.2.11)(react@19.1.0))(react@19.1.0): + dependencies: + expo: 54.0.33(@babel/core@7.29.0)(@expo/metro-runtime@6.1.2)(expo-router@6.0.23)(graphql@16.12.0)(react-native-webview@13.16.0(react-native@0.81.5(@babel/core@7.29.0)(@types/react@19.2.11)(react@19.1.0))(react@19.1.0))(react-native@0.81.5(@babel/core@7.29.0)(@types/react@19.2.11)(react@19.1.0))(react@19.1.0) + react: 19.1.0 + react-native: 0.81.5(@babel/core@7.29.0)(@types/react@19.2.11)(react@19.1.0) + expo-splash-screen@31.0.13(expo@54.0.33): dependencies: '@expo/prebuild-config': 54.0.8(expo@54.0.33)