diff --git a/wavefront/server/apps/call_processing/call_processing/services/conversation_completion_tool.py b/wavefront/server/apps/call_processing/call_processing/services/conversation_completion_tool.py new file mode 100644 index 00000000..c7f393b8 --- /dev/null +++ b/wavefront/server/apps/call_processing/call_processing/services/conversation_completion_tool.py @@ -0,0 +1,88 @@ +""" +Conversation Completion Tool for Voice Agents + +Provides LLM-callable conversation ending capabilities +""" + +from typing import Dict, Any, Callable +from pipecat.services.llm_service import FunctionCallParams +from pipecat.frames.frames import TTSSpeakFrame, EndTaskFrame +from pipecat.processors.frame_processor import FrameDirection +from call_processing.log.logger import logger + + +class ConversationCompletionToolFactory: + """Factory for creating conversation completion tool with runtime context""" + + @staticmethod + def create_conversation_completion_tool( + task_container: Dict[str, Any], + ) -> Callable: + """ + Create conversation completion tool function with captured context + + Args: + task_container: Dictionary containing PipelineTask (populated after task creation) + Format: {'task': PipelineTask | None} + + Returns: + Async function compatible with Pipecat's function calling + """ + + async def end_conversation(params: FunctionCallParams): + """ + LLM-callable function to end the conversation gracefully + + This function is called by the LLM when it determines the user + wants to end the conversation. It sends a farewell message and + terminates the pipeline. + + Parameters (from LLM): + farewell_message: str - Optional custom farewell message + (defaults to standard goodbye) + """ + try: + # Get task from container + task = task_container.get('task') + if not task: + error_msg = ( + 'Pipeline task not initialized in conversation completion tool' + ) + logger.error(error_msg) + await params.result_callback({'success': False, 'error': error_msg}) + return + + # Extract parameters + arguments = params.arguments + farewell_message = arguments.get( + 'farewell_message', 'Thank you for using our service! Goodbye!' + ) + + logger.info( + f'Conversation completion tool called - Farewell: "{farewell_message}"' + ) + + # Send farewell message via TTS + await params.llm.push_frame(TTSSpeakFrame(farewell_message)) + + # End the conversation + await params.llm.push_frame(EndTaskFrame(), FrameDirection.UPSTREAM) + + logger.info('Conversation ended by LLM decision') + + # Return success result + await params.result_callback( + { + 'success': True, + 'status': 'complete', + 'farewell_sent': True, + 'farewell_message': farewell_message, + } + ) + + except Exception as e: + error_msg = f'Error ending conversation: {str(e)}' + logger.error(error_msg, exc_info=True) + await params.result_callback({'success': False, 'error': error_msg}) + + return end_conversation diff --git a/wavefront/server/apps/call_processing/call_processing/services/language_detection_tool.py b/wavefront/server/apps/call_processing/call_processing/services/language_detection_tool.py new file mode 100644 index 00000000..275c4e6b --- /dev/null +++ b/wavefront/server/apps/call_processing/call_processing/services/language_detection_tool.py @@ -0,0 +1,222 @@ +""" +Language Detection Tool for Multi-Language Voice Agents + +Provides LLM-callable language detection and switching capabilities +""" + +from typing import Dict, Any, List, Callable +from pipecat.services.llm_service import FunctionCallParams +from pipecat.frames.frames import ManuallySwitchServiceFrame, LLMMessagesUpdateFrame +from call_processing.log.logger import logger +from call_processing.constants.language_config import LANGUAGE_INSTRUCTIONS + + +class LanguageDetectionToolFactory: + """Factory for creating language detection tool with runtime context""" + + @staticmethod + def create_language_detection_tool( + task_container: Dict[str, Any], + stt_services: Dict[str, Any], + tts_services: Dict[str, Any], + context_container: Dict[str, Any], + supported_languages: List[str], + default_language: str, + language_state: Dict[str, Any], + ) -> Callable: + """ + Create language detection tool function with captured context + + Args: + task_container: Dictionary containing PipelineTask (populated after task creation) + Format: {'task': PipelineTask | None} + stt_services: Dictionary mapping language codes to STT services + tts_services: Dictionary mapping language codes to TTS services + context_container: Dictionary containing LLMContext (populated after context creation) + Format: {'context': LLMContext | None} + supported_languages: List of supported language codes + default_language: Default language code + language_state: Dictionary to track current language and switch count + Format: {'current_language': str, 'switch_count': int, 'original_system_prompt': str} + + Returns: + Async function compatible with Pipecat's function calling + """ + + async def detect_and_switch_language(params: FunctionCallParams): + """ + LLM-callable function to detect and switch conversation language + + This function is called by the LLM when it determines the user + wants to switch to a different language. It validates the request, + performs the service switch, and updates the system prompt. + + Parameters (from LLM): + target_language: str - Language code to switch to (e.g., 'es', 'hi', 'en') + user_intent: str - User's stated language preference (for logging) + """ + try: + # Get task and context from containers + task = task_container.get('task') + if not task: + error_msg = ( + 'Pipeline task not initialized in language detection tool' + ) + logger.error(error_msg) + await params.result_callback({'success': False, 'error': error_msg}) + return + + context = context_container.get('context') + if not context: + error_msg = 'LLM context not initialized in language detection tool' + logger.error(error_msg) + await params.result_callback({'success': False, 'error': error_msg}) + return + + # Extract parameters + arguments = params.arguments + target_language = arguments.get('target_language', '').lower() + user_intent = arguments.get('user_intent', 'Unknown') + + current_language = language_state.get( + 'current_language', default_language + ) + switch_count = language_state.get('switch_count', 0) + + logger.info( + f'Language detection tool called - Target: {target_language}, ' + f'Current: {current_language}, User intent: {user_intent}' + ) + + # Validation 1: Check if target language is supported + if target_language not in supported_languages: + error_msg = ( + f"Language '{target_language}' is not supported. " + f"Supported languages: {', '.join(supported_languages)}" + ) + logger.warning(error_msg) + await params.result_callback( + { + 'success': False, + 'error': error_msg, + 'current_language': current_language, + 'supported_languages': supported_languages, + } + ) + return + + # Validation 2: Check if already in target language + if target_language == current_language: + logger.info(f'Already using language: {target_language}') + await params.result_callback( + { + 'success': True, + 'message': f'Already using {target_language}', + 'current_language': current_language, + 'switch_performed': False, + } + ) + return + + # Validation 3: Check if services exist for target language + if ( + target_language not in stt_services + or target_language not in tts_services + ): + error_msg = ( + f'Services not configured for language: {target_language}' + ) + logger.error(error_msg) + await params.result_callback( + { + 'success': False, + 'error': error_msg, + 'current_language': current_language, + } + ) + return + + # Perform language switch + try: + target_stt = stt_services[target_language] + target_tts = tts_services[target_language] + + # Queue service switch frames + await task.queue_frames( + [ + ManuallySwitchServiceFrame(service=target_stt), + ManuallySwitchServiceFrame(service=target_tts), + ] + ) + + logger.info( + f'Switched STT/TTS services from {current_language} to {target_language}' + ) + + # Update system prompt with language instruction + language_instruction = LANGUAGE_INSTRUCTIONS.get( + target_language, + LANGUAGE_INSTRUCTIONS.get('en', 'Respond in English.'), + ) + + # Get base prompt without language instruction (must exist for multi-language) + base_prompt = language_state.get('original_system_prompt') + if not base_prompt: + error_msg = 'Original system prompt not found in language state' + logger.error(error_msg) + await params.result_callback( + {'success': False, 'error': error_msg} + ) + return + + # Append new language instruction to clean base prompt + updated_content = f'{base_prompt}\n\n{language_instruction}' + updated_system_message = { + 'role': 'system', + 'content': updated_content, + } + + # Update context + current_messages = context.get_messages() + new_messages = [updated_system_message] + current_messages[1:] + await task.queue_frame( + LLMMessagesUpdateFrame(new_messages, run_llm=False) + ) + + logger.info( + f'Updated system prompt with {target_language} instruction' + ) + + # Update state + language_state['current_language'] = target_language + language_state['switch_count'] = switch_count + 1 + + # Return success result + await params.result_callback( + { + 'success': True, + 'message': f'Language switched to {target_language}', + 'previous_language': current_language, + 'current_language': target_language, + 'switch_performed': True, + 'switch_count': language_state['switch_count'], + } + ) + + except Exception as e: + error_msg = f'Error switching services: {str(e)}' + logger.error(error_msg, exc_info=True) + await params.result_callback( + { + 'success': False, + 'error': error_msg, + 'current_language': current_language, + } + ) + + except Exception as e: + error_msg = f'Error in language detection tool: {str(e)}' + logger.error(error_msg, exc_info=True) + await params.result_callback({'success': False, 'error': error_msg}) + + return detect_and_switch_language diff --git a/wavefront/server/apps/call_processing/call_processing/services/pipecat_service.py b/wavefront/server/apps/call_processing/call_processing/services/pipecat_service.py index ccf23002..6e333cf6 100644 --- a/wavefront/server/apps/call_processing/call_processing/services/pipecat_service.py +++ b/wavefront/server/apps/call_processing/call_processing/services/pipecat_service.py @@ -13,12 +13,6 @@ from pipecat.adapters.schemas.tools_schema import ToolsSchema from pipecat.adapters.schemas.function_schema import FunctionSchema from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3 -from pipecat.frames.frames import ( - TTSSpeakFrame, - EndTaskFrame, - ManuallySwitchServiceFrame, - LLMMessagesUpdateFrame, -) from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.runner import PipelineRunner from pipecat.pipeline.task import PipelineParams, PipelineTask @@ -27,11 +21,8 @@ LLMContextAggregatorPair, LLMUserAggregatorParams, ) -from pipecat.processors.frame_processor import FrameProcessor, FrameDirection -from pipecat.processors.user_idle_processor import UserIdleProcessor from pipecat.processors.transcript_processor import ( TranscriptProcessor, - TranscriptionMessage, ) from pipecat.pipeline.service_switcher import ( ServiceSwitcher, @@ -51,101 +42,17 @@ TurnAnalyzerUserTurnStopStrategy, # TranscriptionUserTurnStopStrategy ) -from pipecat.services.llm_service import FunctionCallParams from call_processing.services.stt_service import STTServiceFactory from call_processing.services.tts_service import TTSServiceFactory from call_processing.services.llm_service import LLMServiceFactory +from call_processing.services.conversation_completion_tool import ( + ConversationCompletionToolFactory, +) from call_processing.constants.language_config import ( - LANGUAGE_KEYWORDS, LANGUAGE_INSTRUCTIONS, ) -# Advanced handler with retry logic -async def handle_user_idle(processor: FrameProcessor, retry_count): - if retry_count == 1: - # First attempt - gentle reminder - await processor.push_frame(TTSSpeakFrame('Are you still there?')) - return True # Continue monitoring - elif retry_count == 2: - # Second attempt - more direct prompt - await processor.push_frame( - TTSSpeakFrame('Would you like to continue our conversation?') - ) - return True # Continue monitoring - else: - # Third attempt - end conversation - await processor.push_frame( - TTSSpeakFrame("I'll leave you for now. Have a nice day!") - ) - await processor.push_frame(EndTaskFrame(), FrameDirection.UPSTREAM) - return False # Stop monitoring - - -async def evaluate_completion_criteria(params: FunctionCallParams): - """ - Check if the last user message contains goodbye-related phrases. - Returns True if goodbye detected, False otherwise. - """ - context = params.context - - # Get the conversation messages - messages = context.get_messages() - - # Find the last user message - last_user_message = None - for message in reversed(messages): - if message.get('role') == 'user': - last_user_message = message.get('content', '').lower() - break - - # If no user message found, conversation is not complete - if not last_user_message: - return False - - # List of goodbye phrases to check - goodbye_phrases = [ - 'goodbye', - 'bye', - 'good bye', - 'see you', - 'talk to you later', - 'ttyl', - 'have a good day', - 'take care', - 'farewell', - 'later', - 'peace out', - ] - - # Check if any goodbye phrase is in the message - return any(phrase in last_user_message for phrase in goodbye_phrases) - - -async def check_conversation_complete(params: FunctionCallParams): - """ - Function to check if conversation should end based on goodbye detection. - """ - # Check if goodbye is present - conversation_complete = await evaluate_completion_criteria(params) - - if conversation_complete: - # Send farewell message - await params.llm.push_frame( - TTSSpeakFrame('Thank you for using our service! Goodbye!') - ) - # End the conversation - await params.llm.push_frame(EndTaskFrame(), FrameDirection.UPSTREAM) - - # Return result to LLM - await params.result_callback( - { - 'status': 'complete' if conversation_complete else 'continuing', - 'goodbye_detected': conversation_complete, - } - ) - - class PipecatService: """Service for creating and running Pipecat pipelines""" @@ -189,6 +96,13 @@ async def run_conversation( f'default: {default_language}, multi-language: {is_multi_language}' ) + # Track language state for multi-language conversations + language_state = { + 'current_language': default_language, + 'switch_count': 0, + 'original_system_prompt': '', + } + # Create LLM service (language-agnostic) llm = LLMServiceFactory.create_llm_service(llm_config) @@ -290,11 +204,26 @@ async def run_conversation( tts = TTSServiceFactory.create_tts_service(tts_config_with_params) # Create initial messages with system prompt + base_system_prompt = ( + f'Customer phone number: {customer_number}\n' + + agent_config['system_prompt'] + ) + + # Add language instruction for default language if multi-language + if is_multi_language: + initial_language_instruction = LANGUAGE_INSTRUCTIONS.get( + default_language, LANGUAGE_INSTRUCTIONS.get('en', 'Respond in English.') + ) + system_content = f'{base_system_prompt}\n\n{initial_language_instruction}' + # Store base prompt without language instruction for switching + language_state['original_system_prompt'] = base_system_prompt + else: + system_content = base_system_prompt + messages = [ { 'role': 'system', - 'content': f'Customer phone number: {customer_number}\n' - + agent_config['system_prompt'], + 'content': system_content, } ] @@ -326,25 +255,101 @@ async def run_conversation( else: logger.info(f'No tools configured for agent {agent_id}') - # Register built-in function handler with LLM service - llm.register_function( - 'check_conversation_complete', check_conversation_complete - ) + # Create containers for late binding (populated after creation) + task_container = {'task': None} + context_container = {'context': None} + + # Register language detection tool if multi-language enabled + if is_multi_language: + from call_processing.services.language_detection_tool import ( + LanguageDetectionToolFactory, + ) - # Create FunctionSchema for check_conversation_complete - check_complete_schema = FunctionSchema( - name='check_conversation_complete', - description='Check if conversation should end based on goodbye detection', - properties={}, # No parameters needed + language_detection_func = ( + LanguageDetectionToolFactory.create_language_detection_tool( + task_container=task_container, + stt_services=stt_services, + tts_services=tts_services, + context_container=context_container, + supported_languages=supported_languages, + default_language=default_language, + language_state=language_state, + ) + ) + + llm.register_function('detect_and_switch_language', language_detection_func) + logger.info('Registered language detection tool with LLM') + + # Register conversation completion tool + conversation_completion_func = ( + ConversationCompletionToolFactory.create_conversation_completion_tool( + task_container=task_container + ) + ) + llm.register_function('end_conversation', conversation_completion_func) + logger.info('Registered conversation completion tool with LLM') + + # Create FunctionSchema for conversation completion + end_conversation_schema = FunctionSchema( + name='end_conversation', + description=( + 'Call this function when the user indicates they want to end the conversation. ' + 'This includes goodbye phrases, expressions of completion, or any indication ' + 'that the user wants to hang up or finish the call. Examples: "goodbye", "bye", ' + '"thank you", "that\'s all", "I\'m done", etc.' + ), + properties={ + 'farewell_message': { + 'type': 'string', + 'description': ( + 'Optional custom farewell message to say to the user before ending. ' + 'If not provided, uses default: "Thank you for using our service! Goodbye!"' + ), + } + }, required=[], ) + # Create FunctionSchema for language detection (if multi-language) + language_detection_schemas = [] + if is_multi_language: + language_detection_schema = FunctionSchema( + name='detect_and_switch_language', + description=( + f"Detect and switch the conversation language when the user explicitly " + f"requests to speak in a different language. Call this when the user says " + f"something like 'switch to Spanish', 'I want to speak Hindi', 'let's talk in English', etc. " + f"Supported languages: {', '.join(supported_languages)}. " + f"Current language: {language_state['current_language']}." + ), + properties={ + 'target_language': { + 'type': 'string', + 'description': f"Target language code. Must be one of: {', '.join(supported_languages)}", + 'enum': supported_languages, + }, + 'user_intent': { + 'type': 'string', + 'description': "The user's statement indicating language preference (for logging)", + }, + }, + required=['target_language', 'user_intent'], + ) + language_detection_schemas.append(language_detection_schema) + # Combine all FunctionSchema objects for ToolsSchema - all_function_schemas = [check_complete_schema] + function_schemas + all_function_schemas = ( + [end_conversation_schema] + language_detection_schemas + function_schemas + ) tools_schema = ToolsSchema(standard_tools=all_function_schemas) # Create LLM context and aggregator context = LLMContext(messages, tools=tools_schema) + + # Populate context container for language detection tool (if multi-language) + if is_multi_language: + context_container['context'] = context + context_aggregator = LLMContextAggregatorPair( context, user_params=LLMUserAggregatorParams( @@ -370,21 +375,11 @@ async def run_conversation( # Create transcript processor for language detection transcript = TranscriptProcessor() - # Track current language detection state (only for multi-language) - language_detected = {'detected': False, 'current_language': default_language} - - # Create user idle processor (fresh instance for each conversation) - user_idle = UserIdleProcessor( - callback=handle_user_idle, - timeout=4.0, - ) - # Build pipeline components list pipeline_components = [ transport.input(), # Audio input from Twilio stt, # Speech-to-Text (ServiceSwitcher for multi-lang, direct for single) transcript.user(), # Transcript processor for user messages - user_idle, # User idle detection context_aggregator.user(), # Add user message to context llm, # LLM processing tts, # Text-to-Speech (ServiceSwitcher for multi-lang, direct for single) @@ -406,132 +401,12 @@ async def run_conversation( # enable_usage_metrics=True, # report_only_initial_ttfb=True ), - idle_timeout_secs=20, # Safety net - allows UserIdleProcessor to complete 3 retries (4s each = 12s total) + idle_timeout_secs=20, ) - # Multi-language detection event handler + # Populate task container for language detection tool (if multi-language) if is_multi_language: - - @transcript.event_handler('on_transcript_update') - async def handle_language_detection(processor, frame): - """Detect language from first user message and switch services""" - - # Only detect once - if language_detected['detected']: - return - - messages: List[TranscriptionMessage] = frame.messages - - # Look for user messages - for message in messages: - if message.role == 'user': - message_content = message.content.lower().strip() - - # Skip empty messages - if not message_content: - continue - - logger.info( - f"Analyzing message for language detection: '{message_content}'" - ) - - # Check for language keywords - detected_lang = None - for lang_code in supported_languages: - keywords = LANGUAGE_KEYWORDS.get(lang_code, []) - for keyword in keywords: - if keyword.lower() in message_content: - detected_lang = lang_code - logger.info( - f'Language detected: {detected_lang} ' - f"(matched keyword: '{keyword}')" - ) - break - if detected_lang: - break - - # Use detected language or fallback to default - target_language = detected_lang or default_language - - if not detected_lang: - logger.info( - f'No language detected, using default: {default_language}' - ) - - # Mark detection as complete - language_detected['detected'] = True - language_detected['current_language'] = target_language - - # Only switch services if target language is different from default - if target_language != default_language: - if ( - target_language in stt_services - and target_language in tts_services - ): - target_stt = stt_services[target_language] - target_tts = tts_services[target_language] - - try: - await task.queue_frames( - [ - ManuallySwitchServiceFrame( - service=target_stt - ), - ManuallySwitchServiceFrame( - service=target_tts - ), - ] - ) - logger.info( - f'Switched STT/TTS services to language: {target_language}' - ) - except Exception as e: - logger.error( - f'Error switching services: {e}', exc_info=True - ) - else: - logger.info( - f'Language {target_language} is default, no service switch needed' - ) - - # Update LLM system prompt with language instruction - language_instruction = LANGUAGE_INSTRUCTIONS.get( - target_language, - LANGUAGE_INSTRUCTIONS.get('en', 'Respond in English.'), - ) - - # Get current system prompt and append language instruction - current_messages = context.get_messages() - if current_messages and len(current_messages) > 0: - system_message = current_messages[0] - else: - system_message = { - 'role': 'system', - 'content': agent_config['system_prompt'], - } - - updated_content = ( - f"{system_message['content']}\n\n{language_instruction}" - ) - updated_system_message = { - 'role': 'system', - 'content': updated_content, - } - - # Update context with new system message - new_messages = [updated_system_message] + current_messages[1:] - await task.queue_frame( - LLMMessagesUpdateFrame(new_messages, run_llm=False) - ) - - logger.info( - f'Updated LLM context with language instruction for {target_language}' - ) - - # Exit after first detection - break - - logger.info('Language detection event handler registered') + task_container['task'] = task # Register event handlers @transport.event_handler('on_client_connected')