Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion agent_actions/logging/events/formatters.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ class AgentActionsFormatter:
# Status colors for Rich
COLORS = {
"OK": "green",
"PARTIAL": "yellow",
"SKIP": "yellow",
"CACHED": "cyan",
"ERROR": "red",
Expand Down Expand Up @@ -81,14 +82,19 @@ def _format_workflow_complete(self, event: BaseEvent) -> str:
ts = self._timestamp(event)
elapsed = event.data.get("elapsed_time", 0.0)
completed = event.data.get("actions_completed", 0)
partial = event.data.get("actions_partial", 0)
skipped = event.data.get("actions_skipped", 0)
failed = event.data.get("actions_failed", 0)

ok = self._status("OK") if completed > 0 else "OK"
part = self._status("PARTIAL") if partial > 0 else "PARTIAL"
skip = self._status("SKIP") if skipped > 0 else "SKIP"
err = self._status("ERROR") if failed > 0 else "ERROR"

return f"{ts}Completed in {elapsed:.2f}s | {completed} {ok} | {skipped} {skip} | {failed} {err}"
return (
f"{ts}Completed in {elapsed:.2f}s | {completed} {ok} | "
f"{partial} {part} | {skipped} {skip} | {failed} {err}"
)

def _format_workflow_failed(self, event: BaseEvent) -> str:
ts = self._timestamp(event)
Expand Down
6 changes: 4 additions & 2 deletions agent_actions/logging/events/workflow_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ class WorkflowCompleteEvent(BaseEvent):
workflow_name: str = ""
elapsed_time: float = 0.0
actions_completed: int = 0
actions_partial: int = 0
actions_skipped: int = 0
actions_failed: int = 0
total_tokens: int = 0
Expand All @@ -60,13 +61,14 @@ def __post_init__(self) -> None:
self.category = EventCategories.WORKFLOW
self.message = (
f"Completed workflow {self.workflow_name} in {self.elapsed_time:.2f}s | "
f"{self.actions_completed} completed | {self.actions_skipped} skipped | "
f"{self.actions_failed} failed"
f"{self.actions_completed} completed | {self.actions_partial} partial | "
f"{self.actions_skipped} skipped | {self.actions_failed} failed"
)
self.data = {
"workflow_name": self.workflow_name,
"elapsed_time": self.elapsed_time,
"actions_completed": self.actions_completed,
"actions_partial": self.actions_partial,
"actions_skipped": self.actions_skipped,
"actions_failed": self.actions_failed,
"total_tokens": self.total_tokens,
Expand Down
16 changes: 16 additions & 0 deletions agent_actions/processing/result_collector.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""Shared result aggregation for processing output records."""

import collections
import json
import logging
from typing import TYPE_CHECKING, Any, Optional

Expand Down Expand Up @@ -178,12 +179,27 @@ def collect_results(
)
)
if storage_backend and result.source_guid:
snapshot_source = result.source_snapshot or result.input_record
input_snapshot_str = None
if snapshot_source and isinstance(snapshot_source, dict):
try:
input_snapshot_str = json.dumps(
snapshot_source, ensure_ascii=False, default=str
)
except (TypeError, ValueError) as snap_err:
logger.debug(
"Could not serialize input snapshot for %s: %s",
result.source_guid,
snap_err,
)
input_snapshot_str = None
_safe_set_disposition(
storage_backend,
agent_name,
result.source_guid,
DISPOSITION_FAILED,
reason=result.error or "processing_error",
input_snapshot=input_snapshot_str,
)

elif status == ProcessingStatus.FILTERED:
Expand Down
16 changes: 15 additions & 1 deletion agent_actions/storage/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,14 @@ def set_disposition( # noqa: B027
disposition: str | Disposition,
reason: str | None = None,
relative_path: str | None = None,
input_snapshot: str | None = None,
) -> None:
"""Write a disposition record (use NODE_LEVEL_RECORD_ID for node-level signals)."""
"""Write a disposition record (use NODE_LEVEL_RECORD_ID for node-level signals).

Args:
input_snapshot: JSON-serialized input record for failed items.
Implementations SHOULD truncate to a reasonable limit (recommended 10KB).
"""
# No-op: subclass must override to persist dispositions.

def get_disposition(
Expand Down Expand Up @@ -149,6 +155,14 @@ def clear_disposition(
"""Delete matching disposition records. Returns count deleted."""
return 0

def get_failed_items(self, action_name: str) -> list[dict[str, Any]]:
    """List item-level failure dispositions for *action_name*.

    Fetches every FAILED disposition from the backend and drops the
    node-level sentinel rows (record_id == NODE_LEVEL_RECORD_ID) so
    callers see only per-record failures.
    """
    item_failures: list[dict[str, Any]] = []
    for row in self.get_disposition(action_name, disposition=DISPOSITION_FAILED):
        # Node-level sentinels describe whole-action signals, not items.
        if row.get("record_id") == NODE_LEVEL_RECORD_ID:
            continue
        item_failures.append(row)
    return item_failures

def delete_target(self, action_name: str) -> int:
"""Delete all target data for an action. Returns count deleted.

Expand Down
24 changes: 20 additions & 4 deletions agent_actions/storage/backends/sqlite_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ class SQLiteBackend(StorageBackend):
disposition TEXT NOT NULL,
reason TEXT,
relative_path TEXT,
input_snapshot TEXT,
created_at TEXT DEFAULT CURRENT_TIMESTAMP,
UNIQUE(action_name, record_id, disposition)
)
Expand Down Expand Up @@ -165,6 +166,12 @@ def initialize(self) -> None:
cursor.execute(self.DISPOSITION_INDEX_ACTION_SQL)
cursor.execute(self.DISPOSITION_INDEX_ACTION_DISP_SQL)
cursor.execute(self.DISPOSITION_INDEX_ACTION_RECORD_SQL)
# Migration: add input_snapshot column for existing databases
try:
cursor.execute("ALTER TABLE record_disposition ADD COLUMN input_snapshot TEXT")
logger.debug("Added input_snapshot column to record_disposition")
except sqlite3.OperationalError:
logger.debug("input_snapshot column already exists in record_disposition")
self.connection.commit()
logger.info(
"Initialized SQLite storage backend: %s",
Expand Down Expand Up @@ -491,6 +498,7 @@ def set_disposition(
disposition: str | Disposition,
reason: str | None = None,
relative_path: str | None = None,
input_snapshot: str | None = None,
) -> None:
"""Write a disposition record (INSERT OR REPLACE)."""
action_name = self._validate_identifier(action_name, "action_name")
Expand All @@ -501,17 +509,24 @@ def set_disposition(
raise ValueError(
f"Invalid disposition '{disposition}'. Valid: {sorted(VALID_DISPOSITIONS)}"
)
# Cap input_snapshot at 10KB to prevent storage bloat.
# Wrap truncated content so consumers can detect and skip invalid JSON.
if input_snapshot and len(input_snapshot) > 10240:
input_snapshot = (
'{"__truncated__": true, "partial": ' + json.dumps(input_snapshot[:8192]) + "}"
)

with self._lock:
cursor = self.connection.cursor()
try:
cursor.execute(
"""
INSERT OR REPLACE INTO record_disposition
(action_name, record_id, disposition, reason, relative_path, created_at)
VALUES (?, ?, ?, ?, ?, CURRENT_TIMESTAMP)
(action_name, record_id, disposition, reason, relative_path,
input_snapshot, created_at)
VALUES (?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP)
""",
(action_name, record_id, disposition, reason, relative_path),
(action_name, record_id, disposition, reason, relative_path, input_snapshot),
)
self.connection.commit()
logger.debug(
Expand Down Expand Up @@ -545,7 +560,8 @@ def get_disposition(
action_name = self._validate_identifier(action_name, "action_name")

query = (
"SELECT action_name, record_id, disposition, reason, relative_path, created_at"
"SELECT action_name, record_id, disposition, reason, relative_path,"
" input_snapshot, created_at"
" FROM record_disposition WHERE action_name = ?"
)
params: list[str] = [action_name]
Expand Down
3 changes: 2 additions & 1 deletion agent_actions/workflow/coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from agent_actions.workflow.config_pipeline import load_workflow_configs
from agent_actions.workflow.execution_events import WorkflowEventLogger
from agent_actions.workflow.managers.artifacts import ArtifactLinker
from agent_actions.workflow.managers.state import COMPLETED_STATUSES
from agent_actions.workflow.models import (
ActionLogParams,
RuntimeContext,
Expand Down Expand Up @@ -475,7 +476,7 @@ def _run_single_action(self, idx: int, action_name: str, total_actions: int) ->
if result.status == "skipped":
return False # Continue to next action

if result.output_folder and result.status == "completed":
if result.output_folder and result.status in COMPLETED_STATUSES:
self.state.ephemeral_directories.append(
{
"output_folder": result.output_folder,
Expand Down
4 changes: 3 additions & 1 deletion agent_actions/workflow/execution_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
WorkflowFailedEvent,
WorkflowStartEvent,
)
from agent_actions.workflow.managers.state import COMPLETED_STATUSES
from agent_actions.workflow.models import ActionLogParams, WorkflowRuntimeConfig, WorkflowServices

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -90,7 +91,7 @@ def log_action_skip(self, idx: int, action_name: str, total_actions: int):

def log_action_result(self, params: ActionLogParams):
"""Log action execution result via event system."""
if params.result.success and params.result.status == "completed":
if params.result.success and params.result.status in COMPLETED_STATUSES:
tokens = {}
if hasattr(params.result, "tokens") and params.result.tokens:
tokens = params.result.tokens
Expand Down Expand Up @@ -129,6 +130,7 @@ def finalize_workflow(self, elapsed_time: float = 0.0):
workflow_name=self.agent_name,
elapsed_time=elapsed_time,
actions_completed=summary.get("completed", 0),
actions_partial=summary.get("completed_with_failures", 0),
actions_skipped=summary.get("skipped", 0),
actions_failed=summary.get("failed", 0),
)
Expand Down
45 changes: 34 additions & 11 deletions agent_actions/workflow/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
)
from agent_actions.tooling.docs.run_tracker import ActionCompleteConfig
from agent_actions.utils.constants import DEFAULT_ACTION_KIND
from agent_actions.workflow.managers.state import COMPLETED_STATUSES

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -146,7 +147,7 @@ def _maybe_invalidate_completed_status(
self, action_name: str, action_config: ActionConfigDict, current_status: str
) -> str:
"""Reset to pending if limit config changed since last completion."""
if current_status != "completed":
if current_status not in COMPLETED_STATUSES:
return current_status
details = self.deps.state_manager.get_status_details(action_name)
if details.get("record_limit") != action_config.get("record_limit") or details.get(
Expand Down Expand Up @@ -332,17 +333,20 @@ def _handle_run_success(
metrics=ExecutionMetrics(duration=duration),
)

# Normal completion
# Normal completion — check for partial item failures
final_status = self._resolve_completion_status(params.action_name)

self.deps.state_manager.update_status(
params.action_name, "completed", **self._limit_metadata(params.action_config)
params.action_name, final_status, **self._limit_metadata(params.action_config)
)
tokens = get_last_usage()

tracker_status = "success" if final_status == "completed" else "partial"
if self.run_tracker is not None and self.run_id is not None:
config = ActionCompleteConfig(
run_id=self.run_id,
action_name=params.action_name,
status="success",
status=tracker_status,
duration_seconds=duration,
tokens=tokens,
files_processed=0,
Expand All @@ -352,7 +356,7 @@ def _handle_run_success(
return ActionExecutionResult(
success=True,
output_folder=output_folder,
status="completed",
status=final_status,
metrics=ExecutionMetrics(
duration=duration,
tokens=tokens,
Expand Down Expand Up @@ -398,6 +402,23 @@ def _write_skipped_disposition(self, action_name: str, reason: str) -> None:
disp_err,
)

def _resolve_completion_status(self, action_name: str) -> str:
    """Decide the terminal status for an action that finished its run.

    Consults the storage backend (when the action runner exposes one) for
    per-item failure dispositions; any such failure downgrades the result
    to 'completed_with_failures'. A backend lookup error is logged and
    treated as "no failures" so status resolution never blocks completion.
    """
    backend = getattr(self.deps.action_runner, "storage_backend", None)
    if backend is None:
        return "completed"
    try:
        failures = backend.get_failed_items(action_name)
    except Exception as e:
        logger.warning("Could not check partial failures for %s: %s", action_name, e)
        return "completed"
    if not failures:
        return "completed"
    logger.warning(
        "Action '%s' completed with %d item-level failure(s)",
        action_name,
        len(failures),
    )
    return "completed_with_failures"

def _handle_run_failure(
self, params: ActionRunParams, error: Exception
) -> ActionExecutionResult:
Expand Down Expand Up @@ -536,7 +557,7 @@ def execute_action_sync(
action_name, action_config, current_status
)

if current_status == "completed":
if current_status in COMPLETED_STATUSES:
should_skip, result = self._verify_completion_status(action_name)
if should_skip:
if result is None:
Expand Down Expand Up @@ -598,7 +619,7 @@ async def execute_action_async(
action_name, action_config, current_status
)

if current_status == "completed":
if current_status in COMPLETED_STATUSES:
should_skip, result = self._verify_completion_status(action_name)
if should_skip:
if result is None:
Expand Down Expand Up @@ -654,8 +675,9 @@ def _handle_batch_check(
duration = (datetime.now() - start_time).total_seconds()

if batch_status == "completed":
final_status = self._resolve_completion_status(action_name)
self.deps.state_manager.update_status(
action_name, "completed", **self._limit_metadata(action_config)
action_name, final_status, **self._limit_metadata(action_config)
)
fire_event(
BatchCompleteEvent(
Expand All @@ -670,7 +692,7 @@ def _handle_batch_check(
return ActionExecutionResult(
success=True,
output_folder=output_folder,
status="completed",
status=final_status,
metrics=ExecutionMetrics(duration=duration),
)

Expand Down Expand Up @@ -728,8 +750,9 @@ async def _handle_batch_check_async(
duration = (datetime.now() - start_time).total_seconds()

if batch_status == "completed":
final_status = self._resolve_completion_status(action_name)
self.deps.state_manager.update_status(
action_name, "completed", **self._limit_metadata(action_config)
action_name, final_status, **self._limit_metadata(action_config)
)
fire_event(
BatchCompleteEvent(
Expand All @@ -744,7 +767,7 @@ async def _handle_batch_check_async(
return ActionExecutionResult(
success=True,
output_folder=output_folder,
status="completed",
status=final_status,
metrics=ExecutionMetrics(duration=duration),
)

Expand Down
Loading
Loading