diff --git a/lib/internal/webstreams/transformstream.js b/lib/internal/webstreams/transformstream.js index 371b597dc681d6..e2d15c05bca249 100644 --- a/lib/internal/webstreams/transformstream.js +++ b/lib/internal/webstreams/transformstream.js @@ -72,7 +72,6 @@ const { const assert = require('internal/assert'); const kSkipThrow = Symbol('kSkipThrow'); - const getNonWritablePropertyDescriptor = (value) => { return { __proto__: null, @@ -524,7 +523,12 @@ function transformStreamDefaultControllerError(controller, error) { async function transformStreamDefaultControllerPerformTransform(controller, chunk) { try { - return await controller[kState].transformAlgorithm(chunk, controller); + const transformAlgorithm = controller[kState].transformAlgorithm; + if (transformAlgorithm === undefined) { + // Algorithms were cleared by a concurrent cancel/abort/close. + return; + } + return await transformAlgorithm(chunk, controller); } catch (error) { transformStreamError(controller[kState].stream, error); throw error; diff --git a/test/parallel/test-whatwg-transformstream-cancel-write-race.js b/test/parallel/test-whatwg-transformstream-cancel-write-race.js new file mode 100644 index 00000000000000..5c32a25b9246f9 --- /dev/null +++ b/test/parallel/test-whatwg-transformstream-cancel-write-race.js @@ -0,0 +1,51 @@ +'use strict'; + +require('../common'); +const { test } = require('node:test'); +const assert = require('node:assert'); +const { TransformStream } = require('stream/web'); +const { setTimeout } = require('timers/promises'); + +// https://github.com/nodejs/node/issues/62036 + +test('Late write racing with reader.cancel() should not throw an internal TypeError', async () => { + const stream = new TransformStream({ + transform(chunk, controller) { + controller.enqueue(chunk); + }, + }); + + await setTimeout(0); + + const reader = stream.readable.getReader(); + const writer = stream.writable.getWriter(); + + // Release backpressure. + const pendingRead = reader.read(); + + // Simulate client disconnect / shutdown. + const pendingCancel = reader.cancel(new Error('client disconnected')); + + // Late write racing with cancel. + const pendingLateWrite = writer.write('late-write'); + + const [ + readResult, + cancelResult, + lateWriteResult, + ] = await Promise.allSettled([ + pendingRead, + pendingCancel, + pendingLateWrite, + ]); + + assert.strictEqual(readResult.status, 'fulfilled'); + assert.strictEqual(cancelResult.status, 'fulfilled'); + if (lateWriteResult.status === 'rejected') { + const err = lateWriteResult.reason; + const isNotAFunction = err instanceof TypeError && + /transformAlgorithm is not a function/.test(err.message); + assert.ok(!isNotAFunction, + `Internal implementation error leaked: ${err.message}`); + } +});