From c6192fb0bf36fa15d88c6bf85fea384fb72c2da0 Mon Sep 17 00:00:00 2001
From: Sirn Thanabulpong
Date: Tue, 19 May 2026 16:01:47 +0900
Subject: [PATCH] fix(probe): cancel streaming response to release concurrency slot

Chat probes use stream: true to measure realistic TTFT/TPS, but the
ProbeService never consumed or cancelled the returned ReadableStream.
The dispatcher wraps streams so doRelease() only fires when the stream
is consumed, cancelled, or errored. Since the probe only reads
response.plexus metadata, the stream was silently abandoned, leaking
one concurrency slot per chat probe. Over time this exhausted
maxConcurrency and deadlocked the provider/model.

Cancel the stream after dispatch with error handling (logger.warn on
failure, since the slot is already released by the wrapper). The
cancellation runs after the probe duration is captured so that stream
teardown time is not recorded as probe latency.
---
Reviewer note: line breaks and indentation in this patch were
reconstructed from a whitespace-collapsed copy, so apply it with
`git apply --ignore-whitespace`. The post-image blob id on the
probe-service.ts index line predates moving the cancel block below the
durationMs capture.

 .../services/__tests__/probe-service.test.ts  | 93 +++++++++++++++++++
 .../backend/src/services/probe-service.ts     | 14 +++
 2 files changed, 107 insertions(+)

diff --git a/packages/backend/src/services/__tests__/probe-service.test.ts b/packages/backend/src/services/__tests__/probe-service.test.ts
index 672a1b38..e88a8fc7 100644
--- a/packages/backend/src/services/__tests__/probe-service.test.ts
+++ b/packages/backend/src/services/__tests__/probe-service.test.ts
@@ -153,4 +153,97 @@ describe('ProbeService', () => {
     expect(result.success).toBe(false);
     expect(dispatcher.dispatch).not.toHaveBeenCalled();
   });
+
+  test('runProbe cancels streaming response to release concurrency slot', async () => {
+    const cancelSpy = vi.fn(async () => {});
+    const { usageStorage, dispatcher } = makeMocks();
+    (dispatcher.dispatch as any).mockResolvedValueOnce({
+      id: 'r',
+      model: 'test-model',
+      created: Date.now(),
+      content: null,
+      stream: { cancel: cancelSpy },
+      usage: undefined,
+      plexus: {
+        provider: 'p1',
+        model: 'm1',
+        apiType: 'chat',
+        canonicalModel: 'm1',
+        attemptCount: 1,
+      },
+    });
+
+    const svc = new ProbeService(dispatcher, usageStorage);
+    const result = await svc.runProbe({
+      provider: 'p1',
+      model: 'm1',
+      apiType: 'chat',
+      source: 'background',
+    });
+
+    expect(result.success).toBe(true);
+    expect(cancelSpy).toHaveBeenCalledTimes(1);
+  });
+
+  test('runProbe skips cancellation when stream has no cancel method', async () => {
+    const { usageStorage, dispatcher } = makeMocks();
+    (dispatcher.dispatch as any).mockResolvedValueOnce({
+      id: 'r',
+      model: 'test-model',
+      created: Date.now(),
+      content: null,
+      stream: {},
+      usage: undefined,
+      plexus: {
+        provider: 'p1',
+        model: 'm1',
+        apiType: 'chat',
+        canonicalModel: 'm1',
+        attemptCount: 1,
+      },
+    });
+
+    const svc = new ProbeService(dispatcher, usageStorage);
+    const result = await svc.runProbe({
+      provider: 'p1',
+      model: 'm1',
+      apiType: 'chat',
+      source: 'background',
+    });
+
+    expect(result.success).toBe(true);
+  });
+
+  test('runProbe swallows stream cancellation errors', async () => {
+    const cancelSpy = vi.fn(async () => {
+      throw new Error('stream already closed');
+    });
+    const { usageStorage, dispatcher } = makeMocks();
+    (dispatcher.dispatch as any).mockResolvedValueOnce({
+      id: 'r',
+      model: 'test-model',
+      created: Date.now(),
+      content: null,
+      stream: { cancel: cancelSpy },
+      usage: undefined,
+      plexus: {
+        provider: 'p1',
+        model: 'm1',
+        apiType: 'chat',
+        canonicalModel: 'm1',
+        attemptCount: 1,
+      },
+    });
+
+    const svc = new ProbeService(dispatcher, usageStorage);
+    const result = await svc.runProbe({
+      provider: 'p1',
+      model: 'm1',
+      apiType: 'chat',
+      source: 'background',
+    });
+
+    expect(result.success).toBe(true);
+    expect(cancelSpy).toHaveBeenCalledTimes(1);
+  });
 });
diff --git a/packages/backend/src/services/probe-service.ts b/packages/backend/src/services/probe-service.ts
index 68abdd52..e534c4eb 100644
--- a/packages/backend/src/services/probe-service.ts
+++ b/packages/backend/src/services/probe-service.ts
@@ -232,6 +232,20 @@ export class ProbeService {
         response = await this.dispatcher.dispatch(unifiedRequest);
       }
 
       const durationMs = Date.now() - startTime;
+
+      // Cancel the stream to release the concurrency slot: the probe only needs
+      // routing metadata (plexus.*), and the dispatcher's stream wrapper fires
+      // its release callback only when the stream is consumed, cancelled, or
+      // errored. Done after durationMs so teardown is not counted as latency.
+      if (response?.stream && typeof response.stream.cancel === 'function') {
+        try {
+          await response.stream.cancel();
+        } catch (e) {
+          // Slot is already released by the dispatcher's wrapper before the
+          // underlying stream is cancelled; cancellation failure is non-fatal.
+          logger.warn('Probe stream cancellation failed (slot already released):', e);
+        }
+      }
 
       usageRecord.provider = response.plexus?.provider;