Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions pulsar/internal/channel_cond.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ func (c *chCond) wait() {
}

// waitWithContext Same as wait() call, but the end condition can also be controlled through the context.
// It blocks until either a broadcast occurs or the context is done.
func (c *chCond) waitWithContext(ctx context.Context) bool {
n := c.notifyChan()
c.L.Unlock()
Expand All @@ -56,8 +57,6 @@ func (c *chCond) waitWithContext(ctx context.Context) bool {
return true
case <-ctx.Done():
return false
default:
return true
}
}

Expand Down
35 changes: 35 additions & 0 deletions pulsar/internal/channel_cond_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,3 +53,38 @@ func TestChCondWithContext(_ *testing.T) {
cancel()
wg.Wait()
}

func TestChCondWithContextBlocks(t *testing.T) {
// Verify that waitWithContext actually blocks (does not return via a default case)
// until either a broadcast or context cancellation occurs.
cond := newCond(&sync.Mutex{})
started := make(chan struct{})
done := make(chan struct{})

go func() {
cond.L.Lock()
close(started)
cond.waitWithContext(context.Background())
cond.L.Unlock()
close(done)
}()

<-started
// Give the goroutine time to enter the select. If there were a default case,
// it would return immediately and close done before the broadcast below.
time.Sleep(20 * time.Millisecond)

select {
case <-done:
t.Fatal("waitWithContext returned before broadcast — default case must have fired")
default:
}

cond.broadcast()

select {
case <-done:
case <-time.After(time.Second):
t.Fatal("waitWithContext did not unblock after broadcast")
}
}
Loading