diff --git a/azure-pipelines/node.yml b/azure-pipelines/node.yml index be0803f0..5253c6f5 100644 --- a/azure-pipelines/node.yml +++ b/azure-pipelines/node.yml @@ -30,4 +30,4 @@ steps: displayName: 🧪 yarn tslint inputs: projectDirectory: src/nerdbank-streams - arguments: tslint --project . + arguments: lint diff --git a/src/nerdbank-streams/package.json b/src/nerdbank-streams/package.json index 98cec8d2..5389ffe4 100644 --- a/src/nerdbank-streams/package.json +++ b/src/nerdbank-streams/package.json @@ -28,7 +28,8 @@ "scripts": { "build": "tsc -p gulpfile.tsconfig.json && gulp", "watch": "node ./node_modules/typescript/bin/tsc -p tsconfig.json -w", - "test": "jasmine" + "test": "jasmine", + "lint": "tslint --project ." }, "devDependencies": { "@types/jasmine": "^4.0.3", @@ -52,6 +53,7 @@ "await-semaphore": "^0.1.3", "cancellationtoken": "^2.0.1", "caught": "^0.1.3", - "msgpack-lite": "^0.1.26" + "msgpack-lite": "^0.1.26", + "plexer": "^2.0.0" } } diff --git a/src/nerdbank-streams/src/FullDuplexStream.ts b/src/nerdbank-streams/src/FullDuplexStream.ts index c3932415..28086eaa 100644 --- a/src/nerdbank-streams/src/FullDuplexStream.ts +++ b/src/nerdbank-streams/src/FullDuplexStream.ts @@ -1,4 +1,5 @@ import { Duplex, PassThrough } from "stream"; +import duplexer = require('plexer') export class FullDuplexStream { public static CreatePair(): { first: Duplex, second: Duplex } { @@ -11,20 +12,6 @@ export class FullDuplexStream { } public static Splice(readable: NodeJS.ReadableStream, writable: NodeJS.WritableStream): Duplex { - const duplex = new Duplex({ - write(chunk, encoding, callback) { - writable.write(chunk, encoding, callback); - }, - - final(callback) { - writable.end(callback); - }, - }); - - // All reads and events come directly from the readable stream. - duplex.read = readable.read.bind(readable); - duplex.on = readable.on.bind(readable) as any; - - return duplex; + return duplexer(writable, readable) } } diff --git a/src/nerdbank-streams/src/Utilities.ts b/src/nerdbank-streams/src/Utilities.ts index 35bc881c..b87d4a32 100644 --- a/src/nerdbank-streams/src/Utilities.ts +++ b/src/nerdbank-streams/src/Utilities.ts @@ -1,6 +1,5 @@ import CancellationToken from "cancellationtoken"; import { Readable, Writable } from "stream"; -import { Deferred } from "./Deferred"; import { IDisposableObservable } from "./IDisposableObservable"; export async function writeAsync(stream: NodeJS.WritableStream, chunk: any) { @@ -35,20 +34,118 @@ export function writeSubstream(stream: NodeJS.WritableStream): NodeJS.WritableSt }); } -export function readSubstream(stream: NodeJS.ReadableStream): NodeJS.ReadableStream { +/** + * Reads the next chunk from a stream, asynchronously waiting for more to be read if necessary. + * @param stream The stream to read from. + * @param cancellationToken A token whose cancellation will result in immediate rejection of the previously returned promise. + * @returns The result of reading from the stream. This will be null if the end of the stream is reached before any more can be read. + */ +export function readAsync(stream: NodeJS.ReadableStream, cancellationToken?: CancellationToken): Promise { + if (!(stream.isPaused() || (stream as Readable).readableFlowing !== true)) { + throw new Error('Stream must not be in flowing mode.'); + } + + const result = stream.read() + if (result) { + return Promise.resolve(result) + } + + return new Promise((resolve, reject) => { + const ctReg = cancellationToken?.onCancelled(reason => { + cleanup(); + reject(reason); + }); + stream.once('data', onData); + stream.once('error', onError); + stream.once('end', onEnd); + stream.resume() + + function onData(chunk) { + cleanup(); + resolve(chunk); + } + + function onError(...args) { + cleanup(); + reject(...args); + } + + function onEnd() { + cleanup(); + resolve(null); + } + + function cleanup() { + stream.pause(); + stream.off('data', onData); + stream.off('error', onError); + stream.off('end', onEnd); + if (ctReg) { + ctReg(); + } + } + }) +} + +/** + * Returns a readable stream that will read just a slice of some existing stream. + * @param stream The stream to read from. + * @param length The maximum number of bytes to read from the stream. + * @returns A stream that will read up to the given number of elements, leaving the rest in the underlying stream. + */ +export function sliceStream(stream: NodeJS.ReadableStream, length: number): Readable { return new Readable({ async read(_: number) { - const lenBuffer = await getBufferFrom(stream, 4); - const dv = new DataView(lenBuffer.buffer, lenBuffer.byteOffset, lenBuffer.length); - const chunkSize = dv.getUint32(0, false); - if (chunkSize === 0) { + if (length > 0) { + const chunk = stream.read() ?? await readAsync(stream); + if (!chunk) { + // We've reached the end of the source stream. + this.push(null); + return; + } + + const countToConsume = Math.min(length, chunk.length) + length -= countToConsume + stream.unshift(chunk.slice(countToConsume)) + if (this.push(chunk.slice(0, countToConsume)) && length === 0) { + // Save another call later by informing immediately that we're at the end of the stream. + this.push(null); + } + } else { this.push(null); - return; } + }, + }); +} + +export function readSubstream(stream: NodeJS.ReadableStream): NodeJS.ReadableStream { + let currentSlice: Readable | null = null + return new Readable({ + async read(_: number) { + while (true) { + if (currentSlice === null) { + const lenBuffer = await getBufferFrom(stream, 4); + const dv = new DataView(lenBuffer.buffer, lenBuffer.byteOffset, lenBuffer.length); + const length = dv.getUint32(0, false); + if (length === 0) { + // We've reached the end of the substream. + this.push(null); + return; + } + + currentSlice = sliceStream(stream, length) + } - // TODO: make this *stream* instead of read as an atomic chunk. - const payload = await getBufferFrom(stream, chunkSize); - this.push(payload); + const chunk = await readAsync(currentSlice); + if (!chunk) { + // We've reached the end of this chunk. We'll have to read the next header. + currentSlice = null; + continue; + } + + this.push(chunk); + return; + } }, }); } @@ -71,81 +168,34 @@ export async function getBufferFrom( allowEndOfStream: boolean = false, cancellationToken?: CancellationToken): Promise { - const streamEnded = new Deferred(); - if (size === 0) { - return Buffer.from([]); + return Buffer.alloc(0) } - let readBuffer: Buffer | null = null; - let index: number = 0; - while (size > 0) { - cancellationToken?.throwIfCancelled(); - let availableSize = (readable as Readable).readableLength; - if (!availableSize) { - // Check the end of stream - if ((readable as Readable).readableEnded || streamEnded.isCompleted) { - // stream is closed - if (!allowEndOfStream) { - throw new Error("Stream terminated before required bytes were read."); - } - - // Returns what has been read so far - if (readBuffer === null) { - return null; - } - - // we need trim extra spaces - return readBuffer.subarray(0, index) - } - - // we retain this behavior when availableSize === false - // to make existing unit tests happy (which assumes we will try to read stream when no data is ready.) - availableSize = size; - } else if (availableSize > size) { - availableSize = size; - } - - const newBuffer = readable.read(availableSize) as Buffer; - if (newBuffer) { - if (newBuffer.length < availableSize && !allowEndOfStream) { - throw new Error("Stream terminated before required bytes were read."); - } - - if (readBuffer === null) { - if (availableSize === size || newBuffer.length < availableSize) { - // in the fast pass, we read the entire data once, and donot allocate an extra array. - return newBuffer; - } + const initialData = readable.read(size) as Buffer | null; + if (initialData) { + return initialData; + } - // if we read partial data, we need allocate a buffer to join all data together. - readBuffer = Buffer.alloc(size); + let totalBytesRead = 0 + const result = Buffer.alloc(size); + const streamSlice = sliceStream(readable, size); + while (totalBytesRead < size) { + const chunk = await readAsync(streamSlice, cancellationToken) as Buffer | null + if (chunk === null) { + // We reached the end prematurely. + if (allowEndOfStream) { + return totalBytesRead === 0 ? null : result.subarray(0, totalBytesRead) + } else { + throw new Error(`End of stream encountered after only ${totalBytesRead} bytes when ${size} were expected.`); } - - // now append new data to the buffer - newBuffer.copy(readBuffer, index); - - size -= newBuffer.length; - index += newBuffer.length; } - if (size > 0) { - const bytesAvailable = new Deferred(); - const bytesAvailableCallback = bytesAvailable.resolve.bind(bytesAvailable); - const streamEndedCallback = streamEnded.resolve.bind(streamEnded); - readable.once("readable", bytesAvailableCallback); - readable.once("end", streamEndedCallback); - try { - const endPromise = Promise.race([bytesAvailable.promise, streamEnded.promise]); - await (cancellationToken ? cancellationToken.racePromise(endPromise) : endPromise); - } finally { - readable.removeListener("readable", bytesAvailableCallback); - readable.removeListener("end", streamEndedCallback); - } - } + chunk.copy(result, totalBytesRead); + totalBytesRead += chunk.length; } - return readBuffer; + return result; } export function throwIfDisposed(value: IDisposableObservable) { diff --git a/src/nerdbank-streams/src/index.ts b/src/nerdbank-streams/src/index.ts index 81a19d3a..6059f447 100644 --- a/src/nerdbank-streams/src/index.ts +++ b/src/nerdbank-streams/src/index.ts @@ -4,5 +4,5 @@ export { FullDuplexStream } from "./FullDuplexStream"; export { IDisposableObservable } from "./IDisposableObservable"; export { MultiplexingStream } from "./MultiplexingStream"; export { MultiplexingStreamOptions } from "./MultiplexingStreamOptions"; -export { writeSubstream, readSubstream } from "./Utilities"; +export { writeSubstream, readSubstream, readAsync, sliceStream } from "./Utilities"; export { QualifiedChannelId, ChannelSource } from "./QualifiedChannelId"; diff --git a/src/nerdbank-streams/src/tests/FullDuplexStream.spec.ts b/src/nerdbank-streams/src/tests/FullDuplexStream.spec.ts index 80b11e1f..1874cb2b 100644 --- a/src/nerdbank-streams/src/tests/FullDuplexStream.spec.ts +++ b/src/nerdbank-streams/src/tests/FullDuplexStream.spec.ts @@ -1,7 +1,7 @@ import { PassThrough, Readable, Writable } from "stream"; import { Deferred } from "../Deferred"; import { FullDuplexStream } from "../FullDuplexStream"; -import { getBufferFrom } from "../Utilities"; +import { getBufferFrom, readAsync } from "../Utilities"; import { delay } from "./Timeout"; describe("FullDuplexStream.CreatePair", () => { @@ -24,20 +24,20 @@ describe("FullDuplexStream.CreatePair", () => { await endPropagatesEndEvent(pair.second, pair.first); }); - it("stream1 write end leads to stream2 finish event", async () => { + it("stream1 write end leads to stream1 finish event", async () => { const pair = FullDuplexStream.CreatePair(); - await endPropagatesFinishEvent(pair.first, pair.second); - await endPropagatesFinishEvent(pair.second, pair.first); + await endRaisesFinishEvent(pair.first); + await endRaisesFinishEvent(pair.second); }); async function writePropagation(first: Writable, second: Readable): Promise { first.write("abc"); - expect(second.read()).toEqual(Buffer.from("abc")); + expect(await readAsync(second)).toEqual(Buffer.from("abc")); } - async function endPropagatesFinishEvent(first: Writable, second: Readable): Promise { + async function endRaisesFinishEvent(first: Writable): Promise { const signal = new Deferred(); - second.once("finish", () => { + first.once("finish", () => { signal.resolve(); }); expect(signal.isCompleted).toBe(false); @@ -63,8 +63,8 @@ describe("FullDuplexStream.Splice", () => { let duplex: NodeJS.ReadWriteStream; beforeEach(() => { - readable = new PassThrough({ writableHighWaterMark : 8 }); - writable = new PassThrough({ writableHighWaterMark : 8 }); + readable = new PassThrough({ writableHighWaterMark: 8 }); + writable = new PassThrough({ writableHighWaterMark: 8 }); duplex = FullDuplexStream.Splice(readable, writable); }); @@ -88,6 +88,12 @@ describe("FullDuplexStream.Splice", () => { expect(buffer).toBeNull(); }); + it("unshift", async () => { + duplex.unshift(Buffer.from([1, 2, 3])) + const result = duplex.read() + expect(result).toEqual(Buffer.from([1, 2, 3])) + }) + it("Read should yield when data is not ready", async () => { const task = writeToStream(duplex, "abcdefgh", 4); const buffer = await getBufferFrom(writable, 32); diff --git a/src/nerdbank-streams/src/tests/MultiplexingStream.Interop.spec.ts b/src/nerdbank-streams/src/tests/MultiplexingStream.Interop.spec.ts index 09a3348b..fc861c5a 100644 --- a/src/nerdbank-streams/src/tests/MultiplexingStream.Interop.spec.ts +++ b/src/nerdbank-streams/src/tests/MultiplexingStream.Interop.spec.ts @@ -4,6 +4,7 @@ import { Deferred } from "../Deferred"; import { FullDuplexStream } from "../FullDuplexStream"; import { MultiplexingStream } from "../MultiplexingStream"; import { ChannelOptions } from "../ChannelOptions"; +import { readAsync } from "../Utilities"; [1, 2, 3].forEach(protocolMajorVersion => { describe(`MultiplexingStream v${protocolMajorVersion} (interop) `, () => { @@ -101,34 +102,11 @@ import { ChannelOptions } from "../ChannelOptions"; return deferred.promise; } - async function readAsync(readable: NodeJS.ReadableStream): Promise { - let readBuffer = readable.read() as Buffer; - - if (readBuffer === null) { - const bytesAvailable = new Deferred(); - const streamEnded = new Deferred(); - const bytesAvailableCallback = bytesAvailable.resolve.bind(bytesAvailable); - const streamEndedCallback = streamEnded.resolve.bind(streamEnded); - readable.once("readable", bytesAvailableCallback); - readable.once("end", streamEndedCallback); - await Promise.race([bytesAvailable.promise, streamEnded.promise]); - readable.removeListener("readable", bytesAvailableCallback); - readable.removeListener("end", streamEndedCallback); - if (bytesAvailable.isCompleted) { - readBuffer = readable.read() as Buffer; - } else { - return null; - } - } - - return readBuffer; - } - async function readLineAsync(readable: NodeJS.ReadableStream): Promise { const buffers: Buffer[] = []; while (true) { - const segment = await readAsync(readable); + const segment = await readAsync(readable) as Buffer | null; if (segment === null) { break; } diff --git a/src/nerdbank-streams/src/tests/Substream.spec.ts b/src/nerdbank-streams/src/tests/Substream.spec.ts index 41905a4b..1cc3ee64 100644 --- a/src/nerdbank-streams/src/tests/Substream.spec.ts +++ b/src/nerdbank-streams/src/tests/Substream.spec.ts @@ -1,6 +1,5 @@ import "jasmine"; import { PassThrough } from "stream"; -import { Deferred } from "../Deferred"; import { getBufferFrom, readSubstream, writeAsync, writeSubstream } from "../Utilities"; describe("Substream", () => { @@ -135,12 +134,8 @@ describe("Substream", () => { await endAsync(thru); const substream = readSubstream(thru); - let readPayload = await getBufferFrom(substream, payload1.length); - expect(readPayload).toEqual(payload1); - readPayload = await getBufferFrom(substream, payload2.length); - expect(readPayload).toEqual(payload2); - - await expectEndOfStream(substream); + const readPayload = await getBufferFrom(substream, 10, true); + expect(readPayload).toEqual(Buffer.from([1, 2, 3, 4, 5, 6])); await expectEndOfStream(thru); }); @@ -183,24 +178,16 @@ describe("Substream", () => { await writeAsync(stream, Buffer.from(dv.buffer, dv.byteOffset, dv.byteLength)); } - async function endAsync(stream: NodeJS.WritableStream) { - const deferred = new Deferred(); - stream.end(() => deferred.resolve()); - return deferred.promise; - } - - function tick(): Promise { - const finished = new Deferred(); - process.nextTick(() => finished.resolve()); - return finished.promise; + function endAsync(stream: NodeJS.WritableStream) { + return new Promise(resolve => stream.end(resolve)); } - async function expectEndOfStream(stream: NodeJS.ReadableStream): Promise { - const finished = new Deferred(); - stream.once("end", () => finished.resolve()); - while (!finished.isCompleted) { - expect(stream.read()).toBeNull(); - await tick(); - } + function expectEndOfStream(stream: NodeJS.ReadableStream): Promise { + expect(stream.read()).toBeNull() + stream.resume(); + return new Promise((resolve, reject) => { + stream.once("end", () => resolve()); + stream.once("data", () => reject(new Error('EOF expected.'))); + }) } }); diff --git a/src/nerdbank-streams/src/tests/Utilities.spec.ts b/src/nerdbank-streams/src/tests/Utilities.spec.ts new file mode 100644 index 00000000..86f6b261 --- /dev/null +++ b/src/nerdbank-streams/src/tests/Utilities.spec.ts @@ -0,0 +1,71 @@ +import CancellationToken from "cancellationtoken"; +import { PassThrough } from "stream" +import { readAsync, sliceStream } from "../Utilities"; + +let thru: PassThrough +beforeEach(() => { + thru = new PassThrough(); +}) + +describe('readAsync', () => { + it('returns immediately with results', async () => { + thru.write(Buffer.from([1, 2, 3])) + thru.write(Buffer.from([4, 5, 6])) + + const result = await readAsync(thru) + expect(result).toEqual(Buffer.from([1, 2, 3, 4, 5, 6])) + }) + + it('to wait for data', async () => { + const resultPromise = readAsync(thru); + + thru.write(Buffer.from([1, 2, 3])) + thru.write(Buffer.from([4, 5, 6])) + + const result = await resultPromise; + expect(result).toEqual(Buffer.from([1, 2, 3])) + }) + + it('to return null at EOF', async () => { + thru.end() + expect(await readAsync(thru)).toBeNull() + }) + + it('to propagate errors', async () => { + const error = new Error('Mock error') + thru.destroy(error) + await expectAsync(readAsync(thru)).toBeRejectedWith(error); + }) + + it('bails on cancellation', async () => { + const cts = CancellationToken.create(); + const readPromise = readAsync(thru, cts.token); + cts.cancel(); + await expectAsync(readPromise).toBeRejected(); + }) +}) + +describe('sliceStream', () => { + it('returns null on empty', async () => { + thru.end() + const slice = sliceStream(thru, 5) + expect(slice.read()).toBeNull() + }) + + it('returns subset of upper stream', async () => { + thru.push(Buffer.from([1, 2, 3, 4, 5, 6])) + const slice = sliceStream(thru, 3) + expect(await readAsync(slice)).toEqual(Buffer.from([1, 2, 3])) + expect(await readAsync(slice)).toBeNull() + expect(await readAsync(thru)).toEqual(Buffer.from([4, 5, 6])) + }) + + it('handles slice that exceeds stream length', async () => { + thru.end(Buffer.from([1, 2, 3])) + const slice = sliceStream(thru, 6) + + const result = await readAsync(slice) + expect(result).toEqual(Buffer.from([1, 2, 3])) + expect(await readAsync(slice)).toBeNull() + }) +}) diff --git a/src/nerdbank-streams/yarn.lock b/src/nerdbank-streams/yarn.lock index 740ab880..21cebdc3 100644 --- a/src/nerdbank-streams/yarn.lock +++ b/src/nerdbank-streams/yarn.lock @@ -1411,6 +1411,11 @@ isobject@^3.0.0, isobject@^3.0.1: resolved "https://registry.yarnpkg.com/isobject/-/isobject-3.0.1.tgz#4e431e92b11a9731636aa1f9c8d1ccbcfdab78df" integrity sha512-WhB9zCku7EGTj/HQQRz5aUQEUeoQZH2bWcltRErOpymJ4boYE6wL9Tbr23krRPSZ+C5zqNSrSw+Cc7sZZ4b7vg== +isstream@^0.1.2: + version "0.1.2" + resolved "https://registry.yarnpkg.com/isstream/-/isstream-0.1.2.tgz#47e63f7af55afa6f92e1500e690eb8b8529c099a" + integrity sha512-Yljz7ffyPbrLpLngrMtZ7NduUgVvi6wG9RJ9IUcyCd59YQ911PBJphODUcbOVbqYfxe1wuYf/LJ8PauMRwsM/g== + jasmine-core@^4.5.0: version "4.5.0" resolved "https://registry.yarnpkg.com/jasmine-core/-/jasmine-core-4.5.0.tgz#1a6bd0bde3f60996164311c88a0995d67ceda7c3" @@ -1925,6 +1930,14 @@ pinkie@^2.0.0: resolved "https://registry.yarnpkg.com/pinkie/-/pinkie-2.0.4.tgz#72556b80cfa0d48a974e80e77248e80ed4f7f870" integrity sha512-MnUuEycAemtSaeFSjXKW/aroV7akBbY+Sv+RkyqFjgAe73F+MR0TBWKBRDkmfWq/HiFmdavfZ1G7h4SPZXaCSg== +plexer@^2.0.0: + version "2.0.0" + resolved "https://registry.yarnpkg.com/plexer/-/plexer-2.0.0.tgz#d5ff402f7fb79cce592101c193807d0b085ea41f" + integrity sha512-MWAhwDaaaNA9sfbZz00HjSxrwOxEUD8BW4pHG0obnjvJpyG3RcOjSIcDIQFW3NP+RDJlzPbyXdgCR3V24tyUdw== + dependencies: + isstream "^0.1.2" + readable-stream "^3.4.0" + plugin-error@^1.0.1: version "1.0.1" resolved "https://registry.yarnpkg.com/plugin-error/-/plugin-error-1.0.1.tgz#77016bd8919d0ac377fdcdd0322328953ca5781c" @@ -2006,7 +2019,7 @@ readable-stream@1.1: isarray "0.0.1" string_decoder "~0.10.x" -"readable-stream@2 || 3": +"readable-stream@2 || 3", readable-stream@^3.4.0: version "3.6.0" resolved "https://registry.yarnpkg.com/readable-stream/-/readable-stream-3.6.0.tgz#337bbda3adc0706bd3e024426a286d4b4b2c9198" integrity sha512-BViHy7LKeTz4oNnkcLJ+lVSL6vpiFeX6/d3oSH8zCW7UxP2onchk+vTGB143xuFjHS3deTgkKoXXymXqymiIdA==