Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion agent_actions/config/interfaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ class ProcessingMode(Enum):
This enum controls *how* a single component runs: synchronously, asynchronously,
or auto-detected. Values: SYNC, ASYNC, AUTO.

Not to be confused with ``processing.types.ProcessingMode`` which controls the
Not to be confused with ``config.types.RunMode`` which controls the
*pipeline-level* dispatch mode (ONLINE vs BATCH).
"""

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from agent_actions.output.writer import FileWriter
from agent_actions.processing.processor import RecordProcessor
from agent_actions.processing.result_collector import ResultCollector
from agent_actions.processing.types import ProcessingContext, ProcessingMode
from agent_actions.processing.types import ProcessingContext
from agent_actions.prompt.formatter import PromptFormatter
from agent_actions.storage.backend import DISPOSITION_PASSTHROUGH, NODE_LEVEL_RECORD_ID
from agent_actions.utils.constants import CHUNK_CONFIG_KEY
Expand Down Expand Up @@ -683,7 +683,7 @@ def _process_online_mode_with_record_processor(
processing_context = ProcessingContext(
agent_config=cast("ActionConfigDict", ctx.agent_config),
agent_name=ctx.agent_name,
mode=ProcessingMode.ONLINE,
mode=RunMode.ONLINE,
is_first_stage=True,
file_path=str(file_path),
output_directory=str(output_directory),
Expand Down
3 changes: 2 additions & 1 deletion agent_actions/llm/batch/processing/preparator.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from pathlib import Path
from typing import Any

from agent_actions.config.types import RunMode
from agent_actions.errors import ConfigurationError
from agent_actions.llm.batch.core.batch_constants import ContextMetaKeys, FilterStatus
from agent_actions.llm.batch.core.batch_context_metadata import BatchContextMetadata
Expand Down Expand Up @@ -256,7 +257,7 @@ def _build_preparation_context(
agent_config=agent_config,
agent_name=agent_name,
is_first_stage=False, # Batch is always subsequent-stage
is_batch_mode=True, # Batch processing mode
mode=RunMode.BATCH,
source_data=source_data,
agent_indices=self.action_indices,
dependency_configs=self.dependency_configs,
Expand Down
4 changes: 2 additions & 2 deletions agent_actions/processing/_MANIFEST.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,6 @@ lineage helpers, recovery flows, and transformation pipelines.
| `helpers.py` | Module | Shared helpers (UUID construction, tuple flattening) for processors. | `processing` |
| `processor.py` | Module | Base processor that glues loaders, transformers, and error handling. | `input`, `processing` |
| `result_collector.py` | Module | Collects main vs side outputs, handles duplicates. Counts UNPROCESSED results separately from successes. | `output` |
| `prepared_task.py` | Module | `GuardStatus` enum (PASSED, SKIPPED, FILTERED, UPSTREAM_UNPROCESSED) and `PreparedTask` dataclass output by TaskPreparer. | `typing` |
| `prepared_task.py` | Module | `GuardStatus` enum (PASSED, SKIPPED, FILTERED, UPSTREAM_UNPROCESSED), `PreparedTask` dataclass, and `PreparationContext` (carries `mode: RunMode` directly). | `typing` |
| `task_preparer.py` | Module | Unified task preparation (normalize, prompt, guard) for batch/online. Short-circuits upstream-unprocessed records before context loading. | `input`, `prompt` |
| `types.py` | Module | `ProcessingStatus` enum (SUCCESS, SKIPPED, FILTERED, FAILED, EXHAUSTED, DEFERRED, UNPROCESSED), `ProcessingResult` factories, and `ProcessingContext`. | `typing` |
| `types.py` | Module | `ProcessingStatus` enum (SUCCESS, SKIPPED, FILTERED, FAILED, EXHAUSTED, DEFERRED, UNPROCESSED), `ProcessingResult` factories, and `ProcessingContext` (uses `RunMode` for mode). | `typing` |
5 changes: 2 additions & 3 deletions agent_actions/processing/batch_context_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,9 @@

from typing import Any, cast

from agent_actions.config.types import ActionConfigDict
from agent_actions.config.types import ActionConfigDict, RunMode
from agent_actions.processing.types import (
ProcessingContext,
ProcessingMode,
ProcessingResult,
RecoveryMetadata,
)
Expand All @@ -25,7 +24,7 @@ def to_processing_context(
return ProcessingContext(
agent_config=cast(ActionConfigDict, agent_config),
agent_name=agent_config.get("agent_type", "unknown_action"),
mode=ProcessingMode.BATCH,
mode=RunMode.BATCH,
is_first_stage=False,
current_item=original_row,
record_index=record_index,
Expand Down
6 changes: 4 additions & 2 deletions agent_actions/processing/batch_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@

from typing import Any, Optional

from agent_actions.config.types import RunMode

from .invocation import BatchProvider, InvocationStrategy
from .record_processor import RecordProcessor
from .types import ProcessingContext, ProcessingMode, ProcessingResult
from .types import ProcessingContext, ProcessingResult


class BatchProcessor:
Expand All @@ -19,7 +21,7 @@ def __init__(
agent_config: dict[str, Any],
agent_name: str,
strategy: InvocationStrategy | None = None,
mode: ProcessingMode = ProcessingMode.ONLINE,
mode: RunMode = RunMode.ONLINE,
provider: Optional["BatchProvider"] = None,
):
self._processor = RecordProcessor(
Expand Down
6 changes: 3 additions & 3 deletions agent_actions/processing/invocation/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,18 @@
if TYPE_CHECKING:
from agent_actions.processing.recovery.response_validator import ResponseValidator

from agent_actions.config.types import RunMode
from agent_actions.processing.invocation.batch import BatchStrategy
from agent_actions.processing.invocation.online import OnlineStrategy
from agent_actions.processing.invocation.strategy import BatchProvider, InvocationStrategy
from agent_actions.processing.types import ProcessingMode


class InvocationStrategyFactory:
"""Create invocation strategies based on processing mode."""

@staticmethod
def create(
mode: ProcessingMode,
mode: RunMode,
agent_config: dict[str, Any],
provider: BatchProvider | None = None,
) -> InvocationStrategy:
Expand All @@ -27,7 +27,7 @@ def create(
Raises:
ValueError: If BATCH mode requested without provider.
"""
if mode == ProcessingMode.BATCH:
if mode == RunMode.BATCH:
if provider is None:
raise ValueError(
f"BatchProvider required for BATCH mode (action: '{agent_config.get('agent_type', 'unknown')}')"
Expand Down
7 changes: 4 additions & 3 deletions agent_actions/processing/prepared_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
from enum import Enum
from typing import TYPE_CHECKING, Any, Optional, cast

from agent_actions.config.types import RunMode

if TYPE_CHECKING:
from agent_actions.processing.types import ProcessingContext
from agent_actions.storage.backend import StorageBackend
Expand Down Expand Up @@ -64,7 +66,7 @@ class PreparationContext:
agent_config: dict[str, Any]
agent_name: str
is_first_stage: bool = False
is_batch_mode: bool = False
mode: RunMode = RunMode.ONLINE
source_data: list[dict[str, Any]] | None = None
agent_indices: dict[str, int] | None = None
dependency_configs: dict[str, Any] | None = None
Expand All @@ -80,14 +82,13 @@ class PreparationContext:
@classmethod
def from_processing_context(cls, context: "ProcessingContext") -> "PreparationContext":
"""Create PreparationContext from a ProcessingContext."""
from agent_actions.processing.types import ProcessingMode
from agent_actions.utils.tools_resolver import resolve_tools_path

return cls(
agent_config=cast(dict[str, Any], context.agent_config),
agent_name=context.agent_name,
is_first_stage=context.is_first_stage,
is_batch_mode=context.mode == ProcessingMode.BATCH,
mode=context.mode,
source_data=context.source_data,
agent_indices=context.agent_indices,
dependency_configs=context.dependency_configs,
Expand Down
6 changes: 3 additions & 3 deletions agent_actions/processing/record_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from datetime import UTC, datetime
from typing import Any, Optional, cast

from agent_actions.config.types import RunMode
from agent_actions.errors import ConfigurationError, SchemaValidationError
from agent_actions.errors.operations import TemplateVariableError
from agent_actions.errors.processing import EmptyOutputError
Expand All @@ -29,7 +30,6 @@
from .task_preparer import TaskPreparer, get_task_preparer
from .types import (
ProcessingContext,
ProcessingMode,
ProcessingResult,
ProcessingStatus,
)
Expand All @@ -54,13 +54,13 @@ def __init__(
agent_config: dict[str, Any],
agent_name: str,
strategy: InvocationStrategy | None = None,
mode: ProcessingMode = ProcessingMode.ONLINE,
mode: RunMode = RunMode.ONLINE,
provider: Optional["BatchProvider"] = None,
):
self.agent_config = agent_config
self.agent_name = agent_name

if strategy is not None and (mode != ProcessingMode.ONLINE or provider is not None):
if strategy is not None and (mode != RunMode.ONLINE or provider is not None):
logger.warning(
"Both 'strategy' and 'mode'/'provider' specified for %s; "
"'strategy' takes precedence",
Expand Down
5 changes: 1 addition & 4 deletions agent_actions/processing/task_preparer.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
from collections.abc import Callable
from typing import Any

from agent_actions.config.types import RunMode
from agent_actions.processing.prepared_task import (
GuardStatus,
PreparationContext,
Expand Down Expand Up @@ -261,13 +260,11 @@ def _render_prompt(
"""Render prompt template using pre-loaded field context."""
from agent_actions.prompt.service import PromptPreparationService

mode: RunMode = RunMode.BATCH if context.is_batch_mode else RunMode.ONLINE

return PromptPreparationService.prepare_prompt_with_field_context(
agent_config=context.agent_config,
agent_name=context.agent_name,
contents=content if isinstance(content, dict) else {},
mode=mode,
mode=context.mode,
field_context=field_context,
tools_path=context.tools_path,
)
Expand Down
11 changes: 2 additions & 9 deletions agent_actions/processing/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from enum import Enum
from typing import TYPE_CHECKING, Any, Optional

from agent_actions.config.types import ActionConfigDict
from agent_actions.config.types import ActionConfigDict, RunMode

if TYPE_CHECKING:
from agent_actions.storage.backend import StorageBackend
Expand All @@ -22,13 +22,6 @@ class ProcessingStatus(Enum):
UNPROCESSED = "unprocessed" # Upstream failed/skipped this record


class ProcessingMode(Enum):
"""Workflow-level data flow mode: ONLINE (synchronous) or BATCH (queued)."""

ONLINE = "online"
BATCH = "batch"


@dataclass
class RetryState:
"""Retry-related state for a processing operation."""
Expand Down Expand Up @@ -285,7 +278,7 @@ class ProcessingContext:

agent_config: ActionConfigDict
agent_name: str
mode: ProcessingMode = ProcessingMode.ONLINE
mode: RunMode = RunMode.ONLINE
is_first_stage: bool = False
source_data: list[dict[str, Any]] = field(default_factory=list)
file_path: str | None = None
Expand Down
6 changes: 2 additions & 4 deletions agent_actions/prompt/data_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,13 @@
from agent_actions.storage.backend import StorageBackend
from agent_actions.config.di.container import registry
from agent_actions.config.interfaces import IGenerator, ProcessingMode
from agent_actions.config.types import RunMode
from agent_actions.errors import GenerationError
from agent_actions.processing.processor import RecordProcessor
from agent_actions.processing.types import (
ProcessingContext,
ProcessingStatus,
)
from agent_actions.processing.types import (
ProcessingMode as CoreProcessingMode,
)

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -78,7 +76,7 @@ def create_agent_with_data(
context = ProcessingContext(
agent_config=cast(ActionConfigDict, self.agent_config),
agent_name=self.agent_name,
mode=CoreProcessingMode.ONLINE,
mode=RunMode.ONLINE,
is_first_stage=False, # This is subsequent-stage processing
source_data=source_content if isinstance(source_content, list) else [],
file_path=file_path,
Expand Down
4 changes: 2 additions & 2 deletions agent_actions/workflow/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from agent_actions.output.writer import FileWriter
from agent_actions.processing.processor import RecordProcessor
from agent_actions.processing.result_collector import ResultCollector
from agent_actions.processing.types import ProcessingContext, ProcessingMode
from agent_actions.processing.types import ProcessingContext
from agent_actions.prompt.context.scope_file_mode import apply_observe_for_file_mode
from agent_actions.storage.backend import DISPOSITION_PASSTHROUGH, NODE_LEVEL_RECORD_ID
from agent_actions.utils.constants import MODEL_VENDOR_KEY
Expand Down Expand Up @@ -456,7 +456,7 @@ def _process_by_strategy(
context = ProcessingContext(
agent_config=self.config.action_config,
agent_name=self.config.action_name,
mode=ProcessingMode.ONLINE,
mode=RunMode.ONLINE,
is_first_stage=False,
source_data=source_data, # Pass the loaded source data
file_path=file_path,
Expand Down
12 changes: 6 additions & 6 deletions tests/core/test_invocation_strategy.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import pytest

from agent_actions.config.types import RunMode
from agent_actions.processing.invocation import (
BatchStrategy,
BatchSubmissionResult,
Expand All @@ -12,7 +13,7 @@
OnlineStrategy,
)
from agent_actions.processing.prepared_task import GuardStatus, PreparedTask
from agent_actions.processing.types import ProcessingContext, ProcessingMode
from agent_actions.processing.types import ProcessingContext


@pytest.fixture
Expand Down Expand Up @@ -62,7 +63,7 @@ def basic_context():
return ProcessingContext(
agent_config={"agent_type": "test_agent", "prompt": "test"},
agent_name="test_agent",
mode=ProcessingMode.ONLINE,
mode=RunMode.ONLINE,
)


Expand Down Expand Up @@ -339,7 +340,7 @@ def test_batch_mode_requires_provider(self):
"""Test BATCH mode raises error without provider."""
with pytest.raises(ValueError, match="BatchProvider required"):
InvocationStrategyFactory.create(
mode=ProcessingMode.BATCH,
mode=RunMode.BATCH,
agent_config={"agent_type": "test"},
)

Expand All @@ -355,7 +356,7 @@ def test_batch_mode_without_provider_raises(self):
RecordProcessor(
agent_config={},
agent_name="test",
mode=ProcessingMode.BATCH,
mode=RunMode.BATCH,
)


Expand All @@ -380,7 +381,6 @@ def test_batch_invocation_returns_deferred_not_filtered(
from agent_actions.processing.processor import RecordProcessor
from agent_actions.processing.types import (
ProcessingContext,
ProcessingMode,
ProcessingStatus,
)

Expand Down Expand Up @@ -414,7 +414,7 @@ def test_batch_invocation_returns_deferred_not_filtered(
context = ProcessingContext(
agent_config={"agent_type": "test"},
agent_name="test",
mode=ProcessingMode.BATCH,
mode=RunMode.BATCH,
)

result = processor.process({"raw": "data"}, context)
Expand Down
8 changes: 4 additions & 4 deletions tests/core/test_record_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@

import pytest

from agent_actions.config.types import RunMode
from agent_actions.processing.processor import RecordProcessor
from agent_actions.processing.types import (
ProcessingContext,
ProcessingMode,
ProcessingResult,
ProcessingStatus,
)
Expand Down Expand Up @@ -129,7 +129,7 @@ def process(self, item, context):
agent_name="test_action",
agent_indices={"test_action": 0},
is_first_stage=False,
mode=ProcessingMode.BATCH,
mode=RunMode.BATCH,
)

# ConfigurationError should be re-raised, not caught
Expand Down Expand Up @@ -160,7 +160,7 @@ def process(self, item, context):
agent_name="test_action",
agent_indices={"test_action": 0},
is_first_stage=False,
mode=ProcessingMode.BATCH,
mode=RunMode.BATCH,
)

# TemplateVariableError should be re-raised, not caught
Expand All @@ -182,7 +182,7 @@ def process(self, item, context):
agent_name="test_action",
agent_indices={"test_action": 0},
is_first_stage=False,
mode=ProcessingMode.BATCH,
mode=RunMode.BATCH,
)

# Other exceptions should be caught and converted to failed results
Expand Down
Loading
Loading