From 8d4ad5f3244912186530ffeabe2abb819ab03052 Mon Sep 17 00:00:00 2001 From: Muizz Lateef Date: Thu, 2 Apr 2026 11:16:40 +0100 Subject: [PATCH 1/4] feat: partial failure visibility with completed_with_failures status When an action processes N items and some fail while others succeed, the action is now marked "completed_with_failures" instead of silently completing as "completed". This gives operators visibility into partial failures without halting downstream actions. Changes: - Storage backend: add input_snapshot param to set_disposition() and get_failed_items() method on abstract interface - SQLite: add input_snapshot column with schema migration for existing DBs - Result collector: persists JSON-serialized input snapshot for failed items - Executor: queries storage after success to detect partial failures - State manager: completed_with_failures as terminal status, circuit breaker ignores it (descendants run on successful records) - Tally: shows "N OK | M PARTIAL | S SKIP | K ERROR" - Level lines: yellow for levels with partial failures --- agent_actions/logging/events/formatters.py | 7 ++++- .../logging/events/workflow_events.py | 6 ++-- agent_actions/processing/result_collector.py | 11 +++++++ agent_actions/storage/backend.py | 16 +++++++++- .../storage/backends/sqlite_backend.py | 20 ++++++++++--- agent_actions/workflow/coordinator.py | 2 +- agent_actions/workflow/execution_events.py | 6 +++- agent_actions/workflow/executor.py | 30 ++++++++++++++----- agent_actions/workflow/managers/state.py | 19 ++++++++---- .../workflow/parallel/action_executor.py | 7 +++-- tests/unit/core/test_result_collector.py | 4 ++- .../unit/workflow/test_executor_lifecycle.py | 2 ++ 12 files changed, 104 insertions(+), 26 deletions(-) diff --git a/agent_actions/logging/events/formatters.py b/agent_actions/logging/events/formatters.py index d45143d..1dbd027 100644 --- a/agent_actions/logging/events/formatters.py +++ b/agent_actions/logging/events/formatters.py @@ -81,14 +81,19 @@ def _format_workflow_complete(self, event: BaseEvent) -> str: ts = self._timestamp(event) elapsed = event.data.get("elapsed_time", 0.0) completed = event.data.get("actions_completed", 0) + partial = event.data.get("actions_partial", 0) skipped = event.data.get("actions_skipped", 0) failed = event.data.get("actions_failed", 0) ok = self._status("OK") if completed > 0 else "OK" + part = self._status("PARTIAL") if partial > 0 else "PARTIAL" skip = self._status("SKIP") if skipped > 0 else "SKIP" err = self._status("ERROR") if failed > 0 else "ERROR" - return f"{ts}Completed in {elapsed:.2f}s | {completed} {ok} | {skipped} {skip} | {failed} {err}" + return ( + f"{ts}Completed in {elapsed:.2f}s | {completed} {ok} | " + f"{partial} {part} | {skipped} {skip} | {failed} {err}" + ) def _format_workflow_failed(self, event: BaseEvent) -> str: ts = self._timestamp(event) diff --git a/agent_actions/logging/events/workflow_events.py b/agent_actions/logging/events/workflow_events.py index 4205e2d..7aaf160 100644 --- a/agent_actions/logging/events/workflow_events.py +++ b/agent_actions/logging/events/workflow_events.py @@ -51,6 +51,7 @@ class WorkflowCompleteEvent(BaseEvent): workflow_name: str = "" elapsed_time: float = 0.0 actions_completed: int = 0 + actions_partial: int = 0 actions_skipped: int = 0 actions_failed: int = 0 total_tokens: int = 0 @@ -60,13 +61,14 @@ def __post_init__(self) -> None: self.category = EventCategories.WORKFLOW self.message = ( f"Completed workflow {self.workflow_name} in {self.elapsed_time:.2f}s | " - f"{self.actions_completed} completed | {self.actions_skipped} skipped | " - f"{self.actions_failed} failed" + f"{self.actions_completed} completed | {self.actions_partial} partial | " + f"{self.actions_skipped} skipped | {self.actions_failed} failed" ) self.data = { "workflow_name": self.workflow_name, "elapsed_time": self.elapsed_time, "actions_completed": self.actions_completed, + "actions_partial": self.actions_partial, "actions_skipped": self.actions_skipped, "actions_failed": self.actions_failed, "total_tokens": self.total_tokens, diff --git a/agent_actions/processing/result_collector.py b/agent_actions/processing/result_collector.py index 8ca79a2..b9765c5 100644 --- a/agent_actions/processing/result_collector.py +++ b/agent_actions/processing/result_collector.py @@ -1,6 +1,7 @@ """Shared result aggregation for processing output records.""" import collections +import json import logging from typing import TYPE_CHECKING, Any, Optional @@ -178,12 +179,22 @@ def collect_results( ) ) if storage_backend and result.source_guid: + snapshot_source = result.source_snapshot or result.input_record + input_snapshot_str = None + if snapshot_source and isinstance(snapshot_source, dict): + try: + input_snapshot_str = json.dumps( + snapshot_source, ensure_ascii=False, default=str + ) + except (TypeError, ValueError): + input_snapshot_str = None _safe_set_disposition( storage_backend, agent_name, result.source_guid, DISPOSITION_FAILED, reason=result.error or "processing_error", + input_snapshot=input_snapshot_str, ) elif status == ProcessingStatus.FILTERED: diff --git a/agent_actions/storage/backend.py b/agent_actions/storage/backend.py index 34a2151..1c0922c 100644 --- a/agent_actions/storage/backend.py +++ b/agent_actions/storage/backend.py @@ -118,8 +118,14 @@ def set_disposition( # noqa: B027 disposition: str | Disposition, reason: str | None = None, relative_path: str | None = None, + input_snapshot: str | None = None, ) -> None: - """Write a disposition record (use NODE_LEVEL_RECORD_ID for node-level signals).""" + """Write a disposition record (use NODE_LEVEL_RECORD_ID for node-level signals). + + Args: + input_snapshot: JSON-serialized input record for failed items. + Implementations SHOULD truncate to a reasonable limit (recommended 10KB). + """ # No-op: subclass must override to persist dispositions. def get_disposition( @@ -149,6 +155,14 @@ def clear_disposition( """Delete matching disposition records. Returns count deleted.""" return 0 + def get_failed_items(self, action_name: str) -> list[dict[str, Any]]: + """Return item-level failure dispositions, excluding node-level sentinels.""" + return [ + d + for d in self.get_disposition(action_name, disposition=DISPOSITION_FAILED) + if d.get("record_id") != NODE_LEVEL_RECORD_ID + ] + def delete_target(self, action_name: str) -> int: """Delete all target data for an action. Returns count deleted. diff --git a/agent_actions/storage/backends/sqlite_backend.py b/agent_actions/storage/backends/sqlite_backend.py index 0f229fa..3beed4c 100644 --- a/agent_actions/storage/backends/sqlite_backend.py +++ b/agent_actions/storage/backends/sqlite_backend.py @@ -49,6 +49,7 @@ class SQLiteBackend(StorageBackend): disposition TEXT NOT NULL, reason TEXT, relative_path TEXT, + input_snapshot TEXT, created_at TEXT DEFAULT CURRENT_TIMESTAMP, UNIQUE(action_name, record_id, disposition) ) @@ -165,6 +166,11 @@ def initialize(self) -> None: cursor.execute(self.DISPOSITION_INDEX_ACTION_SQL) cursor.execute(self.DISPOSITION_INDEX_ACTION_DISP_SQL) cursor.execute(self.DISPOSITION_INDEX_ACTION_RECORD_SQL) + # Migration: add input_snapshot column for existing databases + try: + cursor.execute("ALTER TABLE record_disposition ADD COLUMN input_snapshot TEXT") + except sqlite3.OperationalError: + pass # Column already exists self.connection.commit() logger.info( "Initialized SQLite storage backend: %s", @@ -491,6 +497,7 @@ def set_disposition( disposition: str | Disposition, reason: str | None = None, relative_path: str | None = None, + input_snapshot: str | None = None, ) -> None: """Write a disposition record (INSERT OR REPLACE).""" action_name = self._validate_identifier(action_name, "action_name") @@ -501,6 +508,9 @@ def set_disposition( raise ValueError( f"Invalid disposition '{disposition}'. Valid: {sorted(VALID_DISPOSITIONS)}" ) + # Cap input_snapshot at 10KB to prevent storage bloat + if input_snapshot and len(input_snapshot) > 10240: + input_snapshot = input_snapshot[:10240] with self._lock: cursor = self.connection.cursor() @@ -508,10 +518,11 @@ def set_disposition( cursor.execute( """ INSERT OR REPLACE INTO record_disposition - (action_name, record_id, disposition, reason, relative_path, created_at) - VALUES (?, ?, ?, ?, ?, CURRENT_TIMESTAMP) + (action_name, record_id, disposition, reason, relative_path, + input_snapshot, created_at) + VALUES (?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP) """, - (action_name, record_id, disposition, reason, relative_path), + (action_name, record_id, disposition, reason, relative_path, input_snapshot), ) self.connection.commit() logger.debug( @@ -545,7 +556,8 @@ def get_disposition( action_name = self._validate_identifier(action_name, "action_name") query = ( - "SELECT action_name, record_id, disposition, reason, relative_path, created_at" + "SELECT action_name, record_id, disposition, reason, relative_path," + " input_snapshot, created_at" " FROM record_disposition WHERE action_name = ?" ) params: list[str] = [action_name] diff --git a/agent_actions/workflow/coordinator.py b/agent_actions/workflow/coordinator.py index 4e3fda3..cf67f52 100644 --- a/agent_actions/workflow/coordinator.py +++ b/agent_actions/workflow/coordinator.py @@ -475,7 +475,7 @@ def _run_single_action(self, idx: int, action_name: str, total_actions: int) -> if result.status == "skipped": return False # Continue to next action - if result.output_folder and result.status == "completed": + if result.output_folder and result.status in {"completed", "completed_with_failures"}: self.state.ephemeral_directories.append( { "output_folder": result.output_folder, diff --git a/agent_actions/workflow/execution_events.py b/agent_actions/workflow/execution_events.py index 43b6d93..6bde603 100644 --- a/agent_actions/workflow/execution_events.py +++ b/agent_actions/workflow/execution_events.py @@ -90,7 +90,10 @@ def log_action_skip(self, idx: int, action_name: str, total_actions: int): def log_action_result(self, params: ActionLogParams): """Log action execution result via event system.""" - if params.result.success and params.result.status == "completed": + if params.result.success and params.result.status in { + "completed", + "completed_with_failures", + }: tokens = {} if hasattr(params.result, "tokens") and params.result.tokens: tokens = params.result.tokens @@ -129,6 +132,7 @@ def finalize_workflow(self, elapsed_time: float = 0.0): workflow_name=self.agent_name, elapsed_time=elapsed_time, actions_completed=summary.get("completed", 0), + actions_partial=summary.get("completed_with_failures", 0), actions_skipped=summary.get("skipped", 0), actions_failed=summary.get("failed", 0), ) diff --git a/agent_actions/workflow/executor.py b/agent_actions/workflow/executor.py index 760068c..e2ff143 100644 --- a/agent_actions/workflow/executor.py +++ b/agent_actions/workflow/executor.py @@ -146,7 +146,7 @@ def _maybe_invalidate_completed_status( self, action_name: str, action_config: ActionConfigDict, current_status: str ) -> str: """Reset to pending if limit config changed since last completion.""" - if current_status != "completed": + if current_status not in {"completed", "completed_with_failures"}: return current_status details = self.deps.state_manager.get_status_details(action_name) if details.get("record_limit") != action_config.get("record_limit") or details.get( @@ -332,17 +332,33 @@ def _handle_run_success( metrics=ExecutionMetrics(duration=duration), ) - # Normal completion + # Normal completion — check for partial item failures + final_status = "completed" + storage_backend = getattr(self.deps.action_runner, "storage_backend", None) + if storage_backend is not None: + try: + item_failures = storage_backend.get_failed_items(params.action_name) + if item_failures: + final_status = "completed_with_failures" + logger.warning( + "Action '%s' completed with %d item-level failure(s)", + params.action_name, + len(item_failures), + ) + except Exception as e: + logger.debug("Could not check partial failures for %s: %s", params.action_name, e) + self.deps.state_manager.update_status( - params.action_name, "completed", **self._limit_metadata(params.action_config) + params.action_name, final_status, **self._limit_metadata(params.action_config) ) tokens = get_last_usage() + tracker_status = "success" if final_status == "completed" else "partial" if self.run_tracker is not None and self.run_id is not None: config = ActionCompleteConfig( run_id=self.run_id, action_name=params.action_name, - status="success", + status=tracker_status, duration_seconds=duration, tokens=tokens, files_processed=0, @@ -352,7 +368,7 @@ def _handle_run_success( return ActionExecutionResult( success=True, output_folder=output_folder, - status="completed", + status=final_status, metrics=ExecutionMetrics( duration=duration, tokens=tokens, @@ -536,7 +552,7 @@ def execute_action_sync( action_name, action_config, current_status ) - if current_status == "completed": + if current_status in {"completed", "completed_with_failures"}: should_skip, result = self._verify_completion_status(action_name) if should_skip: if result is None: @@ -598,7 +614,7 @@ async def execute_action_async( action_name, action_config, current_status ) - if current_status == "completed": + if current_status in {"completed", "completed_with_failures"}: should_skip, result = self._verify_completion_status(action_name) if should_skip: if result is None: diff --git a/agent_actions/workflow/managers/state.py b/agent_actions/workflow/managers/state.py index 3a56f32..4a353ad 100644 --- a/agent_actions/workflow/managers/state.py +++ b/agent_actions/workflow/managers/state.py @@ -87,9 +87,13 @@ def is_skipped(self, action_name: str) -> bool: """Return True if action was skipped due to upstream dependency failure.""" return self.get_status(action_name) == "skipped" + def is_completed_with_failures(self, action_name: str) -> bool: + """Return True if action completed with partial item failures.""" + return self.get_status(action_name) == "completed_with_failures" + def get_pending_actions(self, agents: list[str]) -> list[str]: - """Return actions that are not yet completed, failed, or skipped (runnable).""" - terminal = {"completed", "failed", "skipped"} + """Return actions that are not yet in a terminal state (runnable).""" + terminal = {"completed", "failed", "skipped", "completed_with_failures"} return [agent for agent in agents if self.get_status(agent) not in terminal] def get_batch_submitted_actions(self, agents: list[str]) -> list[str]: @@ -122,12 +126,15 @@ def get_summary(self) -> dict[str, int]: return summary def is_workflow_complete(self) -> bool: - """Return True if all actions have 'completed' status.""" - return all(details.get("status") == "completed" for details in self.action_status.values()) + """Return True if all actions completed (including partial failures).""" + return all( + details.get("status") in {"completed", "completed_with_failures"} + for details in self.action_status.values() + ) def is_workflow_done(self) -> bool: - """Return True if all actions are in a terminal state (completed, failed, or skipped).""" - terminal = {"completed", "failed", "skipped"} + """Return True if all actions are in a terminal state.""" + terminal = {"completed", "failed", "skipped", "completed_with_failures"} return all(details.get("status") in terminal for details in self.action_status.values()) def has_any_failed(self) -> bool: diff --git a/agent_actions/workflow/parallel/action_executor.py b/agent_actions/workflow/parallel/action_executor.py index 7894968..c20190e 100644 --- a/agent_actions/workflow/parallel/action_executor.py +++ b/agent_actions/workflow/parallel/action_executor.py @@ -243,7 +243,7 @@ async def run_with_limit(action): def _fire_action_result_event(self, action_name: str, idx: int, total: int, result): """Fire action complete or failed event for an execution result.""" - if result.success and result.status == "completed": + if result.success and result.status in {"completed", "completed_with_failures"}: tokens = result.metrics.tokens if result.metrics and result.metrics.tokens else {} fire_event( ActionCompleteEvent( @@ -355,10 +355,13 @@ async def execute_level_async(self, params: LevelExecutionParams) -> bool: duration = (datetime.now() - start_time).total_seconds() has_failed = params.state_manager.get_failed_actions(params.level_actions) + has_partial = any( + params.state_manager.is_completed_with_failures(a) for a in params.level_actions + ) has_skipped = any(params.state_manager.is_skipped(a) for a in params.level_actions) if has_failed: color = "red" - elif has_skipped: + elif has_partial or has_skipped: color = "yellow" else: color = "green" diff --git a/tests/unit/core/test_result_collector.py b/tests/unit/core/test_result_collector.py index 085037a..d47e865 100644 --- a/tests/unit/core/test_result_collector.py +++ b/tests/unit/core/test_result_collector.py @@ -355,6 +355,7 @@ def test_failed_result_writes_disposition(self): "src-fail", "failed", reason="timeout", + input_snapshot=None, ) def test_failed_result_default_reason(self): @@ -381,6 +382,7 @@ def test_failed_result_default_reason(self): "src-fail2", "failed", reason="processing_error", + input_snapshot=None, ) def test_skipped_result_writes_disposition(self): @@ -522,4 +524,4 @@ def test_mixed_statuses_write_correct_dispositions(self): assert backend.set_disposition.call_count == 2 calls = backend.set_disposition.call_args_list assert calls[0] == (("agent", "filt", "filtered"), {"reason": "guard_filter"}) - assert calls[1] == (("agent", "fail", "failed"), {"reason": "err"}) + assert calls[1] == (("agent", "fail", "failed"), {"reason": "err", "input_snapshot": None}) diff --git a/tests/unit/workflow/test_executor_lifecycle.py b/tests/unit/workflow/test_executor_lifecycle.py index e438f1b..82362a1 100644 --- a/tests/unit/workflow/test_executor_lifecycle.py +++ b/tests/unit/workflow/test_executor_lifecycle.py @@ -31,6 +31,8 @@ def mock_deps(): deps.action_runner.workflow_name = "test_workflow" deps.action_runner.get_action_folder.return_value = "/tmp/agent_io" deps.action_runner.execution_order = ["agent_a", "agent_b"] + # Default: no item-level failures (so actions complete as "completed", not "completed_with_failures") + deps.action_runner.storage_backend.get_failed_items.return_value = [] # Default status details for limit-change detection (no limits stored) deps.state_manager.get_status_details.return_value = {"status": "completed"} return deps From f30a825adb962cb4853fcccc9db8edb7d5ea18bf Mon Sep 17 00:00:00 2001 From: Muizz Lateef Date: Thu, 2 Apr 2026 11:25:09 +0100 Subject: [PATCH 2/4] =?UTF-8?q?fix:=20address=20review=20=E2=80=94=20batch?= =?UTF-8?q?=20path,=20dependency=20check,=20manifest=20resume?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Extract _resolve_completion_status() helper and use it in both online and batch completion paths (fixes batch actions ignoring partial failures) - dependency.py: accept completed_with_failures for upstream checks (fixes downstream blocking on partial failures) - manifest.py: is_action_completed() and get_completed_actions() accept completed_with_failures (fixes unnecessary re-runs on resume) --- agent_actions/workflow/executor.py | 42 +++++++++++-------- agent_actions/workflow/managers/manifest.py | 11 +++-- agent_actions/workflow/parallel/dependency.py | 5 ++- tests/unit/workflow/test_executor_events.py | 1 + 4 files changed, 36 insertions(+), 23 deletions(-) diff --git a/agent_actions/workflow/executor.py b/agent_actions/workflow/executor.py index e2ff143..a868cd7 100644 --- a/agent_actions/workflow/executor.py +++ b/agent_actions/workflow/executor.py @@ -333,20 +333,7 @@ def _handle_run_success( ) # Normal completion — check for partial item failures - final_status = "completed" - storage_backend = getattr(self.deps.action_runner, "storage_backend", None) - if storage_backend is not None: - try: - item_failures = storage_backend.get_failed_items(params.action_name) - if item_failures: - final_status = "completed_with_failures" - logger.warning( - "Action '%s' completed with %d item-level failure(s)", - params.action_name, - len(item_failures), - ) - except Exception as e: - logger.debug("Could not check partial failures for %s: %s", params.action_name, e) + final_status = self._resolve_completion_status(params.action_name) self.deps.state_manager.update_status( params.action_name, final_status, **self._limit_metadata(params.action_config) @@ -414,6 +401,23 @@ def _write_skipped_disposition(self, action_name: str, reason: str) -> None: disp_err, ) + def _resolve_completion_status(self, action_name: str) -> str: + """Return 'completed_with_failures' if item-level failures exist, else 'completed'.""" + storage_backend = getattr(self.deps.action_runner, "storage_backend", None) + if storage_backend is not None: + try: + item_failures = storage_backend.get_failed_items(action_name) + if item_failures: + logger.warning( + "Action '%s' completed with %d item-level failure(s)", + action_name, + len(item_failures), + ) + return "completed_with_failures" + except Exception as e: + logger.debug("Could not check partial failures for %s: %s", action_name, e) + return "completed" + def _handle_run_failure( self, params: ActionRunParams, error: Exception ) -> ActionExecutionResult: @@ -670,8 +674,9 @@ def _handle_batch_check( duration = (datetime.now() - start_time).total_seconds() if batch_status == "completed": + final_status = self._resolve_completion_status(action_name) self.deps.state_manager.update_status( - action_name, "completed", **self._limit_metadata(action_config) + action_name, final_status, **self._limit_metadata(action_config) ) fire_event( BatchCompleteEvent( @@ -686,7 +691,7 @@ def _handle_batch_check( return ActionExecutionResult( success=True, output_folder=output_folder, - status="completed", + status=final_status, metrics=ExecutionMetrics(duration=duration), ) @@ -744,8 +749,9 @@ async def _handle_batch_check_async( duration = (datetime.now() - start_time).total_seconds() if batch_status == "completed": + final_status = self._resolve_completion_status(action_name) self.deps.state_manager.update_status( - action_name, "completed", **self._limit_metadata(action_config) + action_name, final_status, **self._limit_metadata(action_config) ) fire_event( BatchCompleteEvent( @@ -760,7 +766,7 @@ async def _handle_batch_check_async( return ActionExecutionResult( success=True, output_folder=output_folder, - status="completed", + status=final_status, metrics=ExecutionMetrics(duration=duration), ) diff --git a/agent_actions/workflow/managers/manifest.py b/agent_actions/workflow/managers/manifest.py index 79df0c4..1a35ef3 100644 --- a/agent_actions/workflow/managers/manifest.py +++ b/agent_actions/workflow/managers/manifest.py @@ -219,9 +219,12 @@ def get_action_index(self, action_name: str) -> int | None: return None def is_action_completed(self, action_name: str) -> bool: - """Return True if action status is 'completed'.""" + """Return True if action completed (including partial failures).""" action = self.manifest.get("actions", {}).get(action_name) - return action is not None and action.get("status") == "completed" + return action is not None and action.get("status") in { + "completed", + "completed_with_failures", + } def is_action_skipped(self, action_name: str) -> bool: """Return True if action status is 'skipped'.""" @@ -339,10 +342,10 @@ def mark_workflow_failed(self, error: str) -> None: self._save_manifest() def get_completed_actions(self) -> list[str]: - """Return all completed action names.""" + """Return all completed action names (including partial failures).""" completed = [] for action_name, action_data in self.manifest.get("actions", {}).items(): - if action_data.get("status") == "completed": + if action_data.get("status") in {"completed", "completed_with_failures"}: completed.append(action_name) return completed diff --git a/agent_actions/workflow/parallel/dependency.py b/agent_actions/workflow/parallel/dependency.py index 5dd8883..4b2d3a7 100644 --- a/agent_actions/workflow/parallel/dependency.py +++ b/agent_actions/workflow/parallel/dependency.py @@ -164,7 +164,10 @@ def _check_workflow_complete(self, workflow_name: str) -> bool: try: with open(upstream_status_file, encoding="utf-8") as f: status_data = json.load(f) - return all(details.get("status") == "completed" for details in status_data.values()) + return all( + details.get("status") in {"completed", "completed_with_failures"} + for details in status_data.values() + ) except (OSError, json.JSONDecodeError, KeyError): return False diff --git a/tests/unit/workflow/test_executor_events.py b/tests/unit/workflow/test_executor_events.py index b7c75d8..00ad75b 100644 --- a/tests/unit/workflow/test_executor_events.py +++ b/tests/unit/workflow/test_executor_events.py @@ -21,6 +21,7 @@ def mock_deps(self): deps.action_runner = MagicMock() deps.action_runner.workflow_name = "test_workflow" deps.action_runner.get_action_folder.return_value = "/tmp/agent_io" + deps.action_runner.storage_backend.get_failed_items.return_value = [] return deps @pytest.fixture From 4495fda0804cf4700eef21b6137283a94c2587b4 Mon Sep 17 00:00:00 2001 From: Muizz Lateef Date: Thu, 2 Apr 2026 11:46:07 +0100 Subject: [PATCH 3/4] =?UTF-8?q?fix:=20address=20review=20=E2=80=94=205=20W?= =?UTF-8?q?ARNINGs=20+=205=20MINORs=20+=20test=20coverage=20gaps?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit WARNINGs fixed: - is_completed() now includes completed_with_failures (stale-completion verification works on resume) - Added PARTIAL to COLORS dict (tally coloring now works) - _resolve_completion_status exception path logs at WARNING not DEBUG - JSON truncation produces valid JSON with __truncated__ sentinel instead of mid-character slice - TOCTOU acknowledged (low impact, documented) MINORs fixed: - mark_action_completed() accepts status parameter - Migration ALTER TABLE logs at DEBUG on both branches - json.dumps failure logs at DEBUG - All test coverage gaps filled (13 new tests): _resolve_completion_status (4 paths), get_failed_items sentinel filtering, circuit breaker ignores partial, is_completed_with_failures, terminal sets, is_workflow_complete with partial, level coloring for partial failures --- agent_actions/logging/events/formatters.py | 1 + agent_actions/processing/result_collector.py | 7 +- .../storage/backends/sqlite_backend.py | 10 +- agent_actions/workflow/executor.py | 2 +- agent_actions/workflow/managers/manifest.py | 5 +- agent_actions/workflow/managers/state.py | 4 +- .../managers/test_state_extensions.py | 41 ++++++ tests/unit/workflow/test_circuit_breaker.py | 117 ++++++++++++++++++ 8 files changed, 178 insertions(+), 9 deletions(-) diff --git a/agent_actions/logging/events/formatters.py b/agent_actions/logging/events/formatters.py index 1dbd027..ae48afc 100644 --- a/agent_actions/logging/events/formatters.py +++ b/agent_actions/logging/events/formatters.py @@ -16,6 +16,7 @@ class AgentActionsFormatter: # Status colors for Rich COLORS = { "OK": "green", + "PARTIAL": "yellow", "SKIP": "yellow", "CACHED": "cyan", "ERROR": "red", diff --git a/agent_actions/processing/result_collector.py b/agent_actions/processing/result_collector.py index b9765c5..042b77c 100644 --- a/agent_actions/processing/result_collector.py +++ b/agent_actions/processing/result_collector.py @@ -186,7 +186,12 @@ def collect_results( input_snapshot_str = json.dumps( snapshot_source, ensure_ascii=False, default=str ) - except (TypeError, ValueError): + except (TypeError, ValueError) as snap_err: + logger.debug( + "Could not serialize input snapshot for %s: %s", + result.source_guid, + snap_err, + ) input_snapshot_str = None _safe_set_disposition( storage_backend, diff --git a/agent_actions/storage/backends/sqlite_backend.py b/agent_actions/storage/backends/sqlite_backend.py index 3beed4c..dd065bf 100644 --- a/agent_actions/storage/backends/sqlite_backend.py +++ b/agent_actions/storage/backends/sqlite_backend.py @@ -169,8 +169,9 @@ def initialize(self) -> None: # Migration: add input_snapshot column for existing databases try: cursor.execute("ALTER TABLE record_disposition ADD COLUMN input_snapshot TEXT") + logger.debug("Added input_snapshot column to record_disposition") except sqlite3.OperationalError: - pass # Column already exists + logger.debug("input_snapshot column already exists in record_disposition") self.connection.commit() logger.info( "Initialized SQLite storage backend: %s", @@ -508,9 +509,12 @@ def set_disposition( raise ValueError( f"Invalid disposition '{disposition}'. Valid: {sorted(VALID_DISPOSITIONS)}" ) - # Cap input_snapshot at 10KB to prevent storage bloat + # Cap input_snapshot at 10KB to prevent storage bloat. + # Wrap truncated content so consumers can detect and skip invalid JSON. if input_snapshot and len(input_snapshot) > 10240: - input_snapshot = input_snapshot[:10240] + input_snapshot = ( + '{"__truncated__": true, "partial": ' + json.dumps(input_snapshot[:8192]) + "}" + ) with self._lock: cursor = self.connection.cursor() diff --git a/agent_actions/workflow/executor.py b/agent_actions/workflow/executor.py index a868cd7..1eaee09 100644 --- a/agent_actions/workflow/executor.py +++ b/agent_actions/workflow/executor.py @@ -415,7 +415,7 @@ def _resolve_completion_status(self, action_name: str) -> str: ) return "completed_with_failures" except Exception as e: - logger.debug("Could not check partial failures for %s: %s", action_name, e) + logger.warning("Could not check partial failures for %s: %s", action_name, e) return "completed" def _handle_run_failure( diff --git a/agent_actions/workflow/managers/manifest.py b/agent_actions/workflow/managers/manifest.py index 1a35ef3..fffba45 100644 --- a/agent_actions/workflow/managers/manifest.py +++ b/agent_actions/workflow/managers/manifest.py @@ -254,8 +254,9 @@ def mark_action_completed( self, action_name: str, record_count: int | None = None, + status: str = "completed", ) -> None: - """Mark an action as completed. + """Mark an action as completed (or completed_with_failures). Raises: KeyError: If action not found in manifest. @@ -269,7 +270,7 @@ def mark_action_completed( "ManifestManager._manifest is None; " "initialize_manifest() or load_manifest() must be called first" ) - self._manifest["actions"][action_name]["status"] = "completed" + self._manifest["actions"][action_name]["status"] = status self._manifest["actions"][action_name]["completed_at"] = datetime.now().isoformat() if record_count is not None: self._manifest["actions"][action_name]["record_count"] = record_count diff --git a/agent_actions/workflow/managers/state.py b/agent_actions/workflow/managers/state.py index 4a353ad..0bf1123 100644 --- a/agent_actions/workflow/managers/state.py +++ b/agent_actions/workflow/managers/state.py @@ -72,8 +72,8 @@ def get_status_details(self, action_name: str) -> dict[str, Any]: return self.action_status.get(action_name, {"status": "pending"}) def is_completed(self, action_name: str) -> bool: - """Return True if action is completed.""" - return self.get_status(action_name) == "completed" + """Return True if action completed (including partial failures).""" + return self.get_status(action_name) in {"completed", "completed_with_failures"} def is_batch_submitted(self, action_name: str) -> bool: """Return True if action has batch jobs submitted.""" diff --git a/tests/unit/workflow/managers/test_state_extensions.py b/tests/unit/workflow/managers/test_state_extensions.py index fddd809..9c9765f 100644 --- a/tests/unit/workflow/managers/test_state_extensions.py +++ b/tests/unit/workflow/managers/test_state_extensions.py @@ -152,3 +152,44 @@ def test_summary_with_skipped(self, tmp_path): summary = mgr.get_summary() assert summary == {"completed": 1, "skipped": 1, "failed": 1} + + +class TestCompletedWithFailures: + """Tests for completed_with_failures status.""" + + def test_is_completed_with_failures(self, tmp_path): + mgr = ActionStateManager(tmp_path / "s.json", ["a"]) + mgr.update_status("a", "completed_with_failures") + assert mgr.is_completed_with_failures("a") is True + assert mgr.is_completed("a") is True # is_completed includes partial + + def test_is_completed_with_failures_false(self, tmp_path): + mgr = ActionStateManager(tmp_path / "s.json", ["a", "b"]) + mgr.update_status("a", "completed") + assert mgr.is_completed_with_failures("a") is False + assert mgr.is_completed_with_failures("b") is False + + def test_get_pending_excludes_completed_with_failures(self, tmp_path): + mgr = ActionStateManager(tmp_path / "s.json", ["a", "b"]) + mgr.update_status("a", "completed_with_failures") + assert mgr.get_pending_actions(["a", "b"]) == ["b"] + + def test_is_workflow_complete_with_partial(self, tmp_path): + mgr = ActionStateManager(tmp_path / "s.json", ["a", "b"]) + mgr.update_status("a", "completed") + mgr.update_status("b", "completed_with_failures") + assert mgr.is_workflow_complete() is True + + def test_is_workflow_done_with_partial(self, tmp_path): + mgr = ActionStateManager(tmp_path / "s.json", ["a", "b"]) + mgr.update_status("a", "completed_with_failures") + mgr.update_status("b", "failed") + assert mgr.is_workflow_done() is True + + def test_summary_with_partial(self, tmp_path): + mgr = ActionStateManager(tmp_path / "s.json", ["a", "b", "c"]) + mgr.update_status("a", "completed") + mgr.update_status("b", "completed_with_failures") + mgr.update_status("c", "failed") + summary = mgr.get_summary() + assert summary == {"completed": 1, "completed_with_failures": 1, "failed": 1} diff --git a/tests/unit/workflow/test_circuit_breaker.py b/tests/unit/workflow/test_circuit_breaker.py index 28c1076..0d00a44 100644 --- a/tests/unit/workflow/test_circuit_breaker.py +++ b/tests/unit/workflow/test_circuit_breaker.py @@ -302,3 +302,120 @@ def test_red_takes_precedence_over_yellow(self, tmp_path): assert has_failed assert has_skipped + + def test_yellow_for_completed_with_failures(self, tmp_path): + """Level line is yellow when action has partial failures.""" + mgr = ActionStateManager(tmp_path / "status.json", ["a", "b"]) + mgr.update_status("a", "completed") + mgr.update_status("b", "completed_with_failures") + + has_failed = mgr.get_failed_actions(["a", "b"]) + has_partial = any(mgr.is_completed_with_failures(a) for a in ["a", "b"]) + + assert not has_failed + assert has_partial + + +class TestResolveCompletionStatus: + """Tests for _resolve_completion_status().""" + + @patch("agent_actions.workflow.executor.fire_event") + def test_returns_completed_when_no_failures(self, mock_fire, executor, mock_deps): + mock_deps.action_runner.storage_backend.get_failed_items.return_value = [] + assert executor._resolve_completion_status("agent_a") == "completed" + + @patch("agent_actions.workflow.executor.fire_event") + def test_returns_completed_with_failures_when_items_failed( + self, mock_fire, executor, mock_deps + ): + mock_deps.action_runner.storage_backend.get_failed_items.return_value = [ + {"record_id": "guid-1", "disposition": "failed", "reason": "timeout"} + ] + assert executor._resolve_completion_status("agent_a") == "completed_with_failures" + + @patch("agent_actions.workflow.executor.fire_event") + def test_returns_completed_when_no_storage_backend(self, mock_fire, executor, mock_deps): + mock_deps.action_runner.storage_backend = None + assert executor._resolve_completion_status("agent_a") == "completed" + + @patch("agent_actions.workflow.executor.fire_event") + def test_returns_completed_on_storage_error(self, mock_fire, executor, mock_deps): + mock_deps.action_runner.storage_backend.get_failed_items.side_effect = RuntimeError( + "DB error" + ) + assert executor._resolve_completion_status("agent_a") == "completed" + + +class TestCircuitBreakerIgnoresPartial: + """completed_with_failures must NOT trigger circuit breaker.""" + + def test_completed_with_failures_not_detected_by_upstream_health(self, executor, mock_deps): + mock_deps.state_manager.is_failed.return_value = False + mock_deps.state_manager.is_skipped.return_value = False + mock_deps.action_runner.storage_backend = None + + config = {"dependencies": ["agent_a"]} + result = executor._check_upstream_health("agent_b", config) + assert result is None + + +class TestGetFailedItems: + """Tests for StorageBackend.get_failed_items() default implementation.""" + + def test_filters_node_level_sentinel(self): + from agent_actions.storage.backend import NODE_LEVEL_RECORD_ID, StorageBackend + + class FakeBackend(StorageBackend): + @classmethod + def create(cls, **kwargs): + return cls() + + @property + def backend_type(self): + return "fake" + + def initialize(self): + pass + + def write_target(self, *a, **kw): + pass + + def read_target(self, *a, **kw): + return [] + + def write_source(self, *a, **kw): + pass + + def read_source(self, *a, **kw): + return [] + + def list_target_files(self, *a, **kw): + return [] + + def list_source_files(self, *a, **kw): + return [] + + def preview_target(self, *a, **kw): + return {} + + def get_storage_stats(self): + return {} + + def delete_target(self, *a, **kw): + return 0 + + def get_disposition(self, action_name, record_id=None, disposition=None): + return [ + { + "record_id": NODE_LEVEL_RECORD_ID, + "disposition": "failed", + "reason": "total wipeout", + }, + {"record_id": "guid-1", "disposition": "failed", "reason": "timeout"}, + {"record_id": "guid-2", "disposition": "failed", "reason": "429"}, + ] + + backend = FakeBackend() + items = backend.get_failed_items("action_a") + assert len(items) == 2 + assert all(i["record_id"] != NODE_LEVEL_RECORD_ID for i in items) From f227edcf4fae619d0d9bc73f47bb2c3547481e6b Mon Sep 17 00:00:00 2001 From: Muizz Lateef Date: Thu, 2 Apr 2026 12:01:07 +0100 Subject: [PATCH 4/4] refactor: extract COMPLETED_STATUSES and TERMINAL_STATUSES constants MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The set {"completed", "completed_with_failures"} was repeated as inline literals in 9 locations across 6 files. Extracted as COMPLETED_STATUSES and TERMINAL_STATUSES frozensets in state.py — single source of truth when new statuses are added. --- agent_actions/workflow/coordinator.py | 3 ++- agent_actions/workflow/execution_events.py | 6 ++---- agent_actions/workflow/executor.py | 7 ++++--- agent_actions/workflow/managers/manifest.py | 8 +++----- agent_actions/workflow/managers/state.py | 20 ++++++++++++------- .../workflow/parallel/action_executor.py | 3 ++- agent_actions/workflow/parallel/dependency.py | 4 ++-- 7 files changed, 28 insertions(+), 23 deletions(-) diff --git a/agent_actions/workflow/coordinator.py b/agent_actions/workflow/coordinator.py index cf67f52..07caef9 100644 --- a/agent_actions/workflow/coordinator.py +++ b/agent_actions/workflow/coordinator.py @@ -15,6 +15,7 @@ from agent_actions.workflow.config_pipeline import load_workflow_configs from agent_actions.workflow.execution_events import WorkflowEventLogger from agent_actions.workflow.managers.artifacts import ArtifactLinker +from agent_actions.workflow.managers.state import COMPLETED_STATUSES from agent_actions.workflow.models import ( ActionLogParams, RuntimeContext, @@ -475,7 +476,7 @@ def _run_single_action(self, idx: int, action_name: str, total_actions: int) -> if result.status == "skipped": return False # Continue to next action - if result.output_folder and result.status in {"completed", "completed_with_failures"}: + if result.output_folder and result.status in COMPLETED_STATUSES: self.state.ephemeral_directories.append( { "output_folder": result.output_folder, diff --git a/agent_actions/workflow/execution_events.py b/agent_actions/workflow/execution_events.py index 6bde603..711fe9f 100644 --- a/agent_actions/workflow/execution_events.py +++ b/agent_actions/workflow/execution_events.py @@ -14,6 +14,7 @@ WorkflowFailedEvent, WorkflowStartEvent, ) +from agent_actions.workflow.managers.state import COMPLETED_STATUSES from agent_actions.workflow.models import ActionLogParams, WorkflowRuntimeConfig, WorkflowServices logger = logging.getLogger(__name__) @@ -90,10 +91,7 @@ def log_action_skip(self, idx: int, action_name: str, total_actions: int): def log_action_result(self, params: ActionLogParams): """Log action execution result via event system.""" - if params.result.success and params.result.status in { - "completed", - "completed_with_failures", - }: + if params.result.success and params.result.status in COMPLETED_STATUSES: tokens = {} if hasattr(params.result, "tokens") and params.result.tokens: tokens = params.result.tokens diff --git a/agent_actions/workflow/executor.py b/agent_actions/workflow/executor.py index 1eaee09..8147dd6 100644 --- a/agent_actions/workflow/executor.py +++ b/agent_actions/workflow/executor.py @@ -26,6 +26,7 @@ ) from agent_actions.tooling.docs.run_tracker import ActionCompleteConfig from agent_actions.utils.constants import DEFAULT_ACTION_KIND +from agent_actions.workflow.managers.state import COMPLETED_STATUSES logger = logging.getLogger(__name__) @@ -146,7 +147,7 @@ def _maybe_invalidate_completed_status( self, action_name: str, action_config: ActionConfigDict, current_status: str ) -> str: """Reset to pending if limit config changed since last completion.""" - if current_status not in {"completed", "completed_with_failures"}: + if current_status not in COMPLETED_STATUSES: return current_status details = self.deps.state_manager.get_status_details(action_name) if details.get("record_limit") != action_config.get("record_limit") or details.get( @@ -556,7 +557,7 @@ def execute_action_sync( action_name, action_config, current_status ) - if current_status in {"completed", "completed_with_failures"}: + if current_status in COMPLETED_STATUSES: should_skip, result = self._verify_completion_status(action_name) if should_skip: if result is None: @@ -618,7 +619,7 @@ async def execute_action_async( action_name, action_config, current_status ) - if current_status in {"completed", "completed_with_failures"}: + if current_status in COMPLETED_STATUSES: should_skip, result = self._verify_completion_status(action_name) if should_skip: if result is None: diff --git a/agent_actions/workflow/managers/manifest.py b/agent_actions/workflow/managers/manifest.py index fffba45..efc2c13 100644 --- a/agent_actions/workflow/managers/manifest.py +++ b/agent_actions/workflow/managers/manifest.py @@ -12,6 +12,7 @@ from typing import Any, cast from agent_actions.errors import ConfigurationError +from agent_actions.workflow.managers.state import COMPLETED_STATUSES logger = logging.getLogger(__name__) @@ -221,10 +222,7 @@ def get_action_index(self, action_name: str) -> int | None: def is_action_completed(self, action_name: str) -> bool: """Return True if action completed (including partial failures).""" action = self.manifest.get("actions", {}).get(action_name) - return action is not None and action.get("status") in { - "completed", - "completed_with_failures", - } + return action is not None and action.get("status") in COMPLETED_STATUSES def is_action_skipped(self, action_name: str) -> bool: """Return True if action status is 'skipped'.""" @@ -346,7 +344,7 @@ def get_completed_actions(self) -> list[str]: """Return all completed action names (including partial failures).""" completed = [] for action_name, action_data in self.manifest.get("actions", {}).items(): - if action_data.get("status") in {"completed", "completed_with_failures"}: + if action_data.get("status") in COMPLETED_STATUSES: completed.append(action_name) return completed diff --git a/agent_actions/workflow/managers/state.py b/agent_actions/workflow/managers/state.py index 0bf1123..685f33d 100644 --- a/agent_actions/workflow/managers/state.py +++ b/agent_actions/workflow/managers/state.py @@ -7,6 +7,13 @@ logger = logging.getLogger(__name__) +# Status sets used across the workflow engine. Import from here to avoid +# scattered set literals that drift when new statuses are added. +COMPLETED_STATUSES: frozenset[str] = frozenset({"completed", "completed_with_failures"}) +TERMINAL_STATUSES: frozenset[str] = frozenset( + {"completed", "failed", "skipped", "completed_with_failures"} +) + class ActionStateManager: """Manages action execution state persistence and queries.""" @@ -73,7 +80,7 @@ def get_status_details(self, action_name: str) -> dict[str, Any]: def is_completed(self, action_name: str) -> bool: """Return True if action completed (including partial failures).""" - return self.get_status(action_name) in {"completed", "completed_with_failures"} + return self.get_status(action_name) in COMPLETED_STATUSES def is_batch_submitted(self, action_name: str) -> bool: """Return True if action has batch jobs submitted.""" @@ -93,8 +100,7 @@ def is_completed_with_failures(self, action_name: str) -> bool: def get_pending_actions(self, agents: list[str]) -> list[str]: """Return actions that are not yet in a terminal state (runnable).""" - terminal = {"completed", "failed", "skipped", "completed_with_failures"} - return [agent for agent in agents if self.get_status(agent) not in terminal] + return [agent for agent in agents if self.get_status(agent) not in TERMINAL_STATUSES] def get_batch_submitted_actions(self, agents: list[str]) -> list[str]: """Return actions with batch jobs submitted.""" @@ -128,14 +134,14 @@ def get_summary(self) -> dict[str, int]: def is_workflow_complete(self) -> bool: """Return True if all actions completed (including partial failures).""" return all( - details.get("status") in {"completed", "completed_with_failures"} - for details in self.action_status.values() + details.get("status") in COMPLETED_STATUSES for details in self.action_status.values() ) def is_workflow_done(self) -> bool: """Return True if all actions are in a terminal state.""" - terminal = {"completed", "failed", "skipped", "completed_with_failures"} - return all(details.get("status") in terminal for details in self.action_status.values()) + return all( + details.get("status") in TERMINAL_STATUSES for details in self.action_status.values() + ) def has_any_failed(self) -> bool: """Return True if any action has 'failed' status.""" diff --git a/agent_actions/workflow/parallel/action_executor.py b/agent_actions/workflow/parallel/action_executor.py index c20190e..13af00a 100644 --- a/agent_actions/workflow/parallel/action_executor.py +++ b/agent_actions/workflow/parallel/action_executor.py @@ -14,6 +14,7 @@ from agent_actions.errors import WorkflowError, get_error_detail from agent_actions.logging.core.manager import fire_event from agent_actions.logging.events import ActionCompleteEvent, ActionFailedEvent +from agent_actions.workflow.managers.state import COMPLETED_STATUSES logger = logging.getLogger(__name__) @@ -243,7 +244,7 @@ async def run_with_limit(action): def _fire_action_result_event(self, action_name: str, idx: int, total: int, result): """Fire action complete or failed event for an execution result.""" - if result.success and result.status in {"completed", "completed_with_failures"}: + if result.success and result.status in COMPLETED_STATUSES: tokens = result.metrics.tokens if result.metrics and result.metrics.tokens else {} fire_event( ActionCompleteEvent( diff --git a/agent_actions/workflow/parallel/dependency.py b/agent_actions/workflow/parallel/dependency.py index 4b2d3a7..1e89ab3 100644 --- a/agent_actions/workflow/parallel/dependency.py +++ b/agent_actions/workflow/parallel/dependency.py @@ -11,6 +11,7 @@ from typing import TYPE_CHECKING, Any from agent_actions.workflow.managers.artifacts import ArtifactLinker +from agent_actions.workflow.managers.state import COMPLETED_STATUSES from agent_actions.workflow.workspace_index import WorkspaceIndex if TYPE_CHECKING: @@ -165,8 +166,7 @@ def _check_workflow_complete(self, workflow_name: str) -> bool: with open(upstream_status_file, encoding="utf-8") as f: status_data = json.load(f) return all( - details.get("status") in {"completed", "completed_with_failures"} - for details in status_data.values() + details.get("status") in COMPLETED_STATUSES for details in status_data.values() ) except (OSError, json.JSONDecodeError, KeyError): return False