-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathapi.py
More file actions
39 lines (33 loc) · 1.53 KB
/
api.py
File metadata and controls
39 lines (33 loc) · 1.53 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# web/api.py
import logging
from fastapi import FastAPI, Request, BackgroundTasks
from fastapi.responses import HTMLResponse, JSONResponse
from telegram import Update
from oauth_flow.agent import oauth_agent
from telegram_ux import ui
# Module-level logger for this file, registered under the fixed name "WebAPI".
logger = logging.getLogger("WebAPI")
def create_fastapi_app(telegram_app):
    """Build the FastAPI application that fronts the Telegram bot.

    Three routes are registered on the returned app:

    * ``POST /webhook`` -- parses the request body as a Telegram update and
      schedules ``telegram_app.process_update`` as a background task.
    * ``GET /oauth/callback`` -- forwards ``code``/``state`` to
      ``oauth_agent`` and then sends a confirmation message via
      ``telegram_app.bot``.
    * ``GET /`` -- returns a static status payload.

    Args:
        telegram_app: Object exposing a ``bot`` attribute and an awaitable
            ``process_update(update)``. Presumably a python-telegram-bot
            ``Application`` -- confirm against the caller.

    Returns:
        The configured ``FastAPI`` instance.
    """
    app = FastAPI(title="Agentic Telegram Bot API", version="1.0")

    @app.post("/webhook")
    async def webhook(request: Request, background_tasks: BackgroundTasks):
        """Accept a Telegram update and queue it for background processing."""
        try:
            data = await request.json()
        except ValueError:
            # json.JSONDecodeError and UnicodeDecodeError both derive from
            # ValueError; a bad body is a client error, not a server crash.
            logger.warning("Webhook body is not valid JSON")
            return JSONResponse(
                {"ok": False, "error": "Invalid JSON"}, status_code=400
            )

        # An update is always a non-empty JSON object; reject anything else
        # before handing it to Update.de_json.
        if not isinstance(data, dict) or not data:
            logger.warning("Webhook body is not a JSON object")
            return JSONResponse(
                {"ok": False, "error": "Invalid update payload"}, status_code=400
            )

        update = Update.de_json(data, telegram_app.bot)
        # Respond immediately; the update itself is handled after the
        # response has been sent.
        background_tasks.add_task(telegram_app.process_update, update)
        return JSONResponse({"ok": True})

    @app.get("/oauth/callback", response_class=HTMLResponse)
    async def oauth_callback(request: Request):
        """Finish the OAuth flow and notify the user in Telegram."""
        code = request.query_params.get("code")
        state = request.query_params.get("state")
        if not code or not state:
            return HTMLResponse(
                ui.oauth_error_html("Missing code or state"), status_code=400
            )

        # NOTE(review): `state` is taken verbatim from the query string and
        # used as the Telegram chat id; this handler does not verify it
        # against a previously issued OAuth state value -- confirm that
        # oauth_agent performs that check.
        try:
            await oauth_agent.run("callback", {"telegram_id": state, "code": code})
        except Exception as exc:
            logger.exception("OAuth callback failed: %s", exc)
            # Keep exception details in the log only: echoing them into the
            # page would expose internals to whoever hits this public URL.
            return HTMLResponse(
                ui.oauth_error_html("Authorization failed. Please try again."),
                status_code=500,
            )

        # The OAuth exchange has already succeeded at this point, so a failed
        # Telegram notification must not be reported as an OAuth failure.
        try:
            await telegram_app.bot.send_message(
                chat_id=state, text=ui.connect_success()
            )
        except Exception as exc:
            logger.exception("OAuth success notification failed: %s", exc)

        return HTMLResponse(ui.oauth_auto_close_html())

    @app.get("/")
    async def home():
        """Return a static status payload."""
        return {"status": "ok", "message": "Bot live and agent-ready!"}

    return app