Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 106 additions & 0 deletions Sources/AsyncStreaming/AsyncReader/AsyncReader+pipe.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the Swift Async Algorithms open source project
//
// Copyright (c) 2026 Apple Inc. and the Swift project authors
// Licensed under Apache License v2.0 with Runtime Library Exception
//
// See https://swift.org/LICENSE.txt for license information
//
//===----------------------------------------------------------------------===//

#if UnstableAsyncStreaming && compiler(>=6.4)

import ContainersPreview

@available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *)
extension AsyncReader where Self: ~Copyable, Self: ~Escapable, Self.ReadElement: ~Copyable {
/// Pipes all elements from this reader into the given writer.
///
/// This method consumes the reader and writes all of its elements into the writer's
/// destination. It iterates over each buffer the reader produces and hands it to the
/// writer until this reader's stream ends.
///
/// ## Example
///
/// ```swift
/// let dataReader: DataAsyncReader = ...
/// var fileWriter: FileCallerAsyncWriter = ...
///
/// // Copy all data from reader to writer
/// try await dataReader.pipe(into: &fileWriter)
/// ```
///
/// - Parameter writer: A ``CallerAsyncWriter`` to receive the elements. The writer is
/// mutated in place and remains usable after this operation.
///
/// - Throws: An error originating from the read or write operations.
public consuming func pipe<Writer>(
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The functionality here is great, but I really don't like this name. We can fix it later (i.e. it shouldn't block landing), but we should find something better. I'd love something like init(consumingContentsOf:), but I can see how that would get awkward if init should take other parameters.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Other names I thought about where fuse(into:) or even read(into:). I am very open to changing the name but I intentionally did not put it on the writer types as neither an init nor a method because I felt it belongs on the producing type and allows nicer chaining. Again everything up for further discussion.

into writer: inout Writer
) async throws(EitherError<ReadFailure, Writer.WriteFailure>)
where Writer: CallerAsyncWriter & ~Copyable & ~Escapable, Writer.WriteElement == ReadElement {
try await self.forEachBuffer { (buffer: inout Buffer) throws(Writer.WriteFailure) in
try await writer.write(buffer: &buffer)
}
}

/// Pipes all elements from this reader into the given writer, copying each element from the reader's
/// buffer into the writer's buffer.
///
/// This method consumes the reader and writes all of its elements into the writer's
/// destination. Because both protocols supply their own buffer, each element must be
/// transferred from the reader's buffer into the writer's buffer. The writer's buffer
/// may be smaller than the reader's, in which case multiple `write` calls are issued
/// per chunk produced by the reader.
///
/// ## Example
///
/// ```swift
/// let dataReader: DataAsyncReader = ...
/// var fileWriter: FileAsyncWriter = ...
///
/// // Copy all data from reader to writer
/// try await dataReader.pipe(copyingInto: &fileWriter)
/// ```
///
/// - Parameter writer: An ``AsyncWriter`` to receive the elements. The writer is
/// mutated in place and remains usable after this operation.
///
/// - Throws: An error originating from the read or write operations.
public consuming func pipe<Writer>(
copyingInto writer: inout Writer
) async throws(EitherError<ReadFailure, Writer.WriteFailure>)
where Writer: AsyncWriter & ~Copyable & ~Escapable, Writer.WriteElement == ReadElement {
try await self.forEachBuffer { (readerBuffer: inout Buffer) throws(Writer.WriteFailure) in
var consumer = readerBuffer.consumeAll()
while let firstElement = consumer.next() {
var pending: ReadElement? = firstElement
do throws(EitherError<Writer.WriteFailure, Never>) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

…I can't believe I learned some Swift syntax from this PR 😂

try await writer.write { (writerBuffer: inout Writer.Buffer) in
switch consume pending {
case .some(let element):
writerBuffer.append(element)
case .none:
break
}
pending = nil
// TODO: We should check if we can use one of the append methods instead of
// element by element copies in the future
while writerBuffer.freeCapacity > 0 {
guard let element = consumer.next() else { return }
writerBuffer.append(element)
}
}
} catch {
switch error {
case .first(let writeFailure):
throw writeFailure
case .second:
fatalError("Unreachable")
}
}
}
}
}
}
#endif
107 changes: 107 additions & 0 deletions Sources/AsyncStreaming/CallerAsyncReader/CallerAsyncReader+pipe.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the Swift Async Algorithms open source project
//
// Copyright (c) 2026 Apple Inc. and the Swift project authors
// Licensed under Apache License v2.0 with Runtime Library Exception
//
// See https://swift.org/LICENSE.txt for license information
//
//===----------------------------------------------------------------------===//

#if UnstableAsyncStreaming && compiler(>=6.4)

import ContainersPreview
import BasicContainers

@available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *)
extension CallerAsyncReader where Self: ~Copyable, Self: ~Escapable, Self.ReadElement: ~Copyable {
/// Pipes all elements from this reader into the given writer.
///
/// This method consumes the reader and writes all of its elements into the writer's
/// destination. It continuously reads chunks into buffers supplied by the writer and
/// flushes them until this reader's stream ends.
///
/// ## Example
///
/// ```swift
/// let dataReader: DataCallerAsyncReader = ...
/// var fileWriter: FileAsyncWriter = ...
///
/// // Copy all data from reader to writer
/// try await dataReader.pipe(into: &fileWriter)
/// ```
///
/// - Parameter writer: An ``AsyncWriter`` to receive the elements. The writer is mutated
/// in place and remains usable after this operation.
///
/// - Throws: An error originating from the read or write operations.
public consuming func pipe<Writer>(
into writer: inout Writer
) async throws(EitherError<Writer.WriteFailure, ReadFailure>)
where Writer: AsyncWriter & ~Copyable & ~Escapable, Writer.WriteElement == ReadElement {
var shouldContinue = true
while shouldContinue {
try await writer
.write { (buffer: inout Writer.Buffer) throws(ReadFailure) in
try await self.read(into: &buffer)
if buffer.count == 0 {
shouldContinue = false
}
}
}
}

/// Pipes all elements from this reader into the given writer through an intermediate buffer.
///
/// This method consumes the reader and writes all of its elements into the writer's
/// destination. Because neither the reader nor the writer supplies a buffer, this
/// method allocates an intermediate buffer of the requested capacity and reuses it
/// across iterations: each iteration fills the buffer from the reader and then hands
/// it to the writer to drain.
///
/// ## Example
///
/// ```swift
/// let dataReader: DataCallerAsyncReader = ...
/// var fileWriter: FileCallerAsyncWriter = ...
///
/// // Copy all data from reader to writer using a 4 KB intermediate buffer
/// try await dataReader.pipe(bufferingInto: &fileWriter, intermediateCapacity: 4096)
/// ```
///
/// - Parameters:
/// - writer: A ``CallerAsyncWriter`` to receive the elements. The writer is mutated
/// in place and remains usable after this operation.
/// - intermediateCapacity: The capacity of the intermediate buffer that mediates
/// between the reader and writer. Larger values reduce the number of read and
/// write calls at the cost of memory.
///
/// - Throws: An error originating from the read or write operations.
public consuming func pipe<Writer>(
bufferingInto writer: inout Writer,
intermediateCapacity: Int
) async throws(EitherError<ReadFailure, Writer.WriteFailure>)
where Writer: CallerAsyncWriter & ~Copyable & ~Escapable, Writer.WriteElement == ReadElement {
var buffer = UniqueArray<ReadElement>(minimumCapacity: intermediateCapacity)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another place we need a modernized version of withUnsafeTemporaryAllocation

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah the new withUnsafeTemporaryAllocation with OutputSpan gets us close but the OutputSpan is not available in an async context so we can't use it here...

var shouldContinue = true
while shouldContinue {
do throws(ReadFailure) {
try await self.read(into: &buffer)
} catch {
throw .first(error)
}
if buffer.count == 0 {
shouldContinue = false
} else {
do throws(Writer.WriteFailure) {
try await writer.write(buffer: &buffer)
assert(buffer.count == 0, "CallerAsyncWriter must drain the buffer during write(buffer:)")
} catch {
throw .second(error)
}
}
}
}
}
#endif
67 changes: 67 additions & 0 deletions Sources/AsyncStreaming/NNNN-async-streaming.md
Original file line number Diff line number Diff line change
Expand Up @@ -518,6 +518,73 @@ handling semantics (particularly `collect`'s nested `EitherError` and the
`AsyncReaderLeftOverElementsError` overflow behavior) benefit from real-world
usage feedback before stabilization.

### Piping a reader into a writer

A common pattern when bridging streams is piping all elements from a reader to
a writer. We envision convenience extensions on each reader protocol that
consume the reader and forward its elements into a matching writer.

When the buffer ownership of the reader and writer aligns, the buffer can flow
directly between the two without an intermediate stage:

```swift
extension CallerAsyncReader where Self: ~Copyable, Self: ~Escapable, Self.ReadElement: ~Copyable {
/// Pipes all elements from this reader into the given callee-owned writer
/// until the stream ends.
public consuming func pipe<Writer>(
into writer: inout Writer
) async throws where Writer: AsyncWriter & ~Copyable & ~Escapable, Writer.WriteElement == ReadElement
}

extension AsyncReader where Self: ~Copyable, Self: ~Escapable {
/// Pipes all elements from this reader into the given caller-owned writer
/// until the stream ends.
public consuming func pipe<Writer>(
into writer: inout Writer
) async throws where Writer: CallerAsyncWriter & ~Copyable & ~Escapable, Writer.WriteElement == ReadElement
}
```

The two remaining combinations require a transfer between separate buffers. We
distinguish them with explicit argument labels so the cost is visible at the
call site:

```swift
extension AsyncReader where Self: ~Copyable, Self: ~Escapable, Self.ReadElement: ~Copyable {
/// Pipes all elements from this reader into the given callee-owned writer,
/// copying each element from the reader's buffer into the writer's buffer.
///
/// Both protocols supply their own buffer, so each element is moved between
/// them. The writer's buffer may be smaller than the reader's, in which case
/// multiple `write` calls are issued per chunk produced by the reader.
public consuming func pipe<Writer>(
copyingInto writer: inout Writer
) async throws where Writer: AsyncWriter & ~Copyable & ~Escapable, Writer.WriteElement == ReadElement
}

extension CallerAsyncReader where Self: ~Copyable, Self: ~Escapable, Self.ReadElement: ~Copyable {
/// Pipes all elements from this reader into the given caller-owned writer
/// through an intermediate buffer of the requested capacity.
///
/// Neither protocol supplies a buffer, so this method allocates a single
/// `UniqueArray` and reuses it across iterations. The writer must drain all
/// elements during each `write(buffer:)` call.
public consuming func pipe<Writer>(
bufferingInto writer: inout Writer,
intermediateCapacity: Int
) async throws where Writer: CallerAsyncWriter & ~Copyable & ~Escapable, Writer.WriteElement == ReadElement
}
```

These helpers eliminate the boilerplate of manually looping, threading an
end-of-stream signal, and shuttling buffers between the two sides. Defining
`pipe` on the reader matches the precedent that the operation hangs off the
consumed source rather than the long-lived destination.

Like the iteration and collection helpers above, these are intentionally
excluded from the initial proposal so that their error handling semantics can
mature through real-world use before stabilization.

### Owned buffer transfer protocols

The four protocols in this proposal all use generic `RangeReplaceableContainer`
Expand Down
Loading
Loading