From 85dfb43f9a95ea70bb94c76d313a7c5eabcecfff Mon Sep 17 00:00:00 2001 From: Jake Saferstein Date: Tue, 6 Jan 2026 13:44:36 -0500 Subject: [PATCH] flush every --- Cargo.lock | 17 +++++++++++ lading/Cargo.toml | 1 + lading/src/generator/file_gen/logrotate.rs | 32 ++++++++++++++++++-- lading/src/generator/file_gen/traditional.rs | 21 +++++++++++-- 4 files changed, 66 insertions(+), 5 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 94b8b4ec2..94fd4bd15 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1411,6 +1411,22 @@ version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" +[[package]] +name = "humantime" +version = "2.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "135b12329e5e3ce057a9f972339ea52bc954fe1e9358ef27f95e89716fbc5424" + +[[package]] +name = "humantime-serde" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "57a3db5ea5923d99402c94e9feb261dc5ee9b4efa158b0315f788cf549cc200c" +dependencies = [ + "humantime", + "serde", +] + [[package]] name = "hyper" version = "1.8.1" @@ -1860,6 +1876,7 @@ dependencies = [ "http", "http-body-util", "http-serde", + "humantime-serde", "hyper", "hyper-util", "k8s-openapi", diff --git a/lading/Cargo.toml b/lading/Cargo.toml index a53e1ea31..e56538640 100644 --- a/lading/Cargo.toml +++ b/lading/Cargo.toml @@ -68,6 +68,7 @@ reqwest = { version = "0.12", default-features = false, features = [ rustc-hash = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } +humantime-serde = { version = "1.1" } serde_qs = { version = "0.15", default-features = false } serde_yaml = { version = "0.9" } sha2 = { workspace = true, features = ["std"] } diff --git a/lading/src/generator/file_gen/logrotate.rs b/lading/src/generator/file_gen/logrotate.rs index 8588bd9b0..bdf076192 100644 --- a/lading/src/generator/file_gen/logrotate.rs +++ b/lading/src/generator/file_gen/logrotate.rs @@ -18,8 +18,11 @@ use std::{ num::NonZeroU32, path::{Path, PathBuf}, sync::Arc, + time::Duration, }; +use tokio::time::Instant; + use byte_unit::Byte; use futures::future::join_all; use metrics::counter; @@ -117,6 +120,11 @@ pub enum Error { ThrottleConversion(#[from] ThrottleConversionError), } +#[allow(clippy::unnecessary_wraps)] +fn default_flush_every() -> Option { + Some(Duration::from_secs(1)) +} + #[derive(Debug, Deserialize, Serialize, PartialEq, Clone)] #[serde(deny_unknown_fields)] /// Configuration of [`FileGen`] @@ -150,6 +158,10 @@ pub struct Config { /// Throughput profile controlling emission rate (bytes or blocks). #[serde(default)] pub throttle: Option, + /// Force flush at a regular interval. Defaults to 1 second. + /// Accepts human-readable durations (e.g., "1s", "500ms", "2s"). + #[serde(default = "default_flush_every", with = "humantime_serde")] + pub flush_every: Option, } #[derive(Debug)] @@ -240,6 +252,7 @@ impl Server { throughput_throttle, shutdown.clone(), child_labels, + config.flush_every, ); handles.push(tokio::spawn(child.spin())); @@ -290,6 +303,7 @@ struct Child { throttle: BlockThrottle, shutdown: lading_signal::Watcher, labels: Vec<(String, String)>, + flush_every: Option, } impl Child { @@ -303,6 +317,7 @@ impl Child { throttle: BlockThrottle, shutdown: lading_signal::Watcher, labels: Vec<(String, String)>, + flush_every: Option, ) -> Self { let mut names = Vec::with_capacity((total_rotations + 1).into()); names.push(PathBuf::from(basename)); @@ -325,6 +340,7 @@ impl Child { throttle, shutdown, labels, + flush_every, } } @@ -335,6 +351,7 @@ impl Child { .maximum_capacity_bytes(self.maximum_block_size); let mut total_bytes_written: u64 = 0; let maximum_bytes_per_log: u64 = u64::from(self.maximum_bytes_per_log.get()); + let mut last_flush = Instant::now(); let total_names = self.names.len(); // SAFETY: By construction there is guaranteed to be at least one name. @@ -374,7 +391,7 @@ impl Child { result = self.throttle.wait_for_block(&self.block_cache, &handle) => { match result { Ok(()) => { - write_bytes(self.block_cache.advance(&mut handle), + let did_rotate = write_bytes(self.block_cache.advance(&mut handle), &mut fp, &mut total_bytes_written, buffer_capacity, @@ -383,6 +400,12 @@ impl Child { &self.names, last_name, &self.labels).await?; + if did_rotate { + last_flush = Instant::now(); + } else if self.flush_every.is_some_and(|d| last_flush.elapsed() >= d) { + fp.flush().await.map_err(|err| Error::IoFlush { err })?; + last_flush = Instant::now(); + } } Err(err) => { error!("Discarding block due to throttle error: {err}"); @@ -401,6 +424,8 @@ impl Child { } } +/// Writes a block to the file, rotating if necessary. +/// Returns `true` if a rotation occurred (and thus the file was flushed). #[allow(clippy::too_many_arguments)] async fn write_bytes( blk: &block::Block, @@ -412,7 +437,7 @@ async fn write_bytes( names: &[PathBuf], last_name: &Path, labels: &[(String, String)], -) -> Result<(), Error> { +) -> Result { let total_bytes = u64::from(blk.total_bytes.get()); { @@ -479,7 +504,8 @@ async fn write_bytes( })?, ); *total_bytes_written = 0; + return Ok(true); } - Ok(()) + Ok(false) } diff --git a/lading/src/generator/file_gen/traditional.rs b/lading/src/generator/file_gen/traditional.rs index bc55f15b5..3614b8b46 100644 --- a/lading/src/generator/file_gen/traditional.rs +++ b/lading/src/generator/file_gen/traditional.rs @@ -20,8 +20,11 @@ use std::{ Arc, atomic::{AtomicU32, Ordering}, }, + time::Duration, }; +use tokio::time::Instant; + use byte_unit::Byte; use metrics::counter; use rand::{SeedableRng, prelude::StdRng}; @@ -78,6 +81,11 @@ fn default_rotation() -> bool { true } +#[allow(clippy::unnecessary_wraps)] +fn default_flush_every() -> Option { + Some(Duration::from_secs(1)) +} + #[derive(Debug, Deserialize, Serialize, PartialEq, Clone)] #[serde(deny_unknown_fields)] /// Configuration of [`FileGen`] @@ -121,6 +129,10 @@ pub struct Config { rotate: bool, /// The load throttle configuration pub throttle: Option, + /// Force flush at a regular interval. Defaults to 1 second. + /// Accepts human-readable durations (e.g., "1s", "500ms", "2s"). + #[serde(default = "default_flush_every", with = "humantime_serde")] + pub flush_every: Option, } #[derive(Debug)] @@ -200,6 +212,7 @@ impl Server { file_index: Arc::clone(&file_index), rotate: config.rotate, shutdown: shutdown.clone(), + flush_every: config.flush_every, }; handles.spawn(child.spin()); @@ -275,6 +288,7 @@ struct Child { rotate: bool, file_index: Arc, shutdown: lading_signal::Watcher, + flush_every: Option, } impl Child { @@ -287,8 +301,6 @@ impl Child { let mut path = path_from_template(&self.path_template, file_index); let mut handle = self.block_cache.handle(); - // Setting write buffer capacity (per second) approximately equal to the throttle's maximum capacity - // (converted to bytes if necessary) to approximate flush every second. let buffer_capacity = self .throttle .maximum_capacity_bytes(self.maximum_block_size); @@ -312,6 +324,7 @@ impl Child { ); let shutdown_wait = self.shutdown.recv(); tokio::pin!(shutdown_wait); + let mut last_flush = Instant::now(); loop { tokio::select! { result = self.throttle.wait_for_block(&self.block_cache, &handle) => { @@ -328,6 +341,7 @@ impl Child { if total_bytes_written > maximum_bytes_per_file { fp.flush().await?; + last_flush = Instant::now(); if self.rotate { // Delete file, leaving any open file handlers intact. This // includes our own `fp` for the time being. @@ -355,6 +369,9 @@ impl Child { })?, ); total_bytes_written = 0; + } else if self.flush_every.is_some_and(|d| last_flush.elapsed() >= d) { + fp.flush().await?; + last_flush = Instant::now(); } } Err(err) => {