2 changes: 1 addition & 1 deletion README.md
@@ -82,7 +82,7 @@ uv sync --all-extras
uv run pytest

# Run linting
uv run ruff check decart/
uv run ruff check decart/ tests/ examples/

# Format code
uv run black decart/ tests/ examples/
3 changes: 3 additions & 0 deletions decart/lipsync/__init__.py
@@ -0,0 +1,3 @@
from .client import RealtimeLipsyncClient

__all__ = ["RealtimeLipsyncClient"]
172 changes: 172 additions & 0 deletions decart/lipsync/client.py
@@ -0,0 +1,172 @@
import asyncio
import fractions
import logging
import time
from typing import Optional, Tuple

import cv2
import numpy as np
import websockets

from .messages import (
    LipsyncClientMessage,
    LipsyncServerMessage,
    LipsyncServerMessageAdapter,
    LipsyncConfigMessage,
    LipsyncConfigAckMessage,
    LipsyncAudioInputMessage,
    LipsyncVideoInputMessage,
    LipsyncInterruptAudioMessage,
    LipsyncSyncedOutputMessage,
    LipsyncErrorMessage,
)

logger = logging.getLogger(__name__)


class RealtimeLipsyncClient:

DECART_LIPSYNC_ENDPOINT = "/router/lipsync/ws"
VIDEO_FPS = 25

def __init__(
self,
api_key: str,
base_url: str = "https://api.decart.ai",
audio_sample_rate: int = 16000,
video_fps: int = VIDEO_FPS,
sync_latency: float = 0.0,
):
"""
Args:
api_key: The API key for the Decart Lipsync server
            base_url: The base URL of the Decart Lipsync server
audio_sample_rate: The sample rate of the audio
video_fps: The FPS of the video
sync_latency: Delay next frame up to this many seconds, to account for variable latency
"""
self._url = f"{base_url}{self.DECART_LIPSYNC_ENDPOINT}".replace(
"https://", "wss://"
).replace("http://", "ws://")
self._api_key = api_key
self._audio_sample_rate = audio_sample_rate
self._video_fps = video_fps
self._sync_latency = sync_latency

self._websocket: Optional[websockets.ClientConnection] = None
self._out_queue = asyncio.Queue()
self._response_handling_task: Optional[asyncio.Task] = None

self._video_frame_interval = fractions.Fraction(1, video_fps)
self._video_out_frame_index = 0
self._video_out_start_time = 0

async def _recv(self) -> LipsyncServerMessage:
response = await self._websocket.recv()
return LipsyncServerMessageAdapter.validate_json(response)

async def _send(self, message: LipsyncClientMessage):
msg = message.model_dump_json()
await self._websocket.send(msg)

async def _handle_server_responses(self):
try:
while self._websocket is not None:
response = await self._recv()
if isinstance(response, LipsyncSyncedOutputMessage):
await self._out_queue.put(response)
elif isinstance(response, LipsyncErrorMessage):
logger.error(f"Lipsync server error: {response.message}")
raise Exception(response.message)
else:
logger.error(f"Unknown response from lipsync server: {response}")
except asyncio.CancelledError:
pass
except websockets.exceptions.ConnectionClosedOK:
logger.debug("Connection closed by server")

    async def _decode_video_frame(self, video_frame: bytes) -> np.ndarray:
        def _decode_video_frame_sync(frame_bytes: bytes) -> np.ndarray:
            nparr = np.frombuffer(frame_bytes, np.uint8)
            return cv2.imdecode(nparr, cv2.IMREAD_COLOR)

        return await asyncio.to_thread(_decode_video_frame_sync, video_frame)

async def _encode_video_frame(self, image: np.ndarray) -> bytes:
def _encode_video_frame_sync(image: np.ndarray) -> bytes:
success, encoded_image = cv2.imencode(".jpeg", image)
if not success:
raise Exception("Failed to encode video frame as JPEG")
return encoded_image.tobytes()

return await asyncio.to_thread(_encode_video_frame_sync, image)

    async def _decode_audio_frame(self, audio_frame: bytes) -> bytes:
        # Audio is raw PCM and passes through unchanged
        return audio_frame

async def connect(self):
logger.debug(f"Connecting to lipsync server at {self._url}")
self._websocket = await websockets.connect(f"{self._url}?api_key={self._api_key}")
logger.debug("WebSocket connected")
# Initial handshake
await self._send(
LipsyncConfigMessage(
video_fps=self._video_fps,
audio_sample_rate=self._audio_sample_rate,
)
)
logger.debug("Configuration sent")
response = await self._recv()
if not isinstance(response, LipsyncConfigAckMessage):
raise Exception(f"Configuration not acknowledged by server: {response}")
logger.debug("Configuration acknowledged")

self._response_handling_task = asyncio.create_task(self._handle_server_responses())

logger.debug("Connected to lipsync server")

async def disconnect(self):
if self._websocket is not None:
await self._websocket.close()
self._websocket = None

if self._response_handling_task is not None:
self._response_handling_task.cancel()
try:
await self._response_handling_task
except asyncio.CancelledError:
pass
self._response_handling_task = None

async def send_audio(self, audio_data: bytes):
await self._send(LipsyncAudioInputMessage(audio_data=audio_data))

async def send_video_frame_bytes(self, video_frame_bytes: bytes):
await self._send(LipsyncVideoInputMessage(video_frame=video_frame_bytes))

async def send_video_frame(self, image: np.ndarray):
encoded_image = await self._encode_video_frame(image)
await self.send_video_frame_bytes(encoded_image)

async def interrupt_audio(self):
await self._send(LipsyncInterruptAudioMessage())

    async def get_synced_output(self, timeout: Optional[float] = None) -> Tuple[np.ndarray, bytes]:
        synced_output: LipsyncSyncedOutputMessage = await asyncio.wait_for(
            self._out_queue.get(), timeout=timeout
        )

        video_frame = await self._decode_video_frame(synced_output.video_frame)
        audio_frame = await self._decode_audio_frame(synced_output.audio_frame)

        if self._video_out_frame_index == 0:
            self._video_out_start_time = time.time() + self._sync_latency

        # Pace output so frames are released at the configured FPS
        time_til_frame = (
            self._video_out_start_time
            + (self._video_out_frame_index * self._video_frame_interval)
            - time.time()
        )
        if time_til_frame > 0:
            await asyncio.sleep(time_til_frame)
        self._video_out_frame_index += 1

        return video_frame, audio_frame
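
For context, a minimal sketch of the intended call pattern for RealtimeLipsyncClient; the API key, frame size, and audio payload below are placeholders, not values from this PR:

import asyncio

import numpy as np

from decart.lipsync import RealtimeLipsyncClient


async def demo():
    client = RealtimeLipsyncClient(api_key="YOUR_API_KEY", sync_latency=0.1)
    await client.connect()
    try:
        # One second of 16 kHz, 16-bit mono silence and a single black frame
        await client.send_audio(b"\x00\x00" * 16000)
        await client.send_video_frame(np.zeros((480, 640, 3), dtype=np.uint8))
        video_frame, audio_frame = await client.get_synced_output(timeout=5.0)
        print(video_frame.shape, len(audio_frame))
    finally:
        await client.disconnect()


asyncio.run(demo())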
58 changes: 58 additions & 0 deletions decart/lipsync/messages.py
@@ -0,0 +1,58 @@
from pydantic import BaseModel, Field, ConfigDict, TypeAdapter
from typing import Literal, Union, Annotated


class LipsyncMessage(BaseModel):
model_config = ConfigDict(ser_json_bytes="base64", val_json_bytes="base64")


class LipsyncConfigMessage(LipsyncMessage):
type: Literal["config"] = "config"
video_fps: int
audio_sample_rate: int


class LipsyncConfigAckMessage(LipsyncMessage):
type: Literal["config_ack"] = "config_ack"


class LipsyncAudioInputMessage(LipsyncMessage):
type: Literal["audio_input"] = "audio_input"
audio_data: bytes


class LipsyncVideoInputMessage(LipsyncMessage):
type: Literal["video_input"] = "video_input"
video_frame: bytes


class LipsyncInterruptAudioMessage(LipsyncMessage):
type: Literal["interrupt_audio"] = "interrupt_audio"


class LipsyncSyncedOutputMessage(LipsyncMessage):
type: Literal["synced_result"] = "synced_result"
video_frame: bytes
audio_frame: bytes


class LipsyncErrorMessage(LipsyncMessage):
type: Literal["error"] = "error"
message: str


LipsyncClientMessage = Annotated[
Union[
LipsyncConfigMessage,
LipsyncAudioInputMessage,
LipsyncVideoInputMessage,
LipsyncInterruptAudioMessage,
],
Field(discriminator="type"),
]
LipsyncServerMessage = Annotated[
Union[LipsyncConfigAckMessage, LipsyncSyncedOutputMessage, LipsyncErrorMessage],
Field(discriminator="type"),
]

LipsyncServerMessageAdapter = TypeAdapter(LipsyncServerMessage)
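
As a sanity check on the schema, the ConfigDict settings on LipsyncMessage make bytes fields round-trip through JSON as base64 strings, and LipsyncServerMessageAdapter dispatches on the type discriminator. A small sketch with a hypothetical payload:

from decart.lipsync.messages import (
    LipsyncAudioInputMessage,
    LipsyncServerMessageAdapter,
    LipsyncSyncedOutputMessage,
)

# Bytes fields serialize to base64 strings in JSON (b"\x00\x01\x02" -> "AAEC")
msg = LipsyncAudioInputMessage(audio_data=b"\x00\x01\x02")
print(msg.model_dump_json())

# The adapter picks the concrete model from the "type" field
raw = '{"type": "synced_result", "video_frame": "AAEC", "audio_frame": "AAEC"}'
parsed = LipsyncServerMessageAdapter.validate_json(raw)
assert isinstance(parsed, LipsyncSyncedOutputMessage)
assert parsed.video_frame == b"\x00\x01\x02"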
114 changes: 114 additions & 0 deletions examples/lipsync_file.py
@@ -0,0 +1,114 @@
#!/usr/bin/env python
"""
Example of using Decart's Realtime Lipsync API to synchronize audio with video.

This example loads a video file and an audio file, processes them through the
Decart Lipsync API, and saves the lipsynced result to a new video file.

Usage:
python lipsync_file.py <video_file> <audio_file> <output_file>

Example:
python lipsync_file.py input.mp4 speech.wav output_lipsynced.mp4
python lipsync_file.py input.mp4 speech.mp3 output_lipsynced.mp4
"""

import asyncio
import os
import sys
import cv2
from pathlib import Path

from decart.lipsync import RealtimeLipsyncClient


async def process_lipsync(video_path: str, audio_path: str, output_path: str):
"""Process video and audio through Decart's lipsync API."""

# Get API key
api_key = os.getenv("DECART_API_KEY")
if not api_key:
print("Error: Please set DECART_API_KEY environment variable")
return

# Initialize client
client = RealtimeLipsyncClient(api_key=api_key)

print(f"Processing: {video_path} + {audio_path} -> {output_path}")

# Connect to server
await client.connect()
print("Connected to Decart Lipsync server")

try:
        # Load raw audio bytes and send them to the server
        # (the server handles decoding and chunking)
        with open(audio_path, "rb") as f:
            audio_data = f.read()
        await client.send_audio(audio_data)

# Load video frames and convert to RGB
frame_count = 0
cap = cv2.VideoCapture(video_path)
while True:
ret, frame = cap.read()
if not ret:
break
frame_count += 1

            # Convert from BGR (OpenCV default) to RGB
            rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            await client.send_video_frame(rgb_frame)
cap.release()

        # Receive lipsynced output and write it to the output file.
        # Note: cv2.VideoWriter produces a video-only file; the returned
        # audio frames are not muxed into the result.
        out = cv2.VideoWriter(
            output_path,
            cv2.VideoWriter_fourcc(*"mp4v"),
            client._video_fps,
            (rgb_frame.shape[1], rgb_frame.shape[0]),
        )
        for i in range(frame_count):
            try:
                video_frame, audio_frame = await client.get_synced_output(timeout=1.0)
bgr_frame = cv2.cvtColor(video_frame, cv2.COLOR_RGB2BGR)
out.write(bgr_frame)
except asyncio.TimeoutError:
print(f"Warning: Timeout at frame {i}")
break
out.release()

finally:
await client.disconnect()
print("Disconnected from server")


async def main():
"""Main entry point."""
if len(sys.argv) != 4:
print("Usage: python lipsync_file.py <video_file> <wav_audio_file> <output_file>")
print("Example: python lipsync_file.py input.mp4 speech.wav output_lipsynced.mp4")
sys.exit(1)

video_path = sys.argv[1]
audio_path = sys.argv[2]
output_path = sys.argv[3]

# Check input files exist
if not Path(video_path).exists():
print(f"Error: Video file not found: {video_path}")
sys.exit(1)

if not Path(audio_path).exists():
print(f"Error: Audio file not found: {audio_path}")
sys.exit(1)

# Process the files
await process_lipsync(video_path, audio_path, output_path)


if __name__ == "__main__":
asyncio.run(main())
3 changes: 3 additions & 0 deletions pyproject.toml
@@ -35,6 +35,9 @@ dependencies = [
"aiohttp>=3.9.0",
"aiofiles>=23.0.0",
"pydantic>=2.0.0",
"websockets>=15.0.1",
"numpy>=2.0.2",
"opencv-python>=4.11.0.86",
]

[project.optional-dependencies]