feat(workflows): configurable concurrency limits for fan-out (swamp-club#260) #1323

Merged
stack72 merged 1 commit into main from worktree-260
May 6, 2026

Conversation

@stack72 stack72 commented May 6, 2026

Summary

  • Add optional concurrency field at workflow, job, and step levels to cap parallel execution in fan-out scenarios (forEach, parallel jobs/steps)
  • Implement Semaphore primitive and mergeWithConcurrency() stream combinator that wraps existing merge() with zero overhead on the unbounded path
  • Support SWAMP_MAX_CONCURRENT_STEPS env var as a host-level ceiling (min(local, global))

Details

Resolution order: step → job → workflow → unbounded. The most-local non-zero value wins. 0 or absent means unbounded (full backward compatibility).
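
The cascade described above can be sketched roughly as follows. This is a minimal illustration, not the code in execution_service.ts: the `ConcurrencyLevels` interface and the `resolveLocalConcurrency` name are hypothetical, and `undefined` is assumed to stand for "unbounded".

```typescript
// Hypothetical shape: the PR only says each level may carry an optional
// `concurrency` field; these names are illustrative, not the real schema.
interface ConcurrencyLevels {
  step?: number;
  job?: number;
  workflow?: number;
}

// Most-local non-zero value wins. 0 or absent falls through to the next
// level, and undefined (unbounded) is returned if no level sets a limit.
function resolveLocalConcurrency(
  levels: ConcurrencyLevels,
): number | undefined {
  for (const value of [levels.step, levels.job, levels.workflow]) {
    if (value !== undefined && value > 0) return value;
  }
  return undefined;
}

// A step-level 0 is treated as "not set", so the job-level value applies.
const fromJob = resolveLocalConcurrency({ step: 0, job: 4, workflow: 8 });
// No level sets a limit, so the result is unbounded.
const unbounded = resolveLocalConcurrency({});
```

Treating 0 the same as an absent value is what keeps existing workflows, which have no concurrency field at all, on the unbounded path.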

Verified end-to-end: a 10-item forEach with concurrency: 3 correctly batches execution into groups of 3 (max concurrent = 3, ~8s total vs ~2s unbounded).
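
To make the "max concurrent = 3" observation concrete, here is a small self-contained sketch of a semaphore-gated fan-out. It is an assumption-laden stand-in, not the PR's semaphore.ts or mergeWithConcurrency(): the `Semaphore` class, `runLimited`, and `demo` are hypothetical names, abort-signal handling is omitted, and plain promises are used instead of the stream combinator.

```typescript
// Illustrative counting semaphore (not the PR's implementation).
class Semaphore {
  private available: number;
  private readonly waiters: Array<() => void> = [];

  constructor(limit: number) {
    if (!Number.isInteger(limit) || limit < 1) {
      throw new RangeError("limit must be a positive integer");
    }
    this.available = limit;
  }

  async acquire(): Promise<void> {
    if (this.available > 0) {
      this.available--;
      return;
    }
    // No permit free: park until release() hands one over.
    await new Promise<void>((resolve) => this.waiters.push(resolve));
  }

  release(): void {
    const next = this.waiters.shift();
    if (next) {
      next(); // pass the permit straight to the oldest waiter
      return;
    }
    this.available++;
  }
}

// Run one worker per item, with at most `limit` workers in flight.
async function runLimited<T>(
  items: T[],
  limit: number,
  worker: (item: T) => Promise<void>,
): Promise<void> {
  const sem = new Semaphore(limit);
  await Promise.all(items.map(async (item) => {
    await sem.acquire();
    try {
      await worker(item);
    } finally {
      sem.release(); // always return the permit, even if the worker throws
    }
  }));
}

// 10 items with a limit of 3; returns the peak number of concurrent workers.
async function demo(): Promise<number> {
  let running = 0;
  let maxRunning = 0;
  const items = Array.from({ length: 10 }, (_, i) => i);
  await runLimited(items, 3, async () => {
    running++;
    maxRunning = Math.max(maxRunning, running);
    await new Promise<void>((resolve) => setTimeout(resolve, 20));
    running--;
  });
  return maxRunning;
}
```

With equal-duration items, the ten workers complete in four waves (3 + 3 + 3 + 1), which is consistent with the roughly fourfold wall-clock difference reported above.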

Files changed

| Area | Files |
| --- | --- |
| Infrastructure | semaphore.ts (new), merge.ts (+mergeWithConcurrency), libswamp re-export |
| Domain schemas | workflow.ts, job.ts, step.ts (optional concurrency field) |
| Execution | execution_service.ts (concurrency resolution + gated merge at both job and step levels) |
| Design doc | design/workflow.md (concurrency semantics section) |
| Skill | swamp-workflow SKILL.md + forEach reference |
| Tests | 7 semaphore tests, 7 mergeWithConcurrency tests, 5541 total pass |

Follow-up issues

  • swamp-club#266 — Docs: document concurrency limits in reference manual
  • systeminit/swamp-uat#192 — UAT: adversarial test for workflow concurrency limits

Closes swamp-club#260

Test plan

  • deno check — zero type errors
  • deno lint — clean
  • deno fmt — clean
  • deno run test — 5541 passed, 0 failed
  • deno run compile — binary compiled
  • End-to-end verification: concurrency: 3 caps 10-item fan-out (max concurrent = 3, batched in ~2s intervals)
  • Backward compatibility: workflows without concurrency field behave identically

🤖 Generated with Claude Code

…lub#260)

Add an optional `concurrency` field at the workflow, job, and step levels to
cap how many parallel units execute simultaneously. This prevents downstream
API rate-limit failures and local resource exhaustion on large forEach
fan-outs.

A semaphore-gated `mergeWithConcurrency()` wraps the existing `merge()`
stream combinator. When the limit is unset or exceeds the stream count,
the unbounded `merge()` path is used with zero overhead.

Resolution order: step > job > workflow > unbounded. A global
`SWAMP_MAX_CONCURRENT_STEPS` env var provides a host-level ceiling.

Closes swamp-club#260

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

@github-actions github-actions Bot left a comment

CLI UX Review

Blocking

None.

Suggestions

  1. SWAMP_MAX_CONCURRENT_STEPS not mentioned in workflow run help text. The existing convention for env vars that back CLI options is to document them inline (e.g. "Repository directory (env: SWAMP_REPO_DIR)"). SWAMP_MAX_CONCURRENT_STEPS has no corresponding CLI flag, so the inline pattern doesn't quite apply — and SWAMP_DATASTORE_SYNC_TIMEOUT_MS (another operator-scoped knob) is similarly undocumented in help text. Still, a --help mention or a description addendum on workflow run like "Concurrency capped by SWAMP_MAX_CONCURRENT_STEPS if set" would help operators discover it without reading design docs. Low priority given the precedent.

Verdict

PASS — no commands, flags, renderers, or error messages changed. The concurrency field is optional and exposed automatically via swamp workflow schema get. Backward compatibility is intact (absent/0 = unbounded). Ready to merge.


@github-actions github-actions Bot left a comment

Code Review

Clean, well-structured PR. The semaphore primitive is correct, the merge combinator properly falls through to the unbounded path, and the domain model changes are minimal and additive. DDD layer placement is sound — infrastructure primitives in infrastructure/stream/, configuration fields on domain entities.

Blocking Issues

None.

Suggestions

  1. src/libswamp/mod.ts doesn't re-export mergeWithConcurrency — Line 39 re-exports merge from ./stream/merge.ts, but the new mergeWithConcurrency (added to src/libswamp/stream/merge.ts) isn't re-exported from the barrel. Nothing currently needs it from the barrel, but for consistency with the existing merge export, consider adding it.

  2. resolveEffectiveConcurrency test coverage — The infrastructure layer (semaphore, mergeWithConcurrency) is well-tested, but the resolution logic in execution_service.ts — particularly the step > job > workflow > global cascade and the min() across step concurrencies in a level — encodes important business rules that would benefit from targeted unit tests. Even a few cases for resolveEffectiveConcurrency(local, global) as a standalone function (or exposed via a test helper) would lock down the edge cases (e.g., both set, one zero, both undefined).

  3. Minor: readGlobalConcurrencyLimit() called per-execution — This reads Deno.env.get() directly in domain code. The codebase has this pattern elsewhere (SWAMP_DEBUG, SYNC_TIMEOUT_ENV_VAR), so it's consistent. Just noting that in strict DDD this would be injected configuration rather than a direct env read. No action needed.
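
The edge cases named in suggestion 2 could be pinned down with a small table-driven check along these lines. This is a sketch under assumptions: the body of `resolveEffectiveConcurrency` below is a guess at the min(local, global) rule from the PR summary, not the code in execution_service.ts, and it assumes the global ceiling still applies when no local limit is set.

```typescript
type Limit = number | undefined; // undefined means unbounded

// Hypothetical standalone version of the resolution rule: 0 or undefined is
// "not set"; when both are set, the smaller value wins.
function resolveEffectiveConcurrency(local: Limit, global: Limit): Limit {
  const l = local !== undefined && local > 0 ? local : undefined;
  const g = global !== undefined && global > 0 ? global : undefined;
  if (l === undefined) return g; // assumption: the ceiling applies on its own
  if (g === undefined) return l;
  return Math.min(l, g);
}

const cases: Array<{ local: Limit; global: Limit; expected: Limit }> = [
  { local: 5, global: 3, expected: 3 }, // both set, global is lower
  { local: 2, global: 8, expected: 2 }, // both set, local is lower
  { local: 0, global: 4, expected: 4 }, // local zero
  { local: 6, global: 0, expected: 6 }, // global zero
  { local: undefined, global: undefined, expected: undefined }, // both unset
];

for (const c of cases) {
  const actual = resolveEffectiveConcurrency(c.local, c.global);
  if (actual !== c.expected) {
    throw new Error(
      `local=${c.local} global=${c.global}: expected ${c.expected}, got ${actual}`,
    );
  }
}
```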


@github-actions github-actions Bot left a comment

Adversarial Review

Critical / High

None found. The Semaphore, mergeWithConcurrency, and resolution logic are well-constructed. Abort paths, resource cleanup, and counter management all hold up under trace analysis.

Medium

  1. Step concurrency "leaks" to unrelated steps in the same topological level (execution_service.ts:1489-1496)

    When a topological level contains steps from multiple original step definitions with different concurrency values, the code takes Math.min(...stepConcurrencies) and applies that single limit to the entire level's merged stream. This means a step with concurrency: 10 gets throttled to 2 if another unrelated step in the same level has concurrency: 2.

    Example: Step A (forEach over 20 items, concurrency: 2) and Step B (forEach over 5 items, concurrency: 10) are independent and land in the same topological level. The effective limit is min(2, 10) = 2, so Step B is throttled far below its configured limit. The user set concurrency: 10 on Step B but gets 2.

    Mitigation: In practice, forEach expansions from different steps rarely share a topological level (they'd need zero inter-dependencies). The conservative min is safe — it over-restricts rather than under-restricts. But the documented "most-local non-zero value wins" doesn't capture this cross-step interaction. Worth documenting or addressing in a follow-up.
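
    The interaction described in this finding can be reproduced with a few lines. This is only a model of the behavior as the review describes it (one Math.min over the level), not the code at the cited lines; `LevelStep` and `levelLimit` are hypothetical names.

```typescript
// Hypothetical view of a step after forEach expansion into a level.
interface LevelStep {
  id: string;
  concurrency?: number;
}

// Collapse the limits of every step in one topological level to a single
// value, as the finding describes. Steps without a limit are ignored here.
function levelLimit(steps: LevelStep[]): number | undefined {
  const limits = steps
    .map((s) => s.concurrency)
    .filter((c): c is number => c !== undefined && c > 0);
  return limits.length > 0 ? Math.min(...limits) : undefined;
}

// Step A and Step B are independent, so they share a level.
const sharedLevel: LevelStep[] = [
  { id: "A", concurrency: 2 },
  { id: "B", concurrency: 10 },
];

// Evaluates to 2: Step B's expansions are gated at 2, not the configured 10.
const sharedLimit = levelLimit(sharedLevel);
```

    Giving each originating step its own gate would avoid the cross-step throttling, at the cost of a less conservative total for the level.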

Low

  1. Semaphore has no over-release guard (semaphore.ts:75-82). Calling release() without a matching acquire() would push available above limit, silently expanding capacity. Not triggerable by the current mergeWithConcurrency call site (release is only in the finally block after a successful acquire), but a latent footgun for future callers.

  2. mergeWithConcurrency abort path leaves remaining > 0 (merge.ts:112-131). Tasks that fail sem.acquire(signal) during abort return early without decrementing remaining, so remaining never reaches 0. This is benign because the abort handler has already called queue.close() via queue.abort(), so the consumer has exited and Promise.allSettled(tasks) correctly waits for all tasks to settle. No leak or deadlock, just worth noting the invariant difference from the unbounded merge().

  3. SWAMP_MAX_CONCURRENT_STEPS parsing is lenient (execution_service.ts:2007). parseInt("3abc", 10) returns 3. Acceptable for an env var, but a typo like "3 0" (intended 30) would silently become 3. No action needed — consistent with how most CLI tools parse env vars.
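
The parsing behavior in item 3 is easy to demonstrate. Both helpers below are hypothetical and take the raw string as a parameter rather than reading the environment; treating an invalid or non-positive value as "unset" is an assumption, and the real handling in execution_service.ts may differ.

```typescript
// Lenient: parseInt stops at the first non-digit, so trailing junk is ignored.
function parseLenient(raw: string | undefined): number | undefined {
  if (raw === undefined) return undefined;
  const n = parseInt(raw, 10);
  return Number.isNaN(n) || n <= 0 ? undefined : n;
}

// Strict alternative: accept only a run of decimal digits (after trimming).
function parseStrict(raw: string | undefined): number | undefined {
  if (raw === undefined) return undefined;
  const trimmed = raw.trim();
  if (!/^\d+$/.test(trimmed)) return undefined;
  const n = Number(trimmed);
  return Number.isSafeInteger(n) && n > 0 ? n : undefined;
}

const lenientTypo = parseLenient("3 0"); // 3: the typo is silently accepted
const strictTypo = parseStrict("3 0"); // undefined: the typo is rejected
const strictValid = parseStrict("30"); // 30
```

A strict parser trades convenience for visibility: a malformed value is dropped (or could be reported) instead of being truncated to a much smaller limit.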

Verdict

PASS — The semaphore and concurrency-limited merge are correctly implemented with proper abort handling, resource cleanup, and backward compatibility. The schema changes are additive and optional. The resolution logic matches the documented semantics. The medium finding is a design nuance worth tracking but not blocking.

@stack72 stack72 merged commit 58bede5 into main May 6, 2026
21 of 22 checks passed
@stack72 stack72 deleted the worktree-260 branch May 6, 2026 18:09