30 changes: 12 additions & 18 deletions eval_protocol/adapters/langfuse.py
@@ -155,24 +155,12 @@ def _convert_trace_to_evaluation_row(self, trace: Any, include_tool_calls: bool
             observations_response.data if hasattr(observations_response, "data") else list(observations_response)
         )
 
-        # Look for conversation history in trace output or observations
         messages = []
-        conversation_found = False
 
-        # Look for complete conversation in observations
-        if not conversation_found:
-            for obs in observations:
-                # Check each observation's output for complete conversation array
-                if hasattr(obs, "output") and obs.output:
-                    conversation = self._extract_conversation_from_output(obs.output)
-                    if conversation:
-                        messages = conversation
-                        conversation_found = True
-                        break
-
-        # Fallback: try extracting from observations using old method
-        if not conversation_found:
-            messages = self._extract_messages_from_observations(observations, include_tool_calls)
+        for obs in observations:
+            if obs.name == "agent run":
+                messages = self._extract_conversation_from_output(obs.output)
+                break
 
         if not messages:
             return None
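For context, the new lookup assumes the trace contains a single observation named "agent run" whose output holds the complete conversation. A minimal sketch of that assumed shape (a hypothetical stand-in, not the Langfuse SDK type):

from types import SimpleNamespace

# Hypothetical stand-in for a Langfuse observation; only the two fields the
# new loop reads ("name" and "output") are modeled here.
obs = SimpleNamespace(
    name="agent run",
    output=[
        {"role": "user", "content": "What is 2 + 2?"},
        {"role": "assistant", "content": "2 + 2 = 4."},
    ],
)
assert obs.name == "agent run"  # the only observation the loop extracts from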
@@ -359,10 +347,16 @@ def _extract_conversation_from_output(self, output: Any) -> Optional[List[Message]]:
 
                 # Handle tool responses
                 name = None
+                tool_call_id = None
                 if role == "tool":
                     name = msg_data.get("name")
+                    tool_call_id = msg_data.get("id")
 
-                messages.append(Message(role=role, content=content, name=name, tool_calls=tool_calls))
+                messages.append(
+                    Message(
+                        role=role, content=content, name=name, tool_calls=tool_calls, tool_call_id=tool_call_id
+                    )
+                )
 
         return messages if messages else None
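As a sketch of the tool-message path this hunk fixes (the dict below is an assumed output shape, not a documented Langfuse format), the `id` of a tool result now survives into `Message.tool_call_id`, so the result can be matched back to the assistant tool call that produced it:

# Assumed shape of a tool message inside an observation's output; "id" links
# the result back to the assistant tool call that requested it.
msg_data = {"role": "tool", "name": "get_weather", "id": "call_abc123", "content": "72F"}

name = msg_data.get("name") if msg_data["role"] == "tool" else None
tool_call_id = msg_data.get("id") if msg_data["role"] == "tool" else None
# -> Message(role="tool", content="72F", name="get_weather", tool_call_id="call_abc123")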

@@ -30,7 +30,7 @@ async def process_row(row: EvaluationRow) -> EvaluationRow:
     if len(row.messages) == 0:
         raise ValueError("Messages is empty. Please provide a non-empty dataset")
 
-    messages_payload = [{"role": m.role, "content": m.content} for m in row.messages]
+    messages_payload = [message.model_dump() for message in row.messages]
 
     request_params = {"messages": messages_payload, **config.completion_params}
     # Ensure caching is disabled only for this request (review feedback)
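The payload change matters for tool-calling traces: the old comprehension kept only `role` and `content`, silently dropping fields such as `tool_calls` and `tool_call_id`, while `model_dump()` serializes every field on the model. A minimal sketch with a simplified stand-in for the `Message` model (pydantic's `model_dump` is standard; the exact field set of the real model is an assumption):

from typing import Optional
from pydantic import BaseModel

class Message(BaseModel):  # simplified stand-in for eval_protocol's Message
    role: str
    content: Optional[str] = None
    tool_call_id: Optional[str] = None

m = Message(role="tool", content="72F", tool_call_id="call_abc123")
print({"role": m.role, "content": m.content})  # old payload: tool_call_id lost
print(m.model_dump())                          # new payload: all fields kept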
16 changes: 11 additions & 5 deletions eval_protocol/pytest/evaluation_test.py
@@ -58,6 +58,7 @@
     parse_ep_num_runs,
     parse_ep_passed_threshold,
     rollout_processor_with_retry,
+    split_multi_turn_rows,
 )
 
 from ..common_utils import load_jsonl
@@ -84,6 +85,7 @@ def evaluation_test(
     steps: int = 30,
     mode: EvaluationTestMode = "pointwise",
     combine_datasets: bool = True,
+    split_multi_turn: bool = False,
     logger: DatasetLogger | None = None,
     exception_handler_config: ExceptionHandlerConfig | None = None,
 ) -> Callable[[TestFunction], TestFunction]:
@@ -150,6 +152,9 @@ def evaluation_test(
         mode: Evaluation mode. "pointwise" (default) applies test function to each row (rollout result).
             "groupwise" applies test function to a group of rollout results from the same original row (for use cases such as dpo/grpo).
             "all" applies test function to the whole dataset.
+        split_multi_turn: If True, splits multi-turn conversations into individual evaluation rows
+            for each assistant response. Each row will contain the conversation context up to that point
+            and the assistant's response as ground truth. Useful for Arena-Hard-Auto style evaluations.
         logger: DatasetLogger to use for logging. If not provided, a default logger will be used.
         exception_handler_config: Configuration for exception handling and backoff retry logic.
             If not provided, a default configuration will be used with common retryable exceptions.
@@ -244,6 +249,9 @@ def _log_eval_error(status: Status, rows: list[EvaluationRow] | None, passed: bool
             else:
                 raise ValueError("No input dataset, input messages, or input rows provided")
 
+            if split_multi_turn:
+                data = split_multi_turn_rows(data)
+
             for row in data:
                 # generate a stable row_id for each row
                 if row.input_metadata.row_id is None:
@@ -266,11 +274,9 @@ def _log_eval_error(status: Status, rows: list[EvaluationRow] | None, passed: bool
                     passed=None,
                 )
                 for row in data:
-                    # Only set completion_params if they don't already exist
-                    if not row.input_metadata.completion_params:
-                        row.input_metadata.completion_params = (
-                            completion_params if completion_params is not None else {}
-                        )
+                    row.input_metadata.completion_params = (
+                        completion_params if completion_params is not None else {}
+                    )
                     # Add mode to session_data
                     if row.input_metadata.session_data is None:
                         row.input_metadata.session_data = {}
20 changes: 11 additions & 9 deletions eval_protocol/pytest/evaluation_test_postprocess.py
@@ -62,15 +62,17 @@ def postprocess(
     passed = success_passed and standard_error_passed
 
     # Update eval metadata passed field for all results
-    for result in all_results:
-        for r in result:
-            if r.eval_metadata is not None:
-                r.eval_metadata.passed = passed
-            if r.evaluation_result is not None:
-                r.evaluation_result.agg_score = agg_score
-                r.evaluation_result.standard_error = standard_error
-            r.execution_metadata.experiment_duration_seconds = experiment_duration_seconds
-            active_logger.log(r)
+    for results in all_results:
+        for result in results:
+            if result.eval_metadata is not None:
+                result.eval_metadata.passed = passed
+            if result.evaluation_result is not None:
+                if result.evaluation_result.agg_score is None:
+                    result.evaluation_result.agg_score = agg_score
+                if result.evaluation_result.standard_error is None:
+                    result.evaluation_result.standard_error = standard_error
+            result.execution_metadata.experiment_duration_seconds = experiment_duration_seconds
+            active_logger.log(result)
 
     # Optional: print and/or persist a summary artifact for CI
     try:
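The new `None` guards change precedence: a score or standard error set on a row by the test itself (as the "all"-mode judge below does) is no longer overwritten by the experiment-level aggregate. A sketch of the intended behavior:

def merge_agg_score(row_value, experiment_value):
    # Mirrors the guard in postprocess(): keep the row's own value if it is set.
    return row_value if row_value is not None else experiment_value

assert merge_agg_score(0.62, 0.80) == 0.62  # an "all"-mode test already set it
assert merge_agg_score(None, 0.80) == 0.80  # plain rows receive the aggregate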
39 changes: 39 additions & 0 deletions eval_protocol/pytest/utils.py
@@ -352,3 +352,42 @@ def add_cost_metrics(row: EvaluationRow) -> None:
         output_cost=output_cost,
         total_cost=total_cost,
     )
+
+
+def split_multi_turn_rows(data: list[EvaluationRow]) -> list[EvaluationRow]:
+    """
+    Split multi-turn conversation rows into individual evaluation rows for each assistant message.
+
+    Args:
+        data: List of EvaluationRow objects
+
+    Returns:
+        List of expanded EvaluationRow objects, one for each assistant message
+    """
+    expanded_rows = []
+
+    for row in data:
+        messages = row.messages
+        tools = row.tools
+        input_metadata = row.input_metadata
+
+        assistant_positions = []
+        for i, message in enumerate(messages):
+            if message.role == "assistant":
+                assistant_positions.append(i)
+
+        # Create a separate evaluation row at each assistant message (where the comparison model will respond)
+        for assistant_pos in assistant_positions:
+            messages_before_assistant = messages[:assistant_pos]
+            ground_truth_message = messages[assistant_pos].content
+
+            expanded_rows.append(
+                EvaluationRow(
+                    messages=messages_before_assistant,
+                    tools=tools,
+                    input_metadata=input_metadata,
+                    ground_truth=ground_truth_message,
+                )
+            )
+
+    return expanded_rows
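A usage sketch of `split_multi_turn_rows` (the import path follows this file's location; constructing `EvaluationRow` from messages alone assumes the other fields have defaults): a conversation with two assistant turns expands into two rows, each carrying the context before one assistant reply and that reply as `ground_truth`:

from eval_protocol.models import EvaluationRow, Message
from eval_protocol.pytest.utils import split_multi_turn_rows

row = EvaluationRow(
    messages=[
        Message(role="user", content="Name a prime number."),
        Message(role="assistant", content="7"),
        Message(role="user", content="Name a larger one."),
        Message(role="assistant", content="11"),
    ]
)

expanded = split_multi_turn_rows([row])
assert len(expanded) == 2
assert expanded[0].ground_truth == "7"   # context: the first user turn only
assert expanded[1].ground_truth == "11"  # context: everything before the second reply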
206 changes: 206 additions & 0 deletions eval_protocol/quickstart/llm_judge.py
@@ -0,0 +1,206 @@
"""
Default LLM judge for Eval Protocol. Inspired by Arena-Hard-Auto.
"""

import os
from datetime import datetime, timedelta
from typing import List, Dict, Any, Optional
import pandas as pd
from tqdm import tqdm

import pytest

from eval_protocol.models import EvaluateResult, EvaluationRow, MetricResult
from eval_protocol.pytest import evaluation_test
from eval_protocol.pytest.default_single_turn_rollout_process import SingleTurnRolloutProcessor
from eval_protocol.quickstart.utils import pairwise_judgment

# Langfuse client setup
try:
from langfuse import get_client # pyright: ignore[reportPrivateImportUsage]

LANGFUSE_AVAILABLE = True
langfuse = get_client()
except ImportError:
LANGFUSE_AVAILABLE = False
langfuse = None


def fetch_langfuse_traces_as_evaluation_rows(
hours_back: int = 168, tags: Optional[List[str]] = None
) -> List[EvaluationRow]:
try:
from eval_protocol.adapters.langfuse import create_langfuse_adapter

if not os.getenv("LANGFUSE_PUBLIC_KEY") or not os.getenv("LANGFUSE_SECRET_KEY"):
raise ValueError("LANGFUSE_PUBLIC_KEY and LANGFUSE_SECRET_KEY must be set")

adapter = create_langfuse_adapter(
public_key=os.getenv("LANGFUSE_PUBLIC_KEY"), # pyright: ignore[reportArgumentType]
secret_key=os.getenv("LANGFUSE_SECRET_KEY"), # pyright: ignore[reportArgumentType]
host=os.getenv("LANGFUSE_HOST", "https://cloud.langfuse.com"),
)

now = datetime.now()
from_timestamp = now - timedelta(hours=hours_back)

return adapter.get_evaluation_rows(
limit=20, from_timestamp=from_timestamp, to_timestamp=now, include_tool_calls=True, tags=tags
)

except Exception as e:
print(f"❌ LangfuseAdapter failed: {e}")
return []


@pytest.mark.skipif(os.environ.get("CI") == "true", reason="Skip in CI")
@pytest.mark.asyncio
@evaluation_test(
input_rows=[fetch_langfuse_traces_as_evaluation_rows()],
completion_params=[{"model": "gpt-4o"}],
rollout_processor=SingleTurnRolloutProcessor(),
split_multi_turn=True,
mode="all",
)
async def test_llm_judge(rows: list[EvaluationRow]) -> list[EvaluationRow]:
"""
Simplified LLM Judge for Arena-Hard-Auto style pairwise comparisons.

Each row contains:
- messages[:-1]: Question/prompt (conversation context)
- messages[-1]: Model B's answer (comparison model response)
- ground_truth: Model A's answer (original assistant response)
"""

if not rows:
print("❌ No evaluation rows provided")
return rows

print(f"🔄 Processing {len(rows)} evaluation rows for LLM judging...")

model_name = rows[0].input_metadata.completion_params.get("model", "unknown_model")

# Generate judgments directly from rows
import concurrent.futures
from concurrent.futures import ThreadPoolExecutor

def run_judgment(row: EvaluationRow) -> Optional[Dict[str, Any]]:
"""Run pairwise judgment for a single evaluation row."""
if not row.messages:
return None

# Extract question and answers
question_text = "\n".join([f"{msg.role}: {msg.content}" for msg in row.messages[:-1]])
model_a_answer = row.ground_truth # Original response
model_b_answer = row.messages[-1].content # Comparison model response

games = []

# Round 1: A vs B (original vs comparison)
result1 = pairwise_judgment(
question_text=question_text,
answer_a=model_a_answer,
answer_b=model_b_answer,
)
games.append(result1)

# Round 2: B vs A (comparison vs original)
result2 = pairwise_judgment(
question_text=question_text,
answer_a=model_b_answer,
answer_b=model_a_answer,
)
games.append(result2)

row.evaluation_result = EvaluateResult(
score=0.0,
reason=f"LLM Judge comparison: Round 1: {result1['score']}, Round 2: {result2['score']}"
if result1 and result2
else "Failed to get judgement scores",
metrics={
"round1_judgment": MetricResult(
score=0.0, reason=result1["judgment"] if result1 else "Failed to get judgment reason"
),
"round2_judgment": MetricResult(
score=0.0, reason=result2["judgment"] if result2 else "Failed to get judgment reason"
),
},
)

return {"model": model_name, "games": games}

judgments = []
max_workers = 64

with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = [executor.submit(run_judgment, row) for row in rows]

for future in tqdm(concurrent.futures.as_completed(futures), total=len(futures), desc="Generating judgments"):
result = future.result()
if result and result["games"][0] and result["games"][1]:
judgments.append(result)

if not judgments:
print("❌ No valid judgments generated")
return rows

print(f"✅ Generated {len(judgments)} valid judgments")

# Convert to scores for leaderboard
label_to_score = {
"A>B": [1],
"A>>B": [1] * 3,
"A=B": [0.5],
"A<<B": [0] * 3,
"A<B": [0],
"B>A": [0],
"B>>A": [0] * 3,
"B=A": [0.5],
"B<<A": [1] * 3,
"B<A": [1],
}

# Extract scores from judgments
scores_data = []
for judgment in judgments:
game1, game2 = judgment["games"]
if game1 and game2 and game1.get("score") and game2.get("score"):
# Convert judgment scores to numerical scores
scores = label_to_score[game2["score"]] + [1 - s for s in label_to_score[game1["score"]]]
for score in scores:
scores_data.append(score)

if not scores_data:
print("❌ No valid scores extracted")
return rows

# Create DataFrame (single column of scores)
battles = pd.DataFrame({"score": scores_data})

    # Bootstrap sampling to estimate performance relative to the original model (held fixed at 50%)
bootstrap_means = [
battles.sample(frac=1.0, replace=True)["score"].mean() for _ in tqdm(range(100), desc="Bootstrap sampling")
]

# Calculate final scores
bootstraps = pd.Series(bootstrap_means)
mean_score = bootstraps.mean()
lower_score = bootstraps.quantile(0.05)
upper_score = bootstraps.quantile(0.95)

# Print leaderboard
print("\n##### LLM Judge Results (90th percentile CI) #####")

clean_model_name = model_name.split("/")[-1] # Clean model name

print(f"{clean_model_name}: {mean_score:.1%} (CI: {lower_score:.1%} - {upper_score:.1%})")
print("original: 50.0% (CI: 50.0% - 50.0%)")

for row in rows:
# This is hacky, but it's the only way to get the score into the evaluation result in our current pattern
if row.evaluation_result:
row.evaluation_result.score = mean_score
            # Standard error approximated from the 90% CI: SE ≈ (upper - lower) / (2 × 1.645); not exact, because it assumes a normal distribution
row.evaluation_result.standard_error = (upper_score - lower_score) / (2 * 1.645)

return rows
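A worked example of the score conversion above, using a subset of the label mapping (the labels here are hypothetical): round 1 has the original answer as A, round 2 has the comparison answer as A, and every entry in `scores` is win credit for the comparison model:

label_to_score = {"A>B": [1], "A=B": [0.5], "A<B": [0]}

game1_label = "A>B"  # round 1: the original answer (as A) wins -> comparison gets 1 - 1 = 0
game2_label = "A=B"  # round 2: the comparison answer (as A) ties -> comparison gets 0.5

scores = label_to_score[game2_label] + [1 - s for s in label_to_score[game1_label]]
assert scores == [0.5, 0]  # mean 0.25 < 0.5, so the comparison model trails the original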