From 3726bc0db8f5563e5eaca0b39c5268f924b4e74f Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Fri, 12 Jun 2026 12:39:27 +0100 Subject: [PATCH 1/4] docs(ai-chat): document custom agents and complete the createSession reference Adds a dedicated Custom agents page covering chat.customAgent (agent registration + session binding) and both loop styles: the managed createSession iterator and the hand-rolled primitives loop. Includes the patterns the managed lifecycle otherwise covers for you: continuation history seeding, persisting the user message before streaming, racing totalUsage after a stop, and the slim wire shape. The Backend page leads with a decision table across the three abstraction levels and now focuses on chat.agent, routing to the new page. Completes the ChatSessionOptions and ChatTurn reference tables (compaction, pendingMessages, usage fields, setMessages, prepareStep) and fixes stale examples that read a plural messages field off the wire payload. --- docs/ai-chat/backend.mdx | 261 ++-------------- docs/ai-chat/custom-agents.mdx | 363 +++++++++++++++++++++++ docs/ai-chat/how-it-works.mdx | 2 +- docs/ai-chat/mcp.mdx | 4 +- docs/ai-chat/patterns/oom-resilience.mdx | 4 +- docs/ai-chat/pending-messages.mdx | 10 +- docs/ai-chat/reference.mdx | 54 ++-- 7 files changed, 428 insertions(+), 270 deletions(-) create mode 100644 docs/ai-chat/custom-agents.mdx diff --git a/docs/ai-chat/backend.mdx b/docs/ai-chat/backend.mdx index ccf86ef03b5..626d8c4b7ad 100644 --- a/docs/ai-chat/backend.mdx +++ b/docs/ai-chat/backend.mdx @@ -8,6 +8,22 @@ import RcBanner from "/snippets/ai-chat-rc-banner.mdx"; +There are three abstraction levels for a chat backend. All three speak the same wire protocol, so the [frontend transport](/ai-chat/frontend) works unchanged whichever you pick. + +| Capability | `chat.agent()` | `chat.createSession()` | Raw primitives | +| ------------------------------------- | -------------- | ------------------------------------------------------------- | -------------- | +| Turn loop, stop signals, accumulation | Managed | Managed | You write it | +| Lifecycle hooks | Yes | No — inline code per turn | No | +| Continuation recovery on new runs | Automatic | [Manual seeding](/ai-chat/custom-agents#continuation-runs-and-history-seeding) | Manual seeding | +| Compaction / steering | Built-in | Built-in | Manual | +| Head Start, actions, tool approvals | Yes | No | No | +| Custom stream conversion | No | Limited | Full control | +| Agent dashboard visibility | Yes | Yes (via `customAgent`) | Yes | + +The raw-primitives column assumes [`chat.customAgent()`](/ai-chat/custom-agents) as the wrapper, which is what makes the task visible to the agent dashboard. + +Start with `chat.agent()`. Drop to `chat.createSession()` when you want to own the per-turn code (model routing, persistence, custom telemetry) without rebuilding the turn loop. Drop to raw primitives only when you need full control over stream conversion or a custom protocol. + ## chat.agent() The highest-level approach. Handles message accumulation, stop signals, turn lifecycle, and auto-piping automatically. @@ -119,7 +135,7 @@ writer.write({ - `chat.response` and the `writer` accumulation behavior work with `chat.agent` and `chat.createSession`. If you're using [`chat.customAgent`](#raw-task-with-primitives), you own the accumulator — see the raw-task example for the manual pattern. + `chat.response` and the `writer` accumulation behavior work with `chat.agent` and `chat.createSession`. If you're using [`chat.customAgent`](/ai-chat/custom-agents), you own the accumulator — see the raw-task example for the manual pattern. ### Raw streaming with `chat.stream` @@ -750,7 +766,7 @@ See [ChatUIMessageStreamOptions](/ai-chat/reference#chatuimessagestreamoptions) `onFinish` is managed internally for response capture and cannot be overridden here. Use `streamText`'s `onFinish` callback for custom finish handling, or use [raw task - mode](#raw-task-with-primitives) for full control over `toUIMessageStream()`. + mode](/ai-chat/custom-agents) for full control over `toUIMessageStream()`. ### Manual mode with task() @@ -787,241 +803,10 @@ export const manualChat = task({ --- -## chat.createSession() - -A middle ground between `chat.agent()` and raw primitives. You get an async iterator that yields `ChatTurn` objects — each turn handles stop signals, message accumulation, and turn-complete signaling automatically. You control initialization, model/tool selection, persistence, and any custom per-turn logic. - -Use `chat.createSession()` inside a standard `task()`: - -```ts -import { task } from "@trigger.dev/sdk"; -import { chat, type ChatTaskWirePayload } from "@trigger.dev/sdk/ai"; -import { streamText } from "ai"; -import { anthropic } from "@ai-sdk/anthropic"; - -export const myChat = task({ - id: "my-chat", - run: async (payload: ChatTaskWirePayload, { signal }) => { - // One-time initialization — just code, no hooks - const clientData = payload.metadata as { userId: string }; - await db.chat.create({ data: { id: payload.chatId, userId: clientData.userId } }); - - const session = chat.createSession(payload, { - signal, - idleTimeoutInSeconds: 60, - timeout: "1h", - }); - - for await (const turn of session) { - const result = streamText({ - model: anthropic("claude-sonnet-4-5"), - messages: turn.messages, - abortSignal: turn.signal, - stopWhen: stepCountIs(15), - }); - - // Pipe, capture, accumulate, and signal turn-complete — all in one call - await turn.complete(result); - - // Persist after each turn - await db.chat.update({ - where: { id: turn.chatId }, - data: { messages: turn.uiMessages }, - }); - } - }, -}); -``` - -### ChatSessionOptions +## Custom agents -| Option | Type | Default | Description | -| ---------------------- | ------------- | -------- | ------------------------------------------- | -| `signal` | `AbortSignal` | required | Run-level cancel signal (from task context) | -| `idleTimeoutInSeconds` | `number` | `30` | Seconds to stay idle between turns | -| `timeout` | `string` | `"1h"` | Duration string for suspend timeout | -| `maxTurns` | `number` | `100` | Max turns before ending | +Both lower levels — `chat.createSession()` (managed turn iterator, your turn body) and `chat.customAgent()` with raw primitives (hand-rolled loop, full stream-conversion control) — are covered together on the Custom agents page, including the `ChatTurn` surface, the continuation-seeding pattern, and the hand-rolled-loop checklist: -### ChatTurn - -Each turn yielded by the iterator provides: - -| Field | Type | Description | -| -------------- | ---------------- | ------------------------------------------------------ | -| `number` | `number` | Turn number (0-indexed) | -| `chatId` | `string` | Chat session ID | -| `trigger` | `string` | What triggered this turn | -| `clientData` | `unknown` | Client data from the transport | -| `messages` | `ModelMessage[]` | Full accumulated model messages — pass to `streamText` | -| `uiMessages` | `UIMessage[]` | Full accumulated UI messages — use for persistence | -| `signal` | `AbortSignal` | Combined stop+cancel signal (fresh each turn) | -| `stopped` | `boolean` | Whether the user stopped generation this turn | -| `continuation` | `boolean` | Whether this is a continuation run | - -| Method | Description | -| ---------------------------- | ------------------------------------------------------------------- | -| `turn.complete(source)` | Pipe stream, capture response, accumulate, and signal turn-complete | -| `turn.done()` | Just signal turn-complete (when you've piped manually) | -| `turn.addResponse(response)` | Add a response to the accumulator manually | - -### turn.complete() vs manual control - -`turn.complete(result)` is the easy path — it handles piping, capturing the response, accumulating messages, cleaning up aborted parts, and writing the turn-complete chunk. - -For more control, you can do each step manually: - -```ts -for await (const turn of session) { - const result = streamText({ - model: anthropic("claude-sonnet-4-5"), - messages: turn.messages, - abortSignal: turn.signal, - stopWhen: stepCountIs(15), - }); - - // Manual: pipe and capture separately - const response = await chat.pipeAndCapture(result, { signal: turn.signal }); - - if (response) { - // Custom processing before accumulating - await turn.addResponse(response); - } - - // Custom persistence, analytics, etc. - await db.chat.update({ ... }); - - // Must call done() when not using complete() - await turn.done(); -} -``` - ---- - -## Raw task with primitives - -For full control, use a standard `task()` with the composable primitives from the `chat` namespace. You manage everything: the turn loop, stop signals, message accumulation, and turn-complete signaling. - -Raw task mode also lets you call `.toUIMessageStream()` yourself with any options — including `onFinish` and `originalMessages`. This is the right choice when you need complete control over the stream conversion beyond what `chat.setUIMessageStreamOptions()` provides. - -### Primitives - -| Primitive | Description | -| ------------------------------- | ------------------------------------------------------------------------------------------- | -| `chat.messages` | Input stream for incoming messages — use `.waitWithIdleTimeout()` to wait for the next turn | -| `chat.createStopSignal()` | Create a managed stop signal wired to the stop input stream | -| `chat.pipeAndCapture(result)` | Pipe a `StreamTextResult` to the chat stream and capture the response | -| `chat.writeTurnComplete()` | Signal the frontend that the current turn is complete | -| `chat.MessageAccumulator` | Accumulates conversation messages across turns | -| `chat.pipe(stream)` | Pipe a stream to the frontend (no response capture) | -| `chat.cleanupAbortedParts(msg)` | Clean up incomplete parts from a stopped response | - -### Example - -```ts -import { task } from "@trigger.dev/sdk"; -import { chat, type ChatTaskWirePayload } from "@trigger.dev/sdk/ai"; -import { streamText } from "ai"; -import { anthropic } from "@ai-sdk/anthropic"; - -export const myChat = task({ - id: "my-chat-raw", - run: async (payload: ChatTaskWirePayload, { signal: runSignal }) => { - let currentPayload = payload; - - // Handle preload — wait for the first real message - if (currentPayload.trigger === "preload") { - const result = await chat.messages.waitWithIdleTimeout({ - idleTimeoutInSeconds: 60, - timeout: "1h", - spanName: "waiting for first message", - }); - if (!result.ok) return; - currentPayload = result.output; - } - - const stop = chat.createStopSignal(); - const conversation = new chat.MessageAccumulator(); - - for (let turn = 0; turn < 100; turn++) { - stop.reset(); - - const messages = await conversation.addIncoming( - currentPayload.messages, - currentPayload.trigger, - turn - ); - - const combinedSignal = AbortSignal.any([runSignal, stop.signal]); - - const result = streamText({ - model: anthropic("claude-sonnet-4-5"), - messages, - abortSignal: combinedSignal, - stopWhen: stepCountIs(15), - }); - - let response; - try { - response = await chat.pipeAndCapture(result, { signal: combinedSignal }); - } catch (error) { - if (error instanceof Error && error.name === "AbortError") { - if (runSignal.aborted) break; - // Stop — fall through to accumulate partial - } else { - throw error; - } - } - - if (response) { - const cleaned = - stop.signal.aborted && !runSignal.aborted ? chat.cleanupAbortedParts(response) : response; - await conversation.addResponse(cleaned); - } - - if (runSignal.aborted) break; - - // Persist, analytics, etc. - await db.chat.update({ - where: { id: currentPayload.chatId }, - data: { messages: conversation.uiMessages }, - }); - - await chat.writeTurnComplete(); - - // Wait for the next message - const next = await chat.messages.waitWithIdleTimeout({ - idleTimeoutInSeconds: 60, - timeout: "1h", - spanName: "waiting for next message", - }); - if (!next.ok) break; - currentPayload = next.output; - } - - stop.cleanup(); - }, -}); -``` - -### MessageAccumulator - -The `MessageAccumulator` handles the transport protocol automatically: - -- Turn 0: replaces messages (full history from frontend) -- Subsequent turns: appends new messages (frontend only sends the new user message) -- Regenerate: replaces messages (full history minus last assistant message) - -```ts -const conversation = new chat.MessageAccumulator(); - -// Returns full accumulated ModelMessage[] for streamText -const messages = await conversation.addIncoming(payload.messages, payload.trigger, turn); - -// After piping, add the response -const response = await chat.pipeAndCapture(result); -if (response) await conversation.addResponse(response); - -// Access accumulated messages for persistence -conversation.uiMessages; // UIMessage[] -conversation.modelMessages; // ModelMessage[] -``` + + Build agents without the managed lifecycle — createSession or raw primitives. + diff --git a/docs/ai-chat/custom-agents.mdx b/docs/ai-chat/custom-agents.mdx new file mode 100644 index 00000000000..54c0461a75d --- /dev/null +++ b/docs/ai-chat/custom-agents.mdx @@ -0,0 +1,363 @@ +--- +title: "Custom agents" +sidebarTitle: "Custom agents" +description: "Build chat agents without chat.agent()'s managed lifecycle: register with chat.customAgent(), then drive turns with the createSession iterator or a hand-rolled loop." +--- + +import RcBanner from "/snippets/ai-chat-rc-banner.mdx"; + + + +**A custom agent is a task you register with `chat.customAgent()` and drive yourself — either with the managed turn iterator from `chat.createSession()`, or with a fully hand-rolled loop over the raw chat primitives.** You give up `chat.agent()`'s lifecycle hooks and automatic continuation recovery; you gain inline control over every turn, and (at the lowest level) full control over the stream conversion. + +See the [comparison table](/ai-chat/backend) before dropping down. The frontend is unchanged either way: all levels speak the same wire protocol, so [`useTriggerChatTransport`](/ai-chat/frontend) points at a custom agent exactly like a `chat.agent()`. + +## chat.customAgent() + +`chat.customAgent()` is a thin wrapper around `task()` that does two things: it registers the task as an agent (so it appears in the agent dashboard, the playground, and the MCP server's `list_agents`), and it binds the run to its backing [Session](/ai-chat/sessions) so the `chat.*` primitives resolve to the right `.in`/`.out` channels. There is no managed lifecycle — no turn loop, no hooks, no preload handling. + +A plain `task()` works with the same primitives but stays invisible to the agent surfaces, so prefer `customAgent` unless you specifically don't want the task listed as an agent. + +Inside the wrapper, pick one of two loop styles: + +- **[Managed loop](#managed-loop-chatcreatesession)** — `chat.createSession()` yields turns; the SDK handles stop signals, accumulation, idle suspend/resume, and turn-complete signaling. You write the turn body. +- **[Hand-rolled loop](#hand-rolled-loop-with-primitives)** — you write the loop itself with `chat.messages`, `MessageAccumulator`, `pipeAndCapture`, and `writeTurnComplete`. The right choice when you need complete control over `.toUIMessageStream()` (e.g. `onFinish`, `originalMessages`) beyond what `chat.setUIMessageStreamOptions()` provides, or you're implementing a custom protocol. + +## Managed loop: chat.createSession() + +`chat.createSession()` gives you an async iterator of `ChatTurn` objects. Each turn arrives with the accumulated history, a combined stop+cancel signal, and helpers to finish the turn: + +```ts trigger/my-chat.ts +import { chat, type ChatTaskWirePayload } from "@trigger.dev/sdk/ai"; +import { streamText, stepCountIs } from "ai"; +import { anthropic } from "@ai-sdk/anthropic"; + +export const myChat = chat.customAgent({ + id: "my-chat", + run: async (payload: ChatTaskWirePayload, { signal }) => { + // One-time initialization — plain code, no hooks. Upsert, not create: + // continuation runs boot with the row already in place. + const clientData = payload.metadata as { userId: string }; + await db.chat.upsert({ + where: { id: payload.chatId }, + create: { id: payload.chatId, userId: clientData.userId }, + update: {}, + }); + + const session = chat.createSession(payload, { + signal, + idleTimeoutInSeconds: 60, + timeout: "1h", + }); + + for await (const turn of session) { + // Persist the incoming user message BEFORE streaming — this is your + // onTurnStart equivalent. Without it, a page reload mid-stream + // restores the assistant text (replayed from the session) but loses + // the user message that prompted it. + await db.chat.update({ + where: { id: turn.chatId }, + data: { messages: turn.uiMessages }, + }); + + const result = streamText({ + model: anthropic("claude-sonnet-4-5"), + messages: turn.messages, + abortSignal: turn.signal, + stopWhen: stepCountIs(15), + }); + + // Pipe, capture, accumulate, and signal turn-complete — all in one call + await turn.complete(result); + + // Persist the full exchange after the turn — your onTurnComplete equivalent + await db.chat.update({ + where: { id: turn.chatId }, + data: { messages: turn.uiMessages }, + }); + } + }, +}); +``` + + + If you pass `compaction` or `pendingMessages` to `chat.createSession()`, you must also pass `prepareStep: turn.prepareStep()` to `streamText` (or spread `chat.toStreamTextOptions()`, which wires it automatically). Without it, both features silently no-op. + + +### ChatSessionOptions + +| Option | Type | Default | Description | +| ---------------------- | ---------------------------- | ----------- | -------------------------------------------------------------------------------------------------- | +| `signal` | `AbortSignal` | required | Run-level cancel signal (from task context) | +| `idleTimeoutInSeconds` | `number` | `30` | Seconds to stay idle between turns before suspending | +| `timeout` | `string` | `"1h"` | Duration string for suspend timeout | +| `maxTurns` | `number` | `100` | Max turns before ending | +| `compaction` | `ChatAgentCompactionOptions` | `undefined` | Automatic context [compaction](/ai-chat/compaction) — same options as on `chat.agent()` | +| `pendingMessages` | `PendingMessagesOptions` | `undefined` | Mid-execution [message injection](/ai-chat/pending-messages) — same options as on `chat.agent()` | + +Between turns the run idles on `waitWithIdleTimeout`: after `idleTimeoutInSeconds` with no message it suspends (compute is freed), and the next message restores it on the same run — the same warm/suspended pipeline `chat.agent()` uses. + +### ChatTurn + +Each turn yielded by the iterator provides: + +| Field | Type | Description | +| ------------------- | --------------------------------- | -------------------------------------------------------- | +| `number` | `number` | Turn number (0-indexed) | +| `chatId` | `string` | Chat session ID | +| `trigger` | `string` | What triggered this turn | +| `clientData` | `unknown` | Client data from the transport | +| `messages` | `ModelMessage[]` | Full accumulated model messages — pass to `streamText` | +| `uiMessages` | `UIMessage[]` | Full accumulated UI messages — use for persistence | +| `signal` | `AbortSignal` | Combined stop+cancel signal (fresh each turn) | +| `stopped` | `boolean` | Whether the user stopped generation this turn | +| `continuation` | `boolean` | Whether this is a continuation run | +| `previousTurnUsage` | `LanguageModelUsage \| undefined` | Token usage from the previous turn (undefined on turn 0) | +| `totalUsage` | `LanguageModelUsage` | Cumulative token usage across all completed turns | + +| Method | Description | +| ----------------------------- | ---------------------------------------------------------------------------------------------------------- | +| `turn.complete(source)` | Pipe stream, capture response, accumulate, and signal turn-complete | +| `turn.done()` | Signal turn-complete only (when you have piped manually) | +| `turn.addResponse(response)` | Add a response to the accumulator manually | +| `turn.setMessages(uiMessages)`| Replace the accumulated messages — continuation seeding and on-demand compaction | +| `turn.prepareStep()` | `prepareStep` callback wiring compaction + injection — pass to `streamText` when not spreading `chat.toStreamTextOptions()` | + +### Continuation runs and history seeding + +`chat.agent()` rebuilds conversation history automatically when a chat continues on a fresh run (after a cancel, crash, version upgrade, or TTL expiry) — via its snapshot/replay boot or your `hydrateMessages` hook. Custom agents do none of that: a continuation run starts with an **empty accumulator**, and history restoration is your job. + +With `createSession`, check `turn.continuation` on the first turn and seed from your store with `turn.setMessages()`: + +```ts +for await (const turn of session) { + if (turn.continuation && turn.number === 0) { + const row = await db.chat.findUnique({ where: { id: turn.chatId } }); + const stored = (row?.messages ?? []) as UIMessage[]; + if (stored.length > 0) { + // Keep any incoming message that isn't already persisted + const incoming = turn.uiMessages.filter((m) => !stored.some((s) => s.id === m.id)); + await turn.setMessages([...stored, ...incoming]); + } + } + + // ... streamText + turn.complete as usual +} +``` + +Without this, a resumed chat silently loses its history: the model sees only the message that triggered the continuation. In a hand-rolled loop, seed by passing the stored history into the turn-0 `addIncoming` call — shown in the example below. + +### turn.complete() vs manual control + +`turn.complete(result)` is the one-call path — it handles piping, capturing the response, accumulating messages, cleaning up aborted parts on a stop, and writing the turn-complete chunk. + +For more control, you can do each step manually: + +```ts +for await (const turn of session) { + const result = streamText({ + model: anthropic("claude-sonnet-4-5"), + messages: turn.messages, + abortSignal: turn.signal, + stopWhen: stepCountIs(15), + }); + + // Manual: pipe and capture separately + const response = await chat.pipeAndCapture(result, { signal: turn.signal }); + + if (response) { + // Custom processing before accumulating + await turn.addResponse(response); + } + + // Custom persistence, analytics, etc. + await db.chat.update({ ... }); + + // Must call done() when not using complete() + await turn.done(); +} +``` + +## Hand-rolled loop with primitives + +For full control, skip `createSession` and compose the primitives directly: + +| Primitive | Description | +| ------------------------------- | -------------------------------------------------------------------------------------------- | +| `chat.messages` | Input stream for incoming messages — use `.waitWithIdleTimeout()` to wait for the next turn | +| `chat.createStopSignal()` | Create a managed stop signal wired to the stop input stream | +| `chat.pipeAndCapture(result)` | Pipe a `StreamTextResult` to the chat stream and capture the response | +| `chat.writeTurnComplete()` | Signal the frontend that the current turn is complete | +| `chat.MessageAccumulator` | Accumulates conversation messages across turns | +| `chat.pipe(stream)` | Pipe a stream to the frontend (no response capture) | +| `chat.cleanupAbortedParts(msg)` | Clean up incomplete parts from a stopped response | + +A complete loop: + +```ts trigger/my-chat-raw.ts +import { chat, type ChatTaskWirePayload } from "@trigger.dev/sdk/ai"; +import { streamText, stepCountIs } from "ai"; +import { anthropic } from "@ai-sdk/anthropic"; + +export const myChat = chat.customAgent({ + id: "my-chat-raw", + run: async (payload: ChatTaskWirePayload, { signal: runSignal }) => { + let currentPayload = payload; + + // Handle preload — wait for the first real message + if (currentPayload.trigger === "preload") { + const result = await chat.messages.waitWithIdleTimeout({ + idleTimeoutInSeconds: 60, + timeout: "1h", + spanName: "waiting for first message", + }); + if (!result.ok) return; + currentPayload = result.output; + } + + const stop = chat.createStopSignal(); + const conversation = new chat.MessageAccumulator(); + + // Continuation runs (cancel, crash, upgrade) start with an empty + // accumulator — fetch stored history so turn 0 can seed it. + let continuationSeed: UIMessage[] = []; + if (currentPayload.continuation) { + const row = await db.chat.findUnique({ where: { id: currentPayload.chatId } }); + continuationSeed = (row?.messages ?? []) as UIMessage[]; + } + + for (let turn = 0; turn < 100; turn++) { + stop.reset(); + + // The wire payload carries at most one new message per turn. Turn 0 + // REPLACES the accumulator, so seed stored history through + // addIncoming together with the incoming message — a setMessages + // call before the loop would be wiped here. + const incoming = currentPayload.message ? [currentPayload.message] : []; + const turnInput = + turn === 0 && continuationSeed.length > 0 + ? [...continuationSeed.filter((s) => !incoming.some((m) => m.id === s.id)), ...incoming] + : incoming; + const messages = await conversation.addIncoming(turnInput, currentPayload.trigger, turn); + + // Persist the incoming user message before streaming so a + // mid-stream reload doesn't lose it. + await db.chat.update({ + where: { id: currentPayload.chatId }, + data: { messages: conversation.uiMessages }, + }); + + const combinedSignal = AbortSignal.any([runSignal, stop.signal]); + + const result = streamText({ + model: anthropic("claude-sonnet-4-5"), + messages, + abortSignal: combinedSignal, + stopWhen: stepCountIs(15), + }); + + let response; + try { + response = await chat.pipeAndCapture(result, { signal: combinedSignal }); + } catch (error) { + if (error instanceof Error && error.name === "AbortError") { + if (runSignal.aborted) break; + // Stop — fall through to accumulate partial + } else { + throw error; + } + } + + if (response) { + const cleaned = + stop.signal.aborted && !runSignal.aborted ? chat.cleanupAbortedParts(response) : response; + await conversation.addResponse(cleaned); + } + + if (runSignal.aborted) break; + + // Persist, analytics, etc. + await db.chat.update({ + where: { id: currentPayload.chatId }, + data: { messages: conversation.uiMessages }, + }); + + await chat.writeTurnComplete(); + + // Wait for the next message + const next = await chat.messages.waitWithIdleTimeout({ + idleTimeoutInSeconds: 60, + timeout: "1h", + spanName: "waiting for next message", + }); + if (!next.ok) break; + currentPayload = next.output; + } + + stop.cleanup(); + }, +}); +``` + +### MessageAccumulator + +`addIncoming(messages, trigger, turn)` has two modes: + +- **Turn 0 or `trigger === "regenerate-message"`: replaces** the accumulator with exactly what you pass. This is why continuation seeding goes through `addIncoming` (above), and why a regenerate needs you to slice your own history — the wire omits the message on regenerate, so pass the stored history minus the last assistant message. +- **Every other turn: appends** what you pass (the wire carries at most the one new user message). + +```ts +const conversation = new chat.MessageAccumulator(); + +// Returns full accumulated ModelMessage[] for streamText +const messages = await conversation.addIncoming( + payload.message ? [payload.message] : [], + payload.trigger, + turn +); + +// After piping, add the response +const response = await chat.pipeAndCapture(result); +if (response) await conversation.addResponse(response); + +// Access accumulated messages for persistence +conversation.uiMessages; // UIMessage[] +conversation.modelMessages; // ModelMessage[] +``` + +The constructor also accepts `compaction` and `pendingMessages` options (same shapes as on `chat.agent()`); pass `prepareStep: conversation.prepareStep()` to `streamText` to activate them. See [pending messages](/ai-chat/pending-messages#backend-messageaccumulator-raw-task) for the manual steering wiring. + +### Hand-rolled loop checklist + +Things the managed levels do for you that a raw loop has to get right: + +- **Don't bare-await `result.totalUsage`.** On a stop-abort the AI SDK's `totalUsage` promise never settles, which wedges the loop forever. Race it with a timeout: + + ```ts + const turnUsage = await Promise.race([ + result.totalUsage, + new Promise((resolve) => setTimeout(() => resolve(undefined), 2000)), + ]); + ``` + +- **Persist the user message before streaming** (shown in the example above). The session replay restores the assistant's streamed text after a page reload, but nothing restores a user message you haven't written down. +- **Seed history on continuation runs through the turn-0 `addIncoming`** (shown above). `payload.continuation` is `true` when this run picked up an existing chat; the accumulator starts empty — and because turn 0 replaces the accumulator, a `setMessages` call before the loop gets wiped. +- **Clean up aborted parts on a stop** with `chat.cleanupAbortedParts()` before accumulating, or the partial response carries half-open tool calls into the next turn's prompt. +- **Read `payload.message` (singular).** The wire payload carries at most one new message per turn; there is no `messages` array on the payload. + +## Next steps + + + + The three abstraction levels compared, and everything chat.agent() adds on top. + + + The durable stream pair every agent — managed or custom — is built on. + + + Automatic context compression — works with createSession and MessageAccumulator. + + + The wire format your loop is speaking, chunk by chunk. + + diff --git a/docs/ai-chat/how-it-works.mdx b/docs/ai-chat/how-it-works.mdx index ecde885f4ac..7c3d182f8ba 100644 --- a/docs/ai-chat/how-it-works.mdx +++ b/docs/ai-chat/how-it-works.mdx @@ -179,7 +179,7 @@ See [Lifecycle hooks](/ai-chat/lifecycle-hooks) for the full signatures and firi **Not a good fit**: - Single-shot completions where you don't need durability or resume. Call your model directly. -- Workflows where you control both ends and want a custom protocol. Use a [raw `task()` with chat primitives](/ai-chat/backend#raw-task-with-primitives) directly without the `chat.agent` wrapper. +- Workflows where you control both ends and want a custom protocol. Use a [raw `task()` with chat primitives](/ai-chat/custom-agents) directly without the `chat.agent` wrapper. - High-fanout broadcasting (one source, many subscribers). Use Trigger.dev realtime streams against a regular task instead. ## Putting it together diff --git a/docs/ai-chat/mcp.mdx b/docs/ai-chat/mcp.mdx index 63c0d8ece00..0c9f0019a07 100644 --- a/docs/ai-chat/mcp.mdx +++ b/docs/ai-chat/mcp.mdx @@ -25,7 +25,7 @@ See the [MCP Tools Reference](/mcp-tools#agent-chat-tools) for full details on e - Ask your AI assistant to list agents in your project. This calls `list_agents` which returns all tasks created with [`chat.agent()`](/ai-chat/backend#chat-agent) or [`chat.customAgent()`](/ai-chat/backend#raw-task-with-primitives). + Ask your AI assistant to list agents in your project. This calls `list_agents` which returns all tasks created with [`chat.agent()`](/ai-chat/backend#chat-agent) or [`chat.customAgent()`](/ai-chat/custom-agents). Start a conversation with an agent using `start_agent_chat`. This triggers a run and optionally preloads the agent so it's ready to respond immediately. @@ -78,7 +78,7 @@ If you haven't set up the MCP server yet, see the [MCP Server introduction](/mcp Agent chat tools require: - A running dev server (`trigger dev`) or a deployed worker -- At least one agent defined with [`chat.agent()`](/ai-chat/backend#chat-agent) or [`chat.customAgent()`](/ai-chat/backend#raw-task-with-primitives) +- At least one agent defined with [`chat.agent()`](/ai-chat/backend#chat-agent) or [`chat.customAgent()`](/ai-chat/custom-agents) ## How it works diff --git a/docs/ai-chat/patterns/oom-resilience.mdx b/docs/ai-chat/patterns/oom-resilience.mdx index 097fe796cf7..f7dc86de0be 100644 --- a/docs/ai-chat/patterns/oom-resilience.mdx +++ b/docs/ai-chat/patterns/oom-resilience.mdx @@ -32,7 +32,7 @@ That's the entire opt-in. With `oomMachine` set, the agent gets: - **`retry.outOfMemory.machine: oomMachine`** — the fresh attempt boots on the larger machine. - **`session.in` cursor recovery** — the new attempt skips records belonging to turns that already completed on the prior attempt and only re-runs the OOM'd turn. -`chat.agent` does not expose generic `retry` options. OOM recovery is the only retry path because retrying an LLM-driven loop on non-OOM errors tends to be expensive and side-effecting. Drop down to a [raw `task()` with chat primitives](/ai-chat/backend#raw-task-with-primitives) if you need richer retry semantics. +`chat.agent` does not expose generic `retry` options. OOM recovery is the only retry path because retrying an LLM-driven loop on non-OOM errors tends to be expensive and side-effecting. Drop down to a [raw `task()` with chat primitives](/ai-chat/custom-agents) if you need richer retry semantics. ## How recovery works @@ -109,7 +109,7 @@ export const sendEmail = tool({ ## Limitations - **One OOM retry per run.** `chat.agent` sets `maxAttempts: 2`. If attempt 2 also OOMs, the run fails. Use a sufficiently large `oomMachine` to avoid this. -- **Single fallback tier.** Only one `oomMachine`. There's no "tiered retry" (small → medium → large). If you need that, drop down to a [raw `task()` with chat primitives](/ai-chat/backend#raw-task-with-primitives) and configure `retry` directly. +- **Single fallback tier.** Only one `oomMachine`. There's no "tiered retry" (small → medium → large). If you need that, drop down to a [raw `task()` with chat primitives](/ai-chat/custom-agents) and configure `retry` directly. - **Non-OOM errors don't retry.** Schema errors, model-call rejections, tool throws, etc. fail the run as before. Out-of-memory is the only retry trigger. - **Tools mid-execution are not checkpointed.** A partially-run tool re-runs from scratch on the new attempt. Make them idempotent. diff --git a/docs/ai-chat/pending-messages.mdx b/docs/ai-chat/pending-messages.mdx index 80dbdaab2eb..20ab098b9a2 100644 --- a/docs/ai-chat/pending-messages.mdx +++ b/docs/ai-chat/pending-messages.mdx @@ -162,12 +162,16 @@ const conversation = new chat.MessageAccumulator({ }); for (let turn = 0; turn < 100; turn++) { - const messages = await conversation.addIncoming(payload.messages, payload.trigger, turn); + // The wire payload carries at most one new message per turn. + const messages = await conversation.addIncoming( + payload.message ? [payload.message] : [], + payload.trigger, + turn + ); // Listen for steering messages during streaming const sub = chat.messages.on(async (msg) => { - const lastMsg = msg.messages?.[msg.messages.length - 1]; - if (lastMsg) await conversation.steerAsync(lastMsg); + if (msg.message) await conversation.steerAsync(msg.message); }); const result = streamText({ diff --git a/docs/ai-chat/reference.mdx b/docs/ai-chat/reference.mdx index 147f396bd5e..b305ee22365 100644 --- a/docs/ai-chat/reference.mdx +++ b/docs/ai-chat/reference.mdx @@ -71,7 +71,7 @@ Options for `chat.agent()`. | `exitAfterPreloadIdle` | `boolean` | `false` | Exit run after preload idle timeout instead of suspending. See [exitAfterPreloadIdle](/ai-chat/lifecycle-hooks#exitafterpreloadidle) | | `oomMachine` | `MachinePresetName` | — | Fallback machine when an attempt fails with OOM. Setting it enables a single OOM retry on the larger machine. See [OOM resilience](/ai-chat/patterns/oom-resilience) | -Plus most standard [TaskOptions](/tasks/overview) — `queue`, `machine`, `maxDuration`, **`onWait`**, **`onResume`**, **`onComplete`**, and other lifecycle hooks. Generic `retry` is **not** exposed on `chat.agent`; use `oomMachine` for OOM recovery, or drop down to a raw [`task()`](/ai-chat/backend#raw-task-with-primitives) if you need richer retry semantics. Standard hooks use the same parameter shapes as on a normal `task()` (including `ctx`). +Plus most standard [TaskOptions](/tasks/overview) — `queue`, `machine`, `maxDuration`, **`onWait`**, **`onResume`**, **`onComplete`**, and other lifecycle hooks. Generic `retry` is **not** exposed on `chat.agent`; use `oomMachine` for OOM recovery, or drop down to a raw [`task()`](/ai-chat/custom-agents) if you need richer retry semantics. Standard hooks use the same parameter shapes as on a normal `task()` (including `ctx`). ## Task context (`ctx`) @@ -447,34 +447,40 @@ Return value of `usePendingMessages` hook. See [Pending Messages — Frontend](/ Options for `chat.createSession()`. -| Option | Type | Default | Description | -| ---------------------- | ------------- | -------- | ----------------------------------- | -| `signal` | `AbortSignal` | required | Run-level cancel signal | -| `idleTimeoutInSeconds` | `number` | `30` | Seconds to stay idle between turns | -| `timeout` | `string` | `"1h"` | Duration string for suspend timeout | -| `maxTurns` | `number` | `100` | Max turns before ending | +| Option | Type | Default | Description | +| ---------------------- | --------------------------- | ----------- | ------------------------------------------------------------------------------------------------------------ | +| `signal` | `AbortSignal` | required | Run-level cancel signal | +| `idleTimeoutInSeconds` | `number` | `30` | Seconds to stay idle between turns | +| `timeout` | `string` | `"1h"` | Duration string for suspend timeout | +| `maxTurns` | `number` | `100` | Max turns before ending | +| `compaction` | `ChatAgentCompactionOptions`| `undefined` | Automatic context [compaction](/ai-chat/compaction) — same options as `chat.agent({ compaction })` | +| `pendingMessages` | `PendingMessagesOptions` | `undefined` | Mid-execution [message injection](/ai-chat/pending-messages) — same options as `chat.agent({ pendingMessages })` | ## ChatTurn Each turn yielded by `chat.createSession()`. -| Field | Type | Description | -| -------------- | ---------------- | --------------------------------------------- | -| `number` | `number` | Turn number (0-indexed) | -| `chatId` | `string` | Chat session ID | -| `trigger` | `string` | What triggered this turn | -| `clientData` | `unknown` | Client data from the transport | -| `messages` | `ModelMessage[]` | Full accumulated model messages | -| `uiMessages` | `UIMessage[]` | Full accumulated UI messages | -| `signal` | `AbortSignal` | Combined stop+cancel signal (fresh each turn) | -| `stopped` | `boolean` | Whether the user stopped generation this turn | -| `continuation` | `boolean` | Whether this is a continuation run | - -| Method | Returns | Description | -| ----------------------- | --------------------------------- | ------------------------------------------------------------ | -| `complete(source)` | `Promise` | Pipe, capture, accumulate, cleanup, and signal turn-complete | -| `done()` | `Promise` | Signal turn-complete (when you've piped manually) | -| `addResponse(response)` | `Promise` | Add response to accumulator manually | +| Field | Type | Description | +| ------------------- | --------------------------------- | ---------------------------------------------------------- | +| `number` | `number` | Turn number (0-indexed) | +| `chatId` | `string` | Chat session ID | +| `trigger` | `string` | What triggered this turn | +| `clientData` | `unknown` | Client data from the transport | +| `messages` | `ModelMessage[]` | Full accumulated model messages | +| `uiMessages` | `UIMessage[]` | Full accumulated UI messages | +| `signal` | `AbortSignal` | Combined stop+cancel signal (fresh each turn) | +| `stopped` | `boolean` | Whether the user stopped generation this turn | +| `continuation` | `boolean` | Whether this is a continuation run | +| `previousTurnUsage` | `LanguageModelUsage \| undefined` | Token usage from the previous turn (undefined on turn 0) | +| `totalUsage` | `LanguageModelUsage` | Cumulative token usage across all completed turns | + +| Method | Returns | Description | +| ------------------------ | --------------------------------- | ------------------------------------------------------------------------------------------------------ | +| `complete(source)` | `Promise` | Pipe, capture, accumulate, cleanup, and signal turn-complete | +| `done()` | `Promise` | Signal turn-complete (when you've piped manually) | +| `addResponse(response)` | `Promise` | Add response to accumulator manually | +| `setMessages(uiMessages)`| `Promise` | Replace the accumulated messages (continuation seeding, compaction) | +| `prepareStep()` | `function \| undefined` | `prepareStep` callback wiring compaction + injection — pass to `streamText` when not using `chat.toStreamTextOptions()` | ## chat namespace From eb908b90db4a87f4dc9723de30a18cbdedecdce9 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Fri, 12 Jun 2026 12:40:01 +0100 Subject: [PATCH 2/4] docs(ai-chat): add an anatomy entry page and reorder building agents The Building agents group opened with the long How it works mechanics page, which is the wrong first step after the Quick Start. A new Anatomy page now leads the group: the three moving parts (agent task, session, frontend transport), one annotated chat.agent example where each region names the page that covers it, and a routing table for the group. No setup steps, explicitly skippable. How it works moves to the end of the group as the depth payoff, the position peers (Clerk, LangGraph) give their internals pages. Also registers the Custom agents page at the bottom of the group. --- docs/ai-chat/anatomy.mdx | 71 ++++++++++++++++++++++++++++++++++++++++ docs/docs.json | 6 ++-- 2 files changed, 75 insertions(+), 2 deletions(-) create mode 100644 docs/ai-chat/anatomy.mdx diff --git a/docs/ai-chat/anatomy.mdx b/docs/ai-chat/anatomy.mdx new file mode 100644 index 00000000000..8990e19d458 --- /dev/null +++ b/docs/ai-chat/anatomy.mdx @@ -0,0 +1,71 @@ +--- +title: "Anatomy of an agent" +sidebarTitle: "Anatomy" +description: "The moving parts of a chat agent — the agent task, the session, the frontend transport — and which page covers each." +--- + +import RcBanner from "/snippets/ai-chat-rc-banner.mdx"; + + + +**A chat agent is three parts: a long-lived agent task that runs the turn loop, a durable Session carrying messages in and the response stream out, and a frontend transport that plugs the session into `useChat`.** The pages in this section each own one part of that picture. This page is the map — if you'd rather read mechanics end to end, skip to [How it works](/ai-chat/how-it-works). + +```mermaid +flowchart LR + FE["Frontend
useChat + transport"] -- "user messages" --> IN([Session .in]) + IN --> AGENT["Agent task
turn loop + hooks"] + AGENT --> OUT([Session .out]) + OUT -- "streamed response" --> FE +``` + +Everything below maps onto one annotated agent: + +```ts trigger/my-agent.ts +import { chat } from "@trigger.dev/sdk/ai"; +import { streamText, stepCountIs } from "ai"; +import { anthropic } from "@ai-sdk/anthropic"; + +export const myAgent = chat.agent({ + id: "my-agent", + + // Tools declared on the config survive history re-conversion + // across turns — see Tools. + tools: { searchDocs }, + + // Hooks fire around each turn: validation, persistence, + // post-turn work — see Lifecycle hooks. + onTurnComplete: async ({ responseMessage }) => { + await db.messages.save(responseMessage); + }, + + // The turn loop. Messages arrive accumulated; you stream back. + // Options, levels, and alternatives — see Backend. + run: async ({ messages, tools, signal }) => + streamText({ + ...chat.toStreamTextOptions({ tools }), + model: anthropic("claude-sonnet-4-5"), + messages, + abortSignal: signal, + stopWhen: stepCountIs(15), + }), +}); +``` + +The frontend side is one hook — `useTriggerChatTransport` connects `useChat` to the agent's session, no API routes ([Frontend](/ai-chat/frontend)). Underneath, the conversation lives on a [Session](/ai-chat/sessions): a pair of durable streams keyed on your `chatId` that survives refreshes, deploys, and run boundaries. + +## Where each part is covered + +| Part | Page | +| ----------------------------------------------------- | ---------------------------------------------- | +| `chat.agent()` options, the turn loop, piping | [Backend](/ai-chat/backend) | +| Hooks around each turn (`onTurnComplete`, hydration) | [Lifecycle hooks](/ai-chat/lifecycle-hooks) | +| Declaring tools, typed payloads, `toModelOutput` | [Tools](/ai-chat/tools) | +| `useChat` wiring, tokens, starting sessions | [Frontend](/ai-chat/frontend) | +| Driving a chat from your server instead of a browser | [Server-side chat](/ai-chat/server-chat) | +| The durable substrate under every agent | [Sessions](/ai-chat/sessions) | +| Per-run typed state inside the loop | [chat.local](/ai-chat/chat-local) | +| Type-safe payloads, client data, and messages | [Types](/ai-chat/types) | +| Building without the managed lifecycle | [Custom agents](/ai-chat/custom-agents) | +| End-to-end mechanics: what survives a refresh and why | [How it works](/ai-chat/how-it-works) | + +Beyond this section: [Features](/ai-chat/fast-starts) covers opt-in capabilities (Head Start, compaction, steering, actions), and [Patterns](/ai-chat/patterns/sub-agents) covers production recipes (sub-agents, HITL approvals, persistence, recovery). diff --git a/docs/docs.json b/docs/docs.json index ef448f860a4..3e8d020b7d7 100644 --- a/docs/docs.json +++ b/docs/docs.json @@ -104,7 +104,7 @@ { "group": "Building agents", "pages": [ - "ai-chat/how-it-works", + "ai-chat/anatomy", "ai-chat/backend", "ai-chat/lifecycle-hooks", "ai-chat/tools", @@ -112,7 +112,9 @@ "ai-chat/server-chat", "ai-chat/sessions", "ai-chat/chat-local", - "ai-chat/types" + "ai-chat/types", + "ai-chat/custom-agents", + "ai-chat/how-it-works" ] }, { From 965b81446b73a0300ca10cfb98e709526447e037 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Fri, 12 Jun 2026 13:03:40 +0100 Subject: [PATCH 3/4] docs(ai-chat): keep anchor stubs for moved backend sections Inbound deep links to the sections that moved to the Custom agents page (#chat-createsession, #chat-customagent, #raw-task-with-primitives) now land on the router stub instead of the top of the Backend page. Path-level redirects cannot cover this since fragments never reach the server. --- docs/ai-chat/backend.mdx | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/docs/ai-chat/backend.mdx b/docs/ai-chat/backend.mdx index 626d8c4b7ad..084107d1502 100644 --- a/docs/ai-chat/backend.mdx +++ b/docs/ai-chat/backend.mdx @@ -803,6 +803,11 @@ export const manualChat = task({ --- +{/* Anchor stubs for inbound deep links to the sections that moved to /ai-chat/custom-agents. */} + + + + ## Custom agents Both lower levels — `chat.createSession()` (managed turn iterator, your turn body) and `chat.customAgent()` with raw primitives (hand-rolled loop, full stream-conversion control) — are covered together on the Custom agents page, including the `ChatTurn` surface, the continuation-seeding pattern, and the hand-rolled-loop checklist: From 3415122f80282f8f75fe8d8aabc67729e9a11bd7 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Fri, 12 Jun 2026 14:29:00 +0100 Subject: [PATCH 4/4] docs(ai-chat): repoint the tools page customAgent link to custom-agents --- docs/ai-chat/tools.mdx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/ai-chat/tools.mdx b/docs/ai-chat/tools.mdx index fee57fed008..92c3bbc06a7 100644 --- a/docs/ai-chat/tools.mdx +++ b/docs/ai-chat/tools.mdx @@ -168,7 +168,7 @@ This is shorthand for `UIMessage