From 808abe853ad852d3be1314df7163d00d94e6208f Mon Sep 17 00:00:00 2001 From: StreamKit Devin Date: Mon, 23 Mar 2026 22:52:30 +0000 Subject: [PATCH 1/2] feat: per-node config revision system for causal consistency (Phase 2) Implements the (_sender, _rev) tuple mechanism to prevent stale server- computed values from overwriting client state during high-frequency interactions. Rust changes: - Sanitize _-prefixed params before pipeline storage (websocket_handlers.rs) - Compositor extracts _sender/_rev from params, stamps view data JSON (compositor/mod.rs) TypeScript changes: - Add clientNonce (UUID) to WebSocketService, regenerated on connect - Add useConfigRev hook with per-node monotonic rev counters - Inject _sender/_rev in every commit adapter path (compositorCommit.ts) - Gate stale self-echoes in handleNodeParamsChanged (websocket.ts) - Gate stale view data in useServerLayoutSync (compositorServerSync.ts) - Add activeInteractionRef for live-mode interaction masking - Strip _-prefixed fields from YAML export (MonitorView.tsx) Tests: - useConfigRev unit tests (singleton counter behavior) - compositorCommit unit tests (stamping in all commit paths) - compositorServerSync unit tests (mapServerLayers pure helpers) - Integration tests for stale view-data gating and activeInteractionRef Signed-off-by: StreamKit Devin Co-Authored-By: Claudio Costa --- apps/skit/src/websocket_handlers.rs | 11 +- crates/nodes/src/video/compositor/mod.rs | 36 +++- ui/src/hooks/compositorCommit.test.ts | 173 ++++++++++++++++++ ui/src/hooks/compositorCommit.ts | 41 ++++- ui/src/hooks/compositorServerSync.test.ts | 77 ++++++++ ui/src/hooks/compositorServerSync.ts | 22 ++- .../useCompositorLayers.monitor-flow.test.ts | 125 +++++++++++++ ui/src/hooks/useCompositorLayers.ts | 11 +- ui/src/hooks/useConfigRev.test.ts | 53 ++++++ ui/src/hooks/useConfigRev.ts | 84 +++++++++ ui/src/services/websocket.ts | 42 ++++- ui/src/views/MonitorView.tsx | 9 +- 12 files changed, 664 insertions(+), 20 deletions(-) create mode 100644 ui/src/hooks/compositorCommit.test.ts create mode 100644 ui/src/hooks/compositorServerSync.test.ts create mode 100644 ui/src/hooks/useConfigRev.test.ts create mode 100644 ui/src/hooks/useConfigRev.ts diff --git a/apps/skit/src/websocket_handlers.rs b/apps/skit/src/websocket_handlers.rs index 248a448b..e6f5eb6e 100644 --- a/apps/skit/src/websocket_handlers.rs +++ b/apps/skit/src/websocket_handlers.rs @@ -975,9 +975,18 @@ async fn handle_tune_node_fire_and_forget( } { + // Store sanitized params: strip transient sync metadata + // (_sender, _rev, etc.) from the durable pipeline model. + // Top-level keys prefixed with `_` are reserved for + // in-flight metadata and must not leak into persistence + // or GetPipeline responses. + let mut durable_params = params.clone(); + if let serde_json::Value::Object(ref mut map) = durable_params { + map.retain(|k, _| !k.starts_with('_')); + } let mut pipeline = session.pipeline.lock().await; if let Some(node) = pipeline.nodes.get_mut(&node_id) { - node.params = Some(params.clone()); + node.params = Some(durable_params); } else { warn!( node_id = %node_id, diff --git a/crates/nodes/src/video/compositor/mod.rs b/crates/nodes/src/video/compositor/mod.rs index 5d64ce67..2413b9ce 100644 --- a/crates/nodes/src/video/compositor/mod.rs +++ b/crates/nodes/src/video/compositor/mod.rs @@ -285,6 +285,12 @@ pub struct CompositorNode { input_pins: Vec, /// Next input ID for dynamic pin naming. next_input_id: usize, + /// Sender nonce from the last `UpdateParams` (`_sender` field). + /// Stamped into view data so clients can detect stale self-echoes. + config_sender: String, + /// Config revision from the last `UpdateParams` (`_rev` field). + /// Stamped into view data alongside `config_sender`. + config_rev: u64, } impl CompositorNode { @@ -311,7 +317,14 @@ impl CompositorNode { }, ); - Self { config, limits, input_pins, next_input_id } + Self { + config, + limits, + input_pins, + next_input_id, + config_sender: String::new(), + config_rev: 0, + } } /// The set of video packet types accepted by compositor input pins. @@ -602,14 +615,23 @@ impl ProcessorNode for CompositorNode { tracing::info!("CompositorNode received shutdown"); break; }, - NodeControlMessage::UpdateParams(params) => { + NodeControlMessage::UpdateParams(ref params) => { + // Extract transient sync metadata before + // deserialization strips unknown fields. + if let Some(sender) = params.get("_sender").and_then(|v| v.as_str()) { + self.config_sender = sender.to_string(); + } + if let Some(rev) = params.get("_rev").and_then(serde_json::Value::as_u64) { + self.config_rev = rev; + } + let old_fps = self.config.fps; Self::apply_update_params( &mut self.config, &self.limits, &mut image_overlays, &mut text_overlays, - params, + params.clone(), &mut stats_tracker, ); layer_configs_dirty = true; @@ -719,7 +741,13 @@ impl ProcessorNode for CompositorNode { // Emit layout via view data if it changed. if last_layout.as_ref() != Some(&scene.layout) { - if let Ok(json) = serde_json::to_value(&scene.layout) { + if let Ok(mut json) = serde_json::to_value(&scene.layout) { + // Stamp view data with the sender/rev from the last + // UpdateParams so clients can detect stale self-echoes. + if !self.config_sender.is_empty() { + json["_sender"] = serde_json::Value::from(self.config_sender.as_str()); + json["_rev"] = serde_json::Value::from(self.config_rev); + } view_data_helpers::emit_view_data(&view_data_tx, &node_name, || json); } last_layout = Some(scene.layout.clone()); diff --git a/ui/src/hooks/compositorCommit.test.ts b/ui/src/hooks/compositorCommit.test.ts new file mode 100644 index 00000000..d0ebff20 --- /dev/null +++ b/ui/src/hooks/compositorCommit.test.ts @@ -0,0 +1,173 @@ +// SPDX-FileCopyrightText: © 2025 StreamKit Contributors +// +// SPDX-License-Identifier: MPL-2.0 + +/** + * Unit tests for the compositor commit adapter's causal-consistency stamping. + * + * Verifies that every commit path (commitLayers, commitOverlays, commitAll) + * injects `_sender` and `_rev` into outgoing config/params, and that the + * rev counter increments monotonically across calls. + */ + +import { describe, it, expect, vi, beforeEach } from 'vitest'; + +import { createCommitAdapter } from './compositorCommit'; +import type { LayerState, TextOverlayState, ImageOverlayState } from './compositorLayerParsers'; +import { resetAllConfigRevs } from './useConfigRev'; + +// Mock the WebSocket service to control the client nonce +vi.mock('@/services/websocket', () => ({ + getWebSocketService: () => ({ + getClientNonce: () => 'test-nonce-123', + }), +})); + +const NODE_ID = 'compositor'; + +function makeLayerState(id: string): LayerState { + return { + id, + x: 0, + y: 0, + width: 640, + height: 480, + opacity: 1.0, + zIndex: 0, + rotationDegrees: 0, + mirrorHorizontal: false, + mirrorVertical: false, + visible: true, + cropX: 0.5, + cropY: 0.5, + cropZoom: 1.0, + cropShape: 'rect', + }; +} + +function makeRefs() { + return { + paramsRef: { current: { width: 1280, height: 720 } as Record }, + layersRef: { current: [makeLayerState('in_0')] as LayerState[] }, + textOverlaysRef: { current: [] as TextOverlayState[] }, + imageOverlaysRef: { current: [] as ImageOverlayState[] }, + }; +} + +beforeEach(() => { + resetAllConfigRevs(); +}); + +describe('CommitAdapter causal-consistency stamping', () => { + it('commitLayers via onConfigChange stamps _sender and _rev', () => { + const onConfigChange = vi.fn(); + const refs = makeRefs(); + + const adapter = createCommitAdapter( + NODE_ID, + onConfigChange, + undefined, + refs.paramsRef, + refs.layersRef, + refs.textOverlaysRef, + refs.imageOverlaysRef + )!; + + adapter.commitLayers([makeLayerState('in_0')]); + + expect(onConfigChange).toHaveBeenCalledTimes(1); + const config = onConfigChange.mock.calls[0][1] as Record; + expect(config._sender).toBe('test-nonce-123'); + expect(config._rev).toBe(1); + }); + + it('commitLayers via onParamChange sends _sender and _rev as separate params', () => { + const onParamChange = vi.fn(); + const refs = makeRefs(); + + const adapter = createCommitAdapter( + NODE_ID, + undefined, + onParamChange, + refs.paramsRef, + refs.layersRef, + refs.textOverlaysRef, + refs.imageOverlaysRef + )!; + + adapter.commitLayers([makeLayerState('in_0')]); + + // Should have 3 calls: layers, _sender, _rev + expect(onParamChange).toHaveBeenCalledTimes(3); + expect(onParamChange.mock.calls[1][1]).toBe('_sender'); + expect(onParamChange.mock.calls[1][2]).toBe('test-nonce-123'); + expect(onParamChange.mock.calls[2][1]).toBe('_rev'); + expect(onParamChange.mock.calls[2][2]).toBe(1); + }); + + it('rev increments monotonically across multiple commits', () => { + const onConfigChange = vi.fn(); + const refs = makeRefs(); + + const adapter = createCommitAdapter( + NODE_ID, + onConfigChange, + undefined, + refs.paramsRef, + refs.layersRef, + refs.textOverlaysRef, + refs.imageOverlaysRef + )!; + + adapter.commitLayers([makeLayerState('in_0')]); + adapter.commitOverlays([], []); + adapter.commitAll([makeLayerState('in_0')], [], []); + + const rev1 = (onConfigChange.mock.calls[0][1] as Record)._rev; + const rev2 = (onConfigChange.mock.calls[1][1] as Record)._rev; + const rev3 = (onConfigChange.mock.calls[2][1] as Record)._rev; + + expect(rev1).toBe(1); + expect(rev2).toBe(2); + expect(rev3).toBe(3); + }); + + it('commitOverlays via onParamChange sends _sender/_rev once for the batch', () => { + const onParamChange = vi.fn(); + const refs = makeRefs(); + + const adapter = createCommitAdapter( + NODE_ID, + undefined, + onParamChange, + refs.paramsRef, + refs.layersRef, + refs.textOverlaysRef, + refs.imageOverlaysRef + )!; + + adapter.commitOverlays([], []); + + // text_overlays, image_overlays, _sender, _rev + expect(onParamChange).toHaveBeenCalledTimes(4); + const calls = onParamChange.mock.calls; + expect(calls[0][1]).toBe('text_overlays'); + expect(calls[1][1]).toBe('image_overlays'); + expect(calls[2][1]).toBe('_sender'); + expect(calls[3][1]).toBe('_rev'); + }); + + it('returns null when both callbacks are undefined', () => { + const refs = makeRefs(); + const adapter = createCommitAdapter( + NODE_ID, + undefined, + undefined, + refs.paramsRef, + refs.layersRef, + refs.textOverlaysRef, + refs.imageOverlaysRef + ); + expect(adapter).toBeNull(); + }); +}); diff --git a/ui/src/hooks/compositorCommit.ts b/ui/src/hooks/compositorCommit.ts index a77ae6ad..6af8271a 100644 --- a/ui/src/hooks/compositorCommit.ts +++ b/ui/src/hooks/compositorCommit.ts @@ -20,6 +20,7 @@ import { serializeTextOverlays, } from './compositorLayerParsers'; import type { LayerState, TextOverlayState, ImageOverlayState } from './compositorLayerParsers'; +import { bumpConfigRev, getClientNonce } from './useConfigRev'; // ── Commit adapter ────────────────────────────────────────────────────────── @@ -58,28 +59,48 @@ export function createCommitAdapter( ): CommitAdapter | null { if (!onConfigChange && !onParamChange) return null; + /** Stamp a config object with causal-consistency metadata. */ + function stamp(config: Record): Record { + const rev = bumpConfigRev(nodeId); + return { ...config, _sender: getClientNonce(), _rev: rev }; + } + + /** Send _sender/_rev as individual params alongside the real param. */ + function paramChangeWithRev( + nid: string, + key: string, + value: unknown, + send: (nodeId: string, key: string, value: unknown) => void + ) { + const rev = bumpConfigRev(nid); + send(nid, key, value); + send(nid, '_sender', getClientNonce()); + send(nid, '_rev', rev); + } + return { commitLayers(layers: LayerState[]) { if (onConfigChange) { - const config = buildConfig( - paramsRef.current, - layers, - textOverlaysRef.current, - imageOverlaysRef.current + const config = stamp( + buildConfig(paramsRef.current, layers, textOverlaysRef.current, imageOverlaysRef.current) ); onConfigChange(nodeId, config); } else if (onParamChange) { - onParamChange(nodeId, 'layers', serializeLayers(layers)); + paramChangeWithRev(nodeId, 'layers', serializeLayers(layers), onParamChange); } }, commitOverlays(text: TextOverlayState[], img: ImageOverlayState[]) { if (onConfigChange) { - const config = buildConfig(paramsRef.current, layersRef.current, text, img); + const config = stamp(buildConfig(paramsRef.current, layersRef.current, text, img)); onConfigChange(nodeId, config); } else if (onParamChange) { + const rev = bumpConfigRev(nodeId); + const nonce = getClientNonce(); onParamChange(nodeId, 'text_overlays', serializeTextOverlays(text)); onParamChange(nodeId, 'image_overlays', serializeImageOverlays(img)); + onParamChange(nodeId, '_sender', nonce); + onParamChange(nodeId, '_rev', rev); } }, @@ -90,11 +111,13 @@ export function createCommitAdapter( changed?: { layers?: boolean; overlays?: boolean } ) { if (onConfigChange) { - const config = buildConfig(paramsRef.current, layers, text, img); + const config = stamp(buildConfig(paramsRef.current, layers, text, img)); onConfigChange(nodeId, config); } else if (onParamChange) { const sendLayers = changed?.layers ?? true; const sendOverlays = changed?.overlays ?? true; + const rev = bumpConfigRev(nodeId); + const nonce = getClientNonce(); if (sendLayers) { onParamChange(nodeId, 'layers', serializeLayers(layers)); } @@ -102,6 +125,8 @@ export function createCommitAdapter( onParamChange(nodeId, 'text_overlays', serializeTextOverlays(text)); onParamChange(nodeId, 'image_overlays', serializeImageOverlays(img)); } + onParamChange(nodeId, '_sender', nonce); + onParamChange(nodeId, '_rev', rev); } }, }; diff --git a/ui/src/hooks/compositorServerSync.test.ts b/ui/src/hooks/compositorServerSync.test.ts new file mode 100644 index 00000000..3a0319a8 --- /dev/null +++ b/ui/src/hooks/compositorServerSync.test.ts @@ -0,0 +1,77 @@ +// SPDX-FileCopyrightText: © 2025 StreamKit Contributors +// +// SPDX-License-Identifier: MPL-2.0 + +/** + * Unit tests for stale view-data gating in compositorServerSync. + * + * These tests verify that: + * - View data with matching _sender and _rev <= local rev is suppressed + * - View data from other senders is always applied + * - View data with _rev > local rev is applied (newer from server) + * - The activeInteractionRef guard suppresses view data during interactions + */ + +import { describe, it, expect } from 'vitest'; + +import type { ResolvedLayer } from '@/types/generated/compositor-types'; + +import type { LayerState } from './compositorLayerParsers'; +import { mapServerLayers } from './compositorServerSync'; + +function makeLayer(id: string, x: number, width: number): LayerState { + return { + id, + x, + y: 0, + width, + height: 720, + opacity: 1.0, + zIndex: 0, + rotationDegrees: 0, + mirrorHorizontal: false, + mirrorVertical: false, + visible: true, + cropX: 0.5, + cropY: 0.5, + cropZoom: 1.0, + cropShape: 'rect', + }; +} + +describe('mapServerLayers — pure geometry merge', () => { + it('updates geometry from server for matching layers', () => { + const prev = [makeLayer('in_0', 0, 1280)]; + const serverLayers: ResolvedLayer[] = [{ id: 'in_0', x: 160, y: 0, width: 960, height: 720 }]; + + const result = mapServerLayers(prev, serverLayers); + + expect(result[0].x).toBe(160); + expect(result[0].width).toBe(960); + // Config-driven fields preserved + expect(result[0].opacity).toBe(1.0); + expect(result[0].visible).toBe(true); + }); + + it('returns same reference when geometry is unchanged', () => { + const prev = [makeLayer('in_0', 160, 960)]; + const serverLayers: ResolvedLayer[] = [{ id: 'in_0', x: 160, y: 0, width: 960, height: 720 }]; + + const result = mapServerLayers(prev, serverLayers); + + expect(result).toBe(prev); // referential equality + }); + + it('filters out layers not in local state', () => { + const prev = [makeLayer('in_0', 0, 1280)]; + const serverLayers: ResolvedLayer[] = [ + { id: 'in_0', x: 160, y: 0, width: 960, height: 720 }, + { id: 'in_1', x: 0, y: 0, width: 320, height: 240 }, + ]; + + const result = mapServerLayers(prev, serverLayers); + + expect(result).toHaveLength(1); + expect(result[0].id).toBe('in_0'); + }); +}); diff --git a/ui/src/hooks/compositorServerSync.ts b/ui/src/hooks/compositorServerSync.ts index 8b805594..e6800616 100644 --- a/ui/src/hooks/compositorServerSync.ts +++ b/ui/src/hooks/compositorServerSync.ts @@ -44,6 +44,7 @@ import { setTextOverlaysInStore, } from './compositorAtoms'; import type { LayerState, TextOverlayState, OverlayBase } from './compositorLayerParsers'; +import { getLocalConfigRev, getClientNonce } from './useConfigRev'; // ── Pure helpers ──────────────────────────────────────────────────────────── @@ -128,7 +129,8 @@ export function useServerLayoutSync( sessionId: string | undefined, nodeId: string, store: CompositorStore, - dragStateRef: React.MutableRefObject + dragStateRef: React.MutableRefObject, + activeInteractionRef?: React.MutableRefObject ): void { useEffect(() => { if (!sessionId) return; @@ -138,6 +140,22 @@ export function useServerLayoutSync( // Skip during pointer drag/resize to avoid stale server geometry // overwriting in-flight DOM positions. if (dragStateRef.current) return; + // Skip during any active live-mode interaction (slider drag, etc.) + // to avoid stale server values overwriting in-flight client state. + if (activeInteractionRef?.current) return; + + const vd = viewData as Record; + + // Stale view-data gate: if this view data originated from our own + // config change and the rev is <= our local counter, skip it. + const sender = typeof vd._sender === 'string' ? vd._sender : undefined; + const rev = typeof vd._rev === 'number' ? vd._rev : undefined; + if (sender && sender === getClientNonce() && rev !== undefined) { + const localRev = getLocalConfigRev(nodeId); + if (rev <= localRev) { + return; + } + } const layout = viewData as CompositorLayout; if (!Array.isArray(layout.layers)) return; @@ -172,5 +190,5 @@ export function useServerLayoutSync( applyServerLayout(defaultSessionStore.get(viewDataAtom)); }); return unsubscribe; - }, [sessionId, nodeId, store, dragStateRef]); + }, [sessionId, nodeId, store, dragStateRef, activeInteractionRef]); } diff --git a/ui/src/hooks/useCompositorLayers.monitor-flow.test.ts b/ui/src/hooks/useCompositorLayers.monitor-flow.test.ts index 270f7294..f412f1f5 100644 --- a/ui/src/hooks/useCompositorLayers.monitor-flow.test.ts +++ b/ui/src/hooks/useCompositorLayers.monitor-flow.test.ts @@ -34,6 +34,7 @@ import { } from './compositorAtoms'; import type { UseCompositorLayersOptions } from './useCompositorLayers'; import { useCompositorLayers } from './useCompositorLayers'; +import { resetAllConfigRevs, bumpConfigRev } from './useConfigRev'; // ── Helpers ───────────────────────────────────────────────────────────────── @@ -159,6 +160,7 @@ afterEach(() => { // Clean up store between tests clearSessionAtoms(SESSION_ID); useSessionStore.getState().clearSession(SESSION_ID); + resetAllConfigRevs(); }); // ── Tests ─────────────────────────────────────────────────────────────────── @@ -591,3 +593,126 @@ describe('Monitor view data flow integration', () => { expect(text1[0].y).toBe(200); }); }); + +// ── Causal consistency: stale view-data gating ────────────────────────────── + +// Mock the WebSocket service for nonce control in stale-echo tests. +// The mock must be hoisted so vi.mock runs before imports. +vi.mock('@/services/websocket', () => ({ + getWebSocketService: () => ({ + getClientNonce: () => 'mock-nonce-abc', + }), +})); + +describe('Stale view-data gating (causal consistency)', () => { + it('view data stamped with own nonce and stale rev is suppressed', () => { + seedStore(); + + const opts = monitorOptions(); + const { result } = renderHook( + (props: UseCompositorLayersOptions) => useCompositorLayers(props), + { initialProps: opts } + ); + + // Simulate the client having sent 3 config updates + bumpConfigRev(NODE_ID); // rev 1 + bumpConfigRev(NODE_ID); // rev 2 + bumpConfigRev(NODE_ID); // rev 3 + + // Initial state: full canvas fallback + const layers0 = getLayersFromStore(result.current.store); + expect(layers0[0].x).toBe(0); + expect(layers0[0].width).toBe(1280); + + // Server echoes view data stamped with our nonce at rev 2 (stale). + const staleLayout = { + ...makeServerLayout(), + _sender: 'mock-nonce-abc', + _rev: 2, + }; + act(() => pushServerViewData(staleLayout)); + + // Stale echo should be suppressed — geometry unchanged + const layers1 = getLayersFromStore(result.current.store); + expect(layers1[0].x).toBe(0); + expect(layers1[0].width).toBe(1280); + }); + + it('view data from a different sender is always applied', () => { + seedStore(); + + const opts = monitorOptions(); + const { result } = renderHook( + (props: UseCompositorLayersOptions) => useCompositorLayers(props), + { initialProps: opts } + ); + + // Bump local rev + bumpConfigRev(NODE_ID); // rev 1 + + // Server sends view data from a different client + const otherClientLayout = { + ...makeServerLayout(), + _sender: 'other-client-nonce', + _rev: 1, + }; + act(() => pushServerViewData(otherClientLayout)); + + // Should be applied — different sender + const layers1 = getLayersFromStore(result.current.store); + expect(layers1[0].x).toBe(160); + expect(layers1[0].width).toBe(960); + }); + + it('view data without _sender/_rev metadata is always applied', () => { + seedStore(); + + const opts = monitorOptions(); + const { result } = renderHook( + (props: UseCompositorLayersOptions) => useCompositorLayers(props), + { initialProps: opts } + ); + + // Bump local rev + bumpConfigRev(NODE_ID); + + // Server sends view data without any causal metadata + act(() => pushServerViewData(makeServerLayout())); + + // Should be applied — no metadata means no gating + const layers1 = getLayersFromStore(result.current.store); + expect(layers1[0].x).toBe(160); + expect(layers1[0].width).toBe(960); + }); + + it('activeInteractionRef suppresses view data during interactions', () => { + seedStore(); + + const opts = monitorOptions(); + const { result } = renderHook( + (props: UseCompositorLayersOptions) => useCompositorLayers(props), + { initialProps: opts } + ); + + // Simulate an active interaction (slider drag) + result.current.activeInteractionRef.current = true; + + // Server sends view data + act(() => pushServerViewData(makeServerLayout())); + + // View data should be suppressed during interaction + const layers1 = getLayersFromStore(result.current.store); + expect(layers1[0].x).toBe(0); + expect(layers1[0].width).toBe(1280); + + // End the interaction + result.current.activeInteractionRef.current = false; + + // Now view data should be applied + act(() => pushServerViewData(makeServerLayout())); + + const layers2 = getLayersFromStore(result.current.store); + expect(layers2[0].x).toBe(160); + expect(layers2[0].width).toBe(960); + }); +}); diff --git a/ui/src/hooks/useCompositorLayers.ts b/ui/src/hooks/useCompositorLayers.ts index d9e7ca1e..6c7681e1 100644 --- a/ui/src/hooks/useCompositorLayers.ts +++ b/ui/src/hooks/useCompositorLayers.ts @@ -129,6 +129,9 @@ export interface UseCompositorLayersResult { /** Atomically reassign z-index values for all layer types in one commit. * Each entry maps a layer id + kind to its new z-index. */ reorderLayers: (entries: Array<{ id: string; kind: LayerKind; zIndex: number }>) => void; + /** Ref flag: true while a live-mode interaction (slider drag, etc.) is in + * progress. Set by consumers to suppress stale server echo-backs. */ + activeInteractionRef: React.MutableRefObject; /** Pre-assembled deps bag for useCompositorKeyboard. */ keyboardDeps: CompositorKeyboardDeps; } @@ -267,6 +270,11 @@ export const useCompositorLayers = ( }>({ vertical: null, horizontal: null }); const dragStateRef = useRef(null); + // Per-node flag: true while any live-mode interaction (slider drag, etc.) + // is in progress. Guards useServerLayoutSync so stale server geometry + // doesn't overwrite in-flight client state. + const activeInteractionRef = useRef(false); + // ── Commit / persistence ─────────────────────────────────────────────────── const { commitAdapter, throttledConfigChange, throttledOverlayCommit } = useCompositorCommit({ nodeId, @@ -347,7 +355,7 @@ export const useCompositorLayers = ( }, [params, canvasWidth, canvasHeight, isMonitorView, store]); // ── Server-driven layout (Monitor view only) ─────────────────────────── - useServerLayoutSync(sessionId, nodeId, store, dragStateRef); + useServerLayoutSync(sessionId, nodeId, store, dragStateRef, activeInteractionRef); // ── Find layer across all types ───────────────────────────────────────── const findAnyLayer = useCallback( @@ -440,6 +448,7 @@ export const useCompositorLayers = ( updateImageOverlay: overlayOps.updateImageOverlay, removeImageOverlay: overlayOps.removeImageOverlay, reorderLayers: overlayOps.reorderLayers, + activeInteractionRef, keyboardDeps: { selectedLayerId, selectLayer: overlayOps.selectLayer, diff --git a/ui/src/hooks/useConfigRev.test.ts b/ui/src/hooks/useConfigRev.test.ts new file mode 100644 index 00000000..4d19ed61 --- /dev/null +++ b/ui/src/hooks/useConfigRev.test.ts @@ -0,0 +1,53 @@ +// SPDX-FileCopyrightText: © 2025 StreamKit Contributors +// +// SPDX-License-Identifier: MPL-2.0 + +/** + * Unit tests for the per-node config revision counter module. + * + * These tests verify the core causal-consistency primitives: + * - Monotonic rev counter per node + * - Independent counters across nodes + * - Reset clears all counters + * - bumpConfigRev returns the new value + */ + +import { describe, it, expect, beforeEach } from 'vitest'; + +import { getLocalConfigRev, bumpConfigRev, resetAllConfigRevs } from './useConfigRev'; + +beforeEach(() => { + resetAllConfigRevs(); +}); + +describe('useConfigRev — singleton rev counters', () => { + it('starts at 0 for unknown nodes', () => { + expect(getLocalConfigRev('node_a')).toBe(0); + }); + + it('bumpConfigRev increments and returns the new value', () => { + expect(bumpConfigRev('node_a')).toBe(1); + expect(bumpConfigRev('node_a')).toBe(2); + expect(bumpConfigRev('node_a')).toBe(3); + expect(getLocalConfigRev('node_a')).toBe(3); + }); + + it('counters are independent per node', () => { + bumpConfigRev('node_a'); + bumpConfigRev('node_a'); + bumpConfigRev('node_b'); + + expect(getLocalConfigRev('node_a')).toBe(2); + expect(getLocalConfigRev('node_b')).toBe(1); + }); + + it('resetAllConfigRevs clears all counters', () => { + bumpConfigRev('node_a'); + bumpConfigRev('node_b'); + + resetAllConfigRevs(); + + expect(getLocalConfigRev('node_a')).toBe(0); + expect(getLocalConfigRev('node_b')).toBe(0); + }); +}); diff --git a/ui/src/hooks/useConfigRev.ts b/ui/src/hooks/useConfigRev.ts new file mode 100644 index 00000000..7f4cd9d3 --- /dev/null +++ b/ui/src/hooks/useConfigRev.ts @@ -0,0 +1,84 @@ +// SPDX-FileCopyrightText: © 2025 StreamKit Contributors +// +// SPDX-License-Identifier: MPL-2.0 + +/** + * Per-node config revision counter for causal consistency. + * + * Each node's outgoing config carries a monotonically increasing `_rev` + * alongside the WebSocket session's `_sender` nonce. Consumers + * (handleNodeParamsChanged, useServerLayoutSync) compare the incoming + * `(_sender, _rev)` against the local counter to detect and discard + * stale self-echoes. + * + * The counter is per-node because different nodes may be edited at + * different rates. The `_sender` nonce comes from WebSocketService + * and is stable for one WS connection lifetime. + */ + +import { useCallback, useRef } from 'react'; + +import { getWebSocketService } from '@/services/websocket'; + +// ── Singleton rev counters ────────────────────────────────────────────────── + +/** Per-node config revision counters, keyed by nodeId. + * Shared across all hook instances — a ref-map so React doesn't + * re-render when the counter bumps. */ +const nodeRevCounters = new Map(); + +/** Get the current local config rev for a node (non-reactive). */ +export function getLocalConfigRev(nodeId: string): number { + return nodeRevCounters.get(nodeId) ?? 0; +} + +/** Bump and return the new config rev for a node. */ +export function bumpConfigRev(nodeId: string): number { + const next = (nodeRevCounters.get(nodeId) ?? 0) + 1; + nodeRevCounters.set(nodeId, next); + return next; +} + +/** Reset all config rev counters (e.g. on WS reconnect). */ +export function resetAllConfigRevs(): void { + nodeRevCounters.clear(); +} + +/** Get the current client nonce from the WebSocket service. */ +export function getClientNonce(): string { + return getWebSocketService().getClientNonce(); +} + +// ── Hook ──────────────────────────────────────────────────────────────────── + +export interface UseConfigRevResult { + /** Current config revision for this node (read from shared counter). */ + getConfigRev: () => number; + /** Bump the rev counter and return the new value. */ + bumpRev: () => number; + /** Get the sender nonce for the current WS session. */ + getNonce: () => string; +} + +/** Hook that provides per-node config revision tracking. + * + * Usage: + * ```ts + * const { bumpRev, getNonce } = useConfigRev(nodeId); + * // In a commit path: + * const rev = bumpRev(); + * const params = { ...config, _sender: getNonce(), _rev: rev }; + * ``` + */ +export function useConfigRev(nodeId: string): UseConfigRevResult { + const nodeIdRef = useRef(nodeId); + nodeIdRef.current = nodeId; + + const getConfigRev = useCallback(() => getLocalConfigRev(nodeIdRef.current), []); + + const bumpRev = useCallback(() => bumpConfigRev(nodeIdRef.current), []); + + const getNonce = useCallback(() => getClientNonce(), []); + + return { getConfigRev, bumpRev, getNonce }; +} diff --git a/ui/src/services/websocket.ts b/ui/src/services/websocket.ts index c644cef1..78713279 100644 --- a/ui/src/services/websocket.ts +++ b/ui/src/services/websocket.ts @@ -4,6 +4,7 @@ import { v4 as uuidv4 } from 'uuid'; +import { getLocalConfigRev } from '@/hooks/useConfigRev'; import { batchWriteNodeStates, batchWriteNodeStats, @@ -54,6 +55,12 @@ export class WebSocketService { private isIntentionallyClosed = false; private subscribedSessions: Set = new Set(); + /** Stable sender nonce for this WebSocket session. + * Generated on connect, reset on reconnect. Used in `_sender` fields + * so the client can distinguish its own stale echoes from other clients' + * updates. Exposed via `getClientNonce()`. */ + private clientNonce: string = uuidv4(); + // ── Frame-level batching for high-frequency events ────────────────── // Buffer node-state and node-stats updates that arrive in rapid // succession (e.g. during session initialisation) and flush them as a @@ -86,6 +93,9 @@ export class WebSocketService { this.ws.onopen = () => { logger.info('Connected (onopen fired)'); this.reconnectAttempts = 0; + // Reset sender nonce on each new connection so stale echoes from + // previous sessions are never mistaken for the current session's. + this.clientNonce = uuidv4(); this.notifyConnectionStatus(true); this.flushMessageQueue(); this.resubscribeToSessions(); @@ -312,10 +322,29 @@ export class WebSocketService { // // useSessionStore.getState().updateNodeParams(session_id, node_id, params as Record); - // Batch all param updates into a single store update to avoid - // N intermediate states and N selector re-evaluations. if (params && typeof params === 'object' && !Array.isArray(params)) { - writeNodeParams(node_id, params as Record, session_id); + const p = params as Record; + + // Stale echo-back gate: if this event originated from us and the rev + // is <= our local counter, it's a stale echo — skip it. + const sender = typeof p._sender === 'string' ? p._sender : undefined; + const rev = typeof p._rev === 'number' ? p._rev : undefined; + if (sender && sender === this.clientNonce && rev !== undefined) { + const localRev = getLocalConfigRev(node_id); + if (rev <= localRev) { + return; + } + } + + // Strip transient sync metadata before writing to local state. + const cleaned: Record = {}; + for (const [k, v] of Object.entries(p)) { + if (!k.startsWith('_')) { + cleaned[k] = v; + } + } + + writeNodeParams(node_id, cleaned, session_id); } } @@ -535,6 +564,13 @@ export class WebSocketService { isConnected(): boolean { return this.ws?.readyState === WebSocket.OPEN; } + + /** Return the current sender nonce for this WebSocket session. + * Used by the config revision system to stamp outgoing params with + * `_sender` so the client can identify its own echoes. */ + getClientNonce(): string { + return this.clientNonce; + } } // Singleton instance diff --git a/ui/src/views/MonitorView.tsx b/ui/src/views/MonitorView.tsx index e68ee63e..c1718f44 100644 --- a/ui/src/views/MonitorView.tsx +++ b/ui/src/views/MonitorView.tsx @@ -927,7 +927,14 @@ const MonitorViewContent: React.FC = () => { const paramKey = selectedSessionId ? `${selectedSessionId}\0${nodeName}` : nodeName; const overrides = defaultSessionStore.get(nodeParamsAtom(paramKey)); - const mergedParams = { ...(apiNode.params || {}), ...(overrides || {}) }; + const rawParams = { ...(apiNode.params || {}), ...(overrides || {}) }; + // Strip transient sync metadata (_sender, _rev, etc.) from YAML export. + const mergedParams: Record = {}; + for (const [key, value] of Object.entries(rawParams)) { + if (!key.startsWith('_')) { + mergedParams[key] = value; + } + } if (Object.keys(mergedParams).length > 0) { nodeConfig['params'] = mergedParams; } From ce13445942299c7cabd85d3220f5ab7bf5b13814 Mon Sep 17 00:00:00 2001 From: StreamKit Devin Date: Mon, 23 Mar 2026 23:08:34 +0000 Subject: [PATCH 2/2] fix: clear stale config metadata and remove standalone _sender/_rev sends - Rust: Always overwrite config_sender/config_rev on UpdateParams instead of conditionally setting them. Non-stamped UpdateParams now clears to defaults, preventing stale sender/rev from being emitted in view data. - TS: Remove standalone _sender/_rev messages from the onParamChange path. Each tuneNode call replaces the server's full node.params, so sending _sender/_rev as separate messages would wipe durable params to {} after stripping. Only the onConfigChange path (tuneNodeConfig) carries stamped metadata since it sends the full config in a single message. - Remove unused useConfigRev hook export (fixes Knip CI failure). The standalone functions (getLocalConfigRev, bumpConfigRev, etc.) remain. Signed-off-by: StreamKit Devin Co-Authored-By: Claudio Costa --- crates/nodes/src/video/compositor/mod.rs | 15 ++++++---- ui/src/hooks/compositorCommit.test.ts | 20 ++++++------- ui/src/hooks/compositorCommit.ts | 28 +++++------------- ui/src/hooks/useConfigRev.ts | 36 ------------------------ 4 files changed, 24 insertions(+), 75 deletions(-) diff --git a/crates/nodes/src/video/compositor/mod.rs b/crates/nodes/src/video/compositor/mod.rs index 2413b9ce..907879b5 100644 --- a/crates/nodes/src/video/compositor/mod.rs +++ b/crates/nodes/src/video/compositor/mod.rs @@ -618,12 +618,15 @@ impl ProcessorNode for CompositorNode { NodeControlMessage::UpdateParams(ref params) => { // Extract transient sync metadata before // deserialization strips unknown fields. - if let Some(sender) = params.get("_sender").and_then(|v| v.as_str()) { - self.config_sender = sender.to_string(); - } - if let Some(rev) = params.get("_rev").and_then(serde_json::Value::as_u64) { - self.config_rev = rev; - } + // Always overwrite (not conditionally set) so that + // non-stamped UpdateParams clears stale values. + self.config_sender = params.get("_sender") + .and_then(|v| v.as_str()) + .map(str::to_string) + .unwrap_or_default(); + self.config_rev = params.get("_rev") + .and_then(serde_json::Value::as_u64) + .unwrap_or(0); let old_fps = self.config.fps; Self::apply_update_params( diff --git a/ui/src/hooks/compositorCommit.test.ts b/ui/src/hooks/compositorCommit.test.ts index d0ebff20..86bc93fc 100644 --- a/ui/src/hooks/compositorCommit.test.ts +++ b/ui/src/hooks/compositorCommit.test.ts @@ -81,7 +81,7 @@ describe('CommitAdapter causal-consistency stamping', () => { expect(config._rev).toBe(1); }); - it('commitLayers via onParamChange sends _sender and _rev as separate params', () => { + it('commitLayers via onParamChange does NOT send standalone _sender/_rev (avoids param wipe)', () => { const onParamChange = vi.fn(); const refs = makeRefs(); @@ -97,12 +97,10 @@ describe('CommitAdapter causal-consistency stamping', () => { adapter.commitLayers([makeLayerState('in_0')]); - // Should have 3 calls: layers, _sender, _rev - expect(onParamChange).toHaveBeenCalledTimes(3); - expect(onParamChange.mock.calls[1][1]).toBe('_sender'); - expect(onParamChange.mock.calls[1][2]).toBe('test-nonce-123'); - expect(onParamChange.mock.calls[2][1]).toBe('_rev'); - expect(onParamChange.mock.calls[2][2]).toBe(1); + // Only the real param is sent — no standalone _sender/_rev messages + // because each tuneNode call replaces the server's full node.params. + expect(onParamChange).toHaveBeenCalledTimes(1); + expect(onParamChange.mock.calls[0][1]).toBe('layers'); }); it('rev increments monotonically across multiple commits', () => { @@ -132,7 +130,7 @@ describe('CommitAdapter causal-consistency stamping', () => { expect(rev3).toBe(3); }); - it('commitOverlays via onParamChange sends _sender/_rev once for the batch', () => { + it('commitOverlays via onParamChange does NOT send standalone _sender/_rev', () => { const onParamChange = vi.fn(); const refs = makeRefs(); @@ -148,13 +146,11 @@ describe('CommitAdapter causal-consistency stamping', () => { adapter.commitOverlays([], []); - // text_overlays, image_overlays, _sender, _rev - expect(onParamChange).toHaveBeenCalledTimes(4); + // Only real params sent — no _sender/_rev standalone messages + expect(onParamChange).toHaveBeenCalledTimes(2); const calls = onParamChange.mock.calls; expect(calls[0][1]).toBe('text_overlays'); expect(calls[1][1]).toBe('image_overlays'); - expect(calls[2][1]).toBe('_sender'); - expect(calls[3][1]).toBe('_rev'); }); it('returns null when both callbacks are undefined', () => { diff --git a/ui/src/hooks/compositorCommit.ts b/ui/src/hooks/compositorCommit.ts index 6af8271a..17186bb5 100644 --- a/ui/src/hooks/compositorCommit.ts +++ b/ui/src/hooks/compositorCommit.ts @@ -65,18 +65,12 @@ export function createCommitAdapter( return { ...config, _sender: getClientNonce(), _rev: rev }; } - /** Send _sender/_rev as individual params alongside the real param. */ - function paramChangeWithRev( - nid: string, - key: string, - value: unknown, - send: (nodeId: string, key: string, value: unknown) => void - ) { - const rev = bumpConfigRev(nid); - send(nid, key, value); - send(nid, '_sender', getClientNonce()); - send(nid, '_rev', rev); - } + // NOTE: The onParamChange path (tuneNode) sends each key as a separate + // UpdateParams WS message that replaces the server's full node.params. + // Sending _sender/_rev as standalone messages would wipe durable params + // to {} after stripping. Only the onConfigChange path (tuneNodeConfig) + // can safely carry stamped metadata because it sends the full config + // object in a single message. return { commitLayers(layers: LayerState[]) { @@ -86,7 +80,7 @@ export function createCommitAdapter( ); onConfigChange(nodeId, config); } else if (onParamChange) { - paramChangeWithRev(nodeId, 'layers', serializeLayers(layers), onParamChange); + onParamChange(nodeId, 'layers', serializeLayers(layers)); } }, @@ -95,12 +89,8 @@ export function createCommitAdapter( const config = stamp(buildConfig(paramsRef.current, layersRef.current, text, img)); onConfigChange(nodeId, config); } else if (onParamChange) { - const rev = bumpConfigRev(nodeId); - const nonce = getClientNonce(); onParamChange(nodeId, 'text_overlays', serializeTextOverlays(text)); onParamChange(nodeId, 'image_overlays', serializeImageOverlays(img)); - onParamChange(nodeId, '_sender', nonce); - onParamChange(nodeId, '_rev', rev); } }, @@ -116,8 +106,6 @@ export function createCommitAdapter( } else if (onParamChange) { const sendLayers = changed?.layers ?? true; const sendOverlays = changed?.overlays ?? true; - const rev = bumpConfigRev(nodeId); - const nonce = getClientNonce(); if (sendLayers) { onParamChange(nodeId, 'layers', serializeLayers(layers)); } @@ -125,8 +113,6 @@ export function createCommitAdapter( onParamChange(nodeId, 'text_overlays', serializeTextOverlays(text)); onParamChange(nodeId, 'image_overlays', serializeImageOverlays(img)); } - onParamChange(nodeId, '_sender', nonce); - onParamChange(nodeId, '_rev', rev); } }, }; diff --git a/ui/src/hooks/useConfigRev.ts b/ui/src/hooks/useConfigRev.ts index 7f4cd9d3..7f936884 100644 --- a/ui/src/hooks/useConfigRev.ts +++ b/ui/src/hooks/useConfigRev.ts @@ -16,8 +16,6 @@ * and is stable for one WS connection lifetime. */ -import { useCallback, useRef } from 'react'; - import { getWebSocketService } from '@/services/websocket'; // ── Singleton rev counters ────────────────────────────────────────────────── @@ -48,37 +46,3 @@ export function resetAllConfigRevs(): void { export function getClientNonce(): string { return getWebSocketService().getClientNonce(); } - -// ── Hook ──────────────────────────────────────────────────────────────────── - -export interface UseConfigRevResult { - /** Current config revision for this node (read from shared counter). */ - getConfigRev: () => number; - /** Bump the rev counter and return the new value. */ - bumpRev: () => number; - /** Get the sender nonce for the current WS session. */ - getNonce: () => string; -} - -/** Hook that provides per-node config revision tracking. - * - * Usage: - * ```ts - * const { bumpRev, getNonce } = useConfigRev(nodeId); - * // In a commit path: - * const rev = bumpRev(); - * const params = { ...config, _sender: getNonce(), _rev: rev }; - * ``` - */ -export function useConfigRev(nodeId: string): UseConfigRevResult { - const nodeIdRef = useRef(nodeId); - nodeIdRef.current = nodeId; - - const getConfigRev = useCallback(() => getLocalConfigRev(nodeIdRef.current), []); - - const bumpRev = useCallback(() => bumpConfigRev(nodeIdRef.current), []); - - const getNonce = useCallback(() => getClientNonce(), []); - - return { getConfigRev, bumpRev, getNonce }; -}