From c59077cc2e895064f532f26e8080292f5caefa56 Mon Sep 17 00:00:00 2001 From: Rahul Joshi <186129212+crypticsaiyan@users.noreply.github.com> Date: Tue, 23 Jun 2026 22:47:51 +0530 Subject: [PATCH] fix(test): dedupe and serialize chunked batch-rerun dispatch Batch-rerun chunks (>50 testIds) were dispatched concurrently via Promise.all. The backend's producer/teardown closure dedup happens per-request, not across requests, so two concurrent chunks sharing a project's producer could each independently decide to trigger it, double-running the producer or teardown. Dispatch chunks sequentially in both the initial and deferred-retry paths, closing the race at the source. Also dedupe the aggregated accepted[] by testId and merge closure.byProject across chunks as a defensive second layer, warning on stderr if a duplicate trigger is detected. Fixed a pre-existing test whose fixture relied on the old double-counting behavior (same accepted entry returned from every retry call). --- src/commands/test.rerun.spec.ts | 177 +++++++++++++++++++++++++++++-- src/commands/test.ts | 178 ++++++++++++++++++++++++-------- 2 files changed, 300 insertions(+), 55 deletions(-) diff --git a/src/commands/test.rerun.spec.ts b/src/commands/test.rerun.spec.ts index e33634b..86c8e89 100644 --- a/src/commands/test.rerun.spec.ts +++ b/src/commands/test.rerun.spec.ts @@ -2431,6 +2431,159 @@ describe('[fix-D] --all resolves >50 tests: chunked batch requests, aggregated r // Exactly 50 → 1 request only expect(batchCallCount).toBe(1); }); + + // Regression: chunked batch-rerun dispatched chunks via Promise.all, so + // when --all resolves >50 tests every chunk's request was in flight at + // once. BE producer/teardown closure dedup happens per-request, so two + // concurrent chunks sharing a project's producer could each independently + // trigger it. Chunks must be dispatched strictly one at a time. 
+ it('60 tests → 2 chunks are dispatched sequentially, not concurrently', async () => { + const creds = makeCreds(); + const allTests = Array.from({ length: 60 }, (_, i) => ({ + ...FE_TEST, + id: `test_seq_${String(i).padStart(3, '0')}`, + })); + const CHUNK_DELAY_MS = 40; + let activeBatchCalls = 0; + const activeAtStart: number[] = []; + + type FetchInput2 = Parameters<typeof globalThis.fetch>[0]; + const fetchImpl = (async (input: FetchInput2, init: RequestInit = {}) => { + const url = + typeof input === 'string' + ? input + : input instanceof URL + ? input.toString() + : (input as { url: string }).url; + if (url.includes('/tests') && !url.includes('batch') && !url.includes('/runs')) { + return new Response(JSON.stringify({ items: allTests, nextToken: null }), { + status: 200, + headers: { 'content-type': 'application/json' }, + }); + } + if (url.includes('/tests/batch/rerun')) { + activeBatchCalls++; + activeAtStart.push(activeBatchCalls); + const body = JSON.parse(init.body as string) as { testIds: string[] }; + await new Promise(resolve => setTimeout(resolve, CHUNK_DELAY_MS)); + activeBatchCalls--; + const accepted = body.testIds.map(tid => ({ + testId: tid, + runId: `run_${tid}`, + enqueuedAt: '2026-06-03T10:00:00.000Z', + })); + return new Response( + JSON.stringify({ + accepted, + deferred: [], + conflicts: [], + closure: { byProject: [] }, + } satisfies BatchRerunResponse), + { status: 202, headers: { 'content-type': 'application/json' } }, + ); + } + return new Response( + JSON.stringify({ + error: { code: 'NOT_FOUND', message: 'not found', nextAction: '', requestId: 'r1' }, + }), + { status: 404, headers: { 'content-type': 'application/json' } }, + ); + }) as typeof globalThis.fetch; + + await runTestRerun( + { + testIds: [], + all: true, + projectId: 'project_abc', + wait: false, + timeoutSeconds: 600, + autoHeal: false, + autoHealExplicit: false, + skipDependencies: false, + maxConcurrency: 10, + output: 'json', + profile: 'default', + dryRun: false, + debug: false, 
verbose: false, + }, + { ...creds, sleep: instantSleep, fetchImpl }, + ); + + expect(activeAtStart).toEqual([1, 1]); + }); + + // Regression: even with sequential dispatch, defend the CLI's own + // accounting against a shared BE producer/teardown coming back accepted + // from more than one chunk (a different runId each time). Duplicate + // testIds must be deduped, not double-counted or double-polled. + it('a testId accepted by two chunks is deduped, kept once, and warned about', async () => { + const creds = makeCreds(); + const allTests = Array.from({ length: 60 }, (_, i) => ({ + ...FE_TEST, + id: `test_dup_${String(i).padStart(3, '0')}`, + })); + let batchCallCount = 0; + const stderrLines: string[] = []; + + const fetchImpl = makeFetch((url, init) => { + if (url.includes('/tests') && !url.includes('batch') && !url.includes('/runs')) { + return { body: { items: allTests, nextToken: null } }; + } + if (url.includes('/tests/batch/rerun')) { + batchCallCount++; + const body = JSON.parse(init.body as string) as { testIds: string[] }; + const accepted = body.testIds.map(tid => ({ + testId: tid, + runId: `run_${tid}_call${batchCallCount}`, + enqueuedAt: '2026-06-03T10:00:00.000Z', + })); + // Simulate a shared BE producer (not one of the 60 selected ids) + // that both chunks' server-side closure expansion independently + // decided to trigger, each with its own runId. 
+ accepted.push({ + testId: 'test_dup_producer', + runId: `run_producer_call${batchCallCount}`, + enqueuedAt: '2026-06-03T10:00:00.000Z', + }); + return { + status: 202, + body: { + accepted, + deferred: [], + conflicts: [], + closure: { byProject: [] }, + } satisfies BatchRerunResponse, + }; + } + return errorBody('NOT_FOUND'); + }); + + await runTestRerun( + { + testIds: [], + all: true, + projectId: 'project_abc', + wait: false, + timeoutSeconds: 600, + autoHeal: false, + autoHealExplicit: false, + skipDependencies: false, + maxConcurrency: 10, + output: 'json', + profile: 'default', + dryRun: false, + debug: false, + verbose: false, + }, + { ...creds, sleep: instantSleep, fetchImpl, stderr: line => stderrLines.push(line) }, + ); + + expect(batchCallCount).toBe(2); + expect( + stderrLines.some(l => l.includes('triggered more than once') && l.includes('1 test')), + ).toBe(true); + }); }); // --------------------------------------------------------------------------- @@ -3055,20 +3208,21 @@ describe('D3: batch rerun summary surfaces deferred + conflicts', () => { it('--wait JSON summary includes deferred/conflicts counts (no silent undercount)', async () => { const creds = makeCreds(); // Initial dispatch: 1 accepted, 1 deferred, 1 conflict. - // D3 retry loop will fire (opts.wait=true). All 3 retry attempts return the - // same deferred entry so `deferred` never drains, and each retry's `accepted` - // entry (same test_1 / run_b1) is merged in. After the loop, accepted has - // 4 entries (1 original + 3 retries) and deferred still has 1 entry. + // D3 retry loop will fire (opts.wait=true). The retry request only ever + // re-asks about the still-deferred testId (test_deferred), so a + // realistic retry response never re-returns test_1 as newly accepted. + // All 3 retry attempts keep returning the same deferred entry so + // `deferred` never drains. 
const initialBatchResp: BatchRerunResponse = { accepted: [{ testId: 'test_1', runId: 'run_b1', enqueuedAt: '2026-06-03T10:00:00.000Z' }], deferred: [{ testId: 'test_deferred', reason: 'rate_limited' }], conflicts: [{ testId: 'test_conf', currentRunId: 'run_conf' }], closure: { byProject: [] }, }; - // Retry responses: keep returning 1 deferred + 1 accepted (same run) so - // the loop exhausts MAX_DEFERRED_RETRIES and falls through. + // Retry responses: keep returning 1 deferred so the loop exhausts + // MAX_DEFERRED_RETRIES and falls through. No new accepted entries. const retryBatchResp: BatchRerunResponse = { - accepted: [{ testId: 'test_1', runId: 'run_b1', enqueuedAt: '2026-06-03T10:00:00.000Z' }], + accepted: [], deferred: [{ testId: 'test_deferred', reason: 'rate_limited' }], conflicts: [], closure: { byProject: [] }, @@ -3120,15 +3274,16 @@ describe('D3: batch rerun summary surfaces deferred + conflicts', () => { const withSummary = printed.find(p => p.summary); expect(withSummary).toBeDefined(); - // After D3 retries: accepted = 4 entries (same run_b1 merged 4 times); - // all 4 poll as passed. deferred = 1 (still undrained). conflicts = 1 (from initial). + // After D3 retries: accepted = 1 entry (test_1 from the initial dispatch; + // retries never re-return it). deferred = 1 (still undrained). conflicts + // = 1 (from initial). 
expect(withSummary!.summary).toMatchObject({ - passed: 4, + passed: 1, failed: 0, timedOut: 0, deferred: 1, conflicts: 1, - total: 4, + total: 1, }); }); }); diff --git a/src/commands/test.ts b/src/commands/test.ts index e254958..b625079 100644 --- a/src/commands/test.ts +++ b/src/commands/test.ts @@ -51,6 +51,7 @@ import type { RerunResponse, BatchRerunResponse, BatchRerunAccepted, + BatchRerunClosureByProject, RerunClosureMember, ListRunsResponse, RunHistoryItem, @@ -1700,6 +1701,65 @@ const MAX_BATCH_BODY_BYTES = 5 * 1024 * 1024; */ const MAX_BATCH_RERUN_IDS = 50; +/** + * Drop duplicate `testId` entries from a chunked batch-rerun's aggregated + * `accepted[]`, keeping the first occurrence. BE producer/teardown closure + * dedup happens per-request server-side, not across the separate requests + * produced by chunking (one per `MAX_BATCH_RERUN_IDS` window), so the same producer + * can come back accepted (with a different runId) from more than one + * chunk. Returns the deduped list plus how many entries were dropped, so + * the caller can warn the operator that a shared BE producer/teardown was + * triggered more than once. + */ +function dedupeBatchRerunAccepted(entries: BatchRerunAccepted[]): { + deduped: BatchRerunAccepted[]; + droppedCount: number; +} { + const seen = new Map(); + let droppedCount = 0; + for (const entry of entries) { + if (seen.has(entry.testId)) { + droppedCount++; + continue; + } + seen.set(entry.testId, entry); + } + return { deduped: [...seen.values()], droppedCount }; +} + +/** + * Merge per-project closure summaries from multiple batch-rerun chunk + * responses, combining entries that share a `projectId` rather than + * leaving one entry per chunk. 
`testIds` / `addedProducers` / + * `addedTeardowns` are unioned (a producer present in two chunks' entries + * for the same project — the closure-dedup race this fixes — must not be + * counted twice); `clearedCaptured` is summed because each chunk's expansion is a + * disjoint operation, so its count is additive. + */ +function mergeBatchRerunClosureByProject( + entries: BatchRerunClosureByProject[], +): BatchRerunClosureByProject[] { + const byProject = new Map(); + for (const entry of entries) { + const existing = byProject.get(entry.projectId); + if (!existing) { + byProject.set(entry.projectId, { + projectId: entry.projectId, + testIds: [...new Set(entry.testIds)], + addedProducers: [...new Set(entry.addedProducers)], + addedTeardowns: [...new Set(entry.addedTeardowns)], + clearedCaptured: entry.clearedCaptured, + }); + continue; + } + existing.testIds = [...new Set([...existing.testIds, ...entry.testIds])]; + existing.addedProducers = [...new Set([...existing.addedProducers, ...entry.addedProducers])]; + existing.addedTeardowns = [...new Set([...existing.addedTeardowns, ...entry.addedTeardowns])]; + existing.clearedCaptured += entry.clearedCaptured; + } + return [...byProject.values()]; +} + /** * Default max in-flight run-triggers for `create-batch --run`. * @@ -6147,20 +6207,28 @@ export async function runTestRerun( let chunkResponses: BatchRerunResponse[]; try { - chunkResponses = await Promise.all( - chunks.map((chunk, idx) => { - const chunkKey = chunks.length === 1 ? idempotencyKey : `${idempotencyKey}:chunk${idx}`; - return client.triggerBatchRerun( - { - source: 'cli', - testIds: chunk, - ...(effectiveAutoHeal ? { autoHeal: true } : {}), - ...(opts.skipDependencies ? { skipDependencies: true } : {}), - }, - { idempotencyKey: chunkKey }, - ); - }), - ); + // Dispatch chunks one at a time, NOT via Promise.all. BE producer/ + // teardown closure dedup happens per-request, server-side. 
Two chunks + // that share a project's producer fired concurrently can each decide + // independently "this producer hasn't been added yet" and both trigger + // it, double-running the producer. Sequential dispatch closes that + // race: by the time chunk N is sent, chunk N-1's trigger has already + // landed server-side for it to dedup against. + chunkResponses = []; + for (let idx = 0; idx < chunks.length; idx++) { + const chunk = chunks[idx]!; + const chunkKey = chunks.length === 1 ? idempotencyKey : `${idempotencyKey}:chunk${idx}`; + const chunkResp = await client.triggerBatchRerun( + { + source: 'cli', + testIds: chunk, + ...(effectiveAutoHeal ? { autoHeal: true } : {}), + ...(opts.skipDependencies ? { skipDependencies: true } : {}), + }, + { idempotencyKey: chunkKey }, + ); + chunkResponses.push(chunkResp); + } } catch (err) { // D2 (dogfood): the batch endpoint rejects the WHOLE request when any id is // unresolvable (unknown, cross-tenant, or never ran cleanly), so one bad id @@ -6183,12 +6251,24 @@ export async function runTestRerun( } // Aggregate chunk responses into a single synthetic BatchRerunResponse. + // `accepted` is deduped by testId (defense in depth: even with sequential + // dispatch above, a shared producer/teardown should never be reported, or + // polled under --wait, more than once) and `closure.byProject` entries + // sharing a projectId are merged rather than left as separate per-chunk + // entries. + const { deduped: dedupedAccepted, droppedCount: duplicateAcceptedCount } = + dedupeBatchRerunAccepted(chunkResponses.flatMap(r => r.accepted)); + if (duplicateAcceptedCount > 0) { + stderrFn( + `[warn] ${duplicateAcceptedCount} test${duplicateAcceptedCount !== 1 ? 
's were' : ' was'} triggered more than once across chunked batch-rerun requests (shared BE producer/teardown); kept the first run, ignored the rest.`, + ); + } const batchResp: BatchRerunResponse = { - accepted: chunkResponses.flatMap(r => r.accepted), + accepted: dedupedAccepted, deferred: chunkResponses.flatMap(r => r.deferred), conflicts: chunkResponses.flatMap(r => r.conflicts), closure: { - byProject: chunkResponses.flatMap(r => r.closure.byProject), + byProject: mergeBatchRerunClosureByProject(chunkResponses.flatMap(r => r.closure.byProject)), }, notFound: chunkResponses.flatMap(r => r.notFound ?? []), }; @@ -6284,32 +6364,36 @@ export async function runTestRerun( let retryChunkResponses: BatchRerunResponse[]; try { - retryChunkResponses = await Promise.all( - retryChunks.map((chunk, idx) => { - // [P2] Bound the derived key to ≤256 chars. Caller-supplied keys may - // be up to 256 chars; appending the suffix could exceed the server - // limit and cause every retry to be rejected. Truncate the base key - // to leave room for the longest possible suffix before concatenating. - const retrySuffix = - retryChunks.length === 1 - ? `:deferred-retry${attempt}` - : `:deferred-retry${attempt}:chunk${idx}`; - const retryBase = - idempotencyKey.length + retrySuffix.length > 256 - ? idempotencyKey.slice(0, 256 - retrySuffix.length) - : idempotencyKey; - const retryKey = `${retryBase}${retrySuffix}`; - return client.triggerBatchRerun( - { - source: 'cli', - testIds: chunk, - ...(effectiveAutoHeal ? { autoHeal: true } : {}), - ...(opts.skipDependencies ? { skipDependencies: true } : {}), - }, - { idempotencyKey: retryKey }, - ); - }), - ); + // Sequential, same reason as the initial dispatch above: concurrent + // chunks racing on per-request server-side closure dedup can + // double-trigger a shared BE producer/teardown. 
+ retryChunkResponses = []; + for (let idx = 0; idx < retryChunks.length; idx++) { + const chunk = retryChunks[idx]!; + // [P2] Bound the derived key to ≤256 chars. Caller-supplied keys may + // be up to 256 chars; appending the suffix could exceed the server + // limit and cause every retry to be rejected. Truncate the base key + // to leave room for the longest possible suffix before concatenating. + const retrySuffix = + retryChunks.length === 1 + ? `:deferred-retry${attempt}` + : `:deferred-retry${attempt}:chunk${idx}`; + const retryBase = + idempotencyKey.length + retrySuffix.length > 256 + ? idempotencyKey.slice(0, 256 - retrySuffix.length) + : idempotencyKey; + const retryKey = `${retryBase}${retrySuffix}`; + const retryChunkResp = await client.triggerBatchRerun( + { + source: 'cli', + testIds: chunk, + ...(effectiveAutoHeal ? { autoHeal: true } : {}), + ...(opts.skipDependencies ? { skipDependencies: true } : {}), + }, + { idempotencyKey: retryKey }, + ); + retryChunkResponses.push(retryChunkResp); + } } catch (err) { stderrFn( `[deferred-retry] attempt ${attempt} failed with error: ${err instanceof Error ? err.message : String(err)}`, @@ -6317,7 +6401,8 @@ export async function runTestRerun( break; } - const newlyAccepted = retryChunkResponses.flatMap(r => r.accepted); + const { deduped: newlyAccepted, droppedCount: newlyDuplicateCount } = + dedupeBatchRerunAccepted(retryChunkResponses.flatMap(r => r.accepted)); const newlyDeferred = retryChunkResponses.flatMap(r => r.deferred); const newlyConflicted = retryChunkResponses.flatMap(r => r.conflicts); // [P2] Collect notFound[] from the retry response. A deferred test may be @@ -6326,11 +6411,16 @@ export async function runTestRerun( // reported as "resolved" in the final output. const newlyNotFound = retryChunkResponses.flatMap(r => r.notFound ?? []); + if (newlyDuplicateCount > 0) { + stderrFn( + `[warn] ${newlyDuplicateCount} test${newlyDuplicateCount !== 1 ? 
's were' : ' was'} triggered more than once across deferred-retry chunked requests (shared BE producer/teardown); kept the first run, ignored the rest.`, + ); + } if (newlyAccepted.length > 0) { stderrFn( `[deferred-retry] attempt ${attempt}: ${newlyAccepted.length} test${newlyAccepted.length !== 1 ? 's' : ''} now accepted.`, ); - accepted = accepted.concat(newlyAccepted); + accepted = dedupeBatchRerunAccepted(accepted.concat(newlyAccepted)).deduped; } if (newlyConflicted.length > 0) { // [P1] Merge retry-returned conflicts into the running conflicts collection