Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/scope/core/pipelines/controller_viz/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ def __init__(
"MouseRight": ((15, -15), (0.2, 0.2, 0.8)), # Blue, top-right
}

def __call__(self, **kwargs) -> torch.Tensor:
def __call__(self, **kwargs) -> dict:
"""Render controller input visualization.

Args:
Expand Down Expand Up @@ -153,4 +153,4 @@ def __call__(self, **kwargs) -> torch.Tensor:
self._output[0, iy1:iy2, ix1:ix2, 1] = color[1]
self._output[0, iy1:iy2, ix1:ix2, 2] = color[2]

return self._output.clamp(0, 1)
return {"video": self._output.clamp(0, 1)}
15 changes: 8 additions & 7 deletions src/scope/core/pipelines/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
from abc import ABC, abstractmethod
from typing import TYPE_CHECKING

import torch
from pydantic import BaseModel

if TYPE_CHECKING:
Expand Down Expand Up @@ -61,17 +60,19 @@ def get_config_class(cls) -> type[BasePipelineConfig]:
return BasePipelineConfig

@abstractmethod
def __call__(
self, input: torch.Tensor | list[torch.Tensor] | None = None, **kwargs
) -> torch.Tensor:
def __call__(self, **kwargs) -> dict:
"""
Process a chunk of video frames.

Args:
input: A tensor in BCTHW format OR a list of frame tensors in THWC format (in [0, 255] range), or None
**kwargs: Additional parameters
**kwargs: Pipeline parameters. The input video is passed with the "video" key.
The video value is a list of tensors, where each tensor has shape
(1, H, W, C) in THWC format with values in [0, 255] range (uint8).
The list contains one tensor per frame. Other common parameters include
prompts, init_cache, etc.

Returns:
A processed chunk tensor in THWC format and [0, 1] range
A dictionary containing the processed video tensor under the "video" key.
The video tensor is in THWC format and [0, 1] range.
"""
pass
6 changes: 3 additions & 3 deletions src/scope/core/pipelines/krea_realtime_video/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,13 +214,13 @@ def prepare(self, **kwargs) -> Requirements | None:
"""Return input requirements based on current mode."""
return prepare_for_mode(self.__class__, self.components.config, kwargs)

def __call__(self, **kwargs) -> torch.Tensor:
def __call__(self, **kwargs) -> dict:
self.first_call, self.last_mode = handle_mode_transition(
self.state, self.components.vae, self.first_call, self.last_mode, kwargs
)
return self._generate(**kwargs)

def _generate(self, **kwargs) -> torch.Tensor:
def _generate(self, **kwargs) -> dict:
# Handle runtime LoRA scale updates before writing into state.
lora_scales = kwargs.get("lora_scales")
if lora_scales is not None:
Expand Down Expand Up @@ -258,4 +258,4 @@ def _generate(self, **kwargs) -> torch.Tensor:
apply_mode_defaults_to_state(self.state, self.__class__, mode, kwargs)

_, self.state = self.blocks(self.components, self.state)
return postprocess_chunk(self.state.values["output_video"])
return {"video": postprocess_chunk(self.state.values["output_video"])}
3 changes: 2 additions & 1 deletion src/scope/core/pipelines/krea_realtime_video/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,10 @@ def generate_video(

prompts = [{"text": prompt_text, "weight": 100}]
# Reset cache on first call of each video generation
output = pipeline(
output_dict = pipeline(
prompts=prompts, kv_cache_attention_bias=0.3, init_cache=is_first_call
)
output = output_dict["video"]
is_first_call = False

num_output_frames, _, _, _ = output.shape
Expand Down
6 changes: 3 additions & 3 deletions src/scope/core/pipelines/longlive/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,13 +193,13 @@ def prepare(self, **kwargs) -> Requirements | None:
"""Return input requirements based on current mode."""
return prepare_for_mode(self.__class__, self.components.config, kwargs)

def __call__(self, **kwargs) -> torch.Tensor:
def __call__(self, **kwargs) -> dict:
self.first_call, self.last_mode = handle_mode_transition(
self.state, self.components.vae, self.first_call, self.last_mode, kwargs
)
return self._generate(**kwargs)

def _generate(self, **kwargs) -> torch.Tensor:
def _generate(self, **kwargs) -> dict:
# Handle runtime LoRA scale updates before writing into state.
lora_scales = kwargs.get("lora_scales")
if lora_scales is not None:
Expand Down Expand Up @@ -239,4 +239,4 @@ def _generate(self, **kwargs) -> torch.Tensor:
apply_mode_defaults_to_state(self.state, self.__class__, mode, kwargs)

_, self.state = self.blocks(self.components, self.state)
return postprocess_chunk(self.state.values["output_video"])
return {"video": postprocess_chunk(self.state.values["output_video"])}
3 changes: 2 additions & 1 deletion src/scope/core/pipelines/longlive/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ def generate_video(

prompts = [{"text": prompt_text, "weight": 100}]
# Reset cache on first call of each video generation
output = pipeline(prompts=prompts, init_cache=is_first_call)
output_dict = pipeline(prompts=prompts, init_cache=is_first_call)
output = output_dict["video"]
is_first_call = False

num_output_frames, _, _, _ = output.shape
Expand Down
3 changes: 2 additions & 1 deletion src/scope/core/pipelines/longlive/test_vace.py
Original file line number Diff line number Diff line change
Expand Up @@ -767,7 +767,8 @@ def main():
)

# Generate
output = pipeline(**kwargs)
output_dict = pipeline(**kwargs)
output = output_dict["video"]
is_first_chunk = False

# Metrics
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -430,7 +430,8 @@ def main():
)

# Generate
output = pipeline(**kwargs)
output_dict = pipeline(**kwargs)
output = output_dict["video"]

# Metrics
num_output_frames, _, _, _ = output.shape
Expand Down
6 changes: 3 additions & 3 deletions src/scope/core/pipelines/memflow/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,13 +193,13 @@ def prepare(self, **kwargs) -> Requirements | None:
"""Return input requirements based on current mode."""
return prepare_for_mode(self.__class__, self.components.config, kwargs)

def __call__(self, **kwargs) -> torch.Tensor:
def __call__(self, **kwargs) -> dict:
self.first_call, self.last_mode = handle_mode_transition(
self.state, self.components.vae, self.first_call, self.last_mode, kwargs
)
return self._generate(**kwargs)

def _generate(self, **kwargs) -> torch.Tensor:
def _generate(self, **kwargs) -> dict:
# Handle runtime LoRA scale updates before writing into state.
lora_scales = kwargs.get("lora_scales")
if lora_scales is not None:
Expand Down Expand Up @@ -239,4 +239,4 @@ def _generate(self, **kwargs) -> torch.Tensor:
apply_mode_defaults_to_state(self.state, self.__class__, mode, kwargs)

_, self.state = self.blocks(self.components, self.state)
return postprocess_chunk(self.state.values["output_video"])
return {"video": postprocess_chunk(self.state.values["output_video"])}
3 changes: 2 additions & 1 deletion src/scope/core/pipelines/memflow/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ def generate_video(

prompts = [{"text": prompt_text, "weight": 100}]
# Reset cache on first call of each video generation
output = pipeline(prompts=prompts, init_cache=is_first_call)
output_dict = pipeline(prompts=prompts, init_cache=is_first_call)
output = output_dict["video"]
is_first_call = False

num_output_frames, _, _, _ = output.shape
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,8 @@ def generate_test_video(
)

# Generate
output = pipeline(**kwargs)
output_dict = pipeline(**kwargs)
output = output_dict["video"]

# Metrics
num_output_frames, _, _, _ = output.shape
Expand Down
4 changes: 2 additions & 2 deletions src/scope/core/pipelines/passthrough/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ def prepare(self, **kwargs) -> Requirements:
def __call__(
self,
**kwargs,
) -> torch.Tensor:
) -> dict:
input = kwargs.get("video")

if input is None:
Expand All @@ -53,4 +53,4 @@ def __call__(

input = rearrange(input, "B C T H W -> B T C H W")

return postprocess_chunk(input)
return {"video": postprocess_chunk(input)}
6 changes: 3 additions & 3 deletions src/scope/core/pipelines/reward_forcing/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,13 +167,13 @@ def prepare(self, **kwargs) -> Requirements | None:
"""Return input requirements based on current mode."""
return prepare_for_mode(self.__class__, self.components.config, kwargs)

def __call__(self, **kwargs) -> torch.Tensor:
def __call__(self, **kwargs) -> dict:
self.first_call, self.last_mode = handle_mode_transition(
self.state, self.components.vae, self.first_call, self.last_mode, kwargs
)
return self._generate(**kwargs)

def _generate(self, **kwargs) -> torch.Tensor:
def _generate(self, **kwargs) -> dict:
# Handle runtime LoRA scale updates before writing into state.
lora_scales = kwargs.get("lora_scales")
if lora_scales is not None:
Expand Down Expand Up @@ -213,4 +213,4 @@ def _generate(self, **kwargs) -> torch.Tensor:
apply_mode_defaults_to_state(self.state, self.__class__, mode, kwargs)

_, self.state = self.blocks(self.components, self.state)
return postprocess_chunk(self.state.values["output_video"])
return {"video": postprocess_chunk(self.state.values["output_video"])}
3 changes: 2 additions & 1 deletion src/scope/core/pipelines/reward_forcing/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ def generate_video(

prompts = [{"text": prompt_text, "weight": 100}]
# Reset cache on first call of each video generation
output = pipeline(prompts=prompts, init_cache=is_first_call)
output_dict = pipeline(prompts=prompts, init_cache=is_first_call)
output = output_dict["video"]
is_first_call = False

num_output_frames, _, _, _ = output.shape
Expand Down
4 changes: 2 additions & 2 deletions src/scope/core/pipelines/rife/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ def prepare(self, **kwargs) -> Requirements:
def __call__(
self,
**kwargs,
) -> torch.Tensor:
) -> dict:
input = kwargs.get("video")

if input is None:
Expand Down Expand Up @@ -89,4 +89,4 @@ def __call__(
interpolated_float = interpolated.float() / 255.0

# Return THWC [0, 1] float format (same as postprocess_chunk output)
return interpolated_float
return {"video": interpolated_float}
3 changes: 2 additions & 1 deletion src/scope/core/pipelines/rife/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ def main():

# Process all frames at once
start = time.time()
output = pipeline(video=video_list)
output_dict = pipeline(video=video_list)
output = output_dict["video"]
latency = time.time() - start

num_output_frames = output.shape[0]
Expand Down
6 changes: 3 additions & 3 deletions src/scope/core/pipelines/streamdiffusionv2/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,13 +171,13 @@ def prepare(self, **kwargs) -> Requirements | None:
"""Return input requirements based on current mode."""
return prepare_for_mode(self.__class__, self.components.config, kwargs)

def __call__(self, **kwargs) -> torch.Tensor:
def __call__(self, **kwargs) -> dict:
self.first_call, self.last_mode = handle_mode_transition(
self.state, self.components.vae, self.first_call, self.last_mode, kwargs
)
return self._generate(**kwargs)

def _generate(self, **kwargs) -> torch.Tensor:
def _generate(self, **kwargs) -> dict:
# Handle runtime LoRA scale updates before writing into state.
lora_scales = kwargs.get("lora_scales")
if lora_scales is not None:
Expand Down Expand Up @@ -217,4 +217,4 @@ def _generate(self, **kwargs) -> torch.Tensor:
apply_mode_defaults_to_state(self.state, self.__class__, mode, kwargs)

_, self.state = self.blocks(self.components, self.state)
return postprocess_chunk(self.state.values["output_video"])
return {"video": postprocess_chunk(self.state.values["output_video"])}
3 changes: 2 additions & 1 deletion src/scope/core/pipelines/streamdiffusionv2/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@

start = time.time()
# output is TCHW
output = pipeline(video=chunk, prompts=prompts)
output_dict = pipeline(video=chunk, prompts=prompts)
output = output_dict["video"]

num_output_frames, _, _, _ = output.shape
latency = time.time() - start
Expand Down
3 changes: 2 additions & 1 deletion src/scope/core/pipelines/streamdiffusionv2/test_r2v.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,8 @@
kwargs["vace_ref_images"] = ref_images
kwargs["vace_context_scale"] = 1.0

output = pipeline(**kwargs)
output_dict = pipeline(**kwargs)
output = output_dict["video"]
is_first_chunk = False

num_output_frames, _, _, _ = output.shape
Expand Down
6 changes: 4 additions & 2 deletions src/scope/core/pipelines/video_depth_anything/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ def __init__(
def prepare(self, **kwargs) -> Requirements:
return Requirements(input_size=4)

def __call__(self, **kwargs) -> torch.Tensor:
def __call__(self, **kwargs) -> dict:
"""Process video frames and return depth maps.

Args:
Expand Down Expand Up @@ -152,4 +152,6 @@ def __call__(self, **kwargs) -> torch.Tensor:
if d_max > d_min
else torch.zeros_like(depths)
)
return depths.unsqueeze(-1).repeat(1, 1, 1, 3) # THWC with 3 channels
return {
"video": depths.unsqueeze(-1).repeat(1, 1, 1, 3)
} # THWC with 3 channels
3 changes: 2 additions & 1 deletion src/scope/core/pipelines/video_depth_anything/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ def main():
for i, frame in enumerate(video_list):
start = time.time()
# Call pipeline with single frame
depth = pipeline(video=[frame])
output_dict = pipeline(video=[frame])
depth = output_dict["video"]
latency = time.time() - start

num_output_frames, _, _, _ = depth.shape
Expand Down
5 changes: 4 additions & 1 deletion src/scope/server/pipeline_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,10 @@ def process_chunk(self):
# Latent initialization: route to video
call_params["video"] = video_input

output = self.pipeline(**call_params)
output_dict = self.pipeline(**call_params)
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we extract other keys and forward them to handle processors that produce auxiliary outputs?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I think we should actually pass the whole dict into the next processor.

But... I'd prefer to do it in a separate PR to keep this PR dedicated to just the interface change.

One thing to think about is how to pass data between pipeline processors.

  • For video, we currently extract individual frames and add them to the output_queue
  • If we want to pass all outputs, that poses a question how to pass them:
    • One option is what you did here, so treat "video" the same as before and all other params just set as parameter
    • More correct and long-term option would be to queue the whole "dict", but then we would need to rethink how the queuing and waiting works, because each pipeline can produce / consume a different number of frames


# Extract video from the returned dictionary
output = output_dict["video"]

# Clear one-shot parameters after use to prevent sending them on subsequent chunks
# These parameters should only be sent when explicitly provided in parameter updates
Expand Down
Loading