zoobz-io/streamz

streamz

Type-safe stream processing primitives for Go channels.

Build composable pipelines from simple parts — batching, windowing, flow control — with unified error handling and deterministic testing.

One Channel, Every Pattern

Every processor shares the same signature:

Process(ctx context.Context, in <-chan Result[T]) <-chan Result[Out]

Result[T] unifies success and error in a single channel — no dual-channel complexity:

// Filter keeps items matching a predicate
filter := streamz.NewFilter(func(o Order) bool { return o.Total > 0 })

// Mapper transforms items
mapper := streamz.NewMapper(func(o Order) Order {
    o.ProcessedAt = time.Now()
    return o
})

// Batcher collects items by size or time
batcher := streamz.NewBatcher[Order](streamz.BatchConfig{
    MaxSize:    100,
    MaxLatency: time.Second,
})

Compose them into pipelines — each output feeds the next input:

filtered := filter.Process(ctx, orders)
mapped := mapper.Process(ctx, filtered)
batched := batcher.Process(ctx, mapped)

for batch := range batched {
    if batch.IsSuccess() {
        bulkInsert(batch.Value())
    }
}

One interface. One channel. Every pattern.
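
To make the single-channel idea concrete, here is a minimal standard-library sketch of how a result type and a filter stage could be shaped. The names `result`, `ok`, `failed`, and `filterStage` are hypothetical stand-ins invented for illustration, not the streamz implementation; the sketch assumes errors are forwarded downstream unchanged.

```go
package main

import (
    "context"
    "errors"
    "fmt"
)

// result is a hypothetical stand-in for streamz.Result[T]: one value that
// carries either a payload or an error.
type result[T any] struct {
    val T
    err error
}

func ok[T any](v T) result[T]         { return result[T]{val: v} }
func failed[T any](e error) result[T] { return result[T]{err: e} }

// filterStage mirrors the shape of the Process signature. It keeps values
// that satisfy pred and forwards errors untouched (an assumption of this sketch).
func filterStage[T any](ctx context.Context, in <-chan result[T], pred func(T) bool) <-chan result[T] {
    out := make(chan result[T])
    go func() {
        defer close(out) // closing the output lets downstream range loops end
        for r := range in {
            if r.err == nil && !pred(r.val) {
                continue // drop successful items that fail the predicate
            }
            select {
            case out <- r:
            case <-ctx.Done():
                return // stop early when the caller cancels
            }
        }
    }()
    return out
}

// runFilterDemo pushes three results through the stage and collects the output.
func runFilterDemo() (kept []int, errs []error) {
    in := make(chan result[int], 3)
    in <- ok(1)
    in <- failed[int](errors.New("bad input"))
    in <- ok(-5)
    close(in)

    positive := func(n int) bool { return n > 0 }
    for r := range filterStage(context.Background(), in, positive) {
        if r.err != nil {
            errs = append(errs, r.err)
            continue
        }
        kept = append(kept, r.val)
    }
    return kept, errs
}

func main() {
    kept, errs := runFilterDemo()
    fmt.Println(kept, len(errs)) // prints: [1] 1
}
```

Because failures travel on the same channel as values, the consumer needs only one range loop and one branch per item.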

Install

go get github.com/zoobz-io/streamz

Requires Go 1.24+.

Quick Start

package main

import (
    "context"
    "fmt"
    "time"

    "github.com/zoobz-io/streamz"
)

type Order struct {
    ID          string
    Total       float64
    ProcessedAt time.Time
}

func main() {
    ctx := context.Background()

    // Source channel
    orders := make(chan streamz.Result[Order], 10)
    go func() {
        defer close(orders)
        orders <- streamz.Success(Order{ID: "A", Total: 99.99})
        orders <- streamz.Success(Order{ID: "B", Total: 149.99})
        orders <- streamz.Success(Order{ID: "C", Total: 0}) // will be filtered
    }()

    // Build processors
    filter := streamz.NewFilter(func(o Order) bool { return o.Total > 0 })
    mapper := streamz.NewMapper(func(o Order) Order {
        o.ProcessedAt = time.Now()
        return o
    })

    // Compose pipeline
    filtered := filter.Process(ctx, orders)
    processed := mapper.Process(ctx, filtered)

    // Consume results
    for result := range processed {
        if result.IsSuccess() {
            o := result.Value()
            fmt.Printf("Processed: %s ($%.2f) at %v\n", o.ID, o.Total, o.ProcessedAt)
        }
    }
}

Capabilities

| Feature | Description | Docs |
| --- | --- | --- |
| Result[T] Pattern | Unified success/error handling in a single channel | Concepts |
| Processor Interface | Common pattern for all stream operations | Processors |
| Batching & Windowing | Time and size-based aggregation | Architecture |
| Flow Control | Throttle, debounce, buffer, backpressure | Backpressure |
| Deterministic Testing | Clock abstraction for reproducible tests | Testing |
| Error Handling | Skip, retry, dead letter queues | Error Handling |
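
As an illustration of size- and time-based aggregation, the following standard-library sketch emits a batch when either a size limit or a latency limit is reached. `batchBySizeOrTime` is a hypothetical helper written for this example and is not the streamz Batcher; it assumes the latency window starts when the first item of a batch arrives.

```go
package main

import (
    "fmt"
    "time"
)

// batchBySizeOrTime is a hypothetical helper, not the streamz Batcher. It emits
// a batch when maxSize items are buffered or when maxLatency has passed since
// the first item of the current batch arrived, whichever comes first.
func batchBySizeOrTime[T any](in <-chan T, maxSize int, maxLatency time.Duration) <-chan []T {
    out := make(chan []T)
    go func() {
        defer close(out)
        var (
            buf     []T
            timer   *time.Timer
            timeout <-chan time.Time // nil while the buffer is empty, so it never fires
        )
        flush := func() {
            if timer != nil {
                timer.Stop()
            }
            if len(buf) > 0 {
                out <- buf
                buf = nil // the receiver now owns the emitted slice
            }
            timeout = nil
        }
        for {
            select {
            case v, open := <-in:
                if !open {
                    flush() // emit the partial batch before shutting down
                    return
                }
                if len(buf) == 0 {
                    timer = time.NewTimer(maxLatency)
                    timeout = timer.C
                }
                buf = append(buf, v)
                if len(buf) >= maxSize {
                    flush()
                }
            case <-timeout:
                flush()
            }
        }
    }()
    return out
}

// collectBatches feeds five items through the batcher with a size limit of two.
func collectBatches() [][]int {
    in := make(chan int, 5)
    for i := 1; i <= 5; i++ {
        in <- i
    }
    close(in)

    var batches [][]int
    for b := range batchBySizeOrTime(in, 2, time.Hour) {
        batches = append(batches, b)
    }
    return batches
}

func main() {
    fmt.Println(collectBatches()) // prints: [[1 2] [3 4] [5]]
}
```

The latency limit is set to an hour here so that only the size limit and the final flush on close decide the output, which keeps the example deterministic.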

Why streamz?

  • Type-safe — Full compile-time checking with Go generics
  • Composable — Complex pipelines from simple, reusable parts
  • Unified errors: Result[T] eliminates dual-channel complexity
  • Deterministic testing — Clock abstraction enables reproducible time-based tests
  • Production ready — Proper channel lifecycle, no goroutine leaks
  • Minimal dependencies — Standard library plus clockz

Composable Stream Architecture

streamz enables a pattern: define processors once, compose them into any pipeline.

Your stream operations become reusable building blocks. Validation, enrichment, batching, rate limiting — each is a processor. Combine them in different configurations for different use cases.

// Reusable processors
validate := streamz.NewFilter(isValid)
enrich := streamz.NewAsyncMapper(fetchMetadata).WithWorkers(10)
batch := streamz.NewBatcher[Order](streamz.BatchConfig{MaxSize: 100})
throttle := streamz.NewThrottle[[]Order](100 * time.Millisecond)

// Real-time pipeline (filter and mapper are the processors defined in the earlier examples)
realtime := mapper.Process(ctx, filter.Process(ctx, orders))

// Batch pipeline with rate limiting
batched := throttle.Process(ctx, batch.Process(ctx, enrich.Process(ctx, validate.Process(ctx, orders))))
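
Deeply nested Process calls read inside-out. One possible way to flatten them, sketched here with the standard library only, is a small helper that applies stages left to right. `stage`, `chain`, and `mapStage` are hypothetical names invented for this example and are not part of streamz. The sketch only covers stages whose input and output types match, so a type-changing step such as batching would still need an explicit call.

```go
package main

import (
    "context"
    "fmt"
)

// stage is a hypothetical function type with the same shape as Process,
// restricted to stages whose input and output types match.
type stage[T any] func(ctx context.Context, in <-chan T) <-chan T

// chain returns a stage that runs the given stages left to right.
func chain[T any](stages ...stage[T]) stage[T] {
    return func(ctx context.Context, in <-chan T) <-chan T {
        for _, s := range stages {
            in = s(ctx, in) // each output channel becomes the next input
        }
        return in
    }
}

// mapStage builds a stage that applies fn to every item.
func mapStage[T any](fn func(T) T) stage[T] {
    return func(ctx context.Context, in <-chan T) <-chan T {
        out := make(chan T)
        go func() {
            defer close(out)
            for v := range in {
                select {
                case out <- fn(v):
                case <-ctx.Done():
                    return
                }
            }
        }()
        return out
    }
}

// runChain doubles each input and then adds one, using a chained pipeline.
func runChain() []int {
    in := make(chan int, 3)
    for _, v := range []int{1, 2, 3} {
        in <- v
    }
    close(in)

    double := mapStage(func(n int) int { return n * 2 })
    addOne := mapStage(func(n int) int { return n + 1 })
    pipeline := chain(double, addOne)

    var got []int
    for v := range pipeline(context.Background(), in) {
        got = append(got, v)
    }
    return got
}

func main() {
    fmt.Println(runChain()) // prints: [3 5 7]
}
```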

Time-dependent processors use clockz for deterministic testing — advance time explicitly, verify behavior reproducibly.
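
The idea behind a clock abstraction can be sketched without any dependency. The `clock` interface and `fakeClock` type below are hypothetical and written only for illustration; the actual clockz API may differ. The point is that time moves only when the test advances it, so timer behavior is reproducible.

```go
package main

import (
    "fmt"
    "sync"
    "time"
)

// clock is a hypothetical abstraction; the real clockz API may differ.
type clock interface {
    Now() time.Time
    After(d time.Duration) <-chan time.Time
}

type waiter struct {
    deadline time.Time
    ch       chan time.Time
}

// fakeClock only moves when Advance is called.
type fakeClock struct {
    mu      sync.Mutex
    now     time.Time
    waiters []waiter
}

func newFakeClock(start time.Time) *fakeClock { return &fakeClock{now: start} }

func (c *fakeClock) Now() time.Time {
    c.mu.Lock()
    defer c.mu.Unlock()
    return c.now
}

func (c *fakeClock) After(d time.Duration) <-chan time.Time {
    c.mu.Lock()
    defer c.mu.Unlock()
    ch := make(chan time.Time, 1) // buffered so Advance never blocks
    c.waiters = append(c.waiters, waiter{deadline: c.now.Add(d), ch: ch})
    return ch
}

// Advance moves time forward and fires every waiter whose deadline has passed.
func (c *fakeClock) Advance(d time.Duration) {
    c.mu.Lock()
    defer c.mu.Unlock()
    c.now = c.now.Add(d)
    pending := c.waiters[:0]
    for _, w := range c.waiters {
        if !w.deadline.After(c.now) {
            w.ch <- c.now
        } else {
            pending = append(pending, w)
        }
    }
    c.waiters = pending
}

// fired reports whether the timer channel has a value ready, without blocking.
func fired(ch <-chan time.Time) bool {
    select {
    case <-ch:
        return true
    default:
        return false
    }
}

// demoTimer checks a one-second timer halfway through and at its deadline.
func demoTimer() (halfway, atDeadline bool) {
    fc := newFakeClock(time.Unix(0, 0))
    var c clock = fc
    timeout := c.After(time.Second)

    fc.Advance(500 * time.Millisecond)
    halfway = fired(timeout)

    fc.Advance(500 * time.Millisecond)
    atDeadline = fired(timeout)
    return halfway, atDeadline
}

func main() {
    halfway, atDeadline := demoTimer()
    fmt.Println(halfway, atDeadline) // prints: false true
}
```

A time-dependent processor written against such an interface can take the real clock in production and the fake one in tests, with no sleeps in the test suite.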

Documentation

Full documentation is available in the docs/ directory:

Learn

Guides

Cookbook

  • Recipes — Complete examples for common scenarios

Reference

Contributing

See CONTRIBUTING.md for guidelines. Run make help for available commands.

License

MIT License — see LICENSE for details.