diff --git a/agent_actions/llm/batch/services/processing_recovery.py b/agent_actions/llm/batch/services/processing_recovery.py index 4bc7ab5..ca8c3f7 100644 --- a/agent_actions/llm/batch/services/processing_recovery.py +++ b/agent_actions/llm/batch/services/processing_recovery.py @@ -26,6 +26,7 @@ from agent_actions.logging.events import BatchCompleteEvent from agent_actions.processing.types import RecoveryMetadata from agent_actions.storage.backend import ( + DISPOSITION_DEFERRED, DISPOSITION_EXHAUSTED, DISPOSITION_FAILED, DISPOSITION_FILTERED, @@ -515,6 +516,9 @@ def write_record_dispositions( finalize_batch_output() (multi-batch collection path). These are mutually exclusive entry points — a given batch never flows through both. + Also clears any prior DEFERRED disposition for each record, since the + batch result represents the final status (ARCH-003). + Disposition writes are telemetry — errors are logged but never propagated. """ if not service._storage_backend: @@ -525,6 +529,23 @@ def write_record_dispositions( continue metadata = item.get("metadata", {}) + try: + # Clear the DEFERRED disposition now that the batch result has + # arrived. For success records this is the only disposition + # action; for non-success records the final disposition is + # written immediately below. + service._storage_backend.clear_disposition( + action_name, + disposition=DISPOSITION_DEFERRED, + record_id=source_guid, + ) + except Exception: + logger.debug( + "Could not clear DEFERRED disposition for %s (may not exist)", + source_guid, + exc_info=True, + ) + try: if metadata.get("retry_exhausted"): service._storage_backend.set_disposition( diff --git a/agent_actions/logging/events/data_pipeline_events.py b/agent_actions/logging/events/data_pipeline_events.py index a7ccccc..bb6e4c7 100644 --- a/agent_actions/logging/events/data_pipeline_events.py +++ b/agent_actions/logging/events/data_pipeline_events.py @@ -461,18 +461,24 @@ class ResultCollectionCompleteEvent(BaseEvent): total_failed: int = 0 total_exhausted: int = 0 total_unprocessed: int = 0 + total_deferred: int = 0 guard_condition: str = "" guard_on_false: str = "" def __post_init__(self) -> None: self.level = EventLevel.INFO self.category = EventCategories.DATA_PROCESSING - self.message = ( - f"[{self.action_name}] Result collection complete: " - f"{self.total_success} success, {self.total_skipped} skipped, " - f"{self.total_filtered} filtered, {self.total_failed} failed, " - f"{self.total_exhausted} exhausted, {self.total_unprocessed} unprocessed" - ) + parts = [ + f"{self.total_success} success", + f"{self.total_skipped} skipped", + f"{self.total_filtered} filtered", + f"{self.total_failed} failed", + f"{self.total_exhausted} exhausted", + f"{self.total_unprocessed} unprocessed", + ] + if self.total_deferred: + parts.append(f"{self.total_deferred} deferred") + self.message = f"[{self.action_name}] Result collection complete: " + ", ".join(parts) self.data = { "action_name": self.action_name, "total_success": self.total_success, @@ -481,6 +487,7 @@ def __post_init__(self) -> None: "total_failed": self.total_failed, "total_exhausted": self.total_exhausted, "total_unprocessed": self.total_unprocessed, + "total_deferred": self.total_deferred, "guard_condition": self.guard_condition, "guard_on_false": self.guard_on_false, } diff --git a/agent_actions/processing/result_collector.py b/agent_actions/processing/result_collector.py index 60eb366..6f6bd40 100644 --- a/agent_actions/processing/result_collector.py +++ b/agent_actions/processing/result_collector.py @@ -16,6 +16,7 @@ ) from agent_actions.processing.types import ProcessingResult, ProcessingStatus from agent_actions.storage.backend import ( + DISPOSITION_DEFERRED, DISPOSITION_EXHAUSTED, DISPOSITION_FAILED, DISPOSITION_FILTERED, @@ -48,6 +49,7 @@ class CollectionStats: skipped: int = 0 filtered: int = 0 exhausted: int = 0 + deferred: int = 0 unprocessed: int = 0 @@ -262,10 +264,27 @@ def collect_results( ) elif status == ProcessingStatus.DEFERRED: + task_id = result.task_id or "" logger.info( - "Collected DEFERRED result source_guid=%s", + "Collected DEFERRED result source_guid=%s task_id=%s", result.source_guid, + task_id, ) + fire_event( + ResultCollectedEvent( + action_name=agent_name, + result_index=idx, + status="deferred", + ) + ) + if storage_backend and result.source_guid: + _safe_set_disposition( + storage_backend, + agent_name, + result.source_guid, + DISPOSITION_DEFERRED, + reason=f"batch_queued:task_id={task_id}", + ) else: logger.debug("Unhandled result status=%s", status) # type: ignore[unreachable] @@ -283,6 +302,7 @@ def collect_results( total_failed=stats["failed"], total_exhausted=stats["exhausted"], total_unprocessed=stats["unprocessed"], + total_deferred=stats["deferred"], guard_condition=guard_condition, guard_on_false=guard_on_false, ) @@ -316,6 +336,7 @@ def collect_results( skipped=stats["skipped"], filtered=stats["filtered"], exhausted=stats["exhausted"], + deferred=stats["deferred"], unprocessed=stats["unprocessed"], ) diff --git a/agent_actions/storage/backend.py b/agent_actions/storage/backend.py index 1c0922c..a8ce9b8 100644 --- a/agent_actions/storage/backend.py +++ b/agent_actions/storage/backend.py @@ -12,6 +12,7 @@ DISPOSITION_FILTERED = "filtered" DISPOSITION_EXHAUSTED = "exhausted" DISPOSITION_FAILED = "failed" +DISPOSITION_DEFERRED = "deferred" DISPOSITION_UNPROCESSED = "unprocessed" @@ -23,6 +24,7 @@ class Disposition(str, Enum): FILTERED = DISPOSITION_FILTERED EXHAUSTED = DISPOSITION_EXHAUSTED FAILED = DISPOSITION_FAILED + DEFERRED = DISPOSITION_DEFERRED UNPROCESSED = DISPOSITION_UNPROCESSED diff --git a/agent_actions/workflow/managers/batch.py b/agent_actions/workflow/managers/batch.py index fe1bace..1a1f0cf 100644 --- a/agent_actions/workflow/managers/batch.py +++ b/agent_actions/workflow/managers/batch.py @@ -15,7 +15,7 @@ BatchResultsProcessedEvent, BatchStatusEvent, ) -from agent_actions.storage.backend import DISPOSITION_PASSTHROUGH +from agent_actions.storage.backend import DISPOSITION_DEFERRED, DISPOSITION_PASSTHROUGH if TYPE_CHECKING: from agent_actions.storage.backend import StorageBackend @@ -143,6 +143,36 @@ def _process_batch_results( ) raise + self._warn_orphaned_deferred(agent_name) + + def _warn_orphaned_deferred(self, agent_name: str) -> None: + """Log a warning if DEFERRED dispositions remain after batch processing. + + Orphaned DEFERRED records indicate records that were queued for batch + execution but never received a final result — e.g. due to a submission + failure or a provider-side drop. This is diagnostic only and never + raises. + """ + try: + orphans = self.storage_backend.get_disposition( + agent_name, disposition=DISPOSITION_DEFERRED + ) + if orphans: + sample_ids = [r.get("record_id", "?") for r in orphans[:10]] + logger.warning( + "[%s] %d record(s) still in DEFERRED state after batch completion " + "— possible orphans: %s", + agent_name, + len(orphans), + sample_ids, + ) + except Exception: + logger.debug( + "Could not check for orphaned DEFERRED dispositions for %s", + agent_name, + exc_info=True, + ) + def check_batch_submission( self, agent_name: str, agent_idx: int, agent_io_path: Path ) -> str | None: diff --git a/tests/unit/core/test_result_collector.py b/tests/unit/core/test_result_collector.py index 6eb5dc9..5325c83 100644 --- a/tests/unit/core/test_result_collector.py +++ b/tests/unit/core/test_result_collector.py @@ -503,6 +503,64 @@ def test_no_source_guid_no_disposition(self): backend.set_disposition.assert_not_called() + def test_deferred_result_writes_disposition(self): + """DEFERRED records write a disposition with source_guid as record_id.""" + backend = self._make_backend() + deferred = ProcessingResult.deferred( + task_id="task-123", + source_guid="src-def", + ) + + ResultCollector.collect_results( + [deferred], + {}, + "agent", + is_first_stage=False, + storage_backend=backend, + ) + + backend.set_disposition.assert_called_once_with( + "agent", + "src-def", + "deferred", + reason="batch_queued:task_id=task-123", + ) + + def test_deferred_result_no_source_guid_no_disposition(self): + """DEFERRED records without source_guid skip the disposition write.""" + backend = self._make_backend() + deferred = ProcessingResult.deferred( + task_id="task-456", + source_guid=None, + ) + + ResultCollector.collect_results( + [deferred], + {}, + "agent", + is_first_stage=False, + storage_backend=backend, + ) + + backend.set_disposition.assert_not_called() + + def test_deferred_result_counted_in_stats(self): + """DEFERRED records are counted in CollectionStats.deferred.""" + deferred = ProcessingResult.deferred( + task_id="task-789", + source_guid="src-d", + ) + + _, stats = ResultCollector.collect_results( + [deferred], + {}, + "agent", + is_first_stage=False, + ) + + assert stats.deferred == 1 + assert stats.success == 0 + def test_mixed_statuses_write_correct_dispositions(self): """Multiple statuses in one batch write the right dispositions.""" backend = self._make_backend() @@ -525,3 +583,25 @@ def test_mixed_statuses_write_correct_dispositions(self): calls = backend.set_disposition.call_args_list assert calls[0] == (("agent", "filt", "filtered"), {"reason": "guard_filter"}) assert calls[1] == (("agent", "fail", "failed"), {"reason": "err", "input_snapshot": None}) + + def test_mixed_with_deferred_writes_all_dispositions(self): + """DEFERRED + other statuses each write their own disposition.""" + backend = self._make_backend() + + results = [ + ProcessingResult.deferred(task_id="t-1", source_guid="src-d"), + ProcessingResult.filtered(source_guid="src-f"), + ] + + ResultCollector.collect_results( + results, + {}, + "agent", + is_first_stage=False, + storage_backend=backend, + ) + + assert backend.set_disposition.call_count == 2 + calls = backend.set_disposition.call_args_list + assert calls[0] == (("agent", "src-d", "deferred"), {"reason": "batch_queued:task_id=t-1"}) + assert calls[1] == (("agent", "src-f", "filtered"), {"reason": "guard_filter"}) diff --git a/tests/unit/wave3/test_enrichment_complete_event.py b/tests/unit/wave3/test_enrichment_complete_event.py index cfb7373..2a16057 100644 --- a/tests/unit/wave3/test_enrichment_complete_event.py +++ b/tests/unit/wave3/test_enrichment_complete_event.py @@ -173,19 +173,34 @@ def test_overlapping_schema_and_observe_fields(self): class TestDeferredStatusHandling: - """DEFERRED status should be handled with INFO log, not 'Unhandled'.""" + """DEFERRED status should be handled with disposition write, not just a log.""" - def test_deferred_status_logs_info(self): - from agent_actions.processing.result_collector import ResultCollector - - result = MagicMock() - result.status = ProcessingStatus.DEFERRED - result.source_guid = "test-guid" - result.output_data = None - result.skip_reason = None - - collector = ResultCollector.__new__(ResultCollector) - collector.logger = MagicMock() - - # Verify the DEFERRED branch exists as a known status + def test_deferred_status_exists(self): + """Verify the DEFERRED branch exists as a known status.""" assert hasattr(ProcessingStatus, "DEFERRED") + + def test_deferred_writes_disposition(self): + """DEFERRED records write a DISPOSITION_DEFERRED to the storage backend.""" + from agent_actions.processing.result_collector import ResultCollector + from agent_actions.processing.types import ProcessingResult + + deferred = ProcessingResult.deferred( + task_id="task-abc", + source_guid="test-guid", + ) + backend = MagicMock() + + ResultCollector.collect_results( + [deferred], + {}, + "test_agent", + is_first_stage=False, + storage_backend=backend, + ) + + backend.set_disposition.assert_called_once_with( + "test_agent", + "test-guid", + "deferred", + reason="batch_queued:task_id=task-abc", + )