diff --git a/config.go b/config.go
index 9d34363c..2ab981ba 100644
--- a/config.go
+++ b/config.go
@@ -5,6 +5,7 @@ import (
 	"errors"
 	"fmt"
 	"slices"
+	"time"
 )
 
 // Configuration represents a static set of nodes on which multicast or
@@ -197,6 +198,99 @@ func (c Configuration) Difference(other Configuration) Configuration {
 	return c.Remove(other.NodeIDs()...)
 }
 
+// SortBy returns a new Configuration with nodes ordered by the given comparator.
+// The original configuration is not modified.
+//
+// Use this with the built-in node comparator functions [ID], [LastNodeError],
+// and [Latency]:
+//
+//	fastest := cfg.SortBy(gorums.Latency)       // ascending by latency
+//	healthy := cfg.SortBy(gorums.LastNodeError) // no-error nodes first
+//
+// Comparators can be combined for multi-key ordering:
+//
+//	cfg.SortBy(func(a, b *Node) int {
+//		if r := gorums.LastNodeError(a, b); r != 0 {
+//			return r
+//		}
+//		return gorums.Latency(a, b)
+//	})
+//
+// SortBy uses a stable sort, so nodes with equal comparator values retain
+// their original relative order.
+//
+// Note: quorum calls contact every node in the configuration regardless of
+// order. Sorting only affects which nodes are selected when the result is
+// sliced to a smaller subset, e.g., cfg.SortBy(gorums.Latency)[:quorumSize].
+// See the "Latency-Based Node Selection" section of the user guide for
+// guidance on sub-configuration sizing and re-sort frequency.
+func (c Configuration) SortBy(cmp func(*Node, *Node) int) Configuration {
+	if len(c) == 0 {
+		return nil
+	}
+	sorted := slices.Clone(c)
+	slices.SortStableFunc(sorted, cmp)
+	return sorted
+}
+
+// Watch starts a background goroutine that calls derive(c) every interval and
+// emits the result on the returned channel whenever it differs from the last
+// emitted result. The initial result is always emitted before the first tick,
+// so callers always receive a valid configuration immediately. The interval
+// must be greater than zero, and derive must be non-nil; otherwise, Watch panics.
+//
+// The derive function receives the full configuration c and returns any derived
+// sub-configuration. Typical examples:
+//
+//	// Latency-based top-k subset:
+//	cfg.Watch(ctx, 5*time.Second, func(c gorums.Configuration) gorums.Configuration {
+//		return c.SortBy(gorums.Latency)[:quorumSize]
+//	})
+//
+//	// Skip failed nodes first, then pick fastest:
+//	cfg.Watch(ctx, 5*time.Second, func(c gorums.Configuration) gorums.Configuration {
+//		return c.WithoutErrors(lastErr).SortBy(gorums.Latency)[:quorumSize]
+//	})
+//
+// The returned channel has a buffer of 1. If the consumer is slow and has not
+// yet read the previous update, the goroutine skips the emission and retries
+// on a later tick, so no change is lost permanently.
+//
+// The goroutine exits and the channel is closed when ctx is cancelled.
+func (c Configuration) Watch(ctx context.Context, interval time.Duration, derive func(Configuration) Configuration) <-chan Configuration {
+	if interval <= 0 {
+		panic("gorums: Watch interval must be positive")
+	}
+	if derive == nil {
+		panic("gorums: Watch derive function must be non-nil")
+	}
+	ch := make(chan Configuration, 1)
+	go func() {
+		defer close(ch)
+		current := derive(c)
+		ch <- current
+
+		ticker := time.NewTicker(interval)
+		defer ticker.Stop()
+		for {
+			select {
+			case <-ticker.C:
+				fresh := derive(c)
+				if fresh.Equal(current) {
+					continue
+				}
+				select {
+				case ch <- fresh:
+					current = fresh
+				default:
+					// Consumer hasn't read the previous update yet; keep
+					// current unchanged so the next tick retries the send.
+				}
+			case <-ctx.Done():
+				return
+			}
+		}
+	}()
+	return ch
+}
+
 // WithoutErrors returns a new Configuration excluding nodes that failed in the
 // given QuorumCallError. If specific error types are provided, only nodes whose
 // errors match one of those types (using errors.Is) will be excluded.
diff --git a/config_test.go b/config_test.go
index e934c8fd..06da70cd 100644
--- a/config_test.go
+++ b/config_test.go
@@ -3,6 +3,7 @@ package gorums_test
 import (
 	"errors"
 	"fmt"
+	"slices"
 	"sync"
 	"testing"
 
@@ -241,6 +242,78 @@ func TestEmptyConfiguration(t *testing.T) {
 	})
 }
 
+func TestConfigurationSortBy(t *testing.T) {
+	cfg, err := gorums.NewConfig(gorums.WithNodeList(nodes), gorums.InsecureDialOptions(t))
+	if err != nil {
+		t.Fatal(err)
+	}
+	t.Cleanup(gorums.Closer(t, cfg))
+
+	t.Run("SortByID", func(t *testing.T) {
+		sorted := cfg.SortBy(gorums.ID)
+		if sorted.Size() != cfg.Size() {
+			t.Fatalf("sorted.Size() = %d, want %d", sorted.Size(), cfg.Size())
+		}
+		for i := 1; i < sorted.Size(); i++ {
+			if sorted[i].ID() < sorted[i-1].ID() {
+				t.Errorf("SortBy(ID): not sorted at position %d (id %d < id %d)",
+					i, sorted[i].ID(), sorted[i-1].ID())
+			}
+		}
+	})
+
+	t.Run("SortByLatency/AllUnmeasured", func(t *testing.T) {
+		// Fresh config: no calls yet, so all nodes report latency < 0.
+		for _, n := range cfg.Nodes() {
+			if n.Latency() >= 0 {
+				t.Fatalf("expected no latency measurement on fresh node, got %v", n.Latency())
+			}
+		}
+		sorted := cfg.SortBy(gorums.Latency)
+		if sorted.Size() != cfg.Size() {
+			t.Fatalf("sorted.Size() = %d, want %d", sorted.Size(), cfg.Size())
+		}
+		// All nodes are unmeasured (latency -1), so the Latency comparator
+		// returns 0 for every pair; a stable sort must preserve the original order.
+		if got, want := sorted.NodeIDs(), cfg.NodeIDs(); !slices.Equal(got, want) {
+			t.Errorf("SortBy(Latency) with all-unmeasured nodes changed order: got %v, want %v", got, want)
+		}
+	})
+
+	t.Run("ReturnsNewConfiguration", func(t *testing.T) {
+		sorted := cfg.SortBy(gorums.ID)
+		cfgSlice := cfg.Nodes()
+		sortedSlice := sorted.Nodes()
+		if len(cfgSlice) > 0 && len(sortedSlice) > 0 && &cfgSlice[0] == &sortedSlice[0] {
+			t.Error("SortBy returned same backing array — violates immutability")
+		}
+	})
+
+	t.Run("Empty/ReturnsNil", func(t *testing.T) {
+		var empty gorums.Configuration
+		if got := empty.SortBy(gorums.ID); got != nil {
+			t.Fatalf("empty.SortBy(ID) = %v, want nil", got)
+		}
+		if got := empty.SortBy(gorums.Latency); got != nil {
+			t.Fatalf("empty.SortBy(Latency) = %v, want nil", got)
+		}
+	})
+
+	t.Run("SortByLastNodeErrorThenLatency", func(t *testing.T) {
+		// Composing two comparators should not panic and must return a valid config.
+		sorted := cfg.SortBy(func(a, b *gorums.Node) int {
+			if r := gorums.LastNodeError(a, b); r != 0 {
+				return r
+			}
+			return gorums.Latency(a, b)
+		})
+		if sorted.Size() != cfg.Size() {
+			t.Fatalf("composed sort size = %d, want %d", sorted.Size(), cfg.Size())
+		}
+	})
+}
+
 func assertPanicMessage(t *testing.T, want string, fn func()) {
 	t.Helper()
 	defer func() {
diff --git a/doc/user-guide.md b/doc/user-guide.md
index 12bfb9ab..d3e9f68f 100644
--- a/doc/user-guide.md
+++ b/doc/user-guide.md
@@ -1409,6 +1409,194 @@ func ExampleConfigClient() {
 }
 ```
 
+## Latency-Based Node Selection
+
+Gorums tracks the round-trip latency to each node as an exponentially weighted
+moving average, accessible via `node.Latency()`.
+This section explains how to use that information to construct faster
+sub-configurations, and what to watch out for when doing so.
+
+### The Latency Comparator
+
+`gorums.Latency` is a comparator function compatible with `slices.SortFunc` and `Configuration.SortBy`.
+The comparator orders nodes ascending by their current latency estimates;
+nodes without any measurements (freshly created, no responses received yet) are sorted last.
+
+```go
+// Sort all nodes by ascending latency.
+sorted := cfg.SortBy(gorums.Latency)
+
+// Pick the two fastest nodes.
+fast2 := cfg.SortBy(gorums.Latency)[:2]
+```
+
+Comparators can be chained for multi-key ordering.
+For example, healthy nodes first, then by latency within each group:
+
+```go
+sorted := cfg.SortBy(func(a, b *gorums.Node) int {
+	if r := gorums.LastNodeError(a, b); r != 0 {
+		return r
+	}
+	return gorums.Latency(a, b)
+})
+```
+
+### Using a Smaller Fast Configuration
+
+The most practical use of latency-based selection is restricting quorum calls to the fastest subset of nodes that still forms a quorum.
+Sending to fewer nodes lowers tail latency without weakening safety, as long as the subset still meets your quorum threshold; the trade-off is that a failure inside the small subset stalls the call until you re-derive it.
+
+```go
+const n = 5           // total nodes
+const f = 2           // tolerated failures (n = 2f+1)
+quorumSize := n/2 + 1 // simple majority for crash-fault tolerance = 3
+
+// Re-derive the fast sub-configuration periodically (see guidance below).
+fastCfg := allNodesCfg.SortBy(gorums.Latency)[:quorumSize]
+fastCfgCtx := fastCfg.Context(ctx)
+
+reply, err := ReadQC(fastCfgCtx, &ReadRequest{Key: "x"}).Majority()
+```
+
+Combining with error-based filtering is straightforward: drop failed nodes
+first, then pick the fastest of those that remain:
+
+```go
+var qcErr gorums.QuorumCallError
+if errors.As(err, &qcErr) {
+	fastCfg = cfg.WithoutErrors(qcErr).SortBy(gorums.Latency)[:quorumSize]
+}
+```
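+
+One caveat with this pattern: `WithoutErrors` can leave fewer than `quorumSize`
+nodes, and the slice expression above would then panic. A defensive variant of
+the same idea (a sketch, not gorums API) caps the slice at whatever remains:
+
+```go
+// Sketch: never slice beyond the number of surviving nodes.
+filtered := cfg.WithoutErrors(qcErr).SortBy(gorums.Latency)
+k := min(quorumSize, filtered.Size())
+fastCfg = filtered[:k]
+// If k < quorumSize, too few nodes survive to form a quorum; fall back to
+// the full configuration or surface the error to the caller instead.
+```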
+
+> **Note:** For quorum calls, all nodes in the configuration are contacted —
+> the ordering only matters when you slice the result to a subset.
+> Sorting a full configuration without slicing has no effect on call latency.
+
+### How Often to Re-Sort
+
+`SortBy` returns a snapshot of the ordering at one point in time.
+Latency measurements change as network conditions shift; the snapshot does not
+auto-update.
+
+As a rule of thumb:
+
+* **Every few seconds** is a reasonable re-sort interval for most deployments.
+  A periodic goroutine or a lazy re-sort at the start of each request batch
+  both work well (a sketch of the lazy variant follows this list).
+* **On every single call** is usually unnecessary and wastes allocations.
+  Each `SortBy` clones the node slice.
+* **After a failed quorum call**, always re-evaluate: a node that caused the
+  failure should be excluded via `WithoutErrors` before re-sorting.
+* **After a topology change** (node added or removed), derive the sub-configuration
+  from the new full configuration rather than sorting an outdated one.
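+
+If you prefer the lazy variant over a background goroutine, a small wrapper
+along the following lines works; `lazyFast` and its fields are illustrative
+names, not part of gorums:
+
+```go
+// Illustrative helper: re-sorts lazily, at most once per refresh interval.
+type lazyFast struct {
+	all      gorums.Configuration
+	interval time.Duration
+	k        int // sub-configuration size
+
+	mu      sync.Mutex
+	fast    gorums.Configuration
+	refresh time.Time
+}
+
+// Fast returns the cached sub-configuration, re-deriving it when it has
+// grown older than the refresh interval.
+func (l *lazyFast) Fast() gorums.Configuration {
+	l.mu.Lock()
+	defer l.mu.Unlock()
+	if l.fast == nil || time.Since(l.refresh) >= l.interval {
+		l.fast = l.all.SortBy(gorums.Latency)[:l.k]
+		l.refresh = time.Now()
+	}
+	return l.fast
+}
+
+// Usage: sel := &lazyFast{all: allNodesCfg, interval: 5 * time.Second, k: quorumSize}
+// and cfg := sel.Fast() at the start of each request batch.
+```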
+
+A simple periodic refresh pattern using `Configuration.Watch`:
+
+```go
+updates := allNodesCfg.Watch(ctx, 5*time.Second, func(c gorums.Configuration) gorums.Configuration {
+	return c.SortBy(gorums.Latency)[:quorumSize]
+})
+fastCfg := <-updates // initial snapshot, available before the first tick
+
+// In your request loop or a dedicated goroutine, consume updates as they arrive:
+for cfg := range updates {
+	fastCfg = cfg // ordering changed; start using the new snapshot
+}
+```
+
+`Watch` calls the derive function every five seconds and emits a new
+sub-configuration only when the result changes. The initial snapshot is sent
+immediately, so callers always receive a valid configuration without waiting
+for the first tick.
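+
+If the consuming loop above runs in its own goroutine while request handlers
+read `fastCfg`, the unguarded assignment is a data race. One simple way to
+share the snapshot safely, shown here as a sketch using only the standard
+library (the `ReadQC` call is the one from the earlier examples):
+
+```go
+// Sketch: publish the current fast configuration with an atomic pointer.
+var fast atomic.Pointer[gorums.Configuration]
+
+initial := <-updates
+fast.Store(&initial)
+
+go func() {
+	for cfg := range updates {
+		cfg := cfg // copy before taking the address
+		fast.Store(&cfg)
+	}
+}()
+
+// In a request handler:
+fastCfg := *fast.Load()
+reply, err := ReadQC(fastCfg.Context(ctx), &ReadRequest{Key: "x"}).Majority()
+```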
+
+### Combining Watch with Error Filtering
+
+To skip failed nodes on every periodic refresh, capture the most recent
+`gorums.QuorumCallError` in a mutex-guarded variable and include it in the
+derive function:
+
+```go
+var mu sync.Mutex
+var lastQCErr gorums.QuorumCallError // zero value excludes no nodes
+
+// After each failed quorum call, record the error so the next Watch tick
+// can exclude the offending nodes:
+//
+//	var qcErr gorums.QuorumCallError
+//	if errors.As(err, &qcErr) {
+//		mu.Lock(); lastQCErr = qcErr; mu.Unlock()
+//	}
+
+updates := allNodesCfg.Watch(ctx, 5*time.Second, func(c gorums.Configuration) gorums.Configuration {
+	mu.Lock()
+	qcErr := lastQCErr
+	mu.Unlock()
+	return c.WithoutErrors(qcErr).SortBy(gorums.Latency)[:quorumSize]
+})
+```
+
+The zero value of `gorums.QuorumCallError` carries no node errors, so the
+initial derive call behaves identically to the pure-latency example above.
+
+### Measurement Limits
+
+`Node.Latency()` has several limits that are worth understanding before building
+on it:
+
+* **No traffic → no measurement.** The estimate is only updated on successful
+  responses. A node that has never received a response returns a negative value.
+  `SortBy(gorums.Latency)` pushes such nodes to the end of the slice, so you
+  will not accidentally pick an unmeasured node when slicing the front.
+
+* **Staleness.** If traffic to a node stops, the estimate holds its last
+  observed value indefinitely. A node that appeared fast previously will
+  continue to look fast until new responses arrive and update the average,
+  even if network conditions have since changed.
+
+* **Slow convergence.** The moving average uses a smoothing factor of 0.2, so
+  a sudden step-change in latency is only about two-thirds absorbed after five
+  round trips (see the sketch after this list). Short-lived spikes are smoothed
+  away, which is usually desirable but means the estimate lags behind rapid
+  fluctuations.
+
+* **RPC compute time included.** The measurement covers the full round-trip
+  from sending the request to receiving the response. For long-running RPCs,
+  server-side processing time is included in the estimate, which means the
+  measurement reflects more than just network latency. In workloads dominated
+  by heavy RPCs, this can skew node selection toward nodes with lighter server
+  load rather than nodes that are genuinely closer on the network.
+
+* **Latency is per node, not per RPC.** The estimate is a single value
+  maintained for each node, shared across all RPC types sent to that node.
+  It reflects the node's overall responsiveness rather than the latency of
+  any particular operation, making it a blunt instrument for workloads where
+  different RPCs have meaningfully different latency profiles.
+
+* **Multicast and unicast calls are not measured.** Only calls that receive
+  replies update the latency estimate. Multicast calls (fire-and-forget) and
+  unicast calls with no response do not contribute measurements, so nodes
+  contacted exclusively via these call types will remain unmeasured.
+
+* **No variance information.** A single average cannot distinguish a stable
+  low-latency node from a high-variance node whose average happens to look good.
+  If jitter matters for your workload, the built-in estimate is not sufficient
+  on its own.
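+
+The numbers behind the slow-convergence point are easy to check. The snippet
+below does not call into gorums at all; it simply applies a 0.2 smoothing
+factor to a simulated step change:
+
+```go
+// Illustrative only: how an EWMA with smoothing factor 0.2 reacts to a step change.
+est := 10 * time.Millisecond    // long-standing estimate
+sample := 50 * time.Millisecond // latency suddenly jumps to 50ms
+for i := 1; i <= 5; i++ {
+	est = time.Duration(0.8*float64(est) + 0.2*float64(sample))
+	fmt.Printf("after sample %d: %v\n", i, est.Round(time.Millisecond))
+}
+// ≈ 18ms, 24ms, 30ms, 34ms, 37ms: about two-thirds of the 40ms step after five samples.
+```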
+
+### Best Practices
+
+The limitations above can be mitigated with a few straightforward patterns:
+
+* **Keep traffic flowing.** Latency estimates are only updated when RPCs
+  complete successfully. If your application has quiet periods, the estimates
+  for idle nodes will become stale. Scheduling periodic lightweight RPCs
+  (such as a ping or a no-op read) ensures that estimates remain fresh even
+  when there is no organic traffic (a sketch of such a loop follows this list).
+
+* **Quorum calls contact all nodes.** Latency-based ordering only improves
+  performance when you slice the sorted configuration to a smaller subset.
+  Passing a full-size sorted configuration to a quorum call provides no speedup
+  because every node in the configuration is contacted regardless of order.
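+
+One possible shape for such a keep-alive loop, assuming the `ReadQC` quorum
+call from the earlier examples (any cheap RPC your service already exposes
+will do; the key name and timings here are illustrative):
+
+```go
+// Sketch: periodically touch every node so latency estimates stay fresh.
+go func() {
+	ticker := time.NewTicker(30 * time.Second)
+	defer ticker.Stop()
+	for {
+		select {
+		case <-ticker.C:
+			// A cheap read sent to the full configuration; every node that
+			// responds refreshes its latency estimate, the reply is discarded.
+			pingCtx, cancel := context.WithTimeout(ctx, 2*time.Second)
+			_, _ = ReadQC(allNodesCfg.Context(pingCtx), &ReadRequest{Key: "ping"}).Majority()
+			cancel()
+		case <-ctx.Done():
+			return
+		}
+	}
+}()
+```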
+
 ## Interactive REPL
 
 The storage example (`examples/storage`) includes an interactive Read-Eval-Print Loop (REPL) that lets you send RPCs and quorum calls against live storage servers.
diff --git a/internal/stream/router.go b/internal/stream/router.go
index b871fd3a..a57f68f9 100644
--- a/internal/stream/router.go
+++ b/internal/stream/router.go
@@ -66,6 +66,26 @@ func NewMessageRouter(handler ...RequestHandler) *MessageRouter {
 	}
 }
 
+// NewMessageRouterWithLatency creates a new MessageRouter with an initial latency
+// for testing. The latency may be updated by subsequent message routing operations.
+// This function should only be used in tests.
+//
+// To change the latency after creation, use [MessageRouter.SetLatency].
+func NewMessageRouterWithLatency(latency time.Duration) *MessageRouter {
+	return &MessageRouter{
+		pending: make(map[uint64]Request),
+		latency: latency,
+	}
+}
+
+// SetLatency directly sets the latency estimate. This function should only
+// be used in tests to simulate latency changes without actual message routing.
+func (r *MessageRouter) SetLatency(latency time.Duration) {
+	r.mu.Lock()
+	defer r.mu.Unlock()
+	r.latency = latency
+}
+
 // DispatchLocalRequest handles the request in-process for the local node,
 // bypassing the network. It delivers the request to the registered handler,
 // serializing execution the same way remote nodes do: the next dispatch is
diff --git a/node.go b/node.go
index 3b75e5ce..b263d383 100644
--- a/node.go
+++ b/node.go
@@ -254,20 +254,34 @@ func (n *Node) LastErr() error {
 	return nil
 }
 
-// Latency returns the latency between the client and this node.
+// Latency returns the current round-trip latency estimate for this node,
+// computed as an exponentially weighted moving average with a
+// smoothing factor of 0.2 (roughly a 5-sample window).
+//
+// The returned value has several important limits:
+//   - It returns -1s until the first successful response is received; treat
+//     negative values as "no data" rather than a real measurement.
+//   - The estimate is only updated when there is active traffic. On an idle
+//     node the value may be arbitrarily stale and will not reflect recent
+//     changes in network conditions.
+//   - A step-change in latency takes several round trips to settle because
+//     each new sample contributes only 20% of the new value.
+//
+// Use the [Latency] comparator with [Configuration.SortBy] to order nodes
+// by their current observed latency.
 func (n *Node) Latency() time.Duration {
 	return n.router.Latency()
 }
 
 // ID compares nodes by their identifier in increasing order.
-// It is compatible with [slices.SortFunc] and related helpers.
+// It is compatible with [slices.SortFunc] and [Configuration.SortBy].
 var ID = func(a, b *Node) int {
 	return cmp.Compare(a.id, b.id)
 }
 
 // LastNodeError compares nodes by their LastErr() status.
 // Nodes with no error sort before nodes with an error.
-// It is compatible with [slices.SortFunc] and related helpers.
+// It is compatible with [slices.SortFunc] and [Configuration.SortBy].
 var LastNodeError = func(a, b *Node) int {
 	aErr := a.LastErr()
 	bErr := b.LastErr()
@@ -281,5 +295,24 @@ var LastNodeError = func(a, b *Node) int {
 	}
 }
 
+// Latency compares nodes by their current latency estimate in ascending order.
+// Nodes with no measurement yet (negative latency value) sort after nodes with a
+// measurement. It is compatible with [slices.SortFunc] and [Configuration.SortBy].
+var Latency = func(a, b *Node) int {
+	la, lb := a.Latency(), b.Latency()
+	// Note: cmp.Compare alone would sort negative sentinel values first
+	// (as the smallest numbers), making unmeasured nodes appear fastest.
+	// The switch guards against that by pushing any negative value to the end.
+	switch {
+	case la < 0 && lb < 0:
+		return 0
+	case la < 0:
+		return 1
+	case lb < 0:
+		return -1
+	}
+	return cmp.Compare(la, lb)
+}
+
 // compile-time assertion for interface compliance.
 var _ stream.PeerNode = (*Node)(nil)
diff --git a/node_test.go b/node_test.go
index 325fb763..57a12445 100644
--- a/node_test.go
+++ b/node_test.go
@@ -21,6 +21,9 @@ func TestNodeSort(t *testing.T) {
 		n.channel.Store(stream.NewChannelWithState(err))
 		return n
 	}
+	makeNodeWithLatency := func(id uint32, lat time.Duration) *Node {
+		return &Node{id: id, router: stream.NewMessageRouterWithLatency(lat)}
+	}
 	someErr := errors.New("some error")
 	nodes := []*Node{
 		makeNode(100, nil),
@@ -68,6 +71,163 @@ func TestNodeSort(t *testing.T) {
 			}
 		}
 	})
+
+	t.Run("ByLatency", func(t *testing.T) {
+		// Node 3 has no measurement (-1s): should sort last.
+		// Remaining nodes sort ascending by latency.
+		ns := []*Node{
+			makeNodeWithLatency(1, 30*time.Millisecond),
+			makeNodeWithLatency(2, 10*time.Millisecond),
+			makeNodeWithLatency(3, -1*time.Second), // no measurement
+			makeNodeWithLatency(4, 20*time.Millisecond),
+		}
+		slices.SortFunc(ns, Latency)
+		// Expected: 2 (10ms), 4 (20ms), 1 (30ms), 3 (no data).
+		wantIDs := []uint32{2, 4, 1, 3}
+		for i, n := range ns {
+			if n.id != wantIDs[i] {
+				t.Errorf("by latency: position %d: got id %d, want %d", i, n.id, wantIDs[i])
+				printNodes(t, ns)
+			}
+		}
+	})
+
+	t.Run("ByLatency/AllUnmeasured", func(t *testing.T) {
+		// All nodes without measurements: stable order must be preserved.
+		ns := []*Node{
+			makeNodeWithLatency(1, -1*time.Second),
+			makeNodeWithLatency(2, -1*time.Second),
+			makeNodeWithLatency(3, -1*time.Second),
+		}
+		slices.SortStableFunc(ns, Latency)
+		wantIDs := []uint32{1, 2, 3}
+		for i, n := range ns {
+			if n.id != wantIDs[i] {
+				t.Errorf("by latency (all unmeasured): position %d: got id %d, want %d", i, n.id, wantIDs[i])
+			}
+		}
+	})
+
+	t.Run("ByLatencyThenID", func(t *testing.T) {
+		// Two nodes with the same latency: secondary sort by ID breaks ties.
+		ns := []*Node{
+			makeNodeWithLatency(10, 20*time.Millisecond),
+			makeNodeWithLatency(5, 10*time.Millisecond),
+			makeNodeWithLatency(7, 20*time.Millisecond),
+		}
+		slices.SortFunc(ns, func(a, b *Node) int {
+			if r := Latency(a, b); r != 0 {
+				return r
+			}
+			return ID(a, b)
+		})
+		// Expected: 5 (10ms), 7 (20ms, lower id), 10 (20ms, higher id).
+		wantIDs := []uint32{5, 7, 10}
+		for i, n := range ns {
+			if n.id != wantIDs[i] {
+				t.Errorf("by latency then id: position %d: got id %d, want %d", i, n.id, wantIDs[i])
+				printNodes(t, ns)
+			}
+		}
+	})
+}
+
+func TestConfigurationWatch(t *testing.T) {
+	makeNodeWithLatency := func(id uint32, lat time.Duration) *Node {
+		return &Node{id: id, router: stream.NewMessageRouterWithLatency(lat)}
+	}
+
+	// allNodes has five nodes; top-3 by ascending latency are 2(10ms), 3(20ms), 1(30ms).
+	allNodes := Configuration{
+		makeNodeWithLatency(1, 30*time.Millisecond),
+		makeNodeWithLatency(2, 10*time.Millisecond),
+		makeNodeWithLatency(3, 20*time.Millisecond),
+		makeNodeWithLatency(4, 40*time.Millisecond),
+		makeNodeWithLatency(5, 50*time.Millisecond),
+	}
+	const quorumSize = 3
+	fastTop3 := func(c Configuration) Configuration { return c.SortBy(Latency)[:quorumSize] }
+
+	t.Run("EmitsInitialSnapshot", func(t *testing.T) {
+		// Use a very long interval so only the initial emission fires.
+		updates := allNodes.Watch(t.Context(), time.Hour, fastTop3)
+		snap := <-updates
+		if len(snap) != quorumSize {
+			t.Fatalf("initial snapshot size = %d, want %d", len(snap), quorumSize)
+		}
+		wantIDs := []uint32{2, 3, 1}
+		for i, n := range snap {
+			if n.ID() != wantIDs[i] {
+				t.Errorf("position %d: got id %d, want %d", i, n.ID(), wantIDs[i])
+			}
+		}
+	})
+
+	t.Run("NoEmissionWhenUnchanged", func(t *testing.T) {
+		updates := allNodes.Watch(t.Context(), 10*time.Millisecond, fastTop3)
+		<-updates // drain initial emission
+
+		// Latencies are fixed, so no further emission should arrive.
+		select {
+		case cfg, ok := <-updates:
+			if ok {
+				t.Errorf("unexpected second emission: got ids %v", cfg.NodeIDs())
+			}
+		case <-time.After(100 * time.Millisecond):
+			// expected: no second emission
+		}
+	})
+
+	t.Run("EmitsOnOrderChange", func(t *testing.T) {
+		n1 := makeNodeWithLatency(1, 10*time.Millisecond)
+		n2 := makeNodeWithLatency(2, 30*time.Millisecond)
+		n3 := makeNodeWithLatency(3, 20*time.Millisecond)
+		cfg := Configuration{n1, n2, n3}
+		top2 := func(c Configuration) Configuration { return c.SortBy(Latency)[:2] }
+
+		const interval = 20 * time.Millisecond
+		updates := cfg.Watch(t.Context(), interval, top2)
+		first := <-updates
+		// Initial top-2: [1(10ms), 3(20ms)]
+		wantFirst := []uint32{1, 3}
+		for i, n := range first {
+			if n.ID() != wantFirst[i] {
+				t.Errorf("initial: position %d got id %d, want %d", i, n.ID(), wantFirst[i])
+			}
+		}
+
+		// Swap latencies: node 2 becomes fastest.
+		n1.router.SetLatency(40 * time.Millisecond)
+		n2.router.SetLatency(5 * time.Millisecond)
+
+		select {
+		case second := <-updates:
+			// New top-2: [2(5ms), 3(20ms)]
+			wantSecond := []uint32{2, 3}
+			for i, n := range second {
+				if n.ID() != wantSecond[i] {
+					t.Errorf("after swap: position %d got id %d, want %d", i, n.ID(), wantSecond[i])
+				}
+			}
+		case <-time.After(5 * interval):
+			t.Error("expected a second emission after latency swap, but none arrived")
+		}
+	})
+
+	t.Run("ChannelClosedOnCtxCancel", func(t *testing.T) {
+		ctx, cancel := context.WithCancel(t.Context())
+		updates := allNodes.Watch(ctx, time.Hour, fastTop3)
+		<-updates // drain initial
+		cancel()
+		select {
+		case _, ok := <-updates:
+			if ok {
+				t.Error("channel should be closed after ctx cancel")
+			}
+		case <-time.After(time.Second):
+			t.Error("channel should be closed promptly after ctx cancel")
+		}
+	})
 }
 
 func printNodes(t *testing.T, nodes []*Node) {