Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 62 additions & 6 deletions pulsar/blue_green_migration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package pulsar

import (
"context"
"errors"
"fmt"
"net/http"
"runtime"
Expand Down Expand Up @@ -148,6 +149,7 @@ func testTopicMigrate(
// Signals both producer and consumer have processed `messageCountBeforeUnload` messages
wgSendAndReceiveMessages := sync.WaitGroup{}
wgSendAndReceiveMessages.Add(2)
errCh := make(chan error, 1)
Comment thread
nodece marked this conversation as resolved.

// Producer
go func() {
Expand All @@ -159,14 +161,32 @@ func testTopicMigrate(
}

pm := ProducerMessage{Payload: []byte(fmt.Sprintf("hello-%d", i))}
retryStarted := time.Now()

for true {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
_, err := producer.Send(ctx, &pm)
err := func() error {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
_, err := producer.Send(ctx, &pm)
return err
}()
if err == nil {
break
}
if errors.Is(err, ErrTopicTerminated) || errors.Is(err, ErrProducerClosed) {
select {
case errCh <- fmt.Errorf("producer became terminal during migration at message %d: %w", i, err):
default:
}
Comment thread
BewareMyPower marked this conversation as resolved.
return
}
if time.Since(retryStarted) > 30*time.Second {
select {
case errCh <- fmt.Errorf("producer send retry exceeded 30s at message %d: %w", i, err):
default:
}
Comment thread
BewareMyPower marked this conversation as resolved.
return
}
time.Sleep(1000 * time.Millisecond)
}

Expand All @@ -185,16 +205,24 @@ func testTopicMigrate(
wgUnload.Wait()
}

retryStarted := time.Now()
for true {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
m, err := consumer.Receive(ctx)
cancel()
Comment thread
BewareMyPower marked this conversation as resolved.
if err == nil {
err = consumer.Ack(m)
if err == nil {
break
}
}
if time.Since(retryStarted) > 30*time.Second {
select {
case errCh <- fmt.Errorf("consumer receive/ack retry exceeded 30s at message %d: %w", i, err):
default:
}
Comment thread
BewareMyPower marked this conversation as resolved.
return
}
time.Sleep(100 * time.Millisecond)
}

Expand All @@ -205,7 +233,33 @@ func testTopicMigrate(
}()

// Unload the bundle, triggering the producers and consumers to reconnect to the specified broker.
wgSendAndReceiveMessages.Wait()
waitWithError := func(wg *sync.WaitGroup, stage string) bool {
doneCh := make(chan struct{})
go func() {
wg.Wait()
close(doneCh)
}()

select {
case <-doneCh:
select {
case err := <-errCh:
req.NoError(err, stage)
return false
default:
}
return true
case err := <-errCh:
req.NoError(err, stage)
return false
case <-time.After(90 * time.Second):
req.FailNow(stage + " timeout")
return false
}
}
if !waitWithError(&wgSendAndReceiveMessages, "waiting for pre-unload send/receive") {
return
}

topicMigrationURL = fmt.Sprintf(
"/admin/v2/clusters/%s/migrate?migrated=true", cluster)
Expand Down Expand Up @@ -248,5 +302,7 @@ func testTopicMigrate(
return req.Equal(dstTopicBrokerURL, topicBrokerURL)
})

wgRoutines.Wait()
if !waitWithError(&wgRoutines, "waiting for producer/consumer routines") {
return
}
}
Loading