diff --git a/backend/routers/issues.py b/backend/routers/issues.py index de27e1a9..542896e2 100644 --- a/backend/routers/issues.py +++ b/backend/routers/issues.py @@ -647,6 +647,7 @@ async def verify_blockchain_integrity(issue_id: int, db: Session = Depends(get_d # Determine previous hash (use stored link or fallback for legacy records) prev_hash = current_issue.previous_integrity_hash + chain_link_valid = True if prev_hash is None: # Fallback for legacy records created before O(1) optimization @@ -654,13 +655,22 @@ async def verify_blockchain_integrity(issue_id: int, db: Session = Depends(get_d lambda: db.query(Issue.integrity_hash).filter(Issue.id < issue_id).order_by(Issue.id.desc()).first() ) prev_hash = prev_issue_hash[0] if prev_issue_hash and prev_issue_hash[0] else "" + else: + # Cross-check: verify stored previous_integrity_hash matches the actual predecessor in DB. + # This guards against concurrent creation/cache races or tampered previous_integrity_hash values. + actual_prev = await run_in_threadpool( + lambda: db.query(Issue.integrity_hash).filter(Issue.id < issue_id).order_by(Issue.id.desc()).first() + ) + actual_prev_hash = actual_prev[0] if actual_prev and actual_prev[0] else "" + if prev_hash != actual_prev_hash: + chain_link_valid = False # Recompute hash based on current data and previous hash # Chaining logic: hash(description|category|prev_hash) hash_content = f"{current_issue.description}|{current_issue.category}|{prev_hash}" computed_hash = hashlib.sha256(hash_content.encode()).hexdigest() - is_valid = (computed_hash == current_issue.integrity_hash) + is_valid = chain_link_valid and (computed_hash == current_issue.integrity_hash) if is_valid: message = "Integrity verified. This report is cryptographically sealed and has not been tampered with." diff --git a/tests/test_blockchain.py b/tests/test_blockchain.py index 341ecf49..a306d29d 100644 --- a/tests/test_blockchain.py +++ b/tests/test_blockchain.py @@ -79,6 +79,41 @@ def test_blockchain_verification_failure(client, db_session): assert data["is_valid"] == False assert data["message"].startswith("Integrity check failed") +def test_blockchain_verification_tampered_previous_hash(client, db_session): + # Create a genuine predecessor issue + hash1_content = "Real predecessor|Road|" + hash1 = hashlib.sha256(hash1_content.encode()).hexdigest() + issue1 = Issue( + description="Real predecessor", + category="Road", + integrity_hash=hash1 + ) + db_session.add(issue1) + db_session.commit() + db_session.refresh(issue1) + + # Create a second issue whose previous_integrity_hash is forged (doesn't match issue1's hash) + forged_prev = "forgedprevioushashvalue" + hash2_content = f"Second issue|Garbage|{forged_prev}" + hash2 = hashlib.sha256(hash2_content.encode()).hexdigest() + issue2 = Issue( + description="Second issue", + category="Garbage", + integrity_hash=hash2, + previous_integrity_hash=forged_prev, + ) + db_session.add(issue2) + db_session.commit() + db_session.refresh(issue2) + + # Verification must fail: stored previous_integrity_hash doesn't match the DB predecessor + response = client.get(f"/api/issues/{issue2.id}/blockchain-verify") + assert response.status_code == 200 + data = response.json() + assert data["is_valid"] == False + assert data["message"].startswith("Integrity check failed") + + def test_upvote_optimization(client, db_session): issue = Issue( description="Test issue for upvote",