-
Notifications
You must be signed in to change notification settings - Fork 206
Add new pipe convenience methods for the async readers
#420
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,106 @@ | ||
| //===----------------------------------------------------------------------===// | ||
| // | ||
| // This source file is part of the Swift Async Algorithms open source project | ||
| // | ||
| // Copyright (c) 2026 Apple Inc. and the Swift project authors | ||
| // Licensed under Apache License v2.0 with Runtime Library Exception | ||
| // | ||
| // See https://swift.org/LICENSE.txt for license information | ||
| // | ||
| //===----------------------------------------------------------------------===// | ||
|
|
||
| #if UnstableAsyncStreaming && compiler(>=6.4) | ||
|
|
||
| import ContainersPreview | ||
|
|
||
| @available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) | ||
| extension AsyncReader where Self: ~Copyable, Self: ~Escapable, Self.ReadElement: ~Copyable { | ||
| /// Pipes all elements from this reader into the given writer. | ||
| /// | ||
| /// This method consumes the reader and writes all of its elements into the writer's | ||
| /// destination. It iterates over each buffer the reader produces and hands it to the | ||
| /// writer until this reader's stream ends. | ||
| /// | ||
| /// ## Example | ||
| /// | ||
| /// ```swift | ||
| /// let dataReader: DataAsyncReader = ... | ||
| /// var fileWriter: FileCallerAsyncWriter = ... | ||
| /// | ||
| /// // Copy all data from reader to writer | ||
| /// try await dataReader.pipe(into: &fileWriter) | ||
| /// ``` | ||
| /// | ||
| /// - Parameter writer: A ``CallerAsyncWriter`` to receive the elements. The writer is | ||
| /// mutated in place and remains usable after this operation. | ||
| /// | ||
| /// - Throws: An error originating from the read or write operations. | ||
| public consuming func pipe<Writer>( | ||
| into writer: inout Writer | ||
| ) async throws(EitherError<ReadFailure, Writer.WriteFailure>) | ||
| where Writer: CallerAsyncWriter & ~Copyable & ~Escapable, Writer.WriteElement == ReadElement { | ||
| try await self.forEachBuffer { (buffer: inout Buffer) throws(Writer.WriteFailure) in | ||
| try await writer.write(buffer: &buffer) | ||
| } | ||
| } | ||
|
|
||
| /// Pipes all elements from this reader into the given writer, copying each element from the reader's | ||
| /// buffer into the writer's buffer. | ||
| /// | ||
| /// This method consumes the reader and writes all of its elements into the writer's | ||
| /// destination. Because both protocols supply their own buffer, each element must be | ||
| /// transferred from the reader's buffer into the writer's buffer. The writer's buffer | ||
| /// may be smaller than the reader's, in which case multiple `write` calls are issued | ||
| /// per chunk produced by the reader. | ||
| /// | ||
| /// ## Example | ||
| /// | ||
| /// ```swift | ||
| /// let dataReader: DataAsyncReader = ... | ||
| /// var fileWriter: FileAsyncWriter = ... | ||
| /// | ||
| /// // Copy all data from reader to writer | ||
| /// try await dataReader.pipe(copyingInto: &fileWriter) | ||
| /// ``` | ||
| /// | ||
| /// - Parameter writer: An ``AsyncWriter`` to receive the elements. The writer is | ||
| /// mutated in place and remains usable after this operation. | ||
| /// | ||
| /// - Throws: An error originating from the read or write operations. | ||
| public consuming func pipe<Writer>( | ||
| copyingInto writer: inout Writer | ||
| ) async throws(EitherError<ReadFailure, Writer.WriteFailure>) | ||
| where Writer: AsyncWriter & ~Copyable & ~Escapable, Writer.WriteElement == ReadElement { | ||
| try await self.forEachBuffer { (readerBuffer: inout Buffer) throws(Writer.WriteFailure) in | ||
| var consumer = readerBuffer.consumeAll() | ||
| while let firstElement = consumer.next() { | ||
| var pending: ReadElement? = firstElement | ||
| do throws(EitherError<Writer.WriteFailure, Never>) { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. …I can't believe I learned some Swift syntax from this PR 😂 |
||
| try await writer.write { (writerBuffer: inout Writer.Buffer) in | ||
| switch consume pending { | ||
| case .some(let element): | ||
| writerBuffer.append(element) | ||
| case .none: | ||
| break | ||
| } | ||
| pending = nil | ||
| // TODO: We should check if we can use one of the append methods instead of | ||
| // element by element copies in the future | ||
| while writerBuffer.freeCapacity > 0 { | ||
| guard let element = consumer.next() else { return } | ||
| writerBuffer.append(element) | ||
| } | ||
| } | ||
| } catch { | ||
| switch error { | ||
| case .first(let writeFailure): | ||
| throw writeFailure | ||
| case .second: | ||
| fatalError("Unreachable") | ||
| } | ||
| } | ||
| } | ||
| } | ||
| } | ||
| } | ||
| #endif | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,107 @@ | ||
| //===----------------------------------------------------------------------===// | ||
| // | ||
| // This source file is part of the Swift Async Algorithms open source project | ||
| // | ||
| // Copyright (c) 2026 Apple Inc. and the Swift project authors | ||
| // Licensed under Apache License v2.0 with Runtime Library Exception | ||
| // | ||
| // See https://swift.org/LICENSE.txt for license information | ||
| // | ||
| //===----------------------------------------------------------------------===// | ||
|
|
||
| #if UnstableAsyncStreaming && compiler(>=6.4) | ||
|
|
||
| import ContainersPreview | ||
| import BasicContainers | ||
|
|
||
| @available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) | ||
| extension CallerAsyncReader where Self: ~Copyable, Self: ~Escapable, Self.ReadElement: ~Copyable { | ||
| /// Pipes all elements from this reader into the given writer. | ||
| /// | ||
| /// This method consumes the reader and writes all of its elements into the writer's | ||
| /// destination. It continuously reads chunks into buffers supplied by the writer and | ||
| /// flushes them until this reader's stream ends. | ||
| /// | ||
| /// ## Example | ||
| /// | ||
| /// ```swift | ||
| /// let dataReader: DataCallerAsyncReader = ... | ||
| /// var fileWriter: FileAsyncWriter = ... | ||
| /// | ||
| /// // Copy all data from reader to writer | ||
| /// try await dataReader.pipe(into: &fileWriter) | ||
| /// ``` | ||
| /// | ||
| /// - Parameter writer: An ``AsyncWriter`` to receive the elements. The writer is mutated | ||
| /// in place and remains usable after this operation. | ||
| /// | ||
| /// - Throws: An error originating from the read or write operations. | ||
| public consuming func pipe<Writer>( | ||
| into writer: inout Writer | ||
| ) async throws(EitherError<Writer.WriteFailure, ReadFailure>) | ||
| where Writer: AsyncWriter & ~Copyable & ~Escapable, Writer.WriteElement == ReadElement { | ||
| var shouldContinue = true | ||
| while shouldContinue { | ||
| try await writer | ||
| .write { (buffer: inout Writer.Buffer) throws(ReadFailure) in | ||
| try await self.read(into: &buffer) | ||
| if buffer.count == 0 { | ||
| shouldContinue = false | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
||
| /// Pipes all elements from this reader into the given writer through an intermediate buffer. | ||
| /// | ||
| /// This method consumes the reader and writes all of its elements into the writer's | ||
| /// destination. Because neither the reader nor the writer supplies a buffer, this | ||
| /// method allocates an intermediate buffer of the requested capacity and reuses it | ||
| /// across iterations: each iteration fills the buffer from the reader and then hands | ||
| /// it to the writer to drain. | ||
| /// | ||
| /// ## Example | ||
| /// | ||
| /// ```swift | ||
| /// let dataReader: DataCallerAsyncReader = ... | ||
| /// var fileWriter: FileCallerAsyncWriter = ... | ||
| /// | ||
| /// // Copy all data from reader to writer using a 4 KB intermediate buffer | ||
| /// try await dataReader.pipe(bufferingInto: &fileWriter, intermediateCapacity: 4096) | ||
| /// ``` | ||
| /// | ||
| /// - Parameters: | ||
| /// - writer: A ``CallerAsyncWriter`` to receive the elements. The writer is mutated | ||
| /// in place and remains usable after this operation. | ||
| /// - intermediateCapacity: The capacity of the intermediate buffer that mediates | ||
| /// between the reader and writer. Larger values reduce the number of read and | ||
| /// write calls at the cost of memory. | ||
| /// | ||
| /// - Throws: An error originating from the read or write operations. | ||
| public consuming func pipe<Writer>( | ||
| bufferingInto writer: inout Writer, | ||
| intermediateCapacity: Int | ||
| ) async throws(EitherError<ReadFailure, Writer.WriteFailure>) | ||
| where Writer: CallerAsyncWriter & ~Copyable & ~Escapable, Writer.WriteElement == ReadElement { | ||
| var buffer = UniqueArray<ReadElement>(minimumCapacity: intermediateCapacity) | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Another place we need a modernized version of withUnsafeTemporaryAllocation
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah the new |
||
| var shouldContinue = true | ||
| while shouldContinue { | ||
| do throws(ReadFailure) { | ||
| try await self.read(into: &buffer) | ||
| } catch { | ||
| throw .first(error) | ||
| } | ||
| if buffer.count == 0 { | ||
| shouldContinue = false | ||
| } else { | ||
| do throws(Writer.WriteFailure) { | ||
| try await writer.write(buffer: &buffer) | ||
| assert(buffer.count == 0, "CallerAsyncWriter must drain the buffer during write(buffer:)") | ||
| } catch { | ||
| throw .second(error) | ||
| } | ||
| } | ||
| } | ||
| } | ||
| } | ||
| #endif | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The functionality here is great, but I really don't like this name. We can fix it later (i.e. it shouldn't block landing), but we should find something better. I'd love something like init(consumingContentsOf:), but I can see how that would get awkward if init should take other parameters.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Other names I thought about where
fuse(into:)or evenread(into:). I am very open to changing the name but I intentionally did not put it on the writer types as neither aninitnor a method because I felt it belongs on the producing type and allows nicer chaining. Again everything up for further discussion.