5 changes: 3 additions & 2 deletions backend/app/api/main.py
@@ -20,11 +20,11 @@
onboarding,
credentials,
cron,
evaluations,
fine_tuning,
model_evaluation,
collection_job,
)
from app.api.routes.evaluations import dataset as evaluation_dataset, evaluation
from app.core.config import settings

api_router = APIRouter()
@@ -37,7 +37,8 @@
api_router.include_router(cron.router)
api_router.include_router(documents.router)
api_router.include_router(doc_transformation_job.router)
api_router.include_router(evaluations.router)
api_router.include_router(evaluation_dataset.router)
api_router.include_router(evaluation.router)
api_router.include_router(llm.router)
api_router.include_router(login.router)
api_router.include_router(onboarding.router)
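With the prefixes set on the two routers below (dataset.py and evaluation.py), the combined routing resolves to /evaluations/datasets/... and /evaluations/... under whatever API prefix main.py applies. A minimal sketch of how the prefixes compose, assuming an /api/v1 version prefix and hypothetical handler names that are not taken from this diff:

from fastapi import APIRouter, FastAPI

dataset_router = APIRouter(prefix="/evaluations/datasets", tags=["Evaluation"])
eval_router = APIRouter(prefix="/evaluations", tags=["Evaluation"])

@dataset_router.post("")  # resolves to POST /api/v1/evaluations/datasets
def upload_dataset() -> dict:  # hypothetical handler, for illustration only
    return {"status": "uploaded"}

@eval_router.post("")  # resolves to POST /api/v1/evaluations
def start_evaluation() -> dict:  # hypothetical handler, for illustration only
    return {"status": "started"}

api_router = APIRouter()
api_router.include_router(dataset_router)
api_router.include_router(eval_router)

app = FastAPI()
app.include_router(api_router, prefix="/api/v1")  # assumed version prefix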
13 changes: 0 additions & 13 deletions backend/app/api/routes/evaluations/__init__.py

This file was deleted.

2 changes: 1 addition & 1 deletion backend/app/api/routes/evaluations/dataset.py
@@ -31,7 +31,7 @@

logger = logging.getLogger(__name__)

router = APIRouter()
router = APIRouter(prefix="/evaluations/datasets", tags=["Evaluation"])


def _dataset_to_response(dataset: EvaluationDataset) -> DatasetUploadResponse:
2 changes: 1 addition & 1 deletion backend/app/api/routes/evaluations/evaluation.py
@@ -25,7 +25,7 @@

logger = logging.getLogger(__name__)

router = APIRouter()
router = APIRouter(prefix="/evaluations", tags=["Evaluation"])


@router.post(
8 changes: 8 additions & 0 deletions backend/app/crud/evaluations/__init__.py
@@ -35,3 +35,11 @@
process_completed_embedding_batch,
process_completed_evaluation,
)
from app.crud.evaluations.score import (
CategoricalSummaryScore,
EvaluationScore,
NumericSummaryScore,
SummaryScore,
TraceData,
TraceScore,
)
32 changes: 25 additions & 7 deletions backend/app/crud/evaluations/core.py
@@ -1,11 +1,11 @@
import logging
from typing import Any

from langfuse import Langfuse
from sqlmodel import Session, select

from app.core.util import now
from app.crud.evaluations.langfuse import fetch_trace_scores_from_langfuse
from app.crud.evaluations.score import EvaluationScore
from app.models import EvaluationRun

logger = logging.getLogger(__name__)
@@ -201,13 +201,13 @@ def get_or_fetch_score(
eval_run: EvaluationRun,
langfuse: Langfuse,
force_refetch: bool = False,
) -> dict[str, Any]:
) -> EvaluationScore:
"""
Get cached score with trace info or fetch from Langfuse and update.

This function implements a cache-on-first-request pattern:
- If score already has 'traces' key, return it
- Otherwise, fetch from Langfuse, update score column, and return
- Otherwise, fetch from Langfuse, merge with existing summary_scores, and return
- If force_refetch is True, always fetch fresh data from Langfuse

Args:
@@ -224,8 +224,8 @@ def get_or_fetch_score(
Exception: If Langfuse API calls fail
"""
# Check if score already exists with traces
has_score = eval_run.score is not None and "traces" in eval_run.score
if not force_refetch and has_score:
has_traces = eval_run.score is not None and "traces" in eval_run.score
if not force_refetch and has_traces:
logger.info(
f"[get_or_fetch_score] Returning existing score | evaluation_id={eval_run.id}"
)
@@ -237,13 +237,31 @@
f"run={eval_run.run_name} | force_refetch={force_refetch}"
)

# Get existing summary_scores if any (e.g., cosine_similarity from cron job)
existing_summary_scores = []
if eval_run.score and "summary_scores" in eval_run.score:
existing_summary_scores = eval_run.score.get("summary_scores", [])

# Fetch from Langfuse
score = fetch_trace_scores_from_langfuse(
langfuse_score = fetch_trace_scores_from_langfuse(
langfuse=langfuse,
dataset_name=eval_run.dataset_name,
run_name=eval_run.run_name,
)

# Merge summary_scores: existing scores + new scores from Langfuse
existing_scores_map = {s["name"]: s for s in existing_summary_scores}
for langfuse_summary in langfuse_score.get("summary_scores", []):
existing_scores_map[langfuse_summary["name"]] = langfuse_summary

merged_summary_scores = list(existing_scores_map.values())

# Build final score with merged summary_scores and traces
score: EvaluationScore = {
"summary_scores": merged_summary_scores,
"traces": langfuse_score.get("traces", []),
}

# Update score column using existing helper
update_evaluation_run(session=session, eval_run=eval_run, score=score)

@@ -260,7 +278,7 @@ def save_score(
eval_run_id: int,
organization_id: int,
project_id: int,
score: dict[str, Any],
score: EvaluationScore,
) -> EvaluationRun | None:
"""
Save score to evaluation run with its own session.
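Merging by score name means an entry returned by Langfuse overwrites a previously cached entry with the same name, while cached entries Langfuse does not return (such as cosine_similarity written by the cron job) are kept. A standalone sketch of that merge, with made-up score values:

# Name-keyed merge as used in get_or_fetch_score; the values here are illustrative only.
existing_summary_scores = [
    {"name": "cosine_similarity", "avg": 0.91, "std": 0.04,
     "total_pairs": 50, "data_type": "NUMERIC"},
]
langfuse_summary_scores = [
    {"name": "correctness", "avg": 0.82, "std": 0.11,
     "total_pairs": 50, "data_type": "NUMERIC"},
]

merged = {s["name"]: s for s in existing_summary_scores}
for summary in langfuse_summary_scores:
    merged[summary["name"]] = summary  # Langfuse entries win on name collisions

merged_summary_scores = list(merged.values())
# Result keeps cosine_similarity from the cron job and adds correctness from Langfuse.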
12 changes: 7 additions & 5 deletions backend/app/crud/evaluations/langfuse.py
@@ -15,6 +15,8 @@
import numpy as np
from langfuse import Langfuse

from app.crud.evaluations.score import EvaluationScore, TraceData, TraceScore

logger = logging.getLogger(__name__)


@@ -319,7 +321,7 @@ def fetch_trace_scores_from_langfuse(
langfuse: Langfuse,
dataset_name: str,
run_name: str,
) -> dict[str, Any]:
) -> EvaluationScore:
"""
Fetch trace scores from Langfuse for an evaluation run.

@@ -402,14 +404,14 @@ def fetch_trace_scores_from_langfuse(
)

# 3. Fetch trace details with scores for each trace
traces = []
traces: list[TraceData] = []
# Track score aggregations by name: {name: {"data_type": str, "values": list}}
score_aggregations: dict[str, dict[str, Any]] = {}

for trace_id in trace_ids:
try:
trace = langfuse.api.trace.get(trace_id)
trace_data: dict[str, Any] = {
trace_data: TraceData = {
"trace_id": trace_id,
"question": "",
"llm_answer": "",
@@ -453,7 +455,7 @@
):
score_value = round(float(score_value), 2)

score_entry: dict[str, Any] = {
score_entry: TraceScore = {
"name": score_name,
"value": score_value,
"data_type": data_type,
@@ -534,7 +536,7 @@
}
)

result: dict[str, Any] = {
result: EvaluationScore = {
"summary_scores": summary_scores,
"traces": traces,
}
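The per-trace score entries feed the score_aggregations map ({name: {"data_type": str, "values": list}}), which is then rolled up into summary_scores. That rollup is collapsed out of this diff; the sketch below shows one plausible shape of the step, matching the NumericSummaryScore and CategoricalSummaryScore types, and is an assumption rather than the code as merged:

import numpy as np

# Assumed rollup of score_aggregations into summary_scores (illustrative input data).
score_aggregations = {
    "correctness": {"data_type": "NUMERIC", "values": [0.8, 0.9, 1.0]},
    "toxicity": {"data_type": "CATEGORICAL", "values": ["low", "low", "high"]},
}

summary_scores = []
for name, agg in score_aggregations.items():
    if agg["data_type"] == "NUMERIC":
        values = [float(v) for v in agg["values"]]
        summary_scores.append({
            "name": name,
            "avg": round(float(np.mean(values)), 2),
            "std": round(float(np.std(values)), 2),
            "total_pairs": len(values),
            "data_type": "NUMERIC",
        })
    else:
        distribution: dict[str, int] = {}
        for value in agg["values"]:
            distribution[value] = distribution.get(value, 0) + 1
        summary_scores.append({
            "name": name,
            "distribution": distribution,
            "total_pairs": len(agg["values"]),
            "data_type": "CATEGORICAL",
        })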
26 changes: 12 additions & 14 deletions backend/app/crud/evaluations/processing.py
@@ -382,21 +382,19 @@ async def process_completed_embedding_batch(
# Step 4: Calculate similarity scores
similarity_stats = calculate_average_similarity(embedding_pairs=embedding_pairs)

# Step 5: Update evaluation_run with scores
if eval_run.score is None:
eval_run.score = {}

eval_run.score["cosine_similarity"] = {
"avg": similarity_stats["cosine_similarity_avg"],
"std": similarity_stats["cosine_similarity_std"],
"total_pairs": similarity_stats["total_pairs"],
}

# Optionally store per-item scores if not too large
if len(similarity_stats.get("per_item_scores", [])) <= 100:
eval_run.score["cosine_similarity"]["per_item_scores"] = similarity_stats[
"per_item_scores"
# Step 5: Update evaluation_run with scores in summary_scores format
# This format is consistent with what Langfuse returns when fetching traces
eval_run.score = {
"summary_scores": [
{
"name": "cosine_similarity",
"avg": round(float(similarity_stats["cosine_similarity_avg"]), 2),
"std": round(float(similarity_stats["cosine_similarity_std"]), 2),
"total_pairs": similarity_stats["total_pairs"],
"data_type": "NUMERIC",
}
]
}

# Step 6: Update Langfuse traces with cosine similarity scores
logger.info(
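After this change the cron job stores cosine similarity in the same summary_scores shape that the Langfuse fetch produces, so the two sources can later be merged by name. The stored score column would look roughly like this (numbers are made up):

# Illustrative eval_run.score value after Step 5 (all numbers invented).
eval_run_score = {
    "summary_scores": [
        {
            "name": "cosine_similarity",
            "avg": 0.87,
            "std": 0.06,
            "total_pairs": 42,
            "data_type": "NUMERIC",
        }
    ]
}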
56 changes: 56 additions & 0 deletions backend/app/crud/evaluations/score.py
@@ -0,0 +1,56 @@
"""
Type definitions for evaluation scores.

This module contains TypedDict definitions for type-safe score data
used throughout the evaluation system.
"""

from typing import NotRequired, TypedDict


class TraceScore(TypedDict):
"""A score attached to a trace."""

name: str
value: float | str
data_type: str
comment: NotRequired[str]


class TraceData(TypedDict):
"""Data for a single trace including Q&A and scores."""

trace_id: str
question: str
llm_answer: str
ground_truth_answer: str
scores: list[TraceScore]


class NumericSummaryScore(TypedDict):
"""Summary statistics for a numeric score across all traces."""

name: str
avg: float
std: float
total_pairs: int
data_type: str


class CategoricalSummaryScore(TypedDict):
"""Summary statistics for a categorical score across all traces."""

name: str
distribution: dict[str, int]
total_pairs: int
data_type: str


SummaryScore = NumericSummaryScore | CategoricalSummaryScore


class EvaluationScore(TypedDict):
"""Complete evaluation score data with traces and summary statistics."""

summary_scores: list[SummaryScore]
traces: list[TraceData]
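Because TypedDicts are plain dicts at runtime, these shapes stay JSON-serializable for the score column while giving the type checker the structure. A minimal usage sketch (values invented; the optional comment field is omitted since it is NotRequired):

from app.crud.evaluations.score import EvaluationScore, TraceData, TraceScore

trace_score: TraceScore = {"name": "correctness", "value": 0.9, "data_type": "NUMERIC"}

trace: TraceData = {
    "trace_id": "abc123",
    "question": "What is the capital of France?",
    "llm_answer": "Paris",
    "ground_truth_answer": "Paris",
    "scores": [trace_score],
}

score: EvaluationScore = {
    "summary_scores": [
        {"name": "correctness", "avg": 0.9, "std": 0.0,
         "total_pairs": 1, "data_type": "NUMERIC"}
    ],
    "traces": [trace],
}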
50 changes: 39 additions & 11 deletions backend/app/services/evaluations/evaluation.py
@@ -263,19 +263,27 @@ def get_evaluation_with_scores(
if not eval_run:
return None, None

if not get_trace_info:
return eval_run, None

# Only fetch trace info for completed evaluations
if eval_run.status != "completed":
return eval_run, (
f"Trace info is only available for completed evaluations. "
f"Current status: {eval_run.status}"
)
if get_trace_info:
return eval_run, (
f"Trace info is only available for completed evaluations. "
f"Current status: {eval_run.status}"
)
return eval_run, None

# Check if we already have cached summary_scores
has_summary_scores = (
eval_run.score is not None and "summary_scores" in eval_run.score
)

# Check if we already have cached scores
has_cached_score = eval_run.score is not None and "traces" in eval_run.score
if not resync_score and has_cached_score:
# If not requesting trace info, return existing score (with summary_scores)
if not get_trace_info:
return eval_run, None

# Check if we already have cached traces
has_cached_traces = eval_run.score is not None and "traces" in eval_run.score
if not resync_score and has_cached_traces:
return eval_run, None

langfuse = get_langfuse_client(
Expand All @@ -288,9 +296,12 @@ def get_evaluation_with_scores(
dataset_name = eval_run.dataset_name
run_name = eval_run.run_name
eval_run_id = eval_run.id
existing_summary_scores = (
eval_run.score.get("summary_scores", []) if has_summary_scores else []
)

try:
score = fetch_trace_scores_from_langfuse(
langfuse_score = fetch_trace_scores_from_langfuse(
langfuse=langfuse,
dataset_name=dataset_name,
run_name=run_name,
@@ -309,6 +320,23 @@
)
return eval_run, f"Failed to fetch trace info from Langfuse: {str(e)}"

# Merge summary_scores: existing scores + new scores from Langfuse
# Create a map of existing scores by name
existing_scores_map = {s["name"]: s for s in existing_summary_scores}
langfuse_summary_scores = langfuse_score.get("summary_scores", [])

# Merge: Langfuse scores take precedence (more up-to-date)
for langfuse_summary in langfuse_summary_scores:
existing_scores_map[langfuse_summary["name"]] = langfuse_summary

merged_summary_scores = list(existing_scores_map.values())

# Build final score with merged summary_scores and traces
score = {
"summary_scores": merged_summary_scores,
"traces": langfuse_score.get("traces", []),
}

eval_run = save_score(
eval_run_id=eval_run_id,
organization_id=organization_id,
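A hypothetical call-site sketch of the service above; only get_trace_info and resync_score appear in this diff, so the remaining parameter names are assumptions about the signature:

# Hypothetical call site (parameter names other than get_trace_info and
# resync_score are assumed, not taken from this diff).
eval_run, error = get_evaluation_with_scores(
    session=session,
    evaluation_id=evaluation_id,
    organization_id=organization_id,
    project_id=project_id,
    get_trace_info=True,    # include per-trace details, not just summary_scores
    resync_score=False,     # reuse cached traces when already present
)
if error:
    logger.warning("Returning evaluation without trace info: %s", error)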
10 changes: 7 additions & 3 deletions backend/app/tests/crud/evaluations/test_processing.py
@@ -527,9 +527,13 @@ async def test_process_completed_embedding_batch_success(
db.refresh(result)
assert result.status == "completed"
assert result.score is not None
assert "cosine_similarity" in result.score
assert result.score["cosine_similarity"]["avg"] == 0.95
mock_update_traces.assert_called_once()
assert "summary_scores" in result.score
summary_scores = result.score["summary_scores"]
cosine_score = next(
(s for s in summary_scores if s["name"] == "cosine_similarity"), None
)
assert cosine_score is not None
assert cosine_score["avg"] == 0.95

@pytest.mark.asyncio
@patch("app.crud.evaluations.processing.download_batch_results")