Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 75 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ zenpipe(100)
4. [Usage](#usage)
- [Pipeline Operations](#pipeline-operations)
- [Class Methods as Operations](#class-methods-as-operations)
- [Context Passing](#context-passing)
- [Exception Handling](#exception-handling)
- [More Examples](#more-examples)
5. [API Reference](#api-reference)
6. [Contributing](#contributing)
Expand All @@ -58,10 +60,11 @@ composer require dynamik-dev/zenpipe-php
## Usage
### Pipeline Operations

Pipeline operations are functions that take an input and return a processed value. Each operation can receive up to three parameters:
Pipeline operations are functions that take an input and return a processed value. Each operation can receive up to four parameters:
- `$input`: The value being processed
- `$next`: A callback to pass the value to the next operation
- `$return`: (Optional) A callback to exit the pipeline early with a value
- `$context`: (Optional) A shared context object passed to all operations

#### Basic Operation Example
Let's build an input sanitization pipeline:
Expand Down Expand Up @@ -157,6 +160,77 @@ $pipeline = zenpipe()
]);
```

### Context Passing

You can pass a shared context object to all operations using `withContext()`. This is useful for sharing state, configuration, or dependencies across the pipeline without threading them through the value.

```php
// Use any object as context - your own DTO, stdClass, or array
class RequestContext
{
public function __construct(
public string $userId,
public array $permissions,
public array $logs = []
) {}
}

$context = new RequestContext(
userId: 'user-123',
permissions: ['read', 'write']
);

$result = zenpipe(['action' => 'update', 'data' => [...]])
->withContext($context)
->pipe(function ($request, $next, $return, RequestContext $ctx) {
if (!in_array('write', $ctx->permissions)) {
return $return(['error' => 'Unauthorized']);
}
$ctx->logs[] = "Permission check passed for {$ctx->userId}";
return $next($request);
})
->pipe(function ($request, $next, $return, RequestContext $ctx) {
$ctx->logs[] = "Processing {$request['action']}";
return $next([...$request, 'processed_by' => $ctx->userId]);
})
->process();

// Context is mutable - logs are accumulated across operations
// $context->logs = ['Permission check passed for user-123', 'Processing update']
```

Type hint your context parameter in the operation signature for IDE support:

```php
/** @var ZenPipe<array, RequestContext> */
$pipeline = zenpipe()
->withContext(new RequestContext(...))
->pipe(fn($value, $next, $return, RequestContext $ctx) => ...);
```

### Exception Handling

Use `catch()` to handle exceptions gracefully without breaking the pipeline:

```php
$result = zenpipe($userData)
->pipe(fn($data, $next) => $next(validateInput($data)))
->pipe(fn($data, $next) => $next(processPayment($data))) // might throw
->pipe(fn($data, $next) => $next(sendConfirmation($data)))
->catch(fn(Throwable $e, $originalValue) => [
'error' => $e->getMessage(),
'input' => $originalValue,
])
->process();
```

The catch handler receives:
- `$e`: The thrown exception (`Throwable`)
- `$value`: The original input value passed to `process()`
- `$context`: The context set via `withContext()` (null if not set)

If no catch handler is set, exceptions propagate normally.

### More Examples

#### RAG Processes
Expand Down
48 changes: 48 additions & 0 deletions docs/API.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,47 @@ Static factory method to create a new pipeline instance.
- `$initialValue` (mixed|null): The initial value to be processed through the pipeline.
- **Returns:** A new `ZenPipe` instance.

### withContext()

```php
public function withContext(mixed $context): self
```

Sets a context object that will be passed to all operations as the fourth parameter.

- **Parameters:**
- `$context` (mixed): Any value to be passed as context (object, array, DTO, etc.)
- **Returns:** The `ZenPipe` instance for method chaining.

**Example:**
```php
$pipeline = zenpipe($value)
->withContext(new MyContext())
->pipe(fn($v, $next, $return, MyContext $ctx) => $next($v));
```

### catch()

```php
public function catch(callable $handler): self
```

Sets an exception handler for the pipeline.

- **Parameters:**
- `$handler` (callable): A function that receives `(Throwable $e, mixed $originalValue, mixed $context)` and returns a fallback value.
- **Returns:** The `ZenPipe` instance for method chaining.

**Example:**
```php
$pipeline = zenpipe($value)
->withContext($myContext)
->pipe(fn($v, $next) => $next(riskyOperation($v)))
->catch(fn($e, $value, $ctx) => ['error' => $e->getMessage()]);
```

If an exception occurs and no catch handler is set, the exception propagates normally.

### pipe()

```php
Expand All @@ -51,6 +92,13 @@ Adds an operation to the pipeline.
- **Returns:** The `ZenPipe` instance for method chaining.
- **Throws:** `\InvalidArgumentException` if the specified class does not exist.

**Operation Parameters:**
Operations receive up to four parameters:
1. `$value` - The current value being processed
2. `$next` - Callback to pass value to next operation
3. `$return` - Callback to exit pipeline early with a value
4. `$context` - The context set via `withContext()` (null if not set)

### process()

```php
Expand Down
5 changes: 4 additions & 1 deletion phpstan.neon
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@ parameters:
- vendor
ignoreErrors:
- '#PHPDoc tag @var#'
-
-
message: '#Template type T of function zenpipe\(\) is not referenced in a parameter.#'
path: src/helpers.php
-
message: '#Template type TContext of function zenpipe\(\) is not referenced in a parameter.#'
path: src/helpers.php
reportUnmatchedIgnoredErrors: false
59 changes: 51 additions & 8 deletions src/ZenPipe.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,54 @@

/**
* @template T
* @template TContext
*/
class ZenPipe
{
/** @var array<callable|array{class-string, string}> */
protected array $operations = [];

/** @var TContext|null */
protected mixed $context = null;

/** @var callable|null */
protected $exceptionHandler = null;

public function __construct(protected mixed $initialValue = null)
{
}

/**
* Set an exception handler for the pipeline.
*
* @param callable(\Throwable, mixed, TContext|null): mixed $handler
* @return self<T, TContext>
*/
public function catch(callable $handler): self
{
$this->exceptionHandler = $handler;

return $this;
}

/**
* Set the context to be passed to each operation.
*
* @template TNewContext
*
* @param TNewContext $context
* @return self<T, TNewContext>
*/
public function withContext(mixed $context): self
{
$this->context = $context;

return $this;
}

/**
* @param mixed|null $initialValue
* @return self<T>
* @return self<T, TContext>
*/
public static function make(mixed $initialValue = null): self
{
Expand All @@ -25,7 +60,7 @@ public static function make(mixed $initialValue = null): self

/**
* @param callable|array{class-string, string} $operation
* @return self<T>
* @return self<T, TContext>
*/
public function pipe($operation): self
{
Expand Down Expand Up @@ -78,6 +113,7 @@ public function __invoke($initialValue)
* @param T|null $initialValue
* @return T
* @throws \InvalidArgumentException
* @throws \Throwable
*/
public function process($initialValue = null)
{
Expand All @@ -93,17 +129,24 @@ public function process($initialValue = null)
$this->passThroughOperation()
);

return $pipeline($value);
try {
return $pipeline($value);
} catch (\Throwable $e) {
if ($this->exceptionHandler !== null) {
return ($this->exceptionHandler)($e, $value, $this->context);
}
throw $e;
}
}

/**
* This method is used to carry the value through the pipeline.
* It wraps the next operation in a closure that can handle both
* static method calls and regular callables.
*
* The operation can now accept a third parameter for early return:
* - For callables: function($value, $next, $return)
* - For class methods: method($value, $next, $return)
* Operations can accept up to four parameters:
* - For callables: function($value, $next, $return, $context)
* - For class methods: method($value, $next, $return, $context)
*
* @return callable
*/
Expand All @@ -120,10 +163,10 @@ public function carry(): callable
$method = $operation[1];
$instance = new $class();

return $instance->$method($value, $next, $return);
return $instance->$method($value, $next, $return, $this->context);
}

return $operation($value, $next, $return);
return $operation($value, $next, $return, $this->context);
};
};
}
Expand Down
4 changes: 3 additions & 1 deletion src/helpers.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@
* Create a new ZenPipe instance.
*
* @template T
* @return ZenPipe<T>
* @template TContext
*
* @return ZenPipe<T, TContext>
*/
function zenpipe(mixed $initialValue = null): ZenPipe
{
Expand Down
Loading