-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
65 lines (47 loc) · 1.43 KB
/
main.py
File metadata and controls
65 lines (47 loc) · 1.43 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
from fastapi import FastAPI, HTTPException
import logging
from models import ReviewRequest
from redis_client import r
from multiprocessing import Process, Queue
from contextlib import asynccontextmanager
import worker
MAX_QUEUE_SIZE = 50
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s - %(message)s",
)
logger = logging.getLogger("tabelog_app")
result_queue = Queue()
@asynccontextmanager
async def lifespan(app: FastAPI):
p = Process(target=worker.run, args=(result_queue,))
p.start()
yield
p.terminate()
p.join()
app = FastAPI(lifespan=lifespan)
@app.post("/enqueue")
def enqueue_url(request: ReviewRequest):
if r.llen("task_queue") >= MAX_QUEUE_SIZE:
raise HTTPException(status_code=429, detail="대기열이 가득 찼습니다.")
r.rpush("task_queue", request.model_dump_json())
return {"message": "작업이 대기열에 추가되었습니다.", "queue_size": r.llen("task_queue")}
@app.get("/result")
def get_result():
if result_queue.empty():
return None
return result_queue.get()
@app.get("/queue/status")
def get_queue_status():
return {
"queue_size": r.llen("task_queue"),
"max_queue_size": MAX_QUEUE_SIZE
}
@app.get("/result_size")
def get_queue_status():
return {
"queue_size": result_queue.qsize()
}
@app.get("/")
def root():
return {"message": "KcELECTRA NER API is running!"}