diff --git a/Sources/AsyncStreaming/AsyncReader/AsyncReader+forEach.swift b/Sources/AsyncStreaming/AsyncReader/AsyncReader+forEach.swift new file mode 100644 index 00000000..42ab37dc --- /dev/null +++ b/Sources/AsyncStreaming/AsyncReader/AsyncReader+forEach.swift @@ -0,0 +1,102 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Async Algorithms open source project +// +// Copyright (c) 2026 Apple Inc. and the Swift project authors +// Licensed under Apache License v2.0 with Runtime Library Exception +// +// See https://swift.org/LICENSE.txt for license information +// +//===----------------------------------------------------------------------===// + +#if UnstableAsyncStreaming && compiler(>=6.4) + +public import ContainersPreview + +// swift-format-ignore: AmbiguousTrailingClosureOverload +@available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) +extension AsyncReader where Self: ~Copyable, Self: ~Escapable { + /// Iterates over all chunks from the reader, executing the provided body for each span. + /// + /// This method continuously reads chunks from the async reader until the stream ends, + /// executing the provided closure for each span of elements read. The iteration terminates + /// when the reader produces an empty span, indicating the end of the stream. + /// + /// - Parameter body: An asynchronous closure that processes each span of elements read + /// from the stream. The closure receives a `Span` for each read operation. + /// + /// - Throws: An `EitherError` containing either a `ReadFailure` from the read operation + /// or a `Failure` from the body closure. + /// + /// ## Example + /// + /// ```swift + /// var fileReader: FileAsyncReader = ... + /// + /// // Process each chunk of data from the file + /// try await fileReader.forEach { chunk in + /// print("Processing \(chunk.count) elements") + /// // Process the chunk + /// } + /// ``` + public consuming func forEachChunk( + body: (consuming InputSpan) async throws(Failure) -> Void + ) async throws(EitherError) { + var shouldContinue = true + while shouldContinue { + try await self.read { (next) throws(Failure) -> Void in + guard next.count > 0 else { + shouldContinue = false + return + } + + try await body(next) + } + } + } + + /// Iterates over all chunks from the reader, executing the provided body for each span. + /// + /// This method continuously reads chunks from the async reader until the stream ends, + /// executing the provided closure for each span of elements read. The iteration terminates + /// when the reader produces an empty span, indicating the end of the stream. + /// + /// - Parameter body: An asynchronous closure that processes each span of elements read + /// from the stream. The closure receives a `Span` for each read operation. + /// + /// - Throws: An error of type `Failure` from the body closure. Since this reader never fails, + /// only the body closure can throw errors. + /// + /// ## Example + /// + /// ```swift + /// var fileReader: FileAsyncReader = ... + /// + /// // Process each chunk of data from the file + /// try await fileReader.forEach { chunk in + /// print("Processing \(chunk.count) elements") + /// // Process the chunk + /// } + /// ``` + @inlinable + public consuming func forEachChunk( + body: (consuming InputSpan) async -> Void + ) async where ReadFailure == Never { + var shouldContinue = true + while shouldContinue { + do { + try await self.read { (next) -> Void in + guard next.count > 0 else { + shouldContinue = false + return + } + + await body(next) + } + } catch { + fatalError() + } + } + } +} +#endif diff --git a/Sources/AsyncStreaming/EitherError.swift b/Sources/AsyncStreaming/EitherError.swift index 103e236f..2f7ccc60 100644 --- a/Sources/AsyncStreaming/EitherError.swift +++ b/Sources/AsyncStreaming/EitherError.swift @@ -53,4 +53,7 @@ public enum EitherError: Error { } } } + +extension EitherError: Equatable where First: Equatable, Second: Equatable {} +extension EitherError: Hashable where First: Hashable, Second: Hashable {} #endif diff --git a/Tests/AsyncStreamingTests/AsyncReader/AsyncReader+forEachTests.swift b/Tests/AsyncStreamingTests/AsyncReader/AsyncReader+forEachTests.swift new file mode 100644 index 00000000..ae31157a --- /dev/null +++ b/Tests/AsyncStreamingTests/AsyncReader/AsyncReader+forEachTests.swift @@ -0,0 +1,142 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Async Algorithms open source project +// +// Copyright (c) 2026 Apple Inc. and the Swift project authors +// Licensed under Apache License v2.0 with Runtime Library Exception +// +// See https://swift.org/LICENSE.txt for license information +// +//===----------------------------------------------------------------------===// + +#if UnstableAsyncStreaming && compiler(>=6.4) + +import AsyncStreaming +import BasicContainers +import ContainersPreview +import Testing + +@Suite +struct AsyncReaderforEachChunkTests { + @Test + @available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) + func forEachChunkIteratesAllSpans() async throws { + let reader = UniqueArrayAsyncReader(storage: UniqueArray(capacity: 5, copying: [1, 2, 3, 4, 5])) + var elementCount = 0 + + await reader.forEachChunk { span in + elementCount += span.count + } + + #expect(elementCount == 5) + } + + @Test + @available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) + func forEachChunkProcessesElements() async throws { + let reader = UniqueArrayAsyncReader(storage: UniqueArray(capacity: 3, copying: [10, 20, 30])) + var sum = 0 + + await reader.forEachChunk { span in + for i in span.indices { + sum += span[i] + } + } + + #expect(sum == 60) + } + + @Test + @available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) + func forEachChunkWithEmptyReader() async throws { + let reader = UniqueArrayAsyncReader(storage: UniqueArray(capacity: 0, copying: [])) + var callCount = 0 + + await reader.forEachChunk { span in + callCount += 1 + } + + #expect(callCount == 0) + } + + @Test + @available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) + func forEachChunkWithThrowingBody() async { + enum TestError: Error { + case failed + } + + let reader = UniqueArrayAsyncReader(storage: UniqueArray(capacity: 3, copying: [1, 2, 3])) + + do { + try await reader.forEachChunk { (span) throws(TestError) -> Void in + throw TestError.failed + } + Issue.record("Expected error to be thrown") + } catch { + #expect(error == EitherError.second(TestError.failed)) + } + } + + @Test + @available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) + func forEachChunkWithNeverFailingReader() async { + enum TestError: Error { + case failed + } + + let reader = UniqueArrayAsyncReader(storage: UniqueArray(capacity: 3, copying: [1, 2, 3])) + var count = 0 + + do { + try await reader.forEachChunk { (span) throws(TestError) -> Void in + count += span.count + } + } catch { + Issue.record("No error should be thrown from reader") + } + + #expect(count == 3) + } + + @Test + @available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) + func forEachChunkWithAsyncWork() async throws { + let reader = UniqueArrayAsyncReader(storage: UniqueArray(capacity: 3, copying: [1, 2, 3])) + var results: [Int] = [] + + await reader.forEachChunk { span in + await Task.yield() + for i in span.indices { + results.append(span[i]) + } + } + + #expect(results == [1, 2, 3]) + } + + @Test + @available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) + func forEachChunkMultipleSpans() async { + var reader = UniqueArrayAsyncReader(storage: UniqueArray(capacity: 6, copying: [1, 2, 3, 4, 5, 6])) + var spanCounts: [Int] = [] + + // Force reading in smaller chunks + while true { + let hasMore = try! await reader.read(maximumCount: 2) { span in + if span.count > 0 { + spanCounts.append(span.count) + return true + } + return false + } + if !hasMore { + break + } + } + + #expect(spanCounts == [2, 2, 2]) + } +} + +#endif