From 9549262b31d58e21d66b20bc64e06e56d97e64be Mon Sep 17 00:00:00 2001 From: Ankur Goyal Date: Wed, 1 Apr 2026 12:19:20 -0700 Subject: [PATCH 1/5] sync log --- src/dataset.rs | 3 +- src/eval/runner.rs | 12 +-- src/experiments/experiment.rs | 10 +- src/logger.rs | 28 ++---- src/span.rs | 183 +++++++++++++++++----------------- src/stream.rs | 7 +- src/test_utils.rs | 11 +- tests/http_flow.rs | 16 ++- tests/span_lifecycle.rs | 3 +- 9 files changed, 131 insertions(+), 142 deletions(-) diff --git a/src/dataset.rs b/src/dataset.rs index 57c37c4..93e101b 100644 --- a/src/dataset.rs +++ b/src/dataset.rs @@ -749,8 +749,7 @@ impl, - ) { + fn submit(&self, token: String, payload: SpanPayload, parent_info: Option) { self.submit_payload(token, payload, parent_info) } + async fn flush(&self) -> Result<()> { + BraintrustClient::flush(self).await + } + async fn trigger_flush(&self) -> Result<()> { - self.trigger_flush().await + BraintrustClient::trigger_flush(self).await } } @@ -1463,8 +1462,7 @@ mod tests { .input(Value::String("hello".into())) .build() .expect("build"), - ) - .await; + ); span.flush().await.expect("flush"); client.flush().await.expect("client flush"); } @@ -1524,8 +1522,7 @@ mod tests { .input(Value::String("input".into())) .build() .expect("build"), - ) - .await; + ); span.flush().await.expect("flush"); client.flush().await.expect("client flush"); @@ -1753,8 +1750,7 @@ mod tests { span.log(SpanLog { input: Some(Value::String("test".into())), ..Default::default() - }) - .await; + }); span.flush().await.expect("flush"); client.flush().await.expect("client flush"); @@ -1811,8 +1807,7 @@ mod tests { span.log(SpanLog { input: Some(Value::String("test".into())), ..Default::default() - }) - .await; + }); span.flush().await.expect("flush"); client.flush().await.expect("client flush"); @@ -1889,8 +1884,7 @@ mod tests { span.log(SpanLog { input: Some(Value::String("test".into())), ..Default::default() - }) - .await; + }); span.flush().await.expect("flush"); client.flush().await.expect("client flush"); diff --git a/src/span.rs b/src/span.rs index 45bc5e8..9bdad41 100644 --- a/src/span.rs +++ b/src/span.rs @@ -1,11 +1,10 @@ use std::collections::HashMap; use std::fmt; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use std::time::{SystemTime, UNIX_EPOCH}; use async_trait::async_trait; use serde_json::{Map, Value}; -use tokio::sync::Mutex; use uuid::Uuid; use crate::error::Result; @@ -204,12 +203,10 @@ pub(crate) trait SpanSubmitter: Send + Sync { /// Submit a span payload for queuing (fire-and-forget). /// The payload is queued internally and processed by a background worker. /// Dropping is handled internally if the queue is full. - async fn submit( - &self, - token: String, - payload: SpanPayload, - parent_info: Option, - ); + fn submit(&self, token: String, payload: SpanPayload, parent_info: Option); + + /// Flush all queued span data through to the server. + async fn flush(&self) -> Result<()>; /// Trigger a non-blocking background flush. async fn trigger_flush(&self) -> Result<()>; @@ -373,76 +370,25 @@ impl SpanHandle { /// Log event data to this span. All fields are optional. /// Multiple calls will merge data (later values overwrite earlier ones). - pub async fn log(&self, event: SpanLog) { - let mut inner = self.inner.lock().await; + /// Each call synchronously queues the current span snapshot for background processing. + pub fn log(&self, event: SpanLog) { + let payload = { + let mut inner = self.inner.lock().expect("span mutex should not be poisoned"); + apply_span_log_to_data(&mut inner, event); + current_span_payload(&mut inner) + }; - if let Some(name) = event.name { - inner.name = Some(name); - } - if let Some(input) = event.input { - inner.input = Some(input); - } - if let Some(output) = event.output { - inner.output = Some(output); - } - if let Some(expected) = event.expected { - inner.expected = Some(expected); - } - if let Some(error) = event.error { - inner.error = Some(error); - } - if let Some(scores) = event.scores { - for (key, value) in scores { - inner.scores.insert(key, value); - } - } - if let Some(metadata) = event.metadata { - for (key, value) in metadata { - inner.metadata.insert(key, value); - } - } - if let Some(metrics) = event.metrics { - for (key, value) in metrics { - inner.metrics.insert(key, value); - } - } - if let Some(tags) = event.tags { - inner.tags.extend(tags); - } - if let Some(context) = event.context { - inner.context = Some(context); - } + self.submitter + .submit(self.token.clone(), payload, self.parent_info.clone()); } - /// Flush span data to Braintrust. Can be called multiple times - last writer wins. - /// Each call updates the same span (same row_id and span_id). - /// First flush sends with is_merge=false (replace), subsequent flushes send is_merge=true (merge). + /// Flush all queued span data through to Braintrust. /// - /// This does not mark the span as completed. Call [`SpanHandle::end`] to set - /// the `end` metric before final flush. + /// This does not create a new queued span update. Call [`SpanHandle::log`] or + /// [`SpanHandle::end`] first to submit changes, then call `flush()` when you need + /// to ensure the background queue has been processed. pub async fn flush(&self) -> Result<()> { - let payload: SpanPayload = { - let mut inner = self.inner.lock().await; - if let Some(start) = inner.start_time { - inner.metrics.entry("start".to_string()).or_insert(start); - } - if let Some(end) = inner.end_time { - inner.metrics.insert("end".to_string(), end); - } - - // Create payload from current state (captures has_flushed for is_merge) - let payload: SpanPayload = inner.clone().into(); - - // Mark as flushed for subsequent calls - inner.has_flushed = true; - - payload - }; - - self.submitter - .submit(self.token.clone(), payload, self.parent_info.clone()) - .await; - Ok(()) + self.submitter.flush().await } /// Mark the span as ended with the current timestamp. @@ -458,11 +404,17 @@ impl SpanHandle { /// Calling this multiple times is idempotent: once an end time is set, /// subsequent calls return the previously-set value. pub async fn end_with_time(&self, end_time: f64) -> f64 { - let mut inner = self.inner.lock().await; - if let Some(existing) = inner.end_time { - return existing; - } - inner.end_time = Some(end_time); + let (end_time, payload) = { + let mut inner = self.inner.lock().expect("span mutex should not be poisoned"); + if let Some(existing) = inner.end_time { + return existing; + } + inner.end_time = Some(end_time); + (end_time, current_span_payload(&mut inner)) + }; + + self.submitter + .submit(self.token.clone(), payload, self.parent_info.clone()); end_time } @@ -479,7 +431,7 @@ impl SpanHandle { /// The exported SpanComponents includes the span's IDs and any propagated_event /// data that should flow to child spans. pub async fn export(&self) -> Result { - let inner = self.inner.lock().await; + let inner = self.inner.lock().expect("span mutex should not be poisoned"); // Determine object_type and object_id from parent_info if available, // otherwise default to ProjectLogs @@ -544,6 +496,58 @@ impl SpanHandle { } } +fn apply_span_log_to_data(inner: &mut SpanData, event: SpanLog) { + if let Some(name) = event.name { + inner.name = Some(name); + } + if let Some(input) = event.input { + inner.input = Some(input); + } + if let Some(output) = event.output { + inner.output = Some(output); + } + if let Some(expected) = event.expected { + inner.expected = Some(expected); + } + if let Some(error) = event.error { + inner.error = Some(error); + } + if let Some(scores) = event.scores { + for (key, value) in scores { + inner.scores.insert(key, value); + } + } + if let Some(metadata) = event.metadata { + for (key, value) in metadata { + inner.metadata.insert(key, value); + } + } + if let Some(metrics) = event.metrics { + for (key, value) in metrics { + inner.metrics.insert(key, value); + } + } + if let Some(tags) = event.tags { + inner.tags.extend(tags); + } + if let Some(context) = event.context { + inner.context = Some(context); + } +} + +fn current_span_payload(inner: &mut SpanData) -> SpanPayload { + if let Some(start) = inner.start_time { + inner.metrics.entry("start".to_string()).or_insert(start); + } + if let Some(end) = inner.end_time { + inner.metrics.insert("end".to_string(), end); + } + + let payload: SpanPayload = inner.clone().into(); + inner.has_flushed = true; + payload +} + #[derive(Clone, Default)] struct SpanData { row_id: String, @@ -705,8 +709,7 @@ mod tests { .output(json!({"output": "world"})) .build() .expect("build"), - ) - .await; + ); span.flush().await.expect("flush"); let spans = collector.spans(); @@ -731,8 +734,7 @@ mod tests { })) .build() .expect("build"), - ) - .await; + ); span.flush().await.expect("flush"); let captured = collector.spans().into_iter().next().unwrap(); @@ -758,8 +760,7 @@ mod tests { .input(json!("data")) .build() .expect("build"), - ) - .await; + ); span.flush().await.expect("flush"); let captured = collector.spans().into_iter().next().unwrap(); @@ -792,8 +793,7 @@ mod tests { .context(json!({"source": "test"})) .build() .expect("build"), - ) - .await; + ); span.flush().await.expect("flush"); let captured = collector.spans().into_iter().next().unwrap(); @@ -815,8 +815,7 @@ mod tests { .error(json!({"message": "Something went wrong", "code": 500})) .build() .expect("build"), - ) - .await; + ); span.flush().await.expect("flush"); let captured = collector.spans().into_iter().next().unwrap(); @@ -834,8 +833,7 @@ mod tests { .input(json!({"input": "hello"})) .build() .expect("build"), - ) - .await; + ); span.flush().await.expect("flush"); let captured = collector.spans().into_iter().next().unwrap(); @@ -892,8 +890,7 @@ mod tests { .input(json!({"child": "input"})) .build() .expect("build"), - ) - .await; + ); span.flush().await.expect("flush"); diff --git a/src/stream.rs b/src/stream.rs index 06e759d..4c9937f 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -859,7 +859,7 @@ where .metric("time_to_first_token", ttft_secs) .build() { - span.log(log).await; + span.log(log); } } @@ -886,8 +886,7 @@ where span.log(SpanLog { output: Some(output), ..Default::default() - }) - .await; + }); } } drop(agg); @@ -988,7 +987,7 @@ async fn finalize_span( builder = builder.metrics(metrics); } if let Ok(log) = builder.build() { - span.log(log).await; + span.log(log); } } Err(e) => { diff --git a/src/test_utils.rs b/src/test_utils.rs index 3fdafcf..0075a6b 100644 --- a/src/test_utils.rs +++ b/src/test_utils.rs @@ -58,12 +58,7 @@ impl MockBraintrustClient { #[async_trait] impl SpanSubmitter for MockBraintrustClient { - async fn submit( - &self, - token: String, - payload: SpanPayload, - parent_info: Option, - ) { + fn submit(&self, token: String, payload: SpanPayload, parent_info: Option) { self.collector.push(CapturedSpan { token, payload, @@ -71,6 +66,10 @@ impl SpanSubmitter for MockBraintrustClient { }); } + async fn flush(&self) -> Result<()> { + Ok(()) + } + async fn trigger_flush(&self) -> Result<()> { // Mock: no-op for tests Ok(()) diff --git a/tests/http_flow.rs b/tests/http_flow.rs index 9391be8..5622ed3 100644 --- a/tests/http_flow.rs +++ b/tests/http_flow.rs @@ -39,8 +39,7 @@ async fn submits_with_bearer_token() { .input(Value::String("input".into())) .build() .expect("build"), - ) - .await; + ); span.flush().await.expect("flush"); client.flush().await.expect("client flush"); @@ -79,10 +78,17 @@ async fn flush_is_fire_and_forget() { .project_name("demo") .build(); - // flush() queues the data and returns immediately. - // The actual HTTP submission (which fails) happens in the background. + span.log( + SpanLog::builder() + .input(Value::String("input".into())) + .build() + .expect("build"), + ); + + // flush() waits for queued work to be processed. Background submission errors + // are still swallowed by the queue and should not bubble out here. let result = span.flush().await; - assert!(result.is_ok(), "flush() should be fire-and-forget"); + assert!(result.is_ok(), "flush() should not surface background submission errors"); // Drain pending items before drop so the Drop impl's synchronous flush path // is a no-op (queue is empty). The 500 from project registration is swallowed. diff --git a/tests/span_lifecycle.rs b/tests/span_lifecycle.rs index 2840cfc..a0a65f3 100644 --- a/tests/span_lifecycle.rs +++ b/tests/span_lifecycle.rs @@ -45,8 +45,7 @@ async fn span_lifecycle_flushes_to_logs_endpoint() { .output(Value::String("output".into())) .build() .expect("build"), - ) - .await; + ); span.flush().await.expect("flush"); client.flush().await.expect("client flush"); From 3faa9f7981028d212fca68850155b323ea3623ff Mon Sep 17 00:00:00 2001 From: Ankur Goyal Date: Wed, 1 Apr 2026 12:27:05 -0700 Subject: [PATCH 2/5] clean up --- src/span.rs | 52 +++++++++++++++++++++------------------------------- 1 file changed, 21 insertions(+), 31 deletions(-) diff --git a/src/span.rs b/src/span.rs index 9bdad41..bc5dac9 100644 --- a/src/span.rs +++ b/src/span.rs @@ -497,42 +497,32 @@ impl SpanHandle { } fn apply_span_log_to_data(inner: &mut SpanData, event: SpanLog) { - if let Some(name) = event.name { - inner.name = Some(name); - } - if let Some(input) = event.input { - inner.input = Some(input); - } - if let Some(output) = event.output { - inner.output = Some(output); - } - if let Some(expected) = event.expected { - inner.expected = Some(expected); - } - if let Some(error) = event.error { - inner.error = Some(error); - } - if let Some(scores) = event.scores { - for (key, value) in scores { - inner.scores.insert(key, value); - } - } - if let Some(metadata) = event.metadata { - for (key, value) in metadata { - inner.metadata.insert(key, value); - } + macro_rules! replace_if_some { + ($($field:ident),* $(,)?) => { + $( + if let Some(value) = event.$field { + inner.$field = Some(value); + } + )* + }; } - if let Some(metrics) = event.metrics { - for (key, value) in metrics { - inner.metrics.insert(key, value); - } + + macro_rules! merge_map_if_some { + ($($field:ident),* $(,)?) => { + $( + if let Some(values) = event.$field { + inner.$field.extend(values); + } + )* + }; } + + replace_if_some!(name, input, output, expected, error, context); + merge_map_if_some!(scores, metadata, metrics); + if let Some(tags) = event.tags { inner.tags.extend(tags); } - if let Some(context) = event.context { - inner.context = Some(context); - } } fn current_span_payload(inner: &mut SpanData) -> SpanPayload { From 43cdb0a2487171723107a0145f5d2205009659f3 Mon Sep 17 00:00:00 2001 From: Ankur Goyal Date: Wed, 1 Apr 2026 12:27:35 -0700 Subject: [PATCH 3/5] clippy --- src/dataset.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/dataset.rs b/src/dataset.rs index 93e101b..48bb1e6 100644 --- a/src/dataset.rs +++ b/src/dataset.rs @@ -747,7 +747,7 @@ impl Date: Wed, 1 Apr 2026 12:28:52 -0700 Subject: [PATCH 4/5] fmt --- src/dataset.rs | 3 +-- src/eval/runner.rs | 26 ++++++++++++-------------- src/span.rs | 15 ++++++++++++--- tests/http_flow.rs | 5 ++++- 4 files changed, 29 insertions(+), 20 deletions(-) diff --git a/src/dataset.rs b/src/dataset.rs index 48bb1e6..e01e33e 100644 --- a/src/dataset.rs +++ b/src/dataset.rs @@ -747,8 +747,7 @@ impl output, Err(e) => { // Log error to span - case_span - .log( - crate::SpanLog::builder() - .error(json!(e.to_string())) - .build() - .unwrap(), - ); + case_span.log( + crate::SpanLog::builder() + .error(json!(e.to_string())) + .build() + .unwrap(), + ); case_span.end().await; return Ok(EvalResult { @@ -266,13 +265,12 @@ where // Set output in span if let Ok(output_json) = serde_json::to_value(&output) { - case_span - .log( - crate::SpanLog::builder() - .output(output_json) - .build() - .unwrap(), - ); + case_span.log( + crate::SpanLog::builder() + .output(output_json) + .build() + .unwrap(), + ); } // Run scorers in parallel diff --git a/src/span.rs b/src/span.rs index bc5dac9..3481ac8 100644 --- a/src/span.rs +++ b/src/span.rs @@ -373,7 +373,10 @@ impl SpanHandle { /// Each call synchronously queues the current span snapshot for background processing. pub fn log(&self, event: SpanLog) { let payload = { - let mut inner = self.inner.lock().expect("span mutex should not be poisoned"); + let mut inner = self + .inner + .lock() + .expect("span mutex should not be poisoned"); apply_span_log_to_data(&mut inner, event); current_span_payload(&mut inner) }; @@ -405,7 +408,10 @@ impl SpanHandle { /// subsequent calls return the previously-set value. pub async fn end_with_time(&self, end_time: f64) -> f64 { let (end_time, payload) = { - let mut inner = self.inner.lock().expect("span mutex should not be poisoned"); + let mut inner = self + .inner + .lock() + .expect("span mutex should not be poisoned"); if let Some(existing) = inner.end_time { return existing; } @@ -431,7 +437,10 @@ impl SpanHandle { /// The exported SpanComponents includes the span's IDs and any propagated_event /// data that should flow to child spans. pub async fn export(&self) -> Result { - let inner = self.inner.lock().expect("span mutex should not be poisoned"); + let inner = self + .inner + .lock() + .expect("span mutex should not be poisoned"); // Determine object_type and object_id from parent_info if available, // otherwise default to ProjectLogs diff --git a/tests/http_flow.rs b/tests/http_flow.rs index 5622ed3..fbe5d9b 100644 --- a/tests/http_flow.rs +++ b/tests/http_flow.rs @@ -88,7 +88,10 @@ async fn flush_is_fire_and_forget() { // flush() waits for queued work to be processed. Background submission errors // are still swallowed by the queue and should not bubble out here. let result = span.flush().await; - assert!(result.is_ok(), "flush() should not surface background submission errors"); + assert!( + result.is_ok(), + "flush() should not surface background submission errors" + ); // Drain pending items before drop so the Drop impl's synchronous flush path // is a no-op (queue is empty). The 500 from project registration is swallowed. From d8772341f0f9cba09470e2cae241155113e9c926 Mon Sep 17 00:00:00 2001 From: Ankur Goyal Date: Wed, 1 Apr 2026 14:00:52 -0700 Subject: [PATCH 5/5] remoe more async --- src/eval/runner.rs | 6 +++--- src/logger.rs | 2 +- src/span.rs | 18 +++++++++--------- src/stream.rs | 2 +- tests/span_lifecycle.rs | 1 - 5 files changed, 14 insertions(+), 15 deletions(-) diff --git a/src/eval/runner.rs b/src/eval/runner.rs index 2e3f464..96db594 100644 --- a/src/eval/runner.rs +++ b/src/eval/runner.rs @@ -163,7 +163,7 @@ where } // End root span - root_span.end().await; + root_span.end(); // Compute statistics let score_stats = EvalSummary::::compute_stats(&results); @@ -248,7 +248,7 @@ where .build() .unwrap(), ); - case_span.end().await; + case_span.end(); return Ok(EvalResult { input: case.input.clone(), @@ -322,7 +322,7 @@ where } case_span.log(final_log.build().unwrap()); - case_span.end().await; + case_span.end(); if !quiet { let score_str = all_scores diff --git a/src/logger.rs b/src/logger.rs index 00c9e12..68300b7 100644 --- a/src/logger.rs +++ b/src/logger.rs @@ -561,7 +561,7 @@ impl BraintrustClient { /// Update an existing span using explicit credentials instead of shared login state. /// /// This is the safe entrypoint for multi-tenant `skip_login` clients. - pub async fn update_span_with_credentials( + pub fn update_span_with_credentials( &self, token: impl Into, org_id: impl Into, diff --git a/src/span.rs b/src/span.rs index 3481ac8..b4fd51d 100644 --- a/src/span.rs +++ b/src/span.rs @@ -398,15 +398,15 @@ impl SpanHandle { /// /// Calling `end()` multiple times is idempotent: once an end time is set, /// subsequent calls return the same value without overwriting it. - pub async fn end(&self) -> f64 { - self.end_with_time(epoch_secs()).await + pub fn end(&self) -> f64 { + self.end_with_time(epoch_secs()) } /// Mark the span as ended with an explicit timestamp (seconds since Unix epoch). /// /// Calling this multiple times is idempotent: once an end time is set, /// subsequent calls return the previously-set value. - pub async fn end_with_time(&self, end_time: f64) -> f64 { + pub fn end_with_time(&self, end_time: f64) -> f64 { let (end_time, payload) = { let mut inner = self .inner @@ -436,7 +436,7 @@ impl SpanHandle { /// /// The exported SpanComponents includes the span's IDs and any propagated_event /// data that should flow to child spans. - pub async fn export(&self) -> Result { + pub fn export(&self) -> Result { let inner = self .inner .lock() @@ -844,7 +844,7 @@ mod tests { #[tokio::test] async fn end_sets_end_metric_on_flush() { let (span, collector) = build_test_span(); - span.end_with_time(123.0).await; + span.end_with_time(123.0); span.flush().await.expect("flush"); let captured = collector.spans().into_iter().next().unwrap(); @@ -855,8 +855,8 @@ mod tests { #[tokio::test] async fn end_is_idempotent() { let (span, _collector) = build_test_span(); - let first = span.end_with_time(123.0).await; - let second = span.end_with_time(456.0).await; + let first = span.end_with_time(123.0); + let second = span.end_with_time(456.0); assert_eq!(first, 123.0); assert_eq!(second, 123.0); } @@ -931,7 +931,7 @@ mod tests { let span = builder.parent_info(parent_info).build(); // Export the span - let exported = span.export().await.unwrap(); + let exported = span.export().unwrap(); // Verify exported SpanComponents has propagated_event assert!(exported.propagated_event.is_some()); @@ -947,7 +947,7 @@ mod tests { let (builder, _collector) = mock_span_builder(); let span = builder.project_name("demo-project").build(); - let exported = span.export().await.unwrap(); + let exported = span.export().unwrap(); assert_eq!(exported.object_type, SpanObjectType::ProjectLogs); assert!(exported.object_id.is_none()); diff --git a/src/stream.rs b/src/stream.rs index 4c9937f..cf97ea0 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -995,7 +995,7 @@ async fn finalize_span( } } } - span.end().await; + span.end(); // Flush span with aggregated output if let Err(e) = span.flush().await { tracing::warn!("Failed to flush span: {}", e); diff --git a/tests/span_lifecycle.rs b/tests/span_lifecycle.rs index a0a65f3..3a52bef 100644 --- a/tests/span_lifecycle.rs +++ b/tests/span_lifecycle.rs @@ -171,7 +171,6 @@ async fn client_update_span_with_credentials_works_without_priming_login_state() .build() .expect("build"), ) - .await .expect("update"); client.flush().await.expect("flush");