diff --git a/.vscode/gorums.txt b/.vscode/gorums.txt
index 54702d14..4eea5bce 100644
--- a/.vscode/gorums.txt
+++ b/.vscode/gorums.txt
@@ -63,6 +63,7 @@ memprofile
 mktemp
 multicast
 multicasts
+myaddr
 naddr
 netrwhist
 nofoo
diff --git a/doc/user-guide.md b/doc/user-guide.md
index d3e9f68f..7e9ab946 100644
--- a/doc/user-guide.md
+++ b/doc/user-guide.md
@@ -1217,6 +1217,61 @@ log.Println("quorum ready, starting to serve")
 The self-node is always present in `cfg`, so a three-node cluster (`quorumSize = 2`) will fire the signal as soon as a single remote peer connects.
 
+## Waiting for Configuration
+
+`System.WaitForConfig` and `System.WaitForClientConfig` block until a condition on the configuration is satisfied, or until the context is cancelled or the system is stopped.
+They replace the need to poll `Config()` in a loop and eliminate the latency and CPU overhead of polling.
+
+```go
+// Block until all three known peers are connected.
+ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+defer cancel()
+if err := sys.WaitForConfig(ctx, func(cfg gorums.Configuration) bool {
+	return cfg.Size() == 3
+}); err != nil {
+	log.Fatal("peers did not connect in time:", err)
+}
+```
+
+The condition is checked immediately against the current configuration, so the call returns without blocking if the condition is already satisfied.
+
+### WaitForConfig
+
+`WaitForConfig` waits on the known-peer configuration — the set of pre-configured peers that have connected, plus the local node itself.
+Use this when you need a quorum of static cluster members to be present before beginning to serve requests.
+
+```go
+err := sys.WaitForConfig(ctx, func(cfg gorums.Configuration) bool {
+	return cfg.Size() >= quorumSize
+})
+```
+
+### WaitForClientConfig
+
+`WaitForClientConfig` waits on the client-peer configuration — the set of anonymous clients that have connected dynamically and are reachable for reverse-direction calls.
+Use this when a server should not proceed until a minimum number of clients have registered.
+
+```go
+err := sys.WaitForClientConfig(ctx, func(cfg gorums.Configuration) bool {
+	return cfg.Size() >= expectedClients
+})
+```
+
+### Return Values
+
+| Condition                                   | Return value        |
+| ------------------------------------------- | ------------------- |
+| `cond` returns `true`                       | `nil`               |
+| `ctx` is cancelled or times out             | `ctx.Err()`         |
+| `sys.Stop()` is called before `cond` is met | `gorums.ErrStopped` |
+
+### Relationship to `onChange`
+
+`WaitForConfig` and the `onChange` callback (see [WithConfig onChange Callback](#withconfig-onchange-callback)) serve complementary purposes.
+`onChange` is suited for reactive work that must happen synchronously on every configuration change — for example, triggering a leader election or updating an atomic counter.
+`WaitForConfig` is suited for startup synchronization — blocking until the cluster reaches a desired state before the application begins normal operation.
+Unlike `onChange`, `WaitForConfig` composes naturally with `context.WithTimeout` and `context.WithCancel`.
+
 ## Error Handling
 
 Gorums provides structured error types to help you understand and handle failures in quorum calls.
diff --git a/errors.go b/errors.go
index bb6794a8..704a5778 100644
--- a/errors.go
+++ b/errors.go
@@ -22,6 +22,10 @@ var ErrTypeMismatch = stream.ErrTypeMismatch
 // This allows the response iterator to account for all nodes without blocking.
var ErrSkipNode = errors.New("skip node") +// ErrStopped is returned by [System.WaitForConfig] and [System.WaitForClientConfig] +// when the system is stopped before the condition is met. +var ErrStopped = errors.New("system stopped") + // QuorumCallError reports on a failed quorum call. // It provides detailed information about which nodes failed. type QuorumCallError struct { diff --git a/examples/Makefile b/examples/Makefile index a27c3352..2a4237ce 100644 --- a/examples/Makefile +++ b/examples/Makefile @@ -4,10 +4,15 @@ proto_go := $(proto_src:%.proto=%.pb.go) gorums_go := $(proto_src:%.proto=%_gorums.pb.go) binaries := storage/storage -.PHONY: all debug proto download tools $(binaries) +.PHONY: all debug proto download tools start $(binaries) all: $(binaries) +# Start a three-node local storage cluster for manual testing. +# Starts three servers on localhost:9081-9083. Press Ctrl-C to stop all servers. +start: storage/storage + @storage/storage -cluster -addrs localhost:9081,localhost:9082,localhost:9083 + # Build binaries in debug mode: Tell the Go compiler to compile without # optimizations (-N) or inlining (-l) so that debuggers, stack traces, # and variable inspection behave exactly like the source code. diff --git a/examples/storage/client.go b/examples/storage/client.go index 5de6de12..0314e6f6 100644 --- a/examples/storage/client.go +++ b/examples/storage/client.go @@ -1,7 +1,7 @@ package main import ( - "log" + "fmt" "github.com/relab/gorums" "github.com/relab/gorums/examples/storage/proto" @@ -11,15 +11,14 @@ import ( func runClient(addresses []string) error { if len(addresses) < 1 { - log.Fatalln("No addresses provided!") + return fmt.Errorf("no server addresses provided") } - cfg, err := gorums.NewConfig(gorums.WithNodeList(addresses), - gorums.WithDialOptions( - grpc.WithTransportCredentials(insecure.NewCredentials()), // disable TLS - ), + cfg, err := gorums.NewConfig( + gorums.WithNodeList(addresses), + gorums.WithDialOptions(grpc.WithTransportCredentials(insecure.NewCredentials())), ) if err != nil { - log.Fatal(err) + return err } defer cfg.Close() return Repl(cfg) diff --git a/examples/storage/cluster.go b/examples/storage/cluster.go new file mode 100644 index 00000000..a3654e72 --- /dev/null +++ b/examples/storage/cluster.go @@ -0,0 +1,70 @@ +package main + +import ( + "fmt" + "os" + "os/exec" + "os/signal" + "strings" + "syscall" +) + +// runCluster spawns one server subprocess per address in the comma-separated +// addrs string, prints connection instructions, and waits for a signal before +// stopping all subprocesses. The ic string is the raw -interceptors flag value, +// forwarded to each subprocess unchanged. +func runCluster(addrs string, ic string) error { + all := splitAddrs(addrs) + if len(all) == 0 { + return fmt.Errorf("no addresses provided") + } + exe, err := os.Executable() + if err != nil { + return fmt.Errorf("could not determine executable path: %w", err) + } + + cmds := make([]*exec.Cmd, len(all)) + for i, addr := range all { + // Put this server's address first, then the remaining peers. + nodeAddrs := append([]string{addr}, append(all[:i:i], all[i+1:]...)...) + nodeArgs := strings.Join(nodeAddrs, ",") + args := []string{"-serve", "-addrs", nodeArgs} + if ic != "" { + args = append(args, "-interceptors", ic) + } + cmd := exec.Command(exe, args...) 
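+		// Forward the child's output to this process so logs from all
+		// server nodes are interleaved in the parent terminal.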
+		cmd.Stdout = os.Stdout
+		cmd.Stderr = os.Stderr
+		if err := cmd.Start(); err != nil {
+			stopAll(cmds[:i])
+			return fmt.Errorf("failed to start server %q: %w", addr, err)
+		}
+		cmds[i] = cmd
+	}
+
+	fmt.Printf("\nCluster running on %s.\n", strings.Join(all, ", "))
+	fmt.Printf("Connect a client with:\n %s -addrs %s\n\n", exe, addrs)
+	fmt.Println("Press Ctrl-C to stop all servers.")
+
+	signals := make(chan os.Signal, 1)
+	signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)
+	<-signals
+
+	fmt.Fprintln(os.Stderr, "\nStopping servers...")
+	stopAll(cmds)
+	return nil
+}
+
+// stopAll sends SIGTERM to all running processes and waits for them to exit.
+func stopAll(cmds []*exec.Cmd) {
+	for _, cmd := range cmds {
+		if cmd != nil && cmd.Process != nil {
+			_ = cmd.Process.Signal(syscall.SIGTERM)
+		}
+	}
+	for _, cmd := range cmds {
+		if cmd != nil {
+			_ = cmd.Wait()
+		}
+	}
+}
diff --git a/examples/storage/main.go b/examples/storage/main.go
index 39595c3d..5639a2f4 100644
--- a/examples/storage/main.go
+++ b/examples/storage/main.go
@@ -2,64 +2,70 @@ package main
 
 import (
 	"flag"
-	"fmt"
 	"log"
-	"os"
 	"strings"
 
 	"github.com/relab/gorums"
 	"github.com/relab/gorums/examples/interceptors"
 	pb "github.com/relab/gorums/examples/storage/proto"
-	"google.golang.org/grpc"
-	"google.golang.org/grpc/credentials/insecure"
 )
 
 func main() {
-	server := flag.String("server", "", "Start as a server on given address.")
-	remotes := flag.String("connect", "", "Comma-separated list of servers to connect to.")
+	cluster := flag.Bool("cluster", false, "Spawn a server process for each address in -addrs.")
+	serve := flag.Bool("serve", false, "Run as a single server node on addrs[0]; the remaining addresses are peers.")
+	addrs := flag.String("addrs", "", "Comma-separated list of server addresses.")
 	ic := flag.String("interceptors", "", "Comma-separated list of interceptors to enable (logging, nofoo, metadata, delayed).")
 	flag.Parse()
 
-	srvOpts := parseInterceptors(*ic)
+	if *cluster {
+		if *addrs == "" {
+			log.Fatal("Usage: storage -cluster -addrs <addr>[,<addr>...]")
+		}
+		if err := runCluster(*addrs, *ic); err != nil {
+			log.Fatal(err)
+		}
+		return
+	}
 
-	if *server != "" {
-		runServer(*server, srvOpts)
+	all := splitAddrs(*addrs)
+	if *serve {
+		if len(all) == 0 {
+			log.Fatal("Usage: storage -serve -addrs <addr>[,<addr>...]")
+		}
+		if err := runServer(all[0], all, parseInterceptors(*ic)); err != nil {
+			log.Fatal(err)
+		}
 		return
 	}
 
-	addrs := strings.Split(*remotes, ",")
-	// start local servers if no remote servers were specified
-	if len(addrs) == 1 && addrs[0] == "" {
-		// NewLocalSystems pre-allocates all listeners and configures each system
-		// with WithConfig (node IDs 1..n). Passing dial options auto-creates an
-		// outbound Configuration for each system (accessible via sys.OutboundConfig).
-		dialOpts := gorums.WithDialOptions(grpc.WithTransportCredentials(insecure.NewCredentials()))
-		systems, stop, err := gorums.NewLocalSystems(4, gorums.WithServerOptions(srvOpts), dialOpts)
+	// Client mode: connect to the provided addresses, or spin up a local cluster (same process).
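+	// With no -addrs given, runLocalCluster starts four in-process servers
+	// that are torn down (via the deferred stop) when the client REPL exits.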
+	if len(all) == 0 {
+		clusterAddrs, stop, err := runLocalCluster(parseInterceptors(*ic))
 		if err != nil {
-			log.Fatalf("Failed to create local systems: %v", err)
+			log.Fatal(err)
 		}
 		defer stop()
-
-		addrs = make([]string, len(systems))
-		for i, sys := range systems {
-			addrs[i] = sys.Addr()
-		}
-		for i, sys := range systems {
-			storage := newStorageServer(os.Stderr, fmt.Sprintf("node %d", i))
-			sys.RegisterService(nil, func(srv *gorums.Server) {
-				pb.RegisterStorageServer(srv, storage)
-			})
-			go func() {
-				if err := sys.Serve(); err != nil {
-					log.Printf("Server error: %v", err)
-				}
-			}()
-		}
+		all = clusterAddrs
 	}
+	if err := runClient(all); err != nil {
+		log.Fatal(err)
+	}
+}
 
-	if runClient(addrs) != nil {
-		os.Exit(1)
+// splitAddrs splits a comma-separated address string, trimming spaces.
+// Returns nil if s is empty.
+func splitAddrs(s string) []string {
+	if s == "" {
+		return nil
+	}
+	var addrs []string
+	for addr := range strings.SplitSeq(s, ",") {
+		trimmed := strings.TrimSpace(addr)
+		if trimmed != "" {
+			addrs = append(addrs, trimmed)
+		}
 	}
+	return addrs
 }
 
 // parseInterceptors converts a comma-separated interceptor list into server options.
diff --git a/examples/storage/server.go b/examples/storage/server.go
index 66aaf0ed..b60e10f8 100644
--- a/examples/storage/server.go
+++ b/examples/storage/server.go
@@ -2,44 +2,124 @@ package main
 
 import (
 	"bytes"
+	"context"
 	"fmt"
 	"io"
 	"log"
 	"os"
 	"os/signal"
+	"slices"
 	"sync"
 	"syscall"
 	"time"
 
 	"github.com/relab/gorums"
 	pb "github.com/relab/gorums/examples/storage/proto"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/credentials/insecure"
 	"google.golang.org/protobuf/types/known/timestamppb"
 )
 
-func runServer(address string, srvOpt gorums.ServerOption) {
-	sys, err := gorums.NewSystem(address, gorums.WithServerOptions(srvOpt))
+// runServer starts a storage server listening on address. The peers list is
+// the full cluster membership and must include address; the remaining entries
+// are the other cluster peers. The server waits until all peers have
+// connected before logging the ready message.
+func runServer(address string, peers []string, srvOpt gorums.ServerOption) error {
+	if len(peers) == 0 {
+		return fmt.Errorf("no peer addresses provided")
+	}
+	myID, peerList, err := peerConfig(address, peers)
 	if err != nil {
-		log.Fatalf("Failed to create system on '%s': %v", address, err)
+		return err
+	}
+	sys, err := gorums.NewSystem(address,
+		gorums.WithServerOptions(srvOpt, gorums.WithConfig(myID, peerList)),
+		gorums.WithOutboundNodes(peerList),
+		gorums.WithDialOptions(grpc.WithTransportCredentials(insecure.NewCredentials())),
+	)
+	if err != nil {
+		return fmt.Errorf("failed to create system on %q: %w", address, err)
 	}
-	storage := newStorageServer(os.Stderr, sys.Addr())
-	sys.RegisterService(nil, func(srv *gorums.Server) {
-		pb.RegisterStorageServer(srv, storage)
-	})
 
 	// catch signals in order to shut down gracefully
 	signals := make(chan os.Signal, 1)
 	signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM, syscall.SIGHUP)
 
+	registerAndServe(sys, myID)
+
+	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+	defer cancel()
+	if err := sys.WaitForConfig(ctx, func(cfg gorums.Configuration) bool {
+		return cfg.Size() == len(peers)
+	}); err != nil {
+		return fmt.Errorf("peers did not connect in time: %w", err)
+	}
+
+	log.Printf("Started storage server on %s\n", sys.Addr())
+
+	<-signals
+	return sys.Stop()
+}
+
+// runLocalCluster starts four in-process servers for local testing.
+// It returns the server addresses and a stop function.
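+// All four systems are fully interconnected before the function returns.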
The caller must +// call stop when the cluster is no longer needed. +func runLocalCluster(srvOpts gorums.ServerOption) ([]string, func(), error) { + dialOpts := gorums.WithDialOptions(grpc.WithTransportCredentials(insecure.NewCredentials())) + systems, stop, err := gorums.NewLocalSystems(4, gorums.WithServerOptions(srvOpts), dialOpts) + if err != nil { + return nil, nil, fmt.Errorf("failed to create local systems: %w", err) + } + + addrs := make([]string, len(systems)) + for i, sys := range systems { + addrs[i] = sys.Addr() + registerAndServe(sys, uint32(i+1)) + } + + // Wait for all systems to see each other before opening the client REPL. + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + for _, sys := range systems { + if err := sys.WaitForConfig(ctx, func(cfg gorums.Configuration) bool { + return cfg.Size() == len(systems) + }); err != nil { + stop() + return nil, nil, fmt.Errorf("cluster failed to connect: %w", err) + } + } + + return addrs, stop, nil +} + +// peerConfig returns the node ID and peer list for a server with the given +// address and peer addresses. The node ID is assumed to follow the ordering +// semantics of WithNodeList, where each node's ID is the 1-based index of +// its address in the sorted peer list. Sorting ensures deterministic node ID +// assignment regardless of the order of addresses in the input list. +// The server's own address must be included in the peer list. +// It returns an error if the server's address is not found in the peer list. +func peerConfig(address string, peers []string) (uint32, gorums.NodeListOption, error) { + sorted := slices.Clone(peers) + slices.Sort(sorted) + idx := slices.Index(sorted, address) + if idx < 0 { + return 0, nil, fmt.Errorf("server address %q not found in -addrs list", address) + } + return uint32(idx + 1), gorums.WithNodeList(sorted), nil +} + +// registerAndServe registers the storage service on sys and starts serving in +// a background goroutine. The server log output is labelled with the node ID. +func registerAndServe(sys *gorums.System, id uint32) { + storage := newStorageServer(os.Stderr, fmt.Sprintf("node %d", id)) + sys.RegisterService(nil, func(srv *gorums.Server) { + pb.RegisterStorageServer(srv, storage) + }) go func() { if err := sys.Serve(); err != nil { log.Printf("Server error: %v", err) } }() - - log.Printf("Started storage server on %s\n", sys.Addr()) - - <-signals - _ = sys.Stop() } type state struct { diff --git a/inbound_manager.go b/inbound_manager.go index 44135ce4..34a36fcb 100644 --- a/inbound_manager.go +++ b/inbound_manager.go @@ -78,6 +78,9 @@ type inboundManager struct { handler stream.RequestHandler // handler for dispatching incoming requests on all inbound nodes onConfigChange func(Configuration) // optional; called after each known-peer config change nextClientID uint32 // next ID to assign to a client peer + configCh chan struct{} // closed and replaced on each config/clientConfig change; protected by mu + stopCh chan struct{} // closed on shutdown to unblock waiters; never replaced + stopOnce sync.Once // ensures stopCh is closed exactly once } // clientIDStart is the starting ID for dynamically assigned client peers. 
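The `configCh` field added above uses the close-and-replace broadcast idiom: each configuration change closes the current channel, waking every blocked waiter, and installs a fresh channel for the next generation. A minimal self-contained sketch of the idiom, separate from this diff (the `broadcaster` type and its methods are illustrative names):

```go
package broadcast

import (
	"context"
	"sync"
)

// broadcaster wakes all blocked waiters each time notify is called.
type broadcaster struct {
	mu sync.Mutex
	ch chan struct{}
}

func newBroadcaster() *broadcaster {
	return &broadcaster{ch: make(chan struct{})}
}

// notify closes the current channel to wake all waiters, then installs a
// fresh channel for the next generation of waiters.
func (b *broadcaster) notify() {
	b.mu.Lock()
	defer b.mu.Unlock()
	close(b.ch)
	b.ch = make(chan struct{})
}

// wait blocks until the next notify call or until ctx is done.
func (b *broadcaster) wait(ctx context.Context) error {
	b.mu.Lock()
	ch := b.ch // capture the current generation under the lock
	b.mu.Unlock()
	select {
	case <-ch:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}
```

`waitForConfig` below loops over exactly this pattern, re-checking the condition after each wake-up, with an extra `stopCh` case so `Stop` can unblock waiters.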
@@ -103,6 +106,8 @@ func newInboundManager(myID uint32, opt NodeListOption, sendBuffer uint, onConfi handler: handler, onConfigChange: onConfigChange, nextClientID: clientIDStart, + configCh: make(chan struct{}), + stopCh: make(chan struct{}), } if opt != nil { if _, err := opt.newConfig(im); err != nil { @@ -312,6 +317,65 @@ func (im *inboundManager) rebuildConfig() { if cfgChanged && im.onConfigChange != nil { im.onConfigChange(cfg) } + // Broadcast config change to all waiters. + close(im.configCh) + im.configCh = make(chan struct{}) +} + +// checkConfig checks the condition under the read lock and returns the +// current broadcast channel if the condition is not yet met. +func (im *inboundManager) checkConfig(cond func() bool) (met bool, ch <-chan struct{}) { + im.mu.RLock() + defer im.mu.RUnlock() + if cond() { + return true, nil + } + return false, im.configCh +} + +// waitForConfig blocks until cond returns true or until ctx is cancelled +// or the inboundManager is closed. The cond function is called while the +// read lock is held, so it must not acquire any additional locks. +func (im *inboundManager) waitForConfig(ctx context.Context, cond func() bool) error { + for { + met, ch := im.checkConfig(cond) + if met { + return nil + } + select { + case <-ctx.Done(): + return ctx.Err() + case <-im.stopCh: + return ErrStopped + case <-ch: + } + } +} + +// waitForKnownConfig blocks until cond returns true for the current known-peer +// [Configuration], or until ctx is cancelled or the server is stopped. +// The cond function receives the current known-peer configuration and must not +// acquire any additional locks. +func (im *inboundManager) waitForKnownConfig(ctx context.Context, cond func(Configuration) bool) error { + return im.waitForConfig(ctx, func() bool { + return cond(im.config) + }) +} + +// waitForClientConfig blocks until cond returns true for the current client-peer +// [Configuration], or until ctx is cancelled or the server is stopped. +// The cond function receives the current client-peer configuration and must not +// acquire any additional locks. +func (im *inboundManager) waitForClientConfig(ctx context.Context, cond func(Configuration) bool) error { + return im.waitForConfig(ctx, func() bool { + return cond(im.clientConfig) + }) +} + +// close signals all waiters to stop and prevents new waits from blocking. +// Called from [System.Stop]. +func (im *inboundManager) close() { + im.stopOnce.Do(func() { close(im.stopCh) }) } // nilPeerNode implements [stream.PeerNode] for regular clients that have no diff --git a/inbound_manager_test.go b/inbound_manager_test.go index ae4b63e8..85dfbe2a 100644 --- a/inbound_manager_test.go +++ b/inbound_manager_test.go @@ -525,6 +525,28 @@ func TestOnConfigChangeCallbackIdempotentCleanup(t *testing.T) { } } +// mustWaitForConfig blocks until cond returns true for srv's known-peer +// Configuration, or fails the test after a 2-second timeout. +func mustWaitForConfig(t *testing.T, srv *Server, cond func(Configuration) bool) { + t.Helper() + ctx, cancel := context.WithTimeout(t.Context(), 2*time.Second) + defer cancel() + if err := srv.waitForKnownConfig(ctx, cond); err != nil { + t.Fatalf("waitForKnownConfig: %v", err) + } +} + +// mustWaitForClientConfig blocks until cond returns true for srv's client-peer +// Configuration, or fails the test after a 2-second timeout. 
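+// Like mustWaitForConfig, it fails the test via t.Fatalf, so callers need
+// no error handling.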
+func mustWaitForClientConfig(t *testing.T, srv *Server, cond func(Configuration) bool) { + t.Helper() + ctx, cancel := context.WithTimeout(t.Context(), 2*time.Second) + defer cancel() + if err := srv.waitForClientConfig(ctx, cond); err != nil { + t.Fatalf("waitForClientConfig: %v", err) + } +} + // testPeerServer creates a Server with WithPeers(1, peerNodes()), starts it // via TestServers, and returns the server and its addresses. func testPeerServer(t *testing.T) (*Server, []string) { @@ -577,7 +599,7 @@ func TestKnownPeerConnects(t *testing.T) { connectAsPeer(t, 2, addrs) - WaitForConfigCondition(t, srv.Config, equalNodeIDs([]uint32{1, 2})) + mustWaitForConfig(t, srv, equalNodeIDs([]uint32{1, 2})) checkIDs(t, srv.Config(), []uint32{1, 2}, "after connect") } @@ -588,14 +610,14 @@ func TestKnownPeerDisconnects(t *testing.T) { srv, addrs := testPeerServer(t) cfg := connectAsPeer(t, 2, addrs) - WaitForConfigCondition(t, srv.Config, equalNodeIDs([]uint32{1, 2})) + mustWaitForConfig(t, srv, equalNodeIDs([]uint32{1, 2})) // Close the configuration to trigger disconnect; Close is idempotent so // t.Cleanup (registered by connectAsPeer) is harmless. if err := cfg.Close(); err != nil { t.Fatalf("cfg.Close() error: %v", err) } - WaitForConfigCondition(t, srv.Config, equalNodeIDs([]uint32{1})) + mustWaitForConfig(t, srv, equalNodeIDs([]uint32{1})) checkIDs(t, srv.Config(), []uint32{1}, "after disconnect") } @@ -639,7 +661,7 @@ func TestKnownPeerServerCallsClient(t *testing.T) { t.Cleanup(Closer(t, cfg)) // Wait for the peer to appear in the inbound config. - WaitForConfigCondition(t, srv.Config, equalNodeIDs([]uint32{1, 2})) + mustWaitForConfig(t, srv, equalNodeIDs([]uint32{1, 2})) // Server sends a request to the client via the inbound node. inboundCfg := srv.Config() @@ -729,7 +751,7 @@ func TestClientConfigConnects(t *testing.T) { connectAsPeerClient(t, addrs) // Client peer should appear with auto-assigned ID >= clientIDStart. - WaitForConfigCondition(t, srv.ClientConfig, func(cfg Configuration) bool { return len(cfg) > 0 }) + mustWaitForClientConfig(t, srv, func(cfg Configuration) bool { return len(cfg) > 0 }) cfg := srv.ClientConfig() if len(cfg) != 1 { t.Fatalf("ClientConfig has %d nodes; want 1", len(cfg)) @@ -747,7 +769,7 @@ func TestClientConfigDisconnects(t *testing.T) { cfg := connectAsPeerClient(t, addrs) // Wait for the client peer to appear. - WaitForConfigCondition(t, srv.ClientConfig, func(cfg Configuration) bool { return len(cfg) > 0 }) + mustWaitForClientConfig(t, srv, func(cfg Configuration) bool { return len(cfg) > 0 }) if len(srv.ClientConfig()) != 1 { t.Fatalf("ClientConfig has %d nodes; want 1", len(srv.ClientConfig())) } @@ -758,7 +780,7 @@ func TestClientConfigDisconnects(t *testing.T) { } // Wait for config to become empty. - WaitForConfigCondition(t, srv.ClientConfig, func(cfg Configuration) bool { return len(cfg) == 0 }) + mustWaitForClientConfig(t, srv, func(cfg Configuration) bool { return len(cfg) == 0 }) checkIDs(t, srv.ClientConfig(), []uint32{}, "after disconnect") } @@ -772,13 +794,13 @@ func TestClientConfigMixedMode(t *testing.T) { // Connect known peer (ID 2). connectAsPeer(t, 2, addrs) - WaitForConfigCondition(t, srv.Config, equalNodeIDs([]uint32{1, 2})) + mustWaitForConfig(t, srv, equalNodeIDs([]uint32{1, 2})) // Connect peer-capable anonymous client (dynamic peer). connectAsPeerClient(t, addrs) // Wait for 1 dynamic node. 
-	WaitForConfigCondition(t, srv.ClientConfig, func(cfg Configuration) bool { return len(cfg) == 1 })
+	mustWaitForClientConfig(t, srv, func(cfg Configuration) bool { return len(cfg) == 1 })
 	dynCfg := srv.ClientConfig()
 	if len(dynCfg) != 1 {
 		t.Fatalf("ClientConfig has %d nodes; want 1", len(dynCfg))
@@ -825,7 +847,7 @@ func TestClientConfigServerCallsClient(t *testing.T) {
 	t.Cleanup(Closer(t, clientConfig))
 
 	// Wait for the client to appear in the server's ClientConfig.
-	WaitForConfigCondition(t, srv.ClientConfig, func(cfg Configuration) bool { return len(cfg) > 0 })
+	mustWaitForClientConfig(t, srv, func(cfg Configuration) bool { return len(cfg) > 0 })
 
 	// Trigger: client multicasts TestMethod to the server; server fans it back via ClientConfig.
 	ctx := TestContext(t, 2*time.Second)
diff --git a/system.go b/system.go
index ebd48add..287f1b6b 100644
--- a/system.go
+++ b/system.go
@@ -1,6 +1,7 @@
 package gorums
 
 import (
+	"context"
 	"errors"
 	"fmt"
 	"io"
@@ -156,6 +157,22 @@ func (s *System) ClientConfig() Configuration {
 	return s.srv.ClientConfig()
 }
 
+// WaitForConfig blocks until cond returns true for the current known-peer
+// [Configuration], or until ctx is cancelled or the system is stopped.
+// The condition is checked immediately against the current configuration,
+// so it may return without blocking if the condition is already satisfied.
+func (s *System) WaitForConfig(ctx context.Context, cond func(Configuration) bool) error {
+	return s.srv.waitForKnownConfig(ctx, cond)
+}
+
+// WaitForClientConfig blocks until cond returns true for the current
+// client-peer [Configuration], or until ctx is cancelled or the system is stopped.
+// The condition is checked immediately against the current configuration,
+// so it may return without blocking if the condition is already satisfied.
+func (s *System) WaitForClientConfig(ctx context.Context, cond func(Configuration) bool) error {
+	return s.srv.waitForClientConfig(ctx, cond)
+}
+
 // RegisterService registers the service with the server using the provided register function.
 // The closer is added to the list of closers to be closed when the system is stopped.
 //
@@ -184,6 +201,8 @@ func (s *System) Serve() error {
 // on the client side will get notified by connection errors.
 // It is safe to call Stop before [System.Serve] to avoid resource leaks.
 func (s *System) Stop() (errs error) {
+	// Unblock any WaitForConfig / WaitForClientConfig callers.
+	s.srv.close()
 	// We cannot use graceful stop here since multicast methods do not
 	// respond to the client, and thus would block indefinitely.
s.srv.Stop() diff --git a/system_test.go b/system_test.go index 993e30a8..adcc38c5 100644 --- a/system_test.go +++ b/system_test.go @@ -145,9 +145,12 @@ func TestSystemSymmetricConfigurationConnectsAllPeers(t *testing.T) { // Wait for connections to establish for i, sys := range systems { - gorums.WaitForConfigCondition(t, sys.Config, func(cfg gorums.Configuration) bool { + ctx := gorums.TestContext(t, 5*time.Second) + if err := sys.WaitForConfig(ctx, func(cfg gorums.Configuration) bool { return cfg.Size() == len(systems) - }) + }); err != nil { + t.Fatalf("system %d: WaitForConfig: %v", i+1, err) + } if got := sys.Config().Size(); got != len(systems) { t.Fatalf("system %d config size: %d, expected: %d", i+1, got, len(systems)) } @@ -172,18 +175,24 @@ func waitWithTimeout(t *testing.T, wg *sync.WaitGroup) { func awaitSystemReady(t *testing.T, systems []*gorums.System) { t.Helper() for _, sys := range systems { - gorums.WaitForConfigCondition(t, sys.Config, func(cfg gorums.Configuration) bool { + ctx := gorums.TestContext(t, 5*time.Second) + if err := sys.WaitForConfig(ctx, func(cfg gorums.Configuration) bool { return cfg.Size() == len(systems) - }) + }); err != nil { + t.Fatalf("awaitSystemReady: %v", err) + } } } // awaitClientReady waits until the server's ClientConfig contains n connected peers. func awaitClientReady(t *testing.T, sys *gorums.System, n int) { t.Helper() - gorums.WaitForConfigCondition(t, sys.ClientConfig, func(cfg gorums.Configuration) bool { + ctx := gorums.TestContext(t, 5*time.Second) + if err := sys.WaitForClientConfig(ctx, func(cfg gorums.Configuration) bool { return cfg.Size() == n - }) + }); err != nil { + t.Fatalf("awaitClientReady: %v", err) + } } // createClientServerSystems creates a server system and a client for back-channel testing. 
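Callers that need to know why a wait ended can branch on the returned error with `errors.Is`, mirroring the return-value table in the user guide. A sketch (the `quorumSize` value is illustrative):

```go
err := sys.WaitForConfig(ctx, func(cfg gorums.Configuration) bool {
	return cfg.Size() >= quorumSize
})
switch {
case err == nil:
	// Condition met: enough peers are connected.
case errors.Is(err, gorums.ErrStopped):
	// sys.Stop() was called; shut down quietly.
case errors.Is(err, context.DeadlineExceeded):
	// Peers did not connect before the deadline.
default:
	log.Fatalf("waiting for peers: %v", err)
}
```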
@@ -746,3 +755,106 @@ func TestSystemLocalDispatchContentionSlowReplica(t *testing.T) { t.Errorf("All: got %q, want %q", result.GetValue(), "echo: all-call") } } + +func TestWaitForConfig(t *testing.T) { + t.Run("ConditionAlreadyMet", func(t *testing.T) { + systems := gorums.TestSystems(t, 3) + awaitSystemReady(t, systems) + + ctx := gorums.TestContext(t, 2*time.Second) + if err := systems[0].WaitForConfig(ctx, func(cfg gorums.Configuration) bool { + return cfg.Size() == 3 + }); err != nil { + t.Fatalf("WaitForConfig: %v", err) + } + }) + + t.Run("ConditionMetAfterConnect", func(t *testing.T) { + systems := gorums.TestSystems(t, 3) + + ctx := gorums.TestContext(t, 5*time.Second) + if err := systems[0].WaitForConfig(ctx, func(cfg gorums.Configuration) bool { + return cfg.Size() == 3 + }); err != nil { + t.Fatalf("WaitForConfig: %v", err) + } + }) + + t.Run("ContextCancelled", func(t *testing.T) { + sys, err := gorums.NewSystem("127.0.0.1:0") + if err != nil { + t.Fatalf("NewSystem: %v", err) + } + go func() { _ = sys.Serve() }() + t.Cleanup(func() { _ = sys.Stop() }) + + ctx, cancel := context.WithTimeout(t.Context(), 50*time.Millisecond) + defer cancel() + err = sys.WaitForConfig(ctx, func(cfg gorums.Configuration) bool { + return cfg.Size() == 3 // never true + }) + if !errors.Is(err, context.DeadlineExceeded) { + t.Fatalf("expected DeadlineExceeded, got: %v", err) + } + }) + + t.Run("SystemStopped", func(t *testing.T) { + sys, err := gorums.NewSystem("127.0.0.1:0") + if err != nil { + t.Fatalf("NewSystem: %v", err) + } + go func() { _ = sys.Serve() }() + + errCh := make(chan error, 1) + go func() { + errCh <- sys.WaitForConfig(context.Background(), func(cfg gorums.Configuration) bool { + return cfg.Size() == 3 // never true + }) + }() + + // Give WaitForConfig time to enter the select. + time.Sleep(20 * time.Millisecond) + _ = sys.Stop() + + select { + case err := <-errCh: + if !errors.Is(err, gorums.ErrStopped) { + t.Fatalf("expected ErrStopped, got: %v", err) + } + case <-time.After(2 * time.Second): + t.Fatal("WaitForConfig did not return after Stop") + } + }) + + t.Run("ConcurrentWaiters", func(t *testing.T) { + systems := gorums.TestSystems(t, 3) + + const waiters = 5 + errCh := make(chan error, waiters) + for range waiters { + ctx := gorums.TestContext(t, 5*time.Second) + go func(ctx context.Context) { + errCh <- systems[0].WaitForConfig(ctx, func(cfg gorums.Configuration) bool { + return cfg.Size() == 3 + }) + }(ctx) + } + + for range waiters { + if err := <-errCh; err != nil { + t.Errorf("WaitForConfig: %v", err) + } + } + }) + + t.Run("ClientConfig", func(t *testing.T) { + sysServer, _, _ := createClientServerSystems(t) + + ctx := gorums.TestContext(t, 5*time.Second) + if err := sysServer.WaitForClientConfig(ctx, func(cfg gorums.Configuration) bool { + return cfg.Size() == 1 + }); err != nil { + t.Fatalf("WaitForClientConfig: %v", err) + } + }) +} diff --git a/testing_shared.go b/testing_shared.go index 346399b3..bfadd59d 100644 --- a/testing_shared.go +++ b/testing_shared.go @@ -48,19 +48,6 @@ func InsecureDialOptions(_ testing.TB) DialOption { ) } -// WaitForConfigCondition polls the config function until the condition cond returns true -// or the timeout elapses. This is useful for waiting on dynamic config updates. 
-func WaitForConfigCondition(t testing.TB, config func() Configuration, cond func(Configuration) bool) { - t.Helper() - deadline := time.Now().Add(2 * time.Second) - for time.Now().Before(deadline) { - if cond(config()) { - return - } - time.Sleep(5 * time.Millisecond) - } - t.Errorf("timeout waiting for config; got %v", config().NodeIDs()) -} // TestQuorumCallError creates a QuorumCallError for testing. // The nodeErrors map contains node IDs and their corresponding errors.
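Downstream tests that relied on the removed `WaitForConfigCondition` helper can switch to the context-based API. A sketch of the equivalent wait, keeping the old helper's 2-second timeout (assuming `sys` and `cond` are in scope):

```go
ctx := gorums.TestContext(t, 2*time.Second)
if err := sys.WaitForConfig(ctx, cond); err != nil {
	t.Fatalf("timeout waiting for config; got %v: %v", sys.Config().NodeIDs(), err)
}
```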