Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,6 @@ coverage/

# Marketing drafts (private — edit/publish externally)
.marketing/

# Detached Claude worker state (see #73)
.klonode/workers/
278 changes: 191 additions & 87 deletions packages/ui/src/lib/components/ChatPanel/ChatPanel.svelte
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
coContextUsage, CO_MAX_CONTEXT, closedSessionQueue,
CONTEXT_DEPTH_LABELS,
getCliSessionId, setCliSessionId, clearCliSessionId,
getActiveWorkerId, setActiveWorkerId, clearActiveWorkerId,
getLastWorkerOffset, setLastWorkerOffset,
type ContextDepth as CtxDepth,
} from '../../stores/agents';
import { recordActivity, clearActivity } from '../../stores/activity';
Expand All @@ -18,6 +20,7 @@
defineComponentAction,
defineComponentState,
} from '../../workstation/registry';
import { spawnWorker, connectWorker, stopWorker, type WorkerConnection } from '../../workers/worker-client';

// Register with the workstation self-introspection registry so Claude can
// read "which session is active, how many messages, is it loading" and
Expand Down Expand Up @@ -113,6 +116,8 @@
let attachments: { name: string; type: string; dataUrl: string; file: File }[] = [];
let fileInput: HTMLInputElement;
let abortController: AbortController | null = null;
/** Active detached-worker SSE connection. Closed on Stop or onDestroy. */
let activeWorkerConnection: WorkerConnection | null = null;
// Klonode session tab ID → Claude CLI session ID is now tracked in
// sessionsStore.cliSessionIds and persisted to localStorage, so reloads
// and Vite server restarts preserve conversation continuity. Use
Expand Down Expand Up @@ -163,7 +168,9 @@
return;
}

// For CLI mode: use streaming endpoint for live feedback
// For CLI mode: spawn a detached Claude worker and tail its log file.
// This survives Vite HMR / dev-server restarts because the worker is
// NOT a child of the Vite process — see #73.
if (settings.connectionMode === 'cli') {
const graph = (await import('../../stores/graph')).graphStore;
let repoPath = '';
Expand All @@ -178,89 +185,78 @@
messages: [...s.messages, userMsg, loadingMsg],
}));

// Build the same system prompt the legacy /api/chat/stream endpoint
// used. The worker just spawns Claude — it doesn't inject a prompt
// on its own, so we do it here and pipe it through the `prompt`
// request field.
const systemPrompt = isCO
? `You are an experienced developer with full access to all tools. Work directly in the project directory.\n\nAnswer in Norwegian unless the user writes in English. Write all code and CONTEXT.md files in English.`
: `Erfaren utvikler. Svar pa norsk med mindre brukeren skriver pa engelsk.`;
const fullPrompt = `${systemPrompt}\n\nBrukerens sporsmaal: ${userMessage}`;

let resultText = '';
let resultTokens = { input: 0, output: 0, total: 0, costUsd: 0, numTurns: 0 };
const klonodeSessionId = $sessionsStore.activeSessionId;

try {
abortController = new AbortController();
const res = await fetch('/api/chat/stream', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
signal: abortController.signal,
body: JSON.stringify({
message: userMessage,
context: '',
cliPath: settings.cliPath,
mode: 'with-klonode',
repoPath,
executionMode: isCO ? 'bypass' : (settings.executionMode === 'auto' ? 'bypass' : settings.executionMode),
isCO,
sessionId: getCliSessionId($sessionsStore.activeSessionId),
}),
const { workerId } = await spawnWorker({
cliPath: settings.cliPath,
repoPath,
prompt: fullPrompt,
maxTurns: isCO ? 500 : 500,
});
setActiveWorkerId(klonodeSessionId, workerId);

// Wrap the callback-based stream in a promise so the rest of the
// send flow can await a clean `done` event.
await new Promise<void>((resolve) => {
activeWorkerConnection = connectWorker(workerId, 0, repoPath, {
onSession: (sid) => {
if (sid) setCliSessionId(klonodeSessionId, sid);
},
onTool: (tool, input) => {
activityLog = [...activityLog, { tool, input, time: new Date() }];
recordActivity(tool, input, repoPath);
tick().then(scrollToBottom);
},
onText: (text) => {
streamingText += text;
},
onResult: (r) => {
resultText = r.text || streamingText || '';
const u = r.usage as { input_tokens?: number; output_tokens?: number; cache_creation_input_tokens?: number; cache_read_input_tokens?: number; } | undefined;
if (u) {
resultTokens = {
input: (u.input_tokens || 0) + (u.cache_creation_input_tokens || 0) + (u.cache_read_input_tokens || 0),
output: u.output_tokens || 0,
total: 0,
costUsd: r.costUsd || 0,
numTurns: r.numTurns || 0,
};
resultTokens.total = resultTokens.input + resultTokens.output;
}
},
onStderr: (text) => {
// Surface in console for now — a future PR wires this to a
// dedicated stderr strip in the chat panel.
// eslint-disable-next-line no-console
console.debug('[worker stderr]', text);
},
onError: (message) => {
resultText = `Feil: ${message}`;
},
onOffset: (offset) => {
setLastWorkerOffset(klonodeSessionId, offset);
},
onDone: () => {
resolve();
},
});
});

const reader = res.body?.getReader();
const decoder = new TextDecoder();
let resultText = '';
let resultTokens = { input: 0, output: 0, total: 0, costUsd: 0, numTurns: 0 };

if (reader) {
let buf = '';
while (true) {
const { done, value } = await reader.read();
if (done) break;

buf += decoder.decode(value, { stream: true });
const parts = buf.split('\n\n');
buf = parts.pop() || '';

for (const part of parts) {
const eventMatch = part.match(/^event: (\w+)\ndata: (.+)$/s);
if (!eventMatch) continue;
const [, eventType, dataStr] = eventMatch;
try {
const data = JSON.parse(dataStr);
switch (eventType) {
case 'session':
// Store CLI session ID for this tab so next message resumes the conversation
if (data.sessionId) {
setCliSessionId($sessionsStore.activeSessionId, data.sessionId);
}
break;
case 'tool':
activityLog = [...activityLog, { tool: data.tool, input: data.input, time: new Date() }];
// Also push to the shared activity store so tree/graph can highlight
recordActivity(data.tool, data.input, repoPath);
await tick();
scrollToBottom();
break;
case 'text':
streamingText += data.text;
break;
case 'result':
resultText = data.text || streamingText || '';
if (data.usage) {
resultTokens = {
input: (data.usage.input_tokens || 0) + (data.usage.cache_creation_input_tokens || 0) + (data.usage.cache_read_input_tokens || 0),
output: data.usage.output_tokens || 0,
total: 0,
costUsd: data.costUsd || 0,
numTurns: data.numTurns || 0,
};
resultTokens.total = resultTokens.input + resultTokens.output;
}
break;
case 'error':
resultText = `Feil: ${data.message}`;
break;
}
} catch { /* skip bad JSON */ }
}
}
}

// If no result text but we have streaming text, use that
if (!resultText && streamingText) resultText = streamingText;
if (!resultText) resultText = 'Claude brukte alle steg. Prov et mer spesifikt sporsmal.';

abortController = null;
chatStore.update(s => ({
...s, isLoading: false,
messages: s.messages.map(m => m.id === loadingId ? {
Expand All @@ -269,6 +265,8 @@
} : m),
}));
streamingText = '';
clearActiveWorkerId(klonodeSessionId);
activeWorkerConnection = null;

// After CO finishes, refresh the graph so UI shows updated CONTEXT.md files
if (isCO) {
Expand All @@ -280,7 +278,6 @@
});
const refreshData = await refreshRes.json();
if (refreshData.updated > 0) {
// Reload the graph in the UI
const { loadGraphFromUrl } = await import('../../stores/loader');
await loadGraphFromUrl('/demo-graph.json');
console.log(`[Klonode] Graph refreshed: ${refreshData.updated} files updated`);
Expand All @@ -290,7 +287,8 @@
}
}
} catch (err) {
abortController = null;
activeWorkerConnection = null;
clearActiveWorkerId(klonodeSessionId);
chatStore.update(s => ({
...s, isLoading: false,
messages: s.messages.map(m => m.id === loadingId ? {
Expand Down Expand Up @@ -377,17 +375,37 @@ Rules:
handleSend();
}

function handleStop() {
async function handleStop() {
// Close the SSE tail first so the UI stops receiving events.
if (activeWorkerConnection) {
activeWorkerConnection.close();
activeWorkerConnection = null;
}
// Tell the server to SIGTERM the detached worker.
const klonodeSessionId = $sessionsStore.activeSessionId;
const workerId = getActiveWorkerId(klonodeSessionId);
if (workerId) {
const graph = (await import('../../stores/graph')).graphStore;
let repoPath = '';
graph.subscribe(g => { if (g) repoPath = g.repoPath; })();
try {
await stopWorker(workerId, repoPath);
} catch (e) {
console.warn('[Klonode] stopWorker failed', e);
}
clearActiveWorkerId(klonodeSessionId);
}
// Legacy AbortController path (API mode / compare mode still use fetch).
if (abortController) {
abortController.abort();
abortController = null;
chatStore.update(s => ({
...s, isLoading: false,
messages: s.messages.map(m => m.loading
? { ...m, loading: false, content: 'Stoppet av bruker.' }
: m),
}));
}
chatStore.update(s => ({
...s, isLoading: false,
messages: s.messages.map(m => m.loading
? { ...m, loading: false, content: 'Stoppet av bruker.' }
: m),
}));
}

function scrollToBottom() {
Expand Down Expand Up @@ -418,6 +436,92 @@ Rules:
if ($settingsStore.connectionMode === 'cli' && !$settingsStore.cliPath) {
await detectCli();
}

// Self-hosting resume: if this tab had an active detached worker when
// the page was unloaded (Vite HMR, reload, explicit refresh), reconnect
// to it and resume streaming from the last persisted byte offset. The
// worker has been writing to disk the whole time, so we never lose any
// tool calls or tokens. See #73.
const klonodeSessionId = $sessionsStore.activeSessionId;
const workerId = getActiveWorkerId(klonodeSessionId);
if (workerId) {
const graph = (await import('../../stores/graph')).graphStore;
let repoPath = '';
graph.subscribe(g => { if (g) repoPath = g.repoPath; })();
const since = getLastWorkerOffset(klonodeSessionId);

// Find the most recent assistant message that was flagged as
// interrupted at hydrate time — that's the bubble we should resume
// streaming into. If there isn't one, append a fresh loading bubble.
const state = get(chatStore);
let targetId = '';
for (let i = state.messages.length - 1; i >= 0; i--) {
if (state.messages[i].role === 'assistant' && state.messages[i].interrupted) {
targetId = state.messages[i].id;
break;
}
}
if (!targetId) {
targetId = Math.random().toString(36).slice(2, 10);
chatStore.update(s => ({
...s,
isLoading: true,
messages: [
...s.messages,
{ id: targetId, role: 'assistant' as const, content: '', loading: true, timestamp: new Date() },
],
}));
} else {
// Clear interrupted + flip back to loading so the UI re-shows the
// streaming indicators instead of the "interrupted" banner.
chatStore.update(s => ({
...s,
isLoading: true,
messages: s.messages.map(m => m.id === targetId
? { ...m, interrupted: false, loading: true }
: m),
}));
}

let resumedResultText = '';
const resumedTokens = { input: 0, output: 0, total: 0, costUsd: 0, numTurns: 0 };

activeWorkerConnection = connectWorker(workerId, since, repoPath, {
onSession: (sid) => { if (sid) setCliSessionId(klonodeSessionId, sid); },
onTool: (tool, input) => {
activityLog = [...activityLog, { tool, input, time: new Date() }];
recordActivity(tool, input, repoPath);
tick().then(scrollToBottom);
},
onText: (text) => { streamingText += text; },
onResult: (r) => {
resumedResultText = r.text || streamingText || '';
const u = r.usage as { input_tokens?: number; output_tokens?: number; cache_creation_input_tokens?: number; cache_read_input_tokens?: number; } | undefined;
if (u) {
resumedTokens.input = (u.input_tokens || 0) + (u.cache_creation_input_tokens || 0) + (u.cache_read_input_tokens || 0);
resumedTokens.output = u.output_tokens || 0;
resumedTokens.costUsd = r.costUsd || 0;
resumedTokens.numTurns = r.numTurns || 0;
resumedTokens.total = resumedTokens.input + resumedTokens.output;
}
},
onError: (message) => { resumedResultText = `Feil: ${message}`; },
onOffset: (offset) => setLastWorkerOffset(klonodeSessionId, offset),
onDone: () => {
const finalText = resumedResultText || streamingText || 'Respons gjenopprettet.';
chatStore.update(s => ({
...s,
isLoading: false,
messages: s.messages.map(m => m.id === targetId
? { ...m, loading: false, content: finalText, tokens: resumedTokens.total > 0 ? resumedTokens : undefined }
: m),
}));
streamingText = '';
clearActiveWorkerId(klonodeSessionId);
activeWorkerConnection = null;
},
});
}
});
</script>

Expand Down
Loading
Loading