diff --git a/backend/routers/issues.py b/backend/routers/issues.py index de27e1a9..ce0ddc17 100644 --- a/backend/routers/issues.py +++ b/backend/routers/issues.py @@ -5,6 +5,7 @@ from sqlalchemy.orm import Session, defer from sqlalchemy import func from typing import List, Union, Dict, Any +import asyncio import uuid import os import logging @@ -39,6 +40,13 @@ router = APIRouter() +# Asyncio lock to serialize blockchain hash generation within a single process. +# This prevents concurrent requests from reading the same prev_hash and forking +# the chain. NOTE: in multi-process deployments (multiple Gunicorn workers) +# each process holds its own lock and cache, and the DB is read only on a cache +# miss, so this lock alone does not prevent forks across processes. +_blockchain_lock = asyncio.Lock() + @router.post("/api/issues", response_model=IssueCreateWithDeduplicationResponse, status_code=201) async def create_issue( request: Request, @@ -171,47 +179,48 @@ async def create_issue( try: # Save to DB only if no nearby issues found or deduplication failed if deduplication_info is None or not deduplication_info.has_nearby_issues: - # Blockchain feature: calculate integrity hash for the report - # Performance Boost: Use thread-safe cache to eliminate DB query for last hash - prev_hash = blockchain_last_hash_cache.get("last_hash") - if prev_hash is None: - # Cache miss: Fetch only the last hash from DB - prev_issue = await run_in_threadpool( - lambda: db.query(Issue.integrity_hash).order_by(Issue.id.desc()).first() + # Blockchain feature: calculate integrity hash for the report. + # Use an asyncio lock to make the read→compute→write sequence atomic + # within this process, preventing concurrent requests from forking the chain. 
+ async with _blockchain_lock: + prev_hash = blockchain_last_hash_cache.get("last_hash") + if prev_hash is None: + # Cache miss: fetch only the last hash from DB + prev_issue = await run_in_threadpool( + lambda: db.query(Issue.integrity_hash).order_by(Issue.id.desc()).first() + ) + prev_hash = prev_issue[0] if prev_issue and prev_issue[0] else "" + + # Simple but effective SHA-256 chaining + hash_content = f"{description}|{category}|{prev_hash}" + integrity_hash = hashlib.sha256(hash_content.encode()).hexdigest() + + # RAG Retrieval (New) + relevant_rule = rag_service.retrieve(description) + initial_action_plan = None + if relevant_rule: + initial_action_plan = {"relevant_government_rule": relevant_rule} + + new_issue = Issue( + reference_id=str(uuid.uuid4()), + description=description, + category=category, + image_path=image_path, + source="web", + user_email=user_email, + latitude=latitude, + longitude=longitude, + location=location, + action_plan=initial_action_plan, + integrity_hash=integrity_hash, + previous_integrity_hash=prev_hash ) - prev_hash = prev_issue[0] if prev_issue and prev_issue[0] else "" - blockchain_last_hash_cache.set(data=prev_hash, key="last_hash") - - # Simple but effective SHA-256 chaining - hash_content = f"{description}|{category}|{prev_hash}" - integrity_hash = hashlib.sha256(hash_content.encode()).hexdigest() - - # Update cache for next report - blockchain_last_hash_cache.set(data=integrity_hash, key="last_hash") - - # RAG Retrieval (New) - relevant_rule = rag_service.retrieve(description) - initial_action_plan = None - if relevant_rule: - initial_action_plan = {"relevant_government_rule": relevant_rule} - - new_issue = Issue( - reference_id=str(uuid.uuid4()), - description=description, - category=category, - image_path=image_path, - source="web", - user_email=user_email, - latitude=latitude, - longitude=longitude, - location=location, - action_plan=initial_action_plan, - integrity_hash=integrity_hash, - 
previous_integrity_hash=prev_hash - ) - # Offload blocking DB operations to threadpool - await run_in_threadpool(save_issue_db, db, new_issue) + # Offload blocking DB operations to threadpool + await run_in_threadpool(save_issue_db, db, new_issue) + + # Update cache only after a successful DB save to keep it consistent + blockchain_last_hash_cache.set(data=integrity_hash, key="last_hash") else: # Don't create new issue, just return deduplication info new_issue = None