-
Notifications
You must be signed in to change notification settings - Fork 35
⚡ Bolt: [performance improvement] Implement serialization caching and blockchain audit logs #658
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -259,6 +259,19 @@ def index_exists(table, index_name): | |
| conn.execute(text("ALTER TABLE resolution_proof_tokens ADD COLUMN valid_until DATETIME")) | ||
| logger.info("Added valid_until column to resolution_proof_tokens") | ||
|
|
||
| # Evidence Audit Logs Table Migrations | ||
| if inspector.has_table("evidence_audit_logs"): | ||
| if not column_exists("evidence_audit_logs", "integrity_hash"): | ||
| conn.execute(text("ALTER TABLE evidence_audit_logs ADD COLUMN integrity_hash VARCHAR")) | ||
| logger.info("Added integrity_hash column to evidence_audit_logs") | ||
|
|
||
| if not column_exists("evidence_audit_logs", "previous_integrity_hash"): | ||
| conn.execute(text("ALTER TABLE evidence_audit_logs ADD COLUMN previous_integrity_hash VARCHAR")) | ||
| logger.info("Added previous_integrity_hash column to evidence_audit_logs") | ||
|
|
||
| if not index_exists("evidence_audit_logs", "ix_evidence_audit_logs_previous_integrity_hash"): | ||
| conn.execute(text("CREATE INDEX IF NOT EXISTS ix_evidence_audit_logs_previous_integrity_hash ON evidence_audit_logs (previous_integrity_hash)")) | ||
|
|
||
|
Comment on lines
+262
to
+274
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Backfill or explicitly reset the evidence-audit chain here. This migration only adds nullable columns and the index. Any existing 🤖 Prompt for AI Agents |
||
| logger.info("Database migration check completed successfully.") | ||
|
|
||
| except Exception as e: | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -25,7 +25,7 @@ | |
| EvidenceAuditLog, VerificationStatus, GrievanceStatus | ||
| ) | ||
| from backend.config import get_config | ||
| from backend.cache import resolution_last_hash_cache | ||
| from backend.cache import resolution_last_hash_cache, evidence_audit_last_hash_cache | ||
|
|
||
| logger = logging.getLogger(__name__) | ||
|
|
||
|
|
@@ -604,16 +604,37 @@ def _create_audit_log( | |
| actor_email: str, | ||
| db: Session | ||
| ) -> EvidenceAuditLog: | ||
| """Create an append-only audit log entry.""" | ||
| """ | ||
| Create an append-only audit log entry with blockchain integrity. | ||
| Optimized: Uses evidence_audit_last_hash_cache for O(1) chaining. | ||
| """ | ||
|
Comment on lines
+607
to
+610
|
||
| # Blockchain feature: calculate integrity hash for the audit log | ||
| prev_hash = evidence_audit_last_hash_cache.get("last_hash") | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. P1: Cache-based previous-hash lookup is not atomic across concurrent writes, so simultaneous inserts can fork the audit chain. Prompt for AI agents |
||
| if prev_hash is None: | ||
| # Cache miss: Fetch only the last hash from DB | ||
| last_audit = db.query(EvidenceAuditLog.integrity_hash).order_by(EvidenceAuditLog.id.desc()).first() | ||
| prev_hash = last_audit[0] if last_audit and last_audit[0] else "" | ||
| evidence_audit_last_hash_cache.set(data=prev_hash, key="last_hash") | ||
|
Comment on lines
+611
to
+617
|
||
|
|
||
| # Chaining logic: hash(evidence_id|action|actor_email|prev_hash) | ||
| hash_content = f"{evidence_id}|{action}|{actor_email}|{prev_hash}" | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. P1: Audit integrity hash excludes the Prompt for AI agents |
||
| integrity_hash = hashlib.sha256(hash_content.encode()).hexdigest() | ||
|
Comment on lines
+619
to
+621
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Seal the full audit payload, and use HMAC here. The new digest only covers 🔐 Example direction- # Chaining logic: hash(evidence_id|action|actor_email|prev_hash)
- hash_content = f"{evidence_id}|{action}|{actor_email}|{prev_hash}"
- integrity_hash = hashlib.sha256(hash_content.encode()).hexdigest()
+ timestamp = datetime.now(timezone.utc).replace(microsecond=0)
+ payload = json.dumps(
+ {
+ "evidence_id": evidence_id,
+ "action": action,
+ "details": details,
+ "actor_email": actor_email,
+ "timestamp": timestamp.isoformat(),
+ "previous_integrity_hash": prev_hash,
+ },
+ sort_keys=True,
+ separators=(",", ":"),
+ )
+ integrity_hash = ResolutionProofService._sign_payload(payload)
log = EvidenceAuditLog(
evidence_id=evidence_id,
action=action,
details=details,
actor_email=actor_email,
+ timestamp=timestamp,
integrity_hash=integrity_hash,
previous_integrity_hash=prev_hash
)Also applies to: 628-629 🤖 Prompt for AI Agents |
||
|
|
||
|
Comment on lines
+619
to
+622
|
||
| log = EvidenceAuditLog( | ||
| evidence_id=evidence_id, | ||
| action=action, | ||
| details=details, | ||
| actor_email=actor_email, | ||
| integrity_hash=integrity_hash, | ||
| previous_integrity_hash=prev_hash | ||
| ) | ||
| db.add(log) | ||
| db.commit() | ||
| db.refresh(log) | ||
|
|
||
| # Update cache after successful commit | ||
| evidence_audit_last_hash_cache.set(data=integrity_hash, key="last_hash") | ||
|
|
||
| return log | ||
|
|
||
| @staticmethod | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -4,12 +4,13 @@ | |
| Issue #288: Field Officer Check-In System With Location Verification | ||
| """ | ||
|
|
||
| from fastapi import APIRouter, Depends, HTTPException, UploadFile, File, Form | ||
| from fastapi import APIRouter, Depends, HTTPException, UploadFile, File, Form, Response | ||
| from sqlalchemy.orm import Session | ||
| from sqlalchemy import func, case | ||
| from typing import List, Optional | ||
| import logging | ||
| import os | ||
| import json | ||
| from datetime import datetime, timezone | ||
|
|
||
| from backend.database import get_db | ||
|
|
@@ -31,7 +32,7 @@ | |
| calculate_visit_metrics, | ||
| get_geofencing_service | ||
| ) | ||
| from backend.cache import visit_last_hash_cache | ||
| from backend.cache import visit_last_hash_cache, visit_stats_cache | ||
| from backend.schemas import BlockchainVerificationResponse | ||
|
|
||
| logger = logging.getLogger(__name__) | ||
|
|
@@ -424,8 +425,14 @@ def get_visit_statistics(db: Session = Depends(get_db)): | |
| Get aggregate statistics for all field officer visits using optimized SQL queries | ||
|
|
||
| Returns metrics like total visits, verification status, geo-fence compliance, etc. | ||
| Optimized: Uses serialization caching and a single aggregate SQL query. | ||
| """ | ||
| try: | ||
| # Check cache | ||
| cached_json = visit_stats_cache.get("default") | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. P2: Prompt for AI agents |
||
| if cached_json: | ||
| return Response(content=cached_json, media_type="application/json") | ||
|
Comment on lines
+431
to
+434
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Invalidate This cache is populated here, but the write paths in this router ( 🧹 Example invalidation points@@ def officer_check_in(...):
db.commit()
db.refresh(new_visit)
+ visit_stats_cache.invalidate("default")
@@ def officer_check_out(...):
db.commit()
db.refresh(visit)
+ visit_stats_cache.invalidate("default")
@@ def verify_visit(...):
db.commit()
+ visit_stats_cache.invalidate("default")Also applies to: 468-472 🤖 Prompt for AI Agents
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. P2: Returning a raw Prompt for AI agents |
||
|
|
||
|
Comment on lines
+431
to
+435
|
||
| # Optimized: Use a single aggregate query to fetch multiple statistics in one database roundtrip | ||
| stats = db.query( | ||
| func.count(FieldOfficerVisit.id).label('total'), | ||
|
|
@@ -449,14 +456,20 @@ def get_visit_statistics(db: Session = Depends(get_db)): | |
| else: | ||
| average_distance = 0.0 | ||
|
|
||
| return VisitStatsResponse( | ||
| total_visits=total_visits, | ||
| verified_visits=verified_visits, | ||
| within_geofence_count=within_geofence_count, | ||
| outside_geofence_count=outside_geofence_count, | ||
| unique_officers=unique_officers, | ||
| average_distance_from_site=average_distance | ||
| ) | ||
| stats_data = { | ||
| "total_visits": total_visits, | ||
| "verified_visits": verified_visits, | ||
| "within_geofence_count": within_geofence_count, | ||
| "outside_geofence_count": outside_geofence_count, | ||
| "unique_officers": unique_officers, | ||
| "average_distance_from_site": average_distance | ||
| } | ||
|
|
||
| # Cache serialized JSON to bypass Pydantic overhead on hits | ||
| json_data = json.dumps(stats_data) | ||
| visit_stats_cache.set(json_data, "default") | ||
|
|
||
| return Response(content=json_data, media_type="application/json") | ||
|
|
||
| except Exception as e: | ||
| logger.error(f"Error calculating visit statistics: {e}", exc_info=True) | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -21,6 +21,8 @@ | |
| ClosureStatusResponse, | ||
| BlockchainVerificationResponse | ||
| ) | ||
| from backend.cache import grievances_list_cache, grievance_stats_cache | ||
| from fastapi import Response | ||
| from backend.grievance_service import GrievanceService | ||
| from backend.closure_service import ClosureService | ||
|
|
||
|
|
@@ -38,9 +40,15 @@ def get_grievances( | |
| ): | ||
| """ | ||
| Get list of grievances with escalation history. | ||
| Optimized: Uses selectinload for audit_logs to avoid Cartesian product and improve O(N) fetching. | ||
| Optimized: Uses serialization caching and selectinload for audit_logs. | ||
| """ | ||
| try: | ||
| # Check cache | ||
| cache_key = f"grievances_{status}_{category}_{limit}_{offset}" | ||
| cached_json = grievances_list_cache.get(cache_key) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. P2: The new grievance list/stats caches are never invalidated after grievance mutations, so clients can receive stale escalation/list data until TTL expires. Prompt for AI agents |
||
| if cached_json: | ||
| return Response(content=cached_json, media_type="application/json") | ||
|
Comment on lines
+46
to
+50
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Invalidate grievance caches after grievance state changes. These caches are filled here, but I don't see matching invalidation when grievance state changes. Also applies to: 98-102, 167-170, 196-200 🤖 Prompt for AI Agents |
||
|
|
||
|
Comment on lines
+47
to
+51
|
||
| query = db.query(Grievance).options( | ||
| selectinload(Grievance.audit_logs), | ||
| joinedload(Grievance.jurisdiction) | ||
|
|
@@ -53,41 +61,45 @@ def get_grievances( | |
|
|
||
| grievances = query.offset(offset).limit(limit).all() | ||
|
|
||
| # Convert to response format | ||
| # Convert to response format (dictionaries for faster JSON serialization) | ||
| result = [] | ||
| for grievance in grievances: | ||
| escalation_history = [ | ||
| EscalationAuditResponse( | ||
| id=audit.id, | ||
| grievance_id=audit.grievance_id, | ||
| previous_authority=audit.previous_authority, | ||
| new_authority=audit.new_authority, | ||
| timestamp=audit.timestamp, | ||
| reason=audit.reason.value | ||
| ) | ||
| { | ||
| "id": audit.id, | ||
| "grievance_id": audit.grievance_id, | ||
| "previous_authority": audit.previous_authority, | ||
| "new_authority": audit.new_authority, | ||
| "timestamp": audit.timestamp.isoformat() if audit.timestamp else None, | ||
| "reason": audit.reason.value | ||
| } | ||
| for audit in grievance.audit_logs | ||
| ] | ||
|
|
||
| result.append(GrievanceSummaryResponse( | ||
| id=grievance.id, | ||
| unique_id=grievance.unique_id, | ||
| category=grievance.category, | ||
| severity=grievance.severity.value, | ||
| pincode=grievance.pincode, | ||
| city=grievance.city, | ||
| district=grievance.district, | ||
| state=grievance.state, | ||
| current_jurisdiction_id=grievance.current_jurisdiction_id, | ||
| assigned_authority=grievance.assigned_authority, | ||
| sla_deadline=grievance.sla_deadline, | ||
| status=grievance.status.value, | ||
| created_at=grievance.created_at, | ||
| updated_at=grievance.updated_at, | ||
| resolved_at=grievance.resolved_at, | ||
| escalation_history=escalation_history | ||
| )) | ||
|
|
||
| return result | ||
| result.append({ | ||
| "id": grievance.id, | ||
| "unique_id": grievance.unique_id, | ||
| "category": grievance.category, | ||
| "severity": grievance.severity.value, | ||
| "pincode": grievance.pincode, | ||
| "city": grievance.city, | ||
| "district": grievance.district, | ||
| "state": grievance.state, | ||
| "current_jurisdiction_id": grievance.current_jurisdiction_id, | ||
| "assigned_authority": grievance.assigned_authority, | ||
| "sla_deadline": grievance.sla_deadline.isoformat() if grievance.sla_deadline else None, | ||
| "status": grievance.status.value, | ||
| "created_at": grievance.created_at.isoformat() if grievance.created_at else None, | ||
| "updated_at": grievance.updated_at.isoformat() if grievance.updated_at else None, | ||
| "resolved_at": grievance.resolved_at.isoformat() if grievance.resolved_at else None, | ||
| "escalation_history": escalation_history | ||
| }) | ||
|
|
||
| # Cache serialized JSON to bypass Pydantic overhead on hits | ||
| json_data = json.dumps(result) | ||
| grievances_list_cache.set(json_data, cache_key) | ||
|
|
||
| return Response(content=json_data, media_type="application/json") | ||
|
|
||
| except Exception as e: | ||
| logger.error(f"Error getting grievances: {e}", exc_info=True) | ||
|
|
@@ -149,9 +161,14 @@ def get_grievance(grievance_id: int, db: Session = Depends(get_db)): | |
| def get_escalation_stats(db: Session = Depends(get_db)): | ||
| """ | ||
| Get escalation statistics. | ||
| Optimized: Uses a single GROUP BY query instead of 4 separate count queries. | ||
| Optimized: Uses serialization caching and a single GROUP BY query. | ||
| """ | ||
| try: | ||
| # Check cache | ||
| cached_json = grievance_stats_cache.get("default") | ||
| if cached_json: | ||
| return Response(content=cached_json, media_type="application/json") | ||
|
|
||
|
Comment on lines
+167
to
+171
|
||
| # Perform aggregation in a single query for performance | ||
| status_counts = db.query( | ||
| Grievance.status, | ||
|
|
@@ -168,13 +185,19 @@ def get_escalation_stats(db: Session = Depends(get_db)): | |
|
|
||
| escalation_rate = (escalated_grievances / total_grievances * 100) if total_grievances > 0 else 0 | ||
|
|
||
| return EscalationStatsResponse( | ||
| total_grievances=total_grievances, | ||
| escalated_grievances=escalated_grievances, | ||
| active_grievances=active_grievances, | ||
| resolved_grievances=resolved_grievances, | ||
| escalation_rate=escalation_rate | ||
| ) | ||
| stats_data = { | ||
| "total_grievances": total_grievances, | ||
| "escalated_grievances": escalated_grievances, | ||
| "active_grievances": active_grievances, | ||
| "resolved_grievances": resolved_grievances, | ||
| "escalation_rate": escalation_rate | ||
| } | ||
|
|
||
| # Cache serialized JSON to bypass Pydantic overhead on hits | ||
| json_data = json.dumps(stats_data) | ||
| grievance_stats_cache.set(json_data, "default") | ||
|
|
||
| return Response(content=json_data, media_type="application/json") | ||
|
|
||
| except Exception as e: | ||
| logger.error(f"Error getting escalation stats: {e}", exc_info=True) | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The migration adds integrity columns but doesn’t backfill
integrity_hash/previous_integrity_hashfor existingevidence_audit_logsrows. If integrity verification is expected to cover historical logs, consider a backfill step (iterate in id/timestamp order and compute the chain), or explicitly document that chaining starts only from the first post-migration entry.