Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/fix-agent-task-close-deadlock.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@livekit/agents': patch
---

fix(voice): avoid AgentTask handoff deadlock during session close
3 changes: 3 additions & 0 deletions agents/src/voice/agent.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,7 @@ describe('Agent', () => {
agent: oldAgent,
agentSession: mockSession,
_onEnterTask: undefined,
_addDrainBlockedTasks: vi.fn(),
llm: undefined,
close: async () => {},
};
Expand Down Expand Up @@ -432,6 +433,7 @@ describe('Agent', () => {
agent: oldAgent,
agentSession: mockSession,
_onEnterTask: undefined,
_addDrainBlockedTasks: vi.fn(),
llm: undefined,
close: async () => {},
};
Expand Down Expand Up @@ -544,6 +546,7 @@ describe('Agent', () => {
agent: oldAgent,
agentSession: mockSession,
_onEnterTask: undefined,
_addDrainBlockedTasks: vi.fn(),
llm: undefined,
close: closeOldActivity,
};
Expand Down
9 changes: 8 additions & 1 deletion agents/src/voice/agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -654,6 +654,10 @@ export class AgentTask<ResultT = unknown, UserData = any> extends Agent<UserData
blockedTasks.push(onEnterTask);
}

// Register before any await so a concurrent drain (e.g. session close)
// won't wait for tasks blocked on this handoff.
oldActivity._addDrainBlockedTasks(blockedTasks);

if (
taskInfo.functionCall &&
oldActivity.llm instanceof RealtimeModel &&
Expand Down Expand Up @@ -690,7 +694,10 @@ export class AgentTask<ResultT = unknown, UserData = any> extends Agent<UserData
// runState could have changed after future resolved
runState = session._globalRunState;

if (session.currentAgent !== this) {
if (session._closing && this._agentActivity === undefined) {
// The activity never started because the session is closing; the close path
// owns the previous activity.
} else if (session.currentAgent !== this) {
this.#logger.warn(
`${this.constructor.name} completed, but the agent has changed in the meantime. ` +
`Ignoring handoff to the previous agent, likely due to AgentSession.updateAgent being invoked.`,
Expand Down
27 changes: 24 additions & 3 deletions agents/src/voice/agent_activity.ts
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ export class AgentActivity implements RecognitionHooks {
private static readonly REPLY_TASK_CANCEL_TIMEOUT = 5000;

private started = false;
private closed = false;
private audioRecognition?: AudioRecognition;
private realtimeSession?: RealtimeSession;
private realtimeSpans?: Map<string, Span>; // Maps response_id to OTEL span for metrics recording
Expand All @@ -205,7 +206,7 @@ export class AgentActivity implements RecognitionHooks {
private _schedulingPaused = true;
private newTurnsBlocked = false;
private _authorizationPaused = false;
private _drainBlockedTasks: Task<any>[] = [];
private _drainBlockedTasks: Set<Task<any>> = new Set();
private _currentSpeech?: SpeechHandle;
private speechQueue: Heap<[number, number, SpeechHandle]>; // [priority, timestamp, speechHandle]
private q_updated: Future<void, never>;
Expand Down Expand Up @@ -1819,7 +1820,7 @@ export class AgentActivity implements RecognitionHooks {

const toWait: Task<void>[] = [];
for (const task of this.speechTasks) {
if (this._drainBlockedTasks.includes(task)) {
if (this._drainBlockedTasks.has(task)) {
continue;
}

Expand Down Expand Up @@ -3771,11 +3772,21 @@ export class AgentActivity implements RecognitionHooks {
this.wakeupMainTask();
}

/** @internal */
_addDrainBlockedTasks(tasks: Task<any>[]): void {
for (const task of tasks) {
this._drainBlockedTasks.add(task);
}
this.wakeupMainTask();
}

private async _pauseSchedulingTask(blockedTasks: Task<any>[]): Promise<void> {
if (this._schedulingPaused) return;

this._schedulingPaused = true;
this._drainBlockedTasks = blockedTasks;
if (blockedTasks.length > 0) {
this._addDrainBlockedTasks(blockedTasks);
}
this.wakeupMainTask();

if (this._mainTask) {
Expand All @@ -3791,6 +3802,7 @@ export class AgentActivity implements RecognitionHooks {

this._schedulingPaused = false;
this.newTurnsBlocked = false;
this._drainBlockedTasks.clear();
this._mainTask = Task.from(({ signal }) => this.mainTask(signal));
}

Expand All @@ -3804,6 +3816,10 @@ export class AgentActivity implements RecognitionHooks {
const unlock = await this.lock.lock();

try {
if (this.closed) {
return undefined;
}

const span = tracer.startSpan({
name: 'pause_agent_activity',
attributes: { [traceTypes.ATTR_AGENT_LABEL]: this.agent.id },
Expand Down Expand Up @@ -3884,6 +3900,11 @@ export class AgentActivity implements RecognitionHooks {
async close(): Promise<void> {
const unlock = await this.lock.lock();
try {
if (this.closed) {
return;
}
this.closed = true;

this.cancelPreemptiveGeneration();

await cancelAndWait(Array.from(this.speechTasks), AgentActivity.REPLY_TASK_CANCEL_TIMEOUT);
Expand Down
5 changes: 5 additions & 0 deletions agents/src/voice/agent_session.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1053,6 +1053,11 @@ export class AgentSession<
let reusableResources: ReusableResources | undefined;

try {
if (this.closing && newActivity === 'start') {
this.logger.warn({ agentId: agent.id }, 'Session is closing, skipping start of activity');
return;
}

this.agent = agent;
const prevActivityObj = this.activity;

Expand Down
17 changes: 6 additions & 11 deletions agents/src/voice/agent_session_handoff.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -223,18 +223,14 @@ describe('AgentSession reusable resources handoff', () => {
expect(item.newAgentId).toBe(nextAgent.id);
});

it('skips starting a new activity while the session is closing and cleans up resources', async () => {
const closeFn = vi.fn(async () => {});
const resources: ReusableResources = {
sttPipeline: { close: closeFn } as any,
};
it('skips starting a new activity while the session is closing', async () => {
const previousAgent = new Agent({ instructions: 'old' });
const nextAgent = new Agent({ instructions: 'new' });
const previousActivity = {
agent: previousAgent,
drain: vi.fn(async () => resources),
drain: vi.fn(async () => undefined),
close: vi.fn(async () => {}),
pause: vi.fn(async () => resources),
pause: vi.fn(async () => undefined),
};

const startSpy = vi.spyOn(AgentActivity.prototype, 'start').mockResolvedValue(undefined);
Expand All @@ -249,11 +245,10 @@ describe('AgentSession reusable resources handoff', () => {
waitOnEnter: false,
});

expect(previousActivity.drain).toHaveBeenCalledTimes(1);
expect(previousActivity.close).toHaveBeenCalledTimes(1);
expect(closeFn).toHaveBeenCalledTimes(1);
expect(previousActivity.drain).not.toHaveBeenCalled();
expect(previousActivity.close).not.toHaveBeenCalled();
expect(startSpy).not.toHaveBeenCalled();
expect((session as any).activity).toBeUndefined();
expect((session as any).activity).toBe(previousActivity);
expect((session as any).nextActivity).toBeUndefined();
} finally {
startSpy.mockRestore();
Expand Down