diff --git a/docker-compose.yml b/docker-compose.yml index 0a30cc2..e0cb150 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -8,7 +8,7 @@ services: dockerfile: Dockerfile container_name: extension-shield ports: - - "8007:8007" + - "127.0.0.1:8007:8007" # Localhost only — not accessible from the local network volumes: # Persist downloaded extensions - ./extensions_storage:/app/extensions_storage diff --git a/docs/GET_STARTED.md b/docs/GET_STARTED.md index 5ac8c13..86a788c 100644 --- a/docs/GET_STARTED.md +++ b/docs/GET_STARTED.md @@ -87,6 +87,8 @@ cp .env.example .env docker compose up --build # → API at http://localhost:8007 ``` + +> **Network note:** The Docker config binds to `127.0.0.1:8007` (localhost only). This means the API is not reachable from other devices on your network. If you need external access (e.g. a shared dev server), change the binding to `0.0.0.0:8007:8007` in `docker-compose.yml` and ensure your firewall rules are appropriate. For production, place the API behind a reverse proxy (nginx, Caddy) rather than exposing it directly. --- diff --git a/src/extension_shield/api/main.py b/src/extension_shield/api/main.py index cbcd50c..be55b28 100644 --- a/src/extension_shield/api/main.py +++ b/src/extension_shield/api/main.py @@ -389,6 +389,9 @@ async def add_security_headers(request: Request, call_next): ANONYMOUS_DAILY_DEEP_SCAN_LIMIT = 1 # anonymous (IP-based) users – after 1 scan, prompt login # deep_scan_usage[user_id][YYYY-MM-DD] = used_count deep_scan_usage: Dict[str, Dict[str, int]] = {} +# Lock to prevent race conditions when multiple concurrent requests check/increment the same counter +import threading as _threading +_deep_scan_usage_lock = _threading.Lock() def _get_user_id(request: Request) -> str: @@ -541,13 +544,14 @@ def _deep_scan_limit_status(rate_limit_key: str) -> Dict[str, Any]: def _consume_deep_scan(user_id: str) -> Dict[str, Any]: - status = _deep_scan_limit_status(user_id) - if status["remaining"] <= 0: - return status - day_key = status["day_key"] - deep_scan_usage.setdefault(user_id, {}) - deep_scan_usage[user_id][day_key] = deep_scan_usage[user_id].get(day_key, 0) + 1 - return _deep_scan_limit_status(user_id) + with _deep_scan_usage_lock: + status = _deep_scan_limit_status(user_id) + if status["remaining"] <= 0: + return status + day_key = status["day_key"] + deep_scan_usage.setdefault(user_id, {}) + deep_scan_usage[user_id][day_key] = deep_scan_usage[user_id].get(day_key, 0) + 1 + return _deep_scan_limit_status(user_id) def _has_cached_results(extension_id: str) -> bool: @@ -3080,13 +3084,33 @@ async def get_file_content(extension_id: str, file_path: str, http_request: Requ # Construct full file path full_path = os.path.join(extracted_path, file_path) - # Security check: ensure path is within extracted directory - if not os.path.abspath(full_path).startswith(os.path.abspath(extracted_path)): + # Security check: use commonpath to prevent path traversal bypasses. + # os.path.abspath(...).startswith(...) is vulnerable when one path is a + # prefix of another directory name (e.g. /tmp/ext_abc vs /tmp/ext_abcdef). + abs_full = os.path.abspath(full_path) + abs_extracted = os.path.abspath(extracted_path) + try: + if os.path.commonpath([abs_extracted, abs_full]) != abs_extracted: + raise HTTPException(status_code=403, detail="Access denied") + except ValueError: + # commonpath raises ValueError on Windows when paths are on different drives raise HTTPException(status_code=403, detail="Access denied") if not os.path.exists(full_path): raise HTTPException(status_code=404, detail="File not found") + # Guard against reading very large files into memory (e.g. bundled assets). + _MAX_FILE_READ_BYTES = 5 * 1024 * 1024 # 5 MB + try: + file_size = os.path.getsize(full_path) + except OSError: + file_size = 0 + if file_size > _MAX_FILE_READ_BYTES: + raise HTTPException( + status_code=413, + detail=f"File too large to display ({file_size // 1024} KB). Maximum is {_MAX_FILE_READ_BYTES // 1024} KB.", + ) + try: with open(full_path, "r", encoding="utf-8") as f: content = f.read() diff --git a/src/extension_shield/core/manifest_parser.py b/src/extension_shield/core/manifest_parser.py index e8dc005..0403368 100644 --- a/src/extension_shield/core/manifest_parser.py +++ b/src/extension_shield/core/manifest_parser.py @@ -308,6 +308,15 @@ def parse(self) -> Optional[Dict[str, Any]]: logger.info("Parsing manifest from: %s", manifest_path) + # Guard against malformed or malicious manifests that are unreasonably large. + _MAX_MANIFEST_BYTES = 512 * 1024 # 512 KB + manifest_size = manifest_path.stat().st_size + if manifest_size > _MAX_MANIFEST_BYTES: + raise ValueError( + f"manifest.json is too large ({manifest_size} bytes). " + f"Maximum allowed size is {_MAX_MANIFEST_BYTES} bytes." + ) + try: with open(manifest_path, "r", encoding="utf-8") as f: raw_manifest = json.load(f)