diff --git a/lib/dispatcher/client-h2.js b/lib/dispatcher/client-h2.js index 275e202f76b..e378010513a 100644 --- a/lib/dispatcher/client-h2.js +++ b/lib/dispatcher/client-h2.js @@ -84,6 +84,29 @@ function getGoAwayError (session, errorCode) { : new SocketError(`HTTP/2: "GOAWAY" frame received with code ${errorCode}`, util.getSocketInfo(session[kSocket]))) } +function resetHttp2Session (session, err) { + const client = session[kClient] + const socket = session[kSocket] + + if (client[kHTTP2Session] === session) { + client[kSocket] = null + client[kHTTPContext] = null + client[kHTTP2Session] = null + } + + if (socket != null && socket[kError] == null) { + socket[kError] = err + } + + if (!session.closed && !session.destroyed) { + try { + session.destroy(err) + } catch {} + } + + util.destroy(socket, err) +} + function getGoAwayPendingIdx (client, lastStreamID) { const maxAcceptedStreamID = Number.isInteger(lastStreamID) ? lastStreamID : Number.MAX_SAFE_INTEGER @@ -125,6 +148,10 @@ function clearRequestStream (request) { cleanup?.(stream) } +function requeueUnsentRequest (client, request) { + client[kQueue].splice(client[kPendingIdx] + 1, 0, request) +} + function canRetryRequestAfterGoAway (request) { const { body } = request @@ -740,6 +767,16 @@ function writeH2 (client, request) { try { return session.request(headers, options) } catch (err) { + if (err?.code === 'ERR_HTTP2_INVALID_SESSION') { + const wrappedErr = new SocketError(err.message, util.getSocketInfo(session[kSocket])) + wrappedErr.cause = err + session[kError] = wrappedErr + resetHttp2Session(session, wrappedErr) + requeueUnsentRequest(client, request) + + return null + } + const wrappedErr = new InformationalError(err.message, { cause: err }) session[kError] = wrappedErr session[kSocket][kError] = wrappedErr diff --git a/test/http2-invalid-session.js b/test/http2-invalid-session.js new file mode 100644 index 00000000000..06c0b7e53a9 --- /dev/null +++ b/test/http2-invalid-session.js @@ -0,0 +1,157 @@ +'use strict' + +const { test, after } = require('node:test') +const { EventEmitter, once } = require('node:events') +const { createSecureServer } = require('node:http2') +const { tspl } = require('@matteo.collina/tspl') + +const pem = require('@metcoder95/https-pem') + +const { Client } = require('..') + +test('invalid HTTP/2 session stream creation is requeued on a fresh session', async (t) => { + t = tspl(t, { plan: 5 }) + + const http2 = require('node:http2') + const originalConnect = http2.connect + + class FakeSession extends EventEmitter { + constructor () { + super() + this.closed = false + this.destroyed = false + } + + request () { + const err = new Error('The session has been destroyed') + err.code = 'ERR_HTTP2_INVALID_SESSION' + throw err + } + + destroy () { + if (this.destroyed) { + return + } + + this.destroyed = true + this.emit('close') + } + + ref () {} + unref () {} + } + + const session = new FakeSession() + let connectCalls = 0 + let streams = 0 + + http2.connect = function connectStub (...args) { + connectCalls++ + + if (connectCalls === 1) { + return session + } + + return originalConnect.apply(this, args) + } + + after(() => { + http2.connect = originalConnect + }) + + const server = createSecureServer(await pem.generate({ opts: { keySize: 2048 } })) + server.on('stream', (stream) => { + streams++ + stream.respond({ ':status': 200 }) + stream.end('ok') + }) + + after(() => server.close()) + await once(server.listen(0), 'listening') + + const client = new Client(`https://localhost:${server.address().port}`, { + allowH2: true, + connect: { + rejectUnauthorized: false + } + }) + after(() => client.close()) + + const response = await client.request({ path: '/', method: 'GET' }) + const chunks = [] + response.body.on('data', chunk => { + chunks.push(chunk) + }) + await once(response.body, 'end') + + t.strictEqual(response.statusCode, 200) + t.strictEqual(Buffer.concat(chunks).toString(), 'ok') + t.strictEqual(streams, 1) + t.strictEqual(connectCalls, 2) + t.strictEqual(session.destroyed, true) + + await t.completed +}) + +test('truncated HTTP/2 server session resets the client session', async (t) => { + t = tspl(t, { plan: 4 }) + + const server = createSecureServer(await pem.generate({ opts: { keySize: 2048 } })) + const serverSockets = [] + let streams = 0 + + server.on('secureConnection', (socket) => { + socket.on('error', () => {}) + serverSockets.push(socket) + }) + server.on('sessionError', () => {}) + server.on('tlsClientError', () => {}) + + server.on('stream', (stream) => { + streams++ + + if (streams === 1) { + stream.respond({ ':status': 200 }) + stream.write('partial', () => { + setImmediate(() => { + serverSockets[0].destroy() + }) + }) + return + } + + stream.respond({ ':status': 200 }) + stream.end('ok') + }) + + after(() => server.close()) + await once(server.listen(0), 'listening') + + const client = new Client(`https://localhost:${server.address().port}`, { + allowH2: true, + connect: { + rejectUnauthorized: false + }, + maxConcurrentStreams: 1 + }) + after(() => client.close()) + + const first = await client.request({ path: '/', method: 'GET' }) + + first.body.resume() + const [err] = await once(first.body, 'error') + t.ok(err.code === 'UND_ERR_SOCKET' || err.code === 'ECONNRESET') + + const response = await client.request({ path: '/second', method: 'GET' }) + const chunks = [] + response.body.on('data', chunk => { + chunks.push(chunk) + }) + await once(response.body, 'end') + + t.strictEqual(response.statusCode, 200) + t.strictEqual(Buffer.concat(chunks).toString(), 'ok') + t.strictEqual(streams, 2) + + await t.completed +})