diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go index 2328ca882b..7c57416b5d 100644 --- a/pulsar/consumer_impl.go +++ b/pulsar/consumer_impl.go @@ -459,7 +459,9 @@ func (c *consumer) AckID(msgID MessageID) error { } if mid.consumer != nil { - return mid.Ack() + if pc, ok := (mid.consumer).(*partitionConsumer); ok && pc.getConsumerState() == consumerReady { + return mid.Ack() + } } return c.consumers[mid.partitionIdx].AckID(mid) @@ -522,8 +524,10 @@ func (c *consumer) Nack(msg Message) { } if mid.consumer != nil { - mid.Nack() - return + if pc, ok := (mid.consumer).(*partitionConsumer); ok && pc.getConsumerState() == consumerReady { + mid.Nack() + return + } } c.consumers[mid.partitionIdx].NackMsg(msg) return @@ -539,8 +543,10 @@ func (c *consumer) NackID(msgID MessageID) { } if mid.consumer != nil { - mid.Nack() - return + if pc, ok := (mid.consumer).(*partitionConsumer); ok && pc.getConsumerState() == consumerReady { + mid.Nack() + return + } } c.consumers[mid.partitionIdx].NackID(mid)