Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 62 additions & 7 deletions adaptive_concurrency.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,18 +33,26 @@ type AdaptiveLimiter struct {
// Decay mechanism
lastDecayUpdate time.Time
decayInterval time.Duration

// Health channel for backpressure signaling
healthCh chan StateEvent
healthChInit bool

// Track previous max for change detection
prevMaxConcurrency int32
}

// NewAdaptiveLimiter creates a new AdaptiveLimiter with default configurations.
func NewAdaptiveLimiter() *AdaptiveLimiter {
return &AdaptiveLimiter{
maxConcurrency: 10, // Initial guess
minConcurrency: 1, // Floor
limitScaling: 0.8, // Multiplicative Decrease factor
threshold: 1.5, // Threshold for increase/decrease
alpha: 0.1, // EWMA smoothing factor
decayInterval: time.Minute,
lastDecayUpdate: time.Now(),
maxConcurrency: 10, // Initial guess
prevMaxConcurrency: 10,
minConcurrency: 1, // Floor
limitScaling: 0.8, // Multiplicative Decrease factor
threshold: 1.5, // Threshold for increase/decrease
alpha: 0.1, // EWMA smoothing factor
decayInterval: time.Minute,
lastDecayUpdate: time.Now(),
}
}

Expand Down Expand Up @@ -101,17 +109,64 @@ func (al *AdaptiveLimiter) update(rtt time.Duration) {
if float64(al.rttEWMA) < float64(al.minBaselineRTT)*al.threshold {
// Additive Increase
atomic.AddInt32(&al.maxConcurrency, 1)
al.checkAndEmitEvent(false)
} else {
// Multiplicative Decrease
newMax := float64(atomic.LoadInt32(&al.maxConcurrency)) * al.limitScaling
if newMax < float64(al.minConcurrency) {
newMax = float64(al.minConcurrency)
}
atomic.StoreInt32(&al.maxConcurrency, int32(newMax))
al.checkAndEmitEvent(true)
}
}

func (al *AdaptiveLimiter) checkAndEmitEvent(decreased bool) {
currentMax := atomic.LoadInt32(&al.maxConcurrency)
prev := al.prevMaxConcurrency

if decreased {
threshold := float64(prev) * 0.5
if float64(currentMax) <= threshold {
al.emitStateEvent(HealthStateDegraded, "capacity decreased")
}
} else {
if currentMax > prev {
al.emitStateEvent(HealthStateHealthy, "capacity increased")
}
}
al.prevMaxConcurrency = currentMax
}

// GetMaxConcurrency returns the current maximum concurrency limit.
func (al *AdaptiveLimiter) GetMaxConcurrency() int32 {
return atomic.LoadInt32(&al.maxConcurrency)
}

// Health returns a channel that emits state change events.
// The channel is created lazily on first access.
// The caller should use a select with default to handle non-blocking receive.
func (al *AdaptiveLimiter) Health() <-chan StateEvent {
al.mu.Lock()
defer al.mu.Unlock()

if !al.healthChInit {
al.healthCh = make(chan StateEvent, 10)
al.healthChInit = true
}
return al.healthCh
}

func (al *AdaptiveLimiter) emitStateEvent(state HealthState, message string) {
if al.healthChInit {
select {
case al.healthCh <- StateEvent{
Component: "adaptive-limiter",
State: state,
Timestamp: time.Now(),
Message: message,
}:
default:
}
}
}
38 changes: 36 additions & 2 deletions circuit/breaker.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
"time"
)

// State represents the current state of the circuit breaker.
// State represents the health state of the circuit breaker.
type State int

const (
Expand Down Expand Up @@ -104,6 +104,10 @@ type Breaker struct {
halfOpenCalls uint64
halfOpenFailures uint64
halfOpenCompleted uint64

// Health channel for backpressure signaling
healthCh chan StateEvent
healthChInit bool
}

// New returns a new Breaker with the provided configuration.
Expand Down Expand Up @@ -217,6 +221,34 @@ func (b *Breaker) Reset() {
b.transitionToClosed()
}

// Health returns a channel that emits state change events.
// The channel is created lazily on first access.
// The caller should use a select with default to handle non-blocking receive.
func (b *Breaker) Health() <-chan StateEvent {
b.mu.Lock()
defer b.mu.Unlock()

if !b.healthChInit {
b.healthCh = make(chan StateEvent, 10)
b.healthChInit = true
}
return b.healthCh
}

func (b *Breaker) emitStateEvent(state HealthState, message string) {
if b.healthChInit {
select {
case b.healthCh <- StateEvent{
Component: "circuit-breaker",
State: state,
Timestamp: time.Now(),
Message: message,
}:
default:
}
}
}

func (b *Breaker) onFailure(state State) {
if state == StateHalfOpen {
b.halfOpenFailures++
Expand Down Expand Up @@ -252,20 +284,22 @@ func (b *Breaker) onSuccess(state State) {
func (b *Breaker) transitionToOpen() {
b.state = StateOpen
b.openedAt = time.Now()
// Clear window stats when opening.
b.clearWindow()
b.emitStateEvent(HealthStateUnhealthy, "circuit opened")
}

func (b *Breaker) transitionToHalfOpen() {
b.state = StateHalfOpen
b.halfOpenCalls = 0
b.halfOpenFailures = 0
b.halfOpenCompleted = 0
b.emitStateEvent(HealthStateDegraded, "circuit half-open")
}

func (b *Breaker) transitionToClosed() {
b.state = StateClosed
b.clearWindow()
b.emitStateEvent(HealthStateHealthy, "circuit closed")
}

func (b *Breaker) clearWindow() {
Expand Down
29 changes: 29 additions & 0 deletions circuit/statevent.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Copyright (c) 2026 Onur Cinar.
// The source code is provided under MIT License.
// https://github.com/cinar/resile

package circuit

import (
"time"
)

// State represents the health state of a resilience component.
type HealthState int

const (
// StateHealthy represents a healthy state where requests are allowed.
HealthStateHealthy HealthState = iota
// StateDegraded represents a degraded but functional state.
HealthStateDegraded
// StateUnhealthy represents an unhealthy state (e.g., circuit open).
HealthStateUnhealthy
)

// StateEvent represents a state change event emitted by resilience components.
type StateEvent struct {
Component string
State HealthState
Timestamp time.Time
Message string
}
167 changes: 167 additions & 0 deletions examples/backpressure/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
// Copyright (c) 2026 Onur Cinar.
// The source code is provided under MIT License.
// https://github.com/cinar/resile

package main

import (
"context"
"errors"
"fmt"
"log"
"os"
"os/signal"
"sync"
"syscall"
"time"

"github.com/cinar/resile"
"github.com/cinar/resile/circuit"
)

func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
go func() {
<-sigCh
fmt.Println("\nShutting down...")
cancel()
}()

fmt.Println("=== Go-Native Backpressure Signaling Example ===")
fmt.Println()

demonstrateCircuitBreakerBackpressure(ctx)
fmt.Println()
demonstrateAdaptiveLimiterBackpressure(ctx)
}

func demonstrateCircuitBreakerBackpressure(ctx context.Context) {
fmt.Println("--- Circuit Breaker Backpressure ---")

cb := circuit.New(circuit.Config{
WindowType: circuit.WindowCountBased,
WindowSize: 5,
FailureRateThreshold: 50.0,
MinimumCalls: 3,
ResetTimeout: 2 * time.Second,
HalfOpenMaxCalls: 2,
})

p := resile.NewPolicy(
resile.WithCircuitBreaker(cb),
resile.WithMaxAttempts(1),
)

healthCh := cb.Health()

action := func(ctx context.Context) error {
return errors.New("service unavailable")
}

circuitBreakerWorker(ctx, p, healthCh, action, 10)
}

func demonstrateAdaptiveLimiterBackpressure(ctx context.Context) {
fmt.Println("--- Adaptive Limiter Backpressure ---")

al := resile.NewAdaptiveLimiter()

p := resile.NewPolicy(
resile.WithAdaptiveLimiterInstance(al),
)

healthCh := al.Health()

action := func(ctx context.Context) error {
time.Sleep(50 * time.Millisecond)
return nil
}

limiterWorker(ctx, p, healthCh, action, 20)
}

func circuitBreakerWorker(ctx context.Context, p *resile.Policy, healthCh <-chan circuit.StateEvent, action func(context.Context) error, iterations int) {
var wg sync.WaitGroup
paused := false

for i := 0; i < iterations; i++ {
select {
case <-ctx.Done():
return
case event := <-healthCh:
switch event.State {
case circuit.HealthStateUnhealthy:
fmt.Printf(" [!] Backpressure: Circuit OPEN - pausing workers\n")
paused = true
case circuit.HealthStateDegraded:
fmt.Printf(" [!] Backpressure: Circuit HALF-OPEN - limiting probes\n")
paused = false
case circuit.HealthStateHealthy:
fmt.Printf(" [!] Backpressure: Circuit CLOSED - resuming workers\n")
paused = false
}
fmt.Printf(" Event: %s: %s\n", event.Component, event.Message)
default:
}

if paused {
fmt.Println(" [.] Worker paused, waiting for recovery...")
time.Sleep(100 * time.Millisecond)
continue
}

wg.Add(1)
go func(id int) {
defer wg.Done()
err := p.DoErr(ctx, action)
if err != nil {
log.Printf("Worker %d: error: %v", id, err)
} else if id%5 == 0 {
fmt.Printf("Worker %d: processed\n", id)
}
}(i)

time.Sleep(50 * time.Millisecond)
}

wg.Wait()
}

func limiterWorker(ctx context.Context, p *resile.Policy, healthCh <-chan resile.StateEvent, action func(context.Context) error, iterations int) {
var wg sync.WaitGroup

for i := 0; i < iterations; i++ {
select {
case <-ctx.Done():
return
case event := <-healthCh:
switch event.State {
case resile.HealthStateDegraded:
fmt.Printf(" [!] Backpressure: Capacity DEGRADED - slowing down\n")
case resile.HealthStateHealthy:
fmt.Printf(" [!] Backpressure: Capacity HEALTHY - normal processing\n")
}
fmt.Printf(" Event: %s: %s\n", event.Component, event.Message)
default:
}

wg.Add(1)
go func(id int) {
defer wg.Done()
err := p.DoErr(ctx, action)
if err != nil {
log.Printf("Worker %d: error: %v", id, err)
} else if id%5 == 0 {
fmt.Printf("Worker %d: processed\n", id)
}
}(i)

time.Sleep(30 * time.Millisecond)
}

wg.Wait()
}
Loading
Loading