diff --git a/fluster/decoders/gstreamer.py b/fluster/decoders/gstreamer.py index 7a61f91f..93bde9e3 100644 --- a/fluster/decoders/gstreamer.py +++ b/fluster/decoders/gstreamer.py @@ -18,22 +18,20 @@ import os -import shlex import subprocess from functools import lru_cache from typing import List, Optional from fluster.codec import Codec, OutputFormat from fluster.decoder import Decoder, register_decoder +from fluster.gstreamer import run_pipeline from fluster.utils import ( file_checksum, normalize_binary_cmd, - run_command, - run_command_with_output, ) -PIPELINE_TPL = "{} --no-fault filesrc location={} ! {} ! {} ! {} ! {} {}" -PIPELINE_TPL_FLU_H266_DEC = "{} --no-fault filesrc location={} ! {} ! {} ! {} {}" +PIPELINE_TPL = "filesrc location={} ! {} ! {} ! {} ! {} {}" +PIPELINE_TPL_FLU_H266_DEC = "filesrc location={} ! {} ! {} ! {} {}" @lru_cache(maxsize=None) @@ -86,7 +84,6 @@ class GStreamer(Decoder): """Base class for GStreamer decoders""" decoder_bin = "" - cmd = "" caps = "" gst_api = "" api = "" @@ -99,7 +96,6 @@ def __init__(self) -> None: if not self.name: self.name = f"{self.provider}-{self.codec.value}-{self.api}-Gst{self.gst_api}" self.description = f"{self.provider} {self.codec.value} {self.api} decoder for GStreamer {self.gst_api}" - self.cmd = normalize_binary_cmd(self.cmd) if not gst_element_exists(self.sink): self.sink = "filesink" @@ -113,7 +109,6 @@ def gen_pipeline( """Generate the GStreamer pipeline used to decode the test vector""" output = f"location={output_filepath}" if output_filepath else "" return PIPELINE_TPL.format( - self.cmd, input_filepath, self.parser if self.parser else "parsebin", self.decoder_bin, @@ -157,31 +152,32 @@ def decode( if self.sink == "videocodectestsink": output_param = output_filepath if keep_files else None pipeline = self.gen_pipeline(input_filepath, output_param, output_format) - command = shlex.split(pipeline) - command.append("-m") - data = run_command_with_output(command, timeout=timeout, verbose=verbose).splitlines() + result = run_pipeline(pipeline, timeout=timeout, verbose=verbose, print_messages=True) + if result.returncode != 0: + raise subprocess.CalledProcessError(result.returncode, pipeline, result.stdout, result.stderr) + data = result.stdout.splitlines() return self.parse_videocodectestsink_md5sum(data) pipeline = self.gen_pipeline(input_filepath, output_filepath, output_format) - run_command(shlex.split(pipeline), timeout=timeout, verbose=verbose) + result = run_pipeline(pipeline, timeout=timeout, verbose=verbose) + if result.returncode != 0: + raise subprocess.CalledProcessError(result.returncode, pipeline, result.stdout, result.stderr) return file_checksum(output_filepath) @lru_cache(maxsize=128) def check(self, verbose: bool) -> bool: """Check if GStreamer decoder is valid (better than gst-inspect)""" try: - binary = normalize_binary_cmd(f"gst-launch-{self.gst_api}") - pipeline = f"{binary} --no-fault appsrc num-buffers=0 ! {self.decoder_bin} ! fakesink" - run_command(shlex.split(pipeline), verbose=verbose) + pipeline = f"appsrc num-buffers=0 ! {self.decoder_bin} ! fakesink" + result = run_pipeline(pipeline, verbose=verbose) + return result.returncode == 0 except Exception: return False - return True class GStreamer10Video(GStreamer): """Base class for GStreamer 1.x video decoders""" - cmd = "gst-launch-1.0" caps = "video/x-raw" gst_api = "1.0" sink = "videocodectestsink" @@ -205,7 +201,6 @@ def gen_pipeline( caps = f"{self.caps} ! videoconvert dither=none ! {raw_caps}" output = f"location={output_filepath}" if output_filepath else "" return PIPELINE_TPL.format( - self.cmd, input_filepath, self.parser if self.parser else "parsebin", self.decoder_bin, @@ -218,7 +213,6 @@ def gen_pipeline( class GStreamer10Audio(GStreamer): """Base class for GStreamer 1.x audio decoders""" - cmd = "gst-launch-1.0" caps = "audio/x-raw" gst_api = "1.0" sink = "filesink" @@ -789,7 +783,6 @@ def gen_pipeline( caps = f"{self.caps} ! videoconvert dither=none ! video/x-raw,format={output_format_to_gst(output_format)}" output = f"location={output_filepath}" if output_filepath else "" return PIPELINE_TPL_FLU_H266_DEC.format( - self.cmd, input_filepath, self.decoder_bin, caps, diff --git a/fluster/gstreamer/__init__.py b/fluster/gstreamer/__init__.py new file mode 100644 index 00000000..0925af17 --- /dev/null +++ b/fluster/gstreamer/__init__.py @@ -0,0 +1,79 @@ +# Fluster - testing framework for decoders conformance +# Copyright (C) 2025, Fluendo, S.A. +# Author: Andoni Morales Alastruey , Fluendo, S.A. +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public License +# as published by the Free Software Foundation, either version 3 +# of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library. If not, see . + +""" +GStreamer utilities for Fluster. + +This package provides CFFI bindings for GStreamer and a pipeline runner +that can be used to run GStreamer pipelines without depending on the +GStreamer Python bindings (gi.repository.Gst). +""" + +from __future__ import annotations + +import os +import subprocess +import sys +from typing import Optional + +from fluster.gstreamer.gst_cffi import GStreamerInstallation +from fluster.gstreamer.runner import ExitCode as ExitCode + + +def run_pipeline( + pipeline: str, + timeout: Optional[int] = None, + verbose: bool = False, + quiet: bool = False, + print_messages: bool = False, +) -> subprocess.CompletedProcess[str]: + """ + Run a GStreamer pipeline in a subprocess with proper environment setup. + + This is a convenience function that handles environment configuration and + spawns the GStreamer runner as a subprocess. It's the recommended way to + run GStreamer pipelines from fluster. + + Args: + pipeline: The GStreamer pipeline description string (gst-launch format). + timeout: Timeout in seconds for the pipeline to complete. None for no timeout. + verbose: Enable verbose output from the runner. + quiet: Suppress output except errors. + print_messages: Print all bus messages (like gst-launch -m). + + Returns: + subprocess.CompletedProcess with returncode, stdout, and stderr. + + Exit codes (see ExitCode enum): + SUCCESS (0) - Pipeline completed successfully (EOS) + ERROR (1) - Pipeline error occurred + INIT_ERROR (2) - Invalid arguments or initialization error + TIMEOUT (3) - Timeout occurred + """ + cmd = [sys.executable, "-m", "fluster.gstreamer.runner"] + if verbose: + cmd.append("--verbose") + if quiet: + cmd.append("--quiet") + if print_messages: + cmd.append("--messages") + if timeout is not None: + cmd.extend(["--timeout", str(timeout)]) + cmd.append(pipeline) + env = os.environ.copy() + env.update(GStreamerInstallation().get_environment()) + return subprocess.run(cmd, env=env, capture_output=True, text=True, check=False) diff --git a/fluster/gstreamer/gst_cffi.py b/fluster/gstreamer/gst_cffi.py new file mode 100644 index 00000000..6153df36 --- /dev/null +++ b/fluster/gstreamer/gst_cffi.py @@ -0,0 +1,637 @@ +# Fluster - testing framework for decoders conformance +# Copyright (C) 2025, Fluendo, S.A. +# Author: Andoni Morales Alastruey , Fluendo, S.A. +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public License +# as published by the Free Software Foundation, either version 3 +# of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library. If not, see . + +""" +CFFI bindings for GStreamer. + +This module provides minimal CFFI bindings for GStreamer to run pipelines +without depending on the GStreamer Python bindings (gi.repository.Gst). +""" + +from __future__ import annotations + +import ctypes +import ctypes.util +import os +import platform +from typing import List, Optional, Tuple + +# Platform-specific sizes +_POINTER_SIZE = ctypes.sizeof(ctypes.c_void_p) +_GSIZE = ctypes.c_uint64 if _POINTER_SIZE == 8 else ctypes.c_uint32 +_GTYPE = _GSIZE # GType is same size as gsize + +# GStreamer type definitions +GST_STATE_VOID_PENDING = 0 +GST_STATE_NULL = 1 +GST_STATE_READY = 2 +GST_STATE_PAUSED = 3 +GST_STATE_PLAYING = 4 + +GST_STATE_CHANGE_FAILURE = 0 +GST_STATE_CHANGE_SUCCESS = 1 +GST_STATE_CHANGE_ASYNC = 2 +GST_STATE_CHANGE_NO_PREROLL = 3 + +GST_MESSAGE_UNKNOWN = 0 +GST_MESSAGE_EOS = 1 << 0 +GST_MESSAGE_ERROR = 1 << 1 +GST_MESSAGE_WARNING = 1 << 2 +GST_MESSAGE_INFO = 1 << 3 +GST_MESSAGE_TAG = 1 << 4 +GST_MESSAGE_BUFFERING = 1 << 5 +GST_MESSAGE_STATE_CHANGED = 1 << 6 +GST_MESSAGE_STATE_DIRTY = 1 << 7 +GST_MESSAGE_STEP_DONE = 1 << 8 +GST_MESSAGE_CLOCK_PROVIDE = 1 << 9 +GST_MESSAGE_CLOCK_LOST = 1 << 10 +GST_MESSAGE_NEW_CLOCK = 1 << 11 +GST_MESSAGE_STRUCTURE_CHANGE = 1 << 12 +GST_MESSAGE_STREAM_STATUS = 1 << 13 +GST_MESSAGE_APPLICATION = 1 << 14 +GST_MESSAGE_ELEMENT = 1 << 15 +GST_MESSAGE_SEGMENT_START = 1 << 16 +GST_MESSAGE_SEGMENT_DONE = 1 << 17 +GST_MESSAGE_DURATION_CHANGED = 1 << 18 +GST_MESSAGE_LATENCY = 1 << 19 +GST_MESSAGE_ASYNC_START = 1 << 20 +GST_MESSAGE_ASYNC_DONE = 1 << 21 +GST_MESSAGE_REQUEST_STATE = 1 << 22 +GST_MESSAGE_STEP_START = 1 << 23 +GST_MESSAGE_QOS = 1 << 24 +GST_MESSAGE_PROGRESS = 1 << 25 +GST_MESSAGE_TOC = 1 << 26 +GST_MESSAGE_RESET_TIME = 1 << 27 +GST_MESSAGE_STREAM_START = 1 << 28 +GST_MESSAGE_NEED_CONTEXT = 1 << 29 +GST_MESSAGE_HAVE_CONTEXT = 1 << 30 +GST_MESSAGE_ANY = 0xFFFFFFFF + +GST_CLOCK_TIME_NONE = 0xFFFFFFFFFFFFFFFF +GST_SECOND = 1000000000 + +# GStreamer framework/installation paths +GST_FRAMEWORK_PATH_MACOS = "/Library/Frameworks/GStreamer.framework" +GST_FRAMEWORK_LIBRARIES_MACOS = f"{GST_FRAMEWORK_PATH_MACOS}/Libraries" +GST_FRAMEWORK_PLUGINS_MACOS = f"{GST_FRAMEWORK_PATH_MACOS}/Versions/Current/lib/gstreamer-1.0" + +# Homebrew paths (Apple Silicon and Intel) +GST_HOMEBREW_PREFIX_ARM64 = "/opt/homebrew" +GST_HOMEBREW_PREFIX_X86_64 = "/usr/local" + + +class GStreamerInstallation: + """Detects and provides paths for GStreamer installation.""" + + _instance: Optional["GStreamerInstallation"] = None + + def __new__(cls) -> "GStreamerInstallation": + if cls._instance is None: + cls._instance = super().__new__(cls) + return cls._instance + + def __init__(self) -> None: + # Skip re-initialization if already detected + if hasattr(self, "_detected"): + return + self._lib_path: Optional[str] = None + self._plugin_path: Optional[str] = None + self._bin_path: Optional[str] = None # Windows only + self._detected = self._detect() + + def _detect(self) -> bool: + """Detect GStreamer installation based on platform.""" + system = platform.system() + + if system == "Darwin": + return self._detect_macos() + elif system == "Windows": + return self._detect_windows() + elif system == "Linux": + return self._detect_linux() + return False + + def _detect_macos(self) -> bool: + """Detect GStreamer on macOS (Framework or Homebrew).""" + # Check GStreamer Framework first + if os.path.exists(GST_FRAMEWORK_PATH_MACOS): + self._lib_path = GST_FRAMEWORK_LIBRARIES_MACOS + self._plugin_path = GST_FRAMEWORK_PLUGINS_MACOS + return True + + # Check Homebrew (Apple Silicon first, then Intel) + for prefix in [GST_HOMEBREW_PREFIX_ARM64, GST_HOMEBREW_PREFIX_X86_64]: + gst_lib = os.path.join(prefix, "lib", "libgstreamer-1.0.dylib") + if os.path.exists(gst_lib): + self._lib_path = os.path.join(prefix, "lib") + plugin_path = os.path.join(self._lib_path, "gstreamer-1.0") + if os.path.exists(plugin_path): + self._plugin_path = plugin_path + return True + + return False + + def _detect_windows(self) -> bool: + """Detect GStreamer on Windows.""" + gst_root = os.environ.get("GSTREAMER_1_0_ROOT_MSVC_X86_64") or os.environ.get("GSTREAMER_1_0_ROOT_X86_64") + if gst_root and os.path.exists(gst_root): + self._bin_path = os.path.join(gst_root, "bin") + self._lib_path = self._bin_path # On Windows, DLLs are in bin + self._plugin_path = os.path.join(gst_root, "lib", "gstreamer-1.0") + return True + + # Try common installation paths + for path in [ + "C:\\gstreamer\\1.0\\msvc_x86_64", + "C:\\gstreamer\\1.0\\x86_64", + ]: + if os.path.exists(path): + self._bin_path = os.path.join(path, "bin") + self._lib_path = self._bin_path + self._plugin_path = os.path.join(path, "lib", "gstreamer-1.0") + return True + + return False + + def _detect_linux(self) -> bool: + """Detect GStreamer on Linux (uses system paths).""" + # Linux typically uses system paths, no special detection needed + # Libraries are found via ldconfig/LD_LIBRARY_PATH + # Check if we can find the library via ctypes + lib_path = ctypes.util.find_library("gstreamer-1.0") + return lib_path is not None + + @property + def is_available(self) -> bool: + """Check if GStreamer installation was detected.""" + return self._detected + + @property + def lib_path(self) -> Optional[str]: + """Get the library path for GStreamer.""" + return self._lib_path + + @property + def plugin_path(self) -> Optional[str]: + """Get the plugin path for GStreamer.""" + return self._plugin_path + + @property + def bin_path(self) -> Optional[str]: + """Get the binary path for GStreamer (Windows only).""" + return self._bin_path + + def find_library(self, name: str) -> Optional[str]: + """Find a specific library by name.""" + system = platform.system() + + if system == "Darwin": + if self._lib_path: + lib_file = os.path.join(self._lib_path, f"lib{name}.dylib") + if os.path.exists(lib_file): + return lib_file + # Fallback to ctypes.util.find_library + return ctypes.util.find_library(name) + + elif system == "Windows": + if self._lib_path: + lib_file = os.path.join(self._lib_path, f"{name}.dll") + if os.path.exists(lib_file): + return lib_file + # Fallback to ctypes.util.find_library + return ctypes.util.find_library(name) + + elif system == "Linux": + # Try ctypes.util.find_library first + lib_path = ctypes.util.find_library(name) + if lib_path: + return lib_path + # Try common paths + for path in [f"lib{name}.so.0", f"lib{name}.so"]: + try: + ctypes.CDLL(path) + return path + except OSError: + continue + + return None + + def get_environment(self) -> dict[str, str]: + """Get environment variables needed to run GStreamer correctly.""" + env: dict[str, str] = {} + system = platform.system() + + if system == "Darwin": + if self._lib_path: + current_dyld = os.environ.get("DYLD_LIBRARY_PATH", "") + if self._lib_path not in current_dyld: + env["DYLD_LIBRARY_PATH"] = f"{self._lib_path}:{current_dyld}" if current_dyld else self._lib_path + if self._plugin_path: + env["GST_PLUGIN_SYSTEM_PATH"] = self._plugin_path + + elif system == "Windows": + if self._bin_path: + current_path = os.environ.get("PATH", "") + if self._bin_path not in current_path: + env["PATH"] = f"{self._bin_path};{current_path}" + if self._plugin_path: + env["GST_PLUGIN_SYSTEM_PATH"] = self._plugin_path + + # Linux typically uses system paths, no special env needed + + return env + + +class GStreamerError(Exception): + """Exception raised for GStreamer errors.""" + + +class GstCFFI: + """CFFI-based GStreamer bindings (singleton).""" + + _instance: Optional["GstCFFI"] = None + + def __new__(cls) -> "GstCFFI": + if cls._instance is None: + cls._instance = super().__new__(cls) + return cls._instance + + def __init__(self) -> None: + # Skip re-initialization if already set up + if hasattr(self, "_gst"): + return + self._gst: Optional[ctypes.CDLL] = None + self._glib: Optional[ctypes.CDLL] = None + self._gobject: Optional[ctypes.CDLL] = None + self._initialized = False + + def _load_libraries(self) -> None: + if self._gst is not None: + return + + installation = GStreamerInstallation() + + # Find and load GLib first + glib_path = installation.find_library("glib-2.0") + if not glib_path: + raise GStreamerError("Could not find GLib library") + self._glib = ctypes.CDLL(glib_path) + + # Find and load GObject + gobject_path = installation.find_library("gobject-2.0") + if not gobject_path: + raise GStreamerError("Could not find GObject library") + self._gobject = ctypes.CDLL(gobject_path) + + # Find and load GStreamer + gst_path = installation.find_library("gstreamer-1.0") + if not gst_path: + raise GStreamerError("Could not find GStreamer library") + self._gst = ctypes.CDLL(gst_path) + + self._setup_function_signatures() + + def _setup_function_signatures(self) -> None: + """Set up ctypes function signatures for GStreamer functions.""" + if self._gst is None or self._glib is None or self._gobject is None: + return + + # GLib functions + self._glib.g_free.argtypes = [ctypes.c_void_p] + self._glib.g_free.restype = None + + # GError structure - we'll handle it as opaque pointer + # and use helper functions + + # gst_init + self._gst.gst_init.argtypes = [ctypes.POINTER(ctypes.c_int), ctypes.POINTER(ctypes.c_char_p)] + self._gst.gst_init.restype = None + + # gst_init_check + # Note: argv is char*** (pointer to char**) but we use c_void_p for flexibility + self._gst.gst_init_check.argtypes = [ + ctypes.POINTER(ctypes.c_int), + ctypes.c_void_p, # char*** - we pass pointer to argv array + ctypes.POINTER(ctypes.c_void_p), + ] + self._gst.gst_init_check.restype = ctypes.c_int + + # gst_is_initialized + self._gst.gst_is_initialized.argtypes = [] + self._gst.gst_is_initialized.restype = ctypes.c_int + + # gst_deinit + self._gst.gst_deinit.argtypes = [] + self._gst.gst_deinit.restype = None + + # gst_parse_launch + self._gst.gst_parse_launch.argtypes = [ctypes.c_char_p, ctypes.POINTER(ctypes.c_void_p)] + self._gst.gst_parse_launch.restype = ctypes.c_void_p + + # gst_element_set_state + self._gst.gst_element_set_state.argtypes = [ctypes.c_void_p, ctypes.c_int] + self._gst.gst_element_set_state.restype = ctypes.c_int + + # gst_element_get_state + self._gst.gst_element_get_state.argtypes = [ + ctypes.c_void_p, + ctypes.POINTER(ctypes.c_int), + ctypes.POINTER(ctypes.c_int), + ctypes.c_uint64, + ] + self._gst.gst_element_get_state.restype = ctypes.c_int + + # gst_element_get_bus + self._gst.gst_element_get_bus.argtypes = [ctypes.c_void_p] + self._gst.gst_element_get_bus.restype = ctypes.c_void_p + + # gst_bus_timed_pop_filtered + self._gst.gst_bus_timed_pop_filtered.argtypes = [ctypes.c_void_p, ctypes.c_uint64, ctypes.c_int] + self._gst.gst_bus_timed_pop_filtered.restype = ctypes.c_void_p + + # gst_bus_poll + self._gst.gst_bus_poll.argtypes = [ctypes.c_void_p, ctypes.c_int, ctypes.c_uint64] + self._gst.gst_bus_poll.restype = ctypes.c_void_p + + # gst_message_get_structure + self._gst.gst_message_get_structure.argtypes = [ctypes.c_void_p] + self._gst.gst_message_get_structure.restype = ctypes.c_void_p + + # gst_message_parse_error + self._gst.gst_message_parse_error.argtypes = [ + ctypes.c_void_p, + ctypes.POINTER(ctypes.c_void_p), + ctypes.POINTER(ctypes.c_void_p), + ] + self._gst.gst_message_parse_error.restype = None + + # gst_message_parse_warning + self._gst.gst_message_parse_warning.argtypes = [ + ctypes.c_void_p, + ctypes.POINTER(ctypes.c_void_p), + ctypes.POINTER(ctypes.c_void_p), + ] + self._gst.gst_message_parse_warning.restype = None + + # gst_message_unref + self._gst.gst_message_unref.argtypes = [ctypes.c_void_p] + self._gst.gst_message_unref.restype = None + + # gst_object_unref + self._gst.gst_object_unref.argtypes = [ctypes.c_void_p] + self._gst.gst_object_unref.restype = None + + # gst_structure_to_string + self._gst.gst_structure_to_string.argtypes = [ctypes.c_void_p] + self._gst.gst_structure_to_string.restype = ctypes.c_void_p + + # g_error_free + self._glib.g_error_free.argtypes = [ctypes.c_void_p] + self._glib.g_error_free.restype = None + + def init(self, options: Optional[List[str]] = None) -> None: + """Initialize GStreamer. + + Args: + options: List of GStreamer options to pass to gst_init_check. + These are command-line style options like '--gst-no-fault', + '--gst-debug-level=3', etc. If None, defaults to ['--gst-no-fault']. + """ + self._load_libraries() + if self._gst is None: + raise GStreamerError("GStreamer library not loaded") + + if self._gst.gst_is_initialized(): + self._initialized = True + return + + error = ctypes.c_void_p() + + # Default to --gst-no-fault if no options provided + if options is None: + options = ["--gst-no-fault"] + + # Build argv array (char**) + if options: + argv_list = [b"fluster"] + [opt.encode("utf-8") for opt in options] + argc = ctypes.c_int(len(argv_list)) + # Create array of c_char_p + argv_array = (ctypes.c_char_p * len(argv_list))(*argv_list) + # Create pointer to the array (char***) + argv_ptr = ctypes.pointer(ctypes.cast(argv_array, ctypes.POINTER(ctypes.c_char_p))) + result = self._gst.gst_init_check(ctypes.byref(argc), argv_ptr, ctypes.byref(error)) + else: + argc = ctypes.c_int(0) + result = self._gst.gst_init_check(ctypes.byref(argc), None, ctypes.byref(error)) + + if not result: + if error.value: + # GError structure: domain (guint32), code (gint), message (gchar*) + # We need to read the message from the error + error_msg = "GStreamer initialization failed" + self._glib.g_error_free(error) # type: ignore + raise GStreamerError(error_msg) + raise GStreamerError("GStreamer initialization failed") + self._initialized = True + + def deinit(self) -> None: + """Deinitialize GStreamer.""" + if self._gst is not None and self._initialized: + self._gst.gst_deinit() + self._initialized = False + + def parse_launch(self, pipeline_description: str) -> ctypes.c_void_p: + """Parse and create a pipeline from a description string.""" + if self._gst is None: + raise GStreamerError("GStreamer not initialized") + + error = ctypes.c_void_p() + pipeline = self._gst.gst_parse_launch(pipeline_description.encode("utf-8"), ctypes.byref(error)) + + if error.value: + # Try to get error message + error_msg = f"Failed to parse pipeline: {pipeline_description}" + self._glib.g_error_free(error) # type: ignore + raise GStreamerError(error_msg) + + if not pipeline: + raise GStreamerError(f"Failed to create pipeline: {pipeline_description}") + + return ctypes.cast(pipeline, ctypes.c_void_p) + + def element_set_state(self, element: ctypes.c_void_p, state: int) -> int: + """Set the state of an element.""" + if self._gst is None: + raise GStreamerError("GStreamer not initialized") + return int(self._gst.gst_element_set_state(element, state)) + + def element_get_state(self, element: ctypes.c_void_p, timeout: int = GST_CLOCK_TIME_NONE) -> Tuple[int, int, int]: + """Get the state of an element.""" + if self._gst is None: + raise GStreamerError("GStreamer not initialized") + state = ctypes.c_int() + pending = ctypes.c_int() + result = self._gst.gst_element_get_state(element, ctypes.byref(state), ctypes.byref(pending), timeout) + return result, state.value, pending.value + + def element_get_bus(self, element: ctypes.c_void_p) -> ctypes.c_void_p: + """Get the bus of an element.""" + if self._gst is None: + raise GStreamerError("GStreamer not initialized") + return ctypes.cast(self._gst.gst_element_get_bus(element), ctypes.c_void_p) + + def bus_timed_pop_filtered( + self, bus: ctypes.c_void_p, timeout: int, message_types: int + ) -> Optional[ctypes.c_void_p]: + """Pop a message from the bus with a timeout and filter.""" + if self._gst is None: + raise GStreamerError("GStreamer not initialized") + msg = self._gst.gst_bus_timed_pop_filtered(bus, timeout, message_types) + return msg if msg else None + + def bus_poll(self, bus: ctypes.c_void_p, message_types: int, timeout: int) -> Optional[ctypes.c_void_p]: + """Poll the bus for messages.""" + if self._gst is None: + raise GStreamerError("GStreamer not initialized") + msg = self._gst.gst_bus_poll(bus, message_types, timeout) + return msg if msg else None + + def message_get_type(self, message: ctypes.c_void_p) -> int: + """Get the type of a message.""" + # GstMiniObject structure (on 64-bit systems): + # GType type; - 8 bytes (offset 0) + # gint refcount; - 4 bytes (offset 8) + # gint lockstate; - 4 bytes (offset 12) + # guint flags; - 4 bytes (offset 16) + # GstMiniObjectCopyFunction copy; - 8 bytes (offset 24, with padding) + # GstMiniObjectDisposeFunction dispose; - 8 bytes (offset 32) + # GstMiniObjectFreeFunction free; - 8 bytes (offset 40) + # guint n_qdata; - 4 bytes (offset 48) + # gpointer qdata; - 8 bytes (offset 56, with padding) + # Total GstMiniObject size: 64 bytes on 64-bit (with alignment) + # + # GstMessage structure after GstMiniObject: + # GstMessageType type; - 4 bytes + # + # On 32-bit systems, offsets would be different. + if not message: + return GST_MESSAGE_UNKNOWN + + # The GstMessage.type field comes right after GstMiniObject + # We need to calculate the offset based on pointer size + if _POINTER_SIZE == 8: + # 64-bit: GstMiniObject is 64 bytes, type is at offset 64 + offset = 64 + else: + # 32-bit: GstMiniObject is around 32 bytes + offset = 32 + + # c_void_p doesn't support direct arithmetic, use value attribute + msg_addr = ctypes.cast(message, ctypes.c_void_p).value + if msg_addr is None: + return GST_MESSAGE_UNKNOWN + type_ptr = ctypes.cast(msg_addr + offset, ctypes.POINTER(ctypes.c_int)) + return int(type_ptr[0]) + + def message_parse_error(self, message: ctypes.c_void_p) -> Tuple[str, str]: + """Parse an error message.""" + if self._gst is None or self._glib is None: + raise GStreamerError("GStreamer not initialized") + + error = ctypes.c_void_p() + debug = ctypes.c_void_p() + self._gst.gst_message_parse_error(message, ctypes.byref(error), ctypes.byref(debug)) + + error_msg = "" + debug_msg = "" + + if error.value: + # GError: domain (guint32), code (gint), message (gchar*) + # message is at offset 8 on 64-bit systems + msg_ptr = ctypes.cast(error.value + 8, ctypes.POINTER(ctypes.c_char_p)) + if msg_ptr[0]: + error_msg = msg_ptr[0].decode("utf-8", errors="replace") + self._glib.g_error_free(error) + + if debug.value: + debug_str = ctypes.cast(debug.value, ctypes.c_char_p) + if debug_str.value: + debug_msg = debug_str.value.decode("utf-8", errors="replace") + self._glib.g_free(debug) + + return error_msg, debug_msg + + def message_parse_warning(self, message: ctypes.c_void_p) -> Tuple[str, str]: + """Parse a warning message.""" + if self._gst is None or self._glib is None: + raise GStreamerError("GStreamer not initialized") + + error = ctypes.c_void_p() + debug = ctypes.c_void_p() + self._gst.gst_message_parse_warning(message, ctypes.byref(error), ctypes.byref(debug)) + + error_msg = "" + debug_msg = "" + + if error.value: + msg_ptr = ctypes.cast(error.value + 8, ctypes.POINTER(ctypes.c_char_p)) + if msg_ptr[0]: + error_msg = msg_ptr[0].decode("utf-8", errors="replace") + self._glib.g_error_free(error) + + if debug.value: + debug_str = ctypes.cast(debug.value, ctypes.c_char_p) + if debug_str.value: + debug_msg = debug_str.value.decode("utf-8", errors="replace") + self._glib.g_free(debug) + + return error_msg, debug_msg + + def message_get_structure(self, message: ctypes.c_void_p) -> Optional[ctypes.c_void_p]: + """Get the structure of a message.""" + if self._gst is None: + raise GStreamerError("GStreamer not initialized") + structure = self._gst.gst_message_get_structure(message) + return structure if structure else None + + def structure_to_string(self, structure: ctypes.c_void_p) -> str: + """Convert a structure to a string.""" + if self._gst is None or self._glib is None: + raise GStreamerError("GStreamer not initialized") + result = self._gst.gst_structure_to_string(structure) + if result: + # Cast to c_char_p to read the string + string_ptr = ctypes.cast(result, ctypes.c_char_p) + string = string_ptr.value.decode("utf-8", errors="replace") if string_ptr.value else "" + # Free the original pointer (not the cast) + self._glib.g_free(result) + return string + return "" + + def message_unref(self, message: ctypes.c_void_p) -> None: + """Unref a message.""" + if self._gst is None: + return + if message: + self._gst.gst_message_unref(message) + + def object_unref(self, obj: ctypes.c_void_p) -> None: + """Unref an object.""" + if self._gst is None: + return + if obj: + self._gst.gst_object_unref(obj) diff --git a/fluster/gstreamer/runner.py b/fluster/gstreamer/runner.py new file mode 100644 index 00000000..411d0a4c --- /dev/null +++ b/fluster/gstreamer/runner.py @@ -0,0 +1,372 @@ +#!/usr/bin/env python3 +# Fluster - testing framework for decoders conformance +# Copyright (C) 2025, Fluendo, S.A. +# Author: Andoni Morales Alastruey , Fluendo, S.A. +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public License +# as published by the Free Software Foundation, either version 3 +# of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library. If not, see . + +""" +GStreamer pipeline runner using CFFI bindings. + +This script provides a replacement for gst-launch-1.0 that can be used by +fluster to run GStreamer pipelines. It uses custom CFFI bindings to avoid +depending on the GStreamer Python bindings (gi.repository.Gst). + +Usage: + python -m fluster.gstreamer.runner [options] + +Options: + -m, --messages Print all bus messages (similar to gst-launch -m) + -v, --verbose Enable verbose output + -t, --timeout Timeout in seconds (default: no timeout) + -q, --quiet Suppress output except errors + --no-fault Don't install a fault handler (ignored, for compatibility) + +Exit codes: + 0 - Pipeline completed successfully (EOS) + 1 - Pipeline error occurred + 2 - Invalid arguments or initialization error + 3 - Timeout occurred +""" + +from __future__ import annotations + +import argparse +import ctypes +import signal +import sys +from enum import IntEnum +from typing import Optional + +from fluster.gstreamer.gst_cffi import ( + GST_CLOCK_TIME_NONE, + GST_MESSAGE_ANY, + GST_MESSAGE_ELEMENT, + GST_MESSAGE_EOS, + GST_MESSAGE_ERROR, + GST_MESSAGE_WARNING, + GST_SECOND, + GST_STATE_CHANGE_FAILURE, + GST_STATE_NULL, + GST_STATE_PLAYING, + GstCFFI, + GStreamerError, +) + + +class ExitCode(IntEnum): + """Exit codes for the GStreamer runner.""" + + SUCCESS = 0 + ERROR = 1 + INIT_ERROR = 2 + TIMEOUT = 3 + + +class GStreamerRunner: + """Runs GStreamer pipelines using CFFI bindings.""" + + def __init__( + self, + verbose: bool = False, + print_messages: bool = False, + quiet: bool = False, + timeout: Optional[int] = None, + ) -> None: + self.verbose = verbose + self.print_messages = print_messages + self.quiet = quiet + self.timeout = timeout + self.gst: Optional[GstCFFI] = None + self.pipeline: Optional[ctypes.c_void_p] = None + self.bus: Optional[ctypes.c_void_p] = None + self._interrupted = False + + def _log(self, message: str) -> None: + """Log a message if not in quiet mode.""" + if not self.quiet: + print(message) + + def _log_verbose(self, message: str) -> None: + """Log a message if in verbose mode.""" + if self.verbose and not self.quiet: + print(message) + + def _log_error(self, message: str) -> None: + """Log an error message (always shown).""" + print(message, file=sys.stderr) + + def _signal_handler(self, signum: int, frame: object) -> None: + """Handle interrupt signals.""" + self._interrupted = True + self._log("\nInterrupt received, stopping pipeline...") + + def init(self) -> None: + """Initialize GStreamer with default options (--gst-no-fault).""" + self.gst = GstCFFI() + self.gst.init() # Uses default options: ['--gst-no-fault'] + self._log_verbose("GStreamer initialized") + + def run_pipeline(self, pipeline_description: str) -> int: + """ + Run a GStreamer pipeline. + + Args: + pipeline_description: The pipeline description string + + Returns: + Exit code: SUCCESS, ERROR, INIT_ERROR, or TIMEOUT + """ + if self.gst is None: + self._log_error("GStreamer not initialized") + return ExitCode.INIT_ERROR + + # Install signal handlers + original_sigint = signal.signal(signal.SIGINT, self._signal_handler) + original_sigterm = signal.signal(signal.SIGTERM, self._signal_handler) + + try: + return self._run_pipeline_internal(pipeline_description) + finally: + # Restore signal handlers + signal.signal(signal.SIGINT, original_sigint) + signal.signal(signal.SIGTERM, original_sigterm) + # Cleanup + self._cleanup() + + def _run_pipeline_internal(self, pipeline_description: str) -> int: + """Internal pipeline execution logic.""" + if self.gst is None: + return ExitCode.INIT_ERROR + + self._log_verbose(f"Creating pipeline: {pipeline_description}") + + # Parse and create the pipeline + try: + self.pipeline = self.gst.parse_launch(pipeline_description) + except GStreamerError as e: + self._log_error(f"ERROR: {e}") + return ExitCode.ERROR + + if not self.pipeline: + self._log_error("ERROR: Failed to create pipeline") + return ExitCode.ERROR + + # Get the bus + self.bus = self.gst.element_get_bus(self.pipeline) + if not self.bus: + self._log_error("ERROR: Failed to get pipeline bus") + return ExitCode.ERROR + + # Set pipeline to PLAYING + self._log_verbose("Setting pipeline to PLAYING") + ret = self.gst.element_set_state(self.pipeline, GST_STATE_PLAYING) + + if ret == GST_STATE_CHANGE_FAILURE: + self._log_error("ERROR: Failed to set pipeline to PLAYING state") + return ExitCode.ERROR + + self._log_verbose("Pipeline is running...") + + # Calculate timeout in nanoseconds + if self.timeout: + timeout_ns = self.timeout * GST_SECOND + else: + timeout_ns = GST_CLOCK_TIME_NONE + + # Message loop + return self._message_loop(timeout_ns) + + def _message_loop(self, timeout_ns: int) -> int: + """Process messages from the bus until EOS or error.""" + if self.gst is None or self.bus is None: + return ExitCode.INIT_ERROR + + # Message types we're interested in + if self.print_messages: + msg_types = GST_MESSAGE_ANY + else: + msg_types = GST_MESSAGE_EOS | GST_MESSAGE_ERROR | GST_MESSAGE_WARNING | GST_MESSAGE_ELEMENT + + # Use a shorter poll interval for responsiveness + poll_timeout = min(timeout_ns, GST_SECOND) if timeout_ns != GST_CLOCK_TIME_NONE else GST_SECOND + elapsed_ns = 0 + + while not self._interrupted: + msg = self.gst.bus_timed_pop_filtered(self.bus, poll_timeout, msg_types) + + if msg: + result = self._handle_message(msg) + self.gst.message_unref(msg) + if result is not None: + return result + else: + # No message received, check for timeout + if timeout_ns != GST_CLOCK_TIME_NONE: + elapsed_ns += poll_timeout + if elapsed_ns >= timeout_ns: + self._log_error("ERROR: Pipeline timed out") + return ExitCode.TIMEOUT + + # Interrupted + self._log("Pipeline interrupted") + return ExitCode.ERROR + + def _handle_message(self, msg: ctypes.c_void_p) -> Optional[int]: + """ + Handle a bus message. + + Returns: + None to continue, exit code to stop + """ + if self.gst is None: + return ExitCode.INIT_ERROR + + msg_type = self.gst.message_get_type(msg) + + if msg_type & GST_MESSAGE_EOS: + self._log_verbose("End of stream") + return ExitCode.SUCCESS + + elif msg_type & GST_MESSAGE_ERROR: + error_msg, debug_msg = self.gst.message_parse_error(msg) + self._log_error(f"ERROR: {error_msg}") + if self.verbose and debug_msg: + self._log_error(f"Debug: {debug_msg}") + return ExitCode.ERROR + + elif msg_type & GST_MESSAGE_WARNING: + warn_msg, debug_msg = self.gst.message_parse_warning(msg) + if not self.quiet: + self._log(f"WARNING: {warn_msg}") + if self.verbose and debug_msg: + self._log(f"Debug: {debug_msg}") + + elif msg_type & GST_MESSAGE_ELEMENT: + # Handle element messages (e.g., from videocodectestsink) + if self.print_messages: + structure = self.gst.message_get_structure(msg) + if structure: + struct_str = self.gst.structure_to_string(structure) + if struct_str: + print(struct_str) + + elif self.print_messages: + # Print all other messages if -m flag is set + structure = self.gst.message_get_structure(msg) + if structure: + struct_str = self.gst.structure_to_string(structure) + if struct_str: + print(struct_str) + + return None + + def _cleanup(self) -> None: + """Clean up resources.""" + if self.gst is None: + return + + if self.pipeline: + self._log_verbose("Setting pipeline to NULL") + self.gst.element_set_state(self.pipeline, GST_STATE_NULL) + self.gst.object_unref(self.pipeline) + self.pipeline = None + + if self.bus: + self.gst.object_unref(self.bus) + self.bus = None + + +def main() -> int: + """Main entry point.""" + parser = argparse.ArgumentParser( + description="Run GStreamer pipelines using CFFI bindings", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=""" +Examples: + %(prog)s "videotestsrc num-buffers=100 ! fakesink" + %(prog)s -m "filesrc location=test.mp4 ! decodebin ! videoconvert ! fakesink" + %(prog)s -t 30 "filesrc location=test.h264 ! h264parse ! avdec_h264 ! fakesink" +""", + ) + parser.add_argument( + "pipeline", + nargs="?", + help="Pipeline description (gst-launch format)", + ) + parser.add_argument( + "-m", + "--messages", + action="store_true", + help="Print all bus messages", + ) + parser.add_argument( + "-v", + "--verbose", + action="store_true", + help="Enable verbose output", + ) + parser.add_argument( + "-t", + "--timeout", + type=int, + default=None, + help="Timeout in seconds", + ) + parser.add_argument( + "-q", + "--quiet", + action="store_true", + help="Suppress output except errors", + ) + parser.add_argument( + "--no-fault", + action="store_true", + help="Don't install a fault handler (ignored, for compatibility with gst-launch)", + ) + + args = parser.parse_args() + + # If no pipeline provided, check if remaining args form the pipeline + # This handles the case where the pipeline is passed without quotes + if not args.pipeline: + parser.print_help() + return ExitCode.INIT_ERROR + + try: + runner = GStreamerRunner( + verbose=args.verbose, + print_messages=args.messages, + quiet=args.quiet, + timeout=args.timeout, + ) + runner.init() + return runner.run_pipeline(args.pipeline) + except GStreamerError as e: + print(f"GStreamer error: {e}", file=sys.stderr) + return ExitCode.INIT_ERROR + except KeyboardInterrupt: + print("\nInterrupted", file=sys.stderr) + return ExitCode.ERROR + except Exception as e: + print(f"Error: {e}", file=sys.stderr) + if args.verbose: + import traceback + + traceback.print_exc() + return ExitCode.INIT_ERROR + + +if __name__ == "__main__": + sys.exit(main())