diff --git a/agentlightning/emergence/__init__.py b/agentlightning/emergence/__init__.py new file mode 100644 index 000000000..0bfdf001f --- /dev/null +++ b/agentlightning/emergence/__init__.py @@ -0,0 +1,46 @@ +# Copyright (c) Microsoft. All rights reserved. + +"""Open Emergence: an opposing worldview for agent training. + +This module sits alongside agent-lightning's optimization loop, holding +optimization in tension with exploration rather than replacing it. Every +component is additive and best-effort -- removing the entire module leaves +the core training loop unchanged. + +Reference: docs/open-emergence-design.md +""" + +from agentlightning.emergence.novelty import NoveltyAwareAdapter, NoveltyDetector, NoveltyScore, TrajectoryShape +from agentlightning.emergence.entropy import TrajectoryEntropy +from agentlightning.emergence.monitoring import CollapseSignal, EntropySnapshot, ExplorationDecayMonitor +from agentlightning.emergence.pareto import ParetoClassification, ParetoPoint, ParetoTracker +from agentlightning.emergence.reward_audit import RewardAuditAdapter, RewardStalenessAuditor +from agentlightning.emergence.dissolution import DissolutionEngine, DissolutionMetadata, DissolutionPolicy, ValidityCondition +from agentlightning.emergence.semconv import EmergenceSpanAttributes + +__all__ = [ + # Gap 5: Novelty Detection + "NoveltyDetector", + "NoveltyAwareAdapter", + "NoveltyScore", + "TrajectoryShape", + # Gap 1: Entropy Monitoring + "TrajectoryEntropy", + "ExplorationDecayMonitor", + "EntropySnapshot", + "CollapseSignal", + # Gap 3: Pareto Tension + "ParetoTracker", + "ParetoPoint", + "ParetoClassification", + # Gap 2: Reward Staleness + "RewardStalenessAuditor", + "RewardAuditAdapter", + # Gap 4: Dissolution + "DissolutionEngine", + "DissolutionMetadata", + "DissolutionPolicy", + "ValidityCondition", + # Semantic Conventions + "EmergenceSpanAttributes", +] diff --git a/agentlightning/emergence/dissolution.py b/agentlightning/emergence/dissolution.py new file mode 100644 index 000000000..c1ecd659f --- /dev/null +++ b/agentlightning/emergence/dissolution.py @@ -0,0 +1,304 @@ +# Copyright (c) Microsoft. All rights reserved. + +"""Gap 4: Policy dissolution mechanism. + +Manages resource lifecycle with TTL, validity conditions, and re-validation. +Wraps a LightningStore to intercept resource retrieval and check dissolution +conditions before returning resources. + +Dissolution condition: if resource lifecycle management moves into +CollectionBasedLightningStore natively, the wrapper adds indirection +without benefit. 
+""" + +from __future__ import annotations + +import logging +import time +from enum import Enum +from typing import Any, Callable, Dict, List, Literal, Optional, Tuple + +from pydantic import BaseModel, Field + +from agentlightning.store.base import LightningStore +from agentlightning.types import ResourcesUpdate + +from .types import ConditionResult, DissolutionAction, DissolutionSignal + +logger = logging.getLogger(__name__) + + +class DissolutionPolicy(str, Enum): + """What to do when dissolution triggers.""" + + REVALIDATE = "revalidate" + """Re-run validation, keep if still good.""" + REGRESS = "regress" + """Fall back to previous version.""" + EXPLORE = "explore" + """Switch to exploration mode (no resource pinning).""" + + +class ValidityCondition(BaseModel): + """A condition that must remain true for a resource to be valid.""" + + name: str + description: str = "" + check_type: Literal["reward_threshold", "entropy_threshold", "custom"] = "custom" + parameters: Dict[str, Any] = Field(default_factory=dict) + + +class ValidationRecord(BaseModel): + """Record of a re-validation attempt.""" + + timestamp: float + trigger: str + result: str + """'valid' | 'invalid' | 'error'""" + details: str = "" + + +class DissolutionMetadata(BaseModel): + """Metadata attached to resource versions for dissolution tracking. + + Stored in ResourcesUpdate's metadata dict under the key + 'agentlightning.emergence.dissolution'. + """ + + ttl_seconds: Optional[int] = None + """Time-to-live. After this duration, the resource should be + re-validated before use. None = no temporal expiry.""" + + created_at: float = 0.0 + """Timestamp when this resource version was created.""" + + validity_conditions: List[ValidityCondition] = Field(default_factory=list) + + validation_history: List[ValidationRecord] = Field(default_factory=list) + + on_dissolution: DissolutionPolicy = DissolutionPolicy.REVALIDATE + + +class DissolutionEngine: + """Manages resource lifecycle with TTL, validity conditions, and re-validation. + + Wraps a LightningStore to intercept resource retrieval and check + dissolution conditions before returning resources. + + The engine NEVER automatically removes resources. It surfaces + dissolution signals. The algorithm (or human operator) decides + whether to act. + """ + + def __init__( + self, + store: LightningStore, + default_ttl: Optional[int] = None, + check_interval: int = 10, + ): + self._store = store + self._default_ttl = default_ttl + self._check_interval = check_interval + self._dissolution_cache: Dict[str, DissolutionMetadata] = {} + self._check_counter = 0 + self._condition_checkers: Dict[str, Callable[..., ConditionResult]] = {} + + def register_condition_checker( + self, + check_type: str, + checker: Callable[..., ConditionResult], + ) -> None: + """Register a function that evaluates a specific condition type.""" + self._condition_checkers[check_type] = checker + + async def get_resources_with_dissolution_check( + self, + resources_id: Optional[str] = None, + ) -> Tuple[Optional[ResourcesUpdate], Optional[DissolutionSignal]]: + """Fetch resources, checking dissolution conditions. + + Returns the resources AND any dissolution signal. The caller + decides what to do -- the engine does not block resource access. 
+ """ + if resources_id: + resources = await self._store.get_resources_by_id(resources_id) + else: + resources = await self._store.get_latest_resources() + + if resources is None: + return None, None + + self._check_counter += 1 + if self._check_counter % self._check_interval != 0: + return resources, None + + signal = self._check_dissolution(resources) + return resources, signal + + def _check_dissolution(self, resources: ResourcesUpdate) -> Optional[DissolutionSignal]: + """Check TTL and validity conditions for a resource.""" + meta = self._dissolution_cache.get(resources.resources_id) + if meta is None: + return None + + now = time.time() + + # TTL check + ttl = meta.ttl_seconds or self._default_ttl + if ttl is not None and meta.created_at > 0: + age = now - meta.created_at + if age > ttl: + severity = "warning" if age < ttl * 2 else "critical" + return DissolutionSignal( + trigger="ttl_expired", + severity=severity, + recommendation=( + f"Resource version {resources.resources_id} has been active for " + f"{age / 3600:.1f}h (TTL: {ttl / 3600:.1f}h). " + f"Could indicate the environment has changed since training. " + f"Consider re-validation." + ), + ) + + # Validity condition checks + for condition in meta.validity_conditions: + result = self._evaluate_condition(condition) + if result is not None and not result.passed: + severity = "warning" + return DissolutionSignal( + trigger=f"condition_failed:{condition.name}", + severity=severity, + recommendation=( + f"Validity condition '{condition.name}' failed for resource " + f"{resources.resources_id}: {result.description}. " + f"Could indicate the resource is no longer valid for the " + f"current environment." + ), + ) + + return None + + def _evaluate_condition(self, condition: ValidityCondition) -> Optional[ConditionResult]: + """Evaluate a single validity condition.""" + checker = self._condition_checkers.get(condition.check_type) + if checker is None: + logger.debug( + "No checker registered for condition type '%s'; skipping.", + condition.check_type, + ) + return None + + try: + return checker(condition) + except Exception: + logger.debug("Condition check failed for '%s'.", condition.name, exc_info=True) + return ConditionResult( + condition_name=condition.name, + passed=True, # Best-effort: don't block on check failure + description="Check failed; treated as passing (best-effort).", + ) + + async def attach_dissolution_metadata( + self, + resources_id: str, + ttl_seconds: Optional[int] = None, + validity_conditions: Optional[List[ValidityCondition]] = None, + policy: DissolutionPolicy = DissolutionPolicy.REVALIDATE, + ) -> None: + """Attach dissolution metadata to a resource version.""" + self._dissolution_cache[resources_id] = DissolutionMetadata( + ttl_seconds=ttl_seconds or self._default_ttl, + created_at=time.time(), + validity_conditions=validity_conditions or [], + on_dissolution=policy, + ) + + async def check_conditions( + self, + resources_id: str, + ) -> List[ConditionResult]: + """Evaluate all validity conditions for a resource version.""" + meta = self._dissolution_cache.get(resources_id) + if meta is None: + return [] + + results: List[ConditionResult] = [] + for condition in meta.validity_conditions: + result = self._evaluate_condition(condition) + if result is not None: + results.append(result) + return results + + async def dissolve( + self, + resources_id: str, + trigger: str, + ) -> DissolutionAction: + """Execute dissolution policy for a resource version. 
+ + REVALIDATE: signal that re-validation is needed + REGRESS: find previous version, mark current as dissolved + EXPLORE: clear resource pinning, let runners use no resource + """ + meta = self._dissolution_cache.get(resources_id) + policy = meta.on_dissolution if meta else DissolutionPolicy.REVALIDATE + + # Record in validation history + if meta: + meta.validation_history.append( + ValidationRecord( + timestamp=time.time(), + trigger=trigger, + result="dissolved", + details=f"Policy: {policy.value}", + ) + ) + + if policy == DissolutionPolicy.REVALIDATE: + return DissolutionAction( + resources_id=resources_id, + policy=policy.value, + action_taken="revalidation_requested", + description=( + f"Resource {resources_id} marked for re-validation due to: {trigger}. " + f"The algorithm should re-run validation rollouts before continuing." + ), + ) + + elif policy == DissolutionPolicy.REGRESS: + # Find the previous resource version + all_resources = await self._store.query_resources( + sort_by="version", sort_order="desc", limit=2 + ) + if len(all_resources) > 1: + previous = all_resources[1] + return DissolutionAction( + resources_id=resources_id, + policy=policy.value, + action_taken=f"regressed_to:{previous.resources_id}", + description=( + f"Resource {resources_id} dissolved. Regressed to previous " + f"version {previous.resources_id} (v{previous.version})." + ), + ) + return DissolutionAction( + resources_id=resources_id, + policy=policy.value, + action_taken="no_previous_version", + description=( + f"Resource {resources_id} dissolution requested but no previous " + f"version exists. Could indicate this is the initial resource." + ), + ) + + else: # EXPLORE + return DissolutionAction( + resources_id=resources_id, + policy=policy.value, + action_taken="exploration_mode", + description=( + f"Resource {resources_id} dissolved. Entering exploration mode — " + f"runners should proceed without resource pinning to allow " + f"behavioral diversity." + ), + ) diff --git a/agentlightning/emergence/entropy.py b/agentlightning/emergence/entropy.py new file mode 100644 index 000000000..834c4d5bb --- /dev/null +++ b/agentlightning/emergence/entropy.py @@ -0,0 +1,152 @@ +# Copyright (c) Microsoft. All rights reserved. + +"""Gap 1: Trajectory entropy computation. + +Computes behavioral diversity metrics from trace trees. Entropy is computed +over the distribution of trajectory *shapes* (tool call sequences, branching +patterns) rather than trajectory *content* (specific tokens). + +Dissolution condition: when agent-lightning's native metrics include behavioral +diversity metrics, making external entropy computation redundant. +""" + +from __future__ import annotations + +import logging +import math +import re +from collections import Counter +from typing import TYPE_CHECKING, Dict, List, Optional, Sequence + +from agentlightning.types import Triplet + +from .novelty import TrajectoryShape + +if TYPE_CHECKING: + from agentlightning.adapter.triplet import TraceTree + +logger = logging.getLogger(__name__) + + +class TrajectoryEntropy: + """Compute behavioral diversity metrics from trace trees. + + Entropy is computed over the distribution of trajectory *shapes* + (tool call sequences, branching patterns) rather than trajectory + *content* (specific tokens). This distinguishes structural diversity + from surface-level variation. + """ + + def compute_shape_entropy( + self, + trees: Sequence["TraceTree"], + window_size: int = 50, + ) -> float: + """H(shape distribution) over recent trajectories. 
+ + Shape = tuple of (span.name, depth) for each node in traversal order. + High entropy: agents take structurally different paths. + Low entropy: agents follow the same structural pattern. + """ + recent = list(trees)[-window_size:] + if not recent: + return 0.0 + + shapes: List[str] = [] + for tree in recent: + try: + shape = TrajectoryShape.from_trace_tree(tree) + shapes.append(shape.fingerprint) + except Exception: + logger.debug("Shape extraction failed for tree; skipping.", exc_info=True) + continue + + return _shannon_entropy(shapes) + + def compute_tool_entropy( + self, + trees: Sequence["TraceTree"], + window_size: int = 50, + llm_call_match: str = r"openai\.chat\.completion", + ) -> float: + """H(tool call distribution) over recent trajectories. + + Measures whether agents use diverse tools or converge on + a narrow subset. Computed from tool call spans matched via + the same llm_call_match regex used by TracerTraceToTriplet. + """ + recent = list(trees)[-window_size:] + if not recent: + return 0.0 + + tool_names: List[str] = [] + for tree in recent: + try: + for node in tree.traverse(): + if re.search(llm_call_match, node.span.name): + tool_names.append(node.span.name) + except Exception: + logger.debug("Tool extraction failed; skipping.", exc_info=True) + continue + + return _shannon_entropy(tool_names) + + def compute_reward_entropy( + self, + triplets: Sequence[Triplet], + n_bins: int = 20, + window_size: int = 50, + ) -> float: + """H(reward distribution) over recent triplets. + + High reward entropy: outcomes are spread across the reward range. + Low reward entropy: outcomes cluster at one value (convergence). + Note: low entropy + high mean reward could be genuine mastery + OR premature convergence. This metric alone cannot distinguish. + """ + recent = [t for t in list(triplets)[-window_size:] if t.reward is not None] + if not recent: + return 0.0 + + rewards = [t.reward for t in recent] + min_r = min(rewards) # type: ignore[type-var] + max_r = max(rewards) # type: ignore[type-var] + + if max_r == min_r: + return 0.0 + + # Bin rewards into n_bins buckets + bin_width = (max_r - min_r) / n_bins # type: ignore[operator] + bins: List[str] = [] + for r in rewards: + bin_idx = min(int((r - min_r) / bin_width), n_bins - 1) # type: ignore[operator] + bins.append(str(bin_idx)) + + return _shannon_entropy(bins) + + +def _shannon_entropy(items: List[str]) -> float: + """Compute Shannon entropy of a list of categorical items. + + Returns entropy in nats (natural log), normalized to [0, 1] range + where 1 means maximum diversity. + """ + if not items: + return 0.0 + counts = Counter(items) + total = len(items) + n_categories = len(counts) + if n_categories <= 1: + return 0.0 + + entropy = 0.0 + for count in counts.values(): + p = count / total + if p > 0: + entropy -= p * math.log(p) + + # Normalize by max possible entropy + max_entropy = math.log(n_categories) + if max_entropy == 0: + return 0.0 + return entropy / max_entropy diff --git a/agentlightning/emergence/monitoring.py b/agentlightning/emergence/monitoring.py new file mode 100644 index 000000000..236067c6d --- /dev/null +++ b/agentlightning/emergence/monitoring.py @@ -0,0 +1,199 @@ +# Copyright (c) Microsoft. All rights reserved. + +"""Gap 1: Sliding window collapse detection. + +Tracks entropy over sliding windows and detects exploration collapse. +Collapse detection: when entropy drops below threshold AND reward is still +improving, the system may be narrowing rather than improving. + +This is a signal, not a conclusion. 
The monitor surfaces the tension; +it does not resolve it. + +Dissolution condition: when algorithms natively implement exploration-exploitation +balancing that makes external collapse detection unnecessary. +""" + +from __future__ import annotations + +import logging +from collections import deque +from typing import TYPE_CHECKING, List, Optional, Sequence + +from pydantic import BaseModel + +from agentlightning.types import Triplet + +from .entropy import TrajectoryEntropy + +if TYPE_CHECKING: + from agentlightning.adapter.triplet import TraceTree + +logger = logging.getLogger(__name__) + + +class EntropySnapshot(BaseModel): + """Point-in-time entropy measurement.""" + + shape_entropy: float = 0.0 + tool_entropy: float = 0.0 + reward_entropy: float = 0.0 + mean_reward: float = 0.0 + window_index: int = 0 + + +class CollapseSignal(BaseModel): + """Signal emitted when exploration collapse is detected.""" + + entropy_trend: float + """Slope of entropy over trend_window (negative = declining).""" + reward_trend: float + """Slope of reward over trend_window (positive = improving).""" + severity: str + """'low' | 'medium' | 'high'""" + description: str + """Hedged interpretation using 'could indicate...' language.""" + + +class ExplorationDecayMonitor: + """Track entropy over sliding windows and detect collapse. + + Collapse detection: when entropy drops below threshold AND reward + is still improving, the system may be narrowing rather than improving. + + This is a signal, not a conclusion. The monitor surfaces the tension; + it does not resolve it. + """ + + def __init__( + self, + window_size: int = 50, + alert_threshold: float = 0.3, + trend_window: int = 5, + ): + self._window_size = window_size + self._alert_threshold = alert_threshold + self._trend_window = trend_window + self._history: deque[EntropySnapshot] = deque(maxlen=1000) + self._entropy_calculator = TrajectoryEntropy() + self._window_counter = 0 + + def record(self, trees: Sequence["TraceTree"], triplets: Sequence[Triplet]) -> EntropySnapshot: + """Compute and store entropy snapshot for current window.""" + shape_entropy = 0.0 + tool_entropy = 0.0 + reward_entropy = 0.0 + mean_reward = 0.0 + + try: + shape_entropy = self._entropy_calculator.compute_shape_entropy(trees, self._window_size) + except Exception: + logger.debug("Shape entropy computation failed.", exc_info=True) + + try: + tool_entropy = self._entropy_calculator.compute_tool_entropy(trees, self._window_size) + except Exception: + logger.debug("Tool entropy computation failed.", exc_info=True) + + try: + reward_entropy = self._entropy_calculator.compute_reward_entropy(triplets, window_size=self._window_size) + except Exception: + logger.debug("Reward entropy computation failed.", exc_info=True) + + rewarded = [t for t in triplets if t.reward is not None] + if rewarded: + mean_reward = sum(t.reward for t in rewarded) / len(rewarded) # type: ignore[misc] + + snapshot = EntropySnapshot( + shape_entropy=shape_entropy, + tool_entropy=tool_entropy, + reward_entropy=reward_entropy, + mean_reward=mean_reward, + window_index=self._window_counter, + ) + self._history.append(snapshot) + self._window_counter += 1 + return snapshot + + def detect_collapse(self) -> Optional[CollapseSignal]: + """Check if entropy is declining while reward is stable/improving. + + Returns CollapseSignal if collapse pattern detected, None otherwise. 
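+
+        Illustrative usage (``monitor`` has already recorded several windows
+        via ``record()``):
+
+            signal = monitor.detect_collapse()
+            if signal is not None and signal.severity in ("medium", "high"):
+                # A signal, not a conclusion: log it, leave the decision to the caller.
+                logger.warning(signal.description)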
+ """ + if len(self._history) < self._trend_window: + return None + + recent = list(self._history)[-self._trend_window:] + + # Compute trends using simple linear regression slope + entropy_values = [s.shape_entropy for s in recent] + reward_values = [s.mean_reward for s in recent] + + entropy_trend = _compute_trend(entropy_values) + reward_trend = _compute_trend(reward_values) + + latest_entropy = recent[-1].shape_entropy + + # Collapse pattern: entropy declining AND (reward stable or improving) + if entropy_trend >= 0: + return None + if latest_entropy >= self._alert_threshold: + return None + + # Determine severity + if latest_entropy < self._alert_threshold * 0.5 and reward_trend > 0: + severity = "high" + elif latest_entropy < self._alert_threshold and reward_trend >= 0: + severity = "medium" + else: + severity = "low" + + first_entropy = recent[0].shape_entropy + description = ( + f"Trajectory entropy: {latest_entropy:.2f} " + f"({'↓' if entropy_trend < 0 else '↑'} from {first_entropy:.2f} over {self._trend_window} windows). " + f"Reward: {recent[-1].mean_reward:.2f} " + f"({'↑' if reward_trend > 0 else '→'} from {recent[0].mean_reward:.2f}). " + f"Could indicate policy narrowing — high reward may reflect convergence to " + f"a single strategy rather than genuine improvement across diverse approaches." + ) + + return CollapseSignal( + entropy_trend=entropy_trend, + reward_trend=reward_trend, + severity=severity, + description=description, + ) + + def summary(self) -> str: + """Human-readable summary with 'could be' language.""" + if not self._history: + return "No entropy data recorded yet." + + latest = self._history[-1] + lines = [ + f"Shape entropy: {latest.shape_entropy:.2f}", + f"Tool entropy: {latest.tool_entropy:.2f}", + f"Reward entropy: {latest.reward_entropy:.2f}", + f"Mean reward: {latest.mean_reward:.2f}", + f"Windows recorded: {len(self._history)}", + ] + + collapse = self.detect_collapse() + if collapse: + lines.append(f"Collapse signal ({collapse.severity}): {collapse.description}") + + return "\n".join(lines) + + +def _compute_trend(values: List[float]) -> float: + """Compute simple linear regression slope over a sequence of values.""" + n = len(values) + if n < 2: + return 0.0 + x_mean = (n - 1) / 2.0 + y_mean = sum(values) / n + numerator = sum((i - x_mean) * (v - y_mean) for i, v in enumerate(values)) + denominator = sum((i - x_mean) ** 2 for i in range(n)) + if denominator == 0: + return 0.0 + return numerator / denominator diff --git a/agentlightning/emergence/novelty.py b/agentlightning/emergence/novelty.py new file mode 100644 index 000000000..3876eb4d1 --- /dev/null +++ b/agentlightning/emergence/novelty.py @@ -0,0 +1,316 @@ +# Copyright (c) Microsoft. All rights reserved. + +"""Gap 5: Novel vs. routine behavior distinction. + +Detects whether a trajectory represents novel or routine behavior by +comparing its structural *shape* (tool call sequences, branching patterns) +against a running codebook of known shapes. + +Dissolution condition: when agent-lightning's span system includes native +trajectory fingerprinting, making external shape computation redundant. 
+""" + +from __future__ import annotations + +import hashlib +import logging +import math +import time +from collections import deque +from typing import TYPE_CHECKING, Dict, List, Optional, Sequence, Tuple + +from pydantic import BaseModel, ConfigDict + +from agentlightning.adapter.base import TraceAdapter +from agentlightning.types import Span, Triplet + +if TYPE_CHECKING: + from agentlightning.adapter.triplet import TracerTraceToTriplet, TraceTree + +logger = logging.getLogger(__name__) + + +class TrajectoryShape(BaseModel): + """Structural skeleton of a trajectory, ignoring token content.""" + + nodes: Tuple[Tuple[str, int, int], ...] + """Sequence of (span_name, depth, child_count) tuples.""" + fingerprint: str + """Hex digest uniquely identifying this shape.""" + + model_config = ConfigDict(frozen=True) + + @classmethod + def from_trace_tree(cls, tree: "TraceTree") -> "TrajectoryShape": + """Extract shape from a trace tree via depth-first traversal.""" + nodes: List[Tuple[str, int, int]] = [] + + def _visit(node: "TraceTree", depth: int) -> None: + nodes.append((node.span.name, depth, len(node.children))) + for child in node.children: + _visit(child, depth + 1) + + _visit(tree, 0) + nodes_tuple = tuple(nodes) + fp = hashlib.sha256(str(nodes_tuple).encode()).hexdigest()[:16] + return cls(nodes=nodes_tuple, fingerprint=fp) + + +class ShapeEntry(BaseModel): + """Codebook entry tracking a known trajectory shape.""" + + shape: TrajectoryShape + count: int = 0 + first_seen: float = 0.0 + last_seen: float = 0.0 + novelty_score: float = 1.0 + + +class NoveltyScore(BaseModel): + """Result of scoring a trajectory's novelty.""" + + score: float + """0.0 (completely routine) to 1.0 (never seen).""" + nearest_shape: Optional[str] = None + """Fingerprint of the most similar known shape.""" + similarity_to_nearest: float = 0.0 + first_seen: bool = False + classification: str = "novel" + """'novel' | 'familiar' | 'routine'""" + + +def _shape_similarity(a: TrajectoryShape, b: TrajectoryShape) -> float: + """Compute structural similarity between two shapes. + + Uses set overlap of (name, depth) pairs as a fast proxy for + structural similarity. Returns 0.0-1.0. + """ + set_a = set((name, depth) for name, depth, _ in a.nodes) + set_b = set((name, depth) for name, depth, _ in b.nodes) + if not set_a and not set_b: + return 1.0 + if not set_a or not set_b: + return 0.0 + intersection = len(set_a & set_b) + union = len(set_a | set_b) + return intersection / union if union > 0 else 0.0 + + +class NoveltyDetector: + """Detect whether a trajectory represents novel or routine behavior. + + Novelty is defined structurally: a trajectory is novel if its shape + (sequence of tool calls, branching pattern, response structure) has + not been seen before. Token-level variation within a known shape is + NOT novelty -- it's exploration noise. + + The detector maintains a running codebook of known trajectory shapes. + New shapes start with high novelty scores that decay as they're seen + more frequently. + + Dissolution condition: when neural novelty detection (learned embeddings + of trajectory space) becomes efficient enough for online use. 
+ """ + + def __init__( + self, + shape_similarity_threshold: float = 0.85, + novelty_decay_rate: float = 0.95, + max_codebook_size: int = 1000, + ): + self._similarity_threshold = shape_similarity_threshold + self._decay_rate = novelty_decay_rate + self._max_codebook_size = max_codebook_size + self._codebook: Dict[str, ShapeEntry] = {} + self._recent_classifications: deque[str] = deque(maxlen=1000) + + def compute_shape(self, tree: "TraceTree") -> TrajectoryShape: + """Extract structural shape from a trace tree.""" + return TrajectoryShape.from_trace_tree(tree) + + def _find_nearest(self, shape: TrajectoryShape) -> Tuple[Optional[str], float]: + """Find the most similar shape in the codebook.""" + best_fp: Optional[str] = None + best_sim = 0.0 + for fp, entry in self._codebook.items(): + sim = _shape_similarity(shape, entry.shape) + if sim > best_sim: + best_sim = sim + best_fp = fp + return best_fp, best_sim + + def score_novelty(self, tree: "TraceTree") -> NoveltyScore: + """Score a trajectory's novelty against the codebook. + + Updates the codebook as a side effect. + """ + shape = self.compute_shape(tree) + now = time.time() + + # Exact match + if shape.fingerprint in self._codebook: + entry = self._codebook[shape.fingerprint] + entry.count += 1 + entry.last_seen = now + entry.novelty_score *= self._decay_rate + score = entry.novelty_score + classification = "routine" if score < 0.3 else "familiar" + self._recent_classifications.append(classification) + return NoveltyScore( + score=score, + nearest_shape=shape.fingerprint, + similarity_to_nearest=1.0, + first_seen=False, + classification=classification, + ) + + # Check for similar shapes + nearest_fp, nearest_sim = self._find_nearest(shape) + + if nearest_sim >= self._similarity_threshold and nearest_fp is not None: + # Similar enough to count as a variant of a known shape + entry = self._codebook[nearest_fp] + entry.count += 1 + entry.last_seen = now + entry.novelty_score *= self._decay_rate + score = max(entry.novelty_score, 1.0 - nearest_sim) + classification = "familiar" + self._recent_classifications.append(classification) + return NoveltyScore( + score=score, + nearest_shape=nearest_fp, + similarity_to_nearest=nearest_sim, + first_seen=False, + classification=classification, + ) + + # Genuinely novel shape + if len(self._codebook) >= self._max_codebook_size: + # Evict least recently seen entry + oldest_fp = min(self._codebook, key=lambda fp: self._codebook[fp].last_seen) + del self._codebook[oldest_fp] + + self._codebook[shape.fingerprint] = ShapeEntry( + shape=shape, + count=1, + first_seen=now, + last_seen=now, + novelty_score=1.0, + ) + self._recent_classifications.append("novel") + return NoveltyScore( + score=1.0, + nearest_shape=nearest_fp, + similarity_to_nearest=nearest_sim, + first_seen=True, + classification="novel", + ) + + def get_discovery_rate(self, window_size: int = 100) -> float: + """Fraction of recent trajectories classified as 'novel'. + + Declining discovery rate is a leading indicator of exploration + collapse -- the system is no longer finding new behavioral patterns. + """ + if not self._recent_classifications: + return 0.0 + recent = list(self._recent_classifications)[-window_size:] + return sum(1 for c in recent if c == "novel") / len(recent) + + def get_codebook_summary(self) -> str: + """Summarize known trajectory shapes with 'could be' language.""" + if not self._codebook: + return "Codebook: empty (no trajectories observed yet)." 
+ + total_observations = sum(e.count for e in self._codebook.values()) + sorted_entries = sorted(self._codebook.values(), key=lambda e: e.count, reverse=True) + + lines = [f"Codebook: {len(self._codebook)} known shapes."] + top_n = min(5, len(sorted_entries)) + lines.append(f"Top {top_n} by frequency:") + cumulative_pct = 0.0 + for i, entry in enumerate(sorted_entries[:top_n]): + pct = (entry.count / total_observations * 100) if total_observations > 0 else 0 + cumulative_pct += pct + # Represent shape as its first few span names + shape_repr = " -> ".join(n for n, _, _ in entry.shape.nodes[:4]) + if len(entry.shape.nodes) > 4: + shape_repr += " -> ..." + lines.append(f" {i + 1}. [{shape_repr}] (seen {entry.count}x, ~{pct:.0f}%)") + + discovery_rate = self.get_discovery_rate() + lines.append(f"Discovery rate (last 100): {discovery_rate:.2f}.") + if discovery_rate < 0.05 and len(self._codebook) > 10: + lines.append( + f"Could indicate behavioral convergence -- " + f"{top_n} shapes account for ~{cumulative_pct:.0f}% of all trajectories." + ) + + return "\n".join(lines) + + +class NoveltyAwareAdapter(TraceAdapter[List[Triplet]]): + """Wraps TracerTraceToTriplet to weight novel trajectories. + + Does NOT replace the base adapter. Produces the same Triplet format + with additional metadata and optional sampling weights. + + Novel high-reward trajectories get higher sampling weight. + Routine high-reward trajectories get standard weight. + Novel low-reward trajectories get standard weight (exploration is + not unconditionally good -- it needs reward context). + + Dissolution condition: when the base TracerTraceToTriplet adapter supports + configurable weighting functions, making this wrapper unnecessary. + """ + + def __init__( + self, + base_adapter: "TracerTraceToTriplet", + novelty_detector: NoveltyDetector, + novelty_weight_multiplier: float = 2.0, + ): + self._base = base_adapter + self._detector = novelty_detector + self._weight_multiplier = novelty_weight_multiplier + + def adapt(self, source: Sequence[Span], /) -> List[Triplet]: + """Adapt with novelty annotation. + + Each Triplet.metadata gets: + - 'novelty_score': float + - 'novelty_classification': str + - 'sampling_weight': float (1.0 for routine, multiplier for novel+rewarded) + """ + from agentlightning.adapter.triplet import TraceTree + + triplets = self._base.adapt(source) + if not triplets: + return triplets + + # Build the trace tree to score novelty + try: + source_normalized = list(source) + tree = TraceTree.from_spans(source_normalized) + if self._base.repair_hierarchy: + tree.repair_hierarchy() + novelty = self._detector.score_novelty(tree) + except Exception: + logger.debug("Novelty scoring failed; proceeding without enrichment.", exc_info=True) + return triplets + + enriched: List[Triplet] = [] + for triplet in triplets: + weight = 1.0 + if novelty.classification == "novel" and triplet.reward is not None and triplet.reward > 0: + weight = self._weight_multiplier + + updated_metadata = { + **triplet.metadata, + "novelty_score": novelty.score, + "novelty_classification": novelty.classification, + "sampling_weight": weight, + } + enriched.append(triplet.model_copy(update={"metadata": updated_metadata})) + + return enriched diff --git a/agentlightning/emergence/pareto.py b/agentlightning/emergence/pareto.py new file mode 100644 index 000000000..552c5d8c4 --- /dev/null +++ b/agentlightning/emergence/pareto.py @@ -0,0 +1,226 @@ +# Copyright (c) Microsoft. All rights reserved. + +"""Gap 3: Multi-objective tension support. 
+ +Track Pareto fronts across reward dimensions, preserving the trade-off +structure that scalar reward collapse destroys. + +Dissolution condition: when agent-lightning's Triplet model natively supports +vector rewards rather than Optional[float]. +""" + +from __future__ import annotations + +import logging +import math +from collections import deque +from typing import Dict, List, Optional, Tuple + +from pydantic import BaseModel + +logger = logging.getLogger(__name__) + + +class ParetoPoint(BaseModel): + """A point in multi-dimensional reward space.""" + + rollout_id: str + values: Dict[str, float] + rank: int = 0 + """0 = Pareto-optimal, N = dominated by N front layers.""" + + +class ParetoClassification(BaseModel): + """Result of classifying a new point against the Pareto front.""" + + rank: int + dominated_by: List[str] + dominates: List[str] + tension_report: str = "" + + +def _dominates(a: Dict[str, float], b: Dict[str, float]) -> bool: + """Return True if a Pareto-dominates b (at least as good in all, strictly better in one).""" + keys = set(a.keys()) | set(b.keys()) + at_least_as_good = all(a.get(k, 0.0) >= b.get(k, 0.0) for k in keys) + strictly_better = any(a.get(k, 0.0) > b.get(k, 0.0) for k in keys) + return at_least_as_good and strictly_better + + +class ParetoTracker: + """Track Pareto fronts across reward dimensions. + + Instead of collapsing multi-dimensional rewards to a scalar, + maintains the full Pareto surface and surfaces the trade-offs + that optimization would otherwise hide. + + Dissolution condition: when all reward matching policies preserve + dimensional structure by default. + """ + + def __init__( + self, + dimensions: List[str], + primary_key: Optional[str] = None, + ): + self._dimensions = dimensions + self._primary_key = primary_key or (dimensions[0] if dimensions else None) + self._front: List[ParetoPoint] = [] + self._history: deque[ParetoPoint] = deque(maxlen=5000) + + def add_point( + self, + rollout_id: str, + values: Dict[str, float], + ) -> ParetoClassification: + """Classify a new point against the current front.""" + point = ParetoPoint(rollout_id=rollout_id, values=values) + self._history.append(point) + + dominated_by: List[str] = [] + dominates_list: List[str] = [] + + for existing in self._front: + if _dominates(existing.values, values): + dominated_by.append(existing.rollout_id) + elif _dominates(values, existing.values): + dominates_list.append(existing.rollout_id) + + # Update front + if dominates_list: + self._front = [p for p in self._front if p.rollout_id not in set(dominates_list)] + + rank = len(dominated_by) + point.rank = rank + + if rank == 0: + self._front.append(point) + + # Generate tension report + tension_map = self.get_tension_map() + tensions = [(k, v) for k, v in tension_map.items() if v < -0.3] + if tensions: + most_tense = min(tensions, key=lambda x: x[1]) + tension_report = ( + f"{most_tense[0][0]} vs {most_tense[0][1]} " + f"(\u03c1 = {most_tense[1]:.2f})" + ) + else: + tension_report = "No strong dimensional tensions detected." 
+ + return ParetoClassification( + rank=rank, + dominated_by=dominated_by, + dominates=dominates_list, + tension_report=tension_report, + ) + + def get_front(self, rank: int = 0) -> List[ParetoPoint]: + """Get the Nth Pareto front layer.""" + if rank == 0: + return list(self._front) + + # Compute layered fronts + remaining = list(self._history) + for _ in range(rank): + front_ids = set() + for i, p in enumerate(remaining): + is_dominated = False + for j, q in enumerate(remaining): + if i != j and _dominates(q.values, p.values): + is_dominated = True + break + if not is_dominated: + front_ids.add(p.rollout_id) + remaining = [p for p in remaining if p.rollout_id not in front_ids] + + # The remaining points form the Nth front candidates + front: List[ParetoPoint] = [] + for p in remaining: + is_dominated = False + for q in remaining: + if p.rollout_id != q.rollout_id and _dominates(q.values, p.values): + is_dominated = True + break + if not is_dominated: + front.append(ParetoPoint(rollout_id=p.rollout_id, values=p.values, rank=rank)) + + return front + + def get_tension_map(self) -> Dict[Tuple[str, str], float]: + """Pairwise correlation between dimensions across all points. + + Negative correlation = structural trade-off (tension). + Positive correlation = aligned objectives (no tension). + Near-zero = independent objectives. + """ + if len(self._history) < 3: + return {} + + result: Dict[Tuple[str, str], float] = {} + for i, dim_a in enumerate(self._dimensions): + for dim_b in self._dimensions[i + 1:]: + values_a = [p.values.get(dim_a, 0.0) for p in self._history] + values_b = [p.values.get(dim_b, 0.0) for p in self._history] + corr = _pearson_correlation(values_a, values_b) + result[(dim_a, dim_b)] = corr + + return result + + def summary(self) -> str: + """Human-readable tension summary with 'could be' language.""" + if not self._history: + return "No Pareto data recorded yet." + + front_size = len(self._front) + total = len(self._history) + n_dims = len(self._dimensions) + + lines = [ + f"Pareto front: {front_size} non-dominated solutions across {n_dims} dimensions.", + f"Total observations: {total}.", + ] + + tension_map = self.get_tension_map() + tensions = sorted(tension_map.items(), key=lambda x: x[1]) + if tensions and tensions[0][1] < -0.3: + (dim_a, dim_b), corr = tensions[0] + lines.append(f"Primary tension: {dim_a} vs {dim_b} (\u03c1 = {corr:.2f}).") + + # Check which direction the front favors + front_values_a = [p.values.get(dim_a, 0.0) for p in self._front] + front_values_b = [p.values.get(dim_b, 0.0) for p in self._front] + if front_values_a and front_values_b: + mean_a = sum(front_values_a) / len(front_values_a) + mean_b = sum(front_values_b) / len(front_values_b) + if mean_a > mean_b: + lines.append( + f"Current front favors {dim_a} — {dim_b} ceiling could indicate " + f"unexplored strategies that sacrifice {dim_a} for depth." + ) + else: + lines.append( + f"Current front favors {dim_b} — {dim_a} ceiling could indicate " + f"unexplored strategies that sacrifice {dim_b} for speed." 
+ ) + + return "\n".join(lines) + + +def _pearson_correlation(x: List[float], y: List[float]) -> float: + """Compute Pearson correlation between two lists.""" + n = len(x) + if n < 2 or n != len(y): + return 0.0 + + mean_x = sum(x) / n + mean_y = sum(y) / n + + cov = sum((xi - mean_x) * (yi - mean_y) for xi, yi in zip(x, y)) + std_x = math.sqrt(sum((xi - mean_x) ** 2 for xi in x)) + std_y = math.sqrt(sum((yi - mean_y) ** 2 for yi in y)) + + if std_x == 0 or std_y == 0: + return 0.0 + + return cov / (std_x * std_y) diff --git a/agentlightning/emergence/reward_audit.py b/agentlightning/emergence/reward_audit.py new file mode 100644 index 000000000..d1c199a17 --- /dev/null +++ b/agentlightning/emergence/reward_audit.py @@ -0,0 +1,258 @@ +# Copyright (c) Microsoft. All rights reserved. + +"""Gap 2: Reward function staleness detection. + +Detects drift between reward signals and independent success metrics. +The auditor maintains two parallel streams and checks for correlation decay. + +Dissolution condition: when reward models include built-in calibration or +uncertainty quantification, making external staleness detection redundant. +""" + +from __future__ import annotations + +import hashlib +import logging +import math +from collections import deque +from typing import Any, Callable, Dict, List, Optional, Sequence + +from agentlightning.adapter.base import TraceAdapter +from agentlightning.emitter.reward import get_reward_value, get_rewards_from_span +from agentlightning.types import Span + +from .types import AuditRecord, DistributionShiftReport, StalenessReport + +logger = logging.getLogger(__name__) + + +class RewardStalenessAuditor: + """Detect drift between reward signals and independent success metrics. + + The auditor maintains two parallel streams: + 1. Reward values from emit_reward() -- the optimization signal + 2. Independent success measurements -- ground truth checks + + When these diverge beyond a threshold, the reward function may be stale. + + This requires the developer to define independent_check() -- a function + that evaluates task success without using the reward function. The auditor + cannot detect staleness without a reference signal. + """ + + def __init__( + self, + audit_frequency: int = 50, + divergence_threshold: float = 0.2, + window_size: int = 100, + ): + self._audit_frequency = audit_frequency + self._divergence_threshold = divergence_threshold + self._window_size = window_size + self._reward_history: deque[tuple[str, float]] = deque(maxlen=window_size) + self._success_history: deque[tuple[str, float]] = deque(maxlen=window_size) + self._historical_rewards: deque[float] = deque(maxlen=window_size * 2) + self._observation_count = 0 + + def record_reward(self, reward: float, rollout_id: str) -> None: + """Record emitted reward for audit comparison.""" + self._reward_history.append((rollout_id, reward)) + self._historical_rewards.append(reward) + self._observation_count += 1 + + def record_independent_check(self, success: float, rollout_id: str) -> None: + """Record independent success measurement. + + This is the critical input that the developer must provide. + Without it, staleness cannot be detected -- only reward distribution + changes (which could be genuine improvement). + """ + self._success_history.append((rollout_id, success)) + + def audit(self) -> Optional[StalenessReport]: + """Run staleness check if enough data has accumulated. + + Computes rank correlation between reward and independent success. 
+ If correlation drops below threshold, returns StalenessReport. + """ + if self._observation_count % self._audit_frequency != 0: + return None + + # Match rollouts that have both reward and success + reward_map = {rid: val for rid, val in self._reward_history} + success_map = {rid: val for rid, val in self._success_history} + common_ids = sorted(set(reward_map.keys()) & set(success_map.keys())) + + if len(common_ids) < 5: + return None + + rewards = [reward_map[rid] for rid in common_ids] + successes = [success_map[rid] for rid in common_ids] + correlation = _spearman_rank_correlation(rewards, successes) + + if correlation >= (1.0 - self._divergence_threshold): + return None + + severity = "advisory" + if correlation < 0.5: + severity = "critical" + elif correlation < 0.7: + severity = "warning" + + return StalenessReport( + rank_correlation=correlation, + window_size=len(common_ids), + severity=severity, + description=( + f"Reward-success correlation: {correlation:.2f} over {len(common_ids)} rollouts. " + f"Could indicate reward function drift — the optimization signal may no longer " + f"reflect actual task success. Independent validation recommended." + ), + ) + + def get_distribution_shift(self) -> Optional[DistributionShiftReport]: + """Detect reward distribution changes even without independent checks. + + Uses KL divergence between recent and historical reward distributions. + This is weaker than correlation-based staleness detection -- distribution + shift could be genuine improvement -- but requires no independent signal. + """ + if len(self._historical_rewards) < self._window_size: + return None + + all_rewards = list(self._historical_rewards) + midpoint = len(all_rewards) // 2 + historical = all_rewards[:midpoint] + recent = all_rewards[midpoint:] + + if not historical or not recent: + return None + + kl_div = _kl_divergence_binned(historical, recent) + + if kl_div < 0.1: + return None + + return DistributionShiftReport( + kl_divergence=kl_div, + window_size=len(recent), + description=( + f"Reward distribution has shifted (KL divergence: {kl_div:.2f}). " + f"Could indicate reward hacking, environment change, or genuine " + f"capability improvement. Independent validation recommended." + ), + ) + + +class RewardAuditAdapter(TraceAdapter[List[AuditRecord]]): + """Adapter that processes traces for reward audit rather than training. + + Implements the same TraceAdapter[T_to] interface as TracerTraceToTriplet, + but produces audit records instead of training triplets. + + Can run alongside the training adapter without interference. + + Dissolution condition: when audit data flows through the primary adapter + pipeline rather than requiring a separate adapter. 
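+
+    Illustrative usage (``spans`` is the same span sequence handed to the
+    training adapter; ``auditor`` is a RewardStalenessAuditor):
+
+        records = RewardAuditAdapter().adapt(spans)
+        for record in records:
+            if record.reward_value is not None:
+                auditor.record_reward(record.reward_value, record.rollout_id)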
+ """ + + def adapt(self, source: Sequence[Span], /) -> List[AuditRecord]: + """Extract reward spans and pair with task metadata for audit.""" + records: List[AuditRecord] = [] + + for span in source: + reward_value = get_reward_value(span) + if reward_value is None: + continue + + # Extract dimensional rewards + reward_dimensions: Dict[str, float] = {} + reward_list = get_rewards_from_span(span) + for r in reward_list: + reward_dimensions[r.name] = r.value + + # Hash task input for grouping + task_input_hash = hashlib.sha256( + f"{span.rollout_id}".encode() + ).hexdigest()[:12] + + records.append( + AuditRecord( + rollout_id=span.rollout_id, + attempt_id=span.attempt_id, + reward_value=reward_value, + reward_dimensions=reward_dimensions, + task_input_hash=task_input_hash, + timestamp=span.start_time, + ) + ) + + return records + + +def _rank(values: List[float]) -> List[float]: + """Convert values to ranks (average rank for ties).""" + n = len(values) + indexed = sorted(enumerate(values), key=lambda x: x[1]) + ranks = [0.0] * n + i = 0 + while i < n: + j = i + while j < n - 1 and indexed[j + 1][1] == indexed[i][1]: + j += 1 + avg_rank = (i + j) / 2.0 + 1 + for k in range(i, j + 1): + ranks[indexed[k][0]] = avg_rank + i = j + 1 + return ranks + + +def _spearman_rank_correlation(x: List[float], y: List[float]) -> float: + """Compute Spearman rank correlation between two lists.""" + n = len(x) + if n < 2: + return 0.0 + + rx = _rank(x) + ry = _rank(y) + + mean_rx = sum(rx) / n + mean_ry = sum(ry) / n + + cov = sum((rxi - mean_rx) * (ryi - mean_ry) for rxi, ryi in zip(rx, ry)) + std_rx = math.sqrt(sum((rxi - mean_rx) ** 2 for rxi in rx)) + std_ry = math.sqrt(sum((ryi - mean_ry) ** 2 for ryi in ry)) + + if std_rx == 0 or std_ry == 0: + return 0.0 + + return cov / (std_rx * std_ry) + + +def _kl_divergence_binned(p_samples: List[float], q_samples: List[float], n_bins: int = 20) -> float: + """Approximate KL divergence using binned distributions.""" + all_samples = p_samples + q_samples + min_val = min(all_samples) + max_val = max(all_samples) + if max_val == min_val: + return 0.0 + + bin_width = (max_val - min_val) / n_bins + epsilon = 1e-10 + + def _bin_counts(samples: List[float]) -> List[float]: + counts = [0.0] * n_bins + for s in samples: + idx = min(int((s - min_val) / bin_width), n_bins - 1) + counts[idx] += 1.0 + total = sum(counts) + return [(c + epsilon) / (total + n_bins * epsilon) for c in counts] + + p_dist = _bin_counts(p_samples) + q_dist = _bin_counts(q_samples) + + kl = 0.0 + for p, q in zip(p_dist, q_dist): + if p > 0: + kl += p * math.log(p / q) + return max(0.0, kl) diff --git a/agentlightning/emergence/semconv.py b/agentlightning/emergence/semconv.py new file mode 100644 index 000000000..c88a12325 --- /dev/null +++ b/agentlightning/emergence/semconv.py @@ -0,0 +1,21 @@ +# Copyright (c) Microsoft. All rights reserved. + +"""Semantic conventions for open emergence span attributes.""" + +from enum import Enum + + +class EmergenceSpanAttributes(Enum): + """Attributes for open emergence monitoring. + + These extend LightningSpanAttributes without modifying it. 
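+
+    Values are span attribute keys; illustrative usage (``snapshot`` is an
+    EntropySnapshot produced by ExplorationDecayMonitor):
+
+        attributes = {
+            EmergenceSpanAttributes.ENTROPY_SHAPE.value: snapshot.shape_entropy,
+            EmergenceSpanAttributes.ENTROPY_TOOL.value: snapshot.tool_entropy,
+        }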
+ """ + + ENTROPY_SHAPE = "agentlightning.emergence.entropy.shape" + ENTROPY_TOOL = "agentlightning.emergence.entropy.tool" + ENTROPY_REWARD = "agentlightning.emergence.entropy.reward" + COLLAPSE_SEVERITY = "agentlightning.emergence.collapse.severity" + COLLAPSE_DESCRIPTION = "agentlightning.emergence.collapse.description" + NOVELTY_SCORE = "agentlightning.emergence.novelty.score" + NOVELTY_CLASSIFICATION = "agentlightning.emergence.novelty.classification" + NOVELTY_NEAREST_SHAPE = "agentlightning.emergence.novelty.nearest_shape" diff --git a/agentlightning/emergence/types.py b/agentlightning/emergence/types.py new file mode 100644 index 000000000..2a5b6940f --- /dev/null +++ b/agentlightning/emergence/types.py @@ -0,0 +1,68 @@ +# Copyright (c) Microsoft. All rights reserved. + +"""Shared types for the open emergence module.""" + +from __future__ import annotations + +from enum import Enum +from typing import Any, Dict, List, Optional + +from pydantic import BaseModel, Field + + +class AuditRecord(BaseModel): + """Record produced by RewardAuditAdapter for staleness analysis.""" + + rollout_id: str + attempt_id: str + reward_value: Optional[float] = None + reward_dimensions: Dict[str, float] = Field(default_factory=dict) + task_input_hash: str = "" + timestamp: Optional[float] = None + + +class StalenessReport(BaseModel): + """Report from reward staleness detection.""" + + rank_correlation: float + """Spearman rank correlation between reward and independent success.""" + window_size: int + severity: str = "advisory" + """'advisory' | 'warning' | 'critical'""" + description: str = "" + + +class DistributionShiftReport(BaseModel): + """Report from reward distribution shift detection.""" + + kl_divergence: float + window_size: int + description: str = "" + + +class ConditionResult(BaseModel): + """Result of evaluating a single validity condition.""" + + condition_name: str + passed: bool + value: Optional[float] = None + threshold: Optional[float] = None + description: str = "" + + +class DissolutionSignal(BaseModel): + """Signal emitted when a dissolution condition fires.""" + + trigger: str + severity: str = "advisory" + """'advisory' | 'warning' | 'critical'""" + recommendation: str = "" + + +class DissolutionAction(BaseModel): + """Action taken when dissolution is executed.""" + + resources_id: str + policy: str + action_taken: str + description: str = "" diff --git a/docs/open-emergence-design.md b/docs/open-emergence-design.md new file mode 100644 index 000000000..c4955f119 --- /dev/null +++ b/docs/open-emergence-design.md @@ -0,0 +1,1149 @@ +# Open Emergence: An Opposing Worldview for Agent Training + +> Design document for [Issue #490: Exploration Collapse](https://github.com/microsoft/agent-lightning/issues/490) +> +> Cross-reference: Dissolution conditions for every structural decision in this document +> are collected in the final section. + +--- + +## The Problem This Document Addresses + +Agent-lightning implements a **closed optimization loop**: runners emit spans, adapters +compress them into triplets, algorithms consume triplets to produce better resources, +and those resources flow back to runners. Every component exists to tighten this cycle. + +This is the system's greatest strength. It is also the architectural root of exploration +collapse. + +The tighter the loop, the faster policies converge. The faster policies converge, the +narrower the behavioral distribution. 
The narrower the distribution, the less capacity +the system retains for discovering behaviors outside the reward function's scope. Agents +that are better by any metric are also more predictable. + +This document proposes five engineering interventions — not to replace optimization, but +to hold it in tension with open-ended exploration. The goal is not a system that explores +*instead of* optimizing, but one that can do both simultaneously without collapsing either. + +--- + +## The Core Tension (Hold It, Don't Resolve It) + +Optimization asks: "How do I get more of what I already know is good?" +Exploration asks: "What exists that I don't yet know how to value?" + +These are not complementary. They are structurally contradictory. Optimization narrows; +exploration widens. Any system that claims to do both is either oscillating between them +(which works but wastes cycles) or has found a way to hold both simultaneously (which is +what this document attempts). + +The architectural pattern: **every intervention below creates a tension that the system +cannot automatically resolve.** Resolution requires judgment — human or agent — applied +in context. The interventions surface the *need* for judgment; they do not substitute +for it. + +--- + +## Design Principles + +### 1. Non-Invasive Integration + +Every intervention plugs into agent-lightning's existing architecture through documented +interfaces. No core classes are modified. The system should work identically when emergence +modules are absent — they are additive, not replacement. + +**Concretely:** New adapters implement `TraceAdapter[T_to]`. New span attributes extend +`LightningSpanAttributes`. New store behaviors wrap `LightningStore` via delegation. New +reward policies extend `RewardMatchPolicy`. No monkey-patching, no subclass overrides of +internal methods. + +### 2. "Could Be" Language + +All interpretive output from emergence modules uses hedged language: "could indicate," +"~X%," "at current rates." This prevents the emergence layer from becoming another +optimization signal that the system collapses into certainty. + +### 3. Best-Effort Enrichment + +All emergence computations are best-effort. If entropy calculation fails, the system +proceeds without it. If novelty detection times out, the span ships without a novelty +score. No emergence feature blocks the core training loop. + +**Pattern:** `Promise.allSettled()` (or Python equivalent: `asyncio.gather(*tasks, return_exceptions=True)`) wrapping all enrichment calls. + +### 4. Dissolution Conditions + +Every structural decision in this document carries an explicit condition under which it +should be removed. See the final section. + +--- + +## Gap 1: Exploration Decay Monitoring + +### What's Missing + +Agent-lightning has no metrics to distinguish genuine behavioral improvement from policy +narrowing. A reward curve that goes up and to the right looks identical whether the agent +is discovering diverse high-quality strategies or converging to a single high-reward +behavior repeated with minor variations. + +### Where It Lands + +**Primary integration point:** The `TraceTree` class in `agentlightning/adapter/triplet.py`. + +The trace tree already builds a hierarchical representation of agent behavior via +`TraceTree.from_spans()`. It traverses depth-first via `traverse()`, detects LLM calls +via `find_llm_calls()`, and converts to trajectories via `to_trajectory()`. This is the +natural place to compute behavioral entropy — the tree structure is already there. 
+ +**Secondary integration point:** New span attributes in `agentlightning/semconv.py`. + +### Proposed Architecture + +``` +agentlightning/ + emergence/ + entropy.py # Trajectory entropy computation + monitoring.py # Sliding window decay detection +``` + +#### `entropy.py`: Trajectory Entropy + +```python +class TrajectoryEntropy: + """Compute behavioral diversity metrics from trace trees. + + Entropy is computed over the distribution of trajectory *shapes* + (tool call sequences, branching patterns) rather than trajectory + *content* (specific tokens). This distinguishes structural diversity + from surface-level variation. + """ + + def compute_shape_entropy( + self, + trees: Sequence[TraceTree], + window_size: int = 50, + ) -> float: + """H(shape distribution) over recent trajectories. + + Shape = tuple of (span.name, depth) for each node in traversal order. + High entropy: agents take structurally different paths. + Low entropy: agents follow the same structural pattern. + """ + + def compute_tool_entropy( + self, + trees: Sequence[TraceTree], + window_size: int = 50, + ) -> float: + """H(tool call distribution) over recent trajectories. + + Measures whether agents use diverse tools or converge on + a narrow subset. Computed from tool call spans matched via + the same llm_call_match regex used by TracerTraceToTriplet. + """ + + def compute_reward_entropy( + self, + triplets: Sequence[Triplet], + n_bins: int = 20, + window_size: int = 50, + ) -> float: + """H(reward distribution) over recent triplets. + + High reward entropy: outcomes are spread across the reward range. + Low reward entropy: outcomes cluster at one value (convergence). + Note: low entropy + high mean reward could be genuine mastery + OR premature convergence. This metric alone cannot distinguish. + """ +``` + +#### `monitoring.py`: Decay Detection + +```python +class ExplorationDecayMonitor: + """Track entropy over sliding windows and detect collapse. + + Collapse detection: when entropy drops below threshold AND reward + is still improving, the system may be narrowing rather than improving. + + This is a signal, not a conclusion. The monitor surfaces the tension; + it does not resolve it. + """ + + def __init__( + self, + window_size: int = 50, + alert_threshold: float = 0.3, # Entropy below this triggers alert + trend_window: int = 5, # Number of windows to compute trend + ): + self._history: deque[EntropySnapshot] = deque(maxlen=1000) + + def record(self, trees: Sequence[TraceTree], triplets: Sequence[Triplet]) -> EntropySnapshot: + """Compute and store entropy snapshot for current window.""" + + def detect_collapse(self) -> Optional[CollapseSignal]: + """Check if entropy is declining while reward is stable/improving. + + Returns CollapseSignal with: + - entropy_trend: slope of entropy over trend_window + - reward_trend: slope of reward over trend_window + - severity: "low" | "medium" | "high" + - description: hedged interpretation ("could indicate...") + + Returns None if no collapse pattern detected. + """ + + def summary(self) -> str: + """Human-readable summary with 'could be' language. + + Example: + 'Trajectory entropy: 0.42 (↓ from 0.71 over 5 windows). + Reward: 0.85 (↑ from 0.72). Could indicate policy narrowing — + high reward may reflect convergence to a single strategy rather + than genuine improvement across diverse approaches.' 
+ """ +``` + +#### Span Attribute Extensions + +New semantic conventions in `semconv.py`: + +```python +class EmergenceSpanAttributes(Enum): + """Attributes for open emergence monitoring.""" + + ENTROPY_SHAPE = "agentlightning.emergence.entropy.shape" + ENTROPY_TOOL = "agentlightning.emergence.entropy.tool" + ENTROPY_REWARD = "agentlightning.emergence.entropy.reward" + COLLAPSE_SEVERITY = "agentlightning.emergence.collapse.severity" + COLLAPSE_DESCRIPTION = "agentlightning.emergence.collapse.description" +``` + +### Integration with Existing Components + +The `ExplorationDecayMonitor` integrates at two points: + +1. **Algorithm.run()**: After each evaluation round, the algorithm can optionally + query the monitor. This is advisory — the algorithm decides whether to act on + collapse signals. For APO, this could trigger beam width expansion. For VERL, + this could increase the KL divergence penalty weight. + +2. **Dashboard**: Entropy metrics surface as time-series alongside reward curves, + giving operators visual detection of narrowing. + +The monitor does **not** automatically modify training behavior. It surfaces information. +The decision to act is human (or algorithm-specific logic that the developer writes). + +--- + +## Gap 2: Reward Function Staleness Detection + +### What's Missing + +Agent-lightning's reward system (`emit_reward()` in `emitter/reward.py`) assumes that +once a reward function is defined, it remains valid. There is no mechanism to detect when +the relationship between reward signal and actual task success has drifted — due to +environment changes, API updates, distribution shifts, or reward hacking. + +### Where It Lands + +**Primary integration point:** Between `emit_reward()` and the store. The staleness +detector wraps reward emission to accumulate comparison data. + +**Secondary integration point:** A new `RewardAuditAdapter` that sits alongside +`TracerTraceToTriplet` and `TraceToMessages`, implementing the same `TraceAdapter` +interface but producing audit reports instead of training data. + +### Proposed Architecture + +``` +agentlightning/ + emergence/ + reward_audit.py # Staleness detection and audit reporting +``` + +#### `reward_audit.py`: Staleness Detection + +```python +class RewardStalenessAuditor: + """Detect drift between reward signals and independent success metrics. + + The auditor maintains two parallel streams: + 1. Reward values from emit_reward() — the optimization signal + 2. Independent success measurements — ground truth checks + + When these diverge beyond a threshold, the reward function may be stale. + + This requires the developer to define independent_check() — a function + that evaluates task success without using the reward function. The auditor + cannot detect staleness without a reference signal. + """ + + def __init__( + self, + audit_frequency: int = 50, # Audit every N rollouts + divergence_threshold: float = 0.2, # Spearman rank correlation drop + window_size: int = 100, + ): + self._reward_history: deque[float] = deque(maxlen=window_size) + self._success_history: deque[float] = deque(maxlen=window_size) + + def record_reward(self, reward: float, rollout_id: str) -> None: + """Record emitted reward for audit comparison.""" + + def record_independent_check(self, success: float, rollout_id: str) -> None: + """Record independent success measurement. + + This is the critical input that the developer must provide. + Without it, staleness cannot be detected — only reward distribution + changes (which could be genuine improvement). 
+ """ + + def audit(self) -> Optional[StalenessReport]: + """Run staleness check if enough data has accumulated. + + Computes rank correlation between reward and independent success. + If correlation drops below threshold, returns StalenessReport. + + Returns None if insufficient data or no staleness detected. + """ + + def get_distribution_shift(self) -> Optional[DistributionShiftReport]: + """Detect reward distribution changes even without independent checks. + + Uses KL divergence between recent and historical reward distributions. + This is weaker than correlation-based staleness detection — distribution + shift could be genuine improvement — but requires no independent signal. + + Output uses 'could be' language: + 'Reward distribution has shifted (KL divergence: 0.34). Could indicate + reward hacking, environment change, or genuine capability improvement. + Independent validation recommended.' + """ +``` + +#### `RewardAuditAdapter`: Audit-Oriented Trace Processing + +```python +class RewardAuditAdapter(TraceAdapter[List[AuditRecord]]): + """Adapter that processes traces for reward audit rather than training. + + Implements the same TraceAdapter[T_to] interface as TracerTraceToTriplet, + but produces audit records instead of training triplets. + + Can run alongside the training adapter without interference. + """ + + def adapt(self, source: Sequence[Span]) -> List[AuditRecord]: + """Extract reward spans and pair with task metadata for audit. + + Returns AuditRecord containing: + - rollout_id, attempt_id + - emitted reward value + - task input hash (for grouping by task type) + - span timestamp (for temporal analysis) + - reward dimension breakdown (if multi-dimensional) + """ +``` + +### Integration Pattern + +The auditor is **opt-in** and runs as a side-channel: + +```python +trainer = Trainer( + algorithm=APO(...), + # ... standard config ... +) + +# Opt-in: attach reward auditor +auditor = RewardStalenessAuditor(audit_frequency=50) +trainer.attach_auditor(auditor) # New method on Trainer + +# Developer provides independent check +@auditor.independent_check +async def check_task_success(rollout: AttemptedRollout) -> float: + """Developer-defined ground truth evaluation.""" + # e.g., actually verify the agent's output against known-good answers + ... +``` + +The auditor **never modifies** the training loop. It produces reports that surface in +the dashboard and optionally trigger alerts. + +--- + +## Gap 3: Multi-Objective Tension Support + +### What's Missing + +Agent-lightning forces competing objectives into single scalar values. The `Triplet` +model (`agentlightning/types/core.py`) has `reward: Optional[float]` — a single number. +The reward matching policies (`FIRST_OCCURRENCE`, `FIRST_SIBLING`) each produce one +float per LLM call. Multi-dimensional rewards exist in emission (`emit_reward()` accepts +`Dict[str, float]`) but are collapsed to a primary key before reaching the algorithm. + +This collapse destroys the Pareto structure of multi-objective problems. When "speed" +and "thoroughness" compete, averaging them produces mediocre agents that are neither +fast nor thorough. The trade-off is invisible to the algorithm. + +### Where It Lands + +**Primary integration point:** The `Triplet` class and `TraceTree.to_trajectory()`. + +**Secondary integration point:** A new `RewardMatchPolicy.PARETO_FRONT` that preserves +dimensional tension instead of collapsing to scalar. 
+ +### Proposed Architecture + +``` +agentlightning/ + emergence/ + pareto.py # Pareto front tracking and tension preservation +``` + +#### Extending the Triplet Model + +The existing `Triplet` already has `metadata: Dict[str, Any]`. Rather than modifying +the core class (which would break existing algorithms), dimensional rewards flow through +metadata: + +```python +# In Triplet.metadata: +{ + "reward_dimensions": { + "speed": 0.9, + "thoroughness": 0.3, + "novelty": 0.7, + }, + "pareto_rank": 2, # 0 = Pareto-optimal + "dominated_by": ["rollout_abc"], + "tension": "speed vs thoroughness (ρ = -0.72)", +} +``` + +#### `pareto.py`: Pareto Front Tracking + +```python +class ParetoTracker: + """Track Pareto fronts across reward dimensions. + + Instead of collapsing multi-dimensional rewards to a scalar, + maintains the full Pareto surface and surfaces the trade-offs + that optimization would otherwise hide. + """ + + def __init__( + self, + dimensions: List[str], # e.g., ["speed", "thoroughness", "novelty"] + primary_key: Optional[str] = None, # For backward compatibility + ): + self._front: List[ParetoPoint] = [] + self._history: deque[ParetoPoint] = deque(maxlen=5000) + + def add_point( + self, + rollout_id: str, + values: Dict[str, float], + ) -> ParetoClassification: + """Classify a new point against the current front. + + Returns: + - rank: 0 if Pareto-optimal, N if dominated by N front layers + - dominated_by: list of rollout_ids that dominate this point + - dominates: list of rollout_ids this point displaces from front + - tension_report: which dimensions are in trade-off + """ + + def get_front(self, rank: int = 0) -> List[ParetoPoint]: + """Get the Nth Pareto front layer.""" + + def get_tension_map(self) -> Dict[Tuple[str, str], float]: + """Pairwise correlation between dimensions across all points. + + Negative correlation = structural trade-off (tension). + Positive correlation = aligned objectives (no tension). + Near-zero = independent objectives. + + Example output: + { + ("speed", "thoroughness"): -0.72, # Strong tension + ("speed", "novelty"): 0.15, # Independent + ("thoroughness", "novelty"): 0.41, # Mildly aligned + } + """ + + def summary(self) -> str: + """Human-readable tension summary. + + Example: + 'Pareto front: 12 non-dominated solutions across 3 dimensions. + Primary tension: speed vs thoroughness (ρ = -0.72). + Current front favors speed — thoroughness ceiling could indicate + unexplored strategies that sacrifice speed for depth.' + """ +``` + +#### New Reward Match Policy + +Extend `RewardMatchPolicy` enum: + +```python +class RewardMatchPolicy(str, Enum): + FIRST_SIBLING = "first_sibling" + FIRST_OCCURRENCE = "first_occurrence" + + # New: preserve dimensional structure + DIMENSIONAL = "dimensional" + """Preserve all reward dimensions in Triplet.metadata instead of + collapsing to primary_key scalar. The scalar Triplet.reward field + gets the primary_key value for backward compatibility; full dimensions + are in metadata['reward_dimensions']. + + Algorithms that understand multi-objective can read metadata. + Algorithms that don't can ignore it and use the scalar as before. + """ +``` + +### Integration with Algorithms + +**APO**: The textual gradient computation in `textual_gradient_and_apply_edit()` could +receive a `tension_report` alongside the rollout results, allowing the LLM-based critic +to reason about trade-offs explicitly: "This prompt improved speed but sacrificed +thoroughness. 
The Pareto front suggests this trade-off is steep — consider prompts that +maintain thoroughness while recovering speed through structural efficiency." + +**VERL**: The PPO reward can optionally receive Pareto rank as a supplementary signal, +penalizing solutions that are dominated on multiple fronts rather than just low on the +scalar reward. + +Both integrations are **opt-in** and backward-compatible. Existing algorithms that read +only `Triplet.reward` work unchanged. + +--- + +## Gap 4: Policy Dissolution Mechanism + +### What's Missing + +Agent-lightning's resource versioning (`ResourcesUpdate` in `types/resources.py`) is +monotonically increasing. Version 5 strictly replaces version 4. Resources accumulate +permanently — there is no unlearning, no expiry, no re-validation. + +This means trained behaviors persist even when: +- The environment has changed (API updates, new tool availability) +- The reward function has drifted (Gap 2) +- The behavior was optimal for a specific context that no longer applies +- The behavior was a local optimum that blocks discovery of better strategies + +### Where It Lands + +**Primary integration point:** `ResourcesUpdate` in `types/resources.py` and the store's +`add_resources()` / `get_latest_resources()` / `update_resources()` methods. + +**Secondary integration point:** The `Algorithm.run()` loop, which currently fetches +the latest resource version unconditionally. + +### Proposed Architecture + +``` +agentlightning/ + emergence/ + dissolution.py # TTL metadata, validity conditions, re-validation +``` + +#### Resource Dissolution Metadata + +Extend `ResourcesUpdate` metadata (not the class itself — use the existing +`metadata` pattern): + +```python +class DissolutionMetadata(BaseModel): + """Metadata attached to resource versions for dissolution tracking. + + Stored in ResourcesUpdate's metadata dict under the key + 'agentlightning.emergence.dissolution'. + """ + + # Temporal dissolution + ttl_seconds: Optional[int] = None + """Time-to-live. After this duration, the resource should be + re-validated before use. None = no temporal expiry.""" + + created_at: float + """Timestamp when this resource version was created.""" + + # Conditional dissolution + validity_conditions: List[ValidityCondition] = [] + """Conditions that must remain true for this resource to be valid. + When any condition fails, the resource should be re-validated.""" + + # Audit trail + validation_history: List[ValidationRecord] = [] + """Record of re-validation attempts and results.""" + + # Dissolution policy + on_dissolution: DissolutionPolicy = DissolutionPolicy.REVALIDATE + """What to do when dissolution triggers: + REVALIDATE: re-run validation, keep if still good + REGRESS: fall back to previous version + EXPLORE: switch to exploration mode (no resource pinning) + """ + +class ValidityCondition(BaseModel): + """A condition that must remain true for a resource to be valid.""" + + name: str + """Human-readable condition name.""" + + description: str + """What this condition checks.""" + + check_type: Literal["reward_threshold", "entropy_threshold", "custom"] + """Type of validity check.""" + + parameters: Dict[str, Any] = {} + """Parameters for the check (threshold values, etc.).""" + +class DissolutionPolicy(str, Enum): + REVALIDATE = "revalidate" + REGRESS = "regress" + EXPLORE = "explore" +``` + +#### `dissolution.py`: Dissolution Engine + +```python +class DissolutionEngine: + """Manages resource lifecycle with TTL, validity conditions, and re-validation. 
+ + Wraps a LightningStore to intercept resource retrieval and check + dissolution conditions before returning resources. + """ + + def __init__( + self, + store: LightningStore, + default_ttl: Optional[int] = None, + check_interval: int = 10, # Check every N rollouts + ): + self._store = store + self._dissolution_cache: Dict[str, DissolutionMetadata] = {} + + async def get_resources_with_dissolution_check( + self, + resources_id: Optional[str] = None, + ) -> Tuple[ResourcesUpdate, Optional[DissolutionSignal]]: + """Fetch resources, checking dissolution conditions. + + Returns the resources AND any dissolution signal. The caller + decides what to do — the engine does not block resource access. + + DissolutionSignal contains: + - trigger: which condition fired ("ttl_expired", "reward_below_threshold", ...) + - severity: "advisory" | "warning" | "critical" + - recommendation: hedged text ("Resource version 5 has been active for 48h. + Could indicate the environment has changed since training. Consider + re-validation.") + """ + + async def attach_dissolution_metadata( + self, + resources_id: str, + ttl_seconds: Optional[int] = None, + validity_conditions: Optional[List[ValidityCondition]] = None, + policy: DissolutionPolicy = DissolutionPolicy.REVALIDATE, + ) -> None: + """Attach dissolution metadata to a resource version.""" + + async def check_conditions( + self, + resources_id: str, + ) -> List[ConditionResult]: + """Evaluate all validity conditions for a resource version. + + Returns per-condition results. Failed conditions are signals, + not automatic actions. + """ + + async def dissolve( + self, + resources_id: str, + trigger: str, + ) -> DissolutionAction: + """Execute dissolution policy for a resource version. + + REVALIDATE: re-run validation rollouts, keep or discard + REGRESS: find previous version, mark current as dissolved + EXPLORE: clear resource pinning, let runners use no resource + + Returns DissolutionAction describing what was done. + """ +``` + +### Integration with Training Loop + +The dissolution engine sits between the algorithm and the store: + +```python +# In Algorithm.run() or a custom algorithm: +engine = DissolutionEngine(store=self.get_store(), default_ttl=3600) + +# Before evaluation round: +resources, signal = await engine.get_resources_with_dissolution_check() +if signal and signal.severity == "critical": + logger.warning(f"Dissolution signal: {signal.recommendation}") + # Developer decides: re-validate, regress, or continue +``` + +The engine **never** automatically removes resources. It surfaces dissolution signals. +The algorithm (or human operator) decides whether to act. + +--- + +## Gap 5: Novel vs. Routine Behavior Distinction + +### What's Missing + +Agent-lightning's span system captures what happened but not whether it's new. A span +with `name="openai.chat.completion"` and high reward could represent: + +- A genuinely novel strategy the agent has never tried before +- The 500th repetition of a known-good strategy with minor token variation + +These look identical in the trace tree. The `TracerTraceToTriplet` adapter treats them +identically when building training triplets. The algorithm optimizes both the same way. + +This means novel discoveries get overwhelmed by routine high-reward trajectories in the +training data. At scale (128-GPU distributed training), novel behaviors are statistical +noise in a sea of routine exploitation. 
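+
+The missing distinction fits in a few lines. A toy sketch (illustrative only, not the
+proposed API; it assumes the same `TraceTree.span` / `TraceTree.children` attributes as
+the Gap 1 sketch, and the detector proposed below adds similarity matching, decay tuning,
+and codebook eviction):
+
+```python
+import hashlib
+from typing import Dict, List, Tuple
+
+from agentlightning.adapter.triplet import TraceTree
+
+
+def shape_fingerprint(tree: TraceTree) -> str:
+    """Hash of the structural skeleton only: (span name, depth, child count), no token content."""
+    nodes: List[Tuple[str, int, int]] = []
+
+    def walk(node: TraceTree, depth: int) -> None:
+        nodes.append((node.span.name, depth, len(node.children)))
+        for child in node.children:
+            walk(child, depth + 1)
+
+    walk(tree, 0)
+    return hashlib.sha256(repr(nodes).encode()).hexdigest()[:16]
+
+
+class ToyNoveltyLedger:
+    """Novelty decays geometrically with repeat observations of the same structural shape."""
+
+    def __init__(self, decay: float = 0.95) -> None:
+        self.decay = decay
+        self.seen: Dict[str, int] = {}
+
+    def score(self, tree: TraceTree) -> float:
+        fingerprint = shape_fingerprint(tree)
+        times_seen = self.seen.get(fingerprint, 0)
+        self.seen[fingerprint] = times_seen + 1
+        # 1.0 on first sight, drifting toward 0.0 as the shape becomes routine.
+        return self.decay ** times_seen
+```
+
+Under this toy scoring, the 500th repetition of a known-good strategy scores near zero
+even when its reward stays high, which is exactly the signal the training data currently
+lacks.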
+ +### Where It Lands + +**Primary integration point:** New span attributes via `EmergenceSpanAttributes` and a +`NoveltyDetector` that annotates spans before they reach the adapter. + +**Secondary integration point:** A `NoveltyAwareAdapter` that wraps `TracerTraceToTriplet` +to weight novel trajectories differently. + +### Proposed Architecture + +``` +agentlightning/ + emergence/ + novelty.py # Novelty detection and annotation +``` + +#### `novelty.py`: Novelty Detection + +```python +class NoveltyDetector: + """Detect whether a trajectory represents novel or routine behavior. + + Novelty is defined structurally: a trajectory is novel if its shape + (sequence of tool calls, branching pattern, response structure) has + not been seen before. Token-level variation within a known shape is + NOT novelty — it's exploration noise. + + The detector maintains a running codebook of known trajectory shapes. + New shapes start with high novelty scores that decay as they're seen + more frequently. + """ + + def __init__( + self, + shape_similarity_threshold: float = 0.85, + novelty_decay_rate: float = 0.95, # Per-observation decay + max_codebook_size: int = 1000, + ): + self._codebook: Dict[str, ShapeEntry] = {} + + def compute_shape(self, tree: TraceTree) -> TrajectoryShape: + """Extract structural shape from a trace tree. + + Shape = sequence of (span_name, depth, child_count) tuples. + Ignores token content, timestamps, and specific attribute values. + Captures the structural skeleton of what the agent did. + """ + + def score_novelty(self, tree: TraceTree) -> NoveltyScore: + """Score a trajectory's novelty against the codebook. + + Returns NoveltyScore with: + - score: 0.0 (completely routine) to 1.0 (never seen) + - nearest_shape: most similar known shape (if any) + - similarity_to_nearest: cosine similarity of shapes + - first_seen: whether this exact shape is being recorded for first time + - classification: "novel" | "familiar" | "routine" + """ + + def annotate_spans( + self, + spans: Sequence[Span], + tree: TraceTree, + score: NoveltyScore, + ) -> Sequence[Span]: + """Add novelty attributes to spans. + + Adds to root span: + - agentlightning.emergence.novelty.score: float + - agentlightning.emergence.novelty.classification: str + - agentlightning.emergence.novelty.nearest_shape: str + """ + + def get_discovery_rate(self, window_size: int = 100) -> float: + """Fraction of recent trajectories classified as 'novel'. + + Declining discovery rate is a leading indicator of exploration + collapse — the system is no longer finding new behavioral patterns. + """ + + def get_codebook_summary(self) -> str: + """Summarize known trajectory shapes. + + Example: + 'Codebook: 47 known shapes. Top 5 by frequency: + 1. [search → analyze → respond] (seen 234×, 45% of trajectories) + 2. [search → search → analyze → respond] (seen 89×, 17%) + 3. [respond directly] (seen 67×, 13%) + ... + Discovery rate (last 100): 0.03 (declining from 0.12 over 5 windows). + Could indicate behavioral convergence — 3 shapes account for 75% + of all trajectories.' + """ +``` + +#### `NoveltyAwareAdapter`: Weighted Training Data + +```python +class NoveltyAwareAdapter(TraceAdapter[List[Triplet]]): + """Wraps TracerTraceToTriplet to weight novel trajectories. + + Does NOT replace the base adapter. Produces the same Triplet format + with additional metadata and optional sampling weights. + + Novel high-reward trajectories get higher sampling weight. + Routine high-reward trajectories get standard weight. 
+ Novel low-reward trajectories get standard weight (exploration is + not unconditionally good — it needs reward context). + """ + + def __init__( + self, + base_adapter: TracerTraceToTriplet, + novelty_detector: NoveltyDetector, + novelty_weight_multiplier: float = 2.0, + ): + self._base = base_adapter + self._detector = novelty_detector + self._weight_multiplier = novelty_weight_multiplier + + def adapt(self, source: Sequence[Span]) -> List[Triplet]: + """Adapt with novelty annotation. + + Each Triplet.metadata gets: + - 'novelty_score': float + - 'novelty_classification': str + - 'sampling_weight': float (1.0 for routine, multiplier for novel+rewarded) + """ +``` + +### Integration with Algorithms + +**APO**: When computing textual gradients, novel high-reward trajectories can be +highlighted in the prompt to the critic LLM: "This trajectory used a novel approach +(first seen) and achieved high reward. Consider what about this approach is transferable." + +**VERL**: Sampling weights from `NoveltyAwareAdapter` directly influence which trajectories +appear more frequently in PPO training batches, giving novel discoveries more gradient +signal without overriding the reward. + +--- + +## The Integrated System + +### Module Structure + +``` +agentlightning/ + emergence/ + __init__.py + entropy.py # Gap 1: Exploration decay monitoring + monitoring.py # Gap 1: Sliding window collapse detection + reward_audit.py # Gap 2: Reward staleness detection + pareto.py # Gap 3: Multi-objective tension tracking + dissolution.py # Gap 4: Resource TTL and re-validation + novelty.py # Gap 5: Novel vs routine distinction + types.py # Shared types for emergence module + semconv.py # Emergence-specific span attributes +``` + +### Cross-Module Interactions + +The five gaps are not independent. They reinforce each other: + +``` + ┌─────────────────────────┐ + │ Gap 1: Entropy Monitor │ + │ (behavioral diversity) │ + └──────────┬──────────────┘ + │ entropy feeds + ▼ +┌───────────────────┐ ┌─────────────────┐ ┌──────────────────┐ +│ Gap 2: Reward │◄──│ Gap 3: Pareto │──►│ Gap 5: Novelty │ +│ Staleness Audit │ │ Tension Tracker │ │ Detection │ +│ (signal validity) │ │ (trade-offs) │ │ (discovery rate) │ +└───────────┬───────┘ └────────┬────────┘ └──────────┬───────┘ + │ │ │ + │ staleness │ pareto rank │ novelty score + │ informs │ informs │ informs + ▼ ▼ ▼ + ┌────────────────────────────────────────────┐ + │ Gap 4: Dissolution Engine │ + │ (resource lifecycle management) │ + └────────────────────────────────────────────┘ +``` + +- **Entropy decline** (Gap 1) triggers dissolution condition checks (Gap 4) +- **Reward staleness** (Gap 2) is a dissolution trigger for resources trained under stale rewards +- **Pareto front stagnation** (Gap 3) — when the front stops expanding, it could indicate + exploration collapse or genuine Pareto optimality. Cross-reference with entropy (Gap 1) + to distinguish +- **Discovery rate decline** (Gap 5) is both an entropy component (Gap 1) and a dissolution + condition (Gap 4) + +### Generative Friction Points (Deliberately Maintained) + +**Friction 1: Novelty vs. Reward** +A novel trajectory with low reward and a routine trajectory with high reward are in +structural tension. The system surfaces both without resolving which is "better." +Resolution depends on context: early in training, novelty should be weighted higher; +late in training, reward matters more. But "early" and "late" are themselves judgment +calls that the system cannot make. + +**Friction 2: Dissolution vs. 
Stability** +Dissolving resources creates exploration pressure but also destroys learned behaviors. +The system tracks both the cost of keeping stale resources (potential reward hacking, +environmental mismatch) and the cost of dissolving them (loss of learned capabilities, +training instability). Neither cost dominates. + +**Friction 3: Pareto Front vs. Scalar Reward** +Existing algorithms (APO, VERL) consume scalar rewards. The Pareto tracker preserves +dimensional structure. These disagree on what "best" means. The system maintains both +views — the scalar for optimization, the Pareto for awareness — without resolving the +disagreement. + +--- + +## Dissolution Conditions + +Every structural decision in this document carries a condition under which it should be +removed. + +### Module: emergence/ + +**The emergence module as a whole.** +Serves as long as agent-lightning's core training loop lacks built-in exploration +pressure. If future versions of the framework add native entropy regularization, +multi-objective optimization, and resource lifecycle management, this module becomes +redundant scaffolding and should be dissolved. + +### Gap 1: Exploration Decay Monitoring + +**Trajectory entropy computation.** +Dissolution: When agent-lightning's native metrics (`@tracked` decorator, Prometheus +integration) include behavioral diversity metrics, making external entropy computation +redundant. + +**Sliding window collapse detection.** +Dissolution: When algorithms natively implement exploration-exploitation balancing +(e.g., entropy bonus in PPO reward, beam diversity in APO) that makes external collapse +detection unnecessary. Evidence: collapse signals are never acted upon because the +algorithm already prevents collapse. + +**Shape-based entropy (vs. token-level).** +Dissolution: When language model behavioral diversity can be measured at the token level +efficiently. Currently, shape-based measurement is a compression that loses information +but is computationally tractable. If token-level entropy becomes cheap, shape-based +measurement adds unnecessary abstraction. + +### Gap 2: Reward Function Staleness Detection + +**Independent check requirement.** +Serves as long as reward functions are black boxes to the training system. If future +reward models include built-in calibration or uncertainty quantification, external +staleness detection adds redundant computation. + +**Distribution shift detection (without independent checks).** +Dissolution: When reward models natively report confidence scores or calibration +metrics, making KL divergence on reward distributions a less informative proxy. + +**RewardAuditAdapter.** +Dissolution: When audit data flows through the primary adapter pipeline rather than +requiring a separate adapter. Evidence: all algorithms consume audit records alongside +training data. + +### Gap 3: Multi-Objective Tension Support + +**Pareto front tracking.** +Dissolution: When agent-lightning's Triplet model natively supports vector rewards +rather than Optional[float]. At that point, Pareto tracking should move from the +emergence module into core. + +**Dimensional reward in Triplet.metadata.** +Serves as a backward-compatible shim. Dissolves when the Triplet class gains a +`reward_dimensions: Optional[Dict[str, float]]` field, making the metadata workaround +unnecessary. + +**DIMENSIONAL reward match policy.** +Dissolution: When all reward matching policies preserve dimensional structure by +default, making a special policy unnecessary. 
+ +**Tension map (pairwise dimension correlation).** +Dissolution: When dashboard visualization handles multi-objective trade-off display +natively, making the text-based tension map redundant. + +### Gap 4: Policy Dissolution Mechanism + +**TTL-based resource expiry.** +Dissolution: When the training environment is provably stationary (fixed tasks, fixed +evaluation criteria, no distribution shift). In stationary environments, resources +don't go stale and TTL adds unnecessary complexity. + +**Validity conditions on resources.** +Dissolution: When all resources are re-validated on every use (streaming validation +rather than point-in-time checks). At that point, validity conditions are redundant +because staleness is detected continuously. + +**DissolutionPolicy.EXPLORE mode.** +Dissolution: When the runner natively supports resource-free exploration rollouts, +making explicit "explore mode" switching unnecessary. + +**The dissolution engine wrapper around LightningStore.** +Serves as long as dissolution logic is external to the store. If resource lifecycle +management moves into `CollectionBasedLightningStore` natively, the wrapper adds +indirection without benefit. + +### Gap 5: Novel vs. Routine Behavior Distinction + +**Shape-based novelty detection.** +Dissolution: When agent-lightning's span system includes native trajectory +fingerprinting (e.g., a hash of the trace tree structure), making external shape +computation redundant. + +**Codebook-based classification.** +Dissolution: When neural novelty detection (e.g., learned embeddings of trajectory +space) becomes efficient enough for online use. Codebook-based classification is a +discrete approximation that loses continuous similarity information. + +**NoveltyAwareAdapter wrapper.** +Dissolution: When the base `TracerTraceToTriplet` adapter supports configurable +weighting functions, making the wrapper unnecessary. Evidence: all users configure +weights through the base adapter rather than using the wrapper. + +**Discovery rate metric.** +Dissolution: When entropy monitoring (Gap 1) fully subsumes novelty tracking. Currently, +discovery rate captures information that entropy does not (whether *new shapes* are +appearing, not just whether the distribution is spread). If entropy metrics are extended +to track shape inventory growth, discovery rate becomes redundant. + +### Cross-Module Interactions + +**The interaction diagram above.** +Dissolution: When the five gaps are sufficiently independent that cross-referencing adds +complexity without insight. Evidence: operators use individual gap tools in isolation and +never benefit from cross-gap signals. + +**Generative friction points.** +These dissolve when agents reliably surface competing strategies without explicit +scaffolding. Evidence: removing a friction point does not reduce the diversity of +strategies agents discover. + +### "Could Be" Language Pattern + +**Hedged output in all emergence modules.** +Dissolution: When the emergence modules produce high-confidence signals that operators +treat as actionable without hedging. At that point, "could indicate" language adds +uncertainty where none exists and should be replaced with direct statements. Evidence: +operators consistently act on emergence signals and report that hedging is noise. + +### Best-Effort Enrichment Pattern + +**`asyncio.gather(return_exceptions=True)` wrapping.** +Dissolution: When emergence computations are reliable enough that failures drop below +~1%. 
At that point, switch to hard errors so silent data gaps don't produce silently +misleading output. + +--- + +## What This Is Not + +1. **Not an exploration algorithm.** This document does not propose epsilon-greedy, + UCB, or any other exploration strategy. Those are algorithms that *resolve* the + exploration-exploitation trade-off. This document proposes mechanisms that *surface* + the trade-off so it can be resolved by judgment. + +2. **Not a replacement for optimization.** Every module is additive. Remove them all + and agent-lightning works exactly as before. The emergence layer exists alongside + the optimization loop, not instead of it. + +3. **Not prescriptive.** The modules surface signals. They do not automatically modify + training behavior. An entropy collapse signal does not automatically widen the beam. + A dissolution trigger does not automatically remove resources. A novelty score does + not automatically increase sampling weight. Each signal requires a decision. + +4. **Not permanent.** Every structure in this document is designed to be removed when + it's no longer needed. The dissolution conditions section is not decoration — it is + the most important part of the document. + +--- + +## Implementation Order + +If this design is approved, implementation should proceed in this order: + +1. **Gap 5: Novelty Detection** — smallest surface area, clearest value signal, tests + the integration pattern (new adapter wrapping existing adapter). + +2. **Gap 1: Entropy Monitoring** — builds on novelty detection's shape computation, + provides the behavioral diversity baseline that other gaps reference. + +3. **Gap 3: Pareto Tension** — extends the reward pipeline, tests the metadata-based + backward compatibility pattern. + +4. **Gap 2: Reward Staleness** — requires the most developer input (independent checks), + tests the side-channel audit pattern. + +5. **Gap 4: Dissolution Engine** — depends on signals from all other gaps, should be + implemented last so it can reference real signal types. + +Each gap is independently useful. The implementation order maximizes the value of +partial completion. + +--- + +## Relationship to Spectra MCP Server Patterns + +This design draws directly from patterns validated in the [Spectra Finance MCP Server](https://github.com/Finanzgoblin/mcp-spectra-finance): + +| Spectra Pattern | Agent-Lightning Equivalent | +|---|---| +| Layer 3 "could be" hints | Hedged output in all emergence modules | +| `formatVolumeHints` (signals at knowledge boundary) | `ExplorationDecayMonitor.summary()` | +| Generative friction (raw APY vs effective APY) | Pareto front vs scalar reward | +| Dissolution conditions per structural decision | Gap 4 + final section of this document | +| Best-effort enrichment (`Promise.allSettled`) | `asyncio.gather(return_exceptions=True)` | +| Negative signals (surfacing absence) | Discovery rate decline, codebook stagnation | +| Navigation paths between tools | Cross-module interaction diagram | + +The key difference: Spectra's emergence patterns operate at the **tool output layer** +(shaping what an LLM sees). Agent-lightning's emergence patterns operate at the +**training loop layer** (shaping what an algorithm learns from). The philosophy is the +same — hold tension open, surface ambiguity, dissolve when no longer needed — but the +engineering target is fundamentally different. + +The Spectra server proved that "could be" language and maintained tension produce better +autonomous agent behavior than prescriptive conclusions. 
This design applies the same +insight to the training process itself: an agent that trains with tension preserved will +be more capable than one that trains with tension resolved. diff --git a/tests/emergence/__init__.py b/tests/emergence/__init__.py new file mode 100644 index 000000000..2a50eae89 --- /dev/null +++ b/tests/emergence/__init__.py @@ -0,0 +1 @@ +# Copyright (c) Microsoft. All rights reserved. diff --git a/tests/emergence/conftest.py b/tests/emergence/conftest.py new file mode 100644 index 000000000..b77f1757f --- /dev/null +++ b/tests/emergence/conftest.py @@ -0,0 +1,108 @@ +# Copyright (c) Microsoft. All rights reserved. + +"""Shared fixtures for emergence module tests.""" + +from __future__ import annotations + +# Patch Unix-only modules for Windows compatibility before any agentlightning imports +import platform +import sys +import types as _types +from unittest.mock import MagicMock + +if platform.system() == "Windows": + for _mod_name in ("fcntl", "pwd", "grp", "resource"): + if _mod_name not in sys.modules: + sys.modules[_mod_name] = _types.ModuleType(_mod_name) + # gunicorn is deeply Unix-specific; mock the entire tree + for _mod_name in [ + "gunicorn", "gunicorn.app", "gunicorn.app.base", "gunicorn.arbiter", + "gunicorn.sock", "gunicorn.systemd", "gunicorn.util", "gunicorn.config", + "gunicorn.errors", "gunicorn.http", "gunicorn.http.wsgi", + "gunicorn.workers", "gunicorn.workers.base", + ]: + if _mod_name not in sys.modules: + sys.modules[_mod_name] = MagicMock() + +import itertools +from typing import Any, Dict, List, Optional + +import pytest + +from agentlightning.adapter.triplet import TraceTree +from agentlightning.types import Span, Triplet + +_SEQ = itertools.count() + + +def make_span( + name: str = "test_span", + *, + rollout_id: str = "rollout-1", + attempt_id: str = "attempt-1", + parent_id: Optional[str] = None, + span_id: Optional[str] = None, + trace_id: Optional[str] = None, + start_time: float = 0.0, + end_time: float = 1.0, + attributes: Optional[Dict[str, Any]] = None, +) -> Span: + """Create a minimal Span for testing.""" + return Span.from_attributes( + rollout_id=rollout_id, + attempt_id=attempt_id, + sequence_id=next(_SEQ), + name=name, + span_id=span_id, + trace_id=trace_id, + parent_id=parent_id, + start_time=start_time, + end_time=end_time, + attributes=attributes or {}, + ) + + +def make_tree( + name: str = "root", + children: Optional[List[TraceTree]] = None, + *, + start_time: float = 0.0, + end_time: float = 1.0, +) -> TraceTree: + """Create a TraceTree node for testing.""" + span = make_span(name=name, start_time=start_time, end_time=end_time) + return TraceTree(id=span.span_id, span=span, children=children or []) + + +def make_diverse_trees(n: int = 10) -> List[TraceTree]: + """Create n structurally diverse trace trees.""" + trees: List[TraceTree] = [] + tool_names = ["search", "analyze", "respond", "compute", "lookup", "transform"] + for i in range(n): + name = tool_names[i % len(tool_names)] + child_count = (i % 3) + 1 + children = [ + make_tree(f"{name}_child_{j}", start_time=float(j), end_time=float(j + 1)) + for j in range(child_count) + ] + trees.append(make_tree(f"root_{name}", children=children)) + return trees + + +def make_uniform_trees(n: int = 10) -> List[TraceTree]: + """Create n structurally identical trace trees.""" + trees: List[TraceTree] = [] + for _ in range(n): + child = make_tree("openai.chat.completion", start_time=0.1, end_time=0.5) + trees.append(make_tree("root", children=[child])) + return trees + + +def 
make_triplet(reward: Optional[float] = None, **metadata: Any) -> Triplet: + """Create a Triplet for testing.""" + return Triplet( + prompt={"token_ids": [1, 2, 3]}, + response={"token_ids": [4, 5, 6]}, + reward=reward, + metadata=metadata, + ) diff --git a/tests/emergence/test_dissolution.py b/tests/emergence/test_dissolution.py new file mode 100644 index 000000000..b3d041ae2 --- /dev/null +++ b/tests/emergence/test_dissolution.py @@ -0,0 +1,231 @@ +# Copyright (c) Microsoft. All rights reserved. + +"""Tests for Gap 4: Dissolution engine.""" + +from __future__ import annotations + +import time +from typing import Any, Dict, List, Optional, Sequence +from unittest.mock import AsyncMock, MagicMock + +import pytest + +from agentlightning.emergence.dissolution import ( + DissolutionEngine, + DissolutionMetadata, + DissolutionPolicy, + ValidityCondition, +) +from agentlightning.emergence.types import ConditionResult, DissolutionSignal +from agentlightning.store.base import LightningStore +from agentlightning.types import ResourcesUpdate + + +def _make_resources( + resources_id: str = "res-1", + version: int = 1, +) -> ResourcesUpdate: + return ResourcesUpdate( + resources_id=resources_id, + create_time=time.time(), + update_time=time.time(), + version=version, + resources={}, + ) + + +class FakeStore(LightningStore): + """Minimal fake store for dissolution tests.""" + + def __init__(self, resources: Optional[List[ResourcesUpdate]] = None): + self._resources = resources if resources is not None else [_make_resources()] + + async def get_latest_resources(self) -> Optional[ResourcesUpdate]: + return self._resources[-1] if self._resources else None + + async def get_resources_by_id(self, resources_id: str) -> Optional[ResourcesUpdate]: + for r in self._resources: + if r.resources_id == resources_id: + return r + return None + + async def query_resources(self, **kwargs: Any) -> Sequence[ResourcesUpdate]: + result = list(self._resources) + if kwargs.get("sort_order") == "desc": + result.reverse() + limit = kwargs.get("limit", -1) + if limit > 0: + result = result[:limit] + return result + + +class TestDissolutionEngine: + @pytest.mark.asyncio + async def test_no_signal_without_metadata(self): + store = FakeStore() + engine = DissolutionEngine(store, check_interval=1) + resources, signal = await engine.get_resources_with_dissolution_check() + assert resources is not None + assert signal is None + + @pytest.mark.asyncio + async def test_ttl_expiry_signal(self): + store = FakeStore() + engine = DissolutionEngine(store, check_interval=1) + + # Attach metadata with expired TTL + await engine.attach_dissolution_metadata( + "res-1", + ttl_seconds=1, + ) + # Backdate the creation time + engine._dissolution_cache["res-1"].created_at = time.time() - 100 + + resources, signal = await engine.get_resources_with_dissolution_check("res-1") + assert resources is not None + assert signal is not None + assert signal.trigger == "ttl_expired" + assert "could indicate" in signal.recommendation.lower() or "Could indicate" in signal.recommendation + + @pytest.mark.asyncio + async def test_condition_failure_signal(self): + store = FakeStore() + engine = DissolutionEngine(store, check_interval=1) + + condition = ValidityCondition( + name="min_reward", + check_type="reward_threshold", + parameters={"threshold": 0.5}, + ) + await engine.attach_dissolution_metadata( + "res-1", + validity_conditions=[condition], + ) + # Register a checker that fails + engine.register_condition_checker( + "reward_threshold", + lambda cond: 
ConditionResult( + condition_name=cond.name, + passed=False, + value=0.3, + threshold=0.5, + description="Reward below threshold.", + ), + ) + + resources, signal = await engine.get_resources_with_dissolution_check("res-1") + assert signal is not None + assert "condition_failed" in signal.trigger + + @pytest.mark.asyncio + async def test_check_interval_respected(self): + store = FakeStore() + engine = DissolutionEngine(store, check_interval=5) + + await engine.attach_dissolution_metadata("res-1", ttl_seconds=1) + engine._dissolution_cache["res-1"].created_at = time.time() - 100 + + # First 4 checks should not check dissolution + for _ in range(4): + _, signal = await engine.get_resources_with_dissolution_check("res-1") + assert signal is None + + # 5th check should trigger + _, signal = await engine.get_resources_with_dissolution_check("res-1") + assert signal is not None + + @pytest.mark.asyncio + async def test_dissolve_revalidate(self): + store = FakeStore() + engine = DissolutionEngine(store) + await engine.attach_dissolution_metadata( + "res-1", + policy=DissolutionPolicy.REVALIDATE, + ) + action = await engine.dissolve("res-1", "test_trigger") + assert action.policy == "revalidate" + assert "re-validation" in action.description.lower() + + @pytest.mark.asyncio + async def test_dissolve_regress(self): + resources = [_make_resources("res-1", 1), _make_resources("res-2", 2)] + store = FakeStore(resources) + engine = DissolutionEngine(store) + await engine.attach_dissolution_metadata( + "res-2", + policy=DissolutionPolicy.REGRESS, + ) + action = await engine.dissolve("res-2", "test_trigger") + assert action.policy == "regress" + assert "res-1" in action.action_taken + + @pytest.mark.asyncio + async def test_dissolve_regress_no_previous(self): + store = FakeStore([_make_resources("res-1")]) + engine = DissolutionEngine(store) + await engine.attach_dissolution_metadata( + "res-1", + policy=DissolutionPolicy.REGRESS, + ) + action = await engine.dissolve("res-1", "test_trigger") + assert "no_previous_version" in action.action_taken + + @pytest.mark.asyncio + async def test_dissolve_explore(self): + store = FakeStore() + engine = DissolutionEngine(store) + await engine.attach_dissolution_metadata( + "res-1", + policy=DissolutionPolicy.EXPLORE, + ) + action = await engine.dissolve("res-1", "test_trigger") + assert action.policy == "explore" + assert "exploration" in action.description.lower() + + @pytest.mark.asyncio + async def test_check_conditions(self): + store = FakeStore() + engine = DissolutionEngine(store) + condition = ValidityCondition( + name="test_cond", + check_type="custom", + ) + await engine.attach_dissolution_metadata( + "res-1", + validity_conditions=[condition], + ) + engine.register_condition_checker( + "custom", + lambda cond: ConditionResult( + condition_name=cond.name, + passed=True, + ), + ) + results = await engine.check_conditions("res-1") + assert len(results) == 1 + assert results[0].passed + + @pytest.mark.asyncio + async def test_check_conditions_empty(self): + store = FakeStore() + engine = DissolutionEngine(store) + results = await engine.check_conditions("nonexistent") + assert results == [] + + @pytest.mark.asyncio + async def test_no_resources(self): + store = FakeStore([]) + engine = DissolutionEngine(store) + resources, signal = await engine.get_resources_with_dissolution_check() + assert resources is None + assert signal is None + + @pytest.mark.asyncio + async def test_validation_history_recorded(self): + store = FakeStore() + engine = 
DissolutionEngine(store) + await engine.attach_dissolution_metadata("res-1") + await engine.dissolve("res-1", "test") + meta = engine._dissolution_cache["res-1"] + assert len(meta.validation_history) == 1 + assert meta.validation_history[0].trigger == "test" diff --git a/tests/emergence/test_entropy.py b/tests/emergence/test_entropy.py new file mode 100644 index 000000000..6b4084da5 --- /dev/null +++ b/tests/emergence/test_entropy.py @@ -0,0 +1,86 @@ +# Copyright (c) Microsoft. All rights reserved. + +"""Tests for Gap 1: Entropy computation.""" + +import pytest + +from agentlightning.emergence.entropy import TrajectoryEntropy, _shannon_entropy + +from .conftest import make_diverse_trees, make_triplet, make_uniform_trees + + +class TestShannonEntropy: + def test_empty(self): + assert _shannon_entropy([]) == 0.0 + + def test_single_category(self): + assert _shannon_entropy(["a", "a", "a"]) == 0.0 + + def test_uniform_distribution(self): + # Uniform distribution should have entropy close to 1.0 + items = ["a", "b", "c", "d"] * 10 + entropy = _shannon_entropy(items) + assert 0.95 <= entropy <= 1.0 + + def test_skewed_distribution(self): + items = ["a"] * 90 + ["b"] * 10 + entropy = _shannon_entropy(items) + assert 0.0 < entropy < 0.7 + + +class TestTrajectoryEntropy: + def test_shape_entropy_diverse(self): + calc = TrajectoryEntropy() + trees = make_diverse_trees(20) + entropy = calc.compute_shape_entropy(trees, window_size=20) + assert entropy > 0.0 + + def test_shape_entropy_uniform(self): + calc = TrajectoryEntropy() + trees = make_uniform_trees(20) + entropy = calc.compute_shape_entropy(trees, window_size=20) + # All identical trees -> low entropy + assert entropy == 0.0 + + def test_shape_entropy_empty(self): + calc = TrajectoryEntropy() + assert calc.compute_shape_entropy([], window_size=10) == 0.0 + + def test_tool_entropy_diverse(self): + calc = TrajectoryEntropy() + trees = make_diverse_trees(10) + # These trees don't have openai.chat.completion spans, so we use a broader match + entropy = calc.compute_tool_entropy(trees, window_size=10, llm_call_match=r"root_") + assert entropy > 0.0 + + def test_tool_entropy_empty(self): + calc = TrajectoryEntropy() + assert calc.compute_tool_entropy([], window_size=10) == 0.0 + + def test_reward_entropy_spread(self): + calc = TrajectoryEntropy() + triplets = [make_triplet(reward=float(i) / 10) for i in range(10)] + entropy = calc.compute_reward_entropy(triplets, window_size=10) + assert entropy > 0.0 + + def test_reward_entropy_constant(self): + calc = TrajectoryEntropy() + triplets = [make_triplet(reward=1.0) for _ in range(10)] + entropy = calc.compute_reward_entropy(triplets, window_size=10) + assert entropy == 0.0 + + def test_reward_entropy_no_rewards(self): + calc = TrajectoryEntropy() + triplets = [make_triplet(reward=None) for _ in range(10)] + entropy = calc.compute_reward_entropy(triplets, window_size=10) + assert entropy == 0.0 + + def test_window_size_respected(self): + calc = TrajectoryEntropy() + trees = make_diverse_trees(100) + # Small window should only look at last 5 + entropy_small = calc.compute_shape_entropy(trees, window_size=5) + entropy_large = calc.compute_shape_entropy(trees, window_size=100) + # Both should compute, but may differ + assert entropy_small >= 0.0 + assert entropy_large >= 0.0 diff --git a/tests/emergence/test_monitoring.py b/tests/emergence/test_monitoring.py new file mode 100644 index 000000000..3628e56be --- /dev/null +++ b/tests/emergence/test_monitoring.py @@ -0,0 +1,101 @@ +# Copyright (c) Microsoft. 
All rights reserved. + +"""Tests for Gap 1: Exploration collapse monitoring.""" + +import pytest + +from agentlightning.emergence.monitoring import ( + CollapseSignal, + EntropySnapshot, + ExplorationDecayMonitor, + _compute_trend, +) + +from .conftest import make_diverse_trees, make_triplet, make_uniform_trees + + +class TestComputeTrend: + def test_increasing(self): + assert _compute_trend([1.0, 2.0, 3.0, 4.0]) > 0 + + def test_decreasing(self): + assert _compute_trend([4.0, 3.0, 2.0, 1.0]) < 0 + + def test_constant(self): + assert _compute_trend([5.0, 5.0, 5.0]) == 0.0 + + def test_single_value(self): + assert _compute_trend([1.0]) == 0.0 + + def test_empty(self): + assert _compute_trend([]) == 0.0 + + +class TestExplorationDecayMonitor: + def test_record_creates_snapshot(self): + monitor = ExplorationDecayMonitor(window_size=5) + trees = make_diverse_trees(5) + triplets = [make_triplet(reward=0.5) for _ in range(5)] + snapshot = monitor.record(trees, triplets) + assert isinstance(snapshot, EntropySnapshot) + assert snapshot.window_index == 0 + + def test_detect_collapse_insufficient_data(self): + monitor = ExplorationDecayMonitor(trend_window=5) + # Not enough windows recorded + assert monitor.detect_collapse() is None + + def test_detect_collapse_pattern(self): + monitor = ExplorationDecayMonitor( + window_size=5, + alert_threshold=0.5, + trend_window=3, + ) + # Simulate declining entropy with improving reward + # Record diverse trajectories first, then uniform ones + diverse = make_diverse_trees(5) + uniform = make_uniform_trees(5) + + # Window 1: high entropy + triplets_low = [make_triplet(reward=0.3) for _ in range(5)] + monitor.record(diverse, triplets_low) + + # Window 2: medium entropy + triplets_mid = [make_triplet(reward=0.6) for _ in range(5)] + monitor.record(diverse[:3] + uniform[:2], triplets_mid) + + # Window 3: low entropy, high reward + triplets_high = [make_triplet(reward=0.9) for _ in range(5)] + monitor.record(uniform, triplets_high) + + signal = monitor.detect_collapse() + # May or may not detect collapse depending on actual entropy values + # but should not crash + if signal is not None: + assert isinstance(signal, CollapseSignal) + assert signal.severity in ("low", "medium", "high") + assert "could indicate" in signal.description.lower() or "Could indicate" in signal.description + + def test_no_collapse_when_entropy_high(self): + monitor = ExplorationDecayMonitor( + window_size=5, + alert_threshold=0.1, + trend_window=3, + ) + diverse = make_diverse_trees(5) + for _ in range(3): + monitor.record(diverse, [make_triplet(reward=0.5) for _ in range(5)]) + # Entropy should be high with diverse trees + signal = monitor.detect_collapse() + assert signal is None + + def test_summary(self): + monitor = ExplorationDecayMonitor() + assert "No entropy data" in monitor.summary() + + trees = make_diverse_trees(5) + triplets = [make_triplet(reward=0.5) for _ in range(5)] + monitor.record(trees, triplets) + summary = monitor.summary() + assert "Shape entropy" in summary + assert "Mean reward" in summary diff --git a/tests/emergence/test_novelty.py b/tests/emergence/test_novelty.py new file mode 100644 index 000000000..28d2351f2 --- /dev/null +++ b/tests/emergence/test_novelty.py @@ -0,0 +1,179 @@ +# Copyright (c) Microsoft. All rights reserved. 
+ +"""Tests for Gap 5: Novelty Detection.""" + +import pytest + +from agentlightning.adapter.triplet import TracerTraceToTriplet +from agentlightning.emergence.novelty import ( + NoveltyAwareAdapter, + NoveltyDetector, + NoveltyScore, + TrajectoryShape, + _shape_similarity, +) + +from .conftest import make_diverse_trees, make_span, make_tree, make_uniform_trees + + +class TestTrajectoryShape: + def test_from_trace_tree(self): + child = make_tree("child_a") + tree = make_tree("root", children=[child]) + shape = TrajectoryShape.from_trace_tree(tree) + assert len(shape.nodes) == 2 + assert shape.nodes[0][0] == "root" # name + assert shape.nodes[0][1] == 0 # depth + assert shape.nodes[1][0] == "child_a" + assert shape.nodes[1][1] == 1 # depth + + def test_fingerprint_uniqueness(self): + tree_a = make_tree("root", children=[make_tree("a")]) + tree_b = make_tree("root", children=[make_tree("b")]) + shape_a = TrajectoryShape.from_trace_tree(tree_a) + shape_b = TrajectoryShape.from_trace_tree(tree_b) + assert shape_a.fingerprint != shape_b.fingerprint + + def test_fingerprint_stability(self): + tree = make_tree("root", children=[make_tree("child")]) + shape1 = TrajectoryShape.from_trace_tree(tree) + shape2 = TrajectoryShape.from_trace_tree(tree) + assert shape1.fingerprint == shape2.fingerprint + + +class TestShapeSimilarity: + def test_identical_shapes(self): + tree = make_tree("root", children=[make_tree("a")]) + shape = TrajectoryShape.from_trace_tree(tree) + assert _shape_similarity(shape, shape) == 1.0 + + def test_completely_different(self): + shape_a = TrajectoryShape.from_trace_tree( + make_tree("x", children=[make_tree("y", children=[make_tree("z")])]) + ) + shape_b = TrajectoryShape.from_trace_tree( + make_tree("a", children=[make_tree("b")]) + ) + sim = _shape_similarity(shape_a, shape_b) + assert 0.0 <= sim < 1.0 + + def test_empty_shapes(self): + a = TrajectoryShape(nodes=(), fingerprint="a") + b = TrajectoryShape(nodes=(), fingerprint="b") + assert _shape_similarity(a, b) == 1.0 + + +class TestNoveltyDetector: + def test_first_trajectory_is_novel(self): + detector = NoveltyDetector() + tree = make_tree("root", children=[make_tree("child")]) + score = detector.score_novelty(tree) + assert score.score == 1.0 + assert score.first_seen is True + assert score.classification == "novel" + + def test_repeated_trajectory_becomes_routine(self): + detector = NoveltyDetector(novelty_decay_rate=0.5) + tree = make_tree("root", children=[make_tree("child")]) + + # First observation + score1 = detector.score_novelty(tree) + assert score1.classification == "novel" + + # Repeated observations decay novelty + for _ in range(10): + score = detector.score_novelty(tree) + + assert score.score < 0.3 + assert score.classification == "routine" + + def test_different_structures_are_novel(self): + detector = NoveltyDetector() + tree_a = make_tree("root", children=[make_tree("a")]) + tree_b = make_tree("root", children=[make_tree("b"), make_tree("c")]) + + score_a = detector.score_novelty(tree_a) + score_b = detector.score_novelty(tree_b) + assert score_a.first_seen is True + assert score_b.first_seen is True + + def test_similar_shapes_recognized(self): + detector = NoveltyDetector(shape_similarity_threshold=0.5) + tree_a = make_tree("root", children=[make_tree("search"), make_tree("respond")]) + detector.score_novelty(tree_a) + + # Very similar tree + tree_b = make_tree("root", children=[make_tree("search"), make_tree("respond")]) + score = detector.score_novelty(tree_b) + assert score.first_seen is False + + 
def test_codebook_eviction(self): + detector = NoveltyDetector(max_codebook_size=3) + for i in range(5): + tree = make_tree(f"root_{i}", children=[make_tree(f"child_{i}")]) + detector.score_novelty(tree) + assert len(detector._codebook) <= 3 + + def test_discovery_rate(self): + detector = NoveltyDetector() + trees = make_diverse_trees(10) + for tree in trees: + detector.score_novelty(tree) + rate = detector.get_discovery_rate(window_size=10) + assert rate > 0.0 + + def test_discovery_rate_empty(self): + detector = NoveltyDetector() + assert detector.get_discovery_rate() == 0.0 + + def test_codebook_summary(self): + detector = NoveltyDetector() + assert "empty" in detector.get_codebook_summary() + + for tree in make_diverse_trees(5): + detector.score_novelty(tree) + summary = detector.get_codebook_summary() + assert "known shapes" in summary + assert "Discovery rate" in summary + + +class TestNoveltyAwareAdapter: + def test_enriches_triplets_with_metadata(self): + base = TracerTraceToTriplet( + llm_call_match=r"openai\.chat\.completion", + repair_hierarchy=False, + ) + detector = NoveltyDetector() + adapter = NoveltyAwareAdapter(base, detector) + + # Create spans that form a tree with an LLM call + root = make_span("root", span_id="root-1", start_time=0.0, end_time=2.0) + llm = make_span( + "openai.chat.completion", + parent_id="root-1", + start_time=0.1, + end_time=0.5, + attributes={ + "gen_ai.response.id": "resp-1", + "gen_ai.prompt.0.role": "user", + "gen_ai.prompt.0.content": "hello", + "gen_ai.completion.0.role": "assistant", + "gen_ai.completion.0.content": "world", + "prompt_token_ids": [1, 2, 3], + "response_token_ids": [4, 5, 6], + }, + ) + triplets = adapter.adapt([root, llm]) + if triplets: + assert "novelty_score" in triplets[0].metadata + assert "novelty_classification" in triplets[0].metadata + assert "sampling_weight" in triplets[0].metadata + + def test_handles_empty_source(self): + base = TracerTraceToTriplet(repair_hierarchy=False) + detector = NoveltyDetector() + adapter = NoveltyAwareAdapter(base, detector) + # Empty source should not crash + # (TracerTraceToTriplet will raise ValueError on empty spans, but that's expected) + with pytest.raises(ValueError): + adapter.adapt([]) diff --git a/tests/emergence/test_pareto.py b/tests/emergence/test_pareto.py new file mode 100644 index 000000000..bc6711fb7 --- /dev/null +++ b/tests/emergence/test_pareto.py @@ -0,0 +1,121 @@ +# Copyright (c) Microsoft. All rights reserved. 
+ +"""Tests for Gap 3: Pareto tension tracking.""" + +import pytest + +from agentlightning.emergence.pareto import ( + ParetoClassification, + ParetoPoint, + ParetoTracker, + _dominates, + _pearson_correlation, +) + + +class TestDominates: + def test_a_dominates_b(self): + assert _dominates({"x": 2.0, "y": 3.0}, {"x": 1.0, "y": 2.0}) + + def test_equal_no_domination(self): + assert not _dominates({"x": 1.0, "y": 1.0}, {"x": 1.0, "y": 1.0}) + + def test_trade_off_no_domination(self): + # a is better in x but worse in y + assert not _dominates({"x": 2.0, "y": 1.0}, {"x": 1.0, "y": 2.0}) + + def test_partial_domination(self): + # a is better in x, equal in y + assert _dominates({"x": 2.0, "y": 1.0}, {"x": 1.0, "y": 1.0}) + + +class TestPearsonCorrelation: + def test_perfect_positive(self): + corr = _pearson_correlation([1.0, 2.0, 3.0], [2.0, 4.0, 6.0]) + assert abs(corr - 1.0) < 0.01 + + def test_perfect_negative(self): + corr = _pearson_correlation([1.0, 2.0, 3.0], [6.0, 4.0, 2.0]) + assert abs(corr - (-1.0)) < 0.01 + + def test_zero_variance(self): + corr = _pearson_correlation([1.0, 1.0, 1.0], [2.0, 4.0, 6.0]) + assert corr == 0.0 + + def test_insufficient_data(self): + assert _pearson_correlation([1.0], [2.0]) == 0.0 + + +class TestParetoTracker: + def test_first_point_is_pareto_optimal(self): + tracker = ParetoTracker(dimensions=["speed", "quality"]) + result = tracker.add_point("r1", {"speed": 0.8, "quality": 0.6}) + assert result.rank == 0 + assert len(result.dominated_by) == 0 + + def test_dominated_point(self): + tracker = ParetoTracker(dimensions=["speed", "quality"]) + tracker.add_point("r1", {"speed": 0.8, "quality": 0.8}) + result = tracker.add_point("r2", {"speed": 0.5, "quality": 0.5}) + assert result.rank > 0 + assert "r1" in result.dominated_by + + def test_new_point_displaces_front(self): + tracker = ParetoTracker(dimensions=["speed", "quality"]) + tracker.add_point("r1", {"speed": 0.5, "quality": 0.5}) + result = tracker.add_point("r2", {"speed": 0.8, "quality": 0.8}) + assert result.rank == 0 + assert "r1" in result.dominates + + def test_pareto_front_with_trade_offs(self): + tracker = ParetoTracker(dimensions=["speed", "quality"]) + tracker.add_point("r1", {"speed": 0.9, "quality": 0.3}) + tracker.add_point("r2", {"speed": 0.3, "quality": 0.9}) + tracker.add_point("r3", {"speed": 0.6, "quality": 0.6}) + front = tracker.get_front(rank=0) + # r1 and r2 should both be on the front (trade-off) + front_ids = [p.rollout_id for p in front] + assert "r1" in front_ids + assert "r2" in front_ids + + def test_tension_map_negative_correlation(self): + tracker = ParetoTracker(dimensions=["speed", "quality"]) + # Create negatively correlated points + for i in range(10): + speed = float(i) / 10 + quality = 1.0 - speed + tracker.add_point(f"r{i}", {"speed": speed, "quality": quality}) + tension = tracker.get_tension_map() + assert ("speed", "quality") in tension + assert tension[("speed", "quality")] < -0.5 + + def test_tension_map_insufficient_data(self): + tracker = ParetoTracker(dimensions=["speed", "quality"]) + tracker.add_point("r1", {"speed": 0.5, "quality": 0.5}) + assert tracker.get_tension_map() == {} + + def test_summary_empty(self): + tracker = ParetoTracker(dimensions=["a", "b"]) + assert "No Pareto data" in tracker.summary() + + def test_summary_with_tension(self): + tracker = ParetoTracker(dimensions=["speed", "quality"]) + for i in range(20): + speed = float(i) / 20 + quality = 1.0 - speed + tracker.add_point(f"r{i}", {"speed": speed, "quality": quality}) + summary = 
tracker.summary()
        assert "Pareto front" in summary
        assert "tension" in summary.lower() or "ρ" in summary

    def test_get_front_layered(self):
        tracker = ParetoTracker(dimensions=["x", "y"])
        # Layer 0: non-dominated
        tracker.add_point("a", {"x": 1.0, "y": 0.0})
        tracker.add_point("b", {"x": 0.0, "y": 1.0})
        # Layer 1: dominated by a or b
        tracker.add_point("c", {"x": 0.5, "y": 0.0})
        tracker.add_point("d", {"x": 0.0, "y": 0.5})

        front_0 = tracker.get_front(rank=0)
        assert len(front_0) >= 2
diff --git a/tests/emergence/test_reward_audit.py b/tests/emergence/test_reward_audit.py
new file mode 100644
index 000000000..79f4cde93
--- /dev/null
+++ b/tests/emergence/test_reward_audit.py
@@ -0,0 +1,151 @@
+# Copyright (c) Microsoft. All rights reserved.

"""Tests for Gap 2: Reward staleness detection."""

import pytest

from agentlightning.emergence.reward_audit import (
    RewardAuditAdapter,
    RewardStalenessAuditor,
    _kl_divergence_binned,
    _rank,
    _spearman_rank_correlation,
)
from agentlightning.emergence.types import AuditRecord

from .conftest import make_span


class TestRank:
    def test_simple(self):
        ranks = _rank([3.0, 1.0, 2.0])
        assert ranks == [3.0, 1.0, 2.0]

    def test_ties(self):
        ranks = _rank([1.0, 1.0, 3.0])
        assert ranks[0] == ranks[1]  # Tied ranks should be averaged
        assert ranks[2] == 3.0


class TestSpearmanCorrelation:
    def test_perfect_positive(self):
        corr = _spearman_rank_correlation([1.0, 2.0, 3.0, 4.0], [2.0, 4.0, 6.0, 8.0])
        assert abs(corr - 1.0) < 0.01

    def test_perfect_negative(self):
        corr = _spearman_rank_correlation([1.0, 2.0, 3.0, 4.0], [8.0, 6.0, 4.0, 2.0])
        assert abs(corr - (-1.0)) < 0.01

    def test_insufficient_data(self):
        assert _spearman_rank_correlation([1.0], [2.0]) == 0.0


class TestKLDivergence:
    def test_identical_distributions(self):
        samples = [float(i) for i in range(20)]
        kl = _kl_divergence_binned(samples, samples)
        assert kl < 0.01

    def test_different_distributions(self):
        p = [float(i) for i in range(20)]
        q = [float(i + 10) for i in range(20)]
        kl = _kl_divergence_binned(p, q)
        assert kl > 0.0

    def test_constant_values(self):
        p = [1.0] * 20
        q = [1.0] * 20
        kl = _kl_divergence_binned(p, q)
        assert kl == 0.0


class TestRewardStalenessAuditor:
    def test_no_audit_before_frequency(self):
        auditor = RewardStalenessAuditor(audit_frequency=10)
        for i in range(9):
            auditor.record_reward(float(i), f"r{i}")
            auditor.record_independent_check(float(i), f"r{i}")
        # Not at audit_frequency yet (observation count is 9, not divisible by 10)
        result = auditor.audit()
        assert result is None

    def test_audit_at_frequency_with_high_correlation(self):
        auditor = RewardStalenessAuditor(audit_frequency=10, divergence_threshold=0.5)
        for i in range(10):
            auditor.record_reward(float(i), f"r{i}")
            auditor.record_independent_check(float(i), f"r{i}")
        # Perfect correlation should not trigger staleness
        result = auditor.audit()
        assert result is None

    def test_audit_detects_divergence(self):
        auditor = RewardStalenessAuditor(audit_frequency=10, divergence_threshold=0.2)
        for i in range(10):
            auditor.record_reward(float(i), f"r{i}")
            # Independent check is inversely correlated
            auditor.record_independent_check(float(9 - i), f"r{i}")
        result = auditor.audit()
        assert result is not None
        assert result.rank_correlation < 0.0
        assert "could indicate" in result.description.lower()

    def test_distribution_shift_detected(self):
        auditor 
= RewardStalenessAuditor(window_size=20) + # First half: low rewards + for i in range(20): + auditor.record_reward(float(i) * 0.01, f"r{i}") + # Second half: high rewards + for i in range(20): + auditor.record_reward(float(i) * 0.1 + 5.0, f"r{i + 20}") + report = auditor.get_distribution_shift() + if report is not None: + assert report.kl_divergence > 0 + + def test_no_distribution_shift_insufficient_data(self): + auditor = RewardStalenessAuditor(window_size=100) + for i in range(5): + auditor.record_reward(float(i), f"r{i}") + assert auditor.get_distribution_shift() is None + + +class TestRewardAuditAdapter: + def test_extracts_reward_spans(self): + adapter = RewardAuditAdapter() + # Create a reward span + reward_span = make_span( + "agentlightning.annotation", + attributes={ + "agentlightning.reward.0.name": "primary", + "agentlightning.reward.0.value": 0.75, + }, + rollout_id="rollout-1", + attempt_id="attempt-1", + start_time=1.0, + ) + non_reward_span = make_span("openai.chat.completion") + + records = adapter.adapt([reward_span, non_reward_span]) + assert len(records) == 1 + assert records[0].rollout_id == "rollout-1" + assert records[0].reward_value == 0.75 + + def test_handles_empty_source(self): + adapter = RewardAuditAdapter() + records = adapter.adapt([]) + assert records == [] + + def test_multi_dimensional_reward(self): + adapter = RewardAuditAdapter() + span = make_span( + "agentlightning.annotation", + attributes={ + "agentlightning.reward.0.name": "speed", + "agentlightning.reward.0.value": 0.9, + "agentlightning.reward.1.name": "quality", + "agentlightning.reward.1.value": 0.6, + }, + ) + records = adapter.adapt([span]) + assert len(records) == 1 + assert records[0].reward_dimensions.get("speed") == 0.9 + assert records[0].reward_dimensions.get("quality") == 0.6
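Taken together, the adapter and auditor tests above suggest the intended wiring: RewardAuditAdapter turns annotation spans into reward records, and RewardStalenessAuditor compares the recorded rewards against an independent check. A minimal sketch of that loop, assuming only the methods exercised above (adapt, record_reward, record_independent_check, audit); collected_spans and independent_score are placeholders, not part of the library.

from agentlightning.emergence.reward_audit import RewardAuditAdapter, RewardStalenessAuditor

adapter = RewardAuditAdapter()
auditor = RewardStalenessAuditor(audit_frequency=10, divergence_threshold=0.2)

collected_spans = []  # placeholder: spans from traced rollouts would go here

def independent_score(record):
    # Placeholder for a slower, trusted evaluation of the same rollout.
    return 0.5

for record in adapter.adapt(collected_spans):
    auditor.record_reward(record.reward_value, record.rollout_id)
    auditor.record_independent_check(independent_score(record), record.rollout_id)
    signal = auditor.audit()  # None unless an audit runs and flags divergence
    if signal is not None:
        # Divergence between reward and independent check could indicate staleness.
        print(signal.rank_correlation, signal.description)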