From 7a00c1dd444a5a79786eb0df3a73307a96901501 Mon Sep 17 00:00:00 2001 From: rootflo-hardik Date: Fri, 23 Jan 2026 20:04:19 +0530 Subject: [PATCH 01/16] add cust_number to sys prompt dynamically --- .../call_processing/controllers/webhook_controller.py | 9 +++++++++ .../call_processing/services/pipecat_service.py | 4 +++- 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/wavefront/server/apps/call_processing/call_processing/controllers/webhook_controller.py b/wavefront/server/apps/call_processing/call_processing/controllers/webhook_controller.py index 22410e0a..1f42e391 100644 --- a/wavefront/server/apps/call_processing/call_processing/controllers/webhook_controller.py +++ b/wavefront/server/apps/call_processing/call_processing/controllers/webhook_controller.py @@ -119,6 +119,8 @@ async def inbound_webhook( # Pass parameters to WebSocket stream stream.parameter(name='voice_agent_id', value=agent_id) + stream.parameter(name='customer_number', value=From) + stream.parameter(name='agent_number', value=To) connect.append(stream) response.append(connect) @@ -134,6 +136,8 @@ async def inbound_webhook( @webhook_router.post('/twiml') async def twiml_endpoint( + From: str = Form(...), + To: str = Form(...), voice_agent_id: str = Query(...), welcome_message_audio_url: str = Query(default=''), ): @@ -181,6 +185,8 @@ async def twiml_endpoint( # Pass parameters to WebSocket stream stream.parameter(name='voice_agent_id', value=voice_agent_id) + stream.parameter(name='customer_number', value=To) + stream.parameter(name='agent_number', value=From) connect.append(stream) response.append(connect) @@ -223,6 +229,8 @@ async def websocket_endpoint( # Extract parameters from stream body_data = call_data.get('body', {}) voice_agent_id = body_data.get('voice_agent_id') + customer_number = body_data.get('customer_number') + # agent_number = body_data.get('agent_number') if not voice_agent_id: logger.error('voice_agent_id not found in stream parameters') @@ -282,6 +290,7 @@ 
async def websocket_endpoint( tts_config=configs['tts_config'], stt_config=configs['stt_config'], tools=configs['tools'], + customer_number=customer_number, ) except Exception as e: diff --git a/wavefront/server/apps/call_processing/call_processing/services/pipecat_service.py b/wavefront/server/apps/call_processing/call_processing/services/pipecat_service.py index db29a8af..def39381 100644 --- a/wavefront/server/apps/call_processing/call_processing/services/pipecat_service.py +++ b/wavefront/server/apps/call_processing/call_processing/services/pipecat_service.py @@ -145,6 +145,7 @@ async def run_conversation( tts_config: Dict[str, Any], stt_config: Dict[str, Any], tools: List[Dict[str, Any]], + customer_number: str, ): """ Create and run the Pipecat pipeline for a voice conversation @@ -261,7 +262,8 @@ async def run_conversation( messages = [ { 'role': 'system', - 'content': agent_config['system_prompt'], + 'content': f'Customer phone number: {customer_number}\n' + + agent_config['system_prompt'], } ] From c5d0a04e504d2b8c8cb3b26536cfedcd7e5dd0f0 Mon Sep 17 00:00:00 2001 From: rootflo-hardik Date: Tue, 27 Jan 2026 11:56:11 +0530 Subject: [PATCH 02/16] changed voice_agent - tts_voice_id str to tts_voice_ids dict --- .../voice-agents/CreateVoiceAgentDialog.tsx | 53 +++++-- .../voice-agents/EditVoiceAgentDialog.tsx | 53 +++++-- wavefront/client/src/types/voice-agent.ts | 6 +- .../services/pipecat_service.py | 25 ++- .../apps/call_processing/pyproject.toml | 2 +- ...61a34bfc_refactor_tts_voice_id_to_jsonb.py | 150 ++++++++++++++++++ .../db_repo_module/models/voice_agent.py | 3 +- .../controllers/voice_agent_controller.py | 6 +- .../models/voice_agent_schemas.py | 46 +++++- .../services/voice_agent_service.py | 90 ++++++++--- 10 files changed, 375 insertions(+), 59 deletions(-) create mode 100644 wavefront/server/modules/db_repo_module/db_repo_module/alembic/versions/2026_01_24_1447-b92161a34bfc_refactor_tts_voice_id_to_jsonb.py diff --git 
a/wavefront/client/src/pages/apps/[appId]/voice-agents/CreateVoiceAgentDialog.tsx b/wavefront/client/src/pages/apps/[appId]/voice-agents/CreateVoiceAgentDialog.tsx index ec275b5f..cf06c56c 100644 --- a/wavefront/client/src/pages/apps/[appId]/voice-agents/CreateVoiceAgentDialog.tsx +++ b/wavefront/client/src/pages/apps/[appId]/voice-agents/CreateVoiceAgentDialog.tsx @@ -56,7 +56,9 @@ const createVoiceAgentSchema = z.object({ tts_config_id: z.string().min(1, 'TTS configuration is required'), stt_config_id: z.string().min(1, 'STT configuration is required'), telephony_config_id: z.string().min(1, 'Telephony configuration is required'), - tts_voice_id: z.string().min(1, 'TTS Voice ID is required'), + tts_voice_ids: z.record(z.string(), z.string()).refine((val) => Object.keys(val).length > 0, { + message: 'At least one voice ID is required', + }), system_prompt: z.string().min(1, 'System prompt is required'), welcome_message: z.string().min(1, 'Welcome message is required'), conversation_config: z.string().optional(), @@ -83,6 +85,7 @@ const CreateVoiceAgentDialog: React.FC = ({ isOpen, const [creating, setCreating] = useState(false); const [ttsParameters, setTtsParameters] = useState>({}); const [sttParameters, setSttParameters] = useState>({}); + const [voiceIdState, setVoiceIdState] = useState>({ en: '' }); // Fetch configs for dropdowns const { data: llmConfigs = [] } = useGetLLMConfigs(appId); @@ -99,7 +102,7 @@ const CreateVoiceAgentDialog: React.FC = ({ isOpen, tts_config_id: '', stt_config_id: '', telephony_config_id: '', - tts_voice_id: '', + tts_voice_ids: { en: '' }, system_prompt: '', welcome_message: '', conversation_config: '{}', @@ -114,6 +117,7 @@ const CreateVoiceAgentDialog: React.FC = ({ isOpen, // Watch config selections to determine providers const watchedTtsConfigId = form.watch('tts_config_id'); const watchedSttConfigId = form.watch('stt_config_id'); + const watchedSupportedLanguages = form.watch('supported_languages'); // Get selected providers 
const selectedTtsProvider = ttsConfigs.find((c) => c.id === watchedTtsConfigId)?.provider; @@ -132,6 +136,20 @@ const CreateVoiceAgentDialog: React.FC = ({ isOpen, } }, [selectedSttProvider, isOpen]); + // Sync voice ID state with language changes + useEffect(() => { + if (isOpen && watchedSupportedLanguages) { + setVoiceIdState((prev) => { + const newState: Record = {}; + // Preserve existing voice IDs for languages still selected + watchedSupportedLanguages.forEach((lang) => { + newState[lang] = prev[lang] || ''; + }); + return newState; + }); + } + }, [watchedSupportedLanguages, isOpen]); + // Reset form when dialog closes useEffect(() => { if (!isOpen) { @@ -142,7 +160,7 @@ const CreateVoiceAgentDialog: React.FC = ({ isOpen, tts_config_id: '', stt_config_id: '', telephony_config_id: '', - tts_voice_id: '', + tts_voice_ids: { en: '' }, system_prompt: '', welcome_message: '', conversation_config: '{}', @@ -154,6 +172,7 @@ const CreateVoiceAgentDialog: React.FC = ({ isOpen, }); setTtsParameters({}); setSttParameters({}); + setVoiceIdState({ en: '' }); } }, [isOpen, form]); @@ -232,7 +251,7 @@ const CreateVoiceAgentDialog: React.FC = ({ isOpen, tts_config_id: data.tts_config_id.trim(), stt_config_id: data.stt_config_id.trim(), telephony_config_id: data.telephony_config_id.trim(), - tts_voice_id: data.tts_voice_id.trim(), + tts_voice_ids: data.tts_voice_ids, tts_parameters: Object.keys(builtTtsParameters).length > 0 ? builtTtsParameters : null, stt_parameters: Object.keys(builtSttParameters).length > 0 ? builtSttParameters : null, system_prompt: data.system_prompt.trim(), @@ -710,17 +729,31 @@ const CreateVoiceAgentDialog: React.FC = ({ isOpen,

TTS Voice Settings

( - TTS Voice ID* + TTS Voice IDs* - - - +
+ {watchedSupportedLanguages.map((langCode) => ( +
+ + { + const newState = { ...voiceIdState, [langCode]: e.target.value }; + setVoiceIdState(newState); + field.onChange(newState); + }} + className="flex-1" + /> +
+ ))} +
- Provider-specific voice identifier (e.g., for Deepgram: aura-2-helena-en) + Provider-specific voice identifiers per language (e.g., "aura-2-helena-en" for Deepgram)
diff --git a/wavefront/client/src/pages/apps/[appId]/voice-agents/EditVoiceAgentDialog.tsx b/wavefront/client/src/pages/apps/[appId]/voice-agents/EditVoiceAgentDialog.tsx index 5475064e..6676fb4b 100644 --- a/wavefront/client/src/pages/apps/[appId]/voice-agents/EditVoiceAgentDialog.tsx +++ b/wavefront/client/src/pages/apps/[appId]/voice-agents/EditVoiceAgentDialog.tsx @@ -51,7 +51,7 @@ const updateVoiceAgentSchema = z.object({ tts_config_id: z.string().min(1, 'TTS configuration is required'), stt_config_id: z.string().min(1, 'STT configuration is required'), telephony_config_id: z.string().min(1, 'Telephony configuration is required'), - tts_voice_id: z.string().min(1, 'TTS Voice ID is required'), + tts_voice_ids: z.record(z.string(), z.string()).optional(), system_prompt: z.string().min(1, 'System prompt is required'), welcome_message: z.string().min(1, 'Welcome message is required'), conversation_config: z.string().optional(), @@ -92,6 +92,7 @@ const EditVoiceAgentDialog: React.FC = ({ // State for TTS/STT parameters (managed separately from form) const [ttsParameters, setTtsParameters] = useState>({}); const [sttParameters, setSttParameters] = useState>({}); + const [voiceIdState, setVoiceIdState] = useState>(agent.tts_voice_ids || { en: '' }); const form = useForm({ resolver: zodResolver(updateVoiceAgentSchema), @@ -102,7 +103,7 @@ const EditVoiceAgentDialog: React.FC = ({ tts_config_id: agent.tts_config_id, stt_config_id: agent.stt_config_id, telephony_config_id: agent.telephony_config_id, - tts_voice_id: agent.tts_voice_id, + tts_voice_ids: agent.tts_voice_ids, system_prompt: agent.system_prompt, welcome_message: agent.welcome_message, conversation_config: agent.conversation_config ? 
JSON.stringify(agent.conversation_config, null, 2) : '{}', @@ -117,6 +118,7 @@ const EditVoiceAgentDialog: React.FC = ({ // Watch for config changes to determine providers const watchedTtsConfigId = form.watch('tts_config_id'); const watchedSttConfigId = form.watch('stt_config_id'); + const watchedSupportedLanguages = form.watch('supported_languages'); const selectedTtsProvider = ttsConfigs.find((c) => c.id === watchedTtsConfigId)?.provider; const selectedSttProvider = sttConfigs.find((c) => c.id === watchedSttConfigId)?.provider; @@ -134,7 +136,7 @@ const EditVoiceAgentDialog: React.FC = ({ tts_config_id: agent.tts_config_id, stt_config_id: agent.stt_config_id, telephony_config_id: agent.telephony_config_id, - tts_voice_id: agent.tts_voice_id, + tts_voice_ids: agent.tts_voice_ids, system_prompt: agent.system_prompt, welcome_message: agent.welcome_message, conversation_config: agent.conversation_config ? JSON.stringify(agent.conversation_config, null, 2) : '{}', @@ -144,6 +146,7 @@ const EditVoiceAgentDialog: React.FC = ({ supported_languages: agent.supported_languages || ['en'], default_language: agent.default_language || 'en', }); + setVoiceIdState(agent.tts_voice_ids || { en: '' }); } }, [isOpen, agent, form]); @@ -161,6 +164,20 @@ const EditVoiceAgentDialog: React.FC = ({ } }, [isOpen, agent.stt_parameters]); + // Sync voice ID state with language changes + useEffect(() => { + if (isOpen && watchedSupportedLanguages) { + setVoiceIdState((prev) => { + const newState: Record = {}; + // Preserve existing voice IDs for languages still selected + watchedSupportedLanguages.forEach((lang) => { + newState[lang] = prev[lang] || ''; + }); + return newState; + }); + } + }, [watchedSupportedLanguages, isOpen]); + // Helper functions to update parameters const setTtsParameter = (key: string, value: unknown) => { setTtsParameters((prev) => ({ ...prev, [key]: value })); @@ -266,8 +283,8 @@ const EditVoiceAgentDialog: React.FC = ({ requestData.telephony_config_id = 
data.telephony_config_id; } - if (data.tts_voice_id.trim() !== agent.tts_voice_id) { - requestData.tts_voice_id = data.tts_voice_id.trim(); + if (JSON.stringify(data.tts_voice_ids) !== JSON.stringify(agent.tts_voice_ids)) { + requestData.tts_voice_ids = data.tts_voice_ids; } // Check if TTS parameters changed @@ -757,17 +774,31 @@ const EditVoiceAgentDialog: React.FC = ({

TTS Voice Settings

( - TTS Voice ID* + TTS Voice IDs* - - - +
+ {watchedSupportedLanguages.map((langCode) => ( +
+ + { + const newState = { ...voiceIdState, [langCode]: e.target.value }; + setVoiceIdState(newState); + field.onChange(newState); + }} + className="flex-1" + /> +
+ ))} +
- Provider-specific voice identifier (e.g., for Deepgram: aura-2-helena-en) + Provider-specific voice identifiers per language (e.g., "aura-2-helena-en" for Deepgram)
diff --git a/wavefront/client/src/types/voice-agent.ts b/wavefront/client/src/types/voice-agent.ts index 8d0d8260..d34a5520 100644 --- a/wavefront/client/src/types/voice-agent.ts +++ b/wavefront/client/src/types/voice-agent.ts @@ -13,7 +13,7 @@ export interface VoiceAgent { telephony_config_id: string; system_prompt: string; welcome_message: string; - tts_voice_id: string; + tts_voice_ids: Record; tts_parameters: Record | null; stt_parameters: Record | null; conversation_config: Record | null; @@ -39,7 +39,7 @@ export interface CreateVoiceAgentRequest { telephony_config_id: string; system_prompt: string; welcome_message: string; - tts_voice_id: string; + tts_voice_ids: Record; tts_parameters?: Record | null; stt_parameters?: Record | null; conversation_config?: Record | null; @@ -63,7 +63,7 @@ export interface UpdateVoiceAgentRequest { telephony_config_id?: string; system_prompt?: string; welcome_message?: string; - tts_voice_id?: string; + tts_voice_ids?: Record; tts_parameters?: Record | null; stt_parameters?: Record | null; conversation_config?: Record | null; diff --git a/wavefront/server/apps/call_processing/call_processing/services/pipecat_service.py b/wavefront/server/apps/call_processing/call_processing/services/pipecat_service.py index def39381..574ec3c9 100644 --- a/wavefront/server/apps/call_processing/call_processing/services/pipecat_service.py +++ b/wavefront/server/apps/call_processing/call_processing/services/pipecat_service.py @@ -165,7 +165,9 @@ async def run_conversation( is_multi_language = len(supported_languages) > 1 # Extract TTS/STT parameters from agent - tts_voice_id = agent_config.get('tts_voice_id') + tts_voice_ids_dict = agent_config.get( + 'tts_voice_ids', {} + ) # Dict of language -> voice_id tts_parameters = agent_config.get('tts_parameters', {}) stt_parameters = agent_config.get('stt_parameters', {}) @@ -178,11 +180,14 @@ async def run_conversation( # Create LLM service (language-agnostic) llm = 
LLMServiceFactory.create_llm_service(llm_config) + # Get voice ID for default language + default_voice_id = tts_voice_ids_dict.get(default_language, 'default') + # Merge TTS config credentials with agent's voice and parameters tts_config_with_params = { 'provider': tts_config['provider'], 'api_key': tts_config['api_key'], - 'voice_id': tts_voice_id, + 'voice_id': default_voice_id, # Will be overridden per language in multi-lang mode 'parameters': tts_parameters or {}, } @@ -204,6 +209,14 @@ async def run_conversation( # Create STT/TTS services for each supported language for lang_code in supported_languages: + # Get voice ID for this language + voice_id_for_lang = tts_voice_ids_dict.get(lang_code) + if not voice_id_for_lang: + logger.warning( + f'No voice ID for language {lang_code}, using default' + ) + voice_id_for_lang = default_voice_id + # Deep clone configs to avoid mutating original configs stt_config_lang = deepcopy(stt_config_with_params) tts_config_lang = deepcopy(tts_config_with_params) @@ -217,6 +230,9 @@ async def run_conversation( tts_config_lang['parameters'] = {} tts_config_lang['parameters']['language'] = lang_code + # Set language-specific voice ID + tts_config_lang['voice_id'] = voice_id_for_lang + # Create services stt_services[lang_code] = STTServiceFactory.create_stt_service( stt_config_lang @@ -225,7 +241,10 @@ async def run_conversation( tts_config_lang ) - logger.info(f'Created STT/TTS services for language: {lang_code}') + logger.info( + f'Created STT/TTS services for language: {lang_code} ' + f'with voice: {voice_id_for_lang}' + ) # Create service switchers with manual strategy # Order services list with default language first (ServiceSwitcher uses first as initial) diff --git a/wavefront/server/apps/call_processing/pyproject.toml b/wavefront/server/apps/call_processing/pyproject.toml index 4cb97c09..6ed520ce 100644 --- a/wavefront/server/apps/call_processing/pyproject.toml +++ b/wavefront/server/apps/call_processing/pyproject.toml @@ 
-21,7 +21,7 @@ dependencies = [ "redis>=5.0.0", "tenacity>=8.0.0", # Pipecat and voice processing - "pipecat-ai[websocket,cartesia,google,silero,deepgram,groq,runner]==0.0.97", + "pipecat-ai[websocket,cartesia,google,silero,deepgram,groq,runner,azure]==0.0.97", # Twilio "twilio>=8.0.0", ] diff --git a/wavefront/server/modules/db_repo_module/db_repo_module/alembic/versions/2026_01_24_1447-b92161a34bfc_refactor_tts_voice_id_to_jsonb.py b/wavefront/server/modules/db_repo_module/db_repo_module/alembic/versions/2026_01_24_1447-b92161a34bfc_refactor_tts_voice_id_to_jsonb.py new file mode 100644 index 00000000..7dd2000e --- /dev/null +++ b/wavefront/server/modules/db_repo_module/db_repo_module/alembic/versions/2026_01_24_1447-b92161a34bfc_refactor_tts_voice_id_to_jsonb.py @@ -0,0 +1,150 @@ +"""refactor_tts_voice_ids_to_jsonb + +Revision ID: b92161a34bfc +Revises: a5b3c4d5e6f7 +Create Date: 2026-01-24 14:47:58.115161 + +""" + +from typing import Sequence, Union +import json + +from alembic import op +import sqlalchemy as sa +from sqlalchemy import text + + +# revision identifiers, used by Alembic. +revision: str = 'b92161a34bfc' +down_revision: Union[str, None] = 'a5b3c4d5e6f7' +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + """ + Convert tts_voice_id from String to tts_voice_ids JSONB dictionary. + + For each agent, converts the single voice ID string to a dictionary + mapping each supported language to that voice ID. 
+ + Example: "alloy" with supported_languages=["en", "hi"] becomes {"en": "alloy", "hi": "alloy"} + """ + connection = op.get_bind() + + # Step 1: Add new JSONB column (nullable initially) + op.execute(""" + ALTER TABLE voice_agents + ADD COLUMN IF NOT EXISTS tts_voice_ids JSONB + """) + + # Step 2: Migrate data - convert string to dict for all supported_languages + # Note: Migrate ALL agents including deleted ones to avoid NOT NULL constraint violation + agents = connection.execute( + text(""" + SELECT id, tts_voice_id, supported_languages + FROM voice_agents + """) + ).fetchall() + + for agent in agents: + old_voice_id = agent.tts_voice_id + + # Handle null/empty supported_languages - default to ["en"] + if agent.supported_languages: + if isinstance(agent.supported_languages, list): + supported_langs = agent.supported_languages + else: + # In case it's stored as JSON string + try: + supported_langs = json.loads(agent.supported_languages) + except (json.JSONDecodeError, TypeError): + supported_langs = ['en'] + else: + supported_langs = ['en'] + + # Build dict: apply old voice_id to all supported languages + voice_ids_dict = {lang: old_voice_id for lang in supported_langs} + + # Update the agent with new JSONB dict + connection.execute( + text(""" + UPDATE voice_agents + SET tts_voice_ids = :voice_dict + WHERE id = :agent_id + """), + {'voice_dict': json.dumps(voice_ids_dict), 'agent_id': str(agent.id)}, + ) + + # Step 3: Make new column NOT NULL + op.execute(""" + ALTER TABLE voice_agents + ALTER COLUMN tts_voice_ids SET NOT NULL + """) + + # Step 4: Drop old column + op.execute(""" + ALTER TABLE voice_agents + DROP COLUMN tts_voice_id + """) + + +def downgrade() -> None: + """ + Restore tts_voice_id as String column from tts_voice_ids JSONB. + + This is a lossy operation - only the voice ID for the default_language + is preserved. Voice IDs for other languages are lost. 
+ """ + connection = op.get_bind() + + # Step 1: Add old string column back (nullable initially) + op.add_column( + 'voice_agents', sa.Column('tts_voice_id', sa.String(255), nullable=True) + ) + + # Step 2: Extract voice_id from dict (use default_language voice or first available) + agents = connection.execute( + text(""" + SELECT id, tts_voice_ids, default_language + FROM voice_agents + """) + ).fetchall() + + for agent in agents: + # Parse JSONB dict + if agent.tts_voice_ids: + if isinstance(agent.tts_voice_ids, dict): + voice_dict = agent.tts_voice_ids + else: + try: + voice_dict = json.loads(agent.tts_voice_ids) + except (json.JSONDecodeError, TypeError): + voice_dict = {} + else: + voice_dict = {} + + # Try to use default_language voice, fallback to first available, or 'default' + default_lang = agent.default_language or 'en' + voice_id = voice_dict.get(default_lang) or next( + iter(voice_dict.values()), 'default' + ) + + # Update with single voice_id string + connection.execute( + text(""" + UPDATE voice_agents + SET tts_voice_id = :voice_id + WHERE id = :agent_id + """), + {'voice_id': voice_id, 'agent_id': str(agent.id)}, + ) + + # Step 3: Make old column NOT NULL + op.execute(""" + ALTER TABLE voice_agents + ALTER COLUMN tts_voice_id SET NOT NULL + """) + + # Step 4: Drop JSONB column + op.drop_column('voice_agents', 'tts_voice_ids') diff --git a/wavefront/server/modules/db_repo_module/db_repo_module/models/voice_agent.py b/wavefront/server/modules/db_repo_module/db_repo_module/models/voice_agent.py index c4774616..bb3a0e0f 100644 --- a/wavefront/server/modules/db_repo_module/db_repo_module/models/voice_agent.py +++ b/wavefront/server/modules/db_repo_module/db_repo_module/models/voice_agent.py @@ -36,7 +36,7 @@ class VoiceAgent(Base): status: Mapped[str] = mapped_column(String(length=64), nullable=False) # TTS/STT configuration - tts_voice_id: Mapped[str] = mapped_column(String(length=255), nullable=False) + tts_voice_ids: Mapped[dict] = 
mapped_column(JSONB, nullable=False) tts_parameters: Mapped[Optional[dict]] = mapped_column(JSONB, nullable=True) stt_parameters: Mapped[Optional[dict]] = mapped_column(JSONB, nullable=True) @@ -75,6 +75,7 @@ def to_dict(self): 'supported_languages', 'tts_parameters', 'stt_parameters', + 'tts_voice_ids', ]: # Parse JSON/JSONB fields if value: diff --git a/wavefront/server/modules/voice_agents_module/voice_agents_module/controllers/voice_agent_controller.py b/wavefront/server/modules/voice_agents_module/voice_agents_module/controllers/voice_agent_controller.py index 7ec584b3..63138286 100644 --- a/wavefront/server/modules/voice_agents_module/voice_agents_module/controllers/voice_agent_controller.py +++ b/wavefront/server/modules/voice_agents_module/voice_agents_module/controllers/voice_agent_controller.py @@ -55,7 +55,7 @@ async def create_voice_agent( telephony_config_id=payload.telephony_config_id, system_prompt=payload.system_prompt, welcome_message=payload.welcome_message, - tts_voice_id=payload.tts_voice_id, + tts_voice_ids=payload.tts_voice_ids, tts_parameters=payload.tts_parameters, stt_parameters=payload.stt_parameters, conversation_config=payload.conversation_config, @@ -181,8 +181,8 @@ async def update_voice_agent( update_data['system_prompt'] = payload.system_prompt if payload.welcome_message is not UNSET: update_data['welcome_message'] = payload.welcome_message - if payload.tts_voice_id is not UNSET: - update_data['tts_voice_id'] = payload.tts_voice_id + if payload.tts_voice_ids is not UNSET: + update_data['tts_voice_ids'] = payload.tts_voice_ids if payload.tts_parameters is not UNSET: update_data['tts_parameters'] = payload.tts_parameters if payload.stt_parameters is not UNSET: diff --git a/wavefront/server/modules/voice_agents_module/voice_agents_module/models/voice_agent_schemas.py b/wavefront/server/modules/voice_agents_module/voice_agents_module/models/voice_agent_schemas.py index 000bc720..2e764330 100644 --- 
a/wavefront/server/modules/voice_agents_module/voice_agents_module/models/voice_agent_schemas.py +++ b/wavefront/server/modules/voice_agents_module/voice_agents_module/models/voice_agent_schemas.py @@ -1,4 +1,4 @@ -from pydantic import BaseModel, Field +from pydantic import BaseModel, Field, validator from typing import Optional, Union, Any, Dict, List from enum import Enum from datetime import datetime @@ -30,7 +30,10 @@ class CreateVoiceAgentPayload(BaseModel): ..., description='Welcome message to play at call start (will be converted to audio)', ) - tts_voice_id: str = Field(..., description='TTS voice identifier') + tts_voice_ids: Dict[str, str] = Field( + ..., + description='TTS voice identifiers per language (e.g., {"en": "alloy", "hi": "shimmer"})', + ) tts_parameters: Optional[Dict[str, Any]] = Field( None, description='Provider-specific TTS parameters (model, stability, etc.)' ) @@ -58,6 +61,41 @@ class CreateVoiceAgentPayload(BaseModel): description='Default language if detection fails (must be in supported_languages)', ) + @validator('tts_voice_ids') + def validate_tts_voice_ids_keys(cls, v, values): + """Validate that tts_voice_ids has voice IDs for all supported languages.""" + # Get supported languages, default to ['en'] if not provided + supported_langs = values.get('supported_languages') or ['en'] + + if not isinstance(v, dict): + raise ValueError('tts_voice_ids must be a dictionary') + + if not v: + raise ValueError('tts_voice_ids dictionary cannot be empty') + + # Check all languages have voice IDs + supported_set = set(supported_langs) + provided_set = set(v.keys()) + + missing_langs = supported_set - provided_set + if missing_langs: + raise ValueError( + f'Missing voice IDs for languages: {sorted(missing_langs)}' + ) + + extra_langs = provided_set - supported_set + if extra_langs: + raise ValueError( + f'Voice IDs provided for unsupported languages: {sorted(extra_langs)}' + ) + + # Validate each voice_id is non-empty + for lang, voice_id in 
v.items(): + if not voice_id or not str(voice_id).strip(): + raise ValueError(f'Voice ID for language "{lang}" cannot be empty') + + return v + class UpdateVoiceAgentPayload(BaseModel): name: Union[str, Any] = Field(default=UNSET) @@ -69,7 +107,7 @@ class UpdateVoiceAgentPayload(BaseModel): system_prompt: Union[str, Any] = Field(default=UNSET) conversation_config: Union[Dict[str, Any], None, Any] = Field(default=UNSET) welcome_message: Union[str, Any] = Field(default=UNSET) - tts_voice_id: Union[str, Any] = Field(default=UNSET) + tts_voice_ids: Union[Dict[str, str], Any] = Field(default=UNSET) tts_parameters: Union[Dict[str, Any], None, Any] = Field(default=UNSET) stt_parameters: Union[Dict[str, Any], None, Any] = Field(default=UNSET) status: Union[VoiceAgentStatus, Any] = Field(default=UNSET) @@ -90,7 +128,7 @@ class VoiceAgentResponse(BaseModel): system_prompt: str conversation_config: Optional[Dict[str, Any]] welcome_message: str - tts_voice_id: str + tts_voice_ids: Dict[str, str] tts_parameters: Optional[Dict[str, Any]] stt_parameters: Optional[Dict[str, Any]] status: str diff --git a/wavefront/server/modules/voice_agents_module/voice_agents_module/services/voice_agent_service.py b/wavefront/server/modules/voice_agents_module/voice_agents_module/services/voice_agent_service.py index 79c826a3..dfb0e4ab 100644 --- a/wavefront/server/modules/voice_agents_module/voice_agents_module/services/voice_agent_service.py +++ b/wavefront/server/modules/voice_agents_module/voice_agents_module/services/voice_agent_service.py @@ -122,7 +122,8 @@ async def _validate_foreign_keys( def _validate_tts_stt_parameters( self, - tts_voice_id: str, + tts_voice_ids: dict, + supported_languages: List[str], tts_parameters: Optional[dict] = None, stt_parameters: Optional[dict] = None, ) -> Tuple[bool, Optional[str]]: @@ -130,16 +131,40 @@ def _validate_tts_stt_parameters( Validate TTS/STT parameters. 
Args: - tts_voice_id: TTS voice identifier + tts_voice_ids: TTS voice identifiers dict (language -> voice_id) + supported_languages: List of supported language codes tts_parameters: Provider-specific TTS parameters stt_parameters: Provider-specific STT parameters Returns: Tuple of (is_valid, error_message). error_message is None if valid. """ - # Validate TTS voice_id is not empty - if not tts_voice_id or not tts_voice_id.strip(): - return False, 'TTS voice_id is required and cannot be empty' + # Validate TTS voice_ids is a dict + if not isinstance(tts_voice_ids, dict): + return False, 'TTS voice_ids must be a dictionary' + + if not tts_voice_ids: + return False, 'TTS voice_ids dictionary cannot be empty' + + # Validate all languages have voice IDs + supported_set = set(supported_languages) + provided_set = set(tts_voice_ids.keys()) + + missing_langs = supported_set - provided_set + if missing_langs: + return False, f'Missing voice IDs for languages: {sorted(missing_langs)}' + + extra_langs = provided_set - supported_set + if extra_langs: + return ( + False, + f'Voice IDs provided for unsupported languages: {sorted(extra_langs)}', + ) + + # Validate each voice_id is non-empty + for lang, voice_id in tts_voice_ids.items(): + if not voice_id or not str(voice_id).strip(): + return False, f'Voice ID for language "{lang}" cannot be empty' # Validate TTS parameters is a dict if provided if tts_parameters is not None and not isinstance(tts_parameters, dict): @@ -228,7 +253,7 @@ async def _generate_and_upload_welcome_audio( self, welcome_message: str, tts_config_id: UUID, - tts_voice_id: str, + tts_voice_ids: dict, tts_parameters: Optional[dict], agent_id: UUID, supported_languages: List[str], @@ -241,7 +266,7 @@ async def _generate_and_upload_welcome_audio( Args: welcome_message: Text of the welcome message tts_config_id: TTS config ID to use for generation - tts_voice_id: Voice ID for TTS + tts_voice_ids: Voice IDs dict (language -> voice_id) tts_parameters: 
Provider-specific TTS parameters agent_id: Voice agent ID (used for generating storage key) supported_languages: List of supported language codes @@ -257,6 +282,13 @@ async def _generate_and_upload_welcome_audio( if not tts_config: raise ValueError(f'TTS config {tts_config_id} not found') + # Get voice ID for default language + voice_id_for_default = tts_voice_ids.get(default_language) + if not voice_id_for_default: + raise ValueError( + f'No voice ID found for default language: {default_language}' + ) + # Build welcome audio text audio_text = welcome_message @@ -274,7 +306,7 @@ async def _generate_and_upload_welcome_audio( tts_config_with_params = { 'provider': tts_config['provider'], 'api_key': tts_config['api_key'], - 'voice_id': tts_voice_id, + 'voice_id': voice_id_for_default, # Use default language voice 'parameters': tts_parameters or {}, } @@ -318,7 +350,7 @@ async def create_agent( telephony_config_id: UUID, system_prompt: str, welcome_message: str, - tts_voice_id: str, + tts_voice_ids: dict, description: Optional[str] = None, conversation_config: Optional[dict] = None, status: str = 'inactive', @@ -340,7 +372,7 @@ async def create_agent( telephony_config_id: Telephony config ID system_prompt: System prompt for the agent welcome_message: Welcome message text (will be converted to audio) - tts_voice_id: TTS voice identifier + tts_voice_ids: TTS voice identifiers per language description: Description of the agent (optional) conversation_config: Conversation configuration (optional) status: Agent status (default: inactive) @@ -375,7 +407,7 @@ async def create_agent( # Validate TTS/STT parameters is_valid, error_message = self._validate_tts_stt_parameters( - tts_voice_id, tts_parameters, stt_parameters + tts_voice_ids, supported_languages, tts_parameters, stt_parameters ) if not is_valid: logger.error(f'TTS/STT validation failed: {error_message}') @@ -402,7 +434,7 @@ async def create_agent( await self._generate_and_upload_welcome_audio( welcome_message, 
tts_config_id, - tts_voice_id, + tts_voice_ids, tts_parameters, agent_id, supported_languages, @@ -424,7 +456,7 @@ async def create_agent( else None, welcome_message=welcome_message, status=status, - tts_voice_id=tts_voice_id, + tts_voice_ids=tts_voice_ids, tts_parameters=tts_parameters, stt_parameters=stt_parameters, inbound_numbers=inbound_numbers, @@ -587,9 +619,19 @@ async def update_agent(self, agent_id: UUID, **update_data) -> Optional[dict]: raise ValueError(error_message) # Validate TTS/STT parameters if being updated - tts_stt_fields = ['tts_voice_id', 'tts_parameters', 'stt_parameters'] + tts_stt_fields = [ + 'tts_voice_ids', + 'tts_parameters', + 'stt_parameters', + 'supported_languages', + ] if any(key in update_data for key in tts_stt_fields): - tts_voice_id = update_data.get('tts_voice_id', existing_agent.tts_voice_id) + tts_voice_ids = update_data.get( + 'tts_voice_ids', existing_agent.tts_voice_ids + ) + supported_languages_for_validation = update_data.get( + 'supported_languages', existing_dict.get('supported_languages', ['en']) + ) tts_parameters = update_data.get( 'tts_parameters', existing_dict.get('tts_parameters') ) @@ -598,7 +640,10 @@ async def update_agent(self, agent_id: UUID, **update_data) -> Optional[dict]: ) is_valid, error_message = self._validate_tts_stt_parameters( - tts_voice_id, tts_parameters, stt_parameters + tts_voice_ids, + supported_languages_for_validation, + tts_parameters, + stt_parameters, ) if not is_valid: logger.error(f'TTS/STT validation failed: {error_message}') @@ -619,10 +664,9 @@ async def update_agent(self, agent_id: UUID, **update_data) -> Optional[dict]: 'default_language' ] != existing_dict.get('default_language'): audio_regeneration_needed = True - if ( - 'tts_voice_id' in update_data - and update_data['tts_voice_id'] != existing_agent.tts_voice_id - ): + if 'tts_voice_ids' in update_data and update_data[ + 'tts_voice_ids' + ] != existing_dict.get('tts_voice_ids'): audio_regeneration_needed = True if 
'tts_parameters' in update_data and update_data[ 'tts_parameters' @@ -677,8 +721,8 @@ async def update_agent(self, agent_id: UUID, **update_data) -> Optional[dict]: tts_config_id = update_data.get( 'tts_config_id', existing_agent.tts_config_id ) - tts_voice_id = update_data.get( - 'tts_voice_id', existing_agent.tts_voice_id + tts_voice_ids = update_data.get( + 'tts_voice_ids', existing_agent.tts_voice_ids ) tts_parameters = update_data.get( 'tts_parameters', existing_dict.get('tts_parameters') @@ -694,7 +738,7 @@ async def update_agent(self, agent_id: UUID, **update_data) -> Optional[dict]: await self._generate_and_upload_welcome_audio( welcome_message, tts_config_id, - tts_voice_id, + tts_voice_ids, tts_parameters, agent_id, supported_languages, From 2acffea7fbf9ee1429d4558e2d198ab63de8d4cd Mon Sep 17 00:00:00 2001 From: rootflo-hardik Date: Tue, 27 Jan 2026 12:03:02 +0530 Subject: [PATCH 03/16] call_processing - added azure llm service --- .../call_processing/services/llm_service.py | 45 +++++++++++++++++++ wavefront/server/uv.lock | 21 ++++++++- 2 files changed, 64 insertions(+), 2 deletions(-) diff --git a/wavefront/server/apps/call_processing/call_processing/services/llm_service.py b/wavefront/server/apps/call_processing/call_processing/services/llm_service.py index 6bf2b9b4..c75035ca 100644 --- a/wavefront/server/apps/call_processing/call_processing/services/llm_service.py +++ b/wavefront/server/apps/call_processing/call_processing/services/llm_service.py @@ -12,6 +12,7 @@ from pipecat.services.openai.base_llm import BaseOpenAILLMService from pipecat.services.google.llm import GoogleLLMService from pipecat.services.groq.llm import GroqLLMService +from pipecat.services.azure.llm import AzureLLMService # Add more as needed @@ -50,6 +51,11 @@ def create_llm_service(llm_config: Dict[str, Any]): if llm_type == 'openai': return LLMServiceFactory._create_openai_llm(api_key, model, parameters) + elif llm_type == 'azure_openai': + base_url = llm_config['base_url'] + 
return LLMServiceFactory._create_azure_llm( + api_key, model, parameters, base_url + ) elif llm_type == 'gemini': return LLMServiceFactory._create_google_llm(api_key, model, parameters) elif llm_type == 'groq': @@ -140,3 +146,42 @@ def _create_groq_llm(api_key: str, model: str, parameters: Dict[str, Any]): ) return GroqLLMService(api_key=api_key, model=model, params=input_params) + + @staticmethod + def _create_azure_llm( + api_key: str, model: str, parameters: Dict[str, Any], base_url: str = None + ): + """Create Azure OpenAI LLM service""" + # Extract Azure specific params + # api_version = parameters.get('api_version') + + # Build InputParams from the parameters dict + params_dict = {} + + if 'temperature' in parameters: + params_dict['temperature'] = parameters['temperature'] + if 'max_completion_tokens' in parameters: + params_dict['max_completion_tokens'] = parameters['max_completion_tokens'] + if 'top_p' in parameters: + params_dict['top_p'] = parameters['top_p'] + if 'frequency_penalty' in parameters: + params_dict['frequency_penalty'] = parameters['frequency_penalty'] + if 'presence_penalty' in parameters: + params_dict['presence_penalty'] = parameters['presence_penalty'] + if 'seed' in parameters: + params_dict['seed'] = parameters['seed'] + + # Create InputParams object + input_params = BaseOpenAILLMService.InputParams(**params_dict) + + logger.info( + f"Azure OpenAI LLM config: model={model}, endpoint={base_url}, temp={params_dict.get('temperature', 'default')}" + ) + + return AzureLLMService( + api_key=api_key, + endpoint=base_url, + model=model, + # api_version=api_version, + params=input_params, + ) diff --git a/wavefront/server/uv.lock b/wavefront/server/uv.lock index 169b3242..05bcf661 100644 --- a/wavefront/server/uv.lock +++ b/wavefront/server/uv.lock @@ -69,6 +69,7 @@ wheels = [ name = "aenum" version = "3.1.16" source = { registry = "https://pypi.org/simple" } +sdist = { url = 
"https://files.pythonhosted.org/packages/09/7a/61ed58e8be9e30c3fe518899cc78c284896d246d51381bab59b5db11e1f3/aenum-3.1.16.tar.gz", hash = "sha256:bfaf9589bdb418ee3a986d85750c7318d9d2839c1b1a1d6fe8fc53ec201cf140", size = 137693, upload-time = "2026-01-12T22:34:38.819Z" } wheels = [ { url = "https://files.pythonhosted.org/packages/e3/52/6ad8f63ec8da1bf40f96996d25d5b650fdd38f5975f8c813732c47388f18/aenum-3.1.16-py3-none-any.whl", hash = "sha256:9035092855a98e41b66e3d0998bd7b96280e85ceb3a04cc035636138a1943eaf", size = 165627, upload-time = "2025-04-25T03:17:58.89Z" }, ] @@ -476,6 +477,19 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/0e/aa/91355b5f539caf1b94f0e66ff1e4ee39373b757fce08204981f7829ede51/authlib-1.6.4-py2.py3-none-any.whl", hash = "sha256:39313d2a2caac3ecf6d8f95fbebdfd30ae6ea6ae6a6db794d976405fdd9aa796", size = 243076, upload-time = "2025-09-17T09:59:22.259Z" }, ] +[[package]] +name = "azure-cognitiveservices-speech" +version = "1.42.0" +source = { registry = "https://pypi.org/simple" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/86/1d/07fc84ab9590fae9cc66a789d1971a0e3494e605e1787879c3581c5a385a/azure_cognitiveservices_speech-1.42.0-py3-none-macosx_10_14_x86_64.whl", hash = "sha256:ad45a18ad6973a4fa2dbd4d71ded3a1a02c4dbbf13696b08f7a16f4156dddce7", size = 7420332, upload-time = "2025-01-13T22:10:18.831Z" }, + { url = "https://files.pythonhosted.org/packages/a1/72/7ebe03784b220b9adece692adc31ec6e4a1bac96c3e9d3fef511b5ca08aa/azure_cognitiveservices_speech-1.42.0-py3-none-macosx_11_0_arm64.whl", hash = "sha256:9105a64a9d83044790f4f8c9358b6ea66a7c042cbd67173db303501782e62d3f", size = 7277345, upload-time = "2025-01-13T22:10:24.23Z" }, + { url = "https://files.pythonhosted.org/packages/83/f7/9241ad7154e554730ea56271e14ad1115c278b26a81eb892eac16fabb480/azure_cognitiveservices_speech-1.42.0-py3-none-manylinux1_x86_64.whl", hash = "sha256:90890a147499239f37b0b1a5112c51820b90fa2b5adafa0df4da6cc0c211887f", size = 39727186, upload-time 
= "2025-01-13T22:10:01.628Z" }, + { url = "https://files.pythonhosted.org/packages/fc/fd/af607bdfa95306b13fcdeadcd48d28b80b27cc5e3b99e2bde96f6212cd3a/azure_cognitiveservices_speech-1.42.0-py3-none-manylinux2014_aarch64.whl", hash = "sha256:106fbdb165a215cada47d7e95851e0b9d2755a3f2355369bab4915ad001efe89", size = 39508123, upload-time = "2025-01-13T22:10:13.719Z" }, + { url = "https://files.pythonhosted.org/packages/17/fb/1c998efbfcb1e44f9dc4dbb8b182ea3e5287fdc167aa352aef4685e29435/azure_cognitiveservices_speech-1.42.0-py3-none-win32.whl", hash = "sha256:7d57218beec24360a8b7ce89755c2c133259e3411c233ef0a659b951e4c4c904", size = 2109807, upload-time = "2025-01-13T22:09:50.516Z" }, + { url = "https://files.pythonhosted.org/packages/52/bb/ef7a29f5717cca646be6698d80e542446a6a442be897c8f67bf93551c672/azure_cognitiveservices_speech-1.42.0-py3-none-win_amd64.whl", hash = "sha256:32076ee03b3b402a2e8841f2c21e5cd54dc3ffbf5af183426344727298c8bbd4", size = 2377971, upload-time = "2025-01-13T22:09:44.706Z" }, +] + [[package]] name = "azure-core" version = "1.35.1" @@ -617,7 +631,7 @@ dependencies = [ { name = "dependency-injector" }, { name = "fastapi" }, { name = "httpx" }, - { name = "pipecat-ai", extra = ["cartesia", "deepgram", "google", "groq", "runner", "silero", "websocket"] }, + { name = "pipecat-ai", extra = ["azure", "cartesia", "deepgram", "google", "groq", "runner", "silero", "websocket"] }, { name = "pydantic" }, { name = "python-dotenv" }, { name = "python-multipart" }, @@ -632,7 +646,7 @@ requires-dist = [ { name = "dependency-injector", specifier = ">=4.46.0,<5.0.0" }, { name = "fastapi", specifier = ">=0.115.2,<1.0.0" }, { name = "httpx", specifier = ">=0.27.0" }, - { name = "pipecat-ai", extras = ["websocket", "cartesia", "google", "silero", "deepgram", "groq", "runner"], specifier = "==0.0.97" }, + { name = "pipecat-ai", extras = ["websocket", "cartesia", "google", "silero", "deepgram", "groq", "runner", "azure"], specifier = "==0.0.97" }, { name = "pydantic", 
specifier = ">=2.0.0" }, { name = "python-dotenv", specifier = ">=1.1.0,<2.0.0" }, { name = "python-multipart", specifier = ">=0.0.9" }, @@ -3909,6 +3923,9 @@ wheels = [ ] [package.optional-dependencies] +azure = [ + { name = "azure-cognitiveservices-speech" }, +] cartesia = [ { name = "cartesia" }, { name = "websockets" }, From 98b05977e9097e9a8961630ec626b7fe4a2223ff Mon Sep 17 00:00:00 2001 From: rootflo-hardik Date: Tue, 27 Jan 2026 16:45:36 +0530 Subject: [PATCH 04/16] updated pipecat version --- .../apps/call_processing/pyproject.toml | 2 +- wavefront/server/uv.lock | 70 +++++++++++-------- 2 files changed, 40 insertions(+), 32 deletions(-) diff --git a/wavefront/server/apps/call_processing/pyproject.toml b/wavefront/server/apps/call_processing/pyproject.toml index 6ed520ce..a0a776bf 100644 --- a/wavefront/server/apps/call_processing/pyproject.toml +++ b/wavefront/server/apps/call_processing/pyproject.toml @@ -21,7 +21,7 @@ dependencies = [ "redis>=5.0.0", "tenacity>=8.0.0", # Pipecat and voice processing - "pipecat-ai[websocket,cartesia,google,silero,deepgram,groq,runner,azure]==0.0.97", + "pipecat-ai[websocket,cartesia,google,silero,deepgram,groq,runner,azure,local-smart-turn-v3]==0.0.100", # Twilio "twilio>=8.0.0", ] diff --git a/wavefront/server/uv.lock b/wavefront/server/uv.lock index 05bcf661..632120cb 100644 --- a/wavefront/server/uv.lock +++ b/wavefront/server/uv.lock @@ -479,15 +479,18 @@ wheels = [ [[package]] name = "azure-cognitiveservices-speech" -version = "1.42.0" +version = "1.44.0" source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "azure-core" }, +] wheels = [ - { url = "https://files.pythonhosted.org/packages/86/1d/07fc84ab9590fae9cc66a789d1971a0e3494e605e1787879c3581c5a385a/azure_cognitiveservices_speech-1.42.0-py3-none-macosx_10_14_x86_64.whl", hash = "sha256:ad45a18ad6973a4fa2dbd4d71ded3a1a02c4dbbf13696b08f7a16f4156dddce7", size = 7420332, upload-time = "2025-01-13T22:10:18.831Z" }, - { url = 
"https://files.pythonhosted.org/packages/a1/72/7ebe03784b220b9adece692adc31ec6e4a1bac96c3e9d3fef511b5ca08aa/azure_cognitiveservices_speech-1.42.0-py3-none-macosx_11_0_arm64.whl", hash = "sha256:9105a64a9d83044790f4f8c9358b6ea66a7c042cbd67173db303501782e62d3f", size = 7277345, upload-time = "2025-01-13T22:10:24.23Z" }, - { url = "https://files.pythonhosted.org/packages/83/f7/9241ad7154e554730ea56271e14ad1115c278b26a81eb892eac16fabb480/azure_cognitiveservices_speech-1.42.0-py3-none-manylinux1_x86_64.whl", hash = "sha256:90890a147499239f37b0b1a5112c51820b90fa2b5adafa0df4da6cc0c211887f", size = 39727186, upload-time = "2025-01-13T22:10:01.628Z" }, - { url = "https://files.pythonhosted.org/packages/fc/fd/af607bdfa95306b13fcdeadcd48d28b80b27cc5e3b99e2bde96f6212cd3a/azure_cognitiveservices_speech-1.42.0-py3-none-manylinux2014_aarch64.whl", hash = "sha256:106fbdb165a215cada47d7e95851e0b9d2755a3f2355369bab4915ad001efe89", size = 39508123, upload-time = "2025-01-13T22:10:13.719Z" }, - { url = "https://files.pythonhosted.org/packages/17/fb/1c998efbfcb1e44f9dc4dbb8b182ea3e5287fdc167aa352aef4685e29435/azure_cognitiveservices_speech-1.42.0-py3-none-win32.whl", hash = "sha256:7d57218beec24360a8b7ce89755c2c133259e3411c233ef0a659b951e4c4c904", size = 2109807, upload-time = "2025-01-13T22:09:50.516Z" }, - { url = "https://files.pythonhosted.org/packages/52/bb/ef7a29f5717cca646be6698d80e542446a6a442be897c8f67bf93551c672/azure_cognitiveservices_speech-1.42.0-py3-none-win_amd64.whl", hash = "sha256:32076ee03b3b402a2e8841f2c21e5cd54dc3ffbf5af183426344727298c8bbd4", size = 2377971, upload-time = "2025-01-13T22:09:44.706Z" }, + { url = "https://files.pythonhosted.org/packages/0b/0d/0752835f079e8d2cc42bb634f3ccd761c8d6e9d0d46a2d6cf7b3ed8e714c/azure_cognitiveservices_speech-1.44.0-py3-none-macosx_10_14_x86_64.whl", hash = "sha256:78037a147ba72abb57e8c10b693d43a1bb029986fae0918f1f9b7d6342737bfe", size = 7492396, upload-time = "2025-05-19T15:46:11.318Z" }, + { url = 
"https://files.pythonhosted.org/packages/76/1d/d0ed4ec0f51303a2a532dc845eeb72c7729a3c8639b08050f3c1cd96db79/azure_cognitiveservices_speech-1.44.0-py3-none-macosx_11_0_arm64.whl", hash = "sha256:2c9b436326cd8dd82dfa88454b7b68359dfc7149e2ac9029f9bcff155ebd5c95", size = 7347577, upload-time = "2025-05-19T15:46:13.644Z" }, + { url = "https://files.pythonhosted.org/packages/89/c8/f0a4ea8bea014b912046f737e429378ceadad68258395454d62acf7f65bb/azure_cognitiveservices_speech-1.44.0-py3-none-manylinux1_x86_64.whl", hash = "sha256:e5f07fc0587067850288c17aebf33d307d2c1ef9e0b2d11d9f44bff2af400568", size = 40977193, upload-time = "2025-05-19T15:46:15.878Z" }, + { url = "https://files.pythonhosted.org/packages/6a/0d/0a0394e8102d6660afeec6b780c451401f6074b1e19f00e90785529e459e/azure_cognitiveservices_speech-1.44.0-py3-none-manylinux2014_aarch64.whl", hash = "sha256:3461e22cf04816f69a964d936218d920240f987c0656fdaaf46571529ff0f7e6", size = 40747860, upload-time = "2025-05-19T15:46:19.316Z" }, + { url = "https://files.pythonhosted.org/packages/55/ad/3b7f6eca73040821358ce01f22067446a03d876bfed41cd784291706db4c/azure_cognitiveservices_speech-1.44.0-py3-none-win32.whl", hash = "sha256:a3fe7fd67ba7db281ae490de3d71b5a22648454ec2630eb6a70797f666330586", size = 2164045, upload-time = "2025-05-19T15:46:22.373Z" }, + { url = "https://files.pythonhosted.org/packages/83/ac/f491487d7d0e25ae2929b4f07e7f9b7456feb38e65b36fb605b2c9685b10/azure_cognitiveservices_speech-1.44.0-py3-none-win_amd64.whl", hash = "sha256:77cfb5dd40733b7ccc21edc427e9fb4720997832ea8a1ba460dc94345f3588ae", size = 2422937, upload-time = "2025-05-19T15:46:23.657Z" }, ] [[package]] @@ -614,15 +617,6 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/7e/83/a753562020b69fa90cebc39e8af2c753b24dcdc74bee8355ee3f6cefdf34/botocore-1.38.27-py3-none-any.whl", hash = "sha256:a785d5e9a5eda88ad6ab9ed8b87d1f2ac409d0226bba6ff801c55359e94d91a8", size = 13580545, upload-time = "2025-05-30T19:32:26.712Z" }, ] -[[package]] -name = 
"cachetools" -version = "6.2.0" -source = { registry = "https://pypi.org/simple" } -sdist = { url = "https://files.pythonhosted.org/packages/9d/61/e4fad8155db4a04bfb4734c7c8ff0882f078f24294d42798b3568eb63bff/cachetools-6.2.0.tar.gz", hash = "sha256:38b328c0889450f05f5e120f56ab68c8abaf424e1275522b138ffc93253f7e32", size = 30988, upload-time = "2025-08-25T18:57:30.924Z" } -wheels = [ - { url = "https://files.pythonhosted.org/packages/6c/56/3124f61d37a7a4e7cc96afc5492c78ba0cb551151e530b54669ddd1436ef/cachetools-6.2.0-py3-none-any.whl", hash = "sha256:1c76a8960c0041fcc21097e357f882197c79da0dbff766e7317890a65d7d8ba6", size = 11276, upload-time = "2025-08-25T18:57:29.684Z" }, -] - [[package]] name = "call-processing" version = "0.1.0" @@ -631,7 +625,7 @@ dependencies = [ { name = "dependency-injector" }, { name = "fastapi" }, { name = "httpx" }, - { name = "pipecat-ai", extra = ["azure", "cartesia", "deepgram", "google", "groq", "runner", "silero", "websocket"] }, + { name = "pipecat-ai", extra = ["azure", "cartesia", "deepgram", "google", "groq", "local-smart-turn-v3", "runner", "silero", "websocket"] }, { name = "pydantic" }, { name = "python-dotenv" }, { name = "python-multipart" }, @@ -646,7 +640,7 @@ requires-dist = [ { name = "dependency-injector", specifier = ">=4.46.0,<5.0.0" }, { name = "fastapi", specifier = ">=0.115.2,<1.0.0" }, { name = "httpx", specifier = ">=0.27.0" }, - { name = "pipecat-ai", extras = ["websocket", "cartesia", "google", "silero", "deepgram", "groq", "runner", "azure"], specifier = "==0.0.97" }, + { name = "pipecat-ai", extras = ["websocket", "cartesia", "google", "silero", "deepgram", "groq", "runner", "azure", "local-smart-turn-v3"], specifier = "==0.0.100" }, { name = "pydantic", specifier = ">=2.0.0" }, { name = "python-dotenv", specifier = ">=1.1.0,<2.0.0" }, { name = "python-multipart", specifier = ">=0.0.9" }, @@ -1692,16 +1686,21 @@ wheels = [ [[package]] name = "google-auth" -version = "2.41.0" +version = "2.48.0" source = { 
registry = "https://pypi.org/simple" } dependencies = [ - { name = "cachetools" }, + { name = "cryptography" }, { name = "pyasn1-modules" }, { name = "rsa" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/07/c5/87742f5b5f055514c67f970f7174a876fccff2289a69d460b0614cc7ccfb/google_auth-2.41.0.tar.gz", hash = "sha256:c9d7b534ea4a5d9813c552846797fafb080312263cd4994d6622dd50992ae101", size = 292282, upload-time = "2025-09-29T21:36:35.791Z" } +sdist = { url = "https://files.pythonhosted.org/packages/0c/41/242044323fbd746615884b1c16639749e73665b718209946ebad7ba8a813/google_auth-2.48.0.tar.gz", hash = "sha256:4f7e706b0cd3208a3d940a19a822c37a476ddba5450156c3e6624a71f7c841ce", size = 326522, upload-time = "2026-01-26T19:22:47.157Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/78/ff/a1c426fc9bea7268230bf92340da7d112fae18cf946cafe13ab17d14e6ee/google_auth-2.41.0-py2.py3-none-any.whl", hash = "sha256:d8bed9b53ab63b7b0374656b8e1bef051f95bb14ecc0cf21ba49de7911d62e09", size = 221168, upload-time = "2025-09-29T21:36:33.925Z" }, + { url = "https://files.pythonhosted.org/packages/83/1d/d6466de3a5249d35e832a52834115ca9d1d0de6abc22065f049707516d47/google_auth-2.48.0-py3-none-any.whl", hash = "sha256:2e2a537873d449434252a9632c28bfc268b0adb1e53f9fb62afc5333a975903f", size = 236499, upload-time = "2026-01-26T19:22:45.099Z" }, +] + +[package.optional-dependencies] +requests = [ + { name = "requests" }, ] [[package]] @@ -1902,21 +1901,23 @@ wheels = [ [[package]] name = "google-genai" -version = "1.47.0" +version = "1.60.0" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "anyio" }, - { name = "google-auth" }, + { name = "distro" }, + { name = "google-auth", extra = ["requests"] }, { name = "httpx" }, { name = "pydantic" }, { name = "requests" }, + { name = "sniffio" }, { name = "tenacity" }, { name = "typing-extensions" }, { name = "websockets" }, ] -sdist = { url = 
"https://files.pythonhosted.org/packages/9f/97/784fba9bc6c41263ff90cb9063eadfdd755dde79cfa5a8d0e397b067dcf9/google_genai-1.47.0.tar.gz", hash = "sha256:ecece00d0a04e6739ea76cc8dad82ec9593d9380aaabef078990e60574e5bf59", size = 241471, upload-time = "2025-10-29T22:01:02.88Z" } +sdist = { url = "https://files.pythonhosted.org/packages/0a/3f/a753be0dcee352b7d63bc6d1ba14a72591d63b6391dac0cdff7ac168c530/google_genai-1.60.0.tar.gz", hash = "sha256:9768061775fddfaecfefb0d6d7a6cabefb3952ebd246cd5f65247151c07d33d1", size = 487721, upload-time = "2026-01-21T22:17:30.398Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/89/ef/e080e8d67c270ea320956bb911a9359664fc46d3b87d1f029decd33e5c4c/google_genai-1.47.0-py3-none-any.whl", hash = "sha256:e3851237556cbdec96007d8028b4b1f2425cdc5c099a8dc36b72a57e42821b60", size = 241506, upload-time = "2025-10-29T22:01:00.982Z" }, + { url = "https://files.pythonhosted.org/packages/31/e5/384b1f383917b5f0ae92e28f47bc27b16e3d26cd9bacb25e9f8ecab3c8fe/google_genai-1.60.0-py3-none-any.whl", hash = "sha256:967338378ffecebec19a8ed90cf8797b26818bacbefd7846a9280beb1099f7f3", size = 719431, upload-time = "2026-01-21T22:17:28.086Z" }, ] [[package]] @@ -3896,7 +3897,7 @@ wheels = [ [[package]] name = "pipecat-ai" -version = "0.0.97" +version = "0.0.100" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "aiofiles" }, @@ -3917,9 +3918,9 @@ dependencies = [ { name = "soxr" }, { name = "wait-for2", marker = "python_full_version < '3.12'" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/8d/23/affeb18644f9aa3d4a5cb36a99160290a27a2a321fc34f53ecef0ebf058a/pipecat_ai-0.0.97.tar.gz", hash = "sha256:71ce27d1b5c9353958e3f4ac0c8ca18b8c00840d0be3f33350bd138d7329c24d", size = 10768149, upload-time = "2025-12-05T23:53:11.077Z" } +sdist = { url = "https://files.pythonhosted.org/packages/53/e9/f377fe49617721d41e3f6d563069849d6d1047c7b4cf4e6dfcb143a868a5/pipecat_ai-0.0.100.tar.gz", hash = 
"sha256:028a6203094f182461f8065a9c5fd92180038826a52ef1322dac2e258190f9f2", size = 10855536, upload-time = "2026-01-21T03:36:59.613Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/a0/9a/e71f7ec77153fafa509928414086310984f2cce39cad17b915974580208b/pipecat_ai-0.0.97-py3-none-any.whl", hash = "sha256:dc637121ed4aece2053194bf0e94be0b984f2bd887f83118092c1fbd223a9374", size = 10448878, upload-time = "2025-12-05T23:53:08.447Z" }, + { url = "https://files.pythonhosted.org/packages/6c/23/4f77c613fdcd5a581a5df0227bfd6e8554151e3f5a423c1e4b2a42b6d3d5/pipecat_ai-0.0.100-py3-none-any.whl", hash = "sha256:1f832329dcb0f5d26f0a478df3a40007b1d3032c9b84f66d3f80225bb59d2f30", size = 10539423, upload-time = "2026-01-21T03:36:56.974Z" }, ] [package.optional-dependencies] @@ -3943,6 +3944,10 @@ google = [ groq = [ { name = "groq" }, ] +local-smart-turn-v3 = [ + { name = "onnxruntime" }, + { name = "transformers" }, +] runner = [ { name = "fastapi" }, { name = "pipecat-ai-small-webrtc-prebuilt" }, @@ -3959,12 +3964,15 @@ websocket = [ [[package]] name = "pipecat-ai-small-webrtc-prebuilt" -version = "2.0.0" +version = "2.0.4" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "fastapi", extra = ["all"] }, ] -sdist = { url = "https://files.pythonhosted.org/packages/9c/14/8484ab387169779d1a6aaba90e133a067f0810ba5c53c9d0e2b6707857a9/pipecat_ai_small_webrtc_prebuilt-2.0.0.tar.gz", hash = "sha256:ad43b0ff1d4afaeae25241b3a8c2c283896f879d1e5d585ed8ed159db123178d", size = 588667, upload-time = "2025-12-01T00:27:43.27Z" } +sdist = { url = "https://files.pythonhosted.org/packages/2e/88/57b26547ec45623718f1d5beb9e004dba55b7ab548a3b48748466e3e1769/pipecat_ai_small_webrtc_prebuilt-2.0.4.tar.gz", hash = "sha256:3c3447679007ea937c760223bb66579f2605cf94628a68c9da1d66787a96caad", size = 584994, upload-time = "2025-12-30T19:14:52.655Z" } +wheels = [ + { url = 
"https://files.pythonhosted.org/packages/9c/6e/332b78d1c7888ff426bd528b150aad0da05024f4f91e56502c359726c07b/pipecat_ai_small_webrtc_prebuilt-2.0.4-py3-none-any.whl", hash = "sha256:054b3cee843fe69191859dbb0693560d9ca08f7d57a9ff0457d0bc741f36f4df", size = 585606, upload-time = "2025-12-30T19:14:50.595Z" }, +] [[package]] name = "platformdirs" From b460cf2c41d7f14c639a9475e79caa8a82267b33 Mon Sep 17 00:00:00 2001 From: rootflo-hardik Date: Tue, 27 Jan 2026 17:17:18 +0530 Subject: [PATCH 05/16] resetting interruption strategies, adding smart turn analyzer - also set stop_secs=0.2 in VADParams as we are using smart turn detection. - removed deprecated interruption strategy setting --- .../controllers/webhook_controller.py | 7 +--- .../services/pipecat_service.py | 42 ++++++++++++++++--- .../call_processing/services/stt_service.py | 10 ++--- .../services/tool_wrapper_service.py | 4 +- 4 files changed, 45 insertions(+), 18 deletions(-) diff --git a/wavefront/server/apps/call_processing/call_processing/controllers/webhook_controller.py b/wavefront/server/apps/call_processing/call_processing/controllers/webhook_controller.py index 1f42e391..fa8f43c4 100644 --- a/wavefront/server/apps/call_processing/call_processing/controllers/webhook_controller.py +++ b/wavefront/server/apps/call_processing/call_processing/controllers/webhook_controller.py @@ -16,8 +16,6 @@ from pipecat.runner.types import WebSocketRunnerArguments from pipecat.runner.utils import parse_telephony_websocket -# from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3 -# from pipecat.audio.turn.smart_turn.base_smart_turn import SmartTurnParams from pipecat.serializers.twilio import TwilioFrameSerializer from pipecat.audio.vad.silero import SileroVADAnalyzer from pipecat.audio.vad.vad_analyzer import VADParams @@ -271,13 +269,12 @@ async def websocket_endpoint( vad_analyzer=SileroVADAnalyzer( params=VADParams( confidence=0.7, # Default is 0.7, can lower to 0.4-0.5 for faster
detection - start_secs=0.15, # Default is 0.2, keep it - stop_secs=0.8, # KEY: Lower from default 0.8 for faster cutoff (should be 0.2 for smart turn detection) + start_secs=0.2, # Default is 0.2, keep it + stop_secs=0.2, # KEY: Lower from default 0.8 for faster cutoff (should be 0.2 for smart turn detection) min_volume=0.6, # Default is 0.6, adjust based on your audio quality ), ), # Voice Activity Detection serializer=serializer, - # turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()), ), ) diff --git a/wavefront/server/apps/call_processing/call_processing/services/pipecat_service.py b/wavefront/server/apps/call_processing/call_processing/services/pipecat_service.py index 574ec3c9..ccf23002 100644 --- a/wavefront/server/apps/call_processing/call_processing/services/pipecat_service.py +++ b/wavefront/server/apps/call_processing/call_processing/services/pipecat_service.py @@ -12,9 +12,7 @@ # Pipecat core imports from pipecat.adapters.schemas.tools_schema import ToolsSchema from pipecat.adapters.schemas.function_schema import FunctionSchema -from pipecat.audio.interruptions.min_words_interruption_strategy import ( - MinWordsInterruptionStrategy, -) +from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3 from pipecat.frames.frames import ( TTSSpeakFrame, EndTaskFrame, @@ -27,6 +25,7 @@ from pipecat.processors.aggregators.llm_context import LLMContext from pipecat.processors.aggregators.llm_response_universal import ( LLMContextAggregatorPair, + LLMUserAggregatorParams, ) from pipecat.processors.frame_processor import FrameProcessor, FrameDirection from pipecat.processors.user_idle_processor import UserIdleProcessor @@ -39,6 +38,19 @@ ServiceSwitcherStrategyManual, ) from pipecat.transports.base_transport import BaseTransport +from pipecat.turns.user_mute import ( + FunctionCallUserMuteStrategy, + # MuteUntilFirstBotCompleteUserMuteStrategy, +) +from pipecat.turns.user_turn_strategies import UserTurnStrategies +from 
pipecat.turns.user_start import ( + VADUserTurnStartStrategy, + MinWordsUserTurnStartStrategy, +) +from pipecat.turns.user_stop import ( + TurnAnalyzerUserTurnStopStrategy, + # TranscriptionUserTurnStopStrategy +) from pipecat.services.llm_service import FunctionCallParams from call_processing.services.stt_service import STTServiceFactory from call_processing.services.tts_service import TTSServiceFactory @@ -333,7 +345,27 @@ async def run_conversation( # Create LLM context and aggregator context = LLMContext(messages, tools=tools_schema) - context_aggregator = LLMContextAggregatorPair(context) + context_aggregator = LLMContextAggregatorPair( + context, + user_params=LLMUserAggregatorParams( + user_turn_strategies=UserTurnStrategies( + start=[ + VADUserTurnStartStrategy(), + MinWordsUserTurnStartStrategy(min_words=3), + ], # List of start strategies + stop=[ + TurnAnalyzerUserTurnStopStrategy( + turn_analyzer=LocalSmartTurnAnalyzerV3() + ), + # TranscriptionUserTurnStopStrategy() # Not needed + ], # List of stop strategies + ), + user_mute_strategies=[ + # MuteUntilFirstBotCompleteUserMuteStrategy(), # Not needed since first message is pre-recorded audio + FunctionCallUserMuteStrategy(), + ], + ), + ) # Create transcript processor for language detection transcript = TranscriptProcessor() @@ -372,8 +404,6 @@ async def run_conversation( audio_out_sample_rate=8000, enable_metrics=True, # enable_usage_metrics=True, - allow_interruptions=True, - interruption_strategies=[MinWordsInterruptionStrategy(min_words=3)], # report_only_initial_ttfb=True ), idle_timeout_secs=20, # Safety net - allows UserIdleProcessor to complete 3 retries (4s each = 12s total) diff --git a/wavefront/server/apps/call_processing/call_processing/services/stt_service.py b/wavefront/server/apps/call_processing/call_processing/services/stt_service.py index 964520e0..117b2ede 100644 --- a/wavefront/server/apps/call_processing/call_processing/services/stt_service.py +++ 
b/wavefront/server/apps/call_processing/call_processing/services/stt_service.py @@ -75,8 +75,8 @@ def _create_deepgram_stt(api_key: str, parameters: Dict[str, Any]): options_dict['encoding'] = parameters['encoding'] if 'sample_rate' in parameters: options_dict['sample_rate'] = parameters['sample_rate'] - if 'endpointing' in parameters: - options_dict['endpointing'] = parameters['endpointing'] + # if 'endpointing' in parameters: # using pipecat VAD + smart turn detection + # options_dict['endpointing'] = parameters['endpointing'] if 'channels' in parameters: options_dict['channels'] = parameters['channels'] if 'smart_format' in parameters: @@ -85,14 +85,14 @@ def _create_deepgram_stt(api_key: str, parameters: Dict[str, Any]): options_dict['punctuate'] = parameters['punctuate'] if 'profanity_filter' in parameters: options_dict['profanity_filter'] = parameters['profanity_filter'] - if 'vad_events' in parameters: - options_dict['vad_events'] = parameters['vad_events'] + # if 'vad_events' in parameters: # depreceated in pipecat 0.99+ + # options_dict['vad_events'] = parameters['vad_events'] # Set smart defaults if not provided options_dict.setdefault( 'interim_results', True ) # Always enable for faster feedback - options_dict.setdefault('endpointing', 300) # 300ms = faster cutoff + # options_dict.setdefault('endpointing', 300) # 300ms = faster cutoff options_dict.setdefault('encoding', 'linear16') options_dict.setdefault('sample_rate', 8000) options_dict.setdefault('model', 'nova-2') diff --git a/wavefront/server/apps/call_processing/call_processing/services/tool_wrapper_service.py b/wavefront/server/apps/call_processing/call_processing/services/tool_wrapper_service.py index d932a965..06dd5422 100644 --- a/wavefront/server/apps/call_processing/call_processing/services/tool_wrapper_service.py +++ b/wavefront/server/apps/call_processing/call_processing/services/tool_wrapper_service.py @@ -23,7 +23,7 @@ def __init__( headers: Optional[Dict[str, str]] = None, auth_type: 
Optional[str] = None, auth_credentials: Optional[Dict[str, str]] = None, - timeout: int = 30, + timeout: int = 6, ): """ Initialize API tool wrapper @@ -197,7 +197,7 @@ def create_wrapper(tool: Dict[str, Any]): headers=config.get('headers', {}), auth_type=config.get('auth_type', 'none'), auth_credentials=config.get('auth_credentials', {}), - timeout=config.get('timeout', 30), + timeout=config.get('timeout', 6), ) elif tool_type == 'python': return PythonToolWrapper(config) From a27956a62eb2277b971178a059f0d4f044d67a42 Mon Sep 17 00:00:00 2001 From: vishnu r kumar Date: Thu, 29 Jan 2026 12:09:43 +0530 Subject: [PATCH 06/16] fix: generating gcs signed urls using workload identity credentials --- .../packages/flo_cloud/flo_cloud/gcp/gcs.py | 30 ++++++++++++++++++- 1 file changed, 29 insertions(+), 1 deletion(-) diff --git a/wavefront/server/packages/flo_cloud/flo_cloud/gcp/gcs.py b/wavefront/server/packages/flo_cloud/flo_cloud/gcp/gcs.py index 217142e2..f131578a 100644 --- a/wavefront/server/packages/flo_cloud/flo_cloud/gcp/gcs.py +++ b/wavefront/server/packages/flo_cloud/flo_cloud/gcp/gcs.py @@ -8,6 +8,9 @@ from ..exceptions import CloudStorageFileNotFoundError import re from re import Match +from google.auth import iam +from google.auth.transport import requests as google_requests +import google.auth class GCSStorage(CloudStorageHandler): @@ -20,10 +23,32 @@ def __init__(self, credentials_path: Optional[str] = None): Args: credentials_path: Path to GCP credentials JSON file (optional) """ + self.signing_credentials = None if credentials_path: self.client = storage.Client.from_service_account_json(credentials_path) else: - self.client = storage.Client() + self.credentials, self.project_id = google.auth.default() + + if hasattr(self.credentials, 'service_account_email'): + self.service_account_email = self.credentials.service_account_email + else: + # Fallback: get from metadata service or environment + import requests + + metadata_url = 
'http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/default/email' + headers = {'Metadata-Flavor': 'Google'} + response = requests.get(metadata_url, headers=headers) + self.service_account_email = response.text + + self.client = storage.Client( + credentials=self.credentials, project=self.project_id + ) + + self.signing_credentials = iam.Signer( + request=google_requests.Request(), + credentials=self.credentials, + service_account_email=self.service_account_email, + ) def get_file(self, bucket_name: str, file_path: str) -> bytes: """ @@ -143,6 +168,9 @@ def generate_presigned_url( version='v4', expiration=datetime.now(UTC) + timedelta(seconds=expiresIn), method=type, + credentials=self.signing_credentials + if self.signing_credentials + else None, ) return presigned_url except Exception as e: From 95e7a11e0664df439f4d6e858095b84d2802f331 Mon Sep 17 00:00:00 2001 From: vishnu r kumar Date: Thu, 29 Jan 2026 13:18:33 +0530 Subject: [PATCH 07/16] fix: add email & token flow in gcs presigned url --- .../packages/flo_cloud/flo_cloud/gcp/gcs.py | 44 +++++++++---------- 1 file changed, 20 insertions(+), 24 deletions(-) diff --git a/wavefront/server/packages/flo_cloud/flo_cloud/gcp/gcs.py b/wavefront/server/packages/flo_cloud/flo_cloud/gcp/gcs.py index f131578a..968623d3 100644 --- a/wavefront/server/packages/flo_cloud/flo_cloud/gcp/gcs.py +++ b/wavefront/server/packages/flo_cloud/flo_cloud/gcp/gcs.py @@ -8,9 +8,8 @@ from ..exceptions import CloudStorageFileNotFoundError import re from re import Match -from google.auth import iam -from google.auth.transport import requests as google_requests import google.auth +import requests class GCSStorage(CloudStorageHandler): @@ -24,31 +23,14 @@ def __init__(self, credentials_path: Optional[str] = None): credentials_path: Path to GCP credentials JSON file (optional) """ self.signing_credentials = None + self.credential_path = credentials_path + if credentials_path: self.client = 
storage.Client.from_service_account_json(credentials_path) else: self.credentials, self.project_id = google.auth.default() - if hasattr(self.credentials, 'service_account_email'): - self.service_account_email = self.credentials.service_account_email - else: - # Fallback: get from metadata service or environment - import requests - - metadata_url = 'http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/default/email' - headers = {'Metadata-Flavor': 'Google'} - response = requests.get(metadata_url, headers=headers) - self.service_account_email = response.text - - self.client = storage.Client( - credentials=self.credentials, project=self.project_id - ) - - self.signing_credentials = iam.Signer( - request=google_requests.Request(), - credentials=self.credentials, - service_account_email=self.service_account_email, - ) + self.client = storage.Client() def get_file(self, bucket_name: str, file_path: str) -> bytes: """ @@ -162,15 +144,29 @@ def generate_presigned_url( if not type: raise ValueError('type cannot be None or empty') + service_account_email = None + token = None + if self.credential_path is None: + r = requests.Request() + self.credentials.refresh(r) + + if hasattr(self.credentials, 'service_account_email'): + service_account_email = self.credentials.service_account_email + print(f'service_account_email: {service_account_email}') + if hasattr(self.credentials, 'token'): + token = self.credentials.token + print(f'token: {token}') + bucket = self.client.bucket(bucket_name) blob = bucket.blob(key) presigned_url = blob.generate_signed_url( version='v4', expiration=datetime.now(UTC) + timedelta(seconds=expiresIn), method=type, - credentials=self.signing_credentials - if self.signing_credentials + service_account_email=service_account_email + if service_account_email else None, + token=token if token else None, ) return presigned_url except Exception as e: From 1e6741fb9475d7a68fa14ddb83122ba263cc8ec0 Mon Sep 17 00:00:00 2001 From: vishnu r 
kumar Date: Thu, 29 Jan 2026 13:30:38 +0530 Subject: [PATCH 08/16] fix: change request type --- wavefront/server/packages/flo_cloud/flo_cloud/gcp/gcs.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/wavefront/server/packages/flo_cloud/flo_cloud/gcp/gcs.py b/wavefront/server/packages/flo_cloud/flo_cloud/gcp/gcs.py index 968623d3..5c8c2f63 100644 --- a/wavefront/server/packages/flo_cloud/flo_cloud/gcp/gcs.py +++ b/wavefront/server/packages/flo_cloud/flo_cloud/gcp/gcs.py @@ -8,8 +8,8 @@ from ..exceptions import CloudStorageFileNotFoundError import re from re import Match +from google.auth.transport import requests as google_requests import google.auth -import requests class GCSStorage(CloudStorageHandler): @@ -147,7 +147,7 @@ def generate_presigned_url( service_account_email = None token = None if self.credential_path is None: - r = requests.Request() + r = google_requests.Request() self.credentials.refresh(r) if hasattr(self.credentials, 'service_account_email'): From 8d35f5a56d51c4f4696c9e43dbc8d5d804ead84e Mon Sep 17 00:00:00 2001 From: vishnu r kumar Date: Thu, 29 Jan 2026 13:43:12 +0530 Subject: [PATCH 09/16] fix: change keyword argument to access_token --- wavefront/server/packages/flo_cloud/flo_cloud/gcp/gcs.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/wavefront/server/packages/flo_cloud/flo_cloud/gcp/gcs.py b/wavefront/server/packages/flo_cloud/flo_cloud/gcp/gcs.py index 5c8c2f63..d94672f3 100644 --- a/wavefront/server/packages/flo_cloud/flo_cloud/gcp/gcs.py +++ b/wavefront/server/packages/flo_cloud/flo_cloud/gcp/gcs.py @@ -166,7 +166,7 @@ def generate_presigned_url( service_account_email=service_account_email if service_account_email else None, - token=token if token else None, + access_token=token if token else None, ) return presigned_url except Exception as e: From 0a2e56758c1a9740a5e6d819ae7440feb72bf959 Mon Sep 17 00:00:00 2001 From: rootflo-hardik Date: Mon, 2 Feb 2026 17:12:04 +0530 Subject: [PATCH 10/16] 
language and endconversation detection tool (#210) * added language and env_conversation detection tool * lang detection -> parallel pipeline instead of service switcher * little prompt fix for tool --- .../services/conversation_completion_tool.py | 88 +++ .../services/language_detection_tool.py | 199 +++++++ .../services/pipecat_service.py | 523 +++++++++--------- 3 files changed, 534 insertions(+), 276 deletions(-) create mode 100644 wavefront/server/apps/call_processing/call_processing/services/conversation_completion_tool.py create mode 100644 wavefront/server/apps/call_processing/call_processing/services/language_detection_tool.py diff --git a/wavefront/server/apps/call_processing/call_processing/services/conversation_completion_tool.py b/wavefront/server/apps/call_processing/call_processing/services/conversation_completion_tool.py new file mode 100644 index 00000000..c7f393b8 --- /dev/null +++ b/wavefront/server/apps/call_processing/call_processing/services/conversation_completion_tool.py @@ -0,0 +1,88 @@ +""" +Conversation Completion Tool for Voice Agents + +Provides LLM-callable conversation ending capabilities +""" + +from typing import Dict, Any, Callable +from pipecat.services.llm_service import FunctionCallParams +from pipecat.frames.frames import TTSSpeakFrame, EndTaskFrame +from pipecat.processors.frame_processor import FrameDirection +from call_processing.log.logger import logger + + +class ConversationCompletionToolFactory: + """Factory for creating conversation completion tool with runtime context""" + + @staticmethod + def create_conversation_completion_tool( + task_container: Dict[str, Any], + ) -> Callable: + """ + Create conversation completion tool function with captured context + + Args: + task_container: Dictionary containing PipelineTask (populated after task creation) + Format: {'task': PipelineTask | None} + + Returns: + Async function compatible with Pipecat's function calling + """ + + async def end_conversation(params: 
FunctionCallParams): + """ + LLM-callable function to end the conversation gracefully + + This function is called by the LLM when it determines the user + wants to end the conversation. It sends a farewell message and + terminates the pipeline. + + Parameters (from LLM): + farewell_message: str - Optional custom farewell message + (defaults to standard goodbye) + """ + try: + # Get task from container + task = task_container.get('task') + if not task: + error_msg = ( + 'Pipeline task not initialized in conversation completion tool' + ) + logger.error(error_msg) + await params.result_callback({'success': False, 'error': error_msg}) + return + + # Extract parameters + arguments = params.arguments + farewell_message = arguments.get( + 'farewell_message', 'Thank you for using our service! Goodbye!' + ) + + logger.info( + f'Conversation completion tool called - Farewell: "{farewell_message}"' + ) + + # Send farewell message via TTS + await params.llm.push_frame(TTSSpeakFrame(farewell_message)) + + # End the conversation + await params.llm.push_frame(EndTaskFrame(), FrameDirection.UPSTREAM) + + logger.info('Conversation ended by LLM decision') + + # Return success result + await params.result_callback( + { + 'success': True, + 'status': 'complete', + 'farewell_sent': True, + 'farewell_message': farewell_message, + } + ) + + except Exception as e: + error_msg = f'Error ending conversation: {str(e)}' + logger.error(error_msg, exc_info=True) + await params.result_callback({'success': False, 'error': error_msg}) + + return end_conversation diff --git a/wavefront/server/apps/call_processing/call_processing/services/language_detection_tool.py b/wavefront/server/apps/call_processing/call_processing/services/language_detection_tool.py new file mode 100644 index 00000000..87b5b1e7 --- /dev/null +++ b/wavefront/server/apps/call_processing/call_processing/services/language_detection_tool.py @@ -0,0 +1,199 @@ +""" +Language Detection Tool for Multi-Language Voice Agents + +Provides 
LLM-callable language detection and switching capabilities +""" + +from typing import Dict, Any, List, Callable +from pipecat.services.llm_service import FunctionCallParams +from pipecat.frames.frames import LLMMessagesUpdateFrame +from call_processing.log.logger import logger +from call_processing.constants.language_config import LANGUAGE_INSTRUCTIONS + + +class LanguageDetectionToolFactory: + """Factory for creating language detection tool with runtime context""" + + @staticmethod + def create_language_detection_tool( + task_container: Dict[str, Any], + language_switcher: Any, + stt_language_switcher: Any, + context_container: Dict[str, Any], + supported_languages: List[str], + default_language: str, + language_state: Dict[str, Any], + ) -> Callable: + """ + Create language detection tool function with captured context + + Args: + task_container: Dictionary containing PipelineTask (populated after task creation) + Format: {'task': PipelineTask | None} + language_switcher: LanguageSwitcher instance that manages TTS routing + stt_language_switcher: STTLanguageSwitcher instance that manages STT routing + context_container: Dictionary containing LLMContext (populated after context creation) + Format: {'context': LLMContext | None} + supported_languages: List of supported language codes + default_language: Default language code + language_state: Dictionary to track current language and switch count + Format: {'current_language': str, 'switch_count': int, 'original_system_prompt': str} + + Returns: + Async function compatible with Pipecat's function calling + """ + + async def detect_and_switch_language(params: FunctionCallParams): + """ + LLM-callable function to detect and switch conversation language + + This function is called by the LLM when it determines the user + wants to switch to a different language. It validates the request, + performs the service switch, and updates the system prompt. 
+ + Parameters (from LLM): + target_language: str - Language code to switch to (e.g., 'es', 'hi', 'en') + user_intent: str - User's stated language preference (for logging) + """ + try: + # Get task and context from containers + task = task_container.get('task') + if not task: + error_msg = ( + 'Pipeline task not initialized in language detection tool' + ) + logger.error(error_msg) + await params.result_callback({'success': False, 'error': error_msg}) + return + + context = context_container.get('context') + if not context: + error_msg = 'LLM context not initialized in language detection tool' + logger.error(error_msg) + await params.result_callback({'success': False, 'error': error_msg}) + return + + # Extract parameters + arguments = params.arguments + target_language = arguments.get('target_language', '').lower() + user_intent = arguments.get('user_intent', 'Unknown') + + current_language = language_state.get( + 'current_language', default_language + ) + switch_count = language_state.get('switch_count', 0) + + logger.info( + f'Language detection tool called - Target: {target_language}, ' + f'Current: {current_language}, User intent: {user_intent}' + ) + + # Validation 1: Check if target language is supported + if target_language not in supported_languages: + error_msg = ( + f"Language '{target_language}' is not supported. 
" + f"Supported languages: {', '.join(supported_languages)}" + ) + logger.warning(error_msg) + await params.result_callback( + { + 'success': False, + 'error': error_msg, + 'current_language': current_language, + 'supported_languages': supported_languages, + } + ) + return + + # Validation 2: Check if already in target language + if target_language == current_language: + logger.info(f'Already using language: {target_language}') + await params.result_callback( + { + 'success': True, + 'message': f'Already using {target_language}', + 'current_language': current_language, + 'switch_performed': False, + } + ) + return + + # Perform language switch + try: + # Update TTS language switcher state + language_switcher.set_language(target_language) + + # Update STT language switcher state + stt_language_switcher.set_language(target_language) + + logger.info( + f'Switched TTS and STT language from {current_language} to {target_language}' + ) + + # Update system prompt with language instruction + language_instruction = LANGUAGE_INSTRUCTIONS.get( + target_language, + LANGUAGE_INSTRUCTIONS.get('en', 'Respond in English.'), + ) + + # Get base prompt without language instruction (must exist for multi-language) + base_prompt = language_state.get('original_system_prompt') + if not base_prompt: + error_msg = 'Original system prompt not found in language state' + logger.error(error_msg) + await params.result_callback( + {'success': False, 'error': error_msg} + ) + return + + # Append new language instruction to clean base prompt + updated_content = f'{base_prompt}\n\n{language_instruction}' + updated_system_message = { + 'role': 'system', + 'content': updated_content, + } + + # Update context + current_messages = context.get_messages() + new_messages = [updated_system_message] + current_messages[1:] + await task.queue_frame( + LLMMessagesUpdateFrame(new_messages, run_llm=False) + ) + + logger.info( + f'Updated system prompt with {target_language} instruction' + ) + + # Update state + 
language_state['current_language'] = target_language + language_state['switch_count'] = switch_count + 1 + + # Return success result + await params.result_callback( + { + 'success': True, + 'message': f'Language switched to {target_language}', + 'previous_language': current_language, + 'current_language': target_language, + 'switch_performed': True, + 'switch_count': language_state['switch_count'], + } + ) + + except Exception as e: + error_msg = f'Error switching services: {str(e)}' + logger.error(error_msg, exc_info=True) + await params.result_callback( + { + 'success': False, + 'error': error_msg, + 'current_language': current_language, + } + ) + + except Exception as e: + error_msg = f'Error in language detection tool: {str(e)}' + logger.error(error_msg, exc_info=True) + await params.result_callback({'success': False, 'error': error_msg}) + + return detect_and_switch_language diff --git a/wavefront/server/apps/call_processing/call_processing/services/pipecat_service.py b/wavefront/server/apps/call_processing/call_processing/services/pipecat_service.py index ccf23002..4db6a46e 100644 --- a/wavefront/server/apps/call_processing/call_processing/services/pipecat_service.py +++ b/wavefront/server/apps/call_processing/call_processing/services/pipecat_service.py @@ -13,12 +13,8 @@ from pipecat.adapters.schemas.tools_schema import ToolsSchema from pipecat.adapters.schemas.function_schema import FunctionSchema from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3 -from pipecat.frames.frames import ( - TTSSpeakFrame, - EndTaskFrame, - ManuallySwitchServiceFrame, - LLMMessagesUpdateFrame, -) +from pipecat.frames.frames import Frame +from pipecat.pipeline.parallel_pipeline import ParallelPipeline from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.runner import PipelineRunner from pipecat.pipeline.task import PipelineParams, PipelineTask @@ -27,16 +23,15 @@ LLMContextAggregatorPair, LLMUserAggregatorParams, ) -from 
pipecat.processors.frame_processor import FrameProcessor, FrameDirection -from pipecat.processors.user_idle_processor import UserIdleProcessor +from pipecat.processors.filters.function_filter import FunctionFilter from pipecat.processors.transcript_processor import ( TranscriptProcessor, - TranscriptionMessage, -) -from pipecat.pipeline.service_switcher import ( - ServiceSwitcher, - ServiceSwitcherStrategyManual, ) + +# from pipecat.pipeline.service_switcher import ( +# ServiceSwitcher, +# ServiceSwitcherStrategyManual, +# ) from pipecat.transports.base_transport import BaseTransport from pipecat.turns.user_mute import ( FunctionCallUserMuteStrategy, @@ -45,105 +40,114 @@ from pipecat.turns.user_turn_strategies import UserTurnStrategies from pipecat.turns.user_start import ( VADUserTurnStartStrategy, - MinWordsUserTurnStartStrategy, + # MinWordsUserTurnStartStrategy, ) from pipecat.turns.user_stop import ( TurnAnalyzerUserTurnStopStrategy, # TranscriptionUserTurnStopStrategy ) -from pipecat.services.llm_service import FunctionCallParams from call_processing.services.stt_service import STTServiceFactory from call_processing.services.tts_service import TTSServiceFactory from call_processing.services.llm_service import LLMServiceFactory +from call_processing.services.conversation_completion_tool import ( + ConversationCompletionToolFactory, +) from call_processing.constants.language_config import ( - LANGUAGE_KEYWORDS, LANGUAGE_INSTRUCTIONS, ) -# Advanced handler with retry logic -async def handle_user_idle(processor: FrameProcessor, retry_count): - if retry_count == 1: - # First attempt - gentle reminder - await processor.push_frame(TTSSpeakFrame('Are you still there?')) - return True # Continue monitoring - elif retry_count == 2: - # Second attempt - more direct prompt - await processor.push_frame( - TTSSpeakFrame('Would you like to continue our conversation?') - ) - return True # Continue monitoring - else: - # Third attempt - end conversation - await 
processor.push_frame( - TTSSpeakFrame("I'll leave you for now. Have a nice day!") - ) - await processor.push_frame(EndTaskFrame(), FrameDirection.UPSTREAM) - return False # Stop monitoring - - -async def evaluate_completion_criteria(params: FunctionCallParams): +class STTLanguageSwitcher(ParallelPipeline): """ - Check if the last user message contains goodbye-related phrases. - Returns True if goodbye detected, False otherwise. + ParallelPipeline that routes STT to different language-specific services + based on current language state. Same pattern as LanguageSwitcher for TTS. """ - context = params.context - - # Get the conversation messages - messages = context.get_messages() - - # Find the last user message - last_user_message = None - for message in reversed(messages): - if message.get('role') == 'user': - last_user_message = message.get('content', '').lower() - break - - # If no user message found, conversation is not complete - if not last_user_message: - return False - - # List of goodbye phrases to check - goodbye_phrases = [ - 'goodbye', - 'bye', - 'good bye', - 'see you', - 'talk to you later', - 'ttyl', - 'have a good day', - 'take care', - 'farewell', - 'later', - 'peace out', - ] - - # Check if any goodbye phrase is in the message - return any(phrase in last_user_message for phrase in goodbye_phrases) - - -async def check_conversation_complete(params: FunctionCallParams): + + def __init__( + self, + stt_services: Dict[str, Any], + supported_languages: List[str], + default_language: str, + ): + self._current_language = default_language + self._stt_services = stt_services + self._supported_languages = supported_languages + + # Build parallel routes: one per language + routes = [] + for lang_code in supported_languages: + filter_func = self._create_language_filter(lang_code) + stt_service = stt_services[lang_code] + routes.append([FunctionFilter(filter_func), stt_service]) + + super().__init__(*routes) + + def _create_language_filter(self, lang_code: 
str): + """Create filter function for specific language""" + + async def language_filter(_: Frame) -> bool: + return self._current_language == lang_code + + return language_filter + + @property + def current_language(self): + return self._current_language + + def set_language(self, language_code: str): + """Update current language (called by language detection tool)""" + if language_code in self._supported_languages: + self._current_language = language_code + logger.info(f'STTLanguageSwitcher: Language set to {language_code}') + else: + logger.warning(f'STTLanguageSwitcher: Invalid language {language_code}') + + +class LanguageSwitcher(ParallelPipeline): """ - Function to check if conversation should end based on goodbye detection. + ParallelPipeline that routes TTS to different language-specific services + based on current language state. """ - # Check if goodbye is present - conversation_complete = await evaluate_completion_criteria(params) - if conversation_complete: - # Send farewell message - await params.llm.push_frame( - TTSSpeakFrame('Thank you for using our service! 
Goodbye!') - ) - # End the conversation - await params.llm.push_frame(EndTaskFrame(), FrameDirection.UPSTREAM) - - # Return result to LLM - await params.result_callback( - { - 'status': 'complete' if conversation_complete else 'continuing', - 'goodbye_detected': conversation_complete, - } - ) + def __init__( + self, + tts_services: Dict[str, Any], + supported_languages: List[str], + default_language: str, + ): + self._current_language = default_language + self._tts_services = tts_services + self._supported_languages = supported_languages + + # Build parallel routes: one per language + # Each route: [FunctionFilter, TTS service] + routes = [] + for lang_code in supported_languages: + filter_func = self._create_language_filter(lang_code) + tts_service = tts_services[lang_code] + routes.append([FunctionFilter(filter_func), tts_service]) + + super().__init__(*routes) + + def _create_language_filter(self, lang_code: str): + """Create filter function for specific language""" + + async def language_filter(_: Frame) -> bool: + return self._current_language == lang_code + + return language_filter + + @property + def current_language(self): + return self._current_language + + def set_language(self, language_code: str): + """Update current language (called by language detection tool)""" + if language_code in self._supported_languages: + self._current_language = language_code + logger.info(f'LanguageSwitcher: Language set to {language_code}') + else: + logger.warning(f'LanguageSwitcher: Invalid language {language_code}') class PipecatService: @@ -189,6 +193,13 @@ async def run_conversation( f'default: {default_language}, multi-language: {is_multi_language}' ) + # Track language state for multi-language conversations + language_state = { + 'current_language': default_language, + 'switch_count': 0, + 'original_system_prompt': '', + } + # Create LLM service (language-agnostic) llm = LLMServiceFactory.create_llm_service(llm_config) @@ -210,8 +221,7 @@ async def run_conversation( 
'parameters': stt_parameters or {}, } - # Create STT/TTS services with multi-language support if needed - stt_services = {} + # Create TTS services (one per language for multi-language mode) tts_services = {} if is_multi_language: @@ -219,7 +229,7 @@ async def run_conversation( f'Multi-language mode enabled for languages: {supported_languages}' ) - # Create STT/TTS services for each supported language + # Create TTS services for each supported language for lang_code in supported_languages: # Get voice ID for this language voice_id_for_lang = tts_voice_ids_dict.get(lang_code) @@ -229,59 +239,57 @@ async def run_conversation( ) voice_id_for_lang = default_voice_id - # Deep clone configs to avoid mutating original configs - stt_config_lang = deepcopy(stt_config_with_params) + # Deep clone config to avoid mutating original tts_config_lang = deepcopy(tts_config_with_params) - # Update language in parameters - if 'parameters' not in stt_config_lang: - stt_config_lang['parameters'] = {} - stt_config_lang['parameters']['language'] = lang_code - + # Update language parameters if 'parameters' not in tts_config_lang: tts_config_lang['parameters'] = {} tts_config_lang['parameters']['language'] = lang_code - - # Set language-specific voice ID tts_config_lang['voice_id'] = voice_id_for_lang - # Create services - stt_services[lang_code] = STTServiceFactory.create_stt_service( - stt_config_lang - ) + # Create TTS service tts_services[lang_code] = TTSServiceFactory.create_tts_service( tts_config_lang ) - logger.info( - f'Created STT/TTS services for language: {lang_code} ' + f'Created TTS service for language: {lang_code} ' f'with voice: {voice_id_for_lang}' ) - # Create service switchers with manual strategy - # Order services list with default language first (ServiceSwitcher uses first as initial) - stt_services_list = [] - tts_services_list = [] + # Create per-language STT services (same pattern as TTS) + stt_services = {} + for lang_code in supported_languages: + 
stt_config_lang = deepcopy(stt_config_with_params) + if 'parameters' not in stt_config_lang: + stt_config_lang['parameters'] = {} + stt_config_lang['parameters']['language'] = lang_code - # Add default language service first - if default_language in stt_services: - stt_services_list.append(stt_services[default_language]) - tts_services_list.append(tts_services[default_language]) + stt_services[lang_code] = STTServiceFactory.create_stt_service( + stt_config_lang + ) + logger.info(f'Created STT service for language: {lang_code}') - # Add remaining services - for lang_code in supported_languages: - if lang_code != default_language: - stt_services_list.append(stt_services[lang_code]) - tts_services_list.append(tts_services[lang_code]) + # Create STTLanguageSwitcher for STT routing + stt = STTLanguageSwitcher( + stt_services=stt_services, + supported_languages=supported_languages, + default_language=default_language, + ) + logger.info( + f'Initialized STTLanguageSwitcher with default language: {default_language}' + ) - stt = ServiceSwitcher( - services=stt_services_list, strategy_type=ServiceSwitcherStrategyManual + # Create LanguageSwitcher for TTS routing + tts = LanguageSwitcher( + tts_services=tts_services, + supported_languages=supported_languages, + default_language=default_language, ) - tts = ServiceSwitcher( - services=tts_services_list, strategy_type=ServiceSwitcherStrategyManual + logger.info( + f'Initialized LanguageSwitcher with default language: {default_language}' ) - logger.info(f'Initialized with default language: {default_language}') else: logger.info('Single language mode - no language detection needed') @@ -290,11 +298,26 @@ async def run_conversation( tts = TTSServiceFactory.create_tts_service(tts_config_with_params) # Create initial messages with system prompt + base_system_prompt = ( + f'Customer phone number: {customer_number}\n' + + agent_config['system_prompt'] + ) + + # Add language instruction for default language if multi-language + if 
is_multi_language: + initial_language_instruction = LANGUAGE_INSTRUCTIONS.get( + default_language, LANGUAGE_INSTRUCTIONS.get('en', 'Respond in English.') + ) + system_content = f'{base_system_prompt}\n\n{initial_language_instruction}' + # Store base prompt without language instruction for switching + language_state['original_system_prompt'] = base_system_prompt + else: + system_content = base_system_prompt + messages = [ { 'role': 'system', - 'content': f'Customer phone number: {customer_number}\n' - + agent_config['system_prompt'], + 'content': system_content, } ] @@ -326,32 +349,110 @@ async def run_conversation( else: logger.info(f'No tools configured for agent {agent_id}') - # Register built-in function handler with LLM service - llm.register_function( - 'check_conversation_complete', check_conversation_complete - ) + # Create containers for late binding (populated after creation) + task_container = {'task': None} + context_container = {'context': None} - # Create FunctionSchema for check_conversation_complete - check_complete_schema = FunctionSchema( - name='check_conversation_complete', - description='Check if conversation should end based on goodbye detection', - properties={}, # No parameters needed + # Register language detection tool if multi-language enabled + if is_multi_language: + from call_processing.services.language_detection_tool import ( + LanguageDetectionToolFactory, + ) + + language_detection_func = ( + LanguageDetectionToolFactory.create_language_detection_tool( + task_container=task_container, + language_switcher=tts, # Pass the TTS LanguageSwitcher instance + stt_language_switcher=stt, # Pass the STT LanguageSwitcher instance + context_container=context_container, + supported_languages=supported_languages, + default_language=default_language, + language_state=language_state, + ) + ) + + llm.register_function('detect_and_switch_language', language_detection_func) + logger.info('Registered language detection tool with LLM') + + # Register 
conversation completion tool + conversation_completion_func = ( + ConversationCompletionToolFactory.create_conversation_completion_tool( + task_container=task_container + ) + ) + llm.register_function('end_conversation', conversation_completion_func) + logger.info('Registered conversation completion tool with LLM') + + # Create FunctionSchema for conversation completion + end_conversation_schema = FunctionSchema( + name='end_conversation', + description=( + 'Call this function when the user indicates they want to end the conversation. ' + 'This includes goodbye phrases, expressions of completion, or any indication ' + 'that the user wants to hang up or finish the call. Examples: "goodbye", "bye", ' + '"thank you", "that\'s all", "I\'m done", etc.' + ), + properties={ + 'farewell_message': { + 'type': 'string', + 'description': ( + 'Optional custom farewell message to say to the user before ending. ' + 'If not provided, uses default: "Thank you for using our service! Goodbye!"' + ), + } + }, required=[], ) + # Create FunctionSchema for language detection (if multi-language) + language_detection_schemas = [] + if is_multi_language: + language_detection_schema = FunctionSchema( + name='detect_and_switch_language', + description=( + f"Detect and switch the conversation language. Call this whenever the user " + f"indicates a language preference, including: responding with a language name " + f"(e.g., 'Hindi', 'Spanish', 'English'), requesting a switch (e.g., 'switch to Hindi', " + f"'I want to speak in Spanish'), or selecting a language when asked for their preference. " + f"Even a single word like 'Hindi' or 'Spanish' should trigger this tool if it refers to a language choice. " + f"Supported languages: {', '.join(supported_languages)}. " + f"Current language: {language_state['current_language']}." + ), + properties={ + 'target_language': { + 'type': 'string', + 'description': f"Target language code. 
Must be one of: {', '.join(supported_languages)}", + 'enum': supported_languages, + }, + 'user_intent': { + 'type': 'string', + 'description': "The user's statement indicating language preference (for logging)", + }, + }, + required=['target_language', 'user_intent'], + ) + language_detection_schemas.append(language_detection_schema) + # Combine all FunctionSchema objects for ToolsSchema - all_function_schemas = [check_complete_schema] + function_schemas + all_function_schemas = ( + [end_conversation_schema] + language_detection_schemas + function_schemas + ) tools_schema = ToolsSchema(standard_tools=all_function_schemas) # Create LLM context and aggregator context = LLMContext(messages, tools=tools_schema) + + # Populate context container for language detection tool (if multi-language) + if is_multi_language: + context_container['context'] = context + context_aggregator = LLMContextAggregatorPair( context, user_params=LLMUserAggregatorParams( user_turn_strategies=UserTurnStrategies( start=[ VADUserTurnStartStrategy(), - MinWordsUserTurnStartStrategy(min_words=3), + # MinWordsUserTurnStartStrategy(min_words=3), ], # List of start strategies stop=[ TurnAnalyzerUserTurnStopStrategy( @@ -370,21 +471,11 @@ async def run_conversation( # Create transcript processor for language detection transcript = TranscriptProcessor() - # Track current language detection state (only for multi-language) - language_detected = {'detected': False, 'current_language': default_language} - - # Create user idle processor (fresh instance for each conversation) - user_idle = UserIdleProcessor( - callback=handle_user_idle, - timeout=4.0, - ) - # Build pipeline components list pipeline_components = [ transport.input(), # Audio input from Twilio stt, # Speech-to-Text (ServiceSwitcher for multi-lang, direct for single) transcript.user(), # Transcript processor for user messages - user_idle, # User idle detection context_aggregator.user(), # Add user message to context llm, # LLM processing tts, # 
Text-to-Speech (ServiceSwitcher for multi-lang, direct for single) @@ -406,132 +497,12 @@ async def run_conversation( # enable_usage_metrics=True, # report_only_initial_ttfb=True ), - idle_timeout_secs=20, # Safety net - allows UserIdleProcessor to complete 3 retries (4s each = 12s total) + idle_timeout_secs=20, ) - # Multi-language detection event handler + # Populate task container for language detection tool (if multi-language) if is_multi_language: - - @transcript.event_handler('on_transcript_update') - async def handle_language_detection(processor, frame): - """Detect language from first user message and switch services""" - - # Only detect once - if language_detected['detected']: - return - - messages: List[TranscriptionMessage] = frame.messages - - # Look for user messages - for message in messages: - if message.role == 'user': - message_content = message.content.lower().strip() - - # Skip empty messages - if not message_content: - continue - - logger.info( - f"Analyzing message for language detection: '{message_content}'" - ) - - # Check for language keywords - detected_lang = None - for lang_code in supported_languages: - keywords = LANGUAGE_KEYWORDS.get(lang_code, []) - for keyword in keywords: - if keyword.lower() in message_content: - detected_lang = lang_code - logger.info( - f'Language detected: {detected_lang} ' - f"(matched keyword: '{keyword}')" - ) - break - if detected_lang: - break - - # Use detected language or fallback to default - target_language = detected_lang or default_language - - if not detected_lang: - logger.info( - f'No language detected, using default: {default_language}' - ) - - # Mark detection as complete - language_detected['detected'] = True - language_detected['current_language'] = target_language - - # Only switch services if target language is different from default - if target_language != default_language: - if ( - target_language in stt_services - and target_language in tts_services - ): - target_stt = 
stt_services[target_language] - target_tts = tts_services[target_language] - - try: - await task.queue_frames( - [ - ManuallySwitchServiceFrame( - service=target_stt - ), - ManuallySwitchServiceFrame( - service=target_tts - ), - ] - ) - logger.info( - f'Switched STT/TTS services to language: {target_language}' - ) - except Exception as e: - logger.error( - f'Error switching services: {e}', exc_info=True - ) - else: - logger.info( - f'Language {target_language} is default, no service switch needed' - ) - - # Update LLM system prompt with language instruction - language_instruction = LANGUAGE_INSTRUCTIONS.get( - target_language, - LANGUAGE_INSTRUCTIONS.get('en', 'Respond in English.'), - ) - - # Get current system prompt and append language instruction - current_messages = context.get_messages() - if current_messages and len(current_messages) > 0: - system_message = current_messages[0] - else: - system_message = { - 'role': 'system', - 'content': agent_config['system_prompt'], - } - - updated_content = ( - f"{system_message['content']}\n\n{language_instruction}" - ) - updated_system_message = { - 'role': 'system', - 'content': updated_content, - } - - # Update context with new system message - new_messages = [updated_system_message] + current_messages[1:] - await task.queue_frame( - LLMMessagesUpdateFrame(new_messages, run_llm=False) - ) - - logger.info( - f'Updated LLM context with language instruction for {target_language}' - ) - - # Exit after first detection - break - - logger.info('Language detection event handler registered') + task_container['task'] = task # Register event handlers @transport.event_handler('on_client_connected') From b0899d7b6f73d0000462f8a963acb898e7c4948e Mon Sep 17 00:00:00 2001 From: rootflo-hardik Date: Mon, 2 Feb 2026 17:39:11 +0530 Subject: [PATCH 11/16] resolved comments --- .../call_processing/controllers/webhook_controller.py | 8 +++++++- .../call_processing/services/llm_service.py | 6 +++++- 
.../call_processing/services/pipecat_service.py | 3 +-- .../voice_agents_module/models/tool_schemas.py | 4 ++-- wavefront/server/packages/flo_cloud/flo_cloud/gcp/gcs.py | 1 - 5 files changed, 15 insertions(+), 7 deletions(-) diff --git a/wavefront/server/apps/call_processing/call_processing/controllers/webhook_controller.py b/wavefront/server/apps/call_processing/call_processing/controllers/webhook_controller.py index fa8f43c4..f3477201 100644 --- a/wavefront/server/apps/call_processing/call_processing/controllers/webhook_controller.py +++ b/wavefront/server/apps/call_processing/call_processing/controllers/webhook_controller.py @@ -142,7 +142,7 @@ async def twiml_endpoint( """ Twilio TwiML endpoint - Called by Twilio when call connects (directly or via inbound webhook redirect). + Called by Twilio when call connects (directly or via outbound webhook redirect). Returns TwiML XML with WebSocket connection instructions. Query params: @@ -235,6 +235,12 @@ async def websocket_endpoint( await websocket.close(code=1008, reason='Missing voice_agent_id') return + if not customer_number: + logger.warning( + 'customer_number not found in stream parameters, using empty string' + ) + customer_number = '' + logger.info(f'Voice agent ID: {voice_agent_id}') # Convert voice_agent_id to UUID diff --git a/wavefront/server/apps/call_processing/call_processing/services/llm_service.py b/wavefront/server/apps/call_processing/call_processing/services/llm_service.py index c75035ca..3bd00fd9 100644 --- a/wavefront/server/apps/call_processing/call_processing/services/llm_service.py +++ b/wavefront/server/apps/call_processing/call_processing/services/llm_service.py @@ -52,7 +52,11 @@ def create_llm_service(llm_config: Dict[str, Any]): if llm_type == 'openai': return LLMServiceFactory._create_openai_llm(api_key, model, parameters) elif llm_type == 'azure_openai': - base_url = llm_config['base_url'] + base_url = llm_config.get('base_url') + if not base_url: + raise ValueError( + 'Azure OpenAI 
requires base_url (endpoint) to be configured' + ) return LLMServiceFactory._create_azure_llm( api_key, model, parameters, base_url ) diff --git a/wavefront/server/apps/call_processing/call_processing/services/pipecat_service.py b/wavefront/server/apps/call_processing/call_processing/services/pipecat_service.py index 4db6a46e..c677fd0c 100644 --- a/wavefront/server/apps/call_processing/call_processing/services/pipecat_service.py +++ b/wavefront/server/apps/call_processing/call_processing/services/pipecat_service.py @@ -501,8 +501,7 @@ async def run_conversation( ) # Populate task container for language detection tool (if multi-language) - if is_multi_language: - task_container['task'] = task + task_container['task'] = task # Register event handlers @transport.event_handler('on_client_connected') diff --git a/wavefront/server/modules/voice_agents_module/voice_agents_module/models/tool_schemas.py b/wavefront/server/modules/voice_agents_module/voice_agents_module/models/tool_schemas.py index 128b42bc..9f7c3865 100644 --- a/wavefront/server/modules/voice_agents_module/voice_agents_module/models/tool_schemas.py +++ b/wavefront/server/modules/voice_agents_module/voice_agents_module/models/tool_schemas.py @@ -32,7 +32,7 @@ class ApiToolConfig(BaseModel): default=None, description='HTTP headers to include' ) timeout: int = Field( - default=30, ge=1, le=300, description='Request timeout in seconds' + default=6, ge=1, le=300, description='Request timeout in seconds' ) auth_type: Optional[str] = Field( default='none', description='Authentication type (none, bearer, api_key)' @@ -89,7 +89,7 @@ class PythonToolConfig(BaseModel): code_storage_key: str = Field(..., description='Cloud Storage key for Python code') cloud_run_url: str = Field(..., description='Cloud Run service URL') timeout: int = Field( - default=60, ge=1, le=300, description='Execution timeout in seconds' + default=6, ge=1, le=300, description='Execution timeout in seconds' ) resource_limits: Optional[Dict[str, 
Any]] = Field( default=None, description='CPU and memory limits' diff --git a/wavefront/server/packages/flo_cloud/flo_cloud/gcp/gcs.py b/wavefront/server/packages/flo_cloud/flo_cloud/gcp/gcs.py index d94672f3..620ed7f3 100644 --- a/wavefront/server/packages/flo_cloud/flo_cloud/gcp/gcs.py +++ b/wavefront/server/packages/flo_cloud/flo_cloud/gcp/gcs.py @@ -155,7 +155,6 @@ def generate_presigned_url( print(f'service_account_email: {service_account_email}') if hasattr(self.credentials, 'token'): token = self.credentials.token - print(f'token: {token}') bucket = self.client.bucket(bucket_name) blob = bucket.blob(key) From c594ff60a8b27990e06793b66af56783e4df2f59 Mon Sep 17 00:00:00 2001 From: rootflo-hardik Date: Mon, 2 Feb 2026 18:05:42 +0530 Subject: [PATCH 12/16] resolved client review comments --- .../apps/[appId]/voice-agents/CreateVoiceAgentDialog.tsx | 9 ++++++--- .../apps/[appId]/voice-agents/EditVoiceAgentDialog.tsx | 1 + 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/wavefront/client/src/pages/apps/[appId]/voice-agents/CreateVoiceAgentDialog.tsx b/wavefront/client/src/pages/apps/[appId]/voice-agents/CreateVoiceAgentDialog.tsx index cf06c56c..961e7423 100644 --- a/wavefront/client/src/pages/apps/[appId]/voice-agents/CreateVoiceAgentDialog.tsx +++ b/wavefront/client/src/pages/apps/[appId]/voice-agents/CreateVoiceAgentDialog.tsx @@ -56,9 +56,11 @@ const createVoiceAgentSchema = z.object({ tts_config_id: z.string().min(1, 'TTS configuration is required'), stt_config_id: z.string().min(1, 'STT configuration is required'), telephony_config_id: z.string().min(1, 'Telephony configuration is required'), - tts_voice_ids: z.record(z.string(), z.string()).refine((val) => Object.keys(val).length > 0, { - message: 'At least one voice ID is required', - }), + tts_voice_ids: z + .record(z.string(), z.string().min(1, 'Voice ID must not be empty')) + .refine((val) => Object.keys(val).length > 0, { + message: 'At least one voice ID is required', + }), 
system_prompt: z.string().min(1, 'System prompt is required'), welcome_message: z.string().min(1, 'Welcome message is required'), conversation_config: z.string().optional(), @@ -145,6 +147,7 @@ const CreateVoiceAgentDialog: React.FC = ({ isOpen, watchedSupportedLanguages.forEach((lang) => { newState[lang] = prev[lang] || ''; }); + form.setValue('tts_voice_ids', newState); return newState; }); } diff --git a/wavefront/client/src/pages/apps/[appId]/voice-agents/EditVoiceAgentDialog.tsx b/wavefront/client/src/pages/apps/[appId]/voice-agents/EditVoiceAgentDialog.tsx index 6676fb4b..2380144e 100644 --- a/wavefront/client/src/pages/apps/[appId]/voice-agents/EditVoiceAgentDialog.tsx +++ b/wavefront/client/src/pages/apps/[appId]/voice-agents/EditVoiceAgentDialog.tsx @@ -173,6 +173,7 @@ const EditVoiceAgentDialog: React.FC = ({ watchedSupportedLanguages.forEach((lang) => { newState[lang] = prev[lang] || ''; }); + form.setValue('tts_voice_ids', newState); return newState; }); } From bdb3f10f0721f932afaaedf72c434ab16390998d Mon Sep 17 00:00:00 2001 From: vishnu r kumar Date: Mon, 2 Feb 2026 19:23:50 +0530 Subject: [PATCH 13/16] fix: reuse gcs function in pdo service --- .../insights_module/service/pdo_service.py | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/wavefront/server/modules/insights_module/insights_module/service/pdo_service.py b/wavefront/server/modules/insights_module/insights_module/service/pdo_service.py index f7016a1e..d6228543 100644 --- a/wavefront/server/modules/insights_module/insights_module/service/pdo_service.py +++ b/wavefront/server/modules/insights_module/insights_module/service/pdo_service.py @@ -1,7 +1,5 @@ from abc import ABC from abc import abstractmethod -from datetime import datetime -from datetime import timedelta import json import re @@ -9,6 +7,7 @@ from common_module.utils.odata_parser import prepare_odata_filter from google.cloud import storage from insights_module.service.insights_service import 
InsightsService +from flo_cloud.gcp.gcs import GCSStorage class PdoService(ABC): @@ -112,6 +111,7 @@ def __init__( self._transcript_bucket_name = transcript_bucket_name self._audio_bucket_name = audio_bucket_name self.client = storage.Client() + self.storage = GCSStorage() def get_bucket_key(self, value: str): match = re.match(r'gs://([^/]+)/(.+)', value) @@ -131,13 +131,12 @@ def fetch_upto_limit(self, filter, limit, offset, table_name=None): def fetch_audio(self, url): audio_bucket_name, key = self.get_bucket_key(url) - expiration = timedelta(minutes=30) - bucket = self.client.bucket(audio_bucket_name) - blob = bucket.blob(key) - - presigned_url = blob.generate_signed_url( - version='v4', expiration=datetime.utcnow() + expiration, method='GET' + presigned_url = self.storage.generate_presigned_url( + bucket_name=audio_bucket_name, + key=key, + type='GET', + expiresIn=300, ) return presigned_url From e6208450877ab41d165a86513b5b18c318e07b90 Mon Sep 17 00:00:00 2001 From: Vishnu Satis Date: Mon, 2 Feb 2026 16:54:00 +0000 Subject: [PATCH 14/16] Support for image in middleware proxy (#211) --- .../api_services_module/api_services_module/core/proxy.py | 3 ++- .../api_services_module/api_services_module/core/router.py | 7 ++++++- .../api_services_module/models/service.py | 4 ++-- 3 files changed, 10 insertions(+), 4 deletions(-) diff --git a/wavefront/server/modules/api_services_module/api_services_module/core/proxy.py b/wavefront/server/modules/api_services_module/api_services_module/core/proxy.py index 5e85373e..b1aa8212 100644 --- a/wavefront/server/modules/api_services_module/api_services_module/core/proxy.py +++ b/wavefront/server/modules/api_services_module/api_services_module/core/proxy.py @@ -65,6 +65,7 @@ async def process_request( query_params: Optional[Dict[str, Any]] = None, headers: Optional[Dict[str, str]] = None, body: Optional[Any] = None, + trace: bool = False, ) -> Union[ProxyResponse, Response]: """ Process an API request through the proxy pipeline. 
@@ -144,7 +145,7 @@ async def process_request( # Create successful response for JSON/text content response = ProxyResponse.success( data=context.response_body, - trace=context.execution_trace, + trace=context.execution_trace if trace else None, message='Request processed successfully', http_status_code=context.response_status, ) diff --git a/wavefront/server/modules/api_services_module/api_services_module/core/router.py b/wavefront/server/modules/api_services_module/api_services_module/core/router.py index bf4d4bab..971e9352 100644 --- a/wavefront/server/modules/api_services_module/api_services_module/core/router.py +++ b/wavefront/server/modules/api_services_module/api_services_module/core/router.py @@ -285,7 +285,11 @@ async def dynamic_proxy_handler(request: Request, response: Response): else: body_bytes = await request.body() if body_bytes: - body = body_bytes.decode('utf-8') + body = body_bytes + + trace = False + if query_params: + trace = query_params.get('trace', '0') == '1' # Process request through proxy # Client always uses POST, but backend will use api_method from config @@ -299,6 +303,7 @@ async def dynamic_proxy_handler(request: Request, response: Response): query_params=query_params, headers=headers, body=body, + trace=trace, ) # Set response status code diff --git a/wavefront/server/modules/api_services_module/api_services_module/models/service.py b/wavefront/server/modules/api_services_module/api_services_module/models/service.py index 0703f95d..c37eea25 100644 --- a/wavefront/server/modules/api_services_module/api_services_module/models/service.py +++ b/wavefront/server/modules/api_services_module/api_services_module/models/service.py @@ -116,7 +116,7 @@ class ProxyResponse: def success( cls, data: Any, - trace: List[str], + trace: Optional[List[str]] = None, message: str = 'Success', http_status_code: int = 200, ) -> 'ProxyResponse': @@ -131,7 +131,7 @@ def success( def error( cls, message: str, - trace: List[str], + trace: Optional[List[str]] = 
None, status: str = 'error', http_status_code: int = 500, ) -> 'ProxyResponse': From b5993b930aeabeb18b4c5971c55154ad685c4b62 Mon Sep 17 00:00:00 2001 From: vizsatiz Date: Tue, 3 Feb 2026 12:18:04 +0530 Subject: [PATCH 15/16] fix for json enabled in inference api --- .../agents_module/controllers/agent_controller.py | 8 +++++++- .../agents_module/models/agent_schemas.py | 2 +- .../services/agent_inference_service.py | 14 ++++---------- 3 files changed, 12 insertions(+), 12 deletions(-) diff --git a/wavefront/server/modules/agents_module/agents_module/controllers/agent_controller.py b/wavefront/server/modules/agents_module/agents_module/controllers/agent_controller.py index aae8268b..1b1e1c62 100644 --- a/wavefront/server/modules/agents_module/agents_module/controllers/agent_controller.py +++ b/wavefront/server/modules/agents_module/agents_module/controllers/agent_controller.py @@ -21,6 +21,7 @@ LlmInferenceConfigService, ) from tools_module.registry.tool_loader import ToolLoader +from flo_ai import FloUtils agents_router = APIRouter() @@ -204,8 +205,13 @@ async def agent_inference_v2( content=response_formatter.buildErrorResponse(str(e)), ) + if agent_inference_payload.output_json_enabled: + result = FloUtils.extract_jsons_from_string(result[-1].content) + else: + result = result[-1].content + response_data = AgentInferenceResponse( - result=result[-1].content, + result=result, agent_id=str(agent_id), namespace=namespace, execution_time=execution_time, diff --git a/wavefront/server/modules/agents_module/agents_module/models/agent_schemas.py b/wavefront/server/modules/agents_module/agents_module/models/agent_schemas.py index 38083ff4..de3efee9 100644 --- a/wavefront/server/modules/agents_module/agents_module/models/agent_schemas.py +++ b/wavefront/server/modules/agents_module/agents_module/models/agent_schemas.py @@ -40,7 +40,7 @@ class AgentInferenceRequest(BaseModel): class AgentInferenceResponse(BaseModel): """Response model for agent inference""" - result: str = 
Field(..., description='The inference result from the agent') + result: str | dict = Field(..., description='The inference result from the agent') agent_id: str = Field( ..., description='The ID of the agent that performed the inference' ) diff --git a/wavefront/server/modules/agents_module/agents_module/services/agent_inference_service.py b/wavefront/server/modules/agents_module/agents_module/services/agent_inference_service.py index b8ffd603..3d49a2ec 100644 --- a/wavefront/server/modules/agents_module/agents_module/services/agent_inference_service.py +++ b/wavefront/server/modules/agents_module/agents_module/services/agent_inference_service.py @@ -7,7 +7,7 @@ from db_repo_module.models.llm_inference_config import LlmInferenceConfig from db_repo_module.models.message_processors import MessageProcessors from db_repo_module.repositories.sql_alchemy_repository import SQLAlchemyRepository -from flo_ai import AgentBuilder, Agent, BaseMessage, FloUtils +from flo_ai import AgentBuilder, Agent, BaseMessage from flo_ai.llm import OpenAI, Anthropic, Gemini, OllamaLLM, OpenAIVLLM from flo_ai.tool.base_tool import Tool from flo_cloud.cloud_storage import CloudStorageManager @@ -237,7 +237,7 @@ async def run_agent_inference( variables: Dict[str, Any], agent_name: str, output_json_enabled: bool = True, - ) -> tuple[str, float]: + ) -> tuple[List[BaseMessage], float]: """ Run agent inference with provided variables @@ -257,13 +257,7 @@ async def run_agent_inference( start_time = time.time() # Use a generic prompt that allows the agent to use the variables - result_str = await agent.run(inputs, variables=variables) - - # Conditionally extract JSON based on output_json_enabled flag - if output_json_enabled: - result = FloUtils.extract_jsons_from_string(result_str) - else: - result = result_str + result: List[BaseMessage] = await agent.run(inputs, variables=variables) execution_time = time.time() - start_time logger.info( @@ -324,7 +318,7 @@ async def perform_inference_v2( 
output_json_enabled: bool = True, access_token: Optional[str] = None, app_key: Optional[str] = None, - ) -> tuple[str, float, str]: + ) -> tuple[List[BaseMessage], float, str]: """ Complete inference workflow (v2): fetch agent from DB + cloud storage, run inference From 452755f406ff2abc56856890345d5cc431d6ba0b Mon Sep 17 00:00:00 2001 From: vizsatiz Date: Tue, 3 Feb 2026 12:21:40 +0530 Subject: [PATCH 16/16] Adding control for log levels --- wavefront/server/apps/floware/floware/server.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/wavefront/server/apps/floware/floware/server.py b/wavefront/server/apps/floware/floware/server.py index 8ac2e37b..bc27fd68 100644 --- a/wavefront/server/apps/floware/floware/server.py +++ b/wavefront/server/apps/floware/floware/server.py @@ -551,14 +551,17 @@ async def global_exception_handler(request: Request, exc: Exception): # Running with Uvicorn (for local development) if __name__ == '__main__': + worker_count = os.getenv('FLOWARE_WORKER_COUNT', 4) + uvicorn_log_level = os.getenv('UVICORN_LOG_LEVEL', 'critical') + print(f'Starting application in environment: {environment}') if environment == 'production': uvicorn.run( 'server:app', host='0.0.0.0', port=8001, - workers=4, - log_level='critical', + workers=worker_count, + log_level=uvicorn_log_level, forwarded_allow_ips='*', ) else: