From 94e4b374480f25949fd400ac215f41b89fae249f Mon Sep 17 00:00:00 2001 From: Dane Hunt Date: Wed, 6 May 2026 16:41:33 -0400 Subject: [PATCH 1/2] feat: do not create exchanges and bindings for streams Signed-off-by: Dane Hunt --- .../provider/connectors/amqp091/amqp091.go | 12 +- .../connectors/amqp091/amqp091_test.go | 144 ++++++++++++++++++ 2 files changed, 154 insertions(+), 2 deletions(-) diff --git a/internal/provider/connectors/amqp091/amqp091.go b/internal/provider/connectors/amqp091/amqp091.go index d6476f0..9383848 100644 --- a/internal/provider/connectors/amqp091/amqp091.go +++ b/internal/provider/connectors/amqp091/amqp091.go @@ -626,6 +626,11 @@ func (bd *BrokerDetails) decrementStreamCount() { } func (prov *amqp091provider) declareExchange(address *pb.Address, bd *BrokerDetails, amqpChannel amqp091ChannelShim) error { + // Streams manage their own topology via the stream protocol; skip legacy AMQP exchange declaration. + if address.GetType() == pb.Address_STREAM { + return nil + } + // don't try to declare an exchange with amq. in the name if strings.Contains(address.GetName(), "amq.") { return nil @@ -946,8 +951,11 @@ func (prov *amqp091provider) declareBinding(source *pb.Source, bd *BrokerDetails } } - removed := bd.cleanupBindings(source, subjects) - util.Logger.Tracef("removed %d bindings from %s", len(removed), source.GetName()) + // Streams manage their own topology; skip AMQP binding reconciliation. + if source.GetAddress().GetType() != pb.Address_STREAM { + removed := bd.cleanupBindings(source, subjects) + util.Logger.Tracef("removed %d bindings from %s", len(removed), source.GetName()) + } bd.knownBindings.Add(knownBindingKey, true) return nil diff --git a/internal/provider/connectors/amqp091/amqp091_test.go b/internal/provider/connectors/amqp091/amqp091_test.go index 4e80ca1..daa2912 100644 --- a/internal/provider/connectors/amqp091/amqp091_test.go +++ b/internal/provider/connectors/amqp091/amqp091_test.go @@ -34,6 +34,8 @@ const testDeadLetterAddress = "dla" const testContentTypeJSON = "application/json" const testContentEncodingText = "text" const testXMatchAny = "any" +const testAddressName = "testAddress" +const testSubject = "testSubject" func init() { // Register the MockProvider with the Provider factory. @@ -2462,6 +2464,148 @@ func Test_cleanupBindings_none(t *testing.T) { assert.Len(t, removed, 0) } +// Test_declareExchange verifies that declareExchange calls ExchangeDeclare on +// the AMQP channel for non-STREAM addresses and skips it for STREAM addresses. +func Test_declareExchange(t *testing.T) { + tests := map[string]struct { + addressType pb.Address_TargetType + expectedDeclareCount int + }{ + "stream skips declaration": { + addressType: pb.Address_STREAM, + expectedDeclareCount: 0, + }, + "topic declares exchange": { + addressType: pb.Address_TOPIC, + expectedDeclareCount: 1, + }, + "queue declares exchange": { + addressType: pb.Address_QUEUE, + expectedDeclareCount: 1, + }, + "filter declares exchange": { + addressType: pb.Address_FILTER, + expectedDeclareCount: 1, + }, + } + + for name, tc := range tests { + t.Run(name, func(t *testing.T) { + addr := &pb.Address{ + Name: testAddressName, + Type: tc.addressType, + Subjects: []string{"#"}, + } + bd := &BrokerDetails{knownExchanges: util.NewConcurrentMap()} + cmock := &amqpChannelMock{} + // Only mock ExchangeDeclare if the address type is not stream + if addr.GetType() != pb.Address_STREAM { + addressTypeStr, err := addressTypeToAmqpType(addr.GetType()) + assert.Nil(t, err) + cmock.On("ExchangeDeclare", testAddressName, addressTypeStr, false).Return(nil).Once() + } + + prov := NewAMQP091Provider().(*amqp091provider) + err := prov.declareExchange(addr, bd, cmock) + assert.Nil(t, err) + cmock.AssertNumberOfCalls(t, "ExchangeDeclare", tc.expectedDeclareCount) + cmock.AssertExpectations(t) + }) + } +} + +// trackingManagementServer returns a test HTTP server that records whether +// any DELETE request was received. +func trackingManagementServer(t *testing.T) (*httptest.Server, *int) { + t.Helper() + count := 0 + deleteCount := &count + bindingBody := []byte(`[{"source":"arke.test","vhost":"tenant","destination":"queue","destination_type":"queue","routing_key":"routingkey","arguments":{},"properties_key":"routingkey"}]`) + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch r.Method { + case http.MethodGet: + w.WriteHeader(http.StatusOK) + _, err := w.Write(bindingBody) + assert.Nil(t, err) + case http.MethodDelete: + *deleteCount++ + w.WriteHeader(http.StatusNoContent) + default: + w.WriteHeader(http.StatusMethodNotAllowed) + } + })) + return server, deleteCount +} + +// setupDeclareBindingBD creates a minimal BrokerDetails wired to the provided +// management server URL. +func setupDeclareBindingBD(t *testing.T, msrv *httptest.Server) *BrokerDetails { + t.Helper() + u, err := url.Parse(msrv.URL) + assert.Nil(t, err) + port, _ := strconv.Atoi(u.Port()) + return &BrokerDetails{ + knownBindings: util.NewConcurrentMap(), + connectionConfig: &pb.ConnectionConfiguration{ + Host: u.Hostname(), + Tenant: testTenant, + AdminPort: int32(port), //nolint:gosec + Credentials: &pb.Credentials{Username: "user", Password: "password"}, + }, + } +} + +// Test_declareBinding_Reconciliation verifies that declareBinding calls +// cleanupBindings (issues management DELETE requests) for non-STREAM addresses +// and skips it entirely for STREAM addresses. +func Test_declareBinding_Reconciliation(t *testing.T) { + tests := map[string]struct { + addressType pb.Address_TargetType + expectDelete bool + }{ + "stream skips cleanup": { + addressType: pb.Address_STREAM, + expectDelete: false, + }, + "topic reconciles stale bindings": { + addressType: pb.Address_TOPIC, + expectDelete: true, + }, + "queue reconciles stale bindings": { + addressType: pb.Address_QUEUE, + expectDelete: true, + }, + "filter reconciles stale bindings": { + addressType: pb.Address_FILTER, + expectDelete: true, + }, + } + + for name, tc := range tests { + t.Run(name, func(t *testing.T) { + addr := &pb.Address{ + Name: testAddressName, + Type: tc.addressType, + Subjects: []string{testSubject}, + } + msrv, deleteCount := trackingManagementServer(t) + defer msrv.Close() + + bd := setupDeclareBindingBD(t, msrv) + src := &pb.Source{Name: addr.Name, Address: addr} + + cmock := &amqpChannelMock{} + cmock.On("QueueBind", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil) + + prov := NewAMQP091Provider().(*amqp091provider) + err := prov.declareBinding(src, bd, cmock, false) + assert.Nil(t, err) + assert.Equal(t, tc.expectDelete, *deleteCount > 0) + cmock.AssertExpectations(t) + }) + } +} + func Test_declareQueueAutoDelete(t *testing.T) { var autoDeleteTests = []struct { autoDelete bool From 440b4e4883014046be91989865af45f1897f99be Mon Sep 17 00:00:00 2001 From: Dane Hunt Date: Thu, 7 May 2026 13:33:05 -0400 Subject: [PATCH 2/2] feat: create exchanges and bindings for streams with non stream sources Signed-off-by: Dane Hunt --- .../provider/connectors/amqp091/amqp091.go | 32 ++-- .../connectors/amqp091/amqp091_test.go | 144 ---------------- .../connectors/amqp091/stream_test.go | 159 +++++++++++++++++- 3 files changed, 180 insertions(+), 155 deletions(-) diff --git a/internal/provider/connectors/amqp091/amqp091.go b/internal/provider/connectors/amqp091/amqp091.go index 9383848..0c5e211 100644 --- a/internal/provider/connectors/amqp091/amqp091.go +++ b/internal/provider/connectors/amqp091/amqp091.go @@ -626,11 +626,6 @@ func (bd *BrokerDetails) decrementStreamCount() { } func (prov *amqp091provider) declareExchange(address *pb.Address, bd *BrokerDetails, amqpChannel amqp091ChannelShim) error { - // Streams manage their own topology via the stream protocol; skip legacy AMQP exchange declaration. - if address.GetType() == pb.Address_STREAM { - return nil - } - // don't try to declare an exchange with amq. in the name if strings.Contains(address.GetName(), "amq.") { return nil @@ -951,11 +946,8 @@ func (prov *amqp091provider) declareBinding(source *pb.Source, bd *BrokerDetails } } - // Streams manage their own topology; skip AMQP binding reconciliation. - if source.GetAddress().GetType() != pb.Address_STREAM { - removed := bd.cleanupBindings(source, subjects) - util.Logger.Tracef("removed %d bindings from %s", len(removed), source.GetName()) - } + removed := bd.cleanupBindings(source, subjects) + util.Logger.Tracef("removed %d bindings from %s", len(removed), source.GetName()) bd.knownBindings.Add(knownBindingKey, true) return nil @@ -1217,11 +1209,31 @@ func (prov *amqp091provider) streamSubscribe(ctx context.Context, bd *BrokerDeta ttl = val } + amqpChannel, err := bd.Connection.NewChannel(false) + if err != nil { + return &pb.Error{Message: err.Error()} + } + defer amqpChannel.Close() + + if source.GetAddress().GetType() != pb.Address_STREAM { + err := prov.declareExchange(source.GetAddress(), bd, amqpChannel) + if err != nil { + util.Logger.Debugf("Failed to declare exchange for source %s: %v", source.GetName(), err) + } + } + dErr := bd.StreamConnection.DeclareStream(source.GetName(), ttl) if dErr != nil { return &pb.Error{IsFatal: true, Message: fmt.Sprintf("failed to declare stream: %s", dErr.Error())} } + if source.GetAddress().GetType() != pb.Address_STREAM { + err := prov.declareBinding(source, bd, amqpChannel, true) + if err != nil { + util.Logger.Debugf("Failed to declare binding for source %s: %s", source.GetName(), err.Error()) + } + } + if source.GetDeclareOnly() { // if we reach here, everything has succeeded and we should return from Consume if source.DeclareOnly = true return nil diff --git a/internal/provider/connectors/amqp091/amqp091_test.go b/internal/provider/connectors/amqp091/amqp091_test.go index daa2912..4e80ca1 100644 --- a/internal/provider/connectors/amqp091/amqp091_test.go +++ b/internal/provider/connectors/amqp091/amqp091_test.go @@ -34,8 +34,6 @@ const testDeadLetterAddress = "dla" const testContentTypeJSON = "application/json" const testContentEncodingText = "text" const testXMatchAny = "any" -const testAddressName = "testAddress" -const testSubject = "testSubject" func init() { // Register the MockProvider with the Provider factory. @@ -2464,148 +2462,6 @@ func Test_cleanupBindings_none(t *testing.T) { assert.Len(t, removed, 0) } -// Test_declareExchange verifies that declareExchange calls ExchangeDeclare on -// the AMQP channel for non-STREAM addresses and skips it for STREAM addresses. -func Test_declareExchange(t *testing.T) { - tests := map[string]struct { - addressType pb.Address_TargetType - expectedDeclareCount int - }{ - "stream skips declaration": { - addressType: pb.Address_STREAM, - expectedDeclareCount: 0, - }, - "topic declares exchange": { - addressType: pb.Address_TOPIC, - expectedDeclareCount: 1, - }, - "queue declares exchange": { - addressType: pb.Address_QUEUE, - expectedDeclareCount: 1, - }, - "filter declares exchange": { - addressType: pb.Address_FILTER, - expectedDeclareCount: 1, - }, - } - - for name, tc := range tests { - t.Run(name, func(t *testing.T) { - addr := &pb.Address{ - Name: testAddressName, - Type: tc.addressType, - Subjects: []string{"#"}, - } - bd := &BrokerDetails{knownExchanges: util.NewConcurrentMap()} - cmock := &amqpChannelMock{} - // Only mock ExchangeDeclare if the address type is not stream - if addr.GetType() != pb.Address_STREAM { - addressTypeStr, err := addressTypeToAmqpType(addr.GetType()) - assert.Nil(t, err) - cmock.On("ExchangeDeclare", testAddressName, addressTypeStr, false).Return(nil).Once() - } - - prov := NewAMQP091Provider().(*amqp091provider) - err := prov.declareExchange(addr, bd, cmock) - assert.Nil(t, err) - cmock.AssertNumberOfCalls(t, "ExchangeDeclare", tc.expectedDeclareCount) - cmock.AssertExpectations(t) - }) - } -} - -// trackingManagementServer returns a test HTTP server that records whether -// any DELETE request was received. -func trackingManagementServer(t *testing.T) (*httptest.Server, *int) { - t.Helper() - count := 0 - deleteCount := &count - bindingBody := []byte(`[{"source":"arke.test","vhost":"tenant","destination":"queue","destination_type":"queue","routing_key":"routingkey","arguments":{},"properties_key":"routingkey"}]`) - server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - switch r.Method { - case http.MethodGet: - w.WriteHeader(http.StatusOK) - _, err := w.Write(bindingBody) - assert.Nil(t, err) - case http.MethodDelete: - *deleteCount++ - w.WriteHeader(http.StatusNoContent) - default: - w.WriteHeader(http.StatusMethodNotAllowed) - } - })) - return server, deleteCount -} - -// setupDeclareBindingBD creates a minimal BrokerDetails wired to the provided -// management server URL. -func setupDeclareBindingBD(t *testing.T, msrv *httptest.Server) *BrokerDetails { - t.Helper() - u, err := url.Parse(msrv.URL) - assert.Nil(t, err) - port, _ := strconv.Atoi(u.Port()) - return &BrokerDetails{ - knownBindings: util.NewConcurrentMap(), - connectionConfig: &pb.ConnectionConfiguration{ - Host: u.Hostname(), - Tenant: testTenant, - AdminPort: int32(port), //nolint:gosec - Credentials: &pb.Credentials{Username: "user", Password: "password"}, - }, - } -} - -// Test_declareBinding_Reconciliation verifies that declareBinding calls -// cleanupBindings (issues management DELETE requests) for non-STREAM addresses -// and skips it entirely for STREAM addresses. -func Test_declareBinding_Reconciliation(t *testing.T) { - tests := map[string]struct { - addressType pb.Address_TargetType - expectDelete bool - }{ - "stream skips cleanup": { - addressType: pb.Address_STREAM, - expectDelete: false, - }, - "topic reconciles stale bindings": { - addressType: pb.Address_TOPIC, - expectDelete: true, - }, - "queue reconciles stale bindings": { - addressType: pb.Address_QUEUE, - expectDelete: true, - }, - "filter reconciles stale bindings": { - addressType: pb.Address_FILTER, - expectDelete: true, - }, - } - - for name, tc := range tests { - t.Run(name, func(t *testing.T) { - addr := &pb.Address{ - Name: testAddressName, - Type: tc.addressType, - Subjects: []string{testSubject}, - } - msrv, deleteCount := trackingManagementServer(t) - defer msrv.Close() - - bd := setupDeclareBindingBD(t, msrv) - src := &pb.Source{Name: addr.Name, Address: addr} - - cmock := &amqpChannelMock{} - cmock.On("QueueBind", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil) - - prov := NewAMQP091Provider().(*amqp091provider) - err := prov.declareBinding(src, bd, cmock, false) - assert.Nil(t, err) - assert.Equal(t, tc.expectDelete, *deleteCount > 0) - cmock.AssertExpectations(t) - }) - } -} - func Test_declareQueueAutoDelete(t *testing.T) { var autoDeleteTests = []struct { autoDelete bool diff --git a/internal/provider/connectors/amqp091/stream_test.go b/internal/provider/connectors/amqp091/stream_test.go index 0d1abc6..49b71b1 100644 --- a/internal/provider/connectors/amqp091/stream_test.go +++ b/internal/provider/connectors/amqp091/stream_test.go @@ -8,7 +8,9 @@ import ( "crypto/tls" "errors" "fmt" + "net/url" "reflect" + "strconv" "sync" "testing" "time" @@ -533,12 +535,14 @@ func Test_SubscribeStream(t *testing.T) { pmock := &streamConsumerMock{} pmock.On("Close").Return(nil) smock.On("NewConsumer", src.GetName(), src.GetName(), src.GetOptions()["Offset"], mock.Anything, mock.AnythingOfType("bool")).Return(pmock, nil) + smock.On("StoreOffset", src.GetName(), src.GetName(), mock.Anything).Return(nil) oldNewStreamConn := NewStreamConn NewStreamConn = func(string, string, *tls.Config) streamConnectionShim { return smock } cmock := &amqpChannelMock{} + cmock.On("Close").Return(nil) amock := &amqpConnectionMock{} amock.On("Connect").Return(nil) @@ -547,6 +551,7 @@ func Test_SubscribeStream(t *testing.T) { errs := make(chan amqp091Error) amock.On("NotifyClose").Return(errs) + amock.On("NewChannel", false).Return(cmock, nil) oldNewAmqpConn091 := NewAmqpConn091 NewAmqpConn091 = func(string, string, *tls.Config) amqp091ConnectionShim { return amock @@ -585,10 +590,13 @@ func Test_SubscribeStream(t *testing.T) { cancel() time.Sleep(1000 * time.Millisecond) + // address type is STREAM: exchange and binding must NOT be declared + cmock.AssertNumberOfCalls(t, "ExchangeDeclare", 0) + cmock.AssertNumberOfCalls(t, "QueueBind", 0) + cmock.AssertExpectations(t) amock.AssertExpectations(t) pmock.AssertExpectations(t) smock.AssertExpectations(t) - cmock.AssertExpectations(t) } func Test_SubscribeStreamBadOpt(t *testing.T) { @@ -716,6 +724,11 @@ func Test_streamSubscribe(t *testing.T) { return amock } + cmock := &amqpChannelMock{} + cmock.On("Close").Return(nil) + amock.On("NewChannel", false).Return(cmock, nil) + bd.Connection = amock + pmock := &streamConsumerMock{} pmock.On("Close").Return(nil) @@ -1023,6 +1036,7 @@ func Test_SubscribeStreamFailedDeclare(t *testing.T) { } cmock := &amqpChannelMock{} + cmock.On("Close").Return(nil) amock := &amqpConnectionMock{} amock.On("Connect").Return(nil) @@ -1031,6 +1045,7 @@ func Test_SubscribeStreamFailedDeclare(t *testing.T) { errs := make(chan amqp091Error) amock.On("NotifyClose").Return(errs) + amock.On("NewChannel", false).Return(cmock, nil) oldNewAmqpConn091 := NewAmqpConn091 NewAmqpConn091 = func(string, string, *tls.Config) amqp091ConnectionShim { return amock @@ -1102,6 +1117,7 @@ func Test_StreamRetry(t *testing.T) { } cmock := &amqpChannelMock{} + cmock.On("Close").Return(nil) amock := &amqpConnectionMock{} amock.On("Connect").Return(nil) @@ -1110,6 +1126,7 @@ func Test_StreamRetry(t *testing.T) { errs := make(chan amqp091Error) amock.On("NotifyClose").Return(errs) + amock.On("NewChannel", false).Return(cmock, nil) oldNewAmqpConn091 := NewAmqpConn091 NewAmqpConn091 = func(string, string, *tls.Config) amqp091ConnectionShim { return amock @@ -1199,6 +1216,10 @@ func Test_Subscribe_Stream_DeclareOnly(t *testing.T) { return amock } + cmock := &amqpChannelMock{} + cmock.On("Close").Return(nil) + amock.On("NewChannel", false).Return(cmock, nil) + defer func() { NewStreamConn = oldNewStreamConn }() @@ -1223,3 +1244,139 @@ func Test_Subscribe_Stream_DeclareOnly(t *testing.T) { }) } } + +// Test_streamSubscribe_ExchangeAndBindingDeclaration verifies that +// when subscribing to a Source_STREAM source, an exchange and +// binding are declared on the AMQP channel if (and only if) the address type +// is NOT Address_STREAM. +func Test_streamSubscribe_ExchangeAndBindingDeclaration(t *testing.T) { + tests := []struct { + name string + addressType pb.Address_TargetType + expectExchangeDeclare bool + expectQueueBind bool + needsManagementServer bool // cleanupBindings makes HTTP calls only when bindings are actually declared + }{ + { + name: "non-stream address type declares exchange and binding", + addressType: pb.Address_TOPIC, + expectExchangeDeclare: true, + expectQueueBind: true, + needsManagementServer: true, + }, + { + name: "stream address type skips exchange and binding", + addressType: pb.Address_STREAM, + expectExchangeDeclare: false, + expectQueueBind: false, + needsManagementServer: false, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + prov := NewAMQP091Provider() + + oldGetClientIdentifier := GetClientIdentifier + GetClientIdentifier = func(context.Context) (string, error) { + return "1234", nil + } + oldNewAmqpConn091 := NewAmqpConn091 + oldNewStreamConn := NewStreamConn + defer func() { + GetClientIdentifier = oldGetClientIdentifier + NewAmqpConn091 = oldNewAmqpConn091 + NewStreamConn = oldNewStreamConn + }() + + const addressName = "addressName" + const sourceName = "srcName" + addr := &pb.Address{ + Name: addressName, + Subjects: []string{"subject1"}, + Type: tc.addressType, + } + src := &pb.Source{ + Name: sourceName, + Address: addr, + Type: pb.Source_STREAM, + Options: map[string]string{"Offset": "0"}, + DeclareOnly: true, // stop after declare; avoids consumer setup + } + + // stream connection mock + smock := &streamConnectionMock{} + smock.On("Connect").Return(nil) + smock.On("IsClosed").Return(false) + smock.On("DeclareStream").Return(nil) + NewStreamConn = func(string, string, *tls.Config) streamConnectionShim { + return smock + } + + // amqp channel mock – ExchangeDeclare and QueueBind are only + // expected when the address type is not Address_STREAM + cmock := &amqpChannelMock{} + cmock.On("Close").Return(nil) + if tc.expectExchangeDeclare { + cmock.On("ExchangeDeclare", addressName, "topic", false).Return(nil) + } + if tc.expectQueueBind { + cmock.On("QueueBind", sourceName, "subject1", addressName, mock.Anything).Return(nil) + } + + // amqp connection mock + amock := &amqpConnectionMock{} + amock.On("Connect").Return(nil) + errs := make(chan amqp091Error) + amock.On("NotifyClose").Return(errs) + amock.On("NewChannel", false).Return(cmock, nil) + NewAmqpConn091 = func(string, string, *tls.Config) amqp091ConnectionShim { + return amock + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + cc := &pb.ConnectionConfiguration{} + + // The cleanupBindings call inside declareBinding issues an HTTP + // request to the management API; wire that up when needed. + if tc.needsManagementServer { + msrv := mockManagementRequestServer() + defer msrv.Close() + u, serr := url.Parse(msrv.URL) + assert.Nil(t, serr) + cc.Host = u.Hostname() + cc.Tenant = testTenant + i, _ := strconv.Atoi(u.Port()) + cc.AdminPort = int32(i) //nolint:gosec + } + + connectErr := prov.Connect(ctx, cc, false) + assert.Nil(t, connectErr) + + mc := make(chan *pb.Message) + defer close(mc) + + subscribeErr := prov.Subscribe(ctx, src, mc) + assert.Nil(t, subscribeErr) + + // Core assertions: verify exchange and binding call counts + cmock.AssertNumberOfCalls(t, "ExchangeDeclare", func() int { + if tc.expectExchangeDeclare { + return 1 + } + return 0 + }()) + cmock.AssertNumberOfCalls(t, "QueueBind", func() int { + if tc.expectQueueBind { + return 1 + } + return 0 + }()) + + cmock.AssertExpectations(t) + amock.AssertExpectations(t) + smock.AssertExpectations(t) + }) + } +}