Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
298 changes: 298 additions & 0 deletions app/bounty_attempts.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,298 @@
from __future__ import annotations

from collections.abc import Awaitable, Callable
from datetime import UTC, datetime, timedelta
from typing import Any

from fastapi import Depends, FastAPI, HTTPException, Query, Request
from fastapi.responses import JSONResponse
from sqlalchemy import func, select, update
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import Session

from app.db import session_scope
from app.ledger.service import LedgerError, validate_public_url
from app.models import Bounty, BountyAttempt

DEFAULT_ATTEMPT_TTL_SECONDS = 24 * 60 * 60
MIN_ATTEMPT_TTL_SECONDS = 60
MAX_ATTEMPT_TTL_SECONDS = 7 * 24 * 60 * 60

JsonObjectLoader = Callable[[Request], Awaitable[dict[str, Any]]]
LoginDependency = Callable[[Request], str]
RequiredString = Callable[[dict[str, Any], str], str]
OptionalInteger = Callable[[dict[str, Any], str, int], int]
NormalizeAccount = Callable[[str], str]
PositiveBountyId = Callable[[int], int]


def _utc_now() -> datetime:
return datetime.now(UTC)


def _as_utc(value: datetime) -> datetime:
if value.tzinfo is None:
return value.replace(tzinfo=UTC)
return value.astimezone(UTC)


def _attempt_effective_status(attempt: BountyAttempt, now: datetime) -> str:
if attempt.status == "active" and _as_utc(attempt.expires_at) <= now:
return "expired"
return attempt.status


async def _optional_json_object(request: Request, json_object: JsonObjectLoader) -> dict[str, Any]:
if not (await request.body()).strip():
return {}
return await json_object(request)


def bounty_attempt_to_dict(attempt: BountyAttempt, now: datetime | None = None) -> dict[str, Any]:
now = _as_utc(now or _utc_now())
return {
"id": attempt.id,
"bounty_id": attempt.bounty_id,
"submitter_account": attempt.submitter_account,
"source_url": attempt.source_url,
"status": _attempt_effective_status(attempt, now),
"expires_at": _as_utc(attempt.expires_at).isoformat(),
"created_at": _as_utc(attempt.created_at).isoformat(),
"updated_at": _as_utc(attempt.updated_at).isoformat(),
}


def _active_attempt_conditions(bounty_id: int, now: datetime) -> tuple[Any, ...]:
return (
BountyAttempt.bounty_id == bounty_id,
BountyAttempt.status == "active",
BountyAttempt.expires_at > now,
)


def bounty_attempt_warnings(session: Session, bounty: Bounty, now: datetime) -> list[str]:
warnings: list[str] = []
awards_remaining = max(0, bounty.max_awards - bounty.awards_paid)
if bounty.status != "open":
warnings.append(f"bounty is {bounty.status}")
awards_remaining = 0
if awards_remaining <= 0:
warnings.append("bounty has no award slots remaining")
active_count = session.scalar(
select(func.count())
.select_from(BountyAttempt)
.where(*_active_attempt_conditions(bounty.id, now))
)
if active_count and active_count > 1:
warnings.append(f"bounty has {active_count} active attempts")
return warnings


def list_bounty_attempts(
session: Session,
bounty: Bounty,
*,
include_expired: bool = False,
limit: int | None = None,
now: datetime | None = None,
) -> dict[str, Any]:
now = _as_utc(now or _utc_now())
query = select(BountyAttempt).where(BountyAttempt.bounty_id == bounty.id)
if not include_expired:
query = query.where(*_active_attempt_conditions(bounty.id, now))
query = query.order_by(BountyAttempt.created_at.desc(), BountyAttempt.id.desc())
if limit is not None:
query = query.limit(limit)
attempts = session.scalars(query).all()
return {
"warnings": bounty_attempt_warnings(session, bounty, now),
"attempts": [bounty_attempt_to_dict(attempt, now) for attempt in attempts],
}


def expire_stale_bounty_attempts(
session: Session, bounty_id: int, now: datetime, submitter_account: str | None = None
) -> None:
query = update(BountyAttempt).where(
BountyAttempt.bounty_id == bounty_id,
BountyAttempt.status == "active",
BountyAttempt.expires_at <= now,
)
if submitter_account is not None:
query = query.where(BountyAttempt.submitter_account == submitter_account)
session.execute(query.values(status="expired", updated_at=now))


def register_bounty_attempt_routes(
app: FastAPI,
*,
db_url: str,
require_github_login: LoginDependency,
json_object: JsonObjectLoader,
required_str: RequiredString,
optional_int: OptionalInteger,
normalized_account: NormalizeAccount,
positive_bounty_id: PositiveBountyId,
sqlite_integer_max: int,
) -> None:
def attempt_submitter_account(data: dict[str, Any], github_login: str) -> str:
submitter_account = f"github:{github_login}"
if data.get("submitter_account") is None:
return submitter_account
requested_account = normalized_account(required_str(data, "submitter_account"))
if requested_account != submitter_account:
raise HTTPException(status_code=403, detail="submitter_account does not match login")
return submitter_account

@app.get("/api/v1/bounties/{bounty_id}/attempts")
def api_bounty_attempts(bounty_id: int, include_expired: bool = Query(False)) -> dict[str, Any]:
bounty_id = positive_bounty_id(bounty_id)
now = _utc_now()
with session_scope(db_url) as session:
bounty = session.get(Bounty, bounty_id)
if bounty is None:
raise HTTPException(status_code=404, detail="bounty not found")
listing = list_bounty_attempts(
session, bounty, include_expired=include_expired, now=now
)
return {
"bounty_id": bounty_id,
"warnings": listing["warnings"],
"attempts": listing["attempts"],
}

@app.post("/api/v1/bounties/{bounty_id}/attempts")
async def api_create_bounty_attempt(
bounty_id: int,
request: Request,
github_login: str = Depends(require_github_login),
) -> JSONResponse:
bounty_id = positive_bounty_id(bounty_id)
data = await _optional_json_object(request, json_object)
submitter_account = attempt_submitter_account(data, github_login)
ttl_seconds = optional_int(data, "ttl_seconds", DEFAULT_ATTEMPT_TTL_SECONDS)
if ttl_seconds < MIN_ATTEMPT_TTL_SECONDS:
raise HTTPException(status_code=400, detail="ttl_seconds must be at least 60")
if ttl_seconds > MAX_ATTEMPT_TTL_SECONDS:
raise HTTPException(status_code=400, detail="ttl_seconds must be no more than 604800")
source = data.get("source_url", "")
if source is None:
source = ""
if not isinstance(source, str):
raise HTTPException(status_code=400, detail="source_url must be a string")
source = source.strip()
try:
source_url = validate_public_url(source) if source else None
except LedgerError as exc:
raise HTTPException(status_code=400, detail=str(exc)) from exc
now = _utc_now()
with session_scope(db_url) as session:
bounty = session.get(Bounty, bounty_id)
if bounty is None:
raise HTTPException(status_code=404, detail="bounty not found")
expire_stale_bounty_attempts(session, bounty_id, now, submitter_account)
awards_remaining = max(0, bounty.max_awards - bounty.awards_paid)
if bounty.status != "open" or awards_remaining <= 0:
return JSONResponse(
status_code=409,
content={
"status": "not_available",
"bounty_id": bounty_id,
"warnings": bounty_attempt_warnings(session, bounty, now),
},
)
existing = session.scalar(
select(BountyAttempt)
.where(
*_active_attempt_conditions(bounty_id, now),
BountyAttempt.submitter_account == submitter_account,
)
.order_by(BountyAttempt.created_at.desc(), BountyAttempt.id.desc())
.limit(1)
)
if existing is not None:
return JSONResponse(
status_code=409,
content={
"status": "duplicate_active_attempt",
"attempt": bounty_attempt_to_dict(existing, now),
"warnings": bounty_attempt_warnings(session, bounty, now),
},
)
attempt = BountyAttempt(
bounty_id=bounty_id,
submitter_account=submitter_account,
source_url=source_url,
status="active",
expires_at=now + timedelta(seconds=ttl_seconds),
created_at=now,
updated_at=now,
)
session.add(attempt)
try:
session.flush()
except IntegrityError:
session.rollback()
bounty = session.get(Bounty, bounty_id)
existing = session.scalar(
select(BountyAttempt)
.where(
*_active_attempt_conditions(bounty_id, now),
BountyAttempt.submitter_account == submitter_account,
)
.order_by(BountyAttempt.created_at.desc(), BountyAttempt.id.desc())
.limit(1)
)
if bounty is None or existing is None:
raise HTTPException(
status_code=409, detail="active attempt already exists"
) from None
return JSONResponse(
status_code=409,
content={
"status": "duplicate_active_attempt",
"attempt": bounty_attempt_to_dict(existing, now),
"warnings": bounty_attempt_warnings(session, bounty, now),
},
)
return JSONResponse(
status_code=201,
content={
"status": "registered",
"attempt": bounty_attempt_to_dict(attempt, now),
"warnings": bounty_attempt_warnings(session, bounty, now),
},
)

@app.post("/api/v1/bounty-attempts/{attempt_id}/release")
async def api_release_bounty_attempt(
attempt_id: int,
request: Request,
github_login: str = Depends(require_github_login),
) -> dict[str, Any]:
if attempt_id <= 0:
raise HTTPException(status_code=400, detail="attempt id must be positive")
if attempt_id > sqlite_integer_max:
raise HTTPException(status_code=400, detail="attempt id is too large")
data = await _optional_json_object(request, json_object)
submitter_account = attempt_submitter_account(data, github_login)
now = _utc_now()
with session_scope(db_url) as session:
attempt = session.get(BountyAttempt, attempt_id)
if attempt is None:
raise HTTPException(status_code=404, detail="attempt not found")
if attempt.submitter_account != submitter_account:
raise HTTPException(status_code=403, detail="submitter_account does not match")
effective_status = _attempt_effective_status(attempt, now)
if effective_status != "active":
return {
"status": f"already_{effective_status}",
"attempt": bounty_attempt_to_dict(attempt, now),
}
attempt.status = "released"
attempt.updated_at = now
session.flush()
return {
"status": "released",
"attempt": bounty_attempt_to_dict(attempt, now),
}
Loading
Loading