Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions .claude/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,7 @@
"deny": [
"Read(./apps/cli/**)",
"Edit(./apps/cli/**)",
"Write(./apps/cli/**)",
"Read(./apps/mobile/**)",
"Edit(./apps/mobile/**)",
"Write(./apps/mobile/**)"
"Write(./apps/cli/**)"
]
}
}
2 changes: 2 additions & 0 deletions apps/code/src/main/di/container.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import { WorkspaceRepository } from "../db/repositories/workspace-repository";
import { WorktreeRepository } from "../db/repositories/worktree-repository";
import { DatabaseService } from "../db/service";
import { AgentAuthAdapter } from "../services/agent/auth-adapter";
import { LocalCommandReceiver } from "../services/agent/local-command-receiver";
import { AgentService } from "../services/agent/service";
import { AppLifecycleService } from "../services/app-lifecycle/service";
import { ArchiveService } from "../services/archive/service";
Expand Down Expand Up @@ -64,6 +65,7 @@ container.bind(MAIN_TOKENS.ArchiveRepository).to(ArchiveRepository);
container.bind(MAIN_TOKENS.SuspensionRepository).to(SuspensionRepositoryImpl);
container.bind(MAIN_TOKENS.AgentAuthAdapter).to(AgentAuthAdapter);
container.bind(MAIN_TOKENS.AgentService).to(AgentService);
container.bind(MAIN_TOKENS.LocalCommandReceiver).to(LocalCommandReceiver);
container.bind(MAIN_TOKENS.AuthService).to(AuthService);
container.bind(MAIN_TOKENS.AuthProxyService).to(AuthProxyService);
container.bind(MAIN_TOKENS.ArchiveService).to(ArchiveService);
Expand Down
1 change: 1 addition & 0 deletions apps/code/src/main/di/tokens.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ export const MAIN_TOKENS = Object.freeze({
// Services
AgentAuthAdapter: Symbol.for("Main.AgentAuthAdapter"),
AgentService: Symbol.for("Main.AgentService"),
LocalCommandReceiver: Symbol.for("Main.LocalCommandReceiver"),
AuthService: Symbol.for("Main.AuthService"),
AuthProxyService: Symbol.for("Main.AuthProxyService"),
ArchiveService: Symbol.for("Main.ArchiveService"),
Expand Down
262 changes: 262 additions & 0 deletions apps/code/src/main/services/agent/local-command-receiver.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,262 @@
import { inject, injectable, preDestroy } from "inversify";
import { MAIN_TOKENS } from "../../di/tokens";
import { logger } from "../../utils/logger";
import type { AuthService } from "../auth/service";

const log = logger.scope("local-command-receiver");
const INITIAL_RECONNECT_DELAY_MS = 2000;
const MAX_RECONNECT_DELAY_MS = 30_000;
// After this many consecutive failures, assume Last-Event-ID is stale (event
// trimmed from the backend buffer) and fall back to a fresh connect with
// start=latest. Accepts that we may drop commands issued during the outage.
const STALE_EVENT_ID_THRESHOLD = 3;

/**
* JSON-RPC envelope carried inside an `incoming_command` SSE event. The
* backend repackages whatever mobile POSTs to /command/ into this shape
* (see products/tasks/backend/api.py :: command).
*/
export interface IncomingCommandPayload {
jsonrpc: string;
method: string;
params?: { content?: string } & Record<string, unknown>;
id?: string | number;
}

interface SubscribeParams {
taskId: string;
taskRunId: string;
projectId: number;
apiHost: string;
onCommand: (payload: IncomingCommandPayload) => Promise<void>;
}

interface Subscription {
taskRunId: string;
controller: AbortController;
}

/**
* Subscribes to the PostHog task-run SSE stream for a local run and
* delivers `incoming_command` events (published by the backend when mobile
* POSTs to /command/ on a run with environment=local) to a caller-supplied
* callback.
*
* One SSE connection per subscribed run. Reconnects with backoff on failure.
* Uses the `Last-Event-ID` header to resume from the last processed event
* so brief network blips don't drop commands.
*/
@injectable()
export class LocalCommandReceiver {
private readonly subs = new Map<string, Subscription>();

constructor(
@inject(MAIN_TOKENS.AuthService)
private readonly auth: AuthService,
) {}

subscribe(params: SubscribeParams): void {
if (this.subs.has(params.taskRunId)) {
log.debug("Already subscribed", { taskRunId: params.taskRunId });
return;
}
const controller = new AbortController();
this.subs.set(params.taskRunId, {
taskRunId: params.taskRunId,
controller,
});
log.info("Subscribing to SSE stream", { taskRunId: params.taskRunId });
void this.connectLoop(params, controller).catch((err) => {
if (controller.signal.aborted) return;
log.error("Connect loop exited unexpectedly", {
taskRunId: params.taskRunId,
error: err instanceof Error ? err.message : String(err),
});
});
}

unsubscribe(taskRunId: string): void {
const sub = this.subs.get(taskRunId);
if (!sub) return;
sub.controller.abort();
this.subs.delete(taskRunId);
log.info("Unsubscribed", { taskRunId });
}

@preDestroy()
async shutdown(): Promise<void> {
// Abort before awaiting teardown — per async-cleanup-ordering guidance.
for (const sub of this.subs.values()) sub.controller.abort();
this.subs.clear();
}

private async connectLoop(
params: SubscribeParams,
controller: AbortController,
): Promise<void> {
let lastEventId: string | undefined;
let consecutiveFailures = 0;

while (!controller.signal.aborted) {
let streamOpened = false;
try {
const { accessToken } = await this.auth.getValidAccessToken();
const url = new URL(
`${params.apiHost}/api/projects/${params.projectId}/tasks/${params.taskId}/runs/${params.taskRunId}/stream/`,
);
if (!lastEventId) {
// Fresh connect: only care about events published from now on.
// On reconnect we use Last-Event-ID instead (see headers below).
url.searchParams.set("start", "latest");
}

const headers: Record<string, string> = {
Authorization: `Bearer ${accessToken}`,
Accept: "text/event-stream",
};
if (lastEventId) headers["Last-Event-ID"] = lastEventId;

const response = await fetch(url.toString(), {
headers,
signal: controller.signal,
});
if (!response.ok) {
const body = await response.text().catch(() => "");
throw new Error(
`SSE HTTP ${response.status}${body ? `: ${body.slice(0, 200)}` : ""}`,
);
}

streamOpened = true;
consecutiveFailures = 0;
lastEventId = await this.readEventStream(
response.body,
params.onCommand,
controller.signal,
lastEventId,
);
log.info("SSE stream ended cleanly", {
taskRunId: params.taskRunId,
});
} catch (err) {
if (controller.signal.aborted) return;
if (!streamOpened) consecutiveFailures++;
if (
consecutiveFailures >= STALE_EVENT_ID_THRESHOLD &&
lastEventId !== undefined
) {
log.warn(
"Dropping possibly-stale Last-Event-ID after repeated failures",
{
taskRunId: params.taskRunId,
consecutiveFailures,
},
);
lastEventId = undefined;
}
log.warn("SSE disconnected, will reconnect", {
taskRunId: params.taskRunId,
consecutiveFailures,
error: err instanceof Error ? err.message : String(err),
});
}
if (controller.signal.aborted) return;
const delay = Math.min(
MAX_RECONNECT_DELAY_MS,
INITIAL_RECONNECT_DELAY_MS * 2 ** Math.max(0, consecutiveFailures - 1),
);
await this.sleep(delay, controller.signal);
}
}

private async readEventStream(
body: ReadableStream<Uint8Array> | null,
onCommand: SubscribeParams["onCommand"],
signal: AbortSignal,
seedLastEventId: string | undefined,
): Promise<string | undefined> {
if (!body) throw new Error("Missing SSE response body");
const reader = body.getReader();
const decoder = new TextDecoder();
let buffer = "";
let lastEventId = seedLastEventId;

try {
while (!signal.aborted) {
const { done, value } = await reader.read();
if (done) return lastEventId;
buffer += decoder.decode(value, { stream: true });

// SSE event blocks are separated by a blank line (\n\n).
while (true) {
const separator = buffer.indexOf("\n\n");
if (separator === -1) break;
const rawEvent = buffer.slice(0, separator);
buffer = buffer.slice(separator + 2);

let dataChunks = "";
let eventId: string | undefined;
for (const line of rawEvent.split("\n")) {
if (line.startsWith("data: ")) {
dataChunks += line.slice(6);
} else if (line.startsWith("id: ")) {
eventId = line.slice(4);
}
// `event:` and comments are ignored — we route on data.type.
}
if (!dataChunks) continue;

let parsed: unknown;
try {
parsed = JSON.parse(dataChunks);
} catch {
log.warn("Failed to parse SSE data chunk", {
preview: dataChunks.slice(0, 120),
});
continue;
}

if (
typeof parsed === "object" &&
parsed !== null &&
(parsed as { type?: unknown }).type === "incoming_command"
) {
const payload = (parsed as { payload?: unknown }).payload;
if (payload && typeof payload === "object") {
try {
await onCommand(payload as IncomingCommandPayload);
} catch (err) {
log.error("Incoming command handler threw", {
error: err instanceof Error ? err.message : String(err),
});
}
}
}

if (eventId) lastEventId = eventId;
}
}
return lastEventId;
} finally {
try {
await reader.cancel();
} catch {
// Reader already closed or cancelled; nothing to do.
}
}
}

private sleep(ms: number, signal: AbortSignal): Promise<void> {
return new Promise((resolve) => {
const timer = setTimeout(resolve, ms);
signal.addEventListener(
"abort",
() => {
clearTimeout(timer);
resolve();
},
{ once: true },
);
});
}
}
30 changes: 12 additions & 18 deletions apps/code/src/main/services/agent/schemas.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,6 @@ export const credentialsSchema = z.object({

export type Credentials = z.infer<typeof credentialsSchema>;

// Session config schema
export const sessionConfigSchema = z.object({
taskId: z.string(),
taskRunId: z.string(),
repoPath: z.string(),
credentials: credentialsSchema,
logUrl: z.string().optional(),
/** The agent's session ID (for resume - SDK session ID for Claude, Codex's session ID for Codex) */
sessionId: z.string().optional(),
adapter: z.enum(["claude", "codex"]).optional(),
/** Additional directories Claude can access beyond cwd (for worktree support) */
additionalDirectories: z.array(z.string()).optional(),
/** Permission mode to use for the session (e.g. "default", "acceptEdits", "plan", "bypassPermissions") */
permissionMode: z.string().optional(),
});

export type SessionConfig = z.infer<typeof sessionConfigSchema>;

// Start session input/output

export const startSessionInput = z.object({
Expand Down Expand Up @@ -173,6 +155,7 @@ export const reconnectSessionInput = z.object({
permissionMode: z.string().optional(),
customInstructions: z.string().max(2000).optional(),
effort: effortLevelSchema.optional(),
runMode: z.enum(["local", "cloud"]).optional(),
});

export type ReconnectSessionInput = z.infer<typeof reconnectSessionInput>;
Expand All @@ -198,6 +181,11 @@ export const recordActivityInput = z.object({
export const AgentServiceEvent = {
SessionEvent: "session-event",
PermissionRequest: "permission-request",
// Fires when a pending permission is resolved by anything other than the
// Electron UI (e.g. a mobile client calling permission_response). Renderer
// uses this to clear its own pendingPermissions copy in lockstep with the
// main-process map.
PermissionResolved: "permission-resolved",
SessionsIdle: "sessions-idle",
SessionIdleKilled: "session-idle-killed",
AgentFileActivity: "agent-file-activity",
Expand Down Expand Up @@ -226,9 +214,15 @@ export interface AgentFileActivityPayload {
branchName: string | null;
}

export interface PermissionResolvedPayload {
taskRunId: string;
toolCallId: string;
}

export interface AgentServiceEvents {
[AgentServiceEvent.SessionEvent]: AgentSessionEventPayload;
[AgentServiceEvent.PermissionRequest]: PermissionRequestPayload;
[AgentServiceEvent.PermissionResolved]: PermissionResolvedPayload;
[AgentServiceEvent.SessionsIdle]: undefined;
[AgentServiceEvent.SessionIdleKilled]: SessionIdleKilledPayload;
[AgentServiceEvent.AgentFileActivity]: AgentFileActivityPayload;
Expand Down
Loading