Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions agent_actions/llm/batch/services/processing_recovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from agent_actions.logging.events import BatchCompleteEvent
from agent_actions.processing.types import RecoveryMetadata
from agent_actions.storage.backend import (
DISPOSITION_DEFERRED,
DISPOSITION_EXHAUSTED,
DISPOSITION_FAILED,
DISPOSITION_FILTERED,
Expand Down Expand Up @@ -515,6 +516,9 @@ def write_record_dispositions(
finalize_batch_output() (multi-batch collection path). These are
mutually exclusive entry points — a given batch never flows through both.

Also clears any prior DEFERRED disposition for each record, since the
batch result represents the final status (ARCH-003).

Disposition writes are telemetry — errors are logged but never propagated.
"""
if not service._storage_backend:
Expand All @@ -525,6 +529,23 @@ def write_record_dispositions(
continue
metadata = item.get("metadata", {})

try:
# Clear the DEFERRED disposition now that the batch result has
# arrived. For success records this is the only disposition
# action; for non-success records the final disposition is
# written immediately below.
service._storage_backend.clear_disposition(
action_name,
disposition=DISPOSITION_DEFERRED,
record_id=source_guid,
)
except Exception:
logger.debug(
"Could not clear DEFERRED disposition for %s (may not exist)",
source_guid,
exc_info=True,
)

try:
if metadata.get("retry_exhausted"):
service._storage_backend.set_disposition(
Expand Down
19 changes: 13 additions & 6 deletions agent_actions/logging/events/data_pipeline_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -461,18 +461,24 @@ class ResultCollectionCompleteEvent(BaseEvent):
total_failed: int = 0
total_exhausted: int = 0
total_unprocessed: int = 0
total_deferred: int = 0
guard_condition: str = ""
guard_on_false: str = ""

def __post_init__(self) -> None:
self.level = EventLevel.INFO
self.category = EventCategories.DATA_PROCESSING
self.message = (
f"[{self.action_name}] Result collection complete: "
f"{self.total_success} success, {self.total_skipped} skipped, "
f"{self.total_filtered} filtered, {self.total_failed} failed, "
f"{self.total_exhausted} exhausted, {self.total_unprocessed} unprocessed"
)
parts = [
f"{self.total_success} success",
f"{self.total_skipped} skipped",
f"{self.total_filtered} filtered",
f"{self.total_failed} failed",
f"{self.total_exhausted} exhausted",
f"{self.total_unprocessed} unprocessed",
]
if self.total_deferred:
parts.append(f"{self.total_deferred} deferred")
self.message = f"[{self.action_name}] Result collection complete: " + ", ".join(parts)
self.data = {
"action_name": self.action_name,
"total_success": self.total_success,
Expand All @@ -481,6 +487,7 @@ def __post_init__(self) -> None:
"total_failed": self.total_failed,
"total_exhausted": self.total_exhausted,
"total_unprocessed": self.total_unprocessed,
"total_deferred": self.total_deferred,
"guard_condition": self.guard_condition,
"guard_on_false": self.guard_on_false,
}
Expand Down
23 changes: 22 additions & 1 deletion agent_actions/processing/result_collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
)
from agent_actions.processing.types import ProcessingResult, ProcessingStatus
from agent_actions.storage.backend import (
DISPOSITION_DEFERRED,
DISPOSITION_EXHAUSTED,
DISPOSITION_FAILED,
DISPOSITION_FILTERED,
Expand Down Expand Up @@ -48,6 +49,7 @@ class CollectionStats:
skipped: int = 0
filtered: int = 0
exhausted: int = 0
deferred: int = 0
unprocessed: int = 0


Expand Down Expand Up @@ -262,10 +264,27 @@ def collect_results(
)

elif status == ProcessingStatus.DEFERRED:
task_id = result.task_id or ""
logger.info(
"Collected DEFERRED result source_guid=%s",
"Collected DEFERRED result source_guid=%s task_id=%s",
result.source_guid,
task_id,
)
fire_event(
ResultCollectedEvent(
action_name=agent_name,
result_index=idx,
status="deferred",
)
)
if storage_backend and result.source_guid:
_safe_set_disposition(
storage_backend,
agent_name,
result.source_guid,
DISPOSITION_DEFERRED,
reason=f"batch_queued:task_id={task_id}",
)

else:
logger.debug("Unhandled result status=%s", status) # type: ignore[unreachable]
Expand All @@ -283,6 +302,7 @@ def collect_results(
total_failed=stats["failed"],
total_exhausted=stats["exhausted"],
total_unprocessed=stats["unprocessed"],
total_deferred=stats["deferred"],
guard_condition=guard_condition,
guard_on_false=guard_on_false,
)
Expand Down Expand Up @@ -316,6 +336,7 @@ def collect_results(
skipped=stats["skipped"],
filtered=stats["filtered"],
exhausted=stats["exhausted"],
deferred=stats["deferred"],
unprocessed=stats["unprocessed"],
)

Expand Down
2 changes: 2 additions & 0 deletions agent_actions/storage/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
DISPOSITION_FILTERED = "filtered"
DISPOSITION_EXHAUSTED = "exhausted"
DISPOSITION_FAILED = "failed"
DISPOSITION_DEFERRED = "deferred"
DISPOSITION_UNPROCESSED = "unprocessed"


Expand All @@ -23,6 +24,7 @@ class Disposition(str, Enum):
FILTERED = DISPOSITION_FILTERED
EXHAUSTED = DISPOSITION_EXHAUSTED
FAILED = DISPOSITION_FAILED
DEFERRED = DISPOSITION_DEFERRED
UNPROCESSED = DISPOSITION_UNPROCESSED


Expand Down
32 changes: 31 additions & 1 deletion agent_actions/workflow/managers/batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
BatchResultsProcessedEvent,
BatchStatusEvent,
)
from agent_actions.storage.backend import DISPOSITION_PASSTHROUGH
from agent_actions.storage.backend import DISPOSITION_DEFERRED, DISPOSITION_PASSTHROUGH

if TYPE_CHECKING:
from agent_actions.storage.backend import StorageBackend
Expand Down Expand Up @@ -143,6 +143,36 @@ def _process_batch_results(
)
raise

self._warn_orphaned_deferred(agent_name)

def _warn_orphaned_deferred(self, agent_name: str) -> None:
"""Log a warning if DEFERRED dispositions remain after batch processing.

Orphaned DEFERRED records indicate records that were queued for batch
execution but never received a final result — e.g. due to a submission
failure or a provider-side drop. This is diagnostic only and never
raises.
"""
try:
orphans = self.storage_backend.get_disposition(
agent_name, disposition=DISPOSITION_DEFERRED
)
if orphans:
sample_ids = [r.get("record_id", "?") for r in orphans[:10]]
logger.warning(
"[%s] %d record(s) still in DEFERRED state after batch completion "
"— possible orphans: %s",
agent_name,
len(orphans),
sample_ids,
)
except Exception:
logger.debug(
"Could not check for orphaned DEFERRED dispositions for %s",
agent_name,
exc_info=True,
)

def check_batch_submission(
self, agent_name: str, agent_idx: int, agent_io_path: Path
) -> str | None:
Expand Down
80 changes: 80 additions & 0 deletions tests/unit/core/test_result_collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -503,6 +503,64 @@ def test_no_source_guid_no_disposition(self):

backend.set_disposition.assert_not_called()

def test_deferred_result_writes_disposition(self):
"""DEFERRED records write a disposition with source_guid as record_id."""
backend = self._make_backend()
deferred = ProcessingResult.deferred(
task_id="task-123",
source_guid="src-def",
)

ResultCollector.collect_results(
[deferred],
{},
"agent",
is_first_stage=False,
storage_backend=backend,
)

backend.set_disposition.assert_called_once_with(
"agent",
"src-def",
"deferred",
reason="batch_queued:task_id=task-123",
)

def test_deferred_result_no_source_guid_no_disposition(self):
"""DEFERRED records without source_guid skip the disposition write."""
backend = self._make_backend()
deferred = ProcessingResult.deferred(
task_id="task-456",
source_guid=None,
)

ResultCollector.collect_results(
[deferred],
{},
"agent",
is_first_stage=False,
storage_backend=backend,
)

backend.set_disposition.assert_not_called()

def test_deferred_result_counted_in_stats(self):
"""DEFERRED records are counted in CollectionStats.deferred."""
deferred = ProcessingResult.deferred(
task_id="task-789",
source_guid="src-d",
)

_, stats = ResultCollector.collect_results(
[deferred],
{},
"agent",
is_first_stage=False,
)

assert stats.deferred == 1
assert stats.success == 0

def test_mixed_statuses_write_correct_dispositions(self):
"""Multiple statuses in one batch write the right dispositions."""
backend = self._make_backend()
Expand All @@ -525,3 +583,25 @@ def test_mixed_statuses_write_correct_dispositions(self):
calls = backend.set_disposition.call_args_list
assert calls[0] == (("agent", "filt", "filtered"), {"reason": "guard_filter"})
assert calls[1] == (("agent", "fail", "failed"), {"reason": "err", "input_snapshot": None})

def test_mixed_with_deferred_writes_all_dispositions(self):
"""DEFERRED + other statuses each write their own disposition."""
backend = self._make_backend()

results = [
ProcessingResult.deferred(task_id="t-1", source_guid="src-d"),
ProcessingResult.filtered(source_guid="src-f"),
]

ResultCollector.collect_results(
results,
{},
"agent",
is_first_stage=False,
storage_backend=backend,
)

assert backend.set_disposition.call_count == 2
calls = backend.set_disposition.call_args_list
assert calls[0] == (("agent", "src-d", "deferred"), {"reason": "batch_queued:task_id=t-1"})
assert calls[1] == (("agent", "src-f", "filtered"), {"reason": "guard_filter"})
43 changes: 29 additions & 14 deletions tests/unit/wave3/test_enrichment_complete_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,19 +173,34 @@ def test_overlapping_schema_and_observe_fields(self):


class TestDeferredStatusHandling:
"""DEFERRED status should be handled with INFO log, not 'Unhandled'."""
"""DEFERRED status should be handled with disposition write, not just a log."""

def test_deferred_status_logs_info(self):
from agent_actions.processing.result_collector import ResultCollector

result = MagicMock()
result.status = ProcessingStatus.DEFERRED
result.source_guid = "test-guid"
result.output_data = None
result.skip_reason = None

collector = ResultCollector.__new__(ResultCollector)
collector.logger = MagicMock()

# Verify the DEFERRED branch exists as a known status
def test_deferred_status_exists(self):
"""Verify the DEFERRED branch exists as a known status."""
assert hasattr(ProcessingStatus, "DEFERRED")

def test_deferred_writes_disposition(self):
"""DEFERRED records write a DISPOSITION_DEFERRED to the storage backend."""
from agent_actions.processing.result_collector import ResultCollector
from agent_actions.processing.types import ProcessingResult

deferred = ProcessingResult.deferred(
task_id="task-abc",
source_guid="test-guid",
)
backend = MagicMock()

ResultCollector.collect_results(
[deferred],
{},
"test_agent",
is_first_stage=False,
storage_backend=backend,
)

backend.set_disposition.assert_called_once_with(
"test_agent",
"test-guid",
"deferred",
reason="batch_queued:task_id=task-abc",
)
Loading