diff --git a/pulsar/internal/channel_cond.go b/pulsar/internal/channel_cond.go index 38301abef8..94b303a1fa 100644 --- a/pulsar/internal/channel_cond.go +++ b/pulsar/internal/channel_cond.go @@ -47,6 +47,7 @@ func (c *chCond) wait() { } // waitWithContext Same as wait() call, but the end condition can also be controlled through the context. +// It blocks until either a broadcast occurs or the context is done. func (c *chCond) waitWithContext(ctx context.Context) bool { n := c.notifyChan() c.L.Unlock() @@ -56,8 +57,6 @@ func (c *chCond) waitWithContext(ctx context.Context) bool { return true case <-ctx.Done(): return false - default: - return true } } diff --git a/pulsar/internal/channel_cond_test.go b/pulsar/internal/channel_cond_test.go index 93a0408439..0a58f4fa6f 100644 --- a/pulsar/internal/channel_cond_test.go +++ b/pulsar/internal/channel_cond_test.go @@ -53,3 +53,38 @@ func TestChCondWithContext(_ *testing.T) { cancel() wg.Wait() } + +func TestChCondWithContextBlocks(t *testing.T) { + // Verify that waitWithContext actually blocks (does not return via a default case) + // until either a broadcast or context cancellation occurs. + cond := newCond(&sync.Mutex{}) + started := make(chan struct{}) + done := make(chan struct{}) + + go func() { + cond.L.Lock() + close(started) + cond.waitWithContext(context.Background()) + cond.L.Unlock() + close(done) + }() + + <-started + // Give the goroutine time to enter the select. If there were a default case, + // it would return immediately and close done before the broadcast below. + time.Sleep(20 * time.Millisecond) + + select { + case <-done: + t.Fatal("waitWithContext returned before broadcast — default case must have fired") + default: + } + + cond.broadcast() + + select { + case <-done: + case <-time.After(time.Second): + t.Fatal("waitWithContext did not unblock after broadcast") + } +}