Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions nativelink-config/src/stores.rs
Original file line number Diff line number Diff line change
Expand Up @@ -610,6 +610,16 @@ pub struct FilesystemSpec {
/// Default: 4096
#[serde(default, deserialize_with = "convert_data_size_with_shellexpand")]
pub block_size: u64,

/// Maximum number of concurrent write operations allowed.
/// Each write involves streaming data to a temp file and calling sync_all(),
/// which can saturate disk I/O when many writes happen simultaneously.
/// Limiting concurrency prevents disk saturation from blocking the async
/// runtime.
/// A value of 0 means unlimited (no concurrency limit).
/// Default: 0
#[serde(default)]
pub max_concurrent_writes: usize,
}

// NetApp ONTAP S3 Spec
Expand Down
33 changes: 33 additions & 0 deletions nativelink-store/src/filesystem_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ use nativelink_util::store_trait::{
RemoveItemCallback, StoreDriver, StoreKey, StoreKeyBorrow, StoreOptimizations, UploadSizeInfo,
};
use tokio::io::{AsyncReadExt, AsyncWriteExt, Take};
use tokio::sync::Semaphore;
use tokio_stream::wrappers::ReadDirStream;
use tracing::{debug, error, info, trace, warn};

Expand Down Expand Up @@ -636,6 +637,8 @@ pub struct FilesystemStore<Fe: FileEntry = FileEntryImpl> {
read_buffer_size: usize,
weak_self: Weak<Self>,
rename_fn: fn(&OsStr, &OsStr) -> Result<(), std::io::Error>,
/// Limits concurrent write operations to prevent disk I/O saturation.
write_semaphore: Option<Semaphore>,
}

impl<Fe: FileEntry> FilesystemStore<Fe> {
Expand Down Expand Up @@ -693,13 +696,19 @@ impl<Fe: FileEntry> FilesystemStore<Fe> {
} else {
spec.read_buffer_size as usize
};
let write_semaphore = if spec.max_concurrent_writes > 0 {
Some(Semaphore::new(spec.max_concurrent_writes))
} else {
None
};
Ok(Arc::new_cyclic(|weak_self| Self {
shared_context,
evicting_map,
block_size,
read_buffer_size,
weak_self: weak_self.clone(),
rename_fn,
write_semaphore,
}))
}

Expand Down Expand Up @@ -749,12 +758,24 @@ impl<Fe: FileEntry> FilesystemStore<Fe> {
data_size += data_len as u64;
}

let _permit = if let Some(sem) = &self.write_semaphore {
Some(
sem.acquire()
.await
.map_err(|_| make_err!(Code::Internal, "Write semaphore closed"))?,
)
} else {
None
};

temp_file
.as_ref()
.sync_all()
.await
.err_tip(|| "Failed to sync_data in filesystem store")?;

drop(_permit);

temp_file.advise_dontneed();
trace!(?temp_file, "Dropping file to update_file");
drop(temp_file);
Expand Down Expand Up @@ -952,12 +973,24 @@ impl<Fe: FileEntry> StoreDriver for FilesystemStore<Fe> {
.err_tip(|| format!("Failed to write data to {}", temp_full_path.display()))?;
}

let _permit = if let Some(sem) = &self.write_semaphore {
Some(
sem.acquire()
.await
.map_err(|_| make_err!(Code::Internal, "Write semaphore closed"))?,
)
} else {
None
};

temp_file
.as_ref()
.sync_all()
.await
.err_tip(|| "Failed to sync_data in filesystem store update_oneshot")?;

drop(_permit);

temp_file.advise_dontneed();
drop(temp_file);

Expand Down
2 changes: 2 additions & 0 deletions nativelink-store/tests/filesystem_store_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -420,6 +420,7 @@ async fn file_continues_to_stream_on_content_replace_test() -> Result<(), Error>
}),
block_size: 1,
read_buffer_size: 1,
..Default::default()
})
.await?,
);
Expand Down Expand Up @@ -528,6 +529,7 @@ async fn file_gets_cleans_up_on_cache_eviction() -> Result<(), Error> {
}),
block_size: 1,
read_buffer_size: 1,
..Default::default()
})
.await?,
);
Expand Down
Loading