-
Notifications
You must be signed in to change notification settings - Fork 77
fix(auth): remove X-User-Id identity trust #76
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,42 @@ | ||
| """ | ||
| Authentication identity helpers for API routes. | ||
|
|
||
| These helpers intentionally avoid importing heavy runtime dependencies so they | ||
| can be tested in isolation. | ||
| """ | ||
|
|
||
| from typing import Any | ||
|
|
||
|
|
||
| def get_user_id(request: Any) -> str: | ||
| """ | ||
| Best-effort user identifier. | ||
|
|
||
| Use only Supabase-authenticated user_id (JWT `sub`) from request state. | ||
| Never trust user-controlled headers for identity. | ||
| """ | ||
| state_user = getattr(getattr(request, "state", None), "user_id", None) | ||
| if state_user: | ||
| return str(state_user) | ||
|
|
||
| return "anon" | ||
|
|
||
|
|
||
| def can_view_private_scan(request_user_id: Any, scan_result: Any) -> bool: | ||
| """ | ||
| Return whether the requester may view a scan result that may be private. | ||
|
|
||
| Public scan results remain visible to everyone. Private scan results require | ||
| the authenticated request user_id to match the stored owner user_id. | ||
| """ | ||
| if not isinstance(scan_result, dict): | ||
| return False | ||
|
|
||
| if scan_result.get("visibility") != "private": | ||
| return True | ||
|
|
||
| owner_user_id = scan_result.get("user_id") | ||
| if not owner_user_id or not request_user_id: | ||
| return False | ||
|
|
||
| return str(owner_user_id) == str(request_user_id) |
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -6,6 +6,7 @@ | |||||
| """ | ||||||
|
|
||||||
| import base64 | ||||||
| import hmac | ||||||
| import mimetypes | ||||||
| import os | ||||||
| from pathlib import Path | ||||||
|
|
@@ -43,6 +44,10 @@ | |||||
| from extension_shield.workflow.state import WorkflowState, WorkflowStatus | ||||||
| from extension_shield.api.database import db, SupabaseDatabase, _is_extension_id | ||||||
| from extension_shield.api.supabase_auth import get_current_user_id as _get_current_user_id | ||||||
| from extension_shield.api.auth_identity import ( | ||||||
| can_view_private_scan, | ||||||
| get_user_id as _get_user_id, | ||||||
| ) | ||||||
| from extension_shield.core.config import get_settings | ||||||
| from extension_shield.utils.mode import require_cloud, get_feature_flags, is_oss_telemetry_allowed, require_cloud_dep | ||||||
| from extension_shield.api.csp_middleware import CSPMiddleware | ||||||
|
|
@@ -385,25 +390,6 @@ async def add_security_headers(request: Request, call_next): | |||||
| deep_scan_usage: Dict[str, Dict[str, int]] = {} | ||||||
|
|
||||||
|
|
||||||
| def _get_user_id(request: Request) -> str: | ||||||
| """ | ||||||
| Best-effort user identifier. | ||||||
|
|
||||||
| Prefer Supabase-authenticated user_id (JWT `sub`) when available. | ||||||
| If absent, allow an optional `X-User-Id` header for local/dev usage. | ||||||
| No IP-based fallback (privacy-first). | ||||||
| """ | ||||||
| state_user = getattr(getattr(request, "state", None), "user_id", None) | ||||||
| if state_user: | ||||||
| return str(state_user) | ||||||
|
|
||||||
| header_user = request.headers.get("x-user-id") or request.headers.get("X-User-Id") | ||||||
| if header_user: | ||||||
| return header_user.strip() | ||||||
|
|
||||||
| return "anon" | ||||||
|
|
||||||
|
|
||||||
| def _get_client_ip(request: Request) -> str: | ||||||
| """ | ||||||
| Get the client's IP address for rate limiting anonymous users. | ||||||
|
|
@@ -469,7 +455,7 @@ def _require_admin_key(request: Request) -> None: | |||||
| detail="X-Admin-Key header is required" | ||||||
| ) | ||||||
|
|
||||||
| if provided_key != admin_key: | ||||||
| if not hmac.compare_digest(str(provided_key), str(admin_key)): | ||||||
| raise HTTPException( | ||||||
| status_code=403, | ||||||
| detail="Invalid admin API key" | ||||||
|
|
@@ -496,7 +482,11 @@ def _require_admin_or_telemetry_key(request: Request) -> None: | |||||
| status_code=403, | ||||||
| detail="X-Admin-Key header is required" | ||||||
| ) | ||||||
| valid = (admin_key and provided == admin_key) or (telemetry_key and provided == telemetry_key) | ||||||
| valid = ( | ||||||
| bool(admin_key) and hmac.compare_digest(str(provided), str(admin_key)) | ||||||
| ) or ( | ||||||
| bool(telemetry_key) and hmac.compare_digest(str(provided), str(telemetry_key)) | ||||||
| ) | ||||||
| if not valid: | ||||||
| raise HTTPException( | ||||||
| status_code=403, | ||||||
|
|
@@ -3038,6 +3028,9 @@ async def get_file_list(extension_id: str, http_request: Request) -> FileListRes | |||||
| if not results: | ||||||
| raise HTTPException(status_code=404, detail="Extension not found") | ||||||
|
|
||||||
| if not can_view_private_scan(user_id, results): | ||||||
| raise HTTPException(status_code=404, detail="File not found") | ||||||
|
|
||||||
| extracted_path = results.get("extracted_path") | ||||||
| if not extracted_path or not os.path.exists(extracted_path): | ||||||
| raise HTTPException(status_code=404, detail="Extracted files not found") | ||||||
|
|
@@ -3069,6 +3062,9 @@ async def get_file_content(extension_id: str, file_path: str, http_request: Requ | |||||
| if not results: | ||||||
| raise HTTPException(status_code=404, detail="Extension not found") | ||||||
|
|
||||||
| if not can_view_private_scan(user_id, results): | ||||||
| raise HTTPException(status_code=404, detail="File not found") | ||||||
|
||||||
| raise HTTPException(status_code=404, detail="File not found") | |
| raise HTTPException(status_code=404, detail="Scan results not found") |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,58 @@ | ||
| """ | ||
| Tests for API user identity extraction helper. | ||
|
|
||
| Security regression coverage: | ||
| - Identity must come from authenticated request state only. | ||
| - X-User-Id header must never be trusted. | ||
| """ | ||
|
Comment on lines
+1
to
+7
|
||
|
|
||
| from types import SimpleNamespace | ||
|
|
||
| from extension_shield.api.auth_identity import can_view_private_scan, get_user_id | ||
|
|
||
|
|
||
| def test_get_user_id_prefers_authenticated_state_user_id(): | ||
| request = SimpleNamespace( | ||
| state=SimpleNamespace(user_id="real-user-123"), | ||
| headers={"X-User-Id": "attacker-id"}, | ||
| ) | ||
|
|
||
| assert get_user_id(request) == "real-user-123" | ||
|
|
||
|
|
||
| def test_get_user_id_ignores_x_user_id_header_when_not_authenticated(): | ||
| request = SimpleNamespace( | ||
| state=SimpleNamespace(user_id=None), | ||
| headers={"X-User-Id": "attacker-id"}, | ||
| ) | ||
|
|
||
| assert get_user_id(request) == "anon" | ||
|
|
||
|
|
||
| def test_get_user_id_returns_anon_without_authenticated_user(): | ||
| request = SimpleNamespace( | ||
| state=SimpleNamespace(user_id=None), | ||
| headers={}, | ||
| ) | ||
|
|
||
| assert get_user_id(request) == "anon" | ||
|
|
||
|
|
||
| def test_can_view_private_scan_allows_public_results(): | ||
| result = {"visibility": "public", "user_id": "owner-123"} | ||
|
|
||
| assert can_view_private_scan(None, result) | ||
| assert can_view_private_scan("any-user", result) | ||
|
|
||
|
|
||
| def test_can_view_private_scan_blocks_non_owner_for_private_result(): | ||
| result = {"visibility": "private", "user_id": "owner-123"} | ||
|
|
||
| assert not can_view_private_scan(None, result) | ||
| assert not can_view_private_scan("other-user", result) | ||
|
|
||
|
|
||
| def test_can_view_private_scan_allows_owner_for_private_result(): | ||
| result = {"visibility": "private", "user_id": "owner-123"} | ||
|
|
||
| assert can_view_private_scan("owner-123", result) | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The 404 raised on private-scan authorization failure uses
detail="File not found", but this endpoint returns a file list (not a single file). Consider using a more accurate generic detail (e.g., "Not found" / "Scan results not found") to avoid confusing clients while still not leaking existence.