From b4871aa99276973c2e7d7289fd34b0b1cdea8994 Mon Sep 17 00:00:00 2001 From: Aarne Laur Date: Sat, 7 Mar 2026 20:26:57 +0100 Subject: [PATCH 1/3] interpreted engine --- .changeset/native-batched-tool-authoring.md | 9 + packages/bridge-compiler/src/codegen.ts | 78 ++++++- .../bridge-compiler/src/execute-bridge.ts | 35 ++- packages/bridge-core/src/ExecutionTree.ts | 216 +++++++++++++++++- packages/bridge-core/src/index.ts | 4 + packages/bridge-core/src/tracing.ts | 9 + packages/bridge-core/src/types.ts | 4 + packages/bridge-types/src/index.ts | 68 +++++- packages/bridge/test/native-batching.test.ts | 137 +++++++++++ .../src/content/docs/advanced/custom-tools.md | 80 ++++++- 10 files changed, 623 insertions(+), 17 deletions(-) create mode 100644 .changeset/native-batched-tool-authoring.md create mode 100644 packages/bridge/test/native-batching.test.ts diff --git a/.changeset/native-batched-tool-authoring.md b/.changeset/native-batched-tool-authoring.md new file mode 100644 index 00000000..92c6c452 --- /dev/null +++ b/.changeset/native-batched-tool-authoring.md @@ -0,0 +1,9 @@ +--- +"@stackables/bridge": minor +"@stackables/bridge-core": minor +"@stackables/bridge-types": minor +--- + +Improve native batched tool authoring by documenting the feature, exporting dedicated batch tool types, and simplifying the batch contract to plain input arrays. + +Batch tools now receive `Input[]` and must return `Output[]` in matching order. Batched tool tracing and logging are also emitted once per flushed batch call instead of once per queued item. diff --git a/packages/bridge-compiler/src/codegen.ts b/packages/bridge-compiler/src/codegen.ts index d05c52f5..376aa125 100644 --- a/packages/bridge-compiler/src/codegen.ts +++ b/packages/bridge-compiler/src/codegen.ts @@ -703,6 +703,10 @@ class CodegenContext { lines.push( ` const __ctx = { logger: __opts?.logger ?? {}, signal: __signal };`, ); + lines.push( + ` const __queueMicrotask = globalThis.queueMicrotask ?? ((fn) => Promise.resolve().then(fn));`, + ); + lines.push(` const __batchQueues = new Map();`); lines.push(` const __trace = __opts?.__trace;`); lines.push(` function __rethrowBridgeError(err, loc) {`); lines.push( @@ -788,6 +792,74 @@ class CodegenContext { lines.push(` }`); lines.push(` return result;`); lines.push(` }`); + lines.push(` function __callBatch(fn, input, toolName) {`); + lines.push( + ` if (__signal?.aborted) return Promise.reject(new __BridgeAbortError());`, + ); + lines.push(` let queue = __batchQueues.get(fn);`); + lines.push(` if (!queue) {`); + lines.push( + ` queue = { items: [], scheduled: false, toolName, maxBatchSize: typeof fn.bridge?.batch === "object" && fn.bridge?.batch?.maxBatchSize > 0 ? Math.floor(fn.bridge.batch.maxBatchSize) : undefined };`, + ); + lines.push(` __batchQueues.set(fn, queue);`); + lines.push(` }`); + lines.push(` return new Promise((resolve, reject) => {`); + lines.push(` queue.items.push({ input, resolve, reject });`); + lines.push(` if (queue.scheduled) return;`); + lines.push(` queue.scheduled = true;`); + lines.push( + ` __queueMicrotask(() => { void __flushBatch(fn, queue); });`, + ); + lines.push(` });`); + lines.push(` }`); + lines.push(` async function __flushBatch(fn, queue) {`); + lines.push( + ` const pending = queue.items.splice(0, queue.items.length);`, + ); + lines.push(` queue.scheduled = false;`); + lines.push(` if (pending.length === 0) return;`); + lines.push(` if (__signal?.aborted) {`); + lines.push(` const err = new __BridgeAbortError();`); + lines.push(` for (const item of pending) item.reject(err);`); + lines.push(` return;`); + lines.push(` }`); + lines.push( + ` const chunkSize = queue.maxBatchSize && queue.maxBatchSize > 0 ? queue.maxBatchSize : pending.length;`, + ); + lines.push( + ` for (let start = 0; start < pending.length; start += chunkSize) {`, + ); + lines.push(` const chunk = pending.slice(start, start + chunkSize);`); + lines.push(` try {`); + lines.push(` const inputs = chunk.map((item) => item.input);`); + lines.push(` const batchPromise = fn(inputs, __ctx);`); + lines.push(` let result;`); + lines.push( + ` if (__timeoutMs > 0 && batchPromise && typeof batchPromise.then === "function") {`, + ); + lines.push( + ` let t; const timeout = new Promise((_, rej) => { t = setTimeout(() => rej(new __BridgeTimeoutError(queue.toolName, __timeoutMs)), __timeoutMs); });`, + ); + lines.push( + ` try { result = await Promise.race([batchPromise, timeout]); } finally { clearTimeout(t); }`, + ); + lines.push(` } else {`); + lines.push(` result = await batchPromise;`); + lines.push(` }`); + lines.push( + ` if (!Array.isArray(result)) throw new Error('Batch tool "' + queue.toolName + '" must return an array of results');`, + ); + lines.push( + ` if (result.length !== chunk.length) throw new Error('Batch tool "' + queue.toolName + '" returned ' + result.length + ' results for ' + chunk.length + ' queued calls');`, + ); + lines.push( + ` for (let i = 0; i < chunk.length; i++) chunk[i].resolve(result[i]);`, + ); + lines.push(` } catch (err) {`); + lines.push(` for (const item of chunk) item.reject(err);`); + lines.push(` }`); + lines.push(` }`); + lines.push(` }`); // Sync tool caller — no await, no timeout, enforces no-promise return. lines.push(` function __callSync(fn, input, toolName) {`); lines.push(` if (__signal?.aborted) throw new __BridgeAbortError();`); @@ -875,7 +947,7 @@ class CodegenContext { lines.push(` if (cached !== undefined) return cached;`); lines.push(` try {`); lines.push( - ` const result = fn.bridge?.sync ? __callSync(fn, input, toolName) : __call(fn, input, toolName);`, + ` const result = fn.bridge?.batch ? __callBatch(fn, input, toolName) : fn.bridge?.sync ? __callSync(fn, input, toolName) : __call(fn, input, toolName);`, ); lines.push(` if (result && typeof result.then === "function") {`); lines.push( @@ -1114,7 +1186,7 @@ class CodegenContext { if (memoizeTrunkKey && this.memoizedToolKeys.has(memoizeTrunkKey)) { return `await __callMemoized(${fn}, ${inputObj}, ${name}, ${JSON.stringify(memoizeTrunkKey)})`; } - return `(${fn}.bridge?.sync ? __callSync(${fn}, ${inputObj}, ${name}) : await __call(${fn}, ${inputObj}, ${name}))`; + return `(${fn}.bridge?.batch ? await __callBatch(${fn}, ${inputObj}, ${name}) : ${fn}.bridge?.sync ? __callSync(${fn}, ${inputObj}, ${name}) : await __call(${fn}, ${inputObj}, ${name}))`; } /** @@ -1131,7 +1203,7 @@ class CodegenContext { if (memoizeTrunkKey && this.memoizedToolKeys.has(memoizeTrunkKey)) { return `__callMemoized(${fn}, ${inputObj}, ${name}, ${JSON.stringify(memoizeTrunkKey)})`; } - return `(${fn}.bridge?.sync ? __callSync(${fn}, ${inputObj}, ${name}) : __call(${fn}, ${inputObj}, ${name}))`; + return `(${fn}.bridge?.batch ? __callBatch(${fn}, ${inputObj}, ${name}) : ${fn}.bridge?.sync ? __callSync(${fn}, ${inputObj}, ${name}) : __call(${fn}, ${inputObj}, ${name}))`; } /** diff --git a/packages/bridge-compiler/src/execute-bridge.ts b/packages/bridge-compiler/src/execute-bridge.ts index 04981eb5..56cc7ac8 100644 --- a/packages/bridge-compiler/src/execute-bridge.ts +++ b/packages/bridge-compiler/src/execute-bridge.ts @@ -221,6 +221,12 @@ function flattenTools( return flat; } +function hasBatchedTool(flatTools: Record): boolean { + return Object.values(flatTools).some( + (value) => typeof value === "function" && value.bridge?.batch, + ); +} + // ── Public API ────────────────────────────────────────────────────────────── /** @@ -259,6 +265,30 @@ export async function executeBridge( maxDepth, } = options; + // Prototype limitation: runtime batching exists, but compiled batching still + // needs dedicated scheduling codegen to coalesce loop-scoped awaits. Fall + // back to the interpreter when any tool opts into `.bridge.batch`. + const allTools: ToolMap = { std: bundledStd, ...userTools }; + const flatTools = flattenTools(allTools as Record); + if (hasBatchedTool(flatTools)) { + logger?.warn?.( + "Batched tools currently run through the interpreter. Falling back to core executeBridge.", + ); + return executeCoreBridge({ + document, + operation, + input, + tools: userTools, + context, + signal, + toolTimeoutMs, + logger, + trace: options.trace, + requestedFields: options.requestedFields, + ...(maxDepth !== undefined ? { maxDepth } : {}), + }); + } + let fn: BridgeFn; try { fn = getOrCompile(document, operation, options.requestedFields); @@ -282,11 +312,6 @@ export async function executeBridge( throw err; } - // Merge built-in std namespace with user-provided tools, then flatten - // so the generated code can access them via dotted keys like tools["std.str.toUpperCase"]. - const allTools: ToolMap = { std: bundledStd, ...userTools }; - const flatTools = flattenTools(allTools as Record); - // Set up tracing if requested const traceLevel = options.trace ?? "off"; let tracer: TraceCollector | undefined; diff --git a/packages/bridge-core/src/ExecutionTree.ts b/packages/bridge-core/src/ExecutionTree.ts index ce39025c..bca724cd 100644 --- a/packages/bridge-core/src/ExecutionTree.ts +++ b/packages/bridge-core/src/ExecutionTree.ts @@ -5,7 +5,7 @@ import { trunkDependsOnElement, } from "./scheduleTools.ts"; import { internal } from "./tools/index.ts"; -import type { ToolTrace } from "./tracing.ts"; +import type { EffectiveToolLog, ToolTrace } from "./tracing.ts"; import { isOtelActive, logToolError, @@ -89,6 +89,20 @@ function stableMemoizeKey(value: unknown): string { .join(",")}}`; } +type PendingBatchToolCall = { + input: Record; + resolve: (value: any) => void; + reject: (err: unknown) => void; +}; + +type BatchToolQueue = { + items: PendingBatchToolCall[]; + scheduled: boolean; + toolName: string; + fnName: string; + maxBatchSize?: number; +}; + export class ExecutionTree implements TreeContext { state: Record = {}; bridge: Bridge | undefined; @@ -123,6 +137,9 @@ export class ExecutionTree implements TreeContext { /** Per-tool memoization caches keyed by stable input fingerprints. */ private toolMemoCache: Map>> = new Map(); + /** Per-request batch queues for tools declared with `.bridge.batch`. */ + private toolBatchQueues: Map<(...args: any[]) => any, BatchToolQueue> = + new Map(); /** Promise that resolves when all critical `force` handles have settled. */ private forcedExecution?: Promise; /** Shared trace collector — present only when tracing is enabled. */ @@ -305,7 +322,21 @@ export class ExecutionTree implements TreeContext { }; const timeoutMs = this.toolTimeoutMs; - const { sync: isSyncTool, doTrace, log } = resolveToolMeta(fnImpl); + const { sync: isSyncTool, batch, doTrace, log } = resolveToolMeta(fnImpl); + + if (batch) { + return this.callBatchedTool( + toolName, + fnName, + fnImpl, + input, + timeoutMs, + toolContext, + doTrace, + log, + batch.maxBatchSize, + ); + } // ── Fast path: no instrumentation configured ────────────────── // When there is no internal tracer, no logger, and OpenTelemetry @@ -480,6 +511,186 @@ export class ExecutionTree implements TreeContext { ); } + private callBatchedTool( + toolName: string, + fnName: string, + fnImpl: (...args: any[]) => any, + input: Record, + timeoutMs: number, + toolContext: ToolContext, + doTrace: boolean, + log: EffectiveToolLog, + maxBatchSize?: number, + ): Promise { + let queue = this.toolBatchQueues.get(fnImpl); + if (!queue) { + queue = { + items: [], + scheduled: false, + toolName, + fnName, + maxBatchSize, + }; + this.toolBatchQueues.set(fnImpl, queue); + } + + if (maxBatchSize !== undefined) { + queue.maxBatchSize = maxBatchSize; + } + + return new Promise((resolve, reject) => { + queue!.items.push({ input, resolve, reject }); + if (queue!.scheduled) return; + queue!.scheduled = true; + queueMicrotask(() => { + void this.flushBatchedToolQueue( + fnImpl, + toolContext, + timeoutMs, + doTrace, + log, + ); + }); + }); + } + + private async flushBatchedToolQueue( + fnImpl: (...args: any[]) => any, + toolContext: ToolContext, + timeoutMs: number, + doTrace: boolean, + log: EffectiveToolLog, + ): Promise { + const queue = this.toolBatchQueues.get(fnImpl); + if (!queue) return; + + const pending = queue.items.splice(0, queue.items.length); + queue.scheduled = false; + if (pending.length === 0) return; + + if (this.signal?.aborted) { + const abortErr = new BridgeAbortError(); + for (const item of pending) item.reject(abortErr); + return; + } + + const chunkSize = + queue.maxBatchSize && queue.maxBatchSize > 0 + ? Math.floor(queue.maxBatchSize) + : pending.length; + + for (let start = 0; start < pending.length; start += chunkSize) { + const chunk = pending.slice(start, start + chunkSize); + const batchInput = chunk.map((item) => item.input); + const tracer = this.tracer; + const logger = this.logger; + const metricAttrs = { + "bridge.tool.name": queue.toolName, + "bridge.tool.fn": queue.fnName, + }; + + try { + const executeBatch = async () => { + const batchResult = fnImpl(batchInput, toolContext); + return timeoutMs > 0 && isPromise(batchResult) + ? await raceTimeout(batchResult, timeoutMs, queue.toolName) + : await batchResult; + }; + + const resolved = + !tracer && !logger && !isOtelActive() + ? await executeBatch() + : await withSpan( + doTrace, + `bridge.tool.${queue.toolName}.${queue.fnName}`, + metricAttrs, + async (span) => { + const traceStart = tracer?.now(); + const wallStart = performance.now(); + try { + const result = await executeBatch(); + const durationMs = roundMs(performance.now() - wallStart); + toolCallCounter.add(1, metricAttrs); + toolDurationHistogram.record(durationMs, metricAttrs); + if (tracer && traceStart != null) { + tracer.record( + tracer.entry({ + tool: queue.toolName, + fn: queue.fnName, + input: batchInput, + output: result, + durationMs: roundMs(tracer.now() - traceStart), + startedAt: traceStart, + }), + ); + } + logToolSuccess( + logger, + log.execution, + queue.toolName, + queue.fnName, + durationMs, + ); + return result; + } catch (err) { + const durationMs = roundMs(performance.now() - wallStart); + toolCallCounter.add(1, metricAttrs); + toolDurationHistogram.record(durationMs, metricAttrs); + toolErrorCounter.add(1, metricAttrs); + if (tracer && traceStart != null) { + tracer.record( + tracer.entry({ + tool: queue.toolName, + fn: queue.fnName, + input: batchInput, + error: (err as Error).message, + durationMs: roundMs(tracer.now() - traceStart), + startedAt: traceStart, + }), + ); + } + recordSpanError(span, err as Error); + logToolError( + logger, + log.errors, + queue.toolName, + queue.fnName, + err as Error, + ); + if ( + this.signal?.aborted && + err instanceof DOMException && + err.name === "AbortError" + ) { + throw new BridgeAbortError(); + } + throw err; + } finally { + span?.end(); + } + }, + ); + + if (!Array.isArray(resolved)) { + throw new Error( + `Batch tool "${queue.fnName}" must return an array of results`, + ); + } + if (resolved.length !== chunk.length) { + throw new Error( + `Batch tool "${queue.fnName}" returned ${resolved.length} results for ${chunk.length} queued calls`, + ); + } + + for (let i = 0; i < chunk.length; i++) { + chunk[i]!.resolve(resolved[i]); + } + } catch (err) { + for (const item of chunk) item.reject(err); + } + } + } + shadow(): ExecutionTree { // Lightweight: bypass the constructor to avoid redundant work that // re-derives data identical to the parent (bridge lookup, pipeHandleMap, @@ -505,6 +716,7 @@ export class ExecutionTree implements TreeContext { child.handleVersionMap = this.handleVersionMap; child.memoizedToolKeys = this.memoizedToolKeys; child.toolMemoCache = this.toolMemoCache; + child.toolBatchQueues = this.toolBatchQueues; child.toolFns = this.toolFns; child.elementTrunkKey = this.elementTrunkKey; child.tracer = this.tracer; diff --git a/packages/bridge-core/src/index.ts b/packages/bridge-core/src/index.ts index 13ad98a3..db41eb9a 100644 --- a/packages/bridge-core/src/index.ts +++ b/packages/bridge-core/src/index.ts @@ -58,6 +58,8 @@ export { SELF_MODULE } from "./types.ts"; export type { Bridge, BridgeDocument, + BatchToolCallFn, + BatchToolFn, CacheStore, ConstDef, ControlFlowInstruction, @@ -66,6 +68,8 @@ export type { Instruction, NodeRef, SourceLocation, + ScalarToolCallFn, + ScalarToolFn, ToolCallFn, ToolContext, ToolDef, diff --git a/packages/bridge-core/src/tracing.ts b/packages/bridge-core/src/tracing.ts index 24cc4989..94e8bb29 100644 --- a/packages/bridge-core/src/tracing.ts +++ b/packages/bridge-core/src/tracing.ts @@ -8,6 +8,7 @@ import { metrics, trace } from "@opentelemetry/api"; import type { Span } from "@opentelemetry/api"; import type { ToolMetadata } from "@stackables/bridge-types"; +import type { BatchToolMetadata } from "@stackables/bridge-types"; import type { Logger } from "./tree-types.ts"; import { roundMs } from "./tree-utils.ts"; @@ -248,6 +249,8 @@ export type EffectiveToolLog = { export type ResolvedToolMeta = { /** Whether the tool declares synchronous execution. */ sync: boolean; + /** Batch mode contract, when declared. */ + batch?: BatchToolMetadata; /** Emit an OTel span for this call. Default: `true`. */ doTrace: boolean; log: EffectiveToolLog; @@ -271,6 +274,12 @@ export function resolveToolMeta(fn: (...args: any[]) => any): ResolvedToolMeta { const bridge = (fn as any).bridge as ToolMetadata | undefined; return { sync: bridge?.sync === true, + batch: + bridge?.batch === true + ? {} + : typeof bridge?.batch === "object" + ? bridge.batch + : undefined, doTrace: bridge?.trace !== false, log: resolveToolLog(bridge), }; diff --git a/packages/bridge-core/src/types.ts b/packages/bridge-core/src/types.ts index 5bd7fdcc..cb5b2230 100644 --- a/packages/bridge-core/src/types.ts +++ b/packages/bridge-core/src/types.ts @@ -256,7 +256,11 @@ export type ToolWire = // Re-exported from @stackables/bridge-types to break circular dependency // with bridge-stdlib while maintaining backward-compatible imports. export type { + BatchToolCallFn, + BatchToolFn, ToolContext, + ScalarToolCallFn, + ScalarToolFn, ToolCallFn, ToolMap, ToolMetadata, diff --git a/packages/bridge-types/src/index.ts b/packages/bridge-types/src/index.ts index c1db04e6..d53087fd 100644 --- a/packages/bridge-types/src/index.ts +++ b/packages/bridge-types/src/index.ts @@ -27,7 +27,7 @@ export type ToolContext = { }; /** - * Tool call function — the signature for registered tool functions. + * Scalar tool call function — the default signature for registered tools. * * Receives a fully-built nested input object and an optional `ToolContext` * providing access to the engine's logger and other services. @@ -36,10 +36,35 @@ export type ToolContext = { * input = { baseUrl: "https://...", method: "GET", path: "/geocode", * headers: { apiKey: "..." }, q: "Berlin" } */ -export type ToolCallFn = ( - input: Record, - context?: ToolContext, -) => Promise>; +export type ScalarToolCallFn< + Input extends Record = Record, + Output = any, +> = (input: Input, context?: ToolContext) => Output | Promise; + +/** + * Batch tool call function — opt-in signature for tools declared with + * `{ batch: true }` or `{ batch: { ... } }` metadata. + * + * The engine passes a plain array of input objects to the tool. The returned + * array must preserve the same ordering and length. + */ +export type BatchToolCallFn< + Input extends Record = Record, + Output = any, +> = (inputs: Input[], context?: ToolContext) => Output[] | Promise; + +/** Backward-compatible alias for the standard scalar tool signature. */ +export type ToolCallFn< + Input extends Record = Record, + Output = any, +> = ScalarToolCallFn; + +export interface BatchToolMetadata { + /** Maximum number of queued calls to flush in a single engine batch. */ + maxBatchSize?: number; + /** Flush strategy for queued calls. Prototype only supports microtasks. */ + flush?: "microtask"; +} /** * Optional metadata that can be attached to any tool function as a `.bridge` property. @@ -64,6 +89,15 @@ export interface ToolMetadata { */ sync?: boolean; + /** + * If set, the tool is invoked in batch mode and always receives an array of + * input objects instead of a single input object. + * + * The tool must return an array of results with the same ordering and + * length as the input batch. + */ + batch?: true | BatchToolMetadata; + // ─── Observability ──────────────────────────────────────────────────── /** @@ -95,6 +129,22 @@ export interface ToolMetadata { }; } +/** Scalar tool function with optional `.bridge` metadata attached. */ +export type ScalarToolFn< + Input extends Record = Record, + Output = any, +> = ScalarToolCallFn & { + bridge?: ToolMetadata; +}; + +/** Batch tool function with optional `.bridge` metadata attached. */ +export type BatchToolFn< + Input extends Record = Record, + Output = any, +> = BatchToolCallFn & { + bridge?: ToolMetadata; +}; + /** * Recursive tool map — supports namespaced tools via nesting. * @@ -104,7 +154,13 @@ export interface ToolMetadata { * Lookup is dot-separated: "std.str.toUpperCase" → tools.std.str.toUpperCase */ export type ToolMap = { - [key: string]: ToolCallFn | ((...args: any[]) => any) | ToolMap; + [key: string]: + | ToolCallFn + | BatchToolCallFn + | ScalarToolFn + | BatchToolFn + | ((...args: any[]) => any) + | ToolMap; }; /** diff --git a/packages/bridge/test/native-batching.test.ts b/packages/bridge/test/native-batching.test.ts new file mode 100644 index 00000000..7d7ed371 --- /dev/null +++ b/packages/bridge/test/native-batching.test.ts @@ -0,0 +1,137 @@ +import assert from "node:assert/strict"; +import { test } from "node:test"; +import { parseBridgeFormat as parseBridge } from "../src/index.ts"; +import type { BatchToolFn, ToolMetadata } from "../src/index.ts"; +import { forEachEngine } from "./_dual-run.ts"; + +forEachEngine("native batched tools", (run, ctx) => { + test("tool metadata batches loop-scoped calls without userland loaders", async () => { + const bridge = `version 1.5 + +bridge Query.users { + with context as ctx + with output as o + + o <- ctx.userIds[] as userId { + with app.fetchUser as user + + user.id <- userId + .id <- userId + .name <- user.name + } +}`; + + let batchCalls = 0; + let receivedInputs: Array<{ id: string }> | undefined; + + const fetchUser: BatchToolFn<{ id: string }, { name: string }> = async ( + inputs, + ) => { + batchCalls++; + receivedInputs = inputs; + return inputs.map((input) => ({ + name: `user:${input.id}`, + })); + }; + + // Batching is opt-in through tool metadata, so bridge authors write + // ordinary wires and do not need to thread DataLoaders via context. + fetchUser.bridge = { + batch: { + maxBatchSize: 100, + flush: "microtask", + }, + } satisfies ToolMetadata; + + const result = await run( + bridge, + "Query.users", + {}, + { + app: { fetchUser }, + }, + { + context: { + userIds: ["u1", "u2", "u3"], + }, + }, + ); + + assert.deepEqual(result.data, [ + { id: "u1", name: "user:u1" }, + { id: "u2", name: "user:u2" }, + { id: "u3", name: "user:u3" }, + ]); + + assert.deepEqual(receivedInputs, [ + { id: "u1" }, + { id: "u2" }, + { id: "u3" }, + ]); + assert.equal(batchCalls, 1); + }); + + test("batched tools emit one trace and log entry per flushed batch call", async () => { + const bridge = `version 1.5 + +bridge Query.users { + with context as ctx + with output as o + + o <- ctx.userIds[] as userId { + with app.fetchUser as user + + user.id <- userId + .id <- userId + .name <- user.name + } +}`; + + const infos: Array<{ tool: string; fn: string; durationMs: number }> = []; + + const fetchUser: BatchToolFn<{ id: string }, { name: string }> = async ( + inputs, + ) => inputs.map((input) => ({ name: `user:${input.id}` })); + + fetchUser.bridge = { + batch: true, + log: { execution: "info" }, + } satisfies ToolMetadata; + + const result = await ctx.executeFn({ + document: parseBridge(bridge), + operation: "Query.users", + tools: { + app: { fetchUser }, + }, + context: { + userIds: ["u1", "u2", "u3"], + }, + trace: "full", + logger: { + info: (meta: { tool: string; fn: string; durationMs: number }) => { + infos.push(meta); + }, + }, + }); + + assert.deepEqual(result.data, [ + { id: "u1", name: "user:u1" }, + { id: "u2", name: "user:u2" }, + { id: "u3", name: "user:u3" }, + ]); + assert.equal(result.traces.length, 1); + assert.equal(result.traces[0]!.tool, "app.fetchUser"); + assert.deepEqual(result.traces[0]!.input, [ + { id: "u1" }, + { id: "u2" }, + { id: "u3" }, + ]); + assert.deepEqual(result.traces[0]!.output, [ + { name: "user:u1" }, + { name: "user:u2" }, + { name: "user:u3" }, + ]); + assert.equal(infos.length, 1); + }); +}); diff --git a/packages/docs-site/src/content/docs/advanced/custom-tools.md b/packages/docs-site/src/content/docs/advanced/custom-tools.md index 29e6c2f7..d1bb23c4 100644 --- a/packages/docs-site/src/content/docs/advanced/custom-tools.md +++ b/packages/docs-site/src/content/docs/advanced/custom-tools.md @@ -6,7 +6,10 @@ description: Writing custom tools ## Register Custom tools You can inject your own tools into the engine. A tool is any function -`(input: object) => object | Promise`. +`(input: object, context?: ToolContext) => object | Promise`. + +If you opt into native batching, the signature becomes +`(inputs: object[], context?: ToolContext) => object[] | Promise`. ### Gateway Mode @@ -67,6 +70,31 @@ export interface ToolContext { } ``` +For type-safe authoring, Bridge exports dedicated function types for both scalar and batched tools. + +```typescript +import type { + BatchToolFn, + ScalarToolFn, + ToolContext, +} from "@stackables/bridge"; + +const geocoder: ScalarToolFn< + { q: string }, + { lat: number; lon: number } +> = async (input, context) => { + context.logger?.debug?.({ q: input.q }, "geocoding"); + return await geocodeService.lookup(input.q, { signal: context.signal }); +}; + +const fetchUsers: BatchToolFn<{ id: string }, { name: string }> = async ( + inputs, + context, +) => { + return await userService.fetchMany(inputs, { signal: context.signal }); +}; +``` + ### The `AbortSignal` The Bridge engine uses a unified architecture for handling **Fatal Execution Halts**. Whether a client disconnects from the GraphQL server, or a developer writes a `panic` keyword in a `.bridge` file to intentionally kill the request, the engine triggers the `AbortSignal`. @@ -108,3 +136,53 @@ geocoder.bridge = { }, } satisfies ToolMetadata; ``` + +## Native Batching + +If your backend already supports bulk fetches, you can let Bridge batch loop-scoped tool calls for you. This removes the need to thread DataLoaders through GraphQL context just to avoid N+1 calls. + +### Batch Authoring Contract + +Mark the tool with `bridge.batch`, then implement it as `Input[] -> Output[]`. + +```typescript +import type { BatchToolFn, ToolMetadata } from "@stackables/bridge"; + +export const fetchUsers: BatchToolFn<{ id: string }, { name: string }> = async ( + inputs, + context, +) => { + const rows = await userService.fetchManyById( + inputs.map((input) => input.id), + { signal: context.signal }, + ); + + return inputs.map((input) => ({ + name: rows.get(input.id)?.name ?? "unknown", + })); +}; + +fetchUsers.bridge = { + batch: { + maxBatchSize: 100, + flush: "microtask", + }, +} satisfies ToolMetadata; +``` + +Rules: + +- A batched tool always receives a plain array of input objects. +- A batched tool must return an array with the same length and ordering. +- Bridge fans the results back out to the original wire sites automatically. +- `maxBatchSize` splits very large queues into multiple batch calls. +- `flush: "microtask"` means compatible calls in the same microtask are coalesced together. + +### Tracing and Logging + +Batch tools are instrumented once per flushed batch call, not once per item. + +- One OpenTelemetry span is emitted for each actual batch function call. +- One trace entry is recorded for each actual batch function call. +- One success or error log is emitted for each actual batch function call. +- In `trace: "full"`, the trace input/output are arrays. From 0714325258acd7ad4ad9c10276f2c1ea5ed506e0 Mon Sep 17 00:00:00 2001 From: Aarne Laur Date: Sat, 7 Mar 2026 20:38:16 +0100 Subject: [PATCH 2/3] Compiler --- packages/bridge-compiler/src/codegen.ts | 126 ++++++++++++------ .../bridge-compiler/src/execute-bridge.ts | 35 +---- packages/bridge-compiler/test/codegen.test.ts | 66 +++++++++ .../src/content/docs/advanced/custom-tools.md | 1 + 4 files changed, 158 insertions(+), 70 deletions(-) diff --git a/packages/bridge-compiler/src/codegen.ts b/packages/bridge-compiler/src/codegen.ts index 376aa125..92995cbf 100644 --- a/packages/bridge-compiler/src/codegen.ts +++ b/packages/bridge-compiler/src/codegen.ts @@ -708,6 +708,18 @@ class CodegenContext { ); lines.push(` const __batchQueues = new Map();`); lines.push(` const __trace = __opts?.__trace;`); + lines.push(` function __toolExecutionLogLevel(fn) {`); + lines.push(` const log = fn?.bridge?.log;`); + lines.push(` if (log === false || log == null) return false;`); + lines.push(` if (log === true) return "info";`); + lines.push(` return log.execution === "info" ? "info" : log.execution ? "debug" : false;`); + lines.push(` }`); + lines.push(` function __toolErrorLogLevel(fn) {`); + lines.push(` const log = fn?.bridge?.log;`); + lines.push(` if (log === false) return false;`); + lines.push(` if (log == null || log === true) return "error";`); + lines.push(` return log.errors === false ? false : log.errors === "warn" ? "warn" : "error";`); + lines.push(` }`); lines.push(` function __rethrowBridgeError(err, loc) {`); lines.push( ` if (err?.name === "BridgePanicError") throw __attachBridgeMeta(err, loc);`, @@ -832,6 +844,7 @@ class CodegenContext { lines.push(` const chunk = pending.slice(start, start + chunkSize);`); lines.push(` try {`); lines.push(` const inputs = chunk.map((item) => item.input);`); + lines.push(` const startTime = (__trace || __ctx.logger) ? performance.now() : 0;`); lines.push(` const batchPromise = fn(inputs, __ctx);`); lines.push(` let result;`); lines.push( @@ -846,6 +859,9 @@ class CodegenContext { lines.push(` } else {`); lines.push(` result = await batchPromise;`); lines.push(` }`); + lines.push(` if (__trace) __trace(queue.toolName, startTime, performance.now(), inputs, result, null);`); + lines.push(` const __execLevel = __toolExecutionLogLevel(fn);`); + lines.push(` if (__execLevel) __ctx.logger?.[__execLevel]?.({ tool: queue.toolName, fn: queue.toolName, durationMs: Math.round((performance.now() - startTime) * 1000) / 1000 }, "[bridge] tool completed");`); lines.push( ` if (!Array.isArray(result)) throw new Error('Batch tool "' + queue.toolName + '" must return an array of results');`, ); @@ -856,6 +872,9 @@ class CodegenContext { ` for (let i = 0; i < chunk.length; i++) chunk[i].resolve(result[i]);`, ); lines.push(` } catch (err) {`); + lines.push(` if (__trace) __trace(queue.toolName, startTime, performance.now(), inputs, null, err);`); + lines.push(` const __errorLevel = __toolErrorLogLevel(fn);`); + lines.push(` if (__errorLevel) __ctx.logger?.[__errorLevel]?.({ tool: queue.toolName, fn: queue.toolName, err: err instanceof Error ? err.message : String(err) }, "[bridge] tool failed");`); lines.push(` for (const item of chunk) item.reject(err);`); lines.push(` }`); lines.push(` }`); @@ -872,11 +891,15 @@ class CodegenContext { lines.push( ` if (__trace) __trace(toolName, start, performance.now(), input, result, null);`, ); + lines.push(` const __execLevel = __toolExecutionLogLevel(fn);`); + lines.push(` if (__execLevel) __ctx.logger?.[__execLevel]?.({ tool: toolName, fn: toolName, durationMs: Math.round((performance.now() - start) * 1000) / 1000 }, "[bridge] tool completed");`); lines.push(` return result;`); lines.push(` } catch (err) {`); lines.push( ` if (__trace) __trace(toolName, start, performance.now(), input, null, err);`, ); + lines.push(` const __errorLevel = __toolErrorLogLevel(fn);`); + lines.push(` if (__errorLevel) __ctx.logger?.[__errorLevel]?.({ tool: toolName, fn: toolName, err: err instanceof Error ? err.message : String(err) }, "[bridge] tool failed");`); lines.push(` throw err;`); lines.push(` }`); lines.push(` }`); @@ -906,11 +929,15 @@ class CodegenContext { lines.push( ` if (__trace) __trace(toolName, start, performance.now(), input, result, null);`, ); + lines.push(` const __execLevel = __toolExecutionLogLevel(fn);`); + lines.push(` if (__execLevel) __ctx.logger?.[__execLevel]?.({ tool: toolName, fn: toolName, durationMs: Math.round((performance.now() - start) * 1000) / 1000 }, "[bridge] tool completed");`); lines.push(` return result;`); lines.push(` } catch (err) {`); lines.push( ` if (__trace) __trace(toolName, start, performance.now(), input, null, err);`, ); + lines.push(` const __errorLevel = __toolErrorLogLevel(fn);`); + lines.push(` if (__errorLevel) __ctx.logger?.[__errorLevel]?.({ tool: toolName, fn: toolName, err: err instanceof Error ? err.message : String(err) }, "[bridge] tool failed");`); lines.push(` throw err;`); lines.push(` }`); lines.push(` }`); @@ -1724,7 +1751,8 @@ class CodegenContext { if (needsAsync) { // Check if async is only from element-scoped tools (no catch fallbacks). // If so, generate a dual sync/async path with a runtime check. - const canDualPath = !cf && this.asyncOnlyFromTools(elemWires); + const canDualPath = + !cf && !requiresLabeledLoop && this.asyncOnlyFromTools(elemWires); const toolRefs = canDualPath ? this.collectElementToolRefs(currentScopeElemWires) : []; @@ -1765,7 +1793,9 @@ class CodegenContext { this.elementLocalVars.clear(); } - // Async branch — for...of loop with await + // Async branch — Promise.all over async element callbacks so batched + // tool calls can coalesce before the first microtask flush. Control + // flow still requires an explicit loop. const preambleLines: string[] = []; this.elementLocalVars.clear(); this.collectElementPreamble( @@ -1774,31 +1804,42 @@ class CodegenContext { preambleLines, ); - const body = cf - ? this.buildElementBodyWithControlFlow( - elemWires, - arrayIterators, - 0, - 4, - cf.kind === "continue" ? "for-continue" : "break", - ) - : ` _result.push(${this.buildElementBody(elemWires, arrayIterators, 0, 4)});`; + if (cf?.kind === "break" || cf?.kind === "continue" || requiresLabeledLoop) { + const body = cf + ? this.buildElementBodyWithControlFlow( + elemWires, + arrayIterators, + 0, + 4, + cf.kind === "continue" ? "for-continue" : "break", + ) + : ` _result.push(${this.buildElementBody(elemWires, arrayIterators, 0, 4)});`; - lines.push(` const _result = [];`); - lines.push(` __loop0: for (const _el0 of (${arrayExpr} ?? [])) {`); - lines.push(` try {`); - for (const pl of preambleLines) { - lines.push(` ${pl}`); + lines.push(` const _result = [];`); + lines.push(` __loop0: for (const _el0 of (${arrayExpr} ?? [])) {`); + lines.push(` try {`); + for (const pl of preambleLines) { + lines.push(` ${pl}`); + } + lines.push(` ${body.trimStart()}`); + lines.push(` } catch (_ctrl) {`); + lines.push( + ` if (__isLoopCtrl(_ctrl)) { if (_ctrl.levels > 1) throw __nextLoopCtrl(_ctrl); if (_ctrl.__bridgeControl === "break") break; continue; }`, + ); + lines.push(` throw _ctrl;`); + lines.push(` }`); + lines.push(` }`); + lines.push(` return _result;`); + } else { + lines.push(` return await Promise.all((${arrayExpr} ?? []).map(async (_el0) => {`); + for (const pl of preambleLines) { + lines.push(` ${pl}`); + } + lines.push( + ` return ${this.buildElementBody(elemWires, arrayIterators, 0, 4)};`, + ); + lines.push(` }));`); } - lines.push(` ${body.trimStart()}`); - lines.push(` } catch (_ctrl) {`); - lines.push( - ` if (__isLoopCtrl(_ctrl)) { if (_ctrl.levels > 1) throw __nextLoopCtrl(_ctrl); if (_ctrl.__bridgeControl === "break") break; continue; }`, - ); - lines.push(` throw _ctrl;`); - lines.push(` }`); - lines.push(` }`); - lines.push(` return _result;`); this.elementLocalVars.clear(); } else if (cf?.kind === "continue" && cf.levels === 1) { // Use flatMap — skip elements that trigger continue (sync only) @@ -1974,7 +2015,8 @@ class CodegenContext { let mapExpr: string; if (needsAsync) { // Check if we can generate a dual sync/async path - const canDualPath = !cf && this.asyncOnlyFromTools(shifted); + const canDualPath = + !cf && !requiresLabeledLoop && this.asyncOnlyFromTools(shifted); const toolRefs = canDualPath ? this.collectElementToolRefs(currentScopeShifted) : []; @@ -2011,14 +2053,15 @@ class CodegenContext { "_el0", preambleLines, ); - const asyncBody = ` _result.push(${this.buildElementBody(shifted, shiftedIterators, 0, 8)});`; const preamble = preambleLines.map((l) => ` ${l}`).join("\n"); - const asyncExpr = `await (async () => { const _src = ${arrayExpr}; if (_src == null) return null; const _result = []; __loop0: for (const _el0 of _src) {\n try {\n${preamble}\n${asyncBody}\n } catch (_ctrl) { if (__isLoopCtrl(_ctrl)) { if (_ctrl.levels > 1) throw __nextLoopCtrl(_ctrl); if (_ctrl.__bridgeControl === "break") break; continue; } throw _ctrl; }\n } return _result; })()`; + const asyncExpr = `await ((async (__s) => Array.isArray(__s) ? Promise.all(__s.map(async (_el0) => {\n${preamble}${preamble ? "\n" : ""} return ${this.buildElementBody(shifted, shiftedIterators, 0, 8)};\n })) : null)(${arrayExpr}))`; this.elementLocalVars.clear(); mapExpr = `(${syncCheck}) ? ${syncMapExpr} : ${asyncExpr}`; } else { - // Standard async path — for...of inside an async IIFE + // Standard async path — Promise.all over async element callbacks so + // batched tools can queue together before the first flush. Control + // flow still requires an explicit loop. const preambleLines: string[] = []; this.elementLocalVars.clear(); this.collectElementPreamble( @@ -2031,18 +2074,21 @@ class CodegenContext { arrayField, ); - const asyncBody = cf - ? this.buildElementBodyWithControlFlow( - shifted, - shiftedIterators, - 0, - 8, - cf.kind === "continue" ? "for-continue" : "break", - ) - : ` _result.push(${this.buildElementBody(shifted, shiftedIterators, 0, 8)});`; - const preamble = preambleLines.map((l) => ` ${l}`).join("\n"); - mapExpr = `await (async () => { const _src = ${arrayExpr}; if (_src == null) return null; const _result = []; __loop0: for (const _el0 of _src) {\n try {\n${preamble}\n${asyncBody}\n } catch (_ctrl) { if (__isLoopCtrl(_ctrl)) { if (_ctrl.levels > 1) throw __nextLoopCtrl(_ctrl); if (_ctrl.__bridgeControl === "break") break; continue; } throw _ctrl; }\n } return _result; })()`; + if (cf?.kind === "break" || cf?.kind === "continue" || requiresLabeledLoop) { + const asyncBody = cf + ? this.buildElementBodyWithControlFlow( + shifted, + shiftedIterators, + 0, + 8, + cf.kind === "continue" ? "for-continue" : "break", + ) + : ` _result.push(${this.buildElementBody(shifted, shiftedIterators, 0, 8)});`; + mapExpr = `await (async () => { const _src = ${arrayExpr}; if (_src == null) return null; const _result = []; __loop0: for (const _el0 of _src) {\n try {\n${preamble}\n${asyncBody}\n } catch (_ctrl) { if (__isLoopCtrl(_ctrl)) { if (_ctrl.levels > 1) throw __nextLoopCtrl(_ctrl); if (_ctrl.__bridgeControl === "break") break; continue; } throw _ctrl; }\n } return _result; })()`; + } else { + mapExpr = `await ((async (__s) => Array.isArray(__s) ? Promise.all(__s.map(async (_el0) => {\n${preamble}${preamble ? "\n" : ""} return ${this.buildElementBody(shifted, shiftedIterators, 0, 8)};\n })) : null)(${arrayExpr}))`; + } this.elementLocalVars.clear(); } } else if (cf?.kind === "continue" && cf.levels === 1) { diff --git a/packages/bridge-compiler/src/execute-bridge.ts b/packages/bridge-compiler/src/execute-bridge.ts index 56cc7ac8..04981eb5 100644 --- a/packages/bridge-compiler/src/execute-bridge.ts +++ b/packages/bridge-compiler/src/execute-bridge.ts @@ -221,12 +221,6 @@ function flattenTools( return flat; } -function hasBatchedTool(flatTools: Record): boolean { - return Object.values(flatTools).some( - (value) => typeof value === "function" && value.bridge?.batch, - ); -} - // ── Public API ────────────────────────────────────────────────────────────── /** @@ -265,30 +259,6 @@ export async function executeBridge( maxDepth, } = options; - // Prototype limitation: runtime batching exists, but compiled batching still - // needs dedicated scheduling codegen to coalesce loop-scoped awaits. Fall - // back to the interpreter when any tool opts into `.bridge.batch`. - const allTools: ToolMap = { std: bundledStd, ...userTools }; - const flatTools = flattenTools(allTools as Record); - if (hasBatchedTool(flatTools)) { - logger?.warn?.( - "Batched tools currently run through the interpreter. Falling back to core executeBridge.", - ); - return executeCoreBridge({ - document, - operation, - input, - tools: userTools, - context, - signal, - toolTimeoutMs, - logger, - trace: options.trace, - requestedFields: options.requestedFields, - ...(maxDepth !== undefined ? { maxDepth } : {}), - }); - } - let fn: BridgeFn; try { fn = getOrCompile(document, operation, options.requestedFields); @@ -312,6 +282,11 @@ export async function executeBridge( throw err; } + // Merge built-in std namespace with user-provided tools, then flatten + // so the generated code can access them via dotted keys like tools["std.str.toUpperCase"]. + const allTools: ToolMap = { std: bundledStd, ...userTools }; + const flatTools = flattenTools(allTools as Record); + // Set up tracing if requested const traceLevel = options.trace ?? "off"; let tracer: TraceCollector | undefined; diff --git a/packages/bridge-compiler/test/codegen.test.ts b/packages/bridge-compiler/test/codegen.test.ts index 3676d0a5..7cbce661 100644 --- a/packages/bridge-compiler/test/codegen.test.ts +++ b/packages/bridge-compiler/test/codegen.test.ts @@ -1718,6 +1718,72 @@ bridge Query.test { assert.deepStrictEqual(result.traces, []); assert.equal(result.data.name, "Alice"); }); + + test("batched tools execute natively in compiled mode", async () => { + const document = parseBridgeFormat(`version 1.5 +bridge Query.users { + with context as ctx + with output as o + + o <- ctx.userIds[] as userId { + with app.fetchUser as user + + user.id <- userId + .id <- userId + .name <- user.name + } +}`); + + let batchCalls = 0; + const warnings: string[] = []; + const infos: Array<{ tool: string; fn: string; durationMs: number }> = []; + + const fetchUser = Object.assign( + async (inputs: Array<{ id: string }>) => { + batchCalls++; + return inputs.map((input) => ({ name: `user:${input.id}` })); + }, + { + bridge: { + batch: true, + log: { execution: "info" as const }, + }, + }, + ); + + const result = await executeAot({ + document, + operation: "Query.users", + tools: { + app: { fetchUser }, + }, + context: { + userIds: ["u1", "u2", "u3"], + }, + trace: "full", + logger: { + warn: (message: string) => warnings.push(message), + info: (meta: { tool: string; fn: string; durationMs: number }) => { + infos.push(meta); + }, + }, + }); + + assert.deepStrictEqual(result.data, [ + { id: "u1", name: "user:u1" }, + { id: "u2", name: "user:u2" }, + { id: "u3", name: "user:u3" }, + ]); + assert.equal(batchCalls, 1); + assert.deepStrictEqual(warnings, []); + assert.equal(result.traces.length, 1); + assert.deepStrictEqual(result.traces[0]!.input, [ + { id: "u1" }, + { id: "u2" }, + { id: "u3" }, + ]); + assert.equal(infos.length, 1); + }); }); // ── Parallel scheduling ────────────────────────────────────────────────────── diff --git a/packages/docs-site/src/content/docs/advanced/custom-tools.md b/packages/docs-site/src/content/docs/advanced/custom-tools.md index d1bb23c4..f3dbfa2e 100644 --- a/packages/docs-site/src/content/docs/advanced/custom-tools.md +++ b/packages/docs-site/src/content/docs/advanced/custom-tools.md @@ -177,6 +177,7 @@ Rules: - Bridge fans the results back out to the original wire sites automatically. - `maxBatchSize` splits very large queues into multiple batch calls. - `flush: "microtask"` means compatible calls in the same microtask are coalesced together. +- Native batching works in both the runtime interpreter and the compiled executor. ### Tracing and Logging From 6ee2cdbce1ee79f2f7159f168883ebc6b82444a8 Mon Sep 17 00:00:00 2001 From: Aarne Laur Date: Sat, 7 Mar 2026 20:45:04 +0100 Subject: [PATCH 3/3] Enhance native batching: document feature, improve error handling, and update tests for partial failures --- .changeset/native-batched-tool-authoring.md | 2 + packages/bridge-compiler/src/codegen.ts | 89 ++++++++++++++----- packages/bridge-core/src/ExecutionTree.ts | 7 +- packages/bridge/test/native-batching.test.ts | 55 ++++++++++++ .../src/content/docs/advanced/custom-tools.md | 1 + 5 files changed, 130 insertions(+), 24 deletions(-) diff --git a/.changeset/native-batched-tool-authoring.md b/.changeset/native-batched-tool-authoring.md index 92c6c452..16eb336d 100644 --- a/.changeset/native-batched-tool-authoring.md +++ b/.changeset/native-batched-tool-authoring.md @@ -7,3 +7,5 @@ Improve native batched tool authoring by documenting the feature, exporting dedicated batch tool types, and simplifying the batch contract to plain input arrays. Batch tools now receive `Input[]` and must return `Output[]` in matching order. Batched tool tracing and logging are also emitted once per flushed batch call instead of once per queued item. + +Native batching now works in compiled execution as well as the runtime interpreter. Batch tools can also signal partial failures by returning an `Error` at a specific result index, which rejects only that item and allows normal wire-level `catch` fallbacks to handle it. diff --git a/packages/bridge-compiler/src/codegen.ts b/packages/bridge-compiler/src/codegen.ts index 92995cbf..7d32eef0 100644 --- a/packages/bridge-compiler/src/codegen.ts +++ b/packages/bridge-compiler/src/codegen.ts @@ -712,13 +712,17 @@ class CodegenContext { lines.push(` const log = fn?.bridge?.log;`); lines.push(` if (log === false || log == null) return false;`); lines.push(` if (log === true) return "info";`); - lines.push(` return log.execution === "info" ? "info" : log.execution ? "debug" : false;`); + lines.push( + ` return log.execution === "info" ? "info" : log.execution ? "debug" : false;`, + ); lines.push(` }`); lines.push(` function __toolErrorLogLevel(fn) {`); lines.push(` const log = fn?.bridge?.log;`); lines.push(` if (log === false) return false;`); lines.push(` if (log == null || log === true) return "error";`); - lines.push(` return log.errors === false ? false : log.errors === "warn" ? "warn" : "error";`); + lines.push( + ` return log.errors === false ? false : log.errors === "warn" ? "warn" : "error";`, + ); lines.push(` }`); lines.push(` function __rethrowBridgeError(err, loc) {`); lines.push( @@ -842,9 +846,11 @@ class CodegenContext { ` for (let start = 0; start < pending.length; start += chunkSize) {`, ); lines.push(` const chunk = pending.slice(start, start + chunkSize);`); + lines.push(` const inputs = chunk.map((item) => item.input);`); + lines.push( + ` const startTime = (__trace || __ctx.logger) ? performance.now() : 0;`, + ); lines.push(` try {`); - lines.push(` const inputs = chunk.map((item) => item.input);`); - lines.push(` const startTime = (__trace || __ctx.logger) ? performance.now() : 0;`); lines.push(` const batchPromise = fn(inputs, __ctx);`); lines.push(` let result;`); lines.push( @@ -859,9 +865,13 @@ class CodegenContext { lines.push(` } else {`); lines.push(` result = await batchPromise;`); lines.push(` }`); - lines.push(` if (__trace) __trace(queue.toolName, startTime, performance.now(), inputs, result, null);`); + lines.push( + ` if (__trace) __trace(queue.toolName, startTime, performance.now(), inputs, result, null);`, + ); lines.push(` const __execLevel = __toolExecutionLogLevel(fn);`); - lines.push(` if (__execLevel) __ctx.logger?.[__execLevel]?.({ tool: queue.toolName, fn: queue.toolName, durationMs: Math.round((performance.now() - startTime) * 1000) / 1000 }, "[bridge] tool completed");`); + lines.push( + ` if (__execLevel) __ctx.logger?.[__execLevel]?.({ tool: queue.toolName, fn: queue.toolName, durationMs: Math.round((performance.now() - startTime) * 1000) / 1000 }, "[bridge] tool completed");`, + ); lines.push( ` if (!Array.isArray(result)) throw new Error('Batch tool "' + queue.toolName + '" must return an array of results');`, ); @@ -869,12 +879,16 @@ class CodegenContext { ` if (result.length !== chunk.length) throw new Error('Batch tool "' + queue.toolName + '" returned ' + result.length + ' results for ' + chunk.length + ' queued calls');`, ); lines.push( - ` for (let i = 0; i < chunk.length; i++) chunk[i].resolve(result[i]);`, + ` for (let i = 0; i < chunk.length; i++) { const value = result[i]; if (value instanceof Error) chunk[i].reject(value); else chunk[i].resolve(value); }`, ); lines.push(` } catch (err) {`); - lines.push(` if (__trace) __trace(queue.toolName, startTime, performance.now(), inputs, null, err);`); + lines.push( + ` if (__trace) __trace(queue.toolName, startTime, performance.now(), inputs, null, err);`, + ); lines.push(` const __errorLevel = __toolErrorLogLevel(fn);`); - lines.push(` if (__errorLevel) __ctx.logger?.[__errorLevel]?.({ tool: queue.toolName, fn: queue.toolName, err: err instanceof Error ? err.message : String(err) }, "[bridge] tool failed");`); + lines.push( + ` if (__errorLevel) __ctx.logger?.[__errorLevel]?.({ tool: queue.toolName, fn: queue.toolName, err: err instanceof Error ? err.message : String(err) }, "[bridge] tool failed");`, + ); lines.push(` for (const item of chunk) item.reject(err);`); lines.push(` }`); lines.push(` }`); @@ -892,14 +906,18 @@ class CodegenContext { ` if (__trace) __trace(toolName, start, performance.now(), input, result, null);`, ); lines.push(` const __execLevel = __toolExecutionLogLevel(fn);`); - lines.push(` if (__execLevel) __ctx.logger?.[__execLevel]?.({ tool: toolName, fn: toolName, durationMs: Math.round((performance.now() - start) * 1000) / 1000 }, "[bridge] tool completed");`); + lines.push( + ` if (__execLevel) __ctx.logger?.[__execLevel]?.({ tool: toolName, fn: toolName, durationMs: Math.round((performance.now() - start) * 1000) / 1000 }, "[bridge] tool completed");`, + ); lines.push(` return result;`); lines.push(` } catch (err) {`); lines.push( ` if (__trace) __trace(toolName, start, performance.now(), input, null, err);`, ); lines.push(` const __errorLevel = __toolErrorLogLevel(fn);`); - lines.push(` if (__errorLevel) __ctx.logger?.[__errorLevel]?.({ tool: toolName, fn: toolName, err: err instanceof Error ? err.message : String(err) }, "[bridge] tool failed");`); + lines.push( + ` if (__errorLevel) __ctx.logger?.[__errorLevel]?.({ tool: toolName, fn: toolName, err: err instanceof Error ? err.message : String(err) }, "[bridge] tool failed");`, + ); lines.push(` throw err;`); lines.push(` }`); lines.push(` }`); @@ -930,14 +948,18 @@ class CodegenContext { ` if (__trace) __trace(toolName, start, performance.now(), input, result, null);`, ); lines.push(` const __execLevel = __toolExecutionLogLevel(fn);`); - lines.push(` if (__execLevel) __ctx.logger?.[__execLevel]?.({ tool: toolName, fn: toolName, durationMs: Math.round((performance.now() - start) * 1000) / 1000 }, "[bridge] tool completed");`); + lines.push( + ` if (__execLevel) __ctx.logger?.[__execLevel]?.({ tool: toolName, fn: toolName, durationMs: Math.round((performance.now() - start) * 1000) / 1000 }, "[bridge] tool completed");`, + ); lines.push(` return result;`); lines.push(` } catch (err) {`); lines.push( ` if (__trace) __trace(toolName, start, performance.now(), input, null, err);`, ); lines.push(` const __errorLevel = __toolErrorLogLevel(fn);`); - lines.push(` if (__errorLevel) __ctx.logger?.[__errorLevel]?.({ tool: toolName, fn: toolName, err: err instanceof Error ? err.message : String(err) }, "[bridge] tool failed");`); + lines.push( + ` if (__errorLevel) __ctx.logger?.[__errorLevel]?.({ tool: toolName, fn: toolName, err: err instanceof Error ? err.message : String(err) }, "[bridge] tool failed");`, + ); lines.push(` throw err;`); lines.push(` }`); lines.push(` }`); @@ -1804,7 +1826,11 @@ class CodegenContext { preambleLines, ); - if (cf?.kind === "break" || cf?.kind === "continue" || requiresLabeledLoop) { + if ( + cf?.kind === "break" || + cf?.kind === "continue" || + requiresLabeledLoop + ) { const body = cf ? this.buildElementBodyWithControlFlow( elemWires, @@ -1831,7 +1857,9 @@ class CodegenContext { lines.push(` }`); lines.push(` return _result;`); } else { - lines.push(` return await Promise.all((${arrayExpr} ?? []).map(async (_el0) => {`); + lines.push( + ` return await Promise.all((${arrayExpr} ?? []).map(async (_el0) => {`, + ); for (const pl of preambleLines) { lines.push(` ${pl}`); } @@ -2075,7 +2103,11 @@ class CodegenContext { ); const preamble = preambleLines.map((l) => ` ${l}`).join("\n"); - if (cf?.kind === "break" || cf?.kind === "continue" || requiresLabeledLoop) { + if ( + cf?.kind === "break" || + cf?.kind === "continue" || + requiresLabeledLoop + ) { const asyncBody = cf ? this.buildElementBodyWithControlFlow( shifted, @@ -2886,21 +2918,30 @@ class CodegenContext { ); const inputObj = this.buildElementToolInput(toolWires, elVar); const fnName = this.resolveToolDef(tool.toolName)?.fn ?? tool.toolName; + const isCatchGuarded = this.catchGuardedTools.has(tk); if (syncOnly) { const fn = `tools[${JSON.stringify(fnName)}]`; - if (this.memoizedToolKeys.has(tk)) { + const syncExpr = this.memoizedToolKeys.has(tk) + ? `__callMemoized(${fn}, ${inputObj}, ${JSON.stringify(fnName)}, ${JSON.stringify(tk)})` + : `__callSync(${fn}, ${inputObj}, ${JSON.stringify(fnName)})`; + if (isCatchGuarded) { + lines.push(`let ${vn}, ${vn}_err;`); lines.push( - `const ${vn} = __callMemoized(${fn}, ${inputObj}, ${JSON.stringify(fnName)}, ${JSON.stringify(tk)});`, + `try { ${vn} = ${syncExpr}; } catch (_e) { if (_e?.name === "BridgePanicError" || _e?.name === "BridgeAbortError") throw _e; ${vn}_err = _e; }`, ); } else { + lines.push(`const ${vn} = ${syncExpr};`); + } + } else { + const asyncExpr = this.syncAwareCall(fnName, inputObj, tk); + if (isCatchGuarded) { + lines.push(`let ${vn}, ${vn}_err;`); lines.push( - `const ${vn} = __callSync(${fn}, ${inputObj}, ${JSON.stringify(fnName)});`, + `try { ${vn} = ${asyncExpr}; } catch (_e) { if (_e?.name === "BridgePanicError" || _e?.name === "BridgeAbortError") throw _e; ${vn}_err = _e; }`, ); + } else { + lines.push(`const ${vn} = ${asyncExpr};`); } - } else { - lines.push( - `const ${vn} = ${this.syncAwareCall(fnName, inputObj, tk)};`, - ); } } } @@ -3226,6 +3267,8 @@ class CodegenContext { if (!this.catchGuardedTools.has(srcKey)) return undefined; if (this.internalToolKeys.has(srcKey) || this.defineContainers.has(srcKey)) return undefined; + const localVar = this.elementLocalVars.get(srcKey); + if (localVar) return `${localVar}_err`; const tool = this.tools.get(srcKey); if (!tool) return undefined; return `${tool.varName}_err`; diff --git a/packages/bridge-core/src/ExecutionTree.ts b/packages/bridge-core/src/ExecutionTree.ts index bca724cd..2f6485a3 100644 --- a/packages/bridge-core/src/ExecutionTree.ts +++ b/packages/bridge-core/src/ExecutionTree.ts @@ -683,7 +683,12 @@ export class ExecutionTree implements TreeContext { } for (let i = 0; i < chunk.length; i++) { - chunk[i]!.resolve(resolved[i]); + const value = resolved[i]; + if (value instanceof Error) { + chunk[i]!.reject(value); + } else { + chunk[i]!.resolve(value); + } } } catch (err) { for (const item of chunk) item.reject(err); diff --git a/packages/bridge/test/native-batching.test.ts b/packages/bridge/test/native-batching.test.ts index 7d7ed371..0d045050 100644 --- a/packages/bridge/test/native-batching.test.ts +++ b/packages/bridge/test/native-batching.test.ts @@ -134,4 +134,59 @@ bridge Query.users { ]); assert.equal(infos.length, 1); }); + + test("partial batch failures route failed items through catch fallbacks", async () => { + const bridge = `version 1.5 + +bridge Query.users { + with context as ctx + with output as o + + o <- ctx.userIds[] as userId { + with app.fetchUser as user + + user.id <- userId + .id <- userId + .name <- user.name catch "missing" + } +}`; + + let batchCalls = 0; + + const fetchUser: BatchToolFn<{ id: string }, { name: string }> = async ( + inputs, + ) => { + batchCalls++; + return inputs.map((input) => + input.id === "u2" + ? new Error("Not Found") + : { name: `user:${input.id}` }, + ) as Array<{ name: string } | Error>; + }; + + fetchUser.bridge = { + batch: true, + } satisfies ToolMetadata; + + const result = await run( + bridge, + "Query.users", + {}, + { + app: { fetchUser }, + }, + { + context: { + userIds: ["u1", "u2", "u3"], + }, + }, + ); + + assert.equal(batchCalls, 1); + assert.deepEqual(result.data, [ + { id: "u1", name: "user:u1" }, + { id: "u2", name: "missing" }, + { id: "u3", name: "user:u3" }, + ]); + }); }); diff --git a/packages/docs-site/src/content/docs/advanced/custom-tools.md b/packages/docs-site/src/content/docs/advanced/custom-tools.md index f3dbfa2e..c99dcb8d 100644 --- a/packages/docs-site/src/content/docs/advanced/custom-tools.md +++ b/packages/docs-site/src/content/docs/advanced/custom-tools.md @@ -178,6 +178,7 @@ Rules: - `maxBatchSize` splits very large queues into multiple batch calls. - `flush: "microtask"` means compatible calls in the same microtask are coalesced together. - Native batching works in both the runtime interpreter and the compiled executor. +- To fail just one item in a batch, return an `Error` instance at that index. Bridge rejects only that item and routes it through the usual wire-level `catch` fallback. ### Tracing and Logging