diff --git a/Examples/MiddlewareClient/ExampleMiddlewareClient.swift b/Examples/MiddlewareClient/ExampleMiddlewareClient.swift index a86b8cb..21645de 100644 --- a/Examples/MiddlewareClient/ExampleMiddlewareClient.swift +++ b/Examples/MiddlewareClient/ExampleMiddlewareClient.swift @@ -20,20 +20,16 @@ import Middleware @available(anyAppleOS 26.0, *) struct ExampleMiddlewareClient< Client: HTTPClient & ~Copyable, - OutWriter: CallerAsyncWriter & ~Copyable & SendableMetatype, ClientMiddleware: Middleware & Sendable >: HTTPClient, ~Copyable where - OutWriter.WriteElement == UInt8, - OutWriter.FinalElement == HTTPFields?, - Client.Writer: SendableMetatype, ClientMiddleware.Input: ~Copyable, ClientMiddleware.NextInput: ~Copyable, - ClientMiddleware.Input == HTTPClientMiddlewareInput, + ClientMiddleware.Input == HTTPClientMiddlewareInput, ClientMiddleware.NextInput == HTTPClientMiddlewareInput { typealias RequestOptions = Client.RequestOptions - typealias Writer = OutWriter + typealias Writer = Client.Writer typealias Reader = Client.Reader var defaultRequestOptions: Client.RequestOptions { @@ -54,9 +50,9 @@ where mutating func perform( request: HTTPRequest, - body: consuming HTTPClientRequestBody?, + body: consuming HTTPClientRequestBody?, options: RequestOptions, - responseHandler: (HTTPResponse, consuming Reader) async throws -> Return + responseHandler: (HTTPResponse, consuming Reader, consuming Future) async throws -> Return ) async throws -> Return { return try await self.middleware.intercept( input: HTTPClientMiddlewareInput(request: request, body: body) diff --git a/Examples/ProxyServer/ProxyServer.swift b/Examples/ProxyServer/ProxyServer.swift index e0dce27..247155d 100644 --- a/Examples/ProxyServer/ProxyServer.swift +++ b/Examples/ProxyServer/ProxyServer.swift @@ -52,8 +52,9 @@ struct ProxyServer { }! // Pipe the server request body straight into the upstream writer. try await reader.pipe(into: upstreamWriter) + return nil } - ) { response, upstreamReader in + ) { response, upstreamReader, _ in // Pipe the upstream client response body straight into the // downstream response sender. let writer = try await responseSender.take()!.send(response) diff --git a/Package.swift b/Package.swift index daddd26..7ada537 100644 --- a/Package.swift +++ b/Package.swift @@ -19,11 +19,11 @@ let extraSettings: [SwiftSetting] = [ let package = Package( name: "HTTPAPIProposal", platforms: [ // TODO: Needed until https://github.com/swiftlang/swift/issues/89028 is fixed - .macOS(.v15), - .iOS(.v18), - .watchOS(.v11), - .tvOS(.v18), - .visionOS(.v2), + .macOS(.v27), + .iOS(.v27), + .watchOS(.v27), + .tvOS(.v27), + .visionOS(.v27), ], products: [ .library(name: "HTTPAPIs", targets: ["HTTPAPIs"]), diff --git a/Sources/AHCHTTPClient/AHC+HTTPClient.swift b/Sources/AHCHTTPClient/AHC+HTTPClient.swift index 616b61c..2d065ee 100644 --- a/Sources/AHCHTTPClient/AHC+HTTPClient.swift +++ b/Sources/AHCHTTPClient/AHC+HTTPClient.swift @@ -156,7 +156,7 @@ extension AsyncHTTPClient.HTTPClient: HTTPAPIs.HTTPClient { request: HTTPRequest, body: consuming HTTPClientRequestBody?, options: RequestOptions, - responseHandler: (HTTPResponse, consuming Reader) async throws -> Return + responseHandler: (HTTPResponse, consuming Reader, consuming Future) async throws -> Return ) async throws -> Return { guard let url = request.url else { fatalError() @@ -165,6 +165,10 @@ extension AsyncHTTPClient.HTTPClient: HTTPAPIs.HTTPClient { var result: Result? await withTaskGroup(of: Void.self) { taskGroup in + let pair = Promise.makePromise(of: Writer?.self) + var requestBodyPromise = Optional(pair.a) + let requestBodyFuture = pair.b + var ahcRequest = HTTPClientRequest(url: url.absoluteString) ahcRequest.method = .init(rawValue: request.method.rawValue) if !request.headerFields.isEmpty { @@ -181,10 +185,12 @@ extension AsyncHTTPClient.HTTPClient: HTTPAPIs.HTTPClient { for await ahcWriter in asyncStream { do { let writer = Writer(ahcWriter) - try await body.produce(into: writer) + let result = try await body.produce(into: writer) + requestBodyPromise.take()!.fulfill(result) // writer.finish already calls requestBodyStreamFinished break // the loop } catch let error { + requestBodyPromise.take()!.fulfill(error: error) // if we fail because the user throws in upload, we have to cancel the // upload and fail the request I guess. ahcWriter.fail(error) @@ -211,7 +217,7 @@ extension AsyncHTTPClient.HTTPClient: HTTPAPIs.HTTPClient { headerFields: responseFields ) - result = .success(try await responseHandler(response, Reader(body: ahcResponse.body))) + result = .success(try await responseHandler(response, Reader(body: ahcResponse.body), requestBodyFuture)) } catch { result = .failure(error) } diff --git a/Sources/HTTPAPIs/Client/HTTPClient+Conveniences.swift b/Sources/HTTPAPIs/Client/HTTPClient+Conveniences.swift index 165d1e8..b047a2a 100644 --- a/Sources/HTTPAPIs/Client/HTTPClient+Conveniences.swift +++ b/Sources/HTTPAPIs/Client/HTTPClient+Conveniences.swift @@ -48,7 +48,7 @@ where request: HTTPRequest, body: consuming HTTPClientRequestBody? = nil, options: RequestOptions? = nil, - responseHandler: (HTTPResponse, consuming Reader) async throws -> Return, + responseHandler: (HTTPResponse, consuming Reader, consuming Future) async throws -> Return, ) async throws -> Return { let options = options ?? self.defaultRequestOptions return try await self.perform(request: request, body: body, options: options, responseHandler: responseHandler) @@ -76,7 +76,7 @@ where ) async throws -> (response: HTTPResponse, bodyData: Data) { let request = HTTPRequest(url: url, headerFields: headerFields) let options = options ?? self.defaultRequestOptions - return try await self.perform(request: request, body: nil, options: options) { response, reader in + return try await self.perform(request: request, body: nil, options: options) { response, reader, _ in ( response, try await Self.collectBody(reader, upTo: limit) @@ -108,7 +108,7 @@ where ) async throws -> (response: HTTPResponse, bodyData: Data) { let request = HTTPRequest(method: .post, url: url, headerFields: headerFields) let options = options ?? self.defaultRequestOptions - return try await self.perform(request: request, body: .data(bodyData), options: options) { response, reader in + return try await self.perform(request: request, body: .data(bodyData), options: options) { response, reader, _ in ( response, try await Self.collectBody(reader, upTo: limit) @@ -140,7 +140,7 @@ where ) async throws -> (response: HTTPResponse, bodyData: Data) { let request = HTTPRequest(method: .put, url: url, headerFields: headerFields) let options = options ?? self.defaultRequestOptions - return try await self.perform(request: request, body: .data(bodyData), options: options) { response, reader in + return try await self.perform(request: request, body: .data(bodyData), options: options) { response, reader, _ in ( response, try await Self.collectBody(reader, upTo: limit) @@ -172,7 +172,7 @@ where ) async throws -> (response: HTTPResponse, bodyData: Data) { let request = HTTPRequest(method: .delete, url: url, headerFields: headerFields) let options = options ?? self.defaultRequestOptions - return try await self.perform(request: request, body: bodyData.map { .data($0) }, options: options) { response, reader in + return try await self.perform(request: request, body: bodyData.map { .data($0) }, options: options) { response, reader, _ in ( response, try await Self.collectBody(reader, upTo: limit) @@ -204,7 +204,7 @@ where ) async throws -> (response: HTTPResponse, bodyData: Data) { let request = HTTPRequest(method: .patch, url: url, headerFields: headerFields) let options = options ?? self.defaultRequestOptions - return try await self.perform(request: request, body: .data(bodyData), options: options) { response, reader in + return try await self.perform(request: request, body: .data(bodyData), options: options) { response, reader, _ in ( response, try await Self.collectBody(reader, upTo: limit) diff --git a/Sources/HTTPAPIs/Client/HTTPClient.swift b/Sources/HTTPAPIs/Client/HTTPClient.swift index 776894a..9209345 100644 --- a/Sources/HTTPAPIs/Client/HTTPClient.swift +++ b/Sources/HTTPAPIs/Client/HTTPClient.swift @@ -63,6 +63,6 @@ public protocol HTTPClient: Sendable, ~Copyable, ~Escapable { request: HTTPRequest, body: consuming HTTPClientRequestBody?, options: RequestOptions, - responseHandler: (HTTPResponse, consuming Reader) async throws -> Return + responseHandler: (HTTPResponse, consuming Reader, consuming Future) async throws -> Return ) async throws -> Return } diff --git a/Sources/HTTPAPIs/Client/HTTPClientRequestBody+Data.swift b/Sources/HTTPAPIs/Client/HTTPClientRequestBody+Data.swift index 03f3db8..4d89efa 100644 --- a/Sources/HTTPAPIs/Client/HTTPClientRequestBody+Data.swift +++ b/Sources/HTTPAPIs/Client/HTTPClientRequestBody+Data.swift @@ -31,6 +31,7 @@ extension HTTPClientRequestBody where Writer: ~Copyable { copying: data.span.extracting(droppingFirst: Int(offset)) ) try await writer.finish(buffer: &buffer, finalElement: nil) + return nil } } } diff --git a/Sources/HTTPAPIs/Client/HTTPClientRequestBody.swift b/Sources/HTTPAPIs/Client/HTTPClientRequestBody.swift index dbff258..67f01cd 100644 --- a/Sources/HTTPAPIs/Client/HTTPClientRequestBody.swift +++ b/Sources/HTTPAPIs/Client/HTTPClientRequestBody.swift @@ -72,8 +72,8 @@ where Writer.WriteElement == UInt8, Writer.FinalElement == HTTPFields? { public let knownLength: Int64? private enum WriteBody { - case restartable(@Sendable (consuming Writer) async throws -> Void) - case seekable(@Sendable (Int64, consuming Writer) async throws -> Void) + case restartable(@Sendable (consuming Writer) async throws -> Writer?) + case seekable(@Sendable (Int64, consuming Writer) async throws -> Writer?) } private let writeBody: WriteBody @@ -81,7 +81,7 @@ where Writer.WriteElement == UInt8, Writer.FinalElement == HTTPFields? { /// - Parameters: /// - writer: The destination into which to write the body. /// - Throws: An error thrown from the body closure. - public func produce(into writer: consuming Writer) async throws { + public func produce(into writer: consuming Writer) async throws -> Writer? { switch self.writeBody { case .restartable(let writeBody): try await writeBody(writer) @@ -96,7 +96,7 @@ where Writer.WriteElement == UInt8, Writer.FinalElement == HTTPFields? { /// - offset: The offset from which to start writing the body. /// - writer: The destination into which to write the body. /// - Throws: An error thrown from the body closure. - public func produce(offset: Int64, into writer: consuming Writer) async throws { + public func produce(offset: Int64, into writer: consuming Writer) async throws -> Writer? { switch self.writeBody { case .restartable: fatalError("Request body is not seekable") @@ -119,7 +119,7 @@ where Writer.WriteElement == UInt8, Writer.FinalElement == HTTPFields? { /// ``CallerAsyncWriter/finish(buffer:finalElement:)`` to terminate the body. public static func restartable( knownLength: Int64? = nil, - _ body: @escaping @Sendable (consuming Writer) async throws -> Void + _ body: @escaping @Sendable (consuming Writer) async throws -> Writer? ) -> Self { Self.init( knownLength: knownLength, @@ -141,7 +141,7 @@ where Writer.WriteElement == UInt8, Writer.FinalElement == HTTPFields? { /// ``CallerAsyncWriter/finish(buffer:finalElement:)`` to terminate the body. public static func seekable( knownLength: Int64? = nil, - _ body: @escaping @Sendable (Int64, consuming Writer) async throws -> Void + _ body: @escaping @Sendable (Int64, consuming Writer) async throws -> Writer? ) -> Self { Self.init( knownLength: knownLength, @@ -158,18 +158,27 @@ where Writer.WriteElement == UInt8, Writer.FinalElement == HTTPFields? { // modules to map bodies. package init( other: HTTPClientRequestBody, - transform: @escaping @Sendable (consuming Writer) -> OtherWriter + transform: @escaping @Sendable (consuming Writer) -> OtherWriter, + reverseTransform: @escaping @Sendable (consuming OtherWriter) -> Writer ) where Writer: SendableMetatype { self.knownLength = other.knownLength self.writeBody = switch other.writeBody { case .restartable(let writeBody): .restartable { writer in - try await writeBody(transform(writer)) + if let writer = try await writeBody(transform(writer)) { + reverseTransform(writer) + } else { + nil + } } case .seekable(let writeBody): .seekable { offset, writer in - try await writeBody(offset, transform(writer)) + if let writer = try await writeBody(offset, transform(writer)) { + reverseTransform(writer) + } else { + nil + } } } } diff --git a/Sources/HTTPAPIs/Client/PromiseFuture.swift b/Sources/HTTPAPIs/Client/PromiseFuture.swift new file mode 100644 index 0000000..13b8666 --- /dev/null +++ b/Sources/HTTPAPIs/Client/PromiseFuture.swift @@ -0,0 +1,206 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift HTTP API Proposal open source project +// +// Copyright (c) 2026 Apple Inc. and the Swift HTTP API Proposal project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import Synchronization + +@available(anyAppleOS 26.0, *) +private protocol FutureProtocol: AnyObject, Sendable { + associatedtype Value: ~Copyable + func value() async throws -> sending Value +} + +@available(anyAppleOS 26.0, *) +private final class FulfillableFuture: FutureProtocol { + private enum State: ~Copyable { + case none + case continuation(Continuation) + case value(Disconnected) + case error(any Error) + + mutating func fulfill(_ value: consuming sending Value) { + switch consume self { + case .none: + self = .value(Disconnected(value: value)) + case .continuation(let continuation): + self = .none + continuation.resume(returning: value) + case .value: + fatalError() + case .error: + fatalError() + } + } + + mutating func fulfill(error: any Error) { + switch consume self { + case .none: + self = .error(error) + case .continuation(let continuation): + self = .none + continuation.resume(throwing: error) + case .value: + fatalError() + case .error: + fatalError() + } + } + + mutating func take(continuation: consuming Continuation) { + switch consume self { + case .none: + self = .continuation(continuation) + case .continuation: + fatalError() + case .value(let value): + self = .none + continuation.resume(returning: value.take()) + case .error(let error): + self = .none + continuation.resume(throwing: error) + } + } + } + private let state: Mutex = .init(.none) + + func fulfill(_ value: consuming sending Value) { + var value = Optional(Disconnected(value: value)) + self.state.withLock { + $0.fulfill(value.take()!.take()) + } + } + + func fulfill(error: any Error) { + self.state.withLock { + $0.fulfill(error: error) + } + } + + func value() async throws -> sending Value { + try await withContinuation(throwing: (any Error).self) { + var continuation = Optional($0) + self.state.withLock { + $0.take(continuation: continuation.take()!) + } + } + } +} + +@available(anyAppleOS 26.0, *) +private final class MappedFuture: FutureProtocol { + private let originalPromise: any FutureProtocol + private let transform: @Sendable (consuming sending OriginalValue) -> sending Value + + init(originalPromise: any FutureProtocol, transform: @Sendable @escaping (consuming sending OriginalValue) -> sending Value) { + self.originalPromise = originalPromise + self.transform = transform + } + + func value() async throws -> sending Value { + let value = try await self.originalPromise.value() + return self.transform(value) + } +} + +@available(anyAppleOS 26.0, *) +private final class ImmediateFuture: FutureProtocol { + private let value: Mutex?> + + init(_ value: consuming sending Value) { + self.value = .init(.init(value: value)) + } + + func value() async throws -> sending Value { + self.value.withLock { + $0.take()!.take() + } + } +} + +@frozen +public struct Pair: ~Copyable { + public let a: A + public let b: B + + public init(_ a: consuming A, _ b: consuming B) { + self.a = a + self.b = b + } +} + +@available(anyAppleOS 26.0, *) +public struct Promise: ~Copyable, Sendable { + private let state: FulfillableFuture + + public static func makePromise(of type: Value.Type) -> Pair, Future> { + let state = FulfillableFuture() + let promise = Promise(state: state) + let future = Future(state: state) + return Pair(promise, future) + } + + public consuming func fulfill(_ value: consuming sending Value) { + self.state.fulfill(value) + } + + public consuming func fulfill(error: any Error) { + self.state.fulfill(error: error) + } +} + +@available(anyAppleOS 26.0, *) +public struct Future: ~Copyable, Sendable { + private let state: any FutureProtocol + + fileprivate init(state: any FutureProtocol) { + self.state = state + } + + public init(immediateValue value: consuming sending Value) { + self.state = ImmediateFuture(value) + } + + public consuming func value() async throws -> Value { + try await self.state.value() + } + + public consuming func map(_ transform: @Sendable @escaping (consuming sending Value) -> sending NewValue) -> Future + { + let state: any FutureProtocol = MappedFuture(originalPromise: self.state, transform: transform) + return Future(state: state) + } +} + +@usableFromInline +struct Disconnected: ~Copyable, Sendable { + // This is safe since we take the value as sending and take consumes it + // and returns it as sending. + private nonisolated(unsafe) var value: Value? + + @usableFromInline + init(value: consuming sending Value) { + unsafe self.value = .some(value) + } + + @usableFromInline + consuming func take() -> sending Value { + nonisolated(unsafe) let value = unsafe self.value.take()! + return unsafe value + } + + @usableFromInline + mutating func swap(newValue: consuming sending Value) -> sending Value { + nonisolated(unsafe) let value = unsafe self.value.take()! + unsafe self.value = consume newValue + return unsafe value + } +} diff --git a/Sources/HTTPClient/DefaultHTTPClient.swift b/Sources/HTTPClient/DefaultHTTPClient.swift index e890ff4..3142ba1 100644 --- a/Sources/HTTPClient/DefaultHTTPClient.swift +++ b/Sources/HTTPClient/DefaultHTTPClient.swift @@ -44,7 +44,7 @@ public final class DefaultHTTPClient: HTTPAPIs.HTTPClient { public typealias WriteFailure = any Error public typealias FinalElement = HTTPFields? - private var actual: ActualHTTPClient.Writer + fileprivate var actual: ActualHTTPClient.Writer init(actual: consuming ActualHTTPClient.Writer) { self.actual = actual @@ -143,15 +143,22 @@ public final class DefaultHTTPClient: HTTPAPIs.HTTPClient { request: HTTPRequest, body: consuming HTTPClientRequestBody?, options: HTTPRequestOptions, - responseHandler: (HTTPResponse, consuming Reader) async throws -> Return + responseHandler: (HTTPResponse, consuming Reader, consuming Future) async throws -> Return ) async throws -> Return { // TODO: translate request options let options = self.client.defaultRequestOptions let body = body.map { - HTTPClientRequestBody(other: $0) { Writer(actual: $0) } + HTTPClientRequestBody( + other: $0, + transform: { Writer(actual: $0) }, + reverseTransform: { $0.actual } + ) } - return try await self.client.perform(request: request, body: body, options: options) { response, actualReader in - try await responseHandler(response, Reader(actual: actualReader)) + return try await self.client.perform(request: request, body: body, options: options) { response, actualReader, writer in + let writer: Future = writer.map { + if let writer = $0 { Writer(actual: writer) } else { nil } + } + return try await responseHandler(response, Reader(actual: actualReader), writer) } } } diff --git a/Sources/HTTPClient/HTTP+Conveniences.swift b/Sources/HTTPClient/HTTP+Conveniences.swift index c2ae02f..3130aad 100644 --- a/Sources/HTTPClient/HTTP+Conveniences.swift +++ b/Sources/HTTPClient/HTTP+Conveniences.swift @@ -42,7 +42,7 @@ extension HTTP { body: consuming HTTPClientRequestBody? = nil, options: HTTPRequestOptions = .init(), on client: DefaultHTTPClient = .shared, - responseHandler: (HTTPResponse, consuming DefaultHTTPClient.Reader) async throws -> Return, + responseHandler: (HTTPResponse, consuming DefaultHTTPClient.Reader, consuming Future) async throws -> Return, ) async throws -> Return { return try await client.perform(request: request, body: body, options: options, responseHandler: responseHandler) } diff --git a/Sources/HTTPClientConformance/HTTPClientConformance.swift b/Sources/HTTPClientConformance/HTTPClientConformance.swift index f074be1..d809b58 100644 --- a/Sources/HTTPClientConformance/HTTPClientConformance.swift +++ b/Sources/HTTPClientConformance/HTTPClientConformance.swift @@ -166,7 +166,7 @@ struct ConformanceTestSuite { await #expect(throws: (any Error).self) { try await client.perform( request: request, - ) { _, _ in } + ) { _, _, _ in } } } @@ -180,7 +180,7 @@ struct ConformanceTestSuite { ) try await client.perform( request: request, - ) { response, _ in + ) { response, _, _ in #expect(response.status == .ok) } } @@ -196,7 +196,7 @@ struct ConformanceTestSuite { await #expect(throws: (any Error).self) { try await client.perform( request: request, - ) { _, _ in } + ) { _, _, _ in } } } @@ -210,7 +210,7 @@ struct ConformanceTestSuite { ) try await client.perform( request: request - ) { response, responseBodyAndTrailers in + ) { response, responseBodyAndTrailers, _ in #expect(response.status == .noContent) var array = UniqueArray(minimumCapacity: 1024) _ = try await responseBodyAndTrailers.collect(into: &array) @@ -229,7 +229,7 @@ struct ConformanceTestSuite { ) try await client.perform( request: request - ) { response, responseBodyAndTrailers in + ) { response, responseBodyAndTrailers, _ in #expect(response.status == .notModified) var array = UniqueArray(minimumCapacity: 1024) _ = try await responseBodyAndTrailers.collect(into: &array) @@ -251,7 +251,7 @@ struct ConformanceTestSuite { await #expect(throws: (any Error).self) { try await client.perform( request: request - ) { response, responseBodyAndTrailers in + ) { response, responseBodyAndTrailers, _ in #expect(response.status == .ok) var array = UniqueArray(minimumCapacity: 1024) _ = try await responseBodyAndTrailers.collect(into: &array) @@ -274,7 +274,7 @@ struct ConformanceTestSuite { await #expect(throws: (any Error).self) { try await client.perform( request: request - ) { response, responseBodyAndTrailers in + ) { response, responseBodyAndTrailers, _ in #expect(response.status == .ok) var array = UniqueArray(minimumCapacity: 1024) _ = try await responseBodyAndTrailers.collect(into: &array) @@ -295,7 +295,7 @@ struct ConformanceTestSuite { try await client.perform( request: request - ) { response, responseBodyAndTrailers in + ) { response, responseBodyAndTrailers, _ in #expect(response.status == .ok) var array = UniqueArray(minimumCapacity: 1024) _ = try await responseBodyAndTrailers.collect(into: &array) @@ -316,7 +316,7 @@ struct ConformanceTestSuite { ) try await client.perform( request: request, - ) { response, responseBodyAndTrailers in + ) { response, responseBodyAndTrailers, _ in #expect(response.status == .ok) var array = UniqueArray(minimumCapacity: 1024) let trailers = try await responseBodyAndTrailers.collect(into: &array) @@ -337,10 +337,11 @@ struct ConformanceTestSuite { ) try await client.perform( request: request, - body: .restartable(knownLength: 0) { sender in + body: .restartable { sender in try await sender.finish() + return nil } - ) { response, responseBodyAndTrailers in + ) { response, responseBodyAndTrailers, _ in #expect(response.status == .ok) var array = UniqueArray(minimumCapacity: 1024) _ = try await responseBodyAndTrailers.collect(into: &array) @@ -364,8 +365,9 @@ struct ConformanceTestSuite { body: .restartable { sender in var body = UniqueArray.init(copying: "Hello World".utf8) try await sender.finish(buffer: &body, finalElement: nil) + return nil } - ) { response, responseBodyAndTrailers in + ) { response, responseBodyAndTrailers, _ in #expect(response.status == .ok) var array = UniqueArray(minimumCapacity: 1024) _ = try await responseBodyAndTrailers.collect(into: &array) @@ -386,7 +388,7 @@ struct ConformanceTestSuite { ) try await client.perform( request: request - ) { response, responseBodyAndTrailers in + ) { response, responseBodyAndTrailers, _ in #expect(response.status == .ok) // If gzip is not advertised by the client, a fallback to no-encoding @@ -415,7 +417,7 @@ struct ConformanceTestSuite { ) try await client.perform( request: request - ) { response, responseBodyAndTrailers in + ) { response, responseBodyAndTrailers, _ in #expect(response.status == .ok) // If deflate is not advertised by the client, a fallback to no-encoding @@ -444,7 +446,7 @@ struct ConformanceTestSuite { ) try await client.perform( request: request - ) { response, responseBodyAndTrailers in + ) { response, responseBodyAndTrailers, _ in #expect(response.status == .ok) // If brotli is not advertised by the client, a fallback to no-encoding @@ -473,7 +475,7 @@ struct ConformanceTestSuite { ) try await client.perform( request: request, - ) { response, responseBodyAndTrailers in + ) { response, responseBodyAndTrailers, _ in #expect(response.status == .ok) let contentEncoding = response.headerFields[.contentEncoding] #expect(contentEncoding == nil || contentEncoding == "identity") @@ -499,8 +501,9 @@ struct ConformanceTestSuite { body: .restartable { sender in var body = UniqueArray.init(copying: "Hello World".utf8) try await sender.finish(buffer: &body, finalElement: nil) + return nil } - ) { response, responseBodyAndTrailers in + ) { response, responseBodyAndTrailers, _ in #expect(response.status == .ok) var array = UniqueArray(minimumCapacity: 1024) _ = try await responseBodyAndTrailers.collect(into: &array) @@ -525,7 +528,7 @@ struct ConformanceTestSuite { try await client.perform( request: request, - ) { response, responseBodyAndTrailers in + ) { response, responseBodyAndTrailers, _ in #expect(response.status == .ok) var array = UniqueArray(minimumCapacity: 1024) _ = try await responseBodyAndTrailers.collect(into: &array) @@ -552,7 +555,7 @@ struct ConformanceTestSuite { await #expect(throws: (any Error).self) { try await client.perform( request: request, - ) { _, _ in } + ) { _, _, _ in } } } @@ -567,7 +570,7 @@ struct ConformanceTestSuite { try await client.perform( request: request, - ) { response, responseBodyAndTrailers in + ) { response, responseBodyAndTrailers, _ in #expect(response.status == .notFound) var array = UniqueArray(minimumCapacity: 1024) _ = try await responseBodyAndTrailers.collect(into: &array) @@ -587,7 +590,7 @@ struct ConformanceTestSuite { try await client.perform( request: request, - ) { response, responseBodyAndTrailers in + ) { response, responseBodyAndTrailers, _ in #expect(response.status == 999) var array = UniqueArray(minimumCapacity: 1024) _ = try await responseBodyAndTrailers.collect(into: &array) @@ -610,7 +613,7 @@ struct ConformanceTestSuite { group.addTask { try await client.perform( request: request, - ) { response, responseBodyAndTrailers in + ) { response, responseBodyAndTrailers, _ in #expect(response.status == .ok) var array = UniqueArray(minimumCapacity: 1024) _ = try await responseBodyAndTrailers.collect(into: &array) @@ -654,8 +657,9 @@ struct ConformanceTestSuite { await writerWaiting.first(where: { true }) } try await writer.finish(trailer: nil) + return nil } - ) { response, responseBodyAndTrailers in + ) { response, responseBodyAndTrailers, _ in #expect(response.status == .ok) let reader = responseBodyAndTrailers var numberOfChunks = 0 @@ -690,7 +694,7 @@ struct ConformanceTestSuite { try await client.perform( request: request, - ) { response, responseBodyAndTrailers in + ) { response, responseBodyAndTrailers, _ in #expect(response.status == .ok) var array = UniqueArray(minimumCapacity: 1024) _ = try await responseBodyAndTrailers.collect(into: &array) @@ -727,8 +731,9 @@ struct ConformanceTestSuite { try await writer.write(buffer: &buffer) } try await writer.finish(trailer: nil) + return nil } - ) { response, responseBodyAndTrailers in + ) { response, responseBodyAndTrailers, _ in #expect(response.status == .ok) let reader = responseBodyAndTrailers // Read all chunks from server @@ -769,7 +774,7 @@ struct ConformanceTestSuite { try await client.perform( request: request, - ) { response, responseBodyAndTrailers in + ) { response, responseBodyAndTrailers, _ in assertionFailure("Never expected to actually receive a response") } } @@ -807,7 +812,7 @@ struct ConformanceTestSuite { try await client.perform( request: request, - ) { response, responseBodyAndTrailers in + ) { response, responseBodyAndTrailers, _ in #expect(response.status == .ok) let reader = responseBodyAndTrailers @@ -878,8 +883,9 @@ struct ConformanceTestSuite { // Write out 1Mb of "A" var body = UniqueArray.init(copying: String(repeating: "A", count: 1_000_000).data(using: .ascii)!) try await sender.finish(buffer: &body, finalElement: nil) + return nil } - ) { response, responseBodyAndTrailers in + ) { response, responseBodyAndTrailers, _ in #expect(response.status == .ok) var array = UniqueArray(minimumCapacity: 2_000_000) _ = try await responseBodyAndTrailers.collect(into: &array) @@ -902,7 +908,7 @@ struct ConformanceTestSuite { // when the reader produces more elements than the limit, so we tolerate it. try await client.perform( request: request, - ) { response, responseBodyAndTrailers in + ) { response, responseBodyAndTrailers, _ in #expect(response.status == .ok) var reader = responseBodyAndTrailers var firstByte: UInt8? = nil @@ -933,7 +939,7 @@ struct ConformanceTestSuite { // Read the whole body a byte at a time from the reader. try await client.perform( request: request, - ) { response, responseBodyAndTrailers in + ) { response, responseBodyAndTrailers, _ in #expect(response.status == .ok) let reader = responseBodyAndTrailers @@ -957,7 +963,7 @@ struct ConformanceTestSuite { ) try await client.perform( request: request, - ) { response, responseBodyAndTrailers in + ) { response, responseBodyAndTrailers, _ in #expect(response.status == .ok) var array = UniqueArray(minimumCapacity: 1024) let trailers = try await responseBodyAndTrailers.collect(into: &array) @@ -977,7 +983,7 @@ struct ConformanceTestSuite { ) try await client.perform( request: request, - ) { response, responseBodyAndTrailers in + ) { response, responseBodyAndTrailers, _ in #expect(response.status == .ok) let values = response.headerFields[values: .init("X-Test")!] @@ -1006,7 +1012,7 @@ struct ConformanceTestSuite { ) try await client.perform( request: request, - ) { response, responseBodyAndTrailers in + ) { response, responseBodyAndTrailers, _ in var array = UniqueArray(minimumCapacity: 1024) _ = try await responseBodyAndTrailers.collect(into: &array) let body = String(copying: try UTF8Span(validating: array.span)) @@ -1035,7 +1041,7 @@ struct ConformanceTestSuite { authority: "127.0.0.1:\(testServerPort)", path: "/cookie" ) - let serverCookie = try await client.perform(request: request1) { response, responseBodyAndTrailers in + let serverCookie = try await client.perform(request: request1) { response, responseBodyAndTrailers, _ in // Parse the cookie #expect(response.headerFields.contains(.setCookie)) let values = response.headerFields[values: .setCookie] @@ -1052,7 +1058,7 @@ struct ConformanceTestSuite { authority: "127.0.0.1:\(testServerPort)", path: "/request" ) - let clientCookie = try await client.perform(request: request2) { response, responseBodyAndTrailers in + let clientCookie = try await client.perform(request: request2) { response, responseBodyAndTrailers, _ in // The server gave us the request back. Check that the cookie was in the request. var array = UniqueArray(minimumCapacity: 1024) _ = try await responseBodyAndTrailers.collect(into: &array) @@ -1110,7 +1116,7 @@ struct ConformanceTestSuite { for attempt in 0..<2 { try await client.perform( request: request - ) { response, responseBodyAndTrailers in + ) { response, responseBodyAndTrailers, _ in #expect(response.status == .ok) #expect(response.headerFields[.eTag] == expectedResponse) if attempt == 0 { @@ -1142,7 +1148,7 @@ struct ConformanceTestSuite { let request = HTTPRequest(url: components.url!) try await client.perform( request: request, - ) { response, responseBodyAndTrailers in + ) { response, responseBodyAndTrailers, _ in var array = UniqueArray(minimumCapacity: 1024) _ = try await responseBodyAndTrailers.collect(into: &array) let body = String(copying: try UTF8Span(validating: array.span)) @@ -1170,7 +1176,7 @@ struct ConformanceTestSuite { ) try await client.perform( request: request - ) { response, responseBodyAndTrailers in + ) { response, responseBodyAndTrailers, _ in #expect(response.status == .ok) var array = UniqueArray(minimumCapacity: 1024) let trailers = try await responseBodyAndTrailers.collect(into: &array) @@ -1208,8 +1214,9 @@ struct ConformanceTestSuite { .init("X-Request-Trailer-Two")!: "second-trailer-value", ] ) + return nil } - ) { response, responseBodyAndTrailers in + ) { response, responseBodyAndTrailers, _ in #expect(response.status == .ok) var array = UniqueArray(minimumCapacity: 1024) _ = try await responseBodyAndTrailers.collect(into: &array) diff --git a/Sources/URLSessionHTTPClient/URLSessionHTTPClient.swift b/Sources/URLSessionHTTPClient/URLSessionHTTPClient.swift index 56b66e7..57b5114 100644 --- a/Sources/URLSessionHTTPClient/URLSessionHTTPClient.swift +++ b/Sources/URLSessionHTTPClient/URLSessionHTTPClient.swift @@ -371,7 +371,7 @@ public final class URLSessionHTTPClient: HTTPClient, IdleTimerEntryProvider { request: HTTPRequest, body: consuming HTTPClientRequestBody?, options: URLSessionRequestOptions, - responseHandler: (HTTPResponse, consuming Reader) async throws -> Return + responseHandler: (HTTPResponse, consuming Reader, consuming Future) async throws -> Return ) async throws -> Return { guard request.schemeSupported else { throw HTTPTypeConversionError.unsupportedScheme @@ -396,11 +396,13 @@ public final class URLSessionHTTPClient: HTTPClient, IdleTimerEntryProvider { var result: Result? = nil try await withTaskCancellationHandler { do { - let response = try await delegateBridge.processDelegateCallbacksBeforeResponse(options) + var futureWriter: Future? = nil + let response = try await delegateBridge.processDelegateCallbacksBeforeResponse(options, futureWriter: &futureWriter) guard let response = (response as? HTTPURLResponse)?.httpResponse else { throw HTTPTypeConversionError.failedToConvertURLTypeToHTTPTypes } - result = .success(try await responseHandler(response, Reader(actual: delegateBridge))) + let writer = futureWriter ?? Future(immediateValue: nil) + result = .success(try await responseHandler(response, Reader(actual: delegateBridge), writer)) } catch { result = .failure(error) } diff --git a/Sources/URLSessionHTTPClient/URLSessionTaskDelegateBridge.swift b/Sources/URLSessionHTTPClient/URLSessionTaskDelegateBridge.swift index 3c9152d..ad4f2d5 100644 --- a/Sources/URLSessionHTTPClient/URLSessionTaskDelegateBridge.swift +++ b/Sources/URLSessionHTTPClient/URLSessionTaskDelegateBridge.swift @@ -40,8 +40,6 @@ final class URLSessionTaskDelegateBridge: NSObject, Sendable, URLSessionTaskDele private let stream: AsyncStream private let continuation: AsyncStream.Continuation private let requestBody: HTTPClientRequestBody? - // TODO: Can we get rid of this task and instead use on task group per client? - private let requestBodyTask: Mutex?> = .init(nil) init(task: URLSessionTask, body: consuming HTTPClientRequestBody?) { self.task = task @@ -55,7 +53,7 @@ final class URLSessionTaskDelegateBridge: NSObject, Sendable, URLSessionTaskDele private static let highWatermark = 256 * 1024 - private struct TaskState { + private struct TaskState: ~Copyable { // This describes the current state of the HTTP response and also the waiting relationship // between external client invoking the API and URLSession. enum State { @@ -73,7 +71,10 @@ final class URLSessionTaskDelegateBridge: NSObject, Sendable, URLSessionTaskDele } var state: State = .awaitingResponse var completionContinuation: CheckedContinuation? = nil - var responseTrailerFields: HTTPFields? + var responseTrailerFields: HTTPFields? = nil + // TODO: Can we get rid of this task and instead use on task group per client? + var requestBodyTask: Task? = nil + var requestBodyResult: Future? = nil } private let state: Mutex = .init(.init()) @@ -99,9 +100,9 @@ final class URLSessionTaskDelegateBridge: NSObject, Sendable, URLSessionTaskDele break } } - return state + return state.state } - switch oldState.state { + switch oldState { case .awaitingResponse: self.continuation.yield(.response(response)) case .awaitingData, .awaitingConsumption: @@ -137,9 +138,9 @@ final class URLSessionTaskDelegateBridge: NSObject, Sendable, URLSessionTaskDele ) } } - return state + return state.state } - switch oldState.state { + switch oldState { case .awaitingData(let continuation): continuation.resume(returning: (false, nil)) case .awaitingResponse: @@ -152,7 +153,7 @@ final class URLSessionTaskDelegateBridge: NSObject, Sendable, URLSessionTaskDele } func urlSession(_ session: URLSession, task: URLSessionTask, didCompleteWithError error: (any Error)?) { - let oldState = self.state.withLock { state in + let (oldState, completionContinuation) = self.state.withLock { state in defer { switch state.state { case .awaitingData: @@ -173,9 +174,9 @@ final class URLSessionTaskDelegateBridge: NSObject, Sendable, URLSessionTaskDele state.responseTrailerFields = trailerFields } } - return state + return (state.state, state.completionContinuation) } - switch oldState.state { + switch oldState { case .awaitingResponse: self.continuation.yield(.error(error ?? URLError(.unknown))) case .awaitingData(let continuation): @@ -183,7 +184,7 @@ final class URLSessionTaskDelegateBridge: NSObject, Sendable, URLSessionTaskDele case .awaitingConsumption: break } - oldState.completionContinuation?.resume() + completionContinuation?.resume() self.continuation.finish() } @@ -249,16 +250,23 @@ final class URLSessionTaskDelegateBridge: NSObject, Sendable, URLSessionTaskDele guard let requestBody = self.requestBody else { fatalError() } - self.requestBodyTask.withLock { - let oldTask = $0 + self.state.withLock { + let oldTask = $0.requestBodyTask oldTask?.cancel() - $0 = Task.immediate { + + let pair = Promise.makePromise(of: URLSessionHTTPClient.Writer?.self) + var requestBodyPromise = Optional(pair.a) + $0.requestBodyResult = consume pair.b + + $0.requestBodyTask = Task.immediate { await oldTask?.value let bridge = URLSessionRequestStreamBridge(task: task) completionHandler(bridge.inputStream) do { - try await requestBody.produce(into: URLSessionHTTPClient.Writer(actual: bridge)) + let result = try await requestBody.produce(into: URLSessionHTTPClient.Writer(actual: bridge)) + requestBodyPromise.take()!.fulfill(result) } catch { + requestBodyPromise.take()!.fulfill(error: error) if bridge.writeFailed { // Ignore error } else { @@ -279,16 +287,23 @@ final class URLSessionTaskDelegateBridge: NSObject, Sendable, URLSessionTaskDele guard let requestBody = self.requestBody else { fatalError() } - self.requestBodyTask.withLock { - let oldTask = $0 + self.state.withLock { + let oldTask = $0.requestBodyTask oldTask?.cancel() - $0 = Task.immediate { + + let pair = Promise.makePromise(of: URLSessionHTTPClient.Writer?.self) + var requestBodyPromise = Optional(pair.a) + $0.requestBodyResult = consume pair.b + + $0.requestBodyTask = Task.immediate { await oldTask?.value let bridge = URLSessionRequestStreamBridge(task: task) completionHandler(bridge.inputStream) do { - try await requestBody.produce(offset: offset, into: URLSessionHTTPClient.Writer(actual: bridge)) + let result = try await requestBody.produce(offset: offset, into: URLSessionHTTPClient.Writer(actual: bridge)) + requestBodyPromise.take()!.fulfill(result) } catch { + requestBodyPromise.take()!.fulfill(error: error) if bridge.writeFailed { // Ignore error } else { @@ -341,10 +356,14 @@ final class URLSessionTaskDelegateBridge: NSObject, Sendable, URLSessionTaskDele self.continuation.yield(.error(error)) } - func processDelegateCallbacksBeforeResponse(_ options: URLSessionRequestOptions) async throws -> URLResponse { + func processDelegateCallbacksBeforeResponse( + _ options: URLSessionRequestOptions, + futureWriter: inout Future? + ) async throws -> URLResponse { for await callback in self.stream { switch callback { case .response(let response): + futureWriter = self.state.withLock { $0.requestBodyResult.take() } return response case .redirection(let response, let request, let completionHandler): if let redirectionHandler = options.redirectionHandler { @@ -441,11 +460,11 @@ final class URLSessionTaskDelegateBridge: NSObject, Sendable, URLSessionTaskDele case .challenge(_, let completionHandler): completionHandler(.cancelAuthenticationChallenge, nil) case .error(let error): - await self.requestBodyTask.withLock { $0 }?.value + await self.state.withLock { $0.requestBodyTask }?.value throw error } } - await self.requestBodyTask.withLock { $0 }?.value + await self.state.withLock { $0.requestBodyTask }?.value await withCheckedContinuation { continuation in self.state.withLock { state in if case .awaitingConsumption(_, true, _, _) = state.state { diff --git a/Tests/HTTPAPIsTests/EchoTests.swift b/Tests/HTTPAPIsTests/EchoTests.swift index a320051..13c4521 100644 --- a/Tests/HTTPAPIsTests/EchoTests.swift +++ b/Tests/HTTPAPIsTests/EchoTests.swift @@ -52,8 +52,9 @@ struct HTTPClientAndServerTests { buffer: &body, finalElement: [.date: "test"] ) + return nil } - ) { (response: HTTPResponse, reader: consuming TestClientAndServer.AsyncChannelBodyReader) in + ) { (response: HTTPResponse, reader: consuming TestClientAndServer.AsyncChannelBodyReader, _) in #expect(response.status == .ok) var responseBody = UniqueArray(minimumCapacity: 100) let trailer = try await reader.collect(into: &responseBody) diff --git a/Tests/HTTPAPIsTests/Helpers/HTTPClientAndServerTests.swift b/Tests/HTTPAPIsTests/Helpers/HTTPClientAndServerTests.swift index 84c5198..ca7de9d 100644 --- a/Tests/HTTPAPIsTests/Helpers/HTTPClientAndServerTests.swift +++ b/Tests/HTTPAPIsTests/Helpers/HTTPClientAndServerTests.swift @@ -204,7 +204,7 @@ final class TestClientAndServer: HTTPClient, HTTPServer { request: HTTPRequest, body: consuming HTTPClientRequestBody?, options: RequestOptions, - responseHandler: (HTTPResponse, consuming AsyncChannelBodyReader) async throws -> Return + responseHandler: (HTTPResponse, consuming AsyncChannelBodyReader, consuming Future) async throws -> Return ) async throws -> Return { let response = try await withCheckedThrowingContinuation { continuation in self.requests.withLock { requests in @@ -232,7 +232,8 @@ final class TestClientAndServer: HTTPClient, HTTPServer { return try await responseHandler( response.response, // Needed since we are lacking call-once closures - response.takeResponseReader() + response.takeResponseReader(), + Future(immediateValue: nil) ) } diff --git a/Tests/HTTPAPIsTests/ServerCapabilityTests.swift b/Tests/HTTPAPIsTests/ServerCapabilityTests.swift index e698c50..78184ec 100644 --- a/Tests/HTTPAPIsTests/ServerCapabilityTests.swift +++ b/Tests/HTTPAPIsTests/ServerCapabilityTests.swift @@ -60,7 +60,7 @@ struct ServerCapabilityTests { try await client.perform( request: request, body: nil - ) { response, reader in + ) { response, reader, _ in #expect(response.status == .ok) var reader = reader _ = try await reader.collect(upTo: 100) { _ in }