Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
20f832d
feat: implement full replication subsystem
mickvandijke Mar 31, 2026
bb9ad07
test: add replication e2e tests and wire ReplicationEngine into test …
mickvandijke Mar 31, 2026
04732aa
fix: resolve all clippy warnings with -D warnings on all targets
mickvandijke Mar 31, 2026
ea07e03
fix: resolve doc link error and Windows CI test failures
mickvandijke Mar 31, 2026
4b0a271
test: add comprehensive Section 18 test coverage (36 new tests)
mickvandijke Apr 1, 2026
02883f1
test: add final 12 Section 18 scenarios for full test matrix coverage
mickvandijke Apr 1, 2026
796a51f
ci: enable e2e and replication tests in CI workflows
mickvandijke Apr 1, 2026
0521e31
test: add final 9 Section 18 scenarios for complete test matrix coverage
mickvandijke Apr 1, 2026
a3c1d71
fix: run e2e tests single-threaded to prevent LMDB TLS exhaustion
mickvandijke Apr 1, 2026
cea1966
fix: resolve protocol name validation and request-response handling i…
mickvandijke Apr 1, 2026
eec04ce
fix: handle Instant::checked_sub failure on Windows in bootstrap clai…
mickvandijke Apr 1, 2026
b4dce78
test: fix 7 weak/mislabeled Section 18 replication scenario tests
mickvandijke Apr 1, 2026
13be5e6
fix: resolve message handler deadlock and bootstrap flag in replicati…
mickvandijke Apr 1, 2026
a46b8fe
test: fix 6 missing/mislabeled/weak Section 18 replication scenario t…
mickvandijke Apr 1, 2026
5b2a7bd
fix: resolve clippy doc_markdown and needless_range_loop warnings
mickvandijke Apr 1, 2026
31d9d76
refactor: trigger replication sync on KClosestPeersChanged instead of…
mickvandijke Apr 1, 2026
16d5ba5
fix: gate bootstrap sync on DhtNetworkEvent::BootstrapComplete
mickvandijke Apr 1, 2026
2a69e58
feat: dynamic audit batch sizing based on local store size
mickvandijke Apr 1, 2026
0a3a01f
refactor: use local RT lookups instead of network lookups in audit
mickvandijke Apr 1, 2026
85eb73a
refactor: pick audit peer first, then find their responsible keys
mickvandijke Apr 1, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,10 @@ jobs:
uses: foundry-rs/foundry-toolchain@v1
with:
version: nightly
- name: Run tests
run: cargo test
- name: Run unit tests
run: cargo test --lib --features test-utils
- name: Run e2e tests
run: cargo test --test e2e --features test-utils -- --test-threads=1

doc:
name: Documentation
Expand Down
6 changes: 4 additions & 2 deletions .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,10 @@ jobs:
- name: Run clippy
run: cargo clippy --all-targets --all-features -- -D warnings

- name: Run tests
run: cargo test
- name: Run unit tests
run: cargo test --lib --features test-utils
- name: Run e2e tests
run: cargo test --test e2e --features test-utils -- --test-threads=1

build:
name: Build ${{ matrix.target }}
Expand Down
658 changes: 658 additions & 0 deletions docs/REPLICATION_DESIGN.md

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions src/ant_protocol/chunk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
use serde::{Deserialize, Serialize};

/// Protocol identifier for chunk operations.
pub const CHUNK_PROTOCOL_ID: &str = "autonomi/ant/chunk/v1";
pub const CHUNK_PROTOCOL_ID: &str = "autonomi.ant.chunk.v1";

Comment on lines 11 to 13
Copy link

Copilot AI Apr 1, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changing CHUNK_PROTOCOL_ID changes the protocol string used for routing chunk messages. This is a wire-compatibility breaking change for any existing nodes/clients still using the previous ID. Consider calling this out explicitly in the PR description/release notes and/or providing a migration/compatibility strategy (e.g., supporting both IDs for a deprecation window or bumping protocol/versioning accordingly).

Copilot uses AI. Check for mistakes.
/// Current protocol version.
pub const PROTOCOL_VERSION: u16 = 1;
Expand Down Expand Up @@ -519,7 +519,7 @@ mod tests {

#[test]
fn test_constants() {
assert_eq!(CHUNK_PROTOCOL_ID, "autonomi/ant/chunk/v1");
assert_eq!(CHUNK_PROTOCOL_ID, "autonomi.ant.chunk.v1");
assert_eq!(PROTOCOL_VERSION, 1);
assert_eq!(MAX_CHUNK_SIZE, 4 * 1024 * 1024);
assert_eq!(DATA_TYPE_CHUNK, 0);
Expand Down
4 changes: 4 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ pub enum Error {
#[error("invalid chunk: {0}")]
InvalidChunk(String),

/// Replication error.
#[error("replication error: {0}")]
Replication(String),

/// Node is shutting down.
#[error("node is shutting down")]
ShuttingDown,
Expand Down
2 changes: 2 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ pub mod error;
pub mod event;
pub mod node;
pub mod payment;
pub mod replication;
pub mod storage;
pub mod upgrade;

Expand All @@ -65,6 +66,7 @@ pub use error::{Error, Result};
pub use event::{NodeEvent, NodeEventsChannel};
pub use node::{NodeBuilder, RunningNode};
pub use payment::{PaymentStatus, PaymentVerifier, PaymentVerifierConfig};
pub use replication::{config::ReplicationConfig, ReplicationEngine};
pub use storage::{AntProtocol, LmdbStorage, LmdbStorageConfig};

/// Re-exports from `saorsa-core` so downstream crates (e.g. `ant-client`)
Expand Down
52 changes: 51 additions & 1 deletion src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ use crate::event::{create_event_channel, NodeEvent, NodeEventsChannel, NodeEvent
use crate::payment::metrics::QuotingMetricsTracker;
use crate::payment::wallet::parse_rewards_address;
use crate::payment::{EvmVerifierConfig, PaymentVerifier, PaymentVerifierConfig, QuoteGenerator};
use crate::replication::config::ReplicationConfig;
use crate::replication::ReplicationEngine;
use crate::storage::{AntProtocol, LmdbStorage, LmdbStorageConfig};
use crate::upgrade::{
upgrade_cache_dir, AutoApplyUpgrader, BinaryCache, ReleaseCache, UpgradeMonitor, UpgradeResult,
Expand Down Expand Up @@ -133,15 +135,43 @@ impl NodeBuilder {
None
};

let p2p_arc = Arc::new(p2p_node);

// Initialize replication engine (if storage is enabled)
let replication_engine = if let Some(ref protocol) = ant_protocol {
let repl_config = ReplicationConfig::default();
let storage_arc = protocol.storage();
let payment_verifier_arc = protocol.payment_verifier_arc();
match ReplicationEngine::new(
repl_config,
Arc::clone(&p2p_arc),
storage_arc,
payment_verifier_arc,
&self.config.root_dir,
shutdown.clone(),
)
.await
{
Ok(engine) => Some(engine),
Err(e) => {
warn!("Failed to initialize replication engine: {e}");
None
}
}
} else {
None
};

let node = RunningNode {
config: self.config,
p2p_node: Arc::new(p2p_node),
p2p_node: p2p_arc,
shutdown,
events_tx,
events_rx: Some(events_rx),
upgrade_monitor,
bootstrap_manager,
ant_protocol,
replication_engine,
protocol_task: None,
upgrade_exit_code: Arc::new(AtomicI32::new(-1)),
};
Expand Down Expand Up @@ -431,6 +461,8 @@ pub struct RunningNode {
bootstrap_manager: Option<BootstrapManager>,
/// ANT protocol handler for chunk storage.
ant_protocol: Option<Arc<AntProtocol>>,
/// Replication engine (manages neighbor sync, verification, audits).
replication_engine: Option<ReplicationEngine>,
/// Protocol message routing background task.
protocol_task: Option<JoinHandle<()>>,
/// Exit code requested by a successful upgrade (-1 = no upgrade exit pending).
Expand Down Expand Up @@ -466,6 +498,14 @@ impl RunningNode {
pub async fn run(&mut self) -> Result<()> {
info!("Node runtime loop starting");

// Subscribe to DHT events BEFORE starting the P2P node so the
// bootstrap-sync task does not miss the BootstrapComplete event
// emitted during P2PNode::start().
let dht_events_for_bootstrap = self
.replication_engine
.as_ref()
.map(|_| self.p2p_node.dht_manager().subscribe_events());

// Start the P2P node
self.p2p_node
.start()
Expand Down Expand Up @@ -493,6 +533,16 @@ impl RunningNode {
// Start protocol message routing (P2P → AntProtocol → P2P response)
self.start_protocol_routing();

// Start replication engine background tasks
if let Some(ref mut engine) = self.replication_engine {
// Invariant: `dht_events_for_bootstrap` is `Some` whenever `replication_engine`
// is `Some`, because it was derived from `replication_engine.as_ref().map(..)` above.
if let Some(dht_events) = dht_events_for_bootstrap {
engine.start(dht_events);
}
info!("Replication engine started");
}

// Start upgrade monitor if enabled
if let Some(monitor) = self.upgrade_monitor.take() {
let events_tx = self.events_tx.clone();
Expand Down
Loading
Loading