diff --git a/Cargo.lock b/Cargo.lock index 80f5aa4..16d0b15 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -925,8 +925,7 @@ dependencies = [ [[package]] name = "ant-node" version = "0.7.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9161d53c72cfcc0dd8fee14bff64c0bad06642f074d52c5c91d9ee203acb4042" +source = "git+https://github.com/WithAutonomi/ant-node.git?branch=feat%2Fclose-group-quorum-validation#019bddbf54a75ea0bfe324d3960b633aa6b282d8" dependencies = [ "aes-gcm-siv", "ant-evm", diff --git a/ant-core/Cargo.toml b/ant-core/Cargo.toml index 561698d..caed492 100644 --- a/ant-core/Cargo.toml +++ b/ant-core/Cargo.toml @@ -38,7 +38,7 @@ tracing = "0.1" bytes = "1" lru = "0.16" rand = "0.8" -ant-node = "0.7" +ant-node = { git = "https://github.com/WithAutonomi/ant-node.git", branch = "feat/close-group-quorum-validation" } saorsa-pqc = "0.5" tracing-subscriber = { version = "0.3", features = ["env-filter"] } diff --git a/ant-core/src/data/client/quote.rs b/ant-core/src/data/client/quote.rs index 6840c45..d85152e 100644 --- a/ant-core/src/data/client/quote.rs +++ b/ant-core/src/data/client/quote.rs @@ -14,6 +14,7 @@ use ant_node::core::{MultiAddr, PeerId}; use ant_node::payment::calculate_price; use ant_node::{CLOSE_GROUP_MAJORITY, CLOSE_GROUP_SIZE}; use futures::stream::{FuturesUnordered, StreamExt}; +use std::collections::HashSet; use std::time::Duration; use tracing::{debug, info, warn}; @@ -95,6 +96,7 @@ impl Client { ChunkMessageBody::QuoteResponse(ChunkQuoteResponse::Success { quote, already_stored, + close_group, }) => { if already_stored { debug!("Peer {peer_id_clone} already has chunk"); @@ -104,7 +106,7 @@ impl Client { Ok(payment_quote) => { let price = calculate_price(&payment_quote.quoting_metrics); debug!("Received quote from {peer_id_clone}: price = {price}"); - Some(Ok((payment_quote, price))) + Some(Ok((payment_quote, price, close_group))) } Err(e) => Some(Err(Error::Serialization(format!( "Failed to deserialize quote from {peer_id_clone}: {e}" @@ -134,12 +136,15 @@ impl Client { // Wait for all quote requests to complete or timeout. // Early-return if CLOSE_GROUP_MAJORITY peers report already stored. let mut quotes = Vec::with_capacity(CLOSE_GROUP_SIZE); + let mut close_group_views: Vec<(PeerId, Vec<[u8; 32]>)> = + Vec::with_capacity(CLOSE_GROUP_SIZE); let mut already_stored_count = 0usize; let mut failures: Vec = Vec::new(); while let Some((peer_id, addrs, quote_result)) = quote_futures.next().await { match quote_result { - Ok((quote, price)) => { + Ok((quote, price, close_group)) => { + close_group_views.push((peer_id, close_group)); quotes.push((peer_id, addrs, quote, price)); } Err(Error::AlreadyStored) => { @@ -160,19 +165,491 @@ impl Client { } } - if quotes.len() >= CLOSE_GROUP_SIZE { - info!( - "Collected {} quotes for address {}", + if quotes.len() < CLOSE_GROUP_SIZE { + return Err(Error::InsufficientPeers(format!( + "Got {} quotes, need {CLOSE_GROUP_SIZE}. Failures: [{}]", quotes.len(), - hex::encode(address) + failures.join("; ") + ))); + } + + // Validate close-group quorum: each responding peer should recognize + // most of the other queried peers in its own close-group view. + let quorum_indices = Self::validate_close_group_quorum("es, &close_group_views)?; + + // Reorder quotes so quorum members come first. These peers mutually + // recognize each other and are most likely to accept payment proofs, + // so chunk_put_to_close_group should target them as the initial set. + let quorum_set: HashSet = quorum_indices.iter().copied().collect(); + let (quorum, non_quorum): (Vec<_>, Vec<_>) = quotes + .into_iter() + .enumerate() + .partition(|(i, _)| quorum_set.contains(i)); + let reordered: Vec<_> = quorum + .into_iter() + .chain(non_quorum) + .map(|(_, q)| q) + .collect(); + + info!( + "Collected {} quotes for address {} (close-group quorum verified, {} quorum members prioritized)", + reordered.len(), + hex::encode(address), + quorum_indices.len(), + ); + Ok(reordered) + } + + /// Validate close-group quorum by finding the largest subset of queried + /// peers that mutually recognize each other. + /// + /// "Mutual recognition" means: for every pair (P, Q) in the subset, + /// Q appears in P's close-group view. This matches the server-side + /// `CLOSE_GROUP_MAJORITY` threshold that nodes enforce during payment + /// verification. + /// + /// Fails if no mutually-recognizing subset of size `CLOSE_GROUP_MAJORITY` + /// exists. Returns the indices of the quorum members on success so + /// callers can prioritize those peers for uploads. + fn validate_close_group_quorum( + quotes: &[(PeerId, Vec, PaymentQuote, Amount)], + close_group_views: &[(PeerId, Vec<[u8; 32]>)], + ) -> Result> { + let peer_ids: Vec<[u8; 32]> = quotes + .iter() + .map(|(peer_id, _, _, _)| *peer_id.as_bytes()) + .collect(); + + // Build a lookup: peer_bytes → set of peers it recognizes + let views: Vec<([u8; 32], HashSet<[u8; 32]>)> = close_group_views + .iter() + .map(|(peer_id, view)| (*peer_id.as_bytes(), view.iter().copied().collect())) + .collect(); + + // Check subsets from largest to smallest (CLOSE_GROUP_SIZE down to + // CLOSE_GROUP_MAJORITY). For CLOSE_GROUP_SIZE=5 this is at most + // C(5,5) + C(5,4) + C(5,3) = 1 + 5 + 10 = 16 checks. + let quorum_indices = Self::find_largest_mutual_subset(&peer_ids, &views); + + if quorum_indices.len() >= CLOSE_GROUP_MAJORITY { + info!( + "Close-group quorum passed: {}/{} peers mutually recognize each other", + quorum_indices.len(), + peer_ids.len() ); - return Ok(quotes); + Ok(quorum_indices) + } else { + Err(Error::CloseGroupQuorumFailure(format!( + "Largest mutually-recognizing subset is {} peers (need {CLOSE_GROUP_MAJORITY})", + quorum_indices.len() + ))) + } + } + + /// Find the largest subset of `peer_ids` where every peer in the subset + /// appears in every other peer's close-group view. + /// + /// Returns the indices of the members in the largest mutual subset, + /// or an empty vec if no subset of size `CLOSE_GROUP_MAJORITY` exists. + fn find_largest_mutual_subset( + peer_ids: &[[u8; 32]], + views: &[([u8; 32], HashSet<[u8; 32]>)], + ) -> Vec { + let n = peer_ids.len(); + + // Try subset sizes from largest to smallest. + for size in (CLOSE_GROUP_MAJORITY..=n).rev() { + // Iterate all index combinations of the given size. + let mut indices: Vec = (0..size).collect(); + loop { + if Self::is_mutual_subset(peer_ids, views, &indices) { + return indices; + } + if !Self::next_combination(&mut indices, n) { + break; + } + } + } + + vec![] + } + + /// Check whether the peers at the given indices mutually recognize each other. + fn is_mutual_subset( + peer_ids: &[[u8; 32]], + views: &[([u8; 32], HashSet<[u8; 32]>)], + indices: &[usize], + ) -> bool { + for &i in indices { + // Find this peer's view + let peer_bytes = peer_ids[i]; + let view = views + .iter() + .find(|(id, _)| *id == peer_bytes) + .map(|(_, v)| v); + + let Some(view) = view else { + return false; + }; + + // Every OTHER peer in the subset must appear in this peer's view + for &j in indices { + if i == j { + continue; + } + if !view.contains(&peer_ids[j]) { + return false; + } + } + } + true + } + + /// Advance an index combination to the next one in lexicographic order. + /// Returns false when all combinations have been exhausted. + fn next_combination(indices: &mut [usize], n: usize) -> bool { + let k = indices.len(); + // Find the rightmost index that can be incremented + let mut i = k; + while i > 0 { + i -= 1; + if indices[i] < n - k + i { + indices[i] += 1; + // Reset all subsequent indices + for j in (i + 1)..k { + indices[j] = indices[j - 1] + 1; + } + return true; + } + } + false + } +} + +#[cfg(test)] +#[allow(clippy::unwrap_used, clippy::expect_used)] +mod tests { + use super::*; + use ant_evm::{Amount, PaymentQuote, QuotingMetrics, RewardsAddress}; + use ant_node::core::{MultiAddr, PeerId}; + use ant_node::CLOSE_GROUP_SIZE; + use std::collections::HashSet; + use std::net::{Ipv4Addr, SocketAddr}; + use std::time::SystemTime; + + /// Create a deterministic peer ID from an index byte. + fn peer(id: u8) -> [u8; 32] { + let mut bytes = [0u8; 32]; + bytes[0] = id; + bytes + } + + /// Build a views array where each peer's view contains the given neighbors. + fn make_views(entries: &[([u8; 32], &[[u8; 32]])]) -> Vec<([u8; 32], HashSet<[u8; 32]>)> { + entries + .iter() + .map(|(id, neighbors)| (*id, neighbors.iter().copied().collect())) + .collect() + } + + // ─── next_combination ────────────────────────────────────────────── + + #[test] + fn next_combination_enumerates_all_c_5_3() { + let n = 5; + let k = 3; + let mut indices: Vec = (0..k).collect(); + let mut count = 1; // first combination is [0,1,2] + while Client::next_combination(&mut indices, n) { + count += 1; + } + // C(5,3) = 10 + assert_eq!(count, 10); + } + + #[test] + fn next_combination_enumerates_all_c_5_5() { + let n = 5; + let k = 5; + let mut indices: Vec = (0..k).collect(); + let mut count = 1; + while Client::next_combination(&mut indices, n) { + count += 1; + } + // C(5,5) = 1 + assert_eq!(count, 1); + } + + #[test] + fn next_combination_single_element() { + let n = 5; + let k = 1; + let mut indices: Vec = (0..k).collect(); + let mut count = 1; + while Client::next_combination(&mut indices, n) { + count += 1; } + // C(5,1) = 5 + assert_eq!(count, 5); + } + + #[test] + fn next_combination_empty_returns_false() { + let mut indices: Vec = vec![]; + assert!(!Client::next_combination(&mut indices, 5)); + } + + // ─── is_mutual_subset ────────────────────────────────────────────── + + #[test] + fn is_mutual_subset_two_peers_recognize_each_other() { + let a = peer(1); + let b = peer(2); + let peer_ids = vec![a, b]; + let views = make_views(&[(a, &[b]), (b, &[a])]); + + assert!(Client::is_mutual_subset(&peer_ids, &views, &[0, 1])); + } + + #[test] + fn is_mutual_subset_asymmetric_fails() { + let a = peer(1); + let b = peer(2); + let peer_ids = vec![a, b]; + // A sees B, but B does NOT see A + let views = make_views(&[(a, &[b]), (b, &[])]); + + assert!(!Client::is_mutual_subset(&peer_ids, &views, &[0, 1])); + } + + #[test] + fn is_mutual_subset_missing_view_returns_false() { + let a = peer(1); + let b = peer(2); + let peer_ids = vec![a, b]; + // Only A has a view entry; B is missing entirely + let views = make_views(&[(a, &[b])]); + + assert!(!Client::is_mutual_subset(&peer_ids, &views, &[0, 1])); + } + + // ─── find_largest_mutual_subset ──────────────────────────────────── + + #[test] + fn all_five_peers_mutually_recognize() { + let peers: Vec<[u8; 32]> = (1..=5).map(peer).collect(); + let views = make_views(&[ + (peers[0], &[peers[1], peers[2], peers[3], peers[4]]), + (peers[1], &[peers[0], peers[2], peers[3], peers[4]]), + (peers[2], &[peers[0], peers[1], peers[3], peers[4]]), + (peers[3], &[peers[0], peers[1], peers[2], peers[4]]), + (peers[4], &[peers[0], peers[1], peers[2], peers[3]]), + ]); + + let result = Client::find_largest_mutual_subset(&peers, &views); + assert_eq!(result.len(), 5); + assert_eq!(result, vec![0, 1, 2, 3, 4]); + } + + #[test] + fn three_of_five_mutually_recognize() { + let peers: Vec<[u8; 32]> = (1..=5).map(peer).collect(); + // Peers 0,1,2 see each other; peers 3,4 have empty views + let views = make_views(&[ + (peers[0], &[peers[1], peers[2]]), + (peers[1], &[peers[0], peers[2]]), + (peers[2], &[peers[0], peers[1]]), + (peers[3], &[]), + (peers[4], &[]), + ]); + + let result = Client::find_largest_mutual_subset(&peers, &views); + assert_eq!(result.len(), 3); + assert_eq!(result, vec![0, 1, 2]); + } + + #[test] + fn two_of_five_below_majority() { + let peers: Vec<[u8; 32]> = (1..=5).map(peer).collect(); + // Only peers 0 and 1 see each other + let views = make_views(&[ + (peers[0], &[peers[1]]), + (peers[1], &[peers[0]]), + (peers[2], &[]), + (peers[3], &[]), + (peers[4], &[]), + ]); + + // Largest mutual subset is 2, below CLOSE_GROUP_MAJORITY (3) + assert!(Client::find_largest_mutual_subset(&peers, &views).is_empty()); + } + + #[test] + fn empty_views_returns_zero() { + let peers: Vec<[u8; 32]> = (1..=5).map(peer).collect(); + let views = make_views(&[ + (peers[0], &[]), + (peers[1], &[]), + (peers[2], &[]), + (peers[3], &[]), + (peers[4], &[]), + ]); + + assert!(Client::find_largest_mutual_subset(&peers, &views).is_empty()); + } + + #[test] + fn four_of_five_one_rogue_peer() { + let peers: Vec<[u8; 32]> = (1..=5).map(peer).collect(); + // Peer 4 doesn't recognize anyone, but the other 4 form a clique + let views = make_views(&[ + (peers[0], &[peers[1], peers[2], peers[3]]), + (peers[1], &[peers[0], peers[2], peers[3]]), + (peers[2], &[peers[0], peers[1], peers[3]]), + (peers[3], &[peers[0], peers[1], peers[2]]), + (peers[4], &[]), + ]); + + let result = Client::find_largest_mutual_subset(&peers, &views); + assert_eq!(result.len(), 4); + assert_eq!(result, vec![0, 1, 2, 3]); + } + + #[test] + fn asymmetric_recognition_reduces_clique() { + let peers: Vec<[u8; 32]> = (1..=5).map(peer).collect(); + // All see each other except: peer 3 does NOT see peer 0 + let views = make_views(&[ + (peers[0], &[peers[1], peers[2], peers[3], peers[4]]), + (peers[1], &[peers[0], peers[2], peers[3], peers[4]]), + (peers[2], &[peers[0], peers[1], peers[3], peers[4]]), + (peers[3], &[peers[1], peers[2], peers[4]]), // missing peers[0] + (peers[4], &[peers[0], peers[1], peers[2], peers[3]]), + ]); + + // {0,1,2,3,4} fails (3 doesn't see 0). Both {0,1,2,4} and {1,2,3,4} + // are valid 4-peer cliques; the algorithm returns the first found in + // lexicographic order. + let result = Client::find_largest_mutual_subset(&peers, &views); + assert_eq!(result.len(), 4); + assert_eq!(result, vec![0, 1, 2, 4]); + } + + // ─── validate_close_group_quorum (integration) ───────────────────── + + fn make_test_quote() -> PaymentQuote { + PaymentQuote { + content: xor_name::XorName([0u8; 32]), + timestamp: SystemTime::now(), + quoting_metrics: QuotingMetrics { + data_size: 0, + data_type: 0, + close_records_stored: 0, + records_per_type: vec![], + max_records: 0, + received_payment_count: 0, + live_time: 0, + network_density: None, + network_size: None, + }, + pub_key: vec![], + signature: vec![], + rewards_address: RewardsAddress::new([0u8; 20]), + } + } + + fn make_dummy_addr() -> Vec { + let addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 12345); + vec![MultiAddr::quic(addr)] + } + + type Quotes = Vec<(PeerId, Vec, PaymentQuote, Amount)>; + type CloseGroupViews = Vec<(PeerId, Vec<[u8; 32]>)>; + + fn make_quotes_and_views( + peer_ids: &[PeerId], + neighbor_map: &[Vec<[u8; 32]>], + ) -> (Quotes, CloseGroupViews) { + let quotes: Quotes = peer_ids + .iter() + .map(|pid| (*pid, make_dummy_addr(), make_test_quote(), Amount::ZERO)) + .collect(); + + let views: CloseGroupViews = peer_ids + .iter() + .zip(neighbor_map.iter()) + .map(|(pid, neighbors)| (*pid, neighbors.clone())) + .collect(); + + (quotes, views) + } + + #[test] + fn validate_quorum_all_mutual_passes() { + let peer_ids: Vec = (1..=CLOSE_GROUP_SIZE) + .map(|i| PeerId::from_bytes(peer(i as u8))) + .collect(); + + let neighbor_map: Vec> = peer_ids + .iter() + .map(|me| { + peer_ids + .iter() + .filter(|p| p != &me) + .map(|p| *p.as_bytes()) + .collect() + }) + .collect(); + + let (quotes, views) = make_quotes_and_views(&peer_ids, &neighbor_map); + let indices = Client::validate_close_group_quorum("es, &views).unwrap(); + assert_eq!(indices.len(), CLOSE_GROUP_SIZE); + } + + #[test] + fn validate_quorum_empty_views_fails() { + let peer_ids: Vec = (1..=CLOSE_GROUP_SIZE) + .map(|i| PeerId::from_bytes(peer(i as u8))) + .collect(); + + let neighbor_map: Vec> = vec![vec![]; CLOSE_GROUP_SIZE]; + + let (quotes, views) = make_quotes_and_views(&peer_ids, &neighbor_map); + let result = Client::validate_close_group_quorum("es, &views); + assert!(result.is_err()); + assert!(matches!( + result.unwrap_err(), + Error::CloseGroupQuorumFailure(_) + )); + } + + #[test] + fn validate_quorum_exactly_majority_passes() { + let peer_ids: Vec = (1..=CLOSE_GROUP_SIZE) + .map(|i| PeerId::from_bytes(peer(i as u8))) + .collect(); + + // Only first CLOSE_GROUP_MAJORITY peers see each other + let majority = &peer_ids[..CLOSE_GROUP_MAJORITY]; + let neighbor_map: Vec> = peer_ids + .iter() + .map(|me| { + if majority.contains(me) { + majority + .iter() + .filter(|p| p != &me) + .map(|p| *p.as_bytes()) + .collect() + } else { + vec![] + } + }) + .collect(); - Err(Error::InsufficientPeers(format!( - "Got {} quotes, need {CLOSE_GROUP_SIZE}. Failures: [{}]", - quotes.len(), - failures.join("; ") - ))) + let (quotes, views) = make_quotes_and_views(&peer_ids, &neighbor_map); + let indices = Client::validate_close_group_quorum("es, &views).unwrap(); + assert_eq!(indices.len(), CLOSE_GROUP_MAJORITY); + // Only the first 3 peers (indices 0,1,2) form the quorum + assert_eq!(indices, vec![0, 1, 2]); } } diff --git a/ant-core/src/data/error.rs b/ant-core/src/data/error.rs index e637ecb..c46812f 100644 --- a/ant-core/src/data/error.rs +++ b/ant-core/src/data/error.rs @@ -63,6 +63,12 @@ pub enum Error { /// Data already exists on the network — no payment needed. #[error("already stored on network")] AlreadyStored, + + /// Close group quorum check failed — the queried peers do not mutually + /// recognize each other as close group members, so payment would not + /// result in durable storage. + #[error("close group quorum failure: {0}")] + CloseGroupQuorumFailure(String), } impl From for Error { diff --git a/ant-core/tests/support/mod.rs b/ant-core/tests/support/mod.rs index 66b81ab..a21f111 100644 --- a/ant-core/tests/support/mod.rs +++ b/ant-core/tests/support/mod.rs @@ -246,6 +246,7 @@ impl MiniTestnet { }, cache_capacity: 1000, local_rewards_address: rewards_address, + local_peer_id: *identity.peer_id().as_bytes(), }; let payment_verifier = Arc::new(PaymentVerifier::new(payment_config)); let metrics_tracker = QuotingMetricsTracker::new(TEST_MAX_RECORDS, TEST_INITIAL_RECORDS); @@ -271,6 +272,7 @@ impl MiniTestnet { storage, payment_verifier, Arc::new(quote_generator), + Some(Arc::clone(&node)), )); // Start message handler loop