Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 43 additions & 1 deletion aw-datastore/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,24 @@ impl DatastoreWorker {
conn
}
};

// WAL turns each commit into a single sequential WAL append + fsync, whereas
// the rollback-journal (delete) mode paid two fsyncs plus journal-file churn.
// It also lets future reader connections proceed while a commit is in flight.
// synchronous=FULL is set explicitly (rather than relying on the
// default) so a commit remains durable on disk the moment it returns;
// with NORMAL the WAL is only synced at checkpoints, which would
// silently widen the loss window on power failure.
// In-memory databases ignore the request (journal_mode stays "memory").
let journal_mode: String = conn
.pragma_update_and_check(None, "journal_mode", "WAL", |row| row.get(0))
.expect("Failed to query journal_mode");
if !matches!(&method, DatastoreMethod::Memory()) && journal_mode != "wal" {
warn!("Failed to enable WAL (journal_mode={journal_mode}), continuing without it");
}
conn.pragma_update(None, "synchronous", "FULL")
.expect("Failed to set synchronous=FULL");

let mut ds = DatastoreInstance::new(&conn, true).unwrap();

// Ensure legacy import
Expand Down Expand Up @@ -179,6 +197,15 @@ impl DatastoreWorker {

self.uncommitted_events = 0;
self.commit = false;
// ForceCommit and Close promise the caller that their data is
// committed, so their acks are held back until the transaction
// below has actually committed. Acking first (as before) let a
// caller reopen the database and read a pre-commit snapshot —
// harmless under the rollback journal's locking, but a real race
// in WAL mode where readers never block on the writer.
// All other commands are acked immediately: a watcher heartbeat
// must not wait up to 15 s for the batch commit.
let mut deferred_ack = None;
loop {
let (request, response_sender) = match self.responder.poll() {
Ok((req, res_sender)) => (req, res_sender),
Expand All @@ -189,7 +216,13 @@ impl DatastoreWorker {
break;
}
};
let ack_after_commit = matches!(request, Command::ForceCommit() | Command::Close());
let response = self.handle_request(request, &mut ds, &tx);
if ack_after_commit {
// Both commands force a commit, so the loop ends here.
deferred_ack = Some((response_sender, response));
break;
}
response_sender.respond(response);

let now: DateTime<Utc> = Utc::now();
Expand All @@ -207,7 +240,11 @@ impl DatastoreWorker {
self.commit, self.uncommitted_events
);
match tx.commit() {
Ok(_) => (),
Ok(_) => {
if let Some((sender, response)) = deferred_ack.take() {
sender.respond(response);
}
}
Err(err) => {
error!(
"Failed to commit datastore transaction ({} events lost): {err}",
Expand All @@ -219,6 +256,11 @@ impl DatastoreWorker {
// know to retry. Rolled-back events create a gap in the timeline;
// watchers will resume sending heartbeats from current state, but the
// specific batch of events is permanently lost.
if let Some((sender, _)) = deferred_ack.take() {
sender.respond(Err(DatastoreError::InternalError(format!(
"Failed to commit datastore transaction: {err}"
))));
}
}
}
if self.quit {
Expand Down
Loading