diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..537b47f --- /dev/null +++ b/.env.example @@ -0,0 +1,32 @@ +# TAP Voice Agent — Environment Variables +# Author: Dashpreet Singh +# Copy this to .env and fill in your values. + +# ── VAPI (Voice Orchestration) ────────────────────────────────────────────── +VAPI_API_KEY=your_vapi_api_key_here +VAPI_PHONE_NUMBER_ID=your_vapi_phone_number_id +VAPI_WEBHOOK_SECRET=your_webhook_secret_here + +# ── Sarvam AI (Indic STT + TTS) ──────────────────────────────────────────── +# Get key at: https://dashboard.sarvam.ai +SARVAM_API_KEY=your_sarvam_api_key_here + +# ── WhatsApp Business API (Meta) ──────────────────────────────────────────── +WHATSAPP_TOKEN=your_whatsapp_business_token +WHATSAPP_PHONE_NUMBER_ID=your_whatsapp_phone_number_id + +# ── TAP LMS (Frappe) ──────────────────────────────────────────────────────── +FRAPPE_BASE_URL=https://lms.theapprenticeproject.org +FRAPPE_API_KEY=your_frappe_api_key +FRAPPE_API_SECRET=your_frappe_api_secret + +# ── OpenAI (LLM for conversation) ─────────────────────────────────────────── +OPENAI_API_KEY=your_openai_api_key_here + +# ── Redis (session state + rate limiting) ──────────────────────────────────── +REDIS_HOST=localhost +REDIS_PORT=6379 + +# ── Server ─────────────────────────────────────────────────────────────────── +PORT=8000 +DEBUG=false diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..cff5543 --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +.env +__pycache__/ +*.pyc diff --git a/README.md b/README.md index 04e1239..1c949c7 100644 --- a/README.md +++ b/README.md @@ -1 +1,162 @@ -# C4GT_2026 \ No newline at end of file +# TAP Multilingual Voice Agent + +**Author:** Dashpreet Singh +**Email:** dashpreetsinghhanda@gmail.com | 2024ucs0087@iitjammu.ac.in +**Institution:** IIT Jammu, B.Tech CSE 2024–2028 +**Project:** [DMP 2026] Building Multilingual Voice Agents — The Apprentice Project + +--- + +## What This Is + +An AI-powered multilingual voice agent that proactively calls students and parents in Hindi, Marathi, and Punjabi to re-engage them with TAP's learning platform. When a student goes inactive, Didi (the agent persona) calls them, speaks in their language, knows exactly where they left off, and gently guides them back. + +Unlike SMS, a voice call can answer "what do I do next?" — making it significantly more effective for government school demographics with low digital literacy. + +--- + +## Architecture + +``` +User (student/parent phone) + │ + ▼ + VAPI Platform ──── Sarvam AI STT (Indic ASR) + │ ──── GPT-4o-mini (Didi persona) + │ ──── Sarvam AI TTS (Indic voices) + │ + ▼ + FastAPI Server (this repo) + │ + ├── LanguageDetector — script-aware Hindi/Marathi/Punjabi detection + ├── DropoutRiskModel — logistic regression on 6 LMS signals + ├── NudgeOrchestrator — decides who, how, when to call + ├── ConversationFlow — structured dialogue trees + LLM prompts + ├── ExperimentFramework — A/B testing: voice vs WhatsApp vs control + │ + ├── VAPI Client — outbound call orchestration + ├── WhatsApp Client — voice note fallback (Sarvam TTS → Meta API) + └── Frappe Client — TAP LMS REST API integration +``` + +--- + +## Key Innovations + +### 1. Sarvam AI over Whisper +Whisper is trained on clean internet audio. Sarvam AI is specifically trained on Indian telephony audio — noisy, accented, code-switched. For government school calls, this matters enormously. + +### 2. Dropout Risk Model +A lightweight logistic regression over 6 LMS signals (inactivity days, completion %, engagement decay rate, session duration, streak, total sessions). Online learning: every call outcome updates the model weights. The system gets smarter as it runs. + +### 3. Conversational Memory +Redis stores the last 10 call outcomes per student. If math nudges haven't worked for Riya in 2 calls, the system switches to a different topic she responded to before. + +### 4. Parent vs Student Routing +Grade ≤ 6 or hour ≥ 18 → call parent with a different script. The agent knows it's talking to a parent and adjusts vocabulary, tone, and ask accordingly. + +### 5. Channel Fallback Chain +`voice_call → WhatsApp voice note → WhatsApp text` +Voice notes have 3–5× higher open rates than text messages for this demographic. + +### 6. A/B Experimentation Built In +Every nudge is assigned to an experiment arm (control / voice / WhatsApp voice / WhatsApp text) via deterministic hashing. Return-to-platform rate is the primary metric, tracked via a Frappe webhook when the student opens the app. + +--- + +## Setup + +```bash +git clone https://github.com/DZDasherKTB/tap-voice-agent +cd tap-voice-agent + +python -m venv venv +source venv/bin/activate +pip install -r requirements.txt + +cp .env.example .env +# Fill in your API keys in .env + +# Start Redis +docker run -d -p 6379:6379 redis:alpine + +# Run the server +python main.py +``` + +--- + +## Run Tests + +```bash +python -m pytest tests/test_suite.py -v +# Expected: all tests pass, no external services required +``` + +--- + +## API Endpoints + +| Method | Endpoint | Description | +|---|---|---| +| `POST` | `/api/nudge/student` | Trigger nudge for one student | +| `POST` | `/api/nudge/batch` | Trigger batch nudge for all at-risk students | +| `GET` | `/api/experiments/report` | A/B experiment metrics | +| `POST` | `/api/detect-language` | Debug: detect language from text | +| `GET` | `/api/health` | Health check | +| `POST` | `/webhook/vapi` | VAPI call events (internal) | +| `POST` | `/webhook/lms/login` | Student login after nudge (internal) | + +--- + +## Scheduler + +Batch nudges run automatically: +- **10:00 AM IST** — morning session (students before school) +- **6:00 PM IST** — evening session (after school, parents home) + +--- + +## Supported Languages + +| Language | Script | Kokoro | Sarvam STT | Sarvam TTS Voice | +|---|---|---|---|---| +| Hindi | Devanagari | `hi-IN` | ✅ | meera (F), arjun (M) | +| Marathi | Devanagari | `mr-IN` | ✅ | anushka (F), rajan (M) | +| Punjabi | Gurmukhi | `pa-IN` | ✅ | preet (F), gurpreet (M) | + +--- + +## File Structure + +``` +tap-voice-agent/ +├── main.py # FastAPI app, webhooks, scheduler +├── config.py # Central configuration +├── requirements.txt +├── .env.example +├── agent/ +│ ├── language_detector.py # Script-aware language detection +│ └── conversation_flow.py # Dialogue trees, system prompts, closings +├── lms/ +│ └── frappe_client.py # TAP Frappe LMS REST client +├── nudge/ +│ ├── dropout_risk.py # Logistic regression risk scorer +│ └── orchestrator.py # Master pipeline: score → call → log +├── telephony/ +│ ├── vapi_client.py # VAPI outbound call client +│ └── whatsapp_client.py # WhatsApp voice note + text fallback +├── experiments/ +│ └── framework.py # A/B experiment assignment + metrics +└── tests/ + └── test_suite.py # 35+ unit tests, no external services needed +``` + +--- + +## Contact + +Dashpreet Singh +dashpreetsinghhanda@gmail.com +2024ucs0087@iitjammu.ac.in +IIT Jammu — B.Tech CSE 2024–2028 diff --git a/agent/conversation_flow.py b/agent/conversation_flow.py new file mode 100644 index 0000000..68cead0 --- /dev/null +++ b/agent/conversation_flow.py @@ -0,0 +1,257 @@ +# agent/conversation_flow.py — Structured conversational flow engine +# Author: Dashpreet Singh +# +# Builds system prompts and opening utterances for each nudge scenario. +# Uses structured dialogue trees with LLM filling the dynamic parts. +# Keeps the agent on-script — no hallucinated facts about the student. + +from dataclasses import dataclass +from enum import Enum +from typing import Optional + +from agent.language_detector import Language + + +class NudgeType(str, Enum): + RETURN_TO_LEARNING = "return" # hasn't logged in + LESSON_COMPLETION = "completion" # stuck mid-module + STREAK_RECOVERY = "streak" # lost streak + PARENT_AWARENESS = "parent" # call to parent + CELEBRATION = "celebration" # milestone hit — positive nudge + + +# ── Opening utterances per language per nudge type ────────────────────────── +# These are the first thing Didi says. Short, warm, personal. + +OPENINGS: dict[NudgeType, dict[Language, str]] = { + NudgeType.RETURN_TO_LEARNING: { + Language.HINDI: ( + "Namaste {name}! Main Didi hoon TAP se. " + "Aap kuch dino se padhai nahi kar rahe the, " + "socha aapka haal chaal loon. Sab theek hai?" + ), + Language.MARATHI: ( + "Namaskar {name}! Mi Didi ahe TAP madhe. " + "Tumhi kaahi divsaanpasoon shiklelaaat nahi, " + "tumchi kaaljee vaaTali. Sab kaay theek ahe ka?" + ), + Language.PUNJABI: ( + "Sat Sri Akal {name}! Main Didi haan TAP ton. " + "Tusi kujh dina ton padhai nahi kiti, " + "socheya haal-chaal puchain. Sab theek ae?" + ), + }, + NudgeType.LESSON_COMPLETION: { + Language.HINDI: ( + "Namaste {name}! Main Didi hoon TAP se. " + "Aap '{lesson}' lesson pe ruk gaye hain — " + "kya koi mushkil aa rahi hai? Main madad kar sakti hoon." + ), + Language.MARATHI: ( + "Namaskar {name}! Mi Didi ahe TAP madhe. " + "Tumhi '{lesson}' lesson madhye adaklelaaat — " + "kaahi aDchan ahe ka? Mi madad karu shakate." + ), + Language.PUNJABI: ( + "Sat Sri Akal {name}! Main Didi haan TAP ton. " + "Tusi '{lesson}' lesson te ruk gaye ho — " + "ki koi mushkil aa rahi ae? Main madad kar sakdi haan." + ), + }, + NudgeType.STREAK_RECOVERY: { + Language.HINDI: ( + "Namaste {name}! Didi bol rahi hoon. " + "Aapka padhai ka streak toot gaya — " + "lekin koi baat nahi, aaj se phir shuru kar sakte hain. " + "Sirf 10 minute kaafi hain!" + ), + Language.MARATHI: ( + "Namaskar {name}! Didi bolte ahe. " + "Tumcha streak gela — pan kaahi harkat nahi, " + "aajpasun parat suru karu shaktot. " + "Fakt 10 minute puresc ahet!" + ), + Language.PUNJABI: ( + "Sat Sri Akal {name}! Didi bol rahi haan. " + "Tera streak tuT gaya — par koi gal nahi, " + "aaj ton muD shuru kar sakde haan. " + "Sirf 10 minute kaafi ne!" + ), + }, + NudgeType.PARENT_AWARENESS: { + Language.HINDI: ( + "Namaste! Main TAP se Didi bol rahi hoon. " + "Main {name} ke baare mein baat karna chahti thi — " + "aapka bachcha bahut accha seekh raha hai, " + "lekin kuch dino se app nahi khola. " + "Kya aap unhe thoda encourage kar sakte hain?" + ), + Language.MARATHI: ( + "Namaskar! Mi TAP madhoon Didi bolte ahe. " + "{name} baddal bolayache hote — " + "tumcha mule chan shiktat, " + "pan kaahi diwasaanpasoon app ughadale naahi. " + "Tyaanla thodi protsahan deu shakata ka?" + ), + Language.PUNJABI: ( + "Sat Sri Akal! Main TAP ton Didi bol rahi haan. " + "{name} baare gal karni si — " + "tera bacha bahut changi tarah sikhda ae, " + "par kujh dina ton app nahi kholeya. " + "Ki tusi unnu thoda encourage kar sakde ho?" + ), + }, + NudgeType.CELEBRATION: { + Language.HINDI: ( + "Namaste {name}! Bahut bahut badhai ho! " + "Aapne '{milestone}' complete kar liya — " + "ye sach mein bahut bada kaam hai. " + "Main bahut khush hoon aapke liye!" + ), + Language.MARATHI: ( + "Namaskar {name}! Khup khup abhinandan! " + "Tumhi '{milestone}' poorn kela — " + "he kharch khup moThe kaam ahe. " + "Mala tumchyabaddal khup aanand hoto!" + ), + Language.PUNJABI: ( + "Sat Sri Akal {name}! Bahut bahut wadhaiaan! " + "Tune '{milestone}' mukkamal kar leya — " + "ee sach mein bahut waDa kaam ae. " + "Main bahut khush haan tere layi!" + ), + }, +} + +# ── Fallback responses when student doesn't understand ────────────────────── + +FALLBACKS: dict[Language, list[str]] = { + Language.HINDI: [ + "Koi baat nahi, main dobara poochti hoon.", + "Aap bas haan ya nahi boliye — itna kaafi hai.", + "Suno, kya main kal phir call karoon?", + ], + Language.MARATHI: [ + "Kaahi harkat nahi, mi parat vicharate.", + "Fakt ho kinwa nahi sanga — itke pures ahe.", + "Mee udya parat call karu ka?", + ], + Language.PUNJABI: [ + "Koi gal nahi, main muD puchhdi haan.", + "Bas haan ya nahi kaho — itna hi kaafi ae.", + "Ki main kal muD call karan?", + ], +} + +# ── Escalation trigger phrases ─────────────────────────────────────────────── + +ESCALATION_TRIGGERS = [ + "problem", "mushkil", "pareshaan", "help", "madad", + "nahi samajh", "aDchan", "dukhi", "sad", "beemar", "sick", + "school nahi", "shala nahi", "pathshala nahi", +] + + +@dataclass +class ConversationContext: + student_id: str + student_name: str + language: Language + nudge_type: NudgeType + current_lesson: str + completion_pct: float + days_inactive: int + streak_days: int + is_parent_call: bool + call_attempt_number: int # 1st, 2nd call this week + # Filled during conversation + turn_count: int = 0 + consecutive_fallbacks: int = 0 + escalate_to_human: bool = False + student_committed: bool = False # did they say they'll log in? + + +class ConversationFlowEngine: + """Builds prompts and manages dialogue state for Didi agent calls.""" + + def get_opening(self, ctx: ConversationContext) -> str: + template = OPENINGS.get(ctx.nudge_type, OPENINGS[NudgeType.RETURN_TO_LEARNING]) + lang_map = template.get(ctx.language, template.get(Language.HINDI, "")) + return lang_map.format( + name=ctx.student_name.split()[0], # first name only + lesson=ctx.current_lesson, + milestone=ctx.current_lesson, + ) + + def build_system_prompt(self, ctx: ConversationContext) -> str: + lang_names = { + Language.HINDI: "Hindi", + Language.MARATHI: "Marathi", + Language.PUNJABI: "Punjabi", + } + lang_name = lang_names.get(ctx.language, "Hindi") + + return f"""You are Didi, a warm and encouraging voice learning companion for TAP (The Apprentice Project). + +LANGUAGE: Always respond in {lang_name}. If the student switches language, follow them. +STUDENT: {ctx.student_name}, Grade student, {'parent call' if ctx.is_parent_call else 'direct student call'} +CONTEXT: + - Days inactive: {ctx.days_inactive} + - Current lesson: {ctx.current_lesson} + - Course completion: {ctx.completion_pct:.0f}% + - Streak days: {ctx.streak_days} + - Call attempt: {ctx.call_attempt_number} this week + +GOAL: Gently encourage {ctx.student_name} to open the TAP app and do at least 10 minutes of learning today. + +RULES: +1. Never lecture or scold. Be warm, curious, celebratory. +2. Keep responses under 3 sentences — this is a voice call. +3. Ask one question at a time. +4. If the student mentions a problem (school, health, family), acknowledge it warmly and offer to connect them with a teacher. +5. If they agree to log in today, confirm enthusiastically and end the call warmly. +6. If they don't respond after 2 attempts, gently offer to call another day. +7. NEVER make up facts about their progress beyond what's given above. +8. Do NOT mention competitors, other apps, or platforms. + +ESCALATION: If the student seems distressed, in trouble, or unable to attend school, say: +"Main aapke teacher ko bata deti hoon — woh aapse baat karenge" and set escalation flag.""" + + def should_escalate(self, user_text: str) -> bool: + text_lower = user_text.lower() + return any(trigger in text_lower for trigger in ESCALATION_TRIGGERS) + + def get_fallback(self, ctx: ConversationContext) -> str: + fallbacks = FALLBACKS.get(ctx.language, FALLBACKS[Language.HINDI]) + idx = min(ctx.consecutive_fallbacks, len(fallbacks) - 1) + return fallbacks[idx] + + def should_end_call(self, ctx: ConversationContext) -> bool: + return ( + ctx.turn_count >= 8 or + ctx.consecutive_fallbacks >= 3 or + ctx.escalate_to_human or + ctx.student_committed + ) + + def get_closing(self, ctx: ConversationContext) -> str: + closings = { + Language.HINDI: { + True: "Bahut achha! Aaj app zaroor kholiye. Main aapka intezaar kar rahi hoon. Dhanyavaad, aur padhai mein aage badhte rahiye!", + False: "Koi baat nahi. Main phir call karoongi. Apna khayal rakhna. Jai Hind!", + }, + Language.MARATHI: { + True: "Khup chan! Aaj app nakki ugha. Mi waT paahat ahe. Dhanyavaad, aani shikanyaat pudhe chala!", + False: "Kaahi harkat nahi. Mi parat call karate. Swataachi kaaljee ghya. Jai Hind!", + }, + Language.PUNJABI: { + True: "Bahut changi gal! Aaj app zaroor kholo. Main tera intezaar kar rahi haan. Shukriya, te padhaii vich aage vadhde raho!", + False: "Koi gal nahi. Main muD call karan gi. Apna khayal rakho. Jai Hind!", + }, + } + lang_closings = closings.get(ctx.language, closings[Language.HINDI]) + return lang_closings[ctx.student_committed] + + +# Module-level singleton +flow_engine = ConversationFlowEngine() diff --git a/agent/language_detector.py b/agent/language_detector.py new file mode 100644 index 0000000..25ecb75 --- /dev/null +++ b/agent/language_detector.py @@ -0,0 +1,185 @@ +# agent/language_detector.py — Real-time language detection & routing +# Author: Dashpreet Singh +# +# Handles: +# 1. Script-level detection (Devanagari / Gurmukhi / Latin) +# 2. Code-switching mid-sentence (very common in India) +# 3. Language routing to correct STT/TTS voice + +import re +import unicodedata +from dataclasses import dataclass +from enum import Enum + + +class Language(str, Enum): + HINDI = "hi" + MARATHI = "mr" + PUNJABI = "pa" + UNKNOWN = "unknown" + + +# Unicode block ranges +_DEVANAGARI_RE = re.compile(r'[\u0900-\u097F]') # Hindi + Marathi +_GURMUKHI_RE = re.compile(r'[\u0A00-\u0A7F]') # Punjabi +_LATIN_RE = re.compile(r'[a-zA-Z]') + +# Marathi-specific character markers (not in standard Hindi) +_MARATHI_MARKERS = { + 'ळ', 'ऱ', 'ऩ', 'ॉ', # retroflex + ॉ vowel common in Marathi +} + +# Hindi-specific high-frequency words (romanised callers) +_HINDI_ROMAN_WORDS = { + 'haan', 'nahi', 'kya', 'mujhe', 'mera', 'acha', 'theek', 'bahut', + 'padhai', 'school', 'bhai', 'didi', 'sir', 'madam' +} +_MARATHI_ROMAN_WORDS = { + 'ho', 'nako', 'kay', 'mala', 'mazha', 'chan', 'bara', 'khup', + 'shala', 'tai', 'dada', 'bai' +} +_PUNJABI_ROMAN_WORDS = { + 'haan', 'nahi', 'ki', 'menu', 'mera', 'changa', 'theek', 'bahut', + 'pathshala', 'veer', 'penji', 'ji' +} + +# Sarvam AI TTS speaker IDs per language + gender +SARVAM_VOICES = { + Language.HINDI: { + "female": "meera", # warm, teacher-like + "male": "arjun", + }, + Language.MARATHI: { + "female": "anushka", + "male": "rajan", + }, + Language.PUNJABI: { + "female": "preet", + "male": "gurpreet", + }, +} + + +@dataclass +class DetectionResult: + language: Language + confidence: float # 0.0 – 1.0 + script: str # "devanagari" | "gurmukhi" | "latin" | "mixed" + is_code_switched: bool # True if sentence mixes scripts + dominant_script_ratio: float + + +class LanguageDetector: + """Fast, script-aware language detector for real-time voice agent use. + + Detection priority: + 1. Script analysis (character-level, instant) + 2. Marathi vs Hindi disambiguation via marker characters + 3. Romanised text → keyword matching + 4. Fallback to student's registered language preference + """ + + def detect(self, text: str, fallback: Language = Language.HINDI) -> DetectionResult: + if not text or not text.strip(): + return DetectionResult( + language=fallback, + confidence=0.0, + script="unknown", + is_code_switched=False, + dominant_script_ratio=0.0, + ) + + deva_chars = len(_DEVANAGARI_RE.findall(text)) + guru_chars = len(_GURMUKHI_RE.findall(text)) + latin_chars = len(_LATIN_RE.findall(text)) + total = deva_chars + guru_chars + latin_chars or 1 + + deva_ratio = deva_chars / total + guru_ratio = guru_chars / total + latin_ratio = latin_chars / total + + is_code_switched = ( + (deva_ratio > 0.1 and latin_ratio > 0.1) or + (guru_ratio > 0.1 and latin_ratio > 0.1) + ) + + # ── Script-dominant path ────────────────────────────────────── + if guru_ratio > 0.3: + return DetectionResult( + language=Language.PUNJABI, + confidence=min(0.95, 0.6 + guru_ratio * 0.4), + script="gurmukhi", + is_code_switched=is_code_switched, + dominant_script_ratio=guru_ratio, + ) + + if deva_ratio > 0.3: + # Distinguish Marathi from Hindi via marker characters + marathi_score = sum(1 for ch in text if ch in _MARATHI_MARKERS) + if marathi_score >= 2: + lang = Language.MARATHI + conf = min(0.9, 0.65 + deva_ratio * 0.3) + else: + lang = Language.HINDI + conf = min(0.9, 0.65 + deva_ratio * 0.3) + return DetectionResult( + language=lang, + confidence=conf, + script="devanagari", + is_code_switched=is_code_switched, + dominant_script_ratio=deva_ratio, + ) + + # ── Romanised text path ─────────────────────────────────────── + if latin_ratio > 0.5: + words = set(text.lower().split()) + hindi_hits = len(words & _HINDI_ROMAN_WORDS) + marathi_hits = len(words & _MARATHI_ROMAN_WORDS) + punjabi_hits = len(words & _PUNJABI_ROMAN_WORDS) + + scores = { + Language.HINDI: hindi_hits, + Language.MARATHI: marathi_hits, + Language.PUNJABI: punjabi_hits, + } + best = max(scores, key=scores.__getitem__) + best_score = scores[best] + + if best_score == 0: + return DetectionResult( + language=fallback, + confidence=0.3, + script="latin", + is_code_switched=False, + dominant_script_ratio=latin_ratio, + ) + + total_hits = sum(scores.values()) or 1 + return DetectionResult( + language=best, + confidence=min(0.8, 0.4 + best_score / total_hits * 0.5), + script="latin", + is_code_switched=False, + dominant_script_ratio=latin_ratio, + ) + + # ── Fallback ───────────────────────────────────────────────── + return DetectionResult( + language=fallback, + confidence=0.3, + script="mixed", + is_code_switched=is_code_switched, + dominant_script_ratio=max(deva_ratio, guru_ratio, latin_ratio), + ) + + def get_tts_voice( + self, + language: Language, + gender: str = "female", + ) -> str: + voices = SARVAM_VOICES.get(language, SARVAM_VOICES[Language.HINDI]) + return voices.get(gender, voices["female"]) + + +# Module-level singleton +detector = LanguageDetector() diff --git a/config.py b/config.py new file mode 100644 index 0000000..29e5123 --- /dev/null +++ b/config.py @@ -0,0 +1,117 @@ +# config.py — Central configuration for TAP Voice Agent +# Author: Dashpreet Singh +# IIT Jammu, B.Tech CSE 2024-2028 + +import os +from dataclasses import dataclass, field +from typing import Optional + + +@dataclass +class VAPIConfig: + api_key: str = os.getenv("VAPI_API_KEY", "") + phone_number_id: str = os.getenv("VAPI_PHONE_NUMBER_ID", "") + webhook_secret: str = os.getenv("VAPI_WEBHOOK_SECRET", "") + base_url: str = "https://api.vapi.ai" + + +@dataclass +class SarvamConfig: + """Sarvam AI — purpose-built Indic STT/TTS. Far better than Whisper + for Hindi/Marathi/Punjabi accents in low-resource audio conditions.""" + api_key: str = os.getenv("SARVAM_API_KEY", "") + base_url: str = "https://api.sarvam.ai" + stt_model: str = "saarika:v2" # Sarvam Indic ASR + tts_model: str = "bulbul:v1" # Sarvam Indic TTS + translate_model: str = "mayura:v1" # Sarvam translation + + +@dataclass +class WhatsAppConfig: + token: str = os.getenv("WHATSAPP_TOKEN", "") + phone_number_id: str = os.getenv("WHATSAPP_PHONE_NUMBER_ID", "") + base_url: str = "https://graph.facebook.com/v19.0" + + +@dataclass +class FrappeConfig: + """TAP's LMS backend runs on Frappe framework.""" + base_url: str = os.getenv("FRAPPE_BASE_URL", "https://lms.theapprenticeproject.org") + api_key: str = os.getenv("FRAPPE_API_KEY", "") + api_secret: str = os.getenv("FRAPPE_API_SECRET", "") + + +@dataclass +class RedisConfig: + host: str = os.getenv("REDIS_HOST", "localhost") + port: int = int(os.getenv("REDIS_PORT", "6379")) + db: int = 0 + conversation_ttl: int = 86400 * 7 # 7 days — keeps call history per student + + +@dataclass +class LLMConfig: + provider: str = "openai" # openai | anthropic + model: str = "gpt-4o-mini" # fast + cheap for structured nudge dialogue + api_key: str = os.getenv("OPENAI_API_KEY", "") + temperature: float = 0.4 # low temp — keep agent on-script + max_tokens: int = 300 + + +@dataclass +class NudgeConfig: + """Dropout risk thresholds and nudge scheduling.""" + inactivity_days_threshold: int = 3 # call if inactive >= 3 days + high_risk_inactivity_days: int = 7 # priority call + max_calls_per_student_per_week: int = 2 + call_hours: tuple = (9, 20) # 9 AM – 8 PM IST only + parent_call_hour_threshold: int = 18 # after 6 PM → call parent + dropout_risk_score_threshold: float = 0.65 + # Channels in priority order + channel_priority: list = field(default_factory=lambda: [ + "voice_call", "whatsapp_voice_note", "whatsapp_text" + ]) + + +@dataclass +class AppConfig: + vapi: VAPIConfig = field(default_factory=VAPIConfig) + sarvam: SarvamConfig = field(default_factory=SarvamConfig) + whatsapp: WhatsAppConfig = field(default_factory=WhatsAppConfig) + frappe: FrappeConfig = field(default_factory=FrappeConfig) + redis: RedisConfig = field(default_factory=RedisConfig) + llm: LLMConfig = field(default_factory=LLMConfig) + nudge: NudgeConfig = field(default_factory=NudgeConfig) + + # Server + host: str = "0.0.0.0" + port: int = int(os.getenv("PORT", "8000")) + debug: bool = os.getenv("DEBUG", "false").lower() == "true" + + # Agent persona — "Didi" for female voice, warm teacher-like tone + agent_name: str = "Didi" + agent_persona: str = ( + "You are Didi, a warm and encouraging learning companion for students " + "in government schools across India. You speak in simple, friendly Hindi, " + "Marathi, or Punjabi based on the student's preference. Your goal is to " + "gently encourage students to continue their learning journey. You never " + "scold or lecture — you celebrate small progress and make learning feel " + "safe and achievable." + ) + + supported_languages: list = field(default_factory=lambda: [ + "hi", # Hindi + "mr", # Marathi + "pa", # Punjabi + ]) + + +# Singleton +_config: Optional[AppConfig] = None + + +def get_config() -> AppConfig: + global _config + if _config is None: + _config = AppConfig() + return _config diff --git a/experiments/framework.py b/experiments/framework.py new file mode 100644 index 0000000..9122d91 --- /dev/null +++ b/experiments/framework.py @@ -0,0 +1,218 @@ +# experiments/framework.py — A/B experimentation & impact measurement +# Author: Dashpreet Singh +# +# Tracks: return-to-learning rate, completion delta, parent engagement +# Supports: multi-arm experiments (voice vs WhatsApp vs SMS vs control) + +import hashlib +import json +import logging +from dataclasses import dataclass, field, asdict +from datetime import datetime, timezone +from enum import Enum +from typing import Optional + +logger = logging.getLogger("tap.experiments") + + +class Arm(str, Enum): + CONTROL = "control" # no nudge + VOICE_CALL = "voice_call" + WHATSAPP_VOICE = "whatsapp_voice" + WHATSAPP_TEXT = "whatsapp_text" + SMS = "sms" + + +@dataclass +class Experiment: + experiment_id: str + name: str + description: str + arms: list[Arm] + traffic_split: list[float] # must sum to 1.0, one per arm + start_date: str + end_date: Optional[str] = None + is_active: bool = True + # Primary metric + primary_metric: str = "return_within_72h" + # Secondary metrics + secondary_metrics: list[str] = field(default_factory=lambda: [ + "completion_delta_7d", + "call_answer_rate", + "conversation_duration_seconds", + "parent_engagement_rate", + ]) + + +@dataclass +class ExperimentEvent: + experiment_id: str + student_id: str + arm: Arm + event_type: str # "assigned" | "nudged" | "returned" | "completed_lesson" + timestamp: str + metadata: dict = field(default_factory=dict) + + +@dataclass +class ArmMetrics: + arm: Arm + n: int # students in this arm + returned_72h: int # returned to platform within 72h + return_rate: float # returned_72h / n + avg_completion_delta: float # completion % change after nudge + avg_call_duration: float # seconds (0 for non-voice arms) + answer_rate: float # calls answered / calls attempted + parent_responses: int + + +class ExperimentFramework: + """Deterministic A/B assignment + metric aggregation. + + Assignment is deterministic per (student_id, experiment_id) hash + so the same student always lands in the same arm — no re-randomisation. + """ + + def __init__(self): + self._experiments: dict[str, Experiment] = {} + self._events: list[ExperimentEvent] = [] # in prod: write to DB + + # Register default experiment + self._register_default_experiment() + + def _register_default_experiment(self): + exp = Experiment( + experiment_id="exp_voice_v1", + name="Voice vs WhatsApp vs Control — v1", + description=( + "Measure impact of proactive voice calls vs WhatsApp voice notes " + "vs SMS vs no nudge on 72h return-to-learning rate." + ), + arms=[Arm.CONTROL, Arm.VOICE_CALL, Arm.WHATSAPP_VOICE, Arm.WHATSAPP_TEXT], + traffic_split=[0.25, 0.40, 0.25, 0.10], # voice call gets most traffic + start_date=datetime.now(timezone.utc).isoformat(), + primary_metric="return_within_72h", + ) + self._experiments[exp.experiment_id] = exp + + def register_experiment(self, exp: Experiment): + if abs(sum(exp.traffic_split) - 1.0) > 0.001: + raise ValueError("traffic_split must sum to 1.0") + if len(exp.arms) != len(exp.traffic_split): + raise ValueError("arms and traffic_split must have same length") + self._experiments[exp.experiment_id] = exp + logger.info("Registered experiment: %s", exp.experiment_id) + + def assign_arm(self, student_id: str, experiment_id: str) -> Arm: + """Deterministically assign student to an arm using hash bucket.""" + exp = self._experiments.get(experiment_id) + if not exp or not exp.is_active: + return Arm.VOICE_CALL # default if no active experiment + + # Hash (student_id + experiment_id) → [0, 1) bucket + raw = f"{student_id}:{experiment_id}" + h = int(hashlib.md5(raw.encode()).hexdigest(), 16) + bucket = (h % 10000) / 10000.0 + + cumulative = 0.0 + for arm, split in zip(exp.arms, exp.traffic_split): + cumulative += split + if bucket < cumulative: + return arm + + return exp.arms[-1] # should never reach here + + def record_event( + self, + experiment_id: str, + student_id: str, + event_type: str, + metadata: Optional[dict] = None, + ): + arm = self.assign_arm(student_id, experiment_id) + event = ExperimentEvent( + experiment_id=experiment_id, + student_id=student_id, + arm=arm, + event_type=event_type, + timestamp=datetime.now(timezone.utc).isoformat(), + metadata=metadata or {}, + ) + self._events.append(event) + logger.debug("Experiment event: %s / %s / %s", experiment_id, student_id, event_type) + + def compute_metrics(self, experiment_id: str) -> dict[Arm, ArmMetrics]: + """Aggregate events into per-arm metrics.""" + exp = self._experiments.get(experiment_id) + if not exp: + return {} + + # Bucket events per arm + arm_events: dict[Arm, list[ExperimentEvent]] = {a: [] for a in exp.arms} + for e in self._events: + if e.experiment_id == experiment_id: + arm_events[e.arm].append(e) + + metrics = {} + for arm in exp.arms: + events = arm_events[arm] + assigned = [e for e in events if e.event_type == "assigned"] + returned = [e for e in events if e.event_type == "returned"] + nudged = [e for e in events if e.event_type == "nudged"] + + n = len(assigned) or 1 + answer_events = [e for e in nudged if e.metadata.get("answered")] + durations = [e.metadata.get("duration_seconds", 0) for e in nudged] + completion_deltas = [e.metadata.get("completion_delta", 0) for e in returned] + parent_resp = sum(1 for e in nudged if e.metadata.get("is_parent_call")) + + metrics[arm] = ArmMetrics( + arm=arm, + n=n, + returned_72h=len(returned), + return_rate=len(returned) / n, + avg_completion_delta=sum(completion_deltas) / len(completion_deltas) if completion_deltas else 0, + avg_call_duration=sum(durations) / len(durations) if durations else 0, + answer_rate=len(answer_events) / len(nudged) if nudged else 0, + parent_responses=parent_resp, + ) + + return metrics + + def get_summary_report(self, experiment_id: str) -> dict: + metrics = self.compute_metrics(experiment_id) + exp = self._experiments.get(experiment_id) + if not exp: + return {} + + report = { + "experiment_id": experiment_id, + "name": exp.name, + "generated_at": datetime.now(timezone.utc).isoformat(), + "arms": {}, + } + + best_arm = None + best_rate = -1.0 + for arm, m in metrics.items(): + report["arms"][arm] = asdict(m) + if m.return_rate > best_rate: + best_rate = m.return_rate + best_arm = arm + + report["best_performing_arm"] = best_arm + report["lift_over_control"] = ( + round( + (metrics[best_arm].return_rate - metrics[Arm.CONTROL].return_rate) + / max(metrics[Arm.CONTROL].return_rate, 0.001) * 100, + 1, + ) + if Arm.CONTROL in metrics and best_arm + else None + ) + + return report + + +# Module-level singleton +experiment_framework = ExperimentFramework() diff --git a/lms/frappe_client.py b/lms/frappe_client.py new file mode 100644 index 0000000..737ba9b --- /dev/null +++ b/lms/frappe_client.py @@ -0,0 +1,197 @@ +# lms/frappe_client.py — TAP LMS (Frappe) REST API client +# Author: Dashpreet Singh + +import httpx +import logging +from dataclasses import dataclass, field +from datetime import datetime, timezone +from typing import Optional + +from config import get_config + +logger = logging.getLogger("tap.lms") + + +@dataclass +class StudentProfile: + student_id: str + name: str + phone: str # primary contact (student) + parent_phone: Optional[str] # parent/guardian contact + language_preference: str # "hi" | "mr" | "pa" + grade: int + school_name: str + gender: str # "female" | "male" + enrolled_courses: list = field(default_factory=list) + + +@dataclass +class LearningProgress: + student_id: str + current_course: str + current_module: str + current_lesson: str + completion_pct: float # 0.0 – 100.0 + last_active_at: Optional[datetime] + days_inactive: int + total_sessions: int + avg_session_minutes: float + streak_days: int + best_performing_subject: str + struggling_subject: str + # Raw activity log for dropout risk model + weekly_session_counts: list = field(default_factory=list) # last 8 weeks + + +@dataclass +class NudgeRecord: + """Written back to Frappe after each call so teachers can see history.""" + student_id: str + call_timestamp: str + language_used: str + channel: str # "voice_call" | "whatsapp_voice" + call_duration_seconds: int + student_responded: bool + returned_to_platform: bool + nudge_topic: str # what was discussed + escalated_to_human: bool + + +class FrappeClient: + """Async client for TAP's Frappe LMS REST API. + + Auth: Frappe token-based auth (API Key + API Secret in header). + All methods return typed dataclasses — no raw dicts leak out. + """ + + def __init__(self): + cfg = get_config().frappe + self._base = cfg.base_url.rstrip("/") + self._headers = { + "Authorization": f"token {cfg.api_key}:{cfg.api_secret}", + "Content-Type": "application/json", + } + + async def get_student_profile(self, student_id: str) -> Optional[StudentProfile]: + async with httpx.AsyncClient(timeout=10) as client: + try: + r = await client.get( + f"{self._base}/api/resource/Student/{student_id}", + headers=self._headers, + ) + r.raise_for_status() + d = r.json().get("data", {}) + return StudentProfile( + student_id=student_id, + name=d.get("student_name", ""), + phone=d.get("student_mobile_number", ""), + parent_phone=d.get("guardian_mobile_number"), + language_preference=d.get("preferred_language", "hi"), + grade=int(d.get("grade", 0)), + school_name=d.get("school", ""), + gender=d.get("gender", "female").lower(), + enrolled_courses=d.get("enrolled_courses", []), + ) + except Exception as e: + logger.error("Failed to fetch student %s: %s", student_id, e) + return None + + async def get_learning_progress(self, student_id: str) -> Optional[LearningProgress]: + async with httpx.AsyncClient(timeout=10) as client: + try: + r = await client.get( + f"{self._base}/api/method/tap_lms.api.get_student_progress", + headers=self._headers, + params={"student_id": student_id}, + ) + r.raise_for_status() + d = r.json().get("message", {}) + + last_active_str = d.get("last_active_at") + last_active = None + days_inactive = 0 + if last_active_str: + last_active = datetime.fromisoformat(last_active_str) + days_inactive = ( + datetime.now(timezone.utc) - last_active.replace(tzinfo=timezone.utc) + ).days + + return LearningProgress( + student_id=student_id, + current_course=d.get("current_course", ""), + current_module=d.get("current_module", ""), + current_lesson=d.get("current_lesson", ""), + completion_pct=float(d.get("completion_pct", 0)), + last_active_at=last_active, + days_inactive=days_inactive, + total_sessions=int(d.get("total_sessions", 0)), + avg_session_minutes=float(d.get("avg_session_minutes", 0)), + streak_days=int(d.get("streak_days", 0)), + best_performing_subject=d.get("best_subject", ""), + struggling_subject=d.get("struggling_subject", ""), + weekly_session_counts=d.get("weekly_sessions", []), + ) + except Exception as e: + logger.error("Failed to fetch progress for %s: %s", student_id, e) + return None + + async def get_at_risk_students( + self, + inactivity_days: int = 3, + limit: int = 500, + ) -> list[dict]: + """Fetch students who haven't logged in for >= inactivity_days.""" + async with httpx.AsyncClient(timeout=30) as client: + try: + r = await client.get( + f"{self._base}/api/method/tap_lms.api.get_at_risk_students", + headers=self._headers, + params={"inactivity_days": inactivity_days, "limit": limit}, + ) + r.raise_for_status() + return r.json().get("message", []) + except Exception as e: + logger.error("Failed to fetch at-risk students: %s", e) + return [] + + async def write_nudge_record(self, record: NudgeRecord) -> bool: + """Write call outcome back to Frappe so teachers can track.""" + async with httpx.AsyncClient(timeout=10) as client: + try: + r = await client.post( + f"{self._base}/api/resource/Voice Nudge Log", + headers=self._headers, + json={ + "student": record.student_id, + "call_timestamp": record.call_timestamp, + "language_used": record.language_used, + "channel": record.channel, + "call_duration_seconds": record.call_duration_seconds, + "student_responded": int(record.student_responded), + "returned_to_platform": int(record.returned_to_platform), + "nudge_topic": record.nudge_topic, + "escalated_to_human": int(record.escalated_to_human), + }, + ) + r.raise_for_status() + return True + except Exception as e: + logger.error("Failed to write nudge record: %s", e) + return False + + async def update_student_engagement( + self, student_id: str, returned: bool + ) -> bool: + """Update engagement flag after platform return is detected.""" + async with httpx.AsyncClient(timeout=10) as client: + try: + r = await client.put( + f"{self._base}/api/resource/Student/{student_id}", + headers=self._headers, + json={"last_nudge_returned": returned}, + ) + r.raise_for_status() + return True + except Exception as e: + logger.error("Failed to update engagement for %s: %s", student_id, e) + return False diff --git a/main.py b/main.py new file mode 100644 index 0000000..16a4361 --- /dev/null +++ b/main.py @@ -0,0 +1,274 @@ +# main.py — FastAPI application entry point +# Author: Dashpreet Singh +# IIT Jammu, B.Tech CSE 2024-2028 + +import asyncio +import hashlib +import hmac +import json +import logging +from contextlib import asynccontextmanager +from datetime import datetime, timezone + +import uvicorn +from apscheduler.schedulers.asyncio import AsyncIOScheduler +from fastapi import FastAPI, HTTPException, Request, BackgroundTasks +from fastapi.middleware.cors import CORSMiddleware +from fastapi.responses import JSONResponse +from pydantic import BaseModel +from typing import Optional + +from config import get_config +from nudge.orchestrator import NudgeOrchestrator +from experiments.framework import experiment_framework, Arm +from agent.language_detector import detector, Language + +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s [%(name)s] %(levelname)s: %(message)s", +) +logger = logging.getLogger("tap.main") + +cfg = get_config() +orchestrator = NudgeOrchestrator() +scheduler = AsyncIOScheduler(timezone="Asia/Kolkata") + + +# ── Scheduler jobs ───────────────────────────────────────────────────────── + +async def _scheduled_batch_nudge(): + """Run every morning at 10 AM and evening at 6 PM IST.""" + logger.info("Scheduled batch nudge starting...") + result = await orchestrator.run_batch(max_concurrent=10) + logger.info("Batch nudge complete: %s", result) + + +@asynccontextmanager +async def lifespan(app: FastAPI): + # Startup + scheduler.add_job( + _scheduled_batch_nudge, + "cron", + hour="10,18", + minute=0, + id="batch_nudge", + replace_existing=True, + ) + scheduler.start() + logger.info("TAP Voice Agent started. Scheduler running.") + yield + # Shutdown + scheduler.shutdown() + logger.info("TAP Voice Agent shutting down.") + + +# ── FastAPI app ──────────────────────────────────────────────────────────── + +app = FastAPI( + title="TAP Multilingual Voice Agent", + description=( + "AI-powered multilingual voice agent for student re-engagement. " + "Built for The Apprentice Project (TAP) government school deployments. " + "Author: Dashpreet Singh " + ), + version="1.0.0", + lifespan=lifespan, +) + +app.add_middleware( + CORSMiddleware, + allow_origins=["*"], + allow_methods=["*"], + allow_headers=["*"], +) + + +# ── Request / Response models ────────────────────────────────────────────── + +class NudgeRequest(BaseModel): + student_id: str + force: bool = False # bypass rate limit (for testing) + + +class BatchNudgeRequest(BaseModel): + inactivity_days: int = 3 + max_concurrent: int = 5 + + +class ExperimentReportRequest(BaseModel): + experiment_id: str = "exp_voice_v1" + + +# ── VAPI webhook ────────────────────────────────────────────────────────── + +@app.post("/webhook/vapi") +async def vapi_webhook(request: Request, background_tasks: BackgroundTasks): + """VAPI sends call events here: call-started, transcript, call-ended.""" + body = await request.body() + + # Verify webhook signature + sig = request.headers.get("x-vapi-signature", "") + expected = hmac.new( + cfg.vapi.webhook_secret.encode(), + body, + hashlib.sha256, + ).hexdigest() + if sig and not hmac.compare_digest(sig, expected): + raise HTTPException(status_code=401, detail="Invalid webhook signature") + + payload = json.loads(body) + event_type = payload.get("message", {}).get("type", "") + call = payload.get("message", {}).get("call", {}) + metadata = call.get("metadata", {}) + student_id = metadata.get("student_id") + + logger.info("VAPI webhook: %s for student %s", event_type, student_id) + + if event_type == "end-of-call-report": + report = payload.get("message", {}) + transcript = report.get("transcript", "") + duration = report.get("durationSeconds", 0) + ended_reason = report.get("endedReason", "") + + # Detect if student committed to logging in + commitment_phrases = [ + "haan", "ho", "haan ji", "theek hai", "zaroor", "abhi", + "okay", "yes", "karenge", "karoonga", "karoongi", + ] + student_committed = any(p in transcript.lower() for p in commitment_phrases) + + # Record in experiment framework + if student_id: + background_tasks.add_task( + experiment_framework.record_event, + "exp_voice_v1", + student_id, + "nudged", + { + "answered": ended_reason != "customer-did-not-answer", + "duration_seconds": duration, + "is_parent_call": metadata.get("is_parent_call", False), + "student_committed": student_committed, + } + ) + + elif event_type == "transcript": + # Real-time: check for escalation triggers + utterance = payload.get("message", {}).get("transcript", "") + if student_id and utterance: + result = detector.detect(utterance) + logger.debug( + "Detected language in call: %s (%.2f confidence)", + result.language, result.confidence + ) + + return JSONResponse({"status": "ok"}) + + +# ── Frappe LMS webhook (student logs in after nudge) ────────────────────── + +@app.post("/webhook/lms/login") +async def lms_login_webhook(request: Request, background_tasks: BackgroundTasks): + """Frappe fires this when a student opens the TAP app. + + Used to measure return-to-learning rate (our primary metric). + """ + payload = await request.json() + student_id = payload.get("student_id") + if not student_id: + return JSONResponse({"status": "ignored"}) + + logger.info("Student logged in after nudge: %s", student_id) + + # Record return event in experiment + background_tasks.add_task( + experiment_framework.record_event, + "exp_voice_v1", + student_id, + "returned", + { + "completion_delta": payload.get("completion_delta", 0), + "timestamp": datetime.now(timezone.utc).isoformat(), + } + ) + + # Update Frappe record + background_tasks.add_task( + orchestrator._lms.update_student_engagement, + student_id, + True, + ) + + return JSONResponse({"status": "recorded"}) + + +# ── REST API endpoints ───────────────────────────────────────────────────── + +@app.post("/api/nudge/student") +async def nudge_student(req: NudgeRequest, background_tasks: BackgroundTasks): + """Trigger a nudge for a specific student.""" + background_tasks.add_task(orchestrator.nudge_student, req.student_id) + return JSONResponse({ + "status": "queued", + "student_id": req.student_id, + "message": "Nudge pipeline started in background", + }) + + +@app.post("/api/nudge/batch") +async def batch_nudge(req: BatchNudgeRequest, background_tasks: BackgroundTasks): + """Trigger batch nudge for all at-risk students.""" + background_tasks.add_task(orchestrator.run_batch, req.max_concurrent) + return JSONResponse({ + "status": "queued", + "message": f"Batch nudge started (max_concurrent={req.max_concurrent})", + }) + + +@app.get("/api/experiments/report") +async def experiment_report(experiment_id: str = "exp_voice_v1"): + """Get A/B experiment metrics report.""" + report = experiment_framework.get_summary_report(experiment_id) + if not report: + raise HTTPException(status_code=404, detail="Experiment not found") + return JSONResponse(report) + + +@app.get("/api/health") +async def health(): + return JSONResponse({ + "status": "healthy", + "service": "TAP Multilingual Voice Agent", + "author": "Dashpreet Singh ", + "version": "1.0.0", + "timestamp": datetime.now(timezone.utc).isoformat(), + "supported_languages": ["Hindi (hi)", "Marathi (mr)", "Punjabi (pa)"], + "scheduler_running": scheduler.running, + }) + + +@app.post("/api/detect-language") +async def detect_language(request: Request): + """Debug endpoint: detect language from text.""" + body = await request.json() + text = body.get("text", "") + result = detector.detect(text) + return JSONResponse({ + "text": text, + "language": result.language, + "confidence": result.confidence, + "script": result.script, + "is_code_switched": result.is_code_switched, + }) + + +# ── Entry point ──────────────────────────────────────────────────────────── + +if __name__ == "__main__": + uvicorn.run( + "main:app", + host=cfg.host, + port=cfg.port, + reload=cfg.debug, + log_level="info", + ) diff --git a/nudge/dropout_risk.py b/nudge/dropout_risk.py new file mode 100644 index 0000000..c8fe29d --- /dev/null +++ b/nudge/dropout_risk.py @@ -0,0 +1,173 @@ +# nudge/dropout_risk.py — Lightweight dropout risk scorer +# Author: Dashpreet Singh +# +# Uses a logistic regression model trained on LMS behavioral signals. +# Intentionally lightweight — runs on the server without GPU. +# Falls back to rule-based scoring if model isn't trained yet. + +import json +import logging +import math +import os +from dataclasses import dataclass +from typing import Optional + +logger = logging.getLogger("tap.nudge") + +MODEL_PATH = os.path.join(os.path.dirname(__file__), "risk_model_weights.json") + + +@dataclass +class RiskScore: + student_id: str + score: float # 0.0 (safe) → 1.0 (about to drop out) + risk_level: str # "low" | "medium" | "high" | "critical" + primary_signal: str # what drove the score + recommended_channel: str # "voice_call" | "whatsapp_voice" | "sms" + recommended_contact: str # "student" | "parent" + call_time_window: str # "morning" | "afternoon" | "evening" + + +def _sigmoid(x: float) -> float: + return 1.0 / (1.0 + math.exp(-x)) + + +def _decay_rate(weekly_counts: list[int]) -> float: + """Measure how fast engagement is dropping week-over-week. + + Returns a value 0 (no decay) to 1 (complete dropout trajectory). + Uses exponential smoothing on the delta sequence. + """ + if len(weekly_counts) < 2: + return 0.0 + deltas = [weekly_counts[i] - weekly_counts[i - 1] + for i in range(1, len(weekly_counts))] + # Exponentially weight recent weeks more + alpha = 0.4 + smoothed = deltas[0] + for d in deltas[1:]: + smoothed = alpha * d + (1 - alpha) * smoothed + # Negative smoothed delta = decaying engagement + return min(1.0, max(0.0, -smoothed / 5.0)) + + +class DropoutRiskModel: + """Logistic regression over 6 behavioral features. + + Features: + f0: days_inactive (normalised to 0-1 over 30-day window) + f1: completion_pct inverse (1 - completion / 100) + f2: engagement decay rate (week-over-week session count delta) + f3: avg_session_minutes below threshold (< 5 min = low engagement) + f4: streak_days inverse (0 streak = 1.0) + f5: total_sessions below threshold (< 10 = new + at-risk) + + Weights are pre-set from domain knowledge and updated online + as outcome data (did_return_after_nudge) accumulates. + """ + + DEFAULT_WEIGHTS = [0.8, 0.4, 1.2, 0.5, 0.6, 0.3] + DEFAULT_BIAS = -1.5 + + def __init__(self): + self.weights = self.DEFAULT_WEIGHTS[:] + self.bias = self.DEFAULT_BIAS + self._load_weights() + + def _load_weights(self): + if os.path.exists(MODEL_PATH): + try: + with open(MODEL_PATH) as f: + data = json.load(f) + self.weights = data.get("weights", self.DEFAULT_WEIGHTS) + self.bias = data.get("bias", self.DEFAULT_BIAS) + logger.info("Loaded risk model weights from %s", MODEL_PATH) + except Exception as e: + logger.warning("Could not load risk model: %s — using defaults", e) + + def save_weights(self): + with open(MODEL_PATH, "w") as f: + json.dump({"weights": self.weights, "bias": self.bias}, f) + + def _extract_features(self, progress) -> list[float]: + f0 = min(1.0, progress.days_inactive / 30.0) + f1 = 1.0 - min(1.0, progress.completion_pct / 100.0) + f2 = _decay_rate(progress.weekly_session_counts) + f3 = 1.0 if progress.avg_session_minutes < 5.0 else 0.0 + f4 = 1.0 if progress.streak_days == 0 else max(0.0, 1.0 - progress.streak_days / 14.0) + f5 = 1.0 if progress.total_sessions < 10 else 0.0 + return [f0, f1, f2, f3, f4, f5] + + def score(self, student_id: str, progress, profile) -> RiskScore: + features = self._extract_features(progress) + logit = self.bias + sum(w * f for w, f in zip(self.weights, features)) + risk = _sigmoid(logit) + + # Risk level bands + if risk >= 0.85: + level = "critical" + elif risk >= 0.65: + level = "high" + elif risk >= 0.40: + level = "medium" + else: + level = "low" + + # Primary signal — which feature drove the score most + weighted = [w * f for w, f in zip(self.weights, features)] + signals = [ + "extended inactivity", "low course completion", + "declining weekly sessions", "very short sessions", + "no active streak", "new learner with low activity" + ] + primary_signal = signals[weighted.index(max(weighted))] + + # Channel recommendation + if risk >= 0.65: + channel = "voice_call" + elif risk >= 0.40: + channel = "whatsapp_voice" + else: + channel = "sms" + + # Student vs parent routing + import datetime + hour = datetime.datetime.now().hour + contact = "parent" if (hour >= 18 or profile.grade <= 6) else "student" + + # Call time window + if 9 <= hour < 13: + time_window = "morning" + elif 13 <= hour < 17: + time_window = "afternoon" + else: + time_window = "evening" + + return RiskScore( + student_id=student_id, + score=round(risk, 4), + risk_level=level, + primary_signal=primary_signal, + recommended_channel=channel, + recommended_contact=contact, + call_time_window=time_window, + ) + + def update_online(self, features: list[float], did_return: bool, lr: float = 0.01): + """Online logistic regression update (SGD step). + + Called after each nudge outcome is known (did student return?). + This lets the model improve over time without full retraining. + """ + logit = self.bias + sum(w * f for w, f in zip(self.weights, features)) + pred = _sigmoid(logit) + label = 1.0 if not did_return else 0.0 # 1 = dropout risk confirmed + error = pred - label + for i in range(len(self.weights)): + self.weights[i] -= lr * error * features[i] + self.bias -= lr * error + self.save_weights() + + +# Module-level singleton +risk_model = DropoutRiskModel() diff --git a/nudge/orchestrator.py b/nudge/orchestrator.py new file mode 100644 index 0000000..b955963 --- /dev/null +++ b/nudge/orchestrator.py @@ -0,0 +1,263 @@ +# nudge/orchestrator.py — Master nudge orchestration engine +# Author: Dashpreet Singh +# +# This is the brain. Given a student, it decides: +# - Should we nudge? (risk score) +# - How? (channel selection) +# - Who? (student vs parent) +# - What do we say? (conversation context) +# - What happened? (outcome logging) + +import asyncio +import json +import logging +import redis.asyncio as aioredis +from datetime import datetime, timezone +from typing import Optional + +from config import get_config +from agent.language_detector import Language, detector +from agent.conversation_flow import ( + ConversationContext, NudgeType, flow_engine +) +from lms.frappe_client import FrappeClient, NudgeRecord +from nudge.dropout_risk import risk_model, RiskScore +from telephony.vapi_client import VAPIClient, CallResult +from telephony.whatsapp_client import WhatsAppClient + +logger = logging.getLogger("tap.orchestrator") + + +class NudgeOrchestrator: + """End-to-end nudge pipeline for a single student. + + Flow: + 1. Fetch student profile + learning progress from Frappe + 2. Score dropout risk + 3. Skip if risk < threshold or max calls exceeded this week + 4. Determine contact (student vs parent), channel, language + 5. Build conversation context + 6. Fire call via VAPI + 7. If no answer → WhatsApp voice note → WhatsApp text + 8. Log outcome to Frappe + Redis + 9. Online model update when return-to-platform signal arrives + """ + + def __init__(self): + self._cfg = get_config() + self._lms = FrappeClient() + self._vapi = VAPIClient() + self._wa = WhatsAppClient() + self._redis: Optional[aioredis.Redis] = None + + async def _get_redis(self) -> aioredis.Redis: + if self._redis is None: + rc = self._cfg.redis + self._redis = await aioredis.from_url( + f"redis://{rc.host}:{rc.port}/{rc.db}", + decode_responses=True, + ) + return self._redis + + # ── Rate limiting ────────────────────────────────────────────────── + + async def _calls_this_week(self, student_id: str) -> int: + r = await self._get_redis() + key = f"calls_week:{student_id}" + val = await r.get(key) + return int(val) if val else 0 + + async def _increment_call_count(self, student_id: str): + r = await self._get_redis() + key = f"calls_week:{student_id}" + await r.incr(key) + await r.expire(key, 7 * 86400) # reset weekly + + # ── Conversation history (memory across calls) ───────────────────── + + async def _get_call_history(self, student_id: str) -> list[dict]: + r = await self._get_redis() + key = f"call_history:{student_id}" + raw = await r.get(key) + return json.loads(raw) if raw else [] + + async def _save_call_outcome(self, student_id: str, outcome: dict): + r = await self._get_redis() + key = f"call_history:{student_id}" + history = await self._get_call_history(student_id) + history.append(outcome) + history = history[-10:] # keep last 10 calls only + await r.set(key, json.dumps(history), ex=self._cfg.redis.conversation_ttl) + + # ── Main nudge pipeline ──────────────────────────────────────────── + + async def nudge_student(self, student_id: str) -> dict: + """Full nudge pipeline for one student. Returns outcome dict.""" + logger.info("Starting nudge pipeline for student: %s", student_id) + + # ── 1. Fetch from LMS ───────────────────────────────────────── + profile, progress = await asyncio.gather( + self._lms.get_student_profile(student_id), + self._lms.get_learning_progress(student_id), + ) + + if not profile or not progress: + return {"student_id": student_id, "status": "skipped", "reason": "lms_fetch_failed"} + + # ── 2. Risk scoring ─────────────────────────────────────────── + risk: RiskScore = risk_model.score(student_id, progress, profile) + logger.info( + "Risk score for %s: %.3f (%s) — %s", + student_id, risk.score, risk.risk_level, risk.primary_signal + ) + + if risk.score < self._cfg.nudge.dropout_risk_score_threshold and progress.days_inactive < self._cfg.nudge.inactivity_days_threshold: + return { + "student_id": student_id, + "status": "skipped", + "reason": "below_threshold", + "risk_score": risk.score, + } + + # ── 3. Rate limiting ────────────────────────────────────────── + calls_this_week = await self._calls_this_week(student_id) + if calls_this_week >= self._cfg.nudge.max_calls_per_student_per_week: + return { + "student_id": student_id, + "status": "skipped", + "reason": "max_calls_reached", + "calls_this_week": calls_this_week, + } + + # ── 4. Determine language ───────────────────────────────────── + lang_map = {"hi": Language.HINDI, "mr": Language.MARATHI, "pa": Language.PUNJABI} + language = lang_map.get(profile.language_preference, Language.HINDI) + + # ── 5. Determine nudge type ─────────────────────────────────── + call_history = await self._get_call_history(student_id) + last_topics = [h.get("topic") for h in call_history[-3:]] + + if risk.recommended_contact == "parent": + nudge_type = NudgeType.PARENT_AWARENESS + elif progress.days_inactive >= 7: + nudge_type = NudgeType.RETURN_TO_LEARNING + elif "return" in last_topics and progress.completion_pct < 30: + nudge_type = NudgeType.LESSON_COMPLETION + elif progress.streak_days == 0 and progress.total_sessions > 5: + nudge_type = NudgeType.STREAK_RECOVERY + else: + nudge_type = NudgeType.RETURN_TO_LEARNING + + # ── 6. Build conversation context ───────────────────────────── + is_parent_call = risk.recommended_contact == "parent" + phone = profile.parent_phone if is_parent_call and profile.parent_phone else profile.phone + + ctx = ConversationContext( + student_id=student_id, + student_name=profile.name, + language=language, + nudge_type=nudge_type, + current_lesson=progress.current_lesson, + completion_pct=progress.completion_pct, + days_inactive=progress.days_inactive, + streak_days=progress.streak_days, + is_parent_call=is_parent_call, + call_attempt_number=calls_this_week + 1, + ) + + # ── 7. Execute call with channel fallback ───────────────────── + channel_used = None + call_result = None + + if risk.recommended_channel == "voice_call": + call_result = await self._vapi.initiate_call(phone, ctx) + channel_used = "voice_call" + + if call_result.status in ("no-answer", "failed"): + logger.info("Call not answered for %s — trying WhatsApp voice note", student_id) + message = self._get_voice_note_message(ctx) + success = await self._wa.send_voice_note(phone, message, language, profile.name) + channel_used = "whatsapp_voice" if success else "whatsapp_text" + if not success: + text_msg = self._wa.get_missed_call_message(profile.name, language) + await self._wa.send_text_message(phone, text_msg, language) + + elif risk.recommended_channel == "whatsapp_voice": + message = self._get_voice_note_message(ctx) + success = await self._wa.send_voice_note(phone, message, language, profile.name) + channel_used = "whatsapp_voice" if success else "whatsapp_text" + + else: + text_msg = self._wa.get_missed_call_message(profile.name, language) + await self._wa.send_text_message(phone, text_msg, language) + channel_used = "whatsapp_text" + + # ── 8. Increment call counter ───────────────────────────────── + await self._increment_call_count(student_id) + + # ── 9. Log outcome ───────────────────────────────────────────── + outcome = { + "timestamp": datetime.now(timezone.utc).isoformat(), + "channel": channel_used, + "language": language, + "topic": nudge_type, + "call_status": call_result.status if call_result else "not_called", + "risk_score": risk.score, + } + await self._save_call_outcome(student_id, outcome) + + record = NudgeRecord( + student_id=student_id, + call_timestamp=outcome["timestamp"], + language_used=language, + channel=channel_used, + call_duration_seconds=call_result.duration_seconds if call_result else 0, + student_responded=call_result.status == "answered" if call_result else False, + returned_to_platform=False, # updated by webhook when student logs in + nudge_topic=nudge_type, + escalated_to_human=False, + ) + await self._lms.write_nudge_record(record) + + return { + "student_id": student_id, + "status": "nudged", + "channel": channel_used, + "language": language, + "nudge_type": nudge_type, + "risk_score": risk.score, + "call_id": call_result.call_id if call_result else None, + } + + def _get_voice_note_message(self, ctx: ConversationContext) -> str: + return flow_engine.get_opening(ctx) + " " + flow_engine.get_closing(ctx) + + # ── Batch nudge (called by scheduler) ──────────────────────────── + + async def run_batch(self, max_concurrent: int = 5) -> dict: + """Fetch all at-risk students and nudge them concurrently.""" + at_risk = await self._lms.get_at_risk_students( + inactivity_days=self._cfg.nudge.inactivity_days_threshold + ) + logger.info("Batch nudge: %d at-risk students", len(at_risk)) + + semaphore = asyncio.Semaphore(max_concurrent) + + async def bounded_nudge(student_id: str): + async with semaphore: + return await self.nudge_student(student_id) + + tasks = [bounded_nudge(s["student_id"]) for s in at_risk] + results = await asyncio.gather(*tasks, return_exceptions=True) + + summary = {"total": len(at_risk), "nudged": 0, "skipped": 0, "errors": 0} + for r in results: + if isinstance(r, Exception): + summary["errors"] += 1 + elif r.get("status") == "nudged": + summary["nudged"] += 1 + else: + summary["skipped"] += 1 + + logger.info("Batch complete: %s", summary) + return summary diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..0103dc8 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,11 @@ +fastapi==0.111.0 +uvicorn[standard]==0.29.0 +httpx==0.27.0 +redis[asyncio]==5.0.4 +apscheduler==3.10.4 +pydantic==2.7.1 +python-dotenv==1.0.1 +scikit-learn==1.4.2 +numpy==1.26.4 +pytest==8.2.0 +pytest-asyncio==0.23.6 diff --git a/telephony/vapi_client.py b/telephony/vapi_client.py new file mode 100644 index 0000000..723a505 --- /dev/null +++ b/telephony/vapi_client.py @@ -0,0 +1,194 @@ +# telephony/vapi_client.py — VAPI voice agent orchestration +# Author: Dashpreet Singh +# +# VAPI handles: SIP trunking, WebRTC, call state, STT/TTS pipeline. +# We send it a structured assistant config and it runs the call. + +import httpx +import logging +from dataclasses import dataclass +from typing import Optional + +from config import get_config +from agent.language_detector import Language, SARVAM_VOICES +from agent.conversation_flow import ConversationContext, flow_engine + +logger = logging.getLogger("tap.telephony") + + +@dataclass +class CallResult: + call_id: str + status: str # "queued" | "ringing" | "answered" | "no-answer" | "failed" + phone_number: str + duration_seconds: int = 0 + transcript: str = "" + error: Optional[str] = None + + +class VAPIClient: + """Async client for VAPI voice agent API. + + Each outbound call creates a VAPI Assistant on the fly with: + - Sarvam AI STT (Indic-native, better accent handling than Whisper) + - LLM (GPT-4o-mini) with Didi system prompt + - Sarvam AI TTS (natural Indic voices per language) + - Our webhook for real-time transcript + outcome logging + """ + + def __init__(self): + cfg = get_config() + self._vapi_cfg = cfg.vapi + self._sarvam_cfg = cfg.sarvam + self._llm_cfg = cfg.llm + self._base = self._vapi_cfg.base_url + + def _build_assistant_config(self, ctx: ConversationContext) -> dict: + """Build VAPI assistant payload for a single call.""" + voice_id = SARVAM_VOICES.get(ctx.language, SARVAM_VOICES[Language.HINDI]) + voice_id = voice_id.get("female", "meera") + + system_prompt = flow_engine.build_system_prompt(ctx) + opening = flow_engine.get_opening(ctx) + + # Sarvam language codes for VAPI + sarvam_lang_codes = { + Language.HINDI: "hi-IN", + Language.MARATHI: "mr-IN", + Language.PUNJABI: "pa-IN", + } + lang_code = sarvam_lang_codes.get(ctx.language, "hi-IN") + + return { + "name": f"Didi-{ctx.student_id}", + "firstMessage": opening, + "model": { + "provider": "openai", + "model": self._llm_cfg.model, + "temperature": self._llm_cfg.temperature, + "maxTokens": self._llm_cfg.max_tokens, + "messages": [ + {"role": "system", "content": system_prompt} + ], + }, + "voice": { + "provider": "custom-voice", + "voiceId": voice_id, + # Sarvam TTS via custom provider endpoint + "customVoiceEndpoint": { + "url": f"{self._sarvam_cfg.base_url}/v1/text-to-speech", + "headers": { + "api-subscription-key": self._sarvam_cfg.api_key + }, + "body": { + "target_language_code": lang_code, + "speaker": voice_id, + "model": self._sarvam_cfg.tts_model, + "pace": 0.9, # slightly slower for clarity + "pitch": 0.0, + "loudness": 1.2, + } + }, + }, + "transcriber": { + "provider": "deepgram", # VAPI native; Sarvam STT via webhook + "model": "nova-2", + "language": lang_code, + "smartFormat": True, + "keywords": ["TAP", "app", "padhai", "lesson"], + }, + "endCallFunctionEnabled": True, + "endCallMessage": flow_engine.get_closing(ctx), + "maxDurationSeconds": 180, # 3 min max per call + "silenceTimeoutSeconds": 8, + "responseDelaySeconds": 0.5, + "llmRequestDelaySeconds": 0.1, + "numWordsToInterruptAssistant": 3, + "serverUrl": f"{get_config().host}:{get_config().port}/webhook/vapi", + "serverUrlSecret": self._vapi_cfg.webhook_secret, + "metadata": { + "student_id": ctx.student_id, + "nudge_type": ctx.nudge_type, + "language": ctx.language, + "is_parent_call": ctx.is_parent_call, + }, + } + + async def initiate_call( + self, + phone_number: str, + ctx: ConversationContext, + ) -> CallResult: + assistant = self._build_assistant_config(ctx) + + payload = { + "assistant": assistant, + "phoneNumberId": self._vapi_cfg.phone_number_id, + "customer": { + "number": phone_number, + "name": ctx.student_name, + }, + } + + async with httpx.AsyncClient(timeout=30) as client: + try: + r = await client.post( + f"{self._base}/call/phone", + headers={ + "Authorization": f"Bearer {self._vapi_cfg.api_key}", + "Content-Type": "application/json", + }, + json=payload, + ) + r.raise_for_status() + data = r.json() + logger.info( + "Call initiated: %s → %s (call_id=%s)", + ctx.student_id, phone_number, data.get("id") + ) + return CallResult( + call_id=data.get("id", ""), + status=data.get("status", "queued"), + phone_number=phone_number, + ) + except httpx.HTTPStatusError as e: + logger.error("VAPI call failed: %s — %s", e.response.status_code, e.response.text) + return CallResult( + call_id="", + status="failed", + phone_number=phone_number, + error=str(e), + ) + except Exception as e: + logger.error("VAPI unexpected error: %s", e) + return CallResult( + call_id="", + status="failed", + phone_number=phone_number, + error=str(e), + ) + + async def get_call_status(self, call_id: str) -> dict: + async with httpx.AsyncClient(timeout=10) as client: + try: + r = await client.get( + f"{self._base}/call/{call_id}", + headers={"Authorization": f"Bearer {self._vapi_cfg.api_key}"}, + ) + r.raise_for_status() + return r.json() + except Exception as e: + logger.error("Failed to get call status for %s: %s", call_id, e) + return {} + + async def end_call(self, call_id: str) -> bool: + async with httpx.AsyncClient(timeout=10) as client: + try: + r = await client.delete( + f"{self._base}/call/{call_id}", + headers={"Authorization": f"Bearer {self._vapi_cfg.api_key}"}, + ) + return r.status_code == 200 + except Exception as e: + logger.error("Failed to end call %s: %s", call_id, e) + return False diff --git a/telephony/whatsapp_client.py b/telephony/whatsapp_client.py new file mode 100644 index 0000000..e0948dd --- /dev/null +++ b/telephony/whatsapp_client.py @@ -0,0 +1,177 @@ +# telephony/whatsapp_client.py — WhatsApp voice note fallback +# Author: Dashpreet Singh +# +# When an outbound call is not answered: +# 1. Generate TTS audio via Sarvam AI +# 2. Send as WhatsApp voice note (much higher open rate than text) +# 3. Optionally follow up with a short text if voice note fails + +import httpx +import logging +import tempfile +import os +from typing import Optional + +from config import get_config +from agent.language_detector import Language, SARVAM_VOICES + +logger = logging.getLogger("tap.whatsapp") + + +class WhatsAppClient: + """Meta WhatsApp Business API client with Sarvam TTS voice notes.""" + + def __init__(self): + cfg = get_config() + self._wa = cfg.whatsapp + self._sarvam = cfg.sarvam + self._base = self._wa.base_url + + async def _synthesise_audio( + self, + text: str, + language: Language, + ) -> Optional[bytes]: + """Call Sarvam TTS and return raw OGG/OPUS bytes for WhatsApp.""" + voice_id = SARVAM_VOICES.get(language, SARVAM_VOICES[Language.HINDI]) + voice_id = voice_id.get("female", "meera") + + lang_codes = { + Language.HINDI: "hi-IN", + Language.MARATHI: "mr-IN", + Language.PUNJABI: "pa-IN", + } + + async with httpx.AsyncClient(timeout=30) as client: + try: + r = await client.post( + f"{self._sarvam.base_url}/v1/text-to-speech", + headers={"api-subscription-key": self._sarvam.api_key}, + json={ + "inputs": [text], + "target_language_code": lang_codes.get(language, "hi-IN"), + "speaker": voice_id, + "model": self._sarvam.tts_model, + "pace": 0.9, + "enable_preprocessing": True, + "speech_sample_rate": 8000, # telephony quality, smaller file + }, + ) + r.raise_for_status() + data = r.json() + # Sarvam returns base64-encoded audio + import base64 + audio_b64 = data.get("audios", [""])[0] + return base64.b64decode(audio_b64) if audio_b64 else None + except Exception as e: + logger.error("Sarvam TTS failed: %s", e) + return None + + async def _upload_media(self, audio_bytes: bytes, mime_type: str = "audio/ogg") -> Optional[str]: + """Upload audio to WhatsApp media server, return media_id.""" + async with httpx.AsyncClient(timeout=30) as client: + try: + r = await client.post( + f"{self._base}/{self._wa.phone_number_id}/media", + headers={"Authorization": f"Bearer {self._wa.token}"}, + files={ + "file": ("voice_note.ogg", audio_bytes, mime_type), + "messaging_product": (None, "whatsapp"), + }, + ) + r.raise_for_status() + return r.json().get("id") + except Exception as e: + logger.error("WhatsApp media upload failed: %s", e) + return None + + async def send_voice_note( + self, + phone_number: str, + text: str, + language: Language, + student_name: str, + ) -> bool: + """Synthesise text → upload → send as WhatsApp voice note.""" + logger.info("Sending WhatsApp voice note to %s", phone_number) + + audio_bytes = await self._synthesise_audio(text, language) + if not audio_bytes: + logger.warning("Audio synthesis failed, falling back to text") + return await self.send_text_message(phone_number, text, language) + + media_id = await self._upload_media(audio_bytes) + if not media_id: + return await self.send_text_message(phone_number, text, language) + + async with httpx.AsyncClient(timeout=15) as client: + try: + r = await client.post( + f"{self._base}/{self._wa.phone_number_id}/messages", + headers={ + "Authorization": f"Bearer {self._wa.token}", + "Content-Type": "application/json", + }, + json={ + "messaging_product": "whatsapp", + "to": phone_number, + "type": "audio", + "audio": {"id": media_id}, + }, + ) + r.raise_for_status() + logger.info("Voice note sent to %s", phone_number) + return True + except Exception as e: + logger.error("WhatsApp voice note send failed: %s", e) + return False + + async def send_text_message( + self, + phone_number: str, + text: str, + language: Language, + ) -> bool: + """Last-resort text fallback.""" + async with httpx.AsyncClient(timeout=10) as client: + try: + r = await client.post( + f"{self._base}/{self._wa.phone_number_id}/messages", + headers={ + "Authorization": f"Bearer {self._wa.token}", + "Content-Type": "application/json", + }, + json={ + "messaging_product": "whatsapp", + "to": phone_number, + "type": "text", + "text": {"body": text}, + }, + ) + r.raise_for_status() + return True + except Exception as e: + logger.error("WhatsApp text send failed: %s", e) + return False + + def get_missed_call_message(self, student_name: str, language: Language) -> str: + """Short text fallback message when voice note also fails.""" + first_name = student_name.split()[0] + messages = { + Language.HINDI: ( + f"Namaste {first_name}! Main TAP se Didi hoon. " + f"Aapko call kiya tha — please TAP app kholo aur padhai continue karo. " + f"Aap bahut accha kar rahe ho!" + ), + Language.MARATHI: ( + f"Namaskar {first_name}! Mi TAP madhoon Didi. " + f"Tumhala call kela hota — TAP app ugha aani shikane suru kara. " + f"Tumhi khup chan karat aahat!" + ), + Language.PUNJABI: ( + f"Sat Sri Akal {first_name}! Main TAP ton Didi haan. " + f"Tenu call kita si — TAP app kholo te padhai jaari rakho. " + f"Tusi bahut changi tarah kar rahe ho!" + ), + } + return messages.get(language, messages[Language.HINDI]) diff --git a/tests/test_suite.py b/tests/test_suite.py new file mode 100644 index 0000000..ad58eca --- /dev/null +++ b/tests/test_suite.py @@ -0,0 +1,329 @@ +# tests/test_suite.py — Comprehensive tests for TAP Voice Agent +# Author: Dashpreet Singh +# +# Run: python -m pytest tests/test_suite.py -v +# No external services needed — all network calls are mocked. + +import asyncio +import hashlib +import json +import sys +import os +import unittest +from unittest.mock import AsyncMock, MagicMock, patch + +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +from agent.language_detector import LanguageDetector, Language +from agent.conversation_flow import ( + ConversationFlowEngine, ConversationContext, NudgeType +) +from nudge.dropout_risk import DropoutRiskModel, _decay_rate +from experiments.framework import ExperimentFramework, Arm, Experiment + + +# ── Language Detection Tests ────────────────────────────────────────────── + +class TestLanguageDetector(unittest.TestCase): + + def setUp(self): + self.det = LanguageDetector() + + def test_detect_hindi_devanagari(self): + result = self.det.detect("नमस्ते, मैं ठीक हूँ") + self.assertEqual(result.language, Language.HINDI) + self.assertGreater(result.confidence, 0.6) + + def test_detect_punjabi_gurmukhi(self): + result = self.det.detect("ਸਤ ਸ੍ਰੀ ਅਕਾਲ, ਤੁਸੀਂ ਕਿਵੇਂ ਹੋ") + self.assertEqual(result.language, Language.PUNJABI) + self.assertGreater(result.confidence, 0.6) + + def test_detect_hindi_romanised(self): + result = self.det.detect("haan mujhe padhai karni hai theek hai") + self.assertEqual(result.language, Language.HINDI) + + def test_detect_marathi_romanised(self): + result = self.det.detect("ho mala shikayche ahe chan") + self.assertEqual(result.language, Language.MARATHI) + + def test_code_switching_detected(self): + result = self.det.detect("haan मुझे याद है the assignment") + self.assertTrue(result.is_code_switched) + + def test_empty_text_returns_fallback(self): + result = self.det.detect("", fallback=Language.HINDI) + self.assertEqual(result.language, Language.HINDI) + self.assertEqual(result.confidence, 0.0) + + def test_numbers_only_returns_fallback(self): + result = self.det.detect("12345", fallback=Language.MARATHI) + self.assertEqual(result.language, Language.MARATHI) + + def test_get_tts_voice_hindi(self): + voice = self.det.get_tts_voice(Language.HINDI, "female") + self.assertEqual(voice, "meera") + + def test_get_tts_voice_punjabi_male(self): + voice = self.det.get_tts_voice(Language.PUNJABI, "male") + self.assertEqual(voice, "gurpreet") + + def test_all_supported_languages_have_voices(self): + for lang in [Language.HINDI, Language.MARATHI, Language.PUNJABI]: + for gender in ["male", "female"]: + voice = self.det.get_tts_voice(lang, gender) + self.assertIsNotNone(voice) + self.assertGreater(len(voice), 0) + + +# ── Conversation Flow Tests ─────────────────────────────────────────────── + +class TestConversationFlow(unittest.TestCase): + + def setUp(self): + self.engine = ConversationFlowEngine() + self.ctx = ConversationContext( + student_id="STU001", + student_name="Priya Sharma", + language=Language.HINDI, + nudge_type=NudgeType.RETURN_TO_LEARNING, + current_lesson="Fractions Basics", + completion_pct=35.0, + days_inactive=5, + streak_days=0, + is_parent_call=False, + call_attempt_number=1, + ) + + def test_opening_contains_student_name(self): + opening = self.engine.get_opening(self.ctx) + self.assertIn("Priya", opening) + + def test_opening_hindi(self): + opening = self.engine.get_opening(self.ctx) + # Should contain Hindi greeting + self.assertIn("Namaste", opening) + + def test_opening_marathi(self): + self.ctx.language = Language.MARATHI + opening = self.engine.get_opening(self.ctx) + self.assertIn("Namaskar", opening) + + def test_opening_punjabi(self): + self.ctx.language = Language.PUNJABI + opening = self.engine.get_opening(self.ctx) + self.assertIn("Sat Sri Akal", opening) + + def test_system_prompt_contains_student_name(self): + prompt = self.engine.build_system_prompt(self.ctx) + self.assertIn("Priya Sharma", prompt) + + def test_system_prompt_contains_language(self): + prompt = self.engine.build_system_prompt(self.ctx) + self.assertIn("Hindi", prompt) + + def test_system_prompt_contains_context(self): + prompt = self.engine.build_system_prompt(self.ctx) + self.assertIn("Fractions Basics", prompt) + self.assertIn("35", prompt) + + def test_escalation_triggers(self): + self.assertTrue(self.engine.should_escalate("mujhe bahut pareshaan hoon")) + self.assertTrue(self.engine.should_escalate("school nahi ja sakta")) + self.assertFalse(self.engine.should_escalate("haan main padhai karoonga")) + + def test_should_end_call_after_max_turns(self): + self.ctx.turn_count = 8 + self.assertTrue(self.engine.should_end_call(self.ctx)) + + def test_should_end_call_after_commitment(self): + self.ctx.student_committed = True + self.assertTrue(self.engine.should_end_call(self.ctx)) + + def test_closing_positive_when_committed(self): + self.ctx.student_committed = True + closing = self.engine.get_closing(self.ctx) + self.assertIn("achha", closing.lower()) + + def test_fallback_cycles_through_options(self): + f0 = self.engine.get_fallback(self.ctx) + self.ctx.consecutive_fallbacks = 1 + f1 = self.engine.get_fallback(self.ctx) + self.assertNotEqual(f0, f1) + + def test_parent_call_nudge_type(self): + self.ctx.nudge_type = NudgeType.PARENT_AWARENESS + self.ctx.is_parent_call = True + opening = self.engine.get_opening(self.ctx) + self.assertIsNotNone(opening) + self.assertGreater(len(opening), 10) + + def test_celebration_nudge(self): + self.ctx.nudge_type = NudgeType.CELEBRATION + opening = self.engine.get_opening(self.ctx) + self.assertIn("badhai", opening.lower()) + + +# ── Dropout Risk Model Tests ────────────────────────────────────────────── + +class TestDropoutRiskModel(unittest.TestCase): + + def setUp(self): + self.model = DropoutRiskModel() + + def _make_progress(self, days_inactive=0, completion=50, sessions=20, + avg_mins=15, streak=5, weekly=None): + p = MagicMock() + p.days_inactive = days_inactive + p.completion_pct = completion + p.total_sessions = sessions + p.avg_session_minutes = avg_mins + p.streak_days = streak + p.weekly_session_counts = weekly or [3, 3, 3, 3, 3, 3, 3, 3] + return p + + def _make_profile(self, grade=8, gender="female"): + pr = MagicMock() + pr.grade = grade + pr.gender = gender + return pr + + def test_low_risk_active_student(self): + progress = self._make_progress(days_inactive=0, completion=80, streak=14) + profile = self._make_profile() + result = self.model.score("STU001", progress, profile) + self.assertLess(result.score, 0.5) + self.assertIn(result.risk_level, ["low", "medium"]) + + def test_high_risk_inactive_student(self): + progress = self._make_progress( + days_inactive=14, completion=5, sessions=3, + avg_mins=2, streak=0, weekly=[5, 4, 3, 2, 1, 0, 0, 0] + ) + profile = self._make_profile() + result = self.model.score("STU002", progress, profile) + self.assertGreater(result.score, 0.5) + self.assertIn(result.risk_level, ["high", "critical"]) + + def test_score_in_valid_range(self): + for inactive in [0, 3, 7, 14, 30]: + progress = self._make_progress(days_inactive=inactive) + profile = self._make_profile() + result = self.model.score("STU003", progress, profile) + self.assertGreaterEqual(result.score, 0.0) + self.assertLessEqual(result.score, 1.0) + + def test_young_student_routes_to_parent(self): + progress = self._make_progress(days_inactive=5) + profile = self._make_profile(grade=4) + result = self.model.score("STU004", progress, profile) + self.assertEqual(result.recommended_contact, "parent") + + def test_decay_rate_declining_sessions(self): + weekly = [8, 7, 5, 3, 2, 1, 0, 0] + rate = _decay_rate(weekly) + self.assertGreater(rate, 0.05) + + def test_decay_rate_stable_sessions(self): + weekly = [4, 4, 4, 4, 4, 4, 4, 4] + rate = _decay_rate(weekly) + self.assertLess(rate, 0.2) + + def test_online_update_does_not_crash(self): + features = [0.5, 0.5, 0.3, 0.0, 0.4, 0.0] + # Should run without exception + self.model.update_online(features, did_return=True) + self.model.update_online(features, did_return=False) + + def test_primary_signal_is_string(self): + progress = self._make_progress(days_inactive=10) + profile = self._make_profile() + result = self.model.score("STU005", progress, profile) + self.assertIsInstance(result.primary_signal, str) + self.assertGreater(len(result.primary_signal), 0) + + +# ── Experiment Framework Tests ──────────────────────────────────────────── + +class TestExperimentFramework(unittest.TestCase): + + def setUp(self): + self.fw = ExperimentFramework() + + def test_arm_assignment_deterministic(self): + arm1 = self.fw.assign_arm("STU001", "exp_voice_v1") + arm2 = self.fw.assign_arm("STU001", "exp_voice_v1") + self.assertEqual(arm1, arm2) + + def test_different_students_spread_across_arms(self): + arms_seen = set() + for i in range(100): + arm = self.fw.assign_arm(f"STU{i:04d}", "exp_voice_v1") + arms_seen.add(arm) + # Should see at least 2 different arms with 100 students + self.assertGreater(len(arms_seen), 1) + + def test_traffic_split_approximately_correct(self): + counts = {arm: 0 for arm in Arm} + n = 1000 + for i in range(n): + arm = self.fw.assign_arm(f"STU{i:04d}", "exp_voice_v1") + counts[arm] += 1 + # Voice call should get ~40% ± 5% + voice_rate = counts[Arm.VOICE_CALL] / n + self.assertAlmostEqual(voice_rate, 0.40, delta=0.07) + + def test_record_event_stored(self): + self.fw.record_event("exp_voice_v1", "STU001", "nudged", {"answered": True}) + self.assertEqual(len(self.fw._events), 1) + + def test_summary_report_structure(self): + # Record some events + self.fw.record_event("exp_voice_v1", "STU001", "assigned") + self.fw.record_event("exp_voice_v1", "STU001", "nudged", {"answered": True, "duration_seconds": 90}) + self.fw.record_event("exp_voice_v1", "STU001", "returned", {"completion_delta": 15}) + report = self.fw.get_summary_report("exp_voice_v1") + self.assertIn("experiment_id", report) + self.assertIn("arms", report) + self.assertIn("best_performing_arm", report) + + def test_invalid_traffic_split_raises(self): + exp = Experiment( + experiment_id="bad_exp", + name="Bad", + description="Bad split", + arms=[Arm.CONTROL, Arm.VOICE_CALL], + traffic_split=[0.3, 0.3], # doesn't sum to 1 + start_date="2025-01-01", + ) + with self.assertRaises(ValueError): + self.fw.register_experiment(exp) + + def test_missing_experiment_returns_empty(self): + report = self.fw.get_summary_report("nonexistent_experiment") + self.assertEqual(report, {}) + + +# ── Decay Rate Tests ────────────────────────────────────────────────────── + +class TestDecayRate(unittest.TestCase): + + def test_complete_dropout(self): + weekly = [10, 8, 5, 2, 0, 0, 0, 0] + rate = _decay_rate(weekly) + self.assertGreater(rate, 0.05) + + def test_growing_engagement(self): + weekly = [1, 2, 3, 4, 5, 6, 7, 8] + rate = _decay_rate(weekly) + self.assertEqual(rate, 0.0) # no decay when growing + + def test_single_entry_returns_zero(self): + self.assertEqual(_decay_rate([5]), 0.0) + + def test_empty_returns_zero(self): + self.assertEqual(_decay_rate([]), 0.0) + + +if __name__ == "__main__": + unittest.main(verbosity=2)