Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
"websockets>=11.0.3",
],
extras_require={
"capture": ["videodb-capture-bin>=0.2.7"],
"capture": ["videodb-capture-bin>=0.2.8"],
},
classifiers=[
"Intended Audience :: Developers",
Expand Down
2 changes: 1 addition & 1 deletion videodb/__about__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@



__version__ = "0.4.0"
__version__ = "0.4.1"
__title__ = "videodb"
__author__ = "videodb"
__email__ = "contact@videodb.io"
Expand Down
46 changes: 30 additions & 16 deletions videodb/capture.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,16 @@
import json
import uuid
import os
import warnings
from typing import Optional, Dict, List, Any

from videodb._constants import VIDEO_DB_API

logger = logging.getLogger(__name__)

def get_recorder_path():
def get_capture_binary_path():
"""
Attempts to find the path to the recorder binary.
Attempts to find the path to the capture binary.
If the optional 'videodb-capture-bin' package is not installed,
it raises a RuntimeError with instructions.
"""
Expand All @@ -21,13 +22,13 @@ def get_recorder_path():
except ImportError:
error_msg = (
"Capture runtime not found.\n"
"To use recording features, please install the capture dependencies:\n"
"To use capture features, please install the capture dependencies:\n"
"pip install 'videodb[capture]'"
)
logger.error(error_msg)
raise RuntimeError(error_msg)
except Exception as e:
logger.error(f"Failed to resolve recorder path: {e}")
logger.error(f"Failed to resolve capture binary path: {e}")
raise


Expand All @@ -53,6 +54,7 @@ def __init__(
self.type = type
self._client = client
self.store = False
self.is_primary = False

def __repr__(self):
return f"Channel(id={self.id}, name={self.name}, type={self.type})"
Expand Down Expand Up @@ -91,6 +93,7 @@ def to_dict(self) -> Dict[str, Any]:
"name": self.name,
"record": True,
"store": self.store,
"is_primary": self.is_primary,
}


Expand Down Expand Up @@ -167,14 +170,14 @@ def __init__(
self._session_id: Optional[str] = None
self._proc = None
self._futures: Dict[str, asyncio.Future] = {}
self._binary_path = get_recorder_path()
self._binary_path = get_capture_binary_path()
self._event_queue = asyncio.Queue()

def __repr__(self) -> str:
return f"CaptureClient(base_url={self.base_url})"

async def _ensure_process(self):
"""Ensure the recorder binary is running."""
"""Ensure the capture binary is running."""
if self._proc is not None and self._proc.returncode is None:
return

Expand All @@ -194,7 +197,7 @@ async def _ensure_process(self):
async def _send_command(
self, command: str, params: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
"""Send a command to the recorder binary and await response.
"""Send a command to the capture binary and await response.

:param str command: Command name.
:param dict params: Command parameters.
Expand All @@ -210,7 +213,7 @@ async def _send_command(
"params": params or {},
}

# Framing: videodb_recorder|<JSON>\n
# IPC protocol framing: videodb_recorder|<JSON>\n
message = f"videodb_recorder|{json.dumps(payload)}\n"
self._proc.stdin.write(message.encode("utf-8"))
await self._proc.stdin.drain()
Expand Down Expand Up @@ -254,18 +257,18 @@ async def _read_stdout_loop(self):
await self._event_queue.put(data)

except Exception as e:
logger.error(f"Failed to parse recorder message: {e}")
logger.error(f"Failed to parse capture message: {e}")

async def _read_stderr_loop(self):
"""Loop to read stderr and log messages."""
while True:
line = await self._proc.stderr.readline()
if not line:
break
logger.debug(f"[Recorder Binary]: {line.decode('utf-8', errors='replace').strip()}")
logger.debug(f"[Capture Binary]: {line.decode('utf-8', errors='replace').strip()}")

async def shutdown(self):
"""Cleanly terminate the recorder binary process."""
"""Cleanly terminate the capture binary process."""
if self._proc:
try:
# Try graceful shutdown command first
Expand Down Expand Up @@ -366,12 +369,26 @@ async def start_session(

:param str capture_session_id: The ID of the capture session.
:param list[Channel] channels: List of Channel objects to record.
:param str primary_video_channel_id: ID of the primary video channel.
Set channel.is_primary = True on the desired video channel.
:param str primary_video_channel_id: Deprecated. Set
channel.is_primary = True on the desired video channel instead.
:raises ValueError: If no channels are specified.
"""
if not channels:
raise ValueError("At least one channel must be specified for capture.")

if primary_video_channel_id is not None:
warnings.warn(
"primary_video_channel_id is deprecated. "
"Set channel.is_primary = True on the desired video channel instead.",
DeprecationWarning,
stacklevel=2,
)
for ch in channels:
if ch.id == primary_video_channel_id:
ch.is_primary = True
break

self._session_id = capture_session_id

payload = {
Expand All @@ -380,9 +397,6 @@ async def start_session(
"channels": [ch.to_dict() for ch in channels],
}

if primary_video_channel_id:
payload["primary_video_channel_id"] = primary_video_channel_id

await self._send_command("startRecording", payload)

async def stop_session(self) -> None:
Expand All @@ -392,7 +406,7 @@ async def stop_session(self) -> None:
await self._send_command("stopRecording", {"sessionId": self._session_id})

async def events(self):
"""Async generator that yields events from the recorder."""
"""Async generator that yields events from the capture binary."""
while True:
try:
# Use a timeout so we can check if the process is still alive
Expand Down
58 changes: 57 additions & 1 deletion videodb/capture_session.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
from typing import List
from typing import List, Optional
from videodb._constants import ApiPath
from videodb.capture import VideoChannel, AudioChannel
from videodb.rtstream import RTStream


Expand All @@ -10,6 +12,10 @@ class CaptureSession:
:ivar str end_user_id: ID of the end user
:ivar str client_id: Client-provided session ID
:ivar str status: Current status of the session
:ivar list channels: List of channel dicts with id, name, type, is_primary
:ivar str primary_video_channel_id: Channel ID of the primary video source
:ivar str export_status: Current export status (exporting, exported, failed)
:ivar dict exported_videos: Mapping of channel_id to exported video_id
"""

def __init__(self, _connection, id: str, collection_id: str, **kwargs) -> None:
Expand All @@ -35,6 +41,21 @@ def _update_attributes(self, data: dict) -> None:
self.callback_url = data.get("callback_url")
self.exported_video_id = data.get("exported_video_id")
self.metadata = data.get("metadata", {})
self.channels = []
for ch_data in data.get("channels", []):
ch_type = ch_data.get("type")
ch_id = ch_data.get("channel_id", "")
ch_name = ch_data.get("name", "")
if ch_type == "video":
ch = VideoChannel(id=ch_id, name=ch_name)
else:
ch = AudioChannel(id=ch_id, name=ch_name)
ch.store = ch_data.get("store", False)
ch.is_primary = ch_data.get("is_primary", False)
self.channels.append(ch)

self.primary_video_channel_id = data.get("primary_video_channel_id")
self.export_status = data.get("export_status")

self.rtstreams = []
for rts_data in data.get("rtstreams", []):
Expand All @@ -43,6 +64,15 @@ def _update_attributes(self, data: dict) -> None:
stream = RTStream(self._connection, **rts_data)
self.rtstreams.append(stream)

@property
def displays(self) -> List[VideoChannel]:
"""Video channels in the session.

:return: List of VideoChannel objects
:rtype: list[VideoChannel]
"""
return [ch for ch in self.channels if isinstance(ch, VideoChannel)]

def get_rtstream(self, category: str) -> List[RTStream]:
"""Get list of RTStreams by category.

Expand All @@ -59,3 +89,29 @@ def get_rtstream(self, category: str) -> List[RTStream]:
filtered_streams.append(stream)

return filtered_streams

def export(self, video_channel_id: Optional[str] = None, ws_connection_id: Optional[str] = None) -> dict:
"""Trigger export for this capture session.

Call repeatedly to poll for completion. Returns ``export_status``
of ``"exporting"`` while in progress, ``"exported"`` with
``video_id``, ``stream_url``, and ``player_url`` when done.

:param str video_channel_id: Optional channel ID of the video to export.
Defaults to the primary video channel.
:param str ws_connection_id: WebSocket connection ID for push
notification when export completes (optional).
:return: Export response with session_id, video_channel_id,
export_status, and video_id/stream_url/player_url when exported.
:rtype: dict
"""
data = {}
if video_channel_id:
data["video_channel_id"] = video_channel_id
if ws_connection_id:
data["connection_id"] = ws_connection_id

return self._connection.post(
path=f"{ApiPath.collection}/{self.collection_id}/{ApiPath.capture}/{ApiPath.session}/{self.id}/{ApiPath.export}",
data=data,
)
8 changes: 4 additions & 4 deletions videodb/video.py
Original file line number Diff line number Diff line change
Expand Up @@ -707,10 +707,10 @@ def clip(
) -> str:
"""Generate a clip from the video using a prompt.
:param str prompt: Prompt to generate the clip
:param str content_type: Content type for the clip
:param str model_name: Model name for generation
:return: The stream url of the generated clip
:rtype: str
:param str content_type: Content type for the clip. Valid options: "spoken", "visual", "multimodal"
:param str model_name: Model tier for generation. Valid options: "basic", "pro", "ultra"
:return: The search result of the generated clip
:rtype: :class:`SearchResult <SearchResult>`
"""

clip_data = self._connection.post(
Expand Down