Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions neverust-core/src/advertiser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,15 +184,15 @@ impl Advertiser {

/// Stop the advertiser engine
pub async fn stop(&self) {
{
{
let mut running = self.running.write().await;
if !*running {
return;
}

info!("Stopping advertiser engine");
*running = false;
}
}

// Send stop message
let _ = self.tx.send(AdvertiseMessage::Stop);
Expand Down
28 changes: 19 additions & 9 deletions neverust-core/src/blockexc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,9 @@ async fn write_length_prefixed<W: AsyncWriteExt + Unpin>(
pub enum BlockExcMode {
#[default]
Altruistic,
MarketPlace{ price_per_byte: u64 },
MarketPlace {
price_per_byte: u64,
},
}
impl std::str::FromStr for BlockExcMode {
type Err = &'static str;
Expand All @@ -95,17 +97,16 @@ impl std::str::FromStr for BlockExcMode {
match s {
"altruistic" => Ok(Self::Altruistic),
"marketplace" => Ok(Self::MarketPlace { price_per_byte: 1 }),
_ => Err("unrecognised mode (options: 'altruistic', 'marketplace')")
_ => Err("unrecognised mode (options: 'altruistic', 'marketplace')"),
}
}
}


impl BlockExcMode {
pub fn mode_string(&self) -> String {
match self {
Self::Altruistic => "altruistic".to_string(),
Self::MarketPlace { price_per_byte } => format!("Market @ {} per byte", price_per_byte)
Self::MarketPlace { price_per_byte } => format!("Market @ {} per byte", price_per_byte),
}
}
fn price_per_byte(&self) -> Option<u64> {
Expand All @@ -115,7 +116,6 @@ impl BlockExcMode {
None
}
}

}
/// BlockExc connection handler
pub struct BlockExcHandler {
Expand Down Expand Up @@ -244,7 +244,11 @@ impl ConnectionHandler for BlockExcHandler {
let block_store = self.block_store.clone();
let mode = self.mode.clone();
let metrics = self.metrics.clone();
info!("BlockExc: Fully negotiated inbound stream from {} (mode: {})", peer_id, mode.mode_string());
info!(
"BlockExc: Fully negotiated inbound stream from {} (mode: {})",
peer_id,
mode.mode_string()
);

// Spawn task to handle the stream - read messages from remote peer
tokio::spawn(async move {
Expand Down Expand Up @@ -364,7 +368,10 @@ impl ConnectionHandler for BlockExcHandler {
break;
}
}
} else if let BlockExcMode::MarketPlace { price_per_byte: _ } = mode {
} else if let BlockExcMode::MarketPlace {
price_per_byte: _,
} = mode
{
// MARKETPLACE MODE: Check payment before serving
info!("BlockExc: MARKETPLACE MODE - checking payment from {}", peer_id);

Expand Down Expand Up @@ -443,7 +450,10 @@ impl ConnectionHandler for BlockExcHandler {
{
let block_price =
(block.data.len() as u64)
* mode.price_per_byte().unwrap_or_default();
* mode
.price_per_byte()
.unwrap_or_default(
);
info!("BlockExc: Block {} available for {} units", cid, block_price);

block_presences.push(
Expand Down Expand Up @@ -481,7 +491,7 @@ impl ConnectionHandler for BlockExcHandler {
}
}
}
}
}
}
}
Err(e) => {
Expand Down
1 change: 0 additions & 1 deletion neverust-core/src/chunker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ impl<R: AsyncRead + Unpin> Chunker<R> {

Ok(Some(buffer))
}

}

impl<R> Chunker<R> {
Expand Down
2 changes: 1 addition & 1 deletion neverust-core/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ impl Default for Config {
api_port: 8080,
log_level: "info".to_string(),
bootstrap_nodes: Vec::new(),
mode: BlockExcMode::MarketPlace { price_per_byte: 1 }
mode: BlockExcMode::MarketPlace { price_per_byte: 1 },
}
}
}
Expand Down
7 changes: 6 additions & 1 deletion neverust-core/src/manifest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,12 @@ impl Manifest {
hcodec: self.hcodec as u32,
version: self.version,
filename: self.filename.clone().unwrap_or_default(),
mimetype: self.mimetype.as_ref().map(|mt| mt.essence_str().to_string()).unwrap_or_default().to_string(),
mimetype: self
.mimetype
.as_ref()
.map(|mt| mt.essence_str().to_string())
.unwrap_or_default()
.to_string(),
..Default::default()
};

Expand Down
5 changes: 2 additions & 3 deletions neverust-core/src/p2p.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use std::sync::Arc;
use std::time::Duration;
use thiserror::Error;

use crate::blockexc::{BlockExcMode, BlockExcBehaviour };
use crate::blockexc::{BlockExcBehaviour, BlockExcMode};
use crate::identify_shim::{IdentifyBehaviour, IdentifyConfig};
use crate::storage::BlockStore;

Expand Down Expand Up @@ -117,8 +117,7 @@ pub async fn create_swarm(
let identify_behaviour = IdentifyBehaviour::new(identify_config);

// Create behavior: BlockExc + Identify
let (blockexc_behaviour, block_request_tx) =
BlockExcBehaviour::new(block_store, mode, metrics);
let (blockexc_behaviour, block_request_tx) = BlockExcBehaviour::new(block_store, mode, metrics);
let behaviour = Behaviour {
blockexc: blockexc_behaviour,
identify: identify_behaviour,
Expand Down
7 changes: 6 additions & 1 deletion neverust-core/src/pending_blocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,12 @@ impl PendingBlocksManager {

/// Get the number of retries remaining for a block
pub fn retries_remaining(&self, cid: &Cid) -> Option<u32> {
self.state.lock().unwrap().pending.get(cid).map(|p| p.retries_left)
self.state
.lock()
.unwrap()
.pending
.get(cid)
.map(|p| p.retries_left)
}

/// Check if retries are exhausted for a block
Expand Down
11 changes: 3 additions & 8 deletions neverust-core/src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,8 @@ pub async fn run_node(config: Config) -> Result<(), P2PError> {
info!("Initialized metrics collector");

// Create swarm first to get peer ID (pass metrics for P2P traffic tracking)
let (mut swarm, block_request_tx, keypair) = create_swarm(
block_store.clone(),
config.mode.clone(),
metrics.clone(),
)
.await?;
let (mut swarm, block_request_tx, keypair) =
create_swarm(block_store.clone(), config.mode.clone(), metrics.clone()).await?;
let peer_id = swarm.local_peer_id().to_string();

// Initialize BlockExc client for requesting blocks from peers (via channel to swarm)
Expand Down Expand Up @@ -275,7 +271,6 @@ pub async fn run_node(config: Config) -> Result<(), P2PError> {
config.bootstrap_nodes.clone()
};


// Main event loop
main_loop(swarm, listen_addrs, bootstrap_addrs, metrics).await;

Expand All @@ -286,7 +281,7 @@ async fn main_loop(
mut swarm: Swarm<Behaviour>,
listen_addrs: Arc<std::sync::RwLock<Vec<Multiaddr>>>,
bootstrap_addrs: Vec<String>,
metrics: Metrics
metrics: Metrics,
) {
// Track if we've established listen addresses
let mut tcp_listening = false;
Expand Down
7 changes: 5 additions & 2 deletions neverust-core/src/traffic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,13 +134,16 @@ async fn block_upload_loop_p2p(
"[TRAFFIC-P2P] Node {} failed to create block: {}",
config.node_id, e
);
continue
continue;
}
};
let cid = block.cid;
match block_store.put(block).await {
Ok(_) => {
info!("[TRAFFIC-P2P] Node {} generated 1MiB block: {} - advertising to network", config.node_id, cid);
info!(
"[TRAFFIC-P2P] Node {} generated 1MiB block: {} - advertising to network",
config.node_id, cid
);

// Track this CID for P2P discovery
known_cids.write().await.insert(cid);
Expand Down
28 changes: 12 additions & 16 deletions tests/integration_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,9 @@ async fn test_create_swarm_and_listen() {

let block_store = Arc::new(BlockStore::new());
let metrics = Metrics::new();
let (mut swarm, _tx, _keypair) =
create_swarm(block_store, BlockExcMode::Altruistic, metrics)
.await
.expect("Failed to create swarm");
let (mut swarm, _tx, _keypair) = create_swarm(block_store, BlockExcMode::Altruistic, metrics)
.await
.expect("Failed to create swarm");
let peer_id = *swarm.local_peer_id();

info!("✅ Created swarm with peer ID: {}", peer_id);
Expand Down Expand Up @@ -114,10 +113,9 @@ async fn test_dial_bootstrap_node() {
// Create swarm
let block_store = Arc::new(BlockStore::new());
let metrics = Metrics::new();
let (mut swarm, _tx, _keypair) =
create_swarm(block_store, BlockExcMode::Altruistic, metrics)
.await
.expect("Failed to create swarm");
let (mut swarm, _tx, _keypair) = create_swarm(block_store, BlockExcMode::Altruistic, metrics)
.await
.expect("Failed to create swarm");
let local_peer_id = *swarm.local_peer_id();
info!("📝 Local peer ID: {}", local_peer_id);

Expand Down Expand Up @@ -254,10 +252,9 @@ async fn test_connect_and_verify_all_protocols() {
// Create swarm
let block_store = Arc::new(BlockStore::new());
let metrics = Metrics::new();
let (mut swarm, _tx, _keypair) =
create_swarm(block_store, BlockExcMode::Altruistic, metrics)
.await
.expect("Failed to create swarm");
let (mut swarm, _tx, _keypair) = create_swarm(block_store, BlockExcMode::Altruistic, metrics)
.await
.expect("Failed to create swarm");
info!("📝 Local peer: {}", swarm.local_peer_id());

// Listen
Expand Down Expand Up @@ -509,10 +506,9 @@ async fn test_blockexc_protocol_detailed() {
// Create swarm
let block_store = Arc::new(BlockStore::new());
let metrics = Metrics::new();
let (mut swarm, _tx, _keypair) =
create_swarm(block_store, BlockExcMode::Altruistic, metrics)
.await
.expect("Failed to create swarm");
let (mut swarm, _tx, _keypair) = create_swarm(block_store, BlockExcMode::Altruistic, metrics)
.await
.expect("Failed to create swarm");
let local_peer_id = *swarm.local_peer_id();
info!("📝 Local peer: {}", local_peer_id);

Expand Down
10 changes: 5 additions & 5 deletions tests/manifest_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,10 @@ async fn test_manifest_with_metadata() {
tree_cid,
DEFAULT_BLOCK_SIZE as u64,
test_data.len() as u64,
Some(0xcd02), // codex-block codec
Some(0x12), // sha2-256 codec
Some(1), // version
None, // filename
Some(0xcd02), // codex-block codec
Some(0x12), // sha2-256 codec
Some(1), // version
None, // filename
Some(mime::TEXT_PLAIN), // mimetype
);

Expand Down Expand Up @@ -145,7 +145,7 @@ async fn test_manifest_encoding_decoding() {
Some(0xcd02),
Some(0x12),
Some(1),
None, // filename
None, // filename
Some(mime::APPLICATION_OCTET_STREAM), // mimetype
);

Expand Down