-
Notifications
You must be signed in to change notification settings - Fork 16
Expand file tree
/
Copy pathbraintrust.py
More file actions
281 lines (222 loc) · 9.72 KB
/
braintrust.py
File metadata and controls
281 lines (222 loc) · 9.72 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
"""Braintrust adapter for Eval Protocol.
This adapter allows pulling data from Braintrust deployments and converting it
to EvaluationRow format for use in evaluation pipelines.
"""
import logging
import os
import random
import time
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional, Protocol
import requests
from eval_protocol.models import EvaluationRow, InputMetadata, Message
from .base import BaseAdapter
from .utils import extract_messages_from_data
logger = logging.getLogger(__name__)
class TraceConverter(Protocol):
"""Protocol for custom trace-to-EvaluationRow converter functions.
A converter function should take a Braintrust trace along with processing
options and return an EvaluationRow or None to skip the trace.
"""
def __call__(
self,
trace: Dict[str, Any],
include_tool_calls: bool,
) -> Optional[EvaluationRow]:
"""Convert a Braintrust trace to an EvaluationRow.
Args:
trace: The Braintrust trace object to convert
include_tool_calls: Whether to include tool calling information
Returns:
EvaluationRow or None if the trace should be skipped
"""
...
def convert_trace_to_evaluation_row(trace: Dict[str, Any], include_tool_calls: bool = True) -> Optional[EvaluationRow]:
"""Convert a Braintrust trace to EvaluationRow format.
Args:
trace: Braintrust trace object
include_tool_calls: Whether to include tool calling information
Returns:
EvaluationRow or None if conversion fails
"""
try:
# Extract messages from the trace
messages = extract_messages_from_trace(trace, include_tool_calls)
# Extract tools if available
tools = None
if include_tool_calls:
metadata = trace.get("metadata", {})
tools = metadata.get("tools")
if not tools:
hidden_params = metadata.get("hidden_params", {})
optional_params = hidden_params.get("optional_params", {})
tools = optional_params.get("tools")
if not messages:
return None
return EvaluationRow(
messages=messages,
tools=tools,
input_metadata=InputMetadata(
session_data={
"braintrust_trace_id": trace.get("id"),
}
),
)
except (AttributeError, ValueError, KeyError) as e:
logger.error("Error converting trace %s: %s", trace.get("id", "unknown"), e)
return None
def extract_messages_from_trace(trace: Dict[str, Any], include_tool_calls: bool = True) -> List[Message]:
"""Extract messages from Braintrust trace input and output.
Args:
trace: Braintrust trace object
include_tool_calls: Whether to include tool calling information
Returns:
List of Message objects
"""
messages = []
try:
# Look for complete conversations (input + output arrays)
input_data = trace.get("input")
output_data = None
output_list = trace.get("output", [])
if output_list and len(output_list) > 0:
first_output = output_list[0]
if isinstance(first_output, dict):
output_data = first_output.get("message")
# Skip spans without meaningful conversation data
if not input_data or not output_data:
return messages
# Extract messages from input and output
if input_data:
messages.extend(extract_messages_from_data(input_data, include_tool_calls))
if output_data:
messages.extend(extract_messages_from_data(output_data, include_tool_calls))
except (AttributeError, ValueError, KeyError) as e:
logger.warning("Error processing trace %s: %s", trace.get("id", "unknown"), e)
return messages
class BraintrustAdapter(BaseAdapter):
"""Adapter to pull data from Braintrust and convert to EvaluationRow format.
This adapter can pull both chat conversations and tool calling traces from
Braintrust deployments and convert them into the EvaluationRow format expected
by the evaluation protocol.
Examples:
Basic usage:
>>> adapter = BraintrustAdapter(
... api_key="your_api_key",
... project_id="your_project_id"
... )
>>> btql_query = "select: * from: project_logs('your_project_id') traces limit: 10"
>>> rows = adapter.get_evaluation_rows(btql_query)
Using BTQL for custom queries:
>>> btql_query = '''
... select: *
... from: project_logs('your_project_id') traces
... filter: metadata.agent_name = 'agent_instance'
... limit: 50
... '''
>>> rows = adapter.get_evaluation_rows(btql_query)
"""
def __init__(
self,
api_key: Optional[str] = None,
api_url: Optional[str] = None,
project_id: Optional[str] = None,
):
"""Initialize the Braintrust adapter.
Args:
api_key: Braintrust API key (defaults to BRAINTRUST_API_KEY env var)
api_url: Braintrust API URL (defaults to BRAINTRUST_API_URL env var)
project_id: Project ID to fetch logs from (defaults to BRAINTRUST_PROJECT_ID env var)
"""
self.api_key = api_key or os.getenv("BRAINTRUST_API_KEY")
self.api_url = api_url or os.getenv("BRAINTRUST_API_URL", "https://api.braintrust.dev")
self.project_id = project_id or os.getenv("BRAINTRUST_PROJECT_ID")
if not self.api_key:
raise ValueError("BRAINTRUST_API_KEY environment variable or api_key parameter required")
if not self.project_id:
raise ValueError("BRAINTRUST_PROJECT_ID environment variable or project_id parameter required")
def get_evaluation_rows(
self,
btql_query: str,
include_tool_calls: bool = True,
converter: Optional[TraceConverter] = None,
) -> List[EvaluationRow]:
"""Get evaluation rows using a custom BTQL query.
Args:
btql_query: The BTQL query string to execute
include_tool_calls: Whether to include tool calling information
converter: Optional custom converter implementing TraceConverter protocol
Returns:
List[EvaluationRow]: Converted evaluation rows
"""
eval_rows = []
headers = {"Authorization": f"Bearer {self.api_key}", "Content-Type": "application/json"}
response = requests.post(f"{self.api_url}/btql", headers=headers, json={"query": btql_query, "fmt": "json"})
response.raise_for_status()
query_response = response.json()
if not query_response or not query_response.get("data"):
logger.debug("No data returned from BTQL query")
return eval_rows
all_traces = query_response["data"]
logger.debug("BTQL query returned %d traces", len(all_traces))
# Process each selected trace
for trace in all_traces:
try:
if converter:
eval_row = converter(trace, include_tool_calls)
else:
eval_row = convert_trace_to_evaluation_row(trace, include_tool_calls)
if eval_row:
eval_rows.append(eval_row)
except (AttributeError, ValueError, KeyError) as e:
logger.warning("Failed to convert trace %s: %s", trace.get("id", "unknown"), e)
continue
logger.info("Successfully processed %d BTQL results into %d evaluation rows", len(all_traces), len(eval_rows))
return eval_rows
def upload_scores(self, rows: List[EvaluationRow], model_name: str, mean_score: float) -> None:
"""Upload evaluation scores back to Braintrust traces for tracking and analysis.
Creates score entries in Braintrust for each unique trace_id found in the evaluation
rows' session data. This allows you to see evaluation results directly in the
Braintrust UI alongside the original traces.
Args:
rows: List of EvaluationRow objects with session_data containing trace IDs
model_name: Name of the model (used as the score name in Braintrust)
mean_score: The calculated mean score to push to Braintrust
Note:
Silently handles errors if rows lack session data
"""
try:
headers = {
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json",
}
feedback_items = []
for trace_id in set(
row.input_metadata.session_data["braintrust_trace_id"]
for row in rows
if row.evaluation_result and row.input_metadata and row.input_metadata.session_data
):
if trace_id:
feedback_items.append({"id": trace_id, "scores": {model_name: mean_score}})
if feedback_items:
payload = {"feedback": feedback_items}
response = requests.post(
f"{self.api_url}/v1/project_logs/{self.project_id}/feedback",
headers=headers,
json=payload,
)
response.raise_for_status()
except Exception as e:
logger.warning("Failed to push scores to Braintrust: %s", e)
def create_braintrust_adapter(
api_key: Optional[str] = None,
api_url: Optional[str] = None,
project_id: Optional[str] = None,
) -> BraintrustAdapter:
"""Factory function to create a Braintrust adapter."""
return BraintrustAdapter(
api_key=api_key,
api_url=api_url,
project_id=project_id,
)
__all__ = ["BraintrustAdapter", "create_braintrust_adapter"]