Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 60 additions & 28 deletions agent_actions/workflow/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,11 @@
BatchCompleteEvent,
BatchSubmittedEvent,
)
from agent_actions.storage.backend import DISPOSITION_FAILED, NODE_LEVEL_RECORD_ID
from agent_actions.storage.backend import (
DISPOSITION_FAILED,
DISPOSITION_SKIPPED,
NODE_LEVEL_RECORD_ID,
)
from agent_actions.tooling.docs.run_tracker import ActionCompleteConfig
from agent_actions.utils.constants import DEFAULT_ACTION_KIND

Expand Down Expand Up @@ -72,7 +76,7 @@ class ActionExecutionResult:

success: bool
output_folder: str | None = None
status: str = "completed" # 'completed', 'batch_submitted', 'failed'
status: str = "completed" # 'completed', 'batch_submitted', 'failed', 'skipped'
error: Exception | None = None
metrics: ExecutionMetrics = field(default_factory=ExecutionMetrics)

Expand Down Expand Up @@ -178,22 +182,24 @@ def _verify_completion_status(
# Check disposition FIRST — a failed action may have partial
# results in storage. The disposition is the authoritative
# signal; output existence is irrelevant when it's set.
if storage_backend.has_disposition(
action_name,
DISPOSITION_FAILED,
record_id=NODE_LEVEL_RECORD_ID,
):
logger.info(
"Action %s has DISPOSITION_FAILED from prior run — re-running",
for disp in (DISPOSITION_FAILED, DISPOSITION_SKIPPED):
if storage_backend.has_disposition(
action_name,
)
storage_backend.clear_disposition(
action_name,
DISPOSITION_FAILED,
disp,
record_id=NODE_LEVEL_RECORD_ID,
)
self.deps.state_manager.update_status(action_name, "pending")
return (False, None)
):
logger.info(
"Action %s has %s from prior run — re-running",
action_name,
disp,
)
storage_backend.clear_disposition(
action_name,
disp,
record_id=NODE_LEVEL_RECORD_ID,
)
self.deps.state_manager.update_status(action_name, "pending")
return (False, None)

target_files = storage_backend.list_target_files(action_name)
if not target_files:
Expand Down Expand Up @@ -374,6 +380,24 @@ def _write_failed_disposition(self, action_name: str, reason: str) -> None:
disp_err,
)

def _write_skipped_disposition(self, action_name: str, reason: str) -> None:
    """Record a node-level DISPOSITION_SKIPPED marker for ``action_name``.

    The marker lets downstream actions and future runs detect the skip.
    The write is best-effort: nothing happens when the action runner has
    no ``storage_backend`` attribute (or it is ``None``), and any error
    raised while writing is logged as a warning instead of propagating.

    Args:
        action_name: Action the disposition is recorded for.
        reason: Explanation of the skip; only the first 500 characters
            are stored.
    """
    backend = getattr(self.deps.action_runner, "storage_backend", None)
    if backend is None:
        # No storage configured — there is nowhere to persist the marker.
        return

    try:
        backend.set_disposition(
            action_name=action_name,
            record_id=NODE_LEVEL_RECORD_ID,
            disposition=DISPOSITION_SKIPPED,
            reason=reason[:500],
        )
    except Exception as write_err:
        # Persisting the marker must never abort the skip handling itself.
        logger.warning(
            "Failed to write DISPOSITION_SKIPPED for %s: %s",
            action_name,
            write_err,
        )

def _handle_run_failure(
self, params: ActionRunParams, error: Exception
) -> ActionExecutionResult:
Expand Down Expand Up @@ -416,17 +440,22 @@ def _cleanup_correlation(
def _check_upstream_health(
self, action_name: str, action_config: ActionConfigDict
) -> str | None:
"""Return the name of a failed upstream dependency, or None if all healthy."""
"""Return the name of a failed or skipped upstream dependency, or None if all healthy."""
dependencies = action_config.get("dependencies", [])
if not dependencies:
return None
for dep in dependencies:
if self.deps.state_manager.is_failed(dep):
if self.deps.state_manager.is_failed(dep) or self.deps.state_manager.is_skipped(dep):
return dep
# Also check disposition — covers cascaded failures from prior levels
# Also check disposition — covers cascaded failures/skips from prior levels
storage_backend = getattr(self.deps.action_runner, "storage_backend", None)
if storage_backend is not None and storage_backend.has_disposition(
dep, DISPOSITION_FAILED, record_id=NODE_LEVEL_RECORD_ID
if storage_backend is not None and (
storage_backend.has_disposition(
dep, DISPOSITION_FAILED, record_id=NODE_LEVEL_RECORD_ID
)
or storage_backend.has_disposition(
dep, DISPOSITION_SKIPPED, record_id=NODE_LEVEL_RECORD_ID
)
):
return dep
return None
Expand All @@ -441,12 +470,15 @@ def _handle_dependency_skip(
) -> ActionExecutionResult:
"""Handle action skip due to upstream dependency failure.

State is set to ``"failed"`` so transitive dependents also skip via
``is_failed``. ``success=True`` keeps independent branches alive.
State is set to ``"skipped"`` so transitive dependents also skip via
``is_skipped``. ``success=True`` keeps independent branches alive.
"""
reason = f"Upstream dependency '{failed_dependency}' failed"
self.deps.state_manager.update_status(action_name, "failed")
self._write_failed_disposition(action_name, reason)
dep_status = (
"skipped" if self.deps.state_manager.is_skipped(failed_dependency) else "failed"
)
reason = f"Upstream dependency '{failed_dependency}' {dep_status}"
self.deps.state_manager.update_status(action_name, "skipped")
self._write_skipped_disposition(action_name, reason)

duration = (datetime.now() - start_time).total_seconds()
total_actions = (
Expand All @@ -467,14 +499,14 @@ def _handle_dependency_skip(
config = ActionCompleteConfig(
run_id=self.run_id,
action_name=action_name,
status="failed",
status="skipped",
duration_seconds=duration,
skip_reason=reason,
)
self.run_tracker.record_action_complete(config=config)

return ActionExecutionResult(
success=True, status="failed", metrics=ExecutionMetrics(duration=duration)
success=True, status="skipped", metrics=ExecutionMetrics(duration=duration)
)

def execute_action_sync(
Expand Down
2 changes: 1 addition & 1 deletion agent_actions/workflow/managers/_MANIFEST.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,4 @@ Tracks workflow artifacts, batching, loops, state, and skip logic used by the ru
| `manifest.py` | Module | Generates workflow manifests consumed by tooling/docs. | `tooling.docs`, `workflow` |
| `output.py` | Module | `ActionOutputManager`: loads upstream outputs, resolves version correlation, creates passthrough outputs. `detect_explicit_version_consumption()` result is lazy-cached per instance to avoid redundant computation. | `output`, `workflow` |
| `skip.py` | Module | Skip logic used when upstream items fail or guard conditions filter them. | `validation`, `workflow` |
| `state.py` | Module | ActionStateManager — manages action execution state persistence and queries. | `workflow`, `state_management` |
| `state.py` | Module | ActionStateManager — manages action execution state persistence and queries. Terminal states: `completed`, `failed`, `skipped`. Key methods: `is_failed()`, `is_skipped()`, `get_pending_actions()`, `is_workflow_done()`, `get_summary()`. | `workflow`, `state_management` |
12 changes: 8 additions & 4 deletions agent_actions/workflow/managers/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,13 @@ def is_failed(self, action_name: str) -> bool:
"""Return True if action has failed."""
return self.get_status(action_name) == "failed"

def is_skipped(self, action_name: str) -> bool:
    """Report whether the status recorded for ``action_name`` is ``"skipped"``.

    Args:
        action_name: Name of the action to look up via ``get_status``.

    Returns:
        True when the action's current status equals ``"skipped"``,
        False for any other status.
    """
    current_status = self.get_status(action_name)
    return current_status == "skipped"

def get_pending_actions(self, agents: list[str]) -> list[str]:
"""Return actions that are not yet completed or failed (runnable)."""
terminal = {"completed", "failed"}
"""Return actions that are not yet completed, failed, or skipped (runnable)."""
terminal = {"completed", "failed", "skipped"}
return [agent for agent in agents if self.get_status(agent) not in terminal]

def get_batch_submitted_actions(self, agents: list[str]) -> list[str]:
Expand Down Expand Up @@ -122,8 +126,8 @@ def is_workflow_complete(self) -> bool:
return all(details.get("status") == "completed" for details in self.action_status.values())

def is_workflow_done(self) -> bool:
"""Return True if all actions are in a terminal state (completed or failed)."""
terminal = {"completed", "failed"}
"""Return True if all actions are in a terminal state (completed, failed, or skipped)."""
terminal = {"completed", "failed", "skipped"}
return all(details.get("status") in terminal for details in self.action_status.values())

def has_any_failed(self) -> bool:
Expand Down
13 changes: 12 additions & 1 deletion agent_actions/workflow/parallel/action_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -353,5 +353,16 @@ async def execute_level_async(self, params: LevelExecutionParams) -> bool:
return False

duration = (datetime.now() - start_time).total_seconds()
self.console.print(f"[green]Action {params.level_idx} complete ({duration:.2f}s)[/green]")

has_failed = params.state_manager.get_failed_actions(params.level_actions)
has_skipped = any(params.state_manager.is_skipped(a) for a in params.level_actions)
if has_failed:
color = "red"
elif has_skipped:
color = "yellow"
else:
color = "green"
self.console.print(
f"[{color}]Action {params.level_idx} complete ({duration:.2f}s)[/{color}]"
)
return True
121 changes: 121 additions & 0 deletions tasks/todo.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
# Failure Propagation & Visibility Fixes

Root issue: per-item failures don't propagate to action status, so the circuit breaker never fires and the operator sees misleading green output.

Reference: `failures.txt`, PR #67 (added circuit breaker but upstream never signals failure).

---

## Phase 1 — Bugfix: Accurate tally + level coloring (REVISED)

**Discovery:** PR #67 already added the zero-output check at `pipeline.py:493-508`. The total-wipeout case (0/N items) already propagates as `"failed"`. The remaining bugs were:
- `_handle_dependency_skip` used `status="failed"` → tally showed 0 SKIP
- Level completion line was always green

**Approach:** Introduced `"skipped"` as a proper terminal status for dependency-skipped actions.

- [x] **1.1** Add `is_skipped()` to state manager, update terminal sets
- Added `is_skipped()` method to `ActionStateManager`
- Updated `get_pending_actions` terminal: `{"completed", "failed", "skipped"}`
- Updated `is_workflow_done` terminal: `{"completed", "failed", "skipped"}`

- [x] **1.2** Update executor: `_handle_dependency_skip` → `status="skipped"`
- Added `DISPOSITION_SKIPPED` import
- Added `_write_skipped_disposition()` method
- Changed `_handle_dependency_skip`: state → "skipped", disposition → SKIPPED, result → "skipped"
- Updated `_check_upstream_health`: checks `is_failed OR is_skipped`, checks both dispositions

- [x] **1.3** Fix level completion line coloring
- `action_executor.py:356`: red if failed, yellow if skipped, green if all OK

- [x] **1.4** Tests
- Updated all circuit breaker tests to expect "skipped"
- Added `test_dep_skipped_via_state_manager` and `test_dep_skipped_via_disposition`
- Added `TestWriteSkippedDisposition` test class
- Added `TestIsSkipped`, `TestWorkflowDoneWithSkipped` in state extensions
- All 4271 tests pass, ruff clean

---

## Phase 2 — Observability: Partial failure visibility

**Goal:** When K/N items succeed, action is `completed_with_failures`, item-level failures are persisted and surfaced.

- [ ] **2.1** Add `completed_with_failures` status to state manager
- New terminal status alongside `completed`, `failed`, and `skipped`
- Circuit breaker ignores this status (descendants run on partial output)
- Tally counts it as `PARTIAL`
- Verify: `pytest`, `ruff check .`

- [ ] **2.2** Item-level failure tracking in storage backend
- Persist which items (by `source_guid` or index) failed and the error message
- Storage schema addition (e.g., `item_failures` table or disposition record)
- Verify: unit test asserting failed items are persisted and retrievable

- [ ] **2.3** Populate `completed_with_failures` from executor
- After action runner returns: if `items_succeeded > 0 AND items_succeeded < items_attempted` → status `completed_with_failures`
- Write item-level failures to storage backend
- Verify: `pytest`, manual run with partial failure confirms new status + persisted failures

- [ ] **2.4** Surface partial failures in output
- Action completion log: `"Action 'X': 9/10 items OK, 1 failed (see item failures)"`
- Tally: `"8 OK | 1 PARTIAL | 0 SKIP | 2 ERROR"`
- Level line: yellow for levels containing `completed_with_failures` actions
- Verify: manual run, visual inspection

- [ ] **2.5** Tests for Phase 2
- Test: partial failure → status is `completed_with_failures`, not `completed` or `failed`
- Test: item-level failures are persisted with guid + error
- Test: descendants of `completed_with_failures` action still run (not skipped)
- Test: tally shows PARTIAL count
- Verify: `pytest`

---

## Phase 3 — UX: Pause-and-surface + retry

**Goal:** Workflow pauses on partial failure (default), user can retry failed items or continue. Configurable retry policy.

- [ ] **3.1** Pause-and-surface on partial failure (default behavior)
- After a level containing `completed_with_failures` actions, pause workflow
- Print summary: `"9/10 OK, 1 failed. Run 'agac retry <action>' or 'agac run --continue'"`
- List failed items with truncated error messages
- Verify: manual run with partial failure → workflow pauses with clear message

- [ ] **3.2** `on_partial_failure` config option
- Per-action or workflow-level config: `on_partial_failure: pause | continue`
- Default: `pause`
- `continue`: skip the pause, proceed with partial results (for automated pipelines)
- Verify: `pytest`, manual run with `continue` config → no pause

- [ ] **3.3** `agac retry` command / `--retry-failed` flag
- `agac retry <action>`: re-run only failed items for the specified action
- `agac run --retry-failed`: re-run failed items for all actions with failures
- Uses persisted item-level failures (Phase 2) to filter input to only failed guids
- On success: update item status, recalculate action status
- Verify: manual run → fail items → retry → items succeed → action becomes `completed`

- [ ] **3.4** Configurable retry policy
- Per-action config: `retry: {max_attempts: 3, backoff: exponential}`
- Executor retries failed items inline up to `max_attempts` before finalizing
- Only items still failing after all retries are persisted as failures
- Verify: `pytest`, manual run with transient 429 error → retried automatically

- [ ] **3.5** Tests for Phase 3
- Test: workflow pauses on partial failure by default
- Test: `on_partial_failure: continue` skips pause
- Test: retry command re-processes only failed items
- Test: retry policy with backoff retries transient errors
- Test: after successful retry, action status updates to `completed`
- Verify: `pytest`

---

## Working Notes

- Circuit breaker code: `executor.py:416-432` (`_check_upstream_health`)
- Failure recording: `executor.py:377-397` (`_handle_run_failure`)
- Level completion line: `action_executor.py:356`
- Tally output: TBD — need to locate the exact counter logic
- Action runner item processing: TBD — need to trace where per-item errors are caught
- State manager: `managers/state.py` — `is_failed()`, `get_pending_actions()`
62 changes: 61 additions & 1 deletion tests/unit/workflow/managers/test_state_extensions.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,12 +83,72 @@ def test_excludes_both_completed_and_failed(self, tmp_path):
pending = mgr.get_pending_actions(["a", "b", "c", "d"])
assert pending == ["c", "d"]

def test_excludes_skipped(self, tmp_path):
    """Skipped actions are terminal, so get_pending_actions must omit them."""
    manager = ActionStateManager(tmp_path / "status.json", ["a", "b", "c"])
    for name, status in (("a", "completed"), ("b", "skipped")):
        manager.update_status(name, status)

    still_pending = manager.get_pending_actions(["a", "b", "c"])

    # "c" was never updated, so it is the only action left to run.
    assert "a" not in still_pending
    assert "b" not in still_pending
    assert "c" in still_pending

def test_all_terminal_returns_empty(self, tmp_path):
"""When all are completed/failed, returns empty list."""
"""When all are completed/failed/skipped, returns empty list."""
status_file = tmp_path / "status.json"
mgr = ActionStateManager(status_file, ["a", "b"])
mgr.update_status("a", "completed")
mgr.update_status("b", "failed")

pending = mgr.get_pending_actions(["a", "b"])
assert pending == []


class TestIsSkipped:
    """Behavior of ``ActionStateManager.is_skipped``."""

    def test_is_skipped_true(self, tmp_path):
        """An action whose status was set to "skipped" is reported as skipped."""
        manager = ActionStateManager(tmp_path / "status.json", ["a"])
        manager.update_status("a", "skipped")

        assert manager.is_skipped("a") is True

    def test_is_skipped_false_for_other_statuses(self, tmp_path):
        """Completed, failed, and never-updated actions are not reported as skipped."""
        manager = ActionStateManager(tmp_path / "status.json", ["a", "b", "c"])
        for name, status in (("a", "completed"), ("b", "failed")):
            manager.update_status(name, status)

        # "c" is deliberately left untouched to cover the initial status.
        assert manager.is_skipped("a") is False
        assert manager.is_skipped("b") is False
        assert manager.is_skipped("c") is False


class TestWorkflowDoneWithSkipped:
    """Workflow-level queries when some actions carry the skipped status."""

    def test_done_with_skipped(self, tmp_path):
        """A mix of completed, failed, and skipped actions counts as done."""
        manager = ActionStateManager(tmp_path / "status.json", ["a", "b", "c"])
        for name, status in (("a", "completed"), ("b", "failed"), ("c", "skipped")):
            manager.update_status(name, status)

        assert manager.is_workflow_done() is True

    def test_summary_with_skipped(self, tmp_path):
        """get_summary reports one action each under completed, skipped, and failed."""
        manager = ActionStateManager(tmp_path / "status.json", ["a", "b", "c"])
        for name, status in (("a", "completed"), ("b", "skipped"), ("c", "failed")):
            manager.update_status(name, status)

        expected_counts = {"completed": 1, "skipped": 1, "failed": 1}
        assert manager.get_summary() == expected_counts
Loading
Loading