Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 11 additions & 7 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,23 +41,24 @@ classifiers = [
requires-python = ">=3.10"

dependencies = [
"beautifulsoup4==4.12.3",
"colorama==0.4.6",
"distlib==0.3.9",
"Jinja2==3.1.4",
"jsonpickle==4.0.1",
"jsonschema[format-nongpl]==4.23.0",
"lark==1.2.2",
"lxml-stubs==0.5.1",
"maven-artifact==0.3.4",
"packaging==24.2",
"pygit2==1.16.0",
"referencing==0.35.1",
"reqstool-python-decorators==0.0.5",
"requests-file==2.1.0",
"requests==2.32.3",
"ruamel.yaml==0.18.6",
"tabulate==0.9.0",
"xmldict==0.4.1",
"reqstool-python-decorators==0.0.5",
"packaging==24.2",
"requests==2.32.3",
"beautifulsoup4==4.12.3",
]

[project.scripts]
Expand All @@ -69,9 +70,12 @@ source = "vcs"
[tool.hatch.version.raw-options]
local_scheme = "no-local-version"

[tool.hatch.build.targets.wheel.hooks.decorators]
dependencies = ["reqstool-python-hatch-plugin==0.0.4"]
path = ["src", "tests"]
[tool.hatch.build.hooks.reqstool]
dependencies = ["reqstool-python-hatch-plugin == 0.1.1"]
sources = ["src", "tests"]
dataset_directory = "docs/reqstool"
output_directory = "build/reqstool"
test_results = ["build/**/junit.xml"]

[tool.hatch.envs.dev]
dependencies = [
Expand Down
39 changes: 39 additions & 0 deletions src/reqstool/common/dataclasses/maven_version.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# Copyright © LFV

import re
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class MavenVersion:
version: str
major: int = field(init=False)
minor: int = field(init=False)
patch: int = field(init=False)
build_number: Optional[int] = field(init=False)
qualifier: Optional[str] = field(init=False)
snapshot: bool = field(init=False, default=False) # New field for SNAPSHOT detection

PATTERN_SEMANTIC_VERSION: re.Pattern[str] = re.compile(
"^(?P<major>\d+)\.(?P<minor>\d+)\.(?P<patch>\d+)(?:-(?P<buildno>\d+))?(?:[-.]?(?P<qualifier>"
"(?!SNAPSHOT$)[a-zA-Z][a-zA-Z0-9]*))?(?:-(?P<snapshot>SNAPSHOT))?$"
)

def __post_init__(self) -> None:
match: Optional[re.Match[str]] = self.PATTERN_SEMANTIC_VERSION.match(self.version)

if not match:
raise ValueError(f"Not a valid semantic Maven version string: {self.version}")

# semantic version is mandatory
self.major = int(match.group("major"))
self.minor = int(match.group("minor"))
self.patch = int(match.group("patch"))

self.build_number = int(match.group("buildno")) if match.group("buildno") else None

self.qualifier = match.group("qualifier")

# check if "SNAPSHOT" is in the version string
self.snapshot = True if match.group("snapshot") else False
108 changes: 96 additions & 12 deletions src/reqstool/locations/git_location.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,28 @@
# Copyright © LFV

# type: ignore[no-untyped-call]

import logging
import os
import re
from dataclasses import dataclass
from typing import List, Optional, Union

from pygit2 import RemoteCallbacks, UserPass, clone_repository
import pygit2
from attrs import field
from pygit2 import CredentialType, RemoteCallbacks, UserPass, clone_repository
from reqstool_python_decorators.decorators.decorators import Requirements

from reqstool.locations.location import LocationInterface

RE_GIT_TAG_VERSION: re.Pattern[str] = re.compile(
r"^refs\/tags\/(?P<version>(?:v)?\d+\.\d+\.\d+\S*)$", re.VERBOSE | re.IGNORECASE
)

RE_SEMANTIC_VERSION_PLUS_EXTRA: re.Pattern[str] = re.compile(
r"^(?P<version>(?P<semantic>(?:v)?\d+\.\d+\.\d+)(?P<rest>\S+)?)$", re.VERBOSE | re.IGNORECASE
)


@Requirements("REQ_002")
@dataclass(kw_only=True)
Expand All @@ -18,24 +32,94 @@ class GitLocation(LocationInterface):
env_token: str
path: str

def _make_available_on_localdisk(self, dst_path: str) -> str:
api_token = os.getenv(self.env_token)
token: Optional[str] = field(init=False, default=None)

def __post_init__(self) -> None:
# Retrieve token from environment variable
self.token = os.getenv(self.env_token) if self.env_token else None

if self.token:
logging.debug("Using token for authentication")

def _make_available_on_localdisk(self, dst_path: str) -> str:
if self.branch:
repo = clone_repository(
url=self.url, path=dst_path, checkout_branch=self.branch, callbacks=self.MyRemoteCallbacks(api_token)
url=self.url, path=dst_path, checkout_branch=self.branch, callbacks=MyRemoteCallbacks(self.token)
)
else:
repo = clone_repository(url=self.url, path=dst_path, callbacks=self.MyRemoteCallbacks(api_token))
repo = clone_repository(url=self.url, path=dst_path, callbacks=MyRemoteCallbacks(self.token))

logging.debug(f"Cloned repo {self.url} (branch: {self.branch}) to {repo.workdir}\n")

return repo.workdir
return str(repo.workdir)

@staticmethod
def _get_all_versions(repo_path: str, token: Optional[str] = None) -> List[str]:
"""
Returns all versions from tags from the repository located at `repo_path` that match the RE_GIT_TAG_VERSION pattern.
"""

repo = pygit2.Repository(path=repo_path)

# If the repository needs authentication for remote references:
# if repo.remotes:
# callbacks = MyRemoteCallbacks(token)
# remote = repo.remotes["origin"]

# # Ensure remote references are updated (no fetch, just accessing the tags)
# remote.fetch(callbacks=callbacks)

all_versions: list[str] = list()

for ref in repo.references:
if match := RE_GIT_TAG_VERSION.match(ref):
all_versions.append(match.group("version"))

if not all_versions:
raise ValueError(f"No versions found for repo. {repo_path}")

return all_versions

def _resolve_version(self) -> str:
"""Resolve version based on version specifier."""

if self.version not in ["latest", "latest-stable", "latest-unstable"]:
return self.version

all_versions: List[str] = self._get_all_versions(package=self.package, base_url=self.url, token=self.token)

if self.version == "latest":
return all_versions[-1]

is_stable = self.version == "latest-stable"

filtered_versions: List[str] = self._filter_versions(all_versions=all_versions, stable_versions=is_stable)

# no matching version found
if not filtered_versions:
version_type = "stable" if is_stable else "unstable"
raise ValueError(f"No {version_type} versions found for {self.package}")

return filtered_versions[-1]

@staticmethod
def _filter_versions(all_versions: List[str], stable_versions: bool) -> List[str]:
return [
v
for v in all_versions
if (match := RE_SEMANTIC_VERSION_PLUS_EXTRA.search(v)) and (bool(match.group("rest")) != stable_versions)
]


class MyRemoteCallbacks(RemoteCallbacks):
def __init__(self, api_token):
self.auth_method = "" # x-oauth-basic, x-access-token
self.api_token = api_token
class MyRemoteCallbacks(RemoteCallbacks):
def __init__(self, token: str):
self.auth_method = "" # x-oauth-basic, x-access-token
self.token = token

def credentials(self, url, username_from_url, allowed_types):
return UserPass(username=self.auth_method, password=self.api_token)
def credentials(
self,
url: str,
username_from_url: Union[str, None],
allowed_types: CredentialType,
) -> UserPass:
return UserPass(username=self.auth_method, password=self.token)
133 changes: 99 additions & 34 deletions src/reqstool/locations/maven_location.py
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if it is ideal to use RequestException for everything. If we receive a list of versions but there is no stable version for example then is more of NoMatchingVersionException.

But, it works. So KISS for now.

Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@
import os
import sys
from dataclasses import dataclass, field
from typing import Optional
from typing import List, Optional
from zipfile import ZipFile

from maven_artifact import Artifact, Downloader, RequestException
from lxml import etree
from maven_artifact import Artifact, Downloader, RequestException, Resolver
from reqstool_python_decorators.decorators.decorators import Requirements

from reqstool.common.dataclasses.maven_version import MavenVersion
from reqstool.locations.location import LocationInterface


Expand All @@ -19,53 +21,116 @@ class MavenLocation(LocationInterface):
url: Optional[str] = "https://repo.maven.apache.org/maven2"
group_id: str
artifact_id: str
version: str
version: str # Can be "latest", "latest-stable", "latest-unstable", or specific version
classifier: str = field(default="reqstool")
env_token: str
env_token: Optional[str]
token: Optional[str] = field(init=False, default=None)

def _make_available_on_localdisk(self, dst_path: str):
token = os.getenv(self.env_token)
def __post_init__(self) -> None:
# Retrieve token from environment variable
self.token = os.getenv(self.env_token) if self.env_token else None

# assume OAuth Bearer, see: https://georgearisty.dev/posts/oauth2-token-bearer-usage/
downloader = Downloader(base=self.url, token=token)
if self.token:
logging.debug("Using OAuth Bearer token for authentication")

artifact = Artifact(
group_id=self.group_id,
artifact_id=self.artifact_id,
version=self.version,
classifier=self.classifier,
extension="zip",
)

logging.debug(f"Downloading {artifact} from {self.url} to {dst_path}")
def _make_available_on_localdisk(self, dst_path: str) -> str:
downloader = Downloader(base=self.url, token=self.token)
resolver = downloader.resolver

try:
if not downloader.download(artifact, filename=dst_path):
raise RequestException(f"Error downloading artifact {artifact} from: {self.url}")
resolved_version: str = self._resolve_version(resolver)

artifact: Artifact = Artifact(
group_id=self.group_id,
artifact_id=self.artifact_id,
version=resolved_version,
classifier=self.classifier,
extension="zip",
)
resolved_artifact: Artifact = resolver.resolve(artifact)

if resolved_artifact.version != resolved_version:
logging.debug(f"Resolved version '{self.version}' to: {resolved_artifact.version}")

if not downloader.download(resolved_artifact, filename=dst_path):
raise RequestException(f"Error downloading artifact {resolved_artifact} from: {self.url}")

return self._extract_zip(resolved_artifact.get_filename(dst_path), dst_path)

except RequestException as e:
logging.fatal(e.msg)
logging.fatal(str(e))
sys.exit(1)
except Exception as e:
logging.fatal(f"Unexpected error: {str(e)}")
sys.exit(1)

logging.debug(f"Unzipping {artifact.get_filename(dst_path)} to {dst_path}\n")
def _get_all_versions(self, resolver: Resolver, artifact: Artifact) -> List[str]:
"""Get all available versions for the artifact."""
try:
# Construct Maven metadata path
group_path: str = artifact.group_id.replace("/", ".") # Convert slashes to dots first
group_path: str = group_path.replace(".", "/") # Then convert dots to slashes
path = f"/{group_path}/{artifact.artifact_id}/maven-metadata.xml"

with ZipFile(artifact.get_filename(dst_path), "r") as zip_ref:
# Extracting all the members of the zip
# into a specific location.
top_level_dirs = {name.split("/")[0] for name in zip_ref.namelist() if "/" in name}
xml = resolver.requestor.request(
resolver.base + path, resolver._onFail, lambda r: etree.fromstring(r.content)
)
all_versions: List[str] = xml.xpath("/metadata/versioning/versions/version/text()")

if not all_versions:
raise RequestException(f"No versions found for {artifact}")

return all_versions
except Exception as e:
raise RequestException(f"Failed to get versions for {artifact}: {str(e)}")

def _resolve_version(self, resolver: Resolver) -> str:
"""Resolve version based on version specifier."""

if self.version not in ["latest", "latest-stable", "latest-unstable"]:
return self.version

base_artifact: Artifact = Artifact(
group_id=self.group_id, version=self.version, artifact_id=self.artifact_id, classifier=self.classifier
)

all_versions: List[str] = self._get_all_versions(resolver, base_artifact)

if self.version == "latest":
return all_versions[-1]

is_stable = self.version == "latest-stable"
filtered_versions: List[str] = self._filter_versions(all_versions=all_versions, stable_versions=is_stable)

# no matching version found
if not filtered_versions:
version_type: str = "stable" if is_stable else "unstable"
raise RequestException(f"No {version_type} versions found for {self.group_id}:{self.artifact_id}")

return filtered_versions[-1]

@staticmethod
def _filter_versions(all_versions: List[str], stable_versions: bool) -> List[str]:
filtered_versions: List[str] = [
mv.version
for v in all_versions
if (mv := MavenVersion(version=v)) and ((bool(mv.qualifier) or mv.snapshot) != stable_versions)
]

return filtered_versions

def _extract_zip(self, zip_file: str, dst_path: str) -> str:
"""Extract ZIP file and return top level directory."""
logging.debug(f"Unzipping {zip_file} to {dst_path}")
with ZipFile(zip_file, "r") as zip_ref:
top_level_dirs = {name.split("/")[0] for name in zip_ref.namelist() if "/" in name}
zip_ref.extractall(path=dst_path)

if len(top_level_dirs) != 1:
logging.fatal(
f"Maven zip artifact {artifact} from {self.url} did not have one and only one"
f" top level directory: {top_level_dirs}"
raise RequestException(
f"Maven zip artifact does not have one and only one top level directory: {top_level_dirs}"
)
sys.exit(1)

top_level_dir = os.path.join(dst_path, top_level_dirs.pop())

# os.remove(artifact.get_filename(dst_path))

logging.debug(f"Unzipped {artifact.get_filename(dst_path)} to {top_level_dir}\n")

logging.debug(f"Unzipped {zip_file} to {top_level_dir}")
return top_level_dir
Loading