diff --git a/src/dataset.rs b/src/dataset.rs index 57c37c4..e01e33e 100644 --- a/src/dataset.rs +++ b/src/dataset.rs @@ -747,10 +747,8 @@ impl::compute_stats(&results); @@ -232,7 +232,7 @@ where log_builder = log_builder.tags(tags.clone()); } - case_span.log(log_builder.build().unwrap()).await; + case_span.log(log_builder.build().unwrap()); // Create hooks for the task let hooks = TaskHooks::new(&case_span, case.expected.as_ref()); @@ -242,15 +242,13 @@ where Ok(output) => output, Err(e) => { // Log error to span - case_span - .log( - crate::SpanLog::builder() - .error(json!(e.to_string())) - .build() - .unwrap(), - ) - .await; - case_span.end().await; + case_span.log( + crate::SpanLog::builder() + .error(json!(e.to_string())) + .build() + .unwrap(), + ); + case_span.end(); return Ok(EvalResult { input: case.input.clone(), @@ -267,14 +265,12 @@ where // Set output in span if let Ok(output_json) = serde_json::to_value(&output) { - case_span - .log( - crate::SpanLog::builder() - .output(output_json) - .build() - .unwrap(), - ) - .await; + case_span.log( + crate::SpanLog::builder() + .output(output_json) + .build() + .unwrap(), + ); } // Run scorers in parallel @@ -324,9 +320,9 @@ where if let Some(tags) = &final_tags { final_log = final_log.tags(tags.clone()); } - case_span.log(final_log.build().unwrap()).await; + case_span.log(final_log.build().unwrap()); - case_span.end().await; + case_span.end(); if !quiet { let score_str = all_scores diff --git a/src/experiments/experiment.rs b/src/experiments/experiment.rs index b0bdc35..a995f76 100644 --- a/src/experiments/experiment.rs +++ b/src/experiments/experiment.rs @@ -127,7 +127,7 @@ impl< // Convert ExperimentLog to SpanLog and log let span_log = event.into_span_log(); - span.log(span_log).await; + span.log(span_log); // Record end time for next log() call let end_time = SystemTime::now() @@ -205,8 +205,7 @@ impl< }; self.submitter - .submit(self.token.clone(), payload, Some(parent_info)) - .await; + .submit(self.token.clone(), payload, Some(parent_info)); } /// Get a summary of the experiment's performance. @@ -339,7 +338,7 @@ impl< /// /// ```ignore /// let result = experiment.traced(|span| async move { - /// span.log(SpanLog::builder().input(json!({"x": 1})).build().unwrap()).await; + /// span.log(SpanLog::builder().input(json!({"x": 1})).build().unwrap()); /// compute_result() /// }).await; /// ``` @@ -392,8 +391,7 @@ impl< }; self.submitter - .submit(self.token.clone(), payload, Some(parent_info)) - .await; + .submit(self.token.clone(), payload, Some(parent_info)); } /// Fetch the base experiment used for comparison. diff --git a/src/logger.rs b/src/logger.rs index 0621236..68300b7 100644 --- a/src/logger.rs +++ b/src/logger.rs @@ -561,7 +561,7 @@ impl BraintrustClient { /// Update an existing span using explicit credentials instead of shared login state. /// /// This is the safe entrypoint for multi-tenant `skip_login` clients. - pub async fn update_span_with_credentials( + pub fn update_span_with_credentials( &self, token: impl Into, org_id: impl Into, @@ -904,17 +904,16 @@ impl Drop for BraintrustClient { #[async_trait::async_trait] impl SpanSubmitter for BraintrustClient { - async fn submit( - &self, - token: String, - payload: SpanPayload, - parent_info: Option, - ) { + fn submit(&self, token: String, payload: SpanPayload, parent_info: Option) { self.submit_payload(token, payload, parent_info) } + async fn flush(&self) -> Result<()> { + BraintrustClient::flush(self).await + } + async fn trigger_flush(&self) -> Result<()> { - self.trigger_flush().await + BraintrustClient::trigger_flush(self).await } } @@ -1463,8 +1462,7 @@ mod tests { .input(Value::String("hello".into())) .build() .expect("build"), - ) - .await; + ); span.flush().await.expect("flush"); client.flush().await.expect("client flush"); } @@ -1524,8 +1522,7 @@ mod tests { .input(Value::String("input".into())) .build() .expect("build"), - ) - .await; + ); span.flush().await.expect("flush"); client.flush().await.expect("client flush"); @@ -1753,8 +1750,7 @@ mod tests { span.log(SpanLog { input: Some(Value::String("test".into())), ..Default::default() - }) - .await; + }); span.flush().await.expect("flush"); client.flush().await.expect("client flush"); @@ -1811,8 +1807,7 @@ mod tests { span.log(SpanLog { input: Some(Value::String("test".into())), ..Default::default() - }) - .await; + }); span.flush().await.expect("flush"); client.flush().await.expect("client flush"); @@ -1889,8 +1884,7 @@ mod tests { span.log(SpanLog { input: Some(Value::String("test".into())), ..Default::default() - }) - .await; + }); span.flush().await.expect("flush"); client.flush().await.expect("client flush"); diff --git a/src/span.rs b/src/span.rs index 45bc5e8..b4fd51d 100644 --- a/src/span.rs +++ b/src/span.rs @@ -1,11 +1,10 @@ use std::collections::HashMap; use std::fmt; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use std::time::{SystemTime, UNIX_EPOCH}; use async_trait::async_trait; use serde_json::{Map, Value}; -use tokio::sync::Mutex; use uuid::Uuid; use crate::error::Result; @@ -204,12 +203,10 @@ pub(crate) trait SpanSubmitter: Send + Sync { /// Submit a span payload for queuing (fire-and-forget). /// The payload is queued internally and processed by a background worker. /// Dropping is handled internally if the queue is full. - async fn submit( - &self, - token: String, - payload: SpanPayload, - parent_info: Option, - ); + fn submit(&self, token: String, payload: SpanPayload, parent_info: Option); + + /// Flush all queued span data through to the server. + async fn flush(&self) -> Result<()>; /// Trigger a non-blocking background flush. async fn trigger_flush(&self) -> Result<()>; @@ -373,96 +370,57 @@ impl SpanHandle { /// Log event data to this span. All fields are optional. /// Multiple calls will merge data (later values overwrite earlier ones). - pub async fn log(&self, event: SpanLog) { - let mut inner = self.inner.lock().await; + /// Each call synchronously queues the current span snapshot for background processing. + pub fn log(&self, event: SpanLog) { + let payload = { + let mut inner = self + .inner + .lock() + .expect("span mutex should not be poisoned"); + apply_span_log_to_data(&mut inner, event); + current_span_payload(&mut inner) + }; - if let Some(name) = event.name { - inner.name = Some(name); - } - if let Some(input) = event.input { - inner.input = Some(input); - } - if let Some(output) = event.output { - inner.output = Some(output); - } - if let Some(expected) = event.expected { - inner.expected = Some(expected); - } - if let Some(error) = event.error { - inner.error = Some(error); - } - if let Some(scores) = event.scores { - for (key, value) in scores { - inner.scores.insert(key, value); - } - } - if let Some(metadata) = event.metadata { - for (key, value) in metadata { - inner.metadata.insert(key, value); - } - } - if let Some(metrics) = event.metrics { - for (key, value) in metrics { - inner.metrics.insert(key, value); - } - } - if let Some(tags) = event.tags { - inner.tags.extend(tags); - } - if let Some(context) = event.context { - inner.context = Some(context); - } + self.submitter + .submit(self.token.clone(), payload, self.parent_info.clone()); } - /// Flush span data to Braintrust. Can be called multiple times - last writer wins. - /// Each call updates the same span (same row_id and span_id). - /// First flush sends with is_merge=false (replace), subsequent flushes send is_merge=true (merge). + /// Flush all queued span data through to Braintrust. /// - /// This does not mark the span as completed. Call [`SpanHandle::end`] to set - /// the `end` metric before final flush. + /// This does not create a new queued span update. Call [`SpanHandle::log`] or + /// [`SpanHandle::end`] first to submit changes, then call `flush()` when you need + /// to ensure the background queue has been processed. pub async fn flush(&self) -> Result<()> { - let payload: SpanPayload = { - let mut inner = self.inner.lock().await; - if let Some(start) = inner.start_time { - inner.metrics.entry("start".to_string()).or_insert(start); - } - if let Some(end) = inner.end_time { - inner.metrics.insert("end".to_string(), end); - } - - // Create payload from current state (captures has_flushed for is_merge) - let payload: SpanPayload = inner.clone().into(); - - // Mark as flushed for subsequent calls - inner.has_flushed = true; - - payload - }; - - self.submitter - .submit(self.token.clone(), payload, self.parent_info.clone()) - .await; - Ok(()) + self.submitter.flush().await } /// Mark the span as ended with the current timestamp. /// /// Calling `end()` multiple times is idempotent: once an end time is set, /// subsequent calls return the same value without overwriting it. - pub async fn end(&self) -> f64 { - self.end_with_time(epoch_secs()).await + pub fn end(&self) -> f64 { + self.end_with_time(epoch_secs()) } /// Mark the span as ended with an explicit timestamp (seconds since Unix epoch). /// /// Calling this multiple times is idempotent: once an end time is set, /// subsequent calls return the previously-set value. - pub async fn end_with_time(&self, end_time: f64) -> f64 { - let mut inner = self.inner.lock().await; - if let Some(existing) = inner.end_time { - return existing; - } - inner.end_time = Some(end_time); + pub fn end_with_time(&self, end_time: f64) -> f64 { + let (end_time, payload) = { + let mut inner = self + .inner + .lock() + .expect("span mutex should not be poisoned"); + if let Some(existing) = inner.end_time { + return existing; + } + inner.end_time = Some(end_time); + (end_time, current_span_payload(&mut inner)) + }; + + self.submitter + .submit(self.token.clone(), payload, self.parent_info.clone()); end_time } @@ -478,8 +436,11 @@ impl SpanHandle { /// /// The exported SpanComponents includes the span's IDs and any propagated_event /// data that should flow to child spans. - pub async fn export(&self) -> Result { - let inner = self.inner.lock().await; + pub fn export(&self) -> Result { + let inner = self + .inner + .lock() + .expect("span mutex should not be poisoned"); // Determine object_type and object_id from parent_info if available, // otherwise default to ProjectLogs @@ -544,6 +505,48 @@ impl SpanHandle { } } +fn apply_span_log_to_data(inner: &mut SpanData, event: SpanLog) { + macro_rules! replace_if_some { + ($($field:ident),* $(,)?) => { + $( + if let Some(value) = event.$field { + inner.$field = Some(value); + } + )* + }; + } + + macro_rules! merge_map_if_some { + ($($field:ident),* $(,)?) => { + $( + if let Some(values) = event.$field { + inner.$field.extend(values); + } + )* + }; + } + + replace_if_some!(name, input, output, expected, error, context); + merge_map_if_some!(scores, metadata, metrics); + + if let Some(tags) = event.tags { + inner.tags.extend(tags); + } +} + +fn current_span_payload(inner: &mut SpanData) -> SpanPayload { + if let Some(start) = inner.start_time { + inner.metrics.entry("start".to_string()).or_insert(start); + } + if let Some(end) = inner.end_time { + inner.metrics.insert("end".to_string(), end); + } + + let payload: SpanPayload = inner.clone().into(); + inner.has_flushed = true; + payload +} + #[derive(Clone, Default)] struct SpanData { row_id: String, @@ -705,8 +708,7 @@ mod tests { .output(json!({"output": "world"})) .build() .expect("build"), - ) - .await; + ); span.flush().await.expect("flush"); let spans = collector.spans(); @@ -731,8 +733,7 @@ mod tests { })) .build() .expect("build"), - ) - .await; + ); span.flush().await.expect("flush"); let captured = collector.spans().into_iter().next().unwrap(); @@ -758,8 +759,7 @@ mod tests { .input(json!("data")) .build() .expect("build"), - ) - .await; + ); span.flush().await.expect("flush"); let captured = collector.spans().into_iter().next().unwrap(); @@ -792,8 +792,7 @@ mod tests { .context(json!({"source": "test"})) .build() .expect("build"), - ) - .await; + ); span.flush().await.expect("flush"); let captured = collector.spans().into_iter().next().unwrap(); @@ -815,8 +814,7 @@ mod tests { .error(json!({"message": "Something went wrong", "code": 500})) .build() .expect("build"), - ) - .await; + ); span.flush().await.expect("flush"); let captured = collector.spans().into_iter().next().unwrap(); @@ -834,8 +832,7 @@ mod tests { .input(json!({"input": "hello"})) .build() .expect("build"), - ) - .await; + ); span.flush().await.expect("flush"); let captured = collector.spans().into_iter().next().unwrap(); @@ -847,7 +844,7 @@ mod tests { #[tokio::test] async fn end_sets_end_metric_on_flush() { let (span, collector) = build_test_span(); - span.end_with_time(123.0).await; + span.end_with_time(123.0); span.flush().await.expect("flush"); let captured = collector.spans().into_iter().next().unwrap(); @@ -858,8 +855,8 @@ mod tests { #[tokio::test] async fn end_is_idempotent() { let (span, _collector) = build_test_span(); - let first = span.end_with_time(123.0).await; - let second = span.end_with_time(456.0).await; + let first = span.end_with_time(123.0); + let second = span.end_with_time(456.0); assert_eq!(first, 123.0); assert_eq!(second, 123.0); } @@ -892,8 +889,7 @@ mod tests { .input(json!({"child": "input"})) .build() .expect("build"), - ) - .await; + ); span.flush().await.expect("flush"); @@ -935,7 +931,7 @@ mod tests { let span = builder.parent_info(parent_info).build(); // Export the span - let exported = span.export().await.unwrap(); + let exported = span.export().unwrap(); // Verify exported SpanComponents has propagated_event assert!(exported.propagated_event.is_some()); @@ -951,7 +947,7 @@ mod tests { let (builder, _collector) = mock_span_builder(); let span = builder.project_name("demo-project").build(); - let exported = span.export().await.unwrap(); + let exported = span.export().unwrap(); assert_eq!(exported.object_type, SpanObjectType::ProjectLogs); assert!(exported.object_id.is_none()); diff --git a/src/stream.rs b/src/stream.rs index 06e759d..cf97ea0 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -859,7 +859,7 @@ where .metric("time_to_first_token", ttft_secs) .build() { - span.log(log).await; + span.log(log); } } @@ -886,8 +886,7 @@ where span.log(SpanLog { output: Some(output), ..Default::default() - }) - .await; + }); } } drop(agg); @@ -988,7 +987,7 @@ async fn finalize_span( builder = builder.metrics(metrics); } if let Ok(log) = builder.build() { - span.log(log).await; + span.log(log); } } Err(e) => { @@ -996,7 +995,7 @@ async fn finalize_span( } } } - span.end().await; + span.end(); // Flush span with aggregated output if let Err(e) = span.flush().await { tracing::warn!("Failed to flush span: {}", e); diff --git a/src/test_utils.rs b/src/test_utils.rs index 3fdafcf..0075a6b 100644 --- a/src/test_utils.rs +++ b/src/test_utils.rs @@ -58,12 +58,7 @@ impl MockBraintrustClient { #[async_trait] impl SpanSubmitter for MockBraintrustClient { - async fn submit( - &self, - token: String, - payload: SpanPayload, - parent_info: Option, - ) { + fn submit(&self, token: String, payload: SpanPayload, parent_info: Option) { self.collector.push(CapturedSpan { token, payload, @@ -71,6 +66,10 @@ impl SpanSubmitter for MockBraintrustClient { }); } + async fn flush(&self) -> Result<()> { + Ok(()) + } + async fn trigger_flush(&self) -> Result<()> { // Mock: no-op for tests Ok(()) diff --git a/tests/http_flow.rs b/tests/http_flow.rs index 9391be8..fbe5d9b 100644 --- a/tests/http_flow.rs +++ b/tests/http_flow.rs @@ -39,8 +39,7 @@ async fn submits_with_bearer_token() { .input(Value::String("input".into())) .build() .expect("build"), - ) - .await; + ); span.flush().await.expect("flush"); client.flush().await.expect("client flush"); @@ -79,10 +78,20 @@ async fn flush_is_fire_and_forget() { .project_name("demo") .build(); - // flush() queues the data and returns immediately. - // The actual HTTP submission (which fails) happens in the background. + span.log( + SpanLog::builder() + .input(Value::String("input".into())) + .build() + .expect("build"), + ); + + // flush() waits for queued work to be processed. Background submission errors + // are still swallowed by the queue and should not bubble out here. let result = span.flush().await; - assert!(result.is_ok(), "flush() should be fire-and-forget"); + assert!( + result.is_ok(), + "flush() should not surface background submission errors" + ); // Drain pending items before drop so the Drop impl's synchronous flush path // is a no-op (queue is empty). The 500 from project registration is swallowed. diff --git a/tests/span_lifecycle.rs b/tests/span_lifecycle.rs index 2840cfc..3a52bef 100644 --- a/tests/span_lifecycle.rs +++ b/tests/span_lifecycle.rs @@ -45,8 +45,7 @@ async fn span_lifecycle_flushes_to_logs_endpoint() { .output(Value::String("output".into())) .build() .expect("build"), - ) - .await; + ); span.flush().await.expect("flush"); client.flush().await.expect("client flush"); @@ -172,7 +171,6 @@ async fn client_update_span_with_credentials_works_without_priming_login_state() .build() .expect("build"), ) - .await .expect("update"); client.flush().await.expect("flush");