diff --git a/backend/app/crud/evaluations/langfuse.py b/backend/app/crud/evaluations/langfuse.py
index eced1e7b6..477de7e57 100644
--- a/backend/app/crud/evaluations/langfuse.py
+++ b/backend/app/crud/evaluations/langfuse.py
@@ -88,6 +88,7 @@ def create_langfuse_dataset_run(
         ground_truth = result["ground_truth"]
         response_id = result.get("response_id")
         usage_raw = result.get("usage")
+        question_id = result.get("question_id")

         dataset_item = dataset_items_map.get(item_id)
         if not dataset_item:
@@ -105,6 +106,8 @@ def create_langfuse_dataset_run(
         }
         if response_id:
             metadata["response_id"] = response_id
+        if question_id:
+            metadata["question_id"] = question_id

         # Create trace with basic info
         langfuse.trace(
@@ -250,7 +253,7 @@ def upload_dataset_to_langfuse(
         f"duplication_factor={duplication_factor}"
     )

-    def upload_item(item: dict[str, str], duplicate_num: int) -> bool:
+    def upload_item(item: dict[str, str], duplicate_num: int, question_id: str) -> bool:
         try:
             langfuse.create_dataset_item(
                 dataset_name=dataset_name,
@@ -260,6 +263,7 @@ def upload_item(item: dict[str, str], duplicate_num: int) -> bool:
                     "original_question": item["question"],
                     "duplicate_number": duplicate_num + 1,
                     "duplication_factor": duplication_factor,
+                    "question_id": question_id,
                 },
             )
             return True
@@ -275,19 +279,22 @@ def upload_item(item: dict[str, str], duplicate_num: int) -> bool:
     # Create or get dataset in Langfuse
     dataset = langfuse.create_dataset(name=dataset_name)

-    upload_tasks = [
-        (item, duplicate_num)
-        for item in items
-        for duplicate_num in range(duplication_factor)
-    ]
+    # Generate question_id for each unique question before duplication
+    # All duplicates of the same question share the same question_id
+    # Using 1-based integer IDs for easier sorting and grouping
+    upload_tasks = []
+    for idx, item in enumerate(items, start=1):
+        question_id = idx
+        for duplicate_num in range(duplication_factor):
+            upload_tasks.append((item, duplicate_num, question_id))

     # Upload items concurrently using ThreadPoolExecutor
     total_uploaded = 0
     with ThreadPoolExecutor(max_workers=4) as executor:
         # Submit all upload tasks and collect the futures
         futures = []
-        for item, dup_num in upload_tasks:
-            future = executor.submit(upload_item, item, dup_num)
+        for item, dup_num, question_id in upload_tasks:
+            future = executor.submit(upload_item, item, dup_num, question_id)
             futures.append(future)

         for future in as_completed(futures):
@@ -416,6 +423,7 @@ def fetch_trace_scores_from_langfuse(
                 "question": "",
                 "llm_answer": "",
                 "ground_truth_answer": "",
+                "question_id": "",
                 "scores": [],
             }

@@ -433,11 +441,12 @@ def fetch_trace_scores_from_langfuse(
             elif isinstance(trace.output, str):
                 trace_data["llm_answer"] = trace.output

-            # Get ground truth from metadata
+            # Get ground truth and question_id from metadata
             if trace.metadata and isinstance(trace.metadata, dict):
                 trace_data["ground_truth_answer"] = trace.metadata.get(
                     "ground_truth", ""
                 )
+                trace_data["question_id"] = trace.metadata.get("question_id", "")

             # Add scores from this trace
             if trace.scores:
diff --git a/backend/app/crud/evaluations/processing.py b/backend/app/crud/evaluations/processing.py
index 076ac9f32..f77a5925f 100644
--- a/backend/app/crud/evaluations/processing.py
+++ b/backend/app/crud/evaluations/processing.py
@@ -154,6 +154,9 @@ def parse_evaluation_output(
         question = dataset_item["input"].get("question", "")
         ground_truth = dataset_item["expected_output"].get("answer", "")

+        # Extract question_id from dataset item metadata
+        question_id = dataset_item.get("metadata", {}).get("question_id")
+
         results.append(
             {
                 "item_id": item_id,
@@ -162,6 +165,7 @@ def parse_evaluation_output(
                 "ground_truth": ground_truth,
                 "response_id": response_id,
                 "usage": usage,
+                "question_id": question_id,
             }
         )

diff --git a/backend/app/tests/crud/evaluations/test_langfuse.py b/backend/app/tests/crud/evaluations/test_langfuse.py
index 21a9d2262..198972e4d 100644
--- a/backend/app/tests/crud/evaluations/test_langfuse.py
+++ b/backend/app/tests/crud/evaluations/test_langfuse.py
@@ -274,6 +274,91 @@ def test_create_langfuse_dataset_run_with_cost_tracking(self) -> None:
         mock_langfuse.flush.assert_called_once()
         assert mock_langfuse.trace.call_count == 2

+    def test_create_langfuse_dataset_run_with_question_id(self) -> None:
+        """Test that question_id is included in trace metadata."""
+        mock_langfuse = MagicMock()
+        mock_dataset = MagicMock()
+        mock_generation = MagicMock()
+
+        mock_item1 = MagicMock()
+        mock_item1.id = "item_1"
+        mock_item1.observe.return_value.__enter__.return_value = "trace_id_1"
+
+        mock_dataset.items = [mock_item1]
+        mock_langfuse.get_dataset.return_value = mock_dataset
+        mock_langfuse.generation.return_value = mock_generation
+
+        results = [
+            {
+                "item_id": "item_1",
+                "question": "What is 2+2?",
+                "generated_output": "4",
+                "ground_truth": "4",
+                "response_id": "resp_123",
+                "usage": {
+                    "input_tokens": 10,
+                    "output_tokens": 5,
+                    "total_tokens": 15,
+                },
+                "question_id": 1,
+            },
+        ]
+
+        trace_id_mapping = create_langfuse_dataset_run(
+            langfuse=mock_langfuse,
+            dataset_name="test_dataset",
+            run_name="test_run",
+            results=results,
+            model="gpt-4o",
+        )
+
+        assert len(trace_id_mapping) == 1
+
+        # Verify trace was called with question_id in metadata
+        trace_call = mock_langfuse.trace.call_args
+        assert trace_call.kwargs["metadata"]["question_id"] == 1
+
+        # Verify generation was called with question_id in metadata
+        generation_call = mock_langfuse.generation.call_args
+        assert generation_call.kwargs["metadata"]["question_id"] == 1
+
+    def test_create_langfuse_dataset_run_without_question_id(self) -> None:
+        """Test that traces work without question_id (backwards compatibility)."""
+        mock_langfuse = MagicMock()
+        mock_dataset = MagicMock()
+
+        mock_item1 = MagicMock()
+        mock_item1.id = "item_1"
+        mock_item1.observe.return_value.__enter__.return_value = "trace_id_1"
+
+        mock_dataset.items = [mock_item1]
+        mock_langfuse.get_dataset.return_value = mock_dataset
+
+        # Results without question_id
+        results = [
+            {
+                "item_id": "item_1",
+                "question": "What is 2+2?",
+                "generated_output": "4",
+                "ground_truth": "4",
+                "response_id": "resp_123",
+                "usage": None,
+            },
+        ]
+
+        trace_id_mapping = create_langfuse_dataset_run(
+            langfuse=mock_langfuse,
+            dataset_name="test_dataset",
+            run_name="test_run",
+            results=results,
+        )
+
+        assert len(trace_id_mapping) == 1
+
+        # Verify trace was called without question_id in metadata
+        trace_call = mock_langfuse.trace.call_args
+        assert "question_id" not in trace_call.kwargs["metadata"]
+

 class TestUpdateTracesWithCosineScores:
     """Test updating Langfuse traces with cosine similarity scores."""
@@ -411,6 +496,80 @@ def test_upload_dataset_to_langfuse_duplication_metadata(self, valid_items):
         assert duplicate_numbers.count(2) == 3
         assert duplicate_numbers.count(3) == 3

+    def test_upload_dataset_to_langfuse_question_id_in_metadata(self, valid_items):
+        """Test that question_id is included in metadata as integer."""
+        mock_langfuse = MagicMock()
+        mock_dataset = MagicMock()
+        mock_dataset.id = "dataset_123"
+        mock_langfuse.create_dataset.return_value = mock_dataset
+
+        upload_dataset_to_langfuse(
+            langfuse=mock_langfuse,
+            items=valid_items,
+            dataset_name="test_dataset",
+            duplication_factor=1,
+        )
+
+        calls = mock_langfuse.create_dataset_item.call_args_list
+        assert len(calls) == 3
+
+        question_ids = []
+        for call_args in calls:
+            metadata = call_args.kwargs.get("metadata", {})
+            assert "question_id" in metadata
+            assert metadata["question_id"] is not None
+            # Verify it's an integer (1-based index)
+            assert isinstance(metadata["question_id"], int)
+            question_ids.append(metadata["question_id"])
+
+        # Verify sequential IDs starting from 1
+        assert sorted(question_ids) == [1, 2, 3]
+
+    def test_upload_dataset_to_langfuse_same_question_id_for_duplicates(
+        self, valid_items
+    ):
+        """Test that all duplicates of the same question share the same question_id."""
+        mock_langfuse = MagicMock()
+        mock_dataset = MagicMock()
+        mock_dataset.id = "dataset_123"
+        mock_langfuse.create_dataset.return_value = mock_dataset
+
+        upload_dataset_to_langfuse(
+            langfuse=mock_langfuse,
+            items=valid_items,
+            dataset_name="test_dataset",
+            duplication_factor=3,
+        )
+
+        calls = mock_langfuse.create_dataset_item.call_args_list
+        assert len(calls) == 9  # 3 items * 3 duplicates
+
+        # Group calls by original_question
+        question_ids_by_question: dict[str, set[int]] = {}
+        for call_args in calls:
+            metadata = call_args.kwargs.get("metadata", {})
+            original_question = metadata.get("original_question")
+            question_id = metadata.get("question_id")
+
+            # Verify question_id is an integer
+            assert isinstance(question_id, int)
+
+            if original_question not in question_ids_by_question:
+                question_ids_by_question[original_question] = set()
+            question_ids_by_question[original_question].add(question_id)
+
+        # Verify each question has exactly one unique question_id across all duplicates
+        for question, question_ids in question_ids_by_question.items():
+            assert (
+                len(question_ids) == 1
+            ), f"Question '{question}' has multiple question_ids: {question_ids}"
+
+        # Verify different questions have different question_ids (1, 2, 3)
+        all_unique_ids: set[int] = set()
+        for qid_set in question_ids_by_question.values():
+            all_unique_ids.update(qid_set)
+        assert all_unique_ids == {1, 2, 3}  # 3 unique questions = IDs 1, 2, 3
+
     def test_upload_dataset_to_langfuse_empty_items(self) -> None:
         """Test with empty items list."""
         mock_langfuse = MagicMock()
diff --git a/backend/app/tests/crud/evaluations/test_processing.py b/backend/app/tests/crud/evaluations/test_processing.py
index afb0ac0ed..06ef838a5 100644
--- a/backend/app/tests/crud/evaluations/test_processing.py
+++ b/backend/app/tests/crud/evaluations/test_processing.py
@@ -52,6 +52,7 @@ def test_parse_evaluation_output_basic(self) -> None:
                 "id": "item1",
                 "input": {"question": "What is 2+2?"},
                 "expected_output": {"answer": "4"},
+                "metadata": {"question_id": 1},
             }
         ]

@@ -64,6 +65,36 @@ def test_parse_evaluation_output_basic(self) -> None:
         assert results[0]["ground_truth"] == "4"
         assert results[0]["response_id"] == "resp_123"
         assert results[0]["usage"]["total_tokens"] == 15
+        assert results[0]["question_id"] == 1
+
+    def test_parse_evaluation_output_without_question_id(self) -> None:
+        """Test parsing dataset items without question_id (backwards compatibility)."""
+        raw_results = [
+            {
+                "custom_id": "item1",
+                "response": {
+                    "body": {
+                        "id": "resp_123",
+                        "output": "Answer text",
+                        "usage": {"total_tokens": 10},
+                    }
+                },
+            }
+        ]
+
+        dataset_items = [
+            {
+                "id": "item1",
+                "input": {"question": "Test question?"},
+                "expected_output": {"answer": "Test answer"},
+                # No metadata / question_id
+            }
+        ]
+
+        results = parse_evaluation_output(raw_results, dataset_items)
+
+        assert len(results) == 1
+        assert results[0]["question_id"] is None

     def test_parse_evaluation_output_simple_string(self) -> None:
         """Test parsing with simple string output."""