diff --git a/src/proto/streams/send.rs b/src/proto/streams/send.rs index 9267222c..b52117d9 100644 --- a/src/proto/streams/send.rs +++ b/src/proto/streams/send.rs @@ -251,11 +251,22 @@ impl Send { return; } - // Clear all pending outbound frames. - // Note that we don't call `self.recv_err` because we want to enqueue - // the reset frame before transitioning the stream inside - // `reclaim_all_capacity`. - self.prioritize.clear_queue(buffer, stream); + // If the stream hasn't been opened yet (its initial HEADERS are still + // sitting in `pending_open`/`pending_send`), clearing the queue would + // drop those HEADERS and let a RST_STREAM become the first frame on an + // idle stream. HTTP/2 forbids that: §5.1 allows only HEADERS/PRIORITY + // on idle streams and §6.4 says RST_STREAM on idle is a PROTOCOL_ERROR. + // Keep the queued HEADERS so the stream opens, then send the reset + // immediately after. + if !stream.is_pending_open { + // Otherwise, drop any buffered DATA/HEADERS and only send the + // reset. + // + // Note that we don't call `self.recv_err` because we want to enqueue + // the reset frame before transitioning the stream inside + // `reclaim_all_capacity`. + self.prioritize.clear_queue(buffer, stream); + } let frame = frame::Reset::new(stream.id, reason); diff --git a/tests/h2-tests/tests/client_request.rs b/tests/h2-tests/tests/client_request.rs index c99cc531..7ad5ee85 100644 --- a/tests/h2-tests/tests/client_request.rs +++ b/tests/h2-tests/tests/client_request.rs @@ -1993,6 +1993,72 @@ async fn server_drop_connection_after_go_away() { join(srv, h2).await; } +#[tokio::test] +async fn reset_before_headers_reaches_peer_without_headers() { + // Repro: body future errors immediately and hyper/h2 converts that into a + // RST_STREAM before the queued HEADERS are ever written, so the peer sees + // a reset for an idle stream and treats it as a PROTOCOL_ERROR. + h2_support::trace_init!(); + + let (io, srv) = mock::new(); + + // Server task: perform handshake then observe the first frame. + let srv = async move { + let mut srv = srv; + let settings = srv.assert_client_handshake().await; + assert_default_settings!(settings); + + let frame = tokio::time::timeout(Duration::from_secs(1), srv.next()) + .await + .expect("timed out waiting for first frame") + .expect("unexpected EOF") + .expect("frame error"); + + match frame { + frame::Frame::Headers(h) if h.stream_id() == StreamId::from(1) => { + assert!(h.is_end_stream() == false); + } + frame::Frame::Reset(rst) if rst.stream_id() == StreamId::from(1) => { + panic!( + "BUG: client sent RST_STREAM before any HEADERS on stream 1; reason={:?}", + rst.reason() + ); + } + other => panic!("unexpected first frame: {:?}", other), + } + }; + + // Client task: queue HEADERS, immediately reset, then drive the connection. + let client = async move { + let (client, conn) = client::handshake(io).await.unwrap(); + + let req = Request::builder() + .method("POST") + .uri("https://example.com/") + .body(()) + .unwrap(); + let mut client = client.ready().await.expect("poll_ready"); + let (_resp_fut, mut send_stream) = client.send_request(req, false).unwrap(); + + // Simulate body error (reqwest wraps into io::Error::Other) by resetting + // immediately after the stream is created. + send_stream.send_reset(Reason::INTERNAL_ERROR); + + // Now start driving the connection so the queued frames get written. + let conn_task = tokio::spawn(async move { + let _ = conn.await; + }); + + // Give the connection a moment to flush frames. + tokio::time::sleep(Duration::from_millis(10)).await; + + drop(send_stream); + let _ = conn_task.await; + }; + + join(srv, client).await; +} + const SETTINGS: &[u8] = &[0, 0, 0, 4, 0, 0, 0, 0, 0]; const SETTINGS_ACK: &[u8] = &[0, 0, 0, 4, 1, 0, 0, 0, 0];