From d8444b6d2de9d71b2eb37bdfcc3200fddfb4b923 Mon Sep 17 00:00:00 2001 From: Ankur Goyal Date: Thu, 30 Apr 2026 07:44:21 -0700 Subject: [PATCH 1/2] initial commit --- js/src/dataset-pipeline.ts | 102 +++++++++++++++++++++++++++++++++++++ js/src/exports.ts | 21 +++++++- js/src/logger.ts | 18 +++++-- js/util/object.ts | 4 +- 4 files changed, 138 insertions(+), 7 deletions(-) create mode 100644 js/src/dataset-pipeline.ts diff --git a/js/src/dataset-pipeline.ts b/js/src/dataset-pipeline.ts new file mode 100644 index 000000000..8786c680b --- /dev/null +++ b/js/src/dataset-pipeline.ts @@ -0,0 +1,102 @@ +import type { ObjectReferenceType as ObjectReference } from "./generated_types"; +import type { Dataset, FullInitDatasetOptions } from "./logger"; +import type { Trace } from "./trace"; + +export type DatasetPipelineScope = "span" | "trace"; + +export type DatasetPipelineSource = { + projectId?: string; + projectName?: string; + orgName?: string; + filter?: string; + scope?: DatasetPipelineScope; + limit?: number; +}; + +type DatasetPipelineInitDatasetOptions = FullInitDatasetOptions; + +export type DatasetPipelineOrigin = ObjectReference; + +export type DatasetPipelineTarget = { + projectId?: DatasetPipelineInitDatasetOptions["projectId"]; + projectName?: DatasetPipelineInitDatasetOptions["project"]; + orgName?: DatasetPipelineInitDatasetOptions["orgName"]; + datasetName: NonNullable; + description?: DatasetPipelineInitDatasetOptions["description"]; + metadata?: DatasetPipelineInitDatasetOptions["metadata"]; +}; + +export type DatasetPipelineRow = Parameters[0]; + +export type DatasetPipelineCandidate = { + trace: Trace; + /** + * The matching source span row id when the source scope is "span". + */ + id?: string; + /** + * Default provenance for rows returned by transform. In span scope this + * points at the matching source span row. + */ + origin?: ObjectReference; +}; + +export type DatasetPipelineTransformContext = { + pipeline: DatasetPipelineDefinition; +}; + +export type DatasetPipelineTransformResult = + | DatasetPipelineRow + | DatasetPipelineRow[] + | null + | undefined; + +export type DatasetPipelineTransform = ( + candidate: DatasetPipelineCandidate, + context: DatasetPipelineTransformContext, +) => DatasetPipelineTransformResult | Promise; + +export type DatasetPipelineDefinition = { + name?: string; + source: DatasetPipelineSource; + transform: DatasetPipelineTransform; + target: DatasetPipelineTarget; +}; + +const DATASET_PIPELINE_MARKER = "__braintrustDatasetPipeline"; + +declare global { + // eslint-disable-next-line no-var + var __braintrust_dataset_pipelines: DatasetPipelineDefinition[] | undefined; +} + +function registry(): DatasetPipelineDefinition[] { + if (!globalThis.__braintrust_dataset_pipelines) { + globalThis.__braintrust_dataset_pipelines = []; + } + return globalThis.__braintrust_dataset_pipelines; +} + +export function getRegisteredDatasetPipelines(): DatasetPipelineDefinition[] { + return [...registry()]; +} + +export function isDatasetPipelineDefinition( + value: unknown, +): value is DatasetPipelineDefinition { + return ( + typeof value === "object" && + value !== null && + DATASET_PIPELINE_MARKER in value + ); +} + +export function DatasetPipeline( + definition: DatasetPipelineDefinition, +): DatasetPipelineDefinition { + const registered = Object.assign(definition, { + [DATASET_PIPELINE_MARKER]: true, + }); + registry().push(registered); + return registered; +} diff --git a/js/src/exports.ts b/js/src/exports.ts index 93a12bb05..7bbd11325 100644 --- a/js/src/exports.ts +++ b/js/src/exports.ts @@ -215,6 +215,25 @@ export { defaultErrorScoreHandler, } from "./framework"; +export type { + DatasetPipelineCandidate, + DatasetPipelineDefinition, + DatasetPipelineOrigin, + DatasetPipelineRow, + DatasetPipelineScope, + DatasetPipelineSource, + DatasetPipelineTarget, + DatasetPipelineTransform, + DatasetPipelineTransformContext, + DatasetPipelineTransformResult, +} from "./dataset-pipeline"; + +export { + DatasetPipeline, + getRegisteredDatasetPipelines, + isDatasetPipelineDefinition, +} from "./dataset-pipeline"; + export type { CodeOpts, CreateProjectOpts, @@ -246,7 +265,7 @@ export { } from "./prompt-schemas"; export type { Trace, SpanData, GetThreadOptions } from "./trace"; -export { SpanFetcher, CachedSpanFetcher } from "./trace"; +export { LocalTrace, SpanFetcher, CachedSpanFetcher } from "./trace"; export type { ParentExperimentIds, diff --git a/js/src/logger.ts b/js/src/logger.ts index 81da807fe..559f4fddb 100644 --- a/js/src/logger.ts +++ b/js/src/logger.ts @@ -79,6 +79,7 @@ import { type RepoInfoType as RepoInfo, type PromptBlockDataType as PromptBlockData, type ResponseFormatJsonSchemaType as ResponseFormatJsonSchema, + type ObjectReferenceType as ObjectReference, } from "./generated_types"; const BRAINTRUST_ATTACHMENT = @@ -4263,8 +4264,8 @@ export function initDataset< legacy, _internal_btql, resolvedVersion instanceof LazyValue || - normalizedEnvironment !== undefined || - normalizedSnapshotName !== undefined + normalizedEnvironment !== undefined || + normalizedSnapshotName !== undefined ? { ...(resolvedVersion instanceof LazyValue ? { @@ -6041,9 +6042,9 @@ export type WithTransactionId = R & { export const DEFAULT_FETCH_BATCH_SIZE = 1000; export const MAX_BTQL_ITERATIONS = 10000; -export class ObjectFetcher implements AsyncIterable< - WithTransactionId -> { +export class ObjectFetcher + implements AsyncIterable> +{ private _fetchedData: WithTransactionId[] | undefined = undefined; constructor( @@ -7367,6 +7368,7 @@ export class Dataset< metadata, tags, output, + origin, isMerge, }: { id: string; @@ -7375,6 +7377,7 @@ export class Dataset< metadata?: Record; tags?: string[]; output?: unknown; + origin?: ObjectReference; isMerge?: boolean; }): LazyValue { return new LazyValue(async () => { @@ -7389,6 +7392,7 @@ export class Dataset< dataset_id, created: !isMerge ? new Date().toISOString() : undefined, //if we're merging/updating an event we will not add this ts metadata, + origin, ...(!!isMerge ? { [IS_MERGE_FIELD]: true, @@ -7412,6 +7416,7 @@ export class Dataset< * about anything else that's relevant, that you can use to help find and analyze examples later. For example, you could log the * `prompt`, example's `id`, or anything else that would be useful to slice/dice later. The values in `metadata` can be any * JSON-serializable type, but its keys must be strings. + * @param event.origin (Optional) a reference to the source object this dataset record was derived from. * @param event.id (Optional) a unique identifier for the event. If you don't provide one, Braintrust will generate one for you. * @param event.output: (Deprecated) The output of your application. Use `expected` instead. * @returns The `id` of the logged record. @@ -7423,6 +7428,7 @@ export class Dataset< tags, id, output, + origin, }: { readonly input?: unknown; readonly expected?: unknown; @@ -7430,6 +7436,7 @@ export class Dataset< readonly metadata?: Record; readonly id?: string; readonly output?: unknown; + readonly origin?: ObjectReference; }): string { this.validateEvent({ metadata, expected, output, tags }); @@ -7442,6 +7449,7 @@ export class Dataset< metadata, tags, output, + origin, isMerge: false, }), ); diff --git a/js/util/object.ts b/js/util/object.ts index 616b31a1b..ac50f596b 100644 --- a/js/util/object.ts +++ b/js/util/object.ts @@ -15,6 +15,7 @@ import { export type IdField = { id: string }; export type InputField = { input: unknown }; +type ObjectReference = z.infer; export type OtherExperimentLogFields = { output: unknown; expected: unknown; @@ -25,7 +26,7 @@ export type OtherExperimentLogFields = { metadata: Record; metrics: Record; datasetRecordId: string; - origin: z.infer; + origin: ObjectReference; span_attributes: Record; [ASYNC_SCORING_CONTROL_FIELD]: AsyncScoringControl; [MERGE_PATHS_FIELD]: string[][]; @@ -100,6 +101,7 @@ export type DatasetEvent = { tags?: string[]; metadata?: unknown; created?: string; + origin?: ObjectReference; id: string; dataset_id: string; } & ({ expected?: unknown } | { output?: unknown }); From 222b907ac684bfbd27db3b84276527fcd39b8e14 Mon Sep 17 00:00:00 2001 From: Ankur Goyal Date: Sun, 3 May 2026 15:34:29 -0400 Subject: [PATCH 2/2] updates --- js/src/dataset-pipeline.ts | 37 ++++++++-------- js/src/exports.ts | 3 +- js/src/logger.ts | 1 + js/src/span-cache.ts | 1 + js/src/trace.ts | 91 +++++++++++++++++++++++++------------- 5 files changed, 82 insertions(+), 51 deletions(-) diff --git a/js/src/dataset-pipeline.ts b/js/src/dataset-pipeline.ts index 8786c680b..3b5ed7b0e 100644 --- a/js/src/dataset-pipeline.ts +++ b/js/src/dataset-pipeline.ts @@ -1,5 +1,10 @@ import type { ObjectReferenceType as ObjectReference } from "./generated_types"; -import type { Dataset, FullInitDatasetOptions } from "./logger"; +import type { + BaseMetadata, + Dataset, + DefaultMetadataType, + FullInitDatasetOptions, +} from "./logger"; import type { Trace } from "./trace"; export type DatasetPipelineScope = "span" | "trace"; @@ -10,7 +15,6 @@ export type DatasetPipelineSource = { orgName?: string; filter?: string; scope?: DatasetPipelineScope; - limit?: number; }; type DatasetPipelineInitDatasetOptions = FullInitDatasetOptions; @@ -28,21 +32,17 @@ export type DatasetPipelineTarget = { export type DatasetPipelineRow = Parameters[0]; -export type DatasetPipelineCandidate = { - trace: Trace; - /** - * The matching source span row id when the source scope is "span". - */ - id?: string; - /** - * Default provenance for rows returned by transform. In span scope this - * points at the matching source span row. - */ - origin?: ObjectReference; -}; - -export type DatasetPipelineTransformContext = { - pipeline: DatasetPipelineDefinition; +export type DatasetPipelineTransformArgs< + Input = unknown, + Output = unknown, + Expected = unknown, + Metadata extends BaseMetadata = DefaultMetadataType, +> = { + input?: Input; + output?: Output; + expected?: Expected; + metadata?: Metadata extends void ? Record : Metadata; + trace?: Trace; }; export type DatasetPipelineTransformResult = @@ -52,8 +52,7 @@ export type DatasetPipelineTransformResult = | undefined; export type DatasetPipelineTransform = ( - candidate: DatasetPipelineCandidate, - context: DatasetPipelineTransformContext, + args: DatasetPipelineTransformArgs, ) => DatasetPipelineTransformResult | Promise; export type DatasetPipelineDefinition = { diff --git a/js/src/exports.ts b/js/src/exports.ts index 7bbd11325..b88008656 100644 --- a/js/src/exports.ts +++ b/js/src/exports.ts @@ -216,7 +216,6 @@ export { } from "./framework"; export type { - DatasetPipelineCandidate, DatasetPipelineDefinition, DatasetPipelineOrigin, DatasetPipelineRow, @@ -224,7 +223,7 @@ export type { DatasetPipelineSource, DatasetPipelineTarget, DatasetPipelineTransform, - DatasetPipelineTransformContext, + DatasetPipelineTransformArgs, DatasetPipelineTransformResult, } from "./dataset-pipeline"; diff --git a/js/src/logger.ts b/js/src/logger.ts index 559f4fddb..3e412d53f 100644 --- a/js/src/logger.ts +++ b/js/src/logger.ts @@ -6901,6 +6901,7 @@ export class SpanImpl implements Span { const cachedSpan: CachedSpan = { input: partialRecord.input, output: partialRecord.output, + expected: partialRecord.expected, metadata: partialRecord.metadata, span_id: this._spanId, span_parents: this._spanParents, diff --git a/js/src/span-cache.ts b/js/src/span-cache.ts index 61c50d577..191497257 100644 --- a/js/src/span-cache.ts +++ b/js/src/span-cache.ts @@ -36,6 +36,7 @@ function canUseSpanCache(): boolean { export interface CachedSpan { input?: unknown; output?: unknown; + expected?: unknown; metadata?: Record; span_id: string; span_parents?: string[]; diff --git a/js/src/trace.ts b/js/src/trace.ts index a821a239f..f7e41ec3e 100644 --- a/js/src/trace.ts +++ b/js/src/trace.ts @@ -23,9 +23,14 @@ export class SpanFetcher extends ObjectFetcher { private readonly rootSpanId: string, private readonly _state: BraintrustState, private readonly spanTypeFilter?: string[], + includeScorers = false, ) { // Build the filter expression for root_span_id and optionally span_attributes.type - const filterExpr = SpanFetcher.buildFilter(rootSpanId, spanTypeFilter); + const filterExpr = SpanFetcher.buildFilter( + rootSpanId, + spanTypeFilter, + includeScorers, + ); super(objectType, undefined, undefined, { filter: filterExpr, @@ -35,6 +40,7 @@ export class SpanFetcher extends ObjectFetcher { private static buildFilter( rootSpanId: string, spanTypeFilter?: string[], + includeScorers = false, ): Record { const children: Record[] = [ // Base filter: root_span_id = 'value' @@ -43,8 +49,10 @@ export class SpanFetcher extends ObjectFetcher { left: { op: "ident", name: ["root_span_id"] }, right: { op: "literal", value: rootSpanId }, }, - // Exclude span_attributes.purpose = 'score' - { + ]; + + if (!includeScorers) { + children.push({ op: "or", children: [ { @@ -57,8 +65,8 @@ export class SpanFetcher extends ObjectFetcher { right: { op: "literal", value: "scorer" }, }, ], - }, - ]; + }); + } // If no spanType filter, just return root_span_id filter if (spanTypeFilter && spanTypeFilter.length > 0) { @@ -92,6 +100,7 @@ export class SpanFetcher extends ObjectFetcher { export interface SpanData { input?: unknown; output?: unknown; + expected?: unknown; metadata?: Record; span_id?: string; span_parents?: string[]; @@ -107,6 +116,10 @@ export interface SpanData { export type SpanFetchFn = ( spanType: string[] | undefined, ) => Promise; +export type SpanFetchWithOptionsFn = ( + spanType: string[] | undefined, + includeScorers: boolean, +) => Promise; /** * Cached span fetcher that handles fetching and caching spans by type. @@ -119,7 +132,7 @@ export type SpanFetchFn = ( export class CachedSpanFetcher { private spanCache = new Map(); private allFetched = false; - private fetchFn: SpanFetchFn; + private fetchFn: SpanFetchWithOptionsFn; constructor( objectType: "experiment" | "project_logs" | "playground_logs", @@ -140,11 +153,11 @@ export class CachedSpanFetcher { ) { if (typeof objectTypeOrFetchFn === "function") { // Direct fetch function injection (for testing) - this.fetchFn = objectTypeOrFetchFn; + this.fetchFn = (spanType) => objectTypeOrFetchFn(spanType); } else { // Standard constructor with SpanFetcher const objectType = objectTypeOrFetchFn; - this.fetchFn = async (spanType) => { + this.fetchFn = async (spanType, includeScorers) => { const state = await getState!(); const fetcher = new SpanFetcher( objectType, @@ -152,30 +165,39 @@ export class CachedSpanFetcher { rootSpanId!, state, spanType, + includeScorers, ); const rows: WithTransactionId[] = await fetcher.fetchedData(); - return rows - .filter((row) => row.span_attributes?.purpose !== "scorer") - .map((row) => ({ - input: row.input, - output: row.output, - metadata: row.metadata, - span_id: row.span_id, - span_parents: row.span_parents, - span_attributes: row.span_attributes, - id: row.id, - _xact_id: row._xact_id, - _pagination_key: row._pagination_key, - root_span_id: row.root_span_id, - })); + return rows.map((row) => ({ + input: row.input, + output: row.output, + expected: row.expected, + metadata: row.metadata, + span_id: row.span_id, + span_parents: row.span_parents, + span_attributes: row.span_attributes, + id: row.id, + _xact_id: row._xact_id, + _pagination_key: row._pagination_key, + root_span_id: row.root_span_id, + created: row.created, + tags: row.tags, + })); }; } } - async getSpans({ spanType }: { spanType?: string[] } = {}): Promise< + async getSpans({ + spanType, + includeScorers = false, + }: { spanType?: string[]; includeScorers?: boolean } = {}): Promise< SpanData[] > { + if (includeScorers) { + return this.fetchFn(spanType, true); + } + // If we've fetched all spans, just filter from cache if (this.allFetched) { return this.getFromCache(spanType); @@ -202,7 +224,7 @@ export class CachedSpanFetcher { } private async fetchSpans(spanType: string[] | undefined): Promise { - const spans = await this.fetchFn(spanType); + const spans = await this.fetchFn(spanType, false); for (const span of spans) { const type = span.span_attributes?.type ?? ""; @@ -248,7 +270,10 @@ export interface Trace { object_id: string; root_span_id: string; }; - getSpans(options?: { spanType?: string[] }): Promise; + getSpans(options?: { + spanType?: string[]; + includeScorers?: boolean; + }): Promise; /** * Get the thread (preprocessed messages) for this trace. * Uses the project default preprocessor, falling back to the global "thread" preprocessor. @@ -328,15 +353,20 @@ export class LocalTrace implements Trace { * First checks the local span cache for recently logged spans, then falls * back to CachedSpanFetcher which handles BTQL fetching and caching. */ - async getSpans({ spanType }: { spanType?: string[] } = {}): Promise< + async getSpans({ + spanType, + includeScorers = false, + }: { spanType?: string[]; includeScorers?: boolean } = {}): Promise< SpanData[] > { // Try local span cache first (for recently logged spans not yet flushed) const cachedSpans = this.state.spanCache.getByRootSpanId(this.rootSpanId); if (cachedSpans && cachedSpans.length > 0) { - let spans = cachedSpans.filter( - (span) => span.span_attributes?.purpose !== "scorer", - ); + let spans = includeScorers + ? cachedSpans + : cachedSpans.filter( + (span) => span.span_attributes?.purpose !== "scorer", + ); if (spanType && spanType.length > 0) { spans = spans.filter((span) => @@ -347,6 +377,7 @@ export class LocalTrace implements Trace { return spans.map((span) => ({ input: span.input, output: span.output, + expected: span.expected, metadata: span.metadata, span_id: span.span_id, span_parents: span.span_parents, @@ -355,7 +386,7 @@ export class LocalTrace implements Trace { } // Fall back to CachedSpanFetcher for BTQL fetching with caching - return this.cachedFetcher.getSpans({ spanType }); + return this.cachedFetcher.getSpans({ spanType, includeScorers }); } /**