From 3c4bf04c141c45e077a1c9745af553a3137af7a9 Mon Sep 17 00:00:00 2001 From: yorkxyzhang Date: Mon, 25 Oct 2021 11:41:43 +0800 Subject: [PATCH 1/4] [feature] expose error to user when producer reconnectToBroker --- pulsar/consumer_partition.go | 2 +- pulsar/producer_partition.go | 10 +++++++++- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go index 30639bd741..3ccb16b355 100644 --- a/pulsar/consumer_partition.go +++ b/pulsar/consumer_partition.go @@ -1001,7 +1001,7 @@ func (pc *partitionConsumer) reconnectToBroker() { return } errMsg := err.Error() - if strings.Contains(errMsg, errTopicNotFount) { + if strings.Contains(errMsg, errTopicNotFount) || strings.Contains(errMsg,errMetadata){ // when topic is deleted, we should give up reconnection. pc.log.Warn("Topic Not Found.") break diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index b2b92735c2..a20d0c1e01 100644 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -60,6 +60,8 @@ var ( ) var errTopicNotFount = "TopicNotFound" +var errMetadata = "MetadataError" + type partitionProducer struct { state ua.Int32 @@ -67,6 +69,7 @@ type partitionProducer struct { topic string log log.Logger cnx internal.Connection + err error options *ProducerOptions producerName string @@ -350,13 +353,14 @@ func (p *partitionProducer) reconnectToBroker() { time.Sleep(d) atomic.AddUint64(&p.epoch, 1) err := p.grabCnx() + p.err = err if err == nil { // Successfully reconnected p.log.WithField("cnx", p.cnx.ID()).Info("Reconnected producer to broker") return } errMsg := err.Error() - if strings.Contains(errMsg, errTopicNotFount) { + if strings.Contains(errMsg, errTopicNotFount) || strings.Contains(errMsg,errMetadata){ // when topic is deleted, we should give up reconnection. p.log.Warn("Topic Not Found.") break @@ -742,6 +746,10 @@ func (p *partitionProducer) internalSendAsync(ctx context.Context, msg *Producer callback(nil, msg, errProducerClosed) return } + if p.err != nil { + callback(nil,msg,p.err) + return + } sr := &sendRequest{ ctx: ctx, From 2e8408a18b0a7cec0803acf72fb448b174ad12ee Mon Sep 17 00:00:00 2001 From: yorkxyzhang Date: Tue, 16 Nov 2021 16:47:41 +0800 Subject: [PATCH 2/4] fix code style --- pulsar/consumer_partition.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go index 3ccb16b355..f20d2bc682 100644 --- a/pulsar/consumer_partition.go +++ b/pulsar/consumer_partition.go @@ -1001,7 +1001,7 @@ func (pc *partitionConsumer) reconnectToBroker() { return } errMsg := err.Error() - if strings.Contains(errMsg, errTopicNotFount) || strings.Contains(errMsg,errMetadata){ + if strings.Contains(errMsg, errTopicNotFount) || strings.Contains(errMsg, errMetadata) { // when topic is deleted, we should give up reconnection. pc.log.Warn("Topic Not Found.") break From 4ac03e54135fe9ef955f234d238bafd680c09788 Mon Sep 17 00:00:00 2001 From: yorkxyzhang Date: Tue, 16 Nov 2021 16:58:39 +0800 Subject: [PATCH 3/4] add comment when internal reconnect failed and fix code style --- pulsar/producer_partition.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index a20d0c1e01..ad54ee6d85 100644 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -62,7 +62,6 @@ var ( var errTopicNotFount = "TopicNotFound" var errMetadata = "MetadataError" - type partitionProducer struct { state ua.Int32 client *client @@ -353,6 +352,9 @@ func (p *partitionProducer) reconnectToBroker() { time.Sleep(d) atomic.AddUint64(&p.epoch, 1) err := p.grabCnx() + // In reconnection logic, grabCnx maybe return err, but we did not return the error. + // So in partitionProducer struct, we define an err object to make it easier for users to + // determine what caused the grabCnx error. p.err = err if err == nil { // Successfully reconnected @@ -360,7 +362,7 @@ func (p *partitionProducer) reconnectToBroker() { return } errMsg := err.Error() - if strings.Contains(errMsg, errTopicNotFount) || strings.Contains(errMsg,errMetadata){ + if strings.Contains(errMsg, errTopicNotFount) || strings.Contains(errMsg, errMetadata) { // when topic is deleted, we should give up reconnection. p.log.Warn("Topic Not Found.") break @@ -747,7 +749,7 @@ func (p *partitionProducer) internalSendAsync(ctx context.Context, msg *Producer return } if p.err != nil { - callback(nil,msg,p.err) + callback(nil, msg, p.err) return } From e090d27882188c4be5e50bdfda2b151f0878e8fe Mon Sep 17 00:00:00 2001 From: yorkxyzhang Date: Wed, 17 Nov 2021 20:58:42 +0800 Subject: [PATCH 4/4] remove metadataError check --- pulsar/consumer_partition.go | 2 +- pulsar/producer_partition.go | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go index 9f73e7d8bc..c8e5d9f974 100644 --- a/pulsar/consumer_partition.go +++ b/pulsar/consumer_partition.go @@ -1007,7 +1007,7 @@ func (pc *partitionConsumer) reconnectToBroker() { return } errMsg := err.Error() - if strings.Contains(errMsg, errTopicNotFount) || strings.Contains(errMsg, errMetadata) { + if strings.Contains(errMsg, errTopicNotFount) { // when topic is deleted, we should give up reconnection. pc.log.Warn("Topic Not Found.") break diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index 8b00bd5f46..3a066f3d92 100644 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -60,7 +60,6 @@ var ( ) var errTopicNotFount = "TopicNotFound" -var errMetadata = "MetadataError" type partitionProducer struct { state ua.Int32 @@ -362,7 +361,7 @@ func (p *partitionProducer) reconnectToBroker() { return } errMsg := err.Error() - if strings.Contains(errMsg, errTopicNotFount) || strings.Contains(errMsg, errMetadata) { + if strings.Contains(errMsg, errTopicNotFount) { // when topic is deleted, we should give up reconnection. p.log.Warn("Topic Not Found.") break