[AsyncStreaming] Add support for final elements#422
Conversation
572ec81 to
c313c92
Compare
| public mutating func collect<Result, Failure: Error>( | ||
| public consuming func collect<Result, Failure: Error>( | ||
| upTo limit: Int, | ||
| body: (consuming InputSpan<ReadElement>) async throws(Failure) -> Result |
There was a problem hiding this comment.
Should the final element be delivered to the closure?
There was a problem hiding this comment.
I intend to change this API in a future PR to be spelled collect(into: &buffer) and that should return the FinalElement as well.
## Motivation Real transports deliver structured data alongside end-of-stream and need to fuse the last write with the close. HTTP trailers and gRPC status are the obvious cases. Neither can be expressed by an empty-buffer terminator, and both lose H2/H3/QUIC's DATA+END_STREAM coalescing without a fused call. ## Modification Adds `FinalElement: ~Copyable = Void` as a primary associated type on all four protocols. `AsyncReader` delivers it via a `consuming FinalElement?` closure parameter; `CallerAsyncReader` returns it. Both writers gain a consuming `finish` carrying the last chunk and the payload in one call. `forEachBuffer`, `collect`, and the `pipe` variants thread the payload through. `collect` becomes consuming and gains a `Void`-final overload returning just the result. Adds an "Alternatives considered" entry covering what would break without this: HTTP body, gRPC, and fused close on H2/H3/QUIC. ## Result The four protocols can back HTTP body, gRPC streaming, and similar shapes without giving up the fused close. Default `Void` keeps simple conformers unchanged; `Never` marks infinite streams. Custom and `~Copyable` payloads work end to end.
c313c92 to
90a5e45
Compare
|
|
||
| try await reader.pipe(into: &writer) | ||
|
|
||
| #expect(writer.storage.count == 5) |
There was a problem hiding this comment.
There are a few tests where I had to drop the expectations since pipe is now consuming. However, I have added them back in a follow up PR #425. I needed a few more fundamental building blocks to properly write those tests.
guoye-zhang
left a comment
There was a problem hiding this comment.
We could simplify CallerAsyncWriter into a single method that takes a buffer and an optional final element, since the code to write the buffer can usually be shared between those methods.
| /// } | ||
| /// ``` | ||
| public mutating func collect<Result, Failure: Error>( | ||
| // TODO: We should make this method take an inout `RangeReplacableCollection` instead |
There was a problem hiding this comment.
What's keeping us from doing this now? Also shouldn't it be a RangeReplaceableContainer?
There was a problem hiding this comment.
I want to keep the scope of the PR limited to one change at a time
There was a problem hiding this comment.
Sure. Should it not be a Container though instead of a Collection? Out of curiosity, you don't have to update the comment.
There was a problem hiding this comment.
Ah yeah it should. My bad :D
| switch error { | ||
| case .first(let writeFailure): throw writeFailure | ||
| case .second: fatalError("Unreachable") |
There was a problem hiding this comment.
If the second error in EitherError is Never, can't we call unwrap() and have the type system realise it only throws the first? Or is it not that smart that we need this?
There was a problem hiding this comment.
Sadly no. The problem is that would require extensions like this
extension EitherError where First == Never {
public func unwrap() throws(Second) {
switch self {
case .first:
fatalError()
case .second(let second):
throw second
}
}
}
extension EitherError where Second == Never {
public func unwrap() throws(First) {
switch self {
case .first(let first):
throw first
case .second:
fatalError()
}
}
}which result in ambiguity at the call site since you can't overload on the thrown type
| /// The reader signals end-of-stream by passing a non-`nil` value for the | ||
| /// `finalElement` parameter of the `body` closure. This call may also carry a |
There was a problem hiding this comment.
This would allow empty buffers being received but not meaning end of stream, right? Is that a concern?
There was a problem hiding this comment.
Some protocols have it as valid to send empty buffers for liveliness probes so this would be a supported pattern.
|
|
||
| do { | ||
| try await reader.forEachBuffer { (_) throws(TestError) -> Void in | ||
| _ = try await reader.forEachBuffer { (_) throws(TestError) -> Void in |
There was a problem hiding this comment.
Nit: do we want to annotate forEachBuffer as @discardableResult?
There was a problem hiding this comment.
I wanted to see more adoption before doing that.
I thought about this but I think this is worse for two reasons:
|
Yeah not being consuming is unfortunate, and would necessitate a separate function.
That's somewhat debatable. Suppose we are writing a middleware that transforms the body: Implementations have to either duplicate code or make both functions call a common helper |
Motivation
Real transports deliver structured data alongside end-of-stream and need to fuse the last write with the close. HTTP trailers and gRPC status are the obvious cases. Neither can be expressed by an empty-buffer terminator, and both lose H2/H3/QUIC's DATA+END_STREAM coalescing without a fused call.
Modification
Adds
FinalElement: ~Copyable = Voidas a primary associated type on all four protocols.AsyncReaderdelivers it via aconsuming FinalElement?closure parameter;CallerAsyncReaderreturns it. Both writers gain a consumingfinishcarrying the last chunk and the payload in one call.forEachBuffer,collect, and thepipevariants thread the payload through.collectbecomes consuming and gains aVoid-final overload returning just the result.Adds an "Alternatives considered" entry covering what would break without this: HTTP body, gRPC, and fused close on H2/H3/QUIC.
Result
The four protocols can back HTTP body, gRPC streaming, and similar shapes without giving up the fused close. Default
Voidkeeps simple conformers unchanged;Nevermarks infinite streams. Custom and~Copyablepayloads work end to end.