From fb2dc04427419da3e584008112bfc431b78f9ca3 Mon Sep 17 00:00:00 2001 From: Minh Vu Date: Tue, 23 Jun 2026 20:25:46 +0200 Subject: [PATCH 1/3] fix playAudio response streams --- src/helpers/audio.ts | 3 ++- tests/helpers/audio.test.ts | 50 +++++++++++++++++++++++++++++++++++++ 2 files changed, 52 insertions(+), 1 deletion(-) create mode 100644 tests/helpers/audio.test.ts diff --git a/src/helpers/audio.ts b/src/helpers/audio.ts index ecb6d74814..25d27d2e5b 100644 --- a/src/helpers/audio.ts +++ b/src/helpers/audio.ts @@ -1,5 +1,6 @@ import { spawn } from 'node:child_process'; import { Readable } from 'node:stream'; +import type { ReadableStream as NodeReadableStream } from 'node:stream/web'; import { platform, versions } from 'node:process'; import { checkFileSupport } from '../internal/uploads'; @@ -37,7 +38,7 @@ async function nodejsPlayAudio(stream: NodeJS.ReadableStream | Response | File): const ffplay = spawn('ffplay', ['-autoexit', '-nodisp', '-i', 'pipe:0']); if (isResponse(stream)) { - (stream.body! as any).pipe(ffplay.stdin); + Readable.fromWeb(stream.body! as NodeReadableStream).pipe(ffplay.stdin); } else if (isFile(stream)) { Readable.from(stream.stream()).pipe(ffplay.stdin); } else { diff --git a/tests/helpers/audio.test.ts b/tests/helpers/audio.test.ts new file mode 100644 index 0000000000..3f4790c1f7 --- /dev/null +++ b/tests/helpers/audio.test.ts @@ -0,0 +1,50 @@ +jest.mock('node:child_process', () => ({ spawn: jest.fn() })); + +import { spawn } from 'node:child_process'; +import { Writable } from 'node:stream'; +import { playAudio } from 'openai/helpers/audio'; + +const spawnMock = spawn as jest.MockedFunction; + +function mockFfplay() { + const chunks: Buffer[] = []; + const stdin = new Writable({ + write(chunk, _encoding, callback) { + chunks.push(Buffer.from(chunk)); + callback(); + }, + }); + const ffplay = { + stdin, + on: jest.fn(), + }; + + ffplay.on.mockImplementation((event: string, listener: (code: number) => void) => { + if (event === 'close') { + if (stdin.writableEnded) { + queueMicrotask(() => listener(0)); + } else { + stdin.on('finish', () => listener(0)); + } + } + return ffplay; + }); + spawnMock.mockReturnValue(ffplay as any); + + return { chunks }; +} + +describe('playAudio', () => { + afterEach(() => { + jest.resetAllMocks(); + }); + + it('pipes a Response Web ReadableStream body to ffplay', async () => { + const { chunks } = mockFfplay(); + + await playAudio(new Response('hello')); + + expect(spawnMock).toHaveBeenCalledWith('ffplay', ['-autoexit', '-nodisp', '-i', 'pipe:0']); + expect(Buffer.concat(chunks).toString()).toBe('hello'); + }); +}); From c9e3996a6672dddc8ff5bda4966687ea211bcd6f Mon Sep 17 00:00:00 2001 From: Minh Vu Date: Tue, 23 Jun 2026 20:39:04 +0200 Subject: [PATCH 2/3] preserve Node response streams --- src/helpers/audio.ts | 7 ++++++- tests/helpers/audio.test.ts | 11 ++++++++++- 2 files changed, 16 insertions(+), 2 deletions(-) diff --git a/src/helpers/audio.ts b/src/helpers/audio.ts index 25d27d2e5b..0f8f809845 100644 --- a/src/helpers/audio.ts +++ b/src/helpers/audio.ts @@ -38,7 +38,12 @@ async function nodejsPlayAudio(stream: NodeJS.ReadableStream | Response | File): const ffplay = spawn('ffplay', ['-autoexit', '-nodisp', '-i', 'pipe:0']); if (isResponse(stream)) { - Readable.fromWeb(stream.body! as NodeReadableStream).pipe(ffplay.stdin); + const body = stream.body! as NodeReadableStream | NodeJS.ReadableStream; + if ('pipe' in body && typeof body.pipe === 'function') { + body.pipe(ffplay.stdin); + } else { + Readable.fromWeb(body as NodeReadableStream).pipe(ffplay.stdin); + } } else if (isFile(stream)) { Readable.from(stream.stream()).pipe(ffplay.stdin); } else { diff --git a/tests/helpers/audio.test.ts b/tests/helpers/audio.test.ts index 3f4790c1f7..ed23d2f79a 100644 --- a/tests/helpers/audio.test.ts +++ b/tests/helpers/audio.test.ts @@ -1,7 +1,7 @@ jest.mock('node:child_process', () => ({ spawn: jest.fn() })); import { spawn } from 'node:child_process'; -import { Writable } from 'node:stream'; +import { Readable, Writable } from 'node:stream'; import { playAudio } from 'openai/helpers/audio'; const spawnMock = spawn as jest.MockedFunction; @@ -47,4 +47,13 @@ describe('playAudio', () => { expect(spawnMock).toHaveBeenCalledWith('ffplay', ['-autoexit', '-nodisp', '-i', 'pipe:0']); expect(Buffer.concat(chunks).toString()).toBe('hello'); }); + + it('keeps Node Readable response bodies on the Node stream path', async () => { + const { chunks } = mockFfplay(); + const response = { body: Readable.from(['hello']) }; + + await playAudio(response as any); + + expect(Buffer.concat(chunks).toString()).toBe('hello'); + }); }); From 1e81a787ca2a7000faf035cf4457d26755c9910b Mon Sep 17 00:00:00 2001 From: Hayden Date: Thu, 25 Jun 2026 10:13:20 -0700 Subject: [PATCH 3/3] fix(audio): handle playback stream errors --- src/helpers/audio.ts | 18 +++++++++++++----- tests/helpers/audio.test.ts | 18 +++++++++++++++++- 2 files changed, 30 insertions(+), 6 deletions(-) diff --git a/src/helpers/audio.ts b/src/helpers/audio.ts index 0f8f809845..2a6aad3c44 100644 --- a/src/helpers/audio.ts +++ b/src/helpers/audio.ts @@ -1,5 +1,5 @@ import { spawn } from 'node:child_process'; -import { Readable } from 'node:stream'; +import { pipeline, Readable } from 'node:stream'; import type { ReadableStream as NodeReadableStream } from 'node:stream/web'; import { platform, versions } from 'node:process'; import { checkFileSupport } from '../internal/uploads'; @@ -37,19 +37,27 @@ async function nodejsPlayAudio(stream: NodeJS.ReadableStream | Response | File): try { const ffplay = spawn('ffplay', ['-autoexit', '-nodisp', '-i', 'pipe:0']); + let source: NodeJS.ReadableStream; if (isResponse(stream)) { const body = stream.body! as NodeReadableStream | NodeJS.ReadableStream; if ('pipe' in body && typeof body.pipe === 'function') { - body.pipe(ffplay.stdin); + source = body; } else { - Readable.fromWeb(body as NodeReadableStream).pipe(ffplay.stdin); + source = Readable.fromWeb(body as NodeReadableStream); } } else if (isFile(stream)) { - Readable.from(stream.stream()).pipe(ffplay.stdin); + source = Readable.from(stream.stream()); } else { - stream.pipe(ffplay.stdin); + source = stream; } + pipeline(source, ffplay.stdin, (error) => { + if (error) { + ffplay.kill(); + reject(error); + } + }); + ffplay.on('close', (code: number) => { if (code !== 0) { reject(new Error(`ffplay process exited with code ${code}`)); diff --git a/tests/helpers/audio.test.ts b/tests/helpers/audio.test.ts index ed23d2f79a..6c2a3be875 100644 --- a/tests/helpers/audio.test.ts +++ b/tests/helpers/audio.test.ts @@ -16,6 +16,7 @@ function mockFfplay() { }); const ffplay = { stdin, + kill: jest.fn(), on: jest.fn(), }; @@ -31,7 +32,7 @@ function mockFfplay() { }); spawnMock.mockReturnValue(ffplay as any); - return { chunks }; + return { chunks, ffplay }; } describe('playAudio', () => { @@ -56,4 +57,19 @@ describe('playAudio', () => { expect(Buffer.concat(chunks).toString()).toBe('hello'); }); + + it('rejects and stops ffplay when a Response Web ReadableStream errors', async () => { + const { ffplay } = mockFfplay(); + const response = new Response( + new ReadableStream({ + start(controller) { + controller.error(new Error('stream failed')); + }, + }), + ); + + await expect(playAudio(response)).rejects.toThrow('stream failed'); + + expect(ffplay.kill).toHaveBeenCalled(); + }); });