Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
234 changes: 163 additions & 71 deletions pr_comment.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,36 @@
"""PR comment notifications for conflict detection."""

import logging
from collections import defaultdict
from dataclasses import dataclass
from typing import Any

from conflict_detector import ConflictResult
from conflict_detector import ConflictResult, FileOverlap, PRInfo

logger = logging.getLogger(__name__)

# Bot signature to identify our comments
COMMENT_SIGNATURE = "<!-- pr-conflict-detector-bot -->"


@dataclass
class ConflictEntry:
"""A single conflict entry for a PR, pairing the other PR with overlapping files."""

other_pr: PRInfo
files: list[FileOverlap]


def post_pr_comments(
conflicts_by_repo: dict[str, list[ConflictResult]],
github_connection: Any,
dry_run: bool = False,
) -> bool:
"""Post comments on PRs about detected conflicts.
"""Post consolidated comments on PRs about detected conflicts.

Only posts comments for conflicts that don't already have a comment.
Groups all conflicts for a given PR into a single comment. If an existing
bot comment is found on the PR, it is updated in place instead of creating
a new one.

Args:
conflicts_by_repo: Dict mapping repo names to conflict results
Expand All @@ -34,7 +46,7 @@ def post_pr_comments(

all_success = True
total_comments = 0
skipped_duplicates = 0
total_updates = 0

for repo_name, conflicts in conflicts_by_repo.items():
if not conflicts:
Expand All @@ -43,106 +55,146 @@ def post_pr_comments(
owner, repo = repo_name.split("/")
repo_obj = github_connection.repository(owner, repo)

for conflict in conflicts:
# Check both PRs in the conflict
for pr_info in [conflict.pr_a, conflict.pr_b]:
other_pr = conflict.pr_b if pr_info == conflict.pr_a else conflict.pr_a

# Check if we've already commented on this PR about this specific conflict
if _has_existing_comment(repo_obj, pr_info.number, other_pr.number):
logger.info(
"Skipping duplicate comment on %s#%s (conflict with #%s already commented)",
repo_name,
pr_info.number,
other_pr.number,
)
skipped_duplicates += 1
continue

# Build comment body
comment_body = _build_comment(conflict, pr_info, other_pr)

if dry_run:
logger.info(
"DRY RUN: Would comment on %s#%s:\n%s",
repo_name,
pr_info.number,
comment_body,
)
# Group conflicts by PR number so each PR gets one consolidated comment
pr_conflicts = _group_conflicts_by_pr(conflicts)

for pr_number, conflict_entries in pr_conflicts.items():
comment_body = _build_consolidated_comment(conflict_entries)

existing_comments = _find_existing_comments(repo_obj, pr_number)

if dry_run:
action = "update" if existing_comments else "create"
logger.info(
"DRY RUN: Would %s comment on %s#%s:\n%s",
action,
repo_name,
pr_number,
comment_body,
)
if existing_comments:
total_updates += 1
if len(existing_comments) > 1:
logger.info(
"DRY RUN: Would delete %s stale bot comment(s) on %s#%s",
len(existing_comments) - 1,
repo_name,
pr_number,
)
else:
total_comments += 1
continue

if existing_comments:
success = _update_comment(existing_comments[0], comment_body)
if success:
total_updates += 1
else:
all_success = False
# Clean up any stale extra bot comments (e.g. old per-conflict format)
for stale_comment in existing_comments[1:]:
_delete_comment(stale_comment)
else:
success = _post_comment(repo_obj, pr_number, comment_body)
if success:
total_comments += 1
else:
success = _post_comment(repo_obj, pr_info.number, comment_body)
if success:
total_comments += 1
else:
all_success = False
all_success = False

if total_comments > 0:
if total_comments > 0 or total_updates > 0:
logger.info(
"Posted %s PR comment(s), skipped %s duplicate(s)",
"Posted %s new comment(s), updated %s existing comment(s)",
total_comments,
skipped_duplicates,
total_updates,
)

return all_success


def _has_existing_comment(repo, pr_number: int, other_pr_number: int) -> bool:
"""Check if a comment already exists on this PR about the conflict.
def _group_conflicts_by_pr(
conflicts: list[ConflictResult],
) -> dict[int, list[ConflictEntry]]:
"""Group conflicts so each PR number maps to its list of conflicting PRs.

Each conflict pair contributes an entry to both PRs involved.

Args:
conflicts: List of ConflictResult objects.

Returns:
Dict mapping PR number to a list of ConflictEntry objects.
"""
grouped: dict[int, list[ConflictEntry]] = defaultdict(list)
for conflict in conflicts:
grouped[conflict.pr_a.number].append(
ConflictEntry(other_pr=conflict.pr_b, files=conflict.conflicting_files)
)
grouped[conflict.pr_b.number].append(
ConflictEntry(other_pr=conflict.pr_a, files=conflict.conflicting_files)
)
return dict(grouped)


def _find_existing_comments(repo: Any, pr_number: int) -> list[Any]:
"""Find all existing bot comments on the PR.

Returns all comments matching the bot signature, ordered by creation time.
The first element (if any) is the one to update; the rest are stale and
should be deleted during migration from per-conflict to consolidated format.

Args:
repo: GitHub repository object
pr_number: PR number to check
other_pr_number: The other PR in the conflict pair

Returns:
True if a comment already exists, False otherwise
List of comment objects with the bot signature (may be empty).
"""
try:
pr = repo.pull_request(pr_number)
for comment in pr.issue_comments():
# Check if comment has our signature and mentions the other PR
if (
COMMENT_SIGNATURE in comment.body
and f"#{other_pr_number}" in comment.body
):
return True
return False
return [c for c in pr.issue_comments() if COMMENT_SIGNATURE in c.body]
except Exception as e: # pylint: disable=broad-except
logger.warning("Failed to check existing comments on PR #%s: %s", pr_number, e)
# If we can't check, assume no comment exists to avoid blocking
return False
return []


def _build_comment(conflict: ConflictResult, current_pr, other_pr) -> str:
"""Build a comment body for a PR conflict notification.
def _build_consolidated_comment(conflict_entries: list[ConflictEntry]) -> str:
"""Build a single consolidated comment listing all conflicts for a PR.

Args:
conflict: The conflict result
current_pr: PRInfo for the PR being commented on (unused but kept for consistency)
other_pr: PRInfo for the other PR in the conflict
conflict_entries: List of ConflictEntry objects.

Returns:
Formatted comment body
Formatted comment body with a table of all conflicting PRs.
"""
_ = current_pr # Explicitly mark as unused
files_list = "\n".join(
f"- `{fo.filename}` (lines: {_format_ranges(fo.overlapping_ranges)})"
for fo in conflict.conflicting_files
)
count = len(conflict_entries)

table_rows = []
authors: list[str] = []
for entry in conflict_entries:
if entry.other_pr.author not in authors:
authors.append(entry.other_pr.author)

file_details = ", ".join(
f"`{fo.filename}` ({_format_ranges(fo.overlapping_ranges)})"
for fo in entry.files
)
table_rows.append(
f"| [#{entry.other_pr.number}]({entry.other_pr.url}) ({entry.other_pr.title}) | {file_details} |"
)

table = "\n".join(table_rows)
author_mentions = ", ".join(f"@{a}" for a in authors)

comment = f"""{COMMENT_SIGNATURE}
## ⚠️ Potential Merge Conflict Detected
## ⚠️ Potential Merge Conflicts Detected

This PR may conflict with [#{other_pr.number}]({other_pr.url}) ({other_pr.title}).
This PR may conflict with **{count}** other PR(s):

### Conflicting Files
{files_list}
| Conflicting PR | Conflicting Files (Lines) |
|---|---|
{table}

### What to do
- Review the overlapping changes in the files above
- Coordinate with @{other_pr.author} to resolve conflicts
- Consider rebasing or merging to test compatibility
**What to do:** Review the overlapping changes and coordinate with {author_mentions} to resolve conflicts.

This is an automated notification from [pr-conflict-detector](https://github.com/github-community-projects/pr-conflict-detector)."""

Expand All @@ -161,6 +213,46 @@ def _format_ranges(ranges: list[tuple[int, int]]) -> str:
return ", ".join(f"L{start}-L{end}" for start, end in ranges)


def _update_comment(comment: Any, body: str) -> bool:
"""Update an existing comment with new content.

Args:
comment: GitHub comment object to update
body: New comment body

Returns:
True if successful, False otherwise
"""
try:
comment.edit(body)
logger.info("Updated existing comment (id=%s)", comment.id)
return True
except Exception as e: # pylint: disable=broad-except
logger.error("Failed to update comment (id=%s): %s", comment.id, e)
return False


def _delete_comment(comment: Any) -> bool:
"""Delete a stale bot comment.

Used during migration from per-conflict to consolidated comment format
to clean up extra bot comments that are no longer needed.

Args:
comment: GitHub comment object to delete

Returns:
True if successful, False otherwise
"""
try:
comment.delete()
logger.info("Deleted stale bot comment (id=%s)", comment.id)
return True
except Exception as e: # pylint: disable=broad-except
logger.warning("Failed to delete stale comment (id=%s): %s", comment.id, e)
return False


def _post_comment(repo, pr_number: int, body: str) -> bool:
"""Post a comment to a pull request.

Expand Down
Loading
Loading