diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 53ce63f..00c227f 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -16,7 +16,7 @@ jobs: with: linux_6_0_enabled: false linux_6_1_enabled: false - linux_6_2_arguments_override: "-Xswiftc -warnings-as-errors --explicit-target-dependency-import-check error -Xswiftc -require-explicit-sendable" + linux_6_2_enabled: false linux_6_3_arguments_override: "-Xswiftc -warnings-as-errors --explicit-target-dependency-import-check error -Xswiftc -require-explicit-sendable" linux_nightly_next_arguments_override: "--explicit-target-dependency-import-check error -Xswiftc -require-explicit-sendable" linux_nightly_main_arguments_override: "--explicit-target-dependency-import-check error -Xswiftc -require-explicit-sendable" @@ -31,3 +31,5 @@ jobs: with: linux_6_0_enabled: false linux_6_1_enabled: false + linux_6_2_enabled: false + linux_6_3_enabled: true diff --git a/.github/workflows/pull_request.yml b/.github/workflows/pull_request.yml index 5c82e85..39e7f61 100644 --- a/.github/workflows/pull_request.yml +++ b/.github/workflows/pull_request.yml @@ -12,7 +12,7 @@ jobs: name: Soundness uses: swiftlang/github-workflows/.github/workflows/soundness.yml@0.0.7 with: - api_breakage_check_container_image: "swift:6.2-noble" + api_breakage_check_container_image: "swift:6.3-noble" format_check_container_image: "swiftlang/swift:nightly-main-noble" # Needed due to https://github.com/swiftlang/swift-format/issues/1081 license_header_check_project_name: "Swift HTTP Server" @@ -23,9 +23,7 @@ jobs: linux_5_10_enabled: false linux_6_0_enabled: false linux_6_1_enabled: false - # linux_6_1_arguments_override: "-Xswiftc -warnings-as-errors --explicit-target-dependency-import-check error" - linux_6_2_enabled: true - linux_6_2_arguments_override: "-Xswiftc -warnings-as-errors --explicit-target-dependency-import-check error" + linux_6_2_enabled: false linux_6_3_enabled: true linux_6_3_arguments_override: "-Xswiftc -warnings-as-errors --explicit-target-dependency-import-check error" linux_nightly_next_arguments_override: "--explicit-target-dependency-import-check error" @@ -38,7 +36,7 @@ jobs: linux_5_10_enabled: false linux_6_0_enabled: false linux_6_1_enabled: false - linux_6_2_enabled: true + linux_6_2_enabled: false linux_6_3_enabled: true static-sdk: @@ -52,4 +50,5 @@ jobs: linux_5_10_enabled: false linux_6_0_enabled: false linux_6_1_enabled: false - linux_6_2_enabled: true + linux_6_2_enabled: false + linux_6_3_enabled: true diff --git a/Sources/NIOHTTPServer/HTTPKeepAliveHandler.swift b/Sources/NIOHTTPServer/HTTPKeepAliveHandler.swift new file mode 100644 index 0000000..c51636c --- /dev/null +++ b/Sources/NIOHTTPServer/HTTPKeepAliveHandler.swift @@ -0,0 +1,191 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift HTTP Server open source project +// +// Copyright (c) 2026 Apple Inc. and the Swift HTTP Server project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of Swift HTTP Server project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import HTTPTypes +import NIOCore +import NIOHTTPTypes + +/// A NIO channel handler that ensures HTTP/1.1 keep-alive semantics are honored when +/// the server starts writing a response before the request body has been fully read. +/// +/// The handler buffers final response parts (head + any body fragments + end) when +/// they are written before the request `.end` has been received. The buffer is +/// released at the next deadline: +/// +/// - **`channelReadComplete`**: the end of an inbound read cycle. +/// - **`flush`**: an upstream writer (e.g. `NIOAsyncChannelOutboundWriter`) forced a +/// flush. +/// +/// At each deadline, if request `.end` has arrived, the buffer is flushed as-is and +/// the connection is reusable. If request `.end` has *not* arrived, the head is +/// amended with `Connection: close`, the buffer is flushed, and the connection is +/// closed once response `.end` is written. This protects against clients that keep +/// uploading request body bytes after the response has completed (which would +/// otherwise force the server to drain unbounded data) and gives the client an +/// explicit signal not to pipeline another request on the connection. +/// +/// Informational (1xx) responses pass through unchanged and do not affect buffering +/// state. +@available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) +final class HTTPKeepAliveHandler: ChannelDuplexHandler { + typealias InboundIn = HTTPRequestPart + typealias InboundOut = HTTPRequestPart + typealias OutboundIn = HTTPResponsePart + typealias OutboundOut = HTTPResponsePart + + private struct BufferedWrite { + var part: HTTPResponsePart + var promise: EventLoopPromise? + } + + private enum FinalResponseState { + /// No final response part has been written yet for the current request. + /// Informational (1xx) responses may have been passed through. + case notStarted + /// The final response head was written before request `.end` arrived. The + /// head — and any additional response parts (body fragments, `.end`) the + /// handler wrote in the same window — are buffered until the next deadline + /// (`channelReadComplete` or `flush`), at which point we decide whether to + /// keep the connection alive or amend the head with `Connection: close`. + case buffering(head: BufferedWrite, additional: [BufferedWrite]) + /// The head has been flushed; remaining parts stream directly. If + /// `closeAfterResponseEnd` is true, the head carried `Connection: close` + /// and we close once response `.end` is written. + case streaming + } + + /// `true` when the request `.end` has been received on the inbound side, or no + /// request is currently in flight. `false` between receiving a request `.head` + /// and its `.end`. + private var requestEndReceived: Bool = true + + /// `true` if we've committed to closing the connection after this response's + /// `.end` is written. Set when the buffer is flushed while request `.end` has + /// not yet arrived (so we add `Connection: close`). Cleared when a new request + /// begins. + private var closeAfterResponseEnd: Bool = false + + private var finalResponseState: FinalResponseState = .notStarted + + func channelRead(context: ChannelHandlerContext, data: NIOAny) { + let part = self.unwrapInboundIn(data) + switch part { + case .head: + // Begin a new request. (Any previous request's response must have + // completed already since HTTPServerPipelineHandler enforces ordering.) + self.requestEndReceived = false + self.closeAfterResponseEnd = false + self.finalResponseState = .notStarted + case .body: + break + case .end: + self.requestEndReceived = true + } + context.fireChannelRead(data) + } + + func channelReadComplete(context: ChannelHandlerContext) { + // End of an inbound read cycle: this is the deadline for deciding whether + // the buffered response can be sent as-is (keep-alive) or must include + // `Connection: close`. If request `.end` arrived during the cycle the head + // is flushed unchanged; otherwise we amend the head and close after + // response `.end`. + if case .buffering = self.finalResponseState { + self.flushBuffer(context: context) + } + context.fireChannelReadComplete() + } + + func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise?) { + let part = self.unwrapOutboundIn(data) + switch self.finalResponseState { + case .notStarted: + // Informational (1xx) responses pass through without affecting state: they + // don't conclude the response, so we remain in `.notStarted` until the + // final response head is written. + if case .head(let response) = part, response.status.kind == .informational { + context.write(data, promise: promise) + return + } + if self.requestEndReceived { + // Request fully read; stream the response directly. + self.finalResponseState = .streaming + context.write(data, promise: promise) + } else { + // Start buffering with the head. Additional parts (body, end) the + // handler may write before the next deadline are appended below. + self.finalResponseState = .buffering( + head: BufferedWrite(part: part, promise: promise), + additional: [] + ) + } + case .buffering(let head, var additional): + additional.append(BufferedWrite(part: part, promise: promise)) + self.finalResponseState = .buffering(head: head, additional: additional) + case .streaming: + context.write(data, promise: promise) + if case .end = part, self.closeAfterResponseEnd { + // The head we flushed earlier carried `Connection: close`; close + // the connection now that the response is complete. + context.flush() + context.close(mode: .output, promise: nil) + } + } + } + + func flush(context: ChannelHandlerContext) { + // An upstream writer forced a flush. Same deadline as `channelReadComplete`: + // release any buffered parts, with `Connection: close` if request `.end` + // hasn't arrived. + if case .buffering = self.finalResponseState { + self.flushBuffer(context: context) + } + context.flush() + } + + /// Releases buffered response parts to the pipeline. If request `.end` has not + /// yet arrived, amend the head with `Connection: close` and arrange to close + /// the connection once response `.end` is written. + private func flushBuffer(context: ChannelHandlerContext) { + guard case .buffering(var head, let additional) = self.finalResponseState else { return } + + if !self.requestEndReceived { + // Amend the head with `Connection: close` before flushing. + if case .head(var response) = head.part { + response.headerFields[.connection] = "close" + head.part = .head(response) + } + self.closeAfterResponseEnd = true + } + + self.finalResponseState = .streaming + + context.write(self.wrapOutboundOut(head.part), promise: head.promise) + var sawEnd = false + for write in additional { + context.write(self.wrapOutboundOut(write.part), promise: write.promise) + if case .end = write.part { + sawEnd = true + } + } + context.flush() + + if sawEnd && self.closeAfterResponseEnd { + // The response was fully buffered (head + ... + end) and we have to + // close. Close now (the flush above ensured the writes reached the + // wire). + context.close(mode: .output, promise: nil) + } + } +} diff --git a/Sources/NIOHTTPServer/HTTPRequestConcludingAsyncReader.swift b/Sources/NIOHTTPServer/HTTPRequestConcludingAsyncReader.swift index d22323c..d3d609c 100644 --- a/Sources/NIOHTTPServer/HTTPRequestConcludingAsyncReader.swift +++ b/Sources/NIOHTTPServer/HTTPRequestConcludingAsyncReader.swift @@ -47,22 +47,21 @@ public struct HTTPRequestConcludingAsyncReader: ConcludingAsyncReader, ~Copyable fileprivate var state: ReaderState /// The iterator that provides HTTP request parts from the underlying channel. - private var iterator: NIOAsyncChannelInboundStream.AsyncIterator + /// Taken from `state` at construction; returned to `state` when this reader + /// observes request `.end` so the outer request loop can recover it for + /// HTTP/1.1 keep-alive. + private var iterator: NIOAsyncChannelInboundStream.AsyncIterator? /// A reusable buffer handed to the body closure on each call to ``read(body:)``. /// Reusing it across calls preserves the allocation; the buffer is cleared /// (while keeping its capacity) at the start of every read. private var buffer: UniqueArray - /// Initializes a new request body reader with the given NIO async channel iterator. - /// - /// - Parameter iterator: The NIO async channel inbound stream iterator to use for reading request parts. - fileprivate init( - iterator: consuming sending NIOAsyncChannelInboundStream.AsyncIterator, - readerState: ReaderState - ) { - self.iterator = iterator + /// Initializes a new request body reader, taking the iterator from the + /// shared `ReaderState`. + fileprivate init(readerState: ReaderState) { self.state = readerState + self.iterator = readerState.takeIterator() self.buffer = UniqueArray() } @@ -72,7 +71,7 @@ public struct HTTPRequestConcludingAsyncReader: ConcludingAsyncReader, ~Copyable ) async throws(EitherError) -> Return { let requestPart: HTTPRequestPart? do { - requestPart = try await self.iterator.next(isolation: #isolation) + requestPart = try await self.iterator?.next(isolation: #isolation) } catch { throw .first(error) } @@ -85,9 +84,14 @@ public struct HTTPRequestConcludingAsyncReader: ConcludingAsyncReader, ~Copyable self.buffer.reserveCapacity(element.readableBytes) self.buffer.append(copying: element.readableBytesUInt8Span) case .end(let trailers): + // Move the iterator back into ReaderState so the outer request + // loop can recover it for the next request on the same connection + // (HTTP/1.1 keep-alive). + nonisolated(unsafe) let iter = self.iterator.take() self.state.wrapped.withLock { state in state.trailers = trailers state.finishedReading = true + _ = state.iterator.swap(newValue: iter) } case .none: throw .first(RequestBodyReadError.streamEndedBeforeReceivingRequestEnd) @@ -102,15 +106,31 @@ public struct HTTPRequestConcludingAsyncReader: ConcludingAsyncReader, ~Copyable } final class ReaderState: Sendable { - struct Wrapped { + struct Wrapped: ~Copyable { var trailers: HTTPFields? = nil var finishedReading: Bool = false + + /// The iterator. Initially populated from the channel; taken by the + /// body reader at construction time and returned by it once request + /// `.end` has been observed (for HTTP/1.1 keep-alive recovery). + var iterator: + Disconnected< + NIOAsyncChannelInboundStream.AsyncIterator? + > } let wrapped: Mutex - init() { - self.wrapped = .init(.init()) + init(iterator: consuming sending NIOAsyncChannelInboundStream.AsyncIterator) { + self.wrapped = .init(.init(iterator: Disconnected(value: iterator))) + } + + /// Takes the iterator out of the state. Returns the iterator if present, + /// or `nil` if it's already been taken (e.g. by the body reader). + func takeIterator() -> sending NIOAsyncChannelInboundStream.AsyncIterator? { + self.wrapped.withLock { state in + state.iterator.swap(newValue: nil) + } } } @@ -123,18 +143,12 @@ public struct HTTPRequestConcludingAsyncReader: ConcludingAsyncReader, ~Copyable /// The type of errors that can occur during reading operations. public typealias Failure = any Error - private var iterator: Disconnected.AsyncIterator?> - internal var state: ReaderState - /// Initializes a new HTTP request body and trailers reader with the given NIO async channel iterator. + /// Initializes a new HTTP request body and trailers reader. /// - /// - Parameter iterator: The NIO async channel inbound stream iterator to use for reading request parts. - init( - iterator: consuming sending NIOAsyncChannelInboundStream.AsyncIterator, - readerState: ReaderState - ) { - self.iterator = .init(value: iterator) + /// - Parameter readerState: The shared reader state that holds the iterator and captures trailers. + init(readerState: ReaderState) { self.state = readerState } @@ -166,14 +180,10 @@ public struct HTTPRequestConcludingAsyncReader: ConcludingAsyncReader, ~Copyable public consuming func consumeAndConclude( body: nonisolated(nonsending) (consuming sending RequestBodyAsyncReader) async throws(Failure) -> Return ) async throws(Failure) -> (Return, HTTPFields?) { - if let iterator = self.iterator.take() { - let partsReader = RequestBodyAsyncReader(iterator: iterator, readerState: self.state) - let result = try await body(partsReader) - let trailers = self.state.wrapped.withLock { $0.trailers } - return (result, trailers) - } else { - fatalError("consumeAndConclude called more than once") - } + let partsReader = RequestBodyAsyncReader(readerState: self.state) + let result = try await body(partsReader) + let trailers = self.state.wrapped.withLock { $0.trailers } + return (result, trailers) } } diff --git a/Sources/NIOHTTPServer/NIOHTTPServer+HTTP1_1.swift b/Sources/NIOHTTPServer/NIOHTTPServer+HTTP1_1.swift index fbe7a7d..c7444c0 100644 --- a/Sources/NIOHTTPServer/NIOHTTPServer+HTTP1_1.swift +++ b/Sources/NIOHTTPServer/NIOHTTPServer+HTTP1_1.swift @@ -13,6 +13,7 @@ //===----------------------------------------------------------------------===// import HTTPAPIs +import Logging import NIOCore import NIOExtras import NIOHTTP1 @@ -45,7 +46,7 @@ extension NIOHTTPServer { do { for try await http1Channel in inbound { group.addTask { - await self.handleRequestChannel(channel: http1Channel, handler: handler) + await self.handleHTTP1RequestChannel(channel: http1Channel, handler: handler) } } @@ -114,6 +115,7 @@ extension NIOHTTPServer { ) -> EventLoopFuture> { channel.pipeline.configureHTTPServerPipeline().flatMapThrowing { try channel.pipeline.syncOperations.addHandler(HTTP1ToHTTPServerCodec(secure: false)) + try channel.pipeline.syncOperations.addHandler(HTTPKeepAliveHandler()) return try NIOAsyncChannel( wrappingChannelSynchronously: channel, @@ -121,4 +123,41 @@ extension NIOHTTPServer { ) } } + + /// Handles an HTTP/1.1 connection channel, which may carry multiple serial requests on the + /// same connection (keep-alive). + func handleHTTP1RequestChannel( + channel: NIOAsyncChannel, + handler: some HTTPServerRequestHandler + ) async { + do { + try await channel.executeThenClose { inbound, outbound in + var iterator = inbound.makeAsyncIterator() + + requestLoop: while !Task.isCancelled { + guard let httpRequest = try await self.nextRequestHead(from: &iterator) else { + break requestLoop + } + + guard + let recoveredIterator = try await self.invokeHandler( + request: httpRequest, + iterator: iterator, + outbound: outbound, + handler: handler + ) + else { + // Handler did not fully consume the request; cannot continue on this + // connection. + break requestLoop + } + + iterator = recoveredIterator + } + } + } catch { + self.logger.debug("Error thrown while handling HTTP/1.1 connection", metadata: ["error": "\(error)"]) + try? await channel.channel.close() + } + } } diff --git a/Sources/NIOHTTPServer/NIOHTTPServer+SecureUpgrade.swift b/Sources/NIOHTTPServer/NIOHTTPServer+SecureUpgrade.swift index 5c24a2f..8e38c28 100644 --- a/Sources/NIOHTTPServer/NIOHTTPServer+SecureUpgrade.swift +++ b/Sources/NIOHTTPServer/NIOHTTPServer+SecureUpgrade.swift @@ -110,7 +110,7 @@ extension NIOHTTPServer { let chainFuture = requestChannel.channel.nioSSL_peerValidatedCertificateChain() await Self.$connectionContext.withValue(ConnectionContext(chainFuture)) { - await self.handleRequestChannel( + await self.handleHTTP1RequestChannel( channel: requestChannel, handler: handler ) @@ -137,7 +137,7 @@ extension NIOHTTPServer { try await Self.$connectionContext.withValue(ConnectionContext(chainFuture)) { for try await streamChannel in multiplexer.inbound { streamGroup.addTask { - await self.handleRequestChannel( + await self.handleHTTP2StreamChannel( channel: streamChannel, handler: handler ) @@ -313,6 +313,45 @@ extension NIOHTTPServer { } } } + + /// Handles an HTTP/2 stream channel, which carries exactly one request per stream. + func handleHTTP2StreamChannel( + channel: NIOAsyncChannel, + handler: some HTTPServerRequestHandler + ) async { + do { + try await channel + .executeThenClose { inbound, outbound in + var iterator = inbound.makeAsyncIterator() + + guard let httpRequest = try await self.nextRequestHead(from: &iterator) else { + outbound.finish() + return + } + + _ = try await self.invokeHandler( + request: httpRequest, + iterator: iterator, + outbound: outbound, + handler: handler + ) + + // TODO: handle other state scenarios. + // For example, if we didn't finish reading but we wrote back a response, we + // should send a RST_STREAM with NO_ERROR set. If we finished reading but we + // didn't write back a response, then RST_STREAM is also likely appropriate but + // unclear about the error. + + // Finish the outbound and wait on the close future to make sure all pending + // writes are actually written. + outbound.finish() + try await channel.channel.closeFuture.get() + } + } catch { + self.logger.debug("Error thrown while handling HTTP/2 stream: \(error)") + try? await channel.channel.close() + } + } } @available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) diff --git a/Sources/NIOHTTPServer/NIOHTTPServer.swift b/Sources/NIOHTTPServer/NIOHTTPServer.swift index a97a443..454b381 100644 --- a/Sources/NIOHTTPServer/NIOHTTPServer.swift +++ b/Sources/NIOHTTPServer/NIOHTTPServer.swift @@ -221,103 +221,80 @@ public struct NIOHTTPServer: HTTPServer { } } - /// Handles a single HTTP request. + /// Reads the next request head from the iterator. Returns `nil` if the connection is done or + /// an unexpected part is received. /// - /// - Note: Errors do not propagate to the caller. When an error occurs, it is logged and the channel is closed. - /// - /// - Parameters: - /// - channel: The async channel to read the request from and write the response to. - /// - handler: The request handler. - func handleRequestChannel( - channel: NIOAsyncChannel, + /// Skips over leftover `.body` and `.end` parts from a previous request that the + /// handler didn't fully consume. The ``HTTPKeepAliveHandler`` separately ensures that connections are closed (with + /// `Connection: close`) when the server responds before the request `.end` arrives, preventing unbounded leftover state. + func nextRequestHead( + from iterator: inout NIOAsyncChannelInboundStream.AsyncIterator + ) async throws -> HTTPRequest? { + while true { + switch try await iterator.next(isolation: #isolation) { + case .head(let request): + return request + case .body, .end: + // Leftover parts from a previous request. Skip and look for the next head. + continue + case .none: + self.logger.trace("No more request parts on connection") + return nil + } + } + } + + /// Shared core: invokes the request handler with the appropriate reader/writer state. + /// Returns the recovered iterator if the request was fully consumed (for HTTP/1.1 reuse), + /// or `nil` if the request could not be fully consumed. + func invokeHandler( + request: HTTPRequest, + iterator: consuming sending NIOAsyncChannelInboundStream.AsyncIterator, + outbound: NIOAsyncChannelOutboundWriter, handler: some HTTPServerRequestHandler - ) async { + ) async throws -> NIOAsyncChannelInboundStream.AsyncIterator? { + let readerState = HTTPRequestConcludingAsyncReader.ReaderState(iterator: iterator) + let writerState = HTTPResponseConcludingAsyncWriter.WriterState() + do { - try await channel.executeThenClose { inbound, outbound in - var iterator = inbound.makeAsyncIterator() - - let nextPart: HTTPRequestPart? - do { - nextPart = try await iterator.next() - } catch { - self.logger.error( - "Error thrown while advancing the request iterator", - metadata: ["error": "\(error)"] + try await handler.handle( + request: request, + requestContext: HTTPRequestContext(), + requestBodyAndTrailers: HTTPRequestConcludingAsyncReader( + readerState: readerState + ), + responseSender: HTTPResponseSender { response in + try await outbound.write(.head(response)) + return HTTPResponseConcludingAsyncWriter( + writer: outbound, + writerState: writerState ) - throw error - } - - let httpRequest: HTTPRequest - switch nextPart { - case .head(let request): - httpRequest = request - case .body: - self.logger.debug("Unexpectedly received body on connection. Closing now") - outbound.finish() - return - case .end: - self.logger.debug("Unexpectedly received end on connection. Closing now") - outbound.finish() - return - case .none: - self.logger.trace("No more requests parts on connection") - return + } sendInformational: { response in + try await outbound.write(.head(response)) } - - let readerState = HTTPRequestConcludingAsyncReader.ReaderState() - let writerState = HTTPResponseConcludingAsyncWriter.WriterState() - - do { - try await handler.handle( - request: httpRequest, - requestContext: HTTPRequestContext(), - requestBodyAndTrailers: HTTPRequestConcludingAsyncReader( - iterator: iterator, - readerState: readerState - ), - responseSender: HTTPResponseSender { response in - try await outbound.write(.head(response)) - return HTTPResponseConcludingAsyncWriter( - writer: outbound, - writerState: writerState - ) - } sendInformational: { response in - try await outbound.write(.head(response)) - } - ) - } catch { - if !readerState.wrapped.withLock({ $0.finishedReading }) { - self.logger.error("Did not finish reading but error thrown.") - // TODO: if h2 reset stream; if h1 try draining request? - } - - if !writerState.wrapped.withLock({ $0.finishedWriting }) { - self.logger.error("Did not write response but error thrown.") - // TODO: we need to do something, possibly just close the connection or - // reset the stream with the appropriate error. - } - - throw error - } - - // TODO: handle other state scenarios. - // For example, if we're using h2 and we didn't finish reading but we wrote back - // a response, we should send a RST_STREAM with NO_ERROR set. - // If we finished reading but we didn't write back a response, then RST_STREAM - // is also likely appropriate but unclear about the error. - // For h1, we should close the connection. - - // Finish the outbound and wait on the close future to make sure all pending - // writes are actually written. - outbound.finish() - try await channel.channel.closeFuture.get() - } + ) } catch { - // TODO: We need to send a response head here potentially - self.logger.error("Error thrown while handling connection", metadata: ["error": "\(error)"]) + logger.error("Error thrown while handling request: \(error)") + if !readerState.wrapped.withLock({ $0.finishedReading }) { + logger.error("Did not finish reading but error thrown.") + } + if !writerState.wrapped.withLock({ $0.finishedWriting }) { + logger.error("Did not write response but error thrown.") + } + throw error + } - try? await channel.channel.close() + // If the handler didn't properly conclude the response, the HTTP codec + // is in an inconsistent state and the connection cannot be reused. + if !writerState.wrapped.withLock({ $0.finishedWriting }) { + self.logger.debug("Handler did not conclude the response. Closing connection.") + return nil } + + // Recover the iterator for potential connection reuse. If the handler started + // reading the request body but didn't finish, the iterator was consumed by the + // reader and not returned, so we can't reuse the connection. + return readerState.takeIterator() } /// Fail the listening address promise if the server is shutting down before it began listening. diff --git a/Tests/NIOHTTPServerTests/HTTPKeepAliveHandlerTests.swift b/Tests/NIOHTTPServerTests/HTTPKeepAliveHandlerTests.swift new file mode 100644 index 0000000..ceab345 --- /dev/null +++ b/Tests/NIOHTTPServerTests/HTTPKeepAliveHandlerTests.swift @@ -0,0 +1,494 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift HTTP Server open source project +// +// Copyright (c) 2026 Apple Inc. and the Swift HTTP Server project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of Swift HTTP Server project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import HTTPAPIs +import HTTPTypes +import Logging +import NIOCore +import NIOHTTPTypes +import NIOPosix +import Synchronization +import Testing + +@testable import NIOHTTPServer + +@Suite +struct HTTPKeepAliveHandlerTests { + let serverLogger = Logger(label: "HTTPKeepAliveHandlerTests") + + /// Verifies the happy case: when a client pipelines multiple HTTP/1.1 requests + /// on a single connection, all responses are returned in order and the connection + /// stays alive (no `Connection: close`). + @available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) + @Test("Pipelined requests on a single connection all succeed") + func testPipelinedRequests() async throws { + let server = NIOHTTPServer( + logger: self.serverLogger, + configuration: try .init( + bindTarget: .hostAndPort(host: "127.0.0.1", port: 0), + supportedHTTPVersions: [.http1_1], + transportSecurity: .plaintext + ) + ) + + let requestCount = 5 + + try await NIOHTTPServerTests.withServer( + server: server, + serverHandler: HTTPServerClosureRequestHandler { request, _, reader, sender in + try await NIOHTTPServerTests.echoResponse(readUpTo: 1024, reader: reader, sender: sender) + }, + body: { serverAddress in + let client = try await ClientBootstrap(group: .singletonMultiThreadedEventLoopGroup) + .connectToTestHTTP1Server(at: serverAddress) + + try await client.executeThenClose { inbound, outbound in + // Pipeline all requests up-front, then read all responses. + for i in 1...requestCount { + try await outbound.write( + .head(.init(method: .post, scheme: "http", authority: "", path: "/\(i)")) + ) + try await outbound.write(.body(ByteBuffer(string: "request-\(i)"))) + try await outbound.write(.end(nil)) + } + + var responseIterator = inbound.makeAsyncIterator() + for i in 1...requestCount { + let headPart = try await responseIterator.next() + guard case .head(let response) = headPart else { + Issue.record("Expected .head for request \(i), got \(String(describing: headPart))") + return + } + #expect(response.status == .ok) + // Connection should remain keep-alive — no Connection: close header. + #expect( + response.headerFields[.connection] != "close", + "Response \(i) unexpectedly had Connection: close: \(response.headerFields)" + ) + + // Drain body parts until .end. + var collectedBody = ByteBuffer() + while true { + let part = try await responseIterator.next() + if case .body(let buf) = part { + collectedBody.writeImmutableBuffer(buf) + } else if case .end = part { + break + } else { + Issue.record("Unexpected part for request \(i): \(String(describing: part))") + return + } + } + #expect(collectedBody == ByteBuffer(string: "request-\(i)")) + } + } + } + ) + } + + /// Verifies that when the handler writes a short response (head + end, no body) + /// before the request `.end` has arrived, the response head includes a + /// `Connection: close` header and the server closes the connection. + @available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) + @Test("Server sends head+end (no body) before request .end — Connection: close in header") + func testShortResponseBeforeRequestEnd() async throws { + let server = NIOHTTPServer( + logger: self.serverLogger, + configuration: try .init( + bindTarget: .hostAndPort(host: "127.0.0.1", port: 0), + supportedHTTPVersions: [.http1_1], + transportSecurity: .plaintext + ) + ) + + try await NIOHTTPServerTests.withServer( + server: server, + serverHandler: HTTPServerClosureRequestHandler { _, _, reader, sender in + // Read just one byte of the body to confirm we got past the head, then + // write a body-less response (head + end only). Because the head is + // still buffered by the keep-alive handler when `.end` is written, the + // handler amends the head with `Connection: close` before flushing. + let _ = try await reader.consumeAndConclude { partsReader in + var partsReader = partsReader + try await partsReader.read { _ in } + } + let writer = try await sender.send( + .init(status: .ok, headerFields: [.contentLength: "0"]) + ) + try await writer.writeAndConclude("".utf8.span, finalElement: nil) + }, + body: { serverAddress in + let client = try await ClientBootstrap(group: .singletonMultiThreadedEventLoopGroup) + .connectToTestHTTP1Server(at: serverAddress) + + try await client.executeThenClose { inbound, outbound in + try await outbound.write( + .head(.init(method: .post, scheme: "http", authority: "", path: "/")) + ) + try await outbound.write(.body(ByteBuffer(string: "x"))) + + // Read the response: should have Connection: close in the head. + var responseIterator = inbound.makeAsyncIterator() + let headPart = try await responseIterator.next() + guard case .head(let response) = headPart else { + Issue.record("Expected .head, got \(String(describing: headPart))") + return + } + #expect(response.status == .ok) + #expect( + response.headerFields[.connection] == "close", + "Expected Connection: close, got headers: \(response.headerFields)" + ) + + // Drain until .end, then verify channel closed. + var sawEnd = false + while !sawEnd { + let part = try await responseIterator.next() + switch part { + case .body: + continue + case .end: + sawEnd = true + case .none: + Issue.record("Stream ended before response .end") + return + case .head: + Issue.record("Unexpected second .head: \(String(describing: part))") + return + } + } + + let next = try await responseIterator.next() + #expect(next == nil, "Expected channel to be closed; got \(String(describing: next))") + } + } + ) + } + + /// Verifies that informational (1xx) responses pass through the keep-alive handler + /// without affecting buffering state. The handler writes a `100 Continue` before + /// the request `.end` has arrived; the client must receive that informational + /// response immediately (without waiting for request `.end`), and the connection + /// must remain alive after the final response. + @available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) + @Test("Informational (1xx) responses pass through without buffering or closing") + func testInformationalResponsePassesThrough() async throws { + let server = NIOHTTPServer( + logger: self.serverLogger, + configuration: try .init( + bindTarget: .hostAndPort(host: "127.0.0.1", port: 0), + supportedHTTPVersions: [.http1_1], + transportSecurity: .plaintext + ) + ) + + try await NIOHTTPServerTests.withServer( + server: server, + serverHandler: HTTPServerClosureRequestHandler { request, _, reader, sender in + // Only the first request exercises informational semantics; the + // pipelined second request (path "/second") just verifies keep-alive. + if request.path == "/" { + try await sender.sendInformational(.init(status: .continue)) + } + + // Read the full request body (until .end). + let _ = try await reader.consumeAndConclude { partsReader in + var partsReader = partsReader + try await partsReader.collect(upTo: 1024) { _ in } + } + + // Write the final response. + let writer = try await sender.send( + .init(status: .ok, headerFields: [.contentLength: "5"]) + ) + try await writer.writeAndConclude("hello".utf8.span, finalElement: nil) + }, + body: { serverAddress in + let client = try await ClientBootstrap(group: .singletonMultiThreadedEventLoopGroup) + .connectToTestHTTP1Server(at: serverAddress) + + try await client.executeThenClose { inbound, outbound in + try await outbound.write( + .head(.init(method: .post, scheme: "http", authority: "", path: "/")) + ) + + // Read the 100 Continue before sending the request body — this + // only works if the informational response was forwarded without + // being buffered by the keep-alive handler. + var responseIterator = inbound.makeAsyncIterator() + let informationalPart = try await responseIterator.next() + guard case .head(let informational) = informationalPart else { + Issue.record("Expected informational .head, got \(String(describing: informationalPart))") + return + } + #expect(informational.status == .continue) + + // Now send the body and end so the server can write the final + // response. + try await outbound.write(.body(ByteBuffer(string: "hello"))) + try await outbound.write(.end(nil)) + + // Read the final 200 OK response. + let finalHeadPart = try await responseIterator.next() + guard case .head(let response) = finalHeadPart else { + Issue.record("Expected final .head, got \(String(describing: finalHeadPart))") + return + } + #expect(response.status == .ok) + #expect( + response.headerFields[.connection] != "close", + "Expected keep-alive after informational flow; got headers: \(response.headerFields)" + ) + + // Drain body and end. + var sawEnd = false + while !sawEnd { + let part = try await responseIterator.next() + switch part { + case .body: + continue + case .end: + sawEnd = true + case .none: + Issue.record("Stream ended before response .end") + return + case .head: + Issue.record("Unexpected .head: \(String(describing: part))") + return + } + } + + // Pipeline a second request to verify keep-alive actually works. + try await outbound.write( + .head(.init(method: .get, scheme: "http", authority: "", path: "/second")) + ) + try await outbound.write(.end(nil)) + + let secondHead = try await responseIterator.next() + guard case .head(let secondResponse) = secondHead else { + Issue.record("Expected second .head, got \(String(describing: secondHead))") + return + } + #expect(secondResponse.status == .ok) + } + } + ) + } + + /// Verifies bidirectional streaming over HTTP/1.1: the handler writes the + /// response head and body chunks while concurrently reading the request body. + /// The client and server ping-pong — the client sends one byte, waits for its + /// echo, then sends the next. This only works if the keep-alive handler flushes + /// the response head as soon as a body chunk is written, rather than buffering + /// everything until request `.end` arrives. Because the head is flushed before + /// request `.end` arrives, the response carries `Connection: close` and the + /// server closes the connection after writing response `.end`. + @available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) + @Test("Bidirectional streaming works — head is flushed (with Connection: close) when a body part is written") + func testBidirectionalStreamingOverHTTP1() async throws { + let server = NIOHTTPServer( + logger: self.serverLogger, + configuration: try .init( + bindTarget: .hostAndPort(host: "127.0.0.1", port: 0), + supportedHTTPVersions: [.http1_1], + transportSecurity: .plaintext + ) + ) + + try await NIOHTTPServerTests.withServer( + server: server, + serverHandler: HTTPServerClosureRequestHandler { _, _, reader, sender in + // Echo request body parts back as response body parts, concurrently + // with reading from the request body. + var maybeReader = Optional(reader) + let writer = try await sender.send(.init(status: .ok)) + try await writer.produceAndConclude { responseBodyWriter in + var responseBodyWriter = responseBodyWriter + let reader = maybeReader.take()! + let _ = try await reader.consumeAndConclude { bodyReader in + try await bodyReader.forEachBuffer { buffer in + try await responseBodyWriter.write(buffer.span) + } + } + return nil + } + }, + body: { serverAddress in + let client = try await ClientBootstrap(group: .singletonMultiThreadedEventLoopGroup) + .connectToTestHTTP1Server(at: serverAddress) + + try await client.executeThenClose { inbound, outbound in + try await outbound.write( + .head(.init(method: .post, scheme: "http", authority: "", path: "/")) + ) + // Write the first body byte before reading the response head, so + // the server has something to echo — this unblocks the buffered + // head in the keep-alive handler. This mirrors how real + // bidirectional clients (like the conformance `/echo` test) work. + let chunkCount = 5 + let firstByte = ByteBuffer(bytes: [UInt8(ascii: "A")]) + try await outbound.write(.body(firstByte)) + + var responseIterator = inbound.makeAsyncIterator() + let headPart = try await responseIterator.next() + guard case .head(let response) = headPart else { + Issue.record("Expected .head, got \(String(describing: headPart))") + return + } + #expect(response.status == .ok) + // The head was flushed because a body part was written before + // request `.end` arrived, so it carries `Connection: close`. + #expect( + response.headerFields[.connection] == "close", + "Expected Connection: close on bidirectional flow; got \(response.headerFields)" + ) + + // Read the echo of the first byte. + let firstEcho = try await responseIterator.next() + #expect(firstEcho == .body(firstByte)) + + // Ping-pong: write a byte, read its echo. + for i in 1...makeStream() + let (handlerCanFinishStream, handlerCanFinish) = AsyncStream.makeStream() + + try await NIOHTTPServerTests.withServer( + server: server, + serverHandler: HTTPServerClosureRequestHandler { _, _, reader, sender in + // Write the response head before reading anything. The keep-alive + // handler buffers it because request `.end` hasn't arrived. + let writer = try await sender.send( + .init(status: .ok, headerFields: [.contentLength: "5"]) + ) + responseHeadWritten.yield() + responseHeadWritten.finish() + + // Wait for the test to confirm it saw `Connection: close` before + // we drain the request and finish the response. + var canFinishIterator = handlerCanFinishStream.makeAsyncIterator() + _ = await canFinishIterator.next() + + // Drain the request body + end and then write the response body + end. + let _ = try await reader.consumeAndConclude { partsReader in + var partsReader = partsReader + try await partsReader.collect(upTo: 1024) { _ in } + } + try await writer.writeAndConclude("hello".utf8.span, finalElement: nil) + }, + body: { serverAddress in + let client = try await ClientBootstrap(group: .singletonMultiThreadedEventLoopGroup) + .connectToTestHTTP1Server(at: serverAddress) + + try await client.executeThenClose { inbound, outbound in + // Send only the head. + try await outbound.write( + .head(.init(method: .post, scheme: "http", authority: "", path: "/")) + ) + + // Wait for the handler to write the response head. + var signalIterator = responseHeadWrittenStream.makeAsyncIterator() + _ = await signalIterator.next() + + // Send a single body byte, WITHOUT request `.end`. The server + // will see this as its own read cycle that ends with the + // request `.end` still missing — triggering the + // `Connection: close` amendment. + try await outbound.write(.body(ByteBuffer(string: "x"))) + + // Read the response head. It must carry `Connection: close`. + var responseIterator = inbound.makeAsyncIterator() + let headPart = try await responseIterator.next() + guard case .head(let response) = headPart else { + Issue.record("Expected .head, got \(String(describing: headPart))") + return + } + #expect(response.status == .ok) + #expect( + response.headerFields[.connection] == "close", + "Expected Connection: close after read cycle ended without request .end; got \(response.headerFields)" + ) + + // Let the handler finish and send the rest of the request. + handlerCanFinish.yield() + handlerCanFinish.finish() + try await outbound.write(.end(nil)) + + // Drain the response body + end. + var sawEnd = false + while !sawEnd { + let part = try await responseIterator.next() + switch part { + case .body: + continue + case .end: + sawEnd = true + case .none: + Issue.record("Stream ended before response .end") + return + case .head: + Issue.record("Unexpected second .head: \(String(describing: part))") + return + } + } + + // The server should have closed the connection. + let next = try await responseIterator.next() + #expect(next == nil, "Expected channel close after response; got \(String(describing: next))") + } + } + ) + } +} diff --git a/Tests/NIOHTTPServerTests/HTTPRequestConcludingAsyncReaderTests.swift b/Tests/NIOHTTPServerTests/HTTPRequestConcludingAsyncReaderTests.swift index 36641b6..f32b030 100644 --- a/Tests/NIOHTTPServerTests/HTTPRequestConcludingAsyncReaderTests.swift +++ b/Tests/NIOHTTPServerTests/HTTPRequestConcludingAsyncReaderTests.swift @@ -37,8 +37,7 @@ struct HTTPRequestConcludingAsyncReaderTests { source.finish() let requestReader = HTTPRequestConcludingAsyncReader( - iterator: stream.makeAsyncIterator(), - readerState: .init() + readerState: .init(iterator: stream.makeAsyncIterator()) ) _ = try await requestReader.consumeAndConclude { bodyReader in @@ -59,8 +58,7 @@ struct HTTPRequestConcludingAsyncReaderTests { source.finish() let requestReader = HTTPRequestConcludingAsyncReader( - iterator: stream.makeAsyncIterator(), - readerState: .init() + readerState: .init(iterator: stream.makeAsyncIterator()) ) _ = try await requestReader.consumeAndConclude { bodyReader in @@ -91,7 +89,7 @@ struct HTTPRequestConcludingAsyncReaderTests { source.finish() // Then start reading the request - let requestReader = HTTPRequestConcludingAsyncReader(iterator: stream.makeAsyncIterator(), readerState: .init()) + let requestReader = HTTPRequestConcludingAsyncReader(readerState: .init(iterator: stream.makeAsyncIterator())) let (requestBody, finalElement) = try await requestReader.consumeAndConclude { bodyReader in var bodyReader = bodyReader @@ -140,8 +138,7 @@ struct HTTPRequestConcludingAsyncReaderTests { group.addTask { let requestReader = HTTPRequestConcludingAsyncReader( - iterator: stream.makeAsyncIterator(), - readerState: .init() + readerState: .init(iterator: stream.makeAsyncIterator()) ) let (_, finalElement) = try await requestReader.consumeAndConclude { bodyReader in // Read all body chunks @@ -173,8 +170,7 @@ struct HTTPRequestConcludingAsyncReaderTests { source.finish() let requestReader = HTTPRequestConcludingAsyncReader( - iterator: stream.makeAsyncIterator(), - readerState: .init() + readerState: .init(iterator: stream.makeAsyncIterator()) ) _ = await requestReader.consumeAndConclude { bodyReader in @@ -203,8 +199,7 @@ struct HTTPRequestConcludingAsyncReaderTests { source.finish() let requestReader = HTTPRequestConcludingAsyncReader( - iterator: stream.makeAsyncIterator(), - readerState: .init() + readerState: .init(iterator: stream.makeAsyncIterator()) ) _ = await requestReader.consumeAndConclude { requestBodyReader in diff --git a/Tests/NIOHTTPServerTests/NIOHTTPServer+ServiceLifecycleTests.swift b/Tests/NIOHTTPServerTests/NIOHTTPServer+ServiceLifecycleTests.swift index 1b2175c..40c9f1f 100644 --- a/Tests/NIOHTTPServerTests/NIOHTTPServer+ServiceLifecycleTests.swift +++ b/Tests/NIOHTTPServerTests/NIOHTTPServer+ServiceLifecycleTests.swift @@ -61,7 +61,10 @@ struct NIOHTTPServiceLifecycleTests { firstChunkReadPromise.succeed() - try await bodyReader.read { _ in } + var requestFinished = false + while !requestFinished { + try await bodyReader.read { if $0.isEmpty { requestFinished = true } } + } } let responseBodyWriter = try await responseSender.send(.init(status: .ok)) @@ -184,6 +187,13 @@ struct NIOHTTPServiceLifecycleTests { // Wait for the server to shut down. try await group.waitForAll() + // Wait for the client channel to be fully closed. The server has closed + // its side of the connection, but the client's event loop may not have + // processed the TCP FIN/RST yet. closeFuture completes only once the + // channel is fully inactive, which is a stronger guarantee than just + // draining inbound (which may return while the channel is half-closed). + try await client.channel.closeFuture.get() + // We shouldn't be able to complete our request; the server should have shut down. await #expect(throws: ChannelError.ioOnClosedChannel) { try await outbound.write(Self.reqBody) diff --git a/Tests/NIOHTTPServerTests/NIOHTTPServerTests.swift b/Tests/NIOHTTPServerTests/NIOHTTPServerTests.swift index 00bf10b..52d3fa1 100644 --- a/Tests/NIOHTTPServerTests/NIOHTTPServerTests.swift +++ b/Tests/NIOHTTPServerTests/NIOHTTPServerTests.swift @@ -110,7 +110,8 @@ struct NIOHTTPServerTests { inbound, expectedHead: [Self.responseHead(status: .ok, for: .http1_1)], expectedBody: [Self.bodyData], - expectedTrailers: Self.trailer + expectedTrailers: Self.trailer, + expectStreamEnd: false ) } } @@ -190,7 +191,8 @@ struct NIOHTTPServerTests { inbound, expectedHead: [Self.responseHead(status: .ok, for: httpVersion)], expectedBody: [Self.bodyData], - expectedTrailers: Self.trailer + expectedTrailers: Self.trailer, + expectStreamEnd: httpVersion == .http2 ) responseReceived() @@ -240,7 +242,8 @@ struct NIOHTTPServerTests { Self.responseHead(status: .ok, for: httpVersion), ], expectedBody: [Self.bodyData], - expectedTrailers: Self.trailer + expectedTrailers: Self.trailer, + expectStreamEnd: httpVersion == .http2 ) responseReceived() } @@ -371,6 +374,60 @@ struct NIOHTTPServerTests { ) } + @available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) + @Test("Multiple serial HTTP/1.1 requests on the same connection") + func testMultipleSerialHTTP1Requests() async throws { + let server = NIOHTTPServer( + logger: self.serverLogger, + configuration: try .init( + bindTarget: .hostAndPort(host: "127.0.0.1", port: 0), + supportedHTTPVersions: [.http1_1], + transportSecurity: .plaintext + ) + ) + + let requestCount = 3 + + try await confirmation(expectedCount: requestCount) { responseReceived in + try await Self.withServer( + server: server, + serverHandler: HTTPServerClosureRequestHandler { request, requestContext, reader, responseWriter in + // Echo the request body back as the response body. + try await Self.echoResponse(readUpTo: 1024, reader: reader, sender: responseWriter) + }, + body: { serverAddress in + let client = try await ClientBootstrap(group: .singletonMultiThreadedEventLoopGroup) + .connectToTestHTTP1Server(at: serverAddress) + + try await client.executeThenClose { inbound, outbound in + var responseIterator = inbound.makeAsyncIterator() + + for i in 1...requestCount { + // Send request + try await outbound.write( + .head(.init(method: .post, scheme: "http", authority: "", path: "/\(i)")) + ) + try await outbound.write(Self.reqBody) + try await outbound.write(.end(nil)) + + // Read response + let headPart = try await responseIterator.next() + #expect(headPart == .head(Self.responseHead(status: .ok, for: .http1_1))) + + let bodyPart = try await responseIterator.next() + #expect(bodyPart == .body(Self.bodyData)) + + let endPart = try await responseIterator.next() + #expect(endPart == .end(nil)) + + responseReceived() + } + } + } + ) + } + } + @available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) @Test("Multiple concurrent connections", arguments: [HTTPVersion.http1_1, HTTPVersion.http2]) func testMultipleConcurrentConnections(httpVersion: HTTPVersion) async throws { @@ -426,7 +483,8 @@ struct NIOHTTPServerTests { try await Self.validateResponse( inbound, expectedHead: [Self.responseHead(status: .ok, for: httpVersion)], - expectedBody: [Self.bodyData] + expectedBody: [Self.bodyData], + expectStreamEnd: httpVersion == .http2 ) responseReceived() @@ -542,7 +600,6 @@ struct NIOHTTPServerTests { try await firstClientChannel.executeThenClose { inbound, outbound in // Only send a request head; finish the stream immediately afterwards. try await outbound.write(.head(.init(method: .post, scheme: "http", authority: "", path: "/"))) - outbound.finish() } try await firstRequestErrorCaught.futureResult.get() @@ -558,7 +615,8 @@ struct NIOHTTPServerTests { try await Self.validateResponse( inbound, expectedHead: [Self.responseHead(status: .ok, for: .http1_1)], - expectedBody: [Self.bodyData] + expectedBody: [Self.bodyData], + expectStreamEnd: false ) responseReceived() @@ -633,7 +691,8 @@ struct NIOHTTPServerTests { inbound, expectedHead: [Self.responseHead(status: .ok, for: .http1_1)], expectedBody: [Self.bodyData], - expectedTrailers: Self.trailer + expectedTrailers: Self.trailer, + expectStreamEnd: false ) } @@ -650,7 +709,8 @@ struct NIOHTTPServerTests { inbound, expectedHead: [Self.responseHead(status: .ok, for: .http1_1)], expectedBody: [Self.bodyData], - expectedTrailers: Self.trailer + expectedTrailers: Self.trailer, + expectStreamEnd: false ) } } @@ -700,7 +760,8 @@ struct NIOHTTPServerTests { inbound, expectedHead: [Self.responseHead(status: .ok, for: .http1_1)], expectedBody: [Self.bodyData], - expectedTrailers: Self.trailer + expectedTrailers: Self.trailer, + expectStreamEnd: false ) } } @@ -829,6 +890,7 @@ extension NIOHTTPServerTests { expectedHead: [HTTPResponse], expectedBody: [ByteBuffer], expectedTrailers: HTTPFields? = nil, + expectStreamEnd: Bool = true, sourceLocation: SourceLocation = #_sourceLocation ) async throws { var responseIterator = responseStream.makeAsyncIterator() @@ -846,11 +908,13 @@ extension NIOHTTPServerTests { let endResponsePart = try await responseIterator.next() #expect(endResponsePart == .end(expectedTrailers), sourceLocation: sourceLocation) - #expect( - try await responseIterator.next() == nil, - "Received another response part when the response stream should have finished.", - sourceLocation: sourceLocation - ) + if expectStreamEnd { + #expect( + try await responseIterator.next() == nil, + "Received another response part when the response stream should have finished.", + sourceLocation: sourceLocation + ) + } } /// Unwraps a negotiated channel, asserting it matches the expected `httpVersion`. For HTTP/2, opens and returns a diff --git a/Tests/NIOHTTPServerTests/Utilities/TestingChannelClientServer/TestingChannelServer+HTTP1.swift b/Tests/NIOHTTPServerTests/Utilities/TestingChannelClientServer/TestingChannelServer+HTTP1.swift index 9af92f8..1892b35 100644 --- a/Tests/NIOHTTPServerTests/Utilities/TestingChannelClientServer/TestingChannelServer+HTTP1.swift +++ b/Tests/NIOHTTPServerTests/Utilities/TestingChannelClientServer/TestingChannelServer+HTTP1.swift @@ -48,7 +48,7 @@ struct TestingChannelHTTP1Server { logger: logger, // The server won't actually be bound to this host and port; we just have to pass this argument. configuration: try .init( - bindTarget: .hostAndPort(host: "127.0.0.1", port: 8000), + bindTarget: .hostAndPort(host: "127.0.0.1", port: 0), supportedHTTPVersions: [.http1_1], transportSecurity: .plaintext ) @@ -98,7 +98,7 @@ struct TestingChannelHTTP1Server { try await body(clientAsyncChannel) - try await serverTestConnectionChannel.close() + try? await serverTestConnectionChannel.close() } } }