Type-safe stream processing primitives for Go channels.
Build composable pipelines from simple parts — batching, windowing, flow control — with unified error handling and deterministic testing.
Every processor shares the same signature:

```go
Process(ctx context.Context, in <-chan Result[T]) <-chan Result[Out]
```

Result[T] unifies success and error in a single channel — no dual-channel complexity:
```go
// Filter keeps items matching a predicate
filter := streamz.NewFilter(func(o Order) bool { return o.Total > 0 })

// Mapper transforms items
mapper := streamz.NewMapper(func(o Order) Order {
	o.ProcessedAt = time.Now()
	return o
})

// Batcher collects items by size or time
batcher := streamz.NewBatcher[Order](streamz.BatchConfig{
	MaxSize:    100,
	MaxLatency: time.Second,
})
```

Compose them into pipelines — each output feeds the next input:
```go
filtered := filter.Process(ctx, orders)
mapped := mapper.Process(ctx, filtered)
batched := batcher.Process(ctx, mapped)

for batch := range batched {
	if batch.IsSuccess() {
		bulkInsert(batch.Value())
	}
}
```

One interface. One channel. Every pattern.
```sh
go get github.com/zoobz-io/streamz
```

Requires Go 1.24+.
```go
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/zoobz-io/streamz"
)

type Order struct {
	ID          string
	Total       float64
	ProcessedAt time.Time
}

func main() {
	ctx := context.Background()

	// Source channel
	orders := make(chan streamz.Result[Order], 10)
	go func() {
		defer close(orders)
		orders <- streamz.Success(Order{ID: "A", Total: 99.99})
		orders <- streamz.Success(Order{ID: "B", Total: 149.99})
		orders <- streamz.Success(Order{ID: "C", Total: 0}) // will be filtered
	}()

	// Build processors
	filter := streamz.NewFilter(func(o Order) bool { return o.Total > 0 })
	mapper := streamz.NewMapper(func(o Order) Order {
		o.ProcessedAt = time.Now()
		return o
	})

	// Compose pipeline
	filtered := filter.Process(ctx, orders)
	processed := mapper.Process(ctx, filtered)

	// Consume results
	for result := range processed {
		if result.IsSuccess() {
			o := result.Value()
			fmt.Printf("Processed: %s ($%.2f) at %v\n", o.ID, o.Total, o.ProcessedAt)
		}
	}
}
```

| Feature | Description | Docs |
|---|---|---|
| Result[T] Pattern | Unified success/error handling in a single channel | Concepts |
| Processor Interface | Common pattern for all stream operations | Processors |
| Batching & Windowing | Time and size-based aggregation | Architecture |
| Flow Control | Throttle, debounce, buffer, backpressure | Backpressure |
| Deterministic Testing | Clock abstraction for reproducible tests | Testing |
| Error Handling | Skip, retry, dead letter queues | Error Handling |
- Type-safe — Full compile-time checking with Go generics
- Composable — Complex pipelines from simple, reusable parts
- Unified errors — Result[T] eliminates dual-channel complexity
- Deterministic testing — Clock abstraction enables reproducible time-based tests
- Production ready — Proper channel lifecycle, no goroutine leaks
- Minimal dependencies — Standard library plus clockz
streamz enables a pattern: define processors once, compose them into any pipeline.
Your stream operations become reusable building blocks. Validation, enrichment, batching, rate limiting — each is a processor. Combine them in different configurations for different use cases.
```go
// Reusable processors
validate := streamz.NewFilter(isValid)
enrich := streamz.NewAsyncMapper(fetchMetadata).WithWorkers(10)
batch := streamz.NewBatcher[Order](streamz.BatchConfig{MaxSize: 100})
throttle := streamz.NewThrottle[[]Order](100 * time.Millisecond)

// Real-time pipeline
realtime := mapper.Process(ctx, filter.Process(ctx, orders))

// Batch pipeline with rate limiting
batched := throttle.Process(ctx, batch.Process(ctx, enrich.Process(ctx, validate.Process(ctx, orders))))
```

Time-dependent processors use clockz for deterministic testing — advance time explicitly, verify behavior reproducibly.
Full documentation is available in the docs/ directory:
- Quickstart — Build your first pipeline
- Concepts — Result[T], processors, composition
- Architecture — Pipeline patterns and design
- Processors — Processor interface and lifecycle
- Channels — Channel management and cleanup
- Backpressure — Flow control strategies
- Error Handling — Error patterns and recovery
- Testing — Deterministic testing with clock abstraction
- Best Practices — Production recommendations
- Performance — Optimization and benchmarking
- Patterns — Common design patterns
- Recipes — Complete examples for common scenarios
- API Reference — Complete processor documentation
See CONTRIBUTING.md for guidelines. Run make help for available commands.
MIT License — see LICENSE for details.