From 2e8e9d0f017b0a8409f3044d6d2f39643600f585 Mon Sep 17 00:00:00 2001
From: AI Assistant
Date: Thu, 22 Jan 2026 08:37:16 +0000
Subject: [PATCH] Add pipeline throttling

Signed-off-by: AI Assistant
---
 src/scope/server/pipeline_processor.py |  23 +++
 src/scope/server/pipeline_throttler.py | 213 +++++++++++++++++++++++++
 2 files changed, 236 insertions(+)
 create mode 100644 src/scope/server/pipeline_throttler.py

diff --git a/src/scope/server/pipeline_processor.py b/src/scope/server/pipeline_processor.py
index 1f27546a..31f4a1d3 100644
--- a/src/scope/server/pipeline_processor.py
+++ b/src/scope/server/pipeline_processor.py
@@ -12,6 +12,7 @@
 from scope.core.pipelines.controller import parse_ctrl_input
 
 from .pipeline_manager import PipelineNotAvailableException
+from .pipeline_throttler import PipelineThrottler
 
 logger = logging.getLogger(__name__)
 
@@ -84,6 +85,11 @@ def __init__(
             "vace_use_input_video", True
         )
 
+        # Throttler for controlling processing rate in chained pipelines
+        # Throttling is applied when this pipeline produces frames faster than
+        # the next pipeline in the chain can consume them
+        self.throttler = PipelineThrottler()
+
     def _resize_output_queue(self, target_size: int):
         """Resize the output queue to the target size, transferring existing frames.
 
@@ -122,6 +128,9 @@ def set_next_processor(self, next_processor: "PipelineProcessor"):
         """
         self.next_processor = next_processor
 
+        # Set throttler's reference to next processor for throttling decisions
+        self.throttler.set_next_processor(next_processor)
+
         # Calculate output queue size based on next processor's requirements
         next_pipeline = next_processor.pipeline
         if hasattr(next_pipeline, "prepare"):
@@ -337,6 +346,7 @@ def process_chunk(self):
         requirements = self.pipeline.prepare(**prepare_params)
 
         video_input = None
+        input_frame_count = 0
 
         if requirements is not None:
             current_chunk_size = requirements.input_size
@@ -353,6 +363,7 @@
 
             # Use prepare_chunk to uniformly sample frames from the queue
             video_input = self.prepare_chunk(input_queue_ref, current_chunk_size)
+            input_frame_count = len(video_input) if video_input else 0
 
         try:
             # Pass parameters (excluding prepare-only parameters)
@@ -425,6 +436,12 @@
                 f"Pipeline {self.pipeline_id} processed in {processing_time:.4f}s, {num_frames} frames"
             )
 
+            # Record batch timing for throttling calculations
+            if input_frame_count > 0:
+                self.throttler.record_input_batch(input_frame_count, processing_time)
+            if num_frames > 0:
+                self.throttler.record_output_batch(num_frames, processing_time)
+
             # Normalize to [0, 255] and convert to uint8
             output = (
                 (output * 255.0)
@@ -454,6 +471,12 @@
                         f"Output queue full for {self.pipeline_id}, dropping processed frame"
                     )
                     continue
+
+            # Apply throttling if this pipeline is producing faster than next can consume
+            # Only throttle if: (1) has video input, (2) has next processor
+            if video_input is not None and self.next_processor is not None:
+                self.throttler.throttle()
+
         except Exception as e:
             if self._is_recoverable(e):
                 logger.error(
diff --git a/src/scope/server/pipeline_throttler.py b/src/scope/server/pipeline_throttler.py
new file mode 100644
index 00000000..41d43976
--- /dev/null
+++ b/src/scope/server/pipeline_throttler.py
@@ -0,0 +1,213 @@
+"""Pipeline throttler for controlling frame processing rate in chained pipelines."""
+
+import logging
+import threading
+import time
+from collections import deque
+from typing import TYPE_CHECKING
+
+if TYPE_CHECKING:
+    from .pipeline_processor import PipelineProcessor
+
+logger = logging.getLogger(__name__)
+
+# Throttling constants
+FPS_SAMPLE_SIZE = 30
+FPS_MIN_SAMPLES = 5
+MIN_FPS = 1.0
+MAX_FPS = 120.0
+
+# Multiplier for target FPS when throttling
+# e.g., if next pipeline processes at 6 FPS, target ~9 FPS (1.5x)
+THROTTLE_TARGET_MULTIPLIER = 1.5
+
+
+class PipelineThrottler:
+    """Controls processing rate of a pipeline based on downstream pipeline performance.
+
+    When pipelines are chained (A -> B -> C), a faster upstream pipeline should not
+    produce frames much faster than the downstream pipeline can consume them.
+    This throttler measures the downstream pipeline's input processing rate and
+    adds appropriate delays to match it.
+
+    To disable throttling, simply don't create a throttler instance (set to None).
+    """
+
+    def __init__(self):
+        """Initialize the throttler."""
+        self._lock = threading.Lock()
+
+        # Track this pipeline's output FPS (how fast it produces frames)
+        self._output_times: deque[float] = deque(maxlen=FPS_SAMPLE_SIZE)
+        self._output_fps: float = MAX_FPS
+
+        # Track this pipeline's input FPS (how fast it consumes frames)
+        self._input_times: deque[float] = deque(maxlen=FPS_SAMPLE_SIZE)
+        self._input_fps: float = MAX_FPS
+
+        # Reference to next processor (set externally)
+        self._next_processor: PipelineProcessor | None = None
+
+    def set_next_processor(self, processor: "PipelineProcessor | None"):
+        """Set the next processor in the chain for throttling decisions.
+
+        Args:
+            processor: The next pipeline processor, or None if this is the last.
+        """
+        with self._lock:
+            self._next_processor = processor
+
+    def record_input_batch(self, num_frames: int, processing_time: float):
+        """Record input batch processing for FPS calculation.
+
+        Args:
+            num_frames: Number of input frames in the batch.
+            processing_time: Time taken to process the batch in seconds.
+        """
+        if num_frames <= 0 or processing_time <= 0:
+            return
+
+        with self._lock:
+            current_time = time.time()
+            # Record timestamps for each frame in the batch
+            for i in range(num_frames):
+                # Distribute timestamps across the processing time
+                frame_time = (
+                    current_time
+                    - processing_time
+                    + (processing_time * (i + 1) / num_frames)
+                )
+                self._input_times.append(frame_time)
+
+            self._update_input_fps()
+
+    def record_output_batch(self, num_frames: int, processing_time: float):
+        """Record output batch for FPS calculation.
+
+        Args:
+            num_frames: Number of output frames produced.
+            processing_time: Time taken to produce the batch in seconds.
+        """
+        if num_frames <= 0 or processing_time <= 0:
+            return
+
+        with self._lock:
+            current_time = time.time()
+            # Record timestamps for each frame in the batch
+            for i in range(num_frames):
+                frame_time = (
+                    current_time
+                    - processing_time
+                    + (processing_time * (i + 1) / num_frames)
+                )
+                self._output_times.append(frame_time)
+
+            self._update_output_fps()
+
+    def _update_input_fps(self):
+        """Update input FPS calculation. Must be called with lock held."""
+        if len(self._input_times) >= FPS_MIN_SAMPLES:
+            times = list(self._input_times)
+            time_span = times[-1] - times[0]
+            if time_span >= 0.05:  # At least 50ms
+                num_frames = len(times)
+                fps = num_frames / time_span
+                self._input_fps = max(MIN_FPS, min(MAX_FPS, fps))
+
+    def _update_output_fps(self):
+        """Update output FPS calculation. Must be called with lock held."""
+        if len(self._output_times) >= FPS_MIN_SAMPLES:
+            times = list(self._output_times)
+            time_span = times[-1] - times[0]
+            if time_span >= 0.05:  # At least 50ms
+                num_frames = len(times)
+                fps = num_frames / time_span
+                self._output_fps = max(MIN_FPS, min(MAX_FPS, fps))
+
+    def get_input_fps(self) -> float:
+        """Get the current input FPS (how fast this pipeline consumes frames)."""
+        with self._lock:
+            return self._input_fps
+
+    def get_output_fps(self) -> float:
+        """Get the current output FPS (how fast this pipeline produces frames)."""
+        with self._lock:
+            return self._output_fps
+
+    def should_throttle(self) -> bool:
+        """Check if this pipeline should be throttled.
+
+        Returns:
+            True if throttling should be applied, False otherwise.
+        """
+        with self._lock:
+            # No throttling if no next processor
+            if self._next_processor is None:
+                return False
+
+            next_input_fps = self._next_processor.throttler.get_input_fps()
+
+            # Throttle if we're producing faster than the target rate
+            return self._output_fps > next_input_fps * THROTTLE_TARGET_MULTIPLIER
+
+    def calculate_delay(self) -> float:
+        """Calculate the delay needed to match downstream processing rate.
+
+        Returns:
+            Delay in seconds to sleep, or 0 if no delay needed.
+        """
+        with self._lock:
+            if self._next_processor is None:
+                return 0.0
+
+            next_input_fps = self._next_processor.throttler.get_input_fps()
+
+            # Target FPS is slightly higher than next pipeline's input FPS
+            target_fps = next_input_fps * THROTTLE_TARGET_MULTIPLIER
+
+            # Don't throttle if we're not faster than the target
+            if self._output_fps <= target_fps:
+                return 0.0
+
+            # Calculate delay needed per frame
+            # Current interval: 1/output_fps
+            # Target interval: 1/target_fps
+            # Delay = target_interval - current_interval
+            if target_fps <= 0:
+                return 0.0
+
+            current_interval = 1.0 / self._output_fps if self._output_fps > 0 else 0
+            target_interval = 1.0 / target_fps
+
+            delay = target_interval - current_interval
+
+            # Only return positive delays, capped to reasonable maximum
+            return max(0.0, min(delay, 1.0))
+
+    def throttle(self):
+        """Apply throttling by sleeping if necessary.
+
+        This should be called after processing a batch and before starting the next.
+        """
+        delay = self.calculate_delay()
+        if delay > 0:
+            logger.debug(
+                f"Throttling: sleeping {delay:.3f}s "
+                f"(output={self._output_fps:.1f}fps, "
+                f"next_input={self._get_next_input_fps():.1f}fps)"
+            )
+            time.sleep(delay)
+
+    def _get_next_input_fps(self) -> float:
+        """Get the next processor's input FPS, or MAX_FPS if there is no next processor."""
+        if self._next_processor is None:
+            return MAX_FPS
+        return self._next_processor.throttler.get_input_fps()
+
+    def reset(self):
+        """Reset FPS tracking data."""
+        with self._lock:
+            self._input_times.clear()
+            self._output_times.clear()
+            self._input_fps = MAX_FPS
+            self._output_fps = MAX_FPS
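
Reviewer note (not part of the patch): a minimal sketch of how the throttling math
above plays out, assuming the patch is applied and the scope package is importable.
SimpleNamespace is a hypothetical stand-in for the downstream PipelineProcessor;
it works here only because the throttler reads nothing but the next processor's
.throttler attribute, whereas a real chain is wired via
PipelineProcessor.set_next_processor().

    # Sketch under the assumptions above; numbers in comments are approximate.
    from types import SimpleNamespace

    from scope.server.pipeline_throttler import PipelineThrottler

    upstream = PipelineThrottler()
    downstream = PipelineThrottler()

    # Downstream consumed a 6-frame chunk in 1.0s -> input rate of roughly 7 FPS.
    downstream.record_input_batch(num_frames=6, processing_time=1.0)

    # Upstream produced a 24-frame chunk in 1.0s -> output rate of roughly 25 FPS.
    upstream.record_output_batch(num_frames=24, processing_time=1.0)

    # Wire the chain: upstream throttles against downstream's input FPS.
    upstream.set_next_processor(SimpleNamespace(throttler=downstream))

    print(upstream.should_throttle())  # True: ~25 FPS > 1.5 * ~7 FPS
    print(upstream.calculate_delay())  # ~0.05s = 1/(1.5 * input_fps) - 1/output_fps
    upstream.throttle()                # sleeps for that delay

THROTTLE_TARGET_MULTIPLIER (1.5x) makes the upstream pipeline aim slightly above
the downstream input rate rather than matching it exactly, which keeps the
downstream queue fed while still bounding how far ahead the upstream pipeline can run.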