diff --git a/.changeset/fix-web-streaming-render-jank.md b/.changeset/fix-web-streaming-render-jank.md new file mode 100644 index 000000000..78647db3c --- /dev/null +++ b/.changeset/fix-web-streaming-render-jank.md @@ -0,0 +1,5 @@ +--- +"@moonshot-ai/kimi-code": patch +--- + +Keep the web chat responsive during long streaming replies by isolating live token text from the rest of the UI state, so it no longer stalls the main thread. diff --git a/apps/kimi-web/src/api/daemon/eventReducer.ts b/apps/kimi-web/src/api/daemon/eventReducer.ts index 23a94e002..47d9c4b47 100644 --- a/apps/kimi-web/src/api/daemon/eventReducer.ts +++ b/apps/kimi-web/src/api/daemon/eventReducer.ts @@ -84,7 +84,13 @@ export function createInitialState(): KimiClientState { function cloneState(s: KimiClientState): KimiClientState { return { ...s, - sessions: [...s.sessions], + // Reuse the `sessions` array reference when an event does not touch it. + // Every session-mutating case below already builds its own array via + // `[...]` / `.map` / `.filter`, so sharing the reference is safe — and it + // keeps `rawState.sessions` stable for events that don't change sessions, + // so the sidebar computeds (sessionsForView / workspaceGroups / + // mergedWorkspaces) are not dirtied by unrelated events. + sessions: s.sessions, messagesBySession: { ...s.messagesBySession }, approvalsBySession: { ...s.approvalsBySession }, planReviewByToolCallId: { ...s.planReviewByToolCallId }, diff --git a/apps/kimi-web/src/components/chat/ChatPane.vue b/apps/kimi-web/src/components/chat/ChatPane.vue index 5217731cc..52a3997e5 100644 --- a/apps/kimi-web/src/components/chat/ChatPane.vue +++ b/apps/kimi-web/src/components/chat/ChatPane.vue @@ -5,6 +5,7 @@ import { useI18n } from 'vue-i18n'; import type { ChatTurn, ApprovalBlock, FilePreviewRequest, ToolMedia } from '../../types'; import ToolCall from './ToolCall.vue'; import Markdown from './Markdown.vue'; +import StreamingBlocks from './StreamingBlocks.vue'; import ThinkingBlock from './ThinkingBlock.vue'; import ActivityNotice from './ActivityNotice.vue'; import AgentCard from './AgentCard.vue'; @@ -23,6 +24,7 @@ import { turnFinalText, turnToMarkdown, } from '../chatTurnRendering'; +import { streamingBySession, type StreamingBlock } from '../../composables/client/streamingStore'; const { t } = useI18n(); @@ -44,6 +46,12 @@ onUnmounted(() => { const props = withDefaults( defineProps<{ turns: ChatTurn[]; + /** + * The session these turns belong to. Used by the streaming renderer to look + * up the live text in the streaming store. Optional so SideChatPanel (which + * renders a subagent transcript, not a streaming session) can omit it. + */ + sessionId?: string; approvals?: { approvalId: string; block: ApprovalBlock; agentName?: string }[]; /** * Bubble chat layout: render each turn as a chat bubble (user = right-aligned @@ -198,7 +206,7 @@ const emit = defineEmits<{ openMedia: [media: ToolMedia]; copyConversationCopied: []; /** Show a thinking block's full text in the right-side panel. */ - openThinking: [target: { turnId: string; blockIndex: number }]; + openThinking: [target: { turnId: string; blockIndex: number; live?: boolean }]; /** Show a compaction divider's summary text in the right-side panel. */ openCompaction: [target: { turnId: string }]; /** Show a subagent's full detail in the right-side panel. */ @@ -293,14 +301,37 @@ function confirmEditMessage(turn: ChatTurn): void { const copiedConversation = ref(false); let copiedConversationTimer: ReturnType | null = null; +/** Live text/thinking blocks for the turn currently streaming, if any. */ +function liveBlocksForStreaming(): StreamingBlock[] { + if (!props.running || !props.sessionId) return []; + return streamingBySession[props.sessionId]?.blocks ?? []; +} + +/** + * Merge the still-streaming live blocks into a turn for serialization (copy). + * Live text/thinking is not in `turn.blocks` during streaming (deltas bypass + * messagesBySession), so without this a copy mid-stream would drop the tail. + */ +function withLiveBlocks(turn: ChatTurn, liveBlocks: StreamingBlock[]): ChatTurn { + if (liveBlocks.length === 0) return turn; + const blocks = turn.blocks ? [...turn.blocks] : turnBlocks(turn); + for (const blk of liveBlocks) { + if (blk.kind === 'text' && blk.text) blocks.push({ kind: 'text', text: blk.text }); + else if (blk.kind === 'thinking' && blk.text) blocks.push({ kind: 'thinking', thinking: blk.text }); + } + return { ...turn, blocks }; +} + /** Convert the entire conversation to Markdown and copy to clipboard. */ function copyConversation(): void { if (props.turns.length === 0) return; + const liveBlocks = liveBlocksForStreaming(); const lines: string[] = []; for (const turn of props.turns) { if (turn.role === 'compaction') continue; // dividers don't copy + const t = turn.id === streamingTurnId.value ? withLiveBlocks(turn, liveBlocks) : turn; const roleLabel = turn.role === 'user' ? 'User' : 'Assistant'; - const content = turnToMarkdown(turn); + const content = turnToMarkdown(t); if (content.trim()) { lines.push(`**${roleLabel}**\n\n${content}`); } @@ -329,8 +360,9 @@ function assistantRunEndingAt(index: number): ChatTurn[] { } function assistantRunFinalText(index: number): string { + const liveBlocks = liveBlocksForStreaming(); return assistantRunEndingAt(index) - .map((t) => turnFinalText(t)) + .map((t) => turnFinalText(t.id === streamingTurnId.value ? withLiveBlocks(t, liveBlocks) : t)) .filter(Boolean) .join('\n\n'); } @@ -537,6 +569,14 @@ function isStreamingRenderBlock(turn: ChatTurn, block: { sourceIndex: number }): +
{{ formatDuration(turn.durationMs) }}
diff --git a/apps/kimi-web/src/components/chat/ConversationPane.vue b/apps/kimi-web/src/components/chat/ConversationPane.vue index ad96e6266..aa251bb3c 100644 --- a/apps/kimi-web/src/components/chat/ConversationPane.vue +++ b/apps/kimi-web/src/components/chat/ConversationPane.vue @@ -105,7 +105,7 @@ const emit = defineEmits<{ selectModel: [modelId: string]; openFile: [target: FilePreviewRequest]; openMedia: [media: ToolMedia]; - openThinking: [target: { turnId: string; blockIndex: number }]; + openThinking: [target: { turnId: string; blockIndex: number; live?: boolean }]; openCompaction: [target: { turnId: string }]; openAgent: [target: { turnId: string; blockIndex: number; memberId: string }]; openToolDiff: [id: string]; @@ -1008,6 +1008,7 @@ defineExpose({ loadComposerForEdit }); ref="chatPaneRef" :key="fileReloadKey ?? 'no-session'" :turns="turns" + :session-id="sessionId" :approvals="approvals" :bubble="bubble" :mobile="mobile" diff --git a/apps/kimi-web/src/components/chat/Markdown.vue b/apps/kimi-web/src/components/chat/Markdown.vue index dcafb4790..c55ec2d17 100644 --- a/apps/kimi-web/src/components/chat/Markdown.vue +++ b/apps/kimi-web/src/components/chat/Markdown.vue @@ -70,12 +70,15 @@ const renderPlan = computed(() => { // Code blocks follow the app colour scheme (shiki re-renders on flip). const isDark = useIsDark(); -// markstream's chat mode can batch nodes and defer offscreen nodes. Batching is -// safe for settled history, but viewport deferral can leave individual code -// blocks blank in our internal chat scroller when visibility events are missed -// during a session/theme switch. Keep batching for history, but always mount the -// actual nodes so every code block has at least its plain fallback immediately. -const allowBatchRender = computed(() => !props.streaming); +// markstream's chat mode batches node mounting across frames (frame-budget +// scheduling) and can defer offscreen nodes. Viewport deferral can leave +// individual code blocks blank in our internal chat scroller when visibility +// events are missed, so it stays disabled below (`deferNodesUntilVisible: +// false`). Batching itself only spreads mounting by a frame or two and is +// exactly the scenario streaming needs, so it stays on for both live and +// settled content (the `loading: false` code-block prop already removes the +// skeleton, so a not-yet-mounted block simply appears a frame later). +const allowBatchRender = computed(() => true); // --------------------------------------------------------------------------- // Local image resolution — rewrite the SOURCE TEXT before markstream sees it. diff --git a/apps/kimi-web/src/components/chat/StreamingBlocks.vue b/apps/kimi-web/src/components/chat/StreamingBlocks.vue new file mode 100644 index 000000000..ea002478d --- /dev/null +++ b/apps/kimi-web/src/components/chat/StreamingBlocks.vue @@ -0,0 +1,51 @@ + + + + + diff --git a/apps/kimi-web/src/composables/client/streamingStore.ts b/apps/kimi-web/src/composables/client/streamingStore.ts new file mode 100644 index 000000000..73f3a80ed --- /dev/null +++ b/apps/kimi-web/src/composables/client/streamingStore.ts @@ -0,0 +1,71 @@ +// apps/kimi-web/src/composables/client/streamingStore.ts +// +// Fine-grained streaming-text store, kept OUTSIDE `rawState` on purpose. +// +// `assistantDelta` is the only genuinely high-frequency event (dozens to +// hundreds per second). Routing it through the immutable reducer + the coarse +// `rawState` graph makes every delta re-render the whole App and recompute the +// sidebar computeds (see the main-thread-jank investigation). Instead, deltas +// append here and only the single `StreamingBlocks` component subscribed to a +// session re-renders. +// +// Lifecycle: deltas append; `messageUpdated` (authoritative full content) and +// turn-end (`sessionStatusChanged` idle/aborted) clear the entry so the +// committed content in `messagesBySession` takes over without duplication. + +import { reactive } from 'vue'; + +export interface StreamingBlock { + contentIndex: number; + kind: 'text' | 'thinking'; + text: string; +} + +export interface StreamingState { + /** id of the assistant message currently being streamed. */ + messageId: string; + /** Ordered live text/thinking blocks (always trailing in the message). */ + blocks: StreamingBlock[]; +} + +/** + * Per-session live streaming state. A session has at most one in-flight + * assistant message (its trailing one), so a single entry per session suffices. + */ +export const streamingBySession = reactive>({}); + +/** + * Append one `assistantDelta` to the streaming store. O(1): either mutates the + * trailing block's text in place (same contentIndex) or pushes a new block + * (new contentIndex, rare). Never touches `rawState`, so no heavy computed + * (`turns`, sidebar) is dirtied. + */ +export function appendStreamingDelta( + sessionId: string, + messageId: string, + contentIndex: number, + delta: { text?: string; thinking?: string }, +): void { + let state = streamingBySession[sessionId]; + // A new assistant message (new step, or text resuming after a tool) starts a + // fresh entry — the previous message is already committed via messageUpdated. + if (!state || state.messageId !== messageId) { + state = streamingBySession[sessionId] = { messageId, blocks: [] }; + } + + const kind: 'text' | 'thinking' = delta.text !== undefined ? 'text' : 'thinking'; + const chunk = delta.text ?? delta.thinking ?? ''; + if (chunk.length === 0) return; + + const last = state.blocks.at(-1); + if (last && last.contentIndex === contentIndex && last.kind === kind) { + last.text += chunk; + } else { + state.blocks.push({ contentIndex, kind, text: chunk }); + } +} + +/** Drop the live entry for a session (commit or turn end). */ +export function clearStreaming(sessionId: string): void { + delete streamingBySession[sessionId]; +} diff --git a/apps/kimi-web/src/composables/useDetailPanel.ts b/apps/kimi-web/src/composables/useDetailPanel.ts index e92880be5..7ac4909f7 100644 --- a/apps/kimi-web/src/composables/useDetailPanel.ts +++ b/apps/kimi-web/src/composables/useDetailPanel.ts @@ -8,6 +8,7 @@ import type { useKimiWebClient } from './useKimiWebClient'; import { buildEditDiffLines, extractEditPath, findToolCallById } from '../lib/toolDiff'; import { toolLabel } from '../lib/toolMeta'; import { clampPanelWidth, panelMaxWidth, useViewportWidth } from './useViewportWidth'; +import { streamingBySession } from './client/streamingStore'; type KimiWebClient = ReturnType; @@ -64,11 +65,32 @@ export function useDetailPanel({ // --------------------------------------------------------------------------- // Thinking panel // --------------------------------------------------------------------------- - const thinkingTarget = ref<{ turnId: string; blockIndex: number } | null>(null); + const thinkingTarget = ref<{ turnId: string; blockIndex: number; live?: boolean } | null>(null); const thinkingPanelText = computed(() => { const target = thinkingTarget.value; if (!target) return null; + // A live (still-streaming) thinking block is not in `client.turns` — its + // text lives in the streaming store. Read it there so the panel shows the + // growing text while the reply is still streaming (reactive: updates on + // each delta). + if (target.live) { + const sid = client.activeSessionId.value; + const live = streamingBySession[sid]?.blocks.find( + (b) => b.kind === 'thinking' && b.contentIndex === target.blockIndex, + ); + if (live?.text) return live.text; + // The store is cleared at every `messageUpdated` (tool slot / step end / + // turn end) so the committed content takes over in the chat. The last + // deltas and that clear land in the same tick and coalesce, so without a + // fallback the panel would close *before* rendering the final chunk. + // Fall back to the committed thinking block in the turn — it already + // holds the full text — so the panel keeps showing the complete content + // through the boundary instead of flickering closed. + const turn = client.turns.value.find((tn) => tn.id === target.turnId); + const committed = turn?.blocks?.find((b) => b.kind === 'thinking'); + return committed?.kind === 'thinking' ? committed.thinking : null; + } const turn = client.turns.value.find((tn) => tn.id === target.turnId); const blk = turn?.blocks?.[target.blockIndex]; return blk?.kind === 'thinking' ? blk.thinking : null; @@ -76,9 +98,14 @@ export function useDetailPanel({ const thinkingVisible = computed(() => thinkingPanelText.value !== null); - function openThinkingPanel(target: { turnId: string; blockIndex: number }): void { + function openThinkingPanel(target: { turnId: string; blockIndex: number; live?: boolean }): void { const current = thinkingTarget.value; - if (current && current.turnId === target.turnId && current.blockIndex === target.blockIndex) { + if ( + current && + current.turnId === target.turnId && + current.blockIndex === target.blockIndex && + current.live === target.live + ) { thinkingTarget.value = null; if (detailTarget.value === 'thinking') detailTarget.value = null; return; diff --git a/apps/kimi-web/src/composables/useKimiWebClient.ts b/apps/kimi-web/src/composables/useKimiWebClient.ts index 738696064..8b450938b 100644 --- a/apps/kimi-web/src/composables/useKimiWebClient.ts +++ b/apps/kimi-web/src/composables/useKimiWebClient.ts @@ -19,6 +19,7 @@ import { STORAGE_KEYS, } from '../lib/storage'; import { createEventBatcher, isRenderEvent } from './client/eventBatcher'; +import { appendStreamingDelta, clearStreaming } from './client/streamingStore'; import { useAppearance } from './client/useAppearance'; import { useNotification } from './client/useNotification'; import { useTaskPoller } from './client/useTaskPoller'; @@ -486,6 +487,7 @@ function forgetSession(sessionId: string): void { // That would make hasLoadedMessages() treat the stale empty cache as // authoritative and skip the next snapshot fetch for this id. enqueueEvent.flush(); + clearStreaming(sessionId); removeSession(sessionId); removeSessionMessages(sessionId); delete rawState.approvalsBySession[sessionId]; @@ -639,8 +641,28 @@ function nextOptimisticMsgId(): string { // past the queue check and clobber promptIdBySession (breaking abort). const inFlightPromptSessions = new Set(); +// Mirror of the reducer's advanceSeq, for the one event (assistantDelta) that +// bypasses the reducer. lastSeqBySession is a resync cursor with no rendering +// dependencies, so mutating it in place is both safe and cheap. +function advanceSeqCursor(sessionId: string | undefined, seq: number | undefined): void { + if (sessionId !== undefined && seq !== undefined && seq > 0) { + const prev = rawState.lastSeqBySession[sessionId] ?? 0; + if (seq > prev) rawState.lastSeqBySession[sessionId] = seq; + } +} + // Helper: mutate rawState by applying a reducer on a snapshot then re-assigning fields function applyEvent(event: ReturnType, sessionId: string, seq: number): void { + // Streaming text/thinking deltas bypass the reducer entirely. Appending to the + // fine-grained streaming store is O(1) and dirties only the single + // StreamingBlocks component — instead of cloning all of `rawState` and + // re-rendering the whole App + sidebar on every token. + if (event.type === 'assistantDelta') { + appendStreamingDelta(sessionId, event.messageId, event.contentIndex, event.delta); + advanceSeqCursor(sessionId, seq); + return; + } + const snapshot: KimiClientState = { sessions: rawState.sessions, activeSessionId: rawState.activeSessionId, @@ -670,6 +692,27 @@ function applyEvent(event: ReturnType, sessionId: string, seq rawState.config = next.config ?? null; rawState.warnings = next.warnings; + // `messageUpdated` carries the authoritative full content of a message (tool + // slot / step end / turn end): drop the live streaming entry so the just- + // committed content takes over without rendering the same text twice. + if (event.type === 'messageUpdated') { + clearStreaming(sessionId); + } + // Turn end: release the streaming entry for the session. + if ( + event.type === 'sessionStatusChanged' && + (event.status === 'idle' || event.status === 'aborted') + ) { + clearStreaming(sessionId); + } + // Session removed via the WS delete event: release the streaming entry so it + // does not leak as an orphan. `forgetSession()` already does this, but that + // path only covers not-found / archive — the WS delete event flows through the + // reducer instead, so it must clear the store here too. + if (event.type === 'sessionDeleted') { + clearStreaming(sessionId); + } + if (event.type === 'configChanged') { rawState.defaultModel = event.config.defaultModel ?? null; } @@ -1028,8 +1071,14 @@ async function syncSessionFromSnapshot(sessionId: string): Promise ({ ...snap.session, diff --git a/apps/kimi-web/test/event-reducer.test.ts b/apps/kimi-web/test/event-reducer.test.ts index 4df40777e..068c7ea8c 100644 --- a/apps/kimi-web/test/event-reducer.test.ts +++ b/apps/kimi-web/test/event-reducer.test.ts @@ -149,3 +149,44 @@ describe('reduceAppEvent taskProgress', () => { expect(lines?.at(-1)).toBe('line 59'); }); }); + +describe('reduceAppEvent sessions reference stability', () => { + // The sidebar computeds (sessionsForView / workspaceGroups / mergedWorkspaces) + // depend on `rawState.sessions`. Events that do not change sessions must keep + // the SAME array reference so those computeds are not dirtied; events that do + // change sessions must produce a NEW array. + + it('reuses the sessions reference for an event that does not touch sessions', () => { + const state = { + ...createInitialState(), + sessions: [makeSession('s1', '2026-01-01T00:00:00.000Z')], + messagesBySession: { s1: [makeMessage('s1', '2026-01-01T00:00:00.000Z')] }, + }; + const next = reduceAppEvent( + state, + { + type: 'messageUpdated', + sessionId: 's1', + messageId: 'msg_2026-01-01T00:00:00.000Z', + content: [{ type: 'text', text: 'updated' }], + status: 'completed', + }, + { sessionId: 's1', seq: 2 }, + ); + expect(next.sessions).toBe(state.sessions); + }); + + it('produces a new sessions array for an event that changes sessions', () => { + const state = { + ...createInitialState(), + sessions: [makeSession('s1', '2026-01-01T00:00:00.000Z')], + }; + const next = reduceAppEvent( + state, + { type: 'sessionCreated', session: makeSession('s2', '2026-02-01T00:00:00.000Z') }, + { sessionId: 's2', seq: 3 }, + ); + expect(next.sessions).not.toBe(state.sessions); + expect(next.sessions.map((s) => s.id)).toEqual(['s2', 's1']); + }); +}); diff --git a/apps/kimi-web/test/streaming-store.test.ts b/apps/kimi-web/test/streaming-store.test.ts new file mode 100644 index 000000000..82951f633 --- /dev/null +++ b/apps/kimi-web/test/streaming-store.test.ts @@ -0,0 +1,52 @@ +import { beforeEach, describe, expect, it } from 'vitest'; +import { + appendStreamingDelta, + clearStreaming, + streamingBySession, +} from '../src/composables/client/streamingStore'; + +const SID = 'session-1'; + +describe('streamingStore', () => { + beforeEach(() => { + clearStreaming(SID); + }); + + it('appends text to the same block on repeated deltas', () => { + appendStreamingDelta(SID, 'msg-a', 0, { text: 'hello' }); + appendStreamingDelta(SID, 'msg-a', 0, { text: ' ' }); + appendStreamingDelta(SID, 'msg-a', 0, { text: 'world' }); + const blocks = streamingBySession[SID]?.blocks ?? []; + expect(blocks).toHaveLength(1); + expect(blocks[0]).toMatchObject({ contentIndex: 0, kind: 'text', text: 'hello world' }); + }); + + it('opens a new block when contentIndex changes', () => { + appendStreamingDelta(SID, 'msg-a', 0, { thinking: 'think' }); + appendStreamingDelta(SID, 'msg-a', 1, { text: 'answer' }); + const blocks = streamingBySession[SID]?.blocks ?? []; + expect(blocks).toHaveLength(2); + expect(blocks[0]).toMatchObject({ contentIndex: 0, kind: 'thinking', text: 'think' }); + expect(blocks[1]).toMatchObject({ contentIndex: 1, kind: 'text', text: 'answer' }); + }); + + it('resets when the message id changes (new step / after a tool)', () => { + appendStreamingDelta(SID, 'msg-a', 0, { text: 'first message' }); + appendStreamingDelta(SID, 'msg-b', 0, { text: 'second message' }); + const state = streamingBySession[SID]; + expect(state?.messageId).toBe('msg-b'); + expect(state?.blocks).toHaveLength(1); + expect(state?.blocks[0]?.text).toBe('second message'); + }); + + it('ignores empty chunks', () => { + appendStreamingDelta(SID, 'msg-a', 0, { text: '' }); + expect(streamingBySession[SID]?.blocks ?? []).toHaveLength(0); + }); + + it('clears the entry', () => { + appendStreamingDelta(SID, 'msg-a', 0, { text: 'hi' }); + clearStreaming(SID); + expect(streamingBySession[SID]).toBeUndefined(); + }); +}); diff --git a/apps/kimi-web/test/thinking-streaming.test.ts b/apps/kimi-web/test/thinking-streaming.test.ts new file mode 100644 index 000000000..733ba2dd6 --- /dev/null +++ b/apps/kimi-web/test/thinking-streaming.test.ts @@ -0,0 +1,236 @@ +import { beforeEach, describe, expect, it } from 'vitest'; +import { createAgentProjector } from '../src/api/daemon/agentEventProjector'; +import type { AppEvent, AppSession } from '../src/api/types'; +import { reduceAppEvent, createInitialState } from '../src/api/daemon/eventReducer'; +import type { KimiClientState } from '../src/api/daemon/eventReducer'; +import { + appendStreamingDelta, + clearStreaming, + streamingBySession, +} from '../src/composables/client/streamingStore'; + +// Integration reproduction for the thinking-streaming regression. +// +// Real daemon frames flow: raw agent-core frame → projector.project() → +// AppEvent[] → applyEvent(). Since a29798789, applyEvent short-circuits +// `assistantDelta` into the streaming store and bypasses the reducer, then +// `messageUpdated` / `sessionStatusChanged`(idle|aborted) clear the store so +// the committed content takes over. This test drives that exact pipeline with a +// think → text → step.completed → turn.ended sequence and asserts the live +// thinking block stays visible while streaming and settles correctly. + +const SID = 'session-thinking'; + +// Local mirror of applyEvent's short-circuit + commit-clear logic (the real +// one closes over module-level rawState, which we cannot reset between tests). +// Keeping the decision rules here in sync with useKimiWebClient.applyEvent is +// the point — if they drift, this test catches it. +function applyEventLocally(state: KimiClientState, event: AppEvent, seq: number): KimiClientState { + if (event.type === 'assistantDelta') { + appendStreamingDelta(SID, event.messageId, event.contentIndex, event.delta); + // advanceSeqCursor (no rendering dependency) + if (seq > 0 && seq > (state.lastSeqBySession[SID] ?? 0)) { + state.lastSeqBySession[SID] = seq; + } + return state; + } + const next = reduceAppEvent(state, event, { sessionId: SID, seq }); + if (event.type === 'messageUpdated') clearStreaming(SID); + if (event.type === 'sessionStatusChanged' && (event.status === 'idle' || event.status === 'aborted')) { + clearStreaming(SID); + } + return next; +} + +function projectFrame( + projector: ReturnType, + state: KimiClientState, + type: string, + payload: Record, + seq: number, + offset?: number, +): KimiClientState { + const events = projector.project(type, payload, SID, offset !== undefined ? { offset } : undefined); + let s = state; + for (const evt of events) s = applyEventLocally(s, evt, seq); + return s; +} + +beforeEach(() => { + clearStreaming(SID); +}); + +function makeInitialSessionState(): KimiClientState { + // Real flow has a sessionCreated before turn.started; sessionStatusChanged + // only maps an existing session, so seed one here. + const session: AppSession = { + id: SID, + title: SID, + createdAt: '2026-01-01T00:00:00.000Z', + updatedAt: '2026-01-01T00:00:00.000Z', + status: 'idle', + archived: false, + cwd: '/workspace', + model: 'kimi-code', + usage: { + inputTokens: 0, + outputTokens: 0, + cacheReadTokens: 0, + cacheCreationTokens: 0, + totalCostUsd: 0, + contextTokens: 0, + contextLimit: 0, + turnCount: 0, + }, + messageCount: 0, + lastSeq: 0, + }; + return { ...createInitialState(), sessions: [session] }; +} + +describe('thinking streaming pipeline (projector → streaming store → reducer)', () => { + it('keeps the live thinking block visible while streaming, then settles on step.completed', () => { + const projector = createAgentProjector(); + let state: KimiClientState = { ...makeInitialSessionState() }; + + // turn.started → running + state = projectFrame(projector, state, 'turn.started', { turnId: 1 }, 1); + expect(state.sessions.some((s) => s.id === SID && s.status === 'running')).toBe(true); + + // turn.step.started → empty assistant message created + state = projectFrame(projector, state, 'turn.step.started', { turnId: 1 }, 2); + const msgs = state.messagesBySession[SID] ?? []; + expect(msgs.length).toBe(1); + expect(msgs[0]!.role).toBe('assistant'); + expect(msgs[0]!.content).toHaveLength(0); + const assistantMsgId = msgs[0]!.id; + + // thinking.delta x3 → live thinking accumulates in the streaming store ONLY + // (reducer is bypassed), so messagesBySession stays empty. Wire `offset` is + // the pre-append cumulative length, so it must track the running total. + state = projectFrame(projector, state, 'thinking.delta', { delta: 'Let me ' }, 3, 0); + state = projectFrame(projector, state, 'thinking.delta', { delta: 'think ' }, 4, 7); + state = projectFrame(projector, state, 'thinking.delta', { delta: 'about this.' }, 5, 13); + + expect(state.messagesBySession[SID]![0]!.content).toHaveLength(0); + const live = streamingBySession[SID]; + expect(live).toBeDefined(); + expect(live!.messageId).toBe(assistantMsgId); + expect(live!.blocks).toHaveLength(1); + expect(live!.blocks[0]).toMatchObject({ kind: 'thinking', contentIndex: 0, text: 'Let me think about this.' }); + + // assistant.delta → opens a NEW content part (idx 1, text) in the live store. + // turnTextLen starts at 0 for the text stream; offset tracks its own running + // total (independent of thinking). + state = projectFrame(projector, state, 'assistant.delta', { delta: 'Here ' }, 6, 0); + state = projectFrame(projector, state, 'assistant.delta', { delta: 'is the answer.' }, 7, 5); + + const live2 = streamingBySession[SID]; + expect(live2!.blocks).toHaveLength(2); + expect(live2!.blocks[0]).toMatchObject({ kind: 'thinking' }); + expect(live2!.blocks[1]).toMatchObject({ kind: 'text', text: 'Here is the answer.' }); + + // turn.step.completed → messageUpdated carries the full content (thinking + + // text) and CLEARS the live store. The committed content must now hold both. + state = projectFrame(projector, state, 'turn.step.completed', { turnId: 1, usage: {} }, 8); + expect(streamingBySession[SID]).toBeUndefined(); + const committed = state.messagesBySession[SID]![0]!.content; + expect(committed).toHaveLength(2); + expect(committed[0]).toMatchObject({ type: 'thinking', thinking: 'Let me think about this.' }); + expect(committed[1]).toMatchObject({ type: 'text', text: 'Here is the answer.' }); + + // turn.ended → idle; store already cleared. + state = projectFrame(projector, state, 'turn.ended', { turnId: 1, reason: 'completed', durationMs: 42 }, 9); + expect(streamingBySession[SID]).toBeUndefined(); + expect(state.sessions.some((s) => s.id === SID && s.status === 'idle')).toBe(true); + expect(state.messagesBySession[SID]![0]!.durationMs).toBe(42); + }); + + it('does NOT clear the live thinking store on unrelated lifecycle events mid-stream', () => { + const projector = createAgentProjector(); + let state: KimiClientState = { ...makeInitialSessionState() }; + state = projectFrame(projector, state, 'turn.started', { turnId: 1 }, 1); + state = projectFrame(projector, state, 'turn.step.started', { turnId: 1 }, 2); + state = projectFrame(projector, state, 'thinking.delta', { delta: 'pondering' }, 3, 0); + + // An agent.status.updated (usage) event is common mid-stream and must NOT + // wipe the live thinking block. + state = projectFrame(projector, state, 'agent.status.updated', { model: 'kimi-x', contextTokens: 123 }, 4); + expect(streamingBySession[SID]).toBeDefined(); + expect(streamingBySession[SID]!.blocks[0]).toMatchObject({ kind: 'thinking', text: 'pondering' }); + }); + + it('streams thinking across a second step after the first step commits', () => { + // Multi-step turn: step1 (thinking + text) completes → messageUpdated clears + // the store → step2 starts a fresh assistant message and streams thinking. + // The live thinking must reappear for step2 (regression guard: clearing on + // messageUpdated must not permanently kill the store for the rest of turn). + const projector = createAgentProjector(); + let state: KimiClientState = { ...makeInitialSessionState() }; + + state = projectFrame(projector, state, 'turn.started', { turnId: 1 }, 1); + state = projectFrame(projector, state, 'turn.step.started', { turnId: 1 }, 2); + state = projectFrame(projector, state, 'thinking.delta', { delta: 'step1 thought' }, 3, 0); + state = projectFrame(projector, state, 'assistant.delta', { delta: 'step1 text' }, 4, 0); + state = projectFrame(projector, state, 'turn.step.completed', { turnId: 1, usage: {} }, 5); + expect(streamingBySession[SID]).toBeUndefined(); + + // step2: a fresh assistant message; live thinking must accumulate again. + state = projectFrame(projector, state, 'turn.step.started', { turnId: 1 }, 6); + state = projectFrame(projector, state, 'thinking.delta', { delta: 'step2 thought' }, 7, 0); + const live = streamingBySession[SID]; + expect(live).toBeDefined(); + expect(live!.blocks).toHaveLength(1); + expect(live!.blocks[0]).toMatchObject({ kind: 'thinking', text: 'step2 thought' }); + }); +}); + + it('resumes live thinking after a mid-stream reconnect (seedInFlight) without duplicating', () => { + // Reproduce the reconnect-mid-thinking path that ab177991a targeted. + // Flow: snapshot arrives with inFlightTurn.thinkingText already partially + // streamed → syncSessionFromSnapshot clears the store → seedInFlight + // rebuilds the assistant message (thinking + text parts) via messageCreated + // → live thinking.delta arrives aligned by offset and must append to the + // store WITHOUT re-rendering the already-committed thinking prefix. + const projector = createAgentProjector(); + let state: KimiClientState = { ...makeInitialSessionState() }; + + // The snapshot already saw "thinking prefix " (15 chars) of thinking. + const inFlight = { + turnId: 1, + promptId: 'pr_real', + thinkingText: 'thinking prefix ', + assistantText: '', + runningTools: [], + }; + // syncSessionFromSnapshot: clearStreaming first (store is empty here anyway) + clearStreaming(SID); + // seedInFlight → sessionStatusChanged(running) + messageCreated(thinking) + const seedEvents = projector.seedInFlight(SID, inFlight); + for (const evt of seedEvents) state = applyEventLocally(state, evt, 100); + + // The seeded assistant message carries the committed thinking prefix. + const seeded = state.messagesBySession[SID]!.at(-1)!; + expect(seeded.role).toBe('assistant'); + expect(seeded.content[0]).toMatchObject({ type: 'thinking', thinking: 'thinking prefix ' }); + const seededMsgId = seeded.id; + // Store must be empty after seed (only messageCreated ran, no delta yet). + expect(streamingBySession[SID]).toBeUndefined(); + + // Live thinking.delta resumes at the seeded prefix length. It must append + // to the store as a NEW live thinking block (the seeded thinking is part 0, + // and the live delta continues part 0). + const liveEvents = projector.project('thinking.delta', { delta: 'continued…' }, SID, { + offset: inFlight.thinkingText.length, + }); + for (const evt of liveEvents) state = applyEventLocally(state, evt, 101); + + const live = streamingBySession[SID]; + expect(live).toBeDefined(); + expect(live!.messageId).toBe(seededMsgId); + expect(live!.blocks).toHaveLength(1); + expect(live!.blocks[0]).toMatchObject({ kind: 'thinking', text: 'continued…' }); + // The committed prefix is NOT duplicated into the live store. + expect(live!.blocks[0]!.text).not.toContain('prefix'); + }); +});