-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathpipe.js
More file actions
68 lines (53 loc) · 1.23 KB
/
pipe.js
File metadata and controls
68 lines (53 loc) · 1.23 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
class Pipe {
#closed = false;
#resolves = [];
#promises = [];
#add() {
this.#promises.push(new Promise((res, rej) => {
this.#resolves.push([res, rej]);
}));
}
read() {
if (this.#closed) return Promise.reject("EOF: closed");
if (this.#resolves.length === 0) this.#add();
return new Promise((res, rej) => {
let resolve = this.#resolves.shift();
resolve[0]([res, rej]);
});
}
async write(ele) {
if (this.#closed) throw "EOF: closed";
if (this.#promises.length === 0) this.#add();
let promise = await this.#promises.shift();
promise[0](ele);
}
close() {
console.log("closing pipe");
this.#closed = true;
this.#resolves.forEach((res) => res[1]("EOF: closed"));
this.#resolves = [];
this.#promises.forEach((prm) => prm.then((res) => res[1]("EOF: closed")));
this.#promises = [];
}
}
class PipeReader {
#pipe;
constructor(pipe) {
this.#pipe = pipe;
}
async read() {
console.log("read");
return await this.#pipe.read();
}
}
class PipeWriter {
#pipe;
constructor(pipe) {
this.#pipe = pipe;
}
async write(ele) {
console.log("write");
await this.#pipe.write(ele);
}
}
export {Pipe, PipeReader, PipeWriter};