# -*- coding: utf-8 -*-
"""
DBGPT Integration Pipeline for OpenWebUI

This pipeline enables OpenWebUI to integrate with a self-hosted DBGPT service
as an external AI backend. It is designed as a blocking-only pipeline for
enterprise-grade stability, demo scenarios, and text-to-SQL use cases.

Key features:
- Blocking request mode (no streaming exposed to OpenWebUI)
- Robust exception shielding
- Output normalization for mixed DBGPT responses
  (plain text / <chart-view> markup / <think> blocks)
"""

from typing import List, Optional
import json
import re
from pprint import pprint


class Pipeline:
    def __init__(self):
        # Pipeline display name shown in the OpenWebUI model list.
        self.name = "DBGPT Integration Pipeline"

        # ===== DBGPT configuration =====
        # Users can modify these values according to their deployment.
        self.dbgpt_api_url = "http://localhost:5670/api/v1/chat/completions"
        self.app_code = "your_dbgpt_app_code"
        self.model_name = "Qwen3-32B"
        self.domain = "default_domain"

        # Behavior flags
        self.debug = False   # dump request/response bodies to stdout
        self.timeout = 300   # HTTP timeout in seconds

    # ---------------------------------------------------------------------
    # Lifecycle hooks
    # ---------------------------------------------------------------------

    async def on_startup(self):
        """Called once when OpenWebUI loads the pipeline."""
        print(f"[{self.name}] startup")

    async def on_shutdown(self):
        """Called once when OpenWebUI unloads the pipeline."""
        print(f"[{self.name}] shutdown")

    # ---------------------------------------------------------------------
    # Optional inlet / outlet hooks (kept for consistency with examples)
    # ---------------------------------------------------------------------

    async def inlet(self, body: dict, user: Optional[dict] = None) -> dict:
        """Pass-through hook; optionally dumps the incoming request."""
        if self.debug:
            print("[inlet] body:")
            pprint(body)
            print("[inlet] user:")
            pprint(user)
        return body

    async def outlet(self, body: dict, user: Optional[dict] = None) -> dict:
        """Pass-through hook; optionally dumps the outgoing response."""
        if self.debug:
            print("[outlet] body:")
            pprint(body)
            print("[outlet] user:")
            pprint(user)
        return body

    # ---------------------------------------------------------------------
    # Core pipeline entry
    # ---------------------------------------------------------------------

    def pipe(
        self,
        user_message: str,
        model_id: str,
        messages: List[dict],
        body: dict,
    ) -> str:
        """
        Blocking-only pipeline entry point.

        Although DBGPT itself may support streaming internally, this
        pipeline always returns a finalized response to OpenWebUI to
        ensure stability and consistent error handling.

        Args:
            user_message: Raw prompt text from OpenWebUI (may embed chat
                history for task prompts; see ``_sanitize_user_message``).
            model_id: OpenWebUI model identifier (logged in debug mode only).
            messages: Full message list from OpenWebUI (unused here).
            body: Request body; used to derive a stable conversation UID.

        Returns:
            The cleaned DBGPT answer, or a fixed apology string on failure.
        """
        # Imported lazily so the parsing helpers stay importable/testable
        # even in environments without the requests dependency installed.
        import requests

        user_message = self._sanitize_user_message(user_message)

        if self.debug:
            print(f"[pipe] model_id: {model_id}")
            print(f"[pipe] user_message: {user_message}")

        payload = {
            "app_code": self.app_code,
            "conv_uid": self._get_conv_uid(body),
            "chat_mode": "chat_with_db_execute",
            "model_name": self.model_name,
            "user_input": user_message,
            "temperature": 0.2,
            "max_new_tokens": 3000,
            "select_param": self.domain,
        }

        raw_answer = ""

        try:
            # stream=False: the whole SSE payload is buffered before
            # iter_lines() walks it, so no partial chunks reach OpenWebUI.
            response = requests.post(
                self.dbgpt_api_url,
                json=payload,
                stream=False,
                timeout=self.timeout,
            )
            response.raise_for_status()

            for line in response.iter_lines():
                if not line:
                    continue

                decoded = line.decode("utf-8").strip()
                if not decoded.startswith("data: "):
                    continue

                data = decoded[len("data: "):]
                if data == "[DONE]":
                    break

                try:
                    chunk = json.loads(data)
                    message = (
                        chunk.get("choices", [{}])[0]
                        .get("message", {})
                        .get("content")
                    )
                    if message:
                        raw_answer += message
                except (ValueError, IndexError, AttributeError, TypeError):
                    # Deliberate best-effort: skip malformed chunks
                    # (bad JSON, empty choices, unexpected shapes).
                    continue

        except Exception as exc:
            # Boundary handler: shield OpenWebUI from any transport or
            # HTTP failure and return a stable, user-facing message.
            print(f"[{self.name}] DBGPT request failed:", exc)
            return "Sorry, the system failed to complete the query."

        return self._clean_answer(raw_answer)

    # ---------------------------------------------------------------------
    # Helpers
    # ---------------------------------------------------------------------

    def _get_conv_uid(self, body: dict) -> Optional[str]:
        """
        Conversation UID strategy:
        - Same OpenWebUI user → same DBGPT conversation.

        Returns None when no user id is available, letting DBGPT create
        a fresh conversation.
        """
        if not body:
            return None

        user = body.get("user")
        if user and user.get("id"):
            return f"dbgpt-user-{user['id']}"
        return None

    def _sanitize_user_message(self, text: str) -> str:
        """
        Extract the last USER message from OpenWebUI chat history.

        OpenWebUI task prompts embed the conversation as::

            ### Chat History:
            <chat_history>
            USER: ...
            ASSISTANT: ...
            </chat_history>

        For plain prompts without an embedded history, the first line of
        the stripped text is returned unchanged.
        """
        if not text:
            return ""

        marker = "### Chat History:"
        if marker in text:
            text = text.split(marker, 1)[1]

        # NOTE(review): tag names reconstructed from the OpenWebUI task
        # template — confirm they match your OpenWebUI version.
        match = re.search(r"<chat_history>([\s\S]*?)</chat_history>", text)
        if not match:
            # Plain prompt: first non-empty line, or "" for blank input
            # (guards against IndexError on whitespace-only text).
            stripped = text.strip()
            return stripped.splitlines()[0] if stripped else ""

        history = match.group(1)
        user_msgs = re.findall(r"USER:\s*(.+)", history)

        return user_msgs[-1].strip() if user_msgs else ""

    def _clean_answer(self, text: str) -> str:
        """
        Normalize DBGPT mixed output:
        - Remove internal JSON blocks
        - Remove <think>...</think> reasoning blocks
        - Preserve <chart-view .../> markup if present (re-appended after
          the final text line so OpenWebUI can render the chart)
        """
        if not text:
            return "No result was returned."

        # NOTE(review): DBGPT chart markup assumed to be a self-closing
        # <chart-view .../> tag — pattern reconstructed, confirm against
        # your DBGPT version.
        chart_pattern = r"<chart-view\b[^>]*/>"
        chart_match = re.search(chart_pattern, text)
        chart_html = chart_match.group(0) if chart_match else ""

        text = re.sub(chart_pattern, "", text)
        # Drop a leading machine-readable JSON object (e.g. SQL plan).
        text = re.sub(r"^\s*\{[\s\S]*?\}\s*", "", text)
        text = re.sub(r"<think>[\s\S]*?</think>", "", text)

        lines = [line.strip() for line in text.splitlines() if line.strip()]
        result = lines[-1] if lines else "Query completed."

        return f"{result}\n{chart_html}" if chart_html else result