Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 90 additions & 0 deletions app/ledger_api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
from __future__ import annotations

import json
import re
from typing import Annotated, Any

from fastapi import APIRouter, FastAPI, HTTPException, Query
from sqlalchemy import select
from sqlalchemy.orm import Session

from app.db import session_scope
from app.models import LedgerEntry, Proof
from app.serializers import ledger_to_dict

SQLITE_INTEGER_MAX = 2**63 - 1
PROOF_HASH_RE = re.compile(r"^[0-9a-f]{64}$")


def register_ledger_api_routes(app: FastAPI | APIRouter, *, database_url: str) -> None:
@app.get("/api/v1/ledger")
def api_ledger(limit: Annotated[int, Query(ge=1, le=200)] = 50) -> list[dict[str, Any]]:
return ledger_entries(database_url, limit)

@app.get("/api/v1/ledger/{sequence}")
def api_ledger_entry(sequence: int) -> dict[str, Any]:
return ledger_entry(database_url, sequence)

@app.get("/api/v1/proofs/{proof_hash}")
def api_proof(proof_hash: str) -> dict[str, Any]:
return proof_payload(database_url, proof_hash)


def ledger_entries(database_url: str, limit: int = 50) -> list[dict[str, Any]]:
with session_scope(database_url) as session:
entries = session.scalars(
select(LedgerEntry).order_by(LedgerEntry.sequence.desc()).limit(limit)
).all()
proofs = proof_hashes_by_sequence(session, [entry.sequence for entry in entries])
return [ledger_to_dict(entry, proofs.get(entry.sequence)) for entry in entries]


def ledger_entry(database_url: str, sequence: int) -> dict[str, Any]:
sequence = positive_ledger_sequence(sequence)
with session_scope(database_url) as session:
entry = session.get(LedgerEntry, sequence)
if entry is None:
raise HTTPException(status_code=404, detail="ledger entry not found")
proof = session.scalar(select(Proof).where(Proof.ledger_sequence == sequence).limit(1))
return ledger_to_dict(entry, proof.hash if proof else None)


def proof_payload(database_url: str, proof_hash: str) -> dict[str, Any]:
proof_hash = proof_hash_from_path(proof_hash)
with session_scope(database_url) as session:
proof = session.get(Proof, proof_hash)
if proof is None:
raise HTTPException(status_code=404, detail="proof not found")
try:
data = json.loads(proof.public_json)
except json.JSONDecodeError as exc:
raise HTTPException(status_code=500, detail="invalid proof payload") from exc
if not isinstance(data, dict):
raise HTTPException(status_code=500, detail="invalid proof payload")
return data


def proof_hashes_by_sequence(session: Session, sequences: list[int]) -> dict[int, str]:
if not sequences:
return {}
rows = session.execute(
select(Proof.ledger_sequence, Proof.hash).where(Proof.ledger_sequence.in_(sequences))
).all()
return {int(sequence): str(proof_hash) for sequence, proof_hash in rows}


def positive_ledger_sequence(sequence: int) -> int:
if sequence <= 0:
raise HTTPException(status_code=400, detail="ledger sequence must be positive")
if sequence > SQLITE_INTEGER_MAX:
raise HTTPException(status_code=400, detail="ledger sequence is too large")
return sequence


def proof_hash_from_path(proof_hash: str) -> str:
if proof_hash != proof_hash.strip():
raise HTTPException(status_code=400, detail="proof hash must be 64 hex characters")
clean = proof_hash.lower()
if not PROOF_HASH_RE.fullmatch(clean):
raise HTTPException(status_code=400, detail="proof hash must be 64 hex characters")
return clean
88 changes: 24 additions & 64 deletions app/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from datetime import UTC, datetime, timedelta
from pathlib import Path
from typing import Annotated, Any
from urllib.parse import urlencode, urlsplit, urlunsplit
from urllib.parse import unquote, urlencode, urlsplit, urlunsplit

import httpx
from fastapi import Depends, FastAPI, Form, HTTPException, Query, Request
Expand Down Expand Up @@ -48,6 +48,14 @@
submit_wallet_transfer,
validate_public_url,
)
from app.ledger_api import (
ledger_entries,
ledger_entry,
proof_hash_from_path,
proof_hashes_by_sequence,
proof_payload,
register_ledger_api_routes,
)
from app.mcp import handle_mcp_request
from app.models import (
Account,
Expand Down Expand Up @@ -97,7 +105,6 @@
"X-Frame-Options": "DENY",
}
GITHUB_LOGIN_RE = re.compile(r"^[a-z0-9](?:[a-z0-9-]{0,37}[a-z0-9])?$")
HEX_HASH_RE = re.compile(r"^[0-9a-f]{64}$")
API_DOCS_CSP = (
"default-src 'self'; "
"base-uri 'self'; "
Expand Down Expand Up @@ -260,15 +267,6 @@ def _is_ltc_lab_host(request: Request) -> bool:
return _host_without_port(request) in {"ltclab.site", "www.ltclab.site"}


def _proof_hashes_by_sequence(session: Session, sequences: list[int]) -> dict[int, str]:
if not sequences:
return {}
rows = session.execute(
select(Proof.ledger_sequence, Proof.hash).where(Proof.ledger_sequence.in_(sequences))
).all()
return {int(sequence): str(proof_hash) for sequence, proof_hash in rows}


def _oauth_configured(settings: Settings) -> bool:
return bool(
settings.github_oauth_client_id
Expand All @@ -278,13 +276,17 @@ def _oauth_configured(settings: Settings) -> bool:


def _safe_next_path(next_path: str | None) -> str:
decoded_next_path = unquote(next_path) if next_path else ""
if (
not next_path
or not next_path.startswith("/")
or next_path.startswith("//")
or len(next_path) > 2048
or "\\" in next_path
or decoded_next_path.startswith("//")
or "\\" in decoded_next_path
or any(ord(char) < 32 or 127 <= ord(char) < 160 for char in next_path)
or any(ord(char) < 32 or 127 <= ord(char) < 160 for char in decoded_next_path)
):
return "/me"
return next_path
Expand Down Expand Up @@ -344,30 +346,13 @@ def _positive_bounty_id(bounty_id: int) -> int:
return bounty_id


def _positive_ledger_sequence(sequence: int) -> int:
if sequence <= 0:
raise HTTPException(status_code=400, detail="ledger sequence must be positive")
if sequence > SQLITE_INTEGER_MAX:
raise HTTPException(status_code=400, detail="ledger sequence is too large")
return sequence


def _normalized_wallet_address(address: str) -> str:
try:
return normalize_wallet_address(address)
except WalletError as exc:
raise HTTPException(status_code=400, detail=str(exc)) from exc


def _proof_hash_from_path(proof_hash: str) -> str:
if proof_hash != proof_hash.strip():
raise HTTPException(status_code=400, detail="proof hash must be 64 hex characters")
clean = proof_hash.lower()
if not HEX_HASH_RE.fullmatch(clean):
raise HTTPException(status_code=400, detail="proof hash must be 64 hex characters")
return clean


def _signed_value(value: str, secret: str) -> str:
timestamp = str(int(time.time()))
body = f"{value}|{timestamp}"
Expand Down Expand Up @@ -1058,36 +1043,7 @@ async def api_submit_transfer(request: Request) -> dict[str, Any]:
raise HTTPException(status_code=400, detail=str(exc)) from exc
return wallet_transfer_to_dict(transfer)

@app.get("/api/v1/ledger")
def api_ledger(limit: Annotated[int, Query(ge=1, le=200)] = 50) -> list[dict[str, Any]]:
with session_scope(db_url) as session:
entries = session.scalars(
select(LedgerEntry).order_by(LedgerEntry.sequence.desc()).limit(limit)
).all()
proofs = _proof_hashes_by_sequence(session, [entry.sequence for entry in entries])
return [ledger_to_dict(entry, proofs.get(entry.sequence)) for entry in entries]

@app.get("/api/v1/ledger/{sequence}")
def api_ledger_entry(sequence: int) -> dict[str, Any]:
sequence = _positive_ledger_sequence(sequence)
with session_scope(db_url) as session:
entry = session.get(LedgerEntry, sequence)
if entry is None:
raise HTTPException(status_code=404, detail="ledger entry not found")
proof = session.scalar(select(Proof).where(Proof.ledger_sequence == sequence).limit(1))
return ledger_to_dict(entry, proof.hash if proof else None)

@app.get("/api/v1/proofs/{proof_hash}")
def api_proof(proof_hash: str) -> dict[str, Any]:
proof_hash = _proof_hash_from_path(proof_hash)
with session_scope(db_url) as session:
proof = session.get(Proof, proof_hash)
if proof is None:
raise HTTPException(status_code=404, detail="proof not found")
data = json.loads(proof.public_json)
if not isinstance(data, dict):
raise HTTPException(status_code=500, detail="invalid proof payload")
return data
register_ledger_api_routes(app, database_url=db_url)

@app.get("/api/v1/activity")
def api_activity(q: str | None = Query(None)) -> dict[str, Any]:
Expand Down Expand Up @@ -1179,12 +1135,14 @@ def bounty_page(request: Request, bounty_id: int) -> HTMLResponse:

@app.get("/ledger", response_class=HTMLResponse)
def ledger_page(request: Request) -> HTMLResponse:
return templates.TemplateResponse(request, "ledger.html", {"entries": api_ledger()})
return templates.TemplateResponse(
request, "ledger.html", {"entries": ledger_entries(db_url)}
)

@app.get("/ledger/{sequence}", response_class=HTMLResponse)
def ledger_entry_page(request: Request, sequence: int) -> HTMLResponse:
return templates.TemplateResponse(
request, "ledger_entry.html", {"entry": api_ledger_entry(sequence)}
request, "ledger_entry.html", {"entry": ledger_entry(db_url, sequence)}
)

@app.get("/activity", response_class=HTMLResponse)
Expand All @@ -1203,7 +1161,7 @@ def account_page(request: Request, account: str) -> HTMLResponse:
.order_by(LedgerEntry.sequence.desc())
.limit(100)
).all()
proofs = _proof_hashes_by_sequence(session, [entry.sequence for entry in entries])
proofs = proof_hashes_by_sequence(session, [entry.sequence for entry in entries])
transactions = [ledger_to_dict(entry, proofs.get(entry.sequence)) for entry in entries]
accepted_work = safe_accepted_work_for_account(session, account)
return templates.TemplateResponse(
Expand Down Expand Up @@ -1245,7 +1203,7 @@ def wallet_page(request: Request, address: str) -> HTMLResponse:
.order_by(LedgerEntry.sequence.desc())
.limit(100)
).all()
proofs = _proof_hashes_by_sequence(session, [entry.sequence for entry in entries])
proofs = proof_hashes_by_sequence(session, [entry.sequence for entry in entries])
transactions = [ledger_to_dict(entry, proofs.get(entry.sequence)) for entry in entries]
return templates.TemplateResponse(
request,
Expand All @@ -1260,7 +1218,9 @@ def transfer_page(request: Request) -> HTMLResponse:
@app.get("/proofs/{proof_hash}", response_class=HTMLResponse)
def proof_page(request: Request, proof_hash: str) -> HTMLResponse:
return templates.TemplateResponse(
request, "proof.html", {"proof": api_proof(proof_hash), "proof_hash": proof_hash}
request,
"proof.html",
{"proof": proof_payload(db_url, proof_hash), "proof_hash": proof_hash},
)

@app.get("/docs", response_class=HTMLResponse)
Expand Down Expand Up @@ -1735,7 +1695,7 @@ def optional_bool_arg(field: str, default: bool = False) -> bool:
)
return json.dumps(ledger_to_dict(entry, proof.hash if proof else None))
if name == "get_proof":
proof = session.get(Proof, _proof_hash_from_path(str_arg("hash")))
proof = session.get(Proof, proof_hash_from_path(str_arg("hash")))
if proof is None:
return "proof not found"
public_payload = json.loads(proof.public_json)
Expand Down
111 changes: 111 additions & 0 deletions tests/test_ledger_api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
from __future__ import annotations

import pytest
from fastapi import HTTPException

from app.db import create_schema, session_scope
from app.ledger.service import create_bounty, ensure_genesis, pay_bounty
from app.ledger_api import (
ledger_entries,
ledger_entry,
positive_ledger_sequence,
proof_hash_from_path,
proof_payload,
)
from app.models import Proof


def test_ledger_api_helpers_shape_ledger_and_proof_payloads(sqlite_url: str) -> None:
create_schema(sqlite_url)
with session_scope(sqlite_url) as session:
ensure_genesis(session)
bounty = create_bounty(
session,
repo="ramimbo/mergework",
issue_number=320,
issue_url="https://github.com/ramimbo/mergework/issues/320",
title="Ledger API route extraction",
reward_mrwk="25",
acceptance="Ledger API helpers should be independently testable.",
)
proof = pay_bounty(
session,
bounty_id=bounty.id,
to_account="github:alice",
submission_url="https://github.com/ramimbo/mergework/pull/320",
accepted_by="maintainer",
verifier_result={"label": "mrwk:accepted"},
)
proof_sequence = proof.ledger_sequence
proof_hash = proof.hash

entries = ledger_entries(sqlite_url, 10)
entry = ledger_entry(sqlite_url, proof_sequence)
payload = proof_payload(sqlite_url, proof_hash)

assert entries[0]["sequence"] == proof_sequence
assert entries[0]["proof_hash"] == proof_hash
assert entry["proof_hash"] == proof_hash
assert payload["kind"] == "bounty_payment"
assert payload["submission_url"] == "https://github.com/ramimbo/mergework/pull/320"


def test_ledger_api_helpers_reject_malformed_path_values(sqlite_url: str) -> None:
assert positive_ledger_sequence(1) == 1
assert proof_hash_from_path("A" * 64) == "a" * 64

for sequence, detail in (
(0, "ledger sequence must be positive"),
(2**63, "ledger sequence is too large"),
):
with pytest.raises(HTTPException) as exc_info:
positive_ledger_sequence(sequence)
assert exc_info.value.status_code == 400
assert exc_info.value.detail == detail

for proof_hash in (" " + ("a" * 64), "not-a-proof-hash", "g" * 64):
with pytest.raises(HTTPException) as exc_info:
proof_hash_from_path(proof_hash)
assert exc_info.value.status_code == 400
assert exc_info.value.detail == "proof hash must be 64 hex characters"


def test_proof_payload_reports_missing_or_invalid_payloads(sqlite_url: str) -> None:
create_schema(sqlite_url)

with pytest.raises(HTTPException) as missing_info:
proof_payload(sqlite_url, "a" * 64)
assert missing_info.value.status_code == 404
assert missing_info.value.detail == "proof not found"

with session_scope(sqlite_url) as session:
ensure_genesis(session)
bounty = create_bounty(
session,
repo="ramimbo/mergework",
issue_number=321,
issue_url="https://github.com/ramimbo/mergework/issues/321",
title="Invalid proof payload",
reward_mrwk="25",
acceptance="Ledger API proof payload errors should be stable.",
)
proof = pay_bounty(
session,
bounty_id=bounty.id,
to_account="github:alice",
submission_url="https://github.com/ramimbo/mergework/pull/321",
accepted_by="maintainer",
verifier_result={"label": "mrwk:accepted"},
)
proof_hash = proof.hash

for invalid_payload in ("{", "[]"):
with session_scope(sqlite_url) as session:
proof_row = session.get(Proof, proof_hash)
assert proof_row is not None
proof_row.public_json = invalid_payload

with pytest.raises(HTTPException) as invalid_info:
proof_payload(sqlite_url, proof_hash)
assert invalid_info.value.status_code == 500
assert invalid_info.value.detail == "invalid proof payload"
Loading