diff --git a/packages/xl-ai/src/streamTool/filterValidOperations.ts b/packages/xl-ai/src/streamTool/filterValidOperations.ts index 91b513a4f4..c48dab8781 100644 --- a/packages/xl-ai/src/streamTool/filterValidOperations.ts +++ b/packages/xl-ai/src/streamTool/filterValidOperations.ts @@ -29,6 +29,7 @@ export async function* filterValidOperations( let forceNewOperation = false; for await (const chunk of operationsStream) { const operation = chunk.operation; + if (operation.ok) { yield { operation: operation.value, diff --git a/packages/xl-ai/src/streamTool/preprocess.test.ts b/packages/xl-ai/src/streamTool/preprocess.test.ts index 814c6455f5..c323aed6c1 100644 --- a/packages/xl-ai/src/streamTool/preprocess.test.ts +++ b/packages/xl-ai/src/streamTool/preprocess.test.ts @@ -1,10 +1,7 @@ import { BlockNoteEditor } from "@blocknote/core"; import { beforeEach, describe, expect, it } from "vitest"; import { tools } from "../api/formats/json/tools/index.js"; -import { - preprocessOperationsNonStreaming, - preprocessOperationsStreaming, -} from "./preprocess.js"; +import { preprocessOperationsStreaming } from "./preprocess.js"; import { StreamTool } from "./streamTool.js"; const addOperationValid = { @@ -148,68 +145,37 @@ describe("preprocess", () => { }); }); - describe("preprocessOperationsNonStreaming", () => { - it("should pass valid operations", async () => { - async function* mockStream() { - yield { - partialOperation: addOperationValid, - isUpdateToPreviousOperation: false, - isPossiblyPartial: false, - metadata: undefined, - }; - } - - const results = await collectStreamToArray( - preprocessOperationsNonStreaming(mockStream(), streamTools), - ); - - expect(results.length).toBe(1); - }); - - it("should throw an error on invalid operations (invalid id)", async () => { - async function* mockStream() { - yield { - partialOperation: addOperationInvalidId, - isUpdateToPreviousOperation: false, - isPossiblyPartial: false, - metadata: undefined, - }; - } - - await expect( - collectStreamToArray( - preprocessOperationsNonStreaming(mockStream(), streamTools), - ), - ).rejects.toThrow(); - }); - - it("should throw an error on invalid operations (invalid type)", async () => { - async function* mockStream() { - yield { - partialOperation: invalidOperationType, - isUpdateToPreviousOperation: false, - isPossiblyPartial: false, - metadata: undefined, - }; - } - - await expect( - collectStreamToArray( - preprocessOperationsNonStreaming(mockStream(), streamTools), - ), - ).rejects.toThrow(); - }); - - it("should handle empty operation streams", async () => { - async function* mockStream() { - // Empty stream - } - - const results = await collectStreamToArray( - preprocessOperationsNonStreaming(mockStream(), streamTools), - ); + it("should throw an error on invalid operations (invalid id)", async () => { + async function* mockStream() { + yield { + partialOperation: addOperationInvalidId, + isUpdateToPreviousOperation: false, + isPossiblyPartial: false, + metadata: undefined, + }; + } + + await expect( + collectStreamToArray( + preprocessOperationsStreaming(mockStream(), streamTools), + ), + ).rejects.toThrow(); + }); - expect(results).toHaveLength(0); - }); + it("should throw an error on invalid operations (invalid type)", async () => { + async function* mockStream() { + yield { + partialOperation: invalidOperationType, + isUpdateToPreviousOperation: false, + isPossiblyPartial: false, + metadata: undefined, + }; + } + + await expect( + collectStreamToArray( + preprocessOperationsStreaming(mockStream(), streamTools), + ), + ).rejects.toThrow(); }); }); diff --git a/packages/xl-ai/src/streamTool/preprocess.ts b/packages/xl-ai/src/streamTool/preprocess.ts index a395059c15..7af54daf11 100644 --- a/packages/xl-ai/src/streamTool/preprocess.ts +++ b/packages/xl-ai/src/streamTool/preprocess.ts @@ -1,3 +1,4 @@ +import { getErrorMessage } from "@ai-sdk/provider-utils"; import { ChunkExecutionError } from "./ChunkExecutionError.js"; import { filterValidOperations } from "./filterValidOperations.js"; import { StreamTool, StreamToolCall } from "./streamTool.js"; @@ -36,47 +37,16 @@ export async function* preprocessOperationsStreaming< (chunk) => { if (!chunk.isPossiblyPartial) { // only throw if the operation is not possibly partial - - throw new ChunkExecutionError("invalid operation: " + chunk.operation.error, chunk); + throw new ChunkExecutionError( + `Invalid operation. ${getErrorMessage(chunk.operation.error)}`, + chunk, + { + cause: chunk.operation.error, + }, + ); } }, ); yield* validOperationsStream; } - -/** - * Validates an stream of operations and throws an error if an invalid operation is found. - * - * TODO: remove - * - * @deprecated - */ -export async function* preprocessOperationsNonStreaming< - T extends StreamTool[], ->( - operationsStream: AsyncIterable<{ - partialOperation: any; - isUpdateToPreviousOperation: boolean; - isPossiblyPartial: boolean; - metadata: any; - }>, - streamTools: T, -): AsyncGenerator> { - // from partial operations to valid / invalid operations - const validatedOperationsStream = toValidatedOperations( - operationsStream, - streamTools, - ); - - // filter valid operations, invalid operations should throw an error - const validOperationsStream = filterValidOperations( - validatedOperationsStream, - (chunk) => { - throw new Error("invalid operation: " + chunk.operation.error); - }, - ); - - // yield results - yield* validOperationsStream; -} diff --git a/packages/xl-ai/src/streamTool/toValidatedOperations.ts b/packages/xl-ai/src/streamTool/toValidatedOperations.ts index f1dd5af047..97dc65fade 100644 --- a/packages/xl-ai/src/streamTool/toValidatedOperations.ts +++ b/packages/xl-ai/src/streamTool/toValidatedOperations.ts @@ -22,6 +22,18 @@ export async function* toValidatedOperations[]>( metadata: any; }> { for await (const chunk of partialObjectStream) { + if (!chunk.partialOperation.type) { + yield { + operation: { + ok: false, + error: "The `type` property of an operation is required.", + }, + isUpdateToPreviousOperation: chunk.isUpdateToPreviousOperation, + isPossiblyPartial: chunk.isPossiblyPartial, + metadata: chunk.metadata, + }; + continue; + } const func = streamTools.find( (f) => f.name === chunk.partialOperation.type, )!; diff --git a/packages/xl-ai/src/streamTool/vercelAiSdk/util/chatHandlers.ts b/packages/xl-ai/src/streamTool/vercelAiSdk/util/chatHandlers.ts index 5bab921d7f..2014b2d8a8 100644 --- a/packages/xl-ai/src/streamTool/vercelAiSdk/util/chatHandlers.ts +++ b/packages/xl-ai/src/streamTool/vercelAiSdk/util/chatHandlers.ts @@ -129,6 +129,8 @@ export async function setupToolCallStreaming( if (result.status === "rejected") { if (result.reason instanceof ChunkExecutionError) { + // all errors thrown in the pipeline should be ChunkExecutionErrors, + // so we can retrieve the chunk that caused the error error = result.reason; } else { if (!chat.error) {