Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 102 additions & 0 deletions Sources/AsyncStreaming/AsyncReader/AsyncReader+forEach.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the Swift Async Algorithms open source project
//
// Copyright (c) 2026 Apple Inc. and the Swift project authors
// Licensed under Apache License v2.0 with Runtime Library Exception
//
// See https://swift.org/LICENSE.txt for license information
//
//===----------------------------------------------------------------------===//

#if UnstableAsyncStreaming && compiler(>=6.4)

public import ContainersPreview

// swift-format-ignore: AmbiguousTrailingClosureOverload
@available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *)
extension AsyncReader where Self: ~Copyable, Self: ~Escapable {
/// Iterates over all chunks from the reader, executing the provided body for each span.
///
/// This method continuously reads chunks from the async reader until the stream ends,
/// executing the provided closure for each span of elements read. The iteration terminates
/// when the reader produces an empty span, indicating the end of the stream.
///
/// - Parameter body: An asynchronous closure that processes each span of elements read
/// from the stream. The closure receives a `Span<ReadElement>` for each read operation.
///
/// - Throws: An `EitherError` containing either a `ReadFailure` from the read operation
/// or a `Failure` from the body closure.
///
/// ## Example
///
/// ```swift
/// var fileReader: FileAsyncReader = ...
///
/// // Process each chunk of data from the file
/// try await fileReader.forEach { chunk in
/// print("Processing \(chunk.count) elements")
/// // Process the chunk
/// }
/// ```
public consuming func forEachChunk<Failure: Error>(
body: (consuming InputSpan<ReadElement>) async throws(Failure) -> Void
) async throws(EitherError<ReadFailure, Failure>) {
var shouldContinue = true
while shouldContinue {
try await self.read { (next) throws(Failure) -> Void in
guard next.count > 0 else {
shouldContinue = false
return
}

try await body(next)
}
}
}

/// Iterates over all chunks from the reader, executing the provided body for each span.
///
/// This method continuously reads chunks from the async reader until the stream ends,
/// executing the provided closure for each span of elements read. The iteration terminates
/// when the reader produces an empty span, indicating the end of the stream.
///
/// - Parameter body: An asynchronous closure that processes each span of elements read
/// from the stream. The closure receives a `Span<ReadElement>` for each read operation.
///
/// - Throws: An error of type `Failure` from the body closure. Since this reader never fails,
/// only the body closure can throw errors.
///
/// ## Example
///
/// ```swift
/// var fileReader: FileAsyncReader = ...
///
/// // Process each chunk of data from the file
/// try await fileReader.forEach { chunk in
/// print("Processing \(chunk.count) elements")
/// // Process the chunk
/// }
/// ```
@inlinable
public consuming func forEachChunk(
body: (consuming InputSpan<ReadElement>) async -> Void
) async where ReadFailure == Never {
var shouldContinue = true
while shouldContinue {
do {
try await self.read { (next) -> Void in
guard next.count > 0 else {
shouldContinue = false
return
}

await body(next)
}
} catch {
fatalError()
}
}
}
}
#endif
3 changes: 3 additions & 0 deletions Sources/AsyncStreaming/EitherError.swift
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,7 @@ public enum EitherError<First: Error, Second: Error>: Error {
}
}
}

extension EitherError: Equatable where First: Equatable, Second: Equatable {}
extension EitherError: Hashable where First: Hashable, Second: Hashable {}
#endif
142 changes: 142 additions & 0 deletions Tests/AsyncStreamingTests/AsyncReader/AsyncReader+forEachTests.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the Swift Async Algorithms open source project
//
// Copyright (c) 2026 Apple Inc. and the Swift project authors
// Licensed under Apache License v2.0 with Runtime Library Exception
//
// See https://swift.org/LICENSE.txt for license information
//
//===----------------------------------------------------------------------===//

#if UnstableAsyncStreaming && compiler(>=6.4)

import AsyncStreaming
import BasicContainers
import ContainersPreview
import Testing

@Suite
struct AsyncReaderforEachChunkTests {
@Test
@available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *)
func forEachChunkIteratesAllSpans() async throws {
let reader = UniqueArrayAsyncReader(storage: UniqueArray(capacity: 5, copying: [1, 2, 3, 4, 5]))
var elementCount = 0

await reader.forEachChunk { span in
elementCount += span.count
}

#expect(elementCount == 5)
}

@Test
@available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *)
func forEachChunkProcessesElements() async throws {
let reader = UniqueArrayAsyncReader(storage: UniqueArray(capacity: 3, copying: [10, 20, 30]))
var sum = 0

await reader.forEachChunk { span in
for i in span.indices {
sum += span[i]
}
}

#expect(sum == 60)
}

@Test
@available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *)
func forEachChunkWithEmptyReader() async throws {
let reader = UniqueArrayAsyncReader(storage: UniqueArray(capacity: 0, copying: []))
var callCount = 0

await reader.forEachChunk { span in
callCount += 1
}

#expect(callCount == 0)
}

@Test
@available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *)
func forEachChunkWithThrowingBody() async {
enum TestError: Error {
case failed
}

let reader = UniqueArrayAsyncReader(storage: UniqueArray(capacity: 3, copying: [1, 2, 3]))

do {
try await reader.forEachChunk { (span) throws(TestError) -> Void in
throw TestError.failed
}
Issue.record("Expected error to be thrown")
} catch {
#expect(error == EitherError.second(TestError.failed))
}
}

@Test
@available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *)
func forEachChunkWithNeverFailingReader() async {
enum TestError: Error {
case failed
}

let reader = UniqueArrayAsyncReader(storage: UniqueArray(capacity: 3, copying: [1, 2, 3]))
var count = 0

do {
try await reader.forEachChunk { (span) throws(TestError) -> Void in
count += span.count
}
} catch {
Issue.record("No error should be thrown from reader")
}

#expect(count == 3)
}

@Test
@available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *)
func forEachChunkWithAsyncWork() async throws {
let reader = UniqueArrayAsyncReader(storage: UniqueArray(capacity: 3, copying: [1, 2, 3]))
var results: [Int] = []

await reader.forEachChunk { span in
await Task.yield()
for i in span.indices {
results.append(span[i])
}
}

#expect(results == [1, 2, 3])
}

@Test
@available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *)
func forEachChunkMultipleSpans() async {
var reader = UniqueArrayAsyncReader(storage: UniqueArray(capacity: 6, copying: [1, 2, 3, 4, 5, 6]))
var spanCounts: [Int] = []

// Force reading in smaller chunks
while true {
let hasMore = try! await reader.read(maximumCount: 2) { span in
if span.count > 0 {
spanCounts.append(span.count)
return true
}
return false
}
if !hasMore {
break
}
}

#expect(spanCounts == [2, 2, 2])
}
}

#endif
Loading