From 5c7282364b5f5f3540185fe681eb1f3efe3a5777 Mon Sep 17 00:00:00 2001 From: Muizz Lateef Date: Thu, 2 Apr 2026 10:05:00 +0100 Subject: [PATCH 1/4] fix: dependency-skipped actions now show as SKIP in tally and level lines reflect failures Dependency-skipped actions were incorrectly counted as ERROR in the tally (always showing "0 SKIP") because _handle_dependency_skip set status to "failed". Level completion lines were always green regardless of failures. Changes: - Introduce "skipped" as a proper terminal status for dependency-skipped actions - _handle_dependency_skip now sets status="skipped" and writes DISPOSITION_SKIPPED - _check_upstream_health checks both is_failed and is_skipped for transitive cascade - get_pending_actions and is_workflow_done treat "skipped" as terminal - Level completion lines: red (failures), yellow (skips), green (all OK) - Tally now correctly shows "N OK | M SKIP | K ERROR" --- agent_actions/workflow/executor.py | 53 ++++++-- agent_actions/workflow/managers/_MANIFEST.md | 2 +- agent_actions/workflow/managers/state.py | 12 +- .../workflow/parallel/action_executor.py | 15 ++- tasks/todo.md | 121 ++++++++++++++++++ .../managers/test_state_extensions.py | 62 ++++++++- tests/unit/workflow/test_circuit_breaker.py | 80 +++++++++--- 7 files changed, 309 insertions(+), 36 deletions(-) create mode 100644 tasks/todo.md diff --git a/agent_actions/workflow/executor.py b/agent_actions/workflow/executor.py index bbfe4f1..8202a4c 100644 --- a/agent_actions/workflow/executor.py +++ b/agent_actions/workflow/executor.py @@ -19,7 +19,11 @@ BatchCompleteEvent, BatchSubmittedEvent, ) -from agent_actions.storage.backend import DISPOSITION_FAILED, NODE_LEVEL_RECORD_ID +from agent_actions.storage.backend import ( + DISPOSITION_FAILED, + DISPOSITION_SKIPPED, + NODE_LEVEL_RECORD_ID, +) from agent_actions.tooling.docs.run_tracker import ActionCompleteConfig from agent_actions.utils.constants import DEFAULT_ACTION_KIND @@ -72,7 +76,7 @@ class ActionExecutionResult: 
success: bool output_folder: str | None = None - status: str = "completed" # 'completed', 'batch_submitted', 'failed' + status: str = "completed" # 'completed', 'batch_submitted', 'failed', 'skipped' error: Exception | None = None metrics: ExecutionMetrics = field(default_factory=ExecutionMetrics) @@ -374,6 +378,24 @@ def _write_failed_disposition(self, action_name: str, reason: str) -> None: disp_err, ) + def _write_skipped_disposition(self, action_name: str, reason: str) -> None: + """Write DISPOSITION_SKIPPED to storage so downstream and future runs detect the skip.""" + storage_backend = getattr(self.deps.action_runner, "storage_backend", None) + if storage_backend is not None: + try: + storage_backend.set_disposition( + action_name=action_name, + record_id=NODE_LEVEL_RECORD_ID, + disposition=DISPOSITION_SKIPPED, + reason=reason[:500], + ) + except Exception as disp_err: + logger.warning( + "Failed to write DISPOSITION_SKIPPED for %s: %s", + action_name, + disp_err, + ) + def _handle_run_failure( self, params: ActionRunParams, error: Exception ) -> ActionExecutionResult: @@ -416,17 +438,22 @@ def _cleanup_correlation( def _check_upstream_health( self, action_name: str, action_config: ActionConfigDict ) -> str | None: - """Return the name of a failed upstream dependency, or None if all healthy.""" + """Return the name of a failed or skipped upstream dependency, or None if all healthy.""" dependencies = action_config.get("dependencies", []) if not dependencies: return None for dep in dependencies: - if self.deps.state_manager.is_failed(dep): + if self.deps.state_manager.is_failed(dep) or self.deps.state_manager.is_skipped(dep): return dep - # Also check disposition — covers cascaded failures from prior levels + # Also check disposition — covers cascaded failures/skips from prior levels storage_backend = getattr(self.deps.action_runner, "storage_backend", None) - if storage_backend is not None and storage_backend.has_disposition( - dep, DISPOSITION_FAILED, 
record_id=NODE_LEVEL_RECORD_ID + if storage_backend is not None and ( + storage_backend.has_disposition( + dep, DISPOSITION_FAILED, record_id=NODE_LEVEL_RECORD_ID + ) + or storage_backend.has_disposition( + dep, DISPOSITION_SKIPPED, record_id=NODE_LEVEL_RECORD_ID + ) ): return dep return None @@ -441,12 +468,12 @@ def _handle_dependency_skip( ) -> ActionExecutionResult: """Handle action skip due to upstream dependency failure. - State is set to ``"failed"`` so transitive dependents also skip via - ``is_failed``. ``success=True`` keeps independent branches alive. + State is set to ``"skipped"`` so transitive dependents also skip via + ``is_skipped``. ``success=True`` keeps independent branches alive. """ reason = f"Upstream dependency '{failed_dependency}' failed" - self.deps.state_manager.update_status(action_name, "failed") - self._write_failed_disposition(action_name, reason) + self.deps.state_manager.update_status(action_name, "skipped") + self._write_skipped_disposition(action_name, reason) duration = (datetime.now() - start_time).total_seconds() total_actions = ( @@ -467,14 +494,14 @@ def _handle_dependency_skip( config = ActionCompleteConfig( run_id=self.run_id, action_name=action_name, - status="failed", + status="skipped", duration_seconds=duration, skip_reason=reason, ) self.run_tracker.record_action_complete(config=config) return ActionExecutionResult( - success=True, status="failed", metrics=ExecutionMetrics(duration=duration) + success=True, status="skipped", metrics=ExecutionMetrics(duration=duration) ) def execute_action_sync( diff --git a/agent_actions/workflow/managers/_MANIFEST.md b/agent_actions/workflow/managers/_MANIFEST.md index 99d0176..5ebc0d6 100644 --- a/agent_actions/workflow/managers/_MANIFEST.md +++ b/agent_actions/workflow/managers/_MANIFEST.md @@ -14,4 +14,4 @@ Tracks workflow artifacts, batching, loops, state, and skip logic used by the ru | `manifest.py` | Module | Generates workflow manifests consumed by tooling/docs. 
| `tooling.docs`, `workflow` | | `output.py` | Module | `ActionOutputManager`: loads upstream outputs, resolves version correlation, creates passthrough outputs. `detect_explicit_version_consumption()` result is lazy-cached per instance to avoid redundant computation. | `output`, `workflow` | | `skip.py` | Module | Skip logic used when upstream items fail or guard conditions filter them. | `validation`, `workflow` | -| `state.py` | Module | ActionStateManager — manages action execution state persistence and queries. | `workflow`, `state_management` | +| `state.py` | Module | ActionStateManager — manages action execution state persistence and queries. Terminal states: `completed`, `failed`, `skipped`. Key methods: `is_failed()`, `is_skipped()`, `get_pending_actions()`, `is_workflow_done()`, `get_summary()`. | `workflow`, `state_management` | diff --git a/agent_actions/workflow/managers/state.py b/agent_actions/workflow/managers/state.py index 7436140..3a56f32 100644 --- a/agent_actions/workflow/managers/state.py +++ b/agent_actions/workflow/managers/state.py @@ -83,9 +83,13 @@ def is_failed(self, action_name: str) -> bool: """Return True if action has failed.""" return self.get_status(action_name) == "failed" + def is_skipped(self, action_name: str) -> bool: + """Return True if action was skipped due to upstream dependency failure.""" + return self.get_status(action_name) == "skipped" + def get_pending_actions(self, agents: list[str]) -> list[str]: - """Return actions that are not yet completed or failed (runnable).""" - terminal = {"completed", "failed"} + """Return actions that are not yet completed, failed, or skipped (runnable).""" + terminal = {"completed", "failed", "skipped"} return [agent for agent in agents if self.get_status(agent) not in terminal] def get_batch_submitted_actions(self, agents: list[str]) -> list[str]: @@ -122,8 +126,8 @@ def is_workflow_complete(self) -> bool: return all(details.get("status") == "completed" for details in 
self.action_status.values()) def is_workflow_done(self) -> bool: - """Return True if all actions are in a terminal state (completed or failed).""" - terminal = {"completed", "failed"} + """Return True if all actions are in a terminal state (completed, failed, or skipped).""" + terminal = {"completed", "failed", "skipped"} return all(details.get("status") in terminal for details in self.action_status.values()) def has_any_failed(self) -> bool: diff --git a/agent_actions/workflow/parallel/action_executor.py b/agent_actions/workflow/parallel/action_executor.py index 24a2285..308cbec 100644 --- a/agent_actions/workflow/parallel/action_executor.py +++ b/agent_actions/workflow/parallel/action_executor.py @@ -353,5 +353,18 @@ async def execute_level_async(self, params: LevelExecutionParams) -> bool: return False duration = (datetime.now() - start_time).total_seconds() - self.console.print(f"[green]Action {params.level_idx} complete ({duration:.2f}s)[/green]") + + has_failed = params.state_manager.get_failed_actions(params.level_actions) + has_skipped = any( + params.state_manager.is_skipped(a) for a in params.level_actions + ) + if has_failed: + color = "red" + elif has_skipped: + color = "yellow" + else: + color = "green" + self.console.print( + f"[{color}]Action {params.level_idx} complete ({duration:.2f}s)[/{color}]" + ) return True diff --git a/tasks/todo.md b/tasks/todo.md new file mode 100644 index 0000000..6d250fa --- /dev/null +++ b/tasks/todo.md @@ -0,0 +1,121 @@ +# Failure Propagation & Visibility Fixes + +Root issue: per-item failures don't propagate to action status, circuit breaker never fires, operator sees misleading green output. + +Reference: `failures.txt`, PR #67 (added circuit breaker but upstream never signals failure). + +--- + +## Phase 1 — Bugfix: Accurate tally + level coloring (REVISED) + +**Discovery:** PR #67 already added the zero-output check at `pipeline.py:493-508`. The total-wipeout case (0/N items) already propagates as `"failed"`. 
The remaining bugs were: +- `_handle_dependency_skip` used `status="failed"` → tally showed 0 SKIP +- Level completion line was always green + +**Approach:** Introduced `"skipped"` as a proper terminal status for dependency-skipped actions. + +- [x] **1.1** Add `is_skipped()` to state manager, update terminal sets + - Added `is_skipped()` method to `ActionStateManager` + - Updated `get_pending_actions` terminal: `{"completed", "failed", "skipped"}` + - Updated `is_workflow_done` terminal: `{"completed", "failed", "skipped"}` + +- [x] **1.2** Update executor: `_handle_dependency_skip` → `status="skipped"` + - Added `DISPOSITION_SKIPPED` import + - Added `_write_skipped_disposition()` method + - Changed `_handle_dependency_skip`: state → "skipped", disposition → SKIPPED, result → "skipped" + - Updated `_check_upstream_health`: checks `is_failed OR is_skipped`, checks both dispositions + +- [x] **1.3** Fix level completion line coloring + - `action_executor.py:355`: red if failed, yellow if skipped, green if all OK + +- [x] **1.4** Tests + - Updated all circuit breaker tests to expect "skipped" + - Added `test_dep_skipped_via_state_manager` and `test_dep_skipped_via_disposition` + - Added `TestWriteSkippedDisposition` test class + - Added `TestIsSkipped`, `TestWorkflowDoneWithSkipped` in state extensions + - All 4271 tests pass, ruff clean + +--- + +## Phase 2 — Observability: Partial failure visibility + +**Goal:** When K/N items succeed, action is `completed_with_failures`, item-level failures are persisted and surfaced. 
+ +- [ ] **2.1** Add `completed_with_failures` status to state manager + - New terminal status alongside `completed` and `failed` + - Circuit breaker ignores this status (descendants run on partial output) + - Tally counts it as `PARTIAL` + - Verify: `pytest`, `ruff check .` + +- [ ] **2.2** Item-level failure tracking in storage backend + - Persist which items (by `source_guid` or index) failed and the error message + - Storage schema addition (e.g., `item_failures` table or disposition record) + - Verify: unit test asserting failed items are persisted and retrievable + +- [ ] **2.3** Populate `completed_with_failures` from executor + - After action runner returns: if `items_succeeded > 0 AND items_succeeded < items_attempted` → status `completed_with_failures` + - Write item-level failures to storage backend + - Verify: `pytest`, manual run with partial failure confirms new status + persisted failures + +- [ ] **2.4** Surface partial failures in output + - Action completion log: `"Action 'X': 9/10 items OK, 1 failed (see item failures)"` + - Tally: `"8 OK | 1 PARTIAL | 0 SKIP | 2 ERROR"` + - Level line: yellow for levels containing `completed_with_failures` actions + - Verify: manual run, visual inspection + +- [ ] **2.5** Tests for Phase 2 + - Test: partial failure → status is `completed_with_failures`, not `completed` or `failed` + - Test: item-level failures are persisted with guid + error + - Test: descendants of `completed_with_failures` action still run (not skipped) + - Test: tally shows PARTIAL count + - Verify: `pytest` + +--- + +## Phase 3 — UX: Pause-and-surface + retry + +**Goal:** Workflow pauses on partial failure (default), user can retry failed items or continue. Configurable retry policy. + +- [ ] **3.1** Pause-and-surface on partial failure (default behavior) + - After a level containing `completed_with_failures` actions, pause workflow + - Print summary: `"9/10 OK, 1 failed. 
Run 'agac retry <action>' or 'agac run --continue'"` + - List failed items with truncated error messages + - Verify: manual run with partial failure → workflow pauses with clear message + + - [ ] **3.2** `on_partial_failure` config option + - Per-action or workflow-level config: `on_partial_failure: pause | continue` + - Default: `pause` + - `continue`: skip the pause, proceed with partial results (for automated pipelines) + - Verify: `pytest`, manual run with `continue` config → no pause + + - [ ] **3.3** `agac retry` command / `--retry-failed` flag + - `agac retry <action>`: re-run only failed items for the specified action + - `agac run --retry-failed`: re-run failed items for all actions with failures + - Uses persisted item-level failures (Phase 2) to filter input to only failed guids + - On success: update item status, recalculate action status + - Verify: manual run → fail items → retry → items succeed → action becomes `completed` + + - [ ] **3.4** Configurable retry policy + - Per-action config: `retry: {max_attempts: 3, backoff: exponential}` + - Executor retries failed items inline up to `max_attempts` before finalizing + - Only items still failing after all retries are persisted as failures + - Verify: `pytest`, manual run with transient 429 error → retried automatically + + - [ ] **3.5** Tests for Phase 3 + - Test: workflow pauses on partial failure by default + - Test: `on_partial_failure: continue` skips pause + - Test: retry command re-processes only failed items + - Test: retry policy with backoff retries transient errors + - Test: after successful retry, action status updates to `completed` + - Verify: `pytest` + +--- + +## Working Notes + +- Circuit breaker code: `executor.py:416-432` (`_check_upstream_health`) +- Failure recording: `executor.py:377-397` (`_handle_run_failure`) +- Level completion line: `action_executor.py:356` +- Tally output: TBD — need to locate the exact counter logic +- Action runner item processing: TBD — need to trace where per-item errors are 
caught +- State manager: `managers/state.py` — `is_failed()`, `get_pending_actions()` diff --git a/tests/unit/workflow/managers/test_state_extensions.py b/tests/unit/workflow/managers/test_state_extensions.py index da40822..fddd809 100644 --- a/tests/unit/workflow/managers/test_state_extensions.py +++ b/tests/unit/workflow/managers/test_state_extensions.py @@ -83,8 +83,20 @@ def test_excludes_both_completed_and_failed(self, tmp_path): pending = mgr.get_pending_actions(["a", "b", "c", "d"]) assert pending == ["c", "d"] + def test_excludes_skipped(self, tmp_path): + """get_pending_actions excludes skipped actions.""" + status_file = tmp_path / "status.json" + mgr = ActionStateManager(status_file, ["a", "b", "c"]) + mgr.update_status("a", "completed") + mgr.update_status("b", "skipped") + + pending = mgr.get_pending_actions(["a", "b", "c"]) + assert "a" not in pending + assert "b" not in pending + assert "c" in pending + def test_all_terminal_returns_empty(self, tmp_path): - """When all are completed/failed, returns empty list.""" + """When all are completed/failed/skipped, returns empty list.""" status_file = tmp_path / "status.json" mgr = ActionStateManager(status_file, ["a", "b"]) mgr.update_status("a", "completed") @@ -92,3 +104,51 @@ def test_all_terminal_returns_empty(self, tmp_path): pending = mgr.get_pending_actions(["a", "b"]) assert pending == [] + + +class TestIsSkipped: + """Tests for is_skipped().""" + + def test_is_skipped_true(self, tmp_path): + """is_skipped returns True for skipped actions.""" + status_file = tmp_path / "status.json" + mgr = ActionStateManager(status_file, ["a"]) + mgr.update_status("a", "skipped") + + assert mgr.is_skipped("a") is True + + def test_is_skipped_false_for_other_statuses(self, tmp_path): + """is_skipped returns False for non-skipped actions.""" + status_file = tmp_path / "status.json" + mgr = ActionStateManager(status_file, ["a", "b", "c"]) + mgr.update_status("a", "completed") + mgr.update_status("b", "failed") + + 
assert mgr.is_skipped("a") is False + assert mgr.is_skipped("b") is False + assert mgr.is_skipped("c") is False + + +class TestWorkflowDoneWithSkipped: + """Tests for is_workflow_done() with skipped status.""" + + def test_done_with_skipped(self, tmp_path): + """is_workflow_done returns True when mix of completed, failed, and skipped.""" + status_file = tmp_path / "status.json" + mgr = ActionStateManager(status_file, ["a", "b", "c"]) + mgr.update_status("a", "completed") + mgr.update_status("b", "failed") + mgr.update_status("c", "skipped") + + assert mgr.is_workflow_done() is True + + def test_summary_with_skipped(self, tmp_path): + """get_summary correctly counts skipped actions.""" + status_file = tmp_path / "status.json" + mgr = ActionStateManager(status_file, ["a", "b", "c"]) + mgr.update_status("a", "completed") + mgr.update_status("b", "skipped") + mgr.update_status("c", "failed") + + summary = mgr.get_summary() + assert summary == {"completed": 1, "skipped": 1, "failed": 1} diff --git a/tests/unit/workflow/test_circuit_breaker.py b/tests/unit/workflow/test_circuit_breaker.py index effdbff..f395a4b 100644 --- a/tests/unit/workflow/test_circuit_breaker.py +++ b/tests/unit/workflow/test_circuit_breaker.py @@ -5,7 +5,11 @@ import pytest -from agent_actions.storage.backend import DISPOSITION_FAILED, NODE_LEVEL_RECORD_ID +from agent_actions.storage.backend import ( + DISPOSITION_FAILED, + DISPOSITION_SKIPPED, + NODE_LEVEL_RECORD_ID, +) from agent_actions.workflow.executor import ( ActionExecutionResult, ActionExecutor, @@ -54,6 +58,7 @@ def test_no_dependencies_key_returns_none(self, executor): def test_all_deps_healthy_returns_none(self, executor, mock_deps): """All dependencies healthy — returns None.""" mock_deps.state_manager.is_failed.return_value = False + mock_deps.state_manager.is_skipped.return_value = False mock_deps.action_runner.storage_backend = None config = {"dependencies": ["agent_a"]} @@ -68,24 +73,43 @@ def 
test_dep_failed_via_state_manager(self, executor, mock_deps): result = executor._check_upstream_health("agent_b", config) assert result == "agent_a" + def test_dep_skipped_via_state_manager(self, executor, mock_deps): + """One dep skipped (state_manager.is_skipped) returns dep name.""" + mock_deps.state_manager.is_failed.return_value = False + mock_deps.state_manager.is_skipped.return_value = True + + config = {"dependencies": ["agent_a"]} + result = executor._check_upstream_health("agent_b", config) + assert result == "agent_a" + def test_dep_failed_via_disposition(self, executor, mock_deps): """One dep has DISPOSITION_FAILED in storage returns dep name.""" mock_deps.state_manager.is_failed.return_value = False + mock_deps.state_manager.is_skipped.return_value = False storage = MagicMock() - storage.has_disposition.return_value = True + storage.has_disposition.side_effect = lambda dep, disp, **kw: disp == DISPOSITION_FAILED mock_deps.action_runner.storage_backend = storage config = {"dependencies": ["agent_a"]} result = executor._check_upstream_health("agent_b", config) + assert result == "agent_a" + def test_dep_skipped_via_disposition(self, executor, mock_deps): + """One dep has DISPOSITION_SKIPPED in storage returns dep name.""" + mock_deps.state_manager.is_failed.return_value = False + mock_deps.state_manager.is_skipped.return_value = False + storage = MagicMock() + storage.has_disposition.side_effect = lambda dep, disp, **kw: disp == DISPOSITION_SKIPPED + mock_deps.action_runner.storage_backend = storage + + config = {"dependencies": ["agent_a"]} + result = executor._check_upstream_health("agent_b", config) assert result == "agent_a" - storage.has_disposition.assert_called_once_with( - "agent_a", DISPOSITION_FAILED, record_id=NODE_LEVEL_RECORD_ID - ) def test_no_storage_backend_only_checks_state_manager(self, executor, mock_deps): """No storage backend — only checks state_manager.""" mock_deps.state_manager.is_failed.return_value = False + 
mock_deps.state_manager.is_skipped.return_value = False mock_deps.action_runner.storage_backend = None config = {"dependencies": ["agent_a"]} @@ -97,18 +121,18 @@ class TestHandleDependencySkip: """Tests for _handle_dependency_skip().""" @patch("agent_actions.workflow.executor.fire_event") - def test_updates_state_to_failed(self, mock_fire, executor, mock_deps): - """Updates state to 'failed'.""" + def test_updates_state_to_skipped(self, mock_fire, executor, mock_deps): + """Updates state to 'skipped'.""" mock_deps.action_runner.storage_backend = None start_time = datetime.now() executor._handle_dependency_skip("agent_b", 1, {}, start_time, "agent_a") - mock_deps.state_manager.update_status.assert_called_once_with("agent_b", "failed") + mock_deps.state_manager.update_status.assert_called_once_with("agent_b", "skipped") @patch("agent_actions.workflow.executor.fire_event") - def test_writes_failed_disposition(self, mock_fire, executor, mock_deps): - """Writes DISPOSITION_FAILED to storage.""" + def test_writes_skipped_disposition(self, mock_fire, executor, mock_deps): + """Writes DISPOSITION_SKIPPED to storage.""" storage = MagicMock() mock_deps.action_runner.storage_backend = storage start_time = datetime.now() @@ -117,7 +141,7 @@ def test_writes_failed_disposition(self, mock_fire, executor, mock_deps): storage.set_disposition.assert_called_once() call_kwargs = storage.set_disposition.call_args - assert call_kwargs[1]["disposition"] == DISPOSITION_FAILED + assert call_kwargs[1]["disposition"] == DISPOSITION_SKIPPED assert call_kwargs[1]["action_name"] == "agent_b" @patch("agent_actions.workflow.executor.fire_event") @@ -137,7 +161,7 @@ def test_fires_action_skip_event(self, mock_fire, executor, mock_deps): @patch("agent_actions.workflow.executor.fire_event") def test_records_in_run_tracker_if_available(self, mock_fire, executor, mock_deps): - """Records in run_tracker if available.""" + """Records in run_tracker with status 'skipped'.""" 
mock_deps.action_runner.storage_backend = None executor.run_tracker = MagicMock() executor.run_id = "run-123" @@ -147,12 +171,12 @@ def test_records_in_run_tracker_if_available(self, mock_fire, executor, mock_dep executor.run_tracker.record_action_complete.assert_called_once() config = executor.run_tracker.record_action_complete.call_args[1]["config"] - assert config.status == "failed" + assert config.status == "skipped" assert config.run_id == "run-123" @patch("agent_actions.workflow.executor.fire_event") - def test_returns_failed_result_with_success_true(self, mock_fire, executor, mock_deps): - """Returns ActionExecutionResult(success=True, status='failed') — failed state, but independent branches continue.""" + def test_returns_skipped_result_with_success_true(self, mock_fire, executor, mock_deps): + """Returns ActionExecutionResult(success=True, status='skipped') — skipped state, but independent branches continue.""" mock_deps.action_runner.storage_backend = None start_time = datetime.now() @@ -160,7 +184,7 @@ def test_returns_failed_result_with_success_true(self, mock_fire, executor, mock assert isinstance(result, ActionExecutionResult) assert result.success is True - assert result.status == "failed" + assert result.status == "skipped" class TestWriteFailedDisposition: @@ -195,3 +219,27 @@ def test_noops_when_storage_backend_is_none(self, executor, mock_deps): # Should not raise; nothing happens executor._write_failed_disposition("agent_a", "Some error") + + +class TestWriteSkippedDisposition: + """Tests for _write_skipped_disposition().""" + + def test_writes_disposition_when_storage_available(self, executor, mock_deps): + """Writes DISPOSITION_SKIPPED when storage backend is available.""" + storage = MagicMock() + mock_deps.action_runner.storage_backend = storage + + executor._write_skipped_disposition("agent_b", "Upstream failed") + + storage.set_disposition.assert_called_once_with( + action_name="agent_b", + record_id=NODE_LEVEL_RECORD_ID, + 
disposition=DISPOSITION_SKIPPED, + reason="Upstream failed", + ) + + def test_noops_when_storage_backend_is_none(self, executor, mock_deps): + """No-ops when storage backend is None.""" + mock_deps.action_runner.storage_backend = None + + executor._write_skipped_disposition("agent_b", "Upstream failed") From 09eb67b344db6c14360c3413d5168dda3a7b7002 Mon Sep 17 00:00:00 2001 From: Muizz Lateef Date: Thu, 2 Apr 2026 10:08:58 +0100 Subject: [PATCH 2/4] style: fix ruff formatting in action_executor.py --- agent_actions/workflow/parallel/action_executor.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/agent_actions/workflow/parallel/action_executor.py b/agent_actions/workflow/parallel/action_executor.py index 308cbec..7894968 100644 --- a/agent_actions/workflow/parallel/action_executor.py +++ b/agent_actions/workflow/parallel/action_executor.py @@ -355,9 +355,7 @@ async def execute_level_async(self, params: LevelExecutionParams) -> bool: duration = (datetime.now() - start_time).total_seconds() has_failed = params.state_manager.get_failed_actions(params.level_actions) - has_skipped = any( - params.state_manager.is_skipped(a) for a in params.level_actions - ) + has_skipped = any(params.state_manager.is_skipped(a) for a in params.level_actions) if has_failed: color = "red" elif has_skipped: From 37da46ca0b2ff6b2311517e248c97c102eff8828 Mon Sep 17 00:00:00 2001 From: Muizz Lateef Date: Thu, 2 Apr 2026 10:10:19 +0100 Subject: [PATCH 3/4] fix: lazy-load all external provider SDKs to prevent CLI crash on missing packages (#79) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * fix: lazy-load all external provider SDKs to prevent CLI crash on missing packages Previously only Gemini and Mistral used lazy imports in CLIENT_REGISTRY. 
The remaining providers (OpenAI, Anthropic, Cohere, Groq, Ollama) were eagerly imported at module level, causing the entire CLI to crash with ImportError if any single SDK was missing or version-incompatible — even when the user wasn't using that provider. All external SDK providers now use the same lazy "module:Class" string pattern. Internal providers (agac, hitl, tool) remain eager since they have no external dependencies. _resolve_client() catches ImportError and raises DependencyError with an actionable install command. The preflight validator gracefully skips capability checks for unavailable providers. * test: add sync guard asserting _VENDOR_PACKAGES covers all lazy registry entries Prevents silent degradation of DependencyError messages when a new lazy provider is added to CLIENT_REGISTRY without a corresponding package mapping. * fix: address PR review — test gaps, error handling, and diagnostics - Add tests for ImportError paths: _resolve_client() raises DependencyError with correct context; _resolve_capabilities() returns None gracefully - Fix vacuous pass: snapshot CLIENT_REGISTRY in fixture so sync-guard test checks actual lazy-string entries, not already-resolved classes - Catch AttributeError alongside ImportError for wrong class names in lazy entries (developer error scenario) - Preserve original traceback with `from err` instead of `from None` - Add logger.debug when _resolve_capabilities skips an unavailable SDK - Document DependencyError in invoke_client docstring Raises section --- .../llm/realtime/services/invocation.py | 51 +++++++++++++------ .../vendor_compatibility_validator.py | 13 ++++- .../preflight/test_vendor_parity.py | 47 ++++++++++++++++- 3 files changed, 93 insertions(+), 18 deletions(-) diff --git a/agent_actions/llm/realtime/services/invocation.py b/agent_actions/llm/realtime/services/invocation.py index 0c5ca82..75b9690 100644 --- a/agent_actions/llm/realtime/services/invocation.py +++ 
b/agent_actions/llm/realtime/services/invocation.py @@ -8,27 +8,33 @@ from typing import Any from agent_actions.llm.providers.agac.client import AgacClient -from agent_actions.llm.providers.anthropic.client import AnthropicClient -from agent_actions.llm.providers.cohere.client import CohereClient -from agent_actions.llm.providers.groq.client import GroqClient from agent_actions.llm.providers.hitl.client import HitlClient -from agent_actions.llm.providers.mistral.client import MistralClient -from agent_actions.llm.providers.ollama.client import OllamaClient -from agent_actions.llm.providers.openai.client import OpenAIClient from agent_actions.llm.providers.tools.client import ToolClient logger = logging.getLogger(__name__) -# Client registry +# Vendor → pip package name, used for actionable DependencyError messages. +_VENDOR_PACKAGES: dict[str, str] = { + "openai": "openai", + "anthropic": "anthropic", + "cohere": "cohere", + "groq": "groq", + "ollama": "ollama", + "gemini": "google-genai", + "mistral": "mistralai", +} + +# Client registry — external SDK providers use lazy "module:Class" strings +# so the CLI doesn't crash when an unused provider's SDK is absent or broken. CLIENT_REGISTRY: dict[str, Any] = { - "openai": OpenAIClient, - "ollama": OllamaClient, - # Lazy import avoids deprecated SDK warnings for non-Gemini commands. 
+ "openai": "agent_actions.llm.providers.openai.client:OpenAIClient", + "ollama": "agent_actions.llm.providers.ollama.client:OllamaClient", "gemini": "agent_actions.llm.providers.gemini.client:GeminiClient", - "cohere": CohereClient, - "mistral": MistralClient, - "anthropic": AnthropicClient, - "groq": GroqClient, + "cohere": "agent_actions.llm.providers.cohere.client:CohereClient", + "mistral": "agent_actions.llm.providers.mistral.client:MistralClient", + "anthropic": "agent_actions.llm.providers.anthropic.client:AnthropicClient", + "groq": "agent_actions.llm.providers.groq.client:GroqClient", + # Internal providers — no external SDK deps, safe to import eagerly. "tool": ToolClient, "agac-provider": AgacClient, "hitl": HitlClient, @@ -44,7 +50,21 @@ def _resolve_client(model_vendor: str) -> Any: entry = CLIENT_REGISTRY[model_vendor] if isinstance(entry, str): module_path, class_name = entry.split(":", 1) - cls = getattr(importlib.import_module(module_path), class_name) + try: + mod = importlib.import_module(module_path) + cls = getattr(mod, class_name) + except (ImportError, AttributeError) as err: + from agent_actions.errors import DependencyError + + package = _VENDOR_PACKAGES.get(model_vendor, model_vendor) + raise DependencyError( + f"{model_vendor} provider requires the '{package}' package", + context={ + "client_type": model_vendor, + "package": package, + "install_command": f"uv pip install {package}", + }, + ) from err CLIENT_REGISTRY[model_vendor] = cls return cls return entry @@ -90,6 +110,7 @@ def invoke_client( Raises: ValueError: If client is not supported + DependencyError: If the provider's SDK package is not installed """ if model_vendor not in CLIENT_REGISTRY: raise ValueError(f"Unsupported model vendor: {model_vendor}") diff --git a/agent_actions/validation/preflight/vendor_compatibility_validator.py b/agent_actions/validation/preflight/vendor_compatibility_validator.py index 3616247..050e2b1 100644 --- 
a/agent_actions/validation/preflight/vendor_compatibility_validator.py +++ b/agent_actions/validation/preflight/vendor_compatibility_validator.py @@ -1,8 +1,11 @@ """Vendor compatibility validator for pre-flight validation.""" import importlib +import logging from typing import Any +logger = logging.getLogger(__name__) + from agent_actions.llm.realtime.services.invocation import CLIENT_REGISTRY from agent_actions.output.response.config_fields import get_default from agent_actions.validation.base_validator import BaseValidator @@ -21,14 +24,20 @@ def _resolve_capabilities(vendor: str) -> dict[str, Any] | None: Handles lazy string entries in CLIENT_REGISTRY (e.g. gemini) by importing them on demand, avoiding eager SDK imports at module level. - Returns ``None`` if the resolved class has no ``CAPABILITIES`` attribute. + Returns ``None`` if the resolved class has no ``CAPABILITIES`` attribute + or if the provider's SDK is not installed. """ entry = CLIENT_REGISTRY.get(vendor) if entry is None: return None if isinstance(entry, str): module_path, class_name = entry.split(":", 1) - cls = getattr(importlib.import_module(module_path), class_name) + try: + cls = getattr(importlib.import_module(module_path), class_name) + except (ImportError, AttributeError): + logger.debug("Skipping capabilities for '%s': SDK not available", vendor) + return None + CLIENT_REGISTRY[vendor] = cls else: cls = entry caps = getattr(cls, "CAPABILITIES", None) diff --git a/tests/unit/validation/preflight/test_vendor_parity.py b/tests/unit/validation/preflight/test_vendor_parity.py index d367c54..ebe6ab4 100644 --- a/tests/unit/validation/preflight/test_vendor_parity.py +++ b/tests/unit/validation/preflight/test_vendor_parity.py @@ -1,10 +1,12 @@ """Vendor parity tests — ensures each client class declares CAPABILITIES.""" +import copy import importlib +from unittest.mock import patch import pytest -from agent_actions.llm.realtime.services.invocation import CLIENT_REGISTRY +from 
agent_actions.llm.realtime.services.invocation import _VENDOR_PACKAGES, CLIENT_REGISTRY from agent_actions.validation.preflight.vendor_compatibility_validator import ( VALID_VENDORS, VendorCompatibilityValidator, @@ -12,11 +14,17 @@ _resolve_capabilities, ) +# Snapshot the original registry so tests can restore lazy-string entries +# after earlier tests resolve them into class objects. +_ORIGINAL_REGISTRY = copy.copy(CLIENT_REGISTRY) + @pytest.fixture(autouse=True) def reset_vendor_cache(): VendorCompatibilityValidator.clear_cache() yield + # Restore lazy-string entries that may have been resolved during the test. + CLIENT_REGISTRY.update(_ORIGINAL_REGISTRY) VendorCompatibilityValidator.clear_cache() @@ -93,9 +101,46 @@ def test_all_runtime_vendors_pass_validation(self): result = validator.validate_vendor_config(config) assert result, f"Vendor '{vendor}' failed preflight: {validator.get_errors()}" + def test_vendor_packages_covers_all_lazy_entries(self): + """Every lazy-string entry in CLIENT_REGISTRY must have a _VENDOR_PACKAGES mapping.""" + lazy_vendors = {k for k, v in CLIENT_REGISTRY.items() if isinstance(v, str)} + missing = lazy_vendors - set(_VENDOR_PACKAGES) + assert not missing, ( + f"Lazy vendors missing from _VENDOR_PACKAGES: {missing}. " + "Add entries so DependencyError shows the correct pip package name." 
+ ) + def test_clear_cache_reinitialises_on_next_access(self): """clear_cache() resets the cache so the next call to _get_vendor_capabilities re-builds it.""" caps_first = _get_vendor_capabilities() VendorCompatibilityValidator.clear_cache() caps_second = _get_vendor_capabilities() assert caps_first == caps_second + + def test_resolve_client_raises_dependency_error_on_missing_sdk(self): + """_resolve_client must raise DependencyError with correct context when SDK is missing.""" + from agent_actions.errors import DependencyError + from agent_actions.llm.realtime.services.invocation import _resolve_client + + # Force a lazy string entry so the import path is exercised. + CLIENT_REGISTRY["mistral"] = _ORIGINAL_REGISTRY["mistral"] + + with patch.object(importlib, "import_module", side_effect=ImportError("no module")): + with pytest.raises(DependencyError) as exc_info: + _resolve_client("mistral") + + err = exc_info.value + assert "mistralai" in str(err) + assert err.context["package"] == "mistralai" + assert err.context["install_command"] == "uv pip install mistralai" + assert err.context["client_type"] == "mistral" + + def test_resolve_capabilities_returns_none_on_missing_sdk(self): + """_resolve_capabilities must return None (not crash) when SDK is missing.""" + # Force a lazy string entry. 
+ CLIENT_REGISTRY["mistral"] = _ORIGINAL_REGISTRY["mistral"] + + with patch.object(importlib, "import_module", side_effect=ImportError("no module")): + result = _resolve_capabilities("mistral") + + assert result is None From c1e10ff65c827e76c0e803654765bdaa9776e71c Mon Sep 17 00:00:00 2001 From: Muizz Lateef Date: Thu, 2 Apr 2026 10:20:47 +0100 Subject: [PATCH 4/4] =?UTF-8?q?fix:=20address=20review=20findings=20?= =?UTF-8?q?=E2=80=94=20stale=20disposition,=20reason=20string,=20test=20ga?= =?UTF-8?q?ps?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Clear DISPOSITION_SKIPPED in _verify_completion_status on re-run (prevents stale skip markers from incorrectly skipping on retry) - Reason string now says "skipped" for cascaded skips instead of "failed" - Add _write_skipped_disposition exception path test - Add level coloring tests (green/yellow/red/precedence) - Explicit is_skipped=False in test_dep_failed_via_state_manager --- agent_actions/workflow/executor.py | 35 ++++++------ tests/unit/workflow/test_circuit_breaker.py | 59 +++++++++++++++++++++ 2 files changed, 79 insertions(+), 15 deletions(-) diff --git a/agent_actions/workflow/executor.py b/agent_actions/workflow/executor.py index 8202a4c..760068c 100644 --- a/agent_actions/workflow/executor.py +++ b/agent_actions/workflow/executor.py @@ -182,22 +182,24 @@ def _verify_completion_status( # Check disposition FIRST — a failed action may have partial # results in storage. The disposition is the authoritative # signal; output existence is irrelevant when it's set. 
- if storage_backend.has_disposition( - action_name, - DISPOSITION_FAILED, - record_id=NODE_LEVEL_RECORD_ID, - ): - logger.info( - "Action %s has DISPOSITION_FAILED from prior run — re-running", - action_name, - ) - storage_backend.clear_disposition( + for disp in (DISPOSITION_FAILED, DISPOSITION_SKIPPED): + if storage_backend.has_disposition( action_name, - DISPOSITION_FAILED, + disp, record_id=NODE_LEVEL_RECORD_ID, - ) - self.deps.state_manager.update_status(action_name, "pending") - return (False, None) + ): + logger.info( + "Action %s has %s from prior run — re-running", + action_name, + disp, + ) + storage_backend.clear_disposition( + action_name, + disp, + record_id=NODE_LEVEL_RECORD_ID, + ) + self.deps.state_manager.update_status(action_name, "pending") + return (False, None) target_files = storage_backend.list_target_files(action_name) if not target_files: @@ -471,7 +473,10 @@ def _handle_dependency_skip( State is set to ``"skipped"`` so transitive dependents also skip via ``is_skipped``. ``success=True`` keeps independent branches alive. 
""" - reason = f"Upstream dependency '{failed_dependency}' failed" + dep_status = ( + "skipped" if self.deps.state_manager.is_skipped(failed_dependency) else "failed" + ) + reason = f"Upstream dependency '{failed_dependency}' {dep_status}" self.deps.state_manager.update_status(action_name, "skipped") self._write_skipped_disposition(action_name, reason) diff --git a/tests/unit/workflow/test_circuit_breaker.py b/tests/unit/workflow/test_circuit_breaker.py index f395a4b..28c1076 100644 --- a/tests/unit/workflow/test_circuit_breaker.py +++ b/tests/unit/workflow/test_circuit_breaker.py @@ -68,6 +68,7 @@ def test_all_deps_healthy_returns_none(self, executor, mock_deps): def test_dep_failed_via_state_manager(self, executor, mock_deps): """One dep failed (state_manager.is_failed) returns dep name.""" mock_deps.state_manager.is_failed.return_value = True + mock_deps.state_manager.is_skipped.return_value = False config = {"dependencies": ["agent_a"]} result = executor._check_upstream_health("agent_b", config) @@ -238,8 +239,66 @@ def test_writes_disposition_when_storage_available(self, executor, mock_deps): reason="Upstream failed", ) + def test_logs_warning_on_storage_error(self, executor, mock_deps, caplog): + """Logs warning on storage error (doesn't raise).""" + storage = MagicMock() + storage.set_disposition.side_effect = RuntimeError("DB error") + mock_deps.action_runner.storage_backend = storage + + executor._write_skipped_disposition("agent_b", "Upstream failed") + def test_noops_when_storage_backend_is_none(self, executor, mock_deps): """No-ops when storage backend is None.""" mock_deps.action_runner.storage_backend = None executor._write_skipped_disposition("agent_b", "Upstream failed") + + +class TestLevelCompletionColoring: + """Tests for level completion line color logic (red/yellow/green).""" + + def test_green_when_all_ok(self, tmp_path): + """Level line is green when all actions completed successfully.""" + mgr = ActionStateManager(tmp_path / "status.json", 
["a", "b"]) + mgr.update_status("a", "completed") + mgr.update_status("b", "completed") + + has_failed = mgr.get_failed_actions(["a", "b"]) + has_skipped = any(mgr.is_skipped(a) for a in ["a", "b"]) + + assert not has_failed + assert not has_skipped + + def test_red_when_action_failed(self, tmp_path): + """Level line is red when any action failed.""" + mgr = ActionStateManager(tmp_path / "status.json", ["a", "b"]) + mgr.update_status("a", "completed") + mgr.update_status("b", "failed") + + has_failed = mgr.get_failed_actions(["a", "b"]) + assert has_failed + + def test_yellow_when_action_skipped(self, tmp_path): + """Level line is yellow when actions are skipped but none failed.""" + mgr = ActionStateManager(tmp_path / "status.json", ["a", "b"]) + mgr.update_status("a", "completed") + mgr.update_status("b", "skipped") + + has_failed = mgr.get_failed_actions(["a", "b"]) + has_skipped = any(mgr.is_skipped(a) for a in ["a", "b"]) + + assert not has_failed + assert has_skipped + + def test_red_takes_precedence_over_yellow(self, tmp_path): + """Red (failed) takes precedence over yellow (skipped).""" + mgr = ActionStateManager(tmp_path / "status.json", ["a", "b", "c"]) + mgr.update_status("a", "completed") + mgr.update_status("b", "failed") + mgr.update_status("c", "skipped") + + has_failed = mgr.get_failed_actions(["a", "b", "c"]) + has_skipped = any(mgr.is_skipped(a) for a in ["a", "b", "c"]) + + assert has_failed + assert has_skipped