Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 18 additions & 4 deletions src/helpers/audio.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { spawn } from 'node:child_process';
import { Readable } from 'node:stream';
import { pipeline, Readable } from 'node:stream';
import type { ReadableStream as NodeReadableStream } from 'node:stream/web';
import { platform, versions } from 'node:process';
import { checkFileSupport } from '../internal/uploads';

Expand Down Expand Up @@ -36,14 +37,27 @@ async function nodejsPlayAudio(stream: NodeJS.ReadableStream | Response | File):
try {
const ffplay = spawn('ffplay', ['-autoexit', '-nodisp', '-i', 'pipe:0']);

let source: NodeJS.ReadableStream;
if (isResponse(stream)) {
(stream.body! as any).pipe(ffplay.stdin);
const body = stream.body! as NodeReadableStream | NodeJS.ReadableStream;
if ('pipe' in body && typeof body.pipe === 'function') {
source = body;
} else {
source = Readable.fromWeb(body as NodeReadableStream);
}
} else if (isFile(stream)) {
Readable.from(stream.stream()).pipe(ffplay.stdin);
source = Readable.from(stream.stream());
} else {
stream.pipe(ffplay.stdin);
source = stream;
}

pipeline(source, ffplay.stdin, (error) => {
if (error) {
ffplay.kill();
reject(error);
}
});

ffplay.on('close', (code: number) => {
if (code !== 0) {
reject(new Error(`ffplay process exited with code ${code}`));
Expand Down
75 changes: 75 additions & 0 deletions tests/helpers/audio.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
jest.mock('node:child_process', () => ({ spawn: jest.fn() }));

import { spawn } from 'node:child_process';
import { Readable, Writable } from 'node:stream';
import { playAudio } from 'openai/helpers/audio';

const spawnMock = spawn as jest.MockedFunction<typeof spawn>;

function mockFfplay() {
const chunks: Buffer[] = [];
const stdin = new Writable({
write(chunk, _encoding, callback) {
chunks.push(Buffer.from(chunk));
callback();
},
});
const ffplay = {
stdin,
kill: jest.fn(),
on: jest.fn(),
};

ffplay.on.mockImplementation((event: string, listener: (code: number) => void) => {
if (event === 'close') {
if (stdin.writableEnded) {
queueMicrotask(() => listener(0));
} else {
stdin.on('finish', () => listener(0));
}
}
return ffplay;
});
spawnMock.mockReturnValue(ffplay as any);

return { chunks, ffplay };
}

describe('playAudio', () => {
afterEach(() => {
jest.resetAllMocks();
});

it('pipes a Response Web ReadableStream body to ffplay', async () => {
const { chunks } = mockFfplay();

await playAudio(new Response('hello'));

expect(spawnMock).toHaveBeenCalledWith('ffplay', ['-autoexit', '-nodisp', '-i', 'pipe:0']);
expect(Buffer.concat(chunks).toString()).toBe('hello');
});

it('keeps Node Readable response bodies on the Node stream path', async () => {
const { chunks } = mockFfplay();
const response = { body: Readable.from(['hello']) };

await playAudio(response as any);

expect(Buffer.concat(chunks).toString()).toBe('hello');
});

it('rejects and stops ffplay when a Response Web ReadableStream errors', async () => {
const { ffplay } = mockFfplay();
const response = new Response(
new ReadableStream({
start(controller) {
controller.error(new Error('stream failed'));
},
}),
);

await expect(playAudio(response)).rejects.toThrow('stream failed');

expect(ffplay.kill).toHaveBeenCalled();
});
});