Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
},
"peerDependencies": {
"@ai-sdk/provider-utils": "^4.0.6",
"@convex-dev/workflow": "^0.4.3",
"ai": "^6.0.35",
"convex": "^1.24.8",
"convex-helpers": "^0.1.103",
Expand Down
16 changes: 11 additions & 5 deletions src/client/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ import type {
} from "./types.js";
import { streamText } from "./streamText.js";
import { errorToString, hasSuccessfulToolCall, willContinue } from "./utils.js";
import { runMutation } from "./run.js";

export { stepCountIs } from "ai";
export { hasSuccessfulToolCall };
Expand Down Expand Up @@ -877,7 +878,10 @@ export class Agent<
...args,
contextOptions,
getEmbedding: async (text) => {
assert("runAction" in ctx);
assert(
"runAction" in ctx && "storage" in ctx,
"ActionCtx is required for vector search",
);
const embeddingModel = this.getEmbeddingModel();
assert(
embeddingModel,
Expand Down Expand Up @@ -928,7 +932,8 @@ export class Agent<
>;
},
): Promise<ThreadDoc> {
const thread = await ctx.runMutation(
const thread = await runMutation(
ctx,
this.component.threads.updateThread,
args,
);
Expand Down Expand Up @@ -1302,7 +1307,7 @@ export class Agent<
result: { status: "failed"; error: string } | { status: "success" };
},
): Promise<void> {
await ctx.runMutation(this.component.messages.finalizeMessage, {
await runMutation(ctx, this.component.messages.finalizeMessage, {
messageId: args.messageId,
result: args.result,
});
Expand Down Expand Up @@ -1344,7 +1349,7 @@ export class Agent<
this.component,
args.patch.message,
);
await ctx.runMutation(this.component.messages.updateMessage, {
await runMutation(ctx, this.component.messages.updateMessage, {
messageId: args.messageId,
patch: {
message,
Expand Down Expand Up @@ -1480,6 +1485,7 @@ export class Agent<
*/

/**
* @deprecated use {@link createThread} from within a Workflow directly
* Create a mutation that creates a thread so you can call it from a Workflow.
* e.g.
* ```ts
Expand All @@ -1490,7 +1496,7 @@ export class Agent<
* export const myWorkflow = workflow.define({
* args: {},
* handler: async (step) => {
* const { threadId } = await step.runMutation(internal.foo.createThread);
* const { threadId } = await createThread(step, components.agent, { ... });
* // use the threadId to generate text, object, etc.
* },
* });
Expand Down
5 changes: 3 additions & 2 deletions src/client/messages.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import type {
ActionCtx,
} from "./types.js";
import { parse } from "convex-helpers/validators";
import { runMutation, runQuery } from "./run.js";

/**
* List messages from a thread.
Expand Down Expand Up @@ -53,7 +54,7 @@ export async function listMessages(
continueCursor: paginationOpts.cursor ?? "",
};
}
return ctx.runQuery(component.messages.listMessagesByThreadId, {
return runQuery(ctx, component.messages.listMessagesByThreadId, {
order: "desc",
threadId,
paginationOpts,
Expand Down Expand Up @@ -131,7 +132,7 @@ export async function saveMessages(
};
}
}
const result = await ctx.runMutation(component.messages.addMessages, {
const result = await runMutation(ctx, component.messages.addMessages, {
threadId: args.threadId,
userId: args.userId ?? undefined,
agentName: args.agentName,
Expand Down
32 changes: 32 additions & 0 deletions src/client/run.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import type {
FunctionArgs,
FunctionReference,
FunctionVisibility,
} from "convex/server";
import type { ActionCtx, MutationCtx, QueryCtx } from "./types.js";

/**
* Like ctx.runMutation, but supports inline mutations in workflows.
*/
export function runMutation<
Fn extends FunctionReference<"mutation", FunctionVisibility>,
>(ctx: MutationCtx | ActionCtx, fn: Fn, args: FunctionArgs<Fn>) {
if ("workflowId" in ctx) {
return ctx.runMutation(fn, args, { inline: true });
} else {
return ctx.runMutation(fn, args);
}
}

/**
* Like ctx.runQuery, but supports inline queries in workflows.
*/
export function runQuery<
Fn extends FunctionReference<"query", FunctionVisibility>,
>(ctx: QueryCtx | MutationCtx | ActionCtx, fn: Fn, args: FunctionArgs<Fn>) {
if ("workflowId" in ctx) {
return ctx.runQuery(fn, args, { inline: true });
} else {
return ctx.runQuery(fn, args);
}
}
7 changes: 5 additions & 2 deletions src/client/saveInputMessages.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,10 @@ export async function saveInputMessages(
storageOptions?: {
saveMessages?: "all" | "promptAndOutput";
};
} & Pick<Config, "usageHandler" | "textEmbeddingModel" | "embeddingModel" | "callSettings">,
} & Pick<
Config,
"usageHandler" | "textEmbeddingModel" | "embeddingModel" | "callSettings"
>,
): Promise<{
promptMessageId: string | undefined;
pendingMessage: MessageDoc;
Expand Down Expand Up @@ -66,7 +69,7 @@ export async function saveInputMessages(
| undefined;
if ((args.embeddingModel ?? args.textEmbeddingModel) && toSave.length) {
assert(
"runAction" in ctx,
"runAction" in ctx && "storage" in ctx,
"You must be in an action context to generate embeddings",
);
embeddings = await embedMessages(
Expand Down
8 changes: 5 additions & 3 deletions src/client/threads.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import type {
MutationCtx,
QueryCtx,
} from "./types.js";
import { runMutation, runQuery } from "./run.js";

/**
* Create a thread to store messages with an Agent.
Expand All @@ -19,7 +20,8 @@ export async function createThread(
component: AgentComponent,
args?: { userId?: string | null; title?: string; summary?: string },
) {
const { _id: threadId } = await ctx.runMutation(
const { _id: threadId } = await runMutation(
ctx,
component.threads.createThread,
{
userId: args?.userId ?? undefined,
Expand All @@ -41,7 +43,7 @@ export async function getThreadMetadata(
component: AgentComponent,
args: { threadId: string },
): Promise<ThreadDoc> {
const thread = await ctx.runQuery(component.threads.getThread, {
const thread = await runQuery(ctx, component.threads.getThread, {
threadId: args.threadId,
});
if (!thread) {
Expand All @@ -55,7 +57,7 @@ export async function updateThreadMetadata(
component: AgentComponent,
args: { threadId: string; patch: Partial<WithoutSystemFields<ThreadDoc>> },
) {
return ctx.runMutation(component.threads.updateThread, {
return runMutation(ctx, component.threads.updateThread, {
threadId: args.threadId,
patch: args.patch,
});
Expand Down
33 changes: 11 additions & 22 deletions src/client/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ export interface Output<_T = any, _P = any, _E = any> {
import type {
GenericActionCtx,
GenericDataModel,
GenericMutationCtx,
GenericQueryCtx,
WithoutSystemFields,
} from "convex/server";
import type {
Expand All @@ -43,6 +45,7 @@ import type {
} from "../validators.js";
import type { StreamingOptions } from "./streaming.js";
import type { ComponentApi } from "../component/_generated/component.js";
import type { WorkflowCtx } from "@convex-dev/workflow";

/**
* Type-level check that ensures models are from AI SDK v6.
Expand Down Expand Up @@ -354,10 +357,7 @@ export type TextArgs<
OUTPUT extends Output<any, any, any> = never,
> = Omit<
Parameters<
typeof generateText<
TOOLS extends undefined ? AgentTools : TOOLS,
OUTPUT
>
typeof generateText<TOOLS extends undefined ? AgentTools : TOOLS, OUTPUT>
>[0],
"model" | "prompt" | "messages"
> & {
Expand All @@ -374,10 +374,7 @@ export type StreamingTextArgs<
OUTPUT extends Output<any, any, any> = never,
> = Omit<
Parameters<
typeof streamText<
TOOLS extends undefined ? AgentTools : TOOLS,
OUTPUT
>
typeof streamText<TOOLS extends undefined ? AgentTools : TOOLS, OUTPUT>
>[0],
"model" | "prompt" | "messages"
> & {
Expand Down Expand Up @@ -494,11 +491,7 @@ export interface Thread<DefaultTools extends ToolSet> {
OUTPUT extends Output<any, any, any> = never,
>(
generateTextArgs: AgentPrompt &
TextArgs<
TOOLS extends undefined ? DefaultTools : TOOLS,
TOOLS,
OUTPUT
>,
TextArgs<TOOLS extends undefined ? DefaultTools : TOOLS, TOOLS, OUTPUT>,
options?: Options,
): Promise<
GenerateTextResult<TOOLS extends undefined ? DefaultTools : TOOLS, OUTPUT> &
Expand Down Expand Up @@ -539,10 +532,7 @@ export interface Thread<DefaultTools extends ToolSet> {
saveStreamDeltas?: boolean | StreamingOptions;
},
): Promise<
StreamTextResult<
TOOLS extends undefined ? DefaultTools : TOOLS,
OUTPUT
> &
StreamTextResult<TOOLS extends undefined ? DefaultTools : TOOLS, OUTPUT> &
ThreadOutputMetadata
>;
/**
Expand Down Expand Up @@ -627,11 +617,10 @@ export type SyncStreamsReturnValue =
| undefined;

/* Type utils follow */
export type QueryCtx = Pick<GenericActionCtx<GenericDataModel>, "runQuery">;
export type MutationCtx = Pick<
GenericActionCtx<GenericDataModel>,
"runQuery" | "runMutation"
>;
export type QueryCtx = Pick<GenericQueryCtx<GenericDataModel>, "runQuery">;
export type MutationCtx =
| Pick<GenericMutationCtx<GenericDataModel>, "runQuery" | "runMutation">
| WorkflowCtx;
export type ActionCtx = Pick<
GenericActionCtx<GenericDataModel>,
"runQuery" | "runMutation" | "runAction" | "storage" | "auth"
Expand Down
Loading