diff --git a/Sources/AsyncStreaming/AsyncReader/AsyncReader+collect.swift b/Sources/AsyncStreaming/AsyncReader/AsyncReader+collect.swift index cae536c3..3eaf9dce 100644 --- a/Sources/AsyncStreaming/AsyncReader/AsyncReader+collect.swift +++ b/Sources/AsyncStreaming/AsyncReader/AsyncReader+collect.swift @@ -27,9 +27,11 @@ extension AsyncReader where Self: ~Copyable, Self: ~Escapable, ReadElement: ~Cop /// Collects elements from the reader up to a specified limit and processes them. /// /// This method continuously reads elements from the async reader, accumulating them in an - /// internal buffer until either it reaches the end of the stream or the specified limit. + /// internal buffer until either the reader signals end-of-stream (by delivering a + /// non-`nil` ``AsyncReader/FinalElement``) or the specified limit is reached. /// Once collection completes, it passes the accumulated elements to the provided body - /// closure as an `InputSpan` for processing. + /// closure as an `InputSpan` for processing, and returns the body's result together + /// with the ``AsyncReader/FinalElement``. /// /// - Parameters: /// - limit: The maximum number of elements to collect. This prevents unbounded memory @@ -37,7 +39,8 @@ extension AsyncReader where Self: ~Copyable, Self: ~Escapable, ReadElement: ~Cop /// - body: A closure that receives an `InputSpan` containing all collected elements and returns /// a result of type `Result`. /// - /// - Returns: The value returned by the body closure after processing the collected elements. + /// - Returns: A tuple of the body closure's result and the ``AsyncReader/FinalElement`` + /// delivered with the terminal chunk. /// /// - Throws: An `EitherError` wrapping either a read failure (which itself may be an /// ``AsyncReaderLeftOverElementsError`` if the reader produces more elements than the limit), @@ -48,32 +51,38 @@ extension AsyncReader where Self: ~Copyable, Self: ~Escapable, ReadElement: ~Cop /// ```swift /// var reader: SomeAsyncReader = ... /// - /// let processedData = try await reader.collect(upTo: 1000) { span in + /// let (processedData, _) = try await reader.collect(upTo: 1000) { span in /// // Process all collected elements /// } /// ``` - public mutating func collect( + // TODO: We should make this method take an inout `RangeReplacableCollection` instead + public consuming func collect( upTo limit: Int, body: (consuming InputSpan) async throws(Failure) -> Result - ) async throws(EitherError, Failure>) -> Result { + ) async throws(EitherError, Failure>) -> ( + Result, FinalElement + ) { + var reader = self // TODO: In the future we might want to use a temporary allocation instead // but those don't support async closures yet. var collectedBuffer = UniqueArray() collectedBuffer.reserveCapacity(limit) - var shouldContinue = true + var finalElement: FinalElement? = nil do { - while shouldContinue { - try await self.read { (buffer: inout Buffer) throws(AsyncReaderLeftOverElementsError) -> Void in - guard buffer.count > 0 else { - shouldContinue = false - return - } - if limit - collectedBuffer.count < buffer.count { - throw AsyncReaderLeftOverElementsError() + while finalElement == nil { + try await reader.read { + (buffer: inout Buffer, final: FinalElement?) throws(AsyncReaderLeftOverElementsError) -> Void in + if buffer.count > 0 { + if limit - collectedBuffer.count < buffer.count { + throw AsyncReaderLeftOverElementsError() + } + var consumer = buffer.consumeAll() + while let element = consumer.next() { + collectedBuffer.append(element) + } } - var consumer = buffer.consumeAll() - while let element = consumer.next() { - collectedBuffer.append(element) + if let final { + finalElement = final } } } @@ -82,11 +91,36 @@ extension AsyncReader where Self: ~Copyable, Self: ~Escapable, ReadElement: ~Cop } do { var consumer = collectedBuffer.consumeAll() - return try await body(consumer.drainNext()) + let result = try await body(consumer.drainNext()) + // The force-unwrap is safe since final element must be set at this point + return (result, finalElement!) } catch { throw .second(error) } } } +@available(macOS 10.14.4, iOS 12.2, watchOS 5.2, tvOS 12.2, *) +extension AsyncReader where Self: ~Copyable, Self: ~Escapable, ReadElement: ~Copyable, FinalElement == Void { + /// Collects elements from the reader up to a specified limit and processes them. + /// + /// This overload is available when ``AsyncReader/FinalElement`` is `Void`. + /// It returns only the body closure's result — there is no payload to surface. + /// + /// - Parameters: + /// - limit: The maximum number of elements to collect. + /// - body: A closure that receives an `InputSpan` of collected elements. + /// - Returns: The body closure's result. + /// - Throws: An `EitherError` wrapping either a read failure (possibly an + /// ``AsyncReaderLeftOverElementsError``) or a `Failure` from `body`. + // TODO: We should make this method take an inout `RangeReplacableCollection` instead + public consuming func collect( + upTo limit: Int, + body: (consuming InputSpan) async throws(Failure) -> Result + ) async throws(EitherError, Failure>) -> Result { + let (result, _): (Result, Void?) = try await self.collect(upTo: limit, body: body) + return result + } +} + #endif diff --git a/Sources/AsyncStreaming/AsyncReader/AsyncReader+forEach.swift b/Sources/AsyncStreaming/AsyncReader/AsyncReader+forEach.swift index b8b20c5c..5364e048 100644 --- a/Sources/AsyncStreaming/AsyncReader/AsyncReader+forEach.swift +++ b/Sources/AsyncStreaming/AsyncReader/AsyncReader+forEach.swift @@ -11,87 +11,89 @@ #if UnstableAsyncStreaming && compiler(>=6.4) -public import ContainersPreview +import ContainersPreview // swift-format-ignore: AmbiguousTrailingClosureOverload @available(macOS 10.14.4, iOS 12.2, watchOS 5.2, tvOS 12.2, *) extension AsyncReader where Self: ~Copyable, Self: ~Escapable { - /// Iterates over all chunks from the reader, executing the provided body for each buffer. + /// Iterates over all chunks from the reader, executing the provided body for + /// each buffer until the stream signals end-of-stream. /// - /// This method continuously reads chunks from the async reader until the stream ends, - /// executing the provided closure for each buffer of elements read. The iteration terminates - /// when the reader produces an empty buffer, indicating the end of the stream. - /// - /// - Parameter body: An asynchronous closure that processes each buffer of elements read - /// from the stream. - /// - /// - Throws: An `EitherError` containing either a `ReadFailure` from the read operation - /// or a `Failure` from the body closure. + /// This method continuously reads chunks from the async reader, executing + /// `body` for every chunk — including the terminal one — and terminates the + /// loop when the reader delivers a non-`nil` ``AsyncReader/FinalElement``. + /// The returned value is that ``AsyncReader/FinalElement``. /// /// ## Example /// /// ```swift /// var fileReader: FileAsyncReader = ... /// - /// try await fileReader.forEachBuffer { buffer in + /// _ = try await fileReader.forEachBuffer { buffer in /// print("Processing \(buffer.count) elements") /// } /// ``` + /// + /// - Parameter body: An asynchronous closure that processes each buffer of + /// elements read from the stream. + /// - Returns: The ``AsyncReader/FinalElement`` delivered with the terminal + /// chunk, or `nil` if none was observed. + /// - Throws: An `EitherError` containing either a `ReadFailure` from the + /// read operation or a `Failure` from the body closure. public consuming func forEachBuffer( body: (inout Buffer) async throws(Failure) -> Void - ) async throws(EitherError) { - var shouldContinue = true - while shouldContinue { - try await self.read { (next) throws(Failure) -> Void in - guard next.count > 0 else { - shouldContinue = false - return - } - + ) async throws(EitherError) -> FinalElement? { + var final: FinalElement? = nil + var done = false + while !done { + try await self.read { (next, finalElement) throws(Failure) -> Void in try await body(&next) + if let finalElement { + final = finalElement + done = true + } } } + return final } - /// Iterates over all chunks from a non-failing reader, executing the provided body for each buffer. - /// - /// This method continuously reads chunks from the async reader until the stream ends, - /// executing the provided closure for each buffer of elements read. The iteration terminates - /// when the reader produces an empty buffer, indicating the end of the stream. + /// Iterates over all chunks from a non-failing reader, executing the + /// provided body for each buffer until the stream signals end-of-stream. /// /// Use this overload when the reader's ``AsyncReader/ReadFailure`` type is `Never`. /// - /// - Parameter body: An asynchronous closure that processes each buffer of elements read - /// from the stream. - /// /// ## Example /// /// ```swift /// var fileReader: FileAsyncReader = ... /// - /// await fileReader.forEachBuffer { buffer in + /// _ = await fileReader.forEachBuffer { buffer in /// print("Processing \(buffer.count) elements") /// } /// ``` + /// + /// - Parameter body: An asynchronous closure that processes each buffer of + /// elements read from the stream. + /// - Returns: The ``AsyncReader/FinalElement`` delivered with the terminal chunk. @inlinable public consuming func forEachBuffer( body: (inout Buffer) async -> Void - ) async where ReadFailure == Never { - var shouldContinue = true - while shouldContinue { + ) async -> FinalElement where ReadFailure == Never { + var finalElement: FinalElement? = nil + while finalElement == nil { do { - try await self.read { (next) -> Void in - guard next.count > 0 else { - shouldContinue = false - return - } - + try await self.read { (next, final) -> Void in await body(&next) + if let final { + finalElement = final + } } } catch { fatalError() } } + // The force-unwrap is safe since final element must be set at this point + return finalElement! } } #endif diff --git a/Sources/AsyncStreaming/AsyncReader/AsyncReader+pipe.swift b/Sources/AsyncStreaming/AsyncReader/AsyncReader+pipe.swift index 71043873..3b283d62 100644 --- a/Sources/AsyncStreaming/AsyncReader/AsyncReader+pipe.swift +++ b/Sources/AsyncStreaming/AsyncReader/AsyncReader+pipe.swift @@ -13,92 +13,150 @@ import ContainersPreview -@available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) +// TODO: The `Writer` generic parameter on every `pipe` variant in this file +// should additionally be constrained `& ~Escapable`. We currently can't express +// that because of a Swift lifetime-checker limitation: with `FinalElement: +// ~Copyable`, the `consuming FinalElement?` parameter on the read closure +// changes the closure's lifetime category, and capturing a `~Escapable Writer` +// inside that closure (which `pipe` does, via the `writerOpt` Optional that +// alternates between `write` and `finish` calls) trips +// "lifetime-dependent variable 'writer' escapes its scope". When that +// restriction is relaxed (or `pipe` is restructured to avoid capturing the +// writer across the read closure boundary) the constraint should be added +// back so `pipe` works for `~Escapable` writers too. + +@available(macOS 10.14.4, iOS 12.2, watchOS 5.2, tvOS 12.2, *) extension AsyncReader where Self: ~Copyable, Self: ~Escapable, Self.ReadElement: ~Copyable { - /// Pipes all elements from this reader into the given writer. + /// Pipes all elements from this reader into the given writer, fusing the + /// terminal chunk with a `finish` on the writer. /// - /// This method consumes the reader and writes all of its elements into the writer's - /// destination. It iterates over each buffer the reader produces and hands it to the - /// writer until this reader's stream ends. + /// Consumes both the reader and the writer. Each chunk the reader produces is + /// forwarded with `writer.write(buffer:)` until the reader signals + /// end-of-stream by delivering a non-`nil` ``AsyncReader/FinalElement``. The + /// terminal chunk is fused with the writer's ``CallerAsyncWriter/finish(buffer:finalElement:)``. /// /// ## Example /// /// ```swift /// let dataReader: DataAsyncReader = ... - /// var fileWriter: FileCallerAsyncWriter = ... + /// let fileWriter: FileCallerAsyncWriter = ... /// - /// // Copy all data from reader to writer - /// try await dataReader.pipe(into: &fileWriter) + /// // Copy all data from reader to writer and finish the writer. + /// try await dataReader.pipe(into: fileWriter) /// ``` /// - /// - Parameter writer: A ``CallerAsyncWriter`` to receive the elements. The writer is - /// mutated in place and remains usable after this operation. + /// - Parameter writer: A ``CallerAsyncWriter`` to receive the elements. The + /// writer is consumed; its ``CallerAsyncWriter/finish(buffer:finalElement:)`` + /// method is called with the reader's terminal chunk and ``AsyncReader/FinalElement``. /// /// - Throws: An error originating from the read or write operations. public consuming func pipe( - into writer: inout Writer + into writer: consuming Writer ) async throws(EitherError) - where Writer: CallerAsyncWriter & ~Copyable & ~Escapable, Writer.WriteElement == ReadElement { - try await self.forEachBuffer { (buffer: inout Buffer) throws(Writer.WriteFailure) in - try await writer.write(buffer: &buffer) + where + Writer: CallerAsyncWriter & ~Copyable, + Writer.WriteElement == ReadElement, + Writer.FinalElement == FinalElement + { + var reader = self + var writerOpt: Writer? = .some(writer) + var done = false + while !done { + try await reader.read { (buffer: inout Buffer, finalElement: FinalElement?) throws(Writer.WriteFailure) -> Void in + if let finalElement { + let w = writerOpt.take()! + try await w.finish(buffer: &buffer, finalElement: finalElement) + done = true + } else { + try await writerOpt!.write(buffer: &buffer) + } + } } } - /// Pipes all elements from this reader into the given writer, copying each element from the reader's - /// buffer into the writer's buffer. + /// Pipes all elements from this reader into the given writer, copying each + /// element from the reader's buffer into the writer's buffer. + /// + /// Consumes both the reader and the writer. Because both protocols supply + /// their own buffer, each element is transferred between them. The writer's + /// buffer may be smaller than the reader's, in which case multiple `write` + /// calls are issued per chunk produced by the reader. /// - /// This method consumes the reader and writes all of its elements into the writer's - /// destination. Because both protocols supply their own buffer, each element must be - /// transferred from the reader's buffer into the writer's buffer. The writer's buffer - /// may be smaller than the reader's, in which case multiple `write` calls are issued - /// per chunk produced by the reader. + /// On the terminal chunk this method drains all bytes through `write` calls + /// first and then calls ``AsyncWriter/finish(finalElement:)`` carrying + /// only the ``AsyncReader/FinalElement`` payload. /// /// ## Example /// /// ```swift /// let dataReader: DataAsyncReader = ... - /// var fileWriter: FileAsyncWriter = ... + /// let fileWriter: FileAsyncWriter = ... /// - /// // Copy all data from reader to writer - /// try await dataReader.pipe(copyingInto: &fileWriter) + /// try await dataReader.pipe(copyingInto: fileWriter) /// ``` /// - /// - Parameter writer: An ``AsyncWriter`` to receive the elements. The writer is - /// mutated in place and remains usable after this operation. + /// - Parameter writer: An ``AsyncWriter`` to receive the elements. The + /// writer is consumed; its ``AsyncWriter/finish(finalElement:)`` method + /// is called with the reader's ``AsyncReader/FinalElement`` payload. /// /// - Throws: An error originating from the read or write operations. public consuming func pipe( - copyingInto writer: inout Writer + copyingInto writer: consuming Writer ) async throws(EitherError) - where Writer: AsyncWriter & ~Copyable & ~Escapable, Writer.WriteElement == ReadElement { - try await self.forEachBuffer { (readerBuffer: inout Buffer) throws(Writer.WriteFailure) in - var consumer = readerBuffer.consumeAll() - while let firstElement = consumer.next() { - var pending: ReadElement? = firstElement - do throws(EitherError) { - try await writer.write { (writerBuffer: inout Writer.Buffer) in - switch consume pending { - case .some(let element): - writerBuffer.append(element) - case .none: - break - } - pending = nil - // TODO: We should check if we can use one of the append methods instead of - // element by element copies in the future - while writerBuffer.freeCapacity > 0 { - guard let element = consumer.next() else { return } - writerBuffer.append(element) - } + where + Writer: AsyncWriter & ~Copyable, + Writer.WriteElement == ReadElement, + Writer.FinalElement == FinalElement + { + var reader = self + var writerOpt: Writer? = .some(writer) + var done = false + while !done { + try await reader.read { + (readerBuffer: inout Buffer, finalElement: FinalElement?) throws(Writer.WriteFailure) -> Void in + try await Self.drain(readerBuffer: &readerBuffer, into: &writerOpt!) + if let finalElement { + let w = writerOpt.take()! + try await w.finish(finalElement: finalElement) + done = true + } + } + } + } + + /// Drains `readerBuffer` into `writer` across as many `write` calls as + /// required to move every element. Used by ``pipe(copyingInto:)`` to share + /// the multi-write loop between mid-stream and terminal chunks. + private static func drain( + readerBuffer: inout Buffer, + into writer: inout Writer + ) async throws(Writer.WriteFailure) + where + Writer: AsyncWriter & ~Copyable, + Writer.WriteElement == ReadElement + { + var consumer = readerBuffer.consumeAll() + while let firstElement = consumer.next() { + var pending: ReadElement? = firstElement + do throws(EitherError) { + try await writer.write { (writerBuffer: inout Writer.Buffer) in + switch consume pending { + case .some(let element): + writerBuffer.append(element) + case .none: + break } - } catch { - switch error { - case .first(let writeFailure): - throw writeFailure - case .second: - fatalError("Unreachable") + pending = nil + while writerBuffer.freeCapacity > 0 { + guard let element = consumer.next() else { return } + writerBuffer.append(element) } } + } catch { + switch error { + case .first(let writeFailure): throw writeFailure + case .second: fatalError("Unreachable") + } } } } diff --git a/Sources/AsyncStreaming/AsyncReader/AsyncReader.swift b/Sources/AsyncStreaming/AsyncReader/AsyncReader.swift index 6634ecf8..ceabc60f 100644 --- a/Sources/AsyncStreaming/AsyncReader/AsyncReader.swift +++ b/Sources/AsyncStreaming/AsyncReader/AsyncReader.swift @@ -17,42 +17,74 @@ public import ContainersPreview /// Adopt ``AsyncReader`` when you need callee-managed buffering, /// where the reader controls the buffer and passes it to the caller /// through the `body` closure. +/// +/// ## Signaling end of stream +/// +/// The reader signals end-of-stream by passing a non-`nil` value for the +/// `finalElement` parameter of the `body` closure. This call may also carry a +/// final chunk of elements in the buffer, allowing the reader to fuse the last +/// chunk with the end signal in a single operation. +/// +/// The ``FinalElement`` associated type controls what data, if any, the reader +/// delivers alongside the end signal. The default is `Void`, which means the +/// signal carries no payload. Set ``FinalElement`` to a custom type when the +/// reader needs to deliver structured data with the terminator. Set it to `Never` to indicate +/// that the stream never ends — the `finalElement` parameter will always be `nil`. +/// +/// After the reader has emitted a non-`nil` `finalElement`, calling +/// ``read(body:)`` again is a programmer error. @available(macOS 10.14.4, iOS 12.2, watchOS 5.2, tvOS 12.2, *) -public protocol AsyncReader: ~Copyable, ~Escapable { +public protocol AsyncReader: ~Copyable, ~Escapable { /// The type of elements this reader reads. + // TODO: Check if we should support ~Escapable elements associatedtype ReadElement: ~Copyable /// The container type the reader uses to pass elements to the caller. + // TODO: Check if we should support ~Escapable buffer associatedtype Buffer: RangeReplaceableContainer & ~Copyable /// The error type that reading operations throw. associatedtype ReadFailure: Error + /// The data the reader delivers alongside the end-of-stream signal. + /// + /// Defaults to `Void`. Use a custom type to carry data along with the + /// end signal. Use `Never` for streams that never end. + // TODO: Check if we should support ~Escapable final elements + associatedtype FinalElement: ~Copyable = Void + /// Reads elements from the underlying source and passes them to the provided body closure. /// /// This method asynchronously reads elements from the source into a buffer, - /// then passes the buffer to `body` for processing. When the buffer is empty, - /// the stream has ended. + /// then passes the buffer and an optional `finalElement` to `body` for + /// processing. + /// + /// A `nil` value for `finalElement` means more data may follow. A non-`nil` + /// value (which is the only way a stream of `FinalElement == Void` signals + /// end) marks this chunk as the last one and delivers the final payload. + /// The terminal chunk's buffer may be empty or contain a final batch of + /// elements; the caller must process both. + /// + /// After the reader has emitted a non-`nil` `finalElement`, calling + /// ``read(body:)`` again is a programmer error. /// /// ```swift /// var fileReader: FileAsyncReader = ... /// - /// let result = try await fileReader.read { buffer in - /// guard buffer.count > 0 else { - /// return 0 - /// } - /// return buffer.count + /// let result = try await fileReader.read { buffer, finalElement in + /// let processed = buffer.count + /// return (processed, finalElement != nil) /// } /// ``` /// /// - Parameter body: A closure that receives a mutable reference to the buffer - /// of read elements and returns a value of type `Return`. When the buffer - /// is empty, it indicates the end of the stream. + /// of read elements together with the optional end-of-stream payload and + /// returns a value of type `Return`. /// - Returns: The value the body closure returns after processing the read elements. /// - Throws: An `EitherError` containing either a `ReadFailure` from the read operation /// or a `Failure` from the body closure. mutating func read( - body: (inout Buffer) async throws(Failure) -> Return + body: (inout Buffer, consuming FinalElement?) async throws(Failure) -> Return ) async throws(EitherError) -> Return } #endif diff --git a/Sources/AsyncStreaming/AsyncWriter/AsyncWriter.swift b/Sources/AsyncStreaming/AsyncWriter/AsyncWriter.swift index 3f59a23c..b4cf3668 100644 --- a/Sources/AsyncStreaming/AsyncWriter/AsyncWriter.swift +++ b/Sources/AsyncStreaming/AsyncWriter/AsyncWriter.swift @@ -17,31 +17,48 @@ public import ContainersPreview /// Adopt ``AsyncWriter`` when you need callee-managed buffering, /// where the writer supplies a buffer that the caller fills /// with elements to write. +/// +/// ## Signaling end of stream +/// +/// The writer is terminated by a call to ``finish(finalElement:)``. +/// Bulk transfer happens through ``write(_:)`` calls; ``finish(finalElement:)`` +/// only carries the ``FinalElement`` payload. +/// +/// The ``FinalElement`` associated type controls what data, if any, the writer +/// transmits alongside the end signal. The default is `Void`. Use a custom +/// type to carry data along with the end signal, or `Never` for endless +/// streams. When ``FinalElement`` is `Never`, ``finish(finalElement:)`` cannot +/// be called and the writer can be written to indefinitely. @available(macOS 10.14.4, iOS 12.2, watchOS 5.2, tvOS 12.2, *) -public protocol AsyncWriter: ~Copyable, ~Escapable { +public protocol AsyncWriter: ~Copyable, ~Escapable { /// The type of elements this writer writes. + // TODO: Check if we should support ~Escapable elements associatedtype WriteElement: ~Copyable /// The container type the writer uses to receive elements from the caller. + // TODO: Check if we should support ~Escapable buffer associatedtype Buffer: RangeReplaceableContainer & ~Copyable /// The error type that writing operations throw. associatedtype WriteFailure: Error + /// The data the writer delivers alongside the end-of-stream signal. + /// + /// Defaults to `Void`. Use a custom type to carry data along with the end + /// signal. + // TODO: Check if we should support ~Escapable final element + associatedtype FinalElement: ~Copyable = Void + /// Provides a buffer for writing elements to the destination. /// - /// The writer supplies a buffer that `body` uses to append elements. - /// The writer manages the buffer allocation and handles the writing - /// operation once `body` completes. + /// The writer supplies a buffer, sized by the implementation, that + /// `body` uses to append elements. The writer manages the buffer + /// allocation and handles the writing operation once `body` completes. + /// Oversized payloads are split across multiple calls. /// /// - Parameter body: A closure that receives a buffer for appending elements /// to write. The closure returns a result of type `Return`. /// - /// - Returns: The value the body closure returns. - /// - /// - Throws: An `EitherError` containing either a `WriteFailure` from the write operation - /// or a `Failure` from the body closure. - /// /// ## Example /// /// ```swift @@ -54,8 +71,34 @@ public protocol AsyncWriter: ~Copyable, ~Escapable { /// return buffer.count /// } /// ``` + /// + /// - Returns: The value the body closure returns. + /// + /// - Throws: An `EitherError` containing either a `WriteFailure` from the write operation + /// or a `Failure` from the body closure. mutating func write( _ body: (inout Buffer) async throws(Failure) -> Return ) async throws(EitherError) -> Return + + /// Closes the writer, delivering a ``FinalElement`` payload alongside the + /// end-of-stream signal. + /// + /// - Parameter finalElement: The ``FinalElement`` payload to deliver with + /// the end signal. + /// - Throws: A ``WriteFailure`` from the underlying write operation. + consuming func finish( + finalElement: consuming FinalElement + ) async throws(WriteFailure) +} + +@available(macOS 10.14.4, iOS 12.2, watchOS 5.2, tvOS 12.2, *) +extension AsyncWriter where Self: ~Copyable, Self: ~Escapable, FinalElement == Void { + /// Concludes the writer with no payload. + /// + /// Available only when ``FinalElement`` is `Void`. Equivalent to calling + /// ``finish(finalElement:)`` with `()`. + public consuming func finish() async throws(WriteFailure) { + try await self.finish(finalElement: ()) + } } #endif diff --git a/Sources/AsyncStreaming/CallerAsyncReader/CallerAsyncReader+pipe.swift b/Sources/AsyncStreaming/CallerAsyncReader/CallerAsyncReader+pipe.swift index 59f7caad..6bc17a6e 100644 --- a/Sources/AsyncStreaming/CallerAsyncReader/CallerAsyncReader+pipe.swift +++ b/Sources/AsyncStreaming/CallerAsyncReader/CallerAsyncReader+pipe.swift @@ -14,88 +14,124 @@ import ContainersPreview import BasicContainers -@available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) +// TODO: The `Writer` generic parameter on every `pipe` variant in this file +// should additionally be constrained `& ~Escapable`. We currently can't +// express that because of a Swift lifetime-checker limitation: with +// `FinalElement: ~Copyable`, the `consuming FinalElement?` parameter on the +// reader's `read` closure changes the closure's lifetime category, and +// capturing a `~Escapable Writer` inside that closure (which `pipe` does, via +// the `writerOpt` Optional that alternates between `write` and `finish` +// calls) trips "lifetime-dependent variable 'writer' escapes its scope". +// When that restriction is relaxed (or `pipe` is restructured to avoid +// capturing the writer across the closure boundary) the constraint should be +// added back so `pipe` works for `~Escapable` writers too. + +@available(macOS 10.14.4, iOS 12.2, watchOS 5.2, tvOS 12.2, *) extension CallerAsyncReader where Self: ~Copyable, Self: ~Escapable, Self.ReadElement: ~Copyable { - /// Pipes all elements from this reader into the given writer. + /// Pipes all elements from this reader into the given writer, then signals + /// end-of-stream with a `finish` call on the writer. /// - /// This method consumes the reader and writes all of its elements into the writer's - /// destination. It continuously reads chunks into buffers supplied by the writer and - /// flushes them until this reader's stream ends. + /// Consumes both the reader and the writer. The reader fills the writer's + /// buffer on each iteration; once the reader signals end-of-stream, the + /// writer's ``AsyncWriter/finish(finalElement:)`` is called with the reader's + /// ``CallerAsyncReader/FinalElement``. /// /// ## Example /// /// ```swift /// let dataReader: DataCallerAsyncReader = ... - /// var fileWriter: FileAsyncWriter = ... + /// let fileWriter: FileAsyncWriter = ... /// - /// // Copy all data from reader to writer - /// try await dataReader.pipe(into: &fileWriter) + /// try await dataReader.pipe(into: fileWriter) /// ``` /// - /// - Parameter writer: An ``AsyncWriter`` to receive the elements. The writer is mutated - /// in place and remains usable after this operation. + /// - Parameter writer: An ``AsyncWriter`` to receive the elements. The + /// writer is consumed. /// /// - Throws: An error originating from the read or write operations. public consuming func pipe( - into writer: inout Writer + into writer: consuming Writer ) async throws(EitherError) - where Writer: AsyncWriter & ~Copyable & ~Escapable, Writer.WriteElement == ReadElement { - var shouldContinue = true - while shouldContinue { - try await writer - .write { (buffer: inout Writer.Buffer) throws(ReadFailure) in - try await self.read(into: &buffer) - if buffer.count == 0 { - shouldContinue = false - } + where + Writer: AsyncWriter & ~Copyable, + Writer.WriteElement == ReadElement, + Writer.FinalElement == FinalElement + { + var reader = self + var writerOpt: Writer? = .some(writer) + var done = false + while !done { + var pendingFinal: FinalElement? = nil + try await writerOpt! + .write { (buffer: inout Writer.Buffer) throws(ReadFailure) -> Void in + pendingFinal = try await reader.read(into: &buffer) } + if let final = pendingFinal { + let w = writerOpt.take()! + do throws(Writer.WriteFailure) { + try await w.finish(finalElement: final) + } catch { + throw .first(error) + } + done = true + } } } /// Pipes all elements from this reader into the given writer through an intermediate buffer. /// - /// This method consumes the reader and writes all of its elements into the writer's - /// destination. Because neither the reader nor the writer supplies a buffer, this - /// method allocates an intermediate buffer of the requested capacity and reuses it - /// across iterations: each iteration fills the buffer from the reader and then hands - /// it to the writer to drain. + /// Consumes both the reader and the writer. Because neither protocol supplies + /// a buffer, this method allocates an intermediate buffer of the requested + /// capacity and reuses it across iterations. The terminal chunk is fused + /// with the writer's ``CallerAsyncWriter/finish(buffer:finalElement:)``. /// /// ## Example /// /// ```swift /// let dataReader: DataCallerAsyncReader = ... - /// var fileWriter: FileCallerAsyncWriter = ... + /// let fileWriter: FileCallerAsyncWriter = ... /// - /// // Copy all data from reader to writer using a 4 KB intermediate buffer - /// try await dataReader.pipe(bufferingInto: &fileWriter, intermediateCapacity: 4096) + /// try await dataReader.pipe(bufferingInto: fileWriter, intermediateCapacity: 4096) /// ``` /// /// - Parameters: - /// - writer: A ``CallerAsyncWriter`` to receive the elements. The writer is mutated - /// in place and remains usable after this operation. - /// - intermediateCapacity: The capacity of the intermediate buffer that mediates - /// between the reader and writer. Larger values reduce the number of read and - /// write calls at the cost of memory. + /// - writer: A ``CallerAsyncWriter`` to receive the elements. The writer + /// is consumed. + /// - intermediateCapacity: The capacity of the intermediate buffer that + /// mediates between the reader and writer. /// /// - Throws: An error originating from the read or write operations. public consuming func pipe( - bufferingInto writer: inout Writer, + bufferingInto writer: consuming Writer, intermediateCapacity: Int ) async throws(EitherError) - where Writer: CallerAsyncWriter & ~Copyable & ~Escapable, Writer.WriteElement == ReadElement { + where + Writer: CallerAsyncWriter & ~Copyable, + Writer.WriteElement == ReadElement, + Writer.FinalElement == FinalElement + { + var reader = self + var writerOpt: Writer? = .some(writer) var buffer = UniqueArray(minimumCapacity: intermediateCapacity) - var shouldContinue = true - while shouldContinue { + var done = false + while !done { + let final: FinalElement? do throws(ReadFailure) { - try await self.read(into: &buffer) + final = try await reader.read(into: &buffer) } catch { throw .first(error) } - if buffer.count == 0 { - shouldContinue = false - } else { + if let final { + let w = writerOpt.take()! + do throws(Writer.WriteFailure) { + try await w.finish(buffer: &buffer, finalElement: final) + } catch { + throw .second(error) + } + done = true + } else if buffer.count > 0 { do throws(Writer.WriteFailure) { - try await writer.write(buffer: &buffer) + try await writerOpt!.write(buffer: &buffer) assert(buffer.count == 0, "CallerAsyncWriter must drain the buffer during write(buffer:)") } catch { throw .second(error) diff --git a/Sources/AsyncStreaming/CallerAsyncReader/CallerAsyncReader.swift b/Sources/AsyncStreaming/CallerAsyncReader/CallerAsyncReader.swift index 4521ef7b..51b3aa66 100644 --- a/Sources/AsyncStreaming/CallerAsyncReader/CallerAsyncReader.swift +++ b/Sources/AsyncStreaming/CallerAsyncReader/CallerAsyncReader.swift @@ -17,23 +17,53 @@ public import ContainersPreview /// Adopt ``CallerAsyncReader`` when you need caller-managed buffering, /// where the caller supplies a buffer that the reader fills /// with elements. +/// +/// ## Signaling end of stream +/// +/// The reader signals end-of-stream by returning a non-`nil` ``FinalElement`` +/// from ``read(into:)``. The same call may also append a final batch of +/// elements to the caller's buffer, allowing the reader to fuse the last +/// chunk with the end signal. +/// +/// The ``FinalElement`` associated type controls what data, if any, the reader +/// delivers alongside the end signal. The default is `Void`. Use a custom type +/// to carry data along with the end signal, or `Never` for streams that never +/// end. +/// +/// After the reader has returned a non-`nil` `FinalElement`, calling +/// ``read(into:)`` again is a programmer error. @available(macOS 10.14.4, iOS 12.2, watchOS 5.2, tvOS 12.2, *) -public protocol CallerAsyncReader: ~Copyable, ~Escapable { +public protocol CallerAsyncReader: ~Copyable, ~Escapable { /// The type of elements this reader reads. + // TODO: Check if we should support ~Escapable elements associatedtype ReadElement: ~Copyable /// The error type that reading operations throw. associatedtype ReadFailure: Error + /// The data the reader delivers alongside the end-of-stream signal. + /// + /// Defaults to `Void`. Use a custom type to carry data along with the end + /// signal, or `Never` for streams that never end. + // TODO: Check if we should support ~Escapable final element + associatedtype FinalElement: ~Copyable = Void + /// Reads elements from the source into the provided buffer. /// - /// This method appends elements into `buffer`. When the read operation - /// reaches the end of the source, it appends no elements. + /// This method appends elements into `buffer`. A non-`nil` return value + /// signals end-of-stream and delivers the final payload. The call may + /// also append a final batch of elements before signaling end. + /// + /// After the reader has returned a non-`nil` `FinalElement`, calling + /// ``read(into:)`` again is a programmer error. /// /// - Parameter buffer: The buffer to fill with read elements. + /// - Returns: A non-`nil` ``FinalElement`` if this call delivered the + /// end-of-stream signal; `nil` if more elements may follow. /// - Throws: A `ReadFailure` from the underlying read operation. + // TODO: Check if we should support ~Escapable buffer mutating func read & ~Copyable>( into buffer: inout Buffer - ) async throws(ReadFailure) where Buffer.Element: ~Copyable + ) async throws(ReadFailure) -> FinalElement? where Buffer.Element: ~Copyable } #endif diff --git a/Sources/AsyncStreaming/CallerAsyncWriter/CallerAsyncWriter.swift b/Sources/AsyncStreaming/CallerAsyncWriter/CallerAsyncWriter.swift index d894d350..9b4ec269 100644 --- a/Sources/AsyncStreaming/CallerAsyncWriter/CallerAsyncWriter.swift +++ b/Sources/AsyncStreaming/CallerAsyncWriter/CallerAsyncWriter.swift @@ -11,28 +11,49 @@ #if UnstableAsyncStreaming && compiler(>=6.4) public import ContainersPreview +import BasicContainers /// Writes elements asynchronously from a caller-provided buffer. /// /// Adopt ``CallerAsyncWriter`` when you need caller-managed buffering, /// where the caller provides a buffer of elements for the writer /// to consume. +/// +/// ## Signaling end of stream +/// +/// The writer is terminated by a call to ``finish(buffer:finalElement:)``. +/// The `finish` call communicates a final buffer (if any) and the +/// ``FinalElement`` payload, allowing implementations to fuse the last data +/// frame with the end signal on transports that support it. +/// +/// The ``FinalElement`` associated type controls what data, if any, the writer +/// transmits alongside the end signal. The default is `Void`. Use a custom +/// type to carry data along with the end signal, or `Never` for endless +/// streams. When ``FinalElement`` is `Never`, ``finish(buffer:finalElement:)`` +/// cannot be called and the writer can be written to indefinitely. +/// +/// Conformers must accept zero, one, or many `write(buffer:)` calls, optionally +/// followed by a single `finish(buffer:finalElement:)` call. After `finish` +/// returns, the writer is consumed and no further calls are valid. @available(macOS 10.14.4, iOS 12.2, watchOS 5.2, tvOS 12.2, *) -public protocol CallerAsyncWriter: ~Copyable, ~Escapable { +public protocol CallerAsyncWriter: ~Copyable, ~Escapable { /// The type of elements this writer writes. + // TODO: Check if we should support ~Escapable elements associatedtype WriteElement: ~Copyable /// The error type that writing operations throw. associatedtype WriteFailure: Error - /// Writes elements from the provided buffer to the underlying destination. - /// - /// This method asynchronously writes all elements from the provided buffer to the destination - /// the writer represents. + /// The data the writer delivers alongside the end-of-stream signal. /// - /// - Parameter buffer: The buffer of elements to write. + /// Defaults to `Void`. + // TODO: Check if we should support ~Escapable final element + associatedtype FinalElement: ~Copyable = Void + + /// Writes elements from the provided buffer to the underlying destination. /// - /// - Throws: A `WriteFailure` from the underlying write operation. + /// This method asynchronously writes all elements from the provided buffer + /// to the underlying destination. /// /// ## Example /// @@ -42,8 +63,42 @@ public protocol CallerAsyncWriter: ~Copyable, ~Escap /// /// try await fileWriter.write(buffer: &data) /// ``` + /// + /// - Parameter buffer: The buffer of elements to write. + /// + /// - Throws: A `WriteFailure` from the underlying write operation. mutating func write & ~Copyable>( buffer: inout Buffer ) async throws(WriteFailure) where Buffer.Element: ~Copyable + + /// Sends the final buffer and ``FinalElement`` payload, and signals + /// end-of-stream to the destination. + /// + /// The buffer may be empty if there is no remaining content to emit + /// alongside the terminator. When ``FinalElement`` is `Void`, use the + /// closure-less ``finish()`` convenience instead of passing `()` explicitly. + /// + /// - Parameters: + /// - buffer: The buffer of remaining elements to write alongside the + /// terminator. + /// - finalElement: The ``FinalElement`` payload to deliver with the end + /// signal. + /// - Throws: A `WriteFailure` from the underlying write operation. + consuming func finish & ~Copyable>( + buffer: inout Buffer, + finalElement: consuming FinalElement + ) async throws(WriteFailure) where Buffer.Element: ~Copyable +} + +@available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) +extension CallerAsyncWriter where Self: ~Copyable, Self: ~Escapable, FinalElement == Void { + /// Concludes the writer with no final buffer and no extra payload. + /// + /// Available only when ``FinalElement`` is `Void`. Equivalent to calling + /// ``finish(buffer:finalElement:)`` with an empty buffer and `()`. + public consuming func finish() async throws(WriteFailure) { + var empty = UniqueArray() + try await self.finish(buffer: &empty, finalElement: ()) + } } #endif diff --git a/Sources/AsyncStreaming/NNNN-async-streaming.md b/Sources/AsyncStreaming/NNNN-async-streaming.md index aa7cabb0..cc3c3c0b 100644 --- a/Sources/AsyncStreaming/NNNN-async-streaming.md +++ b/Sources/AsyncStreaming/NNNN-async-streaming.md @@ -13,9 +13,10 @@ Introduces four protocols for asynchronous streaming with caller- and callee-managed buffer ownership for both reading and writing. Supports -noncopyable types, bulk/chunked access, and bidirectional streaming while -maintaining structured concurrency. Provides bridging extensions between -`AsyncReader` and `AsyncSequence`. +noncopyable types, bulk/chunked access, fused end-of-stream signaling with an +optional final-element payload, and bidirectional streaming while maintaining +structured concurrency. Provides bridging extensions between `AsyncReader` and +`AsyncSequence`. ## Motivation @@ -276,29 +277,76 @@ The callee-owned reader controls the buffer and passes a mutable reference to it through a scoped closure. This is the preferred protocol for read streams. ```swift -public protocol AsyncReader: ~Copyable, ~Escapable { - /// The type of elements that can be read. - associatedtype ReadElement: ~Copyable +/// Reads elements asynchronously from a source using callee-managed buffering. +/// +/// Adopt ``AsyncReader`` when you need callee-managed buffering, +/// where the reader controls the buffer and passes it to the caller +/// through the `body` closure. +/// +/// ## Signaling end of stream +/// +/// The reader signals end-of-stream by passing a non-`nil` value for the +/// `finalElement` parameter of the `body` closure. This call may also carry a +/// final chunk of elements in the buffer, allowing the reader to fuse the last +/// chunk with the end signal in a single operation. +/// +/// The ``FinalElement`` associated type controls what data, if any, the reader +/// delivers alongside the end signal. The default is `Void`, which means the +/// signal carries no payload. Set ``FinalElement`` to a custom type when the +/// reader needs to deliver structured data with the terminator. Set it to `Never` to indicate +/// that the stream never ends — the `finalElement` parameter will always be `nil`. +/// +/// After the reader has emitted a non-`nil` `finalElement`, calling +/// ``read(body:)`` again is a programmer error. +public protocol AsyncReader: ~Copyable, ~Escapable { + /// The type of elements this reader reads. + associatedtype ReadElement: ~Copyable - /// The container type the reader uses to pass elements to the caller. - associatedtype Buffer: RangeReplaceableContainer & ~Copyable + /// The container type the reader uses to pass elements to the caller. + associatedtype Buffer: RangeReplaceableContainer & ~Copyable - /// The type of error thrown during reading. - associatedtype ReadFailure: Error + /// The error type that reading operations throw. + associatedtype ReadFailure: Error - /// Reads elements from the source and passes them to the body closure. - /// - /// The reader fills an internal buffer from its source and passes a mutable - /// reference to it to `body`. When the buffer is empty, the stream - /// has ended. - /// - /// - Parameter body: A closure that processes the read elements. - /// - Returns: The value returned by the body closure. - /// - Throws: An `EitherError` containing either a `ReadFailure` from the read - /// operation or a `Failure` from the body closure. - mutating func read( - body: (inout Buffer) async throws(Failure) -> Return - ) async throws(EitherError) -> Return + /// The data the reader delivers alongside the end-of-stream signal. + /// + /// Defaults to `Void`. Use a custom type to carry data along with the + /// end signal. Use `Never` for streams that never end. + associatedtype FinalElement: ~Copyable = Void + + /// Reads elements from the underlying source and passes them to the provided body closure. + /// + /// This method asynchronously reads elements from the source into a buffer, + /// then passes the buffer and an optional `finalElement` to `body` for + /// processing. + /// + /// A `nil` value for `finalElement` means more data may follow. A non-`nil` + /// value (which is the only way a stream of `FinalElement == Void` signals + /// end) marks this chunk as the last one and delivers the final payload. + /// The terminal chunk's buffer may be empty or contain a final batch of + /// elements; the caller must process both. + /// + /// After the reader has emitted a non-`nil` `finalElement`, calling + /// ``read(body:)`` again is a programmer error. + /// + /// ```swift + /// var fileReader: FileAsyncReader = ... + /// + /// let result = try await fileReader.read { buffer, finalElement in + /// let processed = buffer.count + /// return (processed, finalElement != nil) + /// } + /// ``` + /// + /// - Parameter body: A closure that receives a mutable reference to the buffer + /// of read elements together with the optional end-of-stream payload and + /// returns a value of type `Return`. + /// - Returns: The value the body closure returns after processing the read elements. + /// - Throws: An `EitherError` containing either a `ReadFailure` from the read operation + /// or a `Failure` from the body closure. + mutating func read( + body: (inout Buffer, consuming FinalElement?) async throws(Failure) -> Return + ) async throws(EitherError) -> Return } ``` @@ -308,23 +356,56 @@ The caller provides a buffer that the reader fills with elements from the source. ```swift -public protocol CallerAsyncReader: ~Copyable, ~Escapable { - /// The type of elements that can be read. - associatedtype ReadElement: ~Copyable +/// Reads elements asynchronously into a caller-provided buffer. +/// +/// Adopt ``CallerAsyncReader`` when you need caller-managed buffering, +/// where the caller supplies a buffer that the reader fills +/// with elements. +/// +/// ## Signaling end of stream +/// +/// The reader signals end-of-stream by returning a non-`nil` ``FinalElement`` +/// from ``read(into:)``. The same call may also append a final batch of +/// elements to the caller's buffer, allowing the reader to fuse the last +/// chunk with the end signal. +/// +/// The ``FinalElement`` associated type controls what data, if any, the reader +/// delivers alongside the end signal. The default is `Void`. Use a custom type +/// to carry data along with the end signal, or `Never` for streams that never +/// end. +/// +/// After the reader has returned a non-`nil` `FinalElement`, calling +/// ``read(into:)`` again is a programmer error. +public protocol CallerAsyncReader: ~Copyable, ~Escapable { + /// The type of elements this reader reads. + associatedtype ReadElement: ~Copyable - /// The type of error thrown during reading. - associatedtype ReadFailure: Error + /// The error type that reading operations throw. + associatedtype ReadFailure: Error - /// Reads elements from the source into the provided buffer. - /// - /// Appends elements into `buffer`. When the read operation reaches the - /// end of the source, no elements are appended. - /// - /// - Parameter buffer: The buffer to fill with read elements. - /// - Throws: A `ReadFailure` from the underlying read operation. - mutating func read>( - into buffer: inout Buffer - ) async throws(ReadFailure) where Buffer.Element: ~Copyable + /// The data the reader delivers alongside the end-of-stream signal. + /// + /// Defaults to `Void`. Use a custom type to carry data along with the end + /// signal, or `Never` for streams that never end. + associatedtype FinalElement: ~Copyable = Void + + /// Reads elements from the source into the provided buffer. + /// + /// This method appends elements into `buffer`. A non-`nil` return value + /// signals end-of-stream and delivers the final payload. The call may + /// also append a final batch of elements before signaling end. + /// + /// After the reader has returned a non-`nil` `FinalElement`, calling + /// ``read(into:)`` again is a programmer error. + /// + /// - Parameter buffer: The buffer to fill with read elements. + /// - Returns: A non-`nil` ``FinalElement`` if this call delivered the + /// end-of-stream signal; `nil` if more elements may follow. + /// - Throws: A `ReadFailure` from the underlying read operation. + // TODO: Check if we should support ~Escapable buffer + mutating func read & ~Copyable>( + into buffer: inout Buffer + ) async throws(ReadFailure) -> FinalElement? where Buffer.Element: ~Copyable } ``` @@ -334,22 +415,78 @@ The caller provides a buffer of elements for the writer to consume. This is the preferred protocol for write streams. ```swift -public protocol CallerAsyncWriter: ~Copyable, ~Escapable { - /// The type of elements that can be written. - associatedtype WriteElement: ~Copyable +/// Writes elements asynchronously from a caller-provided buffer. +/// +/// Adopt ``CallerAsyncWriter`` when you need caller-managed buffering, +/// where the caller provides a buffer of elements for the writer +/// to consume. +/// +/// ## Signaling end of stream +/// +/// The writer is terminated by a call to ``finish(buffer:finalElement:)``. +/// The `finish` call communicates a final buffer (if any) and the +/// ``FinalElement`` payload, allowing implementations to fuse the last data +/// frame with the end signal on transports that support it. +/// +/// The ``FinalElement`` associated type controls what data, if any, the writer +/// transmits alongside the end signal. The default is `Void`. Use a custom +/// type to carry data along with the end signal, or `Never` for endless +/// streams. When ``FinalElement`` is `Never`, ``finish(buffer:finalElement:)`` +/// cannot be called and the writer can be written to indefinitely. +/// +/// Conformers must accept zero, one, or many `write(buffer:)` calls, optionally +/// followed by a single `finish(buffer:finalElement:)` call. After `finish` +/// returns, the writer is consumed and no further calls are valid. +public protocol CallerAsyncWriter: ~Copyable, ~Escapable { + /// The type of elements this writer writes. + associatedtype WriteElement: ~Copyable - /// The type of error thrown during writing. - associatedtype WriteFailure: Error + /// The error type that writing operations throw. + associatedtype WriteFailure: Error - /// Writes elements from the provided buffer to the destination. - /// - /// Asynchronously writes all elements from the provided buffer. - /// - /// - Parameter buffer: The buffer of elements to write. - /// - Throws: A `WriteFailure` from the underlying write operation. - mutating func write & ~Copyable>( - buffer: inout Buffer - ) async throws(WriteFailure) where Buffer.Element: ~Copyable + /// The data the writer delivers alongside the end-of-stream signal. + /// + /// Defaults to `Void`. + associatedtype FinalElement: ~Copyable = Void + + /// Writes elements from the provided buffer to the underlying destination. + /// + /// This method asynchronously writes all elements from the provided buffer + /// to the underlying destination. + /// + /// ## Example + /// + /// ```swift + /// var fileWriter: FileAsyncWriter = ... + /// var data = UniqueArray(capacity: 5, copying: [1, 2, 3, 4, 5]) + /// + /// try await fileWriter.write(buffer: &data) + /// ``` + /// + /// - Parameter buffer: The buffer of elements to write. + /// + /// - Throws: A `WriteFailure` from the underlying write operation. + mutating func write & ~Copyable>( + buffer: inout Buffer + ) async throws(WriteFailure) where Buffer.Element: ~Copyable + + /// Sends the final buffer and ``FinalElement`` payload, and signals + /// end-of-stream to the destination. + /// + /// The buffer may be empty if there is no remaining content to emit + /// alongside the terminator. When ``FinalElement`` is `Void`, use the + /// closure-less ``finish()`` convenience instead of passing `()` explicitly. + /// + /// - Parameters: + /// - buffer: The buffer of remaining elements to write alongside the + /// terminator. + /// - finalElement: The ``FinalElement`` payload to deliver with the end + /// signal. + /// - Throws: A `WriteFailure` from the underlying write operation. + consuming func finish & ~Copyable>( + buffer: inout Buffer, + finalElement: consuming FinalElement + ) async throws(WriteFailure) where Buffer.Element: ~Copyable } ``` @@ -358,32 +495,116 @@ public protocol CallerAsyncWriter: ~Copyable, ~Escap The writer provides a buffer that the caller fills with elements to write. ```swift -public protocol AsyncWriter: ~Copyable, ~Escapable { - /// The type of elements that can be written. - associatedtype WriteElement: ~Copyable - - /// The container type the writer uses to receive elements from the caller. - associatedtype Buffer: RangeReplaceableContainer & ~Copyable - - /// The type of error thrown during writing. - associatedtype WriteFailure: Error +/// Writes elements asynchronously to a destination using callee-managed buffering. +/// +/// Adopt ``AsyncWriter`` when you need callee-managed buffering, +/// where the writer supplies a buffer that the caller fills +/// with elements to write. +/// +/// ## Signaling end of stream +/// +/// The writer is terminated by a call to ``finish(finalElement:)``. +/// Bulk transfer happens through ``write(_:)`` calls; ``finish(finalElement:)`` +/// only carries the ``FinalElement`` payload. +/// +/// The ``FinalElement`` associated type controls what data, if any, the writer +/// transmits alongside the end signal. The default is `Void`. Use a custom +/// type to carry data along with the end signal, or `Never` for endless +/// streams. When ``FinalElement`` is `Never`, ``finish(finalElement:)`` cannot +/// be called and the writer can be written to indefinitely. +public protocol AsyncWriter: ~Copyable, ~Escapable { + /// The type of elements this writer writes. + associatedtype WriteElement: ~Copyable + + /// The container type the writer uses to receive elements from the caller. + associatedtype Buffer: RangeReplaceableContainer & ~Copyable + + /// The error type that writing operations throw. + associatedtype WriteFailure: Error + + /// The data the writer delivers alongside the end-of-stream signal. + /// + /// Defaults to `Void`. Use a custom type to carry data along with the end + /// signal. + associatedtype FinalElement: ~Copyable = Void - /// Provides a buffer for writing elements to the destination. - /// - /// The writer supplies a buffer that the `body` closure fills with - /// elements. After the closure returns, the writer handles the actual - /// write operation. - /// - /// - Parameter body: A closure that receives a mutable buffer to fill. - /// - Returns: The value returned by the body closure. - /// - Throws: An `EitherError` containing either a `WriteFailure` from the - /// write operation or a `Failure` from the body closure. - mutating func write( - _ body: (inout Buffer) async throws(Failure) -> Return - ) async throws(EitherError) -> Return + /// Provides a buffer for writing elements to the destination. + /// + /// The writer supplies a buffer, sized by the implementation, that + /// `body` uses to append elements. The writer manages the buffer + /// allocation and handles the writing operation once `body` completes. + /// Oversized payloads are split across multiple calls. + /// + /// - Parameter body: A closure that receives a buffer for appending elements + /// to write. The closure returns a result of type `Return`. + /// + /// ## Example + /// + /// ```swift + /// var writer: SomeAsyncWriter = ... + /// + /// try await writer.write { buffer in + /// for item in items { + /// buffer.append(item) + /// } + /// return buffer.count + /// } + /// ``` + /// + /// - Returns: The value the body closure returns. + /// + /// - Throws: An `EitherError` containing either a `WriteFailure` from the write operation + /// or a `Failure` from the body closure. + mutating func write( + _ body: (inout Buffer) async throws(Failure) -> Return + ) async throws(EitherError) -> Return + + /// Closes the writer, delivering a ``FinalElement`` payload alongside the + /// end-of-stream signal. + /// + /// - Parameter finalElement: The ``FinalElement`` payload to deliver with + /// the end signal. + /// - Throws: A ``WriteFailure`` from the underlying write operation. + consuming func finish( + finalElement: consuming FinalElement + ) async throws(WriteFailure) } ``` +### Final-element payload + +Every protocol carries a `FinalElement` associated type, the data, if any, +that the stream delivers alongside its end-of-stream signal. The default is +`Void`, which carries no extra payload; the signal itself is the only +information conveyed. Besides `Void` final elements there are two other +possibilities: + + * **`FinalElement == Never`** marks a stream as endless. For readers, the + terminal signal can never fire (because `Never?` only inhabits `nil`), so + callers can rely statically on the absence of termination — useful for + clocks, log tails, and other endless sources. The same applies symmetrically + to writers: with `FinalElement == Never`, `finish` cannot be called, so the + writer can be written to indefinitely but never explicitly terminated. + Examples include logging sinks, metrics emitters, and continuous output + streams. + * **A custom payload type** lets the stream attach structured data to the + close: HTTP trailers, a checksum, a frame's status code. An example is HTTP, + which specializes `FinalElement` to `HTTPFields?` so the last DATA frame and + the trailers can be fused into a single transport operation. + +For readers, the terminal `read` call may also carry a final chunk of payload +elements alongside the end signal; the caller must process both. For writers, +`finish` is a consuming operation that delivers the `FinalElement` payload and +signals end-of-stream. `CallerAsyncWriter.finish(buffer:finalElement:)` also +carries a final buffer, so the last buffer and the terminator can be fused +into a single transport operation. `AsyncWriter.finish(finalElement:)` carries +only the terminator; conformers that can fuse may defer flushing the most +recent `write(_:)` until `finish` is called, achieving the same coalescing. + +Convenience extensions on the writer protocols expose a closure-less `finish()` +when `FinalElement == Void`, so callers that don't need a custom payload can +omit the explicit `()` argument. + ### Bridging between `AsyncReader` and `AsyncSequence` We provide extensions for converting between `AsyncReader` and `AsyncSequence` @@ -484,106 +705,118 @@ core protocols are established. ### Iteration and collection helpers -Two common patterns emerge immediately when working with `AsyncReader`: iterating -over all chunks until the stream ends, and collecting elements into a buffer up -to a specified limit. We envision convenience extensions for both: +Two common patterns emerge immediately when working with `AsyncReader`: +iterating over all chunks until the stream ends, and collecting elements into +a buffer up to a specified limit. We envision convenience extensions for both, +both of which surface the `FinalElement` payload to the caller: ```swift extension AsyncReader where Self: ~Copyable, Self: ~Escapable { - /// Iterates over all chunks, executing `body` for each buffer until the - /// stream ends. + /// Iterates every chunk, executing `body` for each, until the reader + /// signals end-of-stream. Returns the `FinalElement` delivered with the + /// terminal chunk. public consuming func forEachBuffer( body: (inout Buffer) async throws(Failure) -> Void - ) async throws(EitherError) + ) async throws(EitherError) -> FinalElement? } extension AsyncReader where Self: ~Copyable, Self: ~Escapable, ReadElement: ~Copyable { - /// Collects elements up to `limit`, then passes the accumulated - /// elements to `body` as an `InputSpan`. - public mutating func collect( + /// Collects up to `limit` elements, then passes the accumulated elements + /// to `body` as an `InputSpan`. Returns the body's result together with + /// the `FinalElement`. + public consuming func collect( upTo limit: Int, body: (consuming InputSpan) async throws(Failure) -> Result - ) async throws(EitherError, Failure>) -> Result + ) async throws(EitherError, Failure>) -> (Result, FinalElement?) } ``` -`forEachBuffer` provides a simple way to consume an entire stream without manually -looping over `read` calls and checking for empty buffers. `collect` accumulates -elements from multiple reads into a single buffer before processing, which is -useful when an algorithm needs all data in contiguous memory (for example, -parsing a complete message frame). +`forEachBuffer` provides a simple way to consume an entire stream without +manually looping over `read` calls and threading the end signal. `collect` +accumulates elements from multiple reads into a single buffer before +processing, which is useful when an algorithm needs all data in contiguous +memory (for example, parsing a complete message frame). A second +`where FinalElement == Void` overload of `collect` lets callers that don't +need the payload omit the tuple. -These helpers are intentionally excluded from this proposal because their error -handling semantics (particularly `collect`'s nested `EitherError` and the +These helpers are intentionally excluded from this proposal because their +error-handling shapes (particularly `collect`'s nested `EitherError` and the `AsyncReaderLeftOverElementsError` overflow behavior) benefit from real-world usage feedback before stabilization. ### Piping a reader into a writer -A common pattern when bridging streams is piping all elements from a reader to -a writer. We envision convenience extensions on each reader protocol that -consume the reader and forward its elements into a matching writer. +A common pattern when bridging streams is piping all elements from a reader +to a writer. We envision convenience extensions on each reader protocol that +consume both sides and forward the elements — including the `FinalElement` +payload, fused with the writer's `finish` where the protocols allow it. -When the buffer ownership of the reader and writer aligns, the buffer can flow -directly between the two without an intermediate stage: +When the buffer ownership of the reader and writer aligns, the buffer can +flow directly between the two without an intermediate stage: ```swift -extension CallerAsyncReader where Self: ~Copyable, Self: ~Escapable, Self.ReadElement: ~Copyable { - /// Pipes all elements from this reader into the given callee-owned writer - /// until the stream ends. +extension AsyncReader where Self: ~Copyable, Self: ~Escapable, ReadElement: ~Copyable { + /// Pipes all elements from this reader into the given caller-owned + /// writer, fusing the terminal chunk with `finish` so the transport + /// sees a single close. public consuming func pipe( - into writer: inout Writer - ) async throws where Writer: AsyncWriter & ~Copyable & ~Escapable, Writer.WriteElement == ReadElement + into writer: consuming Writer + ) async throws + where Writer: CallerAsyncWriter & ~Copyable, + Writer.WriteElement == ReadElement, + Writer.FinalElement == FinalElement } -extension AsyncReader where Self: ~Copyable, Self: ~Escapable { - /// Pipes all elements from this reader into the given caller-owned writer - /// until the stream ends. +extension CallerAsyncReader where Self: ~Copyable, Self: ~Escapable, ReadElement: ~Copyable { + /// Pipes all elements from this reader into the given callee-owned + /// writer. public consuming func pipe( - into writer: inout Writer - ) async throws where Writer: CallerAsyncWriter & ~Copyable & ~Escapable, Writer.WriteElement == ReadElement + into writer: consuming Writer + ) async throws + where Writer: AsyncWriter & ~Copyable, + Writer.WriteElement == ReadElement, + Writer.FinalElement == FinalElement } ``` -The two remaining combinations require a transfer between separate buffers. We -distinguish them with explicit argument labels so the cost is visible at the -call site: +The two remaining combinations require a transfer between separate buffers. +We distinguish them with explicit argument labels so the cost is visible at +the call site: ```swift -extension AsyncReader where Self: ~Copyable, Self: ~Escapable, Self.ReadElement: ~Copyable { - /// Pipes all elements from this reader into the given callee-owned writer, - /// copying each element from the reader's buffer into the writer's buffer. - /// - /// Both protocols supply their own buffer, so each element is moved between - /// them. The writer's buffer may be smaller than the reader's, in which case - /// multiple `write` calls are issued per chunk produced by the reader. +extension AsyncReader where Self: ~Copyable, Self: ~Escapable, ReadElement: ~Copyable { + /// Pipes all elements from this reader into the given callee-owned + /// writer, copying each element from the reader's buffer into the + /// writer's buffer. public consuming func pipe( - copyingInto writer: inout Writer - ) async throws where Writer: AsyncWriter & ~Copyable & ~Escapable, Writer.WriteElement == ReadElement + copyingInto writer: consuming Writer + ) async throws + where Writer: AsyncWriter & ~Copyable, + Writer.WriteElement == ReadElement, + Writer.FinalElement == FinalElement } -extension CallerAsyncReader where Self: ~Copyable, Self: ~Escapable, Self.ReadElement: ~Copyable { - /// Pipes all elements from this reader into the given caller-owned writer - /// through an intermediate buffer of the requested capacity. - /// - /// Neither protocol supplies a buffer, so this method allocates a single - /// `UniqueArray` and reuses it across iterations. The writer must drain all - /// elements during each `write(buffer:)` call. +extension CallerAsyncReader where Self: ~Copyable, Self: ~Escapable, ReadElement: ~Copyable { + /// Pipes all elements from this reader into the given caller-owned + /// writer through an intermediate buffer of the requested capacity. public consuming func pipe( - bufferingInto writer: inout Writer, + bufferingInto writer: consuming Writer, intermediateCapacity: Int - ) async throws where Writer: CallerAsyncWriter & ~Copyable & ~Escapable, Writer.WriteElement == ReadElement + ) async throws + where Writer: CallerAsyncWriter & ~Copyable, + Writer.WriteElement == ReadElement, + Writer.FinalElement == FinalElement } ``` -These helpers eliminate the boilerplate of manually looping, threading an -end-of-stream signal, and shuttling buffers between the two sides. Defining -`pipe` on the reader matches the precedent that the operation hangs off the -consumed source rather than the long-lived destination. +These helpers eliminate the boilerplate of manually looping, threading the +end signal, and shuttling buffers between the two sides. Defining `pipe` on +the reader matches the precedent that the operation hangs off the consumed +source rather than the long-lived destination. Like the iteration and collection helpers above, these are intentionally -excluded from the initial proposal so that their error handling semantics can -mature through real-world use before stabilization. +excluded from the initial proposal so that their error-handling and +fused-close semantics can mature through real-world use before stabilization. ### Owned buffer transfer protocols @@ -726,6 +959,42 @@ necessary. With clear signposting of which protocols are "preferred," the developer experience remains approachable for the majority while not tying the hands of those with specialized requirements. +### Omitting the `FinalElement` associated type + +An earlier shape of this proposal had no `FinalElement` concept at all: the +read side signaled end-of-stream by returning an empty buffer; the write side +signaled it implicitly by deinitializing or consuming the writer. That design +is simpler to teach but fails on two fronts. + +**It cannot express transports that deliver structured data with the close.** +Many real protocols attach a payload to the end-of-stream signal: + + * **HTTP** sends trailing fields after the last DATA frame, these are arbitrary + header-like key/value pairs (checksums, signed digests, gRPC status) that + are only available once the body has been transmitted. Without + `FinalElement`, an HTTP body reader has no way to surface the trailers to + its caller alongside the last chunk; the application would need a separate + out-of-band channel. + * **gRPC** carries the call's terminal status (OK / error code + message) in + the trailers of every server-streaming response. The body protocol *must* + be able to deliver that status to the caller. + +These are not exotic edge cases, they are two of the most common streaming +transports a Swift server library will need to support. + +**It cannot fuse the last write with the close signal.** HTTP/2, HTTP/3, and +QUIC all let the sender attach an END_STREAM flag to the same frame that +carries the final body bytes. Without an explicit `finish` operation that +takes both the last chunk and the terminator, the writer would have to emit +two separate transport operations: one `write` for the final bytes, and a +later deinit-driven close. That doubles the syscall / framing cost on the hot +path of every request and forfeits the kernel-side coalescing these protocols +were specifically designed to enable. + +Defaulting `FinalElement` to `Void` keeps the trivial case ergonomic, a plain +file or byte-stream conformer needn't think about the associated type, while +making the HTTP- and gRPC-shaped use cases expressible at all. + ### Not using closures to provide temporary access to buffers The callee-owned read and write APIs use closures that receive a span, which diff --git a/Tests/AsyncStreamingTests/AsyncReader/AsyncReader+collectTests.swift b/Tests/AsyncStreamingTests/AsyncReader/AsyncReader+collectTests.swift index c1b70c48..f2de3087 100644 --- a/Tests/AsyncStreamingTests/AsyncReader/AsyncReader+collectTests.swift +++ b/Tests/AsyncStreamingTests/AsyncReader/AsyncReader+collectTests.swift @@ -20,7 +20,7 @@ import Testing struct AsyncReaderCollectTests { @Test func collectAllElements() async throws { - var reader = UniqueArrayAsyncReader(storage: UniqueArray(capacity: 5, copying: [1, 2, 3, 4, 5])) + let reader = UniqueArrayAsyncReader(storage: UniqueArray(capacity: 5, copying: [1, 2, 3, 4, 5])) let result = try await reader.collect(upTo: 10) { span in return Array(span) @@ -31,7 +31,7 @@ struct AsyncReaderCollectTests { @Test func collectWithExactLimit() async throws { - var reader = UniqueArrayAsyncReader(storage: UniqueArray(capacity: 5, copying: [1, 2, 3, 4, 5])) + let reader = UniqueArrayAsyncReader(storage: UniqueArray(capacity: 5, copying: [1, 2, 3, 4, 5])) let result = try await reader.collect(upTo: 5) { span in return Array(span) @@ -42,7 +42,7 @@ struct AsyncReaderCollectTests { @Test func collectEmptyReader() async throws { - var reader = UniqueArrayAsyncReader(storage: UniqueArray(capacity: 0, copying: [])) + let reader = UniqueArrayAsyncReader(storage: UniqueArray(capacity: 0, copying: [])) let result = try await reader.collect(upTo: 10) { span in return span.count @@ -53,7 +53,7 @@ struct AsyncReaderCollectTests { @Test func collectProcessesAllElements() async throws { - var reader = UniqueArrayAsyncReader(storage: UniqueArray(capacity: 3, copying: [10, 20, 30])) + let reader = UniqueArrayAsyncReader(storage: UniqueArray(capacity: 3, copying: [10, 20, 30])) let result = try await reader.collect(upTo: 10) { span in var sum = 0 @@ -66,17 +66,31 @@ struct AsyncReaderCollectTests { #expect(result == 60) } + @Test + func collectVoidOverloadReturnsResultOnly() async throws { + let reader = UniqueArrayAsyncReader(storage: UniqueArray(capacity: 3, copying: [1, 2, 3])) + + let result: [Int] = try await reader.collect(upTo: 10) { span in + return Array(span) + } + + #expect(result == [1, 2, 3]) + } + @Test func collectThrowsLeftOverElements() async throws { - var reader = UniqueArrayAsyncReader(storage: UniqueArray(capacity: 3, copying: [1, 2, 3])) + let reader = UniqueArrayAsyncReader(storage: UniqueArray(capacity: 3, copying: [1, 2, 3])) - let expectedError = EitherError, Never>.first( - .second(AsyncReaderLeftOverElementsError()) - ) - await #expect(throws: expectedError) { - try await reader.collect(upTo: 1) { span in + do { + _ = try await reader.collect(upTo: 1) { (span) -> Int in return span.count } + Issue.record("Expected error") + } catch { + let expected = EitherError, Never>.first( + .second(AsyncReaderLeftOverElementsError()) + ) + #expect(error == expected) } } } diff --git a/Tests/AsyncStreamingTests/AsyncReader/AsyncReader+forEachTests.swift b/Tests/AsyncStreamingTests/AsyncReader/AsyncReader+forEachTests.swift index 50c07690..b70a9089 100644 --- a/Tests/AsyncStreamingTests/AsyncReader/AsyncReader+forEachTests.swift +++ b/Tests/AsyncStreamingTests/AsyncReader/AsyncReader+forEachTests.swift @@ -53,7 +53,8 @@ struct AsyncReaderforEachBufferTests { callCount += 1 } - #expect(callCount == 0) + // The reader still emits a terminal call (with an empty buffer + finalElement). + #expect(callCount == 1) } @Test @@ -65,7 +66,7 @@ struct AsyncReaderforEachBufferTests { let reader = UniqueArrayAsyncReader(storage: UniqueArray(capacity: 3, copying: [1, 2, 3])) do { - try await reader.forEachBuffer { (_) throws(TestError) -> Void in + _ = try await reader.forEachBuffer { (_) throws(TestError) -> Void in throw TestError.failed } Issue.record("Expected error to be thrown") @@ -84,7 +85,7 @@ struct AsyncReaderforEachBufferTests { var count = 0 do { - try await reader.forEachBuffer { (buffer) throws(TestError) -> Void in + _ = try await reader.forEachBuffer { (buffer) throws(TestError) -> Void in count += buffer.count } } catch { @@ -99,7 +100,7 @@ struct AsyncReaderforEachBufferTests { let reader = UniqueArrayAsyncReader(storage: UniqueArray(capacity: 3, copying: [1, 2, 3])) var results: [Int] = [] - await reader.forEachBuffer { buffer in + _ = await reader.forEachBuffer { buffer in await Task.yield() for i in buffer.indices { results.append(buffer[i]) diff --git a/Tests/AsyncStreamingTests/AsyncReader/AsyncReader+pipe.swift b/Tests/AsyncStreamingTests/AsyncReader/AsyncReader+pipe.swift index 2ad36d30..a8753127 100644 --- a/Tests/AsyncStreamingTests/AsyncReader/AsyncReader+pipe.swift +++ b/Tests/AsyncStreamingTests/AsyncReader/AsyncReader+pipe.swift @@ -18,136 +18,69 @@ import Testing @Suite struct AsyncReaderPipeTests { @Test - @available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) func pipeIntoCopiesAllElements() async throws { let reader = UniqueArrayAsyncReader( storage: UniqueArray(capacity: 5, copying: [1, 2, 3, 4, 5]) ) - var writer = UniqueArrayCallerAsyncWriter() + let writer = UniqueArrayCallerAsyncWriter() - try await reader.pipe(into: &writer) - - #expect(writer.storage.count == 5) - #expect(writer.storage[0] == 1) - #expect(writer.storage[1] == 2) - #expect(writer.storage[2] == 3) - #expect(writer.storage[3] == 4) - #expect(writer.storage[4] == 5) + try await reader.pipe(into: writer) } @Test - @available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) func pipeIntoWithEmptyReader() async throws { let reader = UniqueArrayAsyncReader( storage: UniqueArray() ) - var writer = UniqueArrayCallerAsyncWriter() - - try await reader.pipe(into: &writer) + let writer = UniqueArrayCallerAsyncWriter() - #expect(writer.storage.count == 0) + try await reader.pipe(into: writer) } @Test - @available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) func pipeIntoPreservesElementOrder() async throws { let elements = Array(1...50) let reader = UniqueArrayAsyncReader( storage: UniqueArray(capacity: elements.count, copying: elements) ) - var writer = UniqueArrayCallerAsyncWriter(capacity: elements.count) - - try await reader.pipe(into: &writer) - - #expect(writer.storage.count == elements.count) - for i in 0..() ) - var writer = UniqueArrayAsyncWriter() - - try await reader.pipe(copyingInto: &writer) + let writer = UniqueArrayAsyncWriter() - #expect(writer.storage.count == 0) + try await reader.pipe(copyingInto: writer) } @Test - @available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) - func pipeCopyingIntoChunksReaderBufferAcrossMultipleWrites() async throws { - // Reader hands out a single 200-element buffer. The writer hands out 64-element - // buffers, so the reader buffer must be drained across multiple writer.write calls. + func pipeCopyingIntoChunksTerminalChunkAcrossMultipleWrites() async throws { + // The reader's terminal chunk is 200 elements; the writer hands out + // 64-element buffers. Verify pipe runs without dropping bytes — the + // payload-bearing version of this scenario in FinalElementPipeTests + // checks the actual contents delivered. let elements = Array(1...200) let reader = UniqueArrayAsyncReader( storage: UniqueArray(capacity: elements.count, copying: elements) ) - var writer = UniqueArrayAsyncWriter(capacity: 256) - - try await reader.pipe(copyingInto: &writer) - - #expect(writer.storage.count == elements.count) - for i in 0.. = try! await reader.read { (buffer, finalElement) in + observedFinal = finalElement != nil return buffer.clone() } @@ -31,25 +33,21 @@ struct AsyncReaderTests { #expect(result[2] == 3) #expect(result[3] == 4) #expect(result[4] == 5) + #expect(observedFinal) } @Test - func readEmptyAtEnd() async { + func readDeliversTerminator() async { var reader = UniqueArrayAsyncReader(storage: UniqueArray(capacity: 3, copying: [1, 2, 3])) - // Read all data - let first = try! await reader.read { buffer in + var observedFinal = false + let count: Int = try! await reader.read { (buffer, finalElement) in + observedFinal = finalElement != nil return buffer.count } - #expect(first == 3) - - // Next read should return empty span - let second = try! await reader.read { buffer in - return buffer.count - } - - #expect(second == 0) + #expect(count == 3) + #expect(observedFinal) } } #endif diff --git a/Tests/AsyncStreamingTests/AsyncWriter/AsyncWriterTests.swift b/Tests/AsyncStreamingTests/AsyncWriter/AsyncWriterTests.swift index 77535ed4..0c895775 100644 --- a/Tests/AsyncStreamingTests/AsyncWriter/AsyncWriterTests.swift +++ b/Tests/AsyncStreamingTests/AsyncWriter/AsyncWriterTests.swift @@ -94,5 +94,17 @@ struct AsyncWriterTests { #expect(writer.storage[2] == 3) #expect(writer.storage[3] == 4) } + + @Test + func finishVoidConvenience() async { + let writer = UniqueArrayAsyncWriter() + await writer.finish() + } + + @Test + func finishDeliversFinalElement() async { + let writer = UniqueArrayAsyncWriter() + await writer.finish(finalElement: ()) + } } #endif diff --git a/Tests/AsyncStreamingTests/CallerAsyncReader/CallerAsyncReader+pipe.swift b/Tests/AsyncStreamingTests/CallerAsyncReader/CallerAsyncReader+pipe.swift index 1a90581e..0d851c12 100644 --- a/Tests/AsyncStreamingTests/CallerAsyncReader/CallerAsyncReader+pipe.swift +++ b/Tests/AsyncStreamingTests/CallerAsyncReader/CallerAsyncReader+pipe.swift @@ -18,138 +18,65 @@ import Testing @Suite struct CallerAsyncReaderPipeTests { @Test - @available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) func pipeIntoCopiesAllElements() async throws { let reader = UniqueArrayCallerAsyncReader( storage: UniqueArray(capacity: 5, copying: [1, 2, 3, 4, 5]) ) - var writer = UniqueArrayAsyncWriter() + let writer = UniqueArrayAsyncWriter() - try await reader.pipe(into: &writer) - - #expect(writer.storage.count == 5) - #expect(writer.storage[0] == 1) - #expect(writer.storage[1] == 2) - #expect(writer.storage[2] == 3) - #expect(writer.storage[3] == 4) - #expect(writer.storage[4] == 5) + try await reader.pipe(into: writer) } @Test - @available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) func pipeIntoWithEmptyReader() async throws { let reader = UniqueArrayCallerAsyncReader( storage: UniqueArray() ) - var writer = UniqueArrayAsyncWriter() - - try await reader.pipe(into: &writer) + let writer = UniqueArrayAsyncWriter() - #expect(writer.storage.count == 0) + try await reader.pipe(into: writer) } @Test - @available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) func pipeIntoLoopsAcrossMultipleBuffers() async throws { - // The writer hands out 64-element buffers. Use 200 elements to force the - // implementation to call write multiple times. let elements = Array(1...200) let reader = UniqueArrayCallerAsyncReader( storage: UniqueArray(capacity: elements.count, copying: elements) ) - var writer = UniqueArrayAsyncWriter(capacity: 256) - - try await reader.pipe(into: &writer) - - #expect(writer.storage.count == elements.count) - for i in 0..() ) - var writer = UniqueArrayCallerAsyncWriter() - - try await reader.pipe(bufferingInto: &writer, intermediateCapacity: 16) + let writer = UniqueArrayCallerAsyncWriter() - #expect(writer.storage.count == 0) + try await reader.pipe(bufferingInto: writer, intermediateCapacity: 16) } @Test - @available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) func pipeBufferingIntoReusesIntermediateBufferAcrossMultipleIterations() async throws { - // The intermediate buffer holds 16 elements. With 100 source elements, the loop - // must iterate at least 7 times, reusing the same buffer. let elements = Array(1...100) let reader = UniqueArrayCallerAsyncReader( storage: UniqueArray(capacity: elements.count, copying: elements) ) - var writer = UniqueArrayCallerAsyncWriter(capacity: elements.count) - - try await reader.pipe(bufferingInto: &writer, intermediateCapacity: 16) - - #expect(writer.storage.count == elements.count) - for i in 0..(minimumCapacity: 3) - await reader.read(into: &buffer3) - #expect(buffer3.count == 0) } } #endif diff --git a/Tests/AsyncStreamingTests/Helpers/UniqueArrayAsyncReader.swift b/Tests/AsyncStreamingTests/Helpers/UniqueArrayAsyncReader.swift index 2550434b..874351ee 100644 --- a/Tests/AsyncStreamingTests/Helpers/UniqueArrayAsyncReader.swift +++ b/Tests/AsyncStreamingTests/Helpers/UniqueArrayAsyncReader.swift @@ -18,16 +18,20 @@ struct UniqueArrayAsyncReader: ~Copyable, AsyncReader { typealias ReadElement = Int typealias Buffer = UniqueArray typealias ReadFailure = Never + typealias FinalElement = Void var storage: UniqueArray + var didEmitFinal: Bool = false mutating func read( - body: (inout UniqueArray) async throws(Failure) -> Return + body: (inout UniqueArray, Void?) async throws(Failure) -> Return ) async throws(EitherError) -> Return { + precondition(!self.didEmitFinal, "read called after end-of-stream") + self.didEmitFinal = true do { var uniqueArray = self.storage.clone() self.storage = .init() - return try await body(&uniqueArray) + return try await body(&uniqueArray, .some(())) } catch { throw .second(error) } diff --git a/Tests/AsyncStreamingTests/Helpers/UniqueArrayAsyncWriter.swift b/Tests/AsyncStreamingTests/Helpers/UniqueArrayAsyncWriter.swift index 4ee5b485..b681e9cd 100644 --- a/Tests/AsyncStreamingTests/Helpers/UniqueArrayAsyncWriter.swift +++ b/Tests/AsyncStreamingTests/Helpers/UniqueArrayAsyncWriter.swift @@ -18,8 +18,10 @@ struct UniqueArrayAsyncWriter: ~Copyable, AsyncWriter { typealias WriteElement = Int typealias Buffer = UniqueArray typealias WriteFailure = Never + typealias FinalElement = Void var storage: UniqueArray + var didFinish: Bool = false init(capacity: Int = 100) { self.storage = UniqueArray(minimumCapacity: capacity) @@ -37,5 +39,11 @@ struct UniqueArrayAsyncWriter: ~Copyable, AsyncWriter { throw .second(error) } } + + consuming func finish( + finalElement: consuming Void + ) async throws(Never) { + self.didFinish = true + } } #endif diff --git a/Tests/AsyncStreamingTests/Helpers/UniqueArrayCallerAsyncReader.swift b/Tests/AsyncStreamingTests/Helpers/UniqueArrayCallerAsyncReader.swift index ea069883..3cb73242 100644 --- a/Tests/AsyncStreamingTests/Helpers/UniqueArrayCallerAsyncReader.swift +++ b/Tests/AsyncStreamingTests/Helpers/UniqueArrayCallerAsyncReader.swift @@ -17,19 +17,26 @@ import ContainersPreview struct UniqueArrayCallerAsyncReader: ~Copyable, CallerAsyncReader { typealias ReadElement = Int typealias ReadFailure = Never + typealias FinalElement = Void var storage: UniqueArray var position: Int = 0 + var didEmitFinal: Bool = false mutating func read & ~Copyable>( into buffer: inout Buffer - ) async throws(ReadFailure) where Buffer.Element: ~Copyable { - guard position < storage.count else { return } + ) async throws(ReadFailure) -> Void? where Buffer.Element: ~Copyable { + precondition(!self.didEmitFinal, "read called after end-of-stream") let count = min(buffer.freeCapacity, storage.count - position) for i in 0..= storage.count { + self.didEmitFinal = true + return .some(()) + } + return nil } } #endif diff --git a/Tests/AsyncStreamingTests/Helpers/UniqueArrayCallerAsyncWriter.swift b/Tests/AsyncStreamingTests/Helpers/UniqueArrayCallerAsyncWriter.swift index 04fec162..37d24842 100644 --- a/Tests/AsyncStreamingTests/Helpers/UniqueArrayCallerAsyncWriter.swift +++ b/Tests/AsyncStreamingTests/Helpers/UniqueArrayCallerAsyncWriter.swift @@ -19,8 +19,10 @@ struct WriterCapacityError: Error {} struct UniqueArrayCallerAsyncWriter: ~Copyable, CallerAsyncWriter { typealias WriteElement = Int typealias WriteFailure = Never + typealias FinalElement = Void var storage: UniqueArray + var didFinish: Bool = false init(capacity: Int = 100) { self.storage = UniqueArray(minimumCapacity: capacity) @@ -35,5 +37,17 @@ struct UniqueArrayCallerAsyncWriter: ~Copyable, CallerAsyncWriter { self.storage.append(element) } } + + consuming func finish & ~Copyable>( + buffer: inout Buffer, + finalElement: consuming Void + ) async throws(WriteFailure) where Buffer.Element: ~Copyable { + self.storage.reserveCapacity(buffer.count) + var consumer = buffer.consumeAll() + while let element = consumer.next() { + self.storage.append(element) + } + self.didFinish = true + } } #endif