diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index dd9fc3fd..e1874f98 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -50,7 +50,7 @@ jobs: - name: Update latest tag if: ${{ steps.release_info.outputs.tag == 'latest' }} - uses: EndBug/latest-tag@fabb56bc8d15d5937c76719060da2226f5c3ffa8 + uses: EndBug/latest-tag@fabb56bc8d15d5937c76719060da2226f5c3ffa8 with: ref: latest description: Last state in main diff --git a/CHANGELOG.rst b/CHANGELOG.rst index f71b9787..43ec044e 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -1,3 +1,11 @@ +Unreleased +========== + +* Add archive (``vcs: archive``) support for fetching dependencies from ``.tar.gz``, ``.tgz``, ``.tar.bz2``, ``.tar.xz`` and ``.zip`` files via HTTP, HTTPS or file URLs (#1058) +* Fix path-traversal check using character-based prefix comparison instead of path-component comparison (#1058) +* Fix directory hash being non-deterministic across filesystem traversal orders, causing false local-change detection (#1058) +* Fix ``dfetch freeze`` not capturing branch information for SVN projects when only the revision matched (#1058) + Release 0.12.1 (released 2026-02-24) ==================================== diff --git a/dfetch/commands/check.py b/dfetch/commands/check.py index e338fbb3..b42c59a0 100644 --- a/dfetch/commands/check.py +++ b/dfetch/commands/check.py @@ -15,6 +15,10 @@ .. scenario-include:: ../features/check-svn-repo.feature + .. tab:: Archive + + .. 
scenario-include:: ../features/check-archive.feature + Sub-manifests ~~~~~~~~~~~~~ diff --git a/dfetch/commands/format_patch.py b/dfetch/commands/format_patch.py index a0358668..ad092624 100644 --- a/dfetch/commands/format_patch.py +++ b/dfetch/commands/format_patch.py @@ -37,7 +37,11 @@ from dfetch.project.gitsubproject import GitSubProject from dfetch.project.subproject import SubProject from dfetch.project.svnsubproject import SvnSubProject -from dfetch.util.util import catch_runtime_exceptions, in_directory +from dfetch.util.util import ( + catch_runtime_exceptions, + check_no_path_traversal, + in_directory, +) from dfetch.vcs.patch import Patch, PatchAuthor, PatchInfo, PatchType logger = get_logger(__name__) @@ -80,11 +84,7 @@ def __call__(self, args: argparse.Namespace) -> None: output_dir_path = pathlib.Path(args.output_directory).resolve() - if not output_dir_path.is_relative_to(superproject.root_directory): - raise RuntimeError( - f"Output directory '{output_dir_path}' must be inside" - f" the superproject root '{superproject.root_directory}'" - ) + check_no_path_traversal(output_dir_path, superproject.root_directory) output_dir_path.mkdir(parents=True, exist_ok=True) diff --git a/dfetch/commands/freeze.py b/dfetch/commands/freeze.py index c3e38137..d057c488 100644 --- a/dfetch/commands/freeze.py +++ b/dfetch/commands/freeze.py @@ -36,6 +36,14 @@ .. scenario-include:: ../features/freeze-projects.feature +For archive projects, ``dfetch freeze`` adds the hash under the nested +``integrity.hash`` key (e.g. ``integrity.hash: sha256:``) to pin the +exact archive content used. This value acts as the version identifier: +DFetch verifies the downloaded archive against it on every subsequent +``dfetch update``. + +.. 
scenario-include:: ../features/freeze-archive.feature + """ import argparse @@ -78,24 +86,25 @@ def __call__(self, args: argparse.Namespace) -> None: with in_directory(superproject.root_directory): for project in superproject.manifest.projects: with catch_runtime_exceptions(exceptions) as exceptions: - on_disk_version = dfetch.project.create_sub_project( - project - ).on_disk_version() - - if project.version == on_disk_version: - logger.print_info_line( - project.name, - f"Already pinned in manifest on version {project.version}", - ) - elif on_disk_version: - logger.print_info_line( - project.name, f"Freezing on version {on_disk_version}" - ) - project.version = on_disk_version + sub_project = dfetch.project.create_sub_project(project) + on_disk_version = sub_project.on_disk_version() + + new_version = sub_project.freeze_project(project) + if new_version is None: + if on_disk_version: + logger.print_info_line( + project.name, + f"Already pinned in manifest on version {on_disk_version}", + ) + else: + logger.print_warning_line( + project.name, + "No version on disk, first update with 'dfetch update'", + ) else: - logger.print_warning_line( + logger.print_info_line( project.name, - "No version on disk, first update with 'dfetch update'", + f"Frozen on version {new_version}", ) projects.append(project) diff --git a/dfetch/commands/report.py b/dfetch/commands/report.py index d54ee31c..aa3fcd67 100644 --- a/dfetch/commands/report.py +++ b/dfetch/commands/report.py @@ -14,9 +14,9 @@ from dfetch.manifest.project import ProjectEntry from dfetch.project import create_super_project from dfetch.project.metadata import Metadata -from dfetch.project.subproject import SubProject from dfetch.reporting import REPORTERS, ReportTypes from dfetch.util.license import License, guess_license_in_file +from dfetch.util.util import is_license_file logger = get_logger(__name__) @@ -89,8 +89,7 @@ def _determine_licenses(project: ProjectEntry) -> list[License]: license_files = [] with 
dfetch.util.util.in_directory(project.destination): - - for license_file in filter(SubProject.is_license_file, glob.glob("*")): + for license_file in filter(is_license_file, glob.glob("*")): logger.debug(f"Found license file {license_file} for {project.name}") guessed_license = guess_license_in_file(license_file) @@ -107,10 +106,24 @@ def _determine_licenses(project: ProjectEntry) -> list[License]: @staticmethod def _determine_version(project: ProjectEntry) -> str: - """Determine the fetched version.""" + """Determine the fetched version. + + For archive projects the sha256 hash (``sha256:``) stored in the + metadata *revision* field is used as the version identifier. When no + metadata is present yet, the ``integrity.hash`` field from the manifest + is used as fallback so the SBOM can still be generated before the first + fetch. + """ try: metadata = Metadata.from_file(Metadata.from_project_entry(project).path) - version = metadata.tag or metadata.revision or "" + version = ( + metadata.tag + or metadata.revision + or project.tag + or project.revision + or project.hash + or "" + ) except FileNotFoundError: - version = project.tag or project.revision or "" + version = project.tag or project.revision or project.hash or "" return version diff --git a/dfetch/commands/update.py b/dfetch/commands/update.py index ab1ddd35..6e44ca35 100644 --- a/dfetch/commands/update.py +++ b/dfetch/commands/update.py @@ -15,6 +15,10 @@ .. scenario-include:: ../features/fetch-svn-repo.feature + .. tab:: Archive + + .. 
scenario-include:: ../features/fetch-archive.feature + Sub-manifests ~~~~~~~~~~~~~~~ @@ -37,7 +41,11 @@ from dfetch.commands.common import check_sub_manifests from dfetch.log import get_logger from dfetch.project import create_super_project -from dfetch.util.util import catch_runtime_exceptions, in_directory +from dfetch.util.util import ( + catch_runtime_exceptions, + check_no_path_traversal, + in_directory, +) logger = get_logger(__name__) @@ -85,9 +93,14 @@ def __call__(self, args: argparse.Namespace) -> None: for project in superproject.manifest.selected_projects(args.projects): with catch_runtime_exceptions(exceptions) as exceptions: self._check_destination(project, destinations) + destination = project.destination + + def _ignored(dst: str = destination) -> list[str]: + return list(superproject.ignored_files(dst)) + dfetch.project.create_sub_project(project).update( force=args.force, - files_to_ignore=superproject.ignored_files(project.destination), + ignored_files_callback=_ignored, ) if not args.no_recommendations and os.path.isdir( @@ -117,8 +130,9 @@ def _check_path_traversal( project: dfetch.manifest.project.ProjectEntry, real_path: str, safe_dir: str ) -> None: """Check if destination is outside the directory tree.""" - if os.path.commonprefix((real_path, safe_dir)) != safe_dir: - # See https://owasp.org/www-community/attacks/Path_Traversal + try: + check_no_path_traversal(real_path, safe_dir) + except RuntimeError: logger.print_warning_line( project.name, f'Skipping, path "{project.destination}" is outside manifest directory tree.', @@ -126,7 +140,7 @@ def _check_path_traversal( raise RuntimeError( "Destination must be in the manifests folder or a subfolder. " f'"{project.destination}" is outside this tree!' 
- ) + ) from None @staticmethod def _check_dst_not_in_blacklist( diff --git a/dfetch/commands/update_patch.py b/dfetch/commands/update_patch.py index efbb4710..c0117180 100644 --- a/dfetch/commands/update_patch.py +++ b/dfetch/commands/update_patch.py @@ -41,7 +41,11 @@ from dfetch.project.gitsuperproject import GitSuperProject from dfetch.project.metadata import Metadata from dfetch.project.superproject import NoVcsSuperProject, RevisionRange -from dfetch.util.util import catch_runtime_exceptions, in_directory +from dfetch.util.util import ( + catch_runtime_exceptions, + check_no_path_traversal, + in_directory, +) logger = get_logger(__name__) @@ -86,8 +90,10 @@ def __call__(self, args: argparse.Namespace) -> None: for project in superproject.manifest.selected_projects(args.projects): with catch_runtime_exceptions(exceptions) as exceptions: subproject = dfetch.project.create_sub_project(project) + destination = project.destination - files_to_ignore = superproject.ignored_files(project.destination) + def _ignored(dst: str = destination) -> list[str]: + return list(superproject.ignored_files(dst)) # Check if the project has a patch, maybe suggest creating one? 
if not subproject.patch: @@ -118,7 +124,7 @@ def __call__(self, args: argparse.Namespace) -> None: # force update to fetched version from metadata without applying patch subproject.update( force=True, - files_to_ignore=files_to_ignore, + ignored_files_callback=_ignored, patch_count=len(subproject.patch) - 1, ) @@ -141,7 +147,7 @@ def __call__(self, args: argparse.Namespace) -> None: # force update again to fetched version from metadata but with applying patch subproject.update( - force=True, files_to_ignore=files_to_ignore, patch_count=-1 + force=True, ignored_files_callback=_ignored, patch_count=-1 ) if exceptions: @@ -158,8 +164,8 @@ def _update_patch( patch_path = pathlib.Path(patch_to_update).resolve() try: - patch_path.relative_to(root) - except ValueError: + check_no_path_traversal(patch_path, root) + except RuntimeError: logger.print_warning_line( project_name, f'No updating patch "{patch_to_update}" which is outside {root}', diff --git a/dfetch/log.py b/dfetch/log.py index 52476ffa..0025fe25 100644 --- a/dfetch/log.py +++ b/dfetch/log.py @@ -3,12 +3,14 @@ import logging import os import sys +import types from contextlib import nullcontext from typing import Any, cast from rich.console import Console from rich.highlighter import NullHighlighter from rich.logging import RichHandler +from rich.markup import escape as markup_escape from rich.status import Status from dfetch import __version__ @@ -52,26 +54,30 @@ class DLogger(logging.Logger): def print_report_line(self, name: str, info: str) -> None: """Print a line for a report.""" + safe_name = markup_escape(name) + safe_info = markup_escape(info) self.info( - f" [bold][bright_green]{name:20s}:[/bright_green][blue] {info}[/blue][/bold]" + f" [bold][bright_green]{safe_name:20s}:[/bright_green][blue] {safe_info}[/blue][/bold]" ) def print_info_line(self, name: str, info: str) -> None: """Print a line of info, only printing the project name once.""" if name not in DLogger._printed_projects: - self.info(f" 
[bold][bright_green]{name}:[/bright_green][/bold]") + safe_name = markup_escape(name) + self.info(f" [bold][bright_green]{safe_name}:[/bright_green][/bold]") DLogger._printed_projects.add(name) - line = info.replace("\n", "\n ") + line = markup_escape(info).replace("\n", "\n ") self.info(f" [bold blue]> {line}[/bold blue]") def print_warning_line(self, name: str, info: str) -> None: """Print a warning line: green name, yellow value.""" if name not in DLogger._printed_projects: - self.info(f" [bold][bright_green]{name}:[/bright_green][/bold]") + safe_name = markup_escape(name) + self.info(f" [bold][bright_green]{safe_name}:[/bright_green][/bold]") DLogger._printed_projects.add(name) - line = info.replace("\n", "\n ") + line = markup_escape(info).replace("\n", "\n ") self.info(f" [bold bright_yellow]> {line}[/bold bright_yellow]") def print_title(self) -> None: @@ -85,12 +91,14 @@ def print_info_field(self, field_name: str, field: str) -> None: def warning(self, msg: object, *args: Any, **kwargs: Any) -> None: """Log warning.""" super().warning( - f"[bold bright_yellow]{msg}[/bold bright_yellow]", *args, **kwargs + f"[bold bright_yellow]{markup_escape(str(msg))}[/bold bright_yellow]", + *args, + **kwargs, ) def error(self, msg: object, *args: Any, **kwargs: Any) -> None: """Log error.""" - super().error(f"[red]{msg}[/red]", *args, **kwargs) + super().error(f"[red]{markup_escape(str(msg))}[/red]", *args, **kwargs) def status( self, name: str, message: str, spinner: str = "dots", enabled: bool = True @@ -111,11 +119,12 @@ def status( return nullcontext(None) if name not in DLogger._printed_projects: - self.info(f" [bold][bright_green]{name}:[/bright_green][/bold]") + safe_name = markup_escape(name) + self.info(f" [bold][bright_green]{safe_name}:[/bright_green][/bold]") DLogger._printed_projects.add(name) return Status( - f"[bold bright_blue]> {message}[/bold bright_blue]", + f"[bold bright_blue]> {markup_escape(message)}[/bold bright_blue]", spinner=spinner, 
console=rich_console, ) @@ -138,8 +147,9 @@ def filter(self, record: logging.LogRecord) -> bool: """Add indentation to the log record message.""" color = "blue" if record.levelno < logging.WARNING else "yellow" - line = record.msg.replace("\n", "\n ") + line = markup_escape(record.getMessage()).replace("\n", "\n ") record.msg = f"{self.prefix}[{color}]{line}[/{color}]" + record.args = () return True @@ -186,7 +196,22 @@ def get_logger(name: str, console: Console | None = None) -> DLogger: def configure_external_logger(name: str, level: int = logging.INFO) -> None: """Configure an external logger from a third party package.""" logger = logging.getLogger(name) + # Ensure the external logger is a plain Logger so its log methods do not + # wrap messages in Rich markup (which DLogger.warning / DLogger.error do). + # Without this, markup_escape in ExtLogFilter would turn those Rich tags + # into literal text that shifts tab-stop calculations when rendered. + logger.__class__ = logging.Logger logger.setLevel(level) logger.propagate = True logger.handlers.clear() logger.addFilter(ExtLogFilter()) + # Some packages (e.g. patch_ng) cache logger bound-methods as module-level + # names at import time (e.g. `warning = logger.warning`). After the + # __class__ reassignment above those cached references still point at the + # old DLogger method, so re-bind them to the freshly demoted logger. 
+ module = sys.modules.get(name.split(".")[0]) + if module is not None: + for method_name in ("debug", "info", "warning", "error", "critical"): + attr = getattr(module, method_name, None) + if isinstance(attr, types.MethodType) and attr.__self__ is logger: + setattr(module, method_name, getattr(logger, method_name)) diff --git a/dfetch/manifest/manifest.py b/dfetch/manifest/manifest.py index 0f30e983..ec084c28 100644 --- a/dfetch/manifest/manifest.py +++ b/dfetch/manifest/manifest.py @@ -97,7 +97,9 @@ class ManifestDict(TypedDict, total=True): # pylint: disable=too-many-ancestors version: int | str remotes: NotRequired[Sequence[RemoteDict | Remote]] - projects: Sequence[ProjectEntryDict | ProjectEntry | dict[str, str | list[str]]] + projects: Sequence[ + ProjectEntryDict | ProjectEntry | dict[str, str | list[str] | dict[str, str]] + ] class Manifest: @@ -140,14 +142,16 @@ def __init__( def _init_projects( self, projects: Sequence[ - ProjectEntryDict | ProjectEntry | dict[str, str | list[str]] + ProjectEntryDict + | ProjectEntry + | dict[str, str | list[str] | dict[str, str]] ], ) -> dict[str, ProjectEntry]: """Iterate over projects from manifest and initialize ProjectEntries from it. 
Args: projects (Sequence[ - Union[ProjectEntryDict, ProjectEntry, Dict[str, Union[str, list[str]]]] + Union[ProjectEntryDict, ProjectEntry, Dict[str, Union[str, list[str], dict[str, str]]]] ]): Iterable with projects Raises: @@ -304,9 +308,11 @@ def _as_dict(self) -> dict[str, ManifestDict]: if len(remotes) == 1: remotes[0].pop("default", None) - projects: list[dict[str, str | list[str]]] = [] + projects: list[dict[str, str | list[str] | dict[str, str]]] = [] for project in self.projects: - project_yaml: dict[str, str | list[str]] = project.as_yaml() + project_yaml: dict[str, str | list[str] | dict[str, str]] = ( + project.as_yaml() + ) if len(remotes) == 1: project_yaml.pop("remote", None) projects.append(project_yaml) diff --git a/dfetch/manifest/parse.py b/dfetch/manifest/parse.py index df29901d..1c0bb4e3 100644 --- a/dfetch/manifest/parse.py +++ b/dfetch/manifest/parse.py @@ -10,7 +10,11 @@ from dfetch.log import get_logger from dfetch.manifest.manifest import Manifest, ManifestDict from dfetch.manifest.schema import MANIFEST_SCHEMA -from dfetch.util.util import find_file, prefix_runtime_exceptions +from dfetch.util.util import ( + check_no_path_traversal, + find_file, + prefix_runtime_exceptions, +) logger = get_logger(__name__) @@ -92,7 +96,9 @@ def get_submanifests(skip: list[str] | None = None) -> list[Manifest]: for path in find_file(DEFAULT_MANIFEST_NAME, root_dir): path = os.path.realpath(path) - if os.path.commonprefix((path, root_dir)) != root_dir: + try: + check_no_path_traversal(path, root_dir) + except RuntimeError: logger.warning(f"Sub-manifest {path} is outside {root_dir}") continue diff --git a/dfetch/manifest/project.py b/dfetch/manifest/project.py index 2d66a136..ed5e8202 100644 --- a/dfetch/manifest/project.py +++ b/dfetch/manifest/project.py @@ -221,6 +221,49 @@ vcs: svn repo-path: cpputest/cpputest +Archive +####### +Projects distributed as ``.tar.gz``, ``.tgz``, ``.tar.bz2``, ``.tar.xz`` or ``.zip`` archive files +can be fetched using 
``vcs: archive``. DFetch downloads the archive from the ``url:`` and extracts +it to the destination directory, stripping the top-level directory if present. + +The ``src:`` and ``ignore:`` attributes work the same way as for git/SVN projects. + +.. code-block:: yaml + + manifest: + version: 0.0 + + projects: + - name: my-library + vcs: archive + url: https://example.com/releases/my-library-1.0.tar.gz + +Integrity verification +********************** +Use the ``integrity:`` block to verify the integrity of the downloaded archive. +Currently the ``hash:`` sub-field is supported (format ``:``); +only ``sha256`` is recognised today, but the block is designed to grow to support +detached signature verification via ``sig:`` (signature URL) and ``sig-key:`` +(signing-key URL or fingerprint) in the future. + +.. code-block:: yaml + + manifest: + version: 0.0 + + projects: + - name: my-library + vcs: archive + url: https://example.com/releases/my-library-1.0.tar.gz + integrity: + hash: sha256:e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855 + +Run ``dfetch freeze`` after an initial ``dfetch update`` to add the sha256 hash to +the manifest automatically. + +.. scenario-include:: ../features/fetch-archive.feature + Patch ##### *DFetch* promotes upstreaming changes, but also allows local changes. These changes can be managed with local patch @@ -277,6 +320,7 @@ import copy from collections.abc import Sequence +from dataclasses import dataclass, field from typing_extensions import Required, TypedDict @@ -284,6 +328,32 @@ from dfetch.manifest.version import Version from dfetch.util.util import always_str_list, str_if_possible + +@dataclass +class Integrity: + """Integrity verification data for an archive dependency. + + Holds the ``hash:`` sub-field today and is designed to accommodate + future signature-verification fields: + + * ``sig`` - URL of a detached signature file (``.sig`` / ``.asc``). 
+ * ``sig_key`` - URL or fingerprint of the signing key (``.p7s`` / ``.gpg``). + """ + + hash: str = field(default="") + + def __bool__(self) -> bool: + """Return *True* when any integrity data is present.""" + return bool(self.hash) + + def as_yaml(self) -> dict[str, str]: + """Serialise to a YAML-compatible dict, omitting empty fields.""" + data: dict[str, str] = {} + if self.hash: + data["hash"] = self.hash + return data + + ProjectEntryDict = TypedDict( "ProjectEntryDict", { @@ -300,6 +370,7 @@ "repo-path": str, "vcs": str, "ignore": Sequence[str], + "integrity": dict[str, str], "default_remote": str, }, total=False, @@ -327,6 +398,8 @@ def __init__(self, kwargs: ProjectEntryDict) -> None: self._tag: str = kwargs.get("tag", "") self._vcs: str = kwargs.get("vcs", "") self._ignore: Sequence[str] = kwargs.get("ignore", []) + integrity_data: dict[str, str] = kwargs.get("integrity", {}) + self._integrity = Integrity(hash=integrity_data.get("hash", "")) if not self._remote and not self._url: self._remote = kwargs.get("default_remote", "") @@ -334,7 +407,7 @@ def __init__(self, kwargs: ProjectEntryDict) -> None: @classmethod def from_yaml( cls, - yamldata: dict[str, str | list[str]] | ProjectEntryDict, + yamldata: dict[str, str | list[str] | dict[str, str]] | ProjectEntryDict, default_remote: str = "", ) -> "ProjectEntry": """Create a Project Entry from yaml data. 
@@ -443,6 +516,21 @@ def ignore(self) -> Sequence[str]: """Get the list of files/folders to ignore from this project (relative to src).""" return self._ignore + @property + def integrity(self) -> Integrity: + """Get the integrity verification data for this archive project.""" + return self._integrity + + @property + def hash(self) -> str: + """Convenience accessor for ``integrity.hash``.""" + return self._integrity.hash + + @hash.setter + def hash(self, value: str) -> None: + """Set ``integrity.hash`` (convenience setter used by freeze).""" + self._integrity.hash = value + def __repr__(self) -> str: """Get a string representation of this project entry.""" version = ( @@ -463,9 +551,9 @@ def as_recommendation(self) -> "ProjectEntry": recommendation._repo_path = "" # pylint: disable=protected-access return recommendation - def as_yaml(self) -> dict[str, str | list[str]]: + def as_yaml(self) -> dict[str, str | list[str] | dict[str, str]]: """Get this project as yaml dictionary.""" - yamldata = { + yamldata: dict[str, str | list[str] | dict[str, str] | None] = { "name": self._name, "revision": self._revision, "remote": self._remote, @@ -477,6 +565,7 @@ def as_yaml(self) -> dict[str, str | list[str]]: "tag": self._tag, "repo-path": self._repo_path, "vcs": self._vcs, + "integrity": self._integrity.as_yaml() or None, } return {k: v for k, v in yamldata.items() if v} diff --git a/dfetch/manifest/schema.py b/dfetch/manifest/schema.py index 823b63ce..f7b59f0a 100644 --- a/dfetch/manifest/schema.py +++ b/dfetch/manifest/schema.py @@ -15,6 +15,21 @@ } ) +HASH_STR = Regex( + r"^(sha256:[a-fA-F0-9]{64}|sha384:[a-fA-F0-9]{96}|sha512:[a-fA-F0-9]{128})$" +) + +# ``integrity:`` block — designed for future extension with ``sig:`` and +# ``sig-key:`` fields for detached signature / signing-key verification. 
+INTEGRITY_MAP = Map( + { + Optional("hash"): HASH_STR, + # Future fields (uncomment when implemented): + # Optional("sig"): SAFE_STR, # detached signature URL (.sig / .asc) + # Optional("sig-key"): SAFE_STR, # signing-key URL or fingerprint (.p7s / .gpg) + } +) + PROJECT_SCHEMA = Map( { "name": SAFE_STR, @@ -26,9 +41,10 @@ Optional("repo-path"): SAFE_STR, Optional("remote"): SAFE_STR, Optional("patch"): SAFE_STR | Seq(SAFE_STR), - Optional("vcs"): Enum(["git", "svn"]), + Optional("vcs"): Enum(["git", "svn", "archive"]), Optional("src"): SAFE_STR, Optional("ignore"): Seq(SAFE_STR), + Optional("integrity"): INTEGRITY_MAP, } ) diff --git a/dfetch/project/__init__.py b/dfetch/project/__init__.py index 57dfde14..b6bb3ecd 100644 --- a/dfetch/project/__init__.py +++ b/dfetch/project/__init__.py @@ -6,6 +6,7 @@ import dfetch.manifest.project from dfetch.log import get_logger from dfetch.manifest.parse import find_manifest, parse +from dfetch.project.archivesubproject import ArchiveSubProject from dfetch.project.gitsubproject import GitSubProject from dfetch.project.gitsuperproject import GitSuperProject from dfetch.project.subproject import SubProject @@ -14,7 +15,9 @@ from dfetch.project.svnsuperproject import SvnSuperProject from dfetch.util.util import resolve_absolute_path -SUPPORTED_SUBPROJECT_TYPES = [GitSubProject, SvnSubProject] +SUPPORTED_SUBPROJECT_TYPES: list[ + type[ArchiveSubProject] | type[GitSubProject] | type[SvnSubProject] +] = [ArchiveSubProject, GitSubProject, SvnSubProject] SUPPORTED_SUPERPROJECT_TYPES = [GitSuperProject, SvnSuperProject] logger = get_logger(__name__) diff --git a/dfetch/project/archivesubproject.py b/dfetch/project/archivesubproject.py new file mode 100644 index 00000000..bd702ebf --- /dev/null +++ b/dfetch/project/archivesubproject.py @@ -0,0 +1,251 @@ +"""Archive (tar/zip) specific implementation. + +Archives are a third VCS type alongside ``git`` and ``svn``. 
They represent +versioned dependencies that are distributed as ``.tar.gz``, ``.tgz``, +``.tar.bz2``, ``.tar.xz`` or ``.zip`` files reachable via ``http://``, +``https://``, or ``file://`` URLs. + +Unlike git and SVN, archives have no inherent "branching" or "tagging" +concept. Version identity is expressed through: + +* **No hash** - the URL itself acts as the identity. The archive is + considered up-to-date as long as the same URL is still reachable. +* **``integrity.hash: :``** - the cryptographic hash of the + archive file acts as the version identifier. The fetch step verifies the + downloaded archive against this hash and raises an error on mismatch. + +The ``integrity:`` block is designed for future extension: ``sig:`` and +``sig-key:`` fields for detached signature / signing-key verification will +slot in alongside ``hash:`` without breaking existing manifests. +Supported hash algorithms: ``sha256``, ``sha384``, and ``sha512``. + +Example manifest entries:: + + projects: + # URL-pinned (no integrity check) + - name: my-headers + url: https://example.com/my-headers-latest.tar.gz + vcs: archive + + # Hash-pinned (integrity verified on every fetch) + - name: my-library + url: https://example.com/releases/my-library-1.0.tar.gz + vcs: archive + integrity: + hash: sha256:e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855 + +.. scenario-include:: ../features/fetch-archive.feature + +.. 
scenario-include:: ../features/freeze-archive.feature +""" + +from __future__ import annotations + +import http.client +import os +import pathlib +import tempfile + +from dfetch.log import get_logger +from dfetch.manifest.project import ProjectEntry +from dfetch.manifest.version import Version +from dfetch.project.subproject import SubProject +from dfetch.vcs.archive import ( + ARCHIVE_EXTENSIONS, + ArchiveLocalRepo, + ArchiveRemote, + is_archive_url, +) +from dfetch.vcs.integrity_hash import IntegrityHash + +logger = get_logger(__name__) + + +def _suffix_for_url(url: str) -> str: + """Return the archive file suffix for *url* (e.g. ``'.tar.gz'``, ``'.zip'``).""" + lower = url.lower() + for ext in sorted(ARCHIVE_EXTENSIONS, key=len, reverse=True): + if lower.endswith(ext): + return ext + return ".archive" + + +class ArchiveSubProject(SubProject): + """A project fetched from a tar/zip archive URL. + + Supports ``src:`` (sub-path extraction), ``ignore:`` (file exclusion) and + ``patch:`` (local patches applied after every fetch) in the same way as + the git and SVN implementations. 
+ """ + + NAME = "archive" + + def __init__(self, project: ProjectEntry) -> None: + """Create an ArchiveSubProject.""" + super().__init__(project) + self._project_entry = project + self._remote_repo = ArchiveRemote(project.remote_url) + + def check(self) -> bool: + """Return *True* when the project URL looks like an archive.""" + return is_archive_url(self.remote) + + @staticmethod + def revision_is_enough() -> bool: + """Archives are uniquely identified by their hash (or URL), so yes.""" + return True + + @staticmethod + def list_tool_info() -> None: + """Log information about the archive fetching tool (Python's http.client).""" + SubProject._log_tool("http.client", http.client.__doc__ or "built-in") + + def get_default_branch(self) -> str: + """Archives have no branches; return an empty string.""" + return "" + + def _latest_revision_on_branch(self, branch: str) -> str: + """For archives the 'latest revision' is always the URL (or hash).""" + del branch + return self.remote + + def _download_and_compute_hash( + self, algorithm: str = "sha256", url: str | None = None + ) -> IntegrityHash: + """Download the archive to a temporary file and return its :class:`IntegrityHash`. + + The hash is computed during the download stream — no extra file read. + The temporary file is always cleaned up, even on error. + + Args: + algorithm: Hash algorithm to use (``sha256``, ``sha384``, ``sha512``). + url: If given, download from this URL instead of ``self._remote_repo``. + Use this to pin to the exact URL stored in the on-disk revision. + + Raises: + RuntimeError: On download failure or unsupported algorithm. 
+ """ + effective_url = url if url is not None else self.remote + remote = ArchiveRemote(effective_url) if url is not None else self._remote_repo + fd, tmp_path = tempfile.mkstemp(suffix=_suffix_for_url(effective_url)) + os.close(fd) + try: + hex_digest = remote.download(tmp_path, algorithm=algorithm) + return IntegrityHash(algorithm, hex_digest) + finally: + try: + os.remove(tmp_path) + except OSError: + pass + + def _does_revision_exist(self, revision: str) -> bool: # noqa: ARG002 + """Check whether the archive URL is still reachable. + + A lightweight HEAD (or partial-GET) reachability check is used for + all revision types, including hash-pinned ones. Full content-integrity + verification is intentionally deferred to fetch time (``_fetch_impl``), + keeping ``dfetch check`` fast even for large archives over slow links. + """ + return self._remote_repo.is_accessible() + + def _list_of_tags(self) -> list[str]: + """Archives have no tags; returns an empty list.""" + return [] + + @property + def wanted_version(self) -> Version: + """Version derived from the ``integrity.hash`` field or the archive URL. + + * With ``integrity.hash: :`` → ``Version(revision=':')`` + * Without hash → ``Version(revision=)`` + + This makes the standard :class:`~dfetch.project.subproject.SubProject` + comparison machinery work transparently for archives. + """ + if self._project_entry.hash: + return Version(revision=self._project_entry.hash) + return Version(revision=self.remote) + + def _fetch_impl(self, version: Version) -> Version: + """Download and extract the archive to the local destination. + + 1. Download the archive to a temporary file. + 2. If ``integrity.hash`` is specified, verify the downloaded file. + 3. Extract to :attr:`local_path`, respecting ``src:`` and ``ignore:``. + + Raises: + RuntimeError: On download failure or hash mismatch. + + Returns: + The version that was actually fetched (hash string or URL). 
+ """ + revision = version.revision + + pathlib.Path(self.local_path).mkdir(parents=True, exist_ok=True) + + fd, tmp_path = tempfile.mkstemp(suffix=_suffix_for_url(self.remote)) + os.close(fd) + try: + expected = IntegrityHash.parse(revision) + if expected: + actual_hex = self._remote_repo.download( + tmp_path, algorithm=expected.algorithm + ) + if not expected.matches(actual_hex): + raise RuntimeError( + f"Hash mismatch for {self._project_entry.name}! " + f"{expected.algorithm} expected {expected.hex_digest}" + ) + else: + self._remote_repo.download(tmp_path) + + ArchiveLocalRepo.extract( + tmp_path, + self.local_path, + src=self.source, + ignore=self.ignore, + ) + finally: + try: + os.remove(tmp_path) + except OSError: + pass + + return version + + def freeze_project(self, project: ProjectEntry) -> str | None: + """Pin *project* to a cryptographic hash of the archive. + + * If the archive was already fetched with a hash, the on-disk revision + (``sha256:``) is written to ``integrity.hash`` in the manifest. + * If the archive was fetched without a hash (URL-only), the archive is + downloaded again, its SHA-256 is computed, and the result is written + to ``integrity.hash``. This ensures the manifest always ends up + pinned to a specific content fingerprint. SHA-256 is used as the + default algorithm when no prior hash is present. + + Returns: + The ``:`` string written to *project*, or *None* if + the manifest was already up-to-date. + + Raises: + RuntimeError: On download or hash-computation failure so the caller + can log a meaningful error rather than silently claiming the + project is already pinned. + """ + on_disk = self.on_disk_version() + if not on_disk: + return None + + revision = on_disk.revision + + # Already hash-pinned — use the on-disk revision directly. + # Otherwise download from the revision URL (not the possibly-updated manifest URL). 
+ pinned = IntegrityHash.parse(revision) or self._download_and_compute_hash( + "sha256", url=revision + ) + new_hash = str(pinned) + if project.hash == new_hash: + return None + project.hash = new_hash + return new_hash diff --git a/dfetch/project/gitsubproject.py b/dfetch/project/gitsubproject.py index c52f3208..db98ba3d 100644 --- a/dfetch/project/gitsubproject.py +++ b/dfetch/project/gitsubproject.py @@ -8,7 +8,7 @@ from dfetch.manifest.project import ProjectEntry from dfetch.manifest.version import Version from dfetch.project.subproject import SubProject -from dfetch.util.util import safe_rmtree +from dfetch.util.util import LICENSE_GLOBS, safe_rmtree from dfetch.vcs.git import GitLocalRepo, GitRemote, get_git_version logger = get_logger(__name__) @@ -64,8 +64,8 @@ def _fetch_impl(self, version: Version) -> Version: # When exporting a file, the destination directory must already exist pathlib.Path(self.local_path).mkdir(parents=True, exist_ok=True) - license_globs = [f"/{name.lower()}" for name in self.LICENSE_GLOBS] + [ - f"/{name.upper()}" for name in self.LICENSE_GLOBS + license_globs = [f"/{name.lower()}" for name in LICENSE_GLOBS] + [ + f"/{name.upper()}" for name in LICENSE_GLOBS ] local_repo = GitLocalRepo(self.local_path) diff --git a/dfetch/project/gitsuperproject.py b/dfetch/project/gitsuperproject.py index ec9f7928..d5e547a9 100644 --- a/dfetch/project/gitsuperproject.py +++ b/dfetch/project/gitsuperproject.py @@ -16,7 +16,7 @@ from dfetch.project.gitsubproject import GitSubProject from dfetch.project.subproject import SubProject from dfetch.project.superproject import RevisionRange, SuperProject -from dfetch.util.util import resolve_absolute_path +from dfetch.util.util import check_no_path_traversal, resolve_absolute_path from dfetch.vcs.git import GitLocalRepo logger = get_logger(__name__) @@ -43,10 +43,7 @@ def ignored_files(self, path: str) -> Sequence[str]: """Return a list of files that can be ignored in a given path.""" resolved_path = 
resolve_absolute_path(path) - if not resolved_path.is_relative_to(self.root_directory): - raise RuntimeError( - f"{resolved_path} not in superproject {self.root_directory}!" - ) + check_no_path_traversal(resolved_path, self.root_directory) return GitLocalRepo.ignored_files(path) diff --git a/dfetch/project/subproject.py b/dfetch/project/subproject.py index 20f685e8..5649b92c 100644 --- a/dfetch/project/subproject.py +++ b/dfetch/project/subproject.py @@ -1,10 +1,9 @@ """SubProject.""" -import fnmatch import os import pathlib from abc import ABC, abstractmethod -from collections.abc import Sequence +from collections.abc import Callable, Sequence from dfetch.log import get_logger from dfetch.manifest.project import ProjectEntry @@ -26,7 +25,6 @@ class SubProject(ABC): """ NAME = "" - LICENSE_GLOBS = ["licen[cs]e*", "copying*", "copyright*"] def __init__(self, project: ProjectEntry) -> None: """Create the subproject.""" @@ -92,7 +90,7 @@ def update_is_required(self, force: bool = False) -> Version | None: def update( self, force: bool = False, - files_to_ignore: Sequence[str] | None = None, + ignored_files_callback: Callable[[], Sequence[str]] | None = None, patch_count: int = -1, ) -> None: """Update this subproject if required. @@ -100,7 +98,11 @@ def update( Args: force (bool, optional): Ignore if version is ok or any local changes were done. Defaults to False. - files_to_ignore (Sequence[str], optional): list of files that are ok to overwrite. + ignored_files_callback (Callable, optional): Called to obtain the set of files + to ignore. Invoked twice: once before clearing the destination (to detect + pre-existing local changes) and once after extraction (to compute the stored + hash). Calling it at both points ensures the stored hash and the check-time + hash use the same skiplist, preventing false "local changes" reports. patch_count (int, optional): Number of patches to apply (-1 means all). 
""" to_fetch = self.update_is_required(force) @@ -108,9 +110,11 @@ def update( if not to_fetch: return - files_to_ignore = files_to_ignore or [] + pre_fetch_ignored = ( + list(ignored_files_callback()) if ignored_files_callback else [] + ) - if not force and self._are_there_local_changes(files_to_ignore): + if not force and self._are_there_local_changes(pre_fetch_ignored): self._log_project( "skipped - local changes after last update (use --force to overwrite)" ) @@ -130,9 +134,16 @@ def update( applied_patches = self._apply_patches(patch_count) + post_fetch_ignored = ( + list(ignored_files_callback()) if ignored_files_callback else [] + ) + self.__metadata.fetched( actually_fetched, - hash_=hash_directory(self.local_path, skiplist=[self.__metadata.FILENAME]), + hash_=hash_directory( + self.local_path, + skiplist=[self.__metadata.FILENAME] + post_fetch_ignored, + ), patch_=applied_patches, ) @@ -388,10 +399,33 @@ def _fetch_impl(self, version: Version) -> Version: def get_default_branch(self) -> str: """Get the default branch of this repository.""" - @staticmethod - def is_license_file(filename: str) -> bool: - """Check if the given filename is a license file.""" - return any( - fnmatch.fnmatch(filename.lower(), pattern) - for pattern in SubProject.LICENSE_GLOBS - ) + def freeze_project(self, project: ProjectEntry) -> str | None: + """Freeze *project* to its current on-disk version. + + Subclasses may override this to apply VCS-specific freeze logic (e.g. + :class:`~dfetch.project.archivesubproject.ArchiveSubProject` stores + the hash under ``integrity.hash`` rather than ``revision:``). + + Returns: + The version string that was written to *project* when a change was + made, or *None* if the entry was already pinned to the on-disk + version or no on-disk version could be determined. + + Raises: + RuntimeError: When VCS-specific freeze logic fails (e.g. archive + download error). Callers should catch and report these. 
+ """ + on_disk_version = self.on_disk_version() + if ( + on_disk_version + and project.version.tag == on_disk_version.tag + and project.version.revision == on_disk_version.revision + and (bool(project.version.tag) or self.revision_is_enough()) + ): + return None + if on_disk_version: + project.version = on_disk_version + return ( + on_disk_version.revision or on_disk_version.tag or str(on_disk_version) + ) + return None diff --git a/dfetch/project/superproject.py b/dfetch/project/superproject.py index e7c8a199..4d5cce56 100644 --- a/dfetch/project/superproject.py +++ b/dfetch/project/superproject.py @@ -19,7 +19,7 @@ from dfetch.manifest.manifest import Manifest from dfetch.manifest.project import ProjectEntry from dfetch.project.subproject import SubProject -from dfetch.util.util import resolve_absolute_path +from dfetch.util.util import check_no_path_traversal, resolve_absolute_path logger = get_logger(__name__) @@ -136,10 +136,7 @@ def ignored_files(self, path: str) -> Sequence[str]: """Return a list of files that can be ignored in a given path.""" resolved_path = resolve_absolute_path(path) - if not resolved_path.is_relative_to(self.root_directory): - raise RuntimeError( - f"{resolved_path} not in superproject {self.root_directory}!" 
- ) + check_no_path_traversal(resolved_path, self.root_directory) return [] diff --git a/dfetch/project/svnsubproject.py b/dfetch/project/svnsubproject.py index 6284daaf..85c28c22 100644 --- a/dfetch/project/svnsubproject.py +++ b/dfetch/project/svnsubproject.py @@ -11,6 +11,7 @@ from dfetch.util.util import ( find_matching_files, find_non_matching_files, + is_license_file, safe_rm, ) from dfetch.vcs.svn import SvnRemote, SvnRepo, get_svn_version @@ -103,7 +104,7 @@ def _determine_what_to_fetch(self, version: Version) -> tuple[str, str, str]: def _remove_ignored_files(self) -> None: """Remove any ignored files, whilst keeping license files.""" for file_or_dir in find_matching_files(self.local_path, self.ignore): - if not (file_or_dir.is_file() and self.is_license_file(file_or_dir.name)): + if not (file_or_dir.is_file() and is_license_file(file_or_dir.name)): safe_rm(file_or_dir) def _fetch_impl(self, version: Version) -> Version: @@ -168,9 +169,7 @@ def _get_info(self, branch: str) -> dict[str, str]: def _license_files(url_path: str) -> list[str]: return [ str(license) - for license in filter( - SvnSubProject.is_license_file, SvnRepo.files_in_path(url_path) - ) + for license in filter(is_license_file, SvnRepo.files_in_path(url_path)) ] def _get_revision(self, branch: str) -> str: diff --git a/dfetch/project/svnsuperproject.py b/dfetch/project/svnsuperproject.py index e19aa11f..c3d708bb 100644 --- a/dfetch/project/svnsuperproject.py +++ b/dfetch/project/svnsuperproject.py @@ -17,6 +17,7 @@ from dfetch.project.superproject import RevisionRange, SuperProject from dfetch.project.svnsubproject import SvnSubProject from dfetch.util.util import ( + check_no_path_traversal, in_directory, resolve_absolute_path, ) @@ -47,10 +48,7 @@ def ignored_files(self, path: str) -> Sequence[str]: """Return a list of files that can be ignored in a given path.""" resolved_path = resolve_absolute_path(path) - if not resolved_path.is_relative_to(self.root_directory): - raise RuntimeError( - 
f"{resolved_path} not in superproject {self.root_directory}!" - ) + check_no_path_traversal(resolved_path, self.root_directory) return SvnRepo.ignored_files(path) diff --git a/dfetch/reporting/sbom_reporter.py b/dfetch/reporting/sbom_reporter.py index f5b352fc..ab5f0aef 100644 --- a/dfetch/reporting/sbom_reporter.py +++ b/dfetch/reporting/sbom_reporter.py @@ -14,6 +14,20 @@ :scenario: A fetched project generates a json sbom +Archive dependencies +-------------------- +Archive dependencies (tar.gz, zip, …) are recorded with a ``distribution`` +external reference and, when an ``integrity.hash:`` field is set, a ``SHA-256`` +component hash for supply-chain integrity verification. + +.. scenario-include:: ../features/report-sbom-archive.feature + :scenario: + A fetched archive without a hash generates a json sbom + +.. scenario-include:: ../features/report-sbom-archive.feature + :scenario: + A fetched archive with sha256 hash generates a json sbom with hash + Gitlab ------ Let *DFetch* generate a SBoM and add the result as artifact in your gitlab-ci runner. 
@@ -71,7 +85,13 @@ from decimal import Decimal from cyclonedx.builder.this import this_component as cdx_lib_component -from cyclonedx.model import ExternalReference, ExternalReferenceType, XsUri +from cyclonedx.model import ( + ExternalReference, + ExternalReferenceType, + HashAlgorithm, + HashType, + XsUri, +) from cyclonedx.model.bom import Bom from cyclonedx.model.component import Component, ComponentType from cyclonedx.model.component_evidence import ( @@ -87,12 +107,16 @@ from cyclonedx.model.license import LicenseAcknowledgement from cyclonedx.output import make_outputter from cyclonedx.schema import OutputFormat, SchemaVersion +from packageurl import PackageURL -import dfetch.util.purl +import dfetch from dfetch.manifest.manifest import Manifest from dfetch.manifest.project import ProjectEntry from dfetch.reporting.reporter import Reporter from dfetch.util.license import License +from dfetch.util.purl import vcs_url_to_purl +from dfetch.vcs.archive import archive_url_to_purl +from dfetch.vcs.integrity_hash import IntegrityHash # PyRight is pedantic with decorators see https://github.com/madpah/serializable/issues/8 # It might be fixable with https://github.com/microsoft/pyright/discussions/4426, would prefer @@ -100,6 +124,14 @@ # pyright: reportCallIssue=false, reportAttributeAccessIssue=false +# Map from dfetch hash-field algorithm prefix to CycloneDX HashAlgorithm name +DFETCH_TO_CDX_HASH_ALGORITHM: dict[str, str] = { + "sha256": "SHA-256", + "sha384": "SHA-384", + "sha512": "SHA-512", +} + + class SbomReporter(Reporter): """Reporter for generating SBoM's.""" @@ -166,14 +198,15 @@ def add_project( version: str, ) -> None: """Add a project to the report.""" - purl = dfetch.util.purl.remote_url_to_purl( - project.remote_url, version=version, subpath=project.source or None - ) - + subpath = project.source or None + if project.vcs == "archive": + purl = archive_url_to_purl( + project.remote_url, version=version, subpath=subpath + ) + else: + purl = 
vcs_url_to_purl(project.remote_url, version=version, subpath=subpath) name = project.name if purl.type == "generic" else purl.name - location = self.manifest.find_name_in_manifest(project.name) - component = Component( name=name, version=version, @@ -229,8 +262,17 @@ def add_project( ], ), ) + self._apply_external_references(component, purl, version) + self._apply_licenses(component, licenses) + self._bom.components.add(component) + @staticmethod + def _apply_external_references( + component: Component, purl: PackageURL, version: str + ) -> None: + """Attach external references to *component* based on its PURL type.""" if purl.type == "github": + component.group = purl.namespace component.external_references.add( ExternalReference( type=ExternalReferenceType.VCS, @@ -238,40 +280,70 @@ def add_project( ) ) elif purl.type == "bitbucket": + component.group = purl.namespace component.external_references.add( ExternalReference( type=ExternalReferenceType.VCS, url=XsUri(f"https://bitbucket.org/{purl.namespace}/{purl.name}"), ) ) + elif purl.qualifiers.get("download_url"): + SbomReporter._apply_archive_refs(component, purl, version) else: - component.group = purl.namespace + SbomReporter._apply_vcs_refs(component, purl) - vcs_url = purl.qualifiers.get("vcs_url", "") - # ExternalReferenceType.VCS does not support ssh:// urls - if vcs_url and "ssh://" not in vcs_url: - component.external_references.add( - ExternalReference( - type=ExternalReferenceType.VCS, - url=XsUri(vcs_url), + @staticmethod + def _apply_archive_refs( + component: Component, purl: PackageURL, version: str + ) -> None: + """Add DISTRIBUTION reference and optional hash for an archive dependency.""" + download_url = purl.qualifiers["download_url"] + component.group = purl.namespace or None # type: ignore[assignment] + component.external_references.add( + ExternalReference( + type=ExternalReferenceType.DISTRIBUTION, + url=XsUri(download_url), + ) + ) + integrity = IntegrityHash.parse(version) if version else 
None + if integrity: + cdx_algo_name = DFETCH_TO_CDX_HASH_ALGORITHM.get(integrity.algorithm) + if cdx_algo_name: + component.hashes.add( + HashType( + alg=HashAlgorithm(cdx_algo_name), + content=integrity.hex_digest, ) ) - for lic in licenses: + @staticmethod + def _apply_vcs_refs(component: Component, purl: PackageURL) -> None: + """Add VCS external reference and group for a generic VCS dependency.""" + component.group = purl.namespace or None + vcs_url = purl.qualifiers.get("vcs_url", "") + # ExternalReferenceType.VCS does not support ssh:// urls + if vcs_url and "ssh://" not in vcs_url: + component.external_references.add( + ExternalReference( + type=ExternalReferenceType.VCS, + url=XsUri(vcs_url), + ) + ) - # License wants either an SPDX id or a name, prefer SPDX id when available + @staticmethod + def _apply_licenses(component: Component, licenses: list[License]) -> None: + """Attach *licenses* to *component* and its evidence block.""" + for lic in licenses: + # Prefer SPDX id when available cdx_license = ( CycloneDxLicense(id=lic.spdx_id) if lic.spdx_id else CycloneDxLicense(name=lic.name) ) - component.licenses.add(cdx_license) if component.evidence: component.evidence.licenses.add(cdx_license) - self._bom.components.add(component) - def dump_to_file(self, outfile: str) -> bool: """Dump the SBoM to file.""" output_format = OutputFormat( diff --git a/dfetch/util/purl.py b/dfetch/util/purl.py index f7a7a819..294d8a5a 100644 --- a/dfetch/util/purl.py +++ b/dfetch/util/purl.py @@ -1,6 +1,6 @@ -"""Module to convert remote URLs to valid Package URLs (PURLs). +"""Module to convert VCS remote URLs to valid Package URLs (PURLs). -Supports: GitHub, Bitbucket, SVN, SSH paths, and more. +Supports: GitHub, Bitbucket, SVN, SSH paths, and generic VCS URLs. 
""" import re @@ -77,47 +77,52 @@ def _known_purl_types( return None -def remote_url_to_purl( - remote_url: str, version: str | None = None, subpath: str | None = None -) -> PackageURL: - """Convert a remote URL to a valid PackageURL object. +def _vcs_namespace_and_name(remote_url: str) -> tuple[str, str, str]: + """Derive namespace, name, and normalised URL for a generic VCS remote URL. - Supports GitHub, Bitbucket, SVN, SSH paths. - Optionally specify version and subpath. + Returns: + A ``(namespace, name, remote_url)`` tuple where *remote_url* may have + been normalised (e.g. SSH short-form converted to ``ssh://`` scheme). """ - purl = _known_purl_types(remote_url, version, subpath) - if purl: - return purl - parsed = urlparse(remote_url) path = parsed.path.lstrip("/") - if "svn" in parsed.scheme or "svn." in parsed.netloc: namespace, name = _namespace_and_name_from_domain_and_path(parsed.netloc, path) if namespace.startswith("p/"): namespace = namespace[len("p/") :] namespace = namespace.replace("/svn/", "/") - else: match = SSH_REGEX.match(remote_url) if match: namespace, name = _namespace_and_name_from_domain_and_path( - match.group("host"), - match.group("path"), + match.group("host"), match.group("path") ) - if not parsed.scheme: remote_url = f"ssh://{parsed.path.replace(':', '/')}" else: namespace, name = _namespace_and_name_from_domain_and_path( remote_url, path.replace(".git", "") ) + return namespace, name, remote_url + + +def vcs_url_to_purl( + vcs_url: str, version: str | None = None, subpath: str | None = None +) -> PackageURL: + """Convert a VCS remote URL to a valid PackageURL object. + Supports GitHub, Bitbucket, SVN, SSH paths, and generic VCS URLs. + Optionally specify version and subpath. 
+ """ + purl = _known_purl_types(vcs_url, version, subpath) + if purl: + return purl + namespace, name, vcs_url = _vcs_namespace_and_name(vcs_url) return PackageURL( type="generic", namespace=namespace, name=name, version=version, - qualifiers={"vcs_url": remote_url}, + qualifiers={"vcs_url": vcs_url}, subpath=subpath, ) diff --git a/dfetch/util/util.py b/dfetch/util/util.py index b5f83b20..24fdeceb 100644 --- a/dfetch/util/util.py +++ b/dfetch/util/util.py @@ -12,6 +12,98 @@ from _hashlib import HASH +#: Glob patterns used to identify license files by filename. +LICENSE_GLOBS = ["licen[cs]e*", "copying*", "copyright*"] + + +def is_license_file(filename: str) -> bool: + """Return *True* when *filename* matches a known license file pattern.""" + return any(fnmatch.fnmatch(filename.lower(), pattern) for pattern in LICENSE_GLOBS) + + +def _copy_entry(src_entry: str, dest_entry: str, root: str) -> None: + """Copy a single file or directory *src_entry* to *dest_entry*. + + Raises :exc:`RuntimeError` if *src_entry* resolves outside *root*. + """ + check_no_path_traversal(src_entry, root) + if os.path.isdir(src_entry): + shutil.copytree(src_entry, dest_entry, symlinks=True) + else: + shutil.copy2(src_entry, dest_entry) + + +def copy_directory_contents(src_dir: str, dest_dir: str) -> None: + """Copy every entry in *src_dir* directly into *dest_dir*. + + Directories are copied recursively; files are copied with metadata. + """ + for entry_name in os.listdir(src_dir): + src_path = os.path.join(src_dir, entry_name) + _copy_entry( + src_path, + os.path.join(dest_dir, entry_name), + src_dir, + ) + + +def copy_src_subset( + src_root: str, dest_dir: str, src: str, keep_licenses: bool +) -> None: + """Copy a *src* sub-path from *src_root* into *dest_dir*. + + When *src* is a directory, its contents are copied flat into *dest_dir*. + When *src* is a single file, that file is copied into *dest_dir*. 
+ If *keep_licenses* is ``True``, any license files found directly in + *src_root* are also copied regardless of the *src* filter. + + Raises: + RuntimeError: When *src* does not exist inside *src_root*. + """ + src_path = os.path.join(src_root, src) + check_no_path_traversal(src_path, src_root) + resolved_src_path = os.path.realpath(src_path) + if os.path.isdir(resolved_src_path): + copy_directory_contents(resolved_src_path, dest_dir) + elif os.path.isfile(resolved_src_path): + shutil.copy2( + resolved_src_path, + os.path.join(dest_dir, os.path.basename(resolved_src_path)), + ) + else: + raise RuntimeError(f"src {src!r} was not found in the extracted archive") + + if keep_licenses: + for entry_name in os.listdir(src_root): + full_path = os.path.join(src_root, entry_name) + check_no_path_traversal(full_path, src_root) + if os.path.isfile(full_path) and is_license_file(entry_name): + shutil.copy2(full_path, os.path.join(dest_dir, entry_name)) + + +def prune_files_by_pattern(directory: str, patterns: Sequence[str]) -> None: + """Remove files and directories in *directory* matching *patterns*. + + License files are never removed even when they match a pattern. + """ + seen: set[str] = set() + paths = [] + for file_or_dir in find_matching_files(directory, patterns): + path_str = str(file_or_dir) + if path_str in seen: + continue + seen.add(path_str) + paths.append(file_or_dir) + + # Remove children before parents to avoid FileNotFoundError on already-deleted paths. 
+ paths.sort(key=lambda p: len(str(p)), reverse=True) + + for file_or_dir in paths: + if os.path.lexists(str(file_or_dir)) and not ( + file_or_dir.is_file() and is_license_file(file_or_dir.name) + ): + safe_rm(file_or_dir) + def _remove_readonly(func: Any, path: str, _: Any) -> None: if not os.access(path, os.W_OK): @@ -161,6 +253,29 @@ def str_if_possible(data: list[str]) -> str | list[str]: return "" if not data else data[0] if len(data) == 1 else data +def check_no_path_traversal(path: str | Path, root: str | Path) -> None: + """Raise *RuntimeError* if *path* escapes *root*. + + Both *path* and *root* are resolved with :func:`os.path.realpath` before + comparison, so symlinks and relative ``..`` components cannot bypass the + check. + + See https://owasp.org/www-community/attacks/Path_Traversal + + Raises: + RuntimeError: When *path* resolves to a location outside *root*. + """ + resolved_root = os.path.realpath(root) + resolved_path = os.path.realpath(path) + try: + escapes = os.path.commonpath([resolved_root, resolved_path]) != resolved_root + except ValueError: + # commonpath raises ValueError on Windows when paths span different drives + escapes = True + if escapes: + raise RuntimeError(f"{str(path)!r} is outside root {str(root)!r}") + + def resolve_absolute_path(path: str | Path) -> Path: """Return a guaranteed absolute Path, resolving symlinks. diff --git a/dfetch/vcs/archive.py b/dfetch/vcs/archive.py new file mode 100644 index 00000000..a8caed9a --- /dev/null +++ b/dfetch/vcs/archive.py @@ -0,0 +1,455 @@ +"""Archive (tar/zip) VCS implementation. + +Supports fetching dependencies distributed as ``.tar.gz``, ``.tgz``, +``.tar.bz2``, ``.tar.xz`` or ``.zip`` archives from any URL that Python's +:mod:`urllib.request` can reach (``http://``, ``https://``, ``file://``, …). + +Optional integrity checking is supported via an ``integrity:`` manifest block. 
+The ``hash:`` sub-field accepts ``sha256:<hex>`` (64 hex chars),
+``sha384:<hex>`` (96 hex chars), or ``sha512:<hex>`` (128 hex chars).
+The block is designed to grow with ``sig:`` and ``sig-key:`` fields for
+detached signature / signing-key verification in the future.
+
+Example manifest entry::
+
+    projects:
+      - name: my-library
+        url: https://example.com/releases/my-library-1.0.tar.gz
+        vcs: archive
+        integrity:
+          hash: sha256:e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855
+
+"""
+
+from __future__ import annotations
+
+import hashlib
+import http.client
+import os
+import pathlib
+import shutil
+import sys
+import tarfile
+import tempfile
+import urllib.parse
+import urllib.request
+import zipfile
+from collections.abc import Sequence
+from typing import overload
+
+from packageurl import PackageURL
+
+from dfetch.log import get_logger
+from dfetch.util.util import (
+    copy_directory_contents,
+    copy_src_subset,
+    prune_files_by_pattern,
+)
+
+logger = get_logger(__name__)
+
+#: Archive file extensions recognised by DFetch.
+ARCHIVE_EXTENSIONS = (".tar.gz", ".tgz", ".tar.bz2", ".tar.xz", ".zip")
+
+# Safety limits applied during extraction to prevent decompression bombs.
+_MAX_UNCOMPRESSED_BYTES = 500 * 1024 * 1024  # 500 MB
+_MAX_MEMBER_COUNT = 10_000
+
+
+def is_archive_url(url: str) -> bool:
+    """Return *True* when *url* ends with a recognised archive extension.
+
+    Query strings and fragments are stripped before testing so that URLs like
+    ``https://example.com/pkg.tar.gz?download=1`` are correctly recognised.
+ """ + path = urllib.parse.urlparse(url).path + return any(path.lower().endswith(ext) for ext in ARCHIVE_EXTENSIONS) + + +def strip_archive_extension(name: str) -> str: + """Remove a recognised archive extension from *name*.""" + lower = name.lower() + for ext in ARCHIVE_EXTENSIONS: + if lower.endswith(ext): + return name[: -len(ext)] + return name + + +def archive_url_to_purl( + download_url: str, + version: str | None = None, + subpath: str | None = None, +) -> PackageURL: + """Build a generic PackageURL for an archive download URL.""" + parsed = urllib.parse.urlparse(download_url) + basename = os.path.basename(parsed.path) + name = strip_archive_extension(basename) or "unknown" + namespace = parsed.hostname or "" + return PackageURL( + type="generic", + namespace=namespace or None, + name=name, + version=version, + qualifiers={"download_url": download_url}, + subpath=subpath, + ) + + +def _http_conn(scheme: str, netloc: str, timeout: int) -> http.client.HTTPConnection: + """Return an :class:`http.client.HTTPConnection` or HTTPS variant for *netloc*.""" + if scheme == "https": + return http.client.HTTPSConnection(netloc, timeout=timeout) + return http.client.HTTPConnection(netloc, timeout=timeout) + + +def _resource_path(parsed: urllib.parse.ParseResult) -> str: + """Return the path + query portion of *parsed* suitable for HTTP requests.""" + path = parsed.path or "/" + return f"{path}?{parsed.query}" if parsed.query else path + + +class ArchiveRemote: + """Represents a remote archive (tar/zip) URL. + + Provides helpers to check accessibility and download the archive. + """ + + def __init__(self, url: str) -> None: + """Create an ArchiveRemote for *url*.""" + self.url = url + + def is_accessible(self) -> bool: + """Return *True* when the archive URL is reachable. + + * ``file://`` URLs are checked with :func:`os.path.exists` directly — + no network round-trip needed. + * ``http``/``https`` URLs first try a ``HEAD`` request. 
If the server + rejects it (405/501) a partial ``GET`` (``Range: bytes=0-0``) is + attempted instead. Returns *False* on any final failure. + * Any other URL scheme returns *False*. + """ + parsed = urllib.parse.urlparse(self.url) + if parsed.scheme == "file": + return os.path.exists(urllib.request.url2pathname(parsed.path)) + if parsed.scheme not in ("http", "https"): + return False + return self._is_http_reachable(parsed) + + def _is_http_reachable(self, parsed: urllib.parse.ParseResult) -> bool: + """Try HEAD then partial-GET to confirm an HTTP/HTTPS URL is reachable.""" + netloc, path = parsed.netloc, _resource_path(parsed) + for method, headers in [("HEAD", {}), ("GET", {"Range": "bytes=0-0"})]: + try: + conn = _http_conn(parsed.scheme, netloc, timeout=15) + try: + conn.request(method, path, headers=headers) + status = conn.getresponse().status + if status not in (405, 501): + return status < 400 + finally: + conn.close() + except (OSError, ValueError, http.client.HTTPException): + return False + return False + + @overload + def download(self, dest_path: str, algorithm: str) -> str: ... + @overload + def download(self, dest_path: str, algorithm: None = ...) -> None: ... + + def download(self, dest_path: str, algorithm: str | None = None) -> str | None: + """Download the archive to *dest_path*, optionally computing its hash. + + When *algorithm* is given the hash is computed during the download + stream (zero extra file reads) and the hex digest is returned. + + Args: + dest_path: Local file path to write the archive to. + algorithm: Hash algorithm name (e.g. ``"sha256"``). When *None* + no hash is computed and *None* is returned. + + Returns: + Hex digest string when *algorithm* is provided, else *None*. + + Raises: + RuntimeError: On download failure or unsupported URL scheme. 
+ """ + hasher = hashlib.new(algorithm) if algorithm else None + parsed = urllib.parse.urlparse(self.url) + if parsed.scheme == "file": + file_path = urllib.request.url2pathname(parsed.path) + try: + if hasher: + with open(file_path, "rb") as src, open(dest_path, "wb") as dst: + for chunk in iter(lambda: src.read(65536), b""): + dst.write(chunk) + hasher.update(chunk) + else: + shutil.copy(file_path, dest_path) + except OSError as exc: + raise RuntimeError( + f"'{self.url}' is not a valid URL or unreachable: {exc}" + ) from exc + elif parsed.scheme in ("http", "https"): + self._http_download(parsed, dest_path, hasher=hasher) + else: + raise RuntimeError( + f"'{self.url}' uses unsupported scheme '{parsed.scheme}'." + ) + return hasher.hexdigest() if hasher else None + + _MAX_REDIRECTS = 10 + + def _http_download( + self, + parsed: urllib.parse.ParseResult, + dest_path: str, + hasher: hashlib._Hash | None = None, + ) -> None: + """Download an HTTP/HTTPS resource to *dest_path*, following redirects. + + Up to :attr:`_MAX_REDIRECTS` 3xx redirects are followed transparently + (e.g. GitHub archive URLs redirect to a CDN). When *hasher* is + provided each chunk is fed into it during streaming, so the caller gets + the hash without an extra file read. 
+ """ + for _ in range(self._MAX_REDIRECTS + 1): + conn = _http_conn(parsed.scheme, parsed.netloc, timeout=60) + try: + conn.request("GET", _resource_path(parsed)) + resp = conn.getresponse() + if resp.status in (301, 302, 303, 307, 308): + location = resp.getheader("Location", "") + if not location: + raise RuntimeError( + f"Redirect with no Location header from '{parsed.geturl()}'" + ) + parsed = urllib.parse.urlparse( + urllib.parse.urljoin(parsed.geturl(), location) + ) + continue + if resp.status != 200: + raise RuntimeError( + f"HTTP {resp.status} when downloading '{self.url}'" + ) + with open(dest_path, "wb") as fh: + while chunk := resp.read(65536): + fh.write(chunk) + if hasher: + hasher.update(chunk) + return + except (OSError, http.client.HTTPException) as exc: + raise RuntimeError( + f"'{self.url}' is not a valid URL or unreachable: {exc}" + ) from exc + finally: + conn.close() + raise RuntimeError(f"Too many redirects when downloading '{self.url}'") + + +class ArchiveLocalRepo: + """Extracts an archive to a local destination directory. + + Supports ``.tar.gz``, ``.tgz``, ``.tar.bz2``, ``.tar.xz`` and ``.zip`` + archives. A single top-level directory in the archive is automatically + stripped (like ``tar --strip-components=1``), so the archive may be + structured as ``project-1.0/src/…`` or ``src/…`` - both work. + """ + + @staticmethod + def extract( + archive_path: str, + dest_dir: str, + src: str = "", + ignore: Sequence[str] = (), + is_license: bool = True, + ) -> None: + """Extract *archive_path* into *dest_dir*, applying *src* / *ignore* filters. + + Args: + archive_path: Path to the downloaded archive file. + dest_dir: Directory to place the extracted contents into. + src: Optional sub-directory (or glob pattern) inside the archive + to extract exclusively. License files from the archive root + are always retained when *src* is set. + ignore: Sequence of glob patterns for files/directories to exclude. 
+ is_license: Whether to check for and retain license files when + *src* is specified. + """ + with tempfile.TemporaryDirectory() as tmp_dir: + ArchiveLocalRepo._extract_raw(archive_path, tmp_dir) + + # Strip a single top-level directory if the archive uses one + top_entries = os.listdir(tmp_dir) + if len(top_entries) == 1 and os.path.isdir( + os.path.join(tmp_dir, top_entries[0]) + ): + extract_root = os.path.join(tmp_dir, top_entries[0]) + else: + extract_root = tmp_dir + + pathlib.Path(dest_dir).mkdir(parents=True, exist_ok=True) + + if src: + copy_src_subset(extract_root, dest_dir, src.rstrip("/"), is_license) + else: + copy_directory_contents(extract_root, dest_dir) + + if ignore: + prune_files_by_pattern(dest_dir, ignore) + + @staticmethod + def _check_archive_limits(member_count: int, total_bytes: int) -> None: + """Enforce decompression-bomb size and count limits. + + Raises: + RuntimeError: When *member_count* or *total_bytes* exceeds the + configured safety limits. + """ + if member_count > _MAX_MEMBER_COUNT: + raise RuntimeError( + f"Archive contains {member_count} members which exceeds the " + f"safety limit of {_MAX_MEMBER_COUNT}." + ) + if total_bytes > _MAX_UNCOMPRESSED_BYTES: + raise RuntimeError( + f"Archive uncompressed size ({total_bytes} bytes) exceeds the " + f"safety limit of {_MAX_UNCOMPRESSED_BYTES} bytes." + ) + + @staticmethod + def _check_archive_member_path(name: str) -> None: + """Raise *RuntimeError* if *name* is an unsafe archive member path. + + Rejects absolute paths and any ``..`` path component. + + Raises: + RuntimeError: When *name* is absolute or contains a ``..`` component. + """ + member_path = pathlib.PurePosixPath(name) + if member_path.is_absolute() or any(part == ".." 
for part in member_path.parts): + raise RuntimeError(f"Archive contains an unsafe member path: {name!r}") + + @staticmethod + def check_zip_members(zf: zipfile.ZipFile) -> list[zipfile.ZipInfo]: + """Validate all ZIP member paths against path-traversal attacks. + + Returns: + The validated list of members, safe to pass to + :meth:`zipfile.ZipFile.extract`. + + Raises: + RuntimeError: When any member contains an absolute path, a ``..`` + component, or when the archive exceeds the size/count limits. + """ + members = zf.infolist() + ArchiveLocalRepo._check_archive_limits( + len(members), sum(info.file_size for info in members) + ) + for info in members: + ArchiveLocalRepo._check_archive_member_path(info.filename) + return members + + @staticmethod + def _check_tar_member_type(member: tarfile.TarInfo) -> None: + """Reject dangerous TAR member types that could harm the host system. + + On Python ≥ 3.11.4 the ``filter="tar"`` passed to + :meth:`tarfile.TarFile.extractall` already blocks many of these, but + we validate here too so the guard is active on **all** supported Python + versions and provides defence-in-depth on newer ones. + + Rejected member types: + + * **Symlinks with absolute or escaping targets** — could create a + foothold outside the extraction directory for later writes. + * **Hard links with absolute or escaping targets** — same risk as + dangerous symlinks; the target path is validated like a regular + member name. + * **Device files** (character, block) — accessing ``/dev/mem`` or + similar via an extracted device node can compromise the host. + * **FIFO / named pipes** — rarely present in software archives and + can be used to communicate with host processes or block extraction. + + Raises: + RuntimeError: When *member* is a disallowed or unsafe member type. + """ + if member.issym(): + target = member.linkname + if os.path.isabs(target) or any( + part == ".." 
for part in pathlib.PurePosixPath(target).parts + ): + raise RuntimeError( + f"Archive contains a symlink with an unsafe target: " + f"{member.name!r} -> {target!r}" + ) + elif member.islnk(): + # Hard-link targets are archive-relative paths; apply the same + # path-traversal check as we do for regular member names. + ArchiveLocalRepo._check_archive_member_path(member.linkname) + elif member.isdev() or member.isfifo(): + raise RuntimeError( + f"Archive contains a special file (device/FIFO): {member.name!r}" + ) + + @staticmethod + def _check_tar_members(tf: tarfile.TarFile) -> None: + """Validate TAR members against decompression bombs and unsafe member types. + + Checks applied (all supported Python versions): + + * **Size / count limits** — guard against decompression-bomb archives. + * **Path traversal** — reject absolute paths and ``..`` components. + * **Unsafe member types** — reject symlinks with absolute or escaping + targets, hardlinks with escaping targets, device files, and FIFOs + (see :meth:`_check_tar_member_type`). + + On Python ≥ 3.11.4 the ``filter="tar"`` passed to + :meth:`tarfile.TarFile.extractall` provides additional OS-level + protection; these checks remain as defence-in-depth. + + Raises: + RuntimeError: When the archive exceeds the size/count limits, + contains an absolute path or ``..`` component, or contains an + unsafe member type (dangerous symlink, device file, FIFO). + """ + members = tf.getmembers() + ArchiveLocalRepo._check_archive_limits( + len(members), sum(m.size for m in members if m.isfile()) + ) + for member in members: + ArchiveLocalRepo._check_archive_member_path(member.name) + ArchiveLocalRepo._check_tar_member_type(member) + + @staticmethod + def _extract_raw(archive_path: str, dest_dir: str) -> None: + """Extract archive contents to *dest_dir* without any filtering. 
+ + Safety checks performed before extraction: + + * TAR: :meth:`_check_tar_members` validates every member for + decompression-bomb limits, path traversal, dangerous symlink + targets, hardlink targets, device files, and FIFOs on **all** + supported Python versions. When Python ≥ 3.11.4 is available the + built-in ``filter="tar"`` provides additional OS-level enforcement + as defence-in-depth. + * ZIP: member path traversal validation (absolute paths and ``..`` + components are rejected) plus member count and size limits. + """ + lower = archive_path.lower() + if tarfile.is_tarfile(archive_path) and not lower.endswith(".zip"): + with tarfile.open(archive_path, "r:*") as tf: + ArchiveLocalRepo._check_tar_members(tf) + if sys.version_info >= (3, 11, 4): + tf.extractall(dest_dir, filter="tar") # nosec B202 + else: + tf.extractall(dest_dir) # nosec B202 + elif lower.endswith(".zip") or zipfile.is_zipfile(archive_path): + with zipfile.ZipFile(archive_path) as zf: + ArchiveLocalRepo.check_zip_members(zf) + zf.extractall(dest_dir) # nosec B202 + else: + raise RuntimeError( + f"Unsupported archive format: '{archive_path}'. " + f"Supported formats: {', '.join(ARCHIVE_EXTENSIONS)}" + ) diff --git a/dfetch/vcs/integrity_hash.py b/dfetch/vcs/integrity_hash.py new file mode 100644 index 00000000..9fcf4e45 --- /dev/null +++ b/dfetch/vcs/integrity_hash.py @@ -0,0 +1,69 @@ +"""Integrity hash: a ``:`` content fingerprint.""" + +from __future__ import annotations + +import hmac + +#: Supported hash algorithms, ordered strongest-first so :meth:`IntegrityHash.parse` +#: matches the most specific prefix when algorithm names share a common prefix. +SUPPORTED_HASH_ALGORITHMS = ("sha512", "sha384", "sha256") + + +class IntegrityHash: + """A parsed ``:`` integrity hash value. + + Use :meth:`parse` to build one from a raw string (returns *None* when the + string does not match a known algorithm prefix). Use the constructor when + both parts are already known. 
+
+    >>> h = IntegrityHash.parse("sha256:abc123")
+    >>> h.algorithm, h.hex_digest
+    ('sha256', 'abc123')
+    >>> str(h)
+    'sha256:abc123'
+    """
+
+    def __init__(self, algorithm: str, hex_digest: str) -> None:
+        """Create an IntegrityHash from known *algorithm* and *hex_digest*."""
+        self.algorithm = algorithm
+        self.hex_digest = hex_digest
+
+    @classmethod
+    def parse(cls, value: str) -> IntegrityHash | None:
+        """Return an :class:`IntegrityHash` when *value* is ``<algorithm>:<hex-digest>``.
+
+        Returns *None* when *value* does not start with a known algorithm prefix.
+        """
+        for algo in SUPPORTED_HASH_ALGORITHMS:
+            if value.startswith(f"{algo}:"):
+                return cls(algo, value[len(algo) + 1 :])
+        return None
+
+    def __str__(self) -> str:
+        """Return the canonical ``<algorithm>:<hex-digest>`` string."""
+        return f"{self.algorithm}:{self.hex_digest}"
+
+    def __repr__(self) -> str:
+        """Return a developer-readable representation."""
+        return f"IntegrityHash({self.algorithm!r}, {self.hex_digest!r})"
+
+    def __eq__(self, other: object) -> bool:
+        """Compare two :class:`IntegrityHash` instances (case-insensitive hex)."""
+        if isinstance(other, IntegrityHash):
+            return (
+                self.algorithm == other.algorithm
+                and self.hex_digest.lower() == other.hex_digest.lower()
+            )
+        return NotImplemented
+
+    def __hash__(self) -> int:
+        """Hash based on algorithm and lower-cased hex digest."""
+        return hash((self.algorithm, self.hex_digest.lower()))
+
+    def matches(self, actual_hex: str) -> bool:
+        """Return *True* when *actual_hex* equals this hash's digest.
+
+        Uses :func:`hmac.compare_digest` for constant-time comparison to
+        avoid leaking timing information about the expected value.
+ """ + return hmac.compare_digest(actual_hex.lower(), self.hex_digest.lower()) diff --git a/doc/_ext/sphinxcontrib_asciinema/.dfetch_data.yaml b/doc/_ext/sphinxcontrib_asciinema/.dfetch_data.yaml index 49a36e79..4f9d5878 100644 --- a/doc/_ext/sphinxcontrib_asciinema/.dfetch_data.yaml +++ b/doc/_ext/sphinxcontrib_asciinema/.dfetch_data.yaml @@ -2,8 +2,8 @@ # For more info see https://dfetch.rtfd.io/en/latest/getting_started.html dfetch: branch: master - hash: 5b0a3a18e1e83d363f9eb0ac4b3fca17 - last_fetch: 26/01/2026, 23:40:59 + hash: dcd1473e1a3ca613b804e3e51e7ee342 + last_fetch: 22/03/2026, 19:52:31 patch: - doc/_ext/patches/001-autoformat-sphinxcontrib.asciinema.patch - doc/_ext/patches/002-fix-options-sphinxcontrib.asciinema.patch diff --git a/doc/generate-casts/demo-magic/.dfetch_data.yaml b/doc/generate-casts/demo-magic/.dfetch_data.yaml index 9c5c2598..07045916 100644 --- a/doc/generate-casts/demo-magic/.dfetch_data.yaml +++ b/doc/generate-casts/demo-magic/.dfetch_data.yaml @@ -2,8 +2,8 @@ # For more info see https://dfetch.rtfd.io/en/latest/getting_started.html dfetch: branch: master - hash: 476a29a874df3840ac2bd916e7097b92 - last_fetch: 14/10/2025, 19:16:12 + hash: d67278c164d7a103c46ff953560f1f0a + last_fetch: 22/03/2026, 19:50:56 patch: '' remote_url: https://github.com/paxtonhare/demo-magic.git revision: 2a2f439c26a93286dc2adc6ef2a81755af83f36e diff --git a/doc/landing-page/conf.py b/doc/landing-page/conf.py index ee3a7cb0..e5762639 100644 --- a/doc/landing-page/conf.py +++ b/doc/landing-page/conf.py @@ -95,11 +95,17 @@ html_css_files = [ ( "https://cdnjs.cloudflare.com/ajax/libs/font-awesome/6.5.1/css/brands.min.css", - {"integrity": "sha512-8RxmFOVaKQe/xtg6lbscU9DU0IRhURWEuiI0tXevv+lXbAHfkpamD4VKFQRto9WgfOJDwOZ74c/s9Yesv3VvIQ==", "crossorigin": "anonymous"}, + { + "integrity": "sha512-8RxmFOVaKQe/xtg6lbscU9DU0IRhURWEuiI0tXevv+lXbAHfkpamD4VKFQRto9WgfOJDwOZ74c/s9Yesv3VvIQ==", + "crossorigin": "anonymous", + }, ), ( 
"https://cdnjs.cloudflare.com/ajax/libs/font-awesome/6.5.1/css/fontawesome.min.css", - {"integrity": "sha512-d0olNN35C6VLiulAobxYHZiXJmq+vl+BGIgAxQtD5+kqudro/xNMvv2yIHAciGHpExsIbKX3iLg+0B6d0k4+ZA==", "crossorigin": "anonymous"}, + { + "integrity": "sha512-d0olNN35C6VLiulAobxYHZiXJmq+vl+BGIgAxQtD5+kqudro/xNMvv2yIHAciGHpExsIbKX3iLg+0B6d0k4+ZA==", + "crossorigin": "anonymous", + }, ), "css/custom.css", ] diff --git a/doc/manifest.rst b/doc/manifest.rst index 65e30aef..a1947f86 100644 --- a/doc/manifest.rst +++ b/doc/manifest.rst @@ -99,6 +99,7 @@ Below an overview of all possible fields on the manifest. The bold items are man enum: - git - svn + - archive src: type: string description: > @@ -108,4 +109,21 @@ Below an overview of all possible fields on the manifest. The bold items are man description: Files to ignore. See :ref:`Ignore` for details. items: type: string + integrity: + type: object + description: > + Integrity verification block for archive dependencies. + Only used with ``vcs: archive``. + Designed for future extension with ``sig:`` (detached signature URL) + and ``sig-key:`` (signing-key URL or fingerprint) fields alongside ``hash:``. + See :ref:`Archive` for details. + properties: + hash: + type: string + description: > + Cryptographic hash of the archive file. + Format: ``:``. + Supported algorithms: ``sha256`` (64 hex chars), + ``sha384`` (96 hex chars), and ``sha512`` (128 hex chars). + Example: ``sha256:e3b0c4…``. 
uniqueItems: true diff --git a/example/dfetch.yaml b/example/dfetch.yaml index 7f950abd..4ee1e746 100644 --- a/example/dfetch.yaml +++ b/example/dfetch.yaml @@ -42,3 +42,13 @@ manifest: dst: Tests/cpputest-git-rev-only revision: d14505cc9191fcf17ccbd92af1c3409eb3969890 repo-path: cpputest/cpputest.git # Use external git directly + + - name: cppcheck-archive + remote: github + dst: Tests/cppcheck-archive + repo-path: danmar/cppcheck/archive/2.20.0.tar.gz + ignore: + - tests + - .github + integrity: + hash: sha256:7be7992439339017edb551d8e7d2315f9bb57c402da50c2cee9cd0e2724600a1 diff --git a/features/check-archive.feature b/features/check-archive.feature new file mode 100644 index 00000000..1ba56760 --- /dev/null +++ b/features/check-archive.feature @@ -0,0 +1,143 @@ +Feature: Checking dependencies from an archive + + DFetch can check if archive-based projects are up-to-date. + For archives without an integrity hash the URL is the version identifier + so the project is always considered up-to-date once fetched (the URL has + not changed). For archives with an 'integrity.hash' the hash is the + version identifier, and dfetch reports whether the locally stored version + matches the wanted hash. 
+ + Scenario: Archive project without hash is reported as up-to-date after fetch + Given an archive "SomeProject.tar.gz" with the files + | path | + | README.md | + And the manifest 'dfetch.yaml' in MyProject + """ + manifest: + version: '0.0' + projects: + - name: SomeProject + url: some-remote-server/SomeProject.tar.gz + vcs: archive + """ + And all projects are updated in MyProject + When I run "dfetch check" in MyProject + Then the output shows + """ + Dfetch (0.12.1) + SomeProject: + > up-to-date (some-remote-server/SomeProject.tar.gz) + """ + + Scenario: Archive project with correct sha256 hash is reported as up-to-date + Given an archive "SomeProject.tar.gz" with the files + | path | + | README.md | + And the manifest 'dfetch.yaml' in MyProject + """ + manifest: + version: '0.0' + projects: + - name: SomeProject + url: some-remote-server/SomeProject.tar.gz + vcs: archive + integrity: + hash: sha256: + """ + And all projects are updated in MyProject + When I run "dfetch check" in MyProject + Then the output shows + """ + Dfetch (0.12.1) + SomeProject: + > up-to-date (sha256:) + """ + + Scenario: Archive project that has not been fetched yet is reported + Given an archive "SomeProject.tar.gz" with the files + | path | + | README.md | + And the manifest 'dfetch.yaml' in MyProject + """ + manifest: + version: '0.0' + projects: + - name: SomeProject + url: some-remote-server/SomeProject.tar.gz + vcs: archive + """ + When I run "dfetch check" in MyProject + Then the output shows + """ + Dfetch (0.12.1) + SomeProject: + > wanted (some-remote-server/SomeProject.tar.gz), available (some-remote-server/SomeProject.tar.gz) + """ + + Scenario: Non-existent archive URL is reported + Given the manifest 'dfetch.yaml' + """ + manifest: + version: '0.0' + projects: + - name: non-existent-archive + url: https://dfetch.invalid/does-not-exist.tar.gz + vcs: archive + """ + When I run "dfetch check" + Then the output shows + """ + Dfetch (0.12.1) + non-existent-archive: + > wanted 
(https://dfetch.invalid/does-not-exist.tar.gz), but not available at the upstream. + """ + + Scenario: Archive project with ignore list shows no local changes after fresh fetch + Given an archive "SomeProject.tar.gz" with the files + | path | + | README.md | + | src/main.c | + | tests/test_main.c | + And the manifest 'dfetch.yaml' in MyProject + """ + manifest: + version: '0.0' + projects: + - name: SomeProject + url: some-remote-server/SomeProject.tar.gz + vcs: archive + ignore: + - tests + """ + And all projects are updated in MyProject + When I run "dfetch check SomeProject" in MyProject + Then the output shows + """ + Dfetch (0.12.1) + SomeProject: + > up-to-date (some-remote-server/SomeProject.tar.gz) + """ + + Scenario: Archive with local changes is reported + Given an archive "SomeProject.tar.gz" with the files + | path | + | README.md | + And the manifest 'dfetch.yaml' in MyProject + """ + manifest: + version: '0.0' + projects: + - name: SomeProject + url: some-remote-server/SomeProject.tar.gz + vcs: archive + """ + And all projects are updated in MyProject + And "SomeProject/README.md" in MyProject is changed locally + When I run "dfetch check SomeProject" in MyProject + Then the output shows + """ + Dfetch (0.12.1) + SomeProject: + > Local changes were detected, please generate a patch using 'dfetch diff SomeProject' and add it to your manifest using 'patch:'. Alternatively overwrite the local changes with 'dfetch update --force SomeProject' + > up-to-date (some-remote-server/SomeProject.tar.gz) + """ diff --git a/features/fetch-archive.feature b/features/fetch-archive.feature new file mode 100644 index 00000000..0f667f19 --- /dev/null +++ b/features/fetch-archive.feature @@ -0,0 +1,226 @@ +Feature: Fetching dependencies from an archive (tar/zip) + + Some projects are distributed as tar or zip archives, for example as + GitHub release assets or on internal artifact servers. DFetch supports + fetching these archives using the 'archive' vcs type. 
Optionally, an + 'integrity:' block with a 'hash:' sub-field can be specified for + cryptographic integrity verification. + + Scenario: Tar.gz archive project is fetched + Given an archive "SomeProject.tar.gz" with the files + | path | + | README.md | + | src/main.c | + And the manifest 'dfetch.yaml' in MyProject + """ + manifest: + version: '0.0' + projects: + - name: SomeProject + url: some-remote-server/SomeProject.tar.gz + vcs: archive + """ + When I run "dfetch update" in MyProject + Then 'MyProject' looks like: + """ + MyProject/ + SomeProject/ + .dfetch_data.yaml + README.md + src/ + main.c + dfetch.yaml + """ + + Scenario: Zip archive project is fetched + Given an archive "SomeProject.zip" with the files + | path | + | README.md | + | include/lib.h | + And the manifest 'dfetch.yaml' in MyProject + """ + manifest: + version: '0.0' + projects: + - name: SomeProject + url: some-remote-server/SomeProject.zip + vcs: archive + """ + When I run "dfetch update" in MyProject + Then 'MyProject' looks like: + """ + MyProject/ + SomeProject/ + .dfetch_data.yaml + README.md + include/ + lib.h + dfetch.yaml + """ + + Scenario: Archive projects with sha256, sha384 and sha512 hash verification are fetched + Given an archive "SomeProject.tar.gz" with the files + | path | + | README.md | + And the manifest 'dfetch.yaml' in MyProject + """ + manifest: + version: '0.0' + projects: + - name: SomeProject-sha256 + url: some-remote-server/SomeProject.tar.gz + vcs: archive + integrity: + hash: sha256: + - name: SomeProject-sha384 + url: some-remote-server/SomeProject.tar.gz + vcs: archive + integrity: + hash: sha384: + - name: SomeProject-sha512 + url: some-remote-server/SomeProject.tar.gz + vcs: archive + integrity: + hash: sha512: + """ + When I run "dfetch update" in MyProject + Then 'MyProject' looks like: + """ + MyProject/ + SomeProject-sha256/ + .dfetch_data.yaml + README.md + SomeProject-sha384/ + .dfetch_data.yaml + README.md + SomeProject-sha512/ + .dfetch_data.yaml + 
README.md + dfetch.yaml + """ + + Scenario: Archive with incorrect sha256 hash is rejected + Given an archive "SomeProject.tar.gz" with the files + | path | + | README.md | + And the manifest 'dfetch.yaml' in MyProject + """ + manifest: + version: '0.0' + projects: + - name: SomeProject + url: some-remote-server/SomeProject.tar.gz + vcs: archive + integrity: + hash: sha256:0000000000000000000000000000000000000000000000000000000000000000 + """ + When I run "dfetch update" in MyProject + Then the output shows + """ + Dfetch (0.12.1) + Hash mismatch for SomeProject! sha256 expected 0000000000000000000000000000000000000000000000000000000000000000 + """ + + Scenario: Specific directory from archive can be fetched + Given an archive "SomeProject.tar.gz" with the files + | path | + | src/main.c | + | src/lib.c | + | tests/test_main.c | + And the manifest 'dfetch.yaml' in MyProject + """ + manifest: + version: '0.0' + projects: + - name: SomeProject + url: some-remote-server/SomeProject.tar.gz + vcs: archive + src: src/ + """ + When I run "dfetch update" in MyProject + Then 'MyProject' looks like: + """ + MyProject/ + SomeProject/ + .dfetch_data.yaml + lib.c + main.c + dfetch.yaml + """ + + Scenario: Files can be ignored when fetching from archive + Given an archive "SomeProject.tar.gz" with the files + | path | + | README.md | + | src/main.c | + | tests/test_main.c | + And the manifest 'dfetch.yaml' in MyProject + """ + manifest: + version: '0.0' + projects: + - name: SomeProject + url: some-remote-server/SomeProject.tar.gz + vcs: archive + ignore: + - tests + """ + When I run "dfetch update" in MyProject + Then 'MyProject' looks like: + """ + MyProject/ + SomeProject/ + .dfetch_data.yaml + README.md + src/ + main.c + dfetch.yaml + """ + + Scenario: Archive is re-fetched when force flag is given + Given an archive "SomeProject.tar.gz" with the files + | path | + | README.md | + And the manifest 'dfetch.yaml' in MyProject + """ + manifest: + version: '0.0' + projects: + - 
name: SomeProject + url: some-remote-server/SomeProject.tar.gz + vcs: archive + """ + And all projects are updated in MyProject + When I run "dfetch update --force" in MyProject + Then the output shows + """ + Dfetch (0.12.1) + SomeProject: + > Fetched some-remote-server/SomeProject.tar.gz + """ + + Scenario: Multiple archive projects are fetched + Given an archive "LibA.tar.gz" with the files + | path | + | README.md | + And an archive "LibB.zip" with the files + | path | + | README.md | + And the manifest 'dfetch.yaml' in MyProject + """ + manifest: + version: '0.0' + projects: + - name: LibA + url: some-remote-server/LibA.tar.gz + vcs: archive + + - name: LibB + url: some-remote-server/LibB.zip + vcs: archive + """ + When I run "dfetch update" in MyProject + Then the following projects are fetched + | path | + | MyProject/LibA | + | MyProject/LibB | diff --git a/features/freeze-archive.feature b/features/freeze-archive.feature new file mode 100644 index 00000000..1e5fb3db --- /dev/null +++ b/features/freeze-archive.feature @@ -0,0 +1,68 @@ +Feature: Freeze archive dependencies + + For archive projects, 'dfetch freeze' adds a sha256 hash to the manifest + to pin the exact archive content. This uses the 'integrity.hash: sha256:' + format, which can be extended to other algorithms or signature fields in + the future. + + Archives that already have an integrity hash in the manifest are left unchanged. 
+ + Scenario: Archive project is frozen with its sha256 hash + Given an archive "SomeProject.tar.gz" with the files + | path | + | README.md | + And the manifest 'dfetch.yaml' + """ + manifest: + version: '0.0' + projects: + - name: SomeProject + url: some-remote-server/SomeProject.tar.gz + vcs: archive + """ + And all projects are updated + When I run "dfetch freeze" + Then the manifest 'dfetch.yaml' is replaced with + """ + manifest: + version: '0.0' + + projects: + - name: SomeProject + url: some-remote-server/SomeProject.tar.gz + vcs: archive + integrity: + hash: sha256: + + """ + + Scenario: Already frozen archive project is not changed by freeze + Given an archive "SomeProject.tar.gz" with the files + | path | + | README.md | + And the manifest 'dfetch.yaml' + """ + manifest: + version: '0.0' + projects: + - name: SomeProject + url: some-remote-server/SomeProject.tar.gz + vcs: archive + integrity: + hash: sha256: + """ + And all projects are updated + When I run "dfetch freeze" + Then the manifest 'dfetch.yaml' is replaced with + """ + manifest: + version: '0.0' + + projects: + - name: SomeProject + url: some-remote-server/SomeProject.tar.gz + vcs: archive + integrity: + hash: sha256: + + """ diff --git a/features/report-sbom-archive.feature b/features/report-sbom-archive.feature new file mode 100644 index 00000000..886be973 --- /dev/null +++ b/features/report-sbom-archive.feature @@ -0,0 +1,118 @@ +Feature: Create a CycloneDX SBOM for archive dependencies + + *Dfetch* can generate a software Bill-of-Materials (SBOM) that includes + dependencies fetched from tar/zip archives. + + For archive components the SBOM records: + - A ``generic`` Package URL (PURL) with a ``download_url`` qualifier + pointing at the archive. + - An external reference of type ``distribution`` (not ``vcs``). + - A ``SHA-256`` component hash when an ``integrity.hash`` field is present + in the manifest, so downstream tooling can verify supply-chain integrity. 
+ + Scenario: A fetched archive without a hash generates a json sbom + Given an archive "SomeProject.tar.gz" + And the manifest 'dfetch.yaml' + """ + manifest: + version: '0.0' + + projects: + - name: SomeProject + url: some-remote-server/SomeProject.tar.gz + vcs: archive + """ + And all projects are updated + When I run "dfetch report -t sbom" + Then the 'report.json' json file includes + """ + { + "components": [ + { + "name": "SomeProject", + "type": "library", + "externalReferences": [ + { + "type": "distribution", + "url": "" + } + ] + } + ] + } + """ + + Scenario: A fetched archive with sha256 hash generates a json sbom with hash + Given an archive "SomeProject.tar.gz" + And the manifest 'dfetch.yaml' + """ + manifest: + version: '0.0' + + projects: + - name: SomeProject + url: some-remote-server/SomeProject.tar.gz + vcs: archive + integrity: + hash: sha256: + """ + And all projects are updated + When I run "dfetch report -t sbom" + Then the 'report.json' json file includes + """ + { + "components": [ + { + "name": "SomeProject", + "version": "sha256:", + "type": "library", + "hashes": [ + { + "alg": "SHA-256", + "content": "" + } + ], + "externalReferences": [ + { + "type": "distribution", + "url": "" + } + ] + } + ] + } + """ + + Scenario: An unfetched archive with hash in manifest reports hash as version + Given an archive "SomeProject.tar.gz" + And the manifest 'dfetch.yaml' + """ + manifest: + version: '0.0' + + projects: + - name: SomeProject + url: some-remote-server/SomeProject.tar.gz + vcs: archive + integrity: + hash: sha256: + """ + When I run "dfetch report -t sbom" + Then the 'report.json' json file includes + """ + { + "components": [ + { + "name": "SomeProject", + "version": "sha256:", + "type": "library", + "hashes": [ + { + "alg": "SHA-256", + "content": "" + } + ] + } + ] + } + """ diff --git a/features/report-sbom.feature b/features/report-sbom.feature index 19a47b63..e0052aa2 100644 --- a/features/report-sbom.feature +++ 
b/features/report-sbom.feature @@ -94,6 +94,7 @@ Feature: Create an CycloneDX sbom "url": "https://github.com/cpputest/cpputest" } ], + "group": "cpputest", "licenses": [ { "license": { @@ -226,3 +227,38 @@ Feature: Create an CycloneDX sbom "specVersion": "1.6" } """ + + Scenario: A fetched archive dependency generates a json sbom with distribution reference + Given the manifest 'dfetch.yaml' + """ + manifest: + version: '0.0' + + projects: + - name: test-repo-headers + url: https://github.com/dfetch-org/test-repo/archive/refs/tags/v1.tar.gz + vcs: archive + ignore: + - '*.md' + - '*.txt' + """ + And all projects are updated + When I run "dfetch report -t sbom" + Then the 'report.json' json file includes + """ + { + "components": [ + { + "name": "test-repo-headers", + "group": "github.com", + "type": "library", + "externalReferences": [ + { + "type": "distribution", + "url": "https://github.com/dfetch-org/test-repo/archive/refs/tags/v1.tar.gz" + } + ] + } + ] + } + """ diff --git a/features/steps/archive_steps.py b/features/steps/archive_steps.py new file mode 100644 index 00000000..1c961035 --- /dev/null +++ b/features/steps/archive_steps.py @@ -0,0 +1,90 @@ +"""Steps for archive-based feature tests.""" + +# pylint: disable=function-redefined, missing-function-docstring, import-error, not-callable +# pyright: reportRedeclaration=false, reportAttributeAccessIssue=false, reportCallIssue=false + +import hashlib +import io +import os +import pathlib +import tarfile +import zipfile + +from behave import given # pylint: disable=no-name-in-module + + +def _file_digest(path: str, constructor) -> str: + """Return the hex digest of *path* using the given hashlib *constructor*.""" + h = constructor() + with open(path, "rb") as f: + for chunk in iter(lambda: f.read(8192), b""): + h.update(chunk) + return h.hexdigest() + + +def create_tar_gz(archive_path: str, name: str, files: list[dict]) -> None: + """Create a .tar.gz archive with files nested under a top-level / 
directory.""" + with tarfile.open(archive_path, "w:gz") as tar: + for file in files: + content = f"Generated file {file['path']}\n".encode() + member_path = f"{name}/{file['path']}" + info = tarfile.TarInfo(name=member_path) + info.size = len(content) + tar.addfile(info, io.BytesIO(content)) + + +def create_zip(archive_path: str, name: str, files: list[dict]) -> None: + """Create a .zip archive with files nested under a top-level / directory.""" + with zipfile.ZipFile(archive_path, "w", zipfile.ZIP_DEFLATED) as zf: + for file in files: + content = f"Generated file {file['path']}\n" + member_path = f"{name}/{file['path']}" + zf.writestr(member_path, content) + + +def _archive_url(context, filename: str) -> str: + """Build the archive URL in the same format used by apply_manifest_substitutions. + + apply_manifest_substitutions produces ``file:///`` + absolute path, which for an + absolute path like ``/tmp/...`` yields four slashes (``file:////tmp/...``). + We must match that format so placeholder substitution works in SBOM assertions. + + :func:`pathlib.Path.as_posix` is used instead of :func:`str.split`/join so + that mixed separators (e.g. on Windows) are normalised correctly. 
+    """
+    server_posix = pathlib.Path(context.remotes_dir_path).as_posix()
+    return f"file:///{server_posix}/{filename}"
+
+
+def _create_archive(context, name: str, extension: str) -> None:
+    """Create an archive of the given *extension* in the remote server directory."""
+    server_path = context.remotes_dir_path
+    pathlib.Path(server_path).mkdir(parents=True, exist_ok=True)
+
+    filename = f"{name}{extension}"
+    archive_path = os.path.join(server_path, filename)
+    files = list(context.table) if context.table else [{"path": "README.md"}]
+
+    if extension == ".tar.gz":
+        create_tar_gz(archive_path, name, files)
+    elif extension == ".zip":
+        create_zip(archive_path, name, files)
+    else:
+        raise ValueError(f"Unsupported archive extension: {extension!r}")
+
+    context.archive_sha256 = _file_digest(archive_path, hashlib.sha256)
+    context.archive_sha384 = _file_digest(archive_path, hashlib.sha384)
+    context.archive_sha512 = _file_digest(archive_path, hashlib.sha512)
+    context.archive_url = _archive_url(context, filename)
+
+
+@given('an archive "{name}.tar.gz" with the files')
+@given('an archive "{name}.tar.gz"')
+def step_impl(context, name):
+    _create_archive(context, name, ".tar.gz")
+
+
+@given('an archive "{name}.zip" with the files')
+@given('an archive "{name}.zip"')
+def step_impl(context, name):
+    _create_archive(context, name, ".zip")
diff --git a/features/steps/generic_steps.py b/features/steps/generic_steps.py
index 8d35a44f..0f646dd7 100644
--- a/features/steps/generic_steps.py
+++ b/features/steps/generic_steps.py
@@ -50,8 +50,8 @@ def temporary_env(key: str, value: str):
 
 
 def remote_server_path(context):
-    """Get the path to the remote dir."""
-    return "/".join(context.remotes_dir_path.split(os.sep))
+    """Get the path to the remote dir as a POSIX path string."""
+    return pathlib.Path(context.remotes_dir_path).as_posix()
 
 
 def call_command(context: Context, args: list[str], path: Optional[str] = ".") -> None:
@@ -95,6 +95,80 @@ def check_json(path: Union[str,
os.PathLike], content: str) -> None:
     )
 
 
+def apply_archive_substitutions(text: str, context) -> str:
+    """Replace archive-related dynamic placeholders with values stored on *context*."""
+    if hasattr(context, "archive_sha256"):
+        text = text.replace("<archive_sha256>", context.archive_sha256)
+    if hasattr(context, "archive_sha384"):
+        text = text.replace("<archive_sha384>", context.archive_sha384)
+    if hasattr(context, "archive_sha512"):
+        text = text.replace("<archive_sha512>", context.archive_sha512)
+    if hasattr(context, "archive_url"):
+        text = text.replace("<archive_url>", context.archive_url)
+    return text
+
+
+def _json_subset_matches(expected, actual) -> bool:
+    """Return *True* when *expected* is a subset of *actual* (recursive).
+
+    **List matching is greedy and order-sensitive.** Each item in *expected*
+    is matched against *actual* in order, claiming the first unused actual
+    item that satisfies the subset check. This means an earlier expected
+    item can consume the only actual item that a later, more specific
+    expected item would need. For example, with::
+
+        expected = [{"a": 1}, {"a": 1, "b": 2}]
+        actual = [{"a": 1, "b": 2}]
+
+    the first expected item matches ``{"a": 1, "b": 2}`` (leaving nothing
+    for the second), so the overall match returns *False* even though
+    ``{"a": 1, "b": 2}`` satisfies the second item. Consumers should
+    **not** rely on non-deterministic matching; instead, pre-order *expected*
+    lists from most-specific to least-specific to avoid this behaviour.
+ """ + if isinstance(expected, dict): + if not isinstance(actual, dict): + return False + return all( + k in actual and _json_subset_matches(v, actual[k]) + for k, v in expected.items() + ) + if isinstance(expected, list): + if not isinstance(actual, list): + return False + matched = [False] * len(actual) + for exp_item in expected: + found = False + for i, act_item in enumerate(actual): + if not matched[i] and _json_subset_matches(exp_item, act_item): + matched[i] = True + found = True + break + if not found: + return False + return True + return expected == actual + + +def check_json_subset(path: Union[str, os.PathLike], content: str, context) -> None: + """Assert that a JSON file *contains* the given key-values (subset match). + + Dynamic placeholders (````, ````) in + *content* are substituted with values from *context* before parsing. + """ + content = apply_archive_substitutions(content, context) + + with open(path, "r", encoding="UTF-8") as file_to_check: + actual_json = json.load(file_to_check) + expected_json = json.loads(content) + + assert _json_subset_matches(expected_json, actual_json), ( + f"JSON subset mismatch.\n" + f"Expected subset:\n{json.dumps(expected_json, indent=4, sort_keys=True)}\n" + f"Actual:\n{json.dumps(actual_json, indent=4, sort_keys=True)}" + ) + + def check_content( expected_content: Iterable[str], actual_content: Iterable[str] ) -> None: @@ -196,6 +270,8 @@ def check_output(context, line_count=None): context: Behave context with cmd_output and expected text line_count: If set, compare only the first N lines of actual output """ + expected_raw = apply_archive_substitutions(context.text, context) + expected_text = multisub( patterns=[ (dfetch_title, "Dfetch (x.x.x)"), @@ -204,7 +280,7 @@ def check_output(context, line_count=None): (ansi_escape, ""), (svn_error, "svn: EXXXXXX: "), ], - text=context.text, + text=expected_raw, ) actual_text = multisub( @@ -330,6 +406,12 @@ def step_impl(context, name): check_file(name, context.text) 
+@then("the '{name}' json file includes") +def step_impl(context, name): + """Partial JSON match - the expected JSON must be a *subset* of the actual file.""" + check_json_subset(name, context.text, context) + + def multisub(patterns: List[Tuple[Pattern[str], str]], text: str) -> str: """Apply a list of tuples that each contain a regex + replace string.""" for pattern, replace in patterns: diff --git a/features/steps/manifest_steps.py b/features/steps/manifest_steps.py index 13641e22..30d0c2f4 100644 --- a/features/steps/manifest_steps.py +++ b/features/steps/manifest_steps.py @@ -9,16 +9,28 @@ from behave import given, then, when # pylint: disable=no-name-in-module -from features.steps.generic_steps import check_file, generate_file, remote_server_path +from features.steps.generic_steps import ( + apply_archive_substitutions, + check_file, + generate_file, + remote_server_path, +) + + +def apply_manifest_substitutions(context, contents: str) -> str: + """Apply context-specific substitutions to manifest contents.""" + result = apply_archive_substitutions(contents, context) + result = result.replace( + "url: some-remote-server", f"url: file:///{remote_server_path(context)}" + ) + return result def generate_manifest( context, name="dfetch.yaml", contents: Optional[str] = None, path=None ): contents = contents or context.text - manifest = contents.replace( - "url: some-remote-server", f"url: file:///{remote_server_path(context)}" - ) + manifest = apply_manifest_substitutions(context, contents) generate_file(os.path.join(path or os.getcwd(), name), manifest) @@ -37,7 +49,7 @@ def step_impl(context, name, path=None): @then("it should generate the manifest '{name}'") def step_impl(context, name): """Check a manifest.""" - check_file(name, context.text) + check_file(name, apply_manifest_substitutions(context, context.text)) @given("the manifest '{name}' with the projects:") diff --git a/features/validate-manifest.feature b/features/validate-manifest.feature index 
dd32b95a..b1e8a82f 100644 --- a/features/validate-manifest.feature +++ b/features/validate-manifest.feature @@ -51,6 +51,63 @@ Feature: Validate a manifest unexpected key not in schema 'manifest-wrong' """ + Scenario: A valid archive manifest with integrity hashes is validated + Given the manifest 'dfetch.yaml' + """ + manifest: + version: '0.0' + + projects: + - name: SomeLib-sha256 + url: https://example.com/SomeLib-1.0.tar.gz + vcs: archive + integrity: + hash: sha256:e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855 + + - name: SomeLib-sha384 + url: https://example.com/SomeLib-2.0.tar.gz + vcs: archive + integrity: + hash: sha384:38b060a751ac96384cd9327eb1b1e36a21fdb71114be07434c0cc7bf63f6e1da274edebfe76f65fbd51ad2f14898b95b + + - name: SomeLib-sha512 + url: https://example.com/SomeLib-3.0.tar.gz + vcs: archive + integrity: + hash: sha512:cf83e1357eefb8bdf1542850d66d8007d620e4050b5715dc83f4a921d36ce9ce47d0d13c5d85f2b0ff8318d2877eec2f63b931bd47417a81a538327af927da3e + + """ + When I run "dfetch validate" + Then the output shows + """ + Dfetch (0.12.1) + dfetch.yaml : valid + """ + + Scenario: A manifest with an invalid integrity hash format is rejected + Given the manifest 'dfetch.yaml' + """ + manifest: + version: '0.0' + + projects: + - name: SomeLib + url: https://example.com/SomeLib-1.0.tar.gz + vcs: archive + integrity: + hash: not-a-valid-hash + + """ + When I run "dfetch validate" + Then the output shows + """ + Dfetch (0.12.1) + Schema validation failed: + hash: not-a-valid-hash + ^ (line: 9) + found non-matching string + """ + Scenario: A manifest with duplicate project names Given the manifest 'dfetch.yaml' """ diff --git a/tests/test_archive.py b/tests/test_archive.py new file mode 100644 index 00000000..47890001 --- /dev/null +++ b/tests/test_archive.py @@ -0,0 +1,498 @@ +"""Unit tests for dfetch.vcs.archive and dfetch.project.archivesubproject.""" + +import hashlib +import io +import os +import pathlib +import tarfile +import tempfile 
+import zipfile +from unittest.mock import patch + +import pytest + +from dfetch.manifest.project import ProjectEntry +from dfetch.manifest.version import Version +from dfetch.project.archivesubproject import ArchiveSubProject, _suffix_for_url +from dfetch.vcs.archive import ( + ARCHIVE_EXTENSIONS, + ArchiveLocalRepo, + ArchiveRemote, + is_archive_url, +) + +# These are static methods on ArchiveLocalRepo +_check_archive_limits = ArchiveLocalRepo._check_archive_limits +_check_zip_members = ArchiveLocalRepo.check_zip_members +_check_tar_members = ArchiveLocalRepo._check_tar_members +_check_tar_member_type = ArchiveLocalRepo._check_tar_member_type + + +# --------------------------------------------------------------------------- +# is_archive_url +# --------------------------------------------------------------------------- + + +@pytest.mark.parametrize( + "url", + [ + "https://example.com/lib.tar.gz", + "https://example.com/lib.tgz", + "https://example.com/lib.tar.bz2", + "https://example.com/lib.tar.xz", + "https://example.com/lib.zip", + "file:///tmp/lib.ZIP", # case-insensitive + ], +) +def test_is_archive_url_true(url): + assert is_archive_url(url) is True + + +@pytest.mark.parametrize( + "url", + [ + "https://example.com/repo.git", + "https://example.com/", + "svn://svn.example.com/trunk", + "https://example.com/lib.tar.gz.sig", + ], +) +def test_is_archive_url_false(url): + assert is_archive_url(url) is False + + +# --------------------------------------------------------------------------- +# _suffix_for_url +# --------------------------------------------------------------------------- + + +@pytest.mark.parametrize( + "url,expected", + [ + ("https://example.com/lib.tar.gz", ".tar.gz"), + ("https://example.com/lib.tgz", ".tgz"), + ("https://example.com/lib.tar.bz2", ".tar.bz2"), + ("https://example.com/lib.tar.xz", ".tar.xz"), + ("https://example.com/lib.zip", ".zip"), + ("https://example.com/lib.unknown", ".archive"), + ], +) +def test_suffix_for_url(url, 
expected): + assert _suffix_for_url(url) == expected + + +def test_suffix_for_url_prefers_longest_match(): + # .tar.gz should win over .gz + assert _suffix_for_url("https://example.com/lib.tar.gz") == ".tar.gz" + + +# --------------------------------------------------------------------------- +# _check_archive_limits +# --------------------------------------------------------------------------- + + +def test_check_archive_limits_ok(): + _check_archive_limits(member_count=1, total_bytes=1024) # should not raise + + +def test_check_archive_limits_too_many_members(): + with pytest.raises(RuntimeError, match="safety limit"): + _check_archive_limits(member_count=10_001, total_bytes=0) + + +def test_check_archive_limits_too_large(): + with pytest.raises(RuntimeError, match="safety limit"): + _check_archive_limits(member_count=1, total_bytes=500 * 1024 * 1024 + 1) + + +# --------------------------------------------------------------------------- +# _check_zip_members +# --------------------------------------------------------------------------- + + +def _make_zip(member_names: list[str]) -> zipfile.ZipFile: + buf = io.BytesIO() + with zipfile.ZipFile(buf, "w") as zf: + for name in member_names: + zf.writestr(name, "content") + buf.seek(0) + return zipfile.ZipFile(buf) + + +def test_check_zip_members_safe(): + zf = _make_zip(["project/README.md", "project/src/main.c"]) + _check_zip_members(zf) # should not raise + + +def test_check_zip_members_dot_dot(): + zf = _make_zip(["project/../etc/passwd"]) + with pytest.raises(RuntimeError, match="unsafe member path"): + _check_zip_members(zf) + + +def test_check_zip_members_absolute(): + zf = _make_zip(["/etc/passwd"]) + with pytest.raises(RuntimeError, match="unsafe member path"): + _check_zip_members(zf) + + +# --------------------------------------------------------------------------- +# _check_tar_members +# --------------------------------------------------------------------------- + + +def _make_tar(member_names: list[str]) 
-> tarfile.TarFile: + buf = io.BytesIO() + with tarfile.open(fileobj=buf, mode="w:gz") as tf: + for name in member_names: + content = b"content" + info = tarfile.TarInfo(name=name) + info.size = len(content) + tf.addfile(info, io.BytesIO(content)) + buf.seek(0) + return tarfile.open(fileobj=buf, mode="r:gz") + + +def test_check_tar_members_safe(): + tf = _make_tar(["project/README.md", "project/src/main.c"]) + _check_tar_members(tf) # should not raise + + +def test_check_tar_members_dot_dot(): + tf = _make_tar(["project/../etc/passwd"]) + with pytest.raises(RuntimeError, match="unsafe member path"): + _check_tar_members(tf) + + +def test_check_tar_members_absolute(): + tf = _make_tar(["/etc/passwd"]) + with pytest.raises(RuntimeError, match="unsafe member path"): + _check_tar_members(tf) + + +def _make_tar_with_member(setup_fn) -> tarfile.TarFile: + """Create an in-memory tar whose members are set up by *setup_fn(tf)*.""" + buf = io.BytesIO() + with tarfile.open(fileobj=buf, mode="w:") as tf: + setup_fn(tf) + buf.seek(0) + return tarfile.open(fileobj=buf, mode="r:") + + +def _add_symlink(tf: tarfile.TarFile, name: str, target: str) -> None: + info = tarfile.TarInfo(name=name) + info.type = tarfile.SYMTYPE + info.linkname = target + tf.addfile(info) + + +def _add_hardlink(tf: tarfile.TarFile, name: str, target: str) -> None: + info = tarfile.TarInfo(name=name) + info.type = tarfile.LNKTYPE + info.linkname = target + tf.addfile(info) + + +def _add_chrdev(tf: tarfile.TarFile, name: str) -> None: + info = tarfile.TarInfo(name=name) + info.type = tarfile.CHRTYPE + tf.addfile(info) + + +def _add_blkdev(tf: tarfile.TarFile, name: str) -> None: + info = tarfile.TarInfo(name=name) + info.type = tarfile.BLKTYPE + tf.addfile(info) + + +def _add_fifo(tf: tarfile.TarFile, name: str) -> None: + info = tarfile.TarInfo(name=name) + info.type = tarfile.FIFOTYPE + tf.addfile(info) + + +# --------------------------------------------------------------------------- +# 
_check_tar_member_type — symlink validation +# --------------------------------------------------------------------------- + + +def test_check_tar_member_type_safe_symlink(): + tf = _make_tar_with_member(lambda t: _add_symlink(t, "link", "relative/target")) + member = tf.getmembers()[0] + _check_tar_member_type(member) # should not raise + + +def test_check_tar_member_type_absolute_symlink(): + tf = _make_tar_with_member(lambda t: _add_symlink(t, "link", "/etc/passwd")) + member = tf.getmembers()[0] + with pytest.raises(RuntimeError, match="unsafe target"): + _check_tar_member_type(member) + + +def test_check_tar_member_type_dotdot_symlink(): + tf = _make_tar_with_member(lambda t: _add_symlink(t, "link", "../../etc/passwd")) + member = tf.getmembers()[0] + with pytest.raises(RuntimeError, match="unsafe target"): + _check_tar_member_type(member) + + +# --------------------------------------------------------------------------- +# _check_tar_member_type — hardlink validation +# --------------------------------------------------------------------------- + + +def test_check_tar_member_type_safe_hardlink(): + tf = _make_tar_with_member(lambda t: _add_hardlink(t, "hardlink", "project/real.c")) + member = tf.getmembers()[0] + _check_tar_member_type(member) # should not raise + + +def test_check_tar_member_type_dotdot_hardlink(): + tf = _make_tar_with_member( + lambda t: _add_hardlink(t, "hardlink", "../outside/secret.txt") + ) + member = tf.getmembers()[0] + with pytest.raises(RuntimeError, match="unsafe member path"): + _check_tar_member_type(member) + + +# --------------------------------------------------------------------------- +# _check_tar_member_type — device / FIFO validation +# --------------------------------------------------------------------------- + + +def test_check_tar_member_type_char_device(): + tf = _make_tar_with_member(lambda t: _add_chrdev(t, "dev/mem")) + member = tf.getmembers()[0] + with pytest.raises(RuntimeError, match="special file"): + 
_check_tar_member_type(member) + + +def test_check_tar_member_type_block_device(): + tf = _make_tar_with_member(lambda t: _add_blkdev(t, "dev/sda")) + member = tf.getmembers()[0] + with pytest.raises(RuntimeError, match="special file"): + _check_tar_member_type(member) + + +def test_check_tar_member_type_fifo(): + tf = _make_tar_with_member(lambda t: _add_fifo(t, "named_pipe")) + member = tf.getmembers()[0] + with pytest.raises(RuntimeError, match="special file"): + _check_tar_member_type(member) + + +# --------------------------------------------------------------------------- +# _check_tar_members — integration of member-type validation +# --------------------------------------------------------------------------- + + +def test_check_tar_members_rejects_absolute_symlink(): + tf = _make_tar_with_member(lambda t: _add_symlink(t, "link", "/etc/passwd")) + with pytest.raises(RuntimeError, match="unsafe target"): + _check_tar_members(tf) + + +def test_check_tar_members_rejects_device_file(): + tf = _make_tar_with_member(lambda t: _add_chrdev(t, "dev/mem")) + with pytest.raises(RuntimeError, match="special file"): + _check_tar_members(tf) + + +# --------------------------------------------------------------------------- +# ArchiveRemote.is_accessible +# --------------------------------------------------------------------------- + + +def test_is_accessible_existing_file(): + with tempfile.NamedTemporaryFile(suffix=".tar.gz", delete=False) as f: + path = f.name + try: + url = pathlib.Path(path).as_uri() + remote = ArchiveRemote(url) + assert remote.is_accessible() is True + finally: + os.remove(path) + + +def test_is_accessible_missing_file(): + remote = ArchiveRemote("file:////nonexistent/path/lib.tar.gz") + assert remote.is_accessible() is False + + +# --------------------------------------------------------------------------- +# ArchiveLocalRepo.extract - basic smoke test +# --------------------------------------------------------------------------- + + +def 
_make_tar_gz_file(archive_path: str, members: dict[str, bytes]) -> None: + with tarfile.open(archive_path, "w:gz") as tf: + for name, content in members.items(): + info = tarfile.TarInfo(name=name) + info.size = len(content) + tf.addfile(info, io.BytesIO(content)) + + +def test_extract_tar_gz_strips_top_level_dir(): + with tempfile.TemporaryDirectory() as tmp: + archive_path = os.path.join(tmp, "lib.tar.gz") + _make_tar_gz_file( + archive_path, + { + "lib-1.0/README.md": b"hello", + "lib-1.0/src/main.c": b"int main(){}", + }, + ) + dest = os.path.join(tmp, "dest") + ArchiveLocalRepo.extract(archive_path, dest) + assert os.path.isfile(os.path.join(dest, "README.md")) + assert os.path.isfile(os.path.join(dest, "src", "main.c")) + + +def test_extract_tar_gz_with_src_filter(): + with tempfile.TemporaryDirectory() as tmp: + archive_path = os.path.join(tmp, "lib.tar.gz") + _make_tar_gz_file( + archive_path, + { + "lib-1.0/README.md": b"readme", + "lib-1.0/src/main.c": b"main", + "lib-1.0/tests/test.c": b"test", + }, + ) + dest = os.path.join(tmp, "dest") + ArchiveLocalRepo.extract(archive_path, dest, src="src") + assert os.path.isfile(os.path.join(dest, "main.c")) + assert not os.path.exists(os.path.join(dest, "tests")) + # License-like files are not present in this archive so no extra files expected + + +def test_extract_zip(): + with tempfile.TemporaryDirectory() as tmp: + archive_path = os.path.join(tmp, "lib.zip") + with zipfile.ZipFile(archive_path, "w") as zf: + zf.writestr("lib-1.0/README.md", "hello") + zf.writestr("lib-1.0/src/main.c", "int main(){}") + dest = os.path.join(tmp, "dest") + ArchiveLocalRepo.extract(archive_path, dest) + assert os.path.isfile(os.path.join(dest, "README.md")) + assert os.path.isfile(os.path.join(dest, "src", "main.c")) + + +def test_all_archive_extensions_covered(): + """Ensure ARCHIVE_EXTENSIONS is a non-empty tuple of dot-prefixed strings.""" + assert len(ARCHIVE_EXTENSIONS) > 0 + for ext in ARCHIVE_EXTENSIONS: + assert 
ext.startswith(".") + + +# --------------------------------------------------------------------------- +# Helpers shared by ArchiveSubProject tests +# --------------------------------------------------------------------------- + + +def _make_tar_gz(path: str, content: bytes = b"hello") -> None: + """Write a minimal .tar.gz archive containing one file to *path*.""" + with tarfile.open(path, "w:gz") as tf: + info = tarfile.TarInfo(name="pkg/README.md") + info.size = len(content) + tf.addfile(info, io.BytesIO(content)) + + +def _sha256_file(path: str) -> str: + h = hashlib.sha256() + with open(path, "rb") as f: + for chunk in iter(lambda: f.read(8192), b""): + h.update(chunk) + return h.hexdigest() + + +def _file_url(path: str) -> str: + return pathlib.Path(path).as_uri() + + +def _make_subproject(url: str) -> ArchiveSubProject: + return ArchiveSubProject( + ProjectEntry({"name": "pkg", "url": url, "vcs": "archive"}) + ) + + +# --------------------------------------------------------------------------- +# ArchiveSubProject._download_and_compute_hash – explicit url parameter +# --------------------------------------------------------------------------- + + +def test_download_and_compute_hash_default_uses_remote_repo(): + """Without an explicit url the hash is computed from self._remote_repo.""" + with tempfile.TemporaryDirectory() as tmp: + archive = os.path.join(tmp, "pkg.tar.gz") + _make_tar_gz(archive) + url = _file_url(archive) + sp = _make_subproject(url) + + result = sp._download_and_compute_hash("sha256") + + assert result.algorithm == "sha256" + assert result.hex_digest == _sha256_file(archive) + + +def test_download_and_compute_hash_explicit_url_overrides_remote_repo(): + """When *url* is supplied a fresh ArchiveRemote for that URL is used. + + This is the regression guard for the fix: if the manifest URL was changed + after fetching, freeze must still hash the *original* archive (the one + recorded in the on-disk revision), not the current manifest URL. 
+ """ + with tempfile.TemporaryDirectory() as tmp: + archive_a = os.path.join(tmp, "pkg_a.tar.gz") + archive_b = os.path.join(tmp, "pkg_b.tar.gz") + _make_tar_gz(archive_a, content=b"version A") + _make_tar_gz(archive_b, content=b"version B") + url_a = _file_url(archive_a) + url_b = _file_url(archive_b) + + # SubProject points to archive_b (current manifest URL). + sp = _make_subproject(url_b) + + # Passing url=url_a must use archive_a's content. + result = sp._download_and_compute_hash("sha256", url=url_a) + + assert result.hex_digest == _sha256_file(archive_a) + assert result.hex_digest != _sha256_file(archive_b) + + +# --------------------------------------------------------------------------- +# ArchiveSubProject.freeze_project – uses on-disk revision URL +# --------------------------------------------------------------------------- + + +def test_freeze_project_uses_on_disk_url_not_manifest_url(): + """freeze_project must hash the archive at the on-disk revision URL. + + Scenario: the manifest URL was updated after the last fetch. Without the + fix, freeze would download from the new (current) manifest URL and produce + a hash that doesn't match the fetched archive. With the fix it uses the + URL stored in the on-disk revision. + """ + with tempfile.TemporaryDirectory() as tmp: + archive_a = os.path.join(tmp, "pkg_a.tar.gz") + archive_b = os.path.join(tmp, "pkg_b.tar.gz") + _make_tar_gz(archive_a, content=b"original fetch") + _make_tar_gz(archive_b, content=b"updated manifest url") + url_a = _file_url(archive_a) + url_b = _file_url(archive_b) + + # SubProject now points to archive_b (manifest was updated after fetch). + sp = _make_subproject(url_b) + + # Simulate on-disk state: was fetched from url_a (no hash-pin at the time). 
+ on_disk = Version(revision=url_a) + with patch.object(sp, "on_disk_version", return_value=on_disk): + project_entry = ProjectEntry( + {"name": "pkg", "url": url_b, "vcs": "archive"} + ) + sp.freeze_project(project_entry) + + expected_hash = f"sha256:{_sha256_file(archive_a)}" + assert project_entry.hash == expected_hash + assert _sha256_file(archive_b) not in project_entry.hash diff --git a/tests/test_integrity.py b/tests/test_integrity.py new file mode 100644 index 00000000..0518a094 --- /dev/null +++ b/tests/test_integrity.py @@ -0,0 +1,81 @@ +"""Unit tests for the Integrity dataclass and ProjectEntry integrity fields.""" + +from dfetch.manifest.project import Integrity, ProjectEntry + +# --------------------------------------------------------------------------- +# Integrity dataclass +# --------------------------------------------------------------------------- + + +def test_integrity_empty_is_falsy(): + assert not Integrity() + + +def test_integrity_with_hash_is_truthy(): + assert Integrity(hash="sha256:" + "a" * 64) + + +def test_integrity_as_yaml_empty(): + assert Integrity().as_yaml() == {} + + +def test_integrity_as_yaml_with_hash(): + h = "sha256:" + "a" * 64 + assert Integrity(hash=h).as_yaml() == {"hash": h} + + +# --------------------------------------------------------------------------- +# ProjectEntry with integrity block +# --------------------------------------------------------------------------- + + +def test_projectentry_hash_from_integrity_block(): + h = "sha256:" + "b" * 64 + project = ProjectEntry({"name": "lib", "integrity": {"hash": h}}) + assert project.hash == h + + +def test_projectentry_hash_empty_by_default(): + project = ProjectEntry({"name": "lib"}) + assert project.hash == "" + + +def test_projectentry_integrity_truthy_with_hash(): + h = "sha256:" + "c" * 64 + project = ProjectEntry({"name": "lib", "integrity": {"hash": h}}) + assert project.integrity + + +def test_projectentry_integrity_falsy_without_hash(): + project = 
ProjectEntry({"name": "lib", "integrity": {}}) + assert not project.integrity + + +def test_projectentry_as_yaml_includes_integrity(): + h = "sha256:" + "d" * 64 + project = ProjectEntry( + { + "name": "lib", + "url": "https://example.com/lib.tar.gz", + "vcs": "archive", + "integrity": {"hash": h}, + } + ) + yaml_data = project.as_yaml() + assert yaml_data["integrity"] == {"hash": h} + + +def test_projectentry_as_yaml_omits_empty_integrity(): + project = ProjectEntry({"name": "lib"}) + yaml_data = project.as_yaml() + assert "integrity" not in yaml_data + + +def test_projectentry_hash_setter(): + project = ProjectEntry( + {"name": "lib", "url": "https://example.com/lib.tar.gz", "vcs": "archive"} + ) + h = "sha256:" + "e" * 64 + project.hash = h + assert project.hash == h + assert project.integrity.hash == h diff --git a/tests/test_integrity_hash.py b/tests/test_integrity_hash.py new file mode 100644 index 00000000..d0c06261 --- /dev/null +++ b/tests/test_integrity_hash.py @@ -0,0 +1,111 @@ +"""Unit tests for dfetch.vcs.integrity_hash.""" + +import pytest + +from dfetch.vcs.integrity_hash import SUPPORTED_HASH_ALGORITHMS, IntegrityHash + +# --------------------------------------------------------------------------- +# SUPPORTED_HASH_ALGORITHMS +# --------------------------------------------------------------------------- + + +def test_supported_hash_algorithms_contains_sha256(): + assert "sha256" in SUPPORTED_HASH_ALGORITHMS + + +def test_supported_hash_algorithms_contains_sha384(): + assert "sha384" in SUPPORTED_HASH_ALGORITHMS + + +def test_supported_hash_algorithms_contains_sha512(): + assert "sha512" in SUPPORTED_HASH_ALGORITHMS + + +# --------------------------------------------------------------------------- +# IntegrityHash.parse +# --------------------------------------------------------------------------- + + +@pytest.mark.parametrize( + "value,expected_algo,expected_hex", + [ + ("sha256:abc123", "sha256", "abc123"), + ("sha384:def456", "sha384", "def456"), 
+ ("sha512:ghi789", "sha512", "ghi789"), + ], +) +def test_parse_valid(value, expected_algo, expected_hex): + h = IntegrityHash.parse(value) + assert h is not None + assert h.algorithm == expected_algo + assert h.hex_digest == expected_hex + + +def test_parse_returns_none_for_url(): + assert IntegrityHash.parse("https://example.com/lib.tar.gz") is None + + +def test_parse_returns_none_for_plain_string(): + assert IntegrityHash.parse("notahash") is None + + +# --------------------------------------------------------------------------- +# IntegrityHash.__str__ / __repr__ +# --------------------------------------------------------------------------- + + +def test_str_roundtrip(): + h = IntegrityHash("sha256", "abc123") + assert str(h) == "sha256:abc123" + + +def test_repr(): + h = IntegrityHash("sha256", "abc123") + assert repr(h) == "IntegrityHash('sha256', 'abc123')" + + +# --------------------------------------------------------------------------- +# IntegrityHash.__eq__ / __hash__ +# --------------------------------------------------------------------------- + + +def test_eq_same(): + assert IntegrityHash("sha256", "abc") == IntegrityHash("sha256", "abc") + + +def test_eq_case_insensitive_hex(): + assert IntegrityHash("sha256", "ABCDEF") == IntegrityHash("sha256", "abcdef") + + +def test_eq_different_digest(): + assert IntegrityHash("sha256", "aaa") != IntegrityHash("sha256", "bbb") + + +def test_eq_non_integrity_hash_returns_not_implemented(): + assert IntegrityHash("sha256", "abc").__eq__("sha256:abc") is NotImplemented + + +def test_hash_usable_in_set(): + a = IntegrityHash("sha256", "abc") + b = IntegrityHash("sha256", "ABC") + assert len({a, b}) == 1 + + +# --------------------------------------------------------------------------- +# IntegrityHash.matches +# --------------------------------------------------------------------------- + + +def test_matches_equal(): + h = IntegrityHash("sha256", "a" * 64) + assert h.matches("a" * 64) is True + + +def 
test_matches_case_insensitive(): + h = IntegrityHash("sha256", "abcdef") + assert h.matches("ABCDEF") is True + + +def test_matches_not_equal(): + h = IntegrityHash("sha256", "a" * 64) + assert h.matches("b" * 64) is False diff --git a/tests/test_patch.py b/tests/test_patch.py index 4c9d88d0..6b408b8a 100644 --- a/tests/test_patch.py +++ b/tests/test_patch.py @@ -247,13 +247,13 @@ def test_reverse_patch_zero_length_hunk(): assert _reverse_patch(patch) == expected -# Random small file: 5–15 lines, each line 5–20 chars (filtered to exclude control chars) +# Random small file: 5-15 lines, each line 5-20 chars (filtered to exclude control chars) st_file_lines = st.lists( st.text( min_size=5, max_size=20, alphabet=st.characters( - blacklist_categories=("Cc", "Cs"), blacklist_characters="\r\n" + blacklist_categories=("Cc", "Cs", "Zl", "Zp"), blacklist_characters="\r\n" ), ), min_size=5, diff --git a/tests/test_purl.py b/tests/test_purl.py index a96aa02c..c78f2e44 100644 --- a/tests/test_purl.py +++ b/tests/test_purl.py @@ -2,7 +2,8 @@ import pytest -from dfetch.util.purl import remote_url_to_purl +from dfetch.util.purl import vcs_url_to_purl +from dfetch.vcs.archive import archive_url_to_purl @pytest.mark.parametrize( @@ -117,8 +118,71 @@ ], ) def test_remote_url_to_purl(url, expected): - purl = remote_url_to_purl(url) + purl = vcs_url_to_purl(url) if expected is None: assert purl is None else: assert str(purl) == expected + + +# --------------------------------------------------------------------------- +# Archive URL → PURL (attribute-based to avoid percent-encoding sensitivity) +# --------------------------------------------------------------------------- + + +@pytest.mark.parametrize( + "url,expected_name,expected_namespace,expected_download_url", + [ + ( + "https://example.com/releases/mylib-1.0.tar.gz", + "mylib-1.0", + "example.com", + "https://example.com/releases/mylib-1.0.tar.gz", + ), + ( + "https://example.com/lib.zip", + "lib", + "example.com", + 
"https://example.com/lib.zip", + ), + ( + "https://releases.example.com/project-2.1.tar.bz2", + "project-2.1", + "releases.example.com", + "https://releases.example.com/project-2.1.tar.bz2", + ), + ( + "https://example.com/lib.tgz", + "lib", + "example.com", + "https://example.com/lib.tgz", + ), + ( + "https://example.com/lib.tar.xz", + "lib", + "example.com", + "https://example.com/lib.tar.xz", + ), + ( + "file:///tmp/local-archive.tar.gz", + "local-archive", + "", # no hostname for file:// URLs + "file:///tmp/local-archive.tar.gz", + ), + ], +) +def test_archive_url_to_purl_attributes( + url, expected_name, expected_namespace, expected_download_url +): + purl = archive_url_to_purl(url) + assert purl.type == "generic" + assert purl.name == expected_name + assert (purl.namespace or "") == expected_namespace + assert purl.qualifiers.get("download_url") == expected_download_url + assert "vcs_url" not in (purl.qualifiers or {}) + + +def test_archive_purl_with_version(): + url = "https://example.com/lib-1.0.tar.gz" + purl = archive_url_to_purl(url, version="sha256:" + "a" * 64) + assert purl.version == "sha256:" + "a" * 64 diff --git a/tests/test_subproject.py b/tests/test_subproject.py index b3503c29..462086dc 100644 --- a/tests/test_subproject.py +++ b/tests/test_subproject.py @@ -4,7 +4,7 @@ # flake8: noqa from typing import Optional, Union -from unittest.mock import patch +from unittest.mock import MagicMock, call, patch import pytest @@ -135,6 +135,111 @@ def test_are_there_local_changes( ) +def test_update_uses_ignored_files_callback_for_stored_hash(): + """The hash stored after fetch must use the post-fetch ignored files. + + The callback is called twice: once before clearing (pre-fetch local-changes + check) and once after extraction (to compute the stored hash). The second + call returns the post-extraction state so the stored hash matches what + dfetch check will compute later. 
+ """ + pre_fetch_ignored = ["old_file.txt"] + post_fetch_ignored = ["new_ignored.txt"] + + # Return different values on successive calls to simulate pre/post extraction + callback = MagicMock(side_effect=[pre_fetch_ignored, post_fetch_ignored]) + + with patch("dfetch.project.subproject.os.path.exists") as mock_exists: + with patch("dfetch.project.subproject.Metadata.from_file") as mock_meta_file: + with patch("dfetch.project.subproject.hash_directory") as mock_hash: + with patch("dfetch.project.subproject.safe_rm"): + with patch("dfetch.project.subproject.Metadata.dump"): + mock_exists.return_value = True + mock_meta_file.return_value.version = Version(revision="abc") + mock_hash.return_value = "hash123" + + subproject = ConcreteSubProject(ProjectEntry({"name": "p1"})) + subproject._wanted_version = Version(revision="new") + + subproject.update(force=True, ignored_files_callback=callback) + + assert callback.call_count == 2 + # The hash must be computed with the post-fetch ignored list + hash_call_skiplist = mock_hash.call_args[1]["skiplist"] + assert "new_ignored.txt" in hash_call_skiplist + assert "old_file.txt" not in hash_call_skiplist + + +@pytest.mark.parametrize( + "name, project_version, on_disk_version, expect_return, expect_project_version", + [ + ( + "already-pinned-tag-matches", + Version(tag="v1.0", branch="main"), + Version(tag="v1.0", branch="main"), + None, + Version(tag="v1.0", branch="main"), + ), + ( + "already-pinned-tag-matches-branch-differs", + Version(tag="v1.0"), + Version(tag="v1.0", branch="main"), + None, + Version(tag="v1.0"), + ), + ( + "already-pinned-revision-matches-branch-differs", + Version(revision="abc123"), + Version(revision="abc123", branch="feature"), + "abc123", + Version(revision="abc123", branch="feature"), + ), + ( + "tag-differs-triggers-freeze", + Version(tag="v1.0"), + Version(tag="v2.0", branch="main"), + "v2.0", + Version(tag="v2.0", branch="main"), + ), + ( + "revision-differs-triggers-freeze", + 
Version(revision="abc123"), + Version(revision="def456", branch="main"), + "def456", + Version(revision="def456", branch="main"), + ), + ( + "no-on-disk-version", + Version(tag="v1.0"), + None, + None, + Version(tag="v1.0"), + ), + ], +) +def test_freeze_project( + name: str, + project_version: Version, + on_disk_version: Union[Version, None], + expect_return: Union[str, None], + expect_project_version: Version, +): + with patch("dfetch.project.subproject.os.path.exists") as mocked_path_exists: + with patch("dfetch.project.subproject.Metadata.from_file") as mocked_metadata: + subproject = ConcreteSubProject(ProjectEntry({"name": "proj1"})) + + mocked_path_exists.return_value = bool(on_disk_version) + mocked_metadata().version = on_disk_version + + project = ProjectEntry({"name": "proj1"}) + project.version = project_version + + result = subproject.freeze_project(project) + + assert result == expect_return + assert project.version == expect_project_version + + @pytest.mark.parametrize( "ci_env_value, expected_result", [ diff --git a/tests/test_update.py b/tests/test_update.py index aa78e0b4..f6078185 100644 --- a/tests/test_update.py +++ b/tests/test_update.py @@ -5,7 +5,7 @@ import argparse from pathlib import Path -from unittest.mock import Mock, patch +from unittest.mock import ANY, Mock, patch import pytest @@ -75,7 +75,16 @@ def test_forced_update(): update(args) mocked_create.return_value.update.assert_called_once_with( - force=True, files_to_ignore=[] + force=True, + ignored_files_callback=ANY, + ) + + cb = mocked_create.return_value.update.call_args.kwargs[ + "ignored_files_callback" + ] + cb() + fake_superproject.ignored_files.assert_called_once_with( + "some_dest" ) diff --git a/tests/test_util.py b/tests/test_util.py new file mode 100644 index 00000000..5e7010c3 --- /dev/null +++ b/tests/test_util.py @@ -0,0 +1,157 @@ +"""Unit tests for dfetch.util.util.""" + +# mypy: ignore-errors +# flake8: noqa + +import pytest + +from dfetch.util.util import 
copy_src_subset, hash_directory, prune_files_by_pattern + +# --------------------------------------------------------------------------- +# copy_src_subset – path-traversal protection +# --------------------------------------------------------------------------- + + +def test_copy_src_subset_copies_file(tmp_path): + src_root = tmp_path / "src" + src_root.mkdir() + (src_root / "lib.h").write_text("content") + dest = tmp_path / "dest" + dest.mkdir() + + copy_src_subset(str(src_root), str(dest), "lib.h", keep_licenses=False) + + assert (dest / "lib.h").read_text() == "content" + + +def test_copy_src_subset_copies_directory(tmp_path): + src_root = tmp_path / "src" + src_root.mkdir() + sub = src_root / "subdir" + sub.mkdir() + (sub / "a.c").write_text("code") + dest = tmp_path / "dest" + dest.mkdir() + + copy_src_subset(str(src_root), str(dest), "subdir", keep_licenses=False) + + assert (dest / "a.c").read_text() == "code" + + +@pytest.mark.parametrize( + "evil_src", + [ + "../outside.txt", + "../../etc/passwd", + "/etc/passwd", + ], +) +def test_copy_src_subset_rejects_path_traversal(tmp_path, evil_src): + src_root = tmp_path / "src" + src_root.mkdir() + dest = tmp_path / "dest" + dest.mkdir() + + with pytest.raises(RuntimeError): + copy_src_subset(str(src_root), str(dest), evil_src, keep_licenses=False) + + +# --------------------------------------------------------------------------- +# hash_directory – determinism +# --------------------------------------------------------------------------- + + +def test_hash_directory_is_deterministic(tmp_path): + """hash_directory must return the same value on repeated calls.""" + d = tmp_path / "proj" + d.mkdir() + (d / "a.c").write_text("int main(){}") + (d / "b.h").write_text("#pragma once") + sub = d / "src" + sub.mkdir() + (sub / "util.c").write_text("void util(){}") + + assert hash_directory(str(d), None) == hash_directory(str(d), None) + + +def test_hash_directory_differs_when_file_content_changes(tmp_path): + """Modifying 
a file must produce a different hash.""" + d = tmp_path / "proj" + d.mkdir() + f = d / "file.txt" + f.write_text("original") + + h1 = hash_directory(str(d), None) + f.write_text("modified") + h2 = hash_directory(str(d), None) + + assert h1 != h2 + + +def test_hash_directory_skiplist_excludes_file(tmp_path): + """Files listed in skiplist must not contribute to the hash.""" + d = tmp_path / "proj" + d.mkdir() + (d / "tracked.txt").write_text("data") + (d / "ignored.txt").write_text("ignored data") + + h_with_skip = hash_directory(str(d), ["ignored.txt"]) + (d / "ignored.txt").write_text("changed ignored data") + h_with_skip2 = hash_directory(str(d), ["ignored.txt"]) + + assert h_with_skip == h_with_skip2 + + +# --------------------------------------------------------------------------- +# prune_files_by_pattern – delete-order safety +# --------------------------------------------------------------------------- + + +def test_prune_removes_matched_file(tmp_path): + (tmp_path / "remove_me.txt").write_text("gone") + prune_files_by_pattern(str(tmp_path), ["remove_me.txt"]) + assert not (tmp_path / "remove_me.txt").exists() + + +def test_prune_parent_and_child_both_matched_no_error(tmp_path): + """When a dir and a file inside it both match, removal must not raise. + + Before the fix, removing the parent first left the child path pointing at a + non-existent location; the subsequent safe_rm call then raised + FileNotFoundError. + """ + src = tmp_path / "src" + src.mkdir() + (src / "main.c").write_text("int main(){}") + + # "src" matches the directory; "main.c" matches the child inside it. 
+ prune_files_by_pattern(str(tmp_path), ["src", "main.c"]) + + assert not src.exists() + + +def test_prune_preserves_license_file(tmp_path): + """License files must survive even when they match a removal pattern.""" + (tmp_path / "LICENSE").write_text("MIT") + (tmp_path / "delete_me.txt").write_text("gone") + + prune_files_by_pattern(str(tmp_path), ["LICENSE", "delete_me.txt"]) + + assert (tmp_path / "LICENSE").exists() + assert not (tmp_path / "delete_me.txt").exists() + + +def test_prune_skips_already_removed_paths(tmp_path): + """Paths that no longer exist after a parent removal are silently skipped.""" + parent = tmp_path / "libs" + parent.mkdir() + child = parent / "lib.a" + child.write_text("binary") + unrelated = tmp_path / "readme.txt" + unrelated.write_text("keep") + + # Both "libs" (directory) and "libs/lib.a" (child) match; no exception expected. + prune_files_by_pattern(str(tmp_path), ["libs", "lib.a"]) + + assert not parent.exists() + assert unrelated.exists()