-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathbuffercount.go
More file actions
34 lines (33 loc) · 879 Bytes
/
buffercount.go
File metadata and controls
34 lines (33 loc) · 879 Bytes
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package rx
// BufferCount collects the values of observable into slices of bufferSize
// items and emits every slice as soon as it is full. A new buffer is started
// every startBufferEvery source values:
//
//   - startBufferEvery == bufferSize: back-to-back buffers.
//   - startBufferEvery <  bufferSize: overlapping buffers (sliding window).
//   - startBufferEvery >  bufferSize: values between buffers are dropped.
//
// A non-positive startBufferEvery is treated as bufferSize. When the source
// completes, a partially filled buffer (if any) is emitted before completion.
// An error from the source is forwarded immediately and pending values are
// discarded. Every subscription gets its own buffer.
func BufferCount[T any](observable Observable[T], bufferSize, startBufferEvery int) Observable[[]T] {
	// A zero stride would never advance the window (the buffer would grow
	// without bound) and a negative one would panic when slicing, so fall
	// back to back-to-back buffers.
	if startBufferEvery <= 0 {
		startBufferEvery = bufferSize
	}
	// bufferSize is non-positive as well: nothing can ever be emitted, just
	// keep the buffer drained.
	if startBufferEvery <= 0 {
		startBufferEvery = 1
	}
	return func(observe Observer[[]T], scheduler Scheduler, subscriber Subscriber) {
		var buffer []T
		observer := func(next T, err error, done bool) {
			switch {
			case !done:
				buffer = append(buffer, next)
				n := len(buffer)
				if n >= bufferSize {
					if n == bufferSize {
						// Emit a copy: buffer's backing array is reused by
						// the shift below and by later appends.
						clone := append(make([]T, 0, n), buffer...)
						observe(clone, nil, false)
					}
					// With startBufferEvery > bufferSize the values past
					// bufferSize are only counted here, never emitted, until
					// the stride is reached and the window moves on.
					if n >= startBufferEvery {
						n = copy(buffer, buffer[startBufferEvery:])
						buffer = buffer[:n]
					}
				}
			case err != nil:
				observe(nil, err, true)
			default:
				n := len(buffer)
				// Only a partial buffer is still pending. A buffer holding
				// exactly bufferSize items was already emitted when it filled
				// up (it is merely waiting for the stride to elapse), so
				// emitting it again would duplicate it.
				if 0 < n && n < bufferSize {
					Of(buffer)(observe, scheduler, subscriber)
				} else {
					observe(nil, nil, true)
				}
			}
		}
		observable(observer, scheduler, subscriber)
	}
}