diff --git a/docs/ai-chat/anatomy.mdx b/docs/ai-chat/anatomy.mdx new file mode 100644 index 0000000000..8990e19d45 --- /dev/null +++ b/docs/ai-chat/anatomy.mdx @@ -0,0 +1,71 @@ +--- +title: "Anatomy of an agent" +sidebarTitle: "Anatomy" +description: "The moving parts of a chat agent — the agent task, the session, the frontend transport — and which page covers each." +--- + +import RcBanner from "/snippets/ai-chat-rc-banner.mdx"; + + + +**A chat agent is three parts: a long-lived agent task that runs the turn loop, a durable Session carrying messages in and the response stream out, and a frontend transport that plugs the session into `useChat`.** The pages in this section each own one part of that picture. This page is the map — if you'd rather read mechanics end to end, skip to [How it works](/ai-chat/how-it-works). + +```mermaid +flowchart LR + FE["Frontend
useChat + transport"] -- "user messages" --> IN([Session .in]) + IN --> AGENT["Agent task
turn loop + hooks"] + AGENT --> OUT([Session .out]) + OUT -- "streamed response" --> FE +``` + +Everything below maps onto one annotated agent: + +```ts trigger/my-agent.ts +import { chat } from "@trigger.dev/sdk/ai"; +import { streamText, stepCountIs } from "ai"; +import { anthropic } from "@ai-sdk/anthropic"; + +export const myAgent = chat.agent({ + id: "my-agent", + + // Tools declared on the config survive history re-conversion + // across turns — see Tools. + tools: { searchDocs }, + + // Hooks fire around each turn: validation, persistence, + // post-turn work — see Lifecycle hooks. + onTurnComplete: async ({ responseMessage }) => { + await db.messages.save(responseMessage); + }, + + // The turn loop. Messages arrive accumulated; you stream back. + // Options, levels, and alternatives — see Backend. + run: async ({ messages, tools, signal }) => + streamText({ + ...chat.toStreamTextOptions({ tools }), + model: anthropic("claude-sonnet-4-5"), + messages, + abortSignal: signal, + stopWhen: stepCountIs(15), + }), +}); +``` + +The frontend side is one hook — `useTriggerChatTransport` connects `useChat` to the agent's session, no API routes ([Frontend](/ai-chat/frontend)). Underneath, the conversation lives on a [Session](/ai-chat/sessions): a pair of durable streams keyed on your `chatId` that survives refreshes, deploys, and run boundaries. + +## Where each part is covered + +| Part | Page | +| ----------------------------------------------------- | ---------------------------------------------- | +| `chat.agent()` options, the turn loop, piping | [Backend](/ai-chat/backend) | +| Hooks around each turn (`onTurnComplete`, hydration) | [Lifecycle hooks](/ai-chat/lifecycle-hooks) | +| Declaring tools, typed payloads, `toModelOutput` | [Tools](/ai-chat/tools) | +| `useChat` wiring, tokens, starting sessions | [Frontend](/ai-chat/frontend) | +| Driving a chat from your server instead of a browser | [Server-side chat](/ai-chat/server-chat) | +| The durable substrate under every agent | [Sessions](/ai-chat/sessions) | +| Per-run typed state inside the loop | [chat.local](/ai-chat/chat-local) | +| Type-safe payloads, client data, and messages | [Types](/ai-chat/types) | +| Building without the managed lifecycle | [Custom agents](/ai-chat/custom-agents) | +| End-to-end mechanics: what survives a refresh and why | [How it works](/ai-chat/how-it-works) | + +Beyond this section: [Features](/ai-chat/fast-starts) covers opt-in capabilities (Head Start, compaction, steering, actions), and [Patterns](/ai-chat/patterns/sub-agents) covers production recipes (sub-agents, HITL approvals, persistence, recovery). diff --git a/docs/ai-chat/backend.mdx b/docs/ai-chat/backend.mdx index ccf86ef03b..084107d150 100644 --- a/docs/ai-chat/backend.mdx +++ b/docs/ai-chat/backend.mdx @@ -8,6 +8,22 @@ import RcBanner from "/snippets/ai-chat-rc-banner.mdx"; +There are three abstraction levels for a chat backend. All three speak the same wire protocol, so the [frontend transport](/ai-chat/frontend) works unchanged whichever you pick. + +| Capability | `chat.agent()` | `chat.createSession()` | Raw primitives | +| ------------------------------------- | -------------- | ------------------------------------------------------------- | -------------- | +| Turn loop, stop signals, accumulation | Managed | Managed | You write it | +| Lifecycle hooks | Yes | No — inline code per turn | No | +| Continuation recovery on new runs | Automatic | [Manual seeding](/ai-chat/custom-agents#continuation-runs-and-history-seeding) | Manual seeding | +| Compaction / steering | Built-in | Built-in | Manual | +| Head Start, actions, tool approvals | Yes | No | No | +| Custom stream conversion | No | Limited | Full control | +| Agent dashboard visibility | Yes | Yes (via `customAgent`) | Yes | + +The raw-primitives column assumes [`chat.customAgent()`](/ai-chat/custom-agents) as the wrapper, which is what makes the task visible to the agent dashboard. + +Start with `chat.agent()`. Drop to `chat.createSession()` when you want to own the per-turn code (model routing, persistence, custom telemetry) without rebuilding the turn loop. Drop to raw primitives only when you need full control over stream conversion or a custom protocol. + ## chat.agent() The highest-level approach. Handles message accumulation, stop signals, turn lifecycle, and auto-piping automatically. @@ -119,7 +135,7 @@ writer.write({ - `chat.response` and the `writer` accumulation behavior work with `chat.agent` and `chat.createSession`. If you're using [`chat.customAgent`](#raw-task-with-primitives), you own the accumulator — see the raw-task example for the manual pattern. + `chat.response` and the `writer` accumulation behavior work with `chat.agent` and `chat.createSession`. If you're using [`chat.customAgent`](/ai-chat/custom-agents), you own the accumulator — see the raw-task example for the manual pattern. ### Raw streaming with `chat.stream` @@ -750,7 +766,7 @@ See [ChatUIMessageStreamOptions](/ai-chat/reference#chatuimessagestreamoptions) `onFinish` is managed internally for response capture and cannot be overridden here. Use `streamText`'s `onFinish` callback for custom finish handling, or use [raw task - mode](#raw-task-with-primitives) for full control over `toUIMessageStream()`. + mode](/ai-chat/custom-agents) for full control over `toUIMessageStream()`. ### Manual mode with task() @@ -787,241 +803,15 @@ export const manualChat = task({ --- -## chat.createSession() - -A middle ground between `chat.agent()` and raw primitives. You get an async iterator that yields `ChatTurn` objects — each turn handles stop signals, message accumulation, and turn-complete signaling automatically. You control initialization, model/tool selection, persistence, and any custom per-turn logic. - -Use `chat.createSession()` inside a standard `task()`: - -```ts -import { task } from "@trigger.dev/sdk"; -import { chat, type ChatTaskWirePayload } from "@trigger.dev/sdk/ai"; -import { streamText } from "ai"; -import { anthropic } from "@ai-sdk/anthropic"; - -export const myChat = task({ - id: "my-chat", - run: async (payload: ChatTaskWirePayload, { signal }) => { - // One-time initialization — just code, no hooks - const clientData = payload.metadata as { userId: string }; - await db.chat.create({ data: { id: payload.chatId, userId: clientData.userId } }); - - const session = chat.createSession(payload, { - signal, - idleTimeoutInSeconds: 60, - timeout: "1h", - }); - - for await (const turn of session) { - const result = streamText({ - model: anthropic("claude-sonnet-4-5"), - messages: turn.messages, - abortSignal: turn.signal, - stopWhen: stepCountIs(15), - }); - - // Pipe, capture, accumulate, and signal turn-complete — all in one call - await turn.complete(result); - - // Persist after each turn - await db.chat.update({ - where: { id: turn.chatId }, - data: { messages: turn.uiMessages }, - }); - } - }, -}); -``` - -### ChatSessionOptions - -| Option | Type | Default | Description | -| ---------------------- | ------------- | -------- | ------------------------------------------- | -| `signal` | `AbortSignal` | required | Run-level cancel signal (from task context) | -| `idleTimeoutInSeconds` | `number` | `30` | Seconds to stay idle between turns | -| `timeout` | `string` | `"1h"` | Duration string for suspend timeout | -| `maxTurns` | `number` | `100` | Max turns before ending | - -### ChatTurn - -Each turn yielded by the iterator provides: - -| Field | Type | Description | -| -------------- | ---------------- | ------------------------------------------------------ | -| `number` | `number` | Turn number (0-indexed) | -| `chatId` | `string` | Chat session ID | -| `trigger` | `string` | What triggered this turn | -| `clientData` | `unknown` | Client data from the transport | -| `messages` | `ModelMessage[]` | Full accumulated model messages — pass to `streamText` | -| `uiMessages` | `UIMessage[]` | Full accumulated UI messages — use for persistence | -| `signal` | `AbortSignal` | Combined stop+cancel signal (fresh each turn) | -| `stopped` | `boolean` | Whether the user stopped generation this turn | -| `continuation` | `boolean` | Whether this is a continuation run | - -| Method | Description | -| ---------------------------- | ------------------------------------------------------------------- | -| `turn.complete(source)` | Pipe stream, capture response, accumulate, and signal turn-complete | -| `turn.done()` | Just signal turn-complete (when you've piped manually) | -| `turn.addResponse(response)` | Add a response to the accumulator manually | - -### turn.complete() vs manual control - -`turn.complete(result)` is the easy path — it handles piping, capturing the response, accumulating messages, cleaning up aborted parts, and writing the turn-complete chunk. - -For more control, you can do each step manually: - -```ts -for await (const turn of session) { - const result = streamText({ - model: anthropic("claude-sonnet-4-5"), - messages: turn.messages, - abortSignal: turn.signal, - stopWhen: stepCountIs(15), - }); - - // Manual: pipe and capture separately - const response = await chat.pipeAndCapture(result, { signal: turn.signal }); - - if (response) { - // Custom processing before accumulating - await turn.addResponse(response); - } - - // Custom persistence, analytics, etc. - await db.chat.update({ ... }); - - // Must call done() when not using complete() - await turn.done(); -} -``` - ---- - -## Raw task with primitives - -For full control, use a standard `task()` with the composable primitives from the `chat` namespace. You manage everything: the turn loop, stop signals, message accumulation, and turn-complete signaling. - -Raw task mode also lets you call `.toUIMessageStream()` yourself with any options — including `onFinish` and `originalMessages`. This is the right choice when you need complete control over the stream conversion beyond what `chat.setUIMessageStreamOptions()` provides. - -### Primitives - -| Primitive | Description | -| ------------------------------- | ------------------------------------------------------------------------------------------- | -| `chat.messages` | Input stream for incoming messages — use `.waitWithIdleTimeout()` to wait for the next turn | -| `chat.createStopSignal()` | Create a managed stop signal wired to the stop input stream | -| `chat.pipeAndCapture(result)` | Pipe a `StreamTextResult` to the chat stream and capture the response | -| `chat.writeTurnComplete()` | Signal the frontend that the current turn is complete | -| `chat.MessageAccumulator` | Accumulates conversation messages across turns | -| `chat.pipe(stream)` | Pipe a stream to the frontend (no response capture) | -| `chat.cleanupAbortedParts(msg)` | Clean up incomplete parts from a stopped response | - -### Example - -```ts -import { task } from "@trigger.dev/sdk"; -import { chat, type ChatTaskWirePayload } from "@trigger.dev/sdk/ai"; -import { streamText } from "ai"; -import { anthropic } from "@ai-sdk/anthropic"; - -export const myChat = task({ - id: "my-chat-raw", - run: async (payload: ChatTaskWirePayload, { signal: runSignal }) => { - let currentPayload = payload; - - // Handle preload — wait for the first real message - if (currentPayload.trigger === "preload") { - const result = await chat.messages.waitWithIdleTimeout({ - idleTimeoutInSeconds: 60, - timeout: "1h", - spanName: "waiting for first message", - }); - if (!result.ok) return; - currentPayload = result.output; - } - - const stop = chat.createStopSignal(); - const conversation = new chat.MessageAccumulator(); - - for (let turn = 0; turn < 100; turn++) { - stop.reset(); - - const messages = await conversation.addIncoming( - currentPayload.messages, - currentPayload.trigger, - turn - ); - - const combinedSignal = AbortSignal.any([runSignal, stop.signal]); - - const result = streamText({ - model: anthropic("claude-sonnet-4-5"), - messages, - abortSignal: combinedSignal, - stopWhen: stepCountIs(15), - }); - - let response; - try { - response = await chat.pipeAndCapture(result, { signal: combinedSignal }); - } catch (error) { - if (error instanceof Error && error.name === "AbortError") { - if (runSignal.aborted) break; - // Stop — fall through to accumulate partial - } else { - throw error; - } - } - - if (response) { - const cleaned = - stop.signal.aborted && !runSignal.aborted ? chat.cleanupAbortedParts(response) : response; - await conversation.addResponse(cleaned); - } - - if (runSignal.aborted) break; +{/* Anchor stubs for inbound deep links to the sections that moved to /ai-chat/custom-agents. */} + + + - // Persist, analytics, etc. - await db.chat.update({ - where: { id: currentPayload.chatId }, - data: { messages: conversation.uiMessages }, - }); +## Custom agents - await chat.writeTurnComplete(); +Both lower levels — `chat.createSession()` (managed turn iterator, your turn body) and `chat.customAgent()` with raw primitives (hand-rolled loop, full stream-conversion control) — are covered together on the Custom agents page, including the `ChatTurn` surface, the continuation-seeding pattern, and the hand-rolled-loop checklist: - // Wait for the next message - const next = await chat.messages.waitWithIdleTimeout({ - idleTimeoutInSeconds: 60, - timeout: "1h", - spanName: "waiting for next message", - }); - if (!next.ok) break; - currentPayload = next.output; - } - - stop.cleanup(); - }, -}); -``` - -### MessageAccumulator - -The `MessageAccumulator` handles the transport protocol automatically: - -- Turn 0: replaces messages (full history from frontend) -- Subsequent turns: appends new messages (frontend only sends the new user message) -- Regenerate: replaces messages (full history minus last assistant message) - -```ts -const conversation = new chat.MessageAccumulator(); - -// Returns full accumulated ModelMessage[] for streamText -const messages = await conversation.addIncoming(payload.messages, payload.trigger, turn); - -// After piping, add the response -const response = await chat.pipeAndCapture(result); -if (response) await conversation.addResponse(response); - -// Access accumulated messages for persistence -conversation.uiMessages; // UIMessage[] -conversation.modelMessages; // ModelMessage[] -``` + + Build agents without the managed lifecycle — createSession or raw primitives. + diff --git a/docs/ai-chat/custom-agents.mdx b/docs/ai-chat/custom-agents.mdx new file mode 100644 index 0000000000..54c0461a75 --- /dev/null +++ b/docs/ai-chat/custom-agents.mdx @@ -0,0 +1,363 @@ +--- +title: "Custom agents" +sidebarTitle: "Custom agents" +description: "Build chat agents without chat.agent()'s managed lifecycle: register with chat.customAgent(), then drive turns with the createSession iterator or a hand-rolled loop." +--- + +import RcBanner from "/snippets/ai-chat-rc-banner.mdx"; + + + +**A custom agent is a task you register with `chat.customAgent()` and drive yourself — either with the managed turn iterator from `chat.createSession()`, or with a fully hand-rolled loop over the raw chat primitives.** You give up `chat.agent()`'s lifecycle hooks and automatic continuation recovery; you gain inline control over every turn, and (at the lowest level) full control over the stream conversion. + +See the [comparison table](/ai-chat/backend) before dropping down. The frontend is unchanged either way: all levels speak the same wire protocol, so [`useTriggerChatTransport`](/ai-chat/frontend) points at a custom agent exactly like a `chat.agent()`. + +## chat.customAgent() + +`chat.customAgent()` is a thin wrapper around `task()` that does two things: it registers the task as an agent (so it appears in the agent dashboard, the playground, and the MCP server's `list_agents`), and it binds the run to its backing [Session](/ai-chat/sessions) so the `chat.*` primitives resolve to the right `.in`/`.out` channels. There is no managed lifecycle — no turn loop, no hooks, no preload handling. + +A plain `task()` works with the same primitives but stays invisible to the agent surfaces, so prefer `customAgent` unless you specifically don't want the task listed as an agent. + +Inside the wrapper, pick one of two loop styles: + +- **[Managed loop](#managed-loop-chatcreatesession)** — `chat.createSession()` yields turns; the SDK handles stop signals, accumulation, idle suspend/resume, and turn-complete signaling. You write the turn body. +- **[Hand-rolled loop](#hand-rolled-loop-with-primitives)** — you write the loop itself with `chat.messages`, `MessageAccumulator`, `pipeAndCapture`, and `writeTurnComplete`. The right choice when you need complete control over `.toUIMessageStream()` (e.g. `onFinish`, `originalMessages`) beyond what `chat.setUIMessageStreamOptions()` provides, or you're implementing a custom protocol. + +## Managed loop: chat.createSession() + +`chat.createSession()` gives you an async iterator of `ChatTurn` objects. Each turn arrives with the accumulated history, a combined stop+cancel signal, and helpers to finish the turn: + +```ts trigger/my-chat.ts +import { chat, type ChatTaskWirePayload } from "@trigger.dev/sdk/ai"; +import { streamText, stepCountIs } from "ai"; +import { anthropic } from "@ai-sdk/anthropic"; + +export const myChat = chat.customAgent({ + id: "my-chat", + run: async (payload: ChatTaskWirePayload, { signal }) => { + // One-time initialization — plain code, no hooks. Upsert, not create: + // continuation runs boot with the row already in place. + const clientData = payload.metadata as { userId: string }; + await db.chat.upsert({ + where: { id: payload.chatId }, + create: { id: payload.chatId, userId: clientData.userId }, + update: {}, + }); + + const session = chat.createSession(payload, { + signal, + idleTimeoutInSeconds: 60, + timeout: "1h", + }); + + for await (const turn of session) { + // Persist the incoming user message BEFORE streaming — this is your + // onTurnStart equivalent. Without it, a page reload mid-stream + // restores the assistant text (replayed from the session) but loses + // the user message that prompted it. + await db.chat.update({ + where: { id: turn.chatId }, + data: { messages: turn.uiMessages }, + }); + + const result = streamText({ + model: anthropic("claude-sonnet-4-5"), + messages: turn.messages, + abortSignal: turn.signal, + stopWhen: stepCountIs(15), + }); + + // Pipe, capture, accumulate, and signal turn-complete — all in one call + await turn.complete(result); + + // Persist the full exchange after the turn — your onTurnComplete equivalent + await db.chat.update({ + where: { id: turn.chatId }, + data: { messages: turn.uiMessages }, + }); + } + }, +}); +``` + + + If you pass `compaction` or `pendingMessages` to `chat.createSession()`, you must also pass `prepareStep: turn.prepareStep()` to `streamText` (or spread `chat.toStreamTextOptions()`, which wires it automatically). Without it, both features silently no-op. + + +### ChatSessionOptions + +| Option | Type | Default | Description | +| ---------------------- | ---------------------------- | ----------- | -------------------------------------------------------------------------------------------------- | +| `signal` | `AbortSignal` | required | Run-level cancel signal (from task context) | +| `idleTimeoutInSeconds` | `number` | `30` | Seconds to stay idle between turns before suspending | +| `timeout` | `string` | `"1h"` | Duration string for suspend timeout | +| `maxTurns` | `number` | `100` | Max turns before ending | +| `compaction` | `ChatAgentCompactionOptions` | `undefined` | Automatic context [compaction](/ai-chat/compaction) — same options as on `chat.agent()` | +| `pendingMessages` | `PendingMessagesOptions` | `undefined` | Mid-execution [message injection](/ai-chat/pending-messages) — same options as on `chat.agent()` | + +Between turns the run idles on `waitWithIdleTimeout`: after `idleTimeoutInSeconds` with no message it suspends (compute is freed), and the next message restores it on the same run — the same warm/suspended pipeline `chat.agent()` uses. + +### ChatTurn + +Each turn yielded by the iterator provides: + +| Field | Type | Description | +| ------------------- | --------------------------------- | -------------------------------------------------------- | +| `number` | `number` | Turn number (0-indexed) | +| `chatId` | `string` | Chat session ID | +| `trigger` | `string` | What triggered this turn | +| `clientData` | `unknown` | Client data from the transport | +| `messages` | `ModelMessage[]` | Full accumulated model messages — pass to `streamText` | +| `uiMessages` | `UIMessage[]` | Full accumulated UI messages — use for persistence | +| `signal` | `AbortSignal` | Combined stop+cancel signal (fresh each turn) | +| `stopped` | `boolean` | Whether the user stopped generation this turn | +| `continuation` | `boolean` | Whether this is a continuation run | +| `previousTurnUsage` | `LanguageModelUsage \| undefined` | Token usage from the previous turn (undefined on turn 0) | +| `totalUsage` | `LanguageModelUsage` | Cumulative token usage across all completed turns | + +| Method | Description | +| ----------------------------- | ---------------------------------------------------------------------------------------------------------- | +| `turn.complete(source)` | Pipe stream, capture response, accumulate, and signal turn-complete | +| `turn.done()` | Signal turn-complete only (when you have piped manually) | +| `turn.addResponse(response)` | Add a response to the accumulator manually | +| `turn.setMessages(uiMessages)`| Replace the accumulated messages — continuation seeding and on-demand compaction | +| `turn.prepareStep()` | `prepareStep` callback wiring compaction + injection — pass to `streamText` when not spreading `chat.toStreamTextOptions()` | + +### Continuation runs and history seeding + +`chat.agent()` rebuilds conversation history automatically when a chat continues on a fresh run (after a cancel, crash, version upgrade, or TTL expiry) — via its snapshot/replay boot or your `hydrateMessages` hook. Custom agents do none of that: a continuation run starts with an **empty accumulator**, and history restoration is your job. + +With `createSession`, check `turn.continuation` on the first turn and seed from your store with `turn.setMessages()`: + +```ts +for await (const turn of session) { + if (turn.continuation && turn.number === 0) { + const row = await db.chat.findUnique({ where: { id: turn.chatId } }); + const stored = (row?.messages ?? []) as UIMessage[]; + if (stored.length > 0) { + // Keep any incoming message that isn't already persisted + const incoming = turn.uiMessages.filter((m) => !stored.some((s) => s.id === m.id)); + await turn.setMessages([...stored, ...incoming]); + } + } + + // ... streamText + turn.complete as usual +} +``` + +Without this, a resumed chat silently loses its history: the model sees only the message that triggered the continuation. In a hand-rolled loop, seed by passing the stored history into the turn-0 `addIncoming` call — shown in the example below. + +### turn.complete() vs manual control + +`turn.complete(result)` is the one-call path — it handles piping, capturing the response, accumulating messages, cleaning up aborted parts on a stop, and writing the turn-complete chunk. + +For more control, you can do each step manually: + +```ts +for await (const turn of session) { + const result = streamText({ + model: anthropic("claude-sonnet-4-5"), + messages: turn.messages, + abortSignal: turn.signal, + stopWhen: stepCountIs(15), + }); + + // Manual: pipe and capture separately + const response = await chat.pipeAndCapture(result, { signal: turn.signal }); + + if (response) { + // Custom processing before accumulating + await turn.addResponse(response); + } + + // Custom persistence, analytics, etc. + await db.chat.update({ ... }); + + // Must call done() when not using complete() + await turn.done(); +} +``` + +## Hand-rolled loop with primitives + +For full control, skip `createSession` and compose the primitives directly: + +| Primitive | Description | +| ------------------------------- | -------------------------------------------------------------------------------------------- | +| `chat.messages` | Input stream for incoming messages — use `.waitWithIdleTimeout()` to wait for the next turn | +| `chat.createStopSignal()` | Create a managed stop signal wired to the stop input stream | +| `chat.pipeAndCapture(result)` | Pipe a `StreamTextResult` to the chat stream and capture the response | +| `chat.writeTurnComplete()` | Signal the frontend that the current turn is complete | +| `chat.MessageAccumulator` | Accumulates conversation messages across turns | +| `chat.pipe(stream)` | Pipe a stream to the frontend (no response capture) | +| `chat.cleanupAbortedParts(msg)` | Clean up incomplete parts from a stopped response | + +A complete loop: + +```ts trigger/my-chat-raw.ts +import { chat, type ChatTaskWirePayload } from "@trigger.dev/sdk/ai"; +import { streamText, stepCountIs } from "ai"; +import { anthropic } from "@ai-sdk/anthropic"; + +export const myChat = chat.customAgent({ + id: "my-chat-raw", + run: async (payload: ChatTaskWirePayload, { signal: runSignal }) => { + let currentPayload = payload; + + // Handle preload — wait for the first real message + if (currentPayload.trigger === "preload") { + const result = await chat.messages.waitWithIdleTimeout({ + idleTimeoutInSeconds: 60, + timeout: "1h", + spanName: "waiting for first message", + }); + if (!result.ok) return; + currentPayload = result.output; + } + + const stop = chat.createStopSignal(); + const conversation = new chat.MessageAccumulator(); + + // Continuation runs (cancel, crash, upgrade) start with an empty + // accumulator — fetch stored history so turn 0 can seed it. + let continuationSeed: UIMessage[] = []; + if (currentPayload.continuation) { + const row = await db.chat.findUnique({ where: { id: currentPayload.chatId } }); + continuationSeed = (row?.messages ?? []) as UIMessage[]; + } + + for (let turn = 0; turn < 100; turn++) { + stop.reset(); + + // The wire payload carries at most one new message per turn. Turn 0 + // REPLACES the accumulator, so seed stored history through + // addIncoming together with the incoming message — a setMessages + // call before the loop would be wiped here. + const incoming = currentPayload.message ? [currentPayload.message] : []; + const turnInput = + turn === 0 && continuationSeed.length > 0 + ? [...continuationSeed.filter((s) => !incoming.some((m) => m.id === s.id)), ...incoming] + : incoming; + const messages = await conversation.addIncoming(turnInput, currentPayload.trigger, turn); + + // Persist the incoming user message before streaming so a + // mid-stream reload doesn't lose it. + await db.chat.update({ + where: { id: currentPayload.chatId }, + data: { messages: conversation.uiMessages }, + }); + + const combinedSignal = AbortSignal.any([runSignal, stop.signal]); + + const result = streamText({ + model: anthropic("claude-sonnet-4-5"), + messages, + abortSignal: combinedSignal, + stopWhen: stepCountIs(15), + }); + + let response; + try { + response = await chat.pipeAndCapture(result, { signal: combinedSignal }); + } catch (error) { + if (error instanceof Error && error.name === "AbortError") { + if (runSignal.aborted) break; + // Stop — fall through to accumulate partial + } else { + throw error; + } + } + + if (response) { + const cleaned = + stop.signal.aborted && !runSignal.aborted ? chat.cleanupAbortedParts(response) : response; + await conversation.addResponse(cleaned); + } + + if (runSignal.aborted) break; + + // Persist, analytics, etc. + await db.chat.update({ + where: { id: currentPayload.chatId }, + data: { messages: conversation.uiMessages }, + }); + + await chat.writeTurnComplete(); + + // Wait for the next message + const next = await chat.messages.waitWithIdleTimeout({ + idleTimeoutInSeconds: 60, + timeout: "1h", + spanName: "waiting for next message", + }); + if (!next.ok) break; + currentPayload = next.output; + } + + stop.cleanup(); + }, +}); +``` + +### MessageAccumulator + +`addIncoming(messages, trigger, turn)` has two modes: + +- **Turn 0 or `trigger === "regenerate-message"`: replaces** the accumulator with exactly what you pass. This is why continuation seeding goes through `addIncoming` (above), and why a regenerate needs you to slice your own history — the wire omits the message on regenerate, so pass the stored history minus the last assistant message. +- **Every other turn: appends** what you pass (the wire carries at most the one new user message). + +```ts +const conversation = new chat.MessageAccumulator(); + +// Returns full accumulated ModelMessage[] for streamText +const messages = await conversation.addIncoming( + payload.message ? [payload.message] : [], + payload.trigger, + turn +); + +// After piping, add the response +const response = await chat.pipeAndCapture(result); +if (response) await conversation.addResponse(response); + +// Access accumulated messages for persistence +conversation.uiMessages; // UIMessage[] +conversation.modelMessages; // ModelMessage[] +``` + +The constructor also accepts `compaction` and `pendingMessages` options (same shapes as on `chat.agent()`); pass `prepareStep: conversation.prepareStep()` to `streamText` to activate them. See [pending messages](/ai-chat/pending-messages#backend-messageaccumulator-raw-task) for the manual steering wiring. + +### Hand-rolled loop checklist + +Things the managed levels do for you that a raw loop has to get right: + +- **Don't bare-await `result.totalUsage`.** On a stop-abort the AI SDK's `totalUsage` promise never settles, which wedges the loop forever. Race it with a timeout: + + ```ts + const turnUsage = await Promise.race([ + result.totalUsage, + new Promise((resolve) => setTimeout(() => resolve(undefined), 2000)), + ]); + ``` + +- **Persist the user message before streaming** (shown in the example above). The session replay restores the assistant's streamed text after a page reload, but nothing restores a user message you haven't written down. +- **Seed history on continuation runs through the turn-0 `addIncoming`** (shown above). `payload.continuation` is `true` when this run picked up an existing chat; the accumulator starts empty — and because turn 0 replaces the accumulator, a `setMessages` call before the loop gets wiped. +- **Clean up aborted parts on a stop** with `chat.cleanupAbortedParts()` before accumulating, or the partial response carries half-open tool calls into the next turn's prompt. +- **Read `payload.message` (singular).** The wire payload carries at most one new message per turn; there is no `messages` array on the payload. + +## Next steps + + + + The three abstraction levels compared, and everything chat.agent() adds on top. + + + The durable stream pair every agent — managed or custom — is built on. + + + Automatic context compression — works with createSession and MessageAccumulator. + + + The wire format your loop is speaking, chunk by chunk. + + diff --git a/docs/ai-chat/how-it-works.mdx b/docs/ai-chat/how-it-works.mdx index ecde885f4a..7c3d182f8b 100644 --- a/docs/ai-chat/how-it-works.mdx +++ b/docs/ai-chat/how-it-works.mdx @@ -179,7 +179,7 @@ See [Lifecycle hooks](/ai-chat/lifecycle-hooks) for the full signatures and firi **Not a good fit**: - Single-shot completions where you don't need durability or resume. Call your model directly. -- Workflows where you control both ends and want a custom protocol. Use a [raw `task()` with chat primitives](/ai-chat/backend#raw-task-with-primitives) directly without the `chat.agent` wrapper. +- Workflows where you control both ends and want a custom protocol. Use a [raw `task()` with chat primitives](/ai-chat/custom-agents) directly without the `chat.agent` wrapper. - High-fanout broadcasting (one source, many subscribers). Use Trigger.dev realtime streams against a regular task instead. ## Putting it together diff --git a/docs/ai-chat/mcp.mdx b/docs/ai-chat/mcp.mdx index 63c0d8ece0..0c9f0019a0 100644 --- a/docs/ai-chat/mcp.mdx +++ b/docs/ai-chat/mcp.mdx @@ -25,7 +25,7 @@ See the [MCP Tools Reference](/mcp-tools#agent-chat-tools) for full details on e - Ask your AI assistant to list agents in your project. This calls `list_agents` which returns all tasks created with [`chat.agent()`](/ai-chat/backend#chat-agent) or [`chat.customAgent()`](/ai-chat/backend#raw-task-with-primitives). + Ask your AI assistant to list agents in your project. This calls `list_agents` which returns all tasks created with [`chat.agent()`](/ai-chat/backend#chat-agent) or [`chat.customAgent()`](/ai-chat/custom-agents). Start a conversation with an agent using `start_agent_chat`. This triggers a run and optionally preloads the agent so it's ready to respond immediately. @@ -78,7 +78,7 @@ If you haven't set up the MCP server yet, see the [MCP Server introduction](/mcp Agent chat tools require: - A running dev server (`trigger dev`) or a deployed worker -- At least one agent defined with [`chat.agent()`](/ai-chat/backend#chat-agent) or [`chat.customAgent()`](/ai-chat/backend#raw-task-with-primitives) +- At least one agent defined with [`chat.agent()`](/ai-chat/backend#chat-agent) or [`chat.customAgent()`](/ai-chat/custom-agents) ## How it works diff --git a/docs/ai-chat/patterns/oom-resilience.mdx b/docs/ai-chat/patterns/oom-resilience.mdx index 097fe796cf..f7dc86de0b 100644 --- a/docs/ai-chat/patterns/oom-resilience.mdx +++ b/docs/ai-chat/patterns/oom-resilience.mdx @@ -32,7 +32,7 @@ That's the entire opt-in. With `oomMachine` set, the agent gets: - **`retry.outOfMemory.machine: oomMachine`** — the fresh attempt boots on the larger machine. - **`session.in` cursor recovery** — the new attempt skips records belonging to turns that already completed on the prior attempt and only re-runs the OOM'd turn. -`chat.agent` does not expose generic `retry` options. OOM recovery is the only retry path because retrying an LLM-driven loop on non-OOM errors tends to be expensive and side-effecting. Drop down to a [raw `task()` with chat primitives](/ai-chat/backend#raw-task-with-primitives) if you need richer retry semantics. +`chat.agent` does not expose generic `retry` options. OOM recovery is the only retry path because retrying an LLM-driven loop on non-OOM errors tends to be expensive and side-effecting. Drop down to a [raw `task()` with chat primitives](/ai-chat/custom-agents) if you need richer retry semantics. ## How recovery works @@ -109,7 +109,7 @@ export const sendEmail = tool({ ## Limitations - **One OOM retry per run.** `chat.agent` sets `maxAttempts: 2`. If attempt 2 also OOMs, the run fails. Use a sufficiently large `oomMachine` to avoid this. -- **Single fallback tier.** Only one `oomMachine`. There's no "tiered retry" (small → medium → large). If you need that, drop down to a [raw `task()` with chat primitives](/ai-chat/backend#raw-task-with-primitives) and configure `retry` directly. +- **Single fallback tier.** Only one `oomMachine`. There's no "tiered retry" (small → medium → large). If you need that, drop down to a [raw `task()` with chat primitives](/ai-chat/custom-agents) and configure `retry` directly. - **Non-OOM errors don't retry.** Schema errors, model-call rejections, tool throws, etc. fail the run as before. Out-of-memory is the only retry trigger. - **Tools mid-execution are not checkpointed.** A partially-run tool re-runs from scratch on the new attempt. Make them idempotent. diff --git a/docs/ai-chat/pending-messages.mdx b/docs/ai-chat/pending-messages.mdx index 80dbdaab2e..20ab098b9a 100644 --- a/docs/ai-chat/pending-messages.mdx +++ b/docs/ai-chat/pending-messages.mdx @@ -162,12 +162,16 @@ const conversation = new chat.MessageAccumulator({ }); for (let turn = 0; turn < 100; turn++) { - const messages = await conversation.addIncoming(payload.messages, payload.trigger, turn); + // The wire payload carries at most one new message per turn. + const messages = await conversation.addIncoming( + payload.message ? [payload.message] : [], + payload.trigger, + turn + ); // Listen for steering messages during streaming const sub = chat.messages.on(async (msg) => { - const lastMsg = msg.messages?.[msg.messages.length - 1]; - if (lastMsg) await conversation.steerAsync(lastMsg); + if (msg.message) await conversation.steerAsync(msg.message); }); const result = streamText({ diff --git a/docs/ai-chat/reference.mdx b/docs/ai-chat/reference.mdx index 147f396bd5..b305ee2236 100644 --- a/docs/ai-chat/reference.mdx +++ b/docs/ai-chat/reference.mdx @@ -71,7 +71,7 @@ Options for `chat.agent()`. | `exitAfterPreloadIdle` | `boolean` | `false` | Exit run after preload idle timeout instead of suspending. See [exitAfterPreloadIdle](/ai-chat/lifecycle-hooks#exitafterpreloadidle) | | `oomMachine` | `MachinePresetName` | — | Fallback machine when an attempt fails with OOM. Setting it enables a single OOM retry on the larger machine. See [OOM resilience](/ai-chat/patterns/oom-resilience) | -Plus most standard [TaskOptions](/tasks/overview) — `queue`, `machine`, `maxDuration`, **`onWait`**, **`onResume`**, **`onComplete`**, and other lifecycle hooks. Generic `retry` is **not** exposed on `chat.agent`; use `oomMachine` for OOM recovery, or drop down to a raw [`task()`](/ai-chat/backend#raw-task-with-primitives) if you need richer retry semantics. Standard hooks use the same parameter shapes as on a normal `task()` (including `ctx`). +Plus most standard [TaskOptions](/tasks/overview) — `queue`, `machine`, `maxDuration`, **`onWait`**, **`onResume`**, **`onComplete`**, and other lifecycle hooks. Generic `retry` is **not** exposed on `chat.agent`; use `oomMachine` for OOM recovery, or drop down to a raw [`task()`](/ai-chat/custom-agents) if you need richer retry semantics. Standard hooks use the same parameter shapes as on a normal `task()` (including `ctx`). ## Task context (`ctx`) @@ -447,34 +447,40 @@ Return value of `usePendingMessages` hook. See [Pending Messages — Frontend](/ Options for `chat.createSession()`. -| Option | Type | Default | Description | -| ---------------------- | ------------- | -------- | ----------------------------------- | -| `signal` | `AbortSignal` | required | Run-level cancel signal | -| `idleTimeoutInSeconds` | `number` | `30` | Seconds to stay idle between turns | -| `timeout` | `string` | `"1h"` | Duration string for suspend timeout | -| `maxTurns` | `number` | `100` | Max turns before ending | +| Option | Type | Default | Description | +| ---------------------- | --------------------------- | ----------- | ------------------------------------------------------------------------------------------------------------ | +| `signal` | `AbortSignal` | required | Run-level cancel signal | +| `idleTimeoutInSeconds` | `number` | `30` | Seconds to stay idle between turns | +| `timeout` | `string` | `"1h"` | Duration string for suspend timeout | +| `maxTurns` | `number` | `100` | Max turns before ending | +| `compaction` | `ChatAgentCompactionOptions`| `undefined` | Automatic context [compaction](/ai-chat/compaction) — same options as `chat.agent({ compaction })` | +| `pendingMessages` | `PendingMessagesOptions` | `undefined` | Mid-execution [message injection](/ai-chat/pending-messages) — same options as `chat.agent({ pendingMessages })` | ## ChatTurn Each turn yielded by `chat.createSession()`. -| Field | Type | Description | -| -------------- | ---------------- | --------------------------------------------- | -| `number` | `number` | Turn number (0-indexed) | -| `chatId` | `string` | Chat session ID | -| `trigger` | `string` | What triggered this turn | -| `clientData` | `unknown` | Client data from the transport | -| `messages` | `ModelMessage[]` | Full accumulated model messages | -| `uiMessages` | `UIMessage[]` | Full accumulated UI messages | -| `signal` | `AbortSignal` | Combined stop+cancel signal (fresh each turn) | -| `stopped` | `boolean` | Whether the user stopped generation this turn | -| `continuation` | `boolean` | Whether this is a continuation run | - -| Method | Returns | Description | -| ----------------------- | --------------------------------- | ------------------------------------------------------------ | -| `complete(source)` | `Promise` | Pipe, capture, accumulate, cleanup, and signal turn-complete | -| `done()` | `Promise` | Signal turn-complete (when you've piped manually) | -| `addResponse(response)` | `Promise` | Add response to accumulator manually | +| Field | Type | Description | +| ------------------- | --------------------------------- | ---------------------------------------------------------- | +| `number` | `number` | Turn number (0-indexed) | +| `chatId` | `string` | Chat session ID | +| `trigger` | `string` | What triggered this turn | +| `clientData` | `unknown` | Client data from the transport | +| `messages` | `ModelMessage[]` | Full accumulated model messages | +| `uiMessages` | `UIMessage[]` | Full accumulated UI messages | +| `signal` | `AbortSignal` | Combined stop+cancel signal (fresh each turn) | +| `stopped` | `boolean` | Whether the user stopped generation this turn | +| `continuation` | `boolean` | Whether this is a continuation run | +| `previousTurnUsage` | `LanguageModelUsage \| undefined` | Token usage from the previous turn (undefined on turn 0) | +| `totalUsage` | `LanguageModelUsage` | Cumulative token usage across all completed turns | + +| Method | Returns | Description | +| ------------------------ | --------------------------------- | ------------------------------------------------------------------------------------------------------ | +| `complete(source)` | `Promise` | Pipe, capture, accumulate, cleanup, and signal turn-complete | +| `done()` | `Promise` | Signal turn-complete (when you've piped manually) | +| `addResponse(response)` | `Promise` | Add response to accumulator manually | +| `setMessages(uiMessages)`| `Promise` | Replace the accumulated messages (continuation seeding, compaction) | +| `prepareStep()` | `function \| undefined` | `prepareStep` callback wiring compaction + injection — pass to `streamText` when not using `chat.toStreamTextOptions()` | ## chat namespace diff --git a/docs/ai-chat/tools.mdx b/docs/ai-chat/tools.mdx index fee57fed00..92c3bbc06a 100644 --- a/docs/ai-chat/tools.mdx +++ b/docs/ai-chat/tools.mdx @@ -168,7 +168,7 @@ This is shorthand for `UIMessage