diff --git a/apps/skit/src/websocket_handlers.rs b/apps/skit/src/websocket_handlers.rs index 248a448b..e6f5eb6e 100644 --- a/apps/skit/src/websocket_handlers.rs +++ b/apps/skit/src/websocket_handlers.rs @@ -975,9 +975,18 @@ async fn handle_tune_node_fire_and_forget( } { + // Store sanitized params: strip transient sync metadata + // (_sender, _rev, etc.) from the durable pipeline model. + // Top-level keys prefixed with `_` are reserved for + // in-flight metadata and must not leak into persistence + // or GetPipeline responses. + let mut durable_params = params.clone(); + if let serde_json::Value::Object(ref mut map) = durable_params { + map.retain(|k, _| !k.starts_with('_')); + } let mut pipeline = session.pipeline.lock().await; if let Some(node) = pipeline.nodes.get_mut(&node_id) { - node.params = Some(params.clone()); + node.params = Some(durable_params); } else { warn!( node_id = %node_id, diff --git a/crates/nodes/src/video/compositor/mod.rs b/crates/nodes/src/video/compositor/mod.rs index 5d64ce67..907879b5 100644 --- a/crates/nodes/src/video/compositor/mod.rs +++ b/crates/nodes/src/video/compositor/mod.rs @@ -285,6 +285,12 @@ pub struct CompositorNode { input_pins: Vec, /// Next input ID for dynamic pin naming. next_input_id: usize, + /// Sender nonce from the last `UpdateParams` (`_sender` field). + /// Stamped into view data so clients can detect stale self-echoes. + config_sender: String, + /// Config revision from the last `UpdateParams` (`_rev` field). + /// Stamped into view data alongside `config_sender`. + config_rev: u64, } impl CompositorNode { @@ -311,7 +317,14 @@ impl CompositorNode { }, ); - Self { config, limits, input_pins, next_input_id } + Self { + config, + limits, + input_pins, + next_input_id, + config_sender: String::new(), + config_rev: 0, + } } /// The set of video packet types accepted by compositor input pins. @@ -602,14 +615,26 @@ impl ProcessorNode for CompositorNode { tracing::info!("CompositorNode received shutdown"); break; }, - NodeControlMessage::UpdateParams(params) => { + NodeControlMessage::UpdateParams(ref params) => { + // Extract transient sync metadata before + // deserialization strips unknown fields. + // Always overwrite (not conditionally set) so that + // non-stamped UpdateParams clears stale values. + self.config_sender = params.get("_sender") + .and_then(|v| v.as_str()) + .map(str::to_string) + .unwrap_or_default(); + self.config_rev = params.get("_rev") + .and_then(serde_json::Value::as_u64) + .unwrap_or(0); + let old_fps = self.config.fps; Self::apply_update_params( &mut self.config, &self.limits, &mut image_overlays, &mut text_overlays, - params, + params.clone(), &mut stats_tracker, ); layer_configs_dirty = true; @@ -719,7 +744,13 @@ impl ProcessorNode for CompositorNode { // Emit layout via view data if it changed. if last_layout.as_ref() != Some(&scene.layout) { - if let Ok(json) = serde_json::to_value(&scene.layout) { + if let Ok(mut json) = serde_json::to_value(&scene.layout) { + // Stamp view data with the sender/rev from the last + // UpdateParams so clients can detect stale self-echoes. + if !self.config_sender.is_empty() { + json["_sender"] = serde_json::Value::from(self.config_sender.as_str()); + json["_rev"] = serde_json::Value::from(self.config_rev); + } view_data_helpers::emit_view_data(&view_data_tx, &node_name, || json); } last_layout = Some(scene.layout.clone()); diff --git a/ui/src/hooks/compositorCommit.test.ts b/ui/src/hooks/compositorCommit.test.ts new file mode 100644 index 00000000..86bc93fc --- /dev/null +++ b/ui/src/hooks/compositorCommit.test.ts @@ -0,0 +1,169 @@ +// SPDX-FileCopyrightText: © 2025 StreamKit Contributors +// +// SPDX-License-Identifier: MPL-2.0 + +/** + * Unit tests for the compositor commit adapter's causal-consistency stamping. + * + * Verifies that every commit path (commitLayers, commitOverlays, commitAll) + * injects `_sender` and `_rev` into outgoing config/params, and that the + * rev counter increments monotonically across calls. + */ + +import { describe, it, expect, vi, beforeEach } from 'vitest'; + +import { createCommitAdapter } from './compositorCommit'; +import type { LayerState, TextOverlayState, ImageOverlayState } from './compositorLayerParsers'; +import { resetAllConfigRevs } from './useConfigRev'; + +// Mock the WebSocket service to control the client nonce +vi.mock('@/services/websocket', () => ({ + getWebSocketService: () => ({ + getClientNonce: () => 'test-nonce-123', + }), +})); + +const NODE_ID = 'compositor'; + +function makeLayerState(id: string): LayerState { + return { + id, + x: 0, + y: 0, + width: 640, + height: 480, + opacity: 1.0, + zIndex: 0, + rotationDegrees: 0, + mirrorHorizontal: false, + mirrorVertical: false, + visible: true, + cropX: 0.5, + cropY: 0.5, + cropZoom: 1.0, + cropShape: 'rect', + }; +} + +function makeRefs() { + return { + paramsRef: { current: { width: 1280, height: 720 } as Record }, + layersRef: { current: [makeLayerState('in_0')] as LayerState[] }, + textOverlaysRef: { current: [] as TextOverlayState[] }, + imageOverlaysRef: { current: [] as ImageOverlayState[] }, + }; +} + +beforeEach(() => { + resetAllConfigRevs(); +}); + +describe('CommitAdapter causal-consistency stamping', () => { + it('commitLayers via onConfigChange stamps _sender and _rev', () => { + const onConfigChange = vi.fn(); + const refs = makeRefs(); + + const adapter = createCommitAdapter( + NODE_ID, + onConfigChange, + undefined, + refs.paramsRef, + refs.layersRef, + refs.textOverlaysRef, + refs.imageOverlaysRef + )!; + + adapter.commitLayers([makeLayerState('in_0')]); + + expect(onConfigChange).toHaveBeenCalledTimes(1); + const config = onConfigChange.mock.calls[0][1] as Record; + expect(config._sender).toBe('test-nonce-123'); + expect(config._rev).toBe(1); + }); + + it('commitLayers via onParamChange does NOT send standalone _sender/_rev (avoids param wipe)', () => { + const onParamChange = vi.fn(); + const refs = makeRefs(); + + const adapter = createCommitAdapter( + NODE_ID, + undefined, + onParamChange, + refs.paramsRef, + refs.layersRef, + refs.textOverlaysRef, + refs.imageOverlaysRef + )!; + + adapter.commitLayers([makeLayerState('in_0')]); + + // Only the real param is sent — no standalone _sender/_rev messages + // because each tuneNode call replaces the server's full node.params. + expect(onParamChange).toHaveBeenCalledTimes(1); + expect(onParamChange.mock.calls[0][1]).toBe('layers'); + }); + + it('rev increments monotonically across multiple commits', () => { + const onConfigChange = vi.fn(); + const refs = makeRefs(); + + const adapter = createCommitAdapter( + NODE_ID, + onConfigChange, + undefined, + refs.paramsRef, + refs.layersRef, + refs.textOverlaysRef, + refs.imageOverlaysRef + )!; + + adapter.commitLayers([makeLayerState('in_0')]); + adapter.commitOverlays([], []); + adapter.commitAll([makeLayerState('in_0')], [], []); + + const rev1 = (onConfigChange.mock.calls[0][1] as Record)._rev; + const rev2 = (onConfigChange.mock.calls[1][1] as Record)._rev; + const rev3 = (onConfigChange.mock.calls[2][1] as Record)._rev; + + expect(rev1).toBe(1); + expect(rev2).toBe(2); + expect(rev3).toBe(3); + }); + + it('commitOverlays via onParamChange does NOT send standalone _sender/_rev', () => { + const onParamChange = vi.fn(); + const refs = makeRefs(); + + const adapter = createCommitAdapter( + NODE_ID, + undefined, + onParamChange, + refs.paramsRef, + refs.layersRef, + refs.textOverlaysRef, + refs.imageOverlaysRef + )!; + + adapter.commitOverlays([], []); + + // Only real params sent — no _sender/_rev standalone messages + expect(onParamChange).toHaveBeenCalledTimes(2); + const calls = onParamChange.mock.calls; + expect(calls[0][1]).toBe('text_overlays'); + expect(calls[1][1]).toBe('image_overlays'); + }); + + it('returns null when both callbacks are undefined', () => { + const refs = makeRefs(); + const adapter = createCommitAdapter( + NODE_ID, + undefined, + undefined, + refs.paramsRef, + refs.layersRef, + refs.textOverlaysRef, + refs.imageOverlaysRef + ); + expect(adapter).toBeNull(); + }); +}); diff --git a/ui/src/hooks/compositorCommit.ts b/ui/src/hooks/compositorCommit.ts index a77ae6ad..17186bb5 100644 --- a/ui/src/hooks/compositorCommit.ts +++ b/ui/src/hooks/compositorCommit.ts @@ -20,6 +20,7 @@ import { serializeTextOverlays, } from './compositorLayerParsers'; import type { LayerState, TextOverlayState, ImageOverlayState } from './compositorLayerParsers'; +import { bumpConfigRev, getClientNonce } from './useConfigRev'; // ── Commit adapter ────────────────────────────────────────────────────────── @@ -58,14 +59,24 @@ export function createCommitAdapter( ): CommitAdapter | null { if (!onConfigChange && !onParamChange) return null; + /** Stamp a config object with causal-consistency metadata. */ + function stamp(config: Record): Record { + const rev = bumpConfigRev(nodeId); + return { ...config, _sender: getClientNonce(), _rev: rev }; + } + + // NOTE: The onParamChange path (tuneNode) sends each key as a separate + // UpdateParams WS message that replaces the server's full node.params. + // Sending _sender/_rev as standalone messages would wipe durable params + // to {} after stripping. Only the onConfigChange path (tuneNodeConfig) + // can safely carry stamped metadata because it sends the full config + // object in a single message. + return { commitLayers(layers: LayerState[]) { if (onConfigChange) { - const config = buildConfig( - paramsRef.current, - layers, - textOverlaysRef.current, - imageOverlaysRef.current + const config = stamp( + buildConfig(paramsRef.current, layers, textOverlaysRef.current, imageOverlaysRef.current) ); onConfigChange(nodeId, config); } else if (onParamChange) { @@ -75,7 +86,7 @@ export function createCommitAdapter( commitOverlays(text: TextOverlayState[], img: ImageOverlayState[]) { if (onConfigChange) { - const config = buildConfig(paramsRef.current, layersRef.current, text, img); + const config = stamp(buildConfig(paramsRef.current, layersRef.current, text, img)); onConfigChange(nodeId, config); } else if (onParamChange) { onParamChange(nodeId, 'text_overlays', serializeTextOverlays(text)); @@ -90,7 +101,7 @@ export function createCommitAdapter( changed?: { layers?: boolean; overlays?: boolean } ) { if (onConfigChange) { - const config = buildConfig(paramsRef.current, layers, text, img); + const config = stamp(buildConfig(paramsRef.current, layers, text, img)); onConfigChange(nodeId, config); } else if (onParamChange) { const sendLayers = changed?.layers ?? true; diff --git a/ui/src/hooks/compositorServerSync.test.ts b/ui/src/hooks/compositorServerSync.test.ts new file mode 100644 index 00000000..3a0319a8 --- /dev/null +++ b/ui/src/hooks/compositorServerSync.test.ts @@ -0,0 +1,77 @@ +// SPDX-FileCopyrightText: © 2025 StreamKit Contributors +// +// SPDX-License-Identifier: MPL-2.0 + +/** + * Unit tests for stale view-data gating in compositorServerSync. + * + * These tests verify that: + * - View data with matching _sender and _rev <= local rev is suppressed + * - View data from other senders is always applied + * - View data with _rev > local rev is applied (newer from server) + * - The activeInteractionRef guard suppresses view data during interactions + */ + +import { describe, it, expect } from 'vitest'; + +import type { ResolvedLayer } from '@/types/generated/compositor-types'; + +import type { LayerState } from './compositorLayerParsers'; +import { mapServerLayers } from './compositorServerSync'; + +function makeLayer(id: string, x: number, width: number): LayerState { + return { + id, + x, + y: 0, + width, + height: 720, + opacity: 1.0, + zIndex: 0, + rotationDegrees: 0, + mirrorHorizontal: false, + mirrorVertical: false, + visible: true, + cropX: 0.5, + cropY: 0.5, + cropZoom: 1.0, + cropShape: 'rect', + }; +} + +describe('mapServerLayers — pure geometry merge', () => { + it('updates geometry from server for matching layers', () => { + const prev = [makeLayer('in_0', 0, 1280)]; + const serverLayers: ResolvedLayer[] = [{ id: 'in_0', x: 160, y: 0, width: 960, height: 720 }]; + + const result = mapServerLayers(prev, serverLayers); + + expect(result[0].x).toBe(160); + expect(result[0].width).toBe(960); + // Config-driven fields preserved + expect(result[0].opacity).toBe(1.0); + expect(result[0].visible).toBe(true); + }); + + it('returns same reference when geometry is unchanged', () => { + const prev = [makeLayer('in_0', 160, 960)]; + const serverLayers: ResolvedLayer[] = [{ id: 'in_0', x: 160, y: 0, width: 960, height: 720 }]; + + const result = mapServerLayers(prev, serverLayers); + + expect(result).toBe(prev); // referential equality + }); + + it('filters out layers not in local state', () => { + const prev = [makeLayer('in_0', 0, 1280)]; + const serverLayers: ResolvedLayer[] = [ + { id: 'in_0', x: 160, y: 0, width: 960, height: 720 }, + { id: 'in_1', x: 0, y: 0, width: 320, height: 240 }, + ]; + + const result = mapServerLayers(prev, serverLayers); + + expect(result).toHaveLength(1); + expect(result[0].id).toBe('in_0'); + }); +}); diff --git a/ui/src/hooks/compositorServerSync.ts b/ui/src/hooks/compositorServerSync.ts index 8b805594..e6800616 100644 --- a/ui/src/hooks/compositorServerSync.ts +++ b/ui/src/hooks/compositorServerSync.ts @@ -44,6 +44,7 @@ import { setTextOverlaysInStore, } from './compositorAtoms'; import type { LayerState, TextOverlayState, OverlayBase } from './compositorLayerParsers'; +import { getLocalConfigRev, getClientNonce } from './useConfigRev'; // ── Pure helpers ──────────────────────────────────────────────────────────── @@ -128,7 +129,8 @@ export function useServerLayoutSync( sessionId: string | undefined, nodeId: string, store: CompositorStore, - dragStateRef: React.MutableRefObject + dragStateRef: React.MutableRefObject, + activeInteractionRef?: React.MutableRefObject ): void { useEffect(() => { if (!sessionId) return; @@ -138,6 +140,22 @@ export function useServerLayoutSync( // Skip during pointer drag/resize to avoid stale server geometry // overwriting in-flight DOM positions. if (dragStateRef.current) return; + // Skip during any active live-mode interaction (slider drag, etc.) + // to avoid stale server values overwriting in-flight client state. + if (activeInteractionRef?.current) return; + + const vd = viewData as Record; + + // Stale view-data gate: if this view data originated from our own + // config change and the rev is <= our local counter, skip it. + const sender = typeof vd._sender === 'string' ? vd._sender : undefined; + const rev = typeof vd._rev === 'number' ? vd._rev : undefined; + if (sender && sender === getClientNonce() && rev !== undefined) { + const localRev = getLocalConfigRev(nodeId); + if (rev <= localRev) { + return; + } + } const layout = viewData as CompositorLayout; if (!Array.isArray(layout.layers)) return; @@ -172,5 +190,5 @@ export function useServerLayoutSync( applyServerLayout(defaultSessionStore.get(viewDataAtom)); }); return unsubscribe; - }, [sessionId, nodeId, store, dragStateRef]); + }, [sessionId, nodeId, store, dragStateRef, activeInteractionRef]); } diff --git a/ui/src/hooks/useCompositorLayers.monitor-flow.test.ts b/ui/src/hooks/useCompositorLayers.monitor-flow.test.ts index 270f7294..f412f1f5 100644 --- a/ui/src/hooks/useCompositorLayers.monitor-flow.test.ts +++ b/ui/src/hooks/useCompositorLayers.monitor-flow.test.ts @@ -34,6 +34,7 @@ import { } from './compositorAtoms'; import type { UseCompositorLayersOptions } from './useCompositorLayers'; import { useCompositorLayers } from './useCompositorLayers'; +import { resetAllConfigRevs, bumpConfigRev } from './useConfigRev'; // ── Helpers ───────────────────────────────────────────────────────────────── @@ -159,6 +160,7 @@ afterEach(() => { // Clean up store between tests clearSessionAtoms(SESSION_ID); useSessionStore.getState().clearSession(SESSION_ID); + resetAllConfigRevs(); }); // ── Tests ─────────────────────────────────────────────────────────────────── @@ -591,3 +593,126 @@ describe('Monitor view data flow integration', () => { expect(text1[0].y).toBe(200); }); }); + +// ── Causal consistency: stale view-data gating ────────────────────────────── + +// Mock the WebSocket service for nonce control in stale-echo tests. +// The mock must be hoisted so vi.mock runs before imports. +vi.mock('@/services/websocket', () => ({ + getWebSocketService: () => ({ + getClientNonce: () => 'mock-nonce-abc', + }), +})); + +describe('Stale view-data gating (causal consistency)', () => { + it('view data stamped with own nonce and stale rev is suppressed', () => { + seedStore(); + + const opts = monitorOptions(); + const { result } = renderHook( + (props: UseCompositorLayersOptions) => useCompositorLayers(props), + { initialProps: opts } + ); + + // Simulate the client having sent 3 config updates + bumpConfigRev(NODE_ID); // rev 1 + bumpConfigRev(NODE_ID); // rev 2 + bumpConfigRev(NODE_ID); // rev 3 + + // Initial state: full canvas fallback + const layers0 = getLayersFromStore(result.current.store); + expect(layers0[0].x).toBe(0); + expect(layers0[0].width).toBe(1280); + + // Server echoes view data stamped with our nonce at rev 2 (stale). + const staleLayout = { + ...makeServerLayout(), + _sender: 'mock-nonce-abc', + _rev: 2, + }; + act(() => pushServerViewData(staleLayout)); + + // Stale echo should be suppressed — geometry unchanged + const layers1 = getLayersFromStore(result.current.store); + expect(layers1[0].x).toBe(0); + expect(layers1[0].width).toBe(1280); + }); + + it('view data from a different sender is always applied', () => { + seedStore(); + + const opts = monitorOptions(); + const { result } = renderHook( + (props: UseCompositorLayersOptions) => useCompositorLayers(props), + { initialProps: opts } + ); + + // Bump local rev + bumpConfigRev(NODE_ID); // rev 1 + + // Server sends view data from a different client + const otherClientLayout = { + ...makeServerLayout(), + _sender: 'other-client-nonce', + _rev: 1, + }; + act(() => pushServerViewData(otherClientLayout)); + + // Should be applied — different sender + const layers1 = getLayersFromStore(result.current.store); + expect(layers1[0].x).toBe(160); + expect(layers1[0].width).toBe(960); + }); + + it('view data without _sender/_rev metadata is always applied', () => { + seedStore(); + + const opts = monitorOptions(); + const { result } = renderHook( + (props: UseCompositorLayersOptions) => useCompositorLayers(props), + { initialProps: opts } + ); + + // Bump local rev + bumpConfigRev(NODE_ID); + + // Server sends view data without any causal metadata + act(() => pushServerViewData(makeServerLayout())); + + // Should be applied — no metadata means no gating + const layers1 = getLayersFromStore(result.current.store); + expect(layers1[0].x).toBe(160); + expect(layers1[0].width).toBe(960); + }); + + it('activeInteractionRef suppresses view data during interactions', () => { + seedStore(); + + const opts = monitorOptions(); + const { result } = renderHook( + (props: UseCompositorLayersOptions) => useCompositorLayers(props), + { initialProps: opts } + ); + + // Simulate an active interaction (slider drag) + result.current.activeInteractionRef.current = true; + + // Server sends view data + act(() => pushServerViewData(makeServerLayout())); + + // View data should be suppressed during interaction + const layers1 = getLayersFromStore(result.current.store); + expect(layers1[0].x).toBe(0); + expect(layers1[0].width).toBe(1280); + + // End the interaction + result.current.activeInteractionRef.current = false; + + // Now view data should be applied + act(() => pushServerViewData(makeServerLayout())); + + const layers2 = getLayersFromStore(result.current.store); + expect(layers2[0].x).toBe(160); + expect(layers2[0].width).toBe(960); + }); +}); diff --git a/ui/src/hooks/useCompositorLayers.ts b/ui/src/hooks/useCompositorLayers.ts index d9e7ca1e..6c7681e1 100644 --- a/ui/src/hooks/useCompositorLayers.ts +++ b/ui/src/hooks/useCompositorLayers.ts @@ -129,6 +129,9 @@ export interface UseCompositorLayersResult { /** Atomically reassign z-index values for all layer types in one commit. * Each entry maps a layer id + kind to its new z-index. */ reorderLayers: (entries: Array<{ id: string; kind: LayerKind; zIndex: number }>) => void; + /** Ref flag: true while a live-mode interaction (slider drag, etc.) is in + * progress. Set by consumers to suppress stale server echo-backs. */ + activeInteractionRef: React.MutableRefObject; /** Pre-assembled deps bag for useCompositorKeyboard. */ keyboardDeps: CompositorKeyboardDeps; } @@ -267,6 +270,11 @@ export const useCompositorLayers = ( }>({ vertical: null, horizontal: null }); const dragStateRef = useRef(null); + // Per-node flag: true while any live-mode interaction (slider drag, etc.) + // is in progress. Guards useServerLayoutSync so stale server geometry + // doesn't overwrite in-flight client state. + const activeInteractionRef = useRef(false); + // ── Commit / persistence ─────────────────────────────────────────────────── const { commitAdapter, throttledConfigChange, throttledOverlayCommit } = useCompositorCommit({ nodeId, @@ -347,7 +355,7 @@ export const useCompositorLayers = ( }, [params, canvasWidth, canvasHeight, isMonitorView, store]); // ── Server-driven layout (Monitor view only) ─────────────────────────── - useServerLayoutSync(sessionId, nodeId, store, dragStateRef); + useServerLayoutSync(sessionId, nodeId, store, dragStateRef, activeInteractionRef); // ── Find layer across all types ───────────────────────────────────────── const findAnyLayer = useCallback( @@ -440,6 +448,7 @@ export const useCompositorLayers = ( updateImageOverlay: overlayOps.updateImageOverlay, removeImageOverlay: overlayOps.removeImageOverlay, reorderLayers: overlayOps.reorderLayers, + activeInteractionRef, keyboardDeps: { selectedLayerId, selectLayer: overlayOps.selectLayer, diff --git a/ui/src/hooks/useConfigRev.test.ts b/ui/src/hooks/useConfigRev.test.ts new file mode 100644 index 00000000..4d19ed61 --- /dev/null +++ b/ui/src/hooks/useConfigRev.test.ts @@ -0,0 +1,53 @@ +// SPDX-FileCopyrightText: © 2025 StreamKit Contributors +// +// SPDX-License-Identifier: MPL-2.0 + +/** + * Unit tests for the per-node config revision counter module. + * + * These tests verify the core causal-consistency primitives: + * - Monotonic rev counter per node + * - Independent counters across nodes + * - Reset clears all counters + * - bumpConfigRev returns the new value + */ + +import { describe, it, expect, beforeEach } from 'vitest'; + +import { getLocalConfigRev, bumpConfigRev, resetAllConfigRevs } from './useConfigRev'; + +beforeEach(() => { + resetAllConfigRevs(); +}); + +describe('useConfigRev — singleton rev counters', () => { + it('starts at 0 for unknown nodes', () => { + expect(getLocalConfigRev('node_a')).toBe(0); + }); + + it('bumpConfigRev increments and returns the new value', () => { + expect(bumpConfigRev('node_a')).toBe(1); + expect(bumpConfigRev('node_a')).toBe(2); + expect(bumpConfigRev('node_a')).toBe(3); + expect(getLocalConfigRev('node_a')).toBe(3); + }); + + it('counters are independent per node', () => { + bumpConfigRev('node_a'); + bumpConfigRev('node_a'); + bumpConfigRev('node_b'); + + expect(getLocalConfigRev('node_a')).toBe(2); + expect(getLocalConfigRev('node_b')).toBe(1); + }); + + it('resetAllConfigRevs clears all counters', () => { + bumpConfigRev('node_a'); + bumpConfigRev('node_b'); + + resetAllConfigRevs(); + + expect(getLocalConfigRev('node_a')).toBe(0); + expect(getLocalConfigRev('node_b')).toBe(0); + }); +}); diff --git a/ui/src/hooks/useConfigRev.ts b/ui/src/hooks/useConfigRev.ts new file mode 100644 index 00000000..7f936884 --- /dev/null +++ b/ui/src/hooks/useConfigRev.ts @@ -0,0 +1,48 @@ +// SPDX-FileCopyrightText: © 2025 StreamKit Contributors +// +// SPDX-License-Identifier: MPL-2.0 + +/** + * Per-node config revision counter for causal consistency. + * + * Each node's outgoing config carries a monotonically increasing `_rev` + * alongside the WebSocket session's `_sender` nonce. Consumers + * (handleNodeParamsChanged, useServerLayoutSync) compare the incoming + * `(_sender, _rev)` against the local counter to detect and discard + * stale self-echoes. + * + * The counter is per-node because different nodes may be edited at + * different rates. The `_sender` nonce comes from WebSocketService + * and is stable for one WS connection lifetime. + */ + +import { getWebSocketService } from '@/services/websocket'; + +// ── Singleton rev counters ────────────────────────────────────────────────── + +/** Per-node config revision counters, keyed by nodeId. + * Shared across all hook instances — a ref-map so React doesn't + * re-render when the counter bumps. */ +const nodeRevCounters = new Map(); + +/** Get the current local config rev for a node (non-reactive). */ +export function getLocalConfigRev(nodeId: string): number { + return nodeRevCounters.get(nodeId) ?? 0; +} + +/** Bump and return the new config rev for a node. */ +export function bumpConfigRev(nodeId: string): number { + const next = (nodeRevCounters.get(nodeId) ?? 0) + 1; + nodeRevCounters.set(nodeId, next); + return next; +} + +/** Reset all config rev counters (e.g. on WS reconnect). */ +export function resetAllConfigRevs(): void { + nodeRevCounters.clear(); +} + +/** Get the current client nonce from the WebSocket service. */ +export function getClientNonce(): string { + return getWebSocketService().getClientNonce(); +} diff --git a/ui/src/services/websocket.ts b/ui/src/services/websocket.ts index c644cef1..78713279 100644 --- a/ui/src/services/websocket.ts +++ b/ui/src/services/websocket.ts @@ -4,6 +4,7 @@ import { v4 as uuidv4 } from 'uuid'; +import { getLocalConfigRev } from '@/hooks/useConfigRev'; import { batchWriteNodeStates, batchWriteNodeStats, @@ -54,6 +55,12 @@ export class WebSocketService { private isIntentionallyClosed = false; private subscribedSessions: Set = new Set(); + /** Stable sender nonce for this WebSocket session. + * Generated on connect, reset on reconnect. Used in `_sender` fields + * so the client can distinguish its own stale echoes from other clients' + * updates. Exposed via `getClientNonce()`. */ + private clientNonce: string = uuidv4(); + // ── Frame-level batching for high-frequency events ────────────────── // Buffer node-state and node-stats updates that arrive in rapid // succession (e.g. during session initialisation) and flush them as a @@ -86,6 +93,9 @@ export class WebSocketService { this.ws.onopen = () => { logger.info('Connected (onopen fired)'); this.reconnectAttempts = 0; + // Reset sender nonce on each new connection so stale echoes from + // previous sessions are never mistaken for the current session's. + this.clientNonce = uuidv4(); this.notifyConnectionStatus(true); this.flushMessageQueue(); this.resubscribeToSessions(); @@ -312,10 +322,29 @@ export class WebSocketService { // // useSessionStore.getState().updateNodeParams(session_id, node_id, params as Record); - // Batch all param updates into a single store update to avoid - // N intermediate states and N selector re-evaluations. if (params && typeof params === 'object' && !Array.isArray(params)) { - writeNodeParams(node_id, params as Record, session_id); + const p = params as Record; + + // Stale echo-back gate: if this event originated from us and the rev + // is <= our local counter, it's a stale echo — skip it. + const sender = typeof p._sender === 'string' ? p._sender : undefined; + const rev = typeof p._rev === 'number' ? p._rev : undefined; + if (sender && sender === this.clientNonce && rev !== undefined) { + const localRev = getLocalConfigRev(node_id); + if (rev <= localRev) { + return; + } + } + + // Strip transient sync metadata before writing to local state. + const cleaned: Record = {}; + for (const [k, v] of Object.entries(p)) { + if (!k.startsWith('_')) { + cleaned[k] = v; + } + } + + writeNodeParams(node_id, cleaned, session_id); } } @@ -535,6 +564,13 @@ export class WebSocketService { isConnected(): boolean { return this.ws?.readyState === WebSocket.OPEN; } + + /** Return the current sender nonce for this WebSocket session. + * Used by the config revision system to stamp outgoing params with + * `_sender` so the client can identify its own echoes. */ + getClientNonce(): string { + return this.clientNonce; + } } // Singleton instance diff --git a/ui/src/views/MonitorView.tsx b/ui/src/views/MonitorView.tsx index e68ee63e..c1718f44 100644 --- a/ui/src/views/MonitorView.tsx +++ b/ui/src/views/MonitorView.tsx @@ -927,7 +927,14 @@ const MonitorViewContent: React.FC = () => { const paramKey = selectedSessionId ? `${selectedSessionId}\0${nodeName}` : nodeName; const overrides = defaultSessionStore.get(nodeParamsAtom(paramKey)); - const mergedParams = { ...(apiNode.params || {}), ...(overrides || {}) }; + const rawParams = { ...(apiNode.params || {}), ...(overrides || {}) }; + // Strip transient sync metadata (_sender, _rev, etc.) from YAML export. + const mergedParams: Record = {}; + for (const [key, value] of Object.entries(rawParams)) { + if (!key.startsWith('_')) { + mergedParams[key] = value; + } + } if (Object.keys(mergedParams).length > 0) { nodeConfig['params'] = mergedParams; }