diff --git a/ant-core/Cargo.toml b/ant-core/Cargo.toml index 561698d..8da3c72 100644 --- a/ant-core/Cargo.toml +++ b/ant-core/Cargo.toml @@ -90,3 +90,7 @@ path = "tests/merkle_unit.rs" [[test]] name = "e2e_merkle" path = "tests/e2e_merkle.rs" + +[[test]] +name = "e2e_huge_file" +path = "tests/e2e_huge_file.rs" diff --git a/ant-core/src/data/client/batch.rs b/ant-core/src/data/client/batch.rs index 1c5df1c..67e3e46 100644 --- a/ant-core/src/data/client/batch.rs +++ b/ant-core/src/data/client/batch.rs @@ -14,6 +14,8 @@ use ant_node::core::{MultiAddr, PeerId}; use ant_node::payment::{serialize_single_node_proof, PaymentProof, SingleNodePayment}; use bytes::Bytes; use evmlib::common::TxHash; +use evmlib::contract::payment_vault::get_market_price; +use evmlib::wallet::PayForQuotesError; use futures::stream::{self, StreamExt}; use std::collections::{HashMap, HashSet}; use tracing::{debug, info}; @@ -180,12 +182,11 @@ impl Client { .map(|(_, _, quote, _)| quote.quoting_metrics.clone()) .collect(); - let contract_prices = - evmlib::contract::payment_vault::get_market_price(evm_network, metrics_batch) - .await - .map_err(|e| { - Error::Payment(format!("Failed to get market prices from contract: {e}")) - })?; + let contract_prices = get_market_price(evm_network, metrics_batch) + .await + .map_err(|e| { + Error::Payment(format!("Failed to get market prices from contract: {e}")) + })?; if contract_prices.len() != quotes_with_peers.len() { return Err(Error::Payment(format!( @@ -237,8 +238,8 @@ impl Client { let wallet = self.require_wallet()?; // Flatten all quote payments from all chunks into a single batch. 
- let mut all_payments = - Vec::with_capacity(prepared.len() * prepared[0].payment.quotes.len()); + let total_quotes: usize = prepared.iter().map(|c| c.payment.quotes.len()).sum(); + let mut all_payments = Vec::with_capacity(total_quotes); for chunk in &prepared { for info in &chunk.payment.quotes { all_payments.push((info.quote_hash, info.rewards_address, info.amount)); @@ -251,11 +252,13 @@ impl Client { all_payments.len() ); - let (tx_hash_map, _gas_info) = wallet.pay_for_quotes(all_payments).await.map_err( - |evmlib::wallet::PayForQuotesError(err, _)| { - Error::Payment(format!("Batch payment failed: {err}")) - }, - )?; + let (tx_hash_map, _gas_info) = + wallet + .pay_for_quotes(all_payments) + .await + .map_err(|PayForQuotesError(err, _)| { + Error::Payment(format!("Batch payment failed: {err}")) + })?; info!( "Batch payment succeeded: {} transactions", diff --git a/ant-core/src/data/client/file.rs b/ant-core/src/data/client/file.rs index 9c10dca..252520a 100644 --- a/ant-core/src/data/client/file.rs +++ b/ant-core/src/data/client/file.rs @@ -3,10 +3,15 @@ //! Upload files directly from disk without loading them entirely into memory. //! Uses `stream_encrypt` to process files in 8KB chunks, encrypting and //! uploading each piece as it's produced. +//! +//! Encrypted chunks are spilled to a temporary directory during encryption +//! so that peak memory usage is bounded to one wave (~256 MB for 64 × 4 MB +//! chunks) regardless of file size. +//! //! For in-memory data uploads, see the `data` module. 
use crate::data::client::batch::{finalize_batch_payment, PaymentIntent, PreparedChunk}; -use crate::data::client::merkle::PaymentMode; +use crate::data::client::merkle::{MerkleBatchPaymentResult, PaymentMode}; use crate::data::client::Client; use crate::data::error::{Error, Result}; use ant_evm::QuoteHash; @@ -14,13 +19,171 @@ use ant_node::ant_protocol::DATA_TYPE_CHUNK; use ant_node::client::compute_address; use bytes::Bytes; use evmlib::common::TxHash; +use futures::stream::{self, StreamExt}; use self_encryption::{stream_encrypt, streaming_decrypt, DataMap}; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::io::Write; use std::path::{Path, PathBuf}; use std::sync::{Arc, Mutex}; use tokio::runtime::Handle; -use tracing::{debug, info}; +use tracing::{debug, info, warn}; +use xor_name::XorName; + +/// Number of chunks per upload wave (matches batch.rs PAYMENT_WAVE_SIZE). +const UPLOAD_WAVE_SIZE: usize = 64; + +/// Extra headroom percentage for disk space check. +/// +/// Encrypted chunks are slightly larger than the source data due to padding +/// and self-encryption overhead. We require file_size + 10% free space in +/// the temp directory to account for this. +const DISK_SPACE_HEADROOM_PERCENT: u64 = 10; + +/// Temporary on-disk buffer for encrypted chunks. +/// +/// During file encryption, chunks are written to a temp directory so that +/// only their 32-byte addresses stay in memory. At upload time chunks are +/// read back one wave at a time, keeping peak RAM at ~`UPLOAD_WAVE_SIZE × 4 MB`. +struct ChunkSpill { + /// Temp directory holding spilled chunk files (named by hex address). + dir: PathBuf, + /// Deduplicated list of chunk addresses. + addresses: Vec<[u8; 32]>, + /// Tracks seen addresses for deduplication. + seen: HashSet<[u8; 32]>, + /// Running total of unique chunk byte sizes (for average-size calculation). + total_bytes: u64, +} + +impl ChunkSpill { + /// Create a new spill directory under the system temp dir. 
+    ///
+    /// Uses `create_dir` (not `create_dir_all`) so creation fails if the
+    /// directory already exists, preventing silent reuse of a stale spill.
+    fn new() -> Result<Self> {
+        let unique: u64 = rand::random();
+        let dir = std::env::temp_dir().join(format!(".ant_spill_{}_{unique}", std::process::id()));
+        std::fs::create_dir(&dir)?;
+        Ok(Self {
+            dir,
+            addresses: Vec::new(),
+            seen: HashSet::new(),
+            total_bytes: 0,
+        })
+    }
+
+    /// Write one encrypted chunk to disk and record its address.
+    ///
+    /// Deduplicates by content address: if the same chunk was already
+    /// spilled, the write and accounting are skipped. This prevents
+    /// double-uploads and inflated quoting metrics.
+    fn push(&mut self, content: &[u8]) -> Result<()> {
+        let address = compute_address(content);
+        if !self.seen.insert(address) {
+            return Ok(());
+        }
+        let path = self.dir.join(hex::encode(address));
+        std::fs::write(&path, content)?;
+        self.total_bytes += content.len() as u64;
+        self.addresses.push(address);
+        Ok(())
+    }
+
+    /// Number of chunks stored.
+    fn len(&self) -> usize {
+        self.addresses.len()
+    }
+
+    /// Total bytes of all spilled chunks.
+    fn total_bytes(&self) -> u64 {
+        self.total_bytes
+    }
+
+    /// Average chunk size in bytes (for quoting metrics).
+    fn avg_chunk_size(&self) -> u64 {
+        if self.addresses.is_empty() {
+            return 0;
+        }
+        self.total_bytes / self.addresses.len() as u64
+    }
+
+    /// Read a single chunk back from disk by address.
+    fn read_chunk(&self, address: &[u8; 32]) -> Result<Bytes> {
+        let path = self.dir.join(hex::encode(address));
+        let data = std::fs::read(&path).map_err(|e| {
+            Error::Io(std::io::Error::new(
+                e.kind(),
+                format!("reading spilled chunk {}: {e}", hex::encode(address)),
+            ))
+        })?;
+        Ok(Bytes::from(data))
+    }
+
+    /// Iterate over address slices in wave-sized groups.
+    fn waves(&self) -> std::slice::Chunks<'_, [u8; 32]> {
+        self.addresses.chunks(UPLOAD_WAVE_SIZE)
+    }
+
+    /// Read a wave of chunks from disk.
+    fn read_wave(&self, wave_addrs: &[[u8; 32]]) -> Result<Vec<(Bytes, [u8; 32])>> {
+        let mut out = Vec::with_capacity(wave_addrs.len());
+        for addr in wave_addrs {
+            let content = self.read_chunk(addr)?;
+            out.push((content, *addr));
+        }
+        Ok(out)
+    }
+
+    /// Clean up the spill directory.
+    fn cleanup(&self) {
+        if let Err(e) = std::fs::remove_dir_all(&self.dir) {
+            warn!(
+                "Failed to clean up chunk spill dir {}: {e}",
+                self.dir.display()
+            );
+        }
+    }
+}
+
+impl Drop for ChunkSpill {
+    fn drop(&mut self) {
+        self.cleanup();
+    }
+}
+
+/// Check that the temp directory has enough free space for the spilled chunks.
+///
+/// `file_size` is the source file's byte count. We require
+/// `file_size + 10%` free space in the temp dir to account for
+/// self-encryption overhead.
+fn check_disk_space_for_spill(file_size: u64) -> Result<()> {
+    let tmp = std::env::temp_dir();
+    let available = fs2::available_space(&tmp).map_err(|e| {
+        Error::Io(std::io::Error::new(
+            e.kind(),
+            format!("failed to query disk space on {}: {e}", tmp.display()),
+        ))
+    })?;
+
+    // Use integer arithmetic to avoid f64 precision loss on large file sizes.
+    // NOTE(review): `file_size / DISK_SPACE_HEADROOM_PERCENT` equals 10% only
+    // because the constant happens to be 10; if the percentage ever changes,
+    // this must become `file_size / 100 * DISK_SPACE_HEADROOM_PERCENT`.
+    let headroom = file_size / DISK_SPACE_HEADROOM_PERCENT;
+    let required = file_size.saturating_add(headroom);
+
+    if available < required {
+        let avail_mb = available / (1024 * 1024);
+        let req_mb = required / (1024 * 1024);
+        return Err(Error::InsufficientDiskSpace(format!(
+            "need ~{req_mb} MB in temp dir ({}) but only {avail_mb} MB available",
+            tmp.display()
+        )));
+    }
+
+    debug!(
+        "Disk space check passed: {available} bytes available, {required} bytes required (temp: {})",
+        tmp.display()
+    );
+    Ok(())
+}
 
 /// Result of a file upload: the `DataMap` needed to retrieve the file.
#[derive(Debug, Clone)] @@ -84,9 +247,10 @@ fn spawn_file_encryption(path: PathBuf) -> Result { Some(Bytes::from(buffer)) } Err(e) => { - if let Ok(mut guard) = read_error_clone.lock() { - *guard = Some(e); - } + let mut guard = read_error_clone + .lock() + .unwrap_or_else(|poisoned| poisoned.into_inner()); + *guard = Some(e); None } } @@ -100,7 +264,10 @@ fn spawn_file_encryption(path: PathBuf) -> Result { // stream_encrypt sees None (EOF) when a read fails, so it stops // producing chunks. We must detect this before sending the // partial results to avoid uploading a truncated DataMap. - if let Ok(guard) = read_error.lock() { + { + let guard = read_error + .lock() + .unwrap_or_else(|poisoned| poisoned.into_inner()); if let Some(ref e) = *guard { return Err(Error::Io(std::io::Error::new(e.kind(), e.to_string()))); } @@ -114,7 +281,10 @@ fn spawn_file_encryption(path: PathBuf) -> Result { } // Final check: read error after last chunk (stream saw EOF). - if let Ok(guard) = read_error.lock() { + { + let guard = read_error + .lock() + .unwrap_or_else(|poisoned| poisoned.into_inner()); if let Some(ref e) = *guard { return Err(Error::Io(std::io::Error::new(e.kind(), e.to_string()))); } @@ -123,7 +293,9 @@ fn spawn_file_encryption(path: PathBuf) -> Result { let datamap = stream .into_datamap() .ok_or_else(|| Error::Encryption("no DataMap after encryption".to_string()))?; - let _ = datamap_tx.send(datamap); + if datamap_tx.send(datamap).is_err() { + warn!("DataMap receiver dropped — upload may have been cancelled"); + } Ok(()) }); @@ -133,48 +305,16 @@ fn spawn_file_encryption(path: PathBuf) -> Result { impl Client { /// Upload a file to the network using streaming self-encryption. /// - /// The file is read in 8KB chunks, encrypted via `stream_encrypt`, - /// and each encrypted chunk is uploaded as it's produced. The file - /// is never fully loaded into memory. + /// Automatically selects merkle batch payment for files that produce + /// 64+ chunks (saves gas). 
Encrypted chunks are spilled to a temp + /// directory so peak memory stays at ~256 MB regardless of file size. /// /// # Errors /// /// Returns an error if the file cannot be read, encryption fails, /// or any chunk cannot be stored. pub async fn file_upload(&self, path: &Path) -> Result { - debug!("Streaming file upload: {}", path.display()); - - let (mut chunk_rx, datamap_rx, handle) = spawn_file_encryption(path.to_path_buf())?; - - // Collect all chunks from encryption channel. - let mut chunk_contents = Vec::new(); - while let Some(content) = chunk_rx.recv().await { - chunk_contents.push(content); - } - - // Await encryption completion to catch errors before paying for storage. - handle - .await - .map_err(|e| Error::Encryption(format!("encryption task panicked: {e}")))? - .map_err(|e| Error::Encryption(format!("encryption failed: {e}")))?; - - let data_map = datamap_rx - .await - .map_err(|_| Error::Encryption("no DataMap from encryption thread".to_string()))?; - - let addresses = self.batch_upload_chunks(chunk_contents).await?; - let chunks_stored = addresses.len(); - - info!( - "File uploaded: {chunks_stored} chunks stored ({})", - path.display() - ); - - Ok(FileUploadResult { - data_map, - chunks_stored, - payment_mode_used: PaymentMode::Single, - }) + self.file_upload_with_mode(path, PaymentMode::Auto).await } /// Phase 1 of external-signer upload: encrypt file and prepare chunks. @@ -184,40 +324,50 @@ impl Client { /// and a [`PaymentIntent`] that the external signer uses to construct /// and submit the on-chain payment transaction. /// + /// **Memory note:** Encryption uses disk spilling for bounded memory, but + /// the returned [`PreparedUpload`] holds all chunk content in memory (each + /// [`PreparedChunk`] contains a `Bytes` with the full chunk data). This is + /// inherent to the two-phase external-signer protocol — the chunks must + /// stay in memory until [`Client::finalize_upload`] stores them. 
For very
+    /// large files, prefer [`Client::file_upload`] which streams directly.
+    ///
     /// # Errors
     ///
-    /// Returns an error if the file cannot be read, encryption fails,
-    /// or quote collection fails.
+    /// Returns an error if there is insufficient disk space, the file cannot
+    /// be read, encryption fails, or quote collection fails.
     pub async fn file_prepare_upload(&self, path: &Path) -> Result<PreparedUpload> {
         debug!(
             "Preparing file upload for external signing: {}",
             path.display()
         );
 
-        let (mut chunk_rx, datamap_rx, handle) = spawn_file_encryption(path.to_path_buf())?;
+        let file_size = std::fs::metadata(path)?.len();
+        check_disk_space_for_spill(file_size)?;
 
-        // Collect all chunks from encryption channel.
-        let mut chunk_contents = Vec::new();
-        while let Some(content) = chunk_rx.recv().await {
-            chunk_contents.push(content);
-        }
-
-        // Await encryption completion to catch errors before collecting quotes.
-        handle
-            .await
-            .map_err(|e| Error::Encryption(format!("encryption task panicked: {e}")))?
-            .map_err(|e| Error::Encryption(format!("encryption failed: {e}")))?;
+        let (spill, data_map) = self.encrypt_file_to_spill(path).await?;
 
-        let data_map = datamap_rx
-            .await
-            .map_err(|_| Error::Encryption("no DataMap from encryption thread".to_string()))?;
+        info!(
+            "Encrypted {} into {} chunks for external signing (spilled to disk)",
+            path.display(),
+            spill.len()
+        );
 
-        // Prepare each chunk (collect quotes, fetch contract prices).
-        let mut prepared_chunks = Vec::with_capacity(chunk_contents.len());
-        for content in chunk_contents {
+        // Read each chunk from disk and collect quotes. Note: all PreparedChunks
+        // accumulate in memory because the external-signer protocol requires them
+        // for finalize_upload. This is NOT memory-bounded for large files.
+ let mut prepared_chunks = Vec::with_capacity(spill.len()); + for (i, addr) in spill.addresses.iter().enumerate() { + let content = spill.read_chunk(addr)?; if let Some(prepared) = self.prepare_chunk_payment(content).await? { prepared_chunks.push(prepared); } + if (i + 1) % 100 == 0 { + info!( + "Prepared {}/{} chunks for external signing", + i + 1, + spill.len() + ); + } } let payment_intent = PaymentIntent::from_prepared_chunks(&prepared_chunks); @@ -264,13 +414,17 @@ impl Client { /// Upload a file with a specific payment mode. /// - /// Uses buffer-then-pay strategy: encrypts file, collects all chunks, - /// then pays via merkle batch or per-chunk depending on mode and count. + /// Before encryption, checks that the temp directory has enough free + /// disk space for the spilled chunks (~1.1× source file size). + /// + /// Encrypted chunks are spilled to a temp directory during encryption + /// so that only their 32-byte addresses stay in memory. At upload time, + /// chunks are read back one wave at a time (~64 × 4 MB ≈ 256 MB peak). /// /// # Errors /// - /// Returns an error if the file cannot be read, encryption fails, - /// or any chunk cannot be stored. + /// Returns an error if there is insufficient disk space, the file cannot + /// be read, encryption fails, or any chunk cannot be stored. #[allow(clippy::too_many_lines)] pub async fn file_upload_with_mode( &self, @@ -282,66 +436,58 @@ impl Client { path.display() ); - let (mut chunk_rx, datamap_rx, handle) = spawn_file_encryption(path.to_path_buf())?; - - // Collect all chunks first (buffer-then-pay). - let mut chunk_contents = Vec::new(); - while let Some(content) = chunk_rx.recv().await { - chunk_contents.push(content); - } + // Pre-flight: verify enough temp disk space for the chunk spill. + let file_size = std::fs::metadata(path)?.len(); + check_disk_space_for_spill(file_size)?; - // Await encryption completion to catch errors before paying for storage. 
- handle - .await - .map_err(|e| Error::Encryption(format!("encryption task panicked: {e}")))? - .map_err(|e| Error::Encryption(format!("encryption failed: {e}")))?; + // Phase 1: Encrypt file and spill chunks to temp directory. + // Only 32-byte addresses stay in memory — chunk data lives on disk. + let (spill, data_map) = self.encrypt_file_to_spill(path).await?; - let data_map = datamap_rx - .await - .map_err(|_| Error::Encryption("no DataMap from encryption thread".to_string()))?; - - let chunk_count = chunk_contents.len(); + let chunk_count = spill.len(); + info!( + "Encrypted {} into {chunk_count} chunks (spilled to disk)", + path.display() + ); + // Phase 2: Decide payment mode and upload in waves from disk. + // + // Note on merkle proof memory: MerkleBatchPaymentResult.proofs holds all + // proofs simultaneously (~86 KB each due to ML-DSA-65 signatures in the + // candidate pool). For a 100 GB file (~25k chunks), this is ~2 GB. This + // is acceptable because merkle payments save significant gas costs — the + // gas savings far outweigh the proof memory overhead. let (chunks_stored, actual_mode) = if self.should_use_merkle(chunk_count, mode) { - // Merkle batch payment path + // Merkle batch payment path — needs all addresses upfront for tree. 
            info!("Using merkle batch payment for {chunk_count} file chunks");
 
-            let addresses: Vec<[u8; 32]> =
-                chunk_contents.iter().map(|c| compute_address(c)).collect();
-
-            let avg_size =
-                chunk_contents.iter().map(bytes::Bytes::len).sum::<usize>() / chunk_count.max(1);
-            let avg_size_u64 = u64::try_from(avg_size).unwrap_or(0);
-
             let batch_result = match self
-                .pay_for_merkle_batch(&addresses, DATA_TYPE_CHUNK, avg_size_u64)
+                .pay_for_merkle_batch(&spill.addresses, DATA_TYPE_CHUNK, spill.avg_chunk_size())
                 .await
             {
                 Ok(result) => result,
                 Err(Error::InsufficientPeers(ref msg)) if mode == PaymentMode::Auto => {
                     info!("Merkle needs more peers ({msg}), falling back to wave-batch");
-                    let addresses = self.batch_upload_chunks(chunk_contents).await?;
+                    let stored = self.upload_waves_single(&spill).await?;
                     return Ok(FileUploadResult {
                         data_map,
-                        chunks_stored: addresses.len(),
+                        chunks_stored: stored,
                         payment_mode_used: PaymentMode::Single,
                     });
                 }
                 Err(e) => return Err(e),
             };
 
-            let stored = self
-                .merkle_upload_chunks(chunk_contents, addresses, &batch_result)
-                .await?;
+            let stored = self.upload_waves_merkle(&spill, &batch_result).await?;
             (stored, PaymentMode::Merkle)
         } else {
-            // Wave-based batch payment path (single EVM tx per wave).
-            let addresses = self.batch_upload_chunks(chunk_contents).await?;
-            (addresses.len(), PaymentMode::Single)
+            // Wave-based per-chunk payment path.
+            let stored = self.upload_waves_single(&spill).await?;
+            (stored, PaymentMode::Single)
         };
 
         info!(
-            "File uploaded with mode {mode:?}: {chunks_stored} chunks stored ({})",
+            "File uploaded with {actual_mode:?}: {chunks_stored} chunks stored ({})",
             path.display()
         );
@@ -352,6 +498,116 @@
         })
     }
 
+    /// Encrypt a file and spill chunks to a temp directory.
+    ///
+    /// Logs progress every 100 chunks so users get feedback during
+    /// multi-GB encryptions.
+    ///
+    /// Returns the spill buffer (chunk data on disk, addresses in memory) and the `DataMap`.
+    async fn encrypt_file_to_spill(&self, path: &Path) -> Result<(ChunkSpill, DataMap)> {
+        let (mut chunk_rx, datamap_rx, handle) = spawn_file_encryption(path.to_path_buf())?;
+
+        let mut spill = ChunkSpill::new()?;
+        while let Some(content) = chunk_rx.recv().await {
+            spill.push(&content)?;
+            if spill.len() % 100 == 0 {
+                let mb = spill.total_bytes() / (1024 * 1024);
+                info!(
+                    "Encryption progress: {} chunks spilled ({mb} MB) — {}",
+                    spill.len(),
+                    path.display()
+                );
+            }
+        }
+
+        // Await encryption completion to catch errors before paying.
+        handle
+            .await
+            .map_err(|e| Error::Encryption(format!("encryption task panicked: {e}")))?
+            .map_err(|e| Error::Encryption(format!("encryption failed: {e}")))?;
+
+        let data_map = datamap_rx
+            .await
+            .map_err(|_| Error::Encryption("no DataMap from encryption thread".to_string()))?;
+
+        Ok((spill, data_map))
+    }
+
+    /// Upload chunks from a spill using wave-based per-chunk (single) payments.
+    ///
+    /// Reads one wave at a time from disk, prepares quotes, pays, and stores.
+    /// Peak memory: ~`UPLOAD_WAVE_SIZE × MAX_CHUNK_SIZE` (~256 MB).
+    async fn upload_waves_single(&self, spill: &ChunkSpill) -> Result<usize> {
+        let mut total_stored = 0usize;
+        let total_chunks = spill.len();
+        let waves: Vec<&[[u8; 32]]> = spill.waves().collect();
+        let wave_count = waves.len();
+
+        for (wave_idx, wave_addrs) in waves.into_iter().enumerate() {
+            let wave_num = wave_idx + 1;
+            let wave_data: Vec<Bytes> = wave_addrs
+                .iter()
+                .map(|addr| spill.read_chunk(addr))
+                .collect::<Result<Vec<Bytes>>>()?;
+
+            info!(
+                "Wave {wave_num}/{wave_count}: uploading {} chunks (single payment) — {total_stored}/{total_chunks} stored so far",
+                wave_data.len()
+            );
+            let addresses = self.batch_upload_chunks(wave_data).await?;
+            total_stored += addresses.len();
+        }
+
+        Ok(total_stored)
+    }
+
+    /// Upload chunks from a spill using pre-computed merkle proofs.
+    ///
+    /// Reads one wave at a time from disk, pairs each chunk with its proof,
+    /// and uploads concurrently.
Peak memory: ~`UPLOAD_WAVE_SIZE × MAX_CHUNK_SIZE`.
+    async fn upload_waves_merkle(
+        &self,
+        spill: &ChunkSpill,
+        batch_result: &MerkleBatchPaymentResult,
+    ) -> Result<usize> {
+        let mut total_stored = 0usize;
+        let total_chunks = spill.len();
+        let waves: Vec<&[[u8; 32]]> = spill.waves().collect();
+        let wave_count = waves.len();
+
+        for (wave_idx, wave_addrs) in waves.into_iter().enumerate() {
+            let wave_num = wave_idx + 1;
+            let wave = spill.read_wave(wave_addrs)?;
+
+            info!(
+                "Wave {wave_num}/{wave_count}: uploading {} chunks (merkle) — {total_stored}/{total_chunks} stored so far",
+                wave.len()
+            );
+
+            let mut upload_stream = stream::iter(wave.into_iter().map(|(content, addr)| {
+                let proof_bytes = batch_result.proofs.get(&addr).cloned();
+                async move {
+                    let proof = proof_bytes.ok_or_else(|| {
+                        Error::Payment(format!(
+                            "Missing merkle proof for chunk {}",
+                            hex::encode(addr)
+                        ))
+                    })?;
+                    let peers = self.close_group_peers(&addr).await?;
+                    self.chunk_put_to_close_group(content, proof, &peers).await
+                }
+            }))
+            .buffer_unordered(self.config().chunk_concurrency);
+
+            while let Some(result) = upload_stream.next().await {
+                result?;
+                total_stored += 1;
+            }
+        }
+
+        Ok(total_stored)
+    }
+
     /// Download and decrypt a file from the network, writing it to disk.
     ///
     /// Uses `streaming_decrypt` so that only one batch of chunks lives in
     ///
     /// Returns the number of bytes written.
     ///
+    /// # Panics
+    ///
+    /// Requires a multi-threaded Tokio runtime (`flavor = "multi_thread"`).
+    /// Will panic if called from a `current_thread` runtime because
+    /// `streaming_decrypt` takes a synchronous callback that must bridge
+    /// back to async via `block_in_place`.
+ /// /// # Errors /// /// Returns an error if any chunk cannot be retrieved, decryption fails, @@ -371,8 +634,8 @@ impl Client { let handle = Handle::current(); - let stream = streaming_decrypt(data_map, |batch: &[(usize, xor_name::XorName)]| { - let batch_owned: Vec<(usize, xor_name::XorName)> = batch.to_vec(); + let stream = streaming_decrypt(data_map, |batch: &[(usize, XorName)]| { + let batch_owned: Vec<(usize, XorName)> = batch.to_vec(); tokio::task::block_in_place(|| { handle.block_on(async { @@ -427,22 +690,121 @@ impl Client { })(); match write_result { - Ok(bytes_written) => { - std::fs::rename(&tmp_path, output)?; - info!( - "File downloaded: {bytes_written} bytes written to {}", - output.display() - ); - Ok(bytes_written) - } + Ok(bytes_written) => match std::fs::rename(&tmp_path, output) { + Ok(()) => { + info!( + "File downloaded: {bytes_written} bytes written to {}", + output.display() + ); + Ok(bytes_written) + } + Err(rename_err) => { + if let Err(cleanup_err) = std::fs::remove_file(&tmp_path) { + warn!( + "Failed to remove temp download file {}: {cleanup_err}", + tmp_path.display() + ); + } + Err(rename_err.into()) + } + }, Err(e) => { - let _ = std::fs::remove_file(&tmp_path); + if let Err(cleanup_err) = std::fs::remove_file(&tmp_path) { + warn!( + "Failed to remove temp download file {}: {cleanup_err}", + tmp_path.display() + ); + } Err(e) } } } } +#[cfg(test)] +#[allow(clippy::unwrap_used)] +mod tests { + use super::*; + + #[test] + fn disk_space_check_passes_for_small_file() { + // A 1 KB file should always pass the disk space check + check_disk_space_for_spill(1024).unwrap(); + } + + #[test] + fn disk_space_check_fails_for_absurd_size() { + // Requesting space for a 1 exabyte file should fail on any real system + let result = check_disk_space_for_spill(u64::MAX / 2); + assert!(result.is_err()); + let err = result.unwrap_err(); + assert!( + matches!(err, Error::InsufficientDiskSpace(_)), + "expected InsufficientDiskSpace, got: {err}" + ); + } 
+ + #[test] + fn chunk_spill_round_trip() { + let mut spill = ChunkSpill::new().unwrap(); + let data1 = vec![0xAA; 1024]; + let data2 = vec![0xBB; 2048]; + + spill.push(&data1).unwrap(); + spill.push(&data2).unwrap(); + + assert_eq!(spill.len(), 2); + assert_eq!(spill.total_bytes(), 1024 + 2048); + assert_eq!(spill.avg_chunk_size(), (1024 + 2048) / 2); + + // Read back and verify + let chunk1 = spill.read_chunk(spill.addresses.first().unwrap()).unwrap(); + assert_eq!(&chunk1[..], &data1[..]); + + let chunk2 = spill.read_chunk(spill.addresses.get(1).unwrap()).unwrap(); + assert_eq!(&chunk2[..], &data2[..]); + + // Verify waves with 1-chunk wave size + let waves: Vec<_> = spill.addresses.chunks(1).collect(); + assert_eq!(waves.len(), 2); + } + + #[test] + fn chunk_spill_cleanup_on_drop() { + let dir; + { + let spill = ChunkSpill::new().unwrap(); + dir = spill.dir.clone(); + assert!(dir.exists()); + } + // After drop, the directory should be cleaned up + assert!(!dir.exists(), "spill dir should be removed on drop"); + } + + #[test] + fn chunk_spill_deduplicates_identical_content() { + let mut spill = ChunkSpill::new().unwrap(); + let data = vec![0xCC; 512]; + + spill.push(&data).unwrap(); + spill.push(&data).unwrap(); // same content, should be skipped + spill.push(&data).unwrap(); // again + + assert_eq!(spill.len(), 1, "duplicate chunks should be deduplicated"); + assert_eq!( + spill.total_bytes(), + 512, + "total_bytes should count unique only" + ); + + // Different content should still be added + let data2 = vec![0xDD; 256]; + spill.push(&data2).unwrap(); + assert_eq!(spill.len(), 2); + assert_eq!(spill.total_bytes(), 512 + 256); + } +} + /// Compile-time assertions that Client file method futures are Send. 
#[cfg(test)] mod send_assertions { diff --git a/ant-core/src/data/error.rs b/ant-core/src/data/error.rs index e637ecb..7d5d542 100644 --- a/ant-core/src/data/error.rs +++ b/ant-core/src/data/error.rs @@ -63,6 +63,10 @@ pub enum Error { /// Data already exists on the network — no payment needed. #[error("already stored on network")] AlreadyStored, + + /// Not enough disk space for the operation. + #[error("insufficient disk space: {0}")] + InsufficientDiskSpace(String), } impl From for Error { @@ -158,6 +162,15 @@ mod tests { assert_eq!(err.to_string(), "encryption error: decrypt failed"); } + #[test] + fn test_display_insufficient_disk_space() { + let err = Error::InsufficientDiskSpace("need 100 MB but only 10 MB available".to_string()); + assert_eq!( + err.to_string(), + "insufficient disk space: need 100 MB but only 10 MB available" + ); + } + #[test] fn test_from_io_error() { let io_err = std::io::Error::new(std::io::ErrorKind::PermissionDenied, "access denied"); diff --git a/ant-core/tests/e2e_huge_file.rs b/ant-core/tests/e2e_huge_file.rs new file mode 100644 index 0000000..e708ceb --- /dev/null +++ b/ant-core/tests/e2e_huge_file.rs @@ -0,0 +1,501 @@ +//! E2E tests for huge file upload/download with bounded memory. +//! +//! Verifies that multi-GB files can be uploaded and downloaded without +//! memory growing proportionally to file size, thanks to chunk spilling. + +#![allow(clippy::unwrap_used, clippy::expect_used)] + +mod support; + +use ant_core::data::{Client, ClientConfig}; +use serial_test::serial; +use std::io::Write; +use std::sync::Arc; +use support::MiniTestnet; +use tempfile::TempDir; + +/// Size of the 1 GB test file. +const FILE_SIZE_1GB: u64 = 1024 * 1024 * 1024; + +/// Maximum allowed current-RSS increase during upload, in bytes. +/// +/// The chunk spill mechanism keeps peak memory to one wave (~256 MB). +/// We allow up to 512 MB of RSS growth for the wave buffer, allocator +/// overhead, and test infrastructure churn. 
Without spilling, RSS would
+/// grow by ≥1 GB (all encrypted chunks held simultaneously).
+const MAX_RSS_INCREASE_BYTES: u64 = 512 * 1024 * 1024;
+
+async fn setup_large() -> (Client, MiniTestnet) {
+    let testnet = MiniTestnet::start(6).await;
+    let node = testnet.node(3).expect("Node 3 should exist");
+
+    let client = Client::from_node(Arc::clone(&node), ClientConfig::default())
+        .with_wallet(testnet.wallet().clone());
+
+    (client, testnet)
+}
+
+/// Get **current** resident set size in bytes (NOT peak).
+///
+/// Unlike `ru_maxrss` (which is a monotonic high-water mark and never
+/// decreases), this returns the actual RSS right now. This is what we
+/// need to verify bounded memory — we sample before and after the
+/// operation and check the delta.
+fn get_current_rss_bytes() -> Option<u64> {
+    #[cfg(target_os = "macos")]
+    {
+        get_current_rss_macos()
+    }
+    #[cfg(target_os = "linux")]
+    {
+        get_current_rss_linux()
+    }
+    #[cfg(not(any(target_os = "macos", target_os = "linux")))]
+    {
+        None
+    }
+}
+
+/// Get current RSS on macOS via `mach_task_basic_info`.
+///
+/// `resident_size` is the actual current RSS, not a peak.
+#[cfg(target_os = "macos")]
+fn get_current_rss_macos() -> Option<u64> {
+    use std::mem;
+
+    // mach_task_basic_info struct layout
+    #[repr(C)]
+    struct MachTaskBasicInfo {
+        virtual_size: u64,
+        resident_size: u64,
+        resident_size_max: u64,
+        user_time: libc::time_value_t,
+        system_time: libc::time_value_t,
+        policy: i32,
+        suspend_count: i32,
+    }
+
+    const MACH_TASK_BASIC_INFO: u32 = 20;
+
+    extern "C" {
+        fn mach_task_self() -> u32;
+        fn task_info(
+            target_task: u32,
+            flavor: u32,
+            task_info_out: *mut MachTaskBasicInfo,
+            task_info_out_cnt: *mut u32,
+        ) -> i32;
+    }
+
+    let mut info: MachTaskBasicInfo = unsafe { mem::zeroed() };
+    let mut count = (mem::size_of::<MachTaskBasicInfo>() / mem::size_of::<u32>()) as u32;
+
+    let kr = unsafe {
+        task_info(
+            mach_task_self(),
+            MACH_TASK_BASIC_INFO,
+            &mut info,
+            &mut count,
+        )
+    };
+
+    if kr == 0 {
+        Some(info.resident_size)
+    } else {
+        None
+    }
+}
+
+/// Get current RSS on Linux via /proc/self/statm.
+///
+/// Field 1 (resident) is in pages.
+#[cfg(target_os = "linux")]
+fn get_current_rss_linux() -> Option<u64> {
+    let statm = std::fs::read_to_string("/proc/self/statm").ok()?;
+    let resident_pages: u64 = statm.split_whitespace().nth(1)?.parse().ok()?;
+    let page_size = unsafe { libc::sysconf(libc::_SC_PAGESIZE) };
+    if page_size <= 0 {
+        return None;
+    }
+    Some(resident_pages * page_size as u64)
+}
+
+/// Fixed PRNG seed for deterministic, incompressible test data.
+///
+/// Using a seeded PRNG (not a repeating pattern) ensures encrypted chunks
+/// are full-size (~4 MB each), exercising the worst-case wave buffer.
+/// A repeating 256-byte pattern would compress to tiny chunks, making
+/// the memory test pass trivially without stressing the wave buffer.
+const TEST_SEED: u64 = 0xDEAD_BEEF_CAFE_BABE;
+
+/// Simple xorshift64 PRNG — deterministic, fast, incompressible output.
+struct Xorshift64(u64); + +impl Xorshift64 { + fn new(seed: u64) -> Self { + Self(seed) + } + + fn next_u8(&mut self) -> u8 { + self.0 ^= self.0 << 13; + self.0 ^= self.0 >> 7; + self.0 ^= self.0 << 17; + (self.0 & 0xFF) as u8 + } +} + +/// Create a deterministic test file of the given size. +/// +/// Uses a seeded PRNG to produce incompressible content so encrypted chunks +/// are full-size (~4 MB each), properly exercising the wave buffer. The same +/// seed is used in `verify_file_content` for verification. +fn create_test_file(dir: &TempDir, size: u64) -> std::path::PathBuf { + let path = dir.path().join("huge_test_file.bin"); + let mut file = std::fs::File::create(&path).expect("create test file"); + + let mut rng = Xorshift64::new(TEST_SEED); + let mut remaining = size; + + // Write in 1 MB chunks to avoid holding the whole file in memory + let write_buf_size: usize = 1024 * 1024; + let mut buf = vec![0u8; write_buf_size]; + while remaining > 0 { + let to_write = remaining.min(write_buf_size as u64) as usize; + for byte in buf.iter_mut().take(to_write) { + *byte = rng.next_u8(); + } + file.write_all(&buf[..to_write]) + .expect("write chunk to test file"); + remaining -= to_write as u64; + } + file.flush().expect("flush test file"); + drop(file); + drop(buf); + + let meta = std::fs::metadata(&path).expect("stat test file"); + assert_eq!(meta.len(), size, "test file size mismatch"); + + path +} + +/// Verify downloaded file matches the expected PRNG sequence. +/// +/// Regenerates the same PRNG stream and compares byte-by-byte, +/// reading in 1 MB chunks to avoid loading the file into memory. 
/// Verify downloaded file matches the expected PRNG sequence.
///
/// Replays the same `Xorshift64` stream from `TEST_SEED` and compares
/// byte-by-byte, reading in 1 MB chunks so the file is never fully
/// resident. Panics with the first mismatching offset.
fn verify_file_content(path: &std::path::Path, expected_size: u64) {
    let meta = std::fs::metadata(path).expect("stat downloaded file");
    assert_eq!(
        meta.len(),
        expected_size,
        "downloaded file size should match original"
    );

    let mut rng = Xorshift64::new(TEST_SEED);
    let file = std::fs::File::open(path).expect("open downloaded file");
    let mut reader = std::io::BufReader::new(file);
    let mut offset = 0u64;
    let mut buf = vec![0u8; 1024 * 1024];

    loop {
        let n = std::io::Read::read(&mut reader, &mut buf).expect("read downloaded file");
        if n == 0 {
            break;
        }
        for &byte in &buf[..n] {
            let expected = rng.next_u8();
            if byte != expected {
                panic!(
                    "content mismatch at byte {offset}: got {byte:#04x}, expected {expected:#04x}"
                );
            }
            offset += 1;
        }
    }
    // Guards against a short read: the loop must have consumed every byte.
    assert_eq!(offset, expected_size, "total bytes read mismatch");
}

/// Upload and download a 1 GB file, verifying data integrity and bounded memory.
///
/// This test verifies that:
/// 1. A 1 GB file can be uploaded without running out of memory
/// 2. The downloaded file is byte-for-byte identical
/// 3. **Current** RSS (not peak) stays well below the file size during upload
///
/// We measure current RSS (via `mach_task_basic_info` on macOS,
/// `/proc/self/statm` on Linux) — NOT `ru_maxrss` which is a monotonic
/// high-water mark that could give false passes.
#[tokio::test(flavor = "multi_thread")]
#[serial]
async fn test_huge_file_upload_download_1gb() {
    let (client, testnet) = setup_large().await;

    let work_dir = TempDir::new().expect("create work dir");

    // Phase 1: Create a 1 GB test file.
    // Do this BEFORE measuring RSS baseline so the write buffer is freed.
    eprintln!("Creating {FILE_SIZE_1GB} byte test file...");
    let input_path = create_test_file(&work_dir, FILE_SIZE_1GB);
    eprintln!("Test file created at {}", input_path.display());

    // The write buffer was already dropped inside create_test_file (Rust
    // frees deterministically on drop — there is no GC pass to trigger);
    // yielding just lets the runtime settle before we sample the baseline.
    tokio::task::yield_now().await;

    // Record baseline RSS AFTER file creation and infra setup so the
    // delta only captures the upload operation itself.
    let rss_before = get_current_rss_bytes();
    if let Some(rss) = rss_before {
        eprintln!("RSS baseline before upload: {} MB", rss / (1024 * 1024));
    }

    // Phase 2: Upload
    eprintln!("Uploading 1 GB file...");
    let result = client
        .file_upload(input_path.as_path())
        .await
        .expect("1 GB file upload should succeed");

    // Measure RSS immediately after upload completes, before any teardown
    // can release memory and mask a blow-up.
    let rss_after_upload = get_current_rss_bytes();

    eprintln!(
        "Upload complete: {} chunks stored, mode: {:?}",
        result.chunks_stored, result.payment_mode_used
    );

    // A 1 GB file at 4 MB max chunk size produces ~256 chunks minimum
    assert!(
        result.chunks_stored >= 200,
        "1 GB file should produce at least 200 chunks, got {}",
        result.chunks_stored
    );

    // Phase 3: Check memory impact using CURRENT RSS (not peak)
    if let (Some(before), Some(after)) = (rss_before, rss_after_upload) {
        let increase = after.saturating_sub(before);
        let increase_mb = increase / (1024 * 1024);
        let max_mb = MAX_RSS_INCREASE_BYTES / (1024 * 1024);
        eprintln!(
            "Current RSS before: {} MB, after: {} MB, increase: {increase_mb} MB (limit: {max_mb} MB)",
            before / (1024 * 1024),
            after / (1024 * 1024)
        );
        assert!(
            increase < MAX_RSS_INCREASE_BYTES,
            "RSS increased by {increase_mb} MB which exceeds the {max_mb} MB limit — \
             chunk spilling may not be working correctly. \
             Before: {} MB, After: {} MB",
            before / (1024 * 1024),
            after / (1024 * 1024)
        );
    } else {
        // On macOS/Linux, RSS measurement must succeed — a silent skip would
        // let the bounded-memory claim go unverified in CI.
        #[cfg(any(target_os = "macos", target_os = "linux"))]
        panic!("RSS measurement failed on a supported platform — cannot verify bounded memory");

        #[cfg(not(any(target_os = "macos", target_os = "linux")))]
        eprintln!("WARNING: Could not measure current RSS on this platform, skipping memory check");
    }

    // Phase 4: Download to a separate file
    let output_path = work_dir.path().join("downloaded_huge.bin");
    eprintln!("Downloading 1 GB file...");
    let bytes_written = client
        .file_download(&result.data_map, &output_path)
        .await
        .expect("1 GB file download should succeed");

    assert_eq!(
        bytes_written, FILE_SIZE_1GB,
        "bytes_written should equal original file size"
    );
    eprintln!("Download complete: {bytes_written} bytes written");

    // Phase 5: Verify content integrity (streaming, not in-memory)
    eprintln!("Verifying file content...");
    verify_file_content(&output_path, FILE_SIZE_1GB);
    eprintln!("Content verification passed — all {FILE_SIZE_1GB} bytes match");

    drop(client);
    testnet.teardown().await;
}

/// Test that uploading fails early with a clear error when disk space is
/// insufficient for the chunk spill.
///
/// NOTE(review): only the sufficient-space (success) path is exercised —
/// the insufficiency branch is never simulated here; confirm whether a
/// low-disk fixture is feasible.
+#[tokio::test(flavor = "multi_thread")] +#[serial] +async fn test_file_upload_disk_space_check() { + let (client, testnet) = setup_large().await; + + // Verify the check works for a small file (should succeed) + let work_dir = TempDir::new().expect("create work dir"); + let small_path = work_dir.path().join("small.bin"); + { + let mut f = std::fs::File::create(&small_path).expect("create small file"); + f.write_all(&vec![0xAA; 4096]).expect("write small file"); + f.flush().expect("flush small file"); + } + + let result = client.file_upload(small_path.as_path()).await; + assert!( + result.is_ok(), + "small file upload should succeed (disk space sufficient): {:?}", + result.err() + ); + + drop(client); + testnet.teardown().await; +} + +/// Upload and download a 4 GB file, proving memory is bounded regardless of +/// file size. +/// +/// The key assertion: RSS increase for a 4 GB file should be comparable to +/// the 1 GB test (~185 MB), NOT 4x larger. If RSS scales linearly with +/// file size, spilling is broken. If it stays flat, spilling works. +/// +/// This test also exercises multi-wave dynamics much more heavily: +/// 4 GB → ~1024 chunks → ~16 waves of 64 chunks each. 
#[tokio::test(flavor = "multi_thread")]
#[serial]
async fn test_huge_file_upload_download_4gb() {
    // 4 GiB — four times the 1 GB test, same memory budget.
    let file_size: u64 = 4 * 1024 * 1024 * 1024;

    let (client, testnet) = setup_large().await;
    let work_dir = TempDir::new().expect("create work dir");

    eprintln!("Creating 4 GB test file...");
    let input_path = create_test_file(&work_dir, file_size);
    eprintln!("Test file created at {}", input_path.display());

    // Let the runtime settle before sampling the RSS baseline; write
    // buffers were already dropped inside create_test_file.
    tokio::task::yield_now().await;

    // Baseline is taken AFTER file creation so the delta isolates the upload.
    let rss_before = get_current_rss_bytes();
    if let Some(rss) = rss_before {
        eprintln!("RSS baseline before upload: {} MB", rss / (1024 * 1024));
    }

    eprintln!("Uploading 4 GB file...");
    let result = client
        .file_upload(input_path.as_path())
        .await
        .expect("4 GB file upload should succeed");

    // Sample immediately after the upload, before teardown can free memory.
    let rss_after_upload = get_current_rss_bytes();

    eprintln!(
        "Upload complete: {} chunks stored, mode: {:?}",
        result.chunks_stored, result.payment_mode_used
    );

    // 4 GB at MAX_CHUNK_SIZE (4,190,208 bytes) → ~1026 chunks minimum
    assert!(
        result.chunks_stored >= 1000,
        "4 GB file should produce at least 1000 chunks, got {}",
        result.chunks_stored
    );

    // Memory check: same 512 MB limit as the 1 GB test.
    // If spilling works, a 4 GB file should use roughly the same memory
    // as a 1 GB file — the wave buffer is the same size regardless.
    if let (Some(before), Some(after)) = (rss_before, rss_after_upload) {
        let increase = after.saturating_sub(before);
        let increase_mb = increase / (1024 * 1024);
        let max_mb = MAX_RSS_INCREASE_BYTES / (1024 * 1024);
        eprintln!(
            "Current RSS before: {} MB, after: {} MB, increase: {increase_mb} MB (limit: {max_mb} MB)",
            before / (1024 * 1024),
            after / (1024 * 1024)
        );
        assert!(
            increase < MAX_RSS_INCREASE_BYTES,
            "RSS increased by {increase_mb} MB for a 4 GB file — \
             this exceeds the {max_mb} MB limit and suggests memory \
             scales with file size instead of staying bounded. \
             Before: {} MB, After: {} MB",
            before / (1024 * 1024),
            after / (1024 * 1024)
        );
    } else {
        // On macOS/Linux, RSS measurement must succeed — a silent skip would
        // let the bounded-memory claim go unverified in CI.
        #[cfg(any(target_os = "macos", target_os = "linux"))]
        panic!("RSS measurement failed on a supported platform — cannot verify bounded memory");

        #[cfg(not(any(target_os = "macos", target_os = "linux")))]
        eprintln!("WARNING: Could not measure current RSS on this platform, skipping memory check");
    }

    // Download and verify (streaming comparison, never fully in memory)
    let output_path = work_dir.path().join("downloaded_4gb.bin");
    eprintln!("Downloading 4 GB file...");
    let bytes_written = client
        .file_download(&result.data_map, &output_path)
        .await
        .expect("4 GB file download should succeed");

    assert_eq!(
        bytes_written, file_size,
        "bytes_written should equal original file size"
    );
    eprintln!("Download complete: {bytes_written} bytes written");

    eprintln!("Verifying file content...");
    verify_file_content(&output_path, file_size);
    eprintln!("Content verification passed — all 4 GB match");

    drop(client);
    testnet.teardown().await;
}

/// Test that a moderately large file (64 MB) round-trips correctly.
///
/// This test verifies data integrity on a medium file. Memory bounding
/// is not asserted here because 64 MB produces ~20 chunks which fit in
/// a single wave — there are no multi-wave dynamics to test. The 1 GB
/// test is the one that proves memory is bounded across waves.
+#[tokio::test(flavor = "multi_thread")] +#[serial] +async fn test_large_file_64mb_round_trip() { + let file_size: u64 = 64 * 1024 * 1024; + + let (client, testnet) = setup_large().await; + let work_dir = TempDir::new().expect("create work dir"); + + eprintln!("Creating 64 MB test file..."); + let input_path = create_test_file(&work_dir, file_size); + + eprintln!("Uploading 64 MB file..."); + let result = client + .file_upload(input_path.as_path()) + .await + .expect("64 MB file upload should succeed"); + + eprintln!( + "Upload complete: {} chunks stored, mode: {:?}", + result.chunks_stored, result.payment_mode_used + ); + + // 64 MB at ~4 MB chunks -> ~16+ chunks + assert!( + result.chunks_stored >= 10, + "64 MB file should produce at least 10 chunks, got {}", + result.chunks_stored + ); + + let output_path = work_dir.path().join("downloaded_64mb.bin"); + eprintln!("Downloading 64 MB file..."); + let bytes_written = client + .file_download(&result.data_map, &output_path) + .await + .expect("64 MB file download should succeed"); + + assert_eq!(bytes_written, file_size); + + eprintln!("Verifying content..."); + verify_file_content(&output_path, file_size); + eprintln!("64 MB round-trip verified"); + + drop(client); + testnet.teardown().await; +}