From a297987894f93497fe2d94d6e5d725e777e222e3 Mon Sep 17 00:00:00 2001 From: qer Date: Fri, 26 Jun 2026 00:04:27 +0800 Subject: [PATCH 1/5] perf(web): isolate streaming text to fix main-thread jank During a long streaming reply every assistantDelta was routed through the immutable reducer and the coarse rawState graph, which made the whole App and the sidebar computeds re-render on every token and stalled the main thread for hundreds of milliseconds per frame. Keep the live streaming text in a separate fine-grained store outside rawState and let assistantDelta bypass the reducer, so only the actively streaming block re-renders. Also stop cloneState from copying the sessions array for events that do not touch it, so the sidebar computeds stay stable during streaming. --- .changeset/fix-web-streaming-render-jank.md | 5 ++ apps/kimi-web/src/api/daemon/eventReducer.ts | 8 ++- .../kimi-web/src/components/chat/ChatPane.vue | 23 ++++++ .../src/components/chat/ConversationPane.vue | 1 + .../kimi-web/src/components/chat/Markdown.vue | 15 ++-- .../src/components/chat/StreamingBlocks.vue | 51 +++++++++++++ .../src/composables/client/streamingStore.ts | 71 +++++++++++++++++++ .../src/composables/useKimiWebClient.ts | 35 +++++++++ apps/kimi-web/test/event-reducer.test.ts | 41 +++++++++++ apps/kimi-web/test/streaming-store.test.ts | 52 ++++++++++++++ 10 files changed, 295 insertions(+), 7 deletions(-) create mode 100644 .changeset/fix-web-streaming-render-jank.md create mode 100644 apps/kimi-web/src/components/chat/StreamingBlocks.vue create mode 100644 apps/kimi-web/src/composables/client/streamingStore.ts create mode 100644 apps/kimi-web/test/streaming-store.test.ts diff --git a/.changeset/fix-web-streaming-render-jank.md b/.changeset/fix-web-streaming-render-jank.md new file mode 100644 index 000000000..78647db3c --- /dev/null +++ b/.changeset/fix-web-streaming-render-jank.md @@ -0,0 +1,5 @@ +--- +"@moonshot-ai/kimi-code": patch +--- + +Keep the web chat responsive during long streaming replies by isolating live token text from the rest of the UI state, so it no longer stalls the main thread. diff --git a/apps/kimi-web/src/api/daemon/eventReducer.ts b/apps/kimi-web/src/api/daemon/eventReducer.ts index 23a94e002..47d9c4b47 100644 --- a/apps/kimi-web/src/api/daemon/eventReducer.ts +++ b/apps/kimi-web/src/api/daemon/eventReducer.ts @@ -84,7 +84,13 @@ export function createInitialState(): KimiClientState { function cloneState(s: KimiClientState): KimiClientState { return { ...s, - sessions: [...s.sessions], + // Reuse the `sessions` array reference when an event does not touch it. + // Every session-mutating case below already builds its own array via + // `[...]` / `.map` / `.filter`, so sharing the reference is safe — and it + // keeps `rawState.sessions` stable for events that don't change sessions, + // so the sidebar computeds (sessionsForView / workspaceGroups / + // mergedWorkspaces) are not dirtied by unrelated events. + sessions: s.sessions, messagesBySession: { ...s.messagesBySession }, approvalsBySession: { ...s.approvalsBySession }, planReviewByToolCallId: { ...s.planReviewByToolCallId }, diff --git a/apps/kimi-web/src/components/chat/ChatPane.vue b/apps/kimi-web/src/components/chat/ChatPane.vue index 5217731cc..8c858435f 100644 --- a/apps/kimi-web/src/components/chat/ChatPane.vue +++ b/apps/kimi-web/src/components/chat/ChatPane.vue @@ -5,6 +5,7 @@ import { useI18n } from 'vue-i18n'; import type { ChatTurn, ApprovalBlock, FilePreviewRequest, ToolMedia } from '../../types'; import ToolCall from './ToolCall.vue'; import Markdown from './Markdown.vue'; +import StreamingBlocks from './StreamingBlocks.vue'; import ThinkingBlock from './ThinkingBlock.vue'; import ActivityNotice from './ActivityNotice.vue'; import AgentCard from './AgentCard.vue'; @@ -44,6 +45,12 @@ onUnmounted(() => { const props = withDefaults( defineProps<{ turns: ChatTurn[]; + /** + * The session these turns belong to. Used by the streaming renderer to look + * up the live text in the streaming store. Optional so SideChatPanel (which + * renders a subagent transcript, not a streaming session) can omit it. + */ + sessionId?: string; approvals?: { approvalId: string; block: ApprovalBlock; agentName?: string }[]; /** * Bubble chat layout: render each turn as a chat bubble (user = right-aligned @@ -537,6 +544,14 @@ function isStreamingRenderBlock(turn: ChatTurn, block: { sourceIndex: number }): +
{{ formatDuration(turn.durationMs) }}
diff --git a/apps/kimi-web/src/components/chat/ConversationPane.vue b/apps/kimi-web/src/components/chat/ConversationPane.vue index ad96e6266..e88df2bc1 100644 --- a/apps/kimi-web/src/components/chat/ConversationPane.vue +++ b/apps/kimi-web/src/components/chat/ConversationPane.vue @@ -1008,6 +1008,7 @@ defineExpose({ loadComposerForEdit }); ref="chatPaneRef" :key="fileReloadKey ?? 'no-session'" :turns="turns" + :session-id="sessionId" :approvals="approvals" :bubble="bubble" :mobile="mobile" diff --git a/apps/kimi-web/src/components/chat/Markdown.vue b/apps/kimi-web/src/components/chat/Markdown.vue index dcafb4790..c55ec2d17 100644 --- a/apps/kimi-web/src/components/chat/Markdown.vue +++ b/apps/kimi-web/src/components/chat/Markdown.vue @@ -70,12 +70,15 @@ const renderPlan = computed(() => { // Code blocks follow the app colour scheme (shiki re-renders on flip). const isDark = useIsDark(); -// markstream's chat mode can batch nodes and defer offscreen nodes. Batching is -// safe for settled history, but viewport deferral can leave individual code -// blocks blank in our internal chat scroller when visibility events are missed -// during a session/theme switch. Keep batching for history, but always mount the -// actual nodes so every code block has at least its plain fallback immediately. -const allowBatchRender = computed(() => !props.streaming); +// markstream's chat mode batches node mounting across frames (frame-budget +// scheduling) and can defer offscreen nodes. Viewport deferral can leave +// individual code blocks blank in our internal chat scroller when visibility +// events are missed, so it stays disabled below (`deferNodesUntilVisible: +// false`). Batching itself only spreads mounting by a frame or two and is +// exactly the scenario streaming needs, so it stays on for both live and +// settled content (the `loading: false` code-block prop already removes the +// skeleton, so a not-yet-mounted block simply appears a frame later). +const allowBatchRender = computed(() => true); // --------------------------------------------------------------------------- // Local image resolution — rewrite the SOURCE TEXT before markstream sees it. diff --git a/apps/kimi-web/src/components/chat/StreamingBlocks.vue b/apps/kimi-web/src/components/chat/StreamingBlocks.vue new file mode 100644 index 000000000..5230314d9 --- /dev/null +++ b/apps/kimi-web/src/components/chat/StreamingBlocks.vue @@ -0,0 +1,51 @@ + + + + + diff --git a/apps/kimi-web/src/composables/client/streamingStore.ts b/apps/kimi-web/src/composables/client/streamingStore.ts new file mode 100644 index 000000000..73f3a80ed --- /dev/null +++ b/apps/kimi-web/src/composables/client/streamingStore.ts @@ -0,0 +1,71 @@ +// apps/kimi-web/src/composables/client/streamingStore.ts +// +// Fine-grained streaming-text store, kept OUTSIDE `rawState` on purpose. +// +// `assistantDelta` is the only genuinely high-frequency event (dozens to +// hundreds per second). Routing it through the immutable reducer + the coarse +// `rawState` graph makes every delta re-render the whole App and recompute the +// sidebar computeds (see the main-thread-jank investigation). Instead, deltas +// append here and only the single `StreamingBlocks` component subscribed to a +// session re-renders. +// +// Lifecycle: deltas append; `messageUpdated` (authoritative full content) and +// turn-end (`sessionStatusChanged` idle/aborted) clear the entry so the +// committed content in `messagesBySession` takes over without duplication. + +import { reactive } from 'vue'; + +export interface StreamingBlock { + contentIndex: number; + kind: 'text' | 'thinking'; + text: string; +} + +export interface StreamingState { + /** id of the assistant message currently being streamed. */ + messageId: string; + /** Ordered live text/thinking blocks (always trailing in the message). */ + blocks: StreamingBlock[]; +} + +/** + * Per-session live streaming state. A session has at most one in-flight + * assistant message (its trailing one), so a single entry per session suffices. + */ +export const streamingBySession = reactive>({}); + +/** + * Append one `assistantDelta` to the streaming store. O(1): either mutates the + * trailing block's text in place (same contentIndex) or pushes a new block + * (new contentIndex, rare). Never touches `rawState`, so no heavy computed + * (`turns`, sidebar) is dirtied. + */ +export function appendStreamingDelta( + sessionId: string, + messageId: string, + contentIndex: number, + delta: { text?: string; thinking?: string }, +): void { + let state = streamingBySession[sessionId]; + // A new assistant message (new step, or text resuming after a tool) starts a + // fresh entry — the previous message is already committed via messageUpdated. + if (!state || state.messageId !== messageId) { + state = streamingBySession[sessionId] = { messageId, blocks: [] }; + } + + const kind: 'text' | 'thinking' = delta.text !== undefined ? 'text' : 'thinking'; + const chunk = delta.text ?? delta.thinking ?? ''; + if (chunk.length === 0) return; + + const last = state.blocks.at(-1); + if (last && last.contentIndex === contentIndex && last.kind === kind) { + last.text += chunk; + } else { + state.blocks.push({ contentIndex, kind, text: chunk }); + } +} + +/** Drop the live entry for a session (commit or turn end). */ +export function clearStreaming(sessionId: string): void { + delete streamingBySession[sessionId]; +} diff --git a/apps/kimi-web/src/composables/useKimiWebClient.ts b/apps/kimi-web/src/composables/useKimiWebClient.ts index 738696064..afd9db928 100644 --- a/apps/kimi-web/src/composables/useKimiWebClient.ts +++ b/apps/kimi-web/src/composables/useKimiWebClient.ts @@ -19,6 +19,7 @@ import { STORAGE_KEYS, } from '../lib/storage'; import { createEventBatcher, isRenderEvent } from './client/eventBatcher'; +import { appendStreamingDelta, clearStreaming } from './client/streamingStore'; import { useAppearance } from './client/useAppearance'; import { useNotification } from './client/useNotification'; import { useTaskPoller } from './client/useTaskPoller'; @@ -639,8 +640,28 @@ function nextOptimisticMsgId(): string { // past the queue check and clobber promptIdBySession (breaking abort). const inFlightPromptSessions = new Set(); +// Mirror of the reducer's advanceSeq, for the one event (assistantDelta) that +// bypasses the reducer. lastSeqBySession is a resync cursor with no rendering +// dependencies, so mutating it in place is both safe and cheap. +function advanceSeqCursor(sessionId: string | undefined, seq: number | undefined): void { + if (sessionId !== undefined && seq !== undefined && seq > 0) { + const prev = rawState.lastSeqBySession[sessionId] ?? 0; + if (seq > prev) rawState.lastSeqBySession[sessionId] = seq; + } +} + // Helper: mutate rawState by applying a reducer on a snapshot then re-assigning fields function applyEvent(event: ReturnType, sessionId: string, seq: number): void { + // Streaming text/thinking deltas bypass the reducer entirely. Appending to the + // fine-grained streaming store is O(1) and dirties only the single + // StreamingBlocks component — instead of cloning all of `rawState` and + // re-rendering the whole App + sidebar on every token. + if (event.type === 'assistantDelta') { + appendStreamingDelta(sessionId, event.messageId, event.contentIndex, event.delta); + advanceSeqCursor(sessionId, seq); + return; + } + const snapshot: KimiClientState = { sessions: rawState.sessions, activeSessionId: rawState.activeSessionId, @@ -670,6 +691,20 @@ function applyEvent(event: ReturnType, sessionId: string, seq rawState.config = next.config ?? null; rawState.warnings = next.warnings; + // `messageUpdated` carries the authoritative full content of a message (tool + // slot / step end / turn end): drop the live streaming entry so the just- + // committed content takes over without rendering the same text twice. + if (event.type === 'messageUpdated') { + clearStreaming(sessionId); + } + // Turn end: release the streaming entry for the session. + if ( + event.type === 'sessionStatusChanged' && + (event.status === 'idle' || event.status === 'aborted') + ) { + clearStreaming(sessionId); + } + if (event.type === 'configChanged') { rawState.defaultModel = event.config.defaultModel ?? null; } diff --git a/apps/kimi-web/test/event-reducer.test.ts b/apps/kimi-web/test/event-reducer.test.ts index 4df40777e..068c7ea8c 100644 --- a/apps/kimi-web/test/event-reducer.test.ts +++ b/apps/kimi-web/test/event-reducer.test.ts @@ -149,3 +149,44 @@ describe('reduceAppEvent taskProgress', () => { expect(lines?.at(-1)).toBe('line 59'); }); }); + +describe('reduceAppEvent sessions reference stability', () => { + // The sidebar computeds (sessionsForView / workspaceGroups / mergedWorkspaces) + // depend on `rawState.sessions`. Events that do not change sessions must keep + // the SAME array reference so those computeds are not dirtied; events that do + // change sessions must produce a NEW array. + + it('reuses the sessions reference for an event that does not touch sessions', () => { + const state = { + ...createInitialState(), + sessions: [makeSession('s1', '2026-01-01T00:00:00.000Z')], + messagesBySession: { s1: [makeMessage('s1', '2026-01-01T00:00:00.000Z')] }, + }; + const next = reduceAppEvent( + state, + { + type: 'messageUpdated', + sessionId: 's1', + messageId: 'msg_2026-01-01T00:00:00.000Z', + content: [{ type: 'text', text: 'updated' }], + status: 'completed', + }, + { sessionId: 's1', seq: 2 }, + ); + expect(next.sessions).toBe(state.sessions); + }); + + it('produces a new sessions array for an event that changes sessions', () => { + const state = { + ...createInitialState(), + sessions: [makeSession('s1', '2026-01-01T00:00:00.000Z')], + }; + const next = reduceAppEvent( + state, + { type: 'sessionCreated', session: makeSession('s2', '2026-02-01T00:00:00.000Z') }, + { sessionId: 's2', seq: 3 }, + ); + expect(next.sessions).not.toBe(state.sessions); + expect(next.sessions.map((s) => s.id)).toEqual(['s2', 's1']); + }); +}); diff --git a/apps/kimi-web/test/streaming-store.test.ts b/apps/kimi-web/test/streaming-store.test.ts new file mode 100644 index 000000000..82951f633 --- /dev/null +++ b/apps/kimi-web/test/streaming-store.test.ts @@ -0,0 +1,52 @@ +import { beforeEach, describe, expect, it } from 'vitest'; +import { + appendStreamingDelta, + clearStreaming, + streamingBySession, +} from '../src/composables/client/streamingStore'; + +const SID = 'session-1'; + +describe('streamingStore', () => { + beforeEach(() => { + clearStreaming(SID); + }); + + it('appends text to the same block on repeated deltas', () => { + appendStreamingDelta(SID, 'msg-a', 0, { text: 'hello' }); + appendStreamingDelta(SID, 'msg-a', 0, { text: ' ' }); + appendStreamingDelta(SID, 'msg-a', 0, { text: 'world' }); + const blocks = streamingBySession[SID]?.blocks ?? []; + expect(blocks).toHaveLength(1); + expect(blocks[0]).toMatchObject({ contentIndex: 0, kind: 'text', text: 'hello world' }); + }); + + it('opens a new block when contentIndex changes', () => { + appendStreamingDelta(SID, 'msg-a', 0, { thinking: 'think' }); + appendStreamingDelta(SID, 'msg-a', 1, { text: 'answer' }); + const blocks = streamingBySession[SID]?.blocks ?? []; + expect(blocks).toHaveLength(2); + expect(blocks[0]).toMatchObject({ contentIndex: 0, kind: 'thinking', text: 'think' }); + expect(blocks[1]).toMatchObject({ contentIndex: 1, kind: 'text', text: 'answer' }); + }); + + it('resets when the message id changes (new step / after a tool)', () => { + appendStreamingDelta(SID, 'msg-a', 0, { text: 'first message' }); + appendStreamingDelta(SID, 'msg-b', 0, { text: 'second message' }); + const state = streamingBySession[SID]; + expect(state?.messageId).toBe('msg-b'); + expect(state?.blocks).toHaveLength(1); + expect(state?.blocks[0]?.text).toBe('second message'); + }); + + it('ignores empty chunks', () => { + appendStreamingDelta(SID, 'msg-a', 0, { text: '' }); + expect(streamingBySession[SID]?.blocks ?? []).toHaveLength(0); + }); + + it('clears the entry', () => { + appendStreamingDelta(SID, 'msg-a', 0, { text: 'hi' }); + clearStreaming(SID); + expect(streamingBySession[SID]).toBeUndefined(); + }); +}); From ab177991a280093032c9bff08d6b5db0eea4cb41 Mon Sep 17 00:00:00 2001 From: qer Date: Fri, 26 Jun 2026 00:23:03 +0800 Subject: [PATCH 2/5] fix(web): clear streaming store on snapshot and session teardown Since streaming deltas now live outside rawState, the snapshot install path (which overwrites messagesBySession) no longer clears them. A queued delta flushed right before a snapshot would survive the authoritative seed and render on top of it (or leak into the next turn) after a reconnect or delta-gap resync. Clear the per-session streaming store when installing a snapshot and when tearing down a session. Addresses review feedback on PR #1111. --- apps/kimi-web/src/composables/useKimiWebClient.ts | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/apps/kimi-web/src/composables/useKimiWebClient.ts b/apps/kimi-web/src/composables/useKimiWebClient.ts index afd9db928..6a709da8a 100644 --- a/apps/kimi-web/src/composables/useKimiWebClient.ts +++ b/apps/kimi-web/src/composables/useKimiWebClient.ts @@ -487,6 +487,7 @@ function forgetSession(sessionId: string): void { // That would make hasLoadedMessages() treat the stale empty cache as // authoritative and skip the next snapshot fetch for this id. enqueueEvent.flush(); + clearStreaming(sessionId); removeSession(sessionId); removeSessionMessages(sessionId); delete rawState.approvalsBySession[sessionId]; @@ -1063,8 +1064,14 @@ async function syncSessionFromSnapshot(sessionId: string): Promise ({ ...snap.session, From b791a35c7a20b1f4674bd8651d6c7cb3fcb74de6 Mon Sep 17 00:00:00 2001 From: qer Date: Fri, 26 Jun 2026 00:59:02 +0800 Subject: [PATCH 3/5] fix(web): show live thinking text in the detail panel while streaming Clicking a still-streaming thinking block emitted a blockIndex (contentIndex) that did not match the turn.blocks index the detail panel reads, and the live block is not in client.turns at all (deltas bypass messagesBySession). So the panel either stayed closed or showed a stale committed block. Tag live thinking targets with a `live` flag and read their text from the streaming store in thinkingPanelText, so the panel shows the growing thinking text while the reply is streaming (restoring the pre-existing "click to view full thinking" behavior). Once the turn settles the store is cleared and the live target returns null, closing the panel rather than indexing turn.blocks with a mismatched contentIndex. --- .../kimi-web/src/components/chat/ChatPane.vue | 2 +- .../src/components/chat/ConversationPane.vue | 2 +- .../src/components/chat/StreamingBlocks.vue | 4 +-- .../src/composables/useDetailPanel.ts | 25 ++++++++++++++++--- 4 files changed, 26 insertions(+), 7 deletions(-) diff --git a/apps/kimi-web/src/components/chat/ChatPane.vue b/apps/kimi-web/src/components/chat/ChatPane.vue index 8c858435f..7216a7127 100644 --- a/apps/kimi-web/src/components/chat/ChatPane.vue +++ b/apps/kimi-web/src/components/chat/ChatPane.vue @@ -205,7 +205,7 @@ const emit = defineEmits<{ openMedia: [media: ToolMedia]; copyConversationCopied: []; /** Show a thinking block's full text in the right-side panel. */ - openThinking: [target: { turnId: string; blockIndex: number }]; + openThinking: [target: { turnId: string; blockIndex: number; live?: boolean }]; /** Show a compaction divider's summary text in the right-side panel. */ openCompaction: [target: { turnId: string }]; /** Show a subagent's full detail in the right-side panel. */ diff --git a/apps/kimi-web/src/components/chat/ConversationPane.vue b/apps/kimi-web/src/components/chat/ConversationPane.vue index e88df2bc1..aa251bb3c 100644 --- a/apps/kimi-web/src/components/chat/ConversationPane.vue +++ b/apps/kimi-web/src/components/chat/ConversationPane.vue @@ -105,7 +105,7 @@ const emit = defineEmits<{ selectModel: [modelId: string]; openFile: [target: FilePreviewRequest]; openMedia: [media: ToolMedia]; - openThinking: [target: { turnId: string; blockIndex: number }]; + openThinking: [target: { turnId: string; blockIndex: number; live?: boolean }]; openCompaction: [target: { turnId: string }]; openAgent: [target: { turnId: string; blockIndex: number; memberId: string }]; openToolDiff: [id: string]; diff --git a/apps/kimi-web/src/components/chat/StreamingBlocks.vue b/apps/kimi-web/src/components/chat/StreamingBlocks.vue index 5230314d9..ea002478d 100644 --- a/apps/kimi-web/src/components/chat/StreamingBlocks.vue +++ b/apps/kimi-web/src/components/chat/StreamingBlocks.vue @@ -27,7 +27,7 @@ const props = withDefaults( const emit = defineEmits<{ openFile: [target: FilePreviewRequest]; - openThinking: [target: { turnId: string; blockIndex: number }]; + openThinking: [target: { turnId: string; blockIndex: number; live?: boolean }]; }>(); // Subscribe to this session's live blocks. Only this computed (and therefore @@ -42,7 +42,7 @@ const blocks = computed(() => streamingBySession[props.sessionId]?.blocks ?? []) :text="blk.text" :mobile="mobile" :streaming="true" - @open="emit('openThinking', { turnId, blockIndex: blk.contentIndex })" + @open="emit('openThinking', { turnId, blockIndex: blk.contentIndex, live: true })" />
diff --git a/apps/kimi-web/src/composables/useDetailPanel.ts b/apps/kimi-web/src/composables/useDetailPanel.ts index e92880be5..1e705f32b 100644 --- a/apps/kimi-web/src/composables/useDetailPanel.ts +++ b/apps/kimi-web/src/composables/useDetailPanel.ts @@ -8,6 +8,7 @@ import type { useKimiWebClient } from './useKimiWebClient'; import { buildEditDiffLines, extractEditPath, findToolCallById } from '../lib/toolDiff'; import { toolLabel } from '../lib/toolMeta'; import { clampPanelWidth, panelMaxWidth, useViewportWidth } from './useViewportWidth'; +import { streamingBySession } from './client/streamingStore'; type KimiWebClient = ReturnType; @@ -64,11 +65,24 @@ export function useDetailPanel({ // --------------------------------------------------------------------------- // Thinking panel // --------------------------------------------------------------------------- - const thinkingTarget = ref<{ turnId: string; blockIndex: number } | null>(null); + const thinkingTarget = ref<{ turnId: string; blockIndex: number; live?: boolean } | null>(null); const thinkingPanelText = computed(() => { const target = thinkingTarget.value; if (!target) return null; + // A live (still-streaming) thinking block is not in `client.turns` — its + // text lives in the streaming store. Read it there so the panel shows the + // growing text while the reply is still streaming (reactive: updates on + // each delta). Once the turn settles the store is cleared and the live + // target goes stale; return null so the panel closes rather than indexing + // `turn.blocks` with a contentIndex that does not match its sourceIndex. + if (target.live) { + const sid = client.activeSessionId.value; + const live = streamingBySession[sid]?.blocks.find( + (b) => b.kind === 'thinking' && b.contentIndex === target.blockIndex, + ); + return live?.text ?? null; + } const turn = client.turns.value.find((tn) => tn.id === target.turnId); const blk = turn?.blocks?.[target.blockIndex]; return blk?.kind === 'thinking' ? blk.thinking : null; @@ -76,9 +90,14 @@ export function useDetailPanel({ const thinkingVisible = computed(() => thinkingPanelText.value !== null); - function openThinkingPanel(target: { turnId: string; blockIndex: number }): void { + function openThinkingPanel(target: { turnId: string; blockIndex: number; live?: boolean }): void { const current = thinkingTarget.value; - if (current && current.turnId === target.turnId && current.blockIndex === target.blockIndex) { + if ( + current && + current.turnId === target.turnId && + current.blockIndex === target.blockIndex && + current.live === target.live + ) { thinkingTarget.value = null; if (detailTarget.value === 'thinking') detailTarget.value = null; return; From 119eba8e49bec3749a8c9a75c67bad9756ce238a Mon Sep 17 00:00:00 2001 From: qer Date: Fri, 26 Jun 2026 01:08:35 +0800 Subject: [PATCH 4/5] fix(web): keep thinking panel content complete across step/turn boundaries The previous fix read live thinking text from the streaming store, but the store is cleared on every messageUpdated (tool slot / step end / turn end) so the committed content can take over in the chat. The final deltas and that clear land in the same tick and coalesce, so the panel would close *before* rendering the last chunk, and at step boundaries it would stay closed for the next step. That made the stream look unstable and drop the final content. When the store is empty, fall back to the committed thinking block in the turn (which already holds the full text via messageUpdated). The panel now shows the live text while streaming and the complete committed text through each boundary, instead of flickering closed. Also add an integration test that drives the real projector -> store -> reducer pipeline (thinking deltas, step/turn completion, and a mid-stream reconnect) to lock in the live-store lifecycle. --- .../src/composables/useDetailPanel.ts | 16 +- apps/kimi-web/test/thinking-streaming.test.ts | 236 ++++++++++++++++++ 2 files changed, 248 insertions(+), 4 deletions(-) create mode 100644 apps/kimi-web/test/thinking-streaming.test.ts diff --git a/apps/kimi-web/src/composables/useDetailPanel.ts b/apps/kimi-web/src/composables/useDetailPanel.ts index 1e705f32b..7ac4909f7 100644 --- a/apps/kimi-web/src/composables/useDetailPanel.ts +++ b/apps/kimi-web/src/composables/useDetailPanel.ts @@ -73,15 +73,23 @@ export function useDetailPanel({ // A live (still-streaming) thinking block is not in `client.turns` — its // text lives in the streaming store. Read it there so the panel shows the // growing text while the reply is still streaming (reactive: updates on - // each delta). Once the turn settles the store is cleared and the live - // target goes stale; return null so the panel closes rather than indexing - // `turn.blocks` with a contentIndex that does not match its sourceIndex. + // each delta). if (target.live) { const sid = client.activeSessionId.value; const live = streamingBySession[sid]?.blocks.find( (b) => b.kind === 'thinking' && b.contentIndex === target.blockIndex, ); - return live?.text ?? null; + if (live?.text) return live.text; + // The store is cleared at every `messageUpdated` (tool slot / step end / + // turn end) so the committed content takes over in the chat. The last + // deltas and that clear land in the same tick and coalesce, so without a + // fallback the panel would close *before* rendering the final chunk. + // Fall back to the committed thinking block in the turn — it already + // holds the full text — so the panel keeps showing the complete content + // through the boundary instead of flickering closed. + const turn = client.turns.value.find((tn) => tn.id === target.turnId); + const committed = turn?.blocks?.find((b) => b.kind === 'thinking'); + return committed?.kind === 'thinking' ? committed.thinking : null; } const turn = client.turns.value.find((tn) => tn.id === target.turnId); const blk = turn?.blocks?.[target.blockIndex]; diff --git a/apps/kimi-web/test/thinking-streaming.test.ts b/apps/kimi-web/test/thinking-streaming.test.ts new file mode 100644 index 000000000..733ba2dd6 --- /dev/null +++ b/apps/kimi-web/test/thinking-streaming.test.ts @@ -0,0 +1,236 @@ +import { beforeEach, describe, expect, it } from 'vitest'; +import { createAgentProjector } from '../src/api/daemon/agentEventProjector'; +import type { AppEvent, AppSession } from '../src/api/types'; +import { reduceAppEvent, createInitialState } from '../src/api/daemon/eventReducer'; +import type { KimiClientState } from '../src/api/daemon/eventReducer'; +import { + appendStreamingDelta, + clearStreaming, + streamingBySession, +} from '../src/composables/client/streamingStore'; + +// Integration reproduction for the thinking-streaming regression. +// +// Real daemon frames flow: raw agent-core frame → projector.project() → +// AppEvent[] → applyEvent(). Since a29798789, applyEvent short-circuits +// `assistantDelta` into the streaming store and bypasses the reducer, then +// `messageUpdated` / `sessionStatusChanged`(idle|aborted) clear the store so +// the committed content takes over. This test drives that exact pipeline with a +// think → text → step.completed → turn.ended sequence and asserts the live +// thinking block stays visible while streaming and settles correctly. + +const SID = 'session-thinking'; + +// Local mirror of applyEvent's short-circuit + commit-clear logic (the real +// one closes over module-level rawState, which we cannot reset between tests). +// Keeping the decision rules here in sync with useKimiWebClient.applyEvent is +// the point — if they drift, this test catches it. +function applyEventLocally(state: KimiClientState, event: AppEvent, seq: number): KimiClientState { + if (event.type === 'assistantDelta') { + appendStreamingDelta(SID, event.messageId, event.contentIndex, event.delta); + // advanceSeqCursor (no rendering dependency) + if (seq > 0 && seq > (state.lastSeqBySession[SID] ?? 0)) { + state.lastSeqBySession[SID] = seq; + } + return state; + } + const next = reduceAppEvent(state, event, { sessionId: SID, seq }); + if (event.type === 'messageUpdated') clearStreaming(SID); + if (event.type === 'sessionStatusChanged' && (event.status === 'idle' || event.status === 'aborted')) { + clearStreaming(SID); + } + return next; +} + +function projectFrame( + projector: ReturnType, + state: KimiClientState, + type: string, + payload: Record, + seq: number, + offset?: number, +): KimiClientState { + const events = projector.project(type, payload, SID, offset !== undefined ? { offset } : undefined); + let s = state; + for (const evt of events) s = applyEventLocally(s, evt, seq); + return s; +} + +beforeEach(() => { + clearStreaming(SID); +}); + +function makeInitialSessionState(): KimiClientState { + // Real flow has a sessionCreated before turn.started; sessionStatusChanged + // only maps an existing session, so seed one here. + const session: AppSession = { + id: SID, + title: SID, + createdAt: '2026-01-01T00:00:00.000Z', + updatedAt: '2026-01-01T00:00:00.000Z', + status: 'idle', + archived: false, + cwd: '/workspace', + model: 'kimi-code', + usage: { + inputTokens: 0, + outputTokens: 0, + cacheReadTokens: 0, + cacheCreationTokens: 0, + totalCostUsd: 0, + contextTokens: 0, + contextLimit: 0, + turnCount: 0, + }, + messageCount: 0, + lastSeq: 0, + }; + return { ...createInitialState(), sessions: [session] }; +} + +describe('thinking streaming pipeline (projector → streaming store → reducer)', () => { + it('keeps the live thinking block visible while streaming, then settles on step.completed', () => { + const projector = createAgentProjector(); + let state: KimiClientState = { ...makeInitialSessionState() }; + + // turn.started → running + state = projectFrame(projector, state, 'turn.started', { turnId: 1 }, 1); + expect(state.sessions.some((s) => s.id === SID && s.status === 'running')).toBe(true); + + // turn.step.started → empty assistant message created + state = projectFrame(projector, state, 'turn.step.started', { turnId: 1 }, 2); + const msgs = state.messagesBySession[SID] ?? []; + expect(msgs.length).toBe(1); + expect(msgs[0]!.role).toBe('assistant'); + expect(msgs[0]!.content).toHaveLength(0); + const assistantMsgId = msgs[0]!.id; + + // thinking.delta x3 → live thinking accumulates in the streaming store ONLY + // (reducer is bypassed), so messagesBySession stays empty. Wire `offset` is + // the pre-append cumulative length, so it must track the running total. + state = projectFrame(projector, state, 'thinking.delta', { delta: 'Let me ' }, 3, 0); + state = projectFrame(projector, state, 'thinking.delta', { delta: 'think ' }, 4, 7); + state = projectFrame(projector, state, 'thinking.delta', { delta: 'about this.' }, 5, 13); + + expect(state.messagesBySession[SID]![0]!.content).toHaveLength(0); + const live = streamingBySession[SID]; + expect(live).toBeDefined(); + expect(live!.messageId).toBe(assistantMsgId); + expect(live!.blocks).toHaveLength(1); + expect(live!.blocks[0]).toMatchObject({ kind: 'thinking', contentIndex: 0, text: 'Let me think about this.' }); + + // assistant.delta → opens a NEW content part (idx 1, text) in the live store. + // turnTextLen starts at 0 for the text stream; offset tracks its own running + // total (independent of thinking). + state = projectFrame(projector, state, 'assistant.delta', { delta: 'Here ' }, 6, 0); + state = projectFrame(projector, state, 'assistant.delta', { delta: 'is the answer.' }, 7, 5); + + const live2 = streamingBySession[SID]; + expect(live2!.blocks).toHaveLength(2); + expect(live2!.blocks[0]).toMatchObject({ kind: 'thinking' }); + expect(live2!.blocks[1]).toMatchObject({ kind: 'text', text: 'Here is the answer.' }); + + // turn.step.completed → messageUpdated carries the full content (thinking + + // text) and CLEARS the live store. The committed content must now hold both. + state = projectFrame(projector, state, 'turn.step.completed', { turnId: 1, usage: {} }, 8); + expect(streamingBySession[SID]).toBeUndefined(); + const committed = state.messagesBySession[SID]![0]!.content; + expect(committed).toHaveLength(2); + expect(committed[0]).toMatchObject({ type: 'thinking', thinking: 'Let me think about this.' }); + expect(committed[1]).toMatchObject({ type: 'text', text: 'Here is the answer.' }); + + // turn.ended → idle; store already cleared. + state = projectFrame(projector, state, 'turn.ended', { turnId: 1, reason: 'completed', durationMs: 42 }, 9); + expect(streamingBySession[SID]).toBeUndefined(); + expect(state.sessions.some((s) => s.id === SID && s.status === 'idle')).toBe(true); + expect(state.messagesBySession[SID]![0]!.durationMs).toBe(42); + }); + + it('does NOT clear the live thinking store on unrelated lifecycle events mid-stream', () => { + const projector = createAgentProjector(); + let state: KimiClientState = { ...makeInitialSessionState() }; + state = projectFrame(projector, state, 'turn.started', { turnId: 1 }, 1); + state = projectFrame(projector, state, 'turn.step.started', { turnId: 1 }, 2); + state = projectFrame(projector, state, 'thinking.delta', { delta: 'pondering' }, 3, 0); + + // An agent.status.updated (usage) event is common mid-stream and must NOT + // wipe the live thinking block. + state = projectFrame(projector, state, 'agent.status.updated', { model: 'kimi-x', contextTokens: 123 }, 4); + expect(streamingBySession[SID]).toBeDefined(); + expect(streamingBySession[SID]!.blocks[0]).toMatchObject({ kind: 'thinking', text: 'pondering' }); + }); + + it('streams thinking across a second step after the first step commits', () => { + // Multi-step turn: step1 (thinking + text) completes → messageUpdated clears + // the store → step2 starts a fresh assistant message and streams thinking. + // The live thinking must reappear for step2 (regression guard: clearing on + // messageUpdated must not permanently kill the store for the rest of turn). + const projector = createAgentProjector(); + let state: KimiClientState = { ...makeInitialSessionState() }; + + state = projectFrame(projector, state, 'turn.started', { turnId: 1 }, 1); + state = projectFrame(projector, state, 'turn.step.started', { turnId: 1 }, 2); + state = projectFrame(projector, state, 'thinking.delta', { delta: 'step1 thought' }, 3, 0); + state = projectFrame(projector, state, 'assistant.delta', { delta: 'step1 text' }, 4, 0); + state = projectFrame(projector, state, 'turn.step.completed', { turnId: 1, usage: {} }, 5); + expect(streamingBySession[SID]).toBeUndefined(); + + // step2: a fresh assistant message; live thinking must accumulate again. + state = projectFrame(projector, state, 'turn.step.started', { turnId: 1 }, 6); + state = projectFrame(projector, state, 'thinking.delta', { delta: 'step2 thought' }, 7, 0); + const live = streamingBySession[SID]; + expect(live).toBeDefined(); + expect(live!.blocks).toHaveLength(1); + expect(live!.blocks[0]).toMatchObject({ kind: 'thinking', text: 'step2 thought' }); + }); +}); + + it('resumes live thinking after a mid-stream reconnect (seedInFlight) without duplicating', () => { + // Reproduce the reconnect-mid-thinking path that ab177991a targeted. + // Flow: snapshot arrives with inFlightTurn.thinkingText already partially + // streamed → syncSessionFromSnapshot clears the store → seedInFlight + // rebuilds the assistant message (thinking + text parts) via messageCreated + // → live thinking.delta arrives aligned by offset and must append to the + // store WITHOUT re-rendering the already-committed thinking prefix. + const projector = createAgentProjector(); + let state: KimiClientState = { ...makeInitialSessionState() }; + + // The snapshot already saw "thinking prefix " (15 chars) of thinking. + const inFlight = { + turnId: 1, + promptId: 'pr_real', + thinkingText: 'thinking prefix ', + assistantText: '', + runningTools: [], + }; + // syncSessionFromSnapshot: clearStreaming first (store is empty here anyway) + clearStreaming(SID); + // seedInFlight → sessionStatusChanged(running) + messageCreated(thinking) + const seedEvents = projector.seedInFlight(SID, inFlight); + for (const evt of seedEvents) state = applyEventLocally(state, evt, 100); + + // The seeded assistant message carries the committed thinking prefix. + const seeded = state.messagesBySession[SID]!.at(-1)!; + expect(seeded.role).toBe('assistant'); + expect(seeded.content[0]).toMatchObject({ type: 'thinking', thinking: 'thinking prefix ' }); + const seededMsgId = seeded.id; + // Store must be empty after seed (only messageCreated ran, no delta yet). + expect(streamingBySession[SID]).toBeUndefined(); + + // Live thinking.delta resumes at the seeded prefix length. It must append + // to the store as a NEW live thinking block (the seeded thinking is part 0, + // and the live delta continues part 0). + const liveEvents = projector.project('thinking.delta', { delta: 'continued…' }, SID, { + offset: inFlight.thinkingText.length, + }); + for (const evt of liveEvents) state = applyEventLocally(state, evt, 101); + + const live = streamingBySession[SID]; + expect(live).toBeDefined(); + expect(live!.messageId).toBe(seededMsgId); + expect(live!.blocks).toHaveLength(1); + expect(live!.blocks[0]).toMatchObject({ kind: 'thinking', text: 'continued…' }); + // The committed prefix is NOT duplicated into the live store. + expect(live!.blocks[0]!.text).not.toContain('prefix'); + }); +}); From 318e752fb4db125b86ec3d8ca270cb07e4a32d6a Mon Sep 17 00:00:00 2001 From: qer Date: Fri, 26 Jun 2026 01:25:28 +0800 Subject: [PATCH 5/5] fix(web): include live tail in copy and clear store on session delete Two follow-ups to the streaming-store migration: - "Copy conversation" and "copy final summary" serialized only props.turns, which during streaming stops at the last messageUpdated and drops the live tail. Merge the streaming store's live blocks into the streaming turn before serializing, so a copy mid-stream captures the full in-flight text. - The WS sessionDeleted event flows through the reducer (not forgetSession), so it never cleared the streaming store and left an orphaned entry behind. Clear it in applyEvent, mirroring forgetSession. --- .../kimi-web/src/components/chat/ChatPane.vue | 29 +++++++++++++++++-- .../src/composables/useKimiWebClient.ts | 7 +++++ 2 files changed, 34 insertions(+), 2 deletions(-) diff --git a/apps/kimi-web/src/components/chat/ChatPane.vue b/apps/kimi-web/src/components/chat/ChatPane.vue index 7216a7127..52a3997e5 100644 --- a/apps/kimi-web/src/components/chat/ChatPane.vue +++ b/apps/kimi-web/src/components/chat/ChatPane.vue @@ -24,6 +24,7 @@ import { turnFinalText, turnToMarkdown, } from '../chatTurnRendering'; +import { streamingBySession, type StreamingBlock } from '../../composables/client/streamingStore'; const { t } = useI18n(); @@ -300,14 +301,37 @@ function confirmEditMessage(turn: ChatTurn): void { const copiedConversation = ref(false); let copiedConversationTimer: ReturnType | null = null; +/** Live text/thinking blocks for the turn currently streaming, if any. */ +function liveBlocksForStreaming(): StreamingBlock[] { + if (!props.running || !props.sessionId) return []; + return streamingBySession[props.sessionId]?.blocks ?? []; +} + +/** + * Merge the still-streaming live blocks into a turn for serialization (copy). + * Live text/thinking is not in `turn.blocks` during streaming (deltas bypass + * messagesBySession), so without this a copy mid-stream would drop the tail. + */ +function withLiveBlocks(turn: ChatTurn, liveBlocks: StreamingBlock[]): ChatTurn { + if (liveBlocks.length === 0) return turn; + const blocks = turn.blocks ? [...turn.blocks] : turnBlocks(turn); + for (const blk of liveBlocks) { + if (blk.kind === 'text' && blk.text) blocks.push({ kind: 'text', text: blk.text }); + else if (blk.kind === 'thinking' && blk.text) blocks.push({ kind: 'thinking', thinking: blk.text }); + } + return { ...turn, blocks }; +} + /** Convert the entire conversation to Markdown and copy to clipboard. */ function copyConversation(): void { if (props.turns.length === 0) return; + const liveBlocks = liveBlocksForStreaming(); const lines: string[] = []; for (const turn of props.turns) { if (turn.role === 'compaction') continue; // dividers don't copy + const t = turn.id === streamingTurnId.value ? withLiveBlocks(turn, liveBlocks) : turn; const roleLabel = turn.role === 'user' ? 'User' : 'Assistant'; - const content = turnToMarkdown(turn); + const content = turnToMarkdown(t); if (content.trim()) { lines.push(`**${roleLabel}**\n\n${content}`); } @@ -336,8 +360,9 @@ function assistantRunEndingAt(index: number): ChatTurn[] { } function assistantRunFinalText(index: number): string { + const liveBlocks = liveBlocksForStreaming(); return assistantRunEndingAt(index) - .map((t) => turnFinalText(t)) + .map((t) => turnFinalText(t.id === streamingTurnId.value ? withLiveBlocks(t, liveBlocks) : t)) .filter(Boolean) .join('\n\n'); } diff --git a/apps/kimi-web/src/composables/useKimiWebClient.ts b/apps/kimi-web/src/composables/useKimiWebClient.ts index 6a709da8a..8b450938b 100644 --- a/apps/kimi-web/src/composables/useKimiWebClient.ts +++ b/apps/kimi-web/src/composables/useKimiWebClient.ts @@ -705,6 +705,13 @@ function applyEvent(event: ReturnType, sessionId: string, seq ) { clearStreaming(sessionId); } + // Session removed via the WS delete event: release the streaming entry so it + // does not leak as an orphan. `forgetSession()` already does this, but that + // path only covers not-found / archive — the WS delete event flows through the + // reducer instead, so it must clear the store here too. + if (event.type === 'sessionDeleted') { + clearStreaming(sessionId); + } if (event.type === 'configChanged') { rawState.defaultModel = event.config.defaultModel ?? null;