diff --git a/.gitignore b/.gitignore index 47f37ae..565c952 100644 --- a/.gitignore +++ b/.gitignore @@ -21,4 +21,5 @@ template* *.bak cov-int/ __pycache__ +*.egg-info/ tags diff --git a/MANIFEST.in b/MANIFEST.in new file mode 100644 index 0000000..a0bc7cd --- /dev/null +++ b/MANIFEST.in @@ -0,0 +1,9 @@ +graft include +graft lib +graft python +graft ext/uuid +include README.rst +include LICENSE +include heatshrink.LICENSE +include setup.py +include pyproject.toml diff --git a/README.rst b/README.rst index 18828cb..608ac20 100644 --- a/README.rst +++ b/README.rst @@ -32,9 +32,17 @@ Documentation is available here: `BPAK documentation`_ Building and installing ----------------------- -The library depends to mbedtls, liblzma, uuid +The library depends on mbedtls and liblzma. -Build library and tool:: +Install Python module:: + + $ pip install . + +This compiles the library and Python extension from source. Requires +``libmbedtls-dev`` and ``liblzma-dev`` (or equivalent) installed on the +system. + +Build C library and tool:: $ mkdir build && cd build $ cmake .. @@ -43,6 +51,7 @@ Build library and tool:: Running tests:: + $ mkdir build && cd build $ cmake .. -DBPAK_BUILD_TESTS=1 $ make && make test diff --git a/docs/build.rst b/docs/build.rst index 37d4235..629592e 100644 --- a/docs/build.rst +++ b/docs/build.rst @@ -4,21 +4,33 @@ Building and installing ----------------------- -The library depends on mbedtls and liblzma +The library depends on mbedtls and liblzma. -Build library and tool:: +Install Python module +--------------------- - $ cmake - $ make - $ sudo make install +:: + + $ pip install . + +This compiles the library and Python extension from source. Requires +``libmbedtls-dev`` and ``liblzma-dev`` (or equivalent) installed on the +system. -Optionally build with python support:: +Build C library and tool +------------------------ - $ cmake -DBPAK_BUILD_PYTHON_WRAPPER=1 +:: + + $ mkdir build && cd build + $ cmake .. + $ make + $ sudo make install Running tests:: - $ cmake -DBPAK_BUILD_TESTS=1 + $ mkdir build && cd build + $ cmake .. -DBPAK_BUILD_TESTS=1 $ make && make test @@ -29,12 +41,11 @@ cmake configure options Option Description =========================== ==================================================== BPAK_BUILD_MINIMAL Build a minimal version of the library -BPAK_BUILD_PYTHON_WRAPPER Build the python wrapper +BPAK_BUILD_TOOL Build the bpak command-line tool (default: ON) BPAK_BUILD_TESTS Build tests =========================== ==================================================== -The default setting is that everything is enabled except the python wrapper and -the tests. +The default setting builds the library and tool. Tests are disabled by default. Build settings diff --git a/docs/ug/01_basics.rst b/docs/ug/01_basics.rst index 2a5a924..3f3ac06 100644 --- a/docs/ug/01_basics.rst +++ b/docs/ug/01_basics.rst @@ -1,8 +1,15 @@ Basic example ============= +.. note:: + + The examples in this user guide use the C ``bpak`` command-line tool. The + ``bpak`` CLI installed from the Python package has a different, idiomatic + Click argv; see :doc:`99_python_cli` for its command reference and a + migration table. + In the simplest use-case for bitpacker the archive can be viewed as a container -format for other binaries with metadata on sizes and offsets of the parts it +format for other binaries with metadata on sizes and offsets of the parts it contains. Create an empty archive:: diff --git a/docs/ug/99_python_cli.rst b/docs/ug/99_python_cli.rst new file mode 100644 index 0000000..e409ec7 --- /dev/null +++ b/docs/ug/99_python_cli.rst @@ -0,0 +1,161 @@ +Python CLI +========== + +The ``bpak`` command shipped by the ``bpak`` Python package (``pip install bpak``) +is an idiomatic Click application. Its command-line interface is **not** the +same as the C ``bpak`` binary documented in the preceding chapters. The C +binary is unchanged; anyone needing the historical argv should continue to +install and run it directly. + +If you were using a previous version of the Python ``bpak`` script, see the +migration table below for the new shape of every subcommand. + +Overview +-------- + +- Two-level command groups: operations on parts and metadata are written as + ``noun verb``, e.g. ``bpak add part`` / ``bpak delete meta``. +- ``-v/--verbose`` is global on the root group: ``bpak -v add part ...`` rather + than per-command. +- The IDs of parts and metadata are positional arguments on every command that + addresses a single item. Where the C CLI accepted ``--part foo``, the Python + CLI takes ``foo`` positionally. +- Commands that produce binary data (``extract part``, ``extract meta``, + ``show hash --binary``) refuse to write to stdout when stdout is a terminal. + Redirect to a file or pass ``--output PATH``. +- ``generate keystore --name`` validates its argument as a C identifier + (``[A-Za-z_][A-Za-z0-9_]*``) before emitting any generated C source. + +Command reference +----------------- + +.. code-block:: text + + bpak [--version] [-v/--verbose...] + + Package lifecycle + bpak create FILE [--hash {sha256|sha384|sha512}] + [--signature {prime256v1|secp384r1|secp521r1|rsa4096}] + [--force] + bpak compare FILE1 FILE2 + + Inspection + bpak show FILE # full package overview + bpak show meta FILE [ID] [--part-ref REF] + bpak show part FILE ID [--hash] + bpak show hash FILE [--binary] # header hash (Package.digest) + + Content + bpak add part FILE ID --from PATH [--no-hash] + bpak add meta FILE ID (--from-string VAL | --from-file PATH) + [--encoder {uuid|integer|id}] [--part-ref REF] + bpak add key FILE ID --from PATH + bpak add merkle FILE ID --from PATH + + bpak set meta FILE ID VALUE [--encoder {integer|id}] [--part-ref REF] + bpak set header FILE [--key-id ID] [--keystore-id ID] + + bpak delete part FILE (ID | --all) [--keep-meta] + bpak delete meta FILE ID [--part-ref REF] + + bpak extract part FILE ID [--output PATH] + bpak extract meta FILE ID [--output PATH] [--part-ref REF] + + Signing + bpak sign FILE (--key PATH | --signature PATH) + bpak verify FILE (--key PATH | --keystore PATH) + + Transport + bpak transport add FILE ID --encoder NAME --decoder NAME + bpak transport encode FILE --output PATH [--origin PATH] + bpak transport decode FILE --output PATH [--origin PATH] + + Code generation + bpak generate id STRING + bpak generate keystore FILE --name NAME [--decorate] + +Notable differences from the C CLI +---------------------------------- + +``show`` prints a single ``Header hash`` line in its overview output. The C +CLI's full ``bpak show`` additionally prints a ``Payload hash`` value. The +Python wrapper only exposes the header hash today; use the C ``bpak show`` +binary when you need the payload hash. + +``show hash`` prints the header hash. It replaces the overloaded +C-style ``-H`` / ``-B`` / ``-P`` flag combinations. For a part-level hash, +use ``bpak show part FILE ID --hash``. + +``compare`` diffs part contents via ``Package.part_sha256`` on both sides +in addition to part-header fields. The previous Python CLI only compared +part *sizes*, so same-size-different-content parts were silently reported +as equal. + +Migration from the previous Python CLI +-------------------------------------- + ++----------------------------------------------------+----------------------------------------------------------+ +| Previous Python CLI | New Python CLI | ++====================================================+==========================================================+ +| ``bpak create FILE -H sha256 -S prime256v1 -Y`` | ``bpak create FILE --hash sha256 --signature prime256v1 | +| | --force`` | ++----------------------------------------------------+----------------------------------------------------------+ +| ``bpak add FILE -p fs -f rootfs.img`` | ``bpak add part FILE fs --from rootfs.img`` | ++----------------------------------------------------+----------------------------------------------------------+ +| ``bpak add FILE -m version -s 1.0.0`` | ``bpak add meta FILE version --from-string 1.0.0`` | ++----------------------------------------------------+----------------------------------------------------------+ +| ``bpak add FILE -p key --encoder key -f k.pem`` | ``bpak add key FILE key --from k.pem`` | ++----------------------------------------------------+----------------------------------------------------------+ +| ``bpak add FILE -p fs --encoder merkle -f r.img`` | ``bpak add merkle FILE fs --from r.img`` | ++----------------------------------------------------+----------------------------------------------------------+ +| ``bpak show FILE`` | ``bpak show FILE`` (unchanged shape; overview) | ++----------------------------------------------------+----------------------------------------------------------+ +| ``bpak show FILE -m version`` | ``bpak show meta FILE version`` | ++----------------------------------------------------+----------------------------------------------------------+ +| ``bpak show FILE -P fs`` | ``bpak show part FILE fs --hash`` | ++----------------------------------------------------+----------------------------------------------------------+ +| ``bpak show FILE -H`` | ``bpak show hash FILE`` | ++----------------------------------------------------+----------------------------------------------------------+ +| ``bpak show FILE -B`` | ``bpak show hash FILE --binary`` | ++----------------------------------------------------+----------------------------------------------------------+ +| ``bpak set FILE -m version -s 2.0.0`` | ``bpak set meta FILE version 2.0.0`` | ++----------------------------------------------------+----------------------------------------------------------+ +| ``bpak set FILE --key-id X --keystore-id Y`` | ``bpak set header FILE --key-id X --keystore-id Y`` | ++----------------------------------------------------+----------------------------------------------------------+ +| ``bpak delete FILE -p fs`` | ``bpak delete part FILE fs`` | ++----------------------------------------------------+----------------------------------------------------------+ +| ``bpak delete FILE -a`` | ``bpak delete part FILE --all`` | ++----------------------------------------------------+----------------------------------------------------------+ +| ``bpak extract FILE -p fs -o fs.img`` | ``bpak extract part FILE fs --output fs.img`` | ++----------------------------------------------------+----------------------------------------------------------+ +| ``bpak extract FILE -m version`` | ``bpak extract meta FILE version`` (stdout, non-TTY) | ++----------------------------------------------------+----------------------------------------------------------+ +| ``bpak transport FILE --add --part fs`` | ``bpak transport add FILE fs --encoder ENC --decoder D`` | +| ``--encoder E --decoder D`` | | ++----------------------------------------------------+----------------------------------------------------------+ +| ``bpak transport FILE --encode --output o.bpak`` | ``bpak transport encode FILE --output o.bpak`` | ++----------------------------------------------------+----------------------------------------------------------+ +| ``bpak transport FILE --decode --output o.bpak`` | ``bpak transport decode FILE --output o.bpak`` | ++----------------------------------------------------+----------------------------------------------------------+ +| ``bpak sign FILE -k priv.pem`` | ``bpak sign FILE --key priv.pem`` | ++----------------------------------------------------+----------------------------------------------------------+ +| ``bpak verify FILE -k pub.pem`` | ``bpak verify FILE --key pub.pem`` | ++----------------------------------------------------+----------------------------------------------------------+ +| ``bpak verify FILE -K keystore.bpak`` | ``bpak verify FILE --keystore keystore.bpak`` | ++----------------------------------------------------+----------------------------------------------------------+ +| ``bpak generate id STRING`` | ``bpak generate id STRING`` (unchanged) | ++----------------------------------------------------+----------------------------------------------------------+ +| ``bpak generate keystore FILE -n NAME`` | ``bpak generate keystore FILE --name NAME`` | ++----------------------------------------------------+----------------------------------------------------------+ + +Breaking changes (no compatibility shims) +----------------------------------------- + +- All short flags that differ from the common ``-o`` / ``-v`` set are removed. +- ``-Y`` is replaced by ``--force`` on ``create``. +- ``show`` no longer overloads ``-p`` to mean "filter by part reference" when + ``-m`` is given. Use ``--part-ref REF`` explicitly. +- ``add``, ``set``, ``delete``, and ``extract`` are split into two-level + subgroups (``part`` / ``meta`` / ``key`` / ``merkle`` / ``header``). +- ``transport`` subcommands replace the ``--add / --encode / --decode`` mode + flags. diff --git a/examples/python/Dockerfile b/examples/python/Dockerfile index ec35b6d..3243a05 100644 --- a/examples/python/Dockerfile +++ b/examples/python/Dockerfile @@ -1,18 +1,11 @@ # build image from bpak root folder -from python:slim +FROM python:slim -RUN apt-get update -y - -RUN apt-get install -y gcc -RUN apt-get install -y pkgconf -RUN apt-get install -y autoconf-archive -RUN apt-get install -y libtool -RUN apt-get install -y make +RUN apt-get update -y && apt-get install -y \ + gcc \ + libmbedtls-dev \ + liblzma-dev COPY . . -RUN autoreconf -fi -RUN ./configure --enable-python-library --disable-dependency-tracking -RUN make -RUN make install -RUN ldconfig +RUN pip install . diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..d4211f2 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,79 @@ +[build-system] +requires = ["setuptools>=61.0", "wheel"] +build-backend = "setuptools.build_meta" + +[project] +name = "bpak" +requires-python = ">=3.10" +dynamic = [ + "version", + "description", + "readme", + "authors", + "license", + "classifiers", + "dependencies", + "scripts", + "urls", +] + +[dependency-groups] +dev = [ + "ruff>=0.6", + "mypy>=1.11", +] + +[tool.ruff] +line-length = 100 +extend-exclude = [ + "docs", + "build", +] +target-version = "py310" + +[tool.ruff.lint] +# See: https://docs.astral.sh/ruff/rules/ for details +select = ["ALL"] +ignore = [ + "COM812", # Conflicts with the formatter + "ISC001", # Conflicts with the formatter + "PLE0605", # ruff does not handle += in __all__ + "FBT", # don't care for these at the moment + "PT001", # https://github.com/astral-sh/ruff/issues/8796#issuecomment-1825907715 + "PT023", # https://github.com/astral-sh/ruff/issues/8796#issuecomment-1825907715 + "TRY003", # external message strings on ClickException raises + "EM101", # ditto: tolerate string-literal in exception constructors + "EM102", # ditto: tolerate f-string in exception constructors + "D203", # conflicts with D211 (pydocstyle pep257 convention) + "D213", # conflicts with D212 (pydocstyle pep257 convention) + "ERA001", # commented-out-code false positives +] + +pyupgrade.keep-runtime-typing = true +pylint.max-args = 8 + +[tool.ruff.lint.per-file-ignores] +"python/bpak/__init__.py" = ["F401", "A004"] +"python/bpak/_bpak.pyi" = ["A001", "A002"] +"python/bpak/_cli/__init__.py" = ["E402", "PLC0415"] +"python/bpak/_cli/*.py" = ["S101"] +"test/**/*.py" = [ + "S101", "T201", "ANN", "D", "INP001", "PLR2004", + "PLC0415", "E402", "I001", "S603", "S605", "S607", "S108", +] + +[tool.ruff.lint.pydocstyle] +convention = "pep257" + +[tool.ruff.format] +quote-style = "double" +indent-style = "space" +skip-magic-trailing-comma = false +line-ending = "auto" + +[tool.mypy] +warn_return_any = true +exclude = [ + "^build.*", + "^docs.*", +] diff --git a/python/CMakeLists.txt b/python/CMakeLists.txt index 04ec206..aef985b 100644 --- a/python/CMakeLists.txt +++ b/python/CMakeLists.txt @@ -4,8 +4,55 @@ find_package(Python 3.6 COMPONENTS Interpreter Development ) -add_library( - ${PROJECT_NAME}-python-wrapper SHARED +set(_BPAK_PY_BUILD_DIR "${CMAKE_CURRENT_BINARY_DIR}/bpak") + +# Stage the pure-Python package layout inside the build tree so that +# `sys.path.insert(0, "../python/") ; import bpak` resolves to a complete +# package rooted at ${CMAKE_CURRENT_BINARY_DIR}/bpak/, with the compiled +# extension placed alongside __init__.py as bpak._bpak. +file(MAKE_DIRECTORY "${_BPAK_PY_BUILD_DIR}/_cli") + +set(_BPAK_PY_TOP_FILES + __init__.py + __main__.py + _helpers.py +) +set(_BPAK_PY_CLI_FILES + _cli/__init__.py + _cli/_common.py + _cli/add.py + _cli/compare.py + _cli/create.py + _cli/delete.py + _cli/extract.py + _cli/generate.py + _cli/set.py + _cli/show.py + _cli/sign.py + _cli/transport.py +) + +set(_BPAK_PY_STAGED_OUTPUTS) +foreach(_src IN LISTS _BPAK_PY_TOP_FILES _BPAK_PY_CLI_FILES) + set(_in "${CMAKE_CURRENT_SOURCE_DIR}/bpak/${_src}") + set(_out "${_BPAK_PY_BUILD_DIR}/${_src}") + add_custom_command( + OUTPUT "${_out}" + COMMAND ${CMAKE_COMMAND} -E copy "${_in}" "${_out}" + DEPENDS "${_in}" + COMMENT "Staging Python source: ${_src}" + ) + list(APPEND _BPAK_PY_STAGED_OUTPUTS "${_out}") +endforeach() + +add_custom_target(${PROJECT_NAME}-python-sources ALL + DEPENDS ${_BPAK_PY_STAGED_OUTPUTS}) + +# Build the CPython extension `bpak._bpak`. The init function in +# python_wrapper.c is PyInit__bpak, which Python expects from a module +# file named `_bpak`, so name the output accordingly. +Python_add_library( + ${PROJECT_NAME}-python-wrapper MODULE python_wrapper.c package.c meta.c @@ -17,21 +64,20 @@ target_include_directories(${PROJECT_NAME}-python-wrapper PRIVATE ${Python_INCLUDE_DIRS} ) -target_link_libraries(${PROJECT_NAME}-python-wrapper - ${Python_LIBRARIES} +target_link_libraries(${PROJECT_NAME}-python-wrapper PRIVATE ${PROJECT_NAME} ) -set_target_properties(${PROJECT_NAME}-python-wrapper - PROPERTIES VERSION ${PROJECT_VERSION}) -set_target_properties(${PROJECT_NAME}-python-wrapper - PROPERTIES SOVERSION ${PROJECT_VERSION_MAJOR}) - target_compile_options(${PROJECT_NAME}-python-wrapper PRIVATE -Wextra -pedantic -I${${PROJECT_NAME}_BINARY_DIR}/lib/ ) -set_target_properties(${PROJECT_NAME}-python-wrapper PROPERTIES PREFIX "") -set_target_properties(${PROJECT_NAME}-python-wrapper - PROPERTIES OUTPUT_NAME ${PROJECT_NAME}) +set_target_properties(${PROJECT_NAME}-python-wrapper PROPERTIES + OUTPUT_NAME "_bpak" + PREFIX "" + LIBRARY_OUTPUT_DIRECTORY "${_BPAK_PY_BUILD_DIR}" +) + +add_dependencies(${PROJECT_NAME}-python-wrapper + ${PROJECT_NAME}-python-sources) diff --git a/python/bpak/__init__.py b/python/bpak/__init__.py new file mode 100644 index 0000000..e6d5a22 --- /dev/null +++ b/python/bpak/__init__.py @@ -0,0 +1,61 @@ +"""BPAK - Bit Packer.""" + +from ._bpak import ( + FLAG_EXCLUDE_FROM_HASH, + FLAG_TRANSPORT, + HASH_SHA256, + HASH_SHA384, + HASH_SHA512, + KEY_PUB_RSA4096, + SIGN_RSA4096, + Error, + KEY_PUB_PRIME256v1, + KEY_PUB_SECP384r1, + KEY_PUB_SECP521r1, + Meta, + Package, + Part, + SIGN_PRIME256v1, + SIGN_SECP384r1, + SIGN_SECP521r1, + add_transport_meta, + hash_kind_str, + id, + id_to_string, + meta_to_string, + parse_public_key, + set_log_func, + signature_kind_str, + transport_decode, + transport_encode, +) + +__all__ = [ + "FLAG_EXCLUDE_FROM_HASH", + "FLAG_TRANSPORT", + "HASH_SHA256", + "HASH_SHA384", + "HASH_SHA512", + "KEY_PUB_RSA4096", + "SIGN_RSA4096", + "Error", + "KEY_PUB_PRIME256v1", + "KEY_PUB_SECP384r1", + "KEY_PUB_SECP521r1", + "Meta", + "Package", + "Part", + "SIGN_PRIME256v1", + "SIGN_SECP384r1", + "SIGN_SECP521r1", + "add_transport_meta", + "hash_kind_str", + "id", + "id_to_string", + "meta_to_string", + "parse_public_key", + "set_log_func", + "signature_kind_str", + "transport_decode", + "transport_encode", +] diff --git a/python/bpak/__main__.py b/python/bpak/__main__.py new file mode 100644 index 0000000..34e9aa6 --- /dev/null +++ b/python/bpak/__main__.py @@ -0,0 +1,6 @@ +"""bpak CLI entry point.""" + +from ._cli import cli + +if __name__ == "__main__": + cli() diff --git a/python/bpak/_bpak.pyi b/python/bpak/_bpak.pyi new file mode 100644 index 0000000..d082923 --- /dev/null +++ b/python/bpak/_bpak.pyi @@ -0,0 +1,113 @@ +from collections.abc import Callable +from types import TracebackType +from typing import Final, Self + +FLAG_EXCLUDE_FROM_HASH: Final[int] +FLAG_TRANSPORT: Final[int] + +HASH_SHA256: Final[int] +HASH_SHA384: Final[int] +HASH_SHA512: Final[int] + +KEY_PUB_PRIME256v1: Final[int] +KEY_PUB_RSA4096: Final[int] +KEY_PUB_SECP384r1: Final[int] +KEY_PUB_SECP521r1: Final[int] + +SIGN_PRIME256v1: Final[int] +SIGN_RSA4096: Final[int] +SIGN_SECP384r1: Final[int] +SIGN_SECP521r1: Final[int] + +class Error(Exception): ... + +class Meta: + id: int + part_id_ref: int + size: int + raw_data: bytes + + def as_uuid(self) -> object: ... + def as_string(self) -> str: ... + def delete(self) -> None: ... + +class Part: + id: int + size: int + offset: int + is_transport_encoded: bool + flags: int + transport_size: int + pad_bytes: int + + def read_data(self) -> bytes: ... + def delete(self, keep_meta: bool = False) -> None: ... + +class Package: + digest: bytes + hash_kind: int + signature_kind: int + key_id: int + keystore_id: int + signature: bytes + size: int + installed_size: int + parts: list[Part] + meta: list[Meta] + + def __init__(self, filename: str, mode: str = "rb") -> None: ... + def __enter__(self) -> Self: ... + def __exit__( + self, + exc_type: type[BaseException] | None, + exc_val: BaseException | None, + exc_tb: TracebackType | None, + ) -> None: ... + def close(self) -> None: ... + def verify(self, verify_key_path: str) -> bool: ... + def sign(self, sign_key_path: str) -> bool: ... + def verify_with_keystore(self, keystore_path: str) -> bool: ... + def add_file( + self, + part_name: str, + filename: str, + with_merkle_tree: bool = False, + flags: int = 0, + ) -> Part: ... + def add_key(self, part_name: str, filename: str, flags: int = 0) -> Part: ... + def add_meta( + self, + id: int, + part_id_ref: int = 0, + data: bytes | str | None = None, + size: int = -1, + ) -> Meta: ... + def get_part(self, id: int) -> Part: ... + def get_meta(self, id: int, part_id_ref: int = 0) -> Meta: ... + def extract_file(self, part_id: int, filename: str) -> None: ... + def delete_all_parts(self, keep_meta: bool = False) -> None: ... + def part_sha256(self, part_id: int) -> bytes: ... + +def id(string: str) -> int: ... +def set_log_func(log_func: Callable[[int, str], None] | None) -> None: ... +def transport_encode( + input: Package, + output: Package, + origin: Package | None = None, +) -> None: ... +def transport_decode( + input: Package, + output: Package, + origin: Package | None = None, +) -> None: ... +def hash_kind_str(kind: int) -> str: ... +def signature_kind_str(kind: int) -> str: ... +def id_to_string(id: int) -> str | None: ... +def meta_to_string(package: Package, meta: Meta) -> str | None: ... +def add_transport_meta( + package: Package, + part_id: int, + encoder_id: int, + decoder_id: int, +) -> None: ... +def parse_public_key(data: bytes) -> tuple[int, bytes]: ... diff --git a/python/bpak/_cli/__init__.py b/python/bpak/_cli/__init__.py new file mode 100644 index 0000000..e4ba003 --- /dev/null +++ b/python/bpak/_cli/__init__.py @@ -0,0 +1,68 @@ +"""bpak CLI root group.""" + +from __future__ import annotations + +from importlib.metadata import PackageNotFoundError, version + +import click + +from ._common import install_verbose + + +def _print_version(ctx: click.Context, _param: click.Option, value: bool) -> None: + if not value or ctx.resilient_parsing: + return + try: + ver = version("bpak") + except PackageNotFoundError: + ver = "unknown" + click.echo(f"BitPacker {ver}") + ctx.exit(0) + + +@click.group(context_settings={"help_option_names": ["-h", "--help"]}) +@click.option( + "-V", + "--version", + is_flag=True, + is_eager=True, + expose_value=False, + callback=_print_version, + help="Display version and exit", +) +@click.option( + "-v", + "--verbose", + count=True, + help="Increase verbosity (repeatable)", +) +@click.pass_context +def cli(ctx: click.Context, verbose: int) -> None: + """BPAK - Bit Packer.""" + install_verbose(ctx, verbose) + + +# Wire subcommands. Imports are deferred here so a command file can import +# from ._common without a circular dependency on this module. +from . import add as _add +from . import compare as _compare +from . import create as _create +from . import delete as _delete +from . import extract as _extract +from . import generate as _generate +from . import set as _set +from . import show as _show +from . import sign as _sign +from . import transport as _transport + +cli.add_command(_create.create) +cli.add_command(_compare.compare) +cli.add_command(_show.show) +cli.add_command(_add.add) +cli.add_command(_set.set_) +cli.add_command(_delete.delete) +cli.add_command(_extract.extract) +cli.add_command(_sign.sign) +cli.add_command(_sign.verify) +cli.add_command(_transport.transport) +cli.add_command(_generate.generate) diff --git a/python/bpak/_cli/_common.py b/python/bpak/_cli/_common.py new file mode 100644 index 0000000..6061bd4 --- /dev/null +++ b/python/bpak/_cli/_common.py @@ -0,0 +1,187 @@ +"""Shared plumbing for the bpak CLI.""" + +from __future__ import annotations + +import functools +import re +import sys +from pathlib import Path +from typing import IO, TYPE_CHECKING, Any, Concatenate, ParamSpec, TypeVar + +import click + +from bpak import _bpak +from bpak._helpers import resolve_id + +if TYPE_CHECKING: + from collections.abc import Callable + +BPAK_METADATA_BYTES = getattr(_bpak, "BPAK_METADATA_BYTES", 1920) +BPAK_MAX_SIGNATURE_BYTES = getattr(_bpak, "BPAK_MAX_SIGNATURE_BYTES", 512) + +_C_IDENTIFIER_RE = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$") + +P = ParamSpec("P") +R = TypeVar("R") + + +class BpakId(click.ParamType): + name = "bpak_id" + + def convert( + self, + value: Any, # noqa: ANN401 - Click ParamType receives Any + param: click.Parameter | None, + ctx: click.Context | None, + ) -> int: + if isinstance(value, int): + return value + try: + return resolve_id(value) + except Exception as exc: # noqa: BLE001 - surface any parse error as UsageError + self.fail(f"{value!r} is not a valid bpak id ({exc})", param, ctx) + + +BPAK_ID = BpakId() + + +def _log_callback_factory(ctx: click.Context) -> Callable[[int, str], None]: + def _cb(level: int, msg: str) -> None: + v = ctx.obj.get("verbose", 0) + if level == 0 or level <= v: + click.echo(msg, nl=False, err=True) + + return _cb + + +def install_verbose(ctx: click.Context, verbose: int) -> None: + """Wire the _bpak log callback for this invocation and tear it down on exit.""" + ctx.ensure_object(dict) + ctx.obj["verbose"] = verbose + if verbose > 0: + _bpak.set_log_func(_log_callback_factory(ctx)) + else: + _bpak.set_log_func(None) + ctx.call_on_close(lambda: _bpak.set_log_func(None)) + + +def open_package( + mode: str = "rb", +) -> Callable[ + [Callable[Concatenate[_bpak.Package, P], R]], + Callable[Concatenate[str, P], R], +]: + """Open the package file and pass ``pkg`` to the callback. + + Translates ``_bpak.Error`` raised by the callback into a + ``click.ClickException``. The decorated callback must have signature + ``(pkg, ...)``; Click's own argument injection keeps working for + everything after ``pkg``. + """ + + def decorator( + func: Callable[Concatenate[_bpak.Package, P], R], + ) -> Callable[Concatenate[str, P], R]: + @functools.wraps(func) + def wrapper(filename: str, *args: P.args, **kwargs: P.kwargs) -> R: + try: + with _bpak.Package(filename, mode) as pkg: + return func(pkg, *args, **kwargs) + except _bpak.Error as exc: + raise click.ClickException(str(exc)) from exc + + return wrapper + + return decorator + + +def handle_bpak_errors(func: Callable[P, R]) -> Callable[P, R]: + """Translate ``_bpak.Error`` to ``click.ClickException``. + + For commands that manage their own Package open/close (e.g. commands + that open two packages). + """ + + @functools.wraps(func) + def wrapper(*args: P.args, **kwargs: P.kwargs) -> R: + try: + return func(*args, **kwargs) + except _bpak.Error as exc: + raise click.ClickException(str(exc)) from exc + + return wrapper + + +def _is_set(value: object) -> bool: + """Report whether a parsed option counts as "set" for validation. + + Accept 0 / "" / False-only-if-it-came-from-an-explicit-false as "set", + since Click only yields the default otherwise. A `None` means the option + was never provided; a `False` bool flag is likewise unset (the absence + of an `is_flag=True` flag). + """ + if value is None: + return False + return value is not False + + +def exactly_one_of(values: dict[str, Any]) -> str: + """Return the one option name whose value is set, or raise UsageError.""" + set_names = [name for name, v in values.items() if _is_set(v)] + if len(set_names) == 0: + raise click.UsageError(f"one of {', '.join(values.keys())} is required") + if len(set_names) > 1: + raise click.UsageError( + f"only one of {', '.join(values.keys())} is allowed (got: {', '.join(set_names)})" + ) + return set_names[0] + + +def at_least_one_of(values: dict[str, Any]) -> None: + if not any(_is_set(v) for v in values.values()): + raise click.UsageError(f"at least one of {', '.join(values.keys())} is required") + + +def incompatible(values: dict[str, Any]) -> None: + set_names = [k for k, v in values.items() if _is_set(v)] + if len(set_names) > 1: + raise click.UsageError( + f"{', '.join(values.keys())} are mutually exclusive (got: {', '.join(set_names)})" + ) + + +def binary_sink(output_path: str | None) -> IO[bytes]: + """Return a writable binary stream. + + - output_path given: file opened for writing (caller is responsible for closing). + - output_path None: sys.stdout.buffer, but only when stdout is non-TTY; + otherwise UsageError to prevent terminal corruption. + """ + if output_path is not None: + return Path(output_path).open("wb") + if sys.stdout.isatty(): + raise click.UsageError( + "refusing to write binary data to a terminal; redirect stdout or pass --output PATH" + ) + return sys.stdout.buffer + + +def safe_c_identifier(name: str) -> str: + if not _C_IDENTIFIER_RE.match(name): + raise click.UsageError( + f"{name!r} is not a valid C identifier (expected [A-Za-z_][A-Za-z0-9_]*)" + ) + return name + + +def bin2hex(data: bytes) -> str: + return data.hex() + + +def flag_str(flags: int) -> str: + chars = list("--------") + if flags & _bpak.FLAG_EXCLUDE_FROM_HASH: + chars[0] = "h" + if flags & _bpak.FLAG_TRANSPORT: + chars[1] = "T" + return "".join(chars) diff --git a/python/bpak/_cli/add.py b/python/bpak/_cli/add.py new file mode 100644 index 0000000..50d300b --- /dev/null +++ b/python/bpak/_cli/add.py @@ -0,0 +1,144 @@ +"""`bpak add` group — add part, meta, key, merkle to a package.""" + +from __future__ import annotations + +from pathlib import Path + +import click + +from bpak import _bpak +from bpak._helpers import encode_meta_value + +from ._common import ( + BPAK_ID, + BPAK_METADATA_BYTES, + exactly_one_of, + handle_bpak_errors, + open_package, +) + + +@click.group() +def add() -> None: + """Add content to a bpak file.""" + + +@add.command("part") +@click.argument("filename", type=click.Path(exists=True, dir_okay=False)) +@click.argument("id_", metavar="ID") +@click.option( + "--from", + "from_path", + type=click.Path(exists=True, dir_okay=False), + required=True, + help="Path to the source file whose bytes become the part contents", +) +@click.option( + "--no-hash", + is_flag=True, + help="Exclude this part from the package hash", +) +@handle_bpak_errors +@open_package("r+") +def add_part(pkg: _bpak.Package, id_: str, from_path: str, no_hash: bool) -> None: + """Add a raw part from a file. + + The ID is the part's symbolic name (e.g. "rootfs"); the underlying + library hashes it into a 32-bit ID. + """ + flags = _bpak.FLAG_EXCLUDE_FROM_HASH if no_hash else 0 + pkg.add_file(id_, from_path, flags=flags) + + +@add.command("meta") +@click.argument("filename", type=click.Path(exists=True, dir_okay=False)) +@click.argument("id_", metavar="ID", type=BPAK_ID) +@click.option( + "--from-string", + "from_string", + help="Value as a string (null-terminated by default, or passed through an encoder)", +) +@click.option( + "--from-file", + "from_file", + type=click.Path(exists=True, dir_okay=False), + help="Read the raw metadata bytes from this file", +) +@click.option( + "--encoder", + type=click.Choice(["uuid", "integer", "id"]), + help="Interpret --from-string as this type before storing", +) +@click.option( + "--part-ref", + "part_ref", + type=BPAK_ID, + default=0, + show_default=True, + help="Associate this metadata with the given part", +) +@handle_bpak_errors +@open_package("r+") +def add_meta( + pkg: _bpak.Package, + id_: int, + from_string: str | None, + from_file: str | None, + encoder: str | None, + part_ref: int, +) -> None: + """Add a metadata entry.""" + exactly_one_of({"--from-string": from_string, "--from-file": from_file}) + + if encoder and from_file: + raise click.UsageError("--encoder only applies with --from-string") + + if from_string is not None: + if encoder: + data = encode_meta_value(from_string, encoder) + else: + data = from_string.encode("ascii") + b"\x00" + else: + assert from_file is not None + data = Path(from_file).read_bytes() + + if len(data) > BPAK_METADATA_BYTES: + raise click.ClickException( + f"metadata value too large ({len(data)} > {BPAK_METADATA_BYTES} bytes)" + ) + + pkg.add_meta(id_, part_ref, data) + + +@add.command("key") +@click.argument("filename", type=click.Path(exists=True, dir_okay=False)) +@click.argument("id_", metavar="ID") +@click.option( + "--from", + "from_path", + type=click.Path(exists=True, dir_okay=False), + required=True, + help="Path to the public key file to embed", +) +@handle_bpak_errors +@open_package("r+") +def add_key(pkg: _bpak.Package, id_: str, from_path: str) -> None: + """Embed a public key as a part (ID is the key's symbolic name).""" + pkg.add_key(id_, from_path) + + +@add.command("merkle") +@click.argument("filename", type=click.Path(exists=True, dir_okay=False)) +@click.argument("id_", metavar="ID") +@click.option( + "--from", + "from_path", + type=click.Path(exists=True, dir_okay=False), + required=True, + help="Source file to hash into a merkle tree part", +) +@handle_bpak_errors +@open_package("r+") +def add_merkle(pkg: _bpak.Package, id_: str, from_path: str) -> None: + """Add a part with an accompanying merkle tree (ID is the part's symbolic name).""" + pkg.add_file(id_, from_path, with_merkle_tree=True) diff --git a/python/bpak/_cli/compare.py b/python/bpak/_cli/compare.py new file mode 100644 index 0000000..1fdbbc1 --- /dev/null +++ b/python/bpak/_cli/compare.py @@ -0,0 +1,81 @@ +"""`bpak compare` — diff two bpak files.""" + +from __future__ import annotations + +import click + +from bpak import _bpak + +from ._common import handle_bpak_errors + + +@click.command() +@click.argument("file1", type=click.Path(exists=True, dir_okay=False)) +@click.argument("file2", type=click.Path(exists=True, dir_okay=False)) +@handle_bpak_errors +def compare(file1: str, file2: str) -> None: + """Compare two bpak files: metadata, part headers, and part contents.""" + with _bpak.Package(file1, "rb") as pkg1, _bpak.Package(file2, "rb") as pkg2: + click.echo("Metadata:") + meta1 = {(m.id, m.part_id_ref): m for m in pkg1.meta} + meta2 = {(m.id, m.part_id_ref): m for m in pkg2.meta} + for key in sorted(set(meta1) | set(meta2)): + mid, mref = key + m1 = meta1.get(key) + m2 = meta2.get(key) + id_name = _bpak.id_to_string(mid) or "" + if m1 and m2: + if m1.raw_data == m2.raw_data: + sym, color = "=", None + else: + sym, color = "*", "yellow" + elif m1: + sym, color = "-", "red" + else: + sym, color = "+", "green" + click.secho( + f" {sym} {mid:08x} {id_name:<20s} ref={mref:08x}", + fg=color, + ) + + click.echo("\nParts:") + parts1 = {p.id: p for p in pkg1.parts} + parts2 = {p.id: p for p in pkg2.parts} + for pid in sorted(set(parts1) | set(parts2)): + p1 = parts1.get(pid) + p2 = parts2.get(pid) + id_name = _bpak.id_to_string(pid) or "" + if p1 and p2: + same = _parts_equal(pkg1, pkg2, p1, p2) + sym, color = ("=", None) if same else ("*", "yellow") + elif p1: + sym, color = "-", "red" + else: + sym, color = "+", "green" + present = p1 if p1 is not None else p2 + assert present is not None + size = present.size + click.secho( + f" {sym} {pid:08x} {id_name:<20s} size={size}", + fg=color, + ) + + +def _parts_equal( + pkg1: _bpak.Package, + pkg2: _bpak.Package, + p1: _bpak.Part, + p2: _bpak.Part, +) -> bool: + """Compare part-header fields and content via SHA-256.""" + if ( + p1.size != p2.size + or p1.pad_bytes != p2.pad_bytes + or p1.flags != p2.flags + or p1.transport_size != p2.transport_size + ): + return False + try: + return pkg1.part_sha256(p1.id) == pkg2.part_sha256(p2.id) + except _bpak.Error: + return False diff --git a/python/bpak/_cli/create.py b/python/bpak/_cli/create.py new file mode 100644 index 0000000..4e2b87d --- /dev/null +++ b/python/bpak/_cli/create.py @@ -0,0 +1,48 @@ +"""`bpak create` — create a new empty package.""" + +from __future__ import annotations + +from pathlib import Path + +import click + +from bpak import _bpak +from bpak._helpers import HASH_KIND_MAP, SIGN_KIND_MAP + +from ._common import handle_bpak_errors + + +@click.command() +@click.argument("filename", type=click.Path()) +@click.option( + "--hash", + "hash_kind", + type=click.Choice(list(HASH_KIND_MAP.keys())), + default="sha256", + show_default=True, + help="Hash algorithm", +) +@click.option( + "--signature", + "signature_kind", + type=click.Choice(list(SIGN_KIND_MAP.keys())), + default="prime256v1", + show_default=True, + help="Signature algorithm", +) +@click.option("--force", is_flag=True, help="Overwrite without asking") +@handle_bpak_errors +def create(filename: str, hash_kind: str, signature_kind: str, force: bool) -> None: + """Create a new empty bpak file.""" + if ( + Path(filename).exists() + and not force + and not click.confirm( + f"File '{filename}' exists. Overwrite?", + ) + ): + return + + with _bpak.Package(filename, "wb") as pkg: + pkg.hash_kind = HASH_KIND_MAP[hash_kind] + pkg.signature_kind = SIGN_KIND_MAP[signature_kind] diff --git a/python/bpak/_cli/delete.py b/python/bpak/_cli/delete.py new file mode 100644 index 0000000..67ae94f --- /dev/null +++ b/python/bpak/_cli/delete.py @@ -0,0 +1,73 @@ +"""`bpak delete` group — remove parts or metadata.""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +import click + +from ._common import ( + BPAK_ID, + exactly_one_of, + handle_bpak_errors, + open_package, +) + +if TYPE_CHECKING: + from bpak import _bpak + + +@click.group() +def delete() -> None: + """Delete parts or metadata from a bpak file.""" + + +@delete.command("part") +@click.argument("filename", type=click.Path(exists=True, dir_okay=False)) +@click.argument("id_", metavar="[ID]", type=BPAK_ID, required=False) +@click.option( + "--all", + "delete_all", + is_flag=True, + help="Delete every part in the package", +) +@click.option( + "--keep-meta", + is_flag=True, + help="Keep metadata entries that reference the deleted parts", +) +@handle_bpak_errors +@open_package("r+") +def delete_part( + pkg: _bpak.Package, + id_: int | None, + delete_all: bool, + keep_meta: bool, +) -> None: + """Delete a single part, or --all of them.""" + choice = exactly_one_of({"ID": id_ if id_ is not None else None, "--all": delete_all}) + if choice == "--all": + pkg.delete_all_parts(keep_meta=keep_meta) + else: + assert id_ is not None + p = pkg.get_part(id_) + p.delete(keep_meta=keep_meta) + + +@delete.command("meta") +@click.argument("filename", type=click.Path(exists=True, dir_okay=False)) +@click.argument("id_", metavar="ID", type=BPAK_ID) +@click.option( + "--part-ref", + "part_ref", + type=BPAK_ID, + default=0, + show_default=True, + help="Pick the meta entry associated with this part", +) +@handle_bpak_errors +@open_package("r+") +def delete_meta(pkg: _bpak.Package, id_: int, part_ref: int) -> None: + """Delete a metadata entry.""" + m = pkg.get_meta(id_, part_ref) + m.delete() diff --git a/python/bpak/_cli/extract.py b/python/bpak/_cli/extract.py new file mode 100644 index 0000000..2cc9380 --- /dev/null +++ b/python/bpak/_cli/extract.py @@ -0,0 +1,82 @@ +"""`bpak extract` group — write parts or metadata out of a package.""" + +from __future__ import annotations + +from pathlib import Path +from typing import TYPE_CHECKING + +import click + +from ._common import ( + BPAK_ID, + binary_sink, + handle_bpak_errors, + open_package, +) + +if TYPE_CHECKING: + from bpak import _bpak + + +@click.group() +def extract() -> None: + """Extract parts or metadata from a bpak file.""" + + +@extract.command("part") +@click.argument("filename", type=click.Path(exists=True, dir_okay=False)) +@click.argument("id_", metavar="ID", type=BPAK_ID) +@click.option( + "-o", + "--output", + type=click.Path(dir_okay=False), + help="Write to PATH; default is stdout (refused when stdout is a TTY)", +) +@handle_bpak_errors +@open_package("rb") +def extract_part(pkg: _bpak.Package, id_: int, output: str | None) -> None: + """Extract a part.""" + if output is not None: + pkg.extract_file(id_, output) + return + p = pkg.get_part(id_) + sink = binary_sink(None) + sink.write(p.read_data()) + sink.flush() + + +@extract.command("meta") +@click.argument("filename", type=click.Path(exists=True, dir_okay=False)) +@click.argument("id_", metavar="ID", type=BPAK_ID) +@click.option( + "-o", + "--output", + type=click.Path(dir_okay=False), + help="Write to PATH; default is stdout (refused when stdout is a TTY)", +) +@click.option( + "--part-ref", + "part_ref", + type=BPAK_ID, + default=0, + show_default=True, + help="Pick the meta entry associated with this part", +) +@handle_bpak_errors +@open_package("rb") +def extract_meta( + pkg: _bpak.Package, + id_: int, + output: str | None, + part_ref: int, +) -> None: + """Extract a metadata value.""" + m = pkg.get_meta(id_, part_ref) + data = m.raw_data + if output is not None: + with Path(output).open("wb") as f: + f.write(data) + else: + sink = binary_sink(None) + sink.write(data) + sink.flush() diff --git a/python/bpak/_cli/generate.py b/python/bpak/_cli/generate.py new file mode 100644 index 0000000..6ad5c41 --- /dev/null +++ b/python/bpak/_cli/generate.py @@ -0,0 +1,119 @@ +"""`bpak generate` group — id and keystore code generation.""" + +from __future__ import annotations + +import struct +from importlib.metadata import PackageNotFoundError +from importlib.metadata import version as pkg_version + +import click + +from bpak import _bpak + +from ._common import handle_bpak_errors, safe_c_identifier + +_KEYSTORE_PROVIDER_ID_MIN_BYTES = 4 + + +@click.group() +def generate() -> None: + """Code and ID generation utilities.""" + + +@generate.command("id") +@click.argument("string") +def generate_id(string: str) -> None: + """Translate a string to its bpak id (32-bit CRC).""" + click.echo(f'id("{string}") = 0x{_bpak.id(string):08x}') + + +_KEY_KIND_NAMES = { + _bpak.KEY_PUB_PRIME256v1: "BPAK_KEY_PUB_PRIME256v1", + _bpak.KEY_PUB_SECP384r1: "BPAK_KEY_PUB_SECP384r1", + _bpak.KEY_PUB_SECP521r1: "BPAK_KEY_PUB_SECP521r1", + _bpak.KEY_PUB_RSA4096: "BPAK_KEY_PUB_RSA4096", +} + + +@generate.command("keystore") +@click.argument("filename", type=click.Path(exists=True, dir_okay=False)) +@click.option( + "--name", + required=True, + help="C identifier prefix for the generated keystore symbols", +) +@click.option( + "--decorate", + is_flag=True, + help="Add .keystore_key/.keystore_header section attributes", +) +@handle_bpak_errors +def generate_keystore(filename: str, name: str, decorate: bool) -> None: # noqa: PLR0915 + """Emit a C keystore source file from a bpak keystore package.""" + safe = safe_c_identifier(name) + + try: + ver = pkg_version("bpak") + except PackageNotFoundError: + ver = "unknown" + + with _bpak.Package(filename, "rb") as pkg: + ks_meta = pkg.get_meta(_bpak.id("keystore-provider-id")) + raw = ks_meta.raw_data + if len(raw) < _KEYSTORE_PROVIDER_ID_MIN_BYTES: + raise click.ClickException( + "keystore-provider-id metadata is too short " + f"({len(raw)} bytes; need >= {_KEYSTORE_PROVIDER_ID_MIN_BYTES})" + ) + ks_provider_id = struct.unpack("") + out("#include ") + out("") + + key_decorator = '__attribute__((section (".keystore_key"))) ' + header_decorator = '__attribute__((section (".keystore_header"))) ' + + key_index = 0 + for p in pkg.parts: + data = p.read_data() + kind, key_data = _bpak.parse_public_key(data) + if kind not in _KEY_KIND_NAMES: + raise click.ClickException(f"Unsupported key type ({kind}) for part 0x{p.id:x}") + + out( + f"const struct bpak_key keystore_{safe}_key{key_index} " + f"{key_decorator if decorate else ''}=" + ) + out("{") + out(f" .id = 0x{p.id:x},") + out(f" .size = {len(key_data)},") + out(f" .kind = {_KEY_KIND_NAMES[kind]},") + out(" .data =") + out(" {") + line = " " + for i, b in enumerate(key_data): + line += f"0x{b:02x}, " + if (i + 1) % 8 == 0: + out(line) + line = " " + if line.strip(): + out(line) + out(" },") + out("};") + out("") + key_index += 1 + + out(f"const struct bpak_keystore keystore_{safe} {header_decorator if decorate else ''}=") + out("{") + out(f" .id = 0x{ks_provider_id:x},") + out(f" .no_of_keys = {key_index},") + out(" .verified = true,") + out(" .keys =") + out(" {") + for i in range(key_index): + out(f" (struct bpak_key *) &keystore_{safe}_key{i},") + out(" },") + out("};") diff --git a/python/bpak/_cli/set.py b/python/bpak/_cli/set.py new file mode 100644 index 0000000..56952c3 --- /dev/null +++ b/python/bpak/_cli/set.py @@ -0,0 +1,89 @@ +"""`bpak set` group — update metadata or header fields.""" + +from __future__ import annotations + +import click + +from bpak import _bpak +from bpak._helpers import encode_meta_value + +from ._common import ( + BPAK_ID, + at_least_one_of, + handle_bpak_errors, + open_package, +) + + +@click.group("set") +def set_() -> None: + """Update a bpak file.""" + + +@set_.command("meta") +@click.argument("filename", type=click.Path(exists=True, dir_okay=False)) +@click.argument("id_", metavar="ID", type=BPAK_ID) +@click.argument("value") +@click.option( + "--encoder", + type=click.Choice(["integer", "id"]), + help="Interpret VALUE through this encoder before writing", +) +@click.option( + "--part-ref", + "part_ref", + type=BPAK_ID, + default=0, + show_default=True, + help="Pick the meta entry associated with this part", +) +@handle_bpak_errors +@open_package("r+") +def set_meta( + pkg: _bpak.Package, + id_: int, + value: str, + encoder: str | None, + part_ref: int, +) -> None: + """Update an existing metadata entry, or create it if absent.""" + new_data = encode_meta_value(value, encoder) if encoder else value.encode("ascii") + b"\x00" + + try: + m = pkg.get_meta(id_, part_ref) + except _bpak.Error: + m = None + + if m is not None: + if len(new_data) <= m.size: + m.raw_data = new_data + else: + m.delete() + pkg.add_meta(id_, part_ref, new_data) + else: + pkg.add_meta(id_, part_ref, new_data) + + +@set_.command("header") +@click.argument("filename", type=click.Path(exists=True, dir_okay=False)) +@click.option( + "--key-id", + "key_id", + type=BPAK_ID, + help="Set the package's key_id header field", +) +@click.option( + "--keystore-id", + "keystore_id", + type=BPAK_ID, + help="Set the package's keystore_id header field", +) +@handle_bpak_errors +@open_package("r+") +def set_header(pkg: _bpak.Package, key_id: int | None, keystore_id: int | None) -> None: + """Update header fields (key-id, keystore-id).""" + at_least_one_of({"--key-id": key_id, "--keystore-id": keystore_id}) + if key_id is not None: + pkg.key_id = key_id + if keystore_id is not None: + pkg.keystore_id = keystore_id diff --git a/python/bpak/_cli/show.py b/python/bpak/_cli/show.py new file mode 100644 index 0000000..f00a6f7 --- /dev/null +++ b/python/bpak/_cli/show.py @@ -0,0 +1,163 @@ +"""`bpak show` group — overview, meta, part, hash.""" + +from __future__ import annotations + +import click + +from bpak import _bpak + +from ._common import ( + BPAK_ID, + BPAK_METADATA_BYTES, + bin2hex, + binary_sink, + flag_str, + handle_bpak_errors, + open_package, +) + + +class _ShowGroup(click.Group): + """Dispatch ``bpak show FILE`` to the (hidden) summary subcommand. + + Allows the user to write ``bpak show FILE`` instead of the explicit + ``bpak show summary FILE``. + """ + + def resolve_command( + self, + ctx: click.Context, + args: list[str], + ) -> tuple[str | None, click.Command | None, list[str]]: + if args and args[0] not in self.commands and not args[0].startswith("-"): + args = ["summary", *args] + return super().resolve_command(ctx, args) + + +@click.group(cls=_ShowGroup) +def show() -> None: + """Show information about a bpak file.""" + + +@show.command("summary", hidden=True) +@click.argument("filename", type=click.Path(exists=True, dir_okay=False)) +@click.pass_context +@handle_bpak_errors +def show_summary(ctx: click.Context, filename: str) -> None: + """Print a full overview of a bpak file.""" + verbose = (ctx.obj or {}).get("verbose", 0) + with _bpak.Package(filename, "rb") as pkg: + click.echo(f"BPAK File: {filename}") + click.echo("") + click.echo(f"Hash: {_bpak.hash_kind_str(pkg.hash_kind)}") + click.echo(f"Signature: {_bpak.signature_kind_str(pkg.signature_kind)}") + click.echo(f"Key ID: {pkg.key_id:08x}") + click.echo(f"Keystore ID: {pkg.keystore_id:08x}") + + click.echo("\nMetadata:") + click.echo(" ID Size Meta ID Part Ref Data") + for m in pkg.meta: + s = _bpak.meta_to_string(pkg, m) or "" + id_name = _bpak.id_to_string(m.id) or "" + ref_str = f"{m.part_id_ref:08x}" if m.part_id_ref else " " + click.echo(f" {m.id:08x} {m.size:<3} {id_name:<20s} {ref_str} {s}") + + click.echo("\nParts:") + click.echo(" ID Size Z-pad Flags Transport Size") + for p in pkg.parts: + flags = flag_str(p.flags) + ts = p.transport_size if p.flags & _bpak.FLAG_TRANSPORT else p.size + click.echo(f" {p.id:08x} {p.size:<12} {p.pad_bytes:<3} {flags} {ts:<12}") + + digest = pkg.digest + if digest: + click.echo(f"\nHeader hash: {bin2hex(digest)}") + + if verbose: + meta_size = sum(m.size for m in pkg.meta) + click.echo(f"Metadata usage: {meta_size}/{BPAK_METADATA_BYTES} bytes") + click.echo(f"Transport size: {pkg.size} bytes") + click.echo(f"Installed size: {pkg.installed_size} bytes") + + +@show.command("meta") +@click.argument("filename", type=click.Path(exists=True, dir_okay=False)) +@click.argument("id_", metavar="[ID]", type=BPAK_ID, required=False) +@click.option( + "--part-ref", + "part_ref", + type=BPAK_ID, + default=None, + help="Filter by part_id_ref; omit to list every ref. " + "Use 0 to pick unassociated/global metadata entries.", +) +@handle_bpak_errors +@open_package("rb") +def show_meta(pkg: _bpak.Package, id_: int | None, part_ref: int | None) -> None: + """Show one metadata entry, or list all of them.""" + found = False + for m in pkg.meta: + if id_ is not None and m.id != id_: + continue + if part_ref is not None and m.part_id_ref != part_ref: + continue + found = True + s = _bpak.meta_to_string(pkg, m) or "" + id_name = _bpak.id_to_string(m.id) or "" + ref_str = f"{m.part_id_ref:08x}" if m.part_id_ref else " " + click.echo(f"{m.id:08x} {id_name:<20s} ref={ref_str} size={m.size} {s}") + if id_ is not None and not found: + if part_ref is not None: + raise click.ClickException( + f"no matching meta 0x{id_:08x} with part_ref 0x{part_ref:08x}" + ) + raise click.ClickException(f"no matching meta 0x{id_:08x}") + + +@show.command("part") +@click.argument("filename", type=click.Path(exists=True, dir_okay=False)) +@click.argument("id_", metavar="ID", type=BPAK_ID) +@click.option( + "--hash", + "show_part_hash", + is_flag=True, + help="Print the SHA-256 hash of the part instead of its summary", +) +@handle_bpak_errors +@open_package("rb") +def show_part(pkg: _bpak.Package, id_: int, show_part_hash: bool) -> None: + """Show a single part.""" + if show_part_hash: + click.echo(bin2hex(pkg.part_sha256(id_))) + return + p = pkg.get_part(id_) + id_name = _bpak.id_to_string(p.id) or "" + flags = flag_str(p.flags) + ts = p.transport_size if p.flags & _bpak.FLAG_TRANSPORT else p.size + click.echo( + f"{p.id:08x} {id_name:<20s} size={p.size} pad={p.pad_bytes} " + f"flags={flags} transport_size={ts}" + ) + + +@show.command("hash") +@click.argument("filename", type=click.Path(exists=True, dir_okay=False)) +@click.option( + "--binary", + "binary_mode", + is_flag=True, + help="Write the raw digest bytes to stdout", +) +@handle_bpak_errors +@open_package("rb") +def show_hash(pkg: _bpak.Package, binary_mode: bool) -> None: + """Print the package header hash (Package.digest).""" + digest = pkg.digest + if not digest: + raise click.ClickException("package has no digest") + if binary_mode: + sink = binary_sink(None) + sink.write(digest) + sink.flush() + else: + click.echo(bin2hex(digest)) diff --git a/python/bpak/_cli/sign.py b/python/bpak/_cli/sign.py new file mode 100644 index 0000000..712dfea --- /dev/null +++ b/python/bpak/_cli/sign.py @@ -0,0 +1,88 @@ +"""`bpak sign` and `bpak verify`.""" + +from __future__ import annotations + +from pathlib import Path +from typing import TYPE_CHECKING + +import click + +from ._common import ( + BPAK_MAX_SIGNATURE_BYTES, + exactly_one_of, + handle_bpak_errors, + open_package, +) + +if TYPE_CHECKING: + from bpak import _bpak + + +@click.command() +@click.argument("filename", type=click.Path(exists=True, dir_okay=False)) +@click.option( + "--key", + "key_path", + type=click.Path(exists=True, dir_okay=False), + help="Private key (PEM) used to sign the package", +) +@click.option( + "--signature", + "signature_path", + type=click.Path(exists=True, dir_okay=False), + help="Pre-computed signature file to install instead of signing", +) +@handle_bpak_errors +@open_package("r+") +def sign( + pkg: _bpak.Package, + key_path: str | None, + signature_path: str | None, +) -> None: + """Sign a bpak file.""" + choice = exactly_one_of({"--key": key_path, "--signature": signature_path}) + if choice == "--signature": + assert signature_path is not None + sig_data = Path(signature_path).read_bytes() + if len(sig_data) > BPAK_MAX_SIGNATURE_BYTES: + raise click.ClickException( + f"Signature file too large ({len(sig_data)} > {BPAK_MAX_SIGNATURE_BYTES} bytes)" + ) + pkg.signature = sig_data + else: + assert key_path is not None + pkg.sign(key_path) + + +@click.command() +@click.argument("filename", type=click.Path(exists=True, dir_okay=False)) +@click.option( + "--key", + "key_path", + type=click.Path(exists=True, dir_okay=False), + help="Public key (PEM) to verify against", +) +@click.option( + "--keystore", + "keystore_path", + type=click.Path(exists=True, dir_okay=False), + help="Keystore bpak file to verify against", +) +@handle_bpak_errors +@open_package("rb") +def verify( + pkg: _bpak.Package, + key_path: str | None, + keystore_path: str | None, +) -> None: + """Verify a bpak file signature.""" + choice = exactly_one_of( + {"--key": key_path, "--keystore": keystore_path}, + ) + if choice == "--keystore": + assert keystore_path is not None + pkg.verify_with_keystore(keystore_path) + else: + assert key_path is not None + pkg.verify(key_path) + click.echo("Verification OK") diff --git a/python/bpak/_cli/transport.py b/python/bpak/_cli/transport.py new file mode 100644 index 0000000..d88bb20 --- /dev/null +++ b/python/bpak/_cli/transport.py @@ -0,0 +1,100 @@ +"""`bpak transport` group — transport metadata and encode/decode.""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +import click + +from bpak import _bpak + +from ._common import BPAK_ID, handle_bpak_errors, open_package + +if TYPE_CHECKING: + from collections.abc import Callable + + +@click.group() +def transport() -> None: + """Transport encoding/decoding operations.""" + + +@transport.command("add") +@click.argument("filename", type=click.Path(exists=True, dir_okay=False)) +@click.argument("id_", metavar="ID", type=BPAK_ID) +@click.option( + "--encoder", + type=BPAK_ID, + required=True, + help="Transport encoder algorithm id (e.g. 'bsdiff' or '0x...')", +) +@click.option( + "--decoder", + type=BPAK_ID, + required=True, + help="Transport decoder algorithm id", +) +@handle_bpak_errors +@open_package("r+") +def transport_add( + pkg: _bpak.Package, + id_: int, + encoder: int, + decoder: int, +) -> None: + """Add transport metadata linking a part to an encoder/decoder pair.""" + _bpak.add_transport_meta(pkg, id_, encoder, decoder) + + +def _run_codec( + op: Callable[..., None], + filename: str, + output: str, + origin: str | None, +) -> None: + with _bpak.Package(filename, "rb") as pkg_in, _bpak.Package(output, "wb") as pkg_out: + if origin is not None: + with _bpak.Package(origin, "rb") as pkg_origin: + op(pkg_in, pkg_out, pkg_origin) + else: + op(pkg_in, pkg_out) + + +@transport.command("encode") +@click.argument("filename", type=click.Path(exists=True, dir_okay=False)) +@click.option( + "-o", + "--output", + type=click.Path(dir_okay=False), + required=True, + help="Path of the transport-encoded output package", +) +@click.option( + "--origin", + type=click.Path(exists=True, dir_okay=False), + help="Origin package to diff against (for bsdiff-style encoders)", +) +@handle_bpak_errors +def transport_encode(filename: str, output: str, origin: str | None) -> None: + """Transport-encode a package.""" + _run_codec(_bpak.transport_encode, filename, output, origin) + + +@transport.command("decode") +@click.argument("filename", type=click.Path(exists=True, dir_okay=False)) +@click.option( + "-o", + "--output", + type=click.Path(dir_okay=False), + required=True, + help="Path of the transport-decoded output package", +) +@click.option( + "--origin", + type=click.Path(exists=True, dir_okay=False), + help="Origin package used to reconstruct (for bsdiff-style decoders)", +) +@handle_bpak_errors +def transport_decode(filename: str, output: str, origin: str | None) -> None: + """Transport-decode a package.""" + _run_codec(_bpak.transport_decode, filename, output, origin) diff --git a/python/bpak/_helpers.py b/python/bpak/_helpers.py new file mode 100644 index 0000000..9114b0f --- /dev/null +++ b/python/bpak/_helpers.py @@ -0,0 +1,61 @@ +"""Helper utilities for the bpak CLI.""" + +from __future__ import annotations + +import struct +import uuid + +from bpak import ( + HASH_SHA256, + HASH_SHA384, + HASH_SHA512, + SIGN_RSA4096, + SIGN_PRIME256v1, + SIGN_SECP384r1, + SIGN_SECP521r1, +) +from bpak import ( + id as bpak_id, +) + +HASH_KIND_MAP: dict[str, int] = { + "sha256": HASH_SHA256, + "sha384": HASH_SHA384, + "sha512": HASH_SHA512, +} + +SIGN_KIND_MAP: dict[str, int] = { + "prime256v1": SIGN_PRIME256v1, + "secp384r1": SIGN_SECP384r1, + "secp521r1": SIGN_SECP521r1, + "rsa4096": SIGN_RSA4096, +} + + +def resolve_id(arg: str) -> int: + """Convert a name string or numeric literal to a bpak_id_t. + + Accepts: + - ``0x...`` / ``0X...`` hex literals (any width). + - Bare decimal literals (``0``, ``42``, ...). Note: this means a + part or metadata named exactly ``"123"`` must be written as + ``0x...`` to disambiguate from the decimal 123. + - Any other string, which is CRC32-hashed via ``bpak_id()``. + """ + if arg.startswith(("0x", "0X")): + return int(arg, 16) + if arg.isdigit(): + return int(arg) + return bpak_id(arg) + + +def encode_meta_value(value: str, encoder: str) -> bytes: + """Encode a metadata value using the specified encoder.""" + if encoder == "integer": + return struct.pack(" #include #include +#include #include "python_wrapper.h" @@ -173,7 +174,7 @@ static PyObject *package_sign(PyObject *self, PyObject *args, PyObject *kwds) static PyObject *package_add_file(PyObject *self, PyObject *args, PyObject *kwds) { - static char *kwlist[] = {"part_name", "filename", "with_merkle_tree", NULL}; + static char *kwlist[] = {"part_name", "filename", "with_merkle_tree", "flags", NULL}; BPAKPackage *package = (BPAKPackage *)self; struct bpak_header *h = bpak_pkg_header(&package->pkg); struct bpak_part_header *part; @@ -181,15 +182,17 @@ static PyObject *package_add_file(PyObject *self, PyObject *args, PyObject *kwds const char *part_name = NULL; PyObject *filename; int with_merkle_tree = 0; + unsigned int flags = 0; rc = PyArg_ParseTupleAndKeywords(args, kwds, - "sO&|p:add_file", + "sO&|pI:add_file", kwlist, &part_name, &PyUnicode_FSDecoder, &filename, - &with_merkle_tree); + &with_merkle_tree, + &flags); if (!rc) { return NULL; @@ -202,10 +205,10 @@ static PyObject *package_add_file(PyObject *self, PyObject *args, PyObject *kwds if (with_merkle_tree) { rc = bpak_pkg_add_file_with_merkle_tree(&package->pkg, - PyBytes_AsString(filename_ascii), part_name, 0); + PyBytes_AsString(filename_ascii), part_name, (uint8_t)flags); } else { rc = bpak_pkg_add_file(&package->pkg, PyBytes_AsString(filename_ascii), - part_name, 0); + part_name, (uint8_t)flags); } Py_DECREF(filename_ascii); @@ -226,21 +229,23 @@ static PyObject *package_add_file(PyObject *self, PyObject *args, PyObject *kwds static PyObject *package_add_key(PyObject *self, PyObject *args, PyObject *kwds) { - static char *kwlist[] = {"part_name", "filename", NULL}; + static char *kwlist[] = {"part_name", "filename", "flags", NULL}; BPAKPackage *package = (BPAKPackage *)self; struct bpak_header *h = bpak_pkg_header(&package->pkg); struct bpak_part_header *part; int rc; const char *part_name = NULL; PyObject *filename; + unsigned int flags = 0; rc = PyArg_ParseTupleAndKeywords(args, kwds, - "sO&:add_key", + "sO&|I:add_key", kwlist, &part_name, &PyUnicode_FSDecoder, - &filename); + &filename, + &flags); if (!rc) { return NULL; @@ -251,8 +256,8 @@ static PyObject *package_add_key(PyObject *self, PyObject *args, PyObject *kwds) return NULL; } - - rc = bpak_pkg_add_key(&package->pkg, PyBytes_AsString(filename_ascii), part_name, 0); + rc = bpak_pkg_add_key(&package->pkg, PyBytes_AsString(filename_ascii), + part_name, (uint8_t)flags); Py_DECREF(filename_ascii); @@ -331,7 +336,20 @@ static PyObject *package_add_meta(PyObject *self, PyObject *args, PyObject *kwds } } - rc = bpak_add_meta(h, meta_id, part_ref, size, &meta); + if (size > BPAK_METADATA_BYTES) { + PyErr_Format(PyExc_ValueError, + "metadata size %zd exceeds maximum %d bytes", + size, BPAK_METADATA_BYTES); + goto err_out; + } + + if (size > UINT16_MAX) { + PyErr_Format(PyExc_ValueError, + "metadata size %zd exceeds uint16 maximum", size); + goto err_out; + } + + rc = bpak_add_meta(h, meta_id, part_ref, (uint16_t)size, &meta); if (rc != BPAK_OK) { PyErr_Format(BPAKPackageError, "failed to add metadata: %s", bpak_error_string(rc)); @@ -370,6 +388,135 @@ static PyObject *package_add_meta(PyObject *self, PyObject *args, PyObject *kwds return NULL; } +static PyObject *package_extract_file(PyObject *self, PyObject *args, + PyObject *kwds) +{ + static char *kwlist[] = {"part_id", "filename", NULL}; + BPAKPackage *package = (BPAKPackage *)self; + int rc; + bpak_id_t part_id; + PyObject *filename; + + rc = PyArg_ParseTupleAndKeywords(args, kwds, "IO&:extract_file", kwlist, + &part_id, &PyUnicode_FSDecoder, &filename); + if (!rc) { + return NULL; + } + + PyObject *filename_ascii = PyUnicode_AsASCIIString(filename); + if (!filename_ascii) { + return NULL; + } + + rc = bpak_pkg_extract_file(&package->pkg, part_id, + PyBytes_AsString(filename_ascii)); + + Py_DECREF(filename_ascii); + + if (rc != BPAK_OK) { + return PyErr_Format(BPAKPackageError, "failed to extract file: %s", + bpak_error_string(rc)); + } + + Py_RETURN_NONE; +} + +static PyObject *package_delete_all_parts(PyObject *self, PyObject *args, + PyObject *kwds) +{ + static char *kwlist[] = {"keep_meta", NULL}; + BPAKPackage *package = (BPAKPackage *)self; + int rc; + int keep_meta = 0; + + rc = PyArg_ParseTupleAndKeywords(args, kwds, "|p:delete_all_parts", + kwlist, &keep_meta); + if (!rc) { + return NULL; + } + + rc = bpak_pkg_delete_all_parts(&package->pkg, !keep_meta); + if (rc != BPAK_OK) { + return PyErr_Format(BPAKPackageError, + "failed to delete all parts: %s", + bpak_error_string(rc)); + } + + Py_RETURN_NONE; +} + +static PyObject *package_part_sha256(PyObject *self, PyObject *args, + PyObject *kwds) +{ + static char *kwlist[] = {"part_id", NULL}; + BPAKPackage *package = (BPAKPackage *)self; + int rc; + bpak_id_t part_id; + uint8_t hash_buffer[32]; + + rc = PyArg_ParseTupleAndKeywords(args, kwds, "I:part_sha256", kwlist, + &part_id); + if (!rc) { + return NULL; + } + + rc = bpak_pkg_part_sha256(&package->pkg, hash_buffer, sizeof(hash_buffer), + part_id); + if (rc != BPAK_OK) { + return PyErr_Format(BPAKPackageError, + "failed to compute part sha256: %s", + bpak_error_string(rc)); + } + + return PyBytes_FromStringAndSize((char *)hash_buffer, sizeof(hash_buffer)); +} + +static PyObject *package_verify_with_keystore(PyObject *self, PyObject *args, + PyObject *kwds) +{ + static char *kwlist[] = {"keystore_path", NULL}; + BPAKPackage *package = (BPAKPackage *)self; + int rc; + PyObject *ks_filename; + struct bpak_key *key = NULL; + struct bpak_header *h = bpak_pkg_header(&package->pkg); + + rc = PyArg_ParseTupleAndKeywords(args, kwds, "O&:verify_with_keystore", + kwlist, &PyUnicode_FSDecoder, + &ks_filename); + if (!rc) { + return NULL; + } + + PyObject *filename_ascii = PyUnicode_AsASCIIString(ks_filename); + if (!filename_ascii) { + return NULL; + } + + rc = bpak_keystore_load_key_from_file(PyBytes_AsString(filename_ascii), + h->keystore_id, h->key_id, + NULL, NULL, &key); + + Py_DECREF(filename_ascii); + + if (rc != BPAK_OK) { + return PyErr_Format(BPAKPackageError, + "could not load key from keystore: %s", + bpak_error_string(rc)); + } + + rc = bpak_pkg_verify(&package->pkg, key); + + free(key); + + if (rc != BPAK_OK) { + return PyErr_Format(BPAKPackageError, "verification failed: %s", + bpak_error_string(rc)); + } + + Py_RETURN_TRUE; +} + static PyObject *package_get_part(PyObject *self, PyObject *args, PyObject *kwds) { static char *kwlist[] = {"id", NULL}; @@ -614,6 +761,13 @@ static int package_set_signature(PyObject *self, PyObject *value, return -1; } + if (signature_sz > BPAK_SIGNATURE_MAX_BYTES) { + PyErr_Format(PyExc_ValueError, + "signature size %zd exceeds maximum %d bytes", + signature_sz, BPAK_SIGNATURE_MAX_BYTES); + return -1; + } + rc = bpak_pkg_write_raw_signature(&package->pkg, signature_data, signature_sz); @@ -804,6 +958,26 @@ static PyMethodDef package_methods[] = { METH_VARARGS | METH_KEYWORDS, "Get a reference to a specific metadata object"}, + {"extract_file", + (PyCFunction)(void (*)(void))package_extract_file, + METH_VARARGS | METH_KEYWORDS, + "Extract a part to a file"}, + + {"delete_all_parts", + (PyCFunction)(void (*)(void))package_delete_all_parts, + METH_VARARGS | METH_KEYWORDS, + "Delete all parts from the package"}, + + {"part_sha256", + (PyCFunction)(void (*)(void))package_part_sha256, + METH_VARARGS | METH_KEYWORDS, + "Compute SHA256 hash of a part"}, + + {"verify_with_keystore", + (PyCFunction)(void (*)(void))package_verify_with_keystore, + METH_VARARGS | METH_KEYWORDS, + "Verify package using a keystore bpak file"}, + /* For context manager use */ {"__enter__", (PyCFunction)(void (*)(void))package_enter, diff --git a/python/part.c b/python/part.c index 0a30d97..f51afcd 100644 --- a/python/part.c +++ b/python/part.c @@ -191,12 +191,70 @@ static PyObject *part_read_data(PyObject *self, PyObject *Py_UNUSED(ignored)) return bytes; } -static PyObject *part_delete(PyObject *self, PyObject *Py_UNUSED(ignored)) +static PyObject *part_get_flags(PyObject *self, void *closure) +{ + (void)closure; + BPAKPart *part = (BPAKPart *)self; + struct bpak_header *h = bpak_pkg_header(&part->package->pkg); + struct bpak_part_header *p; + int rc = 0; + + rc = bpak_get_part(h, part->part_id, &p); + if (rc != BPAK_OK) { + return PyErr_Format(PyExc_KeyError, "failed to get partition: %s", + bpak_error_string(rc)); + } + + return PyLong_FromLong(p->flags); +} + +static PyObject *part_get_transport_size(PyObject *self, void *closure) +{ + (void)closure; + BPAKPart *part = (BPAKPart *)self; + struct bpak_header *h = bpak_pkg_header(&part->package->pkg); + struct bpak_part_header *p; + int rc = 0; + + rc = bpak_get_part(h, part->part_id, &p); + if (rc != BPAK_OK) { + return PyErr_Format(PyExc_KeyError, "failed to get partition: %s", + bpak_error_string(rc)); + } + + return PyLong_FromUnsignedLongLong(p->transport_size); +} + +static PyObject *part_get_pad_bytes(PyObject *self, void *closure) +{ + (void)closure; + BPAKPart *part = (BPAKPart *)self; + struct bpak_header *h = bpak_pkg_header(&part->package->pkg); + struct bpak_part_header *p; + int rc = 0; + + rc = bpak_get_part(h, part->part_id, &p); + if (rc != BPAK_OK) { + return PyErr_Format(PyExc_KeyError, "failed to get partition: %s", + bpak_error_string(rc)); + } + + return PyLong_FromLong(p->pad_bytes); +} + +static PyObject *part_delete(PyObject *self, PyObject *args, PyObject *kwds) { BPAKPart *part = (BPAKPart *)self; int rc; + int keep_meta = 0; + static char *kwlist[] = {"keep_meta", NULL}; + + if (!PyArg_ParseTupleAndKeywords(args, kwds, "|p:delete", kwlist, + &keep_meta)) { + return NULL; + } - rc = bpak_pkg_delete_part(&part->package->pkg, part->part_id, true); + rc = bpak_pkg_delete_part(&part->package->pkg, part->part_id, !keep_meta); if (rc != BPAK_OK) { return PyErr_Format(BPAKPackageError, "failed to delete part: %s", bpak_error_string(rc)); @@ -213,8 +271,8 @@ static PyMethodDef part_methods[] = { {"delete", (PyCFunction)(void (*)(void))part_delete, - METH_NOARGS, - "Delete part from package"}, + METH_VARARGS | METH_KEYWORDS, + "Delete part from package. Optional keep_meta=True to preserve metadata."}, {NULL} }; @@ -244,6 +302,24 @@ static PyGetSetDef part_getset[] = { "If part is transport encoded", NULL}, + {"flags", + (getter)part_get_flags, + (setter)NULL, + "Part flags byte", + NULL}, + + {"transport_size", + (getter)part_get_transport_size, + (setter)NULL, + "Part transport size in bytes", + NULL}, + + {"pad_bytes", + (getter)part_get_pad_bytes, + (setter)NULL, + "Part padding bytes", + NULL}, + {NULL} }; diff --git a/python/python_wrapper.c b/python/python_wrapper.c index 161b48e..f3013b0 100644 --- a/python/python_wrapper.c +++ b/python/python_wrapper.c @@ -7,6 +7,8 @@ #include #include #include +#include +#include #include "python_wrapper.h" @@ -132,6 +134,167 @@ static PyObject *m_transport_decode(PyObject *self, PyObject *args, } +static PyObject *m_hash_kind_str(PyObject *module, PyObject *args, + PyObject *kwds) +{ + (void)module; + int rc; + static char *kwlist[] = {"kind", NULL}; + unsigned int kind; + + rc = PyArg_ParseTupleAndKeywords(args, kwds, "I:hash_kind_str", kwlist, + &kind); + if (!rc) { + return NULL; + } + + const char *s = bpak_hash_kind((uint8_t)kind); + return PyUnicode_FromString(s); +} + +static PyObject *m_signature_kind_str(PyObject *module, PyObject *args, + PyObject *kwds) +{ + (void)module; + int rc; + static char *kwlist[] = {"kind", NULL}; + unsigned int kind; + + rc = PyArg_ParseTupleAndKeywords(args, kwds, "I:signature_kind_str", + kwlist, &kind); + if (!rc) { + return NULL; + } + + const char *s = bpak_signature_kind((uint8_t)kind); + return PyUnicode_FromString(s); +} + +static PyObject *m_id_to_string(PyObject *module, PyObject *args, + PyObject *kwds) +{ + (void)module; + int rc; + static char *kwlist[] = {"id", NULL}; + unsigned int id; + + rc = PyArg_ParseTupleAndKeywords(args, kwds, "I:id_to_string", kwlist, + &id); + if (!rc) { + return NULL; + } + + const char *s = bpak_id_to_string((bpak_id_t)id); + if (s == NULL) { + Py_RETURN_NONE; + } + + return PyUnicode_FromString(s); +} + +static PyObject *m_meta_to_string(PyObject *module, PyObject *args, + PyObject *kwds) +{ + (void)module; + int rc; + static char *kwlist[] = {"package", "meta", NULL}; + BPAKPackage *package = NULL; + BPAKMeta *meta_obj = NULL; + + rc = PyArg_ParseTupleAndKeywords(args, kwds, "O!O!:meta_to_string", + kwlist, + &BPAKPackageType, &package, + &BPAKMetaType, &meta_obj); + if (!rc) { + return NULL; + } + + struct bpak_header *h = bpak_pkg_header(&package->pkg); + struct bpak_meta_header *m; + + rc = bpak_get_meta(h, meta_obj->meta_id, meta_obj->part_ref, &m); + if (rc != BPAK_OK) { + return PyErr_Format(PyExc_KeyError, "failed to get meta: %s", + bpak_error_string(rc)); + } + + char buf[256]; + rc = bpak_meta_to_string(h, m, buf, sizeof(buf)); + if (rc != BPAK_OK) { + Py_RETURN_NONE; + } + + return PyUnicode_FromString(buf); +} + +static PyObject *m_add_transport_meta(PyObject *module, PyObject *args, + PyObject *kwds) +{ + (void)module; + int rc; + static char *kwlist[] = {"package", "part_id", "encoder_id", "decoder_id", + NULL}; + BPAKPackage *package = NULL; + unsigned int part_id; + unsigned int encoder_id; + unsigned int decoder_id; + + rc = PyArg_ParseTupleAndKeywords(args, kwds, "O!III:add_transport_meta", + kwlist, + &BPAKPackageType, &package, + &part_id, &encoder_id, &decoder_id); + if (!rc) { + return NULL; + } + + struct bpak_header *h = bpak_pkg_header(&package->pkg); + + rc = bpak_add_transport_meta(h, (bpak_id_t)part_id, + (uint32_t)encoder_id, (uint32_t)decoder_id); + if (rc != BPAK_OK) { + return PyErr_Format(BPAKPackageError, + "failed to add transport metadata: %s", + bpak_error_string(rc)); + } + + rc = package_write_header(package, false); + if (rc != BPAK_OK) { + return PyErr_Format(PyExc_IOError, "could not write header: %s", + bpak_error_string(rc)); + } + + Py_RETURN_NONE; +} + +static PyObject *m_parse_public_key(PyObject *module, PyObject *args, + PyObject *kwds) +{ + (void)module; + int rc; + static char *kwlist[] = {"data", NULL}; + const uint8_t *buffer; + Py_ssize_t length; + struct bpak_key *key = NULL; + + rc = PyArg_ParseTupleAndKeywords(args, kwds, "y#:parse_public_key", + kwlist, &buffer, &length); + if (!rc) { + return NULL; + } + + rc = bpak_crypto_parse_public_key(buffer, (size_t)length, &key); + if (rc != BPAK_OK) { + return PyErr_Format(BPAKPackageError, "failed to parse public key: %s", + bpak_error_string(rc)); + } + + PyObject *result = Py_BuildValue("(iy#)", key->kind, key->data, key->size); + + free(key); + + return result; +} + static PyMethodDef module_methods[] = { {"id", (PyCFunction)(void (*)(void))m_generate_id, @@ -157,19 +320,55 @@ static PyMethodDef module_methods[] = { "Transport decode from input to output, using origin for diff origin" }, + {"hash_kind_str", + (PyCFunction)(void (*)(void))m_hash_kind_str, + METH_VARARGS | METH_KEYWORDS, + "Convert hash kind constant to string" + }, + + {"signature_kind_str", + (PyCFunction)(void (*)(void))m_signature_kind_str, + METH_VARARGS | METH_KEYWORDS, + "Convert signature kind constant to string" + }, + + {"id_to_string", + (PyCFunction)(void (*)(void))m_id_to_string, + METH_VARARGS | METH_KEYWORDS, + "Convert bpak id to known string name, or None" + }, + + {"meta_to_string", + (PyCFunction)(void (*)(void))m_meta_to_string, + METH_VARARGS | METH_KEYWORDS, + "Convert metadata to string representation" + }, + + {"add_transport_meta", + (PyCFunction)(void (*)(void))m_add_transport_meta, + METH_VARARGS | METH_KEYWORDS, + "Add transport metadata to a package" + }, + + {"parse_public_key", + (PyCFunction)(void (*)(void))m_parse_public_key, + METH_VARARGS | METH_KEYWORDS, + "Parse a public key from bytes, returns (kind, data)" + }, + {NULL} }; static PyModuleDef module = { PyModuleDef_HEAD_INIT, - .m_name = "bpak", + .m_name = "bpak._bpak", .m_doc = NULL, .m_size = -1, .m_methods = module_methods }; -PyMODINIT_FUNC PyInit_bpak(void) +PyMODINIT_FUNC PyInit__bpak(void) { PyObject *m_p; @@ -225,5 +424,15 @@ PyMODINIT_FUNC PyInit_bpak(void) PyModule_AddIntConstant(m_p, "SIGN_PRIME256v1", BPAK_SIGN_PRIME256v1); PyModule_AddIntConstant(m_p, "SIGN_SECP384r1", BPAK_SIGN_SECP384r1); PyModule_AddIntConstant(m_p, "SIGN_SECP521r1", BPAK_SIGN_SECP521r1); + + PyModule_AddIntConstant(m_p, "KEY_PUB_RSA4096", BPAK_KEY_PUB_RSA4096); + PyModule_AddIntConstant(m_p, "KEY_PUB_PRIME256v1", BPAK_KEY_PUB_PRIME256v1); + PyModule_AddIntConstant(m_p, "KEY_PUB_SECP384r1", BPAK_KEY_PUB_SECP384r1); + PyModule_AddIntConstant(m_p, "KEY_PUB_SECP521r1", BPAK_KEY_PUB_SECP521r1); + + PyModule_AddIntConstant(m_p, "FLAG_EXCLUDE_FROM_HASH", + BPAK_FLAG_EXCLUDE_FROM_HASH); + PyModule_AddIntConstant(m_p, "FLAG_TRANSPORT", BPAK_FLAG_TRANSPORT); + return (m_p); } diff --git a/python/setup.py b/python/setup.py deleted file mode 100644 index f939a59..0000000 --- a/python/setup.py +++ /dev/null @@ -1,31 +0,0 @@ -#!/usr/bin/env python3 - -from setuptools import setup -from setuptools import Extension -import re - -def read_version(): - with open("../include/bpak/version.h", "r") as f: - return re.search(r"BPAK_VERSION_STRING \"(.*)\"$", - f.read(), - re.MULTILINE).group(1) - -setup(name='bpak', - version=read_version(), - description="BPAK python wrapper", - author="Jonas Blixt", - author_email="jonpe960@gmail.com", - license="BSD", - url="https://github.com/jonasblixt/bpak", - ext_modules=[ - Extension(name="bpak", - sources=[ - "meta.c", - "part.c", - "package.c", - "python_wrapper.c" - ], - libraries=["bpak"], - ) - ], -) diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..981d60a --- /dev/null +++ b/setup.py @@ -0,0 +1,85 @@ +"""BPAK setuptools configuration.""" + +import re +from pathlib import Path + +from setuptools import Extension, setup + + +def read_version(): + text = Path("include/bpak/version.h").read_text() + return re.search(r'BPAK_VERSION_STRING "(.+)"', text).group(1) + + +# NOTE: Keep in sync with lib/CMakeLists.txt (authoritative source list) +_lib_sources = [ + "lib/bpak.c", + "lib/bpakcrc.c", + "lib/id.c", + "lib/keystore.c", + "lib/mem.c", + "lib/utils.c", + "lib/bsdiff.c", + "lib/bspatch.c", + "lib/merkle.c", + "lib/pkg.c", + "lib/pkg_create.c", + "lib/pkg_sign.c", + "lib/pkg_verify.c", + "lib/sais.c", + "lib/transport_decode.c", + "lib/transport_encode.c", + "lib/verify.c", + "lib/heatshrink/heatshrink_decoder.c", + "lib/heatshrink/heatshrink_encoder.c", + "lib/crypto.c", + "lib/mbedtls_wrapper.c", + "lib/keystore_load_from_file.c", + "ext/uuid/unpack.c", + "ext/uuid/unparse.c", +] + +_wrapper_sources = [ + "python/python_wrapper.c", + "python/package.c", + "python/meta.c", + "python/part.c", +] + +setup( + name="bpak", + version=read_version(), + description="BPAK - Bit Packer", + long_description=Path("README.rst").read_text(), + long_description_content_type="text/x-rst", + author="Jonas Blixt", + author_email="jonpe960@gmail.com", + license="BSD", + url="https://github.com/jonasblixt/bpak", + classifiers=[ + "Development Status :: 4 - Beta", + "Intended Audience :: Developers", + "Topic :: Software Development :: Embedded Systems", + "License :: OSI Approved :: BSD License", + "Programming Language :: Python :: 3", + ], + packages=["bpak", "bpak._cli"], + package_dir={ + "bpak": "python/bpak", + "bpak._cli": "python/bpak/_cli", + }, + ext_modules=[ + Extension( + name="bpak._bpak", + sources=_wrapper_sources + _lib_sources, + include_dirs=["include", "ext/uuid", "python"], + define_macros=[("BPAK_HAVE_USER_SETTINGS", "1")], + libraries=["mbedcrypto", "lzma"], + extra_compile_args=["-fvisibility=hidden"], + ) + ], + install_requires=["click>=8.0"], + entry_points={ + "console_scripts": ["bpak=bpak._cli:cli"], + }, +) diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 5973a19..db3fd65 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -71,9 +71,10 @@ set(TEST_SCRIPT_PYTHON test_python_create_package.py test_python_meta.py test_python_transport.py + test_python_cli.py ) -if (BPAK_BUILD_PYTHON_WRAPPER) +if (BPAK_BUILD_TESTS) foreach(test_script IN LISTS TEST_SCRIPT_PYTHON) add_test(NAME ${test_script} COMMAND "${CMAKE_CURRENT_LIST_DIR}/${test_script}" "${CMAKE_SOURCE_DIR}" diff --git a/test/test_python_cli.py b/test/test_python_cli.py new file mode 100755 index 0000000..1764070 --- /dev/null +++ b/test/test_python_cli.py @@ -0,0 +1,382 @@ +#!/usr/bin/env python3 +"""Argv-level tests for the bpak Python CLI (``bpak._cli``). + +Run from ``${CMAKE_BINARY_DIR}/test`` with the source tree passed as argv[1] +(same convention as the other test_python_*.py scripts).""" + +from __future__ import annotations + +import os +import subprocess +import sys +from pathlib import Path + +sys.path.insert(0, "../python/") + +from click.testing import CliRunner # noqa: E402 + +from bpak._cli import cli # noqa: E402 + + +# --- small helpers --- + + +def run(*args: str, input_: str | None = None): + runner = CliRunner() + return runner.invoke(cli, list(args), input=input_, catch_exceptions=False) + + +def ok(result, *, exit_code: int = 0) -> None: + if result.exit_code != exit_code: + raise AssertionError( + f"expected exit {exit_code}, got {result.exit_code}\n" + f"output:\n{result.output}" + ) + + +def fail(result, expected_fragment: str) -> None: + assert result.exit_code != 0, f"expected failure, got 0\n{result.output}" + if expected_fragment not in result.output: + raise AssertionError( + f"expected {expected_fragment!r} in output; got:\n{result.output}" + ) + + +def _make_keypair(tmp: Path) -> tuple[Path, Path]: + priv = tmp / "priv.pem" + pub = tmp / "pub.pem" + subprocess.run( + ["openssl", "ecparam", "-name", "prime256v1", "-genkey", "-noout", + "-out", str(priv)], + check=True, stderr=subprocess.DEVNULL, + ) + subprocess.run( + ["openssl", "ec", "-in", str(priv), "-pubout", "-out", str(pub)], + check=True, stderr=subprocess.DEVNULL, + ) + return priv, pub + + +# --- tests --- + + +def test_help_lists_all_top_level_commands(): + result = run("--help") + ok(result) + for cmd in ("create", "add", "show", "set", "delete", "extract", + "sign", "verify", "compare", "transport", "generate"): + assert cmd in result.output, f"missing top-level command: {cmd}" + + +def test_generate_id(): + result = run("generate", "id", "hello") + ok(result) + assert "0xf032519b" in result.output + + +def test_generate_keystore_rejects_bad_identifier(tmp_path: Path): + pkg = tmp_path / "ks.bpak" + ok(run("create", str(pkg), "--force")) + result = run("generate", "keystore", str(pkg), "--name", "bad name!") + fail(result, "not a valid C identifier") + + +def test_create_and_show_summary(tmp_path: Path): + pkg = tmp_path / "a.bpak" + ok(run("create", str(pkg), "--force")) + + # Bare `bpak show FILE` triggers the overview via the custom resolver. + result = run("show", str(pkg)) + ok(result) + assert "Hash:" in result.output + assert "Signature:" in result.output + assert "Payload hash" not in result.output # the old bogus label is gone + + # The explicit `summary` subcommand is hidden but still dispatchable. + result = run("show", "summary", str(pkg)) + ok(result) + assert "Hash:" in result.output + + +def test_add_and_show_part(tmp_path: Path): + pkg = tmp_path / "a.bpak" + payload = tmp_path / "data.bin" + payload.write_bytes(b"hello world") + ok(run("create", str(pkg), "--force")) + ok(run("add", "part", str(pkg), "fs", "--from", str(payload))) + + result = run("show", "part", str(pkg), "fs") + ok(result) + assert "size=" in result.output + assert "faabeca7" in result.output # id("fs") + + result = run("show", "part", str(pkg), "fs", "--hash") + ok(result) + assert len(result.output.strip()) == 64 # sha256 hex + + +def test_add_meta_variants(tmp_path: Path): + pkg = tmp_path / "a.bpak" + ok(run("create", str(pkg), "--force")) + + ok(run("add", "meta", str(pkg), "version", "--from-string", "1.0.0")) + ok(run("add", "meta", str(pkg), "build-id", "--from-string", "42", + "--encoder", "integer")) + + # mutual exclusion + fail(run("add", "meta", str(pkg), "x"), + "one of --from-string, --from-file is required") + + fail(run("add", "meta", str(pkg), "x", + "--from-string", "1", "--from-file", str(pkg)), + "only one of --from-string, --from-file is allowed") + + +def test_set_meta_updates_value(tmp_path: Path): + pkg = tmp_path / "a.bpak" + ok(run("create", str(pkg), "--force")) + ok(run("add", "meta", str(pkg), "version", "--from-string", "1.0.0")) + ok(run("set", "meta", str(pkg), "version", "2.0.0-longer")) + + result = run("extract", "meta", str(pkg), "version", "-o", str(tmp_path / "v.bin")) + ok(result) + data = (tmp_path / "v.bin").read_bytes() + assert data.rstrip(b"\x00").startswith(b"2.0.0-longer") + + +def test_set_header_requires_at_least_one(tmp_path: Path): + pkg = tmp_path / "a.bpak" + ok(run("create", str(pkg), "--force")) + fail(run("set", "header", str(pkg)), + "at least one of --key-id, --keystore-id is required") + ok(run("set", "header", str(pkg), "--key-id", "0xdeadbeef")) + + +def test_set_header_accepts_zero(tmp_path: Path): + """--key-id 0 is a valid explicit value, not 'missing'.""" + pkg = tmp_path / "a.bpak" + ok(run("create", str(pkg), "--force")) + ok(run("set", "header", str(pkg), "--key-id", "0")) + ok(run("set", "header", str(pkg), "--keystore-id", "0")) + + +def test_show_meta_part_ref_filter(tmp_path: Path): + pkg = tmp_path / "a.bpak" + payload = tmp_path / "data.bin" + payload.write_bytes(b"hello") + ok(run("create", str(pkg), "--force")) + ok(run("add", "part", str(pkg), "fs", "--from", str(payload))) + # A global meta (part_ref=0) and a part-scoped one with the same id-name. + ok(run("add", "meta", str(pkg), "version", + "--from-string", "global")) + ok(run("add", "meta", str(pkg), "version", + "--from-string", "fs-scope", "--part-ref", "fs")) + + all_result = run("show", "meta", str(pkg), "version") + ok(all_result) + assert all_result.output.count("\n") >= 2, \ + f"expected both entries listed, got:\n{all_result.output}" + + fs_only = run("show", "meta", str(pkg), "version", "--part-ref", "fs") + ok(fs_only) + # part_ref=id('fs')=0xfaabeca7 is printed in the 'ref=' column. + assert "faabeca7" in fs_only.output + assert fs_only.output.count("\n") == 1, \ + f"expected 1 entry, got:\n{fs_only.output}" + + global_only = run("show", "meta", str(pkg), "version", "--part-ref", "0") + ok(global_only) + assert "faabeca7" not in global_only.output, \ + f"part-scoped entry leaked into part_ref=0 filter:\n{global_only.output}" + + +def test_delete_part_mutex(tmp_path: Path): + pkg = tmp_path / "a.bpak" + payload = tmp_path / "data.bin" + payload.write_bytes(b"hello") + ok(run("create", str(pkg), "--force")) + ok(run("add", "part", str(pkg), "fs", "--from", str(payload))) + + fail(run("delete", "part", str(pkg)), + "one of ID, --all is required") + fail(run("delete", "part", str(pkg), "fs", "--all"), + "only one of ID, --all is allowed") + + ok(run("delete", "part", str(pkg), "fs")) + + +def test_delete_meta(tmp_path: Path): + pkg = tmp_path / "a.bpak" + ok(run("create", str(pkg), "--force")) + ok(run("add", "meta", str(pkg), "version", "--from-string", "1.0.0")) + ok(run("delete", "meta", str(pkg), "version")) + + +def test_extract_part_binary_tty_refusal(tmp_path: Path): + pkg = tmp_path / "a.bpak" + payload = tmp_path / "data.bin" + payload.write_bytes(b"hello") + ok(run("create", str(pkg), "--force")) + ok(run("add", "part", str(pkg), "fs", "--from", str(payload))) + + # CliRunner's stdout isn't a real TTY, so this should succeed. + result = run("extract", "part", str(pkg), "fs") + ok(result) + + # Simulate a TTY with a subprocess using `script` so isatty() is true. + env = os.environ.copy() + env["PYTHONPATH"] = "../python/" + proc = subprocess.run( + ["script", "-qc", + f"{sys.executable} -m bpak extract part {pkg} fs", "/dev/null"], + check=False, capture_output=True, env=env, + ) + combined = proc.stdout.decode("utf-8", errors="replace") + \ + proc.stderr.decode("utf-8", errors="replace") + assert "refusing to write binary data to a terminal" in combined, combined + + +def test_compare_flags_same_size_different_content(tmp_path: Path): + p1 = tmp_path / "a.bpak" + p2 = tmp_path / "b.bpak" + a = tmp_path / "a.bin" + b = tmp_path / "b.bin" + a.write_bytes(b"aaaaa\n") + b.write_bytes(b"bbbbb\n") # same length as a.bin + + ok(run("create", str(p1), "--force")) + ok(run("add", "part", str(p1), "fs", "--from", str(a))) + + ok(run("create", str(p2), "--force")) + ok(run("add", "part", str(p2), "fs", "--from", str(b))) + + result = run("compare", str(p1), str(p2)) + ok(result) + assert "* faabeca7" in result.output, \ + f"expected content-mismatch marker, got:\n{result.output}" + + +def test_sign_verify_round_trip(tmp_path: Path): + pkg = tmp_path / "a.bpak" + payload = tmp_path / "data.bin" + payload.write_bytes(b"hello") + priv, pub = _make_keypair(tmp_path) + + ok(run("create", str(pkg), "--force")) + ok(run("add", "part", str(pkg), "fs", "--from", str(payload))) + ok(run("add", "meta", str(pkg), "version", "--from-string", "1.0.0")) + + fail(run("sign", str(pkg)), "one of --key, --signature is required") + ok(run("sign", str(pkg), "--key", str(priv))) + + fail(run("verify", str(pkg)), + "one of --key, --keystore is required") + result = run("verify", str(pkg), "--key", str(pub)) + ok(result) + assert "Verification OK" in result.output + + +def test_verify_works_on_readonly_file(tmp_path: Path): + """`bpak verify` must not need write permission on the package.""" + pkg = tmp_path / "a.bpak" + payload = tmp_path / "data.bin" + payload.write_bytes(b"hello") + priv, pub = _make_keypair(tmp_path) + + ok(run("create", str(pkg), "--force")) + ok(run("add", "part", str(pkg), "fs", "--from", str(payload))) + ok(run("sign", str(pkg), "--key", str(priv))) + + os.chmod(pkg, 0o444) + try: + result = run("verify", str(pkg), "--key", str(pub)) + ok(result) + assert "Verification OK" in result.output + finally: + os.chmod(pkg, 0o644) + + +def test_transport_round_trip(tmp_path: Path): + """create -> add part -> transport add -> encode -> decode.""" + pkg = tmp_path / "a.bpak" + origin = tmp_path / "origin.bpak" + encoded = tmp_path / "encoded.bpak" + decoded = tmp_path / "decoded.bpak" + payload_a = tmp_path / "a.img" + payload_b = tmp_path / "b.img" + payload_a.write_bytes(b"A" * 4096) + payload_b.write_bytes(b"B" * 4096) + + # Build two packages sharing the same part id, used as origin / delta. + # Transport encoding requires the bpak-package uuid so the two sides + # can be matched up. + shared_uuid = "0888b0fa-9c48-4524-9845-06a641b61edd" + for p, src in ((origin, payload_a), (pkg, payload_b)): + ok(run("create", str(p), "--force")) + ok(run("add", "part", str(p), "fs", "--from", str(src))) + ok(run("add", "meta", str(p), "bpak-package", + "--from-string", shared_uuid, "--encoder", "uuid")) + ok(run("transport", "add", str(p), "fs", + "--encoder", "bsdiff", "--decoder", "bspatch")) + + ok(run("transport", "encode", str(pkg), + "--output", str(encoded), "--origin", str(origin))) + assert encoded.exists() and encoded.stat().st_size > 0 + + ok(run("transport", "decode", str(encoded), + "--output", str(decoded), "--origin", str(origin))) + assert decoded.exists() and decoded.stat().st_size > 0 + + +def test_show_hash_binary_tty_refusal_via_subprocess(tmp_path: Path): + pkg = tmp_path / "a.bpak" + ok(run("create", str(pkg), "--force")) + env = os.environ.copy() + env["PYTHONPATH"] = "../python/" + proc = subprocess.run( + ["script", "-qc", + f"{sys.executable} -m bpak show hash {pkg} --binary", "/dev/null"], + check=False, capture_output=True, env=env, + ) + combined = proc.stdout.decode("utf-8", errors="replace") + \ + proc.stderr.decode("utf-8", errors="replace") + assert "refusing to write binary data to a terminal" in combined, combined + + +# --- ad-hoc test runner --- + +def _main() -> int: + failures = [] + import inspect + import tempfile + + module_globals = globals() + test_names = sorted( + name for name, obj in module_globals.items() + if name.startswith("test_") and inspect.isfunction(obj) + ) + + for name in test_names: + fn = module_globals[name] + sig = inspect.signature(fn) + with tempfile.TemporaryDirectory(prefix="bpak-cli-") as td: + try: + if "tmp_path" in sig.parameters: + fn(Path(td)) + else: + fn() + except Exception as exc: # noqa: BLE001 + failures.append((name, exc)) + print(f"FAIL {name}: {exc}") + else: + print(f"ok {name}") + + if failures: + print(f"\n{len(failures)} / {len(test_names)} tests failed") + return 1 + print(f"\n{len(test_names)} tests passed") + return 0 + + +if __name__ == "__main__": + sys.exit(_main())