Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 53 additions & 19 deletions Sources/AsyncStreaming/AsyncReader/AsyncReader+collect.swift
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,20 @@ extension AsyncReader where Self: ~Copyable, Self: ~Escapable, ReadElement: ~Cop
/// Collects elements from the reader up to a specified limit and processes them.
///
/// This method continuously reads elements from the async reader, accumulating them in an
/// internal buffer until either it reaches the end of the stream or the specified limit.
/// internal buffer until either the reader signals end-of-stream (by delivering a
/// non-`nil` ``AsyncReader/FinalElement``) or the specified limit is reached.
/// Once collection completes, it passes the accumulated elements to the provided body
/// closure as an `InputSpan` for processing.
/// closure as an `InputSpan` for processing, and returns the body's result together
/// with the ``AsyncReader/FinalElement``.
///
/// - Parameters:
/// - limit: The maximum number of elements to collect. This prevents unbounded memory
/// growth when reading from potentially infinite streams.
/// - body: A closure that receives an `InputSpan` containing all collected elements and returns
/// a result of type `Result`.
///
/// - Returns: The value returned by the body closure after processing the collected elements.
/// - Returns: A tuple of the body closure's result and the ``AsyncReader/FinalElement``
/// delivered with the terminal chunk.
///
/// - Throws: An `EitherError` wrapping either a read failure (which itself may be an
/// ``AsyncReaderLeftOverElementsError`` if the reader produces more elements than the limit),
Expand All @@ -48,32 +51,38 @@ extension AsyncReader where Self: ~Copyable, Self: ~Escapable, ReadElement: ~Cop
/// ```swift
/// var reader: SomeAsyncReader = ...
///
/// let processedData = try await reader.collect(upTo: 1000) { span in
/// let (processedData, _) = try await reader.collect(upTo: 1000) { span in
/// // Process all collected elements
/// }
/// ```
public mutating func collect<Result, Failure: Error>(
// TODO: We should make this method take an inout `RangeReplacableCollection` instead
public consuming func collect<Result, Failure: Error>(
upTo limit: Int,
body: (consuming InputSpan<ReadElement>) async throws(Failure) -> Result
) async throws(EitherError<EitherError<ReadFailure, AsyncReaderLeftOverElementsError>, Failure>) -> Result {
) async throws(EitherError<EitherError<ReadFailure, AsyncReaderLeftOverElementsError>, Failure>) -> (
Result, FinalElement
) {
var reader = self
// TODO: In the future we might want to use a temporary allocation instead
// but those don't support async closures yet.
var collectedBuffer = UniqueArray<ReadElement>()
collectedBuffer.reserveCapacity(limit)
var shouldContinue = true
var finalElement: FinalElement? = nil
do {
while shouldContinue {
try await self.read { (buffer: inout Buffer) throws(AsyncReaderLeftOverElementsError) -> Void in
guard buffer.count > 0 else {
shouldContinue = false
return
}
if limit - collectedBuffer.count < buffer.count {
throw AsyncReaderLeftOverElementsError()
while finalElement == nil {
try await reader.read {
(buffer: inout Buffer, final: FinalElement?) throws(AsyncReaderLeftOverElementsError) -> Void in
if buffer.count > 0 {
if limit - collectedBuffer.count < buffer.count {
throw AsyncReaderLeftOverElementsError()
}
var consumer = buffer.consumeAll()
while let element = consumer.next() {
collectedBuffer.append(element)
}
}
var consumer = buffer.consumeAll()
while let element = consumer.next() {
collectedBuffer.append(element)
if let final {
finalElement = final
}
}
}
Expand All @@ -82,11 +91,36 @@ extension AsyncReader where Self: ~Copyable, Self: ~Escapable, ReadElement: ~Cop
}
do {
var consumer = collectedBuffer.consumeAll()
return try await body(consumer.drainNext())
let result = try await body(consumer.drainNext())
// The force-unwrap is safe since final element must be set at this point
return (result, finalElement!)
} catch {
throw .second(error)
}
}
}

@available(macOS 10.14.4, iOS 12.2, watchOS 5.2, tvOS 12.2, *)
extension AsyncReader where Self: ~Copyable, Self: ~Escapable, ReadElement: ~Copyable, FinalElement == Void {
/// Collects elements from the reader up to a specified limit and processes them.
///
/// This overload is available when ``AsyncReader/FinalElement`` is `Void`.
/// It returns only the body closure's result — there is no payload to surface.
///
/// - Parameters:
/// - limit: The maximum number of elements to collect.
/// - body: A closure that receives an `InputSpan` of collected elements.
/// - Returns: The body closure's result.
/// - Throws: An `EitherError` wrapping either a read failure (possibly an
/// ``AsyncReaderLeftOverElementsError``) or a `Failure` from `body`.
// TODO: We should make this method take an inout `RangeReplacableCollection` instead
public consuming func collect<Result, Failure: Error>(
upTo limit: Int,
body: (consuming InputSpan<ReadElement>) async throws(Failure) -> Result
) async throws(EitherError<EitherError<ReadFailure, AsyncReaderLeftOverElementsError>, Failure>) -> Result {
let (result, _): (Result, Void?) = try await self.collect(upTo: limit, body: body)
return result
}
}

#endif
80 changes: 41 additions & 39 deletions Sources/AsyncStreaming/AsyncReader/AsyncReader+forEach.swift
Original file line number Diff line number Diff line change
Expand Up @@ -11,87 +11,89 @@

#if UnstableAsyncStreaming && compiler(>=6.4)

public import ContainersPreview
import ContainersPreview

// swift-format-ignore: AmbiguousTrailingClosureOverload
@available(macOS 10.14.4, iOS 12.2, watchOS 5.2, tvOS 12.2, *)
extension AsyncReader where Self: ~Copyable, Self: ~Escapable {
/// Iterates over all chunks from the reader, executing the provided body for each buffer.
/// Iterates over all chunks from the reader, executing the provided body for
/// each buffer until the stream signals end-of-stream.
///
/// This method continuously reads chunks from the async reader until the stream ends,
/// executing the provided closure for each buffer of elements read. The iteration terminates
/// when the reader produces an empty buffer, indicating the end of the stream.
///
/// - Parameter body: An asynchronous closure that processes each buffer of elements read
/// from the stream.
///
/// - Throws: An `EitherError` containing either a `ReadFailure` from the read operation
/// or a `Failure` from the body closure.
/// This method continuously reads chunks from the async reader, executing
/// `body` for every chunk — including the terminal one — and terminates the
/// loop when the reader delivers a non-`nil` ``AsyncReader/FinalElement``.
/// The returned value is that ``AsyncReader/FinalElement``.
///
/// ## Example
///
/// ```swift
/// var fileReader: FileAsyncReader = ...
///
/// try await fileReader.forEachBuffer { buffer in
/// _ = try await fileReader.forEachBuffer { buffer in
/// print("Processing \(buffer.count) elements")
/// }
/// ```
///
/// - Parameter body: An asynchronous closure that processes each buffer of
/// elements read from the stream.
/// - Returns: The ``AsyncReader/FinalElement`` delivered with the terminal
/// chunk, or `nil` if none was observed.
/// - Throws: An `EitherError` containing either a `ReadFailure` from the
/// read operation or a `Failure` from the body closure.
public consuming func forEachBuffer<Failure: Error>(
body: (inout Buffer) async throws(Failure) -> Void
) async throws(EitherError<ReadFailure, Failure>) {
var shouldContinue = true
while shouldContinue {
try await self.read { (next) throws(Failure) -> Void in
guard next.count > 0 else {
shouldContinue = false
return
}

) async throws(EitherError<ReadFailure, Failure>) -> FinalElement? {
var final: FinalElement? = nil
var done = false
while !done {
try await self.read { (next, finalElement) throws(Failure) -> Void in
try await body(&next)
if let finalElement {
final = finalElement
done = true
}
}
}
return final
}

/// Iterates over all chunks from a non-failing reader, executing the provided body for each buffer.
///
/// This method continuously reads chunks from the async reader until the stream ends,
/// executing the provided closure for each buffer of elements read. The iteration terminates
/// when the reader produces an empty buffer, indicating the end of the stream.
/// Iterates over all chunks from a non-failing reader, executing the
/// provided body for each buffer until the stream signals end-of-stream.
///
/// Use this overload when the reader's ``AsyncReader/ReadFailure`` type is `Never`.
///
/// - Parameter body: An asynchronous closure that processes each buffer of elements read
/// from the stream.
///
/// ## Example
///
/// ```swift
/// var fileReader: FileAsyncReader = ...
///
/// await fileReader.forEachBuffer { buffer in
/// _ = await fileReader.forEachBuffer { buffer in
/// print("Processing \(buffer.count) elements")
/// }
/// ```
///
/// - Parameter body: An asynchronous closure that processes each buffer of
/// elements read from the stream.
/// - Returns: The ``AsyncReader/FinalElement`` delivered with the terminal chunk.
@inlinable
public consuming func forEachBuffer(
body: (inout Buffer) async -> Void
) async where ReadFailure == Never {
var shouldContinue = true
while shouldContinue {
) async -> FinalElement where ReadFailure == Never {
var finalElement: FinalElement? = nil
while finalElement == nil {
do {
try await self.read { (next) -> Void in
guard next.count > 0 else {
shouldContinue = false
return
}

try await self.read { (next, final) -> Void in
await body(&next)
if let final {
finalElement = final
}
}
} catch {
fatalError()
}
}
// The force-unwrap is safe since final element must be set at this point
return finalElement!
}
}
#endif
Loading
Loading