From a865358766ff774b73b348b11999e4fca9039a9a Mon Sep 17 00:00:00 2001 From: PGarule Date: Wed, 8 Sep 2021 15:56:49 +0530 Subject: [PATCH 1/6] add context param in producer interceptor --- pulsar/producer_interceptor.go | 14 ++++++++------ pulsar/producer_partition.go | 4 ++-- pulsar/producer_test.go | 9 +++++---- 3 files changed, 15 insertions(+), 12 deletions(-) diff --git a/pulsar/producer_interceptor.go b/pulsar/producer_interceptor.go index cb2cc152f5..d14f3f1b57 100644 --- a/pulsar/producer_interceptor.go +++ b/pulsar/producer_interceptor.go @@ -17,26 +17,28 @@ package pulsar +import "context" + type ProducerInterceptor interface { // BeforeSend This is called before send the message to the brokers. This method is allowed to modify the - BeforeSend(producer Producer, message *ProducerMessage) + BeforeSend(ctx context.Context, producer Producer, message *ProducerMessage) // OnSendAcknowledgement This method is called when the message sent to the broker has been acknowledged, // or when sending the message fails. - OnSendAcknowledgement(producer Producer, message *ProducerMessage, msgID MessageID) + OnSendAcknowledgement(ctx context.Context, producer Producer, message *ProducerMessage, msgID MessageID) } type ProducerInterceptors []ProducerInterceptor -func (x ProducerInterceptors) BeforeSend(producer Producer, message *ProducerMessage) { +func (x ProducerInterceptors) BeforeSend(ctx context.Context, producer Producer, message *ProducerMessage) { for i := range x { - x[i].BeforeSend(producer, message) + x[i].BeforeSend(ctx, producer, message) } } -func (x ProducerInterceptors) OnSendAcknowledgement(producer Producer, message *ProducerMessage, msgID MessageID) { +func (x ProducerInterceptors) OnSendAcknowledgement(ctx context.Context, producer Producer, message *ProducerMessage, msgID MessageID) { for i := range x { - x[i].OnSendAcknowledgement(producer, message, msgID) + x[i].OnSendAcknowledgement(ctx, producer, message, msgID) } } diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index 4ae4e002ef..3702d532e0 100644 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -739,7 +739,7 @@ func (p *partitionProducer) internalSendAsync(ctx context.Context, msg *Producer flushImmediately: flushImmediately, publishTime: time.Now(), } - p.options.Interceptors.BeforeSend(p, msg) + p.options.Interceptors.BeforeSend(ctx, p, msg) if p.options.DisableBlockIfQueueFull { if !p.publishSemaphore.TryAcquire() { @@ -818,7 +818,7 @@ func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt) sr.callback(msgID, sr.msg, nil) } - p.options.Interceptors.OnSendAcknowledgement(p, sr.msg, msgID) + p.options.Interceptors.OnSendAcknowledgement(sr.ctx, p, sr.msg, msgID) } } diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go index f914017316..40959b6536 100644 --- a/pulsar/producer_test.go +++ b/pulsar/producer_test.go @@ -1107,9 +1107,10 @@ func TestProducuerSendFailOnInvalidKey(t *testing.T) { type noopProduceInterceptor struct{} -func (noopProduceInterceptor) BeforeSend(producer Producer, message *ProducerMessage) {} +func (noopProduceInterceptor) BeforeSend(ctx context.Context, producer Producer, message *ProducerMessage) { +} -func (noopProduceInterceptor) OnSendAcknowledgement(producer Producer, message *ProducerMessage, msgID MessageID) { +func (noopProduceInterceptor) OnSendAcknowledgement(ctx context.Context, producer Producer, message *ProducerMessage, msgID MessageID) { } // copyPropertyIntercepotr copy all keys in message properties map and add a suffix @@ -1118,11 +1119,11 @@ type metricProduceInterceptor struct { ackn int } -func (x *metricProduceInterceptor) BeforeSend(producer Producer, message *ProducerMessage) { +func (x *metricProduceInterceptor) BeforeSend(ctx context.Context, producer Producer, message *ProducerMessage) { x.sendn++ } -func (x *metricProduceInterceptor) OnSendAcknowledgement(producer Producer, message *ProducerMessage, msgID MessageID) { +func (x *metricProduceInterceptor) OnSendAcknowledgement(ctx context.Context, producer Producer, message *ProducerMessage, msgID MessageID) { x.ackn++ } From 1bdccde50fab5f350c0350d57d13a15a85ecd249 Mon Sep 17 00:00:00 2001 From: PGarule Date: Wed, 8 Sep 2021 16:27:52 +0530 Subject: [PATCH 2/6] remove context from anAck method --- pulsar/producer_interceptor.go | 6 +++--- pulsar/producer_partition.go | 2 +- pulsar/producer_test.go | 4 ++-- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/pulsar/producer_interceptor.go b/pulsar/producer_interceptor.go index d14f3f1b57..e545cf32f6 100644 --- a/pulsar/producer_interceptor.go +++ b/pulsar/producer_interceptor.go @@ -25,7 +25,7 @@ type ProducerInterceptor interface { // OnSendAcknowledgement This method is called when the message sent to the broker has been acknowledged, // or when sending the message fails. - OnSendAcknowledgement(ctx context.Context, producer Producer, message *ProducerMessage, msgID MessageID) + OnSendAcknowledgement(producer Producer, message *ProducerMessage, msgID MessageID) } type ProducerInterceptors []ProducerInterceptor @@ -36,9 +36,9 @@ func (x ProducerInterceptors) BeforeSend(ctx context.Context, producer Producer, } } -func (x ProducerInterceptors) OnSendAcknowledgement(ctx context.Context, producer Producer, message *ProducerMessage, msgID MessageID) { +func (x ProducerInterceptors) OnSendAcknowledgement(producer Producer, message *ProducerMessage, msgID MessageID) { for i := range x { - x[i].OnSendAcknowledgement(ctx, producer, message, msgID) + x[i].OnSendAcknowledgement(producer, message, msgID) } } diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index 3702d532e0..49c3c360e9 100644 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -818,7 +818,7 @@ func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt) sr.callback(msgID, sr.msg, nil) } - p.options.Interceptors.OnSendAcknowledgement(sr.ctx, p, sr.msg, msgID) + p.options.Interceptors.OnSendAcknowledgement(p, sr.msg, msgID) } } diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go index 40959b6536..d329bf3bd0 100644 --- a/pulsar/producer_test.go +++ b/pulsar/producer_test.go @@ -1110,7 +1110,7 @@ type noopProduceInterceptor struct{} func (noopProduceInterceptor) BeforeSend(ctx context.Context, producer Producer, message *ProducerMessage) { } -func (noopProduceInterceptor) OnSendAcknowledgement(ctx context.Context, producer Producer, message *ProducerMessage, msgID MessageID) { +func (noopProduceInterceptor) OnSendAcknowledgement(producer Producer, message *ProducerMessage, msgID MessageID) { } // copyPropertyIntercepotr copy all keys in message properties map and add a suffix @@ -1123,7 +1123,7 @@ func (x *metricProduceInterceptor) BeforeSend(ctx context.Context, producer Prod x.sendn++ } -func (x *metricProduceInterceptor) OnSendAcknowledgement(ctx context.Context, producer Producer, message *ProducerMessage, msgID MessageID) { +func (x *metricProduceInterceptor) OnSendAcknowledgement(producer Producer, message *ProducerMessage, msgID MessageID) { x.ackn++ } From 8b25fb8b53b930e68bd7e6ae7e1d3ebb0cf68906 Mon Sep 17 00:00:00 2001 From: PGarule Date: Wed, 8 Sep 2021 23:26:10 +0530 Subject: [PATCH 3/6] add comment --- pulsar/producer_partition.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index 49c3c360e9..43b59f8178 100644 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -739,6 +739,7 @@ func (p *partitionProducer) internalSendAsync(ctx context.Context, msg *Producer flushImmediately: flushImmediately, publishTime: time.Now(), } + // call interceptor with context p.options.Interceptors.BeforeSend(ctx, p, msg) if p.options.DisableBlockIfQueueFull { From 0f555f74723a648f2be6410b6635d432af8795a5 Mon Sep 17 00:00:00 2001 From: PGarule Date: Fri, 1 Oct 2021 09:22:54 +0530 Subject: [PATCH 4/6] dummy commit to trigger CI --- pulsar/producer_partition.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index 43b59f8178..0204f651b2 100644 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -739,7 +739,7 @@ func (p *partitionProducer) internalSendAsync(ctx context.Context, msg *Producer flushImmediately: flushImmediately, publishTime: time.Now(), } - // call interceptor with context + // call interceptor with context parameter p.options.Interceptors.BeforeSend(ctx, p, msg) if p.options.DisableBlockIfQueueFull { From 1fbe3d1dad92e6a2d4e84daaac1d8dc9bf4577e2 Mon Sep 17 00:00:00 2001 From: PGarule Date: Wed, 6 Oct 2021 07:04:11 +0530 Subject: [PATCH 5/6] dummy commit --- pulsar/producer_partition.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index 580c12e40d..49b627c277 100644 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -748,7 +748,7 @@ func (p *partitionProducer) internalSendAsync(ctx context.Context, msg *Producer flushImmediately: flushImmediately, publishTime: time.Now(), } - // call interceptor with context parameter + // call interceptor with context parameter p.options.Interceptors.BeforeSend(ctx, p, msg) if p.options.DisableBlockIfQueueFull { From 4b6c7e90c2accb5345c4c0d727707376c9d5d8ce Mon Sep 17 00:00:00 2001 From: PGarule Date: Wed, 6 Oct 2021 07:09:47 +0530 Subject: [PATCH 6/6] revert of dummy commit --- pulsar/producer_partition.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index 49b627c277..580c12e40d 100644 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -748,7 +748,7 @@ func (p *partitionProducer) internalSendAsync(ctx context.Context, msg *Producer flushImmediately: flushImmediately, publishTime: time.Now(), } - // call interceptor with context parameter + // call interceptor with context parameter p.options.Interceptors.BeforeSend(ctx, p, msg) if p.options.DisableBlockIfQueueFull {