Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 48 additions & 39 deletions backend/routers/issues.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from sqlalchemy.orm import Session, defer
from sqlalchemy import func
from typing import List, Union, Dict, Any
import asyncio
import uuid
import os
import logging
Expand Down Expand Up @@ -39,6 +40,13 @@

router = APIRouter()

# Asyncio lock to serialize blockchain hash generation within a single process.
# This prevents concurrent requests from reading the same prev_hash and forking
# the chain. Note: for multi-process deployments (multiple Gunicorn workers),
# each process holds its own lock and cache; the DB query on cache miss acts as
# the cross-process source of truth.
_blockchain_lock = asyncio.Lock()

@router.post("/api/issues", response_model=IssueCreateWithDeduplicationResponse, status_code=201)
async def create_issue(
request: Request,
Expand Down Expand Up @@ -171,47 +179,48 @@ async def create_issue(
try:
# Save to DB only if no nearby issues found or deduplication failed
if deduplication_info is None or not deduplication_info.has_nearby_issues:
# Blockchain feature: calculate integrity hash for the report
# Performance Boost: Use thread-safe cache to eliminate DB query for last hash
prev_hash = blockchain_last_hash_cache.get("last_hash")
if prev_hash is None:
# Cache miss: Fetch only the last hash from DB
prev_issue = await run_in_threadpool(
lambda: db.query(Issue.integrity_hash).order_by(Issue.id.desc()).first()
# Blockchain feature: calculate integrity hash for the report.
# Use an asyncio lock to make the read→compute→write sequence atomic
# within this process, preventing concurrent requests from forking the chain.
async with _blockchain_lock:
prev_hash = blockchain_last_hash_cache.get("last_hash")
if prev_hash is None:
# Cache miss: fetch only the last hash from DB
prev_issue = await run_in_threadpool(
lambda: db.query(Issue.integrity_hash).order_by(Issue.id.desc()).first()
)
prev_hash = prev_issue[0] if prev_issue and prev_issue[0] else ""

# Simple but effective SHA-256 chaining
hash_content = f"{description}|{category}|{prev_hash}"
integrity_hash = hashlib.sha256(hash_content.encode()).hexdigest()

# RAG Retrieval (New)
relevant_rule = rag_service.retrieve(description)
initial_action_plan = None
if relevant_rule:
initial_action_plan = {"relevant_government_rule": relevant_rule}

new_issue = Issue(
reference_id=str(uuid.uuid4()),
description=description,
category=category,
image_path=image_path,
source="web",
user_email=user_email,
latitude=latitude,
longitude=longitude,
location=location,
action_plan=initial_action_plan,
integrity_hash=integrity_hash,
previous_integrity_hash=prev_hash
)
prev_hash = prev_issue[0] if prev_issue and prev_issue[0] else ""
blockchain_last_hash_cache.set(data=prev_hash, key="last_hash")

# Simple but effective SHA-256 chaining
hash_content = f"{description}|{category}|{prev_hash}"
integrity_hash = hashlib.sha256(hash_content.encode()).hexdigest()

# Update cache for next report
blockchain_last_hash_cache.set(data=integrity_hash, key="last_hash")

# RAG Retrieval (New)
relevant_rule = rag_service.retrieve(description)
initial_action_plan = None
if relevant_rule:
initial_action_plan = {"relevant_government_rule": relevant_rule}

new_issue = Issue(
reference_id=str(uuid.uuid4()),
description=description,
category=category,
image_path=image_path,
source="web",
user_email=user_email,
latitude=latitude,
longitude=longitude,
location=location,
action_plan=initial_action_plan,
integrity_hash=integrity_hash,
previous_integrity_hash=prev_hash
)

# Offload blocking DB operations to threadpool
await run_in_threadpool(save_issue_db, db, new_issue)
# Offload blocking DB operations to threadpool
await run_in_threadpool(save_issue_db, db, new_issue)

# Update cache only after a successful DB save to keep it consistent
blockchain_last_hash_cache.set(data=integrity_hash, key="last_hash")
else:
# Don't create new issue, just return deduplication info
new_issue = None
Expand Down