Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -712,6 +712,7 @@ impl<S: SpanSubmitter + DatasetRegistrar + DatasetFetcher + DatasetSummarizer +
row_id: row.id.clone(),
span_id: row.id,
is_merge: row.is_merge,
span_components: None,
org_id: String::new(),
org_name: Some(self.org_name.clone()),
project_name: Some(self.project_name.clone()),
Expand Down
11 changes: 9 additions & 2 deletions src/experiments/experiment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use crate::experiments::metadata::{
use crate::experiments::span_builder::ExperimentSpanBuilder;
use crate::experiments::summary::{ExperimentSummary, MetricSummary, ScoreSummary};
use crate::span::{SpanBuilder, SpanHandle, SpanLog, SpanSubmitter};
use crate::types::{ParentSpanInfo, SpanPayload, SpanType};
use crate::types::{ParentSpanInfo, SpanAttributes, SpanPayload, SpanType};

/// Metadata from experiment registration, cached after lazy initialization.
#[derive(Debug, Clone)]
Expand Down Expand Up @@ -184,6 +184,7 @@ impl<
row_id: id.to_string(),
span_id: id.to_string(), // Use same ID for merge
is_merge: true,
span_components: None,
org_id: self.org_id.clone(),
org_name: self.org_name.clone(),
project_name: None,
Expand Down Expand Up @@ -365,6 +366,7 @@ impl<
row_id: id.to_string(),
span_id: id.to_string(),
is_merge: true,
span_components: None,
org_id: self.org_id.clone(),
org_name: self.org_name.clone(),
project_name: None,
Expand All @@ -377,7 +379,12 @@ impl<
metrics: event.metrics,
tags: event.tags,
context: event.context,
span_attributes: None,
span_attributes: event.name.map(|name| SpanAttributes {
name: Some(name),
span_type: None,
purpose: None,
extra: HashMap::new(),
}),
};

let parent_info = ParentSpanInfo::Experiment {
Expand Down
171 changes: 146 additions & 25 deletions src/log_queue/queue.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use anyhow::Context;
use chrono::Utc;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::collections::HashMap;

use super::batching::batch_and_serialize_rows;
Expand Down Expand Up @@ -364,6 +365,7 @@ impl LogQueueCore {
row_id,
span_id,
is_merge,
span_components,
org_id,
org_name,
project_name,
Expand All @@ -379,6 +381,48 @@ impl LogQueueCore {
span_attributes,
} = payload;

if let Some(span_components) = span_components {
let destination = self
.destination_from_span_components(
token,
&org_id,
org_name.as_deref(),
&span_components,
)
.await?;
let root_span_id = span_components
.root_span_id
.clone()
.unwrap_or_else(|| span_id.clone());

return Ok(Logs3Row {
id: row_id,
is_merge: if is_merge { Some(true) } else { None },
merge_paths: None,
span_id,
root_span_id,
span_parents: None,
destination,
org_id,
org_name,
input,
output,
expected,
error,
scores,
metadata,
metrics,
tags,
context,
span_attributes,
extra: HashMap::new(),
created: Utc::now(),
xact_id: None,
object_delete: None,
audit_source: Some("api".to_string()),
});
}

let project_id = if let Some(ref pn) = project_name {
Some(
self.ensure_project_id(token, &org_id, org_name.as_deref(), pn)
Expand Down Expand Up @@ -465,6 +509,48 @@ impl LogQueueCore {
})
}

async fn destination_from_span_components(
&self,
token: &str,
org_id: &str,
org_name: Option<&str>,
span_components: &crate::span_components::SpanComponents,
) -> std::result::Result<LogDestination, anyhow::Error> {
if let Some(object_id) = span_components.object_id.as_ref() {
return Ok(match span_components.object_type {
SpanObjectType::Experiment => LogDestination::experiment(object_id.clone()),
SpanObjectType::ProjectLogs => LogDestination::project_logs(object_id.clone()),
SpanObjectType::PlaygroundLogs => {
LogDestination::playground_logs(object_id.clone())
}
});
}

match span_components.object_type {
SpanObjectType::ProjectLogs => {
let args = span_components
.compute_object_metadata_args
.as_ref()
.ok_or_else(|| anyhow::anyhow!("missing compute_object_metadata_args"))?;
if let Some(project_id) = args.get("project_id").and_then(Value::as_str) {
return Ok(LogDestination::project_logs(project_id.to_string()));
}
let project_name = args
.get("project_name")
.and_then(Value::as_str)
.ok_or_else(|| anyhow::anyhow!("missing project_name"))?;
let project_id = self
.ensure_project_id(token, org_id, org_name, project_name)
.await?;
Ok(LogDestination::project_logs(project_id))
}
SpanObjectType::Experiment => anyhow::bail!("experiment span is missing object_id"),
SpanObjectType::PlaygroundLogs => {
anyhow::bail!("playground span is missing object_id")
}
}
}

/// Ensure a project ID is available for the given project name, registering
/// it via the API if it is not yet cached.
///
Expand Down Expand Up @@ -606,42 +692,76 @@ impl LogQueueCore {
parent_info,
} = cmd;

// Build a best-effort destination from parent_info (project registration
// hasn't happened yet, so project-name-based destinations are unknown).
let span_components = payload.span_components.clone();

let span_id = payload.span_id.clone();
let destination = match parent_info.as_ref() {
Some(ParentSpanInfo::Experiment { object_id }) => {
LogDestination::experiment(object_id.clone())
}
Some(ParentSpanInfo::ProjectLogs { object_id }) => {
LogDestination::project_logs(object_id.clone())
}
Some(ParentSpanInfo::PlaygroundLogs { object_id }) => {
LogDestination::playground_logs(object_id.clone())
}
Some(ParentSpanInfo::Dataset { object_id }) => {
LogDestination::dataset(object_id.clone())
}
Some(ParentSpanInfo::FullSpan {
object_type,
object_id,
..
}) => match object_type {
SpanObjectType::Experiment => LogDestination::experiment(object_id.clone()),
SpanObjectType::ProjectLogs => LogDestination::project_logs(object_id.clone()),
SpanObjectType::PlaygroundLogs => {
let destination = match span_components.as_ref() {
Some(span_components) => match (
span_components.object_type,
span_components.object_id.as_ref(),
) {
(SpanObjectType::Experiment, Some(object_id)) => {
LogDestination::experiment(object_id.clone())
}
(SpanObjectType::ProjectLogs, Some(object_id)) => {
LogDestination::project_logs(object_id.clone())
}
(SpanObjectType::PlaygroundLogs, Some(object_id)) => {
LogDestination::playground_logs(object_id.clone())
}
(SpanObjectType::ProjectLogs, None) => match span_components
.compute_object_metadata_args
.as_ref()
.and_then(|args| args.get("project_id"))
.and_then(Value::as_str)
{
Some(project_id) => LogDestination::project_logs(project_id.to_string()),
None => return,
},
(SpanObjectType::Experiment, None) => return,
(SpanObjectType::PlaygroundLogs, None) => return,
},
None => match parent_info.as_ref() {
Some(ParentSpanInfo::Experiment { object_id }) => {
LogDestination::experiment(object_id.clone())
}
Some(ParentSpanInfo::ProjectLogs { object_id }) => {
LogDestination::project_logs(object_id.clone())
}
Some(ParentSpanInfo::ProjectName { .. }) => return,
Some(ParentSpanInfo::PlaygroundLogs { object_id }) => {
LogDestination::playground_logs(object_id.clone())
}
Some(ParentSpanInfo::Dataset { object_id }) => {
LogDestination::dataset(object_id.clone())
}
Some(ParentSpanInfo::FullSpan {
object_type,
object_id,
..
}) => match object_type {
SpanObjectType::Experiment => LogDestination::experiment(object_id.clone()),
SpanObjectType::ProjectLogs => {
LogDestination::project_logs(object_id.clone())
}
SpanObjectType::PlaygroundLogs => {
LogDestination::playground_logs(object_id.clone())
}
},
None => return,
},
_ => LogDestination::project_logs("unknown".to_string()),
};

let root_span_id = span_components
.as_ref()
.and_then(|components| components.root_span_id.clone())
.unwrap_or_else(|| span_id.clone());
let row = Logs3Row {
id: payload.row_id,
span_id: span_id.clone(),
is_merge: if payload.is_merge { Some(true) } else { None },
merge_paths: None,
root_span_id: span_id,
root_span_id,
span_parents: None,
destination,
org_id: payload.org_id,
Expand Down Expand Up @@ -826,6 +946,7 @@ mod tests {
row_id: id.to_string(),
span_id: id.to_string(),
is_merge: false,
span_components: None,
org_id: "org".to_string(),
org_name: Some("test-org".to_string()),
project_name: None,
Expand Down
Loading
Loading