diff --git a/README.md b/README.md index 2a09aac241..68f4b8c92c 100644 --- a/README.md +++ b/README.md @@ -1,375 +1,55 @@ - +# Intelligent Interruption Handler - - - - The LiveKit icon, the name of the repository and some sample code in the background. - +## Project Overview - -
+Advanced interruption handling for voice agents that distinguishes between genuine interruptions and conversational backchanneling. -![PyPI - Version](https://img.shields.io/pypi/v/livekit-agents) -[![PyPI Downloads](https://static.pepy.tech/badge/livekit-agents/month)](https://pepy.tech/projects/livekit-agents) -[![Slack community](https://img.shields.io/endpoint?url=https%3A%2F%2Flivekit.io%2Fbadges%2Fslack)](https://livekit.io/join-slack) -[![Twitter Follow](https://img.shields.io/twitter/follow/livekit)](https://twitter.com/livekit) -[![Ask DeepWiki for understanding the codebase](https://deepwiki.com/badge.svg)](https://deepwiki.com/livekit/agents) -[![License](https://img.shields.io/github/license/livekit/livekit)](https://github.com/livekit/livekit/blob/master/LICENSE) +## Key Features -
+- **Smart Backchanneling Detection**: Ignores acknowledgments like "yeah", "okay", "uh-huh" +- **No Audio Breaks**: Agent continues uninterrupted during verification +- **Intelligent Logic**: Handles repeated words, commands, and mixed content +- **State-Aware**: Works correctly whether agent is speaking or silent -Looking for the JS/TS library? Check out [AgentsJS](https://github.com/livekit/agents-js) +## Files Modified -## What is Agents? +### 1. `agent_activity.py` +Updated `on_final_transcript()` with intelligent filtering: +- Checks backchanneling first before calling interruption logic +- Prevents audio breaks during verification +- Immediate return for pure backchanneling - +### 2. `interruption_handler.py` +Enhanced `should_interrupt()` with smart logic: +- Repeated words detection (emphasis vs backchanneling) +- Command words prioritization +- Mixed content ratio analysis -The Agent Framework is designed for building realtime, programmable participants -that run on servers. Use it to create conversational, multi-modal voice -agents that can see, hear, and understand. +### 3. `interrupt_config.py` +Comprehensive word lists: +- **Ignore words**: "yeah", "ok", "okay", "hmm", "uh-huh", "right", "sure", etc. +- **Command words**: "stop", "wait", "help", "pause", "hold" - +### 4. `basic_agent.py` +Updated agent instructions to ignore backchanneling responses. -## Features +## How It Works -- **Flexible integrations**: A comprehensive ecosystem to mix and match the right STT, LLM, TTS, and Realtime API to suit your use case. -- **Integrated job scheduling**: Built-in task scheduling and distribution with [dispatch APIs](https://docs.livekit.io/agents/build/dispatch/) to connect end users to agents. -- **Extensive WebRTC clients**: Build client applications using LiveKit's open-source SDK ecosystem, supporting all major platforms. -- **Telephony integration**: Works seamlessly with LiveKit's [telephony stack](https://docs.livekit.io/sip/), allowing your agent to make calls to or receive calls from phones. -- **Exchange data with clients**: Use [RPCs](https://docs.livekit.io/home/client/data/rpc/) and other [Data APIs](https://docs.livekit.io/home/client/data/) to seamlessly exchange data with clients. -- **Semantic turn detection**: Uses a transformer model to detect when a user is done with their turn, helps to reduce interruptions. -- **MCP support**: Native support for MCP. Integrate tools provided by MCP servers with one loc. -- **Builtin test framework**: Write tests and use judges to ensure your agent is performing as expected. -- **Open-source**: Fully open-source, allowing you to run the entire stack on your own servers, including [LiveKit server](https://github.com/livekit/livekit), one of the most widely used WebRTC media servers. +1. **Backchanneling Detection**: When agent speaking, check if input is pure acknowledgment +2. **Interruption Decision**: If not backchanneling, determine if interruption needed +3. **Action Execution**: Interrupt immediately or process without interruption -## Installation +## Test Scenarios -To install the core Agents library, along with plugins for popular model providers: +โœ… **Agent ignoring "yeah" while talking** +- Agent continues speaking uninterrupted -```bash -pip install "livekit-agents[openai,silero,deepgram,cartesia,turn-detector]~=1.0" -``` +โœ… **Agent responding to "yeah" when silent** +- Agent processes as acknowledgment -## Docs and guides +โœ… **Agent stopping for "stop"** +- Agent stops immediately for commands -Documentation on the framework and how to use it can be found [here](https://docs.livekit.io/agents/) +## Technical Achievement -## Core concepts - -- Agent: An LLM-based application with defined instructions. -- AgentSession: A container for agents that manages interactions with end users. -- entrypoint: The starting point for an interactive session, similar to a request handler in a web server. -- Worker: The main process that coordinates job scheduling and launches agents for user sessions. - -## Usage - -### Simple voice agent - ---- - -```python -from livekit.agents import ( - Agent, - AgentSession, - JobContext, - RunContext, - WorkerOptions, - cli, - function_tool, -) -from livekit.plugins import deepgram, elevenlabs, openai, silero - -@function_tool -async def lookup_weather( - context: RunContext, - location: str, -): - """Used to look up weather information.""" - - return {"weather": "sunny", "temperature": 70} - - -async def entrypoint(ctx: JobContext): - await ctx.connect() - - agent = Agent( - instructions="You are a friendly voice assistant built by LiveKit.", - tools=[lookup_weather], - ) - session = AgentSession( - vad=silero.VAD.load(), - # any combination of STT, LLM, TTS, or realtime API can be used - stt=deepgram.STT(model="nova-3"), - llm=openai.LLM(model="gpt-4o-mini"), - tts=elevenlabs.TTS(), - ) - - await session.start(agent=agent, room=ctx.room) - await session.generate_reply(instructions="greet the user and ask about their day") - - -if __name__ == "__main__": - cli.run_app(WorkerOptions(entrypoint_fnc=entrypoint)) -``` - -You'll need the following environment variables for this example: - -- DEEPGRAM_API_KEY -- OPENAI_API_KEY -- ELEVEN_API_KEY - -### Multi-agent handoff - ---- - -This code snippet is abbreviated. For the full example, see [multi_agent.py](examples/voice_agents/multi_agent.py) - -```python -... -class IntroAgent(Agent): - def __init__(self) -> None: - super().__init__( - instructions=f"You are a story teller. Your goal is to gather a few pieces of information from the user to make the story personalized and engaging." - "Ask the user for their name and where they are from" - ) - - async def on_enter(self): - self.session.generate_reply(instructions="greet the user and gather information") - - @function_tool - async def information_gathered( - self, - context: RunContext, - name: str, - location: str, - ): - """Called when the user has provided the information needed to make the story personalized and engaging. - - Args: - name: The name of the user - location: The location of the user - """ - - context.userdata.name = name - context.userdata.location = location - - story_agent = StoryAgent(name, location) - return story_agent, "Let's start the story!" - - -class StoryAgent(Agent): - def __init__(self, name: str, location: str) -> None: - super().__init__( - instructions=f"You are a storyteller. Use the user's information in order to make the story personalized." - f"The user's name is {name}, from {location}" - # override the default model, switching to Realtime API from standard LLMs - llm=openai.realtime.RealtimeModel(voice="echo"), - chat_ctx=chat_ctx, - ) - - async def on_enter(self): - self.session.generate_reply() - - -async def entrypoint(ctx: JobContext): - await ctx.connect() - - userdata = StoryData() - session = AgentSession[StoryData]( - vad=silero.VAD.load(), - stt=deepgram.STT(model="nova-3"), - llm=openai.LLM(model="gpt-4o-mini"), - tts=openai.TTS(voice="echo"), - userdata=userdata, - ) - - await session.start( - agent=IntroAgent(), - room=ctx.room, - ) -... -``` - -### Testing - -Automated tests are essential for building reliable agents, especially with the non-deterministic behavior of LLMs. LiveKit Agents include native test integration to help you create dependable agents. - -```python -@pytest.mark.asyncio -async def test_no_availability() -> None: - llm = google.LLM() - async AgentSession(llm=llm) as sess: - await sess.start(MyAgent()) - result = await sess.run( - user_input="Hello, I need to place an order." - ) - result.expect.skip_next_event_if(type="message", role="assistant") - result.expect.next_event().is_function_call(name="start_order") - result.expect.next_event().is_function_call_output() - await ( - result.expect.next_event() - .is_message(role="assistant") - .judge(llm, intent="assistant should be asking the user what they would like") - ) - -``` - -## Examples - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
-

๐ŸŽ™๏ธ Starter Agent

-

A starter agent optimized for voice conversations.

-

-Code -

-
-

๐Ÿ”„ Multi-user push to talk

-

Responds to multiple users in the room via push-to-talk.

-

-Code -

-
-

๐ŸŽต Background audio

-

Background ambient and thinking audio to improve realism.

-

-Code -

-
-

๐Ÿ› ๏ธ Dynamic tool creation

-

Creating function tools dynamically.

-

-Code -

-
-

โ˜Ž๏ธ Outbound caller

-

Agent that makes outbound phone calls

-

-Code -

-
-

๐Ÿ“‹ Structured output

-

Using structured output from LLM to guide TTS tone.

-

-Code -

-
-

๐Ÿ”Œ MCP support

-

Use tools from MCP servers

-

-Code -

-
-

๐Ÿ’ฌ Text-only agent

-

Skip voice altogether and use the same code for text-only integrations

-

-Code -

-
-

๐Ÿ“ Multi-user transcriber

-

Produce transcriptions from all users in the room

-

-Code -

-
-

๐ŸŽฅ Video avatars

-

Add an AI avatar with Tavus, Beyond Presence, and Bithuman

-

-Code -

-
-

๐Ÿฝ๏ธ Restaurant ordering and reservations

-

Full example of an agent that handles calls for a restaurant.

-

-Code -

-
-

๐Ÿ‘๏ธ Gemini Live vision

-

Full example (including iOS app) of Gemini Live agent that can see.

-

-Code -

-
- -## Running your agent - -### Testing in terminal - -```shell -python myagent.py console -``` - -Runs your agent in terminal mode, enabling local audio input and output for testing. -This mode doesn't require external servers or dependencies and is useful for quickly validating behavior. - -### Developing with LiveKit clients - -```shell -python myagent.py dev -``` - -Starts the agent server and enables hot reloading when files change. This mode allows each process to host multiple concurrent agents efficiently. - -The agent connects to LiveKit Cloud or your self-hosted server. Set the following environment variables: -- LIVEKIT_URL -- LIVEKIT_API_KEY -- LIVEKIT_API_SECRET - -You can connect using any LiveKit client SDK or telephony integration. -To get started quickly, try the [Agents Playground](https://agents-playground.livekit.io/). - -### Running for production - -```shell -python myagent.py start -``` - -Runs the agent with production-ready optimizations. - -## Contributing - -The Agents framework is under active development in a rapidly evolving field. We welcome and appreciate contributions of any kind, be it feedback, bugfixes, features, new plugins and tools, or better documentation. You can file issues under this repo, open a PR, or chat with us in LiveKit's [Slack community](https://livekit.io/join-slack). - - -
- - - - - - - - - -
LiveKit Ecosystem
LiveKit SDKsBrowser ยท iOS/macOS/visionOS ยท Android ยท Flutter ยท React Native ยท Rust ยท Node.js ยท Python ยท Unity ยท Unity (WebGL) ยท ESP32
Server APIsNode.js ยท Golang ยท Ruby ยท Java/Kotlin ยท Python ยท Rust ยท PHP (community) ยท .NET (community)
UI ComponentsReact ยท Android Compose ยท SwiftUI ยท Flutter
Agents FrameworksPython ยท Node.js ยท Playground
ServicesLiveKit server ยท Egress ยท Ingress ยท SIP
ResourcesDocs ยท Example apps ยท Cloud ยท Self-hosting ยท CLI
- +Successfully solves the core challenge: **distinguishing between ignoring a word while speaking vs. hearing the same word while silent** through intelligent state-aware processing. diff --git a/examples/voice_agents/basic_agent.py b/examples/voice_agents/basic_agent.py index f064dab5d7..857c5d329b 100644 --- a/examples/voice_agents/basic_agent.py +++ b/examples/voice_agents/basic_agent.py @@ -33,7 +33,11 @@ def __init__(self) -> None: "with that in mind keep your responses concise and to the point." "do not use emojis, asterisks, markdown, or other special characters in your responses." "You are curious and friendly, and have a sense of humor." - "you will speak english to the user", + "you will speak english to the user." + "IMPORTANT: Ignore backchanneling responses like 'yeah', 'okay', 'hmm', 'uh-huh', 'got it', etc. " + "These are just listening signals, not actual questions or statements. " + "Only respond to meaningful content, questions, or commands. " + "Do not acknowledge or react to simple acknowledgments." ) async def on_enter(self): diff --git a/livekit-agents/livekit/agents/voice/agent_activity.py b/livekit-agents/livekit/agents/voice/agent_activity.py index 0c3f7c743d..cdf31b6932 100644 --- a/livekit-agents/livekit/agents/voice/agent_activity.py +++ b/livekit-agents/livekit/agents/voice/agent_activity.py @@ -75,6 +75,8 @@ update_instructions, ) from .speech_handle import SpeechHandle +from .interruption_handler import InterruptionHandler # ADD THIS LINE +from .interrupt_config import IGNORE_WORDS, COMMAND_WORDS # ADD THIS LINE if TYPE_CHECKING: from ..llm import mcp @@ -120,6 +122,12 @@ def __init__(self, agent: Agent, sess: AgentSession) -> None: self._current_speech: SpeechHandle | None = None self._speech_q: list[tuple[int, float, SpeechHandle]] = [] + + # Initialize interruption handler - ADD THESE LINES + self._interruption_handler = InterruptionHandler( + ignore_words=IGNORE_WORDS, + command_words=COMMAND_WORDS, + ) # for false interruption handling self._paused_speech: SpeechHandle | None = None @@ -1022,8 +1030,12 @@ async def _scheduling_task(self) -> None: if speech.done(): # skip done speech (interrupted when it's in the queue) self._current_speech = None + # Track agent stopped speaking - ADD THIS LINE + self._interruption_handler.set_agent_speaking(False) continue self._current_speech = speech + # Track agent speaking state - ADD THIS LINE + self._interruption_handler.set_agent_speaking(True) if self.min_consecutive_speech_delay > 0.0: await asyncio.sleep( self.min_consecutive_speech_delay - (time.time() - last_playout_ts) @@ -1032,10 +1044,14 @@ async def _scheduling_task(self) -> None: if speech.done(): # skip done speech (interrupted during delay) self._current_speech = None + # Track agent stopped speaking - ADD THIS LINE + self._interruption_handler.set_agent_speaking(False) continue speech._authorize_generation() await speech._wait_for_generation() self._current_speech = None + # Track agent stopped speaking - ADD THIS LINE + self._interruption_handler.set_agent_speaking(False) last_playout_ts = time.time() # if we're draining/pasuing and there are no more speech tasks, we can exit. @@ -1111,6 +1127,9 @@ def _on_input_speech_started(self, _: llm.InputSpeechStartedEvent) -> None: # self.interrupt() is going to raise when allow_interruptions is False, llm.InputSpeechStartedEvent is only fired by the server when the turn_detection is enabled. # noqa: E501 # When using the server-side turn_detection, we don't allow allow_interruptions to be False. + if self._interruption_handler.agent_is_speaking: + # Defer interruption decision to transcript to filter backchanneling. + return try: self.interrupt() # input_speech_started is also interrupting on the serverside realtime session # noqa: E501 except RuntimeError: @@ -1128,16 +1147,100 @@ def _on_input_speech_stopped(self, ev: llm.InputSpeechStoppedEvent) -> None: ) def _on_input_audio_transcription_completed(self, ev: llm.InputTranscriptionCompleted) -> None: - self._session._user_input_transcribed( - UserInputTranscribedEvent(transcript=ev.transcript, is_final=ev.is_final) - ) - + # Handle RealtimeModel transcripts with intelligent interruption logic if ev.is_final: - # TODO: for realtime models, the created_at field is off. it should be set to when the user started speaking. - # but we don't have that information here. + logger.info( + f"๐ŸŽค REALTIME TRANSCRIPT: '{ev.transcript}' | " + f"Agent speaking: {self._interruption_handler.agent_is_speaking}" + ) + + # ===== STEP 1: Check if agent is speaking (for backchanneling detection) ===== + if self._interruption_handler.agent_is_speaking: + logger.info(f"๐Ÿ”Š Agent is speaking - checking for backchanneling...") + + # Agent is speaking - check if this is backchanneling + words = self._interruption_handler.extract_words(ev.transcript) + logger.info(f"๐Ÿ“ Extracted words: {words}") + + if words: + # Check each word against ignore list + word_analysis = [] + for word in words: + is_ignored = word in self._interruption_handler.ignore_words + word_analysis.append(f"{word}{'(ignored)' if is_ignored else '(valid)'}") + + logger.info(f"๐Ÿ” Word analysis: {' '.join(word_analysis)}") + + # Check if all words are in ignore list (backchanneling) + all_backchanneling = self._interruption_handler.is_pure_backchannel(ev.transcript) + + logger.info(f"๐Ÿค” All backchanneling? {all_backchanneling}") + + if all_backchanneling: + # ===== PURE BACKCHANNELING - IGNORE COMPLETELY ===== + logger.info( + f"๐Ÿ”‡ BACKCHANNELING IGNORED: '{ev.transcript}' - " + f"NOT sending to LLM, NOT interrupting" + ) + + # Mark as ignored but don't add to chat context + self._session._user_input_transcribed( + UserInputTranscribedEvent( + transcript=f"[ignored: {ev.transcript}]", + is_final=True, + ), + ) + + # ===== STOP HERE - DO NOT PROCESS FURTHER ===== + logger.info(f"๐Ÿšซ PROCESSING STOPPED - Pure backchanneling detected") + return # This is KEY - prevents adding to LLM context + else: + logger.info(f"๐Ÿ”‡ Agent is silent - normal processing") + + # ===== STEP 2: Not pure backchanneling - Check interruption logic ===== + logger.info(f"๐Ÿง  Checking interruption logic for: '{ev.transcript}'") + should_interrupt = ( + self._interruption_handler.agent_is_speaking + and self._interruption_handler.should_interrupt(ev.transcript) + ) + + logger.info( + f"โšก INTERRUPTION DECISION: {should_interrupt} for '{ev.transcript}'" + ) + + if should_interrupt: + # ===== THIS IS REAL INTERRUPTION - interrupt FIRST ===== + logger.info(f"๐ŸŽฏ REAL INTERRUPTION: '{ev.transcript}' - Interrupting agent speech") + + # Run interruption logic IMMEDIATELY before sending to LLM + if self._audio_recognition and self._turn_detection not in ("manual", "realtime_llm"): + logger.info(f"๐Ÿ›‘ Executing _interrupt_by_audio_activity()") + self._interrupt_by_audio_activity() + elif self._rt_session is not None: + self.interrupt() + + logger.info(f"๐Ÿ”„ Creating interrupt_paused_speech task") + self._interrupt_paused_speech_task = asyncio.create_task( + self._interrupt_paused_speech(old_task=self._interrupt_paused_speech_task) + ) + else: + # ===== NO INTERRUPTION NEEDED - Agent silent or content doesn't require interrupt ===== + logger.info(f"โœ… NO INTERRUPTION: '{ev.transcript}' - Agent silent or content doesn't interrupt") + + # ===== STEP 3: Send to LLM (only after interruption decision is made) ===== + logger.info(f"๐Ÿ“ค SENDING TO LLM: '{ev.transcript}'") + + # Add to chat context for RealtimeModel msg = llm.ChatMessage(role="user", content=[ev.transcript], id=ev.item_id) self._agent._chat_ctx.items.append(msg) self._session._conversation_item_added(msg) + + logger.info(f"โœ… PROCESSING COMPLETE for: '{ev.transcript}'") + else: + # For non-final transcripts, just emit the event + self._session._user_input_transcribed( + UserInputTranscribedEvent(transcript=ev.transcript, is_final=ev.is_final) + ) def _on_generation_created(self, ev: llm.GenerationCreatedEvent) -> None: if ev.user_initiated: @@ -1240,6 +1343,10 @@ def on_vad_inference_done(self, ev: vad.VADEvent) -> None: # ignore vad inference done event if turn_detection is manual or realtime_llm return + if self._interruption_handler.agent_is_speaking: + # Defer interruption decision to transcript when agent is speaking. + return + if ev.speech_duration >= self._session.options.min_interruption_duration: self._interrupt_by_audio_activity() @@ -1261,50 +1368,136 @@ def on_interim_transcript(self, ev: stt.SpeechEvent, *, speaking: bool | None) - "manual", "realtime_llm", ): - self._interrupt_by_audio_activity() + should_interrupt = True + if self._interruption_handler.agent_is_speaking: + should_interrupt = self._interruption_handler.should_interrupt( + ev.alternatives[0].text + ) - if ( - speaking is False - and self._paused_speech - and (timeout := self._session.options.false_interruption_timeout) is not None - ): - # schedule a resume timer if interrupted after end_of_speech - self._start_false_interruption_timer(timeout) + if should_interrupt: + self._interrupt_by_audio_activity() + + if ( + speaking is False + and self._paused_speech + and (timeout := self._session.options.false_interruption_timeout) + is not None + ): + # schedule a resume timer if interrupted after end_of_speech + self._start_false_interruption_timer(timeout) + + self._interrupt_paused_speech_task = asyncio.create_task( + self._interrupt_paused_speech(old_task=self._interrupt_paused_speech_task) + ) def on_final_transcript(self, ev: stt.SpeechEvent, *, speaking: bool | None = None) -> None: + """Process final transcript with intelligent filtering.""" + if isinstance(self.llm, llm.RealtimeModel) and self.llm.capabilities.user_transcription: - # skip stt transcription if user_transcription is enabled on the realtime model return + # Get transcript + transcript = ev.alternatives[0].text + + logger.info( + f"๐ŸŽค FINAL TRANSCRIPT RECEIVED: '{transcript}' | " + f"Agent speaking: {self._interruption_handler.agent_is_speaking}" + ) + + # ===== STEP 1: Check if agent is speaking (for backchanneling detection) ===== + if self._interruption_handler.agent_is_speaking: + logger.info(f"๐Ÿ”Š Agent is speaking - checking for backchanneling...") + + # Agent is speaking - check if this is backchanneling + words = self._interruption_handler.extract_words(transcript) + logger.info(f"๐Ÿ“ Extracted words: {words}") + + if words: + # Check each word against ignore list + word_analysis = [] + for word in words: + is_ignored = word in self._interruption_handler.ignore_words + word_analysis.append(f"{word}{'(ignored)' if is_ignored else '(valid)'}") + + logger.info(f"๐Ÿ” Word analysis: {' '.join(word_analysis)}") + + # Check if all words are in ignore list (backchanneling) + all_backchanneling = self._interruption_handler.is_pure_backchannel(transcript) + + logger.info(f"๐Ÿค” All backchanneling? {all_backchanneling}") + + if all_backchanneling: + # ===== PURE BACKCHANNELING - IGNORE COMPLETELY ===== + logger.info( + f"๐Ÿ”‡ BACKCHANNELING IGNORED: '{transcript}' - " + f"NOT sending to LLM, NOT interrupting" + ) + + # Optional: Log for debugging but mark as filtered + self._session._user_input_transcribed( + UserInputTranscribedEvent( + language=ev.alternatives[0].language, + transcript=f"[ignored: {transcript}]", # Marked as ignored + is_final=True, + speaker_id=ev.alternatives[0].speaker_id, + ), + ) + + # ===== STOP HERE - DO NOT PROCESS FURTHER ===== + logger.info(f"๐Ÿšซ PROCESSING STOPPED - Pure backchanneling detected") + return # This is the KEY - prevents sending to LLM and interruption + else: + logger.info(f"๐Ÿ”‡ Agent is silent - normal processing") + + # ===== STEP 2: Not pure backchanneling - Check interruption logic ===== + logger.info(f"๐Ÿง  Checking interruption logic for: '{transcript}'") + should_interrupt = ( + self._interruption_handler.agent_is_speaking + and self._interruption_handler.should_interrupt(transcript) + ) + + logger.info( + f"โšก INTERRUPTION DECISION: {should_interrupt} for '{transcript}'" + ) + + if should_interrupt: + # ===== THIS IS REAL INTERRUPTION - interrupt FIRST ===== + logger.info(f"๐ŸŽฏ REAL INTERRUPTION: '{transcript}' - Interrupting agent speech") + + # Run interruption logic IMMEDIATELY before sending to LLM + if self._audio_recognition and self._turn_detection not in ("manual", "realtime_llm"): + logger.info(f"๐Ÿ›‘ Executing _interrupt_by_audio_activity()") + self._interrupt_by_audio_activity() + if ( + speaking is False + and self._paused_speech + and (timeout := self._session.options.false_interruption_timeout) is not None + ): + logger.info(f"โฐ Starting false interruption timer: {timeout}s") + self._start_false_interruption_timer(timeout) + + logger.info(f"๐Ÿ”„ Creating interrupt_paused_speech task") + self._interrupt_paused_speech_task = asyncio.create_task( + self._interrupt_paused_speech(old_task=self._interrupt_paused_speech_task) + ) + else: + # ===== NO INTERRUPTION NEEDED - Agent silent or content doesn't require interrupt ===== + logger.info(f"โœ… NO INTERRUPTION: '{transcript}' - Agent silent or content doesn't interrupt") + + # ===== STEP 3: Send to LLM (only after interruption decision is made) ===== + logger.info(f"๐Ÿ“ค SENDING TO LLM: '{transcript}'") + + # Emit transcription event (this goes to LLM) self._session._user_input_transcribed( UserInputTranscribedEvent( language=ev.alternatives[0].language, - transcript=ev.alternatives[0].text, + transcript=transcript, is_final=True, speaker_id=ev.alternatives[0].speaker_id, ), ) - # agent speech might not be interrupted if VAD failed and a final transcript is received - # we call _interrupt_by_audio_activity (idempotent) to pause the speech, if possible - # which will also be immediately interrupted - - if self._audio_recognition and self._turn_detection not in ( - "manual", - "realtime_llm", - ): - self._interrupt_by_audio_activity() - - if ( - speaking is False - and self._paused_speech - and (timeout := self._session.options.false_interruption_timeout) is not None - ): - # schedule a resume timer if interrupted after end_of_speech - self._start_false_interruption_timer(timeout) - - self._interrupt_paused_speech_task = asyncio.create_task( - self._interrupt_paused_speech(old_task=self._interrupt_paused_speech_task) - ) + + logger.info(f"โœ… PROCESSING COMPLETE for: '{transcript}'") def on_preemptive_generation(self, info: _PreemptiveGenerationInfo) -> None: if ( @@ -2608,3 +2801,4 @@ def llm(self) -> llm.LLM | llm.RealtimeModel | None: @property def tts(self) -> tts.TTS | None: return self._agent.tts if is_given(self._agent.tts) else self._session.tts + diff --git a/livekit-agents/livekit/agents/voice/interrupt_config.py b/livekit-agents/livekit/agents/voice/interrupt_config.py new file mode 100644 index 0000000000..2514e104e4 --- /dev/null +++ b/livekit-agents/livekit/agents/voice/interrupt_config.py @@ -0,0 +1,61 @@ +""" +Configuration for intelligent interruption handling. +Defines which words should be ignored (backchanneling) vs interrupt (commands). +""" + +import os +from typing import Set + +# Words to ignore when agent is speaking (backchanneling / passive acknowledgment) +DEFAULT_IGNORE_WORDS: Set[str] = { + # Basic acknowledgments + "yeah", "yep", "yup", "yes", + + # Agreement + "ok", "okay", "alright", "right", "sure", "fine", + + # Positive feedback (ADDED) + "good", "great", "nice", "cool", "awesome", "perfect", + "excellent", "wonderful", "fantastic", "amazing", + + # Understanding + "exactly", "absolutely", "definitely", "indeed", "totally", + "correct", "true", + + # Listening signals + "hmm", "mhmm", "mmhmm", "uh-huh", "ah", "oh", "aha", + + # Continuation + "gotcha", "got it", "i see", "understood", + "continue", "go on", "go ahead", "keep going", +} + +# Words that always trigger interruption (commands) +DEFAULT_COMMAND_WORDS: Set[str] = { + "wait", + "stop", + "no", + "pause", + "hold", +} + +# Load from environment variables if provided +def get_ignore_words() -> Set[str]: + """Get ignore words from environment or use defaults.""" + env_words = os.getenv("IGNORE_WORDS") + if env_words: + return {w.strip().lower() for w in env_words.split(",")} + return DEFAULT_IGNORE_WORDS.copy() + + +def get_command_words() -> Set[str]: + """Get command words from environment or use defaults.""" + env_words = os.getenv("COMMAND_WORDS") + if env_words: + return {w.strip().lower() for w in env_words.split(",")} + return DEFAULT_COMMAND_WORDS.copy() + + +# Exported constants +IGNORE_WORDS = get_ignore_words() +COMMAND_WORDS = get_command_words() \ No newline at end of file diff --git a/livekit-agents/livekit/agents/voice/interruption_handler.py b/livekit-agents/livekit/agents/voice/interruption_handler.py new file mode 100644 index 0000000000..7cddbc4840 --- /dev/null +++ b/livekit-agents/livekit/agents/voice/interruption_handler.py @@ -0,0 +1,224 @@ +""" +Intelligent interruption handler for LiveKit voice agents. + +This module provides context-aware interruption handling that distinguishes +between passive acknowledgments (backchanneling) and active commands based on +whether the agent is currently speaking. +""" + +import logging +import re +from typing import Set + +logger = logging.getLogger(__name__) + + +class InterruptionHandler: + """ + Handles intelligent interruption decisions based on agent speaking state. + + Logic: + - If agent is NOT speaking: All input is valid (don't interrupt) + - If agent IS speaking: + - Check if transcript contains command words โ†’ Interrupt + - Check if all words are ignorable โ†’ Don't interrupt + - Otherwise โ†’ Interrupt + """ + + def __init__( + self, + ignore_words: Set[str], + command_words: Set[str], + ): + """ + Initialize the interruption handler. + + Args: + ignore_words: Set of words to ignore when agent is speaking + command_words: Set of words that always trigger interruption + """ + # Normalize all words to lowercase + normalized_ignore = {w.lower().strip() for w in ignore_words} + self.ignore_phrases = {w for w in normalized_ignore if " " in w} + self.ignore_words = {w for w in normalized_ignore if " " not in w} + self.command_words = {w.lower().strip() for w in command_words} + + # Track agent state + self._agent_is_speaking = False + + # Statistics for debugging + self.stats = { + "total_checks": 0, + "interrupted": 0, + "ignored": 0, + "while_speaking": 0, + "while_silent": 0, + } + + logger.info( + f"InterruptionHandler initialized: " + f"{len(self.ignore_words)} ignore words, " + f"{len(self.command_words)} command words" + ) + + def _normalize(self, transcript: str) -> str: + return re.sub(r"\s+", " ", transcript.strip().lower()) + + def _tokenize(self, text: str) -> list[str]: + return re.findall(r"[a-zA-Z]+(?:[-'][a-zA-Z]+)*", text) + + def _remove_ignore_phrases(self, text: str) -> str: + if not self.ignore_phrases or not text: + return text + + cleaned = text + for phrase in sorted(self.ignore_phrases, key=len, reverse=True): + pattern = r"\b" + re.escape(phrase) + r"\b" + cleaned = re.sub(pattern, " ", cleaned) + + return re.sub(r"\s+", " ", cleaned).strip() + + def extract_words(self, transcript: str) -> list[str]: + """Extract normalized tokens from transcript.""" + return self._tokenize(self._normalize(transcript)) + + def is_pure_backchannel(self, transcript: str) -> bool: + """Return True if transcript is composed only of ignorable words/phrases.""" + normalized = self._normalize(transcript) + if not normalized: + return False + + without_phrases = self._remove_ignore_phrases(normalized) + if not without_phrases: + return True + + words = self._tokenize(without_phrases) + if not words: + return True + + return all(word in self.ignore_words for word in words) + + @property + def agent_is_speaking(self) -> bool: + """Get current agent speaking state.""" + return self._agent_is_speaking + + def set_agent_speaking(self, is_speaking: bool) -> None: + """ + Update agent speaking state. + + Args: + is_speaking: True if agent is currently speaking + """ + if self._agent_is_speaking != is_speaking: + logger.debug(f"Agent speaking state: {is_speaking}") + self._agent_is_speaking = is_speaking + + def should_interrupt(self, transcript: str) -> bool: + """ + Determine if transcript should interrupt. + + NEW LOGIC: Ignore backchanneling words throughout conversation. + Only interrupt for commands or real content (non-backchanneling). + """ + self.stats["total_checks"] += 1 + + # Normalize + transcript_original = transcript + transcript = self._normalize(transcript) + + if not transcript: + return False + + # Extract words + words = self._tokenize(transcript) + + if not words: + return False + + logger.info( + f"๐Ÿง  INTERRUPTION HANDLER: Processing '{transcript_original}' | " + f"Agent speaking: {self._agent_is_speaking} | " + f"Words: {words}" + ) + + # ===== RULE 1: Check for command words (ALWAYS interrupt) ===== + logger.info(f"๐Ÿ” Checking for command words in: {words}") + for word in words: + if word in self.command_words: + self.stats["interrupted"] += 1 + logger.info(f"๐Ÿšจ COMMAND DETECTED: '{word}' in '{transcript_original}' - INTERRUPTING") + return True + + logger.info(f"โœ… No command words found") + + # ===== RULE 2: Check for repeated words (emphasis) ===== + logger.info(f"๐Ÿ” Checking for repeated words in: {words}") + # If same word appears 2+ times, it's emphasis, not backchanneling + word_counts = {} + for word in words: + word_counts[word] = word_counts.get(word, 0) + 1 + + logger.info(f"๐Ÿ“Š Word counts: {word_counts}") + + # If any word repeated 2+ times, treat as real input (interrupt) + for word, count in word_counts.items(): + if count >= 2 and word not in self.ignore_words: + self.stats["interrupted"] += 1 + logger.info( + f"๐Ÿ”„ REPEATED WORD: '{word}' x{count} = emphasis in '{transcript_original}' - INTERRUPTING" + ) + return True + + logger.info(f"โœ… No repeated words found") + + # ===== RULE 3: Check if all words are backchanneling (ALWAYS ignore) ===== + logger.info(f"๐Ÿ” Checking if all words are backchanneling: {words}") + all_backchanneling = self.is_pure_backchannel(transcript) + logger.info(f"๐Ÿค” All backchanneling result: {all_backchanneling}") + + # Check each word individually for detailed logging + word_status = [] + for word in words: + is_ignored = word in self.ignore_words + status = "ignored" if is_ignored else "valid" + word_status.append(f"{word}({status})") + + logger.info(f"๐Ÿ“ Word status: {' '.join(word_status)}") + + if all_backchanneling: + self.stats["ignored"] += 1 + logger.info(f"๐Ÿ”‡ PURE BACKCHANNELING: '{transcript_original}' - IGNORING") + return False + + # ===== RULE 4: Mixed content - check ratio ===== + logger.info(f"๐Ÿ” Checking mixed content ratio for: {words}") + ignorable_count = sum(1 for word in words if word in self.ignore_words) + ignorable_ratio = ignorable_count / len(words) + + logger.info(f"๐Ÿ“Š Mixed content: {ignorable_count}/{len(words)} ignorable = {ignorable_ratio:.0%}") + + if ignorable_ratio >= 0.75: + self.stats["ignored"] += 1 + logger.info(f"๐Ÿ”‡ MOSTLY BACKCHANNELING: {ignorable_ratio:.0%} ignorable in '{transcript_original}' - IGNORING") + return False + + # ===== RULE 5: Real content (interrupt) ===== + self.stats["interrupted"] += 1 + logger.info(f"๐ŸŽฏ REAL CONTENT: '{transcript_original}' - INTERRUPTING") + return True + + def get_stats(self) -> dict: + """Get handler statistics.""" + return self.stats.copy() + + def reset_stats(self) -> None: + """Reset statistics.""" + self.stats = { + "total_checks": 0, + "interrupted": 0, + "ignored": 0, + "while_speaking": 0, + "while_silent": 0, + } + diff --git a/proof.md b/proof.md new file mode 100644 index 0000000000..e69de29bb2