-
Notifications
You must be signed in to change notification settings - Fork 206
[AsyncStreaming] Add support for final elements #422
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -27,17 +27,20 @@ extension AsyncReader where Self: ~Copyable, Self: ~Escapable, ReadElement: ~Cop | |
| /// Collects elements from the reader up to a specified limit and processes them. | ||
| /// | ||
| /// This method continuously reads elements from the async reader, accumulating them in an | ||
| /// internal buffer until either it reaches the end of the stream or the specified limit. | ||
| /// internal buffer until either the reader signals end-of-stream (by delivering a | ||
| /// non-`nil` ``AsyncReader/FinalElement``) or the specified limit is reached. | ||
| /// Once collection completes, it passes the accumulated elements to the provided body | ||
| /// closure as an `InputSpan` for processing. | ||
| /// closure as an `InputSpan` for processing, and returns the body's result together | ||
| /// with the ``AsyncReader/FinalElement``. | ||
| /// | ||
| /// - Parameters: | ||
| /// - limit: The maximum number of elements to collect. This prevents unbounded memory | ||
| /// growth when reading from potentially infinite streams. | ||
| /// - body: A closure that receives an `InputSpan` containing all collected elements and returns | ||
| /// a result of type `Result`. | ||
| /// | ||
| /// - Returns: The value returned by the body closure after processing the collected elements. | ||
| /// - Returns: A tuple of the body closure's result and the ``AsyncReader/FinalElement`` | ||
| /// delivered with the terminal chunk. | ||
| /// | ||
| /// - Throws: An `EitherError` wrapping either a read failure (which itself may be an | ||
| /// ``AsyncReaderLeftOverElementsError`` if the reader produces more elements than the limit), | ||
|
|
@@ -48,32 +51,38 @@ extension AsyncReader where Self: ~Copyable, Self: ~Escapable, ReadElement: ~Cop | |
| /// ```swift | ||
| /// var reader: SomeAsyncReader = ... | ||
| /// | ||
| /// let processedData = try await reader.collect(upTo: 1000) { span in | ||
| /// let (processedData, _) = try await reader.collect(upTo: 1000) { span in | ||
| /// // Process all collected elements | ||
| /// } | ||
| /// ``` | ||
| public mutating func collect<Result, Failure: Error>( | ||
| // TODO: We should make this method take an inout `RangeReplacableCollection` instead | ||
| public consuming func collect<Result, Failure: Error>( | ||
| upTo limit: Int, | ||
| body: (consuming InputSpan<ReadElement>) async throws(Failure) -> Result | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should the final element be delivered to the closure?
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I intend to change this API in a future PR to be spelled |
||
| ) async throws(EitherError<EitherError<ReadFailure, AsyncReaderLeftOverElementsError>, Failure>) -> Result { | ||
| ) async throws(EitherError<EitherError<ReadFailure, AsyncReaderLeftOverElementsError>, Failure>) -> ( | ||
| Result, FinalElement | ||
| ) { | ||
| var reader = self | ||
| // TODO: In the future we might want to use a temporary allocation instead | ||
| // but those don't support async closures yet. | ||
| var collectedBuffer = UniqueArray<ReadElement>() | ||
| collectedBuffer.reserveCapacity(limit) | ||
| var shouldContinue = true | ||
| var finalElement: FinalElement? = nil | ||
| do { | ||
| while shouldContinue { | ||
| try await self.read { (buffer: inout Buffer) throws(AsyncReaderLeftOverElementsError) -> Void in | ||
| guard buffer.count > 0 else { | ||
| shouldContinue = false | ||
| return | ||
| } | ||
| if limit - collectedBuffer.count < buffer.count { | ||
| throw AsyncReaderLeftOverElementsError() | ||
| while finalElement == nil { | ||
| try await reader.read { | ||
| (buffer: inout Buffer, final: FinalElement?) throws(AsyncReaderLeftOverElementsError) -> Void in | ||
| if buffer.count > 0 { | ||
| if limit - collectedBuffer.count < buffer.count { | ||
| throw AsyncReaderLeftOverElementsError() | ||
| } | ||
| var consumer = buffer.consumeAll() | ||
| while let element = consumer.next() { | ||
| collectedBuffer.append(element) | ||
| } | ||
| } | ||
| var consumer = buffer.consumeAll() | ||
| while let element = consumer.next() { | ||
| collectedBuffer.append(element) | ||
| if let final { | ||
| finalElement = final | ||
| } | ||
| } | ||
| } | ||
|
|
@@ -82,11 +91,36 @@ extension AsyncReader where Self: ~Copyable, Self: ~Escapable, ReadElement: ~Cop | |
| } | ||
| do { | ||
| var consumer = collectedBuffer.consumeAll() | ||
| return try await body(consumer.drainNext()) | ||
| let result = try await body(consumer.drainNext()) | ||
| // The force-unwrap is safe since final element must be set at this point | ||
| return (result, finalElement!) | ||
| } catch { | ||
| throw .second(error) | ||
| } | ||
| } | ||
| } | ||
|
|
||
| @available(macOS 10.14.4, iOS 12.2, watchOS 5.2, tvOS 12.2, *) | ||
| extension AsyncReader where Self: ~Copyable, Self: ~Escapable, ReadElement: ~Copyable, FinalElement == Void { | ||
| /// Collects elements from the reader up to a specified limit and processes them. | ||
| /// | ||
| /// This overload is available when ``AsyncReader/FinalElement`` is `Void`. | ||
| /// It returns only the body closure's result — there is no payload to surface. | ||
| /// | ||
| /// - Parameters: | ||
| /// - limit: The maximum number of elements to collect. | ||
| /// - body: A closure that receives an `InputSpan` of collected elements. | ||
| /// - Returns: The body closure's result. | ||
| /// - Throws: An `EitherError` wrapping either a read failure (possibly an | ||
| /// ``AsyncReaderLeftOverElementsError``) or a `Failure` from `body`. | ||
| // TODO: We should make this method take an inout `RangeReplacableCollection` instead | ||
| public consuming func collect<Result, Failure: Error>( | ||
| upTo limit: Int, | ||
| body: (consuming InputSpan<ReadElement>) async throws(Failure) -> Result | ||
| ) async throws(EitherError<EitherError<ReadFailure, AsyncReaderLeftOverElementsError>, Failure>) -> Result { | ||
| let (result, _): (Result, Void?) = try await self.collect(upTo: limit, body: body) | ||
| return result | ||
| } | ||
| } | ||
|
|
||
| #endif | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What's keeping us from doing this now? Also shouldn't it be a
RangeReplaceableContainer?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I want to keep the scope of the PR limited to one change at a time
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure. Should it not be a
Containerthough instead of aCollection? Out of curiosity, you don't have to update the comment.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah yeah it should. My bad :D