From 08fe1c71b6eed4880193f1b326caecc9889dbd24 Mon Sep 17 00:00:00 2001 From: Morgan Wowk Date: Fri, 5 Jun 2026 12:52:31 -0700 Subject: [PATCH] Add cross-pipeline lineage scan (client-side) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit scanPipelinesForLineage(originId, targetDigest) enumerates every locally-stored pipeline via getAllComponentFilesFromList and finds tasks sharing the origin — recursing through subgraphs — reporting per pipeline which tasks are pending vs already on the edited version. Pipelines live client-side, so this is the cross-pipeline discovery mechanism with no backend involved. Adds parseLineage to read lineage from a stored annotation (JSON string or object); embeddedLineageOf now reuses it. This feeds the Reconcile overview (built with the session + routing branch). --- .../lineage/scanPipelinesForLineage.test.ts | 109 ++++++++++++++++++ .../Editor/lineage/scanPipelinesForLineage.ts | 93 +++++++++++++++ src/utils/lineage.ts | 18 ++- 3 files changed, 215 insertions(+), 5 deletions(-) create mode 100644 src/routes/v2/pages/Editor/lineage/scanPipelinesForLineage.test.ts create mode 100644 src/routes/v2/pages/Editor/lineage/scanPipelinesForLineage.ts diff --git a/src/routes/v2/pages/Editor/lineage/scanPipelinesForLineage.test.ts b/src/routes/v2/pages/Editor/lineage/scanPipelinesForLineage.test.ts new file mode 100644 index 000000000..acdf3c733 --- /dev/null +++ b/src/routes/v2/pages/Editor/lineage/scanPipelinesForLineage.test.ts @@ -0,0 +1,109 @@ +import { beforeEach, describe, expect, it, vi } from "vitest"; + +import { LINEAGE_ORIGIN_ANNOTATION } from "@/utils/annotations"; + +import { scanPipelinesForLineage } from "./scanPipelinesForLineage"; + +vi.mock("@/utils/componentStore", () => ({ + getAllComponentFilesFromList: vi.fn(), +})); + +const { getAllComponentFilesFromList } = await import("@/utils/componentStore"); +const mockGetAll = vi.mocked(getAllComponentFilesFromList); + +const ORIGIN = "https://x/train.yaml"; +const TARGET = "edited-digest"; + +const lineageAnn = (originId: string) => ({ + [LINEAGE_ORIGIN_ANNOTATION]: JSON.stringify({ originId }), +}); + +const containerTask = (name: string, digest: string, originId?: string) => ({ + componentRef: { name, digest }, + annotations: originId ? lineageAnn(originId) : {}, +}); + +const subgraphTask = ( + name: string, + nestedTasks: Record, + originId?: string, +) => ({ + componentRef: { + name, + spec: { name, implementation: { graph: { tasks: nestedTasks } } }, + }, + annotations: originId ? lineageAnn(originId) : {}, +}); + +const pipeline = (name: string, tasks: Record) => ({ + componentRef: { spec: { name, implementation: { graph: { tasks } } } }, +}); + +const asStore = (entries: Record) => + new Map(Object.entries(entries)); + +describe("scanPipelinesForLineage", () => { + beforeEach(() => mockGetAll.mockReset()); + + it("returns pipelines with matching tasks and pending/reconciled counts", async () => { + mockGetAll.mockResolvedValue( + asStore({ + "Pipeline A": pipeline("Pipeline A", { + "Train old": containerTask("Train old", "old-digest", ORIGIN), + "Train new": containerTask("Train new", TARGET, ORIGIN), + Unrelated: containerTask("Unrelated", "z", "https://x/other.yaml"), + }), + "Pipeline B": pipeline("Pipeline B", { + Nothing: containerTask("Nothing", "n"), + }), + }), + ); + + const results = await scanPipelinesForLineage(ORIGIN, TARGET); + + expect(results).toHaveLength(1); + expect(results[0]).toMatchObject({ + storageKey: "Pipeline A", + pipelineName: "Pipeline A", + pendingCount: 1, + reconciledCount: 1, + }); + expect(results[0].tasks.map((t) => t.taskName).sort()).toEqual([ + "Train new", + "Train old", + ]); + }); + + it("recurses into subgraphs and records the path", async () => { + mockGetAll.mockResolvedValue( + asStore({ + "Pipeline C": pipeline("Pipeline C", { + Group: subgraphTask("Group", { + "Nested train": containerTask("Nested train", "old", ORIGIN), + }), + }), + }), + ); + + const results = await scanPipelinesForLineage(ORIGIN, TARGET); + + expect(results).toHaveLength(1); + expect(results[0].tasks[0]).toMatchObject({ + taskName: "Nested train", + subgraphPath: ["Group"], + reconciled: false, + }); + }); + + it("returns nothing when no pipeline shares the origin", async () => { + mockGetAll.mockResolvedValue( + asStore({ + "Pipeline D": pipeline("Pipeline D", { + A: containerTask("A", "d", "https://x/other.yaml"), + }), + }), + ); + + expect(await scanPipelinesForLineage(ORIGIN, TARGET)).toEqual([]); + }); +}); diff --git a/src/routes/v2/pages/Editor/lineage/scanPipelinesForLineage.ts b/src/routes/v2/pages/Editor/lineage/scanPipelinesForLineage.ts new file mode 100644 index 000000000..bf10c971d --- /dev/null +++ b/src/routes/v2/pages/Editor/lineage/scanPipelinesForLineage.ts @@ -0,0 +1,93 @@ +import { LINEAGE_ORIGIN_ANNOTATION } from "@/utils/annotations"; +import { + type ComponentSpec, + isGraphImplementation, +} from "@/utils/componentSpec"; +import { + type ComponentFileEntry, + getAllComponentFilesFromList, +} from "@/utils/componentStore"; +import { USER_PIPELINES_LIST_NAME } from "@/utils/constants"; +import { parseLineage } from "@/utils/lineage"; + +interface PipelineLineageTaskMatch { + taskName: string; + subgraphPath: string[]; + digest?: string; + /** True when this task is already on the target (edited) version. */ + reconciled: boolean; +} + +export interface PipelineLineageMatch { + /** Storage key (also the pipeline route param). */ + storageKey: string; + pipelineName: string; + tasks: PipelineLineageTaskMatch[]; + /** Tasks sharing the origin but not yet on the target version. */ + pendingCount: number; + /** Tasks already on the target version. */ + reconciledCount: number; +} + +function walkSpec( + spec: ComponentSpec | undefined, + originId: string, + targetDigest: string | undefined, + path: string[], + out: PipelineLineageTaskMatch[], +): void { + const impl = spec?.implementation; + if (!impl || !isGraphImplementation(impl)) return; + + for (const [taskName, task] of Object.entries(impl.graph.tasks)) { + const lineage = parseLineage(task.annotations?.[LINEAGE_ORIGIN_ANNOTATION]); + if (lineage?.originId === originId) { + const digest = task.componentRef.digest; + out.push({ + taskName, + subgraphPath: path, + digest, + reconciled: targetDigest != null && digest === targetDigest, + }); + } + + const nestedSpec = task.componentRef.spec; + if (nestedSpec && isGraphImplementation(nestedSpec.implementation)) { + walkSpec(nestedSpec, originId, targetDigest, [...path, taskName], out); + } + } +} + +/** + * Scan every locally-stored pipeline for tasks sharing `originId` (recursing + * through subgraphs). Pipelines live client-side, so this is the cross-pipeline + * discovery mechanism — no backend involved. `targetDigest` is the edited + * version: tasks already at it are counted as reconciled, the rest as pending. + * + * Returns only pipelines with at least one matching task. + */ +export async function scanPipelinesForLineage( + originId: string, + targetDigest?: string, +): Promise { + const files = await getAllComponentFilesFromList(USER_PIPELINES_LIST_NAME); + + const results: PipelineLineageMatch[] = []; + for (const [storageKey, entry] of files) { + const spec = (entry as ComponentFileEntry).componentRef.spec; + const tasks: PipelineLineageTaskMatch[] = []; + walkSpec(spec, originId, targetDigest, [], tasks); + + if (tasks.length === 0) continue; + + results.push({ + storageKey, + pipelineName: spec?.name ?? storageKey, + tasks, + pendingCount: tasks.filter((t) => !t.reconciled).length, + reconciledCount: tasks.filter((t) => t.reconciled).length, + }); + } + + return results; +} diff --git a/src/utils/lineage.ts b/src/utils/lineage.ts index b9ef01f93..4ab4f6472 100644 --- a/src/utils/lineage.ts +++ b/src/utils/lineage.ts @@ -82,6 +82,18 @@ export function makeLineage(ref: ReferenceLike): ComponentLineage | undefined { }; } +/** + * Parse a lineage value as stored in an annotation — either a JSON string (the + * serialized form that round-trips through YAML) or an already-parsed object. + * Returns `undefined` when absent or invalid. + */ +export function parseLineage(raw: unknown): ComponentLineage | undefined { + if (raw == null) return undefined; + const value = typeof raw === "string" ? safeJsonParse(raw) : raw; + const result = componentLineageSchema.safeParse(value); + return result.success ? result.data : undefined; +} + /** * Read a lineage previously embedded in a (published) component spec's metadata * annotations, if present and valid. @@ -89,11 +101,7 @@ export function makeLineage(ref: ReferenceLike): ComponentLineage | undefined { export function embeddedLineageOf( spec: ComponentSpec | undefined, ): ComponentLineage | undefined { - const raw = spec?.metadata?.annotations?.[EMBEDDED_LINEAGE_KEY]; - if (raw == null) return undefined; - const value = typeof raw === "string" ? safeJsonParse(raw) : raw; - const result = componentLineageSchema.safeParse(value); - return result.success ? result.data : undefined; + return parseLineage(spec?.metadata?.annotations?.[EMBEDDED_LINEAGE_KEY]); } /**