-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy path: github_query_helper.py
More file actions
323 lines (261 loc) · 10.1 KB
/
github_query_helper.py
File metadata and controls
323 lines (261 loc) · 10.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
# github_global_scan.py (rate-limit aware)
import os
import time
import requests
import sys
from datetime import datetime
from typing import Optional, Dict, Any, List, Set
from heuristic import Heuristic, _match_pattern
# Base URL of the public GitHub REST API; all endpoints below are joined onto it.
GITHUB_API = "https://api.github.com"
# Module-wide toggle for rate-limit progress output; flip it at runtime via
# set_rate_limit_logging(). Read by _print_once() and _countdown().
RATE_LIMIT_LOGGING = True  # default on
def set_rate_limit_logging(on: bool):
    """Turn the module-wide rate-limit progress output on or off.

    Any truthy/falsy value is accepted and normalized to a plain bool.
    """
    global RATE_LIMIT_LOGGING
    RATE_LIMIT_LOGGING = True if on else False
def _print_once(line: str):
if RATE_LIMIT_LOGGING:
print(line, flush=True)
def _countdown(wait_seconds: int, *, label: str, end_ts: int | None = None):
"""Text progress bar with ETA; updates ~1s."""
if not RATE_LIMIT_LOGGING or wait_seconds <= 0:
time.sleep(max(0, wait_seconds))
return
total = int(wait_seconds)
start = time.time()
width = 30 # bar width
while True:
now = time.time()
elapsed = int(now - start)
remaining = max(0, total - elapsed)
# When we know the reset timestamp, prefer that for accuracy
if end_ts:
remaining = max(0, int(end_ts - now))
filled = int(((total - remaining) / max(1, total)) * width)
bar = "█" * filled + "░" * (width - filled)
mm, ss = divmod(remaining, 60)
sys.stdout.write(f"\r[{bar}] {label} — resumes in {mm:02d}:{ss:02d}")
sys.stdout.flush()
if remaining <= 0:
break
time.sleep(1)
sys.stdout.write("\n")
sys.stdout.flush()
# ---------- Utility
def _auth_headers(
token: Optional[str], *, accept: Optional[str] = None
) -> Dict[str, str]:
headers = {
"User-Agent": "global-heuristic-scanner/1.0",
"Accept": accept or "application/vnd.github+json",
}
if token:
headers["Authorization"] = f"Bearer {token}"
return headers
def _parse_link_header(link_header: str) -> Dict[str, str]:
links = {}
if not link_header:
return links
for part in link_header.split(","):
segs = [s.strip() for s in part.split(";")]
if len(segs) >= 2:
url = segs[0].lstrip("<").rstrip(">")
rel = segs[1].split("=")[-1].strip('"')
links[rel] = url
return links
# --- Replace your existing helpers with these versions ---
def _wait_for_rate_limit_reset(resp: requests.Response):
    """
    Pause until GitHub says we can resume.
    - Primary limit: uses X-RateLimit-Reset.
    - Secondary/abuse: uses Retry-After (seconds).
    """
    # A Retry-After header signals a secondary/abuse limit and wins outright.
    retry_after = resp.headers.get("Retry-After")
    if retry_after:
        try:
            delay = int(retry_after)
        except ValueError:
            delay = 60  # header was not a plain integer; wait a minute
        _print_once(f"[RateLimit] Secondary limit (Retry-After={delay}s). Waiting…")
        _countdown(delay, label="Secondary limit")
        return

    # Primary limit: only pause once the allowance is fully exhausted.
    if int(resp.headers.get("X-RateLimit-Remaining", "1") or "1") > 0:
        return
    reset_ts = int(resp.headers.get("X-RateLimit-Reset", "0") or "0")
    pause = max(0, reset_ts - int(time.time())) + 2  # +2s safety margin
    if reset_ts:
        reset_at = time.strftime("%H:%M:%S", time.localtime(reset_ts))
    else:
        reset_at = "unknown"
    _print_once(
        f"[RateLimit] Primary limit hit. Resets at {reset_at}. Waiting {pause}s…"
    )
    _countdown(pause, label="Primary limit", end_ts=reset_ts if reset_ts else None)
def _paged_get(url: str, headers: Dict[str, str], params: Dict[str, Any] | None = None):
    """
    Yield responses across pages, auto-pausing on rate limits
    (both primary and secondary).

    Args:
        url: Absolute API URL of the first page.
        headers: Fully built request headers (see _auth_headers).
        params: Query parameters for the first request only; subsequent
            pages follow the absolute "next" URL from the Link header,
            so params are cleared after the first page.

    Yields:
        One successful (2xx) requests.Response per page.

    Raises:
        requests.HTTPError: for non-rate-limit HTTP errors (via raise_for_status).
    """
    while True:
        resp = requests.get(url, headers=headers, params=params)
        # 403 can be either primary or secondary limits
        if resp.status_code == 403 and (
            "rate limit" in resp.text.lower() or "abuse detection" in resp.text.lower()
        ):
            _wait_for_rate_limit_reset(resp)
            # Retry same page/URL after we waited
            continue
        # Some enterprise setups send 429 for secondary limits
        if resp.status_code == 429:
            _wait_for_rate_limit_reset(resp)
            continue
        resp.raise_for_status()
        yield resp
        # Check remaining allowance and pause if necessary before next page
        _wait_for_rate_limit_reset(resp)
        links = _parse_link_header(resp.headers.get("Link", ""))
        if "next" not in links:
            break
        url, params = links["next"], None  # follow absolute next link
def _iso_date(d: Optional[datetime]) -> Optional[str]:
return None if d is None else d.date().isoformat()
def _needs_glob(pat: str) -> bool:
return any(ch in pat for ch in "*?[]")
# ---------- Global SEARCH: files (code)
def search_files_globally(
    heuristic: Heuristic, token: Optional[str]
) -> List[Dict[str, Any]]:
    """Search GitHub code globally for files matching the heuristic's patterns.

    Issues one code-search query per configured file pattern, re-checks each
    hit locally against every pattern, and de-duplicates on repo + path.
    """
    hits: Dict[str, Dict[str, Any]] = {}
    headers = _auth_headers(token, accept="application/vnd.github.text-match+json")
    for pattern in heuristic.files:
        # Glob patterns need a path: search; plain names use an exact filename: match.
        query = f"path:{pattern}" if _needs_glob(pattern) else f'filename:"{pattern}"'
        params = {"q": query, "per_page": 100}
        for resp in _paged_get(f"{GITHUB_API}/search/code", headers, params):
            for item in resp.json().get("items", []):
                path = item.get("path") or ""
                if not any(_match_pattern(p, path) for p in heuristic.files):
                    continue
                repo_name = item["repository"]["full_name"]
                hits[f"{repo_name}::{path}"] = {
                    "repo": repo_name,
                    "path": path,
                    "html_url": item.get("html_url"),
                    "score": item.get("score"),
                }
    return list(hits.values())
# ---------- Global SEARCH: commits
def _commit_queries_from_heuristic(h: Heuristic) -> List[str]:
queries: Set[str] = set()
for p in h.commit_message_prefix:
if p.strip():
queries.add(f'"{p.strip()}"')
for name in h.author_names:
if name.strip():
queries.add(f'"{name.strip()}"')
for mail in h.author_mails:
if mail.strip():
queries.add(f'"{mail.strip()}"')
if not queries:
queries.add("fix")
return sorted(queries)
def search_commits_globally(
    heuristic: Heuristic, token: Optional[str]
) -> List[Dict[str, Any]]:
    """Search GitHub commits globally and keep those matching the heuristic.

    Runs one commit-search query per derived term (see
    _commit_queries_from_heuristic), constrains it to the heuristic's date
    window when one is set, filters hits through ``heuristic.match_commit``,
    and de-duplicates on commit SHA.
    """
    since = _iso_date(heuristic.period_start)
    until = _iso_date(heuristic.period_end) if heuristic.period_end else None
    headers = _auth_headers(token, accept="application/vnd.github.cloak-preview+json")
    found: Dict[str, Dict[str, Any]] = {}
    for base in _commit_queries_from_heuristic(heuristic):
        query = base
        # Append the author-date qualifier matching whichever bounds exist.
        if since and until:
            query = f"{query} author-date:{since}..{until}"
        elif since:
            query = f"{query} author-date:>={since}"
        elif until:
            query = f"{query} author-date:<={until}"
        params = {"q": query, "per_page": 100, "sort": "committer-date", "order": "desc"}
        for resp in _paged_get(f"{GITHUB_API}/search/commits", headers, params):
            for item in resp.json().get("items", []):
                commit = item.get("commit", {})
                message = commit.get("message") or ""
                commit_author = commit.get("author") or {}
                login = (item.get("author") or {}).get("login") or ""
                identity = " ".join(
                    [
                        login,
                        commit_author.get("name") or "",
                        commit_author.get("email") or "",
                    ]
                ).strip()
                if not heuristic.match_commit(message, identity):
                    continue
                sha = item.get("sha")
                if not sha:
                    continue
                found[sha] = {
                    "repo": item["repository"]["full_name"],
                    "sha": sha,
                    "html_url": item.get("html_url"),
                    "author_login": login or None,
                    "author_name": commit_author.get("name") or None,
                    "author_email": commit_author.get("email") or None,
                    "date": commit_author.get("date"),
                    "message": message,
                }
    return list(found.values())
# ---------- Global Scan
def global_scan(
    heuristic: Heuristic,
    incl_files: bool = True,
    incl_commits: bool = True,
    token: Optional[str] = None,
) -> Dict[str, Any]:
    """Run global file and/or commit searches for *heuristic*.

    Returns a dict with ``seed_repo_count`` (distinct repos across both
    result sets), ``files`` (file hits), and ``commits`` (commit hits).
    """
    file_hits: List[Dict[str, Any]] = []
    commit_hits: List[Dict[str, Any]] = []
    file_repos: Set[str] = set()
    commit_repos: Set[str] = set()
    if incl_files:
        print("files...")
        file_hits = search_files_globally(heuristic, token)
        file_repos = {hit["repo"] for hit in file_hits}
    if incl_commits:
        print("commits...")
        commit_hits = search_commits_globally(heuristic, token)
        commit_repos = {hit["repo"] for hit in commit_hits}
    return {
        "seed_repo_count": len(file_repos | commit_repos),
        "files": file_hits,
        "commits": commit_hits,
    }
def count_matching(
    heuristic: Heuristic, *, token: Optional[str] = None
) -> Dict[str, Any]:
    """Run a full global scan and return only per-key summary counts.

    List-valued entries in the scan result ("files", "commits") are
    collapsed to their lengths; scalar entries (e.g. "seed_repo_count")
    pass through unchanged.

    Args:
        heuristic: The match heuristic forwarded to global_scan.
        token: Optional GitHub token forwarded to global_scan.
    """
    # Bug fix: the scan result was previously discarded, leaving ``result``
    # undefined and raising NameError on the next line.
    result = global_scan(heuristic, token=token)
    return {k: (len(v) if isinstance(v, list) else v) for k, v in result.items()}
# ---------- Example usage
if __name__ == "__main__":
from datetime import datetime
set_rate_limit_logging(True)
h = Heuristic(
author_names=("jane.doe",),
author_mails=("jane@example.com",),
files=("app.rb", "**/footer.rb"),
branch_name_prefix=("feature/", "re:release-\\d+"),
commit_message_prefix=("fix", "refactor"),
period_start=datetime(2024, 1, 1),
period_end=None,
)
token = os.getenv("GITHUB_TOKEN")
if token:
print("Found GitHub PAT token!")
else:
print("No GitHub PAT token found!")
result = global_scan(h, token=token)
from pprint import pprint
pprint({k: (len(v) if isinstance(v, list) else v) for k, v in result.items()})