From 637487d1e84b8cefacec4243e1583a3e0e21dfdb Mon Sep 17 00:00:00 2001 From: Gus Cairo Date: Tue, 24 Mar 2026 07:46:34 -0300 Subject: [PATCH 01/14] Fix multiple requests on same HTTP1 connection # Conflicts: # Sources/NIOHTTPServer/NIOHTTPServer+HTTP1_1.swift # Sources/NIOHTTPServer/NIOHTTPServer+SecureUpgrade.swift # Sources/NIOHTTPServer/NIOHTTPServer.swift --- .../HTTPRequestConcludingAsyncReader.swift | 76 +++++---- .../NIOHTTPServer/NIOHTTPServer+HTTP1_1.swift | 40 ++++- .../NIOHTTPServer+SecureUpgrade.swift | 43 +++++- Sources/NIOHTTPServer/NIOHTTPServer.swift | 146 +++++++----------- .../NIOHTTPServer/RequestBodyReadError.swift | 6 +- ...TTPRequestConcludingAsyncReaderTests.swift | 21 +-- .../NIOHTTPServer+ServiceLifecycleTests.swift | 2 +- .../NIOHTTPServerTests.swift | 83 ++++++++-- .../TestingChannelServer+HTTP1.swift | 2 +- 9 files changed, 263 insertions(+), 156 deletions(-) diff --git a/Sources/NIOHTTPServer/HTTPRequestConcludingAsyncReader.swift b/Sources/NIOHTTPServer/HTTPRequestConcludingAsyncReader.swift index b70c4dd..9959116 100644 --- a/Sources/NIOHTTPServer/HTTPRequestConcludingAsyncReader.swift +++ b/Sources/NIOHTTPServer/HTTPRequestConcludingAsyncReader.swift @@ -62,9 +62,10 @@ public struct HTTPRequestConcludingAsyncReader: ConcludingAsyncReader, ~Copyable private var state: State /// The iterator that provides HTTP request parts from the underlying channel. - private var iterator: NIOAsyncChannelInboundStream.AsyncIterator + /// Taken from ReaderState once at the start of reading, and returned when reading completes. + private var iterator: NIOAsyncChannelInboundStream.AsyncIterator? - init(iterator: NIOAsyncChannelInboundStream.AsyncIterator) { + init(iterator: NIOAsyncChannelInboundStream.AsyncIterator?) { self.state = .readingBody(.noExcess) self.iterator = iterator } @@ -72,7 +73,7 @@ public struct HTTPRequestConcludingAsyncReader: ConcludingAsyncReader, ~Copyable enum ReadResult { case readBody(ByteBuffer) case readEnd(HTTPFields?) - case streamFinished + case requestFinished } mutating func read(limit: Int?) async throws -> ReadResult { @@ -88,14 +89,14 @@ public struct HTTPRequestConcludingAsyncReader: ConcludingAsyncReader, ~Copyable case .noExcess: // There is no excess from previous reads. We obtain the next element from the stream. - let requestPart = try await self.iterator.next(isolation: #isolation) + let requestPart = try await self.iterator?.next(isolation: #isolation) switch requestPart { case .head: fatalError("Unexpectedly received a request head.") case .none: - throw RequestBodyReadError.streamEndedBeforeReceivingRequestEnd + throw RequestBodyReadError.requestEndedBeforeReceivingEnd case .body(let element): bodyElement = element @@ -119,20 +120,23 @@ public struct HTTPRequestConcludingAsyncReader: ConcludingAsyncReader, ~Copyable return .readBody(bodyElement) case .finished: - return .streamFinished + return .requestFinished } } + + /// Takes the iterator out of the state machine for recovery. + mutating func takeIterator() -> NIOAsyncChannelInboundStream.AsyncIterator? { + self.iterator.take() + } } var requestBodyStateMachine: RequestBodyStateMachine - /// Initializes a new request body reader with the given NIO async channel iterator. + /// Initializes a new request body reader. /// - /// - Parameter iterator: The NIO async channel inbound stream iterator to use for reading request parts. - fileprivate init( - iterator: consuming sending NIOAsyncChannelInboundStream.AsyncIterator, - readerState: ReaderState - ) { + /// Takes the iterator from ReaderState so the state machine owns it for the entire read cycle. + fileprivate init(readerState: ReaderState) { + let iterator = readerState.takeIterator() self.requestBodyStateMachine = .init(iterator: iterator) self.state = readerState } @@ -160,13 +164,16 @@ public struct HTTPRequestConcludingAsyncReader: ConcludingAsyncReader, ~Copyable return try await body(Array(buffer: readElement).span) case .readEnd(let trailers): + // Reading is complete. Return the iterator to ReaderState. + nonisolated(unsafe) let iterator = self.requestBodyStateMachine.takeIterator() self.state.wrapped.withLock { state in state.trailers = trailers state.finishedReading = true + state.iterator = Disconnected(value: iterator) } return try await body(.init()) - case .streamFinished: + case .requestFinished: return try await body(.init()) } } catch { @@ -176,15 +183,28 @@ public struct HTTPRequestConcludingAsyncReader: ConcludingAsyncReader, ~Copyable } final class ReaderState: Sendable { - struct Wrapped { + struct Wrapped: ~Copyable { var trailers: HTTPFields? = nil var finishedReading: Bool = false + + /// The iterator that provides HTTP request parts from the underlying channel. + /// Stored here between read cycles for HTTP/1.1 keep-alive recovery. + var iterator: + Disconnected< + NIOAsyncChannelInboundStream.AsyncIterator? + > } let wrapped: Mutex - init() { - self.wrapped = .init(.init()) + init(iterator: consuming sending NIOAsyncChannelInboundStream.AsyncIterator) { + self.wrapped = .init(.init(iterator: Disconnected(value: iterator))) + } + + func takeIterator() -> sending NIOAsyncChannelInboundStream.AsyncIterator? { + self.wrapped.withLock { state in + state.iterator.swap(newValue: nil) + } } } @@ -197,18 +217,12 @@ public struct HTTPRequestConcludingAsyncReader: ConcludingAsyncReader, ~Copyable /// The type of errors that can occur during reading operations. public typealias Failure = any Error - private var iterator: Disconnected.AsyncIterator?> - internal var state: ReaderState - /// Initializes a new HTTP request body and trailers reader with the given NIO async channel iterator. + /// Initializes a new HTTP request body and trailers reader. /// - /// - Parameter iterator: The NIO async channel inbound stream iterator to use for reading request parts. - init( - iterator: consuming sending NIOAsyncChannelInboundStream.AsyncIterator, - readerState: ReaderState - ) { - self.iterator = Disconnected(value: iterator) + /// - Parameter readerState: The shared reader state that holds the iterator and captures trailers. + init(readerState: ReaderState) { self.state = readerState } @@ -240,14 +254,10 @@ public struct HTTPRequestConcludingAsyncReader: ConcludingAsyncReader, ~Copyable public consuming func consumeAndConclude( body: nonisolated(nonsending) (consuming sending RequestBodyAsyncReader) async throws(Failure) -> Return ) async throws(Failure) -> (Return, HTTPFields?) { - if let iterator = self.iterator.take() { - let partsReader = RequestBodyAsyncReader(iterator: iterator, readerState: self.state) - let result = try await body(partsReader) - let trailers = self.state.wrapped.withLock { $0.trailers } - return (result, trailers) - } else { - fatalError("consumeAndConclude called more than once") - } + let partsReader = RequestBodyAsyncReader(readerState: self.state) + let result = try await body(partsReader) + let trailers = self.state.wrapped.withLock { $0.trailers } + return (result, trailers) } } diff --git a/Sources/NIOHTTPServer/NIOHTTPServer+HTTP1_1.swift b/Sources/NIOHTTPServer/NIOHTTPServer+HTTP1_1.swift index d8c2f01..19d5ee8 100644 --- a/Sources/NIOHTTPServer/NIOHTTPServer+HTTP1_1.swift +++ b/Sources/NIOHTTPServer/NIOHTTPServer+HTTP1_1.swift @@ -13,6 +13,7 @@ //===----------------------------------------------------------------------===// import HTTPAPIs +import Logging import NIOCore import NIOExtras import NIOHTTP1 @@ -45,7 +46,7 @@ extension NIOHTTPServer { do { for try await http1Channel in inbound { group.addTask { - await self.handleRequestChannel(channel: http1Channel, handler: handler) + await self.handleHTTP1RequestChannel(channel: http1Channel, handler: handler) } } @@ -105,4 +106,41 @@ extension NIOHTTPServer { ) } } + + /// Handles an HTTP/1.1 connection channel, which may carry multiple serial requests on the + /// same connection (keep-alive). + func handleHTTP1RequestChannel( + channel: NIOAsyncChannel, + handler: some HTTPServerRequestHandler + ) async { + do { + try await channel.executeThenClose { inbound, outbound in + var iterator = inbound.makeAsyncIterator() + + requestLoop: while true { + guard let httpRequest = try await self.nextRequestHead(from: &iterator) else { + break requestLoop + } + + guard + let recoveredIterator = try await self.invokeHandler( + request: httpRequest, + iterator: iterator, + outbound: outbound, + handler: handler + ) + else { + // Handler did not fully consume the request; cannot continue on this + // connection. + break requestLoop + } + + iterator = recoveredIterator + } + } + } catch { + self.logger.debug("Error thrown while handling HTTP/1.1 connection", metadata: ["error": "\(error)"]) + try? await channel.channel.close() + } + } } diff --git a/Sources/NIOHTTPServer/NIOHTTPServer+SecureUpgrade.swift b/Sources/NIOHTTPServer/NIOHTTPServer+SecureUpgrade.swift index 3b708f0..9ff2316 100644 --- a/Sources/NIOHTTPServer/NIOHTTPServer+SecureUpgrade.swift +++ b/Sources/NIOHTTPServer/NIOHTTPServer+SecureUpgrade.swift @@ -110,7 +110,7 @@ extension NIOHTTPServer { let chainFuture = requestChannel.channel.nioSSL_peerValidatedCertificateChain() await Self.$connectionContext.withValue(ConnectionContext(chainFuture)) { - await self.handleRequestChannel( + await self.handleHTTP1RequestChannel( channel: requestChannel, handler: handler ) @@ -137,7 +137,7 @@ extension NIOHTTPServer { try await Self.$connectionContext.withValue(ConnectionContext(chainFuture)) { for try await streamChannel in multiplexer.inbound { streamGroup.addTask { - await self.handleRequestChannel( + await self.handleHTTP2StreamChannel( channel: streamChannel, handler: handler ) @@ -297,6 +297,45 @@ extension NIOHTTPServer { } } } + + /// Handles an HTTP/2 stream channel, which carries exactly one request per stream. + func handleHTTP2StreamChannel( + channel: NIOAsyncChannel, + handler: some HTTPServerRequestHandler + ) async { + do { + try await channel + .executeThenClose { inbound, outbound in + var iterator = inbound.makeAsyncIterator() + + guard let httpRequest = try await self.nextRequestHead(from: &iterator) else { + outbound.finish() + return + } + + _ = try await self.invokeHandler( + request: httpRequest, + iterator: iterator, + outbound: outbound, + handler: handler + ) + + // TODO: handle other state scenarios. + // For example, if we didn't finish reading but we wrote back a response, we + // should send a RST_STREAM with NO_ERROR set. If we finished reading but we + // didn't write back a response, then RST_STREAM is also likely appropriate but + // unclear about the error. + + // Finish the outbound and wait on the close future to make sure all pending + // writes are actually written. + outbound.finish() + try await channel.channel.closeFuture.get() + } + } catch { + self.logger.debug("Error thrown while handling HTTP/2 stream: \(error)") + try? await channel.channel.close() + } + } } @available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) diff --git a/Sources/NIOHTTPServer/NIOHTTPServer.swift b/Sources/NIOHTTPServer/NIOHTTPServer.swift index 96755c6..4e8e971 100644 --- a/Sources/NIOHTTPServer/NIOHTTPServer.swift +++ b/Sources/NIOHTTPServer/NIOHTTPServer.swift @@ -213,103 +213,67 @@ public struct NIOHTTPServer: HTTPServer { } } - /// Handles a single HTTP request. - /// - /// - Note: Errors do not propagate to the caller. When an error occurs, it is logged and the channel is closed. - /// - /// - Parameters: - /// - channel: The async channel to read the request from and write the response to. - /// - handler: The request handler. - func handleRequestChannel( - channel: NIOAsyncChannel, - handler: some HTTPServerRequestHandler - ) async { - do { - try await channel.executeThenClose { inbound, outbound in - var iterator = inbound.makeAsyncIterator() - - let nextPart: HTTPRequestPart? - do { - nextPart = try await iterator.next() - } catch { - self.logger.error( - "Error thrown while advancing the request iterator", - metadata: ["error": "\(error)"] - ) - throw error - } + /// Reads the next request head from the iterator. Returns `nil` if the connection is done or + /// an unexpected part is received. + func nextRequestHead( + from iterator: inout NIOAsyncChannelInboundStream.AsyncIterator + ) async throws -> HTTPRequest? { + switch try await iterator.next(isolation: #isolation) { + case .head(let request): + return request + case .body: + self.logger.debug("Unexpectedly received body on connection. Closing now.") + return nil + case .end: + self.logger.debug("Unexpectedly received end on connection. Closing now.") + return nil + case .none: + self.logger.trace("No more request parts on connection") + return nil + } + } - let httpRequest: HTTPRequest - switch nextPart { - case .head(let request): - httpRequest = request - case .body: - self.logger.debug("Unexpectedly received body on connection. Closing now") - outbound.finish() - return - case .end: - self.logger.debug("Unexpectedly received end on connection. Closing now") - outbound.finish() - return - case .none: - self.logger.trace("No more requests parts on connection") - return - } + /// Shared core: invokes the request handler with the appropriate reader/writer state. + /// Returns the recovered iterator if the request was fully consumed (for HTTP/1.1 reuse), + /// or `nil` if the request could not be fully consumed. + func invokeHandler( + request: HTTPRequest, + iterator: consuming sending NIOAsyncChannelInboundStream.AsyncIterator, + outbound: NIOAsyncChannelOutboundWriter, + handler: some HTTPServerRequestHandler + ) async throws -> NIOAsyncChannelInboundStream.AsyncIterator? { + let readerState = HTTPRequestConcludingAsyncReader.ReaderState(iterator: iterator) + let writerState = HTTPResponseConcludingAsyncWriter.WriterState() - let readerState = HTTPRequestConcludingAsyncReader.ReaderState() - let writerState = HTTPResponseConcludingAsyncWriter.WriterState() - - do { - try await handler.handle( - request: httpRequest, - requestContext: HTTPRequestContext(), - requestBodyAndTrailers: HTTPRequestConcludingAsyncReader( - iterator: iterator, - readerState: readerState - ), - responseSender: HTTPResponseSender { response in - try await outbound.write(.head(response)) - return HTTPResponseConcludingAsyncWriter( - writer: outbound, - writerState: writerState - ) - } sendInformational: { response in - try await outbound.write(.head(response)) - } + do { + try await handler.handle( + request: request, + requestContext: HTTPRequestContext(), + requestBodyAndTrailers: HTTPRequestConcludingAsyncReader( + readerState: readerState + ), + responseSender: HTTPResponseSender { response in + try await outbound.write(.head(response)) + return HTTPResponseConcludingAsyncWriter( + writer: outbound, + writerState: writerState ) - } catch { - if !readerState.wrapped.withLock({ $0.finishedReading }) { - self.logger.error("Did not finish reading but error thrown.") - // TODO: if h2 reset stream; if h1 try draining request? - } - - if !writerState.wrapped.withLock({ $0.finishedWriting }) { - self.logger.error("Did not write response but error thrown.") - // TODO: we need to do something, possibly just close the connection or - // reset the stream with the appropriate error. - } - - throw error + } sendInformational: { response in + try await outbound.write(.head(response)) } - - // TODO: handle other state scenarios. - // For example, if we're using h2 and we didn't finish reading but we wrote back - // a response, we should send a RST_STREAM with NO_ERROR set. - // If we finished reading but we didn't write back a response, then RST_STREAM - // is also likely appropriate but unclear about the error. - // For h1, we should close the connection. - - // Finish the outbound and wait on the close future to make sure all pending - // writes are actually written. - outbound.finish() - try await channel.channel.closeFuture.get() - } + ) } catch { - // TODO: We need to send a response head here potentially - self.logger.error("Error thrown while handling connection", metadata: ["error": "\(error)"]) - - try? await channel.channel.close() + logger.error("Error thrown while handling request: \(error)") + if !readerState.wrapped.withLock({ $0.finishedReading }) { + logger.error("Did not finish reading but error thrown.") + } + if !writerState.wrapped.withLock({ $0.finishedWriting }) { + logger.error("Did not write response but error thrown.") + } + throw error } + + return readerState.takeIterator() } /// Fail the listening address promise if the server is shutting down before it began listening. diff --git a/Sources/NIOHTTPServer/RequestBodyReadError.swift b/Sources/NIOHTTPServer/RequestBodyReadError.swift index ec23503..ebd72ed 100644 --- a/Sources/NIOHTTPServer/RequestBodyReadError.swift +++ b/Sources/NIOHTTPServer/RequestBodyReadError.swift @@ -13,12 +13,12 @@ //===----------------------------------------------------------------------===// enum RequestBodyReadError: Error, CustomStringConvertible { - case streamEndedBeforeReceivingRequestEnd + case requestEndedBeforeReceivingEnd var description: String { switch self { - case .streamEndedBeforeReceivingRequestEnd: - "The request stream unexpectedly ended before receiving a request end part." + case .requestEndedBeforeReceivingEnd: + "The request unexpectedly ended before receiving a request end part." } } } diff --git a/Tests/NIOHTTPServerTests/HTTPRequestConcludingAsyncReaderTests.swift b/Tests/NIOHTTPServerTests/HTTPRequestConcludingAsyncReaderTests.swift index 7095595..325abb9 100644 --- a/Tests/NIOHTTPServerTests/HTTPRequestConcludingAsyncReaderTests.swift +++ b/Tests/NIOHTTPServerTests/HTTPRequestConcludingAsyncReaderTests.swift @@ -36,8 +36,7 @@ struct HTTPRequestConcludingAsyncReaderTests { source.finish() let requestReader = HTTPRequestConcludingAsyncReader( - iterator: stream.makeAsyncIterator(), - readerState: .init() + readerState: .init(iterator: stream.makeAsyncIterator()) ) _ = try await requestReader.consumeAndConclude { bodyReader in @@ -58,8 +57,7 @@ struct HTTPRequestConcludingAsyncReaderTests { source.finish() let requestReader = HTTPRequestConcludingAsyncReader( - iterator: stream.makeAsyncIterator(), - readerState: .init() + readerState: .init(iterator: stream.makeAsyncIterator()) ) _ = try await requestReader.consumeAndConclude { bodyReader in @@ -90,7 +88,7 @@ struct HTTPRequestConcludingAsyncReaderTests { source.finish() // Then start reading the request - let requestReader = HTTPRequestConcludingAsyncReader(iterator: stream.makeAsyncIterator(), readerState: .init()) + let requestReader = HTTPRequestConcludingAsyncReader(readerState: .init(iterator: stream.makeAsyncIterator())) let (requestBody, finalElement) = try await requestReader.consumeAndConclude { bodyReader in var bodyReader = bodyReader @@ -140,8 +138,7 @@ struct HTTPRequestConcludingAsyncReaderTests { group.addTask { let requestReader = HTTPRequestConcludingAsyncReader( - iterator: stream.makeAsyncIterator(), - readerState: .init() + readerState: .init(iterator: stream.makeAsyncIterator()) ) let (_, finalElement) = try await requestReader.consumeAndConclude { bodyReader in // Read all body chunks @@ -174,8 +171,7 @@ struct HTTPRequestConcludingAsyncReaderTests { source.finish() let requestReader = HTTPRequestConcludingAsyncReader( - iterator: stream.makeAsyncIterator(), - readerState: .init() + readerState: .init(iterator: stream.makeAsyncIterator()) ) _ = await requestReader.consumeAndConclude { bodyReader in @@ -204,8 +200,7 @@ struct HTTPRequestConcludingAsyncReaderTests { source.finish() let requestReader = HTTPRequestConcludingAsyncReader( - iterator: stream.makeAsyncIterator(), - readerState: .init() + readerState: .init(iterator: stream.makeAsyncIterator()) ) _ = try await requestReader.consumeAndConclude { requestBodyReader in @@ -237,7 +232,7 @@ struct HTTPRequestConcludingAsyncReaderTests { let streamIterator = stream.makeAsyncIterator() - let requestReader = HTTPRequestConcludingAsyncReader(iterator: streamIterator, readerState: .init()) + let requestReader = HTTPRequestConcludingAsyncReader(readerState: .init(iterator: streamIterator)) _ = try await requestReader.consumeAndConclude { requestBodyReader in var requestBodyReader = requestBodyReader @@ -290,7 +285,7 @@ struct HTTPRequestConcludingAsyncReaderTests { let streamIterator = stream.makeAsyncIterator() - let requestReader = HTTPRequestConcludingAsyncReader(iterator: streamIterator, readerState: .init()) + let requestReader = HTTPRequestConcludingAsyncReader(readerState: .init(iterator: streamIterator)) _ = try await requestReader.consumeAndConclude { requestBodyReader in var requestBodyReader = requestBodyReader diff --git a/Tests/NIOHTTPServerTests/NIOHTTPServer+ServiceLifecycleTests.swift b/Tests/NIOHTTPServerTests/NIOHTTPServer+ServiceLifecycleTests.swift index aff9b36..47ea58c 100644 --- a/Tests/NIOHTTPServerTests/NIOHTTPServer+ServiceLifecycleTests.swift +++ b/Tests/NIOHTTPServerTests/NIOHTTPServer+ServiceLifecycleTests.swift @@ -234,7 +234,7 @@ struct NIOHTTPServiceLifecycleTests { // intentional because we want to keep the connection alive until the grace timer (500ms) fires. try await bodyReader.read(maximumCount: Self.bodyData.readableBytes) { _ in } } - #expect(throws: RequestBodyReadError.streamEndedBeforeReceivingRequestEnd) { try error.unwrap() } + #expect(throws: RequestBodyReadError.requestEndedBeforeReceivingEnd) { try error.unwrap() } } } } diff --git a/Tests/NIOHTTPServerTests/NIOHTTPServerTests.swift b/Tests/NIOHTTPServerTests/NIOHTTPServerTests.swift index 2eef788..226f6e4 100644 --- a/Tests/NIOHTTPServerTests/NIOHTTPServerTests.swift +++ b/Tests/NIOHTTPServerTests/NIOHTTPServerTests.swift @@ -110,7 +110,8 @@ struct NIOHTTPServerTests { inbound, expectedHead: [Self.responseHead(status: .ok, for: .http1_1)], expectedBody: [Self.bodyData], - expectedTrailers: Self.trailer + expectedTrailers: Self.trailer, + expectStreamEnd: false ) } } @@ -190,7 +191,8 @@ struct NIOHTTPServerTests { inbound, expectedHead: [Self.responseHead(status: .ok, for: httpVersion)], expectedBody: [Self.bodyData], - expectedTrailers: Self.trailer + expectedTrailers: Self.trailer, + expectStreamEnd: httpVersion == .http2 ) responseReceived() @@ -240,7 +242,8 @@ struct NIOHTTPServerTests { Self.responseHead(status: .ok, for: httpVersion), ], expectedBody: [Self.bodyData], - expectedTrailers: Self.trailer + expectedTrailers: Self.trailer, + expectStreamEnd: httpVersion == .http2 ) responseReceived() } @@ -372,6 +375,60 @@ struct NIOHTTPServerTests { ) } + @available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) + @Test("Multiple serial HTTP/1.1 requests on the same connection") + func testMultipleSerialHTTP1Requests() async throws { + let server = NIOHTTPServer( + logger: self.serverLogger, + configuration: try .init( + bindTarget: .hostAndPort(host: "127.0.0.1", port: 0), + supportedHTTPVersions: [.http1_1], + transportSecurity: .plaintext + ) + ) + + let requestCount = 3 + + try await confirmation(expectedCount: requestCount) { responseReceived in + try await Self.withServer( + server: server, + serverHandler: HTTPServerClosureRequestHandler { request, requestContext, reader, responseWriter in + // Echo the request body back as the response body. + try await Self.echoResponse(readUpTo: 1024, reader: reader, sender: responseWriter) + }, + body: { serverAddress in + let client = try await ClientBootstrap(group: .singletonMultiThreadedEventLoopGroup) + .connectToTestHTTP1Server(at: serverAddress) + + try await client.executeThenClose { inbound, outbound in + var responseIterator = inbound.makeAsyncIterator() + + for i in 1...requestCount { + // Send request + try await outbound.write( + .head(.init(method: .post, scheme: "http", authority: "", path: "/\(i)")) + ) + try await outbound.write(Self.reqBody) + try await outbound.write(.end(nil)) + + // Read response + let headPart = try await responseIterator.next() + #expect(headPart == .head(Self.responseHead(status: .ok, for: .http1_1))) + + let bodyPart = try await responseIterator.next() + #expect(bodyPart == .body(Self.bodyData)) + + let endPart = try await responseIterator.next() + #expect(endPart == .end(nil)) + + responseReceived() + } + } + } + ) + } + } + @available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) @Test("Multiple concurrent connections", arguments: [HTTPVersion.http1_1, HTTPVersion.http2]) func testMultipleConcurrentConnections(httpVersion: HTTPVersion) async throws { @@ -427,7 +484,8 @@ struct NIOHTTPServerTests { try await Self.validateResponse( inbound, expectedHead: [Self.responseHead(status: .ok, for: httpVersion)], - expectedBody: [Self.bodyData] + expectedBody: [Self.bodyData], + expectStreamEnd: httpVersion == .http2 ) responseReceived() @@ -543,7 +601,6 @@ struct NIOHTTPServerTests { try await firstClientChannel.executeThenClose { inbound, outbound in // Only send a request head; finish the stream immediately afterwards. try await outbound.write(.head(.init(method: .post, scheme: "http", authority: "", path: "/"))) - outbound.finish() } try await firstRequestErrorCaught.futureResult.get() @@ -559,7 +616,8 @@ struct NIOHTTPServerTests { try await Self.validateResponse( inbound, expectedHead: [Self.responseHead(status: .ok, for: .http1_1)], - expectedBody: [Self.bodyData] + expectedBody: [Self.bodyData], + expectStreamEnd: false ) responseReceived() @@ -615,6 +673,7 @@ extension NIOHTTPServerTests { expectedHead: [HTTPResponse], expectedBody: [ByteBuffer], expectedTrailers: HTTPFields? = nil, + expectStreamEnd: Bool = true, sourceLocation: SourceLocation = #_sourceLocation ) async throws { var responseIterator = responseStream.makeAsyncIterator() @@ -632,11 +691,13 @@ extension NIOHTTPServerTests { let endResponsePart = try await responseIterator.next() #expect(endResponsePart == .end(expectedTrailers), sourceLocation: sourceLocation) - #expect( - try await responseIterator.next() == nil, - "Received another response part when the response stream should have finished.", - sourceLocation: sourceLocation - ) + if expectStreamEnd { + #expect( + try await responseIterator.next() == nil, + "Received another response part when the response stream should have finished.", + sourceLocation: sourceLocation + ) + } } /// Unwraps a negotiated channel, asserting it matches the expected `httpVersion`. For HTTP/2, opens and returns a diff --git a/Tests/NIOHTTPServerTests/Utilities/TestingChannelClientServer/TestingChannelServer+HTTP1.swift b/Tests/NIOHTTPServerTests/Utilities/TestingChannelClientServer/TestingChannelServer+HTTP1.swift index 9af92f8..4153738 100644 --- a/Tests/NIOHTTPServerTests/Utilities/TestingChannelClientServer/TestingChannelServer+HTTP1.swift +++ b/Tests/NIOHTTPServerTests/Utilities/TestingChannelClientServer/TestingChannelServer+HTTP1.swift @@ -98,7 +98,7 @@ struct TestingChannelHTTP1Server { try await body(clientAsyncChannel) - try await serverTestConnectionChannel.close() + try? await serverTestConnectionChannel.close() } } } From 4e5ad11e714084abfb528e4a4af1d4942021ff09 Mon Sep 17 00:00:00 2001 From: Gus Cairo Date: Thu, 2 Apr 2026 12:02:29 +0100 Subject: [PATCH 02/14] Format --- .../NIOHTTPServer/NIOHTTPServer+HTTP1_1.swift | 38 +++++++++---------- 1 file changed, 19 insertions(+), 19 deletions(-) diff --git a/Sources/NIOHTTPServer/NIOHTTPServer+HTTP1_1.swift b/Sources/NIOHTTPServer/NIOHTTPServer+HTTP1_1.swift index 19d5ee8..15e02a5 100644 --- a/Sources/NIOHTTPServer/NIOHTTPServer+HTTP1_1.swift +++ b/Sources/NIOHTTPServer/NIOHTTPServer+HTTP1_1.swift @@ -115,29 +115,29 @@ extension NIOHTTPServer { ) async { do { try await channel.executeThenClose { inbound, outbound in - var iterator = inbound.makeAsyncIterator() + var iterator = inbound.makeAsyncIterator() - requestLoop: while true { - guard let httpRequest = try await self.nextRequestHead(from: &iterator) else { - break requestLoop - } - - guard - let recoveredIterator = try await self.invokeHandler( - request: httpRequest, - iterator: iterator, - outbound: outbound, - handler: handler - ) - else { - // Handler did not fully consume the request; cannot continue on this - // connection. - break requestLoop - } + requestLoop: while true { + guard let httpRequest = try await self.nextRequestHead(from: &iterator) else { + break requestLoop + } - iterator = recoveredIterator + guard + let recoveredIterator = try await self.invokeHandler( + request: httpRequest, + iterator: iterator, + outbound: outbound, + handler: handler + ) + else { + // Handler did not fully consume the request; cannot continue on this + // connection. + break requestLoop } + + iterator = recoveredIterator } + } } catch { self.logger.debug("Error thrown while handling HTTP/1.1 connection", metadata: ["error": "\(error)"]) try? await channel.channel.close() From cfc55a4f95052f8543ebc8f441ced28a2df12e63 Mon Sep 17 00:00:00 2001 From: Gus Cairo Date: Thu, 9 Apr 2026 16:09:14 +0100 Subject: [PATCH 03/14] Exit request loop if task is cancelled --- Sources/NIOHTTPServer/NIOHTTPServer+HTTP1_1.swift | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Sources/NIOHTTPServer/NIOHTTPServer+HTTP1_1.swift b/Sources/NIOHTTPServer/NIOHTTPServer+HTTP1_1.swift index 15e02a5..58a0cc7 100644 --- a/Sources/NIOHTTPServer/NIOHTTPServer+HTTP1_1.swift +++ b/Sources/NIOHTTPServer/NIOHTTPServer+HTTP1_1.swift @@ -117,7 +117,7 @@ extension NIOHTTPServer { try await channel.executeThenClose { inbound, outbound in var iterator = inbound.makeAsyncIterator() - requestLoop: while true { + requestLoop: while !Task.isCancelled { guard let httpRequest = try await self.nextRequestHead(from: &iterator) else { break requestLoop } From fd80541fdccc0570ffa462d95d2fe7dbededf331 Mon Sep 17 00:00:00 2001 From: Gus Cairo Date: Thu, 9 Apr 2026 16:09:34 +0100 Subject: [PATCH 04/14] Simplify iterator sharing cycle --- .../HTTPRequestConcludingAsyncReader.swift | 45 ++++++++----------- 1 file changed, 18 insertions(+), 27 deletions(-) diff --git a/Sources/NIOHTTPServer/HTTPRequestConcludingAsyncReader.swift b/Sources/NIOHTTPServer/HTTPRequestConcludingAsyncReader.swift index 9959116..6bd5f28 100644 --- a/Sources/NIOHTTPServer/HTTPRequestConcludingAsyncReader.swift +++ b/Sources/NIOHTTPServer/HTTPRequestConcludingAsyncReader.swift @@ -60,19 +60,20 @@ public struct HTTPRequestConcludingAsyncReader: ConcludingAsyncReader, ~Copyable } private var state: State + private var readerState: ReaderState /// The iterator that provides HTTP request parts from the underlying channel. /// Taken from ReaderState once at the start of reading, and returned when reading completes. private var iterator: NIOAsyncChannelInboundStream.AsyncIterator? - init(iterator: NIOAsyncChannelInboundStream.AsyncIterator?) { + init(readerState: ReaderState) { self.state = .readingBody(.noExcess) - self.iterator = iterator + self.readerState = readerState + self.iterator = readerState.takeIterator() } enum ReadResult { case readBody(ByteBuffer) - case readEnd(HTTPFields?) case requestFinished } @@ -103,7 +104,14 @@ public struct HTTPRequestConcludingAsyncReader: ConcludingAsyncReader, ~Copyable case .end(let trailers): self.state = .finished - return .readEnd(trailers) + nonisolated(unsafe) let iterator = self.iterator.take() + self.readerState.wrapped.withLock { state in + state.finishedReading = true + state.trailers = trailers + let disconnected = Disconnected(value: iterator) + state.iterator = disconnected.take() + } + return .requestFinished } } @@ -123,11 +131,6 @@ public struct HTTPRequestConcludingAsyncReader: ConcludingAsyncReader, ~Copyable return .requestFinished } } - - /// Takes the iterator out of the state machine for recovery. - mutating func takeIterator() -> NIOAsyncChannelInboundStream.AsyncIterator? { - self.iterator.take() - } } var requestBodyStateMachine: RequestBodyStateMachine @@ -136,8 +139,7 @@ public struct HTTPRequestConcludingAsyncReader: ConcludingAsyncReader, ~Copyable /// /// Takes the iterator from ReaderState so the state machine owns it for the entire read cycle. fileprivate init(readerState: ReaderState) { - let iterator = readerState.takeIterator() - self.requestBodyStateMachine = .init(iterator: iterator) + self.requestBodyStateMachine = .init(readerState: readerState) self.state = readerState } @@ -163,16 +165,6 @@ public struct HTTPRequestConcludingAsyncReader: ConcludingAsyncReader, ~Copyable case .readBody(let readElement): return try await body(Array(buffer: readElement).span) - case .readEnd(let trailers): - // Reading is complete. Return the iterator to ReaderState. - nonisolated(unsafe) let iterator = self.requestBodyStateMachine.takeIterator() - self.state.wrapped.withLock { state in - state.trailers = trailers - state.finishedReading = true - state.iterator = Disconnected(value: iterator) - } - return try await body(.init()) - case .requestFinished: return try await body(.init()) } @@ -189,21 +181,20 @@ public struct HTTPRequestConcludingAsyncReader: ConcludingAsyncReader, ~Copyable /// The iterator that provides HTTP request parts from the underlying channel. /// Stored here between read cycles for HTTP/1.1 keep-alive recovery. - var iterator: - Disconnected< - NIOAsyncChannelInboundStream.AsyncIterator? - > + var iterator: NIOAsyncChannelInboundStream.AsyncIterator? } let wrapped: Mutex init(iterator: consuming sending NIOAsyncChannelInboundStream.AsyncIterator) { - self.wrapped = .init(.init(iterator: Disconnected(value: iterator))) + self.wrapped = .init(.init(iterator: iterator)) } func takeIterator() -> sending NIOAsyncChannelInboundStream.AsyncIterator? { self.wrapped.withLock { state in - state.iterator.swap(newValue: nil) + let iterator = state.iterator + state.iterator = nil + return iterator } } } From 3f0a85a3e372b1c4c18d2d6837d234796f1f31e2 Mon Sep 17 00:00:00 2001 From: Gus Cairo Date: Mon, 13 Apr 2026 16:35:38 +0100 Subject: [PATCH 05/14] Fix flaky test --- .../NIOHTTPServer+ServiceLifecycleTests.swift | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/Tests/NIOHTTPServerTests/NIOHTTPServer+ServiceLifecycleTests.swift b/Tests/NIOHTTPServerTests/NIOHTTPServer+ServiceLifecycleTests.swift index 47ea58c..95b8261 100644 --- a/Tests/NIOHTTPServerTests/NIOHTTPServer+ServiceLifecycleTests.swift +++ b/Tests/NIOHTTPServerTests/NIOHTTPServer+ServiceLifecycleTests.swift @@ -184,6 +184,13 @@ struct NIOHTTPServiceLifecycleTests { // Wait for the server to shut down. try await group.waitForAll() + // Wait for the client channel to be fully closed. The server has closed + // its side of the connection, but the client's event loop may not have + // processed the TCP FIN/RST yet. closeFuture completes only once the + // channel is fully inactive, which is a stronger guarantee than just + // draining inbound (which may return while the channel is half-closed). + try await client.channel.closeFuture.get() + // We shouldn't be able to complete our request; the server should have shut down. await #expect(throws: ChannelError.ioOnClosedChannel) { try await outbound.write(Self.reqBody) From 03621554ad4ed4c00dec615e15903e9d76e66f05 Mon Sep 17 00:00:00 2001 From: Gus Cairo Date: Mon, 13 Apr 2026 17:43:58 +0100 Subject: [PATCH 06/14] Disable 6.2 --- .github/workflows/main.yml | 4 +++- .github/workflows/pull_request.yml | 11 +++++------ Package.swift | 2 +- 3 files changed, 9 insertions(+), 8 deletions(-) diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 53ce63f..00c227f 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -16,7 +16,7 @@ jobs: with: linux_6_0_enabled: false linux_6_1_enabled: false - linux_6_2_arguments_override: "-Xswiftc -warnings-as-errors --explicit-target-dependency-import-check error -Xswiftc -require-explicit-sendable" + linux_6_2_enabled: false linux_6_3_arguments_override: "-Xswiftc -warnings-as-errors --explicit-target-dependency-import-check error -Xswiftc -require-explicit-sendable" linux_nightly_next_arguments_override: "--explicit-target-dependency-import-check error -Xswiftc -require-explicit-sendable" linux_nightly_main_arguments_override: "--explicit-target-dependency-import-check error -Xswiftc -require-explicit-sendable" @@ -31,3 +31,5 @@ jobs: with: linux_6_0_enabled: false linux_6_1_enabled: false + linux_6_2_enabled: false + linux_6_3_enabled: true diff --git a/.github/workflows/pull_request.yml b/.github/workflows/pull_request.yml index 5c82e85..39e7f61 100644 --- a/.github/workflows/pull_request.yml +++ b/.github/workflows/pull_request.yml @@ -12,7 +12,7 @@ jobs: name: Soundness uses: swiftlang/github-workflows/.github/workflows/soundness.yml@0.0.7 with: - api_breakage_check_container_image: "swift:6.2-noble" + api_breakage_check_container_image: "swift:6.3-noble" format_check_container_image: "swiftlang/swift:nightly-main-noble" # Needed due to https://github.com/swiftlang/swift-format/issues/1081 license_header_check_project_name: "Swift HTTP Server" @@ -23,9 +23,7 @@ jobs: linux_5_10_enabled: false linux_6_0_enabled: false linux_6_1_enabled: false - # linux_6_1_arguments_override: "-Xswiftc -warnings-as-errors --explicit-target-dependency-import-check error" - linux_6_2_enabled: true - linux_6_2_arguments_override: "-Xswiftc -warnings-as-errors --explicit-target-dependency-import-check error" + linux_6_2_enabled: false linux_6_3_enabled: true linux_6_3_arguments_override: "-Xswiftc -warnings-as-errors --explicit-target-dependency-import-check error" linux_nightly_next_arguments_override: "--explicit-target-dependency-import-check error" @@ -38,7 +36,7 @@ jobs: linux_5_10_enabled: false linux_6_0_enabled: false linux_6_1_enabled: false - linux_6_2_enabled: true + linux_6_2_enabled: false linux_6_3_enabled: true static-sdk: @@ -52,4 +50,5 @@ jobs: linux_5_10_enabled: false linux_6_0_enabled: false linux_6_1_enabled: false - linux_6_2_enabled: true + linux_6_2_enabled: false + linux_6_3_enabled: true diff --git a/Package.swift b/Package.swift index 859d9fa..249d23d 100644 --- a/Package.swift +++ b/Package.swift @@ -1,4 +1,4 @@ -// swift-tools-version:6.2 +// swift-tools-version:6.3 //===----------------------------------------------------------------------===// // // This source file is part of the Swift HTTP Server open source project From 89ec7678af5bac9af1ebe77f131cc1abe1731bd1 Mon Sep 17 00:00:00 2001 From: Gus Cairo Date: Fri, 24 Apr 2026 10:14:48 +0100 Subject: [PATCH 07/14] Drain request when not done reading --- .../HTTPRequestConcludingAsyncReader.swift | 45 ++++++++++++++----- Sources/NIOHTTPServer/NIOHTTPServer.swift | 42 ++++++++++++++++- 2 files changed, 74 insertions(+), 13 deletions(-) diff --git a/Sources/NIOHTTPServer/HTTPRequestConcludingAsyncReader.swift b/Sources/NIOHTTPServer/HTTPRequestConcludingAsyncReader.swift index 6bd5f28..e718026 100644 --- a/Sources/NIOHTTPServer/HTTPRequestConcludingAsyncReader.swift +++ b/Sources/NIOHTTPServer/HTTPRequestConcludingAsyncReader.swift @@ -38,7 +38,7 @@ public struct HTTPRequestConcludingAsyncReader: ConcludingAsyncReader, ~Copyable /// The type of errors that can occur during reading operations. public typealias ReadFailure = any Error - /// The HTTP trailer fields captured at the end of the request. + /// The shared reader state that holds the iterator and captures trailers. fileprivate var state: ReaderState struct RequestBodyStateMachine { @@ -62,14 +62,9 @@ public struct HTTPRequestConcludingAsyncReader: ConcludingAsyncReader, ~Copyable private var state: State private var readerState: ReaderState - /// The iterator that provides HTTP request parts from the underlying channel. - /// Taken from ReaderState once at the start of reading, and returned when reading completes. - private var iterator: NIOAsyncChannelInboundStream.AsyncIterator? - init(readerState: ReaderState) { self.state = .readingBody(.noExcess) self.readerState = readerState - self.iterator = readerState.takeIterator() } enum ReadResult { @@ -90,26 +85,45 @@ public struct HTTPRequestConcludingAsyncReader: ConcludingAsyncReader, ~Copyable case .noExcess: // There is no excess from previous reads. We obtain the next element from the stream. - let requestPart = try await self.iterator?.next(isolation: #isolation) + // Take the iterator from ReaderState, read one part, and put it back. + // This ensures the iterator is always recoverable even if the reader + // is dropped without consuming .end. + guard var iterator = self.readerState.takeIterator() else { + throw RequestBodyReadError.requestEndedBeforeReceivingEnd + } + + let requestPart: HTTPRequestPart? + do { + requestPart = try await iterator.next(isolation: #isolation) + } catch { + // Put the iterator back before propagating the error. + nonisolated(unsafe) let iter = iterator + self.readerState.putIterator(iter) + throw error + } switch requestPart { case .head: + nonisolated(unsafe) let iter = iterator + self.readerState.putIterator(iter) fatalError("Unexpectedly received a request head.") case .none: + // Stream ended without .end — don't put iterator back. throw RequestBodyReadError.requestEndedBeforeReceivingEnd case .body(let element): + nonisolated(unsafe) let iter = iterator + self.readerState.putIterator(iter) bodyElement = element case .end(let trailers): self.state = .finished - nonisolated(unsafe) let iterator = self.iterator.take() + nonisolated(unsafe) let iter = iterator + self.readerState.putIterator(iter) self.readerState.wrapped.withLock { state in state.finishedReading = true state.trailers = trailers - let disconnected = Disconnected(value: iterator) - state.iterator = disconnected.take() } return .requestFinished } @@ -136,8 +150,6 @@ public struct HTTPRequestConcludingAsyncReader: ConcludingAsyncReader, ~Copyable var requestBodyStateMachine: RequestBodyStateMachine /// Initializes a new request body reader. - /// - /// Takes the iterator from ReaderState so the state machine owns it for the entire read cycle. fileprivate init(readerState: ReaderState) { self.requestBodyStateMachine = .init(readerState: readerState) self.state = readerState @@ -197,6 +209,15 @@ public struct HTTPRequestConcludingAsyncReader: ConcludingAsyncReader, ~Copyable return iterator } } + + func putIterator( + _ iterator: consuming sending NIOAsyncChannelInboundStream.AsyncIterator + ) { + var disconnected = Disconnected(value: Optional(iterator)) + self.wrapped.withLock { state in + state.iterator = disconnected.swap(newValue: nil) + } + } } /// The underlying reader type for the HTTP request body. diff --git a/Sources/NIOHTTPServer/NIOHTTPServer.swift b/Sources/NIOHTTPServer/NIOHTTPServer.swift index 4e8e971..c38bac3 100644 --- a/Sources/NIOHTTPServer/NIOHTTPServer.swift +++ b/Sources/NIOHTTPServer/NIOHTTPServer.swift @@ -273,7 +273,47 @@ public struct NIOHTTPServer: HTTPServer { throw error } - return readerState.takeIterator() + // If the handler didn't properly conclude the response, the HTTP codec + // is in an inconsistent state and the connection cannot be reused. + if !writerState.wrapped.withLock({ $0.finishedWriting }) { + self.logger.debug("Handler did not conclude the response. Closing connection.") + return nil + } + + // Recover the iterator for potential connection reuse. + guard var recoveredIterator = readerState.takeIterator() else { + // The handler started reading the request body but didn't finish. + // The iterator was consumed by the reader and not returned. + return nil + } + + // If the handler didn't fully consume the request body, drain the remaining + // parts so the iterator is positioned at the start of the next request. + // Errors during draining are not propagated — if the drain fails, we simply + // cannot reuse this connection. + if !readerState.wrapped.withLock({ $0.finishedReading }) { + do { + drainLoop: while true { + switch try await recoveredIterator.next(isolation: #isolation) { + case .head: + self.logger.debug( + "Unexpectedly received request head while draining unconsumed request body." + ) + return nil + case .body: + continue drainLoop + case .end: + break drainLoop + case .none: + return nil + } + } + } catch { + return nil + } + } + + return recoveredIterator } /// Fail the listening address promise if the server is shutting down before it began listening. From 51d83beaefc31279465bad85f999d71b3001e3ef Mon Sep 17 00:00:00 2001 From: Gus Cairo Date: Wed, 29 Apr 2026 14:37:21 +0100 Subject: [PATCH 08/14] Add some additional temporary logging --- .../NIOHTTPServer/NIOHTTPServer+HTTP1_1.swift | 49 ++++++++++++++++--- 1 file changed, 43 insertions(+), 6 deletions(-) diff --git a/Sources/NIOHTTPServer/NIOHTTPServer+HTTP1_1.swift b/Sources/NIOHTTPServer/NIOHTTPServer+HTTP1_1.swift index 58a0cc7..2b53b58 100644 --- a/Sources/NIOHTTPServer/NIOHTTPServer+HTTP1_1.swift +++ b/Sources/NIOHTTPServer/NIOHTTPServer+HTTP1_1.swift @@ -97,13 +97,15 @@ extension NIOHTTPServer { channel: any Channel, asyncChannelConfiguration: NIOAsyncChannel.Configuration ) -> EventLoopFuture> { - channel.pipeline.configureHTTPServerPipeline().flatMapThrowing { - try channel.pipeline.syncOperations.addHandler(HTTP1ToHTTPServerCodec(secure: false)) + channel.pipeline.addHandler(LoggingHandler()).flatMap { + channel.pipeline.configureHTTPServerPipeline().flatMapThrowing { + try channel.pipeline.syncOperations.addHandler(HTTP1ToHTTPServerCodec(secure: false)) - return try NIOAsyncChannel( - wrappingChannelSynchronously: channel, - configuration: asyncChannelConfiguration - ) + return try NIOAsyncChannel( + wrappingChannelSynchronously: channel, + configuration: asyncChannelConfiguration + ) + } } } @@ -144,3 +146,38 @@ extension NIOHTTPServer { } } } + +import Runtime +final class LoggingHandler: ChannelDuplexHandler, Sendable { + typealias InboundIn = ByteBuffer + typealias OutboundIn = ByteBuffer + + func channelRead(context: ChannelHandlerContext, data: NIOAny) { + print("channelRead: \(self.unwrapInboundIn(data))") + context.fireChannelRead(data) + } + + func channelReadComplete(context: ChannelHandlerContext) { + print("channelReadComplete") + context.fireChannelReadComplete() + } + + func close(context: ChannelHandlerContext, mode: CloseMode, promise: EventLoopPromise?) { + if #available(macOS 26.0, *) { + print("close", try! Backtrace.capture().symbolicated()?.description) + } else { + // Fallback on earlier versions + } + context.close(mode: mode, promise: promise) + } + + func flush(context: ChannelHandlerContext) { + print("flush") + context.flush() + } + + func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise?) { + print("write: \(self.unwrapOutboundIn(data))") + context.write(data, promise: promise) + } +} From cc71a04fa8246e3c3f611aac0e031d035b0a3fb6 Mon Sep 17 00:00:00 2001 From: Gus Cairo Date: Thu, 30 Apr 2026 12:42:43 +0100 Subject: [PATCH 09/14] Revert "Add some additional temporary logging" This reverts commit 51d83beaefc31279465bad85f999d71b3001e3ef. --- .../NIOHTTPServer/NIOHTTPServer+HTTP1_1.swift | 49 +++---------------- 1 file changed, 6 insertions(+), 43 deletions(-) diff --git a/Sources/NIOHTTPServer/NIOHTTPServer+HTTP1_1.swift b/Sources/NIOHTTPServer/NIOHTTPServer+HTTP1_1.swift index 2b53b58..58a0cc7 100644 --- a/Sources/NIOHTTPServer/NIOHTTPServer+HTTP1_1.swift +++ b/Sources/NIOHTTPServer/NIOHTTPServer+HTTP1_1.swift @@ -97,15 +97,13 @@ extension NIOHTTPServer { channel: any Channel, asyncChannelConfiguration: NIOAsyncChannel.Configuration ) -> EventLoopFuture> { - channel.pipeline.addHandler(LoggingHandler()).flatMap { - channel.pipeline.configureHTTPServerPipeline().flatMapThrowing { - try channel.pipeline.syncOperations.addHandler(HTTP1ToHTTPServerCodec(secure: false)) + channel.pipeline.configureHTTPServerPipeline().flatMapThrowing { + try channel.pipeline.syncOperations.addHandler(HTTP1ToHTTPServerCodec(secure: false)) - return try NIOAsyncChannel( - wrappingChannelSynchronously: channel, - configuration: asyncChannelConfiguration - ) - } + return try NIOAsyncChannel( + wrappingChannelSynchronously: channel, + configuration: asyncChannelConfiguration + ) } } @@ -146,38 +144,3 @@ extension NIOHTTPServer { } } } - -import Runtime -final class LoggingHandler: ChannelDuplexHandler, Sendable { - typealias InboundIn = ByteBuffer - typealias OutboundIn = ByteBuffer - - func channelRead(context: ChannelHandlerContext, data: NIOAny) { - print("channelRead: \(self.unwrapInboundIn(data))") - context.fireChannelRead(data) - } - - func channelReadComplete(context: ChannelHandlerContext) { - print("channelReadComplete") - context.fireChannelReadComplete() - } - - func close(context: ChannelHandlerContext, mode: CloseMode, promise: EventLoopPromise?) { - if #available(macOS 26.0, *) { - print("close", try! Backtrace.capture().symbolicated()?.description) - } else { - // Fallback on earlier versions - } - context.close(mode: mode, promise: promise) - } - - func flush(context: ChannelHandlerContext) { - print("flush") - context.flush() - } - - func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise?) { - print("write: \(self.unwrapOutboundIn(data))") - context.write(data, promise: promise) - } -} From 3d25f8147db755f1936c4240cc4bd2882aad257d Mon Sep 17 00:00:00 2001 From: Gus Cairo Date: Wed, 6 May 2026 10:47:04 +0100 Subject: [PATCH 10/14] Drain requests up to 254KB at end of handler in H1 --- Sources/NIOHTTPServer/NIOHTTPServer.swift | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/Sources/NIOHTTPServer/NIOHTTPServer.swift b/Sources/NIOHTTPServer/NIOHTTPServer.swift index c38bac3..14e613d 100644 --- a/Sources/NIOHTTPServer/NIOHTTPServer.swift +++ b/Sources/NIOHTTPServer/NIOHTTPServer.swift @@ -84,6 +84,10 @@ public struct NIOHTTPServer: HTTPServer { public typealias RequestConcludingReader = HTTPRequestConcludingAsyncReader public typealias ResponseConcludingWriter = HTTPResponseConcludingAsyncWriter + /// Maximum number of bytes to drain from an unconsumed request body before + /// giving up and closing the connection. + private var maxDrainBytes: Int { 256 * 1024 } + let logger: Logger let configuration: NIOHTTPServerConfiguration @@ -289,9 +293,10 @@ public struct NIOHTTPServer: HTTPServer { // If the handler didn't fully consume the request body, drain the remaining // parts so the iterator is positioned at the start of the next request. - // Errors during draining are not propagated — if the drain fails, we simply - // cannot reuse this connection. + // To prevent an attacker from keeping the connection in an infinite draining + // state, we only drain up to `Self.bytesDrained`. If more remains, close the connection. if !readerState.wrapped.withLock({ $0.finishedReading }) { + var bytesDrained = 0 do { drainLoop: while true { switch try await recoveredIterator.next(isolation: #isolation) { @@ -300,7 +305,11 @@ public struct NIOHTTPServer: HTTPServer { "Unexpectedly received request head while draining unconsumed request body." ) return nil - case .body: + case .body(let buffer): + bytesDrained += buffer.readableBytes + if bytesDrained > self.maxDrainBytes { + return nil + } continue drainLoop case .end: break drainLoop From ba2dde73a9da57e061754f4cb2bb1e44f2f7f2e1 Mon Sep 17 00:00:00 2001 From: Gus Cairo Date: Fri, 15 May 2026 17:02:28 +0100 Subject: [PATCH 11/14] Add HTTPKeepAliveHandler --- .../NIOHTTPServer/HTTPKeepAliveHandler.swift | 171 ++++++ .../NIOHTTPServer/NIOHTTPServer+HTTP1_1.swift | 1 + Sources/NIOHTTPServer/NIOHTTPServer.swift | 74 +-- .../HTTPKeepAliveHandlerTests.swift | 493 ++++++++++++++++++ 4 files changed, 684 insertions(+), 55 deletions(-) create mode 100644 Sources/NIOHTTPServer/HTTPKeepAliveHandler.swift create mode 100644 Tests/NIOHTTPServerTests/HTTPKeepAliveHandlerTests.swift diff --git a/Sources/NIOHTTPServer/HTTPKeepAliveHandler.swift b/Sources/NIOHTTPServer/HTTPKeepAliveHandler.swift new file mode 100644 index 0000000..e3e54e2 --- /dev/null +++ b/Sources/NIOHTTPServer/HTTPKeepAliveHandler.swift @@ -0,0 +1,171 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift HTTP Server open source project +// +// Copyright (c) 2026 Apple Inc. and the Swift HTTP Server project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of Swift HTTP Server project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import HTTPTypes +import NIOCore +import NIOHTTPTypes + +/// A NIO channel handler that ensures HTTP/1.1 keep-alive semantics are honored when +/// the server starts writing a response before the request body has been fully read. +/// +/// The handler buffers only the outbound response head whenever it is written before +/// the request `.end` has been received. The buffered head is released in one of three +/// ways: +/// +/// - **Request `.end` arrives before any response part is written**: the head is +/// flushed as-is and the response streams normally. The connection can be reused. +/// - **A response body part is written before request `.end` arrives**: the buffered +/// head is amended with `Connection: close`, then flushed; subsequent parts stream +/// directly; once response `.end` is written, the connection is closed. +/// - **Response `.end` is written while the head is still buffered (no body written)**: +/// the head is amended with `Connection: close`, flushed, followed by `.end`, then +/// the connection is closed. +/// +/// Any time the head is flushed *because* the handler started producing the response +/// before the request was fully read, the client receives `Connection: close` and the +/// server itself closes the connection after writing response `.end`. This protects +/// against clients that keep uploading request body bytes after the response has +/// completed (which would otherwise force the server to drain unbounded data) and +/// gives the client an explicit signal not to pipeline another request on the +/// connection. +/// +/// Informational (1xx) responses pass through unchanged and do not affect buffering +/// state. +@available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) +final class HTTPKeepAliveHandler: ChannelDuplexHandler { + typealias InboundIn = HTTPRequestPart + typealias InboundOut = HTTPRequestPart + typealias OutboundIn = HTTPResponsePart + typealias OutboundOut = HTTPResponsePart + + private struct BufferedWrite { + var part: HTTPResponsePart + var promise: EventLoopPromise? + } + + private enum FinalResponseState { + /// No final response has been written yet for the current request. Informational + /// (1xx) responses may have been passed through. + case notStarted + /// The final response head was written before request `.end` arrived. The head + /// is buffered until either request `.end` arrives (flush as-is, transition to + /// streaming, keep-alive), or a response body or `.end` is written (flush head + /// with `Connection: close`, transition to streaming, close after response + /// `.end`). + case bufferingHead(BufferedWrite) + /// The response is being streamed directly. If `closeAfterResponseEnd` is + /// true, the connection will be closed once response `.end` is written. + case streaming + } + + /// `true` when the request `.end` has been received on the inbound side, or no + /// request is currently in flight. `false` between receiving a request `.head` + /// and its `.end`. + private var requestEndReceived: Bool = true + + /// `true` if we've committed to closing the connection after this response's + /// `.end` is written. Set when the buffered head is flushed because a response + /// body or `.end` was written before request `.end` arrived. Cleared when a new + /// request begins. + private var closeAfterResponseEnd: Bool = false + + private var finalResponseState: FinalResponseState = .notStarted + + func channelRead(context: ChannelHandlerContext, data: NIOAny) { + let part = self.unwrapInboundIn(data) + switch part { + case .head: + // Begin a new request. (Any previous request's response must have + // completed already since HTTPServerPipelineHandler enforces ordering.) + self.requestEndReceived = false + self.closeAfterResponseEnd = false + self.finalResponseState = .notStarted + case .body: + break + case .end: + self.requestEndReceived = true + // If we've been buffering the response head, flush it now: we can keep the + // connection alive. + if case .bufferingHead(let buffered) = self.finalResponseState { + self.finalResponseState = .streaming + context.write(self.wrapOutboundOut(buffered.part), promise: buffered.promise) + context.flush() + } + } + context.fireChannelRead(data) + } + + func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise?) { + let part = self.unwrapOutboundIn(data) + switch self.finalResponseState { + case .notStarted: + // Informational (1xx) responses pass through without affecting state: they + // don't conclude the response, so we remain in `.notStarted` until the + // final response head is written. + if case .head(let response) = part, response.status.kind == .informational { + context.write(data, promise: promise) + return + } + if self.requestEndReceived { + // Request fully read; stream the response directly. + self.finalResponseState = .streaming + context.write(data, promise: promise) + } else { + // Buffer just the head until we know whether a body will follow. + self.finalResponseState = .bufferingHead(BufferedWrite(part: part, promise: promise)) + } + case .bufferingHead(let buffered): + // Reaching this case means the handler is producing more of the response + // before request `.end` arrived (otherwise `channelRead(.end)` would have + // flushed the buffer and transitioned us to `.streaming`). Amend the head + // with `Connection: close` so the client knows not to reuse the + // connection, and remember to close after writing response `.end`. + var headPart = buffered.part + if case .head(var response) = headPart { + response.headerFields[.connection] = "close" + headPart = .head(response) + } + self.closeAfterResponseEnd = true + self.finalResponseState = .streaming + + switch part { + case .end: + // Response is just head + end (no body). Flush head + end and close. + context.write(self.wrapOutboundOut(headPart), promise: buffered.promise) + context.write(data, promise: promise) + context.flush() + context.close(mode: .all, promise: nil) + case .body: + // Flush head + body and continue streaming. We'll close once response + // `.end` is written. + context.write(self.wrapOutboundOut(headPart), promise: buffered.promise) + context.write(data, promise: promise) + context.flush() + case .head: + preconditionFailure( + "HTTPKeepAliveHandler received a second response head while the previous head was still buffered. " + + "A handler must only write one final response head per request." + ) + } + case .streaming: + context.write(data, promise: promise) + if case .end = part, self.closeAfterResponseEnd { + // The head we flushed earlier carried `Connection: close`; close + // the connection now that the response is complete. + context.flush() + context.close(mode: .all, promise: nil) + } + } + } +} diff --git a/Sources/NIOHTTPServer/NIOHTTPServer+HTTP1_1.swift b/Sources/NIOHTTPServer/NIOHTTPServer+HTTP1_1.swift index 58a0cc7..b670b61 100644 --- a/Sources/NIOHTTPServer/NIOHTTPServer+HTTP1_1.swift +++ b/Sources/NIOHTTPServer/NIOHTTPServer+HTTP1_1.swift @@ -99,6 +99,7 @@ extension NIOHTTPServer { ) -> EventLoopFuture> { channel.pipeline.configureHTTPServerPipeline().flatMapThrowing { try channel.pipeline.syncOperations.addHandler(HTTP1ToHTTPServerCodec(secure: false)) + try channel.pipeline.syncOperations.addHandler(HTTPKeepAliveHandler()) return try NIOAsyncChannel( wrappingChannelSynchronously: channel, diff --git a/Sources/NIOHTTPServer/NIOHTTPServer.swift b/Sources/NIOHTTPServer/NIOHTTPServer.swift index 14e613d..abb5901 100644 --- a/Sources/NIOHTTPServer/NIOHTTPServer.swift +++ b/Sources/NIOHTTPServer/NIOHTTPServer.swift @@ -84,10 +84,6 @@ public struct NIOHTTPServer: HTTPServer { public typealias RequestConcludingReader = HTTPRequestConcludingAsyncReader public typealias ResponseConcludingWriter = HTTPResponseConcludingAsyncWriter - /// Maximum number of bytes to drain from an unconsumed request body before - /// giving up and closing the connection. - private var maxDrainBytes: Int { 256 * 1024 } - let logger: Logger let configuration: NIOHTTPServerConfiguration @@ -219,21 +215,24 @@ public struct NIOHTTPServer: HTTPServer { /// Reads the next request head from the iterator. Returns `nil` if the connection is done or /// an unexpected part is received. + /// + /// Skips over leftover `.body` and `.end` parts from a previous request that the + /// handler didn't fully consume. The ``HTTPKeepAliveHandler`` separately ensures that connections are closed (with + /// `Connection: close`) when the server responds before the request `.end` arrives, preventing unbounded leftover state. func nextRequestHead( from iterator: inout NIOAsyncChannelInboundStream.AsyncIterator ) async throws -> HTTPRequest? { - switch try await iterator.next(isolation: #isolation) { - case .head(let request): - return request - case .body: - self.logger.debug("Unexpectedly received body on connection. Closing now.") - return nil - case .end: - self.logger.debug("Unexpectedly received end on connection. Closing now.") - return nil - case .none: - self.logger.trace("No more request parts on connection") - return nil + while true { + switch try await iterator.next(isolation: #isolation) { + case .head(let request): + return request + case .body, .end: + // Leftover parts from a previous request. Skip and look for the next head. + continue + case .none: + self.logger.trace("No more request parts on connection") + return nil + } } } @@ -284,45 +283,10 @@ public struct NIOHTTPServer: HTTPServer { return nil } - // Recover the iterator for potential connection reuse. - guard var recoveredIterator = readerState.takeIterator() else { - // The handler started reading the request body but didn't finish. - // The iterator was consumed by the reader and not returned. - return nil - } - - // If the handler didn't fully consume the request body, drain the remaining - // parts so the iterator is positioned at the start of the next request. - // To prevent an attacker from keeping the connection in an infinite draining - // state, we only drain up to `Self.bytesDrained`. If more remains, close the connection. - if !readerState.wrapped.withLock({ $0.finishedReading }) { - var bytesDrained = 0 - do { - drainLoop: while true { - switch try await recoveredIterator.next(isolation: #isolation) { - case .head: - self.logger.debug( - "Unexpectedly received request head while draining unconsumed request body." - ) - return nil - case .body(let buffer): - bytesDrained += buffer.readableBytes - if bytesDrained > self.maxDrainBytes { - return nil - } - continue drainLoop - case .end: - break drainLoop - case .none: - return nil - } - } - } catch { - return nil - } - } - - return recoveredIterator + // Recover the iterator for potential connection reuse. If the handler started + // reading the request body but didn't finish, the iterator was consumed by the + // reader and not returned, so we can't reuse the connection. + return readerState.takeIterator() } /// Fail the listening address promise if the server is shutting down before it began listening. diff --git a/Tests/NIOHTTPServerTests/HTTPKeepAliveHandlerTests.swift b/Tests/NIOHTTPServerTests/HTTPKeepAliveHandlerTests.swift new file mode 100644 index 0000000..461c047 --- /dev/null +++ b/Tests/NIOHTTPServerTests/HTTPKeepAliveHandlerTests.swift @@ -0,0 +1,493 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift HTTP Server open source project +// +// Copyright (c) 2026 Apple Inc. and the Swift HTTP Server project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of Swift HTTP Server project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import HTTPAPIs +import HTTPTypes +import Logging +import NIOCore +import NIOHTTPTypes +import NIOPosix +import Synchronization +import Testing + +@testable import NIOHTTPServer + +@Suite +struct HTTPKeepAliveHandlerTests { + let serverLogger = Logger(label: "HTTPKeepAliveHandlerTests") + + /// Verifies the happy case: when a client pipelines multiple HTTP/1.1 requests + /// on a single connection, all responses are returned in order and the connection + /// stays alive (no `Connection: close`). + @available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) + @Test("Pipelined requests on a single connection all succeed") + func testPipelinedRequests() async throws { + let server = NIOHTTPServer( + logger: self.serverLogger, + configuration: try .init( + bindTarget: .hostAndPort(host: "127.0.0.1", port: 0), + supportedHTTPVersions: [.http1_1], + transportSecurity: .plaintext + ) + ) + + let requestCount = 5 + + try await NIOHTTPServerTests.withServer( + server: server, + serverHandler: HTTPServerClosureRequestHandler { request, _, reader, sender in + try await NIOHTTPServerTests.echoResponse(readUpTo: 1024, reader: reader, sender: sender) + }, + body: { serverAddress in + let client = try await ClientBootstrap(group: .singletonMultiThreadedEventLoopGroup) + .connectToTestHTTP1Server(at: serverAddress) + + try await client.executeThenClose { inbound, outbound in + // Pipeline all requests up-front, then read all responses. + for i in 1...requestCount { + try await outbound.write( + .head(.init(method: .post, scheme: "http", authority: "", path: "/\(i)")) + ) + try await outbound.write(.body(ByteBuffer(string: "request-\(i)"))) + try await outbound.write(.end(nil)) + } + + var responseIterator = inbound.makeAsyncIterator() + for i in 1...requestCount { + let headPart = try await responseIterator.next() + guard case .head(let response) = headPart else { + Issue.record("Expected .head for request \(i), got \(String(describing: headPart))") + return + } + #expect(response.status == .ok) + // Connection should remain keep-alive — no Connection: close header. + #expect( + response.headerFields[.connection] != "close", + "Response \(i) unexpectedly had Connection: close: \(response.headerFields)" + ) + + // Drain body parts until .end. + var collectedBody = ByteBuffer() + while true { + let part = try await responseIterator.next() + if case .body(let buf) = part { + collectedBody.writeImmutableBuffer(buf) + } else if case .end = part { + break + } else { + Issue.record("Unexpected part for request \(i): \(String(describing: part))") + return + } + } + #expect(collectedBody == ByteBuffer(string: "request-\(i)")) + } + } + } + ) + } + + /// Verifies that when the handler writes a short response (head + end, no body) + /// before the request `.end` has arrived, the response head includes a + /// `Connection: close` header and the server closes the connection. + @available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) + @Test("Server sends head+end (no body) before request .end — Connection: close in header") + func testShortResponseBeforeRequestEnd() async throws { + let server = NIOHTTPServer( + logger: self.serverLogger, + configuration: try .init( + bindTarget: .hostAndPort(host: "127.0.0.1", port: 0), + supportedHTTPVersions: [.http1_1], + transportSecurity: .plaintext + ) + ) + + try await NIOHTTPServerTests.withServer( + server: server, + serverHandler: HTTPServerClosureRequestHandler { _, _, reader, sender in + // Read just one byte of the body to confirm we got past the head, then + // write a body-less response (head + end only). Because the head is + // still buffered by the keep-alive handler when `.end` is written, the + // handler amends the head with `Connection: close` before flushing. + let _ = try await reader.consumeAndConclude { partsReader in + var partsReader = partsReader + try await partsReader.read(maximumCount: 1) { _ in } + } + let writer = try await sender.send( + .init(status: .ok, headerFields: [.contentLength: "0"]) + ) + try await writer.writeAndConclude("".utf8.span, finalElement: nil) + }, + body: { serverAddress in + let client = try await ClientBootstrap(group: .singletonMultiThreadedEventLoopGroup) + .connectToTestHTTP1Server(at: serverAddress) + + try await client.executeThenClose { inbound, outbound in + try await outbound.write( + .head(.init(method: .post, scheme: "http", authority: "", path: "/")) + ) + try await outbound.write(.body(ByteBuffer(string: "x"))) + + // Read the response: should have Connection: close in the head. + var responseIterator = inbound.makeAsyncIterator() + let headPart = try await responseIterator.next() + guard case .head(let response) = headPart else { + Issue.record("Expected .head, got \(String(describing: headPart))") + return + } + #expect(response.status == .ok) + #expect( + response.headerFields[.connection] == "close", + "Expected Connection: close, got headers: \(response.headerFields)" + ) + + // Drain until .end, then verify channel closed. + var sawEnd = false + while !sawEnd { + let part = try await responseIterator.next() + switch part { + case .body: + continue + case .end: + sawEnd = true + case .none: + Issue.record("Stream ended before response .end") + return + case .head: + Issue.record("Unexpected second .head: \(String(describing: part))") + return + } + } + + let next = try await responseIterator.next() + #expect(next == nil, "Expected channel to be closed; got \(String(describing: next))") + } + } + ) + } + + /// Verifies that informational (1xx) responses pass through the keep-alive handler + /// without affecting buffering state. The handler writes a `100 Continue` before + /// the request `.end` has arrived; the client must receive that informational + /// response immediately (without waiting for request `.end`), and the connection + /// must remain alive after the final response. + @available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) + @Test("Informational (1xx) responses pass through without buffering or closing") + func testInformationalResponsePassesThrough() async throws { + let server = NIOHTTPServer( + logger: self.serverLogger, + configuration: try .init( + bindTarget: .hostAndPort(host: "127.0.0.1", port: 0), + supportedHTTPVersions: [.http1_1], + transportSecurity: .plaintext + ) + ) + + try await NIOHTTPServerTests.withServer( + server: server, + serverHandler: HTTPServerClosureRequestHandler { request, _, reader, sender in + // Only the first request exercises informational semantics; the + // pipelined second request (path "/second") just verifies keep-alive. + if request.path == "/" { + try await sender.sendInformational(.init(status: .continue)) + } + + // Read the full request body (until .end). + let _ = try await reader.consumeAndConclude { partsReader in + var partsReader = partsReader + try await partsReader.collect(upTo: 1024) { _ in } + } + + // Write the final response. + let writer = try await sender.send( + .init(status: .ok, headerFields: [.contentLength: "5"]) + ) + try await writer.writeAndConclude("hello".utf8.span, finalElement: nil) + }, + body: { serverAddress in + let client = try await ClientBootstrap(group: .singletonMultiThreadedEventLoopGroup) + .connectToTestHTTP1Server(at: serverAddress) + + try await client.executeThenClose { inbound, outbound in + try await outbound.write( + .head(.init(method: .post, scheme: "http", authority: "", path: "/")) + ) + + // Read the 100 Continue before sending the request body — this + // only works if the informational response was forwarded without + // being buffered by the keep-alive handler. + var responseIterator = inbound.makeAsyncIterator() + let informationalPart = try await responseIterator.next() + guard case .head(let informational) = informationalPart else { + Issue.record("Expected informational .head, got \(String(describing: informationalPart))") + return + } + #expect(informational.status == .continue) + + // Now send the body and end so the server can write the final + // response. + try await outbound.write(.body(ByteBuffer(string: "hello"))) + try await outbound.write(.end(nil)) + + // Read the final 200 OK response. + let finalHeadPart = try await responseIterator.next() + guard case .head(let response) = finalHeadPart else { + Issue.record("Expected final .head, got \(String(describing: finalHeadPart))") + return + } + #expect(response.status == .ok) + #expect( + response.headerFields[.connection] != "close", + "Expected keep-alive after informational flow; got headers: \(response.headerFields)" + ) + + // Drain body and end. + var sawEnd = false + while !sawEnd { + let part = try await responseIterator.next() + switch part { + case .body: + continue + case .end: + sawEnd = true + case .none: + Issue.record("Stream ended before response .end") + return + case .head: + Issue.record("Unexpected .head: \(String(describing: part))") + return + } + } + + // Pipeline a second request to verify keep-alive actually works. + try await outbound.write( + .head(.init(method: .get, scheme: "http", authority: "", path: "/second")) + ) + try await outbound.write(.end(nil)) + + let secondHead = try await responseIterator.next() + guard case .head(let secondResponse) = secondHead else { + Issue.record("Expected second .head, got \(String(describing: secondHead))") + return + } + #expect(secondResponse.status == .ok) + } + } + ) + } + + /// Verifies bidirectional streaming over HTTP/1.1: the handler writes the + /// response head and body chunks while concurrently reading the request body. + /// The client and server ping-pong — the client sends one byte, waits for its + /// echo, then sends the next. This only works if the keep-alive handler flushes + /// the response head as soon as a body chunk is written, rather than buffering + /// everything until request `.end` arrives. Because the head is flushed before + /// request `.end` arrives, the response carries `Connection: close` and the + /// server closes the connection after writing response `.end`. + @available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) + @Test("Bidirectional streaming works — head is flushed (with Connection: close) when a body part is written") + func testBidirectionalStreamingOverHTTP1() async throws { + let server = NIOHTTPServer( + logger: self.serverLogger, + configuration: try .init( + bindTarget: .hostAndPort(host: "127.0.0.1", port: 0), + supportedHTTPVersions: [.http1_1], + transportSecurity: .plaintext + ) + ) + + try await NIOHTTPServerTests.withServer( + server: server, + serverHandler: HTTPServerClosureRequestHandler { _, _, reader, sender in + // Echo request body parts back as response body parts, concurrently + // with reading from the request body. + var maybeReader = Optional(reader) + let writer = try await sender.send(.init(status: .ok)) + try await writer.produceAndConclude { responseBodyWriter in + var responseBodyWriter = responseBodyWriter + let reader = maybeReader.take()! + let _ = try await reader.consumeAndConclude { bodyReader in + try await bodyReader.forEach { span in + try await responseBodyWriter.write(span) + } + } + return nil + } + }, + body: { serverAddress in + let client = try await ClientBootstrap(group: .singletonMultiThreadedEventLoopGroup) + .connectToTestHTTP1Server(at: serverAddress) + + try await client.executeThenClose { inbound, outbound in + try await outbound.write( + .head(.init(method: .post, scheme: "http", authority: "", path: "/")) + ) + // Write the first body byte before reading the response head, so + // the server has something to echo — this unblocks the buffered + // head in the keep-alive handler. This mirrors how real + // bidirectional clients (like the conformance `/echo` test) work. + let chunkCount = 5 + let firstByte = ByteBuffer(bytes: [UInt8(ascii: "A")]) + try await outbound.write(.body(firstByte)) + + var responseIterator = inbound.makeAsyncIterator() + let headPart = try await responseIterator.next() + guard case .head(let response) = headPart else { + Issue.record("Expected .head, got \(String(describing: headPart))") + return + } + #expect(response.status == .ok) + // The head was flushed because a body part was written before + // request `.end` arrived, so it carries `Connection: close`. + #expect( + response.headerFields[.connection] == "close", + "Expected Connection: close on bidirectional flow; got \(response.headerFields)" + ) + + // Read the echo of the first byte. + let firstEcho = try await responseIterator.next() + #expect(firstEcho == .body(firstByte)) + + // Ping-pong: write a byte, read its echo. + for i in 1...makeStream() + + try await NIOHTTPServerTests.withServer( + server: server, + serverHandler: HTTPServerClosureRequestHandler { _, _, reader, sender in + // Send the response head immediately, before reading anything from + // the request body. At this point request `.end` has NOT been sent + // by the client, so the keep-alive handler will buffer the head. + let writer = try await sender.send( + .init(status: .ok, headerFields: [.contentLength: "5"]) + ) + responseHeadWritten.yield() + responseHeadWritten.finish() + + // Now read the request body + end. + let _ = try await reader.consumeAndConclude { partsReader in + var partsReader = partsReader + try await partsReader.collect(upTo: 1024) { _ in } + } + + // Write the response body + end. + try await writer.writeAndConclude("hello".utf8.span, finalElement: nil) + }, + body: { serverAddress in + let client = try await ClientBootstrap(group: .singletonMultiThreadedEventLoopGroup) + .connectToTestHTTP1Server(at: serverAddress) + + try await client.executeThenClose { inbound, outbound in + // Send only the head — do NOT send body or end yet. + try await outbound.write( + .head(.init(method: .post, scheme: "http", authority: "", path: "/")) + ) + + // Wait for the handler to write the response head. Because the + // request `.end` has not yet been sent, the response head is + // currently buffered by the keep-alive handler. + var signalIterator = responseHeadWrittenStream.makeAsyncIterator() + _ = await signalIterator.next() + + // Now send the body + end. This should cause the keep-alive + // handler to flush the buffered response head and enter the + // streaming state — the connection must remain alive. + try await outbound.write(.body(ByteBuffer(string: "hello"))) + try await outbound.write(.end(nil)) + + // Read the response. + var responseIterator = inbound.makeAsyncIterator() + let headPart = try await responseIterator.next() + guard case .head(let response) = headPart else { + Issue.record("Expected .head, got \(String(describing: headPart))") + return + } + #expect(response.status == .ok) + #expect( + response.headerFields[.connection] != "close", + "Expected keep-alive, got headers: \(response.headerFields)" + ) + + // Drain body and end. + var sawEnd = false + while !sawEnd { + let part = try await responseIterator.next() + switch part { + case .body: + continue + case .end: + sawEnd = true + case .none: + Issue.record("Stream ended before response .end") + return + case .head: + Issue.record("Unexpected second .head: \(String(describing: part))") + return + } + } + + // Pipeline a second request to verify keep-alive actually works. + try await outbound.write( + .head(.init(method: .get, scheme: "http", authority: "", path: "/second")) + ) + try await outbound.write(.end(nil)) + + let secondHead = try await responseIterator.next() + guard case .head(let secondResponse) = secondHead else { + Issue.record("Expected second .head, got \(String(describing: secondHead))") + return + } + #expect(secondResponse.status == .ok) + } + } + ) + } +} From 4dba699141d03d53faa2b2e4bebbf050c63d382f Mon Sep 17 00:00:00 2001 From: Gus Cairo Date: Fri, 15 May 2026 17:08:44 +0100 Subject: [PATCH 12/14] Format --- Sources/NIOHTTPServer/HTTPKeepAliveHandler.swift | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Sources/NIOHTTPServer/HTTPKeepAliveHandler.swift b/Sources/NIOHTTPServer/HTTPKeepAliveHandler.swift index e3e54e2..cbbe7bd 100644 --- a/Sources/NIOHTTPServer/HTTPKeepAliveHandler.swift +++ b/Sources/NIOHTTPServer/HTTPKeepAliveHandler.swift @@ -155,7 +155,7 @@ final class HTTPKeepAliveHandler: ChannelDuplexHandler { case .head: preconditionFailure( "HTTPKeepAliveHandler received a second response head while the previous head was still buffered. " - + "A handler must only write one final response head per request." + + "A handler must only write one final response head per request." ) } case .streaming: From dc7f30e7b1e3cc6ea439314b811f96d2a0f239f7 Mon Sep 17 00:00:00 2001 From: Gus Cairo Date: Mon, 18 May 2026 16:26:28 +0100 Subject: [PATCH 13/14] Format --- Tests/NIOHTTPServerTests/HTTPKeepAliveHandlerTests.swift | 1 + 1 file changed, 1 insertion(+) diff --git a/Tests/NIOHTTPServerTests/HTTPKeepAliveHandlerTests.swift b/Tests/NIOHTTPServerTests/HTTPKeepAliveHandlerTests.swift index 461c047..189b8ee 100644 --- a/Tests/NIOHTTPServerTests/HTTPKeepAliveHandlerTests.swift +++ b/Tests/NIOHTTPServerTests/HTTPKeepAliveHandlerTests.swift @@ -317,6 +317,7 @@ struct HTTPKeepAliveHandlerTests { var responseBodyWriter = responseBodyWriter let reader = maybeReader.take()! let _ = try await reader.consumeAndConclude { bodyReader in + // swift-format-ignore: ReplaceForEachWithForLoop try await bodyReader.forEach { span in try await responseBodyWriter.write(span) } From e3cfba5cb76479d38145db0e3723f0acd75e0c1b Mon Sep 17 00:00:00 2001 From: Gus Cairo Date: Wed, 20 May 2026 16:04:14 +0100 Subject: [PATCH 14/14] Change when connection:close is appended to response --- .../NIOHTTPServer/HTTPKeepAliveHandler.swift | 164 ++++++++++-------- .../HTTPKeepAliveHandlerTests.swift | 79 ++++----- 2 files changed, 132 insertions(+), 111 deletions(-) diff --git a/Sources/NIOHTTPServer/HTTPKeepAliveHandler.swift b/Sources/NIOHTTPServer/HTTPKeepAliveHandler.swift index cbbe7bd..6df480a 100644 --- a/Sources/NIOHTTPServer/HTTPKeepAliveHandler.swift +++ b/Sources/NIOHTTPServer/HTTPKeepAliveHandler.swift @@ -19,26 +19,21 @@ import NIOHTTPTypes /// A NIO channel handler that ensures HTTP/1.1 keep-alive semantics are honored when /// the server starts writing a response before the request body has been fully read. /// -/// The handler buffers only the outbound response head whenever it is written before -/// the request `.end` has been received. The buffered head is released in one of three -/// ways: +/// The handler buffers final response parts (head + any body fragments + end) when +/// they are written before the request `.end` has been received. The buffer is +/// released at the next deadline: /// -/// - **Request `.end` arrives before any response part is written**: the head is -/// flushed as-is and the response streams normally. The connection can be reused. -/// - **A response body part is written before request `.end` arrives**: the buffered -/// head is amended with `Connection: close`, then flushed; subsequent parts stream -/// directly; once response `.end` is written, the connection is closed. -/// - **Response `.end` is written while the head is still buffered (no body written)**: -/// the head is amended with `Connection: close`, flushed, followed by `.end`, then -/// the connection is closed. +/// - **`channelReadComplete`**: the end of an inbound read cycle. +/// - **`flush`**: an upstream writer (e.g. `NIOAsyncChannelOutboundWriter`) forced a +/// flush. /// -/// Any time the head is flushed *because* the handler started producing the response -/// before the request was fully read, the client receives `Connection: close` and the -/// server itself closes the connection after writing response `.end`. This protects -/// against clients that keep uploading request body bytes after the response has -/// completed (which would otherwise force the server to drain unbounded data) and -/// gives the client an explicit signal not to pipeline another request on the -/// connection. +/// At each deadline, if request `.end` has arrived, the buffer is flushed as-is and +/// the connection is reusable. If request `.end` has *not* arrived, the head is +/// amended with `Connection: close`, the buffer is flushed, and the connection is +/// closed once response `.end` is written. This protects against clients that keep +/// uploading request body bytes after the response has completed (which would +/// otherwise force the server to drain unbounded data) and gives the client an +/// explicit signal not to pipeline another request on the connection. /// /// Informational (1xx) responses pass through unchanged and do not affect buffering /// state. @@ -55,17 +50,18 @@ final class HTTPKeepAliveHandler: ChannelDuplexHandler { } private enum FinalResponseState { - /// No final response has been written yet for the current request. Informational - /// (1xx) responses may have been passed through. + /// No final response part has been written yet for the current request. + /// Informational (1xx) responses may have been passed through. case notStarted - /// The final response head was written before request `.end` arrived. The head - /// is buffered until either request `.end` arrives (flush as-is, transition to - /// streaming, keep-alive), or a response body or `.end` is written (flush head - /// with `Connection: close`, transition to streaming, close after response - /// `.end`). - case bufferingHead(BufferedWrite) - /// The response is being streamed directly. If `closeAfterResponseEnd` is - /// true, the connection will be closed once response `.end` is written. + /// The final response head was written before request `.end` arrived. The + /// head — and any additional response parts (body fragments, `.end`) the + /// handler wrote in the same window — are buffered until the next deadline + /// (`channelReadComplete` or `flush`), at which point we decide whether to + /// keep the connection alive or amend the head with `Connection: close`. + case buffering(head: BufferedWrite, additional: [BufferedWrite]) + /// The head has been flushed; remaining parts stream directly. If + /// `closeAfterResponseEnd` is true, the head carried `Connection: close` + /// and we close once response `.end` is written. case streaming } @@ -75,9 +71,9 @@ final class HTTPKeepAliveHandler: ChannelDuplexHandler { private var requestEndReceived: Bool = true /// `true` if we've committed to closing the connection after this response's - /// `.end` is written. Set when the buffered head is flushed because a response - /// body or `.end` was written before request `.end` arrived. Cleared when a new - /// request begins. + /// `.end` is written. Set when the buffer is flushed while request `.end` has + /// not yet arrived (so we add `Connection: close`). Cleared when a new request + /// begins. private var closeAfterResponseEnd: Bool = false private var finalResponseState: FinalResponseState = .notStarted @@ -95,17 +91,22 @@ final class HTTPKeepAliveHandler: ChannelDuplexHandler { break case .end: self.requestEndReceived = true - // If we've been buffering the response head, flush it now: we can keep the - // connection alive. - if case .bufferingHead(let buffered) = self.finalResponseState { - self.finalResponseState = .streaming - context.write(self.wrapOutboundOut(buffered.part), promise: buffered.promise) - context.flush() - } } context.fireChannelRead(data) } + func channelReadComplete(context: ChannelHandlerContext) { + // End of an inbound read cycle: this is the deadline for deciding whether + // the buffered response can be sent as-is (keep-alive) or must include + // `Connection: close`. If request `.end` arrived during the cycle the head + // is flushed unchanged; otherwise we amend the head and close after + // response `.end`. + if case .buffering = self.finalResponseState { + self.flushBuffer(context: context) + } + context.fireChannelReadComplete() + } + func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise?) { let part = self.unwrapOutboundIn(data) switch self.finalResponseState { @@ -122,42 +123,16 @@ final class HTTPKeepAliveHandler: ChannelDuplexHandler { self.finalResponseState = .streaming context.write(data, promise: promise) } else { - // Buffer just the head until we know whether a body will follow. - self.finalResponseState = .bufferingHead(BufferedWrite(part: part, promise: promise)) - } - case .bufferingHead(let buffered): - // Reaching this case means the handler is producing more of the response - // before request `.end` arrived (otherwise `channelRead(.end)` would have - // flushed the buffer and transitioned us to `.streaming`). Amend the head - // with `Connection: close` so the client knows not to reuse the - // connection, and remember to close after writing response `.end`. - var headPart = buffered.part - if case .head(var response) = headPart { - response.headerFields[.connection] = "close" - headPart = .head(response) - } - self.closeAfterResponseEnd = true - self.finalResponseState = .streaming - - switch part { - case .end: - // Response is just head + end (no body). Flush head + end and close. - context.write(self.wrapOutboundOut(headPart), promise: buffered.promise) - context.write(data, promise: promise) - context.flush() - context.close(mode: .all, promise: nil) - case .body: - // Flush head + body and continue streaming. We'll close once response - // `.end` is written. - context.write(self.wrapOutboundOut(headPart), promise: buffered.promise) - context.write(data, promise: promise) - context.flush() - case .head: - preconditionFailure( - "HTTPKeepAliveHandler received a second response head while the previous head was still buffered. " - + "A handler must only write one final response head per request." + // Start buffering with the head. Additional parts (body, end) the + // handler may write before the next deadline are appended below. + self.finalResponseState = .buffering( + head: BufferedWrite(part: part, promise: promise), + additional: [] ) } + case .buffering(let head, var additional): + additional.append(BufferedWrite(part: part, promise: promise)) + self.finalResponseState = .buffering(head: head, additional: additional) case .streaming: context.write(data, promise: promise) if case .end = part, self.closeAfterResponseEnd { @@ -168,4 +143,49 @@ final class HTTPKeepAliveHandler: ChannelDuplexHandler { } } } + + func flush(context: ChannelHandlerContext) { + // An upstream writer forced a flush. Same deadline as `channelReadComplete`: + // release any buffered parts, with `Connection: close` if request `.end` + // hasn't arrived. + if case .buffering = self.finalResponseState { + self.flushBuffer(context: context) + } + context.flush() + } + + /// Releases buffered response parts to the pipeline. If request `.end` has not + /// yet arrived, amend the head with `Connection: close` and arrange to close + /// the connection once response `.end` is written. + private func flushBuffer(context: ChannelHandlerContext) { + guard case .buffering(var head, let additional) = self.finalResponseState else { return } + + if !self.requestEndReceived { + // Amend the head with `Connection: close` before flushing. + if case .head(var response) = head.part { + response.headerFields[.connection] = "close" + head.part = .head(response) + } + self.closeAfterResponseEnd = true + } + + self.finalResponseState = .streaming + + context.write(self.wrapOutboundOut(head.part), promise: head.promise) + var sawEnd = false + for write in additional { + context.write(self.wrapOutboundOut(write.part), promise: write.promise) + if case .end = write.part { + sawEnd = true + } + } + context.flush() + + if sawEnd && self.closeAfterResponseEnd { + // The response was fully buffered (head + ... + end) and we have to + // close. Close now (the flush above ensured the writes reached the + // wire). + context.close(mode: .all, promise: nil) + } + } } diff --git a/Tests/NIOHTTPServerTests/HTTPKeepAliveHandlerTests.swift b/Tests/NIOHTTPServerTests/HTTPKeepAliveHandlerTests.swift index 189b8ee..5a74a1f 100644 --- a/Tests/NIOHTTPServerTests/HTTPKeepAliveHandlerTests.swift +++ b/Tests/NIOHTTPServerTests/HTTPKeepAliveHandlerTests.swift @@ -381,12 +381,19 @@ struct HTTPKeepAliveHandlerTests { ) } - /// Verifies that when the response head is written before request `.end` arrives, - /// and the request `.end` arrives before any response body is written, the - /// keep-alive handler flushes the buffered head and keeps the connection alive. + /// Verifies that if an inbound read cycle ends without the request `.end` having + /// arrived while the handler is mid-response, the buffered response head is + /// amended with `Connection: close` and flushed, and the server closes the + /// connection once response `.end` is written. + /// + /// We force the timing by having the handler write the response head, signal + /// the client to send a body chunk (without `.end`), and then wait. When the + /// server reads the body chunk, the read cycle ends with the head still + /// buffered and request `.end` still missing — the keep-alive handler must add + /// `Connection: close`. @available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) - @Test("Response head buffered, request .end arrives first — keep-alive works") - func testResponseHeadBufferedThenRequestEndArrives_KeepsAlive() async throws { + @Test("Read cycle ends without request .end while head is buffered — Connection: close added") + func testReadCycleEndsWithoutRequestEnd_AddsConnectionClose() async throws { let server = NIOHTTPServer( logger: self.serverLogger, configuration: try .init( @@ -396,30 +403,30 @@ struct HTTPKeepAliveHandlerTests { ) ) - // Signals from the handler to the client. We use these to force the exact - // sequence: handler writes response head BEFORE the client sends request - // `.end`, so the response head is buffered by the keep-alive handler. let (responseHeadWrittenStream, responseHeadWritten) = AsyncStream.makeStream() + let (handlerCanFinishStream, handlerCanFinish) = AsyncStream.makeStream() try await NIOHTTPServerTests.withServer( server: server, serverHandler: HTTPServerClosureRequestHandler { _, _, reader, sender in - // Send the response head immediately, before reading anything from - // the request body. At this point request `.end` has NOT been sent - // by the client, so the keep-alive handler will buffer the head. + // Write the response head before reading anything. The keep-alive + // handler buffers it because request `.end` hasn't arrived. let writer = try await sender.send( .init(status: .ok, headerFields: [.contentLength: "5"]) ) responseHeadWritten.yield() responseHeadWritten.finish() - // Now read the request body + end. + // Wait for the test to confirm it saw `Connection: close` before + // we drain the request and finish the response. + var canFinishIterator = handlerCanFinishStream.makeAsyncIterator() + _ = await canFinishIterator.next() + + // Drain the request body + end and then write the response body + end. let _ = try await reader.consumeAndConclude { partsReader in var partsReader = partsReader try await partsReader.collect(upTo: 1024) { _ in } } - - // Write the response body + end. try await writer.writeAndConclude("hello".utf8.span, finalElement: nil) }, body: { serverAddress in @@ -427,24 +434,22 @@ struct HTTPKeepAliveHandlerTests { .connectToTestHTTP1Server(at: serverAddress) try await client.executeThenClose { inbound, outbound in - // Send only the head — do NOT send body or end yet. + // Send only the head. try await outbound.write( .head(.init(method: .post, scheme: "http", authority: "", path: "/")) ) - // Wait for the handler to write the response head. Because the - // request `.end` has not yet been sent, the response head is - // currently buffered by the keep-alive handler. + // Wait for the handler to write the response head. var signalIterator = responseHeadWrittenStream.makeAsyncIterator() _ = await signalIterator.next() - // Now send the body + end. This should cause the keep-alive - // handler to flush the buffered response head and enter the - // streaming state — the connection must remain alive. - try await outbound.write(.body(ByteBuffer(string: "hello"))) - try await outbound.write(.end(nil)) + // Send a single body byte, WITHOUT request `.end`. The server + // will see this as its own read cycle that ends with the + // request `.end` still missing — triggering the + // `Connection: close` amendment. + try await outbound.write(.body(ByteBuffer(string: "x"))) - // Read the response. + // Read the response head. It must carry `Connection: close`. var responseIterator = inbound.makeAsyncIterator() let headPart = try await responseIterator.next() guard case .head(let response) = headPart else { @@ -453,11 +458,16 @@ struct HTTPKeepAliveHandlerTests { } #expect(response.status == .ok) #expect( - response.headerFields[.connection] != "close", - "Expected keep-alive, got headers: \(response.headerFields)" + response.headerFields[.connection] == "close", + "Expected Connection: close after read cycle ended without request .end; got \(response.headerFields)" ) - // Drain body and end. + // Let the handler finish and send the rest of the request. + handlerCanFinish.yield() + handlerCanFinish.finish() + try await outbound.write(.end(nil)) + + // Drain the response body + end. var sawEnd = false while !sawEnd { let part = try await responseIterator.next() @@ -475,18 +485,9 @@ struct HTTPKeepAliveHandlerTests { } } - // Pipeline a second request to verify keep-alive actually works. - try await outbound.write( - .head(.init(method: .get, scheme: "http", authority: "", path: "/second")) - ) - try await outbound.write(.end(nil)) - - let secondHead = try await responseIterator.next() - guard case .head(let secondResponse) = secondHead else { - Issue.record("Expected second .head, got \(String(describing: secondHead))") - return - } - #expect(secondResponse.status == .ok) + // The server should have closed the connection. + let next = try await responseIterator.next() + #expect(next == nil, "Expected channel close after response; got \(String(describing: next))") } } )