Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
183 changes: 177 additions & 6 deletions wavesyncdb/src/web_engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -647,7 +647,14 @@ impl WebSyncClient {
.with_behaviour(|key, relay_client| WebBehaviour {
snapshot: request_response::Behaviour::new(
[(SNAPSHOT_PROTOCOL, request_response::ProtocolSupport::Full)],
request_response::Config::default(),
// Match the native side (engine/behaviour.rs): a
// catch-up `ChangesetResponse` against a peer with
// months of history can take well over the default
// 10s, especially over a circuit relay. 30s matches
// what the native swarm uses and keeps the two
// sides symmetric. Issue #59 sub-bug 2.
request_response::Config::default()
.with_request_timeout(Duration::from_secs(30)),
),
push: request_response::Behaviour::new(
[(PUSH_PROTOCOL, request_response::ProtocolSupport::Full)],
Expand Down Expand Up @@ -1231,6 +1238,67 @@ async fn send_version_vector(state: &EngineState, out_tx: &mpsc::UnboundedSender
let _ = out_tx.send(req);
}

/// Send a `VersionVector` request over the swarm transport to a freshly
/// connected peer, asking for every shadow change strictly newer than the
/// last `db_version` we recorded from this peer. Mirrors
/// [`send_version_vector`] (the loopback variant) but routes via the
/// `snapshot` request-response behavior.
///
/// **Why this exists:** without this trigger, a fresh `WebSyncClient`
/// joining a relay-mediated mesh receives only *future* writes — any
/// historical state on the peer never lands in IndexedDB, because the
/// gossipsub mesh only forwards real-time `Push` envelopes. The
/// `peer_versions` table in `BrowserStore` was always being written on
/// successful Push receipt but never read for catch-up. See issue #57.
///
/// HMAC: when a passphrase is configured, the `VersionVector` request
/// MUST carry an HMAC tag — Rule 2.7 in `CLAUDE.md` (HMAC verification
/// is mandatory on ALL message paths, the catch-up path included).
/// Peers drop unauthenticated `VersionVector` requests silently.
///
/// `db_version=0` semantics (Rule 2.5): if the new peer hasn't been
/// seen before, `get_peer_version` returns 0 and the peer replies with
/// the full history. This is the new-peer onboarding signal — do not
/// change it.
async fn send_version_vector_swarm(
peer: LibPeerId,
state: &EngineState,
swarm: &mut Swarm<WebBehaviour>,
) {
let store = match &state.store {
Some(s) => s,
None => return, // ephemeral clients don't track peer versions
};
let peer_key = peer.to_string();
let last_seen = match store.get_peer_version(&peer_key).await {
Ok(v) => v,
Err(e) => {
log::warn!("swarm: peer_version read for {peer_key} failed: {e}");
0
}
};
let my_db_version = *state.db_version.lock().await;
let mut req = SyncRequest::VersionVector {
my_db_version,
your_last_db_version: last_seen,
site_id: state.site_id,
topic: state.topic.clone(),
hmac: None,
};
if let Some(gk) = &state.group_key {
if let Ok(bytes) = serde_json::to_vec(&req) {
let tag = gk.mac(&bytes);
if let SyncRequest::VersionVector { ref mut hmac, .. } = req {
*hmac = Some(tag);
}
}
}
log::debug!(
"swarm: requesting catch-up from {peer} since db_version={last_seen} (we are at {my_db_version})"
);
let _ = swarm.behaviour_mut().snapshot.send_request(&peer, req);
}

async fn handle_submit_local_loopback(
state: &EngineState,
end: &mut LoopbackEnd,
Expand Down Expand Up @@ -1544,6 +1612,12 @@ async fn run_swarm(
// handler — see the comment in `handle_event` for the reentrancy
// reason. Drained on every loop iteration before re-polling.
let mut pending_announces: Vec<LibPeerId> = Vec::new();
// Sync peers we want to ping with a `VersionVector` catch-up
// request as soon as they connect. Deferred for the same
// wasm_bindgen_futures executor-reentrancy reason as the relay
// announce queue above. Drained right after `pending_announces`.
// See issue #57.
let mut pending_version_vectors: Vec<LibPeerId> = Vec::new();

loop {
for peer in pending_announces.drain(..) {
Expand All @@ -1553,6 +1627,12 @@ async fn run_swarm(
};
let _ = swarm.behaviour_mut().push.send_request(&peer, req);
}
for peer in pending_version_vectors.drain(..).collect::<Vec<_>>() {
// We collect-then-iterate so the borrow of
// `pending_version_vectors` released before
// `send_version_vector_swarm` takes &mut swarm.
send_version_vector_swarm(peer, &state, &mut swarm).await;
}

tokio::select! {
cmd = cmd_rx.recv() => {
Expand All @@ -1574,7 +1654,15 @@ async fn run_swarm(
}
}
event = swarm.select_next_some() => {
handle_event(event, &mut connected, &mut relay_connected, &mut pending_announces, &state, &mut swarm).await;
handle_event(
event,
&mut connected,
&mut relay_connected,
&mut pending_announces,
&mut pending_version_vectors,
&state,
&mut swarm,
).await;
}
}
}
Expand Down Expand Up @@ -1681,6 +1769,7 @@ async fn handle_event(
connected: &mut HashSet<LibPeerId>,
relay_connected: &mut bool,
pending_announces: &mut Vec<LibPeerId>,
pending_version_vectors: &mut Vec<LibPeerId>,
state: &EngineState,
swarm: &mut Swarm<WebBehaviour>,
) {
Expand Down Expand Up @@ -1728,6 +1817,13 @@ async fn handle_event(
},
endpoint.get_remote_address()
);
// Defer the version-vector catch-up `send_request` to
// the next loop iteration. Same wasm_bindgen_futures
// executor-reentrancy hazard as the relay announce
// path. Without this trigger every browser tab would
// permanently miss any history the peer already had
// — see issue #57.
pending_version_vectors.push(peer_id);
}
push_status(state, connected, *relay_connected);
}
Expand Down Expand Up @@ -2135,12 +2231,87 @@ async fn handle_snapshot_event(
.send_response(channel, SyncResponse::IdentityAck);
}
},
// Real-network counterpart of the loopback ChangesetResponse
// path. Without this, the catch-up data ships from the peer
// and lands in the swarm event loop only to be dropped — see
// issue #59. Mirrors the native engine's
// `handle_changeset_response` (`engine/sync_handler.rs`):
// verify topic, verify HMAC (against bytes that include the
// actual `your_last_db_version`, not a placeholder — the
// sender computed the tag over the real value), reconstruct a
// SyncChangeset so the existing apply path is reused, and
// call `apply_remote_changeset`. That function persists each
// winning change, broadcasts on `resolved_tx`, and updates
// `peer_versions[peer]` — no separate `set_peer_version` call
// is needed here.
Event::Message {
message: Message::Response { .. },
peer,
message: Message::Response { response, .. },
..
} => {
// PushAck etc. — no-op for now.
}
} => match response {
SyncResponse::ChangesetResponse {
changes,
my_db_version,
your_last_db_version,
site_id: peer_site_id,
topic: peer_topic,
hmac,
} => {
if peer_topic != state.topic {
log::debug!(
"WebSyncClient: dropping ChangesetResponse from {peer} — topic mismatch"
);
return;
}
if let Some(gk) = &state.group_key {
let verify = SyncResponse::ChangesetResponse {
changes: changes.clone(),
my_db_version,
your_last_db_version,
site_id: peer_site_id,
topic: peer_topic.clone(),
hmac: None,
};
let bytes = match serde_json::to_vec(&verify) {
Ok(b) => b,
Err(_) => return,
};
let tag = match hmac {
Some(t) => t,
None => {
log::debug!(
"WebSyncClient: dropping ChangesetResponse from {peer} — missing HMAC"
);
return;
}
};
if !gk.verify(&bytes, &tag) {
log::debug!(
"WebSyncClient: dropping ChangesetResponse from {peer} — bad HMAC"
);
return;
}
}
log::info!(
"WebSyncClient: received ChangesetResponse from {peer} with {} changes (their db_version={my_db_version})",
changes.len()
);
// Reconstruct the same `SyncChangeset` shape the
// inbound-Push path constructs, so `apply_remote_changeset`
// handles persistence + `resolved_tx` broadcast +
// `peer_versions` update uniformly.
let changeset = SyncChangeset {
site_id: peer_site_id,
db_version: my_db_version,
changes,
};
let _ = state.inbound_tx.send(changeset.clone());
apply_remote_changeset(state, &peer, &changeset).await;
}
SyncResponse::PushAck | SyncResponse::IdentityAck => {
// Acknowledgements only — nothing to apply.
}
},
Event::OutboundFailure { peer, error, .. } => {
log::warn!("WebSyncClient: outbound to {peer} failed: {error}");
}
Expand Down
Loading