diff --git a/Sources/AsyncStreaming/AsyncReader/AsyncReader+collect.swift b/Sources/AsyncStreaming/AsyncReader/AsyncReader+collect.swift
new file mode 100644
index 00000000..6cdbb8be
--- /dev/null
+++ b/Sources/AsyncStreaming/AsyncReader/AsyncReader+collect.swift
@@ -0,0 +1,210 @@
+//===----------------------------------------------------------------------===//
+//
+// This source file is part of the Swift Async Algorithms open source project
+//
+// Copyright (c) 2026 Apple Inc. and the Swift project authors
+// Licensed under Apache License v2.0 with Runtime Library Exception
+//
+// See https://swift.org/LICENSE.txt for license information
+//
+//===----------------------------------------------------------------------===//
+
+#if UnstableAsyncStreaming && compiler(>=6.4)
+
+public import ContainersPreview
+import BasicContainers
+
+@available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *)
+extension AsyncReader where Self: ~Copyable, Self: ~Escapable, ReadElement: ~Copyable {
+  /// Collects elements from the reader up to a specified limit and processes them with a body function.
+  ///
+  /// This method continuously reads elements from the async reader, accumulating them in a buffer
+  /// until either it reaches the end of the stream (indicated by an empty `Span`) or reaches
+  /// the specified limit. Once collection completes, it passes the accumulated elements to the
+  /// provided body function as a `Span` for processing.
+  ///
+  /// - Parameters:
+  ///   - limit: The maximum number of elements to collect. This prevents unbounded memory
+  ///     growth when reading from potentially infinite streams.
+  ///   - body: A closure that receives a `Span` containing all collected elements and returns
+  ///     a result of type `Result`. The method calls this closure once after collecting all
+  ///     elements successfully.
+  ///
+  /// - Returns: The value returned by the body closure after processing the collected elements.
+  ///
+  /// - Throws: An `EitherError` containing either a `ReadFailure` from the read operation
+  ///   or a `Failure` from the body closure.
+  ///
+  /// ## Example
+  ///
+  /// ```swift
+  /// var reader: SomeAsyncReader = ...
+  ///
+  /// let processedData = try await reader.collect(upTo: 1000) { span in
+  ///   // Process all collected elements
+  /// }
+  /// ```
+  ///
+  /// ## Memory Considerations
+  ///
+  /// Since this method buffers all elements in memory before processing, it should be used
+  /// with caution on large datasets. The `limit` parameter serves as a safety mechanism
+  /// to prevent excessive memory usage.
+  public mutating func collect(
+    upTo limit: Int,
+    body: (consuming InputSpan) async throws(Failure) -> Result
+  ) async throws(EitherError) -> Result {
+    // TODO: In the future we might want to use a temporary allocation instead
+    // but those don't support async closures yet.
+    var buffer = UniqueArray()
+    buffer.reserveCapacity(limit)
+    var shouldContinue = true
+    do {
+      // Stop as soon as the buffer holds `limit` elements so that no zero-sized read is
+      // issued; this mirrors the loop condition of the non-throwing overload.
+      while limit - buffer.count > 0 && shouldContinue {
+        try await self.read(
+          maximumCount: limit - buffer.count
+        ) { (span: consuming InputSpan) in
+          guard span.count > 0 else {
+            // This means the underlying reader is finished and we can return
+            shouldContinue = false
+            return
+          }
+          precondition(span.count <= limit - buffer.count)
+          while let element = span.popFirst() {
+            buffer.append(element)
+          }
+        }
+      }
+    } catch {
+      switch error {
+      case .first(let error):
+        throw .first(error)
+      case .second:
+        // `.second` carries the closure's failure, and the closure above never throws.
+        fatalError()
+      }
+    }
+    do {
+      var consumer = buffer.consumeAll()
+      return try await body(consumer.drainNext())
+    } catch {
+      throw .second(error)
+    }
+  }
+
+  /// Collects elements from the reader up to a specified limit and processes them with a body function.
+  ///
+  /// This method continuously reads elements from the async reader, accumulating them in a buffer
+  /// until either it reaches the end of the stream (indicated by an empty `Span`) or reaches
+  /// the specified limit. Once collection completes, it passes the accumulated elements to the
+  /// provided body function as a `Span` for processing.
+  ///
+  /// - Parameters:
+  ///   - limit: The maximum number of elements to collect. This prevents unbounded memory
+  ///     growth when reading from potentially infinite streams.
+  ///   - body: A closure that receives a `Span` containing all collected elements and returns
+  ///     a result of type `Result`. The method calls this closure once after collecting all
+  ///     elements successfully.
+  ///
+  /// - Returns: The value returned by the body closure after processing the collected elements.
+  ///
+  /// ## Example
+  ///
+  /// ```swift
+  /// var reader: SomeAsyncReader = ...
+  ///
+  /// let processedData = await reader.collect(upTo: 1000) { span in
+  ///   // Process all collected elements
+  /// }
+  /// ```
+  ///
+  /// ## Memory Considerations
+  ///
+  /// Since this method buffers all elements in memory before processing, it should be used
+  /// with caution on large datasets. The `limit` parameter serves as a safety mechanism
+  /// to prevent excessive memory usage.
+  public mutating func collect(
+    upTo limit: Int,
+    body: (consuming InputSpan) async -> Result
+  ) async -> Result where ReadFailure == Never {
+    // TODO: In the future we might want to use a temporary allocation instead
+    // but those don't support async closures yet.
+    var buffer = UniqueArray()
+    buffer.reserveCapacity(limit)
+    var shouldContinue = true
+    while limit - buffer.count > 0 && shouldContinue {
+      // This force-try is safe since neither read nor the closure are throwing
+      try! await self.read(
+        maximumCount: limit - buffer.count
+      ) { (span: consuming InputSpan) in
+        precondition(span.count <= limit - buffer.count)
+        guard span.count > 0 else {
+          // This means the underlying reader is finished and we can return
+          shouldContinue = false
+          return
+        }
+        while let element = span.popFirst() {
+          buffer.append(element)
+        }
+      }
+    }
+    var consumer = buffer.consumeAll()
+    return await body(consumer.drainNext())
+  }
+
+  /// Collects elements from the reader into an output span until the span is full or the reader finishes.
+  ///
+  /// This method continuously reads elements from the async reader and appends them to the
+  /// provided output span until the span reaches its capacity or the reader signals the end
+  /// of the stream with an empty span. This provides an efficient way to fill a pre-allocated
+  /// buffer with elements from the reader.
+  ///
+  /// - Parameter outputSpan: An `OutputSpan` to append read elements into. The method continues
+  ///   reading until this span is full or the reader is finished, so the span may be left
+  ///   partially filled.
+  ///
+  /// - Throws: An error of type `ReadFailure` if any read operation fails.
+  ///
+  /// ## Example
+  ///
+  /// ```swift
+  /// var reader: SomeAsyncReader = ...
+  /// var buffer = [Int](repeating: 0, count: 100)
+  ///
+  /// try await buffer.withOutputSpan { outputSpan in
+  ///   try await reader.collect(into: &outputSpan)
+  /// }
+  /// ```
+  public mutating func collect(
+    into outputSpan: inout OutputSpan
+  ) async throws(ReadFailure) {
+    var shouldContinue = true
+    while !outputSpan.isFull && shouldContinue {
+      do {
+        try await self.read(maximumCount: outputSpan.freeCapacity) { (span: consuming InputSpan) in
+          guard span.count > 0 else {
+            // An empty span means the underlying reader is finished; stop here instead of
+            // looping forever on an output span that can never be filled.
+            shouldContinue = false
+            return
+          }
+          while let element = span.popFirst() {
+            outputSpan.append(element)
+          }
+        }
+      } catch {
+        switch error {
+        case .first(let error):
+          throw error
+        case .second:
+          // `.second` carries the closure's failure, and the closure above never throws.
+          fatalError()
+        }
+      }
+    }
+  }
+}
+
+#endif
diff --git a/Sources/AsyncStreaming/AsyncReader/AsyncReader.swift b/Sources/AsyncStreaming/AsyncReader/AsyncReader.swift
index 4185167f..549a36b1 100644
--- a/Sources/AsyncStreaming/AsyncReader/AsyncReader.swift
+++ b/Sources/AsyncStreaming/AsyncReader/AsyncReader.swift
@@ -58,7 +58,7 @@ public protocol AsyncReader: ~Copyable, ~Escapable {
 }
 
 @available(macOS 10.14.4, iOS 12.2, watchOS 5.2, tvOS 12.2, visionOS 1.0, *)
-extension AsyncReader where Self: ~Copyable, Self: ~Escapable {
+extension AsyncReader where Self: ~Copyable, Self: ~Escapable, ReadElement: ~Copyable {
   /// Reads elements with no upper bound on span size.
   public mutating func read(
     body: (consuming InputSpan) async throws(Failure) -> Return
diff --git a/Tests/AsyncStreamingTests/AsyncReader/AsyncReader+collectTests.swift b/Tests/AsyncStreamingTests/AsyncReader/AsyncReader+collectTests.swift
new file mode 100644
index 00000000..36fa922c
--- /dev/null
+++ b/Tests/AsyncStreamingTests/AsyncReader/AsyncReader+collectTests.swift
@@ -0,0 +1,101 @@
+//===----------------------------------------------------------------------===//
+//
+// This source file is part of the Swift Async Algorithms open source project
+//
+// Copyright (c) 2026 Apple Inc. and the Swift project authors
+// Licensed under Apache License v2.0 with Runtime Library Exception
+//
+// See https://swift.org/LICENSE.txt for license information
+//
+//===----------------------------------------------------------------------===//
+
+#if UnstableAsyncStreaming && compiler(>=6.4)
+
+import AsyncStreaming
+import BasicContainers
+import ContainersPreview
+import Testing
+
+@Suite
+struct AsyncReaderCollectTests {
+  @Test
+  @available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *)
+  func collectAllElements() async {
+    var reader = UniqueArrayAsyncReader(storage: UniqueArray(capacity: 5, copying: [1, 2, 3, 4, 5]))
+
+    let result = await reader.collect(upTo: 10) { span in
+      return Array(span)
+    }
+
+    #expect(result == [1, 2, 3, 4, 5])
+  }
+
+  @Test
+  @available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *)
+  func collectWithExactLimit() async {
+    var reader = UniqueArrayAsyncReader(storage: UniqueArray(capacity: 5, copying: [1, 2, 3, 4, 5]))
+
+    let result = await reader.collect(upTo: 5) { span in
+      return Array(span)
+    }
+
+    #expect(result == [1, 2, 3, 4, 5])
+  }
+
+  @Test
+  @available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *)
+  func collectEmptyReader() async {
+    var reader = UniqueArrayAsyncReader(storage: UniqueArray(capacity: 0, copying: []))
+
+    let result = await reader.collect(upTo: 10) { span in
+      return span.count
+    }
+
+    #expect(result == 0)
+  }
+
+  @Test
+  @available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *)
+  func collectProcessesAllElements() async {
+    var reader = UniqueArrayAsyncReader(storage: UniqueArray(capacity: 3, copying: [10, 20, 30]))
+
+    let result = await reader.collect(upTo: 10) { span in
+      var sum = 0
+      for i in span.indices {
+        sum += span[i]
+      }
+      return sum
+    }
+
+    #expect(result == 60)
+  }
+
+  @Test
+  @available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *)
+  func collectIntoOutputSpan() async {
+    // TODO: Cannot test this yet since we can't get `InputSpan`s available in async contexts
+    // var reader = UniqueArrayAsyncReader(storage: UniqueArray(capacity: 5, copying: [1, 2, 3, 4, 5]))
+    // var buffer = RigidArray.init(capacity: 5)
+    //
+    // await buffer.append(count: 5) { outputSpan in
+    //   await reader.collect(into: &outputSpan)
+    // }
+    //
+    // #expect(buffer.count == 5)
+  }
+
+  @Test
+  @available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *)
+  func collectWithNeverFailingReader() async {
+    var reader = UniqueArrayAsyncReader(storage: UniqueArray(capacity: 3, copying: [1, 2, 3]))
+
+    // This tests the Never overload
+    let result = await reader.collect(upTo: 10) { span in
+      return span.count
+    }
+
+    #expect(result == 3)
+  }
+}
+
+#endif