Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 75 additions & 0 deletions cyberdrop_dl/api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
import asyncio
import json
from collections.abc import Awaitable, Callable
from datetime import datetime
from pathlib import Path

from aiohttp import web

from cyberdrop_dl.utils.utilities import parse_url

router = web.RouteTableDef()


def json_error_wrapper(
func: Callable[[web.Request], Awaitable[web.Response]],
) -> Callable[[web.Request], Awaitable[web.Response]]:
async def handler(request: web.Request) -> web.Response:
try:
return await func(request)
except asyncio.CancelledError:
raise
except json.JSONDecodeError:
return web.json_response({"status": "failed", "reason": "Invalid payload"}, status=400)
except Exception as ex:
return web.json_response({"status": "failed", "reason": repr(ex)}, status=400)

return handler


@router.get("/")
async def root(request: web.Request) -> web.Response:
return web.Response(text="Cyberdrop DL API")


@router.post("/add_urls")
@json_error_wrapper
async def add_urls(request: web.Request) -> web.Response:
post = await request.json()
urls: list[str] = post["urls"]
print(urls) # noqa: T201
if not urls:
raise ValueError("No urls to add")
for url in urls:
try:
parse_url(url)
except Exception:
raise ValueError(f"Invalid {url = }") from None

name = str(datetime.now()).replace(":", "_")
Path(name).write_text("\n".join(urls))

Check failure on line 50 in cyberdrop_dl/api.py

View workflow job for this annotation

GitHub Actions / ruff

ruff (ASYNC240)

cyberdrop_dl/api.py:50:5: ASYNC240 Async functions should not use pathlib.Path methods, use trio.Path or anyio.path
return web.json_response({"status": "ok"})


async def run() -> None:
app = web.Application()
app.add_routes(router)
runner = web.AppRunner(app, handle_signals=False)
await runner.setup()

try:
site = web.TCPSite(runner)
await site.start()
print(f"Running server on {site.name}") # noqa: T201

except (web.GracefulExit, KeyboardInterrupt):
pass

finally:
await runner.cleanup()


if __name__ == "__main__":
import asyncio

asyncio.run(run())
Loading