Conversation


@zhtshr zhtshr commented Feb 11, 2026

Enable Disaggregation Feature

Summary

This PR introduces a disaggregation architecture to LightX2V, enabling distributed deployment of the video generation pipeline across multiple devices or machines.

What's New

Core Functionality

  • Service Decoupling: Separate encoder and transformer services that can run independently
  • High-Performance Communication: ZeroMQ and RDMA-based messaging with the Mooncake transfer engine (a minimal messaging sketch follows this list)
  • Flexible Deployment: Support for single-machine multi-GPU and cross-machine distributed setups
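
To make the decoupled messaging concrete, here is a minimal, illustrative ZeroMQ PUSH/PULL sketch. This is not the LightX2V API; the endpoint, payload, and function names below are placeholders, and the real services additionally move tensor buffers over RDMA via Mooncake.

    # Illustrative only -- NOT the lightx2v.disagg API. Shows the one-way
    # PUSH/PULL message flow an encoder -> transformer split relies on.
    import zmq

    def encoder_side(endpoint: str = "tcp://127.0.0.1:5555") -> None:
        ctx = zmq.Context.instance()
        push = ctx.socket(zmq.PUSH)
        push.connect(endpoint)
        # In the real services the payload would carry tensor metadata
        # (shapes, dtypes, buffer sizes) rather than a toy dict.
        push.send_pyobj({"prompt": "a cat surfing", "step": 0})

    def transformer_side(endpoint: str = "tcp://127.0.0.1:5555") -> None:
        ctx = zmq.Context.instance()
        pull = ctx.socket(zmq.PULL)
        pull.bind(endpoint)
        msg = pull.recv_pyobj()  # blocks until the encoder pushes something
        print("received:", msg)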

New Components

  • lightx2v/disagg/: Complete disaggregation package
    • conn.py: Data connection and management
    • services/encoder.py: Encoder service implementation
    • services/transformer.py: Transformer service implementation
    • examples/: Usage examples for WAN I2V and T2V models

Key Benefits

  1. Resource Flexibility: Distribute compute-intensive tasks across multiple devices
  2. Scalability: Easy horizontal scaling for production deployments
  4. Memory Efficiency: Run large models in hardware-constrained environments
  4. Service-Oriented: Build microservice-based video generation systems

Usage Example

python3 lightx2v/disagg/examples/wan_t2v_service.py

See lightx2v/disagg/examples/ for complete working examples.
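
For orientation, the services read their disaggregation settings from the run config. A hedged sketch of the keys visible in this PR's diff excerpts is shown below; the address and room defaults are the ones quoted in the review further down, while the seed value and anything not quoted there are assumptions.

    # Hedged sketch of config keys seen in the diff excerpts quoted below
    # (seed, data_bootstrap_addr, data_bootstrap_room). The addr/room values
    # are the defaults shown there; the seed value is arbitrary.
    disagg_config = {
        "seed": 42,                          # forwarded to seed_all(...) by the services
        "data_bootstrap_addr": "127.0.0.1",  # rendezvous address for the sender/receiver pair
        "data_bootstrap_room": 0,            # room id matching a DataSender to a DataReceiver
    }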

Backward Compatibility

✅ This is an optional feature that doesn't affect existing functionality:

  • Default mode preserves current behavior
  • All existing APIs remain unchanged
  • Users can opt-in to use disaggregation when needed

Testing

  • ✅ Tested with WAN I2V and T2V models
  • ✅ Verified cross-device communication stability
  • ✅ Validated accuracy matches single-machine mode

Files Changed

  • Added: lightx2v/disagg/ package with all disaggregation modules
  • Modified: None (purely additive)

Future Enhancements

  • Automatic service discovery
  • Load balancing across multiple workers
  • Enhanced monitoring and health checks

Type: Feature
Breaking Changes: None
Documentation: Included in lightx2v/disagg/examples/

@gemini-code-assist

Summary of Changes

Hello @zhtshr, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request introduces a disaggregation architecture to the LightX2V project, transforming the video generation pipeline into a distributed system. By decoupling the encoder and transformer components into independent services, it significantly enhances resource flexibility, scalability, and memory efficiency. This change allows for more versatile deployments, from multi-GPU single machines to cross-machine distributed environments, without altering existing APIs or workflows.

Highlights

  • Service Decoupling: Introduced separate encoder and transformer services that can operate independently, enabling distributed deployment of the video generation pipeline.
  • High-Performance Communication: Implemented ZeroMQ and RDMA-based messaging with the Mooncake transfer engine for efficient data transfer between decoupled services.
  • Flexible Deployment: Supports various deployment scenarios, including single-machine multi-GPU and cross-machine distributed setups.
  • Resource Flexibility & Scalability: Allows distribution of compute-intensive tasks across multiple devices, facilitating horizontal scaling for production and memory efficiency for large models.
  • New Disaggregation Package: Added a new lightx2v/disagg/ package containing connection management, service implementations (encoder, transformer), and usage examples.
  • Backward Compatibility: The disaggregation feature is optional and does not impact existing functionality or APIs.

Changelog
  • configs/mooncake_config.json
    • Added a new configuration file for Mooncake transfer engine settings.
  • lightx2v/disagg/__init__.py
    • Added package initialization for the disaggregation module.
  • lightx2v/disagg/conn.py
    • Added core connection management classes (DataManager, DataSender, DataReceiver) for inter-service communication using ZeroMQ and Mooncake.
  • lightx2v/disagg/examples/mooncake_client.py
    • Added an example client script demonstrating Mooncake data transfer.
  • lightx2v/disagg/examples/mooncake_server.py
    • Added an example server script demonstrating Mooncake data transfer.
  • lightx2v/disagg/examples/wan_i2v.py
    • Added a standard WAN I2V example script.
  • lightx2v/disagg/examples/wan_i2v_service.py
    • Added an example script demonstrating the disaggregated WAN I2V service.
  • lightx2v/disagg/examples/wan_t2v.py
    • Added a standard WAN T2V example script.
  • lightx2v/disagg/examples/wan_t2v_service.py
    • Added an example script demonstrating the disaggregated WAN T2V service.
  • lightx2v/disagg/mooncake.py
    • Added a wrapper class for the Mooncake transfer engine, including configuration loading and memory management.
  • lightx2v/disagg/protocol.py
    • Added data classes for defining the communication protocol, including TensorMetadata, AllocationRequest, RemoteBuffer, and MemoryHandle (an illustrative sketch of two of these follows the changelog).
  • lightx2v/disagg/services/__init__.py
    • Added package initialization for the disaggregation services module.
  • lightx2v/disagg/services/base.py
    • Added an abstract base class for disaggregation services.
  • lightx2v/disagg/services/encoder.py
    • Added the EncoderService, responsible for text/image/VAE encoding and sending processed data to the transformer service.
  • lightx2v/disagg/services/transformer.py
    • Added the TransformerService, responsible for receiving encoded data, executing the diffusion process, and performing VAE decoding.
  • lightx2v/disagg/utils.py
    • Added utility functions for loading WAN models, setting configuration, and estimating buffer sizes for disaggregated components.
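
As a quick orientation for reviewers, here is an illustrative sketch of two of the protocol data classes named above. Only the class names come from this changelog; every field, type, and comment below is an assumption, not the actual definition in lightx2v/disagg/protocol.py.

    # Illustrative sketch only -- field names and types are assumptions.
    from dataclasses import dataclass

    @dataclass
    class TensorMetadata:
        name: str      # logical name of the tensor shipped encoder -> transformer
        shape: tuple   # shape on the sending side
        dtype: str     # serialized dtype string, e.g. "torch.float16"
        nbytes: int    # byte length of the flat buffer to allocate on the receiver

    @dataclass
    class RemoteBuffer:
        session_id: str  # identifies the remote Mooncake/ZMQ peer
        ptr: int         # remote buffer address (data_ptr on the peer side)
        nbytes: int      # registered length of the remote buffer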

@gemini-code-assist bot left a comment

Code Review

This pull request introduces a significant disaggregation feature, enabling distributed deployment of the video generation pipeline. The overall architecture is well-designed, separating concerns into services, connection management, and utilities. My review focuses on improving code quality, portability, and fixing potential bugs. Key areas of feedback include correcting ZeroMQ context management, fixing a bug in an example client, improving the portability of example scripts by removing hardcoded paths, and enhancing code style and consistency. Overall, this is a solid contribution, and the suggested changes will help make the new code more robust and maintainable.

Comment on lines +46 to +48
if ret_value != 0:
    print("Mooncake memory registration failed.")
    raise RuntimeError("Mooncake memory registration failed.")

high

The check if ret_value != 0: will raise a NameError if PROTOCOL is not 'rdma', because ret_value would not have been defined. This check and the following lines should be inside the if PROTOCOL == 'rdma': block.

Suggested change
- if ret_value != 0:
-     print("Mooncake memory registration failed.")
-     raise RuntimeError("Mooncake memory registration failed.")
+     if ret_value != 0:
+         print("Mooncake memory registration failed.")
+         raise RuntimeError("Mooncake memory registration failed.")

Comment on lines +101 to +102
buf = torch.empty((nbytes,), dtype=torch.uint8, #device=torch.device(f"cuda:{self.receiver_engine_rank}")
)

high

The device argument for torch.empty is commented out, which means the buffer will be allocated on the CPU. If these buffers are intended for RDMA over InfiniBand with GPU Direct, they typically need to be allocated on the GPU. The torch.cuda.set_device call on line 94 is also commented out, which might be related. Please verify if this is the intended behavior.

            buf = torch.empty((nbytes,), dtype=torch.uint8, device=torch.device(f"cuda:{self.receiver_engine_rank}"))

Comment on lines +144 to +148
buf = torch.empty(
    (nbytes,),
    dtype=torch.uint8,
    # device=torch.device(f"cuda:{self.sender_engine_rank}"),
)

high

The device argument for torch.empty is commented out, which means the buffer will be allocated on the CPU. If these buffers are intended for RDMA over InfiniBand with GPU Direct, they typically need to be allocated on the GPU. The torch.cuda.set_device call on line 135 is also commented out, which might be related. Please verify if this is the intended behavior.

Suggested change
- buf = torch.empty(
-     (nbytes,),
-     dtype=torch.uint8,
-     # device=torch.device(f"cuda:{self.sender_engine_rank}"),
- )
+ buf = torch.empty(
+     (nbytes,),
+     dtype=torch.uint8,
+     device=torch.device(f"cuda:{self.sender_engine_rank}"),
+ )

@@ -0,0 +1,89 @@
import logging

medium

The script imports and configures the standard logging module (on line 10) but then exclusively uses loguru.logger for logging. To avoid confusion and unused code, it's better to remove the import and configuration for the standard logging module.


def main():
    # 1. Configuration
    model_path = "/root/zht/LightX2V/models/Wan-AI/Wan2.1-T2V-1.3B"

medium

Hardcoded absolute paths like this one (and on line 18) make the script difficult to run for other users on different systems. It's better to use relative paths, command-line arguments, or environment variables to specify these paths.
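
One conventional way to do that in the example scripts, sketched with a hypothetical --model_path flag and LIGHTX2V_MODEL_PATH environment variable (neither exists in this PR):

    # Hypothetical sketch -- the flag and env var names are assumptions.
    import argparse
    import os

    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--model_path",
        default=os.environ.get("LIGHTX2V_MODEL_PATH", "models/Wan-AI/Wan2.1-T2V-1.3B"),
        help="Path to the WAN model weights",
    )
    args = parser.parse_args()
    model_path = args.model_path  # replaces the hardcoded /root/zht/... path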

Comment on lines +279 to +280
def failure_exception(self):
    raise Exception("Fake DataSender Exception")

medium

Raising a generic Exception with a message like 'Fake DataSender Exception' is not ideal for production code. It's better to use a more specific exception type like RuntimeError and provide a more descriptive message. Consider defining a custom exception class for this module for even better error handling.

Suggested change
- def failure_exception(self):
-     raise Exception("Fake DataSender Exception")
+ def failure_exception(self):
+     raise RuntimeError("DataSender failed due to an internal error.")

buffer_ptrs = [buf.data_ptr() for buf in self._rdma_buffers]
self.data_sender.send(buffer_ptrs)

import time

medium

Imports should be at the top of the file, as per PEP 8 guidelines. Moving import time to the top improves readability and consistency.

seed_all(self.config["seed"])

data_bootstrap_addr = self.config.get("data_bootstrap_addr", "127.0.0.1")
data_bootstrap_room = self.config.get("data_bootstrap_room", 0)

medium

For consistency with how sender_engine_rank and receiver_engine_rank are handled, it's better to cast data_bootstrap_room to an integer here. It's currently cast later in the DataReceiver constructor.

Suggested change
- data_bootstrap_room = self.config.get("data_bootstrap_room", 0)
+ data_bootstrap_room = int(self.config.get("data_bootstrap_room", 0))


@cache
def _connect(self, endpoint: str):
    socket = zmq.Context().socket(zmq.PUSH)

medium

To follow the single ZMQ context per process best practice, this method should use the context created in __init__ instead of creating a new one.

Suggested change
- socket = zmq.Context().socket(zmq.PUSH)
+ socket = self.context.socket(zmq.PUSH)
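
For context, a minimal sketch of the single-context pattern this suggestion points at; the surrounding class body is illustrative, and only the @cache-decorated _connect shape mirrors the diff.

    # Illustrative sketch -- not the actual conn.py class body.
    from functools import cache

    import zmq

    class DataSender:  # stand-in name for the conn.py class that owns _connect
        def __init__(self) -> None:
            self.context = zmq.Context()  # one ZMQ context reused for every socket

        @cache
        def _connect(self, endpoint: str):
            # Reuse the shared context; @cache keeps one PUSH socket per endpoint.
            socket = self.context.socket(zmq.PUSH)
            socket.connect(endpoint)
            return socket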

return hashlib.sha256(data).hexdigest()

# Poll for data from EncoderService
import time

medium

Imports should be at the top of the file, as per PEP 8 guidelines. Moving import time to the top improves readability and consistency.
