-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathbroadcast.go
More file actions
74 lines (70 loc) · 1.7 KB
/
broadcast.go
File metadata and controls
74 lines (70 loc) · 1.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
package broadcast
import (
"sync"
"time"
)
// BroadCaster is a broadcast group: it keeps track of the channels of all
// joined members so that a message sent by one member can be delivered to the
// others. Members are added with Join.
type BroadCaster struct {
// mu guards receivers. Join takes the write lock to add or remove a member
// and the read lock while iterating over the members to deliver a message.
mu sync.RWMutex
// receivers is the set of member channels, stored as their send-only ends.
// The empty struct value carries no data; only the keys matter.
receivers map[chan<- interface{}]struct{}
}
// New returns an empty BroadCaster with no members, ready for Join to be
// called on it.
func New() *BroadCaster {
	b := new(BroadCaster)
	// The zero value of the embedded RWMutex is already usable; only the
	// member set has to be allocated, because writing to a nil map panics.
	b.receivers = make(map[chan<- interface{}]struct{})
	return b
}
// Join adds a new member to the broadcast group and returns its two channel
// ends: every value sent on sender is delivered to all other members, and
// values sent by other members arrive on receiver. A member never receives
// its own messages.
//
// The timeout bounds how long the delivery of one message may wait for slow or
// unresponsive members. The budget is shared by all recipients of that
// message: the member being waited on when it runs out misses the message,
// and the remaining members are only offered it without waiting, so any of
// them that is not ready to receive misses it too. A timeout of zero or less
// makes every delivery non-blocking.
//
// To leave the group, close sender. The member is then removed from the group
// and receiver is closed.
func (b *BroadCaster) Join(timeout time.Duration) (sender chan<- interface{}, receiver <-chan interface{}) {
	se, re := make(chan interface{}), make(chan interface{})

	b.mu.Lock()
	b.receivers[re] = struct{}{}
	b.mu.Unlock()

	go func() {
		// A timer is only needed when deliveries are allowed to block. Stopping
		// it when the goroutine exits releases it immediately instead of
		// leaving it pending until it would have fired.
		var timer *time.Timer
		if timeout > 0 {
			timer = time.NewTimer(timeout)
			defer timer.Stop()
		}

		for m := range se {
			// Each message starts out blocking (if a timeout was given) and
			// degrades to non-blocking once its time budget is spent.
			blocking := timer != nil
			if blocking {
				// Restart the budget for this message. If the timer already
				// fired, its channel may still hold the stale tick; drain it
				// without blocking, since it may also have been consumed below.
				if !timer.Stop() {
					select {
					case <-timer.C:
					default:
					}
				}
				timer.Reset(timeout)
			}

			// The read lock keeps members from being removed (and their
			// channels closed) while we are sending to them.
			b.mu.RLock()
			for r := range b.receivers {
				if r == re {
					// Do not echo the message back to its sender.
					continue
				}
				if blocking {
					select {
					case r <- m:
					case <-timer.C:
						// Budget exhausted: this member misses the message and
						// the remaining members are only tried without waiting.
						blocking = false
					}
					continue
				}
				select {
				case r <- m:
				default:
					// Member is not ready; drop the message for it.
				}
			}
			b.mu.RUnlock()
		}

		// The sender was closed: leave the group. Closing re under the write
		// lock guarantees no other member is sending to it at that moment.
		b.mu.Lock()
		close(re)
		delete(b.receivers, re)
		b.mu.Unlock()
	}()

	return se, re
}