From 35020f2b4b30ba6d49c43f47a34102e2c86b2fb1 Mon Sep 17 00:00:00 2001 From: Matteo Collina Date: Sat, 23 May 2026 09:58:06 +0200 Subject: [PATCH 1/8] fix: support Node 26 and legacy global dispatcher Signed-off-by: Matteo Collina --- .github/workflows/ci.yml | 6 +- .github/workflows/nodejs-shared.yml | 3 + .github/workflows/nodejs.yml | 10 ++ index.js | 2 + lib/dispatcher/dispatcher1-wrapper.js | 122 ++++++++++++++++ lib/global.js | 14 +- scripts/run-node-test-ci.mjs | 152 ++++++++++++++++++++ test/node-test/client-errors.js | 22 ++- test/node-test/global-dispatcher-version.js | 128 +++++++++++++++++ test/types/global-dispatcher.test-d.ts | 3 +- types/dispatcher1-wrapper.d.ts | 7 + types/index.d.ts | 4 +- 12 files changed, 465 insertions(+), 8 deletions(-) create mode 100644 lib/dispatcher/dispatcher1-wrapper.js create mode 100644 scripts/run-node-test-ci.mjs create mode 100644 test/node-test/global-dispatcher-version.js create mode 100644 types/dispatcher1-wrapper.d.ts diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 1262b8a023e..db3eb50f76e 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -59,7 +59,7 @@ jobs: fail-fast: false max-parallel: 0 matrix: - node-version: ['20', '22', '24', '25'] + node-version: ['20', '22', '24', '25', '26'] runs-on: ['ubuntu-latest', 'windows-latest', 'macos-latest'] exclude: - node-version: '20' @@ -78,7 +78,7 @@ jobs: fail-fast: false max-parallel: 0 matrix: - node-version: ['24', '25'] + node-version: ['24', '25', '26'] runs-on: ['ubuntu-latest'] uses: ./.github/workflows/nodejs.yml with: @@ -273,7 +273,7 @@ jobs: fail-fast: false max-parallel: 0 matrix: - node-version: ['24', '25'] + node-version: ['24', '26'] runs-on: ['ubuntu-latest'] with: node-version: ${{ matrix.node-version }} diff --git a/.github/workflows/nodejs-shared.yml b/.github/workflows/nodejs-shared.yml index 36e1fd57ae6..e8ec93a62bc 100644 --- a/.github/workflows/nodejs-shared.yml +++ b/.github/workflows/nodejs-shared.yml @@ -81,6 +81,9 @@ jobs: rm -rf deps/undici ./configure --shared-builtin-undici/undici-path ${{ github.workspace }}/undici/loader.js --ninja --prefix=./final make + if grep -q '^build-ffi-tests:' Makefile; then + make build-ffi-tests + fi make install echo "$(pwd)/final/bin" >> $GITHUB_PATH diff --git a/.github/workflows/nodejs.yml b/.github/workflows/nodejs.yml index c07c16845f3..42988038547 100644 --- a/.github/workflows/nodejs.yml +++ b/.github/workflows/nodejs.yml @@ -79,12 +79,22 @@ jobs: UNDICI_NO_WASM_SIMD: ${{ inputs['no-wasm-simd'] }} - name: Test node-test + if: inputs.node-version != '26' run: npm run test:node-test id: test-node-test env: CI: true NODE_V8_COVERAGE: ${{ inputs.codecov == true && './coverage/tmp' || '' }} UNDICI_NO_WASM_SIMD: ${{ inputs['no-wasm-simd'] }} + + - name: Test node-test (Node 26 diagnostic runner) + if: inputs.node-version == '26' + run: node scripts/run-node-test-ci.mjs + id: test-node-test-node26 + env: + CI: true + NODE_V8_COVERAGE: ${{ inputs.codecov == true && './coverage/tmp' || '' }} + UNDICI_NO_WASM_SIMD: ${{ inputs['no-wasm-simd'] }} - name: Test fetch run: npm run test:fetch diff --git a/index.js b/index.js index 708a8ee80c5..04e73389211 100644 --- a/index.js +++ b/index.js @@ -6,6 +6,7 @@ const Pool = require('./lib/dispatcher/pool') const BalancedPool = require('./lib/dispatcher/balanced-pool') const RoundRobinPool = require('./lib/dispatcher/round-robin-pool') const Agent = require('./lib/dispatcher/agent') +const Dispatcher1Wrapper = require('./lib/dispatcher/dispatcher1-wrapper') const ProxyAgent = require('./lib/dispatcher/proxy-agent') const Socks5ProxyAgent = require('./lib/dispatcher/socks5-proxy-agent') const EnvHttpProxyAgent = require('./lib/dispatcher/env-http-proxy-agent') @@ -35,6 +36,7 @@ module.exports.Pool = Pool module.exports.BalancedPool = BalancedPool module.exports.RoundRobinPool = RoundRobinPool module.exports.Agent = Agent +module.exports.Dispatcher1Wrapper = Dispatcher1Wrapper module.exports.ProxyAgent = ProxyAgent module.exports.Socks5ProxyAgent = Socks5ProxyAgent module.exports.EnvHttpProxyAgent = EnvHttpProxyAgent diff --git a/lib/dispatcher/dispatcher1-wrapper.js b/lib/dispatcher/dispatcher1-wrapper.js new file mode 100644 index 00000000000..6574cfe424f --- /dev/null +++ b/lib/dispatcher/dispatcher1-wrapper.js @@ -0,0 +1,122 @@ +'use strict' + +const Dispatcher = require('./dispatcher') +const { InvalidArgumentError } = require('../core/errors') + +class LegacyHandlerWrapper { + #handler + + constructor (handler) { + this.#handler = handler + } + + onRequestStart (controller, context) { + this.#handler.onConnect?.((reason) => controller.abort(reason), context) + } + + onRequestUpgrade (controller, statusCode, headers, socket) { + const rawHeaders = controller?.rawHeaders ?? toRawHeaders(headers ?? {}) + this.#handler.onUpgrade?.(statusCode, rawHeaders, socket) + } + + onResponseStart (controller, statusCode, headers, statusMessage) { + const rawHeaders = controller?.rawHeaders ?? toRawHeaders(headers ?? {}) + + if (this.#handler.onHeaders?.(statusCode, rawHeaders, () => controller.resume(), statusMessage) === false) { + controller.pause() + } + } + + onResponseData (controller, chunk) { + if (this.#handler.onData?.(chunk) === false) { + controller.pause() + } + } + + onResponseEnd (controller, trailers) { + const rawTrailers = controller?.rawTrailers ?? toRawHeaders(trailers ?? {}) + this.#handler.onComplete?.(rawTrailers) + } + + onResponseError (_controller, err) { + if (!this.#handler.onError) { + throw err + } + + this.#handler.onError(err) + } + + onBodySent (chunk) { + this.#handler.onBodySent?.(chunk) + } + + onRequestSent () { + this.#handler.onRequestSent?.() + } + + onResponseStarted () { + this.#handler.onResponseStarted?.() + } +} + +class Dispatcher1Wrapper extends Dispatcher { + #dispatcher + + constructor (dispatcher) { + super() + + if (!dispatcher || typeof dispatcher.dispatch !== 'function') { + throw new InvalidArgumentError('Argument dispatcher must implement dispatch') + } + + this.#dispatcher = dispatcher + } + + static wrapHandler (handler) { + if (!handler || typeof handler !== 'object') { + throw new InvalidArgumentError('handler must be an object') + } + + if (typeof handler.onRequestStart === 'function') { + return handler + } + + return new LegacyHandlerWrapper(handler) + } + + dispatch (opts, handler) { + // Legacy (v1) consumers do not support HTTP/2, so force HTTP/1.1. + // See https://github.com/nodejs/undici/issues/4989 + if (opts.allowH2 !== false) { + opts = { ...opts, allowH2: false } + } + + return this.#dispatcher.dispatch(opts, Dispatcher1Wrapper.wrapHandler(handler)) + } + + close (...args) { + return this.#dispatcher.close(...args) + } + + destroy (...args) { + return this.#dispatcher.destroy(...args) + } +} + +module.exports = Dispatcher1Wrapper + +function toRawHeaderValue (value) { + return Array.isArray(value) + ? value.map((item) => Buffer.from(item, 'latin1')) + : Buffer.from(value, 'latin1') +} + +function toRawHeaders (headers) { + const rawHeaders = [] + + for (const [key, value] of Object.entries(headers)) { + rawHeaders.push(Buffer.from(key, 'latin1'), toRawHeaderValue(value)) + } + + return rawHeaders +} diff --git a/lib/global.js b/lib/global.js index b61d779e498..81ab7d10195 100644 --- a/lib/global.js +++ b/lib/global.js @@ -2,9 +2,11 @@ // We include a version number for the Dispatcher API. In case of breaking changes, // this version number must be increased to avoid conflicts. -const globalDispatcher = Symbol.for('undici.globalDispatcher.1') +const globalDispatcher = Symbol.for('undici.globalDispatcher.2') +const legacyGlobalDispatcher = Symbol.for('undici.globalDispatcher.1') const { InvalidArgumentError } = require('./core/errors') const Agent = require('./dispatcher/agent') +const Dispatcher1Wrapper = require('./dispatcher/dispatcher1-wrapper') if (getGlobalDispatcher() === undefined) { setGlobalDispatcher(new Agent()) @@ -14,12 +16,22 @@ function setGlobalDispatcher (agent) { if (!agent || typeof agent.dispatch !== 'function') { throw new InvalidArgumentError('Argument agent must implement Agent') } + Object.defineProperty(globalThis, globalDispatcher, { value: agent, writable: true, enumerable: false, configurable: false }) + + const legacyAgent = agent instanceof Dispatcher1Wrapper ? agent : new Dispatcher1Wrapper(agent) + + Object.defineProperty(globalThis, legacyGlobalDispatcher, { + value: legacyAgent, + writable: true, + enumerable: false, + configurable: false + }) } function getGlobalDispatcher () { diff --git a/scripts/run-node-test-ci.mjs b/scripts/run-node-test-ci.mjs new file mode 100644 index 00000000000..2be1d86b0b2 --- /dev/null +++ b/scripts/run-node-test-ci.mjs @@ -0,0 +1,152 @@ +import { readdir } from 'node:fs/promises' +import { join, resolve } from 'node:path' +import { run } from 'node:test' +import Reporters from 'node:test/reporters' +import { finished } from 'node:stream/promises' +import os from 'node:os' +import process from 'node:process' + +// Match borp's workaround. +delete process.env.NODE_TEST_CONTEXT + +const cwd = process.cwd() +const root = resolve(cwd, 'test/node-test') +const concurrency = os.availableParallelism() - 1 || 1 +const timeout = 180000 +const hardTimeout = Number(process.env.NODE_TEST_CI_HARD_TIMEOUT_MS || 10 * 60 * 1000) + +function isStdioSocket (handle) { + return handle?.constructor?.name === 'Socket' && [0, 1, 2].includes(handle?._handle?.fd) +} + +function describeHandle (handle) { + const type = handle?.constructor?.name || typeof handle + + if (type === 'Socket') { + return { + type, + fd: handle?._handle?.fd, + local: handle.localAddress && handle.localPort ? `${handle.localAddress}:${handle.localPort}` : undefined, + remote: handle.remoteAddress && handle.remotePort ? `${handle.remoteAddress}:${handle.remotePort}` : undefined, + readable: handle.readable, + writable: handle.writable, + destroyed: handle.destroyed + } + } + + if (type === 'Server') { + return { + type, + listening: handle.listening, + address: typeof handle.address === 'function' ? handle.address() : undefined + } + } + + if (type === 'Timeout') { + return { + type, + idleTimeout: handle._idleTimeout, + hasRef: typeof handle.hasRef === 'function' ? handle.hasRef() : undefined + } + } + + if (type === 'ChildProcess') { + return { + type, + pid: handle.pid, + spawnfile: handle.spawnfile, + exitCode: handle.exitCode, + signalCode: handle.signalCode, + killed: handle.killed + } + } + + return { + type, + keys: Object.keys(handle || {}).slice(0, 12) + } +} + +function dumpDiagnostics (label) { + const handles = process._getActiveHandles() + .filter((handle) => !isStdioSocket(handle)) + .map(describeHandle) + const requests = process._getActiveRequests().map((request) => request?.constructor?.name || typeof request) + const resources = typeof process.getActiveResourcesInfo === 'function' + ? process.getActiveResourcesInfo() + : [] + + console.error(`[node-test-ci] ${label}`) + console.error(`[node-test-ci] active resources: ${JSON.stringify(resources)}`) + console.error(`[node-test-ci] active requests (${requests.length}): ${JSON.stringify(requests)}`) + console.error(`[node-test-ci] active handles (${handles.length}): ${JSON.stringify(handles)}`) +} + +async function collectNodeTests (dir) { + const entries = await readdir(dir, { withFileTypes: true }) + const files = [] + + for (const entry of entries) { + const path = join(dir, entry.name) + + if (entry.isDirectory()) { + files.push(...await collectNodeTests(path)) + continue + } + + if (entry.isFile() && entry.name.endsWith('.js')) { + files.push(path) + } + } + + return files +} + +const watchdog = setTimeout(() => { + console.error(`[node-test-ci] hard timeout after ${hardTimeout}ms`) + dumpDiagnostics('before forced failure') + // eslint-disable-next-line n/no-process-exit + process.exit(1) +}, hardTimeout) + +const files = await collectNodeTests(root) +files.sort() + +const stream = run({ + cwd, + files, + concurrency, + timeout, + coverage: false +}) + +stream.on('test:fail', () => { + process.exitCode = 1 +}) + +const reporters = [Reporters.spec] + +for (const Reporter of reporters) { + const reporter = Reporter.prototype && Object.getOwnPropertyDescriptor(Reporter.prototype, 'constructor') + ? new Reporter() + : Reporter + stream.compose(reporter).pipe(process.stdout) +} + +try { + await finished(stream) + clearTimeout(watchdog) + dumpDiagnostics('after stream finished') + + await new Promise((resolve) => setTimeout(resolve, 250)) + dumpDiagnostics('250ms after stream finished') + + // eslint-disable-next-line n/no-process-exit + process.exit(process.exitCode ?? 0) +} catch (err) { + clearTimeout(watchdog) + console.error(err) + dumpDiagnostics('after runner error') + // eslint-disable-next-line n/no-process-exit + process.exit(1) +} diff --git a/test/node-test/client-errors.js b/test/node-test/client-errors.js index d260f628b85..38d526d9e8b 100644 --- a/test/node-test/client-errors.js +++ b/test/node-test/client-errors.js @@ -143,7 +143,16 @@ function errorAndPipelining (type) { p.strictEqual('a string', Buffer.concat(bufs).toString('utf8')) }) - server.once('request', (req, res) => { + server.on('request', function onRequest (req, res) { + if (req.method === 'POST') { + // Node.js 26 can surface a retried POST attempt before the queued GET. + // Tear it down immediately and keep waiting for the follow-up GET. + req.resume() + req.socket?.destroy() + return + } + + server.removeListener('request', onRequest) p.strictEqual('/', req.url) p.strictEqual('GET', req.method) res.setHeader('content-type', 'text/plain') @@ -218,7 +227,16 @@ function errorAndChunkedEncodingPipelining (type) { p.strictEqual('a string', Buffer.concat(bufs).toString('utf8')) }) - server.once('request', (req, res) => { + server.on('request', function onRequest (req, res) { + if (req.method === 'POST') { + // Node.js 26 can surface a retried POST attempt before the queued GET. + // Tear it down immediately and keep waiting for the follow-up GET. + req.resume() + req.socket?.destroy() + return + } + + server.removeListener('request', onRequest) p.strictEqual('/', req.url) p.strictEqual('GET', req.method) res.setHeader('content-type', 'text/plain') diff --git a/test/node-test/global-dispatcher-version.js b/test/node-test/global-dispatcher-version.js new file mode 100644 index 00000000000..e20a181a4dd --- /dev/null +++ b/test/node-test/global-dispatcher-version.js @@ -0,0 +1,128 @@ +'use strict' + +const assert = require('node:assert') +const { test } = require('node:test') +const { spawnSync } = require('node:child_process') +const { join } = require('node:path') + +const cwd = join(__dirname, '../..') + +function runNode (source) { + return spawnSync(process.execPath, ['-e', source], { + cwd, + encoding: 'utf8' + }) +} + +test('setGlobalDispatcher does not break Node.js global fetch', () => { + const script = ` + const { Agent, setGlobalDispatcher } = require('./index.js') + const http = require('node:http') + const { once } = require('node:events') + + ;(async () => { + const server = http.createServer((req, res) => res.end('ok')) + server.listen(0) + await once(server, 'listening') + + setGlobalDispatcher(new Agent()) + const url = 'http://127.0.0.1:' + server.address().port + const res = await fetch(url) + process.stdout.write(await res.text()) + + server.close() + })().catch((err) => { + console.error(err?.cause?.stack || err?.stack || err) + process.exit(1) + }) + ` + + const result = runNode(script) + assert.strictEqual(result.status, 0, result.stderr) + assert.strictEqual(result.stdout, 'ok') +}) + +test('setGlobalDispatcher mirrors a v1-compatible dispatcher that Node.js global fetch uses', () => { + const script = ` + const { Agent, Dispatcher1Wrapper, setGlobalDispatcher } = require('./index.js') + const http = require('node:http') + const { once } = require('node:events') + + ;(async () => { + const dispatcherV1Symbol = Symbol.for('undici.globalDispatcher.1') + const dispatcherV2Symbol = Symbol.for('undici.globalDispatcher.2') + const server = http.createServer((req, res) => res.end('ok')) + server.listen(0) + await once(server, 'listening') + + let count = 0 + class CountingAgent extends Agent { + dispatch (opts, handler) { + count++ + return super.dispatch(opts, handler) + } + } + + const agent = new CountingAgent() + setGlobalDispatcher(agent) + + const dispatcherV1 = globalThis[dispatcherV1Symbol] + if (!(dispatcherV1 instanceof Dispatcher1Wrapper)) { + throw new Error('expected v1 global dispatcher to be a Dispatcher1Wrapper') + } + + const url = 'http://127.0.0.1:' + server.address().port + const res = await fetch(url) + const body = await res.text() + + process.stdout.write(JSON.stringify({ + body, + count, + mirroredV2: globalThis[dispatcherV2Symbol] === agent + })) + + server.close() + })().catch((err) => { + console.error(err?.cause?.stack || err?.stack || err) + process.exit(1) + }) + ` + + const result = runNode(script) + assert.strictEqual(result.status, 0, result.stderr) + + const payload = JSON.parse(result.stdout) + assert.strictEqual(payload.body, 'ok') + assert.strictEqual(payload.count, 1) + assert.strictEqual(payload.mirroredV2, true) +}) + +test('Dispatcher1Wrapper bridges legacy handlers to a new Agent', () => { + const script = ` + const { Agent, Dispatcher1Wrapper } = require('./index.js') + const http = require('node:http') + const { once } = require('node:events') + + ;(async () => { + const server = http.createServer((req, res) => res.end('ok')) + server.listen(0) + await once(server, 'listening') + + const dispatcherV1 = Symbol.for('undici.globalDispatcher.1') + globalThis[dispatcherV1] = new Dispatcher1Wrapper(new Agent()) + + const url = 'http://127.0.0.1:' + server.address().port + const res = await fetch(url) + process.stdout.write(await res.text()) + + server.close() + })().catch((err) => { + console.error(err?.cause?.stack || err?.stack || err) + process.exit(1) + }) + ` + + const result = runNode(script) + assert.strictEqual(result.status, 0, result.stderr) + assert.strictEqual(result.stdout, 'ok') +}) diff --git a/test/types/global-dispatcher.test-d.ts b/test/types/global-dispatcher.test-d.ts index 18f4bac2621..a9ea4636d92 100644 --- a/test/types/global-dispatcher.test-d.ts +++ b/test/types/global-dispatcher.test-d.ts @@ -1,5 +1,5 @@ import { expectAssignable } from 'tsd' -import { setGlobalDispatcher, Dispatcher, getGlobalDispatcher } from '../..' +import { setGlobalDispatcher, Dispatcher, Dispatcher1Wrapper, getGlobalDispatcher } from '../..' { expectAssignable(setGlobalDispatcher(new Dispatcher())) @@ -8,3 +8,4 @@ import { setGlobalDispatcher, Dispatcher, getGlobalDispatcher } from '../..' } expectAssignable(getGlobalDispatcher()) +expectAssignable(new Dispatcher1Wrapper(new Dispatcher())) diff --git a/types/dispatcher1-wrapper.d.ts b/types/dispatcher1-wrapper.d.ts new file mode 100644 index 00000000000..f8a7d5e0d63 --- /dev/null +++ b/types/dispatcher1-wrapper.d.ts @@ -0,0 +1,7 @@ +import Dispatcher from './dispatcher' + +export default Dispatcher1Wrapper + +declare class Dispatcher1Wrapper extends Dispatcher { + constructor (dispatcher: Dispatcher) +} diff --git a/types/index.d.ts b/types/index.d.ts index f1b66e811c4..891358f3310 100644 --- a/types/index.d.ts +++ b/types/index.d.ts @@ -11,6 +11,7 @@ import H2CClient from './h2c-client' import buildConnector from './connector' import errors from './errors' import Agent from './agent' +import Dispatcher1Wrapper from './dispatcher1-wrapper' import MockClient from './mock-client' import MockPool from './mock-pool' import MockAgent from './mock-agent' @@ -44,7 +45,7 @@ export { Interceptable } from './mock-interceptor' declare function globalThisInstall (): void -export { Dispatcher, BalancedPool, RoundRobinPool, Pool, Client, buildConnector, errors, Agent, request, stream, pipeline, connect, upgrade, setGlobalDispatcher, getGlobalDispatcher, setGlobalOrigin, getGlobalOrigin, interceptors, cacheStores, MockClient, MockPool, MockAgent, SnapshotAgent, MockCallHistory, MockCallHistoryLog, mockErrors, ProxyAgent, Socks5ProxyAgent, EnvHttpProxyAgent, RedirectHandler, DecoratorHandler, RetryHandler, RetryAgent, H2CClient, globalThisInstall as install } +export { Dispatcher, BalancedPool, RoundRobinPool, Pool, Client, buildConnector, errors, Agent, Dispatcher1Wrapper, request, stream, pipeline, connect, upgrade, setGlobalDispatcher, getGlobalDispatcher, setGlobalOrigin, getGlobalOrigin, interceptors, cacheStores, MockClient, MockPool, MockAgent, SnapshotAgent, MockCallHistory, MockCallHistoryLog, mockErrors, ProxyAgent, Socks5ProxyAgent, EnvHttpProxyAgent, RedirectHandler, DecoratorHandler, RetryHandler, RetryAgent, H2CClient, globalThisInstall as install } export default Undici declare namespace Undici { @@ -60,6 +61,7 @@ declare namespace Undici { const buildConnector: typeof import('./connector').default const errors: typeof import('./errors').default const Agent: typeof import('./agent').default + const Dispatcher1Wrapper: typeof import('./dispatcher1-wrapper').default const setGlobalDispatcher: typeof import('./global-dispatcher').setGlobalDispatcher const getGlobalDispatcher: typeof import('./global-dispatcher').getGlobalDispatcher const request: typeof import('./api').request From 5650cc3ab307a00655d65a44d0089148e25a5f01 Mon Sep 17 00:00:00 2001 From: Matteo Collina Date: Thu, 28 May 2026 11:01:50 +0000 Subject: [PATCH 2/8] test: fix Node 26 stream error ordering --- .github/workflows/nodejs.yml | 10 --- scripts/run-node-test-ci.mjs | 152 -------------------------------- test/node-test/client-errors.js | 12 ++- 3 files changed, 10 insertions(+), 164 deletions(-) delete mode 100644 scripts/run-node-test-ci.mjs diff --git a/.github/workflows/nodejs.yml b/.github/workflows/nodejs.yml index 42988038547..c07c16845f3 100644 --- a/.github/workflows/nodejs.yml +++ b/.github/workflows/nodejs.yml @@ -79,22 +79,12 @@ jobs: UNDICI_NO_WASM_SIMD: ${{ inputs['no-wasm-simd'] }} - name: Test node-test - if: inputs.node-version != '26' run: npm run test:node-test id: test-node-test env: CI: true NODE_V8_COVERAGE: ${{ inputs.codecov == true && './coverage/tmp' || '' }} UNDICI_NO_WASM_SIMD: ${{ inputs['no-wasm-simd'] }} - - - name: Test node-test (Node 26 diagnostic runner) - if: inputs.node-version == '26' - run: node scripts/run-node-test-ci.mjs - id: test-node-test-node26 - env: - CI: true - NODE_V8_COVERAGE: ${{ inputs.codecov == true && './coverage/tmp' || '' }} - UNDICI_NO_WASM_SIMD: ${{ inputs['no-wasm-simd'] }} - name: Test fetch run: npm run test:fetch diff --git a/scripts/run-node-test-ci.mjs b/scripts/run-node-test-ci.mjs deleted file mode 100644 index 2be1d86b0b2..00000000000 --- a/scripts/run-node-test-ci.mjs +++ /dev/null @@ -1,152 +0,0 @@ -import { readdir } from 'node:fs/promises' -import { join, resolve } from 'node:path' -import { run } from 'node:test' -import Reporters from 'node:test/reporters' -import { finished } from 'node:stream/promises' -import os from 'node:os' -import process from 'node:process' - -// Match borp's workaround. -delete process.env.NODE_TEST_CONTEXT - -const cwd = process.cwd() -const root = resolve(cwd, 'test/node-test') -const concurrency = os.availableParallelism() - 1 || 1 -const timeout = 180000 -const hardTimeout = Number(process.env.NODE_TEST_CI_HARD_TIMEOUT_MS || 10 * 60 * 1000) - -function isStdioSocket (handle) { - return handle?.constructor?.name === 'Socket' && [0, 1, 2].includes(handle?._handle?.fd) -} - -function describeHandle (handle) { - const type = handle?.constructor?.name || typeof handle - - if (type === 'Socket') { - return { - type, - fd: handle?._handle?.fd, - local: handle.localAddress && handle.localPort ? `${handle.localAddress}:${handle.localPort}` : undefined, - remote: handle.remoteAddress && handle.remotePort ? `${handle.remoteAddress}:${handle.remotePort}` : undefined, - readable: handle.readable, - writable: handle.writable, - destroyed: handle.destroyed - } - } - - if (type === 'Server') { - return { - type, - listening: handle.listening, - address: typeof handle.address === 'function' ? handle.address() : undefined - } - } - - if (type === 'Timeout') { - return { - type, - idleTimeout: handle._idleTimeout, - hasRef: typeof handle.hasRef === 'function' ? handle.hasRef() : undefined - } - } - - if (type === 'ChildProcess') { - return { - type, - pid: handle.pid, - spawnfile: handle.spawnfile, - exitCode: handle.exitCode, - signalCode: handle.signalCode, - killed: handle.killed - } - } - - return { - type, - keys: Object.keys(handle || {}).slice(0, 12) - } -} - -function dumpDiagnostics (label) { - const handles = process._getActiveHandles() - .filter((handle) => !isStdioSocket(handle)) - .map(describeHandle) - const requests = process._getActiveRequests().map((request) => request?.constructor?.name || typeof request) - const resources = typeof process.getActiveResourcesInfo === 'function' - ? process.getActiveResourcesInfo() - : [] - - console.error(`[node-test-ci] ${label}`) - console.error(`[node-test-ci] active resources: ${JSON.stringify(resources)}`) - console.error(`[node-test-ci] active requests (${requests.length}): ${JSON.stringify(requests)}`) - console.error(`[node-test-ci] active handles (${handles.length}): ${JSON.stringify(handles)}`) -} - -async function collectNodeTests (dir) { - const entries = await readdir(dir, { withFileTypes: true }) - const files = [] - - for (const entry of entries) { - const path = join(dir, entry.name) - - if (entry.isDirectory()) { - files.push(...await collectNodeTests(path)) - continue - } - - if (entry.isFile() && entry.name.endsWith('.js')) { - files.push(path) - } - } - - return files -} - -const watchdog = setTimeout(() => { - console.error(`[node-test-ci] hard timeout after ${hardTimeout}ms`) - dumpDiagnostics('before forced failure') - // eslint-disable-next-line n/no-process-exit - process.exit(1) -}, hardTimeout) - -const files = await collectNodeTests(root) -files.sort() - -const stream = run({ - cwd, - files, - concurrency, - timeout, - coverage: false -}) - -stream.on('test:fail', () => { - process.exitCode = 1 -}) - -const reporters = [Reporters.spec] - -for (const Reporter of reporters) { - const reporter = Reporter.prototype && Object.getOwnPropertyDescriptor(Reporter.prototype, 'constructor') - ? new Reporter() - : Reporter - stream.compose(reporter).pipe(process.stdout) -} - -try { - await finished(stream) - clearTimeout(watchdog) - dumpDiagnostics('after stream finished') - - await new Promise((resolve) => setTimeout(resolve, 250)) - dumpDiagnostics('250ms after stream finished') - - // eslint-disable-next-line n/no-process-exit - process.exit(process.exitCode ?? 0) -} catch (err) { - clearTimeout(watchdog) - console.error(err) - dumpDiagnostics('after runner error') - // eslint-disable-next-line n/no-process-exit - process.exit(1) -} diff --git a/test/node-test/client-errors.js b/test/node-test/client-errors.js index 38d526d9e8b..94992dc7235 100644 --- a/test/node-test/client-errors.js +++ b/test/node-test/client-errors.js @@ -175,8 +175,12 @@ function errorAndPipelining (type) { opaque: 'asd', body: maybeWrapStream(new Readable({ read () { + if (this.sent) { + return + } + this.sent = true this.push('a string') - this.destroy(new Error('kaboom')) + setImmediate(() => this.destroy(new Error('kaboom'))) } }), type) }, (err, data) => { @@ -255,8 +259,12 @@ function errorAndChunkedEncodingPipelining (type) { opaque: 'asd', body: maybeWrapStream(new Readable({ read () { + if (this.sent) { + return + } + this.sent = true this.push('a string') - this.destroy(new Error('kaboom')) + setImmediate(() => this.destroy(new Error('kaboom'))) } }), type) }, (err, data) => { From ec171319f7bb5c4220e6bab22772ab2a501662e5 Mon Sep 17 00:00:00 2001 From: Matteo Collina Date: Thu, 28 May 2026 15:08:10 +0000 Subject: [PATCH 3/8] fix: mirror global dispatcher for Node fetch --- index.js | 2 - lib/dispatcher/dispatcher1-wrapper.js | 122 -------------------- lib/global.js | 5 +- test/node-test/global-dispatcher-version.js | 41 +------ test/types/global-dispatcher.test-d.ts | 3 +- types/dispatcher1-wrapper.d.ts | 7 -- types/index.d.ts | 4 +- 7 files changed, 7 insertions(+), 177 deletions(-) delete mode 100644 lib/dispatcher/dispatcher1-wrapper.js delete mode 100644 types/dispatcher1-wrapper.d.ts diff --git a/index.js b/index.js index 04e73389211..708a8ee80c5 100644 --- a/index.js +++ b/index.js @@ -6,7 +6,6 @@ const Pool = require('./lib/dispatcher/pool') const BalancedPool = require('./lib/dispatcher/balanced-pool') const RoundRobinPool = require('./lib/dispatcher/round-robin-pool') const Agent = require('./lib/dispatcher/agent') -const Dispatcher1Wrapper = require('./lib/dispatcher/dispatcher1-wrapper') const ProxyAgent = require('./lib/dispatcher/proxy-agent') const Socks5ProxyAgent = require('./lib/dispatcher/socks5-proxy-agent') const EnvHttpProxyAgent = require('./lib/dispatcher/env-http-proxy-agent') @@ -36,7 +35,6 @@ module.exports.Pool = Pool module.exports.BalancedPool = BalancedPool module.exports.RoundRobinPool = RoundRobinPool module.exports.Agent = Agent -module.exports.Dispatcher1Wrapper = Dispatcher1Wrapper module.exports.ProxyAgent = ProxyAgent module.exports.Socks5ProxyAgent = Socks5ProxyAgent module.exports.EnvHttpProxyAgent = EnvHttpProxyAgent diff --git a/lib/dispatcher/dispatcher1-wrapper.js b/lib/dispatcher/dispatcher1-wrapper.js deleted file mode 100644 index 6574cfe424f..00000000000 --- a/lib/dispatcher/dispatcher1-wrapper.js +++ /dev/null @@ -1,122 +0,0 @@ -'use strict' - -const Dispatcher = require('./dispatcher') -const { InvalidArgumentError } = require('../core/errors') - -class LegacyHandlerWrapper { - #handler - - constructor (handler) { - this.#handler = handler - } - - onRequestStart (controller, context) { - this.#handler.onConnect?.((reason) => controller.abort(reason), context) - } - - onRequestUpgrade (controller, statusCode, headers, socket) { - const rawHeaders = controller?.rawHeaders ?? toRawHeaders(headers ?? {}) - this.#handler.onUpgrade?.(statusCode, rawHeaders, socket) - } - - onResponseStart (controller, statusCode, headers, statusMessage) { - const rawHeaders = controller?.rawHeaders ?? toRawHeaders(headers ?? {}) - - if (this.#handler.onHeaders?.(statusCode, rawHeaders, () => controller.resume(), statusMessage) === false) { - controller.pause() - } - } - - onResponseData (controller, chunk) { - if (this.#handler.onData?.(chunk) === false) { - controller.pause() - } - } - - onResponseEnd (controller, trailers) { - const rawTrailers = controller?.rawTrailers ?? toRawHeaders(trailers ?? {}) - this.#handler.onComplete?.(rawTrailers) - } - - onResponseError (_controller, err) { - if (!this.#handler.onError) { - throw err - } - - this.#handler.onError(err) - } - - onBodySent (chunk) { - this.#handler.onBodySent?.(chunk) - } - - onRequestSent () { - this.#handler.onRequestSent?.() - } - - onResponseStarted () { - this.#handler.onResponseStarted?.() - } -} - -class Dispatcher1Wrapper extends Dispatcher { - #dispatcher - - constructor (dispatcher) { - super() - - if (!dispatcher || typeof dispatcher.dispatch !== 'function') { - throw new InvalidArgumentError('Argument dispatcher must implement dispatch') - } - - this.#dispatcher = dispatcher - } - - static wrapHandler (handler) { - if (!handler || typeof handler !== 'object') { - throw new InvalidArgumentError('handler must be an object') - } - - if (typeof handler.onRequestStart === 'function') { - return handler - } - - return new LegacyHandlerWrapper(handler) - } - - dispatch (opts, handler) { - // Legacy (v1) consumers do not support HTTP/2, so force HTTP/1.1. - // See https://github.com/nodejs/undici/issues/4989 - if (opts.allowH2 !== false) { - opts = { ...opts, allowH2: false } - } - - return this.#dispatcher.dispatch(opts, Dispatcher1Wrapper.wrapHandler(handler)) - } - - close (...args) { - return this.#dispatcher.close(...args) - } - - destroy (...args) { - return this.#dispatcher.destroy(...args) - } -} - -module.exports = Dispatcher1Wrapper - -function toRawHeaderValue (value) { - return Array.isArray(value) - ? value.map((item) => Buffer.from(item, 'latin1')) - : Buffer.from(value, 'latin1') -} - -function toRawHeaders (headers) { - const rawHeaders = [] - - for (const [key, value] of Object.entries(headers)) { - rawHeaders.push(Buffer.from(key, 'latin1'), toRawHeaderValue(value)) - } - - return rawHeaders -} diff --git a/lib/global.js b/lib/global.js index 81ab7d10195..651ef2c1277 100644 --- a/lib/global.js +++ b/lib/global.js @@ -6,7 +6,6 @@ const globalDispatcher = Symbol.for('undici.globalDispatcher.2') const legacyGlobalDispatcher = Symbol.for('undici.globalDispatcher.1') const { InvalidArgumentError } = require('./core/errors') const Agent = require('./dispatcher/agent') -const Dispatcher1Wrapper = require('./dispatcher/dispatcher1-wrapper') if (getGlobalDispatcher() === undefined) { setGlobalDispatcher(new Agent()) @@ -24,10 +23,8 @@ function setGlobalDispatcher (agent) { configurable: false }) - const legacyAgent = agent instanceof Dispatcher1Wrapper ? agent : new Dispatcher1Wrapper(agent) - Object.defineProperty(globalThis, legacyGlobalDispatcher, { - value: legacyAgent, + value: agent, writable: true, enumerable: false, configurable: false diff --git a/test/node-test/global-dispatcher-version.js b/test/node-test/global-dispatcher-version.js index e20a181a4dd..ed4c0a2e18a 100644 --- a/test/node-test/global-dispatcher-version.js +++ b/test/node-test/global-dispatcher-version.js @@ -42,9 +42,9 @@ test('setGlobalDispatcher does not break Node.js global fetch', () => { assert.strictEqual(result.stdout, 'ok') }) -test('setGlobalDispatcher mirrors a v1-compatible dispatcher that Node.js global fetch uses', () => { +test('setGlobalDispatcher mirrors the dispatcher under the v1 symbol that Node.js global fetch uses', () => { const script = ` - const { Agent, Dispatcher1Wrapper, setGlobalDispatcher } = require('./index.js') + const { Agent, setGlobalDispatcher } = require('./index.js') const http = require('node:http') const { once } = require('node:events') @@ -66,11 +66,6 @@ test('setGlobalDispatcher mirrors a v1-compatible dispatcher that Node.js global const agent = new CountingAgent() setGlobalDispatcher(agent) - const dispatcherV1 = globalThis[dispatcherV1Symbol] - if (!(dispatcherV1 instanceof Dispatcher1Wrapper)) { - throw new Error('expected v1 global dispatcher to be a Dispatcher1Wrapper') - } - const url = 'http://127.0.0.1:' + server.address().port const res = await fetch(url) const body = await res.text() @@ -78,6 +73,7 @@ test('setGlobalDispatcher mirrors a v1-compatible dispatcher that Node.js global process.stdout.write(JSON.stringify({ body, count, + mirroredV1: globalThis[dispatcherV1Symbol] === agent, mirroredV2: globalThis[dispatcherV2Symbol] === agent })) @@ -94,35 +90,6 @@ test('setGlobalDispatcher mirrors a v1-compatible dispatcher that Node.js global const payload = JSON.parse(result.stdout) assert.strictEqual(payload.body, 'ok') assert.strictEqual(payload.count, 1) + assert.strictEqual(payload.mirroredV1, true) assert.strictEqual(payload.mirroredV2, true) }) - -test('Dispatcher1Wrapper bridges legacy handlers to a new Agent', () => { - const script = ` - const { Agent, Dispatcher1Wrapper } = require('./index.js') - const http = require('node:http') - const { once } = require('node:events') - - ;(async () => { - const server = http.createServer((req, res) => res.end('ok')) - server.listen(0) - await once(server, 'listening') - - const dispatcherV1 = Symbol.for('undici.globalDispatcher.1') - globalThis[dispatcherV1] = new Dispatcher1Wrapper(new Agent()) - - const url = 'http://127.0.0.1:' + server.address().port - const res = await fetch(url) - process.stdout.write(await res.text()) - - server.close() - })().catch((err) => { - console.error(err?.cause?.stack || err?.stack || err) - process.exit(1) - }) - ` - - const result = runNode(script) - assert.strictEqual(result.status, 0, result.stderr) - assert.strictEqual(result.stdout, 'ok') -}) diff --git a/test/types/global-dispatcher.test-d.ts b/test/types/global-dispatcher.test-d.ts index a9ea4636d92..18f4bac2621 100644 --- a/test/types/global-dispatcher.test-d.ts +++ b/test/types/global-dispatcher.test-d.ts @@ -1,5 +1,5 @@ import { expectAssignable } from 'tsd' -import { setGlobalDispatcher, Dispatcher, Dispatcher1Wrapper, getGlobalDispatcher } from '../..' +import { setGlobalDispatcher, Dispatcher, getGlobalDispatcher } from '../..' { expectAssignable(setGlobalDispatcher(new Dispatcher())) @@ -8,4 +8,3 @@ import { setGlobalDispatcher, Dispatcher, Dispatcher1Wrapper, getGlobalDispatche } expectAssignable(getGlobalDispatcher()) -expectAssignable(new Dispatcher1Wrapper(new Dispatcher())) diff --git a/types/dispatcher1-wrapper.d.ts b/types/dispatcher1-wrapper.d.ts deleted file mode 100644 index f8a7d5e0d63..00000000000 --- a/types/dispatcher1-wrapper.d.ts +++ /dev/null @@ -1,7 +0,0 @@ -import Dispatcher from './dispatcher' - -export default Dispatcher1Wrapper - -declare class Dispatcher1Wrapper extends Dispatcher { - constructor (dispatcher: Dispatcher) -} diff --git a/types/index.d.ts b/types/index.d.ts index 891358f3310..f1b66e811c4 100644 --- a/types/index.d.ts +++ b/types/index.d.ts @@ -11,7 +11,6 @@ import H2CClient from './h2c-client' import buildConnector from './connector' import errors from './errors' import Agent from './agent' -import Dispatcher1Wrapper from './dispatcher1-wrapper' import MockClient from './mock-client' import MockPool from './mock-pool' import MockAgent from './mock-agent' @@ -45,7 +44,7 @@ export { Interceptable } from './mock-interceptor' declare function globalThisInstall (): void -export { Dispatcher, BalancedPool, RoundRobinPool, Pool, Client, buildConnector, errors, Agent, Dispatcher1Wrapper, request, stream, pipeline, connect, upgrade, setGlobalDispatcher, getGlobalDispatcher, setGlobalOrigin, getGlobalOrigin, interceptors, cacheStores, MockClient, MockPool, MockAgent, SnapshotAgent, MockCallHistory, MockCallHistoryLog, mockErrors, ProxyAgent, Socks5ProxyAgent, EnvHttpProxyAgent, RedirectHandler, DecoratorHandler, RetryHandler, RetryAgent, H2CClient, globalThisInstall as install } +export { Dispatcher, BalancedPool, RoundRobinPool, Pool, Client, buildConnector, errors, Agent, request, stream, pipeline, connect, upgrade, setGlobalDispatcher, getGlobalDispatcher, setGlobalOrigin, getGlobalOrigin, interceptors, cacheStores, MockClient, MockPool, MockAgent, SnapshotAgent, MockCallHistory, MockCallHistoryLog, mockErrors, ProxyAgent, Socks5ProxyAgent, EnvHttpProxyAgent, RedirectHandler, DecoratorHandler, RetryHandler, RetryAgent, H2CClient, globalThisInstall as install } export default Undici declare namespace Undici { @@ -61,7 +60,6 @@ declare namespace Undici { const buildConnector: typeof import('./connector').default const errors: typeof import('./errors').default const Agent: typeof import('./agent').default - const Dispatcher1Wrapper: typeof import('./dispatcher1-wrapper').default const setGlobalDispatcher: typeof import('./global-dispatcher').setGlobalDispatcher const getGlobalDispatcher: typeof import('./global-dispatcher').getGlobalDispatcher const request: typeof import('./api').request From 1f9b9b308b870e35aa1f28dafe99afcc59df9024 Mon Sep 17 00:00:00 2001 From: Matteo Collina Date: Thu, 28 May 2026 20:11:42 +0000 Subject: [PATCH 4/8] test: avoid stream error ordering assumptions --- test/node-test/client-errors.js | 58 +++++++++++++++++++++------------ 1 file changed, 38 insertions(+), 20 deletions(-) diff --git a/test/node-test/client-errors.js b/test/node-test/client-errors.js index 94992dc7235..5b75fcd58f7 100644 --- a/test/node-test/client-errors.js +++ b/test/node-test/client-errors.js @@ -127,6 +127,9 @@ function errorAndPipelining (type) { const p = tspl(t, { plan: 12 }) const server = createServer({ joinDuplicateHeaders: true }) + let body + let bodyErrorSent = false + server.once('request', (req, res) => { p.strictEqual('/', req.url) p.strictEqual('POST', req.method) @@ -135,6 +138,11 @@ function errorAndPipelining (type) { const bufs = [] req.on('data', (buf) => { bufs.push(buf) + + if (!bodyErrorSent) { + bodyErrorSent = true + setImmediate(() => body.destroy(new Error('kaboom'))) + } }) req.on('aborted', () => { @@ -165,6 +173,16 @@ function errorAndPipelining (type) { const client = new Client(`http://localhost:${server.address().port}`) t.after(client.destroy.bind(client)) + body = new Readable({ + read () { + if (this.sent) { + return + } + this.sent = true + this.push('a string') + } + }) + client.request({ path: '/', method: 'POST', @@ -173,16 +191,7 @@ function errorAndPipelining (type) { 'content-length': 42 }, opaque: 'asd', - body: maybeWrapStream(new Readable({ - read () { - if (this.sent) { - return - } - this.sent = true - this.push('a string') - setImmediate(() => this.destroy(new Error('kaboom'))) - } - }), type) + body: maybeWrapStream(body, type) }, (err, data) => { p.strictEqual(err.message, 'kaboom') p.strictEqual(data.opaque, 'asd') @@ -215,6 +224,9 @@ function errorAndChunkedEncodingPipelining (type) { const p = tspl(t, { plan: 12 }) const server = createServer({ joinDuplicateHeaders: true }) + let body + let bodyErrorSent = false + server.once('request', (req, res) => { p.strictEqual('/', req.url) p.strictEqual('POST', req.method) @@ -223,6 +235,11 @@ function errorAndChunkedEncodingPipelining (type) { const bufs = [] req.on('data', (buf) => { bufs.push(buf) + + if (!bodyErrorSent) { + bodyErrorSent = true + setImmediate(() => body.destroy(new Error('kaboom'))) + } }) req.on('aborted', () => { @@ -253,20 +270,21 @@ function errorAndChunkedEncodingPipelining (type) { const client = new Client(`http://localhost:${server.address().port}`) t.after(client.destroy.bind(client)) + body = new Readable({ + read () { + if (this.sent) { + return + } + this.sent = true + this.push('a string') + } + }) + client.request({ path: '/', method: 'POST', opaque: 'asd', - body: maybeWrapStream(new Readable({ - read () { - if (this.sent) { - return - } - this.sent = true - this.push('a string') - setImmediate(() => this.destroy(new Error('kaboom'))) - } - }), type) + body: maybeWrapStream(body, type) }, (err, data) => { p.strictEqual(err.message, 'kaboom') p.strictEqual(data.opaque, 'asd') From b33f13bb7e39a5fdb58a8165883545668786826e Mon Sep 17 00:00:00 2001 From: Matteo Collina Date: Fri, 29 May 2026 07:57:32 +0000 Subject: [PATCH 5/8] test: assert queued request ordering --- test/node-test/client-errors.js | 22 ++-------------------- 1 file changed, 2 insertions(+), 20 deletions(-) diff --git a/test/node-test/client-errors.js b/test/node-test/client-errors.js index 5b75fcd58f7..856bacb35a0 100644 --- a/test/node-test/client-errors.js +++ b/test/node-test/client-errors.js @@ -151,16 +151,7 @@ function errorAndPipelining (type) { p.strictEqual('a string', Buffer.concat(bufs).toString('utf8')) }) - server.on('request', function onRequest (req, res) { - if (req.method === 'POST') { - // Node.js 26 can surface a retried POST attempt before the queued GET. - // Tear it down immediately and keep waiting for the follow-up GET. - req.resume() - req.socket?.destroy() - return - } - - server.removeListener('request', onRequest) + server.once('request', (req, res) => { p.strictEqual('/', req.url) p.strictEqual('GET', req.method) res.setHeader('content-type', 'text/plain') @@ -248,16 +239,7 @@ function errorAndChunkedEncodingPipelining (type) { p.strictEqual('a string', Buffer.concat(bufs).toString('utf8')) }) - server.on('request', function onRequest (req, res) { - if (req.method === 'POST') { - // Node.js 26 can surface a retried POST attempt before the queued GET. - // Tear it down immediately and keep waiting for the follow-up GET. - req.resume() - req.socket?.destroy() - return - } - - server.removeListener('request', onRequest) + server.once('request', (req, res) => { p.strictEqual('/', req.url) p.strictEqual('GET', req.method) res.setHeader('content-type', 'text/plain') From f9504cd30b4e8bb65de5ba99570cb82f42f5dbe0 Mon Sep 17 00:00:00 2001 From: Matteo Collina Date: Fri, 29 May 2026 08:28:09 +0000 Subject: [PATCH 6/8] fix: avoid eager h1 stream body reads --- lib/dispatcher/client-h1.js | 5 ---- test/node-test/client-errors.js | 50 ++++++++------------------------- 2 files changed, 12 insertions(+), 43 deletions(-) diff --git a/lib/dispatcher/client-h1.js b/lib/dispatcher/client-h1.js index 10acac37dbf..3da6f9ae2a2 100644 --- a/lib/dispatcher/client-h1.js +++ b/lib/dispatcher/client-h1.js @@ -1071,11 +1071,6 @@ function writeH1 (client, request) { headers.push('content-type', body.type) } - if (body && typeof body.read === 'function') { - // Try to read EOF in order to get length. - body.read(0) - } - const bodyLength = util.bodyLength(body) contentLength = bodyLength ?? contentLength diff --git a/test/node-test/client-errors.js b/test/node-test/client-errors.js index 856bacb35a0..d260f628b85 100644 --- a/test/node-test/client-errors.js +++ b/test/node-test/client-errors.js @@ -127,9 +127,6 @@ function errorAndPipelining (type) { const p = tspl(t, { plan: 12 }) const server = createServer({ joinDuplicateHeaders: true }) - let body - let bodyErrorSent = false - server.once('request', (req, res) => { p.strictEqual('/', req.url) p.strictEqual('POST', req.method) @@ -138,11 +135,6 @@ function errorAndPipelining (type) { const bufs = [] req.on('data', (buf) => { bufs.push(buf) - - if (!bodyErrorSent) { - bodyErrorSent = true - setImmediate(() => body.destroy(new Error('kaboom'))) - } }) req.on('aborted', () => { @@ -164,16 +156,6 @@ function errorAndPipelining (type) { const client = new Client(`http://localhost:${server.address().port}`) t.after(client.destroy.bind(client)) - body = new Readable({ - read () { - if (this.sent) { - return - } - this.sent = true - this.push('a string') - } - }) - client.request({ path: '/', method: 'POST', @@ -182,7 +164,12 @@ function errorAndPipelining (type) { 'content-length': 42 }, opaque: 'asd', - body: maybeWrapStream(body, type) + body: maybeWrapStream(new Readable({ + read () { + this.push('a string') + this.destroy(new Error('kaboom')) + } + }), type) }, (err, data) => { p.strictEqual(err.message, 'kaboom') p.strictEqual(data.opaque, 'asd') @@ -215,9 +202,6 @@ function errorAndChunkedEncodingPipelining (type) { const p = tspl(t, { plan: 12 }) const server = createServer({ joinDuplicateHeaders: true }) - let body - let bodyErrorSent = false - server.once('request', (req, res) => { p.strictEqual('/', req.url) p.strictEqual('POST', req.method) @@ -226,11 +210,6 @@ function errorAndChunkedEncodingPipelining (type) { const bufs = [] req.on('data', (buf) => { bufs.push(buf) - - if (!bodyErrorSent) { - bodyErrorSent = true - setImmediate(() => body.destroy(new Error('kaboom'))) - } }) req.on('aborted', () => { @@ -252,21 +231,16 @@ function errorAndChunkedEncodingPipelining (type) { const client = new Client(`http://localhost:${server.address().port}`) t.after(client.destroy.bind(client)) - body = new Readable({ - read () { - if (this.sent) { - return - } - this.sent = true - this.push('a string') - } - }) - client.request({ path: '/', method: 'POST', opaque: 'asd', - body: maybeWrapStream(body, type) + body: maybeWrapStream(new Readable({ + read () { + this.push('a string') + this.destroy(new Error('kaboom')) + } + }), type) }, (err, data) => { p.strictEqual(err.message, 'kaboom') p.strictEqual(data.opaque, 'asd') From 67f2cfb184ac213663a283fd24cfe8d7b5be57f1 Mon Sep 17 00:00:00 2001 From: Matteo Collina Date: Fri, 29 May 2026 10:31:04 +0000 Subject: [PATCH 7/8] fix: keep h1 length probe for no-payload methods --- lib/dispatcher/client-h1.js | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/lib/dispatcher/client-h1.js b/lib/dispatcher/client-h1.js index 3da6f9ae2a2..64619b1f101 100644 --- a/lib/dispatcher/client-h1.js +++ b/lib/dispatcher/client-h1.js @@ -1071,6 +1071,11 @@ function writeH1 (client, request) { headers.push('content-type', body.type) } + if (!expectsPayload && body && typeof body.read === 'function') { + // Try to read EOF in order to get length for methods that do not expect a payload. + body.read(0) + } + const bodyLength = util.bodyLength(body) contentLength = bodyLength ?? contentLength From 3d93b7360e9fbdfa641dd473d71a6be7daaf7077 Mon Sep 17 00:00:00 2001 From: Matteo Collina Date: Fri, 29 May 2026 15:38:07 +0000 Subject: [PATCH 8/8] test: lift stream reconnect coverage from main --- lib/dispatcher/client-h1.js | 4 +- test/node-test/client-errors.js | 106 +++++++++++++++++++------------- 2 files changed, 65 insertions(+), 45 deletions(-) diff --git a/lib/dispatcher/client-h1.js b/lib/dispatcher/client-h1.js index 64619b1f101..10acac37dbf 100644 --- a/lib/dispatcher/client-h1.js +++ b/lib/dispatcher/client-h1.js @@ -1071,8 +1071,8 @@ function writeH1 (client, request) { headers.push('content-type', body.type) } - if (!expectsPayload && body && typeof body.read === 'function') { - // Try to read EOF in order to get length for methods that do not expect a payload. + if (body && typeof body.read === 'function') { + // Try to read EOF in order to get length. body.read(0) } diff --git a/test/node-test/client-errors.js b/test/node-test/client-errors.js index d260f628b85..90813539d8a 100644 --- a/test/node-test/client-errors.js +++ b/test/node-test/client-errors.js @@ -122,33 +122,71 @@ test('GET errors and reconnect with pipelining 3', async (t) => { await p.completed }) -function errorAndPipelining (type) { - test(`POST with a ${type} that errors and pipelining 1 should reconnect`, async (t) => { - const p = tspl(t, { plan: 12 }) +function installErrorAndReconnectServer (server, p, { contentLength, trackPostWithPlan }) { + let sawPost = false + let sawGet = false - const server = createServer({ joinDuplicateHeaders: true }) - server.once('request', (req, res) => { + server.on('request', (req, res) => { + if (req.method === 'GET') { + if (sawGet) { + req.socket?.destroy() + return + } + + sawGet = true + p.strictEqual('/', req.url) + p.strictEqual('GET', req.method) + res.setHeader('content-type', 'text/plain') + res.end('hello') + return + } + + if (sawPost) { + // Node.js 26 can surface additional POST attempts around the queued GET. + // Tear them down and keep the test focused on the reconnect behavior. + req.resume() + req.socket?.destroy() + return + } + + sawPost = true + + if (trackPostWithPlan) { p.strictEqual('/', req.url) p.strictEqual('POST', req.method) - p.strictEqual('42', req.headers['content-length']) + p.strictEqual(req.headers['content-length'], contentLength) + } else { + assert.strictEqual('/', req.url) + assert.strictEqual('POST', req.method) + assert.strictEqual(req.headers['content-length'], contentLength) + } - const bufs = [] - req.on('data', (buf) => { - bufs.push(buf) - }) + const bufs = [] + req.on('data', (buf) => { + bufs.push(buf) + }) - req.on('aborted', () => { - // we will abruptly close the connection here - // but this will still end + req.on('aborted', () => { + // we will abruptly close the connection here + // but this will still end + if (trackPostWithPlan) { p.strictEqual('a string', Buffer.concat(bufs).toString('utf8')) - }) + } else { + assert.strictEqual('a string', Buffer.concat(bufs).toString('utf8')) + } + }) + }) +} - server.once('request', (req, res) => { - p.strictEqual('/', req.url) - p.strictEqual('GET', req.method) - res.setHeader('content-type', 'text/plain') - res.end('hello') - }) +function errorAndPipelining (type) { + test(`POST with a ${type} that errors and pipelining 1 should reconnect`, async (t) => { + const trackPostWithPlan = type !== consts.STREAM + const p = tspl(t, { plan: trackPostWithPlan ? 12 : 8 }) + + const server = createServer({ joinDuplicateHeaders: true }) + installErrorAndReconnectServer(server, p, { + contentLength: '42', + trackPostWithPlan }) t.after(closeServerAsPromise(server)) @@ -199,31 +237,13 @@ errorAndPipelining(consts.ASYNC_ITERATOR) function errorAndChunkedEncodingPipelining (type) { test(`POST with chunked encoding, ${type} body that errors and pipelining 1 should reconnect`, async (t) => { - const p = tspl(t, { plan: 12 }) + const trackPostWithPlan = type !== consts.STREAM + const p = tspl(t, { plan: trackPostWithPlan ? 12 : 8 }) const server = createServer({ joinDuplicateHeaders: true }) - server.once('request', (req, res) => { - p.strictEqual('/', req.url) - p.strictEqual('POST', req.method) - p.strictEqual(req.headers['content-length'], undefined) - - const bufs = [] - req.on('data', (buf) => { - bufs.push(buf) - }) - - req.on('aborted', () => { - // we will abruptly close the connection here - // but this will still end - p.strictEqual('a string', Buffer.concat(bufs).toString('utf8')) - }) - - server.once('request', (req, res) => { - p.strictEqual('/', req.url) - p.strictEqual('GET', req.method) - res.setHeader('content-type', 'text/plain') - res.end('hello') - }) + installErrorAndReconnectServer(server, p, { + contentLength: undefined, + trackPostWithPlan }) t.after(closeServerAsPromise(server))