From db19d8d60960735450ba8f2852b459cc4205fa93 Mon Sep 17 00:00:00 2001 From: Jason Tang Date: Tue, 10 Mar 2026 19:53:50 -0500 Subject: [PATCH] fix: detect silent SSE connection drops during long tool executions When Claude runs a long script, the SSE connection can be silently dropped by transport layers (OS TCP stack, Electron internals, HTTP idle timeouts). Previously this caused the stream to appear "completed" with partial content and no error message. Two-part fix: Server-side heartbeat (claude-client.ts): - Send keep_alive SSE events every 30s independently of SDK activity - Prevents intermediate layers from considering the connection idle - Timer properly cleaned up in all exit paths including cancel() Client-side detection (useSSEStream.ts + stream-session-manager.ts): - Track whether the server's 'done' SSE event was received - If reader finishes without 'done', treat as connection drop - Show "Connection lost" error and clear stale SDK session - Flush residual SSE buffer after reader signals done i18n (en.ts + zh.ts): - Add streaming.connectionDrop translation key Co-Authored-By: Claude Opus 4.6 --- src/hooks/useSSEStream.ts | 21 ++++++++++++++++-- src/i18n/en.ts | 1 + src/i18n/zh.ts | 1 + src/lib/claude-client.ts | 21 ++++++++++++++++++ src/lib/stream-session-manager.ts | 36 +++++++++++++++++++++++++++++++ 5 files changed, 78 insertions(+), 2 deletions(-) diff --git a/src/hooks/useSSEStream.ts b/src/hooks/useSSEStream.ts index 79df5882..a37c7f3f 100644 --- a/src/hooks/useSSEStream.ts +++ b/src/hooks/useSSEStream.ts @@ -237,11 +237,12 @@ function handleSSEEvent( export async function consumeSSEStream( reader: ReadableStreamDefaultReader, callbacks: SSECallbacks, -): Promise<{ accumulated: string; tokenUsage: TokenUsage | null }> { +): Promise<{ accumulated: string; tokenUsage: TokenUsage | null; receivedDone: boolean }> { const decoder = new TextDecoder(); let buffer = ''; let accumulated = ''; let tokenUsage: TokenUsage | null = null; + let receivedDone = false; const wrappedCallbacks: SSECallbacks = { ...callbacks, @@ -264,6 +265,9 @@ export async function consumeSSEStream( try { const event: SSEEvent = JSON.parse(line.slice(6)); + if (event.type === 'done') { + receivedDone = true; + } accumulated = handleSSEEvent(event, accumulated, wrappedCallbacks); } catch { // skip malformed SSE lines @@ -271,7 +275,20 @@ export async function consumeSSEStream( } } - return { accumulated, tokenUsage }; + // Flush any residual buffer in case the final chunk didn't end with \n + if (buffer.trim().startsWith('data: ')) { + try { + const event: SSEEvent = JSON.parse(buffer.trim().slice(6)); + if (event.type === 'done') { + receivedDone = true; + } + accumulated = handleSSEEvent(event, accumulated, wrappedCallbacks); + } catch { + // skip malformed residual data + } + } + + return { accumulated, tokenUsage, receivedDone }; } /** diff --git a/src/i18n/en.ts b/src/i18n/en.ts index 479d50f5..24eeda06 100644 --- a/src/i18n/en.ts +++ b/src/i18n/en.ts @@ -57,6 +57,7 @@ const en = { 'streaming.allowForSession': 'Allow for Session', 'streaming.allowed': 'Allowed', 'streaming.denied': 'Denied', + 'streaming.connectionDrop': 'Connection lost — the server stream ended unexpectedly. Claude may still be running in the background. Please try sending your message again.', // ── Chat view / session page ──────────────────────────────── 'chat.newConversation': 'New Conversation', diff --git a/src/i18n/zh.ts b/src/i18n/zh.ts index c239873d..20537069 100644 --- a/src/i18n/zh.ts +++ b/src/i18n/zh.ts @@ -54,6 +54,7 @@ const zh: Record = { 'streaming.allowForSession': '本次会话允许', 'streaming.allowed': '已允许', 'streaming.denied': '已拒绝', + 'streaming.connectionDrop': '连接中断 — 服务器流意外结束。Claude 可能仍在后台运行,请尝试重新发送消息。', // ── Chat view / session page ──────────────────────────────── 'chat.newConversation': '新对话', diff --git a/src/lib/claude-client.ts b/src/lib/claude-client.ts index 9979e91e..4d6ed193 100644 --- a/src/lib/claude-client.ts +++ b/src/lib/claude-client.ts @@ -397,6 +397,8 @@ export function streamClaude(options: ClaudeStreamOptions): ReadableStream | null = null; + return new ReadableStream({ async start(controller) { // Resolve provider via the unified resolver. The caller may pass an explicit @@ -824,6 +826,22 @@ export function streamClaude(options: ClaudeStreamOptions): ReadableStream>(); + + // Server-side heartbeat: send keep_alive every 30s to prevent + // transport-level idle connection drops (Electron, OS TCP, proxies). + // This is independent of the SDK's own keep_alive messages. + heartbeatTimer = setInterval(() => { + try { + controller.enqueue(formatSSE({ type: 'keep_alive', data: '' })); + } catch { + // Controller may be closed — stop heartbeat + if (heartbeatTimer) { + clearInterval(heartbeatTimer); + heartbeatTimer = null; + } + } + }, 30_000); + for await (const message of conversation) { if (abortController?.signal.aborted) { break; @@ -1059,9 +1077,11 @@ export function streamClaude(options: ClaudeStreamOptions): ReadableStream {}); + + scheduleGC(stream); + return; + } + // Stream completed successfully — build final message content const accumulated = result.accumulated; const finalToolUses = stream.toolUsesArray;