From eeafc0ed3c2b182534c47b5ab6a098d1e9b9c9e8 Mon Sep 17 00:00:00 2001 From: lizhen <3039213175@qq.com> Date: Sat, 21 Feb 2026 13:11:04 +0800 Subject: [PATCH 1/2] fix: explicit use of utf-8 encoding when reading module files --- main.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/main.py b/main.py index e277d3a3..84590421 100644 --- a/main.py +++ b/main.py @@ -134,7 +134,7 @@ async def load_module_from_path(module_name, module_path): try: # Read the module content - with open(module_path, "r") as file: + with open(module_path, "r", encoding="utf-8") as file: content = file.read() # Parse frontmatter From 879d7f77f8f41f71bcc0eae4ebd2d806212e273b Mon Sep 17 00:00:00 2001 From: pytdong Date: Thu, 19 Mar 2026 22:39:26 +0800 Subject: [PATCH 2/2] feat: Asynchronous Pipeline Support --- main.py | 114 +++++++++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 112 insertions(+), 2 deletions(-) diff --git a/main.py b/main.py index 84590421..2461bd7f 100644 --- a/main.py +++ b/main.py @@ -1,3 +1,5 @@ +import inspect + from fastapi import FastAPI, Request, Depends, status, HTTPException, UploadFile, File from fastapi.middleware.cors import CORSMiddleware from fastapi.concurrency import run_in_threadpool @@ -5,7 +7,7 @@ from starlette.responses import StreamingResponse, Response from pydantic import BaseModel, ConfigDict -from typing import List, Union, Generator, Iterator +from typing import List, Union, Generator, Iterator, AsyncGenerator from utils.pipelines.auth import bearer_security, get_current_user @@ -786,4 +788,112 @@ def stream_content(): ], } - return await run_in_threadpool(job) + # Asynchronous Job (for non-blocking pipelines) + async def async_job(): + + if form_data.stream: + res = await pipe( + user_message=user_message, + model_id=pipeline_id, + messages=messages, + body=form_data.model_dump(), + ) + logging.info(f"stream:true:{res}") + + async def stream_content(): + if isinstance(res, str): + message = stream_message_template(form_data.model, res) + logging.info(f"stream_content:str:{message}") + yield f"data: {json.dumps(message)}\n\n" + + elif inspect.isasyncgen(res) or isinstance(res, AsyncGenerator): + async for chunk in res: + if isinstance(chunk, BaseModel): + chunk = chunk.model_dump_json() + chunk = f"data: {chunk}" + elif isinstance(chunk, dict): + chunk = json.dumps(chunk) + chunk = f"data: {chunk}" + + try: + chunk = chunk.decode("utf-8") + logging.info(f"stream_content:AsyncGenerator:{chunk}") + except: + pass + + if isinstance(chunk, str) and chunk.startswith("data:"): + yield f"{chunk}\n\n" + else: + chunk = stream_message_template(form_data.model, chunk) + yield f"data: {json.dumps(chunk)}\n\n" + else: + logging.warning(f"Unhandled async response type: {type(res)}") + + + finish_message = { + "id": f"{form_data.model}-{str(uuid.uuid4())}", + "object": "chat.completion.chunk", + "created": int(time.time()), + "model": form_data.model, + "choices": [ + { + "index": 0, + "delta": {}, + "logprobs": None, + "finish_reason": "stop", + } + ], + } + yield f"data: {json.dumps(finish_message)}\n\n" + yield "data: [DONE]" + + return StreamingResponse(stream_content(), media_type="text/event-stream") + + else: + res = await pipe( + user_message=user_message, + model_id=pipeline_id, + messages=messages, + body=form_data.model_dump(), + ) + logging.info(f"stream:false:{res}") + + if isinstance(res, dict): + return res + elif isinstance(res, BaseModel): + return res.model_dump() + else: + message = "" + if isinstance(res, str): + message = res + elif inspect.isasyncgen(res) or isinstance(res, AsyncGenerator): + async for chunk in res: + message = f"{message}{chunk}" + else: + logging.warning(f"Unhandled async response type: {type(res)}") + + logging.info(f"stream:false:{message}") + return { + "id": f"{form_data.model}-{str(uuid.uuid4())}", + "object": "chat.completion", + "created": int(time.time()), + "model": form_data.model, + "choices": [ + { + "index": 0, + "message": { + "role": "assistant", + "content": message, + }, + "logprobs": None, + "finish_reason": "stop", + } + ], + } + + if inspect.iscoroutinefunction(pipe): + logging.info(f"Executing ASYNC job for pipeline: {form_data.model}") + return await async_job() + else: + logging.info(f"Executing SYNC job in thread pool for pipeline: {form_data.model}") + return await run_in_threadpool(job) \ No newline at end of file