diff --git a/Sources/AsyncStreaming/AsyncReader/AsyncReader+pipe.swift b/Sources/AsyncStreaming/AsyncReader/AsyncReader+pipe.swift new file mode 100644 index 00000000..71043873 --- /dev/null +++ b/Sources/AsyncStreaming/AsyncReader/AsyncReader+pipe.swift @@ -0,0 +1,106 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Async Algorithms open source project +// +// Copyright (c) 2026 Apple Inc. and the Swift project authors +// Licensed under Apache License v2.0 with Runtime Library Exception +// +// See https://swift.org/LICENSE.txt for license information +// +//===----------------------------------------------------------------------===// + +#if UnstableAsyncStreaming && compiler(>=6.4) + +import ContainersPreview + +@available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) +extension AsyncReader where Self: ~Copyable, Self: ~Escapable, Self.ReadElement: ~Copyable { + /// Pipes all elements from this reader into the given writer. + /// + /// This method consumes the reader and writes all of its elements into the writer's + /// destination. It iterates over each buffer the reader produces and hands it to the + /// writer until this reader's stream ends. + /// + /// ## Example + /// + /// ```swift + /// let dataReader: DataAsyncReader = ... + /// var fileWriter: FileCallerAsyncWriter = ... + /// + /// // Copy all data from reader to writer + /// try await dataReader.pipe(into: &fileWriter) + /// ``` + /// + /// - Parameter writer: A ``CallerAsyncWriter`` to receive the elements. The writer is + /// mutated in place and remains usable after this operation. + /// + /// - Throws: An error originating from the read or write operations. + public consuming func pipe( + into writer: inout Writer + ) async throws(EitherError) + where Writer: CallerAsyncWriter & ~Copyable & ~Escapable, Writer.WriteElement == ReadElement { + try await self.forEachBuffer { (buffer: inout Buffer) throws(Writer.WriteFailure) in + try await writer.write(buffer: &buffer) + } + } + + /// Pipes all elements from this reader into the given writer, copying each element from the reader's + /// buffer into the writer's buffer. + /// + /// This method consumes the reader and writes all of its elements into the writer's + /// destination. Because both protocols supply their own buffer, each element must be + /// transferred from the reader's buffer into the writer's buffer. The writer's buffer + /// may be smaller than the reader's, in which case multiple `write` calls are issued + /// per chunk produced by the reader. + /// + /// ## Example + /// + /// ```swift + /// let dataReader: DataAsyncReader = ... + /// var fileWriter: FileAsyncWriter = ... + /// + /// // Copy all data from reader to writer + /// try await dataReader.pipe(copyingInto: &fileWriter) + /// ``` + /// + /// - Parameter writer: An ``AsyncWriter`` to receive the elements. The writer is + /// mutated in place and remains usable after this operation. + /// + /// - Throws: An error originating from the read or write operations. + public consuming func pipe( + copyingInto writer: inout Writer + ) async throws(EitherError) + where Writer: AsyncWriter & ~Copyable & ~Escapable, Writer.WriteElement == ReadElement { + try await self.forEachBuffer { (readerBuffer: inout Buffer) throws(Writer.WriteFailure) in + var consumer = readerBuffer.consumeAll() + while let firstElement = consumer.next() { + var pending: ReadElement? = firstElement + do throws(EitherError) { + try await writer.write { (writerBuffer: inout Writer.Buffer) in + switch consume pending { + case .some(let element): + writerBuffer.append(element) + case .none: + break + } + pending = nil + // TODO: We should check if we can use one of the append methods instead of + // element by element copies in the future + while writerBuffer.freeCapacity > 0 { + guard let element = consumer.next() else { return } + writerBuffer.append(element) + } + } + } catch { + switch error { + case .first(let writeFailure): + throw writeFailure + case .second: + fatalError("Unreachable") + } + } + } + } + } +} +#endif diff --git a/Sources/AsyncStreaming/CallerAsyncReader/CallerAsyncReader+pipe.swift b/Sources/AsyncStreaming/CallerAsyncReader/CallerAsyncReader+pipe.swift new file mode 100644 index 00000000..59f7caad --- /dev/null +++ b/Sources/AsyncStreaming/CallerAsyncReader/CallerAsyncReader+pipe.swift @@ -0,0 +1,107 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Async Algorithms open source project +// +// Copyright (c) 2026 Apple Inc. and the Swift project authors +// Licensed under Apache License v2.0 with Runtime Library Exception +// +// See https://swift.org/LICENSE.txt for license information +// +//===----------------------------------------------------------------------===// + +#if UnstableAsyncStreaming && compiler(>=6.4) + +import ContainersPreview +import BasicContainers + +@available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) +extension CallerAsyncReader where Self: ~Copyable, Self: ~Escapable, Self.ReadElement: ~Copyable { + /// Pipes all elements from this reader into the given writer. + /// + /// This method consumes the reader and writes all of its elements into the writer's + /// destination. It continuously reads chunks into buffers supplied by the writer and + /// flushes them until this reader's stream ends. + /// + /// ## Example + /// + /// ```swift + /// let dataReader: DataCallerAsyncReader = ... + /// var fileWriter: FileAsyncWriter = ... + /// + /// // Copy all data from reader to writer + /// try await dataReader.pipe(into: &fileWriter) + /// ``` + /// + /// - Parameter writer: An ``AsyncWriter`` to receive the elements. The writer is mutated + /// in place and remains usable after this operation. + /// + /// - Throws: An error originating from the read or write operations. + public consuming func pipe( + into writer: inout Writer + ) async throws(EitherError) + where Writer: AsyncWriter & ~Copyable & ~Escapable, Writer.WriteElement == ReadElement { + var shouldContinue = true + while shouldContinue { + try await writer + .write { (buffer: inout Writer.Buffer) throws(ReadFailure) in + try await self.read(into: &buffer) + if buffer.count == 0 { + shouldContinue = false + } + } + } + } + + /// Pipes all elements from this reader into the given writer through an intermediate buffer. + /// + /// This method consumes the reader and writes all of its elements into the writer's + /// destination. Because neither the reader nor the writer supplies a buffer, this + /// method allocates an intermediate buffer of the requested capacity and reuses it + /// across iterations: each iteration fills the buffer from the reader and then hands + /// it to the writer to drain. + /// + /// ## Example + /// + /// ```swift + /// let dataReader: DataCallerAsyncReader = ... + /// var fileWriter: FileCallerAsyncWriter = ... + /// + /// // Copy all data from reader to writer using a 4 KB intermediate buffer + /// try await dataReader.pipe(bufferingInto: &fileWriter, intermediateCapacity: 4096) + /// ``` + /// + /// - Parameters: + /// - writer: A ``CallerAsyncWriter`` to receive the elements. The writer is mutated + /// in place and remains usable after this operation. + /// - intermediateCapacity: The capacity of the intermediate buffer that mediates + /// between the reader and writer. Larger values reduce the number of read and + /// write calls at the cost of memory. + /// + /// - Throws: An error originating from the read or write operations. + public consuming func pipe( + bufferingInto writer: inout Writer, + intermediateCapacity: Int + ) async throws(EitherError) + where Writer: CallerAsyncWriter & ~Copyable & ~Escapable, Writer.WriteElement == ReadElement { + var buffer = UniqueArray(minimumCapacity: intermediateCapacity) + var shouldContinue = true + while shouldContinue { + do throws(ReadFailure) { + try await self.read(into: &buffer) + } catch { + throw .first(error) + } + if buffer.count == 0 { + shouldContinue = false + } else { + do throws(Writer.WriteFailure) { + try await writer.write(buffer: &buffer) + assert(buffer.count == 0, "CallerAsyncWriter must drain the buffer during write(buffer:)") + } catch { + throw .second(error) + } + } + } + } +} +#endif diff --git a/Sources/AsyncStreaming/NNNN-async-streaming.md b/Sources/AsyncStreaming/NNNN-async-streaming.md index 3d317daa..aa7cabb0 100644 --- a/Sources/AsyncStreaming/NNNN-async-streaming.md +++ b/Sources/AsyncStreaming/NNNN-async-streaming.md @@ -518,6 +518,73 @@ handling semantics (particularly `collect`'s nested `EitherError` and the `AsyncReaderLeftOverElementsError` overflow behavior) benefit from real-world usage feedback before stabilization. +### Piping a reader into a writer + +A common pattern when bridging streams is piping all elements from a reader to +a writer. We envision convenience extensions on each reader protocol that +consume the reader and forward its elements into a matching writer. + +When the buffer ownership of the reader and writer aligns, the buffer can flow +directly between the two without an intermediate stage: + +```swift +extension CallerAsyncReader where Self: ~Copyable, Self: ~Escapable, Self.ReadElement: ~Copyable { + /// Pipes all elements from this reader into the given callee-owned writer + /// until the stream ends. + public consuming func pipe( + into writer: inout Writer + ) async throws where Writer: AsyncWriter & ~Copyable & ~Escapable, Writer.WriteElement == ReadElement +} + +extension AsyncReader where Self: ~Copyable, Self: ~Escapable { + /// Pipes all elements from this reader into the given caller-owned writer + /// until the stream ends. + public consuming func pipe( + into writer: inout Writer + ) async throws where Writer: CallerAsyncWriter & ~Copyable & ~Escapable, Writer.WriteElement == ReadElement +} +``` + +The two remaining combinations require a transfer between separate buffers. We +distinguish them with explicit argument labels so the cost is visible at the +call site: + +```swift +extension AsyncReader where Self: ~Copyable, Self: ~Escapable, Self.ReadElement: ~Copyable { + /// Pipes all elements from this reader into the given callee-owned writer, + /// copying each element from the reader's buffer into the writer's buffer. + /// + /// Both protocols supply their own buffer, so each element is moved between + /// them. The writer's buffer may be smaller than the reader's, in which case + /// multiple `write` calls are issued per chunk produced by the reader. + public consuming func pipe( + copyingInto writer: inout Writer + ) async throws where Writer: AsyncWriter & ~Copyable & ~Escapable, Writer.WriteElement == ReadElement +} + +extension CallerAsyncReader where Self: ~Copyable, Self: ~Escapable, Self.ReadElement: ~Copyable { + /// Pipes all elements from this reader into the given caller-owned writer + /// through an intermediate buffer of the requested capacity. + /// + /// Neither protocol supplies a buffer, so this method allocates a single + /// `UniqueArray` and reuses it across iterations. The writer must drain all + /// elements during each `write(buffer:)` call. + public consuming func pipe( + bufferingInto writer: inout Writer, + intermediateCapacity: Int + ) async throws where Writer: CallerAsyncWriter & ~Copyable & ~Escapable, Writer.WriteElement == ReadElement +} +``` + +These helpers eliminate the boilerplate of manually looping, threading an +end-of-stream signal, and shuttling buffers between the two sides. Defining +`pipe` on the reader matches the precedent that the operation hangs off the +consumed source rather than the long-lived destination. + +Like the iteration and collection helpers above, these are intentionally +excluded from the initial proposal so that their error handling semantics can +mature through real-world use before stabilization. + ### Owned buffer transfer protocols The four protocols in this proposal all use generic `RangeReplaceableContainer` diff --git a/Tests/AsyncStreamingTests/AsyncReader/AsyncReader+pipe.swift b/Tests/AsyncStreamingTests/AsyncReader/AsyncReader+pipe.swift new file mode 100644 index 00000000..2ad36d30 --- /dev/null +++ b/Tests/AsyncStreamingTests/AsyncReader/AsyncReader+pipe.swift @@ -0,0 +1,153 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Async Algorithms open source project +// +// Copyright (c) 2026 Apple Inc. and the Swift project authors +// Licensed under Apache License v2.0 with Runtime Library Exception +// +// See https://swift.org/LICENSE.txt for license information +// +//===----------------------------------------------------------------------===// + +#if UnstableAsyncStreaming && compiler(>=6.4) +import AsyncStreaming +import BasicContainers +import ContainersPreview +import Testing + +@Suite +struct AsyncReaderPipeTests { + @Test + @available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) + func pipeIntoCopiesAllElements() async throws { + let reader = UniqueArrayAsyncReader( + storage: UniqueArray(capacity: 5, copying: [1, 2, 3, 4, 5]) + ) + var writer = UniqueArrayCallerAsyncWriter() + + try await reader.pipe(into: &writer) + + #expect(writer.storage.count == 5) + #expect(writer.storage[0] == 1) + #expect(writer.storage[1] == 2) + #expect(writer.storage[2] == 3) + #expect(writer.storage[3] == 4) + #expect(writer.storage[4] == 5) + } + + @Test + @available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) + func pipeIntoWithEmptyReader() async throws { + let reader = UniqueArrayAsyncReader( + storage: UniqueArray() + ) + var writer = UniqueArrayCallerAsyncWriter() + + try await reader.pipe(into: &writer) + + #expect(writer.storage.count == 0) + } + + @Test + @available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) + func pipeIntoPreservesElementOrder() async throws { + let elements = Array(1...50) + let reader = UniqueArrayAsyncReader( + storage: UniqueArray(capacity: elements.count, copying: elements) + ) + var writer = UniqueArrayCallerAsyncWriter(capacity: elements.count) + + try await reader.pipe(into: &writer) + + #expect(writer.storage.count == elements.count) + for i in 0..() + ) + var writer = UniqueArrayAsyncWriter() + + try await reader.pipe(copyingInto: &writer) + + #expect(writer.storage.count == 0) + } + + @Test + @available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) + func pipeCopyingIntoChunksReaderBufferAcrossMultipleWrites() async throws { + // Reader hands out a single 200-element buffer. The writer hands out 64-element + // buffers, so the reader buffer must be drained across multiple writer.write calls. + let elements = Array(1...200) + let reader = UniqueArrayAsyncReader( + storage: UniqueArray(capacity: elements.count, copying: elements) + ) + var writer = UniqueArrayAsyncWriter(capacity: 256) + + try await reader.pipe(copyingInto: &writer) + + #expect(writer.storage.count == elements.count) + for i in 0..=6.4) +import AsyncStreaming +import BasicContainers +import ContainersPreview +import Testing + +@Suite +struct CallerAsyncReaderPipeTests { + @Test + @available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) + func pipeIntoCopiesAllElements() async throws { + let reader = UniqueArrayCallerAsyncReader( + storage: UniqueArray(capacity: 5, copying: [1, 2, 3, 4, 5]) + ) + var writer = UniqueArrayAsyncWriter() + + try await reader.pipe(into: &writer) + + #expect(writer.storage.count == 5) + #expect(writer.storage[0] == 1) + #expect(writer.storage[1] == 2) + #expect(writer.storage[2] == 3) + #expect(writer.storage[3] == 4) + #expect(writer.storage[4] == 5) + } + + @Test + @available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) + func pipeIntoWithEmptyReader() async throws { + let reader = UniqueArrayCallerAsyncReader( + storage: UniqueArray() + ) + var writer = UniqueArrayAsyncWriter() + + try await reader.pipe(into: &writer) + + #expect(writer.storage.count == 0) + } + + @Test + @available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) + func pipeIntoLoopsAcrossMultipleBuffers() async throws { + // The writer hands out 64-element buffers. Use 200 elements to force the + // implementation to call write multiple times. + let elements = Array(1...200) + let reader = UniqueArrayCallerAsyncReader( + storage: UniqueArray(capacity: elements.count, copying: elements) + ) + var writer = UniqueArrayAsyncWriter(capacity: 256) + + try await reader.pipe(into: &writer) + + #expect(writer.storage.count == elements.count) + for i in 0..() + ) + var writer = UniqueArrayCallerAsyncWriter() + + try await reader.pipe(bufferingInto: &writer, intermediateCapacity: 16) + + #expect(writer.storage.count == 0) + } + + @Test + @available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) + func pipeBufferingIntoReusesIntermediateBufferAcrossMultipleIterations() async throws { + // The intermediate buffer holds 16 elements. With 100 source elements, the loop + // must iterate at least 7 times, reusing the same buffer. + let elements = Array(1...100) + let reader = UniqueArrayCallerAsyncReader( + storage: UniqueArray(capacity: elements.count, copying: elements) + ) + var writer = UniqueArrayCallerAsyncWriter(capacity: elements.count) + + try await reader.pipe(bufferingInto: &writer, intermediateCapacity: 16) + + #expect(writer.storage.count == elements.count) + for i in 0..