From 80fca48b046eabe43e3136cd7f5cf6fbaa241300 Mon Sep 17 00:00:00 2001 From: stack72 Date: Wed, 6 May 2026 18:37:55 +0100 Subject: [PATCH] feat(workflows): configurable concurrency limits for fan-out (swamp-club#260) Add an optional `concurrency` field at the workflow, job, and step levels to cap how many parallel units execute simultaneously. This prevents downstream API rate-limit failures and local resource exhaustion on large forEach fan-outs. A semaphore-gated `mergeWithConcurrency()` wraps the existing `merge()` stream combinator. When the limit is unset or exceeds the stream count, the unbounded `merge()` path is used with zero overhead. Resolution order: step > job > workflow > unbounded. A global `SWAMP_MAX_CONCURRENT_STEPS` env var provides a host-level ceiling. Closes swamp-club#260 Co-Authored-By: Claude Opus 4.6 (1M context) --- .claude/skills/swamp-workflow/SKILL.md | 27 ++++ .../references/expressions-and-foreach.md | 28 +++++ design/workflow.md | 44 ++++++- src/domain/workflows/execution_service.ts | 60 ++++++++- src/domain/workflows/job.ts | 6 + src/domain/workflows/step.ts | 6 + src/domain/workflows/workflow.ts | 7 ++ src/infrastructure/stream/merge.ts | 61 +++++++++ src/infrastructure/stream/merge_test.ts | 70 ++++++++++- src/infrastructure/stream/semaphore.ts | 83 +++++++++++++ src/infrastructure/stream/semaphore_test.ts | 117 ++++++++++++++++++ src/libswamp/stream/merge.ts | 5 +- 12 files changed, 507 insertions(+), 7 deletions(-) create mode 100644 src/infrastructure/stream/semaphore.ts create mode 100644 src/infrastructure/stream/semaphore_test.ts diff --git a/.claude/skills/swamp-workflow/SKILL.md b/.claude/skills/swamp-workflow/SKILL.md index f65c532b..ce9e6b45 100644 --- a/.claude/skills/swamp-workflow/SKILL.md +++ b/.claude/skills/swamp-workflow/SKILL.md @@ -434,6 +434,33 @@ swamp workflow evaluate --all --json - Vault expressions (`${{ vault.get(...) }}`) remain raw for runtime resolution - Output saved to `.swamp/workflows-evaluated/` for `--last-evaluated` use +## Concurrency Limits + +Add `concurrency: N` at the workflow, job, or step level to cap parallel +execution. Absent or `0` means unbounded. Resolution: step > job > workflow > +unbounded. A `SWAMP_MAX_CONCURRENT_STEPS` env var provides a host-level ceiling. +See +[references/expressions-and-foreach.md](references/expressions-and-foreach.md) +for forEach concurrency examples. + +```yaml +concurrency: 10 # workflow level — caps parallel jobs +jobs: + - name: fan-out + concurrency: 5 # job level — caps parallel steps + steps: + - name: per-item + forEach: + item: target + in: ${{ inputs.targets }} + concurrency: 3 # step level — caps forEach iterations + task: { + type: model_method, + modelIdOrName: api-client, + methodName: call, + } +``` + ## Allow Failure Steps can be marked with `allowFailure: true` so their failure does not fail the diff --git a/.claude/skills/swamp-workflow/references/expressions-and-foreach.md b/.claude/skills/swamp-workflow/references/expressions-and-foreach.md index 6be794c9..af75746a 100644 --- a/.claude/skills/swamp-workflow/references/expressions-and-foreach.md +++ b/.claude/skills/swamp-workflow/references/expressions-and-foreach.md @@ -107,6 +107,34 @@ through `task.inputs`. See [nested-workflows.md § When to Use Nested Workflows](nested-workflows.md#when-to-use-nested-workflows) for the full pattern. +### forEach with Concurrency Limits + +By default, all forEach iterations run in parallel. Add `concurrency` to cap +simultaneous execution — useful for rate-limited APIs or resource-constrained +hosts: + +```yaml +steps: + - name: call-${{ self.target }} + forEach: + item: target + in: ${{ inputs.targets }} + concurrency: 3 + task: + type: model_method + modelIdOrName: api-client + methodName: call + inputs: + target: ${{ self.target }} +``` + +With 10 targets and `concurrency: 3`, at most 3 iterations execute at once. The +remaining iterations queue until a permit is released. Resolution order: +`step → job → workflow → unbounded` — the most-local non-zero value wins. + +A global `SWAMP_MAX_CONCURRENT_STEPS` environment variable provides a host-level +ceiling: `min(local, global)` is the effective limit. + ### forEach with Vary Dimensions Use `vary` on `dataOutputOverrides` to isolate data per forEach iteration: diff --git a/design/workflow.md b/design/workflow.md index 3614d014..348eaae6 100644 --- a/design/workflow.md +++ b/design/workflow.md @@ -11,12 +11,52 @@ and only execute if their dependcy condition is met (for example, only run this job if one of its upstream dependencies fail). Within a job, steps are executed with a weighted topological sort, so that they -have maximum paralleism through the job. +have maximum parallelism through the job. Steps support an optional +`concurrency` field that caps how many steps in a topological level run +simultaneously — particularly useful for `forEach` expansions that hit +rate-limited APIs. Jobs can have dependencies on other jobs. The entire workflow is executed with a -weighted topological sort, so thtat htye have maximum paralleism through the +weighted topological sort, so that they have maximum parallelism through the workflow. Like steps, jobs also have conditions that trigger them. +## Concurrency Limits + +By default, all jobs in a topological level and all steps in a topological level +run concurrently (maximum parallelism). The optional `concurrency` field caps +the number of simultaneously executing units at each level: + +```yaml +concurrency: 10 # workflow level — caps parallel jobs + +jobs: + - name: fan-out + concurrency: 5 # job level — caps parallel steps in this job + steps: + - name: per-item + forEach: + item: target + in: ${{ inputs.targets }} + concurrency: 3 # step level — caps forEach iterations + task: { ... } +``` + +**Semantics:** + +- A positive integer is a hard cap on simultaneously executing units at that + level. +- `0` or absent means unbounded (current default behavior). +- Resolution order: step > job > workflow > unbounded. The most-local non-zero + value wins. +- A global `SWAMP_MAX_CONCURRENT_STEPS` environment variable provides a + host-level ceiling. The effective limit is `min(local, global)` when both are + set. + +Concurrency limiting is implemented via a semaphore-gated +`mergeWithConcurrency()` that wraps the existing `merge()` stream combinator. +When the limit is unset or exceeds the stream count, the unbounded `merge()` path +is used with zero overhead. + Workflows are specified in YAML files, that are validated with Zod, in the top-level `workflows/` directory of the repository, as `workflows/workflow-{uuid}.yaml`. Workflow run output is stored in the datastore diff --git a/src/domain/workflows/execution_service.ts b/src/domain/workflows/execution_service.ts index c0fb74af..bc14a683 100644 --- a/src/domain/workflows/execution_service.ts +++ b/src/domain/workflows/execution_service.ts @@ -89,7 +89,7 @@ import { import { join } from "@std/path"; import { SecretRedactor } from "../secrets/mod.ts"; import { VaultService } from "../vaults/vault_service.ts"; -import { merge } from "../../infrastructure/stream/merge.ts"; +import { mergeWithConcurrency } from "../../infrastructure/stream/merge.ts"; import { withEventBridge } from "../../infrastructure/stream/event_bridge.ts"; import type { ReportFilterOptions } from "../reports/report_execution_service.ts"; import { getTracer, SpanStatusCode } from "../../infrastructure/tracing/mod.ts"; @@ -1273,6 +1273,13 @@ export class WorkflowExecutionService { const sortedJobs = this.sortService.sort(jobNodes); + // Resolve effective job-level concurrency: + // workflow.concurrency capped by SWAMP_MAX_CONCURRENT_STEPS + const jobConcurrency = resolveEffectiveConcurrency( + workflow.concurrency, + readGlobalConcurrencyLimit(), + ); + // Execute jobs level by level for (const level of sortedJobs.levels) { // Merge parallel job generators within each level @@ -1285,7 +1292,13 @@ export class WorkflowExecutionService { stepOpts, ) ); - for await (const event of merge(jobStreams, options?.signal)) { + for await ( + const event of mergeWithConcurrency( + jobStreams, + jobConcurrency, + options?.signal, + ) + ) { yield event; } await this.saveRun(workflow.id, run); @@ -1428,6 +1441,7 @@ export class WorkflowExecutionService { } const sortedSteps = this.sortService.sort(effectiveNodes); + const globalLimit = readGlobalConcurrencyLimit(); // Execute steps level by level let jobFailed = false; @@ -1435,6 +1449,7 @@ export class WorkflowExecutionService { if (jobFailed) break; // Merge parallel step generators within each level + const stepConcurrencies: number[] = []; const stepStreams = level.map((stepName) => { // Find the expanded step info if applicable let forEachVar: { name: string; value: unknown } | undefined; @@ -1451,6 +1466,13 @@ export class WorkflowExecutionService { } } + // Collect step-level concurrency for this level + const stepConc = originalStep?.concurrency ?? + job.getStep(stepName)?.concurrency; + if (stepConc && stepConc > 0) { + stepConcurrencies.push(stepConc); + } + return this.runStep( workflow, run, @@ -1464,7 +1486,22 @@ export class WorkflowExecutionService { ); }); - for await (const event of merge(stepStreams, options.signal)) { + // Resolve: step (min across level) > job > workflow > global + const levelStepConc = stepConcurrencies.length > 0 + ? Math.min(...stepConcurrencies) + : undefined; + const stepConcurrency = resolveEffectiveConcurrency( + levelStepConc ?? job.concurrency ?? workflow.concurrency, + globalLimit, + ); + + for await ( + const event of mergeWithConcurrency( + stepStreams, + stepConcurrency, + options.signal, + ) + ) { yield event; if (event.kind === "step_failed" && !event.allowedFailure) { jobFailed = true; @@ -1967,3 +2004,20 @@ export class WorkflowExecutionService { } } } + +function readGlobalConcurrencyLimit(): number | undefined { + const raw = Deno.env.get("SWAMP_MAX_CONCURRENT_STEPS"); + if (!raw) return undefined; + const n = parseInt(raw, 10); + return Number.isFinite(n) && n > 0 ? n : undefined; +} + +function resolveEffectiveConcurrency( + local: number | undefined, + global: number | undefined, +): number | undefined { + const l = local && local > 0 ? local : undefined; + const g = global && global > 0 ? global : undefined; + if (l && g) return Math.min(l, g); + return l ?? g; +} diff --git a/src/domain/workflows/job.ts b/src/domain/workflows/job.ts index 2544536b..30e3f44e 100644 --- a/src/domain/workflows/job.ts +++ b/src/domain/workflows/job.ts @@ -51,6 +51,7 @@ export const JobSchema = z.object({ steps: z.array(StepSchema).min(1), dependsOn: z.array(JobDependencySchema).default([]), weight: z.number().default(0), + concurrency: z.number().int().nonnegative().optional(), driver: DriverFieldSchema, driverConfig: DriverConfigFieldSchema, }); @@ -82,6 +83,7 @@ export interface CreateJobProps { steps: Step[]; dependsOn?: JobDependency[]; weight?: number; + concurrency?: number; driver?: string; driverConfig?: Record; } @@ -103,6 +105,7 @@ export class Job { private _steps: Step[], private _dependsOn: JobDependency[], readonly weight: number, + readonly concurrency: number | undefined, readonly driver: string | undefined, readonly driverConfig: Record | undefined, ) {} @@ -124,6 +127,7 @@ export class Job { condition: d.condition.toData(), })), weight: props.weight ?? 0, + concurrency: props.concurrency, driver: props.driver, driverConfig: props.driverConfig, }); @@ -148,6 +152,7 @@ export class Job { steps, dependsOn, validated.weight, + validated.concurrency, validated.driver, validated.driverConfig, ); @@ -194,6 +199,7 @@ export class Job { condition: d.condition.toData() as TriggerConditionData, })), weight: this.weight, + concurrency: this.concurrency, driver: this.driver, driverConfig: this.driverConfig, }; diff --git a/src/domain/workflows/step.ts b/src/domain/workflows/step.ts index 878017c3..372f7a0c 100644 --- a/src/domain/workflows/step.ts +++ b/src/domain/workflows/step.ts @@ -67,6 +67,7 @@ export const StepSchema = z.object({ forEach: ForEachSchema.optional(), dependsOn: z.array(StepDependencySchema).default([]), weight: z.number().default(0), + concurrency: z.number().int().nonnegative().optional(), dataOutputOverrides: z.array(DataOutputOverrideSchema).optional(), allowFailure: z.boolean().default(false), driver: DriverFieldSchema, @@ -109,6 +110,7 @@ export interface CreateStepProps { forEach?: ForEach; dependsOn?: StepDependency[]; weight?: number; + concurrency?: number; dataOutputOverrides?: DataOutputOverride[]; allowFailure?: boolean; driver?: string; @@ -134,6 +136,7 @@ export class Step { readonly forEach: ForEach | undefined, private _dependsOn: StepDependency[], readonly weight: number, + readonly concurrency: number | undefined, private _dataOutputOverrides: DataOutputOverride[], readonly allowFailure: boolean, readonly driver: string | undefined, @@ -154,6 +157,7 @@ export class Step { condition: d.condition.toData(), })), weight: props.weight ?? 0, + concurrency: props.concurrency, dataOutputOverrides: props.dataOutputOverrides, allowFailure: props.allowFailure ?? false, driver: props.driver, @@ -196,6 +200,7 @@ export class Step { forEach, dependsOn, validated.weight, + validated.concurrency, dataOutputOverrides, validated.allowFailure, validated.driver, @@ -247,6 +252,7 @@ export class Step { condition: d.condition.toData() as TriggerConditionData, })), weight: this.weight, + concurrency: this.concurrency, dataOutputOverrides: this._dataOutputOverrides.length > 0 ? this._dataOutputOverrides.map((override) => ({ specName: override.specName, diff --git a/src/domain/workflows/workflow.ts b/src/domain/workflows/workflow.ts index 2c50615c..36ad15de 100644 --- a/src/domain/workflows/workflow.ts +++ b/src/domain/workflows/workflow.ts @@ -74,6 +74,7 @@ export const WorkflowSchema = z.object({ inputs: InputsSchemaSchema, jobs: z.array(JobSchema).min(1), version: z.number().int().positive().default(1), + concurrency: z.number().int().nonnegative().optional(), reports: ReportSelectionSchema, driver: DriverFieldSchema, driverConfig: DriverConfigFieldSchema, @@ -101,6 +102,7 @@ export interface CreateWorkflowProps { inputs?: InputsSchema; jobs?: Job[]; version?: number; + concurrency?: number; reports?: ReportSelection; driver?: string; driverConfig?: Record; @@ -126,6 +128,7 @@ export class Workflow { readonly inputs: InputsSchema | undefined, private _jobs: Job[], readonly version: number, + readonly concurrency: number | undefined, readonly reports: ReportSelection | undefined, readonly driver: string | undefined, readonly driverConfig: Record | undefined, @@ -150,6 +153,7 @@ export class Workflow { inputs: props.inputs, jobs: jobs.map((j) => j.toData()), version, + concurrency: props.concurrency, reports: props.reports, driver: props.driver, driverConfig: props.driverConfig, @@ -172,6 +176,7 @@ export class Workflow { data.inputs, jobs, data.version, + data.concurrency, data.reports, data.driver, data.driverConfig, @@ -194,6 +199,7 @@ export class Workflow { validated.inputs, jobs, validated.version, + validated.concurrency, validated.reports, validated.driver, validated.driverConfig, @@ -251,6 +257,7 @@ export class Workflow { inputs: this.inputs, jobs: this._jobs.map((j) => j.toData()) as JobData[], version: this.version, + concurrency: this.concurrency, reports: this.reports, driver: this.driver, driverConfig: this.driverConfig, diff --git a/src/infrastructure/stream/merge.ts b/src/infrastructure/stream/merge.ts index dcf3962f..08bb91bb 100644 --- a/src/infrastructure/stream/merge.ts +++ b/src/infrastructure/stream/merge.ts @@ -18,6 +18,7 @@ // along with Swamp. If not, see . import { AsyncQueue } from "./async_queue.ts"; +import { Semaphore } from "./semaphore.ts"; /** * Merges multiple async iterables into a single stream. @@ -80,3 +81,63 @@ export async function* merge( await Promise.allSettled(tasks); } } + +/** + * Concurrency-limited variant of {@link merge}. At most `limit` source + * streams drain concurrently; additional streams are queued until a permit + * is released. When `limit` is `undefined` or `0`, delegates to the + * unbounded {@link merge} — no semaphore overhead on the default path. + */ +export async function* mergeWithConcurrency( + streams: AsyncIterable[], + limit: number | undefined, + signal?: AbortSignal, +): AsyncGenerator { + if (!limit || limit <= 0 || limit >= streams.length) { + yield* merge(streams, signal); + return; + } + + const queue = new AsyncQueue(); + let remaining = streams.length; + const sem = new Semaphore(limit); + + let abortHandler: (() => void) | undefined; + if (signal) { + if (signal.aborted) return; + abortHandler = () => queue.abort(signal.reason); + signal.addEventListener("abort", abortHandler, { once: true }); + } + + const drainStream = async (stream: AsyncIterable) => { + try { + await sem.acquire(signal); + } catch { + return; + } + try { + for await (const item of stream) { + queue.push(item); + } + } catch { + // Silently handle errors from closed queue (abort scenario) + } finally { + sem.release(); + remaining--; + if (remaining === 0) { + queue.close(); + } + } + }; + + const tasks = streams.map((s) => drainStream(s)); + + try { + yield* queue; + } finally { + if (abortHandler && signal) { + signal.removeEventListener("abort", abortHandler); + } + await Promise.allSettled(tasks); + } +} diff --git a/src/infrastructure/stream/merge_test.ts b/src/infrastructure/stream/merge_test.ts index 0f5779d8..9f585c7c 100644 --- a/src/infrastructure/stream/merge_test.ts +++ b/src/infrastructure/stream/merge_test.ts @@ -18,7 +18,7 @@ // along with Swamp. If not, see . import { assertEquals } from "@std/assert"; -import { merge } from "./merge.ts"; +import { merge, mergeWithConcurrency } from "./merge.ts"; async function* fromArray(items: T[]): AsyncGenerator { for (const item of items) { @@ -96,3 +96,71 @@ Deno.test("merge with pre-aborted signal yields nothing", async () => { ], controller.signal)); assertEquals(items, []); }); + +// mergeWithConcurrency tests + +Deno.test("mergeWithConcurrency: undefined limit delegates to merge", async () => { + const items = await collect(mergeWithConcurrency([ + fromArray([1, 2]), + fromArray([3, 4]), + ], undefined)); + assertEquals(items.sort((a, b) => a - b), [1, 2, 3, 4]); +}); + +Deno.test("mergeWithConcurrency: zero limit delegates to merge", async () => { + const items = await collect(mergeWithConcurrency([ + fromArray(["a", "b"]), + fromArray(["c"]), + ], 0)); + assertEquals(items.sort(), ["a", "b", "c"]); +}); + +Deno.test("mergeWithConcurrency: limit >= streams delegates to merge", async () => { + const items = await collect(mergeWithConcurrency([ + fromArray([1]), + fromArray([2]), + ], 5)); + assertEquals(items.sort((a, b) => a - b), [1, 2]); +}); + +Deno.test("mergeWithConcurrency: collects all items", async () => { + const items = await collect(mergeWithConcurrency([ + fromArray([1, 2]), + fromArray([3, 4]), + fromArray([5, 6]), + fromArray([7, 8]), + ], 2)); + assertEquals(items.sort((a, b) => a - b), [1, 2, 3, 4, 5, 6, 7, 8]); +}); + +Deno.test("mergeWithConcurrency: limits actual concurrency", async () => { + let active = 0; + let maxActive = 0; + + async function* tracked(id: number): AsyncGenerator { + active++; + maxActive = Math.max(maxActive, active); + await new Promise((r) => setTimeout(r, 20)); + yield id; + active--; + } + + const streams = Array.from({ length: 6 }, (_, i) => tracked(i)); + const items = await collect(mergeWithConcurrency(streams, 2)); + assertEquals(items.length, 6); + assertEquals(maxActive, 2); +}); + +Deno.test("mergeWithConcurrency: with pre-aborted signal yields nothing", async () => { + const controller = new AbortController(); + controller.abort(); + const items = await collect(mergeWithConcurrency( + [ + fromArray([1, 2]), + fromArray([3, 4]), + ], + 1, + controller.signal, + )); + assertEquals(items, []); +}); diff --git a/src/infrastructure/stream/semaphore.ts b/src/infrastructure/stream/semaphore.ts new file mode 100644 index 00000000..178aa4eb --- /dev/null +++ b/src/infrastructure/stream/semaphore.ts @@ -0,0 +1,83 @@ +// Swamp, an Automation Framework +// Copyright (C) 2026 System Initiative, Inc. +// +// This file is part of Swamp. +// +// Swamp is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License version 3 +// as published by the Free Software Foundation, with the Swamp +// Extension and Definition Exception (found in the "COPYING-EXCEPTION" +// file). +// +// Swamp is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with Swamp. If not, see . + +/** + * Counting semaphore for limiting concurrent async operations. + * + * Acquire a permit before starting work, release it when done. + * When all permits are taken, `acquire()` blocks until one is released. + */ +export class Semaphore { + private available: number; + private readonly waiters: { + resolve: () => void; + reject: (reason: unknown) => void; + }[] = []; + + constructor(readonly limit: number) { + if (limit < 1) { + throw new Error("Semaphore limit must be at least 1"); + } + this.available = limit; + } + + acquire(signal?: AbortSignal): Promise { + if (signal?.aborted) { + return Promise.reject( + signal.reason ?? new DOMException("Aborted", "AbortError"), + ); + } + + if (this.available > 0) { + this.available--; + return Promise.resolve(); + } + + return new Promise((resolve, reject) => { + const waiter = { resolve, reject }; + this.waiters.push(waiter); + + if (signal) { + const onAbort = () => { + const idx = this.waiters.indexOf(waiter); + if (idx !== -1) { + this.waiters.splice(idx, 1); + reject(signal.reason ?? new DOMException("Aborted", "AbortError")); + } + }; + signal.addEventListener("abort", onAbort, { once: true }); + + const origResolve = waiter.resolve; + waiter.resolve = () => { + signal.removeEventListener("abort", onAbort); + origResolve(); + }; + } + }); + } + + release(): void { + const next = this.waiters.shift(); + if (next) { + next.resolve(); + } else { + this.available++; + } + } +} diff --git a/src/infrastructure/stream/semaphore_test.ts b/src/infrastructure/stream/semaphore_test.ts new file mode 100644 index 00000000..90114fa9 --- /dev/null +++ b/src/infrastructure/stream/semaphore_test.ts @@ -0,0 +1,117 @@ +// Swamp, an Automation Framework +// Copyright (C) 2026 System Initiative, Inc. +// +// This file is part of Swamp. +// +// Swamp is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License version 3 +// as published by the Free Software Foundation, with the Swamp +// Extension and Definition Exception (found in the "COPYING-EXCEPTION" +// file). +// +// Swamp is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with Swamp. If not, see . + +import { assertEquals, assertRejects, assertThrows } from "@std/assert"; +import { Semaphore } from "./semaphore.ts"; + +Deno.test("Semaphore: throws on limit < 1", () => { + assertThrows(() => new Semaphore(0), Error, "at least 1"); + assertThrows(() => new Semaphore(-1), Error, "at least 1"); +}); + +Deno.test("Semaphore: acquire within limit resolves immediately", async () => { + const sem = new Semaphore(2); + await sem.acquire(); + await sem.acquire(); +}); + +Deno.test("Semaphore: acquire beyond limit blocks until release", async () => { + const sem = new Semaphore(1); + await sem.acquire(); + + let acquired = false; + const pending = sem.acquire().then(() => { + acquired = true; + }); + + // Yield to let microtasks run — still blocked + await Promise.resolve(); + assertEquals(acquired, false); + + sem.release(); + await pending; + assertEquals(acquired, true); +}); + +Deno.test("Semaphore: tracks max concurrency correctly", async () => { + const sem = new Semaphore(3); + let active = 0; + let maxActive = 0; + + const work = async () => { + await sem.acquire(); + active++; + maxActive = Math.max(maxActive, active); + await new Promise((r) => setTimeout(r, 10)); + active--; + sem.release(); + }; + + await Promise.all(Array.from({ length: 10 }, () => work())); + assertEquals(maxActive, 3); +}); + +Deno.test("Semaphore: acquire with pre-aborted signal rejects", async () => { + const sem = new Semaphore(1); + const controller = new AbortController(); + controller.abort(); + await assertRejects( + () => sem.acquire(controller.signal), + DOMException, + ); +}); + +Deno.test("Semaphore: acquire aborted while waiting rejects", async () => { + const sem = new Semaphore(1); + await sem.acquire(); + + const controller = new AbortController(); + const pending = sem.acquire(controller.signal); + + controller.abort(); + await assertRejects( + () => pending, + DOMException, + ); + + sem.release(); +}); + +Deno.test("Semaphore: release after abort does not double-count", async () => { + const sem = new Semaphore(1); + await sem.acquire(); + + const controller = new AbortController(); + const p = sem.acquire(controller.signal).catch(() => {}); + controller.abort(); + await p; + + sem.release(); + + // Only one permit should be available now + await sem.acquire(); + let acquired = false; + const pending = sem.acquire().then(() => { + acquired = true; + }); + await Promise.resolve(); + assertEquals(acquired, false); + sem.release(); + await pending; +}); diff --git a/src/libswamp/stream/merge.ts b/src/libswamp/stream/merge.ts index 2a520041..68eb32a8 100644 --- a/src/libswamp/stream/merge.ts +++ b/src/libswamp/stream/merge.ts @@ -17,4 +17,7 @@ // You should have received a copy of the GNU Affero General Public License // along with Swamp. If not, see . -export { merge } from "../../infrastructure/stream/merge.ts"; +export { + merge, + mergeWithConcurrency, +} from "../../infrastructure/stream/merge.ts";