diff --git a/Examples/ExampleMiddleware/HTTPServerLoggingMiddleware.swift b/Examples/ExampleMiddleware/HTTPServerLoggingMiddleware.swift index 7322cff..5d1bbee 100644 --- a/Examples/ExampleMiddleware/HTTPServerLoggingMiddleware.swift +++ b/Examples/ExampleMiddleware/HTTPServerLoggingMiddleware.swift @@ -151,6 +151,7 @@ where public struct RequestBodyAsyncReader: AsyncReader, ~Copyable { public typealias ReadElement = UInt8 public typealias ReadFailure = Base.Underlying.ReadFailure + public typealias Buffer = Base.Underlying.Buffer private var underlying: Base.Underlying private let logger: Logger @@ -160,16 +161,13 @@ where self.logger = logger } - public mutating func read( - maximumCount: Int?, - body: (consuming Span) async throws(Failure) -> Return + public mutating func read( + body: (inout Buffer) async throws(Failure) -> Return ) async throws(EitherError) -> Return { let logger = self.logger - return try await self.underlying.read( - maximumCount: maximumCount - ) { (span: Span) async throws(Failure) -> Return in - logger.info("Received next chunk \(span.count)") - return try await body(span) + return try await self.underlying.read { (buffer: inout Buffer) async throws(Failure) -> Return in + logger.info("Received next chunk \(buffer.count)") + return try await body(&buffer) } } } @@ -219,6 +217,7 @@ where public struct ResponseBodyAsyncWriter: AsyncWriter, ~Copyable { public typealias WriteElement = UInt8 public typealias WriteFailure = Base.Underlying.WriteFailure + public typealias Buffer = Base.Underlying.Buffer private var underlying: Base.Underlying private let logger: Logger @@ -228,14 +227,13 @@ where self.logger = logger } - public mutating func write( - _ body: (inout OutputSpan) async throws(Failure) -> Result + public mutating func write( + _ body: (inout Buffer) async throws(Failure) -> Result ) async throws(EitherError) -> Result { - return try await self.underlying.write { (outputSpan: inout OutputSpan) async throws(Failure) -> Result in - defer { - self.logger.info("Wrote response bytes \(outputSpan.count)") - } - return try await body(&outputSpan) + return try await self.underlying.write { (buffer: inout Buffer) async throws(Failure) -> Result in + let result = try await body(&buffer) + self.logger.info("Wrote response bytes \(buffer.count)") + return result } } } diff --git a/Examples/ProxyServer/ProxyServer.swift b/Examples/ProxyServer/ProxyServer.swift index 3962928..c2d1570 100644 --- a/Examples/ProxyServer/ProxyServer.swift +++ b/Examples/ProxyServer/ProxyServer.swift @@ -27,10 +27,14 @@ struct ProxyServer { } static func proxy(server: some HTTPServer, client: some HTTPClient) async throws { - try await server.serve { request, requestContext, serverRequestBodyAndTrailers, responseSender in + try await server.serve { + request, + requestContext, + serverRequestBodyAndTrailers, + responseSender in // We need to use a mutex here to move the requestBodyAndTrailers into the // @Sendable restartable body - let serverRequestBodyAndTrailers = Mutex(Optional(serverRequestBodyAndTrailers)) + let serverRequestBodyAndTrailers = Mutex(Disconnected(value: Optional(serverRequestBodyAndTrailers))) // Needed since we are lacking call-once closures var responseSender = Optional(responseSender) @@ -41,7 +45,9 @@ struct ProxyServer { var clientRequestBody = clientRequestBody // This takes the request body out of the mutex. Any restarts would hit // a force-unwrap. - let serverRequestBodyAndTrailers = serverRequestBodyAndTrailers.withLock { $0.take()! } + let serverRequestBodyAndTrailers = serverRequestBodyAndTrailers.withLock { + $0.swap(newValue: nil) + }! return try await serverRequestBodyAndTrailers.consumeAndConclude { serverRequestBody in try await clientRequestBody.write(serverRequestBody) @@ -62,3 +68,28 @@ struct ProxyServer { } } } + +@usableFromInline +struct Disconnected: ~Copyable, Sendable { + // This is safe since we take the value as sending and take consumes it + // and returns it as sending. + private nonisolated(unsafe) var value: Value? + + @usableFromInline + init(value: consuming sending Value) { + unsafe self.value = .some(value) + } + + @usableFromInline + consuming func take() -> sending Value { + nonisolated(unsafe) let value = unsafe self.value.take()! + return unsafe value + } + + @usableFromInline + mutating func swap(newValue: consuming sending Value) -> sending Value { + nonisolated(unsafe) let value = unsafe self.value.take()! + unsafe self.value = consume newValue + return unsafe value + } +} diff --git a/Examples/WASMClient/main.swift b/Examples/WASMClient/main.swift index 258bcbb..116f3e0 100644 --- a/Examples/WASMClient/main.swift +++ b/Examples/WASMClient/main.swift @@ -12,6 +12,8 @@ //===----------------------------------------------------------------------===// import AsyncStreaming +import BasicContainers +import ContainersPreview import FetchHTTPClient import Foundation import HTTPAPIs @@ -91,20 +93,12 @@ do { var reader = reader status.set("⏳ Read \(bytes.count) bytes") - while true { - let shouldContinue = try await reader.read(maximumCount: nil) { span in - if span.isEmpty { - return false - } - for i in span.indices { - bytes.append(span[i]) - } - status.set("⏳ Read \(bytes.count) bytes") - return true - } - if !shouldContinue { - break + try await reader.forEachBuffer { buffer in + var consumer = buffer.consumeAll() + while let b = consumer.next() { + bytes.append(b) } + status.set("⏳ Read \(bytes.count) bytes") } return bytes } diff --git a/Package.swift b/Package.swift index 8b39e6e..5c72dc7 100644 --- a/Package.swift +++ b/Package.swift @@ -29,7 +29,6 @@ let package = Package( .library(name: "HTTPClient", targets: ["HTTPClient"]), .library(name: "URLSessionHTTPClient", targets: ["URLSessionHTTPClient"]), .library(name: "AHCHTTPClient", targets: ["AHCHTTPClient"]), - .library(name: "AsyncStreaming", targets: ["AsyncStreaming"]), .library(name: "NetworkTypes", targets: ["NetworkTypes"]), .library(name: "Middleware", targets: ["Middleware"]), .library(name: "HTTPClientConformance", targets: ["HTTPClientConformance"]), @@ -40,12 +39,13 @@ let package = Package( ], dependencies: [ .package( - url: "https://github.com/FranzBusch/swift-collections.git", - branch: "fb-async" + url: "https://github.com/apple/swift-collections.git", + from: "1.5.0" ), .package( url: "https://github.com/apple/swift-async-algorithms.git", - from: "1.1.2" + revision: "d0b4a06d0f173a2f3be27d3ea21b3c3aa18db440", + traits: ["UnstableAsyncStreaming"] ), .package(url: "https://github.com/apple/swift-http-types.git", from: "1.5.1"), .package(url: "https://github.com/apple/swift-certificates.git", from: "1.16.0"), @@ -62,7 +62,7 @@ let package = Package( .target( name: "HTTPAPIs", dependencies: [ - "AsyncStreaming", + .product(name: "AsyncStreaming", package: "swift-async-algorithms"), "NetworkTypes", .product(name: "HTTPTypes", package: "swift-http-types"), ], @@ -81,16 +81,6 @@ let package = Package( name: "NetworkTypes", swiftSettings: extraSettings ), - .target( - name: "AsyncStreaming", - dependencies: [ - .product( - name: "BasicContainers", - package: "swift-collections" - ) - ], - swiftSettings: extraSettings - ), .target( name: "Middleware", swiftSettings: extraSettings @@ -99,7 +89,7 @@ let package = Package( name: "AHCHTTPClient", dependencies: [ "HTTPAPIs", - "AsyncStreaming", + .product(name: "AsyncStreaming", package: "swift-async-algorithms"), "NetworkTypes", .product(name: "HTTPTypes", package: "swift-http-types"), .product(name: "HTTPTypesFoundation", package: "swift-http-types"), @@ -113,7 +103,7 @@ let package = Package( name: "URLSessionHTTPClient", dependencies: [ "HTTPAPIs", - "AsyncStreaming", + .product(name: "AsyncStreaming", package: "swift-async-algorithms"), "NetworkTypes", .product(name: "HTTPTypes", package: "swift-http-types"), .product(name: "HTTPTypesFoundation", package: "swift-http-types"), @@ -129,7 +119,7 @@ let package = Package( "HTTPClient", // These dependencies are needed by the `swift-http-server` that // we borrowed. - "AsyncStreaming", + .product(name: "AsyncStreaming", package: "swift-async-algorithms"), .product(name: "DequeModule", package: "swift-collections"), .product(name: "BasicContainers", package: "swift-collections"), .product(name: "X509", package: "swift-certificates"), @@ -161,18 +151,11 @@ let package = Package( ], swiftSettings: extraSettings ), - .testTarget( - name: "AsyncStreamingTests", - dependencies: [ - "AsyncStreaming" - ], - swiftSettings: extraSettings - ), .testTarget( name: "HTTPAPIsTests", dependencies: [ "HTTPAPIs", - "AsyncStreaming", + .product(name: "AsyncStreaming", package: "swift-async-algorithms"), "NetworkTypes", .product(name: "AsyncAlgorithms", package: "swift-async-algorithms"), ], @@ -287,7 +270,8 @@ if enableWASM { name: "FetchHTTPClient", dependencies: [ "HTTPAPIs", - "AsyncStreaming", + .product(name: "AsyncStreaming", package: "swift-async-algorithms"), + .product(name: "BasicContainers", package: "swift-collections"), .product(name: "HTTPTypes", package: "swift-http-types"), .product( name: "JavaScriptKit", @@ -306,6 +290,8 @@ if enableWASM { name: "WASMClient", dependencies: [ "FetchHTTPClient", + .product(name: "BasicContainers", package: "swift-collections"), + .product(name: "ContainersPreview", package: "swift-collections"), .product( name: "JavaScriptKit", package: "JavaScriptKit", diff --git a/Sources/AHCHTTPClient/AHC+HTTPClient.swift b/Sources/AHCHTTPClient/AHC+HTTPClient.swift index 4469f5a..6952480 100644 --- a/Sources/AHCHTTPClient/AHC+HTTPClient.swift +++ b/Sources/AHCHTTPClient/AHC+HTTPClient.swift @@ -12,7 +12,7 @@ //===----------------------------------------------------------------------===// @_spi(ExperimentalHTTPAPIsSupport) public import AsyncHTTPClient -import BasicContainers +public import BasicContainers import Foundation @_exported public import HTTPAPIs import HTTPTypes @@ -32,46 +32,42 @@ extension AsyncHTTPClient.HTTPClient: HTTPAPIs.HTTPClient { public struct RequestBodyWriter: AsyncWriter, ~Copyable { public typealias WriteElement = UInt8 public typealias WriteFailure = any Error + public typealias Buffer = UniqueArray let requestWriter: HTTPClientRequest.Body.RequestWriter var byteBuffer: ByteBuffer - var rigidArray: RigidArray + var buffer: UniqueArray? init(_ requestWriter: HTTPClientRequest.Body.RequestWriter) { self.requestWriter = requestWriter self.byteBuffer = ByteBuffer() self.byteBuffer.reserveCapacity(2 ^ 16) - self.rigidArray = RigidArray(capacity: 2 ^ 16) // ~ 65k bytes + self.buffer = UniqueArray(minimumCapacity: 2 ^ 16) } - public mutating func write( - _ body: (inout OutputSpan) async throws(Failure) -> Result - ) async throws(AsyncStreaming.EitherError) -> Result where Failure: Error { - let result: Result + public mutating func write( + _ body: (inout UniqueArray) async throws(Failure) -> Return + ) async throws(AsyncStreaming.EitherError) -> Return where Failure: Error { + let result: Return + // This force-unwrap is safe since there can only be one concurrent write + var buffer = self.buffer.take()! do { - // TODO: rigidArray needs a clear all - self.rigidArray.removeAll() - self.rigidArray.reserveCapacity(1024) - result = try await self.rigidArray.append(count: 1024) { (span) async throws(Failure) -> Result in - try await body(&span) - } - - if self.rigidArray.isEmpty { - return result - } + result = try await body(&buffer) } catch { + buffer.removeAll() + self.buffer = consume buffer throw .second(error) } + if buffer.count == 0 { + self.buffer = consume buffer + return result + } do { self.byteBuffer.clear() - - // we need to use an uninitilized helper rigidarray here to make the compiler happy - // with regards overlapping memory access. - var localArray = RigidArray(capacity: 0) - swap(&localArray, &self.rigidArray) - unsafe self.byteBuffer.writeBytes(localArray.span.bytes) - swap(&localArray, &self.rigidArray) + self.byteBuffer.writeBytes(buffer.span.bytes) + buffer.removeAll() + self.buffer = consume buffer try await self.requestWriter.writeRequestBodyPart(self.byteBuffer) } catch { throw .first(error) @@ -113,74 +109,34 @@ extension AsyncHTTPClient.HTTPClient: HTTPAPIs.HTTPClient { public struct ResponseBodyReader: AsyncReader, ~Copyable { public typealias ReadElement = UInt8 public typealias ReadFailure = any Error + public typealias Buffer = UniqueArray var underlying: HTTPClientResponse.Body.AsyncIterator - var out = RigidArray() - var readerIndex = 0 + var buffer = UniqueArray() - public mutating func read( - maximumCount: Int?, - body: (consuming Span) async throws(Failure) -> Return + public mutating func read( + body: (inout UniqueArray) async throws(Failure) -> Return ) async throws(AsyncStreaming.EitherError) -> Return where Failure: Error { + let byteBuffer: ByteBuffer? do { - // if have enough data for the read request available, hand it to the user right away - if let maximumCount, maximumCount <= self.out.count - self.readerIndex { - defer { - self.readerIndex += maximumCount - self.reallocateIfNeeded() - } - return try await body(self.out.span.extracting(self.readerIndex..<(self.readerIndex + maximumCount))) - } - - // we have data remaining in the local buffer. hand that to the user next. - if self.readerIndex < self.out.count { - defer { - self.readerIndex = self.out.count - self.reallocateIfNeeded() - } - return try await body(self.out.span.extracting(self.readerIndex.. { _ in } - return try await body(array.span) - } - - let readLength = maximumCount != nil ? min(maximumCount!, buffer.readableBytes) : buffer.readableBytes - self.out.reserveCapacity(self.out.count + buffer.readableBytes) - let alreadyRead = self.out.count - unsafe buffer.withUnsafeReadableBytes { rawBufferPtr in - let usbptr = unsafe rawBufferPtr.assumingMemoryBound(to: UInt8.self) - unsafe self.out.append(copying: usbptr) - } - defer { - self.readerIndex = alreadyRead + readLength - self.reallocateIfNeeded() - } - return try await body(self.out.span.extracting(alreadyRead..<(alreadyRead + readLength))) - } catch let error as Failure { - throw .second(error) + byteBuffer = try await self.underlying.next(isolation: #isolation) } catch { throw .first(error) } - } - private mutating func reallocateIfNeeded() { - guard self.readerIndex > 2 ^ 16 else { - return + if let byteBuffer, byteBuffer.readableBytes > 0 { + buffer.reserveCapacity(byteBuffer.readableBytes) + unsafe byteBuffer.withUnsafeReadableBytes { rawBufferPtr in + let usbptr = unsafe rawBufferPtr.assumingMemoryBound(to: UInt8.self) + unsafe self.buffer.append(copying: usbptr) + } } - let newCapacity = max(self.out.count - self.readerIndex, 2 ^ 16) - - self.out = RigidArray(capacity: newCapacity) { - // this is probably super slow. - for i in self.readerIndex..: Error { - /// An error of the first type. - /// - /// The associated value contains the specific error instance of type `First`. - case first(First) - - /// An error of the second type. - /// - /// The associated value contains the specific error instance of type `Second`. - case second(Second) - - /// Throws the underlying error by unwrapping this either error. - /// - /// This method extracts and throws the actual error contained within the either error, - /// whether it's the first or second type. This is useful when you need to propagate - /// the original error without the either error wrapper. - /// - /// - Throws: The underlying error, either of type `First` or `Second`. - /// - /// ## Example - /// - /// ```swift - /// do { - /// // Some operation that returns EitherError - /// let result = try await operation() - /// } catch let eitherError as EitherError { - /// try eitherError.unwrap() // Throws the original error - /// } - /// ``` - public func unwrap() throws -> Never { - switch self { - case .first(let first): - throw first - case .second(let second): - throw second - } - } -} - -extension EitherError: Equatable where First: Equatable, Second: Equatable {} -extension EitherError: Hashable where First: Hashable, Second: Hashable {} diff --git a/Sources/AsyncStreaming/Internal/InlineArray+convenience.swift b/Sources/AsyncStreaming/Internal/InlineArray+convenience.swift deleted file mode 100644 index 8130d80..0000000 --- a/Sources/AsyncStreaming/Internal/InlineArray+convenience.swift +++ /dev/null @@ -1,23 +0,0 @@ -//===----------------------------------------------------------------------===// -// -// This source file is part of the Swift HTTP API Proposal open source project -// -// Copyright (c) 2025 Apple Inc. and the Swift HTTP API Proposal project authors -// Licensed under Apache License v2.0 -// -// See LICENSE.txt for license information -// -// SPDX-License-Identifier: Apache-2.0 -// -//===----------------------------------------------------------------------===// - -@available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) -extension InlineArray where Element: ~Copyable { - package static func one(value: consuming Element) -> InlineArray<1, Element> { - return InlineArray<1, Element>(first: value) { _ in fatalError() } - } - - package static func zero(of elementType: Element.Type = Element.self) -> InlineArray<0, Element> { - return InlineArray<0, Element> { _ in } - } -} diff --git a/Sources/AsyncStreaming/README.md b/Sources/AsyncStreaming/README.md deleted file mode 100644 index d00f088..0000000 --- a/Sources/AsyncStreaming/README.md +++ /dev/null @@ -1,388 +0,0 @@ -# AsyncReader and AsyncWriter protocols - -## Introduction - -This target introduces new `AsyncReader` and `AsyncWriter` protocols that -provide a pull/push-based interface for asynchronous streaming such as file I/O, -networking and more. It builds on the learnings of `AsyncSequence` with support -for `~Copyable` and `~Escapable` types, typed throws, lifetimes and more. - -## Motivation - -While `AsyncSequence` has seen widespread adoption for consuming asynchronous -streams, several limitations have emerged over the past years: - -### No support for `~Copyable` and `~Escapable` types - -`AsyncSequence` was introduced before `~Copyable` and `~Escapable` types were -introduced, hence, the current `AsyncSequence` protocol's does not support types -with those constraints. Furthermore, it doesn't allow elements with those -constraints either. - -### Iterator pattern isn't fitting - -`AsyncSequence` followed the design principles of its synchronous counter part -`Sequence`. While iterators are a good abstraction for those it became obvious -that for asynchronous sequences they aren't a good fit. This is due to two -reasons. First, most asynchronous sequences do not support multiple iterators. -Secondly, most asynchronous sequences are not replayable. - -### Final elements - -Some asynchronous sequences can finish with a special last element. A common -example are HTTP trailers that are an optional part at the end of an HTTP -request or response. The current `AsyncSequence` protocol only allows to express -this by making the `Element` an `Either` like type. - -### Bulk iteration - -The current `AsyncIterator.next()` method only allows iteration element by -element. This limits performance by requiring multiple calls to retrieve -elements from the iterator even if those elements are already available. - -### Bi-directional streaming and Structured Concurrency - -`AsyncSequence`s are used to express a series of asynchronous elements such as -the requests or response body parts of an HTTP request. Various APIs around the -ecosystem have adopted `AsyncSequence`s such as `NIOFileSystem`, -`AsyncHTTPClient` or `grpc-swift`. During the design and implementation of APIs -that support bi-directional streaming such as HTTP or gRPC it became apparent -that pull-based `AsyncSequence`s model is only working for one side of the -bi-directional streaming. Trying to express both side as an `AsyncSequence` -forced the introduction of unstructured tasks breaking Structured Concurrency -guarantees. - -```swift -func bidirectionalStreaming(input: some AsyncSequence) async throws -> some AsyncSequence { - // The output async sequence can start producing values before the input has been fully streamed - // this forces us to create an unstructured task to continue iterating the input after the return of this method - Task { - for await byte in input { - // Send byte - } - } - return ConcreteAsyncSequence() -} -``` - -This is due to that fact that `AsyncSequence` is a pull-based model, if the -input and output in a bi-directional streaming setup are related then using a -pull-based model into both directions can work; however, when the two are -unrelated then a push-based model for the output is a better fit. Hence, we see -a proliferation of asynchronous writer protocols and types throughout the -ecosystem such as: -- [NIOAsyncWriter](https://github.com/apple/swift-nio/blob/main/Sources/NIOCore/AsyncSequences/NIOAsyncWriter.swift) -- [WritableFileHandleProtocol](https://github.com/apple/swift-nio/blob/767ea9ee09c4227d32f230c7e24bb9f5a6a5cfd9/Sources/NIOFS/FileHandleProtocol.swift#L448) -- [RPCWriterProtocol](https://github.com/grpc/grpc-swift-2/blob/5c04d83ba35f4343dcf691a000bcb89f68755587/Sources/GRPCCore/Streaming/RPCWriterProtocol.swift#L19) - -### Some algorithms break Structured Concurrency - -During the implementation of various algorithms inside `swift-async-algorithms`, -we learned that whenever the production of values needs to outlive a single call -to the iterator's `next()` method it forced us to use unstructured tasks. -Examples of this are: -- [merge](https://github.com/apple/swift-async-algorithms/blob/26111a6fb73ce448a41579bbdb12bdebd66672f1/Sources/AsyncAlgorithms/Merge/AsyncMerge2Sequence.swift#L16) - where a single call to `next` races multiple base asynchronous sequences. We - return the first value produced by any of the bases but the calls to the other - bases still need to continue. -- [zip](https://github.com/apple/swift-async-algorithms/blob/26111a6fb73ce448a41579bbdb12bdebd66672f1/Sources/AsyncAlgorithms/Zip/AsyncZip2Sequence.swift#L15) - same problem as `merge`. -- [buffer](https://github.com/apple/swift-async-algorithms/blob/26111a6fb73ce448a41579bbdb12bdebd66672f1/Sources/AsyncAlgorithms/Buffer/AsyncBufferSequence.swift#L25) - where the base needs to produce elements until the buffer is full - -While the implementations try their best to make the usage of unstructured tasks -as _structured_ as possible, there are multiple problems with their usage: -1. Cancellation needs to be propagated manually -2. Priority escalation needs to be propagated manually -3. Task executor preference needs to be propagated manually -4. Task locals are only copied on the first call to `next` - -## Proposed solution - -### `AsyncReader` - -`AsyncReader` is a replacement to `AsyncSequence` that addresses the above -limitations. It allows `~Copyable` elements and offers bulk -iteration by providing a `Span`. - -```swift -try await fileReader.read { span in - print(span.count) -} -``` - -### `ConcludingAsyncReader` - -The `ConcludingAsyncReader` is a new type that provides scoped access to an -`AsyncReader`. Once, the user is done with the `AsyncReader` the concluding -final element is returned. - -```swift -let trailers = try await httpRequestConcludingReader.consumeAndConclude { bodyReader in - // Use the bodyReader to read the HTTP request body - try await bodyReader.read { chunk in - print(chunk) - } -} - -// The trailers are returned once we are done with the body reader -print(trailers) -``` - -### `AsyncWriter` - -`AsyncWriter` is the push-based counter part to `AsyncReader` that models an -asynchronous writable type. Similar to `AsyncReader` it allows `~Copyable` elements - and offers bulk writing by offering an `OutputSpan` to write into. - -```swift -var values = [1, 2, 3, 4] -try await fileWriter.write { outputSpan in - for value in values { - outputSpan.append(value) - } -} -``` - -### `ConcludingAsyncWriter` - -`ConcludingAsyncWriter` is the counter part to the `ConcludingAsyncReader`. It -provides access to a scoped writer. Once the user is done with the writer they -can return a final element. - -```swift -try await httpRequestConcludingWriter.consumeAndConclude { bodyWriter in - // Use the bodyWriter to write the HTTP request body - try await bodyWriter.write(values.span.bytes) - - // Return the trailers as the final element - return HTTPFields(...) -} -``` - -## Detailed design - -### `AsyncReader` - -```swift -/// A protocol that represents an asynchronous reader capable of reading elements from some source. -/// -/// ``AsyncReader`` defines an interface for types that can asynchronously read elements -/// of a specified type from a source. -public protocol AsyncReader: ~Copyable, ~Escapable { - /// The type of elements that can be read by this reader. - associatedtype ReadElement: ~Copyable - - /// The type of error that can be thrown during reading operations. - associatedtype ReadFailure: Error - - /// Reads elements from the underlying source and processes them with the provided body closure. - /// - /// This method asynchronously reads a span of elements from whatever source the reader - /// represents, then passes them to the provided body closure. The operation may complete immediately - /// or may await resources or processing time. - /// - /// - Parameter maximumCount: The maximum count of items the caller is ready - /// to process, or nil if the caller is prepared to accept an arbitrarily - /// large span. If non-nil, the maximum must be greater than zero. - /// - /// - Parameter body: A closure that consumes a span of read elements and performs some operation - /// on them, returning a value of type `Return`. When the span is empty, it indicates - /// the end of the reading operation or stream. - /// - /// - Returns: The value returned by the body closure after processing the read elements. - /// - /// - Throws: An `EitherError` containing either a `ReadFailure` from the read operation - /// or a `Failure` from the body closure. - /// - /// ```swift - /// var fileReader: FileAsyncReader = ... - /// - /// // Read data from a file asynchronously and process it - /// let result = try await fileReader.read { data in - /// guard data.count > 0 else { - /// // Handle end of stream/terminal value - /// return finalProcessedValue - /// } - /// // Process the data - /// return data - /// } - /// ``` - mutating func read( - maximumCount: Int?, - body: (consuming Span) async throws(Failure) -> Return - ) async throws(EitherError) -> Return - -} -``` - -### `ConcludingAsyncReader` - -```swift -/// A protocol that represents an asynchronous reader that produces elements and concludes with a final value. -/// -/// ``ConcludingAsyncReader`` adds functionality to asynchronous readers that need to -/// provide a conclusive element after all reads are completed. This is particularly useful -/// for streams that have meaningful completion states beyond just terminating, such as -/// HTTP responses that include headers after the body is fully read. -public protocol ConcludingAsyncReader: ~Copyable, ~Escapable { - /// The underlying asynchronous reader type that produces elements. - associatedtype Underlying: AsyncReader, ~Copyable, ~Escapable - - /// The type of the final element produced after all reads are completed. - associatedtype FinalElement - - /// Processes the underlying async reader until completion and returns both the result of processing - /// and a final element. - /// - /// - Parameter body: A closure that takes the underlying `AsyncReader` and returns a value. - /// - Returns: A tuple containing the value returned by the body closure and the final element. - /// - Throws: Any error thrown by the body closure or encountered while processing the reader. - /// - /// - Note: This method consumes the concluding async reader, meaning it can only be called once on a value type. - /// - /// ```swift - /// let responseReader: HTTPResponseReader = ... - /// - /// // Process the body while capturing the final response status - /// let (bodyData, statusCode) = try await responseReader.consumeAndConclude { reader in - /// var collectedData = Data() - /// while let chunk = try await reader.read(body: { $0 }) { - /// collectedData.append(chunk) - /// } - /// return collectedData - /// } - /// ``` - consuming func consumeAndConclude( - body: (consuming sending Underlying) async throws(Failure) -> Return - ) async throws(Failure) -> (Return, FinalElement) -} - -``` - -### `AsyncWriter` - -```swift -/// A protocol that represents an asynchronous writer capable of providing a buffer to write into. -/// -/// ``AsyncWriter`` defines an interface for types that can asynchronously write elements -/// to a destination by providing an output span buffer for efficient batch writing operations. -public protocol AsyncWriter: ~Copyable, ~Escapable { - /// The type of elements that can be written by this writer. - associatedtype WriteElement: ~Copyable - - /// The type of error that can be thrown during writing operations. - associatedtype WriteFailure: Error - - /// Provides a buffer to write elements into. - /// - /// This method supplies an output span that the body closure can use to append elements - /// for writing. The writer manages the buffer allocation and handles the actual writing - /// operation once the body closure completes. - /// - /// - Parameter body: A closure that receives an `OutputSpan` for appending elements - /// to write. The closure can return a result of type `Result`. - /// - /// - Returns: The value returned by the body closure. - /// - /// - Throws: An `EitherError` containing either a `WriteFailure` from the write operation - /// or a `Failure` from the body closure. - /// - /// ## Example - /// - /// ```swift - /// var writer: SomeAsyncWriter = ... - /// - /// try await writer.write { outputSpan in - /// for item in items { - /// outputSpan.append(item) - /// } - /// return outputSpan.count - /// } - /// ``` - mutating func write( - _ body: (inout OutputSpan) async throws(Failure) -> Result - ) async throws(EitherError) -> Result - - /// Writes a span of elements to the underlying destination. - /// - /// This method asynchronously writes all elements from the provided span to whatever destination - /// the writer represents. The operation may require multiple write calls to complete if the - /// writer cannot accept all elements at once. - /// - /// - Parameter span: The span of elements to write. - /// - /// - Throws: An `EitherError` containing either a `WriteFailure` from the write operation - /// or an `AsyncWriterWroteShortError` if the writer cannot accept any more data before - /// all elements are written. - /// - /// ## Example - /// - /// ```swift - /// var fileWriter: FileAsyncWriter = ... - /// let dataBuffer: [UInt8] = [1, 2, 3, 4, 5] - /// - /// // Write the entire span to a file asynchronously - /// try await fileWriter.write(dataBuffer.span) - /// ``` - mutating func write( - _ span: Span - ) async throws(EitherError) -} -``` - -### `ConcludingAsyncWriter` - -```swift -/// A protocol that represents an asynchronous writer that produces a final value upon completion. -/// -/// ``ConcludingAsyncWriter`` adds functionality to asynchronous writers that need to -/// provide a conclusive element after writing is complete. This is particularly useful -/// for streams that have meaningful completion states, such as HTTP response that need -/// to finalize with optional trailers. -public protocol ConcludingAsyncWriter: ~Copyable, ~Escapable { - /// The underlying asynchronous writer type. - associatedtype Underlying: AsyncWriter, ~Copyable, ~Escapable - - /// The type of the final element produced after writing is complete. - associatedtype FinalElement - - /// Allows writing to the underlying async writer and produces a final element upon completion. - /// - /// - Parameter body: A closure that takes the underlying writer and returns both a value and a final element. - /// - Returns: The value returned by the body closure. - /// - Throws: Any error thrown by the body closure or encountered while writing. - /// - /// - Note: This method consumes the concluding async writer, meaning it can only be called once on a value type. - /// - /// ```swift - /// let responseWriter: HTTPResponseWriter = ... - /// - /// // Write the response body and produce a final status - /// let result = try await responseWriter.produceAndConclude { writer in - /// try await writer.write(data) - /// return (true, trailers) - /// } - /// ``` - consuming func produceAndConclude( - body: (consuming sending Underlying) async throws -> (Return, FinalElement) - ) async throws -> Return -} -``` - -## Alternatives considered - -### Naming - -We considered various other names for these types such as: - -- `AsyncReader` and `AsyncWriter` alternatives: - - `AsyncReadable` and `AsyncWritable` -- `ConcludingAsyncReader` and `ConcludingAsyncWriter` alternatives: - - `FinalElementAsyncReader` and `FinalElementAsyncWriter` - -### Async generators - -Asynchronous generators might provide an alternative to the current -`AsyncSequence` and the `AsyncReader` here. However, they would require -significant compiler features and potentially only replace the _read_ side. diff --git a/Sources/AsyncStreaming/Reader/Array+AsyncReader.swift b/Sources/AsyncStreaming/Reader/Array+AsyncReader.swift deleted file mode 100644 index ad7aae5..0000000 --- a/Sources/AsyncStreaming/Reader/Array+AsyncReader.swift +++ /dev/null @@ -1,85 +0,0 @@ -//===----------------------------------------------------------------------===// -// -// This source file is part of the Swift HTTP API Proposal open source project -// -// Copyright (c) 2025 Apple Inc. and the Swift HTTP API Proposal project authors -// Licensed under Apache License v2.0 -// -// See LICENSE.txt for license information -// -// SPDX-License-Identifier: Apache-2.0 -// -//===----------------------------------------------------------------------===// - -@available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) -extension Array { - /// Creates an async reader that provides access to the array's elements. - /// - /// This method converts an array into an ``AsyncReader`` implementation, allowing - /// the array's elements to be read through the async reader interface. - /// - /// - Returns: An ``AsyncReader`` that produces all elements of the array. - /// - /// ## Example - /// - /// ```swift - /// let numbers = [1, 2, 3, 4, 5] - /// var reader = numbers.asyncReader() - /// - /// try await reader.forEach { span in - /// print("Read \(span.count) numbers") - /// } - /// ``` - public func asyncReader() -> some AsyncReader & SendableMetatype { - return ArrayAsyncReader(array: self) - } -} - -/// An async reader implementation that provides array elements through the AsyncReader interface. -/// -/// This internal reader type wraps an array and delivers its elements through the ``AsyncReader`` -/// protocol. It maintains a current read position and can deliver elements in chunks based on -/// the requested maximum count. -@available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) -struct ArrayAsyncReader: AsyncReader { - typealias ReadElement = Element - typealias ReadFailure = Never - - let array: [Element] - var index: Array.Index - - init(array: [Element]) { - self.array = array - self.index = array.startIndex - } - - mutating func read( - maximumCount: Int?, - body: (consuming Span) async throws(Failure) -> Return - ) async throws(EitherError) -> Return { - do { - guard self.index < self.array.endIndex else { - return try await body([Element]().span) - } - - guard let maximumCount else { - defer { - self.index = self.array.span.indices.endIndex - } - return try await body(self.array.span.extracting(self.index...)) - } - let endIndex = min( - self.array.span.indices.endIndex, - self.index.advanced( - by: maximumCount - ) - ) - defer { - self.index = endIndex - } - return try await body(self.array.span.extracting(self.index..( - upTo limit: Int, - body: (Span) async throws(Failure) -> Result - ) async throws(EitherError) -> Result { - var buffer = [ReadElement]() - var shouldContinue = true - do { - while shouldContinue { - try await self.read(maximumCount: limit - buffer.count) { span in - guard span.count > 0 else { - shouldContinue = false - return - } - precondition(span.count <= limit - buffer.count) - for index in span.indices { - buffer.append(span[index]) - } - } - } - } catch { - switch error { - case .first(let error): - throw .first(error) - case .second: - fatalError() - } - } - do { - return try await body(buffer.span) - } catch { - throw .second(error) - } - } - - /// Collects elements from the reader up to a specified limit and processes them with a body function. - /// - /// This method continuously reads elements from the async reader, accumulating them in a buffer - /// until either it reaches the end of the stream (indicated by an empty `Span`) or reaches - /// the specified limit. Once collection completes, it passes the accumulated elements to the - /// provided body function as a `Span` for processing. - /// - /// - Parameters: - /// - limit: The maximum number of elements to collect. This prevents unbounded memory - /// growth when reading from potentially infinite streams. - /// - body: A closure that receives a `Span` containing all collected elements and returns - /// a result of type `Result`. The method calls this closure once after collecting all - /// elements successfully. - /// - /// - Returns: The value returned by the body closure after processing the collected elements. - /// - /// ## Example - /// - /// ```swift - /// var reader: SomeAsyncReader = ... - /// - /// let processedData = try await reader.collect(upTo: 1000) { span in - /// // Process all collected elements - /// } - /// ``` - /// - /// ## Memory Considerations - /// - /// Since this method buffers all elements in memory before processing, it should be used - /// with caution on large datasets. The `limit` parameter serves as a safety mechanism - /// to prevent excessive memory usage. - #if compiler(<6.3) - @_lifetime(&self) - #endif - public mutating func collect( - upTo limit: Int, - body: (Span) async -> Result - ) async -> Result where ReadFailure == Never { - var buffer = [ReadElement]() - var shouldContinue = true - while limit - buffer.count > 0 && shouldContinue { - // This force-try is safe since neither read nor the closure are throwing - try! await self.read(maximumCount: limit - buffer.count) { span in - precondition(span.count <= limit - buffer.count) - guard span.count > 0 else { - // This means the underlying reader is finished and we can return - shouldContinue = false - return - } - for index in span.indices { - buffer.append(span[index]) - } - } - } - return await body(buffer.span) - } - - /// Collects elements from the reader into an output span until the span is full. - /// - /// This method continuously reads elements from the async reader and appends them to the - /// provided output span until the span reaches its capacity. This provides an efficient - /// way to fill a pre-allocated buffer with elements from the reader. - /// - /// - Parameter outputSpan: An `OutputSpan` to append read elements into. The method continues - /// reading until this span is full. - /// - /// - Throws: An error of type `ReadFailure` if any read operation fails. - /// - /// ## Example - /// - /// ```swift - /// var reader: SomeAsyncReader = ... - /// var buffer = [Int](repeating: 0, count: 100) - /// - /// try await buffer.withOutputSpan { outputSpan in - /// try await reader.collect(into: &outputSpan) - /// } - /// ``` - #if compiler(<6.3) - @_lifetime(&self) - #endif - public mutating func collect( - into outputSpan: inout OutputSpan - ) async throws(ReadFailure) { - while !outputSpan.isFull { - do { - try await self.read(maximumCount: outputSpan.freeCapacity) { span in - for index in span.indices { - outputSpan.append(span[index]) - } - } - } catch { - switch error { - case .first(let error): - throw error - case .second: - fatalError() - } - } - } - } -} diff --git a/Sources/AsyncStreaming/Reader/AsyncReader+forEach.swift b/Sources/AsyncStreaming/Reader/AsyncReader+forEach.swift deleted file mode 100644 index 4f8cc8f..0000000 --- a/Sources/AsyncStreaming/Reader/AsyncReader+forEach.swift +++ /dev/null @@ -1,99 +0,0 @@ -//===----------------------------------------------------------------------===// -// -// This source file is part of the Swift HTTP API Proposal open source project -// -// Copyright (c) 2025 Apple Inc. and the Swift HTTP API Proposal project authors -// Licensed under Apache License v2.0 -// -// See LICENSE.txt for license information -// -// SPDX-License-Identifier: Apache-2.0 -// -//===----------------------------------------------------------------------===// - -// swift-format-ignore: AmbiguousTrailingClosureOverload -@available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) -extension AsyncReader where Self: ~Copyable, Self: ~Escapable { - /// Iterates over all elements from the reader, executing the provided body for each span. - /// - /// This method continuously reads elements from the async reader until the stream ends, - /// executing the provided closure for each span of elements read. The iteration terminates - /// when the reader produces an empty span, indicating the end of the stream. - /// - /// - Parameter body: An asynchronous closure that processes each span of elements read - /// from the stream. The closure receives a `Span` for each read operation. - /// - /// - Throws: An `EitherError` containing either a `ReadFailure` from the read operation - /// or a `Failure` from the body closure. - /// - /// ## Example - /// - /// ```swift - /// var fileReader: FileAsyncReader = ... - /// - /// // Process each chunk of data from the file - /// try await fileReader.forEach { chunk in - /// print("Processing \(chunk.count) elements") - /// // Process the chunk - /// } - /// ``` - public consuming func forEach( - body: (consuming Span) async throws(Failure) -> Void - ) async throws(EitherError) { - var shouldContinue = true - while shouldContinue { - try await self.read(maximumCount: nil) { (next) throws(Failure) -> Void in - guard next.count > 0 else { - shouldContinue = false - return - } - - try await body(next) - } - } - } - - /// Iterates over all elements from the reader, executing the provided body for each span. - /// - /// This method continuously reads elements from the async reader until the stream ends, - /// executing the provided closure for each span of elements read. The iteration terminates - /// when the reader produces an empty span, indicating the end of the stream. - /// - /// - Parameter body: An asynchronous closure that processes each span of elements read - /// from the stream. The closure receives a `Span` for each read operation. - /// - /// - Throws: An error of type `Failure` from the body closure. Since this reader never fails, - /// only the body closure can throw errors. - /// - /// ## Example - /// - /// ```swift - /// var fileReader: FileAsyncReader = ... - /// - /// // Process each chunk of data from the file - /// try await fileReader.forEach { chunk in - /// print("Processing \(chunk.count) elements") - /// // Process the chunk - /// } - /// ``` - @inlinable - public consuming func forEach( - body: (consuming Span) async -> Void - ) async where ReadFailure == Never { - var shouldContinue = true - while shouldContinue { - do { - try await self.read(maximumCount: nil) { (next) -> Void in - guard next.count > 0 else { - shouldContinue = false - return - } - - await body(next) - } - } catch { - fatalError() - } - } - } -} diff --git a/Sources/AsyncStreaming/Reader/AsyncReader+map.swift b/Sources/AsyncStreaming/Reader/AsyncReader+map.swift deleted file mode 100644 index 81ce059..0000000 --- a/Sources/AsyncStreaming/Reader/AsyncReader+map.swift +++ /dev/null @@ -1,98 +0,0 @@ -//===----------------------------------------------------------------------===// -// -// This source file is part of the Swift HTTP API Proposal open source project -// -// Copyright (c) 2025 Apple Inc. and the Swift HTTP API Proposal project authors -// Licensed under Apache License v2.0 -// -// See LICENSE.txt for license information -// -// SPDX-License-Identifier: Apache-2.0 -// -//===----------------------------------------------------------------------===// - -import BasicContainers - -@available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) -extension AsyncReader where Self: ~Copyable, Self: ~Escapable { - /// Transforms elements read from this reader using the provided transformation function. - /// - /// This method creates a new async reader that applies the specified transformation to each - /// element read from the underlying reader. The transformation is applied lazily as elements - /// are read, maintaining the streaming nature of the operation. - /// - /// - Parameter transformation: An asynchronous closure that transforms each read element - /// of type `ReadElement` into a new element of type `MappedElement`. - /// - /// - Returns: A new ``AsyncReader`` that produces transformed elements of type `MappedElement`. - /// - /// ## Example - /// - /// ```swift - /// var dataReader: SomeAsyncReader = ... - /// - /// // Transform the spans into their element count - /// var countReader = dataReader.map { span in - /// span.count - /// } - /// - /// try await countReader.forEach { span in - /// print("Received chunk with \(span[0]) values") - /// } - /// ``` - @_lifetime(copy self) - public consuming func map( - _ transformation: @escaping (borrowing ReadElement) async -> MappedElement - ) -> some (AsyncReader & ~Copyable & ~Escapable) { - return AsyncMapReader(base: self, transformation: transformation) - } -} - -/// An async reader that transforms elements from a base reader using a mapping function. -/// -/// This internal reader type wraps another async reader and applies a transformation -/// to each element read from the base reader. The transformation is applied lazily -/// as elements are read, maintaining the streaming nature of the operation. -@available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) -struct AsyncMapReader: AsyncReader, ~Copyable, ~Escapable { - typealias ReadElement = MappedElement - typealias ReadFailure = Base.ReadFailure - - var base: Base - var transformation: (borrowing Base.ReadElement) async -> MappedElement - - @_lifetime(copy base) - init( - base: consuming Base, - transformation: @escaping (borrowing Base.ReadElement) async -> MappedElement - ) { - self.base = base - self.transformation = transformation - } - - #if compiler(<6.3) - @_lifetime(&self) - #endif - mutating func read( - maximumCount: Int?, - body: (consuming Span) async throws(Failure) -> Return - ) async throws(EitherError) -> Return { - var buffer = RigidArray() - return try await self.base - .read(maximumCount: maximumCount) { (span) throws(Failure) -> Return in - guard span.count > 0 else { - let emptySpan = InlineArray<0, MappedElement>.zero() - return try await body(emptySpan.span) - } - - buffer.reserveCapacity(span.count) - - for index in span.indices { - let transformed = await self.transformation(span[index]) - buffer.append(transformed) - } - - return try await body(buffer.span) - } - } -} diff --git a/Sources/AsyncStreaming/Reader/AsyncReader.swift b/Sources/AsyncStreaming/Reader/AsyncReader.swift deleted file mode 100644 index 6cf333b..0000000 --- a/Sources/AsyncStreaming/Reader/AsyncReader.swift +++ /dev/null @@ -1,158 +0,0 @@ -//===----------------------------------------------------------------------===// -// -// This source file is part of the Swift HTTP API Proposal open source project -// -// Copyright (c) 2025 Apple Inc. and the Swift HTTP API Proposal project authors -// Licensed under Apache License v2.0 -// -// See LICENSE.txt for license information -// -// SPDX-License-Identifier: Apache-2.0 -// -//===----------------------------------------------------------------------===// - -/// A protocol that represents an asynchronous reader capable of reading elements from some source. -/// -/// ``AsyncReader`` defines an interface for types that can asynchronously read elements -/// of a specified type from a source. -@available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) -public protocol AsyncReader: ~Copyable, ~Escapable { - /// The type of elements that can be read by this reader. - associatedtype ReadElement: ~Copyable - - /// The type of error that can be thrown during reading operations. - associatedtype ReadFailure: Error - - /// Reads elements from the underlying source and processes them with the provided body closure. - /// - /// This method asynchronously reads a span of elements from whatever source the reader - /// represents, then passes them to the provided body closure. The operation may complete immediately - /// or may await resources or processing time. - /// - /// - Parameter maximumCount: The maximum count of items the caller is ready - /// to process, or nil if the caller is prepared to accept an arbitrarily - /// large span. If non-nil, the maximum must be greater than zero. - /// - /// - Parameter body: A closure that consumes a span of read elements and performs some operation - /// on them, returning a value of type `Return`. When the span is empty, it indicates - /// the end of the reading operation or stream. - /// - /// - Returns: The value returned by the body closure after processing the read elements. - /// - /// - Throws: An `EitherError` containing either a `ReadFailure` from the read operation - /// or a `Failure` from the body closure. - /// - /// ```swift - /// var fileReader: FileAsyncReader = ... - /// - /// // Read data from a file asynchronously and process it - /// let result = try await fileReader.read { data in - /// guard data.count > 0 else { - /// // Handle end of stream/terminal value - /// return finalProcessedValue - /// } - /// // Process the data - /// return data - /// } - /// ``` - #if compiler(<6.3) - @_lifetime(&self) - #endif - mutating func read( - maximumCount: Int?, - body: (consuming Span) async throws(Failure) -> Return - ) async throws(EitherError) -> Return - -} - -@available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) -extension AsyncReader where Self: ~Copyable, Self: ~Escapable { - /// Reads elements from the underlying source and processes them with the provided body closure. - /// - /// This is a convenience method for async readers that never fail, simplifying the error handling - /// by directly throwing the body closure's error type instead of wrapping it in `EitherError`. - /// - /// - Parameter maximumCount: The maximum count of items the caller is ready to process, - /// or nil if the caller is prepared to accept an arbitrarily large span. - /// - /// - Parameter body: A closure that consumes a span of read elements and performs some operation - /// on them, returning a value of type `Return`. - /// - /// - Returns: The value returned by the body closure after processing the read elements. - /// - /// - Throws: An error of type `Failure` if the body closure throws. - /// - /// ## Example - /// - /// ```swift - /// var reader: some AsyncReader = ... // Never-failing reader - /// - /// let result = try await reader.read(maximumCount: 100) { span in - /// // Process the span - /// return span.count - /// } - /// ``` - #if compiler(<6.3) - @_lifetime(&self) - #endif - public mutating func read( - maximumCount: Int?, - body: (consuming Span) async throws(Failure) -> Return - ) async throws(Failure) -> Return where Self.ReadFailure == Never { - do { - return try await self.read(maximumCount: maximumCount) { (span) throws(Failure) -> Return in - return try await body(span) - } - } catch { - switch error { - case .first: - fatalError() - case .second(let error): - throw error - } - } - } -} - -@available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) -extension AsyncReader where ReadElement: Copyable { - /// Reads elements from this reader into the provided output span. - /// - /// This method reads a span of elements from the underlying reader and appends them - /// to the provided output span. This is a convenience method for readers with copyable - /// elements that need to populate an existing output buffer. The method reads up to - /// the free capacity available in the output span. - /// - /// - Parameter outputSpan: An `OutputSpan` to append read elements into. - /// - /// - Throws: An error of type `ReadFailure` if the read operation cannot be completed successfully. - /// - /// ## Example - /// - /// ```swift - /// var reader: some AsyncReader = ... - /// var buffer = [Int](repeating: 0, count: 100) - /// - /// await buffer.withOutputSpan { outputSpan in - /// await reader.read(into: &outputSpan) - /// } - /// ``` - public mutating func read( - into outputSpan: inout OutputSpan - ) async throws(ReadFailure) { - do { - try await self.read(maximumCount: outputSpan.freeCapacity) { span in - for i in span.indices { - outputSpan.append(span[i]) - } - } - } catch { - switch error { - case .first(let error): - throw error - case .second: - fatalError() - } - } - } -} diff --git a/Sources/AsyncStreaming/Reader/ConcludingAsyncReader+collect.swift b/Sources/AsyncStreaming/Reader/ConcludingAsyncReader+collect.swift deleted file mode 100644 index c0076b0..0000000 --- a/Sources/AsyncStreaming/Reader/ConcludingAsyncReader+collect.swift +++ /dev/null @@ -1,53 +0,0 @@ -//===----------------------------------------------------------------------===// -// -// This source file is part of the Swift HTTP API Proposal open source project -// -// Copyright (c) 2025 Apple Inc. and the Swift HTTP API Proposal project authors -// Licensed under Apache License v2.0 -// -// See LICENSE.txt for license information -// -// SPDX-License-Identifier: Apache-2.0 -// -//===----------------------------------------------------------------------===// - -@available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) -extension ConcludingAsyncReader where Self: ~Copyable, Underlying: ~Copyable { - /// Collects elements from the underlying async reader and returns both the processed result and final element. - /// - /// This method provides a convenient way to collect elements from the underlying reader while - /// capturing both the processing result and the final element that concludes the reading operation. - /// It combines the functionality of ``AsyncReader/collect(upTo:body:)-(_,(Span) -> Result)`` from ``AsyncReader`` with the concluding - /// behavior of ``ConcludingAsyncReader``. - /// - /// - Parameters: - /// - limit: The maximum number of elements to collect from the underlying reader. - /// - body: A closure that processes the collected elements as a `Span` and returns a result. - /// - /// - Returns: A tuple containing the result from processing the collected elements and the final element. - /// - /// - Throws: Any error thrown by the underlying read operations or the body closure during - /// the collection and processing of elements. - /// - /// ## Example - /// - /// ```swift - /// let responseReader: HTTPConcludingReader = ... - /// - /// // Collect response data and get final headers - /// let (processedData, finalHeaders) = try await responseReader.collect(upTo: 1024 * 1024) { span in - /// // Process all collected elements - /// } - /// ``` - public consuming func collect( - upTo limit: Int, - body: (Span) async throws -> Result - ) async throws -> (Result, FinalElement) where Underlying.ReadElement: Copyable { - try await self.consumeAndConclude { reader in - var reader = reader - return try await reader.collect(upTo: limit) { span in - try await body(span) - } - } - } -} diff --git a/Sources/AsyncStreaming/Writer/AsyncWriter.swift b/Sources/AsyncStreaming/Writer/AsyncWriter.swift deleted file mode 100644 index 2ceaa0e..0000000 --- a/Sources/AsyncStreaming/Writer/AsyncWriter.swift +++ /dev/null @@ -1,159 +0,0 @@ -//===----------------------------------------------------------------------===// -// -// This source file is part of the Swift HTTP API Proposal open source project -// -// Copyright (c) 2025 Apple Inc. and the Swift HTTP API Proposal project authors -// Licensed under Apache License v2.0 -// -// See LICENSE.txt for license information -// -// SPDX-License-Identifier: Apache-2.0 -// -//===----------------------------------------------------------------------===// - -/// A protocol that represents an asynchronous writer capable of providing a buffer to write into. -/// -/// ``AsyncWriter`` defines an interface for types that can asynchronously write elements -/// to a destination by providing an output span buffer for efficient batch writing operations. -@available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) -public protocol AsyncWriter: ~Copyable, ~Escapable { - /// The type of elements that can be written by this writer. - associatedtype WriteElement: ~Copyable - - /// The type of error that can be thrown during writing operations. - associatedtype WriteFailure: Error - - /// Provides a buffer to write elements into. - /// - /// This method supplies an output span that the body closure can use to append elements - /// for writing. The writer manages the buffer allocation and handles the actual writing - /// operation once the body closure completes. - /// - /// - Parameter body: A closure that receives an `OutputSpan` for appending elements - /// to write. The closure can return a result of type `Result`. - /// - /// - Returns: The value returned by the body closure. - /// - /// - Throws: An `EitherError` containing either a `WriteFailure` from the write operation - /// or a `Failure` from the body closure. - /// - /// ## Example - /// - /// ```swift - /// var writer: SomeAsyncWriter = ... - /// - /// try await writer.write { outputSpan in - /// for item in items { - /// outputSpan.append(item) - /// } - /// return outputSpan.count - /// } - /// ``` - // TODO: EOF should be signaled by providing an empty output span? - #if compiler(<6.3) - @_lifetime(self: copy self) - #endif - mutating func write( - _ body: (inout OutputSpan) async throws(Failure) -> Result - ) async throws(EitherError) -> Result - - /// Writes a span of elements to the underlying destination. - /// - /// This method asynchronously writes all elements from the provided span to whatever destination - /// the writer represents. The operation may require multiple write calls to complete if the - /// writer cannot accept all elements at once. - /// - /// - Parameter span: The span of elements to write. - /// - /// - Throws: An `EitherError` containing either a `WriteFailure` from the write operation - /// or an `AsyncWriterWroteShortError` if the writer cannot accept any more data before - /// all elements are written. - /// - /// ## Example - /// - /// ```swift - /// var fileWriter: FileAsyncWriter = ... - /// let dataBuffer: [UInt8] = [1, 2, 3, 4, 5] - /// - /// // Write the entire span to a file asynchronously - /// try await fileWriter.write(dataBuffer.span) - /// ``` - #if compiler(<6.3) - @_lifetime(self: copy self) - #endif - mutating func write( - _ span: Span - ) async throws(EitherError) -} - -/// An error that indicates the writer was unable to accept all provided elements. -/// -/// The default `write(_: Span)` implementation throws this error when the writer provides -/// an empty output span but elements remain to be written. -public struct AsyncWriterWroteShortError: Error { - private let dummy: (any Sendable)? = nil // TODO: This is just here to workaround https://github.com/swiftlang/swift/pull/86843 - public init() {} -} - -@available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) -extension AsyncWriter where Self: ~Copyable, Self: ~Escapable { - /// Writes the provided element to the underlying destination. - /// - /// This method asynchronously writes the given element to whatever destination the writer - /// represents. The operation may complete immediately or may await resources or processing time. - /// - /// - Parameter element: The element to write. This typically represents a single item or a collection - /// of items depending on the specific writer implementation. - /// - /// - Throws: An error of type `WriteFailure` if the write operation cannot be completed successfully. - /// - /// - Note: This method requires `mutating` because writing operations often change the internal - /// state of the writer. - /// - /// ```swift - /// var fileWriter: FileAsyncWriter = ... - /// - /// // Write data to a file asynchronously - /// try await fileWriter.write(dataChunk) - /// ``` - #if compiler(<6.3) - @_lifetime(self: copy self) - #endif - public mutating func write(_ element: consuming WriteElement) async throws(WriteFailure) { - // Since the element is ~Copyable but we don't have call-once closures - // we need to move it into an Optional and then take it out once. This - // also makes the below force unwrap safe - var opt = Optional(element) - do { - try await self.write { outputSpan in - outputSpan.append(opt.take()!) - } - } catch { - switch error { - case .first(let error): - throw error - case .second: - fatalError() - } - } - } - - #if compiler(<6.3) - @_lifetime(self: copy self) - #endif - public mutating func write(_ span: Span) async throws(EitherError) - where WriteElement: Copyable { - var index = span.indices.startIndex - while index < span.indices.endIndex { - try await self.write { (outputSpan) throws(AsyncWriterWroteShortError) -> Void in - guard outputSpan.capacity != 0 else { - throw AsyncWriterWroteShortError() - } - while !outputSpan.isFull && index < span.indices.endIndex { - outputSpan.append(span[index]) - index += 1 - } - } - } - } -} diff --git a/Sources/AsyncStreaming/Writer/RigidArray+AsyncWriter.swift b/Sources/AsyncStreaming/Writer/RigidArray+AsyncWriter.swift deleted file mode 100644 index 6ac53e9..0000000 --- a/Sources/AsyncStreaming/Writer/RigidArray+AsyncWriter.swift +++ /dev/null @@ -1,57 +0,0 @@ -//===----------------------------------------------------------------------===// -// -// This source file is part of the Swift HTTP API Proposal open source project -// -// Copyright (c) 2025 Apple Inc. and the Swift HTTP API Proposal project authors -// Licensed under Apache License v2.0 -// -// See LICENSE.txt for license information -// -// SPDX-License-Identifier: Apache-2.0 -// -//===----------------------------------------------------------------------===// - -public import BasicContainers - -/// Conforms `RigidArray` to the ``AsyncWriter`` protocol. -/// -/// This extension enables `RigidArray` to be used as an asynchronous writer, allowing -/// elements to be appended through the async writer interface. -@available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) -extension RigidArray: AsyncWriter { - /// Provides a buffer to write elements into the rigid array. - /// - /// This method allocates space for elements in the array and provides an `OutputSpan` - /// that the body closure can use to append elements. The method appends up to the - /// specified count of elements to the array. - /// - /// - Parameter body: A closure that receives an `OutputSpan` to write elements into. - /// - /// - Returns: The value returned by the body closure. - /// - /// - Throws: Any error thrown by the body closure. - /// - /// ## Example - /// - /// ```swift - /// var array = RigidArray() - /// - /// try await array.write { outputSpan in - /// for i in 0..<5 { - /// outputSpan.append(i) - /// } - /// } - /// ``` - public mutating func write( - _ body: (inout OutputSpan) async throws(Failure) -> Result - ) async throws(EitherError) -> Result { - do { - // TODO: Reconsider adding count to AsyncWriter - return try await self.append(count: 10) { (outputSpan) async throws(Failure) -> Result in - try await body(&outputSpan) - } - } catch { - throw .second(error) - } - } -} diff --git a/Sources/FetchHTTPClient/FetchHTTPClient.swift b/Sources/FetchHTTPClient/FetchHTTPClient.swift index 33bc7bf..cb6825e 100644 --- a/Sources/FetchHTTPClient/FetchHTTPClient.swift +++ b/Sources/FetchHTTPClient/FetchHTTPClient.swift @@ -125,19 +125,22 @@ public final class FetchHTTPClient: HTTPAPIs.HTTPClient { } public struct RequestBodyWriter: AsyncWriter, ~Copyable { + public typealias WriteElement = UInt8 + public typealias WriteFailure = any Error + public typealias Buffer = UniqueArray + let buffer: RequestBodyBuffer - public mutating func write( - _ body: nonisolated(nonsending) (inout OutputSpan) async throws(Failure) -> Result - ) async throws(AsyncStreaming.EitherError) -> Result where Failure: Error { + public mutating func write( + _ body: nonisolated(nonsending) (inout UniqueArray) async throws(Failure) -> Return + ) async throws(AsyncStreaming.EitherError) -> Return where Failure: Error { + let result: Return do { - // Each write attempt gets approximately a page of memory to populate with data. - return try await buffer.array.append(count: 4 * 1024) { span in - return try await body(&span) - } + result = try await body(&self.buffer.array) } catch { - throw .first(error) + throw .second(error) } + return result } } @@ -152,64 +155,38 @@ public final class FetchHTTPClient: HTTPAPIs.HTTPClient { } public struct ResponseBodyReader: AsyncReader, ~Copyable { + public typealias ReadElement = UInt8 + public typealias ReadFailure = any Error + public typealias Buffer = UniqueArray + let reader: ReadableStreamDefaultReader - var buffer = [UInt8]() - var curIndex = 0 + var buffer = UniqueArray() - public mutating func read( - maximumCount: Int?, - body: nonisolated(nonsending) (consuming Span) async throws(Failure) -> Return + public mutating func read( + body: nonisolated(nonsending) (inout UniqueArray) async throws(Failure) -> Return ) async throws(AsyncStreaming.EitherError) -> Return where Failure: Error { - if buffer.isEmpty { - // Read more data in from JS - let chunk: Chunk - do { - chunk = try await reader.read() - } catch { - throw .first(error) - } - if chunk.done { - do { - return try await body(Span()) - } catch { - throw .second(error) - } - } + let chunk: Chunk + do { + chunk = try await self.reader.read() + } catch { + throw .first(error) + } + if !chunk.done { guard let bytes = chunk.value, !bytes.isEmpty else { // If not done, there must be bytes that can be read throw .first(FetchError.BadAssumptionJS) } - - buffer = bytes - } - - let range: Range - let numRemainingElements = buffer.count - curIndex - if let maximumCount, numRemainingElements > maximumCount { - // There is more data in this buffer than the user wants. - // Give them a smaller span, update the index - let endIndex = curIndex + maximumCount - range = curIndex..` from the write operation. + /// - Throws: An error originating from the read or write operations. /// /// ## Example /// @@ -34,18 +35,23 @@ extension AsyncWriter where Self: ~Copyable, Self: ~Escapable { /// // Copy all data from reader to writer /// try await fileWriter.write(dataReader) /// ``` - /// - /// ## Discussion - /// - /// This method provides a convenient way to pipe data from one async stream to another, - /// automatically handling the iteration and transfer of elements. The operation continues - /// until the reader signals completion by producing an empty span. + #if compiler(<6.3) @_lifetime(self: copy self) - public mutating func write( - _ reader: consuming some (AsyncReader & ~Copyable & ~Escapable) - ) async throws(EitherError>) where WriteElement: Copyable { - try await reader.forEach { (span) throws(EitherError) -> Void in - try await self.write(span) + #endif + public mutating func write( + _ reader: consuming Reader + ) async throws + where + Reader: AsyncReader & ~Copyable & ~Escapable, + Reader.ReadElement == WriteElement + { + try await reader.forEachBuffer { (readBuffer: inout Reader.Buffer) in + try await self.write { (writeBuffer: inout Self.Buffer) in + writeBuffer.append( + moving: readBuffer.startIndex..) async throws(WriteFailure) + where WriteElement: Copyable { + do { + try await self.write { (buffer: inout Self.Buffer) in + buffer.append(copying: span) + } + } catch { + switch error { + case .first(let error): + throw error + case .second: + fatalError() + } + } + } +} diff --git a/Sources/HTTPAPIs/Client/HTTPClient+Conveniences.swift b/Sources/HTTPAPIs/Client/HTTPClient+Conveniences.swift index 394b429..672a564 100644 --- a/Sources/HTTPAPIs/Client/HTTPClient+Conveniences.swift +++ b/Sources/HTTPAPIs/Client/HTTPClient+Conveniences.swift @@ -234,7 +234,7 @@ where if $0.count > limit { throw LengthLimitExceededError() } - return unsafe $0.withUnsafeBytes { unsafe Data($0) } + return $0.span.withUnsafeBytes { unsafe Data($0) } }.0 } } diff --git a/Sources/HTTPAPIs/ConcludingAsyncReader+collect.swift b/Sources/HTTPAPIs/ConcludingAsyncReader+collect.swift new file mode 100644 index 0000000..e7efa26 --- /dev/null +++ b/Sources/HTTPAPIs/ConcludingAsyncReader+collect.swift @@ -0,0 +1,65 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift HTTP API Proposal open source project +// +// Copyright (c) 2025 Apple Inc. and the Swift HTTP API Proposal project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +public import AsyncStreaming +import BasicContainers +public import ContainersPreview + +@available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) +extension ConcludingAsyncReader where Self: ~Copyable, Underlying: ~Copyable { + /// Collects elements from the underlying async reader and returns both the processed result and final element. + /// + /// Reads elements from the underlying reader until either the accumulated count reaches `limit` + /// or the stream ends. Any elements the reader produces beyond `limit` are discarded. + /// + /// - Parameters: + /// - limit: The maximum number of elements to collect from the underlying reader. + /// - body: A closure that processes the collected elements as an `InputSpan` and returns a result. + /// + /// - Returns: A tuple containing the result from processing the collected elements and the final element. + /// + /// - Throws: Any error thrown by the underlying read operations or the body closure during + /// the collection and processing of elements. + public consuming func collect( + upTo limit: Int, + body: (consuming InputSpan) async throws -> Result + ) async throws -> (Result, FinalElement) { + try await self.consumeAndConclude { reader in + var reader = reader + var accumulated = UniqueArray() + var eof = false + while accumulated.count < limit && !eof { + try await reader.read { buffer in + if buffer.count == 0 { + eof = true + return + } + let remainingCapacity = limit - accumulated.count + if buffer.count <= remainingCapacity { + accumulated.append( + moving: buffer.startIndex..( - _ body: (inout OutputSpan) async throws(Failure) -> Result - ) async throws(AsyncStreaming.EitherError) -> Result where Failure: Error { - try await self.actual.write(body) - } + public typealias WriteElement = UInt8 + public typealias WriteFailure = any Error + public typealias Buffer = UniqueArray - public mutating func write( - _ span: Span - ) async throws(EitherError) { - try await self.actual.write(span) + public mutating func write( + _ body: (inout UniqueArray) async throws(Failure) -> Return + ) async throws(AsyncStreaming.EitherError) -> Return where Failure: Error { + try await self.actual.write(body) } var actual: ActualHTTPClient.RequestWriter @@ -55,11 +54,14 @@ public final class DefaultHTTPClient: HTTPAPIs.HTTPClient { public struct ResponseConcludingReader: ConcludingAsyncReader, ~Copyable { public struct Underlying: AsyncReader, ~Copyable { - public mutating func read( - maximumCount: Int?, - body: (consuming Span) async throws(Failure) -> Return + public typealias ReadElement = UInt8 + public typealias ReadFailure = any Error + public typealias Buffer = UniqueArray + + public mutating func read( + body: (inout UniqueArray) async throws(Failure) -> Return ) async throws(AsyncStreaming.EitherError) -> Return where Failure: Error { - try await self.actual.read(maximumCount: maximumCount, body: body) + try await self.actual.read(body: body) } var actual: ActualHTTPClient.ResponseConcludingReader.Underlying diff --git a/Sources/HTTPClientConformance/HTTPClientConformance.swift b/Sources/HTTPClientConformance/HTTPClientConformance.swift index 0320f12..82249c4 100644 --- a/Sources/HTTPClientConformance/HTTPClientConformance.swift +++ b/Sources/HTTPClientConformance/HTTPClientConformance.swift @@ -11,6 +11,8 @@ // //===----------------------------------------------------------------------===// +import BasicContainers +import ContainersPreview import Foundation public import HTTPClient import HTTPTypes @@ -296,7 +298,7 @@ struct ConformanceTestSuite { ) { response, responseBodyAndTrailers in #expect(response.status == .ok) let (body, _) = try await responseBodyAndTrailers.collect(upTo: 1024) { span in - return String(copying: try UTF8Span(validating: span)) + return String(copying: try UTF8Span(validating: span.span)) } #expect(body == "1234") } @@ -317,7 +319,7 @@ struct ConformanceTestSuite { ) { response, responseBodyAndTrailers in #expect(response.status == .ok) let (body, trailers) = try await responseBodyAndTrailers.collect(upTo: 1024) { span in - return String(copying: try UTF8Span(validating: span)) + return String(copying: try UTF8Span(validating: span.span)) } #expect(body.isEmpty) #expect(trailers == nil) @@ -343,7 +345,7 @@ struct ConformanceTestSuite { ) { response, responseBodyAndTrailers in #expect(response.status == .ok) let (jsonRequest, _) = try await responseBodyAndTrailers.collect(upTo: 1024) { span in - let body = String(copying: try UTF8Span(validating: span)) + let body = String(copying: try UTF8Span(validating: span.span)) let data = body.data(using: .utf8)! return try JSONDecoder().decode(JSONHTTPRequest.self, from: data) } @@ -370,7 +372,7 @@ struct ConformanceTestSuite { ) { response, responseBodyAndTrailers in #expect(response.status == .ok) let (body, _) = try await responseBodyAndTrailers.collect(upTo: 1024) { span in - let body = String(copying: try UTF8Span(validating: span)) + let body = String(copying: try UTF8Span(validating: span.span)) return body } @@ -402,7 +404,7 @@ struct ConformanceTestSuite { } let (body, _) = try await responseBodyAndTrailers.collect(upTo: 1024) { span in - return String(copying: try UTF8Span(validating: span)) + return String(copying: try UTF8Span(validating: span.span)) } #expect(body == "TEST\n") } @@ -431,7 +433,7 @@ struct ConformanceTestSuite { } let (body, _) = try await responseBodyAndTrailers.collect(upTo: 1024) { span in - return String(copying: try UTF8Span(validating: span)) + return String(copying: try UTF8Span(validating: span.span)) } #expect(body == "TEST\n") } @@ -460,7 +462,7 @@ struct ConformanceTestSuite { } let (body, _) = try await responseBodyAndTrailers.collect(upTo: 1024) { span in - return String(copying: try UTF8Span(validating: span)) + return String(copying: try UTF8Span(validating: span.span)) } #expect(body == "TEST\n") } @@ -481,7 +483,7 @@ struct ConformanceTestSuite { let contentEncoding = response.headerFields[.contentEncoding] #expect(contentEncoding == nil || contentEncoding == "identity") let (body, _) = try await responseBodyAndTrailers.collect(upTo: 1024) { span in - return String(copying: try UTF8Span(validating: span)) + return String(copying: try UTF8Span(validating: span.span)) } #expect(body == "TEST\n") } @@ -507,7 +509,7 @@ struct ConformanceTestSuite { ) { response, responseBodyAndTrailers in #expect(response.status == .ok) let (jsonRequest, _) = try await responseBodyAndTrailers.collect(upTo: 1024) { span in - let body = String(copying: try UTF8Span(validating: span)) + let body = String(copying: try UTF8Span(validating: span.span)) let data = body.data(using: .utf8)! return try JSONDecoder().decode(JSONHTTPRequest.self, from: data) } @@ -532,7 +534,7 @@ struct ConformanceTestSuite { ) { response, responseBodyAndTrailers in #expect(response.status == .ok) let (jsonRequest, _) = try await responseBodyAndTrailers.collect(upTo: 1024) { span in - let body = String(copying: try UTF8Span(validating: span)) + let body = String(copying: try UTF8Span(validating: span.span)) let data = body.data(using: .utf8)! return try JSONDecoder().decode(JSONHTTPRequest.self, from: data) } @@ -663,10 +665,12 @@ struct ConformanceTestSuite { #expect(response.status == .ok) let _ = try await responseBodyAndTrailers.consumeAndConclude { reader in var numberOfChunks = 0 - try await reader.forEach { span in + try await reader.forEachBuffer { buffer in numberOfChunks += 1 - #expect(span.count == 1) - #expect(span[0] == UInt8(ascii: "A")) + #expect(buffer.count == 1) + var consumer = buffer.consumeAll() + let first = consumer.next() + #expect(first == UInt8(ascii: "A")) // Unblock the writer continuation.yield() @@ -693,7 +697,7 @@ struct ConformanceTestSuite { ) { response, responseBodyAndTrailers in #expect(response.status == .ok) let (jsonRequest, _) = try await responseBodyAndTrailers.collect(upTo: 1024) { span in - let body = String(copying: try UTF8Span(validating: span)) + let body = String(copying: try UTF8Span(validating: span.span)) let data = body.data(using: .utf8)! return try JSONDecoder().decode(JSONHTTPRequest.self, from: data) } @@ -731,8 +735,11 @@ struct ConformanceTestSuite { #expect(response.status == .ok) let _ = try await responseBodyAndTrailers.consumeAndConclude { reader in // Read all chunks from server - try await reader.forEach { span in - let chunk = String(copying: try UTF8Span(validating: span)) + try await reader.forEachBuffer { buffer in + var bytes = [UInt8]() + var consumer = buffer.consumeAll() + while let b = consumer.next() { bytes.append(b) } + let chunk = String(copying: try UTF8Span(validating: bytes.span)) #expect(chunk == "A") // Give chunk to the writer to echo back @@ -812,10 +819,10 @@ struct ConformanceTestSuite { // The client may choose to return however much of the body it already // has downloaded, but eventually it must throw an exception because // the response is incomplete and the task has been cancelled. - while true { - try await reader.collect(upTo: .max) { - #expect($0.count > 0) - } + try await reader.forEachBuffer { buffer in + #expect(buffer.count > 0) + var consumer = buffer.consumeAll() + while consumer.next() != nil {} } } } @@ -880,7 +887,7 @@ struct ConformanceTestSuite { ) { response, responseBodyAndTrailers in #expect(response.status == .ok) let (echo, _) = try await responseBodyAndTrailers.collect(upTo: 2_000_000) { span in - return String(copying: try UTF8Span(validating: span)) + return String(copying: try UTF8Span(validating: span.span)) } #expect(echo == String(repeating: "A", count: 1_000_000)) } @@ -901,7 +908,7 @@ struct ConformanceTestSuite { ) { response, responseBodyAndTrailers in #expect(response.status == .ok) let (character, _) = try await responseBodyAndTrailers.collect(upTo: 1) { span in - return String(copying: try UTF8Span(validating: span)) + return String(copying: try UTF8Span(validating: span.span)) } #expect(character == "A") } @@ -925,18 +932,10 @@ struct ConformanceTestSuite { let (result, _) = try await responseBodyAndTrailers.consumeAndConclude { reader in var result = [UInt8]() var reader = reader - var breakTheLoop = false - while !breakTheLoop { - breakTheLoop = try await reader.read(maximumCount: 1) { bytes in - guard bytes.isEmpty else { - precondition(bytes.count == 1) - result.append(bytes[0]) - return false - } - return true - } + try await reader.forEachBuffer { buffer in + var consumer = buffer.consumeAll() + while let b = consumer.next() { result.append(b) } } - return result } #expect(result == [UInt8](repeating: UInt8(ascii: "A"), count: 1_000_000)) @@ -956,7 +955,7 @@ struct ConformanceTestSuite { ) { response, responseBodyAndTrailers in #expect(response.status == .ok) let (body, trailers) = try await responseBodyAndTrailers.collect(upTo: 1024) { span in - return String(copying: try UTF8Span(validating: span)) + return String(copying: try UTF8Span(validating: span.span)) } #expect(body.isEmpty) #expect(trailers == nil) @@ -1004,7 +1003,7 @@ struct ConformanceTestSuite { request: request, ) { response, responseBodyAndTrailers in let (jsonRequest, _) = try await responseBodyAndTrailers.collect(upTo: 1024) { span in - let body = String(copying: try UTF8Span(validating: span)) + let body = String(copying: try UTF8Span(validating: span.span)) let data = body.data(using: .utf8)! return try JSONDecoder().decode(JSONHTTPRequest.self, from: data) } @@ -1051,7 +1050,7 @@ struct ConformanceTestSuite { let clientCookie = try await client.perform(request: request2) { response, responseBodyAndTrailers in // The server gave us the request back. Check that the cookie was in the request. let (jsonRequest, _) = try await responseBodyAndTrailers.collect(upTo: 1024) { span in - let body = String(copying: try UTF8Span(validating: span)) + let body = String(copying: try UTF8Span(validating: span.span)) let data = body.data(using: .utf8)! return try JSONDecoder().decode(JSONHTTPRequest.self, from: data) } @@ -1117,7 +1116,7 @@ struct ConformanceTestSuite { #expect(response.headerFields[.cached] == "true") } let (response, _) = try await responseBodyAndTrailers.collect(upTo: 5) { span in - return String(copying: try UTF8Span(validating: span)) + return String(copying: try UTF8Span(validating: span.span)) } #expect(response == expectedResponse) } @@ -1140,7 +1139,7 @@ struct ConformanceTestSuite { request: request, ) { response, responseBodyAndTrailers in let (jsonRequest, _) = try await responseBodyAndTrailers.collect(upTo: 1024) { span in - let body = String(copying: try UTF8Span(validating: span)) + let body = String(copying: try UTF8Span(validating: span.span)) let data = body.data(using: .utf8)! return try JSONDecoder().decode(JSONHTTPRequest.self, from: data) } @@ -1169,7 +1168,7 @@ struct ConformanceTestSuite { ) { response, responseBodyAndTrailers in #expect(response.status == .ok) let (body, trailers) = try await responseBodyAndTrailers.collect(upTo: 1024) { span in - return String(copying: try UTF8Span(validating: span)) + return String(copying: try UTF8Span(validating: span.span)) } // Verify the body @@ -1206,7 +1205,7 @@ struct ConformanceTestSuite { ) { response, responseBodyAndTrailers in #expect(response.status == .ok) let (jsonRequest, _) = try await responseBodyAndTrailers.collect(upTo: 1024) { span in - let body = String(copying: try UTF8Span(validating: span)) + let body = String(copying: try UTF8Span(validating: span.span)) let data = body.data(using: .utf8)! return try JSONDecoder().decode(JSONHTTPRequest.self, from: data) } diff --git a/Sources/HTTPClientConformance/HTTPServerForTesting/HTTPRequestConcludingAsyncReader.swift b/Sources/HTTPClientConformance/HTTPServerForTesting/HTTPRequestConcludingAsyncReader.swift index 00ce7e1..8d2c56d 100644 --- a/Sources/HTTPClientConformance/HTTPServerForTesting/HTTPRequestConcludingAsyncReader.swift +++ b/Sources/HTTPClientConformance/HTTPServerForTesting/HTTPRequestConcludingAsyncReader.swift @@ -11,7 +11,8 @@ // //===----------------------------------------------------------------------===// -public import AsyncStreaming +public import BasicContainers +public import HTTPAPIs public import HTTPTypes import NIOCore import NIOHTTPTypes @@ -37,6 +38,9 @@ public struct HTTPRequestConcludingAsyncReader: ConcludingAsyncReader, ~Copyable /// The type of errors that can occur during reading operations. public typealias ReadFailure = any Error + /// The buffer type used to hand elements to the caller. + public typealias Buffer = UniqueArray + /// The HTTP trailer fields captured at the end of the request. fileprivate var state: ReaderState @@ -55,14 +59,8 @@ public struct HTTPRequestConcludingAsyncReader: ConcludingAsyncReader, ~Copyable } /// Reads a chunk of request body data. - /// - /// - Parameter body: A function that consumes the read element (or nil for end of stream) - /// and returns a value of type `Return`. - /// - Returns: The value returned by the body function after processing the read element. - /// - Throws: An error if the reading operation fails. - public mutating func read( - maximumCount: Int?, - body: nonisolated(nonsending) (consuming Span) async throws(Failure) -> Return + public mutating func read( + body: nonisolated(nonsending) (inout UniqueArray) async throws(Failure) -> Return ) async throws(EitherError) -> Return { let requestPart: HTTPRequestPart? do { @@ -71,21 +69,27 @@ public struct HTTPRequestConcludingAsyncReader: ConcludingAsyncReader, ~Copyable throw .first(error) } - do { - switch requestPart { - case .head: - fatalError() - case .body(let element): - return try await body(Array(buffer: element).span) - case .end(let trailers): - self.state.wrapped.withLock { state in - state.trailers = trailers - state.finishedReading = true - } - return try await body(.init()) - case .none: - return try await body(.init()) + var buffer = UniqueArray() + switch requestPart { + case .head: + fatalError() + case .body(let element): + buffer.reserveCapacity(element.readableBytes) + unsafe element.withUnsafeReadableBytes { rawBufferPtr in + let usbptr = unsafe rawBufferPtr.assumingMemoryBound(to: UInt8.self) + unsafe buffer.append(copying: usbptr) + } + case .end(let trailers): + self.state.wrapped.withLock { state in + state.trailers = trailers + state.finishedReading = true } + case .none: + break + } + + do { + return try await body(&buffer) } catch { throw .second(error) } diff --git a/Sources/HTTPClientConformance/HTTPServerForTesting/HTTPRequestContext.swift b/Sources/HTTPClientConformance/HTTPServerForTesting/HTTPRequestContext.swift deleted file mode 100644 index 63e2909..0000000 --- a/Sources/HTTPClientConformance/HTTPServerForTesting/HTTPRequestContext.swift +++ /dev/null @@ -1,19 +0,0 @@ -//===----------------------------------------------------------------------===// -// -// This source file is part of the Swift HTTP API Proposal open source project -// -// Copyright (c) 2025 Apple Inc. and the Swift HTTP API Proposal project authors -// Licensed under Apache License v2.0 -// -// See LICENSE.txt for license information -// -// SPDX-License-Identifier: Apache-2.0 -// -//===----------------------------------------------------------------------===// - -/// A context object that carries additional information about an HTTP request. -/// -/// `HTTPRequestContext` provides a way to pass metadata through the HTTP request pipeline. -public struct HTTPRequestContext: Sendable { - public init() {} -} diff --git a/Sources/HTTPClientConformance/HTTPServerForTesting/HTTPResponseConcludingAsyncWriter.swift b/Sources/HTTPClientConformance/HTTPServerForTesting/HTTPResponseConcludingAsyncWriter.swift index f89c20c..e34cc67 100644 --- a/Sources/HTTPClientConformance/HTTPServerForTesting/HTTPResponseConcludingAsyncWriter.swift +++ b/Sources/HTTPClientConformance/HTTPServerForTesting/HTTPResponseConcludingAsyncWriter.swift @@ -11,8 +11,8 @@ // //===----------------------------------------------------------------------===// -public import AsyncStreaming -import BasicContainers +public import BasicContainers +public import HTTPAPIs public import HTTPTypes import NIOCore import NIOHTTPTypes @@ -40,6 +40,9 @@ public struct HTTPResponseConcludingAsyncWriter: ConcludingAsyncWriter, ~Copyabl /// The type of errors that can occur during writing operations. public typealias WriteFailure = any Error + /// The buffer type used to receive elements from the caller. + public typealias Buffer = UniqueArray + /// The underlying NIO writer for HTTP response parts. private var writer: NIOAsyncChannelOutboundWriter @@ -51,33 +54,24 @@ public struct HTTPResponseConcludingAsyncWriter: ConcludingAsyncWriter, ~Copyabl } /// Writes a chunk of response body data to the underlying writer. - /// - /// - Parameter element: A span of bytes representing the body chunk to write. - /// - Throws: An error if the writing operation fails. - public mutating func write( - _ body: nonisolated(nonsending) (inout OutputSpan) async throws(Failure) -> Result - ) async throws(EitherError) -> Result { - var buffer = RigidArray.init(capacity: 1024) - - let result: Result + public mutating func write( + _ body: nonisolated(nonsending) (inout UniqueArray) async throws(Failure) -> Return + ) async throws(EitherError) -> Return { + var buffer = UniqueArray() + let result: Return do { - result = try await buffer.append(count: 1024) { span in - try await body(&span) - } + result = try await body(&buffer) } catch { - throw .first(error) + throw .second(error) } - var byteBuffer = ByteBuffer() - byteBuffer.reserveCapacity(buffer.count) - for index in buffer.indices { - byteBuffer.writeInteger(buffer[index]) + if buffer.count == 0 { + return result } - // buffer.span.withUnsafeBufferPointer { buffer in - // <#code#> - // } - // var byteBuffer = ByteBuffer() + var byteBuffer = ByteBuffer() + byteBuffer.reserveCapacity(buffer.count) + unsafe byteBuffer.writeBytes(buffer.span.bytes) do { try await self.writer.write(.body(byteBuffer)) @@ -86,18 +80,6 @@ public struct HTTPResponseConcludingAsyncWriter: ConcludingAsyncWriter, ~Copyabl } return result - - // let pointer = buffer.withUnsafeMutableBufferPointer { $0 } - // var span = OutputSpan( - // buffer: pointer, - // initializedCount: 0 - // ) - // do { - // let bodyResult = try await body(&span) - // - // } catch { - // throw .second(error) - // } } } diff --git a/Sources/HTTPClientConformance/HTTPServerForTesting/HTTPResponseSender.swift b/Sources/HTTPClientConformance/HTTPServerForTesting/HTTPResponseSender.swift deleted file mode 100644 index 60a9e00..0000000 --- a/Sources/HTTPClientConformance/HTTPServerForTesting/HTTPResponseSender.swift +++ /dev/null @@ -1,59 +0,0 @@ -//===----------------------------------------------------------------------===// -// -// This source file is part of the Swift HTTP API Proposal open source project -// -// Copyright (c) 2025 Apple Inc. and the Swift HTTP API Proposal project authors -// Licensed under Apache License v2.0 -// -// See LICENSE.txt for license information -// -// SPDX-License-Identifier: Apache-2.0 -// -//===----------------------------------------------------------------------===// - -public import AsyncStreaming -public import HTTPTypes - -/// This type ensures that a single non-informational (1xx) `HTTPResponse` is sent back to the client when handling a request. -/// -/// The user will get a ``HTTPResponseSender`` as part of -/// ``HTTPServerRequestHandler/handle(request:requestContext:requestBodyAndTrailers:responseSender:)``, and they -/// will only be allowed to call ``send(_:)`` once before the sender is consumed and cannot be referenced again. -/// ``sendInformational(_:)`` may be called zero or more times. -/// -/// This forces structure in the response flow, requiring users to send a single response before they can stream a response body and -/// trailers using the returned `ResponseWriter`. -@available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) -public struct HTTPResponseSender: ~Copyable -where ResponseWriter.Underlying: ~Copyable & ~Escapable { - private let _sendInformational: (HTTPResponse) async throws -> Void - private let _send: (HTTPResponse) async throws -> ResponseWriter - - public init( - send: @escaping (HTTPResponse) async throws -> ResponseWriter, - sendInformational: @escaping (HTTPResponse) async throws -> Void - ) { - self._send = send - self._sendInformational = sendInformational - } - - /// Send the given `HTTPResponse` and get back a `ResponseWriter` to which to write a response body and trailers. - /// - Parameter response: The final `HTTPResponse` to send back to the client. - /// - Returns: The `ResponseWriter` to which to write a response body and trailers. - /// - Important: Note this method is consuming: after you send this response, you won't be able to send any more responses. - /// If you need to send an informational (1xx) response, use ``sendInformational(_:)`` instead. - consuming public func send(_ response: HTTPResponse) async throws -> ResponseWriter { - precondition(response.status.kind != .informational) - return try await self._send(response) - } - - /// Send the given informational (1xx) response. - /// - Parameter response: An informational `HTTPResponse` to send back to the client. - public func sendInformational(_ response: HTTPResponse) async throws { - precondition(response.status.kind == .informational) - return try await _sendInformational(response) - } -} - -@available(*, unavailable) -extension HTTPResponseSender: Sendable {} diff --git a/Sources/HTTPClientConformance/HTTPServerForTesting/HTTPServer.swift b/Sources/HTTPClientConformance/HTTPServerForTesting/HTTPServer.swift deleted file mode 100644 index db89875..0000000 --- a/Sources/HTTPClientConformance/HTTPServerForTesting/HTTPServer.swift +++ /dev/null @@ -1,68 +0,0 @@ -//===----------------------------------------------------------------------===// -// -// This source file is part of the Swift HTTP API Proposal open source project -// -// Copyright (c) 2025 Apple Inc. and the Swift HTTP API Proposal project authors -// Licensed under Apache License v2.0 -// -// See LICENSE.txt for license information -// -// SPDX-License-Identifier: Apache-2.0 -// -//===----------------------------------------------------------------------===// - -public import AsyncStreaming -public import HTTPTypes - -@available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) -/// A protocol that defines the interface for an HTTP server. -/// -/// ``HTTPServer`` provides the contract for server implementations that accept incoming HTTP connections and process requests -/// using an ``HTTPServerRequestHandler``. -public protocol HTTPServer: Sendable, ~Copyable, ~Escapable { - /// The ``ConcludingAsyncReader`` to use when reading requests. ``ConcludingAsyncReader/FinalElement`` - /// must be an optional `HTTPFields`, and ``ConcludingAsyncReader/Underlying`` must use `UInt8` as its - /// `ReadElement`. - associatedtype RequestReader: ConcludingAsyncReader & ~Copyable & SendableMetatype - where - RequestReader.Underlying: ~Copyable, - RequestReader.Underlying.ReadElement == UInt8, - RequestReader.Underlying.ReadFailure == any Error, - RequestReader.FinalElement == HTTPFields? - - /// The ``ConcludingAsyncWriter`` to use when writing responses. ``ConcludingAsyncWriter/FinalElement`` - /// must be an optional `HTTPFields`, and ``ConcludingAsyncWriter/Underlying`` must use `UInt8` as its - /// `WriteElement`. - associatedtype ResponseWriter: ConcludingAsyncWriter & ~Copyable & SendableMetatype - where - ResponseWriter.Underlying: ~Copyable, - ResponseWriter.Underlying.WriteElement == UInt8, - ResponseWriter.Underlying.WriteFailure == any Error, - ResponseWriter.FinalElement == HTTPFields? - - /// Starts an HTTP server with the specified request handler. - /// - /// This method creates and runs an HTTP server that processes incoming requests using the provided - /// ``HTTPServerRequestHandler`` implementation. - /// - /// Implementations of this method should handle each connection concurrently using Swift's structured concurrency. - /// - /// - Parameters: - /// - handler: A ``HTTPServerRequestHandler`` implementation that processes incoming HTTP requests. The handler - /// receives each request along with its context, a body and trailers reader, and an ``HTTPResponseSender``. - /// - /// ## Example - /// - /// ```swift - /// let server = // create an instance of a type conforming to the `HTTPServer` protocol - /// try await server.serve(handler: YourRequestHandler()) - /// ``` - func serve( - handler: RequestHandler - ) async throws - where - RequestHandler.RequestReader == RequestReader, - RequestHandler.ResponseWriter == ResponseWriter, - RequestHandler.RequestReader: ~Copyable, - RequestHandler.ResponseWriter: ~Copyable -} diff --git a/Sources/HTTPClientConformance/HTTPServerForTesting/HTTPServerClosureRequestHandler.swift b/Sources/HTTPClientConformance/HTTPServerForTesting/HTTPServerClosureRequestHandler.swift deleted file mode 100644 index 3c22f5e..0000000 --- a/Sources/HTTPClientConformance/HTTPServerForTesting/HTTPServerClosureRequestHandler.swift +++ /dev/null @@ -1,131 +0,0 @@ -//===----------------------------------------------------------------------===// -// -// This source file is part of the Swift HTTP API Proposal open source project -// -// Copyright (c) 2025 Apple Inc. and the Swift HTTP API Proposal project authors -// Licensed under Apache License v2.0 -// -// See LICENSE.txt for license information -// -// SPDX-License-Identifier: Apache-2.0 -// -//===----------------------------------------------------------------------===// - -public import AsyncStreaming -public import HTTPTypes - -/// A closure-based implementation of ``HTTPServerRequestHandler``. -/// -/// ``HTTPServerClosureRequestHandler`` provides a convenient way to create an HTTP request handler -/// using a closure instead of conforming a custom type to the ``HTTPServerRequestHandler`` protocol. -/// This is useful for simple handlers or when you need to create handlers dynamically. -/// -/// - Example: -/// ```swift -/// let echoHandler = HTTPServerClosureRequestHandler { request, context, bodyReader, responseSender in -/// // Read the entire request body -/// let (bodyData, _) = try await bodyReader.consumeAndConclude { reader in -/// // ... body reading code ... -/// } -/// -/// // Create and send response -/// var response = HTTPResponse(status: .ok) -/// let responseWriter = try await responseSender.send(response) -/// try await responseWriter.produceAndConclude { writer in -/// try await writer.write(bodyData.span) -/// return ((), nil) -/// } -/// } -/// ``` -@available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) -public struct HTTPServerClosureRequestHandler< - ConcludingRequestReader: ConcludingAsyncReader & ~Copyable & SendableMetatype, - RequestReader: AsyncReader & ~Copyable, - ConcludingResponseWriter: ConcludingAsyncWriter & ~Copyable & SendableMetatype, - RequestWriter: AsyncWriter & ~Copyable ->: HTTPServerRequestHandler { - public typealias ResponseWriter = ConcludingResponseWriter - public typealias RequestReader = ConcludingRequestReader - - /// The underlying closure that handles HTTP requests. - private let _handler: - nonisolated(nonsending) @Sendable ( - HTTPRequest, - HTTPRequestContext, - consuming sending ConcludingRequestReader, - consuming sending HTTPResponseSender - ) async throws -> Void - - /// Creates a new closure-based HTTP request handler. - /// - /// - Parameter handler: A closure that will be called to handle each incoming HTTP request. - /// The closure takes the same parameters as the ``HTTPServerRequestHandler/handle(request:requestBodyAndTrailers:responseSender:)`` method. - public init( - handler: - nonisolated(nonsending) @Sendable @escaping ( - HTTPRequest, - HTTPRequestContext, - consuming sending ConcludingRequestReader, - consuming sending HTTPResponseSender - ) async throws -> Void - ) { - self._handler = handler - } - - /// Handles an incoming HTTP request by delegating to the closure provided at initialization. - /// - /// This method simply forwards all parameters to the handler closure. - /// - /// - Parameters: - /// - request: The HTTP request headers and metadata. - /// - requestContext: A ``HTTPRequestContext``. - /// - requestBodyAndTrailers: A reader for accessing the request body data and trailing headers. - /// - responseSender: An ``HTTPResponseSender`` to send the HTTP response. - public func handle( - request: HTTPRequest, - requestContext: HTTPRequestContext, - requestBodyAndTrailers: consuming sending ConcludingRequestReader, - responseSender: consuming sending HTTPResponseSender - ) async throws { - try await self._handler(request, requestContext, requestBodyAndTrailers, responseSender) - } -} - -@available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) -extension HTTPServer where RequestReader.Underlying: Escapable, ResponseWriter.Underlying: Escapable { - /// Starts an HTTP server with a closure-based request handler. - /// - /// This method provides a convenient way to start an HTTP server using a closure to handle incoming requests. - /// - /// - Parameters: - /// - handler: An async closure that processes HTTP requests. The closure receives: - /// - `HTTPRequest`: The incoming HTTP request with headers and metadata. - /// - ``HTTPRequestContext``: The request's context. - /// - ``HTTPRequestConcludingAsyncReader``: An async reader for consuming the request body and trailers. - /// - ``HTTPResponseSender``: A non-copyable wrapper for a function that accepts an `HTTPResponse` and provides access to an ``HTTPResponseConcludingAsyncWriter``. - /// - /// ## Example - /// - /// ```swift - /// try await server.serve { request, bodyReader, responseSender in - /// // Process the request - /// let response = HTTPResponse(status: .ok) - /// let writer = try await responseSender.send(response) - /// try await writer.produceAndConclude { writer in - /// try await writer.write("Hello, World!".utf8) - /// return ((), nil) - /// } - /// } - /// ``` - public func serve( - handler: - nonisolated(nonsending) @Sendable @escaping ( - _ request: HTTPRequest, - _ requestContext: HTTPRequestContext, - _ requestBodyAndTrailers: consuming sending RequestReader, - _ responseSender: consuming sending HTTPResponseSender - ) async throws -> Void - ) async throws { - try await self.serve(handler: HTTPServerClosureRequestHandler(handler: handler)) - } -} diff --git a/Sources/HTTPClientConformance/HTTPServerForTesting/HTTPServerRequestHandler.swift b/Sources/HTTPClientConformance/HTTPServerForTesting/HTTPServerRequestHandler.swift deleted file mode 100644 index f617f4a..0000000 --- a/Sources/HTTPClientConformance/HTTPServerForTesting/HTTPServerRequestHandler.swift +++ /dev/null @@ -1,119 +0,0 @@ -//===----------------------------------------------------------------------===// -// -// This source file is part of the Swift HTTP API Proposal open source project -// -// Copyright (c) 2025 Apple Inc. and the Swift HTTP API Proposal project authors -// Licensed under Apache License v2.0 -// -// See LICENSE.txt for license information -// -// SPDX-License-Identifier: Apache-2.0 -// -//===----------------------------------------------------------------------===// - -public import AsyncStreaming -public import HTTPTypes - -/// A protocol that defines the contract for handling HTTP server requests. -/// -/// ``HTTPServerRequestHandler`` provides a structured way to process incoming HTTP requests and generate appropriate responses. -/// Conforming types implement the ``handle(request:requestContext:requestBodyAndTrailers:responseSender:)`` method, -/// which is called by the HTTP server for each incoming request. The handler is responsible for: -/// -/// - Processing the request headers. -/// - Reading the request body data using the provided `RequestReader` -/// - Generating and sending an appropriate response using the response callback -/// -/// This protocol fully supports bi-directional streaming HTTP request handling including the optional request and response trailers. -/// -/// # Example -/// -/// ```swift -/// struct EchoHandler: HTTPServerRequestHandler { -/// func handle( -/// request: HTTPRequest, -/// requestContext: HTTPRequestContext, -/// requestConcludingAsyncReader: consuming sending HTTPRequestConcludingAsyncReader, -/// responseSender: consuming sending HTTPResponseSender -/// ) async throws { -/// // Read the entire request body -/// let (bodyData, trailers) = try await requestConcludingAsyncReader.consumeAndConclude { reader in -/// var reader = reader -/// var data = [UInt8]() -/// var shouldContinue = true -/// while shouldContinue { -/// try await reader.read { span in -/// guard let span else { -/// shouldContinue = false -/// return -/// } -/// data.reserveCapacity(data.count + span.count) -/// for index in span.indices { -/// data.append(span[index]) -/// } -/// } -/// } -/// return data -/// } -/// -/// // Create a response -/// var response = HTTPResponse(status: .ok) -/// response.headerFields[.contentType] = "text/plain" -/// -/// // Send the response and write the echo data back -/// let responseWriter = try await responseSender.send(response) -/// try await responseWriter.produceAndConclude { writer in -/// var writer = writer -/// try await writer.write(bodyData.span) -/// return ((), nil) // No trailers -/// } -/// } -/// } -/// ``` -@available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) -public protocol HTTPServerRequestHandler: Sendable { - /// The ``ConcludingAsyncReader`` to use when reading requests. ``ConcludingAsyncReader/FinalElement`` - /// must be an optional `HTTPFields`, and ``ConcludingAsyncReader/Underlying`` must use `Span` as its - /// `ReadElement`. - associatedtype RequestReader: ConcludingAsyncReader & ~Copyable & SendableMetatype - where - RequestReader.Underlying: ~Copyable, - RequestReader.Underlying.ReadElement == UInt8, - RequestReader.FinalElement == HTTPFields? - - /// The ``ConcludingAsyncWriter`` to use when writing responses. ``ConcludingAsyncWriter/FinalElement`` - /// must be an optional `HTTPFields`, and ``ConcludingAsyncWriter/Underlying`` must use `Span` as its - /// `WriteElement`. - associatedtype ResponseWriter: ConcludingAsyncWriter & ~Copyable & SendableMetatype - where - ResponseWriter.Underlying: ~Copyable, - ResponseWriter.Underlying.WriteElement == UInt8, - ResponseWriter.FinalElement == HTTPFields? - - /// Handles an incoming HTTP request and generates a response. - /// - /// This method is called by the HTTP server for each incoming client request. Implementations should: - /// 1. Examine the request headers in the `request` parameter - /// 2. Read the request body data from the `RequestReader` as needed - /// 3. Process the request and prepare a response - /// 4. Optionally call ``HTTPResponseSender/sendInformational(_:)`` as needed - /// 4. Call the ``HTTPResponseSender/send(_:)`` with an appropriate HTTP response - /// 5. Write the response body data to the returned `ResponseWriter` - /// - /// - Parameters: - /// - request: The HTTP request headers and metadata. - /// - requestContext: A ``HTTPRequestContext``. - /// - requestBodyAndTrailers: A reader for accessing the request body data and trailing headers. - /// This follows the `ConcludingAsyncReader` pattern, allowing for incremental reading of request body data - /// and concluding with any trailer fields sent at the end of the request. - /// - responseSender: An ``HTTPResponseSender`` that takes an HTTP response and returns a writer for the - /// response body. The returned writer allows for the incremental writing of the response body, and supports trailers. - /// - /// - Throws: Any error encountered during request processing or response generation. - func handle( - request: HTTPRequest, - requestContext: HTTPRequestContext, - requestBodyAndTrailers: consuming sending RequestReader, - responseSender: consuming sending HTTPResponseSender - ) async throws -} diff --git a/Sources/HTTPClientConformance/HTTPServerForTesting/NIOHTTPServer+HTTP1_1.swift b/Sources/HTTPClientConformance/HTTPServerForTesting/NIOHTTPServer+HTTP1_1.swift index 8811f54..8bffc4f 100644 --- a/Sources/HTTPClientConformance/HTTPServerForTesting/NIOHTTPServer+HTTP1_1.swift +++ b/Sources/HTTPClientConformance/HTTPServerForTesting/NIOHTTPServer+HTTP1_1.swift @@ -11,6 +11,7 @@ // //===----------------------------------------------------------------------===// +import HTTPAPIs import NIOCore import NIOEmbedded import NIOHTTP1 @@ -22,7 +23,7 @@ import NIOPosix extension NIOHTTPServer { func serveInsecureHTTP1_1( bindTarget: NIOHTTPServerConfiguration.BindTarget, - handler: some HTTPServerRequestHandler, + handler: some HTTPServerRequestHandler, asyncChannelConfiguration: NIOAsyncChannel.Configuration ) async throws { let serverChannel = try await self.setupHTTP1_1ServerChannel( @@ -70,7 +71,7 @@ extension NIOHTTPServer { func _serveInsecureHTTP1_1( serverChannel: NIOAsyncChannel, Never>, - handler: some HTTPServerRequestHandler + handler: some HTTPServerRequestHandler ) async throws { try await withThrowingDiscardingTaskGroup { group in try await serverChannel.executeThenClose { inbound in diff --git a/Sources/HTTPClientConformance/HTTPServerForTesting/NIOHTTPServer+SecureUpgrade.swift b/Sources/HTTPClientConformance/HTTPServerForTesting/NIOHTTPServer+SecureUpgrade.swift index 2b5c863..a46b3a5 100644 --- a/Sources/HTTPClientConformance/HTTPServerForTesting/NIOHTTPServer+SecureUpgrade.swift +++ b/Sources/HTTPClientConformance/HTTPServerForTesting/NIOHTTPServer+SecureUpgrade.swift @@ -11,6 +11,7 @@ // //===----------------------------------------------------------------------===// +import HTTPAPIs import Logging import NIOCore import NIOEmbedded @@ -27,7 +28,7 @@ extension NIOHTTPServer { func serveSecureUpgrade( bindTarget: NIOHTTPServerConfiguration.BindTarget, tlsConfiguration: TLSConfiguration, - handler: some HTTPServerRequestHandler, + handler: some HTTPServerRequestHandler, asyncChannelConfiguration: NIOAsyncChannel.Configuration, http2Configuration: NIOHTTP2Handler.Configuration, verificationCallback: (@Sendable ([X509.Certificate]) async throws -> CertificateVerificationResult)? = nil @@ -119,7 +120,7 @@ extension NIOHTTPServer { func _serveSecureUpgrade( serverChannel: NIOAsyncChannel, Never>, - handler: some HTTPServerRequestHandler + handler: some HTTPServerRequestHandler ) async throws { try await withThrowingDiscardingTaskGroup { group in try await serverChannel.executeThenClose { inbound in diff --git a/Sources/HTTPClientConformance/HTTPServerForTesting/NIOHTTPServer.swift b/Sources/HTTPClientConformance/HTTPServerForTesting/NIOHTTPServer.swift index 5fee4b3..416728a 100644 --- a/Sources/HTTPClientConformance/HTTPServerForTesting/NIOHTTPServer.swift +++ b/Sources/HTTPClientConformance/HTTPServerForTesting/NIOHTTPServer.swift @@ -11,6 +11,7 @@ // //===----------------------------------------------------------------------===// +public import HTTPAPIs import HTTPTypes public import Logging import NIOCertificateReloading @@ -77,8 +78,8 @@ import X509 /// ``` @available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) public struct NIOHTTPServer: HTTPServer { - public typealias RequestReader = HTTPRequestConcludingAsyncReader - public typealias ResponseWriter = HTTPResponseConcludingAsyncWriter + public typealias RequestConcludingReader = HTTPRequestConcludingAsyncReader + public typealias ResponseConcludingWriter = HTTPResponseConcludingAsyncWriter let logger: Logger private let configuration: NIOHTTPServerConfiguration @@ -144,7 +145,7 @@ public struct NIOHTTPServer: HTTPServer { /// handler: EchoHandler() /// ) /// ``` - public func serve(handler: some HTTPServerRequestHandler) async throws { + public func serve(handler: some HTTPServerRequestHandler) async throws { defer { switch self.listeningAddressState.withLockedValue({ $0.close() }) { case .failPromise(let promise, let error): @@ -264,7 +265,7 @@ public struct NIOHTTPServer: HTTPServer { func handleRequestChannel( channel: NIOAsyncChannel, - handler: some HTTPServerRequestHandler + handler: some HTTPServerRequestHandler ) async throws { do { try await channel diff --git a/Sources/HTTPClientConformance/HTTPServerForTesting/RequestResponseMiddlewareBox.swift b/Sources/HTTPClientConformance/HTTPServerForTesting/RequestResponseMiddlewareBox.swift index 1aeb4a3..473f183 100644 --- a/Sources/HTTPClientConformance/HTTPServerForTesting/RequestResponseMiddlewareBox.swift +++ b/Sources/HTTPClientConformance/HTTPServerForTesting/RequestResponseMiddlewareBox.swift @@ -11,7 +11,7 @@ // //===----------------------------------------------------------------------===// -public import AsyncStreaming +public import HTTPAPIs public import HTTPTypes /// This type holds the values passed to the ``HTTPServerRequestHandler`` when handling a request. diff --git a/Sources/HTTPClientConformance/HTTPServerForTesting/TestHTTPServer.swift b/Sources/HTTPClientConformance/HTTPServerForTesting/TestHTTPServer.swift index 643d349..04feecd 100644 --- a/Sources/HTTPClientConformance/HTTPServerForTesting/TestHTTPServer.swift +++ b/Sources/HTTPClientConformance/HTTPServerForTesting/TestHTTPServer.swift @@ -12,7 +12,10 @@ //===----------------------------------------------------------------------===// import AsyncStreaming +import BasicContainers +import ContainersPreview import Foundation +import HTTPAPIs import HTTPTypes import Logging import Synchronization @@ -119,7 +122,7 @@ func serve(server: NIOHTTPServer) async throws { // Parse the body as a UTF8 string and capture trailers let (body, requestTrailers) = try await requestBodyAndTrailers.collect(upTo: 1024) { span in - return String(copying: try UTF8Span(validating: span)) + return String(copying: try UTF8Span(validating: span.span)) } // Collect the trailers that were sent in with the request @@ -334,10 +337,11 @@ func serve(server: NIOHTTPServer) async throws { try await writer.write("A".utf8.span) // Wait for the client to write the same chunk to the request body - try await reader.read(maximumCount: 1) { span in - if span.count != 1 || span[0] != UInt8(ascii: "A") { + try await reader.read { buffer in + if buffer.count != 1 || buffer[buffer.startIndex] != UInt8(ascii: "A") { assertionFailure("Received unexpected span") } + buffer.removeAll() } } } diff --git a/Sources/URLSessionHTTPClient/URLSessionHTTPClient.swift b/Sources/URLSessionHTTPClient/URLSessionHTTPClient.swift index 499e462..ca30f06 100644 --- a/Sources/URLSessionHTTPClient/URLSessionHTTPClient.swift +++ b/Sources/URLSessionHTTPClient/URLSessionHTTPClient.swift @@ -14,6 +14,7 @@ @_exported public import HTTPAPIs #if canImport(Darwin) +public import BasicContainers import Foundation import HTTPTypesFoundation import NetworkTypes @@ -23,39 +24,99 @@ import Synchronization @available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) public final class URLSessionHTTPClient: HTTPClient, IdleTimerEntryProvider { public struct RequestWriter: AsyncWriter, ~Copyable { - public mutating func write( - _ body: (inout OutputSpan) async throws(Failure) -> Result - ) async throws(AsyncStreaming.EitherError) -> Result where Failure: Error { - try await self.actual.write(body) - } + public typealias WriteElement = UInt8 + public typealias WriteFailure = any Error + public typealias Buffer = UniqueArray + + var actual: URLSessionRequestStreamBridge + var buffer: UniqueArray? - public mutating func write( - _ span: Span - ) async throws(EitherError) { - try await self.actual.write(span) + init(actual: URLSessionRequestStreamBridge) { + self.actual = actual + self.buffer = UniqueArray(minimumCapacity: 1024) } - var actual: URLSessionRequestStreamBridge + public mutating func write( + _ body: (inout UniqueArray) async throws(Failure) -> Return + ) async throws(AsyncStreaming.EitherError) -> Return where Failure: Error { + let result: Return + // This force-unwrap is safe since there can only be one concurrent write. + var buffer = self.buffer.take()! + do { + result = try await body(&buffer) + } catch { + buffer.removeAll() + self.buffer = consume buffer + throw .second(error) + } + if buffer.count == 0 { + self.buffer = consume buffer + return result + } + + do { + try await self.actual.internalWrite(buffer.span) + buffer.removeAll() + self.buffer = consume buffer + } catch { + buffer.removeAll() + self.buffer = consume buffer + throw .first(error) + } + return result + } } public struct ResponseConcludingReader: ConcludingAsyncReader, ~Copyable { public struct Underlying: AsyncReader, ~Copyable { - public mutating func read( - maximumCount: Int?, - body: (consuming Span) async throws(Failure) -> Return - ) async throws(AsyncStreaming.EitherError) -> Return where Failure: Error { - try await self.actual.read(maximumCount: maximumCount, body: body) - } + public typealias ReadElement = UInt8 + public typealias ReadFailure = any Error + public typealias Buffer = UniqueArray var actual: URLSessionTaskDelegateBridge + var buffer: UniqueArray? + + init(actual: URLSessionTaskDelegateBridge) { + self.actual = actual + self.buffer = UniqueArray(minimumCapacity: 1024) + } + + public mutating func read( + body: (inout UniqueArray) async throws(Failure) -> Return + ) async throws(AsyncStreaming.EitherError) -> Return where Failure: Error { + let data: Data? + do { + data = try await self.actual.data(maximumCount: nil) + } catch { + throw .first(error) + } + + // This force-unwrap is safe since there can only be one concurrent read. + var buffer = self.buffer.take()! + if let data, !data.isEmpty { + buffer.reserveCapacity(data.count) + buffer.append(copying: data.span) + } + + let result: Return + do { + result = try await body(&buffer) + } catch { + buffer.removeAll() + self.buffer = consume buffer + throw .second(error) + } + buffer.removeAll() + self.buffer = consume buffer + return result + } } public func consumeAndConclude( body: (consuming sending Underlying) async throws(Failure) -> Return ) async throws(Failure) -> (Return, HTTPFields?) where Failure: Error { - try await self.actual.consumeAndConclude { actual throws(Failure) in - try await body(Underlying(actual: actual)) - } + let result = try await body(Underlying(actual: self.actual)) + return (result, self.actual.responseTrailerFields) } let actual: URLSessionTaskDelegateBridge @@ -311,7 +372,7 @@ public final class URLSessionHTTPClient: HTTPClient, IdleTimerEntryProvider { let delegateBridge: URLSessionTaskDelegateBridge if let body { task = session.startTask().uploadTask(withStreamedRequest: request) - delegateBridge = URLSessionTaskDelegateBridge(task: task, body: .init(other: body, transform: RequestWriter.init)) + delegateBridge = URLSessionTaskDelegateBridge(task: task, body: body) } else { task = session.startTask().dataTask(with: request) delegateBridge = URLSessionTaskDelegateBridge(task: task, body: nil) diff --git a/Sources/URLSessionHTTPClient/URLSessionRequestStreamBridge.swift b/Sources/URLSessionHTTPClient/URLSessionRequestStreamBridge.swift index 7a3a062..0f295eb 100644 --- a/Sources/URLSessionHTTPClient/URLSessionRequestStreamBridge.swift +++ b/Sources/URLSessionHTTPClient/URLSessionRequestStreamBridge.swift @@ -55,7 +55,7 @@ final class URLSessionRequestStreamBridge: NSObject, StreamDelegate, Sendable { self.lockedState.withLock(\.writeFailed) } - private func internalWrite(_ span: Span) async throws { + func internalWrite(_ span: Span) async throws { self.lockedState.withLock { state in if !state.outputStreamOpened { state.outputStreamOpened = true @@ -72,7 +72,7 @@ final class URLSessionRequestStreamBridge: NSObject, StreamDelegate, Sendable { try Task.checkCancellation() try await self.waitForSpace() let written = try self.lockedState.withLock { state in - let written = unsafe remaining.withUnsafeBufferPointer { buffer in + let written = remaining.withUnsafeBufferPointer { buffer in unsafe state.outputStream.write(buffer.baseAddress!, maxLength: buffer.count) } if written < 0 { @@ -138,35 +138,4 @@ final class URLSessionRequestStreamBridge: NSObject, StreamDelegate, Sendable { } } } - -@available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) -extension URLSessionRequestStreamBridge: AsyncWriter { - func write( - _ body: (inout OutputSpan) async throws(Failure) -> Result - ) async throws(EitherError) -> Result { - // TODO: Either this needs to be inline or configurable - var array = RigidArray(capacity: 1024) - do { - let result = try await array.append(count: 1024) { outputSpan in - try await body(&outputSpan) - } - try await self.internalWrite(array.span) - return result - } catch let error as Failure { - throw .second(error) - } catch { - throw .first(error) - } - } - - func write( - _ span: Span - ) async throws(EitherError) { - do { - try await self.internalWrite(span) - } catch { - throw .first(error) - } - } -} #endif diff --git a/Sources/URLSessionHTTPClient/URLSessionTaskDelegateBridge+ConcludingAsyncReader.swift b/Sources/URLSessionHTTPClient/URLSessionTaskDelegateBridge+ConcludingAsyncReader.swift deleted file mode 100644 index 3e05610..0000000 --- a/Sources/URLSessionHTTPClient/URLSessionTaskDelegateBridge+ConcludingAsyncReader.swift +++ /dev/null @@ -1,53 +0,0 @@ -//===----------------------------------------------------------------------===// -// -// This source file is part of the Swift HTTP API Proposal open source project -// -// Copyright (c) 2025 Apple Inc. and the Swift HTTP API Proposal project authors -// Licensed under Apache License v2.0 -// -// See LICENSE.txt for license information -// -// SPDX-License-Identifier: Apache-2.0 -// -//===----------------------------------------------------------------------===// - -#if canImport(Darwin) -import HTTPAPIs -import Foundation - -@available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) -extension URLSessionTaskDelegateBridge: ConcludingAsyncReader { - func consumeAndConclude( - body: (consuming sending URLSessionTaskDelegateBridge) async throws(Failure) -> Return - ) async throws(Failure) -> (Return, HTTPFields?) { - try await (body(self), self.responseTrailerFields) - } -} - -@available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) -extension URLSessionTaskDelegateBridge: AsyncReader { - func read( - maximumCount: Int?, - body: (consuming Span) async throws(Failure) -> Return - ) async throws(EitherError) -> Return { - let data: Data? - do { - data = try await self.data(maximumCount: maximumCount) - } catch { - throw .first(error) - } - guard let data else { - do { - return try await body(InlineArray<0, UInt8>.zero().span) - } catch { - throw .second(error) - } - } - do { - return try await body(data.span) - } catch { - throw .second(error) - } - } -} -#endif diff --git a/Sources/URLSessionHTTPClient/URLSessionTaskDelegateBridge.swift b/Sources/URLSessionHTTPClient/URLSessionTaskDelegateBridge.swift index a52f824..58295a2 100644 --- a/Sources/URLSessionHTTPClient/URLSessionTaskDelegateBridge.swift +++ b/Sources/URLSessionHTTPClient/URLSessionTaskDelegateBridge.swift @@ -39,11 +39,11 @@ final class URLSessionTaskDelegateBridge: NSObject, Sendable, URLSessionDataDele // limits. private let stream: AsyncStream private let continuation: AsyncStream.Continuation - private let requestBody: HTTPClientRequestBody? + private let requestBody: HTTPClientRequestBody? // TODO: Can we get rid of this task and instead use on task group per client? private let requestBodyTask: Mutex?> = .init(nil) - init(task: URLSessionTask, body: consuming HTTPClientRequestBody?) { + init(task: URLSessionTask, body: consuming HTTPClientRequestBody?) { self.task = task var continuation: AsyncStream.Continuation? self.stream = AsyncStream { continuation = $0 } @@ -260,7 +260,7 @@ final class URLSessionTaskDelegateBridge: NSObject, Sendable, URLSessionDataDele let bridge = URLSessionRequestStreamBridge(task: task) completionHandler(bridge.inputStream) do { - let trailerFields = try await requestBody.produce(into: bridge) + let trailerFields = try await requestBody.produce(into: URLSessionHTTPClient.RequestWriter(actual: bridge)) bridge.close(trailerFields: trailerFields) } catch { if bridge.writeFailed { @@ -291,7 +291,7 @@ final class URLSessionTaskDelegateBridge: NSObject, Sendable, URLSessionDataDele let bridge = URLSessionRequestStreamBridge(task: task) completionHandler(bridge.inputStream) do { - let trailerFields = try await requestBody.produce(offset: offset, into: bridge) + let trailerFields = try await requestBody.produce(offset: offset, into: URLSessionHTTPClient.RequestWriter(actual: bridge)) bridge.close(trailerFields: trailerFields) } catch { if bridge.writeFailed { diff --git a/Tests/AsyncStreamingTests/Helpers/Array+Span.swift b/Tests/AsyncStreamingTests/Helpers/Array+Span.swift deleted file mode 100644 index 8a21041..0000000 --- a/Tests/AsyncStreamingTests/Helpers/Array+Span.swift +++ /dev/null @@ -1,27 +0,0 @@ -//===----------------------------------------------------------------------===// -// -// This source file is part of the Swift HTTP API Proposal open source project -// -// Copyright (c) 2025 Apple Inc. and the Swift HTTP API Proposal project authors -// Licensed under Apache License v2.0 -// -// See LICENSE.txt for license information -// -// SPDX-License-Identifier: Apache-2.0 -// -//===----------------------------------------------------------------------===// - -extension Array { - init(_ span: Span) { - self.init() - for index in span.indices { - self.append(span[index]) - } - } - - mutating func append(span: Span) { - for index in span.indices { - self.append(span[index]) - } - } -} diff --git a/Tests/AsyncStreamingTests/Helpers/TestConcludingAsyncReader.swift b/Tests/AsyncStreamingTests/Helpers/TestConcludingAsyncReader.swift deleted file mode 100644 index 4ff4d13..0000000 --- a/Tests/AsyncStreamingTests/Helpers/TestConcludingAsyncReader.swift +++ /dev/null @@ -1,62 +0,0 @@ -//===----------------------------------------------------------------------===// -// -// This source file is part of the Swift HTTP API Proposal open source project -// -// Copyright (c) 2025 Apple Inc. and the Swift HTTP API Proposal project authors -// Licensed under Apache License v2.0 -// -// See LICENSE.txt for license information -// -// SPDX-License-Identifier: Apache-2.0 -// -//===----------------------------------------------------------------------===// - -import AsyncStreaming - -@available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) -struct TestConcludingReader: ConcludingAsyncReader { - struct UnderlyingReader: AsyncReader { - typealias ReadElement = Int - typealias ReadFailure = Never - - var data: [Int] - var position: Int = 0 - - mutating func read( - maximumCount: Int?, - body: (consuming Span) async throws(Failure) -> Return - ) async throws(EitherError) -> Return { - do { - guard position < data.count else { - return try await body([Int]().span) - } - - let count: Int - if let maximumCount { - count = min(maximumCount, data.count - position) - } else { - count = data.count - position - } - - let endIndex = position + count - defer { position = endIndex } - return try await body(data[position..( - body: (consuming sending Underlying) async throws(Failure) -> Return - ) async throws(Failure) -> (Return, Int) { - let reader = UnderlyingReader(data: data) - let result = try await body(reader) - return (result, data.count) - } -} diff --git a/Tests/AsyncStreamingTests/Helpers/TestConcludingAsyncWriter.swift b/Tests/AsyncStreamingTests/Helpers/TestConcludingAsyncWriter.swift deleted file mode 100644 index 5a4a0d9..0000000 --- a/Tests/AsyncStreamingTests/Helpers/TestConcludingAsyncWriter.swift +++ /dev/null @@ -1,58 +0,0 @@ -//===----------------------------------------------------------------------===// -// -// This source file is part of the Swift HTTP API Proposal open source project -// -// Copyright (c) 2025 Apple Inc. and the Swift HTTP API Proposal project authors -// Licensed under Apache License v2.0 -// -// See LICENSE.txt for license information -// -// SPDX-License-Identifier: Apache-2.0 -// -//===----------------------------------------------------------------------===// - -import AsyncStreaming -import BasicContainers - -// Helper: A test concluding writer that accumulates elements and tracks if finalized -struct TestConcludingWriter: ConcludingAsyncWriter { - struct UnderlyingWriter: AsyncWriter { - typealias WriteElement = Int - typealias WriteFailure = Never - - var storage: [Int] - - mutating func write( - _ body: (inout OutputSpan) async throws(Failure) -> Result - ) async throws(EitherError) -> Result { - do { - var buffer = RigidArray(capacity: 10) - - return try await buffer.append(count: 10) { outputSpan async throws(Failure) -> Result in - let result = try await body(&outputSpan) - storage.append(span: outputSpan.span) - return result - } - } catch { - throw .second(error) - } - } - - mutating func write( - _ span: Span - ) async throws(EitherError) { - storage.append(span: span) - } - } - - typealias Underlying = UnderlyingWriter - typealias FinalElement = String - - consuming func produceAndConclude( - body: (consuming sending Underlying) async throws -> (Return, String) - ) async throws -> Return { - let writer = UnderlyingWriter(storage: []) - let (result, _) = try await body(writer) - return result - } -} diff --git a/Tests/AsyncStreamingTests/Helpers/TestReader.swift b/Tests/AsyncStreamingTests/Helpers/TestReader.swift deleted file mode 100644 index 1fb0df7..0000000 --- a/Tests/AsyncStreamingTests/Helpers/TestReader.swift +++ /dev/null @@ -1,48 +0,0 @@ -//===----------------------------------------------------------------------===// -// -// This source file is part of the Swift HTTP API Proposal open source project -// -// Copyright (c) 2025 Apple Inc. and the Swift HTTP API Proposal project authors -// Licensed under Apache License v2.0 -// -// See LICENSE.txt for license information -// -// SPDX-License-Identifier: Apache-2.0 -// -//===----------------------------------------------------------------------===// - -import AsyncStreaming -import BasicContainers - -@available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) -struct SimpleReader: AsyncReader { - typealias ReadElement = Int - typealias ReadFailure = Never - - var data: [Int] - var position: Int = 0 - - mutating func read( - maximumCount: Int?, - body: (consuming Span) async throws(Failure) -> Return - ) async throws(EitherError) -> Return { - do { - guard position < data.count else { - return try await body([Int]().span) - } - - let count: Int - if let maximumCount { - count = min(maximumCount, data.count - position) - } else { - count = data.count - position - } - - let endIndex = position + count - defer { position = endIndex } - return try await body(data[position..( - _ body: (inout OutputSpan) async throws(Failure) -> Result - ) async throws(EitherError) -> Result { - do { - let count = min(10, capacity - storage.count) - var buffer = RigidArray(capacity: count) - - return try await buffer.append(count: count) { outputSpan async throws(Failure) -> Result in - let result = try await body(&outputSpan) - storage.append(span: outputSpan.span) - return result - } - } catch { - throw .second(error) - } - } - - mutating func write( - _ span: Span - ) async throws(EitherError) { - guard span.count <= capacity - storage.count else { - throw .second(AsyncWriterWroteShortError()) - } - storage.append(span: span) - } -} diff --git a/Tests/AsyncStreamingTests/Reader/Array+AsyncReaderTests.swift b/Tests/AsyncStreamingTests/Reader/Array+AsyncReaderTests.swift deleted file mode 100644 index ee064b3..0000000 --- a/Tests/AsyncStreamingTests/Reader/Array+AsyncReaderTests.swift +++ /dev/null @@ -1,49 +0,0 @@ -//===----------------------------------------------------------------------===// -// -// This source file is part of the Swift HTTP API Proposal open source project -// -// Copyright (c) 2025 Apple Inc. and the Swift HTTP API Proposal project authors -// Licensed under Apache License v2.0 -// -// See LICENSE.txt for license information -// -// SPDX-License-Identifier: Apache-2.0 -// -//===----------------------------------------------------------------------===// - -import AsyncStreaming -import Testing - -@Suite -struct ArrayAsyncReaderTests { - @Test - @available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) - func oneSpan() async throws { - let array = [1, 2, 3].asyncReader() - var counter = 0 - await array.forEach { span in - counter += 1 - #expect(span.count == 3) - } - #expect(counter == 1) - } - - @Test - @available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) - func multipleSpans() async throws { - var array = [1, 2, 3].asyncReader() - var counter = 0 - var continueReading = true - while continueReading { - try await array.read(maximumCount: 1) { span in - guard span.count > 0 else { - continueReading = false - return - } - counter += 1 - #expect(span.count == 1) - } - } - #expect(counter == 3) - } -} diff --git a/Tests/AsyncStreamingTests/Reader/AsyncReader+collectTests.swift b/Tests/AsyncStreamingTests/Reader/AsyncReader+collectTests.swift deleted file mode 100644 index 78b8226..0000000 --- a/Tests/AsyncStreamingTests/Reader/AsyncReader+collectTests.swift +++ /dev/null @@ -1,97 +0,0 @@ -//===----------------------------------------------------------------------===// -// -// This source file is part of the Swift HTTP API Proposal open source project -// -// Copyright (c) 2025 Apple Inc. and the Swift HTTP API Proposal project authors -// Licensed under Apache License v2.0 -// -// See LICENSE.txt for license information -// -// SPDX-License-Identifier: Apache-2.0 -// -//===----------------------------------------------------------------------===// - -import AsyncStreaming -import BasicContainers -import Testing - -@Suite -struct AsyncReaderCollectTests { - @Test - @available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) - func collectAllElements() async { - var reader = [1, 2, 3, 4, 5].asyncReader() - - let result = await reader.collect(upTo: 10) { span in - return Array(span) - } - - #expect(result == [1, 2, 3, 4, 5]) - } - - @Test - @available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) - func collectWithExactLimit() async { - var reader = [1, 2, 3, 4, 5].asyncReader() - - let result = await reader.collect(upTo: 5) { span in - return Array(span) - } - - #expect(result == [1, 2, 3, 4, 5]) - } - - @Test - @available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) - func collectEmptyReader() async { - var reader = [Int]().asyncReader() - - let result = await reader.collect(upTo: 10) { span in - return span.count - } - - #expect(result == 0) - } - - @Test - @available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) - func collectProcessesAllElements() async { - var reader = [10, 20, 30].asyncReader() - - let result = await reader.collect(upTo: 10) { span in - var sum = 0 - for i in span.indices { - sum += span[i] - } - return sum - } - - #expect(result == 60) - } - - @Test - @available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) - func collectIntoOutputSpan() async { - var reader = [1, 2, 3, 4, 5].asyncReader() - var buffer = RigidArray.init(capacity: 5) - - await buffer.append(count: 5) { outputSpan in - await reader.collect(into: &outputSpan) - } - - #expect(buffer.count == 5) - } - - @Test - @available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) - func collectWithNeverFailingReader() async { - var reader = [1, 2, 3].asyncReader() - - // This tests the Never overload - let result = await reader.collect(upTo: 10) { span in - return span.count - } - - #expect(result == 3) - } -} diff --git a/Tests/AsyncStreamingTests/Reader/AsyncReader+forEachTests.swift b/Tests/AsyncStreamingTests/Reader/AsyncReader+forEachTests.swift deleted file mode 100644 index 8695d27..0000000 --- a/Tests/AsyncStreamingTests/Reader/AsyncReader+forEachTests.swift +++ /dev/null @@ -1,138 +0,0 @@ -//===----------------------------------------------------------------------===// -// -// This source file is part of the Swift HTTP API Proposal open source project -// -// Copyright (c) 2025 Apple Inc. and the Swift HTTP API Proposal project authors -// Licensed under Apache License v2.0 -// -// See LICENSE.txt for license information -// -// SPDX-License-Identifier: Apache-2.0 -// -//===----------------------------------------------------------------------===// - -import AsyncStreaming -import Testing - -@Suite -struct AsyncReaderForEachTests { - @Test - @available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) - func forEachIteratesAllSpans() async throws { - let reader = [1, 2, 3, 4, 5].asyncReader() - var elementCount = 0 - - await reader.forEach { span in - elementCount += span.count - } - - #expect(elementCount == 5) - } - - @Test - @available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) - func forEachProcessesElements() async throws { - let reader = [10, 20, 30].asyncReader() - var sum = 0 - - await reader.forEach { span in - for i in span.indices { - sum += span[i] - } - } - - #expect(sum == 60) - } - - @Test - @available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) - func forEachWithEmptyReader() async throws { - let reader = [Int]().asyncReader() - var callCount = 0 - - await reader.forEach { span in - callCount += 1 - } - - #expect(callCount == 0) - } - - @Test - @available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) - func forEachWithThrowingBody() async { - enum TestError: Error { - case failed - } - - let reader = [1, 2, 3].asyncReader() - - do { - try await reader.forEach { (span) throws(TestError) -> Void in - throw TestError.failed - } - Issue.record("Expected error to be thrown") - } catch { - #expect(error == EitherError.second(TestError.failed)) - } - } - - @Test - @available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) - func forEachWithNeverFailingReader() async { - enum TestError: Error { - case failed - } - - let reader = [1, 2, 3].asyncReader() - var count = 0 - - do { - try await reader.forEach { (span) throws(TestError) -> Void in - count += span.count - } - } catch { - Issue.record("No error should be thrown from reader") - } - - #expect(count == 3) - } - - @Test - @available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) - func forEachWithAsyncWork() async throws { - let reader = [1, 2, 3].asyncReader() - var results: [Int] = [] - - await reader.forEach { span in - await Task.yield() - for i in span.indices { - results.append(span[i]) - } - } - - #expect(results == [1, 2, 3]) - } - - @Test - @available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) - func forEachMultipleSpans() async { - var reader = [1, 2, 3, 4, 5, 6].asyncReader() - var spanCounts: [Int] = [] - - // Force reading in smaller chunks - while true { - let hasMore = try! await reader.read(maximumCount: 2) { span in - if span.count > 0 { - spanCounts.append(span.count) - return true - } - return false - } - if !hasMore { - break - } - } - - #expect(spanCounts == [2, 2, 2]) - } -} diff --git a/Tests/AsyncStreamingTests/Reader/AsyncReader+mapTests.swift b/Tests/AsyncStreamingTests/Reader/AsyncReader+mapTests.swift deleted file mode 100644 index 9f7df49..0000000 --- a/Tests/AsyncStreamingTests/Reader/AsyncReader+mapTests.swift +++ /dev/null @@ -1,126 +0,0 @@ -//===----------------------------------------------------------------------===// -// -// This source file is part of the Swift HTTP API Proposal open source project -// -// Copyright (c) 2025 Apple Inc. and the Swift HTTP API Proposal project authors -// Licensed under Apache License v2.0 -// -// See LICENSE.txt for license information -// -// SPDX-License-Identifier: Apache-2.0 -// -//===----------------------------------------------------------------------===// - -#if !canImport(Darwin) || swift(>=6.3) // Disabled on older compilers on Darwin due to a runtime crash -import AsyncStreaming -import Testing - -@Suite -struct AsyncReaderMapTests { - @Test - @available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) - func mapTransformsElements() async throws { - let reader = [1, 2, 3, 4, 5].asyncReader() - let mappedReader = reader.map { $0 * 2 } - - var results: [Int] = [] - await mappedReader.forEach { span in - for i in span.indices { - results.append(span[i]) - } - } - - #expect(results == [2, 4, 6, 8, 10]) - } - - @Test - @available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) - func mapWithTypeConversion() async throws { - let reader = [1, 2, 3].asyncReader() - let mappedReader = reader.map { String($0) } - - var results: [String] = [] - await mappedReader.forEach { span in - for i in span.indices { - results.append(span[i]) - } - } - - #expect(results == ["1", "2", "3"]) - } - - @Test - @available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) - func mapEmptyReader() async throws { - let reader = [Int]().asyncReader() - let mappedReader = reader.map { $0 * 2 } - - var count = 0 - await mappedReader.forEach { span in - count += span.count - } - - #expect(count == 0) - } - - @Test - @available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) - func mapWithAsyncTransformation() async throws { - let reader = [1, 2, 3].asyncReader() - let mappedReader = reader.map { value in - // Simulate async work - await Task.yield() - return value * 10 - } - - var results: [Int] = [] - await mappedReader.forEach { span in - for i in span.indices { - results.append(span[i]) - } - } - - #expect(results == [10, 20, 30]) - } - - @Test - @available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) - func mapPreservesChunking() async { - let reader = [1, 2, 3, 4, 5, 6].asyncReader() - var mappedReader = reader.map { $0 + 100 } - - // Read in chunks - var chunks: [[Int]] = [] - while true { - let chunk = try! await mappedReader.read(maximumCount: 2) { span in - return Array(span) - } - if chunk.isEmpty { - break - } - chunks.append(chunk) - } - - #expect(chunks == [[101, 102], [103, 104], [105, 106]]) - } - - @Test - @available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) - func mapChaining() async throws { - let reader = [1, 2, 3].asyncReader() - let mappedReader = - reader - .map { $0 * 2 } - .map { $0 + 10 } - - var results: [Int] = [] - await mappedReader.forEach { span in - for i in span.indices { - results.append(span[i]) - } - } - - #expect(results == [12, 14, 16]) - } -} -#endif diff --git a/Tests/AsyncStreamingTests/Reader/AsyncReaderTests.swift b/Tests/AsyncStreamingTests/Reader/AsyncReaderTests.swift deleted file mode 100644 index 6839bfe..0000000 --- a/Tests/AsyncStreamingTests/Reader/AsyncReaderTests.swift +++ /dev/null @@ -1,94 +0,0 @@ -//===----------------------------------------------------------------------===// -// -// This source file is part of the Swift HTTP API Proposal open source project -// -// Copyright (c) 2025 Apple Inc. and the Swift HTTP API Proposal project authors -// Licensed under Apache License v2.0 -// -// See LICENSE.txt for license information -// -// SPDX-License-Identifier: Apache-2.0 -// -//===----------------------------------------------------------------------===// - -import AsyncStreaming -import BasicContainers -import Testing - -@Suite -struct AsyncReaderTests { - @Test - @available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) - func readWithMaximumCount() async { - var reader = SimpleReader(data: [1, 2, 3, 4, 5]) - - let result = try! await reader.read(maximumCount: 3) { span in - return Array(span) - } - - #expect(result == [1, 2, 3]) - } - - @Test - @available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) - func readWithoutMaximumCount() async { - var reader = SimpleReader(data: [1, 2, 3, 4, 5]) - - let result = try! await reader.read(maximumCount: nil) { span in - return Array(span) - } - - #expect(result == [1, 2, 3, 4, 5]) - } - - @Test - @available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) - func readEmptySpanAtEnd() async { - var reader = SimpleReader(data: [1, 2, 3]) - - // Read all data - _ = try! await reader.read(maximumCount: nil) { span in - return Array(span) - } - - // Next read should return empty span - let result = try! await reader.read(maximumCount: nil) { span in - return span.count - } - - #expect(result == 0) - } - - @Test - @available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) - func readMultipleChunks() async { - var reader = SimpleReader(data: [1, 2, 3, 4, 5, 6]) - var chunks: [[Int]] = [] - - while true { - let chunk = try! await reader.read(maximumCount: 2) { span in - return Array(span) - } - if chunk.isEmpty { - break - } - chunks.append(chunk) - } - - #expect(chunks == [[1, 2], [3, 4], [5, 6]]) - } - - @Test - @available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) - func readIntoCopyableElements() async { - var reader = SimpleReader(data: [1, 2, 3, 4, 5]) - var buffer = RigidArray() - buffer.reserveCapacity(5) - - await buffer.append(count: 5) { outputSpan in - await reader.read(into: &outputSpan) - } - - #expect(buffer.count == 5) - } -} diff --git a/Tests/AsyncStreamingTests/Reader/ConcludingAsyncReaderTests.swift b/Tests/AsyncStreamingTests/Reader/ConcludingAsyncReaderTests.swift deleted file mode 100644 index 37c908d..0000000 --- a/Tests/AsyncStreamingTests/Reader/ConcludingAsyncReaderTests.swift +++ /dev/null @@ -1,99 +0,0 @@ -//===----------------------------------------------------------------------===// -// -// This source file is part of the Swift HTTP API Proposal open source project -// -// Copyright (c) 2025 Apple Inc. and the Swift HTTP API Proposal project authors -// Licensed under Apache License v2.0 -// -// See LICENSE.txt for license information -// -// SPDX-License-Identifier: Apache-2.0 -// -//===----------------------------------------------------------------------===// - -import AsyncStreaming -import Testing - -@Suite -struct ConcludingAsyncReaderTests { - @Test - @available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) - func consumeAndConcludeReturnsResult() async throws { - let reader = TestConcludingReader(data: [1, 2, 3, 4, 5]) - - let (result, finalElement) = await reader.consumeAndConclude { reader in - let reader = reader - var sum = 0 - await reader.forEach { span in - for i in span.indices { - sum += span[i] - } - } - return sum - } - - #expect(result == 15) - #expect(finalElement == 5) - } - - @Test - @available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) - func consumeAndConcludeWithEmptyReader() async throws { - let reader = TestConcludingReader(data: []) - - let (result, finalElement) = await reader.consumeAndConclude { reader in - let reader = reader - var count = 0 - await reader.forEach { span in - count += span.count - } - return count - } - - #expect(result == 0) - #expect(finalElement == 0) - } - - @Test - @available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) - func collectReturnsResultAndFinal() async { - let reader = TestConcludingReader(data: [10, 20, 30]) - - let (collected, finalElement) = try! await reader.collect(upTo: 10) { span in - return Array(span) - } - - #expect(collected == [10, 20, 30]) - #expect(finalElement == 3) - } - - @Test - @available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) - func collectEmptyConcludingReader() async { - let reader = TestConcludingReader(data: []) - - let (collected, finalElement) = try! await reader.collect(upTo: 10) { span in - return Array(span) - } - - #expect(collected == []) - #expect(finalElement == 0) - } - - @Test - @available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) - func collectProcessesAllElements() async { - let reader = TestConcludingReader(data: [1, 2, 3, 4]) - - let (sum, finalElement) = try! await reader.collect(upTo: 10) { span in - var total = 0 - for i in span.indices { - total += span[i] - } - return total - } - - #expect(sum == 10) - #expect(finalElement == 4) - } -} diff --git a/Tests/AsyncStreamingTests/Writer/AsyncWriter+AsyncReaderTests.swift b/Tests/AsyncStreamingTests/Writer/AsyncWriter+AsyncReaderTests.swift deleted file mode 100644 index e111aa9..0000000 --- a/Tests/AsyncStreamingTests/Writer/AsyncWriter+AsyncReaderTests.swift +++ /dev/null @@ -1,132 +0,0 @@ -//===----------------------------------------------------------------------===// -// -// This source file is part of the Swift HTTP API Proposal open source project -// -// Copyright (c) 2025 Apple Inc. and the Swift HTTP API Proposal project authors -// Licensed under Apache License v2.0 -// -// See LICENSE.txt for license information -// -// SPDX-License-Identifier: Apache-2.0 -// -//===----------------------------------------------------------------------===// - -#if !canImport(Darwin) || swift(>=6.3) // Disabled on older compilers on Darwin due to a runtime crash -import AsyncStreaming -import Testing - -@Suite -struct AsyncWriterAsyncReaderTests { - @Test - @available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) - func writeReaderToWriter() async { - let reader = [1, 2, 3, 4, 5].asyncReader() - var writer = TestWriter() - - try! await writer.write(reader) - - #expect(writer.storage == [1, 2, 3, 4, 5]) - } - - @Test - @available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) - func writeEmptyReaderToWriter() async { - let reader = [Int]().asyncReader() - var writer = TestWriter() - - try! await writer.write(reader) - - #expect(writer.storage == []) - } - - @Test - @available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) - func writeLargeReaderToWriter() async { - let data = Array(1...100) - let reader = data.asyncReader() - var writer = TestWriter() - - try! await writer.write(reader) - - #expect(writer.storage == data) - } - - @Test - @available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) - func writeReaderStreamingBehavior() async { - // Create a reader that will produce multiple spans - struct ChunkedReader: AsyncReader { - typealias ReadElement = Int - typealias ReadFailure = Never - - var data: [Int] - var position: Int = 0 - let chunkSize: Int - - mutating func read( - maximumCount: Int?, - body: (consuming Span) async throws(Failure) -> Return - ) async throws(EitherError) -> Return { - do { - guard position < data.count else { - return try await body([Int]().span) - } - - let count = min(chunkSize, data.count - position) - let endIndex = position + count - defer { position = endIndex } - return try await body(data[position.. - public mutating func read( - maximumCount: Int?, - body: nonisolated(nonsending) (consuming Span) async throws(F) -> Return + public mutating func read( + body: nonisolated(nonsending) (inout UniqueArray) async throws(F) -> Return ) async throws(EitherError) -> Return { let element: Element? do { @@ -31,17 +32,13 @@ extension MultiProducerSingleConsumerAsyncChannel: AsyncReader { throw .first(error) } - do { - guard let element else { - return try await body(InlineArray<0, Element>.zero().span) - } + var buffer = UniqueArray() + if let element { + buffer.append(element) + } - return try await body( - InlineArray< - 1, - Element - >.one(value: element).span - ) + do { + return try await body(&buffer) } catch { throw .second(error) } @@ -52,28 +49,27 @@ extension MultiProducerSingleConsumerAsyncChannel: AsyncReader { extension MultiProducerSingleConsumerAsyncChannel.Source: AsyncWriter where Element == UInt8 { public typealias WriteElement = Element public typealias WriteFailure = any Error + public typealias Buffer = UniqueArray - public mutating func write( - _ body: nonisolated(nonsending) (inout OutputSpan) async throws(F) -> Result - ) async throws(EitherError) -> Result { - var buffer = RigidArray(capacity: 1) - let result: Result + public mutating func write( + _ body: nonisolated(nonsending) (inout UniqueArray) async throws(F) -> Return + ) async throws(EitherError) -> Return { + var buffer = UniqueArray() + let result: Return do { - result = try await buffer.append(count: 1) { outputSpan async throws(F) -> Result in - try await body(&outputSpan) - } + result = try await body(&buffer) } catch { throw .second(error) } - if buffer.count == 1 { + var consumer = buffer.consumeAll() + while let element = consumer.next() { do { - try await self.send(buffer.removeLast()) + try await self.send(element) } catch { throw .first(error) } } return result } - }