Skip to content

Commit 628da37

Browse files
committed
fix(chat): prevent useChat resume from hanging on completed turns
Add isStreaming flag to session state — set true when streaming starts, false on turn-complete. reconnectToStream returns null immediately when isStreaming is false, so resume: true is safe to pass unconditionally.
1 parent 9604024 commit 628da37

File tree

2 files changed

+183
-10
lines changed

2 files changed

+183
-10
lines changed

packages/trigger-sdk/src/v3/chat.test.ts

Lines changed: 160 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -502,6 +502,145 @@ describe("TriggerChatTransport", () => {
502502

503503
expect(receivedChunks.length).toBeGreaterThan(0);
504504
});
505+
506+
it("should return null when session exists but isStreaming is false (TRI-8557)", async () => {
507+
// Simulate a session restored from DB after a completed turn
508+
const transport = new TriggerChatTransport({
509+
task: "my-task",
510+
accessToken: "token",
511+
sessions: {
512+
"chat-completed": {
513+
runId: "run_completed",
514+
publicAccessToken: "pub_token",
515+
lastEventId: "42",
516+
isStreaming: false,
517+
},
518+
},
519+
});
520+
521+
// reconnectToStream should return null immediately — no hanging
522+
const result = await transport.reconnectToStream({
523+
chatId: "chat-completed",
524+
});
525+
526+
expect(result).toBeNull();
527+
});
528+
529+
it("should reconnect when session exists and isStreaming is true", async () => {
530+
global.fetch = vi.fn().mockImplementation(async (url: string | URL) => {
531+
const urlStr = typeof url === "string" ? url : url.toString();
532+
533+
if (urlStr.includes("/realtime/v1/streams/")) {
534+
const chunks: UIMessageChunk[] = [
535+
{ type: "text-start", id: "part-1" },
536+
{ type: "text-delta", id: "part-1", delta: "Resumed!" },
537+
{ type: "text-end", id: "part-1" },
538+
];
539+
return new Response(createSSEStream(sseEncode(chunks)), {
540+
status: 200,
541+
headers: {
542+
"content-type": "text/event-stream",
543+
"X-Stream-Version": "v1",
544+
},
545+
});
546+
}
547+
548+
throw new Error(`Unexpected fetch URL: ${urlStr}`);
549+
});
550+
551+
const transport = new TriggerChatTransport({
552+
task: "my-task",
553+
accessToken: "token",
554+
baseURL: "https://api.test.trigger.dev",
555+
sessions: {
556+
"chat-streaming": {
557+
runId: "run_streaming",
558+
publicAccessToken: "pub_token",
559+
lastEventId: "10",
560+
isStreaming: true,
561+
},
562+
},
563+
});
564+
565+
const stream = await transport.reconnectToStream({
566+
chatId: "chat-streaming",
567+
});
568+
569+
expect(stream).toBeInstanceOf(ReadableStream);
570+
});
571+
572+
it("should set isStreaming to false via onSessionChange when turn completes", async () => {
573+
const sessionChanges: Array<{
574+
chatId: string;
575+
session: { isStreaming?: boolean } | null;
576+
}> = [];
577+
578+
global.fetch = vi.fn().mockImplementation(async (url: string | URL) => {
579+
const urlStr = typeof url === "string" ? url : url.toString();
580+
581+
if (urlStr.includes("/trigger")) {
582+
return new Response(JSON.stringify({ id: "run_streaming_flag" }), {
583+
status: 200,
584+
headers: {
585+
"content-type": "application/json",
586+
"x-trigger-jwt": "pub_token",
587+
},
588+
});
589+
}
590+
591+
if (urlStr.includes("/realtime/v1/streams/")) {
592+
const chunks = [
593+
{ type: "text-start", id: "part-1" },
594+
{ type: "text-delta", id: "part-1", delta: "Hi" },
595+
{ type: "text-end", id: "part-1" },
596+
{ type: "trigger:turn-complete", publicAccessToken: "refreshed_token" },
597+
];
598+
return new Response(createSSEStream(sseEncode(chunks)), {
599+
status: 200,
600+
headers: {
601+
"content-type": "text/event-stream",
602+
"X-Stream-Version": "v1",
603+
},
604+
});
605+
}
606+
607+
throw new Error(`Unexpected fetch URL: ${urlStr}`);
608+
});
609+
610+
const transport = new TriggerChatTransport({
611+
task: "my-task",
612+
accessToken: "token",
613+
baseURL: "https://api.test.trigger.dev",
614+
onSessionChange: (chatId, session) => {
615+
sessionChanges.push({ chatId, session });
616+
},
617+
});
618+
619+
const stream = await transport.sendMessages({
620+
trigger: "submit-message",
621+
chatId: "chat-flag-test",
622+
messageId: undefined,
623+
messages: [createUserMessage("Hello")],
624+
abortSignal: undefined,
625+
});
626+
627+
// Drain the stream
628+
const reader = stream.getReader();
629+
while (true) {
630+
const { done } = await reader.read();
631+
if (done) break;
632+
}
633+
634+
// Find the session changes for this chat
635+
const changes = sessionChanges.filter((c) => c.chatId === "chat-flag-test");
636+
637+
// First change: session created with isStreaming: true
638+
expect(changes[0]?.session?.isStreaming).toBe(true);
639+
640+
// Last change: turn completed, isStreaming: false
641+
const lastChange = changes[changes.length - 1];
642+
expect(lastChange?.session?.isStreaming).toBe(false);
643+
});
505644
});
506645

507646
describe("renewRunAccessToken", () => {
@@ -651,6 +790,15 @@ describe("TriggerChatTransport", () => {
651790
if (done) break;
652791
}
653792

793+
// Simulate mid-stream state (isStreaming must be true for reconnect to attempt)
794+
const session = transport.getSession("chat-fail-renew");
795+
transport.setOnSessionChange(undefined); // prevent side-effects
796+
// Re-seed with isStreaming: true to simulate reconnect during an active turn
797+
(transport as any).sessions.set("chat-fail-renew", {
798+
...session,
799+
isStreaming: true,
800+
});
801+
654802
const stream = await transport.reconnectToStream({ chatId: "chat-fail-renew" });
655803
const reader = stream!.getReader();
656804
await expect(reader.read()).rejects.toMatchObject({ status: 401 });
@@ -1013,13 +1161,21 @@ describe("TriggerChatTransport", () => {
10131161
const r2 = s2.getReader();
10141162
while (!(await r2.read()).done) {}
10151163

1016-
// Both sessions should be independently reconnectable
1164+
// Both sessions should exist but not be reconnectable (turns completed)
1165+
const sessionA = transport.getSession("session-a");
1166+
const sessionB = transport.getSession("session-b");
1167+
expect(sessionA).toBeDefined();
1168+
expect(sessionB).toBeDefined();
1169+
expect(sessionA!.isStreaming).toBe(false);
1170+
expect(sessionB!.isStreaming).toBe(false);
1171+
1172+
// Completed turns return null on reconnect (TRI-8557 fix)
10171173
const streamA = await transport.reconnectToStream({ chatId: "session-a" });
10181174
const streamB = await transport.reconnectToStream({ chatId: "session-b" });
10191175
const streamC = await transport.reconnectToStream({ chatId: "nonexistent" });
10201176

1021-
expect(streamA).toBeInstanceOf(ReadableStream);
1022-
expect(streamB).toBeInstanceOf(ReadableStream);
1177+
expect(streamA).toBeNull();
1178+
expect(streamB).toBeNull();
10231179
expect(streamC).toBeNull();
10241180
});
10251181
});
@@ -2060,6 +2216,7 @@ describe("TriggerChatTransport", () => {
20602216
runId: triggerRunId,
20612217
publicAccessToken: publicToken,
20622218
lastEventId: undefined,
2219+
isStreaming: true,
20632220
});
20642221

20652222
// Consume stream

packages/trigger-sdk/src/v3/chat.ts

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,7 @@ type TriggerChatTransportOptionsBase<TClientData = unknown> = {
173173
* });
174174
* ```
175175
*/
176-
sessions?: Record<string, { runId: string; publicAccessToken: string; lastEventId?: string }>;
176+
sessions?: Record<string, { runId: string; publicAccessToken: string; lastEventId?: string; isStreaming?: boolean }>;
177177

178178
/**
179179
* Called whenever a chat session's state changes.
@@ -203,7 +203,7 @@ type TriggerChatTransportOptionsBase<TClientData = unknown> = {
203203
*/
204204
onSessionChange?: (
205205
chatId: string,
206-
session: { runId: string; publicAccessToken: string; lastEventId?: string } | null
206+
session: { runId: string; publicAccessToken: string; lastEventId?: string; isStreaming?: boolean } | null
207207
) => void;
208208

209209
/**
@@ -336,6 +336,8 @@ type ChatSessionState = {
336336
lastEventId?: string;
337337
/** Set when the stream was aborted mid-turn (stop). On reconnect, skip chunks until trigger:turn-complete. */
338338
skipToTurnComplete?: boolean;
339+
/** Whether the agent is currently streaming a response. Set on first chunk, cleared on turn-complete. */
340+
isStreaming?: boolean;
339341
};
340342

341343
/**
@@ -385,7 +387,7 @@ export class TriggerChatTransport implements ChatTransport<UIMessage> {
385387
private _onSessionChange:
386388
| ((
387389
chatId: string,
388-
session: { runId: string; publicAccessToken: string; lastEventId?: string } | null
390+
session: { runId: string; publicAccessToken: string; lastEventId?: string; isStreaming?: boolean } | null
389391
) => void)
390392
| undefined;
391393

@@ -428,6 +430,7 @@ export class TriggerChatTransport implements ChatTransport<UIMessage> {
428430
runId: session.runId,
429431
publicAccessToken: session.publicAccessToken,
430432
lastEventId: session.lastEventId,
433+
isStreaming: session.isStreaming,
431434
});
432435
}
433436
}
@@ -515,6 +518,9 @@ export class TriggerChatTransport implements ChatTransport<UIMessage> {
515518
this.activeStreams.delete(chatId);
516519
}
517520

521+
currentSession.isStreaming = true;
522+
this.notifySessionChange(chatId, currentSession);
523+
518524
return this.subscribeToStream(
519525
currentSession.runId,
520526
currentSession.publicAccessToken,
@@ -534,7 +540,7 @@ export class TriggerChatTransport implements ChatTransport<UIMessage> {
534540

535541
const { runId, publicAccessToken } = await this.triggerNewRun(chatId, triggerPayload, "trigger");
536542

537-
const newSession: ChatSessionState = { runId, publicAccessToken };
543+
const newSession: ChatSessionState = { runId, publicAccessToken, isStreaming: true };
538544
this.sessions.set(chatId, newSession);
539545
this.notifySessionChange(chatId, newSession);
540546
return this.subscribeToStream(runId, publicAccessToken, abortSignal, chatId, {
@@ -617,6 +623,12 @@ export class TriggerChatTransport implements ChatTransport<UIMessage> {
617623
return null;
618624
}
619625

626+
// No active stream — the last turn completed before the page refreshed.
627+
// Return null so useChat settles into "ready" state instead of hanging.
628+
if (!session.isStreaming) {
629+
return null;
630+
}
631+
620632
// Deduplicate: if there's already an active stream for this chatId,
621633
// return null so the second caller no-ops.
622634
if (this.activeStreams.has(options.chatId)) {
@@ -790,13 +802,14 @@ export class TriggerChatTransport implements ChatTransport<UIMessage> {
790802
*/
791803
getSession = (
792804
chatId: string
793-
): { runId: string; publicAccessToken: string; lastEventId?: string } | undefined => {
805+
): { runId: string; publicAccessToken: string; lastEventId?: string; isStreaming?: boolean } | undefined => {
794806
const session = this.sessions.get(chatId);
795807
if (!session) return undefined;
796808
return {
797809
runId: session.runId,
798810
publicAccessToken: session.publicAccessToken,
799811
lastEventId: session.lastEventId,
812+
isStreaming: session.isStreaming,
800813
};
801814
};
802815

@@ -808,7 +821,7 @@ export class TriggerChatTransport implements ChatTransport<UIMessage> {
808821
callback:
809822
| ((
810823
chatId: string,
811-
session: { runId: string; publicAccessToken: string; lastEventId?: string } | null
824+
session: { runId: string; publicAccessToken: string; lastEventId?: string; isStreaming?: boolean } | null
812825
) => void)
813826
| undefined
814827
): void {
@@ -966,6 +979,7 @@ export class TriggerChatTransport implements ChatTransport<UIMessage> {
966979
runId: session.runId,
967980
publicAccessToken: session.publicAccessToken,
968981
lastEventId: session.lastEventId,
982+
isStreaming: session.isStreaming,
969983
});
970984
} else {
971985
this._onSessionChange(chatId, null);
@@ -1212,8 +1226,10 @@ export class TriggerChatTransport implements ChatTransport<UIMessage> {
12121226
if (session && typeof chunk.publicAccessToken === "string") {
12131227
session.publicAccessToken = chunk.publicAccessToken;
12141228
}
1215-
// Notify with updated session (including refreshed token)
1229+
// Mark streaming as complete so reconnectToStream doesn't
1230+
// hang on page refresh when no turn is in-flight.
12161231
if (session) {
1232+
session.isStreaming = false;
12171233
this.notifySessionChange(chatId, session);
12181234
}
12191235

0 commit comments

Comments
 (0)