diff --git a/src/extension_shield/api/main.py b/src/extension_shield/api/main.py index 3ebbc4c..bf77ec7 100644 --- a/src/extension_shield/api/main.py +++ b/src/extension_shield/api/main.py @@ -6,6 +6,7 @@ """ import base64 +import hmac import mimetypes import os from pathlib import Path @@ -469,7 +470,7 @@ def _require_admin_key(request: Request) -> None: detail="X-Admin-Key header is required" ) - if provided_key != admin_key: + if not hmac.compare_digest(str(provided_key), str(admin_key)): raise HTTPException( status_code=403, detail="Invalid admin API key" @@ -496,7 +497,11 @@ def _require_admin_or_telemetry_key(request: Request) -> None: status_code=403, detail="X-Admin-Key header is required" ) - valid = (admin_key and provided == admin_key) or (telemetry_key and provided == telemetry_key) + valid = ( + bool(admin_key) and hmac.compare_digest(str(provided), str(admin_key)) + ) or ( + bool(telemetry_key) and hmac.compare_digest(str(provided), str(telemetry_key)) + ) if not valid: raise HTTPException( status_code=403, diff --git a/tests/api/test_admin_endpoints.py b/tests/api/test_admin_endpoints.py index 4f6f665..2e8c49b 100644 --- a/tests/api/test_admin_endpoints.py +++ b/tests/api/test_admin_endpoints.py @@ -128,6 +128,32 @@ def test_delete_scan_with_correct_admin_key_succeeds(self, client, admin_key): # Should succeed (200 or 204 depending on implementation) assert response.status_code in [200, 204, 404] # 404 if extension not found in DB + def test_delete_scan_uses_constant_time_key_compare(self, client, admin_key): + """DELETE should validate admin key through compare_digest.""" + with patch("extension_shield.api.main.get_settings") as mock_get_settings, \ + patch("extension_shield.utils.mode.get_feature_flags") as mock_flags, \ + patch("extension_shield.api.main.hmac.compare_digest", return_value=False) as mock_compare: + from unittest.mock import MagicMock + settings = MagicMock() + settings.admin_api_key = admin_key + settings.telemetry_admin_key = None + mock_get_settings.return_value = settings + flags = MagicMock() + flags.mode = "cloud" + flags.telemetry_enabled = True + mock_flags.return_value = flags + + test_extension_id = "test-ext-123" + scan_results[test_extension_id] = {"extension_id": test_extension_id, "status": "completed"} + + response = client.delete( + f"/api/scan/{test_extension_id}", + headers={"X-Admin-Key": admin_key}, + ) + + assert response.status_code == 403 + mock_compare.assert_called_once() + def test_delete_scan_without_configured_admin_key_returns_403(self, client): """DELETE when admin key is not configured should return 403.""" with patch("extension_shield.api.main.get_settings") as mock_get_settings, \ @@ -266,6 +292,38 @@ def test_telemetry_summary_with_telemetry_key_succeeds(self, client, admin_key, assert response.status_code == 200 + def test_telemetry_summary_uses_constant_time_key_compare(self, client, admin_key, telemetry_key): + """GET should validate admin or telemetry key through compare_digest.""" + with patch("extension_shield.api.main.get_settings") as mock_get_settings, \ + patch("extension_shield.utils.mode.get_feature_flags") as mock_flags, \ + patch("extension_shield.api.main.hmac.compare_digest", side_effect=[False, True]) as mock_compare: + from unittest.mock import MagicMock + settings = MagicMock() + settings.admin_api_key = admin_key + settings.telemetry_admin_key = telemetry_key + mock_get_settings.return_value = settings + flags = MagicMock() + flags.mode = "cloud" + flags.telemetry_enabled = True + mock_flags.return_value = flags + + with patch("extension_shield.api.main.db") as mock_db: + mock_db.get_page_view_summary.return_value = { + "days": 14, + "start_day": None, + "end_day": None, + "by_day": {}, + "by_path": {}, + "rows": [], + } + response = client.get( + "/api/telemetry/summary", + headers={"X-Admin-Key": telemetry_key}, + ) + + assert response.status_code == 200 + assert mock_compare.call_count == 2 + def test_telemetry_summary_falls_back_to_admin_key(self, client, admin_key): """GET should fallback to ADMIN_API_KEY when TELEMETRY_ADMIN_KEY is not set.""" with patch("extension_shield.api.main.get_settings") as mock_get_settings, \