Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
182 changes: 120 additions & 62 deletions src/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::sync::{Arc, Mutex};
use bytes::Bytes;
use tokio::sync::Semaphore;
use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
use tokio::task::JoinHandle;
use tokio::task::JoinSet;
use tokio_util::sync::CancellationToken;
use tracing::{Instrument, Span, debug, info_span, warn};

Expand All @@ -20,15 +20,17 @@ use crate::{LspError, Result};
/// At startup, the transport is split into a reader half and a writer
/// half. The writer half moves into a dedicated send-loop task that
/// drains an `unbounded_channel` of outgoing messages. The read-loop
/// owns the reader and spawns every request and non-lifecycle
/// notification handler against `Arc<S>`. Each spawned request is
/// tracked in an in-flight registry keyed by `RequestId`, so a
/// `$/cancelRequest` notification can trigger the handler's
/// [`CancellationToken`] and drop the handler future at its next yield
/// — the wire then carries a `-32800 RequestCancelled` response (ADR
/// 0007). Responses and outgoing notifications all flow through the
/// same channel — the send-loop is the sole writer to the transport.
pub(crate) async fn run<S, T>(server: S, transport: T, concurrency_limit: usize) -> Result<()>
/// owns the reader and spawns every spawned handler into a shared
/// [`JoinSet`] against `Arc<S>`. Each in-flight request is also tracked
/// in a registry keyed by `RequestId` holding its [`CancellationToken`],
/// so a `$/cancelRequest` can trigger that token and drop the handler
/// future at its next yield — the wire then carries a `-32800
/// RequestCancelled` response (ADR 0007). On `exit`, the read-loop aborts
/// the entire [`JoinSet`] so no in-flight handler is awaited to
/// completion (issue #4). Responses and outgoing notifications all flow
/// through the same channel — the send-loop is the sole writer to the
/// transport.
pub(crate) async fn run<S, T>(server: S, transport: T, concurrency_limit: usize) -> Result<Outcome>
where
S: LanguageServer,
T: Transport,
Expand All @@ -41,33 +43,64 @@ where
let state: SharedState = Arc::new(Mutex::new(State::Uninitialized));
let registry: Registry = Arc::new(Mutex::new(HashMap::new()));
let permits = Arc::new(Semaphore::new(concurrency_limit));
// Every spawned handler lives here. Requests also self-remove from
// `registry` on completion; this set additionally lets `exit` abort
// them all at once.
let mut tasks: JoinSet<()> = JoinSet::new();

loop {
// Reap finished handlers so the set doesn't grow unbounded over a
// long session (each completed task already released its permit).
while tasks.try_join_next().is_some() {}

let msg = match reader.recv().await {
Ok(msg) => msg,
Err(TransportError::Closed) => {
// Peer disconnected before `exit`. Drain whatever
// in-flight handlers have already queued, then return;
// unlike `exit`, we let outstanding handlers finish
// rather than abort them.
warn!("transport closed by peer before exit notification");
drop(out_tx);
let _ = send_handle.await;
return Ok(());
return Ok(Outcome::TransportClosed);
}
Err(e) => return Err(Error::Transport(e)),
};

let flow = dispatch(&server, &out_tx, &state, &registry, &permits, msg).await?;
let flow = dispatch(
&server, &out_tx, &state, &registry, &permits, &mut tasks, msg,
)
.await?;
if let Flow::Exit(code) = flow {
// Drop our master sender so the send-loop can drain on its own
// once any in-flight handler tasks release their clones; then
// bail out via process::exit per LSP semantics. Spawned
// handlers and the send-loop die with the process — issue #4
// tightens lifecycle ordering on top of this.
// `exit` means "stop now": abort every in-flight handler and
// wait for them to drop (which releases their clones of the
// outgoing sender). Then drop our master sender so the
// send-loop drains whatever was already queued and exits
// cleanly, and hand the exit code back to the entry point —
// which decides whether to terminate the process (binary) or
// simply return (library / tests).
tasks.shutdown().await;
drop(out_tx);
let _ = send_handle.await;
std::process::exit(code);
return Ok(Outcome::Exit(code));
}
}
}

/// What ended the dispatcher's read-loop. The entry point maps this to a
/// process exit for a real binary (`StdioBuilder::serve`) or simply
/// returns it for the library escape hatch (`lspf::serve`), so the same
/// dispatcher is testable in-process without a `process::exit` that would
/// take the test runner down with it.
pub(crate) enum Outcome {
/// The peer closed the transport before sending `exit`.
TransportClosed,
/// An `exit` notification was processed; carries the LSP exit code
/// (0 if `shutdown` preceded it, else 1).
Exit(i32),
}

async fn send_loop<W: TransportWriter>(mut writer: W, mut out_rx: UnboundedReceiver<RawMessage>) {
while let Some(msg) = out_rx.recv().await {
if let Err(e) = writer.send(msg).await {
Expand All @@ -94,16 +127,13 @@ enum Flow {
Exit(i32),
}

/// Entry in the in-flight registry: the task running the handler plus
/// the cancellation token wired into it. Removed atomically by
/// In-flight request registry: maps each spawned request's `RequestId`
/// to its [`CancellationToken`]. The entry is removed atomically by
/// whichever happens first — the handler completing, or a
/// `$/cancelRequest` arriving for its id.
struct InFlight {
handle: JoinHandle<()>,
token: CancellationToken,
}

type Registry = Arc<Mutex<HashMap<RequestId, InFlight>>>;
/// `$/cancelRequest` arriving for its id — and that removal arbitrates
/// who writes the single wire response. The handler's [`JoinHandle`]
/// lives in the read-loop's [`JoinSet`], not here.
type Registry = Arc<Mutex<HashMap<RequestId, CancellationToken>>>;

#[derive(serde::Deserialize)]
struct CancelParams {
Expand All @@ -116,6 +146,7 @@ async fn dispatch<S>(
state: &SharedState,
registry: &Registry,
permits: &Arc<Semaphore>,
tasks: &mut JoinSet<()>,
msg: RawMessage,
) -> Result<Flow>
where
Expand All @@ -125,6 +156,16 @@ where
RawMessage::Request { id, method, params } => {
let span = info_span!("request", method = %method, id = ?id);

// Initialize precedence: until `initialize` completes, every
// other request is refused with `ServerNotInitialized`
// *before* any handler task is spawned (issue #4). Gating the
// spawn step — not a post-spawn check inside the task — is
// what keeps the guarantee under concurrent dispatch.
if method != "initialize" && *state.lock().unwrap() == State::Uninitialized {
enqueue_error(out_tx, id, LspError::ServerNotInitialized);
return Ok(Flow::Continue);
}

match method.as_ref() {
"initialize" => {
if *state.lock().unwrap() != State::Uninitialized {
Expand All @@ -139,30 +180,33 @@ where
);
return Ok(Flow::Continue);
}
// Run inline (ADR 0003): the read-loop blocks here until
// `initialize` completes, so the `state → Running`
// transition is synchronous and every later message sees
// the post-init state. Spawning instead would let the
// next message be dispatched while still `Uninitialized`,
// defeating initialize-precedence (issue #4). A slow
// `initialize` stalling the read-loop is correct per the
// LSP spec — clients may not send other requests until it
// returns. initialize is therefore not cancellable; the
// token is a never-firing placeholder.
let params = parse_params(&params)?;
let server = Arc::clone(server);
let state = Arc::clone(state);
let permit = acquire_permit(permits).await;
spawn_request(
registry,
out_tx,
span,
id,
permit,
move |ctx, ct| async move {
let result = server.initialize(&ctx, params, ct).await;
if result.is_ok() {
*state.lock().unwrap() = State::Running;
}
result.and_then(to_value)
},
);
let ctx = Context::for_request(id.clone(), span.clone(), out_tx.clone());
let result = server
.initialize(&ctx, params, CancellationToken::new())
.instrument(span)
.await;
if result.is_ok() {
*state.lock().unwrap() = State::Running;
}
enqueue_value_response(out_tx, id, result.and_then(to_value));
}
"shutdown" => {
let server = Arc::clone(server);
let state = Arc::clone(state);
let permit = acquire_permit(permits).await;
spawn_request(
tasks,
registry,
out_tx,
span,
Expand All @@ -178,18 +222,28 @@ where
);
}
other => {
let snapshot = *state.lock().unwrap();
if snapshot == State::Uninitialized {
enqueue_error(out_tx, id, LspError::ServerNotInitialized);
} else {
enqueue_error(out_tx, id, LspError::MethodNotFound(other.to_string()));
}
// Uninitialized was already refused by the gate above,
// so reaching here means the server is running.
enqueue_error(out_tx, id, LspError::MethodNotFound(other.to_string()));
}
}
}
RawMessage::Notification { method, params } => {
let span = info_span!("notification", method = %method);

// Initialize precedence (LSP §Initialize): until `initialize`
// completes, every notification is dropped except `exit`
// (which lets an uninitialized server still shut down) and
// `initialized` (the handshake's other half). Dropping happens
// before any handler is spawned (issue #4).
if method != "initialized"
&& method != "exit"
&& *state.lock().unwrap() == State::Uninitialized
{
debug!(method = %method, "dropping notification before initialize");
return Ok(Flow::Continue);
}

match method.as_ref() {
"exit" => {
let ctx = Context::for_notification(span.clone(), out_tx.clone());
Expand All @@ -208,6 +262,7 @@ where
let params = parse_params(&params)?;
let permit = acquire_permit(permits).await;
spawn_notification(
tasks,
server,
out_tx,
span,
Expand All @@ -221,6 +276,7 @@ where
let params = parse_params(&params)?;
let permit = acquire_permit(permits).await;
spawn_notification(
tasks,
server,
out_tx,
span,
Expand Down Expand Up @@ -258,7 +314,11 @@ where
/// is still there, it writes the response; if `$/cancelRequest`
/// already removed it (and wrote `-32800`), the task's response is
/// dropped silently.
///
/// The task is spawned into the shared [`JoinSet`] so `exit` can abort it
/// along with every other in-flight handler.
fn spawn_request<F, Fut>(
tasks: &mut JoinSet<()>,
registry: &Registry,
out_tx: &UnboundedSender<RawMessage>,
span: Span,
Expand All @@ -281,7 +341,7 @@ fn spawn_request<F, Fut>(
let span_for_ctx = span.clone();
let out_tx_for_ctx = out_tx.clone();

let handle = tokio::spawn(
tasks.spawn(
async move {
// Hold the permit for the lifetime of the task; dropping at
// task end (whether the body finished, was cancelled, or
Expand Down Expand Up @@ -312,10 +372,7 @@ fn spawn_request<F, Fut>(
.instrument(span),
);

registry
.lock()
.unwrap()
.insert(id, InFlight { handle, token: ct });
registry.lock().unwrap().insert(id, ct);
}

fn handle_cancel(registry: &Registry, out_tx: &UnboundedSender<RawMessage>, params: &Bytes) {
Expand All @@ -327,22 +384,23 @@ fn handle_cancel(registry: &Registry, out_tx: &UnboundedSender<RawMessage>, para
return;
}
};
let entry = registry.lock().unwrap().remove(&parsed.id);
if let Some(entry) = entry {
let token = registry.lock().unwrap().remove(&parsed.id);
if let Some(token) = token {
// Cancel the token (wakes polite `ct.cancelled().await`s and
// flips `ct.is_cancelled()`) and write the wire response. The
// spawned task's own `select!` then drops the body future at
// its next yield — we don't call `JoinHandle::abort` directly
// its next yield — we don't abort its `JoinHandle` directly
// because abort races with the polite path: it can drop the
// future before the handler ever gets polled with the token
// observed.
entry.token.cancel();
// observed. (The task stays in the `JoinSet` and is reaped once
// it finishes.)
token.cancel();
enqueue_error(out_tx, parsed.id, LspError::RequestCancelled);
drop(entry.handle);
}
}

fn spawn_notification<S, F, Fut>(
tasks: &mut JoinSet<()>,
server: &Arc<S>,
out_tx: &UnboundedSender<RawMessage>,
span: tracing::Span,
Expand All @@ -356,7 +414,7 @@ fn spawn_notification<S, F, Fut>(
let server = Arc::clone(server);
let out_tx = out_tx.clone();
let span_for_task = span.clone();
tokio::spawn(
tasks.spawn(
async move {
let _permit = permit;
let ctx = Context::for_notification(span_for_task, out_tx);
Expand Down
6 changes: 4 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ where
S: LanguageServer,
T: Transport,
{
dispatcher::run(server, transport, DEFAULT_CONCURRENCY_LIMIT).await
dispatcher::run(server, transport, DEFAULT_CONCURRENCY_LIMIT).await?;
Ok(())
}

/// Like [`serve`], but with an explicit cap on in-flight handler tasks
Expand All @@ -54,5 +55,6 @@ where
S: LanguageServer,
T: Transport,
{
dispatcher::run(server, transport, concurrency_limit).await
dispatcher::run(server, transport, concurrency_limit).await?;
Ok(())
}
9 changes: 8 additions & 1 deletion src/transport/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,13 @@ impl<S: LanguageServer> StdioBuilder<S> {

pub async fn serve(self) -> crate::Result<()> {
let transport = StdioTransport::new();
crate::dispatcher::run(self.server, transport, self.concurrency_limit).await
match crate::dispatcher::run(self.server, transport, self.concurrency_limit).await? {
// Peer hung up before `exit`: return normally and let the
// caller's `main` decide the process disposition.
crate::dispatcher::Outcome::TransportClosed => Ok(()),
// `exit` notification: terminate the process with the LSP
// exit code, per the spec's lifecycle contract.
crate::dispatcher::Outcome::Exit(code) => std::process::exit(code),
}
}
}
Loading
Loading