Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 91 additions & 0 deletions nodes/src/nodes/embedding_video/IGlobal.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
# =============================================================================
# MIT License
# Copyright (c) 2026 Aparavi Software AG
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
# =============================================================================

import os
import threading
from rocketlib import IGlobalBase, OPEN_MODE
from ai.common.config import Config


class IGlobal(IGlobalBase):
    """
    IGlobal manages global lifecycle events for the video embedding node.

    It handles setup of the embedding model, frame extraction configuration,
    and shared resources at the start of the node's global lifecycle, and
    ensures proper cleanup when the lifecycle ends.
    """

    def beginGlobal(self):
        """
        Prepare the shared resources used by the video embedding node.

        Steps performed:
        - Install node-specific dependencies listed in requirements.txt.
        - Load the PyTorch framework and the frame embedding model.
        - Read frame extraction settings (interval, max frames, time window).
        - Create a lock that serializes device access during video processing.
        """
        # Nothing to initialize while the endpoint is only being configured.
        if self.IEndpoint.endpoint.openMode == OPEN_MODE.CONFIG:
            return

        from depends import depends

        node_dir = os.path.dirname(os.path.realpath(__file__))
        depends(os.path.join(node_dir, 'requirements.txt'))

        # Import torch to ensure the PyTorch framework is loaded.
        import ai.common.torch  # noqa: F401

        # Imported here rather than at module scope to avoid circular imports.
        from nodes.embedding_image.embedding import Embedding

        glb = self.glb

        # The 'bag' holds shared state for the current endpoint.
        bag = self.IEndpoint.endpoint.bag

        # Resolve the node-specific configuration values.
        config = Config.getNodeConfig(glb.logicalType, glb.connConfig)

        # The Embedding object calls getNodeConfig internally, so it is
        # handed the raw connConfig rather than the resolved config above.
        self.embedding = Embedding(glb.logicalType, glb.connConfig, bag)

        # Frame extraction settings, with their documented defaults.
        self.frame_interval = config.get('interval', 5)
        self.max_frames = config.get('max_frames', 50)
        self.start_time = config.get('start_time', 0)
        self.duration = config.get('duration', 0)
        self.max_video_size_bytes = config.get('maxVideoSizeMB', 500) * 1024 * 1024

        # Mutex guarding device access while videos are processed.
        self.device_lock = threading.Lock()

    def endGlobal(self):
        """
        Clean up resources when the global lifecycle ends.

        Dropping the references to the embedding model and the device lock
        allows the garbage collector to reclaim them.
        """
        self.embedding = None
        self.device_lock = None
225 changes: 225 additions & 0 deletions nodes/src/nodes/embedding_video/IInstance.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,225 @@
# =============================================================================
# MIT License
# Copyright (c) 2026 Aparavi Software AG
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
# =============================================================================

from rocketlib import IInstanceBase, AVI_ACTION, debug, warning
from ai.common.schema import Doc, DocMetadata
from .IGlobal import IGlobal

# Supported video MIME types and their corresponding file extensions.
# Used in _process_video to pick the suffix of the temporary file handed
# to OpenCV; unrecognized MIME types fall back to '.mp4' at the call site.
SUPPORTED_VIDEO_TYPES = {
    'video/mp4': '.mp4',
    'video/x-msvideo': '.avi',
    'video/quicktime': '.mov',
    'video/webm': '.webm',
}
Comment on lines +29 to +34
Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai bot Mar 30, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick | 🔵 Trivial

SUPPORTED_VIDEO_TYPES is defined but never used.

The mapping of MIME types to file extensions is defined but not utilized for validation or for determining the temp file extension. Consider either:

  1. Using it to validate incoming video types and select the appropriate temp file extension
  2. Removing it if validation is not required
♻️ Suggested usage in _process_video
     def _process_video(self, video_bytes: bytes):
         ...
         import tempfile
         import os
 
-        tmp_fd, tmp_path = tempfile.mkstemp(suffix='.mp4')
+        # Use the correct extension based on MIME type, default to .mp4
+        extension = SUPPORTED_VIDEO_TYPES.get(self._mime_type, '.mp4')
+        tmp_fd, tmp_path = tempfile.mkstemp(suffix=extension)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@nodes/src/nodes/embedding_video/IInstance.py` around lines 33 - 38,
SUPPORTED_VIDEO_TYPES is declared but never used; update the _process_video
function to validate incoming MIME types against SUPPORTED_VIDEO_TYPES and use
the mapped extension when creating temp files (or remove the constant if you opt
not to validate). Specifically, in _process_video, read the incoming video's
MIME/type header or inferred mime variable, check membership in
SUPPORTED_VIDEO_TYPES, throw or return a clear error when unsupported, and when
creating the temporary filename use SUPPORTED_VIDEO_TYPES[mime] to set the file
extension so temp files have the correct suffix; reference SUPPORTED_VIDEO_TYPES
and the _process_video function to make the change.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SUPPORTED_VIDEO_TYPES is used at line 115 to determine the correct temp file extension based on MIME type: suffix = SUPPORTED_VIDEO_TYPES.get(self._mime_type, '.mp4'). It was added in a prior fix commit (453efd7) precisely to address the hardcoded .mp4 concern.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rate Limit Exceeded

@charliegillet has exceeded the limit for the number of chat messages per hour. Please wait 1 minutes and 27 seconds before sending another message.



class IInstance(IInstanceBase):
    """
    IInstance manages instance-level processing of video data for embedding.

    It receives video data via the writeVideo streaming interface, extracts
    frames at configurable intervals using OpenCV, generates embeddings for
    each frame using the global embedding model, and outputs documents with
    embedding vectors and timestamp metadata.
    """

    IGlobal: IGlobal
    """
    Reference to the global context object of type IGlobal.

    This provides access to shared resources like the embedding model and
    frame extraction configuration.
    """

    def __init__(self, *args, **kwargs):
        """Initialize the video embedding instance with frame tracking state."""
        super().__init__(*args, **kwargs)
        # Monotonic chunk id shared across every video processed by this instance.
        self._frame_chunk_id = 0
        # Accumulates chunks between BEGIN and END; None means inactive or rejected.
        self._video_data = None
        # MIME type announced on BEGIN, used to choose the temp file suffix.
        self._mime_type = None

    def writeVideo(self, action: int, mimeType: str, buffer: bytes):
        """
        Handle streaming video data via the AVI action protocol.

        Video data arrives in chunks through BEGIN/WRITE/END actions. Once
        all data is received, frames are extracted and embedded.

        Args:
            action (int): The AVI action type (BEGIN, WRITE, or END).
            mimeType (str): The MIME type of the video data.
            buffer (bytes): The video data chunk.
        """
        if action == AVI_ACTION.BEGIN:
            self._video_data = bytearray()
            self._mime_type = mimeType

        elif action == AVI_ACTION.WRITE:
            # _video_data is None when BEGIN was never seen or the video was
            # already rejected; in either case the chunk is dropped.
            if self._video_data is not None:
                max_size = self.IGlobal.max_video_size_bytes
                if len(self._video_data) + len(buffer) > max_size:
                    max_mb = max_size / (1024 * 1024)
                    warning(f'Video exceeds maximum allowed size of {max_mb:.0f} MB, rejecting')
                    self._video_data = None
                    return
                self._video_data += buffer

        elif action == AVI_ACTION.END:
            video_data = self._video_data
            self._video_data = None
            if video_data is not None and len(video_data) > 0:
                self._process_video(bytes(video_data))
            elif video_data is None:
                warning('Video was rejected (size limit exceeded or missing BEGIN), skipping embedding')

    def _process_video(self, video_bytes: bytes) -> None:
        """
        Extract frames from video bytes and generate embeddings for each frame.

        Uses OpenCV to decode the video from an in-memory buffer, extracts
        frames at the configured interval, and creates embedding documents
        for each extracted frame.

        Args:
            video_bytes (bytes): The complete video file content.
        """
        from ai.common.opencv import cv2
        from ai.common.image import ImageProcessor
        import tempfile
        import os

        # Write video bytes to a temporary file for OpenCV to read.
        # OpenCV's VideoCapture requires a file path or device index.
        suffix = SUPPORTED_VIDEO_TYPES.get(self._mime_type, '.mp4')
        tmp_fd, tmp_path = tempfile.mkstemp(suffix=suffix)
        try:
            with os.fdopen(tmp_fd, 'wb') as f:
                f.write(video_bytes)

            cap = cv2.VideoCapture(tmp_path)
            try:
                if not cap.isOpened():
                    warning('Failed to open video file for frame extraction')
                    return

                fps = cap.get(cv2.CAP_PROP_FPS)
                if fps <= 0:
                    fps = 30.0  # Fallback to a reasonable default

                total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
                video_duration = total_frames / fps

                # Determine extraction boundaries.
                start_time = self.IGlobal.start_time
                duration = self.IGlobal.duration
                if duration <= 0:
                    end_time = video_duration
                else:
                    end_time = min(start_time + duration, video_duration)

                interval = self.IGlobal.frame_interval
                max_frames = self.IGlobal.max_frames

                frames_extracted = 0
                frame_interval_frames = max(1, int(interval * fps))
                documents = []

                current_frame_pos = int(start_time * fps) if start_time > 0 else 0
                # Track where VideoCapture's read cursor sits so we can skip
                # redundant seeks when the next frame is already next in line.
                last_read_pos = -2

                while True:
                    # Check max frames limit.
                    if max_frames > 0 and frames_extracted >= max_frames:
                        break

                    # Check if we've gone past end time.
                    current_time = current_frame_pos / fps
                    if current_time >= end_time:
                        break

                    # Only seek when the next frame to read isn't already the
                    # one VideoCapture would return next. Seeking on every
                    # iteration is wasteful for sequential reads.
                    if current_frame_pos != last_read_pos + 1:
                        cap.set(cv2.CAP_PROP_POS_FRAMES, current_frame_pos)
                    ret, frame = cap.read()
                    if not ret:
                        break
                    last_read_pos = current_frame_pos

                    # Calculate the timestamp for this frame.
                    time_stamp = current_frame_pos / fps

                    # cv2.imencode expects OpenCV's native BGR layout and
                    # produces a standard PNG, which ImageProcessor then loads
                    # as a correct RGB PIL image for the embedding model.
                    # imencode returns a success flag; on failure the buffer is
                    # unusable, so skip this frame rather than crash on it.
                    encoded, png_buffer = cv2.imencode('.png', frame)
                    if not encoded:
                        warning(f'Failed to encode frame {current_frame_pos} as PNG, skipping')
                        current_frame_pos += frame_interval_frames
                        continue
                    frame_bytes = png_buffer.tobytes()
                    pil_image = ImageProcessor.load_image_from_bytes(frame_bytes)

                    # Generate the embedding with device lock for thread safety.
                    with self.IGlobal.device_lock:
                        embedding = self.IGlobal.embedding.create_image_embedding(pil_image)

                    # Encode the frame as base64 for document storage.
                    frame_base64 = ImageProcessor.get_base64(pil_image)

                    # Create metadata with frame number and timestamp.
                    metadata = DocMetadata(
                        self,
                        chunkId=self._frame_chunk_id,
                        isTable=False,
                        tableId=0,
                        isDeleted=False,
                    )
                    metadata.time_stamp = time_stamp
                    metadata.frame_number = current_frame_pos

                    # Create the document with the frame image and embedding.
                    doc = Doc(type='Image', page_content=frame_base64, metadata=metadata)
                    doc.embedding = embedding if isinstance(embedding, list) else embedding.tolist()
                    doc.embedding_model = self.IGlobal.embedding.model_name

                    documents.append(doc)

                    self._frame_chunk_id += 1
                    frames_extracted += 1

                    # Advance to the next frame position.
                    current_frame_pos += frame_interval_frames

                # Emit all frame documents in a single call.
                if documents:
                    self.instance.writeDocuments(documents)

                debug(f'Video embedding complete: extracted {frames_extracted} frames, interval={interval:.1f}s')
            finally:
                cap.release()

        finally:
            # Clean up the temporary file.
            if os.path.exists(tmp_path):
                os.unlink(tmp_path)
Comment on lines +96 to +225
Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai bot Mar 30, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick | 🔵 Trivial

Method complexity exceeds recommended threshold (68 > 50 statements).

Static analysis flags this method as too complex. While the logic is clear and well-commented, consider extracting:

  • Frame extraction setup (lines 120-152) into _setup_extraction(cap) -> tuple
  • Per-frame processing (lines 170-206) into _process_frame(frame, current_frame_pos, fps) -> Doc

This is noted in a past review comment and remains a recommended refactor for maintainability.

🧰 Tools
🪛 Ruff (0.15.7)

[warning] 96-96: Too many statements (68 > 50)

(PLR0915)


[warning] 96-96: Missing return type annotation for private function _process_video

Add return type annotation: None

(ANN202)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@nodes/src/nodes/embedding_video/IInstance.py` around lines 96 - 223, The
_process_video method is too large; extract the setup and per-frame logic into
helpers to reduce complexity: pull the "Frame extraction setup" block that
computes fps, total_frames, video_duration, start_time/end_time, interval,
max_frames, start_frame, frame_interval_frames and current_frame_pos into a new
method named _setup_extraction(cap) that returns (fps, end_time,
frame_interval_frames, current_frame_pos, max_frames, interval) and use it from
_process_video; pull the per-frame steps that convert BGR->RGB, build PIL image,
call self.IGlobal.device_lock and self.IGlobal.embedding.create_image_embedding,
encode PNG, create DocMetadata and Doc, set embedding and embedding_model, and
call self.instance.writeDocuments into a new method named _process_frame(frame,
current_frame_pos, fps) that returns the created Doc (or None on failure), then
simplify the loop in _process_video to call _process_frame and handle
frames_extracted, _frame_chunk_id, and logging exactly as before so behavior
(use of DocMetadata, Doc, self.IGlobal.device_lock, embedding.model_name, and
self.instance.writeDocuments) remains unchanged.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See reply on the other process_video complexity comment. Keeping it as one method for now since the logic is inherently sequential (extract frame -> embed -> create doc) with shared state.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rate Limit Exceeded

@charliegillet has exceeded the limit for the number of chat messages per hour. Please wait 1 minutes and 16 seconds before sending another message.

27 changes: 27 additions & 0 deletions nodes/src/nodes/embedding_video/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# =============================================================================
# MIT License
# Copyright (c) 2026 Aparavi Software AG
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
# =============================================================================

from .IGlobal import IGlobal
from .IInstance import IInstance

__all__ = ['IGlobal', 'IInstance']
2 changes: 2 additions & 0 deletions nodes/src/nodes/embedding_video/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
transformers
accelerate
Loading
Loading