From 302c50571a3cf711a76345596f2e293326118970 Mon Sep 17 00:00:00 2001 From: Hein Meling Date: Sun, 12 Apr 2026 12:25:34 +0200 Subject: [PATCH 1/6] feat: add WaitForConfig and WaitForClientConfig to System MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This adds push-based notification for config checks using a close-and-replace channel pattern on inboundManager. A single configCh is closed and replaced on every rebuildConfig call, waking all waiters simultaneously. A separate stopCh (closed via sync.Once) unblocks waiters when the system stops. New API on System: WaitForConfig(ctx, cond) — waits on known-peer config WaitForClientConfig(ctx, cond) — waits on client-peer config Both return nil when cond is satisfied, ctx.Err() on cancellation, and ErrStopped when the system is stopped before the condition is met. The condition is checked immediately so they return without blocking if already satisfied. Add ErrStopped sentinel error to errors.go. Add six subtests under TestWaitForConfig covering: condition already met, condition met after connect, context cancelled, system stopped, concurrent waiters, and client config variant. Document WaitForConfig and WaitForClientConfig in the user guide. Fixes #323 --- doc/user-guide.md | 55 ++++++++++++++++++++++++ errors.go | 4 ++ inbound_manager.go | 44 +++++++++++++++++++ system.go | 23 ++++++++++ system_test.go | 103 +++++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 229 insertions(+) diff --git a/doc/user-guide.md b/doc/user-guide.md index d3e9f68f..7e9ab946 100644 --- a/doc/user-guide.md +++ b/doc/user-guide.md @@ -1217,6 +1217,61 @@ log.Println("quorum ready, starting to serve") The self-node is always present in `cfg`, so a three-node cluster (`quorumSize = 2`) will fire the signal as soon as a single remote peer connects. +## Waiting for Configuration + +`System.WaitForConfig` and `System.WaitForClientConfig` block until a condition on the configuration is satisfied, or until the context is cancelled or the system is stopped. +They replace the need to poll `Config()` in a loop and eliminate the latency and CPU overhead of polling. + +```go +// Block until all three known peers are connected. +ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) +defer cancel() +if err := sys.WaitForConfig(ctx, func(cfg gorums.Configuration) bool { + return cfg.Size() == 3 +}); err != nil { + log.Fatal("peers did not connect in time:", err) +} +``` + +The condition is checked immediately against the current configuration, so the call returns without blocking if the condition is already satisfied. + +### WaitForConfig + +`WaitForConfig` waits on the known-peer configuration — the set of pre-configured peers that have connected, plus the local node itself. +Use this when you need a quorum of static cluster members to be present before beginning to serve requests. + +```go +err := sys.WaitForConfig(ctx, func(cfg gorums.Configuration) bool { + return cfg.Size() >= quorumSize +}) +``` + +### WaitForClientConfig + +`WaitForClientConfig` waits on the client-peer configuration — the set of anonymous clients that have connected dynamically and are reachable for reverse-direction calls. +Use this when a server should not proceed until a minimum number of clients have registered. 
+ +```go +err := sys.WaitForClientConfig(ctx, func(cfg gorums.Configuration) bool { + return cfg.Size() >= expectedClients +}) +``` + +### Return values + +| Condition | Return value | +| --------------------------------- | ------------------- | +| `cond` returns `true` | `nil` | +| `ctx` is cancelled or times out | `ctx.Err()` | +| `sys.Stop()` called before `cond` | `gorums.ErrStopped` | + +### Relationship to `onChange` + +`WaitForConfig` and the `onChange` callback (see [WithConfig onChange Callback](#withconfig-onchange-callback)) serve complementary purposes. +`onChange` is suited for reactive work that must happen synchronously on every configuration change — for example, triggering a leader election or updating an atomic counter. +`WaitForConfig` is suited for startup synchronization — blocking until the cluster reaches a desired state before the application begins normal operation. +Unlike `onChange`, `WaitForConfig` composes naturally with `context.WithTimeout` and `context.WithCancel`. + ## Error Handling Gorums provides structured error types to help you understand and handle failures in quorum calls. diff --git a/errors.go b/errors.go index bb6794a8..704a5778 100644 --- a/errors.go +++ b/errors.go @@ -22,6 +22,10 @@ var ErrTypeMismatch = stream.ErrTypeMismatch // This allows the response iterator to account for all nodes without blocking. var ErrSkipNode = errors.New("skip node") +// ErrStopped is returned by [System.WaitForConfig] and [System.WaitForClientConfig] +// when the system is stopped before the condition is met. +var ErrStopped = errors.New("system stopped") + // QuorumCallError reports on a failed quorum call. // It provides detailed information about which nodes failed. type QuorumCallError struct { diff --git a/inbound_manager.go b/inbound_manager.go index 44135ce4..650acc65 100644 --- a/inbound_manager.go +++ b/inbound_manager.go @@ -78,6 +78,9 @@ type inboundManager struct { handler stream.RequestHandler // handler for dispatching incoming requests on all inbound nodes onConfigChange func(Configuration) // optional; called after each known-peer config change nextClientID uint32 // next ID to assign to a client peer + configCh chan struct{} // closed and replaced on each config/clientConfig change; protected by mu + stopCh chan struct{} // closed on shutdown to unblock waiters; never replaced + stopOnce sync.Once // ensures stopCh is closed exactly once } // clientIDStart is the starting ID for dynamically assigned client peers. @@ -103,6 +106,8 @@ func newInboundManager(myID uint32, opt NodeListOption, sendBuffer uint, onConfi handler: handler, onConfigChange: onConfigChange, nextClientID: clientIDStart, + configCh: make(chan struct{}), + stopCh: make(chan struct{}), } if opt != nil { if _, err := opt.newConfig(im); err != nil { @@ -312,6 +317,45 @@ func (im *inboundManager) rebuildConfig() { if cfgChanged && im.onConfigChange != nil { im.onConfigChange(cfg) } + // Broadcast config change to all waiters. + close(im.configCh) + im.configCh = make(chan struct{}) +} + +// checkConfig checks the condition under the read lock and returns the +// current broadcast channel if the condition is not yet met. +func (im *inboundManager) checkConfig(cond func() bool) (met bool, ch <-chan struct{}) { + im.mu.RLock() + defer im.mu.RUnlock() + if cond() { + return true, nil + } + return false, im.configCh +} + +// waitForConfig blocks until cond returns true or until ctx is cancelled +// or the inboundManager is closed. 
The cond function is called while the +// read lock is held, so it may safely read inboundManager fields directly. +func (im *inboundManager) waitForConfig(ctx context.Context, cond func() bool) error { + for { + met, ch := im.checkConfig(cond) + if met { + return nil + } + select { + case <-ctx.Done(): + return ctx.Err() + case <-im.stopCh: + return ErrStopped + case <-ch: + } + } +} + +// close signals all waiters to stop and prevents new waits from blocking. +// Called from [System.Stop]. +func (im *inboundManager) close() { + im.stopOnce.Do(func() { close(im.stopCh) }) } // nilPeerNode implements [stream.PeerNode] for regular clients that have no diff --git a/system.go b/system.go index ebd48add..d94b28d0 100644 --- a/system.go +++ b/system.go @@ -1,6 +1,7 @@ package gorums import ( + "context" "errors" "fmt" "io" @@ -156,6 +157,26 @@ func (s *System) ClientConfig() Configuration { return s.srv.ClientConfig() } +// WaitForConfig blocks until cond returns true for the current known-peer +// [Configuration], or until ctx is cancelled or the system is stopped. +// The condition is checked immediately against the current configuration, +// so it may return without blocking if the condition is already satisfied. +func (s *System) WaitForConfig(ctx context.Context, cond func(Configuration) bool) error { + return s.srv.waitForConfig(ctx, func() bool { + return cond(s.srv.config) + }) +} + +// WaitForClientConfig blocks until cond returns true for the current +// client-peer [Configuration], or until ctx is cancelled or the system is stopped. +// The condition is checked immediately against the current configuration, +// so it may return without blocking if the condition is already satisfied. +func (s *System) WaitForClientConfig(ctx context.Context, cond func(Configuration) bool) error { + return s.srv.waitForConfig(ctx, func() bool { + return cond(s.srv.clientConfig) + }) +} + // RegisterService registers the service with the server using the provided register function. // The closer is added to the list of closers to be closed when the system is stopped. // @@ -184,6 +205,8 @@ func (s *System) Serve() error { // on the client side will get notified by connection errors. // It is safe to call Stop before [System.Serve] to avoid resource leaks. func (s *System) Stop() (errs error) { + // Unblock any WaitForConfig / WaitForClientConfig callers. + s.srv.close() // We cannot use graceful stop here since multicast methods does not // respond to the client, and thus would block indefinitely. 
s.srv.Stop() diff --git a/system_test.go b/system_test.go index 993e30a8..19410f05 100644 --- a/system_test.go +++ b/system_test.go @@ -746,3 +746,106 @@ func TestSystemLocalDispatchContentionSlowReplica(t *testing.T) { t.Errorf("All: got %q, want %q", result.GetValue(), "echo: all-call") } } + +func TestWaitForConfig(t *testing.T) { + t.Run("ConditionAlreadyMet", func(t *testing.T) { + systems := gorums.TestSystems(t, 3) + awaitSystemReady(t, systems) + + ctx := gorums.TestContext(t, 2*time.Second) + if err := systems[0].WaitForConfig(ctx, func(cfg gorums.Configuration) bool { + return cfg.Size() == 3 + }); err != nil { + t.Fatalf("WaitForConfig: %v", err) + } + }) + + t.Run("ConditionMetAfterConnect", func(t *testing.T) { + systems := gorums.TestSystems(t, 3) + + ctx := gorums.TestContext(t, 5*time.Second) + if err := systems[0].WaitForConfig(ctx, func(cfg gorums.Configuration) bool { + return cfg.Size() == 3 + }); err != nil { + t.Fatalf("WaitForConfig: %v", err) + } + }) + + t.Run("ContextCancelled", func(t *testing.T) { + sys, err := gorums.NewSystem("127.0.0.1:0") + if err != nil { + t.Fatalf("NewSystem: %v", err) + } + go func() { _ = sys.Serve() }() + t.Cleanup(func() { _ = sys.Stop() }) + + ctx, cancel := context.WithTimeout(t.Context(), 50*time.Millisecond) + defer cancel() + err = sys.WaitForConfig(ctx, func(cfg gorums.Configuration) bool { + return cfg.Size() == 3 // never true + }) + if !errors.Is(err, context.DeadlineExceeded) { + t.Fatalf("expected DeadlineExceeded, got: %v", err) + } + }) + + t.Run("SystemStopped", func(t *testing.T) { + sys, err := gorums.NewSystem("127.0.0.1:0") + if err != nil { + t.Fatalf("NewSystem: %v", err) + } + go func() { _ = sys.Serve() }() + + errCh := make(chan error, 1) + go func() { + errCh <- sys.WaitForConfig(context.Background(), func(cfg gorums.Configuration) bool { + return cfg.Size() == 3 // never true + }) + }() + + // Give WaitForConfig time to enter the select. + time.Sleep(20 * time.Millisecond) + _ = sys.Stop() + + select { + case err := <-errCh: + if !errors.Is(err, gorums.ErrStopped) { + t.Fatalf("expected ErrStopped, got: %v", err) + } + case <-time.After(2 * time.Second): + t.Fatal("WaitForConfig did not return after Stop") + } + }) + + t.Run("ConcurrentWaiters", func(t *testing.T) { + systems := gorums.TestSystems(t, 3) + + const waiters = 5 + errCh := make(chan error, waiters) + for range waiters { + go func() { + ctx := gorums.TestContext(t, 5*time.Second) + errCh <- systems[0].WaitForConfig(ctx, func(cfg gorums.Configuration) bool { + return cfg.Size() == 3 + }) + }() + } + + for range waiters { + if err := <-errCh; err != nil { + t.Errorf("WaitForConfig: %v", err) + } + } + }) + + t.Run("ClientConfig", func(t *testing.T) { + sysServer, _, _ := createClientServerSystems(t) + + ctx := gorums.TestContext(t, 5*time.Second) + if err := sysServer.WaitForClientConfig(ctx, func(cfg gorums.Configuration) bool { + return cfg.Size() == 1 + }); err != nil { + t.Fatalf("WaitForClientConfig: %v", err) + } + }) +} From 549cc3f650074852e1e281d537ead3a89589552a Mon Sep 17 00:00:00 2001 From: Hein Meling Date: Sun, 12 Apr 2026 14:09:43 +0200 Subject: [PATCH 2/6] refactor: replace test helper WaitForConfigCondition with new wait helpers Remove the polling WaitForConfigCondition helper from testing_shared.go. Add waitForKnownConfig and waitForClientConfig to inboundManager, each accepting func(Configuration) bool matching the public System API shape. 
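For illustration only (quorumSize is a placeholder value, not part of this
change), the typed helper shape is used like this:

    // Sketch: block until at least quorumSize known peers are connected,
    // the context is cancelled, or the server is stopped.
    err := im.waitForKnownConfig(ctx, func(cfg Configuration) bool {
        return cfg.Size() >= quorumSize
    })
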
Simplify System.WaitForConfig and System.WaitForClientConfig to delegate directly to the new typed helpers instead of inlining raw field access. Add mustWaitForConfig and mustWaitForClientConfig test helpers in inbound_manager_test.go that call the typed methods, eliminating the locking-regiment leak where callers had to read internal fields directly. Replace all WaitForConfigCondition call sites in system_test.go with the public System.WaitForConfig / System.WaitForClientConfig API. This was mainly done to make sure that the new APIs work well also for tests. --- inbound_manager.go | 22 ++++++++++++++++++++- inbound_manager_test.go | 42 +++++++++++++++++++++++++++++++---------- system.go | 8 ++------ system_test.go | 21 +++++++++++++++------ testing_shared.go | 13 ------------- 5 files changed, 70 insertions(+), 36 deletions(-) diff --git a/inbound_manager.go b/inbound_manager.go index 650acc65..34a36fcb 100644 --- a/inbound_manager.go +++ b/inbound_manager.go @@ -335,7 +335,7 @@ func (im *inboundManager) checkConfig(cond func() bool) (met bool, ch <-chan str // waitForConfig blocks until cond returns true or until ctx is cancelled // or the inboundManager is closed. The cond function is called while the -// read lock is held, so it may safely read inboundManager fields directly. +// read lock is held, so it must not acquire any additional locks. func (im *inboundManager) waitForConfig(ctx context.Context, cond func() bool) error { for { met, ch := im.checkConfig(cond) @@ -352,6 +352,26 @@ func (im *inboundManager) waitForConfig(ctx context.Context, cond func() bool) e } } +// waitForKnownConfig blocks until cond returns true for the current known-peer +// [Configuration], or until ctx is cancelled or the server is stopped. +// The cond function receives the current known-peer configuration and must not +// acquire any additional locks. +func (im *inboundManager) waitForKnownConfig(ctx context.Context, cond func(Configuration) bool) error { + return im.waitForConfig(ctx, func() bool { + return cond(im.config) + }) +} + +// waitForClientConfig blocks until cond returns true for the current client-peer +// [Configuration], or until ctx is cancelled or the server is stopped. +// The cond function receives the current client-peer configuration and must not +// acquire any additional locks. +func (im *inboundManager) waitForClientConfig(ctx context.Context, cond func(Configuration) bool) error { + return im.waitForConfig(ctx, func() bool { + return cond(im.clientConfig) + }) +} + // close signals all waiters to stop and prevents new waits from blocking. // Called from [System.Stop]. func (im *inboundManager) close() { diff --git a/inbound_manager_test.go b/inbound_manager_test.go index ae4b63e8..85dfbe2a 100644 --- a/inbound_manager_test.go +++ b/inbound_manager_test.go @@ -525,6 +525,28 @@ func TestOnConfigChangeCallbackIdempotentCleanup(t *testing.T) { } } +// mustWaitForConfig blocks until cond returns true for srv's known-peer +// Configuration, or fails the test after a 2-second timeout. +func mustWaitForConfig(t *testing.T, srv *Server, cond func(Configuration) bool) { + t.Helper() + ctx, cancel := context.WithTimeout(t.Context(), 2*time.Second) + defer cancel() + if err := srv.waitForKnownConfig(ctx, cond); err != nil { + t.Fatalf("waitForKnownConfig: %v", err) + } +} + +// mustWaitForClientConfig blocks until cond returns true for srv's client-peer +// Configuration, or fails the test after a 2-second timeout. 
+func mustWaitForClientConfig(t *testing.T, srv *Server, cond func(Configuration) bool) { + t.Helper() + ctx, cancel := context.WithTimeout(t.Context(), 2*time.Second) + defer cancel() + if err := srv.waitForClientConfig(ctx, cond); err != nil { + t.Fatalf("waitForClientConfig: %v", err) + } +} + // testPeerServer creates a Server with WithPeers(1, peerNodes()), starts it // via TestServers, and returns the server and its addresses. func testPeerServer(t *testing.T) (*Server, []string) { @@ -577,7 +599,7 @@ func TestKnownPeerConnects(t *testing.T) { connectAsPeer(t, 2, addrs) - WaitForConfigCondition(t, srv.Config, equalNodeIDs([]uint32{1, 2})) + mustWaitForConfig(t, srv, equalNodeIDs([]uint32{1, 2})) checkIDs(t, srv.Config(), []uint32{1, 2}, "after connect") } @@ -588,14 +610,14 @@ func TestKnownPeerDisconnects(t *testing.T) { srv, addrs := testPeerServer(t) cfg := connectAsPeer(t, 2, addrs) - WaitForConfigCondition(t, srv.Config, equalNodeIDs([]uint32{1, 2})) + mustWaitForConfig(t, srv, equalNodeIDs([]uint32{1, 2})) // Close the configuration to trigger disconnect; Close is idempotent so // t.Cleanup (registered by connectAsPeer) is harmless. if err := cfg.Close(); err != nil { t.Fatalf("cfg.Close() error: %v", err) } - WaitForConfigCondition(t, srv.Config, equalNodeIDs([]uint32{1})) + mustWaitForConfig(t, srv, equalNodeIDs([]uint32{1})) checkIDs(t, srv.Config(), []uint32{1}, "after disconnect") } @@ -639,7 +661,7 @@ func TestKnownPeerServerCallsClient(t *testing.T) { t.Cleanup(Closer(t, cfg)) // Wait for the peer to appear in the inbound config. - WaitForConfigCondition(t, srv.Config, equalNodeIDs([]uint32{1, 2})) + mustWaitForConfig(t, srv, equalNodeIDs([]uint32{1, 2})) // Server sends a request to the client via the inbound node. inboundCfg := srv.Config() @@ -729,7 +751,7 @@ func TestClientConfigConnects(t *testing.T) { connectAsPeerClient(t, addrs) // Client peer should appear with auto-assigned ID >= clientIDStart. - WaitForConfigCondition(t, srv.ClientConfig, func(cfg Configuration) bool { return len(cfg) > 0 }) + mustWaitForClientConfig(t, srv, func(cfg Configuration) bool { return len(cfg) > 0 }) cfg := srv.ClientConfig() if len(cfg) != 1 { t.Fatalf("ClientConfig has %d nodes; want 1", len(cfg)) @@ -747,7 +769,7 @@ func TestClientConfigDisconnects(t *testing.T) { cfg := connectAsPeerClient(t, addrs) // Wait for the client peer to appear. - WaitForConfigCondition(t, srv.ClientConfig, func(cfg Configuration) bool { return len(cfg) > 0 }) + mustWaitForClientConfig(t, srv, func(cfg Configuration) bool { return len(cfg) > 0 }) if len(srv.ClientConfig()) != 1 { t.Fatalf("ClientConfig has %d nodes; want 1", len(srv.ClientConfig())) } @@ -758,7 +780,7 @@ func TestClientConfigDisconnects(t *testing.T) { } // Wait for config to become empty. - WaitForConfigCondition(t, srv.ClientConfig, func(cfg Configuration) bool { return len(cfg) == 0 }) + mustWaitForClientConfig(t, srv, func(cfg Configuration) bool { return len(cfg) == 0 }) checkIDs(t, srv.ClientConfig(), []uint32{}, "after disconnect") } @@ -772,13 +794,13 @@ func TestClientConfigMixedMode(t *testing.T) { // Connect known peer (ID 2). connectAsPeer(t, 2, addrs) - WaitForConfigCondition(t, srv.Config, equalNodeIDs([]uint32{1, 2})) + mustWaitForConfig(t, srv, equalNodeIDs([]uint32{1, 2})) // Connect peer-capable anonymous client (dynamic peer). connectAsPeerClient(t, addrs) // Wait for 1 dynamic node. 
- WaitForConfigCondition(t, srv.ClientConfig, func(cfg Configuration) bool { return len(cfg) == 1 }) + mustWaitForClientConfig(t, srv, func(cfg Configuration) bool { return len(cfg) == 1 }) dynCfg := srv.ClientConfig() if len(dynCfg) != 1 { t.Fatalf("ClientConfig has %d nodes; want 1", len(dynCfg)) @@ -825,7 +847,7 @@ func TestClientConfigServerCallsClient(t *testing.T) { t.Cleanup(Closer(t, clientConfig)) // Wait for the client to appear in the server's ClientConfig. - WaitForConfigCondition(t, srv.ClientConfig, func(cfg Configuration) bool { return len(cfg) > 0 }) + mustWaitForClientConfig(t, srv, func(cfg Configuration) bool { return len(cfg) > 0 }) // Trigger: client multicasts TestMethod to the server; server fans it back via ClientConfig. ctx := TestContext(t, 2*time.Second) diff --git a/system.go b/system.go index d94b28d0..287f1b6b 100644 --- a/system.go +++ b/system.go @@ -162,9 +162,7 @@ func (s *System) ClientConfig() Configuration { // The condition is checked immediately against the current configuration, // so it may return without blocking if the condition is already satisfied. func (s *System) WaitForConfig(ctx context.Context, cond func(Configuration) bool) error { - return s.srv.waitForConfig(ctx, func() bool { - return cond(s.srv.config) - }) + return s.srv.waitForKnownConfig(ctx, cond) } // WaitForClientConfig blocks until cond returns true for the current @@ -172,9 +170,7 @@ func (s *System) WaitForConfig(ctx context.Context, cond func(Configuration) boo // The condition is checked immediately against the current configuration, // so it may return without blocking if the condition is already satisfied. func (s *System) WaitForClientConfig(ctx context.Context, cond func(Configuration) bool) error { - return s.srv.waitForConfig(ctx, func() bool { - return cond(s.srv.clientConfig) - }) + return s.srv.waitForClientConfig(ctx, cond) } // RegisterService registers the service with the server using the provided register function. diff --git a/system_test.go b/system_test.go index 19410f05..bc75dee8 100644 --- a/system_test.go +++ b/system_test.go @@ -145,9 +145,12 @@ func TestSystemSymmetricConfigurationConnectsAllPeers(t *testing.T) { // Wait for connections to establish for i, sys := range systems { - gorums.WaitForConfigCondition(t, sys.Config, func(cfg gorums.Configuration) bool { + ctx := gorums.TestContext(t, 5*time.Second) + if err := sys.WaitForConfig(ctx, func(cfg gorums.Configuration) bool { return cfg.Size() == len(systems) - }) + }); err != nil { + t.Fatalf("system %d: WaitForConfig: %v", i+1, err) + } if got := sys.Config().Size(); got != len(systems) { t.Fatalf("system %d config size: %d, expected: %d", i+1, got, len(systems)) } @@ -172,18 +175,24 @@ func waitWithTimeout(t *testing.T, wg *sync.WaitGroup) { func awaitSystemReady(t *testing.T, systems []*gorums.System) { t.Helper() for _, sys := range systems { - gorums.WaitForConfigCondition(t, sys.Config, func(cfg gorums.Configuration) bool { + ctx := gorums.TestContext(t, 5*time.Second) + if err := sys.WaitForConfig(ctx, func(cfg gorums.Configuration) bool { return cfg.Size() == len(systems) - }) + }); err != nil { + t.Fatalf("awaitSystemReady: %v", err) + } } } // awaitClientReady waits until the server's ClientConfig contains n connected peers. 
func awaitClientReady(t *testing.T, sys *gorums.System, n int) { t.Helper() - gorums.WaitForConfigCondition(t, sys.ClientConfig, func(cfg gorums.Configuration) bool { + ctx := gorums.TestContext(t, 5*time.Second) + if err := sys.WaitForClientConfig(ctx, func(cfg gorums.Configuration) bool { return cfg.Size() == n - }) + }); err != nil { + t.Fatalf("awaitClientReady: %v", err) + } } // createClientServerSystems creates a server system and a client for back-channel testing. diff --git a/testing_shared.go b/testing_shared.go index 346399b3..bfadd59d 100644 --- a/testing_shared.go +++ b/testing_shared.go @@ -48,19 +48,6 @@ func InsecureDialOptions(_ testing.TB) DialOption { ) } -// WaitForConfigCondition polls the config function until the condition cond returns true -// or the timeout elapses. This is useful for waiting on dynamic config updates. -func WaitForConfigCondition(t testing.TB, config func() Configuration, cond func(Configuration) bool) { - t.Helper() - deadline := time.Now().Add(2 * time.Second) - for time.Now().Before(deadline) { - if cond(config()) { - return - } - time.Sleep(5 * time.Millisecond) - } - t.Errorf("timeout waiting for config; got %v", config().NodeIDs()) -} // TestQuorumCallError creates a QuorumCallError for testing. // The nodeErrors map contains node IDs and their corresponding errors. From 61b10262bad8cd951f512a8c77018c0327e6c829 Mon Sep 17 00:00:00 2001 From: Hein Meling Date: Sun, 12 Apr 2026 18:36:49 +0200 Subject: [PATCH 3/6] feat(examples/storage): add real-cluster mode using WaitForConfig Extend the storage example to demonstrate WaitForConfig in a real multi-process cluster scenario. This commit also moves Gorums specific code to server.go keeping main.go for scaffolding. Three modes are now supported from a single binary: - No flags: spins up an in-process four-node cluster (runLocalCluster) and opens the client REPL directly against it. - -cluster -addrs : spawns one server subprocess per address using os/exec, prints connection instructions, and stops all children on Ctrl-C. - -serve -addrs [,...]: runs as a single leaf-node server, used by -cluster internally or for manual deployments. Other cleanups: splitAddrs called once in main; runClient errors are returned instead of calling log.Fatal; helper functions peerConfig, registerAndServe, and runLocalCluster are extracted to keep runServer focused; Added Makefile start target to use the new -cluster flag. --- .vscode/gorums.txt | 1 + examples/Makefile | 7 ++- examples/storage/client.go | 13 +++-- examples/storage/cluster.go | 69 ++++++++++++++++++++++++ examples/storage/main.go | 75 +++++++++++++------------- examples/storage/server.go | 104 +++++++++++++++++++++++++++++++----- 6 files changed, 213 insertions(+), 56 deletions(-) create mode 100644 examples/storage/cluster.go diff --git a/.vscode/gorums.txt b/.vscode/gorums.txt index 54702d14..4eea5bce 100644 --- a/.vscode/gorums.txt +++ b/.vscode/gorums.txt @@ -63,6 +63,7 @@ memprofile mktemp multicast multicasts +myaddr naddr netrwhist nofoo diff --git a/examples/Makefile b/examples/Makefile index a27c3352..2a4237ce 100644 --- a/examples/Makefile +++ b/examples/Makefile @@ -4,10 +4,15 @@ proto_go := $(proto_src:%.proto=%.pb.go) gorums_go := $(proto_src:%.proto=%_gorums.pb.go) binaries := storage/storage -.PHONY: all debug proto download tools $(binaries) +.PHONY: all debug proto download tools start $(binaries) all: $(binaries) +# Start a three-node local storage cluster for manual testing. +# Starts three servers on localhost:9081-9083. 
Press Ctrl-C to stop all servers. +start: storage/storage + @storage/storage -cluster -addrs localhost:9081,localhost:9082,localhost:9083 + # Build binaries in debug mode: Tell the Go compiler to compile without # optimizations (-N) or inlining (-l) so that debuggers, stack traces, # and variable inspection behave exactly like the source code. diff --git a/examples/storage/client.go b/examples/storage/client.go index 5de6de12..0314e6f6 100644 --- a/examples/storage/client.go +++ b/examples/storage/client.go @@ -1,7 +1,7 @@ package main import ( - "log" + "fmt" "github.com/relab/gorums" "github.com/relab/gorums/examples/storage/proto" @@ -11,15 +11,14 @@ import ( func runClient(addresses []string) error { if len(addresses) < 1 { - log.Fatalln("No addresses provided!") + return fmt.Errorf("no server addresses provided") } - cfg, err := gorums.NewConfig(gorums.WithNodeList(addresses), - gorums.WithDialOptions( - grpc.WithTransportCredentials(insecure.NewCredentials()), // disable TLS - ), + cfg, err := gorums.NewConfig( + gorums.WithNodeList(addresses), + gorums.WithDialOptions(grpc.WithTransportCredentials(insecure.NewCredentials())), ) if err != nil { - log.Fatal(err) + return err } defer cfg.Close() return Repl(cfg) diff --git a/examples/storage/cluster.go b/examples/storage/cluster.go new file mode 100644 index 00000000..d9231d62 --- /dev/null +++ b/examples/storage/cluster.go @@ -0,0 +1,69 @@ +package main + +import ( + "fmt" + "os" + "os/exec" + "os/signal" + "strings" + "syscall" +) + +// runCluster spawns one server subprocess per address in the comma-separated +// addrs string, prints connection instructions, and waits for a signal before +// stopping all subprocesses. The ic string is the raw -interceptors flag value, +// forwarded to each subprocess unchanged. +func runCluster(addrs string, ic string) error { + all := splitAddrs(addrs) + if len(all) == 0 { + return fmt.Errorf("no addresses provided") + } + exe, err := os.Executable() + if err != nil { + return fmt.Errorf("could not determine executable path: %w", err) + } + + cmds := make([]*exec.Cmd, len(all)) + for i, addr := range all { + // Put this server's address first, then the remaining peers. + nodeArgs := addr + "," + strings.Join(append(all[:i:i], all[i+1:]...), ",") + args := []string{"-serve", "-addrs", nodeArgs} + if ic != "" { + args = append(args, "-interceptors", ic) + } + cmd := exec.Command(exe, args...) + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + if err := cmd.Start(); err != nil { + stopAll(cmds[:i]) + return fmt.Errorf("failed to start server %q: %w", addr, err) + } + cmds[i] = cmd + } + + fmt.Printf("\nCluster running on %s.\n", strings.Join(all, ", ")) + fmt.Printf("Connect a client with:\n %s -addrs %s\n\n", exe, addrs) + fmt.Println("Press Ctrl-C to stop all servers.") + + signals := make(chan os.Signal, 1) + signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM) + <-signals + + fmt.Fprintln(os.Stderr, "\nStopping servers...") + stopAll(cmds) + return nil +} + +// stopAll sends SIGTERM to all running processes and waits for them to exit. 
+func stopAll(cmds []*exec.Cmd) { + for _, cmd := range cmds { + if cmd != nil && cmd.Process != nil { + _ = cmd.Process.Signal(syscall.SIGTERM) + } + } + for _, cmd := range cmds { + if cmd != nil { + _ = cmd.Wait() + } + } +} diff --git a/examples/storage/main.go b/examples/storage/main.go index 39595c3d..5bfcf97c 100644 --- a/examples/storage/main.go +++ b/examples/storage/main.go @@ -2,64 +2,67 @@ package main import ( "flag" - "fmt" "log" - "os" "strings" "github.com/relab/gorums" "github.com/relab/gorums/examples/interceptors" pb "github.com/relab/gorums/examples/storage/proto" - "google.golang.org/grpc" - "google.golang.org/grpc/credentials/insecure" ) func main() { - server := flag.String("server", "", "Start as a server on given address.") - remotes := flag.String("connect", "", "Comma-separated list of servers to connect to.") + cluster := flag.Bool("cluster", false, "Spawn a server process for each address in -addrs.") + serve := flag.Bool("serve", false, "Run as a single server node on addrs[0]; the remaining addresses are peers.") + addrs := flag.String("addrs", "", "Comma-separated list of server addresses.") ic := flag.String("interceptors", "", "Comma-separated list of interceptors to enable (logging, nofoo, metadata, delayed).") flag.Parse() - srvOpts := parseInterceptors(*ic) + if *cluster { + if *addrs == "" { + log.Fatal("Usage: storage -cluster -addrs [,...]") + } + if err := runCluster(*addrs, *ic); err != nil { + log.Fatal(err) + } + return + } - if *server != "" { - runServer(*server, srvOpts) + all := splitAddrs(*addrs) + if *serve { + if len(all) == 0 { + log.Fatal("Usage: storage -serve -addrs [,...]") + } + if err := runServer(all[0], all, parseInterceptors(*ic)); err != nil { + log.Fatal(err) + } return } - addrs := strings.Split(*remotes, ",") - // start local servers if no remote servers were specified - if len(addrs) == 1 && addrs[0] == "" { - // NewLocalSystems pre-allocates all listeners and configures each system - // with WithConfig (node IDs 1..n). Passing dial options auto-creates an - // outbound Configuration for each system (accessible via sys.OutboundConfig). - dialOpts := gorums.WithDialOptions(grpc.WithTransportCredentials(insecure.NewCredentials())) - systems, stop, err := gorums.NewLocalSystems(4, gorums.WithServerOptions(srvOpts), dialOpts) + // Client mode: connect to the provided addresses, or spin up a local cluster (same process). + if len(all) == 0 { + clusterAddrs, stop, err := runLocalCluster(parseInterceptors(*ic)) if err != nil { - log.Fatalf("Failed to create local systems: %v", err) + log.Fatal(err) } defer stop() - - addrs = make([]string, len(systems)) - for i, sys := range systems { - addrs[i] = sys.Addr() - } - for i, sys := range systems { - storage := newStorageServer(os.Stderr, fmt.Sprintf("node %d", i)) - sys.RegisterService(nil, func(srv *gorums.Server) { - pb.RegisterStorageServer(srv, storage) - }) - go func() { - if err := sys.Serve(); err != nil { - log.Printf("Server error: %v", err) - } - }() - } + all = clusterAddrs + } + if err := runClient(all); err != nil { + log.Fatal(err) } +} - if runClient(addrs) != nil { - os.Exit(1) +// splitAddrs splits a comma-separated address string, trimming spaces. +// Returns nil if s is empty. +func splitAddrs(s string) []string { + if s == "" { + return nil + } + parts := strings.Split(s, ",") + for i, p := range parts { + parts[i] = strings.TrimSpace(p) } + return parts } // parseInterceptors converts a comma-separated interceptor list into server options. 
diff --git a/examples/storage/server.go b/examples/storage/server.go index 66aaf0ed..b60e10f8 100644 --- a/examples/storage/server.go +++ b/examples/storage/server.go @@ -2,44 +2,124 @@ package main import ( "bytes" + "context" "fmt" "io" "log" "os" "os/signal" + "slices" "sync" "syscall" "time" "github.com/relab/gorums" pb "github.com/relab/gorums/examples/storage/proto" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" "google.golang.org/protobuf/types/known/timestamppb" ) -func runServer(address string, srvOpt gorums.ServerOption) { - sys, err := gorums.NewSystem(address, gorums.WithServerOptions(srvOpt)) +// runServer starts a storage server. peers[0] is the address to listen on; +// the remaining entries are the other cluster peers. The server waits until +// all peers have connected before logging the ready message. +func runServer(address string, peers []string, srvOpt gorums.ServerOption) error { + if len(peers) == 0 { + return fmt.Errorf("no peer addresses provided") + } + myID, peerList, err := peerConfig(address, peers) if err != nil { - log.Fatalf("Failed to create system on '%s': %v", address, err) + return err + } + sys, err := gorums.NewSystem(address, + gorums.WithServerOptions(srvOpt, gorums.WithConfig(myID, peerList)), + gorums.WithOutboundNodes(peerList), + gorums.WithDialOptions(grpc.WithTransportCredentials(insecure.NewCredentials())), + ) + if err != nil { + return fmt.Errorf("failed to create system on %q: %w", address, err) } - storage := newStorageServer(os.Stderr, sys.Addr()) - sys.RegisterService(nil, func(srv *gorums.Server) { - pb.RegisterStorageServer(srv, storage) - }) // catch signals in order to shut down gracefully signals := make(chan os.Signal, 1) signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM, syscall.SIGHUP) + registerAndServe(sys, myID) + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + if err := sys.WaitForConfig(ctx, func(cfg gorums.Configuration) bool { + return cfg.Size() == len(peers) + }); err != nil { + return fmt.Errorf("peers did not connect in time: %w", err) + } + + log.Printf("Started storage server on %s\n", sys.Addr()) + + <-signals + return sys.Stop() +} + +// runLocalCluster starts four in-process servers for local testing. +// It returns the server addresses and a stop function. The caller must +// call stop when the cluster is no longer needed. +func runLocalCluster(srvOpts gorums.ServerOption) ([]string, func(), error) { + dialOpts := gorums.WithDialOptions(grpc.WithTransportCredentials(insecure.NewCredentials())) + systems, stop, err := gorums.NewLocalSystems(4, gorums.WithServerOptions(srvOpts), dialOpts) + if err != nil { + return nil, nil, fmt.Errorf("failed to create local systems: %w", err) + } + + addrs := make([]string, len(systems)) + for i, sys := range systems { + addrs[i] = sys.Addr() + registerAndServe(sys, uint32(i+1)) + } + + // Wait for all systems to see each other before opening the client REPL. + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + for _, sys := range systems { + if err := sys.WaitForConfig(ctx, func(cfg gorums.Configuration) bool { + return cfg.Size() == len(systems) + }); err != nil { + stop() + return nil, nil, fmt.Errorf("cluster failed to connect: %w", err) + } + } + + return addrs, stop, nil +} + +// peerConfig returns the node ID and peer list for a server with the given +// address and peer addresses. 
The node ID is assumed to follow the ordering +// semantics of WithNodeList, where each node's ID is the 1-based index of +// its address in the sorted peer list. Sorting ensures deterministic node ID +// assignment regardless of the order of addresses in the input list. +// The server's own address must be included in the peer list. +// It returns an error if the server's address is not found in the peer list. +func peerConfig(address string, peers []string) (uint32, gorums.NodeListOption, error) { + sorted := slices.Clone(peers) + slices.Sort(sorted) + idx := slices.Index(sorted, address) + if idx < 0 { + return 0, nil, fmt.Errorf("server address %q not found in -addrs list", address) + } + return uint32(idx + 1), gorums.WithNodeList(sorted), nil +} + +// registerAndServe registers the storage service on sys and starts serving in +// a background goroutine. The server log output is labelled with the node ID. +func registerAndServe(sys *gorums.System, id uint32) { + storage := newStorageServer(os.Stderr, fmt.Sprintf("node %d", id)) + sys.RegisterService(nil, func(srv *gorums.Server) { + pb.RegisterStorageServer(srv, storage) + }) go func() { if err := sys.Serve(); err != nil { log.Printf("Server error: %v", err) } }() - - log.Printf("Started storage server on %s\n", sys.Addr()) - - <-signals - _ = sys.Stop() } type state struct { From 49045371d928fac641fc1aea5590a13cb3ab0d94 Mon Sep 17 00:00:00 2001 From: Hein Meling Date: Sun, 12 Apr 2026 19:05:42 +0200 Subject: [PATCH 4/6] fix(test): pass context to gorums.WaitForConfig in TestWaitForConfig --- system_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/system_test.go b/system_test.go index bc75dee8..adcc38c5 100644 --- a/system_test.go +++ b/system_test.go @@ -832,12 +832,12 @@ func TestWaitForConfig(t *testing.T) { const waiters = 5 errCh := make(chan error, waiters) for range waiters { - go func() { - ctx := gorums.TestContext(t, 5*time.Second) + ctx := gorums.TestContext(t, 5*time.Second) + go func(ctx context.Context) { errCh <- systems[0].WaitForConfig(ctx, func(cfg gorums.Configuration) bool { return cfg.Size() == 3 }) - }() + }(ctx) } for range waiters { From 61508c900dcab0600aa06e8438b2bfceee2d961b Mon Sep 17 00:00:00 2001 From: Hein Meling Date: Sun, 12 Apr 2026 19:09:05 +0200 Subject: [PATCH 5/6] refactor(runCluster): remove extra comma in nodeArgs construction --- examples/storage/cluster.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/examples/storage/cluster.go b/examples/storage/cluster.go index d9231d62..a3654e72 100644 --- a/examples/storage/cluster.go +++ b/examples/storage/cluster.go @@ -26,7 +26,8 @@ func runCluster(addrs string, ic string) error { cmds := make([]*exec.Cmd, len(all)) for i, addr := range all { // Put this server's address first, then the remaining peers. - nodeArgs := addr + "," + strings.Join(append(all[:i:i], all[i+1:]...), ",") + nodeAddrs := append([]string{addr}, append(all[:i:i], all[i+1:]...)...) 
+ nodeArgs := strings.Join(nodeAddrs, ",") args := []string{"-serve", "-addrs", nodeArgs} if ic != "" { args = append(args, "-interceptors", ic) From 1c919f594edfa747c4522734f3cd403be4dd342f Mon Sep 17 00:00:00 2001 From: Hein Meling Date: Sun, 12 Apr 2026 19:16:04 +0200 Subject: [PATCH 6/6] refactor(splitAddrs): ignore empty entries --- examples/storage/main.go | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/examples/storage/main.go b/examples/storage/main.go index 5bfcf97c..5639a2f4 100644 --- a/examples/storage/main.go +++ b/examples/storage/main.go @@ -58,11 +58,14 @@ func splitAddrs(s string) []string { if s == "" { return nil } - parts := strings.Split(s, ",") - for i, p := range parts { - parts[i] = strings.TrimSpace(p) + var addrs []string + for addr := range strings.SplitSeq(s, ",") { + trimmed := strings.TrimSpace(addr) + if trimmed != "" { + addrs = append(addrs, trimmed) + } } - return parts + return addrs } // parseInterceptors converts a comma-separated interceptor list into server options.