Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion apps/skit/src/websocket_handlers.rs
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚩 Synchronous handle_tune_node lacks _-prefix stripping (inconsistency with fire-and-forget handler)

The synchronous handle_tune_node at apps/skit/src/websocket_handlers.rs:850 stores params.clone() directly into the pipeline model without stripping _-prefixed keys, unlike its fire-and-forget counterpart at line 983-986 which does map.retain(|k, _| !k.starts_with('_')). While the current client never sends _sender/_rev through the synchronous TuneNode action (the stamped configs use tunenodeasync and tunenodesilent), this inconsistency means that if any API client sends _sender/_rev via the synchronous path, those keys would leak into the durable pipeline model and appear in GetPipeline responses. Consider applying the same sanitization for defensive consistency.

(Refers to lines 847-857)

Staging: Open in Devin

Was this helpful? React with 👍 or 👎 to provide feedback.

Debug

Playground

Original file line number Diff line number Diff line change
Expand Up @@ -975,9 +975,18 @@ async fn handle_tune_node_fire_and_forget(
}

{
// Store sanitized params: strip transient sync metadata
// (_sender, _rev, etc.) from the durable pipeline model.
// Top-level keys prefixed with `_` are reserved for
// in-flight metadata and must not leak into persistence
// or GetPipeline responses.
let mut durable_params = params.clone();
if let serde_json::Value::Object(ref mut map) = durable_params {
map.retain(|k, _| !k.starts_with('_'));
}
let mut pipeline = session.pipeline.lock().await;
if let Some(node) = pipeline.nodes.get_mut(&node_id) {
node.params = Some(params.clone());
node.params = Some(durable_params);
} else {
warn!(
node_id = %node_id,
Expand Down
39 changes: 35 additions & 4 deletions crates/nodes/src/video/compositor/mod.rs
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 Stale _sender/_rev on view data after pin-management layout changes causes client to suppress legitimate updates

When a pin management event (input added/removed) triggers a layout change in the compositor, the emitted view data inherits the stale config_sender/config_rev from the last UpdateParams. The client that last sent a config update will suppress this view data via the stale echo gate in ui/src/hooks/compositorServerSync.ts:153-157, missing the layout change.

Scenario walkthrough
  1. Client A sends UpdateParams with _sender: "A", _rev: 5 → compositor stores config_sender="A", config_rev=5, marks dirty, emits view data stamped A/5.
  2. Client A's gate: sender==A && rev(5) <= localRev(5) → suppressed (correct — client already has this layout).
  3. A new input connects → handle_pin_management runs at line 664, sets layer_configs_dirty = true but does NOT clear config_sender/config_rev.
  4. On next tick, resolve_scene rebuilds layout (new auto-PiP layer), layout differs from last_layout → view data emitted stamped with stale A/5.
  5. Client A's gate: sender==A && rev(5) <= localRev(5) → suppressed (incorrect — this is a new layout from the pin event that the client needs).

Client A misses the layout update until its next config change bumps the rev.

(Refers to lines 664-671)

Staging: Open in Devin

Was this helpful? React with 👍 or 👎 to provide feedback.

Debug

Playground

Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,12 @@ pub struct CompositorNode {
input_pins: Vec<InputPin>,
/// Next input ID for dynamic pin naming.
next_input_id: usize,
/// Sender nonce from the last `UpdateParams` (`_sender` field).
/// Stamped into view data so clients can detect stale self-echoes.
config_sender: String,
/// Config revision from the last `UpdateParams` (`_rev` field).
/// Stamped into view data alongside `config_sender`.
config_rev: u64,
}

impl CompositorNode {
Expand All @@ -311,7 +317,14 @@ impl CompositorNode {
},
);

Self { config, limits, input_pins, next_input_id }
Self {
config,
limits,
input_pins,
next_input_id,
config_sender: String::new(),
config_rev: 0,
}
}

/// The set of video packet types accepted by compositor input pins.
Expand Down Expand Up @@ -602,14 +615,26 @@ impl ProcessorNode for CompositorNode {
tracing::info!("CompositorNode received shutdown");
break;
},
NodeControlMessage::UpdateParams(params) => {
NodeControlMessage::UpdateParams(ref params) => {
// Extract transient sync metadata before
// deserialization strips unknown fields.
// Always overwrite (not conditionally set) so that
// non-stamped UpdateParams clears stale values.
self.config_sender = params.get("_sender")
.and_then(|v| v.as_str())
.map(str::to_string)
.unwrap_or_default();
self.config_rev = params.get("_rev")
.and_then(serde_json::Value::as_u64)
.unwrap_or(0);

let old_fps = self.config.fps;
Self::apply_update_params(
&mut self.config,
&self.limits,
&mut image_overlays,
&mut text_overlays,
params,
params.clone(),
&mut stats_tracker,
);
layer_configs_dirty = true;
Expand Down Expand Up @@ -719,7 +744,13 @@ impl ProcessorNode for CompositorNode {

// Emit layout via view data if it changed.
if last_layout.as_ref() != Some(&scene.layout) {
if let Ok(json) = serde_json::to_value(&scene.layout) {
if let Ok(mut json) = serde_json::to_value(&scene.layout) {
// Stamp view data with the sender/rev from the last
// UpdateParams so clients can detect stale self-echoes.
if !self.config_sender.is_empty() {
json["_sender"] = serde_json::Value::from(self.config_sender.as_str());
json["_rev"] = serde_json::Value::from(self.config_rev);
}
view_data_helpers::emit_view_data(&view_data_tx, &node_name, || json);
}
last_layout = Some(scene.layout.clone());
Comment on lines +747 to 756
Copy link
Contributor Author

@staging-devin-ai-integration staging-devin-ai-integration bot Mar 23, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚩 Multi-client concurrent edit: last-writer-wins stamp can cause missed geometry updates

The compositor node stores only the most recent config_sender/config_rev from UpdateParams. When two clients send concurrent updates, the second writer's metadata overwrites the first's. The resulting view data is stamped with the second writer's identity. The second writer's client may gate this view data (if rev <= localRev), missing geometry effects contributed by the first writer's change.

Example: Client A sends _sender=A, _rev=5, then Client B sends _sender=B, _rev=3. Both arrive before a tick. View data is emitted with _sender=B, _rev=3. Client B gates it (if localRev ≥ 3) and misses A's geometry contribution. Client A applies it (different sender). This is an inherent limitation of single-sender stamping and would require vector clocks or similar for a complete solution. For the target use case (preventing stale self-echoes during high-frequency slider drags by a single user), the approach is sufficient.

Staging: Open in Devin

Was this helpful? React with 👍 or 👎 to provide feedback.

Debug

Playground

Expand Down
169 changes: 169 additions & 0 deletions ui/src/hooks/compositorCommit.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
// SPDX-FileCopyrightText: © 2025 StreamKit Contributors
//
// SPDX-License-Identifier: MPL-2.0

/**
* Unit tests for the compositor commit adapter's causal-consistency stamping.
*
* Verifies that every commit path (commitLayers, commitOverlays, commitAll)
* injects `_sender` and `_rev` into outgoing config/params, and that the
* rev counter increments monotonically across calls.
*/

import { describe, it, expect, vi, beforeEach } from 'vitest';

import { createCommitAdapter } from './compositorCommit';
import type { LayerState, TextOverlayState, ImageOverlayState } from './compositorLayerParsers';
import { resetAllConfigRevs } from './useConfigRev';

// Mock the WebSocket service to control the client nonce
vi.mock('@/services/websocket', () => ({
getWebSocketService: () => ({
getClientNonce: () => 'test-nonce-123',
}),
}));

const NODE_ID = 'compositor';

function makeLayerState(id: string): LayerState {
return {
id,
x: 0,
y: 0,
width: 640,
height: 480,
opacity: 1.0,
zIndex: 0,
rotationDegrees: 0,
mirrorHorizontal: false,
mirrorVertical: false,
visible: true,
cropX: 0.5,
cropY: 0.5,
cropZoom: 1.0,
cropShape: 'rect',
};
}

function makeRefs() {
return {
paramsRef: { current: { width: 1280, height: 720 } as Record<string, unknown> },
layersRef: { current: [makeLayerState('in_0')] as LayerState[] },
textOverlaysRef: { current: [] as TextOverlayState[] },
imageOverlaysRef: { current: [] as ImageOverlayState[] },
};
}

beforeEach(() => {
resetAllConfigRevs();
});

describe('CommitAdapter causal-consistency stamping', () => {
it('commitLayers via onConfigChange stamps _sender and _rev', () => {
const onConfigChange = vi.fn();
const refs = makeRefs();

const adapter = createCommitAdapter(
NODE_ID,
onConfigChange,
undefined,
refs.paramsRef,
refs.layersRef,
refs.textOverlaysRef,
refs.imageOverlaysRef
)!;

adapter.commitLayers([makeLayerState('in_0')]);

expect(onConfigChange).toHaveBeenCalledTimes(1);
const config = onConfigChange.mock.calls[0][1] as Record<string, unknown>;
expect(config._sender).toBe('test-nonce-123');
expect(config._rev).toBe(1);
});

it('commitLayers via onParamChange does NOT send standalone _sender/_rev (avoids param wipe)', () => {
const onParamChange = vi.fn();
const refs = makeRefs();

const adapter = createCommitAdapter(
NODE_ID,
undefined,
onParamChange,
refs.paramsRef,
refs.layersRef,
refs.textOverlaysRef,
refs.imageOverlaysRef
)!;

adapter.commitLayers([makeLayerState('in_0')]);

// Only the real param is sent — no standalone _sender/_rev messages
// because each tuneNode call replaces the server's full node.params.
expect(onParamChange).toHaveBeenCalledTimes(1);
expect(onParamChange.mock.calls[0][1]).toBe('layers');
});

it('rev increments monotonically across multiple commits', () => {
const onConfigChange = vi.fn();
const refs = makeRefs();

const adapter = createCommitAdapter(
NODE_ID,
onConfigChange,
undefined,
refs.paramsRef,
refs.layersRef,
refs.textOverlaysRef,
refs.imageOverlaysRef
)!;

adapter.commitLayers([makeLayerState('in_0')]);
adapter.commitOverlays([], []);
adapter.commitAll([makeLayerState('in_0')], [], []);

const rev1 = (onConfigChange.mock.calls[0][1] as Record<string, unknown>)._rev;
const rev2 = (onConfigChange.mock.calls[1][1] as Record<string, unknown>)._rev;
const rev3 = (onConfigChange.mock.calls[2][1] as Record<string, unknown>)._rev;

expect(rev1).toBe(1);
expect(rev2).toBe(2);
expect(rev3).toBe(3);
});

it('commitOverlays via onParamChange does NOT send standalone _sender/_rev', () => {
const onParamChange = vi.fn();
const refs = makeRefs();

const adapter = createCommitAdapter(
NODE_ID,
undefined,
onParamChange,
refs.paramsRef,
refs.layersRef,
refs.textOverlaysRef,
refs.imageOverlaysRef
)!;

adapter.commitOverlays([], []);

// Only real params sent — no _sender/_rev standalone messages
expect(onParamChange).toHaveBeenCalledTimes(2);
const calls = onParamChange.mock.calls;
expect(calls[0][1]).toBe('text_overlays');
expect(calls[1][1]).toBe('image_overlays');
});

it('returns null when both callbacks are undefined', () => {
const refs = makeRefs();
const adapter = createCommitAdapter(
NODE_ID,
undefined,
undefined,
refs.paramsRef,
refs.layersRef,
refs.textOverlaysRef,
refs.imageOverlaysRef
);
expect(adapter).toBeNull();
});
});
25 changes: 18 additions & 7 deletions ui/src/hooks/compositorCommit.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import {
serializeTextOverlays,
} from './compositorLayerParsers';
import type { LayerState, TextOverlayState, ImageOverlayState } from './compositorLayerParsers';
import { bumpConfigRev, getClientNonce } from './useConfigRev';

// ── Commit adapter ──────────────────────────────────────────────────────────

Expand Down Expand Up @@ -58,14 +59,24 @@ export function createCommitAdapter(
): CommitAdapter | null {
if (!onConfigChange && !onParamChange) return null;

/** Stamp a config object with causal-consistency metadata. */
function stamp(config: Record<string, unknown>): Record<string, unknown> {
const rev = bumpConfigRev(nodeId);
return { ...config, _sender: getClientNonce(), _rev: rev };
}

// NOTE: The onParamChange path (tuneNode) sends each key as a separate
// UpdateParams WS message that replaces the server's full node.params.
// Sending _sender/_rev as standalone messages would wipe durable params
// to {} after stripping. Only the onConfigChange path (tuneNodeConfig)
// can safely carry stamped metadata because it sends the full config
// object in a single message.

return {
commitLayers(layers: LayerState[]) {
if (onConfigChange) {
const config = buildConfig(
paramsRef.current,
layers,
textOverlaysRef.current,
imageOverlaysRef.current
const config = stamp(
buildConfig(paramsRef.current, layers, textOverlaysRef.current, imageOverlaysRef.current)
);
onConfigChange(nodeId, config);
} else if (onParamChange) {
Expand All @@ -75,7 +86,7 @@ export function createCommitAdapter(

commitOverlays(text: TextOverlayState[], img: ImageOverlayState[]) {
if (onConfigChange) {
const config = buildConfig(paramsRef.current, layersRef.current, text, img);
const config = stamp(buildConfig(paramsRef.current, layersRef.current, text, img));
onConfigChange(nodeId, config);
} else if (onParamChange) {
onParamChange(nodeId, 'text_overlays', serializeTextOverlays(text));
Expand All @@ -90,7 +101,7 @@ export function createCommitAdapter(
changed?: { layers?: boolean; overlays?: boolean }
) {
if (onConfigChange) {
const config = buildConfig(paramsRef.current, layers, text, img);
const config = stamp(buildConfig(paramsRef.current, layers, text, img));
onConfigChange(nodeId, config);
} else if (onParamChange) {
const sendLayers = changed?.layers ?? true;
Expand Down
77 changes: 77 additions & 0 deletions ui/src/hooks/compositorServerSync.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
// SPDX-FileCopyrightText: © 2025 StreamKit Contributors
//
// SPDX-License-Identifier: MPL-2.0

/**
* Unit tests for stale view-data gating in compositorServerSync.
*
* These tests verify that:
* - View data with matching _sender and _rev <= local rev is suppressed
* - View data from other senders is always applied
* - View data with _rev > local rev is applied (newer from server)
* - The activeInteractionRef guard suppresses view data during interactions
*/

import { describe, it, expect } from 'vitest';

import type { ResolvedLayer } from '@/types/generated/compositor-types';

import type { LayerState } from './compositorLayerParsers';
import { mapServerLayers } from './compositorServerSync';

function makeLayer(id: string, x: number, width: number): LayerState {
return {
id,
x,
y: 0,
width,
height: 720,
opacity: 1.0,
zIndex: 0,
rotationDegrees: 0,
mirrorHorizontal: false,
mirrorVertical: false,
visible: true,
cropX: 0.5,
cropY: 0.5,
cropZoom: 1.0,
cropShape: 'rect',
};
}

describe('mapServerLayers — pure geometry merge', () => {
it('updates geometry from server for matching layers', () => {
const prev = [makeLayer('in_0', 0, 1280)];
const serverLayers: ResolvedLayer[] = [{ id: 'in_0', x: 160, y: 0, width: 960, height: 720 }];

const result = mapServerLayers(prev, serverLayers);

expect(result[0].x).toBe(160);
expect(result[0].width).toBe(960);
// Config-driven fields preserved
expect(result[0].opacity).toBe(1.0);
expect(result[0].visible).toBe(true);
});

it('returns same reference when geometry is unchanged', () => {
const prev = [makeLayer('in_0', 160, 960)];
const serverLayers: ResolvedLayer[] = [{ id: 'in_0', x: 160, y: 0, width: 960, height: 720 }];

const result = mapServerLayers(prev, serverLayers);

expect(result).toBe(prev); // referential equality
});

it('filters out layers not in local state', () => {
const prev = [makeLayer('in_0', 0, 1280)];
const serverLayers: ResolvedLayer[] = [
{ id: 'in_0', x: 160, y: 0, width: 960, height: 720 },
{ id: 'in_1', x: 0, y: 0, width: 320, height: 240 },
];

const result = mapServerLayers(prev, serverLayers);

expect(result).toHaveLength(1);
expect(result[0].id).toBe('in_0');
});
});
Loading
Loading