-
Notifications
You must be signed in to change notification settings - Fork 0
feat: per-node config revision system for causal consistency (Phase 2) #186
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🔴 Stale When a pin management event (input added/removed) triggers a layout change in the compositor, the emitted view data inherits the stale Scenario walkthrough
Client A misses the layout update until its next config change bumps the rev. (Refers to lines 664-671) Was this helpful? React with 👍 or 👎 to provide feedback. Debug |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -285,6 +285,12 @@ pub struct CompositorNode { | |
| input_pins: Vec<InputPin>, | ||
| /// Next input ID for dynamic pin naming. | ||
| next_input_id: usize, | ||
| /// Sender nonce from the last `UpdateParams` (`_sender` field). | ||
| /// Stamped into view data so clients can detect stale self-echoes. | ||
| config_sender: String, | ||
| /// Config revision from the last `UpdateParams` (`_rev` field). | ||
| /// Stamped into view data alongside `config_sender`. | ||
| config_rev: u64, | ||
| } | ||
|
|
||
| impl CompositorNode { | ||
|
|
@@ -311,7 +317,14 @@ impl CompositorNode { | |
| }, | ||
| ); | ||
|
|
||
| Self { config, limits, input_pins, next_input_id } | ||
| Self { | ||
| config, | ||
| limits, | ||
| input_pins, | ||
| next_input_id, | ||
| config_sender: String::new(), | ||
| config_rev: 0, | ||
| } | ||
| } | ||
|
|
||
| /// The set of video packet types accepted by compositor input pins. | ||
|
|
@@ -602,14 +615,26 @@ impl ProcessorNode for CompositorNode { | |
| tracing::info!("CompositorNode received shutdown"); | ||
| break; | ||
| }, | ||
| NodeControlMessage::UpdateParams(params) => { | ||
| NodeControlMessage::UpdateParams(ref params) => { | ||
| // Extract transient sync metadata before | ||
| // deserialization strips unknown fields. | ||
| // Always overwrite (not conditionally set) so that | ||
| // non-stamped UpdateParams clears stale values. | ||
| self.config_sender = params.get("_sender") | ||
| .and_then(|v| v.as_str()) | ||
| .map(str::to_string) | ||
| .unwrap_or_default(); | ||
| self.config_rev = params.get("_rev") | ||
| .and_then(serde_json::Value::as_u64) | ||
| .unwrap_or(0); | ||
|
|
||
| let old_fps = self.config.fps; | ||
| Self::apply_update_params( | ||
| &mut self.config, | ||
| &self.limits, | ||
| &mut image_overlays, | ||
| &mut text_overlays, | ||
| params, | ||
| params.clone(), | ||
| &mut stats_tracker, | ||
| ); | ||
| layer_configs_dirty = true; | ||
|
|
@@ -719,7 +744,13 @@ impl ProcessorNode for CompositorNode { | |
|
|
||
| // Emit layout via view data if it changed. | ||
| if last_layout.as_ref() != Some(&scene.layout) { | ||
| if let Ok(json) = serde_json::to_value(&scene.layout) { | ||
| if let Ok(mut json) = serde_json::to_value(&scene.layout) { | ||
| // Stamp view data with the sender/rev from the last | ||
| // UpdateParams so clients can detect stale self-echoes. | ||
| if !self.config_sender.is_empty() { | ||
| json["_sender"] = serde_json::Value::from(self.config_sender.as_str()); | ||
| json["_rev"] = serde_json::Value::from(self.config_rev); | ||
| } | ||
| view_data_helpers::emit_view_data(&view_data_tx, &node_name, || json); | ||
| } | ||
| last_layout = Some(scene.layout.clone()); | ||
|
Comment on lines
+747
to
756
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🚩 Multi-client concurrent edit: last-writer-wins stamp can cause missed geometry updates The compositor node stores only the most recent Example: Client A sends Was this helpful? React with 👍 or 👎 to provide feedback. Debug |
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,169 @@ | ||
| // SPDX-FileCopyrightText: © 2025 StreamKit Contributors | ||
| // | ||
| // SPDX-License-Identifier: MPL-2.0 | ||
|
|
||
| /** | ||
| * Unit tests for the compositor commit adapter's causal-consistency stamping. | ||
| * | ||
| * Verifies that every commit path (commitLayers, commitOverlays, commitAll) | ||
| * injects `_sender` and `_rev` into outgoing config/params, and that the | ||
| * rev counter increments monotonically across calls. | ||
| */ | ||
|
|
||
| import { describe, it, expect, vi, beforeEach } from 'vitest'; | ||
|
|
||
| import { createCommitAdapter } from './compositorCommit'; | ||
| import type { LayerState, TextOverlayState, ImageOverlayState } from './compositorLayerParsers'; | ||
| import { resetAllConfigRevs } from './useConfigRev'; | ||
|
|
||
| // Mock the WebSocket service to control the client nonce | ||
| vi.mock('@/services/websocket', () => ({ | ||
| getWebSocketService: () => ({ | ||
| getClientNonce: () => 'test-nonce-123', | ||
| }), | ||
| })); | ||
|
|
||
| const NODE_ID = 'compositor'; | ||
|
|
||
| function makeLayerState(id: string): LayerState { | ||
| return { | ||
| id, | ||
| x: 0, | ||
| y: 0, | ||
| width: 640, | ||
| height: 480, | ||
| opacity: 1.0, | ||
| zIndex: 0, | ||
| rotationDegrees: 0, | ||
| mirrorHorizontal: false, | ||
| mirrorVertical: false, | ||
| visible: true, | ||
| cropX: 0.5, | ||
| cropY: 0.5, | ||
| cropZoom: 1.0, | ||
| cropShape: 'rect', | ||
| }; | ||
| } | ||
|
|
||
| function makeRefs() { | ||
| return { | ||
| paramsRef: { current: { width: 1280, height: 720 } as Record<string, unknown> }, | ||
| layersRef: { current: [makeLayerState('in_0')] as LayerState[] }, | ||
| textOverlaysRef: { current: [] as TextOverlayState[] }, | ||
| imageOverlaysRef: { current: [] as ImageOverlayState[] }, | ||
| }; | ||
| } | ||
|
|
||
| beforeEach(() => { | ||
| resetAllConfigRevs(); | ||
| }); | ||
|
|
||
| describe('CommitAdapter causal-consistency stamping', () => { | ||
| it('commitLayers via onConfigChange stamps _sender and _rev', () => { | ||
| const onConfigChange = vi.fn(); | ||
| const refs = makeRefs(); | ||
|
|
||
| const adapter = createCommitAdapter( | ||
| NODE_ID, | ||
| onConfigChange, | ||
| undefined, | ||
| refs.paramsRef, | ||
| refs.layersRef, | ||
| refs.textOverlaysRef, | ||
| refs.imageOverlaysRef | ||
| )!; | ||
|
|
||
| adapter.commitLayers([makeLayerState('in_0')]); | ||
|
|
||
| expect(onConfigChange).toHaveBeenCalledTimes(1); | ||
| const config = onConfigChange.mock.calls[0][1] as Record<string, unknown>; | ||
| expect(config._sender).toBe('test-nonce-123'); | ||
| expect(config._rev).toBe(1); | ||
| }); | ||
|
|
||
| it('commitLayers via onParamChange does NOT send standalone _sender/_rev (avoids param wipe)', () => { | ||
| const onParamChange = vi.fn(); | ||
| const refs = makeRefs(); | ||
|
|
||
| const adapter = createCommitAdapter( | ||
| NODE_ID, | ||
| undefined, | ||
| onParamChange, | ||
| refs.paramsRef, | ||
| refs.layersRef, | ||
| refs.textOverlaysRef, | ||
| refs.imageOverlaysRef | ||
| )!; | ||
|
|
||
| adapter.commitLayers([makeLayerState('in_0')]); | ||
|
|
||
| // Only the real param is sent — no standalone _sender/_rev messages | ||
| // because each tuneNode call replaces the server's full node.params. | ||
| expect(onParamChange).toHaveBeenCalledTimes(1); | ||
| expect(onParamChange.mock.calls[0][1]).toBe('layers'); | ||
| }); | ||
|
|
||
| it('rev increments monotonically across multiple commits', () => { | ||
| const onConfigChange = vi.fn(); | ||
| const refs = makeRefs(); | ||
|
|
||
| const adapter = createCommitAdapter( | ||
| NODE_ID, | ||
| onConfigChange, | ||
| undefined, | ||
| refs.paramsRef, | ||
| refs.layersRef, | ||
| refs.textOverlaysRef, | ||
| refs.imageOverlaysRef | ||
| )!; | ||
|
|
||
| adapter.commitLayers([makeLayerState('in_0')]); | ||
| adapter.commitOverlays([], []); | ||
| adapter.commitAll([makeLayerState('in_0')], [], []); | ||
|
|
||
| const rev1 = (onConfigChange.mock.calls[0][1] as Record<string, unknown>)._rev; | ||
| const rev2 = (onConfigChange.mock.calls[1][1] as Record<string, unknown>)._rev; | ||
| const rev3 = (onConfigChange.mock.calls[2][1] as Record<string, unknown>)._rev; | ||
|
|
||
| expect(rev1).toBe(1); | ||
| expect(rev2).toBe(2); | ||
| expect(rev3).toBe(3); | ||
| }); | ||
|
|
||
| it('commitOverlays via onParamChange does NOT send standalone _sender/_rev', () => { | ||
| const onParamChange = vi.fn(); | ||
| const refs = makeRefs(); | ||
|
|
||
| const adapter = createCommitAdapter( | ||
| NODE_ID, | ||
| undefined, | ||
| onParamChange, | ||
| refs.paramsRef, | ||
| refs.layersRef, | ||
| refs.textOverlaysRef, | ||
| refs.imageOverlaysRef | ||
| )!; | ||
|
|
||
| adapter.commitOverlays([], []); | ||
|
|
||
| // Only real params sent — no _sender/_rev standalone messages | ||
| expect(onParamChange).toHaveBeenCalledTimes(2); | ||
| const calls = onParamChange.mock.calls; | ||
| expect(calls[0][1]).toBe('text_overlays'); | ||
| expect(calls[1][1]).toBe('image_overlays'); | ||
| }); | ||
|
|
||
| it('returns null when both callbacks are undefined', () => { | ||
| const refs = makeRefs(); | ||
| const adapter = createCommitAdapter( | ||
| NODE_ID, | ||
| undefined, | ||
| undefined, | ||
| refs.paramsRef, | ||
| refs.layersRef, | ||
| refs.textOverlaysRef, | ||
| refs.imageOverlaysRef | ||
| ); | ||
| expect(adapter).toBeNull(); | ||
| }); | ||
| }); |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,77 @@ | ||
| // SPDX-FileCopyrightText: © 2025 StreamKit Contributors | ||
| // | ||
| // SPDX-License-Identifier: MPL-2.0 | ||
|
|
||
| /** | ||
| * Unit tests for stale view-data gating in compositorServerSync. | ||
| * | ||
| * These tests verify that: | ||
| * - View data with matching _sender and _rev <= local rev is suppressed | ||
| * - View data from other senders is always applied | ||
| * - View data with _rev > local rev is applied (newer from server) | ||
| * - The activeInteractionRef guard suppresses view data during interactions | ||
| */ | ||
|
|
||
| import { describe, it, expect } from 'vitest'; | ||
|
|
||
| import type { ResolvedLayer } from '@/types/generated/compositor-types'; | ||
|
|
||
| import type { LayerState } from './compositorLayerParsers'; | ||
| import { mapServerLayers } from './compositorServerSync'; | ||
|
|
||
| function makeLayer(id: string, x: number, width: number): LayerState { | ||
| return { | ||
| id, | ||
| x, | ||
| y: 0, | ||
| width, | ||
| height: 720, | ||
| opacity: 1.0, | ||
| zIndex: 0, | ||
| rotationDegrees: 0, | ||
| mirrorHorizontal: false, | ||
| mirrorVertical: false, | ||
| visible: true, | ||
| cropX: 0.5, | ||
| cropY: 0.5, | ||
| cropZoom: 1.0, | ||
| cropShape: 'rect', | ||
| }; | ||
| } | ||
|
|
||
| describe('mapServerLayers — pure geometry merge', () => { | ||
| it('updates geometry from server for matching layers', () => { | ||
| const prev = [makeLayer('in_0', 0, 1280)]; | ||
| const serverLayers: ResolvedLayer[] = [{ id: 'in_0', x: 160, y: 0, width: 960, height: 720 }]; | ||
|
|
||
| const result = mapServerLayers(prev, serverLayers); | ||
|
|
||
| expect(result[0].x).toBe(160); | ||
| expect(result[0].width).toBe(960); | ||
| // Config-driven fields preserved | ||
| expect(result[0].opacity).toBe(1.0); | ||
| expect(result[0].visible).toBe(true); | ||
| }); | ||
|
|
||
| it('returns same reference when geometry is unchanged', () => { | ||
| const prev = [makeLayer('in_0', 160, 960)]; | ||
| const serverLayers: ResolvedLayer[] = [{ id: 'in_0', x: 160, y: 0, width: 960, height: 720 }]; | ||
|
|
||
| const result = mapServerLayers(prev, serverLayers); | ||
|
|
||
| expect(result).toBe(prev); // referential equality | ||
| }); | ||
|
|
||
| it('filters out layers not in local state', () => { | ||
| const prev = [makeLayer('in_0', 0, 1280)]; | ||
| const serverLayers: ResolvedLayer[] = [ | ||
| { id: 'in_0', x: 160, y: 0, width: 960, height: 720 }, | ||
| { id: 'in_1', x: 0, y: 0, width: 320, height: 240 }, | ||
| ]; | ||
|
|
||
| const result = mapServerLayers(prev, serverLayers); | ||
|
|
||
| expect(result).toHaveLength(1); | ||
| expect(result[0].id).toBe('in_0'); | ||
| }); | ||
| }); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🚩 Synchronous
handle_tune_nodelacks_-prefix stripping (inconsistency with fire-and-forget handler)The synchronous
handle_tune_nodeatapps/skit/src/websocket_handlers.rs:850storesparams.clone()directly into the pipeline model without stripping_-prefixed keys, unlike its fire-and-forget counterpart at line 983-986 which doesmap.retain(|k, _| !k.starts_with('_')). While the current client never sends_sender/_revthrough the synchronousTuneNodeaction (the stamped configs usetunenodeasyncandtunenodesilent), this inconsistency means that if any API client sends_sender/_revvia the synchronous path, those keys would leak into the durable pipeline model and appear inGetPipelineresponses. Consider applying the same sanitization for defensive consistency.(Refers to lines 847-857)
Was this helpful? React with 👍 or 👎 to provide feedback.
Debug
Playground