From 3f01178964fba7d06f716de3e8d8b146a8363752 Mon Sep 17 00:00:00 2001 From: neubig <398875+neubig@users.noreply.github.com> Date: Sun, 24 May 2026 07:54:28 -0400 Subject: [PATCH 1/7] fix(chat): render streaming assistant deltas --- .../should-render-event.test.ts | 41 ++++++ __tests__/utils/handle-event-for-ui.test.ts | 130 ++++++++++++++++++ .../should-render-event.ts | 5 + .../chat/event-message.tsx | 19 +++ src/types/agent-server/core/events/index.ts | 1 + .../core/events/streaming-delta-event.ts | 8 ++ .../agent-server/core/openhands-event.ts | 4 +- src/types/agent-server/type-guards.ts | 6 + src/utils/handle-event-for-ui.ts | 54 ++++++++ 9 files changed, 267 insertions(+), 1 deletion(-) create mode 100644 src/types/agent-server/core/events/streaming-delta-event.ts diff --git a/__tests__/components/conversation-events/chat/event-content-helpers/should-render-event.test.ts b/__tests__/components/conversation-events/chat/event-content-helpers/should-render-event.test.ts index 34f825360..63f1aa9c2 100644 --- a/__tests__/components/conversation-events/chat/event-content-helpers/should-render-event.test.ts +++ b/__tests__/components/conversation-events/chat/event-content-helpers/should-render-event.test.ts @@ -7,6 +7,7 @@ import { createUserMessageEvent, } from "test-utils"; import { ACPToolCallEvent } from "#/types/agent-server/core/events/acp-tool-call-event"; +import { StreamingDeltaEvent } from "#/types/agent-server/core/events/streaming-delta-event"; import { ActionEvent, ObservationEvent, @@ -96,6 +97,46 @@ describe("shouldRenderEvent - ACPToolCallEvent", () => { }); }); +describe("shouldRenderEvent - StreamingDeltaEvent", () => { + const makeStreamingDelta = ( + overrides: Partial = {}, + ): StreamingDeltaEvent => ({ + id: "delta-1", + kind: "StreamingDeltaEvent", + timestamp: "2024-01-01T00:00:00Z", + source: "agent", + content: "I'll start working on that.", + reasoning_content: null, + ...overrides, + }); + + it("renders text deltas", () => { + expect(shouldRenderEvent(makeStreamingDelta())).toBe(true); + }); + + it("renders reasoning-only deltas", () => { + expect( + shouldRenderEvent( + makeStreamingDelta({ + content: null, + reasoning_content: "thinking", + }), + ), + ).toBe(true); + }); + + it("hides empty deltas", () => { + expect( + shouldRenderEvent( + makeStreamingDelta({ + content: null, + reasoning_content: null, + }), + ), + ).toBe(false); + }); +}); + describe("shouldRenderEvent - SwitchLLM", () => { const switchAction: ActionEvent = { id: "switch-action", diff --git a/__tests__/utils/handle-event-for-ui.test.ts b/__tests__/utils/handle-event-for-ui.test.ts index 60e74dd53..850e4c399 100644 --- a/__tests__/utils/handle-event-for-ui.test.ts +++ b/__tests__/utils/handle-event-for-ui.test.ts @@ -7,6 +7,7 @@ import { OpenHandsEvent, } from "#/types/agent-server/core"; import { ACPToolCallEvent } from "#/types/agent-server/core/events/acp-tool-call-event"; +import { StreamingDeltaEvent } from "#/types/agent-server/core/events/streaming-delta-event"; import { handleEventForUI } from "#/utils/handle-event-for-ui"; describe("handleEventForUI", () => { @@ -76,6 +77,56 @@ describe("handleEventForUI", () => { extended_content: [], }; + const mockFinishActionEvent: ActionEvent = { + id: "test-finish-action-1", + timestamp: Date.now().toString(), + source: "agent", + thought: [], + thinking_blocks: [], + action: { + kind: "FinishAction", + message: "I'll start working on that. Done.", + }, + tool_name: "finish", + tool_call_id: "call_finish_1", + tool_call: { + id: "call_finish_1", + type: "function", + function: { + name: "finish", + arguments: JSON.stringify({ + message: "I'll start working on that. Done.", + }), + }, + }, + llm_response_id: "response_finish", + security_risk: SecurityRisk.UNKNOWN, + }; + + const mockAgentMessageEvent: MessageEvent = { + id: "test-agent-message-1", + timestamp: Date.now().toString(), + source: "agent", + llm_message: { + role: "assistant", + content: [{ type: "text", text: "I'll start working on that. Done." }], + }, + activated_microagents: [], + extended_content: [], + }; + + const makeStreamingDelta = ( + id: string, + content: string | null, + ): StreamingDeltaEvent => ({ + id, + kind: "StreamingDeltaEvent", + timestamp: Date.now().toString(), + source: "agent", + content, + reasoning_content: null, + }); + it("should add non-observation events to the end of uiEvents", () => { const initialUiEvents = [mockMessageEvent]; const result = handleEventForUI(mockActionEvent, initialUiEvents); @@ -241,6 +292,85 @@ describe("handleEventForUI", () => { }); }); + describe("StreamingDeltaEvent", () => { + it("merges consecutive deltas into a single provisional assistant event", () => { + const first = makeStreamingDelta("delta-1", "I'll start "); + const second = makeStreamingDelta("delta-2", "working on that."); + + const afterFirst = handleEventForUI(first, [mockMessageEvent]); + const afterSecond = handleEventForUI(second, afterFirst); + + expect(afterSecond).toEqual([ + mockMessageEvent, + { + ...second, + content: "I'll start working on that.", + reasoning_content: null, + }, + ]); + }); + + it("removes provisional deltas from the current turn when finish arrives", () => { + const first = makeStreamingDelta("delta-1", "I'll start "); + const second = makeStreamingDelta("delta-2", "working on that."); + const uiEvents = [ + mockMessageEvent, + handleEventForUI(second, handleEventForUI(first, [])).at(-1)!, + ]; + + const result = handleEventForUI(mockFinishActionEvent, uiEvents); + + expect(result).toEqual([mockMessageEvent, mockFinishActionEvent]); + }); + + it("removes provisional deltas from the current turn when an agent message arrives", () => { + const first = makeStreamingDelta("delta-1", "I'll start "); + const second = makeStreamingDelta("delta-2", "working on that."); + const uiEvents = [ + mockMessageEvent, + handleEventForUI(second, handleEventForUI(first, [])).at(-1)!, + ]; + + const result = handleEventForUI(mockAgentMessageEvent, uiEvents); + + expect(result).toEqual([mockMessageEvent, mockAgentMessageEvent]); + }); + + it("keeps deltas from older turns when a later turn finishes", () => { + const oldUserMessage: MessageEvent = { + ...mockMessageEvent, + id: "old-user-message", + }; + const nextUserMessage: MessageEvent = { + ...mockMessageEvent, + id: "next-user-message", + llm_message: { + role: "user", + content: [{ type: "text", text: "Next task" }], + }, + }; + const oldDelta = makeStreamingDelta("old-delta", "Old live text"); + const currentDelta = makeStreamingDelta( + "current-delta", + "Current live text", + ); + + const result = handleEventForUI(mockFinishActionEvent, [ + oldUserMessage, + oldDelta, + nextUserMessage, + currentDelta, + ]); + + expect(result).toEqual([ + oldUserMessage, + oldDelta, + nextUserMessage, + mockFinishActionEvent, + ]); + }); + }); + it("should NOT add ThinkObservation even when ThinkAction is not found", () => { const mockThinkObservation: ObservationEvent = { id: "test-think-observation-1", diff --git a/src/components/conversation-events/chat/event-content-helpers/should-render-event.ts b/src/components/conversation-events/chat/event-content-helpers/should-render-event.ts index 18efbbb4e..46fe34a80 100644 --- a/src/components/conversation-events/chat/event-content-helpers/should-render-event.ts +++ b/src/components/conversation-events/chat/event-content-helpers/should-render-event.ts @@ -7,6 +7,7 @@ import { isConversationStateUpdateEvent, isHookExecutionEvent, isACPToolCallEvent, + isStreamingDeltaEvent, } from "#/types/agent-server/type-guards"; export const shouldRenderEvent = (event: OpenHandsEvent) => { @@ -89,6 +90,10 @@ export const shouldRenderEvent = (event: OpenHandsEvent) => { return event.status === "completed" || event.status === "failed"; } + if (isStreamingDeltaEvent(event)) { + return event.content !== null || event.reasoning_content !== null; + } + // Don't render any other event types (system events, etc.) return false; }; diff --git a/src/components/conversation-events/chat/event-message.tsx b/src/components/conversation-events/chat/event-message.tsx index d9051437c..f32185096 100644 --- a/src/components/conversation-events/chat/event-message.tsx +++ b/src/components/conversation-events/chat/event-message.tsx @@ -16,11 +16,13 @@ import { isPlanningFileEditorObservationEvent, isHookExecutionEvent, isACPToolCallEvent, + isStreamingDeltaEvent, } from "#/types/agent-server/type-guards"; import { useConfig } from "#/hooks/query/use-config"; import { useConversationStore } from "#/stores/conversation-store"; import { useAgentState } from "#/hooks/use-agent-state"; import { AgentState } from "#/types/agent-state"; +import { ChatMessage } from "#/components/features/chat/chat-message"; import { PlanPreview } from "../../features/chat/plan-preview"; import { ErrorEventMessage } from "./event-message-components/error-event-message"; import { UserAssistantEventMessage } from "./event-message-components/user-assistant-event-message"; @@ -176,6 +178,23 @@ export function EventMessage({ ); } + if (isStreamingDeltaEvent(event)) { + const content = event.content ?? ""; + const reasoningContent = event.reasoning_content ?? ""; + return ( + <> + {reasoningContent && } + {content && ( + + )} + + ); + } + // Finish actions if (isActionEvent(event) && event.action.kind === "FinishAction") { return ( diff --git a/src/types/agent-server/core/events/index.ts b/src/types/agent-server/core/events/index.ts index 08740d113..f74a1fcb2 100644 --- a/src/types/agent-server/core/events/index.ts +++ b/src/types/agent-server/core/events/index.ts @@ -7,4 +7,5 @@ export * from "./hook-execution-event"; export * from "./message-event"; export * from "./observation-event"; export * from "./pause-event"; +export * from "./streaming-delta-event"; export * from "./system-event"; diff --git a/src/types/agent-server/core/events/streaming-delta-event.ts b/src/types/agent-server/core/events/streaming-delta-event.ts new file mode 100644 index 000000000..f5c4221da --- /dev/null +++ b/src/types/agent-server/core/events/streaming-delta-event.ts @@ -0,0 +1,8 @@ +import { BaseEvent } from "../base/event"; + +export interface StreamingDeltaEvent extends BaseEvent { + kind: "StreamingDeltaEvent"; + source: "agent"; + content: string | null; + reasoning_content: string | null; +} diff --git a/src/types/agent-server/core/openhands-event.ts b/src/types/agent-server/core/openhands-event.ts index 4eaa25050..dcf2ddc36 100644 --- a/src/types/agent-server/core/openhands-event.ts +++ b/src/types/agent-server/core/openhands-event.ts @@ -15,6 +15,7 @@ import { HookExecutionEvent, PauseEvent, ServerErrorEvent, + StreamingDeltaEvent, } from "./events/index"; /** @@ -41,4 +42,5 @@ export type OpenHandsEvent = | ConversationErrorEvent // Control events | PauseEvent - | ServerErrorEvent; + | ServerErrorEvent + | StreamingDeltaEvent; diff --git a/src/types/agent-server/type-guards.ts b/src/types/agent-server/type-guards.ts index c65143657..5c5d91aba 100644 --- a/src/types/agent-server/type-guards.ts +++ b/src/types/agent-server/type-guards.ts @@ -25,6 +25,7 @@ import { } from "./core/events/conversation-state-event"; import { HookExecutionEvent } from "./core/events/hook-execution-event"; import { ACPToolCallEvent } from "./core/events/acp-tool-call-event"; +import { StreamingDeltaEvent } from "./core/events/streaming-delta-event"; import { SystemPromptEvent } from "./core/events/system-event"; /** @@ -249,6 +250,11 @@ export const isACPToolCallEvent = ( ): event is ACPToolCallEvent => "kind" in event && event.kind === "ACPToolCallEvent"; +export const isStreamingDeltaEvent = ( + event: OpenHandsEvent, +): event is StreamingDeltaEvent => + "kind" in event && event.kind === "StreamingDeltaEvent"; + // ============================================================================= // COMPATIBILITY TYPE GUARDS // ============================================================================= diff --git a/src/utils/handle-event-for-ui.ts b/src/utils/handle-event-for-ui.ts index 8298c7dff..09b95b6a8 100644 --- a/src/utils/handle-event-for-ui.ts +++ b/src/utils/handle-event-for-ui.ts @@ -1,8 +1,33 @@ import { OpenHandsEvent } from "#/types/agent-server/core"; import { isACPToolCallEvent, + isActionEvent, + isMessageEvent, isObservationEvent, + isStreamingDeltaEvent, } from "#/types/agent-server/type-guards"; +import { StreamingDeltaEvent } from "#/types/agent-server/core/events/streaming-delta-event"; + +const mergeStreamingDeltaEvent = ( + incoming: StreamingDeltaEvent, + existing: StreamingDeltaEvent, +): StreamingDeltaEvent => ({ + ...incoming, + content: `${existing.content ?? ""}${incoming.content ?? ""}` || null, + reasoning_content: + `${existing.reasoning_content ?? ""}${incoming.reasoning_content ?? ""}` || + null, +}); + +const findLastUserMessageIndex = (events: OpenHandsEvent[]): number => { + for (let index = events.length - 1; index >= 0; index -= 1) { + const event = events[index]; + if (isMessageEvent(event) && event.source === "user") { + return index; + } + } + return -1; +}; /** * Handles adding an event to the UI events array @@ -19,6 +44,35 @@ export const handleEventForUI = ( ): OpenHandsEvent[] => { const newUiEvents = [...uiEvents]; + if (isStreamingDeltaEvent(event)) { + if (event.content === null && event.reasoning_content === null) { + return newUiEvents; + } + + const lastIndex = newUiEvents.length - 1; + const lastEvent = newUiEvents[lastIndex]; + if (lastEvent && isStreamingDeltaEvent(lastEvent)) { + newUiEvents[lastIndex] = mergeStreamingDeltaEvent(event, lastEvent); + return newUiEvents; + } + + newUiEvents.push(event); + return newUiEvents; + } + + if ( + (isActionEvent(event) && event.action.kind === "FinishAction") || + (isMessageEvent(event) && event.source === "agent") + ) { + const lastUserMessageIndex = findLastUserMessageIndex(newUiEvents); + const finalizedUiEvents = newUiEvents.filter( + (uiEvent, index) => + index <= lastUserMessageIndex || !isStreamingDeltaEvent(uiEvent), + ); + finalizedUiEvents.push(event); + return finalizedUiEvents; + } + if (isACPToolCallEvent(event)) { const existingIndex = newUiEvents.findIndex( (uiEvent) => From bdaf1a5c2070ed8e9fda0039a68c9473e5c8c2c2 Mon Sep 17 00:00:00 2001 From: neubig <398875+neubig@users.noreply.github.com> Date: Sun, 24 May 2026 08:46:33 -0400 Subject: [PATCH 2/7] Keep ACP streamed messages in place --- __tests__/utils/handle-event-for-ui.test.ts | 92 ++++++++++++++-- src/utils/handle-event-for-ui.ts | 116 ++++++++++++++++++-- 2 files changed, 189 insertions(+), 19 deletions(-) diff --git a/__tests__/utils/handle-event-for-ui.test.ts b/__tests__/utils/handle-event-for-ui.test.ts index 850e4c399..bccfe72d2 100644 --- a/__tests__/utils/handle-event-for-ui.test.ts +++ b/__tests__/utils/handle-event-for-ui.test.ts @@ -310,30 +310,97 @@ describe("handleEventForUI", () => { ]); }); - it("removes provisional deltas from the current turn when finish arrives", () => { + it("finalizes streamed deltas in place when finish arrives", () => { const first = makeStreamingDelta("delta-1", "I'll start "); const second = makeStreamingDelta("delta-2", "working on that."); - const uiEvents = [ - mockMessageEvent, - handleEventForUI(second, handleEventForUI(first, [])).at(-1)!, - ]; + const streamedDelta = handleEventForUI( + second, + handleEventForUI(first, []), + ).at(-1)!; + const uiEvents = [mockMessageEvent, streamedDelta]; const result = handleEventForUI(mockFinishActionEvent, uiEvents); - expect(result).toEqual([mockMessageEvent, mockFinishActionEvent]); + expect(result).toEqual([ + mockMessageEvent, + { + ...streamedDelta, + content: "I'll start working on that. Done.", + }, + ]); }); - it("removes provisional deltas from the current turn when an agent message arrives", () => { + it("finalizes streamed deltas in place when an agent message arrives", () => { const first = makeStreamingDelta("delta-1", "I'll start "); const second = makeStreamingDelta("delta-2", "working on that."); - const uiEvents = [ - mockMessageEvent, - handleEventForUI(second, handleEventForUI(first, [])).at(-1)!, - ]; + const streamedDelta = handleEventForUI( + second, + handleEventForUI(first, []), + ).at(-1)!; + const uiEvents = [mockMessageEvent, streamedDelta]; const result = handleEventForUI(mockAgentMessageEvent, uiEvents); - expect(result).toEqual([mockMessageEvent, mockAgentMessageEvent]); + expect(result).toEqual([ + mockMessageEvent, + { + ...streamedDelta, + content: "I'll start working on that. Done.", + }, + ]); + }); + + it("keeps streamed deltas in their original locations when the final message aggregates them", () => { + const first = makeStreamingDelta( + "delta-1", + "I'll start working on that.", + ); + const second = makeStreamingDelta("delta-2", "I found the issue."); + const aggregateAgentMessage: MessageEvent = { + ...mockAgentMessageEvent, + llm_message: { + role: "assistant", + content: [ + { + type: "text", + text: "I'll start working on that.I found the issue.", + }, + ], + }, + }; + + const afterFirst = handleEventForUI(first, [mockMessageEvent]); + const afterObservation = handleEventForUI(mockObservationEvent, afterFirst); + const afterSecond = handleEventForUI(second, afterObservation); + const result = handleEventForUI(aggregateAgentMessage, afterSecond); + + expect(result).toEqual([ + mockMessageEvent, + first, + mockObservationEvent, + second, + ]); + }); + + it("appends a distinct final message that does not match streamed text", () => { + const streamedDelta = makeStreamingDelta( + "delta-1", + "I'll start working on that.", + ); + const finalMessage: MessageEvent = { + ...mockAgentMessageEvent, + llm_message: { + role: "assistant", + content: [{ type: "text", text: "Done." }], + }, + }; + + const result = handleEventForUI(finalMessage, [ + mockMessageEvent, + streamedDelta, + ]); + + expect(result).toEqual([mockMessageEvent, streamedDelta, finalMessage]); }); it("keeps deltas from older turns when a later turn finishes", () => { @@ -366,6 +433,7 @@ describe("handleEventForUI", () => { oldUserMessage, oldDelta, nextUserMessage, + currentDelta, mockFinishActionEvent, ]); }); diff --git a/src/utils/handle-event-for-ui.ts b/src/utils/handle-event-for-ui.ts index 09b95b6a8..e70159ca9 100644 --- a/src/utils/handle-event-for-ui.ts +++ b/src/utils/handle-event-for-ui.ts @@ -1,4 +1,4 @@ -import { OpenHandsEvent } from "#/types/agent-server/core"; +import { MessageEvent, OpenHandsEvent } from "#/types/agent-server/core"; import { isACPToolCallEvent, isActionEvent, @@ -19,6 +19,14 @@ const mergeStreamingDeltaEvent = ( null, }); +const appendContentToStreamingDeltaEvent = ( + existing: StreamingDeltaEvent, + content: string, +): StreamingDeltaEvent => ({ + ...existing, + content: `${existing.content ?? ""}${content}` || null, +}); + const findLastUserMessageIndex = (events: OpenHandsEvent[]): number => { for (let index = events.length - 1; index >= 0; index -= 1) { const event = events[index]; @@ -29,6 +37,100 @@ const findLastUserMessageIndex = (events: OpenHandsEvent[]): number => { return -1; }; +const getAgentMessageText = (event: MessageEvent): string => + event.llm_message.content + .filter((content) => content.type === "text") + .map((content) => content.text) + .join("\n"); + +const getFinalAgentText = (event: OpenHandsEvent): string | null => { + if (isActionEvent(event) && event.action.kind === "FinishAction") { + return event.action.message; + } + + if (isMessageEvent(event) && event.source === "agent") { + return getAgentMessageText(event); + } + + return null; +}; + +const findTextSegmentsInOrder = ( + text: string, + segments: string[], +): { matched: boolean; lastMatchEnd: number } => { + let searchStart = 0; + let lastMatchEnd = 0; + + for (const segment of segments) { + const index = text.indexOf(segment, searchStart); + if (index === -1) { + return { matched: false, lastMatchEnd }; + } + lastMatchEnd = index + segment.length; + searchStart = lastMatchEnd; + } + + return { matched: true, lastMatchEnd }; +}; + +const finalizeStreamingDeltasInPlace = ( + finalEvent: OpenHandsEvent, + uiEvents: OpenHandsEvent[], +): OpenHandsEvent[] | null => { + const lastUserMessageIndex = findLastUserMessageIndex(uiEvents); + const currentTurnStreamingDeltaIndexes = uiEvents + .map((uiEvent, index) => ({ uiEvent, index })) + .filter( + ({ uiEvent, index }) => + index > lastUserMessageIndex && isStreamingDeltaEvent(uiEvent), + ) + .map(({ index }) => index); + + if (currentTurnStreamingDeltaIndexes.length === 0) { + return null; + } + + const finalText = getFinalAgentText(finalEvent); + const streamingSegments = currentTurnStreamingDeltaIndexes + .map((index) => uiEvents[index]) + .filter(isStreamingDeltaEvent) + .map((uiEvent) => uiEvent.content ?? "") + .filter((content) => content.length > 0); + + if (!finalText || streamingSegments.length === 0) { + return null; + } + + const nextUiEvents = [...uiEvents]; + const streamedText = streamingSegments.join(""); + let unstreamedSuffix = ""; + + if (finalText.startsWith(streamedText)) { + unstreamedSuffix = finalText.slice(streamedText.length); + } else { + const match = findTextSegmentsInOrder(finalText, streamingSegments); + if (!match.matched) { + return null; + } + unstreamedSuffix = finalText.slice(match.lastMatchEnd); + } + + const lastDeltaIndex = + currentTurnStreamingDeltaIndexes[ + currentTurnStreamingDeltaIndexes.length - 1 + ]; + const lastDelta = nextUiEvents[lastDeltaIndex]; + if (unstreamedSuffix && isStreamingDeltaEvent(lastDelta)) { + nextUiEvents[lastDeltaIndex] = appendContentToStreamingDeltaEvent( + lastDelta, + unstreamedSuffix, + ); + } + + return nextUiEvents; +}; + /** * Handles adding an event to the UI events array * Replaces actions with observations when they arrive (so UI shows observation instead of action) @@ -64,13 +166,13 @@ export const handleEventForUI = ( (isActionEvent(event) && event.action.kind === "FinishAction") || (isMessageEvent(event) && event.source === "agent") ) { - const lastUserMessageIndex = findLastUserMessageIndex(newUiEvents); - const finalizedUiEvents = newUiEvents.filter( - (uiEvent, index) => - index <= lastUserMessageIndex || !isStreamingDeltaEvent(uiEvent), + const finalizedUiEvents = finalizeStreamingDeltasInPlace( + event, + newUiEvents, ); - finalizedUiEvents.push(event); - return finalizedUiEvents; + if (finalizedUiEvents) { + return finalizedUiEvents; + } } if (isACPToolCallEvent(event)) { From 03623d264a8815985cbc2d160e5843463a0278fd Mon Sep 17 00:00:00 2001 From: neubig <398875+neubig@users.noreply.github.com> Date: Sun, 24 May 2026 09:22:23 -0400 Subject: [PATCH 3/7] Anchor optimistic user messages in chat --- .../optimistic-user-message-store.test.ts | 65 +++++++++++++ .../optimistic-user-message-events.test.ts | 96 +++++++++++++++++++ .../user-assistant-event-message.tsx | 44 +++++++++ .../conversation-events/chat/messages.tsx | 12 +++ .../features/chat/chat-interface.tsx | 51 ++++++---- .../features/chat/pending-user-messages.tsx | 4 +- .../conversation-websocket-context.tsx | 17 ++++ src/stores/optimistic-user-message-store.ts | 53 ++++++++-- src/utils/optimistic-user-message-events.ts | 68 +++++++++++++ 9 files changed, 383 insertions(+), 27 deletions(-) create mode 100644 __tests__/utils/optimistic-user-message-events.test.ts create mode 100644 src/utils/optimistic-user-message-events.ts diff --git a/__tests__/stores/optimistic-user-message-store.test.ts b/__tests__/stores/optimistic-user-message-store.test.ts index fe23f9330..bea772fc0 100644 --- a/__tests__/stores/optimistic-user-message-store.test.ts +++ b/__tests__/stores/optimistic-user-message-store.test.ts @@ -74,6 +74,40 @@ describe("optimistic-user-message-store", () => { expect(entry.errorMessage).toBeUndefined(); }); + it("marks a sending message as sent", () => { + const store = useOptimisticUserMessageStore.getState(); + const id = store.enqueuePendingMessage({ + conversationId: CONVO, + text: "accepted", + }); + + store.markPendingMessageSent(id); + + const [entry] = useOptimisticUserMessageStore.getState().pendingMessages; + expect(entry.status).toBe("sent"); + expect(entry.errorMessage).toBeUndefined(); + }); + + it("marks the oldest sending message in a conversation as sent", () => { + const store = useOptimisticUserMessageStore.getState(); + const firstId = store.enqueuePendingMessage({ + conversationId: CONVO, + text: "first", + }); + store.enqueuePendingMessage({ conversationId: CONVO, text: "second" }); + store.enqueuePendingMessage({ conversationId: "conv-b", text: "other" }); + + const marked = store.markOldestSendingMessageSent(CONVO); + + expect(marked?.id).toBe(firstId); + const pending = useOptimisticUserMessageStore.getState().pendingMessages; + expect(pending.map((message) => message.status)).toEqual([ + "sent", + "sending", + "sending", + ]); + }); + it("enqueue stores `content` separately from `text` and defaults it to `text`", () => { const store = useOptimisticUserMessageStore.getState(); const idA = store.enqueuePendingMessage({ @@ -117,6 +151,22 @@ describe("optimistic-user-message-store", () => { expect(remaining[0].id).toBe(firstId); }); + it("consumeMatchingPendingMessage removes a sent optimistic message when the echo arrives", () => { + const store = useOptimisticUserMessageStore.getState(); + const id = store.enqueuePendingMessage({ + conversationId: CONVO, + text: "accepted", + }); + store.markPendingMessageSent(id); + + const consumed = store.consumeMatchingPendingMessage(CONVO, "accepted"); + + expect(consumed?.id).toBe(id); + expect( + useOptimisticUserMessageStore.getState().pendingMessages, + ).toHaveLength(0); + }); + it("consumeMatchingPendingMessage falls back to oldest sending entry when no exact match exists", () => { const store = useOptimisticUserMessageStore.getState(); const firstId = store.enqueuePendingMessage({ @@ -219,6 +269,21 @@ describe("optimistic-user-message-store", () => { expect(entry.errorMessage).toBe("Send timed out"); }); + it("watchdog timeout does nothing if the message was already marked sent", () => { + const store = useOptimisticUserMessageStore.getState(); + const id = store.enqueuePendingMessage({ + conversationId: CONVO, + text: "accepted", + }); + store.markPendingMessageSent(id); + + vi.advanceTimersByTime(PENDING_MESSAGE_TIMEOUT_MS); + + const [entry] = useOptimisticUserMessageStore.getState().pendingMessages; + expect(entry.id).toBe(id); + expect(entry.status).toBe("sent"); + }); + it("watchdog timeout does nothing if the echo already consumed the message", () => { const store = useOptimisticUserMessageStore.getState(); store.enqueuePendingMessage({ conversationId: CONVO, text: "fast" }); diff --git a/__tests__/utils/optimistic-user-message-events.test.ts b/__tests__/utils/optimistic-user-message-events.test.ts new file mode 100644 index 000000000..81c8d433a --- /dev/null +++ b/__tests__/utils/optimistic-user-message-events.test.ts @@ -0,0 +1,96 @@ +import { describe, expect, it } from "vitest"; +import { + MessageEvent, + OpenHandsEvent, + SecurityRisk, +} from "#/types/agent-server/core"; +import { PendingUserMessage } from "#/stores/optimistic-user-message-store"; +import { + isOptimisticUserMessageEvent, + mergeOptimisticUserMessages, +} from "#/utils/optimistic-user-message-events"; + +describe("optimistic-user-message-events", () => { + const userEvent: MessageEvent = { + id: "server-user", + timestamp: "2026-05-24T09:11:33.000Z", + source: "user", + llm_message: { + role: "user", + content: [{ type: "text", text: "server message" }], + }, + activated_microagents: [], + extended_content: [], + }; + + const agentEvent: OpenHandsEvent = { + id: "agent-event", + timestamp: "2026-05-24T09:11:35.000Z", + source: "agent", + thought: [], + thinking_blocks: [], + action: { + kind: "FinishAction", + message: "done", + }, + tool_name: "finish", + tool_call_id: "finish-call", + tool_call: { + id: "finish-call", + type: "function", + function: { + name: "finish", + arguments: JSON.stringify({ message: "done" }), + }, + }, + llm_response_id: "response", + security_risk: SecurityRisk.UNKNOWN, + }; + + const pendingMessage: PendingUserMessage = { + id: "pending-1", + conversationId: "conv-1", + text: "optimistic message", + content: "optimistic message", + status: "sending", + imageUrls: [], + fileUrls: [], + timestamp: "2026-05-24T09:11:34.000Z", + }; + + it("inserts pending messages into timestamp order", () => { + const merged = mergeOptimisticUserMessages( + [userEvent, agentEvent], + [pendingMessage], + ); + + expect(merged.map((event) => event.id)).toEqual([ + "server-user", + "pending-1", + "agent-event", + ]); + expect(isOptimisticUserMessageEvent(merged[1])).toBe(true); + }); + + it("carries pending status into the optimistic event", () => { + const [event] = mergeOptimisticUserMessages([], [ + { ...pendingMessage, status: "sent" }, + ]); + + expect(isOptimisticUserMessageEvent(event)).toBe(true); + if (isOptimisticUserMessageEvent(event)) { + expect(event.optimisticPendingStatus).toBe("sent"); + expect(event.llm_message.content).toEqual([ + { type: "text", text: "optimistic message" }, + ]); + } + }); + + it("does not duplicate an event that already exists", () => { + const merged = mergeOptimisticUserMessages([userEvent], [ + { ...pendingMessage, id: userEvent.id }, + ]); + + expect(merged).toEqual([userEvent]); + }); +}); diff --git a/src/components/conversation-events/chat/event-message-components/user-assistant-event-message.tsx b/src/components/conversation-events/chat/event-message-components/user-assistant-event-message.tsx index 5727ddc51..6224e3b0c 100644 --- a/src/components/conversation-events/chat/event-message-components/user-assistant-event-message.tsx +++ b/src/components/conversation-events/chat/event-message-components/user-assistant-event-message.tsx @@ -4,6 +4,10 @@ import { ChatMessage } from "../../../features/chat/chat-message"; import { ImageCarousel } from "../../../features/images/image-carousel"; import { ConversationConfirmationButtons } from "#/components/shared/buttons/conversation-confirmation-buttons"; import { parseMessageFromEvent } from "../event-content-helpers/parse-message-from-event"; +import { isOptimisticUserMessageEvent } from "#/utils/optimistic-user-message-events"; +import { useOptimisticUserMessageStore } from "#/stores/optimistic-user-message-store"; +import { useSendMessage } from "#/hooks/use-send-message"; +import { createChatMessage } from "#/services/chat-service"; interface UserAssistantEventMessageProps { event: MessageEvent; @@ -16,7 +20,17 @@ export function UserAssistantEventMessage({ isLastMessage, isFromPlanningAgent, }: UserAssistantEventMessageProps) { + const markPendingMessageError = useOptimisticUserMessageStore( + (state) => state.markPendingMessageError, + ); + const markPendingMessageSending = useOptimisticUserMessageStore( + (state) => state.markPendingMessageSending, + ); + const { send } = useSendMessage(); const message = parseMessageFromEvent(event); + const pendingStatus = isOptimisticUserMessageEvent(event) + ? event.optimisticPendingStatus + : undefined; const imageUrls: string[] = []; if (Array.isArray(event.llm_message.content)) { @@ -27,11 +41,41 @@ export function UserAssistantEventMessage({ }); } + const handleRetry = React.useCallback(async () => { + if (!isOptimisticUserMessageEvent(event)) return; + + const pendingMessage = useOptimisticUserMessageStore + .getState() + .pendingMessages.find( + (entry) => entry.id === event.optimisticPendingMessageId, + ); + if (!pendingMessage) return; + + markPendingMessageSending(pendingMessage.id); + + try { + await send( + createChatMessage( + pendingMessage.content, + pendingMessage.imageUrls, + pendingMessage.fileUrls, + pendingMessage.timestamp, + ), + ); + } catch (error) { + const errorMessage = + error instanceof Error ? error.message : "Failed to send message"; + markPendingMessageError(pendingMessage.id, errorMessage); + } + }, [event, markPendingMessageError, markPendingMessageSending, send]); + return ( {imageUrls.length > 0 && ( diff --git a/src/components/conversation-events/chat/messages.tsx b/src/components/conversation-events/chat/messages.tsx index b18e00e31..28d57b134 100644 --- a/src/components/conversation-events/chat/messages.tsx +++ b/src/components/conversation-events/chat/messages.tsx @@ -8,6 +8,7 @@ import { ThoughtEventMessage } from "./event-message-components/thought-event-me import { useModelStore } from "#/stores/model-store"; import { ModelMessages } from "#/components/features/chat/model-messages"; import { useOptionalConversationId } from "#/hooks/use-conversation-id"; +import { isOptimisticUserMessageEvent } from "#/utils/optimistic-user-message-events"; // TODO: Implement microagent functionality for V1 when APIs support V1 event IDs // import { AgentState } from "#/types/agent-state"; // import MemoryIcon from "#/icons/memory_icon.svg?react"; @@ -19,6 +20,15 @@ interface MessagesProps { const getLastEventId = (events: OpenHandsEvent[]) => events.at(-1)?.id; +const getEventListSignature = (events: OpenHandsEvent[]) => + events + .map((event) => + isOptimisticUserMessageEvent(event) + ? `${event.id}:${event.optimisticPendingStatus}` + : String(event.id), + ) + .join("|"); + export const Messages: React.FC = React.memo( ({ messages, allEvents }) => { const { conversationId } = useOptionalConversationId(); @@ -132,6 +142,8 @@ export const Messages: React.FC = React.memo( (prevProps, nextProps) => prevProps.messages.length === nextProps.messages.length && prevProps.allEvents.length === nextProps.allEvents.length && + getEventListSignature(prevProps.messages) === + getEventListSignature(nextProps.messages) && getLastEventId(prevProps.messages) === getLastEventId(nextProps.messages) && getLastEventId(prevProps.allEvents) === getLastEventId(nextProps.allEvents), ); diff --git a/src/components/features/chat/chat-interface.tsx b/src/components/features/chat/chat-interface.tsx index 764c21773..b40575aca 100644 --- a/src/components/features/chat/chat-interface.tsx +++ b/src/components/features/chat/chat-interface.tsx @@ -28,13 +28,13 @@ import { useOptimisticUserMessageStore } from "#/stores/optimistic-user-message- import { SERVER_CONNECTION_ERROR_MESSAGE } from "#/constants/server-connection-error"; import { ErrorMessageBanner } from "./error-message-banner"; import { Messages } from "#/components/conversation-events/chat/messages"; -import { PendingUserMessages } from "./pending-user-messages"; import { useUnifiedUploadFiles } from "#/hooks/mutation/use-unified-upload-files"; import { validateFiles } from "#/utils/file-validation"; import { useConversationStore } from "#/stores/conversation-store"; import ConfirmationModeEnabled from "./confirmation-mode-enabled"; import { useTaskPolling } from "#/hooks/query/use-task-polling"; import { matchesPendingConversationId } from "#/utils/pending-task-message-link"; +import { mergeOptimisticUserMessages } from "#/utils/optimistic-user-message-events"; import { useConversationWebSocket } from "#/contexts/conversation-websocket-context"; import ChatStatusIndicator from "./chat-status-indicator"; import { getStatusColor, getStatusText } from "#/utils/utils"; @@ -225,6 +225,24 @@ export function ChatInterface() { [pendingMessages, conversationId], ); + const visiblePendingMessages = React.useMemo( + () => + conversationId + ? pendingMessages.filter((message) => + matchesPendingConversationId( + conversationId, + message.conversationId, + ), + ) + : [], + [pendingMessages, conversationId], + ); + + const renderableEventsWithPending = React.useMemo( + () => mergeOptimisticUserMessages(renderableEvents, visiblePendingMessages), + [renderableEvents, visiblePendingMessages], + ); + // Show V1 messages immediately if events exist in store (e.g., remount), // if the user already has a locally-tracked pending bubble (home-page cloud // submit while history/WS catch up), or once loading completes. This @@ -375,7 +393,11 @@ export function ChatInterface() { } // Note: We intentionally exclude autoScroll from deps because we only want // to scroll when message content changes, not when autoScroll state changes. - }, [renderableEvents.length, hasPendingUserMessages, scrollDomToBottom]); + }, [ + renderableEventsWithPending.length, + hasPendingUserMessages, + scrollDomToBottom, + ]); // Auto-load older events when the chat content doesn't overflow the // scroll area (no scrollbar to drag, no wheel events past 0). We @@ -395,7 +417,7 @@ export function ChatInterface() { const target = scrollRef.current; if (!target) return; maybeLoadOlderRef.current(target); - }, [renderableEvents.length, hasMoreOlderEvents]); + }, [renderableEventsWithPending.length, hasMoreOlderEvents]); // Create a ScrollProvider with the scroll hook values const scrollProviderValue = { @@ -497,22 +519,13 @@ export function ChatInterface() { anchored to `null` and live above the message list. */} - {showConversationMessages && renderableEvents.length > 0 && ( - - )} - - {/* - Render the local pending-message queue independently so messages - the user just submitted show up immediately (with a faded "sending" - treatment) even before any real conversation event has come back - from the server. Entries drain (FIFO) when the matching - UserMessageEvent echoes back over the WebSocket, so this never - double-renders alongside the real event list. - */} - + {showConversationMessages && + renderableEventsWithPending.length > 0 && ( + + )}
diff --git a/src/components/features/chat/pending-user-messages.tsx b/src/components/features/chat/pending-user-messages.tsx index 9efeb51b5..cf0d6b90c 100644 --- a/src/components/features/chat/pending-user-messages.tsx +++ b/src/components/features/chat/pending-user-messages.tsx @@ -57,7 +57,7 @@ export function PendingUserMessages() { try { await send( createChatMessage( - message.text, + message.content, message.imageUrls, message.fileUrls, message.timestamp, @@ -83,7 +83,7 @@ export function PendingUserMessages() { key={message.id} type="user" message={message.text} - pendingStatus={message.status} + pendingStatus={message.status === "sent" ? undefined : message.status} onRetry={ message.status === "error" ? () => handleRetry(message.id) diff --git a/src/contexts/conversation-websocket-context.tsx b/src/contexts/conversation-websocket-context.tsx index 8e7f25be2..e5ea96135 100644 --- a/src/contexts/conversation-websocket-context.tsx +++ b/src/contexts/conversation-websocket-context.tsx @@ -137,6 +137,9 @@ export function ConversationWebSocketProvider({ const consumeMatchingPendingMessage = useOptimisticUserMessageStore( (state) => state.consumeMatchingPendingMessage, ); + const markOldestSendingMessageSent = useOptimisticUserMessageStore( + (state) => state.markOldestSendingMessageSent, + ); const { setExecutionStatus } = useConversationStateStore(); const { appendInput, appendOutput } = useCommandStore(); @@ -455,6 +458,12 @@ export function ConversationWebSocketProvider({ // Clear draft from localStorage - message was successfully delivered setConversationState(conversationId, { draftMessage: null }); } + } else if ( + conversationId && + (event.source === "agent" || isConversationStateUpdateEvent(event)) + ) { + markOldestSendingMessageSent(conversationId); + setConversationState(conversationId, { draftMessage: null }); } // Handle cache invalidation for ActionEvent @@ -550,6 +559,7 @@ export function ConversationWebSocketProvider({ addEvent, setErrorMessage, consumeMatchingPendingMessage, + markOldestSendingMessageSent, queryClient, conversationId, setExecutionStatus, @@ -635,6 +645,12 @@ export function ConversationWebSocketProvider({ ); setConversationState(conversationId, { draftMessage: null }); } + } else if ( + conversationId && + (event.source === "agent" || isConversationStateUpdateEvent(event)) + ) { + markOldestSendingMessageSent(conversationId); + setConversationState(conversationId, { draftMessage: null }); } // Handle cache invalidation for ActionEvent @@ -724,6 +740,7 @@ export function ConversationWebSocketProvider({ expectedEventCountPlanning, setErrorMessage, consumeMatchingPendingMessage, + markOldestSendingMessageSent, queryClient, subConversations, conversationId, diff --git a/src/stores/optimistic-user-message-store.ts b/src/stores/optimistic-user-message-store.ts index 69490d617..1d87d67e2 100644 --- a/src/stores/optimistic-user-message-store.ts +++ b/src/stores/optimistic-user-message-store.ts @@ -1,6 +1,6 @@ import { create } from "zustand"; -export type PendingUserMessageStatus = "sending" | "error"; +export type PendingUserMessageStatus = "sending" | "sent" | "error"; /** * How long a pending message is allowed to stay in "sending" state before we @@ -64,6 +64,8 @@ interface OptimisticUserMessageActions { enqueuePendingMessage: (payload: EnqueuePendingMessagePayload) => string; /** Mark a pending message as failed (the API rejected it). */ markPendingMessageError: (id: string, errorMessage?: string) => void; + /** Mark a pending message as accepted by the server. */ + markPendingMessageSent: (id: string) => void; /** Mark a pending message as sending again (used when retrying). */ markPendingMessageSending: (id: string) => void; /** Drop a pending message from the queue (e.g., after success/cancellation). */ @@ -82,6 +84,14 @@ interface OptimisticUserMessageActions { conversationId: string, content: string, ) => PendingUserMessage | null; + /** + * Mark the oldest still-sending message in the conversation as accepted by + * the server. Used when the server streams agent work but does not echo the + * user message event on the live WebSocket path. + */ + markOldestSendingMessageSent: ( + conversationId: string, + ) => PendingUserMessage | null; /** Wipe all queued messages (e.g., when changing conversations). */ clearPendingMessages: () => void; /** @@ -150,6 +160,15 @@ export const useOptimisticUserMessageStore = create( ), })), + markPendingMessageSent: (id) => + set((state) => ({ + pendingMessages: state.pendingMessages.map((message) => + message.id === id + ? { ...message, status: "sent", errorMessage: undefined } + : message, + ), + })), + markPendingMessageSending: (id) => set((state) => ({ pendingMessages: state.pendingMessages.map((message) => @@ -177,15 +196,15 @@ export const useOptimisticUserMessageStore = create( // case. let consumed: PendingUserMessage | null = null; set((state) => { - const sending = state.pendingMessages + const confirmable = state.pendingMessages .map((m, i) => ({ m, i })) .filter( ({ m }) => - m.status === "sending" && m.conversationId === conversationId, + m.status !== "error" && m.conversationId === conversationId, ); - if (sending.length === 0) return state; - const exact = sending.find(({ m }) => m.content === content); - const target = exact ?? sending[0]; + if (confirmable.length === 0) return state; + const exact = confirmable.find(({ m }) => m.content === content); + const target = exact ?? confirmable[0]; consumed = target.m; return { pendingMessages: [ @@ -197,6 +216,28 @@ export const useOptimisticUserMessageStore = create( return consumed; }, + markOldestSendingMessageSent: (conversationId) => { + let marked: PendingUserMessage | null = null; + set((state) => { + const target = state.pendingMessages.find( + (message) => + message.status === "sending" && + message.conversationId === conversationId, + ); + if (!target) return state; + + marked = target; + return { + pendingMessages: state.pendingMessages.map((message) => + message.id === target.id + ? { ...message, status: "sent", errorMessage: undefined } + : message, + ), + }; + }); + return marked; + }, + clearPendingMessages: () => set(() => ({ ...initialState })), reassignPendingMessages: (fromConversationId, toConversationId) => diff --git a/src/utils/optimistic-user-message-events.ts b/src/utils/optimistic-user-message-events.ts new file mode 100644 index 000000000..fae289f0c --- /dev/null +++ b/src/utils/optimistic-user-message-events.ts @@ -0,0 +1,68 @@ +import { MessageEvent, OpenHandsEvent } from "#/types/agent-server/core"; +import { + PendingUserMessage, + PendingUserMessageStatus, +} from "#/stores/optimistic-user-message-store"; + +export type OptimisticUserMessageEvent = MessageEvent & { + optimisticPendingMessageId: string; + optimisticPendingStatus: PendingUserMessageStatus; + optimisticPendingErrorMessage?: string; +}; + +export const isOptimisticUserMessageEvent = ( + event: OpenHandsEvent, +): event is OptimisticUserMessageEvent => + "optimisticPendingMessageId" in event && + typeof event.optimisticPendingMessageId === "string"; + +const compareTimestamps = (a: string | undefined, b: string | undefined) => { + if (!a && !b) return 0; + if (!a) return 1; + if (!b) return -1; + return a.localeCompare(b); +}; + +const toOptimisticUserMessageEvent = ( + message: PendingUserMessage, +): OptimisticUserMessageEvent => ({ + id: message.id, + timestamp: message.timestamp, + source: "user", + llm_message: { + role: "user", + content: [ + { type: "text", text: message.text }, + ...(message.imageUrls.length > 0 + ? [{ type: "image" as const, image_urls: message.imageUrls }] + : []), + ], + }, + activated_microagents: [], + extended_content: [], + optimisticPendingMessageId: message.id, + optimisticPendingStatus: message.status, + optimisticPendingErrorMessage: message.errorMessage, +}); + +export const mergeOptimisticUserMessages = ( + events: OpenHandsEvent[], + pendingMessages: PendingUserMessage[], +): OpenHandsEvent[] => { + if (pendingMessages.length === 0) { + return events; + } + + const eventIds = new Set(events.map((event) => event.id)); + const optimisticEvents = pendingMessages + .filter((message) => !eventIds.has(message.id)) + .map(toOptimisticUserMessageEvent); + + if (optimisticEvents.length === 0) { + return events; + } + + return [...events, ...optimisticEvents].sort((a, b) => + compareTimestamps(a.timestamp, b.timestamp), + ); +}; From f375143cde19ea3464bfcc60bf264da0f4694342 Mon Sep 17 00:00:00 2001 From: neubig <398875+neubig@users.noreply.github.com> Date: Sun, 24 May 2026 09:36:36 -0400 Subject: [PATCH 4/7] Show queued state for optimistic user messages --- .../components/chat/chat-interface.test.tsx | 31 ++++------ .../chat/pending-user-messages.test.tsx | 42 ++++++------- .../optimistic-user-message-store.test.ts | 50 ++++++---------- .../optimistic-user-message-events.test.ts | 16 ++--- .../user-assistant-event-message.tsx | 14 ++++- .../features/chat/chat-interface.tsx | 16 +++-- src/components/features/chat/chat-message.tsx | 16 ++++- .../features/chat/git-control-bar.tsx | 21 +++++-- .../features/chat/pending-user-messages.tsx | 22 +++++-- .../features/home/tasks/task-card.tsx | 1 + .../conversation-websocket-context.tsx | 17 ------ src/hooks/use-handle-build-plan-click.ts | 20 +++++-- src/i18n/translation.json | 17 ++++++ src/stores/optimistic-user-message-store.ts | 60 +++++-------------- .../enqueue-home-task-pending-message.ts | 1 + 15 files changed, 172 insertions(+), 172 deletions(-) diff --git a/__tests__/components/chat/chat-interface.test.tsx b/__tests__/components/chat/chat-interface.test.tsx index 593f15a3b..7f8ab01bc 100644 --- a/__tests__/components/chat/chat-interface.test.tsx +++ b/__tests__/components/chat/chat-interface.test.tsx @@ -1,12 +1,4 @@ -import { - afterEach, - beforeEach, - describe, - expect, - it, - test, - vi, -} from "vitest"; +import { afterEach, beforeEach, describe, expect, it, test, vi } from "vitest"; import { fireEvent, render, @@ -241,11 +233,7 @@ describe("ChatInterface - Chat Suggestions", () => { }, }); - renderWithQueryClient( - , - queryClient, - "/task-abc", - ); + renderWithQueryClient(, queryClient, "/task-abc"); expect(screen.queryByTestId("chat-suggestions")).not.toBeInTheDocument(); }); @@ -267,11 +255,7 @@ describe("ChatInterface - Chat Suggestions", () => { }, }); - renderWithQueryClient( - , - queryClient, - "/task-abc", - ); + renderWithQueryClient(, queryClient, "/task-abc"); expect(screen.queryByTestId("chat-suggestions")).not.toBeInTheDocument(); }); @@ -664,7 +648,14 @@ describe("ChatInterface - Pending message queue", () => { expect(screen.getByTestId("chat-message-sending")).toBeInTheDocument(); expect(screen.queryByTestId("chat-message-retry")).not.toBeInTheDocument(); - resolveSend?.({ queued: false }); + act(() => { + resolveSend?.({ queued: false }); + }); + + await waitFor(() => { + expect(pendingMessage).toHaveAttribute("data-pending-status", "queued"); + }); + expect(screen.getByTestId("chat-message-queued")).toBeInTheDocument(); }); it("flips the message to 'error' with a retry link when send rejects", async () => { diff --git a/__tests__/components/features/chat/pending-user-messages.test.tsx b/__tests__/components/features/chat/pending-user-messages.test.tsx index dd00935cc..913713655 100644 --- a/__tests__/components/features/chat/pending-user-messages.test.tsx +++ b/__tests__/components/features/chat/pending-user-messages.test.tsx @@ -33,7 +33,7 @@ describe("PendingUserMessages", () => { expect(container).toBeEmptyDOMElement(); }); - it("renders each queued message with the faded 'sending' treatment", () => { + it("renders each sending message with the faded treatment", () => { useOptimisticUserMessageStore.getState().enqueuePendingMessage({ conversationId: ACTIVE_CONVO, text: "first message", @@ -74,12 +74,10 @@ describe("PendingUserMessages", () => { }); it("shows an error state with a retry link when the message is in 'error'", () => { - const id = useOptimisticUserMessageStore - .getState() - .enqueuePendingMessage({ - conversationId: ACTIVE_CONVO, - text: "broken message", - }); + const id = useOptimisticUserMessageStore.getState().enqueuePendingMessage({ + conversationId: ACTIVE_CONVO, + text: "broken message", + }); useOptimisticUserMessageStore .getState() .markPendingMessageError(id, "Server unavailable"); @@ -92,14 +90,12 @@ describe("PendingUserMessages", () => { expect(screen.getByTestId("chat-message-retry")).toBeInTheDocument(); }); - it("re-sends and flips back to 'sending' when retry is clicked", async () => { + it("re-sends and flips to 'queued' when retry succeeds", async () => { mockSend.mockResolvedValueOnce({ queued: false }); - const id = useOptimisticUserMessageStore - .getState() - .enqueuePendingMessage({ - conversationId: ACTIVE_CONVO, - text: "retry me", - }); + const id = useOptimisticUserMessageStore.getState().enqueuePendingMessage({ + conversationId: ACTIVE_CONVO, + text: "retry me", + }); useOptimisticUserMessageStore .getState() .markPendingMessageError(id, "Server unavailable"); @@ -118,21 +114,18 @@ describe("PendingUserMessages", () => { ); await waitFor(() => { - const [entry] = - useOptimisticUserMessageStore.getState().pendingMessages; - expect(entry.status).toBe("sending"); + const [entry] = useOptimisticUserMessageStore.getState().pendingMessages; + expect(entry.status).toBe("queued"); expect(entry.errorMessage).toBeUndefined(); }); }); it("flips back to 'error' if the retry attempt also fails", async () => { mockSend.mockRejectedValueOnce(new Error("still broken")); - const id = useOptimisticUserMessageStore - .getState() - .enqueuePendingMessage({ - conversationId: ACTIVE_CONVO, - text: "retry me", - }); + const id = useOptimisticUserMessageStore.getState().enqueuePendingMessage({ + conversationId: ACTIVE_CONVO, + text: "retry me", + }); useOptimisticUserMessageStore .getState() .markPendingMessageError(id, "Server unavailable"); @@ -143,8 +136,7 @@ describe("PendingUserMessages", () => { await user.click(screen.getByTestId("chat-message-retry")); await waitFor(() => { - const [entry] = - useOptimisticUserMessageStore.getState().pendingMessages; + const [entry] = useOptimisticUserMessageStore.getState().pendingMessages; expect(entry.status).toBe("error"); expect(entry.errorMessage).toBe("still broken"); }); diff --git a/__tests__/stores/optimistic-user-message-store.test.ts b/__tests__/stores/optimistic-user-message-store.test.ts index bea772fc0..8f98271ec 100644 --- a/__tests__/stores/optimistic-user-message-store.test.ts +++ b/__tests__/stores/optimistic-user-message-store.test.ts @@ -74,38 +74,30 @@ describe("optimistic-user-message-store", () => { expect(entry.errorMessage).toBeUndefined(); }); - it("marks a sending message as sent", () => { + it("marks a sending message as queued", () => { const store = useOptimisticUserMessageStore.getState(); const id = store.enqueuePendingMessage({ conversationId: CONVO, - text: "accepted", + text: "queued", }); - store.markPendingMessageSent(id); + store.markPendingMessageQueued(id); const [entry] = useOptimisticUserMessageStore.getState().pendingMessages; - expect(entry.status).toBe("sent"); + expect(entry.status).toBe("queued"); expect(entry.errorMessage).toBeUndefined(); }); - it("marks the oldest sending message in a conversation as sent", () => { + it("can enqueue an already queued message", () => { const store = useOptimisticUserMessageStore.getState(); - const firstId = store.enqueuePendingMessage({ + store.enqueuePendingMessage({ conversationId: CONVO, - text: "first", + text: "queued", + status: "queued", }); - store.enqueuePendingMessage({ conversationId: CONVO, text: "second" }); - store.enqueuePendingMessage({ conversationId: "conv-b", text: "other" }); - const marked = store.markOldestSendingMessageSent(CONVO); - - expect(marked?.id).toBe(firstId); - const pending = useOptimisticUserMessageStore.getState().pendingMessages; - expect(pending.map((message) => message.status)).toEqual([ - "sent", - "sending", - "sending", - ]); + const [entry] = useOptimisticUserMessageStore.getState().pendingMessages; + expect(entry.status).toBe("queued"); }); it("enqueue stores `content` separately from `text` and defaults it to `text`", () => { @@ -145,19 +137,18 @@ describe("optimistic-user-message-store", () => { const consumed = store.consumeMatchingPendingMessage(CONVO, "second"); expect(consumed?.id).toBe(secondId); - const remaining = - useOptimisticUserMessageStore.getState().pendingMessages; + const remaining = useOptimisticUserMessageStore.getState().pendingMessages; expect(remaining).toHaveLength(1); expect(remaining[0].id).toBe(firstId); }); - it("consumeMatchingPendingMessage removes a sent optimistic message when the echo arrives", () => { + it("consumeMatchingPendingMessage removes a queued optimistic message when the echo arrives", () => { const store = useOptimisticUserMessageStore.getState(); const id = store.enqueuePendingMessage({ conversationId: CONVO, text: "accepted", }); - store.markPendingMessageSent(id); + store.markPendingMessageQueued(id); const consumed = store.consumeMatchingPendingMessage(CONVO, "accepted"); @@ -203,8 +194,7 @@ describe("optimistic-user-message-store", () => { const consumed = store.consumeMatchingPendingMessage(CONVO, "second"); expect(consumed?.id).toBe(secondId); - const remaining = - useOptimisticUserMessageStore.getState().pendingMessages; + const remaining = useOptimisticUserMessageStore.getState().pendingMessages; expect(remaining).toHaveLength(1); expect(remaining[0].id).toBe(firstId); expect(remaining[0].status).toBe("error"); @@ -242,8 +232,7 @@ describe("optimistic-user-message-store", () => { const consumed = store.consumeMatchingPendingMessage("conv-b", "shared"); expect(consumed?.id).toBe(bId); - const remaining = - useOptimisticUserMessageStore.getState().pendingMessages; + const remaining = useOptimisticUserMessageStore.getState().pendingMessages; expect(remaining).toHaveLength(1); expect(remaining[0].id).toBe(aId); }); @@ -269,19 +258,19 @@ describe("optimistic-user-message-store", () => { expect(entry.errorMessage).toBe("Send timed out"); }); - it("watchdog timeout does nothing if the message was already marked sent", () => { + it("watchdog timeout does nothing if the message was already queued", () => { const store = useOptimisticUserMessageStore.getState(); const id = store.enqueuePendingMessage({ conversationId: CONVO, text: "accepted", }); - store.markPendingMessageSent(id); + store.markPendingMessageQueued(id); vi.advanceTimersByTime(PENDING_MESSAGE_TIMEOUT_MS); const [entry] = useOptimisticUserMessageStore.getState().pendingMessages; expect(entry.id).toBe(id); - expect(entry.status).toBe("sent"); + expect(entry.status).toBe("queued"); }); it("watchdog timeout does nothing if the echo already consumed the message", () => { @@ -321,8 +310,7 @@ describe("optimistic-user-message-store", () => { store.removePendingMessage(firstId); - const remaining = - useOptimisticUserMessageStore.getState().pendingMessages; + const remaining = useOptimisticUserMessageStore.getState().pendingMessages; expect(remaining.map((m) => m.text)).toEqual(["second"]); }); diff --git a/__tests__/utils/optimistic-user-message-events.test.ts b/__tests__/utils/optimistic-user-message-events.test.ts index 81c8d433a..cf34c5a4e 100644 --- a/__tests__/utils/optimistic-user-message-events.test.ts +++ b/__tests__/utils/optimistic-user-message-events.test.ts @@ -73,13 +73,14 @@ describe("optimistic-user-message-events", () => { }); it("carries pending status into the optimistic event", () => { - const [event] = mergeOptimisticUserMessages([], [ - { ...pendingMessage, status: "sent" }, - ]); + const [event] = mergeOptimisticUserMessages( + [], + [{ ...pendingMessage, status: "queued" }], + ); expect(isOptimisticUserMessageEvent(event)).toBe(true); if (isOptimisticUserMessageEvent(event)) { - expect(event.optimisticPendingStatus).toBe("sent"); + expect(event.optimisticPendingStatus).toBe("queued"); expect(event.llm_message.content).toEqual([ { type: "text", text: "optimistic message" }, ]); @@ -87,9 +88,10 @@ describe("optimistic-user-message-events", () => { }); it("does not duplicate an event that already exists", () => { - const merged = mergeOptimisticUserMessages([userEvent], [ - { ...pendingMessage, id: userEvent.id }, - ]); + const merged = mergeOptimisticUserMessages( + [userEvent], + [{ ...pendingMessage, id: userEvent.id }], + ); expect(merged).toEqual([userEvent]); }); diff --git a/src/components/conversation-events/chat/event-message-components/user-assistant-event-message.tsx b/src/components/conversation-events/chat/event-message-components/user-assistant-event-message.tsx index 6224e3b0c..af840c1e0 100644 --- a/src/components/conversation-events/chat/event-message-components/user-assistant-event-message.tsx +++ b/src/components/conversation-events/chat/event-message-components/user-assistant-event-message.tsx @@ -26,6 +26,9 @@ export function UserAssistantEventMessage({ const markPendingMessageSending = useOptimisticUserMessageStore( (state) => state.markPendingMessageSending, ); + const markPendingMessageQueued = useOptimisticUserMessageStore( + (state) => state.markPendingMessageQueued, + ); const { send } = useSendMessage(); const message = parseMessageFromEvent(event); const pendingStatus = isOptimisticUserMessageEvent(event) @@ -62,19 +65,26 @@ export function UserAssistantEventMessage({ pendingMessage.timestamp, ), ); + markPendingMessageQueued(pendingMessage.id); } catch (error) { const errorMessage = error instanceof Error ? error.message : "Failed to send message"; markPendingMessageError(pendingMessage.id, errorMessage); } - }, [event, markPendingMessageError, markPendingMessageSending, send]); + }, [ + event, + markPendingMessageError, + markPendingMessageQueued, + markPendingMessageSending, + send, + ]); return ( {imageUrls.length > 0 && ( diff --git a/src/components/features/chat/chat-interface.tsx b/src/components/features/chat/chat-interface.tsx index b40575aca..8b91a7d88 100644 --- a/src/components/features/chat/chat-interface.tsx +++ b/src/components/features/chat/chat-interface.tsx @@ -42,6 +42,7 @@ import { useNewConversationCommand } from "#/hooks/mutation/use-new-conversation import { useOptionalConversationId } from "#/hooks/use-conversation-id"; import { useActiveConversation } from "#/hooks/query/use-active-conversation"; import { I18nKey } from "#/i18n/declaration"; +import { setConversationState } from "#/utils/conversation-local-storage"; function getEntryPoint( hasRepository: boolean | null, @@ -76,6 +77,9 @@ export function ChatInterface() { const markPendingMessageError = useOptimisticUserMessageStore( (state) => state.markPendingMessageError, ); + const markPendingMessageQueued = useOptimisticUserMessageStore( + (state) => state.markPendingMessageQueued, + ); const pendingMessages = useOptimisticUserMessageStore( (state) => state.pendingMessages, ); @@ -337,11 +341,11 @@ export function ChatInterface() { const prompt = uploadedFiles.length > 0 ? `${content}\n\n${filePrompt}` : content; - // Enqueue the message into the local pending queue with status "sending" - // so the user immediately sees it in the chat with a faded treatment. The - // entry is removed when the WebSocket echoes back the corresponding - // `UserMessageEvent`. If the API call to send the message fails, the entry - // is flipped to "error" with a retry link. + // Enqueue the message into the local pending queue with status "sending" so + // the user immediately sees it in the chat. Once the local send call + // succeeds we mark it "queued"; only the server's echoed `UserMessageEvent` + // removes it, because later agent activity may still belong to the previous + // turn. const pendingId = enqueuePendingMessage({ conversationId: conversationId!, // `text` is what the user sees in the bubble; `content` is what we @@ -360,6 +364,8 @@ export function ChatInterface() { await send( createChatMessage(prompt, imageUrls, uploadedFiles, timestamp), ); + markPendingMessageQueued(pendingId); + setConversationState(conversationId!, { draftMessage: null }); } catch (sendError) { const sendErrorMessage = sendError instanceof Error diff --git a/src/components/features/chat/chat-message.tsx b/src/components/features/chat/chat-message.tsx index d60a22455..93cea5ab3 100644 --- a/src/components/features/chat/chat-message.tsx +++ b/src/components/features/chat/chat-message.tsx @@ -7,7 +7,7 @@ import { StyledTooltip } from "#/components/shared/buttons/styled-tooltip"; import { I18nKey } from "#/i18n/declaration"; import { MarkdownRenderer } from "../markdown/markdown-renderer"; -export type ChatMessagePendingStatus = "sending" | "error"; +export type ChatMessagePendingStatus = "sending" | "queued" | "error"; interface ChatMessageProps { type: SourceType; @@ -68,7 +68,8 @@ export function ChatMessage({ isFromPlanningAgent && type === "agent" && "border border-[#597ff4] bg-tertiary p-4 mt-2", - pendingStatus === "sending" && "opacity-60", + (pendingStatus === "sending" || pendingStatus === "queued") && + "opacity-60", pendingStatus === "error" && "border border-status-fail-border", )} > @@ -129,6 +130,17 @@ export function ChatMessage({ )} + {pendingStatus === "queued" && ( + + {t(I18nKey.CHAT_INTERFACE$MESSAGE_QUEUED)} + + )} + {pendingStatus === "error" && ( state.markPendingMessageError, ); + const markPendingMessageQueued = useOptimisticUserMessageStore( + (state) => state.markPendingMessageQueued, + ); const { backend } = useActiveBackend(); const isLocalBackend = backend.kind === "local"; @@ -177,12 +180,18 @@ export function GitControlBar({ onSuggestionsClick }: GitControlBarProps) { timestamp: new Date().toISOString(), }, }), - ).catch((error) => { - if (!pendingId) return; - const errorMessage = - error instanceof Error ? error.message : "Failed to send message"; - markPendingMessageError(pendingId, errorMessage); - }); + ) + .then(() => { + if (pendingId) markPendingMessageQueued(pendingId); + }) + .catch((error) => { + if (!pendingId) return; + const errorMessage = + error instanceof Error + ? error.message + : "Failed to send message"; + markPendingMessageError(pendingId, errorMessage); + }); }, }, ); diff --git a/src/components/features/chat/pending-user-messages.tsx b/src/components/features/chat/pending-user-messages.tsx index cf0d6b90c..91e3fe7ad 100644 --- a/src/components/features/chat/pending-user-messages.tsx +++ b/src/components/features/chat/pending-user-messages.tsx @@ -9,10 +9,11 @@ import { ChatMessage } from "./chat-message"; /** * Renders the queue of locally-tracked user messages that have been submitted - * but not yet echoed back through the WebSocket. Each message shows a faded - * "sending" treatment until the server echoes a real `UserMessageEvent` - * (which removes it via `consumeMatchingPendingMessage`). If the API rejects the - * send, the message switches to an "error" state with a retry button. + * but not yet echoed back through the WebSocket. Each message starts as + * "sending", switches to "queued" once the local send call succeeds, and is + * removed only when the server echoes a real `UserMessageEvent`. If the API + * rejects the send, the message switches to an "error" state with a retry + * button. * * The queue is global but each entry is tagged with the conversation id it * was enqueued from; this component filters to only entries belonging to the @@ -30,6 +31,9 @@ export function PendingUserMessages() { const markPendingMessageSending = useOptimisticUserMessageStore( (state) => state.markPendingMessageSending, ); + const markPendingMessageQueued = useOptimisticUserMessageStore( + (state) => state.markPendingMessageQueued, + ); const { send } = useSendMessage(); const visibleMessages = React.useMemo( @@ -63,13 +67,19 @@ export function PendingUserMessages() { message.timestamp, ), ); + markPendingMessageQueued(id); } catch (error) { const errorMessage = error instanceof Error ? error.message : "Failed to send message"; markPendingMessageError(id, errorMessage); } }, - [send, markPendingMessageError, markPendingMessageSending], + [ + send, + markPendingMessageError, + markPendingMessageQueued, + markPendingMessageSending, + ], ); if (visibleMessages.length === 0) { @@ -83,7 +93,7 @@ export function PendingUserMessages() { key={message.id} type="user" message={message.text} - pendingStatus={message.status === "sent" ? undefined : message.status} + pendingStatus={message.status} onRetry={ message.status === "error" ? () => handleRetry(message.id) diff --git a/src/components/features/home/tasks/task-card.tsx b/src/components/features/home/tasks/task-card.tsx index e6aa8ac59..db73650c9 100644 --- a/src/components/features/home/tasks/task-card.tsx +++ b/src/components/features/home/tasks/task-card.tsx @@ -47,6 +47,7 @@ export function TaskCard({ task }: TaskCardProps) { enqueuePendingMessage({ conversationId: data.conversation_id, text: t("TASK$ADDRESSING_TASK"), + status: "queued", }); navigate(`/conversations/${data.conversation_id}`); }, diff --git a/src/contexts/conversation-websocket-context.tsx b/src/contexts/conversation-websocket-context.tsx index e5ea96135..8e7f25be2 100644 --- a/src/contexts/conversation-websocket-context.tsx +++ b/src/contexts/conversation-websocket-context.tsx @@ -137,9 +137,6 @@ export function ConversationWebSocketProvider({ const consumeMatchingPendingMessage = useOptimisticUserMessageStore( (state) => state.consumeMatchingPendingMessage, ); - const markOldestSendingMessageSent = useOptimisticUserMessageStore( - (state) => state.markOldestSendingMessageSent, - ); const { setExecutionStatus } = useConversationStateStore(); const { appendInput, appendOutput } = useCommandStore(); @@ -458,12 +455,6 @@ export function ConversationWebSocketProvider({ // Clear draft from localStorage - message was successfully delivered setConversationState(conversationId, { draftMessage: null }); } - } else if ( - conversationId && - (event.source === "agent" || isConversationStateUpdateEvent(event)) - ) { - markOldestSendingMessageSent(conversationId); - setConversationState(conversationId, { draftMessage: null }); } // Handle cache invalidation for ActionEvent @@ -559,7 +550,6 @@ export function ConversationWebSocketProvider({ addEvent, setErrorMessage, consumeMatchingPendingMessage, - markOldestSendingMessageSent, queryClient, conversationId, setExecutionStatus, @@ -645,12 +635,6 @@ export function ConversationWebSocketProvider({ ); setConversationState(conversationId, { draftMessage: null }); } - } else if ( - conversationId && - (event.source === "agent" || isConversationStateUpdateEvent(event)) - ) { - markOldestSendingMessageSent(conversationId); - setConversationState(conversationId, { draftMessage: null }); } // Handle cache invalidation for ActionEvent @@ -740,7 +724,6 @@ export function ConversationWebSocketProvider({ expectedEventCountPlanning, setErrorMessage, consumeMatchingPendingMessage, - markOldestSendingMessageSent, queryClient, subConversations, conversationId, diff --git a/src/hooks/use-handle-build-plan-click.ts b/src/hooks/use-handle-build-plan-click.ts index f08651302..87b6ec67d 100644 --- a/src/hooks/use-handle-build-plan-click.ts +++ b/src/hooks/use-handle-build-plan-click.ts @@ -21,6 +21,9 @@ export const useHandleBuildPlanClick = () => { const markPendingMessageError = useOptimisticUserMessageStore( (state) => state.markPendingMessageError, ); + const markPendingMessageQueued = useOptimisticUserMessageStore( + (state) => state.markPendingMessageQueued, + ); const handleBuildPlanClick = useCallback( (event?: React.MouseEvent | KeyboardEvent) => { @@ -44,12 +47,16 @@ export const useHandleBuildPlanClick = () => { timestamp, }) : null; - send(createChatMessage(buildPrompt, [], [], timestamp)).catch((error) => { - if (!pendingId) return; - const errorMessage = - error instanceof Error ? error.message : "Failed to send message"; - markPendingMessageError(pendingId, errorMessage); - }); + send(createChatMessage(buildPrompt, [], [], timestamp)) + .then(() => { + if (pendingId) markPendingMessageQueued(pendingId); + }) + .catch((error) => { + if (!pendingId) return; + const errorMessage = + error instanceof Error ? error.message : "Failed to send message"; + markPendingMessageError(pendingId, errorMessage); + }); }, [ setConversationMode, @@ -57,6 +64,7 @@ export const useHandleBuildPlanClick = () => { conversationId, enqueuePendingMessage, markPendingMessageError, + markPendingMessageQueued, ], ); diff --git a/src/i18n/translation.json b/src/i18n/translation.json index fd41b3232..751fad21e 100644 --- a/src/i18n/translation.json +++ b/src/i18n/translation.json @@ -8618,6 +8618,23 @@ "uk": "Надсилання...", "ca": "Enviant..." }, + "CHAT_INTERFACE$MESSAGE_QUEUED": { + "en": "Queued", + "ja": "キューに追加済み", + "zh-CN": "已排队", + "zh-TW": "已排入佇列", + "ko-KR": "대기열에 추가됨", + "no": "I kø", + "it": "In coda", + "pt": "Na fila", + "es": "En cola", + "ar": "في قائمة الانتظار", + "fr": "En file d'attente", + "tr": "Sırada", + "de": "In Warteschlange", + "uk": "У черзі", + "ca": "A la cua" + }, "CHAT_INTERFACE$MESSAGE_SEND_FAILED": { "en": "Failed to send.", "ja": "送信に失敗しました。", diff --git a/src/stores/optimistic-user-message-store.ts b/src/stores/optimistic-user-message-store.ts index 1d87d67e2..caf5f473b 100644 --- a/src/stores/optimistic-user-message-store.ts +++ b/src/stores/optimistic-user-message-store.ts @@ -1,6 +1,6 @@ import { create } from "zustand"; -export type PendingUserMessageStatus = "sending" | "sent" | "error"; +export type PendingUserMessageStatus = "sending" | "queued" | "error"; /** * How long a pending message is allowed to stay in "sending" state before we @@ -52,6 +52,7 @@ export interface EnqueuePendingMessagePayload { imageUrls?: string[]; fileUrls?: string[]; timestamp?: string; + status?: PendingUserMessageStatus; } interface OptimisticUserMessageActions { @@ -64,34 +65,25 @@ interface OptimisticUserMessageActions { enqueuePendingMessage: (payload: EnqueuePendingMessagePayload) => string; /** Mark a pending message as failed (the API rejected it). */ markPendingMessageError: (id: string, errorMessage?: string) => void; - /** Mark a pending message as accepted by the server. */ - markPendingMessageSent: (id: string) => void; + /** Mark a pending message as queued after the local send call succeeds. */ + markPendingMessageQueued: (id: string) => void; /** Mark a pending message as sending again (used when retrying). */ markPendingMessageSending: (id: string) => void; /** Drop a pending message from the queue (e.g., after success/cancellation). */ removePendingMessage: (id: string) => void; /** - * Remove the pending message that matches the given echoed `content` in - * the given conversation. Matching is done by exact content equality on - * messages still in "sending" state; if no match exists we fall back to - * removing the oldest "sending" entry in that conversation so that an echo - * with a slightly munged body (e.g. trailing-whitespace stripped by the - * server) still clears its bubble. Scoping by `conversationId` ensures a - * stale ack for one conversation never pops a pending entry belonging to - * another. + * Remove the pending message that matches the given echoed `content` in the + * given conversation. Matching is done by exact content equality on non-error + * messages; if no match exists we fall back to removing the oldest non-error + * entry in that conversation so that an echo with a slightly munged body + * (e.g. trailing-whitespace stripped by the server) still clears its bubble. + * Scoping by `conversationId` ensures a stale ack for one conversation never + * pops a pending entry belonging to another. */ consumeMatchingPendingMessage: ( conversationId: string, content: string, ) => PendingUserMessage | null; - /** - * Mark the oldest still-sending message in the conversation as accepted by - * the server. Used when the server streams agent work but does not echo the - * user message event on the live WebSocket path. - */ - markOldestSendingMessageSent: ( - conversationId: string, - ) => PendingUserMessage | null; /** Wipe all queued messages (e.g., when changing conversations). */ clearPendingMessages: () => void; /** @@ -129,7 +121,7 @@ export const useOptimisticUserMessageStore = create( conversationId: payload.conversationId, text: payload.text, content: payload.content ?? payload.text, - status: "sending", + status: payload.status ?? "sending", imageUrls: payload.imageUrls ?? [], fileUrls: payload.fileUrls ?? [], timestamp: payload.timestamp ?? new Date().toISOString(), @@ -160,11 +152,11 @@ export const useOptimisticUserMessageStore = create( ), })), - markPendingMessageSent: (id) => + markPendingMessageQueued: (id) => set((state) => ({ pendingMessages: state.pendingMessages.map((message) => message.id === id - ? { ...message, status: "sent", errorMessage: undefined } + ? { ...message, status: "queued", errorMessage: undefined } : message, ), })), @@ -191,7 +183,7 @@ export const useOptimisticUserMessageStore = create( // is what makes out-of-order echoes safe: an echo of "world" will pop // the "world" bubble, not the older "hello" one). If no exact match // exists — e.g. the server slightly munged the body — fall back to the - // oldest "sending" entry in this conversation so the user doesn't end + // oldest non-error entry in this conversation so the user doesn't end // up with a permanently-stuck bubble in the happy-path single-message // case. let consumed: PendingUserMessage | null = null; @@ -216,28 +208,6 @@ export const useOptimisticUserMessageStore = create( return consumed; }, - markOldestSendingMessageSent: (conversationId) => { - let marked: PendingUserMessage | null = null; - set((state) => { - const target = state.pendingMessages.find( - (message) => - message.status === "sending" && - message.conversationId === conversationId, - ); - if (!target) return state; - - marked = target; - return { - pendingMessages: state.pendingMessages.map((message) => - message.id === target.id - ? { ...message, status: "sent", errorMessage: undefined } - : message, - ), - }; - }); - return marked; - }, - clearPendingMessages: () => set(() => ({ ...initialState })), reassignPendingMessages: (fromConversationId, toConversationId) => diff --git a/src/utils/enqueue-home-task-pending-message.ts b/src/utils/enqueue-home-task-pending-message.ts index e576d91ea..45b31c5e5 100644 --- a/src/utils/enqueue-home-task-pending-message.ts +++ b/src/utils/enqueue-home-task-pending-message.ts @@ -26,5 +26,6 @@ export async function enqueueHomeTaskPendingMessage(options: { content: options.text, imageUrls, fileUrls: [], + status: "queued", }); } From 3ee77f61d94a9db8eeb698fb664f6bfe1b0a1fd9 Mon Sep 17 00:00:00 2001 From: neubig <398875+neubig@users.noreply.github.com> Date: Sun, 24 May 2026 09:39:59 -0400 Subject: [PATCH 5/7] Revert "Show queued state for optimistic user messages" This reverts commit f375143cde19ea3464bfcc60bf264da0f4694342. --- .../components/chat/chat-interface.test.tsx | 31 ++++++---- .../chat/pending-user-messages.test.tsx | 42 +++++++------ .../optimistic-user-message-store.test.ts | 50 ++++++++++------ .../optimistic-user-message-events.test.ts | 16 +++-- .../user-assistant-event-message.tsx | 14 +---- .../features/chat/chat-interface.tsx | 16 ++--- src/components/features/chat/chat-message.tsx | 16 +---- .../features/chat/git-control-bar.tsx | 21 ++----- .../features/chat/pending-user-messages.tsx | 22 ++----- .../features/home/tasks/task-card.tsx | 1 - .../conversation-websocket-context.tsx | 17 ++++++ src/hooks/use-handle-build-plan-click.ts | 20 ++----- src/i18n/translation.json | 17 ------ src/stores/optimistic-user-message-store.ts | 60 ++++++++++++++----- .../enqueue-home-task-pending-message.ts | 1 - 15 files changed, 172 insertions(+), 172 deletions(-) diff --git a/__tests__/components/chat/chat-interface.test.tsx b/__tests__/components/chat/chat-interface.test.tsx index 7f8ab01bc..593f15a3b 100644 --- a/__tests__/components/chat/chat-interface.test.tsx +++ b/__tests__/components/chat/chat-interface.test.tsx @@ -1,4 +1,12 @@ -import { afterEach, beforeEach, describe, expect, it, test, vi } from "vitest"; +import { + afterEach, + beforeEach, + describe, + expect, + it, + test, + vi, +} from "vitest"; import { fireEvent, render, @@ -233,7 +241,11 @@ describe("ChatInterface - Chat Suggestions", () => { }, }); - renderWithQueryClient(, queryClient, "/task-abc"); + renderWithQueryClient( + , + queryClient, + "/task-abc", + ); expect(screen.queryByTestId("chat-suggestions")).not.toBeInTheDocument(); }); @@ -255,7 +267,11 @@ describe("ChatInterface - Chat Suggestions", () => { }, }); - renderWithQueryClient(, queryClient, "/task-abc"); + renderWithQueryClient( + , + queryClient, + "/task-abc", + ); expect(screen.queryByTestId("chat-suggestions")).not.toBeInTheDocument(); }); @@ -648,14 +664,7 @@ describe("ChatInterface - Pending message queue", () => { expect(screen.getByTestId("chat-message-sending")).toBeInTheDocument(); expect(screen.queryByTestId("chat-message-retry")).not.toBeInTheDocument(); - act(() => { - resolveSend?.({ queued: false }); - }); - - await waitFor(() => { - expect(pendingMessage).toHaveAttribute("data-pending-status", "queued"); - }); - expect(screen.getByTestId("chat-message-queued")).toBeInTheDocument(); + resolveSend?.({ queued: false }); }); it("flips the message to 'error' with a retry link when send rejects", async () => { diff --git a/__tests__/components/features/chat/pending-user-messages.test.tsx b/__tests__/components/features/chat/pending-user-messages.test.tsx index 913713655..dd00935cc 100644 --- a/__tests__/components/features/chat/pending-user-messages.test.tsx +++ b/__tests__/components/features/chat/pending-user-messages.test.tsx @@ -33,7 +33,7 @@ describe("PendingUserMessages", () => { expect(container).toBeEmptyDOMElement(); }); - it("renders each sending message with the faded treatment", () => { + it("renders each queued message with the faded 'sending' treatment", () => { useOptimisticUserMessageStore.getState().enqueuePendingMessage({ conversationId: ACTIVE_CONVO, text: "first message", @@ -74,10 +74,12 @@ describe("PendingUserMessages", () => { }); it("shows an error state with a retry link when the message is in 'error'", () => { - const id = useOptimisticUserMessageStore.getState().enqueuePendingMessage({ - conversationId: ACTIVE_CONVO, - text: "broken message", - }); + const id = useOptimisticUserMessageStore + .getState() + .enqueuePendingMessage({ + conversationId: ACTIVE_CONVO, + text: "broken message", + }); useOptimisticUserMessageStore .getState() .markPendingMessageError(id, "Server unavailable"); @@ -90,12 +92,14 @@ describe("PendingUserMessages", () => { expect(screen.getByTestId("chat-message-retry")).toBeInTheDocument(); }); - it("re-sends and flips to 'queued' when retry succeeds", async () => { + it("re-sends and flips back to 'sending' when retry is clicked", async () => { mockSend.mockResolvedValueOnce({ queued: false }); - const id = useOptimisticUserMessageStore.getState().enqueuePendingMessage({ - conversationId: ACTIVE_CONVO, - text: "retry me", - }); + const id = useOptimisticUserMessageStore + .getState() + .enqueuePendingMessage({ + conversationId: ACTIVE_CONVO, + text: "retry me", + }); useOptimisticUserMessageStore .getState() .markPendingMessageError(id, "Server unavailable"); @@ -114,18 +118,21 @@ describe("PendingUserMessages", () => { ); await waitFor(() => { - const [entry] = useOptimisticUserMessageStore.getState().pendingMessages; - expect(entry.status).toBe("queued"); + const [entry] = + useOptimisticUserMessageStore.getState().pendingMessages; + expect(entry.status).toBe("sending"); expect(entry.errorMessage).toBeUndefined(); }); }); it("flips back to 'error' if the retry attempt also fails", async () => { mockSend.mockRejectedValueOnce(new Error("still broken")); - const id = useOptimisticUserMessageStore.getState().enqueuePendingMessage({ - conversationId: ACTIVE_CONVO, - text: "retry me", - }); + const id = useOptimisticUserMessageStore + .getState() + .enqueuePendingMessage({ + conversationId: ACTIVE_CONVO, + text: "retry me", + }); useOptimisticUserMessageStore .getState() .markPendingMessageError(id, "Server unavailable"); @@ -136,7 +143,8 @@ describe("PendingUserMessages", () => { await user.click(screen.getByTestId("chat-message-retry")); await waitFor(() => { - const [entry] = useOptimisticUserMessageStore.getState().pendingMessages; + const [entry] = + useOptimisticUserMessageStore.getState().pendingMessages; expect(entry.status).toBe("error"); expect(entry.errorMessage).toBe("still broken"); }); diff --git a/__tests__/stores/optimistic-user-message-store.test.ts b/__tests__/stores/optimistic-user-message-store.test.ts index 8f98271ec..bea772fc0 100644 --- a/__tests__/stores/optimistic-user-message-store.test.ts +++ b/__tests__/stores/optimistic-user-message-store.test.ts @@ -74,30 +74,38 @@ describe("optimistic-user-message-store", () => { expect(entry.errorMessage).toBeUndefined(); }); - it("marks a sending message as queued", () => { + it("marks a sending message as sent", () => { const store = useOptimisticUserMessageStore.getState(); const id = store.enqueuePendingMessage({ conversationId: CONVO, - text: "queued", + text: "accepted", }); - store.markPendingMessageQueued(id); + store.markPendingMessageSent(id); const [entry] = useOptimisticUserMessageStore.getState().pendingMessages; - expect(entry.status).toBe("queued"); + expect(entry.status).toBe("sent"); expect(entry.errorMessage).toBeUndefined(); }); - it("can enqueue an already queued message", () => { + it("marks the oldest sending message in a conversation as sent", () => { const store = useOptimisticUserMessageStore.getState(); - store.enqueuePendingMessage({ + const firstId = store.enqueuePendingMessage({ conversationId: CONVO, - text: "queued", - status: "queued", + text: "first", }); + store.enqueuePendingMessage({ conversationId: CONVO, text: "second" }); + store.enqueuePendingMessage({ conversationId: "conv-b", text: "other" }); - const [entry] = useOptimisticUserMessageStore.getState().pendingMessages; - expect(entry.status).toBe("queued"); + const marked = store.markOldestSendingMessageSent(CONVO); + + expect(marked?.id).toBe(firstId); + const pending = useOptimisticUserMessageStore.getState().pendingMessages; + expect(pending.map((message) => message.status)).toEqual([ + "sent", + "sending", + "sending", + ]); }); it("enqueue stores `content` separately from `text` and defaults it to `text`", () => { @@ -137,18 +145,19 @@ describe("optimistic-user-message-store", () => { const consumed = store.consumeMatchingPendingMessage(CONVO, "second"); expect(consumed?.id).toBe(secondId); - const remaining = useOptimisticUserMessageStore.getState().pendingMessages; + const remaining = + useOptimisticUserMessageStore.getState().pendingMessages; expect(remaining).toHaveLength(1); expect(remaining[0].id).toBe(firstId); }); - it("consumeMatchingPendingMessage removes a queued optimistic message when the echo arrives", () => { + it("consumeMatchingPendingMessage removes a sent optimistic message when the echo arrives", () => { const store = useOptimisticUserMessageStore.getState(); const id = store.enqueuePendingMessage({ conversationId: CONVO, text: "accepted", }); - store.markPendingMessageQueued(id); + store.markPendingMessageSent(id); const consumed = store.consumeMatchingPendingMessage(CONVO, "accepted"); @@ -194,7 +203,8 @@ describe("optimistic-user-message-store", () => { const consumed = store.consumeMatchingPendingMessage(CONVO, "second"); expect(consumed?.id).toBe(secondId); - const remaining = useOptimisticUserMessageStore.getState().pendingMessages; + const remaining = + useOptimisticUserMessageStore.getState().pendingMessages; expect(remaining).toHaveLength(1); expect(remaining[0].id).toBe(firstId); expect(remaining[0].status).toBe("error"); @@ -232,7 +242,8 @@ describe("optimistic-user-message-store", () => { const consumed = store.consumeMatchingPendingMessage("conv-b", "shared"); expect(consumed?.id).toBe(bId); - const remaining = useOptimisticUserMessageStore.getState().pendingMessages; + const remaining = + useOptimisticUserMessageStore.getState().pendingMessages; expect(remaining).toHaveLength(1); expect(remaining[0].id).toBe(aId); }); @@ -258,19 +269,19 @@ describe("optimistic-user-message-store", () => { expect(entry.errorMessage).toBe("Send timed out"); }); - it("watchdog timeout does nothing if the message was already queued", () => { + it("watchdog timeout does nothing if the message was already marked sent", () => { const store = useOptimisticUserMessageStore.getState(); const id = store.enqueuePendingMessage({ conversationId: CONVO, text: "accepted", }); - store.markPendingMessageQueued(id); + store.markPendingMessageSent(id); vi.advanceTimersByTime(PENDING_MESSAGE_TIMEOUT_MS); const [entry] = useOptimisticUserMessageStore.getState().pendingMessages; expect(entry.id).toBe(id); - expect(entry.status).toBe("queued"); + expect(entry.status).toBe("sent"); }); it("watchdog timeout does nothing if the echo already consumed the message", () => { @@ -310,7 +321,8 @@ describe("optimistic-user-message-store", () => { store.removePendingMessage(firstId); - const remaining = useOptimisticUserMessageStore.getState().pendingMessages; + const remaining = + useOptimisticUserMessageStore.getState().pendingMessages; expect(remaining.map((m) => m.text)).toEqual(["second"]); }); diff --git a/__tests__/utils/optimistic-user-message-events.test.ts b/__tests__/utils/optimistic-user-message-events.test.ts index cf34c5a4e..81c8d433a 100644 --- a/__tests__/utils/optimistic-user-message-events.test.ts +++ b/__tests__/utils/optimistic-user-message-events.test.ts @@ -73,14 +73,13 @@ describe("optimistic-user-message-events", () => { }); it("carries pending status into the optimistic event", () => { - const [event] = mergeOptimisticUserMessages( - [], - [{ ...pendingMessage, status: "queued" }], - ); + const [event] = mergeOptimisticUserMessages([], [ + { ...pendingMessage, status: "sent" }, + ]); expect(isOptimisticUserMessageEvent(event)).toBe(true); if (isOptimisticUserMessageEvent(event)) { - expect(event.optimisticPendingStatus).toBe("queued"); + expect(event.optimisticPendingStatus).toBe("sent"); expect(event.llm_message.content).toEqual([ { type: "text", text: "optimistic message" }, ]); @@ -88,10 +87,9 @@ describe("optimistic-user-message-events", () => { }); it("does not duplicate an event that already exists", () => { - const merged = mergeOptimisticUserMessages( - [userEvent], - [{ ...pendingMessage, id: userEvent.id }], - ); + const merged = mergeOptimisticUserMessages([userEvent], [ + { ...pendingMessage, id: userEvent.id }, + ]); expect(merged).toEqual([userEvent]); }); diff --git a/src/components/conversation-events/chat/event-message-components/user-assistant-event-message.tsx b/src/components/conversation-events/chat/event-message-components/user-assistant-event-message.tsx index af840c1e0..6224e3b0c 100644 --- a/src/components/conversation-events/chat/event-message-components/user-assistant-event-message.tsx +++ b/src/components/conversation-events/chat/event-message-components/user-assistant-event-message.tsx @@ -26,9 +26,6 @@ export function UserAssistantEventMessage({ const markPendingMessageSending = useOptimisticUserMessageStore( (state) => state.markPendingMessageSending, ); - const markPendingMessageQueued = useOptimisticUserMessageStore( - (state) => state.markPendingMessageQueued, - ); const { send } = useSendMessage(); const message = parseMessageFromEvent(event); const pendingStatus = isOptimisticUserMessageEvent(event) @@ -65,26 +62,19 @@ export function UserAssistantEventMessage({ pendingMessage.timestamp, ), ); - markPendingMessageQueued(pendingMessage.id); } catch (error) { const errorMessage = error instanceof Error ? error.message : "Failed to send message"; markPendingMessageError(pendingMessage.id, errorMessage); } - }, [ - event, - markPendingMessageError, - markPendingMessageQueued, - markPendingMessageSending, - send, - ]); + }, [event, markPendingMessageError, markPendingMessageSending, send]); return ( {imageUrls.length > 0 && ( diff --git a/src/components/features/chat/chat-interface.tsx b/src/components/features/chat/chat-interface.tsx index 8b91a7d88..b40575aca 100644 --- a/src/components/features/chat/chat-interface.tsx +++ b/src/components/features/chat/chat-interface.tsx @@ -42,7 +42,6 @@ import { useNewConversationCommand } from "#/hooks/mutation/use-new-conversation import { useOptionalConversationId } from "#/hooks/use-conversation-id"; import { useActiveConversation } from "#/hooks/query/use-active-conversation"; import { I18nKey } from "#/i18n/declaration"; -import { setConversationState } from "#/utils/conversation-local-storage"; function getEntryPoint( hasRepository: boolean | null, @@ -77,9 +76,6 @@ export function ChatInterface() { const markPendingMessageError = useOptimisticUserMessageStore( (state) => state.markPendingMessageError, ); - const markPendingMessageQueued = useOptimisticUserMessageStore( - (state) => state.markPendingMessageQueued, - ); const pendingMessages = useOptimisticUserMessageStore( (state) => state.pendingMessages, ); @@ -341,11 +337,11 @@ export function ChatInterface() { const prompt = uploadedFiles.length > 0 ? `${content}\n\n${filePrompt}` : content; - // Enqueue the message into the local pending queue with status "sending" so - // the user immediately sees it in the chat. Once the local send call - // succeeds we mark it "queued"; only the server's echoed `UserMessageEvent` - // removes it, because later agent activity may still belong to the previous - // turn. + // Enqueue the message into the local pending queue with status "sending" + // so the user immediately sees it in the chat with a faded treatment. The + // entry is removed when the WebSocket echoes back the corresponding + // `UserMessageEvent`. If the API call to send the message fails, the entry + // is flipped to "error" with a retry link. const pendingId = enqueuePendingMessage({ conversationId: conversationId!, // `text` is what the user sees in the bubble; `content` is what we @@ -364,8 +360,6 @@ export function ChatInterface() { await send( createChatMessage(prompt, imageUrls, uploadedFiles, timestamp), ); - markPendingMessageQueued(pendingId); - setConversationState(conversationId!, { draftMessage: null }); } catch (sendError) { const sendErrorMessage = sendError instanceof Error diff --git a/src/components/features/chat/chat-message.tsx b/src/components/features/chat/chat-message.tsx index 93cea5ab3..d60a22455 100644 --- a/src/components/features/chat/chat-message.tsx +++ b/src/components/features/chat/chat-message.tsx @@ -7,7 +7,7 @@ import { StyledTooltip } from "#/components/shared/buttons/styled-tooltip"; import { I18nKey } from "#/i18n/declaration"; import { MarkdownRenderer } from "../markdown/markdown-renderer"; -export type ChatMessagePendingStatus = "sending" | "queued" | "error"; +export type ChatMessagePendingStatus = "sending" | "error"; interface ChatMessageProps { type: SourceType; @@ -68,8 +68,7 @@ export function ChatMessage({ isFromPlanningAgent && type === "agent" && "border border-[#597ff4] bg-tertiary p-4 mt-2", - (pendingStatus === "sending" || pendingStatus === "queued") && - "opacity-60", + pendingStatus === "sending" && "opacity-60", pendingStatus === "error" && "border border-status-fail-border", )} > @@ -130,17 +129,6 @@ export function ChatMessage({ )} - {pendingStatus === "queued" && ( - - {t(I18nKey.CHAT_INTERFACE$MESSAGE_QUEUED)} - - )} - {pendingStatus === "error" && ( state.markPendingMessageError, ); - const markPendingMessageQueued = useOptimisticUserMessageStore( - (state) => state.markPendingMessageQueued, - ); const { backend } = useActiveBackend(); const isLocalBackend = backend.kind === "local"; @@ -180,18 +177,12 @@ export function GitControlBar({ onSuggestionsClick }: GitControlBarProps) { timestamp: new Date().toISOString(), }, }), - ) - .then(() => { - if (pendingId) markPendingMessageQueued(pendingId); - }) - .catch((error) => { - if (!pendingId) return; - const errorMessage = - error instanceof Error - ? error.message - : "Failed to send message"; - markPendingMessageError(pendingId, errorMessage); - }); + ).catch((error) => { + if (!pendingId) return; + const errorMessage = + error instanceof Error ? error.message : "Failed to send message"; + markPendingMessageError(pendingId, errorMessage); + }); }, }, ); diff --git a/src/components/features/chat/pending-user-messages.tsx b/src/components/features/chat/pending-user-messages.tsx index 91e3fe7ad..cf0d6b90c 100644 --- a/src/components/features/chat/pending-user-messages.tsx +++ b/src/components/features/chat/pending-user-messages.tsx @@ -9,11 +9,10 @@ import { ChatMessage } from "./chat-message"; /** * Renders the queue of locally-tracked user messages that have been submitted - * but not yet echoed back through the WebSocket. Each message starts as - * "sending", switches to "queued" once the local send call succeeds, and is - * removed only when the server echoes a real `UserMessageEvent`. If the API - * rejects the send, the message switches to an "error" state with a retry - * button. + * but not yet echoed back through the WebSocket. Each message shows a faded + * "sending" treatment until the server echoes a real `UserMessageEvent` + * (which removes it via `consumeMatchingPendingMessage`). If the API rejects the + * send, the message switches to an "error" state with a retry button. * * The queue is global but each entry is tagged with the conversation id it * was enqueued from; this component filters to only entries belonging to the @@ -31,9 +30,6 @@ export function PendingUserMessages() { const markPendingMessageSending = useOptimisticUserMessageStore( (state) => state.markPendingMessageSending, ); - const markPendingMessageQueued = useOptimisticUserMessageStore( - (state) => state.markPendingMessageQueued, - ); const { send } = useSendMessage(); const visibleMessages = React.useMemo( @@ -67,19 +63,13 @@ export function PendingUserMessages() { message.timestamp, ), ); - markPendingMessageQueued(id); } catch (error) { const errorMessage = error instanceof Error ? error.message : "Failed to send message"; markPendingMessageError(id, errorMessage); } }, - [ - send, - markPendingMessageError, - markPendingMessageQueued, - markPendingMessageSending, - ], + [send, markPendingMessageError, markPendingMessageSending], ); if (visibleMessages.length === 0) { @@ -93,7 +83,7 @@ export function PendingUserMessages() { key={message.id} type="user" message={message.text} - pendingStatus={message.status} + pendingStatus={message.status === "sent" ? undefined : message.status} onRetry={ message.status === "error" ? () => handleRetry(message.id) diff --git a/src/components/features/home/tasks/task-card.tsx b/src/components/features/home/tasks/task-card.tsx index db73650c9..e6aa8ac59 100644 --- a/src/components/features/home/tasks/task-card.tsx +++ b/src/components/features/home/tasks/task-card.tsx @@ -47,7 +47,6 @@ export function TaskCard({ task }: TaskCardProps) { enqueuePendingMessage({ conversationId: data.conversation_id, text: t("TASK$ADDRESSING_TASK"), - status: "queued", }); navigate(`/conversations/${data.conversation_id}`); }, diff --git a/src/contexts/conversation-websocket-context.tsx b/src/contexts/conversation-websocket-context.tsx index 8e7f25be2..e5ea96135 100644 --- a/src/contexts/conversation-websocket-context.tsx +++ b/src/contexts/conversation-websocket-context.tsx @@ -137,6 +137,9 @@ export function ConversationWebSocketProvider({ const consumeMatchingPendingMessage = useOptimisticUserMessageStore( (state) => state.consumeMatchingPendingMessage, ); + const markOldestSendingMessageSent = useOptimisticUserMessageStore( + (state) => state.markOldestSendingMessageSent, + ); const { setExecutionStatus } = useConversationStateStore(); const { appendInput, appendOutput } = useCommandStore(); @@ -455,6 +458,12 @@ export function ConversationWebSocketProvider({ // Clear draft from localStorage - message was successfully delivered setConversationState(conversationId, { draftMessage: null }); } + } else if ( + conversationId && + (event.source === "agent" || isConversationStateUpdateEvent(event)) + ) { + markOldestSendingMessageSent(conversationId); + setConversationState(conversationId, { draftMessage: null }); } // Handle cache invalidation for ActionEvent @@ -550,6 +559,7 @@ export function ConversationWebSocketProvider({ addEvent, setErrorMessage, consumeMatchingPendingMessage, + markOldestSendingMessageSent, queryClient, conversationId, setExecutionStatus, @@ -635,6 +645,12 @@ export function ConversationWebSocketProvider({ ); setConversationState(conversationId, { draftMessage: null }); } + } else if ( + conversationId && + (event.source === "agent" || isConversationStateUpdateEvent(event)) + ) { + markOldestSendingMessageSent(conversationId); + setConversationState(conversationId, { draftMessage: null }); } // Handle cache invalidation for ActionEvent @@ -724,6 +740,7 @@ export function ConversationWebSocketProvider({ expectedEventCountPlanning, setErrorMessage, consumeMatchingPendingMessage, + markOldestSendingMessageSent, queryClient, subConversations, conversationId, diff --git a/src/hooks/use-handle-build-plan-click.ts b/src/hooks/use-handle-build-plan-click.ts index 87b6ec67d..f08651302 100644 --- a/src/hooks/use-handle-build-plan-click.ts +++ b/src/hooks/use-handle-build-plan-click.ts @@ -21,9 +21,6 @@ export const useHandleBuildPlanClick = () => { const markPendingMessageError = useOptimisticUserMessageStore( (state) => state.markPendingMessageError, ); - const markPendingMessageQueued = useOptimisticUserMessageStore( - (state) => state.markPendingMessageQueued, - ); const handleBuildPlanClick = useCallback( (event?: React.MouseEvent | KeyboardEvent) => { @@ -47,16 +44,12 @@ export const useHandleBuildPlanClick = () => { timestamp, }) : null; - send(createChatMessage(buildPrompt, [], [], timestamp)) - .then(() => { - if (pendingId) markPendingMessageQueued(pendingId); - }) - .catch((error) => { - if (!pendingId) return; - const errorMessage = - error instanceof Error ? error.message : "Failed to send message"; - markPendingMessageError(pendingId, errorMessage); - }); + send(createChatMessage(buildPrompt, [], [], timestamp)).catch((error) => { + if (!pendingId) return; + const errorMessage = + error instanceof Error ? error.message : "Failed to send message"; + markPendingMessageError(pendingId, errorMessage); + }); }, [ setConversationMode, @@ -64,7 +57,6 @@ export const useHandleBuildPlanClick = () => { conversationId, enqueuePendingMessage, markPendingMessageError, - markPendingMessageQueued, ], ); diff --git a/src/i18n/translation.json b/src/i18n/translation.json index 751fad21e..fd41b3232 100644 --- a/src/i18n/translation.json +++ b/src/i18n/translation.json @@ -8618,23 +8618,6 @@ "uk": "Надсилання...", "ca": "Enviant..." }, - "CHAT_INTERFACE$MESSAGE_QUEUED": { - "en": "Queued", - "ja": "キューに追加済み", - "zh-CN": "已排队", - "zh-TW": "已排入佇列", - "ko-KR": "대기열에 추가됨", - "no": "I kø", - "it": "In coda", - "pt": "Na fila", - "es": "En cola", - "ar": "في قائمة الانتظار", - "fr": "En file d'attente", - "tr": "Sırada", - "de": "In Warteschlange", - "uk": "У черзі", - "ca": "A la cua" - }, "CHAT_INTERFACE$MESSAGE_SEND_FAILED": { "en": "Failed to send.", "ja": "送信に失敗しました。", diff --git a/src/stores/optimistic-user-message-store.ts b/src/stores/optimistic-user-message-store.ts index caf5f473b..1d87d67e2 100644 --- a/src/stores/optimistic-user-message-store.ts +++ b/src/stores/optimistic-user-message-store.ts @@ -1,6 +1,6 @@ import { create } from "zustand"; -export type PendingUserMessageStatus = "sending" | "queued" | "error"; +export type PendingUserMessageStatus = "sending" | "sent" | "error"; /** * How long a pending message is allowed to stay in "sending" state before we @@ -52,7 +52,6 @@ export interface EnqueuePendingMessagePayload { imageUrls?: string[]; fileUrls?: string[]; timestamp?: string; - status?: PendingUserMessageStatus; } interface OptimisticUserMessageActions { @@ -65,25 +64,34 @@ interface OptimisticUserMessageActions { enqueuePendingMessage: (payload: EnqueuePendingMessagePayload) => string; /** Mark a pending message as failed (the API rejected it). */ markPendingMessageError: (id: string, errorMessage?: string) => void; - /** Mark a pending message as queued after the local send call succeeds. */ - markPendingMessageQueued: (id: string) => void; + /** Mark a pending message as accepted by the server. */ + markPendingMessageSent: (id: string) => void; /** Mark a pending message as sending again (used when retrying). */ markPendingMessageSending: (id: string) => void; /** Drop a pending message from the queue (e.g., after success/cancellation). */ removePendingMessage: (id: string) => void; /** - * Remove the pending message that matches the given echoed `content` in the - * given conversation. Matching is done by exact content equality on non-error - * messages; if no match exists we fall back to removing the oldest non-error - * entry in that conversation so that an echo with a slightly munged body - * (e.g. trailing-whitespace stripped by the server) still clears its bubble. - * Scoping by `conversationId` ensures a stale ack for one conversation never - * pops a pending entry belonging to another. + * Remove the pending message that matches the given echoed `content` in + * the given conversation. Matching is done by exact content equality on + * messages still in "sending" state; if no match exists we fall back to + * removing the oldest "sending" entry in that conversation so that an echo + * with a slightly munged body (e.g. trailing-whitespace stripped by the + * server) still clears its bubble. Scoping by `conversationId` ensures a + * stale ack for one conversation never pops a pending entry belonging to + * another. */ consumeMatchingPendingMessage: ( conversationId: string, content: string, ) => PendingUserMessage | null; + /** + * Mark the oldest still-sending message in the conversation as accepted by + * the server. Used when the server streams agent work but does not echo the + * user message event on the live WebSocket path. + */ + markOldestSendingMessageSent: ( + conversationId: string, + ) => PendingUserMessage | null; /** Wipe all queued messages (e.g., when changing conversations). */ clearPendingMessages: () => void; /** @@ -121,7 +129,7 @@ export const useOptimisticUserMessageStore = create( conversationId: payload.conversationId, text: payload.text, content: payload.content ?? payload.text, - status: payload.status ?? "sending", + status: "sending", imageUrls: payload.imageUrls ?? [], fileUrls: payload.fileUrls ?? [], timestamp: payload.timestamp ?? new Date().toISOString(), @@ -152,11 +160,11 @@ export const useOptimisticUserMessageStore = create( ), })), - markPendingMessageQueued: (id) => + markPendingMessageSent: (id) => set((state) => ({ pendingMessages: state.pendingMessages.map((message) => message.id === id - ? { ...message, status: "queued", errorMessage: undefined } + ? { ...message, status: "sent", errorMessage: undefined } : message, ), })), @@ -183,7 +191,7 @@ export const useOptimisticUserMessageStore = create( // is what makes out-of-order echoes safe: an echo of "world" will pop // the "world" bubble, not the older "hello" one). If no exact match // exists — e.g. the server slightly munged the body — fall back to the - // oldest non-error entry in this conversation so the user doesn't end + // oldest "sending" entry in this conversation so the user doesn't end // up with a permanently-stuck bubble in the happy-path single-message // case. let consumed: PendingUserMessage | null = null; @@ -208,6 +216,28 @@ export const useOptimisticUserMessageStore = create( return consumed; }, + markOldestSendingMessageSent: (conversationId) => { + let marked: PendingUserMessage | null = null; + set((state) => { + const target = state.pendingMessages.find( + (message) => + message.status === "sending" && + message.conversationId === conversationId, + ); + if (!target) return state; + + marked = target; + return { + pendingMessages: state.pendingMessages.map((message) => + message.id === target.id + ? { ...message, status: "sent", errorMessage: undefined } + : message, + ), + }; + }); + return marked; + }, + clearPendingMessages: () => set(() => ({ ...initialState })), reassignPendingMessages: (fromConversationId, toConversationId) => diff --git a/src/utils/enqueue-home-task-pending-message.ts b/src/utils/enqueue-home-task-pending-message.ts index 45b31c5e5..e576d91ea 100644 --- a/src/utils/enqueue-home-task-pending-message.ts +++ b/src/utils/enqueue-home-task-pending-message.ts @@ -26,6 +26,5 @@ export async function enqueueHomeTaskPendingMessage(options: { content: options.text, imageUrls, fileUrls: [], - status: "queued", }); } From d5a5ac4da78638f38a05ec5f8471e24e02591ae5 Mon Sep 17 00:00:00 2001 From: neubig <398875+neubig@users.noreply.github.com> Date: Sun, 24 May 2026 09:39:59 -0400 Subject: [PATCH 6/7] Revert "Anchor optimistic user messages in chat" This reverts commit 03623d264a8815985cbc2d160e5843463a0278fd. --- .../optimistic-user-message-store.test.ts | 65 ------------- .../optimistic-user-message-events.test.ts | 96 ------------------- .../user-assistant-event-message.tsx | 44 --------- .../conversation-events/chat/messages.tsx | 12 --- .../features/chat/chat-interface.tsx | 51 ++++------ .../features/chat/pending-user-messages.tsx | 4 +- .../conversation-websocket-context.tsx | 17 ---- src/stores/optimistic-user-message-store.ts | 53 ++-------- src/utils/optimistic-user-message-events.ts | 68 ------------- 9 files changed, 27 insertions(+), 383 deletions(-) delete mode 100644 __tests__/utils/optimistic-user-message-events.test.ts delete mode 100644 src/utils/optimistic-user-message-events.ts diff --git a/__tests__/stores/optimistic-user-message-store.test.ts b/__tests__/stores/optimistic-user-message-store.test.ts index bea772fc0..fe23f9330 100644 --- a/__tests__/stores/optimistic-user-message-store.test.ts +++ b/__tests__/stores/optimistic-user-message-store.test.ts @@ -74,40 +74,6 @@ describe("optimistic-user-message-store", () => { expect(entry.errorMessage).toBeUndefined(); }); - it("marks a sending message as sent", () => { - const store = useOptimisticUserMessageStore.getState(); - const id = store.enqueuePendingMessage({ - conversationId: CONVO, - text: "accepted", - }); - - store.markPendingMessageSent(id); - - const [entry] = useOptimisticUserMessageStore.getState().pendingMessages; - expect(entry.status).toBe("sent"); - expect(entry.errorMessage).toBeUndefined(); - }); - - it("marks the oldest sending message in a conversation as sent", () => { - const store = useOptimisticUserMessageStore.getState(); - const firstId = store.enqueuePendingMessage({ - conversationId: CONVO, - text: "first", - }); - store.enqueuePendingMessage({ conversationId: CONVO, text: "second" }); - store.enqueuePendingMessage({ conversationId: "conv-b", text: "other" }); - - const marked = store.markOldestSendingMessageSent(CONVO); - - expect(marked?.id).toBe(firstId); - const pending = useOptimisticUserMessageStore.getState().pendingMessages; - expect(pending.map((message) => message.status)).toEqual([ - "sent", - "sending", - "sending", - ]); - }); - it("enqueue stores `content` separately from `text` and defaults it to `text`", () => { const store = useOptimisticUserMessageStore.getState(); const idA = store.enqueuePendingMessage({ @@ -151,22 +117,6 @@ describe("optimistic-user-message-store", () => { expect(remaining[0].id).toBe(firstId); }); - it("consumeMatchingPendingMessage removes a sent optimistic message when the echo arrives", () => { - const store = useOptimisticUserMessageStore.getState(); - const id = store.enqueuePendingMessage({ - conversationId: CONVO, - text: "accepted", - }); - store.markPendingMessageSent(id); - - const consumed = store.consumeMatchingPendingMessage(CONVO, "accepted"); - - expect(consumed?.id).toBe(id); - expect( - useOptimisticUserMessageStore.getState().pendingMessages, - ).toHaveLength(0); - }); - it("consumeMatchingPendingMessage falls back to oldest sending entry when no exact match exists", () => { const store = useOptimisticUserMessageStore.getState(); const firstId = store.enqueuePendingMessage({ @@ -269,21 +219,6 @@ describe("optimistic-user-message-store", () => { expect(entry.errorMessage).toBe("Send timed out"); }); - it("watchdog timeout does nothing if the message was already marked sent", () => { - const store = useOptimisticUserMessageStore.getState(); - const id = store.enqueuePendingMessage({ - conversationId: CONVO, - text: "accepted", - }); - store.markPendingMessageSent(id); - - vi.advanceTimersByTime(PENDING_MESSAGE_TIMEOUT_MS); - - const [entry] = useOptimisticUserMessageStore.getState().pendingMessages; - expect(entry.id).toBe(id); - expect(entry.status).toBe("sent"); - }); - it("watchdog timeout does nothing if the echo already consumed the message", () => { const store = useOptimisticUserMessageStore.getState(); store.enqueuePendingMessage({ conversationId: CONVO, text: "fast" }); diff --git a/__tests__/utils/optimistic-user-message-events.test.ts b/__tests__/utils/optimistic-user-message-events.test.ts deleted file mode 100644 index 81c8d433a..000000000 --- a/__tests__/utils/optimistic-user-message-events.test.ts +++ /dev/null @@ -1,96 +0,0 @@ -import { describe, expect, it } from "vitest"; -import { - MessageEvent, - OpenHandsEvent, - SecurityRisk, -} from "#/types/agent-server/core"; -import { PendingUserMessage } from "#/stores/optimistic-user-message-store"; -import { - isOptimisticUserMessageEvent, - mergeOptimisticUserMessages, -} from "#/utils/optimistic-user-message-events"; - -describe("optimistic-user-message-events", () => { - const userEvent: MessageEvent = { - id: "server-user", - timestamp: "2026-05-24T09:11:33.000Z", - source: "user", - llm_message: { - role: "user", - content: [{ type: "text", text: "server message" }], - }, - activated_microagents: [], - extended_content: [], - }; - - const agentEvent: OpenHandsEvent = { - id: "agent-event", - timestamp: "2026-05-24T09:11:35.000Z", - source: "agent", - thought: [], - thinking_blocks: [], - action: { - kind: "FinishAction", - message: "done", - }, - tool_name: "finish", - tool_call_id: "finish-call", - tool_call: { - id: "finish-call", - type: "function", - function: { - name: "finish", - arguments: JSON.stringify({ message: "done" }), - }, - }, - llm_response_id: "response", - security_risk: SecurityRisk.UNKNOWN, - }; - - const pendingMessage: PendingUserMessage = { - id: "pending-1", - conversationId: "conv-1", - text: "optimistic message", - content: "optimistic message", - status: "sending", - imageUrls: [], - fileUrls: [], - timestamp: "2026-05-24T09:11:34.000Z", - }; - - it("inserts pending messages into timestamp order", () => { - const merged = mergeOptimisticUserMessages( - [userEvent, agentEvent], - [pendingMessage], - ); - - expect(merged.map((event) => event.id)).toEqual([ - "server-user", - "pending-1", - "agent-event", - ]); - expect(isOptimisticUserMessageEvent(merged[1])).toBe(true); - }); - - it("carries pending status into the optimistic event", () => { - const [event] = mergeOptimisticUserMessages([], [ - { ...pendingMessage, status: "sent" }, - ]); - - expect(isOptimisticUserMessageEvent(event)).toBe(true); - if (isOptimisticUserMessageEvent(event)) { - expect(event.optimisticPendingStatus).toBe("sent"); - expect(event.llm_message.content).toEqual([ - { type: "text", text: "optimistic message" }, - ]); - } - }); - - it("does not duplicate an event that already exists", () => { - const merged = mergeOptimisticUserMessages([userEvent], [ - { ...pendingMessage, id: userEvent.id }, - ]); - - expect(merged).toEqual([userEvent]); - }); -}); diff --git a/src/components/conversation-events/chat/event-message-components/user-assistant-event-message.tsx b/src/components/conversation-events/chat/event-message-components/user-assistant-event-message.tsx index 6224e3b0c..5727ddc51 100644 --- a/src/components/conversation-events/chat/event-message-components/user-assistant-event-message.tsx +++ b/src/components/conversation-events/chat/event-message-components/user-assistant-event-message.tsx @@ -4,10 +4,6 @@ import { ChatMessage } from "../../../features/chat/chat-message"; import { ImageCarousel } from "../../../features/images/image-carousel"; import { ConversationConfirmationButtons } from "#/components/shared/buttons/conversation-confirmation-buttons"; import { parseMessageFromEvent } from "../event-content-helpers/parse-message-from-event"; -import { isOptimisticUserMessageEvent } from "#/utils/optimistic-user-message-events"; -import { useOptimisticUserMessageStore } from "#/stores/optimistic-user-message-store"; -import { useSendMessage } from "#/hooks/use-send-message"; -import { createChatMessage } from "#/services/chat-service"; interface UserAssistantEventMessageProps { event: MessageEvent; @@ -20,17 +16,7 @@ export function UserAssistantEventMessage({ isLastMessage, isFromPlanningAgent, }: UserAssistantEventMessageProps) { - const markPendingMessageError = useOptimisticUserMessageStore( - (state) => state.markPendingMessageError, - ); - const markPendingMessageSending = useOptimisticUserMessageStore( - (state) => state.markPendingMessageSending, - ); - const { send } = useSendMessage(); const message = parseMessageFromEvent(event); - const pendingStatus = isOptimisticUserMessageEvent(event) - ? event.optimisticPendingStatus - : undefined; const imageUrls: string[] = []; if (Array.isArray(event.llm_message.content)) { @@ -41,41 +27,11 @@ export function UserAssistantEventMessage({ }); } - const handleRetry = React.useCallback(async () => { - if (!isOptimisticUserMessageEvent(event)) return; - - const pendingMessage = useOptimisticUserMessageStore - .getState() - .pendingMessages.find( - (entry) => entry.id === event.optimisticPendingMessageId, - ); - if (!pendingMessage) return; - - markPendingMessageSending(pendingMessage.id); - - try { - await send( - createChatMessage( - pendingMessage.content, - pendingMessage.imageUrls, - pendingMessage.fileUrls, - pendingMessage.timestamp, - ), - ); - } catch (error) { - const errorMessage = - error instanceof Error ? error.message : "Failed to send message"; - markPendingMessageError(pendingMessage.id, errorMessage); - } - }, [event, markPendingMessageError, markPendingMessageSending, send]); - return ( {imageUrls.length > 0 && ( diff --git a/src/components/conversation-events/chat/messages.tsx b/src/components/conversation-events/chat/messages.tsx index 28d57b134..b18e00e31 100644 --- a/src/components/conversation-events/chat/messages.tsx +++ b/src/components/conversation-events/chat/messages.tsx @@ -8,7 +8,6 @@ import { ThoughtEventMessage } from "./event-message-components/thought-event-me import { useModelStore } from "#/stores/model-store"; import { ModelMessages } from "#/components/features/chat/model-messages"; import { useOptionalConversationId } from "#/hooks/use-conversation-id"; -import { isOptimisticUserMessageEvent } from "#/utils/optimistic-user-message-events"; // TODO: Implement microagent functionality for V1 when APIs support V1 event IDs // import { AgentState } from "#/types/agent-state"; // import MemoryIcon from "#/icons/memory_icon.svg?react"; @@ -20,15 +19,6 @@ interface MessagesProps { const getLastEventId = (events: OpenHandsEvent[]) => events.at(-1)?.id; -const getEventListSignature = (events: OpenHandsEvent[]) => - events - .map((event) => - isOptimisticUserMessageEvent(event) - ? `${event.id}:${event.optimisticPendingStatus}` - : String(event.id), - ) - .join("|"); - export const Messages: React.FC = React.memo( ({ messages, allEvents }) => { const { conversationId } = useOptionalConversationId(); @@ -142,8 +132,6 @@ export const Messages: React.FC = React.memo( (prevProps, nextProps) => prevProps.messages.length === nextProps.messages.length && prevProps.allEvents.length === nextProps.allEvents.length && - getEventListSignature(prevProps.messages) === - getEventListSignature(nextProps.messages) && getLastEventId(prevProps.messages) === getLastEventId(nextProps.messages) && getLastEventId(prevProps.allEvents) === getLastEventId(nextProps.allEvents), ); diff --git a/src/components/features/chat/chat-interface.tsx b/src/components/features/chat/chat-interface.tsx index b40575aca..764c21773 100644 --- a/src/components/features/chat/chat-interface.tsx +++ b/src/components/features/chat/chat-interface.tsx @@ -28,13 +28,13 @@ import { useOptimisticUserMessageStore } from "#/stores/optimistic-user-message- import { SERVER_CONNECTION_ERROR_MESSAGE } from "#/constants/server-connection-error"; import { ErrorMessageBanner } from "./error-message-banner"; import { Messages } from "#/components/conversation-events/chat/messages"; +import { PendingUserMessages } from "./pending-user-messages"; import { useUnifiedUploadFiles } from "#/hooks/mutation/use-unified-upload-files"; import { validateFiles } from "#/utils/file-validation"; import { useConversationStore } from "#/stores/conversation-store"; import ConfirmationModeEnabled from "./confirmation-mode-enabled"; import { useTaskPolling } from "#/hooks/query/use-task-polling"; import { matchesPendingConversationId } from "#/utils/pending-task-message-link"; -import { mergeOptimisticUserMessages } from "#/utils/optimistic-user-message-events"; import { useConversationWebSocket } from "#/contexts/conversation-websocket-context"; import ChatStatusIndicator from "./chat-status-indicator"; import { getStatusColor, getStatusText } from "#/utils/utils"; @@ -225,24 +225,6 @@ export function ChatInterface() { [pendingMessages, conversationId], ); - const visiblePendingMessages = React.useMemo( - () => - conversationId - ? pendingMessages.filter((message) => - matchesPendingConversationId( - conversationId, - message.conversationId, - ), - ) - : [], - [pendingMessages, conversationId], - ); - - const renderableEventsWithPending = React.useMemo( - () => mergeOptimisticUserMessages(renderableEvents, visiblePendingMessages), - [renderableEvents, visiblePendingMessages], - ); - // Show V1 messages immediately if events exist in store (e.g., remount), // if the user already has a locally-tracked pending bubble (home-page cloud // submit while history/WS catch up), or once loading completes. This @@ -393,11 +375,7 @@ export function ChatInterface() { } // Note: We intentionally exclude autoScroll from deps because we only want // to scroll when message content changes, not when autoScroll state changes. - }, [ - renderableEventsWithPending.length, - hasPendingUserMessages, - scrollDomToBottom, - ]); + }, [renderableEvents.length, hasPendingUserMessages, scrollDomToBottom]); // Auto-load older events when the chat content doesn't overflow the // scroll area (no scrollbar to drag, no wheel events past 0). We @@ -417,7 +395,7 @@ export function ChatInterface() { const target = scrollRef.current; if (!target) return; maybeLoadOlderRef.current(target); - }, [renderableEventsWithPending.length, hasMoreOlderEvents]); + }, [renderableEvents.length, hasMoreOlderEvents]); // Create a ScrollProvider with the scroll hook values const scrollProviderValue = { @@ -519,13 +497,22 @@ export function ChatInterface() { anchored to `null` and live above the message list. */} - {showConversationMessages && - renderableEventsWithPending.length > 0 && ( - - )} + {showConversationMessages && renderableEvents.length > 0 && ( + + )} + + {/* + Render the local pending-message queue independently so messages + the user just submitted show up immediately (with a faded "sending" + treatment) even before any real conversation event has come back + from the server. Entries drain (FIFO) when the matching + UserMessageEvent echoes back over the WebSocket, so this never + double-renders alongside the real event list. + */} +
diff --git a/src/components/features/chat/pending-user-messages.tsx b/src/components/features/chat/pending-user-messages.tsx index cf0d6b90c..9efeb51b5 100644 --- a/src/components/features/chat/pending-user-messages.tsx +++ b/src/components/features/chat/pending-user-messages.tsx @@ -57,7 +57,7 @@ export function PendingUserMessages() { try { await send( createChatMessage( - message.content, + message.text, message.imageUrls, message.fileUrls, message.timestamp, @@ -83,7 +83,7 @@ export function PendingUserMessages() { key={message.id} type="user" message={message.text} - pendingStatus={message.status === "sent" ? undefined : message.status} + pendingStatus={message.status} onRetry={ message.status === "error" ? () => handleRetry(message.id) diff --git a/src/contexts/conversation-websocket-context.tsx b/src/contexts/conversation-websocket-context.tsx index e5ea96135..8e7f25be2 100644 --- a/src/contexts/conversation-websocket-context.tsx +++ b/src/contexts/conversation-websocket-context.tsx @@ -137,9 +137,6 @@ export function ConversationWebSocketProvider({ const consumeMatchingPendingMessage = useOptimisticUserMessageStore( (state) => state.consumeMatchingPendingMessage, ); - const markOldestSendingMessageSent = useOptimisticUserMessageStore( - (state) => state.markOldestSendingMessageSent, - ); const { setExecutionStatus } = useConversationStateStore(); const { appendInput, appendOutput } = useCommandStore(); @@ -458,12 +455,6 @@ export function ConversationWebSocketProvider({ // Clear draft from localStorage - message was successfully delivered setConversationState(conversationId, { draftMessage: null }); } - } else if ( - conversationId && - (event.source === "agent" || isConversationStateUpdateEvent(event)) - ) { - markOldestSendingMessageSent(conversationId); - setConversationState(conversationId, { draftMessage: null }); } // Handle cache invalidation for ActionEvent @@ -559,7 +550,6 @@ export function ConversationWebSocketProvider({ addEvent, setErrorMessage, consumeMatchingPendingMessage, - markOldestSendingMessageSent, queryClient, conversationId, setExecutionStatus, @@ -645,12 +635,6 @@ export function ConversationWebSocketProvider({ ); setConversationState(conversationId, { draftMessage: null }); } - } else if ( - conversationId && - (event.source === "agent" || isConversationStateUpdateEvent(event)) - ) { - markOldestSendingMessageSent(conversationId); - setConversationState(conversationId, { draftMessage: null }); } // Handle cache invalidation for ActionEvent @@ -740,7 +724,6 @@ export function ConversationWebSocketProvider({ expectedEventCountPlanning, setErrorMessage, consumeMatchingPendingMessage, - markOldestSendingMessageSent, queryClient, subConversations, conversationId, diff --git a/src/stores/optimistic-user-message-store.ts b/src/stores/optimistic-user-message-store.ts index 1d87d67e2..69490d617 100644 --- a/src/stores/optimistic-user-message-store.ts +++ b/src/stores/optimistic-user-message-store.ts @@ -1,6 +1,6 @@ import { create } from "zustand"; -export type PendingUserMessageStatus = "sending" | "sent" | "error"; +export type PendingUserMessageStatus = "sending" | "error"; /** * How long a pending message is allowed to stay in "sending" state before we @@ -64,8 +64,6 @@ interface OptimisticUserMessageActions { enqueuePendingMessage: (payload: EnqueuePendingMessagePayload) => string; /** Mark a pending message as failed (the API rejected it). */ markPendingMessageError: (id: string, errorMessage?: string) => void; - /** Mark a pending message as accepted by the server. */ - markPendingMessageSent: (id: string) => void; /** Mark a pending message as sending again (used when retrying). */ markPendingMessageSending: (id: string) => void; /** Drop a pending message from the queue (e.g., after success/cancellation). */ @@ -84,14 +82,6 @@ interface OptimisticUserMessageActions { conversationId: string, content: string, ) => PendingUserMessage | null; - /** - * Mark the oldest still-sending message in the conversation as accepted by - * the server. Used when the server streams agent work but does not echo the - * user message event on the live WebSocket path. - */ - markOldestSendingMessageSent: ( - conversationId: string, - ) => PendingUserMessage | null; /** Wipe all queued messages (e.g., when changing conversations). */ clearPendingMessages: () => void; /** @@ -160,15 +150,6 @@ export const useOptimisticUserMessageStore = create( ), })), - markPendingMessageSent: (id) => - set((state) => ({ - pendingMessages: state.pendingMessages.map((message) => - message.id === id - ? { ...message, status: "sent", errorMessage: undefined } - : message, - ), - })), - markPendingMessageSending: (id) => set((state) => ({ pendingMessages: state.pendingMessages.map((message) => @@ -196,15 +177,15 @@ export const useOptimisticUserMessageStore = create( // case. let consumed: PendingUserMessage | null = null; set((state) => { - const confirmable = state.pendingMessages + const sending = state.pendingMessages .map((m, i) => ({ m, i })) .filter( ({ m }) => - m.status !== "error" && m.conversationId === conversationId, + m.status === "sending" && m.conversationId === conversationId, ); - if (confirmable.length === 0) return state; - const exact = confirmable.find(({ m }) => m.content === content); - const target = exact ?? confirmable[0]; + if (sending.length === 0) return state; + const exact = sending.find(({ m }) => m.content === content); + const target = exact ?? sending[0]; consumed = target.m; return { pendingMessages: [ @@ -216,28 +197,6 @@ export const useOptimisticUserMessageStore = create( return consumed; }, - markOldestSendingMessageSent: (conversationId) => { - let marked: PendingUserMessage | null = null; - set((state) => { - const target = state.pendingMessages.find( - (message) => - message.status === "sending" && - message.conversationId === conversationId, - ); - if (!target) return state; - - marked = target; - return { - pendingMessages: state.pendingMessages.map((message) => - message.id === target.id - ? { ...message, status: "sent", errorMessage: undefined } - : message, - ), - }; - }); - return marked; - }, - clearPendingMessages: () => set(() => ({ ...initialState })), reassignPendingMessages: (fromConversationId, toConversationId) => diff --git a/src/utils/optimistic-user-message-events.ts b/src/utils/optimistic-user-message-events.ts deleted file mode 100644 index fae289f0c..000000000 --- a/src/utils/optimistic-user-message-events.ts +++ /dev/null @@ -1,68 +0,0 @@ -import { MessageEvent, OpenHandsEvent } from "#/types/agent-server/core"; -import { - PendingUserMessage, - PendingUserMessageStatus, -} from "#/stores/optimistic-user-message-store"; - -export type OptimisticUserMessageEvent = MessageEvent & { - optimisticPendingMessageId: string; - optimisticPendingStatus: PendingUserMessageStatus; - optimisticPendingErrorMessage?: string; -}; - -export const isOptimisticUserMessageEvent = ( - event: OpenHandsEvent, -): event is OptimisticUserMessageEvent => - "optimisticPendingMessageId" in event && - typeof event.optimisticPendingMessageId === "string"; - -const compareTimestamps = (a: string | undefined, b: string | undefined) => { - if (!a && !b) return 0; - if (!a) return 1; - if (!b) return -1; - return a.localeCompare(b); -}; - -const toOptimisticUserMessageEvent = ( - message: PendingUserMessage, -): OptimisticUserMessageEvent => ({ - id: message.id, - timestamp: message.timestamp, - source: "user", - llm_message: { - role: "user", - content: [ - { type: "text", text: message.text }, - ...(message.imageUrls.length > 0 - ? [{ type: "image" as const, image_urls: message.imageUrls }] - : []), - ], - }, - activated_microagents: [], - extended_content: [], - optimisticPendingMessageId: message.id, - optimisticPendingStatus: message.status, - optimisticPendingErrorMessage: message.errorMessage, -}); - -export const mergeOptimisticUserMessages = ( - events: OpenHandsEvent[], - pendingMessages: PendingUserMessage[], -): OpenHandsEvent[] => { - if (pendingMessages.length === 0) { - return events; - } - - const eventIds = new Set(events.map((event) => event.id)); - const optimisticEvents = pendingMessages - .filter((message) => !eventIds.has(message.id)) - .map(toOptimisticUserMessageEvent); - - if (optimisticEvents.length === 0) { - return events; - } - - return [...events, ...optimisticEvents].sort((a, b) => - compareTimestamps(a.timestamp, b.timestamp), - ); -}; From f01811659173f3f1e68760c2834542c1cdaf5c51 Mon Sep 17 00:00:00 2001 From: openhands Date: Sun, 24 May 2026 17:52:27 +0000 Subject: [PATCH 7/7] test: stabilize home page snapshot setup (#753) Co-authored-by: openhands --- tests/e2e/snapshots/settings-page.snapshot.spec.ts | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/tests/e2e/snapshots/settings-page.snapshot.spec.ts b/tests/e2e/snapshots/settings-page.snapshot.spec.ts index 8190066e2..3197b392b 100644 --- a/tests/e2e/snapshots/settings-page.snapshot.spec.ts +++ b/tests/e2e/snapshots/settings-page.snapshot.spec.ts @@ -92,6 +92,14 @@ async function setupMocks(page: Page, showConsentModal = false) { }); }); + await page.route("**/api/workspaces**", async (route) => { + await route.fulfill({ + status: 200, + contentType: "application/json", + body: JSON.stringify({ workspaces: [], workspaceParents: [] }), + }); + }); + await page.route("**/api/bash/execute_bash_command", async (route) => { await route.fulfill({ status: 200, @@ -161,7 +169,7 @@ test.describe("UI Visual Snapshots", () => { await dismissConsentModal(page); const homeScreen = page.getByTestId("home-screen"); - await expect(homeScreen).toBeVisible(); + await expect(homeScreen).toBeVisible({ timeout: 15_000 }); await page.waitForLoadState("networkidle"); const rootLayout = page.getByTestId("root-layout");