diff --git a/mio/behavior_cam.py b/mio/behavior_cam.py new file mode 100644 index 00000000..e72f4ad8 --- /dev/null +++ b/mio/behavior_cam.py @@ -0,0 +1,319 @@ +""" +Behavior camera capture using multiprocessing for USB cameras. +""" + +import multiprocessing +import os +import queue +import time +from pathlib import Path +from typing import Optional, Union + +import cv2 +import numpy as np + +from mio import init_logger +from mio.devices.usbcam import convert_frame_for_codec, open_camera +from mio.exceptions import EndOfRecordingException +from mio.io import BufferedCSVWriter, VideoWriter +from mio.models.usbcam import USBCameraRecordingConfig +from mio.types import ConfigSource +from mio.utils import exact_iter + +FPS_LOG_INTERVAL_SECONDS = 10.0 +FRAME_QUEUE_MAXSIZE = 100 +CSV_BUFFER_SIZE = 100 +QUEUE_TIMEOUT_SECONDS = 1.0 + + +class BehaviorCam: + """ + Behavior camera capture class using multiprocessing. + + Separates camera interface from capture/writing logic using multiprocessing. + Similar architecture to :class:`.StreamDaq`. + """ + + def __init__( + self, + recording_config: Union[USBCameraRecordingConfig, ConfigSource], + camera_index: int, + ) -> None: + """ + Initialize behavior camera capture. + + Args: + recording_config: Configuration object, config ID, or path to config file + camera_index: Index of the camera to use + """ + self.logger = init_logger("behaviorCam") + self.config = USBCameraRecordingConfig.from_any(recording_config) + self.camera_index = camera_index + self.terminate: multiprocessing.Event = multiprocessing.Event() + + def _camera_recv( + self, + frame_queue: multiprocessing.Queue, + ) -> None: + """ + Read frames from camera and put them in the queue. + + This runs in a separate process to decouple camera I/O from writing. + + Args: + frame_queue: Queue to put frames into + """ + locallogs = init_logger("behaviorCam.camera_recv") + + try: + cap = open_camera( + camera_index=self.camera_index, + frame_width=self.config.frame_width, + frame_height=self.config.frame_height, + fps=self.config.fps, + capture_format=self.config.format, + ) + except RuntimeError: + frame_queue.put(None) + raise + + locallogs.info("Camera opened, starting frame capture") + + try: + while not self.terminate.is_set(): + try: + ret, frame = cap.read() + except EndOfRecordingException: + locallogs.info("End of recorded data") + break + if ret: + # Get timestamp for this frame (float unix time in seconds) + unix_time = time.time() + try: + frame_queue.put( + (frame, unix_time), + block=True, + timeout=QUEUE_TIMEOUT_SECONDS, + ) + except queue.Full: + locallogs.warning("Frame queue full, skipping frame") + else: + locallogs.warning("Failed to read frame from camera") + time.sleep(0.01) # Small delay before retry + finally: + cap.release() + locallogs.debug("Camera released, putting sentinel in queue") + try: + frame_queue.put(None, block=True, timeout=QUEUE_TIMEOUT_SECONDS) + except queue.Full: + locallogs.error("Frame queue full, could not put sentinel") + + def capture( + self, + output_dir: Optional[str] = None, + show_video: bool = True, + capture_binary: Optional[Path] = None, + ) -> None: + """ + Start frame capture and recording. + + Args: + output_dir: Output directory (defaults to config.output_dir) + show_video: If True, display video preview window + capture_binary: If set, save raw frames and timestamps to this ``.npz`` path + """ + self.terminate.clear() + + output_dir = output_dir or self.config.output_dir + os.makedirs(output_dir, exist_ok=True) + + # Create video writer with Unix timestamp filename + timestamp = int(time.time()) # seconds (for filename) + + # Determine container format based on codec + if self.config.codec in ("libx264", "h264"): + video_ext = ".mp4" + container_format = "mp4" + else: + video_ext = ".avi" + container_format = "avi" + + video_path = Path(output_dir) / f"{timestamp}{video_ext}" + csv_path = Path(output_dir) / f"{timestamp}.csv" + + # Get actual resolution and fps (may differ from requested) + # We'll get these from the first frame + actual_fps = self.config.fps + actual_width = self.config.frame_width + actual_height = self.config.frame_height + + # Build output_dict (pix_fmt only used by skvideo backend, cv2 ignores it) + output_dict = { + "-vcodec": self.config.codec, + "-f": container_format, + "-vsync": "0", # Disable frame sync - write frames as-is (same as StreamDaq) + } + # Only add pix_fmt for skvideo backend (cv2 doesn't use it) + if self.config.backend == "skvideo" and self.config.pix_fmt is not None: + output_dict["-pix_fmt"] = self.config.pix_fmt + + writer = VideoWriter( + path=video_path, + fps=actual_fps, + output_dict=output_dict, + backend=self.config.backend, + ) + + csv_writer = BufferedCSVWriter( + file_path=csv_path, + header=["frame_index", "unix_time"], + buffer_size=CSV_BUFFER_SIZE, + ) + + shared_resource_manager = multiprocessing.Manager() + frame_queue = shared_resource_manager.Queue(maxsize=FRAME_QUEUE_MAXSIZE) + + p_camera = multiprocessing.Process( + target=self._camera_recv, + args=(frame_queue,), + name="camera_recv", + ) + + p_camera.start() + + self.logger.info(f"Recording to {video_path}") + self.logger.info("Press 'q' to stop recording") + + frames_written = 0 + first_frame = True + start_time = None + last_fps_log_time = None + frames_in_window = 0 + writer_used = False + binary_frames = [] if capture_binary else None + binary_timestamps = [] if capture_binary else None + + try: + for frame_data in exact_iter(frame_queue.get, None): + frame, unix_time = frame_data + + # Get actual dimensions from first frame and initialize timing + if first_frame: + actual_height, actual_width = frame.shape[:2] + self.logger.info( + f"Resolution: {actual_width}x{actual_height} @ {actual_fps}fps" + ) + # Start FPS counting from the first grabbed frame + start_time = unix_time + last_fps_log_time = unix_time + first_frame = False + + # Convert frame based on codec + frame_out = convert_frame_for_codec(frame, self.config.codec) + + # Write frame to video + writer.write_frame(frame_out) + writer_used = True + + # Write frame metadata to CSV + csv_writer.append( + { + "frame_index": frames_written, + "unix_time": unix_time, + } + ) + + if capture_binary: + binary_frames.append(frame) + binary_timestamps.append(unix_time) + + frames_written += 1 + frames_in_window += 1 + + # Log FPS at regular intervals (FPS for the last window) + current_time = time.time() + window_elapsed = current_time - last_fps_log_time + if window_elapsed >= FPS_LOG_INTERVAL_SECONDS: + measured_fps = frames_in_window / window_elapsed + total_elapsed = current_time - start_time + self.logger.info( + f"\nFPS:\t{measured_fps:.2f}\nFrames:\t{frames_written} \n" + f"Time:\t{total_elapsed:.1f}s \n" + ) + last_fps_log_time = current_time + frames_in_window = 0 + + # Show preview and check for 'q' key + if show_video: + try: + cv2.imshow("Recording", frame) + if cv2.waitKey(1) & 0xFF == ord("q"): + self.logger.info("Recording stopped by user ('q' key)") + break + except cv2.error as e: + self.logger.exception(f"Error displaying frame: {e}") + + except KeyboardInterrupt: + self.logger.info("Recording stopped by user (Ctrl+C)") + self.terminate.set() + except Exception as e: + self.logger.exception(f"Error during capture: {e}") + self.terminate.set() + finally: + # Wait for camera process to finish + self.terminate.set() + p_camera.join(timeout=5) + if p_camera.is_alive(): + self.logger.warning("Termination timeout: force terminating camera process") + p_camera.terminate() + p_camera.join() + + # Close writers (only if used) + if writer_used: + try: + writer.close() + except AttributeError as e: + # FFmpegWriter may not have _proc if no frames were written + self.logger.warning(f"Error closing video writer: {e}") + else: + # Remove empty video file if no frames were written + if video_path.exists(): + video_path.unlink() + csv_writer.close() + + if capture_binary and binary_frames: + np.savez( + capture_binary, + frames=np.array(binary_frames, dtype=np.uint8), + timestamps=np.array(binary_timestamps, dtype=np.float64), + ) + self.logger.info( + f"Saved binary data to {capture_binary} ({len(binary_frames)} frames)" + ) + + if show_video: + cv2.destroyAllWindows() + cv2.waitKey(100) + + if writer_used: + self.logger.info( + f"Saved recording to {video_path} ({frames_written} frames written)" + ) + + # Verify video frame count matches CSV row count + try: + cap = cv2.VideoCapture(str(video_path)) + video_frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) + cap.release() + + if video_frame_count != frames_written: + self.logger.warning( + f"Frame count mismatch: video has {video_frame_count} frames " + f"but CSV has {frames_written} rows" + ) + except Exception as e: + self.logger.warning( + f"Could not verify video frame count ({self.config.backend} backend): {e}" + ) + else: + self.logger.warning("No frames were written, recording file removed") diff --git a/mio/cli/main.py b/mio/cli/main.py index 0c3d2834..0d2058ad 100644 --- a/mio/cli/main.py +++ b/mio/cli/main.py @@ -8,6 +8,7 @@ from mio.cli.process import process from mio.cli.stream import stream from mio.cli.update import device, update +from mio.cli.usbcam import usbcam from mio.cli.util import hash @@ -27,3 +28,4 @@ def cli(ctx: click.Context) -> None: cli.add_command(config) cli.add_command(process) cli.add_command(hash) +cli.add_command(usbcam) diff --git a/mio/cli/usbcam.py b/mio/cli/usbcam.py new file mode 100644 index 00000000..b08d7635 --- /dev/null +++ b/mio/cli/usbcam.py @@ -0,0 +1,172 @@ +""" +CLI commands for recording video from USB camera. +""" + +import os +from pathlib import Path +from typing import Optional + +import click + +from mio.behavior_cam import BehaviorCam +from mio.cli.common import ConfigIDOrPath +from mio.devices.usbcam import format_camera_info +from mio.devices.usbcam import list_cameras as list_available_cameras +from mio.models.usbcam import USBCameraRecordingConfig +from mio.ntp import prompt_ntp_sync + + +@click.group(invoke_without_command=True) +@click.option( + "--list", + "list_cameras", + is_flag=True, + help="List available cameras and exit", +) +@click.pass_context +def usbcam(ctx: click.Context, list_cameras: bool) -> None: + """ + Command group for USB Camera + """ + if list_cameras: + cameras = list_available_cameras() + if not cameras: + click.echo("No cameras found") + else: + click.echo("Available cameras:") + for idx, info in cameras.items(): + click.echo(f" {format_camera_info(idx, info)}") + ctx.exit() + + if ctx.invoked_subcommand is None: + click.echo(ctx.get_help()) + ctx.exit() + + +@usbcam.command() +@click.option( + "-c", + "--config", + required=True, + type=ConfigIDOrPath(), + help=( + "Either a config `id` or a path to USB camera config YAML file. " + "If path is relative, treated as relative to the current directory, " + "and then if no matching file is found, relative to the user `config_dir` " + "(see `mio config --help`)." + ), +) +@click.option( + "-o", + "--output-dir", + type=click.Path(), + help="Override output directory from config (optional)", +) +@click.option( + "-i", + "--index", + type=int, + help="Specify camera index (optional)", +) +@click.option( + "-b", + "--binary_export", + is_flag=True, + help="Save raw frames to a .npz file alongside the video", +) +@click.option("--no-display", is_flag=True, help="Don't show video preview in real time") +def record( + config: str, + output_dir: Optional[str], + index: Optional[int], + binary_export: bool, + no_display: bool, +) -> None: + """Record video with Unix timestamp filename""" + recording_config = USBCameraRecordingConfig.from_any(config) + + # Check NTP sync if configured + if recording_config.ntp_server is not None: + prompt_ntp_sync( + recording_config.ntp_server, max_offset_seconds=recording_config.ntp_max_offset_seconds + ) + + # Override output_dir if provided via CLI + if output_dir is not None: + recording_config.output_dir = output_dir + + if index is not None: + camera_index = index + else: + # Get available cameras and prompt for selection + cameras = list_available_cameras() + if not cameras: + raise click.ClickException("No cameras found. Please connect a camera and try again.") + + click.echo("Available cameras:") + for idx, info in cameras.items(): + click.echo(f" {format_camera_info(idx, info)}") + + selected_index = click.prompt( + "Select camera index", + type=click.Choice([str(idx) for idx in cameras], case_sensitive=False), + default=str(min(cameras.keys())), + ) + camera_index = int(selected_index) + + # Compute binary export path if requested + if binary_export: + import time as _time + + binary_output = Path(recording_config.output_dir) / f"{int(_time.time())}.npz" + else: + binary_output = None + + behavior_cam = BehaviorCam(recording_config=recording_config, camera_index=camera_index) + try: + behavior_cam.capture(show_video=not no_display, capture_binary=binary_output) + except Exception as e: + click.echo(f"Error recording video: {e}", err=True) + raise click.ClickException(f"Error recording video: {e}") from e + + +@usbcam.command() +@click.option( + "-c", + "--config", + required=True, + type=ConfigIDOrPath(), + help="Either a config `id` or a path to USB camera config YAML file.", +) +@click.option( + "-s", + "--source", + required=True, + help="Path to .npz file with recorded frames", + type=click.Path(exists=True), +) +@click.option( + "-b", + "--binary_export", + is_flag=True, + help="Save raw frames to a .npz file alongside the video", +) +@click.option("--no-display", is_flag=True, help="Don't show video preview in real time") +@click.pass_context +def test( + ctx: click.Context, config: str, source: str, binary_export: bool, no_display: bool +) -> None: + """ + Run BehaviorCam in testing mode, using USBCamMock rather than the actual device. + """ + os.environ["BEHAVIORCAM_MOCKRUN"] = "just_placeholder" + os.environ["PYTEST_USBCAM_DATA_FILE"] = str(source) + + ctx.invoke( + record, + config=config, + output_dir=None, + index=0, + binary_export=binary_export, + no_display=no_display, + ) diff --git a/mio/data/config/camera/elp-camera-skvideo.yaml b/mio/data/config/camera/elp-camera-skvideo.yaml new file mode 100644 index 00000000..564be638 --- /dev/null +++ b/mio/data/config/camera/elp-camera-skvideo.yaml @@ -0,0 +1,12 @@ +id: elp-camera-skvideo +mio_model: mio.models.usbcam.USBCameraRecordingConfig +mio_version: 0.8.2.dev13+g338e847.d20251030 +output_dir: user_data/recordings +frame_width: 1280 +frame_height: 720 +fps: 20 +codec: libx264 +pix_fmt: yuv420p +ntp_server: mio-ntp.local +ntp_max_offset_seconds: 0.01 +backend: skvideo diff --git a/mio/data/config/camera/elp-camera.yaml b/mio/data/config/camera/elp-camera.yaml new file mode 100644 index 00000000..39faf07c --- /dev/null +++ b/mio/data/config/camera/elp-camera.yaml @@ -0,0 +1,12 @@ +id: elp-camera +mio_model: mio.models.usbcam.USBCameraRecordingConfig +mio_version: 0.8.2.dev13+g338e847.d20251030 +output_dir: user_data/recordings +frame_width: 1280 +frame_height: 720 +fps: 20 +codec: h264 +pix_fmt: yuv420p +ntp_server: mio-ntp.local +ntp_max_offset_seconds: 0.01 +backend: cv2 \ No newline at end of file diff --git a/mio/data/config/camera/usbcam_mbp.yaml b/mio/data/config/camera/usbcam_mbp.yaml new file mode 100644 index 00000000..5d3ba66e --- /dev/null +++ b/mio/data/config/camera/usbcam_mbp.yaml @@ -0,0 +1,10 @@ +id: default-camera +mio_model: mio.models.usbcam.USBCameraRecordingConfig +mio_version: 0.8.2.dev13+g338e847.d20251030 +camera_index: 0 +output_dir: recordings +frame_width: 1920 +frame_height: 1080 +fps: 30 +codec: libx264 +pix_fmt: yuv420p diff --git a/mio/devices/mocks.py b/mio/devices/mocks.py index 3cdcb96d..757b42c8 100644 --- a/mio/devices/mocks.py +++ b/mio/devices/mocks.py @@ -11,14 +11,17 @@ import os import sys +import time from pathlib import Path -from typing import Dict, Optional +from typing import Dict, Optional, Tuple if sys.version_info < (3, 11): from typing_extensions import Self else: from typing import Self +import numpy as np + from mio.exceptions import EndOfRecordingException @@ -97,3 +100,75 @@ def __next__(self) -> bytes: return self.read_data() except EndOfRecordingException as e: raise StopIteration() from e + + +class USBCamMock: + """ + Mock class for :class:`cv2.VideoCapture`. + + Replays frames from a ``.npz`` file recorded with ``mio usbcam record --binary_export`` + with keys ``frames`` (N, H, W, C) uint8 and ``timestamps`` (N,) float64. + + Set as class variable so that it can be monkeypatched in tests that + require different source data files. + + Can be set using the ``PYTEST_USBCAM_DATA_FILE`` environment variable if + this mock is to be used within a separate process. + """ + + DATA_FILE: Optional[Path] = None + REALTIME: bool = False + """If True, sleep between frames to match recorded timestamps. + + Can also be set via ``PYTEST_USBCAM_REALTIME=1`` env var for multiprocessing. + """ + + def __init__(self) -> None: + if self.DATA_FILE is None: + if os.environ.get("PYTEST_USBCAM_DATA_FILE") is not None: + # need to get file from env variables here because on some platforms + # the default method for creating a new process is "spawn" which creates + # an entirely new python session instead of "fork" which would preserve + # the classvar + data_file: str = os.environ.get("PYTEST_USBCAM_DATA_FILE") # type: ignore + self.DATA_FILE = Path(data_file) + USBCamMock.DATA_FILE = Path(data_file) + else: + raise RuntimeError("DATA_FILE class attr must be set before using USBCamMock") + + if not self.REALTIME and os.environ.get("PYTEST_USBCAM_REALTIME"): + self.REALTIME = True # Not sure why this is needed but following the OKDevMock for now + USBCamMock.REALTIME = True + + data = np.load(self.DATA_FILE) + self._frames = data["frames"] + self._timestamps = data["timestamps"] + self._position = 0 + self._opened = True + self._props: Dict[int, float] = {} + + def isOpened(self) -> bool: # noqa: N802 - match cv2.VideoCapture API + return self._opened + + def read(self) -> Tuple[bool, Optional[np.ndarray]]: + if self._position >= len(self._frames): + raise EndOfRecordingException("End of recorded frames") + + if self.REALTIME and self._position > 0: + dt = self._timestamps[self._position] - self._timestamps[self._position - 1] + if dt > 0: + time.sleep(dt) + + frame = self._frames[self._position] + self._position += 1 + return True, frame + + def set(self, prop: int, value: float) -> bool: + self._props[prop] = value + return True + + def get(self, prop: int) -> float: + return self._props.get(prop, 0.0) + + def release(self) -> None: + self._opened = False diff --git a/mio/devices/usbcam.py b/mio/devices/usbcam.py new file mode 100644 index 00000000..55b4e297 --- /dev/null +++ b/mio/devices/usbcam.py @@ -0,0 +1,175 @@ +""" +USB Camera device helper functions. +""" + +import os +import time +from typing import TYPE_CHECKING, Dict, TypedDict, Union + +import cv2 +import numpy as np + +from mio.logging import init_logger +from mio.models.usbcam import Codec + +if TYPE_CHECKING: + from mio.devices.mocks import USBCamMock + +logger = init_logger("usbcam") + +# Constants +MAX_CAMERA_INDEX = 5 +CAMERA_INIT_DELAY_SECONDS = 0.1 # Delay after setting camera properties before reading +CAMERA_INIT_RETRY_ATTEMPTS = 3 # Number of retry attempts when reading initial frame + +# Mapping from capture format names to FourCC codes (FourCC requires exactly 4 characters) +CAPTURE_FORMAT_FOURCC: Dict[str, str] = { + "MJPEG": "MJPG", + "YUY2": "YUY2", +} + + +class CameraInfo(TypedDict): + """Camera information from OpenCV discovery.""" + + name: str + resolution: str + fps: int + + +def convert_frame_for_codec(frame: np.ndarray, codec: Codec) -> np.ndarray: + """ + Convert frame color space based on codec requirements. + + Args: + frame: Input frame (BGR from OpenCV) + codec: Video codec for output encoding + + Returns: + Converted frame ready for video writer + """ + if codec == "rawvideo": + # Rawvideo expects grayscale + if len(frame.shape) == 3: + return cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) + else: + return frame + else: + # Other codecs expect RGB + if len(frame.shape) == 3: + return cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) + else: + raise ValueError( + f"Expected BGR (3-channel) frame for codec '{codec}', " + f"got shape {frame.shape}. Use 'rawvideo' for grayscale." + ) + + +def open_camera( + camera_index: int, + frame_width: int, + frame_height: int, + fps: int, + capture_format: str = "MJPEG", +) -> Union[cv2.VideoCapture, "USBCamMock"]: + """ + Open and configure a camera with the specified settings. + + In test/mock mode (``PYTEST_CURRENT_TEST`` or ``BEHAVIORCAM_MOCKRUN`` env var set), + returns a :class:`~mio.devices.mocks.USBCamMock` instead of a real camera. + + Args: + camera_index: Index of the camera to open + frame_width: Desired frame width + frame_height: Desired frame height + fps: Desired frames per second + capture_format: Camera capture format (e.g., "MJPEG", "YUY2") + + Returns: + Configured VideoCapture object (or USBCamMock in test mode) + + Raises: + RuntimeError: If camera cannot be opened or cannot read frames + """ + if os.environ.get("PYTEST_CURRENT_TEST") or os.environ.get("BEHAVIORCAM_MOCKRUN"): + from mio.devices.mocks import USBCamMock + + return USBCamMock() + + cap = cv2.VideoCapture(camera_index) + if not cap.isOpened(): + raise RuntimeError(f"Failed to open camera at index {camera_index}") + + fourcc_code = CAPTURE_FORMAT_FOURCC[capture_format] + fourcc = cv2.VideoWriter_fourcc(*fourcc_code) + cap.set(cv2.CAP_PROP_FOURCC, fourcc) + cap.set(cv2.CAP_PROP_FRAME_WIDTH, frame_width) + cap.set(cv2.CAP_PROP_FRAME_HEIGHT, frame_height) + cap.set(cv2.CAP_PROP_FPS, fps) + + # Give camera time to initialize after setting properties + time.sleep(CAMERA_INIT_DELAY_SECONDS) + + # Verify camera is working by reading a test frame + # Retry a few times as some cameras need a moment to start + ret = False + for _ in range(CAMERA_INIT_RETRY_ATTEMPTS): + ret, _ = cap.read() + if ret: + break + time.sleep(CAMERA_INIT_DELAY_SECONDS) + + if not ret: + cap.release() + raise RuntimeError( + f"Camera at index {camera_index} opened but could not read initial frame " + f"after {CAMERA_INIT_RETRY_ATTEMPTS} attempts. " + "The camera may be in use by another application." + ) + + return cap + + +def format_camera_info(idx: int, info: CameraInfo) -> str: + """ + Format camera information for display. + + Args: + idx: Camera index + info: Camera info from discovery + + Returns: + Formatted string for display + """ + return f"[{idx}] {info['name']} - {info['resolution']} @ {info['fps']} fps" + + +def list_cameras() -> Dict[int, CameraInfo]: + """ + List available cameras with name, resolution, and fps. + + Returns: + Dictionary mapping camera index (0, 1, 2...) to camera info. + """ + available_cameras: Dict[int, CameraInfo] = {} + + logger.info(f"Scanning for cameras (indices 0-{MAX_CAMERA_INDEX - 1})...") + for i in range(MAX_CAMERA_INDEX): + cap = cv2.VideoCapture(i) + if cap.isOpened(): + ret, frame = cap.read() + if ret: + resolution = f"{frame.shape[1]}x{frame.shape[0]}" + fps = int(cap.get(cv2.CAP_PROP_FPS)) + available_cameras[i] = { + "name": f"Camera {i}", + "resolution": resolution, + "fps": fps, + } + else: + logger.debug(f"Camera at index {i} opened but failed to read frame") + cap.release() + else: + logger.debug(f"No camera found at index {i}") + + return available_cameras diff --git a/mio/io.py b/mio/io.py index e2f97f5f..d8942ebc 100644 --- a/mio/io.py +++ b/mio/io.py @@ -22,7 +22,7 @@ class VideoWriter: """ - Write data to a video file using FFMpegWriter. + Write data to a video file using FFMpegWriter (default) or cv2.VideoWriter. """ DEFAULT_OUTPUT = { @@ -37,17 +37,43 @@ def __init__( path: Union[str, Path], fps: int, output_dict: Union[dict, None] = None, + backend: Literal["skvideo", "cv2"] = "skvideo", ): """ Initialize the VideoWriter object. - """ - if output_dict is None: - output_dict = {} - output_dict = {**self.DEFAULT_OUTPUT, **output_dict} - - input_dict = {"-framerate": str(fps)} - self.writer = FFmpegWriter(filename=str(path), inputdict=input_dict, outputdict=output_dict) + Args: + path: Output video file path + fps: Frames per second + output_dict: FFmpeg output options (for skvideo backend) + backend: Backend to use - "skvideo" (default) or "cv2" + """ + self.path = Path(path) + self.fps = fps + self.backend = backend + + if backend == "cv2": + # cv2 backend - will initialize on first frame + # Map codec to fourcc + codec = output_dict.get("-vcodec", "mjpeg") if output_dict else "mjpeg" + fourcc_map = { + "mjpeg": "MJPG", + "libx264": "mp4v", + "h264": "mp4v", + "rawvideo": "GREY", + } + self.fourcc = fourcc_map.get(codec.lower(), "MJPG") + self.writer = None # Initialize on first frame + self.frame_shape = None + else: + # skvideo backend (default) + if output_dict is None: + output_dict = {} + output_dict = {**self.DEFAULT_OUTPUT, **output_dict} + input_dict = {"-framerate": str(fps)} + self.writer = FFmpegWriter( + filename=str(path), inputdict=input_dict, outputdict=output_dict + ) def write_frame(self, frame: np.ndarray) -> bool: """ @@ -59,14 +85,37 @@ def write_frame(self, frame: np.ndarray) -> bool: Returns: bool: True if the frame write was attempted and did not raise anything. """ - self.writer.writeFrame(frame) + if self.backend == "cv2": + if self.writer is None: + # Initialize cv2.VideoWriter on first frame + height, width = frame.shape[:2] + is_color = len(frame.shape) == 3 + self.writer = cv2.VideoWriter( + str(self.path), + cv2.VideoWriter_fourcc(*self.fourcc), + self.fps, + (width, height), + isColor=is_color, + ) + if not self.writer.isOpened(): + raise RuntimeError(f"Failed to open cv2.VideoWriter for {self.path}") + # cv2.VideoWriter expects BGR, but frames from convert_frame_for_codec are RGB + if len(frame.shape) == 3 and frame.shape[2] == 3: + frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR) + self.writer.write(frame) + else: + self.writer.writeFrame(frame) return True def close(self) -> None: """ Close the video file. """ - self.writer.close() + if self.backend == "cv2": + if self.writer is not None: + self.writer.release() + else: + self.writer.close() class VideoReader: diff --git a/mio/models/usbcam.py b/mio/models/usbcam.py new file mode 100644 index 00000000..2434b59c --- /dev/null +++ b/mio/models/usbcam.py @@ -0,0 +1,60 @@ +""" +Models for USB camera recording configuration. +""" + +from typing import Literal, Optional + +from pydantic import Field + +from mio.models import MiniscopeConfig +from mio.models.mixins import ConfigYAMLMixin + +Codec = Literal["mjpeg", "libx264", "h264", "rawvideo"] + + +class USBCameraRecordingConfig(MiniscopeConfig, ConfigYAMLMixin): + """ + Configuration for recording video from USB camera. + """ + + output_dir: str = Field( + default="recordings", description="Directory to save the recorded video." + ) + frame_width: int = Field(default=1920, description="Width of the recorded video.") + frame_height: int = Field(default=1080, description="Height of the recorded video.") + fps: int = Field(default=20, description="Frames per second of the recorded video.") + format: Literal["MJPEG", "YUY2"] = Field( + default="MJPEG", + description="Video format for camera capture. " + "Note: Output video encoding is handled by VideoWriter.", + ) + codec: Codec = Field( + default="libx264", + description=( + "Video codec for output file. " + "Used by skvideo backend, mapped to fourcc for cv2 backend." + ), + ) + pix_fmt: Optional[str] = Field( + default="yuv420p", + description=( + "Pixel format for video encoding (e.g., yuvj420p, yuv420p, gray). " + "Only used by skvideo backend, ignored by cv2 backend." + ), + ) + backend: Literal["skvideo", "cv2"] = Field( + default="skvideo", + description=( + "Video writer backend: 'skvideo' uses FFmpegWriter, " "'cv2' uses cv2.VideoWriter." + ), + ) + ntp_server: Optional[str] = Field( + default=None, + description="NTP server address for time synchronization check. " + "If specified, the system time will be verified against this server before capture.", + ) + ntp_max_offset_seconds: float = Field( + default=0.01, + description="Maximum allowed time offset in seconds for NTP synchronization check " + "(default: 0.01 = 10ms).", + ) diff --git a/mio/stream_daq.py b/mio/stream_daq.py index c894e489..e611f897 100644 --- a/mio/stream_daq.py +++ b/mio/stream_daq.py @@ -10,7 +10,7 @@ import time from collections.abc import Iterator from pathlib import Path -from typing import Any, Callable, Generator, List, Literal, Optional, Tuple, Union +from typing import Generator, List, Literal, Optional, Tuple, Union import cv2 import numpy as np @@ -31,6 +31,7 @@ from mio.plots.headers import StreamPlotter from mio.process.frame_helper import FrequencyMaskHelper from mio.types import ConfigSource +from mio.utils import exact_iter HAVE_OK = False ok_error = None @@ -45,19 +46,6 @@ pass # okDev stays None; error raised when actually trying to use FPGA -def exact_iter(f: Callable, sentinel: Any) -> Generator[Any, None, None]: - """ - A version of :func:`iter` that compares with `is` rather than `==` - because truth value of numpy arrays is ambiguous. - """ - while True: - val = f() - if val is sentinel: - break - else: - yield val - - class StreamDaq: """ A combined class for configuring and reading frames from a UART and FPGA source. diff --git a/mio/utils.py b/mio/utils.py index abc05ff1..9e717410 100644 --- a/mio/utils.py +++ b/mio/utils.py @@ -3,14 +3,37 @@ """ import hashlib +from collections.abc import Callable from pathlib import Path -from typing import TYPE_CHECKING, Union +from typing import TYPE_CHECKING, Generator, TypeVar, Union import cv2 if TYPE_CHECKING: pass +T = TypeVar("T") + + +def exact_iter(f: Callable[[], T], sentinel: T) -> Generator[T, None, None]: + """ + A version of :func:`iter` that compares with `is` rather than `==` + because truth value of numpy arrays is ambiguous. + + Args: + f: Function to call repeatedly + sentinel: Sentinel value to stop iteration when `f()` returns this (compared with `is`) + + Yields: + Values from `f()` until sentinel is encountered + """ + while True: + val = f() + if val is sentinel: + break + else: + yield val + def hash_file(path: Union[Path, str]) -> str: """ diff --git a/pdm.lock b/pdm.lock index 71595252..c0960199 100644 --- a/pdm.lock +++ b/pdm.lock @@ -813,6 +813,22 @@ files = [ {file = "coverage-7.10.7.tar.gz", hash = "sha256:f4ab143ab113be368a3e9b795f9cd7906c5ef407d6173fe9675a902e1fffc239"}, ] +[[package]] +name = "cv2-enumerate-cameras" +version = "1.3.0" +requires_python = ">=3.6" +summary = "Enumerate / List / Find / Detect / Search index for opencv VideoCapture." +groups = ["default"] +dependencies = [ + "pyobjc-framework-AVFoundation; platform_system == \"Darwin\"", +] +files = [ + {file = "cv2_enumerate_cameras-1.3.0-cp32-abi3-win32.whl", hash = "sha256:bd24be3c2df42b88da59e9f6029b97484fc8468678a2be3900aa717835c40f6a"}, + {file = "cv2_enumerate_cameras-1.3.0-cp32-abi3-win_amd64.whl", hash = "sha256:ef892cc7d92dec3ad0f696fa592a9d5461e593698048f84e877353cffcfc486f"}, + {file = "cv2_enumerate_cameras-1.3.0-py3-none-any.whl", hash = "sha256:7a67d8cc98fd112c3df51ff699c6953884e96088ced2ee05191bdeaaa2e1e847"}, + {file = "cv2_enumerate_cameras-1.3.0.tar.gz", hash = "sha256:795d8005b0e92ca117e1b8c4849c752edb39b7e2a1818b2d7ac4463aa26caac1"}, +] + [[package]] name = "cycler" version = "0.12.1" @@ -2136,6 +2152,147 @@ files = [ {file = "pygments-2.19.2.tar.gz", hash = "sha256:636cb2477cec7f8952536970bc533bc43743542f70392ae026374600add5b887"}, ] +[[package]] +name = "pyobjc-core" +version = "12.1" +requires_python = ">=3.10" +summary = "Python<->ObjC Interoperability Module" +groups = ["default"] +marker = "platform_system == \"Darwin\" and python_version ~= \"3.10\"" +files = [ + {file = "pyobjc_core-12.1-cp310-cp310-macosx_10_9_universal2.whl", hash = "sha256:93418e79c1655f66b4352168f8c85c942707cb1d3ea13a1da3e6f6a143bacda7"}, + {file = "pyobjc_core-12.1-cp311-cp311-macosx_10_9_universal2.whl", hash = "sha256:c918ebca280925e7fcb14c5c43ce12dcb9574a33cccb889be7c8c17f3bcce8b6"}, + {file = "pyobjc_core-12.1-cp312-cp312-macosx_10_13_universal2.whl", hash = "sha256:818bcc6723561f207e5b5453efe9703f34bc8781d11ce9b8be286bb415eb4962"}, + {file = "pyobjc_core-12.1-cp313-cp313-macosx_10_13_universal2.whl", hash = "sha256:01c0cf500596f03e21c23aef9b5f326b9fb1f8f118cf0d8b66749b6cf4cbb37a"}, + {file = "pyobjc_core-12.1-cp313-cp313t-macosx_10_13_universal2.whl", hash = "sha256:177aaca84bb369a483e4961186704f64b2697708046745f8167e818d968c88fc"}, + {file = "pyobjc_core-12.1-cp314-cp314-macosx_10_15_universal2.whl", hash = "sha256:844515f5d86395b979d02152576e7dee9cc679acc0b32dc626ef5bda315eaa43"}, + {file = "pyobjc_core-12.1-cp314-cp314t-macosx_10_15_universal2.whl", hash = "sha256:453b191df1a4b80e756445b935491b974714456ae2cbae816840bd96f86db882"}, + {file = "pyobjc_core-12.1.tar.gz", hash = "sha256:2bb3903f5387f72422145e1466b3ac3f7f0ef2e9960afa9bcd8961c5cbf8bd21"}, +] + +[[package]] +name = "pyobjc-framework-avfoundation" +version = "12.1" +requires_python = ">=3.10" +summary = "Wrappers for the framework AVFoundation on macOS" +groups = ["default"] +marker = "platform_system == \"Darwin\" and python_version ~= \"3.10\"" +dependencies = [ + "pyobjc-core>=12.1", + "pyobjc-framework-Cocoa>=12.1", + "pyobjc-framework-CoreAudio>=12.1", + "pyobjc-framework-CoreMedia>=12.1", + "pyobjc-framework-Quartz>=12.1", +] +files = [ + {file = "pyobjc_framework_avfoundation-12.1-cp310-cp310-macosx_10_9_universal2.whl", hash = "sha256:370d5f1149c1041028cb1f5fb61b9f56655fe53bbffafc79393b0824a474bef0"}, + {file = "pyobjc_framework_avfoundation-12.1-cp311-cp311-macosx_10_9_universal2.whl", hash = "sha256:82cc2c2d9ab6cc04feeb4700ff251d00f1fcafff573c63d4e87168ff80adb926"}, + {file = "pyobjc_framework_avfoundation-12.1-cp312-cp312-macosx_10_13_universal2.whl", hash = "sha256:bf634f89265b4d93126153200d885b6de4859ed6b3bc65e69ff75540bc398406"}, + {file = "pyobjc_framework_avfoundation-12.1-cp313-cp313-macosx_10_13_universal2.whl", hash = "sha256:f8ac7f7e0884ac8f12009cdb9d4fefc2f269294ab2ccfd84520a560859b69cec"}, + {file = "pyobjc_framework_avfoundation-12.1-cp313-cp313t-macosx_10_13_universal2.whl", hash = "sha256:51aba2c6816badfb1fb5a2de1b68b33a23f065bf9e3b99d46ede0c8c774ac7a4"}, + {file = "pyobjc_framework_avfoundation-12.1-cp314-cp314-macosx_10_15_universal2.whl", hash = "sha256:9a3ffd1ae90bd72dbcf2875aa9254369e805b904140362a7338ebf1af54201a6"}, + {file = "pyobjc_framework_avfoundation-12.1-cp314-cp314t-macosx_10_15_universal2.whl", hash = "sha256:394c99876b9a38db4851ddf8146db363556895c12e9c711ccd3c3f907ac8e273"}, + {file = "pyobjc_framework_avfoundation-12.1.tar.gz", hash = "sha256:eda0bb60be380f9ba2344600c4231dd58a3efafa99fdc65d3673ecfbb83f6fcb"}, +] + +[[package]] +name = "pyobjc-framework-avfoundation" +version = "12.0" +requires_python = ">=3.9" +summary = "" +groups = ["default"] +marker = "python_version < \"3.10\" and python_version >= \"3.9\" and platform_system == \"Darwin\"" +files = [ + {file = "pyobjc_framework_avfoundation-12.0.tar.gz", hash = "sha256:e9e9a15edea43341b39de677a58ac98b2a6bd4d6c55176b4804c5f75b3d20ece"}, +] + +[[package]] +name = "pyobjc-framework-cocoa" +version = "12.1" +requires_python = ">=3.10" +summary = "Wrappers for the Cocoa frameworks on macOS" +groups = ["default"] +marker = "platform_system == \"Darwin\" and python_version ~= \"3.10\"" +dependencies = [ + "pyobjc-core>=12.1", +] +files = [ + {file = "pyobjc_framework_cocoa-12.1-cp310-cp310-macosx_10_9_universal2.whl", hash = "sha256:9b880d3bdcd102809d704b6d8e14e31611443aa892d9f60e8491e457182fdd48"}, + {file = "pyobjc_framework_cocoa-12.1-cp311-cp311-macosx_10_9_universal2.whl", hash = "sha256:f52228bcf38da64b77328787967d464e28b981492b33a7675585141e1b0a01e6"}, + {file = "pyobjc_framework_cocoa-12.1-cp312-cp312-macosx_10_13_universal2.whl", hash = "sha256:547c182837214b7ec4796dac5aee3aa25abc665757b75d7f44f83c994bcb0858"}, + {file = "pyobjc_framework_cocoa-12.1-cp313-cp313-macosx_10_13_universal2.whl", hash = "sha256:5a3dcd491cacc2f5a197142b3c556d8aafa3963011110102a093349017705118"}, + {file = "pyobjc_framework_cocoa-12.1-cp313-cp313t-macosx_10_13_universal2.whl", hash = "sha256:914b74328c22d8ca261d78c23ef2befc29776e0b85555973927b338c5734ca44"}, + {file = "pyobjc_framework_cocoa-12.1-cp314-cp314-macosx_10_15_universal2.whl", hash = "sha256:03342a60fc0015bcdf9b93ac0b4f457d3938e9ef761b28df9564c91a14f0129a"}, + {file = "pyobjc_framework_cocoa-12.1-cp314-cp314t-macosx_10_15_universal2.whl", hash = "sha256:6ba1dc1bfa4da42d04e93d2363491275fb2e2be5c20790e561c8a9e09b8cf2cc"}, + {file = "pyobjc_framework_cocoa-12.1.tar.gz", hash = "sha256:5556c87db95711b985d5efdaaf01c917ddd41d148b1e52a0c66b1a2e2c5c1640"}, +] + +[[package]] +name = "pyobjc-framework-coreaudio" +version = "12.1" +requires_python = ">=3.10" +summary = "Wrappers for the framework CoreAudio on macOS" +groups = ["default"] +marker = "platform_system == \"Darwin\" and python_version ~= \"3.10\"" +dependencies = [ + "pyobjc-core>=12.1", + "pyobjc-framework-Cocoa>=12.1", +] +files = [ + {file = "pyobjc_framework_coreaudio-12.1-cp310-cp310-macosx_10_9_universal2.whl", hash = "sha256:d676c85bb9dc51217d94adc3b5b60e7e5b59a81167446f06821b2687d92641d3"}, + {file = "pyobjc_framework_coreaudio-12.1-cp311-cp311-macosx_10_9_universal2.whl", hash = "sha256:a452de6b509fa4a20160c0410b72330ac871696cd80237883955a5b3a4de8f2a"}, + {file = "pyobjc_framework_coreaudio-12.1-cp312-cp312-macosx_10_13_universal2.whl", hash = "sha256:a5ad6309779663f846ab36fe6c49647e470b7e08473c3e48b4f004017bdb68a4"}, + {file = "pyobjc_framework_coreaudio-12.1-cp313-cp313-macosx_10_13_universal2.whl", hash = "sha256:3d8ef424850c8ae2146f963afaec6c4f5bf0c2e412871e68fb6ecfb209b8376f"}, + {file = "pyobjc_framework_coreaudio-12.1-cp313-cp313t-macosx_10_13_universal2.whl", hash = "sha256:6552624df39dbc68ff9328f244ba56f59234ecbde8455db1e617a71bc4f3dd3a"}, + {file = "pyobjc_framework_coreaudio-12.1-cp314-cp314-macosx_10_15_universal2.whl", hash = "sha256:78ea67483a5deb21625c189328152008d278fe1da4304da9fcc1babd12627038"}, + {file = "pyobjc_framework_coreaudio-12.1-cp314-cp314t-macosx_10_15_universal2.whl", hash = "sha256:8d81b0d0296ab4571a4ff302e5cdb52386e486eb8749e99b95b9141438558ca2"}, + {file = "pyobjc_framework_coreaudio-12.1.tar.gz", hash = "sha256:a9e72925fcc1795430496ce0bffd4ddaa92c22460a10308a7283ade830089fe1"}, +] + +[[package]] +name = "pyobjc-framework-coremedia" +version = "12.1" +requires_python = ">=3.10" +summary = "Wrappers for the framework CoreMedia on macOS" +groups = ["default"] +marker = "platform_system == \"Darwin\" and python_version ~= \"3.10\"" +dependencies = [ + "pyobjc-core>=12.1", + "pyobjc-framework-Cocoa>=12.1", +] +files = [ + {file = "pyobjc_framework_coremedia-12.1-cp310-cp310-macosx_10_9_universal2.whl", hash = "sha256:4fce8570db3eaa7b841456a7890b24546504d1059157dc33e700b14d9d3073d8"}, + {file = "pyobjc_framework_coremedia-12.1-cp311-cp311-macosx_10_9_universal2.whl", hash = "sha256:ee7b822c9bb674b5b0a70bfb133410acae354e9241b6983f075395f3562f3c46"}, + {file = "pyobjc_framework_coremedia-12.1-cp312-cp312-macosx_10_13_universal2.whl", hash = "sha256:161a627f5c8cd30a5ebb935189f740e21e6cd94871a9afd463efdb5d51e255fa"}, + {file = "pyobjc_framework_coremedia-12.1-cp313-cp313-macosx_10_13_universal2.whl", hash = "sha256:98e885b7a092083fceaef0a7fc406a01ba7bcd3318fb927e59e055931c99cac8"}, + {file = "pyobjc_framework_coremedia-12.1-cp313-cp313t-macosx_10_13_universal2.whl", hash = "sha256:d2b84149c1b3e65ec9050a3e5b617e6c0b4cdad2ab622c2d8c5747a20f013e16"}, + {file = "pyobjc_framework_coremedia-12.1-cp314-cp314-macosx_10_15_universal2.whl", hash = "sha256:737ec6e0b63414f42f7188030c85975d6d2124fbf6b15b52c99b6cc20250af4d"}, + {file = "pyobjc_framework_coremedia-12.1-cp314-cp314t-macosx_10_15_universal2.whl", hash = "sha256:6a9419e0d143df16a1562520a13a389417386e2a53031530af6da60c34058ced"}, + {file = "pyobjc_framework_coremedia-12.1.tar.gz", hash = "sha256:166c66a9c01e7a70103f3ca44c571431d124b9070612ef63a1511a4e6d9d84a7"}, +] + +[[package]] +name = "pyobjc-framework-quartz" +version = "12.1" +requires_python = ">=3.10" +summary = "Wrappers for the Quartz frameworks on macOS" +groups = ["default"] +marker = "platform_system == \"Darwin\" and python_version ~= \"3.10\"" +dependencies = [ + "pyobjc-core>=12.1", + "pyobjc-framework-Cocoa>=12.1", +] +files = [ + {file = "pyobjc_framework_quartz-12.1-cp310-cp310-macosx_10_9_universal2.whl", hash = "sha256:c6f312ae79ef8b3019dcf4b3374c52035c7c7bc4a09a1748b61b041bb685a0ed"}, + {file = "pyobjc_framework_quartz-12.1-cp311-cp311-macosx_10_9_universal2.whl", hash = "sha256:19f99ac49a0b15dd892e155644fe80242d741411a9ed9c119b18b7466048625a"}, + {file = "pyobjc_framework_quartz-12.1-cp312-cp312-macosx_10_13_universal2.whl", hash = "sha256:7730cdce46c7e985535b5a42c31381af4aa6556e5642dc55b5e6597595e57a16"}, + {file = "pyobjc_framework_quartz-12.1-cp313-cp313-macosx_10_13_universal2.whl", hash = "sha256:629b7971b1b43a11617f1460cd218bd308dfea247cd4ee3842eb40ca6f588860"}, + {file = "pyobjc_framework_quartz-12.1-cp313-cp313t-macosx_10_13_universal2.whl", hash = "sha256:53b84e880c358ba1ddcd7e8d5ea0407d760eca58b96f0d344829162cda5f37b3"}, + {file = "pyobjc_framework_quartz-12.1-cp314-cp314-macosx_10_15_universal2.whl", hash = "sha256:42d306b07f05ae7d155984503e0fb1b701fecd31dcc5c79fe8ab9790ff7e0de0"}, + {file = "pyobjc_framework_quartz-12.1-cp314-cp314t-macosx_10_15_universal2.whl", hash = "sha256:0cc08fddb339b2760df60dea1057453557588908e42bdc62184b6396ce2d6e9a"}, + {file = "pyobjc_framework_quartz-12.1.tar.gz", hash = "sha256:27f782f3513ac88ec9b6c82d9767eef95a5cf4175ce88a1e5a65875fee799608"}, +] + [[package]] name = "pyparsing" version = "3.2.5" diff --git a/tests/conftest.py b/tests/conftest.py index b5430eb3..a86703a1 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -63,6 +63,25 @@ def _set_okdev_input(file: Union[str, Path]): return _set_okdev_input +@pytest.fixture() +def set_usbcam_input(monkeypatch): + """ + closure fixture to set the environment variable used by BehaviorCam to set the + USBCamMock data source + """ + + def _set_usbcam_input(file: Union[str, Path], realtime: bool = False): + from mio.devices.mocks import USBCamMock + + monkeypatch.setattr(USBCamMock, "DATA_FILE", file) + monkeypatch.setattr(USBCamMock, "REALTIME", realtime) + os.environ["PYTEST_USBCAM_DATA_FILE"] = str(file) + if realtime: + os.environ["PYTEST_USBCAM_REALTIME"] = "1" + + return _set_usbcam_input + + @pytest.fixture() def config_override(tmp_path) -> Callable[[Path, dict], Path]: """ diff --git a/tests/test_behavior_cam.py b/tests/test_behavior_cam.py new file mode 100644 index 00000000..8e03c347 --- /dev/null +++ b/tests/test_behavior_cam.py @@ -0,0 +1,300 @@ +import csv +from pathlib import Path + +import cv2 +import numpy as np +import pytest + +from mio.behavior_cam import BehaviorCam +from mio.devices.usbcam import convert_frame_for_codec, format_camera_info +from mio.io import VideoReader, VideoWriter +from mio.models.usbcam import USBCameraRecordingConfig + +NUM_TEST_FRAMES = 10 +TEST_WIDTH = 1280 +TEST_HEIGHT = 720 +TEST_FPS = 20 +STRESS_NUM_FRAMES = 100 + + + +def _make_npz(path: Path, num_frames: int = NUM_TEST_FRAMES, fps: float = TEST_FPS) -> Path: + """Generate a synthetic .npz matching elp-camera config.""" + frames = np.random.default_rng().integers( + 0, 255, size=(num_frames, TEST_HEIGHT, TEST_WIDTH, 3), dtype=np.uint8 + ) + timestamps = np.arange(num_frames, dtype=np.float64) / fps + np.savez(path, frames=frames, timestamps=timestamps) + return path + + +def test_capture_binary_export(set_usbcam_input, tmp_path): + """Test that capture_binary saves raw frames to .npz.""" + config = USBCameraRecordingConfig.from_id("elp-camera") + config.output_dir = str(tmp_path) + config.ntp_server = None + + npz_path = _make_npz(tmp_path / "test_input.npz") + set_usbcam_input(npz_path) + + binary_output = tmp_path / "export.npz" + behavior_cam = BehaviorCam(recording_config=config, camera_index=0) + behavior_cam.capture(show_video=False, capture_binary=binary_output) + + assert binary_output.exists() + + data = np.load(binary_output) + assert "frames" in data + assert "timestamps" in data + assert data["frames"].shape[0] == NUM_TEST_FRAMES + assert data["timestamps"].shape[0] == NUM_TEST_FRAMES + +def test_videowriter_close_writes_moov_atom(tmp_path): + """Verify VideoWriter.close() produces a valid mp4 with moov atom. + + Without proper close (stdin.close + wait), FFmpeg may exit before + writing the moov atom, making the file unreadable. + """ + video_path = tmp_path / "test.mp4" + writer = VideoWriter( + path=video_path, + fps=20, + output_dict={"-vcodec": "libx264", "-f": "mp4", "-pix_fmt": "yuv420p", "-vsync": "0"}, + backend="skvideo", + ) + + frames = np.random.default_rng().integers( + 0, 255, size=(NUM_TEST_FRAMES, TEST_HEIGHT, TEST_WIDTH, 3), dtype=np.uint8 + ) + for frame in frames: + writer.write_frame(frame) + + writer.close() + + # If moov atom is missing, VideoCapture will fail to open or report 0 frames + cap = cv2.VideoCapture(str(video_path)) + assert cap.isOpened(), "Failed to open video — moov atom likely missing" + frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) + cap.release() + assert frame_count == NUM_TEST_FRAMES, ( + f"Expected {NUM_TEST_FRAMES} frames, got {frame_count}" + ) + +def test_format_camera_info(): + info = {"name": "Camera 0", "resolution": "1280x720", "fps": 30} + result = format_camera_info(0, info) + assert "[0]" in result + assert "1280x720" in result + assert "30 fps" in result + +def test_usbcam_mock_read_all_frames(set_usbcam_input, tmp_path): + """USBCamMock should yield all frames then raise EndOfRecordingException.""" + from mio.devices.mocks import USBCamMock + from mio.exceptions import EndOfRecordingException + + npz_path = _make_npz(tmp_path / "mock.npz", num_frames=5) + set_usbcam_input(npz_path) + + mock = USBCamMock() + assert mock.isOpened() + + frames_read = 0 + while True: + try: + ret, frame = mock.read() + assert ret is True + assert frame.shape == (TEST_HEIGHT, TEST_WIDTH, 3) + frames_read += 1 + except EndOfRecordingException: + break + + assert frames_read == 5 + +def test_usbcam_mock_set_get(tmp_path): + """USBCamMock.set()/get() should store and retrieve properties.""" + from mio.devices.mocks import USBCamMock + + # Need DATA_FILE set; use a minimal npz + npz_path = tmp_path / "mock_props.npz" + np.savez(npz_path, frames=np.zeros((1, 2, 2, 3), dtype=np.uint8), timestamps=np.array([0.0])) + USBCamMock.DATA_FILE = npz_path + mock = USBCamMock() + + assert mock.get(cv2.CAP_PROP_FPS) == 0.0 # default + mock.set(cv2.CAP_PROP_FPS, 30.0) + assert mock.get(cv2.CAP_PROP_FPS) == 30.0 + +def test_usbcam_mock_release(set_usbcam_input, tmp_path): + """USBCamMock.release() should mark camera as closed.""" + from mio.devices.mocks import USBCamMock + + npz_path = _make_npz(tmp_path / "mock.npz", num_frames=1) + set_usbcam_input(npz_path) + + mock = USBCamMock() + assert mock.isOpened() + mock.release() + assert not mock.isOpened() + +def test_csv_structure(set_usbcam_input, tmp_path): + """Verify CSV has correct columns and monotonically increasing frame indices.""" + config = USBCameraRecordingConfig.from_id("elp-camera") + config.output_dir = str(tmp_path) + config.ntp_server = None + + npz_path = _make_npz(tmp_path / "csv_test.npz", num_frames=20) + set_usbcam_input(npz_path) + + cam = BehaviorCam(recording_config=config, camera_index=0) + cam.capture(show_video=False) + + csv_files = list(tmp_path.glob("*.csv")) + assert len(csv_files) == 1 + + with open(csv_files[0]) as f: + reader = csv.DictReader(f) + rows = list(reader) + + # Correct columns + assert reader.fieldnames == ["frame_index", "unix_time"] + + # Monotonically increasing frame index starting at 0 + indices = [int(r["frame_index"]) for r in rows] + assert indices == list(range(len(rows))) + + # Timestamps should be monotonically non-decreasing + times = [float(r["unix_time"]) for r in rows] + assert all(t1 <= t2 for t1, t2 in zip(times, times[1:])) + +def test_write_frame_count_matches_video(set_usbcam_input, tmp_path, monkeypatch): + """Count write_frame calls and verify they match the video frame count.""" + call_count = {"ok": 0, "failed": 0} + original = VideoWriter.write_frame + + def wrapped(self, frame): + ok = original(self, frame) + if ok: + call_count["ok"] += 1 + else: + call_count["failed"] += 1 + return ok + + monkeypatch.setattr(VideoWriter, "write_frame", wrapped, raising=True) + + config = USBCameraRecordingConfig.from_id("elp-camera") + config.output_dir = str(tmp_path) + config.ntp_server = None + + npz_path = _make_npz(tmp_path / "count_test.npz", num_frames=30) + set_usbcam_input(npz_path) + + cam = BehaviorCam(recording_config=config, camera_index=0) + cam.capture(show_video=False) + + video_files = list(tmp_path.glob("*.mp4")) + list(tmp_path.glob("*.avi")) + assert len(video_files) == 1 + + cap = cv2.VideoCapture(str(video_files[0])) + video_frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) + cap.release() + + assert call_count["failed"] == 0, f"write_frame had {call_count['failed']} failures" + assert call_count["ok"] == video_frame_count, ( + f"write_frame calls ({call_count['ok']}) != video frames ({video_frame_count})" + ) + +@pytest.mark.parametrize( + "codec,expected_ext", + [("h264", ".mp4"), ("libx264", ".mp4"), ("rawvideo", ".avi")], +) +def test_output_container_matches_codec(set_usbcam_input, tmp_path, codec, expected_ext): + """Verify the correct container format is chosen for each codec.""" + config = USBCameraRecordingConfig.from_id("elp-camera") + config.output_dir = str(tmp_path) + config.ntp_server = None + config.codec = codec + if codec == "libx264": + config.backend = "skvideo" + config.pix_fmt = "yuv420p" + + npz_path = _make_npz(tmp_path / "codec_test.npz", num_frames=5) + set_usbcam_input(npz_path) + + cam = BehaviorCam(recording_config=config, camera_index=0) + cam.capture(show_video=False) + + video_files = list(tmp_path.glob(f"*{expected_ext}")) + assert len(video_files) == 1, ( + f"Expected 1 {expected_ext} file for codec={codec}, " + f"found: {list(tmp_path.glob('*.*'))}" + ) + +@pytest.fixture(params=["cv2", "skvideo"]) +def backend_config(request, tmp_path) -> USBCameraRecordingConfig: + """ELP camera config with parameterized backend for comparison testing.""" + config = USBCameraRecordingConfig.from_id("elp-camera") + config.output_dir = str(tmp_path) + config.ntp_server = None + config.backend = request.param + if request.param == "skvideo": + config.codec = "libx264" + config.pix_fmt = "yuv420p" + return config + + +def test_frame_count_matches_csv(backend_config, set_usbcam_input, tmp_path): + """Verify video frame count matches CSV row count for each backend. + + Known bug: skvideo backend may produce fewer video frames than CSV rows. + """ + npz_path = _make_npz(tmp_path / "stress_input.npz", num_frames=STRESS_NUM_FRAMES) + set_usbcam_input(npz_path) + + behavior_cam = BehaviorCam(recording_config=backend_config, camera_index=0) + behavior_cam.capture(show_video=False) + + video_files = list(tmp_path.glob("*.mp4")) + csv_files = list(tmp_path.glob("*.csv")) + + assert len(video_files) == 1, f"Expected 1 video file, found {len(video_files)}" + assert len(csv_files) == 1, f"Expected 1 CSV file, found {len(csv_files)}" + + with open(csv_files[0]) as f: + csv_row_count = sum(1 for _ in csv.DictReader(f)) + + reader = VideoReader(str(video_files[0])) + video_frame_count = sum(1 for _ in reader.read_frames()) + reader.release() + + assert video_frame_count == csv_row_count, ( + f"Frame count mismatch ({backend_config.backend} backend): " + f"video has {video_frame_count} frames but CSV has {csv_row_count} rows" + ) + +@pytest.mark.parametrize("input_fps", [20, 19, 18, 15]) +def test_frame_count_realtime(backend_config, set_usbcam_input, tmp_path, input_fps): + """Verify frame count with realtime replay to simulate camera timing.""" + npz_path = _make_npz(tmp_path / "realtime_input.npz", num_frames=STRESS_NUM_FRAMES, fps=input_fps) + set_usbcam_input(npz_path, realtime=True) + + behavior_cam = BehaviorCam(recording_config=backend_config, camera_index=0) + behavior_cam.capture(show_video=False) + + video_files = list(tmp_path.glob("*.mp4")) + csv_files = list(tmp_path.glob("*.csv")) + + assert len(video_files) == 1, f"Expected 1 video file, found {len(video_files)}" + assert len(csv_files) == 1, f"Expected 1 CSV file, found {len(csv_files)}" + + with open(csv_files[0]) as f: + csv_row_count = sum(1 for _ in csv.DictReader(f)) + + reader = VideoReader(str(video_files[0])) + video_frame_count = sum(1 for _ in reader.read_frames()) + reader.release() + + assert video_frame_count == csv_row_count, ( + f"Frame count mismatch ({backend_config.backend} backend, " + f"input {input_fps}fps vs configured {TEST_FPS}fps): " + f"video has {video_frame_count} frames but CSV has {csv_row_count} rows" + ) diff --git a/tests/test_cli.py b/tests/test_cli.py index b7e15d7e..17e24646 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -1,10 +1,13 @@ +import csv import sys +import numpy as np import pytest from click.testing import CliRunner from mio.cli.config import config, _list from mio.cli.stream import capture +from mio.cli.usbcam import test as usbcam_test from mio import Config from mio.utils import hash_video from mio.models import config as _config_mod @@ -171,3 +174,47 @@ def test_cli_capture( assert result.exit_code == 0 output_hash = hash_video(path_stem.with_suffix(".avi")) assert output_hash == video_hash + + +@pytest.mark.timeout(30) +def test_cli_usbcam_test(set_usbcam_input, tmp_path, config_override): + """ + `mio usbcam test` should run BehaviorCam with mock data and produce video + CSV output. + """ + num_frames = 10 + width, height, fps = 1280, 720, 20 + + frames = np.random.default_rng(42).integers( + 0, 255, size=(num_frames, height, width, 3), dtype=np.uint8 + ) + timestamps = np.arange(num_frames, dtype=np.float64) / fps + npz_path = tmp_path / "test_input.npz" + np.savez(npz_path, frames=frames, timestamps=timestamps) + + set_usbcam_input(npz_path) + + # Override config to write output to tmp_path and disable NTP + from mio import BASE_DIR + + elp_config_path = BASE_DIR / "data" / "config" / "camera" / "elp-camera.yaml" + config_path = config_override( + elp_config_path, {"output_dir": str(tmp_path), "ntp_server": None} + ) + + runner = CliRunner() + result = runner.invoke( + usbcam_test, + ["--config", str(config_path), "--source", str(npz_path), "--no-display"], + ) + + assert result.exit_code == 0, f"CLI failed: {result.output}\n{result.exception}" + + video_files = list(tmp_path.glob("*.mp4")) + list(tmp_path.glob("*.avi")) + csv_files = list(tmp_path.glob("*.csv")) + + assert len(video_files) == 1, f"Expected 1 video file, found {len(video_files)}" + assert len(csv_files) == 1, f"Expected 1 CSV file, found {len(csv_files)}" + + with open(csv_files[0]) as f: + csv_row_count = sum(1 for _ in csv.DictReader(f)) + assert csv_row_count == num_frames