diff --git a/examples/filters/langfuse_v3_filter_pipeline.py b/examples/filters/langfuse_v3_filter_pipeline.py index a046eeea..b496a16c 100644 --- a/examples/filters/langfuse_v3_filter_pipeline.py +++ b/examples/filters/langfuse_v3_filter_pipeline.py @@ -1,19 +1,18 @@ """ title: Langfuse Filter Pipeline for v3 author: open-webui -date: 2025-07-31 -version: 0.0.1 +date: 2025-12-07 +version: 0.0.2 license: MIT description: A filter pipeline that uses Langfuse v3. -requirements: langfuse>=3.0.0 +requirements: langfuse>=3.10.5 """ -from typing import List, Optional import os import uuid import json - +from typing import List, Optional from utils.pipelines.main import get_last_assistant_message from pydantic import BaseModel from langfuse import Langfuse @@ -212,7 +211,7 @@ async def inlet(self, body: dict, user: Optional[dict] = None) -> dict: "session_id": chat_id, "interface": "open-webui", } - + # Create trace with all necessary information trace = self.langfuse.start_span( name=f"chat:{chat_id}", @@ -245,18 +244,14 @@ async def inlet(self, body: dict, user: Optional[dict] = None) -> dict: "interface": "open-webui", } trace.update_trace( + user_id=user_email, + session_id=chat_id, tags=tags_list if tags_list else None, + input=body, metadata=trace_metadata, ) - - # Update metadata with type - metadata["type"] = task_name - metadata["interface"] = "open-webui" - # Log user input as event try: - trace = self.chat_traces[chat_id] - # Create complete event metadata event_metadata = { **metadata, @@ -276,7 +271,6 @@ async def inlet(self, body: dict, user: Optional[dict] = None) -> dict: self.log(f"User input event logged for chat_id: {chat_id}") except Exception as e: self.log(f"Failed to log user input event: {e}") - return body async def outlet(self, body: dict, user: Optional[dict] = None) -> dict: @@ -295,40 +289,37 @@ async def outlet(self, body: dict, user: Optional[dict] = None) -> dict: if chat_id == "local": session_id = body.get("session_id") chat_id = f"temporary-session-{session_id}" - - metadata = body.get("metadata", {}) - # Defaulting to 'llm_response' if no task is provided - task_name = metadata.get("task", "llm_response") - - # Build tags - tags_list = self._build_tags(task_name) - if chat_id not in self.chat_traces: self.log(f"[WARNING] No matching trace found for chat_id: {chat_id}, attempting to re-register.") # Re-run inlet to register if somehow missing return await self.inlet(body, user) - self.chat_traces[chat_id] + trace = self.chat_traces[chat_id] + + metadata = body.get("metadata", {}) + task_name = metadata.get("task", "llm_response") + tags_list = self._build_tags(task_name) assistant_message = get_last_assistant_message(body["messages"]) assistant_message_obj = get_last_assistant_message_obj(body["messages"]) - usage = None + usage_details = None if assistant_message_obj: info = assistant_message_obj.get("usage", {}) if isinstance(info, dict): - input_tokens = info.get("prompt_eval_count") or info.get("prompt_tokens") - output_tokens = info.get("eval_count") or info.get("completion_tokens") - if input_tokens is not None and output_tokens is not None: - usage = { - "input": input_tokens, - "output": output_tokens, - "unit": "TOKENS", + input_tokens = info.get("prompt_eval_count") or info.get("prompt_tokens") or info.get("input_tokens") + output_tokens = info.get("eval_count") or info.get("completion_tokens") or info.get("output_tokens") + if input_tokens is not None or output_tokens is not None: + usage_details = { + "input": int(input_tokens) if input_tokens is not None else 0, + "output": int(output_tokens) if output_tokens is not None else 0, + "total": (int(input_tokens) if input_tokens is not None else 0) + + (int(output_tokens) if output_tokens is not None else 0), } - self.log(f"Usage data extracted: {usage}") + self.log(f"Usage data extracted: {usage_details}") # Update the trace with complete output information - trace = self.chat_traces[chat_id] + metadata["type"] = task_name metadata["interface"] = "open-webui" @@ -340,16 +331,9 @@ async def outlet(self, body: dict, user: Optional[dict] = None) -> dict: "session_id": chat_id, "interface": "open-webui", "task": task_name, + } - - # Update trace with output and complete metadata - trace.update_trace( - output=assistant_message, - metadata=complete_trace_metadata, - tags=tags_list if tags_list else None, - ) - # Outlet: Always create LLM generation (this is the LLM response) # Determine which model value to use based on the use_model_name valve model_id = self.model_names.get(chat_id, {}).get("id", body.get("model")) model_name = self.model_names.get(chat_id, {}).get("name", "unknown") @@ -361,24 +345,21 @@ async def outlet(self, body: dict, user: Optional[dict] = None) -> dict: else model_id ) - # Add both values to metadata regardless of valve setting - metadata["model_id"] = model_id - metadata["model_name"] = model_name - + # Create complete generation metadata + generation_metadata = { + **complete_trace_metadata, + "type": "llm_response", + "model_id": model_id, + "model_name": model_name, + "generation_id": str(uuid.uuid4()), + } + + trace = self.chat_traces[chat_id] + # Create LLM generation for the response try: - trace = self.chat_traces[chat_id] - - # Create complete generation metadata - generation_metadata = { - **complete_trace_metadata, - "type": "llm_response", - "model_id": model_id, - "model_name": model_name, - "generation_id": str(uuid.uuid4()), - } - - generation = trace.start_generation( + generation = trace.start_observation( + as_type='generation', name=f"llm_response:{str(uuid.uuid4())}", model=model_value, input=body["messages"], @@ -387,14 +368,21 @@ async def outlet(self, body: dict, user: Optional[dict] = None) -> dict: ) # Update with usage if available - if usage: - generation.update(usage=usage) + if usage_details: + generation.update(usage_details=usage_details) generation.end() self.log(f"LLM generation completed for chat_id: {chat_id}") except Exception as e: self.log(f"Failed to create LLM generation: {e}") + # Update trace with output and complete metadata + trace.update_trace( + output=assistant_message, + metadata=complete_trace_metadata, + tags=tags_list if tags_list else None, + ) + trace.end() # Flush data to Langfuse try: if self.langfuse: