Skip to content

Commit 5191e9a

Browse files
committed
Optimizations: preallocate data files with set_len to the maximum file size to save on metadata updates; use sync_file_range instead of syncing the whole file
1 parent 8a80ddd commit 5191e9a

12 files changed

Lines changed: 504 additions & 55 deletions

File tree

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ repository = "https://github.com/sweet-security/candystore"
1010
[dependencies]
1111
crc16-ibm3740-fast = "0.5.0"
1212
fslock = "0.2.1"
13+
libc = "0.2.183"
1314
memmap2 = "0.9.10"
1415
num_cpus = "1.17.0"
1516
parking_lot = "0.12.5"
@@ -26,7 +27,6 @@ zerocopy = { version = "0.8.47", features = ["derive"] }
2627
proptest = "1.10.0"
2728
tempfile = "3"
2829
rand = "0.10.0"
29-
libc = "0.2.183"
3030

3131
[features]
3232
whitebox-testing = []

src/data_file.rs

Lines changed: 130 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ use crate::internal::{
1717
DATA_ENTRY_OFFSET_MAGIC, DATA_ENTRY_OFFSET_MASK, DATA_FILE_SIGNATURE, DATA_FILE_VERSION,
1818
EntryType, FILE_OFFSET_ALIGNMENT, KEY_NAMESPACE_BITS, KVBuf, KVRef, KeyNamespace,
1919
MAX_KEY_NAMESPACE, PAGE_SIZE, READ_BUFFER_SIZE, SIZE_HINT_UNIT, data_file_path,
20-
invalid_data_error, read_available_at, read_into_at, sync_dir, write_all_at,
20+
invalid_data_error, read_available_at, read_into_at, sync_dir, sync_file_range, write_all_at,
2121
};
2222
use crate::types::{Config, Error, MAX_USER_KEY_SIZE, MAX_USER_VALUE_SIZE, Result};
2323

@@ -197,24 +197,124 @@ impl Drop for InflightGuard<'_> {
197197
/// Handle to a single on-disk data file plus the bookkeeping used for syncing and recovery.
///
/// Offsets stored here are relative to the end of the `DataFileHeader`: the methods in this
/// file add `size_of::<DataFileHeader>()` before touching the underlying file.
pub(crate) struct DataFile {
198198
/// Underlying file handle.
pub(crate) file: File,
199199
/// Number of bytes of the data region currently in use; read through `used_bytes()`.
file_offset: AtomicU64,
200+
/// Watermark up to which the data region has been flushed; advanced by `sync_data` and
/// reset by `truncate_to_offset`.
last_synced_offset: AtomicU64,
200201
// NOTE(review): presumably set once the file is being rotated out; its users are outside this view.
sealed_for_rotation: AtomicBool,
201202
/// Shared store configuration (`max_data_file_size` is read from it).
config: Arc<Config>,
202203
/// Index of this file, used to derive its path via `data_file_path`.
pub(crate) file_idx: u16,
203204
/// Ordinal recorded in the file header.
pub(crate) file_ordinal: u64,
205+
/// True when the data region length equals `config.max_data_file_size` (always true for
/// freshly created files); selects ranged syncs over `sync_all` in `sync_data`.
preallocated: bool,
206+
/// Upper bound of possibly-used data computed when the file was opened (0 for new files).
recovery_tail_upper_bound: u64,
204207
}
205208

206209
impl DataFile {
207210
/// Returns the current value of `file_offset`, i.e. the number of data-region bytes in use
/// (header excluded), loaded with `Acquire` ordering.
pub(crate) fn used_bytes(&self) -> u64 {
208211
self.file_offset.load(Ordering::Acquire)
209212
}
210213

214+
/// Returns the upper bound of possibly-used data that was recorded when this file was
/// opened (0 for files produced by `create`).
pub(crate) fn recovery_tail_upper_bound(&self) -> u64 {
215+
self.recovery_tail_upper_bound
216+
}
217+
218+
pub(crate) fn sync_data(&self, start_offset: u64, end_offset: u64) -> Result<()> {
219+
let used_bytes = self.used_bytes();
220+
let start_offset = start_offset.min(used_bytes);
221+
let end_offset = end_offset.min(used_bytes);
222+
if end_offset <= start_offset {
223+
return Ok(());
224+
}
225+
226+
if !self.preallocated {
227+
self.file.sync_all().map_err(Error::IOError)?;
228+
} else {
229+
sync_file_range(
230+
&self.file,
231+
size_of::<DataFileHeader>() as u64 + start_offset,
232+
end_offset - start_offset,
233+
)?;
234+
}
235+
self.last_synced_offset
236+
.fetch_max(end_offset, Ordering::Release);
237+
Ok(())
238+
}
239+
240+
/// Flushes everything between the last synced watermark and the current end of used data
/// by delegating to `sync_data`.
pub(crate) fn sync_to_current(&self) -> Result<()> {
    let already_synced = self.last_synced_offset.load(Ordering::Acquire);
    let current_end = self.used_bytes();
    self.sync_data(already_synced, current_end)
}
244+
211245
/// Discards all data past `file_offset` (relative to the end of the header) and makes the
/// result durable.
///
/// `file_offset` is debug-asserted to be a multiple of `FILE_OFFSET_ALIGNMENT`. For a
/// preallocated file the length is first cut to the new end and then extended back to
/// header + `config.max_data_file_size`; `File::set_len` zero-fills the extension, so the
/// discarded tail reads back as zeros. Afterwards `file_offset` is updated, the file is
/// flushed with `sync_all`, and `last_synced_offset` is set to the new end.
///
/// # Errors
/// Returns `Error::IOError` if resizing or syncing the file fails.
pub(crate) fn truncate_to_offset(&self, file_offset: u64) -> Result<()> {
212246
debug_assert_eq!(file_offset % FILE_OFFSET_ALIGNMENT, 0);
213-
self.file
214-
.set_len(size_of::<DataFileHeader>() as u64 + file_offset)
215-
.map_err(Error::IOError)?;
247+
if self.preallocated {
248+
// A crash between the two set_len calls would leave the file
249+
// non-preallocated. That is harmless: the next open will
250+
// detect it as non-preallocated and fall back to sync_all
251+
// until rotation creates a fresh preallocated file.
252+
self.file
253+
.set_len(size_of::<DataFileHeader>() as u64 + file_offset)
254+
.map_err(Error::IOError)?;
255+
self.file
256+
.set_len(size_of::<DataFileHeader>() as u64 + self.config.max_data_file_size as u64)
257+
.map_err(Error::IOError)?;
258+
} else {
259+
self.file
260+
.set_len(size_of::<DataFileHeader>() as u64 + file_offset)
261+
.map_err(Error::IOError)?;
262+
}
216263
self.file_offset.store(file_offset, Ordering::Release);
217-
self.file.sync_all().map_err(Error::IOError)
264+
self.file.sync_all().map_err(Error::IOError)?;
265+
self.last_synced_offset
266+
.store(file_offset, Ordering::Release);
267+
Ok(())
268+
}
269+
270+
fn used_data_upper_bound(file: &File, physical_data_len: u64) -> Result<u64> {
271+
if physical_data_len == 0 {
272+
return Ok(0);
273+
}
274+
275+
let mut end = physical_data_len;
276+
while end > 0 {
277+
let start = end.saturating_sub(READ_BUFFER_SIZE as u64);
278+
let chunk = read_available_at(
279+
file,
280+
(end - start) as usize,
281+
size_of::<DataFileHeader>() as u64 + start,
282+
)
283+
.map_err(Error::IOError)?;
284+
if let Some(rel) = chunk.iter().rposition(|byte| *byte != 0) {
285+
let aligned = (start + rel as u64 + 1).next_multiple_of(FILE_OFFSET_ALIGNMENT);
286+
return Ok(aligned.min(physical_data_len));
287+
}
288+
end = start;
289+
}
290+
291+
Ok(0)
292+
}
293+
294+
/// Scans forward from offset 0, parsing each entry, and returns the
295+
/// aligned end of the last valid entry. We temporarily set `file_offset`
296+
/// to `tail_upper_bound` so that `read_next_entry_ref` won't short-circuit
297+
/// before reaching it. This is safe because `open` is single-threaded;
298+
/// the real value is overwritten by the caller immediately after.
299+
fn detect_used_bytes(&self, tail_upper_bound: u64) -> Result<u64> {
300+
if tail_upper_bound == 0 {
301+
return Ok(0);
302+
}
303+
304+
self.file_offset.store(tail_upper_bound, Ordering::Release);
305+
306+
let mut offset = 0u64;
307+
let mut read_buf = Vec::new();
308+
let mut buf_file_offset = 0u64;
309+
let mut last_durable_offset = 0u64;
310+
while let Some((_, _, next_offset)) =
311+
self.read_next_entry_ref(offset, &mut read_buf, &mut buf_file_offset)?
312+
{
313+
offset = next_offset;
314+
last_durable_offset = next_offset.next_multiple_of(FILE_OFFSET_ALIGNMENT);
315+
}
316+
317+
Ok(last_durable_offset)
218318
}
219319

220320
fn parse_data_entry(buf: &[u8], offset: u64) -> Result<ParsedDataEntry> {
@@ -296,23 +396,30 @@ impl DataFile {
296396
"invalid data file header",
297397
)));
298398
}
299-
let mut file_offset = file
399+
let physical_data_len = file
300400
.metadata()
301401
.map_err(Error::IOError)?
302402
.len()
303403
.saturating_sub(size_of::<DataFileHeader>() as u64);
304-
file_offset -= file_offset % FILE_OFFSET_ALIGNMENT;
305-
file.set_len(size_of::<DataFileHeader>() as u64 + file_offset)
306-
.map_err(Error::IOError)?;
404+
let preallocated = physical_data_len == config.max_data_file_size as u64;
405+
let recovery_tail_upper_bound = Self::used_data_upper_bound(&file, physical_data_len)?;
307406

308-
Ok(Self {
407+
let inst = Self {
309408
file,
310-
file_offset: AtomicU64::new(file_offset),
409+
file_offset: AtomicU64::new(physical_data_len),
410+
last_synced_offset: AtomicU64::new(0),
311411
sealed_for_rotation: AtomicBool::new(false),
312412
config,
313413
file_idx,
314414
file_ordinal: header.ordinal,
315-
})
415+
preallocated,
416+
recovery_tail_upper_bound,
417+
};
418+
let used_bytes = inst.detect_used_bytes(recovery_tail_upper_bound)?;
419+
inst.file_offset.store(used_bytes, Ordering::Release);
420+
inst.last_synced_offset.store(used_bytes, Ordering::Release);
421+
422+
Ok(inst)
316423
}
317424

318425
pub(crate) fn create(
@@ -328,7 +435,7 @@ impl DataFile {
328435
.write(true)
329436
.open(data_file_path(base_path, file_idx))
330437
.map_err(Error::IOError)?;
331-
file.set_len(size_of::<DataFileHeader>() as u64)
438+
file.set_len(size_of::<DataFileHeader>() as u64 + config.max_data_file_size as u64)
332439
.map_err(Error::IOError)?;
333440
let header = DataFileHeader {
334441
magic: *DATA_FILE_SIGNATURE,
@@ -343,10 +450,13 @@ impl DataFile {
343450
Ok(Self {
344451
file,
345452
file_offset: AtomicU64::new(0),
453+
last_synced_offset: AtomicU64::new(0),
346454
sealed_for_rotation: AtomicBool::new(false),
347455
config,
348456
file_idx,
349457
file_ordinal: ordinal,
458+
preallocated: true,
459+
recovery_tail_upper_bound: 0,
350460
})
351461
}
352462

@@ -554,9 +664,16 @@ impl DataFile {
554664
read_buf: &'a mut Vec<u8>,
555665
buf_file_offset: &mut u64,
556666
) -> Result<Option<(KVRef<'a>, u64, u64)>> {
667+
let used_bytes = self.used_bytes();
668+
if offset >= used_bytes {
669+
return Ok(None);
670+
}
557671
offset = offset.next_multiple_of(FILE_OFFSET_ALIGNMENT);
558672

559673
loop {
674+
if offset >= used_bytes {
675+
return Ok(None);
676+
}
560677
let buf_start = if offset >= *buf_file_offset {
561678
(offset - *buf_file_offset) as usize
562679
} else {

src/internal.rs

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,50 @@ pub(crate) fn sync_dir(_path: &Path) -> Result<()> {
7070
Ok(())
7171
}
7272

73+
#[cfg(target_os = "linux")]
74+
pub(crate) fn sync_file_range(file: &File, offset: u64, len: u64) -> Result<()> {
75+
use std::os::fd::AsRawFd;
76+
77+
if len == 0 {
78+
return Ok(());
79+
}
80+
81+
let sync_offset = i64::try_from(offset)
82+
.map_err(|_| Error::IOError(std::io::Error::other("sync offset overflow")))?;
83+
let sync_len = i64::try_from(len)
84+
.map_err(|_| Error::IOError(std::io::Error::other("sync length overflow")))?;
85+
86+
let rc = unsafe {
87+
libc::sync_file_range(
88+
file.as_raw_fd(),
89+
sync_offset,
90+
sync_len,
91+
libc::SYNC_FILE_RANGE_WAIT_BEFORE
92+
| libc::SYNC_FILE_RANGE_WRITE
93+
| libc::SYNC_FILE_RANGE_WAIT_AFTER,
94+
)
95+
};
96+
if rc == 0 {
97+
return Ok(());
98+
}
99+
100+
let err = std::io::Error::last_os_error();
101+
match err.raw_os_error() {
102+
Some(libc::EINVAL | libc::ENOSYS | libc::EOPNOTSUPP) => {
103+
file.sync_all().map_err(Error::IOError)
104+
}
105+
_ => Err(Error::IOError(err)),
106+
}
107+
}
108+
109+
/// Portable stand-in for the Linux ranged sync: ignores the offset and flushes the whole
/// file with `File::sync_all`, unless `len` is zero, in which case nothing is done.
///
/// # Errors
/// Returns `Error::IOError` if `sync_all` fails.
#[cfg(not(target_os = "linux"))]
pub(crate) fn sync_file_range(file: &File, _offset: u64, len: u64) -> Result<()> {
    match len {
        0 => Ok(()),
        _ => file.sync_all().map_err(Error::IOError),
    }
}
116+
73117
pub(crate) fn parse_data_file_idx(path: &Path) -> Option<u16> {
74118
let name = path.file_name()?.to_str()?;
75119
let suffix = name.strip_prefix("data_")?;

src/store.rs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ struct CheckpointSnapshot {
8080
checkpoint_offset: u64,
8181
checkpointed_delta: i64,
8282
last_commit_ordinal: u64,
83+
last_commit_offset: u64,
8384
}
8485

8586
#[derive(Default)]
@@ -465,20 +466,23 @@ impl StoreInner {
465466
.ok_or(Error::MissingDataFile(active_idx))?;
466467
let (checkpoint_ordinal, checkpoint_offset, checkpointed_delta) =
467468
self.inflight_tracker.checkpoint_progress(&active_file);
468-
let last_commit_ordinal = self.index_file.checkpoint_cursor().0;
469+
let (last_commit_ordinal, last_commit_offset) = self.index_file.checkpoint_cursor();
469470
Ok(CheckpointSnapshot {
470471
checkpoint_ordinal,
471472
checkpoint_offset,
472473
checkpointed_delta,
473474
last_commit_ordinal,
475+
last_commit_offset,
474476
})
475477
}
476478

477479
fn sync_checkpoint(&self, snap: CheckpointSnapshot) -> Result<()> {
478480
let files = self.data_files.read();
479481
for data_file in files.values() {
480-
if data_file.file_ordinal >= snap.last_commit_ordinal {
481-
data_file.file.sync_all().map_err(Error::IOError)?;
482+
if data_file.file_ordinal > snap.last_commit_ordinal {
483+
data_file.sync_to_current()?;
484+
} else if data_file.file_ordinal == snap.last_commit_ordinal {
485+
data_file.sync_data(snap.last_commit_offset, data_file.used_bytes())?;
482486
}
483487
}
484488
drop(files);
@@ -1247,7 +1251,7 @@ impl CandyStore {
12471251
self.inner.index_file.sync_all()?;
12481252
let files = self.inner.data_files.read();
12491253
for data_file in files.values() {
1250-
data_file.file.sync_all().map_err(Error::IOError)?;
1254+
data_file.sync_to_current()?;
12511255
}
12521256
sync_dir(&self.inner.base_path)
12531257
}

src/store/checkpoint.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -230,10 +230,12 @@ impl StoreInner {
230230
) {
231231
let snapshot_for_follow_up = snapshot.as_ref().ok().copied();
232232
let result = snapshot.and_then(|snap| self.sync_checkpoint(snap));
233+
let mut should_signal_compaction = false;
233234

234235
let mut state = self.checkpoint_state.lock();
235236
match result {
236237
Ok(()) => {
238+
should_signal_compaction = true;
237239
state.last_checkpoint_dur_ms =
238240
u64::try_from(started_at.elapsed().as_millis()).unwrap_or(u64::MAX);
239241
if let Some(target_epoch) = target_epoch {
@@ -265,6 +267,11 @@ impl StoreInner {
265267
}
266268
}
267269
self.checkpoint_condvar.notify_all();
270+
drop(state);
271+
272+
if should_signal_compaction {
273+
self.signal_compaction_scan();
274+
}
268275
}
269276

270277
fn run_checkpoint_worker(self: &Arc<Self>) {

src/store/compaction.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -238,7 +238,7 @@ impl StoreInner {
238238
if !removed.is_empty() {
239239
let active_idx = self.active_file_idx.load(Ordering::Acquire);
240240
if let Some(active_file) = self.data_files.read().get(&active_idx).cloned() {
241-
let _ = active_file.file.sync_all();
241+
let _ = active_file.sync_to_current();
242242
}
243243
let _ = self.index_file.sync_all();
244244
}

src/store/recovery.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -93,8 +93,8 @@ impl CandyStore {
9393
// extent. This handles the case where the data file was truncated
9494
// (e.g. disk-full or corruption) and ensures the replay loop won't
9595
// encounter stale pointers when comparing existing entries.
96-
let pre_rebuild_used_bytes = data_file.used_bytes();
97-
let pre_purge_extent = pre_rebuild_used_bytes.next_multiple_of(FILE_OFFSET_ALIGNMENT);
96+
let pre_rebuild_tail_upper_bound = data_file.recovery_tail_upper_bound();
97+
let pre_purge_extent = pre_rebuild_tail_upper_bound.next_multiple_of(FILE_OFFSET_ALIGNMENT);
9898
self.apply_recovery_delta(
9999
self.purge_uncommitted_file_entries(data_file.file_idx, pre_purge_extent)?,
100100
pending_committed_delta,
@@ -151,11 +151,11 @@ impl CandyStore {
151151

152152
let durable_extent = last_durable_offset.next_multiple_of(FILE_OFFSET_ALIGNMENT);
153153

154-
if durable_extent < pre_rebuild_used_bytes {
154+
if durable_extent < pre_rebuild_tail_upper_bound {
155155
self.inner
156156
.stats
157157
.num_rebuild_purged_bytes
158-
.fetch_add(pre_rebuild_used_bytes - durable_extent, Ordering::Relaxed);
158+
.fetch_add(pre_rebuild_tail_upper_bound - durable_extent, Ordering::Relaxed);
159159
data_file.truncate_to_offset(durable_extent)?;
160160
}
161161

0 commit comments

Comments
 (0)