-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathoperator_connectable.go
More file actions
235 lines (205 loc) · 7.84 KB
/
operator_connectable.go
File metadata and controls
235 lines (205 loc) · 7.84 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
// Copyright 2025 samber.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://github.com/samber/ro/blob/main/licenses/LICENSE.apache.md
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package ro
import (
"context"
"sync"
"sync/atomic"
)
// ShareConfig is the configuration for the Share operator.
type ShareConfig[T any] struct {
Connector func() Subject[T]
ResetOnError bool
ResetOnComplete bool
ResetOnRefCountZero bool
}
// Share creates a new Observable that multicasts (shares) the original
// Observable. As long as there is at least one subscription to the
// multicasted Observable, the source Observable will be subscribed and
// emitting data. When all subscribers have unsubscribed, the source
// Observable will be unsubscribed.
//
// This is an alias for ShareWithConfig with default configuration.
// Play: https://go.dev/play/p/C34fv02jAIH
func Share[T any]() func(Observable[T]) Observable[T] {
return ShareWithConfig(ShareConfig[T]{
Connector: defaultConnector[T],
ResetOnError: true,
ResetOnComplete: true,
ResetOnRefCountZero: true,
})
}
// ShareWithConfig creates a new Observable that multicasts (shares) the
// original Observable. As long as there is at least one subscription to the
// multicasted Observable, the source Observable will be subscribed and
// emitting data. When all subscribers have unsubscribed, the source
// Observable will be unsubscribed.
//
// The configuration allows to customize the behavior of the shared
// Observable:
// - `Connector` is a factory function that creates a new Subject for each
// subscription. The Subject can be any type of Subject, such as a
// ReplaySubject, a BehaviorSubject, a ReplaySubject, etc.
// - `ResetOnError` determines whether the shared Observable should be reset
// when an error is emitted.
// - `ResetOnComplete` determines whether the shared Observable should be reset
// when it completes.
// - `ResetOnRefCountZero` determines whether the shared Observable should be
// reset when the reference count reaches zero.
//
// Play: https://go.dev/play/p/C34fv02jAIH
func ShareWithConfig[T any](config ShareConfig[T]) func(Observable[T]) Observable[T] {
if config.Connector == nil {
panic(ErrConnectableObservableMissingConnectorFactory)
}
return func(source Observable[T]) Observable[T] {
// Subscriptions to `source` can be concurrent, so we protect shared
// objects against race conditions.
var mu sync.Mutex
// var subject atomic.Pointer[Subject[T]]
var subject Subject[T]
var sourceSubscription Subscription // subscription between the source and the subject
refCount := 0 // not an atomic counter, because it is protected by mutex
var hasBeenResetOnError int32 // atomic.Bool is not available in Go 1.18
var hasBeenResetOnCompletion int32 // atomic.Bool is not available in Go 1.18
// Unsafe: must be called in a mutex lock.
getOrCreateSubject := func() (Subject[T], Subscription, bool) {
if subject == nil || sourceSubscription == nil {
subject = config.Connector()
sourceSubscription = NewSubscription(nil)
return subject, sourceSubscription, true
}
return subject, sourceSubscription, false
}
// Unsafe: must be called in a mutex lock.
reset := func(currentSubject Subject[T], currentSourceSubscription Subscription) {
// never nil
currentSourceSubscription.Unsubscribe()
if currentSourceSubscription == sourceSubscription {
sourceSubscription = nil
}
if currentSubject == subject {
subject = nil
}
}
return NewObservableWithContext(func(subscriberCtx context.Context, destination Observer[T]) Teardown {
mu.Lock()
refCount++
// `currentSubject` is a backup (local reference) of `subject`
// to manipulate it even after reset.
currentSubject, currentSourceSubscription, createdSubject := getOrCreateSubject()
mu.Unlock()
// Expected to be non-blocking.
// This is the subscription between the subject and the new observer.
sub := currentSubject.SubscribeWithContext(subscriberCtx, destination)
if createdSubject {
atomic.StoreInt32(&hasBeenResetOnError, 0)
atomic.StoreInt32(&hasBeenResetOnCompletion, 0)
// We need to handle errors and completion so we added a
// proxy observer between source and subject.
proxy := NewSubscriber(
NewObserverWithContext(
currentSubject.NextWithContext,
func(ctx context.Context, err error) {
if config.ResetOnError {
mu.Lock()
reset(currentSubject, currentSourceSubscription)
mu.Unlock()
} else {
atomic.StoreInt32(&hasBeenResetOnError, 1)
}
currentSubject.ErrorWithContext(ctx, err)
},
func(ctx context.Context) {
if config.ResetOnComplete {
mu.Lock()
reset(currentSubject, currentSourceSubscription)
mu.Unlock()
} else {
atomic.StoreInt32(&hasBeenResetOnCompletion, 1)
}
currentSubject.CompleteWithContext(ctx)
},
),
)
// Subscription between the source and the subject.
sourceSubscription.AddUnsubscribable(
source.SubscribeWithContext(subscriberCtx, proxy),
)
}
return func() {
sub.Unsubscribe()
mu.Lock()
refCount--
if config.ResetOnRefCountZero {
if refCount == 0 && atomic.LoadInt32(&hasBeenResetOnError) == 0 && atomic.LoadInt32(&hasBeenResetOnCompletion) == 0 {
reset(currentSubject, currentSourceSubscription)
}
}
mu.Unlock()
}
})
}
}
// ShareReplayConfig is the configuration for the ShareReplay operator.
type ShareReplayConfig struct {
ResetOnRefCountZero bool
}
// ShareReplay creates a new Observable that multicasts (shares) the original
// Observable and replays a specified number of items to any future
// subscribers. As long as there is at least one subscription to the
// multicasted Observable, the source Observable will be subscribed and
// emitting data. When all subscribers have unsubscribed, the source
// Observable will be unsubscribed.
//
// This is an alias for ShareReplayWithConfig with default configuration.
// Play: https://go.dev/play/p/QmsDbChzRgu
func ShareReplay[T any](bufferSize int) func(Observable[T]) Observable[T] {
return ShareWithConfig(
ShareConfig[T]{
Connector: func() Subject[T] {
return NewReplaySubject[T](bufferSize)
},
ResetOnError: true,
ResetOnComplete: false,
ResetOnRefCountZero: false,
},
)
}
// ShareReplayWithConfig creates a new Observable that multicasts (shares) the
// original Observable and replays a specified number of items to any future
// subscribers. As long as there is at least one subscription to the
// multicasted Observable, the source Observable will be subscribed and
// emitting data. When all subscribers have unsubscribed, the source
// Observable will be unsubscribed.
//
// The configuration allows to customize the behavior of the shared
// Observable:
// - `bufferSize` is the number of items to replay to future subscribers.
// - `ResetOnRefCountZero` determines whether the shared Observable should be
// reset when the reference count reaches zero.
//
// Play: https://go.dev/play/p/QmsDbChzRgu
func ShareReplayWithConfig[T any](bufferSize int, config ShareReplayConfig) func(Observable[T]) Observable[T] {
return ShareWithConfig(
ShareConfig[T]{
Connector: func() Subject[T] {
return NewReplaySubject[T](bufferSize)
},
ResetOnError: true,
ResetOnComplete: false,
ResetOnRefCountZero: config.ResetOnRefCountZero,
},
)
}