diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index fbcc5b9776..3e20b0df0b 100755 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -91,6 +91,8 @@ type partitionProducer struct { client *client topic string log log.Logger + cnx internal.Connection + err error conn uAtomic.Value @@ -446,6 +448,10 @@ func (p *partitionProducer) reconnectToBroker() { atomic.AddUint64(&p.epoch, 1) err := p.grabCnx() + // In reconnection logic, grabCnx maybe return err, but we did not return the error. + // So in partitionProducer struct, we define an err object to make it easier for users to + // determine what caused the grabCnx error. + p.err = err if err == nil { // Successfully reconnected p.log.WithField("cnx", p._getConn().ID()).Info("Reconnected producer to broker") @@ -1208,6 +1214,10 @@ func (p *partitionProducer) internalSendAsync( runCallback(callback, nil, msg, err) return } + if p.err != nil { + callback(nil, msg, p.err) + return + } sr := sendRequestPool.Get().(*sendRequest) *sr = sendRequest{