Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 97 additions & 0 deletions src/commands/test.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6694,6 +6694,103 @@ describe('runCreateBatch', () => {
).resolves.toBeDefined();
});

// Regression guard for `create-batch --run`: the trigger fan-out must keep
// refilling freed slots up to --max-concurrency for the whole batch rather
// than degrading to one-at-a-time once the first wave has completed.
// Every trigger response is delayed by the same amount so the first wave
// settles together, which is the condition that exposed the bug (the
// scheduler reacted to a single settlement from the race, then awaited the
// entire next job before launching anything else).
it('--run keeps concurrency at --max-concurrency for tail jobs, not just the first wave', async () => {
  const specCount = 6;
  const maxConcurrency = 3;
  const triggerDelayMs = 60;

  const { credentialsPath } = makeCreds();
  const specs = Array.from({ length: specCount }, (_, i) => ({ ...FE_SPEC, name: `spec-${i}` }));
  const plansFile = writePlansJsonl(specs);

  // Canned body for the batch-create endpoint: every spec is reported created.
  const createResponseBody = {
    results: specs.map((_, i) => ({
      specIndex: i,
      testId: `test_tail_${i}`,
      status: 'created' as const,
    })),
    summary: { total: specCount, created: specCount, failed: 0 },
  };

  // Bookkeeping shared with the fake fetch below.
  let inFlightNow = 0;
  let triggersStarted = 0;
  // concurrencyAtLaunch[k] = number of trigger calls active (including the
  // k-th one itself) at the moment the k-th trigger call began.
  const concurrencyAtLaunch: number[] = [];

  type TailFetchInput = Parameters<typeof globalThis.fetch>[0];

  // Normalises the three accepted fetch input shapes to a URL string.
  const urlOf = (input: TailFetchInput): string => {
    if (typeof input === 'string') return input;
    if (input instanceof URL) return input.toString();
    return (input as { url: string }).url;
  };

  // Builds a JSON response with the given HTTP status.
  const jsonResponse = (status: number, payload: unknown): Response =>
    new Response(JSON.stringify(payload), {
      status,
      headers: { 'content-type': 'application/json' },
    });

  const fetchImpl = (async (input: TailFetchInput) => {
    const url = urlOf(input);

    if (url.includes('/tests/batch')) {
      return jsonResponse(200, createResponseBody);
    }

    if (url.includes('/runs')) {
      // Record the concurrency level seen by this call before yielding.
      const launchOrder = triggersStarted;
      triggersStarted += 1;
      inFlightNow += 1;
      concurrencyAtLaunch[launchOrder] = inFlightNow;

      await new Promise(resolve => setTimeout(resolve, triggerDelayMs));

      inFlightNow -= 1;
      return jsonResponse(200, {
        runId: `run_tail_${launchOrder}`,
        status: 'queued' as const,
        enqueuedAt: '2026-06-09T10:00:00.000Z',
        codeVersion: 'v1',
        targetUrl: 'https://example.com',
      });
    }

    // Anything else is answered with a structured 404.
    return jsonResponse(404, {
      error: { code: 'NOT_FOUND', message: 'not found', nextAction: '', requestId: 'r1' },
    });
  }) as typeof globalThis.fetch;

  const flags = {
    profile: 'default',
    output: 'json',
    debug: false,
    plans: plansFile,
    run: true,
    wait: false,
    dryRun: false,
    maxConcurrency,
  };
  const deps = {
    credentialsPath,
    fetchImpl: fetchImpl as ReturnType<typeof makeFetch>,
    stdout: () => undefined,
    stderr: () => undefined,
  };

  try {
    await runCreateBatch(flags, deps);
  } catch {
    // A CLIError (exit 1) is expected here: the triggers report 'queued',
    // not 'passed'. Only the launch pattern matters for this test.
  }

  const firstWave = concurrencyAtLaunch.slice(0, maxConcurrency);
  const tailJobs = concurrencyAtLaunch.slice(maxConcurrency);

  expect(concurrencyAtLaunch).toHaveLength(specCount);
  // The first wave reaches the limit with or without the bug.
  expect(Math.max(...firstWave)).toBe(maxConcurrency);
  // Jobs launched after the first wave must also start at full concurrency.
  // With the bug, the scheduler waits on each whole job once the first wave
  // is done, so every tail job would start alone (active === 1).
  for (const seenAtLaunch of tailJobs) {
    expect(seenAtLaunch).toBe(maxConcurrency);
  }
});

// Per codex round-1 P2: a 200 OK with `summary.created === 0` on a
// non-empty batch must not exit 0. Without this, a misconfigured
// batch job (every spec invalid) silently lands nothing in DDB while
Expand Down
59 changes: 23 additions & 36 deletions src/commands/test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2753,43 +2753,30 @@ async function runBatchRun(
};
}

// Bounded concurrency fan-out using a semaphore pattern.
// We process testIds one slot at a time up to the concurrency limit.
// This avoids pulling in p-limit; the logic is simple enough inline.
const remaining = [...testIds];
const inFlight = new Set<Promise<CliBatchRunResult>>();

async function drainOne(): Promise<void> {
const testId = remaining.shift();
if (testId === undefined) return;
const p = triggerOne(testId).then(result => {
batchRunResults.push(result);
inFlight.delete(p);
return result;
});
inFlight.add(p);
await p;
}

// Fill up to concurrencyLimit slots, then drain one before adding
// each new testId so we never exceed the limit.
const initial = Math.min(concurrencyLimit, testIds.length);
const startPromises: Promise<void>[] = [];
for (let i = 0; i < initial; i++) {
startPromises.push(drainOne());
}
// Process remaining items as slots free up.
while (remaining.length > 0) {
// Wait for any in-flight slot to free up.
if (inFlight.size > 0) {
await Promise.race(inFlight);
}
if (remaining.length > 0 && inFlight.size < concurrencyLimit) {
await drainOne();
// Bounded concurrency fan-out: launch up to concurrencyLimit jobs, then
// launch the next one as each finishes. Mirrors the startNext() pattern
// used by the other fan-outs in this file (e.g. pollFreshAccepted below).
let nextIdx = 0;
let inFlight = 0;

await new Promise<void>((resolve, reject) => {
function startNext(): void {
while (inFlight < concurrencyLimit && nextIdx < testIds.length) {
const testId = testIds[nextIdx++]!;
inFlight++;
triggerOne(testId)
.then(result => {
batchRunResults.push(result);
inFlight--;
startNext();
if (inFlight === 0 && nextIdx >= testIds.length) resolve();
})
.catch(reject);
}
}
}
// Wait for all in-flight to finish.
await Promise.all([...inFlight, ...startPromises]);
startNext();
if (testIds.length === 0) resolve();
});

// Sort by testId order (same as input order for stable output).
batchRunResults.sort((a, b) => testIds.indexOf(a.testId) - testIds.indexOf(b.testId));
Expand Down