diff --git a/Cargo.toml b/Cargo.toml
index 934159a..961c030 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -8,11 +8,14 @@ iroh = "0.24.0"
 iroh-blobs = "0.24.0"
 # Pin to avoid keyvaluedb-sqlite 0.1.6 + rusqlite 0.38 break (u64 FromSql not implemented)
 keyvaluedb-sqlite = "=0.1.5"
+keyvaluedb = "=0.1.6" # pin to prevent 0.1.7 which breaks keyvaluedb-sqlite 0.1.5
 rusqlite = "=0.37.0"
 veilid-core = { git = "https://gitlab.com/veilid/veilid.git", tag = "v0.5.1" }
+veilid-tools = { git = "https://gitlab.com/veilid/veilid.git", tag = "v0.5.1" }
 # v0.3.0 matches Veilid 0.5.1 API
 veilid-iroh-blobs = { git = "https://github.com/RangerMauve/veilid-iroh-blobs", tag = "v0.3.0" }
 tracing = "0.1"
+tracing-subscriber = { version = "0.3", features = ["env-filter", "fmt"] }
 xdg = "2.4"
 tmpdir = "1"
 serde = "1.0.204"
@@ -35,3 +38,4 @@ base64 = "0.22.1"
 # Temporary patch for Veilid fanout queue underflow bug
 [patch."https://gitlab.com/veilid/veilid.git"]
 veilid-core = { git = "https://gitlab.com/tripledoublev/veilid.git", branch = "fix-underflow" }
+veilid-tools = { git = "https://gitlab.com/tripledoublev/veilid.git", branch = "fix-underflow" }
diff --git a/src/backend.rs b/src/backend.rs
index f45db69..67e5721 100644
--- a/src/backend.rs
+++ b/src/backend.rs
@@ -349,12 +349,47 @@ impl Backend {
             keys.public_key.clone().into_value(),
             keys.secret_key.clone().unwrap().into_value(),
         );
-        let dht_record = routing_context
-            .open_dht_record(
-                record_key.clone(),
-                Some(KeyPair::new(CRYPTO_KIND_VLD0, bare_keypair)),
-            )
-            .await?;
+        let keypair = Some(KeyPair::new(CRYPTO_KIND_VLD0, bare_keypair));
+        let mut dht_record = None;
+        let max_retries = 6;
+        let mut retries = max_retries;
+
+        while retries > 0 {
+            retries -= 1;
+            match routing_context
+                .open_dht_record(record_key.clone(), keypair.clone())
+                .await
+            {
+                Ok(record) => {
+                    dht_record = Some(record);
+                    break;
+                }
+                Err(e) => {
+                    warn!("Failed to open group DHT record: {e}. Retries left: {retries}");
+                    if retries == 0 {
+                        return Err(anyhow!(
+                            "Unable to open group DHT record after {max_retries} attempts: {e}"
+                        ));
+                    }
+                }
+            }
+            let backoff_ms = 500 * (max_retries - retries) as u64;
+            tokio::time::sleep(std::time::Duration::from_millis(backoff_ms)).await;
+        }
+
+        let dht_record = dht_record.ok_or_else(|| anyhow!("Group DHT record retrieval failed"))?;
+
+        // Persist the keypair so refresh_group/get_group can reload it later
+        let protected_store = veilid.protected_store().unwrap();
+        CommonKeypair {
+            id: record_key.clone(),
+            public_key: keys.public_key.clone(),
+            secret_key: keys.secret_key.clone(),
+            encryption_key: keys.encryption_key.clone(),
+        }
+        .store_keypair(&protected_store)
+        .await
+        .map_err(|e| anyhow!(e))?;
 
         let mut group = Group::new(
             dht_record.clone(),
diff --git a/src/group.rs b/src/group.rs
index fa85cab..dd5462b 100644
--- a/src/group.rs
+++ b/src/group.rs
@@ -138,9 +138,13 @@ impl Group {
         }
 
         // Retry configuration
-        const MAX_RETRIES: u32 = 3;
+        // Veilid route establishment + iroh tunnel setup can be transiently flaky, especially
+        // under load in CI or when routes are regenerating. Keep this bounded but resilient.
+        // Keep this bounded: higher-level callers (HTTP endpoints/tests) can retry too.
+        const MAX_RETRIES: u32 = 5;
         const INITIAL_DELAY_MS: u64 = 500;
-        const MAX_DELAY_MS: u64 = 2000;
+        const MAX_DELAY_MS: u64 = 4000;
+        const PER_PEER_TIMEOUT_SECS: u64 = 10;
 
         for attempt in 0..MAX_RETRIES {
             for repo in repos.iter() {
@@ -153,26 +157,36 @@
                 if let Ok(route_id_blob) = repo.get_route_id_blob().await {
                     // It's faster to try and fail, than to ask then try
-                    let result = self
-                        .iroh_blobs
-                        .download_file_from(route_id_blob, hash)
-                        .await;
-
+                    // Guard against hung downloads so a single peer doesn't stall the whole request.
+                    let result = tokio::time::timeout(
+                        tokio::time::Duration::from_secs(PER_PEER_TIMEOUT_SECS),
+                        self.iroh_blobs.download_file_from(route_id_blob, hash),
+                    )
+                    .await;
+
                     match result {
-                        Ok(()) => {
+                        Ok(Ok(())) => {
                             info!("Successfully downloaded hash {} from peer {}",
                                 hash.to_hex(),
                                 hex::encode(repo.id().opaque().ref_value())
                             );
                             return Ok(());
                         }
-                        Err(e) => {
+                        Ok(Err(e)) => {
                             warn!(
                                 "Unable to download from peer {}: {}",
                                 hex::encode(repo.id().opaque().ref_value()),
                                 e
                             );
                         }
+                        Err(_) => {
+                            warn!(
+                                "Timed out downloading hash {} from peer {} after {}s",
+                                hash.to_hex(),
+                                hex::encode(repo.id().opaque().ref_value()),
+                                PER_PEER_TIMEOUT_SECS
+                            );
+                        }
                     }
                 } else {
                     warn!(