Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 58 additions & 0 deletions .planning/ws-disconnect-cancellation.plan.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
# WebSocket Disconnect Cancellation Plan

## Goal

Close issue #111 by making WebSocket handlers observe client disconnects at interpreter safe points, matching the cancellation behavior added for normal HTTP handlers.

## Current State

- HTTP handlers use a per-request `Arc<AtomicBool>` plus `CancelOnDrop`; the forked interpreter polls that flag and exits when axum drops the response future.
- WebSocket routes currently fork one interpreter per connection and hold it in a `parking_lot::Mutex`.
- The WS loop calls the Forge handler synchronously inside the async upgrade task. While a long handler is running, the task is not polling `socket.recv()`, so it cannot observe a closed socket or `Message::Close`.
- The forked WS interpreter currently keeps its default cancellation token instead of a connection-scoped token wired to socket lifecycle.

## Implementation

1. In the `"WS"` route branch in `src/runtime/server.rs`, create one connection-scoped `Arc<AtomicBool>` at upgrade time and assign it to the forked interpreter before wrapping it in the connection mutex.
2. Split the WebSocket into sender and receiver halves.
3. Spawn a lightweight receiver task that:
- forwards `Message::Text` payloads to the main per-connection loop through a bounded Tokio channel with capacity 1,
- uses non-blocking `try_send`; if a client sends more than one queued message while the previous handler is still running, treat that as connection backpressure overflow, set the cancel flag, and stop the receiver,
- treats `Message::Close`, receive errors, and stream end as disconnect,
- stores `true` into the connection cancel flag on disconnect.
4. Process text messages sequentially in the main loop:
- run the Forge handler inside `tokio::task::spawn_blocking`, entering the current tracing span like the HTTP handler path,
- clone the `Arc<parking_lot::Mutex<Interpreter>>` into the blocking closure; acquire and drop the `MutexGuard` entirely inside that closure. The guard must never be held across an `.await` or acquired on the async side before entering `spawn_blocking`,
- after the handler returns, skip sending if the cancel flag was set,
- if `sender.send()` fails, set the cancel flag and stop.
5. Add a local drop guard for the upgrade task so any exit path sets the cancel flag.
6. Respect client `Message::Close` by setting the flag and terminating the connection loop.
7. Preserve current non-cancellation error semantics: if the handler returns an error and the connection is still active, send `error: <message>` back as before. If the cancel flag is set, skip the send because the peer is gone or the connection is closing.
8. Abort the receiver task when the main connection loop exits so shutdown does not leave a detached task holding connection resources.
9. Document the Ping/Pong assumption: axum 0.8 wraps tungstenite, whose codec handles automatic Pong responses before yielding messages. Non-text messages other than `Close` remain ignored as today.
10. Avoid changing per-connection state semantics: messages on one WS connection remain sequential and share the same forked interpreter; different WS connections stay isolated.

## Tests

Add an integration test in `tests/server_concurrency.rs` or a new WS-focused integration test using `tokio_tungstenite`:

1. Boot a Forge server with:
- `/ping` for readiness,
- a `@ws("/ws")` handler that writes a temp `started` sentinel, runs a long loop with at least one statement boundary per iteration, periodically writes a `progress` sentinel, and writes a `finished` sentinel only after the loop completes.
2. Connect a WS client, send one text message, wait until `started` proves the handler is running, then close/drop the client without waiting for a response.
3. Wait for `progress` to stop changing after disconnect. Because the handler writes progress from inside the loop body, a continued-running handler keeps changing this file; a cancelled handler stops.
4. Assert `finished` does not appear. If it appears, the loop completed normally instead of being cancelled.
5. Keep the loop body cancellation-friendly by using a statement boundary inside the `repeat` body; the interpreter checks `cancelled` at each `exec_stmt`.

Note: a Forge-level positive `after_safe` sentinel is not viable because the same cancellation flag remains set after `safe { ... }` catches the first `cancelled` error; the next statement would immediately observe cancellation before writing the sentinel.

Run:

- `cargo fmt`
- focused WS integration test
- `cargo test --test server_concurrency`
- `cargo test`

## Rollback

Revert the WS branch changes and remove the new integration test. HTTP handler cancellation remains unchanged.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Fixed

- **WebSocket handlers now observe client disconnect cancellation** — WS connections install a connection-scoped cancellation token, run message handlers on the blocking pool, and keep polling the socket for close/error while handlers run so long-running loops exit at the next interpreter safe point. ([#146](https://github.com/humancto/forge-lang/pull/146))
- **VM error traces now include columns when available** — bytecode chunks carry source columns alongside line tables, old v1.1 bytecode still deserializes with zero columns, and standalone decorator statements now fail VM compilation instead of being silently ignored.
- **OpenTelemetry feature path is now CI-tested and cheaper when inactive** — CI builds `--features otel`, the OTel export path has a smoke test, and request-span traceparent extraction is skipped unless OTel export was activated at runtime.
- **Empty request IDs no longer produce blank span fields** — inbound `X-Request-Id: ` now records as `"unknown"` with a warning, and request-id extraction is covered for empty, non-ASCII, and oversized header values.
Expand Down
138 changes: 115 additions & 23 deletions src/runtime/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ use tracing::Level;
use crate::interpreter::{Interpreter, RuntimeError, Value};
use crate::runtime::metadata::{CorsMode, ServerPlan};
use crate::runtime::tracing_init;
use futures_util::{SinkExt, StreamExt};

/// Cap on the recorded `request_id` length.
///
Expand Down Expand Up @@ -168,7 +169,7 @@ impl Drop for CancelOnDrop {
// these per request.
tracing::debug!(
target: "forge.server",
"response future dropped; cancel flag set"
"runtime future dropped; cancel flag set"
);
}
}
Expand Down Expand Up @@ -238,6 +239,18 @@ fn call_handler(
}
}

fn call_ws_handler(interp: &mut Interpreter, handler_name: &str, text: String) -> String {
let handler = interp.env.get(handler_name);
if let Some(h) = handler {
match interp.call_function(h, vec![Value::String(text)]) {
Ok(v) => format!("{}", v),
Err(e) => format!("error: {}", e.message),
}
} else {
"handler not found".to_string()
}
}

/// Run a Forge handler with full per-request lifecycle:
/// 1. Acquire a backpressure permit, or 503 if exhausted.
/// 2. Set up the cancel-on-drop guard.
Expand Down Expand Up @@ -435,9 +448,10 @@ pub async fn start_server(
// WebSocket handlers hold session state across messages, so
// a per-request fork is the wrong model. Each connection
// gets its own forked interpreter held inside a
// parking_lot::Mutex (messages on a single connection
// arrive serially; the lock just gives us !Send across
// await). Different connections are still fully isolated.
// parking_lot::Mutex. The guard is acquired only inside
// spawn_blocking so synchronous Forge execution never
// blocks the async socket task. Different connections
// are still fully isolated.
let hn = hn.clone();
app = app.route(
&axum_path,
Expand All @@ -447,30 +461,108 @@ pub async fn start_server(
let template = state.template.clone();
let hn = hn.clone();
async move {
ws.on_upgrade(move |mut socket| async move {
ws.on_upgrade(move |socket| async move {
use axum::extract::ws::Message;
let interp = Arc::new(parking_lot::Mutex::new(template.fork()));
while let Some(Ok(msg)) = socket.recv().await {
if let Message::Text(text) = msg {
let response = {
let mut interp = interp.lock();
let handler = interp.env.get(&hn);
if let Some(h) = handler {
match interp.call_function(
h,
vec![Value::String(text.to_string())],
) {
Ok(v) => format!("{}", v),
Err(e) => format!("error: {}", e.message),

let cancelled = Arc::new(AtomicBool::new(false));
let _drop_guard = CancelOnDrop(cancelled.clone());

let mut conn_interp = template.fork();
conn_interp.cancelled = cancelled.clone();
let interp = Arc::new(parking_lot::Mutex::new(conn_interp));
let (mut sender, mut receiver) = socket.split();
let (text_tx, mut text_rx) =
tokio::sync::mpsc::channel::<String>(1);

let cancel_for_receiver = cancelled.clone();
let receiver_task = tokio::spawn(async move {
// Axum 0.8/tungstenite handles Pong replies in the
// codec before yielding messages here. We only need
// to forward text and treat Close/errors as cancel.
while let Some(msg) = receiver.next().await {
match msg {
Ok(Message::Text(text)) => {
if text_tx.try_send(text.to_string()).is_err() {
cancel_for_receiver
.store(true, Ordering::Release);
break;
Comment on lines +485 to +488
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Avoid dropping WS connections on brief message bursts

Using try_send on a channel of capacity 1 makes the server treat a temporarily full queue as a disconnect and immediately set the cancel flag. In practice, a client that sends two text frames back-to-back (or before the async loop has polled text_rx) can have its connection cancelled even though the server could have processed both sequentially; this is a behavioral regression from the previous loop that handled all incoming messages in order.

Useful? React with 👍 / 👎.

}
} else {
"handler not found".to_string()
}
};
let _ =
socket.send(Message::Text(response.into())).await;
Ok(Message::Close(_)) | Err(_) => {
cancel_for_receiver
.store(true, Ordering::Release);
break;
}
Ok(_) => {}
}
}
cancel_for_receiver.store(true, Ordering::Release);
});

while let Some(text) = text_rx.recv().await {
if cancelled.load(Ordering::Acquire) {
break;
}

let interp_for_blocking = interp.clone();
let hn_for_blocking = hn.clone();
let span = tracing::Span::current();
let join = tokio::task::spawn_blocking(move || {
let _g = span.enter();
let mut interp = interp_for_blocking.lock();
call_ws_handler(&mut interp, &hn_for_blocking, text)
});

let response = match join.await {
Ok(response) => response,
Err(join_err) if join_err.is_panic() => {
let payload = join_err.into_panic();
let msg = if let Some(s) =
payload.downcast_ref::<&str>()
{
(*s).to_string()
} else if let Some(s) =
payload.downcast_ref::<String>()
{
s.clone()
} else {
"<non-string panic payload>".to_string()
};
tracing::error!(
target: "forge.server",
handler = %hn,
panic = %msg,
"websocket handler panicked"
);
"error: internal handler panic".to_string()
}
Err(join_err) => {
tracing::error!(
target: "forge.server",
handler = %hn,
error = %join_err,
"websocket handler task failed"
);
"error: handler task failed".to_string()
}
};

if cancelled.load(Ordering::Acquire) {
break;
}

if sender
.send(Message::Text(response.into()))
.await
.is_err()
{
cancelled.store(true, Ordering::Release);
break;
}
}

cancelled.store(true, Ordering::Release);
receiver_task.abort();
})
}
},
Expand Down
Loading
Loading