From 52ed930651a577359194c47c5e753b00feb0fcee Mon Sep 17 00:00:00 2001
From: _00_ <131402327+rgaricano@users.noreply.github.com>
Date: Sun, 7 Dec 2025 23:13:33 +0100
Subject: [PATCH 1/4] FIX: langfusev3 pipeline

Revision of the pipeline: cleaned up the code, fixed some issues, and updated
it to the v3 requirements.
---
 .../filters/langfuse_v3_filter_pipeline.py   | 92 ++++++++-----------
 1 file changed, 39 insertions(+), 53 deletions(-)

diff --git a/examples/filters/langfuse_v3_filter_pipeline.py b/examples/filters/langfuse_v3_filter_pipeline.py
index a046eeea..89900867 100644
--- a/examples/filters/langfuse_v3_filter_pipeline.py
+++ b/examples/filters/langfuse_v3_filter_pipeline.py
@@ -1,19 +1,18 @@
 """
 title: Langfuse Filter Pipeline for v3
-author: open-webui
+author: open-webui v.0.0.1 - (@rgaricano rev.0.0.2)
 date: 2025-07-31
-version: 0.0.1
+version: 0.0.2
 license: MIT
 description: A filter pipeline that uses Langfuse v3.
 requirements: langfuse>=3.0.0
 """
 
-from typing import List, Optional
 import os
 import uuid
 import json
-
+from typing import List, Optional
 from utils.pipelines.main import get_last_assistant_message
 from pydantic import BaseModel
 from langfuse import Langfuse
@@ -212,7 +211,7 @@ async def inlet(self, body: dict, user: Optional[dict] = None) -> dict:
                 "session_id": chat_id,
                 "interface": "open-webui",
             }
-            
+
             # Create trace with all necessary information
             trace = self.langfuse.start_span(
                 name=f"chat:{chat_id}",
@@ -245,18 +244,14 @@ async def inlet(self, body: dict, user: Optional[dict] = None) -> dict:
                 "interface": "open-webui",
             }
             trace.update_trace(
+                user_id=user_email,
+                session_id=chat_id,
                 tags=tags_list if tags_list else None,
+                input=body,
                 metadata=trace_metadata,
             )
-
-            # Update metadata with type
-            metadata["type"] = task_name
-            metadata["interface"] = "open-webui"
-
         # Log user input as event
         try:
-            trace = self.chat_traces[chat_id]
-
             # Create complete event metadata
             event_metadata = {
                 **metadata,
@@ -276,7 +271,6 @@ async def inlet(self, body: dict, user: Optional[dict] = None) -> dict:
             self.log(f"User input event logged for chat_id: {chat_id}")
         except Exception as e:
             self.log(f"Failed to log user input event: {e}")
-
         return body
 
     async def outlet(self, body: dict, user: Optional[dict] = None) -> dict:
@@ -295,40 +289,35 @@ async def outlet(self, body: dict, user: Optional[dict] = None) -> dict:
         if chat_id == "local":
             session_id = body.get("session_id")
             chat_id = f"temporary-session-{session_id}"
-
-        metadata = body.get("metadata", {})
-        # Defaulting to 'llm_response' if no task is provided
-        task_name = metadata.get("task", "llm_response")
-
-        # Build tags
-        tags_list = self._build_tags(task_name)
-
         if chat_id not in self.chat_traces:
             self.log(f"[WARNING] No matching trace found for chat_id: {chat_id}, attempting to re-register.")
             # Re-run inlet to register if somehow missing
             return await self.inlet(body, user)
-        self.chat_traces[chat_id]
+        trace = self.chat_traces[chat_id]
+
+        metadata = body.get("metadata", {})
+        task_name = metadata.get("task", "llm_response")
+        tags_list = self._build_tags(task_name)
 
         assistant_message = get_last_assistant_message(body["messages"])
        assistant_message_obj = get_last_assistant_message_obj(body["messages"])
 
-        usage = None
+        usage_details = None
         if assistant_message_obj:
             info = assistant_message_obj.get("usage", {})
             if isinstance(info, dict):
                 input_tokens = info.get("prompt_eval_count") or info.get("prompt_tokens")
                 output_tokens = info.get("eval_count") or info.get("completion_tokens")
                 if input_tokens is not None and output_tokens is not None:
-                    usage = {
+                    usage_details = {
                         "input": input_tokens,
                         "output": output_tokens,
-                        "unit": "TOKENS",
                     }
 
-            self.log(f"Usage data extracted: {usage}")
+            self.log(f"Usage data extracted: {usage_details}")
 
         # Update the trace with complete output information
-        trace = self.chat_traces[chat_id]
+
         metadata["type"] = task_name
         metadata["interface"] = "open-webui"
 
@@ -340,16 +329,9 @@ async def outlet(self, body: dict, user: Optional[dict] = None) -> dict:
             "session_id": chat_id,
             "interface": "open-webui",
             "task": task_name,
+        }
-
-        # Update trace with output and complete metadata
-        trace.update_trace(
-            output=assistant_message,
-            metadata=complete_trace_metadata,
-            tags=tags_list if tags_list else None,
-        )
-
         # Outlet: Always create LLM generation (this is the LLM response)
         # Determine which model value to use based on the use_model_name valve
         model_id = self.model_names.get(chat_id, {}).get("id", body.get("model"))
         model_name = self.model_names.get(chat_id, {}).get("name", "unknown")
@@ -361,24 +343,21 @@ async def outlet(self, body: dict, user: Optional[dict] = None) -> dict:
             else model_id
         )
 
-        # Add both values to metadata regardless of valve setting
-        metadata["model_id"] = model_id
-        metadata["model_name"] = model_name
-
+        # Create complete generation metadata
+        generation_metadata = {
+            **complete_trace_metadata,
+            "type": "llm_response",
+            "model_id": model_id,
+            "model_name": model_name,
+            "generation_id": str(uuid.uuid4()),
+        }
+
+        trace = self.chat_traces[chat_id]
+
         # Create LLM generation for the response
         try:
-            trace = self.chat_traces[chat_id]
-
-            # Create complete generation metadata
-            generation_metadata = {
-                **complete_trace_metadata,
-                "type": "llm_response",
-                "model_id": model_id,
-                "model_name": model_name,
-                "generation_id": str(uuid.uuid4()),
-            }
-
-            generation = trace.start_generation(
+            generation = trace.start_observation(
+                as_type='generation',
                 name=f"llm_response:{str(uuid.uuid4())}",
                 model=model_value,
                 input=body["messages"],
@@ -387,14 +366,21 @@ async def outlet(self, body: dict, user: Optional[dict] = None) -> dict:
             )
 
             # Update with usage if available
-            if usage:
-                generation.update(usage=usage)
+            if usage_details:
+                generation.update(usage_details=usage_details)
 
             generation.end()
             self.log(f"LLM generation completed for chat_id: {chat_id}")
         except Exception as e:
             self.log(f"Failed to create LLM generation: {e}")
 
+        # Update trace with output and complete metadata
+        trace.update_trace(
+            output=assistant_message,
+            metadata=complete_trace_metadata,
+            tags=tags_list if tags_list else None,
+        )
+        trace.end()
         # Flush data to Langfuse
         try:
             if self.langfuse:

From 14c2803a5d8e14a54abd6e01d43aaa4df0c6e006 Mon Sep 17 00:00:00 2001
From: _00_ <131402327+rgaricano@users.noreply.github.com>
Date: Mon, 8 Dec 2025 00:56:22 +0100
Subject: [PATCH 2/4] Modify date in langfuse_v3_filter_pipeline.py

Updated author and date information in the Langfuse filter pipeline.
---
 examples/filters/langfuse_v3_filter_pipeline.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/examples/filters/langfuse_v3_filter_pipeline.py b/examples/filters/langfuse_v3_filter_pipeline.py
index 89900867..9e70804c 100644
--- a/examples/filters/langfuse_v3_filter_pipeline.py
+++ b/examples/filters/langfuse_v3_filter_pipeline.py
@@ -1,7 +1,7 @@
 """
 title: Langfuse Filter Pipeline for v3
-author: open-webui v.0.0.1 - (@rgaricano rev.0.0.2)
-date: 2025-07-31
+author: open-webui
+date: 2025-12-07
 version: 0.0.2
 license: MIT
 description: A filter pipeline that uses Langfuse v3.
From 925c01adf4980a54582153bf5480879508421445 Mon Sep 17 00:00:00 2001
From: _00_ <131402327+rgaricano@users.noreply.github.com>
Date: Tue, 9 Dec 2025 12:15:43 +0100
Subject: [PATCH 3/4] Enhance token usage extraction logic

Modified the usage_details logic to handle cases where only one of
input_tokens or output_tokens is None or missing.
---
 examples/filters/langfuse_v3_filter_pipeline.py | 12 +++++++-----
 1 file changed, 7 insertions(+), 5 deletions(-)

diff --git a/examples/filters/langfuse_v3_filter_pipeline.py b/examples/filters/langfuse_v3_filter_pipeline.py
index 9e70804c..d996cbbb 100644
--- a/examples/filters/langfuse_v3_filter_pipeline.py
+++ b/examples/filters/langfuse_v3_filter_pipeline.py
@@ -307,12 +307,14 @@ async def outlet(self, body: dict, user: Optional[dict] = None) -> dict:
         if assistant_message_obj:
             info = assistant_message_obj.get("usage", {})
             if isinstance(info, dict):
-                input_tokens = info.get("prompt_eval_count") or info.get("prompt_tokens")
-                output_tokens = info.get("eval_count") or info.get("completion_tokens")
-                if input_tokens is not None and output_tokens is not None:
+                input_tokens = info.get("prompt_eval_count") or info.get("prompt_tokens") or info.get("input_tokens")
+                output_tokens = info.get("eval_count") or info.get("completion_tokens") or info.get("output_tokens")
+                if input_tokens is not None or output_tokens is not None:
                     usage_details = {
-                        "input": input_tokens,
-                        "output": output_tokens,
+                        "input": int(input_tokens) if input_tokens is not None else 0,
+                        "output": int(output_tokens) if output_tokens is not None else 0,
+                        "total": (int(input_tokens) if input_tokens is not None else 0)
+                        + (int(output_tokens) if output_tokens is not None else 0),
                     }
 
         self.log(f"Usage data extracted: {usage_details}")

From 3b83e7949cc1a436546c5d1eff5145094eaffecf Mon Sep 17 00:00:00 2001
From: _00_ <131402327+rgaricano@users.noreply.github.com>
Date: Mon, 5 Jan 2026 11:30:49 +0100
Subject: [PATCH 4/4] Update langfuse_v3_filter_pipeline.py

---
 examples/filters/langfuse_v3_filter_pipeline.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/examples/filters/langfuse_v3_filter_pipeline.py b/examples/filters/langfuse_v3_filter_pipeline.py
index d996cbbb..b496a16c 100644
--- a/examples/filters/langfuse_v3_filter_pipeline.py
+++ b/examples/filters/langfuse_v3_filter_pipeline.py
@@ -5,7 +5,7 @@
 version: 0.0.2
 license: MIT
 description: A filter pipeline that uses Langfuse v3.
-requirements: langfuse>=3.0.0
+requirements: langfuse>=3.10.5
 """
 
 import os
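
Reviewer note (not part of the patches): for anyone who wants the resulting Langfuse v3 call flow in one place, below is a minimal standalone sketch of the pattern these commits converge on: a root span via start_span, trace attributes via update_trace, the LLM call as a start_observation(as_type="generation") child, token counts via usage_details, then end() and flush(). It is illustrative only; the no-argument Langfuse() constructor reading credentials from the environment, and the chat id, user email, model name, messages, and token counts are placeholder assumptions, not taken from the diff.

    # Hypothetical end-to-end example of the v3 SDK usage adopted above.
    # Assumes LANGFUSE_PUBLIC_KEY / LANGFUSE_SECRET_KEY / LANGFUSE_HOST are set
    # in the environment; all ids, messages and token counts are placeholders.
    import uuid

    from langfuse import Langfuse

    langfuse = Langfuse()  # credentials picked up from the environment

    chat_id = "example-chat"
    messages = [{"role": "user", "content": "Hello"}]
    answer = "Hi there!"

    # The root span doubles as the trace handle in the v3 SDK.
    trace = langfuse.start_span(name=f"chat:{chat_id}")
    trace.update_trace(
        user_id="user@example.com",
        session_id=chat_id,
        input={"messages": messages},
        tags=["user_response"],
    )

    # The model call is recorded as a generation-type child observation.
    generation = trace.start_observation(
        as_type="generation",
        name=f"llm_response:{uuid.uuid4()}",
        model="example-model",
        input=messages,
    )
    generation.update(usage_details={"input": 12, "output": 5, "total": 17})
    generation.end()

    # Close the trace with the final output and push buffered events.
    trace.update_trace(output=answer)
    trace.end()
    langfuse.flush()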