From 02b521fb02f7b9124433f453d4a3ca9956ae0d2a Mon Sep 17 00:00:00 2001 From: Ayush Raj Chourasia Date: Thu, 2 Apr 2026 21:37:30 +0000 Subject: [PATCH 1/3] fix(auth): stop trusting X-User-Id for identity --- src/extension_shield/api/auth_identity.py | 22 +++++++++++++ src/extension_shield/api/main.py | 20 +----------- tests/api/test_user_identity_source.py | 38 +++++++++++++++++++++++ 3 files changed, 61 insertions(+), 19 deletions(-) create mode 100644 src/extension_shield/api/auth_identity.py create mode 100644 tests/api/test_user_identity_source.py diff --git a/src/extension_shield/api/auth_identity.py b/src/extension_shield/api/auth_identity.py new file mode 100644 index 0000000..ecdf857 --- /dev/null +++ b/src/extension_shield/api/auth_identity.py @@ -0,0 +1,22 @@ +""" +Authentication identity helpers for API routes. + +These helpers intentionally avoid importing heavy runtime dependencies so they +can be tested in isolation. +""" + +from typing import Any + + +def get_user_id(request: Any) -> str: + """ + Best-effort user identifier. + + Use only Supabase-authenticated user_id (JWT `sub`) from request state. + Never trust user-controlled headers for identity. + """ + state_user = getattr(getattr(request, "state", None), "user_id", None) + if state_user: + return str(state_user) + + return "anon" diff --git a/src/extension_shield/api/main.py b/src/extension_shield/api/main.py index 3ebbc4c..22f0dd1 100644 --- a/src/extension_shield/api/main.py +++ b/src/extension_shield/api/main.py @@ -43,6 +43,7 @@ from extension_shield.workflow.state import WorkflowState, WorkflowStatus from extension_shield.api.database import db, SupabaseDatabase, _is_extension_id from extension_shield.api.supabase_auth import get_current_user_id as _get_current_user_id +from extension_shield.api.auth_identity import get_user_id as _get_user_id from extension_shield.core.config import get_settings from extension_shield.utils.mode import require_cloud, get_feature_flags, is_oss_telemetry_allowed, require_cloud_dep from extension_shield.api.csp_middleware import CSPMiddleware @@ -385,25 +386,6 @@ async def add_security_headers(request: Request, call_next): deep_scan_usage: Dict[str, Dict[str, int]] = {} -def _get_user_id(request: Request) -> str: - """ - Best-effort user identifier. - - Prefer Supabase-authenticated user_id (JWT `sub`) when available. - If absent, allow an optional `X-User-Id` header for local/dev usage. - No IP-based fallback (privacy-first). - """ - state_user = getattr(getattr(request, "state", None), "user_id", None) - if state_user: - return str(state_user) - - header_user = request.headers.get("x-user-id") or request.headers.get("X-User-Id") - if header_user: - return header_user.strip() - - return "anon" - - def _get_client_ip(request: Request) -> str: """ Get the client's IP address for rate limiting anonymous users. diff --git a/tests/api/test_user_identity_source.py b/tests/api/test_user_identity_source.py new file mode 100644 index 0000000..4407aa4 --- /dev/null +++ b/tests/api/test_user_identity_source.py @@ -0,0 +1,38 @@ +""" +Tests for API user identity extraction helper. + +Security regression coverage: +- Identity must come from authenticated request state only. +- X-User-Id header must never be trusted. +""" + +from types import SimpleNamespace + +from extension_shield.api.auth_identity import get_user_id + + +def test_get_user_id_prefers_authenticated_state_user_id(): + request = SimpleNamespace( + state=SimpleNamespace(user_id="real-user-123"), + headers={"X-User-Id": "attacker-id"}, + ) + + assert get_user_id(request) == "real-user-123" + + +def test_get_user_id_ignores_x_user_id_header_when_not_authenticated(): + request = SimpleNamespace( + state=SimpleNamespace(user_id=None), + headers={"X-User-Id": "attacker-id"}, + ) + + assert get_user_id(request) == "anon" + + +def test_get_user_id_returns_anon_without_authenticated_user(): + request = SimpleNamespace( + state=SimpleNamespace(user_id=None), + headers={}, + ) + + assert get_user_id(request) == "anon" From db9a224c2bd167dcdc551cdf7f9488b23a4c5874 Mon Sep 17 00:00:00 2001 From: Ayush Raj Chourasia Date: Thu, 2 Apr 2026 21:46:50 +0000 Subject: [PATCH 2/3] fix(auth): block private file access for non-owners --- src/extension_shield/api/auth_identity.py | 20 ++++++++++++++++++++ src/extension_shield/api/main.py | 11 ++++++++++- tests/api/test_user_identity_source.py | 22 +++++++++++++++++++++- 3 files changed, 51 insertions(+), 2 deletions(-) diff --git a/src/extension_shield/api/auth_identity.py b/src/extension_shield/api/auth_identity.py index ecdf857..e039b43 100644 --- a/src/extension_shield/api/auth_identity.py +++ b/src/extension_shield/api/auth_identity.py @@ -20,3 +20,23 @@ def get_user_id(request: Any) -> str: return str(state_user) return "anon" + + +def can_view_private_scan(request_user_id: Any, scan_result: Any) -> bool: + """ + Return whether the requester may view a scan result that may be private. + + Public scan results remain visible to everyone. Private scan results require + the authenticated request user_id to match the stored owner user_id. + """ + if not isinstance(scan_result, dict): + return False + + if scan_result.get("visibility") != "private": + return True + + owner_user_id = scan_result.get("user_id") + if not owner_user_id or not request_user_id: + return False + + return str(owner_user_id) == str(request_user_id) diff --git a/src/extension_shield/api/main.py b/src/extension_shield/api/main.py index 22f0dd1..27a6244 100644 --- a/src/extension_shield/api/main.py +++ b/src/extension_shield/api/main.py @@ -43,7 +43,10 @@ from extension_shield.workflow.state import WorkflowState, WorkflowStatus from extension_shield.api.database import db, SupabaseDatabase, _is_extension_id from extension_shield.api.supabase_auth import get_current_user_id as _get_current_user_id -from extension_shield.api.auth_identity import get_user_id as _get_user_id +from extension_shield.api.auth_identity import ( + can_view_private_scan, + get_user_id as _get_user_id, +) from extension_shield.core.config import get_settings from extension_shield.utils.mode import require_cloud, get_feature_flags, is_oss_telemetry_allowed, require_cloud_dep from extension_shield.api.csp_middleware import CSPMiddleware @@ -3020,6 +3023,9 @@ async def get_file_list(extension_id: str, http_request: Request) -> FileListRes if not results: raise HTTPException(status_code=404, detail="Extension not found") + if not can_view_private_scan(user_id, results): + raise HTTPException(status_code=404, detail="File not found") + extracted_path = results.get("extracted_path") if not extracted_path or not os.path.exists(extracted_path): raise HTTPException(status_code=404, detail="Extracted files not found") @@ -3051,6 +3057,9 @@ async def get_file_content(extension_id: str, file_path: str, http_request: Requ if not results: raise HTTPException(status_code=404, detail="Extension not found") + if not can_view_private_scan(user_id, results): + raise HTTPException(status_code=404, detail="File not found") + extracted_path = results.get("extracted_path") if not extracted_path: raise HTTPException(status_code=404, detail="Extracted files not found") diff --git a/tests/api/test_user_identity_source.py b/tests/api/test_user_identity_source.py index 4407aa4..b5cc7d8 100644 --- a/tests/api/test_user_identity_source.py +++ b/tests/api/test_user_identity_source.py @@ -8,7 +8,7 @@ from types import SimpleNamespace -from extension_shield.api.auth_identity import get_user_id +from extension_shield.api.auth_identity import can_view_private_scan, get_user_id def test_get_user_id_prefers_authenticated_state_user_id(): @@ -36,3 +36,23 @@ def test_get_user_id_returns_anon_without_authenticated_user(): ) assert get_user_id(request) == "anon" + + +def test_can_view_private_scan_allows_public_results(): + result = {"visibility": "public", "user_id": "owner-123"} + + assert can_view_private_scan(None, result) + assert can_view_private_scan("any-user", result) + + +def test_can_view_private_scan_blocks_non_owner_for_private_result(): + result = {"visibility": "private", "user_id": "owner-123"} + + assert not can_view_private_scan(None, result) + assert not can_view_private_scan("other-user", result) + + +def test_can_view_private_scan_allows_owner_for_private_result(): + result = {"visibility": "private", "user_id": "owner-123"} + + assert can_view_private_scan("owner-123", result) From 2ffa01090da0da2fb976766ec42ecb993428ba91 Mon Sep 17 00:00:00 2001 From: Ayush Raj Chourasia Date: Thu, 2 Apr 2026 21:59:12 +0000 Subject: [PATCH 3/3] fix(security): use constant-time admin key checks --- src/extension_shield/api/main.py | 9 +++-- tests/api/test_admin_endpoints.py | 58 +++++++++++++++++++++++++++++++ 2 files changed, 65 insertions(+), 2 deletions(-) diff --git a/src/extension_shield/api/main.py b/src/extension_shield/api/main.py index 27a6244..249ef58 100644 --- a/src/extension_shield/api/main.py +++ b/src/extension_shield/api/main.py @@ -6,6 +6,7 @@ """ import base64 +import hmac import mimetypes import os from pathlib import Path @@ -454,7 +455,7 @@ def _require_admin_key(request: Request) -> None: detail="X-Admin-Key header is required" ) - if provided_key != admin_key: + if not hmac.compare_digest(str(provided_key), str(admin_key)): raise HTTPException( status_code=403, detail="Invalid admin API key" @@ -481,7 +482,11 @@ def _require_admin_or_telemetry_key(request: Request) -> None: status_code=403, detail="X-Admin-Key header is required" ) - valid = (admin_key and provided == admin_key) or (telemetry_key and provided == telemetry_key) + valid = ( + bool(admin_key) and hmac.compare_digest(str(provided), str(admin_key)) + ) or ( + bool(telemetry_key) and hmac.compare_digest(str(provided), str(telemetry_key)) + ) if not valid: raise HTTPException( status_code=403, diff --git a/tests/api/test_admin_endpoints.py b/tests/api/test_admin_endpoints.py index 4f6f665..2e8c49b 100644 --- a/tests/api/test_admin_endpoints.py +++ b/tests/api/test_admin_endpoints.py @@ -128,6 +128,32 @@ def test_delete_scan_with_correct_admin_key_succeeds(self, client, admin_key): # Should succeed (200 or 204 depending on implementation) assert response.status_code in [200, 204, 404] # 404 if extension not found in DB + def test_delete_scan_uses_constant_time_key_compare(self, client, admin_key): + """DELETE should validate admin key through compare_digest.""" + with patch("extension_shield.api.main.get_settings") as mock_get_settings, \ + patch("extension_shield.utils.mode.get_feature_flags") as mock_flags, \ + patch("extension_shield.api.main.hmac.compare_digest", return_value=False) as mock_compare: + from unittest.mock import MagicMock + settings = MagicMock() + settings.admin_api_key = admin_key + settings.telemetry_admin_key = None + mock_get_settings.return_value = settings + flags = MagicMock() + flags.mode = "cloud" + flags.telemetry_enabled = True + mock_flags.return_value = flags + + test_extension_id = "test-ext-123" + scan_results[test_extension_id] = {"extension_id": test_extension_id, "status": "completed"} + + response = client.delete( + f"/api/scan/{test_extension_id}", + headers={"X-Admin-Key": admin_key}, + ) + + assert response.status_code == 403 + mock_compare.assert_called_once() + def test_delete_scan_without_configured_admin_key_returns_403(self, client): """DELETE when admin key is not configured should return 403.""" with patch("extension_shield.api.main.get_settings") as mock_get_settings, \ @@ -266,6 +292,38 @@ def test_telemetry_summary_with_telemetry_key_succeeds(self, client, admin_key, assert response.status_code == 200 + def test_telemetry_summary_uses_constant_time_key_compare(self, client, admin_key, telemetry_key): + """GET should validate admin or telemetry key through compare_digest.""" + with patch("extension_shield.api.main.get_settings") as mock_get_settings, \ + patch("extension_shield.utils.mode.get_feature_flags") as mock_flags, \ + patch("extension_shield.api.main.hmac.compare_digest", side_effect=[False, True]) as mock_compare: + from unittest.mock import MagicMock + settings = MagicMock() + settings.admin_api_key = admin_key + settings.telemetry_admin_key = telemetry_key + mock_get_settings.return_value = settings + flags = MagicMock() + flags.mode = "cloud" + flags.telemetry_enabled = True + mock_flags.return_value = flags + + with patch("extension_shield.api.main.db") as mock_db: + mock_db.get_page_view_summary.return_value = { + "days": 14, + "start_day": None, + "end_day": None, + "by_day": {}, + "by_path": {}, + "rows": [], + } + response = client.get( + "/api/telemetry/summary", + headers={"X-Admin-Key": telemetry_key}, + ) + + assert response.status_code == 200 + assert mock_compare.call_count == 2 + def test_telemetry_summary_falls_back_to_admin_key(self, client, admin_key): """GET should fallback to ADMIN_API_KEY when TELEMETRY_ADMIN_KEY is not set.""" with patch("extension_shield.api.main.get_settings") as mock_get_settings, \