diff --git a/packages/backend/src/services/__tests__/probe-service.test.ts b/packages/backend/src/services/__tests__/probe-service.test.ts index 672a1b38..e88a8fc7 100644 --- a/packages/backend/src/services/__tests__/probe-service.test.ts +++ b/packages/backend/src/services/__tests__/probe-service.test.ts @@ -153,4 +153,97 @@ describe('ProbeService', () => { expect(result.success).toBe(false); expect(dispatcher.dispatch).not.toHaveBeenCalled(); }); + + test('runProbe cancels streaming response to release concurrency slot', async () => { + const cancelSpy = vi.fn(async () => {}); + const { usageStorage, dispatcher } = makeMocks(); + (dispatcher.dispatch as any).mockResolvedValueOnce({ + id: 'r', + model: 'test-model', + created: Date.now(), + content: null, + stream: { cancel: cancelSpy }, + usage: undefined, + plexus: { + provider: 'p1', + model: 'm1', + apiType: 'chat', + canonicalModel: 'm1', + attemptCount: 1, + }, + }); + + const svc = new ProbeService(dispatcher, usageStorage); + const result = await svc.runProbe({ + provider: 'p1', + model: 'm1', + apiType: 'chat', + source: 'background', + }); + + expect(result.success).toBe(true); + expect(cancelSpy).toHaveBeenCalledTimes(1); + }); + + test('runProbe skips cancellation when stream has no cancel method', async () => { + const { usageStorage, dispatcher } = makeMocks(); + (dispatcher.dispatch as any).mockResolvedValueOnce({ + id: 'r', + model: 'test-model', + created: Date.now(), + content: null, + stream: {}, + usage: undefined, + plexus: { + provider: 'p1', + model: 'm1', + apiType: 'chat', + canonicalModel: 'm1', + attemptCount: 1, + }, + }); + + const svc = new ProbeService(dispatcher, usageStorage); + const result = await svc.runProbe({ + provider: 'p1', + model: 'm1', + apiType: 'chat', + source: 'background', + }); + + expect(result.success).toBe(true); + }); + + test('runProbe swallows stream cancellation errors', async () => { + const cancelSpy = vi.fn(async () => { + throw new Error('stream already closed'); + }); + const { usageStorage, dispatcher } = makeMocks(); + (dispatcher.dispatch as any).mockResolvedValueOnce({ + id: 'r', + model: 'test-model', + created: Date.now(), + content: null, + stream: { cancel: cancelSpy }, + usage: undefined, + plexus: { + provider: 'p1', + model: 'm1', + apiType: 'chat', + canonicalModel: 'm1', + attemptCount: 1, + }, + }); + + const svc = new ProbeService(dispatcher, usageStorage); + const result = await svc.runProbe({ + provider: 'p1', + model: 'm1', + apiType: 'chat', + source: 'background', + }); + + expect(result.success).toBe(true); + expect(cancelSpy).toHaveBeenCalledTimes(1); + }); }); diff --git a/packages/backend/src/services/probe-service.ts b/packages/backend/src/services/probe-service.ts index 68abdd52..e534c4eb 100644 --- a/packages/backend/src/services/probe-service.ts +++ b/packages/backend/src/services/probe-service.ts @@ -232,6 +232,20 @@ export class ProbeService { response = await this.dispatcher.dispatch(unifiedRequest); } + // For streaming responses, cancel the stream to release the concurrency + // slot. The probe only needs routing metadata (plexus.*), not the stream + // content. Without this, the dispatcher's stream wrapper never fires its + // release callback, leaking the concurrency slot permanently. + if (response?.stream && typeof response.stream.cancel === 'function') { + try { + await response.stream.cancel(); + } catch (e) { + // Slot is already released by the dispatcher's wrapper before the + // underlying stream is cancelled; cancellation failure is non-fatal. + logger.warn('Probe stream cancellation failed (slot already released):', e); + } + } + const durationMs = Date.now() - startTime; usageRecord.provider = response.plexus?.provider;