Tonic dsa#2678
Closed
taooceros wants to merge 4 commits into
Closed
Conversation
|
There was a problem hiding this comment.
Pull request overview
Note
Copilot was unable to run its full agentic suite in this review.
This PR refactors tonic’s codec and streaming pipeline to support async encoders/decoders and updates client/server glue code accordingly.
Changes:
- Make
Encoder/Decoderasynchronous via GAT-based associated futures and update built-in codec implementations. - Rewrite encode/decode streaming internals to await codec futures and support pending codecs.
- Update client/server request/response handling (including new boxing points) and add tests/bench updates for async codecs.
Reviewed changes
Copilot reviewed 10 out of 10 changed files in this pull request and generated 6 comments.
Show a summary per file
| File | Description |
|---|---|
| tonic/src/server/grpc.rs | Switch request extraction to Streaming::message() and box the request stream. |
| tonic/src/client/grpc.rs | Use new_response_or_empty and box streaming responses. |
| tonic/src/codec/mod.rs | Change public Encoder/Decoder traits to async via associated futures. |
| tonic/src/codec/encode.rs | Replace custom Stream impl with async_stream generator and await encoder futures. |
| tonic/src/codec/decode.rs | Refactor streaming decode to support async decoder futures; introduce boxing + unsafe lifetime extension. |
| tonic/benches/decode.rs | Update bench decoder to new async Decoder trait shape. |
| tonic/Cargo.toml | Add async-stream dependency. |
| tonic-protobuf/src/lib.rs | Update protobuf codec to async Encoder/Decoder trait. |
| tonic-prost/src/codec.rs | Update prost codec to async traits and add tests for pending encoders/decoders. |
| examples/src/json-codec/common.rs | Update JSON codec example to async traits and add Send bound. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Comment on lines
+132
to
+138
| /// The future returned by [`Encoder::encode`]. | ||
| type EncodeFuture<'a>: Future<Output = Result<(), Self::Error>> + Send + 'a | ||
| where | ||
| Self: 'a; | ||
|
|
||
| /// Encodes a message into the provided buffer. | ||
| fn encode(&mut self, item: Self::Item, dst: &mut EncodeBuf<'_>) -> Result<(), Self::Error>; | ||
| fn encode<'a>(&'a mut self, item: Self::Item, dst: EncodeBuf<'a>) -> Self::EncodeFuture<'a>; |
Comment on lines
+154
to
+157
| /// The future returned by [`Decoder::decode`]. | ||
| type DecodeFuture<'a>: Future<Output = Result<Option<Self::Item>, Self::Error>> + Send + 'a | ||
| where | ||
| Self: 'a; |
Comment on lines
163
to
+164
| /// for you. | ||
| fn decode(&mut self, src: &mut DecodeBuf<'_>) -> Result<Option<Self::Item>, Self::Error>; | ||
| fn decode<'a>(&'a mut self, src: DecodeBuf<'a>) -> Self::DecodeFuture<'a>; |
Comment on lines
63
to
77
| Poll::Ready(Some(Ok(item))) => { | ||
| if let Err(status) = encode_item( | ||
| encoder, | ||
| buf, | ||
| uncompression_buf, | ||
| *compression_encoding, | ||
| *max_message_size, | ||
| &mut encoder, | ||
| &mut buf, | ||
| &mut uncompression_buf, | ||
| compression_encoding, | ||
| max_message_size, | ||
| buffer_settings, | ||
| item, | ||
| ) { | ||
| return Poll::Ready(Some(Err(status))); | ||
| ) | ||
| .await | ||
| { | ||
| yield Err(status); | ||
| continue; | ||
| } |
Comment on lines
+249
to
+261
| unsafe fn extend_decode_future_lifetime<'a, T, D>( | ||
| future: D::DecodeFuture<'a>, | ||
| ) -> D::DecodeFuture<'static> | ||
| where | ||
| T: Send + 'static, | ||
| D: Decoder<Item = T, Error = Status> + Send + 'static, | ||
| { | ||
| // SAFETY: `StreamingInner` stores the future together with the decoder and | ||
| // buffers it borrows. The type is !Unpin, the future is always dropped in | ||
| // `PinnedDrop` before those fields, and no method moves or mutates the | ||
| // borrowed fields while `decode` is `Some`. | ||
| unsafe { std::mem::transmute::<D::DecodeFuture<'a>, D::DecodeFuture<'static>>(future) } | ||
| } |
Comment on lines
+354
to
+362
| Streaming::new_response_or_empty( | ||
| decoder, | ||
| body, | ||
| status_code, | ||
| expect_additional_trailers, | ||
| encoding, | ||
| self.config.max_decoding_message_size, | ||
| ) | ||
| .boxed() |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Motivation
Solution