Skip to content
371 changes: 371 additions & 0 deletions docs/http-streaming-requirements.md

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions example/convex/_generated/api.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,14 @@ import type * as agents_config from "../agents/config.js";
import type * as agents_fashion from "../agents/fashion.js";
import type * as agents_simple from "../agents/simple.js";
import type * as agents_story from "../agents/story.js";
import type * as agents_streamingDemo from "../agents/streamingDemo.js";
import type * as agents_weather from "../agents/weather.js";
import type * as chat_approval from "../chat/approval.js";
import type * as chat_basic from "../chat/basic.js";
import type * as chat_human from "../chat/human.js";
import type * as chat_streamAbort from "../chat/streamAbort.js";
import type * as chat_streaming from "../chat/streaming.js";
import type * as chat_streamingDemo from "../chat/streamingDemo.js";
import type * as chat_streamingReasoning from "../chat/streamingReasoning.js";
import type * as chat_withoutAgent from "../chat/withoutAgent.js";
import type * as crons from "../crons.js";
Expand Down Expand Up @@ -62,12 +64,14 @@ declare const fullApi: ApiFromModules<{
"agents/fashion": typeof agents_fashion;
"agents/simple": typeof agents_simple;
"agents/story": typeof agents_story;
"agents/streamingDemo": typeof agents_streamingDemo;
"agents/weather": typeof agents_weather;
"chat/approval": typeof chat_approval;
"chat/basic": typeof chat_basic;
"chat/human": typeof chat_human;
"chat/streamAbort": typeof chat_streamAbort;
"chat/streaming": typeof chat_streaming;
"chat/streamingDemo": typeof chat_streamingDemo;
"chat/streamingReasoning": typeof chat_streamingReasoning;
"chat/withoutAgent": typeof chat_withoutAgent;
crons: typeof crons;
Expand Down
58 changes: 58 additions & 0 deletions example/convex/agents/streamingDemo.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// Agent for the streaming demo. Reuses the same tools as the approval demo
// so the UI can show streaming patterns alongside human-in-the-loop approval.
import { Agent, createTool, stepCountIs } from "@convex-dev/agent";
import { components } from "../_generated/api";
import { defaultConfig } from "./config";
import { z } from "zod/v4";

const deleteFileTool = createTool({
description: "Delete a file from the system",
inputSchema: z.object({
filename: z.string().describe("The name of the file to delete"),
}),
needsApproval: () => true,
execute: async (_ctx, input) => {
return `Successfully deleted file: ${input.filename}`;
},
});

const transferMoneyTool = createTool({
description: "Transfer money to an account",
inputSchema: z.object({
amount: z.number().describe("The amount to transfer"),
toAccount: z.string().describe("The destination account"),
}),
needsApproval: async (_ctx, input) => {
return input.amount > 100;
},
execute: async (_ctx, input) => {
return `Transferred $${input.amount} to account ${input.toAccount}`;
},
});

const checkBalanceTool = createTool({
description: "Check the account balance",
inputSchema: z.object({
accountId: z.string().describe("The account to check"),
}),
execute: async (_ctx, _input) => {
return `Balance: $1,234.56`;
},
});

export const streamingDemoAgent = new Agent(components.agent, {
name: "Streaming Demo Agent",
instructions:
"You are a concise assistant who responds with emojis " +
"and abbreviations like lmao, lol, iirc, afaik, etc. where appropriate. " +
"You can delete files, transfer money, and check account balances. " +
"Always confirm what action you took after it completes.",
tools: {
deleteFile: deleteFileTool,
transferMoney: transferMoneyTool,
checkBalance: checkBalanceTool,
},
stopWhen: stepCountIs(5),
...defaultConfig,
callSettings: { ...defaultConfig.callSettings, temperature: 0 },
});
238 changes: 238 additions & 0 deletions example/convex/chat/streamingDemo.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,238 @@
/**
* Full Streaming Demo
*
* Demonstrates ALL streaming patterns in one place:
* 1. Async delta streaming (recommended) - mutation saves prompt, action streams
* 2. HTTP streaming - direct text stream over HTTP response
* 3. One-shot streaming - single action call with delta persistence
* 4. Stream lifecycle management - abort, status transitions, cleanup
* 5. Tool approval - pauses generation, resumes after approve/deny
*/
import { paginationOptsValidator } from "convex/server";
import {
listUIMessages,
syncStreams,
abortStream,
listStreams,
vStreamArgs,
} from "@convex-dev/agent";
import { components, internal } from "../_generated/api";
import {
action,
httpAction,
internalAction,
mutation,
query,
} from "../_generated/server";
import { v } from "convex/values";
import { authorizeThreadAccess } from "../threads";
import { streamingDemoAgent } from "../agents/streamingDemo";

// ============================================================================
// Pattern 1: Async Delta Streaming (RECOMMENDED)
//
// Two-phase approach:
// Phase 1 (mutation): Save the user message and schedule the action.
// Phase 2 (action): Stream the AI response, saving deltas to the DB.
//
// Clients subscribe via `useUIMessages` with `stream: true` and see real-time
// delta updates through Convex's reactive query system.
// ============================================================================

export const sendMessage = mutation({
args: { prompt: v.string(), threadId: v.string() },
handler: async (ctx, { prompt, threadId }) => {
await authorizeThreadAccess(ctx, threadId);
const { messageId } = await streamingDemoAgent.saveMessage(ctx, {
threadId,
prompt,
skipEmbeddings: true,
});
await ctx.scheduler.runAfter(
0,
internal.chat.streamingDemo.streamResponse,
{ threadId, promptMessageId: messageId },
);
},
});

export const streamResponse = internalAction({
args: { promptMessageId: v.string(), threadId: v.string() },
handler: async (ctx, { promptMessageId, threadId }) => {
const result = await streamingDemoAgent.streamText(
ctx,
{ threadId },
{ promptMessageId },
{ saveStreamDeltas: { chunking: "word", throttleMs: 100 } },
);
await result.consumeStream();
},
});

// ============================================================================
// Pattern 2: HTTP Streaming
//
// Streams text directly over an HTTP response using `agent.asHttpAction()`.
// The handler parses the JSON body, streams the response, and sets
// X-Message-Id / X-Stream-Id headers.
//
// `saveStreamDeltas: { returnImmediately: true }` saves deltas in the
// background so `useUIMessages` can dedupe by `streamId` AND the response
// body starts streaming immediately. Plain `true` would buffer the full
// generation before opening the body.
Comment on lines +81 to +82
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This last sentence could get re-worded - I didn't understand at first

//
// IMPORTANT: `body.threadId` is only honored if `authorize` validates
// ownership and returns it. Without authorize (or without returning
// `threadId`), the helper creates a new thread instead — preventing a
// caller from appending to or reading from arbitrary threads by guessing
// IDs. This demo authorizes against the (mocked) authorizeThreadAccess
// helper used by the rest of the example app.
// ============================================================================

export const streamOverHttp = httpAction(
streamingDemoAgent.asHttpAction({
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how big of a win do you think .asHttpAction is over a regular http action where they return the standard toTextStreamResponse / etc. ? It's slicker, but I wonder if getting agents to learn how to do authorize is just as hard as doing their own authorization paired with the well-documented AI SDK calls

saveStreamDeltas: { returnImmediately: true },
authorize: async (ctx, _request, body) => {
if (body.threadId) {
await authorizeThreadAccess(ctx, body.threadId);
return { threadId: body.threadId };
}
return {};
},
}),
);

// ============================================================================
// Pattern 3: One-Shot Streaming
//
// Single action call that both streams and persists deltas. Simpler than
// the two-phase approach but does not support optimistic client updates.
// ============================================================================

export const streamOneShot = action({
args: { prompt: v.string(), threadId: v.string() },
handler: async (ctx, { prompt, threadId }) => {
await authorizeThreadAccess(ctx, threadId);
await streamingDemoAgent.streamText(
ctx,
{ threadId },
{ prompt },
{ saveStreamDeltas: true },
);
},
});

// ============================================================================
// Tool Approval
//
// When the model calls a tool with `needsApproval`, generation pauses.
// The client shows Approve/Deny buttons; once resolved, the client triggers
// continuation via delta streaming.
// ============================================================================

export const submitApproval = mutation({
args: {
threadId: v.string(),
approvalId: v.string(),
approved: v.boolean(),
reason: v.optional(v.string()),
},
returns: v.object({ messageId: v.string() }),
handler: async (ctx, { threadId, approvalId, approved, reason }) => {
await authorizeThreadAccess(ctx, threadId);
const { messageId } = approved
? await streamingDemoAgent.approveToolCall(ctx, {
threadId,
approvalId,
reason,
})
: await streamingDemoAgent.denyToolCall(ctx, {
threadId,
approvalId,
reason,
});
return { messageId };
},
});

export const triggerContinuation = mutation({
args: { threadId: v.string(), lastApprovalMessageId: v.string() },
handler: async (ctx, { threadId, lastApprovalMessageId }) => {
await authorizeThreadAccess(ctx, threadId);
await ctx.scheduler.runAfter(
0,
internal.chat.streamingDemo.continueAfterApprovals,
{ threadId, lastApprovalMessageId },
);
},
});

export const continueAfterApprovals = internalAction({
args: { threadId: v.string(), lastApprovalMessageId: v.string() },
handler: async (ctx, { threadId, lastApprovalMessageId }) => {
const result = await streamingDemoAgent.streamText(
ctx,
{ threadId },
{ promptMessageId: lastApprovalMessageId },
{ saveStreamDeltas: { chunking: "word", throttleMs: 100 } },
);
await result.consumeStream();
},
});

// ============================================================================
// Queries: Messages + Stream Sync
// ============================================================================

export const listThreadMessages = query({
args: {
threadId: v.string(),
paginationOpts: paginationOptsValidator,
streamArgs: vStreamArgs,
},
handler: async (ctx, args) => {
const { threadId, streamArgs } = args;
await authorizeThreadAccess(ctx, threadId);
const streams = await syncStreams(ctx, components.agent, {
threadId,
streamArgs,
});
const paginated = await listUIMessages(ctx, components.agent, args);
return { ...paginated, streams };
},
});

export const listActiveStreams = query({
args: { threadId: v.string() },
handler: async (ctx, { threadId }) => {
await authorizeThreadAccess(ctx, threadId);
return listStreams(ctx, components.agent, { threadId });
},
});
Comment on lines +205 to +211
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what's this? I believe listThreadMessages if passed streamArgs : { kind: "list" } will do this


// ============================================================================
// Stream Lifecycle Management
// ============================================================================

export const abortStreamByOrder = mutation({
args: { threadId: v.string(), order: v.number() },
handler: async (ctx, { threadId, order }) => {
await authorizeThreadAccess(ctx, threadId);
return abortStream(ctx, components.agent, {
threadId,
order,
reason: "User requested abort",
});
},
});

export const listAllStreams = query({
args: { threadId: v.string() },
handler: async (ctx, { threadId }) => {
await authorizeThreadAccess(ctx, threadId);
return listStreams(ctx, components.agent, {
threadId,
includeStatuses: ["streaming", "finished", "aborted"],
});
},
});
14 changes: 13 additions & 1 deletion example/convex/http.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,19 @@
import { httpRouter } from "convex/server";
import { streamOverHttp } from "./chat/streaming";
import { streamOverHttp as streamOverHttpDemo } from "./chat/streamingDemo";
import { corsRouter } from "convex-helpers/server/cors";

const http = httpRouter();

const cors = corsRouter(http, {
allowCredentials: true,
allowedHeaders: ["Authorization", "Content-Type"],
exposedHeaders: ["Content-Type", "Content-Length", "X-Message-Id"],
exposedHeaders: [
"Content-Type",
"Content-Length",
"X-Message-Id",
"X-Stream-Id",
],
});

cors.route({
Expand All @@ -16,5 +22,11 @@ cors.route({
handler: streamOverHttp,
});

cors.route({
path: "/streamTextDemo",
method: "POST",
handler: streamOverHttpDemo,
});

// Convex expects the router to be the default export of `convex/http.js`.
export default http;
14 changes: 11 additions & 3 deletions example/convex/setup.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
/// <reference types="vite/client" />
import { test } from "vitest";
import { convexTest } from "convex-test";
import { convexTest, type TestConvex } from "convex-test";
import type { GenericSchema, SchemaDefinition } from "convex/server";
import schema from "./schema.js";
import agent from "@convex-dev/agent/test";
import workflow from "@convex-dev/workflow/test";
Expand All @@ -10,8 +11,15 @@ export const modules = import.meta.glob("./**/*.*s");
export function initConvexTest() {
const t = convexTest(schema, modules);
agent.register(t);
workflow.register(t);
rateLimiter.register(t);
// workflow and rate-limiter still type their `register` against the
// pre-generic SchemaDefinition<GenericSchema, boolean>. Cast through
// the broader type so this file typechecks regardless of which version
// of those packages is resolved.
const generic = t as unknown as TestConvex<
SchemaDefinition<GenericSchema, boolean>
Comment on lines +14 to +19
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this should be fixed in the latest version of convex-test

>;
workflow.register(generic);
rateLimiter.register(generic);
return t;
}

Expand Down
Loading
Loading