Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions backend/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,5 +180,6 @@ def invalidate(self):
blockchain_last_hash_cache = ThreadSafeCache(ttl=3600, max_size=1)
grievance_last_hash_cache = ThreadSafeCache(ttl=3600, max_size=1)
resolution_last_hash_cache = ThreadSafeCache(ttl=3600, max_size=1)
audit_last_hash_cache = ThreadSafeCache(ttl=3600, max_size=2)
visit_last_hash_cache = ThreadSafeCache(ttl=3600, max_size=2)
user_issues_cache = ThreadSafeCache(ttl=300, max_size=50) # 5 minutes TTL
54 changes: 49 additions & 5 deletions backend/escalation_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,16 @@
"""

import datetime
import hashlib
import hmac
import os
Copy link

Copilot AI Apr 6, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unused import: os is imported but not referenced in this module after the changes. Please remove it to avoid lint warnings and keep imports minimal.

Suggested change
import os

Copilot uses AI. Check for mistakes.
from typing import List, Dict, Any, Optional
from sqlalchemy.orm import Session
from sqlalchemy import and_, or_
from backend.models import Grievance, Jurisdiction, EscalationAudit, GrievanceStatus, JurisdictionLevel, EscalationReason, SeverityLevel
from backend.database import SessionLocal
from backend.cache import audit_last_hash_cache
from backend.config import get_config
from backend.routing_service import RoutingService
from backend.sla_config_service import SLAConfigService

Expand Down Expand Up @@ -41,8 +46,10 @@ def evaluate_and_escalate_grievances(self, db: Session = None) -> Dict[str, int]
Returns:
Dictionary with escalation statistics
"""
should_close = False
if db is None:
db = SessionLocal()
should_close = True

try:
# Get grievances that need evaluation
Expand All @@ -63,7 +70,7 @@ def evaluate_and_escalate_grievances(self, db: Session = None) -> Dict[str, int]
}

finally:
if db is not SessionLocal():
if should_close:
db.close()

def escalate_grievance_severity(self, grievance_id: int, new_severity: SeverityLevel,
Expand All @@ -80,8 +87,10 @@ def escalate_grievance_severity(self, grievance_id: int, new_severity: SeverityL
Returns:
True if escalation successful
"""
should_close = False
if db is None:
db = SessionLocal()
should_close = True

try:
grievance = db.query(Grievance).filter(Grievance.id == grievance_id).first()
Expand All @@ -108,7 +117,7 @@ def escalate_grievance_severity(self, grievance_id: int, new_severity: SeverityL
print(f"Error escalating grievance severity: {e}")
return False
finally:
if db is not SessionLocal():
if should_close:
db.close()

def manual_escalate(self, grievance_id: int, reason: str = "", db: Session = None) -> bool:
Expand All @@ -123,8 +132,10 @@ def manual_escalate(self, grievance_id: int, reason: str = "", db: Session = Non
Returns:
True if escalation successful
"""
should_close = False
if db is None:
db = SessionLocal()
should_close = True

try:
grievance = db.query(Grievance).filter(Grievance.id == grievance_id).first()
Expand All @@ -134,7 +145,7 @@ def manual_escalate(self, grievance_id: int, reason: str = "", db: Session = Non
return self._escalate_grievance(grievance, EscalationReason.MANUAL, db, reason)

finally:
if db is not SessionLocal():
if should_close:
db.close()

def _get_grievances_for_evaluation(self, db: Session) -> List[Grievance]:
Expand Down Expand Up @@ -170,7 +181,13 @@ def _should_escalate(self, grievance: Grievance, db: Session) -> bool:
"""
# Check if SLA is breached
now = datetime.datetime.now(datetime.timezone.utc)
if grievance.sla_deadline >= now:

# Handle naive datetimes from SQLite
deadline = grievance.sla_deadline
if deadline and deadline.tzinfo is None:
deadline = deadline.replace(tzinfo=datetime.timezone.utc)

if deadline >= now:
return False

# Check if escalation is possible
Expand Down Expand Up @@ -248,18 +265,45 @@ def _escalate_grievance(self, grievance: Grievance, reason: EscalationReason,
# Recalculate SLA
self._recalculate_sla(grievance, db)

# Optimized Blockchain logic: Cache-first retrieval to ensure O(1) creation path
prev_hash = audit_last_hash_cache.get("last_hash")
Copy link
Copy Markdown
Contributor

@cubic-dev-ai cubic-dev-ai Bot Apr 6, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1: Under concurrent escalations, two transactions can read the same prev_hash from the in-process cache (or from the non-locking fallback query), both commit with the same previous_integrity_hash, and the chain forks. The cache-first approach provides no atomicity guarantee. Consider computing and updating the last-hash under a DB-level lock (e.g., a singleton chain-state row with SELECT ... FOR UPDATE, or a unique constraint on previous_integrity_hash with retry on conflict).

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At backend/escalation_engine.py, line 269:

<comment>Under concurrent escalations, two transactions can read the same `prev_hash` from the in-process cache (or from the non-locking fallback query), both commit with the same `previous_integrity_hash`, and the chain forks. The cache-first approach provides no atomicity guarantee. Consider computing and updating the last-hash under a DB-level lock (e.g., a singleton chain-state row with `SELECT ... FOR UPDATE`, or a unique constraint on `previous_integrity_hash` with retry on conflict).</comment>

<file context>
@@ -248,18 +265,45 @@ def _escalate_grievance(self, grievance: Grievance, reason: EscalationReason,
             self._recalculate_sla(grievance, db)
 
+            # Optimized Blockchain logic: Cache-first retrieval to ensure O(1) creation path
+            prev_hash = audit_last_hash_cache.get("last_hash")
+            if prev_hash is None:
+                # Cache miss: Fetch only the last hash from DB
</file context>
Fix with Cubic

if prev_hash is None:
# Cache miss: Fetch only the last hash from DB
last_audit = db.query(EscalationAudit.integrity_hash).order_by(EscalationAudit.id.desc()).first()
prev_hash = last_audit[0] if last_audit and last_audit[0] else ""
# Populate cache for subsequent escalations
audit_last_hash_cache.set(data=prev_hash, key="last_hash")
Comment on lines +268 to +275
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Make chain-head advancement atomic across requests.

prev_hash is read from process-local state, the new audit is committed, and only then is the head cache updated. Two requests can therefore read the same head and both insert children of the same previous_integrity_hash; separate workers make this even easier because each process has its own audit_last_hash_cache. That forks the audit chain while verify_audit_blockchain() can still report each record as valid.

Use a DB-backed chain-head row or another cross-process lock/CAS so “read head → insert audit → advance head” happens atomically.

Also applies to: 304-305

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/escalation_engine.py` around lines 268 - 275, The code reads the
current chain head from the process-local audit_last_hash_cache into prev_hash,
inserts a new EscalationAudit row, then updates the cache, which allows
concurrent requests/processes to fork the chain; to fix, replace the cache-only
head with a DB-backed chain-head row or use a transactional SELECT ... FOR
UPDATE / row-level lock or an atomic compare-and-swap (upsert) around reading
and advancing the head so the sequence "read head → insert new EscalationAudit →
advance head" is performed atomically across processes; update the code paths
that reference audit_last_hash_cache and prev_hash (also around lines 304-305)
to acquire the DB lock/perform CAS, insert the audit within the same
transaction, and only then update the cache from the committed DB head so
verify_audit_blockchain() cannot observe divergent forks.


Comment on lines +268 to +276
Copy link

Copilot AI Apr 6, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

prev_hash is sourced from an in-process cache (or a non-locking ORDER BY ... DESC query on cache miss). Under concurrent escalations, two transactions can observe the same prev_hash and both commit, causing the chain to fork (multiple rows pointing at the same previous_integrity_hash). If a linear chain is required, compute/update the last-hash value under a DB lock/transactional guard (e.g., a singleton chain-state row with SELECT ... FOR UPDATE, or a unique constraint + retry on conflict) rather than relying on process-local caching.

Copilot uses AI. Check for mistakes.
# HMAC-SHA256 chaining: hash(grievance_id|prev_auth|new_auth|reason|prev_hash)
# Using centralized config to avoid hardcoded secret fallbacks (Security compliance)
app_config = get_config()
secret_key = app_config.secret_key.encode('utf-8')
reason_val = reason.value if hasattr(reason, 'value') else reason
hash_content = f"{grievance.id}|{previous_authority}|{grievance.assigned_authority}|{reason_val}|{prev_hash}"
Copy link
Copy Markdown
Contributor

@cubic-dev-ai cubic-dev-ai Bot Apr 6, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1: The timestamp and notes fields of EscalationAudit are not included in hash_content, so they can be altered in the database without breaking the integrity chain. For a tamper-detection audit trail, the timestamp is critical — an attacker with DB access could backdate escalation records undetected. Generate the timestamp before computing the hash and include both timestamp and notes in the sealed content.

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At backend/escalation_engine.py, line 282:

<comment>The `timestamp` and `notes` fields of `EscalationAudit` are not included in `hash_content`, so they can be altered in the database without breaking the integrity chain. For a tamper-detection audit trail, the timestamp is critical — an attacker with DB access could backdate escalation records undetected. Generate the timestamp before computing the hash and include both `timestamp` and `notes` in the sealed content.</comment>

<file context>
@@ -248,18 +265,45 @@ def _escalate_grievance(self, grievance: Grievance, reason: EscalationReason,
+            app_config = get_config()
+            secret_key = app_config.secret_key.encode('utf-8')
+            reason_val = reason.value if hasattr(reason, 'value') else reason
+            hash_content = f"{grievance.id}|{previous_authority}|{grievance.assigned_authority}|{reason_val}|{prev_hash}"
+
+            integrity_hash = hmac.new(
</file context>
Fix with Cubic


integrity_hash = hmac.new(
secret_key,
hash_content.encode('utf-8'),
hashlib.sha256
).hexdigest()

Comment on lines +277 to +289
Copy link

Copilot AI Apr 6, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The HMAC payload used for integrity_hash excludes mutable audit fields like timestamp and notes. As a result, edits to those fields would not be detected by verification, even though they are part of the audit record. If the intent is full audit-record integrity, include all fields you want protected (e.g., timestamp in a stable format and notes with a deterministic null/empty representation) in the hashed content, and ensure the same canonicalization is used in both creation and verification paths.

Copilot uses AI. Check for mistakes.
# Create audit log
audit_log = EscalationAudit(
grievance_id=grievance.id,
previous_authority=previous_authority,
new_authority=grievance.assigned_authority,
reason=reason,
notes=notes
notes=notes,
integrity_hash=integrity_hash,
previous_integrity_hash=prev_hash
Comment on lines +277 to +298
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Seal the manual justification text too.

notes is persisted on EscalationAudit but excluded from hash_content. For manual escalations that field carries the operator-supplied reason, so it can be edited later without invalidating integrity_hash. Include notes—and any other mutable audit fields you want to make tamper-evident—in the canonical HMAC payload here and in verify_audit_blockchain().

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/escalation_engine.py` around lines 277 - 298, The HMAC payload
(hash_content) omits mutable operator text (notes), allowing tampering; update
the hash construction in the escalation routine where hash_content is built
(currently using grievance.id, previous_authority, grievance.assigned_authority,
reason_val, prev_hash) to also include notes (and any other mutable audit fields
you want tamper-evident) in a deterministic, canonical form (e.g.,
normalize/escape or JSON-serialize fields and include notes after reason_val),
regenerate integrity_hash from that canonical payload, and persist as before on
EscalationAudit; mirror the exact same canonicalization and field order in
verify_audit_blockchain() so verification uses the identical payload (handle
None/empty values consistently).

)

db.add(audit_log)
db.commit()

# Update cache after successful commit to maintain chain integrity
audit_last_hash_cache.set(data=integrity_hash, key="last_hash")

return True

except Exception as e:
Expand Down
13 changes: 13 additions & 0 deletions backend/init_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,19 @@ def index_exists(table, index_name):
if not index_exists("resolution_evidence", "ix_resolution_evidence_previous_integrity_hash"):
conn.execute(text("CREATE INDEX IF NOT EXISTS ix_resolution_evidence_previous_integrity_hash ON resolution_evidence (previous_integrity_hash)"))

# Escalation Audits Table Migrations
if inspector.has_table("escalation_audits"):
if not column_exists("escalation_audits", "integrity_hash"):
conn.execute(text("ALTER TABLE escalation_audits ADD COLUMN integrity_hash VARCHAR"))
logger.info("Added integrity_hash column to escalation_audits")

if not column_exists("escalation_audits", "previous_integrity_hash"):
conn.execute(text("ALTER TABLE escalation_audits ADD COLUMN previous_integrity_hash VARCHAR"))
logger.info("Added previous_integrity_hash column to escalation_audits")

if not index_exists("escalation_audits", "ix_escalation_audits_previous_integrity_hash"):
conn.execute(text("CREATE INDEX IF NOT EXISTS ix_escalation_audits_previous_integrity_hash ON escalation_audits (previous_integrity_hash)"))

# Resolution Proof Tokens Table Migrations
if inspector.has_table("resolution_proof_tokens"):
if not column_exists("resolution_proof_tokens", "nonce"):
Expand Down
4 changes: 4 additions & 0 deletions backend/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,10 @@ class EscalationAudit(Base):
reason = Column(Enum(EscalationReason), nullable=False)
notes = Column(Text, nullable=True) # Additional context

# Blockchain integrity fields
integrity_hash = Column(String, nullable=True)
previous_integrity_hash = Column(String, nullable=True, index=True)

# Relationships
grievance = relationship("Grievance", back_populates="audit_logs")

Expand Down
61 changes: 59 additions & 2 deletions backend/routers/grievances.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import json
import logging
import hashlib
import hmac
from datetime import datetime, timezone

from backend.database import get_db
Expand All @@ -21,6 +22,7 @@
)
from backend.grievance_service import GrievanceService
from backend.closure_service import ClosureService
from backend.config import get_config

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -192,9 +194,8 @@ def manual_escalate_grievance(
raise HTTPException(status_code=404, detail="Grievance not found")

# Perform manual escalation
success = grievance_service.escalation_engine.escalate_grievance_severity(
success = grievance_service.escalation_engine.manual_escalate(
grievance_id=grievance_id,
new_severity=grievance.severity, # Keep same severity, just escalate jurisdiction
reason=reason,
db=db
)
Expand Down Expand Up @@ -496,3 +497,59 @@ def verify_grievance_blockchain(
except Exception as e:
logger.error(f"Error verifying grievance blockchain for {grievance_id}: {e}", exc_info=True)
raise HTTPException(status_code=500, detail="Failed to verify grievance integrity")


@router.get("/audit/{audit_id}/blockchain-verify", response_model=BlockchainVerificationResponse)
def verify_audit_blockchain(
audit_id: int,
db: Session = Depends(get_db)
):
"""
Verify the cryptographic integrity of an escalation audit record using HMAC-SHA256 chaining.
Optimized: Uses previous_integrity_hash column for O(1) verification.
"""
try:
audit = db.query(EscalationAudit).filter(EscalationAudit.id == audit_id).first()

if not audit:
raise HTTPException(status_code=404, detail="Audit record not found")

# Determine previous hash (O(1) from stored column)
prev_hash = audit.previous_integrity_hash or ""

# HMAC-SHA256 chaining: hash(grievance_id|prev_auth|new_auth|reason|prev_hash)
# Using centralized config for secret key to ensure security compliance
app_config = get_config()
secret_key = app_config.secret_key.encode('utf-8')
reason_val = audit.reason.value if hasattr(audit.reason, 'value') else audit.reason
hash_content = f"{audit.grievance_id}|{audit.previous_authority}|{audit.new_authority}|{reason_val}|{prev_hash}"

computed_hash = hmac.new(
secret_key,
hash_content.encode('utf-8'),
hashlib.sha256
).hexdigest()

if audit.integrity_hash is None:
is_valid = False
message = "No integrity hash present for this audit record; cryptographic integrity cannot be verified."
else:
is_valid = hmac.compare_digest(computed_hash, audit.integrity_hash)
message = (
"Integrity verified. This escalation audit record is cryptographically sealed."
if is_valid
else "Integrity check failed! The audit data does not match its cryptographic seal."
)

return BlockchainVerificationResponse(
is_valid=is_valid,
current_hash=audit.integrity_hash,
computed_hash=computed_hash,
Copy link
Copy Markdown
Contributor

@cubic-dev-ai cubic-dev-ai Bot Apr 6, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1: Returning computed_hash from an HMAC-based verification endpoint creates a signing oracle. Any caller can obtain the correct HMAC for the current database state, so an attacker with DB write access can tamper with a record, hit this endpoint to get the valid HMAC, then update integrity_hash to match — completely bypassing tamper detection. For HMAC-protected records, the response should only include is_valid and message, not the computed digest.

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At backend/routers/grievances.py, line 547:

<comment>Returning `computed_hash` from an HMAC-based verification endpoint creates a signing oracle. Any caller can obtain the correct HMAC for the current database state, so an attacker with DB write access can tamper with a record, hit this endpoint to get the valid HMAC, then update `integrity_hash` to match — completely bypassing tamper detection. For HMAC-protected records, the response should only include `is_valid` and `message`, not the computed digest.</comment>

<file context>
@@ -496,3 +497,59 @@ def verify_grievance_blockchain(
+        return BlockchainVerificationResponse(
+            is_valid=is_valid,
+            current_hash=audit.integrity_hash,
+            computed_hash=computed_hash,
+            message=message
+        )
</file context>
Fix with Cubic

message=message
)
Comment on lines +544 to +549
Copy link

Copilot AI Apr 6, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The audit blockchain verification endpoint returns computed_hash in the API response. Because this is an HMAC derived from the server secret, the endpoint effectively becomes a signing oracle: anyone who can tamper with audit fields in the DB could call this endpoint to obtain a valid HMAC for the modified record and then update integrity_hash to match, defeating the seal. Consider removing computed_hash from the public response (or only returning it behind admin auth / debug mode) and using a separate response schema for audit verification that only returns is_valid (+ maybe current_hash and a message).

Suggested change
return BlockchainVerificationResponse(
is_valid=is_valid,
current_hash=audit.integrity_hash,
computed_hash=computed_hash,
message=message
)
return {
"is_valid": is_valid,
"current_hash": audit.integrity_hash,
"message": message
}

Copilot uses AI. Check for mistakes.

except HTTPException:
raise
except Exception as e:
logger.error(f"Error verifying audit blockchain for {audit_id}: {e}", exc_info=True)
raise HTTPException(status_code=500, detail="Failed to verify audit integrity")
2 changes: 2 additions & 0 deletions backend/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,8 @@ class EscalationAuditResponse(BaseModel):
new_authority: str = Field(..., description="New authority after escalation")
timestamp: datetime = Field(..., description="When the escalation occurred")
reason: str = Field(..., description="Reason for escalation (SLA_BREACH, SEVERITY_UPGRADE, MANUAL)")
integrity_hash: Optional[str] = Field(None, description="Cryptographic integrity seal")
previous_integrity_hash: Optional[str] = Field(None, description="Linked hash for O(1) verification")

class GrievanceSummaryResponse(BaseModel):
id: int = Field(..., description="Grievance ID")
Expand Down
Loading
Loading