From 760b9c9de492b4c9f099f2faaaef587858130fb0 Mon Sep 17 00:00:00 2001 From: stack72 Date: Tue, 5 May 2026 13:05:25 +0100 Subject: [PATCH] fix(data): populate ownerDefinition.jobName on workflow step output (swamp-club#237) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Workflow step output data had `ownerDefinition.jobName` always empty, so `swamp data query 'jobName == "..."'` returned 0 rows even though the CEL schema and `design/data-query.md:128` advertise it as a queryable provenance field. Two missing links in the existing provenance pipeline: - `workflowTagOverrides` in execution_service.ts set workflow / workflowRunId / step / source but no `job` tag. - Both `data_writer.ts` factories translated `step → stepName`, `workflow → workflowName`, `source → source` but had no `job → jobName` mapping. This fix mirrors the surrounding `step → stepName` plumbing exactly. No schema change, no migration: `OwnerDefinition.jobName?` already exists, the catalog column is already there, the CEL field is already advertised. This change makes the code match the documented contract. Forward-only: data created before this fix retains empty `jobName`. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../workflow_query_data_context_test.ts | 76 +++++++++++++++++++ src/domain/models/data_writer.ts | 2 + src/domain/models/data_writer_test.ts | 63 +++++++++++++++ src/domain/workflows/execution_service.ts | 1 + 4 files changed, 142 insertions(+) diff --git a/integration/workflow_query_data_context_test.ts b/integration/workflow_query_data_context_test.ts index 651a53d7..ca13f449 100644 --- a/integration/workflow_query_data_context_test.ts +++ b/integration/workflow_query_data_context_test.ts @@ -212,3 +212,79 @@ Deno.test("queryData chain: factory + driver derive working queryData from dataQ } }); }); + +// Lab issue #237: jobName was always empty in CEL data query results because +// workflow tag overrides never carried `job` and data_writer never mapped it +// onto OwnerDefinition. This test pins down the read-path contract that all +// three provenance fields (workflowName, jobName, stepName) are queryable +// via top-level CEL identifiers when populated on OwnerDefinition. +Deno.test("data query: workflowName, jobName, stepName CEL fields resolve from ownerDefinition", async () => { + await withTempDir(async (repoDir) => { + await setupRepoDir(repoDir); + + const catalogStore = new CatalogStore( + join(repoDir, ".swamp", "data", "_catalog.db"), + ); + try { + const dataRepo = new FileSystemUnifiedDataRepository( + repoDir, + undefined, + catalogStore, + ); + const definitionRepo = new YamlDefinitionRepository(repoDir); + const dataQueryService = new DataQueryService(catalogStore, dataRepo); + + const seedModel = Definition.create({ + name: "provenance_source", + type: TEST_MODEL_TYPE.normalized, + }); + await definitionRepo.save(TEST_MODEL_TYPE, seedModel); + const seedData = Data.create({ + name: "provenance_row", + contentType: "application/json", + lifetime: "infinite", + garbageCollection: 10, + tags: { + type: "resource", + workflow: "wf-237", + job: "job-237", + step: "step-17", + }, + ownerDefinition: { + ownerType: "model-method", + ownerRef: `${TEST_MODEL_TYPE.normalized}:seed`, + workflowName: "wf-237", + jobName: "job-237", + stepName: "step-17", + source: "step-output", + }, + }); + await dataRepo.save( + TEST_MODEL_TYPE, + seedModel.id, + seedData, + new TextEncoder().encode(JSON.stringify({})), + ); + + const byWorkflow = await dataQueryService.query( + 'workflowName == "wf-237"', + ) as DataRecord[]; + assertEquals(byWorkflow.length, 1); + assertEquals(byWorkflow[0].name, "provenance_row"); + + const byJob = await dataQueryService.query( + 'jobName == "job-237"', + ) as DataRecord[]; + assertEquals(byJob.length, 1); + assertEquals(byJob[0].name, "provenance_row"); + + const byStep = await dataQueryService.query( + 'stepName == "step-17"', + ) as DataRecord[]; + assertEquals(byStep.length, 1); + assertEquals(byStep[0].name, "provenance_row"); + } finally { + catalogStore.close(); + } + }); +}); diff --git a/src/domain/models/data_writer.ts b/src/domain/models/data_writer.ts index 9334f87d..7f1cf29e 100644 --- a/src/domain/models/data_writer.ts +++ b/src/domain/models/data_writer.ts @@ -584,6 +584,7 @@ export function createResourceWriter( ...(tagOverrides?.["workflow"] ? { workflowName: tagOverrides["workflow"] } : {}), + ...(tagOverrides?.["job"] ? { jobName: tagOverrides["job"] } : {}), ...(tagOverrides?.["step"] ? { stepName: tagOverrides["step"] } : {}), ...(tagOverrides?.["source"] ? { source: tagOverrides["source"] } : {}), }, @@ -870,6 +871,7 @@ export function createFileWriterFactory( ...(tagOverrides?.["workflow"] ? { workflowName: tagOverrides["workflow"] } : {}), + ...(tagOverrides?.["job"] ? { jobName: tagOverrides["job"] } : {}), ...(tagOverrides?.["step"] ? { stepName: tagOverrides["step"] } : {}), ...(tagOverrides?.["source"] ? { source: tagOverrides["source"] } : {}), }, diff --git a/src/domain/models/data_writer_test.ts b/src/domain/models/data_writer_test.ts index 9a6fd4ac..f0474c13 100644 --- a/src/domain/models/data_writer_test.ts +++ b/src/domain/models/data_writer_test.ts @@ -296,6 +296,45 @@ Deno.test("createResourceWriter: omits ownerDefinition.workflowRunId when not in assertEquals(handle.metadata.ownerDefinition.workflowRunId, undefined); }); +Deno.test("createResourceWriter: populates ownerDefinition.jobName and stepName from tagOverrides", async () => { + const repo = createMockRepo(); + const tagOverrides = { + source: "step-output", + workflow: "my-wf", + job: "my-job", + step: "my-step", + }; + + const { writeResource } = createResourceWriter( + repo, + modelType, + modelId, + testResources, + tagOverrides, + ); + + const handle = await writeResource("item", "test-item", { value: "hello" }); + assertEquals(handle.metadata.ownerDefinition.jobName, "my-job"); + assertEquals(handle.metadata.ownerDefinition.stepName, "my-step"); + assertEquals(handle.metadata.ownerDefinition.workflowName, "my-wf"); +}); + +Deno.test("createResourceWriter: omits ownerDefinition.jobName when not in tagOverrides", async () => { + const repo = createMockRepo(); + const tagOverrides = { source: "step-output", workflow: "my-wf" }; + + const { writeResource } = createResourceWriter( + repo, + modelType, + modelId, + testResources, + tagOverrides, + ); + + const handle = await writeResource("item", "test-item", { value: "hello" }); + assertEquals(handle.metadata.ownerDefinition.jobName, undefined); +}); + Deno.test("createFileWriterFactory: populates ownerDefinition.workflowRunId from tagOverrides", async () => { const repo = createMockRepo(); const workflowRunId = "6be8c3b8-ff3f-4e23-b998-fd9456d84d0a"; @@ -320,6 +359,30 @@ Deno.test("createFileWriterFactory: populates ownerDefinition.workflowRunId from assertEquals(handle.metadata.ownerDefinition.ownerRef, modelId); }); +Deno.test("createFileWriterFactory: populates ownerDefinition.jobName and stepName from tagOverrides", async () => { + const repo = createMockRepo(); + const tagOverrides = { + source: "step-output", + workflow: "my-wf", + job: "my-job", + step: "my-step", + }; + + const { createFileWriter } = createFileWriterFactory( + repo, + modelType, + modelId, + testFiles, + tagOverrides, + ); + + const writer = createFileWriter("log", "test-log"); + const handle = await writer.writeText("log content"); + assertEquals(handle.metadata.ownerDefinition.jobName, "my-job"); + assertEquals(handle.metadata.ownerDefinition.stepName, "my-step"); + assertEquals(handle.metadata.ownerDefinition.workflowName, "my-wf"); +}); + Deno.test("createResourceWriter: rejects reserved data name 'latest'", async () => { const repo = createMockRepo(); const { writeResource } = createResourceWriter( diff --git a/src/domain/workflows/execution_service.ts b/src/domain/workflows/execution_service.ts index 5c345702..f2ea7b0f 100644 --- a/src/domain/workflows/execution_service.ts +++ b/src/domain/workflows/execution_service.ts @@ -620,6 +620,7 @@ export class DefaultStepExecutor implements StepExecutor { source: "step-output", workflow: ctx.workflowName, workflowRunId: ctx.workflowRunId, + job: ctx.jobName, step: ctx.stepName, };