From b1b49c81124bd440333e5e74f282ab416d1ce270 Mon Sep 17 00:00:00 2001 From: Wolf Mermelstein Date: Wed, 10 Jun 2026 20:47:31 +0000 Subject: [PATCH 1/7] add git blame esque hovers --- .../block-md/component/MarkdownEditor.tsx | 9 ++ .../plugins/hover-tooltip/HoverTooltip.tsx | 114 ++++++++++++++++++ .../hover-tooltip/hoverTooltipPlugin.ts | 97 +++++++++++++++ .../plugins/hover-tooltip/index.ts | 2 + .../LexicalMarkdown/plugins/index.ts | 1 + .../LexicalMarkdown/plugins/pluginManager.ts | 2 +- .../service-clients/service-sync/client.ts | 30 +++++ .../migrations/0002_add_blame.sql | 9 ++ rust/sync-service/src/d1.rs | 102 ++++++++++++++++ rust/sync-service/src/durable_object.rs | 26 +++- rust/sync-service/src/state.rs | 56 ++++++++- .../src/storage/backends/durable_kv.rs | 10 +- rust/sync-service/src/storage/mod.rs | 8 +- rust/sync-service/src/websocket.rs | 16 ++- 14 files changed, 468 insertions(+), 14 deletions(-) create mode 100644 js/app/packages/core/component/LexicalMarkdown/plugins/hover-tooltip/HoverTooltip.tsx create mode 100644 js/app/packages/core/component/LexicalMarkdown/plugins/hover-tooltip/hoverTooltipPlugin.ts create mode 100644 js/app/packages/core/component/LexicalMarkdown/plugins/hover-tooltip/index.ts create mode 100644 rust/sync-service/database/user-peer-mapping/migrations/0002_add_blame.sql diff --git a/js/app/packages/block-md/component/MarkdownEditor.tsx b/js/app/packages/block-md/component/MarkdownEditor.tsx index 8c4fd9ec0a..a5c587143f 100644 --- a/js/app/packages/block-md/component/MarkdownEditor.tsx +++ b/js/app/packages/block-md/component/MarkdownEditor.tsx @@ -76,6 +76,11 @@ import { } from '@core/component/LexicalMarkdown/plugins/checkbox-to-task'; import { codePlugin } from '@core/component/LexicalMarkdown/plugins/code/codePlugin'; import { emojisPlugin } from '@core/component/LexicalMarkdown/plugins/emojis/emojisPlugin'; +import { + createHoverTooltipStore, + HoverTooltip, + hoverTooltipPlugin, +} from '@core/component/LexicalMarkdown/plugins/hover-tooltip'; import { DO_SEARCH_COMMAND, FloatingSearchHighlight, @@ -909,6 +914,9 @@ export function MarkdownEditor(props: { }, }); + const [hoverTooltipStore, setHoverTooltipStore] = createHoverTooltipStore(); + plugins.use(hoverTooltipPlugin({ setState: (s) => setHoverTooltipStore(s) })); + const [wordcountStats, setWordcountStats] = createWordcountStatsStore(); plugins.use( wordcountPlugin({ setStore: setWordcountStats, debounceTime: 200 }) @@ -984,6 +992,7 @@ export function MarkdownEditor(props: { : `This document is blank...`} + diff --git a/js/app/packages/core/component/LexicalMarkdown/plugins/hover-tooltip/HoverTooltip.tsx b/js/app/packages/core/component/LexicalMarkdown/plugins/hover-tooltip/HoverTooltip.tsx new file mode 100644 index 0000000000..07a26cbc10 --- /dev/null +++ b/js/app/packages/core/component/LexicalMarkdown/plugins/hover-tooltip/HoverTooltip.tsx @@ -0,0 +1,114 @@ +import { UserIcon } from '@core/component/UserIcon'; +import { syncServiceClient } from '@service-sync/client'; +import { macroIdToEmail, tryMacroId, useDisplayNameParts } from '@core/user'; +import { + createEffect, + createResource, + createSignal, + onCleanup, + Show, + untrack, +} from 'solid-js'; +import type { Store } from 'solid-js/store'; +import type { HoverTooltipState } from './hoverTooltipPlugin'; + +const SHOW_DELAY_MS = 600; + +function formatRelativeTime(date: Date): string { + const sec = Math.floor((Date.now() - date.getTime()) / 1000); + if (sec < 60) return 'just now'; + const min = Math.floor(sec / 60); + if (min < 60) return `${min} minute${min === 1 ? '' : 's'} ago`; + const hr = Math.floor(min / 60); + if (hr < 24) return `${hr} hour${hr === 1 ? '' : 's'} ago`; + const day = Math.floor(hr / 24); + if (day < 7) return `${day} day${day === 1 ? '' : 's'} ago`; + return date.toLocaleDateString(); +} + +function UserLine(props: { userId: string; editedAt: Date }) { + const macroId = tryMacroId(props.userId); + const { firstName } = useDisplayNameParts(macroId); + const name = () => + firstName() || + (macroId ? macroIdToEmail(macroId).split('@')[0] : props.userId); + + return ( + + + {name()}, {formatRelativeTime(props.editedAt)} + + ); +} + +export function HoverTooltip(props: { + state: Store; + documentId: string; +}) { + const [visible, setVisible] = createSignal(false); + let shownAtX = 0; + let shownAtY = 0; + let timer: ReturnType | null = null; + + const hide = () => { + if (timer) { + clearTimeout(timer); + timer = null; + } + setVisible(false); + }; + + const [blame] = createResource( + () => (props.state.hovering ? props.state.nodeId : null), + async (nodeId) => { + const res = await syncServiceClient.getNodeBlame({ + documentId: props.documentId, + nodeId, + }); + return res.isOk() ? res.value : null; + } + ); + + createEffect(() => { + const { hovering, x, y } = props.state; + + if (!hovering) return hide(); + + // After shown: any pointer move dismisses. + if (untrack(visible)) { + if (x !== shownAtX || y !== shownAtY) hide(); + return; + } + + // Pre-show: each move restarts the timer. It only fires when the cursor settles. + if (timer) clearTimeout(timer); + timer = setTimeout(() => { + shownAtX = x; + shownAtY = y; + setVisible(true); + }, SHOW_DELAY_MS); + }); + + onCleanup(hide); + + return ( + + {(b) => ( +
+ +
+ )} +
+ ); +} diff --git a/js/app/packages/core/component/LexicalMarkdown/plugins/hover-tooltip/hoverTooltipPlugin.ts b/js/app/packages/core/component/LexicalMarkdown/plugins/hover-tooltip/hoverTooltipPlugin.ts new file mode 100644 index 0000000000..efe77f35d4 --- /dev/null +++ b/js/app/packages/core/component/LexicalMarkdown/plugins/hover-tooltip/hoverTooltipPlugin.ts @@ -0,0 +1,97 @@ +import { isTouchDevice } from '@core/mobile/isTouchDevice'; +import { $getId } from '@lexical-core'; +import { mergeRegister } from '@lexical/utils'; +import { + $getNearestNodeFromDOMNode, + $isTextNode, + type LexicalEditor, + type LexicalNode, +} from 'lexical'; +import { createStore } from 'solid-js/store'; + +export type HoverTooltipState = { + hovering: boolean; + x: number; + y: number; + nodeId: string | null; +}; + +export function createHoverTooltipStore() { + return createStore({ + hovering: false, + x: 0, + y: 0, + nodeId: null, + }); +} + +type HoverTooltipPluginProps = { + setState: (state: Partial) => void; +}; + +function registerHoverTooltipPlugin( + editor: LexicalEditor, + props: HoverTooltipPluginProps +) { + const handlePointerMove = (e: MouseEvent) => { + if (isTouchDevice()) return; + const target = e.target; + if (!(target instanceof HTMLElement)) { + props.setState({ hovering: false, nodeId: null }); + return; + } + + // Suppress while the user has a text selection — that's when the + // formatting popup shows and the tooltip would compete with it. + const sel = window.getSelection(); + if (sel && !sel.isCollapsed && sel.toString().length > 0) { + props.setState({ hovering: false, nodeId: null }); + return; + } + + editor.read(() => { + const node = $getNearestNodeFromDOMNode(target); + if (!node || !$isTextNode(node)) { + props.setState({ hovering: false, nodeId: null }); + return; + } + // Walk up to the nearest ancestor that has a stable ID. + let cursor: LexicalNode | null = node; + let nodeId: string | null = null; + while (cursor) { + nodeId = $getId(cursor); + if (nodeId) break; + cursor = cursor.getParent(); + } + props.setState({ + hovering: true, + x: e.clientX, + y: e.clientY, + nodeId, + }); + }); + }; + + const dismiss = () => { + props.setState({ hovering: false, nodeId: null }); + }; + + return mergeRegister( + editor.registerRootListener((root, prevRoot) => { + if (root) { + root.addEventListener('pointermove', handlePointerMove); + root.addEventListener('pointerleave', dismiss); + root.addEventListener('pointerdown', dismiss); + } + if (prevRoot) { + prevRoot.removeEventListener('pointermove', handlePointerMove); + prevRoot.removeEventListener('pointerleave', dismiss); + prevRoot.removeEventListener('pointerdown', dismiss); + } + }) + ); +} + +export function hoverTooltipPlugin(props: HoverTooltipPluginProps) { + return (editor: LexicalEditor) => registerHoverTooltipPlugin(editor, props); +} diff --git a/js/app/packages/core/component/LexicalMarkdown/plugins/hover-tooltip/index.ts b/js/app/packages/core/component/LexicalMarkdown/plugins/hover-tooltip/index.ts new file mode 100644 index 0000000000..00c698eef8 --- /dev/null +++ b/js/app/packages/core/component/LexicalMarkdown/plugins/hover-tooltip/index.ts @@ -0,0 +1,2 @@ +export * from './HoverTooltip'; +export * from './hoverTooltipPlugin'; diff --git a/js/app/packages/core/component/LexicalMarkdown/plugins/index.ts b/js/app/packages/core/component/LexicalMarkdown/plugins/index.ts index fc6010a27d..3c1933e660 100644 --- a/js/app/packages/core/component/LexicalMarkdown/plugins/index.ts +++ b/js/app/packages/core/component/LexicalMarkdown/plugins/index.ts @@ -15,6 +15,7 @@ export * from './file-paste'; export * from './find-and-replace'; export * from './generate'; export * from './horizontal-rules'; +export * from './hover-tooltip'; export * from './insert-text'; export * from './ios-cursor-scroll'; export * from './katex'; diff --git a/js/app/packages/core/component/LexicalMarkdown/plugins/pluginManager.ts b/js/app/packages/core/component/LexicalMarkdown/plugins/pluginManager.ts index b90a8eb9e2..940fa39828 100644 --- a/js/app/packages/core/component/LexicalMarkdown/plugins/pluginManager.ts +++ b/js/app/packages/core/component/LexicalMarkdown/plugins/pluginManager.ts @@ -21,7 +21,7 @@ import { checklistPlugin } from './checklist/'; import { customDeletePlugin } from './custom-delete'; import { markdownShortcutsPlugin } from './markdown-shortcuts'; -type PluginFunction = (editor: LexicalEditor) => () => void; +export type PluginFunction = (editor: LexicalEditor) => () => void; /** * Create a binding between a LexicalEditor and the ability to register plugins diff --git a/js/app/packages/service-clients/service-sync/client.ts b/js/app/packages/service-clients/service-sync/client.ts index 3a6e420af2..5e81971bf1 100644 --- a/js/app/packages/service-clients/service-sync/client.ts +++ b/js/app/packages/service-clients/service-sync/client.ts @@ -179,6 +179,36 @@ export const syncServiceClient = { return ok(response.value as MetadataResponse); }, + /** + * Look up who last edited a given Lexical node and when. `user_id` is a + * MacroId resolved server-side; `null` if the peer has no recorded user + * (anonymous edits or legacy data not yet mirrored locally). + */ + async getNodeBlame(args: { documentId: string; nodeId: string }) { + const token = await getPermissionToken('document', args.documentId); + + const response = await syncFetch<{ + peer_id: string; + user_id: string | null; + timestamp_ms: number; + }>(`/document/${args.documentId}/blame/${args.nodeId}`, { + headers: { + 'Content-Type': 'application/json', + Authorization: `Bearer ${token}`, + }, + method: 'GET', + }); + + if (response.isErr()) { + return err(response.error); + } + const { peer_id, user_id, timestamp_ms } = response.value; + return ok({ + peerId: peer_id, + userId: user_id, + editedAt: new Date(timestamp_ms), + }); + }, async getSnapshot(args: { documentId: string }) { const token = await getPermissionToken('document', args.documentId); const response = await platformFetch( diff --git a/rust/sync-service/database/user-peer-mapping/migrations/0002_add_blame.sql b/rust/sync-service/database/user-peer-mapping/migrations/0002_add_blame.sql new file mode 100644 index 0000000000..b149981f52 --- /dev/null +++ b/rust/sync-service/database/user-peer-mapping/migrations/0002_add_blame.sql @@ -0,0 +1,9 @@ +CREATE TABLE blame ( + document_id TEXT NOT NULL, + node_id TEXT NOT NULL, + peer_id TEXT NOT NULL, + timestamp_ms INTEGER NOT NULL, + PRIMARY KEY (document_id, node_id) +); + +CREATE INDEX idx_blame_document_id ON blame (document_id); diff --git a/rust/sync-service/src/d1.rs b/rust/sync-service/src/d1.rs index ce4dfcaf1c..0486bfbe73 100644 --- a/rust/sync-service/src/d1.rs +++ b/rust/sync-service/src/d1.rs @@ -88,3 +88,105 @@ pub async fn get_peers_for_document_id( Ok(peers) } + +/// Record "last edited by" for a batch of Lexical nodes touched by a single +/// update. Stamps all rows with the current time. +pub async fn record_blame( + env: &worker::Env, + document_id: &str, + peer_id: u64, + node_ids: &[String], +) -> worker::Result<()> { + let now_ms = web_time::SystemTime::now() + .duration_since(web_time::UNIX_EPOCH) + .map(|d| d.as_millis() as i64) + .unwrap_or(0); + tracing::info!( + document_id = document_id, + peer_id = peer_id, + count = node_ids.len(), + node_ids = ?node_ids, + "record_blame" + ); + for node_id in node_ids { + let db = env.d1(crate::constants::USER_PEER_D1_BINDING)?; + if let Err(e) = upsert_blame(db, document_id, node_id, peer_id, now_ms).await { + tracing::error!(error = ?e, node_id = node_id, "upsert_blame failed"); + return Err(e); + } + } + Ok(()) +} + +/// Upsert a single (document_id, node_id) -> (peer_id, timestamp_ms) row. +pub async fn upsert_blame( + db: D1Database, + document_id: &str, + node_id: &str, + peer_id: u64, + timestamp_ms: i64, +) -> worker::Result<()> { + let result = db + .prepare( + "INSERT INTO blame (document_id, node_id, peer_id, timestamp_ms) \ + VALUES (?, ?, ?, ?) \ + ON CONFLICT(document_id, node_id) DO UPDATE SET \ + peer_id = excluded.peer_id, \ + timestamp_ms = excluded.timestamp_ms;", + ) + .bind(&[ + document_id.into(), + node_id.into(), + peer_id.to_string().into(), + // d1 js doesn't support bigint + (timestamp_ms as f64).into(), + ])? + .run() + .await?; + if let Some(e) = result.error() { + error!(error = e, "upsert_blame D1 error"); + return Err(worker::Error::from(e)); + } + Ok(()) +} + +#[derive(serde::Deserialize, serde::Serialize)] +pub struct BlameRow { + pub peer_id: String, + pub user_id: Option, + pub timestamp_ms: i64, +} + +/// JOIN blame with peer_user_map to get last-edit info plus resolved user_id. +pub async fn get_blame_for_node( + db: D1Database, + document_id: &str, + node_id: &str, +) -> worker::Result> { + let statement = db.prepare( + " + SELECT b.peer_id AS peer_id, + p.user_id AS user_id, + b.timestamp_ms AS timestamp_ms + FROM blame b + LEFT JOIN peer_user_map p + ON p.document_id = b.document_id + AND p.peer_id = b.peer_id + WHERE b.document_id = ? AND b.node_id = ? + LIMIT 1; + ", + ); + let result = statement + .bind(&[document_id.into(), node_id.into()])? + .all() + .await?; + let mut rows = result.results::()?; + let row = rows.pop(); + tracing::info!( + document_id = document_id, + node_id = node_id, + found = row.is_some(), + "get_blame_for_node" + ); + Ok(row) +} diff --git a/rust/sync-service/src/durable_object.rs b/rust/sync-service/src/durable_object.rs index c68546ecc4..33a361381e 100644 --- a/rust/sync-service/src/durable_object.rs +++ b/rust/sync-service/src/durable_object.rs @@ -54,6 +54,7 @@ mod path { pub const ACTIVE_PEERS_MARKER: &str = "active_peers"; pub const PEER: &str = "peer"; pub const METADATA: &str = "metadata"; + pub const BLAME: &str = "blame"; pub const DEBUG_DUMP_OPERATIONS: &str = "debug_dump_operations"; pub const DEBUG_DO_KV_GET: &str = "debug_do_kv_get"; pub const DEBUG_DO_KV_LIST: &str = "debug_do_kv_list"; @@ -203,7 +204,7 @@ impl<'a> Wsm<'a> { } Ok(()) } - async fn get_peer_ids(&mut self) -> Result> { + pub async fn get_peer_ids(&mut self) -> Result> { self.maybe_update_ws_meta_map().await?; let ws_id = self.get_ws_id()?.to_string(); Ok(self @@ -280,6 +281,10 @@ async fn bump_alarm(state: &State) -> Result<()> { } impl DocumentSyncSession { + pub fn env(&self) -> &Env { + &self.env + } + pub fn get_websockets(&self) -> Vec { self.state.get_websockets() } @@ -315,6 +320,9 @@ impl DocumentSyncSession { or_unauth!(claims.has_document_id_access(document_id).then_some(())); match rest { path::METADATA => return self.metadata_handler(document_id).await, + path::BLAME => { + return self.blame_handler(matched.params.get("node_id")).await; + } path::RAW => return self.raw_handler(document_id).await, path::SNAPSHOT => return self.snapshot_handler(req, document_id).await, path::ACTIVE_PEERS_MARKER => return self.active_peer_ids_handler().await, @@ -544,6 +552,19 @@ impl DocumentSyncSession { }) } + /// Return who last edited the given Lexical node and when. Joins blame + /// with peer_user_map in D1 so user_id (MacroId) comes back resolved. + /// Returns 404 if the node has no recorded edits. + async fn blame_handler(&self, node_id: Option<&str>) -> Result { + let node_id = node_id.ok_or_else(|| Error::from("missing node_id"))?; + let document_id = self.document_id().await?.to_string(); + let db = self.env.d1(USER_PEER_D1_BINDING)?; + match crate::d1::get_blame_for_node(db, &document_id, node_id).await? { + Some(row) => ResponseBuilder::new().from_json(&row), + None => Ok(response(status_codes::NOT_FOUND)), + } + } + async fn connect_handler(&self, req: Request, document_id: &str) -> Result { let (res, elap) = timeit!({ let claims = or_unauth!(decode_jwt(&req, &self.env, TokenFrom::QueryParams).ok()); @@ -760,6 +781,9 @@ pub static ROUTER: LazyLock> = LazyLock::new(|| { router .insert("/document/{document_id}/metadata", path::METADATA) .unwrap(); + router + .insert("/document/{document_id}/blame/{node_id}", path::BLAME) + .unwrap(); router .insert( "/document/{document_id}/debug_dump_operations", diff --git a/rust/sync-service/src/state.rs b/rust/sync-service/src/state.rs index 343318bb7d..83320b5e11 100644 --- a/rust/sync-service/src/state.rs +++ b/rust/sync-service/src/state.rs @@ -1,6 +1,6 @@ use std::{borrow::Cow, sync::Mutex}; -use loro::{ExportMode, Frontiers, LoroDoc, ToJson}; +use loro::{Container, ContainerID, ExportMode, Frontiers, LoroDoc, LoroValue, ToJson}; use tracing::debug; use web_time::Instant; use worker::Result; @@ -83,17 +83,65 @@ impl DocumentState { .unwrap_context("last_export mutex poisoned") = Some(Instant::now()); } - /// Import a new update into the document state - pub fn import(&self, update: &[u8]) -> Result<()> { + /// Import an update into the document. Returns the set of Lexical node IDs + /// whose backing Loro containers were touched, deduplicated. + pub fn import(&self, update: &[u8]) -> Result> { + let before = self.loro_doc.oplog_frontiers(); self.loro_doc .import_with(update, FROM_CLIENT_TAG) .context("failed to import update")?; + let after = self.loro_doc.oplog_frontiers(); + + let mut touched: Vec = Vec::new(); + if let Ok(diff) = self.loro_doc.diff(&before, &after) { + for (container_id, _) in diff.iter() { + if let Some(node_id) = self.find_lexical_id(container_id) { + touched.push(node_id); + } + } + } + touched.sort(); + touched.dedup(); + *self .last_update .lock() .unwrap_context("last_update mutex poisoned") = Some(Instant::now()); - Ok(()) + Ok(touched) + } + + /// Walk from a changed container up to the nearest ancestor LoroMap whose + /// `$` submap has an `id` — that string is the Lexical node ID. + fn find_lexical_id(&self, container_id: &ContainerID) -> Option { + // Start with the container itself, then walk its ancestors. + let mut candidates: Vec = vec![container_id.clone()]; + if let Some(path) = self.loro_doc.get_path_to_container(container_id) { + for (cid, _) in path.into_iter().rev() { + candidates.push(cid); + } + } + + for cid in candidates { + let Some(container) = self.loro_doc.get_container(cid) else { + continue; + }; + let Container::Map(map) = container else { + continue; + }; + let Some(meta_voc) = map.get("$") else { continue }; + let Some(Container::Map(meta_map)) = meta_voc.into_container().ok() else { + continue; + }; + let Some(id_voc) = meta_map.get("id") else { + continue; + }; + let Some(LoroValue::String(id)) = id_voc.into_value().ok() else { + continue; + }; + return Some(id.to_string()); + } + None } /// Export the document state as a snapshot diff --git a/rust/sync-service/src/storage/backends/durable_kv.rs b/rust/sync-service/src/storage/backends/durable_kv.rs index 0d908ca08f..f5cf5edc97 100644 --- a/rust/sync-service/src/storage/backends/durable_kv.rs +++ b/rust/sync-service/src/storage/backends/durable_kv.rs @@ -93,17 +93,21 @@ impl DurableKVStorage { self.list_do_kv(PENDING_OP_PREFIX).await } - pub async fn apply_op(&self, document_state: &DocumentState, op_update: &[u8]) -> Result<()> { + pub async fn apply_op( + &self, + document_state: &DocumentState, + op_update: &[u8], + ) -> Result> { let op_id = self.ids.id(); let op_key = pending_op_key(&op_id); - document_state.import(op_update)?; + let touched_nodes = document_state.import(op_update)?; self.inner.put(&op_key, op_update).await?; self.applied_keys .write() .unwrap_context("applied_keys mutex poisoned") .insert(op_key); self.inner.put(&all_op_key(&op_id), op_update).await?; - Ok(()) + Ok(touched_nodes) } pub async fn apply_pending_ops(&self, snapshot: &DocumentState) -> Result<()> { diff --git a/rust/sync-service/src/storage/mod.rs b/rust/sync-service/src/storage/mod.rs index 1e97ff692d..b4d6a4270b 100644 --- a/rust/sync-service/src/storage/mod.rs +++ b/rust/sync-service/src/storage/mod.rs @@ -86,14 +86,14 @@ impl SessionStorage { Ok(res) } - /// append a new pending operation to the operation log + /// Append a new pending operation to the operation log and return the + /// Lexical node IDs that were touched (for blame tracking). pub async fn append_pending_operation( &self, operation: &[u8], document_state: &DocumentState, - ) -> Result<()> { - self.oplog.apply_op(document_state, operation).await?; - Ok(()) + ) -> Result> { + self.oplog.apply_op(document_state, operation).await } pub async fn clear_applied_ops(&self) -> Result<()> { diff --git a/rust/sync-service/src/websocket.rs b/rust/sync-service/src/websocket.rs index dbf63957b8..64a8a76f77 100644 --- a/rust/sync-service/src/websocket.rs +++ b/rust/sync-service/src/websocket.rs @@ -126,10 +126,24 @@ pub async fn process_message( tracing::warn!("received update from peer without edit permission"); return Ok(()); } - session_storage + let touched_nodes = session_storage .append_pending_operation(&update, document_state) .await?; + // Record "last edited by" for each Lexical node touched. + if !touched_nodes.is_empty() { + let peer_ids = Wsm::new(dss, ws).get_peer_ids().await.unwrap_or_default(); + if let Some(&peer_id) = peer_ids.first() { + let _ = crate::d1::record_blame( + dss.env(), + document_id, + peer_id, + &touched_nodes, + ) + .await; + } + } + { // ACK the sender first: the update is durably stored at this // point, and a failed send to some *other* peer must not block From 457f48364862aa4cb1acfc2765b13a51aeabb7ae Mon Sep 17 00:00:00 2001 From: Wolf Mermelstein Date: Wed, 10 Jun 2026 22:43:43 +0000 Subject: [PATCH 2/7] lazy fetch --- .../plugins/hover-tooltip/HoverTooltip.tsx | 73 ++++++++++--------- .../hover-tooltip/hoverTooltipPlugin.ts | 51 ++++++++----- 2 files changed, 71 insertions(+), 53 deletions(-) diff --git a/js/app/packages/core/component/LexicalMarkdown/plugins/hover-tooltip/HoverTooltip.tsx b/js/app/packages/core/component/LexicalMarkdown/plugins/hover-tooltip/HoverTooltip.tsx index 07a26cbc10..0f9ee5a6e3 100644 --- a/js/app/packages/core/component/LexicalMarkdown/plugins/hover-tooltip/HoverTooltip.tsx +++ b/js/app/packages/core/component/LexicalMarkdown/plugins/hover-tooltip/HoverTooltip.tsx @@ -1,6 +1,8 @@ import { UserIcon } from '@core/component/UserIcon'; import { syncServiceClient } from '@service-sync/client'; import { macroIdToEmail, tryMacroId, useDisplayNameParts } from '@core/user'; +import { formatRelativeTimestamp } from '@entity'; +import { debounce } from '@solid-primitives/scheduled'; import { createEffect, createResource, @@ -12,26 +14,14 @@ import { import type { Store } from 'solid-js/store'; import type { HoverTooltipState } from './hoverTooltipPlugin'; -const SHOW_DELAY_MS = 600; - -function formatRelativeTime(date: Date): string { - const sec = Math.floor((Date.now() - date.getTime()) / 1000); - if (sec < 60) return 'just now'; - const min = Math.floor(sec / 60); - if (min < 60) return `${min} minute${min === 1 ? '' : 's'} ago`; - const hr = Math.floor(min / 60); - if (hr < 24) return `${hr} hour${hr === 1 ? '' : 's'} ago`; - const day = Math.floor(hr / 24); - if (day < 7) return `${day} day${day === 1 ? '' : 's'} ago`; - return date.toLocaleDateString(); -} +const FETCH_DELAY_MS = 400; +const SHOW_DELAY_MS = 500; function UserLine(props: { userId: string; editedAt: Date }) { const macroId = tryMacroId(props.userId); const { firstName } = useDisplayNameParts(macroId); const name = () => - firstName() || - (macroId ? macroIdToEmail(macroId).split('@')[0] : props.userId); + firstName() || (macroId ? macroIdToEmail(macroId) : props.userId); return ( @@ -41,7 +31,7 @@ function UserLine(props: { userId: string; editedAt: Date }) { suppressClick showTooltip={false} /> - {name()}, {formatRelativeTime(props.editedAt)} + {name()}, {formatRelativeTimestamp(props.editedAt)} ); } @@ -51,29 +41,46 @@ export function HoverTooltip(props: { documentId: string; }) { const [visible, setVisible] = createSignal(false); + // The nodeId we've actually committed to fetching for. Debounced from the + // raw hovered nodeId so we don't fire a request the instant the cursor + // crosses a text node. + const [armedNodeId, setArmedNodeId] = createSignal(null); let shownAtX = 0; let shownAtY = 0; - let timer: ReturnType | null = null; + let showTimer: ReturnType | null = null; + + const debouncedArm = debounce((nodeId: string | null) => { + setArmedNodeId(nodeId); + }, FETCH_DELAY_MS); const hide = () => { - if (timer) { - clearTimeout(timer); - timer = null; - } + debouncedArm.clear(); + if (showTimer) clearTimeout(showTimer); + showTimer = null; + setArmedNodeId(null); setVisible(false); }; - const [blame] = createResource( - () => (props.state.hovering ? props.state.nodeId : null), - async (nodeId) => { - const res = await syncServiceClient.getNodeBlame({ - documentId: props.documentId, - nodeId, - }); - return res.isOk() ? res.value : null; + const [blame] = createResource(armedNodeId, async (nodeId) => { + const res = await syncServiceClient.getNodeBlame({ + documentId: props.documentId, + nodeId, + }); + return res.isOk() ? res.value : null; + }); + + // Drive the fetch — debounced on nodeId only, ignores cursor x/y. + createEffect(() => { + const nodeId = props.state.hovering ? props.state.nodeId : null; + if (nodeId === null) { + debouncedArm.clear(); + setArmedNodeId(null); + } else { + debouncedArm(nodeId); } - ); + }); + // Drive the visibility — based on cursor stillness (x/y). createEffect(() => { const { hovering, x, y } = props.state; @@ -85,9 +92,9 @@ export function HoverTooltip(props: { return; } - // Pre-show: each move restarts the timer. It only fires when the cursor settles. - if (timer) clearTimeout(timer); - timer = setTimeout(() => { + // Pre-show: each move restarts the show timer. + if (showTimer) clearTimeout(showTimer); + showTimer = setTimeout(() => { shownAtX = x; shownAtY = y; setVisible(true); diff --git a/js/app/packages/core/component/LexicalMarkdown/plugins/hover-tooltip/hoverTooltipPlugin.ts b/js/app/packages/core/component/LexicalMarkdown/plugins/hover-tooltip/hoverTooltipPlugin.ts index efe77f35d4..567d03a357 100644 --- a/js/app/packages/core/component/LexicalMarkdown/plugins/hover-tooltip/hoverTooltipPlugin.ts +++ b/js/app/packages/core/component/LexicalMarkdown/plugins/hover-tooltip/hoverTooltipPlugin.ts @@ -29,6 +29,27 @@ type HoverTooltipPluginProps = { setState: (state: Partial) => void; }; +/** + * Given a DOM element under the cursor, find the stable Lexical id of the + * nearest ancestor text-bearing node. Must be called inside `editor.read()` + * so the Lexical state is readable. + * + * Returns null when the cursor isn't over a text node, or when no ancestor + * has been assigned a stable id yet (typically transient nodes). + */ +function $resolveHoveredNodeId(target: HTMLElement): string | null { + const node = $getNearestNodeFromDOMNode(target); + if (!node || !$isTextNode(node)) return null; + + let cursor: LexicalNode | null = node; + while (cursor) { + const id = $getId(cursor); + if (id) return id; + cursor = cursor.getParent(); + } + return null; +} + function registerHoverTooltipPlugin( editor: LexicalEditor, props: HoverTooltipPluginProps @@ -49,26 +70,16 @@ function registerHoverTooltipPlugin( return; } - editor.read(() => { - const node = $getNearestNodeFromDOMNode(target); - if (!node || !$isTextNode(node)) { - props.setState({ hovering: false, nodeId: null }); - return; - } - // Walk up to the nearest ancestor that has a stable ID. - let cursor: LexicalNode | null = node; - let nodeId: string | null = null; - while (cursor) { - nodeId = $getId(cursor); - if (nodeId) break; - cursor = cursor.getParent(); - } - props.setState({ - hovering: true, - x: e.clientX, - y: e.clientY, - nodeId, - }); + const nodeId = editor.read(() => $resolveHoveredNodeId(target)); + if (nodeId === null) { + props.setState({ hovering: false, nodeId: null }); + return; + } + props.setState({ + hovering: true, + x: e.clientX, + y: e.clientY, + nodeId, }); }; From e4dc5304cae3f5117bf97f525a328de710bb5089 Mon Sep 17 00:00:00 2001 From: Wolf Mermelstein Date: Wed, 10 Jun 2026 22:48:02 +0000 Subject: [PATCH 3/7] general clean ups --- rust/sync-service/src/durable_object.rs | 11 ++++++++ rust/sync-service/src/state.rs | 34 +++++++++++++++---------- rust/sync-service/src/websocket.rs | 27 ++++++++++++++------ 3 files changed, 51 insertions(+), 21 deletions(-) diff --git a/rust/sync-service/src/durable_object.rs b/rust/sync-service/src/durable_object.rs index 33a361381e..ecd1dec7a8 100644 --- a/rust/sync-service/src/durable_object.rs +++ b/rust/sync-service/src/durable_object.rs @@ -285,6 +285,17 @@ impl DocumentSyncSession { &self.env } + /// Spawn a fire-and-forget task that the DO will keep alive past the + /// current request, but won't block subsequent requests on. Use for + /// background writes (e.g. blame, snapshot push) that mustn't be cancelled + /// when the handler returns. + pub fn wait_until(&self, future: F) + where + F: std::future::Future + 'static, + { + self.state.wait_until(future); + } + pub fn get_websockets(&self) -> Vec { self.state.get_websockets() } diff --git a/rust/sync-service/src/state.rs b/rust/sync-service/src/state.rs index 83320b5e11..946b22ec3b 100644 --- a/rust/sync-service/src/state.rs +++ b/rust/sync-service/src/state.rs @@ -92,28 +92,36 @@ impl DocumentState { .context("failed to import update")?; let after = self.loro_doc.oplog_frontiers(); - let mut touched: Vec = Vec::new(); - if let Ok(diff) = self.loro_doc.diff(&before, &after) { - for (container_id, _) in diff.iter() { - if let Some(node_id) = self.find_lexical_id(container_id) { - touched.push(node_id); - } - } - } - touched.sort(); - touched.dedup(); - *self .last_update .lock() .unwrap_context("last_update mutex poisoned") = Some(Instant::now()); - Ok(touched) + Ok(self.touched_lexical_ids(&before, &after)) + } + + /// Diff the two frontiers and return the Lexical node IDs whose backing + /// containers were modified, deduplicated. Empty `Vec` on diff failure. + fn touched_lexical_ids(&self, before: &Frontiers, after: &Frontiers) -> Vec { + let Ok(diff) = self.loro_doc.diff(before, after) else { + return Vec::new(); + }; + let mut touched: Vec = diff + .iter() + .filter_map(|(cid, _)| self.find_lexical_id(cid)) + .collect(); + touched.sort(); + touched.dedup(); + touched } /// Walk from a changed container up to the nearest ancestor LoroMap whose /// `$` submap has an `id` — that string is the Lexical node ID. fn find_lexical_id(&self, container_id: &ContainerID) -> Option { + // Real Lexical docs never nest this deep; cap as a safety net against + // unexpectedly large paths from Loro. + const MAX_DEPTH: usize = 100; + // Start with the container itself, then walk its ancestors. let mut candidates: Vec = vec![container_id.clone()]; if let Some(path) = self.loro_doc.get_path_to_container(container_id) { @@ -122,7 +130,7 @@ impl DocumentState { } } - for cid in candidates { + for cid in candidates.into_iter().take(MAX_DEPTH) { let Some(container) = self.loro_doc.get_container(cid) else { continue; }; diff --git a/rust/sync-service/src/websocket.rs b/rust/sync-service/src/websocket.rs index 64a8a76f77..aa7a161503 100644 --- a/rust/sync-service/src/websocket.rs +++ b/rust/sync-service/src/websocket.rs @@ -130,17 +130,28 @@ pub async fn process_message( .append_pending_operation(&update, document_state) .await?; - // Record "last edited by" for each Lexical node touched. + // Record "last edited by" for each Lexical node touched. Runs + // in the background via `wait_until` so the ACK + peer broadcast + // below don't block on D1, and the write still completes even + // after we return from this handler. if !touched_nodes.is_empty() { let peer_ids = Wsm::new(dss, ws).get_peer_ids().await.unwrap_or_default(); if let Some(&peer_id) = peer_ids.first() { - let _ = crate::d1::record_blame( - dss.env(), - document_id, - peer_id, - &touched_nodes, - ) - .await; + let env = dss.env().clone(); + let document_id = document_id.to_string(); + dss.wait_until(async move { + if let Err(e) = + crate::d1::record_blame(&env, &document_id, peer_id, &touched_nodes) + .await + { + tracing::warn!( + error = ?e, + document_id = document_id, + peer_id = peer_id, + "record_blame failed" + ); + } + }); } } From 5405d6803e8cbfa2aa4f1ac4ae1133ad5c2a681e Mon Sep 17 00:00:00 2001 From: Wolf Mermelstein Date: Wed, 10 Jun 2026 22:49:01 +0000 Subject: [PATCH 4/7] linter --- js/app/packages/block-md/component/MarkdownEditor.tsx | 10 +++++----- .../plugins/hover-tooltip/HoverTooltip.tsx | 9 ++------- .../plugins/hover-tooltip/hoverTooltipPlugin.ts | 2 +- 3 files changed, 8 insertions(+), 13 deletions(-) diff --git a/js/app/packages/block-md/component/MarkdownEditor.tsx b/js/app/packages/block-md/component/MarkdownEditor.tsx index a5c587143f..e832369ac7 100644 --- a/js/app/packages/block-md/component/MarkdownEditor.tsx +++ b/js/app/packages/block-md/component/MarkdownEditor.tsx @@ -76,11 +76,6 @@ import { } from '@core/component/LexicalMarkdown/plugins/checkbox-to-task'; import { codePlugin } from '@core/component/LexicalMarkdown/plugins/code/codePlugin'; import { emojisPlugin } from '@core/component/LexicalMarkdown/plugins/emojis/emojisPlugin'; -import { - createHoverTooltipStore, - HoverTooltip, - hoverTooltipPlugin, -} from '@core/component/LexicalMarkdown/plugins/hover-tooltip'; import { DO_SEARCH_COMMAND, FloatingSearchHighlight, @@ -88,6 +83,11 @@ import { type NodekeyOffset, SearchHighlight, } from '@core/component/LexicalMarkdown/plugins/find-and-replace'; +import { + createHoverTooltipStore, + HoverTooltip, + hoverTooltipPlugin, +} from '@core/component/LexicalMarkdown/plugins/hover-tooltip'; import { iosCursorScrollPlugin } from '@core/component/LexicalMarkdown/plugins/ios-cursor-scroll'; import { GO_TO_LOCATION_COMMAND, diff --git a/js/app/packages/core/component/LexicalMarkdown/plugins/hover-tooltip/HoverTooltip.tsx b/js/app/packages/core/component/LexicalMarkdown/plugins/hover-tooltip/HoverTooltip.tsx index 0f9ee5a6e3..2d97e64991 100644 --- a/js/app/packages/core/component/LexicalMarkdown/plugins/hover-tooltip/HoverTooltip.tsx +++ b/js/app/packages/core/component/LexicalMarkdown/plugins/hover-tooltip/HoverTooltip.tsx @@ -1,7 +1,7 @@ import { UserIcon } from '@core/component/UserIcon'; -import { syncServiceClient } from '@service-sync/client'; import { macroIdToEmail, tryMacroId, useDisplayNameParts } from '@core/user'; import { formatRelativeTimestamp } from '@entity'; +import { syncServiceClient } from '@service-sync/client'; import { debounce } from '@solid-primitives/scheduled'; import { createEffect, @@ -25,12 +25,7 @@ function UserLine(props: { userId: string; editedAt: Date }) { return ( - + {name()}, {formatRelativeTimestamp(props.editedAt)} ); diff --git a/js/app/packages/core/component/LexicalMarkdown/plugins/hover-tooltip/hoverTooltipPlugin.ts b/js/app/packages/core/component/LexicalMarkdown/plugins/hover-tooltip/hoverTooltipPlugin.ts index 567d03a357..0ab48f6fb0 100644 --- a/js/app/packages/core/component/LexicalMarkdown/plugins/hover-tooltip/hoverTooltipPlugin.ts +++ b/js/app/packages/core/component/LexicalMarkdown/plugins/hover-tooltip/hoverTooltipPlugin.ts @@ -1,6 +1,6 @@ import { isTouchDevice } from '@core/mobile/isTouchDevice'; -import { $getId } from '@lexical-core'; import { mergeRegister } from '@lexical/utils'; +import { $getId } from '@lexical-core'; import { $getNearestNodeFromDOMNode, $isTextNode, From a24c75b5948f757fd22fb5aec66d9e81e6b95443 Mon Sep 17 00:00:00 2001 From: Wolf Mermelstein Date: Thu, 11 Jun 2026 19:18:58 +0000 Subject: [PATCH 5/7] flush every alarm --- rust/sync-service/src/d1.rs | 97 +++++++++++-------------- rust/sync-service/src/durable_object.rs | 36 +++++++++ rust/sync-service/src/websocket.rs | 37 +++++----- 3 files changed, 98 insertions(+), 72 deletions(-) diff --git a/rust/sync-service/src/d1.rs b/rust/sync-service/src/d1.rs index 0486bfbe73..b4a40e00ff 100644 --- a/rust/sync-service/src/d1.rs +++ b/rust/sync-service/src/d1.rs @@ -89,67 +89,58 @@ pub async fn get_peers_for_document_id( Ok(peers) } -/// Record "last edited by" for a batch of Lexical nodes touched by a single -/// update. Stamps all rows with the current time. -pub async fn record_blame( +/// A single pending "last edited by" event, buffered until the +/// next alarm tick flushes everything +#[derive(Debug, Clone)] +pub struct BlameEvent { + pub document_id: String, + pub node_id: String, + pub peer_id: u64, + pub timestamp_ms: i64, +} + +/// Maximum statements per D1 `batch()` call. D1 has a per-batch statement +/// limit; chunking keeps us comfortably under it. +const BATCH_CHUNK_SIZE: usize = 100; + +/// Bulk-upsert a list of buffered blame events. Uses D1's `batch()` so all +/// events in a chunk commit in a single round-trip. +pub async fn insert_blame_many( env: &worker::Env, - document_id: &str, - peer_id: u64, - node_ids: &[String], + events: &[BlameEvent], ) -> worker::Result<()> { - let now_ms = web_time::SystemTime::now() - .duration_since(web_time::UNIX_EPOCH) - .map(|d| d.as_millis() as i64) - .unwrap_or(0); - tracing::info!( - document_id = document_id, - peer_id = peer_id, - count = node_ids.len(), - node_ids = ?node_ids, - "record_blame" - ); - for node_id in node_ids { - let db = env.d1(crate::constants::USER_PEER_D1_BINDING)?; - if let Err(e) = upsert_blame(db, document_id, node_id, peer_id, now_ms).await { - tracing::error!(error = ?e, node_id = node_id, "upsert_blame failed"); - return Err(e); - } + if events.is_empty() { + return Ok(()); } - Ok(()) -} + tracing::info!(count = events.len(), "insert_blame_many"); -/// Upsert a single (document_id, node_id) -> (peer_id, timestamp_ms) row. -pub async fn upsert_blame( - db: D1Database, - document_id: &str, - node_id: &str, - peer_id: u64, - timestamp_ms: i64, -) -> worker::Result<()> { - let result = db - .prepare( - "INSERT INTO blame (document_id, node_id, peer_id, timestamp_ms) \ - VALUES (?, ?, ?, ?) \ - ON CONFLICT(document_id, node_id) DO UPDATE SET \ - peer_id = excluded.peer_id, \ - timestamp_ms = excluded.timestamp_ms;", - ) - .bind(&[ - document_id.into(), - node_id.into(), - peer_id.to_string().into(), - // d1 js doesn't support bigint - (timestamp_ms as f64).into(), - ])? - .run() - .await?; - if let Some(e) = result.error() { - error!(error = e, "upsert_blame D1 error"); - return Err(worker::Error::from(e)); + for chunk in events.chunks(BATCH_CHUNK_SIZE) { + let db = env.d1(crate::constants::USER_PEER_D1_BINDING)?; + let stmts: Vec<_> = chunk + .iter() + .map(|e| { + db.prepare( + "INSERT INTO blame (document_id, node_id, peer_id, timestamp_ms) \ + VALUES (?, ?, ?, ?) \ + ON CONFLICT(document_id, node_id) DO UPDATE SET \ + peer_id = excluded.peer_id, \ + timestamp_ms = excluded.timestamp_ms;", + ) + .bind(&[ + e.document_id.as_str().into(), + e.node_id.as_str().into(), + e.peer_id.to_string().into(), + // d1 js doesn't support bigint + (e.timestamp_ms as f64).into(), + ]) + }) + .collect::>>()?; + db.batch(stmts).await?; } Ok(()) } + #[derive(serde::Deserialize, serde::Serialize)] pub struct BlameRow { pub peer_id: String, diff --git a/rust/sync-service/src/durable_object.rs b/rust/sync-service/src/durable_object.rs index ecd1dec7a8..624cf35538 100644 --- a/rust/sync-service/src/durable_object.rs +++ b/rust/sync-service/src/durable_object.rs @@ -118,6 +118,8 @@ pub struct DocumentSyncSession { /// a map from websocket's ID's to websocket metadata ws_meta_map: Arc>, msg_buffer: Arc>>, + /// Buffered blame events. Flushed via D1 batch on each alarm tick. + pending_blame: Arc>>, } mod u64_serde_strings { @@ -299,6 +301,37 @@ impl DocumentSyncSession { pub fn get_websockets(&self) -> Vec { self.state.get_websockets() } + + /// Buffer blame events in memory. Cheap synchronous push; events are + /// flushed to D1 in a single `batch()` from the alarm tick. + pub fn push_blame_events(&self, events: Vec) { + if events.is_empty() { + return; + } + self.pending_blame + .lock("DocumentSyncSession::push_blame_events") + .extend(events); + } + + /// Drain the pending blame buffer and write all events via a single D1 + /// batch in the background. Returns immediately; the actual write runs + /// inside `wait_until` so the alarm handler doesn't block on D1. + fn flush_pending_blame(&self) { + let pending: Vec = std::mem::take( + &mut *self + .pending_blame + .lock("DocumentSyncSession::flush_pending_blame"), + ); + if pending.is_empty() { + return; + } + let env = self.env.clone(); + self.state.wait_until(async move { + if let Err(e) = crate::d1::insert_blame_many(&env, &pending).await { + warn!(error = ?e, "failed to flush pending blame"); + } + }); + } async fn inner_fetch(&self, req: Request) -> Result { let url = req.url()?; let matched = ROUTER @@ -830,6 +863,7 @@ impl DurableObject for DocumentSyncSession { awareness: EphemeralStore::new(5_000), ws_meta_map: Arc::new(Mutex::new(Default::default())), msg_buffer: Arc::new(Mutex::new(vec![])), + pending_blame: Arc::new(Mutex::new(Vec::new())), } } @@ -956,6 +990,8 @@ impl DurableObject for DocumentSyncSession { }); } + self.flush_pending_blame(); + // Re-arm the alarm while clients are connected so the in-memory state // stays warm and pending updates keep getting persisted. Updates reach // peers when they happen (PeerUpdate broadcast); pushing a full diff --git a/rust/sync-service/src/websocket.rs b/rust/sync-service/src/websocket.rs index aa7a161503..02803261cd 100644 --- a/rust/sync-service/src/websocket.rs +++ b/rust/sync-service/src/websocket.rs @@ -130,28 +130,27 @@ pub async fn process_message( .append_pending_operation(&update, document_state) .await?; - // Record "last edited by" for each Lexical node touched. Runs - // in the background via `wait_until` so the ACK + peer broadcast - // below don't block on D1, and the write still completes even - // after we return from this handler. + // Buffer "last edited by" events in memory. The DO's alarm flushes + // them via a single D1 `batch()` every few seconds, so this stays + // off the edit hot path entirely. if !touched_nodes.is_empty() { let peer_ids = Wsm::new(dss, ws).get_peer_ids().await.unwrap_or_default(); if let Some(&peer_id) = peer_ids.first() { - let env = dss.env().clone(); - let document_id = document_id.to_string(); - dss.wait_until(async move { - if let Err(e) = - crate::d1::record_blame(&env, &document_id, peer_id, &touched_nodes) - .await - { - tracing::warn!( - error = ?e, - document_id = document_id, - peer_id = peer_id, - "record_blame failed" - ); - } - }); + let now_ms = web_time::SystemTime::now() + .duration_since(web_time::UNIX_EPOCH) + .map(|d| d.as_millis() as i64) + .unwrap_or(0); + dss.push_blame_events( + touched_nodes + .into_iter() + .map(|node_id| crate::d1::BlameEvent { + document_id: document_id.to_string(), + node_id, + peer_id, + timestamp_ms: now_ms, + }) + .collect(), + ); } } From 50071a4dd5eb2233930bf8103e826d8886397b78 Mon Sep 17 00:00:00 2001 From: Wolf Mermelstein Date: Tue, 16 Jun 2026 19:49:02 +0000 Subject: [PATCH 6/7] clean up --- rust/sync-service/src/durable_object.rs | 20 -------------------- 1 file changed, 20 deletions(-) diff --git a/rust/sync-service/src/durable_object.rs b/rust/sync-service/src/durable_object.rs index 624cf35538..bf095138b0 100644 --- a/rust/sync-service/src/durable_object.rs +++ b/rust/sync-service/src/durable_object.rs @@ -283,27 +283,10 @@ async fn bump_alarm(state: &State) -> Result<()> { } impl DocumentSyncSession { - pub fn env(&self) -> &Env { - &self.env - } - - /// Spawn a fire-and-forget task that the DO will keep alive past the - /// current request, but won't block subsequent requests on. Use for - /// background writes (e.g. blame, snapshot push) that mustn't be cancelled - /// when the handler returns. - pub fn wait_until(&self, future: F) - where - F: std::future::Future + 'static, - { - self.state.wait_until(future); - } - pub fn get_websockets(&self) -> Vec { self.state.get_websockets() } - /// Buffer blame events in memory. Cheap synchronous push; events are - /// flushed to D1 in a single `batch()` from the alarm tick. pub fn push_blame_events(&self, events: Vec) { if events.is_empty() { return; @@ -596,9 +579,6 @@ impl DocumentSyncSession { }) } - /// Return who last edited the given Lexical node and when. Joins blame - /// with peer_user_map in D1 so user_id (MacroId) comes back resolved. - /// Returns 404 if the node has no recorded edits. async fn blame_handler(&self, node_id: Option<&str>) -> Result { let node_id = node_id.ok_or_else(|| Error::from("missing node_id"))?; let document_id = self.document_id().await?.to_string(); From 16e061d945604f0e8406191044fe4395b4687c33 Mon Sep 17 00:00:00 2001 From: Wolf Mermelstein Date: Tue, 16 Jun 2026 19:53:17 +0000 Subject: [PATCH 7/7] add helper --- rust/sync-service/src/websocket.rs | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/rust/sync-service/src/websocket.rs b/rust/sync-service/src/websocket.rs index e9ecfc8f2f..fc1a2cfba6 100644 --- a/rust/sync-service/src/websocket.rs +++ b/rust/sync-service/src/websocket.rs @@ -14,6 +14,13 @@ use crate::{ storage::SessionStorage, }; +fn now_ms() -> i64 { + web_time::SystemTime::now() + .duration_since(web_time::UNIX_EPOCH) + .map(|d| d.as_millis() as i64) + .unwrap_or(0) +} + fn serialize<'a, T: bebop::Record<'a>>( obj: T, msg_buf: &mut Vec, @@ -117,16 +124,10 @@ pub async fn process_message( .append_pending_operation(&update, document_state) .await?; - // Buffer "last edited by" events in memory. The DO's alarm flushes - // them via a single D1 `batch()` every few seconds, so this stays - // off the edit hot path entirely. if !touched_nodes.is_empty() { let peer_ids = Wsm::new(dss, ws).get_peer_ids().await.unwrap_or_default(); if let Some(&peer_id) = peer_ids.first() { - let now_ms = web_time::SystemTime::now() - .duration_since(web_time::UNIX_EPOCH) - .map(|d| d.as_millis() as i64) - .unwrap_or(0); + let now_ms = now_ms(); dss.push_blame_events( touched_nodes .into_iter()