From 2e4bf966faf42fc0fc2ef44fdb6bacc9ab56f286 Mon Sep 17 00:00:00 2001 From: Brayo Date: Fri, 12 Jun 2026 13:43:31 +0300 Subject: [PATCH 1/2] perf(datastore): enable WAL journal mode with synchronous=FULL In delete mode every batch commit paid two fsyncs plus rollback-journal file create/delete churn; in WAL a commit is a single sequential WAL append+fsync. It also stops commits from blocking future reader connections, which is groundwork for serving reads in parallel with the writer. synchronous=FULL is set explicitly so each commit is durable the moment it returns, same as the delete-mode default. The overall durability window is unchanged: the worker already batches events into one transaction committed every 15 s / 100 events / bucket change, and that policy is untouched. In-memory datastores (tests, --no-db) ignore the pragma and keep journal_mode=memory. Verified on a 98 MB production snapshot: WAL active, clean checkpoint on shutdown, integrity ok, heartbeat throughput +4% (5572 -> 5815/s), reads unchanged. --- aw-datastore/src/worker.rs | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/aw-datastore/src/worker.rs b/aw-datastore/src/worker.rs index 4df7ef4d..c41e639b 100644 --- a/aw-datastore/src/worker.rs +++ b/aw-datastore/src/worker.rs @@ -138,6 +138,24 @@ impl DatastoreWorker { conn } }; + + // WAL turns each commit into a single sequential WAL append+fsync where + // delete mode paid two fsyncs plus journal-file churn, and lets future + // reader connections proceed while a commit is in flight. + // synchronous=FULL is set explicitly (rather than relying on the + // default) so a commit remains durable on disk the moment it returns; + // with NORMAL the WAL is only synced at checkpoints, which would + // silently widen the loss window on power failure. + // In-memory databases ignore the request (journal_mode stays "memory"). + let journal_mode: String = conn + .pragma_update_and_check(None, "journal_mode", "WAL", |row| row.get(0)) + .expect("Failed to query journal_mode"); + if !matches!(&method, DatastoreMethod::Memory()) && journal_mode != "wal" { + warn!("Failed to enable WAL (journal_mode={journal_mode}), continuing without it"); + } + conn.pragma_update(None, "synchronous", "FULL") + .expect("Failed to set synchronous=FULL"); + let mut ds = DatastoreInstance::new(&conn, true).unwrap(); // Ensure legacy import From 30c914bf549f9532481349270e7ddb431755b891 Mon Sep 17 00:00:00 2001 From: Brayo Date: Fri, 12 Jun 2026 14:13:12 +0300 Subject: [PATCH 2/2] fix(datastore): ack ForceCommit/Close only after the transaction commits The worker acked ForceCommit and Close inside the request loop and only committed the batch transaction after breaking out of it, so callers were told their data was committed before it was. Under the rollback journal this was masked by file locking, but with WAL readers never block on the writer: a caller could force_commit, reopen the database, and read a pre-commit snapshot. This is exactly how test_datastore_reload failed on the ubuntu CI runner (bucket metadata cache populated from a snapshot taken before the forced commit landed). Hold the ack for these two commands until tx.commit() returns, and propagate a commit failure to the caller as an error instead of a silent success. Heartbeats and other commands are still acked immediately, since they must not wait for the batch commit. --- aw-datastore/src/worker.rs | 26 +++++++++++++++++++++++++- 1 file changed, 25 insertions(+), 1 deletion(-) diff --git a/aw-datastore/src/worker.rs b/aw-datastore/src/worker.rs index c41e639b..e9481da1 100644 --- a/aw-datastore/src/worker.rs +++ b/aw-datastore/src/worker.rs @@ -197,6 +197,15 @@ impl DatastoreWorker { self.uncommitted_events = 0; self.commit = false; + // ForceCommit and Close promise the caller that their data is + // committed, so their acks are held back until the transaction + // below has actually committed. Acking first (as before) let a + // caller reopen the database and read a pre-commit snapshot — + // harmless under the rollback journal's locking, but a real race + // in WAL mode where readers never block on the writer. + // All other commands are acked immediately: a watcher heartbeat + // must not wait up to 15 s for the batch commit. + let mut deferred_ack = None; loop { let (request, response_sender) = match self.responder.poll() { Ok((req, res_sender)) => (req, res_sender), @@ -207,7 +216,13 @@ impl DatastoreWorker { break; } }; + let ack_after_commit = matches!(request, Command::ForceCommit() | Command::Close()); let response = self.handle_request(request, &mut ds, &tx); + if ack_after_commit { + // Both commands force a commit, so the loop ends here. + deferred_ack = Some((response_sender, response)); + break; + } response_sender.respond(response); let now: DateTime = Utc::now(); @@ -225,7 +240,11 @@ impl DatastoreWorker { self.commit, self.uncommitted_events ); match tx.commit() { - Ok(_) => (), + Ok(_) => { + if let Some((sender, response)) = deferred_ack.take() { + sender.respond(response); + } + } Err(err) => { error!( "Failed to commit datastore transaction ({} events lost): {err}", @@ -237,6 +256,11 @@ impl DatastoreWorker { // know to retry. Rolled-back events create a gap in the timeline; // watchers will resume sending heartbeats from current state, but the // specific batch of events is permanently lost. + if let Some((sender, _)) = deferred_ack.take() { + sender.respond(Err(DatastoreError::InternalError(format!( + "Failed to commit datastore transaction: {err}" + )))); + } } } if self.quit {