Skip to content

Commit a28fd20

Browse files
committed
feat(tui): support cloudflare jwt gateways
Signed-off-by: Alex Lewontin <alex.lewontin@canonical.com>
1 parent 1e4b2eb commit a28fd20

4 files changed

Lines changed: 212 additions & 1 deletion

File tree

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/openshell-tui/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ owo-colors = { workspace = true }
2727
serde = { workspace = true }
2828
tracing = { workspace = true }
2929
url = { workspace = true }
30+
tokio-tungstenite = { workspace = true }
31+
futures = { workspace = true }
3032

3133
[lints]
3234
workspace = true
Lines changed: 173 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,173 @@
1+
// SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
//! Edge-authenticated WebSocket tunnel proxy for TUI gateway switching.
5+
6+
use futures::stream::{SplitSink, SplitStream};
7+
use futures::{SinkExt, StreamExt};
8+
use miette::{IntoDiagnostic, Result};
9+
use std::net::SocketAddr;
10+
use std::sync::Arc;
11+
use tokio::io::{AsyncReadExt, AsyncWriteExt};
12+
use tokio::net::{TcpListener, TcpStream};
13+
use tokio_tungstenite::tungstenite::Message;
14+
use tokio_tungstenite::tungstenite::client::IntoClientRequest;
15+
use tokio_tungstenite::tungstenite::http::HeaderValue;
16+
use tokio_tungstenite::{MaybeTlsStream, WebSocketStream};
17+
use tracing::{debug, error, warn};
18+
19+
pub struct EdgeTunnelProxy {
20+
pub local_addr: SocketAddr,
21+
}
22+
23+
#[derive(Clone)]
24+
struct TunnelConfig {
25+
ws_url: String,
26+
edge_token: String,
27+
}
28+
29+
pub async fn start_tunnel_proxy(
30+
gateway_endpoint: &str,
31+
edge_token: &str,
32+
) -> Result<EdgeTunnelProxy> {
33+
let listener = TcpListener::bind("127.0.0.1:0").await.into_diagnostic()?;
34+
let local_addr = listener.local_addr().into_diagnostic()?;
35+
let ws_url = format!(
36+
"{}/_ws_tunnel",
37+
gateway_endpoint
38+
.replacen("https://", "wss://", 1)
39+
.replacen("http://", "ws://", 1)
40+
.trim_end_matches('/')
41+
);
42+
let config = Arc::new(TunnelConfig {
43+
ws_url,
44+
edge_token: edge_token.to_string(),
45+
});
46+
47+
debug!(
48+
local_addr = %local_addr,
49+
gateway = %gateway_endpoint,
50+
"starting TUI edge tunnel proxy"
51+
);
52+
tokio::spawn(accept_loop(listener, config));
53+
Ok(EdgeTunnelProxy { local_addr })
54+
}
55+
56+
async fn accept_loop(listener: TcpListener, config: Arc<TunnelConfig>) {
57+
loop {
58+
match listener.accept().await {
59+
Ok((stream, peer)) => {
60+
let config = Arc::clone(&config);
61+
tokio::spawn(async move {
62+
if let Err(err) = handle_connection(stream, &config).await {
63+
warn!(peer = %peer, error = %err, "TUI edge tunnel connection failed");
64+
}
65+
});
66+
}
67+
Err(err) => {
68+
error!(error = %err, "failed to accept TUI edge tunnel connection");
69+
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
70+
}
71+
}
72+
}
73+
}
74+
75+
async fn handle_connection(tcp_stream: TcpStream, config: &TunnelConfig) -> Result<()> {
76+
let ws_stream = open_ws(config).await?;
77+
let (ws_sink, ws_source) = ws_stream.split();
78+
let (tcp_read, tcp_write) = tokio::io::split(tcp_stream);
79+
80+
let mut tcp_to_ws = tokio::spawn(copy_tcp_to_ws(tcp_read, ws_sink));
81+
let mut ws_to_tcp = tokio::spawn(copy_ws_to_tcp(ws_source, tcp_write));
82+
83+
tokio::select! {
84+
res = &mut tcp_to_ws => {
85+
if let Err(err) = res {
86+
debug!(error = %err, "TUI tcp->ws task panicked");
87+
}
88+
ws_to_tcp.abort();
89+
}
90+
res = &mut ws_to_tcp => {
91+
if let Err(err) = res {
92+
debug!(error = %err, "TUI ws->tcp task panicked");
93+
}
94+
tcp_to_ws.abort();
95+
}
96+
}
97+
98+
Ok(())
99+
}
100+
101+
async fn open_ws(config: &TunnelConfig) -> Result<WebSocketStream<MaybeTlsStream<TcpStream>>> {
102+
let mut request = (&config.ws_url).into_client_request().into_diagnostic()?;
103+
let token_val = HeaderValue::from_str(&config.edge_token)
104+
.map_err(|err| miette::miette!("invalid edge token header value: {err}"))?;
105+
request
106+
.headers_mut()
107+
.insert("Cf-Access-Token", token_val.clone());
108+
request
109+
.headers_mut()
110+
.insert("Cf-Access-Jwt-Assertion", token_val);
111+
request.headers_mut().insert(
112+
"Cookie",
113+
HeaderValue::from_str(&format!("CF_Authorization={}", config.edge_token))
114+
.map_err(|err| miette::miette!("invalid edge token cookie value: {err}"))?,
115+
);
116+
117+
let (ws_stream, response) = tokio_tungstenite::connect_async(request)
118+
.await
119+
.map_err(|err| miette::miette!("WebSocket connect failed: {err}"))?;
120+
debug!(status = %response.status(), "TUI edge WebSocket connected");
121+
Ok(ws_stream)
122+
}
123+
124+
async fn copy_tcp_to_ws(
125+
mut tcp_read: tokio::io::ReadHalf<TcpStream>,
126+
mut ws_sink: SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>,
127+
) {
128+
let mut buf = vec![0_u8; 32 * 1024];
129+
loop {
130+
match tcp_read.read(&mut buf).await {
131+
Ok(0) => {
132+
let _ = ws_sink.close().await;
133+
break;
134+
}
135+
Ok(n) => {
136+
if ws_sink
137+
.send(Message::Binary(buf[..n].to_vec().into()))
138+
.await
139+
.is_err()
140+
{
141+
break;
142+
}
143+
}
144+
Err(err) => {
145+
debug!(error = %err, "TUI tcp read error");
146+
let _ = ws_sink.close().await;
147+
break;
148+
}
149+
}
150+
}
151+
}
152+
153+
async fn copy_ws_to_tcp(
154+
mut ws_source: SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>,
155+
mut tcp_write: tokio::io::WriteHalf<TcpStream>,
156+
) {
157+
while let Some(msg) = ws_source.next().await {
158+
match msg {
159+
Ok(Message::Binary(data)) => {
160+
if tcp_write.write_all(&data).await.is_err() {
161+
break;
162+
}
163+
}
164+
Ok(Message::Close(_)) => break,
165+
Ok(Message::Ping(_) | Message::Pong(_) | Message::Text(_) | Message::Frame(_)) => {}
166+
Err(err) => {
167+
debug!(error = %err, "TUI WebSocket read error");
168+
break;
169+
}
170+
}
171+
}
172+
let _ = tcp_write.shutdown().await;
173+
}

crates/openshell-tui/src/lib.rs

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
mod app;
55
mod clipboard;
6+
mod edge_tunnel;
67
mod event;
78
pub mod theme;
89
mod ui;
@@ -18,7 +19,7 @@ use crossterm::terminal::{
1819
EnterAlternateScreen, LeaveAlternateScreen, disable_raw_mode, enable_raw_mode,
1920
};
2021
use miette::{IntoDiagnostic, Result};
21-
use openshell_bootstrap::GatewayMetadata;
22+
use openshell_bootstrap::{GatewayMetadata, edge_token::load_edge_token};
2223
use openshell_core::auth::EdgeAuthInterceptor;
2324
use openshell_core::metadata::{ObjectId, ObjectLabels, ObjectName};
2425
use openshell_core::proto::SandboxPhase;
@@ -496,13 +497,15 @@ async fn handle_gateway_switch(app: &mut App) {
496497
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
497498
enum GatewayChannelMode {
498499
Oidc,
500+
Edge,
499501
Plaintext,
500502
Mtls,
501503
}
502504

503505
fn gateway_channel_mode(meta: Option<&GatewayMetadata>, endpoint: &str) -> GatewayChannelMode {
504506
match meta.and_then(|m| m.auth_mode.as_deref()) {
505507
Some("oidc") => GatewayChannelMode::Oidc,
508+
Some("cloudflare_jwt") => GatewayChannelMode::Edge,
506509
Some("plaintext") => GatewayChannelMode::Plaintext,
507510
_ if endpoint.starts_with("http://") => GatewayChannelMode::Plaintext,
508511
_ => GatewayChannelMode::Mtls,
@@ -535,6 +538,17 @@ async fn connect_to_gateway(name: &str, endpoint: &str) -> Result<(Channel, Edge
535538
let channel = build_oidc_channel(name, endpoint).await?;
536539
Ok((channel, interceptor))
537540
}
541+
GatewayChannelMode::Edge => {
542+
let token = load_edge_token(name).ok_or_else(|| {
543+
miette::miette!(
544+
"No edge token for gateway '{name}'.\n\
545+
Authenticate with: openshell gateway login"
546+
)
547+
})?;
548+
let interceptor = EdgeAuthInterceptor::new(None, Some(&token))?;
549+
let channel = build_edge_channel(endpoint, &token).await?;
550+
Ok((channel, interceptor))
551+
}
538552
GatewayChannelMode::Plaintext => {
539553
let channel = build_plaintext_channel(endpoint).await?;
540554
Ok((channel, EdgeAuthInterceptor::noop()))
@@ -546,6 +560,14 @@ async fn connect_to_gateway(name: &str, endpoint: &str) -> Result<(Channel, Edge
546560
}
547561
}
548562

563+
async fn build_edge_channel(endpoint: &str, token: &str) -> Result<Channel> {
564+
if endpoint.starts_with("https://") {
565+
let proxy = edge_tunnel::start_tunnel_proxy(endpoint, token).await?;
566+
return build_plaintext_channel(&format!("http://{}", proxy.local_addr)).await;
567+
}
568+
build_plaintext_channel(endpoint).await
569+
}
570+
549571
async fn build_plaintext_channel(endpoint: &str) -> Result<Channel> {
550572
Endpoint::from_shared(endpoint.to_string())
551573
.into_diagnostic()?
@@ -2562,6 +2584,18 @@ mod tests {
25622584
);
25632585
}
25642586

2587+
#[test]
2588+
fn gateway_channel_mode_prefers_edge_metadata() {
2589+
let meta = GatewayMetadata {
2590+
auth_mode: Some("cloudflare_jwt".to_string()),
2591+
..Default::default()
2592+
};
2593+
assert_eq!(
2594+
gateway_channel_mode(Some(&meta), "https://gateway.example.com"),
2595+
GatewayChannelMode::Edge
2596+
);
2597+
}
2598+
25652599
#[test]
25662600
fn gateway_channel_mode_uses_http_endpoint_fallback() {
25672601
assert_eq!(

0 commit comments

Comments
 (0)