-
Notifications
You must be signed in to change notification settings - Fork 86
Add httpStreamText and useHttpStream #259
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
f5b906c
363563f
0317e71
161cf67
028e62a
48a7752
08f45f9
4fb89cb
6568c5e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,58 @@ | ||
| // Agent for the streaming demo. Reuses the same tools as the approval demo | ||
| // so the UI can show streaming patterns alongside human-in-the-loop approval. | ||
| import { Agent, createTool, stepCountIs } from "@convex-dev/agent"; | ||
| import { components } from "../_generated/api"; | ||
| import { defaultConfig } from "./config"; | ||
| import { z } from "zod/v4"; | ||
|
|
||
| const deleteFileTool = createTool({ | ||
| description: "Delete a file from the system", | ||
| inputSchema: z.object({ | ||
| filename: z.string().describe("The name of the file to delete"), | ||
| }), | ||
| needsApproval: () => true, | ||
| execute: async (_ctx, input) => { | ||
| return `Successfully deleted file: ${input.filename}`; | ||
| }, | ||
| }); | ||
|
|
||
| const transferMoneyTool = createTool({ | ||
| description: "Transfer money to an account", | ||
| inputSchema: z.object({ | ||
| amount: z.number().describe("The amount to transfer"), | ||
| toAccount: z.string().describe("The destination account"), | ||
| }), | ||
| needsApproval: async (_ctx, input) => { | ||
| return input.amount > 100; | ||
| }, | ||
| execute: async (_ctx, input) => { | ||
| return `Transferred $${input.amount} to account ${input.toAccount}`; | ||
| }, | ||
| }); | ||
|
|
||
| const checkBalanceTool = createTool({ | ||
| description: "Check the account balance", | ||
| inputSchema: z.object({ | ||
| accountId: z.string().describe("The account to check"), | ||
| }), | ||
| execute: async (_ctx, _input) => { | ||
| return `Balance: $1,234.56`; | ||
| }, | ||
| }); | ||
|
|
||
| export const streamingDemoAgent = new Agent(components.agent, { | ||
| name: "Streaming Demo Agent", | ||
| instructions: | ||
| "You are a concise assistant who responds with emojis " + | ||
| "and abbreviations like lmao, lol, iirc, afaik, etc. where appropriate. " + | ||
| "You can delete files, transfer money, and check account balances. " + | ||
| "Always confirm what action you took after it completes.", | ||
| tools: { | ||
| deleteFile: deleteFileTool, | ||
| transferMoney: transferMoneyTool, | ||
| checkBalance: checkBalanceTool, | ||
| }, | ||
| stopWhen: stepCountIs(5), | ||
| ...defaultConfig, | ||
| callSettings: { ...defaultConfig.callSettings, temperature: 0 }, | ||
| }); |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,238 @@ | ||
| /** | ||
| * Full Streaming Demo | ||
| * | ||
| * Demonstrates ALL streaming patterns in one place: | ||
| * 1. Async delta streaming (recommended) - mutation saves prompt, action streams | ||
| * 2. HTTP streaming - direct text stream over HTTP response | ||
| * 3. One-shot streaming - single action call with delta persistence | ||
| * 4. Stream lifecycle management - abort, status transitions, cleanup | ||
| * 5. Tool approval - pauses generation, resumes after approve/deny | ||
| */ | ||
| import { paginationOptsValidator } from "convex/server"; | ||
| import { | ||
| listUIMessages, | ||
| syncStreams, | ||
| abortStream, | ||
| listStreams, | ||
| vStreamArgs, | ||
| } from "@convex-dev/agent"; | ||
| import { components, internal } from "../_generated/api"; | ||
| import { | ||
| action, | ||
| httpAction, | ||
| internalAction, | ||
| mutation, | ||
| query, | ||
| } from "../_generated/server"; | ||
| import { v } from "convex/values"; | ||
| import { authorizeThreadAccess } from "../threads"; | ||
| import { streamingDemoAgent } from "../agents/streamingDemo"; | ||
|
|
||
| // ============================================================================ | ||
| // Pattern 1: Async Delta Streaming (RECOMMENDED) | ||
| // | ||
| // Two-phase approach: | ||
| // Phase 1 (mutation): Save the user message and schedule the action. | ||
| // Phase 2 (action): Stream the AI response, saving deltas to the DB. | ||
| // | ||
| // Clients subscribe via `useUIMessages` with `stream: true` and see real-time | ||
| // delta updates through Convex's reactive query system. | ||
| // ============================================================================ | ||
|
|
||
| export const sendMessage = mutation({ | ||
| args: { prompt: v.string(), threadId: v.string() }, | ||
| handler: async (ctx, { prompt, threadId }) => { | ||
| await authorizeThreadAccess(ctx, threadId); | ||
| const { messageId } = await streamingDemoAgent.saveMessage(ctx, { | ||
| threadId, | ||
| prompt, | ||
| skipEmbeddings: true, | ||
| }); | ||
| await ctx.scheduler.runAfter( | ||
| 0, | ||
| internal.chat.streamingDemo.streamResponse, | ||
| { threadId, promptMessageId: messageId }, | ||
| ); | ||
| }, | ||
| }); | ||
|
|
||
| export const streamResponse = internalAction({ | ||
| args: { promptMessageId: v.string(), threadId: v.string() }, | ||
| handler: async (ctx, { promptMessageId, threadId }) => { | ||
| const result = await streamingDemoAgent.streamText( | ||
| ctx, | ||
| { threadId }, | ||
| { promptMessageId }, | ||
| { saveStreamDeltas: { chunking: "word", throttleMs: 100 } }, | ||
| ); | ||
| await result.consumeStream(); | ||
| }, | ||
| }); | ||
|
|
||
| // ============================================================================ | ||
| // Pattern 2: HTTP Streaming | ||
| // | ||
| // Streams text directly over an HTTP response using `agent.asHttpAction()`. | ||
| // The handler parses the JSON body, streams the response, and sets | ||
| // X-Message-Id / X-Stream-Id headers. | ||
| // | ||
| // `saveStreamDeltas: { returnImmediately: true }` saves deltas in the | ||
| // background so `useUIMessages` can dedupe by `streamId` AND the response | ||
| // body starts streaming immediately. Plain `true` would buffer the full | ||
| // generation before opening the body. | ||
| // | ||
| // IMPORTANT: `body.threadId` is only honored if `authorize` validates | ||
| // ownership and returns it. Without authorize (or without returning | ||
| // `threadId`), the helper creates a new thread instead — preventing a | ||
| // caller from appending to or reading from arbitrary threads by guessing | ||
| // IDs. This demo authorizes against the (mocked) authorizeThreadAccess | ||
| // helper used by the rest of the example app. | ||
| // ============================================================================ | ||
|
|
||
| export const streamOverHttp = httpAction( | ||
| streamingDemoAgent.asHttpAction({ | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. how big of a win do you think |
||
| saveStreamDeltas: { returnImmediately: true }, | ||
| authorize: async (ctx, _request, body) => { | ||
| if (body.threadId) { | ||
| await authorizeThreadAccess(ctx, body.threadId); | ||
| return { threadId: body.threadId }; | ||
| } | ||
| return {}; | ||
| }, | ||
| }), | ||
| ); | ||
|
|
||
| // ============================================================================ | ||
| // Pattern 3: One-Shot Streaming | ||
| // | ||
| // Single action call that both streams and persists deltas. Simpler than | ||
| // the two-phase approach but does not support optimistic client updates. | ||
| // ============================================================================ | ||
|
|
||
| export const streamOneShot = action({ | ||
| args: { prompt: v.string(), threadId: v.string() }, | ||
| handler: async (ctx, { prompt, threadId }) => { | ||
| await authorizeThreadAccess(ctx, threadId); | ||
| await streamingDemoAgent.streamText( | ||
| ctx, | ||
| { threadId }, | ||
| { prompt }, | ||
| { saveStreamDeltas: true }, | ||
| ); | ||
| }, | ||
| }); | ||
|
|
||
| // ============================================================================ | ||
| // Tool Approval | ||
| // | ||
| // When the model calls a tool with `needsApproval`, generation pauses. | ||
| // The client shows Approve/Deny buttons; once resolved, the client triggers | ||
| // continuation via delta streaming. | ||
| // ============================================================================ | ||
|
|
||
| export const submitApproval = mutation({ | ||
| args: { | ||
| threadId: v.string(), | ||
| approvalId: v.string(), | ||
| approved: v.boolean(), | ||
| reason: v.optional(v.string()), | ||
| }, | ||
| returns: v.object({ messageId: v.string() }), | ||
| handler: async (ctx, { threadId, approvalId, approved, reason }) => { | ||
| await authorizeThreadAccess(ctx, threadId); | ||
| const { messageId } = approved | ||
| ? await streamingDemoAgent.approveToolCall(ctx, { | ||
| threadId, | ||
| approvalId, | ||
| reason, | ||
| }) | ||
| : await streamingDemoAgent.denyToolCall(ctx, { | ||
| threadId, | ||
| approvalId, | ||
| reason, | ||
| }); | ||
| return { messageId }; | ||
| }, | ||
| }); | ||
|
|
||
| export const triggerContinuation = mutation({ | ||
| args: { threadId: v.string(), lastApprovalMessageId: v.string() }, | ||
| handler: async (ctx, { threadId, lastApprovalMessageId }) => { | ||
| await authorizeThreadAccess(ctx, threadId); | ||
| await ctx.scheduler.runAfter( | ||
| 0, | ||
| internal.chat.streamingDemo.continueAfterApprovals, | ||
| { threadId, lastApprovalMessageId }, | ||
| ); | ||
| }, | ||
| }); | ||
|
|
||
| export const continueAfterApprovals = internalAction({ | ||
| args: { threadId: v.string(), lastApprovalMessageId: v.string() }, | ||
| handler: async (ctx, { threadId, lastApprovalMessageId }) => { | ||
| const result = await streamingDemoAgent.streamText( | ||
| ctx, | ||
| { threadId }, | ||
| { promptMessageId: lastApprovalMessageId }, | ||
| { saveStreamDeltas: { chunking: "word", throttleMs: 100 } }, | ||
| ); | ||
| await result.consumeStream(); | ||
| }, | ||
| }); | ||
|
|
||
| // ============================================================================ | ||
| // Queries: Messages + Stream Sync | ||
| // ============================================================================ | ||
|
|
||
| export const listThreadMessages = query({ | ||
| args: { | ||
| threadId: v.string(), | ||
| paginationOpts: paginationOptsValidator, | ||
| streamArgs: vStreamArgs, | ||
| }, | ||
| handler: async (ctx, args) => { | ||
| const { threadId, streamArgs } = args; | ||
| await authorizeThreadAccess(ctx, threadId); | ||
| const streams = await syncStreams(ctx, components.agent, { | ||
| threadId, | ||
| streamArgs, | ||
| }); | ||
| const paginated = await listUIMessages(ctx, components.agent, args); | ||
| return { ...paginated, streams }; | ||
| }, | ||
| }); | ||
|
|
||
| export const listActiveStreams = query({ | ||
| args: { threadId: v.string() }, | ||
| handler: async (ctx, { threadId }) => { | ||
| await authorizeThreadAccess(ctx, threadId); | ||
| return listStreams(ctx, components.agent, { threadId }); | ||
| }, | ||
| }); | ||
|
Comment on lines
+205
to
+211
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. what's this? I believe |
||
|
|
||
| // ============================================================================ | ||
| // Stream Lifecycle Management | ||
| // ============================================================================ | ||
|
|
||
| export const abortStreamByOrder = mutation({ | ||
| args: { threadId: v.string(), order: v.number() }, | ||
| handler: async (ctx, { threadId, order }) => { | ||
| await authorizeThreadAccess(ctx, threadId); | ||
| return abortStream(ctx, components.agent, { | ||
| threadId, | ||
| order, | ||
| reason: "User requested abort", | ||
| }); | ||
| }, | ||
| }); | ||
|
|
||
| export const listAllStreams = query({ | ||
| args: { threadId: v.string() }, | ||
| handler: async (ctx, { threadId }) => { | ||
| await authorizeThreadAccess(ctx, threadId); | ||
| return listStreams(ctx, components.agent, { | ||
| threadId, | ||
| includeStatuses: ["streaming", "finished", "aborted"], | ||
| }); | ||
| }, | ||
| }); | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,6 +1,7 @@ | ||
| /// <reference types="vite/client" /> | ||
| import { test } from "vitest"; | ||
| import { convexTest } from "convex-test"; | ||
| import { convexTest, type TestConvex } from "convex-test"; | ||
| import type { GenericSchema, SchemaDefinition } from "convex/server"; | ||
| import schema from "./schema.js"; | ||
| import agent from "@convex-dev/agent/test"; | ||
| import workflow from "@convex-dev/workflow/test"; | ||
|
|
@@ -10,8 +11,15 @@ export const modules = import.meta.glob("./**/*.*s"); | |
| export function initConvexTest() { | ||
| const t = convexTest(schema, modules); | ||
| agent.register(t); | ||
| workflow.register(t); | ||
| rateLimiter.register(t); | ||
| // workflow and rate-limiter still type their `register` against the | ||
| // pre-generic SchemaDefinition<GenericSchema, boolean>. Cast through | ||
| // the broader type so this file typechecks regardless of which version | ||
| // of those packages is resolved. | ||
| const generic = t as unknown as TestConvex< | ||
| SchemaDefinition<GenericSchema, boolean> | ||
|
Comment on lines
+14
to
+19
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this should be fixed in the latest version of convex-test |
||
| >; | ||
| workflow.register(generic); | ||
| rateLimiter.register(generic); | ||
| return t; | ||
| } | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This last sentence could get re-worded - I didn't understand at first