Two bugs in the amqp091 stream path:
streamSubscribe ignores the error from StreamConnection.NewConsumer and then unconditionally calls consumer.Close(). When the consumer fails to construct it returns (nil, err), and the subsequent Close() panics the goroutine. The error should be surfaced as *pb.Error and the increment/Close path skipped.
Stats() and connectionCleaner read produced, consumed, and ActiveStreams as plain int64s, but writers use atomic.AddInt64. The reads should use atomic.LoadInt64 to remove the data race.
Two bugs in the amqp091 stream path:
streamSubscribeignores the error fromStreamConnection.NewConsumerand then unconditionally callsconsumer.Close(). When the consumer fails to construct it returns(nil, err), and the subsequentClose()panics the goroutine. The error should be surfaced as*pb.Errorand the increment/Closepath skipped.Stats()andconnectionCleanerreadproduced,consumed, andActiveStreamsas plainint64s, but writers useatomic.AddInt64. The reads should useatomic.LoadInt64to remove the data race.