From c1a76b686544858a4d7354e3cdcc3a4423aa4c4f Mon Sep 17 00:00:00 2001 From: Anuradha Karuppiah <26330987+AnuradhaKaruppiah@users.noreply.github.com> Date: Tue, 7 Apr 2026 08:23:12 -0700 Subject: [PATCH 01/13] subagent traj ref Signed-off-by: Anuradha Karuppiah <26330987+AnuradhaKaruppiah@users.noreply.github.com> --- .../telemetry_metrics_analysis_agent.py | 11 +- .../src/nat/utils/atif_converter.py | 129 +++++++++++++++--- .../plugins/eval/evaluator/atif_evaluator.py | 5 + .../nat/plugins/eval/runtime/atif_adapter.py | 115 ++++++++++++++++ .../tests/eval/test_atif_adapter.py | 56 ++++++++ 5 files changed, 296 insertions(+), 20 deletions(-) diff --git a/examples/advanced_agents/alert_triage_agent/src/nat_alert_triage_agent/telemetry_metrics_analysis_agent.py b/examples/advanced_agents/alert_triage_agent/src/nat_alert_triage_agent/telemetry_metrics_analysis_agent.py index a8321eaad5..7d0a2089a5 100644 --- a/examples/advanced_agents/alert_triage_agent/src/nat_alert_triage_agent/telemetry_metrics_analysis_agent.py +++ b/examples/advanced_agents/alert_triage_agent/src/nat_alert_triage_agent/telemetry_metrics_analysis_agent.py @@ -16,11 +16,13 @@ from pydantic import Field from nat.builder.builder import Builder +from nat.builder.context import Context from nat.builder.framework_enum import LLMFrameworkEnum from nat.builder.function_info import FunctionInfo from nat.cli.register_workflow import register_function from nat.data_models.component_ref import LLMRef from nat.data_models.function import FunctionBaseConfig +from nat.data_models.intermediate_step import TraceMetadata from . import utils from .prompts import TelemetryMetricsAnalysisAgentPrompts @@ -88,7 +90,14 @@ def telemetry_metrics_analysis_agent(state: MessagesState): # Execute analysis and get response input_message = f"Host to investigate: {host_id}. Alert type: {alert_type}" - response = await agent_executor.ainvoke({"messages": [HumanMessage(content=input_message)]}) + delegation_metadata = TraceMetadata(provided_metadata={"is_subagent_delegation": True}) + with Context.get().push_active_function("telemetry_metrics_analysis_subagent_call", + input_data={ + "host_id": host_id, + "alert_type": alert_type, + }, + metadata=delegation_metadata): + response = await agent_executor.ainvoke({"messages": [HumanMessage(content=input_message)]}) conclusion = response["messages"][-1].content diff --git a/packages/nvidia_nat_core/src/nat/utils/atif_converter.py b/packages/nvidia_nat_core/src/nat/utils/atif_converter.py index a0c9944bf1..4bdbb12bd2 100644 --- a/packages/nvidia_nat_core/src/nat/utils/atif_converter.py +++ b/packages/nvidia_nat_core/src/nat/utils/atif_converter.py @@ -44,6 +44,7 @@ from nat.atif import ATIFStepMetrics from nat.atif import ATIFToolCall from nat.atif import ATIFTrajectory +from nat.atif import SubagentTrajectoryRef from nat.data_models.intermediate_step import IntermediateStep from nat.data_models.intermediate_step import IntermediateStepCategory from nat.data_models.intermediate_step import IntermediateStepState @@ -190,6 +191,18 @@ def _parse_tool_arguments(raw_input: Any) -> dict[str, Any]: return {} +def _extract_subagent_delegation_flag(metadata: Any) -> bool: + """Extract optional subagent delegation flag from metadata payload.""" + if isinstance(metadata, TraceMetadata): + provided = metadata.provided_metadata + if isinstance(provided, dict): + return bool(provided.get("is_subagent_delegation", provided.get("subagent_delegation", False))) + return False + if isinstance(metadata, dict): + return bool(metadata.get("is_subagent_delegation", metadata.get("subagent_delegation", False))) + return False + + # --------------------------------------------------------------------------- # Internal accumulator # --------------------------------------------------------------------------- @@ -201,9 +214,10 @@ class _ObservedInvocation: order_key: float tool_call: ATIFToolCall - observation: ATIFObservationResult + tool_output: str ancestry: AtifAncestry invocation: AtifInvocationInfo + is_subagent_delegation: bool class _PendingAgentTurn: @@ -220,7 +234,10 @@ def __init__(self, message: str, timestamp: float, model_name: str | None, metri self.observed_invocations: list[_ObservedInvocation] = [] -def _record_observed_invocation(pending: _PendingAgentTurn, ist: IntermediateStep) -> None: +def _record_observed_invocation(pending: _PendingAgentTurn, + ist: IntermediateStep, + *, + start_flag: bool = False) -> None: """Record an observed invocation as a tool_call + observation pair.""" tool_name = ist.name or "unknown_tool" if tool_name == "": @@ -232,16 +249,71 @@ def _record_observed_invocation(pending: _PendingAgentTurn, ist: IntermediateSte tool_input = _parse_tool_arguments(ist.data.input) tool_output = _safe_str(ist.data.output) call_id = f"call_{ist.UUID}" + is_subagent_delegation = _extract_subagent_delegation_flag(ist.metadata) or start_flag pending.observed_invocations.append( _ObservedInvocation( order_key=ist.payload.span_event_timestamp or ist.event_timestamp, tool_call=ATIFToolCall(tool_call_id=call_id, function_name=tool_name, arguments=tool_input), - observation=ATIFObservationResult(source_call_id=call_id, content=tool_output), + tool_output=tool_output, ancestry=_atif_ancestry_from_ist(ist), invocation=_atif_invocation_from_ist(ist, invocation_id=call_id), + is_subagent_delegation=is_subagent_delegation, )) +def _build_observation_results(observed: list[_ObservedInvocation], trajectory_session_id: str) -> list[ATIFObservationResult]: + """Build observation rows and attach in-memory subagent refs when detected. + + A wrapper/delegation pattern is inferred when: + - either the wrapper invocation is explicitly flagged as subagent delegation, or + - the wrapper row appears to represent delegation based on existing lineage + (tool call name differs from ancestry function name), and + - another invocation in the same ATIF step has: + - ancestry.function_name equal to the wrapper tool call name, and + - ancestry.parent_id equal to the wrapper ancestry.function_id. + """ + results = [ + ATIFObservationResult(source_call_id=obs.tool_call.tool_call_id, content=obs.tool_output) + for obs in observed + ] + + for i, wrapper in enumerate(observed): + wrapper_call_name = wrapper.tool_call.function_name + wrapper_ancestry = wrapper.ancestry + inferred_delegation = wrapper_ancestry.function_name != wrapper_call_name + if not (wrapper.is_subagent_delegation or inferred_delegation): + continue + + refs: list[SubagentTrajectoryRef] = [] + seen_ref_session_ids: set[str] = set() + for child in observed: + child_ancestry = child.ancestry + if child_ancestry.function_name != wrapper_call_name: + continue + if child_ancestry.parent_id != wrapper_ancestry.function_id: + continue + + ref_session_id = f"{trajectory_session_id}:{child.tool_call.tool_call_id}" + if ref_session_id in seen_ref_session_ids: + continue + seen_ref_session_ids.add(ref_session_id) + refs.append( + SubagentTrajectoryRef( + session_id=ref_session_id, + extra={ + "parent_tool_call_id": wrapper.tool_call.tool_call_id, + "child_tool_call_id": child.tool_call.tool_call_id, + "child_function_id": child_ancestry.function_id, + "child_function_name": child_ancestry.function_name, + }, + )) + + if refs: + results[i].subagent_trajectory_ref = refs + + return results + + # --------------------------------------------------------------------------- # Batch converter # --------------------------------------------------------------------------- @@ -258,9 +330,10 @@ def convert( agent_name: str | None = None, ) -> ATIFTrajectory: """Convert a list of IntermediateSteps to an ATIF trajectory.""" + trajectory_session_id = session_id or str(uuid.uuid4()) if not steps: return ATIFTrajectory( - session_id=session_id or str(uuid.uuid4()), + session_id=trajectory_session_id, agent=ATIFAgentConfig(name=agent_name or "nat-agent", version="0.0.0"), ) @@ -274,6 +347,7 @@ def convert( total_prompt = 0 total_completion = 0 total_cached = 0 + delegation_flags_by_uuid: dict[str, bool] = {} def _flush_pending() -> None: nonlocal step_id, pending @@ -281,7 +355,7 @@ def _flush_pending() -> None: return sorted_invocations = sorted(pending.observed_invocations, key=lambda i: i.order_key) tool_calls = [obs.tool_call for obs in sorted_invocations] or None - observations = [obs.observation for obs in sorted_invocations] + observations = _build_observation_results(sorted_invocations, trajectory_session_id) observation = ATIFObservation(results=observations) if observations else None tool_ancestry = [obs.ancestry for obs in sorted_invocations] tool_invocations = [obs.invocation for obs in sorted_invocations] or None @@ -314,6 +388,9 @@ def _flush_pending() -> None: category = ist.event_category state = ist.event_state + if state == IntermediateStepState.START and _extract_subagent_delegation_flag(ist.metadata): + delegation_flags_by_uuid[ist.UUID] = True + if event_type == IntermediateStepType.WORKFLOW_START: user_input = "" if ist.data and ist.data.input is not None: @@ -392,21 +469,23 @@ def _flush_pending() -> None: continue if event_type == IntermediateStepType.TOOL_END: + start_flag = delegation_flags_by_uuid.pop(ist.UUID, False) if pending is not None: - _record_observed_invocation(pending, ist) + _record_observed_invocation(pending, ist, start_flag=start_flag) else: orphan_pending = _PendingAgentTurn(message="", timestamp=ist.event_timestamp, model_name=None, metrics=None) orphan_pending.ancestry = _atif_ancestry_from_ist(ist) - _record_observed_invocation(orphan_pending, ist) + _record_observed_invocation(orphan_pending, ist, start_flag=start_flag) if not orphan_pending.observed_invocations: continue invocation = orphan_pending.observed_invocations[0] step_extra = _atif_step_extra_model_from_ist(ist) step_extra.tool_invocations = [invocation.invocation] step_extra.tool_ancestry = [invocation.ancestry] + observation_results = _build_observation_results([invocation], trajectory_session_id) extra = step_extra.model_dump(exclude_none=True) atif_steps.append( ATIFStep( @@ -415,28 +494,30 @@ def _flush_pending() -> None: message="", timestamp=_epoch_to_iso(ist.event_timestamp), tool_calls=[invocation.tool_call], - observation=ATIFObservation(results=[invocation.observation]), + observation=ATIFObservation(results=observation_results), extra=extra or None, )) step_id += 1 continue if event_type == IntermediateStepType.FUNCTION_END: + start_flag = delegation_flags_by_uuid.pop(ist.UUID, False) if pending is not None: - _record_observed_invocation(pending, ist) + _record_observed_invocation(pending, ist, start_flag=start_flag) else: orphan_pending = _PendingAgentTurn(message="", timestamp=ist.event_timestamp, model_name=None, metrics=None) orphan_pending.ancestry = _atif_ancestry_from_ist(ist) - _record_observed_invocation(orphan_pending, ist) + _record_observed_invocation(orphan_pending, ist, start_flag=start_flag) if not orphan_pending.observed_invocations: continue invocation = orphan_pending.observed_invocations[0] step_extra = _atif_step_extra_model_from_ist(ist) step_extra.tool_invocations = [invocation.invocation] step_extra.tool_ancestry = [invocation.ancestry] + observation_results = _build_observation_results([invocation], trajectory_session_id) extra = step_extra.model_dump(exclude_none=True) atif_steps.append( ATIFStep( @@ -445,7 +526,7 @@ def _flush_pending() -> None: message="", timestamp=_epoch_to_iso(ist.event_timestamp), tool_calls=[invocation.tool_call], - observation=ATIFObservation(results=[invocation.observation]), + observation=ATIFObservation(results=observation_results), extra=extra or None, )) step_id += 1 @@ -478,7 +559,7 @@ def _flush_pending() -> None: ) return ATIFTrajectory( - session_id=session_id or str(uuid.uuid4()), + session_id=trajectory_session_id, agent=agent_config, steps=atif_steps, final_metrics=final_metrics, @@ -495,6 +576,7 @@ class ATIFStreamConverter: def __init__(self, agent_name: str = "nat-agent"): self._step_id: int = 1 + self._session_id: str = str(uuid.uuid4()) self._agent_config = ATIFAgentConfig(name=agent_name, version="0.0.0") self._tool_defs_captured = False self._pending: _PendingAgentTurn | None = None @@ -502,6 +584,7 @@ def __init__(self, agent_name: str = "nat-agent"): self._total_prompt = 0 self._total_completion = 0 self._total_cached = 0 + self._delegation_flags_by_uuid: dict[str, bool] = {} @property def agent_config(self) -> ATIFAgentConfig: @@ -514,6 +597,9 @@ def push(self, ist: IntermediateStep) -> ATIFStep | None: category = ist.event_category state = ist.event_state + if state == IntermediateStepState.START and _extract_subagent_delegation_flag(ist.metadata): + self._delegation_flags_by_uuid[ist.UUID] = True + if event_type == IntermediateStepType.WORKFLOW_START: user_input = "" if ist.data and ist.data.input is not None: @@ -595,19 +681,21 @@ def push(self, ist: IntermediateStep) -> ATIFStep | None: return flushed if event_type == IntermediateStepType.TOOL_END: + start_flag = self._delegation_flags_by_uuid.pop(ist.UUID, False) if self._pending is not None: - _record_observed_invocation(self._pending, ist) + _record_observed_invocation(self._pending, ist, start_flag=start_flag) return None orphan_pending = _PendingAgentTurn(message="", timestamp=ist.event_timestamp, model_name=None, metrics=None) orphan_pending.ancestry = _atif_ancestry_from_ist(ist) - _record_observed_invocation(orphan_pending, ist) + _record_observed_invocation(orphan_pending, ist, start_flag=start_flag) if not orphan_pending.observed_invocations: return None invocation = orphan_pending.observed_invocations[0] step_extra = _atif_step_extra_model_from_ist(ist) step_extra.tool_invocations = [invocation.invocation] step_extra.tool_ancestry = [invocation.ancestry] + observation_results = _build_observation_results([invocation], self._session_id) extra = step_extra.model_dump(exclude_none=True) orphan_step = ATIFStep( step_id=self._step_id, @@ -615,7 +703,7 @@ def push(self, ist: IntermediateStep) -> ATIFStep | None: message="", timestamp=_epoch_to_iso(ist.event_timestamp), tool_calls=[invocation.tool_call], - observation=ATIFObservation(results=[invocation.observation]), + observation=ATIFObservation(results=observation_results), extra=extra or None, ) self._step_id += 1 @@ -623,19 +711,21 @@ def push(self, ist: IntermediateStep) -> ATIFStep | None: return orphan_step if event_type == IntermediateStepType.FUNCTION_END: + start_flag = self._delegation_flags_by_uuid.pop(ist.UUID, False) if self._pending is not None: - _record_observed_invocation(self._pending, ist) + _record_observed_invocation(self._pending, ist, start_flag=start_flag) return None orphan_pending = _PendingAgentTurn(message="", timestamp=ist.event_timestamp, model_name=None, metrics=None) orphan_pending.ancestry = _atif_ancestry_from_ist(ist) - _record_observed_invocation(orphan_pending, ist) + _record_observed_invocation(orphan_pending, ist, start_flag=start_flag) if not orphan_pending.observed_invocations: return None invocation = orphan_pending.observed_invocations[0] step_extra = _atif_step_extra_model_from_ist(ist) step_extra.tool_invocations = [invocation.invocation] step_extra.tool_ancestry = [invocation.ancestry] + observation_results = _build_observation_results([invocation], self._session_id) extra = step_extra.model_dump(exclude_none=True) orphan_step = ATIFStep( step_id=self._step_id, @@ -643,7 +733,7 @@ def push(self, ist: IntermediateStep) -> ATIFStep | None: message="", timestamp=_epoch_to_iso(ist.event_timestamp), tool_calls=[invocation.tool_call], - observation=ATIFObservation(results=[invocation.observation]), + observation=ATIFObservation(results=observation_results), extra=extra or None, ) self._step_id += 1 @@ -679,6 +769,7 @@ def get_trajectory(self) -> ATIFTrajectory: total_steps=agent_step_count, ) return ATIFTrajectory( + session_id=self._session_id, agent=self._agent_config, steps=list(self._emitted_steps), final_metrics=final_metrics, @@ -691,7 +782,7 @@ def _flush_pending(self) -> ATIFStep | None: pending = self._pending sorted_invocations = sorted(pending.observed_invocations, key=lambda i: i.order_key) tool_calls = [obs.tool_call for obs in sorted_invocations] or None - observations = [obs.observation for obs in sorted_invocations] + observations = _build_observation_results(sorted_invocations, self._session_id) observation = ATIFObservation(results=observations) if observations else None tool_ancestry = [obs.ancestry for obs in sorted_invocations] tool_invocations = [obs.invocation for obs in sorted_invocations] or None diff --git a/packages/nvidia_nat_eval/src/nat/plugins/eval/evaluator/atif_evaluator.py b/packages/nvidia_nat_eval/src/nat/plugins/eval/evaluator/atif_evaluator.py index c5b5720efa..339ab304cf 100644 --- a/packages/nvidia_nat_eval/src/nat/plugins/eval/evaluator/atif_evaluator.py +++ b/packages/nvidia_nat_eval/src/nat/plugins/eval/evaluator/atif_evaluator.py @@ -33,6 +33,11 @@ class AtifEvalSample(BaseModel): item_id: Any = Field(description="Identifier matching the source EvalInputItem.") trajectory: ATIFTrajectory = Field(description="Canonical ATIF trajectory.") + subagent_trajectories: dict[str, ATIFTrajectory] = Field( + default_factory=dict, + description=("In-memory child trajectories keyed by " + "`subagent_trajectory_ref.session_id`."), + ) expected_output_obj: Any = Field(default=None, description="Optional expected output reference.") output_obj: Any = Field(default=None, description="Optional workflow output reference.") metadata: dict[str, Any] = Field(default_factory=dict, description="Optional evaluator metadata.") diff --git a/packages/nvidia_nat_eval/src/nat/plugins/eval/runtime/atif_adapter.py b/packages/nvidia_nat_eval/src/nat/plugins/eval/runtime/atif_adapter.py index ed599fa20f..3be6cd0cae 100644 --- a/packages/nvidia_nat_eval/src/nat/plugins/eval/runtime/atif_adapter.py +++ b/packages/nvidia_nat_eval/src/nat/plugins/eval/runtime/atif_adapter.py @@ -21,6 +21,7 @@ from __future__ import annotations +import logging from collections.abc import Mapping from typing import Any @@ -31,6 +32,8 @@ from nat.plugins.eval.evaluator.atif_evaluator import AtifEvalSampleList from nat.utils.atif_converter import IntermediateStepToATIFConverter +logger = logging.getLogger(__name__) + class EvalAtifAdapter: """Build and cache ATIF trajectories for eval items.""" @@ -51,6 +54,116 @@ def _coerce_trajectory(self, value: Any) -> ATIFTrajectory: return ATIFTrajectory.model_validate(value) raise TypeError(f"Unsupported ATIF trajectory payload type: {type(value)}") + @staticmethod + def _step_ancestry_ids(step) -> tuple[str | None, str | None]: + """Return (function_id, parent_id) from a step's extra ancestry payload.""" + extra = step.extra or {} + if not isinstance(extra, dict): + return None, None + ancestry = extra.get("ancestry") + if not isinstance(ancestry, dict): + return None, None + function_id = ancestry.get("function_id") + parent_id = ancestry.get("parent_id") + if function_id is not None: + function_id = str(function_id) + if parent_id is not None: + parent_id = str(parent_id) + return function_id, parent_id + + @staticmethod + def _collect_subtree_function_ids(trajectory: ATIFTrajectory, root_function_id: str) -> set[str]: + """Collect function_ids in the ancestry subtree rooted at root_function_id.""" + parent_to_children: dict[str, set[str]] = {} + for step in trajectory.steps: + function_id, parent_id = EvalAtifAdapter._step_ancestry_ids(step) + if not function_id or not parent_id: + continue + parent_to_children.setdefault(parent_id, set()).add(function_id) + + subtree_ids: set[str] = set() + frontier = [root_function_id] + while frontier: + node = frontier.pop() + if node in subtree_ids: + continue + subtree_ids.add(node) + frontier.extend(parent_to_children.get(node, ())) + return subtree_ids + + @staticmethod + def _build_subtrajectory(trajectory: ATIFTrajectory, *, session_id: str, root_function_id: str) -> ATIFTrajectory | None: + """Build a child trajectory view for a delegated function subtree.""" + subtree_ids = EvalAtifAdapter._collect_subtree_function_ids(trajectory, root_function_id) + if not subtree_ids: + return None + + selected_steps = [] + for step in trajectory.steps: + function_id, _ = EvalAtifAdapter._step_ancestry_ids(step) + if function_id in subtree_ids: + selected_steps.append(step.model_copy(deep=True)) + if not selected_steps: + return None + + reindexed_steps = [step.model_copy(update={"step_id": idx}) for idx, step in enumerate(selected_steps, start=1)] + return ATIFTrajectory( + schema_version=trajectory.schema_version, + session_id=session_id, + agent=trajectory.agent.model_copy(deep=True), + steps=reindexed_steps, + notes=trajectory.notes, + extra={ + "parent_session_id": trajectory.session_id, + "root_function_id": root_function_id, + }, + ) + + def _build_subagent_trajectory_map(self, trajectory: ATIFTrajectory, *, item_id: Any) -> dict[str, ATIFTrajectory]: + """Build in-memory session_id -> trajectory map for subagent references.""" + by_session_id: dict[str, ATIFTrajectory] = {} + unresolved: list[str] = [] + + for step in trajectory.steps: + if step.observation is None: + continue + for result in step.observation.results: + refs = result.subagent_trajectory_ref or [] + for ref in refs: + session_id = ref.session_id + if not session_id: + continue + if session_id in by_session_id: + continue + root_function_id = None + if isinstance(ref.extra, dict): + child_function_id = ref.extra.get("child_function_id") + if child_function_id is not None: + root_function_id = str(child_function_id) + child_traj = (self._build_subtrajectory( + trajectory, + session_id=session_id, + root_function_id=root_function_id, + ) if root_function_id else None) + if child_traj is None: + unresolved.append(session_id) + # Degraded fallback keeps refs resolvable in-memory. + child_traj = trajectory.model_copy( + update={ + "session_id": session_id, + "extra": { + "parent_session_id": trajectory.session_id, + "resolution_mode": "fallback_full_trajectory", + }, + }, + deep=True, + ) + by_session_id[session_id] = child_traj + + if unresolved: + logger.warning("ATIF subagent map fallback for item_id=%s; unresolved refs=%s", item_id, unresolved) + return by_session_id + def get_trajectory(self, item: EvalInputItem, prebuilt: ATIFTrajectory | Mapping[str, Any] | None = None) -> ATIFTrajectory: @@ -89,10 +202,12 @@ def build_samples( samples: AtifEvalSampleList = [] for item in eval_input.eval_input_items: trajectory = self._cache[self._cache_key(item.id)] + subagent_map = self._build_subagent_trajectory_map(trajectory, item_id=item.id) samples.append( AtifEvalSample( item_id=item.id, trajectory=trajectory, + subagent_trajectories=subagent_map, expected_output_obj=item.expected_output_obj, output_obj=item.output_obj, metadata={}, diff --git a/packages/nvidia_nat_eval/tests/eval/test_atif_adapter.py b/packages/nvidia_nat_eval/tests/eval/test_atif_adapter.py index f56334c4cf..e8a277b8a2 100644 --- a/packages/nvidia_nat_eval/tests/eval/test_atif_adapter.py +++ b/packages/nvidia_nat_eval/tests/eval/test_atif_adapter.py @@ -73,3 +73,59 @@ def test_build_samples_uses_prebuilt_trajectory_without_conversion(): assert len(samples) == 1 assert samples[0].trajectory.agent.name == "prebuilt-agent" assert samples[0].item_id == "sample-a" + + +def test_build_samples_populates_in_memory_subagent_map(): + converter = _CountingConverter() + adapter = EvalAtifAdapter(converter=converter) + item = _make_eval_input_item("sample-b") + eval_input = EvalInput(eval_input_items=[item]) + prebuilt = ATIFTrajectory.model_validate({ + "session_id": "sample-b", + "agent": { + "name": "prebuilt-agent", + "version": "0.0.0", + }, + "steps": [ + { + "step_id": 1, + "source": "agent", + "message": "", + "tool_calls": [{ + "tool_call_id": "call_parent", + "function_name": "delegator", + "arguments": {}, + }], + "observation": { + "results": [{ + "source_call_id": "call_parent", + "content": "delegated", + "subagent_trajectory_ref": [{ + "session_id": "sample-b:call_child", + "trajectory_path": None, + "extra": { + "child_function_id": "child-fn", + }, + }], + }], + }, + "extra": { + "ancestry": { + "function_id": "child-fn", + "parent_id": "parent-fn", + "function_name": "delegator", + }, + }, + } + ], + }) + + samples = adapter.build_samples(eval_input, prebuilt_trajectories={"sample-b": prebuilt}) + + assert len(samples) == 1 + sub_map = samples[0].subagent_trajectories + assert isinstance(sub_map, dict) + assert "sample-b:call_child" in sub_map + child = sub_map["sample-b:call_child"] + assert child.session_id == "sample-b:call_child" + assert child.steps[0].step_id == 1 From 34831864a00a12d6be64a51011012ce8fb79fe44 Mon Sep 17 00:00:00 2001 From: Anuradha Karuppiah <26330987+AnuradhaKaruppiah@users.noreply.github.com> Date: Tue, 7 Apr 2026 12:36:59 -0700 Subject: [PATCH 02/13] Turn implicit package off by default Signed-off-by: Anuradha Karuppiah <26330987+AnuradhaKaruppiah@users.noreply.github.com> --- .../src/nat/utils/atif_converter.py | 54 ++++++++++++--- .../tests/nat/utils/test_atif_converter.py | 68 +++++++++++++++++++ .../nat/plugins/eval/runtime/atif_adapter.py | 8 ++- 3 files changed, 117 insertions(+), 13 deletions(-) diff --git a/packages/nvidia_nat_core/src/nat/utils/atif_converter.py b/packages/nvidia_nat_core/src/nat/utils/atif_converter.py index 4bdbb12bd2..309e2763cd 100644 --- a/packages/nvidia_nat_core/src/nat/utils/atif_converter.py +++ b/packages/nvidia_nat_core/src/nat/utils/atif_converter.py @@ -261,12 +261,16 @@ def _record_observed_invocation(pending: _PendingAgentTurn, )) -def _build_observation_results(observed: list[_ObservedInvocation], trajectory_session_id: str) -> list[ATIFObservationResult]: +def _build_observation_results(observed: list[_ObservedInvocation], + trajectory_session_id: str, + *, + allow_implicit_delegation: bool) -> list[ATIFObservationResult]: """Build observation rows and attach in-memory subagent refs when detected. A wrapper/delegation pattern is inferred when: - - either the wrapper invocation is explicitly flagged as subagent delegation, or - - the wrapper row appears to represent delegation based on existing lineage + - the wrapper invocation is explicitly flagged as subagent delegation, or + - implicit delegation inference is enabled and the wrapper row appears to + represent delegation based on existing lineage (tool call name differs from ancestry function name), and - another invocation in the same ATIF step has: - ancestry.function_name equal to the wrapper tool call name, and @@ -280,7 +284,7 @@ def _build_observation_results(observed: list[_ObservedInvocation], trajectory_s for i, wrapper in enumerate(observed): wrapper_call_name = wrapper.tool_call.function_name wrapper_ancestry = wrapper.ancestry - inferred_delegation = wrapper_ancestry.function_name != wrapper_call_name + inferred_delegation = allow_implicit_delegation and wrapper_ancestry.function_name != wrapper_call_name if not (wrapper.is_subagent_delegation or inferred_delegation): continue @@ -322,6 +326,9 @@ def _build_observation_results(observed: list[_ObservedInvocation], trajectory_s class IntermediateStepToATIFConverter: """Convert a complete list of NAT IntermediateSteps to an ATIF trajectory.""" + def __init__(self, *, allow_implicit_subagent_delegation: bool = False) -> None: + self._allow_implicit_subagent_delegation = allow_implicit_subagent_delegation + def convert( self, steps: list[IntermediateStep], @@ -355,7 +362,11 @@ def _flush_pending() -> None: return sorted_invocations = sorted(pending.observed_invocations, key=lambda i: i.order_key) tool_calls = [obs.tool_call for obs in sorted_invocations] or None - observations = _build_observation_results(sorted_invocations, trajectory_session_id) + observations = _build_observation_results( + sorted_invocations, + trajectory_session_id, + allow_implicit_delegation=self._allow_implicit_subagent_delegation, + ) observation = ATIFObservation(results=observations) if observations else None tool_ancestry = [obs.ancestry for obs in sorted_invocations] tool_invocations = [obs.invocation for obs in sorted_invocations] or None @@ -485,7 +496,11 @@ def _flush_pending() -> None: step_extra = _atif_step_extra_model_from_ist(ist) step_extra.tool_invocations = [invocation.invocation] step_extra.tool_ancestry = [invocation.ancestry] - observation_results = _build_observation_results([invocation], trajectory_session_id) + observation_results = _build_observation_results( + [invocation], + trajectory_session_id, + allow_implicit_delegation=self._allow_implicit_subagent_delegation, + ) extra = step_extra.model_dump(exclude_none=True) atif_steps.append( ATIFStep( @@ -517,7 +532,11 @@ def _flush_pending() -> None: step_extra = _atif_step_extra_model_from_ist(ist) step_extra.tool_invocations = [invocation.invocation] step_extra.tool_ancestry = [invocation.ancestry] - observation_results = _build_observation_results([invocation], trajectory_session_id) + observation_results = _build_observation_results( + [invocation], + trajectory_session_id, + allow_implicit_delegation=self._allow_implicit_subagent_delegation, + ) extra = step_extra.model_dump(exclude_none=True) atif_steps.append( ATIFStep( @@ -574,10 +593,11 @@ def _flush_pending() -> None: class ATIFStreamConverter: """Stateful converter that emits ATIF steps incrementally.""" - def __init__(self, agent_name: str = "nat-agent"): + def __init__(self, agent_name: str = "nat-agent", *, allow_implicit_subagent_delegation: bool = False): self._step_id: int = 1 self._session_id: str = str(uuid.uuid4()) self._agent_config = ATIFAgentConfig(name=agent_name, version="0.0.0") + self._allow_implicit_subagent_delegation = allow_implicit_subagent_delegation self._tool_defs_captured = False self._pending: _PendingAgentTurn | None = None self._emitted_steps: list[ATIFStep] = [] @@ -695,7 +715,11 @@ def push(self, ist: IntermediateStep) -> ATIFStep | None: step_extra = _atif_step_extra_model_from_ist(ist) step_extra.tool_invocations = [invocation.invocation] step_extra.tool_ancestry = [invocation.ancestry] - observation_results = _build_observation_results([invocation], self._session_id) + observation_results = _build_observation_results( + [invocation], + self._session_id, + allow_implicit_delegation=self._allow_implicit_subagent_delegation, + ) extra = step_extra.model_dump(exclude_none=True) orphan_step = ATIFStep( step_id=self._step_id, @@ -725,7 +749,11 @@ def push(self, ist: IntermediateStep) -> ATIFStep | None: step_extra = _atif_step_extra_model_from_ist(ist) step_extra.tool_invocations = [invocation.invocation] step_extra.tool_ancestry = [invocation.ancestry] - observation_results = _build_observation_results([invocation], self._session_id) + observation_results = _build_observation_results( + [invocation], + self._session_id, + allow_implicit_delegation=self._allow_implicit_subagent_delegation, + ) extra = step_extra.model_dump(exclude_none=True) orphan_step = ATIFStep( step_id=self._step_id, @@ -782,7 +810,11 @@ def _flush_pending(self) -> ATIFStep | None: pending = self._pending sorted_invocations = sorted(pending.observed_invocations, key=lambda i: i.order_key) tool_calls = [obs.tool_call for obs in sorted_invocations] or None - observations = _build_observation_results(sorted_invocations, self._session_id) + observations = _build_observation_results( + sorted_invocations, + self._session_id, + allow_implicit_delegation=self._allow_implicit_subagent_delegation, + ) observation = ATIFObservation(results=observations) if observations else None tool_ancestry = [obs.ancestry for obs in sorted_invocations] tool_invocations = [obs.invocation for obs in sorted_invocations] or None diff --git a/packages/nvidia_nat_core/tests/nat/utils/test_atif_converter.py b/packages/nvidia_nat_core/tests/nat/utils/test_atif_converter.py index a3a601b84b..1c598fc2cf 100644 --- a/packages/nvidia_nat_core/tests/nat/utils/test_atif_converter.py +++ b/packages/nvidia_nat_core/tests/nat/utils/test_atif_converter.py @@ -943,6 +943,74 @@ def test_converter_ignores_non_exported_events(self, batch_converter: Intermedia assert result.steps[1].message == "final" assert result.steps[1].tool_calls is None + def test_implicit_subagent_delegation_is_disabled_by_default(self): + """Implicit delegation inference is opt-in and disabled by default.""" + converter = IntermediateStepToATIFConverter() + steps = [ + _make_step(IntermediateStepType.WORKFLOW_START, input_data="q", timestamp_offset=0.0), + _make_step(IntermediateStepType.LLM_END, output_data="thinking", timestamp_offset=1.0), + _make_step( + IntermediateStepType.FUNCTION_END, + name="child_agent", + timestamp_offset=2.0, + function_name="parent_agent", + function_id="wrapper-fn", + function_parent_id="root", + step_uuid="wrapper-step", + ), + _make_step( + IntermediateStepType.FUNCTION_END, + name="inner_tool", + timestamp_offset=3.0, + function_name="child_agent", + function_id="child-fn", + function_parent_id="wrapper-fn", + step_uuid="child-step", + ), + _make_step(IntermediateStepType.LLM_END, output_data="done", timestamp_offset=4.0), + _make_step(IntermediateStepType.WORKFLOW_END, output_data="done", timestamp_offset=5.0), + ] + + result = converter.convert(steps) + first_agent_turn = result.steps[1] + assert first_agent_turn.observation is not None + assert first_agent_turn.observation.results[0].subagent_trajectory_ref is None + + def test_implicit_subagent_delegation_can_be_enabled(self): + """Implicit delegation inference can be enabled explicitly.""" + converter = IntermediateStepToATIFConverter(allow_implicit_subagent_delegation=True) + steps = [ + _make_step(IntermediateStepType.WORKFLOW_START, input_data="q", timestamp_offset=0.0), + _make_step(IntermediateStepType.LLM_END, output_data="thinking", timestamp_offset=1.0), + _make_step( + IntermediateStepType.FUNCTION_END, + name="child_agent", + timestamp_offset=2.0, + function_name="parent_agent", + function_id="wrapper-fn", + function_parent_id="root", + step_uuid="wrapper-step", + ), + _make_step( + IntermediateStepType.FUNCTION_END, + name="inner_tool", + timestamp_offset=3.0, + function_name="child_agent", + function_id="child-fn", + function_parent_id="wrapper-fn", + step_uuid="child-step", + ), + _make_step(IntermediateStepType.LLM_END, output_data="done", timestamp_offset=4.0), + _make_step(IntermediateStepType.WORKFLOW_END, output_data="done", timestamp_offset=5.0), + ] + + result = converter.convert(steps) + first_agent_turn = result.steps[1] + assert first_agent_turn.observation is not None + refs = first_agent_turn.observation.results[0].subagent_trajectory_ref + assert refs is not None + assert len(refs) == 1 + # --------------------------------------------------------------------------- # Stream converter tests diff --git a/packages/nvidia_nat_eval/src/nat/plugins/eval/runtime/atif_adapter.py b/packages/nvidia_nat_eval/src/nat/plugins/eval/runtime/atif_adapter.py index 3be6cd0cae..cefdf8d649 100644 --- a/packages/nvidia_nat_eval/src/nat/plugins/eval/runtime/atif_adapter.py +++ b/packages/nvidia_nat_eval/src/nat/plugins/eval/runtime/atif_adapter.py @@ -38,8 +38,12 @@ class EvalAtifAdapter: """Build and cache ATIF trajectories for eval items.""" - def __init__(self, converter: IntermediateStepToATIFConverter | None = None) -> None: - self._converter = converter or IntermediateStepToATIFConverter() + def __init__(self, + converter: IntermediateStepToATIFConverter | None = None, + *, + allow_implicit_subagent_delegation: bool = False) -> None: + self._converter = converter or IntermediateStepToATIFConverter( + allow_implicit_subagent_delegation=allow_implicit_subagent_delegation) self._cache: dict[str, ATIFTrajectory] = {} @staticmethod From af59334116ef1ad88a26130869b9578c76f6be10 Mon Sep 17 00:00:00 2001 From: Anuradha Karuppiah <26330987+AnuradhaKaruppiah@users.noreply.github.com> Date: Wed, 8 Apr 2026 10:18:21 -0700 Subject: [PATCH 03/13] Refactor ATIF converter Signed-off-by: Anuradha Karuppiah <26330987+AnuradhaKaruppiah@users.noreply.github.com> --- .../src/nat/utils/atif_converter.py | 806 +++++++----------- .../tests/nat/utils/test_atif_converter.py | 209 +---- .../nat/plugins/eval/runtime/atif_adapter.py | 15 +- .../tests/eval/test_atif_adapter.py | 63 +- 4 files changed, 399 insertions(+), 694 deletions(-) diff --git a/packages/nvidia_nat_core/src/nat/utils/atif_converter.py b/packages/nvidia_nat_core/src/nat/utils/atif_converter.py index 309e2763cd..7de569f3c3 100644 --- a/packages/nvidia_nat_core/src/nat/utils/atif_converter.py +++ b/packages/nvidia_nat_core/src/nat/utils/atif_converter.py @@ -46,7 +46,6 @@ from nat.atif import ATIFTrajectory from nat.atif import SubagentTrajectoryRef from nat.data_models.intermediate_step import IntermediateStep -from nat.data_models.intermediate_step import IntermediateStepCategory from nat.data_models.intermediate_step import IntermediateStepState from nat.data_models.intermediate_step import IntermediateStepType from nat.data_models.intermediate_step import TraceMetadata @@ -220,6 +219,15 @@ class _ObservedInvocation: is_subagent_delegation: bool +@dataclass +class _ExecutionStructure: + """Pass-1 execution structure extracted from IST.""" + + root_events: list[IntermediateStep] + child_events_by_session: dict[str, list[IntermediateStep]] + subagent_ref_by_call_id: dict[str, SubagentTrajectoryRef] + + class _PendingAgentTurn: """Accumulator for an in-progress ATIF agent turn.""" @@ -234,10 +242,7 @@ def __init__(self, message: str, timestamp: float, model_name: str | None, metri self.observed_invocations: list[_ObservedInvocation] = [] -def _record_observed_invocation(pending: _PendingAgentTurn, - ist: IntermediateStep, - *, - start_flag: bool = False) -> None: +def _record_observed_invocation(pending: _PendingAgentTurn, ist: IntermediateStep, *, start_flag: bool = False) -> None: """Record an observed invocation as a tool_call + observation pair.""" tool_name = ist.name or "unknown_tool" if tool_name == "": @@ -261,61 +266,241 @@ def _record_observed_invocation(pending: _PendingAgentTurn, )) -def _build_observation_results(observed: list[_ObservedInvocation], - trajectory_session_id: str, - *, - allow_implicit_delegation: bool) -> list[ATIFObservationResult]: - """Build observation rows and attach in-memory subagent refs when detected. - - A wrapper/delegation pattern is inferred when: - - the wrapper invocation is explicitly flagged as subagent delegation, or - - implicit delegation inference is enabled and the wrapper row appears to - represent delegation based on existing lineage - (tool call name differs from ancestry function name), and - - another invocation in the same ATIF step has: - - ancestry.function_name equal to the wrapper tool call name, and - - ancestry.parent_id equal to the wrapper ancestry.function_id. - """ +def _build_flat_observation_rows(observed: list[_ObservedInvocation], + *, + subagent_ref_by_call_id: dict[str, SubagentTrajectoryRef] | None = None + ) -> list[ATIFObservationResult]: + """Build observation rows and attach explicit subagent refs when available.""" results = [ - ATIFObservationResult(source_call_id=obs.tool_call.tool_call_id, content=obs.tool_output) - for obs in observed + ATIFObservationResult(source_call_id=obs.tool_call.tool_call_id, content=obs.tool_output) for obs in observed ] - for i, wrapper in enumerate(observed): - wrapper_call_name = wrapper.tool_call.function_name - wrapper_ancestry = wrapper.ancestry - inferred_delegation = allow_implicit_delegation and wrapper_ancestry.function_name != wrapper_call_name - if not (wrapper.is_subagent_delegation or inferred_delegation): - continue + if not subagent_ref_by_call_id: + return results - refs: list[SubagentTrajectoryRef] = [] - seen_ref_session_ids: set[str] = set() - for child in observed: - child_ancestry = child.ancestry - if child_ancestry.function_name != wrapper_call_name: - continue - if child_ancestry.parent_id != wrapper_ancestry.function_id: - continue + for i, obs in enumerate(observed): + ref = subagent_ref_by_call_id.get(obs.tool_call.tool_call_id) + if ref is not None: + results[i].subagent_trajectory_ref = [ref] - ref_session_id = f"{trajectory_session_id}:{child.tool_call.tool_call_id}" - if ref_session_id in seen_ref_session_ids: - continue - seen_ref_session_ids.add(ref_session_id) - refs.append( - SubagentTrajectoryRef( - session_id=ref_session_id, - extra={ - "parent_tool_call_id": wrapper.tool_call.tool_call_id, - "child_tool_call_id": child.tool_call.tool_call_id, - "child_function_id": child_ancestry.function_id, - "child_function_name": child_ancestry.function_name, - }, + return results + + +def _pass2_project_context_to_steps( + events: list[IntermediateStep], + *, + step_id_start: int, + include_workflow_start_user_step: bool, + subagent_ref_by_call_id: dict[str, SubagentTrajectoryRef] | None = None, +) -> tuple[list[ATIFStep], int, int, int, int]: + """Pass-2 projection for one context from IST events to ATIF steps.""" + atif_steps: list[ATIFStep] = [] + step_id = step_id_start + pending: _PendingAgentTurn | None = None + total_prompt = 0 + total_completion = 0 + total_cached = 0 + + def _flush_pending() -> None: + nonlocal step_id, pending + if pending is None: + return + sorted_invocations = sorted(pending.observed_invocations, key=lambda i: i.order_key) + tool_calls = [obs.tool_call for obs in sorted_invocations] or None + observations = _build_flat_observation_rows( + sorted_invocations, + subagent_ref_by_call_id=subagent_ref_by_call_id, + ) + observation = ATIFObservation(results=observations) if observations else None + tool_ancestry = [obs.ancestry for obs in sorted_invocations] + tool_invocations = [obs.invocation for obs in sorted_invocations] or None + if pending.ancestry is None: + raise ValueError("Pending ATIF step is missing required ancestry metadata") + step_extra = AtifStepExtra( + ancestry=pending.ancestry, + invocation=pending.invocation, + tool_ancestry=tool_ancestry, + tool_invocations=tool_invocations, + **pending.extra, + ) + atif_steps.append( + ATIFStep( + step_id=step_id, + source="agent", + message=pending.message, + timestamp=_epoch_to_iso(pending.timestamp), + model_name=pending.model_name, + tool_calls=tool_calls, + observation=observation, + metrics=pending.metrics, + extra=step_extra.model_dump(exclude_none=True), + )) + step_id += 1 + pending = None + + for ist in events: + event_type = ist.event_type + state = ist.event_state + + if include_workflow_start_user_step and event_type == IntermediateStepType.WORKFLOW_START: + user_input = "" + if ist.data and ist.data.input is not None: + user_input = _extract_user_input(ist.data.input) + step_extra = _atif_step_extra_model_from_ist(ist) + extra = step_extra.model_dump(exclude_none=True) + atif_steps.append( + ATIFStep( + step_id=step_id, + source="user", + message=user_input, + timestamp=_epoch_to_iso(ist.event_timestamp), + extra=extra or None, )) + step_id += 1 + continue - if refs: - results[i].subagent_trajectory_ref = refs + if event_type == IntermediateStepType.LLM_END: + _flush_pending() + llm_output = "" + if ist.data and ist.data.output is not None: + llm_output = _safe_str(ist.data.output) + metrics = _extract_metrics(ist) + if metrics: + total_prompt += metrics.prompt_tokens or 0 + total_completion += metrics.completion_tokens or 0 + total_cached += metrics.cached_tokens or 0 + pending = _PendingAgentTurn( + message=llm_output, + timestamp=ist.event_timestamp, + model_name=ist.name, + metrics=metrics, + ) + pending.ancestry = _atif_ancestry_from_ist(ist) + pending.invocation = _atif_invocation_from_ist(ist) + continue - return results + if event_type in (IntermediateStepType.TOOL_END, IntermediateStepType.FUNCTION_END): + if pending is None: + pending = _PendingAgentTurn( + message="", + timestamp=ist.event_timestamp, + model_name=None, + metrics=None, + ) + pending.ancestry = _atif_ancestry_from_ist(ist) + pending.invocation = _atif_invocation_from_ist(ist) + _record_observed_invocation(pending, ist) + continue + + if event_type == IntermediateStepType.WORKFLOW_END: + _flush_pending() + final_output = "" + if ist.data and ist.data.output is not None: + final_output = _safe_str(ist.data.output) + last_agent_msg = "" + last_agent_ts: float | None = None + for s in reversed(atif_steps): + if s.source == "agent": + last_agent_msg = str(s.message) + last_agent_ts = _iso_to_epoch(s.timestamp) if s.timestamp else None + break + should_emit_terminal_step = bool(final_output) and (final_output != last_agent_msg or + (last_agent_ts is not None + and ist.event_timestamp > last_agent_ts)) + if should_emit_terminal_step: + step_extra = _atif_step_extra_model_from_ist(ist) + extra = step_extra.model_dump(exclude_none=True) + atif_steps.append( + ATIFStep( + step_id=step_id, + source="agent", + message=final_output, + timestamp=_epoch_to_iso(ist.event_timestamp), + extra=extra or None, + )) + step_id += 1 + continue + + if state == IntermediateStepState.START: + continue + if event_type in (IntermediateStepType.LLM_NEW_TOKEN, IntermediateStepType.SPAN_CHUNK): + continue + if state == IntermediateStepState.END: + continue + + _flush_pending() + return atif_steps, step_id, total_prompt, total_completion, total_cached + + +def _pass1_build_execution_structure(sorted_steps: list[IntermediateStep], *, + session_id: str) -> _ExecutionStructure: + """Build root and child context ownership from IST events.""" + delegation_flags_by_uuid: dict[str, bool] = {} + for ist in sorted_steps: + if ist.event_state == IntermediateStepState.START and _extract_subagent_delegation_flag(ist.metadata): + delegation_flags_by_uuid[ist.UUID] = True + + end_events = [s for s in sorted_steps if s.event_state == IntermediateStepState.END] + children_by_parent: dict[str, list[IntermediateStep]] = {} + for e in end_events: + pid = e.function_ancestry.parent_id + if pid: + children_by_parent.setdefault(pid, []).append(e) + + wrapper_events: list[IntermediateStep] = [] + for e in end_events: + if e.event_type not in (IntermediateStepType.TOOL_END, IntermediateStepType.FUNCTION_END): + continue + if _extract_subagent_delegation_flag(e.metadata) or delegation_flags_by_uuid.get(e.UUID, False): + wrapper_events.append(e) + + child_session_by_wrapper_call_id: dict[str, str] = {} + child_events_by_session: dict[str, list[IntermediateStep]] = {} + delegated_function_ids: set[str] = set() + + for wrapper in wrapper_events: + wrapper_call_id = f"call_{wrapper.UUID}" + wrapper_fn_id = wrapper.function_ancestry.function_id + direct_children = children_by_parent.get(wrapper_fn_id, []) + preferred_roots = [c for c in direct_children if c.function_ancestry.function_name == (wrapper.name or "") + ] or direct_children + if not preferred_roots: + continue + child_root = sorted(preferred_roots, key=lambda s: s.event_timestamp)[0] + child_root_fn_id = child_root.function_ancestry.function_id + + subtree_ids: set[str] = set() + frontier = [child_root_fn_id] + while frontier: + node = frontier.pop() + if node in subtree_ids: + continue + subtree_ids.add(node) + for child in children_by_parent.get(node, []): + frontier.append(child.function_ancestry.function_id) + + child_events = [e for e in end_events if e.function_ancestry.function_id in subtree_ids and e.UUID != wrapper.UUID] + if not child_events: + continue + child_session_id = f"{session_id}:{wrapper_call_id}" + child_session_by_wrapper_call_id[wrapper_call_id] = child_session_id + child_events_by_session[child_session_id] = sorted(child_events, key=lambda s: s.event_timestamp) + delegated_function_ids.update(subtree_ids) + + root_events = [ + e for e in sorted_steps + if (e.event_type == IntermediateStepType.WORKFLOW_START or e.event_type == IntermediateStepType.WORKFLOW_END + or e.function_ancestry.function_id not in delegated_function_ids) + ] + subagent_ref_by_call_id = { + call_id: SubagentTrajectoryRef(session_id=child_sid) + for call_id, child_sid in child_session_by_wrapper_call_id.items() + } + return _ExecutionStructure( + root_events=root_events, + child_events_by_session=child_events_by_session, + subagent_ref_by_call_id=subagent_ref_by_call_id, + ) # --------------------------------------------------------------------------- @@ -327,6 +512,7 @@ class IntermediateStepToATIFConverter: """Convert a complete list of NAT IntermediateSteps to an ATIF trajectory.""" def __init__(self, *, allow_implicit_subagent_delegation: bool = False) -> None: + # Legacy option retained for API compatibility; clean converter ignores implicit delegation. self._allow_implicit_subagent_delegation = allow_implicit_subagent_delegation def convert( @@ -345,227 +531,46 @@ def convert( ) sorted_steps = sorted(steps, key=lambda s: s.event_timestamp) - atif_steps: list[ATIFStep] = [] - step_id = 1 - agent_config = ATIFAgentConfig(name=agent_name or "nat-agent", version="0.0.0") - tool_defs_captured = False - pending: _PendingAgentTurn | None = None - total_prompt = 0 - total_completion = 0 - total_cached = 0 - delegation_flags_by_uuid: dict[str, bool] = {} - - def _flush_pending() -> None: - nonlocal step_id, pending - if pending is None: - return - sorted_invocations = sorted(pending.observed_invocations, key=lambda i: i.order_key) - tool_calls = [obs.tool_call for obs in sorted_invocations] or None - observations = _build_observation_results( - sorted_invocations, - trajectory_session_id, - allow_implicit_delegation=self._allow_implicit_subagent_delegation, - ) - observation = ATIFObservation(results=observations) if observations else None - tool_ancestry = [obs.ancestry for obs in sorted_invocations] - tool_invocations = [obs.invocation for obs in sorted_invocations] or None - if pending.ancestry is None: - raise ValueError("Pending agent turn is missing required ATIF ancestry metadata") - step_extra = AtifStepExtra( - ancestry=pending.ancestry, - invocation=pending.invocation, - tool_ancestry=tool_ancestry, - tool_invocations=tool_invocations, - **pending.extra, - ) - atif_steps.append( - ATIFStep( - step_id=step_id, - source="agent", - message=pending.message, - timestamp=_epoch_to_iso(pending.timestamp), - model_name=pending.model_name, - tool_calls=tool_calls, - observation=observation, - metrics=pending.metrics, - extra=step_extra.model_dump(exclude_none=True), - )) - step_id += 1 - pending = None - for ist in sorted_steps: - event_type = ist.event_type - category = ist.event_category - state = ist.event_state - - if state == IntermediateStepState.START and _extract_subagent_delegation_flag(ist.metadata): - delegation_flags_by_uuid[ist.UUID] = True - - if event_type == IntermediateStepType.WORKFLOW_START: - user_input = "" - if ist.data and ist.data.input is not None: - user_input = _extract_user_input(ist.data.input) - if agent_name is None: + if agent_name is None: + for ist in sorted_steps: + if ist.event_type == IntermediateStepType.WORKFLOW_START: fn_name = ist.function_ancestry.function_name if fn_name and fn_name != "root": agent_config.name = fn_name - step_extra = _atif_step_extra_model_from_ist(ist) - extra = step_extra.model_dump(exclude_none=True) - atif_steps.append( - ATIFStep( - step_id=step_id, - source="user", - message=user_input, - timestamp=_epoch_to_iso(ist.event_timestamp), - extra=extra or None, - )) - step_id += 1 - continue - - if event_type == IntermediateStepType.WORKFLOW_END: - _flush_pending() - final_output = "" - if ist.data and ist.data.output is not None: - final_output = _safe_str(ist.data.output) - last_agent_msg = "" - last_agent_ts: float | None = None - for s in reversed(atif_steps): - if s.source == "agent": - last_agent_msg = str(s.message) - last_agent_ts = _iso_to_epoch(s.timestamp) if s.timestamp else None break - should_emit_terminal_step = bool(final_output) and (final_output != last_agent_msg or - (last_agent_ts is not None - and ist.event_timestamp > last_agent_ts)) - if should_emit_terminal_step: - step_extra = _atif_step_extra_model_from_ist(ist) - extra = step_extra.model_dump(exclude_none=True) - atif_steps.append( - ATIFStep( - step_id=step_id, - source="agent", - message=final_output, - timestamp=_epoch_to_iso(ist.event_timestamp), - extra=extra or None, - )) - step_id += 1 - continue - if event_type == IntermediateStepType.LLM_END: - _flush_pending() - llm_output = "" - if ist.data and ist.data.output is not None: - llm_output = _safe_str(ist.data.output) - metrics = _extract_metrics(ist) - if metrics: - total_prompt += metrics.prompt_tokens or 0 - total_completion += metrics.completion_tokens or 0 - total_cached += metrics.cached_tokens or 0 - if not tool_defs_captured: - defs = _extract_tool_definitions(ist) - if defs: - agent_config.tool_definitions = defs - tool_defs_captured = True + for ist in sorted_steps: + if ist.event_type == IntermediateStepType.LLM_END: if ist.name and not agent_config.model_name: agent_config.model_name = ist.name - pending = _PendingAgentTurn( - message=llm_output, - timestamp=ist.event_timestamp, - model_name=ist.name, - metrics=metrics, - ) - pending.ancestry = _atif_ancestry_from_ist(ist) - pending.invocation = _atif_invocation_from_ist(ist) - continue - - if event_type == IntermediateStepType.TOOL_END: - start_flag = delegation_flags_by_uuid.pop(ist.UUID, False) - if pending is not None: - _record_observed_invocation(pending, ist, start_flag=start_flag) - else: - orphan_pending = _PendingAgentTurn(message="", - timestamp=ist.event_timestamp, - model_name=None, - metrics=None) - orphan_pending.ancestry = _atif_ancestry_from_ist(ist) - _record_observed_invocation(orphan_pending, ist, start_flag=start_flag) - if not orphan_pending.observed_invocations: - continue - invocation = orphan_pending.observed_invocations[0] - step_extra = _atif_step_extra_model_from_ist(ist) - step_extra.tool_invocations = [invocation.invocation] - step_extra.tool_ancestry = [invocation.ancestry] - observation_results = _build_observation_results( - [invocation], - trajectory_session_id, - allow_implicit_delegation=self._allow_implicit_subagent_delegation, - ) - extra = step_extra.model_dump(exclude_none=True) - atif_steps.append( - ATIFStep( - step_id=step_id, - source="agent", - message="", - timestamp=_epoch_to_iso(ist.event_timestamp), - tool_calls=[invocation.tool_call], - observation=ATIFObservation(results=observation_results), - extra=extra or None, - )) - step_id += 1 - continue - - if event_type == IntermediateStepType.FUNCTION_END: - start_flag = delegation_flags_by_uuid.pop(ist.UUID, False) - if pending is not None: - _record_observed_invocation(pending, ist, start_flag=start_flag) - else: - orphan_pending = _PendingAgentTurn(message="", - timestamp=ist.event_timestamp, - model_name=None, - metrics=None) - orphan_pending.ancestry = _atif_ancestry_from_ist(ist) - _record_observed_invocation(orphan_pending, ist, start_flag=start_flag) - if not orphan_pending.observed_invocations: - continue - invocation = orphan_pending.observed_invocations[0] - step_extra = _atif_step_extra_model_from_ist(ist) - step_extra.tool_invocations = [invocation.invocation] - step_extra.tool_ancestry = [invocation.ancestry] - observation_results = _build_observation_results( - [invocation], - trajectory_session_id, - allow_implicit_delegation=self._allow_implicit_subagent_delegation, - ) - extra = step_extra.model_dump(exclude_none=True) - atif_steps.append( - ATIFStep( - step_id=step_id, - source="agent", - message="", - timestamp=_epoch_to_iso(ist.event_timestamp), - tool_calls=[invocation.tool_call], - observation=ATIFObservation(results=observation_results), - extra=extra or None, - )) - step_id += 1 - continue - - if state == IntermediateStepState.START: - continue - if event_type == IntermediateStepType.LLM_NEW_TOKEN: - continue - if event_type == IntermediateStepType.SPAN_CHUNK: - continue - - if state == IntermediateStepState.END and category not in ( - IntermediateStepCategory.LLM, - IntermediateStepCategory.TOOL, - IntermediateStepCategory.WORKFLOW, - ): - continue + defs = _extract_tool_definitions(ist) + if defs and not agent_config.tool_definitions: + agent_config.tool_definitions = defs + + execution_structure = _pass1_build_execution_structure(sorted_steps, session_id=trajectory_session_id) + atif_steps, _, total_prompt, total_completion, total_cached = _pass2_project_context_to_steps( + execution_structure.root_events, + step_id_start=1, + include_workflow_start_user_step=True, + subagent_ref_by_call_id=execution_structure.subagent_ref_by_call_id, + ) - _flush_pending() + child_trajectories: dict[str, dict[str, Any]] = {} + for child_session_id, child_events in execution_structure.child_events_by_session.items(): + child_steps, _, _, _, _ = _pass2_project_context_to_steps( + child_events, + step_id_start=1, + include_workflow_start_user_step=False, + subagent_ref_by_call_id=None, + ) + child_trajectory = ATIFTrajectory( + session_id=child_session_id, + agent=agent_config.model_copy(deep=True), + steps=child_steps, + ) + child_trajectories[child_session_id] = child_trajectory.model_dump(exclude_none=True, mode="json") final_metrics = None agent_step_count = sum(1 for s in atif_steps if s.source == "agent") @@ -577,11 +582,13 @@ def _flush_pending() -> None: total_steps=agent_step_count, ) + trajectory_extra = {"subagent_trajectories": child_trajectories} if child_trajectories else None return ATIFTrajectory( session_id=trajectory_session_id, agent=agent_config, steps=atif_steps, final_metrics=final_metrics, + extra=trajectory_extra, ) @@ -591,254 +598,53 @@ def _flush_pending() -> None: class ATIFStreamConverter: - """Stateful converter that emits ATIF steps incrementally.""" + """Stateful converter that reuses the same two-pass conversion model. + + This stream adapter accumulates IST events and re-projects ATIF using the + same batch converter logic on each push. It keeps behavior consistent + between batch and stream conversion paths. + """ def __init__(self, agent_name: str = "nat-agent", *, allow_implicit_subagent_delegation: bool = False): - self._step_id: int = 1 self._session_id: str = str(uuid.uuid4()) - self._agent_config = ATIFAgentConfig(name=agent_name, version="0.0.0") - self._allow_implicit_subagent_delegation = allow_implicit_subagent_delegation - self._tool_defs_captured = False - self._pending: _PendingAgentTurn | None = None - self._emitted_steps: list[ATIFStep] = [] - self._total_prompt = 0 - self._total_completion = 0 - self._total_cached = 0 - self._delegation_flags_by_uuid: dict[str, bool] = {} + self._agent_name = agent_name + self._buffered_events: list[IntermediateStep] = [] + self._last_projected_steps: list[ATIFStep] = [] + self._returned_step_count = 0 + self._latest_trajectory = ATIFTrajectory( + session_id=self._session_id, + agent=ATIFAgentConfig(name=agent_name, version="0.0.0"), + steps=[], + ) + self._batch_converter = IntermediateStepToATIFConverter( + allow_implicit_subagent_delegation=allow_implicit_subagent_delegation) @property def agent_config(self) -> ATIFAgentConfig: - """Current agent configuration (populated as steps arrive).""" - return self._agent_config + """Current agent configuration based on latest projection.""" + return self._latest_trajectory.agent def push(self, ist: IntermediateStep) -> ATIFStep | None: - """Process one IntermediateStep and return a flushed ATIF step if available.""" - event_type = ist.event_type - category = ist.event_category - state = ist.event_state - - if state == IntermediateStepState.START and _extract_subagent_delegation_flag(ist.metadata): - self._delegation_flags_by_uuid[ist.UUID] = True - - if event_type == IntermediateStepType.WORKFLOW_START: - user_input = "" - if ist.data and ist.data.input is not None: - user_input = _extract_user_input(ist.data.input) - fn_name = ist.function_ancestry.function_name - if fn_name and fn_name != "root": - self._agent_config.name = fn_name - step_extra = _atif_step_extra_model_from_ist(ist) - extra = step_extra.model_dump(exclude_none=True) - step = ATIFStep( - step_id=self._step_id, - source="user", - message=user_input, - timestamp=_epoch_to_iso(ist.event_timestamp), - extra=extra or None, - ) - self._step_id += 1 - self._emitted_steps.append(step) + """Add one IST event and return next newly visible ATIF step.""" + self._buffered_events.append(ist) + self._latest_trajectory = self._batch_converter.convert( + steps=self._buffered_events, + session_id=self._session_id, + agent_name=self._agent_name, + ) + self._last_projected_steps = list(self._latest_trajectory.steps) + if self._returned_step_count < len(self._last_projected_steps): + step = self._last_projected_steps[self._returned_step_count] + self._returned_step_count += 1 return step - - if event_type == IntermediateStepType.WORKFLOW_END: - results: list[ATIFStep] = [] - flushed = self._flush_pending() - if flushed: - results.append(flushed) - final_output = "" - if ist.data and ist.data.output is not None: - final_output = _safe_str(ist.data.output) - last_agent_msg = "" - last_agent_ts: float | None = None - for s in reversed(self._emitted_steps): - if s.source == "agent": - last_agent_msg = str(s.message) - last_agent_ts = _iso_to_epoch(s.timestamp) if s.timestamp else None - break - should_emit_terminal_step = bool(final_output) and (final_output != last_agent_msg or - (last_agent_ts is not None - and ist.event_timestamp > last_agent_ts)) - if should_emit_terminal_step: - step_extra = _atif_step_extra_model_from_ist(ist) - extra = step_extra.model_dump(exclude_none=True) - final_step = ATIFStep( - step_id=self._step_id, - source="agent", - message=final_output, - timestamp=_epoch_to_iso(ist.event_timestamp), - extra=extra or None, - ) - self._step_id += 1 - self._emitted_steps.append(final_step) - results.append(final_step) - return results[0] if results else None - - if event_type == IntermediateStepType.LLM_END: - flushed = self._flush_pending() - llm_output = "" - if ist.data and ist.data.output is not None: - llm_output = _safe_str(ist.data.output) - metrics = _extract_metrics(ist) - if metrics: - self._total_prompt += metrics.prompt_tokens or 0 - self._total_completion += metrics.completion_tokens or 0 - self._total_cached += metrics.cached_tokens or 0 - if not self._tool_defs_captured: - defs = _extract_tool_definitions(ist) - if defs: - self._agent_config.tool_definitions = defs - self._tool_defs_captured = True - if ist.name and not self._agent_config.model_name: - self._agent_config.model_name = ist.name - self._pending = _PendingAgentTurn( - message=llm_output, - timestamp=ist.event_timestamp, - model_name=ist.name, - metrics=metrics, - ) - self._pending.ancestry = _atif_ancestry_from_ist(ist) - self._pending.invocation = _atif_invocation_from_ist(ist) - return flushed - - if event_type == IntermediateStepType.TOOL_END: - start_flag = self._delegation_flags_by_uuid.pop(ist.UUID, False) - if self._pending is not None: - _record_observed_invocation(self._pending, ist, start_flag=start_flag) - return None - - orphan_pending = _PendingAgentTurn(message="", timestamp=ist.event_timestamp, model_name=None, metrics=None) - orphan_pending.ancestry = _atif_ancestry_from_ist(ist) - _record_observed_invocation(orphan_pending, ist, start_flag=start_flag) - if not orphan_pending.observed_invocations: - return None - invocation = orphan_pending.observed_invocations[0] - step_extra = _atif_step_extra_model_from_ist(ist) - step_extra.tool_invocations = [invocation.invocation] - step_extra.tool_ancestry = [invocation.ancestry] - observation_results = _build_observation_results( - [invocation], - self._session_id, - allow_implicit_delegation=self._allow_implicit_subagent_delegation, - ) - extra = step_extra.model_dump(exclude_none=True) - orphan_step = ATIFStep( - step_id=self._step_id, - source="agent", - message="", - timestamp=_epoch_to_iso(ist.event_timestamp), - tool_calls=[invocation.tool_call], - observation=ATIFObservation(results=observation_results), - extra=extra or None, - ) - self._step_id += 1 - self._emitted_steps.append(orphan_step) - return orphan_step - - if event_type == IntermediateStepType.FUNCTION_END: - start_flag = self._delegation_flags_by_uuid.pop(ist.UUID, False) - if self._pending is not None: - _record_observed_invocation(self._pending, ist, start_flag=start_flag) - return None - - orphan_pending = _PendingAgentTurn(message="", timestamp=ist.event_timestamp, model_name=None, metrics=None) - orphan_pending.ancestry = _atif_ancestry_from_ist(ist) - _record_observed_invocation(orphan_pending, ist, start_flag=start_flag) - if not orphan_pending.observed_invocations: - return None - invocation = orphan_pending.observed_invocations[0] - step_extra = _atif_step_extra_model_from_ist(ist) - step_extra.tool_invocations = [invocation.invocation] - step_extra.tool_ancestry = [invocation.ancestry] - observation_results = _build_observation_results( - [invocation], - self._session_id, - allow_implicit_delegation=self._allow_implicit_subagent_delegation, - ) - extra = step_extra.model_dump(exclude_none=True) - orphan_step = ATIFStep( - step_id=self._step_id, - source="agent", - message="", - timestamp=_epoch_to_iso(ist.event_timestamp), - tool_calls=[invocation.tool_call], - observation=ATIFObservation(results=observation_results), - extra=extra or None, - ) - self._step_id += 1 - self._emitted_steps.append(orphan_step) - return orphan_step - - if state == IntermediateStepState.END and category not in ( - IntermediateStepCategory.LLM, - IntermediateStepCategory.TOOL, - IntermediateStepCategory.WORKFLOW, - ): - return None - return None def finalize(self) -> list[ATIFStep]: - """Flush any pending agent turn and return remaining steps.""" - result: list[ATIFStep] = [] - flushed = self._flush_pending() - if flushed: - result.append(flushed) - return result + """Return all projected ATIF steps not yet returned by `push`.""" + remaining = self._last_projected_steps[self._returned_step_count:] + self._returned_step_count = len(self._last_projected_steps) + return remaining def get_trajectory(self) -> ATIFTrajectory: - """Build the complete ATIF trajectory from all emitted steps.""" - agent_step_count = sum(1 for s in self._emitted_steps if s.source == "agent") - final_metrics = None - if self._total_prompt or self._total_completion or self._total_cached or agent_step_count: - final_metrics = ATIFFinalMetrics( - total_prompt_tokens=self._total_prompt or None, - total_completion_tokens=self._total_completion or None, - total_cached_tokens=self._total_cached or None, - total_steps=agent_step_count, - ) - return ATIFTrajectory( - session_id=self._session_id, - agent=self._agent_config, - steps=list(self._emitted_steps), - final_metrics=final_metrics, - ) - - def _flush_pending(self) -> ATIFStep | None: - """Convert the pending turn into an ATIFStep and clear it.""" - if self._pending is None: - return None - pending = self._pending - sorted_invocations = sorted(pending.observed_invocations, key=lambda i: i.order_key) - tool_calls = [obs.tool_call for obs in sorted_invocations] or None - observations = _build_observation_results( - sorted_invocations, - self._session_id, - allow_implicit_delegation=self._allow_implicit_subagent_delegation, - ) - observation = ATIFObservation(results=observations) if observations else None - tool_ancestry = [obs.ancestry for obs in sorted_invocations] - tool_invocations = [obs.invocation for obs in sorted_invocations] or None - if pending.ancestry is None: - raise ValueError("Pending agent turn is missing required ATIF ancestry metadata") - step_extra = AtifStepExtra( - ancestry=pending.ancestry, - invocation=pending.invocation, - tool_ancestry=tool_ancestry, - tool_invocations=tool_invocations, - **pending.extra, - ) - step = ATIFStep( - step_id=self._step_id, - source="agent", - message=pending.message, - timestamp=_epoch_to_iso(pending.timestamp), - model_name=pending.model_name, - tool_calls=tool_calls, - observation=observation, - metrics=pending.metrics, - extra=step_extra.model_dump(exclude_none=True), - ) - self._step_id += 1 - self._emitted_steps.append(step) - self._pending = None - return step + """Return trajectory projected from all buffered IST events.""" + return self._latest_trajectory diff --git a/packages/nvidia_nat_core/tests/nat/utils/test_atif_converter.py b/packages/nvidia_nat_core/tests/nat/utils/test_atif_converter.py index 1c598fc2cf..99240e33c9 100644 --- a/packages/nvidia_nat_core/tests/nat/utils/test_atif_converter.py +++ b/packages/nvidia_nat_core/tests/nat/utils/test_atif_converter.py @@ -22,6 +22,7 @@ from nat.builder.framework_enum import LLMFrameworkEnum from nat.data_models.intermediate_step import IntermediateStep from nat.data_models.intermediate_step import IntermediateStepPayload +from nat.data_models.intermediate_step import TraceMetadata from nat.data_models.intermediate_step import IntermediateStepType from nat.data_models.intermediate_step import StreamEventData from nat.data_models.intermediate_step import UsageInfo @@ -977,11 +978,23 @@ def test_implicit_subagent_delegation_is_disabled_by_default(self): assert first_agent_turn.observation.results[0].subagent_trajectory_ref is None def test_implicit_subagent_delegation_can_be_enabled(self): - """Implicit delegation inference can be enabled explicitly.""" + """Explicit delegation marker emits subagent refs and embedded child trajectory.""" converter = IntermediateStepToATIFConverter(allow_implicit_subagent_delegation=True) + wrapper_uuid = "wrapper-step" + wrapper_start = _make_step( + IntermediateStepType.FUNCTION_START, + name="child_agent", + timestamp_offset=1.5, + function_name="parent_agent", + function_id="wrapper-fn", + function_parent_id="root", + step_uuid=wrapper_uuid, + ) + wrapper_start.payload.metadata = TraceMetadata(provided_metadata={"is_subagent_delegation": True}) steps = [ _make_step(IntermediateStepType.WORKFLOW_START, input_data="q", timestamp_offset=0.0), _make_step(IntermediateStepType.LLM_END, output_data="thinking", timestamp_offset=1.0), + wrapper_start, _make_step( IntermediateStepType.FUNCTION_END, name="child_agent", @@ -989,15 +1002,24 @@ def test_implicit_subagent_delegation_can_be_enabled(self): function_name="parent_agent", function_id="wrapper-fn", function_parent_id="root", - step_uuid="wrapper-step", + step_uuid=wrapper_uuid, + ), + _make_step( + IntermediateStepType.LLM_END, + name="child-model", + output_data="child thinking", + timestamp_offset=2.1, + function_name="child_agent", + function_id="child-root-fn", + function_parent_id="wrapper-fn", ), _make_step( IntermediateStepType.FUNCTION_END, name="inner_tool", - timestamp_offset=3.0, + timestamp_offset=2.2, function_name="child_agent", function_id="child-fn", - function_parent_id="wrapper-fn", + function_parent_id="child-root-fn", step_uuid="child-step", ), _make_step(IntermediateStepType.LLM_END, output_data="done", timestamp_offset=4.0), @@ -1010,6 +1032,12 @@ def test_implicit_subagent_delegation_can_be_enabled(self): refs = first_agent_turn.observation.results[0].subagent_trajectory_ref assert refs is not None assert len(refs) == 1 + child_sid = refs[0].session_id + assert result.extra is not None + assert "subagent_trajectories" in result.extra + embedded = result.extra["subagent_trajectories"] + assert isinstance(embedded, dict) + assert child_sid in embedded # --------------------------------------------------------------------------- @@ -1021,177 +1049,38 @@ class TestStreamConverter: """Tests for ATIFStreamConverter.""" def test_workflow_start_emits_user_step(self): - """WORKFLOW_START produces an immediate user step.""" - converter = ATIFStreamConverter() - step = _make_step( - IntermediateStepType.WORKFLOW_START, - input_data="hello", - timestamp_offset=0.0, - ) - result = converter.push(step) - assert result is not None - assert result.source == "user" - assert result.message == "hello" - - def test_llm_end_flushes_previous_turn(self): - """Second LLM_END flushes the first turn.""" - converter = ATIFStreamConverter() - converter.push(_make_step( - IntermediateStepType.WORKFLOW_START, - input_data="q", - timestamp_offset=0.0, - )) - # First LLM_END → creates pending, nothing to flush yet - result1 = converter.push( - _make_step( - IntermediateStepType.LLM_END, - name="gpt-4", - output_data="thinking...", - timestamp_offset=1.0, - )) - assert result1 is None # Nothing flushed yet - - # Second LLM_END → flushes the first turn - result2 = converter.push( - _make_step( - IntermediateStepType.LLM_END, - name="gpt-4", - output_data="done", - timestamp_offset=2.0, - )) - assert result2 is not None - assert result2.source == "agent" - assert result2.message == "thinking..." - - def test_tool_end_attaches_to_pending(self): - """TOOL_END attaches to the current pending agent turn.""" + """`WORKFLOW_START` emits the first projected user step.""" converter = ATIFStreamConverter() - converter.push(_make_step( - IntermediateStepType.WORKFLOW_START, - input_data="q", - timestamp_offset=0.0, - )) - converter.push(_make_step( - IntermediateStepType.LLM_END, - output_data="let me search", - timestamp_offset=1.0, - )) result = converter.push( _make_step( - IntermediateStepType.TOOL_END, - name="search", - input_data={"query": "test"}, - output_data="found it", - timestamp_offset=2.0, - step_uuid="tool-1", - )) - # Tool attaches to pending, doesn't emit yet - assert result is None - - # Finalize flushes - remaining = converter.finalize() - assert len(remaining) == 1 - flushed = remaining[0] - assert flushed.tool_calls is not None - assert len(flushed.tool_calls) == 1 - assert flushed.tool_calls[0].function_name == "search" - assert flushed.observation.results[0].content == "found it" - - def test_stream_converter_emits_orphan_tool_end(self): - """Orphan `TOOL_END` emits an immediate standalone agent step.""" - converter = ATIFStreamConverter() - converter.push(_make_step( - IntermediateStepType.WORKFLOW_START, - input_data="q", - timestamp_offset=0.0, - )) - orphan = converter.push( - _make_step( - IntermediateStepType.TOOL_END, - name="search", - input_data='{"query": "orphan"}', - output_data="found orphan", - timestamp_offset=1.0, - step_uuid="stream-orphan-tool-1", + IntermediateStepType.WORKFLOW_START, + input_data="hello", + timestamp_offset=0.0, )) - assert orphan is not None - assert orphan.source == "agent" - assert orphan.message == "" - assert orphan.tool_calls is not None - assert len(orphan.tool_calls) == 1 - assert orphan.observation is not None - assert len(orphan.observation.results) == 1 - assert orphan.observation.results[0].source_call_id == orphan.tool_calls[0].tool_call_id - - def test_stream_converter_populates_tool_definitions_from_llm_metadata(self): - """`LLM_END` metadata tool schemas populate stream converter agent config.""" - from nat.data_models.intermediate_step import ToolDetails - from nat.data_models.intermediate_step import ToolParameters - from nat.data_models.intermediate_step import ToolSchema - from nat.data_models.intermediate_step import TraceMetadata + assert result is not None + assert result.source == "user" + assert result.message == "hello" + def test_finalize_returns_unreturned_steps(self): + """`finalize` returns projected steps not returned by `push`.""" converter = ATIFStreamConverter() - converter.push(_make_step( - IntermediateStepType.WORKFLOW_START, - input_data="q", - timestamp_offset=0.0, - )) - llm_end = _make_step( - IntermediateStepType.LLM_END, - name="gpt-4", - output_data="using tools", - timestamp_offset=1.0, - ) - llm_end.payload.metadata = TraceMetadata(tools_schema=[ - ToolSchema( - type="function", - function=ToolDetails( - name="weather", - description="Get weather", - parameters=ToolParameters(properties={}), - ), - ) - ]) - - pushed = converter.push(llm_end) - assert pushed is None - assert converter.agent_config.tool_definitions is not None - assert len(converter.agent_config.tool_definitions) == 1 - assert converter.agent_config.tool_definitions[0]["function"]["name"] == "weather" - - def test_finalize_flushes_pending(self): - """finalize() returns any remaining pending turn.""" - converter = ATIFStreamConverter() - converter.push(_make_step( - IntermediateStepType.WORKFLOW_START, - input_data="q", - timestamp_offset=0.0, - )) - converter.push(_make_step( - IntermediateStepType.LLM_END, - output_data="answer", - timestamp_offset=1.0, - )) + converter.push(_make_step(IntermediateStepType.WORKFLOW_START, input_data="q", timestamp_offset=0.0)) + converter.push(_make_step(IntermediateStepType.LLM_END, output_data="thinking", timestamp_offset=1.0)) remaining = converter.finalize() assert len(remaining) == 1 - assert remaining[0].message == "answer" - - def test_finalize_empty_when_nothing_pending(self): - """finalize() returns empty list if no pending turn.""" - converter = ATIFStreamConverter() - assert converter.finalize() == [] + assert remaining[0].source == "agent" + assert remaining[0].message == "thinking" def test_get_trajectory_builds_complete( self, simple_trajectory: list[IntermediateStep], ): - """get_trajectory() returns a complete trajectory after all steps.""" + """`get_trajectory` returns the latest projected trajectory.""" converter = ATIFStreamConverter() for ist in simple_trajectory: converter.push(ist) converter.finalize() trajectory = converter.get_trajectory() - assert isinstance(trajectory, ATIFTrajectory) assert trajectory.schema_version == "ATIF-v1.6" assert len(trajectory.steps) >= 2 @@ -1202,18 +1091,14 @@ def test_stream_matches_batch( simple_trajectory: list[IntermediateStep], batch_converter: IntermediateStepToATIFConverter, ): - """Stream converter produces the same steps as batch converter.""" + """Stream converter uses the same two-pass projection as batch.""" batch_result = batch_converter.convert(simple_trajectory, session_id="test") - stream_conv = ATIFStreamConverter() for ist in simple_trajectory: stream_conv.push(ist) stream_conv.finalize() stream_result = stream_conv.get_trajectory() - assert len(stream_result.steps) == len(batch_result.steps) for s_step, b_step in zip(stream_result.steps, batch_result.steps, strict=True): assert s_step.source == b_step.source assert s_step.message == b_step.message - if b_step.tool_calls: - assert len(s_step.tool_calls) == len(b_step.tool_calls) diff --git a/packages/nvidia_nat_eval/src/nat/plugins/eval/runtime/atif_adapter.py b/packages/nvidia_nat_eval/src/nat/plugins/eval/runtime/atif_adapter.py index cefdf8d649..017e9f4903 100644 --- a/packages/nvidia_nat_eval/src/nat/plugins/eval/runtime/atif_adapter.py +++ b/packages/nvidia_nat_eval/src/nat/plugins/eval/runtime/atif_adapter.py @@ -96,7 +96,8 @@ def _collect_subtree_function_ids(trajectory: ATIFTrajectory, root_function_id: return subtree_ids @staticmethod - def _build_subtrajectory(trajectory: ATIFTrajectory, *, session_id: str, root_function_id: str) -> ATIFTrajectory | None: + def _build_subtrajectory(trajectory: ATIFTrajectory, *, session_id: str, + root_function_id: str) -> ATIFTrajectory | None: """Build a child trajectory view for a delegated function subtree.""" subtree_ids = EvalAtifAdapter._collect_subtree_function_ids(trajectory, root_function_id) if not subtree_ids: @@ -125,6 +126,18 @@ def _build_subtrajectory(trajectory: ATIFTrajectory, *, session_id: str, root_fu def _build_subagent_trajectory_map(self, trajectory: ATIFTrajectory, *, item_id: Any) -> dict[str, ATIFTrajectory]: """Build in-memory session_id -> trajectory map for subagent references.""" + extra = trajectory.extra if isinstance(trajectory.extra, dict) else None + embedded = extra.get("subagent_trajectories") if extra else None + if isinstance(embedded, Mapping): + parsed: dict[str, ATIFTrajectory] = {} + for sid, value in embedded.items(): + try: + parsed[str(sid)] = value if isinstance(value, ATIFTrajectory) else ATIFTrajectory.model_validate(value) + except Exception: + logger.warning("Invalid embedded subagent trajectory for item_id=%s session_id=%s", item_id, sid) + if parsed: + return parsed + by_session_id: dict[str, ATIFTrajectory] = {} unresolved: list[str] = [] diff --git a/packages/nvidia_nat_eval/tests/eval/test_atif_adapter.py b/packages/nvidia_nat_eval/tests/eval/test_atif_adapter.py index e8a277b8a2..ba4aab4543 100644 --- a/packages/nvidia_nat_eval/tests/eval/test_atif_adapter.py +++ b/packages/nvidia_nat_eval/tests/eval/test_atif_adapter.py @@ -81,43 +81,44 @@ def test_build_samples_populates_in_memory_subagent_map(): item = _make_eval_input_item("sample-b") eval_input = EvalInput(eval_input_items=[item]) prebuilt = ATIFTrajectory.model_validate({ - "session_id": "sample-b", + "session_id": + "sample-b", "agent": { "name": "prebuilt-agent", "version": "0.0.0", }, - "steps": [ - { - "step_id": 1, - "source": "agent", - "message": "", - "tool_calls": [{ - "tool_call_id": "call_parent", - "function_name": "delegator", - "arguments": {}, - }], - "observation": { - "results": [{ - "source_call_id": "call_parent", - "content": "delegated", - "subagent_trajectory_ref": [{ - "session_id": "sample-b:call_child", - "trajectory_path": None, - "extra": { - "child_function_id": "child-fn", - }, - }], + "steps": [{ + "step_id": 1, + "source": "agent", + "message": "", + "tool_calls": [{ + "tool_call_id": "call_parent", + "function_name": "delegator", + "arguments": {}, + }], + "observation": { + "results": [{ + "source_call_id": + "call_parent", + "content": + "delegated", + "subagent_trajectory_ref": [{ + "session_id": "sample-b:call_child", + "trajectory_path": None, + "extra": { + "child_function_id": "child-fn", + }, }], + }], + }, + "extra": { + "ancestry": { + "function_id": "child-fn", + "parent_id": "parent-fn", + "function_name": "delegator", }, - "extra": { - "ancestry": { - "function_id": "child-fn", - "parent_id": "parent-fn", - "function_name": "delegator", - }, - }, - } - ], + }, + }], }) samples = adapter.build_samples(eval_input, prebuilt_trajectories={"sample-b": prebuilt}) From afc79aec951776fb627c33b83ea9edd29dbc244b Mon Sep 17 00:00:00 2001 From: Anuradha Karuppiah <26330987+AnuradhaKaruppiah@users.noreply.github.com> Date: Wed, 8 Apr 2026 10:28:27 -0700 Subject: [PATCH 04/13] Update IST to ATIF mapping guidelines Signed-off-by: Anuradha Karuppiah <26330987+AnuradhaKaruppiah@users.noreply.github.com> --- .../intermediate-step-to-atif-mapping.md | 145 ++++++++++-------- 1 file changed, 85 insertions(+), 60 deletions(-) diff --git a/packages/nvidia_nat_atif/intermediate-step-to-atif-mapping.md b/packages/nvidia_nat_atif/intermediate-step-to-atif-mapping.md index 30a0d53974..98516b61e6 100644 --- a/packages/nvidia_nat_atif/intermediate-step-to-atif-mapping.md +++ b/packages/nvidia_nat_atif/intermediate-step-to-atif-mapping.md @@ -18,72 +18,97 @@ limitations under the License. # IntermediateStep to ATIF Mapping -This document explains how IntermediateStep event streams are mapped to ATIF trajectories. - -It is intentionally generic and applies to the current conversion path used by the toolkit. - -## ID Mappings +This document defines the current conversion contract from NAT `IntermediateStep` +events to ATIF trajectories. + +The conversion is intentionally split into two passes: + +1. **Pass 1: build execution structure** + - Determine root context ownership and delegated child context ownership. + - Build subagent linkage metadata from explicit delegation markers. +2. **Pass 2: project contexts to ATIF** + - Convert each context into ATIF steps using deterministic step window rules. + - Emit flat tool call and observation rows with lineage metadata for nesting reconstruction. + +The converter code implementing this model lives in: +`packages/nvidia_nat_core/src/nat/utils/atif_converter.py`, with key entry points: +`_pass1_build_execution_structure()` and `_pass2_project_context_to_steps()`. + +## Pass 1: Execution Structure Rules + +- Events are ordered by `event_timestamp`. +- Context ownership is computed from callable lineage and explicit delegation markers. +- Delegation boundaries are explicit only: + - marker source: `metadata.provided_metadata["is_subagent_delegation"] == true` + - no implicit delegation inference in the clean mapping path. +- Pass 1 outputs: + - `root_events` + - `child_events_by_session` + - `subagent_ref_by_call_id` + +## Pass 2: Context Projection Rules + +- Each context is projected independently into ATIF steps. +- `WORKFLOW_START` can emit a `source="user"` step for the root context. +- Step windows are anchored by terminal events: + - `LLM_END` + - `TOOL_END` + - `FUNCTION_END` + - `WORKFLOW_END` (terminal output only when not redundant) +- Tool rows are flat by design: + - nested tool calls are kept in `tool_calls`, `observation.results`, + `extra.tool_ancestry`, and `extra.tool_invocations` + - subscribers reconstruct nesting using ancestry `function_id` and `parent_id`. +- Chunk events (`LLM_NEW_TOKEN`, `SPAN_CHUNK`) do not emit standalone ATIF steps. + +## Subagent Trajectory Rules + +- Parent observation rows include `subagent_trajectory_ref` only for explicitly + marked delegating calls. +- Child trajectories are built from delegated context events only. +- Embedded children are stored under: + - `trajectory.extra["subagent_trajectories"]` +- Reference resolution precedence: + 1. `subagent_trajectory_ref.trajectory_path` + 2. `trajectory.extra.subagent_trajectories[session_id]` + +## ID and Lineage Mapping | IntermediateStep | ATIF | Mapping Rule | Notes | |---|---|---|---| | `payload.UUID` | `tool_calls[*].tool_call_id` | `tool_call_id = "call_" + payload.UUID` | Invocation occurrence identity | -| `payload.UUID` | `observation.results[*].source_call_id` | `source_call_id = tool_call_id` | Observation links to invocation | -| `payload.UUID` | `extra.tool_invocations[*].invocation_id` | `invocation_id = tool_call_id` | Timing row identity for same invocation | +| `payload.UUID` | `observation.results[*].source_call_id` | `source_call_id = tool_call_id` | Observation-to-call link | +| `payload.UUID` | `extra.tool_invocations[*].invocation_id` | `invocation_id = tool_call_id` | Invocation timing row identity | | `function_ancestry.function_id` | `extra.tool_ancestry[*].function_id` | Direct match | Callable lineage node identity | -| `function_ancestry.parent_id` | `extra.tool_ancestry[*].parent_id` | Direct match | Parent callable lineage node identity | -| `function_ancestry.function_id` (step context) | `extra.ancestry.function_id` | Direct match | Step-level callable context | -| Not applicable | `step_id` | Generated ATIF sequence counter | Not derived from IntermediateStep UUID | +| `function_ancestry.parent_id` | `extra.tool_ancestry[*].parent_id` | Direct match | Lineage parent node identity | +| `function_ancestry.function_id` (step anchor) | `extra.ancestry.function_id` | Direct match | Step context identity | +| Not applicable | `step_id` | Generated sequentially from 1 | Not derived from UUID | -## Name Mappings +## Name Mapping | IntermediateStep | ATIF | Mapping Rule | Notes | |---|---|---|---| -| `payload.name` (tool, function, or LLM by event type) | `tool_calls[*].function_name` or `model_name` | Context dependent | IntermediateStep name is polymorphic by event type | -| `function_ancestry.function_name` | `extra.tool_ancestry[*].function_name` | Direct match | Callable lineage node name | -| `function_ancestry.parent_name` | `extra.tool_ancestry[*].parent_name` | Direct match | Parent callable lineage node name | -| `function_ancestry.function_name` (step context) | `extra.ancestry.function_name` | Direct match | Step-level lineage name | - -## Event-to-Step Mapping - -- `WORKFLOW_START` maps to an ATIF user step (`source = "user"`). -- `LLM_END` starts an ATIF agent turn candidate step. -- `TOOL_END` and `FUNCTION_END` are accumulated into the pending ATIF step as observed invocations. -- `WORKFLOW_END` may emit a terminal ATIF agent step when final output is present and not redundant. -- `LLM_NEW_TOKEN` and other non-terminal chunk events are not directly emitted as standalone ATIF steps. - -## Identity Semantics - -- Invocation identity and callable identity are intentionally different: - - Invocation identity: `tool_call_id`, `source_call_id`, `invocation_id` - - Callable identity: `function_id`, `parent_id` -- Correct lineage interpretation requires both: - - use invocation IDs for per-call correlation, - - use callable IDs for hierarchy and repeated-call disambiguation. - -## Timing Mappings - -- IntermediateStep end events commonly use: - - `event_timestamp` as end timestamp, - - `span_event_timestamp` as start timestamp. -- ATIF timing is represented in: - - `extra.invocation.start_timestamp` and `extra.invocation.end_timestamp` (step-level), - - `extra.tool_invocations[*].start_timestamp` and `extra.tool_invocations[*].end_timestamp` (per invocation). - -## Practical Validation Checklist - -- Verify `tool_call_id == source_call_id == invocation_id` for each ATIF invocation row. -- Verify `tool_call_id == "call_" + payload.UUID` for mapped tool or function end events. -- Verify callable lineage consistency: - - `function_ancestry.function_id <-> extra.tool_ancestry[*].function_id` - - `function_ancestry.parent_id <-> extra.tool_ancestry[*].parent_id` -- Verify name consistency: - - `function_ancestry.function_name <-> extra.tool_ancestry[*].function_name` - - `function_ancestry.parent_name <-> extra.tool_ancestry[*].parent_name` - -## Additional Identifiers Worth Tracking - -- IntermediateStep structural parent linkage: `parent_id` -- Event semantics: `payload.event_type` -- Time surfaces: `event_timestamp`, `span_event_timestamp`, ATIF `timestamp` -- Session scope: ATIF `session_id` -- Framework or provider run IDs when present in metadata (for example, model framework trace IDs) +| `payload.name` on `LLM_END` | `model_name` | Direct match | Model name for agent step | +| `payload.name` on `TOOL_END` or `FUNCTION_END` | `tool_calls[*].function_name` | Direct match | Invocation display name | +| `function_ancestry.function_name` | `extra.tool_ancestry[*].function_name` | Direct match | Callable lineage name | +| `function_ancestry.parent_name` | `extra.tool_ancestry[*].parent_name` | Direct match | Parent lineage name | + +## Timing Mapping + +- `span_event_timestamp` maps to start timestamp surfaces when present. +- `event_timestamp` maps to end timestamp surfaces. +- ATIF timing fields: + - `extra.invocation.start_timestamp` + - `extra.invocation.end_timestamp` + - `extra.tool_invocations[*].start_timestamp` + - `extra.tool_invocations[*].end_timestamp` + +## Validation Checklist + +- `tool_call_id == source_call_id == invocation_id` for each projected invocation row. +- `tool_call_id == "call_" + payload.UUID` for mapped tool and function end events. +- Lineage consistency: + - `function_ancestry.function_id` equals `extra.tool_ancestry[*].function_id` + - `function_ancestry.parent_id` equals `extra.tool_ancestry[*].parent_id` +- Step IDs are sequential and start from 1. +- Every emitted `subagent_trajectory_ref.session_id` resolves by precedence rules. From 1ea50376820950a8e9b66389a14b71cf45cd7dc0 Mon Sep 17 00:00:00 2001 From: Anuradha Karuppiah <26330987+AnuradhaKaruppiah@users.noreply.github.com> Date: Wed, 8 Apr 2026 10:32:08 -0700 Subject: [PATCH 05/13] pre-commit fixes Signed-off-by: Anuradha Karuppiah <26330987+AnuradhaKaruppiah@users.noreply.github.com> --- .../src/nat/utils/atif_converter.py | 19 ++++++++++--------- .../tests/nat/utils/test_atif_converter.py | 2 +- .../nat/plugins/eval/runtime/atif_adapter.py | 3 ++- 3 files changed, 13 insertions(+), 11 deletions(-) diff --git a/packages/nvidia_nat_core/src/nat/utils/atif_converter.py b/packages/nvidia_nat_core/src/nat/utils/atif_converter.py index 7de569f3c3..10b55fc347 100644 --- a/packages/nvidia_nat_core/src/nat/utils/atif_converter.py +++ b/packages/nvidia_nat_core/src/nat/utils/atif_converter.py @@ -266,10 +266,10 @@ def _record_observed_invocation(pending: _PendingAgentTurn, ist: IntermediateSte )) -def _build_flat_observation_rows(observed: list[_ObservedInvocation], - *, - subagent_ref_by_call_id: dict[str, SubagentTrajectoryRef] | None = None - ) -> list[ATIFObservationResult]: +def _build_flat_observation_rows( + observed: list[_ObservedInvocation], + *, + subagent_ref_by_call_id: dict[str, SubagentTrajectoryRef] | None = None) -> list[ATIFObservationResult]: """Build observation rows and attach explicit subagent refs when available.""" results = [ ATIFObservationResult(source_call_id=obs.tool_call.tool_call_id, content=obs.tool_output) for obs in observed @@ -432,8 +432,7 @@ def _flush_pending() -> None: return atif_steps, step_id, total_prompt, total_completion, total_cached -def _pass1_build_execution_structure(sorted_steps: list[IntermediateStep], *, - session_id: str) -> _ExecutionStructure: +def _pass1_build_execution_structure(sorted_steps: list[IntermediateStep], *, session_id: str) -> _ExecutionStructure: """Build root and child context ownership from IST events.""" delegation_flags_by_uuid: dict[str, bool] = {} for ist in sorted_steps: @@ -463,7 +462,7 @@ def _pass1_build_execution_structure(sorted_steps: list[IntermediateStep], *, wrapper_fn_id = wrapper.function_ancestry.function_id direct_children = children_by_parent.get(wrapper_fn_id, []) preferred_roots = [c for c in direct_children if c.function_ancestry.function_name == (wrapper.name or "") - ] or direct_children + ] or direct_children if not preferred_roots: continue child_root = sorted(preferred_roots, key=lambda s: s.event_timestamp)[0] @@ -479,7 +478,9 @@ def _pass1_build_execution_structure(sorted_steps: list[IntermediateStep], *, for child in children_by_parent.get(node, []): frontier.append(child.function_ancestry.function_id) - child_events = [e for e in end_events if e.function_ancestry.function_id in subtree_ids and e.UUID != wrapper.UUID] + child_events = [ + e for e in end_events if e.function_ancestry.function_id in subtree_ids and e.UUID != wrapper.UUID + ] if not child_events: continue child_session_id = f"{session_id}:{wrapper_call_id}" @@ -489,7 +490,7 @@ def _pass1_build_execution_structure(sorted_steps: list[IntermediateStep], *, root_events = [ e for e in sorted_steps - if (e.event_type == IntermediateStepType.WORKFLOW_START or e.event_type == IntermediateStepType.WORKFLOW_END + if (e.event_type in {IntermediateStepType.WORKFLOW_START, IntermediateStepType.WORKFLOW_END} or e.function_ancestry.function_id not in delegated_function_ids) ] subagent_ref_by_call_id = { diff --git a/packages/nvidia_nat_core/tests/nat/utils/test_atif_converter.py b/packages/nvidia_nat_core/tests/nat/utils/test_atif_converter.py index 99240e33c9..9d3ad434a4 100644 --- a/packages/nvidia_nat_core/tests/nat/utils/test_atif_converter.py +++ b/packages/nvidia_nat_core/tests/nat/utils/test_atif_converter.py @@ -22,9 +22,9 @@ from nat.builder.framework_enum import LLMFrameworkEnum from nat.data_models.intermediate_step import IntermediateStep from nat.data_models.intermediate_step import IntermediateStepPayload -from nat.data_models.intermediate_step import TraceMetadata from nat.data_models.intermediate_step import IntermediateStepType from nat.data_models.intermediate_step import StreamEventData +from nat.data_models.intermediate_step import TraceMetadata from nat.data_models.intermediate_step import UsageInfo from nat.data_models.invocation_node import InvocationNode from nat.data_models.token_usage import TokenUsageBaseModel diff --git a/packages/nvidia_nat_eval/src/nat/plugins/eval/runtime/atif_adapter.py b/packages/nvidia_nat_eval/src/nat/plugins/eval/runtime/atif_adapter.py index 017e9f4903..3c598532c6 100644 --- a/packages/nvidia_nat_eval/src/nat/plugins/eval/runtime/atif_adapter.py +++ b/packages/nvidia_nat_eval/src/nat/plugins/eval/runtime/atif_adapter.py @@ -132,7 +132,8 @@ def _build_subagent_trajectory_map(self, trajectory: ATIFTrajectory, *, item_id: parsed: dict[str, ATIFTrajectory] = {} for sid, value in embedded.items(): try: - parsed[str(sid)] = value if isinstance(value, ATIFTrajectory) else ATIFTrajectory.model_validate(value) + parsed[str(sid)] = value if isinstance(value, + ATIFTrajectory) else ATIFTrajectory.model_validate(value) except Exception: logger.warning("Invalid embedded subagent trajectory for item_id=%s session_id=%s", item_id, sid) if parsed: From b827d78fea3de5cc1b7769ae4b285ce0d916e48c Mon Sep 17 00:00:00 2001 From: Anuradha Karuppiah <26330987+AnuradhaKaruppiah@users.noreply.github.com> Date: Wed, 8 Apr 2026 11:06:09 -0700 Subject: [PATCH 06/13] Add a script to convert any workflow_output.json to workflow_output_atif.json Signed-off-by: Anuradha Karuppiah <26330987+AnuradhaKaruppiah@users.noreply.github.com> --- .../src/nat/utils/atif_converter.py | 37 +++++- .../convert_workflow_output_to_atif.py | 121 ++++++++++++++++++ 2 files changed, 153 insertions(+), 5 deletions(-) create mode 100644 packages/nvidia_nat_eval/scripts/convert_workflow_output_to_atif.py diff --git a/packages/nvidia_nat_core/src/nat/utils/atif_converter.py b/packages/nvidia_nat_core/src/nat/utils/atif_converter.py index 10b55fc347..b3648dc095 100644 --- a/packages/nvidia_nat_core/src/nat/utils/atif_converter.py +++ b/packages/nvidia_nat_core/src/nat/utils/atif_converter.py @@ -202,6 +202,17 @@ def _extract_subagent_delegation_flag(metadata: Any) -> bool: return False +def _event_uuid(ist: IntermediateStep) -> str: + """Return stable event UUID from top-level or payload fields.""" + top_level = getattr(ist, "UUID", None) + if top_level: + return str(top_level) + payload_level = getattr(ist.payload, "UUID", None) + if payload_level: + return str(payload_level) + return "" + + # --------------------------------------------------------------------------- # Internal accumulator # --------------------------------------------------------------------------- @@ -253,7 +264,11 @@ def _record_observed_invocation(pending: _PendingAgentTurn, ist: IntermediateSte if ist.data: tool_input = _parse_tool_arguments(ist.data.input) tool_output = _safe_str(ist.data.output) - call_id = f"call_{ist.UUID}" + event_uuid = _event_uuid(ist) + if not event_uuid: + logger.warning("Skipping invocation without UUID for tool=%s", tool_name) + return + call_id = f"call_{event_uuid}" is_subagent_delegation = _extract_subagent_delegation_flag(ist.metadata) or start_flag pending.observed_invocations.append( _ObservedInvocation( @@ -435,9 +450,15 @@ def _flush_pending() -> None: def _pass1_build_execution_structure(sorted_steps: list[IntermediateStep], *, session_id: str) -> _ExecutionStructure: """Build root and child context ownership from IST events.""" delegation_flags_by_uuid: dict[str, bool] = {} + delegated_callable_ids: set[str] = set() for ist in sorted_steps: if ist.event_state == IntermediateStepState.START and _extract_subagent_delegation_flag(ist.metadata): - delegation_flags_by_uuid[ist.UUID] = True + event_uuid = _event_uuid(ist) + if event_uuid: + delegation_flags_by_uuid[event_uuid] = True + function_id = ist.function_ancestry.function_id + if function_id: + delegated_callable_ids.add(function_id) end_events = [s for s in sorted_steps if s.event_state == IntermediateStepState.END] children_by_parent: dict[str, list[IntermediateStep]] = {} @@ -450,7 +471,10 @@ def _pass1_build_execution_structure(sorted_steps: list[IntermediateStep], *, se for e in end_events: if e.event_type not in (IntermediateStepType.TOOL_END, IntermediateStepType.FUNCTION_END): continue - if _extract_subagent_delegation_flag(e.metadata) or delegation_flags_by_uuid.get(e.UUID, False): + event_uuid = _event_uuid(e) + if (_extract_subagent_delegation_flag(e.metadata) + or (event_uuid and delegation_flags_by_uuid.get(event_uuid, False)) + or e.function_ancestry.function_id in delegated_callable_ids): wrapper_events.append(e) child_session_by_wrapper_call_id: dict[str, str] = {} @@ -458,7 +482,10 @@ def _pass1_build_execution_structure(sorted_steps: list[IntermediateStep], *, se delegated_function_ids: set[str] = set() for wrapper in wrapper_events: - wrapper_call_id = f"call_{wrapper.UUID}" + wrapper_uuid = _event_uuid(wrapper) + if not wrapper_uuid: + continue + wrapper_call_id = f"call_{wrapper_uuid}" wrapper_fn_id = wrapper.function_ancestry.function_id direct_children = children_by_parent.get(wrapper_fn_id, []) preferred_roots = [c for c in direct_children if c.function_ancestry.function_name == (wrapper.name or "") @@ -479,7 +506,7 @@ def _pass1_build_execution_structure(sorted_steps: list[IntermediateStep], *, se frontier.append(child.function_ancestry.function_id) child_events = [ - e for e in end_events if e.function_ancestry.function_id in subtree_ids and e.UUID != wrapper.UUID + e for e in end_events if e.function_ancestry.function_id in subtree_ids and _event_uuid(e) != wrapper_uuid ] if not child_events: continue diff --git a/packages/nvidia_nat_eval/scripts/convert_workflow_output_to_atif.py b/packages/nvidia_nat_eval/scripts/convert_workflow_output_to_atif.py new file mode 100644 index 0000000000..51058ffa02 --- /dev/null +++ b/packages/nvidia_nat_eval/scripts/convert_workflow_output_to_atif.py @@ -0,0 +1,121 @@ +#!/usr/bin/env python3 +# SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Convert legacy `workflow_output.json` (IST) into `workflow_output_atif.json`. + +Example: + python packages/nvidia_nat_eval/scripts/convert_workflow_output_to_atif.py \ + --input ".tmp/nat/examples/advanced_agents/alert_triage_agent/output/offline_atif/workflow_output.json" \ + --output-dir ".tmp/nat/examples/advanced_agents/alert_triage_agent/output/offline_atif" +""" + +from __future__ import annotations + +import argparse +import json +from pathlib import Path +from typing import Any + +from nat.data_models.intermediate_step import IntermediateStep +from nat.utils.atif_converter import IntermediateStepToATIFConverter + + +def _load_json(path: Path) -> Any: + return json.loads(path.read_text(encoding="utf-8")) + + +def _iter_items(payload: Any) -> list[dict[str, Any]]: + """Normalize legacy workflow output payload into item dictionaries.""" + if isinstance(payload, list): + return [item for item in payload if isinstance(item, dict) and isinstance(item.get("intermediate_steps"), list)] + if isinstance(payload, dict) and isinstance(payload.get("intermediate_steps"), list): + return [payload] + raise ValueError("Unsupported input JSON shape. Expected item(s) with `intermediate_steps`.") + + +def _session_id_for_item(item_id: Any) -> str: + item_type = type(item_id) + return f"{item_type.__module__}.{item_type.__qualname__}:{item_id!r}" + + +def _convert_item(item: dict[str, Any], + *, + converter: IntermediateStepToATIFConverter, + item_index: int, + agent_name: str | None) -> dict[str, Any]: + """Convert one legacy item with `intermediate_steps` to ATIF sample shape.""" + item_id = item.get("id", item_index) + steps = [IntermediateStep.model_validate(raw_step) for raw_step in item["intermediate_steps"]] + trajectory = converter.convert( + steps=steps, + session_id=_session_id_for_item(item_id), + agent_name=agent_name, + ) + + subagent_trajectories: dict[str, Any] = {} + if isinstance(trajectory.extra, dict): + embedded = trajectory.extra.get("subagent_trajectories") + if isinstance(embedded, dict): + subagent_trajectories = embedded + + return { + "item_id": item_id, + "trajectory": trajectory.model_dump(mode="json"), + "subagent_trajectories": subagent_trajectories, + "expected_output_obj": item.get("answer"), + "output_obj": item.get("generated_answer"), + "metadata": {}, + } + + +def _resolve_output_path(input_path: Path, output_path: Path | None, output_dir: Path | None) -> Path: + if output_path is not None: + return output_path + if output_dir is not None: + return output_dir / "workflow_output_atif.json" + return input_path.with_name("workflow_output_atif.json") + + +def main() -> None: + """Parse arguments and run IST to ATIF conversion.""" + parser = argparse.ArgumentParser( + description="Convert legacy workflow_output.json (IntermediateStep dump) to workflow_output_atif.json") + parser.add_argument("--input", type=Path, required=True, help="Path to workflow_output.json") + parser.add_argument("--output", type=Path, default=None, help="Output file path for workflow_output_atif.json") + parser.add_argument("--output-dir", type=Path, default=None, help="Directory to write workflow_output_atif.json") + parser.add_argument("--agent-name", default=None, help="Optional agent name override") + parser.add_argument("--compact", action="store_true", help="Write compact JSON (no pretty indentation)") + args = parser.parse_args() + + if args.output is not None and args.output_dir is not None: + raise ValueError("Provide only one of `--output` or `--output-dir`.") + + payload = _load_json(args.input) + items = _iter_items(payload) + converter = IntermediateStepToATIFConverter() + converted = [ + _convert_item(item, converter=converter, item_index=i, agent_name=args.agent_name) for i, item in enumerate(items) + ] + + output_path = _resolve_output_path(args.input, args.output, args.output_dir) + output_path.parent.mkdir(parents=True, exist_ok=True) + output_text = json.dumps(converted, ensure_ascii=True, indent=None if args.compact else 2) + output_path.write_text(output_text + "\n", encoding="utf-8") + print(f"Wrote {len(converted)} ATIF sample(s) to {output_path}") + + +if __name__ == "__main__": + main() + From baf069a68570ab88393a2648534055e318f97c63 Mon Sep 17 00:00:00 2001 From: Anuradha Karuppiah <26330987+AnuradhaKaruppiah@users.noreply.github.com> Date: Wed, 8 Apr 2026 11:10:25 -0700 Subject: [PATCH 07/13] Add documentation for the offline converter scripts Signed-off-by: Anuradha Karuppiah <26330987+AnuradhaKaruppiah@users.noreply.github.com> --- packages/nvidia_nat_eval/scripts/README.md | 43 +++++++++++++++++++ .../convert_workflow_output_to_atif.py | 4 +- 2 files changed, 45 insertions(+), 2 deletions(-) create mode 100644 packages/nvidia_nat_eval/scripts/README.md diff --git a/packages/nvidia_nat_eval/scripts/README.md b/packages/nvidia_nat_eval/scripts/README.md new file mode 100644 index 0000000000..d94df3959b --- /dev/null +++ b/packages/nvidia_nat_eval/scripts/README.md @@ -0,0 +1,43 @@ + + +# Scripts for ATIF Evaluation Utilities + +## Convert `workflow_output.json` to `workflow_output_atif.json` + +Use `convert_workflow_output_to_atif.py` to convert legacy IST workflow dumps into ATIF sample output format. + +### Script + +- `packages/nvidia_nat_eval/scripts/convert_workflow_output_to_atif.py` + +### Example + +```bash +python packages/nvidia_nat_eval/scripts/convert_workflow_output_to_atif.py \ + --input ".tmp/nat/examples/advanced_agents/alert_triage_agent/output/offline_atif/workflow_output.json" \ + --output-dir ".tmp/nat/examples/advanced_agents/alert_triage_agent/output/offline_atif" +``` + +This writes: + +- `.tmp/nat/examples/advanced_agents/alert_triage_agent/output/offline_atif/workflow_output_atif.json` + +### Required input setting + +To make conversion complete, the IST dump must include all events. + +- Set `workflow_output_step_filter: []` in your eval config (empty list means no filtering). +- If event filtering is enabled, required IST events can be missing and conversion output can be incomplete or incorrect. + +For example, in: + +- `examples/advanced_agents/alert_triage_agent/configs/config_offline_atif.yml` + +keep: + +```yaml +workflow_output_step_filter: [] +``` diff --git a/packages/nvidia_nat_eval/scripts/convert_workflow_output_to_atif.py b/packages/nvidia_nat_eval/scripts/convert_workflow_output_to_atif.py index 51058ffa02..039cc5a3d5 100644 --- a/packages/nvidia_nat_eval/scripts/convert_workflow_output_to_atif.py +++ b/packages/nvidia_nat_eval/scripts/convert_workflow_output_to_atif.py @@ -106,7 +106,8 @@ def main() -> None: items = _iter_items(payload) converter = IntermediateStepToATIFConverter() converted = [ - _convert_item(item, converter=converter, item_index=i, agent_name=args.agent_name) for i, item in enumerate(items) + _convert_item(item, converter=converter, item_index=i, agent_name=args.agent_name) + for i, item in enumerate(items) ] output_path = _resolve_output_path(args.input, args.output, args.output_dir) @@ -118,4 +119,3 @@ def main() -> None: if __name__ == "__main__": main() - From 860f4bad704898fbaa52c9574a25f3381fc70845 Mon Sep 17 00:00:00 2001 From: Anuradha Karuppiah <26330987+AnuradhaKaruppiah@users.noreply.github.com> Date: Wed, 8 Apr 2026 11:56:09 -0700 Subject: [PATCH 08/13] Fix a bug with identifying subagent delegation Signed-off-by: Anuradha Karuppiah <26330987+AnuradhaKaruppiah@users.noreply.github.com> --- .../src/nat/utils/atif_converter.py | 22 +++++++++++++++---- 1 file changed, 18 insertions(+), 4 deletions(-) diff --git a/packages/nvidia_nat_core/src/nat/utils/atif_converter.py b/packages/nvidia_nat_core/src/nat/utils/atif_converter.py index b3648dc095..73200aa7ce 100644 --- a/packages/nvidia_nat_core/src/nat/utils/atif_converter.py +++ b/packages/nvidia_nat_core/src/nat/utils/atif_converter.py @@ -198,10 +198,21 @@ def _extract_subagent_delegation_flag(metadata: Any) -> bool: return bool(provided.get("is_subagent_delegation", provided.get("subagent_delegation", False))) return False if isinstance(metadata, dict): + provided = metadata.get("provided_metadata") + if isinstance(provided, dict): + return bool(provided.get("is_subagent_delegation", provided.get("subagent_delegation", False))) return bool(metadata.get("is_subagent_delegation", metadata.get("subagent_delegation", False))) return False +def _event_metadata(ist: IntermediateStep) -> Any: + """Return metadata payload from the most reliable event surface.""" + payload_metadata = getattr(ist.payload, "metadata", None) + if payload_metadata is not None: + return payload_metadata + return getattr(ist, "metadata", None) + + def _event_uuid(ist: IntermediateStep) -> str: """Return stable event UUID from top-level or payload fields.""" top_level = getattr(ist, "UUID", None) @@ -269,7 +280,7 @@ def _record_observed_invocation(pending: _PendingAgentTurn, ist: IntermediateSte logger.warning("Skipping invocation without UUID for tool=%s", tool_name) return call_id = f"call_{event_uuid}" - is_subagent_delegation = _extract_subagent_delegation_flag(ist.metadata) or start_flag + is_subagent_delegation = _extract_subagent_delegation_flag(_event_metadata(ist)) or start_flag pending.observed_invocations.append( _ObservedInvocation( order_key=ist.payload.span_event_timestamp or ist.event_timestamp, @@ -452,7 +463,7 @@ def _pass1_build_execution_structure(sorted_steps: list[IntermediateStep], *, se delegation_flags_by_uuid: dict[str, bool] = {} delegated_callable_ids: set[str] = set() for ist in sorted_steps: - if ist.event_state == IntermediateStepState.START and _extract_subagent_delegation_flag(ist.metadata): + if ist.event_state == IntermediateStepState.START and _extract_subagent_delegation_flag(_event_metadata(ist)): event_uuid = _event_uuid(ist) if event_uuid: delegation_flags_by_uuid[event_uuid] = True @@ -469,10 +480,13 @@ def _pass1_build_execution_structure(sorted_steps: list[IntermediateStep], *, se wrapper_events: list[IntermediateStep] = [] for e in end_events: - if e.event_type not in (IntermediateStepType.TOOL_END, IntermediateStepType.FUNCTION_END): + # Delegation boundaries are represented by callable-level completion. + # Do not use TOOL_END here: nested tool rows inherit parent ancestry and can + # otherwise be misclassified as delegation wrappers. + if e.event_type != IntermediateStepType.FUNCTION_END: continue event_uuid = _event_uuid(e) - if (_extract_subagent_delegation_flag(e.metadata) + if (_extract_subagent_delegation_flag(_event_metadata(e)) or (event_uuid and delegation_flags_by_uuid.get(event_uuid, False)) or e.function_ancestry.function_id in delegated_callable_ids): wrapper_events.append(e) From 889d9109ccc715aa68806c40b2742691ede600fd Mon Sep 17 00:00:00 2001 From: Anuradha Karuppiah <26330987+AnuradhaKaruppiah@users.noreply.github.com> Date: Wed, 8 Apr 2026 12:00:30 -0700 Subject: [PATCH 09/13] Update the captured outputs Signed-off-by: Anuradha Karuppiah <26330987+AnuradhaKaruppiah@users.noreply.github.com> --- .../data/output_samples/workflow_output.json | 4 ++-- .../data/output_samples/workflow_output_atif.json | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/examples/advanced_agents/alert_triage_agent/src/nat_alert_triage_agent/data/output_samples/workflow_output.json b/examples/advanced_agents/alert_triage_agent/src/nat_alert_triage_agent/data/output_samples/workflow_output.json index 1e7ad0e580..ce6d57d432 100644 --- a/examples/advanced_agents/alert_triage_agent/src/nat_alert_triage_agent/data/output_samples/workflow_output.json +++ b/examples/advanced_agents/alert_triage_agent/src/nat_alert_triage_agent/data/output_samples/workflow_output.json @@ -1,3 +1,3 @@ version https://git-lfs.github.com/spec/v1 -oid sha256:832a5311fb051c545b3b0d3862bf432ef6345f93a6c030250f9b98436cc33145 -size 993205 +oid sha256:6f85da4e4ca5390cb333643c547004a53d2425cb010879c79b8e7aac7a065381 +size 1014978 diff --git a/examples/advanced_agents/alert_triage_agent/src/nat_alert_triage_agent/data/output_samples/workflow_output_atif.json b/examples/advanced_agents/alert_triage_agent/src/nat_alert_triage_agent/data/output_samples/workflow_output_atif.json index 35150245ff..fccab4ee54 100644 --- a/examples/advanced_agents/alert_triage_agent/src/nat_alert_triage_agent/data/output_samples/workflow_output_atif.json +++ b/examples/advanced_agents/alert_triage_agent/src/nat_alert_triage_agent/data/output_samples/workflow_output_atif.json @@ -1,3 +1,3 @@ version https://git-lfs.github.com/spec/v1 -oid sha256:7c615d3d082705efd85e01e1a89e9294cb98b1723dfba658faaeb8f9ae2588fe -size 104469 +oid sha256:4235791575d6d26a3833c77d48b56a2923b92e75a986d67bc4ada51a16d184cc +size 115049 From 555a9e02a6d2d136a866851c83e7e2d129b15e63 Mon Sep 17 00:00:00 2001 From: Anuradha Karuppiah <26330987+AnuradhaKaruppiah@users.noreply.github.com> Date: Wed, 8 Apr 2026 12:42:43 -0700 Subject: [PATCH 10/13] Fix session id of child trajectories Signed-off-by: Anuradha Karuppiah <26330987+AnuradhaKaruppiah@users.noreply.github.com> --- .../scripts/convert_workflow_output_to_atif.py | 3 +-- .../src/nat/plugins/eval/runtime/atif_adapter.py | 10 +++++++++- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/packages/nvidia_nat_eval/scripts/convert_workflow_output_to_atif.py b/packages/nvidia_nat_eval/scripts/convert_workflow_output_to_atif.py index 039cc5a3d5..d0ed6c24a6 100644 --- a/packages/nvidia_nat_eval/scripts/convert_workflow_output_to_atif.py +++ b/packages/nvidia_nat_eval/scripts/convert_workflow_output_to_atif.py @@ -46,8 +46,7 @@ def _iter_items(payload: Any) -> list[dict[str, Any]]: def _session_id_for_item(item_id: Any) -> str: - item_type = type(item_id) - return f"{item_type.__module__}.{item_type.__qualname__}:{item_id!r}" + return str(item_id) def _convert_item(item: dict[str, Any], diff --git a/packages/nvidia_nat_eval/src/nat/plugins/eval/runtime/atif_adapter.py b/packages/nvidia_nat_eval/src/nat/plugins/eval/runtime/atif_adapter.py index 3c598532c6..25e32919f9 100644 --- a/packages/nvidia_nat_eval/src/nat/plugins/eval/runtime/atif_adapter.py +++ b/packages/nvidia_nat_eval/src/nat/plugins/eval/runtime/atif_adapter.py @@ -51,6 +51,11 @@ def _cache_key(item_id: Any) -> str: item_type = type(item_id) return f"{item_type.__module__}.{item_type.__qualname__}:{item_id!r}" + @staticmethod + def _trajectory_session_id(item_id: Any) -> str: + """Return the canonical ATIF trajectory session identifier.""" + return str(item_id) + def _coerce_trajectory(self, value: Any) -> ATIFTrajectory: if isinstance(value, ATIFTrajectory): return value @@ -193,7 +198,10 @@ def get_trajectory(self, if prebuilt is not None: trajectory = self._coerce_trajectory(prebuilt) else: - trajectory = self._converter.convert(steps=item.trajectory, session_id=key) + trajectory = self._converter.convert( + steps=item.trajectory, + session_id=self._trajectory_session_id(item.id), + ) self._cache[key] = trajectory return trajectory From b3a83ed58099336c691d02554005499b13c4afb8 Mon Sep 17 00:00:00 2001 From: Anuradha Karuppiah <26330987+AnuradhaKaruppiah@users.noreply.github.com> Date: Thu, 9 Apr 2026 08:33:21 -0700 Subject: [PATCH 11/13] Update subagent name Signed-off-by: Anuradha Karuppiah <26330987+AnuradhaKaruppiah@users.noreply.github.com> --- packages/nvidia_nat_core/src/nat/utils/atif_converter.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/packages/nvidia_nat_core/src/nat/utils/atif_converter.py b/packages/nvidia_nat_core/src/nat/utils/atif_converter.py index 73200aa7ce..4b35879393 100644 --- a/packages/nvidia_nat_core/src/nat/utils/atif_converter.py +++ b/packages/nvidia_nat_core/src/nat/utils/atif_converter.py @@ -247,6 +247,7 @@ class _ExecutionStructure: root_events: list[IntermediateStep] child_events_by_session: dict[str, list[IntermediateStep]] + child_agent_name_by_session: dict[str, str] subagent_ref_by_call_id: dict[str, SubagentTrajectoryRef] @@ -493,6 +494,7 @@ def _pass1_build_execution_structure(sorted_steps: list[IntermediateStep], *, se child_session_by_wrapper_call_id: dict[str, str] = {} child_events_by_session: dict[str, list[IntermediateStep]] = {} + child_agent_name_by_session: dict[str, str] = {} delegated_function_ids: set[str] = set() for wrapper in wrapper_events: @@ -527,6 +529,7 @@ def _pass1_build_execution_structure(sorted_steps: list[IntermediateStep], *, se child_session_id = f"{session_id}:{wrapper_call_id}" child_session_by_wrapper_call_id[wrapper_call_id] = child_session_id child_events_by_session[child_session_id] = sorted(child_events, key=lambda s: s.event_timestamp) + child_agent_name_by_session[child_session_id] = wrapper.name or child_root.function_ancestry.function_name or "nat-agent" delegated_function_ids.update(subtree_ids) root_events = [ @@ -541,6 +544,7 @@ def _pass1_build_execution_structure(sorted_steps: list[IntermediateStep], *, se return _ExecutionStructure( root_events=root_events, child_events_by_session=child_events_by_session, + child_agent_name_by_session=child_agent_name_by_session, subagent_ref_by_call_id=subagent_ref_by_call_id, ) @@ -607,9 +611,11 @@ def convert( include_workflow_start_user_step=False, subagent_ref_by_call_id=None, ) + child_agent = agent_config.model_copy(deep=True) + child_agent.name = execution_structure.child_agent_name_by_session.get(child_session_id, child_agent.name) child_trajectory = ATIFTrajectory( session_id=child_session_id, - agent=agent_config.model_copy(deep=True), + agent=child_agent, steps=child_steps, ) child_trajectories[child_session_id] = child_trajectory.model_dump(exclude_none=True, mode="json") From 42609e475a7c7c19ef2355571c9c807d4e8c56b8 Mon Sep 17 00:00:00 2001 From: Anuradha Karuppiah <26330987+AnuradhaKaruppiah@users.noreply.github.com> Date: Thu, 9 Apr 2026 16:01:08 -0700 Subject: [PATCH 12/13] Fix child orphan step handling Signed-off-by: Anuradha Karuppiah <26330987+AnuradhaKaruppiah@users.noreply.github.com> --- .../src/nat/utils/atif_converter.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/packages/nvidia_nat_core/src/nat/utils/atif_converter.py b/packages/nvidia_nat_core/src/nat/utils/atif_converter.py index 4b35879393..1a660168b7 100644 --- a/packages/nvidia_nat_core/src/nat/utils/atif_converter.py +++ b/packages/nvidia_nat_core/src/nat/utils/atif_converter.py @@ -496,23 +496,22 @@ def _pass1_build_execution_structure(sorted_steps: list[IntermediateStep], *, se child_events_by_session: dict[str, list[IntermediateStep]] = {} child_agent_name_by_session: dict[str, str] = {} delegated_function_ids: set[str] = set() + wrapper_event_ids: set[str] = set() for wrapper in wrapper_events: wrapper_uuid = _event_uuid(wrapper) if not wrapper_uuid: continue + wrapper_event_ids.add(wrapper_uuid) wrapper_call_id = f"call_{wrapper_uuid}" wrapper_fn_id = wrapper.function_ancestry.function_id - direct_children = children_by_parent.get(wrapper_fn_id, []) - preferred_roots = [c for c in direct_children if c.function_ancestry.function_name == (wrapper.name or "") - ] or direct_children - if not preferred_roots: + if not wrapper_fn_id: continue - child_root = sorted(preferred_roots, key=lambda s: s.event_timestamp)[0] - child_root_fn_id = child_root.function_ancestry.function_id subtree_ids: set[str] = set() - frontier = [child_root_fn_id] + # Delegated callable scope is anchored by wrapper function_id itself. + # Include that function_id and recursively include end-event children. + frontier = [wrapper_fn_id] while frontier: node = frontier.pop() if node in subtree_ids: @@ -529,12 +528,13 @@ def _pass1_build_execution_structure(sorted_steps: list[IntermediateStep], *, se child_session_id = f"{session_id}:{wrapper_call_id}" child_session_by_wrapper_call_id[wrapper_call_id] = child_session_id child_events_by_session[child_session_id] = sorted(child_events, key=lambda s: s.event_timestamp) - child_agent_name_by_session[child_session_id] = wrapper.name or child_root.function_ancestry.function_name or "nat-agent" + child_agent_name_by_session[child_session_id] = wrapper.name or wrapper.function_ancestry.function_name or "nat-agent" delegated_function_ids.update(subtree_ids) root_events = [ e for e in sorted_steps if (e.event_type in {IntermediateStepType.WORKFLOW_START, IntermediateStepType.WORKFLOW_END} + or _event_uuid(e) in wrapper_event_ids or e.function_ancestry.function_id not in delegated_function_ids) ] subagent_ref_by_call_id = { From 996412529c4e3582399c72aac15330a42997ac7b Mon Sep 17 00:00:00 2001 From: Anuradha Karuppiah <26330987+AnuradhaKaruppiah@users.noreply.github.com> Date: Thu, 9 Apr 2026 16:08:14 -0700 Subject: [PATCH 13/13] Add tool args Signed-off-by: Anuradha Karuppiah <26330987+AnuradhaKaruppiah@users.noreply.github.com> --- .../src/nat/utils/atif_converter.py | 24 +++++++++++++++++-- 1 file changed, 22 insertions(+), 2 deletions(-) diff --git a/packages/nvidia_nat_core/src/nat/utils/atif_converter.py b/packages/nvidia_nat_core/src/nat/utils/atif_converter.py index 1a660168b7..2b3a73a141 100644 --- a/packages/nvidia_nat_core/src/nat/utils/atif_converter.py +++ b/packages/nvidia_nat_core/src/nat/utils/atif_converter.py @@ -190,6 +190,24 @@ def _parse_tool_arguments(raw_input: Any) -> dict[str, Any]: return {} +def _extract_tool_arguments_from_ist(ist: IntermediateStep) -> dict[str, Any]: + """Extract tool arguments from preferred IST sources with fallback.""" + if ist.data and ist.data.input is not None: + return _parse_tool_arguments(ist.data.input) + + metadata = _event_metadata(ist) + metadata_tool_inputs: Any = None + if isinstance(metadata, TraceMetadata): + metadata_tool_inputs = metadata.tool_inputs + elif isinstance(metadata, dict): + metadata_tool_inputs = metadata.get("tool_inputs") + + if metadata_tool_inputs is not None: + return _parse_tool_arguments(metadata_tool_inputs) + + return {} + + def _extract_subagent_delegation_flag(metadata: Any) -> bool: """Extract optional subagent delegation flag from metadata payload.""" if isinstance(metadata, TraceMetadata): @@ -273,8 +291,8 @@ def _record_observed_invocation(pending: _PendingAgentTurn, ist: IntermediateSte return tool_input: dict[str, Any] = {} tool_output = "" + tool_input = _extract_tool_arguments_from_ist(ist) if ist.data: - tool_input = _parse_tool_arguments(ist.data.input) tool_output = _safe_str(ist.data.output) event_uuid = _event_uuid(ist) if not event_uuid: @@ -528,7 +546,9 @@ def _pass1_build_execution_structure(sorted_steps: list[IntermediateStep], *, se child_session_id = f"{session_id}:{wrapper_call_id}" child_session_by_wrapper_call_id[wrapper_call_id] = child_session_id child_events_by_session[child_session_id] = sorted(child_events, key=lambda s: s.event_timestamp) - child_agent_name_by_session[child_session_id] = wrapper.name or wrapper.function_ancestry.function_name or "nat-agent" + child_agent_name_by_session[child_session_id] = ( + wrapper.name or wrapper.function_ancestry.function_name or "nat-agent" + ) delegated_function_ids.update(subtree_ids) root_events = [