Skip to content

Commit f65ca69

Browse files
feat: refactor streaming to reduce complexity (pure front-end) (#91)
Co-authored-by: Cursor Agent <cursoragent@cursor.com>
1 parent 11a51df commit f65ca69

51 files changed

Lines changed: 10239 additions & 935 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.
Lines changed: 389 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,389 @@
1+
# Streaming Architecture Documentation
2+
3+
This document describes the refactored streaming architecture for the PaperDebugger chat system.
4+
5+
## Overview
6+
7+
The streaming system handles real-time message delivery from the server to the client, managing state transitions, error recovery, and UI updates. The architecture has been redesigned to be more maintainable, testable, and extensible.
8+
9+
## Architecture Diagram
10+
11+
```
12+
┌────────────────────────────────────────────────────────────────────────────┐
13+
│ useSendMessageStream Hook │
14+
│ (Orchestrator - Single Responsibility) │
15+
│ - Builds stream requests using buildStreamRequest() │
16+
│ - Maps responses using mapResponseToStreamEvent() │
17+
│ - Delegates event handling to state machine │
18+
└─────────────────────────────────┬──────────────────────────────────────────┘
19+
20+
21+
┌────────────────────────────────────────────────────────────────────────────┐
22+
│ StreamingStateMachine │
23+
│ (Zustand Store + Event Handler) │
24+
│ State: idle | receiving | finalizing | error │
25+
│ Actions: handleEvent(), reset() │
26+
│ Data: streamingMessage, incompleteIndicator │
27+
└─────────────────────────────────┬──────────────────────────────────────────┘
28+
29+
┌────────────────────────┼────────────────────────┐
30+
│ │ │
31+
▼ ▼ ▼
32+
┌─────────────────┐ ┌──────────────────┐ ┌─────────────────────┐
33+
│ MessageType │ │ Error │ │ Conversation │
34+
│ Handlers │ │ Handler │ │ Store │
35+
│ (Registry) │ │ (Recovery) │ │ (Persistence) │
36+
└─────────────────┘ └──────────────────┘ └─────────────────────┘
37+
│ │ │
38+
│ │ │
39+
└────────────────────────┼────────────────────────┘
40+
41+
42+
┌────────────────────────────────────────────────────────────────────────────┐
43+
│ MessageStore │
44+
│ (Unified Display Messages) │
45+
│ - Subscribes to ConversationStore (finalized messages) │
46+
│ - Subscribes to StreamingStateMachine (streaming messages) │
47+
│ - Provides getAllDisplayMessages() for UI components │
48+
└─────────────────────────────────┬──────────────────────────────────────────┘
49+
50+
51+
┌────────────────────────────────────────────────────────────────────────────┐
52+
│ UI Components │
53+
│ (ChatBody, MessageCard, etc.) │
54+
│ - Consume DisplayMessage type directly │
55+
│ - No knowledge of streaming internals │
56+
└────────────────────────────────────────────────────────────────────────────┘
57+
```
58+
59+
## Core Components
60+
61+
### 1. StreamingStateMachine (`stores/streaming/streaming-state-machine.ts`)
62+
63+
The central hub for all streaming state management. Implements a state machine pattern with the following states:
64+
65+
| State | Description | Valid Transitions |
66+
|-------|-------------|-------------------|
67+
| `idle` | No active stream |`receiving` (on INIT) |
68+
| `receiving` | Actively receiving stream data |`finalizing`, `error` |
69+
| `finalizing` | Flushing data to conversation store |`idle` |
70+
| `error` | Error occurred during streaming |`idle` (on reset) |
71+
72+
#### Event Types
73+
74+
```typescript
75+
type StreamEvent =
76+
| { type: "INIT"; payload: StreamInitialization }
77+
| { type: "PART_BEGIN"; payload: StreamPartBegin }
78+
| { type: "CHUNK"; payload: MessageChunk }
79+
| { type: "REASONING_CHUNK"; payload: ReasoningChunk }
80+
| { type: "PART_END"; payload: StreamPartEnd }
81+
| { type: "FINALIZE"; payload: StreamFinalization }
82+
| { type: "ERROR"; payload: StreamError }
83+
| { type: "INCOMPLETE"; payload: IncompleteIndicator }
84+
| { type: "CONNECTION_ERROR"; payload: Error };
85+
```
86+
87+
### 2. MessageTypeHandlers (`stores/streaming/message-type-handlers.ts`)
88+
89+
A registry of handlers for different message types. Adding a new message type only requires:
90+
1. Creating a new handler class implementing `MessageTypeHandler`
91+
2. Registering it in the `messageTypeHandlers` registry
92+
93+
Available handlers:
94+
- `AssistantHandler` - Handles assistant messages
95+
- `ToolCallPrepareHandler` - Handles tool call argument streaming
96+
- `ToolCallHandler` - Handles completed tool calls
97+
- `NoOpHandler` - For types that don't require streaming handling
98+
99+
### 3. ErrorHandler (`stores/streaming/error-handler.ts`)
100+
101+
Centralized error handling with configurable recovery strategies:
102+
103+
| Error Code | Strategy | Behavior |
104+
|------------|----------|----------|
105+
| `PROJECT_OUT_OF_DATE` | sync-and-retry | Sync project, then retry |
106+
| `NETWORK_ERROR` | retry | Exponential backoff, 3 attempts |
107+
| `TIMEOUT` | retry | Linear backoff, 2 attempts |
108+
| `RATE_LIMITED` | retry | Exponential backoff, 3 attempts |
109+
| `SERVER_ERROR` | retry | Exponential backoff, 2 attempts |
110+
| `INVALID_RESPONSE` | show-error | Display error toast |
111+
| `AUTHENTICATION_ERROR` | show-error | Display error, require re-auth |
112+
| `UNKNOWN` | show-error | Display generic error |
113+
114+
### 4. MessageStore (`stores/message-store.ts`)
115+
116+
Unified store that combines finalized and streaming messages:
117+
118+
```typescript
119+
interface MessageStore {
120+
// State
121+
messages: Message[]; // Finalized from API
122+
streamingEntries: InternalMessage[]; // Currently streaming
123+
124+
// Computed
125+
getAllDisplayMessages(): DisplayMessage[];
126+
getVisibleDisplayMessages(): DisplayMessage[];
127+
128+
// Helpers
129+
hasStreamingMessages(): boolean;
130+
isWaitingForResponse(): boolean;
131+
}
132+
```
133+
134+
## Data Types
135+
136+
### InternalMessage
137+
138+
The canonical internal message format used throughout the application:
139+
140+
```typescript
141+
type InternalMessage =
142+
| UserMessage
143+
| AssistantMessage
144+
| ToolCallMessage
145+
| ToolCallPrepareMessage
146+
| SystemMessage
147+
| UnknownMessage;
148+
149+
interface UserMessage {
150+
id: string;
151+
role: "user";
152+
status: MessageStatus;
153+
data: {
154+
content: string;
155+
selectedText?: string;
156+
surrounding?: string;
157+
};
158+
}
159+
160+
// Similar structures for other message types...
161+
```
162+
163+
### DisplayMessage
164+
165+
UI-friendly message format:
166+
167+
```typescript
168+
interface DisplayMessage {
169+
id: string;
170+
type: "user" | "assistant" | "toolCall" | "toolCallPrepare" | "error";
171+
status: "streaming" | "complete" | "error" | "stale";
172+
content: string;
173+
// Role-specific optional fields
174+
selectedText?: string;
175+
reasoning?: string;
176+
toolName?: string;
177+
toolArgs?: string;
178+
toolResult?: string;
179+
toolError?: string;
180+
}
181+
```
182+
183+
## Data Flow
184+
185+
### Happy Path: User Message → Response
186+
187+
```
188+
1. User submits message
189+
└── useSendMessageStream.sendMessageStream(message, selectedText)
190+
└── buildStreamRequest() → API request
191+
192+
2. User message added to streaming state
193+
└── StreamingStateMachine.setState({ streamingMessage: { parts: [userMessage] } })
194+
195+
3. API stream begins
196+
└── Server sends: streamInitialization
197+
└── INIT event → Finalize user message, flush to conversation
198+
199+
4. Assistant response streams
200+
└── Server sends: streamPartBegin (assistant)
201+
└── PART_BEGIN event → Create streaming assistant message
202+
└── Server sends: messageChunk (delta: "Hello")
203+
└── CHUNK event → Append to assistant content
204+
└── Server sends: messageChunk (delta: " World")
205+
└── CHUNK event → Append to assistant content
206+
└── Server sends: streamPartEnd (assistant)
207+
└── PART_END event → Mark as complete
208+
209+
5. Stream completes
210+
└── Server sends: streamFinalization
211+
└── FINALIZE event → Flush to conversation, reset streaming state
212+
```
213+
214+
### Error Recovery Flow
215+
216+
```
217+
1. Error occurs during streaming
218+
└── Server sends: streamError or connection fails
219+
└── ERROR/CONNECTION_ERROR event
220+
221+
2. ErrorHandler evaluates strategy
222+
└── createStreamingError() → Categorize error
223+
└── getRecoveryStrategy() → Determine recovery approach
224+
225+
3. Execute recovery
226+
└── retry: Attempt operation again with backoff
227+
└── sync-and-retry: Sync project first, then retry
228+
└── show-error: Display toast to user
229+
└── abort: Stop processing
230+
```
231+
232+
## File Structure
233+
234+
```
235+
src/
236+
├── stores/
237+
│ ├── streaming/
238+
│ │ ├── index.ts # Module exports
239+
│ │ ├── types.ts # Type definitions
240+
│ │ ├── streaming-state-machine.ts # Main state machine
241+
│ │ ├── message-type-handlers.ts # Handler registry
242+
│ │ ├── error-handler.ts # Error handling
243+
│ │ └── __tests__/ # Unit tests
244+
│ │ ├── streaming-state-machine.test.ts
245+
│ │ ├── message-type-handlers.test.ts
246+
│ │ └── error-handler.test.ts
247+
│ ├── message-store.ts # Unified message store
248+
│ ├── converters.ts # Store-level converters
249+
│ └── types.ts # Store types (DisplayMessage)
250+
├── types/
251+
│ └── message.ts # InternalMessage definitions
252+
├── utils/
253+
│ ├── message-converters.ts # API ↔ Internal converters
254+
│ ├── stream-request-builder.ts # Request building
255+
│ ├── stream-event-mapper.ts # Response → Event mapping
256+
│ └── __tests__/
257+
│ └── message-converters.test.ts
258+
├── hooks/
259+
│ └── useSendMessageStream.ts # Main orchestration hook
260+
└── __tests__/
261+
└── streaming-flow.integration.test.ts
262+
```
263+
264+
## Extension Points
265+
266+
### Adding a New Message Type
267+
268+
1. Define the type in `types/message.ts`:
269+
```typescript
270+
export interface NewMessageData {
271+
// type-specific data
272+
}
273+
274+
export interface NewMessage extends InternalMessageBase {
275+
role: "newType";
276+
data: NewMessageData;
277+
}
278+
279+
// Update InternalMessage union
280+
export type InternalMessage = ... | NewMessage;
281+
```
282+
283+
2. Create handler in `message-type-handlers.ts`:
284+
```typescript
285+
class NewTypeHandler implements MessageTypeHandler {
286+
onPartBegin(partBegin: StreamPartBegin): InternalMessage | null {
287+
// Create streaming message
288+
}
289+
290+
onPartEnd(partEnd: StreamPartEnd, existing: InternalMessage): InternalMessage | null {
291+
// Finalize message
292+
}
293+
}
294+
295+
// Register in messageTypeHandlers
296+
```
297+
298+
3. Add converters in `utils/message-converters.ts`:
299+
```typescript
300+
// In fromApiMessage()
301+
case "newType":
302+
return { /* conversion */ };
303+
304+
// In toApiMessage()
305+
case "newType":
306+
return fromJson(MessageSchema, { /* conversion */ });
307+
```
308+
309+
### Adding a New Error Type
310+
311+
1. Add error code in `stores/streaming/types.ts`:
312+
```typescript
313+
export type StreamingErrorCode =
314+
| ...
315+
| "NEW_ERROR_TYPE";
316+
```
317+
318+
2. Add detection in `error-handler.ts`:
319+
```typescript
320+
function detectErrorCodeFromMessage(message: string): StreamingErrorCode {
321+
if (message.includes("new error pattern")) {
322+
return "NEW_ERROR_TYPE";
323+
}
324+
// ...
325+
}
326+
```
327+
328+
3. Configure recovery strategy:
329+
```typescript
330+
const DEFAULT_STRATEGIES: Record<StreamingErrorCode, RecoveryStrategy> = {
331+
NEW_ERROR_TYPE: {
332+
type: "retry",
333+
maxAttempts: 2,
334+
backoff: "exponential",
335+
delayMs: 1000,
336+
},
337+
// ...
338+
};
339+
```
340+
341+
## Testing
342+
343+
### Running Tests
344+
345+
```bash
346+
# Run all tests
347+
bun test
348+
349+
# Run specific test file
350+
bun test src/stores/streaming/__tests__/streaming-state-machine.test.ts
351+
352+
# Run with watch mode
353+
bun test --watch
354+
```
355+
356+
### Test Coverage
357+
358+
- **Unit Tests**: State machine, handlers, error handler, converters
359+
- **Integration Tests**: Complete streaming flows, error scenarios
360+
361+
## Performance Considerations
362+
363+
1. **Sequence Numbers**: Each update increments a sequence number, allowing React to detect changes efficiently.
364+
365+
2. **Computed Selectors**: `getAllDisplayMessages()` is computed and cached, only recomputing when source data changes.
366+
367+
3. **Subscription-based Updates**: MessageStore subscribes to source stores, avoiding polling.
368+
369+
4. **No flushSync**: The architecture uses natural React batching, eliminating the need for `flushSync`.
370+
371+
## Troubleshooting
372+
373+
### Messages not appearing
374+
375+
1. Check that the state machine is receiving events (add logging to `handleEvent`)
376+
2. Verify MessageStore subscriptions are initialized
377+
3. Check DisplayMessage conversion in `toDisplayMessage()`
378+
379+
### Stale messages after error
380+
381+
1. Verify error event is being dispatched
382+
2. Check that the error handler is marking messages as stale
383+
3. Ensure UI is properly rendering stale status
384+
385+
### Retry not working
386+
387+
1. Check error code detection in `createStreamingError()`
388+
2. Verify recovery strategy is configured for the error type
389+
3. Ensure sync/retry callbacks are provided to error handler

0 commit comments

Comments
 (0)