Skip to content

InitialStrategy silently marks action completed when all records fail #82

@Muizzkolapo

Description

@Muizzkolapo

Summary

When the first action in a workflow fails (e.g., all records get a 401 API key error), the action is still marked as "completed" (OK) instead of "failed". This prevents the circuit breaker from firing, so downstream actions attempt to process empty data and cascade into confusing failures.

Root Cause

initial_pipeline.py:694-721 (_process_online_mode_with_record_processor) is missing the failure detection check that exists in pipeline.py:498-508.

pipeline.py (StandardStrategy) — has the check:

if data and not output:
    failed_results = [r for r in results if r.status == ProcessingStatus.FAILED]
    if failed_results:
        raise RuntimeError(
            f"Action '{self.config.action_name}' produced 0 records — "
            f"all {len(data)} input item(s) failed: {summary}"
        )

initial_pipeline.py (InitialStrategy) — missing the check:

results = processor.process_batch(data_chunk, processing_context)
processed_items = ResultCollector.collect_results(results, ...)
# <-- NO failure check here
file_writer.write_target(processed_items)  # writes [] to DB
return str(output_file_path)  # returns successfully

When all records fail with a non-retriable error (401 auth), process_batch catches each exception and creates ProcessingResult.failed(). collect_results returns an empty list (FAILED records don't add to output). Without the failure check, write_target([]) saves record_count=0 to the DB and the action returns successfully.

Reproduction

# Set an invalid API key in review_analyzer config
agac run -a review_analyzer

Output:

09:39:34 | Non-retriable error (action=extract_claims): groq API error: Error code: 401
09:39:34 |  Error processing item 0: groq API error: ...
09:39:35 |  Error processing item 1: groq API error: ...
09:39:35 |  Processing failed for source_guid=...: Error processing item 0
09:39:35 |  Processing failed for source_guid=...: Error processing item 1
Action 0 complete (2.40s)           # <-- should be ERROR, not OK
...
09:39:35 | Completed in 2.43s | 4 OK | 3 SKIP | 1 ERROR

extract_claims shows as OK despite all records failing. The circuit breaker never fires. score_quality_1/2/3 run on empty data (also OK). aggregate_scores is the first to actually error because version correlation finds no real outputs.

Suggested Fix

Add the same failure detection check to _process_online_mode_with_record_processor in initial_pipeline.py after collect_results, before write_target:

if data_chunk and not processed_items:
    from agent_actions.processing.types import ProcessingStatus
    failed_results = [r for r in results if r.status == ProcessingStatus.FAILED]
    if failed_results:
        failed_msgs = [r.error for r in failed_results if r.error]
        summary = "; ".join(failed_msgs[:3])
        raise RuntimeError(
            f"Action '{ctx.agent_name}' produced 0 records — "
            f"all {len(data_chunk)} input item(s) failed: {summary}"
        )

Related

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions