-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathbehaviorObservable.go
More file actions
105 lines (94 loc) · 2.58 KB
/
behaviorObservable.go
File metadata and controls
105 lines (94 loc) · 2.58 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
package rx
import (
"sync"
)
// BehaviorObservable represents an object that can emit a set of values over a
// period of time. This object is a special case as opposed to simple observable
// differs in that it emits the last value emitted to new subscribers upon
// subscription.
type BehaviorObservable struct {
baseObservable
currentValue interface{}
}
// NewBehaviorObservable creates new behavior observable.
func NewBehaviorObservable(source Observable, initialValue interface{}) Observable {
observable := BehaviorObservable{
baseObservable: baseObservable{
_source: source,
_input: make(chan interface{}),
register: make(chan chan<- interface{}),
unregister: make(chan chan<- interface{}),
outputs: make(map[chan<- interface{}]*outputMetadata),
mutex: &sync.RWMutex{},
},
currentValue: initialValue,
}
go observable.run()
return observable
}
// Subscribe registers Observer handlers for notifications it will emit.
func (o BehaviorObservable) Subscribe(handlers ...EventHandler) *Subscription {
sub := o.subscribe(true, handlers...)
var observable Observable = o
for observable.source() != nil {
observable = observable.source()
}
observable.input() <- message{ch: sub.channel, data: o.currentValue}
return sub
}
// Pipe is used to stitch together functional operators into a chain.
func (o BehaviorObservable) Pipe(operations ...OperatorFunction) Observable {
var observable Observable = o
for _, operation := range operations {
observable = operation(observable)
}
return observable
}
// ToPromise returns a awaitable channel which closes after receiving one
// notification.
func (o BehaviorObservable) ToPromise() func() chan interface{} {
channel := make(chan interface{})
sub := o.Subscribe(
NextFunc(func(item interface{}) {
channel <- item
}),
ErrorFunc(func(err error) {
channel <- err
}),
CompleteFunc(func() {
}),
)
var closed bool
return func() chan interface{} {
output := make(chan interface{})
func() {
value, ok := <-channel
if ok {
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer func() {
recover()
}()
wg.Done()
output <- value
}()
wg.Wait()
close(channel)
sub.Unsubscribe()
}
}()
if !closed {
closed = true
return output
}
close(output)
return output
}
}
// SetCurrentValue sets the current value of the observable which will be
// emitted to the next subscriber. This value mostly adjusts automatically
// based on the last emitted value.
func (o *BehaviorObservable) SetCurrentValue(value interface{}) {
o.currentValue = value
}