fix(amqp091): handle NewConsumer failure and atomic-load shared counters#84
Open
miotte wants to merge 3 commits into
Open
fix(amqp091): handle NewConsumer failure and atomic-load shared counters#84miotte wants to merge 3 commits into
miotte wants to merge 3 commits into
Conversation
rsperl
reviewed
May 19, 2026
rsperl
approved these changes
May 19, 2026
bithckr
reviewed
May 22, 2026
| } | ||
|
|
||
| consumer, _ := bd.StreamConnection.NewConsumer(source.GetName(), consumerName, offset, handleMessages, source.GetSingleActiveConsumer()) | ||
| consumer, consErr := bd.StreamConnection.NewConsumer(source.GetName(), consumerName, offset, handleMessages, source.GetSingleActiveConsumer()) |
Contributor
There was a problem hiding this comment.
I don't think we can look at the error from NewConsumer because we get an 'Invalid Offset' error whenever we use 0(the default). We should check consumer for nil though.
- streamSubscribe ignored the error from StreamConnection.NewConsumer and then unconditionally called consumer.Close(); a failed consumer returned (nil, err) and panicked the goroutine. Surface the error as *pb.Error and skip the increment/Close path. - Stats() and connectionCleaner read produced/consumed/ActiveStreams as plain int64s while writers use atomic.AddInt64. Switch the readers to atomic.LoadInt64 to remove the data race. Signed-off-by: Michael Otteni <MichaelGOtteni@gmail.com>
updateLastPubSubEvent wrote bd.lastPubSubEvent as a plain time.Time from multiple goroutines (publish, subscribe, stream count changes) while connectionCleaner read it without synchronization. time.Time is a multi-word struct, so this risked torn reads and triggered the race detector alongside the ActiveStreams/produced/consumed fields hardened in the prior commit. Store the timestamp as an int64 UnixNano and use atomic.StoreInt64 / atomic.LoadInt64 to match the pattern already used for the sibling counters on BrokerDetails. Signed-off-by: Michael Otteni <MichaelGOtteni@gmail.com>
…rror NewConsumer can return a non-nil error (e.g. "Invalid Offset" with the default offset) while still handing back a usable consumer, so checking the error rejected valid subscribes. Gate on consumer == nil instead and still surface the error string when one is present. Signed-off-by: Michael Otteni <MichaelGOtteni@gmail.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Fixes #90