Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 93 additions & 0 deletions .github/scripts/compose_telegram_message.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
import html
import os

FAILURE_STATES = {"failure", "cancelled", "timed_out", "action_required"}
SUCCESS_STATES = {"success", "skipped", "neutral"}

JOB_RESULT_FIELDS = (
("backend", "BACKEND_RESULT"),
("sonar-backend", "SONAR_BACKEND_RESULT"),
)
Comment thread
igorsatsyuk marked this conversation as resolved.

SONAR_SKIP_FLAGS = {
"SONAR_BACKEND_RESULT": "SONAR_BACKEND_SKIPPED",
}
Comment thread
igorsatsyuk marked this conversation as resolved.


def normalize(value: str) -> str:
return (value or "unknown").lower()


def status_icon(status: str) -> str:
if status == "success":
return "✅"
if status in FAILURE_STATES:
return "❌"
if status in {"skipped", "neutral"}:
return "⏭️"
return "❓"


def get_actual_status(env_name: str, result: str) -> str:
skip_flag = SONAR_SKIP_FLAGS.get(env_name)
if skip_flag and normalize(os.environ.get(skip_flag, "")) == "true":
return "skipped"
return result


def overall_status() -> str:
statuses = []
for _, env_name in JOB_RESULT_FIELDS:
result = normalize(os.environ.get(env_name, "unknown"))
actual = get_actual_status(env_name, result)
statuses.append(actual)

if any(status in FAILURE_STATES for status in statuses):
return "failure"
if all(status in SUCCESS_STATES for status in statuses):
return "success"
return "unknown"


def esc(value: str) -> str:
return html.escape(str(value), quote=True)


def compose_message() -> str:
current_overall = overall_status()
lines = [
"jwt-demo-reactive CI finished",
"",
f"{status_icon(current_overall)} <b>Status:</b> {esc(current_overall)}",
f"<b>Branch:</b> {esc(os.environ.get('GITHUB_REF_NAME', ''))}",
f"<b>Commit:</b> {esc(os.environ.get('GITHUB_SHA', ''))}",
f"<b>Actor:</b> {esc(os.environ.get('GITHUB_ACTOR', ''))}",
f"<b>Workflow:</b> {esc(os.environ.get('GITHUB_WORKFLOW', ''))}",
"",
"<b>Job results</b>",
]

for job_name, env_name in JOB_RESULT_FIELDS:
result = normalize(os.environ.get(env_name, "unknown"))
actual = get_actual_status(env_name, result)
lines.append(f"- {status_icon(actual)} {esc(job_name)}: {esc(actual)}")

lines.append("")
lines.append(
f"Link: {esc(os.environ.get('GITHUB_SERVER_URL', ''))}/{esc(os.environ.get('GITHUB_REPOSITORY', ''))}/actions/runs/{esc(os.environ.get('GITHUB_RUN_ID', ''))}"
)

return "\n".join(lines)


def main() -> None:
message = compose_message()
output_path = os.environ["GITHUB_OUTPUT"]
with open(output_path, "a", encoding="utf-8") as output_file:
output_file.write("message<<EOF\n")
output_file.write(message)
output_file.write("\nEOF\n")


if __name__ == "__main__":
main()
298 changes: 298 additions & 0 deletions .github/scripts/sonar_quality_gate_report.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,298 @@
#!/usr/bin/env python3
"""Wait for Sonar analysis completion and publish a detailed Quality Gate summary."""

from __future__ import annotations

import argparse
import json
import os
import sys
import time
import urllib.error
import urllib.parse
import urllib.request
from pathlib import Path


def parse_kv_file(path: Path) -> dict[str, str]:
data: dict[str, str] = {}
for raw_line in path.read_text(encoding="utf-8").splitlines():
line = raw_line.strip()
if not line or line.startswith("#") or "=" not in line:
continue
key, value = line.split("=", 1)
data[key.strip()] = value.strip()
return data


def api_get_json(url: str, token: str) -> dict:
request = urllib.request.Request(url)
request.add_header("Authorization", f"Basic {build_basic_auth(token)}")
try:
with urllib.request.urlopen(request, timeout=30) as response:
return json.loads(response.read().decode("utf-8"))
except urllib.error.HTTPError as error:
response_body = error.read().decode("utf-8", errors="replace")
raise RuntimeError(
f"Sonar API request failed: {url} returned HTTP {error.code}. Response: {response_body}"
) from error
except urllib.error.URLError as error:
raise RuntimeError(f"Sonar API request failed: {url}. Reason: {error.reason}") from error


def build_basic_auth(token: str) -> str:
import base64

raw = f"{token}:".encode("utf-8")
return base64.b64encode(raw).decode("ascii")


def wait_for_ce_task(ce_task_url: str, token: str, timeout_seconds: int, poll_seconds: int) -> dict:
started = time.monotonic()
while True:
payload = api_get_json(ce_task_url, token)
task = payload.get("task", {})
status = task.get("status")

if status in {"SUCCESS", "FAILED", "CANCELED"}:
return task

if (time.monotonic() - started) > timeout_seconds:
raise TimeoutError(f"Timed out waiting for Sonar CE task at {ce_task_url}")

time.sleep(poll_seconds)


def build_measures_url(host_url: str, project_key: str, pull_request: str | None, branch: str | None) -> str:
metric_keys = ",".join(
[
"coverage",
"new_coverage",
"bugs",
"new_bugs",
"vulnerabilities",
"new_vulnerabilities",
"code_smells",
"new_code_smells",
"duplicated_lines_density",
"new_duplicated_lines_density",
]
)
query = {
"component": project_key,
"metricKeys": metric_keys,
}
if pull_request:
query["pullRequest"] = pull_request
elif branch:
query["branch"] = branch
return f"{host_url}/api/measures/component?{urllib.parse.urlencode(query)}"


def detect_analysis_scope() -> tuple[str | None, str | None]:
event_path = os.getenv("GITHUB_EVENT_PATH", "")
if event_path:
try:
event_payload = json.loads(Path(event_path).read_text(encoding="utf-8"))
except (OSError, json.JSONDecodeError):
event_payload = {}
pull_request_number = event_payload.get("pull_request", {}).get("number")
if pull_request_number is not None:
return str(pull_request_number), None

branch = (os.getenv("GITHUB_HEAD_REF") or os.getenv("GITHUB_REF_NAME") or "").strip()
return None, branch or None


def to_measure_map(payload: dict) -> dict[str, str]:
component = payload.get("component", {})
result: dict[str, str] = {}
for measure in component.get("measures", []):
result[measure.get("metric")] = measure.get("value", "-")
return result


def fetch_measures(host_url: str, project_key: str, token: str) -> dict[str, str]:
pull_request, branch = detect_analysis_scope()
scoped_url = build_measures_url(host_url, project_key, pull_request, branch)

try:
return to_measure_map(api_get_json(scoped_url, token))
except RuntimeError as scoped_error:
base_url = build_measures_url(host_url, project_key, None, None)
if base_url != scoped_url:
try:
print(
"Scoped measures query failed; retrying without branch/pullRequest context",
file=sys.stderr,
)
return to_measure_map(api_get_json(base_url, token))
except RuntimeError as base_error:
print(f"Unable to load Sonar measures: {base_error}", file=sys.stderr)
return {}

print(f"Unable to load Sonar measures: {scoped_error}", file=sys.stderr)
return {}


def append_summary(text: str) -> None:
summary_file = os.getenv("GITHUB_STEP_SUMMARY")
if not summary_file:
return
with open(summary_file, "a", encoding="utf-8") as handle:
handle.write(text)


def is_missing_new_code_metrics_only(gate_status: str, conditions: list[dict], measures: dict[str, str]) -> bool:
if gate_status != "ERROR":
return False

if measures.get("new_coverage", "-") not in {"-", "NO_VALUE", ""}:
return False

if not conditions:
return False

has_non_ok_condition = False
for condition in conditions:
status = (condition.get("status") or "").upper()
if status in {"", "OK"}:
continue

has_non_ok_condition = True
metric_key = condition.get("metricKey", "")
actual_value = condition.get("actualValue")
normalized_actual = "-" if actual_value in (None, "") else str(actual_value)

if not metric_key.startswith("new_"):
return False
if normalized_actual not in {"-", "NO_VALUE", "None"}:
return False

return has_non_ok_condition


def main() -> int:
parser = argparse.ArgumentParser()
parser.add_argument("--project-key", required=True)
parser.add_argument("--component-name", required=True)
parser.add_argument("--report-task-file", required=True)
parser.add_argument("--timeout-seconds", type=int, default=300)
parser.add_argument("--poll-seconds", type=int, default=5)
parser.add_argument(
"--allow-missing-new-code-metrics",
action="store_true",
help="Do not fail when Sonar returns ERROR only because new_* metrics are unavailable",
)
args = parser.parse_args()

token = os.getenv("SONAR_TOKEN", "")
if not token:
print("SONAR_TOKEN is missing", file=sys.stderr)
return 2

configured_host_url = (os.getenv("SONAR_HOST_URL") or "").strip()
report_task_path = Path(args.report_task_file)
if not report_task_path.exists():
print(f"report-task.txt not found: {report_task_path}", file=sys.stderr)
return 2

report = parse_kv_file(report_task_path)
report_server_url = (report.get("serverUrl") or "").strip()
host_url = (configured_host_url or report_server_url or "https://sonarcloud.io").rstrip("/")
ce_task_url = report.get("ceTaskUrl")
if not ce_task_url:
print("ceTaskUrl is missing in report-task.txt", file=sys.stderr)
return 2

try:
print(f"Waiting for Sonar CE task: {ce_task_url}")
task = wait_for_ce_task(ce_task_url, token, args.timeout_seconds, args.poll_seconds)
task_status = task.get("status", "UNKNOWN")
analysis_id = task.get("analysisId")

if task_status != "SUCCESS" or not analysis_id:
print(f"Sonar CE task status is {task_status}; analysisId={analysis_id}", file=sys.stderr)
return 1

qg_url = f"{host_url}/api/qualitygates/project_status?analysisId={urllib.parse.quote(analysis_id)}"
qg_payload = api_get_json(qg_url, token)
project_status = qg_payload.get("projectStatus", {})
gate_status = project_status.get("status", "NONE")
conditions = project_status.get("conditions", [])

measures = fetch_measures(host_url, args.project_key, token)
except (RuntimeError, TimeoutError) as error:
print(str(error), file=sys.stderr)
return 1
Comment thread
igorsatsyuk marked this conversation as resolved.

print(f"Quality Gate ({args.component_name}): {gate_status}")
print("Measures:")
for metric_key in [
"coverage",
"new_coverage",
"bugs",
"new_bugs",
"vulnerabilities",
"new_vulnerabilities",
"code_smells",
"new_code_smells",
"duplicated_lines_density",
"new_duplicated_lines_density",
]:
print(f" {metric_key}: {measures.get(metric_key, '-')}")

summary_lines = [
f"### SonarQube - {args.component_name}",
"",
f"- Quality Gate: **{gate_status}**",
f"- Project key: `{args.project_key}`",
"",
"| Metric | Value |",
"|---|---:|",
]
for metric_key in [
"coverage",
"new_coverage",
"bugs",
"new_bugs",
"vulnerabilities",
"new_vulnerabilities",
"code_smells",
"new_code_smells",
"duplicated_lines_density",
"new_duplicated_lines_density",
]:
summary_lines.append(f"| `{metric_key}` | {measures.get(metric_key, '-')} |")

summary_lines.extend(["", "| Condition metric | Status | Actual | Threshold |", "|---|---|---:|---:|"])
if conditions:
for condition in conditions:
summary_lines.append(
"| `{}` | {} | {} | {} |".format(
condition.get("metricKey", "-"),
condition.get("status", "-"),
condition.get("actualValue", "-"),
condition.get("errorThreshold", "-"),
)
)
else:
summary_lines.append("| `-` | - | - | - |")

summary_lines.append("\n")
append_summary("\n".join(summary_lines))

if gate_status != "OK":
if args.allow_missing_new_code_metrics and is_missing_new_code_metrics_only(gate_status, conditions, measures):
print(
"Quality Gate returned ERROR due to unavailable new-code metrics; treated as neutral for this run"
)
return 0
print("Quality Gate failed", file=sys.stderr)
return 1

return 0


if __name__ == "__main__":
raise SystemExit(main())
Loading