From fc928772ae22e3d294f98fab9b990afb4d1ae9b5 Mon Sep 17 00:00:00 2001 From: Rahul Joshi <186129212+crypticsaiyan@users.noreply.github.com> Date: Tue, 23 Jun 2026 15:28:43 +0530 Subject: [PATCH] fix(test): batch-run scheduler serializes after first wave create-batch --run launched its first concurrencyLimit triggers in parallel, but the steady-state loop awaited each subsequent job to fully finish (trigger + full --wait poll) before launching the next one. Effective concurrency dropped to 1 after the initial wave regardless of --max-concurrency. Switch to the launch-then-race pattern already used by the other three fan-outs in this file: launch up to the limit, relaunch on each completion via startNext(), never await a whole job inline. Add a regression test using equal-delay trigger responses so the first wave settles in the same microtask batch, which is the exact condition that exposed the bug. --- src/commands/test.test.ts | 97 +++++++++++++++++++++++++++++++++++++++ src/commands/test.ts | 59 ++++++++++-------------- 2 files changed, 120 insertions(+), 36 deletions(-) diff --git a/src/commands/test.test.ts b/src/commands/test.test.ts index 6921d57..3714db5 100644 --- a/src/commands/test.test.ts +++ b/src/commands/test.test.ts @@ -6694,6 +6694,103 @@ describe('runCreateBatch', () => { ).resolves.toBeDefined(); }); + // Regression test: create-batch --run must keep launching new triggers + // up to --max-concurrency as slots free up, not collapse to serial + // after the first wave. Uses equal-delay trigger responses so the + // first three jobs settle in the same microtask batch, the exact + // condition that exposed the bug (race only reacts to one settlement, + // then blocks the scheduler on the whole next job before moving on). + it('--run keeps concurrency at --max-concurrency for tail jobs, not just the first wave', async () => { + const { credentialsPath } = makeCreds(); + const specs = Array.from({ length: 6 }, (_, i) => ({ ...FE_SPEC, name: `spec-${i}` })); + const plansFile = writePlansJsonl(specs); + const CREATE_RESP = { + results: specs.map((_, i) => ({ + specIndex: i, + testId: `test_tail_${i}`, + status: 'created' as const, + })), + summary: { total: 6, created: 6, failed: 0 }, + }; + const TRIGGER_DELAY_MS = 60; + const limit = 3; + let activeCount = 0; + let triggerCallIndex = 0; + const activeAtStart: number[] = []; + + type FetchInput2 = Parameters[0]; + const fetchImpl = (async (input: FetchInput2) => { + const url = + typeof input === 'string' + ? input + : input instanceof URL + ? input.toString() + : (input as { url: string }).url; + if (url.includes('/tests/batch')) { + return new Response(JSON.stringify(CREATE_RESP), { + status: 200, + headers: { 'content-type': 'application/json' }, + }); + } + if (url.includes('/runs')) { + const callIdx = triggerCallIndex++; + activeCount++; + activeAtStart[callIdx] = activeCount; + await new Promise(resolve => setTimeout(resolve, TRIGGER_DELAY_MS)); + activeCount--; + return new Response( + JSON.stringify({ + runId: `run_tail_${callIdx}`, + status: 'queued' as const, + enqueuedAt: '2026-06-09T10:00:00.000Z', + codeVersion: 'v1', + targetUrl: 'https://example.com', + }), + { status: 200, headers: { 'content-type': 'application/json' } }, + ); + } + return new Response( + JSON.stringify({ + error: { code: 'NOT_FOUND', message: 'not found', nextAction: '', requestId: 'r1' }, + }), + { status: 404, headers: { 'content-type': 'application/json' } }, + ); + }) as typeof globalThis.fetch; + + try { + await runCreateBatch( + { + profile: 'default', + output: 'json', + debug: false, + plans: plansFile, + run: true, + wait: false, + dryRun: false, + maxConcurrency: limit, + }, + { + credentialsPath, + fetchImpl: fetchImpl as ReturnType, + stdout: () => undefined, + stderr: () => undefined, + }, + ); + } catch { + // CLIError exit 1 expected: trigger status is 'queued', not 'passed'. + } + + expect(activeAtStart).toHaveLength(6); + // First wave fills up to the limit; true under the bug too. + expect(Math.max(...activeAtStart.slice(0, limit))).toBe(limit); + // Tail jobs (index >= limit) must ALSO reach the concurrency limit. + // Under the bug, the scheduler blocks on each whole job after the + // first wave, so every tail job launches alone (active === 1). + for (const snapshot of activeAtStart.slice(limit)) { + expect(snapshot).toBe(limit); + } + }); + // Per codex round-1 P2: a 200 OK with `summary.created === 0` on a // non-empty batch must not exit 0. Without this, a misconfigured // batch job (every spec invalid) silently lands nothing in DDB while diff --git a/src/commands/test.ts b/src/commands/test.ts index e254958..3fea0b0 100644 --- a/src/commands/test.ts +++ b/src/commands/test.ts @@ -2753,43 +2753,30 @@ async function runBatchRun( }; } - // Bounded concurrency fan-out using a semaphore pattern. - // We process testIds one slot at a time up to the concurrency limit. - // This avoids pulling in p-limit; the logic is simple enough inline. - const remaining = [...testIds]; - const inFlight = new Set>(); - - async function drainOne(): Promise { - const testId = remaining.shift(); - if (testId === undefined) return; - const p = triggerOne(testId).then(result => { - batchRunResults.push(result); - inFlight.delete(p); - return result; - }); - inFlight.add(p); - await p; - } - - // Fill up to concurrencyLimit slots, then drain one before adding - // each new testId so we never exceed the limit. - const initial = Math.min(concurrencyLimit, testIds.length); - const startPromises: Promise[] = []; - for (let i = 0; i < initial; i++) { - startPromises.push(drainOne()); - } - // Process remaining items as slots free up. - while (remaining.length > 0) { - // Wait for any in-flight slot to free up. - if (inFlight.size > 0) { - await Promise.race(inFlight); - } - if (remaining.length > 0 && inFlight.size < concurrencyLimit) { - await drainOne(); + // Bounded concurrency fan-out: launch up to concurrencyLimit jobs, then + // launch the next one as each finishes. Mirrors the startNext() pattern + // used by the other fan-outs in this file (e.g. pollFreshAccepted below). + let nextIdx = 0; + let inFlight = 0; + + await new Promise((resolve, reject) => { + function startNext(): void { + while (inFlight < concurrencyLimit && nextIdx < testIds.length) { + const testId = testIds[nextIdx++]!; + inFlight++; + triggerOne(testId) + .then(result => { + batchRunResults.push(result); + inFlight--; + startNext(); + if (inFlight === 0 && nextIdx >= testIds.length) resolve(); + }) + .catch(reject); + } } - } - // Wait for all in-flight to finish. - await Promise.all([...inFlight, ...startPromises]); + startNext(); + if (testIds.length === 0) resolve(); + }); // Sort by testId order (same as input order for stable output). batchRunResults.sort((a, b) => testIds.indexOf(a.testId) - testIds.indexOf(b.testId));