diff --git a/pubky-sdk/src/client/core.rs b/pubky-sdk/src/client/core.rs index df90b4617..545387180 100644 --- a/pubky-sdk/src/client/core.rs +++ b/pubky-sdk/src/client/core.rs @@ -233,6 +233,9 @@ impl PubkyHttpClientBuilder { #[cfg(not(target_arch = "wasm32"))] icann_http: icann_http_builder.build()?, + #[cfg(not(target_arch = "wasm32"))] + transport: super::http_targets::native::TransportResolver::new(), + #[cfg(target_arch = "wasm32")] testnet_host: self.testnet_host.clone(), }) @@ -333,6 +336,10 @@ pub struct PubkyHttpClient { #[cfg(not(target_arch = "wasm32"))] pub(crate) icann_http: reqwest::Client, + /// Resolves and caches per-host transport decisions (`PubkyTLS` vs ICANN fallback). + #[cfg(not(target_arch = "wasm32"))] + pub(crate) transport: super::http_targets::native::TransportResolver, + /// The hostname to use for testnet URL transformations (WASM only). #[cfg(target_arch = "wasm32")] pub(crate) testnet_host: Option, diff --git a/pubky-sdk/src/client/http_targets/native.rs b/pubky-sdk/src/client/http_targets/native.rs index 5b79deabc..01cadddeb 100644 --- a/pubky-sdk/src/client/http_targets/native.rs +++ b/pubky-sdk/src/client/http_targets/native.rs @@ -1,8 +1,130 @@ +use std::collections::HashMap; +use std::sync::{Arc, Mutex, PoisonError, RwLock}; +use std::time::{Duration, Instant}; + +use futures_util::StreamExt; +use tokio::net::TcpStream; + use crate::errors::RequestError; use crate::{PubkyHttpClient, PublicKey, Result, cross_log}; use reqwest::{IntoUrl, Method, RequestBuilder}; use url::Url; +const TRANSPORT_CACHE_TTL: Duration = Duration::from_secs(60); +const PROBE_TIMEOUT: Duration = Duration::from_millis(1500); + +#[derive(Debug, Clone)] +pub(crate) enum ResolvedTransport { + PubkyTls, + Icann { domain: String, port: Option }, +} + +/// Resolves and caches per-host transport decisions (`PubkyTLS` vs ICANN). +/// +/// Accepts a `&pkarr::Client` reference when resolution is needed — does not +/// own the pkarr client, which is shared across the SDK. +#[derive(Debug, Clone)] +pub(crate) struct TransportResolver { + cache: Arc>>, + guards: Arc>>>>, +} + +impl TransportResolver { + pub(crate) fn new() -> Self { + Self { + cache: Arc::new(RwLock::new(HashMap::new())), + guards: Arc::new(Mutex::new(HashMap::new())), + } + } + + /// Look up the transport for `pk`, resolving via PKARR on cache miss. + pub(crate) async fn resolve(&self, pk: &str, pkarr: &pkarr::Client) -> ResolvedTransport { + if let Some(t) = self.cached(pk) { + return t; + } + self.resolve_and_cache(pk, pkarr).await + } + + /// Fast path: return a cached, non-expired transport decision. + fn cached(&self, pk: &str) -> Option { + let cache = self.cache.read().unwrap_or_else(PoisonError::into_inner); + cache + .get(pk) + .filter(|(ts, _)| ts.elapsed() < TRANSPORT_CACHE_TTL) + .map(|(_, t)| t.clone()) + } + + /// Slow path: acquire a per-key guard, double-check the cache, resolve, + /// and store the result. + async fn resolve_and_cache(&self, pk: &str, pkarr: &pkarr::Client) -> ResolvedTransport { + let guard = { + let mut guards = self.guards.lock().unwrap_or_else(PoisonError::into_inner); + Arc::clone(guards.entry(pk.to_string()).or_default()) + }; + let _lock = guard.lock().await; + + // Another task may have resolved while we waited for the guard. + if let Some(t) = self.cached(pk) { + return t; + } + + let t = Self::resolve_from_pkarr(pkarr, pk).await; + self.cache + .write() + .unwrap_or_else(PoisonError::into_inner) + .insert(pk.to_string(), (Instant::now(), t.clone())); + t + } + + /// Inspect PKARR endpoints and probe reachability to pick a transport. + async fn resolve_from_pkarr(pkarr: &pkarr::Client, qname: &str) -> ResolvedTransport { + let stream = pkarr.resolve_https_endpoints(qname); + futures_util::pin_mut!(stream); + + let mut has_direct = false; + let mut direct_addrs = Vec::new(); + let mut icann: Option<(String, Option)> = None; + + while let Some(ep) = stream.next().await { + if let Some(domain) = ep.domain() { + if icann.is_none() { + icann = Some((domain.to_string(), ep.port())); + } + } else { + has_direct = true; + direct_addrs.extend(ep.to_socket_addrs()); + } + } + + let Some((domain, port)) = icann else { + return ResolvedTransport::PubkyTls; + }; + if !has_direct { + return ResolvedTransport::Icann { domain, port }; + } + + // Both exist — probe direct endpoint reachability. + if probe_reachable(&direct_addrs, PROBE_TIMEOUT).await { + ResolvedTransport::PubkyTls + } else { + cross_log!( + warn, + "Direct endpoint unreachable for {qname}; ICANN fallback to {domain}" + ); + ResolvedTransport::Icann { domain, port } + } + } +} + +async fn probe_reachable(addrs: &[std::net::SocketAddr], timeout: Duration) -> bool { + for addr in addrs { + if let Ok(Ok(_)) = tokio::time::timeout(timeout, TcpStream::connect(addr)).await { + return true; + } + } + false +} + #[derive(Debug, Clone, Copy, PartialEq, Eq)] enum HostKind { ResolvedPubky, @@ -26,40 +148,65 @@ fn classify_host(host: &str) -> HostKind { impl PubkyHttpClient { /// Constructs a [`reqwest::RequestBuilder`] for the given HTTP `method` and `url`, - /// routing through the client’s unified request path. + /// routing through the client's unified request path. /// /// This method ensures that special Pubky and pkarr hosts are resolved according to /// platform-specific rules (native or WASM), including: /// - Detecting `_pubky.` hosts and applying the correct TLS handling. /// - Routing standard ICANN domains through the `icann_http` client on native builds. + /// - When both a direct (IP:PORT) and an ICANN (domain) endpoint exist, TCP-probing + /// the direct endpoint and falling back to ICANN if unreachable. /// - /// On native targets, this is effectively a thin wrapper around [`PubkyHttpClient::request`], - /// while on WASM it also performs host transformation and may add the `pubky-host` header. + /// Transport decisions are cached per public key with a short TTL. /// /// Returns a [`Result`] containing the prepared `RequestBuilder`, or a URL/transport /// parsing error if the supplied `url` is invalid. - /// - /// [`PubkyHttpClient::request`]: crate::PubkyHttpClient::request - #[allow( - clippy::unused_async, - reason = "native implementation stays async to share the same signature as the WASM backend" - )] pub(crate) async fn cross_request( &self, method: Method, mut url: Url, ) -> Result { - let _ = self.prepare_request(&mut url).await?; - Ok(self.request(method, &url)) + let Some(pk) = self.prepare_request(&mut url).await? else { + return Ok(self.request(method, &url)); + }; + let transport = self.transport.resolve(&pk, &self.pkarr).await; + self.build_pubky_request(method, &url, &pk, &transport) + } + + /// Build a [`RequestBuilder`] for a resolved pubky host transport. + fn build_pubky_request( + &self, + method: Method, + url: &Url, + pk: &str, + transport: &ResolvedTransport, + ) -> Result { + match transport { + ResolvedTransport::PubkyTls => Ok(self.http.request(method, url.as_str())), + ResolvedTransport::Icann { domain, port } => { + let mut icann_url = url.clone(); + icann_url.set_host(Some(domain))?; + if let Some(p) = port { + icann_url + .set_port(Some(*p)) + .map_err(|_err| url::ParseError::InvalidPort)?; + } + cross_log!(debug, "ICANN fallback for {pk} via {domain}"); + Ok(self + .icann_http + .request(method, icann_url.as_str()) + .header("pubky-host", pk)) + } + } } - /// Prepare a request for callers that need the WASM-style preflight. + /// Detect pubky hosts and return the z32 public key when applicable. /// /// Native builds do not rewrite URLs; we only detect pubky hosts and return the /// `pubky-host` value when applicable. /// /// # Errors - /// - Returns [`crate::errors::RequestError::Validation`] if the host uses a `pubky` prefix. + /// Returns [`RequestError::Validation`] if the host uses a `pubky` prefix. #[allow( clippy::unused_async, reason = "keep async signature aligned with WASM build" @@ -94,7 +241,7 @@ impl PubkyHttpClient { Ok(None) } - /// Start building a `Request` with the `Method` and `Url` (native-only) + /// Start building a `Request` with the `Method` and `Url` (native-only). /// /// Returns a `RequestBuilder`, which will allow setting headers and /// the request body before sending. @@ -118,29 +265,15 @@ impl PubkyHttpClient { if let Some(ref host) = host { match classify_host(host) { HostKind::ResolvedPubky => { - cross_log!( - debug, - "Routing request for resolved _pubky host {} via Pubky TLS", - host - ); + cross_log!(debug, "PubkyTLS request for resolved _pubky host {}", host); return self.http.request(method, url_str); } HostKind::Icann => { - // TODO: remove icann_http when we can control reqwest connection - // and or create a tls config per connection. - cross_log!( - debug, - "Routing request for ICANN host {} via standard TLS", - host - ); + cross_log!(debug, "Standard TLS request for ICANN host {}", host); return self.icann_http.request(method, url_str); } HostKind::Pubky => { - cross_log!( - debug, - "Routing request for pubky host {} via PubkyTLS", - host - ); + cross_log!(debug, "PubkyTLS request for pubky host {}", host); } } } @@ -148,3 +281,116 @@ impl PubkyHttpClient { self.http.request(method, url_str) } } + +#[cfg(test)] +mod tests { + use super::*; + use pkarr::dns::rdata::SVCB; + use pkarr::{Keypair, SignedPacket}; + + #[test] + fn classify_hosts() { + assert_eq!(classify_host("example.com"), HostKind::Icann); + let z32 = "o4dksfbqk85ogzdb5osziw6befigbuxmuxkuxq8434q89uj56uyy"; + assert_eq!( + classify_host(&format!("_pubky.{z32}")), + HostKind::ResolvedPubky + ); + assert_eq!(classify_host(z32), HostKind::Pubky); + } + + #[tokio::test] + async fn probe_unreachable_returns_false() { + let addr = "192.0.2.1:1".parse().unwrap(); // TEST-NET-1, RFC 5737 + assert!(!probe_reachable(&[addr], Duration::from_millis(100)).await); + } + + /// Helper: build a pkarr client with a pre-cached signed packet (no real network). + fn pkarr_with_packet(keypair: &Keypair, packet: &SignedPacket) -> pkarr::Client { + let mut builder = PubkyHttpClient::builder(); + builder.pkarr(|b| b.no_default_network().bootstrap(&["127.0.0.1:1"])); + let client = builder.build().unwrap(); + let cache_key: pkarr::CacheKey = keypair.public_key().into(); + client.pkarr.cache().unwrap().put(&cache_key, packet); + client.pkarr + } + + #[test] + fn build_pubky_request_icann_rewrites_url_and_sets_header() { + let client = PubkyHttpClient::builder() + .pkarr(|b| b.no_default_network().bootstrap(&["127.0.0.1:1"])) + .build() + .unwrap(); + let z32 = "o4dksfbqk85ogzdb5osziw6befigbuxmuxkuxq8434q89uj56uyy"; + let url = Url::parse(&format!("https://{z32}/pub/app/file.txt")).unwrap(); + let transport = ResolvedTransport::Icann { + domain: "example.com".to_string(), + port: Some(8443), + }; + + let req = client + .build_pubky_request(Method::GET, &url, z32, &transport) + .unwrap() + .build() + .unwrap(); + + assert_eq!(req.url().host_str(), Some("example.com")); + assert_eq!(req.url().port(), Some(8443)); + assert_eq!(req.url().path(), "/pub/app/file.txt"); + assert_eq!(req.headers().get("pubky-host").unwrap(), z32); + } + + #[tokio::test] + async fn resolve_transport_direct_only() { + let kp = Keypair::random(); + let mut svcb = SVCB::new(1, ".".try_into().unwrap()); + svcb.set_port(6881); + let packet = SignedPacket::builder() + .https(".".try_into().unwrap(), svcb, 3600) + .address(".".try_into().unwrap(), "192.0.2.1".parse().unwrap(), 3600) + .sign(&kp) + .unwrap(); + let pkarr = pkarr_with_packet(&kp, &packet); + + let t = TransportResolver::resolve_from_pkarr(&pkarr, &kp.public_key().to_string()).await; + assert!(matches!(t, ResolvedTransport::PubkyTls)); + } + + #[tokio::test] + async fn resolve_transport_icann_only() { + let kp = Keypair::random(); + let svcb = SVCB::new(1, "example.com".try_into().unwrap()); + let packet = SignedPacket::builder() + .https(".".try_into().unwrap(), svcb, 3600) + .sign(&kp) + .unwrap(); + let pkarr = pkarr_with_packet(&kp, &packet); + + let t = TransportResolver::resolve_from_pkarr(&pkarr, &kp.public_key().to_string()).await; + assert!(matches!(t, ResolvedTransport::Icann { .. })); + if let ResolvedTransport::Icann { domain, .. } = t { + assert_eq!(domain, "example.com"); + } + } + + #[tokio::test] + async fn resolve_transport_both_unreachable_direct_falls_back() { + let kp = Keypair::random(); + let mut direct = SVCB::new(1, ".".try_into().unwrap()); + direct.set_port(6881); + let icann = SVCB::new(10, "example.com".try_into().unwrap()); + let packet = SignedPacket::builder() + .https(".".try_into().unwrap(), direct, 3600) + .https(".".try_into().unwrap(), icann, 3600) + .address(".".try_into().unwrap(), "192.0.2.1".parse().unwrap(), 3600) + .sign(&kp) + .unwrap(); + let pkarr = pkarr_with_packet(&kp, &packet); + + let t = TransportResolver::resolve_from_pkarr(&pkarr, &kp.public_key().to_string()).await; + assert!( + matches!(t, ResolvedTransport::Icann { ref domain, .. } if domain == "example.com"), + "expected ICANN fallback, got {t:?}" + ); + } +}