From 522ea5af32c80d56d7cf223ed3d5d53e08118565 Mon Sep 17 00:00:00 2001 From: pvg13 Date: Wed, 13 May 2026 14:42:45 +0200 Subject: [PATCH 1/2] fix(web): trigger version-vector catch-up on swarm ConnectionEstablished (#57) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit A fresh `WebSyncClient` joining a relay-mediated mesh was receiving only *future* writes from peers. Historical state already on the peers (rows written before this tab existed) never landed in IndexedDB because the swarm path never sent a `VersionVector` request when a peer connected. The loopback path already did this correctly on every offline→online transition (`send_version_vector`, called from `run_loopback`'s `was_online` edge), but `run_swarm`'s `ConnectionEstablished` arm only inserted the peer into `connected` and pushed a status update. The persistence side was always ready — `BrowserStore` writes `peer_versions[peer]` on every successful inbound Push and exposes `get_peer_version()` — but nothing read it. After this commit, every non-relay `ConnectionEstablished` enqueues the peer into a new `pending_version_vectors: Vec` queue that's drained at the top of each `run_swarm` loop iteration, paralleling the existing `pending_announces` pattern. The drain calls the new `send_version_vector_swarm` helper which mirrors the loopback variant but routes the request via `swarm.behaviour_mut().snapshot.send_request(&peer, req)`. Why deferred via queue instead of called synchronously from `handle_event`: same wasm_bindgen_futures executor-reentrancy hazard the relay branch already documents at length — calling `request_response.send_request` synchronously inside `ConnectionEstablished` can wake a task whose poll re-enters `Inner::run` while we still hold the outer borrow from `swarm.select_next_some()`. The queue → drain pattern keeps all async send sites at the top of the loop, between selects. Invariants preserved: - HMAC mandatory on ALL request paths. When `group_key` is configured the new request is signed with `gk.mac(&bytes)` before being sent, identical to the loopback variant. Peers silently drop unauthenticated `VersionVector` requests. - `db_version=0` is the new-peer onboarding signal. `get_peer_version` returns 0 for peers never seen before, so a fresh tab's first `VersionVector` to each peer carries `your_last_db_version=0`, asking for full history. This is the only mechanism for initial state transfer over the swarm path. - Swarm cannot be held across awaits. The drain phase takes `&mut swarm` for `send_version_vector_swarm`'s own `swarm.behaviour_mut().snapshot.send_request` call but the `get_peer_version().await` and `state.db_version.lock().await` happen before any swarm borrow — the swarm is only touched at the final synchronous `send_request` line. Validation: - `cargo check --target wasm32-unknown-unknown -p wavesyncdb --features web,dioxus`: clean - `cargo check --target wasm32-unknown-unknown -p example-qr-pairing -p wavesync-website`: clean - `cargo clippy -p wavesyncdb -p wavesyncdb_derive -p wavesync_relay -- -D warnings`: clean - `cargo test -p wavesyncdb --lib`: 167 unit tests pass - `cargo fmt --check`: clean No automated test for this path — it requires a real browser dialing a real relay with at least one connected peer holding history. Manual repro plan is in the PR body. Known followup (deliberately not in this PR): on rapid reconnect storms the queue can fire multiple `VersionVector`s for the same peer if `ConnectionClosed`+`ConnectionEstablished` cycle inside a single loop iteration. The receiver responds idempotently so this is wasteful rather than incorrect. Switch the queue to a `HashSet` or add a per-peer in-flight tracker if production traffic shows this matters. Closes #57 --- wavesyncdb/src/web_engine.rs | 91 +++++++++++++++++++++++++++++++++++- 1 file changed, 90 insertions(+), 1 deletion(-) diff --git a/wavesyncdb/src/web_engine.rs b/wavesyncdb/src/web_engine.rs index 95ab937..70e0369 100644 --- a/wavesyncdb/src/web_engine.rs +++ b/wavesyncdb/src/web_engine.rs @@ -1231,6 +1231,67 @@ async fn send_version_vector(state: &EngineState, out_tx: &mpsc::UnboundedSender let _ = out_tx.send(req); } +/// Send a `VersionVector` request over the swarm transport to a freshly +/// connected peer, asking for every shadow change strictly newer than the +/// last `db_version` we recorded from this peer. Mirrors +/// [`send_version_vector`] (the loopback variant) but routes via the +/// `snapshot` request-response behavior. +/// +/// **Why this exists:** without this trigger, a fresh `WebSyncClient` +/// joining a relay-mediated mesh receives only *future* writes — any +/// historical state on the peer never lands in IndexedDB, because the +/// gossipsub mesh only forwards real-time `Push` envelopes. The +/// `peer_versions` table in `BrowserStore` was always being written on +/// successful Push receipt but never read for catch-up. See issue #57. +/// +/// HMAC: when a passphrase is configured, the `VersionVector` request +/// MUST carry an HMAC tag — Rule 2.7 in `CLAUDE.md` (HMAC verification +/// is mandatory on ALL message paths, the catch-up path included). +/// Peers drop unauthenticated `VersionVector` requests silently. +/// +/// `db_version=0` semantics (Rule 2.5): if the new peer hasn't been +/// seen before, `get_peer_version` returns 0 and the peer replies with +/// the full history. This is the new-peer onboarding signal — do not +/// change it. +async fn send_version_vector_swarm( + peer: LibPeerId, + state: &EngineState, + swarm: &mut Swarm, +) { + let store = match &state.store { + Some(s) => s, + None => return, // ephemeral clients don't track peer versions + }; + let peer_key = peer.to_string(); + let last_seen = match store.get_peer_version(&peer_key).await { + Ok(v) => v, + Err(e) => { + log::warn!("swarm: peer_version read for {peer_key} failed: {e}"); + 0 + } + }; + let my_db_version = *state.db_version.lock().await; + let mut req = SyncRequest::VersionVector { + my_db_version, + your_last_db_version: last_seen, + site_id: state.site_id, + topic: state.topic.clone(), + hmac: None, + }; + if let Some(gk) = &state.group_key { + if let Ok(bytes) = serde_json::to_vec(&req) { + let tag = gk.mac(&bytes); + if let SyncRequest::VersionVector { ref mut hmac, .. } = req { + *hmac = Some(tag); + } + } + } + log::debug!( + "swarm: requesting catch-up from {peer} since db_version={last_seen} (we are at {my_db_version})" + ); + let _ = swarm.behaviour_mut().snapshot.send_request(&peer, req); +} + async fn handle_submit_local_loopback( state: &EngineState, end: &mut LoopbackEnd, @@ -1544,6 +1605,12 @@ async fn run_swarm( // handler — see the comment in `handle_event` for the reentrancy // reason. Drained on every loop iteration before re-polling. let mut pending_announces: Vec = Vec::new(); + // Sync peers we want to ping with a `VersionVector` catch-up + // request as soon as they connect. Deferred for the same + // wasm_bindgen_futures executor-reentrancy reason as the relay + // announce queue above. Drained right after `pending_announces`. + // See issue #57. + let mut pending_version_vectors: Vec = Vec::new(); loop { for peer in pending_announces.drain(..) { @@ -1553,6 +1620,12 @@ async fn run_swarm( }; let _ = swarm.behaviour_mut().push.send_request(&peer, req); } + for peer in pending_version_vectors.drain(..).collect::>() { + // We collect-then-iterate so the borrow of + // `pending_version_vectors` released before + // `send_version_vector_swarm` takes &mut swarm. + send_version_vector_swarm(peer, &state, &mut swarm).await; + } tokio::select! { cmd = cmd_rx.recv() => { @@ -1574,7 +1647,15 @@ async fn run_swarm( } } event = swarm.select_next_some() => { - handle_event(event, &mut connected, &mut relay_connected, &mut pending_announces, &state, &mut swarm).await; + handle_event( + event, + &mut connected, + &mut relay_connected, + &mut pending_announces, + &mut pending_version_vectors, + &state, + &mut swarm, + ).await; } } } @@ -1681,6 +1762,7 @@ async fn handle_event( connected: &mut HashSet, relay_connected: &mut bool, pending_announces: &mut Vec, + pending_version_vectors: &mut Vec, state: &EngineState, swarm: &mut Swarm, ) { @@ -1728,6 +1810,13 @@ async fn handle_event( }, endpoint.get_remote_address() ); + // Defer the version-vector catch-up `send_request` to + // the next loop iteration. Same wasm_bindgen_futures + // executor-reentrancy hazard as the relay announce + // path. Without this trigger every browser tab would + // permanently miss any history the peer already had + // — see issue #57. + pending_version_vectors.push(peer_id); } push_status(state, connected, *relay_connected); } From a52e876f92bb5a6445e072f263d42a9578523a40 Mon Sep 17 00:00:00 2001 From: pvg13 Date: Wed, 13 May 2026 14:56:55 +0200 Subject: [PATCH 2/2] fix(web): apply ChangesetResponse + bump snapshot timeout to 30s (#59) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Follow-up to the version-vector trigger from #57. With the trigger in place the web sends the request and the native peer responds, but two issues remained that left the catch-up still broken in the field. ## Sub-bug 1: response was dropped on the floor `handle_snapshot_event`'s `Message::Response { .. }` arm was a no-op ("PushAck etc. — no-op for now"), so the `ChangesetResponse` carrying the actual catch-up data ran straight to `/dev/null`. The loopback path doesn't hit this because its responder sends a fresh `Push` request through `out_tx` instead of a `send_response`, which routes through the regular Push handler that already calls `apply_remote_changeset`. The swarm path had neither apply logic nor a "Push back" fallback. Fix: the response arm now matches on `SyncResponse::ChangesetResponse`, mirrors the native engine's `handle_changeset_response` (`engine/sync_handler.rs:52-93`): - verify the response's `topic` against ours, - verify the HMAC against bytes that include the *real* `your_last_db_version` value (the sender computed the tag over the real value; using a placeholder would always fail verification), - log the count + responder's db_version for diagnostic visibility, - reconstruct the same `SyncChangeset` shape the inbound-Push path builds, broadcast it on `inbound_tx`, and call `apply_remote_changeset` — which already persists each winning change, broadcasts on `resolved_tx`, and updates `peer_versions[peer]`. No separate `set_peer_version` is needed. `PushAck` and `IdentityAck` keep their no-op semantics. ## Sub-bug 2: 10s default timeout was too short for full history `request_response::Config::default()` carries a 10s request timeout. For a fresh tab pulling months of history against a populated peer over a circuit relay, the responder's `get_changes_since(0)` scan + JSON serialization + HMAC + circuit-relay hop frequently exceeded 10s, surfacing as `OutboundFailure: Timeout while waiting for a response`. The native side already uses `with_request_timeout(Duration::from_secs(30))` on its snapshot behaviour (`engine/behaviour.rs:72`). The web was asymmetrically slower than the native side's tolerance; matching the 30s value keeps the two ends symmetric. If real-world catch-ups ever need more, chunking the response into multiple Pushes is the next lever (followup, not this PR). Invariants preserved (same as #57): - HMAC mandatory on ALL message paths — the response handler drops unauthenticated `ChangesetResponse` silently. - `db_version=0` semantics still drive full-history onboarding. - Swarm not held across awaits — `apply_remote_changeset` is awaited but only `&state` (and `&peer`) are passed; the swarm borrow ends before the await. Closes #59 --- wavesyncdb/src/web_engine.rs | 92 ++++++++++++++++++++++++++++++++++-- 1 file changed, 87 insertions(+), 5 deletions(-) diff --git a/wavesyncdb/src/web_engine.rs b/wavesyncdb/src/web_engine.rs index 70e0369..ba22ff9 100644 --- a/wavesyncdb/src/web_engine.rs +++ b/wavesyncdb/src/web_engine.rs @@ -647,7 +647,14 @@ impl WebSyncClient { .with_behaviour(|key, relay_client| WebBehaviour { snapshot: request_response::Behaviour::new( [(SNAPSHOT_PROTOCOL, request_response::ProtocolSupport::Full)], - request_response::Config::default(), + // Match the native side (engine/behaviour.rs): a + // catch-up `ChangesetResponse` against a peer with + // months of history can take well over the default + // 10s, especially over a circuit relay. 30s matches + // what the native swarm uses and keeps the two + // sides symmetric. Issue #59 sub-bug 2. + request_response::Config::default() + .with_request_timeout(Duration::from_secs(30)), ), push: request_response::Behaviour::new( [(PUSH_PROTOCOL, request_response::ProtocolSupport::Full)], @@ -2224,12 +2231,87 @@ async fn handle_snapshot_event( .send_response(channel, SyncResponse::IdentityAck); } }, + // Real-network counterpart of the loopback ChangesetResponse + // path. Without this, the catch-up data ships from the peer + // and lands in the swarm event loop only to be dropped — see + // issue #59. Mirrors the native engine's + // `handle_changeset_response` (`engine/sync_handler.rs`): + // verify topic, verify HMAC (against bytes that include the + // actual `your_last_db_version`, not a placeholder — the + // sender computed the tag over the real value), reconstruct a + // SyncChangeset so the existing apply path is reused, and + // call `apply_remote_changeset`. That function persists each + // winning change, broadcasts on `resolved_tx`, and updates + // `peer_versions[peer]` — no separate `set_peer_version` call + // is needed here. Event::Message { - message: Message::Response { .. }, + peer, + message: Message::Response { response, .. }, .. - } => { - // PushAck etc. — no-op for now. - } + } => match response { + SyncResponse::ChangesetResponse { + changes, + my_db_version, + your_last_db_version, + site_id: peer_site_id, + topic: peer_topic, + hmac, + } => { + if peer_topic != state.topic { + log::debug!( + "WebSyncClient: dropping ChangesetResponse from {peer} — topic mismatch" + ); + return; + } + if let Some(gk) = &state.group_key { + let verify = SyncResponse::ChangesetResponse { + changes: changes.clone(), + my_db_version, + your_last_db_version, + site_id: peer_site_id, + topic: peer_topic.clone(), + hmac: None, + }; + let bytes = match serde_json::to_vec(&verify) { + Ok(b) => b, + Err(_) => return, + }; + let tag = match hmac { + Some(t) => t, + None => { + log::debug!( + "WebSyncClient: dropping ChangesetResponse from {peer} — missing HMAC" + ); + return; + } + }; + if !gk.verify(&bytes, &tag) { + log::debug!( + "WebSyncClient: dropping ChangesetResponse from {peer} — bad HMAC" + ); + return; + } + } + log::info!( + "WebSyncClient: received ChangesetResponse from {peer} with {} changes (their db_version={my_db_version})", + changes.len() + ); + // Reconstruct the same `SyncChangeset` shape the + // inbound-Push path constructs, so `apply_remote_changeset` + // handles persistence + `resolved_tx` broadcast + + // `peer_versions` update uniformly. + let changeset = SyncChangeset { + site_id: peer_site_id, + db_version: my_db_version, + changes, + }; + let _ = state.inbound_tx.send(changeset.clone()); + apply_remote_changeset(state, &peer, &changeset).await; + } + SyncResponse::PushAck | SyncResponse::IdentityAck => { + // Acknowledgements only — nothing to apply. + } + }, Event::OutboundFailure { peer, error, .. } => { log::warn!("WebSyncClient: outbound to {peer} failed: {error}"); }