diff --git a/examples/advanced_agents/alert_triage_agent/src/nat_alert_triage_agent/data/output_samples/workflow_output.json b/examples/advanced_agents/alert_triage_agent/src/nat_alert_triage_agent/data/output_samples/workflow_output.json index 1e7ad0e580..ce6d57d432 100644 --- a/examples/advanced_agents/alert_triage_agent/src/nat_alert_triage_agent/data/output_samples/workflow_output.json +++ b/examples/advanced_agents/alert_triage_agent/src/nat_alert_triage_agent/data/output_samples/workflow_output.json @@ -1,3 +1,3 @@ version https://git-lfs.github.com/spec/v1 -oid sha256:832a5311fb051c545b3b0d3862bf432ef6345f93a6c030250f9b98436cc33145 -size 993205 +oid sha256:6f85da4e4ca5390cb333643c547004a53d2425cb010879c79b8e7aac7a065381 +size 1014978 diff --git a/examples/advanced_agents/alert_triage_agent/src/nat_alert_triage_agent/data/output_samples/workflow_output_atif.json b/examples/advanced_agents/alert_triage_agent/src/nat_alert_triage_agent/data/output_samples/workflow_output_atif.json index 35150245ff..fccab4ee54 100644 --- a/examples/advanced_agents/alert_triage_agent/src/nat_alert_triage_agent/data/output_samples/workflow_output_atif.json +++ b/examples/advanced_agents/alert_triage_agent/src/nat_alert_triage_agent/data/output_samples/workflow_output_atif.json @@ -1,3 +1,3 @@ version https://git-lfs.github.com/spec/v1 -oid sha256:7c615d3d082705efd85e01e1a89e9294cb98b1723dfba658faaeb8f9ae2588fe -size 104469 +oid sha256:4235791575d6d26a3833c77d48b56a2923b92e75a986d67bc4ada51a16d184cc +size 115049 diff --git a/examples/advanced_agents/alert_triage_agent/src/nat_alert_triage_agent/telemetry_metrics_analysis_agent.py b/examples/advanced_agents/alert_triage_agent/src/nat_alert_triage_agent/telemetry_metrics_analysis_agent.py index a8321eaad5..7d0a2089a5 100644 --- a/examples/advanced_agents/alert_triage_agent/src/nat_alert_triage_agent/telemetry_metrics_analysis_agent.py +++ b/examples/advanced_agents/alert_triage_agent/src/nat_alert_triage_agent/telemetry_metrics_analysis_agent.py @@ -16,11 +16,13 @@ from pydantic import Field from nat.builder.builder import Builder +from nat.builder.context import Context from nat.builder.framework_enum import LLMFrameworkEnum from nat.builder.function_info import FunctionInfo from nat.cli.register_workflow import register_function from nat.data_models.component_ref import LLMRef from nat.data_models.function import FunctionBaseConfig +from nat.data_models.intermediate_step import TraceMetadata from . import utils from .prompts import TelemetryMetricsAnalysisAgentPrompts @@ -88,7 +90,14 @@ def telemetry_metrics_analysis_agent(state: MessagesState): # Execute analysis and get response input_message = f"Host to investigate: {host_id}. Alert type: {alert_type}" - response = await agent_executor.ainvoke({"messages": [HumanMessage(content=input_message)]}) + delegation_metadata = TraceMetadata(provided_metadata={"is_subagent_delegation": True}) + with Context.get().push_active_function("telemetry_metrics_analysis_subagent_call", + input_data={ + "host_id": host_id, + "alert_type": alert_type, + }, + metadata=delegation_metadata): + response = await agent_executor.ainvoke({"messages": [HumanMessage(content=input_message)]}) conclusion = response["messages"][-1].content diff --git a/packages/nvidia_nat_atif/intermediate-step-to-atif-mapping.md b/packages/nvidia_nat_atif/intermediate-step-to-atif-mapping.md index 30a0d53974..98516b61e6 100644 --- a/packages/nvidia_nat_atif/intermediate-step-to-atif-mapping.md +++ b/packages/nvidia_nat_atif/intermediate-step-to-atif-mapping.md @@ -18,72 +18,97 @@ limitations under the License. # IntermediateStep to ATIF Mapping -This document explains how IntermediateStep event streams are mapped to ATIF trajectories. - -It is intentionally generic and applies to the current conversion path used by the toolkit. - -## ID Mappings +This document defines the current conversion contract from NAT `IntermediateStep` +events to ATIF trajectories. + +The conversion is intentionally split into two passes: + +1. **Pass 1: build execution structure** + - Determine root context ownership and delegated child context ownership. + - Build subagent linkage metadata from explicit delegation markers. +2. **Pass 2: project contexts to ATIF** + - Convert each context into ATIF steps using deterministic step window rules. + - Emit flat tool call and observation rows with lineage metadata for nesting reconstruction. + +The converter code implementing this model lives in: +`packages/nvidia_nat_core/src/nat/utils/atif_converter.py`, with key entry points: +`_pass1_build_execution_structure()` and `_pass2_project_context_to_steps()`. + +## Pass 1: Execution Structure Rules + +- Events are ordered by `event_timestamp`. +- Context ownership is computed from callable lineage and explicit delegation markers. +- Delegation boundaries are explicit only: + - marker source: `metadata.provided_metadata["is_subagent_delegation"] == true` + - no implicit delegation inference in the clean mapping path. +- Pass 1 outputs: + - `root_events` + - `child_events_by_session` + - `subagent_ref_by_call_id` + +## Pass 2: Context Projection Rules + +- Each context is projected independently into ATIF steps. +- `WORKFLOW_START` can emit a `source="user"` step for the root context. +- Step windows are anchored by terminal events: + - `LLM_END` + - `TOOL_END` + - `FUNCTION_END` + - `WORKFLOW_END` (terminal output only when not redundant) +- Tool rows are flat by design: + - nested tool calls are kept in `tool_calls`, `observation.results`, + `extra.tool_ancestry`, and `extra.tool_invocations` + - subscribers reconstruct nesting using ancestry `function_id` and `parent_id`. +- Chunk events (`LLM_NEW_TOKEN`, `SPAN_CHUNK`) do not emit standalone ATIF steps. + +## Subagent Trajectory Rules + +- Parent observation rows include `subagent_trajectory_ref` only for explicitly + marked delegating calls. +- Child trajectories are built from delegated context events only. +- Embedded children are stored under: + - `trajectory.extra["subagent_trajectories"]` +- Reference resolution precedence: + 1. `subagent_trajectory_ref.trajectory_path` + 2. `trajectory.extra.subagent_trajectories[session_id]` + +## ID and Lineage Mapping | IntermediateStep | ATIF | Mapping Rule | Notes | |---|---|---|---| | `payload.UUID` | `tool_calls[*].tool_call_id` | `tool_call_id = "call_" + payload.UUID` | Invocation occurrence identity | -| `payload.UUID` | `observation.results[*].source_call_id` | `source_call_id = tool_call_id` | Observation links to invocation | -| `payload.UUID` | `extra.tool_invocations[*].invocation_id` | `invocation_id = tool_call_id` | Timing row identity for same invocation | +| `payload.UUID` | `observation.results[*].source_call_id` | `source_call_id = tool_call_id` | Observation-to-call link | +| `payload.UUID` | `extra.tool_invocations[*].invocation_id` | `invocation_id = tool_call_id` | Invocation timing row identity | | `function_ancestry.function_id` | `extra.tool_ancestry[*].function_id` | Direct match | Callable lineage node identity | -| `function_ancestry.parent_id` | `extra.tool_ancestry[*].parent_id` | Direct match | Parent callable lineage node identity | -| `function_ancestry.function_id` (step context) | `extra.ancestry.function_id` | Direct match | Step-level callable context | -| Not applicable | `step_id` | Generated ATIF sequence counter | Not derived from IntermediateStep UUID | +| `function_ancestry.parent_id` | `extra.tool_ancestry[*].parent_id` | Direct match | Lineage parent node identity | +| `function_ancestry.function_id` (step anchor) | `extra.ancestry.function_id` | Direct match | Step context identity | +| Not applicable | `step_id` | Generated sequentially from 1 | Not derived from UUID | -## Name Mappings +## Name Mapping | IntermediateStep | ATIF | Mapping Rule | Notes | |---|---|---|---| -| `payload.name` (tool, function, or LLM by event type) | `tool_calls[*].function_name` or `model_name` | Context dependent | IntermediateStep name is polymorphic by event type | -| `function_ancestry.function_name` | `extra.tool_ancestry[*].function_name` | Direct match | Callable lineage node name | -| `function_ancestry.parent_name` | `extra.tool_ancestry[*].parent_name` | Direct match | Parent callable lineage node name | -| `function_ancestry.function_name` (step context) | `extra.ancestry.function_name` | Direct match | Step-level lineage name | - -## Event-to-Step Mapping - -- `WORKFLOW_START` maps to an ATIF user step (`source = "user"`). -- `LLM_END` starts an ATIF agent turn candidate step. -- `TOOL_END` and `FUNCTION_END` are accumulated into the pending ATIF step as observed invocations. -- `WORKFLOW_END` may emit a terminal ATIF agent step when final output is present and not redundant. -- `LLM_NEW_TOKEN` and other non-terminal chunk events are not directly emitted as standalone ATIF steps. - -## Identity Semantics - -- Invocation identity and callable identity are intentionally different: - - Invocation identity: `tool_call_id`, `source_call_id`, `invocation_id` - - Callable identity: `function_id`, `parent_id` -- Correct lineage interpretation requires both: - - use invocation IDs for per-call correlation, - - use callable IDs for hierarchy and repeated-call disambiguation. - -## Timing Mappings - -- IntermediateStep end events commonly use: - - `event_timestamp` as end timestamp, - - `span_event_timestamp` as start timestamp. -- ATIF timing is represented in: - - `extra.invocation.start_timestamp` and `extra.invocation.end_timestamp` (step-level), - - `extra.tool_invocations[*].start_timestamp` and `extra.tool_invocations[*].end_timestamp` (per invocation). - -## Practical Validation Checklist - -- Verify `tool_call_id == source_call_id == invocation_id` for each ATIF invocation row. -- Verify `tool_call_id == "call_" + payload.UUID` for mapped tool or function end events. -- Verify callable lineage consistency: - - `function_ancestry.function_id <-> extra.tool_ancestry[*].function_id` - - `function_ancestry.parent_id <-> extra.tool_ancestry[*].parent_id` -- Verify name consistency: - - `function_ancestry.function_name <-> extra.tool_ancestry[*].function_name` - - `function_ancestry.parent_name <-> extra.tool_ancestry[*].parent_name` - -## Additional Identifiers Worth Tracking - -- IntermediateStep structural parent linkage: `parent_id` -- Event semantics: `payload.event_type` -- Time surfaces: `event_timestamp`, `span_event_timestamp`, ATIF `timestamp` -- Session scope: ATIF `session_id` -- Framework or provider run IDs when present in metadata (for example, model framework trace IDs) +| `payload.name` on `LLM_END` | `model_name` | Direct match | Model name for agent step | +| `payload.name` on `TOOL_END` or `FUNCTION_END` | `tool_calls[*].function_name` | Direct match | Invocation display name | +| `function_ancestry.function_name` | `extra.tool_ancestry[*].function_name` | Direct match | Callable lineage name | +| `function_ancestry.parent_name` | `extra.tool_ancestry[*].parent_name` | Direct match | Parent lineage name | + +## Timing Mapping + +- `span_event_timestamp` maps to start timestamp surfaces when present. +- `event_timestamp` maps to end timestamp surfaces. +- ATIF timing fields: + - `extra.invocation.start_timestamp` + - `extra.invocation.end_timestamp` + - `extra.tool_invocations[*].start_timestamp` + - `extra.tool_invocations[*].end_timestamp` + +## Validation Checklist + +- `tool_call_id == source_call_id == invocation_id` for each projected invocation row. +- `tool_call_id == "call_" + payload.UUID` for mapped tool and function end events. +- Lineage consistency: + - `function_ancestry.function_id` equals `extra.tool_ancestry[*].function_id` + - `function_ancestry.parent_id` equals `extra.tool_ancestry[*].parent_id` +- Step IDs are sequential and start from 1. +- Every emitted `subagent_trajectory_ref.session_id` resolves by precedence rules. diff --git a/packages/nvidia_nat_core/src/nat/utils/atif_converter.py b/packages/nvidia_nat_core/src/nat/utils/atif_converter.py index a0c9944bf1..2b3a73a141 100644 --- a/packages/nvidia_nat_core/src/nat/utils/atif_converter.py +++ b/packages/nvidia_nat_core/src/nat/utils/atif_converter.py @@ -44,8 +44,8 @@ from nat.atif import ATIFStepMetrics from nat.atif import ATIFToolCall from nat.atif import ATIFTrajectory +from nat.atif import SubagentTrajectoryRef from nat.data_models.intermediate_step import IntermediateStep -from nat.data_models.intermediate_step import IntermediateStepCategory from nat.data_models.intermediate_step import IntermediateStepState from nat.data_models.intermediate_step import IntermediateStepType from nat.data_models.intermediate_step import TraceMetadata @@ -190,6 +190,58 @@ def _parse_tool_arguments(raw_input: Any) -> dict[str, Any]: return {} +def _extract_tool_arguments_from_ist(ist: IntermediateStep) -> dict[str, Any]: + """Extract tool arguments from preferred IST sources with fallback.""" + if ist.data and ist.data.input is not None: + return _parse_tool_arguments(ist.data.input) + + metadata = _event_metadata(ist) + metadata_tool_inputs: Any = None + if isinstance(metadata, TraceMetadata): + metadata_tool_inputs = metadata.tool_inputs + elif isinstance(metadata, dict): + metadata_tool_inputs = metadata.get("tool_inputs") + + if metadata_tool_inputs is not None: + return _parse_tool_arguments(metadata_tool_inputs) + + return {} + + +def _extract_subagent_delegation_flag(metadata: Any) -> bool: + """Extract optional subagent delegation flag from metadata payload.""" + if isinstance(metadata, TraceMetadata): + provided = metadata.provided_metadata + if isinstance(provided, dict): + return bool(provided.get("is_subagent_delegation", provided.get("subagent_delegation", False))) + return False + if isinstance(metadata, dict): + provided = metadata.get("provided_metadata") + if isinstance(provided, dict): + return bool(provided.get("is_subagent_delegation", provided.get("subagent_delegation", False))) + return bool(metadata.get("is_subagent_delegation", metadata.get("subagent_delegation", False))) + return False + + +def _event_metadata(ist: IntermediateStep) -> Any: + """Return metadata payload from the most reliable event surface.""" + payload_metadata = getattr(ist.payload, "metadata", None) + if payload_metadata is not None: + return payload_metadata + return getattr(ist, "metadata", None) + + +def _event_uuid(ist: IntermediateStep) -> str: + """Return stable event UUID from top-level or payload fields.""" + top_level = getattr(ist, "UUID", None) + if top_level: + return str(top_level) + payload_level = getattr(ist.payload, "UUID", None) + if payload_level: + return str(payload_level) + return "" + + # --------------------------------------------------------------------------- # Internal accumulator # --------------------------------------------------------------------------- @@ -201,9 +253,20 @@ class _ObservedInvocation: order_key: float tool_call: ATIFToolCall - observation: ATIFObservationResult + tool_output: str ancestry: AtifAncestry invocation: AtifInvocationInfo + is_subagent_delegation: bool + + +@dataclass +class _ExecutionStructure: + """Pass-1 execution structure extracted from IST.""" + + root_events: list[IntermediateStep] + child_events_by_session: dict[str, list[IntermediateStep]] + child_agent_name_by_session: dict[str, str] + subagent_ref_by_call_id: dict[str, SubagentTrajectoryRef] class _PendingAgentTurn: @@ -220,7 +283,7 @@ def __init__(self, message: str, timestamp: float, model_name: str | None, metri self.observed_invocations: list[_ObservedInvocation] = [] -def _record_observed_invocation(pending: _PendingAgentTurn, ist: IntermediateStep) -> None: +def _record_observed_invocation(pending: _PendingAgentTurn, ist: IntermediateStep, *, start_flag: bool = False) -> None: """Record an observed invocation as a tool_call + observation pair.""" tool_name = ist.name or "unknown_tool" if tool_name == "": @@ -228,20 +291,284 @@ def _record_observed_invocation(pending: _PendingAgentTurn, ist: IntermediateSte return tool_input: dict[str, Any] = {} tool_output = "" + tool_input = _extract_tool_arguments_from_ist(ist) if ist.data: - tool_input = _parse_tool_arguments(ist.data.input) tool_output = _safe_str(ist.data.output) - call_id = f"call_{ist.UUID}" + event_uuid = _event_uuid(ist) + if not event_uuid: + logger.warning("Skipping invocation without UUID for tool=%s", tool_name) + return + call_id = f"call_{event_uuid}" + is_subagent_delegation = _extract_subagent_delegation_flag(_event_metadata(ist)) or start_flag pending.observed_invocations.append( _ObservedInvocation( order_key=ist.payload.span_event_timestamp or ist.event_timestamp, tool_call=ATIFToolCall(tool_call_id=call_id, function_name=tool_name, arguments=tool_input), - observation=ATIFObservationResult(source_call_id=call_id, content=tool_output), + tool_output=tool_output, ancestry=_atif_ancestry_from_ist(ist), invocation=_atif_invocation_from_ist(ist, invocation_id=call_id), + is_subagent_delegation=is_subagent_delegation, )) +def _build_flat_observation_rows( + observed: list[_ObservedInvocation], + *, + subagent_ref_by_call_id: dict[str, SubagentTrajectoryRef] | None = None) -> list[ATIFObservationResult]: + """Build observation rows and attach explicit subagent refs when available.""" + results = [ + ATIFObservationResult(source_call_id=obs.tool_call.tool_call_id, content=obs.tool_output) for obs in observed + ] + + if not subagent_ref_by_call_id: + return results + + for i, obs in enumerate(observed): + ref = subagent_ref_by_call_id.get(obs.tool_call.tool_call_id) + if ref is not None: + results[i].subagent_trajectory_ref = [ref] + + return results + + +def _pass2_project_context_to_steps( + events: list[IntermediateStep], + *, + step_id_start: int, + include_workflow_start_user_step: bool, + subagent_ref_by_call_id: dict[str, SubagentTrajectoryRef] | None = None, +) -> tuple[list[ATIFStep], int, int, int, int]: + """Pass-2 projection for one context from IST events to ATIF steps.""" + atif_steps: list[ATIFStep] = [] + step_id = step_id_start + pending: _PendingAgentTurn | None = None + total_prompt = 0 + total_completion = 0 + total_cached = 0 + + def _flush_pending() -> None: + nonlocal step_id, pending + if pending is None: + return + sorted_invocations = sorted(pending.observed_invocations, key=lambda i: i.order_key) + tool_calls = [obs.tool_call for obs in sorted_invocations] or None + observations = _build_flat_observation_rows( + sorted_invocations, + subagent_ref_by_call_id=subagent_ref_by_call_id, + ) + observation = ATIFObservation(results=observations) if observations else None + tool_ancestry = [obs.ancestry for obs in sorted_invocations] + tool_invocations = [obs.invocation for obs in sorted_invocations] or None + if pending.ancestry is None: + raise ValueError("Pending ATIF step is missing required ancestry metadata") + step_extra = AtifStepExtra( + ancestry=pending.ancestry, + invocation=pending.invocation, + tool_ancestry=tool_ancestry, + tool_invocations=tool_invocations, + **pending.extra, + ) + atif_steps.append( + ATIFStep( + step_id=step_id, + source="agent", + message=pending.message, + timestamp=_epoch_to_iso(pending.timestamp), + model_name=pending.model_name, + tool_calls=tool_calls, + observation=observation, + metrics=pending.metrics, + extra=step_extra.model_dump(exclude_none=True), + )) + step_id += 1 + pending = None + + for ist in events: + event_type = ist.event_type + state = ist.event_state + + if include_workflow_start_user_step and event_type == IntermediateStepType.WORKFLOW_START: + user_input = "" + if ist.data and ist.data.input is not None: + user_input = _extract_user_input(ist.data.input) + step_extra = _atif_step_extra_model_from_ist(ist) + extra = step_extra.model_dump(exclude_none=True) + atif_steps.append( + ATIFStep( + step_id=step_id, + source="user", + message=user_input, + timestamp=_epoch_to_iso(ist.event_timestamp), + extra=extra or None, + )) + step_id += 1 + continue + + if event_type == IntermediateStepType.LLM_END: + _flush_pending() + llm_output = "" + if ist.data and ist.data.output is not None: + llm_output = _safe_str(ist.data.output) + metrics = _extract_metrics(ist) + if metrics: + total_prompt += metrics.prompt_tokens or 0 + total_completion += metrics.completion_tokens or 0 + total_cached += metrics.cached_tokens or 0 + pending = _PendingAgentTurn( + message=llm_output, + timestamp=ist.event_timestamp, + model_name=ist.name, + metrics=metrics, + ) + pending.ancestry = _atif_ancestry_from_ist(ist) + pending.invocation = _atif_invocation_from_ist(ist) + continue + + if event_type in (IntermediateStepType.TOOL_END, IntermediateStepType.FUNCTION_END): + if pending is None: + pending = _PendingAgentTurn( + message="", + timestamp=ist.event_timestamp, + model_name=None, + metrics=None, + ) + pending.ancestry = _atif_ancestry_from_ist(ist) + pending.invocation = _atif_invocation_from_ist(ist) + _record_observed_invocation(pending, ist) + continue + + if event_type == IntermediateStepType.WORKFLOW_END: + _flush_pending() + final_output = "" + if ist.data and ist.data.output is not None: + final_output = _safe_str(ist.data.output) + last_agent_msg = "" + last_agent_ts: float | None = None + for s in reversed(atif_steps): + if s.source == "agent": + last_agent_msg = str(s.message) + last_agent_ts = _iso_to_epoch(s.timestamp) if s.timestamp else None + break + should_emit_terminal_step = bool(final_output) and (final_output != last_agent_msg or + (last_agent_ts is not None + and ist.event_timestamp > last_agent_ts)) + if should_emit_terminal_step: + step_extra = _atif_step_extra_model_from_ist(ist) + extra = step_extra.model_dump(exclude_none=True) + atif_steps.append( + ATIFStep( + step_id=step_id, + source="agent", + message=final_output, + timestamp=_epoch_to_iso(ist.event_timestamp), + extra=extra or None, + )) + step_id += 1 + continue + + if state == IntermediateStepState.START: + continue + if event_type in (IntermediateStepType.LLM_NEW_TOKEN, IntermediateStepType.SPAN_CHUNK): + continue + if state == IntermediateStepState.END: + continue + + _flush_pending() + return atif_steps, step_id, total_prompt, total_completion, total_cached + + +def _pass1_build_execution_structure(sorted_steps: list[IntermediateStep], *, session_id: str) -> _ExecutionStructure: + """Build root and child context ownership from IST events.""" + delegation_flags_by_uuid: dict[str, bool] = {} + delegated_callable_ids: set[str] = set() + for ist in sorted_steps: + if ist.event_state == IntermediateStepState.START and _extract_subagent_delegation_flag(_event_metadata(ist)): + event_uuid = _event_uuid(ist) + if event_uuid: + delegation_flags_by_uuid[event_uuid] = True + function_id = ist.function_ancestry.function_id + if function_id: + delegated_callable_ids.add(function_id) + + end_events = [s for s in sorted_steps if s.event_state == IntermediateStepState.END] + children_by_parent: dict[str, list[IntermediateStep]] = {} + for e in end_events: + pid = e.function_ancestry.parent_id + if pid: + children_by_parent.setdefault(pid, []).append(e) + + wrapper_events: list[IntermediateStep] = [] + for e in end_events: + # Delegation boundaries are represented by callable-level completion. + # Do not use TOOL_END here: nested tool rows inherit parent ancestry and can + # otherwise be misclassified as delegation wrappers. + if e.event_type != IntermediateStepType.FUNCTION_END: + continue + event_uuid = _event_uuid(e) + if (_extract_subagent_delegation_flag(_event_metadata(e)) + or (event_uuid and delegation_flags_by_uuid.get(event_uuid, False)) + or e.function_ancestry.function_id in delegated_callable_ids): + wrapper_events.append(e) + + child_session_by_wrapper_call_id: dict[str, str] = {} + child_events_by_session: dict[str, list[IntermediateStep]] = {} + child_agent_name_by_session: dict[str, str] = {} + delegated_function_ids: set[str] = set() + wrapper_event_ids: set[str] = set() + + for wrapper in wrapper_events: + wrapper_uuid = _event_uuid(wrapper) + if not wrapper_uuid: + continue + wrapper_event_ids.add(wrapper_uuid) + wrapper_call_id = f"call_{wrapper_uuid}" + wrapper_fn_id = wrapper.function_ancestry.function_id + if not wrapper_fn_id: + continue + + subtree_ids: set[str] = set() + # Delegated callable scope is anchored by wrapper function_id itself. + # Include that function_id and recursively include end-event children. + frontier = [wrapper_fn_id] + while frontier: + node = frontier.pop() + if node in subtree_ids: + continue + subtree_ids.add(node) + for child in children_by_parent.get(node, []): + frontier.append(child.function_ancestry.function_id) + + child_events = [ + e for e in end_events if e.function_ancestry.function_id in subtree_ids and _event_uuid(e) != wrapper_uuid + ] + if not child_events: + continue + child_session_id = f"{session_id}:{wrapper_call_id}" + child_session_by_wrapper_call_id[wrapper_call_id] = child_session_id + child_events_by_session[child_session_id] = sorted(child_events, key=lambda s: s.event_timestamp) + child_agent_name_by_session[child_session_id] = ( + wrapper.name or wrapper.function_ancestry.function_name or "nat-agent" + ) + delegated_function_ids.update(subtree_ids) + + root_events = [ + e for e in sorted_steps + if (e.event_type in {IntermediateStepType.WORKFLOW_START, IntermediateStepType.WORKFLOW_END} + or _event_uuid(e) in wrapper_event_ids + or e.function_ancestry.function_id not in delegated_function_ids) + ] + subagent_ref_by_call_id = { + call_id: SubagentTrajectoryRef(session_id=child_sid) + for call_id, child_sid in child_session_by_wrapper_call_id.items() + } + return _ExecutionStructure( + root_events=root_events, + child_events_by_session=child_events_by_session, + child_agent_name_by_session=child_agent_name_by_session, + subagent_ref_by_call_id=subagent_ref_by_call_id, + ) + + # --------------------------------------------------------------------------- # Batch converter # --------------------------------------------------------------------------- @@ -250,6 +577,10 @@ def _record_observed_invocation(pending: _PendingAgentTurn, ist: IntermediateSte class IntermediateStepToATIFConverter: """Convert a complete list of NAT IntermediateSteps to an ATIF trajectory.""" + def __init__(self, *, allow_implicit_subagent_delegation: bool = False) -> None: + # Legacy option retained for API compatibility; clean converter ignores implicit delegation. + self._allow_implicit_subagent_delegation = allow_implicit_subagent_delegation + def convert( self, steps: list[IntermediateStep], @@ -258,214 +589,56 @@ def convert( agent_name: str | None = None, ) -> ATIFTrajectory: """Convert a list of IntermediateSteps to an ATIF trajectory.""" + trajectory_session_id = session_id or str(uuid.uuid4()) if not steps: return ATIFTrajectory( - session_id=session_id or str(uuid.uuid4()), + session_id=trajectory_session_id, agent=ATIFAgentConfig(name=agent_name or "nat-agent", version="0.0.0"), ) sorted_steps = sorted(steps, key=lambda s: s.event_timestamp) - atif_steps: list[ATIFStep] = [] - step_id = 1 - agent_config = ATIFAgentConfig(name=agent_name or "nat-agent", version="0.0.0") - tool_defs_captured = False - pending: _PendingAgentTurn | None = None - total_prompt = 0 - total_completion = 0 - total_cached = 0 - - def _flush_pending() -> None: - nonlocal step_id, pending - if pending is None: - return - sorted_invocations = sorted(pending.observed_invocations, key=lambda i: i.order_key) - tool_calls = [obs.tool_call for obs in sorted_invocations] or None - observations = [obs.observation for obs in sorted_invocations] - observation = ATIFObservation(results=observations) if observations else None - tool_ancestry = [obs.ancestry for obs in sorted_invocations] - tool_invocations = [obs.invocation for obs in sorted_invocations] or None - if pending.ancestry is None: - raise ValueError("Pending agent turn is missing required ATIF ancestry metadata") - step_extra = AtifStepExtra( - ancestry=pending.ancestry, - invocation=pending.invocation, - tool_ancestry=tool_ancestry, - tool_invocations=tool_invocations, - **pending.extra, - ) - atif_steps.append( - ATIFStep( - step_id=step_id, - source="agent", - message=pending.message, - timestamp=_epoch_to_iso(pending.timestamp), - model_name=pending.model_name, - tool_calls=tool_calls, - observation=observation, - metrics=pending.metrics, - extra=step_extra.model_dump(exclude_none=True), - )) - step_id += 1 - pending = None - for ist in sorted_steps: - event_type = ist.event_type - category = ist.event_category - state = ist.event_state - - if event_type == IntermediateStepType.WORKFLOW_START: - user_input = "" - if ist.data and ist.data.input is not None: - user_input = _extract_user_input(ist.data.input) - if agent_name is None: + if agent_name is None: + for ist in sorted_steps: + if ist.event_type == IntermediateStepType.WORKFLOW_START: fn_name = ist.function_ancestry.function_name if fn_name and fn_name != "root": agent_config.name = fn_name - step_extra = _atif_step_extra_model_from_ist(ist) - extra = step_extra.model_dump(exclude_none=True) - atif_steps.append( - ATIFStep( - step_id=step_id, - source="user", - message=user_input, - timestamp=_epoch_to_iso(ist.event_timestamp), - extra=extra or None, - )) - step_id += 1 - continue - - if event_type == IntermediateStepType.WORKFLOW_END: - _flush_pending() - final_output = "" - if ist.data and ist.data.output is not None: - final_output = _safe_str(ist.data.output) - last_agent_msg = "" - last_agent_ts: float | None = None - for s in reversed(atif_steps): - if s.source == "agent": - last_agent_msg = str(s.message) - last_agent_ts = _iso_to_epoch(s.timestamp) if s.timestamp else None break - should_emit_terminal_step = bool(final_output) and (final_output != last_agent_msg or - (last_agent_ts is not None - and ist.event_timestamp > last_agent_ts)) - if should_emit_terminal_step: - step_extra = _atif_step_extra_model_from_ist(ist) - extra = step_extra.model_dump(exclude_none=True) - atif_steps.append( - ATIFStep( - step_id=step_id, - source="agent", - message=final_output, - timestamp=_epoch_to_iso(ist.event_timestamp), - extra=extra or None, - )) - step_id += 1 - continue - if event_type == IntermediateStepType.LLM_END: - _flush_pending() - llm_output = "" - if ist.data and ist.data.output is not None: - llm_output = _safe_str(ist.data.output) - metrics = _extract_metrics(ist) - if metrics: - total_prompt += metrics.prompt_tokens or 0 - total_completion += metrics.completion_tokens or 0 - total_cached += metrics.cached_tokens or 0 - if not tool_defs_captured: - defs = _extract_tool_definitions(ist) - if defs: - agent_config.tool_definitions = defs - tool_defs_captured = True + for ist in sorted_steps: + if ist.event_type == IntermediateStepType.LLM_END: if ist.name and not agent_config.model_name: agent_config.model_name = ist.name - pending = _PendingAgentTurn( - message=llm_output, - timestamp=ist.event_timestamp, - model_name=ist.name, - metrics=metrics, - ) - pending.ancestry = _atif_ancestry_from_ist(ist) - pending.invocation = _atif_invocation_from_ist(ist) - continue - - if event_type == IntermediateStepType.TOOL_END: - if pending is not None: - _record_observed_invocation(pending, ist) - else: - orphan_pending = _PendingAgentTurn(message="", - timestamp=ist.event_timestamp, - model_name=None, - metrics=None) - orphan_pending.ancestry = _atif_ancestry_from_ist(ist) - _record_observed_invocation(orphan_pending, ist) - if not orphan_pending.observed_invocations: - continue - invocation = orphan_pending.observed_invocations[0] - step_extra = _atif_step_extra_model_from_ist(ist) - step_extra.tool_invocations = [invocation.invocation] - step_extra.tool_ancestry = [invocation.ancestry] - extra = step_extra.model_dump(exclude_none=True) - atif_steps.append( - ATIFStep( - step_id=step_id, - source="agent", - message="", - timestamp=_epoch_to_iso(ist.event_timestamp), - tool_calls=[invocation.tool_call], - observation=ATIFObservation(results=[invocation.observation]), - extra=extra or None, - )) - step_id += 1 - continue - - if event_type == IntermediateStepType.FUNCTION_END: - if pending is not None: - _record_observed_invocation(pending, ist) - else: - orphan_pending = _PendingAgentTurn(message="", - timestamp=ist.event_timestamp, - model_name=None, - metrics=None) - orphan_pending.ancestry = _atif_ancestry_from_ist(ist) - _record_observed_invocation(orphan_pending, ist) - if not orphan_pending.observed_invocations: - continue - invocation = orphan_pending.observed_invocations[0] - step_extra = _atif_step_extra_model_from_ist(ist) - step_extra.tool_invocations = [invocation.invocation] - step_extra.tool_ancestry = [invocation.ancestry] - extra = step_extra.model_dump(exclude_none=True) - atif_steps.append( - ATIFStep( - step_id=step_id, - source="agent", - message="", - timestamp=_epoch_to_iso(ist.event_timestamp), - tool_calls=[invocation.tool_call], - observation=ATIFObservation(results=[invocation.observation]), - extra=extra or None, - )) - step_id += 1 - continue - - if state == IntermediateStepState.START: - continue - if event_type == IntermediateStepType.LLM_NEW_TOKEN: - continue - if event_type == IntermediateStepType.SPAN_CHUNK: - continue - - if state == IntermediateStepState.END and category not in ( - IntermediateStepCategory.LLM, - IntermediateStepCategory.TOOL, - IntermediateStepCategory.WORKFLOW, - ): - continue + defs = _extract_tool_definitions(ist) + if defs and not agent_config.tool_definitions: + agent_config.tool_definitions = defs + + execution_structure = _pass1_build_execution_structure(sorted_steps, session_id=trajectory_session_id) + atif_steps, _, total_prompt, total_completion, total_cached = _pass2_project_context_to_steps( + execution_structure.root_events, + step_id_start=1, + include_workflow_start_user_step=True, + subagent_ref_by_call_id=execution_structure.subagent_ref_by_call_id, + ) - _flush_pending() + child_trajectories: dict[str, dict[str, Any]] = {} + for child_session_id, child_events in execution_structure.child_events_by_session.items(): + child_steps, _, _, _, _ = _pass2_project_context_to_steps( + child_events, + step_id_start=1, + include_workflow_start_user_step=False, + subagent_ref_by_call_id=None, + ) + child_agent = agent_config.model_copy(deep=True) + child_agent.name = execution_structure.child_agent_name_by_session.get(child_session_id, child_agent.name) + child_trajectory = ATIFTrajectory( + session_id=child_session_id, + agent=child_agent, + steps=child_steps, + ) + child_trajectories[child_session_id] = child_trajectory.model_dump(exclude_none=True, mode="json") final_metrics = None agent_step_count = sum(1 for s in atif_steps if s.source == "agent") @@ -477,11 +650,13 @@ def _flush_pending() -> None: total_steps=agent_step_count, ) + trajectory_extra = {"subagent_trajectories": child_trajectories} if child_trajectories else None return ATIFTrajectory( - session_id=session_id or str(uuid.uuid4()), + session_id=trajectory_session_id, agent=agent_config, steps=atif_steps, final_metrics=final_metrics, + extra=trajectory_extra, ) @@ -491,231 +666,53 @@ def _flush_pending() -> None: class ATIFStreamConverter: - """Stateful converter that emits ATIF steps incrementally.""" - - def __init__(self, agent_name: str = "nat-agent"): - self._step_id: int = 1 - self._agent_config = ATIFAgentConfig(name=agent_name, version="0.0.0") - self._tool_defs_captured = False - self._pending: _PendingAgentTurn | None = None - self._emitted_steps: list[ATIFStep] = [] - self._total_prompt = 0 - self._total_completion = 0 - self._total_cached = 0 + """Stateful converter that reuses the same two-pass conversion model. + + This stream adapter accumulates IST events and re-projects ATIF using the + same batch converter logic on each push. It keeps behavior consistent + between batch and stream conversion paths. + """ + + def __init__(self, agent_name: str = "nat-agent", *, allow_implicit_subagent_delegation: bool = False): + self._session_id: str = str(uuid.uuid4()) + self._agent_name = agent_name + self._buffered_events: list[IntermediateStep] = [] + self._last_projected_steps: list[ATIFStep] = [] + self._returned_step_count = 0 + self._latest_trajectory = ATIFTrajectory( + session_id=self._session_id, + agent=ATIFAgentConfig(name=agent_name, version="0.0.0"), + steps=[], + ) + self._batch_converter = IntermediateStepToATIFConverter( + allow_implicit_subagent_delegation=allow_implicit_subagent_delegation) @property def agent_config(self) -> ATIFAgentConfig: - """Current agent configuration (populated as steps arrive).""" - return self._agent_config + """Current agent configuration based on latest projection.""" + return self._latest_trajectory.agent def push(self, ist: IntermediateStep) -> ATIFStep | None: - """Process one IntermediateStep and return a flushed ATIF step if available.""" - event_type = ist.event_type - category = ist.event_category - state = ist.event_state - - if event_type == IntermediateStepType.WORKFLOW_START: - user_input = "" - if ist.data and ist.data.input is not None: - user_input = _extract_user_input(ist.data.input) - fn_name = ist.function_ancestry.function_name - if fn_name and fn_name != "root": - self._agent_config.name = fn_name - step_extra = _atif_step_extra_model_from_ist(ist) - extra = step_extra.model_dump(exclude_none=True) - step = ATIFStep( - step_id=self._step_id, - source="user", - message=user_input, - timestamp=_epoch_to_iso(ist.event_timestamp), - extra=extra or None, - ) - self._step_id += 1 - self._emitted_steps.append(step) + """Add one IST event and return next newly visible ATIF step.""" + self._buffered_events.append(ist) + self._latest_trajectory = self._batch_converter.convert( + steps=self._buffered_events, + session_id=self._session_id, + agent_name=self._agent_name, + ) + self._last_projected_steps = list(self._latest_trajectory.steps) + if self._returned_step_count < len(self._last_projected_steps): + step = self._last_projected_steps[self._returned_step_count] + self._returned_step_count += 1 return step - - if event_type == IntermediateStepType.WORKFLOW_END: - results: list[ATIFStep] = [] - flushed = self._flush_pending() - if flushed: - results.append(flushed) - final_output = "" - if ist.data and ist.data.output is not None: - final_output = _safe_str(ist.data.output) - last_agent_msg = "" - last_agent_ts: float | None = None - for s in reversed(self._emitted_steps): - if s.source == "agent": - last_agent_msg = str(s.message) - last_agent_ts = _iso_to_epoch(s.timestamp) if s.timestamp else None - break - should_emit_terminal_step = bool(final_output) and (final_output != last_agent_msg or - (last_agent_ts is not None - and ist.event_timestamp > last_agent_ts)) - if should_emit_terminal_step: - step_extra = _atif_step_extra_model_from_ist(ist) - extra = step_extra.model_dump(exclude_none=True) - final_step = ATIFStep( - step_id=self._step_id, - source="agent", - message=final_output, - timestamp=_epoch_to_iso(ist.event_timestamp), - extra=extra or None, - ) - self._step_id += 1 - self._emitted_steps.append(final_step) - results.append(final_step) - return results[0] if results else None - - if event_type == IntermediateStepType.LLM_END: - flushed = self._flush_pending() - llm_output = "" - if ist.data and ist.data.output is not None: - llm_output = _safe_str(ist.data.output) - metrics = _extract_metrics(ist) - if metrics: - self._total_prompt += metrics.prompt_tokens or 0 - self._total_completion += metrics.completion_tokens or 0 - self._total_cached += metrics.cached_tokens or 0 - if not self._tool_defs_captured: - defs = _extract_tool_definitions(ist) - if defs: - self._agent_config.tool_definitions = defs - self._tool_defs_captured = True - if ist.name and not self._agent_config.model_name: - self._agent_config.model_name = ist.name - self._pending = _PendingAgentTurn( - message=llm_output, - timestamp=ist.event_timestamp, - model_name=ist.name, - metrics=metrics, - ) - self._pending.ancestry = _atif_ancestry_from_ist(ist) - self._pending.invocation = _atif_invocation_from_ist(ist) - return flushed - - if event_type == IntermediateStepType.TOOL_END: - if self._pending is not None: - _record_observed_invocation(self._pending, ist) - return None - - orphan_pending = _PendingAgentTurn(message="", timestamp=ist.event_timestamp, model_name=None, metrics=None) - orphan_pending.ancestry = _atif_ancestry_from_ist(ist) - _record_observed_invocation(orphan_pending, ist) - if not orphan_pending.observed_invocations: - return None - invocation = orphan_pending.observed_invocations[0] - step_extra = _atif_step_extra_model_from_ist(ist) - step_extra.tool_invocations = [invocation.invocation] - step_extra.tool_ancestry = [invocation.ancestry] - extra = step_extra.model_dump(exclude_none=True) - orphan_step = ATIFStep( - step_id=self._step_id, - source="agent", - message="", - timestamp=_epoch_to_iso(ist.event_timestamp), - tool_calls=[invocation.tool_call], - observation=ATIFObservation(results=[invocation.observation]), - extra=extra or None, - ) - self._step_id += 1 - self._emitted_steps.append(orphan_step) - return orphan_step - - if event_type == IntermediateStepType.FUNCTION_END: - if self._pending is not None: - _record_observed_invocation(self._pending, ist) - return None - - orphan_pending = _PendingAgentTurn(message="", timestamp=ist.event_timestamp, model_name=None, metrics=None) - orphan_pending.ancestry = _atif_ancestry_from_ist(ist) - _record_observed_invocation(orphan_pending, ist) - if not orphan_pending.observed_invocations: - return None - invocation = orphan_pending.observed_invocations[0] - step_extra = _atif_step_extra_model_from_ist(ist) - step_extra.tool_invocations = [invocation.invocation] - step_extra.tool_ancestry = [invocation.ancestry] - extra = step_extra.model_dump(exclude_none=True) - orphan_step = ATIFStep( - step_id=self._step_id, - source="agent", - message="", - timestamp=_epoch_to_iso(ist.event_timestamp), - tool_calls=[invocation.tool_call], - observation=ATIFObservation(results=[invocation.observation]), - extra=extra or None, - ) - self._step_id += 1 - self._emitted_steps.append(orphan_step) - return orphan_step - - if state == IntermediateStepState.END and category not in ( - IntermediateStepCategory.LLM, - IntermediateStepCategory.TOOL, - IntermediateStepCategory.WORKFLOW, - ): - return None - return None def finalize(self) -> list[ATIFStep]: - """Flush any pending agent turn and return remaining steps.""" - result: list[ATIFStep] = [] - flushed = self._flush_pending() - if flushed: - result.append(flushed) - return result + """Return all projected ATIF steps not yet returned by `push`.""" + remaining = self._last_projected_steps[self._returned_step_count:] + self._returned_step_count = len(self._last_projected_steps) + return remaining def get_trajectory(self) -> ATIFTrajectory: - """Build the complete ATIF trajectory from all emitted steps.""" - agent_step_count = sum(1 for s in self._emitted_steps if s.source == "agent") - final_metrics = None - if self._total_prompt or self._total_completion or self._total_cached or agent_step_count: - final_metrics = ATIFFinalMetrics( - total_prompt_tokens=self._total_prompt or None, - total_completion_tokens=self._total_completion or None, - total_cached_tokens=self._total_cached or None, - total_steps=agent_step_count, - ) - return ATIFTrajectory( - agent=self._agent_config, - steps=list(self._emitted_steps), - final_metrics=final_metrics, - ) - - def _flush_pending(self) -> ATIFStep | None: - """Convert the pending turn into an ATIFStep and clear it.""" - if self._pending is None: - return None - pending = self._pending - sorted_invocations = sorted(pending.observed_invocations, key=lambda i: i.order_key) - tool_calls = [obs.tool_call for obs in sorted_invocations] or None - observations = [obs.observation for obs in sorted_invocations] - observation = ATIFObservation(results=observations) if observations else None - tool_ancestry = [obs.ancestry for obs in sorted_invocations] - tool_invocations = [obs.invocation for obs in sorted_invocations] or None - if pending.ancestry is None: - raise ValueError("Pending agent turn is missing required ATIF ancestry metadata") - step_extra = AtifStepExtra( - ancestry=pending.ancestry, - invocation=pending.invocation, - tool_ancestry=tool_ancestry, - tool_invocations=tool_invocations, - **pending.extra, - ) - step = ATIFStep( - step_id=self._step_id, - source="agent", - message=pending.message, - timestamp=_epoch_to_iso(pending.timestamp), - model_name=pending.model_name, - tool_calls=tool_calls, - observation=observation, - metrics=pending.metrics, - extra=step_extra.model_dump(exclude_none=True), - ) - self._step_id += 1 - self._emitted_steps.append(step) - self._pending = None - return step + """Return trajectory projected from all buffered IST events.""" + return self._latest_trajectory diff --git a/packages/nvidia_nat_core/tests/nat/utils/test_atif_converter.py b/packages/nvidia_nat_core/tests/nat/utils/test_atif_converter.py index a3a601b84b..9d3ad434a4 100644 --- a/packages/nvidia_nat_core/tests/nat/utils/test_atif_converter.py +++ b/packages/nvidia_nat_core/tests/nat/utils/test_atif_converter.py @@ -24,6 +24,7 @@ from nat.data_models.intermediate_step import IntermediateStepPayload from nat.data_models.intermediate_step import IntermediateStepType from nat.data_models.intermediate_step import StreamEventData +from nat.data_models.intermediate_step import TraceMetadata from nat.data_models.intermediate_step import UsageInfo from nat.data_models.invocation_node import InvocationNode from nat.data_models.token_usage import TokenUsageBaseModel @@ -943,6 +944,101 @@ def test_converter_ignores_non_exported_events(self, batch_converter: Intermedia assert result.steps[1].message == "final" assert result.steps[1].tool_calls is None + def test_implicit_subagent_delegation_is_disabled_by_default(self): + """Implicit delegation inference is opt-in and disabled by default.""" + converter = IntermediateStepToATIFConverter() + steps = [ + _make_step(IntermediateStepType.WORKFLOW_START, input_data="q", timestamp_offset=0.0), + _make_step(IntermediateStepType.LLM_END, output_data="thinking", timestamp_offset=1.0), + _make_step( + IntermediateStepType.FUNCTION_END, + name="child_agent", + timestamp_offset=2.0, + function_name="parent_agent", + function_id="wrapper-fn", + function_parent_id="root", + step_uuid="wrapper-step", + ), + _make_step( + IntermediateStepType.FUNCTION_END, + name="inner_tool", + timestamp_offset=3.0, + function_name="child_agent", + function_id="child-fn", + function_parent_id="wrapper-fn", + step_uuid="child-step", + ), + _make_step(IntermediateStepType.LLM_END, output_data="done", timestamp_offset=4.0), + _make_step(IntermediateStepType.WORKFLOW_END, output_data="done", timestamp_offset=5.0), + ] + + result = converter.convert(steps) + first_agent_turn = result.steps[1] + assert first_agent_turn.observation is not None + assert first_agent_turn.observation.results[0].subagent_trajectory_ref is None + + def test_implicit_subagent_delegation_can_be_enabled(self): + """Explicit delegation marker emits subagent refs and embedded child trajectory.""" + converter = IntermediateStepToATIFConverter(allow_implicit_subagent_delegation=True) + wrapper_uuid = "wrapper-step" + wrapper_start = _make_step( + IntermediateStepType.FUNCTION_START, + name="child_agent", + timestamp_offset=1.5, + function_name="parent_agent", + function_id="wrapper-fn", + function_parent_id="root", + step_uuid=wrapper_uuid, + ) + wrapper_start.payload.metadata = TraceMetadata(provided_metadata={"is_subagent_delegation": True}) + steps = [ + _make_step(IntermediateStepType.WORKFLOW_START, input_data="q", timestamp_offset=0.0), + _make_step(IntermediateStepType.LLM_END, output_data="thinking", timestamp_offset=1.0), + wrapper_start, + _make_step( + IntermediateStepType.FUNCTION_END, + name="child_agent", + timestamp_offset=2.0, + function_name="parent_agent", + function_id="wrapper-fn", + function_parent_id="root", + step_uuid=wrapper_uuid, + ), + _make_step( + IntermediateStepType.LLM_END, + name="child-model", + output_data="child thinking", + timestamp_offset=2.1, + function_name="child_agent", + function_id="child-root-fn", + function_parent_id="wrapper-fn", + ), + _make_step( + IntermediateStepType.FUNCTION_END, + name="inner_tool", + timestamp_offset=2.2, + function_name="child_agent", + function_id="child-fn", + function_parent_id="child-root-fn", + step_uuid="child-step", + ), + _make_step(IntermediateStepType.LLM_END, output_data="done", timestamp_offset=4.0), + _make_step(IntermediateStepType.WORKFLOW_END, output_data="done", timestamp_offset=5.0), + ] + + result = converter.convert(steps) + first_agent_turn = result.steps[1] + assert first_agent_turn.observation is not None + refs = first_agent_turn.observation.results[0].subagent_trajectory_ref + assert refs is not None + assert len(refs) == 1 + child_sid = refs[0].session_id + assert result.extra is not None + assert "subagent_trajectories" in result.extra + embedded = result.extra["subagent_trajectories"] + assert isinstance(embedded, dict) + assert child_sid in embedded + # --------------------------------------------------------------------------- # Stream converter tests @@ -953,177 +1049,38 @@ class TestStreamConverter: """Tests for ATIFStreamConverter.""" def test_workflow_start_emits_user_step(self): - """WORKFLOW_START produces an immediate user step.""" - converter = ATIFStreamConverter() - step = _make_step( - IntermediateStepType.WORKFLOW_START, - input_data="hello", - timestamp_offset=0.0, - ) - result = converter.push(step) - assert result is not None - assert result.source == "user" - assert result.message == "hello" - - def test_llm_end_flushes_previous_turn(self): - """Second LLM_END flushes the first turn.""" + """`WORKFLOW_START` emits the first projected user step.""" converter = ATIFStreamConverter() - converter.push(_make_step( - IntermediateStepType.WORKFLOW_START, - input_data="q", - timestamp_offset=0.0, - )) - # First LLM_END → creates pending, nothing to flush yet - result1 = converter.push( - _make_step( - IntermediateStepType.LLM_END, - name="gpt-4", - output_data="thinking...", - timestamp_offset=1.0, - )) - assert result1 is None # Nothing flushed yet - - # Second LLM_END → flushes the first turn - result2 = converter.push( - _make_step( - IntermediateStepType.LLM_END, - name="gpt-4", - output_data="done", - timestamp_offset=2.0, - )) - assert result2 is not None - assert result2.source == "agent" - assert result2.message == "thinking..." - - def test_tool_end_attaches_to_pending(self): - """TOOL_END attaches to the current pending agent turn.""" - converter = ATIFStreamConverter() - converter.push(_make_step( - IntermediateStepType.WORKFLOW_START, - input_data="q", - timestamp_offset=0.0, - )) - converter.push(_make_step( - IntermediateStepType.LLM_END, - output_data="let me search", - timestamp_offset=1.0, - )) result = converter.push( _make_step( - IntermediateStepType.TOOL_END, - name="search", - input_data={"query": "test"}, - output_data="found it", - timestamp_offset=2.0, - step_uuid="tool-1", - )) - # Tool attaches to pending, doesn't emit yet - assert result is None - - # Finalize flushes - remaining = converter.finalize() - assert len(remaining) == 1 - flushed = remaining[0] - assert flushed.tool_calls is not None - assert len(flushed.tool_calls) == 1 - assert flushed.tool_calls[0].function_name == "search" - assert flushed.observation.results[0].content == "found it" - - def test_stream_converter_emits_orphan_tool_end(self): - """Orphan `TOOL_END` emits an immediate standalone agent step.""" - converter = ATIFStreamConverter() - converter.push(_make_step( - IntermediateStepType.WORKFLOW_START, - input_data="q", - timestamp_offset=0.0, - )) - orphan = converter.push( - _make_step( - IntermediateStepType.TOOL_END, - name="search", - input_data='{"query": "orphan"}', - output_data="found orphan", - timestamp_offset=1.0, - step_uuid="stream-orphan-tool-1", + IntermediateStepType.WORKFLOW_START, + input_data="hello", + timestamp_offset=0.0, )) - assert orphan is not None - assert orphan.source == "agent" - assert orphan.message == "" - assert orphan.tool_calls is not None - assert len(orphan.tool_calls) == 1 - assert orphan.observation is not None - assert len(orphan.observation.results) == 1 - assert orphan.observation.results[0].source_call_id == orphan.tool_calls[0].tool_call_id - - def test_stream_converter_populates_tool_definitions_from_llm_metadata(self): - """`LLM_END` metadata tool schemas populate stream converter agent config.""" - from nat.data_models.intermediate_step import ToolDetails - from nat.data_models.intermediate_step import ToolParameters - from nat.data_models.intermediate_step import ToolSchema - from nat.data_models.intermediate_step import TraceMetadata + assert result is not None + assert result.source == "user" + assert result.message == "hello" + def test_finalize_returns_unreturned_steps(self): + """`finalize` returns projected steps not returned by `push`.""" converter = ATIFStreamConverter() - converter.push(_make_step( - IntermediateStepType.WORKFLOW_START, - input_data="q", - timestamp_offset=0.0, - )) - llm_end = _make_step( - IntermediateStepType.LLM_END, - name="gpt-4", - output_data="using tools", - timestamp_offset=1.0, - ) - llm_end.payload.metadata = TraceMetadata(tools_schema=[ - ToolSchema( - type="function", - function=ToolDetails( - name="weather", - description="Get weather", - parameters=ToolParameters(properties={}), - ), - ) - ]) - - pushed = converter.push(llm_end) - assert pushed is None - assert converter.agent_config.tool_definitions is not None - assert len(converter.agent_config.tool_definitions) == 1 - assert converter.agent_config.tool_definitions[0]["function"]["name"] == "weather" - - def test_finalize_flushes_pending(self): - """finalize() returns any remaining pending turn.""" - converter = ATIFStreamConverter() - converter.push(_make_step( - IntermediateStepType.WORKFLOW_START, - input_data="q", - timestamp_offset=0.0, - )) - converter.push(_make_step( - IntermediateStepType.LLM_END, - output_data="answer", - timestamp_offset=1.0, - )) + converter.push(_make_step(IntermediateStepType.WORKFLOW_START, input_data="q", timestamp_offset=0.0)) + converter.push(_make_step(IntermediateStepType.LLM_END, output_data="thinking", timestamp_offset=1.0)) remaining = converter.finalize() assert len(remaining) == 1 - assert remaining[0].message == "answer" - - def test_finalize_empty_when_nothing_pending(self): - """finalize() returns empty list if no pending turn.""" - converter = ATIFStreamConverter() - assert converter.finalize() == [] + assert remaining[0].source == "agent" + assert remaining[0].message == "thinking" def test_get_trajectory_builds_complete( self, simple_trajectory: list[IntermediateStep], ): - """get_trajectory() returns a complete trajectory after all steps.""" + """`get_trajectory` returns the latest projected trajectory.""" converter = ATIFStreamConverter() for ist in simple_trajectory: converter.push(ist) converter.finalize() trajectory = converter.get_trajectory() - assert isinstance(trajectory, ATIFTrajectory) assert trajectory.schema_version == "ATIF-v1.6" assert len(trajectory.steps) >= 2 @@ -1134,18 +1091,14 @@ def test_stream_matches_batch( simple_trajectory: list[IntermediateStep], batch_converter: IntermediateStepToATIFConverter, ): - """Stream converter produces the same steps as batch converter.""" + """Stream converter uses the same two-pass projection as batch.""" batch_result = batch_converter.convert(simple_trajectory, session_id="test") - stream_conv = ATIFStreamConverter() for ist in simple_trajectory: stream_conv.push(ist) stream_conv.finalize() stream_result = stream_conv.get_trajectory() - assert len(stream_result.steps) == len(batch_result.steps) for s_step, b_step in zip(stream_result.steps, batch_result.steps, strict=True): assert s_step.source == b_step.source assert s_step.message == b_step.message - if b_step.tool_calls: - assert len(s_step.tool_calls) == len(b_step.tool_calls) diff --git a/packages/nvidia_nat_eval/scripts/README.md b/packages/nvidia_nat_eval/scripts/README.md new file mode 100644 index 0000000000..d94df3959b --- /dev/null +++ b/packages/nvidia_nat_eval/scripts/README.md @@ -0,0 +1,43 @@ + + +# Scripts for ATIF Evaluation Utilities + +## Convert `workflow_output.json` to `workflow_output_atif.json` + +Use `convert_workflow_output_to_atif.py` to convert legacy IST workflow dumps into ATIF sample output format. + +### Script + +- `packages/nvidia_nat_eval/scripts/convert_workflow_output_to_atif.py` + +### Example + +```bash +python packages/nvidia_nat_eval/scripts/convert_workflow_output_to_atif.py \ + --input ".tmp/nat/examples/advanced_agents/alert_triage_agent/output/offline_atif/workflow_output.json" \ + --output-dir ".tmp/nat/examples/advanced_agents/alert_triage_agent/output/offline_atif" +``` + +This writes: + +- `.tmp/nat/examples/advanced_agents/alert_triage_agent/output/offline_atif/workflow_output_atif.json` + +### Required input setting + +To make conversion complete, the IST dump must include all events. + +- Set `workflow_output_step_filter: []` in your eval config (empty list means no filtering). +- If event filtering is enabled, required IST events can be missing and conversion output can be incomplete or incorrect. + +For example, in: + +- `examples/advanced_agents/alert_triage_agent/configs/config_offline_atif.yml` + +keep: + +```yaml +workflow_output_step_filter: [] +``` diff --git a/packages/nvidia_nat_eval/scripts/convert_workflow_output_to_atif.py b/packages/nvidia_nat_eval/scripts/convert_workflow_output_to_atif.py new file mode 100644 index 0000000000..d0ed6c24a6 --- /dev/null +++ b/packages/nvidia_nat_eval/scripts/convert_workflow_output_to_atif.py @@ -0,0 +1,120 @@ +#!/usr/bin/env python3 +# SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Convert legacy `workflow_output.json` (IST) into `workflow_output_atif.json`. + +Example: + python packages/nvidia_nat_eval/scripts/convert_workflow_output_to_atif.py \ + --input ".tmp/nat/examples/advanced_agents/alert_triage_agent/output/offline_atif/workflow_output.json" \ + --output-dir ".tmp/nat/examples/advanced_agents/alert_triage_agent/output/offline_atif" +""" + +from __future__ import annotations + +import argparse +import json +from pathlib import Path +from typing import Any + +from nat.data_models.intermediate_step import IntermediateStep +from nat.utils.atif_converter import IntermediateStepToATIFConverter + + +def _load_json(path: Path) -> Any: + return json.loads(path.read_text(encoding="utf-8")) + + +def _iter_items(payload: Any) -> list[dict[str, Any]]: + """Normalize legacy workflow output payload into item dictionaries.""" + if isinstance(payload, list): + return [item for item in payload if isinstance(item, dict) and isinstance(item.get("intermediate_steps"), list)] + if isinstance(payload, dict) and isinstance(payload.get("intermediate_steps"), list): + return [payload] + raise ValueError("Unsupported input JSON shape. Expected item(s) with `intermediate_steps`.") + + +def _session_id_for_item(item_id: Any) -> str: + return str(item_id) + + +def _convert_item(item: dict[str, Any], + *, + converter: IntermediateStepToATIFConverter, + item_index: int, + agent_name: str | None) -> dict[str, Any]: + """Convert one legacy item with `intermediate_steps` to ATIF sample shape.""" + item_id = item.get("id", item_index) + steps = [IntermediateStep.model_validate(raw_step) for raw_step in item["intermediate_steps"]] + trajectory = converter.convert( + steps=steps, + session_id=_session_id_for_item(item_id), + agent_name=agent_name, + ) + + subagent_trajectories: dict[str, Any] = {} + if isinstance(trajectory.extra, dict): + embedded = trajectory.extra.get("subagent_trajectories") + if isinstance(embedded, dict): + subagent_trajectories = embedded + + return { + "item_id": item_id, + "trajectory": trajectory.model_dump(mode="json"), + "subagent_trajectories": subagent_trajectories, + "expected_output_obj": item.get("answer"), + "output_obj": item.get("generated_answer"), + "metadata": {}, + } + + +def _resolve_output_path(input_path: Path, output_path: Path | None, output_dir: Path | None) -> Path: + if output_path is not None: + return output_path + if output_dir is not None: + return output_dir / "workflow_output_atif.json" + return input_path.with_name("workflow_output_atif.json") + + +def main() -> None: + """Parse arguments and run IST to ATIF conversion.""" + parser = argparse.ArgumentParser( + description="Convert legacy workflow_output.json (IntermediateStep dump) to workflow_output_atif.json") + parser.add_argument("--input", type=Path, required=True, help="Path to workflow_output.json") + parser.add_argument("--output", type=Path, default=None, help="Output file path for workflow_output_atif.json") + parser.add_argument("--output-dir", type=Path, default=None, help="Directory to write workflow_output_atif.json") + parser.add_argument("--agent-name", default=None, help="Optional agent name override") + parser.add_argument("--compact", action="store_true", help="Write compact JSON (no pretty indentation)") + args = parser.parse_args() + + if args.output is not None and args.output_dir is not None: + raise ValueError("Provide only one of `--output` or `--output-dir`.") + + payload = _load_json(args.input) + items = _iter_items(payload) + converter = IntermediateStepToATIFConverter() + converted = [ + _convert_item(item, converter=converter, item_index=i, agent_name=args.agent_name) + for i, item in enumerate(items) + ] + + output_path = _resolve_output_path(args.input, args.output, args.output_dir) + output_path.parent.mkdir(parents=True, exist_ok=True) + output_text = json.dumps(converted, ensure_ascii=True, indent=None if args.compact else 2) + output_path.write_text(output_text + "\n", encoding="utf-8") + print(f"Wrote {len(converted)} ATIF sample(s) to {output_path}") + + +if __name__ == "__main__": + main() diff --git a/packages/nvidia_nat_eval/src/nat/plugins/eval/evaluator/atif_evaluator.py b/packages/nvidia_nat_eval/src/nat/plugins/eval/evaluator/atif_evaluator.py index c5b5720efa..339ab304cf 100644 --- a/packages/nvidia_nat_eval/src/nat/plugins/eval/evaluator/atif_evaluator.py +++ b/packages/nvidia_nat_eval/src/nat/plugins/eval/evaluator/atif_evaluator.py @@ -33,6 +33,11 @@ class AtifEvalSample(BaseModel): item_id: Any = Field(description="Identifier matching the source EvalInputItem.") trajectory: ATIFTrajectory = Field(description="Canonical ATIF trajectory.") + subagent_trajectories: dict[str, ATIFTrajectory] = Field( + default_factory=dict, + description=("In-memory child trajectories keyed by " + "`subagent_trajectory_ref.session_id`."), + ) expected_output_obj: Any = Field(default=None, description="Optional expected output reference.") output_obj: Any = Field(default=None, description="Optional workflow output reference.") metadata: dict[str, Any] = Field(default_factory=dict, description="Optional evaluator metadata.") diff --git a/packages/nvidia_nat_eval/src/nat/plugins/eval/runtime/atif_adapter.py b/packages/nvidia_nat_eval/src/nat/plugins/eval/runtime/atif_adapter.py index ed599fa20f..25e32919f9 100644 --- a/packages/nvidia_nat_eval/src/nat/plugins/eval/runtime/atif_adapter.py +++ b/packages/nvidia_nat_eval/src/nat/plugins/eval/runtime/atif_adapter.py @@ -21,6 +21,7 @@ from __future__ import annotations +import logging from collections.abc import Mapping from typing import Any @@ -31,12 +32,18 @@ from nat.plugins.eval.evaluator.atif_evaluator import AtifEvalSampleList from nat.utils.atif_converter import IntermediateStepToATIFConverter +logger = logging.getLogger(__name__) + class EvalAtifAdapter: """Build and cache ATIF trajectories for eval items.""" - def __init__(self, converter: IntermediateStepToATIFConverter | None = None) -> None: - self._converter = converter or IntermediateStepToATIFConverter() + def __init__(self, + converter: IntermediateStepToATIFConverter | None = None, + *, + allow_implicit_subagent_delegation: bool = False) -> None: + self._converter = converter or IntermediateStepToATIFConverter( + allow_implicit_subagent_delegation=allow_implicit_subagent_delegation) self._cache: dict[str, ATIFTrajectory] = {} @staticmethod @@ -44,6 +51,11 @@ def _cache_key(item_id: Any) -> str: item_type = type(item_id) return f"{item_type.__module__}.{item_type.__qualname__}:{item_id!r}" + @staticmethod + def _trajectory_session_id(item_id: Any) -> str: + """Return the canonical ATIF trajectory session identifier.""" + return str(item_id) + def _coerce_trajectory(self, value: Any) -> ATIFTrajectory: if isinstance(value, ATIFTrajectory): return value @@ -51,6 +63,130 @@ def _coerce_trajectory(self, value: Any) -> ATIFTrajectory: return ATIFTrajectory.model_validate(value) raise TypeError(f"Unsupported ATIF trajectory payload type: {type(value)}") + @staticmethod + def _step_ancestry_ids(step) -> tuple[str | None, str | None]: + """Return (function_id, parent_id) from a step's extra ancestry payload.""" + extra = step.extra or {} + if not isinstance(extra, dict): + return None, None + ancestry = extra.get("ancestry") + if not isinstance(ancestry, dict): + return None, None + function_id = ancestry.get("function_id") + parent_id = ancestry.get("parent_id") + if function_id is not None: + function_id = str(function_id) + if parent_id is not None: + parent_id = str(parent_id) + return function_id, parent_id + + @staticmethod + def _collect_subtree_function_ids(trajectory: ATIFTrajectory, root_function_id: str) -> set[str]: + """Collect function_ids in the ancestry subtree rooted at root_function_id.""" + parent_to_children: dict[str, set[str]] = {} + for step in trajectory.steps: + function_id, parent_id = EvalAtifAdapter._step_ancestry_ids(step) + if not function_id or not parent_id: + continue + parent_to_children.setdefault(parent_id, set()).add(function_id) + + subtree_ids: set[str] = set() + frontier = [root_function_id] + while frontier: + node = frontier.pop() + if node in subtree_ids: + continue + subtree_ids.add(node) + frontier.extend(parent_to_children.get(node, ())) + return subtree_ids + + @staticmethod + def _build_subtrajectory(trajectory: ATIFTrajectory, *, session_id: str, + root_function_id: str) -> ATIFTrajectory | None: + """Build a child trajectory view for a delegated function subtree.""" + subtree_ids = EvalAtifAdapter._collect_subtree_function_ids(trajectory, root_function_id) + if not subtree_ids: + return None + + selected_steps = [] + for step in trajectory.steps: + function_id, _ = EvalAtifAdapter._step_ancestry_ids(step) + if function_id in subtree_ids: + selected_steps.append(step.model_copy(deep=True)) + if not selected_steps: + return None + + reindexed_steps = [step.model_copy(update={"step_id": idx}) for idx, step in enumerate(selected_steps, start=1)] + return ATIFTrajectory( + schema_version=trajectory.schema_version, + session_id=session_id, + agent=trajectory.agent.model_copy(deep=True), + steps=reindexed_steps, + notes=trajectory.notes, + extra={ + "parent_session_id": trajectory.session_id, + "root_function_id": root_function_id, + }, + ) + + def _build_subagent_trajectory_map(self, trajectory: ATIFTrajectory, *, item_id: Any) -> dict[str, ATIFTrajectory]: + """Build in-memory session_id -> trajectory map for subagent references.""" + extra = trajectory.extra if isinstance(trajectory.extra, dict) else None + embedded = extra.get("subagent_trajectories") if extra else None + if isinstance(embedded, Mapping): + parsed: dict[str, ATIFTrajectory] = {} + for sid, value in embedded.items(): + try: + parsed[str(sid)] = value if isinstance(value, + ATIFTrajectory) else ATIFTrajectory.model_validate(value) + except Exception: + logger.warning("Invalid embedded subagent trajectory for item_id=%s session_id=%s", item_id, sid) + if parsed: + return parsed + + by_session_id: dict[str, ATIFTrajectory] = {} + unresolved: list[str] = [] + + for step in trajectory.steps: + if step.observation is None: + continue + for result in step.observation.results: + refs = result.subagent_trajectory_ref or [] + for ref in refs: + session_id = ref.session_id + if not session_id: + continue + if session_id in by_session_id: + continue + root_function_id = None + if isinstance(ref.extra, dict): + child_function_id = ref.extra.get("child_function_id") + if child_function_id is not None: + root_function_id = str(child_function_id) + child_traj = (self._build_subtrajectory( + trajectory, + session_id=session_id, + root_function_id=root_function_id, + ) if root_function_id else None) + if child_traj is None: + unresolved.append(session_id) + # Degraded fallback keeps refs resolvable in-memory. + child_traj = trajectory.model_copy( + update={ + "session_id": session_id, + "extra": { + "parent_session_id": trajectory.session_id, + "resolution_mode": "fallback_full_trajectory", + }, + }, + deep=True, + ) + by_session_id[session_id] = child_traj + + if unresolved: + logger.warning("ATIF subagent map fallback for item_id=%s; unresolved refs=%s", item_id, unresolved) + return by_session_id + def get_trajectory(self, item: EvalInputItem, prebuilt: ATIFTrajectory | Mapping[str, Any] | None = None) -> ATIFTrajectory: @@ -62,7 +198,10 @@ def get_trajectory(self, if prebuilt is not None: trajectory = self._coerce_trajectory(prebuilt) else: - trajectory = self._converter.convert(steps=item.trajectory, session_id=key) + trajectory = self._converter.convert( + steps=item.trajectory, + session_id=self._trajectory_session_id(item.id), + ) self._cache[key] = trajectory return trajectory @@ -89,10 +228,12 @@ def build_samples( samples: AtifEvalSampleList = [] for item in eval_input.eval_input_items: trajectory = self._cache[self._cache_key(item.id)] + subagent_map = self._build_subagent_trajectory_map(trajectory, item_id=item.id) samples.append( AtifEvalSample( item_id=item.id, trajectory=trajectory, + subagent_trajectories=subagent_map, expected_output_obj=item.expected_output_obj, output_obj=item.output_obj, metadata={}, diff --git a/packages/nvidia_nat_eval/tests/eval/test_atif_adapter.py b/packages/nvidia_nat_eval/tests/eval/test_atif_adapter.py index f56334c4cf..ba4aab4543 100644 --- a/packages/nvidia_nat_eval/tests/eval/test_atif_adapter.py +++ b/packages/nvidia_nat_eval/tests/eval/test_atif_adapter.py @@ -73,3 +73,60 @@ def test_build_samples_uses_prebuilt_trajectory_without_conversion(): assert len(samples) == 1 assert samples[0].trajectory.agent.name == "prebuilt-agent" assert samples[0].item_id == "sample-a" + + +def test_build_samples_populates_in_memory_subagent_map(): + converter = _CountingConverter() + adapter = EvalAtifAdapter(converter=converter) + item = _make_eval_input_item("sample-b") + eval_input = EvalInput(eval_input_items=[item]) + prebuilt = ATIFTrajectory.model_validate({ + "session_id": + "sample-b", + "agent": { + "name": "prebuilt-agent", + "version": "0.0.0", + }, + "steps": [{ + "step_id": 1, + "source": "agent", + "message": "", + "tool_calls": [{ + "tool_call_id": "call_parent", + "function_name": "delegator", + "arguments": {}, + }], + "observation": { + "results": [{ + "source_call_id": + "call_parent", + "content": + "delegated", + "subagent_trajectory_ref": [{ + "session_id": "sample-b:call_child", + "trajectory_path": None, + "extra": { + "child_function_id": "child-fn", + }, + }], + }], + }, + "extra": { + "ancestry": { + "function_id": "child-fn", + "parent_id": "parent-fn", + "function_name": "delegator", + }, + }, + }], + }) + + samples = adapter.build_samples(eval_input, prebuilt_trajectories={"sample-b": prebuilt}) + + assert len(samples) == 1 + sub_map = samples[0].subagent_trajectories + assert isinstance(sub_map, dict) + assert "sample-b:call_child" in sub_map + child = sub_map["sample-b:call_child"] + assert child.session_id == "sample-b:call_child" + assert child.steps[0].step_id == 1