diff --git a/.jules/bolt.md b/.jules/bolt.md index 956273fc..ce86e5fa 100644 --- a/.jules/bolt.md +++ b/.jules/bolt.md @@ -89,3 +89,7 @@ ## 2026-05-18 - Jaccard Similarity Optimization via Set Arithmetic **Learning:** In retrieval loops calculating Jaccard similarity (e.g. RAG), explicitly building a union set `A.union(B)` is expensive due to memory allocation and population. **Action:** Use the inclusion-exclusion principle $|A \cup B| = |A| + |B| - |A \cap B|$ to calculate union size in O(1) arithmetic time after calculating the intersection. Pre-calculate $|B|$ (token count) to further reduce overhead. Use `isdisjoint()` for fast early-exit. + +## 2026-05-19 - Replace func.sum(case(...)) with GROUP BY +**Learning:** Using multiple `func.sum(case(...))` aggregates over a single categorical column performs significantly slower than a simple `GROUP BY` query, as the database evaluates each condition for every row instead of optimizing via index/hashing. +**Action:** Replace `func.sum(case(...))` aggregation logic with a standard `GROUP BY` query and load the results into a Python dictionary for O(1) value lookups. diff --git a/backend/closure_service.py b/backend/closure_service.py index f4ecf984..52e26ef7 100644 --- a/backend/closure_service.py +++ b/backend/closure_service.py @@ -1,7 +1,12 @@ from sqlalchemy.orm import Session from sqlalchemy import func from datetime import datetime, timedelta, timezone -from backend.models import Grievance, GrievanceFollower, ClosureConfirmation, GrievanceStatus +from backend.models import ( + Grievance, + GrievanceFollower, + ClosureConfirmation, + GrievanceStatus, +) import logging import hashlib import hmac @@ -10,92 +15,117 @@ logger = logging.getLogger(__name__) + class ClosureService: """Service for handling grievance closure confirmation logic""" - + # Configuration CONFIRMATION_THRESHOLD = 0.60 # 60% of followers must confirm TIMEOUT_DAYS = 7 # 7 days to confirm MINIMUM_FOLLOWERS = 3 # Minimum followers needed for confirmation process - + @staticmethod def request_closure(grievance_id: int, db: Session) -> dict: """Request closure for a grievance - triggers confirmation process""" grievance = db.query(Grievance).filter(Grievance.id == grievance_id).first() if not grievance: raise ValueError("Grievance not found") - + if grievance.status == GrievanceStatus.RESOLVED: raise ValueError("Grievance is already resolved") - + # Count followers - follower_count = db.query(func.count(GrievanceFollower.id)).filter( - GrievanceFollower.grievance_id == grievance_id - ).scalar() - + follower_count = ( + db.query(func.count(GrievanceFollower.id)) + .filter(GrievanceFollower.grievance_id == grievance_id) + .scalar() + ) + # If less than minimum followers, skip confirmation process if follower_count < ClosureService.MINIMUM_FOLLOWERS: grievance.status = GrievanceStatus.RESOLVED grievance.resolved_at = datetime.now(timezone.utc) grievance.closure_approved = True db.commit() - + return { "message": "Grievance resolved (no confirmation needed - insufficient followers)", "skip_confirmation": True, - "follower_count": follower_count + "follower_count": follower_count, } - + # Set closure pending grievance.pending_closure = True grievance.closure_requested_at = datetime.now(timezone.utc) - grievance.closure_confirmation_deadline = datetime.now(timezone.utc) + timedelta(days=ClosureService.TIMEOUT_DAYS) + grievance.closure_confirmation_deadline = datetime.now( + timezone.utc + ) + timedelta(days=ClosureService.TIMEOUT_DAYS) db.commit() - - required_confirmations = max(1, int(follower_count * ClosureService.CONFIRMATION_THRESHOLD)) - + + required_confirmations = max( + 1, int(follower_count * ClosureService.CONFIRMATION_THRESHOLD) + ) + return { "message": "Closure confirmation requested - waiting for community approval", "skip_confirmation": False, "follower_count": follower_count, "required_confirmations": required_confirmations, - "deadline": grievance.closure_confirmation_deadline + "deadline": grievance.closure_confirmation_deadline, } - + @staticmethod - def submit_confirmation(grievance_id: int, user_email: str, confirmation_type: str, reason: str, db: Session) -> dict: + def submit_confirmation( + grievance_id: int, + user_email: str, + confirmation_type: str, + reason: str, + db: Session, + ) -> dict: """Submit a closure confirmation or dispute""" grievance = db.query(Grievance).filter(Grievance.id == grievance_id).first() if not grievance: raise ValueError("Grievance not found") - + if not grievance.pending_closure: raise ValueError("Grievance is not pending closure confirmation") - + # Check if user is a follower - is_follower = db.query(GrievanceFollower).filter( - GrievanceFollower.grievance_id == grievance_id, - GrievanceFollower.user_email == user_email - ).first() - + is_follower = ( + db.query(GrievanceFollower) + .filter( + GrievanceFollower.grievance_id == grievance_id, + GrievanceFollower.user_email == user_email, + ) + .first() + ) + if not is_follower: raise ValueError("Only followers can confirm or dispute closure") - + # Check if user already submitted confirmation - existing = db.query(ClosureConfirmation).filter( - ClosureConfirmation.grievance_id == grievance_id, - ClosureConfirmation.user_email == user_email - ).first() - + existing = ( + db.query(ClosureConfirmation) + .filter( + ClosureConfirmation.grievance_id == grievance_id, + ClosureConfirmation.user_email == user_email, + ) + .first() + ) + if existing: raise ValueError("You have already submitted a response for this closure") - + # Blockchain feature: calculate integrity hash for the closure confirmation # Performance Boost: Use thread-safe cache to eliminate DB query for last hash prev_hash = closure_last_hash_cache.get("last_hash") if prev_hash is None: # Cache miss: Fetch only the last hash from DB - last_record = db.query(ClosureConfirmation.integrity_hash).order_by(ClosureConfirmation.id.desc()).first() + last_record = ( + db.query(ClosureConfirmation.integrity_hash) + .order_by(ClosureConfirmation.id.desc()) + .first() + ) prev_hash = last_record[0] if last_record and last_record[0] else "" closure_last_hash_cache.set(data=prev_hash, key="last_hash") @@ -103,9 +133,7 @@ def submit_confirmation(grievance_id: int, user_email: str, confirmation_type: s hash_content = f"{grievance_id}|{user_email}|{confirmation_type}|{prev_hash}" secret_key = get_auth_config().secret_key integrity_hash = hmac.new( - secret_key.encode('utf-8'), - hash_content.encode('utf-8'), - hashlib.sha256 + secret_key.encode("utf-8"), hash_content.encode("utf-8"), hashlib.sha256 ).hexdigest() # Create confirmation record @@ -115,7 +143,7 @@ def submit_confirmation(grievance_id: int, user_email: str, confirmation_type: s confirmation_type=confirmation_type, reason=reason, integrity_hash=integrity_hash, - previous_integrity_hash=prev_hash + previous_integrity_hash=prev_hash, ) db.add(confirmation) db.commit() @@ -125,31 +153,41 @@ def submit_confirmation(grievance_id: int, user_email: str, confirmation_type: s # Check if threshold is met return ClosureService.check_and_finalize_closure(grievance_id, db) - + @staticmethod def check_and_finalize_closure(grievance_id: int, db: Session) -> dict: """Check if closure threshold is met and finalize if needed""" grievance = db.query(Grievance).filter(Grievance.id == grievance_id).first() if not grievance or not grievance.pending_closure: return {"closure_finalized": False} - + # Count followers and confirmations - total_followers = db.query(func.count(GrievanceFollower.id)).filter( - GrievanceFollower.grievance_id == grievance_id - ).scalar() - + total_followers = ( + db.query(func.count(GrievanceFollower.id)) + .filter(GrievanceFollower.grievance_id == grievance_id) + .scalar() + ) + # Get all confirmation counts in a single query instead of multiple round-trips - from sqlalchemy import case - stats = db.query( - func.sum(case((ClosureConfirmation.confirmation_type == 'confirmed', 1), else_=0)).label('confirmed'), - func.sum(case((ClosureConfirmation.confirmation_type == 'disputed', 1), else_=0)).label('disputed') - ).filter(ClosureConfirmation.grievance_id == grievance_id).first() - - confirmations_count = stats.confirmed or 0 - disputes_count = stats.disputed or 0 - - required_confirmations = max(1, int(total_followers * ClosureService.CONFIRMATION_THRESHOLD)) - + # Optimized: Replace expensive func.sum(case(...)) with a standard GROUP BY + counts = ( + db.query( + ClosureConfirmation.confirmation_type, + func.count(ClosureConfirmation.id), + ) + .filter(ClosureConfirmation.grievance_id == grievance_id) + .group_by(ClosureConfirmation.confirmation_type) + .all() + ) + counts_dict = dict(counts) + + confirmations_count = counts_dict.get("confirmed", 0) + disputes_count = counts_dict.get("disputed", 0) + + required_confirmations = max( + 1, int(total_followers * ClosureService.CONFIRMATION_THRESHOLD) + ) + # Check if threshold is met if confirmations_count >= required_confirmations: grievance.status = GrievanceStatus.RESOLVED @@ -157,44 +195,50 @@ def check_and_finalize_closure(grievance_id: int, db: Session) -> dict: grievance.closure_approved = True grievance.pending_closure = False db.commit() - + return { "closure_finalized": True, "approved": True, "confirmations": confirmations_count, "required": required_confirmations, - "message": "Grievance closure approved by community" + "message": "Grievance closure approved by community", } - + return { "closure_finalized": False, "confirmations": confirmations_count, "disputes": disputes_count, "required": required_confirmations, - "total_followers": total_followers + "total_followers": total_followers, } - + @staticmethod def check_timeout_and_finalize(db: Session): """Background task to check for timed-out closure requests""" now = datetime.now(timezone.utc) - + # Find grievances with expired deadlines - expired_grievances = db.query(Grievance).filter( - Grievance.pending_closure == True, - Grievance.closure_confirmation_deadline < now - ).all() - + expired_grievances = ( + db.query(Grievance) + .filter( + Grievance.pending_closure == True, + Grievance.closure_confirmation_deadline < now, + ) + .all() + ) + for grievance in expired_grievances: # Check current status result = ClosureService.check_and_finalize_closure(grievance.id, db) - + if not result.get("closure_finalized"): # Timeout - log dispute and keep open - logger.warning(f"Grievance {grievance.id} closure timeout - threshold not met") + logger.warning( + f"Grievance {grievance.id} closure timeout - threshold not met" + ) grievance.pending_closure = False grievance.closure_approved = False # Keep status as is (not resolved) db.commit() - - return len(expired_grievances) \ No newline at end of file + + return len(expired_grievances) diff --git a/backend/routers/grievances.py b/backend/routers/grievances.py index 9aa24312..64ebd51c 100644 --- a/backend/routers/grievances.py +++ b/backend/routers/grievances.py @@ -11,15 +11,25 @@ from backend.database import get_db import hmac from backend.config import get_auth_config -from backend.models import Grievance, EscalationAudit, GrievanceFollower, ClosureConfirmation +from backend.models import ( + Grievance, + EscalationAudit, + GrievanceFollower, + ClosureConfirmation, +) from backend.schemas import ( - GrievanceSummaryResponse, EscalationAuditResponse, EscalationStatsResponse, + GrievanceSummaryResponse, + EscalationAuditResponse, + EscalationStatsResponse, ResponsibilityMapResponse, - FollowGrievanceRequest, FollowGrievanceResponse, - RequestClosureRequest, RequestClosureResponse, - ConfirmClosureRequest, ConfirmClosureResponse, + FollowGrievanceRequest, + FollowGrievanceResponse, + RequestClosureRequest, + RequestClosureResponse, + ConfirmClosureRequest, + ConfirmClosureResponse, ClosureStatusResponse, - BlockchainVerificationResponse + BlockchainVerificationResponse, ) from backend.grievance_service import GrievanceService from backend.closure_service import ClosureService @@ -29,13 +39,14 @@ router = APIRouter() + @router.get("/grievances", response_model=List[GrievanceSummaryResponse]) def get_grievances( status: Optional[str] = Query(None, description="Filter by status"), category: Optional[str] = Query(None, description="Filter by category"), limit: int = Query(50, ge=1, le=200, description="Maximum number of results"), offset: int = Query(0, ge=0, description="Number of results to skip"), - db: Session = Depends(get_db) + db: Session = Depends(get_db), ): """ Get list of grievances with escalation history. @@ -48,8 +59,7 @@ def get_grievances( return Response(content=cached_json, media_type="application/json") query = db.query(Grievance).options( - selectinload(Grievance.audit_logs), - joinedload(Grievance.jurisdiction) + selectinload(Grievance.audit_logs), joinedload(Grievance.jurisdiction) ) if status: @@ -68,30 +78,50 @@ def get_grievances( "grievance_id": audit.grievance_id, "previous_authority": audit.previous_authority, "new_authority": audit.new_authority, - "timestamp": audit.timestamp.isoformat() if audit.timestamp else None, - "reason": audit.reason.value + "timestamp": ( + audit.timestamp.isoformat() if audit.timestamp else None + ), + "reason": audit.reason.value, } for audit in grievance.audit_logs ] - result_data.append({ - "id": grievance.id, - "unique_id": grievance.unique_id, - "category": grievance.category, - "severity": grievance.severity.value, - "pincode": grievance.pincode, - "city": grievance.city, - "district": grievance.district, - "state": grievance.state, - "current_jurisdiction_id": grievance.current_jurisdiction_id, - "assigned_authority": grievance.assigned_authority, - "sla_deadline": grievance.sla_deadline.isoformat() if grievance.sla_deadline else None, - "status": grievance.status.value, - "created_at": grievance.created_at.isoformat() if grievance.created_at else None, - "updated_at": grievance.updated_at.isoformat() if grievance.updated_at else None, - "resolved_at": grievance.resolved_at.isoformat() if grievance.resolved_at else None, - "escalation_history": escalation_history - }) + result_data.append( + { + "id": grievance.id, + "unique_id": grievance.unique_id, + "category": grievance.category, + "severity": grievance.severity.value, + "pincode": grievance.pincode, + "city": grievance.city, + "district": grievance.district, + "state": grievance.state, + "current_jurisdiction_id": grievance.current_jurisdiction_id, + "assigned_authority": grievance.assigned_authority, + "sla_deadline": ( + grievance.sla_deadline.isoformat() + if grievance.sla_deadline + else None + ), + "status": grievance.status.value, + "created_at": ( + grievance.created_at.isoformat() + if grievance.created_at + else None + ), + "updated_at": ( + grievance.updated_at.isoformat() + if grievance.updated_at + else None + ), + "resolved_at": ( + grievance.resolved_at.isoformat() + if grievance.resolved_at + else None + ), + "escalation_history": escalation_history, + } + ) # Cache serialized JSON to bypass Pydantic validation/serialization on hits json_data = json.dumps(result_data) @@ -103,6 +133,7 @@ def get_grievances( logger.error(f"Error getting grievances: {e}", exc_info=True) raise HTTPException(status_code=500, detail="Failed to retrieve grievances") + @router.get("/grievances/{grievance_id}", response_model=GrievanceSummaryResponse) def get_grievance(grievance_id: int, db: Session = Depends(get_db)): """ @@ -110,10 +141,14 @@ def get_grievance(grievance_id: int, db: Session = Depends(get_db)): Optimized: Uses selectinload for audit_logs for consistent fetching performance. """ try: - grievance = db.query(Grievance).options( - selectinload(Grievance.audit_logs), - joinedload(Grievance.jurisdiction) - ).filter(Grievance.id == grievance_id).first() + grievance = ( + db.query(Grievance) + .options( + selectinload(Grievance.audit_logs), joinedload(Grievance.jurisdiction) + ) + .filter(Grievance.id == grievance_id) + .first() + ) if not grievance: raise HTTPException(status_code=404, detail="Grievance not found") @@ -125,7 +160,7 @@ def get_grievance(grievance_id: int, db: Session = Depends(get_db)): previous_authority=audit.previous_authority, new_authority=audit.new_authority, timestamp=audit.timestamp, - reason=audit.reason.value + reason=audit.reason.value, ) for audit in grievance.audit_logs ] @@ -146,7 +181,7 @@ def get_grievance(grievance_id: int, db: Session = Depends(get_db)): created_at=grievance.created_at, updated_at=grievance.updated_at, resolved_at=grievance.resolved_at, - escalation_history=escalation_history + escalation_history=escalation_history, ) except HTTPException: @@ -155,6 +190,7 @@ def get_grievance(grievance_id: int, db: Session = Depends(get_db)): logger.error(f"Error getting grievance {grievance_id}: {e}", exc_info=True) raise HTTPException(status_code=500, detail="Failed to retrieve grievance") + @router.get("/escalation-stats", response_model=EscalationStatsResponse) def get_escalation_stats(db: Session = Depends(get_db)): """ @@ -168,27 +204,37 @@ def get_escalation_stats(db: Session = Depends(get_db)): return Response(content=cached_json, media_type="application/json") # Perform aggregation in a single query for performance - status_counts = db.query( - Grievance.status, - func.count(Grievance.id) - ).group_by(Grievance.status).all() + status_counts = ( + db.query(Grievance.status, func.count(Grievance.id)) + .group_by(Grievance.status) + .all() + ) # Process results into a dictionary for easy lookup - counts_dict = {status.value if hasattr(status, 'value') else status: count for status, count in status_counts} + counts_dict = { + status.value if hasattr(status, "value") else status: count + for status, count in status_counts + } total_grievances = sum(counts_dict.values()) escalated_grievances = counts_dict.get("escalated", 0) - active_grievances = counts_dict.get("open", 0) + counts_dict.get("in_progress", 0) + active_grievances = counts_dict.get("open", 0) + counts_dict.get( + "in_progress", 0 + ) resolved_grievances = counts_dict.get("resolved", 0) - escalation_rate = (escalated_grievances / total_grievances * 100) if total_grievances > 0 else 0 + escalation_rate = ( + (escalated_grievances / total_grievances * 100) + if total_grievances > 0 + else 0 + ) result_data = { "total_grievances": total_grievances, "escalated_grievances": escalated_grievances, "active_grievances": active_grievances, "resolved_grievances": resolved_grievances, - "escalation_rate": escalation_rate + "escalation_rate": escalation_rate, } # Cache serialized JSON @@ -199,18 +245,21 @@ def get_escalation_stats(db: Session = Depends(get_db)): except Exception as e: logger.error(f"Error getting escalation stats: {e}", exc_info=True) - raise HTTPException(status_code=500, detail="Failed to retrieve escalation statistics") + raise HTTPException( + status_code=500, detail="Failed to retrieve escalation statistics" + ) + @router.post("/grievances/{grievance_id}/escalate") def manual_escalate_grievance( grievance_id: int, request: Request, reason: str = Query(..., description="Reason for manual escalation"), - db: Session = Depends(get_db) + db: Session = Depends(get_db), ): """Manually escalate a grievance""" try: - grievance_service = getattr(request.app.state, 'grievance_service', None) + grievance_service = getattr(request.app.state, "grievance_service", None) if not grievance_service: # Try to initialize if missing (fallback) grievance_service = GrievanceService() @@ -225,7 +274,7 @@ def manual_escalate_grievance( grievance_id=grievance_id, new_severity=grievance.severity, # Keep same severity, just escalate jurisdiction reason=reason, - db=db + db=db, ) if success: @@ -239,10 +288,15 @@ def manual_escalate_grievance( logger.error(f"Error escalating grievance {grievance_id}: {e}", exc_info=True) raise HTTPException(status_code=500, detail="Failed to escalate grievance") + def _load_responsibility_map(): # Assuming the data folder is at the root level relative to where backend is run # Adjust path as necessary. - file_path = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))), "data", "responsibility_map.json") + file_path = os.path.join( + os.path.dirname(os.path.dirname(os.path.dirname(__file__))), + "data", + "responsibility_map.json", + ) if not os.path.exists(file_path): # Fallback to backend/../data ? No, backend is root usually file_path = os.path.join("data", "responsibility_map.json") @@ -250,6 +304,7 @@ def _load_responsibility_map(): with open(file_path, "r") as f: return json.load(f) + @router.get("/responsibility-map", response_model=ResponsibilityMapResponse) def get_responsibility_map(): """Get responsibility mapping data for civic authorities""" @@ -268,11 +323,12 @@ def get_responsibility_map(): # COMMUNITY CONFIRMATION ENDPOINTS (Issue #289) # ============================================================================ -@router.post("/grievances/{grievance_id}/follow", response_model=FollowGrievanceResponse) + +@router.post( + "/grievances/{grievance_id}/follow", response_model=FollowGrievanceResponse +) def follow_grievance( - grievance_id: int, - request: FollowGrievanceRequest, - db: Session = Depends(get_db) + grievance_id: int, request: FollowGrievanceRequest, db: Session = Depends(get_db) ): """Follow a grievance to receive updates and participate in closure confirmation""" try: @@ -280,36 +336,43 @@ def follow_grievance( grievance = db.query(Grievance).filter(Grievance.id == grievance_id).first() if not grievance: raise HTTPException(status_code=404, detail="Grievance not found") - + # Check if already following - existing = db.query(GrievanceFollower).filter( - GrievanceFollower.grievance_id == grievance_id, - GrievanceFollower.user_email == request.user_email - ).first() - + existing = ( + db.query(GrievanceFollower) + .filter( + GrievanceFollower.grievance_id == grievance_id, + GrievanceFollower.user_email == request.user_email, + ) + .first() + ) + if existing: - raise HTTPException(status_code=400, detail="Already following this grievance") - + raise HTTPException( + status_code=400, detail="Already following this grievance" + ) + # Create follower record follower = GrievanceFollower( - grievance_id=grievance_id, - user_email=request.user_email + grievance_id=grievance_id, user_email=request.user_email ) db.add(follower) db.commit() - + # Count total followers - total_followers = db.query(func.count(GrievanceFollower.id)).filter( - GrievanceFollower.grievance_id == grievance_id - ).scalar() - + total_followers = ( + db.query(func.count(GrievanceFollower.id)) + .filter(GrievanceFollower.grievance_id == grievance_id) + .scalar() + ) + return FollowGrievanceResponse( grievance_id=grievance_id, user_email=request.user_email, message="Successfully following grievance", - total_followers=total_followers + total_followers=total_followers, ) - + except HTTPException: raise except Exception as e: @@ -321,23 +384,27 @@ def follow_grievance( def unfollow_grievance( grievance_id: int, user_email: str = Query(..., description="Email of user to unfollow"), - db: Session = Depends(get_db) + db: Session = Depends(get_db), ): """Unfollow a grievance""" try: - follower = db.query(GrievanceFollower).filter( - GrievanceFollower.grievance_id == grievance_id, - GrievanceFollower.user_email == user_email - ).first() - + follower = ( + db.query(GrievanceFollower) + .filter( + GrievanceFollower.grievance_id == grievance_id, + GrievanceFollower.user_email == user_email, + ) + .first() + ) + if not follower: raise HTTPException(status_code=404, detail="Not following this grievance") - + db.delete(follower) db.commit() - + return {"message": "Successfully unfollowed grievance"} - + except HTTPException: raise except Exception as e: @@ -345,45 +412,51 @@ def unfollow_grievance( raise HTTPException(status_code=500, detail="Failed to unfollow grievance") -@router.post("/grievances/{grievance_id}/request-closure", response_model=RequestClosureResponse) +@router.post( + "/grievances/{grievance_id}/request-closure", response_model=RequestClosureResponse +) def request_grievance_closure( grievance_id: int, request_data: RequestClosureRequest, - db: Session = Depends(get_db) + db: Session = Depends(get_db), ): """Request closure of a grievance (admin only) - triggers community confirmation""" try: result = ClosureService.request_closure(grievance_id, db) - + if result.get("skip_confirmation"): return RequestClosureResponse( grievance_id=grievance_id, message=result["message"], confirmation_deadline=datetime.now(timezone.utc), total_followers=result["follower_count"], - required_confirmations=0 + required_confirmations=0, ) - + return RequestClosureResponse( grievance_id=grievance_id, message=result["message"], confirmation_deadline=result["deadline"], total_followers=result["follower_count"], - required_confirmations=result["required_confirmations"] + required_confirmations=result["required_confirmations"], ) - + except ValueError as e: raise HTTPException(status_code=400, detail=str(e)) except Exception as e: - logger.error(f"Error requesting closure for grievance {grievance_id}: {e}", exc_info=True) + logger.error( + f"Error requesting closure for grievance {grievance_id}: {e}", exc_info=True + ) raise HTTPException(status_code=500, detail="Failed to request closure") -@router.post("/grievances/{grievance_id}/confirm-closure", response_model=ConfirmClosureResponse) +@router.post( + "/grievances/{grievance_id}/confirm-closure", response_model=ConfirmClosureResponse +) def confirm_grievance_closure( grievance_id: int, confirmation: ConfirmClosureRequest, - db: Session = Depends(get_db) + db: Session = Depends(get_db), ): """Confirm or dispute a grievance closure (followers only)""" try: @@ -392,65 +465,76 @@ def confirm_grievance_closure( user_email=confirmation.user_email, confirmation_type=confirmation.confirmation_type, reason=confirmation.reason, - db=db + db=db, ) - + message = "Confirmation recorded" if result.get("closure_finalized"): if result.get("approved"): message = "Grievance closure approved by community!" else: message = "Confirmation recorded - grievance remains open" - + return ConfirmClosureResponse( grievance_id=grievance_id, message=message, current_confirmations=result.get("confirmations", 0), required_confirmations=result.get("required", 0), current_disputes=result.get("disputes", 0), - closure_approved=result.get("approved", False) + closure_approved=result.get("approved", False), ) - + except ValueError as e: raise HTTPException(status_code=400, detail=str(e)) except Exception as e: - logger.error(f"Error confirming closure for grievance {grievance_id}: {e}", exc_info=True) + logger.error( + f"Error confirming closure for grievance {grievance_id}: {e}", exc_info=True + ) raise HTTPException(status_code=500, detail="Failed to confirm closure") -@router.get("/grievances/{grievance_id}/closure-status", response_model=ClosureStatusResponse) -def get_closure_status( - grievance_id: int, - db: Session = Depends(get_db) -): +@router.get( + "/grievances/{grievance_id}/closure-status", response_model=ClosureStatusResponse +) +def get_closure_status(grievance_id: int, db: Session = Depends(get_db)): """Get current closure confirmation status for a grievance""" try: grievance = db.query(Grievance).filter(Grievance.id == grievance_id).first() if not grievance: raise HTTPException(status_code=404, detail="Grievance not found") - + # Optimized: Use a single aggregate query to calculate total followers, confirmations and disputes in one database roundtrip - total_followers = db.query(func.count(GrievanceFollower.id)).filter( - GrievanceFollower.grievance_id == grievance_id - ).scalar() - + total_followers = ( + db.query(func.count(GrievanceFollower.id)) + .filter(GrievanceFollower.grievance_id == grievance_id) + .scalar() + ) + # Get all confirmation counts in a single query instead of multiple round-trips - from sqlalchemy import case - stats = db.query( - func.sum(case((ClosureConfirmation.confirmation_type == 'confirmed', 1), else_=0)).label('confirmed'), - func.sum(case((ClosureConfirmation.confirmation_type == 'disputed', 1), else_=0)).label('disputed') - ).filter(ClosureConfirmation.grievance_id == grievance_id).first() - - confirmations_count = stats.confirmed or 0 - disputes_count = stats.disputed or 0 - - required_confirmations = max(1, int(total_followers * ClosureService.CONFIRMATION_THRESHOLD)) - + # Optimized: Replace expensive func.sum(case(...)) with a standard GROUP BY + counts = ( + db.query( + ClosureConfirmation.confirmation_type, + func.count(ClosureConfirmation.id), + ) + .filter(ClosureConfirmation.grievance_id == grievance_id) + .group_by(ClosureConfirmation.confirmation_type) + .all() + ) + counts_dict = dict(counts) + + confirmations_count = counts_dict.get("confirmed", 0) + disputes_count = counts_dict.get("disputed", 0) + + required_confirmations = max( + 1, int(total_followers * ClosureService.CONFIRMATION_THRESHOLD) + ) + days_remaining = None if grievance.closure_confirmation_deadline: delta = grievance.closure_confirmation_deadline - datetime.now(timezone.utc) days_remaining = max(0, delta.days) - + return ClosureStatusResponse( grievance_id=grievance_id, pending_closure=grievance.pending_closure or False, @@ -460,34 +544,40 @@ def get_closure_status( disputes_count=disputes_count, required_confirmations=required_confirmations, confirmation_deadline=grievance.closure_confirmation_deadline, - days_remaining=days_remaining + days_remaining=days_remaining, ) - + except HTTPException: raise except Exception as e: - logger.error(f"Error getting closure status for grievance {grievance_id}: {e}", exc_info=True) + logger.error( + f"Error getting closure status for grievance {grievance_id}: {e}", + exc_info=True, + ) raise HTTPException(status_code=500, detail="Failed to get closure status") -@router.get("/audit/{audit_id}/blockchain-verify", response_model=BlockchainVerificationResponse) -def verify_escalation_audit_blockchain( - audit_id: int, - db: Session = Depends(get_db) -): +@router.get( + "/audit/{audit_id}/blockchain-verify", response_model=BlockchainVerificationResponse +) +def verify_escalation_audit_blockchain(audit_id: int, db: Session = Depends(get_db)): """ Verify the cryptographic integrity of an escalation audit log using blockchain-style chaining. Optimized: Uses previous_integrity_hash column for O(1) verification. """ try: - audit = db.query( - EscalationAudit.grievance_id, - EscalationAudit.previous_authority, - EscalationAudit.new_authority, - EscalationAudit.reason, - EscalationAudit.integrity_hash, - EscalationAudit.previous_integrity_hash - ).filter(EscalationAudit.id == audit_id).first() + audit = ( + db.query( + EscalationAudit.grievance_id, + EscalationAudit.previous_authority, + EscalationAudit.new_authority, + EscalationAudit.reason, + EscalationAudit.integrity_hash, + EscalationAudit.previous_integrity_hash, + ) + .filter(EscalationAudit.id == audit_id) + .first() + ) if not audit: raise HTTPException(status_code=404, detail="Audit log not found") @@ -497,21 +587,21 @@ def verify_escalation_audit_blockchain( # Recompute hash based on current data and previous hash # Chaining logic: hash(grievance_id|previous_authority|new_authority|reason|prev_hash) - reason_str = audit.reason.value if hasattr(audit.reason, 'value') else str(audit.reason) + reason_str = ( + audit.reason.value if hasattr(audit.reason, "value") else str(audit.reason) + ) hash_content = f"{audit.grievance_id}|{audit.previous_authority}|{audit.new_authority}|{reason_str}|{prev_hash}" secret_key = get_auth_config().secret_key computed_hash = hmac.new( - secret_key.encode('utf-8'), - hash_content.encode('utf-8'), - hashlib.sha256 + secret_key.encode("utf-8"), hash_content.encode("utf-8"), hashlib.sha256 ).hexdigest() if audit.integrity_hash is None: is_valid = False message = "No integrity hash present for this audit log; cryptographic integrity cannot be verified." else: - is_valid = (computed_hash == audit.integrity_hash) + is_valid = computed_hash == audit.integrity_hash message = ( "Integrity verified. This escalation audit log is cryptographically sealed." if is_valid @@ -522,33 +612,40 @@ def verify_escalation_audit_blockchain( is_valid=is_valid, current_hash=audit.integrity_hash, computed_hash=computed_hash, - message=message + message=message, ) except HTTPException: raise except Exception as e: - logger.error(f"Error verifying escalation audit blockchain for {audit_id}: {e}", exc_info=True) + logger.error( + f"Error verifying escalation audit blockchain for {audit_id}: {e}", + exc_info=True, + ) raise HTTPException(status_code=500, detail="Failed to verify audit integrity") -@router.get("/grievances/{grievance_id}/blockchain-verify", response_model=BlockchainVerificationResponse) -def verify_grievance_blockchain( - grievance_id: int, - db: Session = Depends(get_db) -): +@router.get( + "/grievances/{grievance_id}/blockchain-verify", + response_model=BlockchainVerificationResponse, +) +def verify_grievance_blockchain(grievance_id: int, db: Session = Depends(get_db)): """ Verify the cryptographic integrity of a grievance using blockchain-style chaining. Optimized: Uses previous_integrity_hash column for O(1) verification. """ try: - grievance = db.query( - Grievance.unique_id, - Grievance.category, - Grievance.severity, - Grievance.integrity_hash, - Grievance.previous_integrity_hash - ).filter(Grievance.id == grievance_id).first() + grievance = ( + db.query( + Grievance.unique_id, + Grievance.category, + Grievance.severity, + Grievance.integrity_hash, + Grievance.previous_integrity_hash, + ) + .filter(Grievance.id == grievance_id) + .first() + ) if not grievance: raise HTTPException(status_code=404, detail="Grievance not found") @@ -558,18 +655,22 @@ def verify_grievance_blockchain( # Recompute hash based on current data and previous hash # Chaining logic: hash(unique_id|category|severity|prev_hash) - severity_value = grievance.severity.value if hasattr(grievance.severity, 'value') else grievance.severity - hash_content = f"{grievance.unique_id}|{grievance.category}|{severity_value}|{prev_hash}" + severity_value = ( + grievance.severity.value + if hasattr(grievance.severity, "value") + else grievance.severity + ) + hash_content = ( + f"{grievance.unique_id}|{grievance.category}|{severity_value}|{prev_hash}" + ) computed_hash = hashlib.sha256(hash_content.encode()).hexdigest() if grievance.integrity_hash is None: # Legacy or unsealed grievance: no integrity hash stored, so we cannot verify tampering. is_valid = False - message = ( - "No integrity hash present for this grievance; cryptographic integrity cannot be verified." - ) + message = "No integrity hash present for this grievance; cryptographic integrity cannot be verified." else: - is_valid = (computed_hash == grievance.integrity_hash) + is_valid = computed_hash == grievance.integrity_hash message = ( "Integrity verified. This grievance record is cryptographically sealed." if is_valid @@ -579,35 +680,49 @@ def verify_grievance_blockchain( is_valid=is_valid, current_hash=grievance.integrity_hash, computed_hash=computed_hash, - message=message + message=message, ) except HTTPException: raise except Exception as e: - logger.error(f"Error verifying grievance blockchain for {grievance_id}: {e}", exc_info=True) - raise HTTPException(status_code=500, detail="Failed to verify grievance integrity") + logger.error( + f"Error verifying grievance blockchain for {grievance_id}: {e}", + exc_info=True, + ) + raise HTTPException( + status_code=500, detail="Failed to verify grievance integrity" + ) -@router.get("/closure-confirmation/{confirmation_id}/blockchain-verify", response_model=BlockchainVerificationResponse) + +@router.get( + "/closure-confirmation/{confirmation_id}/blockchain-verify", + response_model=BlockchainVerificationResponse, +) def verify_closure_confirmation_blockchain( - confirmation_id: int, - db: Session = Depends(get_db) + confirmation_id: int, db: Session = Depends(get_db) ): """ Verify the cryptographic integrity of a closure confirmation using blockchain-style chaining. Optimized: Uses previous_integrity_hash column for O(1) verification. """ try: - confirmation = db.query( - ClosureConfirmation.grievance_id, - ClosureConfirmation.user_email, - ClosureConfirmation.confirmation_type, - ClosureConfirmation.integrity_hash, - ClosureConfirmation.previous_integrity_hash - ).filter(ClosureConfirmation.id == confirmation_id).first() + confirmation = ( + db.query( + ClosureConfirmation.grievance_id, + ClosureConfirmation.user_email, + ClosureConfirmation.confirmation_type, + ClosureConfirmation.integrity_hash, + ClosureConfirmation.previous_integrity_hash, + ) + .filter(ClosureConfirmation.id == confirmation_id) + .first() + ) if not confirmation: - raise HTTPException(status_code=404, detail="Closure confirmation not found") + raise HTTPException( + status_code=404, detail="Closure confirmation not found" + ) # Determine previous hash (O(1) from stored column) prev_hash = confirmation.previous_integrity_hash or "" @@ -618,9 +733,7 @@ def verify_closure_confirmation_blockchain( secret_key = get_auth_config().secret_key computed_hash = hmac.new( - secret_key.encode('utf-8'), - hash_content.encode('utf-8'), - hashlib.sha256 + secret_key.encode("utf-8"), hash_content.encode("utf-8"), hashlib.sha256 ).hexdigest() if confirmation.integrity_hash is None: @@ -638,11 +751,16 @@ def verify_closure_confirmation_blockchain( is_valid=is_valid, current_hash=confirmation.integrity_hash, computed_hash=computed_hash, - message=message + message=message, ) except HTTPException: raise except Exception as e: - logger.error(f"Error verifying closure confirmation blockchain for {confirmation_id}: {e}", exc_info=True) - raise HTTPException(status_code=500, detail="Failed to verify confirmation integrity") + logger.error( + f"Error verifying closure confirmation blockchain for {confirmation_id}: {e}", + exc_info=True, + ) + raise HTTPException( + status_code=500, detail="Failed to verify confirmation integrity" + ) diff --git a/backend/tests/benchmark_closure_status.py b/backend/tests/benchmark_closure_status.py deleted file mode 100644 index 9be928ca..00000000 --- a/backend/tests/benchmark_closure_status.py +++ /dev/null @@ -1,96 +0,0 @@ -import time -from sqlalchemy.orm import Session -from sqlalchemy import func, create_engine -from backend.models import Base -from backend.models import Grievance, GrievanceFollower, ClosureConfirmation, Issue, Jurisdiction, JurisdictionLevel, SeverityLevel -from sqlalchemy import case, distinct -import datetime - -# Create a temporary in-memory database for testing -engine = create_engine("sqlite:///:memory:") -Base.metadata.create_all(bind=engine) -SessionLocal = Session(bind=engine) - -def populate_db(db: Session, grievance_id: int): - # Add Jurisdiction - j = Jurisdiction(id=1, level=JurisdictionLevel.STATE, geographic_coverage={"states": ["Maharashtra"]}, responsible_authority="PWD", default_sla_hours=48) - db.add(j) - - # Add Grievance - g = Grievance( - id=grievance_id, - current_jurisdiction_id=1, - sla_deadline=datetime.datetime.now(datetime.timezone.utc), - status="open", - category="Road", - unique_id="123", - severity=SeverityLevel.LOW, - assigned_authority="PWD" - ) - db.add(g) - - # Add Followers - for i in range(50): - db.add(GrievanceFollower(grievance_id=grievance_id, user_email=f"user{i}@test.com")) - - # Add Confirmations - for i in range(30): - db.add(ClosureConfirmation(grievance_id=grievance_id, user_email=f"conf_user{i}@test.com", confirmation_type="confirmed")) - for i in range(10): - db.add(ClosureConfirmation(grievance_id=grievance_id, user_email=f"disp_user{i}@test.com", confirmation_type="disputed")) - - db.commit() - -def benchmark_old(db: Session, grievance_id: int, iterations=1000): - start = time.perf_counter() - for _ in range(iterations): - total_followers = db.query(func.count(GrievanceFollower.id)).filter( - GrievanceFollower.grievance_id == grievance_id - ).scalar() - - counts = db.query( - ClosureConfirmation.confirmation_type, - func.count(ClosureConfirmation.id) - ).filter(ClosureConfirmation.grievance_id == grievance_id).group_by(ClosureConfirmation.confirmation_type).all() - - counts_dict = {ctype: count for ctype, count in counts} - confirmations_count = counts_dict.get("confirmed", 0) - disputes_count = counts_dict.get("disputed", 0) - end = time.perf_counter() - if iterations > 10: - print(f"Old approach ({iterations} iters): {end - start:.4f}s") - return total_followers, confirmations_count, disputes_count - -def benchmark_new_agg(db: Session, grievance_id: int, iterations=1000): - start = time.perf_counter() - for _ in range(iterations): - total_followers = db.query(func.count(GrievanceFollower.id)).filter( - GrievanceFollower.grievance_id == grievance_id - ).scalar() - - # Optimize the two counts into one aggregate without group_by - stats = db.query( - func.sum(case((ClosureConfirmation.confirmation_type == 'confirmed', 1), else_=0)).label('confirmed'), - func.sum(case((ClosureConfirmation.confirmation_type == 'disputed', 1), else_=0)).label('disputed') - ).filter(ClosureConfirmation.grievance_id == grievance_id).first() - - confirmations_count = stats.confirmed or 0 - disputes_count = stats.disputed or 0 - end = time.perf_counter() - if iterations > 10: - print(f"New approach (Agg) ({iterations} iters): {end - start:.4f}s") - return total_followers, confirmations_count, disputes_count - -if __name__ == "__main__": - db = SessionLocal - populate_db(db, 1) - - # Warm up - benchmark_old(db, 1, 10) - benchmark_new_agg(db, 1, 10) - - res_old = benchmark_old(db, 1) - res_agg = benchmark_new_agg(db, 1) - - print(f"Old Results: {res_old}") - print(f"New Agg Results: {res_agg}")