Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions .changeset/native-batched-tool-authoring.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
"@stackables/bridge": minor
"@stackables/bridge-core": minor
"@stackables/bridge-types": minor
---

Improve native batched tool authoring by documenting the feature, exporting dedicated batch tool types, and simplifying the batch contract to plain input arrays.

Batch tools now receive `Input[]` and must return `Output[]` in matching order. Batched tool tracing and logging are also emitted once per flushed batch call instead of once per queued item.

Native batching now works in compiled execution as well as the runtime interpreter. Batch tools can also signal partial failures by returning an `Error` at a specific result index, which rejects only that item and allows normal wire-level `catch` fallbacks to handle it.
261 changes: 211 additions & 50 deletions packages/bridge-compiler/src/codegen.ts

Large diffs are not rendered by default.

66 changes: 66 additions & 0 deletions packages/bridge-compiler/test/codegen.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1718,6 +1718,72 @@ bridge Query.test {
assert.deepStrictEqual(result.traces, []);
assert.equal(result.data.name, "Alice");
});

test("batched tools execute natively in compiled mode", async () => {
const document = parseBridgeFormat(`version 1.5
bridge Query.users {
with context as ctx
with output as o

o <- ctx.userIds[] as userId {
with app.fetchUser as user

user.id <- userId
.id <- userId
.name <- user.name
}
}`);

let batchCalls = 0;
const warnings: string[] = [];
const infos: Array<{ tool: string; fn: string; durationMs: number }> = [];

const fetchUser = Object.assign(
async (inputs: Array<{ id: string }>) => {
batchCalls++;
return inputs.map((input) => ({ name: `user:${input.id}` }));
},
{
bridge: {
batch: true,
log: { execution: "info" as const },
},
},
);

const result = await executeAot<any>({
document,
operation: "Query.users",
tools: {
app: { fetchUser },
},
context: {
userIds: ["u1", "u2", "u3"],
},
trace: "full",
logger: {
warn: (message: string) => warnings.push(message),
info: (meta: { tool: string; fn: string; durationMs: number }) => {
infos.push(meta);
},
},
});

assert.deepStrictEqual(result.data, [
{ id: "u1", name: "user:u1" },
{ id: "u2", name: "user:u2" },
{ id: "u3", name: "user:u3" },
]);
assert.equal(batchCalls, 1);
assert.deepStrictEqual(warnings, []);
assert.equal(result.traces.length, 1);
assert.deepStrictEqual(result.traces[0]!.input, [
{ id: "u1" },
{ id: "u2" },
{ id: "u3" },
]);
assert.equal(infos.length, 1);
});
});

// ── Parallel scheduling ──────────────────────────────────────────────────────
Expand Down
221 changes: 219 additions & 2 deletions packages/bridge-core/src/ExecutionTree.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import {
trunkDependsOnElement,
} from "./scheduleTools.ts";
import { internal } from "./tools/index.ts";
import type { ToolTrace } from "./tracing.ts";
import type { EffectiveToolLog, ToolTrace } from "./tracing.ts";
import {
isOtelActive,
logToolError,
Expand Down Expand Up @@ -89,6 +89,20 @@ function stableMemoizeKey(value: unknown): string {
.join(",")}}`;
}

type PendingBatchToolCall = {
input: Record<string, any>;
resolve: (value: any) => void;
reject: (err: unknown) => void;
};

type BatchToolQueue = {
items: PendingBatchToolCall[];
scheduled: boolean;
toolName: string;
fnName: string;
maxBatchSize?: number;
};

export class ExecutionTree implements TreeContext {
state: Record<string, any> = {};
bridge: Bridge | undefined;
Expand Down Expand Up @@ -123,6 +137,9 @@ export class ExecutionTree implements TreeContext {
/** Per-tool memoization caches keyed by stable input fingerprints. */
private toolMemoCache: Map<string, Map<string, MaybePromise<any>>> =
new Map();
/** Per-request batch queues for tools declared with `.bridge.batch`. */
private toolBatchQueues: Map<(...args: any[]) => any, BatchToolQueue> =
new Map();
/** Promise that resolves when all critical `force` handles have settled. */
private forcedExecution?: Promise<void>;
/** Shared trace collector — present only when tracing is enabled. */
Expand Down Expand Up @@ -305,7 +322,21 @@ export class ExecutionTree implements TreeContext {
};

const timeoutMs = this.toolTimeoutMs;
const { sync: isSyncTool, doTrace, log } = resolveToolMeta(fnImpl);
const { sync: isSyncTool, batch, doTrace, log } = resolveToolMeta(fnImpl);

if (batch) {
return this.callBatchedTool(
toolName,
fnName,
fnImpl,
input,
timeoutMs,
toolContext,
doTrace,
log,
batch.maxBatchSize,
);
}

// ── Fast path: no instrumentation configured ──────────────────
// When there is no internal tracer, no logger, and OpenTelemetry
Expand Down Expand Up @@ -480,6 +511,191 @@ export class ExecutionTree implements TreeContext {
);
}

private callBatchedTool(
toolName: string,
fnName: string,
fnImpl: (...args: any[]) => any,
input: Record<string, any>,
timeoutMs: number,
toolContext: ToolContext,
doTrace: boolean,
log: EffectiveToolLog,
maxBatchSize?: number,
): Promise<any> {
let queue = this.toolBatchQueues.get(fnImpl);
if (!queue) {
queue = {
items: [],
scheduled: false,
toolName,
fnName,
maxBatchSize,
};
this.toolBatchQueues.set(fnImpl, queue);
}

if (maxBatchSize !== undefined) {
queue.maxBatchSize = maxBatchSize;
}

return new Promise((resolve, reject) => {
queue!.items.push({ input, resolve, reject });
if (queue!.scheduled) return;
queue!.scheduled = true;
queueMicrotask(() => {
void this.flushBatchedToolQueue(
fnImpl,
toolContext,
timeoutMs,
doTrace,
log,
);
});
});
}

private async flushBatchedToolQueue(
fnImpl: (...args: any[]) => any,
toolContext: ToolContext,
timeoutMs: number,
doTrace: boolean,
log: EffectiveToolLog,
): Promise<void> {
const queue = this.toolBatchQueues.get(fnImpl);
if (!queue) return;

const pending = queue.items.splice(0, queue.items.length);
queue.scheduled = false;
if (pending.length === 0) return;

if (this.signal?.aborted) {
const abortErr = new BridgeAbortError();
for (const item of pending) item.reject(abortErr);
return;
}

const chunkSize =
queue.maxBatchSize && queue.maxBatchSize > 0
? Math.floor(queue.maxBatchSize)
: pending.length;

for (let start = 0; start < pending.length; start += chunkSize) {
const chunk = pending.slice(start, start + chunkSize);
const batchInput = chunk.map((item) => item.input);
const tracer = this.tracer;
const logger = this.logger;
const metricAttrs = {
"bridge.tool.name": queue.toolName,
"bridge.tool.fn": queue.fnName,
};

try {
const executeBatch = async () => {
const batchResult = fnImpl(batchInput, toolContext);
return timeoutMs > 0 && isPromise(batchResult)
? await raceTimeout(batchResult, timeoutMs, queue.toolName)
: await batchResult;
};

const resolved =
!tracer && !logger && !isOtelActive()
? await executeBatch()
: await withSpan(
doTrace,
`bridge.tool.${queue.toolName}.${queue.fnName}`,
metricAttrs,
async (span) => {
const traceStart = tracer?.now();
const wallStart = performance.now();
try {
const result = await executeBatch();
const durationMs = roundMs(performance.now() - wallStart);
toolCallCounter.add(1, metricAttrs);
toolDurationHistogram.record(durationMs, metricAttrs);
if (tracer && traceStart != null) {
tracer.record(
tracer.entry({
tool: queue.toolName,
fn: queue.fnName,
input: batchInput,
output: result,
durationMs: roundMs(tracer.now() - traceStart),
startedAt: traceStart,
}),
);
}
logToolSuccess(
logger,
log.execution,
queue.toolName,
queue.fnName,
durationMs,
);
return result;
} catch (err) {
const durationMs = roundMs(performance.now() - wallStart);
toolCallCounter.add(1, metricAttrs);
toolDurationHistogram.record(durationMs, metricAttrs);
toolErrorCounter.add(1, metricAttrs);
if (tracer && traceStart != null) {
tracer.record(
tracer.entry({
tool: queue.toolName,
fn: queue.fnName,
input: batchInput,
error: (err as Error).message,
durationMs: roundMs(tracer.now() - traceStart),
startedAt: traceStart,
}),
);
}
recordSpanError(span, err as Error);
logToolError(
logger,
log.errors,
queue.toolName,
queue.fnName,
err as Error,
);
if (
this.signal?.aborted &&
err instanceof DOMException &&
err.name === "AbortError"
) {
throw new BridgeAbortError();
}
throw err;
} finally {
span?.end();
}
},
);

if (!Array.isArray(resolved)) {
throw new Error(
`Batch tool "${queue.fnName}" must return an array of results`,
);
}
if (resolved.length !== chunk.length) {
throw new Error(
`Batch tool "${queue.fnName}" returned ${resolved.length} results for ${chunk.length} queued calls`,
);
}

for (let i = 0; i < chunk.length; i++) {
const value = resolved[i];
if (value instanceof Error) {
chunk[i]!.reject(value);
} else {
chunk[i]!.resolve(value);
}
}
} catch (err) {
for (const item of chunk) item.reject(err);
}
}
}

shadow(): ExecutionTree {
// Lightweight: bypass the constructor to avoid redundant work that
// re-derives data identical to the parent (bridge lookup, pipeHandleMap,
Expand All @@ -505,6 +721,7 @@ export class ExecutionTree implements TreeContext {
child.handleVersionMap = this.handleVersionMap;
child.memoizedToolKeys = this.memoizedToolKeys;
child.toolMemoCache = this.toolMemoCache;
child.toolBatchQueues = this.toolBatchQueues;
child.toolFns = this.toolFns;
child.elementTrunkKey = this.elementTrunkKey;
child.tracer = this.tracer;
Expand Down
4 changes: 4 additions & 0 deletions packages/bridge-core/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ export { SELF_MODULE } from "./types.ts";
export type {
Bridge,
BridgeDocument,
BatchToolCallFn,
BatchToolFn,
CacheStore,
ConstDef,
ControlFlowInstruction,
Expand All @@ -66,6 +68,8 @@ export type {
Instruction,
NodeRef,
SourceLocation,
ScalarToolCallFn,
ScalarToolFn,
ToolCallFn,
ToolContext,
ToolDef,
Expand Down
9 changes: 9 additions & 0 deletions packages/bridge-core/src/tracing.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import { metrics, trace } from "@opentelemetry/api";
import type { Span } from "@opentelemetry/api";
import type { ToolMetadata } from "@stackables/bridge-types";
import type { BatchToolMetadata } from "@stackables/bridge-types";
import type { Logger } from "./tree-types.ts";
import { roundMs } from "./tree-utils.ts";

Expand Down Expand Up @@ -248,6 +249,8 @@ export type EffectiveToolLog = {
export type ResolvedToolMeta = {
/** Whether the tool declares synchronous execution. */
sync: boolean;
/** Batch mode contract, when declared. */
batch?: BatchToolMetadata;
/** Emit an OTel span for this call. Default: `true`. */
doTrace: boolean;
log: EffectiveToolLog;
Expand All @@ -271,6 +274,12 @@ export function resolveToolMeta(fn: (...args: any[]) => any): ResolvedToolMeta {
const bridge = (fn as any).bridge as ToolMetadata | undefined;
return {
sync: bridge?.sync === true,
batch:
bridge?.batch === true
? {}
: typeof bridge?.batch === "object"
? bridge.batch
: undefined,
doTrace: bridge?.trace !== false,
log: resolveToolLog(bridge),
};
Expand Down
Loading
Loading