diff --git a/js/app/packages/block-md/component/MarkdownEditor.tsx b/js/app/packages/block-md/component/MarkdownEditor.tsx index 54ba9893f4..00d22b43b4 100644 --- a/js/app/packages/block-md/component/MarkdownEditor.tsx +++ b/js/app/packages/block-md/component/MarkdownEditor.tsx @@ -83,6 +83,11 @@ import { type NodekeyOffset, SearchHighlight, } from '@core/component/LexicalMarkdown/plugins/find-and-replace'; +import { + createHoverTooltipStore, + HoverTooltip, + hoverTooltipPlugin, +} from '@core/component/LexicalMarkdown/plugins/hover-tooltip'; import { iosCursorScrollPlugin } from '@core/component/LexicalMarkdown/plugins/ios-cursor-scroll'; import { GO_TO_LOCATION_COMMAND, @@ -937,6 +942,9 @@ export function MarkdownEditor(props: { }, }); + const [hoverTooltipStore, setHoverTooltipStore] = createHoverTooltipStore(); + plugins.use(hoverTooltipPlugin({ setState: (s) => setHoverTooltipStore(s) })); + const [wordcountStats, setWordcountStats] = createWordcountStatsStore(); plugins.use( wordcountPlugin({ setStore: setWordcountStats, debounceTime: 200 }) @@ -1021,6 +1029,7 @@ export function MarkdownEditor(props: { {getBlankMarkdownPlaceholder(canEdit())} + diff --git a/js/app/packages/core/component/LexicalMarkdown/plugins/hover-tooltip/HoverTooltip.tsx b/js/app/packages/core/component/LexicalMarkdown/plugins/hover-tooltip/HoverTooltip.tsx new file mode 100644 index 0000000000..2d97e64991 --- /dev/null +++ b/js/app/packages/core/component/LexicalMarkdown/plugins/hover-tooltip/HoverTooltip.tsx @@ -0,0 +1,116 @@ +import { UserIcon } from '@core/component/UserIcon'; +import { macroIdToEmail, tryMacroId, useDisplayNameParts } from '@core/user'; +import { formatRelativeTimestamp } from '@entity'; +import { syncServiceClient } from '@service-sync/client'; +import { debounce } from '@solid-primitives/scheduled'; +import { + createEffect, + createResource, + createSignal, + onCleanup, + Show, + untrack, +} from 'solid-js'; +import type { Store } from 'solid-js/store'; +import type { HoverTooltipState } from './hoverTooltipPlugin'; + +const FETCH_DELAY_MS = 400; +const SHOW_DELAY_MS = 500; + +function UserLine(props: { userId: string; editedAt: Date }) { + const macroId = tryMacroId(props.userId); + const { firstName } = useDisplayNameParts(macroId); + const name = () => + firstName() || (macroId ? macroIdToEmail(macroId) : props.userId); + + return ( + + + {name()}, {formatRelativeTimestamp(props.editedAt)} + + ); +} + +export function HoverTooltip(props: { + state: Store; + documentId: string; +}) { + const [visible, setVisible] = createSignal(false); + // The nodeId we've actually committed to fetching for. Debounced from the + // raw hovered nodeId so we don't fire a request the instant the cursor + // crosses a text node. + const [armedNodeId, setArmedNodeId] = createSignal(null); + let shownAtX = 0; + let shownAtY = 0; + let showTimer: ReturnType | null = null; + + const debouncedArm = debounce((nodeId: string | null) => { + setArmedNodeId(nodeId); + }, FETCH_DELAY_MS); + + const hide = () => { + debouncedArm.clear(); + if (showTimer) clearTimeout(showTimer); + showTimer = null; + setArmedNodeId(null); + setVisible(false); + }; + + const [blame] = createResource(armedNodeId, async (nodeId) => { + const res = await syncServiceClient.getNodeBlame({ + documentId: props.documentId, + nodeId, + }); + return res.isOk() ? res.value : null; + }); + + // Drive the fetch — debounced on nodeId only, ignores cursor x/y. + createEffect(() => { + const nodeId = props.state.hovering ? props.state.nodeId : null; + if (nodeId === null) { + debouncedArm.clear(); + setArmedNodeId(null); + } else { + debouncedArm(nodeId); + } + }); + + // Drive the visibility — based on cursor stillness (x/y). + createEffect(() => { + const { hovering, x, y } = props.state; + + if (!hovering) return hide(); + + // After shown: any pointer move dismisses. + if (untrack(visible)) { + if (x !== shownAtX || y !== shownAtY) hide(); + return; + } + + // Pre-show: each move restarts the show timer. + if (showTimer) clearTimeout(showTimer); + showTimer = setTimeout(() => { + shownAtX = x; + shownAtY = y; + setVisible(true); + }, SHOW_DELAY_MS); + }); + + onCleanup(hide); + + return ( + + {(b) => ( +
+ +
+ )} +
+ ); +} diff --git a/js/app/packages/core/component/LexicalMarkdown/plugins/hover-tooltip/hoverTooltipPlugin.ts b/js/app/packages/core/component/LexicalMarkdown/plugins/hover-tooltip/hoverTooltipPlugin.ts new file mode 100644 index 0000000000..0ab48f6fb0 --- /dev/null +++ b/js/app/packages/core/component/LexicalMarkdown/plugins/hover-tooltip/hoverTooltipPlugin.ts @@ -0,0 +1,108 @@ +import { isTouchDevice } from '@core/mobile/isTouchDevice'; +import { mergeRegister } from '@lexical/utils'; +import { $getId } from '@lexical-core'; +import { + $getNearestNodeFromDOMNode, + $isTextNode, + type LexicalEditor, + type LexicalNode, +} from 'lexical'; +import { createStore } from 'solid-js/store'; + +export type HoverTooltipState = { + hovering: boolean; + x: number; + y: number; + nodeId: string | null; +}; + +export function createHoverTooltipStore() { + return createStore({ + hovering: false, + x: 0, + y: 0, + nodeId: null, + }); +} + +type HoverTooltipPluginProps = { + setState: (state: Partial) => void; +}; + +/** + * Given a DOM element under the cursor, find the stable Lexical id of the + * nearest ancestor text-bearing node. Must be called inside `editor.read()` + * so the Lexical state is readable. + * + * Returns null when the cursor isn't over a text node, or when no ancestor + * has been assigned a stable id yet (typically transient nodes). + */ +function $resolveHoveredNodeId(target: HTMLElement): string | null { + const node = $getNearestNodeFromDOMNode(target); + if (!node || !$isTextNode(node)) return null; + + let cursor: LexicalNode | null = node; + while (cursor) { + const id = $getId(cursor); + if (id) return id; + cursor = cursor.getParent(); + } + return null; +} + +function registerHoverTooltipPlugin( + editor: LexicalEditor, + props: HoverTooltipPluginProps +) { + const handlePointerMove = (e: MouseEvent) => { + if (isTouchDevice()) return; + const target = e.target; + if (!(target instanceof HTMLElement)) { + props.setState({ hovering: false, nodeId: null }); + return; + } + + // Suppress while the user has a text selection — that's when the + // formatting popup shows and the tooltip would compete with it. + const sel = window.getSelection(); + if (sel && !sel.isCollapsed && sel.toString().length > 0) { + props.setState({ hovering: false, nodeId: null }); + return; + } + + const nodeId = editor.read(() => $resolveHoveredNodeId(target)); + if (nodeId === null) { + props.setState({ hovering: false, nodeId: null }); + return; + } + props.setState({ + hovering: true, + x: e.clientX, + y: e.clientY, + nodeId, + }); + }; + + const dismiss = () => { + props.setState({ hovering: false, nodeId: null }); + }; + + return mergeRegister( + editor.registerRootListener((root, prevRoot) => { + if (root) { + root.addEventListener('pointermove', handlePointerMove); + root.addEventListener('pointerleave', dismiss); + root.addEventListener('pointerdown', dismiss); + } + if (prevRoot) { + prevRoot.removeEventListener('pointermove', handlePointerMove); + prevRoot.removeEventListener('pointerleave', dismiss); + prevRoot.removeEventListener('pointerdown', dismiss); + } + }) + ); +} + +export function hoverTooltipPlugin(props: HoverTooltipPluginProps) { + return (editor: LexicalEditor) => registerHoverTooltipPlugin(editor, props); +} diff --git a/js/app/packages/core/component/LexicalMarkdown/plugins/hover-tooltip/index.ts b/js/app/packages/core/component/LexicalMarkdown/plugins/hover-tooltip/index.ts new file mode 100644 index 0000000000..00c698eef8 --- /dev/null +++ b/js/app/packages/core/component/LexicalMarkdown/plugins/hover-tooltip/index.ts @@ -0,0 +1,2 @@ +export * from './HoverTooltip'; +export * from './hoverTooltipPlugin'; diff --git a/js/app/packages/core/component/LexicalMarkdown/plugins/index.ts b/js/app/packages/core/component/LexicalMarkdown/plugins/index.ts index fc6010a27d..3c1933e660 100644 --- a/js/app/packages/core/component/LexicalMarkdown/plugins/index.ts +++ b/js/app/packages/core/component/LexicalMarkdown/plugins/index.ts @@ -15,6 +15,7 @@ export * from './file-paste'; export * from './find-and-replace'; export * from './generate'; export * from './horizontal-rules'; +export * from './hover-tooltip'; export * from './insert-text'; export * from './ios-cursor-scroll'; export * from './katex'; diff --git a/js/app/packages/core/component/LexicalMarkdown/plugins/pluginManager.ts b/js/app/packages/core/component/LexicalMarkdown/plugins/pluginManager.ts index b90a8eb9e2..940fa39828 100644 --- a/js/app/packages/core/component/LexicalMarkdown/plugins/pluginManager.ts +++ b/js/app/packages/core/component/LexicalMarkdown/plugins/pluginManager.ts @@ -21,7 +21,7 @@ import { checklistPlugin } from './checklist/'; import { customDeletePlugin } from './custom-delete'; import { markdownShortcutsPlugin } from './markdown-shortcuts'; -type PluginFunction = (editor: LexicalEditor) => () => void; +export type PluginFunction = (editor: LexicalEditor) => () => void; /** * Create a binding between a LexicalEditor and the ability to register plugins diff --git a/js/app/packages/service-clients/service-sync/client.ts b/js/app/packages/service-clients/service-sync/client.ts index 3a6e420af2..5e81971bf1 100644 --- a/js/app/packages/service-clients/service-sync/client.ts +++ b/js/app/packages/service-clients/service-sync/client.ts @@ -179,6 +179,36 @@ export const syncServiceClient = { return ok(response.value as MetadataResponse); }, + /** + * Look up who last edited a given Lexical node and when. `user_id` is a + * MacroId resolved server-side; `null` if the peer has no recorded user + * (anonymous edits or legacy data not yet mirrored locally). + */ + async getNodeBlame(args: { documentId: string; nodeId: string }) { + const token = await getPermissionToken('document', args.documentId); + + const response = await syncFetch<{ + peer_id: string; + user_id: string | null; + timestamp_ms: number; + }>(`/document/${args.documentId}/blame/${args.nodeId}`, { + headers: { + 'Content-Type': 'application/json', + Authorization: `Bearer ${token}`, + }, + method: 'GET', + }); + + if (response.isErr()) { + return err(response.error); + } + const { peer_id, user_id, timestamp_ms } = response.value; + return ok({ + peerId: peer_id, + userId: user_id, + editedAt: new Date(timestamp_ms), + }); + }, async getSnapshot(args: { documentId: string }) { const token = await getPermissionToken('document', args.documentId); const response = await platformFetch( diff --git a/rust/sync-service/database/user-peer-mapping/migrations/0002_add_blame.sql b/rust/sync-service/database/user-peer-mapping/migrations/0002_add_blame.sql new file mode 100644 index 0000000000..b149981f52 --- /dev/null +++ b/rust/sync-service/database/user-peer-mapping/migrations/0002_add_blame.sql @@ -0,0 +1,9 @@ +CREATE TABLE blame ( + document_id TEXT NOT NULL, + node_id TEXT NOT NULL, + peer_id TEXT NOT NULL, + timestamp_ms INTEGER NOT NULL, + PRIMARY KEY (document_id, node_id) +); + +CREATE INDEX idx_blame_document_id ON blame (document_id); diff --git a/rust/sync-service/src/d1.rs b/rust/sync-service/src/d1.rs index ce4dfcaf1c..b4a40e00ff 100644 --- a/rust/sync-service/src/d1.rs +++ b/rust/sync-service/src/d1.rs @@ -88,3 +88,96 @@ pub async fn get_peers_for_document_id( Ok(peers) } + +/// A single pending "last edited by" event, buffered until the +/// next alarm tick flushes everything +#[derive(Debug, Clone)] +pub struct BlameEvent { + pub document_id: String, + pub node_id: String, + pub peer_id: u64, + pub timestamp_ms: i64, +} + +/// Maximum statements per D1 `batch()` call. D1 has a per-batch statement +/// limit; chunking keeps us comfortably under it. +const BATCH_CHUNK_SIZE: usize = 100; + +/// Bulk-upsert a list of buffered blame events. Uses D1's `batch()` so all +/// events in a chunk commit in a single round-trip. +pub async fn insert_blame_many( + env: &worker::Env, + events: &[BlameEvent], +) -> worker::Result<()> { + if events.is_empty() { + return Ok(()); + } + tracing::info!(count = events.len(), "insert_blame_many"); + + for chunk in events.chunks(BATCH_CHUNK_SIZE) { + let db = env.d1(crate::constants::USER_PEER_D1_BINDING)?; + let stmts: Vec<_> = chunk + .iter() + .map(|e| { + db.prepare( + "INSERT INTO blame (document_id, node_id, peer_id, timestamp_ms) \ + VALUES (?, ?, ?, ?) \ + ON CONFLICT(document_id, node_id) DO UPDATE SET \ + peer_id = excluded.peer_id, \ + timestamp_ms = excluded.timestamp_ms;", + ) + .bind(&[ + e.document_id.as_str().into(), + e.node_id.as_str().into(), + e.peer_id.to_string().into(), + // d1 js doesn't support bigint + (e.timestamp_ms as f64).into(), + ]) + }) + .collect::>>()?; + db.batch(stmts).await?; + } + Ok(()) +} + + +#[derive(serde::Deserialize, serde::Serialize)] +pub struct BlameRow { + pub peer_id: String, + pub user_id: Option, + pub timestamp_ms: i64, +} + +/// JOIN blame with peer_user_map to get last-edit info plus resolved user_id. +pub async fn get_blame_for_node( + db: D1Database, + document_id: &str, + node_id: &str, +) -> worker::Result> { + let statement = db.prepare( + " + SELECT b.peer_id AS peer_id, + p.user_id AS user_id, + b.timestamp_ms AS timestamp_ms + FROM blame b + LEFT JOIN peer_user_map p + ON p.document_id = b.document_id + AND p.peer_id = b.peer_id + WHERE b.document_id = ? AND b.node_id = ? + LIMIT 1; + ", + ); + let result = statement + .bind(&[document_id.into(), node_id.into()])? + .all() + .await?; + let mut rows = result.results::()?; + let row = rows.pop(); + tracing::info!( + document_id = document_id, + node_id = node_id, + found = row.is_some(), + "get_blame_for_node" + ); + Ok(row) +} diff --git a/rust/sync-service/src/durable_object.rs b/rust/sync-service/src/durable_object.rs index c68546ecc4..bf095138b0 100644 --- a/rust/sync-service/src/durable_object.rs +++ b/rust/sync-service/src/durable_object.rs @@ -54,6 +54,7 @@ mod path { pub const ACTIVE_PEERS_MARKER: &str = "active_peers"; pub const PEER: &str = "peer"; pub const METADATA: &str = "metadata"; + pub const BLAME: &str = "blame"; pub const DEBUG_DUMP_OPERATIONS: &str = "debug_dump_operations"; pub const DEBUG_DO_KV_GET: &str = "debug_do_kv_get"; pub const DEBUG_DO_KV_LIST: &str = "debug_do_kv_list"; @@ -117,6 +118,8 @@ pub struct DocumentSyncSession { /// a map from websocket's ID's to websocket metadata ws_meta_map: Arc>, msg_buffer: Arc>>, + /// Buffered blame events. Flushed via D1 batch on each alarm tick. + pending_blame: Arc>>, } mod u64_serde_strings { @@ -203,7 +206,7 @@ impl<'a> Wsm<'a> { } Ok(()) } - async fn get_peer_ids(&mut self) -> Result> { + pub async fn get_peer_ids(&mut self) -> Result> { self.maybe_update_ws_meta_map().await?; let ws_id = self.get_ws_id()?.to_string(); Ok(self @@ -283,6 +286,35 @@ impl DocumentSyncSession { pub fn get_websockets(&self) -> Vec { self.state.get_websockets() } + + pub fn push_blame_events(&self, events: Vec) { + if events.is_empty() { + return; + } + self.pending_blame + .lock("DocumentSyncSession::push_blame_events") + .extend(events); + } + + /// Drain the pending blame buffer and write all events via a single D1 + /// batch in the background. Returns immediately; the actual write runs + /// inside `wait_until` so the alarm handler doesn't block on D1. + fn flush_pending_blame(&self) { + let pending: Vec = std::mem::take( + &mut *self + .pending_blame + .lock("DocumentSyncSession::flush_pending_blame"), + ); + if pending.is_empty() { + return; + } + let env = self.env.clone(); + self.state.wait_until(async move { + if let Err(e) = crate::d1::insert_blame_many(&env, &pending).await { + warn!(error = ?e, "failed to flush pending blame"); + } + }); + } async fn inner_fetch(&self, req: Request) -> Result { let url = req.url()?; let matched = ROUTER @@ -315,6 +347,9 @@ impl DocumentSyncSession { or_unauth!(claims.has_document_id_access(document_id).then_some(())); match rest { path::METADATA => return self.metadata_handler(document_id).await, + path::BLAME => { + return self.blame_handler(matched.params.get("node_id")).await; + } path::RAW => return self.raw_handler(document_id).await, path::SNAPSHOT => return self.snapshot_handler(req, document_id).await, path::ACTIVE_PEERS_MARKER => return self.active_peer_ids_handler().await, @@ -544,6 +579,16 @@ impl DocumentSyncSession { }) } + async fn blame_handler(&self, node_id: Option<&str>) -> Result { + let node_id = node_id.ok_or_else(|| Error::from("missing node_id"))?; + let document_id = self.document_id().await?.to_string(); + let db = self.env.d1(USER_PEER_D1_BINDING)?; + match crate::d1::get_blame_for_node(db, &document_id, node_id).await? { + Some(row) => ResponseBuilder::new().from_json(&row), + None => Ok(response(status_codes::NOT_FOUND)), + } + } + async fn connect_handler(&self, req: Request, document_id: &str) -> Result { let (res, elap) = timeit!({ let claims = or_unauth!(decode_jwt(&req, &self.env, TokenFrom::QueryParams).ok()); @@ -760,6 +805,9 @@ pub static ROUTER: LazyLock> = LazyLock::new(|| { router .insert("/document/{document_id}/metadata", path::METADATA) .unwrap(); + router + .insert("/document/{document_id}/blame/{node_id}", path::BLAME) + .unwrap(); router .insert( "/document/{document_id}/debug_dump_operations", @@ -795,6 +843,7 @@ impl DurableObject for DocumentSyncSession { awareness: EphemeralStore::new(5_000), ws_meta_map: Arc::new(Mutex::new(Default::default())), msg_buffer: Arc::new(Mutex::new(vec![])), + pending_blame: Arc::new(Mutex::new(Vec::new())), } } @@ -921,6 +970,8 @@ impl DurableObject for DocumentSyncSession { }); } + self.flush_pending_blame(); + // Re-arm the alarm while clients are connected so the in-memory state // stays warm and pending updates keep getting persisted. Updates reach // peers when they happen (PeerUpdate broadcast); pushing a full diff --git a/rust/sync-service/src/state.rs b/rust/sync-service/src/state.rs index 0bdbd95162..28e32599d9 100644 --- a/rust/sync-service/src/state.rs +++ b/rust/sync-service/src/state.rs @@ -1,5 +1,6 @@ use std::{borrow::Cow, sync::Mutex}; +use loro::{Container, ContainerID, ExportMode, Frontiers, LoroDoc, LoroValue, ToJson}; use loro::{ExportMode, Frontiers, LoroDoc, ToJson, VersionVector}; use tracing::debug; use web_time::Instant; @@ -83,17 +84,73 @@ impl DocumentState { .unwrap_context("last_export mutex poisoned") = Some(Instant::now()); } - /// Import a new update into the document state - pub fn import(&self, update: &[u8]) -> Result<()> { + /// Import an update into the document. Returns the set of Lexical node IDs + /// whose backing Loro containers were touched, deduplicated. + pub fn import(&self, update: &[u8]) -> Result> { + let before = self.loro_doc.oplog_frontiers(); self.loro_doc .import_with(update, FROM_CLIENT_TAG) .context("failed to import update")?; + let after = self.loro_doc.oplog_frontiers(); + *self .last_update .lock() .unwrap_context("last_update mutex poisoned") = Some(Instant::now()); - Ok(()) + Ok(self.touched_lexical_ids(&before, &after)) + } + + /// Diff the two frontiers and return the Lexical node IDs whose backing + /// containers were modified, deduplicated. Empty `Vec` on diff failure. + fn touched_lexical_ids(&self, before: &Frontiers, after: &Frontiers) -> Vec { + let Ok(diff) = self.loro_doc.diff(before, after) else { + return Vec::new(); + }; + let mut touched: Vec = diff + .iter() + .filter_map(|(cid, _)| self.find_lexical_id(cid)) + .collect(); + touched.sort(); + touched.dedup(); + touched + } + + /// Walk from a changed container up to the nearest ancestor LoroMap whose + /// `$` submap has an `id` — that string is the Lexical node ID. + fn find_lexical_id(&self, container_id: &ContainerID) -> Option { + // Real Lexical docs never nest this deep; cap as a safety net against + // unexpectedly large paths from Loro. + const MAX_DEPTH: usize = 100; + + // Start with the container itself, then walk its ancestors. + let mut candidates: Vec = vec![container_id.clone()]; + if let Some(path) = self.loro_doc.get_path_to_container(container_id) { + for (cid, _) in path.into_iter().rev() { + candidates.push(cid); + } + } + + for cid in candidates.into_iter().take(MAX_DEPTH) { + let Some(container) = self.loro_doc.get_container(cid) else { + continue; + }; + let Container::Map(map) = container else { + continue; + }; + let Some(meta_voc) = map.get("$") else { continue }; + let Some(Container::Map(meta_map)) = meta_voc.into_container().ok() else { + continue; + }; + let Some(id_voc) = meta_map.get("id") else { + continue; + }; + let Some(LoroValue::String(id)) = id_voc.into_value().ok() else { + continue; + }; + return Some(id.to_string()); + } + None } /// Export the document state as a snapshot diff --git a/rust/sync-service/src/storage/backends/durable_kv.rs b/rust/sync-service/src/storage/backends/durable_kv.rs index 0d908ca08f..f5cf5edc97 100644 --- a/rust/sync-service/src/storage/backends/durable_kv.rs +++ b/rust/sync-service/src/storage/backends/durable_kv.rs @@ -93,17 +93,21 @@ impl DurableKVStorage { self.list_do_kv(PENDING_OP_PREFIX).await } - pub async fn apply_op(&self, document_state: &DocumentState, op_update: &[u8]) -> Result<()> { + pub async fn apply_op( + &self, + document_state: &DocumentState, + op_update: &[u8], + ) -> Result> { let op_id = self.ids.id(); let op_key = pending_op_key(&op_id); - document_state.import(op_update)?; + let touched_nodes = document_state.import(op_update)?; self.inner.put(&op_key, op_update).await?; self.applied_keys .write() .unwrap_context("applied_keys mutex poisoned") .insert(op_key); self.inner.put(&all_op_key(&op_id), op_update).await?; - Ok(()) + Ok(touched_nodes) } pub async fn apply_pending_ops(&self, snapshot: &DocumentState) -> Result<()> { diff --git a/rust/sync-service/src/storage/mod.rs b/rust/sync-service/src/storage/mod.rs index 1e97ff692d..b4d6a4270b 100644 --- a/rust/sync-service/src/storage/mod.rs +++ b/rust/sync-service/src/storage/mod.rs @@ -86,14 +86,14 @@ impl SessionStorage { Ok(res) } - /// append a new pending operation to the operation log + /// Append a new pending operation to the operation log and return the + /// Lexical node IDs that were touched (for blame tracking). pub async fn append_pending_operation( &self, operation: &[u8], document_state: &DocumentState, - ) -> Result<()> { - self.oplog.apply_op(document_state, operation).await?; - Ok(()) + ) -> Result> { + self.oplog.apply_op(document_state, operation).await } pub async fn clear_applied_ops(&self) -> Result<()> { diff --git a/rust/sync-service/src/websocket.rs b/rust/sync-service/src/websocket.rs index 76818d66b0..fc1a2cfba6 100644 --- a/rust/sync-service/src/websocket.rs +++ b/rust/sync-service/src/websocket.rs @@ -14,6 +14,13 @@ use crate::{ storage::SessionStorage, }; +fn now_ms() -> i64 { + web_time::SystemTime::now() + .duration_since(web_time::UNIX_EPOCH) + .map(|d| d.as_millis() as i64) + .unwrap_or(0) +} + fn serialize<'a, T: bebop::Record<'a>>( obj: T, msg_buf: &mut Vec, @@ -113,6 +120,45 @@ pub async fn process_message( tracing::warn!("received update from peer without edit permission"); return Ok(()); } + let touched_nodes = session_storage + .append_pending_operation(&update, document_state) + .await?; + + if !touched_nodes.is_empty() { + let peer_ids = Wsm::new(dss, ws).get_peer_ids().await.unwrap_or_default(); + if let Some(&peer_id) = peer_ids.first() { + let now_ms = now_ms(); + dss.push_blame_events( + touched_nodes + .into_iter() + .map(|node_id| crate::d1::BlameEvent { + document_id: document_id.to_string(), + node_id, + peer_id, + timestamp_ms: now_ms, + }) + .collect(), + ); + } + } + + { + // ACK the sender first: the update is durably stored at this + // point, and a failed send to some *other* peer must not block + // the ack, or the sender tears down a healthy connection. The + // reverse holds too — if the sender's socket is broken the + // peers below must still receive the update (the sender will + // re-send it after reconnecting, which is a harmless + // duplicate), so a failed ack is logged rather than returned. + let message = FromRemote::RemoteUpdateAck { + update: SliceWrapper::Raw(&update), + }; + let mut buf = buf.lock("serialize RemoteUpdateAck in PeerUpdate handler"); + let serialized = + serialize(message, &mut buf).context("Failed serializing update")?; + if let Err(e) = ws.send_with_bytes(serialized) { + tracing::warn!(error = ?e, "failed to send ack to the update's sender"); + } for update in &updates { session_storage