-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathindex.js
More file actions
69 lines (61 loc) · 2.28 KB
/
index.js
File metadata and controls
69 lines (61 loc) · 2.28 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
/**
 * Sentinel that take() resolves with once the channel is closed and has no
 * buffered messages left. It is a unique Symbol, so it cannot be confused
 * with a value that was put onto the channel.
 */
export const CLOSED = Symbol('CLOSED');

/**
 * Creates a simple promise-based channel. The size of the channel's buffer is
 * unbounded, so this should not be used in cases where messages are put onto
 * the channel faster than consumers can take messages off it.
 *
 * @return {Object} The new channel: a frozen object exposing `put`, `take`
 *                  and `close`.
 */
export function createChannel() {
  let isClosed = false;
  // Values that have been put but not yet taken, oldest first.
  const messageQueue = [];
  // Resolvers of pending take() promises, oldest first. Because put() hands a
  // value straight to a waiting taker and take() only waits when nothing is
  // buffered, at most one of the two queues is non-empty at any time.
  const resolveQueue = [];

  /**
   * Attempts to put a new `value` onto the channel. If a take() is already
   * waiting, the oldest waiting promise is resolved with the value;
   * otherwise the value is buffered.
   * @param {any} value The value to put onto the channel.
   * @return {Boolean} True if the value was put onto the channel. If the
   *                   channel is closed no value is put onto the channel, and
   *                   false is returned.
   */
  function put(value) {
    if (isClosed) {
      return false;
    }
    if (resolveQueue.length) {
      resolveQueue.shift()(value);
    } else {
      messageQueue.push(value);
    }
    return true;
  }

  /**
   * Attempts to take a value from the channel. A promise is returned that
   * will resolve to a value from the channel when one is available. If the
   * channel is closed and has finished draining, the returned promise
   * resolves with CLOSED. If the channel becomes closed while the promise is
   * still unfulfilled, the promise is resolved with CLOSED.
   * @return {Promise<any>} Promise that resolves with the next available
   *                        value from the channel, or with CLOSED if the
   *                        channel is closed, or becomes closed while
   *                        awaiting a value.
   */
  function take() {
    // Buffered messages are still delivered after close(); CLOSED is only
    // reported once the buffer is empty.
    if (isClosed && !messageQueue.length) {
      return Promise.resolve(CLOSED);
    }
    if (messageQueue.length) {
      return Promise.resolve(messageQueue.shift());
    }
    return new Promise((resolve) => resolveQueue.push(resolve));
  }

  /**
   * Closes the channel. Any outstanding take()s that are awaiting another
   * value from the channel will have their promises resolved with CLOSED.
   * @return {Boolean} True if the channel was closed, false if the channel
   *                   was already closed.
   */
  function close() {
    if (isClosed) {
      return false;
    }
    isClosed = true;
    // splice(0) removes every pending resolver from the queue and returns
    // them, so the closed channel no longer holds references to them.
    for (const resolve of resolveQueue.splice(0)) {
      resolve(CLOSED);
    }
    return true;
  }

  return Object.freeze({
    put,
    take,
    close,
  });
}