From e0d778fc8d64f2e815029815d3af946b65436c87 Mon Sep 17 00:00:00 2001 From: Claude Date: Sat, 30 May 2026 01:27:45 +0000 Subject: [PATCH] Fix JSON channel store flush race and missing rollback (#52) JsonFileChannelBindingStore / JsonFileChannelInstanceStore mutated in-memory state under the write lock, then released the lock before flushing the snapshot to disk. This allowed two concurrent writers' atomic_write calls to reorder (last write wins on disk, durably losing a change while memory still showed it), and a failed flush left the in-memory mutation in place with no rollback, so memory diverged from disk. Build the next state without touching the guard, flush it while still holding the write lock (serialising disk writes so they cannot reorder), and commit the in-memory mutation only after the flush succeeds. A failed flush now leaves both memory and disk unchanged. Applied to upsert / delete / delete_for_conversation on the binding store and upsert / delete on the instance store. Adds a regression test that forces a flush failure and asserts the in-memory state rolls back. --- crates/harness-store/src/json_file.rs | 110 +++++++++++++++++++------- 1 file changed, 81 insertions(+), 29 deletions(-) diff --git a/crates/harness-store/src/json_file.rs b/crates/harness-store/src/json_file.rs index 9fc250c..1ff84ed 100644 --- a/crates/harness-store/src/json_file.rs +++ b/crates/harness-store/src/json_file.rs @@ -1920,16 +1920,21 @@ impl JsonFileChannelBindingStore { impl ChannelBindingStore for JsonFileChannelBindingStore { async fn upsert(&self, binding: &ChannelBinding) -> Result<(), BoxError> { let mut guard = self.state.write().await; - if let Some(slot) = guard.iter_mut().find(|b| { + // Build the next state without touching `guard`, flush while still + // holding the lock (so concurrent disk writes can't reorder), and only + // commit the in-memory mutation once the flush succeeds — a failed + // flush leaves memory and disk consistent (both unchanged). + let mut next = guard.clone(); + if let Some(slot) = next.iter_mut().find(|b| { b.channel == binding.channel && b.channel_chat_id == binding.channel_chat_id }) { *slot = binding.clone(); } else { - guard.push(binding.clone()); + next.push(binding.clone()); } - let snapshot = guard.clone(); - drop(guard); - self.flush(&snapshot).await + self.flush(&next).await?; + *guard = next; + Ok(()) } async fn lookup( @@ -1954,29 +1959,28 @@ impl ChannelBindingStore for JsonFileChannelBindingStore { async fn delete(&self, channel: &str, channel_chat_id: &str) -> Result { let mut guard = self.state.write().await; - let before = guard.len(); - guard.retain(|b| !(b.channel == channel && b.channel_chat_id == channel_chat_id)); - let removed = before != guard.len(); - if !removed { + let mut next = guard.clone(); + let before = next.len(); + next.retain(|b| !(b.channel == channel && b.channel_chat_id == channel_chat_id)); + if before == next.len() { return Ok(false); } - let snapshot = guard.clone(); - drop(guard); - self.flush(&snapshot).await?; + self.flush(&next).await?; + *guard = next; Ok(true) } async fn delete_for_conversation(&self, conversation_id: &str) -> Result { let mut guard = self.state.write().await; - let before = guard.len(); - guard.retain(|b| b.conversation_id != conversation_id); - let removed = before - guard.len(); + let mut next = guard.clone(); + let before = next.len(); + next.retain(|b| b.conversation_id != conversation_id); + let removed = before - next.len(); if removed == 0 { return Ok(0); } - let snapshot = guard.clone(); - drop(guard); - self.flush(&snapshot).await?; + self.flush(&next).await?; + *guard = next; Ok(removed) } } @@ -2032,14 +2036,15 @@ impl harness_channel::ChannelInstanceStore for JsonFileChannelInstanceStore { instance: &harness_channel::ChannelInstance, ) -> Result<(), BoxError> { let mut guard = self.state.write().await; - if let Some(slot) = guard.iter_mut().find(|i| i.id == instance.id) { + let mut next = guard.clone(); + if let Some(slot) = next.iter_mut().find(|i| i.id == instance.id) { *slot = instance.clone(); } else { - guard.push(instance.clone()); + next.push(instance.clone()); } - let snapshot = guard.clone(); - drop(guard); - self.flush(&snapshot).await + self.flush(&next).await?; + *guard = next; + Ok(()) } async fn get( @@ -2059,14 +2064,14 @@ impl harness_channel::ChannelInstanceStore for JsonFileChannelInstanceStore { async fn delete(&self, id: &str) -> Result { let mut guard = self.state.write().await; - let before = guard.len(); - guard.retain(|i| i.id != id); - if before == guard.len() { + let mut next = guard.clone(); + let before = next.len(); + next.retain(|i| i.id != id); + if before == next.len() { return Ok(false); } - let snapshot = guard.clone(); - drop(guard); - self.flush(&snapshot).await?; + self.flush(&next).await?; + *guard = next; Ok(true) } } @@ -3029,4 +3034,51 @@ mod tests { let rows = store.list_for_channel("wecom").await.unwrap(); assert_eq!(rows.len(), 1); } + + // A failed flush must leave the in-memory state untouched so memory never + // diverges from disk (issue #52). We force the flush to fail by occupying + // the `atomic_write` temp path with a directory, which makes the temp-file + // write error out. + #[tokio::test] + async fn channel_binding_rolls_back_on_flush_failure() { + let dir = tempdir().unwrap(); + let store = JsonFileChannelBindingStore::open(dir.path()).unwrap(); + store + .upsert(&ChannelBinding::new("wecom", "g1", "conv-good", None, None)) + .await + .unwrap(); + + // Block the temp-file write: atomic_write targets `.tmp`. + let tmp = dir.path().join("channel_bindings.json.tmp"); + std::fs::create_dir(&tmp).unwrap(); + + // Overwriting an existing key must fail at flush and roll back. + let err = store + .upsert(&ChannelBinding::new("wecom", "g1", "conv-bad", None, None)) + .await; + assert!(err.is_err()); + // A brand-new key must also fail without being added. + let err = store + .upsert(&ChannelBinding::new("feishu", "f1", "conv-bad", None, None)) + .await; + assert!(err.is_err()); + + // In-memory state is unchanged: the good row survives, the bad ones + // never landed. + let got = store.lookup("wecom", "g1").await.unwrap().unwrap(); + assert_eq!(got.conversation_id, "conv-good"); + assert!(store.lookup("feishu", "f1").await.unwrap().is_none()); + + // Unblock and confirm a subsequent write commits cleanly. + std::fs::remove_dir(&tmp).unwrap(); + store + .upsert(&ChannelBinding::new("wecom", "g1", "conv-new", None, None)) + .await + .unwrap(); + let reopened = JsonFileChannelBindingStore::open(dir.path()).unwrap(); + assert_eq!( + reopened.lookup("wecom", "g1").await.unwrap().unwrap().conversation_id, + "conv-new" + ); + } }