diff --git a/src/helpers/audio.ts b/src/helpers/audio.ts index ecb6d7481..2a6aad3c4 100644 --- a/src/helpers/audio.ts +++ b/src/helpers/audio.ts @@ -1,5 +1,6 @@ import { spawn } from 'node:child_process'; -import { Readable } from 'node:stream'; +import { pipeline, Readable } from 'node:stream'; +import type { ReadableStream as NodeReadableStream } from 'node:stream/web'; import { platform, versions } from 'node:process'; import { checkFileSupport } from '../internal/uploads'; @@ -36,14 +37,27 @@ async function nodejsPlayAudio(stream: NodeJS.ReadableStream | Response | File): try { const ffplay = spawn('ffplay', ['-autoexit', '-nodisp', '-i', 'pipe:0']); + let source: NodeJS.ReadableStream; if (isResponse(stream)) { - (stream.body! as any).pipe(ffplay.stdin); + const body = stream.body! as NodeReadableStream | NodeJS.ReadableStream; + if ('pipe' in body && typeof body.pipe === 'function') { + source = body; + } else { + source = Readable.fromWeb(body as NodeReadableStream); + } } else if (isFile(stream)) { - Readable.from(stream.stream()).pipe(ffplay.stdin); + source = Readable.from(stream.stream()); } else { - stream.pipe(ffplay.stdin); + source = stream; } + pipeline(source, ffplay.stdin, (error) => { + if (error) { + ffplay.kill(); + reject(error); + } + }); + ffplay.on('close', (code: number) => { if (code !== 0) { reject(new Error(`ffplay process exited with code ${code}`)); diff --git a/tests/helpers/audio.test.ts b/tests/helpers/audio.test.ts new file mode 100644 index 000000000..6c2a3be87 --- /dev/null +++ b/tests/helpers/audio.test.ts @@ -0,0 +1,75 @@ +jest.mock('node:child_process', () => ({ spawn: jest.fn() })); + +import { spawn } from 'node:child_process'; +import { Readable, Writable } from 'node:stream'; +import { playAudio } from 'openai/helpers/audio'; + +const spawnMock = spawn as jest.MockedFunction; + +function mockFfplay() { + const chunks: Buffer[] = []; + const stdin = new Writable({ + write(chunk, _encoding, callback) { + chunks.push(Buffer.from(chunk)); + callback(); + }, + }); + const ffplay = { + stdin, + kill: jest.fn(), + on: jest.fn(), + }; + + ffplay.on.mockImplementation((event: string, listener: (code: number) => void) => { + if (event === 'close') { + if (stdin.writableEnded) { + queueMicrotask(() => listener(0)); + } else { + stdin.on('finish', () => listener(0)); + } + } + return ffplay; + }); + spawnMock.mockReturnValue(ffplay as any); + + return { chunks, ffplay }; +} + +describe('playAudio', () => { + afterEach(() => { + jest.resetAllMocks(); + }); + + it('pipes a Response Web ReadableStream body to ffplay', async () => { + const { chunks } = mockFfplay(); + + await playAudio(new Response('hello')); + + expect(spawnMock).toHaveBeenCalledWith('ffplay', ['-autoexit', '-nodisp', '-i', 'pipe:0']); + expect(Buffer.concat(chunks).toString()).toBe('hello'); + }); + + it('keeps Node Readable response bodies on the Node stream path', async () => { + const { chunks } = mockFfplay(); + const response = { body: Readable.from(['hello']) }; + + await playAudio(response as any); + + expect(Buffer.concat(chunks).toString()).toBe('hello'); + }); + + it('rejects and stops ffplay when a Response Web ReadableStream errors', async () => { + const { ffplay } = mockFfplay(); + const response = new Response( + new ReadableStream({ + start(controller) { + controller.error(new Error('stream failed')); + }, + }), + ); + + await expect(playAudio(response)).rejects.toThrow('stream failed'); + + expect(ffplay.kill).toHaveBeenCalled(); + }); +});