diff --git a/wavesyncdb/src/web_engine.rs b/wavesyncdb/src/web_engine.rs index 95ab937..ba22ff9 100644 --- a/wavesyncdb/src/web_engine.rs +++ b/wavesyncdb/src/web_engine.rs @@ -647,7 +647,14 @@ impl WebSyncClient { .with_behaviour(|key, relay_client| WebBehaviour { snapshot: request_response::Behaviour::new( [(SNAPSHOT_PROTOCOL, request_response::ProtocolSupport::Full)], - request_response::Config::default(), + // Match the native side (engine/behaviour.rs): a + // catch-up `ChangesetResponse` against a peer with + // months of history can take well over the default + // 10s, especially over a circuit relay. 30s matches + // what the native swarm uses and keeps the two + // sides symmetric. Issue #59 sub-bug 2. + request_response::Config::default() + .with_request_timeout(Duration::from_secs(30)), ), push: request_response::Behaviour::new( [(PUSH_PROTOCOL, request_response::ProtocolSupport::Full)], @@ -1231,6 +1238,67 @@ async fn send_version_vector(state: &EngineState, out_tx: &mpsc::UnboundedSender let _ = out_tx.send(req); } +/// Send a `VersionVector` request over the swarm transport to a freshly +/// connected peer, asking for every shadow change strictly newer than the +/// last `db_version` we recorded from this peer. Mirrors +/// [`send_version_vector`] (the loopback variant) but routes via the +/// `snapshot` request-response behavior. +/// +/// **Why this exists:** without this trigger, a fresh `WebSyncClient` +/// joining a relay-mediated mesh receives only *future* writes — any +/// historical state on the peer never lands in IndexedDB, because the +/// gossipsub mesh only forwards real-time `Push` envelopes. The +/// `peer_versions` table in `BrowserStore` was always being written on +/// successful Push receipt but never read for catch-up. See issue #57. +/// +/// HMAC: when a passphrase is configured, the `VersionVector` request +/// MUST carry an HMAC tag — Rule 2.7 in `CLAUDE.md` (HMAC verification +/// is mandatory on ALL message paths, the catch-up path included). +/// Peers drop unauthenticated `VersionVector` requests silently. +/// +/// `db_version=0` semantics (Rule 2.5): if the new peer hasn't been +/// seen before, `get_peer_version` returns 0 and the peer replies with +/// the full history. This is the new-peer onboarding signal — do not +/// change it. +async fn send_version_vector_swarm( + peer: LibPeerId, + state: &EngineState, + swarm: &mut Swarm, +) { + let store = match &state.store { + Some(s) => s, + None => return, // ephemeral clients don't track peer versions + }; + let peer_key = peer.to_string(); + let last_seen = match store.get_peer_version(&peer_key).await { + Ok(v) => v, + Err(e) => { + log::warn!("swarm: peer_version read for {peer_key} failed: {e}"); + 0 + } + }; + let my_db_version = *state.db_version.lock().await; + let mut req = SyncRequest::VersionVector { + my_db_version, + your_last_db_version: last_seen, + site_id: state.site_id, + topic: state.topic.clone(), + hmac: None, + }; + if let Some(gk) = &state.group_key { + if let Ok(bytes) = serde_json::to_vec(&req) { + let tag = gk.mac(&bytes); + if let SyncRequest::VersionVector { ref mut hmac, .. } = req { + *hmac = Some(tag); + } + } + } + log::debug!( + "swarm: requesting catch-up from {peer} since db_version={last_seen} (we are at {my_db_version})" + ); + let _ = swarm.behaviour_mut().snapshot.send_request(&peer, req); +} + async fn handle_submit_local_loopback( state: &EngineState, end: &mut LoopbackEnd, @@ -1544,6 +1612,12 @@ async fn run_swarm( // handler — see the comment in `handle_event` for the reentrancy // reason. Drained on every loop iteration before re-polling. let mut pending_announces: Vec = Vec::new(); + // Sync peers we want to ping with a `VersionVector` catch-up + // request as soon as they connect. Deferred for the same + // wasm_bindgen_futures executor-reentrancy reason as the relay + // announce queue above. Drained right after `pending_announces`. + // See issue #57. + let mut pending_version_vectors: Vec = Vec::new(); loop { for peer in pending_announces.drain(..) { @@ -1553,6 +1627,12 @@ async fn run_swarm( }; let _ = swarm.behaviour_mut().push.send_request(&peer, req); } + for peer in pending_version_vectors.drain(..).collect::>() { + // We collect-then-iterate so the borrow of + // `pending_version_vectors` released before + // `send_version_vector_swarm` takes &mut swarm. + send_version_vector_swarm(peer, &state, &mut swarm).await; + } tokio::select! { cmd = cmd_rx.recv() => { @@ -1574,7 +1654,15 @@ async fn run_swarm( } } event = swarm.select_next_some() => { - handle_event(event, &mut connected, &mut relay_connected, &mut pending_announces, &state, &mut swarm).await; + handle_event( + event, + &mut connected, + &mut relay_connected, + &mut pending_announces, + &mut pending_version_vectors, + &state, + &mut swarm, + ).await; } } } @@ -1681,6 +1769,7 @@ async fn handle_event( connected: &mut HashSet, relay_connected: &mut bool, pending_announces: &mut Vec, + pending_version_vectors: &mut Vec, state: &EngineState, swarm: &mut Swarm, ) { @@ -1728,6 +1817,13 @@ async fn handle_event( }, endpoint.get_remote_address() ); + // Defer the version-vector catch-up `send_request` to + // the next loop iteration. Same wasm_bindgen_futures + // executor-reentrancy hazard as the relay announce + // path. Without this trigger every browser tab would + // permanently miss any history the peer already had + // — see issue #57. + pending_version_vectors.push(peer_id); } push_status(state, connected, *relay_connected); } @@ -2135,12 +2231,87 @@ async fn handle_snapshot_event( .send_response(channel, SyncResponse::IdentityAck); } }, + // Real-network counterpart of the loopback ChangesetResponse + // path. Without this, the catch-up data ships from the peer + // and lands in the swarm event loop only to be dropped — see + // issue #59. Mirrors the native engine's + // `handle_changeset_response` (`engine/sync_handler.rs`): + // verify topic, verify HMAC (against bytes that include the + // actual `your_last_db_version`, not a placeholder — the + // sender computed the tag over the real value), reconstruct a + // SyncChangeset so the existing apply path is reused, and + // call `apply_remote_changeset`. That function persists each + // winning change, broadcasts on `resolved_tx`, and updates + // `peer_versions[peer]` — no separate `set_peer_version` call + // is needed here. Event::Message { - message: Message::Response { .. }, + peer, + message: Message::Response { response, .. }, .. - } => { - // PushAck etc. — no-op for now. - } + } => match response { + SyncResponse::ChangesetResponse { + changes, + my_db_version, + your_last_db_version, + site_id: peer_site_id, + topic: peer_topic, + hmac, + } => { + if peer_topic != state.topic { + log::debug!( + "WebSyncClient: dropping ChangesetResponse from {peer} — topic mismatch" + ); + return; + } + if let Some(gk) = &state.group_key { + let verify = SyncResponse::ChangesetResponse { + changes: changes.clone(), + my_db_version, + your_last_db_version, + site_id: peer_site_id, + topic: peer_topic.clone(), + hmac: None, + }; + let bytes = match serde_json::to_vec(&verify) { + Ok(b) => b, + Err(_) => return, + }; + let tag = match hmac { + Some(t) => t, + None => { + log::debug!( + "WebSyncClient: dropping ChangesetResponse from {peer} — missing HMAC" + ); + return; + } + }; + if !gk.verify(&bytes, &tag) { + log::debug!( + "WebSyncClient: dropping ChangesetResponse from {peer} — bad HMAC" + ); + return; + } + } + log::info!( + "WebSyncClient: received ChangesetResponse from {peer} with {} changes (their db_version={my_db_version})", + changes.len() + ); + // Reconstruct the same `SyncChangeset` shape the + // inbound-Push path constructs, so `apply_remote_changeset` + // handles persistence + `resolved_tx` broadcast + + // `peer_versions` update uniformly. + let changeset = SyncChangeset { + site_id: peer_site_id, + db_version: my_db_version, + changes, + }; + let _ = state.inbound_tx.send(changeset.clone()); + apply_remote_changeset(state, &peer, &changeset).await; + } + SyncResponse::PushAck | SyncResponse::IdentityAck => { + // Acknowledgements only — nothing to apply. + } + }, Event::OutboundFailure { peer, error, .. } => { log::warn!("WebSyncClient: outbound to {peer} failed: {error}"); }