-
Notifications
You must be signed in to change notification settings - Fork 45
feat(nodes): add video embedding node for semantic search and RAG pipelines #516
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: develop
Are you sure you want to change the base?
Changes from all commits
489d8a0
a3b2713
31551b8
6420b6e
34effd8
453efd7
ee44d9c
8d466ab
16fa8e3
fbb0774
8b922fd
8086fda
006e8d2
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,91 @@ | ||
| # ============================================================================= | ||
| # MIT License | ||
| # Copyright (c) 2026 Aparavi Software AG | ||
| # | ||
| # Permission is hereby granted, free of charge, to any person obtaining a copy | ||
| # of this software and associated documentation files (the "Software"), to deal | ||
| # in the Software without restriction, including without limitation the rights | ||
| # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | ||
| # copies of the Software, and to permit persons to whom the Software is | ||
| # furnished to do so, subject to the following conditions: | ||
| # | ||
| # The above copyright notice and this permission notice shall be included in | ||
| # all copies or substantial portions of the Software. | ||
| # | ||
| # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | ||
| # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | ||
| # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | ||
| # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | ||
| # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | ||
| # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE | ||
| # SOFTWARE. | ||
| # ============================================================================= | ||
|
|
||
| import os | ||
| import threading | ||
| from rocketlib import IGlobalBase, OPEN_MODE | ||
| from ai.common.config import Config | ||
|
|
||
|
|
||
| class IGlobal(IGlobalBase): | ||
| """ | ||
| IGlobal manages global lifecycle events for the video embedding node. | ||
|
|
||
| It handles setup of the embedding model, frame extraction configuration, | ||
| and shared resources at the start of the node's global lifecycle, and | ||
| ensures proper cleanup when the lifecycle ends. | ||
| """ | ||
|
|
||
| def beginGlobal(self): | ||
| """ | ||
| Initialize resources needed for the video embedding node. | ||
|
|
||
| This includes: | ||
| - Installing node-specific dependencies from requirements.txt. | ||
| - Importing and loading the embedding model for frame embedding. | ||
| - Loading frame extraction configuration (interval, max frames, etc.). | ||
| - Creating a thread lock for device access during video processing. | ||
| """ | ||
| if self.IEndpoint.endpoint.openMode == OPEN_MODE.CONFIG: | ||
| return | ||
|
|
||
| from depends import depends | ||
|
|
||
| requirements = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'requirements.txt') | ||
| depends(requirements) | ||
|
|
||
| # Import torch to ensure the PyTorch framework is loaded. | ||
| import ai.common.torch # noqa: F401 | ||
|
|
||
| # Import Embedding class locally to avoid circular imports. | ||
| from nodes.embedding_image.embedding import Embedding | ||
|
|
||
| # Access the 'bag' from the current endpoint. | ||
| bag = self.IEndpoint.endpoint.bag | ||
|
|
||
| # Retrieve node-specific configuration. | ||
| config = Config.getNodeConfig(self.glb.logicalType, self.glb.connConfig) | ||
|
|
||
| # Instantiate the Embedding object for generating frame embeddings. | ||
| # Pass raw connConfig since Embedding calls getNodeConfig internally. | ||
| self.embedding = Embedding(self.glb.logicalType, self.glb.connConfig, bag) | ||
|
|
||
| # Store frame extraction settings from the config. | ||
| self.frame_interval = config.get('interval', 5) | ||
| self.max_frames = config.get('max_frames', 50) | ||
| self.start_time = config.get('start_time', 0) | ||
| self.duration = config.get('duration', 0) | ||
| self.max_video_size_bytes = config.get('maxVideoSizeMB', 500) * 1024 * 1024 | ||
|
|
||
| # Mutex for device access during video processing. | ||
| self.device_lock = threading.Lock() | ||
|
|
||
| def endGlobal(self): | ||
| """ | ||
| Clean up resources when the global lifecycle ends. | ||
|
|
||
| Releases the embedding model reference and device lock to allow | ||
| garbage collection of resources. | ||
| """ | ||
| self.embedding = None | ||
| self.device_lock = None | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,225 @@ | ||
| # ============================================================================= | ||
| # MIT License | ||
| # Copyright (c) 2026 Aparavi Software AG | ||
| # | ||
| # Permission is hereby granted, free of charge, to any person obtaining a copy | ||
| # of this software and associated documentation files (the "Software"), to deal | ||
| # in the Software without restriction, including without limitation the rights | ||
| # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | ||
| # copies of the Software, and to permit persons to whom the Software is | ||
| # furnished to do so, subject to the following conditions: | ||
| # | ||
| # The above copyright notice and this permission notice shall be included in | ||
| # all copies or substantial portions of the Software. | ||
| # | ||
| # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | ||
| # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | ||
| # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | ||
| # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | ||
| # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | ||
| # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE | ||
| # SOFTWARE. | ||
| # ============================================================================= | ||
|
|
||
| from rocketlib import IInstanceBase, AVI_ACTION, debug, warning | ||
| from ai.common.schema import Doc, DocMetadata | ||
| from .IGlobal import IGlobal | ||
|
|
||
| # Supported video MIME types and their corresponding file extensions. | ||
| SUPPORTED_VIDEO_TYPES = { | ||
| 'video/mp4': '.mp4', | ||
| 'video/x-msvideo': '.avi', | ||
| 'video/quicktime': '.mov', | ||
| 'video/webm': '.webm', | ||
| } | ||
|
Comment on lines
+29
to
+34
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. 🧹 Nitpick | 🔵 Trivial
The mapping of MIME types to file extensions is defined but not utilized for validation or for determining the temp file extension. Consider either:
♻️ Suggested usage in _process_video def _process_video(self, video_bytes: bytes):
...
import tempfile
import os
- tmp_fd, tmp_path = tempfile.mkstemp(suffix='.mp4')
+ # Use the correct extension based on MIME type, default to .mp4
+ extension = SUPPORTED_VIDEO_TYPES.get(self._mime_type, '.mp4')
+ tmp_fd, tmp_path = tempfile.mkstemp(suffix=extension)🤖 Prompt for AI Agents
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. SUPPORTED_VIDEO_TYPES is used at line 115 to determine the correct temp file extension based on MIME type:
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Rate Limit Exceeded
|
||
|
|
||
|
|
||
| class IInstance(IInstanceBase): | ||
| """ | ||
| IInstance manages instance-level processing of video data for embedding. | ||
|
|
||
| It receives video data via the writeVideo streaming interface, extracts | ||
| frames at configurable intervals using OpenCV, generates embeddings for | ||
| each frame using the global embedding model, and outputs documents with | ||
| embedding vectors and timestamp metadata. | ||
| """ | ||
|
|
||
| IGlobal: IGlobal | ||
| """ | ||
| Reference to the global context object of type IGlobal. | ||
|
|
||
| This provides access to shared resources like the embedding model and | ||
| frame extraction configuration. | ||
| """ | ||
|
|
||
| def __init__(self, *args, **kwargs): | ||
| """Initialize the video embedding instance with frame tracking state.""" | ||
| super().__init__(*args, **kwargs) | ||
| self._frame_chunk_id = 0 | ||
| self._video_data = None | ||
|
coderabbitai[bot] marked this conversation as resolved.
|
||
| self._mime_type = None | ||
|
|
||
| def writeVideo(self, action: int, mimeType: str, buffer: bytes): | ||
| """ | ||
| Handle streaming video data via the AVI action protocol. | ||
|
|
||
| Video data arrives in chunks through BEGIN/WRITE/END actions. Once | ||
| all data is received, frames are extracted and embedded. | ||
|
|
||
| Args: | ||
| action (int): The AVI action type (BEGIN, WRITE, or END). | ||
| mimeType (str): The MIME type of the video data. | ||
| buffer (bytes): The video data chunk. | ||
| """ | ||
| if action == AVI_ACTION.BEGIN: | ||
| self._video_data = bytearray() | ||
| self._mime_type = mimeType | ||
|
asclearuc marked this conversation as resolved.
|
||
|
|
||
| elif action == AVI_ACTION.WRITE: | ||
| if self._video_data is not None: | ||
| max_size = self.IGlobal.max_video_size_bytes | ||
| if len(self._video_data) + len(buffer) > max_size: | ||
| max_mb = max_size / (1024 * 1024) | ||
|
asclearuc marked this conversation as resolved.
|
||
| warning(f'Video exceeds maximum allowed size of {max_mb:.0f} MB, rejecting') | ||
| self._video_data = None | ||
| return | ||
| self._video_data += buffer | ||
|
|
||
| elif action == AVI_ACTION.END: | ||
| video_data = self._video_data | ||
| self._video_data = None | ||
| if video_data is not None and len(video_data) > 0: | ||
| self._process_video(bytes(video_data)) | ||
| elif video_data is None: | ||
| warning('Video was rejected (size limit exceeded or missing BEGIN), skipping embedding') | ||
|
|
||
| def _process_video(self, video_bytes: bytes): | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. 🧹 Nitpick | 🔵 Trivial. Consider splitting: static analysis flags this method as having too many statements (64 > 50). While the current implementation is readable and follows a logical flow, consider extracting helper methods for:
This is optional given the method's clear structure. 🧰 Tools🪛 Ruff (0.15.7)[warning] 90-90: Too many statements (64 > 50) (PLR0915) [warning] 90-90: Missing return type annotation for private function Add return type annotation: (ANN202) 🤖 Prompt for AI Agents
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Acknowledged. The method is inherently complex due to OpenCV frame extraction, embedding generation, and document creation all in one pass. I've kept it as a single method since extracting sub-methods would fragment the logic without improving clarity. The try/finally blocks ensure proper cleanup of temp files and VideoCapture resources.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Rate Limit Exceeded
|
||
| """ | ||
| Extract frames from video bytes and generate embeddings for each frame. | ||
|
|
||
| Uses OpenCV to decode the video from an in-memory buffer, extracts | ||
| frames at the configured interval, and creates embedding documents | ||
| for each extracted frame. | ||
|
|
||
| Args: | ||
| video_bytes (bytes): The complete video file content. | ||
| """ | ||
| from ai.common.opencv import cv2 | ||
| from ai.common.image import ImageProcessor | ||
| import tempfile | ||
| import os | ||
|
|
||
| # Write video bytes to a temporary file for OpenCV to read. | ||
| # OpenCV's VideoCapture requires a file path or device index. | ||
| suffix = SUPPORTED_VIDEO_TYPES.get(self._mime_type, '.mp4') | ||
| tmp_fd, tmp_path = tempfile.mkstemp(suffix=suffix) | ||
| try: | ||
| with os.fdopen(tmp_fd, 'wb') as f: | ||
| f.write(video_bytes) | ||
|
|
||
| cap = cv2.VideoCapture(tmp_path) | ||
| try: | ||
| if not cap.isOpened(): | ||
| warning('Failed to open video file for frame extraction') | ||
| return | ||
|
|
||
| fps = cap.get(cv2.CAP_PROP_FPS) | ||
| if fps <= 0: | ||
| fps = 30.0 # Fallback to a reasonable default | ||
|
|
||
| total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) | ||
| video_duration = total_frames / fps | ||
|
|
||
| # Determine extraction boundaries. | ||
| start_time = self.IGlobal.start_time | ||
| duration = self.IGlobal.duration | ||
| if duration <= 0: | ||
| end_time = video_duration | ||
| else: | ||
| end_time = min(start_time + duration, video_duration) | ||
|
|
||
| interval = self.IGlobal.frame_interval | ||
| max_frames = self.IGlobal.max_frames | ||
|
|
||
| frames_extracted = 0 | ||
| frame_interval_frames = max(1, int(interval * fps)) | ||
| documents = [] | ||
|
|
||
| current_frame_pos = int(start_time * fps) if start_time > 0 else 0 | ||
| # Track where VideoCapture's read cursor sits so we can skip | ||
| # redundant seeks when the next frame is already next in line. | ||
| last_read_pos = -2 | ||
|
|
||
| while True: | ||
| # Check max frames limit. | ||
| if max_frames > 0 and frames_extracted >= max_frames: | ||
| break | ||
|
|
||
| # Check if we've gone past end time. | ||
| current_time = current_frame_pos / fps | ||
| if current_time >= end_time: | ||
| break | ||
|
|
||
| # Only seek when the next frame to read isn't already the | ||
| # one VideoCapture would return next. Seeking on every | ||
| # iteration is wasteful for sequential reads. | ||
| if current_frame_pos != last_read_pos + 1: | ||
| cap.set(cv2.CAP_PROP_POS_FRAMES, current_frame_pos) | ||
| ret, frame = cap.read() | ||
| if not ret: | ||
| break | ||
|
coderabbitai[bot] marked this conversation as resolved.
|
||
| last_read_pos = current_frame_pos | ||
|
|
||
| # Calculate the timestamp for this frame. | ||
| time_stamp = current_frame_pos / fps | ||
|
|
||
| # cv2.imencode expects OpenCV's native BGR layout and | ||
| # produces a standard PNG, which ImageProcessor then loads | ||
| # as a correct RGB PIL image for the embedding model. | ||
| _, png_buffer = cv2.imencode('.png', frame) | ||
| frame_bytes = png_buffer.tobytes() | ||
| pil_image = ImageProcessor.load_image_from_bytes(frame_bytes) | ||
|
|
||
| # Generate the embedding with device lock for thread safety. | ||
| with self.IGlobal.device_lock: | ||
| embedding = self.IGlobal.embedding.create_image_embedding(pil_image) | ||
|
|
||
| # Encode the frame as base64 for document storage. | ||
| frame_base64 = ImageProcessor.get_base64(pil_image) | ||
|
|
||
| # Create metadata with frame number and timestamp. | ||
| metadata = DocMetadata( | ||
| self, | ||
| chunkId=self._frame_chunk_id, | ||
| isTable=False, | ||
| tableId=0, | ||
| isDeleted=False, | ||
| ) | ||
| metadata.time_stamp = time_stamp | ||
| metadata.frame_number = current_frame_pos | ||
|
|
||
| # Create the document with the frame image and embedding. | ||
| doc = Doc(type='Image', page_content=frame_base64, metadata=metadata) | ||
| doc.embedding = embedding if isinstance(embedding, list) else embedding.tolist() | ||
| doc.embedding_model = self.IGlobal.embedding.model_name | ||
|
|
||
| documents.append(doc) | ||
|
|
||
| self._frame_chunk_id += 1 | ||
| frames_extracted += 1 | ||
|
|
||
| # Advance to the next frame position. | ||
| current_frame_pos += frame_interval_frames | ||
|
|
||
| # Emit all frame documents in a single call. | ||
| if documents: | ||
| self.instance.writeDocuments(documents) | ||
|
|
||
| debug(f'Video embedding complete: extracted {frames_extracted} frames, interval={interval:.1f}s') | ||
| finally: | ||
| cap.release() | ||
|
|
||
| finally: | ||
| # Clean up the temporary file. | ||
| if os.path.exists(tmp_path): | ||
| os.unlink(tmp_path) | ||
|
Comment on lines
+96
to
+225
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. 🧹 Nitpick | 🔵 Trivial. Method complexity exceeds recommended threshold (68 > 50 statements). Static analysis flags this method as too complex. While the logic is clear and well-commented, consider extracting:
This is noted in a past review comment and remains a recommended refactor for maintainability. 🧰 Tools🪛 Ruff (0.15.7)[warning] 96-96: Too many statements (68 > 50) (PLR0915) [warning] 96-96: Missing return type annotation for private function Add return type annotation: (ANN202) 🤖 Prompt for AI Agents
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. See reply on the other _process_video complexity comment. Keeping it as one method for now since the logic is inherently sequential (extract frame -> embed -> create doc) with shared state.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Rate Limit Exceeded
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,27 @@ | ||
| # ============================================================================= | ||
| # MIT License | ||
| # Copyright (c) 2026 Aparavi Software AG | ||
| # | ||
| # Permission is hereby granted, free of charge, to any person obtaining a copy | ||
| # of this software and associated documentation files (the "Software"), to deal | ||
| # in the Software without restriction, including without limitation the rights | ||
| # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | ||
| # copies of the Software, and to permit persons to whom the Software is | ||
| # furnished to do so, subject to the following conditions: | ||
| # | ||
| # The above copyright notice and this permission notice shall be included in | ||
| # all copies or substantial portions of the Software. | ||
| # | ||
| # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | ||
| # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | ||
| # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | ||
| # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | ||
| # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | ||
| # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE | ||
| # SOFTWARE. | ||
| # ============================================================================= | ||
|
|
||
| from .IGlobal import IGlobal | ||
| from .IInstance import IInstance | ||
|
|
||
| __all__ = ['IGlobal', 'IInstance'] |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,2 @@ | ||
| transformers | ||
| accelerate |
Uh oh!
There was an error while loading. Please reload this page.