Skip to content

Commit 6d140aa

Browse files
committed
stream: reduce allocations on WHATWG streams hot paths
Pure-JavaScript optimizations to lib/internal/webstreams/*: - Reuse the pull and write promise-reaction closures per controller instead of allocating two fresh closures per chunk. They are created lazily on the first pull/write so construction-only workloads never pay for them. - Add the buffered fast path to the ReadableStream async iterator, mirroring the one ReadableStreamDefaultReader.read() already has: when data is queued in a default controller, dequeue directly and skip the read request, the deferred promise, and the promise chaining on the following call. - Run the post-start step through queueMicrotask() when the start algorithm result is not an object (fulfillment is guaranteed and no .then lookup is observable), instead of wrapping it in new Promise((r) => r(result)) plus a two-closure reaction. Object and thenable results keep the promise path since their adoption timing and .then lookups are observable. - Specialize the promise-callback wrappers for user algorithms by arity (0/1/2), replacing the rest-parameter + ReflectApply form that allocated an arguments array per invocation. The exact number of arguments each user callback observes is preserved. - Share immutable nil records for the writable stream closeRequest, inFlightWriteRequest, inFlightCloseRequest and pendingAbortRequest resets; these records are only ever replaced wholesale. Push the PromiseWithResolvers() record directly as the write request rather than rebuilding an identical object. - Remove dead per-instance allocations: the never-read close record in the writable stream state, the placeholder close/ready records that reader/writer setup unconditionally replaces, the per-stream () => 1 size algorithm closures, and the kControllerErrorFunction placeholder plus bound function (now a prototype method; byte streams keep their historical no-op behavior there). Benchmark results vs main, same-day builds, benchmark/compare.js --runs 10 (statistically significant rows): creation WritableStream (n=500k) +97.1% *** creation TransformStream (n=500k) +50.3% *** creation ReadableStream (n=500k) +13.9% *** creation ReadableStreamBYOBReader (n=500k) +13.0% *** creation ReadableStreamDefaultReader (n=500k) +9.8% ** readable-async-iterator +38.1% *** pipe-to (16 hwm configs) +2.6..+4.5% (all positive) js_transfer WS / TS / RS +7.7% / +6.3% / +3.0% readable-read normal/byob, read-buffered parity (n.s.) WPT streams/compression/encoding results are identical to main (1403/338/3822 subtests passed, same 8 expected failures by name), and all webstreams-related parallel tests pass. Assisted-by: Claude Fable 5 <noreply@anthropic.com>
1 parent 19c46ab commit 6d140aa

4 files changed

Lines changed: 228 additions & 165 deletions

File tree

lib/internal/webstreams/readablestream.js

Lines changed: 109 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -104,8 +104,9 @@ const {
104104
canCopyArrayBuffer,
105105
cloneAsUint8Array,
106106
copyArrayBuffer,
107-
createPromiseCallback,
107+
createPromiseCallback1,
108108
customInspect,
109+
defaultSizeAlgorithm,
109110
dequeueValue,
110111
enqueueValueWithSize,
111112
extractHighWaterMark,
@@ -252,7 +253,6 @@ class ReadableStream {
252253
this[kState] = createReadableStreamState();
253254

254255
this[kIsClosedPromise] = PromiseWithResolvers();
255-
this[kControllerErrorFunction] = () => {};
256256

257257
// The spec requires handling of the strategy first
258258
// here. Specifically, if getting the size and
@@ -294,6 +294,15 @@ class ReadableStream {
294294
return this[kState].state === 'readable';
295295
}
296296

297+
[kControllerErrorFunction](error) {
298+
// Used by the internal stream interop (addAbortSignal). Historically
299+
// only default controllers were wired here; byte stream controllers
300+
// keep the previous no-op behavior.
301+
const controller = this[kState].controller;
302+
if (isReadableStreamDefaultController(controller))
303+
controller.error(error);
304+
}
305+
297306
/**
298307
* @readonly
299308
* @type {boolean}
@@ -535,9 +544,38 @@ class ReadableStream {
535544
state.current = PromiseResolve();
536545
started = true;
537546
}
538-
state.current = state.current !== undefined ?
539-
PromisePrototypeThen(state.current, nextSteps, nextSteps) :
540-
nextSteps();
547+
if (state.current !== undefined) {
548+
state.current =
549+
PromisePrototypeThen(state.current, nextSteps, nextSteps);
550+
return state.current;
551+
}
552+
// No read is in flight. Mirror the buffered fast path of
553+
// ReadableStreamDefaultReader.read(): when data is already queued
554+
// in a default controller, resolve immediately without allocating
555+
// a read request. The result settles synchronously, so leaving
556+
// state.current undefined matches the state the slow path reaches
557+
// once its read request callbacks have settled.
558+
const stream = reader[kState].stream;
559+
if (!state.done && stream !== undefined) {
560+
const controller = stream[kState].controller;
561+
if (stream[kState].state === 'readable' &&
562+
isReadableStreamDefaultController(controller) &&
563+
controller[kState].queue.length > 0) {
564+
stream[kState].disturbed = true;
565+
const chunk = dequeueValue(controller);
566+
567+
if (controller[kState].closeRequested &&
568+
!controller[kState].queue.length) {
569+
readableStreamDefaultControllerClearAlgorithms(controller);
570+
readableStreamClose(stream);
571+
} else {
572+
readableStreamDefaultControllerCallPullIfNeeded(controller);
573+
}
574+
575+
return PromiseResolve({ done: false, value: chunk });
576+
}
577+
}
578+
state.current = nextSteps();
541579
return state.current;
542580
},
543581

@@ -611,7 +649,7 @@ class ReadableStream {
611649
// lingering promise not being properly resolved.
612650
// https://github.com/nodejs/node/issues/51486
613651
new transfer.CrossRealmTransformReadableSource(port, true),
614-
0, () => 1);
652+
0, defaultSizeAlgorithm);
615653
}
616654
}
617655

@@ -835,11 +873,8 @@ class ReadableStreamDefaultReader {
835873
this[kState] = {
836874
readRequests: [],
837875
stream: undefined,
838-
close: {
839-
promise: undefined,
840-
resolve: undefined,
841-
reject: undefined,
842-
},
876+
// The record is unconditionally replaced during setup.
877+
close: undefined,
843878
};
844879
setupReadableStreamDefaultReader(this, stream);
845880
}
@@ -949,11 +984,8 @@ class ReadableStreamBYOBReader {
949984
this[kState] = {
950985
stream: undefined,
951986
readIntoRequests: [],
952-
close: {
953-
promise: undefined,
954-
resolve: undefined,
955-
reject: undefined,
956-
},
987+
// The record is unconditionally replaced during setup.
988+
close: undefined,
957989
};
958990
setupReadableStreamBYOBReader(this, stream);
959991
}
@@ -1287,7 +1319,7 @@ function InternalReadableStream(start, pull, cancel, highWaterMark, size) {
12871319
ObjectSetPrototypeOf(InternalReadableStream.prototype, ReadableStream.prototype);
12881320
ObjectSetPrototypeOf(InternalReadableStream, ReadableStream);
12891321

1290-
function createReadableStream(start, pull, cancel, highWaterMark = 1, size = () => 1) {
1322+
function createReadableStream(start, pull, cancel, highWaterMark = 1, size = defaultSizeAlgorithm) {
12911323
const stream = new InternalReadableStream(start, pull, cancel, highWaterMark, size);
12921324

12931325
// For spec compliance the InternalReadableStream must be a ReadableStream
@@ -2475,16 +2507,24 @@ function readableStreamDefaultControllerCallPullIfNeeded(controller) {
24752507
}
24762508
assert(!controller[kState].pullAgain);
24772509
controller[kState].pulling = true;
2478-
PromisePrototypeThen(
2479-
controller[kState].pullAlgorithm(controller),
2480-
() => {
2510+
if (controller[kState].pullFulfilled === undefined) {
2511+
// The pull reaction closures only capture the controller, so they are
2512+
// created once on the first pull and reused for every subsequent pull
2513+
// instead of allocating two fresh closures per chunk.
2514+
controller[kState].pullFulfilled = () => {
24812515
controller[kState].pulling = false;
24822516
if (controller[kState].pullAgain) {
24832517
controller[kState].pullAgain = false;
24842518
readableStreamDefaultControllerCallPullIfNeeded(controller);
24852519
}
2486-
},
2487-
(error) => readableStreamDefaultControllerError(controller, error));
2520+
};
2521+
controller[kState].pullRejected =
2522+
(error) => readableStreamDefaultControllerError(controller, error);
2523+
}
2524+
PromisePrototypeThen(
2525+
controller[kState].pullAlgorithm(controller),
2526+
controller[kState].pullFulfilled,
2527+
controller[kState].pullRejected);
24882528
}
24892529

24902530
function readableStreamDefaultControllerClearAlgorithms(controller) {
@@ -2547,17 +2587,33 @@ function setupReadableStreamDefaultController(
25472587
pullAgain: false,
25482588
pullAlgorithm,
25492589
pulling: false,
2590+
pullFulfilled: undefined,
2591+
pullRejected: undefined,
25502592
queue: [],
25512593
queueTotalSize: 0,
25522594
started: false,
25532595
sizeAlgorithm,
25542596
stream,
25552597
};
25562598
stream[kState].controller = controller;
2557-
stream[kControllerErrorFunction] = FunctionPrototypeBind(controller.error, controller);
25582599

25592600
const startResult = startAlgorithm();
25602601

2602+
if (startResult === null ||
2603+
(typeof startResult !== 'object' && typeof startResult !== 'function')) {
2604+
// Non-thenable start result: fulfillment is guaranteed and no .then
2605+
// lookup on the result is observable, so run the post-start step
2606+
// directly at the exact microtask position the promise reaction
2607+
// would have had, skipping two promise allocations.
2608+
queueMicrotask(() => {
2609+
controller[kState].started = true;
2610+
assert(!controller[kState].pulling);
2611+
assert(!controller[kState].pullAgain);
2612+
readableStreamDefaultControllerCallPullIfNeeded(controller);
2613+
});
2614+
return;
2615+
}
2616+
25612617
PromisePrototypeThen(
25622618
new Promise((r) => r(startResult)),
25632619
() => {
@@ -2582,10 +2638,10 @@ function setupReadableStreamDefaultControllerFromSource(
25822638
FunctionPrototypeBind(start, source, controller) :
25832639
nonOpStart;
25842640
const pullAlgorithm = pull ?
2585-
createPromiseCallback('source.pull', pull, source) :
2641+
createPromiseCallback1('source.pull', pull, source) :
25862642
nonOpPull;
25872643
const cancelAlgorithm = cancel ?
2588-
createPromiseCallback('source.cancel', cancel, source) :
2644+
createPromiseCallback1('source.cancel', cancel, source) :
25892645
nonOpCancel;
25902646

25912647
setupReadableStreamDefaultController(
@@ -3236,16 +3292,23 @@ function readableByteStreamControllerCallPullIfNeeded(controller) {
32363292
}
32373293
assert(!controller[kState].pullAgain);
32383294
controller[kState].pulling = true;
3239-
PromisePrototypeThen(
3240-
controller[kState].pullAlgorithm(controller),
3241-
() => {
3295+
if (controller[kState].pullFulfilled === undefined) {
3296+
// See readableStreamDefaultControllerCallPullIfNeeded: created once,
3297+
// reused for every pull.
3298+
controller[kState].pullFulfilled = () => {
32423299
controller[kState].pulling = false;
32433300
if (controller[kState].pullAgain) {
32443301
controller[kState].pullAgain = false;
32453302
readableByteStreamControllerCallPullIfNeeded(controller);
32463303
}
3247-
},
3248-
(error) => readableByteStreamControllerError(controller, error));
3304+
};
3305+
controller[kState].pullRejected =
3306+
(error) => readableByteStreamControllerError(controller, error);
3307+
}
3308+
PromisePrototypeThen(
3309+
controller[kState].pullAlgorithm(controller),
3310+
controller[kState].pullFulfilled,
3311+
controller[kState].pullRejected);
32493312
}
32503313

32513314
function readableByteStreamControllerError(controller, error) {
@@ -3367,6 +3430,8 @@ function setupReadableByteStreamController(
33673430
closeRequested: false,
33683431
pullAgain: false,
33693432
pulling: false,
3433+
pullFulfilled: undefined,
3434+
pullRejected: undefined,
33703435
started: false,
33713436
stream,
33723437
queue: [],
@@ -3381,6 +3446,18 @@ function setupReadableByteStreamController(
33813446

33823447
const startResult = startAlgorithm();
33833448

3449+
if (startResult === null ||
3450+
(typeof startResult !== 'object' && typeof startResult !== 'function')) {
3451+
// See setupReadableStreamDefaultController.
3452+
queueMicrotask(() => {
3453+
controller[kState].started = true;
3454+
assert(!controller[kState].pulling);
3455+
assert(!controller[kState].pullAgain);
3456+
readableByteStreamControllerCallPullIfNeeded(controller);
3457+
});
3458+
return;
3459+
}
3460+
33843461
PromisePrototypeThen(
33853462
new Promise((r) => r(startResult)),
33863463
() => {
@@ -3405,10 +3482,10 @@ function setupReadableByteStreamControllerFromSource(
34053482
FunctionPrototypeBind(start, source, controller) :
34063483
nonOpStart;
34073484
const pullAlgorithm = pull ?
3408-
createPromiseCallback('source.pull', pull, source, controller) :
3485+
createPromiseCallback1('source.pull', pull, source) :
34093486
nonOpPull;
34103487
const cancelAlgorithm = cancel ?
3411-
createPromiseCallback('source.cancel', cancel, source) :
3488+
createPromiseCallback1('source.cancel', cancel, source) :
34123489
nonOpCancel;
34133490

34143491
if (autoAllocateChunkSize === 0) {

lib/internal/webstreams/transformstream.js

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,8 @@ const {
4343
} = require('internal/worker/js_transferable');
4444

4545
const {
46-
createPromiseCallback,
46+
createPromiseCallback1,
47+
createPromiseCallback2,
4748
customInspect,
4849
extractHighWaterMark,
4950
extractSizeAlgorithm,
@@ -462,13 +463,13 @@ function setupTransformStreamDefaultControllerFromTransformer(
462463
const flush = transformer?.flush;
463464
const cancel = transformer?.cancel;
464465
const transformAlgorithm = transform ?
465-
createPromiseCallback('transformer.transform', transform, transformer) :
466+
createPromiseCallback2('transformer.transform', transform, transformer) :
466467
defaultTransformAlgorithm;
467468
const flushAlgorithm = flush ?
468-
createPromiseCallback('transformer.flush', flush, transformer) :
469+
createPromiseCallback1('transformer.flush', flush, transformer) :
469470
nonOpFlush;
470471
const cancelAlgorithm = cancel ?
471-
createPromiseCallback('transformer.cancel', cancel, transformer) :
472+
createPromiseCallback1('transformer.cancel', cancel, transformer) :
472473
nonOpCancel;
473474

474475
setupTransformStreamDefaultController(

lib/internal/webstreams/util.js

Lines changed: 28 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,10 @@ const {
77
ArrayPrototypePush,
88
ArrayPrototypeShift,
99
AsyncIteratorPrototype,
10+
FunctionPrototypeCall,
1011
MathMax,
1112
NumberIsNaN,
1213
PromisePrototypeThen,
13-
ReflectApply,
1414
ReflectGet,
1515
Symbol,
1616
Uint8Array,
@@ -69,8 +69,12 @@ function extractHighWaterMark(value, defaultHWM) {
6969
return coercedValue;
7070
}
7171

72+
// The default size algorithm is never exposed to user code, so a single
73+
// shared function avoids one closure allocation per stream.
74+
const defaultSizeAlgorithm = () => 1;
75+
7276
function extractSizeAlgorithm(size) {
73-
if (size === undefined) return () => 1;
77+
if (size === undefined) return defaultSizeAlgorithm;
7478
validateFunction(size, 'strategy.size');
7579
return size;
7680
}
@@ -169,9 +173,25 @@ function enqueueValueWithSize(controller, value, size) {
169173
controller[kState].queueTotalSize += size;
170174
}
171175

172-
function createPromiseCallback(name, fn, thisArg) {
176+
// Arity-specialized variants of the promise-callback wrapper. The generic
177+
// rest-parameter + ReflectApply form allocated an arguments array on every
178+
// invocation; these run on per-chunk hot paths (pull/write/transform), so
179+
// each known call-site arity gets its own wrapper. The exact number of
180+
// arguments passed through to the user callback is observable and must be
181+
// preserved.
182+
function createPromiseCallback0(name, fn, thisArg) {
183+
validateFunction(fn, name);
184+
return async () => FunctionPrototypeCall(fn, thisArg);
185+
}
186+
187+
function createPromiseCallback1(name, fn, thisArg) {
188+
validateFunction(fn, name);
189+
return async (arg) => FunctionPrototypeCall(fn, thisArg, arg);
190+
}
191+
192+
function createPromiseCallback2(name, fn, thisArg) {
173193
validateFunction(fn, name);
174-
return async (...args) => ReflectApply(fn, thisArg, args);
194+
return async (arg1, arg2) => FunctionPrototypeCall(fn, thisArg, arg1, arg2);
175195
}
176196

177197
function isPromisePending(promise) {
@@ -213,8 +233,11 @@ module.exports = {
213233
canCopyArrayBuffer,
214234
cloneAsUint8Array,
215235
copyArrayBuffer,
216-
createPromiseCallback,
236+
createPromiseCallback0,
237+
createPromiseCallback1,
238+
createPromiseCallback2,
217239
customInspect,
240+
defaultSizeAlgorithm,
218241
dequeueValue,
219242
enqueueValueWithSize,
220243
extractHighWaterMark,

0 commit comments

Comments
 (0)