Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions pubky-sdk/src/client/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,9 @@ impl PubkyHttpClientBuilder {
#[cfg(not(target_arch = "wasm32"))]
icann_http: icann_http_builder.build()?,

#[cfg(not(target_arch = "wasm32"))]
transport: super::http_targets::native::TransportResolver::new(),

#[cfg(target_arch = "wasm32")]
testnet_host: self.testnet_host.clone(),
})
Expand Down Expand Up @@ -333,6 +336,10 @@ pub struct PubkyHttpClient {
#[cfg(not(target_arch = "wasm32"))]
pub(crate) icann_http: reqwest::Client,

/// Resolves and caches per-host transport decisions (`PubkyTLS` vs ICANN fallback).
#[cfg(not(target_arch = "wasm32"))]
pub(crate) transport: super::http_targets::native::TransportResolver,

/// The hostname to use for testnet URL transformations (WASM only).
#[cfg(target_arch = "wasm32")]
pub(crate) testnet_host: Option<String>,
Expand Down
308 changes: 277 additions & 31 deletions pubky-sdk/src/client/http_targets/native.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,130 @@
use std::collections::HashMap;
use std::sync::{Arc, Mutex, PoisonError, RwLock};
use std::time::{Duration, Instant};

use futures_util::StreamExt;
use tokio::net::TcpStream;

use crate::errors::RequestError;
use crate::{PubkyHttpClient, PublicKey, Result, cross_log};
use reqwest::{IntoUrl, Method, RequestBuilder};
use url::Url;

const TRANSPORT_CACHE_TTL: Duration = Duration::from_secs(60);
const PROBE_TIMEOUT: Duration = Duration::from_millis(1500);

#[derive(Debug, Clone)]
pub(crate) enum ResolvedTransport {
PubkyTls,
Icann { domain: String, port: Option<u16> },
}

/// Resolves and caches per-host transport decisions (`PubkyTLS` vs ICANN).
///
/// Accepts a `&pkarr::Client` reference when resolution is needed — does not
/// own the pkarr client, which is shared across the SDK.
#[derive(Debug, Clone)]
pub(crate) struct TransportResolver {
cache: Arc<RwLock<HashMap<String, (Instant, ResolvedTransport)>>>,
guards: Arc<Mutex<HashMap<String, Arc<tokio::sync::Mutex<()>>>>>,
}

impl TransportResolver {
pub(crate) fn new() -> Self {
Self {
cache: Arc::new(RwLock::new(HashMap::new())),
guards: Arc::new(Mutex::new(HashMap::new())),
}
}

/// Look up the transport for `pk`, resolving via PKARR on cache miss.
pub(crate) async fn resolve(&self, pk: &str, pkarr: &pkarr::Client) -> ResolvedTransport {
if let Some(t) = self.cached(pk) {
return t;
}
self.resolve_and_cache(pk, pkarr).await
}

/// Fast path: return a cached, non-expired transport decision.
fn cached(&self, pk: &str) -> Option<ResolvedTransport> {
let cache = self.cache.read().unwrap_or_else(PoisonError::into_inner);
cache
.get(pk)
.filter(|(ts, _)| ts.elapsed() < TRANSPORT_CACHE_TTL)
.map(|(_, t)| t.clone())
}

/// Slow path: acquire a per-key guard, double-check the cache, resolve,
/// and store the result.
async fn resolve_and_cache(&self, pk: &str, pkarr: &pkarr::Client) -> ResolvedTransport {
let guard = {
let mut guards = self.guards.lock().unwrap_or_else(PoisonError::into_inner);
Arc::clone(guards.entry(pk.to_string()).or_default())
};
let _lock = guard.lock().await;

// Another task may have resolved while we waited for the guard.
if let Some(t) = self.cached(pk) {
return t;
}

let t = Self::resolve_from_pkarr(pkarr, pk).await;
self.cache
.write()
.unwrap_or_else(PoisonError::into_inner)
.insert(pk.to_string(), (Instant::now(), t.clone()));
t
}

/// Inspect PKARR endpoints and probe reachability to pick a transport.
async fn resolve_from_pkarr(pkarr: &pkarr::Client, qname: &str) -> ResolvedTransport {
let stream = pkarr.resolve_https_endpoints(qname);
futures_util::pin_mut!(stream);

let mut has_direct = false;
let mut direct_addrs = Vec::new();
let mut icann: Option<(String, Option<u16>)> = None;

while let Some(ep) = stream.next().await {
if let Some(domain) = ep.domain() {
if icann.is_none() {
icann = Some((domain.to_string(), ep.port()));
}
} else {
has_direct = true;
direct_addrs.extend(ep.to_socket_addrs());
}
}

let Some((domain, port)) = icann else {
return ResolvedTransport::PubkyTls;
};
if !has_direct {
return ResolvedTransport::Icann { domain, port };
}

// Both exist — probe direct endpoint reachability.
if probe_reachable(&direct_addrs, PROBE_TIMEOUT).await {
ResolvedTransport::PubkyTls
} else {
cross_log!(
warn,
"Direct endpoint unreachable for {qname}; ICANN fallback to {domain}"
);
ResolvedTransport::Icann { domain, port }
}
}
}

async fn probe_reachable(addrs: &[std::net::SocketAddr], timeout: Duration) -> bool {
for addr in addrs {
if let Ok(Ok(_)) = tokio::time::timeout(timeout, TcpStream::connect(addr)).await {
return true;
}
}
false
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum HostKind {
ResolvedPubky,
Expand All @@ -26,40 +148,65 @@ fn classify_host(host: &str) -> HostKind {

impl PubkyHttpClient {
/// Constructs a [`reqwest::RequestBuilder`] for the given HTTP `method` and `url`,
/// routing through the clients unified request path.
/// routing through the client's unified request path.
///
/// This method ensures that special Pubky and pkarr hosts are resolved according to
/// platform-specific rules (native or WASM), including:
/// - Detecting `_pubky.<public-key>` hosts and applying the correct TLS handling.
/// - Routing standard ICANN domains through the `icann_http` client on native builds.
/// - When both a direct (IP:PORT) and an ICANN (domain) endpoint exist, TCP-probing
/// the direct endpoint and falling back to ICANN if unreachable.
///
/// On native targets, this is effectively a thin wrapper around [`PubkyHttpClient::request`],
/// while on WASM it also performs host transformation and may add the `pubky-host` header.
/// Transport decisions are cached per public key with a short TTL.
///
/// Returns a [`Result`] containing the prepared `RequestBuilder`, or a URL/transport
/// parsing error if the supplied `url` is invalid.
///
/// [`PubkyHttpClient::request`]: crate::PubkyHttpClient::request
#[allow(
clippy::unused_async,
reason = "native implementation stays async to share the same signature as the WASM backend"
)]
pub(crate) async fn cross_request(
&self,
method: Method,
mut url: Url,
) -> Result<RequestBuilder> {
let _ = self.prepare_request(&mut url).await?;
Ok(self.request(method, &url))
let Some(pk) = self.prepare_request(&mut url).await? else {
return Ok(self.request(method, &url));
};
let transport = self.transport.resolve(&pk, &self.pkarr).await;
self.build_pubky_request(method, &url, &pk, &transport)
}

/// Build a [`RequestBuilder`] for a resolved pubky host transport.
fn build_pubky_request(
Comment thread
86667 marked this conversation as resolved.
&self,
method: Method,
url: &Url,
pk: &str,
transport: &ResolvedTransport,
) -> Result<RequestBuilder> {
match transport {
ResolvedTransport::PubkyTls => Ok(self.http.request(method, url.as_str())),
ResolvedTransport::Icann { domain, port } => {
let mut icann_url = url.clone();
icann_url.set_host(Some(domain))?;
if let Some(p) = port {
icann_url
.set_port(Some(*p))
.map_err(|_err| url::ParseError::InvalidPort)?;
}
cross_log!(debug, "ICANN fallback for {pk} via {domain}");
Ok(self
.icann_http
.request(method, icann_url.as_str())
.header("pubky-host", pk))
}
}
}

/// Prepare a request for callers that need the WASM-style preflight.
/// Detect pubky hosts and return the z32 public key when applicable.
///
/// Native builds do not rewrite URLs; we only detect pubky hosts and return the
/// `pubky-host` value when applicable.
///
/// # Errors
/// - Returns [`crate::errors::RequestError::Validation`] if the host uses a `pubky` prefix.
/// Returns [`RequestError::Validation`] if the host uses a `pubky` prefix.
#[allow(
clippy::unused_async,
reason = "keep async signature aligned with WASM build"
Expand Down Expand Up @@ -94,7 +241,7 @@ impl PubkyHttpClient {
Ok(None)
}

/// Start building a `Request` with the `Method` and `Url` (native-only)
/// Start building a `Request` with the `Method` and `Url` (native-only).
///
/// Returns a `RequestBuilder`, which will allow setting headers and
/// the request body before sending.
Expand All @@ -118,33 +265,132 @@ impl PubkyHttpClient {
if let Some(ref host) = host {
match classify_host(host) {
HostKind::ResolvedPubky => {
cross_log!(
debug,
"Routing request for resolved _pubky host {} via Pubky TLS",
host
);
cross_log!(debug, "PubkyTLS request for resolved _pubky host {}", host);
return self.http.request(method, url_str);
}
HostKind::Icann => {
// TODO: remove icann_http when we can control reqwest connection
// and or create a tls config per connection.
cross_log!(
debug,
"Routing request for ICANN host {} via standard TLS",
host
);
cross_log!(debug, "Standard TLS request for ICANN host {}", host);
return self.icann_http.request(method, url_str);
}
HostKind::Pubky => {
cross_log!(
debug,
"Routing request for pubky host {} via PubkyTLS",
host
);
cross_log!(debug, "PubkyTLS request for pubky host {}", host);
}
}
}

self.http.request(method, url_str)
}
}

#[cfg(test)]
mod tests {
use super::*;
use pkarr::dns::rdata::SVCB;
use pkarr::{Keypair, SignedPacket};

#[test]
fn classify_hosts() {
assert_eq!(classify_host("example.com"), HostKind::Icann);
let z32 = "o4dksfbqk85ogzdb5osziw6befigbuxmuxkuxq8434q89uj56uyy";
assert_eq!(
classify_host(&format!("_pubky.{z32}")),
HostKind::ResolvedPubky
);
assert_eq!(classify_host(z32), HostKind::Pubky);
}

#[tokio::test]
async fn probe_unreachable_returns_false() {
let addr = "192.0.2.1:1".parse().unwrap(); // TEST-NET-1, RFC 5737
assert!(!probe_reachable(&[addr], Duration::from_millis(100)).await);
}

/// Helper: build a pkarr client with a pre-cached signed packet (no real network).
fn pkarr_with_packet(keypair: &Keypair, packet: &SignedPacket) -> pkarr::Client {
let mut builder = PubkyHttpClient::builder();
builder.pkarr(|b| b.no_default_network().bootstrap(&["127.0.0.1:1"]));
let client = builder.build().unwrap();
let cache_key: pkarr::CacheKey = keypair.public_key().into();
client.pkarr.cache().unwrap().put(&cache_key, packet);
client.pkarr
}

#[test]
fn build_pubky_request_icann_rewrites_url_and_sets_header() {
let client = PubkyHttpClient::builder()
.pkarr(|b| b.no_default_network().bootstrap(&["127.0.0.1:1"]))
.build()
.unwrap();
let z32 = "o4dksfbqk85ogzdb5osziw6befigbuxmuxkuxq8434q89uj56uyy";
let url = Url::parse(&format!("https://{z32}/pub/app/file.txt")).unwrap();
let transport = ResolvedTransport::Icann {
domain: "example.com".to_string(),
port: Some(8443),
};

let req = client
.build_pubky_request(Method::GET, &url, z32, &transport)
.unwrap()
.build()
.unwrap();

assert_eq!(req.url().host_str(), Some("example.com"));
assert_eq!(req.url().port(), Some(8443));
assert_eq!(req.url().path(), "/pub/app/file.txt");
assert_eq!(req.headers().get("pubky-host").unwrap(), z32);
}

#[tokio::test]
async fn resolve_transport_direct_only() {
let kp = Keypair::random();
let mut svcb = SVCB::new(1, ".".try_into().unwrap());
svcb.set_port(6881);
let packet = SignedPacket::builder()
.https(".".try_into().unwrap(), svcb, 3600)
.address(".".try_into().unwrap(), "192.0.2.1".parse().unwrap(), 3600)
.sign(&kp)
.unwrap();
let pkarr = pkarr_with_packet(&kp, &packet);

let t = TransportResolver::resolve_from_pkarr(&pkarr, &kp.public_key().to_string()).await;
assert!(matches!(t, ResolvedTransport::PubkyTls));
}

#[tokio::test]
async fn resolve_transport_icann_only() {
let kp = Keypair::random();
let svcb = SVCB::new(1, "example.com".try_into().unwrap());
let packet = SignedPacket::builder()
.https(".".try_into().unwrap(), svcb, 3600)
.sign(&kp)
.unwrap();
let pkarr = pkarr_with_packet(&kp, &packet);

let t = TransportResolver::resolve_from_pkarr(&pkarr, &kp.public_key().to_string()).await;
assert!(matches!(t, ResolvedTransport::Icann { .. }));
if let ResolvedTransport::Icann { domain, .. } = t {
assert_eq!(domain, "example.com");
}
}

#[tokio::test]
async fn resolve_transport_both_unreachable_direct_falls_back() {
let kp = Keypair::random();
let mut direct = SVCB::new(1, ".".try_into().unwrap());
direct.set_port(6881);
let icann = SVCB::new(10, "example.com".try_into().unwrap());
let packet = SignedPacket::builder()
.https(".".try_into().unwrap(), direct, 3600)
.https(".".try_into().unwrap(), icann, 3600)
.address(".".try_into().unwrap(), "192.0.2.1".parse().unwrap(), 3600)
.sign(&kp)
.unwrap();
let pkarr = pkarr_with_packet(&kp, &packet);

let t = TransportResolver::resolve_from_pkarr(&pkarr, &kp.public_key().to_string()).await;
assert!(
matches!(t, ResolvedTransport::Icann { ref domain, .. } if domain == "example.com"),
"expected ICANN fallback, got {t:?}"
);
}
}
Loading