From 019bddbf54a75ea0bfe324d3960b633aa6b282d8 Mon Sep 17 00:00:00 2001 From: Warm Beer Date: Mon, 30 Mar 2026 18:13:26 +0200 Subject: [PATCH 1/4] feat: add close group views to quote responses and validate during payment Nodes now return their local close group view (up to CLOSE_GROUP_SIZE peer IDs) in ChunkQuoteResponse::Success, enabling clients to verify close-group quorum before paying. On the payment verification side, nodes now check that the other peers in a payment proof are known close group members for the content address. The new validate_close_group_membership step requires at least CLOSE_GROUP_MAJORITY proof peers to appear in the node's local close group view (self + DHT peers). This prevents malicious clients from including arbitrary attacker nodes in payment proofs. Key changes: - ChunkQuoteResponse::Success gains a close_group field (Vec<[u8; 32]>) - AntProtocol holds Option> for local DHT lookups - handle_quote and handle_put query the local routing table - PaymentVerifier.verify_payment accepts local_close_group for membership checks - PaymentVerifierConfig gains local_peer_id for building the full close group set Co-Authored-By: Claude Opus 4.6 (1M context) --- Cargo.lock | 2 + src/ant_protocol/chunk.rs | 4 + src/devnet.rs | 2 + src/node.rs | 11 ++- src/payment/verifier.rs | 153 +++++++++++++++++++++++++++------- src/storage/handler.rs | 52 ++++++++++-- src/storage/mod.rs | 2 +- tests/e2e/data_types/chunk.rs | 2 + tests/e2e/testnet.rs | 2 + 9 files changed, 192 insertions(+), 38 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7f98bc1..7658a3e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5217,6 +5217,8 @@ dependencies = [ [[package]] name = "saorsa-core" version = "0.18.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0d3d05b97f789b0e0b7d54b2fe05f05edfafb94f72d065482fc20ce1e9fab69e" dependencies = [ "anyhow", "async-trait", diff --git a/src/ant_protocol/chunk.rs b/src/ant_protocol/chunk.rs index 0cbba46..0b1d5e6 100644 --- a/src/ant_protocol/chunk.rs +++ b/src/ant_protocol/chunk.rs @@ -236,6 +236,10 @@ pub enum ChunkQuoteResponse { quote: Vec, /// `true` when the chunk already exists on this node (skip payment). already_stored: bool, + /// Peer IDs (raw 32-byte BLAKE3 hashes) this node considers closest to + /// the content address, excluding itself. Clients use these views to + /// verify close-group quorum before paying. + close_group: Vec<[u8; 32]>, }, /// Quote generation failed. Error(ProtocolError), diff --git a/src/devnet.rs b/src/devnet.rs index f61fc40..8baa4f3 100644 --- a/src/devnet.rs +++ b/src/devnet.rs @@ -588,6 +588,7 @@ impl Devnet { evm: evm_config, cache_capacity: DEVNET_PAYMENT_CACHE_CAPACITY, local_rewards_address: rewards_address, + local_peer_id: *identity.peer_id().as_bytes(), }; let payment_verifier = PaymentVerifier::new(payment_config); let metrics_tracker = @@ -602,6 +603,7 @@ impl Devnet { Arc::new(storage), Arc::new(payment_verifier), Arc::new(quote_generator), + None, )) } diff --git a/src/node.rs b/src/node.rs index 2726f9d..f9bc76a 100644 --- a/src/node.rs +++ b/src/node.rs @@ -123,10 +123,14 @@ impl NodeBuilder { None }; + // Wrap P2P node in Arc early so it can be shared with the protocol handler. + let p2p_node = Arc::new(p2p_node); + // Initialize ANT protocol handler for chunk storage let ant_protocol = if self.config.storage.enabled { Some(Arc::new( - Self::build_ant_protocol(&self.config, &identity).await?, + Self::build_ant_protocol(&self.config, &identity, Some(Arc::clone(&p2p_node))) + .await?, )) } else { info!("Chunk storage disabled"); @@ -135,7 +139,7 @@ impl NodeBuilder { let node = RunningNode { config: self.config, - p2p_node: Arc::new(p2p_node), + p2p_node, shutdown, events_tx, events_rx: Some(events_rx), @@ -330,6 +334,7 @@ impl NodeBuilder { async fn build_ant_protocol( config: &NodeConfig, identity: &NodeIdentity, + p2p_node: Option>, ) -> Result { // Create LMDB storage let storage_config = LmdbStorageConfig { @@ -363,6 +368,7 @@ impl NodeBuilder { }, cache_capacity: config.payment.cache_capacity, local_rewards_address: rewards_address, + local_peer_id: *identity.peer_id().as_bytes(), }; let payment_verifier = PaymentVerifier::new(payment_config); // Safe: 5GB fits in usize on all supported 64-bit platforms. @@ -379,6 +385,7 @@ impl NodeBuilder { Arc::new(storage), Arc::new(payment_verifier), Arc::new(quote_generator), + p2p_node, ); info!( diff --git a/src/payment/verifier.rs b/src/payment/verifier.rs index c36e5b8..cdd4087 100644 --- a/src/payment/verifier.rs +++ b/src/payment/verifier.rs @@ -3,7 +3,7 @@ //! This is the core payment verification logic for ant-node. //! All new data requires EVM payment on Arbitrum (no free tier). -use crate::ant_protocol::CLOSE_GROUP_SIZE; +use crate::ant_protocol::{CLOSE_GROUP_MAJORITY, CLOSE_GROUP_SIZE}; use crate::error::{Error, Result}; use crate::payment::cache::{CacheStats, VerifiedCache, XorName}; use crate::payment::proof::{ @@ -18,6 +18,7 @@ use evmlib::Network as EvmNetwork; use lru::LruCache; use parking_lot::Mutex; use saorsa_core::identity::node_identity::peer_id_from_public_key_bytes; +use std::collections::HashSet; use std::num::NonZeroUsize; use std::time::SystemTime; use tracing::{debug, info}; @@ -75,6 +76,10 @@ pub struct PaymentVerifierConfig { /// Local node's rewards address. /// The verifier rejects payments that don't include this node as a recipient. pub local_rewards_address: RewardsAddress, + /// Local node's peer ID (32-byte BLAKE3 hash of ML-DSA-65 public key). + /// Used to build the full close group view (self + DHT peers) during + /// payment proof validation. + pub local_peer_id: [u8; 32], } /// Status returned by payment verification. @@ -199,6 +204,7 @@ impl PaymentVerifier { &self, xorname: &XorName, payment_proof: Option<&[u8]>, + local_close_group: &[[u8; 32]], ) -> Result { // First check if payment is required let status = self.check_payment_required(xorname); @@ -237,7 +243,8 @@ impl PaymentVerifier { debug!("Proof includes {} transaction hash(es)", tx_hashes.len()); } - self.verify_evm_payment(xorname, &payment).await?; + self.verify_evm_payment(xorname, &payment, local_close_group) + .await?; } None => { let tag = proof.first().copied().unwrap_or(0); @@ -300,7 +307,12 @@ impl PaymentVerifier { /// For unit tests that don't need on-chain verification, pre-populate /// the cache so `verify_payment` returns `CachedAsVerified` before /// reaching this method. - async fn verify_evm_payment(&self, xorname: &XorName, payment: &ProofOfPayment) -> Result<()> { + async fn verify_evm_payment( + &self, + xorname: &XorName, + payment: &ProofOfPayment, + local_close_group: &[[u8; 32]], + ) -> Result<()> { if tracing::enabled!(tracing::Level::DEBUG) { let xorname_hex = hex::encode(xorname); let quote_count = payment.peer_quotes.len(); @@ -312,6 +324,7 @@ impl PaymentVerifier { Self::validate_quote_timestamps(payment)?; Self::validate_peer_bindings(payment)?; self.validate_local_recipient(payment)?; + self.validate_close_group_membership(payment, local_close_group)?; // Verify quote signatures (CPU-bound, run off async runtime) let peer_quotes = payment.peer_quotes.clone(); @@ -692,6 +705,59 @@ impl PaymentVerifier { } Ok(()) } + + /// Verify that the peers in the payment proof are known close group members. + /// + /// Extracts peer IDs from the proof via `BLAKE3(pub_key)` and checks that at + /// least `CLOSE_GROUP_MAJORITY` appear in this node's close group view + /// (`local_close_group` + self). + /// + /// Skipped when `local_close_group` is empty (unit tests without DHT). + fn validate_close_group_membership( + &self, + payment: &ProofOfPayment, + local_close_group: &[[u8; 32]], + ) -> Result<()> { + if local_close_group.is_empty() { + return Ok(()); + } + + // Build the full close group set: DHT peers + this node itself. + let mut known_peers: HashSet<[u8; 32]> = local_close_group.iter().copied().collect(); + known_peers.insert(self.config.local_peer_id); + + // Extract peer IDs from the proof by hashing each quote's pub_key. + let mut recognized = 0usize; + for (encoded_peer_id, quote) in &payment.peer_quotes { + match peer_id_from_public_key_bytes("e.pub_key) { + Ok(peer_id) => { + if known_peers.contains(peer_id.as_bytes()) { + recognized += 1; + } else { + debug!("Proof peer {} not in local close group", peer_id.to_hex()); + } + } + Err(e) => { + debug!( + "Failed to derive peer ID from quote pub_key for {encoded_peer_id:?}: {e}" + ); + } + } + } + + if recognized >= CLOSE_GROUP_MAJORITY { + debug!( + "Close group membership validated: {recognized}/{} proof peers recognized", + payment.peer_quotes.len() + ); + Ok(()) + } else { + Err(Error::Payment(format!( + "Too few proof peers are known close group members: {recognized}/{} recognized (need {CLOSE_GROUP_MAJORITY})", + payment.peer_quotes.len() + ))) + } + } } #[cfg(test)] @@ -706,6 +772,7 @@ mod tests { evm: EvmVerifierConfig::default(), cache_capacity: 100, local_rewards_address: RewardsAddress::new([1u8; 20]), + local_peer_id: [1u8; 32], }; PaymentVerifier::new(config) } @@ -739,7 +806,7 @@ mod tests { let xorname = [1u8; 32]; // No proof provided => should return an error (EVM is always on) - let result = verifier.verify_payment(&xorname, None).await; + let result = verifier.verify_payment(&xorname, None, &[]).await; assert!( result.is_err(), "Expected Err without proof, got: {result:?}" @@ -755,7 +822,7 @@ mod tests { verifier.cache.insert(xorname); // Should succeed without payment (cached) - let result = verifier.verify_payment(&xorname, None).await; + let result = verifier.verify_payment(&xorname, None, &[]).await; assert!(result.is_ok()); assert_eq!(result.expect("cached"), PaymentStatus::CachedAsVerified); } @@ -802,7 +869,9 @@ mod tests { // Proof smaller than MIN_PAYMENT_PROOF_SIZE_BYTES let small_proof = vec![0u8; MIN_PAYMENT_PROOF_SIZE_BYTES - 1]; - let result = verifier.verify_payment(&xorname, Some(&small_proof)).await; + let result = verifier + .verify_payment(&xorname, Some(&small_proof), &[]) + .await; assert!(result.is_err()); let err_msg = format!("{}", result.expect_err("should fail")); assert!( @@ -818,7 +887,9 @@ mod tests { // Proof larger than MAX_PAYMENT_PROOF_SIZE_BYTES let large_proof = vec![0u8; MAX_PAYMENT_PROOF_SIZE_BYTES + 1]; - let result = verifier.verify_payment(&xorname, Some(&large_proof)).await; + let result = verifier + .verify_payment(&xorname, Some(&large_proof), &[]) + .await; assert!(result.is_err()); let err_msg = format!("{}", result.expect_err("should fail")); assert!( @@ -835,7 +906,7 @@ mod tests { // Exactly MIN_PAYMENT_PROOF_SIZE_BYTES with unknown tag — rejected let boundary_proof = vec![0xFFu8; MIN_PAYMENT_PROOF_SIZE_BYTES]; let result = verifier - .verify_payment(&xorname, Some(&boundary_proof)) + .verify_payment(&xorname, Some(&boundary_proof), &[]) .await; assert!(result.is_err()); let err_msg = format!("{}", result.expect_err("should fail")); @@ -853,7 +924,7 @@ mod tests { // Exactly MAX_PAYMENT_PROOF_SIZE_BYTES with unknown tag — rejected let boundary_proof = vec![0xFFu8; MAX_PAYMENT_PROOF_SIZE_BYTES]; let result = verifier - .verify_payment(&xorname, Some(&boundary_proof)) + .verify_payment(&xorname, Some(&boundary_proof), &[]) .await; assert!(result.is_err()); let err_msg = format!("{}", result.expect_err("should fail")); @@ -871,7 +942,7 @@ mod tests { // Valid tag (0x01) but garbage payload — should fail deserialization let mut garbage = vec![crate::ant_protocol::PROOF_TAG_SINGLE_NODE]; garbage.extend_from_slice(&[0xAB; 63]); - let result = verifier.verify_payment(&xorname, Some(&garbage)).await; + let result = verifier.verify_payment(&xorname, Some(&garbage), &[]).await; assert!(result.is_err()); let err_msg = format!("{}", result.expect_err("should fail")); assert!( @@ -926,7 +997,7 @@ mod tests { let v = verifier.clone(); handles.push(tokio::spawn(async move { let xorname = [i; 32]; - v.verify_payment(&xorname, None).await + v.verify_payment(&xorname, None, &[]).await })); } @@ -1055,7 +1126,7 @@ mod tests { let proof_bytes = serialize_single_node_proof(&proof).expect("serialize proof"); let result = verifier - .verify_payment(&target_xorname, Some(&proof_bytes)) + .verify_payment(&target_xorname, Some(&proof_bytes), &[]) .await; assert!(result.is_err(), "Should reject mismatched content address"); @@ -1128,7 +1199,9 @@ mod tests { } let proof_bytes = serialize_proof(peer_quotes); - let result = verifier.verify_payment(&xorname, Some(&proof_bytes)).await; + let result = verifier + .verify_payment(&xorname, Some(&proof_bytes), &[]) + .await; assert!(result.is_err(), "Should reject expired quote"); let err_msg = format!("{}", result.expect_err("should fail")); @@ -1159,7 +1232,9 @@ mod tests { } let proof_bytes = serialize_proof(peer_quotes); - let result = verifier.verify_payment(&xorname, Some(&proof_bytes)).await; + let result = verifier + .verify_payment(&xorname, Some(&proof_bytes), &[]) + .await; assert!(result.is_err(), "Should reject future-timestamped quote"); let err_msg = format!("{}", result.expect_err("should fail")); @@ -1190,7 +1265,9 @@ mod tests { } let proof_bytes = serialize_proof(peer_quotes); - let result = verifier.verify_payment(&xorname, Some(&proof_bytes)).await; + let result = verifier + .verify_payment(&xorname, Some(&proof_bytes), &[]) + .await; // Should NOT fail at timestamp check (will fail later at pub_key binding) let err_msg = format!("{}", result.expect_err("should fail at later check")); @@ -1221,7 +1298,9 @@ mod tests { } let proof_bytes = serialize_proof(peer_quotes); - let result = verifier.verify_payment(&xorname, Some(&proof_bytes)).await; + let result = verifier + .verify_payment(&xorname, Some(&proof_bytes), &[]) + .await; assert!( result.is_err(), @@ -1255,7 +1334,9 @@ mod tests { } let proof_bytes = serialize_proof(peer_quotes); - let result = verifier.verify_payment(&xorname, Some(&proof_bytes)).await; + let result = verifier + .verify_payment(&xorname, Some(&proof_bytes), &[]) + .await; // Should NOT fail at timestamp check (will fail later at pub_key binding) let err_msg = format!("{}", result.expect_err("should fail at later check")); @@ -1294,6 +1375,7 @@ mod tests { }, cache_capacity: 100, local_rewards_address: local_addr, + local_peer_id: [0xAAu8; 32], }; let verifier = PaymentVerifier::new(config); @@ -1316,7 +1398,9 @@ mod tests { } let proof_bytes = serialize_proof(peer_quotes); - let result = verifier.verify_payment(&xorname, Some(&proof_bytes)).await; + let result = verifier + .verify_payment(&xorname, Some(&proof_bytes), &[]) + .await; assert!(result.is_err(), "Should reject payment not addressed to us"); let err_msg = format!("{}", result.expect_err("should fail")); @@ -1355,7 +1439,9 @@ mod tests { } let proof_bytes = serialize_proof(peer_quotes); - let result = verifier.verify_payment(&xorname, Some(&proof_bytes)).await; + let result = verifier + .verify_payment(&xorname, Some(&proof_bytes), &[]) + .await; assert!(result.is_err(), "Should reject wrong peer binding"); let err_msg = format!("{}", result.expect_err("should fail")); @@ -1383,7 +1469,7 @@ mod tests { merkle_garbage.extend_from_slice(&[0xAB; 63]); let result = verifier - .verify_payment(&xorname, Some(&merkle_garbage)) + .verify_payment(&xorname, Some(&merkle_garbage), &[]) .await; assert!( @@ -1433,7 +1519,9 @@ mod tests { // verify_payment should process it through the single-node path. // It will fail at quote validation (fake pub_key), but we verify // it passes the deserialization stage by checking the error type. - let result = verifier.verify_payment(&xorname, Some(&tagged_bytes)).await; + let result = verifier + .verify_payment(&xorname, Some(&tagged_bytes), &[]) + .await; assert!(result.is_err(), "Should fail at quote validation stage"); let err_msg = format!("{}", result.expect_err("should fail")); @@ -1608,7 +1696,7 @@ mod tests { let wrong_xorname = [0xFFu8; 32]; let result = verifier - .verify_payment(&wrong_xorname, Some(&tagged_proof)) + .verify_payment(&wrong_xorname, Some(&tagged_proof), &[]) .await; assert!( @@ -1636,7 +1724,9 @@ mod tests { bad_proof.push(0x00); } - let result = verifier.verify_payment(&xorname, Some(&bad_proof)).await; + let result = verifier + .verify_payment(&xorname, Some(&bad_proof), &[]) + .await; assert!(result.is_err(), "Should reject malformed merkle body"); let err_msg = format!("{}", result.expect_err("should fail")); @@ -1688,6 +1778,7 @@ mod tests { evm: EvmVerifierConfig::default(), cache_capacity: 100, local_rewards_address: RewardsAddress::new([1u8; 20]), + local_peer_id: [1u8; 32], }; let verifier = PaymentVerifier::new(config); @@ -1803,7 +1894,7 @@ mod tests { let tagged = crate::payment::proof::serialize_merkle_proof(&merkle_proof).expect("serialize"); - let result = verifier.verify_payment(&xorname, Some(&tagged)).await; + let result = verifier.verify_payment(&xorname, Some(&tagged), &[]).await; assert!( result.is_err(), @@ -1833,7 +1924,7 @@ mod tests { verifier.pool_cache.lock().put(pool_hash, info); } - let result = verifier.verify_payment(&xorname, Some(&tagged)).await; + let result = verifier.verify_payment(&xorname, Some(&tagged), &[]).await; assert!( result.is_err(), @@ -1868,7 +1959,9 @@ mod tests { verifier.pool_cache.lock().put(pool_hash, info); } - let result = verifier.verify_payment(&xorname, Some(&tagged_proof)).await; + let result = verifier + .verify_payment(&xorname, Some(&tagged_proof), &[]) + .await; assert!( result.is_err(), @@ -1902,7 +1995,9 @@ mod tests { verifier.pool_cache.lock().put(pool_hash, info); } - let result = verifier.verify_payment(&xorname, Some(&tagged_proof)).await; + let result = verifier + .verify_payment(&xorname, Some(&tagged_proof), &[]) + .await; assert!(result.is_err(), "Should reject paid node address mismatch"); let err_msg = format!("{}", result.expect_err("should fail")); @@ -1928,7 +2023,9 @@ mod tests { verifier.pool_cache.lock().put(pool_hash, info); } - let result = verifier.verify_payment(&xorname, Some(&tagged_proof)).await; + let result = verifier + .verify_payment(&xorname, Some(&tagged_proof), &[]) + .await; assert!( result.is_err(), diff --git a/src/storage/handler.rs b/src/storage/handler.rs index 12e5449..168b38f 100644 --- a/src/storage/handler.rs +++ b/src/storage/handler.rs @@ -30,14 +30,15 @@ use crate::ant_protocol::{ ChunkGetRequest, ChunkGetResponse, ChunkMessage, ChunkMessageBody, ChunkPutRequest, ChunkPutResponse, ChunkQuoteRequest, ChunkQuoteResponse, MerkleCandidateQuoteRequest, - MerkleCandidateQuoteResponse, ProtocolError, CHUNK_PROTOCOL_ID, DATA_TYPE_CHUNK, - MAX_CHUNK_SIZE, + MerkleCandidateQuoteResponse, ProtocolError, CHUNK_PROTOCOL_ID, CLOSE_GROUP_SIZE, + DATA_TYPE_CHUNK, MAX_CHUNK_SIZE, }; use crate::client::compute_address; use crate::error::{Error, Result}; use crate::payment::{PaymentVerifier, QuoteGenerator}; use crate::storage::lmdb::LmdbStorage; use bytes::Bytes; +use saorsa_core::P2PNode; use std::sync::Arc; use tracing::{debug, info, warn}; @@ -53,6 +54,9 @@ pub struct AntProtocol { /// Quote generator for creating storage quotes. /// Also handles merkle candidate quote signing via ML-DSA-65. quote_generator: Arc, + /// P2P node for local close-group lookups during quote generation. + /// `None` only in unit tests where a full P2P node is unavailable. + p2p_node: Option>, } impl AntProtocol { @@ -63,16 +67,19 @@ impl AntProtocol { /// * `storage` - LMDB storage for chunk persistence /// * `payment_verifier` - Payment verifier for validating payments /// * `quote_generator` - Quote generator for creating storage quotes + /// * `p2p_node` - P2P node for local close-group lookups (`None` in unit tests) #[must_use] pub fn new( storage: Arc, payment_verifier: Arc, quote_generator: Arc, + p2p_node: Option>, ) -> Self { Self { storage, payment_verifier, quote_generator, + p2p_node, } } @@ -106,7 +113,7 @@ impl AntProtocol { ChunkMessageBody::GetResponse(self.handle_get(req).await) } ChunkMessageBody::QuoteRequest(ref req) => { - ChunkMessageBody::QuoteResponse(self.handle_quote(req)) + ChunkMessageBody::QuoteResponse(self.handle_quote(req).await) } ChunkMessageBody::MerkleCandidateQuoteRequest(ref req) => { ChunkMessageBody::MerkleCandidateQuoteResponse( @@ -171,10 +178,26 @@ impl AntProtocol { Ok(false) => {} } - // 4. Verify payment + // 4. Look up local close group for this content address. + let local_close_group: Vec<[u8; 32]> = match self.p2p_node { + Some(ref p2p) => p2p + .dht() + .find_closest_nodes_local(&address, CLOSE_GROUP_SIZE) + .await + .iter() + .map(|node| *node.peer_id.as_bytes()) + .collect(), + None => Vec::new(), + }; + + // 5. Verify payment (including close group membership check) let payment_result = self .payment_verifier - .verify_payment(&address, request.payment_proof.as_deref()) + .verify_payment( + &address, + request.payment_proof.as_deref(), + &local_close_group, + ) .await; match payment_result { @@ -232,7 +255,7 @@ impl AntProtocol { } /// Handle a quote request. - fn handle_quote(&self, request: &ChunkQuoteRequest) -> ChunkQuoteResponse { + async fn handle_quote(&self, request: &ChunkQuoteRequest) -> ChunkQuoteResponse { let addr_hex = hex::encode(request.address); let data_size = request.data_size; debug!("Handling quote request for {addr_hex} (size: {data_size})"); @@ -265,6 +288,19 @@ impl AntProtocol { }); } + // Query local routing table for this node's view of the close group. + // This is an in-memory lookup — no network round-trips. + let close_group: Vec<[u8; 32]> = match self.p2p_node { + Some(ref p2p) => p2p + .dht() + .find_closest_nodes_local(&request.address, CLOSE_GROUP_SIZE) + .await + .iter() + .map(|node| *node.peer_id.as_bytes()) + .collect(), + None => Vec::new(), + }; + match self .quote_generator .create_quote(request.address, data_size_usize, request.data_type) @@ -275,6 +311,7 @@ impl AntProtocol { Ok(quote_bytes) => ChunkQuoteResponse::Success { quote: quote_bytes, already_stored, + close_group, }, Err(e) => ChunkQuoteResponse::Error(ProtocolError::QuoteFailed(format!( "Failed to serialize quote: {e}" @@ -416,6 +453,7 @@ mod tests { evm: EvmVerifierConfig::default(), cache_capacity: 100_000, local_rewards_address: rewards_address, + local_peer_id: [1u8; 32], }; let payment_verifier = Arc::new(PaymentVerifier::new(payment_config)); let metrics_tracker = QuotingMetricsTracker::new(1000, 100); @@ -434,7 +472,7 @@ mod tests { .map_or_else(|_| vec![], |sig| sig.as_bytes().to_vec()) }); - let protocol = AntProtocol::new(storage, payment_verifier, Arc::new(quote_generator)); + let protocol = AntProtocol::new(storage, payment_verifier, Arc::new(quote_generator), None); (protocol, temp_dir) } diff --git a/src/storage/mod.rs b/src/storage/mod.rs index 949fff6..c9ad10c 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -38,7 +38,7 @@ //! let storage = Arc::new(LmdbStorage::new(config).await?); //! //! // Create protocol handler -//! let protocol = AntProtocol::new(storage, Arc::new(payment_verifier), Arc::new(quote_generator)); +//! let protocol = AntProtocol::new(storage, Arc::new(payment_verifier), Arc::new(quote_generator), None); //! //! // Register with saorsa-core //! listener.register_protocol(protocol).await?; diff --git a/tests/e2e/data_types/chunk.rs b/tests/e2e/data_types/chunk.rs index 3822c41..e6a038d 100644 --- a/tests/e2e/data_types/chunk.rs +++ b/tests/e2e/data_types/chunk.rs @@ -440,6 +440,7 @@ mod tests { evm: EvmVerifierConfig { network }, cache_capacity: 100, local_rewards_address: rewards_address, + local_peer_id: [0x01; 32], }); let metrics_tracker = QuotingMetricsTracker::new(1000, 100); let quote_generator = QuoteGenerator::new(rewards_address, metrics_tracker); @@ -448,6 +449,7 @@ mod tests { Arc::new(storage), Arc::new(payment_verifier), Arc::new(quote_generator), + None, ); Ok((protocol, temp_dir, testnet)) diff --git a/tests/e2e/testnet.rs b/tests/e2e/testnet.rs index e61c1a2..65ea246 100644 --- a/tests/e2e/testnet.rs +++ b/tests/e2e/testnet.rs @@ -1077,6 +1077,7 @@ impl TestNetwork { }, cache_capacity: TEST_PAYMENT_CACHE_CAPACITY, local_rewards_address: rewards_address, + local_peer_id: *identity.peer_id().as_bytes(), }; let payment_verifier = PaymentVerifier::new(payment_config); @@ -1111,6 +1112,7 @@ impl TestNetwork { Arc::new(storage), Arc::new(payment_verifier), Arc::new(quote_generator), + None, )) } From 2e026b6e132ce8e9f8236f74debf078b12c3f5aa Mon Sep 17 00:00:00 2001 From: Warm Beer Date: Mon, 30 Mar 2026 19:06:29 +0200 Subject: [PATCH 2/4] fix: address PR review - wire p2p_node to devnet/testnet, deduplicate DHT lookups MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Devnet and e2e test nodes were constructed with p2p_node=None and never received it later, silently skipping close-group validation. Switch AntProtocol.p2p_node to OnceLock with a set_p2p_node() setter so the P2P node can be injected after construction. Wire it up in both devnet::start_node and testnet::start_node. Also extracts the duplicated DHT lookup into AntProtocol::local_close_group(), fixes the step-numbering comment in handle_put (5→6 for the store step), and clarifies docs around find_closest_nodes_local semantics (excludes self, CLOSE_GROUP_SIZE count rationale). Co-Authored-By: Claude Opus 4.6 (1M context) --- src/ant_protocol/chunk.rs | 8 ++-- src/devnet.rs | 2 + src/storage/handler.rs | 79 +++++++++++++++++++++++++-------------- tests/e2e/testnet.rs | 2 + 4 files changed, 60 insertions(+), 31 deletions(-) diff --git a/src/ant_protocol/chunk.rs b/src/ant_protocol/chunk.rs index 0b1d5e6..5113f33 100644 --- a/src/ant_protocol/chunk.rs +++ b/src/ant_protocol/chunk.rs @@ -236,9 +236,11 @@ pub enum ChunkQuoteResponse { quote: Vec, /// `true` when the chunk already exists on this node (skip payment). already_stored: bool, - /// Peer IDs (raw 32-byte BLAKE3 hashes) this node considers closest to - /// the content address, excluding itself. Clients use these views to - /// verify close-group quorum before paying. + /// Up to `CLOSE_GROUP_SIZE` peer IDs (raw 32-byte BLAKE3 hashes) this + /// node considers closest to the content address, **excluding itself** + /// (the local node is filtered out by the DHT query). Clients combine + /// these views from multiple nodes to verify close-group quorum before + /// paying. close_group: Vec<[u8; 32]>, }, /// Quote generation failed. diff --git a/src/devnet.rs b/src/devnet.rs index 8baa4f3..8c51bfa 100644 --- a/src/devnet.rs +++ b/src/devnet.rs @@ -645,6 +645,8 @@ impl Devnet { *node.state.write().await = NodeState::Running; if let (Some(ref p2p), Some(ref protocol)) = (&node.p2p_node, &node.ant_protocol) { + // Inject P2P node into protocol handler for close-group lookups. + let _ = protocol.set_p2p_node(Arc::clone(p2p)); let mut events = p2p.subscribe_events(); let p2p_clone = Arc::clone(p2p); let protocol_clone = Arc::clone(protocol); diff --git a/src/storage/handler.rs b/src/storage/handler.rs index 168b38f..99779b2 100644 --- a/src/storage/handler.rs +++ b/src/storage/handler.rs @@ -40,6 +40,7 @@ use crate::storage::lmdb::LmdbStorage; use bytes::Bytes; use saorsa_core::P2PNode; use std::sync::Arc; +use std::sync::OnceLock; use tracing::{debug, info, warn}; /// ANT protocol handler. @@ -54,9 +55,10 @@ pub struct AntProtocol { /// Quote generator for creating storage quotes. /// Also handles merkle candidate quote signing via ML-DSA-65. quote_generator: Arc, - /// P2P node for local close-group lookups during quote generation. - /// `None` only in unit tests where a full P2P node is unavailable. - p2p_node: Option>, + /// P2P node for local close-group lookups during quote and payment + /// validation. Initialised via the constructor or [`set_p2p_node`] when + /// the P2P layer starts after the protocol handler (devnet / test nodes). + p2p_node: OnceLock>, } impl AntProtocol { @@ -67,7 +69,8 @@ impl AntProtocol { /// * `storage` - LMDB storage for chunk persistence /// * `payment_verifier` - Payment verifier for validating payments /// * `quote_generator` - Quote generator for creating storage quotes - /// * `p2p_node` - P2P node for local close-group lookups (`None` in unit tests) + /// * `p2p_node` - P2P node for local close-group lookups (`None` in unit tests + /// or when the P2P layer is not yet started — see [`set_p2p_node`]) #[must_use] pub fn new( storage: Arc, @@ -75,11 +78,51 @@ impl AntProtocol { quote_generator: Arc, p2p_node: Option>, ) -> Self { + let lock = OnceLock::new(); + if let Some(node) = p2p_node { + // Fresh OnceLock — set cannot fail. + let _ = lock.set(node); + } Self { storage, payment_verifier, quote_generator, - p2p_node, + p2p_node: lock, + } + } + + /// Inject the P2P node after construction. + /// + /// Used by devnet and test harnesses where the `P2PNode` is created after + /// the `AntProtocol` handler. + /// + /// # Errors + /// + /// Returns the rejected `Arc` if a node was already set. + pub fn set_p2p_node(&self, node: Arc) -> std::result::Result<(), Arc> { + self.p2p_node.set(node) + } + + /// Query the local routing table for the closest peers to `address`. + /// + /// Returns up to `CLOSE_GROUP_SIZE` peer IDs **excluding this node**. + /// The local node is intentionally omitted because `find_closest_nodes_local` + /// filters out self — the caller adds `local_peer_id` separately when + /// building the full close-group set for validation. + /// + /// We request `CLOSE_GROUP_SIZE` (not `CLOSE_GROUP_SIZE - 1`) because this + /// node may not be in the actual close group for the target address — asking + /// for fewer peers could exclude a legitimate member. + async fn local_close_group(&self, address: &[u8; 32]) -> Vec<[u8; 32]> { + match self.p2p_node.get() { + Some(p2p) => p2p + .dht() + .find_closest_nodes_local(address, CLOSE_GROUP_SIZE) + .await + .iter() + .map(|node| *node.peer_id.as_bytes()) + .collect(), + None => Vec::new(), } } @@ -179,16 +222,7 @@ impl AntProtocol { } // 4. Look up local close group for this content address. - let local_close_group: Vec<[u8; 32]> = match self.p2p_node { - Some(ref p2p) => p2p - .dht() - .find_closest_nodes_local(&address, CLOSE_GROUP_SIZE) - .await - .iter() - .map(|node| *node.peer_id.as_bytes()) - .collect(), - None => Vec::new(), - }; + let local_close_group = self.local_close_group(&address).await; // 5. Verify payment (including close group membership check) let payment_result = self @@ -214,7 +248,7 @@ impl AntProtocol { } } - // 5. Store chunk + // 6. Store chunk match self.storage.put(&address, &request.content).await { Ok(_) => { let content_len = request.content.len(); @@ -288,18 +322,7 @@ impl AntProtocol { }); } - // Query local routing table for this node's view of the close group. - // This is an in-memory lookup — no network round-trips. - let close_group: Vec<[u8; 32]> = match self.p2p_node { - Some(ref p2p) => p2p - .dht() - .find_closest_nodes_local(&request.address, CLOSE_GROUP_SIZE) - .await - .iter() - .map(|node| *node.peer_id.as_bytes()) - .collect(), - None => Vec::new(), - }; + let close_group = self.local_close_group(&request.address).await; match self .quote_generator diff --git a/tests/e2e/testnet.rs b/tests/e2e/testnet.rs index 65ea246..fe9e8a7 100644 --- a/tests/e2e/testnet.rs +++ b/tests/e2e/testnet.rs @@ -1155,6 +1155,8 @@ impl TestNetwork { // Start protocol handler that routes incoming P2P messages to AntProtocol if let (Some(ref p2p), Some(ref protocol)) = (&node.p2p_node, &node.ant_protocol) { + // Inject P2P node into protocol handler for close-group lookups. + let _ = protocol.set_p2p_node(Arc::clone(p2p)); let mut events = p2p.subscribe_events(); let p2p_clone = Arc::clone(p2p); let protocol_clone = Arc::clone(protocol); From f0cabbfe102ffb42bdc9c4079012b024499c0779 Mon Sep 17 00:00:00 2001 From: Warm Beer Date: Mon, 30 Mar 2026 19:49:21 +0200 Subject: [PATCH 3/4] fix: require all proof peers to be in the current close group MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace the permissive majority check (3/5 recognized) with strict validation that rejects the proof if ANY peer is not in the verifying node's current close group. This prevents malicious clients from including attacker-controlled nodes in payment proofs. Also promotes invalid ML-DSA pub_key derivation from a silent debug log to a hard error — a bad pub_key in a proof is suspicious, not benign. Co-Authored-By: Claude Opus 4.6 (1M context) --- src/payment/verifier.rs | 56 ++++++++++++++++------------------------- 1 file changed, 22 insertions(+), 34 deletions(-) diff --git a/src/payment/verifier.rs b/src/payment/verifier.rs index cdd4087..8eeef32 100644 --- a/src/payment/verifier.rs +++ b/src/payment/verifier.rs @@ -3,7 +3,7 @@ //! This is the core payment verification logic for ant-node. //! All new data requires EVM payment on Arbitrum (no free tier). -use crate::ant_protocol::{CLOSE_GROUP_MAJORITY, CLOSE_GROUP_SIZE}; +use crate::ant_protocol::CLOSE_GROUP_SIZE; use crate::error::{Error, Result}; use crate::payment::cache::{CacheStats, VerifiedCache, XorName}; use crate::payment::proof::{ @@ -706,11 +706,11 @@ impl PaymentVerifier { Ok(()) } - /// Verify that the peers in the payment proof are known close group members. + /// Verify that **every** peer in the payment proof is a known close group member. /// - /// Extracts peer IDs from the proof via `BLAKE3(pub_key)` and checks that at - /// least `CLOSE_GROUP_MAJORITY` appear in this node's close group view - /// (`local_close_group` + self). + /// Builds the known set from the current DHT close group plus this node + /// itself, then checks that each proof peer (derived via `BLAKE3(pub_key)`) + /// appears in that set. Rejects the proof if ANY peer is unrecognized. /// /// Skipped when `local_close_group` is empty (unit tests without DHT). fn validate_close_group_membership( @@ -722,41 +722,29 @@ impl PaymentVerifier { return Ok(()); } - // Build the full close group set: DHT peers + this node itself. + // Build the known peer set: current DHT close group + this node. let mut known_peers: HashSet<[u8; 32]> = local_close_group.iter().copied().collect(); known_peers.insert(self.config.local_peer_id); - // Extract peer IDs from the proof by hashing each quote's pub_key. - let mut recognized = 0usize; - for (encoded_peer_id, quote) in &payment.peer_quotes { - match peer_id_from_public_key_bytes("e.pub_key) { - Ok(peer_id) => { - if known_peers.contains(peer_id.as_bytes()) { - recognized += 1; - } else { - debug!("Proof peer {} not in local close group", peer_id.to_hex()); - } - } - Err(e) => { - debug!( - "Failed to derive peer ID from quote pub_key for {encoded_peer_id:?}: {e}" - ); - } + // Every proof peer must be in the known set. + for (_encoded_peer_id, quote) in &payment.peer_quotes { + let peer_id = peer_id_from_public_key_bytes("e.pub_key).map_err(|e| { + Error::Payment(format!("Invalid ML-DSA pub_key in proof quote: {e}")) + })?; + + if !known_peers.contains(peer_id.as_bytes()) { + return Err(Error::Payment(format!( + "Proof peer {} is not in the current close group", + peer_id.to_hex() + ))); } } - if recognized >= CLOSE_GROUP_MAJORITY { - debug!( - "Close group membership validated: {recognized}/{} proof peers recognized", - payment.peer_quotes.len() - ); - Ok(()) - } else { - Err(Error::Payment(format!( - "Too few proof peers are known close group members: {recognized}/{} recognized (need {CLOSE_GROUP_MAJORITY})", - payment.peer_quotes.len() - ))) - } + debug!( + "Close group membership validated: all {} proof peers recognized", + payment.peer_quotes.len() + ); + Ok(()) } } From 133f6662ce5d9bc22faf9af329e5b1d156b3fd11 Mon Sep 17 00:00:00 2001 From: Warm Beer Date: Tue, 31 Mar 2026 10:31:06 +0200 Subject: [PATCH 4/4] test: add close-group membership validation tests and handle set_p2p_node errors Address Copilot review feedback: add 4 unit tests covering the accept, reject, skip, and local-peer-known paths of validate_close_group_membership, and handle set_p2p_node errors explicitly in devnet (warn) and testnet (propagate as TestnetError::Startup). Co-Authored-By: Claude Opus 4.6 (1M context) --- src/devnet.rs | 4 +- src/payment/verifier.rs | 163 ++++++++++++++++++++++++++++++++++++++++ tests/e2e/testnet.rs | 7 +- 3 files changed, 172 insertions(+), 2 deletions(-) diff --git a/src/devnet.rs b/src/devnet.rs index 8c51bfa..47fd452 100644 --- a/src/devnet.rs +++ b/src/devnet.rs @@ -646,7 +646,9 @@ impl Devnet { if let (Some(ref p2p), Some(ref protocol)) = (&node.p2p_node, &node.ant_protocol) { // Inject P2P node into protocol handler for close-group lookups. - let _ = protocol.set_p2p_node(Arc::clone(p2p)); + if protocol.set_p2p_node(Arc::clone(p2p)).is_err() { + warn!("P2P node already set on protocol handler for devnet node {index}"); + } let mut events = p2p.subscribe_events(); let p2p_clone = Arc::clone(p2p); let protocol_clone = Arc::clone(protocol); diff --git a/src/payment/verifier.rs b/src/payment/verifier.rs index 8eeef32..2c672fa 100644 --- a/src/payment/verifier.rs +++ b/src/payment/verifier.rs @@ -752,6 +752,9 @@ impl PaymentVerifier { #[allow(clippy::expect_used)] mod tests { use super::*; + use ant_evm::EncodedPeerId; + use saorsa_core::MlDsa65; + use saorsa_pqc::pqc::MlDsaOperations; /// Create a verifier for unit tests. EVM is always on, but tests can /// pre-populate the cache to bypass on-chain verification. @@ -2026,4 +2029,164 @@ mod tests { "Error should mention depth/count mismatch: {err_msg}" ); } + + // ========================================================================= + // Close-group membership validation tests + // ========================================================================= + + #[test] + fn test_close_group_all_peers_recognised_accepted() { + let ml_dsa = MlDsa65::new(); + let mut peer_quotes = Vec::new(); + let mut close_group_ids: Vec<[u8; 32]> = Vec::new(); + + // Generate CLOSE_GROUP_SIZE peers with real ML-DSA keys. + for _ in 0..CLOSE_GROUP_SIZE { + let (public_key, _) = ml_dsa.generate_keypair().expect("keygen"); + let pub_key_bytes = public_key.as_bytes().to_vec(); + let ant_peer_id = + peer_id_from_public_key_bytes(&pub_key_bytes).expect("peer id from pub key"); + close_group_ids.push(*ant_peer_id.as_bytes()); + + let encoded = encoded_peer_id_for_pub_key(&pub_key_bytes); + let mut quote = make_fake_quote( + [0xAA; 32], + SystemTime::now(), + RewardsAddress::new([1u8; 20]), + ); + quote.pub_key = pub_key_bytes; + peer_quotes.push((encoded, quote)); + } + + let payment = ProofOfPayment { peer_quotes }; + + // Verifier whose local_peer_id is NOT one of the proof peers (but that's + // fine — it only needs to be in the known set, and we insert it). + let config = PaymentVerifierConfig { + evm: EvmVerifierConfig::default(), + cache_capacity: 100, + local_rewards_address: RewardsAddress::new([1u8; 20]), + local_peer_id: [0xBBu8; 32], + }; + let verifier = PaymentVerifier::new(config); + + let result = verifier.validate_close_group_membership(&payment, &close_group_ids); + assert!( + result.is_ok(), + "All proof peers are in close group — should accept: {result:?}" + ); + } + + #[test] + fn test_close_group_unknown_peer_rejected() { + let ml_dsa = MlDsa65::new(); + let mut peer_quotes = Vec::new(); + let mut close_group_ids: Vec<[u8; 32]> = Vec::new(); + + // Generate CLOSE_GROUP_SIZE peers; include all but the last in the + // close group so one peer is "unknown". + for i in 0..CLOSE_GROUP_SIZE { + let (public_key, _) = ml_dsa.generate_keypair().expect("keygen"); + let pub_key_bytes = public_key.as_bytes().to_vec(); + let ant_peer_id = + peer_id_from_public_key_bytes(&pub_key_bytes).expect("peer id from pub key"); + + // Only add the first N-1 peers to the close group. + if i < CLOSE_GROUP_SIZE - 1 { + close_group_ids.push(*ant_peer_id.as_bytes()); + } + + let encoded = encoded_peer_id_for_pub_key(&pub_key_bytes); + let mut quote = make_fake_quote( + [0xAA; 32], + SystemTime::now(), + RewardsAddress::new([1u8; 20]), + ); + quote.pub_key = pub_key_bytes; + peer_quotes.push((encoded, quote)); + } + + let payment = ProofOfPayment { peer_quotes }; + + let config = PaymentVerifierConfig { + evm: EvmVerifierConfig::default(), + cache_capacity: 100, + local_rewards_address: RewardsAddress::new([1u8; 20]), + local_peer_id: [0xBBu8; 32], + }; + let verifier = PaymentVerifier::new(config); + + let result = verifier.validate_close_group_membership(&payment, &close_group_ids); + assert!(result.is_err(), "One unknown peer — should reject"); + let err_msg = format!("{}", result.expect_err("should fail")); + assert!( + err_msg.contains("not in the current close group"), + "Error should mention close group: {err_msg}" + ); + } + + #[test] + fn test_close_group_empty_skips_validation() { + // With an empty close group (unit test / no DHT), validation is skipped. + let verifier = create_test_verifier(); + + let quote = make_fake_quote( + [0xAA; 32], + SystemTime::now(), + RewardsAddress::new([1u8; 20]), + ); + let keypair = libp2p::identity::Keypair::generate_ed25519(); + let peer_id = libp2p::PeerId::from_public_key(&keypair.public()); + let peer_quotes = vec![(EncodedPeerId::from(peer_id), quote)]; + + let payment = ProofOfPayment { peer_quotes }; + + let result = verifier.validate_close_group_membership(&payment, &[]); + assert!( + result.is_ok(), + "Empty close group should skip validation: {result:?}" + ); + } + + #[test] + fn test_close_group_local_peer_is_implicitly_known() { + let ml_dsa = MlDsa65::new(); + + // Generate a single peer whose BLAKE3 ID we'll set as local_peer_id. + let (public_key, _) = ml_dsa.generate_keypair().expect("keygen"); + let pub_key_bytes = public_key.as_bytes().to_vec(); + let ant_peer_id = + peer_id_from_public_key_bytes(&pub_key_bytes).expect("peer id from pub key"); + + let encoded = encoded_peer_id_for_pub_key(&pub_key_bytes); + let mut quote = make_fake_quote( + [0xAA; 32], + SystemTime::now(), + RewardsAddress::new([1u8; 20]), + ); + quote.pub_key = pub_key_bytes; + + let payment = ProofOfPayment { + peer_quotes: vec![(encoded, quote)], + }; + + // The local_peer_id matches the proof peer, and the close group + // contains at least one entry (so validation isn't skipped) but + // does NOT contain the proof peer — only local_peer_id does. + let config = PaymentVerifierConfig { + evm: EvmVerifierConfig::default(), + cache_capacity: 100, + local_rewards_address: RewardsAddress::new([1u8; 20]), + local_peer_id: *ant_peer_id.as_bytes(), + }; + let verifier = PaymentVerifier::new(config); + + // Close group has a dummy entry so validation isn't skipped. + let dummy_peer = [0xFFu8; 32]; + let result = verifier.validate_close_group_membership(&payment, &[dummy_peer]); + assert!( + result.is_ok(), + "Proof peer matches local_peer_id — should accept: {result:?}" + ); + } } diff --git a/tests/e2e/testnet.rs b/tests/e2e/testnet.rs index fe9e8a7..9468e66 100644 --- a/tests/e2e/testnet.rs +++ b/tests/e2e/testnet.rs @@ -1156,7 +1156,12 @@ impl TestNetwork { // Start protocol handler that routes incoming P2P messages to AntProtocol if let (Some(ref p2p), Some(ref protocol)) = (&node.p2p_node, &node.ant_protocol) { // Inject P2P node into protocol handler for close-group lookups. - let _ = protocol.set_p2p_node(Arc::clone(p2p)); + protocol.set_p2p_node(Arc::clone(p2p)).map_err(|_| { + TestnetError::Startup(format!( + "P2P node already set on protocol handler for node {}", + node.index, + )) + })?; let mut events = p2p.subscribe_events(); let p2p_clone = Arc::clone(p2p); let protocol_clone = Arc::clone(protocol);