diff --git a/aw-datastore/src/worker.rs b/aw-datastore/src/worker.rs index 4df7ef4d..e9481da1 100644 --- a/aw-datastore/src/worker.rs +++ b/aw-datastore/src/worker.rs @@ -138,6 +138,24 @@ impl DatastoreWorker { conn } }; + + // WAL turns each commit into a single sequential WAL append+fsync where + // delete mode paid two fsyncs plus journal-file churn, and lets future + // reader connections proceed while a commit is in flight. + // synchronous=FULL is set explicitly (rather than relying on the + // default) so a commit remains durable on disk the moment it returns; + // with NORMAL the WAL is only synced at checkpoints, which would + // silently widen the loss window on power failure. + // In-memory databases ignore the request (journal_mode stays "memory"). + let journal_mode: String = conn + .pragma_update_and_check(None, "journal_mode", "WAL", |row| row.get(0)) + .expect("Failed to query journal_mode"); + if !matches!(&method, DatastoreMethod::Memory()) && journal_mode != "wal" { + warn!("Failed to enable WAL (journal_mode={journal_mode}), continuing without it"); + } + conn.pragma_update(None, "synchronous", "FULL") + .expect("Failed to set synchronous=FULL"); + let mut ds = DatastoreInstance::new(&conn, true).unwrap(); // Ensure legacy import @@ -179,6 +197,15 @@ impl DatastoreWorker { self.uncommitted_events = 0; self.commit = false; + // ForceCommit and Close promise the caller that their data is + // committed, so their acks are held back until the transaction + // below has actually committed. Acking first (as before) let a + // caller reopen the database and read a pre-commit snapshot — + // harmless under the rollback journal's locking, but a real race + // in WAL mode where readers never block on the writer. + // All other commands are acked immediately: a watcher heartbeat + // must not wait up to 15 s for the batch commit. + let mut deferred_ack = None; loop { let (request, response_sender) = match self.responder.poll() { Ok((req, res_sender)) => (req, res_sender), @@ -189,7 +216,13 @@ impl DatastoreWorker { break; } }; + let ack_after_commit = matches!(request, Command::ForceCommit() | Command::Close()); let response = self.handle_request(request, &mut ds, &tx); + if ack_after_commit { + // Both commands force a commit, so the loop ends here. + deferred_ack = Some((response_sender, response)); + break; + } response_sender.respond(response); let now: DateTime = Utc::now(); @@ -207,7 +240,11 @@ impl DatastoreWorker { self.commit, self.uncommitted_events ); match tx.commit() { - Ok(_) => (), + Ok(_) => { + if let Some((sender, response)) = deferred_ack.take() { + sender.respond(response); + } + } Err(err) => { error!( "Failed to commit datastore transaction ({} events lost): {err}", @@ -219,6 +256,11 @@ impl DatastoreWorker { // know to retry. Rolled-back events create a gap in the timeline; // watchers will resume sending heartbeats from current state, but the // specific batch of events is permanently lost. + if let Some((sender, _)) = deferred_ack.take() { + sender.respond(Err(DatastoreError::InternalError(format!( + "Failed to commit datastore transaction: {err}" + )))); + } } } if self.quit {