From ce08ba7e019b5c20fbab88f0cec9168f8767a405 Mon Sep 17 00:00:00 2001 From: gydeng Date: Wed, 20 Aug 2025 03:22:34 +0000 Subject: [PATCH 01/10] improve: modify the structure of NegativeAckTracker to reduce memory overhead --- go.mod | 5 +- go.sum | 11 +++- pulsar/consumer.go | 5 ++ pulsar/consumer_impl.go | 1 + pulsar/consumer_partition.go | 3 +- pulsar/consumer_test.go | 78 +++++++++++++++++++++- pulsar/consumer_zero_queue_test.go | 20 +++++- pulsar/negative_acks_tracker.go | 98 +++++++++++++++++++++------- pulsar/negative_acks_tracker_test.go | 6 +- 9 files changed, 193 insertions(+), 34 deletions(-) diff --git a/go.mod b/go.mod index be3bf744ba..9d2fccf7e6 100644 --- a/go.mod +++ b/go.mod @@ -6,9 +6,11 @@ require ( github.com/99designs/keyring v1.2.1 github.com/AthenZ/athenz v1.12.13 github.com/DataDog/zstd v1.5.0 - github.com/bits-and-blooms/bitset v1.4.0 + github.com/RoaringBitmap/roaring/v2 v2.8.0 + github.com/bits-and-blooms/bitset v1.12.0 github.com/bmizerany/perks v0.0.0-20141205001514-d9a9656a3a4b github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc + github.com/emirpasic/gods v1.18.1 github.com/golang-jwt/jwt/v5 v5.2.2 github.com/golang/protobuf v1.5.4 github.com/google/uuid v1.6.0 @@ -78,6 +80,7 @@ require ( github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/morikuni/aec v1.0.0 // indirect + github.com/mschoch/smat v0.2.0 // indirect github.com/mtibben/percent v0.2.1 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/nxadm/tail v1.4.8 // indirect diff --git a/go.sum b/go.sum index e150c46c92..d93a2279ed 100644 --- a/go.sum +++ b/go.sum @@ -14,12 +14,14 @@ github.com/DataDog/zstd v1.5.0 h1:+K/VEwIAaPcHiMtQvpLD4lqW7f0Gk3xdYZmI1hD+CXo= github.com/DataDog/zstd v1.5.0/go.mod h1:g4AWEaM3yOg3HYfnJ3YIawPnVdXJh9QME85blwSAmyw= github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERoyfY= github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA8Ipt1oGCvU= +github.com/RoaringBitmap/roaring/v2 v2.8.0 h1:y1rdtixfXvaITKzkfiKvScI0hlBJHe9sfzJp8cgeM7w= +github.com/RoaringBitmap/roaring/v2 v2.8.0/go.mod h1:FiJcsfkGje/nZBZgCu0ZxCPOKD/hVXDS2dXi7/eUFE0= github.com/ardielle/ardielle-go v1.5.2 h1:TilHTpHIQJ27R1Tl/iITBzMwiUGSlVfiVhwDNGM3Zj4= github.com/ardielle/ardielle-go v1.5.2/go.mod h1:I4hy1n795cUhaVt/ojz83SNVCYIGsAFAONtv2Dr7HUI= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= -github.com/bits-and-blooms/bitset v1.4.0 h1:+YZ8ePm+He2pU3dZlIZiOeAKfrBkXi1lSrXJ/Xzgbu8= -github.com/bits-and-blooms/bitset v1.4.0/go.mod h1:gIdJ4wp64HaoK2YrL1Q5/N7Y16edYb8uY+O0FJTyyDA= +github.com/bits-and-blooms/bitset v1.12.0 h1:U/q1fAF7xXRhFCrhROzIfffYnu+dlS38vCZtmFVPHmA= +github.com/bits-and-blooms/bitset v1.12.0/go.mod h1:7hO7Gc7Pp1vODcmWvKMRA9BNmbv6a/7QIWpPxHddWR8= github.com/bmizerany/perks v0.0.0-20141205001514-d9a9656a3a4b h1:AP/Y7sqYicnjGDfD5VcY4CIfh1hRXBUavxrvELjTiOE= github.com/bmizerany/perks v0.0.0-20141205001514-d9a9656a3a4b/go.mod h1:ac9efd0D1fsDb3EJvhqgXRbFx7bs2wqZ10HQPeU8U/Q= github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM= @@ -55,6 +57,8 @@ github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4 github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= github.com/dvsekhvalnov/jose2go v1.6.0 h1:Y9gnSnP4qEI0+/uQkHvFXeD2PLPJeXEL+ySMEA2EjTY= github.com/dvsekhvalnov/jose2go v1.6.0/go.mod h1:QsHjhyTlD/lAVqn/NSbVZmSCGeDehTB/mPZadG+mhXU= +github.com/emirpasic/gods v1.18.1 h1:FXtiHYKDGKCW2KzwZKx0iC0PQmdlorYgdFG9jPXJ1Bc= +github.com/emirpasic/gods v1.18.1/go.mod h1:8tpGGwCnJ5H4r6BWwaV6OrWmMoPhUl5jm/FMNAnJvWQ= github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg= github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= @@ -156,6 +160,8 @@ github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9G github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/morikuni/aec v1.0.0 h1:nP9CBfwrvYnBRgY6qfDQkygYDmYwOilePFkwzv4dU8A= github.com/morikuni/aec v1.0.0/go.mod h1:BbKIizmSmc5MMPqRYbxO4ZU0S0+P200+tUnFx7PXmsc= +github.com/mschoch/smat v0.2.0 h1:8imxQsjDm8yFEAVBe7azKmKSgzSkZXDuKkSq9374khM= +github.com/mschoch/smat v0.2.0/go.mod h1:kc9mz7DoBKqDyiRL7VZN8KvXQMWeTaVnttLRXOlotKw= github.com/mtibben/percent v0.2.1 h1:5gssi8Nqo8QU/r2pynCm+hBQHpkB/uNK7BJCFogWdzs= github.com/mtibben/percent v0.2.1/go.mod h1:KG9uO+SZkUp+VkRHsCdYQV3XSZrrSpR3O9ibNBTZrns= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= @@ -348,6 +354,7 @@ gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.0/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gotest.tools/v3 v3.5.1 h1:EENdUnS3pdur5nybKYIh2Vfgc8IUNBjxDPSjtiJcOzU= diff --git a/pulsar/consumer.go b/pulsar/consumer.go index d611c6916e..c1398c58f4 100644 --- a/pulsar/consumer.go +++ b/pulsar/consumer.go @@ -183,6 +183,11 @@ type ConsumerOptions struct { // processed. Default is 1 min. (See `Consumer.Nack()`) NackRedeliveryDelay time.Duration + // NackPrecisionBit specifies the precision bit for nack redelivery delay. + // This is used to trim the lower bits of the nack redelivery delay to reduce memory usage. + // Default is 8 bits. + NackPrecisionBit int64 + // Name specifies the consumer name. Name string diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go index c8b28b1f28..9da749433d 100644 --- a/pulsar/consumer_impl.go +++ b/pulsar/consumer_impl.go @@ -452,6 +452,7 @@ func newPartitionConsumerOpts(topic, consumerName string, idx int, options Consu receiverQueueSize: options.ReceiverQueueSize, nackRedeliveryDelay: nackRedeliveryDelay, nackBackoffPolicy: options.NackBackoffPolicy, + nackPrecisionBit: options.NackPrecisionBit, metadata: options.Properties, subProperties: options.SubscriptionProperties, replicateSubscriptionState: options.ReplicateSubscriptionState, diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go index 2a2f4eb0b4..1d69acc65e 100644 --- a/pulsar/consumer_partition.go +++ b/pulsar/consumer_partition.go @@ -104,6 +104,7 @@ type partitionConsumerOpts struct { autoReceiverQueueSize bool nackRedeliveryDelay time.Duration nackBackoffPolicy NackBackoffPolicy + nackPrecisionBit int64 metadata map[string]string subProperties map[string]string replicateSubscriptionState bool @@ -423,7 +424,7 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon pc.decryptor = decryptor - pc.nackTracker = newNegativeAcksTracker(pc, options.nackRedeliveryDelay, options.nackBackoffPolicy, pc.log) + pc.nackTracker = newNegativeAcksTracker(pc, options.nackRedeliveryDelay, options.nackBackoffPolicy, pc.log, options.nackPrecisionBit) err := pc.grabConn("") if err != nil { diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go index 4f67c3639a..de11a0db36 100644 --- a/pulsar/consumer_test.go +++ b/pulsar/consumer_test.go @@ -1185,6 +1185,7 @@ func TestConsumerNack(t *testing.T) { SubscriptionName: "sub-1", Type: Shared, NackRedeliveryDelay: 1 * time.Second, + NackPrecisionBit: 8, }) assert.Nil(t, err) defer consumer.Close() @@ -1216,13 +1217,86 @@ func TestConsumerNack(t *testing.T) { // Failed messages should be resent // We should only receive the odd messages - for i := 1; i < N; i += 2 { + receivedOdd := 0 + expectedOdd := (N + 1) / 2 // Expected number of odd message IDs + + for receivedOdd < expectedOdd { msg, err := consumer.Receive(ctx) assert.Nil(t, err) - assert.Equal(t, fmt.Sprintf("msg-content-%d", i), string(msg.Payload())) + // Extract message ID from the payload (e.g., "msg-content-15") + var id int + _, err = fmt.Sscanf(string(msg.Payload()), "msg-content-%d", &id) + assert.Nil(t, err) + + // Count only odd message IDs + if id%2 == 1 { + assert.True(t, id%2 == 1) // Optional check, included for clarity + receivedOdd++ + } + + // Acknowledge message to mark it as processed consumer.Ack(msg) } + + // Verify that the correct number of odd messages were received + assert.Equal(t, expectedOdd, receivedOdd) +} + +func TestNegativeAckPrecisionBitCnt(t *testing.T) { + const delayMs = 2000 + + for precision := 1; precision <= 8; precision++ { + topicName := fmt.Sprintf("testNegativeAckPrecisionBitCnt-%d-%d", precision, time.Now().UnixNano()) + ctx := context.Background() + client, err := NewClient(ClientOptions{URL: lookupURL}) + assert.Nil(t, err) + defer client.Close() + + consumer, err := client.Subscribe(ConsumerOptions{ + Topic: topicName, + SubscriptionName: "sub-1", + Type: Shared, + NackRedeliveryDelay: time.Duration(delayMs) * time.Millisecond, + NackPrecisionBit: int64(precision), + }) + assert.Nil(t, err) + defer consumer.Close() + + producer, err := client.CreateProducer(ProducerOptions{ + Topic: topicName, + }) + assert.Nil(t, err) + defer producer.Close() + + // Send single message + content := "test-0" + _, err = producer.Send(ctx, &ProducerMessage{ + Payload: []byte(content), + }) + assert.Nil(t, err) + + // Receive and send negative ack + msg, err := consumer.Receive(ctx) + assert.Nil(t, err) + assert.Equal(t, content, string(msg.Payload())) + consumer.Nack(msg) + + // Calculate expected redelivery window + expectedRedelivery := time.Now().Add(time.Duration(delayMs) * time.Millisecond) + deviation := time.Duration(int64(1) << precision) + + // Wait for redelivery + redelivered, err := consumer.Receive(ctx) + assert.Nil(t, err) + assert.Equal(t, content, string(redelivered.Payload())) + + now := time.Now() + // Assert that redelivery happens >= expected - deviation + assert.GreaterOrEqual(t, now.UnixMilli(), expectedRedelivery.UnixMilli()-deviation.Milliseconds()) + + consumer.Ack(redelivered) + } } func TestConsumerCompression(t *testing.T) { diff --git a/pulsar/consumer_zero_queue_test.go b/pulsar/consumer_zero_queue_test.go index 1538a3c4c6..545fdf0664 100644 --- a/pulsar/consumer_zero_queue_test.go +++ b/pulsar/consumer_zero_queue_test.go @@ -431,6 +431,7 @@ func TestZeroQueueConsumer_Nack(t *testing.T) { Type: Shared, NackRedeliveryDelay: 1 * time.Second, EnableZeroQueueConsumer: true, + NackPrecisionBit: 8, }) assert.Nil(t, err) _, ok := consumer.(*zeroQueueConsumer) @@ -464,13 +465,28 @@ func TestZeroQueueConsumer_Nack(t *testing.T) { // Failed messages should be resent // We should only receive the odd messages - for i := 1; i < N; i += 2 { + receivedOdd := 0 + expectedOdd := (N + 1) / 2 + + for receivedOdd < expectedOdd { msg, err := consumer.Receive(ctx) assert.Nil(t, err) - assert.Equal(t, fmt.Sprintf("msg-content-%d", i), string(msg.Payload())) + + // Extract message ID + var id int + _, err = fmt.Sscanf(string(msg.Payload()), "msg-content-%d", &id) + assert.Nil(t, err) + + // Only accept odd message IDs + if id%2 == 1 { + receivedOdd++ + } consumer.Ack(msg) } + + // Assert we received the expected count of odd messages + assert.Equal(t, expectedOdd, receivedOdd) } func TestZeroQueueConsumer_Seek(t *testing.T) { diff --git a/pulsar/negative_acks_tracker.go b/pulsar/negative_acks_tracker.go index 1331c7dfae..962f8b990e 100644 --- a/pulsar/negative_acks_tracker.go +++ b/pulsar/negative_acks_tracker.go @@ -21,6 +21,8 @@ import ( "sync" "time" + "github.com/RoaringBitmap/roaring/v2/roaring64" + "github.com/emirpasic/gods/trees/avltree" log "github.com/apache/pulsar-client-go/pulsar/log" ) @@ -28,12 +30,15 @@ type redeliveryConsumer interface { Redeliver(msgIDs []messageID) } +type LedgerId = int64 + type negativeAcksTracker struct { sync.Mutex doneCh chan interface{} doneOnce sync.Once - negativeAcks map[messageID]time.Time + negativeAcks *avltree.Tree + nackPrecisionBit int64 rc redeliveryConsumer nackBackoff NackBackoffPolicy tick *time.Ticker @@ -42,14 +47,26 @@ type negativeAcksTracker struct { } func newNegativeAcksTracker(rc redeliveryConsumer, delay time.Duration, - nackBackoffPolicy NackBackoffPolicy, logger log.Logger) *negativeAcksTracker { + nackBackoffPolicy NackBackoffPolicy, logger log.Logger, nackPrecisionBit int64) *negativeAcksTracker { t := &negativeAcksTracker{ doneCh: make(chan interface{}), - negativeAcks: make(map[messageID]time.Time), + negativeAcks: avltree.NewWith(func(a, b interface{}) int { + // compare time.Time + timeA := a.(time.Time) + timeB := b.(time.Time) + if timeA.Before(timeB) { + return -1 + } + if timeA.After(timeB) { + return 1 + } + return 0 + }), rc: rc, nackBackoff: nackBackoffPolicy, log: logger, + nackPrecisionBit: nackPrecisionBit, } if nackBackoffPolicy != nil { @@ -65,6 +82,14 @@ func newNegativeAcksTracker(rc redeliveryConsumer, delay time.Duration, return t } +func trimLowerBit(ts int64, precisionBit int64) int64 { + if precisionBit <= 0 { + return ts + } + mask := ^((int64(1) << precisionBit) - 1) + return ts & mask +} + func (t *negativeAcksTracker) Add(msgID *messageID) { // Always clear up the batch index since we want to track the nack // for the entire batch @@ -77,14 +102,20 @@ func (t *negativeAcksTracker) Add(msgID *messageID) { t.Lock() defer t.Unlock() - _, present := t.negativeAcks[batchMsgID] - if present { - // The batch is already being tracked - return - } - targetTime := time.Now().Add(t.delay) - t.negativeAcks[batchMsgID] = targetTime + trimmedTime := time.UnixMilli(trimLowerBit(targetTime.UnixMilli(), t.nackPrecisionBit)) + // try get trimmedTime + value, exists := t.negativeAcks.Get(trimmedTime) + if !exists { + newMap := make(map[LedgerId]*roaring64.Bitmap) + t.negativeAcks.Put(trimmedTime, newMap) + value = newMap + } + bitmapMap := value.(map[LedgerId]*roaring64.Bitmap) + if _, exists := bitmapMap[batchMsgID.ledgerID]; !exists { + bitmapMap[batchMsgID.ledgerID] = roaring64.NewBitmap() + } + bitmapMap[batchMsgID.ledgerID].Add(uint64(batchMsgID.entryID)) } func (t *negativeAcksTracker) AddMessage(msg Message) { @@ -103,14 +134,20 @@ func (t *negativeAcksTracker) AddMessage(msg Message) { t.Lock() defer t.Unlock() - _, present := t.negativeAcks[batchMsgID] - if present { - // The batch is already being tracked - return - } - targetTime := time.Now().Add(nackBackoffDelay) - t.negativeAcks[batchMsgID] = targetTime + trimmedTime := time.UnixMilli(trimLowerBit(targetTime.UnixMilli(), t.nackPrecisionBit)) + // try get trimmedTime + value, exists := t.negativeAcks.Get(trimmedTime) + if !exists { + newMap := make(map[LedgerId]*roaring64.Bitmap) + t.negativeAcks.Put(trimmedTime, newMap) + value = newMap + } + bitmapMap := value.(map[LedgerId]*roaring64.Bitmap) + if _, exists := bitmapMap[batchMsgID.ledgerID]; !exists { + bitmapMap[batchMsgID.ledgerID] = roaring64.NewBitmap() + } + bitmapMap[batchMsgID.ledgerID].Add(uint64(batchMsgID.entryID)) } func (t *negativeAcksTracker) track() { @@ -127,13 +164,28 @@ func (t *negativeAcksTracker) track() { t.Lock() - for msgID, targetTime := range t.negativeAcks { - t.log.Debugf("MsgId: %v -- targetTime: %v -- now: %v", msgID, targetTime, now) - if targetTime.Before(now) { - t.log.Debugf("Adding MsgId: %v", msgID) - msgIDs = append(msgIDs, msgID) - delete(t.negativeAcks, msgID) + iterator := t.negativeAcks.Iterator() + for iterator.Next() { + targetTime := iterator.Key().(time.Time) + // because use ordered map, so we can early break + if targetTime.After(now) { + break + } + + ledgerMap := iterator.Value().(map[LedgerId]*roaring64.Bitmap) + for ledgerID, entrySet := range ledgerMap { + for _, entryID := range entrySet.ToArray() { + msgID := messageID{ + ledgerID: ledgerID, + entryID: int64(entryID), + batchIdx: 0, + } + msgIDs = append(msgIDs, msgID) + } } + + // Safe deletion during iteration + t.negativeAcks.Remove(targetTime) } t.Unlock() diff --git a/pulsar/negative_acks_tracker_test.go b/pulsar/negative_acks_tracker_test.go index 3f03ac446a..d025f32363 100644 --- a/pulsar/negative_acks_tracker_test.go +++ b/pulsar/negative_acks_tracker_test.go @@ -80,7 +80,7 @@ func (nmc *nackMockedConsumer) Wait() <-chan messageID { func TestNacksTracker(t *testing.T) { nmc := newNackMockedConsumer(nil) - nacks := newNegativeAcksTracker(nmc, testNackDelay, nil, log.DefaultNopLogger()) + nacks := newNegativeAcksTracker(nmc, testNackDelay, nil, log.DefaultNopLogger(), 8) nacks.Add(&messageID{ ledgerID: 1, @@ -113,7 +113,7 @@ func TestNacksTracker(t *testing.T) { func TestNacksWithBatchesTracker(t *testing.T) { nmc := newNackMockedConsumer(nil) - nacks := newNegativeAcksTracker(nmc, testNackDelay, nil, log.DefaultNopLogger()) + nacks := newNegativeAcksTracker(nmc, testNackDelay, nil, log.DefaultNopLogger(), 8) nacks.Add(&messageID{ ledgerID: 1, @@ -156,7 +156,7 @@ func TestNacksWithBatchesTracker(t *testing.T) { func TestNackBackoffTracker(t *testing.T) { nmc := newNackMockedConsumer(new(defaultNackBackoffPolicy)) - nacks := newNegativeAcksTracker(nmc, testNackDelay, new(defaultNackBackoffPolicy), log.DefaultNopLogger()) + nacks := newNegativeAcksTracker(nmc, testNackDelay, new(defaultNackBackoffPolicy), log.DefaultNopLogger(), 8) nacks.AddMessage(new(mockMessage1)) nacks.AddMessage(new(mockMessage2)) From 9c2d7800b24300aca09a6fbf799fcc63deb85516 Mon Sep 17 00:00:00 2001 From: gydeng Date: Wed, 20 Aug 2025 11:09:30 +0000 Subject: [PATCH 02/10] fix TestNegativeAckPrecisionBitCnt bug --- pulsar/consumer_test.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go index ca2e5e3f9c..a7a3827d9a 100644 --- a/pulsar/consumer_test.go +++ b/pulsar/consumer_test.go @@ -1244,7 +1244,7 @@ func TestConsumerNack(t *testing.T) { } func TestNegativeAckPrecisionBitCnt(t *testing.T) { - const delayMs = 2000 + const delay = 1 * time.Second for precision := 1; precision <= 8; precision++ { topicName := fmt.Sprintf("testNegativeAckPrecisionBitCnt-%d-%d", precision, time.Now().UnixNano()) @@ -1257,7 +1257,7 @@ func TestNegativeAckPrecisionBitCnt(t *testing.T) { Topic: topicName, SubscriptionName: "sub-1", Type: Shared, - NackRedeliveryDelay: time.Duration(delayMs) * time.Millisecond, + NackRedeliveryDelay: delay, NackPrecisionBit: int64(precision), }) assert.Nil(t, err) @@ -1283,8 +1283,8 @@ func TestNegativeAckPrecisionBitCnt(t *testing.T) { consumer.Nack(msg) // Calculate expected redelivery window - expectedRedelivery := time.Now().Add(time.Duration(delayMs) * time.Millisecond) - deviation := time.Duration(int64(1) << precision) + expectedRedelivery := time.Now().Add(delay) + deviation := time.Duration(int64(1) << precision) * time.Millisecond // Wait for redelivery redelivered, err := consumer.Receive(ctx) From 726e83c16fc6ec975447f2db237d1f03a039cb2d Mon Sep 17 00:00:00 2001 From: gydeng Date: Wed, 20 Aug 2025 11:29:10 +0000 Subject: [PATCH 03/10] chore: formatting issues --- pulsar/consumer_partition.go | 5 +- pulsar/consumer_test.go | 108 ++++++++++++++++---------------- pulsar/negative_acks_tracker.go | 42 ++++++------- 3 files changed, 78 insertions(+), 77 deletions(-) diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go index 270f655e4b..8a7867e7d1 100644 --- a/pulsar/consumer_partition.go +++ b/pulsar/consumer_partition.go @@ -104,7 +104,7 @@ type partitionConsumerOpts struct { autoReceiverQueueSize bool nackRedeliveryDelay time.Duration nackBackoffPolicy NackBackoffPolicy - nackPrecisionBit int64 + nackPrecisionBit int64 metadata map[string]string subProperties map[string]string replicateSubscriptionState bool @@ -424,7 +424,8 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon pc.decryptor = decryptor - pc.nackTracker = newNegativeAcksTracker(pc, options.nackRedeliveryDelay, options.nackBackoffPolicy, pc.log, options.nackPrecisionBit) + pc.nackTracker = newNegativeAcksTracker(pc, options.nackRedeliveryDelay, options.nackBackoffPolicy, pc.log, + options.nackPrecisionBit) err := pc.grabConn("") if err != nil { diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go index a7a3827d9a..2393e9c1c2 100644 --- a/pulsar/consumer_test.go +++ b/pulsar/consumer_test.go @@ -1185,7 +1185,7 @@ func TestConsumerNack(t *testing.T) { SubscriptionName: "sub-1", Type: Shared, NackRedeliveryDelay: 1 * time.Second, - NackPrecisionBit: 8, + NackPrecisionBit: 8, }) assert.Nil(t, err) defer consumer.Close() @@ -1244,59 +1244,59 @@ func TestConsumerNack(t *testing.T) { } func TestNegativeAckPrecisionBitCnt(t *testing.T) { - const delay = 1 * time.Second - - for precision := 1; precision <= 8; precision++ { - topicName := fmt.Sprintf("testNegativeAckPrecisionBitCnt-%d-%d", precision, time.Now().UnixNano()) - ctx := context.Background() - client, err := NewClient(ClientOptions{URL: lookupURL}) - assert.Nil(t, err) - defer client.Close() - - consumer, err := client.Subscribe(ConsumerOptions{ - Topic: topicName, - SubscriptionName: "sub-1", - Type: Shared, - NackRedeliveryDelay: delay, - NackPrecisionBit: int64(precision), - }) - assert.Nil(t, err) - defer consumer.Close() - - producer, err := client.CreateProducer(ProducerOptions{ - Topic: topicName, - }) - assert.Nil(t, err) - defer producer.Close() - - // Send single message - content := "test-0" - _, err = producer.Send(ctx, &ProducerMessage{ - Payload: []byte(content), - }) - assert.Nil(t, err) - - // Receive and send negative ack - msg, err := consumer.Receive(ctx) - assert.Nil(t, err) - assert.Equal(t, content, string(msg.Payload())) - consumer.Nack(msg) - - // Calculate expected redelivery window - expectedRedelivery := time.Now().Add(delay) - deviation := time.Duration(int64(1) << precision) * time.Millisecond - - // Wait for redelivery - redelivered, err := consumer.Receive(ctx) - assert.Nil(t, err) - assert.Equal(t, content, string(redelivered.Payload())) - - now := time.Now() - // Assert that redelivery happens >= expected - deviation - assert.GreaterOrEqual(t, now.UnixMilli(), expectedRedelivery.UnixMilli()-deviation.Milliseconds()) - - consumer.Ack(redelivered) - } + const delay = 1 * time.Second + + for precision := 1; precision <= 8; precision++ { + topicName := fmt.Sprintf("testNegativeAckPrecisionBitCnt-%d-%d", precision, time.Now().UnixNano()) + ctx := context.Background() + client, err := NewClient(ClientOptions{URL: lookupURL}) + assert.Nil(t, err) + defer client.Close() + + consumer, err := client.Subscribe(ConsumerOptions{ + Topic: topicName, + SubscriptionName: "sub-1", + Type: Shared, + NackRedeliveryDelay: delay, + NackPrecisionBit: int64(precision), + }) + assert.Nil(t, err) + defer consumer.Close() + + producer, err := client.CreateProducer(ProducerOptions{ + Topic: topicName, + }) + assert.Nil(t, err) + defer producer.Close() + + // Send single message + content := "test-0" + _, err = producer.Send(ctx, &ProducerMessage{ + Payload: []byte(content), + }) + assert.Nil(t, err) + + // Receive and send negative ack + msg, err := consumer.Receive(ctx) + assert.Nil(t, err) + assert.Equal(t, content, string(msg.Payload())) + consumer.Nack(msg) + + // Calculate expected redelivery window + expectedRedelivery := time.Now().Add(delay) + deviation := time.Duration(int64(1)<= expected - deviation + assert.GreaterOrEqual(t, now.UnixMilli(), expectedRedelivery.UnixMilli()-deviation.Milliseconds()) + + consumer.Ack(redelivered) + } } func TestConsumerCompression(t *testing.T) { diff --git a/pulsar/negative_acks_tracker.go b/pulsar/negative_acks_tracker.go index 962f8b990e..f4e250ed3a 100644 --- a/pulsar/negative_acks_tracker.go +++ b/pulsar/negative_acks_tracker.go @@ -22,35 +22,35 @@ import ( "time" "github.com/RoaringBitmap/roaring/v2/roaring64" - "github.com/emirpasic/gods/trees/avltree" log "github.com/apache/pulsar-client-go/pulsar/log" + "github.com/emirpasic/gods/trees/avltree" ) type redeliveryConsumer interface { Redeliver(msgIDs []messageID) } -type LedgerId = int64 +type LedgerID = int64 type negativeAcksTracker struct { sync.Mutex - doneCh chan interface{} - doneOnce sync.Once - negativeAcks *avltree.Tree + doneCh chan interface{} + doneOnce sync.Once + negativeAcks *avltree.Tree nackPrecisionBit int64 - rc redeliveryConsumer - nackBackoff NackBackoffPolicy - tick *time.Ticker - delay time.Duration - log log.Logger + rc redeliveryConsumer + nackBackoff NackBackoffPolicy + tick *time.Ticker + delay time.Duration + log log.Logger } func newNegativeAcksTracker(rc redeliveryConsumer, delay time.Duration, nackBackoffPolicy NackBackoffPolicy, logger log.Logger, nackPrecisionBit int64) *negativeAcksTracker { t := &negativeAcksTracker{ - doneCh: make(chan interface{}), + doneCh: make(chan interface{}), negativeAcks: avltree.NewWith(func(a, b interface{}) int { // compare time.Time timeA := a.(time.Time) @@ -62,10 +62,10 @@ func newNegativeAcksTracker(rc redeliveryConsumer, delay time.Duration, return 1 } return 0 - }), - rc: rc, - nackBackoff: nackBackoffPolicy, - log: logger, + }), + rc: rc, + nackBackoff: nackBackoffPolicy, + log: logger, nackPrecisionBit: nackPrecisionBit, } @@ -107,11 +107,11 @@ func (t *negativeAcksTracker) Add(msgID *messageID) { // try get trimmedTime value, exists := t.negativeAcks.Get(trimmedTime) if !exists { - newMap := make(map[LedgerId]*roaring64.Bitmap) + newMap := make(map[LedgerID]*roaring64.Bitmap) t.negativeAcks.Put(trimmedTime, newMap) value = newMap } - bitmapMap := value.(map[LedgerId]*roaring64.Bitmap) + bitmapMap := value.(map[LedgerID]*roaring64.Bitmap) if _, exists := bitmapMap[batchMsgID.ledgerID]; !exists { bitmapMap[batchMsgID.ledgerID] = roaring64.NewBitmap() } @@ -139,11 +139,11 @@ func (t *negativeAcksTracker) AddMessage(msg Message) { // try get trimmedTime value, exists := t.negativeAcks.Get(trimmedTime) if !exists { - newMap := make(map[LedgerId]*roaring64.Bitmap) + newMap := make(map[LedgerID]*roaring64.Bitmap) t.negativeAcks.Put(trimmedTime, newMap) value = newMap } - bitmapMap := value.(map[LedgerId]*roaring64.Bitmap) + bitmapMap := value.(map[LedgerID]*roaring64.Bitmap) if _, exists := bitmapMap[batchMsgID.ledgerID]; !exists { bitmapMap[batchMsgID.ledgerID] = roaring64.NewBitmap() } @@ -167,12 +167,12 @@ func (t *negativeAcksTracker) track() { iterator := t.negativeAcks.Iterator() for iterator.Next() { targetTime := iterator.Key().(time.Time) - // because use ordered map, so we can early break + // because use ordered map, so we can early break if targetTime.After(now) { break } - ledgerMap := iterator.Value().(map[LedgerId]*roaring64.Bitmap) + ledgerMap := iterator.Value().(map[LedgerID]*roaring64.Bitmap) for ledgerID, entrySet := range ledgerMap { for _, entryID := range entrySet.ToArray() { msgID := messageID{ From efdd3a1e4406e1b94776deb494d943d7f4d1c189 Mon Sep 17 00:00:00 2001 From: gydeng Date: Mon, 15 Sep 2025 12:33:55 +0000 Subject: [PATCH 04/10] fix: fix issues with TestConsumerNack --- pulsar/consumer_impl.go | 4 ++++ pulsar/consumer_test.go | 7 ++---- pulsar/negative_acks_tracker.go | 42 ++++++++++++--------------------- 3 files changed, 21 insertions(+), 32 deletions(-) diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go index 17e9f4f7bb..0a6195ec6c 100644 --- a/pulsar/consumer_impl.go +++ b/pulsar/consumer_impl.go @@ -121,6 +121,10 @@ func newConsumer(client *client, options ConsumerOptions) (Consumer, error) { options.NackBackoffPolicy = new(defaultNackBackoffPolicy) } + if options.NackPrecisionBit <= 0 { + options.NackPrecisionBit = 8 + } + // did the user pass in a message channel? messageCh := options.MessageChannel if options.MessageChannel == nil { diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go index 8b79481c46..7c227b82e6 100644 --- a/pulsar/consumer_test.go +++ b/pulsar/consumer_test.go @@ -1229,11 +1229,8 @@ func TestConsumerNack(t *testing.T) { _, err = fmt.Sscanf(string(msg.Payload()), "msg-content-%d", &id) assert.Nil(t, err) - // Count only odd message IDs - if id%2 == 1 { - assert.True(t, id%2 == 1) // Optional check, included for clarity - receivedOdd++ - } + assert.True(t, id%2 == 1) + receivedOdd++ // Acknowledge message to mark it as processed consumer.Ack(msg) diff --git a/pulsar/negative_acks_tracker.go b/pulsar/negative_acks_tracker.go index f4e250ed3a..d1a030ed0a 100644 --- a/pulsar/negative_acks_tracker.go +++ b/pulsar/negative_acks_tracker.go @@ -90,19 +90,11 @@ func trimLowerBit(ts int64, precisionBit int64) int64 { return ts & mask } -func (t *negativeAcksTracker) Add(msgID *messageID) { - // Always clear up the batch index since we want to track the nack - // for the entire batch - batchMsgID := messageID{ - ledgerID: msgID.ledgerID, - entryID: msgID.entryID, - batchIdx: 0, - } - +func putNackEntry(t *negativeAcksTracker, batchMsgID *messageID, delay time.Duration) { t.Lock() defer t.Unlock() - targetTime := time.Now().Add(t.delay) + targetTime := time.Now().Add(delay) trimmedTime := time.UnixMilli(trimLowerBit(targetTime.UnixMilli(), t.nackPrecisionBit)) // try get trimmedTime value, exists := t.negativeAcks.Get(trimmedTime) @@ -118,6 +110,18 @@ func (t *negativeAcksTracker) Add(msgID *messageID) { bitmapMap[batchMsgID.ledgerID].Add(uint64(batchMsgID.entryID)) } +func (t *negativeAcksTracker) Add(msgID *messageID) { + // Always clear up the batch index since we want to track the nack + // for the entire batch + batchMsgID := messageID{ + ledgerID: msgID.ledgerID, + entryID: msgID.entryID, + batchIdx: 0, + } + + putNackEntry(t, &batchMsgID, t.delay) +} + func (t *negativeAcksTracker) AddMessage(msg Message) { nackBackoffDelay := t.nackBackoff.Next(msg.RedeliveryCount()) @@ -131,23 +135,7 @@ func (t *negativeAcksTracker) AddMessage(msg Message) { batchIdx: 0, } - t.Lock() - defer t.Unlock() - - targetTime := time.Now().Add(nackBackoffDelay) - trimmedTime := time.UnixMilli(trimLowerBit(targetTime.UnixMilli(), t.nackPrecisionBit)) - // try get trimmedTime - value, exists := t.negativeAcks.Get(trimmedTime) - if !exists { - newMap := make(map[LedgerID]*roaring64.Bitmap) - t.negativeAcks.Put(trimmedTime, newMap) - value = newMap - } - bitmapMap := value.(map[LedgerID]*roaring64.Bitmap) - if _, exists := bitmapMap[batchMsgID.ledgerID]; !exists { - bitmapMap[batchMsgID.ledgerID] = roaring64.NewBitmap() - } - bitmapMap[batchMsgID.ledgerID].Add(uint64(batchMsgID.entryID)) + putNackEntry(t, &batchMsgID, nackBackoffDelay) } func (t *negativeAcksTracker) track() { From 755933628a89e9bc9bde8dfb3c423217e7c8696b Mon Sep 17 00:00:00 2001 From: gydeng Date: Fri, 26 Sep 2025 08:45:01 +0000 Subject: [PATCH 05/10] chore: revert changes to TestConsumerNack --- pulsar/consumer.go | 2 +- pulsar/consumer_impl.go | 5 +++-- pulsar/consumer_partition.go | 2 +- pulsar/consumer_test.go | 22 ++++----------------- pulsar/consumer_zero_queue_test.go | 1 - pulsar/negative_acks_tracker.go | 29 ++++++++++++++++++---------- pulsar/negative_acks_tracker_test.go | 6 +++--- 7 files changed, 31 insertions(+), 36 deletions(-) diff --git a/pulsar/consumer.go b/pulsar/consumer.go index e5dbf21574..7996335a82 100644 --- a/pulsar/consumer.go +++ b/pulsar/consumer.go @@ -194,7 +194,7 @@ type ConsumerOptions struct { // NackPrecisionBit specifies the precision bit for nack redelivery delay. // This is used to trim the lower bits of the nack redelivery delay to reduce memory usage. // Default is 8 bits. - NackPrecisionBit int64 + NackPrecisionBit *int64 // Name specifies the consumer name. Name string diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go index 0a6195ec6c..e34435b135 100644 --- a/pulsar/consumer_impl.go +++ b/pulsar/consumer_impl.go @@ -121,8 +121,9 @@ func newConsumer(client *client, options ConsumerOptions) (Consumer, error) { options.NackBackoffPolicy = new(defaultNackBackoffPolicy) } - if options.NackPrecisionBit <= 0 { - options.NackPrecisionBit = 8 + if options.NackPrecisionBit == nil || *options.NackPrecisionBit < 0 { + defaultVal := int64(8) + options.NackPrecisionBit = &defaultVal } // did the user pass in a message channel? diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go index 624eb836d7..bfd44b1837 100644 --- a/pulsar/consumer_partition.go +++ b/pulsar/consumer_partition.go @@ -104,7 +104,7 @@ type partitionConsumerOpts struct { autoReceiverQueueSize bool nackRedeliveryDelay time.Duration nackBackoffPolicy NackBackoffPolicy - nackPrecisionBit int64 + nackPrecisionBit *int64 metadata map[string]string subProperties map[string]string replicateSubscriptionState bool diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go index 92a9bcd9bf..e56845b4ce 100644 --- a/pulsar/consumer_test.go +++ b/pulsar/consumer_test.go @@ -1185,7 +1185,6 @@ func TestConsumerNack(t *testing.T) { SubscriptionName: "sub-1", Type: Shared, NackRedeliveryDelay: 1 * time.Second, - NackPrecisionBit: 8, }) assert.Nil(t, err) defer consumer.Close() @@ -1217,27 +1216,13 @@ func TestConsumerNack(t *testing.T) { // Failed messages should be resent // We should only receive the odd messages - receivedOdd := 0 - expectedOdd := (N + 1) / 2 // Expected number of odd message IDs - - for receivedOdd < expectedOdd { + for i := 1; i < N; i += 2 { msg, err := consumer.Receive(ctx) assert.Nil(t, err) + assert.Equal(t, fmt.Sprintf("msg-content-%d", i), string(msg.Payload())) - // Extract message ID from the payload (e.g., "msg-content-15") - var id int - _, err = fmt.Sscanf(string(msg.Payload()), "msg-content-%d", &id) - assert.Nil(t, err) - - assert.True(t, id%2 == 1) - receivedOdd++ - - // Acknowledge message to mark it as processed consumer.Ack(msg) } - - // Verify that the correct number of odd messages were received - assert.Equal(t, expectedOdd, receivedOdd) } func TestNegativeAckPrecisionBitCnt(t *testing.T) { @@ -1250,12 +1235,13 @@ func TestNegativeAckPrecisionBitCnt(t *testing.T) { assert.Nil(t, err) defer client.Close() + precision := int64(precision) consumer, err := client.Subscribe(ConsumerOptions{ Topic: topicName, SubscriptionName: "sub-1", Type: Shared, NackRedeliveryDelay: delay, - NackPrecisionBit: int64(precision), + NackPrecisionBit: &precision, }) assert.Nil(t, err) defer consumer.Close() diff --git a/pulsar/consumer_zero_queue_test.go b/pulsar/consumer_zero_queue_test.go index a0817c9d20..d1cfeb016d 100644 --- a/pulsar/consumer_zero_queue_test.go +++ b/pulsar/consumer_zero_queue_test.go @@ -553,7 +553,6 @@ func TestZeroQueueConsumer_Nack(t *testing.T) { Type: Shared, NackRedeliveryDelay: 1 * time.Second, EnableZeroQueueConsumer: true, - NackPrecisionBit: 8, }) assert.Nil(t, err) _, ok := consumer.(*zeroQueueConsumer) diff --git a/pulsar/negative_acks_tracker.go b/pulsar/negative_acks_tracker.go index d1a030ed0a..f3d87f8b50 100644 --- a/pulsar/negative_acks_tracker.go +++ b/pulsar/negative_acks_tracker.go @@ -38,7 +38,7 @@ type negativeAcksTracker struct { doneCh chan interface{} doneOnce sync.Once negativeAcks *avltree.Tree - nackPrecisionBit int64 + nackPrecisionBit *int64 rc redeliveryConsumer nackBackoff NackBackoffPolicy tick *time.Ticker @@ -47,21 +47,26 @@ type negativeAcksTracker struct { } func newNegativeAcksTracker(rc redeliveryConsumer, delay time.Duration, - nackBackoffPolicy NackBackoffPolicy, logger log.Logger, nackPrecisionBit int64) *negativeAcksTracker { + nackBackoffPolicy NackBackoffPolicy, logger log.Logger, nackPrecisionBit *int64) *negativeAcksTracker { t := &negativeAcksTracker{ doneCh: make(chan interface{}), negativeAcks: avltree.NewWith(func(a, b interface{}) int { - // compare time.Time - timeA := a.(time.Time) - timeB := b.(time.Time) + // Perform type assertions and handle invalid types. + timeA, okA := a.(time.Time) + timeB, okB := b.(time.Time) + + if !okA || !okB { + panic("invalid type: both values must be of type time.Time") + } + + // Compare the two time.Time values. if timeA.Before(timeB) { return -1 - } - if timeA.After(timeB) { + } else if timeA.After(timeB) { return 1 } - return 0 + return 0 // Equal times. }), rc: rc, nackBackoff: nackBackoffPolicy, @@ -95,7 +100,7 @@ func putNackEntry(t *negativeAcksTracker, batchMsgID *messageID, delay time.Dura defer t.Unlock() targetTime := time.Now().Add(delay) - trimmedTime := time.UnixMilli(trimLowerBit(targetTime.UnixMilli(), t.nackPrecisionBit)) + trimmedTime := time.UnixMilli(trimLowerBit(targetTime.UnixMilli(), *t.nackPrecisionBit)) // try get trimmedTime value, exists := t.negativeAcks.Get(trimmedTime) if !exists { @@ -103,7 +108,11 @@ func putNackEntry(t *negativeAcksTracker, batchMsgID *messageID, delay time.Dura t.negativeAcks.Put(trimmedTime, newMap) value = newMap } - bitmapMap := value.(map[LedgerID]*roaring64.Bitmap) + bitmapMap, ok := value.(map[LedgerID]*roaring64.Bitmap) + if !ok { + t.log.Errorf("negativeAcksTracker: value for time %v is not of expected type map[LedgerID]*roaring64.Bitmap", trimmedTime) + return + } if _, exists := bitmapMap[batchMsgID.ledgerID]; !exists { bitmapMap[batchMsgID.ledgerID] = roaring64.NewBitmap() } diff --git a/pulsar/negative_acks_tracker_test.go b/pulsar/negative_acks_tracker_test.go index d025f32363..29caa31688 100644 --- a/pulsar/negative_acks_tracker_test.go +++ b/pulsar/negative_acks_tracker_test.go @@ -80,7 +80,7 @@ func (nmc *nackMockedConsumer) Wait() <-chan messageID { func TestNacksTracker(t *testing.T) { nmc := newNackMockedConsumer(nil) - nacks := newNegativeAcksTracker(nmc, testNackDelay, nil, log.DefaultNopLogger(), 8) + nacks := newNegativeAcksTracker(nmc, testNackDelay, nil, log.DefaultNopLogger(), nil) nacks.Add(&messageID{ ledgerID: 1, @@ -113,7 +113,7 @@ func TestNacksTracker(t *testing.T) { func TestNacksWithBatchesTracker(t *testing.T) { nmc := newNackMockedConsumer(nil) - nacks := newNegativeAcksTracker(nmc, testNackDelay, nil, log.DefaultNopLogger(), 8) + nacks := newNegativeAcksTracker(nmc, testNackDelay, nil, log.DefaultNopLogger(), nil) nacks.Add(&messageID{ ledgerID: 1, @@ -156,7 +156,7 @@ func TestNacksWithBatchesTracker(t *testing.T) { func TestNackBackoffTracker(t *testing.T) { nmc := newNackMockedConsumer(new(defaultNackBackoffPolicy)) - nacks := newNegativeAcksTracker(nmc, testNackDelay, new(defaultNackBackoffPolicy), log.DefaultNopLogger(), 8) + nacks := newNegativeAcksTracker(nmc, testNackDelay, new(defaultNackBackoffPolicy), log.DefaultNopLogger(), nil) nacks.AddMessage(new(mockMessage1)) nacks.AddMessage(new(mockMessage2)) From ff59841a591a0a66d79e25ca86ad0f3359cf088b Mon Sep 17 00:00:00 2001 From: gydeng Date: Fri, 26 Sep 2025 11:44:07 +0000 Subject: [PATCH 06/10] fix: add defaultNackPrecisionBitVal --- pulsar/consumer_impl.go | 3 +-- pulsar/negative_acks_tracker.go | 5 ++++- pulsar/negative_acks_tracker_test.go | 9 ++++++--- 3 files changed, 11 insertions(+), 6 deletions(-) diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go index e34435b135..f1bbd8d541 100644 --- a/pulsar/consumer_impl.go +++ b/pulsar/consumer_impl.go @@ -122,8 +122,7 @@ func newConsumer(client *client, options ConsumerOptions) (Consumer, error) { } if options.NackPrecisionBit == nil || *options.NackPrecisionBit < 0 { - defaultVal := int64(8) - options.NackPrecisionBit = &defaultVal + options.NackPrecisionBit = &defaultNackPrecisionBit } // did the user pass in a message channel? diff --git a/pulsar/negative_acks_tracker.go b/pulsar/negative_acks_tracker.go index f3d87f8b50..c289bfc52a 100644 --- a/pulsar/negative_acks_tracker.go +++ b/pulsar/negative_acks_tracker.go @@ -110,7 +110,8 @@ func putNackEntry(t *negativeAcksTracker, batchMsgID *messageID, delay time.Dura } bitmapMap, ok := value.(map[LedgerID]*roaring64.Bitmap) if !ok { - t.log.Errorf("negativeAcksTracker: value for time %v is not of expected type map[LedgerID]*roaring64.Bitmap", trimmedTime) + t.log.Errorf("negativeAcksTracker: value for time %v is not of expected type map[LedgerID]*roaring64.Bitmap", + trimmedTime) return } if _, exists := bitmapMap[batchMsgID.ledgerID]; !exists { @@ -202,3 +203,5 @@ func (t *negativeAcksTracker) Close() { t.doneCh <- nil }) } + +var defaultNackPrecisionBit = int64(8) diff --git a/pulsar/negative_acks_tracker_test.go b/pulsar/negative_acks_tracker_test.go index 29caa31688..7da7dc363c 100644 --- a/pulsar/negative_acks_tracker_test.go +++ b/pulsar/negative_acks_tracker_test.go @@ -29,6 +29,8 @@ import ( const testNackDelay = 300 * time.Millisecond +var testNackPrecisionBit = &defaultNackPrecisionBit + type nackMockedConsumer struct { ch chan messageID closed bool @@ -80,7 +82,7 @@ func (nmc *nackMockedConsumer) Wait() <-chan messageID { func TestNacksTracker(t *testing.T) { nmc := newNackMockedConsumer(nil) - nacks := newNegativeAcksTracker(nmc, testNackDelay, nil, log.DefaultNopLogger(), nil) + nacks := newNegativeAcksTracker(nmc, testNackDelay, nil, log.DefaultNopLogger(), testNackPrecisionBit) nacks.Add(&messageID{ ledgerID: 1, @@ -113,7 +115,7 @@ func TestNacksTracker(t *testing.T) { func TestNacksWithBatchesTracker(t *testing.T) { nmc := newNackMockedConsumer(nil) - nacks := newNegativeAcksTracker(nmc, testNackDelay, nil, log.DefaultNopLogger(), nil) + nacks := newNegativeAcksTracker(nmc, testNackDelay, nil, log.DefaultNopLogger(), testNackPrecisionBit) nacks.Add(&messageID{ ledgerID: 1, @@ -156,7 +158,8 @@ func TestNacksWithBatchesTracker(t *testing.T) { func TestNackBackoffTracker(t *testing.T) { nmc := newNackMockedConsumer(new(defaultNackBackoffPolicy)) - nacks := newNegativeAcksTracker(nmc, testNackDelay, new(defaultNackBackoffPolicy), log.DefaultNopLogger(), nil) + nacks := newNegativeAcksTracker(nmc, testNackDelay, new(defaultNackBackoffPolicy), log.DefaultNopLogger(), + testNackPrecisionBit) nacks.AddMessage(new(mockMessage1)) nacks.AddMessage(new(mockMessage2)) From 584c93fb9862216b28a0bab65049220cf4bc3ab0 Mon Sep 17 00:00:00 2001 From: gydeng Date: Fri, 26 Sep 2025 12:09:10 +0000 Subject: [PATCH 07/10] fix: revert changes to TestZeroQueueConsumer_Nack --- pulsar/consumer_zero_queue_test.go | 19 ++----------------- 1 file changed, 2 insertions(+), 17 deletions(-) diff --git a/pulsar/consumer_zero_queue_test.go b/pulsar/consumer_zero_queue_test.go index d1cfeb016d..06db433bd9 100644 --- a/pulsar/consumer_zero_queue_test.go +++ b/pulsar/consumer_zero_queue_test.go @@ -586,28 +586,13 @@ func TestZeroQueueConsumer_Nack(t *testing.T) { // Failed messages should be resent // We should only receive the odd messages - receivedOdd := 0 - expectedOdd := (N + 1) / 2 - - for receivedOdd < expectedOdd { + for i := 1; i < N; i += 2 { msg, err := consumer.Receive(ctx) assert.Nil(t, err) - - // Extract message ID - var id int - _, err = fmt.Sscanf(string(msg.Payload()), "msg-content-%d", &id) - assert.Nil(t, err) - - // Only accept odd message IDs - if id%2 == 1 { - receivedOdd++ - } + assert.Equal(t, fmt.Sprintf("msg-content-%d", i), string(msg.Payload())) consumer.Ack(msg) } - - // Assert we received the expected count of odd messages - assert.Equal(t, expectedOdd, receivedOdd) } func TestZeroQueueConsumer_Seek(t *testing.T) { From 61c14ea41661ac6453cf9e4a692539298534ff22 Mon Sep 17 00:00:00 2001 From: gydeng Date: Sun, 12 Oct 2025 05:55:38 +0000 Subject: [PATCH 08/10] add TestNackPrecisionBitDefaultBehavior --- pulsar/consumer_impl.go | 6 +- pulsar/consumer_test.go | 138 +++++++++++++++++++++++++++ pulsar/negative_acks_tracker.go | 8 +- pulsar/negative_acks_tracker_test.go | 2 +- 4 files changed, 147 insertions(+), 7 deletions(-) diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go index f1bbd8d541..7b1fcc3393 100644 --- a/pulsar/consumer_impl.go +++ b/pulsar/consumer_impl.go @@ -121,8 +121,10 @@ func newConsumer(client *client, options ConsumerOptions) (Consumer, error) { options.NackBackoffPolicy = new(defaultNackBackoffPolicy) } - if options.NackPrecisionBit == nil || *options.NackPrecisionBit < 0 { - options.NackPrecisionBit = &defaultNackPrecisionBit + if options.NackPrecisionBit == nil { + options.NackPrecisionBit = Ptr(defaultNackPrecisionBit) + } else if *options.NackPrecisionBit < 0 { + return nil, newError(InvalidConfiguration, "NackPrecisionBit cannot be negative") } // did the user pass in a message channel? diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go index e56845b4ce..0677a88326 100644 --- a/pulsar/consumer_test.go +++ b/pulsar/consumer_test.go @@ -1277,11 +1277,149 @@ func TestNegativeAckPrecisionBitCnt(t *testing.T) { now := time.Now() // Assert that redelivery happens >= expected - deviation assert.GreaterOrEqual(t, now.UnixMilli(), expectedRedelivery.UnixMilli()-deviation.Milliseconds()) + // since the client ticks at an interval of delay / 3 (i.e., 333 ms in this test), + // we add an extra 400 milliseconds to reduce the flaky + assert.LessOrEqual(t, now.UnixMilli(), expectedRedelivery.UnixMilli()+400) consumer.Ack(redelivered) } } +func TestNackPrecisionBitDefaultBehavior(t *testing.T) { + // Test that default NackPrecisionBit (8) behaves the same as explicitly setting it to 8 + // This test uses precise timing to verify that messages within 256ms window are grouped together + + const delay = 300 * time.Millisecond // Tracker scans every 100ms (delay/3) + ctx := context.Background() + + // Create two consumers with different topic names to avoid conflicts + topicNameDefault := fmt.Sprintf("testNackPrecisionBitDefault-default-%d", time.Now().UnixNano()) + topicNameExplicit := fmt.Sprintf("testNackPrecisionBitDefault-explicit-%d", time.Now().UnixNano()) + + client, err := NewClient(ClientOptions{URL: lookupURL}) + assert.Nil(t, err) + defer client.Close() + + // Consumer 1: Default NackPrecisionBit (should be 8) + consumerDefault, err := client.Subscribe(ConsumerOptions{ + Topic: topicNameDefault, + SubscriptionName: "sub-default", + Type: Shared, + NackRedeliveryDelay: delay, + // NackPrecisionBit is not set, should use default value of 8 + }) + assert.Nil(t, err) + defer consumerDefault.Close() + + // Consumer 2: Explicit NackPrecisionBit set to 8 + consumerExplicit, err := client.Subscribe(ConsumerOptions{ + Topic: topicNameExplicit, + SubscriptionName: "sub-explicit", + Type: Shared, + NackRedeliveryDelay: delay, + NackPrecisionBit: Ptr(defaultNackPrecisionBit), + }) + assert.Nil(t, err) + defer consumerExplicit.Close() + + // Create producers for both topics + producerDefault, err := client.CreateProducer(ProducerOptions{ + Topic: topicNameDefault, + }) + assert.Nil(t, err) + defer producerDefault.Close() + + producerExplicit, err := client.CreateProducer(ProducerOptions{ + Topic: topicNameExplicit, + }) + assert.Nil(t, err) + defer producerExplicit.Close() + + // Test function to verify precision bit behavior for a given consumer and producer + testPrecisionBitBehavior := func(consumer Consumer, producer Producer, topicName string) { + // Wait for next 256ms boundary to align timing + now := time.Now() + ms := now.UnixMilli() + nextBoundary := ((ms / 256) + 1) * 256 // Next 256ms boundary + waitTime := time.Duration(nextBoundary-ms) * time.Millisecond + time.Sleep(waitTime) + + // Send first message at 256ms boundary + content1 := fmt.Sprintf("msg1-%s", topicName) + _, err := producer.Send(ctx, &ProducerMessage{ + Payload: []byte(content1), + }) + assert.Nil(t, err) + + // Wait 200ms (within 256ms window) + time.Sleep(200 * time.Millisecond) + + // Send second message 200ms after first (still within 256ms window) + content2 := fmt.Sprintf("msg2-%s", topicName) + _, err = producer.Send(ctx, &ProducerMessage{ + Payload: []byte(content2), + }) + assert.Nil(t, err) + + // Receive both messages and nack them + msg1, err := consumer.Receive(ctx) + assert.Nil(t, err) + assert.Equal(t, content1, string(msg1.Payload())) + consumer.Nack(msg1) + + msg2, err := consumer.Receive(ctx) + assert.Nil(t, err) + assert.Equal(t, content2, string(msg2.Payload())) + consumer.Nack(msg2) + + // Record when nacks were sent + nackTime := time.Now() + expectedRedelivery := nackTime.Add(delay) + + // Wait for redeliveries - both messages should be redelivered together + // because they are within the 256ms precision bit window + redelivered1, err := consumer.Receive(ctx) + assert.Nil(t, err) + redeliveryTime1 := time.Now() + + redelivered2, err := consumer.Receive(ctx) + assert.Nil(t, err) + redeliveryTime2 := time.Now() + + // Verify both messages were redelivered + assert.True(t, string(redelivered1.Payload()) == content1 || string(redelivered1.Payload()) == content2, + "First redelivered message should be one of the original messages") + assert.True(t, string(redelivered2.Payload()) == content1 || string(redelivered2.Payload()) == content2, + "Second redelivered message should be one of the original messages") + assert.NotEqual(t, string(redelivered1.Payload()), string(redelivered2.Payload()), + "Redelivered messages should be different") + + // KEY TEST: Both messages should be redelivered simultaneously (within 10ms of each other to reduce flaky) + // because 256ms alignment groups both messages in the same redelivery interval + timeDiff := redeliveryTime2.Sub(redeliveryTime1) + assert.Less(t, timeDiff, 10*time.Millisecond, + "Both redelivered messages should arrive simultaneously (within 10ms) due to 256ms alignment "+ + "placing them in the same redelivery interval") + + // Redelivery should happen within expected window (considering 256ms precision) + minExpected := expectedRedelivery.Add(-256 * time.Millisecond) + maxExpected := expectedRedelivery.Add(150 * time.Millisecond) // Buffer for test stability + + assert.GreaterOrEqual(t, redeliveryTime1.UnixMilli(), minExpected.UnixMilli(), + "Redelivery should happen after minimum expected time") + assert.LessOrEqual(t, redeliveryTime2.UnixMilli(), maxExpected.UnixMilli(), + "Redelivery should happen before maximum expected time") + + // Acknowledge both messages + consumer.Ack(redelivered1) + consumer.Ack(redelivered2) + } + + // Test both consumers + testPrecisionBitBehavior(consumerDefault, producerDefault, "default") + testPrecisionBitBehavior(consumerExplicit, producerExplicit, "explicit") +} + func TestConsumerCompression(t *testing.T) { client, err := NewClient(ClientOptions{ URL: lookupURL, diff --git a/pulsar/negative_acks_tracker.go b/pulsar/negative_acks_tracker.go index c289bfc52a..6e4b3eb215 100644 --- a/pulsar/negative_acks_tracker.go +++ b/pulsar/negative_acks_tracker.go @@ -110,9 +110,7 @@ func putNackEntry(t *negativeAcksTracker, batchMsgID *messageID, delay time.Dura } bitmapMap, ok := value.(map[LedgerID]*roaring64.Bitmap) if !ok { - t.log.Errorf("negativeAcksTracker: value for time %v is not of expected type map[LedgerID]*roaring64.Bitmap", - trimmedTime) - return + panic("negativeAcksTracker: value is not of expected type map[LedgerID]*roaring64.Bitmap") } if _, exists := bitmapMap[batchMsgID.ledgerID]; !exists { bitmapMap[batchMsgID.ledgerID] = roaring64.NewBitmap() @@ -204,4 +202,6 @@ func (t *negativeAcksTracker) Close() { }) } -var defaultNackPrecisionBit = int64(8) +func Ptr[T any](v T) *T { return &v } + +const defaultNackPrecisionBit = int64(8) diff --git a/pulsar/negative_acks_tracker_test.go b/pulsar/negative_acks_tracker_test.go index 7da7dc363c..436f60c0db 100644 --- a/pulsar/negative_acks_tracker_test.go +++ b/pulsar/negative_acks_tracker_test.go @@ -29,7 +29,7 @@ import ( const testNackDelay = 300 * time.Millisecond -var testNackPrecisionBit = &defaultNackPrecisionBit +var testNackPrecisionBit = Ptr(defaultNackPrecisionBit) type nackMockedConsumer struct { ch chan messageID From 3961d960a045d5f26920b2a81978a817e54f239a Mon Sep 17 00:00:00 2001 From: gydeng Date: Tue, 14 Oct 2025 03:45:53 +0000 Subject: [PATCH 09/10] chore: update TestNegativeAckPrecisionBitCnt test --- pulsar/consumer_impl.go | 2 +- pulsar/consumer_test.go | 229 ++++++++------------------- pulsar/negative_acks_tracker.go | 2 +- pulsar/negative_acks_tracker_test.go | 2 +- 4 files changed, 68 insertions(+), 167 deletions(-) diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go index 7b1fcc3393..b3d3194cc6 100644 --- a/pulsar/consumer_impl.go +++ b/pulsar/consumer_impl.go @@ -122,7 +122,7 @@ func newConsumer(client *client, options ConsumerOptions) (Consumer, error) { } if options.NackPrecisionBit == nil { - options.NackPrecisionBit = Ptr(defaultNackPrecisionBit) + options.NackPrecisionBit = ptr(defaultNackPrecisionBit) } else if *options.NackPrecisionBit < 0 { return nil, newError(InvalidConfiguration, "NackPrecisionBit cannot be negative") } diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go index 0677a88326..50743a04ee 100644 --- a/pulsar/consumer_test.go +++ b/pulsar/consumer_test.go @@ -1226,198 +1226,99 @@ func TestConsumerNack(t *testing.T) { } func TestNegativeAckPrecisionBitCnt(t *testing.T) { - const delay = 1 * time.Second + // Validate behavior across precision bits and default (nil -> 8) + const delay = 300 * time.Millisecond // Tracker scans every 100ms (delay/3) + ctx := context.Background() - for precision := 1; precision <= 8; precision++ { - topicName := fmt.Sprintf("testNegativeAckPrecisionBitCnt-%d-%d", precision, time.Now().UnixNano()) - ctx := context.Background() - client, err := NewClient(ClientOptions{URL: lookupURL}) - assert.Nil(t, err) - defer client.Close() + client, err := NewClient(ClientOptions{URL: lookupURL}) + assert.Nil(t, err) + defer client.Close() - precision := int64(precision) + // Helper to verify behavior for a given NackPrecisionBit and boundary bits. + testPrecisionBitBehavior := func(nackPrecisionBit *int64, boundaryBits int64) { + // Create topic, consumer and producer inside the function + topicName := fmt.Sprintf("testNackPrecisionBit-%d-%d", boundaryBits, time.Now().UnixNano()) consumer, err := client.Subscribe(ConsumerOptions{ Topic: topicName, - SubscriptionName: "sub-1", + SubscriptionName: fmt.Sprintf("sub-%d", boundaryBits), Type: Shared, NackRedeliveryDelay: delay, - NackPrecisionBit: &precision, + NackPrecisionBit: nackPrecisionBit, // can be nil for default behavior }) assert.Nil(t, err) defer consumer.Close() - producer, err := client.CreateProducer(ProducerOptions{ - Topic: topicName, - }) + producer, err := client.CreateProducer(ProducerOptions{Topic: topicName}) assert.Nil(t, err) defer producer.Close() - // Send single message - content := "test-0" - _, err = producer.Send(ctx, &ProducerMessage{ - Payload: []byte(content), - }) - assert.Nil(t, err) + // Align to the next window boundary based on boundaryBits + windowMs := int64(1) << boundaryBits + nowMs := time.Now().UnixMilli() + nextBoundaryMs := ((nowMs / windowMs) + 1) * windowMs // Next boundary + time.Sleep(time.Duration(nextBoundaryMs-nowMs) * time.Millisecond) - // Receive and send negative ack - msg, err := consumer.Receive(ctx) + // Send first message at the boundary + content1 := fmt.Sprintf("msg1-p%d", boundaryBits) + _, err = producer.Send(ctx, &ProducerMessage{Payload: []byte(content1)}) assert.Nil(t, err) - assert.Equal(t, content, string(msg.Payload())) - consumer.Nack(msg) - // Calculate expected redelivery window - expectedRedelivery := time.Now().Add(delay) - deviation := time.Duration(int64(1)<= expected - deviation - assert.GreaterOrEqual(t, now.UnixMilli(), expectedRedelivery.UnixMilli()-deviation.Milliseconds()) - // since the client ticks at an interval of delay / 3 (i.e., 333 ms in this test), - // we add an extra 400 milliseconds to reduce the flaky - assert.LessOrEqual(t, now.UnixMilli(), expectedRedelivery.UnixMilli()+400) - - consumer.Ack(redelivered) - } -} - -func TestNackPrecisionBitDefaultBehavior(t *testing.T) { - // Test that default NackPrecisionBit (8) behaves the same as explicitly setting it to 8 - // This test uses precise timing to verify that messages within 256ms window are grouped together - - const delay = 300 * time.Millisecond // Tracker scans every 100ms (delay/3) - ctx := context.Background() - - // Create two consumers with different topic names to avoid conflicts - topicNameDefault := fmt.Sprintf("testNackPrecisionBitDefault-default-%d", time.Now().UnixNano()) - topicNameExplicit := fmt.Sprintf("testNackPrecisionBitDefault-explicit-%d", time.Now().UnixNano()) - - client, err := NewClient(ClientOptions{URL: lookupURL}) - assert.Nil(t, err) - defer client.Close() - - // Consumer 1: Default NackPrecisionBit (should be 8) - consumerDefault, err := client.Subscribe(ConsumerOptions{ - Topic: topicNameDefault, - SubscriptionName: "sub-default", - Type: Shared, - NackRedeliveryDelay: delay, - // NackPrecisionBit is not set, should use default value of 8 - }) - assert.Nil(t, err) - defer consumerDefault.Close() - - // Consumer 2: Explicit NackPrecisionBit set to 8 - consumerExplicit, err := client.Subscribe(ConsumerOptions{ - Topic: topicNameExplicit, - SubscriptionName: "sub-explicit", - Type: Shared, - NackRedeliveryDelay: delay, - NackPrecisionBit: Ptr(defaultNackPrecisionBit), - }) - assert.Nil(t, err) - defer consumerExplicit.Close() - - // Create producers for both topics - producerDefault, err := client.CreateProducer(ProducerOptions{ - Topic: topicNameDefault, - }) - assert.Nil(t, err) - defer producerDefault.Close() - producerExplicit, err := client.CreateProducer(ProducerOptions{ - Topic: topicNameExplicit, - }) - assert.Nil(t, err) - defer producerExplicit.Close() - - // Test function to verify precision bit behavior for a given consumer and producer - testPrecisionBitBehavior := func(consumer Consumer, producer Producer, topicName string) { - // Wait for next 256ms boundary to align timing - now := time.Now() - ms := now.UnixMilli() - nextBoundary := ((ms / 256) + 1) * 256 // Next 256ms boundary - waitTime := time.Duration(nextBoundary-ms) * time.Millisecond - time.Sleep(waitTime) - - // Send first message at 256ms boundary - content1 := fmt.Sprintf("msg1-%s", topicName) - _, err := producer.Send(ctx, &ProducerMessage{ - Payload: []byte(content1), - }) + // Receive and nack both messages + m1, err := consumer.Receive(ctx) + assert.Nil(t, err) + assert.Equal(t, content1, string(m1.Payload())) + consumer.Nack(m1) + m2, err := consumer.Receive(ctx) assert.Nil(t, err) + assert.Equal(t, content2, string(m2.Payload())) + consumer.Nack(m2) - // Wait 200ms (within 256ms window) - time.Sleep(200 * time.Millisecond) + // Expected redelivery window considering precision and tracker tick + expected := time.Now().Add(delay) + deviation := time.Duration(windowMs) * time.Millisecond - // Send second message 200ms after first (still within 256ms window) - content2 := fmt.Sprintf("msg2-%s", topicName) - _, err = producer.Send(ctx, &ProducerMessage{ - Payload: []byte(content2), - }) + // Both should be redelivered in the same cycle + rm1, err := consumer.Receive(ctx) assert.Nil(t, err) - - // Receive both messages and nack them - msg1, err := consumer.Receive(ctx) + redeliveryTime1 := time.Now() + rm2, err := consumer.Receive(ctx) assert.Nil(t, err) - assert.Equal(t, content1, string(msg1.Payload())) - consumer.Nack(msg1) + redeliveryTime2 := time.Now() - msg2, err := consumer.Receive(ctx) - assert.Nil(t, err) - assert.Equal(t, content2, string(msg2.Payload())) - consumer.Nack(msg2) + // KEY TEST: Both messages should be redelivered simultaneously (within 1 ms) + // because precision alignment groups both messages in the same time bucket + // filter out precision bits < 5 because they are flaky, their time granularity is too small (i.e., 1 ms to 16 ms) + if boundaryBits >= 5 { + assert.InDelta(t, redeliveryTime1.UnixMilli(), redeliveryTime2.UnixMilli(), 1) + } - // Record when nacks were sent - nackTime := time.Now() - expectedRedelivery := nackTime.Add(delay) + // Redelivery should occur within [expected-window, expected+buffer] + minExpected := expected.Add(-deviation) + maxExpected := expected.Add(150 * time.Millisecond) + assert.GreaterOrEqual(t, redeliveryTime1.UnixMilli(), minExpected.UnixMilli()) + assert.LessOrEqual(t, redeliveryTime2.UnixMilli(), maxExpected.UnixMilli()) - // Wait for redeliveries - both messages should be redelivered together - // because they are within the 256ms precision bit window - redelivered1, err := consumer.Receive(ctx) - assert.Nil(t, err) - redeliveryTime1 := time.Now() + consumer.Ack(rm1) + consumer.Ack(rm2) + } - redelivered2, err := consumer.Receive(ctx) - assert.Nil(t, err) - redeliveryTime2 := time.Now() + // Run for precision bits 1...8 with matching boundary bits + for bits := int64(1); bits <= int64(8); bits++ { + t.Run(fmt.Sprintf("PrecisionBits=%d", bits), func(_ *testing.T) { + testPrecisionBitBehavior(ptr(bits), bits) + }) + } - // Verify both messages were redelivered - assert.True(t, string(redelivered1.Payload()) == content1 || string(redelivered1.Payload()) == content2, - "First redelivered message should be one of the original messages") - assert.True(t, string(redelivered2.Payload()) == content1 || string(redelivered2.Payload()) == content2, - "Second redelivered message should be one of the original messages") - assert.NotEqual(t, string(redelivered1.Payload()), string(redelivered2.Payload()), - "Redelivered messages should be different") - - // KEY TEST: Both messages should be redelivered simultaneously (within 10ms of each other to reduce flaky) - // because 256ms alignment groups both messages in the same redelivery interval - timeDiff := redeliveryTime2.Sub(redeliveryTime1) - assert.Less(t, timeDiff, 10*time.Millisecond, - "Both redelivered messages should arrive simultaneously (within 10ms) due to 256ms alignment "+ - "placing them in the same redelivery interval") - - // Redelivery should happen within expected window (considering 256ms precision) - minExpected := expectedRedelivery.Add(-256 * time.Millisecond) - maxExpected := expectedRedelivery.Add(150 * time.Millisecond) // Buffer for test stability - - assert.GreaterOrEqual(t, redeliveryTime1.UnixMilli(), minExpected.UnixMilli(), - "Redelivery should happen after minimum expected time") - assert.LessOrEqual(t, redeliveryTime2.UnixMilli(), maxExpected.UnixMilli(), - "Redelivery should happen before maximum expected time") - - // Acknowledge both messages - consumer.Ack(redelivered1) - consumer.Ack(redelivered2) - } - - // Test both consumers - testPrecisionBitBehavior(consumerDefault, producerDefault, "default") - testPrecisionBitBehavior(consumerExplicit, producerExplicit, "explicit") + // Default behavior (nil) should match precision bit 8 + t.Run("DefaultPrecisionBits=8", func(_ *testing.T) { + testPrecisionBitBehavior(nil, int64(8)) + }) } func TestConsumerCompression(t *testing.T) { diff --git a/pulsar/negative_acks_tracker.go b/pulsar/negative_acks_tracker.go index 6e4b3eb215..586fe72760 100644 --- a/pulsar/negative_acks_tracker.go +++ b/pulsar/negative_acks_tracker.go @@ -202,6 +202,6 @@ func (t *negativeAcksTracker) Close() { }) } -func Ptr[T any](v T) *T { return &v } +func ptr[T any](v T) *T { return &v } const defaultNackPrecisionBit = int64(8) diff --git a/pulsar/negative_acks_tracker_test.go b/pulsar/negative_acks_tracker_test.go index 436f60c0db..bf39534628 100644 --- a/pulsar/negative_acks_tracker_test.go +++ b/pulsar/negative_acks_tracker_test.go @@ -29,7 +29,7 @@ import ( const testNackDelay = 300 * time.Millisecond -var testNackPrecisionBit = Ptr(defaultNackPrecisionBit) +var testNackPrecisionBit = ptr(defaultNackPrecisionBit) type nackMockedConsumer struct { ch chan messageID From 560d86f98295e9145c8e58cc1701f264de4a1bb9 Mon Sep 17 00:00:00 2001 From: gydeng Date: Tue, 14 Oct 2025 07:20:17 +0000 Subject: [PATCH 10/10] fix: make TestNegativeAckPrecisionBitCnt more stable --- pulsar/consumer_test.go | 12 ++++++------ pulsar/negative_acks_tracker.go | 8 ++++---- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go index 50743a04ee..9f20ed8417 100644 --- a/pulsar/consumer_test.go +++ b/pulsar/consumer_test.go @@ -1263,8 +1263,8 @@ func TestNegativeAckPrecisionBitCnt(t *testing.T) { _, err = producer.Send(ctx, &ProducerMessage{Payload: []byte(content1)}) assert.Nil(t, err) - // Send second message around 1/2 into the window (still in same window) - time.Sleep(time.Duration(windowMs/2) * time.Millisecond) + // Send second message around 3/4 into the window (still in same window) + time.Sleep(time.Duration(windowMs*3/4) * time.Millisecond) content2 := fmt.Sprintf("msg2-p%d", boundaryBits) _, err = producer.Send(ctx, &ProducerMessage{Payload: []byte(content2)}) assert.Nil(t, err) @@ -1291,10 +1291,10 @@ func TestNegativeAckPrecisionBitCnt(t *testing.T) { assert.Nil(t, err) redeliveryTime2 := time.Now() - // KEY TEST: Both messages should be redelivered simultaneously (within 1 ms) - // because precision alignment groups both messages in the same time bucket - // filter out precision bits < 5 because they are flaky, their time granularity is too small (i.e., 1 ms to 16 ms) - if boundaryBits >= 5 { + // For both the default precision (nil) and precisionBit=8, boundaryBits is 8. + // This checks that the default precisionBit is correctly set to 8, + // and that its redelivery behavior matches a consumer explicitly configured with precisionBit=8. + if boundaryBits == 8 { assert.InDelta(t, redeliveryTime1.UnixMilli(), redeliveryTime2.UnixMilli(), 1) } diff --git a/pulsar/negative_acks_tracker.go b/pulsar/negative_acks_tracker.go index 586fe72760..ac9f08d790 100644 --- a/pulsar/negative_acks_tracker.go +++ b/pulsar/negative_acks_tracker.go @@ -30,7 +30,7 @@ type redeliveryConsumer interface { Redeliver(msgIDs []messageID) } -type LedgerID = int64 +type ledgerID = int64 type negativeAcksTracker struct { sync.Mutex @@ -104,11 +104,11 @@ func putNackEntry(t *negativeAcksTracker, batchMsgID *messageID, delay time.Dura // try get trimmedTime value, exists := t.negativeAcks.Get(trimmedTime) if !exists { - newMap := make(map[LedgerID]*roaring64.Bitmap) + newMap := make(map[ledgerID]*roaring64.Bitmap) t.negativeAcks.Put(trimmedTime, newMap) value = newMap } - bitmapMap, ok := value.(map[LedgerID]*roaring64.Bitmap) + bitmapMap, ok := value.(map[ledgerID]*roaring64.Bitmap) if !ok { panic("negativeAcksTracker: value is not of expected type map[LedgerID]*roaring64.Bitmap") } @@ -168,7 +168,7 @@ func (t *negativeAcksTracker) track() { break } - ledgerMap := iterator.Value().(map[LedgerID]*roaring64.Bitmap) + ledgerMap := iterator.Value().(map[ledgerID]*roaring64.Bitmap) for ledgerID, entrySet := range ledgerMap { for _, entryID := range entrySet.ToArray() { msgID := messageID{