Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ services:
dockerfile: Dockerfile
container_name: extension-shield
ports:
- "8007:8007"
- "127.0.0.1:8007:8007" # Localhost only — not accessible from the local network
volumes:
# Persist downloaded extensions
- ./extensions_storage:/app/extensions_storage
Expand Down
2 changes: 2 additions & 0 deletions docs/GET_STARTED.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ cp .env.example .env
docker compose up --build
# → API at http://localhost:8007
```

> **Network note:** The Docker config binds to `127.0.0.1:8007` (localhost only). This means the API is not reachable from other devices on your network. If you need external access (e.g. a shared dev server), change the binding to `0.0.0.0:8007:8007` in `docker-compose.yml` and ensure your firewall rules are appropriate. For production, place the API behind a reverse proxy (nginx, Caddy) rather than exposing it directly.
</details>

---
Expand Down
42 changes: 33 additions & 9 deletions src/extension_shield/api/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,9 @@ async def add_security_headers(request: Request, call_next):
ANONYMOUS_DAILY_DEEP_SCAN_LIMIT = 1 # anonymous (IP-based) users – after 1 scan, prompt login
# deep_scan_usage[user_id][YYYY-MM-DD] = used_count
deep_scan_usage: Dict[str, Dict[str, int]] = {}
# Lock to prevent race conditions when multiple concurrent requests check/increment the same counter
import threading as _threading
_deep_scan_usage_lock = _threading.Lock()


def _get_user_id(request: Request) -> str:
Expand Down Expand Up @@ -541,13 +544,14 @@ def _deep_scan_limit_status(rate_limit_key: str) -> Dict[str, Any]:


def _consume_deep_scan(user_id: str) -> Dict[str, Any]:
status = _deep_scan_limit_status(user_id)
if status["remaining"] <= 0:
return status
day_key = status["day_key"]
deep_scan_usage.setdefault(user_id, {})
deep_scan_usage[user_id][day_key] = deep_scan_usage[user_id].get(day_key, 0) + 1
return _deep_scan_limit_status(user_id)
with _deep_scan_usage_lock:
status = _deep_scan_limit_status(user_id)
if status["remaining"] <= 0:
return status
day_key = status["day_key"]
deep_scan_usage.setdefault(user_id, {})
deep_scan_usage[user_id][day_key] = deep_scan_usage[user_id].get(day_key, 0) + 1
return _deep_scan_limit_status(user_id)


def _has_cached_results(extension_id: str) -> bool:
Expand Down Expand Up @@ -3080,13 +3084,33 @@ async def get_file_content(extension_id: str, file_path: str, http_request: Requ
# Construct full file path
full_path = os.path.join(extracted_path, file_path)

# Security check: ensure path is within extracted directory
if not os.path.abspath(full_path).startswith(os.path.abspath(extracted_path)):
# Security check: use commonpath to prevent path traversal bypasses.
# os.path.abspath(...).startswith(...) is vulnerable when one path is a
# prefix of another directory name (e.g. /tmp/ext_abc vs /tmp/ext_abcdef).
abs_full = os.path.abspath(full_path)
abs_extracted = os.path.abspath(extracted_path)
try:
if os.path.commonpath([abs_extracted, abs_full]) != abs_extracted:
raise HTTPException(status_code=403, detail="Access denied")
except ValueError:
# commonpath raises ValueError on Windows when paths are on different drives
raise HTTPException(status_code=403, detail="Access denied")

if not os.path.exists(full_path):
raise HTTPException(status_code=404, detail="File not found")

# Guard against reading very large files into memory (e.g. bundled assets).
_MAX_FILE_READ_BYTES = 5 * 1024 * 1024 # 5 MB
try:
file_size = os.path.getsize(full_path)
except OSError:
file_size = 0
if file_size > _MAX_FILE_READ_BYTES:
raise HTTPException(
status_code=413,
detail=f"File too large to display ({file_size // 1024} KB). Maximum is {_MAX_FILE_READ_BYTES // 1024} KB.",
)

try:
with open(full_path, "r", encoding="utf-8") as f:
content = f.read()
Expand Down
9 changes: 9 additions & 0 deletions src/extension_shield/core/manifest_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,15 @@ def parse(self) -> Optional[Dict[str, Any]]:

logger.info("Parsing manifest from: %s", manifest_path)

# Guard against malformed or malicious manifests that are unreasonably large.
_MAX_MANIFEST_BYTES = 512 * 1024 # 512 KB
manifest_size = manifest_path.stat().st_size
if manifest_size > _MAX_MANIFEST_BYTES:
raise ValueError(
f"manifest.json is too large ({manifest_size} bytes). "
f"Maximum allowed size is {_MAX_MANIFEST_BYTES} bytes."
)

try:
with open(manifest_path, "r", encoding="utf-8") as f:
raw_manifest = json.load(f)
Expand Down
Loading