Summary
When the first action in a workflow fails (e.g., all records get a 401 API key error), the action is still marked as "completed" (OK) instead of "failed". This prevents the circuit breaker from firing, so downstream actions attempt to process empty data and cascade into confusing failures.
Root Cause
initial_pipeline.py:694-721 (_process_online_mode_with_record_processor) is missing the failure detection check that exists in pipeline.py:498-508.
pipeline.py (StandardStrategy) — has the check:
if data and not output:
    failed_results = [r for r in results if r.status == ProcessingStatus.FAILED]
    if failed_results:
        raise RuntimeError(
            f"Action '{self.config.action_name}' produced 0 records — "
            f"all {len(data)} input item(s) failed: {summary}"
        )
initial_pipeline.py (InitialStrategy) — missing the check:
results = processor.process_batch(data_chunk, processing_context)
processed_items = ResultCollector.collect_results(results, ...)
# <-- NO failure check here
file_writer.write_target(processed_items) # writes [] to DB
return str(output_file_path) # returns successfully
When all records fail with a non-retriable error (401 auth), process_batch catches each exception and creates ProcessingResult.failed(). collect_results returns an empty list (FAILED records don't add to output). Without the failure check, write_target([]) saves record_count=0 to the DB and the action returns successfully.
Reproduction
# Set an invalid API key in review_analyzer config
agac run -a review_analyzer
Output:
09:39:34 | Non-retriable error (action=extract_claims): groq API error: Error code: 401
09:39:34 | Error processing item 0: groq API error: ...
09:39:35 | Error processing item 1: groq API error: ...
09:39:35 | Processing failed for source_guid=...: Error processing item 0
09:39:35 | Processing failed for source_guid=...: Error processing item 1
Action 0 complete (2.40s) # <-- should be ERROR, not OK
...
09:39:35 | Completed in 2.43s | 4 OK | 3 SKIP | 1 ERROR
extract_claims shows as OK despite all records failing. The circuit breaker never fires. score_quality_1/2/3 run on empty data (also OK). aggregate_scores is the first to actually error because version correlation finds no real outputs.
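The circuit-breaker behavior the report expects can be sketched as follows. This is a hypothetical illustration; `run_workflow` and the action stubs are invented for this sketch, not the real agac runner API:

```python
def run_workflow(actions):
    # Run actions in order; stop at the first one that raises.
    statuses = {}
    for name, action in actions:
        try:
            action()
            statuses[name] = "OK"
        except Exception:
            statuses[name] = "ERROR"
            # Circuit breaker: halt the cascade instead of feeding
            # empty data to downstream actions.
            break
    return statuses

def extract_claims():
    raise RuntimeError("produced 0 records; all input items failed: 401")

statuses = run_workflow([
    ("extract_claims", extract_claims),
    ("score_quality_1", lambda: None),
    ("aggregate_scores", lambda: None),
])
assert statuses == {"extract_claims": "ERROR"}  # downstream actions never ran
```

If `extract_claims` had raised as intended, the run would stop there with a single clear ERROR instead of three OK actions on empty data and a confusing failure in `aggregate_scores`.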
Suggested Fix
Add the same failure detection check to _process_online_mode_with_record_processor in initial_pipeline.py after collect_results, before write_target:
if data_chunk and not processed_items:
    from agent_actions.processing.types import ProcessingStatus

    failed_results = [r for r in results if r.status == ProcessingStatus.FAILED]
    if failed_results:
        failed_msgs = [r.error for r in failed_results if r.error]
        summary = "; ".join(failed_msgs[:3])
        raise RuntimeError(
            f"Action '{ctx.agent_name}' produced 0 records — "
            f"all {len(data_chunk)} input item(s) failed: {summary}"
        )
Related
The failure detection check was added to pipeline.py but missed initial_pipeline.py.