Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 19 additions & 2 deletions src/hooks/useSSEStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -237,11 +237,12 @@ function handleSSEEvent(
export async function consumeSSEStream(
reader: ReadableStreamDefaultReader<Uint8Array>,
callbacks: SSECallbacks,
): Promise<{ accumulated: string; tokenUsage: TokenUsage | null }> {
): Promise<{ accumulated: string; tokenUsage: TokenUsage | null; receivedDone: boolean }> {
const decoder = new TextDecoder();
let buffer = '';
let accumulated = '';
let tokenUsage: TokenUsage | null = null;
let receivedDone = false;

const wrappedCallbacks: SSECallbacks = {
...callbacks,
Expand All @@ -264,14 +265,30 @@ export async function consumeSSEStream(

try {
const event: SSEEvent = JSON.parse(line.slice(6));
if (event.type === 'done') {
receivedDone = true;
}
accumulated = handleSSEEvent(event, accumulated, wrappedCallbacks);
} catch {
// skip malformed SSE lines
}
}
}

return { accumulated, tokenUsage };
// Flush any residual buffer in case the final chunk didn't end with \n
if (buffer.trim().startsWith('data: ')) {
try {
const event: SSEEvent = JSON.parse(buffer.trim().slice(6));
if (event.type === 'done') {
receivedDone = true;
}
accumulated = handleSSEEvent(event, accumulated, wrappedCallbacks);
} catch {
// skip malformed residual data
}
}

return { accumulated, tokenUsage, receivedDone };
}

/**
Expand Down
1 change: 1 addition & 0 deletions src/i18n/en.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ const en = {
'streaming.allowForSession': 'Allow for Session',
'streaming.allowed': 'Allowed',
'streaming.denied': 'Denied',
'streaming.connectionDrop': 'Connection lost — the server stream ended unexpectedly. Claude may still be running in the background. Please try sending your message again.',

// ── Chat view / session page ────────────────────────────────
'chat.newConversation': 'New Conversation',
Expand Down
1 change: 1 addition & 0 deletions src/i18n/zh.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ const zh: Record<TranslationKey, string> = {
'streaming.allowForSession': '本次会话允许',
'streaming.allowed': '已允许',
'streaming.denied': '已拒绝',
'streaming.connectionDrop': '连接中断 — 服务器流意外结束。Claude 可能仍在后台运行,请尝试重新发送消息。',

// ── Chat view / session page ────────────────────────────────
'chat.newConversation': '新对话',
Expand Down
21 changes: 21 additions & 0 deletions src/lib/claude-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,8 @@ export function streamClaude(options: ClaudeStreamOptions): ReadableStream<strin
generativeUI,
} = options;

let heartbeatTimer: ReturnType<typeof setInterval> | null = null;

return new ReadableStream<string>({
async start(controller) {
// Resolve provider via the unified resolver. The caller may pass an explicit
Expand Down Expand Up @@ -824,6 +826,22 @@ export function streamClaude(options: ClaudeStreamOptions): ReadableStream<strin
let tokenUsage: TokenUsage | null = null;
// Track pending TodoWrite tool_use_ids so we can sync after successful execution
const pendingTodoWrites = new Map<string, Array<{ content: string; status: string; activeForm?: string }>>();

// Server-side heartbeat: send keep_alive every 30s to prevent
// transport-level idle connection drops (Electron, OS TCP, proxies).
// This is independent of the SDK's own keep_alive messages.
heartbeatTimer = setInterval(() => {
try {
controller.enqueue(formatSSE({ type: 'keep_alive', data: '' }));
} catch {
// Controller may be closed — stop heartbeat
if (heartbeatTimer) {
clearInterval(heartbeatTimer);
heartbeatTimer = null;
}
}
}, 30_000);

for await (const message of conversation) {
if (abortController?.signal.aborted) {
break;
Expand Down Expand Up @@ -1059,9 +1077,11 @@ export function streamClaude(options: ClaudeStreamOptions): ReadableStream<strin
}
}

if (heartbeatTimer) { clearInterval(heartbeatTimer); heartbeatTimer = null; }
controller.enqueue(formatSSE({ type: 'done', data: '' }));
controller.close();
} catch (error) {
if (heartbeatTimer) { clearInterval(heartbeatTimer); heartbeatTimer = null; }
const rawMessage = error instanceof Error ? error.message : 'Unknown error';
// Log full error details for debugging (visible in terminal / dev tools)
const stderrContent = error instanceof Error ? (error as { stderr?: string }).stderr : undefined;
Expand Down Expand Up @@ -1124,6 +1144,7 @@ export function streamClaude(options: ClaudeStreamOptions): ReadableStream<strin
},

cancel() {
if (heartbeatTimer) { clearInterval(heartbeatTimer); heartbeatTimer = null; }
abortController?.abort();
},
});
Expand Down
36 changes: 36 additions & 0 deletions src/lib/stream-session-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,42 @@ async function runStream(stream: ActiveStream, params: StartStreamParams): Promi
},
});

// Detect premature stream end (connection drop without server 'done' event)
if (!result.receivedDone) {
cleanupTimers(stream);

const dropMsg = 'Connection lost — the server stream ended unexpectedly. Claude may still be running in the background. Please try sending your message again.';
const errContent = stream.accumulatedText.trim()
? stream.accumulatedText.trim() + `\n\n**Error:** ${dropMsg}`
: `**Error:** ${dropMsg}`;

stream.snapshot = {
...buildSnapshot(stream),
phase: 'error',
completedAt: Date.now(),
error: dropMsg,
finalMessageContent: errContent,
statusText: undefined,
pendingPermission: null,
permissionResolved: null,
};
stream.accumulatedText = '';
stream.toolUsesArray = [];
stream.toolResultsArray = [];
stream.toolOutputAccumulated = '';
emit(stream, 'completed');

// Clear stale SDK session so next message starts fresh
fetch(`/api/chat/sessions/${encodeURIComponent(stream.sessionId)}`, {
method: 'PATCH',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ sdk_session_id: '' }),
}).catch(() => {});

scheduleGC(stream);
return;
}

// Stream completed successfully — build final message content
const accumulated = result.accumulated;
const finalToolUses = stream.toolUsesArray;
Expand Down
Loading