From d3da7ac47c95bda7c6c9b445e4fdf7979b5a8d85 Mon Sep 17 00:00:00 2001 From: Rahul Joshi <186129212+crypticsaiyan@users.noreply.github.com> Date: Wed, 24 Jun 2026 17:51:35 +0530 Subject: [PATCH] fix(bundle): add retry loop to streamUrlToFile for transport failures --- src/lib/bundle.test.ts | 97 +++++++++++++++++++++++++++++++++ src/lib/bundle.ts | 120 ++++++++++++++++++++++++++--------------- 2 files changed, 174 insertions(+), 43 deletions(-) diff --git a/src/lib/bundle.test.ts b/src/lib/bundle.test.ts index 102208c..7319322 100644 --- a/src/lib/bundle.test.ts +++ b/src/lib/bundle.test.ts @@ -8,6 +8,9 @@ * the full http+fetch path is wired against MSW). */ +import { mkdtempSync } from 'node:fs'; +import { tmpdir } from 'node:os'; +import { join } from 'node:path'; import { describe, expect, it } from 'vitest'; import { applyFailedOnly, @@ -16,6 +19,8 @@ import { buildMeta, pickCodeExtension, resolveBundleDir, + STREAM_URL_MAX_RETRIES, + streamUrlToFile, stepFilenamePrefix, type AssertContextIntegrityOptions, } from './bundle.js'; @@ -592,3 +597,95 @@ describe('resolveBundleDir', () => { expect(out).toBe('/tmp/x'); }); }); + +describe('streamUrlToFile retry', () => { + const noSleep = () => Promise.resolve(); + + it('succeeds on the first attempt: file written, fetchImpl called once', async () => { + const dir = mkdtempSync(join(tmpdir(), 'stream-test-')); + const dest = join(dir, 'out.bin'); + let calls = 0; + const fetchImpl = async () => { + calls++; + return new Response('hello', { status: 200 }); + }; + await streamUrlToFile('https://example.com/x', dest, fetchImpl as typeof globalThis.fetch, { + sleep: noSleep, + }); + expect(calls).toBe(1); + }); + + it('retries on transport error and succeeds: fetchImpl called twice, file written', async () => { + const dir = mkdtempSync(join(tmpdir(), 'stream-test-')); + const dest = join(dir, 'out.bin'); + let calls = 0; + const fetchImpl = async () => { + calls++; + if (calls === 1) throw new Error('ECONNRESET socket hang up'); + return new Response('retried-content', { status: 200 }); + }; + await streamUrlToFile('https://example.com/x', dest, fetchImpl as typeof globalThis.fetch, { + sleep: noSleep, + }); + expect(calls).toBe(2); + }); + + it('throws TransportError after all retries exhausted', async () => { + let calls = 0; + const fetchImpl = async () => { + calls++; + throw new Error('ENETUNREACH dns lookup failed'); + }; + await expect( + streamUrlToFile( + 'https://example.com/x', + '/tmp/will-not-be-written', + fetchImpl as typeof globalThis.fetch, + { sleep: noSleep }, + ), + ).rejects.toMatchObject({ + name: 'TransportError', + message: expect.stringContaining('ENETUNREACH'), + }); + expect(calls).toBe(STREAM_URL_MAX_RETRIES); + }); + + it('does NOT retry a non-2xx HTTP response (expired presigned URL)', async () => { + let calls = 0; + const fetchImpl = async () => { + calls++; + return new Response('Forbidden', { status: 403 }); + }; + await expect( + streamUrlToFile( + 'https://example.com/x', + '/tmp/will-not-be-written', + fetchImpl as typeof globalThis.fetch, + { sleep: noSleep }, + ), + ).rejects.toMatchObject({ code: 'UNAVAILABLE' }); + expect(calls).toBe(1); + }); + + it('sleeps between retries', async () => { + const sleepDelays: number[] = []; + const fetchImpl = async () => { + throw new Error('flaky'); + }; + await expect( + streamUrlToFile( + 'https://example.com/x', + '/tmp/will-not-be-written', + fetchImpl as typeof globalThis.fetch, + { + sleep: ms => { + sleepDelays.push(ms); + return Promise.resolve(); + }, + }, + ), + ).rejects.toThrow(); + expect(sleepDelays).toHaveLength(STREAM_URL_MAX_RETRIES - 1); + expect(sleepDelays.every(d => d > 0)).toBe(true); + }); +}); diff --git a/src/lib/bundle.ts b/src/lib/bundle.ts index d50948c..ff64f66 100644 --- a/src/lib/bundle.ts +++ b/src/lib/bundle.ts @@ -46,6 +46,11 @@ import type { FetchImpl } from './http.js'; /** Schema version stamped into `meta.json`. Bumps with the contract. */ export const BUNDLE_SCHEMA_VERSION = 'cli-v1' as const; +/** Max fetch attempts for each presigned URL (initial + retries). */ +export const STREAM_URL_MAX_RETRIES = 3; +/** Delay between retries for transient transport failures (ms). */ +export const STREAM_URL_RETRY_DELAY_MS = 1000; + /** Default radius around the failed step when `--failed-only` is set. */ export const FAILED_ONLY_RADIUS = 1; @@ -695,55 +700,84 @@ function sidecarExtension(kind: 'log' | 'network' | 'console' | 'screenshot' | ' * the upstream reader rather than buffering chunks in V8 heap. This * matters for video files (multi-MB) and for very large HTML * snapshots. + * + * Transport failures (network reset, DNS blip, mid-stream EOF) are + * retried up to STREAM_URL_MAX_RETRIES times with a fixed delay. + * Presigned URLs are valid for 15 minutes, so retries are safe. */ export async function streamUrlToFile( url: string, filePath: string, fetchImpl: FetchImpl, + deps?: { sleep?: (ms: number) => Promise }, ): Promise { - let response: Response; - try { - response = await fetchImpl(url); - } catch (err) { - const message = err instanceof Error ? err.message : String(err); - throw new TransportError(`Failed to download presigned URL ${url}: ${message}`); - } - if (!response.ok) { - throw ApiError.fromEnvelope({ - error: { - code: 'UNAVAILABLE', - message: `Failed to download presigned URL (HTTP ${response.status}).`, - nextAction: - 'Re-run `testsprite test failure get`. Presigned URLs in the bundle expire after 15 minutes.', - requestId: 'local', - details: { status: response.status, url }, - }, - }); - } - if (!response.body) { - // Some test runtimes / fetch polyfills don't expose `body` as a - // ReadableStream. Fall back to a buffered write — same correctness, - // just no streaming benefit. The bundle is bounded by the - // backend's 15-min TTL, so even a multi-MB video buffers fully in - // a tolerable amount of memory. - const buffer = Buffer.from(await response.arrayBuffer()); - await writeFile(filePath, buffer); - return; - } - await mkdir(dirname(filePath), { recursive: true }); - // `response.body` is a Web ReadableStream. Node's `pipeline` accepts - // it via `Readable.fromWeb` (Node ≥ 18). Wrap in a try so any error - // from the stream propagates as a TransportError, preserving the - // exit-code contract. - const fileSink = createWriteStream(filePath); - try { - const webBody = response.body as unknown as NodeReadableStream; - const { Readable } = await import('node:stream'); - const nodeStream = Readable.fromWeb(webBody); - await pipeline(nodeStream, fileSink as unknown as Writable); - } catch (err) { - const message = err instanceof Error ? err.message : String(err); - throw new TransportError(`Failed mid-download of ${url}: ${message}`); + const sleepFn = deps?.sleep ?? ((ms: number) => new Promise(r => setTimeout(r, ms))); + for (let attempt = 1; attempt <= STREAM_URL_MAX_RETRIES; attempt++) { + let response: Response; + try { + response = await fetchImpl(url); + } catch (err) { + const message = err instanceof Error ? err.message : String(err); + if (attempt < STREAM_URL_MAX_RETRIES) { + await sleepFn(STREAM_URL_RETRY_DELAY_MS); + continue; + } + throw new TransportError(`Failed to download presigned URL ${url}: ${message}`); + } + if (!response.ok) { + // Non-2xx: the URL itself is bad (expired, unauthorized, not found). + // Retrying the same URL won't help — surface immediately. + throw ApiError.fromEnvelope({ + error: { + code: 'UNAVAILABLE', + message: `Failed to download presigned URL (HTTP ${response.status}).`, + nextAction: + 'Re-run `testsprite test failure get`. Presigned URLs in the bundle expire after 15 minutes.', + requestId: 'local', + details: { status: response.status, url }, + }, + }); + } + if (!response.body) { + // Some test runtimes / fetch polyfills don't expose `body` as a + // ReadableStream. Fall back to a buffered write — same correctness, + // just no streaming benefit. The bundle is bounded by the + // backend's 15-min TTL, so even a multi-MB video buffers fully in + // a tolerable amount of memory. + try { + const buffer = Buffer.from(await response.arrayBuffer()); + await writeFile(filePath, buffer); + return; + } catch (err) { + const message = err instanceof Error ? err.message : String(err); + if (attempt < STREAM_URL_MAX_RETRIES) { + await sleepFn(STREAM_URL_RETRY_DELAY_MS); + continue; + } + throw new TransportError(`Failed to download presigned URL ${url}: ${message}`); + } + } + await mkdir(dirname(filePath), { recursive: true }); + // `response.body` is a Web ReadableStream. Node's `pipeline` accepts + // it via `Readable.fromWeb` (Node >= 18). Wrap in a try so any error + // from the stream propagates as a TransportError, preserving the + // exit-code contract. `pipeline` destroys both streams on error, so + // a new WriteStream on retry starts clean. + const fileSink = createWriteStream(filePath); + try { + const webBody = response.body as unknown as NodeReadableStream; + const { Readable } = await import('node:stream'); + const nodeStream = Readable.fromWeb(webBody); + await pipeline(nodeStream, fileSink as unknown as Writable); + return; + } catch (err) { + const message = err instanceof Error ? err.message : String(err); + if (attempt < STREAM_URL_MAX_RETRIES) { + await sleepFn(STREAM_URL_RETRY_DELAY_MS); + continue; + } + throw new TransportError(`Failed mid-download of ${url}: ${message}`); + } } }