-
Notifications
You must be signed in to change notification settings - Fork 0
chore(ci): align workflow structure #67
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
+564
−129
Merged
Changes from all commits
Commits
Show all changes
7 commits
Select commit
Hold shift + click to select a range
e244c89
chore(ci): align workflow structure
igorsatsyuk ffc9334
fix(ci): address copilot review comments
igorsatsyuk 912f2bb
fix(ci): address copilot follow-up comments
igorsatsyuk 5219010
fix(ci): handle missing coverage and sonar timeout
igorsatsyuk fc35036
fix(ci): pin remaining workflow actions
igorsatsyuk fdabffe
fix(ci): align sonar job label in telegram summary
igorsatsyuk b6a990f
fix(ci): scope actions write permission to backend job
igorsatsyuk File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,93 @@ | ||
| import html | ||
| import os | ||
|
|
||
| FAILURE_STATES = {"failure", "cancelled", "timed_out", "action_required"} | ||
| SUCCESS_STATES = {"success", "skipped", "neutral"} | ||
|
|
||
| JOB_RESULT_FIELDS = ( | ||
| ("backend", "BACKEND_RESULT"), | ||
| ("sonar-backend", "SONAR_BACKEND_RESULT"), | ||
| ) | ||
|
|
||
| SONAR_SKIP_FLAGS = { | ||
| "SONAR_BACKEND_RESULT": "SONAR_BACKEND_SKIPPED", | ||
| } | ||
|
igorsatsyuk marked this conversation as resolved.
|
||
|
|
||
|
|
||
| def normalize(value: str) -> str: | ||
| return (value or "unknown").lower() | ||
|
|
||
|
|
||
| def status_icon(status: str) -> str: | ||
| if status == "success": | ||
| return "✅" | ||
| if status in FAILURE_STATES: | ||
| return "❌" | ||
| if status in {"skipped", "neutral"}: | ||
| return "⏭️" | ||
| return "❓" | ||
|
|
||
|
|
||
| def get_actual_status(env_name: str, result: str) -> str: | ||
| skip_flag = SONAR_SKIP_FLAGS.get(env_name) | ||
| if skip_flag and normalize(os.environ.get(skip_flag, "")) == "true": | ||
| return "skipped" | ||
| return result | ||
|
|
||
|
|
||
| def overall_status() -> str: | ||
| statuses = [] | ||
| for _, env_name in JOB_RESULT_FIELDS: | ||
| result = normalize(os.environ.get(env_name, "unknown")) | ||
| actual = get_actual_status(env_name, result) | ||
| statuses.append(actual) | ||
|
|
||
| if any(status in FAILURE_STATES for status in statuses): | ||
| return "failure" | ||
| if all(status in SUCCESS_STATES for status in statuses): | ||
| return "success" | ||
| return "unknown" | ||
|
|
||
|
|
||
| def esc(value: str) -> str: | ||
| return html.escape(str(value), quote=True) | ||
|
|
||
|
|
||
| def compose_message() -> str: | ||
| current_overall = overall_status() | ||
| lines = [ | ||
| "jwt-demo-reactive CI finished", | ||
| "", | ||
| f"{status_icon(current_overall)} <b>Status:</b> {esc(current_overall)}", | ||
| f"<b>Branch:</b> {esc(os.environ.get('GITHUB_REF_NAME', ''))}", | ||
| f"<b>Commit:</b> {esc(os.environ.get('GITHUB_SHA', ''))}", | ||
| f"<b>Actor:</b> {esc(os.environ.get('GITHUB_ACTOR', ''))}", | ||
| f"<b>Workflow:</b> {esc(os.environ.get('GITHUB_WORKFLOW', ''))}", | ||
| "", | ||
| "<b>Job results</b>", | ||
| ] | ||
|
|
||
| for job_name, env_name in JOB_RESULT_FIELDS: | ||
| result = normalize(os.environ.get(env_name, "unknown")) | ||
| actual = get_actual_status(env_name, result) | ||
| lines.append(f"- {status_icon(actual)} {esc(job_name)}: {esc(actual)}") | ||
|
|
||
| lines.append("") | ||
| lines.append( | ||
| f"Link: {esc(os.environ.get('GITHUB_SERVER_URL', ''))}/{esc(os.environ.get('GITHUB_REPOSITORY', ''))}/actions/runs/{esc(os.environ.get('GITHUB_RUN_ID', ''))}" | ||
| ) | ||
|
|
||
| return "\n".join(lines) | ||
|
|
||
|
|
||
| def main() -> None: | ||
| message = compose_message() | ||
| output_path = os.environ["GITHUB_OUTPUT"] | ||
| with open(output_path, "a", encoding="utf-8") as output_file: | ||
| output_file.write("message<<EOF\n") | ||
| output_file.write(message) | ||
| output_file.write("\nEOF\n") | ||
|
|
||
|
|
||
| if __name__ == "__main__": | ||
| main() | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,298 @@ | ||
| #!/usr/bin/env python3 | ||
| """Wait for Sonar analysis completion and publish a detailed Quality Gate summary.""" | ||
|
|
||
| from __future__ import annotations | ||
|
|
||
| import argparse | ||
| import json | ||
| import os | ||
| import sys | ||
| import time | ||
| import urllib.error | ||
| import urllib.parse | ||
| import urllib.request | ||
| from pathlib import Path | ||
|
|
||
|
|
||
| def parse_kv_file(path: Path) -> dict[str, str]: | ||
| data: dict[str, str] = {} | ||
| for raw_line in path.read_text(encoding="utf-8").splitlines(): | ||
| line = raw_line.strip() | ||
| if not line or line.startswith("#") or "=" not in line: | ||
| continue | ||
| key, value = line.split("=", 1) | ||
| data[key.strip()] = value.strip() | ||
| return data | ||
|
|
||
|
|
||
| def api_get_json(url: str, token: str) -> dict: | ||
| request = urllib.request.Request(url) | ||
| request.add_header("Authorization", f"Basic {build_basic_auth(token)}") | ||
| try: | ||
| with urllib.request.urlopen(request, timeout=30) as response: | ||
| return json.loads(response.read().decode("utf-8")) | ||
| except urllib.error.HTTPError as error: | ||
| response_body = error.read().decode("utf-8", errors="replace") | ||
| raise RuntimeError( | ||
| f"Sonar API request failed: {url} returned HTTP {error.code}. Response: {response_body}" | ||
| ) from error | ||
| except urllib.error.URLError as error: | ||
| raise RuntimeError(f"Sonar API request failed: {url}. Reason: {error.reason}") from error | ||
|
|
||
|
|
||
| def build_basic_auth(token: str) -> str: | ||
| import base64 | ||
|
|
||
| raw = f"{token}:".encode("utf-8") | ||
| return base64.b64encode(raw).decode("ascii") | ||
|
|
||
|
|
||
| def wait_for_ce_task(ce_task_url: str, token: str, timeout_seconds: int, poll_seconds: int) -> dict: | ||
| started = time.monotonic() | ||
| while True: | ||
| payload = api_get_json(ce_task_url, token) | ||
| task = payload.get("task", {}) | ||
| status = task.get("status") | ||
|
|
||
| if status in {"SUCCESS", "FAILED", "CANCELED"}: | ||
| return task | ||
|
|
||
| if (time.monotonic() - started) > timeout_seconds: | ||
| raise TimeoutError(f"Timed out waiting for Sonar CE task at {ce_task_url}") | ||
|
|
||
| time.sleep(poll_seconds) | ||
|
|
||
|
|
||
| def build_measures_url(host_url: str, project_key: str, pull_request: str | None, branch: str | None) -> str: | ||
| metric_keys = ",".join( | ||
| [ | ||
| "coverage", | ||
| "new_coverage", | ||
| "bugs", | ||
| "new_bugs", | ||
| "vulnerabilities", | ||
| "new_vulnerabilities", | ||
| "code_smells", | ||
| "new_code_smells", | ||
| "duplicated_lines_density", | ||
| "new_duplicated_lines_density", | ||
| ] | ||
| ) | ||
| query = { | ||
| "component": project_key, | ||
| "metricKeys": metric_keys, | ||
| } | ||
| if pull_request: | ||
| query["pullRequest"] = pull_request | ||
| elif branch: | ||
| query["branch"] = branch | ||
| return f"{host_url}/api/measures/component?{urllib.parse.urlencode(query)}" | ||
|
|
||
|
|
||
| def detect_analysis_scope() -> tuple[str | None, str | None]: | ||
| event_path = os.getenv("GITHUB_EVENT_PATH", "") | ||
| if event_path: | ||
| try: | ||
| event_payload = json.loads(Path(event_path).read_text(encoding="utf-8")) | ||
| except (OSError, json.JSONDecodeError): | ||
| event_payload = {} | ||
| pull_request_number = event_payload.get("pull_request", {}).get("number") | ||
| if pull_request_number is not None: | ||
| return str(pull_request_number), None | ||
|
|
||
| branch = (os.getenv("GITHUB_HEAD_REF") or os.getenv("GITHUB_REF_NAME") or "").strip() | ||
| return None, branch or None | ||
|
|
||
|
|
||
| def to_measure_map(payload: dict) -> dict[str, str]: | ||
| component = payload.get("component", {}) | ||
| result: dict[str, str] = {} | ||
| for measure in component.get("measures", []): | ||
| result[measure.get("metric")] = measure.get("value", "-") | ||
| return result | ||
|
|
||
|
|
||
| def fetch_measures(host_url: str, project_key: str, token: str) -> dict[str, str]: | ||
| pull_request, branch = detect_analysis_scope() | ||
| scoped_url = build_measures_url(host_url, project_key, pull_request, branch) | ||
|
|
||
| try: | ||
| return to_measure_map(api_get_json(scoped_url, token)) | ||
| except RuntimeError as scoped_error: | ||
| base_url = build_measures_url(host_url, project_key, None, None) | ||
| if base_url != scoped_url: | ||
| try: | ||
| print( | ||
| "Scoped measures query failed; retrying without branch/pullRequest context", | ||
| file=sys.stderr, | ||
| ) | ||
| return to_measure_map(api_get_json(base_url, token)) | ||
| except RuntimeError as base_error: | ||
| print(f"Unable to load Sonar measures: {base_error}", file=sys.stderr) | ||
| return {} | ||
|
|
||
| print(f"Unable to load Sonar measures: {scoped_error}", file=sys.stderr) | ||
| return {} | ||
|
|
||
|
|
||
| def append_summary(text: str) -> None: | ||
| summary_file = os.getenv("GITHUB_STEP_SUMMARY") | ||
| if not summary_file: | ||
| return | ||
| with open(summary_file, "a", encoding="utf-8") as handle: | ||
| handle.write(text) | ||
|
|
||
|
|
||
| def is_missing_new_code_metrics_only(gate_status: str, conditions: list[dict], measures: dict[str, str]) -> bool: | ||
| if gate_status != "ERROR": | ||
| return False | ||
|
|
||
| if measures.get("new_coverage", "-") not in {"-", "NO_VALUE", ""}: | ||
| return False | ||
|
|
||
| if not conditions: | ||
| return False | ||
|
|
||
| has_non_ok_condition = False | ||
| for condition in conditions: | ||
| status = (condition.get("status") or "").upper() | ||
| if status in {"", "OK"}: | ||
| continue | ||
|
|
||
| has_non_ok_condition = True | ||
| metric_key = condition.get("metricKey", "") | ||
| actual_value = condition.get("actualValue") | ||
| normalized_actual = "-" if actual_value in (None, "") else str(actual_value) | ||
|
|
||
| if not metric_key.startswith("new_"): | ||
| return False | ||
| if normalized_actual not in {"-", "NO_VALUE", "None"}: | ||
| return False | ||
|
|
||
| return has_non_ok_condition | ||
|
|
||
|
|
||
| def main() -> int: | ||
| parser = argparse.ArgumentParser() | ||
| parser.add_argument("--project-key", required=True) | ||
| parser.add_argument("--component-name", required=True) | ||
| parser.add_argument("--report-task-file", required=True) | ||
| parser.add_argument("--timeout-seconds", type=int, default=300) | ||
| parser.add_argument("--poll-seconds", type=int, default=5) | ||
| parser.add_argument( | ||
| "--allow-missing-new-code-metrics", | ||
| action="store_true", | ||
| help="Do not fail when Sonar returns ERROR only because new_* metrics are unavailable", | ||
| ) | ||
| args = parser.parse_args() | ||
|
|
||
| token = os.getenv("SONAR_TOKEN", "") | ||
| if not token: | ||
| print("SONAR_TOKEN is missing", file=sys.stderr) | ||
| return 2 | ||
|
|
||
| configured_host_url = (os.getenv("SONAR_HOST_URL") or "").strip() | ||
| report_task_path = Path(args.report_task_file) | ||
| if not report_task_path.exists(): | ||
| print(f"report-task.txt not found: {report_task_path}", file=sys.stderr) | ||
| return 2 | ||
|
|
||
| report = parse_kv_file(report_task_path) | ||
| report_server_url = (report.get("serverUrl") or "").strip() | ||
| host_url = (configured_host_url or report_server_url or "https://sonarcloud.io").rstrip("/") | ||
| ce_task_url = report.get("ceTaskUrl") | ||
| if not ce_task_url: | ||
| print("ceTaskUrl is missing in report-task.txt", file=sys.stderr) | ||
| return 2 | ||
|
|
||
| try: | ||
| print(f"Waiting for Sonar CE task: {ce_task_url}") | ||
| task = wait_for_ce_task(ce_task_url, token, args.timeout_seconds, args.poll_seconds) | ||
| task_status = task.get("status", "UNKNOWN") | ||
| analysis_id = task.get("analysisId") | ||
|
|
||
| if task_status != "SUCCESS" or not analysis_id: | ||
| print(f"Sonar CE task status is {task_status}; analysisId={analysis_id}", file=sys.stderr) | ||
| return 1 | ||
|
|
||
| qg_url = f"{host_url}/api/qualitygates/project_status?analysisId={urllib.parse.quote(analysis_id)}" | ||
| qg_payload = api_get_json(qg_url, token) | ||
| project_status = qg_payload.get("projectStatus", {}) | ||
| gate_status = project_status.get("status", "NONE") | ||
| conditions = project_status.get("conditions", []) | ||
|
|
||
| measures = fetch_measures(host_url, args.project_key, token) | ||
| except (RuntimeError, TimeoutError) as error: | ||
| print(str(error), file=sys.stderr) | ||
| return 1 | ||
|
igorsatsyuk marked this conversation as resolved.
|
||
|
|
||
| print(f"Quality Gate ({args.component_name}): {gate_status}") | ||
| print("Measures:") | ||
| for metric_key in [ | ||
| "coverage", | ||
| "new_coverage", | ||
| "bugs", | ||
| "new_bugs", | ||
| "vulnerabilities", | ||
| "new_vulnerabilities", | ||
| "code_smells", | ||
| "new_code_smells", | ||
| "duplicated_lines_density", | ||
| "new_duplicated_lines_density", | ||
| ]: | ||
| print(f" {metric_key}: {measures.get(metric_key, '-')}") | ||
|
|
||
| summary_lines = [ | ||
| f"### SonarQube - {args.component_name}", | ||
| "", | ||
| f"- Quality Gate: **{gate_status}**", | ||
| f"- Project key: `{args.project_key}`", | ||
| "", | ||
| "| Metric | Value |", | ||
| "|---|---:|", | ||
| ] | ||
| for metric_key in [ | ||
| "coverage", | ||
| "new_coverage", | ||
| "bugs", | ||
| "new_bugs", | ||
| "vulnerabilities", | ||
| "new_vulnerabilities", | ||
| "code_smells", | ||
| "new_code_smells", | ||
| "duplicated_lines_density", | ||
| "new_duplicated_lines_density", | ||
| ]: | ||
| summary_lines.append(f"| `{metric_key}` | {measures.get(metric_key, '-')} |") | ||
|
|
||
| summary_lines.extend(["", "| Condition metric | Status | Actual | Threshold |", "|---|---|---:|---:|"]) | ||
| if conditions: | ||
| for condition in conditions: | ||
| summary_lines.append( | ||
| "| `{}` | {} | {} | {} |".format( | ||
| condition.get("metricKey", "-"), | ||
| condition.get("status", "-"), | ||
| condition.get("actualValue", "-"), | ||
| condition.get("errorThreshold", "-"), | ||
| ) | ||
| ) | ||
| else: | ||
| summary_lines.append("| `-` | - | - | - |") | ||
|
|
||
| summary_lines.append("\n") | ||
| append_summary("\n".join(summary_lines)) | ||
|
|
||
| if gate_status != "OK": | ||
| if args.allow_missing_new_code_metrics and is_missing_new_code_metrics_only(gate_status, conditions, measures): | ||
| print( | ||
| "Quality Gate returned ERROR due to unavailable new-code metrics; treated as neutral for this run" | ||
| ) | ||
| return 0 | ||
| print("Quality Gate failed", file=sys.stderr) | ||
| return 1 | ||
|
|
||
| return 0 | ||
|
|
||
|
|
||
| if __name__ == "__main__": | ||
| raise SystemExit(main()) | ||
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.