Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 17 additions & 7 deletions internal/provider/connectors/amqp091/amqp091.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ type BrokerDetails struct {
consumed int64
produced int64
clientDisconnect bool
lastPubSubEvent time.Time
lastPubSubEvent int64
tlsConfig *tls.Config
tlsEnabled bool
shutdownChan chan bool
Expand Down Expand Up @@ -460,7 +460,7 @@ func (prov *amqp091provider) Connect(ctx context.Context, cf *pb.ConnectionConfi
consumed: 0,
ActiveStreams: 0,
clientDisconnect: false,
lastPubSubEvent: time.Now(),
lastPubSubEvent: time.Now().UnixNano(),
shutdownChan: make(chan bool, 1),
pubChannelCtx: pubChCtx,
pubChannelCancel: pubChCancel,
Expand Down Expand Up @@ -606,7 +606,7 @@ func (bd *BrokerDetails) bindingKnown(name string) bool {
}

func (bd *BrokerDetails) updateLastPubSubEvent() {
bd.lastPubSubEvent = time.Now()
atomic.StoreInt64(&bd.lastPubSubEvent, time.Now().UnixNano())
}

func (bd *BrokerDetails) incrementStreamCount() {
Expand Down Expand Up @@ -1295,7 +1295,17 @@ func (prov *amqp091provider) streamSubscribe(ctx context.Context, bd *BrokerDeta
atomic.AddInt64(&bd.consumed, 1)
}

consumer, _ := bd.StreamConnection.NewConsumer(source.GetName(), consumerName, offset, handleMessages, source.GetSingleActiveConsumer())
// NewConsumer can return a non-nil error (e.g. "Invalid Offset" when offset
// is the default) while still handing back a usable consumer, so gate on
// consumer == nil rather than on the error.
consumer, consErr := bd.StreamConnection.NewConsumer(source.GetName(), consumerName, offset, handleMessages, source.GetSingleActiveConsumer())
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we can look at the error from NewConsumer because we get an 'Invalid Offset' error whenever we use 0(the default). We should check consumer for nil though.

if consumer == nil {
msg := "failed to create stream consumer"
if consErr != nil {
msg = consErr.Error()
}
return &pb.Error{Message: msg}
}
bd.incrementStreamCount()
defer bd.decrementStreamCount()
<-ctx.Done()
Expand Down Expand Up @@ -1988,9 +1998,9 @@ func (prov *amqp091provider) Stats() *provider.Stats {
conn := connRaw.(*BrokerDetails)
clientStat.ID = conn.ClientIdentifier
clientStat.ActiveMessages = conn.activeMessages.Length()
clientStat.Streams = int(conn.ActiveStreams)
clientStat.Produced = int(conn.produced)
clientStat.Consumed = int(conn.consumed)
clientStat.Streams = int(atomic.LoadInt64(&conn.ActiveStreams))
clientStat.Produced = int(atomic.LoadInt64(&conn.produced))
clientStat.Consumed = int(atomic.LoadInt64(&conn.consumed))
stats.Clients = append(stats.Clients, clientStat)
}
return stats
Expand Down
8 changes: 5 additions & 3 deletions internal/provider/connectors/amqp091/cleaner.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package amqp091

import (
"context"
"sync/atomic"
"time"

"github.com/sassoftware/arke/internal/provider"
Expand Down Expand Up @@ -32,9 +33,10 @@ func connectionCleaner(ctx context.Context) {
for _, connID := range prov.connections.GetList() {
if conn, ok := prov.connections.Get(connID); ok {
bd := conn.(*BrokerDetails)
util.Logger.Tracef("Client %v has %d open streams", connID, bd.ActiveStreams)
lastKnown := time.Since(bd.lastPubSubEvent)
if bd.ActiveStreams < 1 && lastKnown > cleanInterval {
activeStreams := atomic.LoadInt64(&bd.ActiveStreams)
util.Logger.Tracef("Client %v has %d open streams", connID, activeStreams)
lastKnown := time.Since(time.Unix(0, atomic.LoadInt64(&bd.lastPubSubEvent)))
if activeStreams < 1 && lastKnown > cleanInterval {
util.Logger.Debugf("Client %v has had no streams open for %v. Assuming dead. Disconnecting.", connID, lastKnown)
prov.disconnectClientByIdentifier(connID)
}
Expand Down
2 changes: 1 addition & 1 deletion internal/provider/connectors/amqp091/cleaner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func TestConnectionCleaner_CleansInactiveConnections(t *testing.T) {
prov := provy.(*amqp091provider)
prov.connections.Add("test-conn", &BrokerDetails{
ClientIdentifier: "test-conn",
lastPubSubEvent: time.Now().Add(-3 * time.Second), // Simulate inactivity
lastPubSubEvent: time.Now().Add(-3 * time.Second).UnixNano(), // Simulate inactivity
shutdownChan: shutdownChan,
pubChannelCancel: cancel,
})
Expand Down
120 changes: 120 additions & 0 deletions internal/provider/connectors/amqp091/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -745,6 +745,126 @@ func Test_streamSubscribe(t *testing.T) {
}
}

// Test_streamSubscribe_NewConsumerResult covers what streamSubscribe does with
// each (consumer, err) combination returned by StreamConnection.NewConsumer.
// The upstream rabbitmq-stream-go-client returns a non-nil error (e.g.
// "Invalid Offset") in some cases where the consumer it hands back is still
// usable, so the gate must be nil-on-consumer rather than err != nil.
func Test_streamSubscribe_NewConsumerResult(t *testing.T) {
prov := NewAMQP091Provider().(*amqp091provider)

origNewAmqpConn091 := NewAmqpConn091
origNewStreamConn := NewStreamConn
origGetClientIdentifier := GetClientIdentifier
GetClientIdentifier = func(context.Context) (string, error) {
return "1234", nil
}
defer func() {
GetClientIdentifier = origGetClientIdentifier
NewAmqpConn091 = origNewAmqpConn091
NewStreamConn = origNewStreamConn
}()

address := stockAddress()
address.Type = pb.Address_STREAM

subTests := []struct {
name string
consumerNil bool
consumerErr error
wantErr bool
wantErrMsg string
}{
{
name: "non-nil consumer with error succeeds",
consumerNil: false,
consumerErr: errors.New("Invalid Offset"),
wantErr: false,
},
{
name: "nil consumer with error surfaces the error",
consumerNil: true,
consumerErr: errors.New("connection refused"),
wantErr: true,
wantErrMsg: "connection refused",
},
{
name: "nil consumer with no error fails",
consumerNil: true,
consumerErr: nil,
wantErr: true,
wantErrMsg: "failed to create stream consumer",
},
}

for _, subt := range subTests {
t.Run(subt.name, func(t *testing.T) {
src := &pb.Source{
Name: "srcname",
Address: address,
Options: map[string]string{"Offset": "0"},
Type: pb.Source_STREAM,
PrefetchCount: 4,
}

bd := &BrokerDetails{}
bd.activeMessages = util.NewConcurrentMap()
ctx, cancel := context.WithCancel(t.Context())

mc := make(chan *pb.Message, 1)
go func() {
for range mc {
}
}()
defer close(mc)

errs := make(chan amqp091Error)

amock := &amqpConnectionMock{}
amock.On("Connect").Return(nil)
amock.On("IsClosed").Return(false)
amock.On("Close").Return(nil)
amock.On("NotifyClose").Return(errs)
NewAmqpConn091 = func(string, string, *tls.Config) amqp091ConnectionShim {
return amock
}

smock := &streamConnectionMock{}
smock.On("Connect").Return(nil)
smock.On("IsClosed").Return(false)
smock.On("DeclareStream").Return(nil)

if subt.consumerNil {
smock.On("NewConsumer", src.GetName(), src.GetName(), "0", mock.Anything, mock.AnythingOfType("bool")).Return(nil, subt.consumerErr)
} else {
pmock := &streamConsumerMock{}
pmock.On("Close").Return(nil)
smock.On("NewConsumer", src.GetName(), src.GetName(), "0", mock.Anything, mock.AnythingOfType("bool")).Return(pmock, subt.consumerErr)
}

NewStreamConn = func(string, string, *tls.Config) streamConnectionShim {
return smock
}

go func() {
time.Sleep(500 * time.Millisecond)
cancel()
}()

err := prov.streamSubscribe(ctx, bd, src, mc)

if subt.wantErr {
assert.NotNil(t, err)
if err != nil {
assert.Contains(t, err.GetMessage(), subt.wantErrMsg)
}
} else {
assert.Nil(t, err)
}
})
}
}

func Test_SubscribeStreamAutoDeleteOrExclusive(t *testing.T) {
prov := NewAMQP091Provider()

Expand Down