diff --git a/adaptive_concurrency.go b/adaptive_concurrency.go index 5c00d55..d907885 100644 --- a/adaptive_concurrency.go +++ b/adaptive_concurrency.go @@ -33,18 +33,26 @@ type AdaptiveLimiter struct { // Decay mechanism lastDecayUpdate time.Time decayInterval time.Duration + + // Health channel for backpressure signaling + healthCh chan StateEvent + healthChInit bool + + // Track previous max for change detection + prevMaxConcurrency int32 } // NewAdaptiveLimiter creates a new AdaptiveLimiter with default configurations. func NewAdaptiveLimiter() *AdaptiveLimiter { return &AdaptiveLimiter{ - maxConcurrency: 10, // Initial guess - minConcurrency: 1, // Floor - limitScaling: 0.8, // Multiplicative Decrease factor - threshold: 1.5, // Threshold for increase/decrease - alpha: 0.1, // EWMA smoothing factor - decayInterval: time.Minute, - lastDecayUpdate: time.Now(), + maxConcurrency: 10, // Initial guess + prevMaxConcurrency: 10, + minConcurrency: 1, // Floor + limitScaling: 0.8, // Multiplicative Decrease factor + threshold: 1.5, // Threshold for increase/decrease + alpha: 0.1, // EWMA smoothing factor + decayInterval: time.Minute, + lastDecayUpdate: time.Now(), } } @@ -101,6 +109,7 @@ func (al *AdaptiveLimiter) update(rtt time.Duration) { if float64(al.rttEWMA) < float64(al.minBaselineRTT)*al.threshold { // Additive Increase atomic.AddInt32(&al.maxConcurrency, 1) + al.checkAndEmitEvent(false) } else { // Multiplicative Decrease newMax := float64(atomic.LoadInt32(&al.maxConcurrency)) * al.limitScaling @@ -108,10 +117,56 @@ func (al *AdaptiveLimiter) update(rtt time.Duration) { newMax = float64(al.minConcurrency) } atomic.StoreInt32(&al.maxConcurrency, int32(newMax)) + al.checkAndEmitEvent(true) + } +} + +func (al *AdaptiveLimiter) checkAndEmitEvent(decreased bool) { + currentMax := atomic.LoadInt32(&al.maxConcurrency) + prev := al.prevMaxConcurrency + + if decreased { + threshold := float64(prev) * 0.5 + if float64(currentMax) <= threshold { + al.emitStateEvent(HealthStateDegraded, "capacity decreased") + } + } else { + if currentMax > prev { + al.emitStateEvent(HealthStateHealthy, "capacity increased") + } } + al.prevMaxConcurrency = currentMax } // GetMaxConcurrency returns the current maximum concurrency limit. func (al *AdaptiveLimiter) GetMaxConcurrency() int32 { return atomic.LoadInt32(&al.maxConcurrency) } + +// Health returns a channel that emits state change events. +// The channel is created lazily on first access. +// The caller should use a select with default to handle non-blocking receive. +func (al *AdaptiveLimiter) Health() <-chan StateEvent { + al.mu.Lock() + defer al.mu.Unlock() + + if !al.healthChInit { + al.healthCh = make(chan StateEvent, 10) + al.healthChInit = true + } + return al.healthCh +} + +func (al *AdaptiveLimiter) emitStateEvent(state HealthState, message string) { + if al.healthChInit { + select { + case al.healthCh <- StateEvent{ + Component: "adaptive-limiter", + State: state, + Timestamp: time.Now(), + Message: message, + }: + default: + } + } +} diff --git a/circuit/breaker.go b/circuit/breaker.go index f5ff593..14491d0 100644 --- a/circuit/breaker.go +++ b/circuit/breaker.go @@ -11,7 +11,7 @@ import ( "time" ) -// State represents the current state of the circuit breaker. +// State represents the health state of the circuit breaker. type State int const ( @@ -104,6 +104,10 @@ type Breaker struct { halfOpenCalls uint64 halfOpenFailures uint64 halfOpenCompleted uint64 + + // Health channel for backpressure signaling + healthCh chan StateEvent + healthChInit bool } // New returns a new Breaker with the provided configuration. @@ -217,6 +221,34 @@ func (b *Breaker) Reset() { b.transitionToClosed() } +// Health returns a channel that emits state change events. +// The channel is created lazily on first access. +// The caller should use a select with default to handle non-blocking receive. +func (b *Breaker) Health() <-chan StateEvent { + b.mu.Lock() + defer b.mu.Unlock() + + if !b.healthChInit { + b.healthCh = make(chan StateEvent, 10) + b.healthChInit = true + } + return b.healthCh +} + +func (b *Breaker) emitStateEvent(state HealthState, message string) { + if b.healthChInit { + select { + case b.healthCh <- StateEvent{ + Component: "circuit-breaker", + State: state, + Timestamp: time.Now(), + Message: message, + }: + default: + } + } +} + func (b *Breaker) onFailure(state State) { if state == StateHalfOpen { b.halfOpenFailures++ @@ -252,8 +284,8 @@ func (b *Breaker) onSuccess(state State) { func (b *Breaker) transitionToOpen() { b.state = StateOpen b.openedAt = time.Now() - // Clear window stats when opening. b.clearWindow() + b.emitStateEvent(HealthStateUnhealthy, "circuit opened") } func (b *Breaker) transitionToHalfOpen() { @@ -261,11 +293,13 @@ func (b *Breaker) transitionToHalfOpen() { b.halfOpenCalls = 0 b.halfOpenFailures = 0 b.halfOpenCompleted = 0 + b.emitStateEvent(HealthStateDegraded, "circuit half-open") } func (b *Breaker) transitionToClosed() { b.state = StateClosed b.clearWindow() + b.emitStateEvent(HealthStateHealthy, "circuit closed") } func (b *Breaker) clearWindow() { diff --git a/circuit/statevent.go b/circuit/statevent.go new file mode 100644 index 0000000..f16ba87 --- /dev/null +++ b/circuit/statevent.go @@ -0,0 +1,29 @@ +// Copyright (c) 2026 Onur Cinar. +// The source code is provided under MIT License. +// https://github.com/cinar/resile + +package circuit + +import ( + "time" +) + +// State represents the health state of a resilience component. +type HealthState int + +const ( + // StateHealthy represents a healthy state where requests are allowed. + HealthStateHealthy HealthState = iota + // StateDegraded represents a degraded but functional state. + HealthStateDegraded + // StateUnhealthy represents an unhealthy state (e.g., circuit open). + HealthStateUnhealthy +) + +// StateEvent represents a state change event emitted by resilience components. +type StateEvent struct { + Component string + State HealthState + Timestamp time.Time + Message string +} diff --git a/examples/backpressure/main.go b/examples/backpressure/main.go new file mode 100644 index 0000000..6e452b5 --- /dev/null +++ b/examples/backpressure/main.go @@ -0,0 +1,167 @@ +// Copyright (c) 2026 Onur Cinar. +// The source code is provided under MIT License. +// https://github.com/cinar/resile + +package main + +import ( + "context" + "errors" + "fmt" + "log" + "os" + "os/signal" + "sync" + "syscall" + "time" + + "github.com/cinar/resile" + "github.com/cinar/resile/circuit" +) + +func main() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + sigCh := make(chan os.Signal, 1) + signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM) + go func() { + <-sigCh + fmt.Println("\nShutting down...") + cancel() + }() + + fmt.Println("=== Go-Native Backpressure Signaling Example ===") + fmt.Println() + + demonstrateCircuitBreakerBackpressure(ctx) + fmt.Println() + demonstrateAdaptiveLimiterBackpressure(ctx) +} + +func demonstrateCircuitBreakerBackpressure(ctx context.Context) { + fmt.Println("--- Circuit Breaker Backpressure ---") + + cb := circuit.New(circuit.Config{ + WindowType: circuit.WindowCountBased, + WindowSize: 5, + FailureRateThreshold: 50.0, + MinimumCalls: 3, + ResetTimeout: 2 * time.Second, + HalfOpenMaxCalls: 2, + }) + + p := resile.NewPolicy( + resile.WithCircuitBreaker(cb), + resile.WithMaxAttempts(1), + ) + + healthCh := cb.Health() + + action := func(ctx context.Context) error { + return errors.New("service unavailable") + } + + circuitBreakerWorker(ctx, p, healthCh, action, 10) +} + +func demonstrateAdaptiveLimiterBackpressure(ctx context.Context) { + fmt.Println("--- Adaptive Limiter Backpressure ---") + + al := resile.NewAdaptiveLimiter() + + p := resile.NewPolicy( + resile.WithAdaptiveLimiterInstance(al), + ) + + healthCh := al.Health() + + action := func(ctx context.Context) error { + time.Sleep(50 * time.Millisecond) + return nil + } + + limiterWorker(ctx, p, healthCh, action, 20) +} + +func circuitBreakerWorker(ctx context.Context, p *resile.Policy, healthCh <-chan circuit.StateEvent, action func(context.Context) error, iterations int) { + var wg sync.WaitGroup + paused := false + + for i := 0; i < iterations; i++ { + select { + case <-ctx.Done(): + return + case event := <-healthCh: + switch event.State { + case circuit.HealthStateUnhealthy: + fmt.Printf(" [!] Backpressure: Circuit OPEN - pausing workers\n") + paused = true + case circuit.HealthStateDegraded: + fmt.Printf(" [!] Backpressure: Circuit HALF-OPEN - limiting probes\n") + paused = false + case circuit.HealthStateHealthy: + fmt.Printf(" [!] Backpressure: Circuit CLOSED - resuming workers\n") + paused = false + } + fmt.Printf(" Event: %s: %s\n", event.Component, event.Message) + default: + } + + if paused { + fmt.Println(" [.] Worker paused, waiting for recovery...") + time.Sleep(100 * time.Millisecond) + continue + } + + wg.Add(1) + go func(id int) { + defer wg.Done() + err := p.DoErr(ctx, action) + if err != nil { + log.Printf("Worker %d: error: %v", id, err) + } else if id%5 == 0 { + fmt.Printf("Worker %d: processed\n", id) + } + }(i) + + time.Sleep(50 * time.Millisecond) + } + + wg.Wait() +} + +func limiterWorker(ctx context.Context, p *resile.Policy, healthCh <-chan resile.StateEvent, action func(context.Context) error, iterations int) { + var wg sync.WaitGroup + + for i := 0; i < iterations; i++ { + select { + case <-ctx.Done(): + return + case event := <-healthCh: + switch event.State { + case resile.HealthStateDegraded: + fmt.Printf(" [!] Backpressure: Capacity DEGRADED - slowing down\n") + case resile.HealthStateHealthy: + fmt.Printf(" [!] Backpressure: Capacity HEALTHY - normal processing\n") + } + fmt.Printf(" Event: %s: %s\n", event.Component, event.Message) + default: + } + + wg.Add(1) + go func(id int) { + defer wg.Done() + err := p.DoErr(ctx, action) + if err != nil { + log.Printf("Worker %d: error: %v", id, err) + } else if id%5 == 0 { + fmt.Printf("Worker %d: processed\n", id) + } + }(i) + + time.Sleep(30 * time.Millisecond) + } + + wg.Wait() +} diff --git a/policy.go b/policy.go index 3ad5970..9f7163e 100644 --- a/policy.go +++ b/policy.go @@ -8,6 +8,8 @@ import ( "context" "errors" "time" + + "github.com/cinar/resile/circuit" ) // fatalError is a private wrapper to indicate an error should not be retried. @@ -120,5 +122,17 @@ func (p *Policy) DoErr(ctx context.Context, action func(context.Context) error) return p.config.execute(ctx, nil, action) } +// Health returns a channel that emits state change events from the underlying resilience components. +// It returns nil if no health-reporting component is configured. +// The caller should use a select with default to handle non-blocking receive. +// Note: Only circuit breaker health events are exposed through Policy.Health(). +// For adaptive limiter events, access the limiter directly via AdaptiveLimiter.Health(). +func (p *Policy) Health() <-chan circuit.StateEvent { + if p.config.CircuitBreaker != nil { + return p.config.CircuitBreaker.Health() + } + return nil +} + // middleware defines a function that wraps a doAction with additional resilience logic. type middleware func(doAction) doAction diff --git a/statevent.go b/statevent.go new file mode 100644 index 0000000..edd3b76 --- /dev/null +++ b/statevent.go @@ -0,0 +1,29 @@ +// Copyright (c) 2026 Onur Cinar. +// The source code is provided under MIT License. +// https://github.com/cinar/resile + +package resile + +import ( + "time" +) + +// HealthState represents the health state of a resilience component. +type HealthState int + +const ( + // HealthStateHealthy represents a healthy state where requests are allowed. + HealthStateHealthy HealthState = iota + // HealthStateDegraded represents a degraded but functional state. + HealthStateDegraded + // HealthStateUnhealthy represents an unhealthy state (e.g., circuit open). + HealthStateUnhealthy +) + +// StateEvent represents a state change event emitted by resilience components. +type StateEvent struct { + Component string + State HealthState + Timestamp time.Time + Message string +}