-
Notifications
You must be signed in to change notification settings - Fork 35
⚡ Bolt: Serialization caching and blockchain integrity for voice issues #679
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -95,10 +95,10 @@ async def lifespan(app: FastAPI): | |||||
| # Startup: Initialize Grievance Service (needed for escalation engine) | ||||||
| try: | ||||||
| logger.info("Initializing grievance service...") | ||||||
| # Temporarily disabled for local dev | ||||||
| # grievance_service = GrievanceService() | ||||||
| # app.state.grievance_service = grievance_service | ||||||
| logger.info("Grievance service initialization skipped for local dev.") | ||||||
| # Re-enabled to avoid redundant re-initialization on every manual escalation | ||||||
| grievance_service = GrievanceService() | ||||||
|
||||||
| grievance_service = GrievanceService() | |
| grievance_service = await run_in_threadpool(GrievanceService) |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,4 +1,4 @@ | ||
| from fastapi import APIRouter, Depends, HTTPException, Query, Request | ||
| from fastapi import APIRouter, Depends, HTTPException, Query, Request, Response | ||
| from sqlalchemy.orm import Session, joinedload, selectinload | ||
| from sqlalchemy import func, case | ||
| from typing import List, Optional | ||
|
|
@@ -23,6 +23,7 @@ | |
| ) | ||
| from backend.grievance_service import GrievanceService | ||
| from backend.closure_service import ClosureService | ||
| from backend.cache import grievance_list_cache, escalation_stats_cache | ||
|
|
||
| logger = logging.getLogger(__name__) | ||
|
|
||
|
|
@@ -38,9 +39,15 @@ def get_grievances( | |
| ): | ||
| """ | ||
| Get list of grievances with escalation history. | ||
| Optimized: Uses selectinload for audit_logs to avoid Cartesian product and improve O(N) fetching. | ||
| Optimized: Uses selectinload for audit_logs and serialization caching. | ||
| """ | ||
| try: | ||
| # Check cache first | ||
| cache_key = f"grievances_{status}_{category}_{limit}_{offset}" | ||
| cached_json = grievance_list_cache.get(cache_key) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. P1: Caching was added for grievances/stats, but manual escalation does not invalidate those caches, so clients can receive stale data after escalation. Prompt for AI agents |
||
| if cached_json: | ||
| return Response(content=cached_json, media_type="application/json") | ||
|
|
||
|
Comment on lines
+45
to
+50
|
||
| query = db.query(Grievance).options( | ||
| selectinload(Grievance.audit_logs), | ||
| joinedload(Grievance.jurisdiction) | ||
|
|
@@ -57,37 +64,41 @@ def get_grievances( | |
| result = [] | ||
| for grievance in grievances: | ||
| escalation_history = [ | ||
| EscalationAuditResponse( | ||
| id=audit.id, | ||
| grievance_id=audit.grievance_id, | ||
| previous_authority=audit.previous_authority, | ||
| new_authority=audit.new_authority, | ||
| timestamp=audit.timestamp, | ||
| reason=audit.reason.value | ||
| ) | ||
| { | ||
| "id": audit.id, | ||
| "grievance_id": audit.grievance_id, | ||
| "previous_authority": audit.previous_authority, | ||
| "new_authority": audit.new_authority, | ||
| "timestamp": audit.timestamp.isoformat() if audit.timestamp else None, | ||
| "reason": audit.reason.value | ||
| } | ||
| for audit in grievance.audit_logs | ||
| ] | ||
|
|
||
| result.append(GrievanceSummaryResponse( | ||
| id=grievance.id, | ||
| unique_id=grievance.unique_id, | ||
| category=grievance.category, | ||
| severity=grievance.severity.value, | ||
| pincode=grievance.pincode, | ||
| city=grievance.city, | ||
| district=grievance.district, | ||
| state=grievance.state, | ||
| current_jurisdiction_id=grievance.current_jurisdiction_id, | ||
| assigned_authority=grievance.assigned_authority, | ||
| sla_deadline=grievance.sla_deadline, | ||
| status=grievance.status.value, | ||
| created_at=grievance.created_at, | ||
| updated_at=grievance.updated_at, | ||
| resolved_at=grievance.resolved_at, | ||
| escalation_history=escalation_history | ||
| )) | ||
|
|
||
| return result | ||
| result.append({ | ||
| "id": grievance.id, | ||
| "unique_id": grievance.unique_id, | ||
| "category": grievance.category, | ||
| "severity": grievance.severity.value, | ||
| "pincode": grievance.pincode, | ||
| "city": grievance.city, | ||
| "district": grievance.district, | ||
| "state": grievance.state, | ||
| "current_jurisdiction_id": grievance.current_jurisdiction_id, | ||
| "assigned_authority": grievance.assigned_authority, | ||
| "sla_deadline": grievance.sla_deadline.isoformat() if grievance.sla_deadline else None, | ||
| "status": grievance.status.value, | ||
| "created_at": grievance.created_at.isoformat() if grievance.created_at else None, | ||
| "updated_at": grievance.updated_at.isoformat() if grievance.updated_at else None, | ||
| "resolved_at": grievance.resolved_at.isoformat() if grievance.resolved_at else None, | ||
| "escalation_history": escalation_history | ||
| }) | ||
|
|
||
| # Cache pre-serialized JSON to bypass Pydantic overhead | ||
| json_data = json.dumps(result) | ||
| grievance_list_cache.set(json_data, cache_key) | ||
|
|
||
|
Comment on lines
+97
to
+100
|
||
| return Response(content=json_data, media_type="application/json") | ||
|
|
||
| except Exception as e: | ||
| logger.error(f"Error getting grievances: {e}", exc_info=True) | ||
|
|
@@ -149,32 +160,43 @@ def get_grievance(grievance_id: int, db: Session = Depends(get_db)): | |
| def get_escalation_stats(db: Session = Depends(get_db)): | ||
| """ | ||
| Get escalation statistics. | ||
| Optimized: Uses a single GROUP BY query instead of 4 separate count queries. | ||
| Optimized: Uses a single multi-metric aggregate query and serialization caching. | ||
| """ | ||
| try: | ||
| # Perform aggregation in a single query for performance | ||
| status_counts = db.query( | ||
| Grievance.status, | ||
| func.count(Grievance.id) | ||
| ).group_by(Grievance.status).all() | ||
| # Check cache first | ||
| cached_json = escalation_stats_cache.get("global_stats") | ||
| if cached_json: | ||
| return Response(content=cached_json, media_type="application/json") | ||
|
|
||
|
Comment on lines
+166
to
170
|
||
| # Process results into a dictionary for easy lookup | ||
| counts_dict = {status.value if hasattr(status, 'value') else status: count for status, count in status_counts} | ||
| # Perform aggregation in a single query for maximum performance | ||
| # Using func.sum(case(...)) is faster than group_by for a fixed set of statuses | ||
| stats = db.query( | ||
| func.count(Grievance.id).label('total'), | ||
| func.sum(case((Grievance.status == 'escalated', 1), else_=0)).label('escalated'), | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. P1: The new status aggregates compare enum values using lowercase strings, which can produce incorrect escalation stats for Enum-backed Prompt for AI agents |
||
| func.sum(case((Grievance.status == 'open', 1), (Grievance.status == 'in_progress', 1), else_=0)).label('active'), | ||
| func.sum(case((Grievance.status == 'resolved', 1), else_=0)).label('resolved') | ||
| ).first() | ||
|
Comment on lines
+171
to
+178
|
||
|
|
||
| total_grievances = sum(counts_dict.values()) | ||
| escalated_grievances = counts_dict.get("escalated", 0) | ||
| active_grievances = counts_dict.get("open", 0) + counts_dict.get("in_progress", 0) | ||
| resolved_grievances = counts_dict.get("resolved", 0) | ||
| total_grievances = stats.total or 0 | ||
| escalated_grievances = int(stats.escalated or 0) | ||
| active_grievances = int(stats.active or 0) | ||
| resolved_grievances = int(stats.resolved or 0) | ||
|
|
||
| escalation_rate = (escalated_grievances / total_grievances * 100) if total_grievances > 0 else 0 | ||
|
|
||
| return EscalationStatsResponse( | ||
| total_grievances=total_grievances, | ||
| escalated_grievances=escalated_grievances, | ||
| active_grievances=active_grievances, | ||
| resolved_grievances=resolved_grievances, | ||
| escalation_rate=escalation_rate | ||
| ) | ||
| data = { | ||
| "total_grievances": total_grievances, | ||
| "escalated_grievances": escalated_grievances, | ||
| "active_grievances": active_grievances, | ||
| "resolved_grievances": resolved_grievances, | ||
| "escalation_rate": escalation_rate | ||
| } | ||
|
|
||
| # Cache pre-serialized JSON to bypass Pydantic overhead | ||
| json_data = json.dumps(data) | ||
| escalation_stats_cache.set(json_data, "global_stats") | ||
|
|
||
| return Response(content=json_data, media_type="application/json") | ||
|
|
||
| except Exception as e: | ||
| logger.error(f"Error getting escalation stats: {e}", exc_info=True) | ||
|
|
@@ -276,6 +298,9 @@ def follow_grievance( | |
| ) | ||
| db.add(follower) | ||
| db.commit() | ||
|
|
||
| # Invalidate caches | ||
| grievance_list_cache.clear() | ||
|
|
||
| # Count total followers | ||
| total_followers = db.query(func.count(GrievanceFollower.id)).filter( | ||
|
|
@@ -314,6 +339,9 @@ def unfollow_grievance( | |
|
|
||
| db.delete(follower) | ||
| db.commit() | ||
|
|
||
| # Invalidate caches | ||
| grievance_list_cache.clear() | ||
|
|
||
| return {"message": "Successfully unfollowed grievance"} | ||
|
|
||
|
|
@@ -335,6 +363,9 @@ def request_grievance_closure( | |
| result = ClosureService.request_closure(grievance_id, db) | ||
|
|
||
| if result.get("skip_confirmation"): | ||
| # Invalidate caches as status might have changed to resolved | ||
| grievance_list_cache.clear() | ||
| escalation_stats_cache.clear() | ||
| return RequestClosureResponse( | ||
| grievance_id=grievance_id, | ||
| message=result["message"], | ||
|
|
@@ -343,6 +374,10 @@ def request_grievance_closure( | |
| required_confirmations=0 | ||
| ) | ||
|
|
||
| # Invalidate caches | ||
| grievance_list_cache.clear() | ||
| escalation_stats_cache.clear() | ||
|
|
||
| return RequestClosureResponse( | ||
| grievance_id=grievance_id, | ||
| message=result["message"], | ||
|
|
@@ -366,6 +401,10 @@ def confirm_grievance_closure( | |
| ): | ||
| """Confirm or dispute a grievance closure (followers only)""" | ||
| try: | ||
| # Invalidate caches as status or approval might change | ||
| grievance_list_cache.clear() | ||
| escalation_stats_cache.clear() | ||
|
|
||
| result = ClosureService.submit_confirmation( | ||
| grievance_id=grievance_id, | ||
| user_email=confirmation.user_email, | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Missing cache invalidation on status updates and escalations.
Invalidation on
create_grievanceis good, butupdate_grievance_status(Line 232) and the escalation paths (escalate_grievance_severity,manual_escalate,run_escalation_check) also changeGrievance.status, which is the primary dimension bothgrievance_list_cache(filterable by status) andescalation_stats_cache(counts by status) are keyed on.Without matching invalidation, UI will show stale list contents and stale escalation counters for up to the 300s TTL after every status transition — which is exactly the kind of flicker that erodes trust in "live" escalation dashboards.
🛠️ Proposed fix
Apply the same pair of
clear()calls after successful commits insideescalation_engine.evaluate_and_escalate_grievances/manual_escalate/escalate_grievance_severity.🤖 Prompt for AI Agents