From 332697e35fd24f3d13b75cdaee1afc9994642527 Mon Sep 17 00:00:00 2001 From: Andrew Arnott Date: Sat, 10 Dec 2022 14:26:50 -0700 Subject: [PATCH 1/3] Simplify Substream tests --- .../src/tests/Substream.spec.ts | 25 ++++++------------- 1 file changed, 7 insertions(+), 18 deletions(-) diff --git a/src/nerdbank-streams/src/tests/Substream.spec.ts b/src/nerdbank-streams/src/tests/Substream.spec.ts index 41905a4b..dc8f9f12 100644 --- a/src/nerdbank-streams/src/tests/Substream.spec.ts +++ b/src/nerdbank-streams/src/tests/Substream.spec.ts @@ -1,6 +1,5 @@ import "jasmine"; import { PassThrough } from "stream"; -import { Deferred } from "../Deferred"; import { getBufferFrom, readSubstream, writeAsync, writeSubstream } from "../Utilities"; describe("Substream", () => { @@ -183,24 +182,14 @@ describe("Substream", () => { await writeAsync(stream, Buffer.from(dv.buffer, dv.byteOffset, dv.byteLength)); } - async function endAsync(stream: NodeJS.WritableStream) { - const deferred = new Deferred(); - stream.end(() => deferred.resolve()); - return deferred.promise; + function endAsync(stream: NodeJS.WritableStream) { + return new Promise(resolve => stream.end(resolve)); } - function tick(): Promise { - const finished = new Deferred(); - process.nextTick(() => finished.resolve()); - return finished.promise; - } - - async function expectEndOfStream(stream: NodeJS.ReadableStream): Promise { - const finished = new Deferred(); - stream.once("end", () => finished.resolve()); - while (!finished.isCompleted) { - expect(stream.read()).toBeNull(); - await tick(); - } + function expectEndOfStream(stream: NodeJS.ReadableStream): Promise { + return new Promise((resolve, reject) => { + stream.once("end", () => resolve()); + stream.once("data", () => reject(new Error('EOF expected.'))); + }) } }); From 5a0a549623881822bb201397f45cbb86e596bf4a Mon Sep 17 00:00:00 2001 From: Andrew Arnott Date: Sat, 10 Dec 2022 16:27:54 -0700 Subject: [PATCH 2/3] Fix readSubstream to not buffer everything first Also export new `sliceStream` and `readAsync` functions. --- src/nerdbank-streams/src/Utilities.ts | 114 ++++++++++++++++-- src/nerdbank-streams/src/index.ts | 2 +- .../src/tests/Utilities.spec.ts | 62 ++++++++++ 3 files changed, 167 insertions(+), 11 deletions(-) create mode 100644 src/nerdbank-streams/src/tests/Utilities.spec.ts diff --git a/src/nerdbank-streams/src/Utilities.ts b/src/nerdbank-streams/src/Utilities.ts index 35bc881c..faaed11e 100644 --- a/src/nerdbank-streams/src/Utilities.ts +++ b/src/nerdbank-streams/src/Utilities.ts @@ -35,20 +35,114 @@ export function writeSubstream(stream: NodeJS.WritableStream): NodeJS.WritableSt }); } -export function readSubstream(stream: NodeJS.ReadableStream): NodeJS.ReadableStream { +/** + * Reads the next chunk from a stream, asynchronously waiting for more to be read if necessary. + * @param stream The stream to read from. + * @param cancellationToken A token whose cancellation will result in immediate rejection of the previously returned promise. + * @returns The result of reading from the stream. This will be null if the end of the stream is reached before any more can be read. + */ +export function readAsync(stream: NodeJS.ReadableStream, cancellationToken?: CancellationToken): Promise { + let result = stream.read() + if (result) { + return Promise.resolve(result) + } + + return new Promise((resolve, reject) => { + const ctReg = cancellationToken?.onCancelled(reason => { + cleanup(); + reject(reason); + }); + stream.once('data', onData); + stream.once('error', onError); + stream.once('end', onEnd); + + function onData(chunk) { + cleanup(); + resolve(chunk); + } + + function onError(...args) { + cleanup(); + reject(...args); + } + + function onEnd() { + cleanup(); + resolve(null); + } + + function cleanup() { + stream.off('data', onData); + stream.off('error', onError); + stream.off('end', onEnd); + if (ctReg) { + ctReg(); + } + } + }) +} + +/** + * Returns a readable stream that will read just a slice of some existing stream. + * @param stream The stream to read from. + * @param length The maximum number of bytes to read from the stream. + * @returns A stream that will read up to the given number of elements, leaving the rest in the underlying stream. + */ +export function sliceStream(stream: NodeJS.ReadableStream, length: number): Readable { return new Readable({ async read(_: number) { - const lenBuffer = await getBufferFrom(stream, 4); - const dv = new DataView(lenBuffer.buffer, lenBuffer.byteOffset, lenBuffer.length); - const chunkSize = dv.getUint32(0, false); - if (chunkSize === 0) { - this.push(null); - return; + while (length > 0) { + const chunk = await readAsync(stream); + if (!chunk) { + // We've reached the end of the source stream. + this.push(null); + return; + } + + const countToConsume = Math.min(length, chunk.length) + length -= countToConsume + stream.unshift(chunk.slice(countToConsume)) + if (!this.push(chunk.slice(0, countToConsume))) { + return; + } } - // TODO: make this *stream* instead of read as an atomic chunk. - const payload = await getBufferFrom(stream, chunkSize); - this.push(payload); + this.push(null); + }, + }); +} + +export function readSubstream(stream: NodeJS.ReadableStream): NodeJS.ReadableStream { + let currentSlice: Readable | null = null + return new Readable({ + async read(_: number) { + while (true) { + if (currentSlice === null) { + const lenBuffer = await getBufferFrom(stream, 4); + const dv = new DataView(lenBuffer.buffer, lenBuffer.byteOffset, lenBuffer.length); + const length = dv.getUint32(0, false); + if (length === 0) { + // We've reached the end of the substream. + this.push(null); + return; + } + + currentSlice = sliceStream(stream, length) + } + + while (currentSlice !== null) { + const chunk = await readAsync(currentSlice); + if (!chunk) { + // We've reached the end of this chunk. We'll have to read the next header. + currentSlice = null; + break; + } + + if (!this.push(chunk)) { + return; + } + } + } }, }); } diff --git a/src/nerdbank-streams/src/index.ts b/src/nerdbank-streams/src/index.ts index 81a19d3a..6059f447 100644 --- a/src/nerdbank-streams/src/index.ts +++ b/src/nerdbank-streams/src/index.ts @@ -4,5 +4,5 @@ export { FullDuplexStream } from "./FullDuplexStream"; export { IDisposableObservable } from "./IDisposableObservable"; export { MultiplexingStream } from "./MultiplexingStream"; export { MultiplexingStreamOptions } from "./MultiplexingStreamOptions"; -export { writeSubstream, readSubstream } from "./Utilities"; +export { writeSubstream, readSubstream, readAsync, sliceStream } from "./Utilities"; export { QualifiedChannelId, ChannelSource } from "./QualifiedChannelId"; diff --git a/src/nerdbank-streams/src/tests/Utilities.spec.ts b/src/nerdbank-streams/src/tests/Utilities.spec.ts new file mode 100644 index 00000000..49970a72 --- /dev/null +++ b/src/nerdbank-streams/src/tests/Utilities.spec.ts @@ -0,0 +1,62 @@ +import CancellationToken from "cancellationtoken"; +import { PassThrough } from "stream" +import { readAsync, sliceStream } from "../Utilities"; + +let thru: PassThrough +beforeEach(() => { + thru = new PassThrough(); +}) + +describe('readAsync', function () { + it('returns immediately with results', async function () { + thru.write(Buffer.from([1, 2, 3])) + thru.write(Buffer.from([4, 5, 6])) + + const result = await readAsync(thru) + expect(result).toEqual(Buffer.from([1, 2, 3, 4, 5, 6])) + }) + + it('to wait for data', async function () { + const resultPromise = readAsync(thru); + + thru.write(Buffer.from([1, 2, 3])) + thru.write(Buffer.from([4, 5, 6])) + + const result = await resultPromise; + expect(result).toEqual(Buffer.from([1, 2, 3])) + }) + + it('to return null at EOF', async function () { + thru.end() + expect(await readAsync(thru)).toBeNull() + }) + + it('to propagate errors', async function () { + const error = new Error('Mock error') + thru.destroy(error) + await expectAsync(readAsync(thru)).toBeRejectedWith(error); + }) + + it('bails on cancellation', async function () { + const cts = CancellationToken.create(); + const readPromise = readAsync(thru, cts.token); + cts.cancel(); + await expectAsync(readPromise).toBeRejected(); + }) +}) + +describe('sliceStream', function () { + it('returns null on empty', async function () { + thru.end() + const slice = sliceStream(thru, 5) + expect(slice.read()).toBeNull() + }) + + it('returns subset of upper stream', async function () { + thru.push(Buffer.from([1, 2, 3, 4, 5, 6])) + const slice = sliceStream(thru, 3) + expect(await readAsync(slice)).toEqual(Buffer.from([1, 2, 3])) + expect(await readAsync(slice)).toBeNull() + expect(await readAsync(thru)).toEqual(Buffer.from([4, 5, 6])) + }) +}) From 8e2fabf71a1765144b86a0f7cfeef8f032c183de Mon Sep 17 00:00:00 2001 From: Andrew Arnott Date: Mon, 12 Dec 2022 07:16:48 -0700 Subject: [PATCH 3/3] Test and fix more use cases --- azure-pipelines/node.yml | 2 +- src/nerdbank-streams/package.json | 6 +- src/nerdbank-streams/src/FullDuplexStream.ts | 17 +-- src/nerdbank-streams/src/Utilities.ts | 126 ++++++------------ .../src/tests/FullDuplexStream.spec.ts | 24 ++-- .../tests/MultiplexingStream.Interop.spec.ts | 26 +--- .../src/tests/Substream.spec.ts | 10 +- .../src/tests/Utilities.spec.ts | 27 ++-- src/nerdbank-streams/yarn.lock | 15 ++- 9 files changed, 101 insertions(+), 152 deletions(-) diff --git a/azure-pipelines/node.yml b/azure-pipelines/node.yml index be0803f0..5253c6f5 100644 --- a/azure-pipelines/node.yml +++ b/azure-pipelines/node.yml @@ -30,4 +30,4 @@ steps: displayName: 🧪 yarn tslint inputs: projectDirectory: src/nerdbank-streams - arguments: tslint --project . + arguments: lint diff --git a/src/nerdbank-streams/package.json b/src/nerdbank-streams/package.json index 98cec8d2..5389ffe4 100644 --- a/src/nerdbank-streams/package.json +++ b/src/nerdbank-streams/package.json @@ -28,7 +28,8 @@ "scripts": { "build": "tsc -p gulpfile.tsconfig.json && gulp", "watch": "node ./node_modules/typescript/bin/tsc -p tsconfig.json -w", - "test": "jasmine" + "test": "jasmine", + "lint": "tslint --project ." }, "devDependencies": { "@types/jasmine": "^4.0.3", @@ -52,6 +53,7 @@ "await-semaphore": "^0.1.3", "cancellationtoken": "^2.0.1", "caught": "^0.1.3", - "msgpack-lite": "^0.1.26" + "msgpack-lite": "^0.1.26", + "plexer": "^2.0.0" } } diff --git a/src/nerdbank-streams/src/FullDuplexStream.ts b/src/nerdbank-streams/src/FullDuplexStream.ts index c3932415..28086eaa 100644 --- a/src/nerdbank-streams/src/FullDuplexStream.ts +++ b/src/nerdbank-streams/src/FullDuplexStream.ts @@ -1,4 +1,5 @@ import { Duplex, PassThrough } from "stream"; +import duplexer = require('plexer') export class FullDuplexStream { public static CreatePair(): { first: Duplex, second: Duplex } { @@ -11,20 +12,6 @@ export class FullDuplexStream { } public static Splice(readable: NodeJS.ReadableStream, writable: NodeJS.WritableStream): Duplex { - const duplex = new Duplex({ - write(chunk, encoding, callback) { - writable.write(chunk, encoding, callback); - }, - - final(callback) { - writable.end(callback); - }, - }); - - // All reads and events come directly from the readable stream. - duplex.read = readable.read.bind(readable); - duplex.on = readable.on.bind(readable) as any; - - return duplex; + return duplexer(writable, readable) } } diff --git a/src/nerdbank-streams/src/Utilities.ts b/src/nerdbank-streams/src/Utilities.ts index faaed11e..b87d4a32 100644 --- a/src/nerdbank-streams/src/Utilities.ts +++ b/src/nerdbank-streams/src/Utilities.ts @@ -1,6 +1,5 @@ import CancellationToken from "cancellationtoken"; import { Readable, Writable } from "stream"; -import { Deferred } from "./Deferred"; import { IDisposableObservable } from "./IDisposableObservable"; export async function writeAsync(stream: NodeJS.WritableStream, chunk: any) { @@ -42,7 +41,11 @@ export function writeSubstream(stream: NodeJS.WritableStream): NodeJS.WritableSt * @returns The result of reading from the stream. This will be null if the end of the stream is reached before any more can be read. */ export function readAsync(stream: NodeJS.ReadableStream, cancellationToken?: CancellationToken): Promise { - let result = stream.read() + if (!(stream.isPaused() || (stream as Readable).readableFlowing !== true)) { + throw new Error('Stream must not be in flowing mode.'); + } + + const result = stream.read() if (result) { return Promise.resolve(result) } @@ -55,6 +58,7 @@ export function readAsync(stream: NodeJS.ReadableStream, cancellationToken?: Can stream.once('data', onData); stream.once('error', onError); stream.once('end', onEnd); + stream.resume() function onData(chunk) { cleanup(); @@ -72,6 +76,7 @@ export function readAsync(stream: NodeJS.ReadableStream, cancellationToken?: Can } function cleanup() { + stream.pause(); stream.off('data', onData); stream.off('error', onError); stream.off('end', onEnd); @@ -91,8 +96,8 @@ export function readAsync(stream: NodeJS.ReadableStream, cancellationToken?: Can export function sliceStream(stream: NodeJS.ReadableStream, length: number): Readable { return new Readable({ async read(_: number) { - while (length > 0) { - const chunk = await readAsync(stream); + if (length > 0) { + const chunk = stream.read() ?? await readAsync(stream); if (!chunk) { // We've reached the end of the source stream. this.push(null); @@ -102,12 +107,13 @@ export function sliceStream(stream: NodeJS.ReadableStream, length: number): Read const countToConsume = Math.min(length, chunk.length) length -= countToConsume stream.unshift(chunk.slice(countToConsume)) - if (!this.push(chunk.slice(0, countToConsume))) { - return; + if (this.push(chunk.slice(0, countToConsume)) && length === 0) { + // Save another call later by informing immediately that we're at the end of the stream. + this.push(null); } + } else { + this.push(null); } - - this.push(null); }, }); } @@ -130,18 +136,15 @@ export function readSubstream(stream: NodeJS.ReadableStream): NodeJS.ReadableStr currentSlice = sliceStream(stream, length) } - while (currentSlice !== null) { - const chunk = await readAsync(currentSlice); - if (!chunk) { - // We've reached the end of this chunk. We'll have to read the next header. - currentSlice = null; - break; - } - - if (!this.push(chunk)) { - return; - } + const chunk = await readAsync(currentSlice); + if (!chunk) { + // We've reached the end of this chunk. We'll have to read the next header. + currentSlice = null; + continue; } + + this.push(chunk); + return; } }, }); @@ -165,81 +168,34 @@ export async function getBufferFrom( allowEndOfStream: boolean = false, cancellationToken?: CancellationToken): Promise { - const streamEnded = new Deferred(); - if (size === 0) { - return Buffer.from([]); + return Buffer.alloc(0) } - let readBuffer: Buffer | null = null; - let index: number = 0; - while (size > 0) { - cancellationToken?.throwIfCancelled(); - let availableSize = (readable as Readable).readableLength; - if (!availableSize) { - // Check the end of stream - if ((readable as Readable).readableEnded || streamEnded.isCompleted) { - // stream is closed - if (!allowEndOfStream) { - throw new Error("Stream terminated before required bytes were read."); - } - - // Returns what has been read so far - if (readBuffer === null) { - return null; - } - - // we need trim extra spaces - return readBuffer.subarray(0, index) - } - - // we retain this behavior when availableSize === false - // to make existing unit tests happy (which assumes we will try to read stream when no data is ready.) - availableSize = size; - } else if (availableSize > size) { - availableSize = size; - } - - const newBuffer = readable.read(availableSize) as Buffer; - if (newBuffer) { - if (newBuffer.length < availableSize && !allowEndOfStream) { - throw new Error("Stream terminated before required bytes were read."); - } - - if (readBuffer === null) { - if (availableSize === size || newBuffer.length < availableSize) { - // in the fast pass, we read the entire data once, and donot allocate an extra array. - return newBuffer; - } + const initialData = readable.read(size) as Buffer | null; + if (initialData) { + return initialData; + } - // if we read partial data, we need allocate a buffer to join all data together. - readBuffer = Buffer.alloc(size); + let totalBytesRead = 0 + const result = Buffer.alloc(size); + const streamSlice = sliceStream(readable, size); + while (totalBytesRead < size) { + const chunk = await readAsync(streamSlice, cancellationToken) as Buffer | null + if (chunk === null) { + // We reached the end prematurely. + if (allowEndOfStream) { + return totalBytesRead === 0 ? null : result.subarray(0, totalBytesRead) + } else { + throw new Error(`End of stream encountered after only ${totalBytesRead} bytes when ${size} were expected.`); } - - // now append new data to the buffer - newBuffer.copy(readBuffer, index); - - size -= newBuffer.length; - index += newBuffer.length; } - if (size > 0) { - const bytesAvailable = new Deferred(); - const bytesAvailableCallback = bytesAvailable.resolve.bind(bytesAvailable); - const streamEndedCallback = streamEnded.resolve.bind(streamEnded); - readable.once("readable", bytesAvailableCallback); - readable.once("end", streamEndedCallback); - try { - const endPromise = Promise.race([bytesAvailable.promise, streamEnded.promise]); - await (cancellationToken ? cancellationToken.racePromise(endPromise) : endPromise); - } finally { - readable.removeListener("readable", bytesAvailableCallback); - readable.removeListener("end", streamEndedCallback); - } - } + chunk.copy(result, totalBytesRead); + totalBytesRead += chunk.length; } - return readBuffer; + return result; } export function throwIfDisposed(value: IDisposableObservable) { diff --git a/src/nerdbank-streams/src/tests/FullDuplexStream.spec.ts b/src/nerdbank-streams/src/tests/FullDuplexStream.spec.ts index 80b11e1f..1874cb2b 100644 --- a/src/nerdbank-streams/src/tests/FullDuplexStream.spec.ts +++ b/src/nerdbank-streams/src/tests/FullDuplexStream.spec.ts @@ -1,7 +1,7 @@ import { PassThrough, Readable, Writable } from "stream"; import { Deferred } from "../Deferred"; import { FullDuplexStream } from "../FullDuplexStream"; -import { getBufferFrom } from "../Utilities"; +import { getBufferFrom, readAsync } from "../Utilities"; import { delay } from "./Timeout"; describe("FullDuplexStream.CreatePair", () => { @@ -24,20 +24,20 @@ describe("FullDuplexStream.CreatePair", () => { await endPropagatesEndEvent(pair.second, pair.first); }); - it("stream1 write end leads to stream2 finish event", async () => { + it("stream1 write end leads to stream1 finish event", async () => { const pair = FullDuplexStream.CreatePair(); - await endPropagatesFinishEvent(pair.first, pair.second); - await endPropagatesFinishEvent(pair.second, pair.first); + await endRaisesFinishEvent(pair.first); + await endRaisesFinishEvent(pair.second); }); async function writePropagation(first: Writable, second: Readable): Promise { first.write("abc"); - expect(second.read()).toEqual(Buffer.from("abc")); + expect(await readAsync(second)).toEqual(Buffer.from("abc")); } - async function endPropagatesFinishEvent(first: Writable, second: Readable): Promise { + async function endRaisesFinishEvent(first: Writable): Promise { const signal = new Deferred(); - second.once("finish", () => { + first.once("finish", () => { signal.resolve(); }); expect(signal.isCompleted).toBe(false); @@ -63,8 +63,8 @@ describe("FullDuplexStream.Splice", () => { let duplex: NodeJS.ReadWriteStream; beforeEach(() => { - readable = new PassThrough({ writableHighWaterMark : 8 }); - writable = new PassThrough({ writableHighWaterMark : 8 }); + readable = new PassThrough({ writableHighWaterMark: 8 }); + writable = new PassThrough({ writableHighWaterMark: 8 }); duplex = FullDuplexStream.Splice(readable, writable); }); @@ -88,6 +88,12 @@ describe("FullDuplexStream.Splice", () => { expect(buffer).toBeNull(); }); + it("unshift", async () => { + duplex.unshift(Buffer.from([1, 2, 3])) + const result = duplex.read() + expect(result).toEqual(Buffer.from([1, 2, 3])) + }) + it("Read should yield when data is not ready", async () => { const task = writeToStream(duplex, "abcdefgh", 4); const buffer = await getBufferFrom(writable, 32); diff --git a/src/nerdbank-streams/src/tests/MultiplexingStream.Interop.spec.ts b/src/nerdbank-streams/src/tests/MultiplexingStream.Interop.spec.ts index 09a3348b..fc861c5a 100644 --- a/src/nerdbank-streams/src/tests/MultiplexingStream.Interop.spec.ts +++ b/src/nerdbank-streams/src/tests/MultiplexingStream.Interop.spec.ts @@ -4,6 +4,7 @@ import { Deferred } from "../Deferred"; import { FullDuplexStream } from "../FullDuplexStream"; import { MultiplexingStream } from "../MultiplexingStream"; import { ChannelOptions } from "../ChannelOptions"; +import { readAsync } from "../Utilities"; [1, 2, 3].forEach(protocolMajorVersion => { describe(`MultiplexingStream v${protocolMajorVersion} (interop) `, () => { @@ -101,34 +102,11 @@ import { ChannelOptions } from "../ChannelOptions"; return deferred.promise; } - async function readAsync(readable: NodeJS.ReadableStream): Promise { - let readBuffer = readable.read() as Buffer; - - if (readBuffer === null) { - const bytesAvailable = new Deferred(); - const streamEnded = new Deferred(); - const bytesAvailableCallback = bytesAvailable.resolve.bind(bytesAvailable); - const streamEndedCallback = streamEnded.resolve.bind(streamEnded); - readable.once("readable", bytesAvailableCallback); - readable.once("end", streamEndedCallback); - await Promise.race([bytesAvailable.promise, streamEnded.promise]); - readable.removeListener("readable", bytesAvailableCallback); - readable.removeListener("end", streamEndedCallback); - if (bytesAvailable.isCompleted) { - readBuffer = readable.read() as Buffer; - } else { - return null; - } - } - - return readBuffer; - } - async function readLineAsync(readable: NodeJS.ReadableStream): Promise { const buffers: Buffer[] = []; while (true) { - const segment = await readAsync(readable); + const segment = await readAsync(readable) as Buffer | null; if (segment === null) { break; } diff --git a/src/nerdbank-streams/src/tests/Substream.spec.ts b/src/nerdbank-streams/src/tests/Substream.spec.ts index dc8f9f12..1cc3ee64 100644 --- a/src/nerdbank-streams/src/tests/Substream.spec.ts +++ b/src/nerdbank-streams/src/tests/Substream.spec.ts @@ -134,12 +134,8 @@ describe("Substream", () => { await endAsync(thru); const substream = readSubstream(thru); - let readPayload = await getBufferFrom(substream, payload1.length); - expect(readPayload).toEqual(payload1); - readPayload = await getBufferFrom(substream, payload2.length); - expect(readPayload).toEqual(payload2); - - await expectEndOfStream(substream); + const readPayload = await getBufferFrom(substream, 10, true); + expect(readPayload).toEqual(Buffer.from([1, 2, 3, 4, 5, 6])); await expectEndOfStream(thru); }); @@ -187,6 +183,8 @@ describe("Substream", () => { } function expectEndOfStream(stream: NodeJS.ReadableStream): Promise { + expect(stream.read()).toBeNull() + stream.resume(); return new Promise((resolve, reject) => { stream.once("end", () => resolve()); stream.once("data", () => reject(new Error('EOF expected.'))); diff --git a/src/nerdbank-streams/src/tests/Utilities.spec.ts b/src/nerdbank-streams/src/tests/Utilities.spec.ts index 49970a72..86f6b261 100644 --- a/src/nerdbank-streams/src/tests/Utilities.spec.ts +++ b/src/nerdbank-streams/src/tests/Utilities.spec.ts @@ -7,8 +7,8 @@ beforeEach(() => { thru = new PassThrough(); }) -describe('readAsync', function () { - it('returns immediately with results', async function () { +describe('readAsync', () => { + it('returns immediately with results', async () => { thru.write(Buffer.from([1, 2, 3])) thru.write(Buffer.from([4, 5, 6])) @@ -16,7 +16,7 @@ describe('readAsync', function () { expect(result).toEqual(Buffer.from([1, 2, 3, 4, 5, 6])) }) - it('to wait for data', async function () { + it('to wait for data', async () => { const resultPromise = readAsync(thru); thru.write(Buffer.from([1, 2, 3])) @@ -26,18 +26,18 @@ describe('readAsync', function () { expect(result).toEqual(Buffer.from([1, 2, 3])) }) - it('to return null at EOF', async function () { + it('to return null at EOF', async () => { thru.end() expect(await readAsync(thru)).toBeNull() }) - it('to propagate errors', async function () { + it('to propagate errors', async () => { const error = new Error('Mock error') thru.destroy(error) await expectAsync(readAsync(thru)).toBeRejectedWith(error); }) - it('bails on cancellation', async function () { + it('bails on cancellation', async () => { const cts = CancellationToken.create(); const readPromise = readAsync(thru, cts.token); cts.cancel(); @@ -45,18 +45,27 @@ describe('readAsync', function () { }) }) -describe('sliceStream', function () { - it('returns null on empty', async function () { +describe('sliceStream', () => { + it('returns null on empty', async () => { thru.end() const slice = sliceStream(thru, 5) expect(slice.read()).toBeNull() }) - it('returns subset of upper stream', async function () { + it('returns subset of upper stream', async () => { thru.push(Buffer.from([1, 2, 3, 4, 5, 6])) const slice = sliceStream(thru, 3) expect(await readAsync(slice)).toEqual(Buffer.from([1, 2, 3])) expect(await readAsync(slice)).toBeNull() expect(await readAsync(thru)).toEqual(Buffer.from([4, 5, 6])) }) + + it('handles slice that exceeds stream length', async () => { + thru.end(Buffer.from([1, 2, 3])) + const slice = sliceStream(thru, 6) + + const result = await readAsync(slice) + expect(result).toEqual(Buffer.from([1, 2, 3])) + expect(await readAsync(slice)).toBeNull() + }) }) diff --git a/src/nerdbank-streams/yarn.lock b/src/nerdbank-streams/yarn.lock index 740ab880..21cebdc3 100644 --- a/src/nerdbank-streams/yarn.lock +++ b/src/nerdbank-streams/yarn.lock @@ -1411,6 +1411,11 @@ isobject@^3.0.0, isobject@^3.0.1: resolved "https://registry.yarnpkg.com/isobject/-/isobject-3.0.1.tgz#4e431e92b11a9731636aa1f9c8d1ccbcfdab78df" integrity sha512-WhB9zCku7EGTj/HQQRz5aUQEUeoQZH2bWcltRErOpymJ4boYE6wL9Tbr23krRPSZ+C5zqNSrSw+Cc7sZZ4b7vg== +isstream@^0.1.2: + version "0.1.2" + resolved "https://registry.yarnpkg.com/isstream/-/isstream-0.1.2.tgz#47e63f7af55afa6f92e1500e690eb8b8529c099a" + integrity sha512-Yljz7ffyPbrLpLngrMtZ7NduUgVvi6wG9RJ9IUcyCd59YQ911PBJphODUcbOVbqYfxe1wuYf/LJ8PauMRwsM/g== + jasmine-core@^4.5.0: version "4.5.0" resolved "https://registry.yarnpkg.com/jasmine-core/-/jasmine-core-4.5.0.tgz#1a6bd0bde3f60996164311c88a0995d67ceda7c3" @@ -1925,6 +1930,14 @@ pinkie@^2.0.0: resolved "https://registry.yarnpkg.com/pinkie/-/pinkie-2.0.4.tgz#72556b80cfa0d48a974e80e77248e80ed4f7f870" integrity sha512-MnUuEycAemtSaeFSjXKW/aroV7akBbY+Sv+RkyqFjgAe73F+MR0TBWKBRDkmfWq/HiFmdavfZ1G7h4SPZXaCSg== +plexer@^2.0.0: + version "2.0.0" + resolved "https://registry.yarnpkg.com/plexer/-/plexer-2.0.0.tgz#d5ff402f7fb79cce592101c193807d0b085ea41f" + integrity sha512-MWAhwDaaaNA9sfbZz00HjSxrwOxEUD8BW4pHG0obnjvJpyG3RcOjSIcDIQFW3NP+RDJlzPbyXdgCR3V24tyUdw== + dependencies: + isstream "^0.1.2" + readable-stream "^3.4.0" + plugin-error@^1.0.1: version "1.0.1" resolved "https://registry.yarnpkg.com/plugin-error/-/plugin-error-1.0.1.tgz#77016bd8919d0ac377fdcdd0322328953ca5781c" @@ -2006,7 +2019,7 @@ readable-stream@1.1: isarray "0.0.1" string_decoder "~0.10.x" -"readable-stream@2 || 3": +"readable-stream@2 || 3", readable-stream@^3.4.0: version "3.6.0" resolved "https://registry.yarnpkg.com/readable-stream/-/readable-stream-3.6.0.tgz#337bbda3adc0706bd3e024426a286d4b4b2c9198" integrity sha512-BViHy7LKeTz4oNnkcLJ+lVSL6vpiFeX6/d3oSH8zCW7UxP2onchk+vTGB143xuFjHS3deTgkKoXXymXqymiIdA==