Patch: commit in-memory channel state only after the on-disk flush succeeds.

Each mutating method of JsonFileChannelBindingStore (upsert, delete,
delete_for_conversation) and JsonFileChannelInstanceStore (upsert, delete)
now clones the locked state into `next`, applies the change to `next`,
awaits `self.flush(&next)` while still holding the write guard, and assigns
`*guard = next` only when the flush returned Ok. A new test blocks the
temp-file path with a directory to force a flush error and checks that
lookups still see the old state.

NOTE(review): every call now clones the whole state before checking whether
anything changes, so the no-op `delete` paths pay for a clone they discard.
NOTE(review): the `Result<bool, BoxError>` / `Result<usize, BoxError>` return
types below were restored from the `Ok(..)` values and the `?` on `flush`;
confirm against the trait definitions.

diff --git a/crates/harness-store/src/json_file.rs b/crates/harness-store/src/json_file.rs
index 9fc250c..1ff84ed 100644
--- a/crates/harness-store/src/json_file.rs
+++ b/crates/harness-store/src/json_file.rs
@@ -1920,16 +1920,21 @@ impl JsonFileChannelBindingStore {
 impl ChannelBindingStore for JsonFileChannelBindingStore {
     async fn upsert(&self, binding: &ChannelBinding) -> Result<(), BoxError> {
         let mut guard = self.state.write().await;
-        if let Some(slot) = guard.iter_mut().find(|b| {
+        // Build the next state without touching `guard`, flush while still
+        // holding the lock (so concurrent disk writes can't reorder), and only
+        // commit the in-memory mutation once the flush succeeds — a failed
+        // flush leaves memory and disk consistent (both unchanged).
+        let mut next = guard.clone();
+        if let Some(slot) = next.iter_mut().find(|b| {
             b.channel == binding.channel && b.channel_chat_id == binding.channel_chat_id
         }) {
             *slot = binding.clone();
         } else {
-            guard.push(binding.clone());
+            next.push(binding.clone());
         }
-        let snapshot = guard.clone();
-        drop(guard);
-        self.flush(&snapshot).await
+        self.flush(&next).await?;
+        *guard = next;
+        Ok(())
     }
 
     async fn lookup(
@@ -1954,29 +1959,28 @@ impl ChannelBindingStore for JsonFileChannelBindingStore {
 
     async fn delete(&self, channel: &str, channel_chat_id: &str) -> Result<bool, BoxError> {
         let mut guard = self.state.write().await;
-        let before = guard.len();
-        guard.retain(|b| !(b.channel == channel && b.channel_chat_id == channel_chat_id));
-        let removed = before != guard.len();
-        if !removed {
+        let mut next = guard.clone();
+        let before = next.len();
+        next.retain(|b| !(b.channel == channel && b.channel_chat_id == channel_chat_id));
+        if before == next.len() {
             return Ok(false);
         }
-        let snapshot = guard.clone();
-        drop(guard);
-        self.flush(&snapshot).await?;
+        self.flush(&next).await?;
+        *guard = next;
         Ok(true)
     }
 
     async fn delete_for_conversation(&self, conversation_id: &str) -> Result<usize, BoxError> {
         let mut guard = self.state.write().await;
-        let before = guard.len();
-        guard.retain(|b| b.conversation_id != conversation_id);
-        let removed = before - guard.len();
+        let mut next = guard.clone();
+        let before = next.len();
+        next.retain(|b| b.conversation_id != conversation_id);
+        let removed = before - next.len();
         if removed == 0 {
             return Ok(0);
         }
-        let snapshot = guard.clone();
-        drop(guard);
-        self.flush(&snapshot).await?;
+        self.flush(&next).await?;
+        *guard = next;
         Ok(removed)
     }
 }
@@ -2032,14 +2036,15 @@ impl harness_channel::ChannelInstanceStore for JsonFileChannelInstanceStore {
         instance: &harness_channel::ChannelInstance,
     ) -> Result<(), BoxError> {
         let mut guard = self.state.write().await;
-        if let Some(slot) = guard.iter_mut().find(|i| i.id == instance.id) {
+        let mut next = guard.clone();
+        if let Some(slot) = next.iter_mut().find(|i| i.id == instance.id) {
             *slot = instance.clone();
         } else {
-            guard.push(instance.clone());
+            next.push(instance.clone());
         }
-        let snapshot = guard.clone();
-        drop(guard);
-        self.flush(&snapshot).await
+        self.flush(&next).await?;
+        *guard = next;
+        Ok(())
     }
 
     async fn get(
@@ -2059,14 +2064,14 @@ impl harness_channel::ChannelInstanceStore for JsonFileChannelInstanceStore {
 
     async fn delete(&self, id: &str) -> Result<bool, BoxError> {
         let mut guard = self.state.write().await;
-        let before = guard.len();
-        guard.retain(|i| i.id != id);
-        if before == guard.len() {
+        let mut next = guard.clone();
+        let before = next.len();
+        next.retain(|i| i.id != id);
+        if before == next.len() {
             return Ok(false);
         }
-        let snapshot = guard.clone();
-        drop(guard);
-        self.flush(&snapshot).await?;
+        self.flush(&next).await?;
+        *guard = next;
         Ok(true)
     }
 }
@@ -3029,4 +3034,51 @@ mod tests {
         let rows = store.list_for_channel("wecom").await.unwrap();
         assert_eq!(rows.len(), 1);
     }
+
+    // A failed flush must leave the in-memory state untouched so memory never
+    // diverges from disk (issue #52). We force the flush to fail by occupying
+    // the `atomic_write` temp path with a directory, which makes the temp-file
+    // write error out.
+    #[tokio::test]
+    async fn channel_binding_rolls_back_on_flush_failure() {
+        let dir = tempdir().unwrap();
+        let store = JsonFileChannelBindingStore::open(dir.path()).unwrap();
+        store
+            .upsert(&ChannelBinding::new("wecom", "g1", "conv-good", None, None))
+            .await
+            .unwrap();
+
+        // Block the temp-file write: atomic_write targets `.tmp`.
+        let tmp = dir.path().join("channel_bindings.json.tmp");
+        std::fs::create_dir(&tmp).unwrap();
+
+        // Overwriting an existing key must fail at flush and roll back.
+        let err = store
+            .upsert(&ChannelBinding::new("wecom", "g1", "conv-bad", None, None))
+            .await;
+        assert!(err.is_err());
+        // A brand-new key must also fail without being added.
+        let err = store
+            .upsert(&ChannelBinding::new("feishu", "f1", "conv-bad", None, None))
+            .await;
+        assert!(err.is_err());
+
+        // In-memory state is unchanged: the good row survives, the bad ones
+        // never landed.
+        let got = store.lookup("wecom", "g1").await.unwrap().unwrap();
+        assert_eq!(got.conversation_id, "conv-good");
+        assert!(store.lookup("feishu", "f1").await.unwrap().is_none());
+
+        // Unblock and confirm a subsequent write commits cleanly.
+        std::fs::remove_dir(&tmp).unwrap();
+        store
+            .upsert(&ChannelBinding::new("wecom", "g1", "conv-new", None, None))
+            .await
+            .unwrap();
+        let reopened = JsonFileChannelBindingStore::open(dir.path()).unwrap();
+        assert_eq!(
+            reopened.lookup("wecom", "g1").await.unwrap().unwrap().conversation_id,
+            "conv-new"
+        );
+    }
 }