From c23efc4412ee478f99be8e4ae7d794a935c182b8 Mon Sep 17 00:00:00 2001 From: Zixuan Liu Date: Wed, 20 May 2026 15:43:06 +0800 Subject: [PATCH 1/4] fix: avoid context leak in blue-green migration test Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- pulsar/blue_green_migration_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pulsar/blue_green_migration_test.go b/pulsar/blue_green_migration_test.go index 4cd2e4765c..2a4b4174e8 100644 --- a/pulsar/blue_green_migration_test.go +++ b/pulsar/blue_green_migration_test.go @@ -162,8 +162,8 @@ func testTopicMigrate( for true { ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) - defer cancel() _, err := producer.Send(ctx, &pm) + cancel() if err == nil { break } @@ -187,8 +187,8 @@ func testTopicMigrate( for true { ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) - defer cancel() m, err := consumer.Receive(ctx) + cancel() if err == nil { err = consumer.Ack(m) if err == nil { From e6a540d9d9709e50587d8f896af28f17900f420a Mon Sep 17 00:00:00 2001 From: Zixuan Liu Date: Wed, 20 May 2026 15:47:13 +0800 Subject: [PATCH 2/4] fix: avoid infinite retry in blue-green migration test Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- pulsar/blue_green_migration_test.go | 51 +++++++++++++++++++++++++++-- 1 file changed, 49 insertions(+), 2 deletions(-) diff --git a/pulsar/blue_green_migration_test.go b/pulsar/blue_green_migration_test.go index 2a4b4174e8..1690849b36 100644 --- a/pulsar/blue_green_migration_test.go +++ b/pulsar/blue_green_migration_test.go @@ -21,6 +21,7 @@ package pulsar import ( "context" + "errors" "fmt" "net/http" "runtime" @@ -148,6 +149,7 @@ func testTopicMigrate( // Signals both producer and consumer have processed `messageCountBeforeUnload` messages wgSendAndReceiveMessages := sync.WaitGroup{} wgSendAndReceiveMessages.Add(2) + errCh := make(chan error, 1) // Producer go func() { @@ -159,6 +161,7 @@ func testTopicMigrate( } pm := ProducerMessage{Payload: []byte(fmt.Sprintf("hello-%d", i))} + retryStarted := time.Now() for true { ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) @@ -167,6 +170,20 @@ func testTopicMigrate( if err == nil { break } + if errors.Is(err, ErrTopicTerminated) || errors.Is(err, ErrProducerClosed) { + select { + case errCh <- fmt.Errorf("producer became terminal during migration at message %d: %w", i, err): + default: + } + return + } + if time.Since(retryStarted) > 30*time.Second { + select { + case errCh <- fmt.Errorf("producer send retry exceeded 30s at message %d: %w", i, err): + default: + } + return + } time.Sleep(1000 * time.Millisecond) } @@ -185,6 +202,7 @@ func testTopicMigrate( wgUnload.Wait() } + retryStarted := time.Now() for true { ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) m, err := consumer.Receive(ctx) @@ -195,6 +213,13 @@ func testTopicMigrate( break } } + if time.Since(retryStarted) > 30*time.Second { + select { + case errCh <- fmt.Errorf("consumer receive/ack retry exceeded 30s at message %d: %w", i, err): + default: + } + return + } time.Sleep(100 * time.Millisecond) } @@ -205,7 +230,27 @@ func testTopicMigrate( }() // Unload the bundle, triggering the producers and consumers to reconnect to the specified broker. - wgSendAndReceiveMessages.Wait() + waitWithError := func(wg *sync.WaitGroup, stage string) bool { + doneCh := make(chan struct{}) + go func() { + wg.Wait() + close(doneCh) + }() + + select { + case <-doneCh: + return true + case err := <-errCh: + req.NoError(err, stage) + return false + case <-time.After(90 * time.Second): + req.FailNow(stage + " timeout") + return false + } + } + if !waitWithError(&wgSendAndReceiveMessages, "waiting for pre-unload send/receive") { + return + } topicMigrationURL = fmt.Sprintf( "/admin/v2/clusters/%s/migrate?migrated=true", cluster) @@ -248,5 +293,7 @@ func testTopicMigrate( return req.Equal(dstTopicBrokerURL, topicBrokerURL) }) - wgRoutines.Wait() + if !waitWithError(&wgRoutines, "waiting for producer/consumer routines") { + return + } } From 6c3464635b623b1d1cb51bbd0370d8507a1454ba Mon Sep 17 00:00:00 2001 From: Zixuan Liu Date: Mon, 1 Jun 2026 16:00:45 +0800 Subject: [PATCH 3/4] Address comment --- pulsar/blue_green_migration_test.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/pulsar/blue_green_migration_test.go b/pulsar/blue_green_migration_test.go index 1690849b36..badb9863a1 100644 --- a/pulsar/blue_green_migration_test.go +++ b/pulsar/blue_green_migration_test.go @@ -164,9 +164,12 @@ func testTopicMigrate( retryStarted := time.Now() for true { - ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) - _, err := producer.Send(ctx, &pm) - cancel() + err := func() error { + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + _, err := producer.Send(ctx, &pm) + return err + } if err == nil { break } From 750f4deaf2b2f5e5b620e68d2aff922399f69693 Mon Sep 17 00:00:00 2001 From: Zixuan Liu Date: Tue, 2 Jun 2026 16:30:31 +0800 Subject: [PATCH 4/4] Address comment --- pulsar/blue_green_migration_test.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/pulsar/blue_green_migration_test.go b/pulsar/blue_green_migration_test.go index badb9863a1..5c0f5af483 100644 --- a/pulsar/blue_green_migration_test.go +++ b/pulsar/blue_green_migration_test.go @@ -169,7 +169,7 @@ func testTopicMigrate( defer cancel() _, err := producer.Send(ctx, &pm) return err - } + }() if err == nil { break } @@ -242,6 +242,12 @@ func testTopicMigrate( select { case <-doneCh: + select { + case err := <-errCh: + req.NoError(err, stage) + return false + default: + } return true case err := <-errCh: req.NoError(err, stage)