diff --git a/agent_actions/workflow/executor.py b/agent_actions/workflow/executor.py index bbfe4f1..760068c 100644 --- a/agent_actions/workflow/executor.py +++ b/agent_actions/workflow/executor.py @@ -19,7 +19,11 @@ BatchCompleteEvent, BatchSubmittedEvent, ) -from agent_actions.storage.backend import DISPOSITION_FAILED, NODE_LEVEL_RECORD_ID +from agent_actions.storage.backend import ( + DISPOSITION_FAILED, + DISPOSITION_SKIPPED, + NODE_LEVEL_RECORD_ID, +) from agent_actions.tooling.docs.run_tracker import ActionCompleteConfig from agent_actions.utils.constants import DEFAULT_ACTION_KIND @@ -72,7 +76,7 @@ class ActionExecutionResult: success: bool output_folder: str | None = None - status: str = "completed" # 'completed', 'batch_submitted', 'failed' + status: str = "completed" # 'completed', 'batch_submitted', 'failed', 'skipped' error: Exception | None = None metrics: ExecutionMetrics = field(default_factory=ExecutionMetrics) @@ -178,22 +182,24 @@ def _verify_completion_status( # Check disposition FIRST — a failed action may have partial # results in storage. The disposition is the authoritative # signal; output existence is irrelevant when it's set. - if storage_backend.has_disposition( - action_name, - DISPOSITION_FAILED, - record_id=NODE_LEVEL_RECORD_ID, - ): - logger.info( - "Action %s has DISPOSITION_FAILED from prior run — re-running", + for disp in (DISPOSITION_FAILED, DISPOSITION_SKIPPED): + if storage_backend.has_disposition( action_name, - ) - storage_backend.clear_disposition( - action_name, - DISPOSITION_FAILED, + disp, record_id=NODE_LEVEL_RECORD_ID, - ) - self.deps.state_manager.update_status(action_name, "pending") - return (False, None) + ): + logger.info( + "Action %s has %s from prior run — re-running", + action_name, + disp, + ) + storage_backend.clear_disposition( + action_name, + disp, + record_id=NODE_LEVEL_RECORD_ID, + ) + self.deps.state_manager.update_status(action_name, "pending") + return (False, None) target_files = storage_backend.list_target_files(action_name) if not target_files: @@ -374,6 +380,24 @@ def _write_failed_disposition(self, action_name: str, reason: str) -> None: disp_err, ) + def _write_skipped_disposition(self, action_name: str, reason: str) -> None: + """Write DISPOSITION_SKIPPED to storage so downstream and future runs detect the skip.""" + storage_backend = getattr(self.deps.action_runner, "storage_backend", None) + if storage_backend is not None: + try: + storage_backend.set_disposition( + action_name=action_name, + record_id=NODE_LEVEL_RECORD_ID, + disposition=DISPOSITION_SKIPPED, + reason=reason[:500], + ) + except Exception as disp_err: + logger.warning( + "Failed to write DISPOSITION_SKIPPED for %s: %s", + action_name, + disp_err, + ) + def _handle_run_failure( self, params: ActionRunParams, error: Exception ) -> ActionExecutionResult: @@ -416,17 +440,22 @@ def _cleanup_correlation( def _check_upstream_health( self, action_name: str, action_config: ActionConfigDict ) -> str | None: - """Return the name of a failed upstream dependency, or None if all healthy.""" + """Return the name of a failed or skipped upstream dependency, or None if all healthy.""" dependencies = action_config.get("dependencies", []) if not dependencies: return None for dep in dependencies: - if self.deps.state_manager.is_failed(dep): + if self.deps.state_manager.is_failed(dep) or self.deps.state_manager.is_skipped(dep): return dep - # Also check disposition — covers cascaded failures from prior levels + # Also check disposition — covers cascaded failures/skips from prior levels storage_backend = getattr(self.deps.action_runner, "storage_backend", None) - if storage_backend is not None and storage_backend.has_disposition( - dep, DISPOSITION_FAILED, record_id=NODE_LEVEL_RECORD_ID + if storage_backend is not None and ( + storage_backend.has_disposition( + dep, DISPOSITION_FAILED, record_id=NODE_LEVEL_RECORD_ID + ) + or storage_backend.has_disposition( + dep, DISPOSITION_SKIPPED, record_id=NODE_LEVEL_RECORD_ID + ) ): return dep return None @@ -441,12 +470,15 @@ def _handle_dependency_skip( ) -> ActionExecutionResult: """Handle action skip due to upstream dependency failure. - State is set to ``"failed"`` so transitive dependents also skip via - ``is_failed``. ``success=True`` keeps independent branches alive. + State is set to ``"skipped"`` so transitive dependents also skip via + ``is_skipped``. ``success=True`` keeps independent branches alive. """ - reason = f"Upstream dependency '{failed_dependency}' failed" - self.deps.state_manager.update_status(action_name, "failed") - self._write_failed_disposition(action_name, reason) + dep_status = ( + "skipped" if self.deps.state_manager.is_skipped(failed_dependency) else "failed" + ) + reason = f"Upstream dependency '{failed_dependency}' {dep_status}" + self.deps.state_manager.update_status(action_name, "skipped") + self._write_skipped_disposition(action_name, reason) duration = (datetime.now() - start_time).total_seconds() total_actions = ( @@ -467,14 +499,14 @@ def _handle_dependency_skip( config = ActionCompleteConfig( run_id=self.run_id, action_name=action_name, - status="failed", + status="skipped", duration_seconds=duration, skip_reason=reason, ) self.run_tracker.record_action_complete(config=config) return ActionExecutionResult( - success=True, status="failed", metrics=ExecutionMetrics(duration=duration) + success=True, status="skipped", metrics=ExecutionMetrics(duration=duration) ) def execute_action_sync( diff --git a/agent_actions/workflow/managers/_MANIFEST.md b/agent_actions/workflow/managers/_MANIFEST.md index 99d0176..5ebc0d6 100644 --- a/agent_actions/workflow/managers/_MANIFEST.md +++ b/agent_actions/workflow/managers/_MANIFEST.md @@ -14,4 +14,4 @@ Tracks workflow artifacts, batching, loops, state, and skip logic used by the ru | `manifest.py` | Module | Generates workflow manifests consumed by tooling/docs. | `tooling.docs`, `workflow` | | `output.py` | Module | `ActionOutputManager`: loads upstream outputs, resolves version correlation, creates passthrough outputs. `detect_explicit_version_consumption()` result is lazy-cached per instance to avoid redundant computation. | `output`, `workflow` | | `skip.py` | Module | Skip logic used when upstream items fail or guard conditions filter them. | `validation`, `workflow` | -| `state.py` | Module | ActionStateManager — manages action execution state persistence and queries. | `workflow`, `state_management` | +| `state.py` | Module | ActionStateManager — manages action execution state persistence and queries. Terminal states: `completed`, `failed`, `skipped`. Key methods: `is_failed()`, `is_skipped()`, `get_pending_actions()`, `is_workflow_done()`, `get_summary()`. | `workflow`, `state_management` | diff --git a/agent_actions/workflow/managers/state.py b/agent_actions/workflow/managers/state.py index 7436140..3a56f32 100644 --- a/agent_actions/workflow/managers/state.py +++ b/agent_actions/workflow/managers/state.py @@ -83,9 +83,13 @@ def is_failed(self, action_name: str) -> bool: """Return True if action has failed.""" return self.get_status(action_name) == "failed" + def is_skipped(self, action_name: str) -> bool: + """Return True if action was skipped due to upstream dependency failure.""" + return self.get_status(action_name) == "skipped" + def get_pending_actions(self, agents: list[str]) -> list[str]: - """Return actions that are not yet completed or failed (runnable).""" - terminal = {"completed", "failed"} + """Return actions that are not yet completed, failed, or skipped (runnable).""" + terminal = {"completed", "failed", "skipped"} return [agent for agent in agents if self.get_status(agent) not in terminal] def get_batch_submitted_actions(self, agents: list[str]) -> list[str]: @@ -122,8 +126,8 @@ def is_workflow_complete(self) -> bool: return all(details.get("status") == "completed" for details in self.action_status.values()) def is_workflow_done(self) -> bool: - """Return True if all actions are in a terminal state (completed or failed).""" - terminal = {"completed", "failed"} + """Return True if all actions are in a terminal state (completed, failed, or skipped).""" + terminal = {"completed", "failed", "skipped"} return all(details.get("status") in terminal for details in self.action_status.values()) def has_any_failed(self) -> bool: diff --git a/agent_actions/workflow/parallel/action_executor.py b/agent_actions/workflow/parallel/action_executor.py index 24a2285..7894968 100644 --- a/agent_actions/workflow/parallel/action_executor.py +++ b/agent_actions/workflow/parallel/action_executor.py @@ -353,5 +353,16 @@ async def execute_level_async(self, params: LevelExecutionParams) -> bool: return False duration = (datetime.now() - start_time).total_seconds() - self.console.print(f"[green]Action {params.level_idx} complete ({duration:.2f}s)[/green]") + + has_failed = params.state_manager.get_failed_actions(params.level_actions) + has_skipped = any(params.state_manager.is_skipped(a) for a in params.level_actions) + if has_failed: + color = "red" + elif has_skipped: + color = "yellow" + else: + color = "green" + self.console.print( + f"[{color}]Action {params.level_idx} complete ({duration:.2f}s)[/{color}]" + ) return True diff --git a/tasks/todo.md b/tasks/todo.md new file mode 100644 index 0000000..6d250fa --- /dev/null +++ b/tasks/todo.md @@ -0,0 +1,121 @@ +# Failure Propagation & Visibility Fixes + +Root issue: per-item failures don't propagate to action status, circuit breaker never fires, operator sees misleading green output. + +Reference: `failures.txt`, PR #67 (added circuit breaker but upstream never signals failure). + +--- + +## Phase 1 — Bugfix: Accurate tally + level coloring (REVISED) + +**Discovery:** PR #67 already added the zero-output check at `pipeline.py:493-508`. The total-wipeout case (0/N items) already propagates as `"failed"`. The remaining bugs were: +- `_handle_dependency_skip` used `status="failed"` → tally showed 0 SKIP +- Level completion line was always green + +**Approach:** Introduced `"skipped"` as a proper terminal status for dependency-skipped actions. + +- [x] **1.1** Add `is_skipped()` to state manager, update terminal sets + - Added `is_skipped()` method to `ActionStateManager` + - Updated `get_pending_actions` terminal: `{"completed", "failed", "skipped"}` + - Updated `is_workflow_done` terminal: `{"completed", "failed", "skipped"}` + +- [x] **1.2** Update executor: `_handle_dependency_skip` → `status="skipped"` + - Added `DISPOSITION_SKIPPED` import + - Added `_write_skipped_disposition()` method + - Changed `_handle_dependency_skip`: state → "skipped", disposition → SKIPPED, result → "skipped" + - Updated `_check_upstream_health`: checks `is_failed OR is_skipped`, checks both dispositions + +- [x] **1.3** Fix level completion line coloring + - `action_executor.py:355`: red if failed, yellow if skipped, green if all OK + +- [x] **1.4** Tests + - Updated all circuit breaker tests to expect "skipped" + - Added `test_dep_skipped_via_state_manager` and `test_dep_skipped_via_disposition` + - Added `TestWriteSkippedDisposition` test class + - Added `TestIsSkipped`, `TestWorkflowDoneWithSkipped` in state extensions + - All 4271 tests pass, ruff clean + +--- + +## Phase 2 — Observability: Partial failure visibility + +**Goal:** When K/N items succeed, action is `completed_with_failures`, item-level failures are persisted and surfaced. + +- [ ] **2.1** Add `completed_with_failures` status to state manager + - New terminal status alongside `completed` and `failed` + - Circuit breaker ignores this status (descendants run on partial output) + - Tally counts it as `PARTIAL` + - Verify: `pytest`, `ruff check .` + +- [ ] **2.2** Item-level failure tracking in storage backend + - Persist which items (by `source_guid` or index) failed and the error message + - Storage schema addition (e.g., `item_failures` table or disposition record) + - Verify: unit test asserting failed items are persisted and retrievable + +- [ ] **2.3** Populate `completed_with_failures` from executor + - After action runner returns: if `items_succeeded > 0 AND items_succeeded < items_attempted` → status `completed_with_failures` + - Write item-level failures to storage backend + - Verify: `pytest`, manual run with partial failure confirms new status + persisted failures + +- [ ] **2.4** Surface partial failures in output + - Action completion log: `"Action 'X': 9/10 items OK, 1 failed (see item failures)"` + - Tally: `"8 OK | 1 PARTIAL | 0 SKIP | 2 ERROR"` + - Level line: yellow for levels containing `completed_with_failures` actions + - Verify: manual run, visual inspection + +- [ ] **2.5** Tests for Phase 2 + - Test: partial failure → status is `completed_with_failures`, not `completed` or `failed` + - Test: item-level failures are persisted with guid + error + - Test: descendants of `completed_with_failures` action still run (not skipped) + - Test: tally shows PARTIAL count + - Verify: `pytest` + +--- + +## Phase 3 — UX: Pause-and-surface + retry + +**Goal:** Workflow pauses on partial failure (default), user can retry failed items or continue. Configurable retry policy. + +- [ ] **3.1** Pause-and-surface on partial failure (default behavior) + - After a level containing `completed_with_failures` actions, pause workflow + - Print summary: `"9/10 OK, 1 failed. Run 'agac retry ' or 'agac run --continue'"` + - List failed items with truncated error messages + - Verify: manual run with partial failure → workflow pauses with clear message + +- [ ] **3.2** `on_partial_failure` config option + - Per-action or workflow-level config: `on_partial_failure: pause | continue` + - Default: `pause` + - `continue`: skip the pause, proceed with partial results (for automated pipelines) + - Verify: `pytest`, manual run with `continue` config → no pause + +- [ ] **3.3** `agac retry` command / `--retry-failed` flag + - `agac retry `: re-run only failed items for the specified action + - `agac run --retry-failed`: re-run failed items for all actions with failures + - Uses persisted item-level failures (Phase 2) to filter input to only failed guids + - On success: update item status, recalculate action status + - Verify: manual run → fail items → retry → items succeed → action becomes `completed` + +- [ ] **3.4** Configurable retry policy + - Per-action config: `retry: {max_attempts: 3, backoff: exponential}` + - Executor retries failed items inline up to `max_attempts` before finalizing + - Only items still failing after all retries are persisted as failures + - Verify: `pytest`, manual run with transient 429 error → retried automatically + +- [ ] **3.5** Tests for Phase 3 + - Test: workflow pauses on partial failure by default + - Test: `on_partial_failure: continue` skips pause + - Test: retry command re-processes only failed items + - Test: retry policy with backoff retries transient errors + - Test: after successful retry, action status updates to `completed` + - Verify: `pytest` + +--- + +## Working Notes + +- Circuit breaker code: `executor.py:416-432` (`_check_upstream_health`) +- Failure recording: `executor.py:377-397` (`_handle_run_failure`) +- Level completion line: `action_executor.py:356` +- Tally output: TBD — need to locate the exact counter logic +- Action runner item processing: TBD — need to trace where per-item errors are caught +- State manager: `managers/state.py` — `is_failed()`, `get_pending_actions()` diff --git a/tests/unit/workflow/managers/test_state_extensions.py b/tests/unit/workflow/managers/test_state_extensions.py index da40822..fddd809 100644 --- a/tests/unit/workflow/managers/test_state_extensions.py +++ b/tests/unit/workflow/managers/test_state_extensions.py @@ -83,8 +83,20 @@ def test_excludes_both_completed_and_failed(self, tmp_path): pending = mgr.get_pending_actions(["a", "b", "c", "d"]) assert pending == ["c", "d"] + def test_excludes_skipped(self, tmp_path): + """get_pending_actions excludes skipped actions.""" + status_file = tmp_path / "status.json" + mgr = ActionStateManager(status_file, ["a", "b", "c"]) + mgr.update_status("a", "completed") + mgr.update_status("b", "skipped") + + pending = mgr.get_pending_actions(["a", "b", "c"]) + assert "a" not in pending + assert "b" not in pending + assert "c" in pending + def test_all_terminal_returns_empty(self, tmp_path): - """When all are completed/failed, returns empty list.""" + """When all are completed/failed/skipped, returns empty list.""" status_file = tmp_path / "status.json" mgr = ActionStateManager(status_file, ["a", "b"]) mgr.update_status("a", "completed") @@ -92,3 +104,51 @@ def test_all_terminal_returns_empty(self, tmp_path): pending = mgr.get_pending_actions(["a", "b"]) assert pending == [] + + +class TestIsSkipped: + """Tests for is_skipped().""" + + def test_is_skipped_true(self, tmp_path): + """is_skipped returns True for skipped actions.""" + status_file = tmp_path / "status.json" + mgr = ActionStateManager(status_file, ["a"]) + mgr.update_status("a", "skipped") + + assert mgr.is_skipped("a") is True + + def test_is_skipped_false_for_other_statuses(self, tmp_path): + """is_skipped returns False for non-skipped actions.""" + status_file = tmp_path / "status.json" + mgr = ActionStateManager(status_file, ["a", "b", "c"]) + mgr.update_status("a", "completed") + mgr.update_status("b", "failed") + + assert mgr.is_skipped("a") is False + assert mgr.is_skipped("b") is False + assert mgr.is_skipped("c") is False + + +class TestWorkflowDoneWithSkipped: + """Tests for is_workflow_done() with skipped status.""" + + def test_done_with_skipped(self, tmp_path): + """is_workflow_done returns True when mix of completed, failed, and skipped.""" + status_file = tmp_path / "status.json" + mgr = ActionStateManager(status_file, ["a", "b", "c"]) + mgr.update_status("a", "completed") + mgr.update_status("b", "failed") + mgr.update_status("c", "skipped") + + assert mgr.is_workflow_done() is True + + def test_summary_with_skipped(self, tmp_path): + """get_summary correctly counts skipped actions.""" + status_file = tmp_path / "status.json" + mgr = ActionStateManager(status_file, ["a", "b", "c"]) + mgr.update_status("a", "completed") + mgr.update_status("b", "skipped") + mgr.update_status("c", "failed") + + summary = mgr.get_summary() + assert summary == {"completed": 1, "skipped": 1, "failed": 1} diff --git a/tests/unit/workflow/test_circuit_breaker.py b/tests/unit/workflow/test_circuit_breaker.py index effdbff..28c1076 100644 --- a/tests/unit/workflow/test_circuit_breaker.py +++ b/tests/unit/workflow/test_circuit_breaker.py @@ -5,7 +5,11 @@ import pytest -from agent_actions.storage.backend import DISPOSITION_FAILED, NODE_LEVEL_RECORD_ID +from agent_actions.storage.backend import ( + DISPOSITION_FAILED, + DISPOSITION_SKIPPED, + NODE_LEVEL_RECORD_ID, +) from agent_actions.workflow.executor import ( ActionExecutionResult, ActionExecutor, @@ -54,6 +58,7 @@ def test_no_dependencies_key_returns_none(self, executor): def test_all_deps_healthy_returns_none(self, executor, mock_deps): """All dependencies healthy — returns None.""" mock_deps.state_manager.is_failed.return_value = False + mock_deps.state_manager.is_skipped.return_value = False mock_deps.action_runner.storage_backend = None config = {"dependencies": ["agent_a"]} @@ -63,6 +68,16 @@ def test_all_deps_healthy_returns_none(self, executor, mock_deps): def test_dep_failed_via_state_manager(self, executor, mock_deps): """One dep failed (state_manager.is_failed) returns dep name.""" mock_deps.state_manager.is_failed.return_value = True + mock_deps.state_manager.is_skipped.return_value = False + + config = {"dependencies": ["agent_a"]} + result = executor._check_upstream_health("agent_b", config) + assert result == "agent_a" + + def test_dep_skipped_via_state_manager(self, executor, mock_deps): + """One dep skipped (state_manager.is_skipped) returns dep name.""" + mock_deps.state_manager.is_failed.return_value = False + mock_deps.state_manager.is_skipped.return_value = True config = {"dependencies": ["agent_a"]} result = executor._check_upstream_health("agent_b", config) @@ -71,21 +86,31 @@ def test_dep_failed_via_state_manager(self, executor, mock_deps): def test_dep_failed_via_disposition(self, executor, mock_deps): """One dep has DISPOSITION_FAILED in storage returns dep name.""" mock_deps.state_manager.is_failed.return_value = False + mock_deps.state_manager.is_skipped.return_value = False storage = MagicMock() - storage.has_disposition.return_value = True + storage.has_disposition.side_effect = lambda dep, disp, **kw: disp == DISPOSITION_FAILED mock_deps.action_runner.storage_backend = storage config = {"dependencies": ["agent_a"]} result = executor._check_upstream_health("agent_b", config) + assert result == "agent_a" + + def test_dep_skipped_via_disposition(self, executor, mock_deps): + """One dep has DISPOSITION_SKIPPED in storage returns dep name.""" + mock_deps.state_manager.is_failed.return_value = False + mock_deps.state_manager.is_skipped.return_value = False + storage = MagicMock() + storage.has_disposition.side_effect = lambda dep, disp, **kw: disp == DISPOSITION_SKIPPED + mock_deps.action_runner.storage_backend = storage + config = {"dependencies": ["agent_a"]} + result = executor._check_upstream_health("agent_b", config) assert result == "agent_a" - storage.has_disposition.assert_called_once_with( - "agent_a", DISPOSITION_FAILED, record_id=NODE_LEVEL_RECORD_ID - ) def test_no_storage_backend_only_checks_state_manager(self, executor, mock_deps): """No storage backend — only checks state_manager.""" mock_deps.state_manager.is_failed.return_value = False + mock_deps.state_manager.is_skipped.return_value = False mock_deps.action_runner.storage_backend = None config = {"dependencies": ["agent_a"]} @@ -97,18 +122,18 @@ class TestHandleDependencySkip: """Tests for _handle_dependency_skip().""" @patch("agent_actions.workflow.executor.fire_event") - def test_updates_state_to_failed(self, mock_fire, executor, mock_deps): - """Updates state to 'failed'.""" + def test_updates_state_to_skipped(self, mock_fire, executor, mock_deps): + """Updates state to 'skipped'.""" mock_deps.action_runner.storage_backend = None start_time = datetime.now() executor._handle_dependency_skip("agent_b", 1, {}, start_time, "agent_a") - mock_deps.state_manager.update_status.assert_called_once_with("agent_b", "failed") + mock_deps.state_manager.update_status.assert_called_once_with("agent_b", "skipped") @patch("agent_actions.workflow.executor.fire_event") - def test_writes_failed_disposition(self, mock_fire, executor, mock_deps): - """Writes DISPOSITION_FAILED to storage.""" + def test_writes_skipped_disposition(self, mock_fire, executor, mock_deps): + """Writes DISPOSITION_SKIPPED to storage.""" storage = MagicMock() mock_deps.action_runner.storage_backend = storage start_time = datetime.now() @@ -117,7 +142,7 @@ def test_writes_failed_disposition(self, mock_fire, executor, mock_deps): storage.set_disposition.assert_called_once() call_kwargs = storage.set_disposition.call_args - assert call_kwargs[1]["disposition"] == DISPOSITION_FAILED + assert call_kwargs[1]["disposition"] == DISPOSITION_SKIPPED assert call_kwargs[1]["action_name"] == "agent_b" @patch("agent_actions.workflow.executor.fire_event") @@ -137,7 +162,7 @@ def test_fires_action_skip_event(self, mock_fire, executor, mock_deps): @patch("agent_actions.workflow.executor.fire_event") def test_records_in_run_tracker_if_available(self, mock_fire, executor, mock_deps): - """Records in run_tracker if available.""" + """Records in run_tracker with status 'skipped'.""" mock_deps.action_runner.storage_backend = None executor.run_tracker = MagicMock() executor.run_id = "run-123" @@ -147,12 +172,12 @@ def test_records_in_run_tracker_if_available(self, mock_fire, executor, mock_dep executor.run_tracker.record_action_complete.assert_called_once() config = executor.run_tracker.record_action_complete.call_args[1]["config"] - assert config.status == "failed" + assert config.status == "skipped" assert config.run_id == "run-123" @patch("agent_actions.workflow.executor.fire_event") - def test_returns_failed_result_with_success_true(self, mock_fire, executor, mock_deps): - """Returns ActionExecutionResult(success=True, status='failed') — failed state, but independent branches continue.""" + def test_returns_skipped_result_with_success_true(self, mock_fire, executor, mock_deps): + """Returns ActionExecutionResult(success=True, status='skipped') — skipped state, but independent branches continue.""" mock_deps.action_runner.storage_backend = None start_time = datetime.now() @@ -160,7 +185,7 @@ def test_returns_failed_result_with_success_true(self, mock_fire, executor, mock assert isinstance(result, ActionExecutionResult) assert result.success is True - assert result.status == "failed" + assert result.status == "skipped" class TestWriteFailedDisposition: @@ -195,3 +220,85 @@ def test_noops_when_storage_backend_is_none(self, executor, mock_deps): # Should not raise; nothing happens executor._write_failed_disposition("agent_a", "Some error") + + +class TestWriteSkippedDisposition: + """Tests for _write_skipped_disposition().""" + + def test_writes_disposition_when_storage_available(self, executor, mock_deps): + """Writes DISPOSITION_SKIPPED when storage backend is available.""" + storage = MagicMock() + mock_deps.action_runner.storage_backend = storage + + executor._write_skipped_disposition("agent_b", "Upstream failed") + + storage.set_disposition.assert_called_once_with( + action_name="agent_b", + record_id=NODE_LEVEL_RECORD_ID, + disposition=DISPOSITION_SKIPPED, + reason="Upstream failed", + ) + + def test_logs_warning_on_storage_error(self, executor, mock_deps, caplog): + """Logs warning on storage error (doesn't raise).""" + storage = MagicMock() + storage.set_disposition.side_effect = RuntimeError("DB error") + mock_deps.action_runner.storage_backend = storage + + executor._write_skipped_disposition("agent_b", "Upstream failed") + + def test_noops_when_storage_backend_is_none(self, executor, mock_deps): + """No-ops when storage backend is None.""" + mock_deps.action_runner.storage_backend = None + + executor._write_skipped_disposition("agent_b", "Upstream failed") + + +class TestLevelCompletionColoring: + """Tests for level completion line color logic (red/yellow/green).""" + + def test_green_when_all_ok(self, tmp_path): + """Level line is green when all actions completed successfully.""" + mgr = ActionStateManager(tmp_path / "status.json", ["a", "b"]) + mgr.update_status("a", "completed") + mgr.update_status("b", "completed") + + has_failed = mgr.get_failed_actions(["a", "b"]) + has_skipped = any(mgr.is_skipped(a) for a in ["a", "b"]) + + assert not has_failed + assert not has_skipped + + def test_red_when_action_failed(self, tmp_path): + """Level line is red when any action failed.""" + mgr = ActionStateManager(tmp_path / "status.json", ["a", "b"]) + mgr.update_status("a", "completed") + mgr.update_status("b", "failed") + + has_failed = mgr.get_failed_actions(["a", "b"]) + assert has_failed + + def test_yellow_when_action_skipped(self, tmp_path): + """Level line is yellow when actions are skipped but none failed.""" + mgr = ActionStateManager(tmp_path / "status.json", ["a", "b"]) + mgr.update_status("a", "completed") + mgr.update_status("b", "skipped") + + has_failed = mgr.get_failed_actions(["a", "b"]) + has_skipped = any(mgr.is_skipped(a) for a in ["a", "b"]) + + assert not has_failed + assert has_skipped + + def test_red_takes_precedence_over_yellow(self, tmp_path): + """Red (failed) takes precedence over yellow (skipped).""" + mgr = ActionStateManager(tmp_path / "status.json", ["a", "b", "c"]) + mgr.update_status("a", "completed") + mgr.update_status("b", "failed") + mgr.update_status("c", "skipped") + + has_failed = mgr.get_failed_actions(["a", "b", "c"]) + has_skipped = any(mgr.is_skipped(a) for a in ["a", "b", "c"]) + + assert has_failed + assert has_skipped