Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ tempfile = "3"
futures-util = "0.3"
libp2p = { version = "0.56", features = ["tcp", "ping", "tokio", "macros", "secp256k1", "noise"] }
criterion = { version = "0.5", features = ["async_tokio", "html_reports"] }
mime = "0.3.17"

[profile.release]
lto = true
Expand Down
4 changes: 2 additions & 2 deletions benches/p2p_benchmarks.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use criterion::{black_box, criterion_group, criterion_main, BenchmarkId, Criterion};
use neverust_core::{create_swarm, Block, BlockStore, Metrics};
use neverust_core::{blockexc::BlockExcMode, create_swarm, Block, BlockStore, Metrics};
use std::sync::Arc;
use tokio::runtime::Runtime;

Expand Down Expand Up @@ -65,7 +65,7 @@ fn bench_swarm_creation(c: &mut Criterion) {
let block_store = Arc::new(BlockStore::new());
let metrics = Metrics::new();
black_box(
create_swarm(block_store, "altruistic".to_string(), 0, metrics)
create_swarm(block_store, BlockExcMode::Altruistic, metrics)
.await
.unwrap(),
)
Expand Down
1 change: 1 addition & 0 deletions neverust-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ bincode = "1.3"
rocksdb = "0.22"
num_cpus = "1.16"
discv5 = "0.10"
mime = "0.3.17"

# High-performance transfer (TGP-style) via Citadel
citadel-transfer = { git = "https://github.com/rifflabs/citadel.git", branch = "main" }
Expand Down
40 changes: 21 additions & 19 deletions neverust-core/src/advertiser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,12 +121,12 @@ impl Advertiser {

Self {
discovery,
block_store: None,
tx,
rx: Arc::new(RwLock::new(rx)),
in_flight: Arc::new(RwLock::new(HashSet::new())),
max_concurrent,
readvertise_interval,
block_store: None,
in_flight: Arc::new(RwLock::new(HashSet::new())),
task_handle: Arc::new(RwLock::new(None)),
local_store_handle: Arc::new(RwLock::new(None)),
running: Arc::new(RwLock::new(false)),
Expand All @@ -152,18 +152,19 @@ impl Advertiser {
/// 1. Advertisement loop - processes queued blocks
/// 2. Local store loop - periodically iterates all blocks in BlockStore (if set)
pub async fn start(&self) -> Result<()> {
let mut running = self.running.write().await;
if *running {
return Err(AdvertiserError::AlreadyRunning);
}
{
let mut running = self.running.write().await;
if *running {
return Err(AdvertiserError::AlreadyRunning);
}

info!(
"Starting advertiser engine (max_concurrent={}, readvertise_interval={:?})",
self.max_concurrent, self.readvertise_interval
);
info!(
"Starting advertiser engine (max_concurrent={}, readvertise_interval={:?})",
self.max_concurrent, self.readvertise_interval
);

*running = true;
drop(running);
*running = true;
}

// Start advertisement loop
let handle = self.spawn_advertise_loop();
Expand All @@ -183,14 +184,15 @@ impl Advertiser {

/// Stop the advertiser engine
pub async fn stop(&self) {
let mut running = self.running.write().await;
if !*running {
return;
}
{
let mut running = self.running.write().await;
if !*running {
return;
}

info!("Stopping advertiser engine");
*running = false;
drop(running);
info!("Stopping advertiser engine");
*running = false;
}

// Send stop message
let _ = self.tx.send(AdvertiseMessage::Stop);
Expand Down
3 changes: 2 additions & 1 deletion neverust-core/src/archivist_tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,8 @@ pub struct ArchivistProof {
pub path: Vec<Vec<u8>>,
}

/// A proof node in a Merkle proof path (deprecated, use ArchivistProof instead)
/// A proof node in a Merkle proof path
#[deprecated = "use ArchivistProof instead"]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ProofNode {
/// The hash of the sibling node
Expand Down
3 changes: 3 additions & 0 deletions neverust-core/src/chunker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ impl<R: AsyncRead + Unpin> Chunker<R> {
Ok(Some(buffer))
}

}

impl<R> Chunker<R> {
/// Get the configured chunk size
pub fn chunk_size(&self) -> usize {
self.chunk_size
Expand Down
16 changes: 1 addition & 15 deletions neverust-core/src/cid_blake3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,22 +54,14 @@ pub fn blake3_cid(data: &[u8]) -> Result<Cid, CidError> {
}

/// Streaming SHA-256 verifier for blocks (Archivist-compatible)
#[derive(Default)]
pub struct StreamingVerifier {
hasher: Sha256,
expected_cid: Option<Cid>,
bytes_processed: usize,
}

impl StreamingVerifier {
/// Create a new streaming verifier without expected CID
pub fn new() -> Self {
Self {
hasher: Sha256::new(),
expected_cid: None,
bytes_processed: 0,
}
}

/// Create a new streaming verifier with expected CID
pub fn new_with_cid(expected_cid: Cid) -> Self {
Self {
Expand Down Expand Up @@ -134,12 +126,6 @@ impl StreamingVerifier {
}
}

impl Default for StreamingVerifier {
fn default() -> Self {
Self::new()
}
}

/// Verify data against a CID using BLAKE3
pub fn verify_blake3(data: &[u8], expected_cid: &Cid) -> Result<(), CidError> {
let computed_cid = blake3_cid(data)?;
Expand Down
15 changes: 6 additions & 9 deletions neverust-core/src/manifest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
//! They are encoded using protobuf and stored as blocks in the network.

use cid::Cid;
use mime::Mime;
use prost::Message as ProstMessage;
use std::io::Cursor;
use thiserror::Error;
Expand Down Expand Up @@ -115,7 +116,7 @@ pub struct Manifest {
/// Original filename (optional)
pub filename: Option<String>,
/// MIME type (optional)
pub mimetype: Option<String>,
pub mimetype: Option<Mime>,
/// Erasure coding information (if protected)
pub erasure: Option<ErasureInfo>,
}
Expand All @@ -131,7 +132,7 @@ impl Manifest {
hcodec: Option<u64>,
version: Option<u32>,
filename: Option<String>,
mimetype: Option<String>,
mimetype: Option<Mime>,
) -> Self {
Self {
tree_cid,
Expand Down Expand Up @@ -161,7 +162,7 @@ impl Manifest {
original_dataset_size: u64,
protected_strategy: StrategyType,
filename: Option<String>,
mimetype: Option<String>,
mimetype: Option<Mime>,
) -> Self {
Self {
tree_cid,
Expand Down Expand Up @@ -226,7 +227,7 @@ impl Manifest {
hcodec: self.hcodec as u32,
version: self.version,
filename: self.filename.clone().unwrap_or_default(),
mimetype: self.mimetype.clone().unwrap_or_default(),
mimetype: self.mimetype.as_ref().map(|mt| mt.essence_str().to_string()).unwrap_or_default().to_string(),
..Default::default()
};

Expand Down Expand Up @@ -340,11 +341,7 @@ impl Manifest {
} else {
Some(header.filename)
},
mimetype: if header.mimetype.is_empty() {
None
} else {
Some(header.mimetype)
},
mimetype: header.mimetype.parse().ok(),
erasure,
})
}
Expand Down
3 changes: 2 additions & 1 deletion neverust-core/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ impl Metrics {
}

// Peer connection metrics

// Eventual consistency between cores is all that's required. Metrics that are slightly off
// because updates have not propagated across caches are non-critical at time of writing.
pub fn peer_connected(&self) {
self.inner.peer_connections.fetch_add(1, Ordering::Relaxed);
self.inner.total_peers_seen.fetch_add(1, Ordering::Relaxed);
Expand Down
4 changes: 2 additions & 2 deletions neverust-core/src/p2p.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ mod tests {
let block_store = Arc::new(BlockStore::new());
let metrics = crate::metrics::Metrics::new();
let (swarm, _block_request_tx, _keypair) =
create_swarm(block_store, "altruistic".to_string(), 1, metrics)
create_swarm(block_store, BlockExcMode::Altruistic, metrics)
.await
.unwrap();
assert!(swarm.local_peer_id().to_string().len() > 0);
Expand All @@ -167,7 +167,7 @@ mod tests {
let block_store = Arc::new(BlockStore::new());
let metrics = crate::metrics::Metrics::new();
let (mut swarm, _block_request_tx, _keypair) =
create_swarm(block_store, "altruistic".to_string(), 1, metrics)
create_swarm(block_store, BlockExcMode::Altruistic, metrics)
.await
.unwrap();
let addr: Multiaddr = "/ip4/127.0.0.1/tcp/0".parse().unwrap();
Expand Down
24 changes: 9 additions & 15 deletions neverust-core/src/pending_blocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,32 +221,29 @@ impl PendingBlocksManager {

/// Get all pending block CIDs
pub fn get_pending_cids(&self) -> Vec<Cid> {
let state = self.state.lock().unwrap();
state.pending.keys().copied().collect()
self.state.lock().unwrap().pending.keys().copied().collect()
}

/// Get the number of pending blocks
pub fn len(&self) -> usize {
let state = self.state.lock().unwrap();
state.pending.len()
self.state.lock().unwrap().pending.len()
}

/// Check if there are no pending blocks
pub fn is_empty(&self) -> bool {
let state = self.state.lock().unwrap();
state.pending.is_empty()
self.state.lock().unwrap().pending.is_empty()
}

/// Get the number of retries remaining for a block
pub fn retries_remaining(&self, cid: &Cid) -> Option<u32> {
let state = self.state.lock().unwrap();
state.pending.get(cid).map(|p| p.retries_left)
self.state.lock().unwrap().pending.get(cid).map(|p| p.retries_left)
}

/// Check if retries are exhausted for a block
pub fn retries_exhausted(&self, cid: &Cid) -> bool {
let state = self.state.lock().unwrap();
state
self.state
.lock()
.unwrap()
.pending
.get(cid)
.map(|p| p.retries_left == 0)
Expand All @@ -257,18 +254,15 @@ impl PendingBlocksManager {
///
/// All waiters will receive channel errors.
pub fn clear(&self) {
let mut state = self.state.lock().unwrap();
state.pending.clear();
self.state.lock().unwrap().pending.clear();
trace!("Cleared all pending blocks");
}

/// Remove a pending block request without completing it
///
/// The waiter will receive a channel error.
pub fn cancel(&self, cid: &Cid) -> bool {
let mut state = self.state.lock().unwrap();

if state.pending.remove(cid).is_some() {
if self.state.lock().unwrap().pending.remove(cid).is_some() {
trace!(cid = ?cid, "Cancelled pending block request");
true
} else {
Expand Down
31 changes: 20 additions & 11 deletions neverust-core/src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@ use crate::{
metrics::Metrics,
p2p::{create_swarm, P2PError},
storage::BlockStore,
traffic,
traffic, Behaviour,
};
use futures::StreamExt;
use libp2p::{swarm::SwarmEvent, Multiaddr};
use libp2p::{swarm::SwarmEvent, Multiaddr, Swarm};
use std::sync::Arc;
use tokio::signal;
use tracing::{error, info, warn};
Expand Down Expand Up @@ -275,13 +275,29 @@ pub async fn run_node(config: Config) -> Result<(), P2PError> {
config.bootstrap_nodes.clone()
};


// Main event loop
main_loop(swarm, listen_addrs, bootstrap_addrs, metrics).await;

info!("Node stopped");
Ok(())
}
async fn main_loop(
mut swarm: Swarm<Behaviour>,
listen_addrs: Arc<std::sync::RwLock<Vec<Multiaddr>>>,
bootstrap_addrs: Vec<String>,
metrics: Metrics
) {
// Track if we've established listen addresses
let mut tcp_listening = false;
let mut bootstrapped = false;

// Main event loop
loop {
tokio::select! {
biased; // Check for ctrl_c first
_ = signal::ctrl_c() => {
info!("Received Ctrl+C, shutting down...");
break;
},
event = swarm.select_next_some() => {
match event {
SwarmEvent::NewListenAddr { address, .. } => {
Expand Down Expand Up @@ -415,13 +431,6 @@ pub async fn run_node(config: Config) -> Result<(), P2PError> {
_ => {}
}
}
_ = signal::ctrl_c() => {
info!("Received Ctrl+C, shutting down...");
break;
}
}
}

info!("Node stopped");
Ok(())
}
3 changes: 2 additions & 1 deletion neverust-core/src/storage.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
//! RocksDB-backed persistent block storage
//! RocksDB-backed persistent block storage.
//! At time of writing, intent is to switch out for ReDB
//!
//! Provides CID-indexed block storage with BLAKE3 verification,
//! persistent storage via RocksDB, and optimized configuration
Expand Down
Loading