diff --git a/.gitignore b/.gitignore index 13f497c..20adc1a 100644 --- a/.gitignore +++ b/.gitignore @@ -42,3 +42,6 @@ coverage/ # Marketing drafts (private — edit/publish externally) .marketing/ + +# Detached Claude worker state (see #73) +.klonode/workers/ diff --git a/packages/ui/src/lib/components/ChatPanel/ChatPanel.svelte b/packages/ui/src/lib/components/ChatPanel/ChatPanel.svelte index d24b882..9787752 100644 --- a/packages/ui/src/lib/components/ChatPanel/ChatPanel.svelte +++ b/packages/ui/src/lib/components/ChatPanel/ChatPanel.svelte @@ -9,6 +9,8 @@ coContextUsage, CO_MAX_CONTEXT, closedSessionQueue, CONTEXT_DEPTH_LABELS, getCliSessionId, setCliSessionId, clearCliSessionId, + getActiveWorkerId, setActiveWorkerId, clearActiveWorkerId, + getLastWorkerOffset, setLastWorkerOffset, type ContextDepth as CtxDepth, } from '../../stores/agents'; import { recordActivity, clearActivity } from '../../stores/activity'; @@ -18,6 +20,7 @@ defineComponentAction, defineComponentState, } from '../../workstation/registry'; + import { spawnWorker, connectWorker, stopWorker, type WorkerConnection } from '../../workers/worker-client'; // Register with the workstation self-introspection registry so Claude can // read "which session is active, how many messages, is it loading" and @@ -113,6 +116,8 @@ let attachments: { name: string; type: string; dataUrl: string; file: File }[] = []; let fileInput: HTMLInputElement; let abortController: AbortController | null = null; + /** Active detached-worker SSE connection. Closed on Stop or onDestroy. */ + let activeWorkerConnection: WorkerConnection | null = null; // Klonode session tab ID → Claude CLI session ID is now tracked in // sessionsStore.cliSessionIds and persisted to localStorage, so reloads // and Vite server restarts preserve conversation continuity. Use @@ -163,7 +168,9 @@ return; } - // For CLI mode: use streaming endpoint for live feedback + // For CLI mode: spawn a detached Claude worker and tail its log file. 
+ // This survives Vite HMR / dev-server restarts because the worker is + // NOT a child of the Vite process — see #73. if (settings.connectionMode === 'cli') { const graph = (await import('../../stores/graph')).graphStore; let repoPath = ''; @@ -178,89 +185,78 @@ messages: [...s.messages, userMsg, loadingMsg], })); + // Build the same system prompt the legacy /api/chat/stream endpoint + // used. The worker just spawns Claude — it doesn't inject a prompt + // on its own, so we do it here and pipe it through the `prompt` + // request field. + const systemPrompt = isCO + ? `You are an experienced developer with full access to all tools. Work directly in the project directory.\n\nAnswer in Norwegian unless the user writes in English. Write all code and CONTEXT.md files in English.` + : `Erfaren utvikler. Svar pa norsk med mindre brukeren skriver pa engelsk.`; + const fullPrompt = `${systemPrompt}\n\nBrukerens sporsmaal: ${userMessage}`; + + let resultText = ''; + let resultTokens = { input: 0, output: 0, total: 0, costUsd: 0, numTurns: 0 }; + const klonodeSessionId = $sessionsStore.activeSessionId; + try { - abortController = new AbortController(); - const res = await fetch('/api/chat/stream', { - method: 'POST', - headers: { 'Content-Type': 'application/json' }, - signal: abortController.signal, - body: JSON.stringify({ - message: userMessage, - context: '', - cliPath: settings.cliPath, - mode: 'with-klonode', - repoPath, - executionMode: isCO ? 'bypass' : (settings.executionMode === 'auto' ? 'bypass' : settings.executionMode), - isCO, - sessionId: getCliSessionId($sessionsStore.activeSessionId), - }), + const { workerId } = await spawnWorker({ + cliPath: settings.cliPath, + repoPath, + prompt: fullPrompt, + maxTurns: isCO ? 500 : 500, + }); + setActiveWorkerId(klonodeSessionId, workerId); + + // Wrap the callback-based stream in a promise so the rest of the + // send flow can await a clean `done` event. 
+ await new Promise((resolve) => { + activeWorkerConnection = connectWorker(workerId, 0, repoPath, { + onSession: (sid) => { + if (sid) setCliSessionId(klonodeSessionId, sid); + }, + onTool: (tool, input) => { + activityLog = [...activityLog, { tool, input, time: new Date() }]; + recordActivity(tool, input, repoPath); + tick().then(scrollToBottom); + }, + onText: (text) => { + streamingText += text; + }, + onResult: (r) => { + resultText = r.text || streamingText || ''; + const u = r.usage as { input_tokens?: number; output_tokens?: number; cache_creation_input_tokens?: number; cache_read_input_tokens?: number; } | undefined; + if (u) { + resultTokens = { + input: (u.input_tokens || 0) + (u.cache_creation_input_tokens || 0) + (u.cache_read_input_tokens || 0), + output: u.output_tokens || 0, + total: 0, + costUsd: r.costUsd || 0, + numTurns: r.numTurns || 0, + }; + resultTokens.total = resultTokens.input + resultTokens.output; + } + }, + onStderr: (text) => { + // Surface in console for now — a future PR wires this to a + // dedicated stderr strip in the chat panel. 
+ // eslint-disable-next-line no-console + console.debug('[worker stderr]', text); + }, + onError: (message) => { + resultText = `Feil: ${message}`; + }, + onOffset: (offset) => { + setLastWorkerOffset(klonodeSessionId, offset); + }, + onDone: () => { + resolve(); + }, + }); }); - const reader = res.body?.getReader(); - const decoder = new TextDecoder(); - let resultText = ''; - let resultTokens = { input: 0, output: 0, total: 0, costUsd: 0, numTurns: 0 }; - - if (reader) { - let buf = ''; - while (true) { - const { done, value } = await reader.read(); - if (done) break; - - buf += decoder.decode(value, { stream: true }); - const parts = buf.split('\n\n'); - buf = parts.pop() || ''; - - for (const part of parts) { - const eventMatch = part.match(/^event: (\w+)\ndata: (.+)$/s); - if (!eventMatch) continue; - const [, eventType, dataStr] = eventMatch; - try { - const data = JSON.parse(dataStr); - switch (eventType) { - case 'session': - // Store CLI session ID for this tab so next message resumes the conversation - if (data.sessionId) { - setCliSessionId($sessionsStore.activeSessionId, data.sessionId); - } - break; - case 'tool': - activityLog = [...activityLog, { tool: data.tool, input: data.input, time: new Date() }]; - // Also push to the shared activity store so tree/graph can highlight - recordActivity(data.tool, data.input, repoPath); - await tick(); - scrollToBottom(); - break; - case 'text': - streamingText += data.text; - break; - case 'result': - resultText = data.text || streamingText || ''; - if (data.usage) { - resultTokens = { - input: (data.usage.input_tokens || 0) + (data.usage.cache_creation_input_tokens || 0) + (data.usage.cache_read_input_tokens || 0), - output: data.usage.output_tokens || 0, - total: 0, - costUsd: data.costUsd || 0, - numTurns: data.numTurns || 0, - }; - resultTokens.total = resultTokens.input + resultTokens.output; - } - break; - case 'error': - resultText = `Feil: ${data.message}`; - break; - } - } catch { /* skip bad JSON */ } 
- } - } - } - - // If no result text but we have streaming text, use that if (!resultText && streamingText) resultText = streamingText; if (!resultText) resultText = 'Claude brukte alle steg. Prov et mer spesifikt sporsmal.'; - abortController = null; chatStore.update(s => ({ ...s, isLoading: false, messages: s.messages.map(m => m.id === loadingId ? { @@ -269,6 +265,8 @@ } : m), })); streamingText = ''; + clearActiveWorkerId(klonodeSessionId); + activeWorkerConnection = null; // After CO finishes, refresh the graph so UI shows updated CONTEXT.md files if (isCO) { @@ -280,7 +278,6 @@ }); const refreshData = await refreshRes.json(); if (refreshData.updated > 0) { - // Reload the graph in the UI const { loadGraphFromUrl } = await import('../../stores/loader'); await loadGraphFromUrl('/demo-graph.json'); console.log(`[Klonode] Graph refreshed: ${refreshData.updated} files updated`); @@ -290,7 +287,8 @@ } } } catch (err) { - abortController = null; + activeWorkerConnection = null; + clearActiveWorkerId(klonodeSessionId); chatStore.update(s => ({ ...s, isLoading: false, messages: s.messages.map(m => m.id === loadingId ? { @@ -377,17 +375,37 @@ Rules: handleSend(); } - function handleStop() { + async function handleStop() { + // Close the SSE tail first so the UI stops receiving events. + if (activeWorkerConnection) { + activeWorkerConnection.close(); + activeWorkerConnection = null; + } + // Tell the server to SIGTERM the detached worker. + const klonodeSessionId = $sessionsStore.activeSessionId; + const workerId = getActiveWorkerId(klonodeSessionId); + if (workerId) { + const graph = (await import('../../stores/graph')).graphStore; + let repoPath = ''; + graph.subscribe(g => { if (g) repoPath = g.repoPath; })(); + try { + await stopWorker(workerId, repoPath); + } catch (e) { + console.warn('[Klonode] stopWorker failed', e); + } + clearActiveWorkerId(klonodeSessionId); + } + // Legacy AbortController path (API mode / compare mode still use fetch). 
if (abortController) { abortController.abort(); abortController = null; - chatStore.update(s => ({ - ...s, isLoading: false, - messages: s.messages.map(m => m.loading - ? { ...m, loading: false, content: 'Stoppet av bruker.' } - : m), - })); } + chatStore.update(s => ({ + ...s, isLoading: false, + messages: s.messages.map(m => m.loading + ? { ...m, loading: false, content: 'Stoppet av bruker.' } + : m), + })); } function scrollToBottom() { @@ -418,6 +436,92 @@ Rules: if ($settingsStore.connectionMode === 'cli' && !$settingsStore.cliPath) { await detectCli(); } + + // Self-hosting resume: if this tab had an active detached worker when + // the page was unloaded (Vite HMR, reload, explicit refresh), reconnect + // to it and resume streaming from the last persisted byte offset. The + // worker has been writing to disk the whole time, so we never lose any + // tool calls or tokens. See #73. + const klonodeSessionId = $sessionsStore.activeSessionId; + const workerId = getActiveWorkerId(klonodeSessionId); + if (workerId) { + const graph = (await import('../../stores/graph')).graphStore; + let repoPath = ''; + graph.subscribe(g => { if (g) repoPath = g.repoPath; })(); + const since = getLastWorkerOffset(klonodeSessionId); + + // Find the most recent assistant message that was flagged as + // interrupted at hydrate time — that's the bubble we should resume + // streaming into. If there isn't one, append a fresh loading bubble. 
+ const state = get(chatStore); + let targetId = ''; + for (let i = state.messages.length - 1; i >= 0; i--) { + if (state.messages[i].role === 'assistant' && state.messages[i].interrupted) { + targetId = state.messages[i].id; + break; + } + } + if (!targetId) { + targetId = Math.random().toString(36).slice(2, 10); + chatStore.update(s => ({ + ...s, + isLoading: true, + messages: [ + ...s.messages, + { id: targetId, role: 'assistant' as const, content: '', loading: true, timestamp: new Date() }, + ], + })); + } else { + // Clear interrupted + flip back to loading so the UI re-shows the + // streaming indicators instead of the "interrupted" banner. + chatStore.update(s => ({ + ...s, + isLoading: true, + messages: s.messages.map(m => m.id === targetId + ? { ...m, interrupted: false, loading: true } + : m), + })); + } + + let resumedResultText = ''; + const resumedTokens = { input: 0, output: 0, total: 0, costUsd: 0, numTurns: 0 }; + + activeWorkerConnection = connectWorker(workerId, since, repoPath, { + onSession: (sid) => { if (sid) setCliSessionId(klonodeSessionId, sid); }, + onTool: (tool, input) => { + activityLog = [...activityLog, { tool, input, time: new Date() }]; + recordActivity(tool, input, repoPath); + tick().then(scrollToBottom); + }, + onText: (text) => { streamingText += text; }, + onResult: (r) => { + resumedResultText = r.text || streamingText || ''; + const u = r.usage as { input_tokens?: number; output_tokens?: number; cache_creation_input_tokens?: number; cache_read_input_tokens?: number; } | undefined; + if (u) { + resumedTokens.input = (u.input_tokens || 0) + (u.cache_creation_input_tokens || 0) + (u.cache_read_input_tokens || 0); + resumedTokens.output = u.output_tokens || 0; + resumedTokens.costUsd = r.costUsd || 0; + resumedTokens.numTurns = r.numTurns || 0; + resumedTokens.total = resumedTokens.input + resumedTokens.output; + } + }, + onError: (message) => { resumedResultText = `Feil: ${message}`; }, + onOffset: (offset) => 
setLastWorkerOffset(klonodeSessionId, offset), + onDone: () => { + const finalText = resumedResultText || streamingText || 'Respons gjenopprettet.'; + chatStore.update(s => ({ + ...s, + isLoading: false, + messages: s.messages.map(m => m.id === targetId + ? { ...m, loading: false, content: finalText, tokens: resumedTokens.total > 0 ? resumedTokens : undefined } + : m), + })); + streamingText = ''; + clearActiveWorkerId(klonodeSessionId); + activeWorkerConnection = null; + }, + }); + } }); diff --git a/packages/ui/src/lib/stores/agents.ts b/packages/ui/src/lib/stores/agents.ts index af99be0..667f568 100644 --- a/packages/ui/src/lib/stores/agents.ts +++ b/packages/ui/src/lib/stores/agents.ts @@ -85,6 +85,15 @@ interface SessionsState { * self-hosting session survival work (#53 follow-up / #65 sibling). */ cliSessionIds: Record; + /** + * Detached-worker tracking per tab. When a tab is streaming, its worker + * id + last byte offset live here. Persisted so that after a Vite HMR or + * page reload, ChatPanel's onMount can reconnect by calling + * `connectWorker(activeWorkerId, lastWorkerOffset)` and pick up exactly + * where the previous browser dropped off. See #73 for the architecture. 
+ */ + activeWorkerIds: Record; + lastWorkerOffsets: Record; } const STORAGE_KEY = 'klonode-sessions'; @@ -135,6 +144,8 @@ function loadState(): SessionsState { coMemory: '', closedSessionQueue: [], cliSessionIds: {}, + activeWorkerIds: {}, + lastWorkerOffsets: {}, }; if (typeof localStorage === 'undefined') return defaults; @@ -168,6 +179,8 @@ function loadState(): SessionsState { closedSessionQueue: saved.closedSessionQueue || [], coMemory: saved.coMemory || '', cliSessionIds: saved.cliSessionIds || {}, + activeWorkerIds: saved.activeWorkerIds || {}, + lastWorkerOffsets: saved.lastWorkerOffsets || {}, }; } } @@ -286,6 +299,56 @@ export function clearCliSessionId(klonodeSessionId: string): void { }); } +/** + * Look up the detached Claude worker currently streaming into a tab, or + * `undefined` if the tab is idle. Set by ChatPanel when it spawns a worker + * and cleared when the worker finishes or is stopped. Persisted across + * reloads so Vite HMR doesn't orphan the streaming worker. See #73. + */ +export function getActiveWorkerId(klonodeSessionId: string): string | undefined { + return get(sessionsStore).activeWorkerIds[klonodeSessionId]; +} + +/** Record the worker id for a tab. */ +export function setActiveWorkerId(klonodeSessionId: string, workerId: string): void { + sessionsStore.update(s => ({ + ...s, + activeWorkerIds: { ...s.activeWorkerIds, [klonodeSessionId]: workerId }, + })); +} + +/** Clear the active worker id for a tab (called on done / stop). */ +export function clearActiveWorkerId(klonodeSessionId: string): void { + sessionsStore.update(s => { + if (!(klonodeSessionId in s.activeWorkerIds)) return s; + const nextWorkers = { ...s.activeWorkerIds }; + delete nextWorkers[klonodeSessionId]; + const nextOffsets = { ...s.lastWorkerOffsets }; + delete nextOffsets[klonodeSessionId]; + return { ...s, activeWorkerIds: nextWorkers, lastWorkerOffsets: nextOffsets }; + }); +} + +/** Current byte offset the tab has processed for its active worker's log. 
*/ +export function getLastWorkerOffset(klonodeSessionId: string): number { + return get(sessionsStore).lastWorkerOffsets[klonodeSessionId] ?? 0; +} + +/** + * Persist the latest byte offset from the worker's tail stream. Called on + * every event so a reconnect after HMR / reload resumes from the same + * place. + */ +export function setLastWorkerOffset(klonodeSessionId: string, offset: number): void { + sessionsStore.update(s => { + if (s.lastWorkerOffsets[klonodeSessionId] === offset) return s; + return { + ...s, + lastWorkerOffsets: { ...s.lastWorkerOffsets, [klonodeSessionId]: offset }, + }; + }); +} + /** * Clear the `interrupted` flag on a message once a new response has * successfully arrived for the same session. The flag is set at hydrate diff --git a/packages/ui/src/lib/workers/worker-client.ts b/packages/ui/src/lib/workers/worker-client.ts new file mode 100644 index 0000000..b67339e --- /dev/null +++ b/packages/ui/src/lib/workers/worker-client.ts @@ -0,0 +1,177 @@ +/** + * Browser-side client for the detached Claude worker system. + * + * Replaces the direct `fetch('/api/chat/stream')` path in ChatPanel with a + * three-step dance: spawn → connect SSE → resume-on-reconnect. The critical + * property is that the browser can drop its connection (because of a Vite + * HMR, a page reload, or the user closing the tab) and a later `connect()` + * call with the same `workerId` and last-seen byte offset will pick up + * exactly where it left off. + * + * See #73. + */ + +import type { + WorkerStreamEvent, + WorkerSpawnResponse, + WorkerStopResponse, +} from './worker-protocol'; + +export interface WorkerSpawnParams { + cliPath: string; + repoPath: string; + prompt: string; + maxTurns?: number; + allowedTools?: string[]; + model?: string; +} + +/** + * Event callbacks the caller registers on `connect()`. All are optional — + * ChatPanel only needs a subset at any given moment. 
/**
 * Event callbacks the caller registers on `connectWorker()`. All are
 * optional — ChatPanel only needs a subset at any given moment.
 */
export interface WorkerHandlers {
  onSession?: (sessionId: string) => void;
  onTool?: (tool: string, input: string) => void;
  onText?: (text: string) => void;
  onResult?: (result: {
    text: string;
    usage?: unknown;
    costUsd?: number;
    numTurns?: number;
    subtype?: string;
  }) => void;
  onStderr?: (text: string) => void;
  onError?: (message: string) => void;
  /** Fired when the worker exits. After this the EventSource is closed. */
  onDone?: (exitCode: number | null) => void;
  /**
   * Called with the new byte offset whenever a processed event advances it.
   * The caller persists this into the store so a reconnect can pass `since=`.
   */
  onOffset?: (offset: number) => void;
}

/** Open connection returned from `connectWorker()`. Call `close()` to abort. */
export interface WorkerConnection {
  close(): void;
  /** Current byte offset — useful for a one-shot snapshot. */
  getOffset(): number;
}

/**
 * POST /api/worker/spawn. Returns the worker id the caller should store.
 *
 * @param params Spawn arguments, sent verbatim as the JSON request body.
 * @returns The parsed spawn response.
 * @throws Error when the server answers with a non-2xx status (the message
 *   carries the status and the first 200 characters of the response body) or
 *   when the response body does not contain a non-empty string `workerId`.
 */
export async function spawnWorker(params: WorkerSpawnParams): Promise<WorkerSpawnResponse> {
  const res = await fetch('/api/worker/spawn', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify(params),
  });
  if (!res.ok) {
    // Best-effort body read: a failure while reading the body must not mask
    // the HTTP status we are about to report.
    const text = await res.text().catch(() => '');
    throw new Error(`spawnWorker: ${res.status} ${text.slice(0, 200)}`);
  }

  // The caller stores `workerId` and builds URLs from it, so refuse a
  // response without one instead of handing back `undefined`.
  const data: unknown = await res.json();
  const workerId =
    typeof data === 'object' && data !== null
      ? (data as { workerId?: unknown }).workerId
      : undefined;
  if (typeof workerId !== 'string' || workerId === '') {
    throw new Error('spawnWorker: response is missing a workerId');
  }
  return data as WorkerSpawnResponse;
}

/**
 * Connect (or reconnect) to an existing worker's tail endpoint and stream
 * events into the handlers. `since` is the byte offset to resume from —
 * pass 0 on first connect, pass the last offset the browser persisted on a
 * reconnect after HMR / reload.
 */
/**
 * Connect (or reconnect) to an existing worker's tail endpoint and stream
 * events into the handlers. `since` is the byte offset to resume from —
 * pass 0 on first connect, pass the last offset the browser persisted on a
 * reconnect after HMR / reload.
 *
 * Two different things arrive under the DOM event name `error`:
 *   - a server-sent `event: error` frame (a MessageEvent carrying JSON), and
 *   - the EventSource's own transport error (a plain Event without `data`).
 * They are told apart by the presence of a string `data` payload so each is
 * reported to `onError` exactly once.
 *
 * If the EventSource gives up for good (readyState CLOSED — e.g. the tail
 * endpoint answered with a non-200 status), `onError` and then
 * `onDone(null)` are fired so callers awaiting completion do not hang.
 *
 * NOTE(review): while the browser auto-retries, it re-requests the same URL,
 * i.e. the original `since`; the server may therefore replay events that were
 * already delivered — confirm, and consider reconnecting manually with the
 * current offset.
 *
 * @param workerId Id returned by `spawnWorker`.
 * @param since Byte offset to resume from.
 * @param repoPath Repository root, forwarded as the `repoPath` query param.
 * @param handlers Optional callbacks for the individual stream events.
 * @returns A handle that closes the stream and exposes the current offset.
 */
export function connectWorker(
  workerId: string,
  since: number,
  repoPath: string,
  handlers: WorkerHandlers,
): WorkerConnection {
  /** Numeric value of `EventSource.CLOSED`. */
  const READY_STATE_CLOSED = 2;

  let offset = since;
  let closed = false;

  const params = new URLSearchParams({
    since: String(since),
    repoPath,
  });
  const es = new EventSource(`/api/worker/${encodeURIComponent(workerId)}/tail?${params}`);

  /** Mark the connection finished and release the EventSource. */
  function shutdown(): void {
    closed = true;
    try { es.close(); } catch { /* ignore */ }
  }

  /** Advance the tracked offset (never backwards) and notify the caller. */
  function updateOffset(ev: WorkerStreamEvent): void {
    if (typeof ev.offset === 'number' && ev.offset > offset) {
      offset = ev.offset;
      handlers.onOffset?.(offset);
    }
  }

  /** Register a listener for one server-sent event type with a typed payload. */
  function addTypedListener<K extends WorkerStreamEvent['type']>(
    type: K,
    handler: (ev: Extract<WorkerStreamEvent, { type: K }>) => void,
  ): void {
    es.addEventListener(type, (raw: MessageEvent) => {
      if (closed) return;
      // A native transport error is also dispatched as `error` but has no
      // payload; it is handled by the generic listener below.
      if (typeof raw.data !== 'string') return;

      let parsed: unknown;
      try {
        parsed = JSON.parse(raw.data);
      } catch {
        return; // ignore bad JSON
      }
      if (typeof parsed !== 'object' || parsed === null) return;

      // Only the parse is guarded: an exception thrown by a caller-supplied
      // handler is a bug in that handler and must stay visible.
      const data = parsed as Extract<WorkerStreamEvent, { type: K }>;
      updateOffset(data);
      handler(data);
    });
  }

  addTypedListener('session', ev => handlers.onSession?.(ev.sessionId));
  addTypedListener('tool', ev => handlers.onTool?.(ev.tool, ev.input));
  addTypedListener('text', ev => handlers.onText?.(ev.text));
  addTypedListener('result', ev =>
    handlers.onResult?.({
      text: ev.text,
      usage: ev.usage,
      costUsd: ev.costUsd,
      numTurns: ev.numTurns,
      subtype: ev.subtype,
    }),
  );
  addTypedListener('stderr', ev => handlers.onStderr?.(ev.text));
  addTypedListener('error', ev => handlers.onError?.(ev.message));
  addTypedListener('done', ev => {
    // Release the connection first so a throwing `onDone` cannot leak it.
    shutdown();
    handlers.onDone?.(ev.exitCode);
  });

  // Generic error (network failure, server unreachable). While the
  // EventSource is still auto-retrying we only tell the caller, so they can
  // show a "reconnecting" banner if they want.
  es.addEventListener('error', (raw: Event) => {
    if (closed) return;
    // Server-sent `event: error` frames were already handled above.
    if (typeof (raw as MessageEvent).data === 'string') return;

    if (es.readyState === READY_STATE_CLOSED) {
      // The browser will not retry; finish the stream for the caller.
      shutdown();
      handlers.onError?.('EventSource connection failed (not retrying)');
      handlers.onDone?.(null);
      return;
    }
    handlers.onError?.('EventSource dropped connection (auto-retrying)');
  });

  return {
    close(): void {
      if (closed) return;
      shutdown();
    },
    getOffset(): number {
      return offset;
    },
  };
}

/**
 * POST /api/worker/[id]/stop. Used by the Stop button.
 *
 * @param workerId Id of the worker to stop.
 * @param repoPath Repository root, forwarded as the `repoPath` query param.
 * @returns The server's report of what the stop request did.
 * @throws Error when the server answers with a non-2xx status.
 */
export async function stopWorker(
  workerId: string,
  repoPath: string,
): Promise<WorkerStopResponse> {
  const params = new URLSearchParams({ repoPath });
  const res = await fetch(
    `/api/worker/${encodeURIComponent(workerId)}/stop?${params}`,
    { method: 'POST' },
  );
  if (!res.ok) {
    throw new Error(`stopWorker: ${res.status}`);
  }
  return (await res.json()) as WorkerStopResponse;
}

// diff --git a/packages/ui/src/lib/workers/worker-paths.ts b/packages/ui/src/lib/workers/worker-paths.ts
// new file mode 100644
// index 0000000..480ff94
// --- /dev/null
// +++ b/packages/ui/src/lib/workers/worker-paths.ts
// @@ -0,0 +1,57 @@

/**
 * Shared filesystem paths used by the detached Claude worker system.
 *
 * The worker writes its output to a log file on disk and records its PID
 * alongside it. Multiple Klonode components read these paths:
 *   - `/api/worker/spawn` writes them when it starts a worker
 *   - `/api/worker/[id]/tail` reads the log file
 *   - `/api/worker/[id]/stop` reads the PID file
 *   - Boot-time cleanup removes stale entries for workers whose PIDs no
 *     longer exist
 *
 * See #73 for the full architecture.
 */

import { existsSync, mkdirSync } from 'fs';
import { join } from 'path';
import { randomBytes } from 'crypto';

/** Directory under the repo root where detached workers write their state. */
export function workersDir(repoRoot: string): string {
  return join(repoRoot, '.klonode', 'workers');
}

/** Ensure the workers directory exists. Returns the resolved path. */
/** Ensure the workers directory exists. Returns the resolved path. */
export function ensureWorkersDir(repoRoot: string): string {
  const dir = workersDir(repoRoot);
  // `recursive: true` makes mkdirSync a no-op when the directory is already
  // there, so no separate existence check (and no check-then-create race).
  mkdirSync(dir, { recursive: true });
  return dir;
}

/**
 * Worker ids end up inside file names and originate from URL parameters, so
 * only filename-safe characters are accepted. Ids produced by
 * `newWorkerId()` (`w-<base36>-<hex>`) always match.
 */
const WORKER_ID_PATTERN = /^[A-Za-z0-9][A-Za-z0-9_-]*$/;

/**
 * Build the path of one per-worker state file.
 *
 * @throws Error when `workerId` contains anything but letters, digits, `_`
 *   or `-` (e.g. path separators or `..`), which would otherwise let the
 *   resulting path escape the workers directory.
 */
function workerFile(repoRoot: string, workerId: string, suffix: string): string {
  if (!WORKER_ID_PATTERN.test(workerId)) {
    throw new Error(`invalid worker id: ${JSON.stringify(workerId)}`);
  }
  return join(workersDir(repoRoot), `${workerId}${suffix}`);
}

/** Log file for a given worker id. stdout + stderr both get redirected here. */
export function workerLogPath(repoRoot: string, workerId: string): string {
  return workerFile(repoRoot, workerId, '.log');
}

/** PID file so /api/worker/[id]/stop can read it and send SIGTERM. */
export function workerPidPath(repoRoot: string, workerId: string): string {
  return workerFile(repoRoot, workerId, '.pid');
}

/** Metadata file describing the worker (spawn args, cwd, start time). */
export function workerMetaPath(repoRoot: string, workerId: string): string {
  return workerFile(repoRoot, workerId, '.meta.json');
}

/** Generate a random worker id. Short enough to embed in URLs. */
export function newWorkerId(): string {
  return `w-${Date.now().toString(36)}-${randomBytes(3).toString('hex')}`;
}

/**
 * Resolve the repo root from a request. Defaults to process.cwd().
 *
 * NOTE(review): only existence is checked, so a path to a regular file is
 * accepted as a "root" — confirm whether callers need a directory check.
 */
export function resolveRepoRoot(explicit: string | null | undefined): string {
  if (explicit && existsSync(explicit)) return explicit;
  return process.cwd();
}

// diff --git a/packages/ui/src/lib/workers/worker-protocol.ts b/packages/ui/src/lib/workers/worker-protocol.ts
// new file mode 100644
// index 0000000..81845d5
// --- /dev/null
// +++ b/packages/ui/src/lib/workers/worker-protocol.ts
// @@ -0,0 +1,66 @@

/**
 * Shared types between the server-side worker endpoints and the browser-side
 * worker client. The events emitted by the tail endpoint mirror the existing
 * chat-stream SSE events so ChatPanel can reuse the same event handling code
 * when it's connected to a worker.
 *
 * See #73.
 */

/**
 * A metadata record persisted to `.klonode/workers/<id>.meta.json`. The tail
 * endpoint reads this to know where the log file is and whether the worker
 * is still considered alive.
 */
/**
 * A metadata record persisted to `.klonode/workers/<id>.meta.json`. The tail
 * endpoint reads this to know where the log file is and whether the worker
 * is still considered alive.
 */
export interface WorkerMeta {
  id: string;
  pid: number;
  repoPath: string;
  cwd: string;
  cliPath: string;
  startedAt: string; // ISO timestamp
  /** The prompt that was passed in. Stored for debugging / recovery. */
  prompt: string;
  /** Set to true by /api/worker/[id]/stop after SIGTERM succeeds. */
  stopped?: boolean;
  /** Set by the tail endpoint when the child exits cleanly. */
  exitCode?: number;
}

/**
 * Token accounting attached to a `result` event. Every field is optional
 * because the value is forwarded from the CLI's JSON output without
 * validation; consumers should default missing counts to 0.
 */
export interface WorkerUsage {
  input_tokens?: number;
  output_tokens?: number;
  cache_creation_input_tokens?: number;
  cache_read_input_tokens?: number;
}

/**
 * The events the tail endpoint streams back to the browser. One per
 * `stream-json` line emitted by the Claude CLI, translated through the same
 * mapping the legacy /api/chat/stream endpoint used. Byte offsets into the
 * log file are passed alongside so the browser can resume by offset if the
 * SSE connection drops.
 */
export type WorkerStreamEvent =
  | { type: 'session'; offset: number; sessionId: string }
  | { type: 'tool'; offset: number; tool: string; input: string }
  | { type: 'text'; offset: number; text: string }
  | {
      type: 'result';
      offset: number;
      text: string;
      usage?: WorkerUsage;
      costUsd?: number;
      numTurns?: number;
      subtype?: string;
    }
  | { type: 'stderr'; offset: number; text: string }
  | { type: 'error'; offset: number; message: string }
  | { type: 'done'; offset: number; exitCode: number | null };

/** Response shape from POST /api/worker/spawn. */
export interface WorkerSpawnResponse {
  workerId: string;
  repoPath: string;
  logPath: string;
}

/** Response shape from POST /api/worker/[id]/stop. */
export interface WorkerStopResponse {
  stopped: boolean;
  hadPid: boolean;
  wasAlive: boolean;
}

// diff --git a/packages/ui/src/routes/api/worker/[id]/stop/+server.ts b/packages/ui/src/routes/api/worker/[id]/stop/+server.ts
// new file mode 100644
// index 0000000..a17d61f
// --- /dev/null
// +++ b/packages/ui/src/routes/api/worker/[id]/stop/+server.ts
// @@ -0,0 +1,97 @@

/**
 * POST /api/worker/[id]/stop?repoPath=<path>
 *
 * Sends SIGTERM to the detached Claude worker identified by `id`.
 */
/**
 * POST /api/worker/[id]/stop?repoPath=<path>
 *
 * Sends SIGTERM to the detached Claude worker identified by `id`. Reads the
 * PID from the `.klonode/workers/<id>.pid` file written at spawn time.
 *
 * Returns 200 regardless of whether the worker was actually alive — the
 * idempotent behavior means a UI retry after a failed stop is safe. The
 * response body reports what actually happened so the caller can tell the
 * user whether the Stop click did anything. A missing or malformed worker
 * id is rejected with 400.
 *
 * See #73.
 */

import { json } from '@sveltejs/kit';
import { existsSync, readFileSync, writeFileSync } from 'fs';
import type { RequestHandler } from './$types';
import type { WorkerMeta, WorkerStopResponse } from '$lib/workers/worker-protocol';
import {
  resolveRepoRoot,
  workerMetaPath,
  workerPidPath,
} from '$lib/workers/worker-paths';

/**
 * The id comes straight from the URL and is turned into a file name, so only
 * filename-safe characters are accepted (no separators, no `..`).
 */
const WORKER_ID_PATTERN = /^[A-Za-z0-9][A-Za-z0-9_-]*$/;

export const POST: RequestHandler = async ({ params, url }) => {
  const workerId = params.id;
  if (!workerId) {
    return json({ error: 'missing worker id' }, { status: 400 });
  }
  if (!WORKER_ID_PATTERN.test(workerId)) {
    return json({ error: 'invalid worker id' }, { status: 400 });
  }

  const repoRoot = resolveRepoRoot(url.searchParams.get('repoPath'));
  const pidPath = workerPidPath(repoRoot, workerId);
  const metaPath = workerMetaPath(repoRoot, workerId);

  const response: WorkerStopResponse = {
    stopped: false,
    hadPid: false,
    wasAlive: false,
  };

  if (!existsSync(pidPath)) {
    return json(response);
  }
  response.hadPid = true;

  const pid = readPid(pidPath);
  if (pid === null) return json(response);

  response.wasAlive = isAlive(pid);
  if (!response.wasAlive) {
    // Already dead. Update the meta file so other readers see the terminal
    // state consistently.
    markStopped(metaPath, null);
    return json(response);
  }

  try {
    // SIGTERM first. On Windows this maps to TerminateProcess via Node's
    // signal-handling layer, which is good enough for our wrapped bash.
    process.kill(pid, 'SIGTERM');
    response.stopped = true;
  } catch {
    response.stopped = false;
  }

  if (response.stopped) {
    markStopped(metaPath, null);
  }

  return json(response);
};

/**
 * Read a PID file and return its value, or `null` when the file is
 * unreadable or does not hold a strictly positive decimal integer.
 *
 * The strictness matters: zero and negative values address process groups
 * rather than a single process when passed to `process.kill`, and a lenient
 * `parseInt` would also accept trailing garbage such as `12abc`.
 */
function readPid(pidPath: string): number | null {
  let raw: string;
  try {
    raw = readFileSync(pidPath, 'utf-8').trim();
  } catch {
    return null;
  }
  if (!/^\d+$/.test(raw)) return null;
  const pid = Number(raw);
  return Number.isSafeInteger(pid) && pid > 0 ? pid : null;
}

/**
 * Liveness probe using signal 0 — a platform-portable way to ask "is this
 * PID still a running process I can signal?". `EPERM` means the process
 * exists but belongs to someone else, so it still counts as alive.
 */
function isAlive(pid: number): boolean {
  try {
    process.kill(pid, 0);
    return true;
  } catch (err: unknown) {
    return (
      typeof err === 'object' &&
      err !== null &&
      (err as { code?: unknown }).code === 'EPERM'
    );
  }
}

/**
 * Best-effort update of the worker's meta file: sets `stopped` and, when
 * known, `exitCode`. A missing file is skipped; any other failure is logged
 * and otherwise ignored so that it never fails the stop request itself.
 */
function markStopped(metaPath: string, exitCode: number | null): void {
  if (!existsSync(metaPath)) return;
  try {
    const raw = readFileSync(metaPath, 'utf-8');
    const meta = JSON.parse(raw) as WorkerMeta;
    meta.stopped = true;
    if (exitCode !== null) meta.exitCode = exitCode;
    writeFileSync(metaPath, JSON.stringify(meta, null, 2), 'utf-8');
  } catch (err: unknown) {
    console.warn('[Klonode] could not update worker meta', metaPath, err);
  }
}

// diff --git a/packages/ui/src/routes/api/worker/[id]/tail/+server.ts b/packages/ui/src/routes/api/worker/[id]/tail/+server.ts
// new file mode 100644
// index 0000000..7d1b1cf
// --- /dev/null
// +++ b/packages/ui/src/routes/api/worker/[id]/tail/+server.ts
// @@ -0,0 +1,284 @@

/**
 * GET /api/worker/[id]/tail?since=<offset>&repoPath=<path>
 *
 * Server-sent-events stream that tails the log file a detached Claude worker
 * is writing to. Starts at the caller-supplied byte offset so a reconnecting
 * browser doesn't replay events it has already rendered.
 *
 * The tail loop:
 *   1. `open` the log file read-only (separate fd from the child's write fd).
 *   2. Seek to `since`, read what's there, parse any complete `stream-json`
 *      lines, emit mapped SSE events with the byte offset advancing.
 *   3. Use `fs.watch` to get notified on subsequent appends; re-read, re-emit.
 *   4. When the worker's PID stops existing AND the file has no more unread
 *      data, emit `{ type: 'done', exitCode: null }` and close the stream.
 *   5. If the SSE connection itself is closed by the browser, `fs.watch` is
 *      unregistered but the worker keeps running — the browser can reconnect
 *      later from the new offset.
 *
 * See #73.
 */
/** Poll interval for the liveness probe when fs.watch doesn't fire. */
const POLL_MS = 500;

/** Byte value of '\n'; log records are newline-delimited. */
const LF = 0x0a;

/**
 * Map one line of the worker log to zero or more SSE events.
 *
 * Header lines written by the spawn endpoint (`#…`, `---`) and blank lines
 * produce nothing. Anything that is not a JSON object is surfaced as a
 * `stderr` event (first 200 chars) so the UI can at least show it.
 *
 * @param line       The log line without its trailing newline.
 * @param lineOffset Byte offset stamped on every event produced for the line.
 */
function translateLine(line: string, lineOffset: number): WorkerStreamEvent[] {
  if (!line.trim()) return [];
  if (line.startsWith('#') || line.startsWith('---')) return [];

  // eslint-disable-next-line @typescript-eslint/no-explicit-any -- stream-json records are untyped external input
  let parsed: any;
  try {
    parsed = JSON.parse(line);
  } catch {
    parsed = undefined;
  }
  // `JSON.parse('null')` succeeds, so check the shape and not just the parse:
  // property access on null would otherwise throw inside a timer callback.
  if (parsed === null || typeof parsed !== 'object') {
    return [{ type: 'stderr', offset: lineOffset, text: line.slice(0, 200) }];
  }

  const events: WorkerStreamEvent[] = [];

  // Session init
  if (parsed.type === 'system' && parsed.subtype === 'init' && parsed.session_id) {
    events.push({ type: 'session', offset: lineOffset, sessionId: parsed.session_id });
  }

  // Assistant content — tool_use and text blocks
  if (parsed.type === 'assistant' && Array.isArray(parsed.message?.content)) {
    for (const block of parsed.message.content) {
      if (block?.type === 'tool_use') {
        events.push({
          type: 'tool',
          offset: lineOffset,
          tool: block.name,
          input: summarizeToolInput(block.name, block.input),
        });
      } else if (block?.type === 'text' && block.text) {
        events.push({ type: 'text', offset: lineOffset, text: block.text });
      }
    }
  }

  // Final result — `result` is either a plain string or a list of content blocks.
  if (parsed.type === 'result') {
    let text = '';
    if (typeof parsed.result === 'string') {
      text = parsed.result;
    } else if (Array.isArray(parsed.result)) {
      text = parsed.result
        // eslint-disable-next-line @typescript-eslint/no-explicit-any
        .filter((b: any) => b?.type === 'text')
        // eslint-disable-next-line @typescript-eslint/no-explicit-any
        .map((b: any) => b.text)
        .join('\n');
    }
    events.push({
      type: 'result',
      offset: lineOffset,
      text,
      usage: parsed.usage,
      costUsd: parsed.total_cost_usd,
      numTurns: parsed.num_turns,
      subtype: parsed.subtype,
    });
  }

  return events;
}

/**
 * Liveness probe: read the PID file and send signal 0 to that PID.
 * Any failure (missing file, garbage content, `process.kill` throwing)
 * counts as "not alive".
 */
function isWorkerAlive(pidPath: string): boolean {
  try {
    const pid = Number.parseInt(readFileSync(pidPath, 'utf-8').trim(), 10);
    if (!Number.isInteger(pid) || pid <= 0) return false;
    // Signal 0 performs the existence check without delivering a signal.
    process.kill(pid, 0);
    return true;
  } catch {
    return false;
  }
}

/** Read `exitCode` from the worker's meta file; `null` when absent or unreadable. */
function readExitCode(metaPath: string): number | null {
  try {
    const meta: unknown = JSON.parse(readFileSync(metaPath, 'utf-8'));
    if (meta !== null && typeof meta === 'object') {
      const code = (meta as Record<string, unknown>).exitCode;
      if (typeof code === 'number') return code;
    }
  } catch {
    /* meta missing or corrupt — exit code unknown */
  }
  return null;
}

/**
 * GET /api/worker/[id]/tail?since=&repoPath=
 *
 * Server-sent-events stream that tails a detached worker's log file,
 * starting at the caller-supplied byte offset (`since`, default 0).
 *
 * Offsets are tracked at line granularity: the cursor only advances past
 * complete (newline-terminated) lines, and each event carries the byte
 * offset immediately after the line that produced it. A client that persists
 * the last offset it saw and reconnects with `since=<offset>` therefore
 * neither replays nor loses a line, even if the connection dropped while a
 * line was half-written. Lines are decoded only once complete, so multi-byte
 * UTF-8 sequences are never split across reads.
 *
 * When the worker's PID no longer exists, the remaining bytes (including an
 * unterminated final line) are flushed, a `done` event is emitted with the
 * exit code from the meta file (or `null`), and the stream closes. Closing
 * the SSE connection stops the tail but never touches the worker.
 *
 * Responds 400 when the id is missing and 404 when no log file exists.
 */
export const GET: RequestHandler = async ({ params, url, request }) => {
  const jsonError = (status: number, payload: Record<string, unknown>): Response =>
    new Response(JSON.stringify(payload), {
      status,
      headers: { 'Content-Type': 'application/json' },
    });

  const workerId = params.id;
  if (!workerId) {
    return jsonError(400, { error: 'missing worker id' });
  }

  const repoRoot = resolveRepoRoot(url.searchParams.get('repoPath'));
  const logPath = workerLogPath(repoRoot, workerId);
  const pidPath = workerPidPath(repoRoot, workerId);
  const metaPath = workerMetaPath(repoRoot, workerId);

  if (!existsSync(logPath)) {
    return jsonError(404, {
      error: 'no log file for this worker id — has it been spawned?',
      logPath,
      repoPath: repoRoot,
    });
  }

  // Clamp to a non-negative integer: a negative `since` would otherwise be
  // passed to readSync as a file position and inflate the read size.
  const requestedOffset = Number.parseInt(url.searchParams.get('since') || '0', 10);
  const startOffset = Number.isFinite(requestedOffset) && requestedOffset > 0 ? requestedOffset : 0;

  // Set by start() so cancel() can release the watcher and poll timer.
  let teardown: (() => void) | null = null;

  const stream = new ReadableStream<Uint8Array>({
    start(controller) {
      const encoder = new TextEncoder();
      // Byte offset of the first byte not yet emitted as part of a full line.
      let offset = startOffset;
      let closed = false;
      let watcher: ReturnType<typeof watch> | null = null;
      let pollTimer: ReturnType<typeof setInterval> | null = null;

      function send(ev: WorkerStreamEvent): void {
        if (closed) return;
        try {
          controller.enqueue(
            encoder.encode(`event: ${ev.type}\ndata: ${JSON.stringify(ev)}\n\n`),
          );
        } catch {
          // Controller already closed/errored — stop emitting.
          closed = true;
        }
      }

      function stopWatching(): void {
        if (watcher) {
          try { watcher.close(); } catch { /* ignore */ }
          watcher = null;
        }
      }

      function close(): void {
        if (closed) return;
        closed = true;
        stopWatching();
        if (pollTimer) {
          clearInterval(pollTimer);
          pollTimer = null;
        }
        try { controller.close(); } catch { /* ignore */ }
      }
      teardown = close;

      // If the browser disconnects, stop tailing but leave the worker alive.
      if (request.signal.aborted) {
        close();
        return;
      }
      request.signal.addEventListener('abort', close, { once: true });

      /**
       * Read everything between `offset` and EOF and emit events for each
       * complete line. An unterminated trailing line is left unread (and
       * re-read on the next call) unless `flushPartial` is set.
       */
      function drainLog(flushPartial = false): void {
        if (closed) return;
        let fd: number;
        try {
          fd = openSync(logPath, 'r');
        } catch {
          return;
        }
        try {
          const size = fstatSync(fd).size;
          if (size <= offset) return; // nothing new
          const chunk = Buffer.alloc(size - offset);
          // Trust the byte count actually read, not the size we asked for.
          const bytesRead = readSync(fd, chunk, 0, chunk.length, offset);
          const base = offset;
          let cursor = 0;
          while (cursor < bytesRead) {
            const newlineAt = chunk.indexOf(LF, cursor);
            const hasNewline = newlineAt !== -1 && newlineAt < bytesRead;
            if (!hasNewline && !flushPartial) break;
            const lineEnd = hasNewline ? newlineAt : bytesRead;
            const next = hasNewline ? newlineAt + 1 : bytesRead;
            const line = chunk.toString('utf-8', cursor, lineEnd);
            // Advance before emitting so the event carries the resume point
            // just past this line.
            offset = base + next;
            for (const ev of translateLine(line, offset)) {
              send(ev);
            }
            cursor = next;
          }
        } catch {
          // Transient read failure — the next poll retries from `offset`.
        } finally {
          try { closeSync(fd); } catch { /* ignore */ }
        }
      }

      function checkDoneOrKeepGoing(): void {
        if (closed) return;
        drainLog();
        if (isWorkerAlive(pidPath)) return;
        // Child has exited. Drain once more to catch anything written between
        // the read above and the exit, flushing an unterminated last line.
        drainLog(true);
        send({ type: 'done', offset, exitCode: readExitCode(metaPath) });
        close();
      }

      // Initial drain — emits anything already in the log since `since`, and
      // finishes immediately if the worker is already gone.
      checkDoneOrKeepGoing();
      if (closed) return;

      // `fs.watch` is the fast path; the poll interval is the fallback for
      // filesystems where watch events are unreliable, and also drives the
      // liveness check.
      try {
        watcher = watch(logPath, { persistent: false }, () => {
          drainLog();
        });
        // Without a listener an FSWatcher 'error' would be thrown as an
        // uncaught exception; fall back to polling instead.
        watcher.on('error', stopWatching);
      } catch {
        // fs.watch not supported — rely on polling only.
      }
      pollTimer = setInterval(checkDoneOrKeepGoing, POLL_MS);
    },

    cancel() {
      // Consumer cancelled the stream: release the watcher and timer even if
      // the request's AbortSignal never fires.
      teardown?.();
    },
  });

  return new Response(stream, {
    headers: {
      'Content-Type': 'text/event-stream',
      'Cache-Control': 'no-cache',
      Connection: 'keep-alive',
    },
  });
};
/**
 * Render a tool invocation's input as a one-line summary of at most 200
 * characters for display in the UI.
 *
 * - `Read` / `Edit` / `Write`: the `file_path`.
 * - `Bash`: the `command`.
 * - `Glob`: the `pattern`.
 * - `Grep`: the `pattern`, followed by ` in <path>` when a path is given.
 * - anything else: the JSON serialization of the input.
 *
 * @param name  Tool name as reported in the `tool_use` block.
 * @param input Raw tool input; anything that is not a non-null object
 *              summarizes to the empty string.
 */
function summarizeToolInput(name: string, input: unknown): string {
  const MAX_LEN = 200;
  const clip = (text: string): string => text.slice(0, MAX_LEN);

  if (typeof input !== 'object' || !input) {
    return '';
  }
  const fields = input as Record<string, unknown>;

  switch (name) {
    case 'Read':
    case 'Edit':
    case 'Write':
      return clip(String(fields.file_path ?? ''));
    case 'Bash':
      return clip(String(fields.command ?? ''));
    case 'Glob':
      return clip(String(fields.pattern ?? ''));
    case 'Grep': {
      const where = fields.path ? ` in ${fields.path}` : '';
      return clip(`${fields.pattern ?? ''}${where}`);
    }
    default:
      return clip(JSON.stringify(fields));
  }
}
/** JSON body accepted by `POST /api/worker/spawn`. */
interface SpawnRequest {
  /**
   * Path of the Claude CLI executable to launch. The handler falls back to
   * `claude` when this is empty.
   */
  cliPath: string;
  /**
   * Project the worker operates on. Resolved through `resolveRepoRoot`; the
   * resolved root becomes the worker's working directory and the parent of
   * its `.klonode/workers/` state files.
   */
  repoPath: string;
  /** Prompt text; written to a file and fed to the CLI on stdin. */
  prompt: string;
  /** Value passed to `--max-turns`. 500 when omitted. */
  maxTurns?: number;
  /**
   * Tool names passed (comma-joined) to `--allowedTools`. When omitted:
   * Read, Write, Edit, Bash, Glob, Grep.
   */
  allowedTools?: Array<string>;
  /** When set, forwarded as `--model <model>` (e.g. for CO runs). */
  model?: string;
}
/**
 * Quote a value for interpolation into a `bash -c` command string.
 * Wraps it in single quotes (inside which bash performs no expansion) and
 * rewrites each embedded `'` as `'\''`.
 */
function shellQuote(value: string): string {
  return `'${String(value).replace(/'/g, `'\\''`)}'`;
}

/**
 * Build the environment for the spawned CLI: a copy of this process's env
 * with the parent's Claude Code variables removed, the OAuth token restored
 * from `~/.claude/klonode-oauth-token` when not already set, and `HOME`
 * filled from `USERPROFILE` when missing.
 */
function buildWorkerEnv(): NodeJS.ProcessEnv {
  const env: NodeJS.ProcessEnv = { ...process.env };
  // Variables that would make the spawned instance think it is running
  // inside the parent's Claude Code session.
  delete env.CLAUDECODE;
  delete env.CLAUDE_CODE_ENTRYPOINT;
  delete env.CLAUDE_AGENT_SDK_VERSION;
  delete env.CLAUDE_CODE_DISABLE_CRON;
  delete env.CLAUDE_CODE_EMIT_TOOL_USE_SUMMARIES;
  delete env.CLAUDE_CODE_ENABLE_ASK_USER_QUESTION_TOOL;
  delete env.CLAUDE_CODE_PROVIDER_MANAGED_BY_HOST;
  delete env.DEFAULT_LLM_MODEL;

  // Without a token the spawned Claude reports "Not logged in".
  if (!env.CLAUDE_CODE_OAUTH_TOKEN) {
    try {
      const tokenPath = join(
        process.env.HOME || process.env.USERPROFILE || '',
        '.claude',
        'klonode-oauth-token',
      );
      if (existsSync(tokenPath)) {
        env.CLAUDE_CODE_OAUTH_TOKEN = readFileSync(tokenPath, 'utf-8').trim();
      }
    } catch { /* ignore — the worker will report the auth failure itself */ }
  }
  if (!env.HOME && env.USERPROFILE) {
    env.HOME = env.USERPROFILE;
  }
  return env;
}

/**
 * POST /api/worker/spawn
 *
 * Launches the Claude CLI through bash as a detached, unref'd child whose
 * stdout and stderr are appended to the worker's log file, then persists the
 * PID and a metadata record so the `/stop` and `/tail` endpoints can find it.
 * The prompt is written to `<workerId>.prompt.txt` and redirected to stdin.
 *
 * Every value interpolated into the bash command line is single-quoted via
 * `shellQuote`: `cliPath`, `model`, `allowedTools` and `maxTurns` come from
 * the request body and must not be interpreted by the shell.
 *
 * Responses:
 * - 200 `{ workerId, repoPath, logPath }` on success.
 * - 400 when the body is not valid JSON or `prompt` is not a string.
 * - 500 when the child could not be spawned.
 */
export const POST: RequestHandler = async ({ request }) => {
  let body: SpawnRequest;
  try {
    body = await request.json();
  } catch {
    return json({ error: 'request body must be valid JSON' }, { status: 400 });
  }
  if (!body || typeof body.prompt !== 'string') {
    return json({ error: 'prompt must be a string' }, { status: 400 });
  }

  const cliPath = body.cliPath || 'claude';
  const repoRoot = resolveRepoRoot(body.repoPath);
  const workerId = newWorkerId();

  ensureWorkersDir(repoRoot);
  const logPath = workerLogPath(repoRoot, workerId);
  const pidPath = workerPidPath(repoRoot, workerId);
  const metaPath = workerMetaPath(repoRoot, workerId);

  // The prompt travels via a file + stdin redirect so its content never has
  // to be quoted for the shell.
  const promptPath = join(repoRoot, '.klonode', 'workers', `${workerId}.prompt.txt`);
  writeFileSync(promptPath, body.prompt, 'utf-8');

  // Build Claude CLI args.
  const maxTurns = body.maxTurns ?? 500;
  const tools = Array.isArray(body.allowedTools)
    ? body.allowedTools.map(String)
    : ['Read', 'Write', 'Edit', 'Bash', 'Glob', 'Grep'];
  const args = [
    '-p',
    '--verbose',
    '--max-turns',
    String(maxTurns),
    '--output-format',
    'stream-json',
    '--allowedTools',
    tools.join(','),
  ];
  if (body.model) {
    args.push('--model', String(body.model));
  }

  // Run through bash (Git Bash on Windows) so the stdin redirect from the
  // prompt file works the same on every OS.
  const isWindows = process.platform === 'win32';
  const shell = isWindows ? 'C:\\Program Files\\Git\\usr\\bin\\bash.exe' : '/bin/bash';
  const bashCliPath = cliPath.replace(/\\/g, '/');
  const bashPromptPath = promptPath.replace(/\\/g, '/');
  const shellCmd = `${[bashCliPath, ...args].map(shellQuote).join(' ')} < ${shellQuote(bashPromptPath)}`;

  const outFd = openSync(logPath, 'a');
  let child: ReturnType<typeof spawn>;
  try {
    // Write a small header so the log is self-documenting. The tail endpoint
    // skips lines starting with `#` or `---`.
    writeSync(
      outFd,
      `# klonode-worker ${workerId} started at ${new Date().toISOString()}\n# cli=${cliPath}\n# repo=${repoRoot}\n# args=${args.join(' ')}\n---\n`,
    );
    child = spawn(shell, ['-c', shellCmd], {
      cwd: repoRoot,
      env: buildWorkerEnv(),
      detached: true,
      stdio: ['ignore', outFd, outFd],
      windowsHide: true,
    });
  } catch {
    return json({ error: 'failed to spawn Claude CLI worker' }, { status: 500 });
  } finally {
    // The child holds its own copy of the descriptor once spawn() returns;
    // release ours on every path so a failure cannot leak it.
    try { closeSync(outFd); } catch { /* ignore */ }
  }

  // A ChildProcess 'error' with no listener is thrown as an uncaught
  // exception. Spawn failures (e.g. bash not found) are reported that way,
  // asynchronously, so record them in the log instead of crashing the server.
  child.on('error', (err) => {
    try {
      writeFileSync(logPath, `klonode-worker spawn error: ${err.message}\n`, { flag: 'a' });
    } catch { /* ignore */ }
  });

  // `unref()` tells Node not to keep the parent alive on behalf of this
  // child; together with `detached: true` the child outlives a parent exit.
  child.unref();

  if (!child.pid) {
    return json({ error: 'failed to spawn Claude CLI worker' }, { status: 500 });
  }

  // Persist PID + meta so /stop and /tail can find the worker later.
  writeFileSync(pidPath, String(child.pid), 'utf-8');
  const meta: WorkerMeta = {
    id: workerId,
    pid: child.pid,
    repoPath: repoRoot,
    cwd: repoRoot,
    cliPath,
    startedAt: new Date().toISOString(),
    prompt: body.prompt,
  };
  writeFileSync(metaPath, JSON.stringify(meta, null, 2), 'utf-8');

  const response: WorkerSpawnResponse = {
    workerId,
    repoPath: repoRoot,
    logPath,
  };
  return json(response);
};