Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions async.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,27 +31,27 @@ func (f *Async[Resp]) Done() bool {
// AsyncMajority returns an Async future that resolves when a majority quorum is reached.
// Messages are sent immediately (synchronously) to preserve ordering when multiple
// async calls are created in sequence.
func (r *Responses[Resp]) AsyncMajority() *Async[Resp] {
func (r *Responses[T, Resp]) AsyncMajority() *Async[Resp] {
quorumSize := r.size/2 + 1
return r.AsyncThreshold(quorumSize)
}

// AsyncFirst returns an Async future that resolves when the first response is received.
// Messages are sent immediately (synchronously) to preserve ordering.
func (r *Responses[Resp]) AsyncFirst() *Async[Resp] {
func (r *Responses[T, Resp]) AsyncFirst() *Async[Resp] {
return r.AsyncThreshold(1)
}

// AsyncAll returns an Async future that resolves when all nodes have responded.
// Messages are sent immediately (synchronously) to preserve ordering.
func (r *Responses[Resp]) AsyncAll() *Async[Resp] {
func (r *Responses[T, Resp]) AsyncAll() *Async[Resp] {
return r.AsyncThreshold(r.size)
}

// AsyncThreshold returns an Async future that resolves when the threshold is reached.
// Messages are sent immediately (synchronously) to preserve ordering when multiple
// async calls are created in sequence.
func (r *Responses[Resp]) AsyncThreshold(threshold int) *Async[Resp] {
func (r *Responses[T, Resp]) AsyncThreshold(threshold int) *Async[Resp] {
// Send messages synchronously before spawning the goroutine to preserve ordering
r.sendNow()

Expand Down
10 changes: 5 additions & 5 deletions async_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (

func TestAsync(t *testing.T) {
// a type alias short hand for the responses type
type respType = *gorums.Responses[*pb.StringValue]
type respType = *gorums.Responses[uint32, *pb.StringValue]
tests := []struct {
name string
call func(respType) *gorums.Async[*pb.StringValue]
Expand Down Expand Up @@ -45,7 +45,7 @@ func TestAsync(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
config := gorums.TestConfiguration(t, tt.numNodes, gorums.EchoServerFn)
ctx := gorums.TestContext(t, 2*time.Second)
responses := gorums.QuorumCall[*pb.StringValue, *pb.StringValue](
responses := gorums.QuorumCall[uint32, *pb.StringValue, *pb.StringValue](
config.Context(ctx),
pb.String("test"),
mock.TestMethod,
Expand All @@ -71,7 +71,7 @@ func TestAsync_Error(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
cancel() // Cancel immediately

responses := gorums.QuorumCall[*pb.StringValue, *pb.StringValue](
responses := gorums.QuorumCall[uint32, *pb.StringValue, *pb.StringValue](
config.Context(ctx),
pb.String("test"),
mock.TestMethod,
Expand All @@ -92,7 +92,7 @@ func BenchmarkAsyncQuorumCall(b *testing.B) {
b.Run(fmt.Sprintf("AsyncMajority/%d", numNodes), func(b *testing.B) {
b.ReportAllocs()
for b.Loop() {
future := gorums.QuorumCall[*pb.StringValue, *pb.StringValue](
future := gorums.QuorumCall[uint32, *pb.StringValue, *pb.StringValue](
cfgCtx,
pb.String("test"),
mock.TestMethod,
Expand All @@ -108,7 +108,7 @@ func BenchmarkAsyncQuorumCall(b *testing.B) {
b.Run(fmt.Sprintf("BlockingMajority/%d", numNodes), func(b *testing.B) {
b.ReportAllocs()
for b.Loop() {
_, err := gorums.QuorumCall[*pb.StringValue, *pb.StringValue](
_, err := gorums.QuorumCall[uint32, *pb.StringValue, *pb.StringValue](
cfgCtx,
pb.String("test"),
mock.TestMethod,
Expand Down
10 changes: 5 additions & 5 deletions benchmark/benchmark.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ type Bench struct {

type (
benchFunc func(Options) (*Result, error)
qcFunc func(*gorums.ConfigContext, *Echo, int, ...gorums.CallOption) (*Echo, error)
asyncQCFunc func(*gorums.ConfigContext, *Echo, int, ...gorums.CallOption) AsyncEcho
qcFunc func(*ConfigContext, *Echo, int, ...gorums.CallOption) (*Echo, error)
asyncQCFunc func(*ConfigContext, *Echo, int, ...gorums.CallOption) AsyncEcho
serverFunc func(context.Context, *TimedMsg)
)

Expand Down Expand Up @@ -233,7 +233,7 @@ func GetBenchmarks(config Configuration) []Bench {
Name: "QuorumCall",
Description: "NodeStream based quorum call implementation with FIFO ordering",
runBench: func(opts Options) (*Result, error) {
return runQCBenchmark(opts, config, func(ctx *gorums.ConfigContext, in *Echo, quorumSize int, callOpts ...gorums.CallOption) (*Echo, error) {
return runQCBenchmark(opts, config, func(ctx *ConfigContext, in *Echo, quorumSize int, callOpts ...gorums.CallOption) (*Echo, error) {
return QuorumCall(ctx, in, callOpts...).Threshold(quorumSize)
})
},
Expand All @@ -242,7 +242,7 @@ func GetBenchmarks(config Configuration) []Bench {
Name: "AsyncQuorumCall",
Description: "NodeStream based async quorum call implementation with FIFO ordering",
runBench: func(opts Options) (*Result, error) {
return runAsyncQCBenchmark(opts, config, func(ctx *gorums.ConfigContext, in *Echo, quorumSize int, callOpts ...gorums.CallOption) AsyncEcho {
return runAsyncQCBenchmark(opts, config, func(ctx *ConfigContext, in *Echo, quorumSize int, callOpts ...gorums.CallOption) AsyncEcho {
return QuorumCall(ctx, in, callOpts...).AsyncThreshold(quorumSize)
})
},
Expand All @@ -251,7 +251,7 @@ func GetBenchmarks(config Configuration) []Bench {
Name: "SlowServer",
Description: "Quorum Call with a 10s processing time on the server",
runBench: func(opts Options) (*Result, error) {
return runQCBenchmark(opts, config, func(ctx *gorums.ConfigContext, in *Echo, quorumSize int, callOpts ...gorums.CallOption) (*Echo, error) {
return runQCBenchmark(opts, config, func(ctx *ConfigContext, in *Echo, quorumSize int, callOpts ...gorums.CallOption) (*Echo, error) {
return SlowServer(ctx, in, callOpts...).Threshold(quorumSize)
})
},
Expand Down
2 changes: 1 addition & 1 deletion benchmark/benchmark.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

47 changes: 27 additions & 20 deletions benchmark/benchmark_gorums.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions callopts.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package gorums

import (
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/runtime/protoimpl"
)

Expand Down Expand Up @@ -54,7 +53,7 @@ func IgnoreErrors() CallOption {
// resp, err := ReadQC(ctx, req,
// gorums.Interceptors(loggingInterceptor, filterInterceptor),
// ).Majority()
func Interceptors[Req, Resp proto.Message](interceptors ...QuorumInterceptor[Req, Resp]) CallOption {
func Interceptors[T NodeID, Req, Resp msg](interceptors ...QuorumInterceptor[T, Req, Resp]) CallOption {
return func(o *callOptions) {
for _, interceptor := range interceptors {
o.interceptors = append(o.interceptors, interceptor)
Expand Down
4 changes: 3 additions & 1 deletion callopts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@ func TestCallOptionsMustWaitSendDone(t *testing.T) {
}

func BenchmarkGetCallOptions(b *testing.B) {
interceptor := func(_ *ClientCtx[msg, msg], next ResponseSeq[msg]) ResponseSeq[msg] { return next }
interceptor := func(_ *ClientCtx[uint32, msg, msg], next ResponseSeq[uint32, msg]) ResponseSeq[uint32, msg] {
return next
}
tests := []struct {
numOpts int
}{
Expand Down
Loading
Loading