diff --git a/crates/browser-use-agent/src/mcp/tests.rs b/crates/browser-use-agent/src/mcp/tests.rs index 722caa4c..8ec202f4 100644 --- a/crates/browser-use-agent/src/mcp/tests.rs +++ b/crates/browser-use-agent/src/mcp/tests.rs @@ -83,14 +83,38 @@ for line in sys.stdin: "error": {"code": -32601, "message": "method not found"}}) "#; +const STDIO_STALL_AFTER_INIT_PY: &str = r#" +import sys, json, time + +def send(obj): + sys.stdout.write(json.dumps(obj) + "\n") + sys.stdout.flush() + +line = sys.stdin.readline() +msg = json.loads(line) +send({"jsonrpc": "2.0", "id": msg.get("id"), "result": { + "protocolVersion": "2024-11-05", + "capabilities": {}, + "serverInfo": {"name": "stall", "version": "0.0.1"}, +}}) + +# Do not read stdin again. A large client request can fill the pipe while the +# transport is still in write_json, before its per-request response timeout is +# armed. +time.sleep(60) +"#; + /// Write the python fixture into a tempdir and return its path. The tempdir is /// kept alive by the returned guard. fn write_stdio_fixture() -> (tempfile::TempDir, std::path::PathBuf) { + write_stdio_fixture_source(STDIO_FIXTURE_PY) +} + +fn write_stdio_fixture_source(source: &str) -> (tempfile::TempDir, std::path::PathBuf) { let dir = tempfile::tempdir().expect("tempdir"); let path = dir.path().join("server.py"); let mut f = std::fs::File::create(&path).expect("create fixture"); - f.write_all(STDIO_FIXTURE_PY.as_bytes()) - .expect("write fixture"); + f.write_all(source.as_bytes()).expect("write fixture"); f.flush().expect("flush fixture"); (dir, path) } @@ -161,6 +185,34 @@ async fn stdio_error_result_maps_is_error() { assert_eq!(mcp_result_tool_content(&result.into_seam()), "kaboom"); } +#[tokio::test] +#[ignore = "diagnostic repro: stdio MCP timeout does not cover a blocked stdin write"] +async fn repro_stdio_call_can_remain_pending_before_response_timeout_is_armed() { + let (_dir, script) = write_stdio_fixture_source(STDIO_STALL_AFTER_INIT_PY); + let transport = StdioTransport::connect( + "python3", + &[script.to_string_lossy().to_string()], + &HashMap::new(), + None, + Duration::from_secs(2), + Duration::from_millis(50), + ) + .await + .expect("connect stalling stdio fixture"); + + let large_arg = "x".repeat(32 * 1024 * 1024); + let result = tokio::time::timeout( + Duration::from_secs(2), + transport.call_tool("stall", Some(json!({ "blob": large_arg }))), + ) + .await; + + assert!( + result.is_err(), + "MCP call returned inside the outer watchdog; this repro expects write_json to remain pending before the tool timeout" + ); +} + // --------------------------------------------------------------------------- // http transport (loopback TcpListener) // --------------------------------------------------------------------------- diff --git a/crates/browser-use-llm/src/route/client.rs b/crates/browser-use-llm/src/route/client.rs index 121bdabc..18150bc5 100644 --- a/crates/browser-use-llm/src/route/client.rs +++ b/crates/browser-use-llm/src/route/client.rs @@ -1285,4 +1285,44 @@ mod tests { // The bearer token must never leak into the transport error message. assert!(!err.message.contains("sk-not-used"), "leaked token: {err}"); } + + #[tokio::test] + #[ignore = "diagnostic repro: current ModelClient has no request/stream-open timeout"] + async fn repro_model_stream_open_can_remain_pending_without_client_timeout() { + use tokio::io::AsyncReadExt; + + let listener = tokio::net::TcpListener::bind(("127.0.0.1", 0)) + .await + .expect("bind local listener"); + let addr = listener.local_addr().expect("local addr"); + let server = tokio::spawn(async move { + let Ok((mut socket, _)) = listener.accept().await else { + return; + }; + let mut buf = [0u8; 1024]; + let _ = socket.read(&mut buf).await; + tokio::time::sleep(Duration::from_secs(30)).await; + }); + + let client = ModelClient::with_retry(RetryPolicy { + max_attempts: 1, + ..RetryPolicy::default() + }); + let route = Route::new( + Box::new(OpenAiResponsesProtocol::new()), + Endpoint::new(format!("http://{addr}"), "/v1/responses"), + Auth::bearer("sk-not-used"), + ); + let mut req = LlmRequest::new("gpt-5.1-codex", "openai"); + req.messages.push(crate::schema::Message::user_text("hi")); + + let result = + tokio::time::timeout(Duration::from_millis(250), client.stream(&route, &req)).await; + server.abort(); + + assert!( + result.is_err(), + "stream open returned inside the outer watchdog; this repro expects it to remain pending" + ); + } } diff --git a/crates/browser-use-python-worker/src/lib.rs b/crates/browser-use-python-worker/src/lib.rs index f4a53597..ee302efe 100644 --- a/crates/browser-use-python-worker/src/lib.rs +++ b/crates/browser-use-python-worker/src/lib.rs @@ -555,6 +555,50 @@ mod tests { Ok(()) } + #[test] + #[ignore = "diagnostic repro: omitted timeout waits for snippet completion instead of self-recovering"] + fn repro_worker_without_timeout_waits_for_snippet_completion() -> Result<()> { + let repo_root = PathBuf::from(env!("CARGO_MANIFEST_DIR")) + .parent() + .and_then(Path::parent) + .context("repo root")? + .to_path_buf(); + let temp = tempfile::tempdir()?; + let cwd = temp.path().to_path_buf(); + let artifact_dir = temp.path().join("artifacts"); + let pythonpath = repo_root.join("python"); + let (tx, rx) = std::sync::mpsc::channel(); + + let handle = std::thread::spawn(move || { + let result = (|| -> Result { + let mut worker = PythonWorker::start_with_pythonpath("python3", pythonpath)?; + worker.run_with_timeout( + "s1", + cwd, + artifact_dir, + "import time\ntime.sleep(1.5)\nresult = 'finished'", + None, + ) + })(); + let _ = tx.send(result); + }); + + match rx.recv_timeout(Duration::from_millis(250)) { + Err(std::sync::mpsc::RecvTimeoutError::Timeout) => {} + other => panic!( + "python call returned before the outer watchdog; expected no self-timeout: {other:?}" + ), + } + + let response = rx + .recv_timeout(Duration::from_secs(3)) + .expect("worker should finish after snippet sleep")?; + handle.join().expect("worker thread join"); + assert!(response.ok, "{response:?}"); + assert_eq!(response.data, Value::String("finished".to_string())); + Ok(()) + } + #[test] fn worker_hard_times_out_threadpool_shutdown_hang_and_recovers() -> Result<()> { let repo_root = PathBuf::from(env!("CARGO_MANIFEST_DIR"))