Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 55 additions & 3 deletions crates/core/src/observability/atif.rs
Original file line number Diff line number Diff line change
Expand Up @@ -433,12 +433,16 @@ fn unwrap_llm_request(input: &Json) -> Json {

/// Extract the user-facing message content from a raw LLM response.
///
/// Looks for OpenAI-compatible and Hermes-compatible response content fields.
/// Looks for provider response content fields that can be represented as an
/// ATIF agent message.
/// Tool-call-only responses use an empty string message and keep the full
/// response under `Step.extra.llm_response`.
fn extract_llm_response_message(output: &Json) -> Json {
if let Some(obj) = output.as_object() {
if let Some(content) = non_null_object_field(obj, "content") {
if let Some(message) = anthropic_messages_content_message(output, &content) {
return message;
}
return atif_content_value(&content);
}
if let Some(content) = obj
Expand Down Expand Up @@ -483,6 +487,38 @@ fn atif_content_value(value: &Json) -> Json {
}
}

fn anthropic_messages_content_message(output: &Json, content: &Json) -> Option<Json> {
let object = output.as_object()?;
if object.get("type").and_then(Json::as_str) != Some("message") {
return None;
}
let blocks = content.as_array()?;
let mut text_parts = Vec::new();
let mut has_tool_use = false;
for block in blocks {
let Some(block_object) = block.as_object() else {
continue;
};
match block_object.get("type").and_then(Json::as_str) {
Some("text") => {
if let Some(text) = block_object.get("text").and_then(Json::as_str)
&& !text.trim().is_empty()
{
text_parts.push(text.to_string());
}
}
Some("tool_use") => has_tool_use = true,
_ => {}
}
}
match text_parts.as_slice() {
[] if has_tool_use => Some(empty_message()),
[] => None,
[text] => Some(Json::String(text.clone())),
_ => Some(Json::String(text_parts.join("\n"))),
}
}

fn observation_content_value(value: &Json) -> Option<Json> {
(!value.is_null()).then(|| value.clone())
}
Expand Down Expand Up @@ -865,7 +901,8 @@ fn extract_tool_calls(output: &Json) -> Option<Vec<AtifToolCall>> {
let arr = tool_call_array(output)
.filter(|arr| !arr.is_empty())
.map(|arr| arr.iter().collect::<Vec<_>>())
.or_else(|| openai_responses_function_call_items(output))?;
.or_else(|| openai_responses_function_call_items(output))
.or_else(|| anthropic_messages_tool_use_items(output))?;
let mut calls = Vec::with_capacity(arr.len());
for (index, tc) in arr.iter().enumerate() {
let tc_obj = tc.as_object()?;
Expand All @@ -892,7 +929,8 @@ fn extract_tool_calls(output: &Json) -> Option<Vec<AtifToolCall>> {
let raw_arguments = func
.and_then(|f| f.get("arguments"))
.or_else(|| tc_obj.get("arguments"))
.or_else(|| tc_obj.get("args"));
.or_else(|| tc_obj.get("args"))
.or_else(|| tc_obj.get("input"));
let arguments = normalize_tool_arguments(raw_arguments);
// Skip entries with no id and no name — they are not meaningful.
if id.is_empty() && name.is_empty() {
Expand Down Expand Up @@ -936,6 +974,19 @@ fn openai_responses_function_call_items(output: &Json) -> Option<Vec<&Json>> {
(!function_call_items.is_empty()).then_some(function_call_items)
}

fn anthropic_messages_tool_use_items(output: &Json) -> Option<Vec<&Json>> {
let object = output.as_object()?;
if object.get("type").and_then(Json::as_str) != Some("message") {
return None;
}
let content_blocks = object.get("content").and_then(Json::as_array)?;
let tool_use_items = content_blocks
.iter()
.filter(|item| item.get("type").and_then(Json::as_str) == Some("tool_use"))
.collect::<Vec<_>>();
(!tool_use_items.is_empty()).then_some(tool_use_items)
}

fn normalize_tool_arguments(raw_arguments: Option<&Json>) -> Json {
let Some(raw_arguments) = raw_arguments else {
return serde_json::json!({});
Expand Down Expand Up @@ -968,6 +1019,7 @@ fn tool_call_extra(tool_call: &Json) -> Option<Json> {
| "function_name"
| "arguments"
| "args"
| "input"
) {
extra.insert(key.clone(), value.clone());
}
Expand Down
100 changes: 97 additions & 3 deletions crates/core/src/observability/openinference.rs
Original file line number Diff line number Diff line change
Expand Up @@ -779,7 +779,23 @@ fn usage_from_manual_llm_output(output: Option<&Json>) -> Option<Usage> {
"cacheReadInputTokens",
"cacheRead",
],
);
)
.or_else(|| {
first_nested_u64_from_manual_usage(
usage,
token_usage,
"input_tokens_details",
"cached_tokens",
)
})
.or_else(|| {
first_nested_u64_from_manual_usage(
usage,
token_usage,
"prompt_tokens_details",
"cached_tokens",
)
});
let cache_write_tokens = first_u64_from_manual_usage(
usage,
token_usage,
Expand Down Expand Up @@ -838,6 +854,29 @@ fn first_u64_from_manual_usage(
.or_else(|| token_usage.and_then(|value| first_u64(value, keys)))
}

fn first_nested_u64_from_manual_usage(
usage: Option<&serde_json::Map<String, Json>>,
token_usage: Option<&serde_json::Map<String, Json>>,
parent_key: &str,
child_key: &str,
) -> Option<u64> {
usage
.and_then(|value| nested_u64(value, parent_key, child_key))
.or_else(|| token_usage.and_then(|value| nested_u64(value, parent_key, child_key)))
}

fn nested_u64(
usage: &serde_json::Map<String, Json>,
parent_key: &str,
child_key: &str,
) -> Option<u64> {
usage
.get(parent_key)
.and_then(Json::as_object)
.and_then(|details| details.get(child_key))
.and_then(Json::as_u64)
}

fn first_u64(usage: &serde_json::Map<String, Json>, keys: &[&str]) -> Option<u64> {
keys.iter()
.find_map(|key| usage.get(*key).and_then(Json::as_u64))
Expand Down Expand Up @@ -985,8 +1024,13 @@ fn display_text_from_json(value: &Json) -> Option<String> {
}
}
object
.get("choices")
.and_then(display_text_from_chat_choices)
.get("output")
.and_then(display_text_from_openai_responses_output)
.or_else(|| {
object
.get("choices")
.and_then(display_text_from_chat_choices)
})
.or_else(|| {
object
.get("tool_calls")
Expand All @@ -998,6 +1042,56 @@ fn display_text_from_json(value: &Json) -> Option<String> {
}
}

fn display_text_from_openai_responses_output(value: &Json) -> Option<String> {
let items = value.as_array()?;
let mut entries = Vec::new();
let mut tool_names = Vec::new();
for item in items {
let Some(object) = item.as_object() else {
continue;
};
match object.get("type").and_then(Json::as_str) {
Some("message") => {
if let Some(content) = object
.get("content")
.and_then(display_text_from_openai_responses_content)
{
entries.push(content);
}
}
Some("function_call") => {
if let Some(name) = object.get("name").and_then(Json::as_str) {
tool_names.push(name.to_string());
}
}
_ => {}
}
}
if !tool_names.is_empty() {
entries.push(format!("Requested tools: {}", tool_names.join(", ")));
}
let text = entries.join("\n").trim().to_string();
if text.is_empty() { None } else { Some(text) }
}

fn display_text_from_openai_responses_content(value: &Json) -> Option<String> {
let content = value.as_array()?;
let text = content
.iter()
.filter_map(|part| {
let object = part.as_object()?;
match object.get("type").and_then(Json::as_str) {
Some("output_text" | "text") => object.get("text").and_then(Json::as_str),
_ => None,
}
})
.collect::<Vec<_>>()
.join("\n\n")
.trim()
.to_string();
if text.is_empty() { None } else { Some(text) }
}

fn display_text_from_messages(value: &Json) -> Option<String> {
let messages = value.as_array()?;
let text = messages
Expand Down
107 changes: 107 additions & 0 deletions crates/core/tests/unit/atif_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -755,6 +755,113 @@ fn test_exporter_openai_responses_lifecycle_extracts_messages() {
assert_eq!(agent_extra.llm_response.unwrap()["id"], json!("resp_1"));
}

#[test]
fn test_exporter_anthropic_messages_lifecycle_promotes_tool_use_blocks() {
let exporter = AtifExporter::new("session-1".to_string(), make_agent_info());
let llm_uuid = Uuid::now_v7();
let tool_uuid = Uuid::now_v7();
let base = base_timestamp();

let mut start = event_builder(llm_uuid, EventType::Start)
.name("claude-sonnet-4")
.scope_type(ScopeType::Llm)
.input(json!({
"model": "claude-sonnet-4",
"system": "Be concise.",
"messages": [{"role": "user", "content": "Find the file."}],
"tools": [{
"name": "search",
"input_schema": {
"type": "object",
"properties": {"query": {"type": "string"}}
}
}]
}))
.model_name("claude-sonnet-4")
.build();
let mut end = event_builder(llm_uuid, EventType::End)
.name("claude-sonnet-4")
.scope_type(ScopeType::Llm)
.output(json!({
"id": "msg_01",
"type": "message",
"role": "assistant",
"model": "claude-sonnet-4",
"content": [
{"type": "text", "text": "I will search for it."},
{
"type": "tool_use",
"id": "toolu_01",
"name": "search",
"input": {"query": "file"}
}
],
"stop_reason": "tool_use",
"usage": {
"input_tokens": 11,
"output_tokens": 7,
"cache_read_input_tokens": 3,
"cache_creation_input_tokens": 5
}
}))
.model_name("claude-sonnet-4")
.build();
let mut tool_end = event_builder(tool_uuid, EventType::End)
.name("search")
.scope_type(ScopeType::Tool)
.parent_uuid(llm_uuid)
.tool_call_id("toolu_01")
.output(json!({"matches": ["README.md"]}))
.build();

for (offset, event) in [&mut start, &mut end, &mut tool_end]
.into_iter()
.enumerate()
{
set_event_timestamp(event, base + chrono::Duration::milliseconds(offset as i64));
}

{
let mut state = exporter.state.lock().unwrap();
state.events.extend([start, end, tool_end]);
}

let trajectory = exporter.export().unwrap();
assert_atif_v17_shape(&trajectory);
assert_eq!(trajectory.steps.len(), 2);

let user_step = &trajectory.steps[0];
assert_eq!(user_step.source, "user");
assert_eq!(user_step.message, json!("Find the file."));
let user_extra: AtifStepExtra =
serde_json::from_value(user_step.extra.clone().unwrap()).unwrap();
let llm_request = user_extra.llm_request.unwrap();
assert_eq!(llm_request["system"], json!("Be concise."));
assert_eq!(llm_request["tools"][0]["name"], json!("search"));

let agent_step = &trajectory.steps[1];
assert_eq!(agent_step.source, "agent");
assert_eq!(agent_step.message, json!("I will search for it."));
assert_eq!(agent_step.model_name, Some("claude-sonnet-4".to_string()));
let metrics = agent_step.metrics.as_ref().unwrap();
assert_eq!(metrics.prompt_tokens, Some(11));
assert_eq!(metrics.completion_tokens, Some(7));
assert_eq!(metrics.cached_tokens, Some(8));
let tool_call = &agent_step.tool_calls.as_ref().unwrap()[0];
assert_eq!(tool_call.tool_call_id, "toolu_01");
assert_eq!(tool_call.function_name, "search");
assert_eq!(tool_call.arguments, json!({"query": "file"}));
let observation = agent_step.observation.as_ref().unwrap();
assert_eq!(
observation.results[0].source_call_id.as_deref(),
Some("toolu_01")
);
assert_eq!(
observation.results[0].content,
Some(json!({"matches": ["README.md"]}))
);
}

#[test]
fn test_openai_responses_input_extracts_latest_user_content_block() {
let message = extract_user_messages(&json!({
Expand Down
Loading
Loading