From 4f46233201258cabc564890984807eaaa5ee118e Mon Sep 17 00:00:00 2001 From: Alexandr Rutkowski Date: Sun, 12 Apr 2026 12:05:49 +0200 Subject: [PATCH 01/10] Implement hystrix adaptive closer --- closers/hystrix-adaptive/adaptive.go | 213 ++++++++++++++++++++++ closers/hystrix-adaptive/adaptive_test.go | 90 +++++++++ closers/hystrix-adaptive/config.go | 80 ++++++++ closers/hystrix-adaptive/doc.go | 4 + 4 files changed, 387 insertions(+) create mode 100644 closers/hystrix-adaptive/adaptive.go create mode 100644 closers/hystrix-adaptive/adaptive_test.go create mode 100644 closers/hystrix-adaptive/config.go create mode 100644 closers/hystrix-adaptive/doc.go diff --git a/closers/hystrix-adaptive/adaptive.go b/closers/hystrix-adaptive/adaptive.go new file mode 100644 index 0000000..1ac201e --- /dev/null +++ b/closers/hystrix-adaptive/adaptive.go @@ -0,0 +1,213 @@ +package hystrixadaptive + +import ( + "context" + "encoding/json" + "sync" + "time" + + "github.com/cep21/circuit/v4" + "github.com/cep21/circuit/v4/closers/hystrix" + "github.com/cep21/circuit/v4/faststats" +) + +// AdaptiveOpener composes *hystrix.Opener and overrides ShouldOpen to avoid opening when +// recent failures are mostly timeouts during elevated latency headroom. +type AdaptiveOpener struct { + Opener *hystrix.Opener + + mu sync.Mutex + config ConfigureAdaptive + + // extra is added to BaselineLatency when deciding if a success was "slow" and for ShouldOpen. + extra time.Duration + + timeoutCount faststats.RollingCounter + failureCount faststats.RollingCounter +} + +var _ circuit.ClosedToOpen = (*AdaptiveOpener)(nil) + +// OpenerFactory returns a ClosedToOpen factory that wraps hystrix.OpenerFactory. +func OpenerFactory(config ConfigureAdaptive) func() circuit.ClosedToOpen { + return func() circuit.ClosedToOpen { + cfg := config + cfg.Merge(defaultConfigureAdaptive) + inner := hystrix.OpenerFactory(cfg.ConfigureOpener)().(*hystrix.Opener) + a := &AdaptiveOpener{Opener: inner} + a.setConfigNotThreadSafeLocked(cfg) + return a + } +} + +// ShouldOpen delegates to the Hystrix opener, then may suppress opening when headroom is +// non-zero and rolling failures are predominantly timeouts (ambient slowness). +func (a *AdaptiveOpener) ShouldOpen(ctx context.Context, now time.Time) bool { + if !a.Opener.ShouldOpen(ctx, now) { + return false + } + a.mu.Lock() + defer a.mu.Unlock() + if a.extra <= 0 { + return true + } + t := a.timeoutCount.RollingSumAt(now) + f := a.failureCount.RollingSumAt(now) + if t+f == 0 { + return true + } + ratio := float64(t) / float64(t+f) + if ratio >= a.config.MinTimeoutRatioToDefer { + return false + } + return true +} + +func (a *AdaptiveOpener) Prevent(ctx context.Context, now time.Time) bool { + return a.Opener.Prevent(ctx, now) +} + +func (a *AdaptiveOpener) Closed(ctx context.Context, now time.Time) { + a.Opener.Closed(ctx, now) + a.resetAdaptive(now) +} + +func (a *AdaptiveOpener) Opened(ctx context.Context, now time.Time) { + a.Opener.Opened(ctx, now) + a.resetAdaptive(now) +} + +func (a *AdaptiveOpener) Success(ctx context.Context, now time.Time, d time.Duration) { + a.Opener.Success(ctx, now, d) + a.mu.Lock() + defer a.mu.Unlock() + a.adjustExtraOnSuccessLocked(d) +} + +func (a *AdaptiveOpener) ErrBadRequest(ctx context.Context, now time.Time, d time.Duration) { + a.Opener.ErrBadRequest(ctx, now, d) +} + +func (a *AdaptiveOpener) ErrInterrupt(ctx context.Context, now time.Time, d time.Duration) { + a.Opener.ErrInterrupt(ctx, now, d) +} + +func (a *AdaptiveOpener) ErrFailure(ctx context.Context, now time.Time, d time.Duration) { + a.Opener.ErrFailure(ctx, now, d) + a.mu.Lock() + defer a.mu.Unlock() + a.failureCount.Inc(now) +} + +func (a *AdaptiveOpener) ErrTimeout(ctx context.Context, now time.Time, d time.Duration) { + a.Opener.ErrTimeout(ctx, now, d) + a.mu.Lock() + defer a.mu.Unlock() + a.timeoutCount.Inc(now) + a.bumpExtraLocked() +} + +func (a *AdaptiveOpener) ErrConcurrencyLimitReject(ctx context.Context, now time.Time) { + a.Opener.ErrConcurrencyLimitReject(ctx, now) +} + +func (a *AdaptiveOpener) ErrShortCircuit(ctx context.Context, now time.Time) { + a.Opener.ErrShortCircuit(ctx, now) +} + +func (a *AdaptiveOpener) adjustExtraOnSuccessLocked(d time.Duration) { + base := a.config.BaselineLatency + maxE := a.config.MaxExtraLatency + effectiveSlow := base + a.extra + switch { + case d > effectiveSlow: + a.extra += a.config.IncreaseExtra + if a.extra > maxE { + a.extra = maxE + } + case d < base: + a.extra -= a.config.DecreaseExtra + if a.extra < 0 { + a.extra = 0 + } + } +} + +func (a *AdaptiveOpener) bumpExtraLocked() { + maxE := a.config.MaxExtraLatency + a.extra += a.config.IncreaseExtra + if a.extra > maxE { + a.extra = maxE + } +} + +func (a *AdaptiveOpener) resetAdaptive(now time.Time) { + a.mu.Lock() + defer a.mu.Unlock() + a.extra = 0 + a.timeoutCount.Reset(now) + a.failureCount.Reset(now) +} + +// SetConfigThreadSafe updates hystrix opener fields from ConfigureOpener. +func (a *AdaptiveOpener) SetConfigThreadSafe(props ConfigureAdaptive) { + props.Merge(defaultConfigureAdaptive) + a.mu.Lock() + a.config = props + a.mu.Unlock() + a.Opener.SetConfigThreadSafe(props.ConfigureOpener) +} + +// SetConfigNotThreadSafe reinitializes rolling windows for the adaptive split counters. +func (a *AdaptiveOpener) SetConfigNotThreadSafe(props ConfigureAdaptive) { + a.setConfigNotThreadSafeLocked(props) +} + +func (a *AdaptiveOpener) setConfigNotThreadSafeLocked(props ConfigureAdaptive) { + props.Merge(defaultConfigureAdaptive) + a.mu.Lock() + a.config = props + a.extra = 0 + a.mu.Unlock() + a.Opener.SetConfigNotThreadSafe(props.ConfigureOpener) + ho := props.ConfigureOpener + nowFn := ho.Now + if nowFn == nil { + nowFn = time.Now + } + t := nowFn() + rollingCounterBucketWidth := time.Duration(ho.RollingDuration.Nanoseconds() / int64(ho.NumBuckets)) + a.mu.Lock() + a.timeoutCount = faststats.NewRollingCounter(rollingCounterBucketWidth, ho.NumBuckets, t) + a.failureCount = faststats.NewRollingCounter(rollingCounterBucketWidth, ho.NumBuckets, t) + a.mu.Unlock() +} + +// Config returns the merged adaptive configuration (including embedded hystrix opener config). +func (a *AdaptiveOpener) Config() ConfigureAdaptive { + a.mu.Lock() + defer a.mu.Unlock() + return a.config +} + +// ExtraLatency returns the current adaptive headroom on top of BaselineLatency. +func (a *AdaptiveOpener) ExtraLatency() time.Duration { + a.mu.Lock() + defer a.mu.Unlock() + return a.extra +} + +// MarshalJSON exposes opener state for debugging. +func (a *AdaptiveOpener) MarshalJSON() ([]byte, error) { + a.mu.Lock() + defer a.mu.Unlock() + return json.Marshal(map[string]interface{}{ + "hystrix": a.Opener, + "config": a.config, + "extra_latency_ns": int64(a.extra), + "timeouts": &a.timeoutCount, + "failures": &a.failureCount, + }) +} + +var _ json.Marshaler = (*AdaptiveOpener)(nil) diff --git a/closers/hystrix-adaptive/adaptive_test.go b/closers/hystrix-adaptive/adaptive_test.go new file mode 100644 index 0000000..3ccfb7d --- /dev/null +++ b/closers/hystrix-adaptive/adaptive_test.go @@ -0,0 +1,90 @@ +package hystrixadaptive + +import ( + "context" + "strings" + "testing" + "time" + + "github.com/cep21/circuit/v4/closers/hystrix" +) + +func TestAdaptiveOpener_TimeoutHeavyDefersOpen(t *testing.T) { + ctx := context.Background() + o := OpenerFactory(ConfigureAdaptive{ + ConfigureOpener: hystrix.ConfigureOpener{ + RequestVolumeThreshold: 3, + ErrorThresholdPercentage: 50, + NumBuckets: 10, + RollingDuration: 10 * time.Second, + }, + MinTimeoutRatioToDefer: 0.85, + })().(*AdaptiveOpener) + // Timestamps must be >= rolling window StartTime (set when the opener is constructed). + now := time.Now() + + if o.ShouldOpen(ctx, now) { + t.Fatal("should not open with no traffic") + } + for i := 0; i < 3; i++ { + o.ErrTimeout(ctx, now, 100*time.Millisecond) + } + if o.ExtraLatency() <= 0 { + t.Fatal("expected non-zero extra headroom after timeouts") + } + if !o.Opener.ShouldOpen(ctx, now) { + t.Fatal("inner hystrix opener should want to open") + } + if o.ShouldOpen(ctx, now) { + t.Fatal("adaptive layer should defer open when failures are mostly timeouts with headroom") + } +} + +func TestAdaptiveOpener_FailuresStillOpen(t *testing.T) { + ctx := context.Background() + o := OpenerFactory(ConfigureAdaptive{ + ConfigureOpener: hystrix.ConfigureOpener{ + RequestVolumeThreshold: 3, + ErrorThresholdPercentage: 50, + NumBuckets: 10, + RollingDuration: 10 * time.Second, + }, + })().(*AdaptiveOpener) + now := time.Now() + + for i := 0; i < 3; i++ { + o.ErrFailure(ctx, now, time.Millisecond) + } + if !o.ShouldOpen(ctx, now) { + t.Fatal("expected open when rolling window is failure-dominated") + } +} + +func TestAdaptiveOpener_MarshalJSON(t *testing.T) { + o := OpenerFactory(ConfigureAdaptive{})().(*AdaptiveOpener) + ctx := context.Background() + now := time.Now() + o.ErrTimeout(ctx, now, time.Second) + b, err := o.MarshalJSON() + if err != nil { + t.Fatal(err) + } + if !strings.Contains(string(b), "timeouts") { + t.Fatalf("expected timeouts in json: %s", b) + } +} + +func TestFactoryConfigure(t *testing.T) { + f := Factory{ + ConfigureAdaptive: ConfigureAdaptive{ + ConfigureOpener: hystrix.ConfigureOpener{ + RequestVolumeThreshold: 7, + }, + }, + } + cfg := f.Configure("x") + ao := cfg.General.ClosedToOpenFactory().(*AdaptiveOpener) + if ao.Config().ConfigureOpener.RequestVolumeThreshold != 7 { + t.Fatalf("got threshold %d", ao.Config().ConfigureOpener.RequestVolumeThreshold) + } +} diff --git a/closers/hystrix-adaptive/config.go b/closers/hystrix-adaptive/config.go new file mode 100644 index 0000000..8cdeaaf --- /dev/null +++ b/closers/hystrix-adaptive/config.go @@ -0,0 +1,80 @@ +package hystrixadaptive + +import ( + "time" + + "github.com/cep21/circuit/v4" + "github.com/cep21/circuit/v4/closers/hystrix" +) + +// ConfigureAdaptive configures the adaptive opener and embeds hystrix.ConfigureOpener. +type ConfigureAdaptive struct { + hystrix.ConfigureOpener + + // BaselineLatency is the expected healthy latency (e.g. matches a strict timeout budget + // before adaptive headroom is applied). + BaselineLatency time.Duration + // MaxExtraLatency caps how much additional headroom can accumulate (e.g. 200ms above baseline). + MaxExtraLatency time.Duration + // IncreaseExtra is added to the headroom when a run is slower than baseline+headroom or on timeout. + IncreaseExtra time.Duration + // DecreaseExtra is subtracted from headroom on fast successes (duration below BaselineLatency). + DecreaseExtra time.Duration + // MinTimeoutRatioToDefer is the minimum rolling ratio of timeouts to (timeouts+failures) + // required before ShouldOpen defers to the inner opener when headroom is non-zero. + MinTimeoutRatioToDefer float64 +} + +// Merge fills zero values from other. +func (c *ConfigureAdaptive) Merge(other ConfigureAdaptive) { + c.ConfigureOpener.Merge(other.ConfigureOpener) + if c.BaselineLatency == 0 { + c.BaselineLatency = other.BaselineLatency + } + if c.MaxExtraLatency == 0 { + c.MaxExtraLatency = other.MaxExtraLatency + } + if c.IncreaseExtra == 0 { + c.IncreaseExtra = other.IncreaseExtra + } + if c.DecreaseExtra == 0 { + c.DecreaseExtra = other.DecreaseExtra + } + if c.MinTimeoutRatioToDefer == 0 { + c.MinTimeoutRatioToDefer = other.MinTimeoutRatioToDefer + } +} + +var defaultConfigureAdaptive = ConfigureAdaptive{ + ConfigureOpener: hystrix.ConfigureOpener{ + RequestVolumeThreshold: 20, + ErrorThresholdPercentage: 50, + Now: time.Now, + NumBuckets: 10, + RollingDuration: 10 * time.Second, + }, + BaselineLatency: 100 * time.Millisecond, + MaxExtraLatency: 200 * time.Millisecond, + IncreaseExtra: 10 * time.Millisecond, + DecreaseExtra: 10 * time.Millisecond, + MinTimeoutRatioToDefer: 0.85, +} + +// Factory builds circuit configs that use the adaptive opener with optional hystrix closer wiring. +type Factory struct { + hystrix.Factory + ConfigureAdaptive ConfigureAdaptive + CreateConfigureAdaptive []func(circuitName string) ConfigureAdaptive +} + +// Configure returns a circuit.Config with adaptive ClosedToOpen and hystrix OpenToClosed. +func (f *Factory) Configure(circuitName string) circuit.Config { + cfg := f.Factory.Configure(circuitName) + final := ConfigureAdaptive{} + for i := len(f.CreateConfigureAdaptive) - 1; i >= 0; i-- { + final.Merge(f.CreateConfigureAdaptive[i](circuitName)) + } + final.Merge(f.ConfigureAdaptive) + cfg.General.ClosedToOpenFactory = OpenerFactory(final) + return cfg +} diff --git a/closers/hystrix-adaptive/doc.go b/closers/hystrix-adaptive/doc.go new file mode 100644 index 0000000..7dc01dd --- /dev/null +++ b/closers/hystrix-adaptive/doc.go @@ -0,0 +1,4 @@ +// Package hystrixadaptive provides a ClosedToOpen implementation that composes the +// standard Hystrix opener and adapts open decisions when failures are dominated by +// timeouts while extra latency headroom is elevated (ambient slowness). +package hystrixadaptive From 6dcdcebdb48d9d205fbefbfd724e6dd7e5a506a6 Mon Sep 17 00:00:00 2001 From: Alexandr Rutkowski Date: Sun, 12 Apr 2026 12:08:02 +0200 Subject: [PATCH 02/10] Update README.md to show how to use hystrix adaptive --- README.md | 41 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 41 insertions(+) diff --git a/README.md b/README.md index af67575..91c3ba6 100644 --- a/README.md +++ b/README.md @@ -142,6 +142,47 @@ fmt.Println("This is a hystrix configured circuit", c.Name()) // Output: This is a hystrix configured circuit hystrix-circuit ``` +## Adaptive Hystrix opener (`closers/hystrix-adaptive`) + +The [hystrix](https://pkg.go.dev/github.com/cep21/circuit/v4/closers/hystrix) opener opens the circuit when error rate and volume pass configured thresholds. If the whole dependency slows down a little (for example every call hits your execution timeout), that can look like 100% errors even though the system is only degraded. The **`hystrixadaptive`** package composes the same `*hystrix.Opener` and makes **`ShouldOpen`** adaptive: it tracks extra latency **headroom** (min / max / step size on success and timeout) and, when headroom is non-zero, can **defer opening** while recent failures are mostly **timeouts** rather than application failures. + +This does **not** change **`Execution.Timeout`** on the circuit. You still set the run deadline in `circuit.ExecutionConfig` (and may raise it toward `BaselineLatency + MaxExtraLatency` by other means if you want slow requests to complete). The adaptive opener only changes when the breaker **opens** given the same Hystrix rolling error metrics. + +Use **`hystrixadaptive.Factory`** like **`hystrix.Factory`**: embed **`hystrix.Factory`** for the closer (and optional base opener defaults), and set **`ConfigureAdaptive`** for adaptive fields. The package name is **`hystrixadaptive`** (`import "github.com/cep21/circuit/v4/closers/hystrix-adaptive"`). + +```go +configuration := hystrixadaptive.Factory{ + Factory: hystrix.Factory{ + ConfigureCloser: hystrix.ConfigureCloser{ + // Same half-open / sleep window behavior as plain Hystrix + }, + }, + ConfigureAdaptive: hystrixadaptive.ConfigureAdaptive{ + ConfigureOpener: hystrix.ConfigureOpener{ + RequestVolumeThreshold: 10, + }, + // Expected healthy latency; "fast" successes decay headroom below this. + BaselineLatency: 100 * time.Millisecond, + // Cap on accumulated headroom (e.g. allow up to baseline + 200ms conceptually). + MaxExtraLatency: 200 * time.Millisecond, + // Added on each timeout and on successes slower than baseline+headroom. + IncreaseExtra: 10 * time.Millisecond, + // Subtracted when a success finishes faster than BaselineLatency. + DecreaseExtra: 10 * time.Millisecond, + // While headroom > 0, defer opening if timeouts/(timeouts+failures) is at least this ratio. + MinTimeoutRatioToDefer: 0.85, + }, +} +h := circuit.Manager{ + DefaultCircuitProperties: []circuit.CommandPropertiesConstructor{configuration.Configure}, +} +c := h.MustCreateCircuit("adaptive-hystrix") +fmt.Println("This circuit uses the adaptive Hystrix opener", c.Name()) +// Output: This circuit uses the adaptive Hystrix opener adaptive-hystrix +``` + +To wire only the opener (for example with a custom closer), use **`hystrixadaptive.OpenerFactory`** with **`hystrixadaptive.ConfigureAdaptive`** (which embeds **`hystrix.ConfigureOpener`**). At runtime you can type-assert **`ClosedToOpen`** to **`*hystrixadaptive.AdaptiveOpener`** and call **`ExtraLatency()`** or inspect **`Opener`** (`*hystrix.Opener`) if you need the inner metrics. + ## [Enable dashboard metrics](https://godoc.org/github.com/cep21/circuit/metriceventstream#example-MetricEventStream) Dashboard metrics can be enabled with the MetricEventStream object. This example creates an event stream handler, From 22822b285fb18f521409cefd9d9ee15a2ed07eea Mon Sep 17 00:00:00 2001 From: Alexandr Rutkowski Date: Sun, 12 Apr 2026 12:13:26 +0200 Subject: [PATCH 03/10] Add more tests to check how hystrix adaptive add extra latency --- closers/hystrix-adaptive/adaptive_test.go | 103 ++++++++++++++++++++++ 1 file changed, 103 insertions(+) diff --git a/closers/hystrix-adaptive/adaptive_test.go b/closers/hystrix-adaptive/adaptive_test.go index 3ccfb7d..f364df3 100644 --- a/closers/hystrix-adaptive/adaptive_test.go +++ b/closers/hystrix-adaptive/adaptive_test.go @@ -6,6 +6,7 @@ import ( "testing" "time" + "github.com/cep21/circuit/v4" "github.com/cep21/circuit/v4/closers/hystrix" ) @@ -88,3 +89,105 @@ func TestFactoryConfigure(t *testing.T) { t.Fatalf("got threshold %d", ao.Config().ConfigureOpener.RequestVolumeThreshold) } } + +func TestAdaptiveOpener_FastSuccessClearsExtraHeadroom(t *testing.T) { + ctx := context.Background() + o := OpenerFactory(ConfigureAdaptive{ + ConfigureOpener: hystrix.ConfigureOpener{ + RequestVolumeThreshold: 3, + ErrorThresholdPercentage: 50, + NumBuckets: 10, + RollingDuration: 10 * time.Second, + }, + BaselineLatency: 100 * time.Millisecond, + IncreaseExtra: 10 * time.Millisecond, + DecreaseExtra: 10 * time.Millisecond, + MaxExtraLatency: 200 * time.Millisecond, + })().(*AdaptiveOpener) + now := time.Now() + + o.ErrTimeout(ctx, now, 100*time.Millisecond) + if got := o.ExtraLatency(); got != 10*time.Millisecond { + t.Fatalf("after one timeout, extra = %v, want 10ms", got) + } + o.Success(ctx, now, 5*time.Millisecond) + if got := o.ExtraLatency(); got != 0 { + t.Fatalf("after fast success below baseline, extra = %v, want 0", got) + } +} + +func TestAdaptiveOpener_ClosedResetsAdaptiveState(t *testing.T) { + ctx := context.Background() + o := OpenerFactory(ConfigureAdaptive{ + ConfigureOpener: hystrix.ConfigureOpener{ + RequestVolumeThreshold: 3, + ErrorThresholdPercentage: 50, + NumBuckets: 10, + RollingDuration: 10 * time.Second, + }, + })().(*AdaptiveOpener) + now := time.Now() + o.ErrTimeout(ctx, now, time.Millisecond) + if o.ExtraLatency() <= 0 { + t.Fatal("expected extra after timeout") + } + o.Closed(ctx, now) + if o.ExtraLatency() != 0 { + t.Fatalf("Closed should reset extra, got %v", o.ExtraLatency()) + } +} + +// TestCircuit_AdaptiveVsPlainHystrix_OpenerBehavior drives the real circuit Execute path and +// checks that plain Hystrix opens on a timeout-only burst while the adaptive opener stays closed. +func TestCircuit_AdaptiveVsPlainHystrix_OpenerBehavior(t *testing.T) { + ctx := context.Background() + opener := hystrix.ConfigureOpener{ + RequestVolumeThreshold: 3, + ErrorThresholdPercentage: 50, + NumBuckets: 10, + RollingDuration: 10 * time.Second, + } + + runTimeoutBurst := func(factory func() circuit.ClosedToOpen) bool { + cfg := circuit.Config{ + General: circuit.GeneralConfig{ + ClosedToOpenFactory: factory, + OpenToClosedFactory: hystrix.CloserFactory(hystrix.ConfigureCloser{ + SleepWindow: time.Hour, + }), + }, + Execution: circuit.ExecutionConfig{ + Timeout: 5 * time.Millisecond, + }, + } + c := circuit.NewCircuitFromConfig("opener-behavior", cfg) + for i := 0; i < 3; i++ { + _ = c.Execute(ctx, func(ctx context.Context) error { + select { + case <-time.After(50 * time.Millisecond): + return nil + case <-ctx.Done(): + return ctx.Err() + } + }, nil) + } + return c.IsOpen() + } + + t.Run("plainHystrixOpens", func(t *testing.T) { + opened := runTimeoutBurst(hystrix.OpenerFactory(opener)) + if !opened { + t.Fatal("expected plain Hystrix opener to open the circuit after three timeouts with volume=3 and 100% errors") + } + }) + + t.Run("adaptiveStaysClosed", func(t *testing.T) { + opened := runTimeoutBurst(OpenerFactory(ConfigureAdaptive{ + ConfigureOpener: opener, + MinTimeoutRatioToDefer: 0.85, + })) + if opened { + t.Fatal("expected adaptive opener to defer opening while failures are timeout-heavy and headroom is non-zero") + } + }) +} From f6fe654b3cd0317eea9955b7448c558d98c1110a Mon Sep 17 00:00:00 2001 From: Alexandr Rutkowski Date: Sun, 12 Apr 2026 12:31:23 +0200 Subject: [PATCH 04/10] Add example and comments for implementation for hystrix adaptive --- closers/hystrix-adaptive/adaptive.go | 26 ++++++-- closers/hystrix-adaptive/config.go | 10 ++-- closers/hystrix-adaptive/doc.go | 8 ++- closers/hystrix-adaptive/example_test.go | 75 ++++++++++++++++++++++++ 4 files changed, 107 insertions(+), 12 deletions(-) create mode 100644 closers/hystrix-adaptive/example_test.go diff --git a/closers/hystrix-adaptive/adaptive.go b/closers/hystrix-adaptive/adaptive.go index 1ac201e..a5957b1 100644 --- a/closers/hystrix-adaptive/adaptive.go +++ b/closers/hystrix-adaptive/adaptive.go @@ -26,15 +26,19 @@ type AdaptiveOpener struct { failureCount faststats.RollingCounter } -var _ circuit.ClosedToOpen = (*AdaptiveOpener)(nil) +// Compile-time assertions that AdaptiveOpener implements circuit.ClosedToOpen and json.Marshaler +var ( + _ circuit.ClosedToOpen = (*AdaptiveOpener)(nil) + _ json.Marshaler = (*AdaptiveOpener)(nil) +) // OpenerFactory returns a ClosedToOpen factory that wraps hystrix.OpenerFactory. func OpenerFactory(config ConfigureAdaptive) func() circuit.ClosedToOpen { return func() circuit.ClosedToOpen { cfg := config cfg.Merge(defaultConfigureAdaptive) - inner := hystrix.OpenerFactory(cfg.ConfigureOpener)().(*hystrix.Opener) - a := &AdaptiveOpener{Opener: inner} + opener := hystrix.OpenerFactory(cfg.ConfigureOpener)().(*hystrix.Opener) + a := &AdaptiveOpener{Opener: opener} a.setConfigNotThreadSafeLocked(cfg) return a } @@ -63,20 +67,24 @@ func (a *AdaptiveOpener) ShouldOpen(ctx context.Context, now time.Time) bool { return true } +// Prevent delegates to the Hystrix opener func (a *AdaptiveOpener) Prevent(ctx context.Context, now time.Time) bool { return a.Opener.Prevent(ctx, now) } +// Closed resets the adaptive state and delegates to the Hystrix opener func (a *AdaptiveOpener) Closed(ctx context.Context, now time.Time) { a.Opener.Closed(ctx, now) a.resetAdaptive(now) } +// Opened resets the adaptive state and delegates to the Hystrix opener func (a *AdaptiveOpener) Opened(ctx context.Context, now time.Time) { a.Opener.Opened(ctx, now) a.resetAdaptive(now) } +// Success adjusts the adaptive headroom and delegates to the Hystrix opener func (a *AdaptiveOpener) Success(ctx context.Context, now time.Time, d time.Duration) { a.Opener.Success(ctx, now, d) a.mu.Lock() @@ -84,14 +92,17 @@ func (a *AdaptiveOpener) Success(ctx context.Context, now time.Time, d time.Dura a.adjustExtraOnSuccessLocked(d) } +// ErrBadRequest delegates to the Hystrix opener func (a *AdaptiveOpener) ErrBadRequest(ctx context.Context, now time.Time, d time.Duration) { a.Opener.ErrBadRequest(ctx, now, d) } +// ErrInterrupt delegates to the Hystrix opener func (a *AdaptiveOpener) ErrInterrupt(ctx context.Context, now time.Time, d time.Duration) { a.Opener.ErrInterrupt(ctx, now, d) } +// ErrFailure increases the failure count and delegates to the Hystrix opener func (a *AdaptiveOpener) ErrFailure(ctx context.Context, now time.Time, d time.Duration) { a.Opener.ErrFailure(ctx, now, d) a.mu.Lock() @@ -99,6 +110,7 @@ func (a *AdaptiveOpener) ErrFailure(ctx context.Context, now time.Time, d time.D a.failureCount.Inc(now) } +// ErrTimeout increases the timeout count and delegates to the Hystrix opener func (a *AdaptiveOpener) ErrTimeout(ctx context.Context, now time.Time, d time.Duration) { a.Opener.ErrTimeout(ctx, now, d) a.mu.Lock() @@ -107,14 +119,17 @@ func (a *AdaptiveOpener) ErrTimeout(ctx context.Context, now time.Time, d time.D a.bumpExtraLocked() } +// ErrConcurrencyLimitReject delegates to the Hystrix opener func (a *AdaptiveOpener) ErrConcurrencyLimitReject(ctx context.Context, now time.Time) { a.Opener.ErrConcurrencyLimitReject(ctx, now) } +// ErrShortCircuit delegates to the Hystrix opener func (a *AdaptiveOpener) ErrShortCircuit(ctx context.Context, now time.Time) { a.Opener.ErrShortCircuit(ctx, now) } +// adjustExtraOnSuccessLocked adjusts the adaptive headroom based on the success duration func (a *AdaptiveOpener) adjustExtraOnSuccessLocked(d time.Duration) { base := a.config.BaselineLatency maxE := a.config.MaxExtraLatency @@ -133,6 +148,7 @@ func (a *AdaptiveOpener) adjustExtraOnSuccessLocked(d time.Duration) { } } +// bumpExtraLocked increases the adaptive headroom based on the increase extra func (a *AdaptiveOpener) bumpExtraLocked() { maxE := a.config.MaxExtraLatency a.extra += a.config.IncreaseExtra @@ -141,6 +157,7 @@ func (a *AdaptiveOpener) bumpExtraLocked() { } } +// resetAdaptive resets the adaptive state func (a *AdaptiveOpener) resetAdaptive(now time.Time) { a.mu.Lock() defer a.mu.Unlock() @@ -163,6 +180,7 @@ func (a *AdaptiveOpener) SetConfigNotThreadSafe(props ConfigureAdaptive) { a.setConfigNotThreadSafeLocked(props) } +// setConfigNotThreadSafeLocked sets the adaptive configuration and reinitializes rolling windows func (a *AdaptiveOpener) setConfigNotThreadSafeLocked(props ConfigureAdaptive) { props.Merge(defaultConfigureAdaptive) a.mu.Lock() @@ -209,5 +227,3 @@ func (a *AdaptiveOpener) MarshalJSON() ([]byte, error) { "failures": &a.failureCount, }) } - -var _ json.Marshaler = (*AdaptiveOpener)(nil) diff --git a/closers/hystrix-adaptive/config.go b/closers/hystrix-adaptive/config.go index 8cdeaaf..556ddea 100644 --- a/closers/hystrix-adaptive/config.go +++ b/closers/hystrix-adaptive/config.go @@ -45,6 +45,7 @@ func (c *ConfigureAdaptive) Merge(other ConfigureAdaptive) { } } +// defaultConfigureAdaptive is the default configuration for the adaptive opener var defaultConfigureAdaptive = ConfigureAdaptive{ ConfigureOpener: hystrix.ConfigureOpener{ RequestVolumeThreshold: 20, @@ -63,6 +64,7 @@ var defaultConfigureAdaptive = ConfigureAdaptive{ // Factory builds circuit configs that use the adaptive opener with optional hystrix closer wiring. type Factory struct { hystrix.Factory + ConfigureAdaptive ConfigureAdaptive CreateConfigureAdaptive []func(circuitName string) ConfigureAdaptive } @@ -70,11 +72,11 @@ type Factory struct { // Configure returns a circuit.Config with adaptive ClosedToOpen and hystrix OpenToClosed. func (f *Factory) Configure(circuitName string) circuit.Config { cfg := f.Factory.Configure(circuitName) - final := ConfigureAdaptive{} + adaptiveCfg := ConfigureAdaptive{} for i := len(f.CreateConfigureAdaptive) - 1; i >= 0; i-- { - final.Merge(f.CreateConfigureAdaptive[i](circuitName)) + adaptiveCfg.Merge(f.CreateConfigureAdaptive[i](circuitName)) } - final.Merge(f.ConfigureAdaptive) - cfg.General.ClosedToOpenFactory = OpenerFactory(final) + adaptiveCfg.Merge(f.ConfigureAdaptive) + cfg.General.ClosedToOpenFactory = OpenerFactory(adaptiveCfg) return cfg } diff --git a/closers/hystrix-adaptive/doc.go b/closers/hystrix-adaptive/doc.go index 7dc01dd..b83d5a6 100644 --- a/closers/hystrix-adaptive/doc.go +++ b/closers/hystrix-adaptive/doc.go @@ -1,4 +1,6 @@ -// Package hystrixadaptive provides a ClosedToOpen implementation that composes the -// standard Hystrix opener and adapts open decisions when failures are dominated by -// timeouts while extra latency headroom is elevated (ambient slowness). +/* +Package hystrixadaptive provides a ClosedToOpen implementation that composes the +standard Hystrix opener and adapts open decisions when failures are dominated by +timeouts while extra latency headroom is elevated (ambient slowness). +*/ package hystrixadaptive diff --git a/closers/hystrix-adaptive/example_test.go b/closers/hystrix-adaptive/example_test.go new file mode 100644 index 0000000..af1bf5f --- /dev/null +++ b/closers/hystrix-adaptive/example_test.go @@ -0,0 +1,75 @@ +package hystrixadaptive_test + +import ( + "fmt" + "time" + + "github.com/cep21/circuit/v4" + "github.com/cep21/circuit/v4/closers/hystrix" + hystrixadaptive "github.com/cep21/circuit/v4/closers/hystrix-adaptive" +) + +// This example wires a manager with the adaptive Hystrix opener: same rolling error logic as +// closers/hystrix, plus adaptive ShouldOpen behavior (see package doc). +func ExampleFactory() { + configuration := hystrixadaptive.Factory{ + Factory: hystrix.Factory{ + ConfigureOpener: hystrix.ConfigureOpener{ + RequestVolumeThreshold: 10, + }, + ConfigureCloser: hystrix.ConfigureCloser{}, + }, + ConfigureAdaptive: hystrixadaptive.ConfigureAdaptive{ + BaselineLatency: 100 * time.Millisecond, + MaxExtraLatency: 200 * time.Millisecond, + IncreaseExtra: 10 * time.Millisecond, + DecreaseExtra: 10 * time.Millisecond, + MinTimeoutRatioToDefer: 0.85, + }, + } + h := circuit.Manager{ + DefaultCircuitProperties: []circuit.CommandPropertiesConstructor{configuration.Configure}, + } + c := h.MustCreateCircuit("adaptive-hystrix") + fmt.Println("circuit:", c.Name()) + // Output: + // circuit: adaptive-hystrix +} + +// You can use OpenerFactory directly when you build a [circuit.Config] yourself and pair it +// with [hystrix.CloserFactory] (or another OpenToClosed implementation). +func ExampleOpenerFactory() { + cfg := circuit.Config{ + General: circuit.GeneralConfig{ + ClosedToOpenFactory: hystrixadaptive.OpenerFactory(hystrixadaptive.ConfigureAdaptive{ + ConfigureOpener: hystrix.ConfigureOpener{ + RequestVolumeThreshold: 10, + }, + }), + OpenToClosedFactory: hystrix.CloserFactory(hystrix.ConfigureCloser{}), + }, + } + c := circuit.NewCircuitFromConfig("custom-opener", cfg) + fmt.Println("circuit:", c.Name()) + // Output: + // circuit: custom-opener +} + +// Baseline latency and other adaptive fields can be updated at runtime together with the +// embedded Hystrix opener thresholds via [hystrixadaptive.AdaptiveOpener.SetConfigThreadSafe]. +func ExampleAdaptiveOpener_SetConfigThreadSafe() { + configuration := hystrixadaptive.Factory{} + h := circuit.Manager{ + DefaultCircuitProperties: []circuit.CommandPropertiesConstructor{configuration.Configure}, + } + c := h.MustCreateCircuit("adaptive-circuit") + ao := c.ClosedToOpen.(*hystrixadaptive.AdaptiveOpener) + fmt.Println("default baseline:", ao.Config().BaselineLatency) + ao.SetConfigThreadSafe(hystrixadaptive.ConfigureAdaptive{ + BaselineLatency: 50 * time.Millisecond, + }) + fmt.Println("new baseline:", ao.Config().BaselineLatency) + // Output: + // default baseline: 100ms + // new baseline: 50ms +} From 72ef794fc24f5681b4f7ba19fb3a8be8959c30da Mon Sep 17 00:00:00 2001 From: Alexandr Rutkowski Date: Sun, 12 Apr 2026 12:33:09 +0200 Subject: [PATCH 05/10] Prettify comments in hystrix adaptive --- closers/hystrix-adaptive/adaptive.go | 18 +++++++++--------- closers/hystrix-adaptive/adaptive_test.go | 4 ++-- closers/hystrix-adaptive/config.go | 18 +++++++++--------- closers/hystrix-adaptive/doc.go | 2 +- closers/hystrix-adaptive/example_test.go | 6 +++--- 5 files changed, 24 insertions(+), 24 deletions(-) diff --git a/closers/hystrix-adaptive/adaptive.go b/closers/hystrix-adaptive/adaptive.go index a5957b1..45bd304 100644 --- a/closers/hystrix-adaptive/adaptive.go +++ b/closers/hystrix-adaptive/adaptive.go @@ -12,14 +12,14 @@ import ( ) // AdaptiveOpener composes *hystrix.Opener and overrides ShouldOpen to avoid opening when -// recent failures are mostly timeouts during elevated latency headroom. +// recent failures are mostly timeouts during elevated latency headroom type AdaptiveOpener struct { Opener *hystrix.Opener mu sync.Mutex config ConfigureAdaptive - // extra is added to BaselineLatency when deciding if a success was "slow" and for ShouldOpen. + // extra is added to BaselineLatency when deciding if a success was "slow" and for ShouldOpen extra time.Duration timeoutCount faststats.RollingCounter @@ -32,7 +32,7 @@ var ( _ json.Marshaler = (*AdaptiveOpener)(nil) ) -// OpenerFactory returns a ClosedToOpen factory that wraps hystrix.OpenerFactory. +// OpenerFactory returns a ClosedToOpen factory that wraps hystrix.OpenerFactory func OpenerFactory(config ConfigureAdaptive) func() circuit.ClosedToOpen { return func() circuit.ClosedToOpen { cfg := config @@ -45,7 +45,7 @@ func OpenerFactory(config ConfigureAdaptive) func() circuit.ClosedToOpen { } // ShouldOpen delegates to the Hystrix opener, then may suppress opening when headroom is -// non-zero and rolling failures are predominantly timeouts (ambient slowness). +// non-zero and rolling failures are predominantly timeouts (ambient slowness) func (a *AdaptiveOpener) ShouldOpen(ctx context.Context, now time.Time) bool { if !a.Opener.ShouldOpen(ctx, now) { return false @@ -166,7 +166,7 @@ func (a *AdaptiveOpener) resetAdaptive(now time.Time) { a.failureCount.Reset(now) } -// SetConfigThreadSafe updates hystrix opener fields from ConfigureOpener. +// SetConfigThreadSafe updates hystrix opener fields from ConfigureOpener func (a *AdaptiveOpener) SetConfigThreadSafe(props ConfigureAdaptive) { props.Merge(defaultConfigureAdaptive) a.mu.Lock() @@ -175,7 +175,7 @@ func (a *AdaptiveOpener) SetConfigThreadSafe(props ConfigureAdaptive) { a.Opener.SetConfigThreadSafe(props.ConfigureOpener) } -// SetConfigNotThreadSafe reinitializes rolling windows for the adaptive split counters. +// SetConfigNotThreadSafe reinitializes rolling windows for the adaptive split counters func (a *AdaptiveOpener) SetConfigNotThreadSafe(props ConfigureAdaptive) { a.setConfigNotThreadSafeLocked(props) } @@ -201,21 +201,21 @@ func (a *AdaptiveOpener) setConfigNotThreadSafeLocked(props ConfigureAdaptive) { a.mu.Unlock() } -// Config returns the merged adaptive configuration (including embedded hystrix opener config). +// Config returns the merged adaptive configuration (including embedded hystrix opener config) func (a *AdaptiveOpener) Config() ConfigureAdaptive { a.mu.Lock() defer a.mu.Unlock() return a.config } -// ExtraLatency returns the current adaptive headroom on top of BaselineLatency. +// ExtraLatency returns the current adaptive headroom on top of BaselineLatency func (a *AdaptiveOpener) ExtraLatency() time.Duration { a.mu.Lock() defer a.mu.Unlock() return a.extra } -// MarshalJSON exposes opener state for debugging. +// MarshalJSON exposes opener state for debugging func (a *AdaptiveOpener) MarshalJSON() ([]byte, error) { a.mu.Lock() defer a.mu.Unlock() diff --git a/closers/hystrix-adaptive/adaptive_test.go b/closers/hystrix-adaptive/adaptive_test.go index f364df3..646b427 100644 --- a/closers/hystrix-adaptive/adaptive_test.go +++ b/closers/hystrix-adaptive/adaptive_test.go @@ -21,7 +21,7 @@ func TestAdaptiveOpener_TimeoutHeavyDefersOpen(t *testing.T) { }, MinTimeoutRatioToDefer: 0.85, })().(*AdaptiveOpener) - // Timestamps must be >= rolling window StartTime (set when the opener is constructed). + // Timestamps must be >= rolling window StartTime (set when the opener is constructed) now := time.Now() if o.ShouldOpen(ctx, now) { @@ -138,7 +138,7 @@ func TestAdaptiveOpener_ClosedResetsAdaptiveState(t *testing.T) { } // TestCircuit_AdaptiveVsPlainHystrix_OpenerBehavior drives the real circuit Execute path and -// checks that plain Hystrix opens on a timeout-only burst while the adaptive opener stays closed. +// checks that plain Hystrix opens on a timeout-only burst while the adaptive opener stays closed func TestCircuit_AdaptiveVsPlainHystrix_OpenerBehavior(t *testing.T) { ctx := context.Background() opener := hystrix.ConfigureOpener{ diff --git a/closers/hystrix-adaptive/config.go b/closers/hystrix-adaptive/config.go index 556ddea..6320256 100644 --- a/closers/hystrix-adaptive/config.go +++ b/closers/hystrix-adaptive/config.go @@ -7,25 +7,25 @@ import ( "github.com/cep21/circuit/v4/closers/hystrix" ) -// ConfigureAdaptive configures the adaptive opener and embeds hystrix.ConfigureOpener. +// ConfigureAdaptive configures the adaptive opener and embeds hystrix.ConfigureOpener type ConfigureAdaptive struct { hystrix.ConfigureOpener // BaselineLatency is the expected healthy latency (e.g. matches a strict timeout budget - // before adaptive headroom is applied). + // before adaptive headroom is applied) BaselineLatency time.Duration - // MaxExtraLatency caps how much additional headroom can accumulate (e.g. 200ms above baseline). + // MaxExtraLatency caps how much additional headroom can accumulate (e.g. 200ms above baseline) MaxExtraLatency time.Duration - // IncreaseExtra is added to the headroom when a run is slower than baseline+headroom or on timeout. + // IncreaseExtra is added to the headroom when a run is slower than baseline+headroom or on timeout IncreaseExtra time.Duration - // DecreaseExtra is subtracted from headroom on fast successes (duration below BaselineLatency). + // DecreaseExtra is subtracted from headroom on fast successes (duration below BaselineLatency) DecreaseExtra time.Duration // MinTimeoutRatioToDefer is the minimum rolling ratio of timeouts to (timeouts+failures) - // required before ShouldOpen defers to the inner opener when headroom is non-zero. + // required before ShouldOpen defers to the inner opener when headroom is non-zero MinTimeoutRatioToDefer float64 } -// Merge fills zero values from other. +// Merge fills zero values from other func (c *ConfigureAdaptive) Merge(other ConfigureAdaptive) { c.ConfigureOpener.Merge(other.ConfigureOpener) if c.BaselineLatency == 0 { @@ -61,7 +61,7 @@ var defaultConfigureAdaptive = ConfigureAdaptive{ MinTimeoutRatioToDefer: 0.85, } -// Factory builds circuit configs that use the adaptive opener with optional hystrix closer wiring. +// Factory builds circuit configs that use the adaptive opener with optional hystrix closer wiring type Factory struct { hystrix.Factory @@ -69,7 +69,7 @@ type Factory struct { CreateConfigureAdaptive []func(circuitName string) ConfigureAdaptive } -// Configure returns a circuit.Config with adaptive ClosedToOpen and hystrix OpenToClosed. +// Configure returns a circuit.Config with adaptive ClosedToOpen and hystrix OpenToClosed func (f *Factory) Configure(circuitName string) circuit.Config { cfg := f.Factory.Configure(circuitName) adaptiveCfg := ConfigureAdaptive{} diff --git a/closers/hystrix-adaptive/doc.go b/closers/hystrix-adaptive/doc.go index b83d5a6..2792632 100644 --- a/closers/hystrix-adaptive/doc.go +++ b/closers/hystrix-adaptive/doc.go @@ -1,6 +1,6 @@ /* Package hystrixadaptive provides a ClosedToOpen implementation that composes the standard Hystrix opener and adapts open decisions when failures are dominated by -timeouts while extra latency headroom is elevated (ambient slowness). +timeouts while extra latency headroom is elevated (ambient slowness) */ package hystrixadaptive diff --git a/closers/hystrix-adaptive/example_test.go b/closers/hystrix-adaptive/example_test.go index af1bf5f..ce0c023 100644 --- a/closers/hystrix-adaptive/example_test.go +++ b/closers/hystrix-adaptive/example_test.go @@ -10,7 +10,7 @@ import ( ) // This example wires a manager with the adaptive Hystrix opener: same rolling error logic as -// closers/hystrix, plus adaptive ShouldOpen behavior (see package doc). +// closers/hystrix, plus adaptive ShouldOpen behavior (see package doc) func ExampleFactory() { configuration := hystrixadaptive.Factory{ Factory: hystrix.Factory{ @@ -37,7 +37,7 @@ func ExampleFactory() { } // You can use OpenerFactory directly when you build a [circuit.Config] yourself and pair it -// with [hystrix.CloserFactory] (or another OpenToClosed implementation). +// with [hystrix.CloserFactory] (or another OpenToClosed implementation) func ExampleOpenerFactory() { cfg := circuit.Config{ General: circuit.GeneralConfig{ @@ -56,7 +56,7 @@ func ExampleOpenerFactory() { } // Baseline latency and other adaptive fields can be updated at runtime together with the -// embedded Hystrix opener thresholds via [hystrixadaptive.AdaptiveOpener.SetConfigThreadSafe]. +// embedded Hystrix opener thresholds via [hystrixadaptive.AdaptiveOpener.SetConfigThreadSafe] func ExampleAdaptiveOpener_SetConfigThreadSafe() { configuration := hystrixadaptive.Factory{} h := circuit.Manager{ From ec89fda089231147d0c5eacbca91be23a3336570 Mon Sep 17 00:00:00 2001 From: Alexandr Rutkowski Date: Sun, 12 Apr 2026 12:40:19 +0200 Subject: [PATCH 06/10] Update README.md --- README.md | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/README.md b/README.md index 91c3ba6..05578ce 100644 --- a/README.md +++ b/README.md @@ -142,13 +142,9 @@ fmt.Println("This is a hystrix configured circuit", c.Name()) // Output: This is a hystrix configured circuit hystrix-circuit ``` -## Adaptive Hystrix opener (`closers/hystrix-adaptive`) +## Adaptive Hystrix -The [hystrix](https://pkg.go.dev/github.com/cep21/circuit/v4/closers/hystrix) opener opens the circuit when error rate and volume pass configured thresholds. If the whole dependency slows down a little (for example every call hits your execution timeout), that can look like 100% errors even though the system is only degraded. The **`hystrixadaptive`** package composes the same `*hystrix.Opener` and makes **`ShouldOpen`** adaptive: it tracks extra latency **headroom** (min / max / step size on success and timeout) and, when headroom is non-zero, can **defer opening** while recent failures are mostly **timeouts** rather than application failures. - -This does **not** change **`Execution.Timeout`** on the circuit. You still set the run deadline in `circuit.ExecutionConfig` (and may raise it toward `BaselineLatency + MaxExtraLatency` by other means if you want slow requests to complete). The adaptive opener only changes when the breaker **opens** given the same Hystrix rolling error metrics. - -Use **`hystrixadaptive.Factory`** like **`hystrix.Factory`**: embed **`hystrix.Factory`** for the closer (and optional base opener defaults), and set **`ConfigureAdaptive`** for adaptive fields. The package name is **`hystrixadaptive`** (`import "github.com/cep21/circuit/v4/closers/hystrix-adaptive"`). +The [hystrix](https://pkg.go.dev/github.com/cep21/circuit/v4/closers/hystrix) opener trips the breaker when enough requests fail. That is what you want when the dependency is actually unhealthy—but when *everything* is simply a bit slow, many calls may time out and look like total failure even though nothing is uniquely broken. **`hystrixadaptive`** layers on top of the usual Hystrix opener: it can be more patient in that situation (mostly timeouts, not hard errors), give a little slack while the outage pattern looks like blanket slowness, and tighten again when fast successes return. You tune how much slack is allowed and how quickly it grows or shrinks. ```go configuration := hystrixadaptive.Factory{ @@ -181,8 +177,6 @@ fmt.Println("This circuit uses the adaptive Hystrix opener", c.Name()) // Output: This circuit uses the adaptive Hystrix opener adaptive-hystrix ``` -To wire only the opener (for example with a custom closer), use **`hystrixadaptive.OpenerFactory`** with **`hystrixadaptive.ConfigureAdaptive`** (which embeds **`hystrix.ConfigureOpener`**). At runtime you can type-assert **`ClosedToOpen`** to **`*hystrixadaptive.AdaptiveOpener`** and call **`ExtraLatency()`** or inspect **`Opener`** (`*hystrix.Opener`) if you need the inner metrics. - ## [Enable dashboard metrics](https://godoc.org/github.com/cep21/circuit/metriceventstream#example-MetricEventStream) Dashboard metrics can be enabled with the MetricEventStream object. This example creates an event stream handler, From 8945d7445f9b0ea56ee54132426bb8382fcb02ed Mon Sep 17 00:00:00 2001 From: Alexandr Rutkowski Date: Sun, 12 Apr 2026 12:45:39 +0200 Subject: [PATCH 07/10] Rename adaptive opener to opener bacause of package name --- closers/hystrix-adaptive/example_test.go | 12 ++-- .../{adaptive.go => opener.go} | 71 ++++++++++--------- .../{adaptive_test.go => opener_test.go} | 22 +++--- 3 files changed, 54 insertions(+), 51 deletions(-) rename closers/hystrix-adaptive/{adaptive.go => opener.go} (68%) rename closers/hystrix-adaptive/{adaptive_test.go => opener_test.go} (90%) diff --git a/closers/hystrix-adaptive/example_test.go b/closers/hystrix-adaptive/example_test.go index ce0c023..585a5b3 100644 --- a/closers/hystrix-adaptive/example_test.go +++ b/closers/hystrix-adaptive/example_test.go @@ -56,14 +56,10 @@ func ExampleOpenerFactory() { } // Baseline latency and other adaptive fields can be updated at runtime together with the -// embedded Hystrix opener thresholds via [hystrixadaptive.AdaptiveOpener.SetConfigThreadSafe] -func ExampleAdaptiveOpener_SetConfigThreadSafe() { - configuration := hystrixadaptive.Factory{} - h := circuit.Manager{ - DefaultCircuitProperties: []circuit.CommandPropertiesConstructor{configuration.Configure}, - } - c := h.MustCreateCircuit("adaptive-circuit") - ao := c.ClosedToOpen.(*hystrixadaptive.AdaptiveOpener) +// embedded Hystrix opener thresholds via [hystrixadaptive.Opener.SetConfigThreadSafe] +// Use [hystrixadaptive.NewOpener] when you need the concrete type without a type assertion +func ExampleOpener_SetConfigThreadSafe() { + ao := hystrixadaptive.NewOpener(hystrixadaptive.ConfigureAdaptive{}) fmt.Println("default baseline:", ao.Config().BaselineLatency) ao.SetConfigThreadSafe(hystrixadaptive.ConfigureAdaptive{ BaselineLatency: 50 * time.Millisecond, diff --git a/closers/hystrix-adaptive/adaptive.go b/closers/hystrix-adaptive/opener.go similarity index 68% rename from closers/hystrix-adaptive/adaptive.go rename to closers/hystrix-adaptive/opener.go index 45bd304..df8b3c2 100644 --- a/closers/hystrix-adaptive/adaptive.go +++ b/closers/hystrix-adaptive/opener.go @@ -11,9 +11,10 @@ import ( "github.com/cep21/circuit/v4/faststats" ) -// AdaptiveOpener composes *hystrix.Opener and overrides ShouldOpen to avoid opening when -// recent failures are mostly timeouts during elevated latency headroom -type AdaptiveOpener struct { +// Opener is the adaptive ClosedToOpen implementation: it wraps an inner *hystrix.Opener +// (field Opener) and overrides ShouldOpen to avoid opening when recent failures are mostly +// timeouts during elevated latency headroom +type Opener struct { Opener *hystrix.Opener mu sync.Mutex @@ -26,27 +27,33 @@ type AdaptiveOpener struct { failureCount faststats.RollingCounter } -// Compile-time assertions that AdaptiveOpener implements circuit.ClosedToOpen and json.Marshaler +// Compile-time assertions that Opener implements circuit.ClosedToOpen and json.Marshaler var ( - _ circuit.ClosedToOpen = (*AdaptiveOpener)(nil) - _ json.Marshaler = (*AdaptiveOpener)(nil) + _ circuit.ClosedToOpen = (*Opener)(nil) + _ json.Marshaler = (*Opener)(nil) ) +// NewOpener returns a new Opener with defaults merged into config. +// It is the same implementation as the value returned from OpenerFactory(config)(). +func NewOpener(config ConfigureAdaptive) *Opener { + cfg := config + cfg.Merge(defaultConfigureAdaptive) + inner := hystrix.OpenerFactory(cfg.ConfigureOpener)().(*hystrix.Opener) + a := &Opener{Opener: inner} + a.setConfigNotThreadSafeLocked(cfg) + return a +} + // OpenerFactory returns a ClosedToOpen factory that wraps hystrix.OpenerFactory func OpenerFactory(config ConfigureAdaptive) func() circuit.ClosedToOpen { return func() circuit.ClosedToOpen { - cfg := config - cfg.Merge(defaultConfigureAdaptive) - opener := hystrix.OpenerFactory(cfg.ConfigureOpener)().(*hystrix.Opener) - a := &AdaptiveOpener{Opener: opener} - a.setConfigNotThreadSafeLocked(cfg) - return a + return NewOpener(config) } } // ShouldOpen delegates to the Hystrix opener, then may suppress opening when headroom is // non-zero and rolling failures are predominantly timeouts (ambient slowness) -func (a *AdaptiveOpener) ShouldOpen(ctx context.Context, now time.Time) bool { +func (a *Opener) ShouldOpen(ctx context.Context, now time.Time) bool { if !a.Opener.ShouldOpen(ctx, now) { return false } @@ -68,24 +75,24 @@ func (a *AdaptiveOpener) ShouldOpen(ctx context.Context, now time.Time) bool { } // Prevent delegates to the Hystrix opener -func (a *AdaptiveOpener) Prevent(ctx context.Context, now time.Time) bool { +func (a *Opener) Prevent(ctx context.Context, now time.Time) bool { return a.Opener.Prevent(ctx, now) } // Closed resets the adaptive state and delegates to the Hystrix opener -func (a *AdaptiveOpener) Closed(ctx context.Context, now time.Time) { +func (a *Opener) Closed(ctx context.Context, now time.Time) { a.Opener.Closed(ctx, now) a.resetAdaptive(now) } // Opened resets the adaptive state and delegates to the Hystrix opener -func (a *AdaptiveOpener) Opened(ctx context.Context, now time.Time) { +func (a *Opener) Opened(ctx context.Context, now time.Time) { a.Opener.Opened(ctx, now) a.resetAdaptive(now) } // Success adjusts the adaptive headroom and delegates to the Hystrix opener -func (a *AdaptiveOpener) Success(ctx context.Context, now time.Time, d time.Duration) { +func (a *Opener) Success(ctx context.Context, now time.Time, d time.Duration) { a.Opener.Success(ctx, now, d) a.mu.Lock() defer a.mu.Unlock() @@ -93,17 +100,17 @@ func (a *AdaptiveOpener) Success(ctx context.Context, now time.Time, d time.Dura } // ErrBadRequest delegates to the Hystrix opener -func (a *AdaptiveOpener) ErrBadRequest(ctx context.Context, now time.Time, d time.Duration) { +func (a *Opener) ErrBadRequest(ctx context.Context, now time.Time, d time.Duration) { a.Opener.ErrBadRequest(ctx, now, d) } // ErrInterrupt delegates to the Hystrix opener -func (a *AdaptiveOpener) ErrInterrupt(ctx context.Context, now time.Time, d time.Duration) { +func (a *Opener) ErrInterrupt(ctx context.Context, now time.Time, d time.Duration) { a.Opener.ErrInterrupt(ctx, now, d) } // ErrFailure increases the failure count and delegates to the Hystrix opener -func (a *AdaptiveOpener) ErrFailure(ctx context.Context, now time.Time, d time.Duration) { +func (a *Opener) ErrFailure(ctx context.Context, now time.Time, d time.Duration) { a.Opener.ErrFailure(ctx, now, d) a.mu.Lock() defer a.mu.Unlock() @@ -111,7 +118,7 @@ func (a *AdaptiveOpener) ErrFailure(ctx context.Context, now time.Time, d time.D } // ErrTimeout increases the timeout count and delegates to the Hystrix opener -func (a *AdaptiveOpener) ErrTimeout(ctx context.Context, now time.Time, d time.Duration) { +func (a *Opener) ErrTimeout(ctx context.Context, now time.Time, d time.Duration) { a.Opener.ErrTimeout(ctx, now, d) a.mu.Lock() defer a.mu.Unlock() @@ -120,17 +127,17 @@ func (a *AdaptiveOpener) ErrTimeout(ctx context.Context, now time.Time, d time.D } // ErrConcurrencyLimitReject delegates to the Hystrix opener -func (a *AdaptiveOpener) ErrConcurrencyLimitReject(ctx context.Context, now time.Time) { +func (a *Opener) ErrConcurrencyLimitReject(ctx context.Context, now time.Time) { a.Opener.ErrConcurrencyLimitReject(ctx, now) } // ErrShortCircuit delegates to the Hystrix opener -func (a *AdaptiveOpener) ErrShortCircuit(ctx context.Context, now time.Time) { +func (a *Opener) ErrShortCircuit(ctx context.Context, now time.Time) { a.Opener.ErrShortCircuit(ctx, now) } // adjustExtraOnSuccessLocked adjusts the adaptive headroom based on the success duration -func (a *AdaptiveOpener) adjustExtraOnSuccessLocked(d time.Duration) { +func (a *Opener) adjustExtraOnSuccessLocked(d time.Duration) { base := a.config.BaselineLatency maxE := a.config.MaxExtraLatency effectiveSlow := base + a.extra @@ -149,7 +156,7 @@ func (a *AdaptiveOpener) adjustExtraOnSuccessLocked(d time.Duration) { } // bumpExtraLocked increases the adaptive headroom based on the increase extra -func (a *AdaptiveOpener) bumpExtraLocked() { +func (a *Opener) bumpExtraLocked() { maxE := a.config.MaxExtraLatency a.extra += a.config.IncreaseExtra if a.extra > maxE { @@ -158,7 +165,7 @@ func (a *AdaptiveOpener) bumpExtraLocked() { } // resetAdaptive resets the adaptive state -func (a *AdaptiveOpener) resetAdaptive(now time.Time) { +func (a *Opener) resetAdaptive(now time.Time) { a.mu.Lock() defer a.mu.Unlock() a.extra = 0 @@ -167,7 +174,7 @@ func (a *AdaptiveOpener) resetAdaptive(now time.Time) { } // SetConfigThreadSafe updates hystrix opener fields from ConfigureOpener -func (a *AdaptiveOpener) SetConfigThreadSafe(props ConfigureAdaptive) { +func (a *Opener) SetConfigThreadSafe(props ConfigureAdaptive) { props.Merge(defaultConfigureAdaptive) a.mu.Lock() a.config = props @@ -176,12 +183,12 @@ func (a *AdaptiveOpener) SetConfigThreadSafe(props ConfigureAdaptive) { } // SetConfigNotThreadSafe reinitializes rolling windows for the adaptive split counters -func (a *AdaptiveOpener) SetConfigNotThreadSafe(props ConfigureAdaptive) { +func (a *Opener) SetConfigNotThreadSafe(props ConfigureAdaptive) { a.setConfigNotThreadSafeLocked(props) } // setConfigNotThreadSafeLocked sets the adaptive configuration and reinitializes rolling windows -func (a *AdaptiveOpener) setConfigNotThreadSafeLocked(props ConfigureAdaptive) { +func (a *Opener) setConfigNotThreadSafeLocked(props ConfigureAdaptive) { props.Merge(defaultConfigureAdaptive) a.mu.Lock() a.config = props @@ -202,21 +209,21 @@ func (a *AdaptiveOpener) setConfigNotThreadSafeLocked(props ConfigureAdaptive) { } // Config returns the merged adaptive configuration (including embedded hystrix opener config) -func (a *AdaptiveOpener) Config() ConfigureAdaptive { +func (a *Opener) Config() ConfigureAdaptive { a.mu.Lock() defer a.mu.Unlock() return a.config } // ExtraLatency returns the current adaptive headroom on top of BaselineLatency -func (a *AdaptiveOpener) ExtraLatency() time.Duration { +func (a *Opener) ExtraLatency() time.Duration { a.mu.Lock() defer a.mu.Unlock() return a.extra } // MarshalJSON exposes opener state for debugging -func (a *AdaptiveOpener) MarshalJSON() ([]byte, error) { +func (a *Opener) MarshalJSON() ([]byte, error) { a.mu.Lock() defer a.mu.Unlock() return json.Marshal(map[string]interface{}{ diff --git a/closers/hystrix-adaptive/adaptive_test.go b/closers/hystrix-adaptive/opener_test.go similarity index 90% rename from closers/hystrix-adaptive/adaptive_test.go rename to closers/hystrix-adaptive/opener_test.go index 646b427..611c7e3 100644 --- a/closers/hystrix-adaptive/adaptive_test.go +++ b/closers/hystrix-adaptive/opener_test.go @@ -10,7 +10,7 @@ import ( "github.com/cep21/circuit/v4/closers/hystrix" ) -func TestAdaptiveOpener_TimeoutHeavyDefersOpen(t *testing.T) { +func TestOpener_TimeoutHeavyDefersOpen(t *testing.T) { ctx := context.Background() o := OpenerFactory(ConfigureAdaptive{ ConfigureOpener: hystrix.ConfigureOpener{ @@ -20,7 +20,7 @@ func TestAdaptiveOpener_TimeoutHeavyDefersOpen(t *testing.T) { RollingDuration: 10 * time.Second, }, MinTimeoutRatioToDefer: 0.85, - })().(*AdaptiveOpener) + })().(*Opener) // Timestamps must be >= rolling window StartTime (set when the opener is constructed) now := time.Now() @@ -41,7 +41,7 @@ func TestAdaptiveOpener_TimeoutHeavyDefersOpen(t *testing.T) { } } -func TestAdaptiveOpener_FailuresStillOpen(t *testing.T) { +func TestOpener_FailuresStillOpen(t *testing.T) { ctx := context.Background() o := OpenerFactory(ConfigureAdaptive{ ConfigureOpener: hystrix.ConfigureOpener{ @@ -50,7 +50,7 @@ func TestAdaptiveOpener_FailuresStillOpen(t *testing.T) { NumBuckets: 10, RollingDuration: 10 * time.Second, }, - })().(*AdaptiveOpener) + })().(*Opener) now := time.Now() for i := 0; i < 3; i++ { @@ -61,8 +61,8 @@ func TestAdaptiveOpener_FailuresStillOpen(t *testing.T) { } } -func TestAdaptiveOpener_MarshalJSON(t *testing.T) { - o := OpenerFactory(ConfigureAdaptive{})().(*AdaptiveOpener) +func TestOpener_MarshalJSON(t *testing.T) { + o := OpenerFactory(ConfigureAdaptive{})().(*Opener) ctx := context.Background() now := time.Now() o.ErrTimeout(ctx, now, time.Second) @@ -84,13 +84,13 @@ func TestFactoryConfigure(t *testing.T) { }, } cfg := f.Configure("x") - ao := cfg.General.ClosedToOpenFactory().(*AdaptiveOpener) + ao := cfg.General.ClosedToOpenFactory().(*Opener) if ao.Config().ConfigureOpener.RequestVolumeThreshold != 7 { t.Fatalf("got threshold %d", ao.Config().ConfigureOpener.RequestVolumeThreshold) } } -func TestAdaptiveOpener_FastSuccessClearsExtraHeadroom(t *testing.T) { +func TestOpener_FastSuccessClearsExtraHeadroom(t *testing.T) { ctx := context.Background() o := OpenerFactory(ConfigureAdaptive{ ConfigureOpener: hystrix.ConfigureOpener{ @@ -103,7 +103,7 @@ func TestAdaptiveOpener_FastSuccessClearsExtraHeadroom(t *testing.T) { IncreaseExtra: 10 * time.Millisecond, DecreaseExtra: 10 * time.Millisecond, MaxExtraLatency: 200 * time.Millisecond, - })().(*AdaptiveOpener) + })().(*Opener) now := time.Now() o.ErrTimeout(ctx, now, 100*time.Millisecond) @@ -116,7 +116,7 @@ func TestAdaptiveOpener_FastSuccessClearsExtraHeadroom(t *testing.T) { } } -func TestAdaptiveOpener_ClosedResetsAdaptiveState(t *testing.T) { +func TestOpener_ClosedResetsAdaptiveState(t *testing.T) { ctx := context.Background() o := OpenerFactory(ConfigureAdaptive{ ConfigureOpener: hystrix.ConfigureOpener{ @@ -125,7 +125,7 @@ func TestAdaptiveOpener_ClosedResetsAdaptiveState(t *testing.T) { NumBuckets: 10, RollingDuration: 10 * time.Second, }, - })().(*AdaptiveOpener) + })().(*Opener) now := time.Now() o.ErrTimeout(ctx, now, time.Millisecond) if o.ExtraLatency() <= 0 { From 3febce970f9752e7ce4e468c79159d9e846d68a4 Mon Sep 17 00:00:00 2001 From: Alexandr Rutkowski Date: Sun, 12 Apr 2026 12:46:59 +0200 Subject: [PATCH 08/10] Update README.md section about hystric adaptive --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 05578ce..11ea1ed 100644 --- a/README.md +++ b/README.md @@ -142,9 +142,9 @@ fmt.Println("This is a hystrix configured circuit", c.Name()) // Output: This is a hystrix configured circuit hystrix-circuit ``` -## Adaptive Hystrix +## Adaptive Hystrix (`closers/hystrix-adaptive`) -The [hystrix](https://pkg.go.dev/github.com/cep21/circuit/v4/closers/hystrix) opener trips the breaker when enough requests fail. That is what you want when the dependency is actually unhealthy—but when *everything* is simply a bit slow, many calls may time out and look like total failure even though nothing is uniquely broken. **`hystrixadaptive`** layers on top of the usual Hystrix opener: it can be more patient in that situation (mostly timeouts, not hard errors), give a little slack while the outage pattern looks like blanket slowness, and tighten again when fast successes return. You tune how much slack is allowed and how quickly it grows or shrinks. +The [hystrix](https://pkg.go.dev/github.com/cep21/circuit/v4/closers/hystrix) opener trips the breaker when enough requests fail. That is what you want when the dependency is actually unhealthy—but when *everything* is simply a bit slow, many calls may time out and look like total failure even though nothing is uniquely broken. Package **`hystrixadaptive`** (`import "github.com/cep21/circuit/v4/closers/hystrix-adaptive"`) layers on top of the usual Hystrix opener: it can be more patient in that situation (mostly timeouts, not hard errors), give a little slack while the outage pattern looks like blanket slowness, and tighten again when fast successes return. You tune how much slack is allowed and how quickly it grows or shrinks. It only affects **when** the breaker opens; **per-call** deadlines are still whatever you set on **`Execution.Timeout`**. ```go configuration := hystrixadaptive.Factory{ From c9f1340e5e08fdb4ae9800e2213bb313cd35c396 Mon Sep 17 00:00:00 2001 From: Alexandr Rutkowski Date: Sun, 12 Apr 2026 14:18:12 +0200 Subject: [PATCH 09/10] Update comments and MaxExtraLatency logic in opener --- closers/hystrix-adaptive/config.go | 22 +++++----- closers/hystrix-adaptive/doc.go | 9 ++-- closers/hystrix-adaptive/example_test.go | 10 ++--- closers/hystrix-adaptive/opener.go | 54 +++++++++++------------- closers/hystrix-adaptive/opener_test.go | 31 +++++++++++++- 5 files changed, 73 insertions(+), 53 deletions(-) diff --git a/closers/hystrix-adaptive/config.go b/closers/hystrix-adaptive/config.go index 6320256..b71c355 100644 --- a/closers/hystrix-adaptive/config.go +++ b/closers/hystrix-adaptive/config.go @@ -7,25 +7,23 @@ import ( "github.com/cep21/circuit/v4/closers/hystrix" ) -// ConfigureAdaptive configures the adaptive opener and embeds hystrix.ConfigureOpener +// ConfigureAdaptive holds adaptive policy and embeds hystrix.ConfigureOpener; it does not set circuit execution timeouts type ConfigureAdaptive struct { hystrix.ConfigureOpener - // BaselineLatency is the expected healthy latency (e.g. matches a strict timeout budget - // before adaptive headroom is applied) + // BaselineLatency is the nominal fast path; successes below it decrease extra BaselineLatency time.Duration - // MaxExtraLatency caps how much additional headroom can accumulate (e.g. 200ms above baseline) + // MaxExtraLatency caps extra; slow-success threshold is roughly baseline+extra; at the cap, timeout deferral ends if inner ShouldOpen MaxExtraLatency time.Duration - // IncreaseExtra is added to the headroom when a run is slower than baseline+headroom or on timeout + // IncreaseExtra added to extra on each timeout and on each success slower than baseline+current extra IncreaseExtra time.Duration - // DecreaseExtra is subtracted from headroom on fast successes (duration below BaselineLatency) + // DecreaseExtra subtracted from extra when a success finishes faster than BaselineLatency DecreaseExtra time.Duration - // MinTimeoutRatioToDefer is the minimum rolling ratio of timeouts to (timeouts+failures) - // required before ShouldOpen defers to the inner opener when headroom is non-zero + // MinTimeoutRatioToDefer is the rolling timeouts/(timeouts+failures) above which ShouldOpen may defer while 0 < extra < MaxExtraLatency MinTimeoutRatioToDefer float64 } -// Merge fills zero values from other +// Merge copies missing fields from other func (c *ConfigureAdaptive) Merge(other ConfigureAdaptive) { c.ConfigureOpener.Merge(other.ConfigureOpener) if c.BaselineLatency == 0 { @@ -45,7 +43,7 @@ func (c *ConfigureAdaptive) Merge(other ConfigureAdaptive) { } } -// defaultConfigureAdaptive is the default configuration for the adaptive opener +// defaultConfigureAdaptive is the default adaptive configuration var defaultConfigureAdaptive = ConfigureAdaptive{ ConfigureOpener: hystrix.ConfigureOpener{ RequestVolumeThreshold: 20, @@ -61,7 +59,7 @@ var defaultConfigureAdaptive = ConfigureAdaptive{ MinTimeoutRatioToDefer: 0.85, } -// Factory builds circuit configs that use the adaptive opener with optional hystrix closer wiring +// Factory merges hystrix.Factory with adaptive configuration type Factory struct { hystrix.Factory @@ -69,7 +67,7 @@ type Factory struct { CreateConfigureAdaptive []func(circuitName string) ConfigureAdaptive } -// Configure returns a circuit.Config with adaptive ClosedToOpen and hystrix OpenToClosed +// Configure returns circuit.Config with adaptive ClosedToOpen from this factory func (f *Factory) Configure(circuitName string) circuit.Config { cfg := f.Factory.Configure(circuitName) adaptiveCfg := ConfigureAdaptive{} diff --git a/closers/hystrix-adaptive/doc.go b/closers/hystrix-adaptive/doc.go index 2792632..2482599 100644 --- a/closers/hystrix-adaptive/doc.go +++ b/closers/hystrix-adaptive/doc.go @@ -1,6 +1,9 @@ /* -Package hystrixadaptive provides a ClosedToOpen implementation that composes the -standard Hystrix opener and adapts open decisions when failures are dominated by -timeouts while extra latency headroom is elevated (ambient slowness) +Package hystrixadaptive implements ClosedToOpen by wrapping the Hystrix opener and +deferring trips when failures are mostly timeouts while headroom is below its cap + +Additive headroom "extra" sits on top of BaselineLatency; timeouts and slow successes +raise it (capped by MaxExtraLatency); successes faster than BaselineLatency lower it; +set per-request deadlines on circuit.Execution, not in this package */ package hystrixadaptive diff --git a/closers/hystrix-adaptive/example_test.go b/closers/hystrix-adaptive/example_test.go index 585a5b3..88b2b66 100644 --- a/closers/hystrix-adaptive/example_test.go +++ b/closers/hystrix-adaptive/example_test.go @@ -9,8 +9,7 @@ import ( hystrixadaptive "github.com/cep21/circuit/v4/closers/hystrix-adaptive" ) -// This example wires a manager with the adaptive Hystrix opener: same rolling error logic as -// closers/hystrix, plus adaptive ShouldOpen behavior (see package doc) +// ExampleFactory wires circuit.Manager with adaptive ClosedToOpen on top of hystrix.Factory func ExampleFactory() { configuration := hystrixadaptive.Factory{ Factory: hystrix.Factory{ @@ -36,8 +35,7 @@ func ExampleFactory() { // circuit: adaptive-hystrix } -// You can use OpenerFactory directly when you build a [circuit.Config] yourself and pair it -// with [hystrix.CloserFactory] (or another OpenToClosed implementation) +// ExampleOpenerFactory builds a [circuit.Config] with OpenerFactory and a Hystrix closer func ExampleOpenerFactory() { cfg := circuit.Config{ General: circuit.GeneralConfig{ @@ -55,9 +53,7 @@ func ExampleOpenerFactory() { // circuit: custom-opener } -// Baseline latency and other adaptive fields can be updated at runtime together with the -// embedded Hystrix opener thresholds via [hystrixadaptive.Opener.SetConfigThreadSafe] -// Use [hystrixadaptive.NewOpener] when you need the concrete type without a type assertion +// ExampleOpener_SetConfigThreadSafe updates adaptive fields via SetConfigThreadSafe; use NewOpener for a concrete *Opener func ExampleOpener_SetConfigThreadSafe() { ao := hystrixadaptive.NewOpener(hystrixadaptive.ConfigureAdaptive{}) fmt.Println("default baseline:", ao.Config().BaselineLatency) diff --git a/closers/hystrix-adaptive/opener.go b/closers/hystrix-adaptive/opener.go index df8b3c2..9e177fd 100644 --- a/closers/hystrix-adaptive/opener.go +++ b/closers/hystrix-adaptive/opener.go @@ -11,30 +11,26 @@ import ( "github.com/cep21/circuit/v4/faststats" ) -// Opener is the adaptive ClosedToOpen implementation: it wraps an inner *hystrix.Opener -// (field Opener) and overrides ShouldOpen to avoid opening when recent failures are mostly -// timeouts during elevated latency headroom +// Opener wraps hystrix.Opener and may defer ShouldOpen when failures are timeout-heavy and extra is under the cap type Opener struct { Opener *hystrix.Opener mu sync.Mutex config ConfigureAdaptive - // extra is added to BaselineLatency when deciding if a success was "slow" and for ShouldOpen + // extra is added to BaselineLatency for slow-success detection and ShouldOpen extra time.Duration timeoutCount faststats.RollingCounter failureCount faststats.RollingCounter } -// Compile-time assertions that Opener implements circuit.ClosedToOpen and json.Marshaler var ( _ circuit.ClosedToOpen = (*Opener)(nil) _ json.Marshaler = (*Opener)(nil) ) -// NewOpener returns a new Opener with defaults merged into config. -// It is the same implementation as the value returned from OpenerFactory(config)(). +// NewOpener merges defaults into config and returns a new Opener (same as OpenerFactory(config)()) func NewOpener(config ConfigureAdaptive) *Opener { cfg := config cfg.Merge(defaultConfigureAdaptive) @@ -44,15 +40,14 @@ func NewOpener(config ConfigureAdaptive) *Opener { return a } -// OpenerFactory returns a ClosedToOpen factory that wraps hystrix.OpenerFactory +// OpenerFactory wraps hystrix.OpenerFactory with adaptive behavior func OpenerFactory(config ConfigureAdaptive) func() circuit.ClosedToOpen { return func() circuit.ClosedToOpen { return NewOpener(config) } } -// ShouldOpen delegates to the Hystrix opener, then may suppress opening when headroom is -// non-zero and rolling failures are predominantly timeouts (ambient slowness) +// ShouldOpen defers only if inner wants open, extra is in (0, MaxExtraLatency), and timeout ratio is high enough func (a *Opener) ShouldOpen(ctx context.Context, now time.Time) bool { if !a.Opener.ShouldOpen(ctx, now) { return false @@ -62,8 +57,13 @@ func (a *Opener) ShouldOpen(ctx context.Context, now time.Time) bool { if a.extra <= 0 { return true } + if a.config.MaxExtraLatency > 0 && a.extra >= a.config.MaxExtraLatency { + return true + } + t := a.timeoutCount.RollingSumAt(now) f := a.failureCount.RollingSumAt(now) + if t+f == 0 { return true } @@ -74,24 +74,24 @@ func (a *Opener) ShouldOpen(ctx context.Context, now time.Time) bool { return true } -// Prevent delegates to the Hystrix opener +// Prevent forwards to the inner opener func (a *Opener) Prevent(ctx context.Context, now time.Time) bool { return a.Opener.Prevent(ctx, now) } -// Closed resets the adaptive state and delegates to the Hystrix opener +// Closed resets adaptive state and forwards to the inner opener func (a *Opener) Closed(ctx context.Context, now time.Time) { a.Opener.Closed(ctx, now) a.resetAdaptive(now) } -// Opened resets the adaptive state and delegates to the Hystrix opener +// Opened resets adaptive state and forwards to the inner opener func (a *Opener) Opened(ctx context.Context, now time.Time) { a.Opener.Opened(ctx, now) a.resetAdaptive(now) } -// Success adjusts the adaptive headroom and delegates to the Hystrix opener +// Success adjusts extra and forwards to the inner opener func (a *Opener) Success(ctx context.Context, now time.Time, d time.Duration) { a.Opener.Success(ctx, now, d) a.mu.Lock() @@ -99,17 +99,17 @@ func (a *Opener) Success(ctx context.Context, now time.Time, d time.Duration) { a.adjustExtraOnSuccessLocked(d) } -// ErrBadRequest delegates to the Hystrix opener +// ErrBadRequest forwards to the inner opener func (a *Opener) ErrBadRequest(ctx context.Context, now time.Time, d time.Duration) { a.Opener.ErrBadRequest(ctx, now, d) } -// ErrInterrupt delegates to the Hystrix opener +// ErrInterrupt forwards to the inner opener func (a *Opener) ErrInterrupt(ctx context.Context, now time.Time, d time.Duration) { a.Opener.ErrInterrupt(ctx, now, d) } -// ErrFailure increases the failure count and delegates to the Hystrix opener +// ErrFailure increments adaptive failure tally and forwards to the inner opener func (a *Opener) ErrFailure(ctx context.Context, now time.Time, d time.Duration) { a.Opener.ErrFailure(ctx, now, d) a.mu.Lock() @@ -117,7 +117,7 @@ func (a *Opener) ErrFailure(ctx context.Context, now time.Time, d time.Duration) a.failureCount.Inc(now) } -// ErrTimeout increases the timeout count and delegates to the Hystrix opener +// ErrTimeout increments adaptive timeout tally, bumps extra, and forwards to the inner opener func (a *Opener) ErrTimeout(ctx context.Context, now time.Time, d time.Duration) { a.Opener.ErrTimeout(ctx, now, d) a.mu.Lock() @@ -126,17 +126,16 @@ func (a *Opener) ErrTimeout(ctx context.Context, now time.Time, d time.Duration) a.bumpExtraLocked() } -// ErrConcurrencyLimitReject delegates to the Hystrix opener +// ErrConcurrencyLimitReject forwards to the inner opener func (a *Opener) ErrConcurrencyLimitReject(ctx context.Context, now time.Time) { a.Opener.ErrConcurrencyLimitReject(ctx, now) } -// ErrShortCircuit delegates to the Hystrix opener +// ErrShortCircuit forwards to the inner opener func (a *Opener) ErrShortCircuit(ctx context.Context, now time.Time) { a.Opener.ErrShortCircuit(ctx, now) } -// adjustExtraOnSuccessLocked adjusts the adaptive headroom based on the success duration func (a *Opener) adjustExtraOnSuccessLocked(d time.Duration) { base := a.config.BaselineLatency maxE := a.config.MaxExtraLatency @@ -155,7 +154,6 @@ func (a *Opener) adjustExtraOnSuccessLocked(d time.Duration) { } } -// bumpExtraLocked increases the adaptive headroom based on the increase extra func (a *Opener) bumpExtraLocked() { maxE := a.config.MaxExtraLatency a.extra += a.config.IncreaseExtra @@ -164,7 +162,6 @@ func (a *Opener) bumpExtraLocked() { } } -// resetAdaptive resets the adaptive state func (a *Opener) resetAdaptive(now time.Time) { a.mu.Lock() defer a.mu.Unlock() @@ -173,7 +170,7 @@ func (a *Opener) resetAdaptive(now time.Time) { a.failureCount.Reset(now) } -// SetConfigThreadSafe updates hystrix opener fields from ConfigureOpener +// SetConfigThreadSafe updates adaptive and Hystrix fields without rebuilding adaptive rolling counters func (a *Opener) SetConfigThreadSafe(props ConfigureAdaptive) { props.Merge(defaultConfigureAdaptive) a.mu.Lock() @@ -182,12 +179,11 @@ func (a *Opener) SetConfigThreadSafe(props ConfigureAdaptive) { a.Opener.SetConfigThreadSafe(props.ConfigureOpener) } -// SetConfigNotThreadSafe reinitializes rolling windows for the adaptive split counters +// SetConfigNotThreadSafe rebuilds rolling windows and resets extra; prefer when rolling parameters change func (a *Opener) SetConfigNotThreadSafe(props ConfigureAdaptive) { a.setConfigNotThreadSafeLocked(props) } -// setConfigNotThreadSafeLocked sets the adaptive configuration and reinitializes rolling windows func (a *Opener) setConfigNotThreadSafeLocked(props ConfigureAdaptive) { props.Merge(defaultConfigureAdaptive) a.mu.Lock() @@ -208,21 +204,21 @@ func (a *Opener) setConfigNotThreadSafeLocked(props ConfigureAdaptive) { a.mu.Unlock() } -// Config returns the merged adaptive configuration (including embedded hystrix opener config) +// Config returns the merged ConfigureAdaptive (including embedded hystrix config) func (a *Opener) Config() ConfigureAdaptive { a.mu.Lock() defer a.mu.Unlock() return a.config } -// ExtraLatency returns the current adaptive headroom on top of BaselineLatency +// ExtraLatency returns current extra headroom above BaselineLatency func (a *Opener) ExtraLatency() time.Duration { a.mu.Lock() defer a.mu.Unlock() return a.extra } -// MarshalJSON exposes opener state for debugging +// MarshalJSON is for debugging func (a *Opener) MarshalJSON() ([]byte, error) { a.mu.Lock() defer a.mu.Unlock() diff --git a/closers/hystrix-adaptive/opener_test.go b/closers/hystrix-adaptive/opener_test.go index 611c7e3..e519dc2 100644 --- a/closers/hystrix-adaptive/opener_test.go +++ b/closers/hystrix-adaptive/opener_test.go @@ -41,6 +41,34 @@ func TestOpener_TimeoutHeavyDefersOpen(t *testing.T) { } } +func TestOpener_MaxExtraLatencySaturatedAllowsOpen(t *testing.T) { + ctx := context.Background() + o := OpenerFactory(ConfigureAdaptive{ + ConfigureOpener: hystrix.ConfigureOpener{ + RequestVolumeThreshold: 3, + ErrorThresholdPercentage: 50, + NumBuckets: 10, + RollingDuration: 10 * time.Second, + }, + MinTimeoutRatioToDefer: 0.85, + MaxExtraLatency: 30 * time.Millisecond, + IncreaseExtra: 10 * time.Millisecond, + })().(*Opener) + now := time.Now() + for i := 0; i < 3; i++ { + o.ErrTimeout(ctx, now, time.Millisecond) + } + if got := o.ExtraLatency(); got != 30*time.Millisecond { + t.Fatalf("extra headroom: got %v want 30ms", got) + } + if !o.Opener.ShouldOpen(ctx, now) { + t.Fatal("inner opener should want to open") + } + if !o.ShouldOpen(ctx, now) { + t.Fatal("expected open when extra is saturated at MaxExtraLatency (timeout defer ends)") + } +} + func TestOpener_FailuresStillOpen(t *testing.T) { ctx := context.Background() o := OpenerFactory(ConfigureAdaptive{ @@ -137,8 +165,7 @@ func TestOpener_ClosedResetsAdaptiveState(t *testing.T) { } } -// TestCircuit_AdaptiveVsPlainHystrix_OpenerBehavior drives the real circuit Execute path and -// checks that plain Hystrix opens on a timeout-only burst while the adaptive opener stays closed +// TestCircuit_AdaptiveVsPlainHystrix_OpenerBehavior compares Execute timeout bursts: plain Hystrix opens, adaptive defers func TestCircuit_AdaptiveVsPlainHystrix_OpenerBehavior(t *testing.T) { ctx := context.Background() opener := hystrix.ConfigureOpener{ From 60b4ee201b0ad0b1feefa3db3fdd973e3d901fd5 Mon Sep 17 00:00:00 2001 From: Alexandr Rutkowski Date: Sun, 12 Apr 2026 14:42:25 +0200 Subject: [PATCH 10/10] Verify correctness of the hystrix-adaptive opener --- closers/hystrix-adaptive/opener.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/closers/hystrix-adaptive/opener.go b/closers/hystrix-adaptive/opener.go index 9e177fd..bf4a613 100644 --- a/closers/hystrix-adaptive/opener.go +++ b/closers/hystrix-adaptive/opener.go @@ -54,6 +54,7 @@ func (a *Opener) ShouldOpen(ctx context.Context, now time.Time) bool { } a.mu.Lock() defer a.mu.Unlock() + if a.extra <= 0 { return true } @@ -67,10 +68,12 @@ func (a *Opener) ShouldOpen(ctx context.Context, now time.Time) bool { if t+f == 0 { return true } + ratio := float64(t) / float64(t+f) if ratio >= a.config.MinTimeoutRatioToDefer { return false } + return true }