Skip to content

Commit 17e18f4

Browse files
committed
feat(chat): add chat.response API for persistent data parts, transient flag support
1 parent f99f87a commit 17e18f4

File tree

2 files changed

+158
-5
lines changed
  • packages/trigger-sdk/src/v3
  • references/ai-chat/src/trigger

2 files changed

+158
-5
lines changed

packages/trigger-sdk/src/v3/ai.ts

Lines changed: 137 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -452,6 +452,45 @@ export { CHAT_MESSAGES_STREAM_ID, CHAT_STOP_STREAM_ID };
452452
*/
453453
const chatStream = streams.define<UIMessageChunk>({ id: _CHAT_STREAM_KEY });
454454

455+
// ---------------------------------------------------------------------------
456+
// chat.response — write data parts that persist to the response message
457+
// ---------------------------------------------------------------------------
458+
459+
/**
 * `chat.response` — emit chunks on the chat stream and, where applicable,
 * keep them as part of the assistant's response message.
 *
 * A chunk is persisted (so it shows up in `onTurnComplete`'s
 * `responseMessage` and `uiMessages`) only when its `type` begins with
 * `data-` and it is not flagged `transient: true`. Every other chunk is
 * streamed to the frontend and nothing more, the same as `chat.stream`.
 *
 * @example
 * ```ts
 * // Persists to responseMessage.parts
 * chat.response.write({ type: "data-handover", data: { context: summary } });
 *
 * // Transient — streams only, not in responseMessage
 * chat.response.write({ type: "data-progress", data: { percent: 50 }, transient: true });
 * ```
 */
const chatResponse = {
  /**
   * Emit one chunk on the chat stream.
   *
   * Non-transient data parts are additionally queued for accumulation into
   * the response message; all other chunks are stream-only.
   *
   * @param part - The chunk to stream (and possibly persist).
   */
  write(part: UIMessageChunk): void {
    // Queueing happens before streaming; the helper ignores anything that is
    // not a non-transient `data-*` part.
    queueResponsePart(part);

    const { waitUntilComplete: settled } = streams.writer(CHAT_STREAM_KEY, {
      spanName: "chat.response.write",
      collapsed: true,
      execute: ({ write: emit }) => {
        emit(part);
      },
    });

    // Fire-and-forget: the method is synchronous, so a failed write is
    // intentionally swallowed rather than surfaced as an unhandled rejection.
    void settled().catch(() => {});
  },
};
493+
455494
// ---------------------------------------------------------------------------
456495
// ChatWriter — stream writer for callbacks
457496
// ---------------------------------------------------------------------------
@@ -516,6 +555,7 @@ function createLazyChatWriter(): { writer: ChatWriter; flush: () => Promise<void
516555
writer: {
517556
/**
 * Write a single chunk through the lazily-initialized writer.
 *
 * The chunk is also handed to `queueResponsePart`, which keeps it for the
 * response message only when it is a non-transient `data-*` part.
 */
write(part: UIMessageChunk) {
518557
// NOTE(review): presumably this is what assigns `writeImpl` on first use — confirm.
ensureInitialized();
558+
// Queue before forwarding so the part is recorded for the response message.
queueResponsePart(part);
519559
writeImpl!(part);
520560
},
521561
merge(stream: ReadableStream<UIMessageChunk>) {
@@ -963,6 +1003,30 @@ const chatPendingMessagesKey = locals.create<PendingMessagesOptions>("chat.pendi
9631003
const chatSteeringQueueKey = locals.create<SteeringQueueEntry[]>("chat.steeringQueue");
9641004
/** @internal — IDs of messages that were successfully injected via prepareStep */
9651005
const chatInjectedMessageIdsKey = locals.create<Set<string>>("chat.injectedMessageIds");
1006+
/** @internal — non-transient data parts queued via chat.response or writer.write() for accumulation into the response message */
1007+
const chatResponsePartsKey = locals.create<unknown[]>("chat.responseParts");
1008+
1009+
/**
 * Decide whether a chunk should be persisted to the response message.
 *
 * A chunk qualifies when it is a non-null object whose `type` is a string
 * beginning with `data-` and whose `transient` property is not exactly `true`.
 *
 * @param part - Any value; non-objects and `null` never qualify.
 * @returns `true` for a non-transient data part, otherwise `false`.
 * @internal
 */
function isNonTransientDataPart(part: unknown): boolean {
  if (part === null || typeof part !== "object") {
    return false;
  }

  const candidate = part as Record<string, unknown>;
  const kind = candidate.type;
  if (typeof kind !== "string" || !kind.startsWith("data-")) {
    return false;
  }

  // Only a literal `true` marks the part as transient.
  return candidate.transient !== true;
}
1018+
1019+
/**
 * Record a chunk for later accumulation into the response message.
 *
 * Only non-transient data parts are recorded; any other value is ignored.
 * Used by `chat.response.write()` and `ChatWriter.write()`.
 *
 * @param part - The chunk that was (or is about to be) streamed.
 * @internal
 */
function queueResponsePart(part: unknown): void {
  if (isNonTransientDataPart(part)) {
    // Start a fresh list when nothing has been stored for this key yet.
    const stored = locals.get(chatResponsePartsKey);
    const pending = stored == null ? [] : stored;
    pending.push(part);
    locals.set(chatResponsePartsKey, pending);
  }
}
9661030

9671031
/**
9681032
* Event passed to the `prepareMessages` hook.
@@ -1272,6 +1336,7 @@ async function chatCompact(
12721336
type: "data-compaction",
12731337
id: compactionId,
12741338
data: { status: "compacting", totalTokens },
1339+
transient: true,
12751340
});
12761341

12771342
// Generate summary
@@ -1317,6 +1382,7 @@ async function chatCompact(
13171382
type: "data-compaction",
13181383
id: compactionId,
13191384
data: { status: "complete", totalTokens },
1385+
transient: true,
13201386
});
13211387
write({ type: "finish-step" });
13221388
},
@@ -2889,6 +2955,7 @@ function chatAgent<
28892955
locals.set(chatDeferKey, new Set());
28902956
locals.set(chatCompactionStateKey, undefined);
28912957
locals.set(chatSteeringQueueKey, []);
2958+
locals.set(chatResponsePartsKey, []);
28922959
// NOTE: chatBackgroundQueueKey is NOT reset here — messages injected
28932960
// by deferred work from the previous turn's onTurnComplete need to
28942961
// survive into the next turn. The queue is drained before run().
@@ -3358,6 +3425,15 @@ function chatAgent<
33583425
if (!capturedResponseMessage.id) {
33593426
capturedResponseMessage = { ...capturedResponseMessage, id: generateMessageId() };
33603427
}
3428+
// Append any non-transient data parts queued via chat.response or writer.write()
3429+
const queuedParts = locals.get(chatResponsePartsKey);
3430+
if (queuedParts && queuedParts.length > 0) {
3431+
capturedResponseMessage = {
3432+
...capturedResponseMessage,
3433+
parts: [...capturedResponseMessage.parts, ...queuedParts],
3434+
} as TUIMessage;
3435+
locals.set(chatResponsePartsKey, []);
3436+
}
33613437
accumulatedUIMessages.push(capturedResponseMessage);
33623438
turnNewUIMessages.push(capturedResponseMessage);
33633439
try {
@@ -3370,10 +3446,21 @@ function chatAgent<
33703446
// Conversion failed — skip accumulation for this turn
33713447
}
33723448
}
3373-
// TODO: When the user calls `pipeChat` manually instead of returning a
3374-
// StreamTextResult, we don't have access to onFinish. A future iteration
3375-
// should let manual-mode users report back response messages for
3376-
// accumulation (e.g. via a `chat.addMessages()` helper).
3449+
// If there's no captured response (manual pipe mode) but there are
3450+
// queued data parts, create a minimal response message to hold them.
3451+
if (!capturedResponseMessage) {
3452+
const remainingParts = locals.get(chatResponsePartsKey);
3453+
if (remainingParts && remainingParts.length > 0) {
3454+
capturedResponseMessage = {
3455+
id: generateMessageId(),
3456+
role: "assistant" as const,
3457+
parts: [...remainingParts],
3458+
} as TUIMessage;
3459+
locals.set(chatResponsePartsKey, []);
3460+
accumulatedUIMessages.push(capturedResponseMessage);
3461+
turnNewUIMessages.push(capturedResponseMessage);
3462+
}
3463+
}
33773464

33783465
if (runSignal.aborted) return "exit";
33793466

@@ -3422,6 +3509,7 @@ function chatAgent<
34223509
type: "data-compaction",
34233510
id: compactionId,
34243511
data: { status: "compacting", totalTokens: turnUsage.totalTokens },
3512+
transient: true,
34253513
});
34263514

34273515
const summary = await outerCompaction.summarize({
@@ -3493,6 +3581,7 @@ function chatAgent<
34933581
type: "data-compaction",
34943582
id: compactionId,
34953583
data: { status: "complete", totalTokens: turnUsage.totalTokens },
3584+
transient: true,
34963585
});
34973586
},
34983587
});
@@ -3576,6 +3665,23 @@ function chatAgent<
35763665
);
35773666
}
35783667

3668+
// Drain any late response parts added during onBeforeTurnComplete
3669+
const lateParts = locals.get(chatResponsePartsKey);
3670+
if (lateParts && lateParts.length > 0 && capturedResponseMessage) {
3671+
const idx = accumulatedUIMessages.findIndex((m) => m.id === capturedResponseMessage!.id);
3672+
if (idx !== -1) {
3673+
const msg = accumulatedUIMessages[idx]!;
3674+
accumulatedUIMessages[idx] = {
3675+
...msg,
3676+
parts: [...(msg.parts ?? []), ...lateParts],
3677+
} as TUIMessage;
3678+
capturedResponseMessage = accumulatedUIMessages[idx] as TUIMessage;
3679+
turnCompleteEvent.responseMessage = capturedResponseMessage;
3680+
turnCompleteEvent.uiMessages = accumulatedUIMessages;
3681+
}
3682+
locals.set(chatResponsePartsKey, []);
3683+
}
3684+
35793685
// Write turn-complete control chunk — closes the frontend stream.
35803686
const turnCompleteResult = await writeTurnCompleteChunk(
35813687
currentWirePayload.chatId,
@@ -4986,6 +5092,8 @@ function createChatSession(
49865092
// Reset stop signal for this turn
49875093
stop.reset();
49885094

5095+
// Reset per-turn state
5096+
locals.set(chatResponsePartsKey, []);
49895097
// Set up steering queue and pending messages config in locals
49905098
// so toStreamTextOptions() auto-injects prepareStep for steering
49915099
const turnSteeringQueue: SteeringQueueEntry[] = [];
@@ -5092,7 +5200,24 @@ function createChatSession(
50925200
stop.signal.aborted && !runSignal.aborted
50935201
? cleanupAbortedParts(response)
50945202
: response;
5203+
// Append any non-transient data parts queued via chat.response or writer.write()
5204+
const queuedParts = locals.get(chatResponsePartsKey);
5205+
if (queuedParts && queuedParts.length > 0) {
5206+
(cleaned as any).parts = [...(cleaned.parts ?? []), ...queuedParts];
5207+
locals.set(chatResponsePartsKey, []);
5208+
}
50955209
await accumulator.addResponse(cleaned);
5210+
} else {
5211+
// No response (manual pipe mode) but there are queued data parts
5212+
const queuedParts = locals.get(chatResponsePartsKey);
5213+
if (queuedParts && queuedParts.length > 0) {
5214+
await accumulator.addResponse({
5215+
id: generateMessageId(),
5216+
role: "assistant" as const,
5217+
parts: queuedParts as UIMessage["parts"],
5218+
});
5219+
locals.set(chatResponsePartsKey, []);
5220+
}
50965221
}
50975222

50985223
// Capture token usage from the streamText result
@@ -5169,6 +5294,12 @@ function createChatSession(
51695294
},
51705295

51715296
/**
 * Add a response message to the accumulator, first appending any data parts
 * queued via `chat.response` or `writer.write()`.
 *
 * When parts are queued, a copy of `response` carrying the extra parts is
 * accumulated and the queue is emptied; the caller's object is not mutated.
 *
 * @param response - The assistant message to accumulate.
 */
async addResponse(response: UIMessage) {
  const pending = locals.get(chatResponsePartsKey);
  if (!pending || pending.length === 0) {
    await accumulator.addResponse(response);
    return;
  }

  const ownParts = response.parts ?? [];
  const merged: UIMessage = {
    ...response,
    parts: [...ownParts, ...(pending as UIMessage["parts"])],
  };
  // Empty the queue before handing off so the same parts are not appended twice.
  locals.set(chatResponsePartsKey, []);
  await accumulator.addResponse(merged);
},
51745305

@@ -5576,6 +5707,8 @@ export const chat = {
55765707
inject: injectBackgroundContext,
55775708
/** Typed chat output stream for writing custom chunks or piping from subtasks. */
55785709
stream: chatStream,
5710+
/** Write data parts that persist to the response message. See {@link chatResponse}. */
5711+
response: chatResponse,
55795712
/** Pre-built input stream for receiving messages from the transport. */
55805713
messages: messagesInput,
55815714
/** Create a managed stop signal wired to the stop input stream. See {@link createStopSignal}. */

references/ai-chat/src/trigger/chat.ts

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -359,7 +359,7 @@ export const aiChat = chat
359359
// #region onTurnStart — persist messages + write status via writer
360360
onTurnStart: async ({ chatId, uiMessages, writer, runId }) => {
361361
warmCodeSandbox(runId);
362-
writer.write({ type: "data-turn-status", data: { status: "preparing" } });
362+
writer.write({ type: "data-turn-status", data: { status: "preparing" }, transient: true });
363363
chat.defer(
364364
prisma.chat.update({
365365
where: { id: chatId },
@@ -373,15 +373,35 @@ export const aiChat = chat
373373
await disposeCodeSandboxForRun(ctx.run.id);
374374
},
375375

376+
// #region onBeforeTurnComplete — add a persistent data part to test chat.response
377+
onBeforeTurnComplete: async ({ writer, turn }) => {
  // Written without `transient`, so this data part is meant to persist into
  // the response message as well as stream to the frontend.
  const metadata = { turn, timestamp: Date.now(), source: "onBeforeTurnComplete" };
  writer.write({ type: "data-turn-metadata", data: metadata });
},
383+
// #endregion
384+
376385
// #region onTurnComplete — persist + background self-review via chat.inject()
377386
onTurnComplete: async ({
378387
chatId,
379388
uiMessages,
380389
messages,
390+
responseMessage,
381391
runId,
382392
chatAccessToken,
383393
lastEventId,
384394
}) => {
395+
// Log whether data-turn-metadata persisted to the response
396+
const metadataParts = responseMessage?.parts?.filter(
397+
(p: any) => p.type === "data-turn-metadata"
398+
);
399+
logger.info("onTurnComplete response parts check", {
400+
hasResponseMessage: !!responseMessage,
401+
totalParts: responseMessage?.parts?.length ?? 0,
402+
metadataPartsCount: metadataParts?.length ?? 0,
403+
metadataParts,
404+
});
385405
await prisma.chat.update({
386406
where: { id: chatId },
387407
data: { messages: uiMessages as unknown as ChatMessagesForWrite },

0 commit comments

Comments
 (0)