-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathissue_writer.py
More file actions
169 lines (131 loc) · 5.19 KB
/
issue_writer.py
File metadata and controls
169 lines (131 loc) · 5.19 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
"""Issue creation for PR conflict detection results."""
from __future__ import annotations
import logging
from typing import TYPE_CHECKING
from conflict_detector import ConflictCluster, cluster_conflicts
if TYPE_CHECKING:
from github3.repos.repo import Repository
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Hidden HTML comment written as the first line of every generated issue body;
# _find_existing_issue looks for it to recognise issues produced by this module.
ISSUE_TAG = "<!-- pr-conflict-detector -->"
# Blockquote notice that _build_issue_body appends right after the tag line.
ISSUE_HEADER = (
"> **Note:** This issue was automatically generated by the "
"PR Conflict Detector action.\n\n"
)
def create_or_update_issue(
    repo: Repository,
    conflicts: list,
    report_title: str = "PR Conflict Report",
    dry_run: bool = False,
) -> str | None:
    """Publish the conflict report as an issue in the repository.

    An open issue that carries ``report_title`` and the detector's hidden tag
    is edited in place when one is found; otherwise a new issue is opened.

    Args:
        repo: A github3.py repository object.
        conflicts: List of ConflictResult objects for this repository.
        report_title: Title of the issue to look up or create.
        dry_run: When True, only log the action that would be taken. No issue
            is created or edited, although the lookup for an existing issue
            still runs.

    Returns:
        The URL of the created or updated issue, or None when ``conflicts``
        is empty or ``dry_run`` is set.
    """
    if not conflicts:
        logger.info("No conflicts to report — skipping issue creation.")
        return None

    body = _build_issue_body(conflicts)
    previous = _find_existing_issue(repo, report_title)

    if dry_run:
        if previous:
            action = "update"
        else:
            action = "create"
        logger.info(
            "Dry run: would %s issue '%s' in %s",
            action,
            report_title,
            repo.full_name,
        )
        return None

    # Nothing to reuse: open a fresh issue.
    if not previous:
        created = repo.create_issue(title=report_title, body=body)
        logger.info("Created issue %s", created.html_url)
        return str(created.html_url)

    # Reuse the earlier report; only its body is replaced.
    previous.edit(body=body)
    logger.info("Updated issue %s", previous.html_url)
    return str(previous.html_url)
def _find_existing_issue(repo: Repository, title: str):
    """Return the first open issue with this exact title and the hidden tag.

    Issues are taken from ``repo.issues(state="open")`` in the order they are
    yielded; a missing body is treated as an empty string. Returns None when
    no open issue matches.
    """
    matches = (
        candidate
        for candidate in repo.issues(state="open")
        if candidate.title == title and ISSUE_TAG in (candidate.body or "")
    )
    # The generator is lazy, so iteration stops at the first match.
    return next(matches, None)
def _build_issue_body(conflicts: list) -> str:
    """Assemble the full markdown body of the conflict issue.

    The body starts with the hidden tag line and the generated-by notice,
    followed by one section per cluster returned by ``cluster_conflicts``.
    A cluster of exactly two PRs is rendered as a compact pair line from its
    first conflict; any other cluster gets a full cluster section labelled
    with its 1-based position among all clusters.
    """
    pieces = [ISSUE_TAG, "\n", ISSUE_HEADER]
    for position, group in enumerate(cluster_conflicts(conflicts), start=1):
        is_simple_pair = len(group.prs) == 2
        pieces.append(
            _build_pair_section(group.conflicts[0])
            if is_simple_pair
            else _build_cluster_section(group, position)
        )
    return "".join(pieces)
def _build_pair_section(conflict) -> str:
    """Render a two-PR conflict as a single markdown paragraph.

    The paragraph links both PRs, lists each conflicting file with its
    overlapping line ranges in parentheses, and ends with the authors'
    @-mentions, followed by a blank line.
    """
    first, second = conflict.pr_a, conflict.pr_b
    links = (
        f"**[#{first.number}]({first.url})** ↔ "
        f"**[#{second.number}]({second.url})**"
    )
    mentions = _format_authors(conflict)

    def describe(overlap) -> str:
        # One entry per file: `name` (Lx-Ly, Lx-Ly, ...)
        spans = ", ".join(
            f"L{low}-L{high}" for low, high in overlap.overlapping_ranges
        )
        return f"`{overlap.filename}` ({spans})"

    files = ", ".join(describe(overlap) for overlap in conflict.conflicting_files)
    return f"{links} — {files} — {mentions}\n\n"
def _build_cluster_section(cluster: ConflictCluster, index: int) -> str:
    """Build the markdown section for a multi-PR cluster.

    The section consists of a heading, the sorted @-mentions of every PR
    author, the shared files, a bullet list of the PRs, and a collapsed
    ``<details>`` table with one row per pairwise conflict.

    Args:
        cluster: Cluster providing ``prs``, ``conflicts`` and ``shared_files``.
        index: Number shown in the "Cluster N" heading.

    Returns:
        The markdown for the section, terminated by a blank line.
    """

    def cell(text) -> str:
        # A literal "|" terminates a GFM table cell, even inside a code span,
        # so a PR title or filename containing one would corrupt its row.
        return str(text).replace("|", "\\|")

    mentions = {f"@{pr.author}" for pr in cluster.prs if pr.author}
    pr_bullets = [f"- [#{pr.number}]({pr.url}) {pr.title}" for pr in cluster.prs]
    shared = ", ".join(f"`{name}`" for name in cluster.shared_files)
    mentions_str = ", ".join(sorted(mentions))

    parts = [
        f"### Cluster {index} — {len(cluster.prs)} PRs, "
        f"{len(cluster.conflicts)} conflict(s)\n\n",
        f"**Authors:** {mentions_str}\n\n",
        f"**Files:** {shared}\n\n",
        "**PRs:**\n",
        "\n".join(pr_bullets) + "\n\n",
        "<details>\n<summary>Pairwise details</summary>\n\n",
        "| PR A | PR B | Files | Lines | Authors |\n",
        "|------|------|-------|-------|---------|\n",
    ]

    for conflict in cluster.conflicts:
        pr_a, pr_b = conflict.pr_a, conflict.pr_b
        pr_a_link = f"[#{pr_a.number}]({pr_a.url}) {cell(pr_a.title)}"
        pr_b_link = f"[#{pr_b.number}]({pr_b.url}) {cell(pr_b.title)}"

        file_parts = []
        line_parts = []
        for fo in conflict.conflicting_files:
            file_parts.append(f"`{cell(fo.filename)}`")
            line_parts.append(
                ", ".join(f"L{start}-L{end}" for start, end in fo.overlapping_ranges)
            )

        files_cell = ", ".join(file_parts)
        lines_cell = ", ".join(line_parts)
        pair_authors = _format_authors(conflict)
        parts.append(
            f"| {pr_a_link} | {pr_b_link} "
            f"| {files_cell} | {lines_cell} "
            f"| {pair_authors} |\n"
        )

    parts.append("\n</details>\n\n")
    return "".join(parts)
def _format_authors(conflict) -> str:
    """Return the @-mentions of both PR authors, deduplicated and sorted.

    A PR whose ``author`` is falsy is skipped; the mentions are joined with
    ", ", so the result is an empty string when neither PR has an author.
    """
    mentions = {
        f"@{pr.author}" for pr in (conflict.pr_a, conflict.pr_b) if pr.author
    }
    return ", ".join(sorted(mentions))