Skip to content

[AsyncStreaming] Add support for final elements#422

Open
FranzBusch wants to merge 1 commit into
apple:mainfrom
FranzBusch:fb-async-streaming-final-element
Open

[AsyncStreaming] Add support for final elements#422
FranzBusch wants to merge 1 commit into
apple:mainfrom
FranzBusch:fb-async-streaming-final-element

Conversation

@FranzBusch
Copy link
Copy Markdown
Member

Motivation

Real transports deliver structured data alongside end-of-stream and need to fuse the last write with the close. HTTP trailers and gRPC status are the obvious cases. Neither can be expressed by an empty-buffer terminator, and both lose H2/H3/QUIC's DATA+END_STREAM coalescing without a fused call.

Modification

Adds FinalElement: ~Copyable = Void as a primary associated type on all four protocols. AsyncReader delivers it via a consuming FinalElement? closure parameter; CallerAsyncReader returns it. Both writers gain a consuming finish carrying the last chunk and the payload in one call.

forEachBuffer, collect, and the pipe variants thread the payload through. collect becomes consuming and gains a Void-final overload returning just the result.

Adds an "Alternatives considered" entry covering what would break without this: HTTP body, gRPC, and fused close on H2/H3/QUIC.

Result

The four protocols can back HTTP body, gRPC streaming, and similar shapes without giving up the fused close. Default Void keeps simple conformers unchanged; Never marks infinite streams. Custom and ~Copyable payloads work end to end.

Comment thread Sources/AsyncStreaming/AsyncWriter/AsyncWriter.swift Outdated
@FranzBusch FranzBusch force-pushed the fb-async-streaming-final-element branch from 572ec81 to c313c92 Compare May 22, 2026 14:45
public mutating func collect<Result, Failure: Error>(
public consuming func collect<Result, Failure: Error>(
upTo limit: Int,
body: (consuming InputSpan<ReadElement>) async throws(Failure) -> Result
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should the final element be delivered to the closure?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I intend to change this API in a future PR to be spelled collect(into: &buffer) and that should return the FinalElement as well.

Comment thread Sources/AsyncStreaming/AsyncReader/AsyncReader.swift
## Motivation

Real transports deliver structured data alongside end-of-stream and need to fuse the last write with the close. HTTP trailers and gRPC status are the obvious cases. Neither can be expressed by an empty-buffer terminator, and both lose H2/H3/QUIC's DATA+END_STREAM coalescing without a fused call.

## Modification

Adds `FinalElement: ~Copyable = Void` as a primary associated type on all four protocols. `AsyncReader` delivers it via a `consuming FinalElement?` closure parameter; `CallerAsyncReader` returns it. Both writers gain a consuming `finish` carrying the last chunk and the payload in one call.

`forEachBuffer`, `collect`, and the `pipe` variants thread the payload through. `collect` becomes consuming and gains a `Void`-final overload returning just the result.

Adds an "Alternatives considered" entry covering what would break without
this: HTTP body, gRPC, and fused close on H2/H3/QUIC.

## Result

The four protocols can back HTTP body, gRPC streaming, and similar shapes without giving up the fused close. Default `Void` keeps simple conformers unchanged; `Never` marks infinite streams. Custom and `~Copyable` payloads work end to end.
@FranzBusch FranzBusch force-pushed the fb-async-streaming-final-element branch from c313c92 to 90a5e45 Compare May 26, 2026 11:32
@FranzBusch FranzBusch marked this pull request as ready for review May 26, 2026 11:40

try await reader.pipe(into: &writer)

#expect(writer.storage.count == 5)
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are a few tests where I had to drop the expectations since pipe is now consuming. However, I have added them back in a follow up PR #425. I needed a few more fundamental building blocks to properly write those tests.

Copy link
Copy Markdown
Contributor

@guoye-zhang guoye-zhang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could simplify CallerAsyncWriter into a single method that takes a buffer and an optional final element, since the code to write the buffer can usually be shared between those methods.

Copy link
Copy Markdown

@gjcairo gjcairo left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM overall just a few questions.

/// }
/// ```
public mutating func collect<Result, Failure: Error>(
// TODO: We should make this method take an inout `RangeReplacableCollection` instead
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's keeping us from doing this now? Also shouldn't it be a RangeReplaceableContainer?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I want to keep the scope of the PR limited to one change at a time

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure. Should it not be a Container though instead of a Collection? Out of curiosity, you don't have to update the comment.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah yeah it should. My bad :D

Comment on lines +156 to +158
switch error {
case .first(let writeFailure): throw writeFailure
case .second: fatalError("Unreachable")
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the second error in EitherError is Never, can't we call unwrap() and have the type system realise it only throws the first? Or is it not that smart that we need this?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sadly no. The problem is that would require extensions like this

extension EitherError where First == Never {
  public func unwrap() throws(Second) {
    switch self {
    case .first:
      fatalError()
    case .second(let second):
      throw second
    }
  }
}

extension EitherError where Second == Never {
  public func unwrap() throws(First) {
    switch self {
    case .first(let first):
      throw first
    case .second:
      fatalError()
    }
  }
}

which result in ambiguity at the call site since you can't overload on the thrown type

Comment on lines +23 to +24
/// The reader signals end-of-stream by passing a non-`nil` value for the
/// `finalElement` parameter of the `body` closure. This call may also carry a
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would allow empty buffers being received but not meaning end of stream, right? Is that a concern?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some protocols have it as valid to send empty buffers for liveliness probes so this would be a supported pattern.


do {
try await reader.forEachBuffer { (_) throws(TestError) -> Void in
_ = try await reader.forEachBuffer { (_) throws(TestError) -> Void in
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: do we want to annotate forEachBuffer as @discardableResult?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wanted to see more adoption before doing that.

@FranzBusch
Copy link
Copy Markdown
Member Author

We could simplify CallerAsyncWriter into a single method that takes a buffer and an optional final element, since the code to write the buffer can usually be shared between those methods.

I thought about this but I think this is worse for two reasons:

  1. finish is consuming right now and makes it impossible to produce more writes after finishing
  2. The ergonomics become a lot worse when always having an optional final element on write

@FranzBusch FranzBusch requested a review from gjcairo May 27, 2026 12:20
@guoye-zhang
Copy link
Copy Markdown
Contributor

  1. finish is consuming right now and makes it impossible to produce more writes after finishing

Yeah not being consuming is unfortunate, and would necessitate a separate function.

  1. The ergonomics become a lot worse when always having an optional final element on write

That's somewhat debatable. Suppose we are writing a middleware that transforms the body:

func write(buffer: Buffer) {
    let processed = // process body
    self.next.write(buffer: processed)
}

func finish(buffer: Buffer, finalElement: Void) {
    let processed = // process body
    self.next.finish(buffer: processed, finalElement: ())
}

Implementations have to either duplicate code or make both functions call a common helper

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants