Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions .claude/skills/swamp-workflow/SKILL.md
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,33 @@ swamp workflow evaluate --all --json
- Vault expressions (`${{ vault.get(...) }}`) remain raw for runtime resolution
- Output saved to `.swamp/workflows-evaluated/` for `--last-evaluated` use

## Concurrency Limits

Add `concurrency: N` at the workflow, job, or step level to cap parallel
execution. Absent or `0` means unbounded. Resolution: step > job > workflow >
unbounded. A `SWAMP_MAX_CONCURRENT_STEPS` env var provides a host-level ceiling.
See
[references/expressions-and-foreach.md](references/expressions-and-foreach.md)
for forEach concurrency examples.

```yaml
concurrency: 10 # workflow level — caps parallel jobs
jobs:
- name: fan-out
concurrency: 5 # job level — caps parallel steps
steps:
- name: per-item
forEach:
item: target
in: ${{ inputs.targets }}
concurrency: 3 # step level — caps forEach iterations
task: {
type: model_method,
modelIdOrName: api-client,
methodName: call,
}
```

## Allow Failure

Steps can be marked with `allowFailure: true` so their failure does not fail the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,34 @@ through `task.inputs`. See
[nested-workflows.md § When to Use Nested Workflows](nested-workflows.md#when-to-use-nested-workflows)
for the full pattern.

### forEach with Concurrency Limits

By default, all forEach iterations run in parallel. Add `concurrency` to cap
simultaneous execution — useful for rate-limited APIs or resource-constrained
hosts:

```yaml
steps:
- name: call-${{ self.target }}
forEach:
item: target
in: ${{ inputs.targets }}
concurrency: 3
task:
type: model_method
modelIdOrName: api-client
methodName: call
inputs:
target: ${{ self.target }}
```

With 10 targets and `concurrency: 3`, at most 3 iterations execute at once. The
remaining iterations queue until a permit is released. Resolution order:
`step → job → workflow → unbounded` — the most-local non-zero value wins.

A global `SWAMP_MAX_CONCURRENT_STEPS` environment variable provides a host-level
ceiling: `min(local, global)` is the effective limit.

### forEach with Vary Dimensions

Use `vary` on `dataOutputOverrides` to isolate data per forEach iteration:
Expand Down
44 changes: 42 additions & 2 deletions design/workflow.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,52 @@ and only execute if their dependcy condition is met (for example, only run this
job if one of its upstream dependencies fail).

Within a job, steps are executed with a weighted topological sort, so that they
have maximum paralleism through the job.
have maximum parallelism through the job. Steps support an optional
`concurrency` field that caps how many steps in a topological level run
simultaneously — particularly useful for `forEach` expansions that hit
rate-limited APIs.

Jobs can have dependencies on other jobs. The entire workflow is executed with a
weighted topological sort, so thtat htye have maximum paralleism through the
weighted topological sort, so that they have maximum parallelism through the
workflow. Like steps, jobs also have conditions that trigger them.

## Concurrency Limits

By default, all jobs in a topological level and all steps in a topological level
run concurrently (maximum parallelism). The optional `concurrency` field caps
the number of simultaneously executing units at each level:

```yaml
concurrency: 10 # workflow level — caps parallel jobs

jobs:
- name: fan-out
concurrency: 5 # job level — caps parallel steps in this job
steps:
- name: per-item
forEach:
item: target
in: ${{ inputs.targets }}
concurrency: 3 # step level — caps forEach iterations
task: { ... }
```

**Semantics:**

- A positive integer is a hard cap on simultaneously executing units at that
level.
- `0` or absent means unbounded (current default behavior).
- Resolution order: step > job > workflow > unbounded. The most-local non-zero
value wins.
- A global `SWAMP_MAX_CONCURRENT_STEPS` environment variable provides a
host-level ceiling. The effective limit is `min(local, global)` when both are
set.

Concurrency limiting is implemented via a semaphore-gated
`mergeWithConcurrency()` that wraps the existing `merge()` stream combinator.
When the limit is unset or exceeds the stream count, the unbounded `merge()` path
is used with zero overhead.

Workflows are specified in YAML files, that are validated with Zod, in the
top-level `workflows/` directory of the repository, as
`workflows/workflow-{uuid}.yaml`. Workflow run output is stored in the datastore
Expand Down
60 changes: 57 additions & 3 deletions src/domain/workflows/execution_service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ import {
import { join } from "@std/path";
import { SecretRedactor } from "../secrets/mod.ts";
import { VaultService } from "../vaults/vault_service.ts";
import { merge } from "../../infrastructure/stream/merge.ts";
import { mergeWithConcurrency } from "../../infrastructure/stream/merge.ts";
import { withEventBridge } from "../../infrastructure/stream/event_bridge.ts";
import type { ReportFilterOptions } from "../reports/report_execution_service.ts";
import { getTracer, SpanStatusCode } from "../../infrastructure/tracing/mod.ts";
Expand Down Expand Up @@ -1273,6 +1273,13 @@ export class WorkflowExecutionService {

const sortedJobs = this.sortService.sort(jobNodes);

// Resolve effective job-level concurrency:
// workflow.concurrency capped by SWAMP_MAX_CONCURRENT_STEPS
const jobConcurrency = resolveEffectiveConcurrency(
workflow.concurrency,
readGlobalConcurrencyLimit(),
);

// Execute jobs level by level
for (const level of sortedJobs.levels) {
// Merge parallel job generators within each level
Expand All @@ -1285,7 +1292,13 @@ export class WorkflowExecutionService {
stepOpts,
)
);
for await (const event of merge(jobStreams, options?.signal)) {
for await (
const event of mergeWithConcurrency(
jobStreams,
jobConcurrency,
options?.signal,
)
) {
yield event;
}
await this.saveRun(workflow.id, run);
Expand Down Expand Up @@ -1428,13 +1441,15 @@ export class WorkflowExecutionService {
}

const sortedSteps = this.sortService.sort(effectiveNodes);
const globalLimit = readGlobalConcurrencyLimit();

// Execute steps level by level
let jobFailed = false;
for (const level of sortedSteps.levels) {
if (jobFailed) break;

// Merge parallel step generators within each level
const stepConcurrencies: number[] = [];
const stepStreams = level.map((stepName) => {
// Find the expanded step info if applicable
let forEachVar: { name: string; value: unknown } | undefined;
Expand All @@ -1451,6 +1466,13 @@ export class WorkflowExecutionService {
}
}

// Collect step-level concurrency for this level
const stepConc = originalStep?.concurrency ??
job.getStep(stepName)?.concurrency;
if (stepConc && stepConc > 0) {
stepConcurrencies.push(stepConc);
}

return this.runStep(
workflow,
run,
Expand All @@ -1464,7 +1486,22 @@ export class WorkflowExecutionService {
);
});

for await (const event of merge(stepStreams, options.signal)) {
// Resolve: step (min across level) > job > workflow > global
const levelStepConc = stepConcurrencies.length > 0
? Math.min(...stepConcurrencies)
: undefined;
const stepConcurrency = resolveEffectiveConcurrency(
levelStepConc ?? job.concurrency ?? workflow.concurrency,
globalLimit,
);

for await (
const event of mergeWithConcurrency(
stepStreams,
stepConcurrency,
options.signal,
)
) {
yield event;
if (event.kind === "step_failed" && !event.allowedFailure) {
jobFailed = true;
Expand Down Expand Up @@ -1967,3 +2004,20 @@ export class WorkflowExecutionService {
}
}
}

function readGlobalConcurrencyLimit(): number | undefined {
const raw = Deno.env.get("SWAMP_MAX_CONCURRENT_STEPS");
if (!raw) return undefined;
const n = parseInt(raw, 10);
return Number.isFinite(n) && n > 0 ? n : undefined;
}

function resolveEffectiveConcurrency(
local: number | undefined,
global: number | undefined,
): number | undefined {
const l = local && local > 0 ? local : undefined;
const g = global && global > 0 ? global : undefined;
if (l && g) return Math.min(l, g);
return l ?? g;
}
6 changes: 6 additions & 0 deletions src/domain/workflows/job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ export const JobSchema = z.object({
steps: z.array(StepSchema).min(1),
dependsOn: z.array(JobDependencySchema).default([]),
weight: z.number().default(0),
concurrency: z.number().int().nonnegative().optional(),
driver: DriverFieldSchema,
driverConfig: DriverConfigFieldSchema,
});
Expand Down Expand Up @@ -82,6 +83,7 @@ export interface CreateJobProps {
steps: Step[];
dependsOn?: JobDependency[];
weight?: number;
concurrency?: number;
driver?: string;
driverConfig?: Record<string, unknown>;
}
Expand All @@ -103,6 +105,7 @@ export class Job {
private _steps: Step[],
private _dependsOn: JobDependency[],
readonly weight: number,
readonly concurrency: number | undefined,
readonly driver: string | undefined,
readonly driverConfig: Record<string, unknown> | undefined,
) {}
Expand All @@ -124,6 +127,7 @@ export class Job {
condition: d.condition.toData(),
})),
weight: props.weight ?? 0,
concurrency: props.concurrency,
driver: props.driver,
driverConfig: props.driverConfig,
});
Expand All @@ -148,6 +152,7 @@ export class Job {
steps,
dependsOn,
validated.weight,
validated.concurrency,
validated.driver,
validated.driverConfig,
);
Expand Down Expand Up @@ -194,6 +199,7 @@ export class Job {
condition: d.condition.toData() as TriggerConditionData,
})),
weight: this.weight,
concurrency: this.concurrency,
driver: this.driver,
driverConfig: this.driverConfig,
};
Expand Down
6 changes: 6 additions & 0 deletions src/domain/workflows/step.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ export const StepSchema = z.object({
forEach: ForEachSchema.optional(),
dependsOn: z.array(StepDependencySchema).default([]),
weight: z.number().default(0),
concurrency: z.number().int().nonnegative().optional(),
dataOutputOverrides: z.array(DataOutputOverrideSchema).optional(),
allowFailure: z.boolean().default(false),
driver: DriverFieldSchema,
Expand Down Expand Up @@ -109,6 +110,7 @@ export interface CreateStepProps {
forEach?: ForEach;
dependsOn?: StepDependency[];
weight?: number;
concurrency?: number;
dataOutputOverrides?: DataOutputOverride[];
allowFailure?: boolean;
driver?: string;
Expand All @@ -134,6 +136,7 @@ export class Step {
readonly forEach: ForEach | undefined,
private _dependsOn: StepDependency[],
readonly weight: number,
readonly concurrency: number | undefined,
private _dataOutputOverrides: DataOutputOverride[],
readonly allowFailure: boolean,
readonly driver: string | undefined,
Expand All @@ -154,6 +157,7 @@ export class Step {
condition: d.condition.toData(),
})),
weight: props.weight ?? 0,
concurrency: props.concurrency,
dataOutputOverrides: props.dataOutputOverrides,
allowFailure: props.allowFailure ?? false,
driver: props.driver,
Expand Down Expand Up @@ -196,6 +200,7 @@ export class Step {
forEach,
dependsOn,
validated.weight,
validated.concurrency,
dataOutputOverrides,
validated.allowFailure,
validated.driver,
Expand Down Expand Up @@ -247,6 +252,7 @@ export class Step {
condition: d.condition.toData() as TriggerConditionData,
})),
weight: this.weight,
concurrency: this.concurrency,
dataOutputOverrides: this._dataOutputOverrides.length > 0
? this._dataOutputOverrides.map((override) => ({
specName: override.specName,
Expand Down
7 changes: 7 additions & 0 deletions src/domain/workflows/workflow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ export const WorkflowSchema = z.object({
inputs: InputsSchemaSchema,
jobs: z.array(JobSchema).min(1),
version: z.number().int().positive().default(1),
concurrency: z.number().int().nonnegative().optional(),
reports: ReportSelectionSchema,
driver: DriverFieldSchema,
driverConfig: DriverConfigFieldSchema,
Expand Down Expand Up @@ -101,6 +102,7 @@ export interface CreateWorkflowProps {
inputs?: InputsSchema;
jobs?: Job[];
version?: number;
concurrency?: number;
reports?: ReportSelection;
driver?: string;
driverConfig?: Record<string, unknown>;
Expand All @@ -126,6 +128,7 @@ export class Workflow {
readonly inputs: InputsSchema | undefined,
private _jobs: Job[],
readonly version: number,
readonly concurrency: number | undefined,
readonly reports: ReportSelection | undefined,
readonly driver: string | undefined,
readonly driverConfig: Record<string, unknown> | undefined,
Expand All @@ -150,6 +153,7 @@ export class Workflow {
inputs: props.inputs,
jobs: jobs.map((j) => j.toData()),
version,
concurrency: props.concurrency,
reports: props.reports,
driver: props.driver,
driverConfig: props.driverConfig,
Expand All @@ -172,6 +176,7 @@ export class Workflow {
data.inputs,
jobs,
data.version,
data.concurrency,
data.reports,
data.driver,
data.driverConfig,
Expand All @@ -194,6 +199,7 @@ export class Workflow {
validated.inputs,
jobs,
validated.version,
validated.concurrency,
validated.reports,
validated.driver,
validated.driverConfig,
Expand Down Expand Up @@ -251,6 +257,7 @@ export class Workflow {
inputs: this.inputs,
jobs: this._jobs.map((j) => j.toData()) as JobData[],
version: this.version,
concurrency: this.concurrency,
reports: this.reports,
driver: this.driver,
driverConfig: this.driverConfig,
Expand Down
Loading
Loading