diff --git a/Cargo.lock b/Cargo.lock index 53258e7..f79c077 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2704,6 +2704,7 @@ dependencies = [ "criterion", "futures-util", "libp2p", + "mime", "neverust-core", "predicates", "tempfile", @@ -2728,6 +2729,7 @@ dependencies = [ "futures", "hex", "libp2p", + "mime", "multihash", "num_cpus", "prost", diff --git a/Cargo.toml b/Cargo.toml index 733451e..f06d84d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,6 +26,7 @@ tempfile = "3" futures-util = "0.3" libp2p = { version = "0.56", features = ["tcp", "ping", "tokio", "macros", "secp256k1", "noise"] } criterion = { version = "0.5", features = ["async_tokio", "html_reports"] } +mime = "0.3.17" [profile.release] lto = true diff --git a/benches/p2p_benchmarks.rs b/benches/p2p_benchmarks.rs index 0acd185..bfa1cc2 100644 --- a/benches/p2p_benchmarks.rs +++ b/benches/p2p_benchmarks.rs @@ -1,5 +1,5 @@ use criterion::{black_box, criterion_group, criterion_main, BenchmarkId, Criterion}; -use neverust_core::{create_swarm, Block, BlockStore, Metrics}; +use neverust_core::{blockexc::BlockExcMode, create_swarm, Block, BlockStore, Metrics}; use std::sync::Arc; use tokio::runtime::Runtime; @@ -65,7 +65,7 @@ fn bench_swarm_creation(c: &mut Criterion) { let block_store = Arc::new(BlockStore::new()); let metrics = Metrics::new(); black_box( - create_swarm(block_store, "altruistic".to_string(), 0, metrics) + create_swarm(block_store, BlockExcMode::Altruistic, metrics) .await .unwrap(), ) diff --git a/neverust-core/Cargo.toml b/neverust-core/Cargo.toml index 27d6a06..0ea9819 100644 --- a/neverust-core/Cargo.toml +++ b/neverust-core/Cargo.toml @@ -35,6 +35,7 @@ bincode = "1.3" rocksdb = "0.22" num_cpus = "1.16" discv5 = "0.10" +mime = "0.3.17" # High-performance transfer (TGP-style) via Citadel citadel-transfer = { git = "https://github.com/rifflabs/citadel.git", branch = "main" } diff --git a/neverust-core/src/advertiser.rs b/neverust-core/src/advertiser.rs index 2e4932a..553450d 100644 --- a/neverust-core/src/advertiser.rs +++ b/neverust-core/src/advertiser.rs @@ -121,12 +121,12 @@ impl Advertiser { Self { discovery, - block_store: None, tx, rx: Arc::new(RwLock::new(rx)), - in_flight: Arc::new(RwLock::new(HashSet::new())), max_concurrent, readvertise_interval, + block_store: None, + in_flight: Arc::new(RwLock::new(HashSet::new())), task_handle: Arc::new(RwLock::new(None)), local_store_handle: Arc::new(RwLock::new(None)), running: Arc::new(RwLock::new(false)), @@ -152,18 +152,19 @@ impl Advertiser { /// 1. Advertisement loop - processes queued blocks /// 2. Local store loop - periodically iterates all blocks in BlockStore (if set) pub async fn start(&self) -> Result<()> { - let mut running = self.running.write().await; - if *running { - return Err(AdvertiserError::AlreadyRunning); - } + { + let mut running = self.running.write().await; + if *running { + return Err(AdvertiserError::AlreadyRunning); + } - info!( - "Starting advertiser engine (max_concurrent={}, readvertise_interval={:?})", - self.max_concurrent, self.readvertise_interval - ); + info!( + "Starting advertiser engine (max_concurrent={}, readvertise_interval={:?})", + self.max_concurrent, self.readvertise_interval + ); - *running = true; - drop(running); + *running = true; + } // Start advertisement loop let handle = self.spawn_advertise_loop(); @@ -183,14 +184,15 @@ impl Advertiser { /// Stop the advertiser engine pub async fn stop(&self) { - let mut running = self.running.write().await; - if !*running { - return; - } + { + let mut running = self.running.write().await; + if !*running { + return; + } - info!("Stopping advertiser engine"); - *running = false; - drop(running); + info!("Stopping advertiser engine"); + *running = false; + } // Send stop message let _ = self.tx.send(AdvertiseMessage::Stop); diff --git a/neverust-core/src/archivist_tree.rs b/neverust-core/src/archivist_tree.rs index fe52c38..cd4a30f 100644 --- a/neverust-core/src/archivist_tree.rs +++ b/neverust-core/src/archivist_tree.rs @@ -88,7 +88,8 @@ pub struct ArchivistProof { pub path: Vec>, } -/// A proof node in a Merkle proof path (deprecated, use ArchivistProof instead) +/// A proof node in a Merkle proof path +#[deprecated = "use ArchivistProof instead"] #[derive(Debug, Clone, PartialEq, Eq)] pub struct ProofNode { /// The hash of the sibling node diff --git a/neverust-core/src/chunker.rs b/neverust-core/src/chunker.rs index b8687c2..32ece86 100644 --- a/neverust-core/src/chunker.rs +++ b/neverust-core/src/chunker.rs @@ -64,6 +64,9 @@ impl Chunker { Ok(Some(buffer)) } +} + +impl Chunker { /// Get the configured chunk size pub fn chunk_size(&self) -> usize { self.chunk_size diff --git a/neverust-core/src/cid_blake3.rs b/neverust-core/src/cid_blake3.rs index f613c09..73b1a6c 100644 --- a/neverust-core/src/cid_blake3.rs +++ b/neverust-core/src/cid_blake3.rs @@ -54,6 +54,7 @@ pub fn blake3_cid(data: &[u8]) -> Result { } /// Streaming SHA-256 verifier for blocks (Archivist-compatible) +#[derive(Default)] pub struct StreamingVerifier { hasher: Sha256, expected_cid: Option, @@ -61,15 +62,6 @@ pub struct StreamingVerifier { } impl StreamingVerifier { - /// Create a new streaming verifier without expected CID - pub fn new() -> Self { - Self { - hasher: Sha256::new(), - expected_cid: None, - bytes_processed: 0, - } - } - /// Create a new streaming verifier with expected CID pub fn new_with_cid(expected_cid: Cid) -> Self { Self { @@ -134,12 +126,6 @@ impl StreamingVerifier { } } -impl Default for StreamingVerifier { - fn default() -> Self { - Self::new() - } -} - /// Verify data against a CID using BLAKE3 pub fn verify_blake3(data: &[u8], expected_cid: &Cid) -> Result<(), CidError> { let computed_cid = blake3_cid(data)?; diff --git a/neverust-core/src/manifest.rs b/neverust-core/src/manifest.rs index 9dededd..470dcc5 100644 --- a/neverust-core/src/manifest.rs +++ b/neverust-core/src/manifest.rs @@ -4,6 +4,7 @@ //! They are encoded using protobuf and stored as blocks in the network. use cid::Cid; +use mime::Mime; use prost::Message as ProstMessage; use std::io::Cursor; use thiserror::Error; @@ -115,7 +116,7 @@ pub struct Manifest { /// Original filename (optional) pub filename: Option, /// MIME type (optional) - pub mimetype: Option, + pub mimetype: Option, /// Erasure coding information (if protected) pub erasure: Option, } @@ -131,7 +132,7 @@ impl Manifest { hcodec: Option, version: Option, filename: Option, - mimetype: Option, + mimetype: Option, ) -> Self { Self { tree_cid, @@ -161,7 +162,7 @@ impl Manifest { original_dataset_size: u64, protected_strategy: StrategyType, filename: Option, - mimetype: Option, + mimetype: Option, ) -> Self { Self { tree_cid, @@ -226,7 +227,7 @@ impl Manifest { hcodec: self.hcodec as u32, version: self.version, filename: self.filename.clone().unwrap_or_default(), - mimetype: self.mimetype.clone().unwrap_or_default(), + mimetype: self.mimetype.as_ref().map(|mt| mt.essence_str().to_string()).unwrap_or_default().to_string(), ..Default::default() }; @@ -340,11 +341,7 @@ impl Manifest { } else { Some(header.filename) }, - mimetype: if header.mimetype.is_empty() { - None - } else { - Some(header.mimetype) - }, + mimetype: header.mimetype.parse().ok(), erasure, }) } diff --git a/neverust-core/src/metrics.rs b/neverust-core/src/metrics.rs index f6c1ccd..cc97128 100644 --- a/neverust-core/src/metrics.rs +++ b/neverust-core/src/metrics.rs @@ -68,7 +68,8 @@ impl Metrics { } // Peer connection metrics - + // Eventual consistency between cores is all that's required. Metrics that are slightly off + // because updates have not propogated across caches is non critical at time of writing. pub fn peer_connected(&self) { self.inner.peer_connections.fetch_add(1, Ordering::Relaxed); self.inner.total_peers_seen.fetch_add(1, Ordering::Relaxed); diff --git a/neverust-core/src/p2p.rs b/neverust-core/src/p2p.rs index b531b26..dcbc6c3 100644 --- a/neverust-core/src/p2p.rs +++ b/neverust-core/src/p2p.rs @@ -156,7 +156,7 @@ mod tests { let block_store = Arc::new(BlockStore::new()); let metrics = crate::metrics::Metrics::new(); let (swarm, _block_request_tx, _keypair) = - create_swarm(block_store, "altruistic".to_string(), 1, metrics) + create_swarm(block_store, BlockExcMode::Altruistic, metrics) .await .unwrap(); assert!(swarm.local_peer_id().to_string().len() > 0); @@ -167,7 +167,7 @@ mod tests { let block_store = Arc::new(BlockStore::new()); let metrics = crate::metrics::Metrics::new(); let (mut swarm, _block_request_tx, _keypair) = - create_swarm(block_store, "altruistic".to_string(), 1, metrics) + create_swarm(block_store, BlockExcMode::Altruistic, metrics) .await .unwrap(); let addr: Multiaddr = "/ip4/127.0.0.1/tcp/0".parse().unwrap(); diff --git a/neverust-core/src/pending_blocks.rs b/neverust-core/src/pending_blocks.rs index f9b7eef..5eff89b 100644 --- a/neverust-core/src/pending_blocks.rs +++ b/neverust-core/src/pending_blocks.rs @@ -221,32 +221,29 @@ impl PendingBlocksManager { /// Get all pending block CIDs pub fn get_pending_cids(&self) -> Vec { - let state = self.state.lock().unwrap(); - state.pending.keys().copied().collect() + self.state.lock().unwrap().pending.keys().copied().collect() } /// Get the number of pending blocks pub fn len(&self) -> usize { - let state = self.state.lock().unwrap(); - state.pending.len() + self.state.lock().unwrap().pending.len() } /// Check if there are no pending blocks pub fn is_empty(&self) -> bool { - let state = self.state.lock().unwrap(); - state.pending.is_empty() + self.state.lock().unwrap().pending.is_empty() } /// Get the number of retries remaining for a block pub fn retries_remaining(&self, cid: &Cid) -> Option { - let state = self.state.lock().unwrap(); - state.pending.get(cid).map(|p| p.retries_left) + self.state.lock().unwrap().pending.get(cid).map(|p| p.retries_left) } /// Check if retries are exhausted for a block pub fn retries_exhausted(&self, cid: &Cid) -> bool { - let state = self.state.lock().unwrap(); - state + self.state + .lock() + .unwrap() .pending .get(cid) .map(|p| p.retries_left == 0) @@ -257,8 +254,7 @@ impl PendingBlocksManager { /// /// All waiters will receive channel errors. pub fn clear(&self) { - let mut state = self.state.lock().unwrap(); - state.pending.clear(); + self.state.lock().unwrap().pending.clear(); trace!("Cleared all pending blocks"); } @@ -266,9 +262,7 @@ impl PendingBlocksManager { /// /// The waiter will receive a channel error. pub fn cancel(&self, cid: &Cid) -> bool { - let mut state = self.state.lock().unwrap(); - - if state.pending.remove(cid).is_some() { + if self.state.lock().unwrap().pending.remove(cid).is_some() { trace!(cid = ?cid, "Cancelled pending block request"); true } else { diff --git a/neverust-core/src/runtime.rs b/neverust-core/src/runtime.rs index 1cfd26b..d75f526 100644 --- a/neverust-core/src/runtime.rs +++ b/neverust-core/src/runtime.rs @@ -14,10 +14,10 @@ use crate::{ metrics::Metrics, p2p::{create_swarm, P2PError}, storage::BlockStore, - traffic, + traffic, Behaviour, }; use futures::StreamExt; -use libp2p::{swarm::SwarmEvent, Multiaddr}; +use libp2p::{swarm::SwarmEvent, Multiaddr, Swarm}; use std::sync::Arc; use tokio::signal; use tracing::{error, info, warn}; @@ -275,13 +275,29 @@ pub async fn run_node(config: Config) -> Result<(), P2PError> { config.bootstrap_nodes.clone() }; + + // Main event loop + main_loop(swarm, listen_addrs, bootstrap_addrs, metrics).await; + + info!("Node stopped"); + Ok(()) +} +async fn main_loop( + mut swarm: Swarm, + listen_addrs: Arc>>, + bootstrap_addrs: Vec, + metrics: Metrics +) { // Track if we've established listen addresses let mut tcp_listening = false; let mut bootstrapped = false; - - // Main event loop loop { tokio::select! { + biased; // Check for ctrl_c first + _ = signal::ctrl_c() => { + info!("Received Ctrl+C, shutting down..."); + break; + }, event = swarm.select_next_some() => { match event { SwarmEvent::NewListenAddr { address, .. } => { @@ -415,13 +431,6 @@ pub async fn run_node(config: Config) -> Result<(), P2PError> { _ => {} } } - _ = signal::ctrl_c() => { - info!("Received Ctrl+C, shutting down..."); - break; - } } } - - info!("Node stopped"); - Ok(()) } diff --git a/neverust-core/src/storage.rs b/neverust-core/src/storage.rs index 968f598..b3f1105 100644 --- a/neverust-core/src/storage.rs +++ b/neverust-core/src/storage.rs @@ -1,4 +1,5 @@ -//! RocksDB-backed persistent block storage +//! RocksDB-backed persistent block storage. +//! At time of writing, intent is to switch out for ReDB //! //! Provides CID-indexed block storage with BLAKE3 verification, //! persistent storage via RocksDB, and optimized configuration diff --git a/neverust-core/src/traffic.rs b/neverust-core/src/traffic.rs index da789d7..6ca7616 100644 --- a/neverust-core/src/traffic.rs +++ b/neverust-core/src/traffic.rs @@ -116,6 +116,10 @@ async fn block_upload_loop_p2p( let base_interval = Duration::from_secs(60) / config.upload_rate; loop { + // Add random jitter (0-50% of base interval) + let jitter_ms = rand::random::() % (base_interval.as_millis() as u64 / 2); + let jitter = Duration::from_millis(jitter_ms); + sleep(base_interval + jitter).await; // Generate random block data (1 MiB) let data: Vec = { let mut rng = rand::thread_rng(); @@ -123,41 +127,36 @@ async fn block_upload_loop_p2p( }; // Create and store block - match Block::new(data) { - Ok(block) => { - let cid = block.cid; - match block_store.put(block).await { - Ok(_) => { - info!("[TRAFFIC-P2P] Node {} generated 1MiB block: {} - advertising to network", config.node_id, cid); - - // Track this CID for P2P discovery - known_cids.write().await.insert(cid); - - // Advertise block availability via P2P - if let Err(e) = p2p_tx.send(P2PCommand::AdvertiseBlock(cid)) { - warn!("[TRAFFIC-P2P] Failed to advertise block {}: {}", cid, e); - } - } - Err(e) => { - warn!( - "[TRAFFIC-P2P] Node {} failed to store block: {}", - config.node_id, e - ); - } + let block = match Block::new(data) { + Ok(block) => block, + Err(e) => { + warn!( + "[TRAFFIC-P2P] Node {} failed to create block: {}", + config.node_id, e + ); + continue + } + }; + let cid = block.cid; + match block_store.put(block).await { + Ok(_) => { + info!("[TRAFFIC-P2P] Node {} generated 1MiB block: {} - advertising to network", config.node_id, cid); + + // Track this CID for P2P discovery + known_cids.write().await.insert(cid); + + // Advertise block availability via P2P + if let Err(e) = p2p_tx.send(P2PCommand::AdvertiseBlock(cid)) { + warn!("[TRAFFIC-P2P] Failed to advertise block {}: {}", cid, e); } } Err(e) => { warn!( - "[TRAFFIC-P2P] Node {} failed to create block: {}", + "[TRAFFIC-P2P] Node {} failed to store block: {}", config.node_id, e ); } } - - // Add random jitter (0-50% of base interval) - let jitter_ms = rand::random::() % (base_interval.as_millis() as u64 / 2); - let jitter = Duration::from_millis(jitter_ms); - sleep(base_interval + jitter).await; } } diff --git a/tests/manifest_integration.rs b/tests/manifest_integration.rs index a69336d..cdc90a8 100644 --- a/tests/manifest_integration.rs +++ b/tests/manifest_integration.rs @@ -80,7 +80,7 @@ async fn test_manifest_with_metadata() { Some(0x12), // sha2-256 codec Some(1), // version None, // filename - Some("text/plain".to_string()), // mimetype + Some(mime::TEXT_PLAIN), // mimetype ); assert_eq!(manifest.blocks_count(), 1); @@ -146,7 +146,7 @@ async fn test_manifest_encoding_decoding() { Some(0x12), Some(1), None, // filename - Some("application/octet-stream".to_string()), // mimetype + Some(mime::APPLICATION_OCTET_STREAM), // mimetype ); // Encode