Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 13 additions & 20 deletions fluster/decoders/gstreamer.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,20 @@


import os
import shlex
import subprocess
from functools import lru_cache
from typing import List, Optional

from fluster.codec import Codec, OutputFormat
from fluster.decoder import Decoder, register_decoder
from fluster.gstreamer import run_pipeline
from fluster.utils import (
file_checksum,
normalize_binary_cmd,
run_command,
run_command_with_output,
)

PIPELINE_TPL = "{} --no-fault filesrc location={} ! {} ! {} ! {} ! {} {}"
PIPELINE_TPL_FLU_H266_DEC = "{} --no-fault filesrc location={} ! {} ! {} ! {} {}"
PIPELINE_TPL = "filesrc location={} ! {} ! {} ! {} ! {} {}"
PIPELINE_TPL_FLU_H266_DEC = "filesrc location={} ! {} ! {} ! {} {}"


@lru_cache(maxsize=None)
Expand Down Expand Up @@ -86,7 +84,6 @@ class GStreamer(Decoder):
"""Base class for GStreamer decoders"""

decoder_bin = ""
cmd = ""
caps = ""
gst_api = ""
api = ""
Expand All @@ -99,7 +96,6 @@ def __init__(self) -> None:
if not self.name:
self.name = f"{self.provider}-{self.codec.value}-{self.api}-Gst{self.gst_api}"
self.description = f"{self.provider} {self.codec.value} {self.api} decoder for GStreamer {self.gst_api}"
self.cmd = normalize_binary_cmd(self.cmd)

if not gst_element_exists(self.sink):
self.sink = "filesink"
Expand All @@ -113,7 +109,6 @@ def gen_pipeline(
"""Generate the GStreamer pipeline used to decode the test vector"""
output = f"location={output_filepath}" if output_filepath else ""
return PIPELINE_TPL.format(
self.cmd,
input_filepath,
self.parser if self.parser else "parsebin",
self.decoder_bin,
Expand Down Expand Up @@ -157,31 +152,32 @@ def decode(
if self.sink == "videocodectestsink":
output_param = output_filepath if keep_files else None
pipeline = self.gen_pipeline(input_filepath, output_param, output_format)
command = shlex.split(pipeline)
command.append("-m")
data = run_command_with_output(command, timeout=timeout, verbose=verbose).splitlines()
result = run_pipeline(pipeline, timeout=timeout, verbose=verbose, print_messages=True)
if result.returncode != 0:
raise subprocess.CalledProcessError(result.returncode, pipeline, result.stdout, result.stderr)
data = result.stdout.splitlines()
return self.parse_videocodectestsink_md5sum(data)

pipeline = self.gen_pipeline(input_filepath, output_filepath, output_format)
run_command(shlex.split(pipeline), timeout=timeout, verbose=verbose)
result = run_pipeline(pipeline, timeout=timeout, verbose=verbose)
if result.returncode != 0:
raise subprocess.CalledProcessError(result.returncode, pipeline, result.stdout, result.stderr)
return file_checksum(output_filepath)

@lru_cache(maxsize=128)
def check(self, verbose: bool) -> bool:
"""Check if GStreamer decoder is valid (better than gst-inspect)"""
try:
binary = normalize_binary_cmd(f"gst-launch-{self.gst_api}")
pipeline = f"{binary} --no-fault appsrc num-buffers=0 ! {self.decoder_bin} ! fakesink"
run_command(shlex.split(pipeline), verbose=verbose)
pipeline = f"appsrc num-buffers=0 ! {self.decoder_bin} ! fakesink"
result = run_pipeline(pipeline, verbose=verbose)
return result.returncode == 0
except Exception:
return False
return True


class GStreamer10Video(GStreamer):
"""Base class for GStreamer 1.x video decoders"""

cmd = "gst-launch-1.0"
caps = "video/x-raw"
gst_api = "1.0"
sink = "videocodectestsink"
Expand All @@ -205,7 +201,6 @@ def gen_pipeline(
caps = f"{self.caps} ! videoconvert dither=none ! {raw_caps}"
output = f"location={output_filepath}" if output_filepath else ""
return PIPELINE_TPL.format(
self.cmd,
input_filepath,
self.parser if self.parser else "parsebin",
self.decoder_bin,
Expand All @@ -218,7 +213,6 @@ def gen_pipeline(
class GStreamer10Audio(GStreamer):
"""Base class for GStreamer 1.x audio decoders"""

cmd = "gst-launch-1.0"
caps = "audio/x-raw"
gst_api = "1.0"
sink = "filesink"
Expand Down Expand Up @@ -789,7 +783,6 @@ def gen_pipeline(
caps = f"{self.caps} ! videoconvert dither=none ! video/x-raw,format={output_format_to_gst(output_format)}"
output = f"location={output_filepath}" if output_filepath else ""
return PIPELINE_TPL_FLU_H266_DEC.format(
self.cmd,
input_filepath,
self.decoder_bin,
caps,
Expand Down
79 changes: 79 additions & 0 deletions fluster/gstreamer/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
# Fluster - testing framework for decoders conformance
# Copyright (C) 2025, Fluendo, S.A.
# Author: Andoni Morales Alastruey <amorales@fluendo.com>, Fluendo, S.A.
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public License
# as published by the Free Software Foundation, either version 3
# of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library. If not, see <https://www.gnu.org/licenses/>.

"""
GStreamer utilities for Fluster.

This package provides CFFI bindings for GStreamer and a pipeline runner
that can be used to run GStreamer pipelines without depending on the
GStreamer Python bindings (gi.repository.Gst).
"""

from __future__ import annotations

import os
import subprocess
import sys
from typing import Optional

from fluster.gstreamer.gst_cffi import GStreamerInstallation
from fluster.gstreamer.runner import ExitCode as ExitCode


def run_pipeline(
pipeline: str,
timeout: Optional[int] = None,
verbose: bool = False,
quiet: bool = False,
print_messages: bool = False,
) -> subprocess.CompletedProcess[str]:
"""
Run a GStreamer pipeline in a subprocess with proper environment setup.

This is a convenience function that handles environment configuration and
spawns the GStreamer runner as a subprocess. It's the recommended way to
run GStreamer pipelines from fluster.

Args:
pipeline: The GStreamer pipeline description string (gst-launch format).
timeout: Timeout in seconds for the pipeline to complete. None for no timeout.
verbose: Enable verbose output from the runner.
quiet: Suppress output except errors.
print_messages: Print all bus messages (like gst-launch -m).

Returns:
subprocess.CompletedProcess with returncode, stdout, and stderr.

Exit codes (see ExitCode enum):
SUCCESS (0) - Pipeline completed successfully (EOS)
ERROR (1) - Pipeline error occurred
INIT_ERROR (2) - Invalid arguments or initialization error
TIMEOUT (3) - Timeout occurred
"""
cmd = [sys.executable, "-m", "fluster.gstreamer.runner"]
if verbose:
cmd.append("--verbose")
if quiet:
cmd.append("--quiet")
if print_messages:
cmd.append("--messages")
if timeout is not None:
cmd.extend(["--timeout", str(timeout)])
cmd.append(pipeline)
env = os.environ.copy()
env.update(GStreamerInstallation().get_environment())
return subprocess.run(cmd, env=env, capture_output=True, text=True, check=False)
Loading