Skip to content

feat: add TransactionalParallelBatchNode with saga-style compensation#2

Open
weise25 wants to merge 1 commit into
mainfrom
feature/transactional-parallel-batch-node
Open

feat: add TransactionalParallelBatchNode with saga-style compensation#2
weise25 wants to merge 1 commit into
mainfrom
feature/transactional-parallel-batch-node

Conversation

@weise25

@weise25 weise25 commented May 1, 2026

Copy link
Copy Markdown
Owner

Problem

AsyncParallelBatchNode runs all items concurrently, but provides no rollback mechanism when one task fails. Because all tasks are awaited via a settle pattern, every task that completed before the failure has already produced its side effects — database writes, API calls, external mutations — and there is no automatic way to undo them.

This leaves the system in a partial, inconsistent state:

  • Records inserted into a DB that should not exist
  • API charges made that cannot be reversed automatically
  • Files written that are never cleaned up
  • SharedStore populated with partial results that downstream nodes cannot trust

The existing docblock warns about this, but places the entire burden on the caller.

Solution

This PR introduces TransactionalParallelBatchNode, an abstract base class that implements the Saga pattern directly inside the framework.

How it works

  1. All items are still executed concurrently (same behaviour as AsyncParallelBatchNode).
  2. As each task completes successfully, it is recorded in a $completed list.
  3. If any task fails, all recorded completions are compensated in reverse order before the original exception is re-thrown.
  4. Compensation failures are swallowed individually so that all compensations are always attempted.

Usage

Extend the class and implement execAsync() as usual, plus the new compensate() method:

class SaveDocumentsNode extends TransactionalParallelBatchNode
{
    public function prepAsync(SharedStore $shared): PromiseInterface
    {
        return async(fn() => $shared->documents)();
    }

    public function execAsync(mixed $document): PromiseInterface
    {
        return async(function () use ($document) {
            $id = $this->params['db']->insert($document);
            return $id;
        })();
    }

    public function compensate(mixed $item, mixed $result): PromiseInterface
    {
        return async(function () use ($result) {
            $this->params['db']->delete($result);
        })();
    }
}

Caveats (documented in class docblock)

  • Compensation is best-effort: if compensate() itself throws, the error is swallowed and the next compensation is attempted. The original exception is always re-thrown.
  • Not a replacement for DB transactions: for strict atomicity, use database-level transactions. This class covers the cases where no native transaction mechanism is available (external APIs, multi-service writes, etc.).
  • AsyncParallelBatchNode is unchanged — existing code is unaffected.

Files changed

  • src/TransactionalParallelBatchNode.php — new file

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant