Skip to content
94 changes: 94 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"slices"
"time"
)

// Configuration represents a static set of nodes on which multicast or
Expand Down Expand Up @@ -197,6 +198,99 @@ func (c Configuration) Difference(other Configuration) Configuration {
return c.Remove(other.NodeIDs()...)
}

// SortBy returns a new Configuration whose nodes are ordered according to the
// given comparator; the receiver is left untouched.
//
// The built-in node comparator functions [ID], [LastNodeError], and [Latency]
// can be passed directly:
//
//	fastest := cfg.SortBy(gorums.Latency)       // ascending by latency
//	healthy := cfg.SortBy(gorums.LastNodeError) // no-error nodes first
//
// Multi-key orderings are expressed by composing comparators:
//
//	cfg.SortBy(func(a, b *Node) int {
//		if r := gorums.LastNodeError(a, b); r != 0 {
//			return r
//		}
//		return gorums.Latency(a, b)
//	})
//
// The sort is stable: nodes the comparator considers equal keep their
// original relative order.
//
// Note: quorum calls contact every node in the configuration regardless of
// order. Sorting only affects which nodes are selected when the result is
// sliced to a smaller subset, e.g., cfg.SortBy(gorums.Latency)[:quorumSize].
// See the "Latency-Based Node Selection" section of the user guide for
// guidance on sub-configuration sizing and re-sort frequency.
func (c Configuration) SortBy(cmp func(*Node, *Node) int) Configuration {
	if len(c) == 0 {
		return nil
	}
	// Sort a copy so the receiver's node order is never mutated.
	ordered := append(Configuration(nil), c...)
	slices.SortStableFunc(ordered, cmp)
	return ordered
}

// Watch starts a background goroutine that calls derive(c) every interval and
// emits the result on the returned channel whenever it differs from the previous
// result. The initial result is always emitted before the first tick, so callers
// always receive a valid configuration immediately. The interval must be greater
// than zero, and derive must be non-nil; otherwise, Watch will panic.
//
// The derive function receives the full configuration c and returns any derived
// sub-configuration. Typical examples:
//
// // Latency-based top-k subset:
// cfg.Watch(ctx, 5*time.Second, func(c gorums.Configuration) gorums.Configuration {
// return c.SortBy(gorums.Latency)[:quorumSize]
// })
//
// // Skip failed nodes first, then pick fastest:
// cfg.Watch(ctx, 5*time.Second, func(c gorums.Configuration) gorums.Configuration {
// return c.WithoutErrors(lastErr).SortBy(gorums.Latency)[:quorumSize]
// })
//
// The returned channel has a buffer of 1. If the consumer is slow and has not
// yet read the previous update, the goroutine skips the emission and waits for
// the next tick to re-evaluate.
//
// The goroutine exits and the channel is closed when ctx is cancelled.
func (c Configuration) Watch(ctx context.Context, interval time.Duration, derive func(Configuration) Configuration) <-chan Configuration {
if interval <= 0 {
panic("gorums: Watch interval must be positive")
}
if derive == nil {
panic("gorums: Watch derive function must be non-nil")
}
ch := make(chan Configuration, 1)
go func() {
defer close(ch)
current := derive(c)
ch <- current

ticker := time.NewTicker(interval)
defer ticker.Stop()
Comment thread
meling marked this conversation as resolved.
for {
select {
case <-ticker.C:
fresh := derive(c)
if !fresh.Equal(current) {
current = fresh
select {
case ch <- current:
default: // consumer hasn't read yet; next tick will re-evaluate
}
}
case <-ctx.Done():
return
}
}
}()
return ch
}

// WithoutErrors returns a new Configuration excluding nodes that failed in the
// given QuorumCallError. If specific error types are provided, only nodes whose
// errors match one of those types (using errors.Is) will be excluded.
Expand Down
73 changes: 73 additions & 0 deletions config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package gorums_test
import (
"errors"
"fmt"
"slices"
"sync"
"testing"

Expand Down Expand Up @@ -241,6 +242,78 @@ func TestEmptyConfiguration(t *testing.T) {
})
}

func TestConfigurationSortBy(t *testing.T) {
cfg, err := gorums.NewConfig(gorums.WithNodeList(nodes), gorums.InsecureDialOptions(t))
if err != nil {
t.Fatal(err)
}
t.Cleanup(gorums.Closer(t, cfg))

t.Run("SortByID", func(t *testing.T) {
sorted := cfg.SortBy(gorums.ID)
if sorted.Size() != cfg.Size() {
t.Fatalf("sorted.Size() = %d, want %d", sorted.Size(), cfg.Size())
}
for i := 1; i < sorted.Size(); i++ {
if sorted[i].ID() < sorted[i-1].ID() {
t.Errorf("SortBy(ID): not sorted at position %d (id %d < id %d)",
i, sorted[i].ID(), sorted[i-1].ID())
}
}
})

t.Run("SortByLatency/AllUnmeasured", func(t *testing.T) {
// Fresh config: no calls yet, so all nodes report latency < 0.
for _, n := range cfg.Nodes() {
if n.Latency() >= 0 {
t.Fatalf("expected no latency measurement on fresh node, got %v", n.Latency())
}
}
sorted := cfg.SortBy(gorums.Latency)
if sorted.Size() != cfg.Size() {
t.Fatalf("sorted.Size() = %d, want %d", sorted.Size(), cfg.Size())
}
Comment thread
meling marked this conversation as resolved.
// When all nodes are unmeasured their reported latencies are all -1; thus, the
// Latency comparator returns 0 for every latency pair (the latencies are equal),
// so a stable sort must preserve the original order.
if got, want := sorted.NodeIDs(), cfg.NodeIDs(); !slices.Equal(got, want) {
t.Errorf("SortBy(Latency) with all-unmeasured nodes changed order: got %v, want %v", got, want)
}
})

t.Run("ReturnsNewConfiguration", func(t *testing.T) {
sorted := cfg.SortBy(gorums.ID)
cfgSlice := cfg.Nodes()
sortedSlice := sorted.Nodes()
if len(cfgSlice) > 0 && len(sortedSlice) > 0 && &cfgSlice[0] == &sortedSlice[0] {
t.Error("SortBy returned same backing array — violates immutability")
}
})

t.Run("Empty/ReturnsNil", func(t *testing.T) {
var empty gorums.Configuration
if got := empty.SortBy(gorums.ID); got != nil {
t.Fatalf("empty.SortBy(ID) = %v, want nil", got)
}
if got := empty.SortBy(gorums.Latency); got != nil {
t.Fatalf("empty.SortBy(Latency) = %v, want nil", got)
}
})

t.Run("SortByLastNodeErrorThenLatency", func(t *testing.T) {
// Composing two comparators should not panic and must return a valid config.
sorted := cfg.SortBy(func(a, b *gorums.Node) int {
if r := gorums.LastNodeError(a, b); r != 0 {
return r
}
return gorums.Latency(a, b)
})
if sorted.Size() != cfg.Size() {
t.Fatalf("composed sort size = %d, want %d", sorted.Size(), cfg.Size())
}
})
}

func assertPanicMessage(t *testing.T, want string, fn func()) {
t.Helper()
defer func() {
Expand Down
188 changes: 188 additions & 0 deletions doc/user-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -1409,6 +1409,194 @@ func ExampleConfigClient() {
}
```

## Latency-Based Node Selection

Gorums tracks the round-trip latency to each node as an exponentially weighted
moving average, accessible via `node.Latency()`.
This section explains how to use that information to construct faster
sub-configurations, and what to watch out for when doing so.

### The Latency Comparator

`gorums.Latency` is a comparator function compatible with `slices.SortFunc` and `Configuration.SortBy`.
The comparator orders nodes ascending by their current latency estimates;
nodes without any measurements (freshly created, never sent traffic) are sorted last.

```go
// Sort all nodes by ascending latency.
sorted := cfg.SortBy(gorums.Latency)

// Pick the two fastest nodes.
fast2 := cfg.SortBy(gorums.Latency)[:2]
```

Comparators can be chained for multi-key ordering.
For example, healthy nodes first, then by latency within each group:

```go
sorted := cfg.SortBy(func(a, b *gorums.Node) int {
if r := gorums.LastNodeError(a, b); r != 0 {
return r
}
return gorums.Latency(a, b)
})
```

### Using a Smaller Fast Configuration

The most practical use of latency-based selection is shrinking the configuration to a fastest subset that is just large enough to form a quorum.
Sending to fewer nodes lowers tail latency without weakening safety, as long as the subset still meets your quorum threshold; the trade-off is liveness — if any node in the subset fails, the call cannot complete until the subset is re-derived.

```go
const n = 5 // total nodes
const f = 2 // tolerated failures (n = 2f+1)
quorumSize := n/2 + 1 // simple majority for crash-fault tolerance = 3

// Re-derive the fast sub-configuration periodically (see guidance below).
fastCfg := allNodesCfg.SortBy(gorums.Latency)[:quorumSize]
fastCfgCtx := fastCfg.Context(ctx)

reply, err := ReadQC(fastCfgCtx, &ReadRequest{Key: "x"}).Majority()
```

Combining with error-based filtering is straightforward: drop failed nodes
first, then pick the fastest of those that remain:

```go
var qcErr gorums.QuorumCallError
if errors.As(err, &qcErr) {
fastCfg = cfg.WithoutErrors(qcErr).SortBy(gorums.Latency)[:quorumSize]
}
```

> **Note:** For quorum calls, all nodes in the configuration are contacted —
> the ordering only matters when you slice the result to a subset.
> Sorting a full configuration without slicing has no effect on call latency.

### How Often to Re-Sort

`SortBy` returns a snapshot of the ordering at one point in time.
Latency measurements change as network conditions shift; the snapshot does not
auto-update.

As a rule of thumb:

* **Every few seconds** is a reasonable re-sort interval for most deployments.
A periodic goroutine or a lazy re-sort at the start of each request batch
both work well.
* **On every single call** is usually unnecessary and wastes allocations.
Each `SortBy` clones the node slice.
* **After a failed quorum call**, always re-evaluate: a node that caused the
failure should be excluded via `WithoutErrors` before re-sorting.
* **After a topology change** (node added or removed), derive the sub-configuration
from the new full configuration rather than sorting an outdated one.

A simple periodic refresh pattern using `Configuration.Watch`:

```go
updates := allNodesCfg.Watch(ctx, 5*time.Second, func(c gorums.Configuration) gorums.Configuration {
return c.SortBy(gorums.Latency)[:quorumSize]
})
fastCfg := <-updates // initial snapshot, available before the first tick

// In a dedicated goroutine (the range blocks until ctx is cancelled and the channel closes), consume updates as they arrive:
for cfg := range updates {
fastCfg = cfg // ordering changed; start using the new snapshot
}
```

`Watch` invokes the derive function at the given interval (five seconds in
this example) and emits a new sub-configuration only when the result changes.
The initial snapshot is sent immediately, so callers always receive a valid
configuration without waiting for the first tick.

### Combining Watch with Error Filtering

To skip failed nodes on every periodic refresh, capture the most recent
`gorums.QuorumCallError` in a mutex-guarded variable and include it in the
derive function:

```go
var mu sync.Mutex
var lastQCErr gorums.QuorumCallError // zero value excludes no nodes

// After each failed quorum call, record the error so the next Watch tick
// can exclude the offending nodes:
//
// var qcErr gorums.QuorumCallError
// if errors.As(err, &qcErr) {
// mu.Lock(); lastQCErr = qcErr; mu.Unlock()
// }

updates := allNodesCfg.Watch(ctx, 5*time.Second, func(c gorums.Configuration) gorums.Configuration {
mu.Lock()
qcErr := lastQCErr
mu.Unlock()
return c.WithoutErrors(qcErr).SortBy(gorums.Latency)[:quorumSize]
})
```

The zero value of `gorums.QuorumCallError` carries no node errors, so the
initial derive call behaves identically to the pure-latency example above.

Comment thread
meling marked this conversation as resolved.
### Measurement Limits

`Node.Latency()` has several limits that are worth understanding before building
on it:

* **No traffic → no measurement.** The estimate is only updated on successful
responses. A node that has never received a response returns a negative value.
`SortBy(gorums.Latency)` pushes such nodes to the end of the slice, so you
will not accidentally pick an unmeasured node when slicing the front.

* **Staleness.** If traffic to a node stops, the estimate holds its last
observed value indefinitely. A node that appeared fast previously will
continue to look fast until new responses arrive and update the average,
even if network conditions have since changed.

* **Slow convergence.** The moving average uses a smoothing factor of 0.2, so
  a sudden step-change in latency is only about two-thirds reflected in the
  estimate after five round trips, and about 90% after ten. Short-lived spikes
  are smoothed away, which is usually desirable but means the estimate lags
  behind rapid fluctuations.

* **RPC compute time included.** The measurement covers the full round-trip
from sending the request to receiving the response. For long-running RPCs,
server-side processing time is included in the estimate, which means the
measurement reflects more than just network latency. In workloads dominated
by heavy RPCs, this can skew node selection toward nodes with lighter server
load rather than nodes that are genuinely closer on the network.

* **Latency is per node, not per RPC.** The estimate is a single value
maintained for each node, shared across all RPC types sent to that node.
It reflects the node's overall responsiveness rather than the latency of
any particular operation, making it a blunt instrument for workloads where
different RPCs have meaningfully different latency profiles.

* **Multicast and unicast calls are not measured.** Only calls that receive
replies update the latency estimate. Multicast calls (fire-and-forget) and
unicast calls with no response do not contribute measurements, so nodes
contacted exclusively via these call types will remain unmeasured.

* **No variance information.** A single average cannot distinguish a stable
low-latency node from a high-variance node whose average happens to look good.
If jitter matters for your workload, the built-in estimate is not sufficient
on its own.

### Best Practices

The limitations above can be mitigated with a few straightforward patterns:

* **Keep traffic flowing.** Latency estimates are only updated when RPCs
complete successfully. If your application has quiet periods, the estimates
for idle nodes will become stale. Scheduling periodic lightweight RPCs
(such as a ping or a no-op read) ensures that estimates remain fresh even
when there is no organic traffic.

* **Quorum calls contact all nodes.** Latency-based ordering only improves
performance when you slice the sorted configuration to a smaller subset.
Passing a full-size sorted configuration to a quorum call provides no speedup
because every node in the configuration is contacted regardless of order.

## Interactive REPL

The storage example (`examples/storage`) includes an interactive Read-Eval-Print Loop (REPL) that lets you send RPCs and quorum calls against live storage servers.
Expand Down
Loading
Loading