+# -----------------------------------------------------------------------------
+#
+# Copyright (c) Qualcomm Technologies, Inc. and/or its subsidiaries.
+# SPDX-License-Identifier: BSD-3-Clause
+#
+# ----------------------------------------------------------------------------
+"""
+QEfficient WAN Image-to-Video Pipeline Implementation
+
+This module provides an optimized implementation of the WAN image-to-video pipeline
+for high-performance image-to-video generation on Qualcomm AI hardware.
+The pipeline supports WAN 2.2 architectures with unified transformer for converting
+static images into dynamic video sequences with temporal consistency.
+
+TODO: 1. Update Umt5 to Qaic; present running on cpu
+"""
+
+import os
+import time
+from typing import Any, Callable, Dict, List, Optional, Tuple, Union
+
+import numpy as np
+import torch
+from diffusers import WanImageToVideoPipeline
+from diffusers.image_processor import PipelineImageInput
+from diffusers.utils.torch_utils import randn_tensor
+from tqdm import tqdm
+
+from QEfficient.diffusers.models.transformers.transformer_wan import QEffWanUnifiedWrapper
+from QEfficient.diffusers.pipelines.pipeline_module import QEffVAE, QEffWanUnifiedTransformer
+from QEfficient.diffusers.pipelines.pipeline_utils import (
+ ONNX_SUBFUNCTION_MODULE,
+ ModulePerf,
+ QEffPipelineOutput,
+ calculate_latent_dimensions_with_frames,
+ compile_modules_parallel,
+ compile_modules_sequential,
+ config_manager,
+ set_execute_params,
+ update_npi_path,
+)
+from QEfficient.generation.cloud_infer import QAICInferenceSession
+from QEfficient.utils import constants
+from QEfficient.utils.logging_utils import logger
+
+
+[docs]class QEffWanImageToVideoPipeline:
+
"""
+
QEfficient-optimized WAN image-to-video pipeline for high-performance video generation on Qualcomm AI hardware.
+
+
This pipeline provides an optimized implementation of the WAN image-to-video diffusion model
+
specifically designed for deployment on Qualcomm AI Cloud (QAIC) devices. It extends the original
+
HuggingFace WAN image-to-video model with QEfficient-optimized components that can be exported to ONNX format
+
and compiled into Qualcomm Program Container (QPC) files for efficient video generation from static images.
+
+
The pipeline supports the complete WAN image-to-video workflow including:
+
- Image conditioning and preprocessing for temporal consistency
+
- UMT5 text encoding for rich semantic understanding
+
- Unified transformer architecture: Combines multiple transformer stages into a single optimized model
+
- VAE encoding/decoding for image-to-latent and latent-to-video conversion
+
+
Attributes:
+
text_encoder: UMT5 text encoder for semantic text understanding (TODO: QEfficient optimization)
+
vae_encoder (QEffVAE): VAE encoder for converting input images to latent space
+
unified_wrapper (QEffWanUnifiedWrapper): Wrapper combining transformer stages
+
transformer (QEffWanUnifiedTransformer): Optimized unified transformer for denoising
+
vae_decoder (QEffVAE): VAE decoder for latent-to-video conversion
+
modules (Dict[str, Any]): Dictionary of pipeline modules for batch operations
+
model (WanImageToVideoPipeline): Original HuggingFace WAN I2V model reference
+
tokenizer: Text tokenizer for preprocessing
+
scheduler: Diffusion scheduler for timestep management
+
+
Example:
+
>>> from QEfficient.diffusers.pipelines.wan import QEffWanImageToVideoPipeline
+
>>> from PIL import Image
+
>>>
+
>>> # Load pipeline and input image
+
>>> pipeline = QEffWanImageToVideoPipeline.from_pretrained("Wan-AI/Wan2.2-I2V-A14B-Diffusers")
+
>>> image = Image.open("input_frame.jpg")
+
>>>
+
>>> # Generate video with motion
+
>>> result = pipeline(
+
... image=image,
+
... prompt="A person walking through a sunny garden with flowing motion",
+
... height=544,
+
... width=720,
+
... num_frames=81,
+
... num_inference_steps=4,
+
... guidance_scale=1.0
+
... )
+
>>> # Save generated video
+
>>> frames = result.images[0]
+
>>> export_to_video(frames, "generated_video.mp4", fps=16)
+
"""
+
+
_hf_auto_class = WanImageToVideoPipeline
+
+
def __init__(self, model, **kwargs):
+
"""
+
Initialize the QEfficient WAN image-to-video pipeline.
+
+
This pipeline provides an optimized implementation of the WAN image-to-video model
+
for deployment on Qualcomm AI hardware. It wraps the original HuggingFace WAN I2V model
+
components with QEfficient-optimized versions that can be exported to ONNX and compiled
+
for QAIC devices.
+
+
Args:
+
model (WanImageToVideoPipeline): Pre-loaded WanImageToVideoPipeline model with
+
transformer, transformer_2, VAE, and text encoder components
+
**kwargs: Additional keyword arguments including configuration parameters
+
"""
+
# Wrap model components with QEfficient optimized versions
+
self.model = model
+
self.custom_config = None
+
+
# Text encoder (TODO: Replace with QEfficient UMT5 optimization)
+
self.text_encoder = model.text_encoder
+
# Create unified transformer wrapper combining dual-stage models(high, low noise DiTs)
+
self.unified_wrapper = QEffWanUnifiedWrapper(model.transformer, model.transformer_2)
+
self.transformer = QEffWanUnifiedTransformer(self.unified_wrapper)
+
# VAE encoder for image-to-latent conversion
+
self.vae_encoder = QEffVAE(model.vae, "encoder")
+
# VAE decoder for latent-to-video conversion
+
self.vae_decoder = QEffVAE(model.vae, "decoder")
+
+
# Store all modules in a dictionary for easy iteration during export/compile
+
self.modules = {
+
"vae_encoder": self.vae_encoder,
+
"transformer": self.transformer,
+
"vae_decoder": self.vae_decoder,
+
}
+
+
# Copy tokenizers and scheduler from the original model
+
self.tokenizer = model.tokenizer
+
self.text_encoder.tokenizer = model.tokenizer
+
self.scheduler = model.scheduler
+
+
self.vae_encoder.get_onnx_params = self.vae_encoder.get_img_encoder_onnx_params
+
self.vae_decoder.get_onnx_params = self.vae_decoder.get_video_onnx_params
+
+
# Extract patch dimensions from transformer configuration
+
_, self.patch_height, self.patch_width = self.transformer.model.config.patch_size
+
+
@property
+
def do_classifier_free_guidance(self):
+
"""
+
Determine if classifier-free guidance should be used.
+
+
Returns:
+
bool: True if CFG should be applied based on current guidance scales
+
"""
+
return self._guidance_scale > 1.0 and (self._guidance_scale_2 is None or self._guidance_scale_2 > 1.0)
+
+
[docs] @classmethod
+
def from_pretrained(
+
cls,
+
pretrained_model_name_or_path: Optional[Union[str, os.PathLike]],
+
**kwargs,
+
):
+
"""
+
Load a pretrained WAN image-to-video model from HuggingFace Hub or local path and wrap it with QEfficient optimizations.
+
+
This class method provides a convenient way to instantiate a QEffWanImageToVideoPipeline from a pretrained
+
WAN I2V model. It automatically loads the base WanImageToVideoPipeline model in float32 precision on CPU
+
and wraps all components with QEfficient-optimized versions for QAIC deployment.
+
+
Args:
+
pretrained_model_name_or_path (str or os.PathLike): Either a HuggingFace model identifier
+
or a local path to a saved WAN I2V model directory. Should contain transformer, transformer_2,
+
text_encoder, and VAE components optimized for image-to-video generation.
+
**kwargs: Additional keyword arguments passed to WanImageToVideoPipeline.from_pretrained().
+
+
Returns:
+
QEffWanImageToVideoPipeline: A fully initialized I2V pipeline instance with QEfficient-optimized components
+
ready for export, compilation, and inference on QAIC devices.
+
+
Raises:
+
ValueError: If the model path is invalid or model cannot be loaded
+
OSError: If there are issues accessing the model files
+
RuntimeError: If model initialization fails
+
+
Example:
+
>>> # Load from HuggingFace Hub
+
>>> pipeline = QEffWanImageToVideoPipeline.from_pretrained("Wan-AI/Wan2.2-I2V-A14B-Diffusers")
+
>>>
+
>>> # Load from local path
+
>>> pipeline = QEffWanImageToVideoPipeline.from_pretrained("/local/path/to/wan/i2v")
+
>>>
+
>>> # Load with custom cache directory
+
>>> pipeline = QEffWanImageToVideoPipeline.from_pretrained(
+
... "Wan-AI/Wan2.2-I2V-A14B-Diffusers",
+
... cache_dir="/custom/cache/dir"
+
... )
+
"""
+
# Load the base WAN model in float32 on CPU for optimization
+
model = cls._hf_auto_class.from_pretrained(
+
pretrained_model_name_or_path,
+
torch_dtype=torch.float32,
+
device_map="cpu",
+
**kwargs,
+
)
+
return cls(
+
model=model,
+
pretrained_model_name_or_path=pretrained_model_name_or_path,
+
**kwargs,
+
)
+
+
[docs] def export(
+
self,
+
export_dir: Optional[str] = None,
+
use_onnx_subfunctions: bool = False,
+
) -> str:
+
"""
+
Export all pipeline modules to ONNX format for deployment preparation.
+
+
This method systematically exports the VAE encoder, unified transformer, and VAE decoder to ONNX format with
+
image-to-video specific configurations including temporal dimensions, dynamic axes, and
+
optimization settings.
+
+
The export process prepares the models for subsequent compilation to QPC format, enabling
+
efficient inference on QAIC hardware. ONNX subfunctions can be used for certain modules
+
to optimize memory usage and performance.
+
+
Args:
+
export_dir (str, optional): Target directory for saving ONNX model files. If None,
+
uses the default export directory structure. The directory will be created
+
if it doesn't exist.
+
use_onnx_subfunctions (bool, default=False): Whether to enable ONNX subfunction
+
optimization for supported modules. This can optimize the graph structure
+
and improve compilation efficiency for complex models like the transformer.
+
+
Returns:
+
str: Absolute path to the export directory containing all ONNX model files.
+
+
Raises:
+
RuntimeError: If ONNX export fails for any module
+
OSError: If there are issues creating the export directory or writing files
+
ValueError: If module configurations are invalid
+
+
Example:
+
>>> pipeline = QEffWanImageToVideoPipeline.from_pretrained("Wan-AI/Wan2.2-I2V-A14B-Diffusers")
+
>>> export_path = pipeline.export(
+
... export_dir="/path/to/export",
+
... use_onnx_subfunctions=True
+
... )
+
>>> print(f"Models exported to: {export_path}")
+
"""
+
+
# Export each module with corresponding parameters
+
for module_name, module_obj in tqdm(self.modules.items(), desc="Exporting modules", unit="module"):
+
# Get ONNX export configuration with video dimensions
+
example_inputs, dynamic_axes, output_names = module_obj.get_onnx_params()
+
+
# Prepare export parameters
+
export_params = {
+
"inputs": example_inputs,
+
"output_names": output_names,
+
"dynamic_axes": dynamic_axes,
+
"export_dir": export_dir,
+
}
+
+
# Enable ONNX subfunctions for supported modules if requested
+
if use_onnx_subfunctions and module_name in ONNX_SUBFUNCTION_MODULE:
+
export_params["use_onnx_subfunctions"] = True
+
+
module_obj.export(**export_params)
+
+
[docs] @staticmethod
+
def get_default_config_path():
+
"""
+
Get the default configuration file path for WAN pipeline.
+
+
Returns:
+
str: Path to the default WAN configuration JSON file.
+
"""
+
return os.path.join(os.path.dirname(os.path.dirname(__file__)), "configs/wan_i2v_config.json")
+
+
[docs] @staticmethod
+
def get_vae_encoder_npi_path():
+
"""
+
Get the default VAE encoder NPI configuration file path for WAN I2V pipeline.
+
+
Returns:
+
str: Path to the default WAN I2V VAE encoder NPI file.
+
"""
+
return os.path.join(os.path.dirname(os.path.dirname(__file__)), "configs/npi_wan_i2v_vae_encoder.yaml")
+
+
[docs] def compile(
+
self,
+
compile_config: Optional[str] = None,
+
parallel: bool = False,
+
height: int = constants.WAN_ONNX_EXPORT_HEIGHT_45P,
+
width: int = constants.WAN_ONNX_EXPORT_WIDTH_45P,
+
num_frames: int = constants.WAN_ONNX_EXPORT_FRAMES,
+
use_onnx_subfunctions: bool = False,
+
) -> str:
+
"""
+
Compiles the ONNX graphs of the different model components for deployment on Qualcomm AI hardware.
+
+
This method takes the ONNX paths of the transformer and compiles them into an optimized format
+
for inference using JSON-based configuration.
+
+
Args:
+
compile_config (str, optional): Path to a JSON configuration file containing
+
compilation settings, device mappings, and optimization parameters. If None,
+
uses the default configuration.
+
parallel (bool, default=False): Compilation mode selection:
+
- True: Compile modules in parallel using ThreadPoolExecutor for faster processing
+
- False: Compile modules sequentially for lower resource usage
+
height (int, default=192): Target image height in pixels.
+
width (int, default=320): Target image width in pixels.
+
num_frames (int, deafult=81) : Target num of frames in pixel space
+
use_onnx_subfunctions (bool, default=False): Whether to export models with ONNX
+
subfunctions before compilation if not already exported.
+
+
Raises:
+
RuntimeError: If compilation fails for any module or if QAIC compiler is not available
+
FileNotFoundError: If ONNX models haven't been exported or config file is missing
+
ValueError: If configuration parameters are invalid
+
OSError: If there are issues with file I/O during compilation
+
+
Example:
+
>>> pipeline = QEffWanImageToVideoPipeline.from_pretrained("Wan-AI/Wan2.2-I2V-A14B-Diffusers")
+
>>> # Sequential compilation with default config
+
>>> pipeline.compile(height=480, width=832, num_frames=81)
+
>>>
+
>>> # Parallel compilation with custom config
+
>>> pipeline.compile(
+
... compile_config="/path/to/custom_config.json",
+
... parallel=True,
+
... height=480,
+
... width=832,
+
... num_frames=81
+
... )
+
"""
+
# Load compilation configuration
+
config_manager(self, config_source=compile_config, use_onnx_subfunctions=use_onnx_subfunctions)
+
+
# Set device IDs, qpc path if precompiled qpc exist
+
set_execute_params(self)
+
+
# Ensure all modules are exported to ONNX before compilation
+
if any(
+
path is None
+
for path in [
+
self.vae_encoder.onnx_path,
+
self.transformer.onnx_path,
+
self.vae_decoder.onnx_path,
+
]
+
):
+
self.export(use_onnx_subfunctions=use_onnx_subfunctions)
+
+
# Configure pipeline dimensions and calculate compressed latent parameters
+
cl, latent_height, latent_width, latent_frames = calculate_latent_dimensions_with_frames(
+
height,
+
width,
+
num_frames,
+
self.model.vae.config.scale_factor_spatial,
+
self.model.vae.config.scale_factor_temporal,
+
self.patch_height,
+
self.patch_width,
+
)
+
+
# # Update NPI path for vae encoder
+
vae_npi_full_path = self.get_vae_encoder_npi_path()
+
update_npi_path(self, vae_npi_full_path, module_name="vae_encoder")
+
+
# Prepare dynamic specialization updates based on video dimensions
+
specialization_updates = {
+
"vae_encoder": {
+
"num_frames": num_frames,
+
"height": height,
+
"width": width,
+
},
+
"transformer": [
+
# high noise
+
{
+
"cl": cl, # Compressed latent dimension
+
"latent_height": latent_height, # Latent space height
+
"latent_width": latent_width, # Latent space width
+
"latent_frames": latent_frames, # Latent frames
+
},
+
# low noise
+
{
+
"cl": cl, # Compressed latent dimension
+
"latent_height": latent_height, # Latent space height
+
"latent_width": latent_width, # Latent space width
+
"latent_frames": latent_frames, # Latent frames
+
},
+
],
+
"vae_decoder": {
+
"latent_frames": latent_frames,
+
"latent_height": latent_height,
+
"latent_width": latent_width,
+
},
+
}
+
+
# Use generic utility functions for compilation
+
if parallel:
+
compile_modules_parallel(self.modules, self.custom_config, specialization_updates)
+
else:
+
compile_modules_sequential(self.modules, self.custom_config, specialization_updates)
+
+
[docs] def prepare_latents(
+
self,
+
image: PipelineImageInput,
+
batch_size: int,
+
num_channels_latents: int = 16,
+
height: int = 480,
+
width: int = 832,
+
num_frames: int = 81,
+
dtype: Optional[torch.dtype] = None,
+
device: Optional[torch.device] = None,
+
generator: Optional[Union[torch.Generator, List[torch.Generator]]] = None,
+
latents: Optional[torch.Tensor] = None,
+
last_image: Optional[torch.Tensor] = None,
+
) -> Tuple[torch.Tensor, torch.Tensor]:
+
"""
+
Prepare latent variables for image-to-video generation with temporal conditioning.
+
+
This method handles the complex process of preparing latent tensors for I2V generation,
+
including image conditioning, temporal mask generation, and VAE encoding. It creates
+
the initial noise latents and processes the input image(s) to create conditioning
+
information that maintains temporal consistency throughout video generation.
+
+
Args:
+
image (PipelineImageInput): Input image(s) to condition the video generation.
+
Can be PIL Image, numpy array, or torch tensor.
+
batch_size (int): Number of videos to generate in parallel.
+
num_channels_latents (int, default=16): Number of channels in the latent space.
+
height (int, default=480): Target video height in pixels.
+
width (int, default=832): Target video width in pixels.
+
num_frames (int, default=81): Number of frames in the generated video.
+
dtype (torch.dtype, optional): Data type for latent tensors. If None, uses float32.
+
device (torch.device, optional): Device to place tensors on. If None, uses CPU.
+
generator (torch.Generator or List[torch.Generator], optional): Random generator(s)
+
for reproducible latent initialization.
+
latents (torch.Tensor, optional): Pre-generated latent tensors. If None, random
+
latents are created.
+
last_image (torch.Tensor, optional): Optional last frame image for video completion
+
tasks. Used to create temporal boundaries.
+
+
Returns:
+
Tuple[torch.Tensor, torch.Tensor]: A tuple containing:
+
- latents: Initial noise latents for denoising process
+
- condition: Conditioning tensor combining temporal masks and image latents
+
OR (if expand_timesteps=True):
+
- latents: Initial noise latents
+
- latent_condition: Image conditioning latents
+
+
Raises:
+
ValueError: If generator list length doesn't match batch size
+
RuntimeError: If VAE encoding fails or tensor operations fail
+
+
"""
+
num_latent_frames = (num_frames - 1) // self.model.vae.config.scale_factor_temporal + 1
+
latent_height = height // self.model.vae.config.scale_factor_spatial
+
latent_width = width // self.model.vae.config.scale_factor_spatial
+
+
shape = (batch_size, num_channels_latents, num_latent_frames, latent_height, latent_width)
+
if isinstance(generator, list) and len(generator) != batch_size:
+
raise ValueError(
+
f"You have passed a list of generators of length {len(generator)}, but requested an effective batch"
+
f" size of {batch_size}. Make sure the batch size matches the length of the generators."
+
)
+
+
if latents is None:
+
latents = randn_tensor(shape, generator=generator, device=device, dtype=dtype)
+
else:
+
latents = latents.to(device=device, dtype=dtype)
+
+
image = image.unsqueeze(2) # [batch_size, channels, 1, height, width]
+
+
if self.model.config.expand_timesteps:
+
video_condition = image
+
+
elif last_image is None:
+
video_condition = torch.cat(
+
[image, image.new_zeros(image.shape[0], image.shape[1], num_frames - 1, height, width)], dim=2
+
)
+
else:
+
last_image = last_image.unsqueeze(2)
+
video_condition = torch.cat(
+
[image, image.new_zeros(image.shape[0], image.shape[1], num_frames - 2, height, width), last_image],
+
dim=2,
+
)
+
video_condition = video_condition.to(device=device, dtype=self.model.vae.dtype)
+
+
latents_mean = (
+
torch.tensor(self.model.vae.config.latents_mean)
+
.view(1, self.model.vae.config.z_dim, 1, 1, 1)
+
.to(latents.device, latents.dtype)
+
)
+
latents_std = 1.0 / torch.tensor(self.model.vae.config.latents_std).view(
+
1, self.model.vae.config.z_dim, 1, 1, 1
+
).to(latents.device, latents.dtype)
+
+
# Initialize VAE encoder inference session
+
if self.vae_encoder.qpc_session is None:
+
self.vae_encoder.qpc_session = QAICInferenceSession(
+
str(self.vae_encoder.qpc_path), device_ids=self.vae_encoder.device_ids
+
)
+
+
# # Allocate output buffer for VAE encoder
+
output_buffer = {
+
"latents": np.random.rand(
+
batch_size, constants.WAN_DIT_I2V_IMG_LATENT_CHANNELS, num_latent_frames, latent_height, latent_width
+
).astype(np.int32)
+
}
+
self.vae_encoder.qpc_session.set_buffers(output_buffer)
+
+
aic_vae_encoder_input = {"image": video_condition.detach().numpy()}
+
+
# Vae encoder QAIC inference
+
start_vae_time = time.perf_counter()
+
outputs = self.vae_encoder.qpc_session.run(aic_vae_encoder_input)
+
end_vae_time = time.perf_counter()
+
vae_encoder_perf = end_vae_time - start_vae_time
+
+
qaic_op = torch.from_numpy(outputs["latents"])
+
latent_condition_mean, logvar = torch.chunk(qaic_op, 2, dim=1)
+
latent_condition = latent_condition_mean.repeat(batch_size, 1, 1, 1, 1)
+
+
latent_condition = latent_condition.to(dtype)
+
latent_condition = (latent_condition - latents_mean) * latents_std
+
+
if self.model.config.expand_timesteps:
+
first_frame_mask = torch.ones(
+
1, 1, num_latent_frames, latent_height, latent_width, dtype=dtype, device=device
+
)
+
first_frame_mask[:, :, 0] = 0
+
return latents, latent_condition, first_frame_mask, vae_encoder_perf
+
+
mask_lat_size = torch.ones(batch_size, 1, num_frames, latent_height, latent_width)
+
+
if last_image is None:
+
mask_lat_size[:, :, list(range(1, num_frames))] = 0
+
else:
+
mask_lat_size[:, :, list(range(1, num_frames - 1))] = 0
+
first_frame_mask = mask_lat_size[:, :, 0:1]
+
first_frame_mask = torch.repeat_interleave(
+
first_frame_mask, dim=2, repeats=self.model.vae.config.scale_factor_temporal
+
)
+
mask_lat_size = torch.concat([first_frame_mask, mask_lat_size[:, :, 1:, :]], dim=2)
+
mask_lat_size = mask_lat_size.view(
+
batch_size, -1, self.model.vae.config.scale_factor_temporal, latent_height, latent_width
+
)
+
mask_lat_size = mask_lat_size.transpose(1, 2)
+
mask_lat_size = mask_lat_size.to(latent_condition.device)
+
+
return latents, torch.concat([mask_lat_size, latent_condition], dim=1), vae_encoder_perf
+
+
def __call__(
+
self,
+
image: PipelineImageInput,
+
prompt: Union[str, List[str]] = None,
+
negative_prompt: Union[str, List[str]] = None,
+
height: int = 544,
+
width: int = 720,
+
num_frames: int = 81,
+
num_inference_steps: int = 50,
+
guidance_scale: float = 1.0,
+
guidance_scale_2: Optional[float] = None,
+
num_videos_per_prompt: Optional[int] = 1,
+
generator: Optional[Union[torch.Generator, List[torch.Generator]]] = None,
+
latents: Optional[torch.Tensor] = None,
+
prompt_embeds: Optional[torch.Tensor] = None,
+
negative_prompt_embeds: Optional[torch.Tensor] = None,
+
image_embeds: Optional[torch.Tensor] = None,
+
last_image: Optional[torch.Tensor] = None,
+
output_type: Optional[str] = "np",
+
return_dict: bool = True,
+
attention_kwargs: Optional[Dict[str, Any]] = None,
+
callback_on_step_end: Optional[Union[Callable[[int, int, Dict], None]]] = None,
+
callback_on_step_end_tensor_inputs: List[str] = ["latents"],
+
max_sequence_length: int = 512,
+
custom_config_path: Optional[str] = None,
+
use_onnx_subfunctions: bool = False,
+
parallel_compile: bool = True,
+
):
+
"""
+
Generate videos from input images and text prompts using the QEfficient-optimized WAN I2V pipeline on QAIC hardware.
+
+
This is the main entry point for image-to-video generation. It orchestrates the complete WAN I2V
+
diffusion pipeline optimized for Qualcomm AI Cloud devices, converting static images into dynamic
+
video sequences with temporal consistency and text-guided motion.
+
+
Args:
+
image (PipelineImageInput): Input image(s) to condition the video generation. Can be PIL Image,
+
numpy array, or torch tensor. This serves as the first frame or conditioning frame for the video.
+
prompt (str or List[str], optional): Primary text prompt(s) describing the desired motion and content
+
for the video. Required unless `prompt_embeds` is provided.
+
negative_prompt (str or List[str], optional): Negative prompt(s) describing what to avoid
+
in the generated video. Used with classifier-free guidance.
+
height (int, optional): Target video height in pixels. Must be divisible by VAE scale factor.
+
Default: 480.
+
width (int, optional): Target video width in pixels. Must be divisible by VAE scale factor.
+
Default: 832.
+
num_frames (int, optional): Number of video frames to generate. Must satisfy temporal
+
divisibility requirements (num_frames - 1) % temporal_scale_factor == 0. Default: 81.
+
num_inference_steps (int, optional): Number of denoising steps. More steps generally
+
improve quality but increase generation time. Default: 50.
+
guidance_scale (float, optional): Guidance scale for classifier-free guidance in high-noise stage.
+
Default: 3.0.
+
guidance_scale_2 (float, optional): Guidance scale for low-noise stage in WAN 2.2.
+
If None, uses guidance_scale value.
+
num_videos_per_prompt (int, optional): Number of videos to generate per prompt. Default: 1.
+
generator (torch.Generator or List[torch.Generator], optional): Random generator for
+
reproducible generation.
+
latents (torch.Tensor, optional): Pre-generated latent tensors. If None, random latents
+
are generated based on video dimensions.
+
prompt_embeds (torch.Tensor, optional): Pre-computed text embeddings from UMT5 encoder.
+
Shape: [batch, seq_len, hidden_dim].
+
negative_prompt_embeds (torch.Tensor, optional): Pre-computed negative text embeddings.
+
image_embeds (torch.Tensor, optional): Pre-computed image embeddings (currently unused).
+
last_image (torch.Tensor, optional): Optional last frame image for video completion tasks.
+
Used to create temporal boundaries in the generated video.
+
output_type (str, optional): Output format. Options: "np" (default), "pil", or "latent".
+
return_dict (bool, optional): Whether to return a dictionary or tuple. Default: True.
+
attention_kwargs (Dict[str, Any], optional): Additional attention arguments for transformer.
+
callback_on_step_end (Callable, optional): Callback function executed after each denoising step.
+
callback_on_step_end_tensor_inputs (List[str], optional): Tensor names to pass to callback.
+
Default: ["latents"].
+
max_sequence_length (int, optional): Maximum token sequence length for text encoder. Default: 512.
+
custom_config_path (str, optional): Path to custom JSON configuration file for compilation.
+
use_onnx_subfunctions (bool, optional): Whether to export transformer blocks as ONNX subfunctions.
+
Default: False.
+
parallel_compile (bool, optional): Whether to compile modules in parallel. Default: True.
+
+
Returns:
+
QEffPipelineOutput: A dataclass containing:
+
- images: Generated video(s) in the format specified by `output_type`
+
- pipeline_module: Performance metrics for each pipeline component (transformer, VAE decoder)
+
+
Raises:
+
ValueError: If input validation fails or parameters are incompatible
+
RuntimeError: If compilation fails or QAIC devices are unavailable
+
FileNotFoundError: If custom config file is specified but not found
+
+
Example:
+
>>> from QEfficient.diffusers.pipelines.wan import QEffWanImageToVideoPipeline
+
>>> from PIL import Image
+
>>>
+
>>> # Load pipeline and input image
+
>>> pipeline = QEffWanImageToVideoPipeline.from_pretrained("Wan-AI/Wan2.2-I2V-A14B-Diffusers")
+
>>> image = Image.open("input_frame.jpg")
+
>>>
+
>>> # Generate video with motion
+
>>> result = pipeline(
+
... image=image,
+
... prompt="A person walking through a sunny garden with flowing motion",
+
... height=544,
+
... width=720,
+
... num_frames=81,
+
... num_inference_steps=4,
+
... guidance_scale=1.0
+
... )
+
>>>
+
>>> # Save generated video
+
>>> frames = result.images[0]
+
>>> export_to_video(frames, "generated_video.mp4", fps=16)
+
"""
+
device = self.model._execution_device
+
+
# Compile models with custom configuration if needed
+
self.compile(
+
compile_config=custom_config_path,
+
parallel=parallel_compile,
+
use_onnx_subfunctions=use_onnx_subfunctions,
+
height=height,
+
width=width,
+
num_frames=num_frames,
+
)
+
+
# Step 1: Validate all inputs
+
self.model.check_inputs(
+
prompt,
+
negative_prompt,
+
image,
+
height,
+
width,
+
prompt_embeds,
+
negative_prompt_embeds,
+
image_embeds,
+
callback_on_step_end_tensor_inputs,
+
guidance_scale_2,
+
)
+
+
# Ensure num_frames satisfies temporal divisibility requirements
+
if num_frames % self.model.vae.config.scale_factor_temporal != 1:
+
logger.warning(
+
f"`num_frames - 1` has to be divisible by {self.model.vae.config.scale_factor_temporal}. Rounding to the nearest number."
+
)
+
num_frames = (
+
num_frames // self.model.vae.config.scale_factor_temporal * self.model.vae.config.scale_factor_temporal
+
+ 1
+
)
+
num_frames = max(num_frames, 1)
+
+
if self.model.config.boundary_ratio is not None and guidance_scale_2 is None:
+
guidance_scale_2 = guidance_scale
+
+
# Initialize pipeline state
+
self._guidance_scale = guidance_scale
+
self._guidance_scale_2 = guidance_scale_2 if guidance_scale_2 is not None else guidance_scale
+
self._attention_kwargs = attention_kwargs
+
self._current_timestep = None
+
self._interrupt = False
+
+
# Step 2: Determine batch size from inputs
+
if prompt is not None and isinstance(prompt, str):
+
batch_size = 1
+
elif prompt is not None and isinstance(prompt, list):
+
batch_size = len(prompt)
+
else:
+
batch_size = prompt_embeds.shape[0]
+
+
# Step 3: Encode input prompts using UMT5 text encoder
+
# TODO: Update UMT5 on QAIC
+
prompt_embeds, negative_prompt_embeds = self.model.encode_prompt(
+
prompt=prompt,
+
negative_prompt=negative_prompt,
+
do_classifier_free_guidance=self.do_classifier_free_guidance,
+
num_videos_per_prompt=num_videos_per_prompt,
+
prompt_embeds=prompt_embeds,
+
negative_prompt_embeds=negative_prompt_embeds,
+
max_sequence_length=max_sequence_length,
+
device=device,
+
)
+
+
# Convert embeddings to transformer dtype for compatibility
+
transformer_dtype = self.transformer.model.transformer_high.dtype
+
prompt_embeds = prompt_embeds.to(transformer_dtype)
+
if negative_prompt_embeds is not None:
+
negative_prompt_embeds = negative_prompt_embeds.to(transformer_dtype)
+
+
# Step 4: Prepare timesteps for denoising process
+
self.scheduler.set_timesteps(num_inference_steps, device=device)
+
timesteps = self.scheduler.timesteps
+
+
# Step 5: Prepare initial latent variables for video generation
+
num_channels_latents = self.model.vae.config.z_dim
+
image = self.model.video_processor.preprocess(image, height=height, width=width).to(device, dtype=torch.float32)
+
if last_image is not None:
+
last_image = self.video_processor.preprocess(last_image, height=height, width=width).to(
+
device, dtype=torch.float32
+
)
+
+
latents_outputs = self.prepare_latents(
+
image,
+
batch_size * num_videos_per_prompt,
+
num_channels_latents,
+
height,
+
width,
+
num_frames,
+
torch.float32,
+
device,
+
generator,
+
latents,
+
last_image,
+
)
+
if self.model.config.expand_timesteps:
+
# wan 2.2 5b i2v use firt_frame_mask to mask timesteps
+
latents, condition, first_frame_mask, vae_encoder_perf = latents_outputs
+
else:
+
latents, condition, vae_encoder_perf = latents_outputs
+
+
# 6. Denoising loop
+
num_warmup_steps = len(timesteps) - num_inference_steps * self.scheduler.order
+
self._num_timesteps = len(timesteps)
+
+
if self.model.config.boundary_ratio is not None:
+
boundary_timestep = self.model.config.boundary_ratio * self.scheduler.config.num_train_timesteps
+
else:
+
boundary_timestep = None
+
+
# Step 7: Initialize QAIC inference session for transformer
+
if self.transformer.qpc_session is None:
+
qpc_load_start = time.perf_counter()
+
self.transformer.qpc_session = QAICInferenceSession(
+
str(self.transformer.qpc_path), device_ids=self.transformer.device_ids
+
)
+
qpc_load_end = time.perf_counter()
+
print(f" DIT QAICInferenceSession time {qpc_load_end - qpc_load_start:.2f} seconds")
+
+
# Calculate compressed latent dimension for transformer buffer allocation
+
cl, _, _, _ = calculate_latent_dimensions_with_frames(
+
height,
+
width,
+
num_frames,
+
self.model.vae.config.scale_factor_spatial,
+
self.model.vae.config.scale_factor_temporal,
+
self.patch_height,
+
self.patch_width,
+
)
+
# Allocate output buffer for QAIC inference
+
output_buffer = {
+
"output": np.random.rand(
+
batch_size,
+
cl, # Compressed latent dimension
+
constants.WAN_DIT_OUT_CHANNELS,
+
).astype(np.int32),
+
}
+
self.transformer.qpc_session.set_buffers(output_buffer)
+
transformer_perf = []
+
+
# Step 8: Denoising loop with dual-stage processing
+
with self.model.progress_bar(total=num_inference_steps) as progress_bar:
+
for i, t in enumerate(timesteps):
+
if self._interrupt:
+
continue
+
+
self._current_timestep = t
+
+
# Determine which model to use based on boundary timestep
+
if boundary_timestep is None or t >= boundary_timestep:
+
# High-noise stage
+
current_model = self.transformer.model.transformer_high
+
current_guidance_scale = guidance_scale
+
model_type = torch.ones(1, dtype=torch.int64) # High-noise model indicator
+
else:
+
# Low-noise stage
+
current_model = self.transformer.model.transformer_low
+
current_guidance_scale = guidance_scale_2
+
model_type = torch.ones(2, dtype=torch.int64) # Low-noise model indicator
+
+
# Prepare latent input with proper dtype
+
latent_model_input = latents.to(transformer_dtype)
+
+
# Handle timestep expansion for temporal consistency
+
if self.model.config.expand_timesteps:
+
latent_model_input = (1 - first_frame_mask) * condition + first_frame_mask * latents
+
latent_model_input = latent_model_input.to(transformer_dtype)
+
+
# seq_len: num_latent_frames * (latent_height // patch_size) * (latent_width // patch_size)
+
temp_ts = (first_frame_mask[0][0][:, ::2, ::2] * t).flatten()
+
# batch_size, seq_len
+
timestep = temp_ts.unsqueeze(0).expand(latents.shape[0], -1)
+
else:
+
latent_model_input = torch.cat([latents, condition], dim=1).to(transformer_dtype)
+
timestep = t.expand(latents.shape[0])
+
+
# Extract dimensions for patch processing
+
batch_size, num_channels, latent_frames, latent_height, latent_width = latent_model_input.shape
+
p_t, p_h, p_w = current_model.config.patch_size
+
post_patch_num_frames = latent_frames // p_t
+
post_patch_height = latent_height // p_h
+
post_patch_width = latent_width // p_w
+
+
# Generate rotary position embeddings
+
rotary_emb = current_model.rope(latent_model_input)
+
rotary_emb = torch.cat(rotary_emb, dim=0)
+
ts_seq_len = None
+
timestep = timestep.flatten()
+
+
# Generate conditioning embeddings (time + text)
+
temb, timestep_proj, encoder_hidden_states, encoder_hidden_states_image = (
+
current_model.condition_embedder(
+
timestep, prompt_embeds, encoder_hidden_states_image=None, timestep_seq_len=ts_seq_len
+
)
+
)
+
+
# Generate negative conditioning for classifier-free guidance
+
if self.do_classifier_free_guidance:
+
temb, timestep_proj, encoder_hidden_states_neg, encoder_hidden_states_image = (
+
current_model.condition_embedder(
+
timestep,
+
negative_prompt_embeds,
+
encoder_hidden_states_image=None,
+
timestep_seq_len=ts_seq_len,
+
)
+
)
+
+
# Reshape timestep projection for transformer input
+
timestep_proj = timestep_proj.unflatten(1, (6, -1))
+
+
# Prepare inputs for QAIC inference
+
inputs_aic = {
+
"hidden_states": latent_model_input.detach().numpy(),
+
"encoder_hidden_states": encoder_hidden_states.detach().numpy(),
+
"rotary_emb": rotary_emb.detach().numpy(),
+
"temb": temb.detach().numpy(),
+
"timestep_proj": timestep_proj.detach().numpy(),
+
"tsp": model_type.detach().numpy(), # Transformer stage pointer
+
}
+
+
# Prepare negative inputs for classifier-free guidance
+
if self.do_classifier_free_guidance:
+
inputs_aic2 = {
+
"hidden_states": latent_model_input.detach().numpy(),
+
"encoder_hidden_states": encoder_hidden_states_neg.detach().numpy(),
+
"rotary_emb": rotary_emb.detach().numpy(),
+
"temb": temb.detach().numpy(),
+
"timestep_proj": timestep_proj.detach().numpy(),
+
}
+
+
# Run conditional prediction with caching context
+
with current_model.cache_context("cond"):
+
# QAIC inference for conditional prediction
+
start_transformer_step_time = time.perf_counter()
+
outputs = self.transformer.qpc_session.run(inputs_aic)
+
end_transformer_step_time = time.perf_counter()
+
transformer_perf.append(end_transformer_step_time - start_transformer_step_time)
+
print(f"DIT {i} time {end_transformer_step_time - start_transformer_step_time:.2f} seconds")
+
+
# Process transformer output
+
hidden_states = torch.tensor(outputs["output"])
+
+
# Reshape output from patches back to video format
+
hidden_states = hidden_states.reshape(
+
batch_size, post_patch_num_frames, post_patch_height, post_patch_width, p_t, p_h, p_w, -1
+
)
+
+
# Permute dimensions to reconstruct video tensor
+
hidden_states = hidden_states.permute(0, 7, 1, 4, 2, 5, 3, 6)
+
noise_pred = hidden_states.flatten(6, 7).flatten(4, 5).flatten(2, 3)
+
+
# Run unconditional prediction for classifier-free guidance
+
if self.do_classifier_free_guidance: # Note: CFG will increase DIT num steps.
+
with current_model.cache_context("uncond"):
+
# QAIC inference for unconditional prediction
+
start_transformer_step_time = time.perf_counter()
+
outputs = self.transformer.qpc_session.run(inputs_aic2)
+
end_transformer_step_time = time.perf_counter()
+
transformer_perf.append(end_transformer_step_time - start_transformer_step_time)
+
+
# Process unconditional output
+
hidden_states = torch.tensor(outputs["output"])
+
+
# Reshape unconditional output
+
hidden_states = hidden_states.reshape(
+
batch_size, post_patch_num_frames, post_patch_height, post_patch_width, p_t, p_h, p_w, -1
+
)
+
+
hidden_states = hidden_states.permute(0, 7, 1, 4, 2, 5, 3, 6)
+
noise_uncond = hidden_states.flatten(6, 7).flatten(4, 5).flatten(2, 3)
+
+
# Apply classifier-free guidance
+
noise_pred = noise_uncond + current_guidance_scale * (noise_pred - noise_uncond)
+
+
# Update latents using scheduler (x_t -> x_t-1)
+
latents = self.scheduler.step(noise_pred, t, latents, return_dict=False)[0]
+
+
# Execute callback if provided # TODO: optimize to run DIT and vae in parallel
+
if callback_on_step_end is not None:
+
callback_kwargs = {}
+
for k in callback_on_step_end_tensor_inputs:
+
callback_kwargs[k] = locals()[k]
+
callback_outputs = callback_on_step_end(self, i, callback_kwargs, num_frames=num_frames)
+
latents = callback_outputs.pop("latents", latents)
+
prompt_embeds = callback_outputs.pop("prompt_embeds", prompt_embeds)
+
negative_prompt_embeds = callback_outputs.pop("negative_prompt_embeds", negative_prompt_embeds)
+
+
# Update progress bar
+
if i == len(timesteps) - 1 or ((i + 1) > num_warmup_steps and (i + 1) % self.scheduler.order == 0):
+
progress_bar.update()
+
+
self._current_timestep = None
+
if self.model.config.expand_timesteps:
+
latents = (1 - first_frame_mask) * condition + first_frame_mask * latents
+
+
# Prepare latents for VAE decoding
+
latents = latents.to(self.vae_decoder.model.dtype)
+
+
# Apply VAE normalization (denormalization)
+
latents_mean = (
+
torch.tensor(self.vae_decoder.model.config.latents_mean)
+
.view(1, self.vae_decoder.model.config.z_dim, 1, 1, 1)
+
.to(latents.device, latents.dtype)
+
)
+
latents_std = 1.0 / torch.tensor(self.vae_decoder.model.config.latents_std).view(
+
1, self.vae_decoder.model.config.z_dim, 1, 1, 1
+
).to(latents.device, latents.dtype)
+
latents = latents / latents_std + latents_mean
+
+
# Initialize VAE decoder inference session
+
if self.vae_decoder.qpc_session is None:
+
self.vae_decoder.qpc_session = QAICInferenceSession(
+
str(self.vae_decoder.qpc_path), device_ids=self.vae_decoder.device_ids
+
)
+
+
# # Allocate output buffer for VAE decoder
+
output_buffer = {"sample": np.random.rand(batch_size, 3, num_frames, height, width).astype(np.int32)}
+
self.vae_decoder.qpc_session.set_buffers(output_buffer)
+
inputs = {"latent_sample": latents.numpy()}
+
+
start_decode_time = time.perf_counter()
+
video = self.vae_decoder.qpc_session.run(inputs)
+
end_decode_time = time.perf_counter()
+
vae_decoder_perf = end_decode_time - start_decode_time
+
+
# Post-process video for output
+
video_tensor = torch.from_numpy(video["sample"])
+
video = self.model.video_processor.postprocess_video(video_tensor)
+
+
# Step 10: Collect performance metrics
+
perf_data = {
+
"vae_encoder": vae_encoder_perf,
+
"transformer": transformer_perf,
+
"vae_decoder": vae_decoder_perf,
+
}
+
+
# Build performance metrics for output
+
perf_metrics = [ModulePerf(module_name=name, perf=perf_data[name]) for name in perf_data.keys()]
+
+
return QEffPipelineOutput(
+
pipeline_module=perf_metrics,
+
images=video,
+
)
+