Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 34 additions & 24 deletions crates/core/src/observability/atif.rs
Original file line number Diff line number Diff line change
Expand Up @@ -629,6 +629,7 @@ const TOKEN_USAGE_KNOWN_KEYS: &[&str] = &[
"completion_tokens",
"cached_tokens",
"cost_usd",
"cost",
"prompt_token_ids",
"completion_token_ids",
"logprobs",
Expand All @@ -638,7 +639,7 @@ const TOKEN_USAGE_KNOWN_KEYS: &[&str] = &[
///
/// Supports NeMo Relay `token_usage` and provider-native `usage` payloads.
/// Populates `extra` with any unknown usage keys (e.g. reasoning_tokens or total_tokens).
/// Returns `None` if the response has no recognized token counts.
/// Returns `None` if the response has no recognized token or cost metrics.
fn extract_metrics(output: &Json) -> Option<AtifMetrics> {
let usage = token_usage_object(output)?;
let prompt = usage_u64(usage, &["prompt_tokens", "input_tokens"]);
Expand All @@ -651,7 +652,10 @@ fn extract_metrics(output: &Json) -> Option<AtifMetrics> {
&["cache_read_input_tokens", "cache_creation_input_tokens"],
)
});
let cost = usage.get("cost_usd").and_then(Json::as_f64);
let cost = usage
.get("cost_usd")
.and_then(Json::as_f64)
.or_else(|| usage.get("cost")?.as_object()?.get("total")?.as_f64());
let prompt_ids = usage
.get("prompt_token_ids")
.and_then(Json::as_array)
Expand All @@ -675,7 +679,7 @@ fn extract_metrics(output: &Json) -> Option<AtifMetrics> {
} else {
Some(Json::Object(extra_map))
};
if prompt.is_none() && completion.is_none() && cached.is_none() {
if prompt.is_none() && completion.is_none() && cached.is_none() && cost.is_none() {
return None;
}
Some(AtifMetrics {
Expand Down Expand Up @@ -1064,44 +1068,50 @@ fn event_extra(event: &Event) -> Json {
Json::Object(extra)
}

/// Compute aggregate `final_metrics` by summing token counts across all steps.
/// Compute aggregate `final_metrics` by summing metrics across all steps.
///
/// Always returns `Some(AtifFinalMetrics)` with `total_steps` set. Token/cost
/// fields are populated when at least one step carries metrics.
/// Always returns `Some(AtifFinalMetrics)` with `total_steps` set. Each token
/// or cost total is populated only when at least one step provides that field.
fn compute_final_metrics(steps: &[AtifStep]) -> Option<AtifFinalMetrics> {
let mut total_prompt: u64 = 0;
let mut total_completion: u64 = 0;
let mut total_cached: u64 = 0;
let mut total_cost: f64 = 0.0;
let mut has_any = false;
let mut has_prompt = false;
let mut has_completion = false;
let mut has_cached = false;
let mut has_cost = false;

for step in steps {
if let Some(m) = &step.metrics {
has_any = true;
total_prompt += m.prompt_tokens.unwrap_or(0);
total_completion += m.completion_tokens.unwrap_or(0);
total_cached += m.cached_tokens.unwrap_or(0);
total_cost += m.cost_usd.unwrap_or(0.0);
if let Some(prompt_tokens) = m.prompt_tokens {
has_prompt = true;
total_prompt += prompt_tokens;
}
if let Some(completion_tokens) = m.completion_tokens {
has_completion = true;
total_completion += completion_tokens;
}
if let Some(cached_tokens) = m.cached_tokens {
has_cached = true;
total_cached += cached_tokens;
}
if let Some(cost) = m.cost_usd {
has_cost = true;
total_cost += cost;
}
}
}

Some(AtifFinalMetrics {
total_prompt_tokens: if has_any { Some(total_prompt) } else { None },
total_completion_tokens: if has_any {
total_prompt_tokens: if has_prompt { Some(total_prompt) } else { None },
total_completion_tokens: if has_completion {
Some(total_completion)
} else {
None
},
total_cached_tokens: if has_any && total_cached > 0 {
Some(total_cached)
} else {
None
},
total_cost_usd: if has_any && total_cost > 0.0 {
Some(total_cost)
} else {
None
},
total_cached_tokens: if has_cached { Some(total_cached) } else { None },
total_cost_usd: if has_cost { Some(total_cost) } else { None },
total_steps: Some(steps.len() as u64),
extra: None,
})
Expand Down
34 changes: 25 additions & 9 deletions crates/core/src/observability/openinference.rs
Original file line number Diff line number Diff line change
Expand Up @@ -687,6 +687,10 @@ fn start_attributes(event: &Event) -> Vec<KeyValue> {

fn end_attributes(event: &Event) -> Vec<KeyValue> {
let mut attributes = Vec::new();
let is_llm = event
.category()
.is_some_and(|category| category.as_str() == "llm");

push_serialized(
&mut attributes,
"nemo_relay.end.output_json",
Expand All @@ -696,10 +700,7 @@ fn end_attributes(event: &Event) -> Vec<KeyValue> {
attributes.push(KeyValue::new(oi::output::VALUE, output));
attributes.push(KeyValue::new(oi::output::MIME_TYPE, mime_type));
}
let fallback_usage = if event
.category()
.is_some_and(|category| category.as_str() == "llm")
{
let fallback_usage = if is_llm {
usage_from_manual_llm_output(event.output())
} else {
None
Expand All @@ -708,11 +709,7 @@ fn end_attributes(event: &Event) -> Vec<KeyValue> {
.annotated_response()
.and_then(|response| response.usage.as_ref())
.or(fallback_usage.as_ref());
if event
.category()
.is_some_and(|category| category.as_str() == "llm")
&& let Some(usage) = usage
{
if is_llm && let Some(usage) = usage {
if let Some(v) = usage.prompt_tokens {
attributes.push(KeyValue::new(oi::llm::token_count::PROMPT, v as i64));
}
Expand All @@ -735,9 +732,28 @@ fn end_attributes(event: &Event) -> Vec<KeyValue> {
));
}
}
if is_llm && let Some(cost_total) = cost_total_from_manual_llm_output(event.output()) {
attributes.push(KeyValue::new(oi::llm::cost::TOTAL, cost_total));
}
attributes
}

/// Pulls a total cost out of a manually supplied LLM output payload.
///
/// The payload must be a JSON object. Its `usage` object is consulted first
/// and `token_usage` second; the first one that yields a cost via
/// `cost_total_from_usage` wins. Returns `None` when `output` is absent, is
/// not an object, or neither usage object carries a recognized cost.
fn cost_total_from_manual_llm_output(output: Option<&Json>) -> Option<f64> {
    let payload = output.and_then(Json::as_object)?;
    // Order matters: `usage` takes precedence over `token_usage`.
    ["usage", "token_usage"]
        .iter()
        .filter_map(|key| payload.get(*key).and_then(Json::as_object))
        .find_map(cost_total_from_usage)
}

/// Reads a total cost from a single usage object.
///
/// Prefers the flat `cost_usd` field when it holds a number; otherwise falls
/// back to the nested `cost.total` field. Returns `None` when neither form is
/// present as a number.
fn cost_total_from_usage(usage: &serde_json::Map<String, Json>) -> Option<f64> {
    if let Some(flat_cost) = usage.get("cost_usd").and_then(Json::as_f64) {
        return Some(flat_cost);
    }
    // A missing or non-numeric `cost_usd` falls through to the nested shape.
    let nested_cost = usage.get("cost").and_then(Json::as_object)?;
    nested_cost.get("total").and_then(Json::as_f64)
}

fn usage_from_manual_llm_output(output: Option<&Json>) -> Option<Usage> {
let object = output?.as_object()?;
let usage = object.get("usage").and_then(Json::as_object);
Expand Down
40 changes: 39 additions & 1 deletion crates/core/tests/unit/atif_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -614,6 +614,7 @@ fn test_extract_metrics_supports_provider_usage_payloads() {
"prompt_tokens": 10,
"completion_tokens": 20,
"total_tokens": 30,
"cost_usd": 0.001,
"prompt_tokens_details": {
"cached_tokens": 4
}
Expand All @@ -627,19 +628,56 @@ fn test_extract_metrics_supports_provider_usage_payloads() {
openai_metrics.extra.as_ref().unwrap()["total_tokens"],
json!(30)
);
assert_eq!(openai_metrics.cost_usd, Some(0.001));

let anthropic_metrics = extract_metrics(&json!({
"usage": {
"input_tokens": 11,
"output_tokens": 22,
"cache_read_input_tokens": 3,
"cache_creation_input_tokens": 5
"cache_creation_input_tokens": 5,
"cost": { "total": 0.0042 }
}
}))
.unwrap();
assert_eq!(anthropic_metrics.prompt_tokens, Some(11));
assert_eq!(anthropic_metrics.completion_tokens, Some(22));
assert_eq!(anthropic_metrics.cached_tokens, Some(8));
assert_eq!(anthropic_metrics.cost_usd, Some(0.0042));
}

/// A step that reports only an explicit zero cost must surface that zero in
/// the aggregate while leaving every token total unset.
#[test]
fn test_final_metrics_preserve_explicit_zero_cost_without_fabricating_tokens() {
    // The only metric present is `cost_usd`, and it is exactly zero.
    let zero_cost_only = AtifMetrics {
        prompt_tokens: None,
        completion_tokens: None,
        cached_tokens: None,
        cost_usd: Some(0.0),
        prompt_token_ids: None,
        completion_token_ids: None,
        logprobs: None,
        extra: None,
    };
    let lone_step = AtifStep {
        step_id: 1,
        source: "assistant".to_string(),
        message: json!("done"),
        timestamp: None,
        model_name: None,
        reasoning_effort: None,
        reasoning_content: None,
        tool_calls: None,
        observation: None,
        metrics: Some(zero_cost_only),
        llm_call_count: None,
        is_copied_context: None,
        extra: None,
    };

    let totals = compute_final_metrics(&[lone_step]).unwrap();

    // Token totals were never reported, so they must not appear as zero.
    assert_eq!(totals.total_prompt_tokens, None);
    assert_eq!(totals.total_completion_tokens, None);
    assert_eq!(totals.total_cached_tokens, None);
    // The explicit zero cost is real data and must be kept.
    assert_eq!(totals.total_cost_usd, Some(0.0));
}

#[test]
Expand Down
12 changes: 9 additions & 3 deletions crates/core/tests/unit/observability/atof_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,8 @@ fn subscriber_preserves_wire_format_llm_lifecycle_payloads_as_raw_jsonl() {
"input_tokens": 11,
"output_tokens": 7,
"cache_read_input_tokens": 3,
"cache_creation_input_tokens": 5
"cache_creation_input_tokens": 5,
"cost": {"total": 0.0042}
}
}),
),
Expand Down Expand Up @@ -335,7 +336,8 @@ fn subscriber_preserves_wire_format_llm_lifecycle_payloads_as_raw_jsonl() {
"input_tokens": 75,
"output_tokens": 20,
"total_tokens": 95,
"input_tokens_details": {"cached_tokens": 10}
"input_tokens_details": {"cached_tokens": 10},
"cost_usd": 0.005
}
}),
),
Expand Down Expand Up @@ -371,7 +373,8 @@ fn subscriber_preserves_wire_format_llm_lifecycle_payloads_as_raw_jsonl() {
"prompt_tokens": 3,
"completion_tokens": 4,
"total_tokens": 7,
"prompt_tokens_details": {"cached_tokens": 2}
"prompt_tokens_details": {"cached_tokens": 2},
"cost_usd": 0.001
}
}),
),
Expand Down Expand Up @@ -407,6 +410,7 @@ fn subscriber_preserves_wire_format_llm_lifecycle_payloads_as_raw_jsonl() {
assert_eq!(lines[1]["scope_category"], "end");
assert_eq!(lines[1]["data"]["content"][1]["type"], "tool_use");
assert_eq!(lines[1]["data"]["usage"]["cache_creation_input_tokens"], 5);
assert_eq!(lines[1]["data"]["usage"]["cost"]["total"], 0.0042);

assert_eq!(lines[2]["metadata"]["gateway_path"], "/v1/responses");
assert_eq!(lines[2]["data"]["input"], "Find the weather.");
Expand All @@ -415,6 +419,7 @@ fn subscriber_preserves_wire_format_llm_lifecycle_payloads_as_raw_jsonl() {
lines[3]["data"]["usage"]["input_tokens_details"]["cached_tokens"],
10
);
assert_eq!(lines[3]["data"]["usage"]["cost_usd"], 0.005);

assert_eq!(lines[4]["metadata"]["gateway_path"], "/v1/chat/completions");
assert_eq!(
Expand All @@ -429,6 +434,7 @@ fn subscriber_preserves_wire_format_llm_lifecycle_payloads_as_raw_jsonl() {
lines[5]["data"]["usage"]["prompt_tokens_details"]["cached_tokens"],
2
);
assert_eq!(lines[5]["data"]["usage"]["cost_usd"], 0.001);
}

#[test]
Expand Down
16 changes: 13 additions & 3 deletions crates/core/tests/unit/observability/openinference_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -692,7 +692,8 @@ fn output_value_extracts_chat_completion_display_text() {
"prompt_tokens": 3,
"completion_tokens": 4,
"total_tokens": 7,
"prompt_tokens_details": {"cached_tokens": 2}
"prompt_tokens_details": {"cached_tokens": 2},
"cost_usd": 0.001
}
})),
));
Expand All @@ -718,6 +719,7 @@ fn output_value_extracts_chat_completion_display_text() {
attributes.get("llm.token_count.prompt_details.cache_read"),
Some(&"2".to_string())
);
assert_eq!(attributes.get("llm.cost.total"), Some(&"0.001".to_string()));
}

#[test]
Expand Down Expand Up @@ -767,7 +769,8 @@ fn output_value_extracts_openai_responses_display_text_and_usage() {
"input_tokens": 75,
"output_tokens": 20,
"total_tokens": 95,
"input_tokens_details": {"cached_tokens": 10}
"input_tokens_details": {"cached_tokens": 10},
"cost_usd": 0.005
}
})),
));
Expand Down Expand Up @@ -801,6 +804,7 @@ fn output_value_extracts_openai_responses_display_text_and_usage() {
attributes.get("llm.token_count.prompt_details.cache_read"),
Some(&"10".to_string())
);
assert_eq!(attributes.get("llm.cost.total"), Some(&"0.005".to_string()));
}

#[test]
Expand Down Expand Up @@ -1496,6 +1500,7 @@ fn llm_end_with_manual_usage_payload_emits_token_count_attributes() {
attributes.get("llm.token_count.prompt_details.cache_write"),
Some(&"10".to_string())
);
assert!(!attributes.contains_key("llm.cost.total"));
}

#[test]
Expand Down Expand Up @@ -1546,7 +1551,8 @@ fn anthropic_messages_output_emits_openinference_text_tool_and_usage_attributes(
"input_tokens": 11,
"output_tokens": 7,
"cache_read_input_tokens": 3,
"cache_creation_input_tokens": 5
"cache_creation_input_tokens": 5,
"cost": {"total": 0.0042}
}
})),
Some(
Expand Down Expand Up @@ -1593,6 +1599,10 @@ fn anthropic_messages_output_emits_openinference_text_tool_and_usage_attributes(
attributes.get("llm.token_count.prompt_details.cache_write"),
Some(&"5".to_string())
);
assert_eq!(
attributes.get("llm.cost.total"),
Some(&"0.0042".to_string())
);
}

#[test]
Expand Down
3 changes: 2 additions & 1 deletion docs/supported-integrations/openclaw-plugin.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,8 @@ emits diagnostic marks instead of inventing latency. This keeps traces honest
and makes current fidelity boundaries explicit.

When OpenClaw provides usage data, the plugin maps input, output, total, cache
read, and cache write fields into OpenInference token-count attributes, and
maps explicit cost fields into OpenInference cost attributes.

## Troubleshooting

Expand Down
2 changes: 2 additions & 0 deletions integrations/openclaw/test/llm-replay.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ describe('LLM replay', () => {
output_tokens: 7,
cache_read_input_tokens: 3,
cache_creation_input_tokens: 5,
cost: { total: 0.0042 },
},
};

Expand Down Expand Up @@ -182,6 +183,7 @@ describe('LLM replay', () => {
cache_read_tokens: 3,
cache_write_tokens: 5,
total_tokens: 18,
cost_usd: 0.0042,
});
assert.deepEqual((response.openclaw as ResponseOpenClaw).assistant_tool_call_names, ['web_search']);
});
Expand Down
Loading