Skip to content

Commit 1f675de

Browse files
committed
refactor: clean up comments and naming
1 parent ed887bb commit 1f675de

File tree

2 files changed

+14
-24
lines changed

2 files changed

+14
-24
lines changed

crates/rmcp/src/transport/streamable_http_client.rs

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -282,9 +282,7 @@ impl<C: StreamableHttpClient> StreamableHttpClientWorker<C> {
282282

283283
impl<C: StreamableHttpClient> StreamableHttpClientWorker<C> {
284284
/// Convert a raw SSE stream into a JSON-RPC message stream without
285-
/// reconnection logic. Used for per-request POST SSE responses where
286-
/// we close the stream after the first response and want the underlying
287-
/// HTTP connection to be returned to the pool promptly.
285+
/// reconnection logic.
288286
fn raw_sse_to_jsonrpc(
289287
stream: BoxedSseStream,
290288
) -> impl Stream<Item = Result<ServerJsonRpcMessage, StreamableHttpError<C::Error>>> + Send + 'static
@@ -347,11 +345,8 @@ impl<C: StreamableHttpClient> StreamableHttpClientWorker<C> {
347345
}
348346
if close_on_response && is_response {
349347
tracing::debug!("got response, draining sse stream for connection reuse");
350-
// Drain remaining stream bytes so the HTTP/1.1 connection can
351-
// be returned to the pool instead of being discarded. The
352-
// server closes the channel shortly after sending the response,
353-
// so this normally completes in microseconds on localhost. The
354-
// timeout guards against servers that keep the stream open.
348+
// Consume the remaining stream so the HTTP/1.1 connection
349+
// returns to the pool cleanly.
355350
let _ = tokio::time::timeout(std::time::Duration::from_millis(50), async {
356351
while sse_stream.next().await.is_some() {}
357352
})
@@ -788,10 +783,6 @@ impl<C: StreamableHttpClient> Worker for StreamableHttpClientWorker<C> {
788783
Ok(())
789784
}
790785
Ok(StreamableHttpPostResponse::Sse(stream, ..)) => {
791-
// Per-request POST SSE streams use a thin
792-
// adapter instead of SseAutoReconnectStream so
793-
// the stream ends immediately when the server
794-
// closes the channel, enabling connection reuse.
795786
streams.spawn(Self::execute_sse_stream(
796787
Self::raw_sse_to_jsonrpc(stream),
797788
sse_worker_tx.clone(),

crates/rmcp/tests/test_streamable_http_connection_reuse.rs

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,11 @@ struct SumRequest {
2424
}
2525

2626
#[derive(Debug, Clone)]
27-
struct EchoServer {
27+
struct SumServer {
2828
tool_router: ToolRouter<Self>,
2929
}
3030

31-
impl EchoServer {
31+
impl SumServer {
3232
fn new() -> Self {
3333
Self {
3434
tool_router: Self::tool_router(),
@@ -37,15 +37,15 @@ impl EchoServer {
3737
}
3838

3939
#[tool_router]
40-
impl EchoServer {
40+
impl SumServer {
4141
#[tool(description = "Sum two numbers")]
4242
fn sum(&self, Parameters(SumRequest { a, b }): Parameters<SumRequest>) -> String {
4343
(a + b).to_string()
4444
}
4545
}
4646

4747
#[tool_handler(router = self.tool_router)]
48-
impl ServerHandler for EchoServer {
48+
impl ServerHandler for SumServer {
4949
fn get_info(&self) -> ServerInfo {
5050
ServerInfo::new(ServerCapabilities::builder().enable_tools().build())
5151
}
@@ -59,14 +59,13 @@ impl ServerHandler for EchoServer {
5959
async fn test_subsequent_tool_calls_reuse_connections() -> anyhow::Result<()> {
6060
let ct = CancellationToken::new();
6161

62-
let service: StreamableHttpService<EchoServer, LocalSessionManager> =
63-
StreamableHttpService::new(
64-
|| Ok(EchoServer::new()),
65-
Default::default(),
66-
StreamableHttpServerConfig::default()
67-
.with_sse_keep_alive(None)
68-
.with_cancellation_token(ct.child_token()),
69-
);
62+
let service: StreamableHttpService<SumServer, LocalSessionManager> = StreamableHttpService::new(
63+
|| Ok(SumServer::new()),
64+
Default::default(),
65+
StreamableHttpServerConfig::default()
66+
.with_sse_keep_alive(None)
67+
.with_cancellation_token(ct.child_token()),
68+
);
7069

7170
let router = axum::Router::new().nest_service("/mcp", service);
7271
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await?;

0 commit comments

Comments
 (0)