Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 81 additions & 29 deletions crates/harness-store/src/json_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1920,16 +1920,21 @@ impl JsonFileChannelBindingStore {
impl ChannelBindingStore for JsonFileChannelBindingStore {
/// Inserts `binding`, or overwrites the stored entry whose `channel` and
/// `channel_chat_id` both equal the new binding's, and hands the resulting
/// list to `self.flush`.
///
/// # Errors
/// Propagates whatever error `self.flush` returns.
async fn upsert(&self, binding: &ChannelBinding) -> Result<(), BoxError> {
let mut guard = self.state.write().await;
if let Some(slot) = guard.iter_mut().find(|b| {
// Build the next state without touching `guard`, flush while still
// holding the lock (so concurrent disk writes can't reorder), and only
// commit the in-memory mutation once the flush succeeds — a failed
// flush leaves memory and disk consistent (both unchanged).
let mut next = guard.clone();
// An entry is matched on the (channel, channel_chat_id) pair.
if let Some(slot) = next.iter_mut().find(|b| {
b.channel == binding.channel && b.channel_chat_id == binding.channel_chat_id
}) {
*slot = binding.clone();
} else {
guard.push(binding.clone());
next.push(binding.clone());
}
let snapshot = guard.clone();
drop(guard);
self.flush(&snapshot).await
// On a flush error `?` returns before the assignment below runs, so
// `*guard` is replaced only after `flush` has returned `Ok`.
self.flush(&next).await?;
*guard = next;
Ok(())
}

async fn lookup(
Expand All @@ -1954,29 +1959,28 @@ impl ChannelBindingStore for JsonFileChannelBindingStore {

/// Removes every binding whose `channel` and `channel_chat_id` both match
/// the arguments.
///
/// Returns `Ok(true)` when at least one entry was removed and `Ok(false)`
/// when nothing matched.
///
/// # Errors
/// Propagates whatever error `self.flush` returns.
async fn delete(&self, channel: &str, channel_chat_id: &str) -> Result<bool, BoxError> {
let mut guard = self.state.write().await;
let before = guard.len();
guard.retain(|b| !(b.channel == channel && b.channel_chat_id == channel_chat_id));
let removed = before != guard.len();
if !removed {
// Apply the removal to a clone; `*guard` is assigned only after the flush.
let mut next = guard.clone();
let before = next.len();
next.retain(|b| !(b.channel == channel && b.channel_chat_id == channel_chat_id));
if before == next.len() {
// Nothing matched: return without calling `flush`.
return Ok(false);
}
let snapshot = guard.clone();
drop(guard);
self.flush(&snapshot).await?;
// On a flush error `?` returns here and `*guard` is left as it was.
self.flush(&next).await?;
*guard = next;
Ok(true)
}

/// Removes every binding whose `conversation_id` equals the argument and
/// returns how many entries were removed (`Ok(0)` when nothing matched).
///
/// # Errors
/// Propagates whatever error `self.flush` returns.
async fn delete_for_conversation(&self, conversation_id: &str) -> Result<usize, BoxError> {
let mut guard = self.state.write().await;
let before = guard.len();
guard.retain(|b| b.conversation_id != conversation_id);
let removed = before - guard.len();
// Apply the removal to a clone; `*guard` is assigned only after the flush.
let mut next = guard.clone();
let before = next.len();
next.retain(|b| b.conversation_id != conversation_id);
// `retain` can only shrink the list, so this subtraction cannot underflow.
let removed = before - next.len();
if removed == 0 {
// Nothing matched: return without calling `flush`.
return Ok(0);
}
let snapshot = guard.clone();
drop(guard);
self.flush(&snapshot).await?;
// On a flush error `?` returns here and `*guard` is left as it was.
self.flush(&next).await?;
*guard = next;
Ok(removed)
}
}
Expand Down Expand Up @@ -2032,14 +2036,15 @@ impl harness_channel::ChannelInstanceStore for JsonFileChannelInstanceStore {
instance: &harness_channel::ChannelInstance,
) -> Result<(), BoxError> {
let mut guard = self.state.write().await;
if let Some(slot) = guard.iter_mut().find(|i| i.id == instance.id) {
let mut next = guard.clone();
if let Some(slot) = next.iter_mut().find(|i| i.id == instance.id) {
*slot = instance.clone();
} else {
guard.push(instance.clone());
next.push(instance.clone());
}
let snapshot = guard.clone();
drop(guard);
self.flush(&snapshot).await
self.flush(&next).await?;
*guard = next;
Ok(())
}

async fn get(
Expand All @@ -2059,14 +2064,14 @@ impl harness_channel::ChannelInstanceStore for JsonFileChannelInstanceStore {

/// Removes every instance whose `id` equals the argument.
///
/// Returns `Ok(true)` when at least one entry was removed and `Ok(false)`
/// when nothing matched.
///
/// # Errors
/// Propagates whatever error `self.flush` returns.
async fn delete(&self, id: &str) -> Result<bool, BoxError> {
let mut guard = self.state.write().await;
let before = guard.len();
guard.retain(|i| i.id != id);
if before == guard.len() {
// Apply the removal to a clone; `*guard` is assigned only after the flush.
let mut next = guard.clone();
let before = next.len();
next.retain(|i| i.id != id);
if before == next.len() {
// Nothing matched: return without calling `flush`.
return Ok(false);
}
let snapshot = guard.clone();
drop(guard);
self.flush(&snapshot).await?;
// On a flush error `?` returns here and `*guard` is left as it was.
self.flush(&next).await?;
*guard = next;
Ok(true)
}
}
Expand Down Expand Up @@ -3029,4 +3034,51 @@ mod tests {
let rows = store.list_for_channel("wecom").await.unwrap();
assert_eq!(rows.len(), 1);
}

// Regression test for issue #52: when a flush fails, the in-memory bindings
// must stay exactly as they were so they never drift away from what is on
// disk. The failure is provoked by putting a directory where `atomic_write`
// wants to create its temp file, which makes that write return an error.
#[tokio::test]
async fn channel_binding_rolls_back_on_flush_failure() {
    let workdir = tempdir().unwrap();
    let store = JsonFileChannelBindingStore::open(workdir.path()).unwrap();

    // Seed one binding that is expected to survive the failed writes.
    let seed = ChannelBinding::new("wecom", "g1", "conv-good", None, None);
    store.upsert(&seed).await.unwrap();

    // `atomic_write` goes through `<path>.tmp`; a directory there blocks it.
    let blocker = workdir.path().join("channel_bindings.json.tmp");
    std::fs::create_dir(&blocker).unwrap();

    // Replacing an existing key fails at the flush step.
    let overwrite = ChannelBinding::new("wecom", "g1", "conv-bad", None, None);
    assert!(store.upsert(&overwrite).await.is_err());

    // Adding a previously unseen key fails the same way.
    let fresh = ChannelBinding::new("feishu", "f1", "conv-bad", None, None);
    assert!(store.upsert(&fresh).await.is_err());

    // Memory still holds the seeded row and nothing from the failed writes.
    let surviving = store.lookup("wecom", "g1").await.unwrap().unwrap();
    assert_eq!(surviving.conversation_id, "conv-good");
    assert!(store.lookup("feishu", "f1").await.unwrap().is_none());

    // With the obstacle gone, the next write goes through and is persisted.
    std::fs::remove_dir(&blocker).unwrap();
    let replacement = ChannelBinding::new("wecom", "g1", "conv-new", None, None);
    store.upsert(&replacement).await.unwrap();

    let reopened = JsonFileChannelBindingStore::open(workdir.path()).unwrap();
    let persisted = reopened.lookup("wecom", "g1").await.unwrap().unwrap();
    assert_eq!(persisted.conversation_id, "conv-new");
}
}
Loading