Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package main

import (
"context"
"fmt"
"testing"

"github.com/liftbridge-io/go-liftbridge"
"github.com/stretchr/testify/assert"
)

func (mlbc MockLiftBridgeClient) CreateStream(ctx context.Context, stream liftbridge.StreamInfo) error {
fmt.Println("fake CreateStream() called")
return nil
}

func Test_Client_CreateStream(t *testing.T) {
mlbc := MockLiftBridgeClient{
streamInfo: liftbridge.StreamInfo{
Subject: "foo",
Name: "foo-stream",
ReplicationFactor: 3,
},
}
assert.NoError(t, createStream(mlbc, context.Background(), mlbc.streamInfo))
}
17 changes: 15 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
@@ -1,21 +1,34 @@
package main

import (
"context"
"sync"

liftbridge "github.com/liftbridge-io/go-liftbridge"
)

type EventStreamClient interface {
Pub()
Sub()
CreateStream(context.Context, liftbridge.StreamInfo) error
}

type LiftBridgeClient struct {
wg *sync.WaitGroup
wg *sync.WaitGroup
servers []string
streamInfo liftbridge.StreamInfo
client liftbridge.Client
}

func main() {
lbc := LiftBridgeClient{
wg: &sync.WaitGroup{},
wg: &sync.WaitGroup{},
servers: []string{"localhost:9292", "localhost:9293", "localhost:9294"}, // TODO(simar7): Add cmd line flag option
streamInfo: liftbridge.StreamInfo{
Subject: "foo",
Name: "foo-stream",
ReplicationFactor: 3,
},
}
lbc.wg.Add(2)
go sub(lbc)
Expand Down
4 changes: 3 additions & 1 deletion main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@ import (
"sync"
"testing"

liftbridge "github.com/liftbridge-io/go-liftbridge"
"github.com/stretchr/testify/assert"
)

type MockLiftBridgeClient struct {
wg *sync.WaitGroup
wg *sync.WaitGroup
streamInfo liftbridge.StreamInfo
}

func (mlbc MockLiftBridgeClient) Pub() {
Expand Down
47 changes: 44 additions & 3 deletions sub.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,53 @@
package main

import "fmt"
import (
"context"
"fmt"

liftbridge "github.com/liftbridge-io/go-liftbridge"
lift "github.com/liftbridge-io/go-liftbridge/liftbridge-grpc"
)

func sub(lbcIface EventStreamClient) {
lbcIface.Sub()
}

func (lbc LiftBridgeClient) Sub() {
defer lbc.wg.Done()
fmt.Println("real Sub() called")

// TODO(simar7): Add exponential backoff rather than panic
client, err := liftbridge.Connect(lbc.servers)
if err != nil {
panic(err)
}
lbc.client = client
defer lbc.client.Close()

if err := lbc.client.CreateStream(context.Background(), lbc.streamInfo); err != nil {
if err != liftbridge.ErrStreamExists {
panic(err)
}
}

ctx := context.Background()
if err := client.Subscribe(ctx, lbc.streamInfo.Subject, lbc.streamInfo.Name, func(msg *lift.Message, err error) {
if err != nil {
panic(err)
}
fmt.Println(msg.Offset, string(msg.Value))
}); err != nil {
panic(err)
}

<-ctx.Done()
}

func sub(lbcIface EventStreamClient) {
lbcIface.Sub()
func createStream(lbcIface EventStreamClient, ctx context.Context, stream liftbridge.StreamInfo) error {
return lbcIface.CreateStream(ctx, stream)
}

func (lbc LiftBridgeClient) CreateStream(ctx context.Context, stream liftbridge.StreamInfo) error {
fmt.Println("real createStream() called")
return nil
}