Skip to content

Commit 1f1a366

Browse files
authored
fix(sdk): custom agent loop parity for continuations, steering, and subtasks (#3936)
## Summary Three fixes that bring custom agent loops (`chat.customAgent` hand-rolled loops and `chat.createSession`) up to the behavior `chat.agent` users already get, and that the docs already promise: - **Continuation runs no longer replay already-answered messages.** A chat continuing after a cancel, crash, or upgrade re-delivered every prior user message into the loop's first wait, so the model re-answered an old message while the real new one had to arrive via steering. The `.in` resume cursor is now seeded before any listener attaches, using the same boot logic as `chat.agent`. - **Mid-stream steering no longer wipes the in-flight response.** `chat.pipeAndCapture` (also backing `turn.complete()`) streamed without a server-generated message id, so a `prepareStep` injection regenerated the assistant id mid-stream and the frontend replaced the partial message, discarding everything streamed before the injection. - **Task-backed tools now work from custom agent loops.** A child task triggered via `ai.toolExecute` failed with "chat.agent session handle is not initialized" because the parent's chatId only threaded from the per-turn context that hand-rolled loops never set. It now falls back to the session handle the `chat.customAgent` wrapper binds at run boot, so children can stream progress into the chat with `chat.stream.writer({ target: "root" })` (the documented sub-agent pattern). ## Root cause on the replay fix Attaching any `.in` listener (`chat.createStopSignal`, `chat.messages.on`, the first wait) opens the SSE tail with `Last-Event-ID` taken from the seq cursor at attach time. Custom loops attached before any cursor existed, so S2 replayed from seq 0. The fix resolves the cursor from the latest turn-complete header and seeds both manager cursors (`setLastSeqNum` drives the SSE resume point, `setLastDispatchedSeqNum` gates waiter dispatch) before attach; `chat.createSession` now creates its stop signal lazily on the first iteration, after the seed. Seeding only the first cursor after attach does not work, which is why the earlier attempt at this was reverted. All three were reproduced red-green against the references ai-chat project: the replay repro showed the continuation wait consuming a stale message in 403ms with the real message arriving via steering injection; post-fix the wait consumes the real message directly with no injection. Steering now preserves the full in-flight response, and the deepResearch sub-agent streams its progress parts into a raw-loop parent. Existing behavior verified unchanged: full SDK unit suite, `chat.agent` steering, and stop-then-continue on `chat.createSession`.
1 parent 85d93ff commit 1f1a366

2 files changed

Lines changed: 96 additions & 5 deletions

File tree

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
---
2+
"@trigger.dev/sdk": patch
3+
---
4+
5+
Three fixes for custom agent loops (`chat.customAgent`, `chat.createSession`, and hand-rolled `MessageAccumulator` loops):
6+
7+
- Continuation runs no longer replay already-answered user messages into the first turn. The `.in` resume cursor is now seeded before any listener attaches (the same boot logic `chat.agent` uses), so a chat that continues after a cancel, crash, or upgrade only sees genuinely new messages.
8+
- Steering a hand-rolled loop mid-stream no longer wipes the in-flight assistant response. `chat.pipeAndCapture` now stamps a server-generated message id on the stream, so a `prepareStep` injection keeps the partial text instead of replacing the message.
9+
- Task-backed tools (`ai.toolExecute`) now work from custom agent loops: the parent's session is threaded to the child run, so child tasks can stream progress into the chat with `chat.stream.writer({ target: "root" })` instead of failing with "session handle is not initialized".

packages/trigger-sdk/src/v3/ai.ts

Lines changed: 87 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,10 @@ const chatTurnContextKey = locals.create<ChatTurnContext>("chat.turnContext");
160160
* @internal
161161
*/
162162
const chatSessionHandleKey = locals.create<SessionHandle>("chat.sessionHandle");
163+
// The external `chatId` from the boot payload — the value `ToolCallExecutionOptions.chatId`
164+
// is documented to carry. Custom-agent loops never set per-turn context, so subtask tool
165+
// metadata reads this directly rather than the Session handle id.
166+
const chatExternalIdKey = locals.create<string>("chat.externalId");
163167

164168
/**
165169
* S2 seq_num of the most recent `turn-complete` control record written by
@@ -221,6 +225,47 @@ export async function __findLatestSessionInCursorForTests(
221225
return findLatestSessionInCursor(chatId);
222226
}
223227

228+
/**
229+
* Seed the `.in` resume cursor for custom-agent loops (`chat.customAgent`
230+
* raw loops and `chat.createSession`) the way `chat.agent`'s boot does.
231+
*
232+
* MUST run before anything attaches a `.in` listener (`createStopSignal`,
233+
* `chat.messages.on`, the first wait): attaching opens the SSE tail with
234+
* `Last-Event-ID` from the seeded cursor, so attach-then-seed replays
235+
* every record from seq 0 — already-answered user messages get delivered
236+
* into the new run's first wait and the loop re-answers them.
237+
*
238+
* Seeds both cursors: `setLastSeqNum` controls the SSE `Last-Event-ID`,
239+
* `setLastDispatchedSeqNum` gates waiter dispatch — seeding only the
240+
* former still re-delivers records the manager buffered before the seed.
241+
*
242+
* No-ops on fresh boots and when a cursor is already seeded (e.g. the
243+
* `chatCustomAgent` wrapper ran before a nested `createChatSession`).
244+
* @internal
245+
*/
246+
async function seedSessionInResumeCursorForCustomLoop(
247+
payload: Pick<ChatTaskWirePayload, "chatId" | "continuation">
248+
): Promise<void> {
249+
if (sessionStreams.lastSeqNum(payload.chatId, "in") !== undefined) return;
250+
// No continuation/attempt gate: the wire may omit `continuation` on a
251+
// run that still has prior turns (chat.agent covers that case via its
252+
// snapshot). The scan doubles as the prior-state probe — a fresh
253+
// session has no turn-complete on `.out`, returns no cursor, and
254+
// seeds nothing. Cost on fresh boots is one non-blocking records read.
255+
try {
256+
const cursor = await findLatestSessionInCursor(payload.chatId);
257+
if (cursor !== undefined) {
258+
sessionStreams.setLastSeqNum(payload.chatId, "in", cursor);
259+
sessionStreams.setLastDispatchedSeqNum(payload.chatId, "in", cursor);
260+
}
261+
} catch (error) {
262+
logger.warn(
263+
"chat session: session.in resume cursor lookup failed; old messages may replay",
264+
{ error: error instanceof Error ? error.message : String(error) }
265+
);
266+
}
267+
}
268+
224269
/**
225270
* Versioned blob written to S3 after every turn completes (when no
226271
* `hydrateMessages` hook is registered). Read at run boot to seed the
@@ -921,6 +966,15 @@ function createTaskToolExecuteHandler<
921966
toolMeta.turn = chatCtx.turn;
922967
toolMeta.continuation = chatCtx.continuation;
923968
toolMeta.clientData = chatCtx.clientData;
969+
} else {
970+
// Hand-rolled chat.customAgent loops never set per-turn context, but
971+
// the wrapper records the boot payload's external chatId at run boot
972+
// — thread it so subtask chat helpers (`chat.stream.writer` with
973+
// target "root") can open the parent's session.
974+
const chatExternalId = locals.get(chatExternalIdKey);
975+
if (chatExternalId) {
976+
toolMeta.chatId = chatExternalId;
977+
}
924978
}
925979

926980
const chatLocals: Record<string, unknown> = {};
@@ -5104,6 +5158,7 @@ function chatCustomAgent<
51045158
// `chat.createStartSessionAction`) before this run is triggered.
51055159
// No client-side upsert needed.
51065160
locals.set(chatSessionHandleKey, sessions.open(payload.chatId));
5161+
locals.set(chatExternalIdKey, payload.chatId);
51075162
locals.set(chatAgentRunContextKey, runOptions.ctx);
51085163
// Initialize the turn-complete trim slot so `chat.writeTurnComplete`
51095164
// trims `session.out` back to the previous turn boundary. Without
@@ -5113,6 +5168,10 @@ function chatCustomAgent<
51135168
markChatAgentRunForStreamsWarning();
51145169
taskContext.setConversationId(payload.chatId);
51155170
stampConversationIdOnActiveSpan(payload.chatId);
5171+
// Seed the `.in` resume cursor before user code attaches any `.in`
5172+
// listener — otherwise a continuation boot replays already-answered
5173+
// messages into the loop's first wait.
5174+
await seedSessionInResumeCursorForCustomLoop(payload);
51165175
return userRun(payload, runOptions);
51175176
},
51185177
});
@@ -5213,6 +5272,7 @@ function chatAgent<
52135272
// `chat.createStartSessionAction` or browser-direct) before this
52145273
// run is triggered — no client-side upsert needed here.
52155274
locals.set(chatSessionHandleKey, sessions.open(payload.chatId));
5275+
locals.set(chatExternalIdKey, payload.chatId);
52165276
// Mutable holder; advances in `writeTurnCompleteChunk` after each turn
52175277
// and is the trim target for the NEXT turn's trim record.
52185278
locals.set(lastTurnCompleteSeqNumKey, { value: undefined });
@@ -8613,8 +8673,15 @@ async function pipeChatAndCapture(
86138673
resolveOnFinish = r;
86148674
});
86158675

8676+
const resolvedOptions = resolveUIMessageStreamOptions();
86168677
const uiStream = source.toUIMessageStream({
8617-
...resolveUIMessageStreamOptions(),
8678+
...resolvedOptions,
8679+
// Stamp a server-generated id on the start chunk, same as chat.agent's
8680+
// pipe. Without it the AI SDK regenerates the assistant id when a
8681+
// prepareStep injection (steering) starts a new step mid-stream, and
8682+
// the frontend replaces the partial message — wiping the
8683+
// pre-injection text from the UI and the captured response.
8684+
generateMessageId: resolvedOptions.generateMessageId ?? generateMessageId,
86188685
onFinish: ({ responseMessage }: { responseMessage: UIMessage }) => {
86198686
captured = responseMessage;
86208687
resolveOnFinish!();
@@ -8936,14 +9003,18 @@ export type ChatTurn = {
89369003
* signaling, and idle/suspend between turns. You control: initialization,
89379004
* model/tool selection, persistence, and any custom per-turn logic.
89389005
*
9006+
* Call from inside a `chat.customAgent()` run — the wrapper binds the
9007+
* backing Session that the iterator's stop signal and message channels
9008+
* resolve to. (A plain `task()` does not bind it, so `createSession`
9009+
* would throw "session handle is not initialized".)
9010+
*
89399011
* @example
89409012
* ```ts
8941-
* import { task } from "@trigger.dev/sdk";
89429013
* import { chat, type ChatTaskWirePayload } from "@trigger.dev/sdk/ai";
89439014
* import { streamText } from "ai";
89449015
* import { openai } from "@ai-sdk/openai";
89459016
*
8946-
* export const myChat = task({
9017+
* export const myChat = chat.customAgent({
89479018
* id: "my-chat",
89489019
* run: async (payload: ChatTaskWirePayload, { signal }) => {
89499020
* const session = chat.createSession(payload, { signal });
@@ -8979,13 +9050,23 @@ function createChatSession(
89799050
[Symbol.asyncIterator]() {
89809051
let currentPayload = payload;
89819052
let turn = -1;
8982-
const stop = createStopSignal();
9053+
// Created on the first next() call, AFTER the resume-cursor seed —
9054+
// createStopSignal attaches the `.in` SSE tail, and attaching
9055+
// before the seed replays every record from seq 0 (the seed is a
9056+
// no-op when the chatCustomAgent wrapper already ran it).
9057+
let stop!: ReturnType<typeof createStopSignal>;
9058+
let booted = false;
89839059
const accumulator = new ChatMessageAccumulator();
89849060
let previousTurnUsage: LanguageModelUsage | undefined;
89859061
let cumulativeUsage: LanguageModelUsage = emptyUsage();
89869062

89879063
return {
89889064
async next(): Promise<IteratorResult<ChatTurn>> {
9065+
if (!booted) {
9066+
booted = true;
9067+
await seedSessionInResumeCursorForCustomLoop(currentPayload);
9068+
stop = createStopSignal();
9069+
}
89899070
turn++;
89909071

89919072
// First turn: wait when the boot payload carries no message.
@@ -9328,7 +9409,8 @@ function createChatSession(
93289409
},
93299410

93309411
async return() {
9331-
stop.cleanup();
9412+
// `stop` only exists once next() has booted the iterator.
9413+
stop?.cleanup();
93329414
return { done: true, value: undefined };
93339415
},
93349416
};

0 commit comments

Comments
 (0)