Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 8 additions & 77 deletions app/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@
from datetime import UTC, datetime, timedelta
from pathlib import Path
from typing import Annotated, Any
from urllib.parse import urlencode, urlsplit, urlunsplit
from urllib.parse import unquote, urlencode

import httpx
from fastapi import Depends, FastAPI, Form, HTTPException, Query, Request
from fastapi.responses import HTMLResponse, JSONResponse, RedirectResponse, Response
from fastapi.responses import HTMLResponse, JSONResponse, RedirectResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from sqlalchemy import func, or_, select, update
Expand Down Expand Up @@ -58,6 +58,7 @@
Submission,
Wallet,
)
from app.security_headers import register_security_headers_middleware
from app.serializers import (
accepted_work_for_account,
account_accepted_summary,
Expand All @@ -79,66 +80,14 @@
templates = Jinja2Templates(directory=str(BASE_DIR / "templates"))
templates.env.globals["safe_public_url"] = public_url_or_none

SECURITY_HEADERS = {
"Content-Security-Policy": (
"default-src 'self'; "
"base-uri 'self'; "
"frame-ancestors 'none'; "
"form-action 'self'; "
"connect-src 'self'; "
"img-src 'self' data:; "
"object-src 'none'; "
"script-src 'self'; "
"style-src 'self'"
),
"Referrer-Policy": "no-referrer",
"Strict-Transport-Security": "max-age=31536000; includeSubDomains",
"X-Content-Type-Options": "nosniff",
"X-Frame-Options": "DENY",
}
GITHUB_LOGIN_RE = re.compile(r"^[a-z0-9](?:[a-z0-9-]{0,37}[a-z0-9])?$")
HEX_HASH_RE = re.compile(r"^[0-9a-f]{64}$")
API_DOCS_CSP = (
"default-src 'self'; "
"base-uri 'self'; "
"frame-ancestors 'none'; "
"form-action 'self'; "
"connect-src 'self'; "
"font-src 'self' data: https://fonts.gstatic.com; "
"img-src 'self' data: https://fastapi.tiangolo.com https://cdn.redoc.ly; "
"object-src 'none'; "
"script-src 'self' 'unsafe-inline' https://cdn.jsdelivr.net; "
"style-src 'self' 'unsafe-inline' https://cdn.jsdelivr.net https://fonts.googleapis.com; "
"worker-src 'self' blob:"
)
API_DOCS_PATHS = {"/api/docs", "/api/redoc"}
SQLITE_INTEGER_MAX = 2**63 - 1
DEFAULT_ATTEMPT_TTL_SECONDS = 24 * 60 * 60
MIN_ATTEMPT_TTL_SECONDS = 60
MAX_ATTEMPT_TTL_SECONDS = 7 * 24 * 60 * 60


def _request_was_forwarded_https(request: Request) -> bool:
forwarded_proto = request.headers.get("x-forwarded-proto", "")
if forwarded_proto:
return forwarded_proto.split(",", 1)[0].strip().lower() == "https"
return request.url.scheme == "https"


def _preserve_forwarded_https_redirect(request: Request, response: Response) -> None:
if response.status_code not in {307, 308} or not _request_was_forwarded_https(request):
return
location = response.headers.get("location")
if not location:
return
parsed = urlsplit(location)
if parsed.scheme != "http" or parsed.netloc != request.url.netloc:
return
response.headers["location"] = urlunsplit(
("https", parsed.netloc, parsed.path, parsed.query, parsed.fragment)
)


def _issue_number_search_value(query: str) -> int | None:
if not query.isdigit():
return None
Expand Down Expand Up @@ -278,13 +227,17 @@ def _oauth_configured(settings: Settings) -> bool:


def _safe_next_path(next_path: str | None) -> str:
decoded_next_path = unquote(next_path) if next_path else ""
if (
not next_path
or not next_path.startswith("/")
or next_path.startswith("//")
or len(next_path) > 2048
or "\\" in next_path
or decoded_next_path.startswith("//")
or "\\" in decoded_next_path
or any(ord(char) < 32 or 127 <= ord(char) < 160 for char in next_path)
or any(ord(char) < 32 or 127 <= ord(char) < 160 for char in decoded_next_path)
):
return "/me"
return next_path
Expand Down Expand Up @@ -486,29 +439,7 @@ def post_only_route() -> None:
headers={"Allow": "POST"},
)

@app.middleware("http")
async def add_security_headers(request: Request, call_next: Any) -> Any:
original_method = request.scope["method"]
if original_method == "HEAD":
request.scope["method"] = "GET"
try:
response = await call_next(request)
finally:
request.scope["method"] = original_method
if original_method == "HEAD":
headers = dict(response.headers)
headers["content-length"] = "0"
response = Response(
status_code=response.status_code,
headers=headers,
media_type=response.media_type,
)
if request.url.path in API_DOCS_PATHS:
response.headers["Content-Security-Policy"] = API_DOCS_CSP
_preserve_forwarded_https_redirect(request, response)
for name, value in SECURITY_HEADERS.items():
response.headers.setdefault(name, value)
return response
register_security_headers_middleware(app)

static_dir = BASE_DIR / "static"
if static_dir.exists():
Expand Down
94 changes: 94 additions & 0 deletions app/security_headers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
from __future__ import annotations

from typing import Any
from urllib.parse import urlsplit, urlunsplit

from fastapi import FastAPI, Request
from fastapi.responses import Response

SECURITY_HEADERS = {
"Content-Security-Policy": (
"default-src 'self'; "
"base-uri 'self'; "
"frame-ancestors 'none'; "
"form-action 'self'; "
"connect-src 'self'; "
"img-src 'self' data:; "
"object-src 'none'; "
"script-src 'self'; "
"style-src 'self'"
),
"Referrer-Policy": "no-referrer",
"Strict-Transport-Security": "max-age=31536000; includeSubDomains",
"X-Content-Type-Options": "nosniff",
"X-Frame-Options": "DENY",
}
API_DOCS_CSP = (
"default-src 'self'; "
"base-uri 'self'; "
"frame-ancestors 'none'; "
"form-action 'self'; "
"connect-src 'self'; "
"font-src 'self' data: https://fonts.gstatic.com; "
"img-src 'self' data: https://fastapi.tiangolo.com https://cdn.redoc.ly; "
"object-src 'none'; "
"script-src 'self' 'unsafe-inline' https://cdn.jsdelivr.net; "
"style-src 'self' 'unsafe-inline' https://cdn.jsdelivr.net https://fonts.googleapis.com; "
"worker-src 'self' blob:"
)
API_DOCS_PATHS = {"/api/docs", "/api/redoc"}


def request_was_forwarded_https(request: Request) -> bool:
forwarded_proto = request.headers.get("x-forwarded-proto", "")
if forwarded_proto:
return forwarded_proto.split(",", 1)[0].strip().lower() == "https"
return request.url.scheme == "https"


def preserve_forwarded_https_redirect(request: Request, response: Response) -> None:
if response.status_code not in {307, 308} or not request_was_forwarded_https(request):
return
location = response.headers.get("location")
if not location:
return
parsed = urlsplit(location)
if parsed.scheme != "http" or parsed.netloc != request.url.netloc:
return
response.headers["location"] = urlunsplit(
("https", parsed.netloc, parsed.path, parsed.query, parsed.fragment)
)


def apply_security_headers(request: Request, response: Response) -> Response:
if request.url.path in API_DOCS_PATHS:
response.headers["Content-Security-Policy"] = API_DOCS_CSP
preserve_forwarded_https_redirect(request, response)
for name, value in SECURITY_HEADERS.items():
response.headers.setdefault(name, value)
return response


async def security_headers_middleware(request: Request, call_next: Any) -> Response:
original_method = request.scope["method"]
if original_method == "HEAD":
request.scope["method"] = "GET"
try:
response = await call_next(request)
finally:
request.scope["method"] = original_method
if original_method == "HEAD":
headers = dict(response.headers)
headers["content-length"] = "0"
response = Response(
status_code=response.status_code,
headers=headers,
media_type=response.media_type,
)
Comment on lines +80 to +87
return apply_security_headers(request, response)


def register_security_headers_middleware(app: FastAPI) -> None:
@app.middleware("http")
async def add_security_headers(request: Request, call_next: Any) -> Response:
return await security_headers_middleware(request, call_next)
77 changes: 77 additions & 0 deletions tests/test_security_headers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
from __future__ import annotations

from fastapi.testclient import TestClient

from app.db import create_schema, session_scope
from app.ledger.service import create_bounty, ensure_genesis
from app.main import create_app
from app.security_headers import API_DOCS_CSP, SECURITY_HEADERS


def test_security_header_defaults_are_applied_to_browser_routes(sqlite_url: str) -> None:
client = TestClient(create_app(database_url=sqlite_url, webhook_secret="secret"))

response = client.get("/")

for name, value in SECURITY_HEADERS.items():
assert response.headers[name.lower()] == value


def test_security_header_middleware_preserves_head_as_get(sqlite_url: str) -> None:
create_schema(sqlite_url)
with session_scope(sqlite_url) as session:
ensure_genesis(session)
create_bounty(
session,
repo="ramimbo/mergework",
issue_number=320,
issue_url="https://github.com/ramimbo/mergework/issues/320",
title="Security header middleware",
reward_mrwk="25",
acceptance="HEAD requests should keep GET route semantics without a body.",
)
client = TestClient(create_app(database_url=sqlite_url, webhook_secret="secret"))
Comment on lines +21 to +33

response = client.head("/api/v1/bounties")

assert response.status_code == 200
assert response.content == b""
assert response.headers["content-length"] == "0"
assert response.headers["x-frame-options"] == "DENY"


def test_api_docs_use_relaxed_docs_csp(sqlite_url: str) -> None:
client = TestClient(create_app(database_url=sqlite_url, webhook_secret="secret"))

response = client.get("/api/docs")

assert response.status_code == 200
assert response.headers["content-security-policy"] == API_DOCS_CSP


def test_forwarded_https_redirects_keep_https_scheme(sqlite_url: str) -> None:
create_schema(sqlite_url)
with session_scope(sqlite_url) as session:
ensure_genesis(session)
bounty = create_bounty(
session,
repo="ramimbo/mergework",
issue_number=321,
issue_url="https://github.com/ramimbo/mergework/issues/321",
title="Forwarded HTTPS redirect",
reward_mrwk="25",
acceptance="Trailing slash redirects should not downgrade public HTTPS requests.",
)
client = TestClient(
create_app(database_url=sqlite_url, webhook_secret="secret"),
base_url="http://mrwk.ltclab.site",
)
Comment on lines +53 to +68

response = client.get(
f"/bounties/{bounty.id}/",
headers={"x-forwarded-proto": "https"},
follow_redirects=False,
)

assert response.status_code == 307
assert response.headers["location"] == f"https://mrwk.ltclab.site/bounties/{bounty.id}"
Loading