Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
196 changes: 196 additions & 0 deletions Sources/AsyncStreaming/AsyncReader/AsyncReader+collect.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the Swift Async Algorithms open source project
//
// Copyright (c) 2026 Apple Inc. and the Swift project authors
// Licensed under Apache License v2.0 with Runtime Library Exception
//
// See https://swift.org/LICENSE.txt for license information
//
//===----------------------------------------------------------------------===//

#if UnstableAsyncStreaming && compiler(>=6.4)

public import ContainersPreview
import BasicContainers

@available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *)
extension AsyncReader where Self: ~Copyable, Self: ~Escapable, ReadElement: ~Copyable {
/// Collects elements from the reader up to a specified limit and processes them with a body function.
///
/// This method continuously reads elements from the async reader, accumulating them in a buffer
/// until either it reaches the end of the stream (indicated by an empty `Span`) or reaches
/// the specified limit. Once collection completes, it passes the accumulated elements to the
/// provided body function as a `Span` for processing.
///
/// - Parameters:
/// - limit: The maximum number of elements to collect. This prevents unbounded memory
/// growth when reading from potentially infinite streams.
/// - body: A closure that receives a `Span` containing all collected elements and returns
/// a result of type `Result`. The method calls this closure once after collecting all
/// elements successfully.
///
/// - Returns: The value returned by the body closure after processing the collected elements.
///
/// - Throws: An `EitherError` containing either a `ReadFailure` from the read operation
/// or a `Failure` from the body closure.
///
/// ## Example
///
/// ```swift
/// var reader: SomeAsyncReader = ...
///
/// let processedData = try await reader.collect(upTo: 1000) { span in
/// // Process all collected elements
/// }
/// ```
///
/// ## Memory Considerations
///
/// Since this method buffers all elements in memory before processing, it should be used
/// with caution on large datasets. The `limit` parameter serves as a safety mechanism
/// to prevent excessive memory usage.
public mutating func collect<Result, Failure: Error>(
upTo limit: Int,
body: (consuming InputSpan<ReadElement>) async throws(Failure) -> Result
) async throws(EitherError<ReadFailure, Failure>) -> Result {
// TODO: In the future we might want to use a temporary allocation instead
// but those don't support async closures yet.
var buffer = UniqueArray<ReadElement>()
buffer.reserveCapacity(limit)
var shouldContinue = true
do {
while shouldContinue {
try await self.read(
maximumCount: limit - buffer.count
) { (span: consuming InputSpan<ReadElement>) in
guard span.count > 0 else {
shouldContinue = false
return
}
precondition(span.count <= limit - buffer.count)
while let element = span.popFirst() {
buffer.append(element)
}
}
}
} catch {
switch error {
case .first(let error):
throw .first(error)
case .second:
fatalError()
}
}
do {
var consumer = buffer.consumeAll()
return try await body(consumer.drainNext())
} catch {
throw .second(error)
}
}

/// Collects elements from the reader up to a specified limit and processes them with a body function.
///
/// This method continuously reads elements from the async reader, accumulating them in a buffer
/// until either it reaches the end of the stream (indicated by an empty `Span`) or reaches
/// the specified limit. Once collection completes, it passes the accumulated elements to the
/// provided body function as a `Span` for processing.
///
/// - Parameters:
/// - limit: The maximum number of elements to collect. This prevents unbounded memory
/// growth when reading from potentially infinite streams.
/// - body: A closure that receives a `Span` containing all collected elements and returns
/// a result of type `Result`. The method calls this closure once after collecting all
/// elements successfully.
///
/// - Returns: The value returned by the body closure after processing the collected elements.
///
/// ## Example
///
/// ```swift
/// var reader: SomeAsyncReader = ...
///
/// let processedData = try await reader.collect(upTo: 1000) { span in
/// // Process all collected elements
/// }
/// ```
///
/// ## Memory Considerations
///
/// Since this method buffers all elements in memory before processing, it should be used
/// with caution on large datasets. The `limit` parameter serves as a safety mechanism
/// to prevent excessive memory usage.
public mutating func collect<Result>(
upTo limit: Int,
body: (consuming InputSpan<ReadElement>) async -> Result
) async -> Result where ReadFailure == Never {
// TODO: In the future we might want to use a temporary allocation instead
// but those don't support async closures yet.
var buffer = UniqueArray<ReadElement>()
buffer.reserveCapacity(limit)
var shouldContinue = true
while limit - buffer.count > 0 && shouldContinue {
// This force-try is safe since neither read nor the closure are throwing
try! await self.read(
maximumCount: limit - buffer.count
) { (span: consuming InputSpan<ReadElement>) in
precondition(span.count <= limit - buffer.count)
guard span.count > 0 else {
// This means the underlying reader is finished and we can return
shouldContinue = false
return
}
while let element = span.popFirst() {
buffer.append(element)
}
}
}
var consumer = buffer.consumeAll()
return await body(consumer.drainNext())
}

/// Collects elements from the reader into an output span until the span is full.
///
/// This method continuously reads elements from the async reader and appends them to the
/// provided output span until the span reaches its capacity. This provides an efficient
/// way to fill a pre-allocated buffer with elements from the reader.
///
/// - Parameter outputSpan: An `OutputSpan` to append read elements into. The method continues
/// reading until this span is full.
///
/// - Throws: An error of type `ReadFailure` if any read operation fails.
///
/// ## Example
///
/// ```swift
/// var reader: SomeAsyncReader = ...
/// var buffer = [Int](repeating: 0, count: 100)
///
/// try await buffer.withOutputSpan { outputSpan in
/// try await reader.collect(into: &outputSpan)
/// }
/// ```
public mutating func collect(
into outputSpan: inout OutputSpan<ReadElement>
) async throws(ReadFailure) {
while !outputSpan.isFull {
do {
try await self.read(maximumCount: outputSpan.freeCapacity) { (span: consuming InputSpan<ReadElement>) in
while let element = span.popFirst() {
outputSpan.append(element)
}
}
} catch {
switch error {
case .first(let error):
throw error
case .second:
fatalError()
}
}
}
}
}

#endif
2 changes: 1 addition & 1 deletion Sources/AsyncStreaming/AsyncReader/AsyncReader.swift
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public protocol AsyncReader<ReadElement, ReadFailure>: ~Copyable, ~Escapable {
}

@available(macOS 10.14.4, iOS 12.2, watchOS 5.2, tvOS 12.2, visionOS 1.0, *)
extension AsyncReader where Self: ~Copyable, Self: ~Escapable {
extension AsyncReader where Self: ~Copyable, Self: ~Escapable, ReadElement: ~Copyable {
/// Reads elements with no upper bound on span size.
public mutating func read<Return: ~Copyable, Failure: Error>(
body: (consuming InputSpan<ReadElement>) async throws(Failure) -> Return
Expand Down
101 changes: 101 additions & 0 deletions Tests/AsyncStreamingTests/AsyncReader/AsyncReader+collectTests.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the Swift Async Algorithms open source project
//
// Copyright (c) 2026 Apple Inc. and the Swift project authors
// Licensed under Apache License v2.0 with Runtime Library Exception
//
// See https://swift.org/LICENSE.txt for license information
//
//===----------------------------------------------------------------------===//

#if UnstableAsyncStreaming && compiler(>=6.4)

import AsyncStreaming
import BasicContainers
import ContainersPreview
import Testing

@Suite
struct AsyncReaderCollectTests {
@Test
@available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *)
func collectAllElements() async {
var reader = UniqueArrayAsyncReader(storage: UniqueArray(capacity: 5, copying: [1, 2, 3, 4, 5]))

let result = await reader.collect(upTo: 10) { span in
return Array(span)
}

#expect(result == [1, 2, 3, 4, 5])
}

@Test
@available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *)
func collectWithExactLimit() async {
var reader = UniqueArrayAsyncReader(storage: UniqueArray(capacity: 5, copying: [1, 2, 3, 4, 5]))

let result = await reader.collect(upTo: 5) { span in
return Array(span)
}

#expect(result == [1, 2, 3, 4, 5])
}

@Test
@available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *)
func collectEmptyReader() async {
var reader = UniqueArrayAsyncReader(storage: UniqueArray(capacity: 0, copying: []))

let result = await reader.collect(upTo: 10) { span in
return span.count
}

#expect(result == 0)
}

@Test
@available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *)
func collectProcessesAllElements() async {
var reader = UniqueArrayAsyncReader(storage: UniqueArray(capacity: 3, copying: [10, 20, 30]))

let result = await reader.collect(upTo: 10) { span in
var sum = 0
for i in span.indices {
sum += span[i]
}
return sum
}

#expect(result == 60)
}

@Test
@available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *)
func collectIntoOutputSpan() async {
// TODO: Cannot test this yet since we can't get `InputSpan`s available in async contexts
// var reader = UniqueArrayAsyncReader(storage: UniqueArray(capacity: 5, copying: [1, 2, 3, 4, 5]))
// var buffer = RigidArray<Int>.init(capacity: 5)
//
// await buffer.append(count: 5) { outputSpan in
// await reader.collect(into: &outputSpan)
// }
//
// #expect(buffer.count == 5)
}

@Test
@available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *)
func collectWithNeverFailingReader() async {
var reader = UniqueArrayAsyncReader(storage: UniqueArray(capacity: 3, copying: [1, 2, 3]))

// This tests the Never overload
let result = await reader.collect(upTo: 10) { span in
return span.count
}

#expect(result == 3)
}
}

#endif
Loading