Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 47 additions & 59 deletions examples/filters/langfuse_v3_filter_pipeline.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,18 @@
"""
title: Langfuse Filter Pipeline for v3
author: open-webui
date: 2025-07-31
version: 0.0.1
date: 2025-12-07
version: 0.0.2
license: MIT
description: A filter pipeline that uses Langfuse v3.
requirements: langfuse>=3.0.0
requirements: langfuse>=3.10.5
"""

from typing import List, Optional
import os
import uuid
import json


from typing import List, Optional
from utils.pipelines.main import get_last_assistant_message
from pydantic import BaseModel
from langfuse import Langfuse
Expand Down Expand Up @@ -212,7 +211,7 @@ async def inlet(self, body: dict, user: Optional[dict] = None) -> dict:
"session_id": chat_id,
"interface": "open-webui",
}

# Create trace with all necessary information
trace = self.langfuse.start_span(
name=f"chat:{chat_id}",
Expand Down Expand Up @@ -245,18 +244,14 @@ async def inlet(self, body: dict, user: Optional[dict] = None) -> dict:
"interface": "open-webui",
}
trace.update_trace(
user_id=user_email,
session_id=chat_id,
tags=tags_list if tags_list else None,
input=body,
metadata=trace_metadata,
)

# Update metadata with type
metadata["type"] = task_name
metadata["interface"] = "open-webui"

# Log user input as event
try:
trace = self.chat_traces[chat_id]

# Create complete event metadata
event_metadata = {
**metadata,
Expand All @@ -276,7 +271,6 @@ async def inlet(self, body: dict, user: Optional[dict] = None) -> dict:
self.log(f"User input event logged for chat_id: {chat_id}")
except Exception as e:
self.log(f"Failed to log user input event: {e}")

return body

async def outlet(self, body: dict, user: Optional[dict] = None) -> dict:
Expand All @@ -295,40 +289,37 @@ async def outlet(self, body: dict, user: Optional[dict] = None) -> dict:
if chat_id == "local":
session_id = body.get("session_id")
chat_id = f"temporary-session-{session_id}"

metadata = body.get("metadata", {})
# Defaulting to 'llm_response' if no task is provided
task_name = metadata.get("task", "llm_response")

# Build tags
tags_list = self._build_tags(task_name)

if chat_id not in self.chat_traces:
self.log(f"[WARNING] No matching trace found for chat_id: {chat_id}, attempting to re-register.")
# Re-run inlet to register if somehow missing
return await self.inlet(body, user)

self.chat_traces[chat_id]
trace = self.chat_traces[chat_id]

metadata = body.get("metadata", {})
task_name = metadata.get("task", "llm_response")
tags_list = self._build_tags(task_name)

assistant_message = get_last_assistant_message(body["messages"])
assistant_message_obj = get_last_assistant_message_obj(body["messages"])

usage = None
usage_details = None
if assistant_message_obj:
info = assistant_message_obj.get("usage", {})
if isinstance(info, dict):
input_tokens = info.get("prompt_eval_count") or info.get("prompt_tokens")
output_tokens = info.get("eval_count") or info.get("completion_tokens")
if input_tokens is not None and output_tokens is not None:
usage = {
"input": input_tokens,
"output": output_tokens,
"unit": "TOKENS",
input_tokens = info.get("prompt_eval_count") or info.get("prompt_tokens") or info.get("input_tokens")
output_tokens = info.get("eval_count") or info.get("completion_tokens") or info.get("output_tokens")
if input_tokens is not None or output_tokens is not None:
usage_details = {
"input": int(input_tokens) if input_tokens is not None else 0,
"output": int(output_tokens) if output_tokens is not None else 0,
"total": (int(input_tokens) if input_tokens is not None else 0) +
(int(output_tokens) if output_tokens is not None else 0),
}
self.log(f"Usage data extracted: {usage}")
self.log(f"Usage data extracted: {usage_details}")

# Update the trace with complete output information
trace = self.chat_traces[chat_id]


metadata["type"] = task_name
metadata["interface"] = "open-webui"
Expand All @@ -340,16 +331,9 @@ async def outlet(self, body: dict, user: Optional[dict] = None) -> dict:
"session_id": chat_id,
"interface": "open-webui",
"task": task_name,

}

# Update trace with output and complete metadata
trace.update_trace(
output=assistant_message,
metadata=complete_trace_metadata,
tags=tags_list if tags_list else None,
)

# Outlet: Always create LLM generation (this is the LLM response)
# Determine which model value to use based on the use_model_name valve
model_id = self.model_names.get(chat_id, {}).get("id", body.get("model"))
model_name = self.model_names.get(chat_id, {}).get("name", "unknown")
Expand All @@ -361,24 +345,21 @@ async def outlet(self, body: dict, user: Optional[dict] = None) -> dict:
else model_id
)

# Add both values to metadata regardless of valve setting
metadata["model_id"] = model_id
metadata["model_name"] = model_name

# Create complete generation metadata
generation_metadata = {
**complete_trace_metadata,
"type": "llm_response",
"model_id": model_id,
"model_name": model_name,
"generation_id": str(uuid.uuid4()),
}

trace = self.chat_traces[chat_id]

# Create LLM generation for the response
try:
trace = self.chat_traces[chat_id]

# Create complete generation metadata
generation_metadata = {
**complete_trace_metadata,
"type": "llm_response",
"model_id": model_id,
"model_name": model_name,
"generation_id": str(uuid.uuid4()),
}

generation = trace.start_generation(
generation = trace.start_observation(
as_type='generation',
name=f"llm_response:{str(uuid.uuid4())}",
model=model_value,
input=body["messages"],
Expand All @@ -387,14 +368,21 @@ async def outlet(self, body: dict, user: Optional[dict] = None) -> dict:
)

# Update with usage if available
if usage:
generation.update(usage=usage)
if usage_details:
generation.update(usage_details=usage_details)

generation.end()
self.log(f"LLM generation completed for chat_id: {chat_id}")
except Exception as e:
self.log(f"Failed to create LLM generation: {e}")

# Update trace with output and complete metadata
trace.update_trace(
output=assistant_message,
metadata=complete_trace_metadata,
tags=tags_list if tags_list else None,
)
trace.end()
# Flush data to Langfuse
try:
if self.langfuse:
Expand Down