diff --git a/app/main.py b/app/main.py index 7e7c4f9c..1611f9b8 100644 --- a/app/main.py +++ b/app/main.py @@ -9,7 +9,7 @@ from datetime import UTC, datetime, timedelta from pathlib import Path from typing import Annotated, Any -from urllib.parse import urlencode, urlsplit, urlunsplit +from urllib.parse import unquote, urlencode, urlsplit, urlunsplit import httpx from fastapi import Depends, FastAPI, Form, HTTPException, Query, Request @@ -73,7 +73,7 @@ wallet_transfer_to_dict, ) from app.wallets import WalletError, normalize_wallet_address -from app.webhooks.github import handle_github_webhook +from app.webhooks.routes import register_github_webhook_route BASE_DIR = Path(__file__).resolve().parent templates = Jinja2Templates(directory=str(BASE_DIR / "templates")) @@ -278,13 +278,17 @@ def _oauth_configured(settings: Settings) -> bool: def _safe_next_path(next_path: str | None) -> str: + decoded_next_path = unquote(next_path) if next_path else "" if ( not next_path or not next_path.startswith("/") or next_path.startswith("//") or len(next_path) > 2048 or "\\" in next_path + or decoded_next_path.startswith("//") + or "\\" in decoded_next_path or any(ord(char) < 32 or 127 <= ord(char) < 160 for char in next_path) + or any(ord(char) < 32 or 127 <= ord(char) < 160 for char in decoded_next_path) ): return "/me" return next_path @@ -1094,20 +1098,12 @@ def api_activity(q: str | None = Query(None)) -> dict[str, Any]: with session_scope(db_url) as session: return activity_to_dict(session, q) - @app.post("/webhooks/github") - async def github_webhook(request: Request) -> JSONResponse: - body = await request.body() - headers = {key: value for key, value in request.headers.items()} - normalized = { - "X-GitHub-Delivery": headers.get("x-github-delivery", ""), - "X-GitHub-Event": headers.get("x-github-event", ""), - "X-Hub-Signature-256": headers.get("x-hub-signature-256", ""), - } - result = handle_github_webhook( - db_url, normalized, body, secret, settings.github_accepted_labelers - ) - code = 401 if result["status"] == "unauthorized" else 200 - return JSONResponse(result, status_code=code) + register_github_webhook_route( + app, + database_url=db_url, + webhook_secret=secret, + accepted_labelers=settings.github_accepted_labelers, + ) @app.post("/mcp") async def mcp(request: Request) -> Any: diff --git a/app/webhooks/routes.py b/app/webhooks/routes.py new file mode 100644 index 00000000..1734a106 --- /dev/null +++ b/app/webhooks/routes.py @@ -0,0 +1,47 @@ +from __future__ import annotations + +from collections.abc import Mapping +from typing import Any + +from fastapi import FastAPI, Request +from fastapi.responses import JSONResponse + +from app.webhooks.github import handle_github_webhook + +GITHUB_WEBHOOK_HEADERS = { + "X-GitHub-Delivery": "x-github-delivery", + "X-GitHub-Event": "x-github-event", + "X-Hub-Signature-256": "x-hub-signature-256", +} + + +def github_webhook_headers(headers: Mapping[str, str]) -> dict[str, str]: + normalized = {key.lower(): value for key, value in headers.items()} + return { + canonical: normalized.get(header_name, "") + for canonical, header_name in GITHUB_WEBHOOK_HEADERS.items() + } + + +def github_webhook_status_code(result: Mapping[str, Any]) -> int: + return 401 if result.get("status") == "unauthorized" else 200 + + +def register_github_webhook_route( + app: FastAPI, + *, + database_url: str, + webhook_secret: str, + accepted_labelers: tuple[str, ...] = (), +) -> None: + @app.post("/webhooks/github") + async def github_webhook(request: Request) -> JSONResponse: + body = await request.body() + result = handle_github_webhook( + database_url, + github_webhook_headers(request.headers), + body, + webhook_secret, + accepted_labelers, + ) + return JSONResponse(result, status_code=github_webhook_status_code(result)) diff --git a/tests/test_webhook_routes.py b/tests/test_webhook_routes.py new file mode 100644 index 00000000..37d594b1 --- /dev/null +++ b/tests/test_webhook_routes.py @@ -0,0 +1,55 @@ +from __future__ import annotations + +import hashlib +import hmac + +from fastapi.testclient import TestClient + +from app.db import session_scope +from app.main import create_app +from app.models import WebhookEvent + + +def _signature(secret: str, body: bytes) -> str: + digest = hmac.new(secret.encode(), body, hashlib.sha256).hexdigest() + return f"sha256={digest}" + + +def test_github_webhook_route_normalizes_headers(sqlite_url: str) -> None: + client = TestClient(create_app(database_url=sqlite_url, webhook_secret="secret")) + body = b'{"action":"opened"}' + + response = client.post( + "/webhooks/github", + content=body, + headers={ + "X-GitHub-Delivery": "delivery-route-ignored", + "X-GitHub-Event": "issues", + "X-Hub-Signature-256": _signature("secret", body), + }, + ) + + assert response.status_code == 200 + assert response.json() == {"status": "ignored"} + with session_scope(sqlite_url) as session: + event = session.get(WebhookEvent, "delivery-route-ignored") + assert event is not None + assert event.event_type == "issues" + assert event.processed_status == "ignored" + + +def test_github_webhook_route_returns_401_for_bad_signature(sqlite_url: str) -> None: + client = TestClient(create_app(database_url=sqlite_url, webhook_secret="secret")) + + response = client.post( + "/webhooks/github", + content=b'{"action":"opened"}', + headers={ + "X-GitHub-Delivery": "delivery-bad-signature", + "X-GitHub-Event": "issues", + "X-Hub-Signature-256": "sha256=bad", + }, + ) + + assert response.status_code == 401 + assert response.json() == {"status": "unauthorized"}