Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 16 additions & 5 deletions src/proto/streams/send.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,11 +251,22 @@ impl Send {
return;
}

// Clear all pending outbound frames.
// Note that we don't call `self.recv_err` because we want to enqueue
// the reset frame before transitioning the stream inside
// `reclaim_all_capacity`.
self.prioritize.clear_queue(buffer, stream);
// If the stream hasn't been opened yet (its initial HEADERS are still
// sitting in `pending_open`/`pending_send`), clearing the queue would
// drop those HEADERS and let a RST_STREAM become the first frame on an
// idle stream. HTTP/2 forbids that: §5.1 allows only HEADERS/PRIORITY
// on idle streams and §6.4 says RST_STREAM on idle is a PROTOCOL_ERROR.
// Keep the queued HEADERS so the stream opens, then send the reset
// immediately after.
if !stream.is_pending_open {
// Otherwise, drop any buffered DATA/HEADERS and only send the
// reset.
//
// Note that we don't call `self.recv_err` because we want to enqueue
// the reset frame before transitioning the stream inside
// `reclaim_all_capacity`.
self.prioritize.clear_queue(buffer, stream);
}

let frame = frame::Reset::new(stream.id, reason);

Expand Down
66 changes: 66 additions & 0 deletions tests/h2-tests/tests/client_request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1993,6 +1993,72 @@ async fn server_drop_connection_after_go_away() {
join(srv, h2).await;
}

#[tokio::test]
async fn reset_before_headers_reaches_peer_without_headers() {
// Repro: body future errors immediately and hyper/h2 converts that into a
// RST_STREAM before the queued HEADERS are ever written, so the peer sees
// a reset for an idle stream and treats it as a PROTOCOL_ERROR.
h2_support::trace_init!();

let (io, srv) = mock::new();

// Server task: perform handshake then observe the first frame.
let srv = async move {
let mut srv = srv;
let settings = srv.assert_client_handshake().await;
assert_default_settings!(settings);

let frame = tokio::time::timeout(Duration::from_secs(1), srv.next())
.await
.expect("timed out waiting for first frame")
.expect("unexpected EOF")
.expect("frame error");

match frame {
frame::Frame::Headers(h) if h.stream_id() == StreamId::from(1) => {
assert!(h.is_end_stream() == false);
}
frame::Frame::Reset(rst) if rst.stream_id() == StreamId::from(1) => {
panic!(
"BUG: client sent RST_STREAM before any HEADERS on stream 1; reason={:?}",
rst.reason()
);
}
other => panic!("unexpected first frame: {:?}", other),
}
};

// Client task: queue HEADERS, immediately reset, then drive the connection.
let client = async move {
let (client, conn) = client::handshake(io).await.unwrap();

let req = Request::builder()
.method("POST")
.uri("https://example.com/")
.body(())
.unwrap();
let mut client = client.ready().await.expect("poll_ready");
let (_resp_fut, mut send_stream) = client.send_request(req, false).unwrap();

// Simulate body error (reqwest wraps into io::Error::Other) by resetting
// immediately after the stream is created.
send_stream.send_reset(Reason::INTERNAL_ERROR);

// Now start driving the connection so the queued frames get written.
let conn_task = tokio::spawn(async move {
let _ = conn.await;
});

// Give the connection a moment to flush frames.
tokio::time::sleep(Duration::from_millis(10)).await;

drop(send_stream);
let _ = conn_task.await;
};

join(srv, client).await;
}

const SETTINGS: &[u8] = &[0, 0, 0, 4, 0, 0, 0, 0, 0];
const SETTINGS_ACK: &[u8] = &[0, 0, 0, 4, 1, 0, 0, 0, 0];

Expand Down