From 4d822fc452849c6c0da7c8afe1afd9319a2e842c Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Fri, 12 Jun 2026 12:42:21 +0100 Subject: [PATCH] feat(ai-chat): harden the customAgent variants and add a head-start hydrate route aiChatSession and aiChatRaw now cover the patterns the managed lifecycle handles for chat.agent: history seeding on continuation boots (through the turn-0 addIncoming for the raw loop, since turn 0 replaces the accumulator), persisting the incoming user message before streaming so a mid-stream reload keeps it, and racing totalUsage after a stop so the loop cannot wedge. Also adds the /api/chat-hydrated head-start route and a taskMode-aware headStart URL in the chat view, giving the hydrate agent head-start coverage, and threads tools through aiChatHydrated with an upsert in its hydrate hook (head-start first turns run before the row exists). --- .../src/app/api/chat-hydrated/route.ts | 34 ++++++++++ projects/ai-chat/src/components/chat-view.tsx | 8 ++- projects/ai-chat/src/trigger/chat.ts | 68 ++++++++++++++++--- 3 files changed, 99 insertions(+), 11 deletions(-) create mode 100644 projects/ai-chat/src/app/api/chat-hydrated/route.ts diff --git a/projects/ai-chat/src/app/api/chat-hydrated/route.ts b/projects/ai-chat/src/app/api/chat-hydrated/route.ts new file mode 100644 index 0000000..a2ec220 --- /dev/null +++ b/projects/ai-chat/src/app/api/chat-hydrated/route.ts @@ -0,0 +1,34 @@ +/** + * chat.headStart first-turn endpoint for the `ai-chat-hydrated` agent. + * + * Same shape as `/api/chat` (see the header comment there for the + * full head-start mechanics) but handing over to the hydrateMessages + * agent. This is the smoke-test surface for the headStart × + * hydrateMessages combination: the agent owns history via its DB-backed + * `hydrateMessages` hook, and the turn-0 handover splice must still + * deliver the warm handler's step-1 partial (text, tool calls, stable + * messageId) into the agent's accumulator. + */ +import { chat } from "@trigger.dev/sdk/chat-server"; +import { streamText } from "ai"; +import { anthropic } from "@ai-sdk/anthropic"; +// ⚠️ Imports MUST come from `chat-tools-schemas` only — see the +// header comment in that file for the bundle-isolation rationale. +import { headStartTools } from "@/lib/chat-tools-schemas"; + +export const POST = chat.headStart({ + agentId: "ai-chat-hydrated", + run: async ({ chat: chatHelper }) => { + return streamText({ + ...chatHelper.toStreamTextOptions({ tools: headStartTools }), + model: anthropic("claude-sonnet-4-6"), + system: + "You are a helpful AI assistant. Be concise and friendly. Use the available tools when relevant.", + // Extended thinking so head-start smoke tests cover reasoning + // parts surviving the handover into durable history. + providerOptions: { + anthropic: { thinking: { type: "enabled", budgetTokens: 2048 } }, + }, + }); + }, +}); diff --git a/projects/ai-chat/src/components/chat-view.tsx b/projects/ai-chat/src/components/chat-view.tsx index 5c1d522..94751df 100644 --- a/projects/ai-chat/src/components/chat-view.tsx +++ b/projects/ai-chat/src/components/chat-view.tsx @@ -75,7 +75,13 @@ export function ChatView({ // session state from response headers and writes directly to // `session.in` for turn 2 onward — same direct-trigger path as // when `headStart` is unset. - headStart: useHandover ? "/api/chat" : undefined, + // The hydrated agent has its own head-start route so the + // headStart × hydrateMessages combination is testable end-to-end. + headStart: useHandover + ? taskMode === "ai-chat-hydrated" + ? "/api/chat-hydrated" + : "/api/chat" + : undefined, }); const handleFirstMessage = useCallback( diff --git a/projects/ai-chat/src/trigger/chat.ts b/projects/ai-chat/src/trigger/chat.ts index 00f2746..42c9ca3 100644 --- a/projects/ai-chat/src/trigger/chat.ts +++ b/projects/ai-chat/src/trigger/chat.ts @@ -674,14 +674,25 @@ export const aiChatRaw = chat.customAgent({ }, }); + // Continuation boots (fresh run) start with an empty accumulator — + // fetch stored history so turn 0 can seed it. Seeding must go THROUGH + // addIncoming: turn 0 replaces the accumulator, so a setMessages + // before the loop would be wiped. + let continuationSeed: UIMessage[] = []; + if (currentPayload.continuation) { + const row = await prisma.chat.findUnique({ where: { id: currentPayload.chatId } }); + continuationSeed = (row?.messages ?? []) as unknown as UIMessage[]; + } + for (let turn = 0; turn < 100; turn++) { stop.reset(); - const messages = await conversation.addIncoming( - currentPayload.message ? [currentPayload.message] : [], - currentPayload.trigger, - turn - ); + const incoming = currentPayload.message ? [currentPayload.message] : []; + const turnInput = + turn === 0 && continuationSeed.length > 0 + ? [...continuationSeed.filter((s) => !incoming.some((m) => m.id === s.id)), ...incoming] + : incoming; + const messages = await conversation.addIncoming(turnInput, currentPayload.trigger, turn); const turnClientData = (currentPayload.metadata ?? currentClientData) as | { userId: string; model?: string } @@ -696,6 +707,13 @@ export const aiChatRaw = chat.customAgent({ const useReasoning = useExtendedThinking(modelOverride); const combinedSignal = AbortSignal.any([runSignal, stop.signal]); + // Persist the incoming user message BEFORE streaming (the + // onTurnStart-equivalent) so a mid-stream reload doesn't lose it. + await prisma.chat.update({ + where: { id: currentPayload.chatId }, + data: { messages: conversation.uiMessages as unknown as ChatMessagesForWrite }, + }); + const steeringSub = chat.messages.on(async (msg) => { if (msg.message) await conversation.steerAsync(msg.message); }); @@ -744,9 +762,13 @@ export const aiChatRaw = chat.customAgent({ if (runSignal.aborted) break; + // Race with a timeout — on stop-abort totalUsage never settles. let turnUsage: LanguageModelUsage | undefined; try { - turnUsage = await result.totalUsage; + turnUsage = await Promise.race([ + result.totalUsage, + new Promise((r) => setTimeout(() => r(undefined), 2000)), + ]); } catch { /* non-fatal */ } @@ -827,6 +849,17 @@ export const aiChatSession = chat }); for await (const turn of session) { + // Continuation boots (fresh run) start with an empty accumulator — + // seed it from the DB, keeping any incoming message not yet persisted. + if (turn.continuation && turn.number === 0) { + const row = await prisma.chat.findUnique({ where: { id: turn.chatId } }); + const stored = (row?.messages ?? []) as unknown as UIMessage[]; + if (stored.length > 0) { + const incoming = turn.uiMessages.filter((m) => !stored.some((s) => s.id === m.id)); + await turn.setMessages([...stored, ...incoming]); + } + } + const turnClientData = (turn.clientData ?? clientData) as | { userId: string; model?: string } | undefined; @@ -837,6 +870,13 @@ export const aiChatSession = chat const modelOverride = turnClientData?.model ?? userContext.preferredModel ?? undefined; const useReasoning = useExtendedThinking(modelOverride); + // Persist the incoming user message BEFORE streaming (the + // onTurnStart-equivalent) so a mid-stream reload doesn't lose it. + await prisma.chat.update({ + where: { id: turn.chatId }, + data: { messages: turn.uiMessages as unknown as ChatMessagesForWrite }, + }); + const result = streamText({ ...chat.toStreamTextOptions({ registry }), model: languageModelForChatTurn(modelOverride), @@ -904,6 +944,10 @@ export const aiChatHydrated = chat id: "ai-chat-hydrated", idleTimeoutInSeconds: 60, + // Same tool set as `ai-chat` so head-start tool-call handovers can + // execute post-handover (see /api/chat-hydrated). + tools: chatTools, + // Load message history from the database on every turn. // The frontend's accumulated messages are ignored — the DB is the // single source of truth. `upsertIncomingMessage` handles HITL @@ -915,9 +959,12 @@ export const aiChatHydrated = chat const stored = (record?.messages as unknown as UIMessage[]) ?? []; if (upsertIncomingMessage(stored, { trigger, incomingMessages })) { - await prisma.chat.update({ + // Upsert, not update: on a head-start first turn there's no + // preload, so the row may not exist yet when the hook fires. + await prisma.chat.upsert({ where: { id: chatId }, - data: { messages: stored as unknown as ChatMessagesForWrite }, + create: { id: chatId, title: "New chat", messages: stored as unknown as ChatMessagesForWrite }, + update: { messages: stored as unknown as ChatMessagesForWrite }, }); } @@ -1007,13 +1054,14 @@ export const aiChatHydrated = chat ]); }, - run: async ({ messages, clientData, stopSignal }) => { + run: async ({ messages, clientData, stopSignal, tools }) => { return streamText({ - ...chat.toStreamTextOptions(), + ...chat.toStreamTextOptions({ tools }), model: languageModelForChatTurn( clientData?.model ?? userContext.preferredModel ?? undefined ), messages, + stopWhen: stepCountIs(10), abortSignal: stopSignal, }); },