Merged
1 change: 1 addition & 0 deletions .vscode/gorums.txt
@@ -63,6 +63,7 @@ memprofile
mktemp
multicast
multicasts
myaddr
naddr
netrwhist
nofoo
55 changes: 55 additions & 0 deletions doc/user-guide.md
@@ -1217,6 +1217,61 @@ log.Println("quorum ready, starting to serve")

The self-node is always present in `cfg`, so a three-node cluster (`quorumSize = 2`) will fire the signal as soon as a single remote peer connects.

## Waiting for Configuration

`System.WaitForConfig` and `System.WaitForClientConfig` block until a condition on the configuration is satisfied, or until the context is cancelled or the system is stopped.
They replace polling `Config()` in a loop, eliminating the associated latency and CPU overhead.

```go
// Block until all three known peers are connected.
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
if err := sys.WaitForConfig(ctx, func(cfg gorums.Configuration) bool {
	return cfg.Size() == 3
}); err != nil {
	log.Fatal("peers did not connect in time:", err)
}
```

The condition is checked immediately against the current configuration, so the call returns without blocking if the condition is already satisfied.

### WaitForConfig

`WaitForConfig` waits on the known-peer configuration — the set of pre-configured peers that have connected, plus the local node itself.
Use this when you need a quorum of static cluster members to be present before beginning to serve requests.

```go
err := sys.WaitForConfig(ctx, func(cfg gorums.Configuration) bool {
	return cfg.Size() >= quorumSize
})
```

### WaitForClientConfig

`WaitForClientConfig` waits on the client-peer configuration — the set of anonymous clients that have connected dynamically and are reachable for reverse-direction calls.
Use this when a server should not proceed until a minimum number of clients have registered.

```go
err := sys.WaitForClientConfig(ctx, func(cfg gorums.Configuration) bool {
	return cfg.Size() >= expectedClients
})
```

### Return values

| Condition | Return value |
| --------------------------------- | ------------------- |
| `cond` returns `true` | `nil` |
| `ctx` is cancelled or times out | `ctx.Err()` |
| `sys.Stop()` called before `cond` | `gorums.ErrStopped` |

### Relationship to `onChange`

`WaitForConfig` and the `onChange` callback (see [WithConfig onChange Callback](#withconfig-onchange-callback)) serve complementary purposes.
`onChange` is suited for reactive work that must happen synchronously on every configuration change — for example, triggering a leader election or updating an atomic counter.
`WaitForConfig` is suited for startup synchronization — blocking until the cluster reaches a desired state before the application begins normal operation.
Unlike `onChange`, `WaitForConfig` composes naturally with `context.WithTimeout` and `context.WithCancel`.

## Error Handling

Gorums provides structured error types to help you understand and handle failures in quorum calls.
4 changes: 4 additions & 0 deletions errors.go
@@ -22,6 +22,10 @@ var ErrTypeMismatch = stream.ErrTypeMismatch
// This allows the response iterator to account for all nodes without blocking.
var ErrSkipNode = errors.New("skip node")

// ErrStopped is returned by [System.WaitForConfig] and [System.WaitForClientConfig]
// when the system is stopped before the condition is met.
var ErrStopped = errors.New("system stopped")

// QuorumCallError reports on a failed quorum call.
// It provides detailed information about which nodes failed.
type QuorumCallError struct {
7 changes: 6 additions & 1 deletion examples/Makefile
@@ -4,10 +4,15 @@ proto_go := $(proto_src:%.proto=%.pb.go)
gorums_go := $(proto_src:%.proto=%_gorums.pb.go)
binaries := storage/storage

.PHONY: all debug proto download tools $(binaries)
.PHONY: all debug proto download tools start $(binaries)

all: $(binaries)

# Start a three-node local storage cluster for manual testing.
# Starts three servers on localhost:9081-9083. Press Ctrl-C to stop all servers.
start: storage/storage
	@storage/storage -cluster -addrs localhost:9081,localhost:9082,localhost:9083

# Build binaries in debug mode: Tell the Go compiler to compile without
# optimizations (-N) or inlining (-l) so that debuggers, stack traces,
# and variable inspection behave exactly like the source code.
13 changes: 6 additions & 7 deletions examples/storage/client.go
@@ -1,7 +1,7 @@
package main

import (
	"log"
	"fmt"

	"github.com/relab/gorums"
	"github.com/relab/gorums/examples/storage/proto"
@@ -11,15 +11,14 @@ import (

func runClient(addresses []string) error {
	if len(addresses) < 1 {
		log.Fatalln("No addresses provided!")
		return fmt.Errorf("no server addresses provided")
	}
	cfg, err := gorums.NewConfig(gorums.WithNodeList(addresses),
		gorums.WithDialOptions(
			grpc.WithTransportCredentials(insecure.NewCredentials()), // disable TLS
		),
	cfg, err := gorums.NewConfig(
		gorums.WithNodeList(addresses),
		gorums.WithDialOptions(grpc.WithTransportCredentials(insecure.NewCredentials())),
	)
	if err != nil {
		log.Fatal(err)
		return err
	}
	defer cfg.Close()
	return Repl(cfg)
70 changes: 70 additions & 0 deletions examples/storage/cluster.go
@@ -0,0 +1,70 @@
package main

import (
	"fmt"
	"os"
	"os/exec"
	"os/signal"
	"strings"
	"syscall"
)

// runCluster spawns one server subprocess per address in the comma-separated
// addrs string, prints connection instructions, and waits for a signal before
// stopping all subprocesses. The ic string is the raw -interceptors flag value,
// forwarded to each subprocess unchanged.
func runCluster(addrs string, ic string) error {
	all := splitAddrs(addrs)
	if len(all) == 0 {
		return fmt.Errorf("no addresses provided")
	}
	exe, err := os.Executable()
	if err != nil {
		return fmt.Errorf("could not determine executable path: %w", err)
	}

	cmds := make([]*exec.Cmd, len(all))
	for i, addr := range all {
		// Put this server's address first, then the remaining peers.
		nodeAddrs := append([]string{addr}, append(all[:i:i], all[i+1:]...)...)
		nodeArgs := strings.Join(nodeAddrs, ",")
		args := []string{"-serve", "-addrs", nodeArgs}
		if ic != "" {
			args = append(args, "-interceptors", ic)
		}
		cmd := exec.Command(exe, args...)
		cmd.Stdout = os.Stdout
		cmd.Stderr = os.Stderr
		if err := cmd.Start(); err != nil {
			stopAll(cmds[:i])
			return fmt.Errorf("failed to start server %q: %w", addr, err)
		}
		cmds[i] = cmd
	}

	fmt.Printf("\nCluster running on %s.\n", strings.Join(all, ", "))
	fmt.Printf("Connect a client with:\n  %s -addrs %s\n\n", exe, addrs)
	fmt.Println("Press Ctrl-C to stop all servers.")

	signals := make(chan os.Signal, 1)
	signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)
	<-signals

	fmt.Fprintln(os.Stderr, "\nStopping servers...")
	stopAll(cmds)
	return nil
}

// stopAll sends SIGTERM to all running processes and waits for them to exit.
func stopAll(cmds []*exec.Cmd) {
	for _, cmd := range cmds {
		if cmd != nil && cmd.Process != nil {
			_ = cmd.Process.Signal(syscall.SIGTERM)
		}
	}
	for _, cmd := range cmds {
		if cmd != nil {
			_ = cmd.Wait()
		}
	}
}
78 changes: 42 additions & 36 deletions examples/storage/main.go
@@ -2,64 +2,70 @@ package main

import (
	"flag"
	"fmt"
	"log"
	"os"
	"strings"

	"github.com/relab/gorums"
	"github.com/relab/gorums/examples/interceptors"
	pb "github.com/relab/gorums/examples/storage/proto"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

func main() {
	server := flag.String("server", "", "Start as a server on given address.")
	remotes := flag.String("connect", "", "Comma-separated list of servers to connect to.")
	cluster := flag.Bool("cluster", false, "Spawn a server process for each address in -addrs.")
	serve := flag.Bool("serve", false, "Run as a single server node on addrs[0]; the remaining addresses are peers.")
	addrs := flag.String("addrs", "", "Comma-separated list of server addresses.")
	ic := flag.String("interceptors", "", "Comma-separated list of interceptors to enable (logging, nofoo, metadata, delayed).")
	flag.Parse()

	srvOpts := parseInterceptors(*ic)
	if *cluster {
		if *addrs == "" {
			log.Fatal("Usage: storage -cluster -addrs <addr>[,<addr>...]")
		}
		if err := runCluster(*addrs, *ic); err != nil {
			log.Fatal(err)
		}
		return
	}

	if *server != "" {
		runServer(*server, srvOpts)
	all := splitAddrs(*addrs)
	if *serve {
		if len(all) == 0 {
			log.Fatal("Usage: storage -serve -addrs <myaddr>[,<peer>...]")
		}
		if err := runServer(all[0], all, parseInterceptors(*ic)); err != nil {
			log.Fatal(err)
		}
		return
	}

	addrs := strings.Split(*remotes, ",")
	// start local servers if no remote servers were specified
	if len(addrs) == 1 && addrs[0] == "" {
	// NewLocalSystems pre-allocates all listeners and configures each system
	// with WithConfig (node IDs 1..n). Passing dial options auto-creates an
	// outbound Configuration for each system (accessible via sys.OutboundConfig).
		dialOpts := gorums.WithDialOptions(grpc.WithTransportCredentials(insecure.NewCredentials()))
		systems, stop, err := gorums.NewLocalSystems(4, gorums.WithServerOptions(srvOpts), dialOpts)
	// Client mode: connect to the provided addresses, or spin up a local cluster (same process).
	if len(all) == 0 {
		clusterAddrs, stop, err := runLocalCluster(parseInterceptors(*ic))
		if err != nil {
			log.Fatalf("Failed to create local systems: %v", err)
			log.Fatal(err)
		}
		defer stop()

		addrs = make([]string, len(systems))
		for i, sys := range systems {
			addrs[i] = sys.Addr()
		}
		for i, sys := range systems {
			storage := newStorageServer(os.Stderr, fmt.Sprintf("node %d", i))
			sys.RegisterService(nil, func(srv *gorums.Server) {
				pb.RegisterStorageServer(srv, storage)
			})
			go func() {
				if err := sys.Serve(); err != nil {
					log.Printf("Server error: %v", err)
				}
			}()
		}
		all = clusterAddrs
	}
	if err := runClient(all); err != nil {
		log.Fatal(err)
	}
}

	if runClient(addrs) != nil {
		os.Exit(1)
// splitAddrs splits a comma-separated address string, trimming spaces.
// Returns nil if s is empty.
func splitAddrs(s string) []string {
	if s == "" {
		return nil
	}
	var addrs []string
	for addr := range strings.SplitSeq(s, ",") {
		trimmed := strings.TrimSpace(addr)
		if trimmed != "" {
			addrs = append(addrs, trimmed)
		}
	}
	return addrs
}

// parseInterceptors converts a comma-separated interceptor list into server options.