From db8d16462324194dd33d900509b6e87e07de6839 Mon Sep 17 00:00:00 2001 From: "rosetta-livekit-bot[bot]" <282703043+rosetta-livekit-bot[bot]@users.noreply.github.com> Date: Fri, 12 Jun 2026 04:35:56 +0000 Subject: [PATCH] fix(voice): avoid AgentTask close deadlock --- .changeset/fix-agent-task-close-deadlock.md | 5 ++++ agents/src/voice/agent.test.ts | 3 +++ agents/src/voice/agent.ts | 9 ++++++- agents/src/voice/agent_activity.ts | 27 ++++++++++++++++--- agents/src/voice/agent_session.ts | 5 ++++ .../src/voice/agent_session_handoff.test.ts | 17 +++++------- 6 files changed, 51 insertions(+), 15 deletions(-) create mode 100644 .changeset/fix-agent-task-close-deadlock.md diff --git a/.changeset/fix-agent-task-close-deadlock.md b/.changeset/fix-agent-task-close-deadlock.md new file mode 100644 index 000000000..53e423c08 --- /dev/null +++ b/.changeset/fix-agent-task-close-deadlock.md @@ -0,0 +1,5 @@ +--- +'@livekit/agents': patch +--- + +fix(voice): avoid AgentTask handoff deadlock during session close diff --git a/agents/src/voice/agent.test.ts b/agents/src/voice/agent.test.ts index e644ca043..4df0e31b1 100644 --- a/agents/src/voice/agent.test.ts +++ b/agents/src/voice/agent.test.ts @@ -391,6 +391,7 @@ describe('Agent', () => { agent: oldAgent, agentSession: mockSession, _onEnterTask: undefined, + _addDrainBlockedTasks: vi.fn(), llm: undefined, close: async () => {}, }; @@ -432,6 +433,7 @@ describe('Agent', () => { agent: oldAgent, agentSession: mockSession, _onEnterTask: undefined, + _addDrainBlockedTasks: vi.fn(), llm: undefined, close: async () => {}, }; @@ -544,6 +546,7 @@ describe('Agent', () => { agent: oldAgent, agentSession: mockSession, _onEnterTask: undefined, + _addDrainBlockedTasks: vi.fn(), llm: undefined, close: closeOldActivity, }; diff --git a/agents/src/voice/agent.ts b/agents/src/voice/agent.ts index f65d45425..528ba2e50 100644 --- a/agents/src/voice/agent.ts +++ b/agents/src/voice/agent.ts @@ -654,6 +654,10 @@ export class AgentTask extends Agent extends Agent; // Maps response_id to OTEL span for metrics recording @@ -205,7 +206,7 @@ export class AgentActivity implements RecognitionHooks { private _schedulingPaused = true; private newTurnsBlocked = false; private _authorizationPaused = false; - private _drainBlockedTasks: Task[] = []; + private _drainBlockedTasks: Set> = new Set(); private _currentSpeech?: SpeechHandle; private speechQueue: Heap<[number, number, SpeechHandle]>; // [priority, timestamp, speechHandle] private q_updated: Future; @@ -1819,7 +1820,7 @@ export class AgentActivity implements RecognitionHooks { const toWait: Task[] = []; for (const task of this.speechTasks) { - if (this._drainBlockedTasks.includes(task)) { + if (this._drainBlockedTasks.has(task)) { continue; } @@ -3771,11 +3772,21 @@ export class AgentActivity implements RecognitionHooks { this.wakeupMainTask(); } + /** @internal */ + _addDrainBlockedTasks(tasks: Task[]): void { + for (const task of tasks) { + this._drainBlockedTasks.add(task); + } + this.wakeupMainTask(); + } + private async _pauseSchedulingTask(blockedTasks: Task[]): Promise { if (this._schedulingPaused) return; this._schedulingPaused = true; - this._drainBlockedTasks = blockedTasks; + if (blockedTasks.length > 0) { + this._addDrainBlockedTasks(blockedTasks); + } this.wakeupMainTask(); if (this._mainTask) { @@ -3791,6 +3802,7 @@ export class AgentActivity implements RecognitionHooks { this._schedulingPaused = false; this.newTurnsBlocked = false; + this._drainBlockedTasks.clear(); this._mainTask = Task.from(({ signal }) => this.mainTask(signal)); } @@ -3804,6 +3816,10 @@ export class AgentActivity implements RecognitionHooks { const unlock = await this.lock.lock(); try { + if (this.closed) { + return undefined; + } + const span = tracer.startSpan({ name: 'pause_agent_activity', attributes: { [traceTypes.ATTR_AGENT_LABEL]: this.agent.id }, @@ -3884,6 +3900,11 @@ export class AgentActivity implements RecognitionHooks { async close(): Promise { const unlock = await this.lock.lock(); try { + if (this.closed) { + return; + } + this.closed = true; + this.cancelPreemptiveGeneration(); await cancelAndWait(Array.from(this.speechTasks), AgentActivity.REPLY_TASK_CANCEL_TIMEOUT); diff --git a/agents/src/voice/agent_session.ts b/agents/src/voice/agent_session.ts index d308df854..9e6bd3dbd 100644 --- a/agents/src/voice/agent_session.ts +++ b/agents/src/voice/agent_session.ts @@ -1053,6 +1053,11 @@ export class AgentSession< let reusableResources: ReusableResources | undefined; try { + if (this.closing && newActivity === 'start') { + this.logger.warn({ agentId: agent.id }, 'Session is closing, skipping start of activity'); + return; + } + this.agent = agent; const prevActivityObj = this.activity; diff --git a/agents/src/voice/agent_session_handoff.test.ts b/agents/src/voice/agent_session_handoff.test.ts index 5690f7412..c0585ccfc 100644 --- a/agents/src/voice/agent_session_handoff.test.ts +++ b/agents/src/voice/agent_session_handoff.test.ts @@ -223,18 +223,14 @@ describe('AgentSession reusable resources handoff', () => { expect(item.newAgentId).toBe(nextAgent.id); }); - it('skips starting a new activity while the session is closing and cleans up resources', async () => { - const closeFn = vi.fn(async () => {}); - const resources: ReusableResources = { - sttPipeline: { close: closeFn } as any, - }; + it('skips starting a new activity while the session is closing', async () => { const previousAgent = new Agent({ instructions: 'old' }); const nextAgent = new Agent({ instructions: 'new' }); const previousActivity = { agent: previousAgent, - drain: vi.fn(async () => resources), + drain: vi.fn(async () => undefined), close: vi.fn(async () => {}), - pause: vi.fn(async () => resources), + pause: vi.fn(async () => undefined), }; const startSpy = vi.spyOn(AgentActivity.prototype, 'start').mockResolvedValue(undefined); @@ -249,11 +245,10 @@ describe('AgentSession reusable resources handoff', () => { waitOnEnter: false, }); - expect(previousActivity.drain).toHaveBeenCalledTimes(1); - expect(previousActivity.close).toHaveBeenCalledTimes(1); - expect(closeFn).toHaveBeenCalledTimes(1); + expect(previousActivity.drain).not.toHaveBeenCalled(); + expect(previousActivity.close).not.toHaveBeenCalled(); expect(startSpy).not.toHaveBeenCalled(); - expect((session as any).activity).toBeUndefined(); + expect((session as any).activity).toBe(previousActivity); expect((session as any).nextActivity).toBeUndefined(); } finally { startSpy.mockRestore();