diff --git a/package-lock.json b/package-lock.json index f4274d20..50e83819 100644 --- a/package-lock.json +++ b/package-lock.json @@ -75,6 +75,7 @@ }, "peerDependencies": { "@ai-sdk/provider-utils": "^4.0.6", + "@convex-dev/workflow": "^0.4.3", "ai": "^6.0.35", "convex": "^1.24.8", "convex-helpers": "^0.1.103", diff --git a/package.json b/package.json index b273991d..9148fd87 100644 --- a/package.json +++ b/package.json @@ -70,6 +70,7 @@ }, "peerDependencies": { "@ai-sdk/provider-utils": "^4.0.6", + "@convex-dev/workflow": "^0.4.3", "ai": "^6.0.35", "convex": "^1.24.8", "convex-helpers": "^0.1.103", diff --git a/src/client/index.ts b/src/client/index.ts index 3dfd9531..69c8d793 100644 --- a/src/client/index.ts +++ b/src/client/index.ts @@ -100,6 +100,7 @@ import type { } from "./types.js"; import { streamText } from "./streamText.js"; import { errorToString, hasSuccessfulToolCall, willContinue } from "./utils.js"; +import { runMutation } from "./run.js"; export { stepCountIs } from "ai"; export { hasSuccessfulToolCall }; @@ -877,7 +878,10 @@ export class Agent< ...args, contextOptions, getEmbedding: async (text) => { - assert("runAction" in ctx); + assert( + "runAction" in ctx && "storage" in ctx, + "ActionCtx is required for vector search", + ); const embeddingModel = this.getEmbeddingModel(); assert( embeddingModel, @@ -928,7 +932,8 @@ export class Agent< >; }, ): Promise { - const thread = await ctx.runMutation( + const thread = await runMutation( + ctx, this.component.threads.updateThread, args, ); @@ -1302,7 +1307,7 @@ export class Agent< result: { status: "failed"; error: string } | { status: "success" }; }, ): Promise { - await ctx.runMutation(this.component.messages.finalizeMessage, { + await runMutation(ctx, this.component.messages.finalizeMessage, { messageId: args.messageId, result: args.result, }); @@ -1344,7 +1349,7 @@ export class Agent< this.component, args.patch.message, ); - await ctx.runMutation(this.component.messages.updateMessage, { + await runMutation(ctx, this.component.messages.updateMessage, { messageId: args.messageId, patch: { message, @@ -1480,6 +1485,7 @@ export class Agent< */ /** + * @deprecated use {@link createThread} from within a Workflow directly * Create a mutation that creates a thread so you can call it from a Workflow. * e.g. * ```ts @@ -1490,7 +1496,7 @@ export class Agent< * export const myWorkflow = workflow.define({ * args: {}, * handler: async (step) => { - * const { threadId } = await step.runMutation(internal.foo.createThread); + * const { threadId } = await createThread(step, components.agent, { ... }); * // use the threadId to generate text, object, etc. * }, * }); diff --git a/src/client/messages.ts b/src/client/messages.ts index a6bfa7c5..3d11d144 100644 --- a/src/client/messages.ts +++ b/src/client/messages.ts @@ -19,6 +19,7 @@ import type { ActionCtx, } from "./types.js"; import { parse } from "convex-helpers/validators"; +import { runMutation, runQuery } from "./run.js"; /** * List messages from a thread. @@ -53,7 +54,7 @@ export async function listMessages( continueCursor: paginationOpts.cursor ?? "", }; } - return ctx.runQuery(component.messages.listMessagesByThreadId, { + return runQuery(ctx, component.messages.listMessagesByThreadId, { order: "desc", threadId, paginationOpts, @@ -131,7 +132,7 @@ export async function saveMessages( }; } } - const result = await ctx.runMutation(component.messages.addMessages, { + const result = await runMutation(ctx, component.messages.addMessages, { threadId: args.threadId, userId: args.userId ?? undefined, agentName: args.agentName, diff --git a/src/client/run.ts b/src/client/run.ts new file mode 100644 index 00000000..72e7df4e --- /dev/null +++ b/src/client/run.ts @@ -0,0 +1,32 @@ +import type { + FunctionArgs, + FunctionReference, + FunctionVisibility, +} from "convex/server"; +import type { ActionCtx, MutationCtx, QueryCtx } from "./types.js"; + +/** + * Like ctx.runMutation, but supports inline mutations in workflows. + */ +export function runMutation< + Fn extends FunctionReference<"mutation", FunctionVisibility>, +>(ctx: MutationCtx | ActionCtx, fn: Fn, args: FunctionArgs) { + if ("workflowId" in ctx) { + return ctx.runMutation(fn, args, { inline: true }); + } else { + return ctx.runMutation(fn, args); + } +} + +/** + * Like ctx.runQuery, but supports inline queries in workflows. + */ +export function runQuery< + Fn extends FunctionReference<"query", FunctionVisibility>, +>(ctx: QueryCtx | MutationCtx | ActionCtx, fn: Fn, args: FunctionArgs) { + if ("workflowId" in ctx) { + return ctx.runQuery(fn, args, { inline: true }); + } else { + return ctx.runQuery(fn, args); + } +} diff --git a/src/client/saveInputMessages.ts b/src/client/saveInputMessages.ts index 8c659516..16fb1e62 100644 --- a/src/client/saveInputMessages.ts +++ b/src/client/saveInputMessages.ts @@ -31,7 +31,10 @@ export async function saveInputMessages( storageOptions?: { saveMessages?: "all" | "promptAndOutput"; }; - } & Pick, + } & Pick< + Config, + "usageHandler" | "textEmbeddingModel" | "embeddingModel" | "callSettings" + >, ): Promise<{ promptMessageId: string | undefined; pendingMessage: MessageDoc; @@ -66,7 +69,7 @@ export async function saveInputMessages( | undefined; if ((args.embeddingModel ?? args.textEmbeddingModel) && toSave.length) { assert( - "runAction" in ctx, + "runAction" in ctx && "storage" in ctx, "You must be in an action context to generate embeddings", ); embeddings = await embedMessages( diff --git a/src/client/threads.ts b/src/client/threads.ts index 62249c8a..8bc29551 100644 --- a/src/client/threads.ts +++ b/src/client/threads.ts @@ -6,6 +6,7 @@ import type { MutationCtx, QueryCtx, } from "./types.js"; +import { runMutation, runQuery } from "./run.js"; /** * Create a thread to store messages with an Agent. @@ -19,7 +20,8 @@ export async function createThread( component: AgentComponent, args?: { userId?: string | null; title?: string; summary?: string }, ) { - const { _id: threadId } = await ctx.runMutation( + const { _id: threadId } = await runMutation( + ctx, component.threads.createThread, { userId: args?.userId ?? undefined, @@ -41,7 +43,7 @@ export async function getThreadMetadata( component: AgentComponent, args: { threadId: string }, ): Promise { - const thread = await ctx.runQuery(component.threads.getThread, { + const thread = await runQuery(ctx, component.threads.getThread, { threadId: args.threadId, }); if (!thread) { @@ -55,7 +57,7 @@ export async function updateThreadMetadata( component: AgentComponent, args: { threadId: string; patch: Partial> }, ) { - return ctx.runMutation(component.threads.updateThread, { + return runMutation(ctx, component.threads.updateThread, { threadId: args.threadId, patch: args.patch, }); diff --git a/src/client/types.ts b/src/client/types.ts index 24db4583..0ca874d3 100644 --- a/src/client/types.ts +++ b/src/client/types.ts @@ -32,6 +32,8 @@ export interface Output<_T = any, _P = any, _E = any> { import type { GenericActionCtx, GenericDataModel, + GenericMutationCtx, + GenericQueryCtx, WithoutSystemFields, } from "convex/server"; import type { @@ -43,6 +45,7 @@ import type { } from "../validators.js"; import type { StreamingOptions } from "./streaming.js"; import type { ComponentApi } from "../component/_generated/component.js"; +import type { WorkflowCtx } from "@convex-dev/workflow"; /** * Type-level check that ensures models are from AI SDK v6. @@ -354,10 +357,7 @@ export type TextArgs< OUTPUT extends Output = never, > = Omit< Parameters< - typeof generateText< - TOOLS extends undefined ? AgentTools : TOOLS, - OUTPUT - > + typeof generateText >[0], "model" | "prompt" | "messages" > & { @@ -374,10 +374,7 @@ export type StreamingTextArgs< OUTPUT extends Output = never, > = Omit< Parameters< - typeof streamText< - TOOLS extends undefined ? AgentTools : TOOLS, - OUTPUT - > + typeof streamText >[0], "model" | "prompt" | "messages" > & { @@ -494,11 +491,7 @@ export interface Thread { OUTPUT extends Output = never, >( generateTextArgs: AgentPrompt & - TextArgs< - TOOLS extends undefined ? DefaultTools : TOOLS, - TOOLS, - OUTPUT - >, + TextArgs, options?: Options, ): Promise< GenerateTextResult & @@ -539,10 +532,7 @@ export interface Thread { saveStreamDeltas?: boolean | StreamingOptions; }, ): Promise< - StreamTextResult< - TOOLS extends undefined ? DefaultTools : TOOLS, - OUTPUT - > & + StreamTextResult & ThreadOutputMetadata >; /** @@ -627,11 +617,10 @@ export type SyncStreamsReturnValue = | undefined; /* Type utils follow */ -export type QueryCtx = Pick, "runQuery">; -export type MutationCtx = Pick< - GenericActionCtx, - "runQuery" | "runMutation" ->; +export type QueryCtx = Pick, "runQuery">; +export type MutationCtx = + | Pick, "runQuery" | "runMutation"> + | WorkflowCtx; export type ActionCtx = Pick< GenericActionCtx, "runQuery" | "runMutation" | "runAction" | "storage" | "auth"