Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 76 additions & 0 deletions integration/workflow_query_data_context_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -212,3 +212,79 @@ Deno.test("queryData chain: factory + driver derive working queryData from dataQ
}
});
});

// Lab issue #237: jobName was always empty in CEL data query results because
// workflow tag overrides never carried `job` and data_writer never mapped it
// onto OwnerDefinition. This test pins down the read-path contract that all
// three provenance fields (workflowName, jobName, stepName) are queryable
// via top-level CEL identifiers when populated on OwnerDefinition.
Deno.test("data query: workflowName, jobName, stepName CEL fields resolve from ownerDefinition", async () => {
await withTempDir(async (repoDir) => {
await setupRepoDir(repoDir);

const catalogStore = new CatalogStore(
join(repoDir, ".swamp", "data", "_catalog.db"),
);
try {
const dataRepo = new FileSystemUnifiedDataRepository(
repoDir,
undefined,
catalogStore,
);
const definitionRepo = new YamlDefinitionRepository(repoDir);
const dataQueryService = new DataQueryService(catalogStore, dataRepo);

const seedModel = Definition.create({
name: "provenance_source",
type: TEST_MODEL_TYPE.normalized,
});
await definitionRepo.save(TEST_MODEL_TYPE, seedModel);
const seedData = Data.create({
name: "provenance_row",
contentType: "application/json",
lifetime: "infinite",
garbageCollection: 10,
tags: {
type: "resource",
workflow: "wf-237",
job: "job-237",
step: "step-17",
},
ownerDefinition: {
ownerType: "model-method",
ownerRef: `${TEST_MODEL_TYPE.normalized}:seed`,
workflowName: "wf-237",
jobName: "job-237",
stepName: "step-17",
source: "step-output",
},
});
await dataRepo.save(
TEST_MODEL_TYPE,
seedModel.id,
seedData,
new TextEncoder().encode(JSON.stringify({})),
);

const byWorkflow = await dataQueryService.query(
'workflowName == "wf-237"',
) as DataRecord[];
assertEquals(byWorkflow.length, 1);
assertEquals(byWorkflow[0].name, "provenance_row");

const byJob = await dataQueryService.query(
'jobName == "job-237"',
) as DataRecord[];
assertEquals(byJob.length, 1);
assertEquals(byJob[0].name, "provenance_row");

const byStep = await dataQueryService.query(
'stepName == "step-17"',
) as DataRecord[];
assertEquals(byStep.length, 1);
assertEquals(byStep[0].name, "provenance_row");
} finally {
catalogStore.close();
}
});
});
2 changes: 2 additions & 0 deletions src/domain/models/data_writer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -584,6 +584,7 @@ export function createResourceWriter(
...(tagOverrides?.["workflow"]
? { workflowName: tagOverrides["workflow"] }
: {}),
...(tagOverrides?.["job"] ? { jobName: tagOverrides["job"] } : {}),
...(tagOverrides?.["step"] ? { stepName: tagOverrides["step"] } : {}),
...(tagOverrides?.["source"] ? { source: tagOverrides["source"] } : {}),
},
Expand Down Expand Up @@ -870,6 +871,7 @@ export function createFileWriterFactory(
...(tagOverrides?.["workflow"]
? { workflowName: tagOverrides["workflow"] }
: {}),
...(tagOverrides?.["job"] ? { jobName: tagOverrides["job"] } : {}),
...(tagOverrides?.["step"] ? { stepName: tagOverrides["step"] } : {}),
...(tagOverrides?.["source"] ? { source: tagOverrides["source"] } : {}),
},
Expand Down
63 changes: 63 additions & 0 deletions src/domain/models/data_writer_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,45 @@ Deno.test("createResourceWriter: omits ownerDefinition.workflowRunId when not in
assertEquals(handle.metadata.ownerDefinition.workflowRunId, undefined);
});

Deno.test("createResourceWriter: populates ownerDefinition.jobName and stepName from tagOverrides", async () => {
const repo = createMockRepo();
const tagOverrides = {
source: "step-output",
workflow: "my-wf",
job: "my-job",
step: "my-step",
};

const { writeResource } = createResourceWriter(
repo,
modelType,
modelId,
testResources,
tagOverrides,
);

const handle = await writeResource("item", "test-item", { value: "hello" });
assertEquals(handle.metadata.ownerDefinition.jobName, "my-job");
assertEquals(handle.metadata.ownerDefinition.stepName, "my-step");
assertEquals(handle.metadata.ownerDefinition.workflowName, "my-wf");
});

Deno.test("createResourceWriter: omits ownerDefinition.jobName when not in tagOverrides", async () => {
const repo = createMockRepo();
const tagOverrides = { source: "step-output", workflow: "my-wf" };

const { writeResource } = createResourceWriter(
repo,
modelType,
modelId,
testResources,
tagOverrides,
);

const handle = await writeResource("item", "test-item", { value: "hello" });
assertEquals(handle.metadata.ownerDefinition.jobName, undefined);
});

Deno.test("createFileWriterFactory: populates ownerDefinition.workflowRunId from tagOverrides", async () => {
const repo = createMockRepo();
const workflowRunId = "6be8c3b8-ff3f-4e23-b998-fd9456d84d0a";
Expand All @@ -320,6 +359,30 @@ Deno.test("createFileWriterFactory: populates ownerDefinition.workflowRunId from
assertEquals(handle.metadata.ownerDefinition.ownerRef, modelId);
});

Deno.test("createFileWriterFactory: populates ownerDefinition.jobName and stepName from tagOverrides", async () => {
const repo = createMockRepo();
const tagOverrides = {
source: "step-output",
workflow: "my-wf",
job: "my-job",
step: "my-step",
};

const { createFileWriter } = createFileWriterFactory(
repo,
modelType,
modelId,
testFiles,
tagOverrides,
);

const writer = createFileWriter("log", "test-log");
const handle = await writer.writeText("log content");
assertEquals(handle.metadata.ownerDefinition.jobName, "my-job");
assertEquals(handle.metadata.ownerDefinition.stepName, "my-step");
assertEquals(handle.metadata.ownerDefinition.workflowName, "my-wf");
});

Deno.test("createResourceWriter: rejects reserved data name 'latest'", async () => {
const repo = createMockRepo();
const { writeResource } = createResourceWriter(
Expand Down
1 change: 1 addition & 0 deletions src/domain/workflows/execution_service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -620,6 +620,7 @@ export class DefaultStepExecutor implements StepExecutor {
source: "step-output",
workflow: ctx.workflowName,
workflowRunId: ctx.workflowRunId,
job: ctx.jobName,
step: ctx.stepName,
};

Expand Down
Loading