From b9390a03f591ff88e87d65b644b1201ff1b1898f Mon Sep 17 00:00:00 2001 From: mnajafian-nv Date: Tue, 2 Jun 2026 15:57:29 -0700 Subject: [PATCH 1/2] fix: improve OpenClaw observability consistency Signed-off-by: mnajafian-nv --- crates/core/src/observability/atif.rs | 58 ++++- .../core/src/observability/openinference.rs | 100 +++++++- crates/core/tests/unit/atif_tests.rs | 107 +++++++++ .../tests/unit/observability/atof_tests.rs | 215 +++++++++++++++++- .../unit/observability/openinference_tests.rs | 191 +++++++++++++++- integrations/openclaw/src/hook-replay/llm.ts | 37 ++- integrations/openclaw/test/llm-replay.test.ts | 129 +++++++++++ 7 files changed, 823 insertions(+), 14 deletions(-) diff --git a/crates/core/src/observability/atif.rs b/crates/core/src/observability/atif.rs index 96674db1..efc0dade 100644 --- a/crates/core/src/observability/atif.rs +++ b/crates/core/src/observability/atif.rs @@ -433,12 +433,16 @@ fn unwrap_llm_request(input: &Json) -> Json { /// Extract the user-facing message content from a raw LLM response. /// -/// Looks for OpenAI-compatible and Hermes-compatible response content fields. +/// Looks for provider response content fields that can be represented as an +/// ATIF agent message. /// Tool-call-only responses use an empty string message and keep the full /// response under `Step.extra.llm_response`. fn extract_llm_response_message(output: &Json) -> Json { if let Some(obj) = output.as_object() { if let Some(content) = non_null_object_field(obj, "content") { + if let Some(message) = anthropic_messages_content_message(output, &content) { + return message; + } return atif_content_value(&content); } if let Some(content) = obj @@ -483,6 +487,38 @@ fn atif_content_value(value: &Json) -> Json { } } +fn anthropic_messages_content_message(output: &Json, content: &Json) -> Option { + let object = output.as_object()?; + if object.get("type").and_then(Json::as_str) != Some("message") { + return None; + } + let blocks = content.as_array()?; + let mut text_parts = Vec::new(); + let mut has_tool_use = false; + for block in blocks { + let Some(block_object) = block.as_object() else { + continue; + }; + match block_object.get("type").and_then(Json::as_str) { + Some("text") => { + if let Some(text) = block_object.get("text").and_then(Json::as_str) + && !text.trim().is_empty() + { + text_parts.push(text.to_string()); + } + } + Some("tool_use") => has_tool_use = true, + _ => {} + } + } + match text_parts.as_slice() { + [] if has_tool_use => Some(empty_message()), + [] => None, + [text] => Some(Json::String(text.clone())), + _ => Some(Json::String(text_parts.join("\n"))), + } +} + fn observation_content_value(value: &Json) -> Option { (!value.is_null()).then(|| value.clone()) } @@ -865,7 +901,8 @@ fn extract_tool_calls(output: &Json) -> Option> { let arr = tool_call_array(output) .filter(|arr| !arr.is_empty()) .map(|arr| arr.iter().collect::>()) - .or_else(|| openai_responses_function_call_items(output))?; + .or_else(|| openai_responses_function_call_items(output)) + .or_else(|| anthropic_messages_tool_use_items(output))?; let mut calls = Vec::with_capacity(arr.len()); for (index, tc) in arr.iter().enumerate() { let tc_obj = tc.as_object()?; @@ -892,7 +929,8 @@ fn extract_tool_calls(output: &Json) -> Option> { let raw_arguments = func .and_then(|f| f.get("arguments")) .or_else(|| tc_obj.get("arguments")) - .or_else(|| tc_obj.get("args")); + .or_else(|| tc_obj.get("args")) + .or_else(|| tc_obj.get("input")); let arguments = normalize_tool_arguments(raw_arguments); // Skip entries with no id and no name — they are not meaningful. if id.is_empty() && name.is_empty() { @@ -936,6 +974,19 @@ fn openai_responses_function_call_items(output: &Json) -> Option> { (!function_call_items.is_empty()).then_some(function_call_items) } +fn anthropic_messages_tool_use_items(output: &Json) -> Option> { + let object = output.as_object()?; + if object.get("type").and_then(Json::as_str) != Some("message") { + return None; + } + let content_blocks = object.get("content").and_then(Json::as_array)?; + let tool_use_items = content_blocks + .iter() + .filter(|item| item.get("type").and_then(Json::as_str) == Some("tool_use")) + .collect::>(); + (!tool_use_items.is_empty()).then_some(tool_use_items) +} + fn normalize_tool_arguments(raw_arguments: Option<&Json>) -> Json { let Some(raw_arguments) = raw_arguments else { return serde_json::json!({}); @@ -968,6 +1019,7 @@ fn tool_call_extra(tool_call: &Json) -> Option { | "function_name" | "arguments" | "args" + | "input" ) { extra.insert(key.clone(), value.clone()); } diff --git a/crates/core/src/observability/openinference.rs b/crates/core/src/observability/openinference.rs index aac56981..0e9baac6 100644 --- a/crates/core/src/observability/openinference.rs +++ b/crates/core/src/observability/openinference.rs @@ -779,7 +779,23 @@ fn usage_from_manual_llm_output(output: Option<&Json>) -> Option { "cacheReadInputTokens", "cacheRead", ], - ); + ) + .or_else(|| { + first_nested_u64_from_manual_usage( + usage, + token_usage, + "input_tokens_details", + "cached_tokens", + ) + }) + .or_else(|| { + first_nested_u64_from_manual_usage( + usage, + token_usage, + "prompt_tokens_details", + "cached_tokens", + ) + }); let cache_write_tokens = first_u64_from_manual_usage( usage, token_usage, @@ -838,6 +854,29 @@ fn first_u64_from_manual_usage( .or_else(|| token_usage.and_then(|value| first_u64(value, keys))) } +fn first_nested_u64_from_manual_usage( + usage: Option<&serde_json::Map>, + token_usage: Option<&serde_json::Map>, + parent_key: &str, + child_key: &str, +) -> Option { + usage + .and_then(|value| nested_u64(value, parent_key, child_key)) + .or_else(|| token_usage.and_then(|value| nested_u64(value, parent_key, child_key))) +} + +fn nested_u64( + usage: &serde_json::Map, + parent_key: &str, + child_key: &str, +) -> Option { + usage + .get(parent_key) + .and_then(Json::as_object) + .and_then(|details| details.get(child_key)) + .and_then(Json::as_u64) +} + fn first_u64(usage: &serde_json::Map, keys: &[&str]) -> Option { keys.iter() .find_map(|key| usage.get(*key).and_then(Json::as_u64)) @@ -985,8 +1024,13 @@ fn display_text_from_json(value: &Json) -> Option { } } object - .get("choices") - .and_then(display_text_from_chat_choices) + .get("output") + .and_then(display_text_from_openai_responses_output) + .or_else(|| { + object + .get("choices") + .and_then(display_text_from_chat_choices) + }) .or_else(|| { object .get("tool_calls") @@ -998,6 +1042,56 @@ fn display_text_from_json(value: &Json) -> Option { } } +fn display_text_from_openai_responses_output(value: &Json) -> Option { + let items = value.as_array()?; + let mut entries = Vec::new(); + let mut tool_names = Vec::new(); + for item in items { + let Some(object) = item.as_object() else { + continue; + }; + match object.get("type").and_then(Json::as_str) { + Some("message") => { + if let Some(content) = object + .get("content") + .and_then(display_text_from_openai_responses_content) + { + entries.push(content); + } + } + Some("function_call") => { + if let Some(name) = object.get("name").and_then(Json::as_str) { + tool_names.push(name.to_string()); + } + } + _ => {} + } + } + if !tool_names.is_empty() { + entries.push(format!("Requested tools: {}", tool_names.join(", "))); + } + let text = entries.join("\n").trim().to_string(); + if text.is_empty() { None } else { Some(text) } +} + +fn display_text_from_openai_responses_content(value: &Json) -> Option { + let content = value.as_array()?; + let text = content + .iter() + .filter_map(|part| { + let object = part.as_object()?; + match object.get("type").and_then(Json::as_str) { + Some("output_text" | "text") => object.get("text").and_then(Json::as_str), + _ => None, + } + }) + .collect::>() + .join("\n") + .trim() + .to_string(); + if text.is_empty() { None } else { Some(text) } +} + fn display_text_from_messages(value: &Json) -> Option { let messages = value.as_array()?; let text = messages diff --git a/crates/core/tests/unit/atif_tests.rs b/crates/core/tests/unit/atif_tests.rs index a59048e6..16a533a7 100644 --- a/crates/core/tests/unit/atif_tests.rs +++ b/crates/core/tests/unit/atif_tests.rs @@ -755,6 +755,113 @@ fn test_exporter_openai_responses_lifecycle_extracts_messages() { assert_eq!(agent_extra.llm_response.unwrap()["id"], json!("resp_1")); } +#[test] +fn test_exporter_anthropic_messages_lifecycle_promotes_tool_use_blocks() { + let exporter = AtifExporter::new("session-1".to_string(), make_agent_info()); + let llm_uuid = Uuid::now_v7(); + let tool_uuid = Uuid::now_v7(); + let base = base_timestamp(); + + let mut start = event_builder(llm_uuid, EventType::Start) + .name("claude-sonnet-4") + .scope_type(ScopeType::Llm) + .input(json!({ + "model": "claude-sonnet-4", + "system": "Be concise.", + "messages": [{"role": "user", "content": "Find the file."}], + "tools": [{ + "name": "search", + "input_schema": { + "type": "object", + "properties": {"query": {"type": "string"}} + } + }] + })) + .model_name("claude-sonnet-4") + .build(); + let mut end = event_builder(llm_uuid, EventType::End) + .name("claude-sonnet-4") + .scope_type(ScopeType::Llm) + .output(json!({ + "id": "msg_01", + "type": "message", + "role": "assistant", + "model": "claude-sonnet-4", + "content": [ + {"type": "text", "text": "I will search for it."}, + { + "type": "tool_use", + "id": "toolu_01", + "name": "search", + "input": {"query": "file"} + } + ], + "stop_reason": "tool_use", + "usage": { + "input_tokens": 11, + "output_tokens": 7, + "cache_read_input_tokens": 3, + "cache_creation_input_tokens": 5 + } + })) + .model_name("claude-sonnet-4") + .build(); + let mut tool_end = event_builder(tool_uuid, EventType::End) + .name("search") + .scope_type(ScopeType::Tool) + .parent_uuid(llm_uuid) + .tool_call_id("toolu_01") + .output(json!({"matches": ["README.md"]})) + .build(); + + for (offset, event) in [&mut start, &mut end, &mut tool_end] + .into_iter() + .enumerate() + { + set_event_timestamp(event, base + chrono::Duration::milliseconds(offset as i64)); + } + + { + let mut state = exporter.state.lock().unwrap(); + state.events.extend([start, end, tool_end]); + } + + let trajectory = exporter.export().unwrap(); + assert_atif_v17_shape(&trajectory); + assert_eq!(trajectory.steps.len(), 2); + + let user_step = &trajectory.steps[0]; + assert_eq!(user_step.source, "user"); + assert_eq!(user_step.message, json!("Find the file.")); + let user_extra: AtifStepExtra = + serde_json::from_value(user_step.extra.clone().unwrap()).unwrap(); + let llm_request = user_extra.llm_request.unwrap(); + assert_eq!(llm_request["system"], json!("Be concise.")); + assert_eq!(llm_request["tools"][0]["name"], json!("search")); + + let agent_step = &trajectory.steps[1]; + assert_eq!(agent_step.source, "agent"); + assert_eq!(agent_step.message, json!("I will search for it.")); + assert_eq!(agent_step.model_name, Some("claude-sonnet-4".to_string())); + let metrics = agent_step.metrics.as_ref().unwrap(); + assert_eq!(metrics.prompt_tokens, Some(11)); + assert_eq!(metrics.completion_tokens, Some(7)); + assert_eq!(metrics.cached_tokens, Some(8)); + let tool_call = &agent_step.tool_calls.as_ref().unwrap()[0]; + assert_eq!(tool_call.tool_call_id, "toolu_01"); + assert_eq!(tool_call.function_name, "search"); + assert_eq!(tool_call.arguments, json!({"query": "file"})); + let observation = agent_step.observation.as_ref().unwrap(); + assert_eq!( + observation.results[0].source_call_id.as_deref(), + Some("toolu_01") + ); + assert_eq!( + observation.results[0].content, + Some(json!({"matches": ["README.md"]})) + ); +} + #[test] fn test_openai_responses_input_extracts_latest_user_content_block() { let message = extract_user_messages(&json!({ diff --git a/crates/core/tests/unit/observability/atof_tests.rs b/crates/core/tests/unit/observability/atof_tests.rs index 51fe6af2..43831c0d 100644 --- a/crates/core/tests/unit/observability/atof_tests.rs +++ b/crates/core/tests/unit/observability/atof_tests.rs @@ -5,7 +5,8 @@ use super::*; use crate::api::event::{ - BaseEvent, CategoryProfile, Event, EventCategory, MarkEvent, ScopeCategory, ScopeEvent, + BaseEvent, CategoryProfile, DataSchema, Event, EventCategory, MarkEvent, ScopeCategory, + ScopeEvent, }; use crate::api::runtime::NemoRelayContextState; use crate::api::runtime::global_context; @@ -103,6 +104,40 @@ fn make_annotated_llm_event(name: &str) -> Event { )) } +fn wire_format_llm_event( + uuid: Uuid, + parent_uuid: Option, + scope_category: ScopeCategory, + name: &str, + model_name: &str, + gateway_path: &str, + data: serde_json::Value, +) -> Event { + Event::Scope(ScopeEvent::new( + BaseEvent::builder() + .uuid(uuid) + .parent_uuid_opt(parent_uuid) + .name(name) + .data(data) + .data_schema( + DataSchema::builder() + .name("llm.provider_payload") + .version("1") + .build(), + ) + .metadata(json!({ + "source": "openclaw.public_plugin", + "gateway_path": gateway_path, + "provider_payload_exact": true + })) + .build(), + scope_category, + Vec::new(), + EventCategory::llm(), + Some(CategoryProfile::builder().model_name(model_name).build()), + )) +} + fn read_jsonl(path: &Path) -> Vec { fs::read_to_string(path) .unwrap() @@ -218,6 +253,184 @@ fn subscriber_writes_canonical_event_jsonl() { ); } +#[test] +fn subscriber_preserves_wire_format_llm_lifecycle_payloads_as_raw_jsonl() { + let dir = temp_dir("atof-wire-formats"); + let exporter = AtofExporter::new( + AtofExporterConfig::new() + .with_output_directory(&dir) + .with_filename("events.jsonl"), + ) + .unwrap(); + let subscriber = exporter.subscriber(); + + let anthropic_uuid = Uuid::now_v7(); + let responses_uuid = Uuid::now_v7(); + let chat_uuid = Uuid::now_v7(); + let parent_uuid = Uuid::now_v7(); + + let events = [ + wire_format_llm_event( + anthropic_uuid, + Some(parent_uuid), + ScopeCategory::Start, + "anthropic.messages", + "claude-sonnet-4", + "/v1/messages", + json!({ + "model": "claude-sonnet-4", + "messages": [{"role": "user", "content": "Find the file."}], + "tools": [{"name": "search", "input_schema": {"type": "object"}}] + }), + ), + wire_format_llm_event( + anthropic_uuid, + Some(parent_uuid), + ScopeCategory::End, + "anthropic.messages", + "claude-sonnet-4", + "/v1/messages", + json!({ + "id": "msg_01", + "type": "message", + "content": [ + {"type": "text", "text": "I will search."}, + {"type": "tool_use", "id": "toolu_01", "name": "search", "input": {"query": "file"}} + ], + "usage": { + "input_tokens": 11, + "output_tokens": 7, + "cache_read_input_tokens": 3, + "cache_creation_input_tokens": 5 + } + }), + ), + wire_format_llm_event( + responses_uuid, + Some(parent_uuid), + ScopeCategory::Start, + "openai.responses", + "gpt-4o", + "/v1/responses", + json!({ + "model": "gpt-4o", + "input": "Find the weather.", + "tools": [{"type": "function", "name": "get_weather"}] + }), + ), + wire_format_llm_event( + responses_uuid, + Some(parent_uuid), + ScopeCategory::End, + "openai.responses", + "gpt-4o", + "/v1/responses", + json!({ + "id": "resp_1", + "output": [ + {"type": "message", "content": [{"type": "output_text", "text": "I will check."}]}, + {"type": "function_call", "call_id": "call_weather_1", "name": "get_weather", "arguments": "{\"city\":\"SF\"}"} + ], + "usage": { + "input_tokens": 75, + "output_tokens": 20, + "total_tokens": 95, + "input_tokens_details": {"cached_tokens": 10} + } + }), + ), + wire_format_llm_event( + chat_uuid, + Some(parent_uuid), + ScopeCategory::Start, + "openai.chat_completions", + "gpt-4o", + "/v1/chat/completions", + json!({ + "model": "gpt-4o", + "messages": [{"role": "user", "content": "Inspect the files."}], + "tools": [{"type": "function", "function": {"name": "read"}}] + }), + ), + wire_format_llm_event( + chat_uuid, + Some(parent_uuid), + ScopeCategory::End, + "openai.chat_completions", + "gpt-4o", + "/v1/chat/completions", + json!({ + "choices": [{ + "message": { + "role": "assistant", + "content": "I will inspect.", + "tool_calls": [{"id": "call_read_1", "function": {"name": "read", "arguments": "{\"path\":\"api.py\"}"}}] + } + }], + "usage": { + "prompt_tokens": 3, + "completion_tokens": 4, + "total_tokens": 7, + "prompt_tokens_details": {"cached_tokens": 2} + } + }), + ), + ]; + + for event in &events { + subscriber(event); + } + exporter.force_flush().unwrap(); + + let lines = read_jsonl(exporter.path()); + assert_eq!(lines.len(), events.len()); + for (line, event) in lines.iter().zip(events.iter()) { + assert_eq!(line, &event.try_to_json_value().unwrap()); + assert_eq!(line["kind"], "scope"); + assert_eq!(line["atof_version"], "0.1"); + assert_eq!(line["parent_uuid"], parent_uuid.to_string()); + assert_eq!(line["category"], "llm"); + assert_eq!(line["data_schema"]["name"], "llm.provider_payload"); + assert_eq!(line["data_schema"]["version"], "1"); + assert_eq!(line["metadata"]["source"], "openclaw.public_plugin"); + assert_eq!(line["metadata"]["provider_payload_exact"], true); + } + + assert_eq!(lines[0]["name"], "anthropic.messages"); + assert_eq!(lines[0]["scope_category"], "start"); + assert_eq!(lines[0]["metadata"]["gateway_path"], "/v1/messages"); + assert_eq!( + lines[0]["category_profile"]["model_name"], + "claude-sonnet-4" + ); + assert_eq!(lines[0]["data"]["messages"][0]["content"], "Find the file."); + assert_eq!(lines[1]["scope_category"], "end"); + assert_eq!(lines[1]["data"]["content"][1]["type"], "tool_use"); + assert_eq!(lines[1]["data"]["usage"]["cache_creation_input_tokens"], 5); + + assert_eq!(lines[2]["metadata"]["gateway_path"], "/v1/responses"); + assert_eq!(lines[2]["data"]["input"], "Find the weather."); + assert_eq!(lines[3]["data"]["output"][1]["type"], "function_call"); + assert_eq!( + lines[3]["data"]["usage"]["input_tokens_details"]["cached_tokens"], + 10 + ); + + assert_eq!(lines[4]["metadata"]["gateway_path"], "/v1/chat/completions"); + assert_eq!( + lines[4]["data"]["messages"][0]["content"], + "Inspect the files." + ); + assert_eq!( + lines[5]["data"]["choices"][0]["message"]["tool_calls"][0]["id"], + "call_read_1" + ); + assert_eq!( + lines[5]["data"]["usage"]["prompt_tokens_details"]["cached_tokens"], + 2 + ); +} + #[test] fn register_deregister_flush_and_shutdown_work_with_runtime_events() { let _guard = crate::observability::test_mutex().lock().unwrap(); diff --git a/crates/core/tests/unit/observability/openinference_tests.rs b/crates/core/tests/unit/observability/openinference_tests.rs index a4745a3e..b203ae37 100644 --- a/crates/core/tests/unit/observability/openinference_tests.rs +++ b/crates/core/tests/unit/observability/openinference_tests.rs @@ -688,7 +688,12 @@ fn output_value_extracts_chat_completion_display_text() { ] } }], - "usage": {"prompt_tokens": 3, "completion_tokens": 4, "total_tokens": 7} + "usage": { + "prompt_tokens": 3, + "completion_tokens": 4, + "total_tokens": 7, + "prompt_tokens_details": {"cached_tokens": 2} + } })), )); @@ -709,6 +714,93 @@ fn output_value_extracts_chat_completion_display_text() { attributes.get("llm.token_count.prompt"), Some(&"3".to_string()) ); + assert_eq!( + attributes.get("llm.token_count.prompt_details.cache_read"), + Some(&"2".to_string()) + ); +} + +#[test] +fn output_value_extracts_openai_responses_display_text_and_usage() { + let (provider, exporter) = make_provider(); + let mut processor = + OpenInferenceEventProcessor::new(provider.clone(), "test-scope".to_string()); + let root_uuid = Uuid::now_v7(); + + processor.process(&make_scope_event_with_profile( + ScopeCategory::Start, + root_uuid, + None, + "openai.responses", + ScopeType::Llm, + Some(json!({ + "input": "Find the weather.", + "model": "gpt-4o" + })), + Some(CategoryProfile::builder().model_name("gpt-4o").build()), + )); + processor.process(&make_end_event( + root_uuid, + None, + "openai.responses", + ScopeType::Llm, + Some(json!({ + "id": "resp_1", + "status": "completed", + "output": [ + {"type": "reasoning", "summary": []}, + { + "type": "message", + "content": [ + {"type": "output_text", "text": "I will check the weather."} + ] + }, + { + "type": "function_call", + "call_id": "call_weather_1", + "name": "get_weather", + "arguments": "{\"city\":\"SF\"}", + "status": "completed" + } + ], + "usage": { + "input_tokens": 75, + "output_tokens": 20, + "total_tokens": 95, + "input_tokens_details": {"cached_tokens": 10} + } + })), + )); + + processor.force_flush().unwrap(); + + let spans = exporter.get_finished_spans().unwrap(); + assert_eq!(spans.len(), 1); + let attributes = attr_map(&spans[0].attributes); + assert_eq!( + attributes.get("llm.model_name"), + Some(&"gpt-4o".to_string()) + ); + assert_eq!( + attributes.get("output.value"), + Some(&"I will check the weather.\nRequested tools: get_weather".to_string()) + ); + assert_eq!( + attributes.get("llm.token_count.prompt"), + Some(&"75".to_string()) + ); + assert_eq!( + attributes.get("llm.token_count.completion"), + Some(&"20".to_string()) + ); + assert_eq!( + attributes.get("llm.token_count.total"), + Some(&"95".to_string()) + ); + assert_eq!( + attributes.get("llm.token_count.prompt_details.cache_read"), + Some(&"10".to_string()) + ); } #[test] @@ -1406,6 +1498,103 @@ fn llm_end_with_manual_usage_payload_emits_token_count_attributes() { ); } +#[test] +fn anthropic_messages_output_emits_openinference_text_tool_and_usage_attributes() { + let (provider, exporter) = make_provider(); + let mut processor = + OpenInferenceEventProcessor::new(provider.clone(), "test-scope".to_string()); + let uuid = Uuid::now_v7(); + + processor.process(&make_scope_event_with_profile( + ScopeCategory::Start, + uuid, + None, + "claude-sonnet-4", + ScopeType::Llm, + Some(json!({ + "messages": [{"role": "user", "content": "Find the file."}], + "model": "claude-sonnet-4" + })), + Some( + CategoryProfile::builder() + .model_name("claude-sonnet-4") + .build(), + ), + )); + processor.process(&make_scope_event_with_profile( + ScopeCategory::End, + uuid, + None, + "claude-sonnet-4", + ScopeType::Llm, + Some(json!({ + "id": "msg_01", + "type": "message", + "role": "assistant", + "model": "claude-sonnet-4", + "content": [ + {"type": "text", "text": "I will search for it."}, + { + "type": "tool_use", + "id": "toolu_01", + "name": "search", + "input": {"query": "file"} + } + ], + "stop_reason": "tool_use", + "usage": { + "input_tokens": 11, + "output_tokens": 7, + "cache_read_input_tokens": 3, + "cache_creation_input_tokens": 5 + } + })), + Some( + CategoryProfile::builder() + .model_name("claude-sonnet-4") + .build(), + ), + )); + + processor.force_flush().unwrap(); + + let spans = exporter.get_finished_spans().unwrap(); + assert_eq!(spans.len(), 1); + let attributes = attr_map(&spans[0].attributes); + assert_eq!( + attributes.get("openinference.span.kind"), + Some(&"LLM".to_string()) + ); + assert_eq!( + attributes.get("llm.model_name"), + Some(&"claude-sonnet-4".to_string()) + ); + assert_eq!( + attributes.get("input.value"), + Some(&"user: Find the file.".to_string()) + ); + assert_eq!( + attributes.get("output.value"), + Some(&"I will search for it.\nRequested tools: search".to_string()) + ); + assert_eq!( + attributes.get("llm.token_count.prompt"), + Some(&"11".to_string()) + ); + assert_eq!( + attributes.get("llm.token_count.completion"), + Some(&"7".to_string()) + ); + assert_eq!( + attributes.get("llm.token_count.prompt_details.cache_read"), + Some(&"3".to_string()) + ); + assert_eq!( + attributes.get("llm.token_count.prompt_details.cache_write"), + Some(&"5".to_string()) + ); +} + #[test] fn llm_end_with_inconsistent_manual_usage_omits_invalid_total_tokens() { let (provider, exporter) = make_provider(); diff --git a/integrations/openclaw/src/hook-replay/llm.ts b/integrations/openclaw/src/hook-replay/llm.ts index e2a9f407..63eccef6 100644 --- a/integrations/openclaw/src/hook-replay/llm.ts +++ b/integrations/openclaw/src/hook-replay/llm.ts @@ -1178,17 +1178,36 @@ function trajectoryRunKey(session: SessionState, runId?: string): string { return runId ?? session.sessionId; } -/** Normalize provider usage into OpenInference-friendly token/cost fields. */ +/** Normalize provider usage into NeMo Relay token and cost fields. */ function mapUsage(usage: unknown): Record | undefined { if (!isRecord(usage)) { return undefined; } const mapped: Record = {}; - const input = numberField(usage, 'input') ?? numberField(usage, 'prompt_tokens'); - const output = numberField(usage, 'output') ?? numberField(usage, 'completion_tokens'); - const cacheRead = numberField(usage, 'cacheRead') ?? numberField(usage, 'cache_read_tokens'); - const cacheWrite = numberField(usage, 'cacheWrite') ?? numberField(usage, 'cache_write_tokens'); - const total = numberField(usage, 'total') ?? numberField(usage, 'totalTokens') ?? numberField(usage, 'total_tokens'); + const input = + numberField(usage, 'input') ?? + numberField(usage, 'prompt_tokens') ?? + numberField(usage, 'input_tokens') ?? + numberField(usage, 'inputTokens'); + const output = + numberField(usage, 'output') ?? + numberField(usage, 'completion_tokens') ?? + numberField(usage, 'output_tokens') ?? + numberField(usage, 'outputTokens'); + const cacheRead = + numberField(usage, 'cacheRead') ?? + numberField(usage, 'cache_read_tokens') ?? + numberField(usage, 'cache_read_input_tokens') ?? + nestedNumberField(usage, 'input_tokens_details', 'cached_tokens') ?? + nestedNumberField(usage, 'prompt_tokens_details', 'cached_tokens'); + const cacheWrite = + numberField(usage, 'cacheWrite') ?? + numberField(usage, 'cache_write_tokens') ?? + numberField(usage, 'cache_creation_input_tokens'); + const total = + numberField(usage, 'total') ?? + numberField(usage, 'totalTokens') ?? + numberField(usage, 'total_tokens'); const totalCanIncludeCompletion = total === undefined || output === undefined || total >= output; const prompt = total !== undefined && output !== undefined && totalCanIncludeCompletion ? total - output : input; const totalCanIncludePrompt = total === undefined || prompt === undefined || total >= prompt; @@ -1230,6 +1249,12 @@ function numberField(record: Record, key: string): number | und return typeof value === 'number' && Number.isFinite(value) ? value : undefined; } +/** Read a finite numeric field nested one object deep from a generic hook record. */ +function nestedNumberField(record: Record, objectKey: string, fieldKey: string): number | undefined { + const value = record[objectKey]; + return isRecord(value) ? numberField(value, fieldKey) : undefined; +} + /** Copy model_call_ended details into a retained timing record. */ function applyModelCallEnd(record: ModelCallRecord, event: PluginHookModelCallEndedEvent, nowMs: number): void { record.observedAtMs = nowMs; diff --git a/integrations/openclaw/test/llm-replay.test.ts b/integrations/openclaw/test/llm-replay.test.ts index 5702fafa..0e067de1 100644 --- a/integrations/openclaw/test/llm-replay.test.ts +++ b/integrations/openclaw/test/llm-replay.test.ts @@ -104,6 +104,135 @@ describe('LLM replay', () => { assert.deepEqual((response.openclaw as ResponseOpenClaw).assistant_tool_call_names, ['web_search']); }); + it('normalizes Anthropic messages content blocks from message-write replay', () => { + const nf = createNemoRelayRuntime(); + const backend = createBackend(nf); + const assistantMessage = { + role: 'assistant', + provider: 'anthropic', + model: 'claude-sonnet-4', + content: [ + { type: 'text', text: 'I will search for it.' }, + { + type: 'tool_use', + id: 'toolu-search-1', + name: 'web_search', + input: { query: 'release notes' }, + }, + ], + usage: { + input_tokens: 11, + output_tokens: 7, + cache_read_input_tokens: 3, + cache_creation_input_tokens: 5, + }, + }; + + backend.onLlmInput( + { + ...llmInput(), + provider: 'anthropic', + model: 'claude-sonnet-4', + prompt: 'Find the release notes.', + historyMessages: [{ role: 'user', content: [{ type: 'text', text: 'Find the release notes.' }] }], + }, + { runId: 'run-1', sessionId: 'session-1' }, + ); + backend.onModelCallEnded( + { + ...modelEnded('call-1', 42), + provider: 'anthropic', + model: 'claude-sonnet-4', + }, + { runId: 'run-1', sessionId: 'session-1' }, + ); + backend.onBeforeMessageWrite({ message: assistantMessage }, { sessionKey: 'session-1' }); + backend.onAgentEnd( + { + runId: 'run-1', + messages: [ + { role: 'user', content: [{ type: 'text', text: 'Find the release notes.' }] }, + assistantMessage, + ], + success: true, + }, + { runId: 'run-1', sessionId: 'session-1' }, + ); + + assert.equal(nf.calls.llmCall.length, 1); + assert.equal(nf.calls.llmCallEnd.length, 1); + const request = nf.calls.llmCall[0]?.request as ReplayRequest; + assert.deepEqual(request.content.messages, [ + { role: 'user', content: [{ type: 'text', text: 'Find the release notes.' }] }, + ]); + const response = nf.calls.llmCallEnd[0]?.response as ReplayResponse; + assert.equal(response.content, 'I will search for it.'); + assert.deepEqual(response.tool_calls, [ + { + type: 'tool_use', + id: 'toolu-search-1', + name: 'web_search', + input: { stripped: true }, + }, + ]); + assert.deepEqual(response.usage, { + prompt_tokens: 11, + completion_tokens: 7, + cached_tokens: 3, + cache_read_tokens: 3, + cache_write_tokens: 5, + total_tokens: 18, + }); + assert.deepEqual((response.openclaw as ResponseOpenClaw).assistant_tool_call_names, ['web_search']); + }); + + it('normalizes OpenAI responses usage from message-write replay', () => { + const nf = createNemoRelayRuntime(); + const backend = createBackend(nf); + const assistantMessage = { + role: 'assistant', + provider: 'openai', + model: 'gpt-4.1', + content: [{ type: 'output_text', text: 'Done.' }], + usage: { + input_tokens: 75, + output_tokens: 20, + total_tokens: 95, + input_tokens_details: { cached_tokens: 10 }, + }, + }; + + backend.onLlmInput( + { + ...llmInput(), + model: 'gpt-4.1', + prompt: 'Use the responses endpoint.', + historyMessages: [{ role: 'user', content: 'Use the responses endpoint.' }], + }, + { runId: 'run-1', sessionId: 'session-1' }, + ); + backend.onModelCallEnded({ ...modelEnded('call-1', 42), model: 'gpt-4.1' }, { runId: 'run-1', sessionId: 'session-1' }); + backend.onBeforeMessageWrite({ message: assistantMessage }, { sessionKey: 'session-1' }); + backend.onAgentEnd( + { + runId: 'run-1', + messages: [{ role: 'user', content: 'Use the responses endpoint.' }, assistantMessage], + success: true, + }, + { runId: 'run-1', sessionId: 'session-1' }, + ); + + const response = nf.calls.llmCallEnd[0]?.response as ReplayResponse; + assert.equal(response.content, 'Done.'); + assert.deepEqual(response.usage, { + prompt_tokens: 75, + completion_tokens: 20, + cached_tokens: 10, + cache_read_tokens: 10, + total_tokens: 95, + }); + }); + it('uses the observed input time as the fallback llm span start time', () => { const now = Date.now; const nf = createNemoRelayRuntime(); From bec87874e9770adae04369a1d7c1acdc1f98bd13 Mon Sep 17 00:00:00 2001 From: mnajafian-nv Date: Tue, 2 Jun 2026 17:08:25 -0700 Subject: [PATCH 2/2] Update crates/core/src/observability/openinference.rs Co-authored-by: Will Killian <2007799+willkill07@users.noreply.github.com> Signed-off-by: mnajafian-nv --- crates/core/src/observability/openinference.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/core/src/observability/openinference.rs b/crates/core/src/observability/openinference.rs index 0e9baac6..990902fa 100644 --- a/crates/core/src/observability/openinference.rs +++ b/crates/core/src/observability/openinference.rs @@ -1086,7 +1086,7 @@ fn display_text_from_openai_responses_content(value: &Json) -> Option { } }) .collect::>() - .join("\n") + .join("\n\n") .trim() .to_string(); if text.is_empty() { None } else { Some(text) }