Skip to content

Commit aef2454

Browse files
BUILD-10745 Add audit script
1 parent 9014b55 commit aef2454

1 file changed

Lines changed: 321 additions & 0 deletions

File tree

tools/audit-action-version.py

Lines changed: 321 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,321 @@
1+
#!/usr/bin/env python3
2+
"""Audit GitHub Actions version usage across an organization.
3+
4+
Uses GitHub Code Search API (via gh CLI) to find all references to a target
5+
action across .github/ directories (workflows + composite actions) and reports
6+
repos not using an allowed version.
7+
8+
Prerequisites: gh CLI (authenticated), Python 3.7+ (dataclasses, postponed annotations)
9+
10+
Usage:
11+
    python tools/audit-action-version.py \
12+
--org SonarSource \
13+
--action SonarSource/gh-action_cache \
14+
--allowed-refs v1,54a48984cf6564fd48f3c6c67c0891d7fe89604c \
15+
[--output report.csv] [--verbose]
16+
"""
17+
18+
from __future__ import annotations
19+
20+
import argparse
21+
import base64
22+
import csv
23+
import json
24+
import re
25+
import shutil
26+
import subprocess
27+
import sys
28+
import time
29+
from urllib.parse import quote
30+
from dataclasses import dataclass
31+
32+
33+
@dataclass
class ActionRef:
    """One `uses: <action>@<ref>` occurrence found in a repository file."""

    repo: str         # full repository name, e.g. "org/repo"
    filepath: str     # path within the repo, e.g. ".github/workflows/ci.yml"
    line_num: int     # 1-based line number of the match within the file
    current_ref: str  # the ref after '@' (tag, branch, or commit SHA)
    compliant: bool   # True when current_ref is one of the allowed refs
40+
41+
42+
# ---------------------------------------------------------------------------
# Logging
# ---------------------------------------------------------------------------

# Module-level debug flag: set once in main() from --verbose, read by log().
_verbose = False
47+
48+
49+
def log(msg: str, *, is_debug: bool = False):
    """Write *msg* to stderr; debug-level messages appear only with --verbose."""
    suppressed = is_debug and not _verbose
    if not suppressed:
        print(msg, file=sys.stderr)
54+
55+
56+
# ---------------------------------------------------------------------------
57+
# GitHub API helper
58+
# ---------------------------------------------------------------------------
59+
60+
61+
def gh_api(endpoint: str, params: dict | None = None) -> dict:
    """Call the GitHub API through the gh CLI and return the parsed JSON body.

    Params are encoded as URL query parameters (not a form body), which is
    what GET endpoints such as /search/code require.

    Raises:
        RuntimeError: when the gh process exits non-zero.
    """
    url = endpoint
    if params:
        pairs = [
            f"{key}={quote(str(value), safe='')}" for key, value in params.items()
        ]
        url = f"{endpoint}?{'&'.join(pairs)}"

    proc = subprocess.run(["gh", "api", url], capture_output=True, text=True)
    if proc.returncode != 0:
        raise RuntimeError(f"gh api {endpoint} failed: {proc.stderr.strip()}")
    return json.loads(proc.stdout)
79+
80+
81+
# ---------------------------------------------------------------------------
82+
# Code Search
83+
# ---------------------------------------------------------------------------
84+
85+
86+
def _fetch_search_page(query: str, page: int) -> dict | None:
    """Fetch one page of code-search results; return None if the call fails."""
    page_params = {"q": query, "per_page": "100", "page": str(page)}
    try:
        return gh_api("search/code", page_params)
    except RuntimeError as exc:
        log(f"Error: Search API call failed on page {page}: {exc}")
        return None
97+
98+
99+
def _deduplicate(items: list[dict]) -> list[dict]:
100+
"""Deduplicate search results by repo+path."""
101+
seen: set[str] = set()
102+
unique: list[dict] = []
103+
for item in items:
104+
key = f"{item['repo']}:{item['path']}"
105+
if key not in seen:
106+
seen.add(key)
107+
unique.append(item)
108+
return unique
109+
110+
111+
def search_action_usage(org: str, action: str) -> list[dict]:
    """Search the org's .github/ directories for references to *action*.

    Returns a deduplicated list of {"repo": ..., "path": ...} dicts.
    """
    query = f"org:{org} path:.github {action}"
    hits: list[dict] = []
    # The Code Search API caps results at 1000 = 10 pages x 100 per page.
    max_pages = 10

    log(f"Searching for '{action}' in .github/ across {org}...")

    for page in range(1, max_pages + 1):
        log(f" Fetching page {page}...", is_debug=True)

        data = _fetch_search_page(query, page)
        if data is None:
            break

        items = data.get("items", [])

        if page == 1:
            log(f"Found {data.get('total_count', 0)} total matches (may include duplicates).")

        if not items:
            break

        hits.extend(
            {"repo": item["repository"]["full_name"], "path": item["path"]}
            for item in items
        )

        if page == max_pages:
            log("Warning: Hit 1000-result API cap. Results may be incomplete.")
            log(" Consider narrowing the search or using a different approach.")
        else:
            time.sleep(2)  # Respect 30 req/min search rate limit

    unique = _deduplicate(hits)
    log(f"Found {len(unique)} unique files to inspect.")
    return unique
152+
153+
154+
# ---------------------------------------------------------------------------
155+
# File content fetching + version extraction
156+
# ---------------------------------------------------------------------------
157+
158+
159+
def extract_versions_from_file(
    repo: str, filepath: str, action: str
) -> list[dict]:
    """Download one file via the contents API and pull out every action ref.

    Returns a list of {"line_num": int, "ref": str} dicts; an empty list is
    returned (with a logged warning) on any fetch or decode failure.
    """
    log(f" Fetching {repo}/{filepath}", is_debug=True)

    try:
        data = gh_api(f"repos/{repo}/contents/{filepath}")
    except RuntimeError as e:
        log(f"Warning: Could not fetch {repo}/{filepath}: {e}")
        return []

    try:
        content = base64.b64decode(data.get("content", "")).decode("utf-8")
    except Exception:
        log(f"Warning: Could not decode {repo}/{filepath}")
        return []

    # Match "uses: owner/action[/optional/subpath]@ref" with optional quotes
    # and surrounding whitespace; group(1) captures the ref after '@'.
    ref_re = re.compile(
        rf"uses:\s*['\"]?{re.escape(action)}[^@\s'\"#]*@([^\s'\"#]+)"
    )

    found: list[dict] = []
    for n, line in enumerate(content.splitlines(), start=1):
        m = ref_re.search(line)
        if m is not None:
            found.append({"line_num": n, "ref": m.group(1)})

    return found
193+
194+
195+
# ---------------------------------------------------------------------------
196+
# CLI
197+
# ---------------------------------------------------------------------------
198+
199+
200+
def parse_args() -> argparse.Namespace:
    """Define and parse the command-line interface."""
    cli = argparse.ArgumentParser(
        description="Audit GitHub Actions version usage across an organization.",
    )
    cli.add_argument("--org", required=True, help="GitHub organization to scan")
    cli.add_argument(
        "--action",
        required=True,
        help="Action to audit (e.g. SonarSource/gh-action_cache)",
    )
    cli.add_argument(
        "--allowed-refs",
        required=True,
        help="Comma-separated list of allowed refs (tags or SHAs)",
    )
    cli.add_argument("--output", help="Output CSV file path (default: stdout)")
    cli.add_argument(
        "--verbose", action="store_true", help="Enable debug logging",
    )
    return cli.parse_args()
220+
221+
222+
def check_prerequisites():
    """Exit with status 1 if the gh CLI is not available on PATH."""
    if shutil.which("gh") is None:
        print("Error: 'gh' CLI is required but not installed.", file=sys.stderr)
        sys.exit(1)
227+
228+
229+
# ---------------------------------------------------------------------------
230+
# Main
231+
# ---------------------------------------------------------------------------
232+
233+
234+
def main():
    """Entry point: search, inspect, report as CSV, exit 1 on non-compliance."""
    global _verbose

    args = parse_args()
    _verbose = args.verbose
    check_prerequisites()

    # Allowed refs arrive as a single comma-separated CLI value.
    allowed_refs = [r.strip() for r in args.allowed_refs.split(",")]

    log(f"Auditing '{args.action}' usage across org '{args.org}'...")
    log(f"Allowed refs: {', '.join(allowed_refs)}")

    # Step 1: Search for files referencing the action
    matched_files = search_action_usage(args.org, args.action)

    # Step 2: Fetch each file and extract versions
    all_refs: list[ActionRef] = []
    total_files = len(matched_files)

    log("Inspecting file contents...")

    for i, file_info in enumerate(matched_files):
        repo = file_info["repo"]
        filepath = file_info["path"]

        versions = extract_versions_from_file(repo, filepath, args.action)

        for v in versions:
            # A file can reference the action more than once; record each line.
            compliant = v["ref"] in allowed_refs
            all_refs.append(
                ActionRef(
                    repo=repo,
                    filepath=filepath,
                    line_num=v["line_num"],
                    current_ref=v["ref"],
                    compliant=compliant,
                )
            )

        # Progress heartbeat every 10 files (each file is a separate API call).
        if (i + 1) % 10 == 0:
            log(f" Processed {i + 1}/{total_files} files...")

    log(f"Done. Processed {total_files} files.")

    # Step 3: Output CSV
    fieldnames = ["repo", "workflow_file", "line_number", "current_ref", "compliant"]

    def write_csv(writer: csv.DictWriter):
        # Shared by both output paths (file and stdout) below.
        writer.writeheader()
        for ref in all_refs:
            writer.writerow(
                {
                    "repo": ref.repo,
                    "workflow_file": ref.filepath,
                    "line_number": ref.line_num,
                    "current_ref": ref.current_ref,
                    "compliant": ref.compliant,
                }
            )

    if args.output:
        with open(args.output, "w", newline="") as f:
            write_csv(csv.DictWriter(f, fieldnames=fieldnames))
        log(f"Report written to: {args.output}")
    else:
        write_csv(csv.DictWriter(sys.stdout, fieldnames=fieldnames))

    # Step 4: Summary — emitted via log() (stderr), so it never mixes with
    # the CSV written to stdout.
    total = len(all_refs)
    non_compliant = [r for r in all_refs if not r.compliant]
    compliant_count = total - len(non_compliant)

    log("")
    log("=== Audit Summary ===")
    log(f"Total references found: {total}")
    log(f"Compliant: {compliant_count}")
    log(f"Non-compliant: {len(non_compliant)}")

    if non_compliant:
        log("")
        log("Non-compliant repos:")
        for ref in non_compliant:
            log(f" - {ref.repo} {ref.filepath}:{ref.line_num} @{ref.current_ref}")
        # Non-zero exit so CI pipelines can fail on policy violations.
        sys.exit(1)
318+
319+
320+
# Script entry guard: run the audit only when executed directly.
if __name__ == "__main__":
    main()

0 commit comments

Comments
 (0)