Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 54 additions & 2 deletions crates/browser-use-agent/src/mcp/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,14 +83,38 @@ for line in sys.stdin:
"error": {"code": -32601, "message": "method not found"}})
"#;

/// Python stdio fixture that answers exactly one JSON-RPC message and then
/// stalls.
///
/// The script reads a single line from stdin, replies with a `result` object
/// echoing that message's `id` (shaped like an MCP initialize response), and
/// then sleeps for 60 seconds without ever reading stdin again.
const STDIO_STALL_AFTER_INIT_PY: &str = r#"
import sys, json, time

def send(obj):
    sys.stdout.write(json.dumps(obj) + "\n")
    sys.stdout.flush()

line = sys.stdin.readline()
msg = json.loads(line)
send({"jsonrpc": "2.0", "id": msg.get("id"), "result": {
    "protocolVersion": "2024-11-05",
    "capabilities": {},
    "serverInfo": {"name": "stall", "version": "0.0.1"},
}})

# Do not read stdin again. A large client request can fill the pipe while the
# transport is still in write_json, before its per-request response timeout is
# armed.
time.sleep(60)
"#;

/// Materialize the default stdio fixture (`STDIO_FIXTURE_PY`) on disk by
/// delegating to `write_stdio_fixture_source`.
///
/// Returns the tempdir guard together with the script path; hold on to the
/// guard for as long as the script file is needed.
fn write_stdio_fixture() -> (tempfile::TempDir, std::path::PathBuf) {
    let (guard, script) = write_stdio_fixture_source(STDIO_FIXTURE_PY);
    (guard, script)
}

fn write_stdio_fixture_source(source: &str) -> (tempfile::TempDir, std::path::PathBuf) {
let dir = tempfile::tempdir().expect("tempdir");
let path = dir.path().join("server.py");
let mut f = std::fs::File::create(&path).expect("create fixture");
f.write_all(STDIO_FIXTURE_PY.as_bytes())
.expect("write fixture");
f.write_all(source.as_bytes()).expect("write fixture");
f.flush().expect("flush fixture");
(dir, path)
}
Expand Down Expand Up @@ -161,6 +185,34 @@ async fn stdio_error_result_maps_is_error() {
assert_eq!(mcp_result_tool_content(&result.into_seam()), "kaboom");
}

/// Diagnostic repro (ignored by default): a stdio `call_tool` carrying a very
/// large argument is expected to still be pending when an outer two-second
/// watchdog fires.
///
/// The stalling fixture answers only the first message and never reads stdin
/// again, so the oversized request cannot be consumed by the child process.
#[tokio::test]
#[ignore = "diagnostic repro: stdio MCP timeout does not cover a blocked stdin write"]
async fn repro_stdio_call_can_remain_pending_before_response_timeout_is_armed() {
    // Keep the tempdir guard bound so the script outlives the child process.
    let (_fixture_dir, script_path) = write_stdio_fixture_source(STDIO_STALL_AFTER_INIT_PY);
    let script_arg = script_path.to_string_lossy().into_owned();

    // NOTE(review): the two durations are presumably the startup and per-call
    // timeouts — confirm against `StdioTransport::connect`.
    let transport = StdioTransport::connect(
        "python3",
        &[script_arg],
        &HashMap::new(),
        None,
        Duration::from_secs(2),
        Duration::from_millis(50),
    )
    .await
    .expect("connect stalling stdio fixture");

    // 32 MiB of payload, meant to be far more than the pipe can buffer.
    let oversized_blob = "x".repeat(32 * 1024 * 1024);
    let pending_call = transport.call_tool("stall", Some(json!({ "blob": oversized_blob })));
    let watchdog = tokio::time::timeout(Duration::from_secs(2), pending_call).await;

    // `Err` here means the outer watchdog elapsed before the call resolved.
    assert!(
        watchdog.is_err(),
        "MCP call returned inside the outer watchdog; this repro expects write_json to remain pending before the tool timeout"
    );
}

// ---------------------------------------------------------------------------
// http transport (loopback TcpListener)
// ---------------------------------------------------------------------------
Expand Down
40 changes: 40 additions & 0 deletions crates/browser-use-llm/src/route/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1285,4 +1285,44 @@ mod tests {
// The bearer token must never leak into the transport error message.
assert!(!err.message.contains("sk-not-used"), "leaked token: {err}");
}

/// Diagnostic repro (ignored by default): opening a model stream against a
/// server that accepts the connection, reads once, and then stays silent is
/// expected to still be pending when a 250 ms outer watchdog fires.
#[tokio::test]
#[ignore = "diagnostic repro: current ModelClient has no request/stream-open timeout"]
async fn repro_model_stream_open_can_remain_pending_without_client_timeout() {
    use tokio::io::AsyncReadExt;

    let listener = tokio::net::TcpListener::bind(("127.0.0.1", 0))
        .await
        .expect("bind local listener");
    let addr = listener.local_addr().expect("local addr");

    // Loopback server: accept one connection, read a single chunk, then hold
    // the socket open without ever answering.
    let silent_server = tokio::spawn(async move {
        if let Ok((mut socket, _)) = listener.accept().await {
            let mut scratch = [0u8; 1024];
            let _ = socket.read(&mut scratch).await;
            tokio::time::sleep(Duration::from_secs(30)).await;
        }
    });

    // A single attempt, so retries cannot stretch or mask the hang.
    let single_attempt = RetryPolicy {
        max_attempts: 1,
        ..RetryPolicy::default()
    };
    let model_client = ModelClient::with_retry(single_attempt);

    let endpoint = Endpoint::new(format!("http://{addr}"), "/v1/responses");
    let route = Route::new(
        Box::new(OpenAiResponsesProtocol::new()),
        endpoint,
        Auth::bearer("sk-not-used"),
    );

    let mut request = LlmRequest::new("gpt-5.1-codex", "openai");
    request
        .messages
        .push(crate::schema::Message::user_text("hi"));

    let open_stream = model_client.stream(&route, &request);
    let outcome = tokio::time::timeout(Duration::from_millis(250), open_stream).await;
    silent_server.abort();

    // `Err` here means the outer watchdog elapsed before the stream opened.
    assert!(
        outcome.is_err(),
        "stream open returned inside the outer watchdog; this repro expects it to remain pending"
    );
}
}
44 changes: 44 additions & 0 deletions crates/browser-use-python-worker/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -555,6 +555,50 @@ mod tests {
Ok(())
}

/// Diagnostic repro (ignored by default): when `run_with_timeout` is called
/// with `None` as its last argument, the call is expected to still be running
/// after 250 ms and to finish only once the snippet's 1.5 s sleep is over.
#[test]
#[ignore = "diagnostic repro: omitted timeout waits for snippet completion instead of self-recovering"]
fn repro_worker_without_timeout_waits_for_snippet_completion() -> Result<()> {
    // The repo root is two directory levels above this crate's manifest dir.
    let repo_root = Path::new(env!("CARGO_MANIFEST_DIR"))
        .ancestors()
        .nth(2)
        .context("repo root")?
        .to_path_buf();
    let pythonpath = repo_root.join("python");

    let temp = tempfile::tempdir()?;
    let cwd = temp.path().to_path_buf();
    let artifact_dir = temp.path().join("artifacts");

    let (tx, rx) = std::sync::mpsc::channel();

    // Run the worker on its own thread so this thread can act as the watchdog.
    let handle = std::thread::spawn(move || {
        let run_snippet = || -> Result<RunPythonResponse> {
            let mut worker = PythonWorker::start_with_pythonpath("python3", pythonpath)?;
            worker.run_with_timeout(
                "s1",
                cwd,
                artifact_dir,
                "import time\ntime.sleep(1.5)\nresult = 'finished'",
                None,
            )
        };
        // The receiver may already be gone if the watchdog panicked; ignore it.
        let _ = tx.send(run_snippet());
    });

    // Phase 1: nothing may arrive within the first 250 ms.
    let other = rx.recv_timeout(Duration::from_millis(250));
    if !matches!(other, Err(std::sync::mpsc::RecvTimeoutError::Timeout)) {
        panic!(
            "python call returned before the outer watchdog; expected no self-timeout: {other:?}"
        );
    }

    // Phase 2: the snippet should complete on its own shortly afterwards.
    let response = rx
        .recv_timeout(Duration::from_secs(3))
        .expect("worker should finish after snippet sleep")?;
    handle.join().expect("worker thread join");

    assert!(response.ok, "{response:?}");
    assert_eq!(response.data, Value::String(String::from("finished")));
    Ok(())
}

#[test]
fn worker_hard_times_out_threadpool_shutdown_hang_and_recovers() -> Result<()> {
let repo_root = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
Expand Down