27 changes: 18 additions & 9 deletions backend/app/crud/evaluations/langfuse.py
@@ -88,6 +88,7 @@ def create_langfuse_dataset_run(
ground_truth = result["ground_truth"]
response_id = result.get("response_id")
usage_raw = result.get("usage")
question_id = result.get("question_id")

dataset_item = dataset_items_map.get(item_id)
if not dataset_item:
@@ -105,6 +106,8 @@
}
if response_id:
metadata["response_id"] = response_id
if question_id:
metadata["question_id"] = question_id

# Create trace with basic info
langfuse.trace(
@@ -250,7 +253,7 @@ def upload_dataset_to_langfuse(
f"duplication_factor={duplication_factor}"
)

def upload_item(item: dict[str, str], duplicate_num: int) -> bool:
def upload_item(item: dict[str, str], duplicate_num: int, question_id: int) -> bool:
try:
langfuse.create_dataset_item(
dataset_name=dataset_name,
@@ -260,6 +263,7 @@ def upload_item(item: dict[str, str], duplicate_num: int) -> bool:
"original_question": item["question"],
"duplicate_number": duplicate_num + 1,
"duplication_factor": duplication_factor,
"question_id": question_id,
},
)
return True
@@ -275,19 +279,22 @@ def upload_item(item: dict[str, str], duplicate_num: int) -> bool:
# Create or get dataset in Langfuse
dataset = langfuse.create_dataset(name=dataset_name)

upload_tasks = [
(item, duplicate_num)
for item in items
for duplicate_num in range(duplication_factor)
]
# Generate question_id for each unique question before duplication
# All duplicates of the same question share the same question_id
# Using 1-based integer IDs for easier sorting and grouping
upload_tasks = []
for idx, item in enumerate(items, start=1):
question_id = idx
for duplicate_num in range(duplication_factor):
upload_tasks.append((item, duplicate_num, question_id))
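# Illustrative note (not part of the diff): with two items and duplication_factor=2,
# upload_tasks would be [(item_1, 0, 1), (item_1, 1, 1), (item_2, 0, 2), (item_2, 1, 2)],
# i.e. every duplicate of a given question carries that question's 1-based question_id.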

# Upload items concurrently using ThreadPoolExecutor
total_uploaded = 0
with ThreadPoolExecutor(max_workers=4) as executor:
# Submit all upload tasks and collect the futures
futures = []
for item, dup_num in upload_tasks:
future = executor.submit(upload_item, item, dup_num)
for item, dup_num, question_id in upload_tasks:
future = executor.submit(upload_item, item, dup_num, question_id)
futures.append(future)

for future in as_completed(futures):
@@ -416,6 +423,7 @@ def fetch_trace_scores_from_langfuse(
"question": "",
"llm_answer": "",
"ground_truth_answer": "",
"question_id": "",
"scores": [],
}

@@ -433,11 +441,12 @@ def fetch_trace_scores_from_langfuse(
elif isinstance(trace.output, str):
trace_data["llm_answer"] = trace.output

# Get ground truth from metadata
# Get ground truth and question_id from metadata
if trace.metadata and isinstance(trace.metadata, dict):
trace_data["ground_truth_answer"] = trace.metadata.get(
"ground_truth", ""
)
trace_data["question_id"] = trace.metadata.get("question_id", "")

# Add scores from this trace
if trace.scores:
4 changes: 4 additions & 0 deletions backend/app/crud/evaluations/processing.py
@@ -154,6 +154,9 @@ def parse_evaluation_output(
question = dataset_item["input"].get("question", "")
ground_truth = dataset_item["expected_output"].get("answer", "")

# Extract question_id from dataset item metadata
question_id = dataset_item.get("metadata", {}).get("question_id")

results.append(
{
"item_id": item_id,
@@ -162,6 +165,7 @@
"ground_truth": ground_truth,
"response_id": response_id,
"usage": usage,
"question_id": question_id,
}
)

159 changes: 159 additions & 0 deletions backend/app/tests/crud/evaluations/test_langfuse.py
@@ -274,6 +274,91 @@ def test_create_langfuse_dataset_run_with_cost_tracking(self) -> None:
mock_langfuse.flush.assert_called_once()
assert mock_langfuse.trace.call_count == 2

def test_create_langfuse_dataset_run_with_question_id(self) -> None:
"""Test that question_id is included in trace metadata."""
mock_langfuse = MagicMock()
mock_dataset = MagicMock()
mock_generation = MagicMock()

mock_item1 = MagicMock()
mock_item1.id = "item_1"
mock_item1.observe.return_value.__enter__.return_value = "trace_id_1"

mock_dataset.items = [mock_item1]
mock_langfuse.get_dataset.return_value = mock_dataset
mock_langfuse.generation.return_value = mock_generation

results = [
{
"item_id": "item_1",
"question": "What is 2+2?",
"generated_output": "4",
"ground_truth": "4",
"response_id": "resp_123",
"usage": {
"input_tokens": 10,
"output_tokens": 5,
"total_tokens": 15,
},
"question_id": 1,
},
]

trace_id_mapping = create_langfuse_dataset_run(
langfuse=mock_langfuse,
dataset_name="test_dataset",
run_name="test_run",
results=results,
model="gpt-4o",
)

assert len(trace_id_mapping) == 1

# Verify trace was called with question_id in metadata
trace_call = mock_langfuse.trace.call_args
assert trace_call.kwargs["metadata"]["question_id"] == 1

# Verify generation was called with question_id in metadata
generation_call = mock_langfuse.generation.call_args
assert generation_call.kwargs["metadata"]["question_id"] == 1

def test_create_langfuse_dataset_run_without_question_id(self) -> None:
"""Test that traces work without question_id (backwards compatibility)."""
mock_langfuse = MagicMock()
mock_dataset = MagicMock()

mock_item1 = MagicMock()
mock_item1.id = "item_1"
mock_item1.observe.return_value.__enter__.return_value = "trace_id_1"

mock_dataset.items = [mock_item1]
mock_langfuse.get_dataset.return_value = mock_dataset

# Results without question_id
results = [
{
"item_id": "item_1",
"question": "What is 2+2?",
"generated_output": "4",
"ground_truth": "4",
"response_id": "resp_123",
"usage": None,
},
]

trace_id_mapping = create_langfuse_dataset_run(
langfuse=mock_langfuse,
dataset_name="test_dataset",
run_name="test_run",
results=results,
)

assert len(trace_id_mapping) == 1

# Verify trace was called without question_id in metadata
trace_call = mock_langfuse.trace.call_args
assert "question_id" not in trace_call.kwargs["metadata"]


class TestUpdateTracesWithCosineScores:
"""Test updating Langfuse traces with cosine similarity scores."""
@@ -411,6 +496,80 @@ def test_upload_dataset_to_langfuse_duplication_metadata(self, valid_items):
assert duplicate_numbers.count(2) == 3
assert duplicate_numbers.count(3) == 3

def test_upload_dataset_to_langfuse_question_id_in_metadata(self, valid_items):
"""Test that question_id is included in metadata as integer."""
mock_langfuse = MagicMock()
mock_dataset = MagicMock()
mock_dataset.id = "dataset_123"
mock_langfuse.create_dataset.return_value = mock_dataset

upload_dataset_to_langfuse(
langfuse=mock_langfuse,
items=valid_items,
dataset_name="test_dataset",
duplication_factor=1,
)

calls = mock_langfuse.create_dataset_item.call_args_list
assert len(calls) == 3

question_ids = []
for call_args in calls:
metadata = call_args.kwargs.get("metadata", {})
assert "question_id" in metadata
assert metadata["question_id"] is not None
# Verify it's an integer (1-based index)
assert isinstance(metadata["question_id"], int)
question_ids.append(metadata["question_id"])

# Verify sequential IDs starting from 1
assert sorted(question_ids) == [1, 2, 3]

def test_upload_dataset_to_langfuse_same_question_id_for_duplicates(
self, valid_items
):
"""Test that all duplicates of the same question share the same question_id."""
mock_langfuse = MagicMock()
mock_dataset = MagicMock()
mock_dataset.id = "dataset_123"
mock_langfuse.create_dataset.return_value = mock_dataset

upload_dataset_to_langfuse(
langfuse=mock_langfuse,
items=valid_items,
dataset_name="test_dataset",
duplication_factor=3,
)

calls = mock_langfuse.create_dataset_item.call_args_list
assert len(calls) == 9 # 3 items * 3 duplicates

# Group calls by original_question
question_ids_by_question: dict[str, set[int]] = {}
for call_args in calls:
metadata = call_args.kwargs.get("metadata", {})
original_question = metadata.get("original_question")
question_id = metadata.get("question_id")

# Verify question_id is an integer
assert isinstance(question_id, int)

if original_question not in question_ids_by_question:
question_ids_by_question[original_question] = set()
question_ids_by_question[original_question].add(question_id)

# Verify each question has exactly one unique question_id across all duplicates
for question, question_ids in question_ids_by_question.items():
assert (
len(question_ids) == 1
), f"Question '{question}' has multiple question_ids: {question_ids}"

# Verify different questions have different question_ids (1, 2, 3)
all_unique_ids: set[int] = set()
for qid_set in question_ids_by_question.values():
all_unique_ids.update(qid_set)
assert all_unique_ids == {1, 2, 3} # 3 unique questions = IDs 1, 2, 3

def test_upload_dataset_to_langfuse_empty_items(self) -> None:
"""Test with empty items list."""
mock_langfuse = MagicMock()
31 changes: 31 additions & 0 deletions backend/app/tests/crud/evaluations/test_processing.py
@@ -52,6 +52,7 @@ def test_parse_evaluation_output_basic(self) -> None:
"id": "item1",
"input": {"question": "What is 2+2?"},
"expected_output": {"answer": "4"},
"metadata": {"question_id": 1},
}
]

@@ -64,6 +65,36 @@
assert results[0]["ground_truth"] == "4"
assert results[0]["response_id"] == "resp_123"
assert results[0]["usage"]["total_tokens"] == 15
assert results[0]["question_id"] == 1

def test_parse_evaluation_output_without_question_id(self) -> None:
"""Test parsing dataset items without question_id (backwards compatibility)."""
raw_results = [
{
"custom_id": "item1",
"response": {
"body": {
"id": "resp_123",
"output": "Answer text",
"usage": {"total_tokens": 10},
}
},
}
]

dataset_items = [
{
"id": "item1",
"input": {"question": "Test question?"},
"expected_output": {"answer": "Test answer"},
# No metadata / question_id
}
]

results = parse_evaluation_output(raw_results, dataset_items)

assert len(results) == 1
assert results[0]["question_id"] is None

def test_parse_evaluation_output_simple_string(self) -> None:
"""Test parsing with simple string output."""
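
Editor's note: a minimal sketch (not part of the diff) of how the new question_id metadata could be consumed downstream, grouping the trace dicts returned by fetch_trace_scores_from_langfuse so that duplicate runs of the same question are averaged together. The helper name and the exact shape of each score entry are assumptions.

from collections import defaultdict


def average_scores_by_question(traces: list[dict]) -> dict:
    """Group fetched trace dicts by question_id and average each score name.

    Hypothetical helper; assumes each trace dict looks like the ones built in
    fetch_trace_scores_from_langfuse: {"question_id": ..., "scores": [{"name": ..., "value": ...}]}.
    """
    grouped: dict = defaultdict(lambda: defaultdict(list))
    for trace in traces:
        qid = trace.get("question_id")
        if qid in (None, ""):  # traces created before this change carry no question_id
            continue
        for score in trace.get("scores", []):
            grouped[qid][score["name"]].append(score["value"])
    return {
        qid: {name: sum(values) / len(values) for name, values in by_name.items()}
        for qid, by_name in grouped.items()
    }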