Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions comeit/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,13 @@
from .checks.footer import Footer
from .checks.header import Header
from .commit_message import parse_commit_message
from .commit_reader import (
CommitMessage,
get_commit_hashes,
get_commit_message,
get_default_branch,
get_git_log,
)
from .logger import LogLevel, configure_logger
from .rules.rule import Component, Rule, Severity
from .rules.rule_creator import RuleCreator
Expand All @@ -23,4 +30,9 @@
LogLevel.__name__,
configure_logger.__name__,
parse_commit_message.__name__,
get_commit_hashes.__name__,
get_commit_message.__name__,
get_default_branch.__name__,
get_git_log.__name__,
CommitMessage.__name__,
]
15 changes: 0 additions & 15 deletions comeit/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,21 +45,6 @@ def init_rules(
return rules


def parse_commit_message(commit_message: str):
lines = commit_message.strip().split("\n")
header = lines[0] if lines else ""
body = ""
footer = ""

if len(lines) > 1:
remaining = "\n".join(lines[1:])
parts = remaining.split("\n\n", 1)
body = parts[0].strip() if len(parts) > 0 else ""
footer = parts[1].strip() if len(parts) > 1 else ""

return header, body, footer


def create_commit_types(extra_types: list[str] = None, custom_types: list[str] = None) -> set[str]:
"""Create commit types from default types and/or custom types or extra types.

Expand Down
177 changes: 177 additions & 0 deletions comeit/commit_reader.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
import logging
import subprocess
from dataclasses import dataclass
from pathlib import Path

logger = logging.getLogger(__name__)


@dataclass
class CommitMessage:
"""Represents a commit message with its associated SHA.

Attributes:
sha (str): The SHA identifier of the commit.
message (str): The commit message associated with the SHA.
"""

sha: str
message: str

def __str__(self):
"""Returns a clear string representation of the CommitMessage."""
return (
f"{'-' * 52}\n"
f"Commit SHA: {self.sha}\n"
f"{'-' * 52}\n"
f"{self.message}\n"
f"{'-' * 52}\n"
)


def run_git_command(
git_args: list[str], repo_path: Path | None = None
) -> subprocess.CompletedProcess:
"""Run a git command in a specified repository path.

Args:
git_args (list[str]): A list of git arguments to run.
repo_path (Path | None): The path to the Git repository. If None, the command
is executed in the current directory.

Returns:
subprocess.CompletedProcess: The result of the `subprocess.run` call.

Raises:
subprocess.CalledProcessError: If the git command fails.
"""
cmd = ["git"]
if repo_path:
cmd.extend(["-C", str(repo_path)])

cmd.extend(git_args)
logger.debug(f"Running git command: `{' '.join(cmd)}`")

try:
return subprocess.run(
cmd,
capture_output=True,
text=True,
check=True,
)
except subprocess.CalledProcessError as e:
logger.error(f"Git command failed: {e.stderr}")
raise


def get_default_branch(repo_path: Path | None = None) -> str | None:
"""Retrieve the default branch name of a Git repository.

This method tries to find origin/HEAD. This might not work unless the upstream is tracked.
Most often it works, however a fallback option if this method doesn't find the default branch
can be read from a config file or resolve to `main` or `master`.

Args:
repo_path (Path | None): The path to the Git repository. If None, the command
is executed in the current directory.

Returns:
str | None: The default branch name, or None if it could not be determined.
"""
try:
result = run_git_command(["rev-parse", "--abbrev-ref", "origin/HEAD"], repo_path)
return result.stdout.strip().replace("origin/", "")
except subprocess.CalledProcessError:
logger.error("Git failed to find the default branch from origin/HEAD.")
return None


def get_git_log(from_ref: str = None, to_ref: str = None, repo_path: Path | None = None) -> str:
"""Retrieve the git log between two references.

Args:
from_ref (str | None): The starting reference (commit SHA, tag, or branch).
to_ref (str | None): The ending reference (commit SHA, tag, or branch).
repo_path (Path | None): The path to the Git repository. If None, the command
is executed in the current directory.

Returns:
str: The git log output as a string.
"""
log_format = "--format=%B%"

git_args = ["log", log_format]

if from_ref and to_ref:
git_args.append(f"{from_ref}..{to_ref}")
elif from_ref:
git_args.append(f"{from_ref}..HEAD")

result = run_git_command(git_args, repo_path)
return result.stdout


def get_commit_message(sha: str, repo_path: Path | None = None) -> CommitMessage:
"""Retrieve the commit message for a given commit SHA.

Args:
sha (str): The commit SHA for which to retrieve the message.
repo_path (Path | None): The path to the Git repository. If None, the command
is executed in the current directory.

Raises:
ValueError: If the provided SHA is invalid.
RuntimeError: If the command to retrieve the commit message fails.

Returns:
CommitMessage: An instance of CommitMessage containing the SHA and message.
"""
cmd = ["show", "-s", "--format=%B", sha]

try:
result = run_git_command(cmd, repo_path)
return CommitMessage(sha=sha, message=result.stdout.rstrip())
except subprocess.CalledProcessError as e:
logger.error(f"Failed to retrieve commit message for SHA {sha}: {e.stderr}")
raise RuntimeError(f"Error retrieving commit message for SHA '{sha}'") from e
except ValueError as e:
logger.error(f"Failed to create {CommitMessage.__name__} object: {e}")
raise ValueError(f"Invalid commit SHA: {sha}") from e


def get_commit_hashes(
from_ref: str | None = None, to_ref: str | None = None, repo_path: Path | None = None
) -> list[str]:
"""Retrieve a list of commit SHAs from `from_ref` to `to_ref` using git rev-list.

The commit on `from_ref` will not be included. To include it add `^` at the end: `from_ref^`.

The method determines the range of commits based on the provided references:
- If `from_ref` and `to_ref` are provided, it retrieves commits in the range `from_ref..to_ref`.
- If only `from_ref` is provided, it retrieves commits from `from_ref..HEAD`.
- If neither reference is provided, it retrieves `HEAD` - all commits in the repository.

Args:
from_ref (str | None): The starting commit SHA, tag, or branch (e.g., `7db61cb^`).
to_ref (str | None): The ending commit SHA, tag, or branch (e.g., `HEAD`).
repo_path (Path | None): The path to the Git repository. If None, the command
is executed in the current directory.

Returns:
list[str]: A list of commit SHAs from `from_ref` to `to_ref`.
"""
git_args = ["rev-list"]

if from_ref and to_ref:
git_args.append(f"{from_ref}..{to_ref}")
elif from_ref:
git_args.append(f"{from_ref}..HEAD")
else:
git_args.append("HEAD")

try:
result: str = run_git_command(git_args, repo_path)
# Split the output into a list of commit hashes
return result.stdout.strip().split("\n")
except subprocess.CalledProcessError:
return []
169 changes: 169 additions & 0 deletions comeit/tests/test_commit_reader.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
import pytest
import subprocess
from pathlib import Path
from comeit import get_commit_hashes, get_commit_message, get_default_branch

DEFAULT_BRANCH = "main"


@pytest.fixture
def init_git_repo(tmp_path: Path):
"""Fixture to initialize a temporary Git repository."""
repo_path = tmp_path / "test_repo"
repo_path.mkdir()

# Set user config
subprocess.run(["git", "-C", str(repo_path), "config", "user.name", "Test User"], check=True)
subprocess.run(
["git", "-C", str(repo_path), "config", "user.email", "test@example.com"], check=True
)

# Initialize a new git repository
subprocess.run(
["git", "-C", str(repo_path), "init", f"--initial-branch={DEFAULT_BRANCH}"], check=True
)
subprocess.run(["git", "-C", str(repo_path), "branch", "-M", "main"], check=True)
subprocess.run(
["git", "-C", str(repo_path), "commit", "--allow-empty", "-m", "chore: initial commit"],
check=True,
)

yield repo_path


def run_git(repo_path: Path, *args):
"""Helper to run git commands in the repo."""
subprocess.run(["git", "-C", str(repo_path), *args], check=True)


def git_commit(repo_path: Path, message: str):
run_git(repo_path, "commit", "--allow-empty", "-m", message)


def git_create_branch(repo_path: Path, branch: str):
run_git(repo_path, "branch", branch)


def git_switch_branch(repo_path: Path, branch: str):
run_git(repo_path, "checkout", branch)


def git_log(repo_path: Path):
run_git(repo_path, "log")


@pytest.mark.parametrize("from_ref, to_ref", [(DEFAULT_BRANCH, "HEAD"), (DEFAULT_BRANCH, None)])
def test_commits_on_current_branch(init_git_repo: Path, from_ref: str, to_ref: str | None):
"""Test that commit ranges between SHAs are properly retrieved.

Verifies from default branch `main..HEAD` which means the current checked out branch.
"""
repo_path = init_git_repo

test_branch = "test-branch"
git_create_branch(repo_path, test_branch)
git_switch_branch(repo_path, test_branch)

first_commit = "feat: first commit"
second_commit = "feat: second commit\n\nThis is a crazy body that is a bit long"
git_commit(repo_path, first_commit)
git_commit(repo_path, second_commit)

# Get the commit hashes in the range
commit_hashes = get_commit_hashes(from_ref=from_ref, to_ref=to_ref, repo_path=repo_path)

assert len(commit_hashes) == 2, f"Expected 2 commits, but got {len(commit_hashes)}"

# Retrieve and assert the commit messages
expected_messages = [second_commit, first_commit]
commit_messages = [get_commit_message(sha=sha, repo_path=repo_path) for sha in commit_hashes]

for commit, expected_message in zip(commit_messages, expected_messages):
assert (
commit.message == expected_message
), f"{expected_message=}, but got '{commit.message=}'"

@pytest.mark.parametrize("sha_start, sha_end, expected_count", [
("HEAD~3", "HEAD", 3),
("HEAD~1", "HEAD", 1),
("HEAD^", "HEAD", 1),
(DEFAULT_BRANCH, "test-branch", 3),
(DEFAULT_BRANCH, None, 3),
("1.0.0", "HEAD", 3),
],
ids=[
"HEAD~3..HEAD",
"HEAD~1..HEAD",
"HEAD^..HEAD",
"main..test-branch",
"main..HEAD",
"1.0.0..HEAD"
]
)
def test_commit_ranges_with_sha(init_git_repo: Path, sha_start: str, sha_end: str, expected_count: int):
"""Test that commit ranges using SHAs are properly retrieved."""
repo_path = init_git_repo

# Create a tag for the first commit
run_git(repo_path, "tag", "1.0.0", "HEAD") # Tag the second commit

test_branch = "test-branch"
git_create_branch(repo_path, test_branch)
git_switch_branch(repo_path, test_branch)

# Create multiple commits
git_commit(repo_path, "feat: first commit")
git_commit(repo_path, "feat: second commit")
git_commit(repo_path, "feat: third commit")


# Get the commit hashes in the range
commit_hashes = get_commit_hashes(from_ref=sha_start, to_ref=sha_end, repo_path=repo_path)

assert len(commit_hashes) == expected_count, (
f"Expected {expected_count} commits from {sha_start} to {sha_end}, but got {len(commit_hashes)}"
)

# Retrieve and assert the commit messages if there are expected commits
if expected_count > 0:
commit_messages = [get_commit_message(sha=sha, repo_path=repo_path) for sha in commit_hashes]
for commit in commit_messages:
assert commit.message.startswith("feat:"), "Commit message should start with 'feat:'"

@pytest.mark.parametrize("sha_start, sha_end, expected_count", [
("HEAD~3", "HEAD", 3),
("HEAD~1", "HEAD", 1),
("HEAD^", "HEAD", 1),
(DEFAULT_BRANCH, "test-branch", 0),
("1.0.0", "HEAD", 3),
(None, None, 4) # Don't pass in any sha's
],
ids=[
"HEAD~3..HEAD",
"HEAD~1..HEAD",
"HEAD^..HEAD",
"main..test-branch",
"1.0.0..HEAD",
"HEAD"
]
)
def test_commit_ranges_on_main_branch(init_git_repo: Path, sha_start: str, sha_end: str, expected_count: int):
"""Test that commit ranges using SHAs and tags are properly retrieved from the main branch."""
repo_path = init_git_repo

# Create a tag for the initial commit
run_git(repo_path, "tag", "1.0.0", "HEAD") # Tag the initial commit

git_commit(repo_path, "feat: first commit")
git_commit(repo_path, "feat: second commit")
git_commit(repo_path, "feat: third commit")

# Get the commit hashes in the range
if sha_start is None and sha_end is None:
commit_hashes = get_commit_hashes(repo_path=repo_path)
else:
commit_hashes = get_commit_hashes(from_ref=sha_start, to_ref=sha_end, repo_path=repo_path)

assert len(commit_hashes) == expected_count, (
f"Expected {expected_count} commits from {sha_start} to {sha_end}, but got {len(commit_hashes)}"
)
Loading