From 3e4bae90d4d92a053a3490ec9167cf02db4ee0aa Mon Sep 17 00:00:00 2001 From: Hein Meling Date: Thu, 9 Apr 2026 22:56:31 +0200 Subject: [PATCH 1/8] feat: add latency-based node selection via SortBy and Latency comparator Add Configuration.SortBy and the Latency node comparator so callers can explicitly order or subset nodes by observed round-trip latency without hiding mutable network measurements behind implicit call behavior. - internal/stream: add NewMessageRouterWithLatency test helper - node.go: expand Node.Latency() doc with measurement limits; add Latency comparator (compatible with slices.SortFunc and Configuration.SortBy) - config.go: add Configuration.SortBy using stable sort - node_test.go: tests for Latency comparator ordering and tie-breaking - config_test.go: tests for SortBy including empty config and composition - doc/user-guide.md: new "Latency-Based Node Selection" section covering fast sub-configurations, re-sort frequency guidance, and measurement limits Fixes #320 --- config.go | 35 ++++++++++ config_test.go | 66 ++++++++++++++++++ doc/user-guide.md | 137 ++++++++++++++++++++++++++++++++++++++ internal/stream/router.go | 10 +++ node.go | 36 +++++++++- node_test.go | 62 +++++++++++++++++ 6 files changed, 343 insertions(+), 3 deletions(-) diff --git a/config.go b/config.go index 9d34363c..c36c9f90 100644 --- a/config.go +++ b/config.go @@ -197,6 +197,41 @@ func (c Configuration) Difference(other Configuration) Configuration { return c.Remove(other.NodeIDs()...) } +// SortBy returns a new Configuration with nodes ordered by the given comparator. +// The original configuration is not modified. +// +// Use this with the built-in node comparator functions [ID], [LastNodeError], +// and [Latency]: +// +// fastest := cfg.SortBy(gorums.Latency) // ascending by latency +// healthy := cfg.SortBy(gorums.LastNodeError) // no-error nodes first +// +// Comparators can be combined for multi-key ordering: +// +// cfg.SortBy(func(a, b *Node) int { +// if r := gorums.LastNodeError(a, b); r != 0 { +// return r +// } +// return gorums.Latency(a, b) +// }) +// +// SortBy uses a stable sort, so nodes with equal comparator values retain +// their original relative order. +// +// Note: quorum calls contact every node in the configuration regardless of +// order. Sorting only affects which nodes are selected when the result is +// sliced to a smaller subset, e.g., cfg.SortBy(gorums.Latency)[:quorumSize]. +// See the "Latency-Based Node Selection" section of the user guide for +// guidance on sub-configuration sizing and re-sort frequency. +func (c Configuration) SortBy(cmp func(*Node, *Node) int) Configuration { + if len(c) == 0 { + return nil + } + sorted := slices.Clone(c) + slices.SortStableFunc(sorted, cmp) + return sorted +} + // WithoutErrors returns a new Configuration excluding nodes that failed in the // given QuorumCallError. If specific error types are provided, only nodes whose // errors match one of those types (using errors.Is) will be excluded. diff --git a/config_test.go b/config_test.go index e934c8fd..d3de661a 100644 --- a/config_test.go +++ b/config_test.go @@ -241,6 +241,72 @@ func TestEmptyConfiguration(t *testing.T) { }) } +func TestConfigurationSortBy(t *testing.T) { + cfg, err := gorums.NewConfig(gorums.WithNodeList(nodes), gorums.InsecureDialOptions(t)) + if err != nil { + t.Fatal(err) + } + t.Cleanup(gorums.Closer(t, cfg)) + + t.Run("SortByID", func(t *testing.T) { + sorted := cfg.SortBy(gorums.ID) + if sorted.Size() != cfg.Size() { + t.Fatalf("sorted.Size() = %d, want %d", sorted.Size(), cfg.Size()) + } + for i := 1; i < sorted.Size(); i++ { + if sorted[i].ID() < sorted[i-1].ID() { + t.Errorf("SortBy(ID): not sorted at position %d (id %d < id %d)", + i, sorted[i].ID(), sorted[i-1].ID()) + } + } + }) + + t.Run("SortByLatency/AllUnmeasured", func(t *testing.T) { + // Fresh config: no calls yet, so all nodes report latency < 0. + for _, n := range cfg.Nodes() { + if n.Latency() >= 0 { + t.Fatalf("expected no latency measurement on fresh node, got %v", n.Latency()) + } + } + sorted := cfg.SortBy(gorums.Latency) + if sorted.Size() != cfg.Size() { + t.Fatalf("sorted.Size() = %d, want %d", sorted.Size(), cfg.Size()) + } + }) + + t.Run("ReturnsNewConfiguration", func(t *testing.T) { + sorted := cfg.SortBy(gorums.ID) + cfgSlice := cfg.Nodes() + sortedSlice := sorted.Nodes() + if len(cfgSlice) > 0 && len(sortedSlice) > 0 && &cfgSlice[0] == &sortedSlice[0] { + t.Error("SortBy returned same backing array — violates immutability") + } + }) + + t.Run("Empty/ReturnsNil", func(t *testing.T) { + var empty gorums.Configuration + if got := empty.SortBy(gorums.ID); got != nil { + t.Fatalf("empty.SortBy(ID) = %v, want nil", got) + } + if got := empty.SortBy(gorums.Latency); got != nil { + t.Fatalf("empty.SortBy(Latency) = %v, want nil", got) + } + }) + + t.Run("SortByLastNodeErrorThenLatency", func(t *testing.T) { + // Composing two comparators should not panic and must return a valid config. + sorted := cfg.SortBy(func(a, b *gorums.Node) int { + if r := gorums.LastNodeError(a, b); r != 0 { + return r + } + return gorums.Latency(a, b) + }) + if sorted.Size() != cfg.Size() { + t.Fatalf("composed sort size = %d, want %d", sorted.Size(), cfg.Size()) + } + }) +} + func assertPanicMessage(t *testing.T, want string, fn func()) { t.Helper() defer func() { diff --git a/doc/user-guide.md b/doc/user-guide.md index 12bfb9ab..0cb54907 100644 --- a/doc/user-guide.md +++ b/doc/user-guide.md @@ -1409,6 +1409,143 @@ func ExampleConfigClient() { } ``` +## Latency-Based Node Selection + +Gorums tracks the round-trip latency to each node as an exponentially weighted +moving average, accessible via `node.Latency()`. +This section explains how to use that information to construct faster +sub-configurations, and what to watch out for when doing so. + +### The Latency Comparator + +`gorums.Latency` is a comparator function compatible with `slices.SortFunc` and `Configuration.SortBy`. +The comparator orders nodes ascending by their current latency estimates; +nodes without any measurements (freshly created, never sent traffic) are sorted last. + +```go +// Sort all nodes by ascending latency. +sorted := cfg.SortBy(gorums.Latency) + +// Pick the two fastest nodes. +fast2 := cfg.SortBy(gorums.Latency)[:2] +``` + +Comparators can be chained for multi-key ordering. +For example, healthy nodes first, then by latency within each group: + +```go +sorted := cfg.SortBy(func(a, b *gorums.Node) int { + if r := gorums.LastNodeError(a, b); r != 0 { + return r + } + return gorums.Latency(a, b) +}) +``` + +### Using a Smaller Fast Configuration + +The most practical use of latency-based selection is reducing the quorum size to the fastest subset of nodes. +Sending to fewer nodes lowers tail latency without weakening correctness, as long as the subset still meets your quorum threshold. + +```go +const n = 5 // total nodes +const f = 1 // tolerated failures +quorumSize := (n+f)/2 + 1 // simple majority for crash-fault tolerance = 3 + +// Re-derive the fast sub-configuration periodically (see guidance below). +fastCfg := allNodesCfg.SortBy(gorums.Latency)[:quorumSize] +fastCfgCtx := fastCfg.Context(ctx) + +reply, err := ReadQC(fastCfgCtx, &ReadRequest{Key: "x"}).Majority() +``` + +Combining with error-based filtering is straightforward: drop failed nodes +first, then pick the fastest of those that remain: + +```go +var qcErr gorums.QuorumCallError +if errors.As(err, &qcErr) { + fastCfg = cfg.WithoutErrors(qcErr).SortBy(gorums.Latency)[:quorumSize] +} +``` + +> **Note:** For quorum calls, all nodes in the configuration are contacted — +> the ordering only matters when you slice the result to a subset. +> Sorting a full configuration without slicing has no effect on call latency. + +### How Often to Re-Sort + +`SortBy` returns a snapshot of the ordering at one point in time. +Latency measurements change as network conditions shift; the snapshot does not +auto-update. + +As a rule of thumb: + +* **Every few seconds** is a reasonable re-sort interval for most deployments. + A periodic goroutine or a lazy re-sort at the start of each request batch + both work well. +* **On every single call** is usually unnecessary and wastes allocations. + Each `SortBy` clones the node slice. +* **After a failed quorum call**, always re-evaluate: a node that caused the + failure should be excluded via `WithoutErrors` before re-sorting. +* **After a topology change** (node added or removed), derive the sub-configuration + from the new full configuration rather than sorting an outdated one. + +A simple periodic refresh pattern: + +```go +var ( + mu sync.Mutex + fastCfg gorums.Configuration +) + +// Refresh the fast sub-configuration every 5 seconds. +go func() { + for range time.Tick(5 * time.Second) { + fresh := allNodesCfg.SortBy(gorums.Latency)[:quorumSize] + mu.Lock() + fastCfg = fresh + mu.Unlock() + } +}() + +// Callers read the most recently refreshed configuration. +mu.Lock() +cfg := fastCfg +mu.Unlock() +cfgCtx := cfg.Context(ctx) +``` + +### Measurement Limits + +`Node.Latency()` has several limits that are worth understanding before building +on it: + +* **No traffic → no measurement.** The estimate is only updated on successful + responses. A freshly created node, or a node that has been idle for a long + time, returns a negative value. `SortBy(gorums.Latency)` pushes such nodes + to the end of the slice, so you will not accidentally pick an unmeasured node + when slicing the front. + +* **Staleness.** If traffic to a node stops, the estimate freezes at the last + observed value. A node that was fast yesterday but degraded overnight will + still look fast until new responses arrive and update the average. + +* **Slow convergence.** The moving average uses a smoothing factor of 0.2, so + a sudden step-change in latency takes roughly five round trips to be reflected + in the estimate. Short-lived spikes are smoothed away, which is usually + desirable but means the estimate lags behind rapid fluctuations. + +* **No variance information.** A single average cannot distinguish a stable + low-latency node from a high-variance node whose average happens to look good. + If jitter matters for your workload, the built-in estimate is not sufficient + on its own. + +* **Quorum calls contact all nodes.** Latency-based ordering only improves + performance when you slice the sorted configuration to a smaller subset. + Passing a full-size sorted configuration to a quorum call provides no speedup + because every node in the configuration is contacted regardless of order. + ## Interactive REPL The storage example (`examples/storage`) includes an interactive Read-Eval-Print Loop (REPL) that lets you send RPCs and quorum calls against live storage servers. diff --git a/internal/stream/router.go b/internal/stream/router.go index b871fd3a..6570d68d 100644 --- a/internal/stream/router.go +++ b/internal/stream/router.go @@ -66,6 +66,16 @@ func NewMessageRouter(handler ...RequestHandler) *MessageRouter { } } +// NewMessageRouterWithLatency creates a new MessageRouter with an initial latency +// for testing. The latency may be updated by subsequent message routing operations. +// This function should only be used in tests. +func NewMessageRouterWithLatency(latency time.Duration) *MessageRouter { + return &MessageRouter{ + pending: make(map[uint64]Request), + latency: latency, + } +} + // DispatchLocalRequest handles the request in-process for the local node, // bypassing the network. It delivers the request to the registered handler, // serializing execution the same way remote nodes do: the next dispatch is diff --git a/node.go b/node.go index 3b75e5ce..6dbeeaf4 100644 --- a/node.go +++ b/node.go @@ -254,20 +254,34 @@ func (n *Node) LastErr() error { return nil } -// Latency returns the latency between the client and this node. +// Latency returns the current round-trip latency estimate for this node, +// computed as an exponentially weighted moving average with a +// smoothing factor of 0.2 (roughly a 5-sample window). +// +// The returned value has several important limits: +// - It returns -1s until the first successful response is received; treat +// negative values as "no data" rather than a real measurement. +// - The estimate is only updated when there is active traffic. On an idle +// node the value may be arbitrarily stale and will not reflect recent +// changes in network conditions. +// - A step-change in latency takes several round trips to settle because +// each new sample contributes only 20% of the new value. +// +// Use [Latency] as a comparator with [Configuration.SortBy] to order nodes +// by their current observed latency. func (n *Node) Latency() time.Duration { return n.router.Latency() } // ID compares nodes by their identifier in increasing order. -// It is compatible with [slices.SortFunc] and related helpers. +// It is compatible with [slices.SortFunc] and [Configuration.SortBy]. var ID = func(a, b *Node) int { return cmp.Compare(a.id, b.id) } // LastNodeError compares nodes by their LastErr() status. // Nodes with no error sort before nodes with an error. -// It is compatible with [slices.SortFunc] and related helpers. +// It is compatible with [slices.SortFunc] and [Configuration.SortBy]. var LastNodeError = func(a, b *Node) int { aErr := a.LastErr() bErr := b.LastErr() @@ -281,5 +295,21 @@ var LastNodeError = func(a, b *Node) int { } } +// Latency compares nodes by their current latency estimate in ascending order. +// Nodes with no measurement yet (negative latency value) sort after nodes with a +// measurement. It is compatible with [slices.SortFunc] and [Configuration.SortBy]. +var Latency = func(a, b *Node) int { + la, lb := a.Latency(), b.Latency() + switch { + case la < 0 && lb < 0: + return 0 + case la < 0: + return 1 + case lb < 0: + return -1 + } + return cmp.Compare(la, lb) +} + // compile-time assertion for interface compliance. var _ stream.PeerNode = (*Node)(nil) diff --git a/node_test.go b/node_test.go index 325fb763..f3423687 100644 --- a/node_test.go +++ b/node_test.go @@ -21,6 +21,9 @@ func TestNodeSort(t *testing.T) { n.channel.Store(stream.NewChannelWithState(err)) return n } + makeNodeWithLatency := func(id uint32, lat time.Duration) *Node { + return &Node{id: id, router: stream.NewMessageRouterWithLatency(lat)} + } someErr := errors.New("some error") nodes := []*Node{ makeNode(100, nil), @@ -68,6 +71,65 @@ func TestNodeSort(t *testing.T) { } } }) + + t.Run("ByLatency", func(t *testing.T) { + // Node 3 has no measurement (-1s): should sort last. + // Remaining nodes sort ascending by latency. + ns := []*Node{ + makeNodeWithLatency(1, 30*time.Millisecond), + makeNodeWithLatency(2, 10*time.Millisecond), + makeNodeWithLatency(3, -1*time.Second), // no measurement + makeNodeWithLatency(4, 20*time.Millisecond), + } + slices.SortFunc(ns, Latency) + // Expected: 2 (10ms), 4 (20ms), 1 (30ms), 3 (no data). + wantIDs := []uint32{2, 4, 1, 3} + for i, n := range ns { + if n.id != wantIDs[i] { + t.Errorf("by latency: position %d: got id %d, want %d", i, n.id, wantIDs[i]) + printNodes(t, ns) + } + } + }) + + t.Run("ByLatency/AllUnmeasured", func(t *testing.T) { + // All nodes without measurements: stable order must be preserved. + ns := []*Node{ + makeNodeWithLatency(1, -1*time.Second), + makeNodeWithLatency(2, -1*time.Second), + makeNodeWithLatency(3, -1*time.Second), + } + slices.SortStableFunc(ns, Latency) + wantIDs := []uint32{1, 2, 3} + for i, n := range ns { + if n.id != wantIDs[i] { + t.Errorf("by latency (all unmeasured): position %d: got id %d, want %d", i, n.id, wantIDs[i]) + } + } + }) + + t.Run("ByLatencyThenID", func(t *testing.T) { + // Two nodes with the same latency: secondary sort by ID breaks ties. + ns := []*Node{ + makeNodeWithLatency(10, 20*time.Millisecond), + makeNodeWithLatency(5, 10*time.Millisecond), + makeNodeWithLatency(7, 20*time.Millisecond), + } + slices.SortFunc(ns, func(a, b *Node) int { + if r := Latency(a, b); r != 0 { + return r + } + return ID(a, b) + }) + // Expected: 5 (10ms), 7 (20ms, lower id), 10 (20ms, higher id). + wantIDs := []uint32{5, 7, 10} + for i, n := range ns { + if n.id != wantIDs[i] { + t.Errorf("by latency then id: position %d: got id %d, want %d", i, n.id, wantIDs[i]) + printNodes(t, ns) + } + } + }) } func printNodes(t *testing.T, nodes []*Node) { From 87a7ef4f9ac03b82efc800dc00ae0ab4eb6d45b9 Mon Sep 17 00:00:00 2001 From: Hein Meling Date: Fri, 10 Apr 2026 13:09:19 +0200 Subject: [PATCH 2/8] feat(config): add Watch method for channel-based configuration updates Introduces Configuration.Watch, a new method that starts a background goroutine to periodically re-derive a sub-configuration and emit it on a channel only when it differs from the previous result. The initial result is always emitted before the first tick, ensuring callers receive a valid configuration immediately. The returned channel has a 1-element buffer; if the consumer is slow, emissions are skipped and re-evaluated on the next tick. This replaces the bespoke mutex-based refresh pattern previously documented in the user guide. Examples are updated to show common use cases: latency-based top-k selection and error-filtering with latency sorting. Also adds SetLatency test helper to MessageRouter to support configuration watch tests via latency simulation. --- config.go | 52 +++++++++++++++++++++ doc/user-guide.md | 42 +++++++++-------- internal/stream/router.go | 10 ++++ node_test.go | 98 +++++++++++++++++++++++++++++++++++++++ 4 files changed, 182 insertions(+), 20 deletions(-) diff --git a/config.go b/config.go index c36c9f90..cb80acd2 100644 --- a/config.go +++ b/config.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "slices" + "time" ) // Configuration represents a static set of nodes on which multicast or @@ -232,6 +233,57 @@ func (c Configuration) SortBy(cmp func(*Node, *Node) int) Configuration { return sorted } +// Watch starts a background goroutine that calls derive(c) every interval and +// emits the result on the returned channel whenever it differs from the previous +// result. The initial result is always emitted before the first tick, so callers +// always receive a valid configuration immediately. +// +// The derive function receives the full configuration c and returns any derived +// sub-configuration. Typical examples: +// +// // Latency-based top-k subset: +// cfg.Watch(ctx, 5*time.Second, func(c gorums.Configuration) gorums.Configuration { +// return c.SortBy(gorums.Latency)[:quorumSize] +// }) +// +// // Skip failed nodes first, then pick fastest: +// cfg.Watch(ctx, 5*time.Second, func(c gorums.Configuration) gorums.Configuration { +// return c.WithoutErrors(lastErr).SortBy(gorums.Latency)[:quorumSize] +// }) +// +// The returned channel has a buffer of 1. If the consumer is slow and has not +// yet read the previous update, the goroutine skips the emission and waits for +// the next tick to re-evaluate. +// +// The goroutine exits and the channel is closed when ctx is cancelled. +func (c Configuration) Watch(ctx context.Context, interval time.Duration, derive func(Configuration) Configuration) <-chan Configuration { + ch := make(chan Configuration, 1) + go func() { + defer close(ch) + current := derive(c) + ch <- current + + ticker := time.NewTicker(interval) + defer ticker.Stop() + for { + select { + case <-ticker.C: + fresh := derive(c) + if !fresh.Equal(current) { + current = fresh + select { + case ch <- current: + default: // consumer hasn't read yet; next tick will re-evaluate + } + } + case <-ctx.Done(): + return + } + } + }() + return ch +} + // WithoutErrors returns a new Configuration excluding nodes that failed in the // given QuorumCallError. If specific error types are provided, only nodes whose // errors match one of those types (using errors.Is) will be excluded. diff --git a/doc/user-guide.md b/doc/user-guide.md index 0cb54907..3c20eac1 100644 --- a/doc/user-guide.md +++ b/doc/user-guide.md @@ -1491,31 +1491,33 @@ As a rule of thumb: * **After a topology change** (node added or removed), derive the sub-configuration from the new full configuration rather than sorting an outdated one. -A simple periodic refresh pattern: +A simple periodic refresh pattern using `Configuration.Watch`: ```go -var ( - mu sync.Mutex - fastCfg gorums.Configuration -) - -// Refresh the fast sub-configuration every 5 seconds. -go func() { - for range time.Tick(5 * time.Second) { - fresh := allNodesCfg.SortBy(gorums.Latency)[:quorumSize] - mu.Lock() - fastCfg = fresh - mu.Unlock() - } -}() +updates := allNodesCfg.Watch(ctx, 5*time.Second, func(c gorums.Configuration) gorums.Configuration { + return c.SortBy(gorums.Latency)[:quorumSize] +}) +fastCfg := <-updates // initial snapshot, available before the first tick -// Callers read the most recently refreshed configuration. -mu.Lock() -cfg := fastCfg -mu.Unlock() -cfgCtx := cfg.Context(ctx) +// In your request loop or a dedicated goroutine, consume updates as they arrive: +for cfg := range updates { + fastCfg = cfg // ordering changed; start using the new snapshot +} ``` +`Watch` calls the derive function every five seconds and emits a new +sub-configuration only when the result changes. The initial snapshot is sent +immediately, so callers always receive a valid configuration without waiting +for the first tick. + +The derive function can combine any operations on the configuration. For +example, to skip failed nodes before selecting the fastest: + +```go +updates := allNodesCfg.Watch(ctx, 5*time.Second, func(c gorums.Configuration) gorums.Configuration { + return c.WithoutErrors(lastErr).SortBy(gorums.Latency)[:quorumSize] +}) + ### Measurement Limits `Node.Latency()` has several limits that are worth understanding before building diff --git a/internal/stream/router.go b/internal/stream/router.go index 6570d68d..a57f68f9 100644 --- a/internal/stream/router.go +++ b/internal/stream/router.go @@ -69,6 +69,8 @@ func NewMessageRouter(handler ...RequestHandler) *MessageRouter { // NewMessageRouterWithLatency creates a new MessageRouter with an initial latency // for testing. The latency may be updated by subsequent message routing operations. // This function should only be used in tests. +// +// To change the latency after creation, use [MessageRouter.SetLatency]. func NewMessageRouterWithLatency(latency time.Duration) *MessageRouter { return &MessageRouter{ pending: make(map[uint64]Request), @@ -76,6 +78,14 @@ func NewMessageRouterWithLatency(latency time.Duration) *MessageRouter { } } +// SetLatency directly sets the latency estimate. This function should only +// be used in tests to simulate latency changes without actual message routing. +func (r *MessageRouter) SetLatency(latency time.Duration) { + r.mu.Lock() + defer r.mu.Unlock() + r.latency = latency +} + // DispatchLocalRequest handles the request in-process for the local node, // bypassing the network. It delivers the request to the registered handler, // serializing execution the same way remote nodes do: the next dispatch is diff --git a/node_test.go b/node_test.go index f3423687..57a12445 100644 --- a/node_test.go +++ b/node_test.go @@ -132,6 +132,104 @@ func TestNodeSort(t *testing.T) { }) } +func TestConfigurationWatch(t *testing.T) { + makeNodeWithLatency := func(id uint32, lat time.Duration) *Node { + return &Node{id: id, router: stream.NewMessageRouterWithLatency(lat)} + } + + // allNodes has five nodes; top-3 by ascending latency are 2(10ms), 3(20ms), 1(30ms). + allNodes := Configuration{ + makeNodeWithLatency(1, 30*time.Millisecond), + makeNodeWithLatency(2, 10*time.Millisecond), + makeNodeWithLatency(3, 20*time.Millisecond), + makeNodeWithLatency(4, 40*time.Millisecond), + makeNodeWithLatency(5, 50*time.Millisecond), + } + const quorumSize = 3 + fastTop3 := func(c Configuration) Configuration { return c.SortBy(Latency)[:quorumSize] } + + t.Run("EmitsInitialSnapshot", func(t *testing.T) { + // Use a very long interval so only the initial emission fires. + updates := allNodes.Watch(t.Context(), time.Hour, fastTop3) + snap := <-updates + if len(snap) != quorumSize { + t.Fatalf("initial snapshot size = %d, want %d", len(snap), quorumSize) + } + wantIDs := []uint32{2, 3, 1} + for i, n := range snap { + if n.ID() != wantIDs[i] { + t.Errorf("position %d: got id %d, want %d", i, n.ID(), wantIDs[i]) + } + } + }) + + t.Run("NoEmissionWhenUnchanged", func(t *testing.T) { + updates := allNodes.Watch(t.Context(), 10*time.Millisecond, fastTop3) + <-updates // drain initial emission + + // Latencies are fixed, so no further emission should arrive. + select { + case cfg, ok := <-updates: + if ok { + t.Errorf("unexpected second emission: got ids %v", cfg.NodeIDs()) + } + case <-time.After(100 * time.Millisecond): + // expected: no second emission + } + }) + + t.Run("EmitsOnOrderChange", func(t *testing.T) { + n1 := makeNodeWithLatency(1, 10*time.Millisecond) + n2 := makeNodeWithLatency(2, 30*time.Millisecond) + n3 := makeNodeWithLatency(3, 20*time.Millisecond) + cfg := Configuration{n1, n2, n3} + top2 := func(c Configuration) Configuration { return c.SortBy(Latency)[:2] } + + const interval = 20 * time.Millisecond + updates := cfg.Watch(t.Context(), interval, top2) + first := <-updates + // Initial top-2: [1(10ms), 3(20ms)] + wantFirst := []uint32{1, 3} + for i, n := range first { + if n.ID() != wantFirst[i] { + t.Errorf("initial: position %d got id %d, want %d", i, n.ID(), wantFirst[i]) + } + } + + // Swap latencies: node 2 becomes fastest. + n1.router.SetLatency(40 * time.Millisecond) + n2.router.SetLatency(5 * time.Millisecond) + + select { + case second := <-updates: + // New top-2: [2(5ms), 3(20ms)] + wantSecond := []uint32{2, 3} + for i, n := range second { + if n.ID() != wantSecond[i] { + t.Errorf("after swap: position %d got id %d, want %d", i, n.ID(), wantSecond[i]) + } + } + case <-time.After(5 * interval): + t.Error("expected a second emission after latency swap, but none arrived") + } + }) + + t.Run("ChannelClosedOnCtxCancel", func(t *testing.T) { + ctx, cancel := context.WithCancel(t.Context()) + updates := allNodes.Watch(ctx, time.Hour, fastTop3) + <-updates // drain initial + cancel() + select { + case _, ok := <-updates: + if ok { + t.Error("channel should be closed after ctx cancel") + } + case <-time.After(time.Second): + t.Error("channel should be closed promptly after ctx cancel") + } + }) +} + func printNodes(t *testing.T, nodes []*Node) { t.Helper() for i, n := range nodes { From dfcd65766840b325b28230dd1a176a5efa105296 Mon Sep 17 00:00:00 2001 From: Hein Meling Date: Sat, 11 Apr 2026 09:50:37 +0200 Subject: [PATCH 3/8] test: add stability check for SortBy with unmeasured nodes --- config_test.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/config_test.go b/config_test.go index d3de661a..06da70cd 100644 --- a/config_test.go +++ b/config_test.go @@ -3,6 +3,7 @@ package gorums_test import ( "errors" "fmt" + "slices" "sync" "testing" @@ -272,6 +273,12 @@ func TestConfigurationSortBy(t *testing.T) { if sorted.Size() != cfg.Size() { t.Fatalf("sorted.Size() = %d, want %d", sorted.Size(), cfg.Size()) } + // When all nodes are unmeasured their reported latencies are all -1; thus, the + // Latency comparator returns 0 for every latency pair (the latencies are equal), + // so a stable sort must preserve the original order. + if got, want := sorted.NodeIDs(), cfg.NodeIDs(); !slices.Equal(got, want) { + t.Errorf("SortBy(Latency) with all-unmeasured nodes changed order: got %v, want %v", got, want) + } }) t.Run("ReturnsNewConfiguration", func(t *testing.T) { From 0b402600fa7e60babc40a6777c689b3e9852212b Mon Sep 17 00:00:00 2001 From: Hein Meling Date: Sat, 11 Apr 2026 10:15:18 +0200 Subject: [PATCH 4/8] doc: clarify the switch guard in the Latency comparator The Latency comparator orders nodes without latency measurements last. --- node.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/node.go b/node.go index 6dbeeaf4..b263d383 100644 --- a/node.go +++ b/node.go @@ -267,7 +267,7 @@ func (n *Node) LastErr() error { // - A step-change in latency takes several round trips to settle because // each new sample contributes only 20% of the new value. // -// Use [Latency] as a comparator with [Configuration.SortBy] to order nodes +// Use the [Latency] comparator with [Configuration.SortBy] to order nodes // by their current observed latency. func (n *Node) Latency() time.Duration { return n.router.Latency() @@ -300,6 +300,9 @@ var LastNodeError = func(a, b *Node) int { // measurement. It is compatible with [slices.SortFunc] and [Configuration.SortBy]. var Latency = func(a, b *Node) int { la, lb := a.Latency(), b.Latency() + // Note: cmp.Compare alone would sort negative sentinel values first + // (as the smallest numbers), making unmeasured nodes appear fastest. + // The switch guards against that by pushing any negative value to the end. switch { case la < 0 && lb < 0: return 0 From 5d3e2273bafa61a3fea99e3607c084b1c7400115 Mon Sep 17 00:00:00 2001 From: Hein Meling Date: Sat, 11 Apr 2026 10:27:40 +0200 Subject: [PATCH 5/8] fix(config): add validation for Watch interval and derive function --- config.go | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/config.go b/config.go index cb80acd2..2ab981ba 100644 --- a/config.go +++ b/config.go @@ -236,7 +236,8 @@ func (c Configuration) SortBy(cmp func(*Node, *Node) int) Configuration { // Watch starts a background goroutine that calls derive(c) every interval and // emits the result on the returned channel whenever it differs from the previous // result. The initial result is always emitted before the first tick, so callers -// always receive a valid configuration immediately. +// always receive a valid configuration immediately. The interval must be greater +// than zero, and derive must be non-nil; otherwise, Watch will panic. // // The derive function receives the full configuration c and returns any derived // sub-configuration. Typical examples: @@ -257,6 +258,12 @@ func (c Configuration) SortBy(cmp func(*Node, *Node) int) Configuration { // // The goroutine exits and the channel is closed when ctx is cancelled. func (c Configuration) Watch(ctx context.Context, interval time.Duration, derive func(Configuration) Configuration) <-chan Configuration { + if interval <= 0 { + panic("gorums: Watch interval must be positive") + } + if derive == nil { + panic("gorums: Watch derive function must be non-nil") + } ch := make(chan Configuration, 1) go func() { defer close(ch) From 721280f52ed39f285b1f1118bd84fe4425bfa7a5 Mon Sep 17 00:00:00 2001 From: Hein Meling Date: Sat, 11 Apr 2026 10:40:34 +0200 Subject: [PATCH 6/8] doc: add guidance for combining Watch with error filtering in user guide --- doc/user-guide.md | 27 ++++++++++++++++++++++++--- 1 file changed, 24 insertions(+), 3 deletions(-) diff --git a/doc/user-guide.md b/doc/user-guide.md index 3c20eac1..a9f10dda 100644 --- a/doc/user-guide.md +++ b/doc/user-guide.md @@ -1510,13 +1510,34 @@ sub-configuration only when the result changes. The initial snapshot is sent immediately, so callers always receive a valid configuration without waiting for the first tick. -The derive function can combine any operations on the configuration. For -example, to skip failed nodes before selecting the fastest: +### Combining Watch with Error Filtering + +To skip failed nodes on every periodic refresh, capture the most recent +`gorums.QuorumCallError` in a mutex-guarded variable and include it in the +derive function: ```go +var mu sync.Mutex +var lastQCErr gorums.QuorumCallError // zero value excludes no nodes + +// After each failed quorum call, record the error so the next Watch tick +// can exclude the offending nodes: +// +// var qcErr gorums.QuorumCallError +// if errors.As(err, &qcErr) { +// mu.Lock(); lastQCErr = qcErr; mu.Unlock() +// } + updates := allNodesCfg.Watch(ctx, 5*time.Second, func(c gorums.Configuration) gorums.Configuration { - return c.WithoutErrors(lastErr).SortBy(gorums.Latency)[:quorumSize] + mu.Lock() + qcErr := lastQCErr + mu.Unlock() + return c.WithoutErrors(qcErr).SortBy(gorums.Latency)[:quorumSize] }) +``` + +The zero value of `gorums.QuorumCallError` carries no node errors, so the +initial derive call behaves identically to the pure-latency example above. ### Measurement Limits From a34518d2fbbd25161dc937d9df95850c2d8080ab Mon Sep 17 00:00:00 2001 From: Hein Meling Date: Sat, 11 Apr 2026 10:46:09 +0200 Subject: [PATCH 7/8] fix(doc): tolerated failures and quorum size calculation in user guide The LLM hallucinated some bad calculations for the quorum size and tolerated failures value. --- doc/user-guide.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/doc/user-guide.md b/doc/user-guide.md index a9f10dda..e4a6f0c5 100644 --- a/doc/user-guide.md +++ b/doc/user-guide.md @@ -1449,8 +1449,8 @@ Sending to fewer nodes lowers tail latency without weakening correctness, as lon ```go const n = 5 // total nodes -const f = 1 // tolerated failures -quorumSize := (n+f)/2 + 1 // simple majority for crash-fault tolerance = 3 +const f = 2 // tolerated failures (n = 2f+1) +quorumSize := n/2 + 1 // simple majority for crash-fault tolerance = 3 // Re-derive the fast sub-configuration periodically (see guidance below). fastCfg := allNodesCfg.SortBy(gorums.Latency)[:quorumSize] From ad82cb426fe1f9ae8cebdbaaead612b867ba9c77 Mon Sep 17 00:00:00 2001 From: Hein Meling Date: Sat, 11 Apr 2026 12:05:56 +0200 Subject: [PATCH 8/8] doc: update Measurement Limits section This fixes some LLM misconceptions in the measurement limits section and adds some additional limitations to clarify what the latency measurement is actually measuring (RPC compute included and shared across all RPCs hosted on the node). --- doc/user-guide.md | 42 +++++++++++++++++++++++++++++++++++------- 1 file changed, 35 insertions(+), 7 deletions(-) diff --git a/doc/user-guide.md b/doc/user-guide.md index e4a6f0c5..d3e9f68f 100644 --- a/doc/user-guide.md +++ b/doc/user-guide.md @@ -1545,25 +1545,53 @@ initial derive call behaves identically to the pure-latency example above. on it: * **No traffic → no measurement.** The estimate is only updated on successful - responses. A freshly created node, or a node that has been idle for a long - time, returns a negative value. `SortBy(gorums.Latency)` pushes such nodes - to the end of the slice, so you will not accidentally pick an unmeasured node - when slicing the front. + responses. A node that has never received a response returns a negative value. + `SortBy(gorums.Latency)` pushes such nodes to the end of the slice, so you + will not accidentally pick an unmeasured node when slicing the front. -* **Staleness.** If traffic to a node stops, the estimate freezes at the last - observed value. A node that was fast yesterday but degraded overnight will - still look fast until new responses arrive and update the average. +* **Staleness.** If traffic to a node stops, the estimate holds its last + observed value indefinitely. A node that appeared fast previously will + continue to look fast until new responses arrive and update the average, + even if network conditions have since changed. * **Slow convergence.** The moving average uses a smoothing factor of 0.2, so a sudden step-change in latency takes roughly five round trips to be reflected in the estimate. Short-lived spikes are smoothed away, which is usually desirable but means the estimate lags behind rapid fluctuations. +* **RPC compute time included.** The measurement covers the full round-trip + from sending the request to receiving the response. For long-running RPCs, + server-side processing time is included in the estimate, which means the + measurement reflects more than just network latency. In workloads dominated + by heavy RPCs, this can skew node selection toward nodes with lighter server + load rather than nodes that are genuinely closer on the network. + +* **Latency is per node, not per RPC.** The estimate is a single value + maintained for each node, shared across all RPC types sent to that node. + It reflects the node's overall responsiveness rather than the latency of + any particular operation, making it a blunt instrument for workloads where + different RPCs have meaningfully different latency profiles. + +* **Multicast and unicast calls are not measured.** Only calls that receive + replies update the latency estimate. Multicast calls (fire-and-forget) and + unicast calls with no response do not contribute measurements, so nodes + contacted exclusively via these call types will remain unmeasured. + * **No variance information.** A single average cannot distinguish a stable low-latency node from a high-variance node whose average happens to look good. If jitter matters for your workload, the built-in estimate is not sufficient on its own. +### Best Practices + +The limitations above can be mitigated with a few straightforward patterns: + +* **Keep traffic flowing.** Latency estimates are only updated when RPCs + complete successfully. If your application has quiet periods, the estimates + for idle nodes will become stale. Scheduling periodic lightweight RPCs + (such as a ping or a no-op read) ensures that estimates remain fresh even + when there is no organic traffic. + * **Quorum calls contact all nodes.** Latency-based ordering only improves performance when you slice the sorted configuration to a smaller subset. Passing a full-size sorted configuration to a quorum call provides no speedup