Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions lading/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ reqwest = { version = "0.12", default-features = false, features = [
rustc-hash = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
humantime-serde = { version = "1.1" }
serde_qs = { version = "0.15", default-features = false }
serde_yaml = { version = "0.9" }
sha2 = { workspace = true, features = ["std"] }
Expand Down
32 changes: 29 additions & 3 deletions lading/src/generator/file_gen/logrotate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@ use std::{
num::NonZeroU32,
path::{Path, PathBuf},
sync::Arc,
time::Duration,
};

use tokio::time::Instant;

use byte_unit::Byte;
use futures::future::join_all;
use metrics::counter;
Expand Down Expand Up @@ -117,6 +120,11 @@ pub enum Error {
ThrottleConversion(#[from] ThrottleConversionError),
}

#[allow(clippy::unnecessary_wraps)]
fn default_flush_every() -> Option<Duration> {
Some(Duration::from_secs(1))
}

#[derive(Debug, Deserialize, Serialize, PartialEq, Clone)]
#[serde(deny_unknown_fields)]
/// Configuration of [`FileGen`]
Expand Down Expand Up @@ -150,6 +158,10 @@ pub struct Config {
/// Throughput profile controlling emission rate (bytes or blocks).
#[serde(default)]
pub throttle: Option<ThrottleConfig>,
/// Force flush at a regular interval. Defaults to 1 second.
/// Accepts human-readable durations (e.g., "1s", "500ms", "2s").
#[serde(default = "default_flush_every", with = "humantime_serde")]
pub flush_every: Option<Duration>,
}

#[derive(Debug)]
Expand Down Expand Up @@ -240,6 +252,7 @@ impl Server {
throughput_throttle,
shutdown.clone(),
child_labels,
config.flush_every,
);

handles.push(tokio::spawn(child.spin()));
Expand Down Expand Up @@ -290,6 +303,7 @@ struct Child {
throttle: BlockThrottle,
shutdown: lading_signal::Watcher,
labels: Vec<(String, String)>,
flush_every: Option<Duration>,
}

impl Child {
Expand All @@ -303,6 +317,7 @@ impl Child {
throttle: BlockThrottle,
shutdown: lading_signal::Watcher,
labels: Vec<(String, String)>,
flush_every: Option<Duration>,
) -> Self {
let mut names = Vec::with_capacity((total_rotations + 1).into());
names.push(PathBuf::from(basename));
Expand All @@ -325,6 +340,7 @@ impl Child {
throttle,
shutdown,
labels,
flush_every,
}
}

Expand All @@ -335,6 +351,7 @@ impl Child {
.maximum_capacity_bytes(self.maximum_block_size);
let mut total_bytes_written: u64 = 0;
let maximum_bytes_per_log: u64 = u64::from(self.maximum_bytes_per_log.get());
let mut last_flush = Instant::now();

let total_names = self.names.len();
// SAFETY: By construction there is guaranteed to be at least one name.
Expand Down Expand Up @@ -374,7 +391,7 @@ impl Child {
result = self.throttle.wait_for_block(&self.block_cache, &handle) => {
match result {
Ok(()) => {
write_bytes(self.block_cache.advance(&mut handle),
let did_rotate = write_bytes(self.block_cache.advance(&mut handle),
&mut fp,
&mut total_bytes_written,
buffer_capacity,
Expand All @@ -383,6 +400,12 @@ impl Child {
&self.names,
last_name,
&self.labels).await?;
if did_rotate {
last_flush = Instant::now();
} else if self.flush_every.is_some_and(|d| last_flush.elapsed() >= d) {
fp.flush().await.map_err(|err| Error::IoFlush { err })?;
last_flush = Instant::now();
}
}
Err(err) => {
error!("Discarding block due to throttle error: {err}");
Expand All @@ -401,6 +424,8 @@ impl Child {
}
}

/// Writes a block to the file, rotating if necessary.
/// Returns `true` if a rotation occurred (and thus the file was flushed).
#[allow(clippy::too_many_arguments)]
async fn write_bytes(
blk: &block::Block,
Expand All @@ -412,7 +437,7 @@ async fn write_bytes(
names: &[PathBuf],
last_name: &Path,
labels: &[(String, String)],
) -> Result<(), Error> {
) -> Result<bool, Error> {
let total_bytes = u64::from(blk.total_bytes.get());

{
Expand Down Expand Up @@ -479,7 +504,8 @@ async fn write_bytes(
})?,
);
*total_bytes_written = 0;
return Ok(true);
}

Ok(())
Ok(false)
}
21 changes: 19 additions & 2 deletions lading/src/generator/file_gen/traditional.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,11 @@ use std::{
Arc,
atomic::{AtomicU32, Ordering},
},
time::Duration,
};

use tokio::time::Instant;

use byte_unit::Byte;
use metrics::counter;
use rand::{SeedableRng, prelude::StdRng};
Expand Down Expand Up @@ -78,6 +81,11 @@ fn default_rotation() -> bool {
true
}

#[allow(clippy::unnecessary_wraps)]
fn default_flush_every() -> Option<Duration> {
Some(Duration::from_secs(1))
}

#[derive(Debug, Deserialize, Serialize, PartialEq, Clone)]
#[serde(deny_unknown_fields)]
/// Configuration of [`FileGen`]
Expand Down Expand Up @@ -121,6 +129,10 @@ pub struct Config {
rotate: bool,
/// The load throttle configuration
pub throttle: Option<ThrottleConfig>,
/// Force flush at a regular interval. Defaults to 1 second.
/// Accepts human-readable durations (e.g., "1s", "500ms", "2s").
#[serde(default = "default_flush_every", with = "humantime_serde")]
pub flush_every: Option<Duration>,
}

#[derive(Debug)]
Expand Down Expand Up @@ -200,6 +212,7 @@ impl Server {
file_index: Arc::clone(&file_index),
rotate: config.rotate,
shutdown: shutdown.clone(),
flush_every: config.flush_every,
};

handles.spawn(child.spin());
Expand Down Expand Up @@ -275,6 +288,7 @@ struct Child {
rotate: bool,
file_index: Arc<AtomicU32>,
shutdown: lading_signal::Watcher,
flush_every: Option<Duration>,
}

impl Child {
Expand All @@ -287,8 +301,6 @@ impl Child {
let mut path = path_from_template(&self.path_template, file_index);

let mut handle = self.block_cache.handle();
// Setting write buffer capacity (per second) approximately equal to the throttle's maximum capacity
// (converted to bytes if necessary) to approximate flush every second.
let buffer_capacity = self
.throttle
.maximum_capacity_bytes(self.maximum_block_size);
Expand All @@ -312,6 +324,7 @@ impl Child {
);
let shutdown_wait = self.shutdown.recv();
tokio::pin!(shutdown_wait);
let mut last_flush = Instant::now();
loop {
tokio::select! {
result = self.throttle.wait_for_block(&self.block_cache, &handle) => {
Expand All @@ -328,6 +341,7 @@ impl Child {

if total_bytes_written > maximum_bytes_per_file {
fp.flush().await?;
last_flush = Instant::now();
if self.rotate {
// Delete file, leaving any open file handlers intact. This
// includes our own `fp` for the time being.
Expand Down Expand Up @@ -355,6 +369,9 @@ impl Child {
})?,
);
total_bytes_written = 0;
} else if self.flush_every.is_some_and(|d| last_flush.elapsed() >= d) {
fp.flush().await?;
last_flush = Instant::now();
}
}
Err(err) => {
Expand Down
Loading