diff --git a/.env.example b/.env.example index ce61ce7..1467200 100644 --- a/.env.example +++ b/.env.example @@ -2,7 +2,8 @@ # Never commit real secrets. # --- Google Calendar (calendash-api.py) --- -# Required secret credentials: set either dedicated Calendar creds OR shared Google creds. +# Required secret credentials: use a Desktop OAuth client, and set either dedicated Calendar creds OR shared Google creds. +# If the Google app is in testing, add your account as a consent-screen test user. # Required (if GOOGLE_CLIENT_ID is empty): GOOGLE_CALENDAR_CLIENT_ID= # Required secret (if GOOGLE_CLIENT_SECRET is empty): @@ -27,12 +28,20 @@ ICON_IMAGE=~/zero2dash/images/calendash-icon.png CALENDASH_FONT_PATH= # OAUTH_PORT default: 8080 OAUTH_PORT=8080 -# GOOGLE_TOKEN_PATH default: token.json -GOOGLE_TOKEN_PATH=~/zero2dash/token.json +# GOOGLE_TOKEN_PATH default: token.json (relative to the working directory; systemd uses /opt/zero2dash) +GOOGLE_TOKEN_PATH=token.json # --- Pi-hole dashboards (piholestats_v1.1.py / piholestats_v1.2.py) --- # Optional host, default: 127.0.0.1 PIHOLE_HOST=192.168.1.2 +# Required for remote hosts unless PIHOLE_HOST already includes http:// or https:// +PIHOLE_SCHEME=http +# Optional TLS verification: auto/true/false. For self-signed HTTPS, set false or use PIHOLE_CA_BUNDLE. +PIHOLE_VERIFY_TLS=auto +# Optional CA bundle for HTTPS certificate validation +PIHOLE_CA_BUNDLE= +# Optional request timeout seconds, default: 4 +PIHOLE_TIMEOUT=4 # Required secret: Pi-hole admin password for v6 API auth. PIHOLE_PASSWORD=replace-with-your-pihole-password # Optional: legacy API token for /admin/api.php fallback. @@ -49,6 +58,7 @@ ACTIVE_HOURS=22,7 # Required: Album ID to pull shuffled photos from. GOOGLE_PHOTOS_ALBUM_ID=replace-with-google-photos-album-id # OAuth credentials are required by one of these methods: +# Use a Desktop OAuth client. If the Google app is in testing, add your account as a consent-screen test user. # 1) Existing client secrets file (default path shown), OR # 2) GOOGLE_PHOTOS_CLIENT_ID + GOOGLE_PHOTOS_CLIENT_SECRET values. GOOGLE_PHOTOS_CLIENT_SECRETS_PATH=~/zero2dash/client_secret.json @@ -69,4 +79,5 @@ FALLBACK_IMAGE=~/zero2dash/images/photos-fallback.png # LOGO_PATH default: /images/goo-photos-icon.png LOGO_PATH=/images/goo-photos-icon.png # OAUTH_OPEN_BROWSER default: 0 (false) +# Loopback OAuth only: complete sign-in on the same machine, or use SSH port forwarding for headless Pi setup. OAUTH_OPEN_BROWSER=0 diff --git a/README.md b/README.md index 448b6e9..f157303 100644 --- a/README.md +++ b/README.md @@ -90,18 +90,26 @@ cp /opt/zero2dash/.env.example /opt/zero2dash/.env chmod 600 /opt/zero2dash/.env ``` -Set at minimum: +Set at minimum for Pi-hole: - `PIHOLE_HOST` -- `PIHOLE_PASSWORD` -- `PIHOLE_API_TOKEN` (optional fallback) +- `PIHOLE_SCHEME` if `PIHOLE_HOST` is remote and does not already include `http://` or `https://` +- `PIHOLE_PASSWORD` for v6 session auth, or `PIHOLE_API_TOKEN` for legacy token auth +- `PIHOLE_VERIFY_TLS=false` for self-signed HTTPS, or `PIHOLE_CA_BUNDLE=/path/to/ca.pem` to verify a private CA +- `PIHOLE_TIMEOUT` - `REFRESH_SECS` - `ACTIVE_HOURS` (inclusive `start,end` hour window in 24h format; cross-midnight values like `22,7` are supported) - `FB_DEVICE` (optional override; defaults to `/dev/fb1`) - `FB_WIDTH` / `FB_HEIGHT` (optional override for static renderer geometry; defaults `320x240`) -## Run via systemd +Google OAuth notes: + +- Use Desktop OAuth clients for Calendar and Photos. +- Loopback OAuth only: complete sign-in on the same machine as the script, or tunnel the callback port from a headless Pi with `ssh -L 8080:localhost:8080 pihole@pihole`. +- If the Google consent screen is in testing, add your account as a test user. +- `calendash-api.py` defaults `GOOGLE_TOKEN_PATH` to `token.json` relative to `/opt/zero2dash` under systemd; `photos-shuffle.py` must keep using a separate `GOOGLE_TOKEN_PATH_PHOTOS`. +## Run via systemd Install and enable canonical units: ```sh diff --git a/scripts/calendash-api.py b/scripts/calendash-api.py index 8f6d865..9ad3e4e 100755 --- a/scripts/calendash-api.py +++ b/scripts/calendash-api.py @@ -53,7 +53,11 @@ def _normalize_scopes(raw_scopes: Any) -> set[str]: - parsed_scopes: set[str] = set() + if isinstance(raw_scopes, str): + return {scope for scope in raw_scopes.replace(",", " ").split() if scope} + if isinstance(raw_scopes, (list, tuple, set)): + return {str(scope) for scope in raw_scopes if scope} + return set() def _add_scope_values(raw_value: Any) -> None: if raw_value is None: @@ -456,6 +460,16 @@ def build_client_config(client_id: str, client_secret: str, oauth_port: int) -> } +def loopback_oauth_guidance(oauth_port: int) -> list[str]: + redirect_uri = expected_redirect_uri(oauth_port) + return [ + "Loopback OAuth only: complete Google sign-in on the same machine that is running this script.", + f"For a headless Pi, forward the callback port first: ssh -L {oauth_port}:localhost:{oauth_port} @", + "Use a Desktop OAuth client. If your Google app is in testing, add your account as a test user.", + f"Expected redirect URI: {redirect_uri}", + ] + + def save_credentials(creds: Credentials, token_path: Path) -> None: token_path.write_text(creds.to_json(), encoding="utf-8") os.chmod(token_path, 0o600) @@ -552,28 +566,14 @@ def get_credentials( raise RuntimeError( "Authenticated token does not include required calendar scopes. Ensure consent grants calendar.readonly access." ) - persist_auth_diagnostics( - diagnostics_path, - AuthAttemptDiagnostics(status="success", mode=auth_mode, oauth_port=oauth_port, redirect_uri=redirect_uri), - ) - except Exception as exc: - next_steps = _next_steps_for_auth_error(auth_mode, exc, redirect_uri, oauth_port) - persist_auth_diagnostics( - diagnostics_path, - AuthAttemptDiagnostics( - status="failed", - mode=auth_mode, - oauth_port=oauth_port, - redirect_uri=redirect_uri, - error_type=type(exc).__name__, - error_message=str(exc), - next_steps=next_steps, - ), - ) - logging.error("OAuth flow '%s' failed: %s", auth_mode, exc) - for step in next_steps: - logging.error("Next step: %s", step) - raise + if any(tag in exc_text for tag in ["access blocked", "app is blocked", "app restricted", "invalid_client"]): + logging.error( + "Google blocked this OAuth client. For calendar, use a Desktop OAuth client and add your account as a test user on the consent screen." + ) + logging.error("You can also set GOOGLE_CALENDAR_CLIENT_ID / GOOGLE_CALENDAR_CLIENT_SECRET to use a dedicated calendar OAuth client.") + for message in loopback_oauth_guidance(oauth_port): + logging.error(message) + raise RuntimeError("Loopback OAuth setup failed.") from exc save_credentials(creds, token_path) return creds diff --git a/scripts/photos-shuffle.py b/scripts/photos-shuffle.py index c739d04..c04f547 100755 --- a/scripts/photos-shuffle.py +++ b/scripts/photos-shuffle.py @@ -22,8 +22,10 @@ GOOGLE_TOKEN_PATH_PHOTOS if needed). - On first run, if token is missing/invalid and refresh is unavailable, the script starts a local OAuth flow and prints instructions to complete login. -- In headless environments, it does not auto-launch a browser by default; - copy the printed URL into any browser and paste back the result. +- Loopback OAuth only: complete the browser sign-in on the same machine as the + script, or use SSH port forwarding to forward the callback port from the Pi. +- Use a Desktop OAuth client. If the Google app is in testing, add your + account as a test user before first run. Fallback: - Ensure local fallback image exists at ~/zero2dash/images/photos-fallback.png @@ -174,7 +176,7 @@ def _verify_token_metadata(token_path: Path, force_token_path_reuse: bool) -> No def _normalize_scopes(raw_scopes: Any) -> set[str]: if isinstance(raw_scopes, str): - return {raw_scopes} + return {scope for scope in raw_scopes.replace(",", " ").split() if scope} if isinstance(raw_scopes, (list, tuple, set)): return {str(scope) for scope in raw_scopes if scope} return set() @@ -199,16 +201,40 @@ def debug(self, message: str) -> None: print(f"[debug] {message}") -class MediaDownloadError(RuntimeError): - pass - - -class MediaDownloadPermanentError(MediaDownloadError): - pass +def expected_redirect_uri(oauth_port: int) -> str: + return f"http://localhost:{oauth_port}/" + + +def build_client_config(client_id: str, client_secret: str, oauth_port: int) -> dict[str, Any]: + redirect_uri = expected_redirect_uri(oauth_port) + return { + "installed": { + "client_id": client_id, + "client_secret": client_secret, + "auth_uri": "https://accounts.google.com/o/oauth2/auth", + "token_uri": "https://oauth2.googleapis.com/token", + "redirect_uris": [ + "http://localhost", + "http://localhost/", + f"http://localhost:{oauth_port}", + redirect_uri, + "http://127.0.0.1", + "http://127.0.0.1/", + f"http://127.0.0.1:{oauth_port}", + redirect_uri.replace("localhost", "127.0.0.1", 1), + ], + } + } -class MediaDownloadTransientError(MediaDownloadError): - pass +def loopback_oauth_guidance(oauth_port: int) -> list[str]: + redirect_uri = expected_redirect_uri(oauth_port) + return [ + "Loopback OAuth only: complete Google sign-in on the same machine that is running this script.", + f"For a headless Pi, forward the callback port first: ssh -L {oauth_port}:localhost:{oauth_port} @", + "Use a Desktop OAuth client. If your Google app is in testing, add your account as a test user.", + f"Expected redirect URI: {redirect_uri}", + ] def _as_int(name: str, value: str) -> int: @@ -353,8 +379,12 @@ def load_config() -> Config: def authenticate(config: Config, log: Log, *, force_token_path_reuse: bool = False) -> Credentials: creds: Credentials | None = None - _preflight_token_path_guard(config.token_path, force_token_path_reuse) - _verify_token_metadata(config.token_path, force_token_path_reuse) + calendar_default_token = (DEFAULT_ROOT / "token.json").resolve() + if config.token_path.resolve() == calendar_default_token: + raise ValueError( + "GOOGLE_TOKEN_PATH_PHOTOS points to token.json, which is reserved for calendash-api.py. " + "Use a separate photos token path (default: ~/zero2dash/token_photos.json)." + ) if config.token_path.exists(): try: @@ -390,20 +420,13 @@ def authenticate(config: Config, log: Log, *, force_token_path_reuse: bool = Fal if not creds or not creds.valid: log.info("No valid Google token found; starting OAuth local server flow.") log.info( - "Follow the browser prompt to authorize Google Photos access, then return to this terminal." + "Complete Google sign-in on this machine, or use SSH port forwarding for the loopback callback." ) if config.client_secrets_path.exists(): flow = InstalledAppFlow.from_client_secrets_file(str(config.client_secrets_path), SCOPES) elif config.client_id and config.client_secret: flow = InstalledAppFlow.from_client_config( - { - "installed": { - "client_id": config.client_id, - "client_secret": config.client_secret, - "auth_uri": "https://accounts.google.com/o/oauth2/auth", - "token_uri": "https://oauth2.googleapis.com/token", - } - }, + build_client_config(config.client_id, config.client_secret, config.oauth_port), SCOPES, ) else: @@ -417,13 +440,18 @@ def authenticate(config: Config, log: Log, *, force_token_path_reuse: bool = Fal prompt="consent", authorization_prompt_message="Open this URL in your browser to authorize Google Photos access: {url}", open_browser=config.oauth_open_browser, + redirect_uri_trailing_slash=True, ) except Exception as exc: exc_text = str(exc).lower() + if "redirect_uri_mismatch" in exc_text: + log.info(f"OAuth redirect mismatch. Expected redirect URI: {expected_redirect_uri(config.oauth_port)}") if any(tag in exc_text for tag in ["access blocked", "app is blocked", "app restricted", "invalid_client"]): log.info("Google blocked this OAuth client for Photos. Use a dedicated Desktop OAuth client and add your account as a test user.") log.info("Set GOOGLE_PHOTOS_CLIENT_ID / GOOGLE_PHOTOS_CLIENT_SECRET (or GOOGLE_PHOTOS_CLIENT_SECRETS_PATH) in .env.") - raise + for message in loopback_oauth_guidance(config.oauth_port): + log.info(message) + raise RuntimeError("Loopback OAuth setup failed") from exc _write_token_atomically(config.token_path, creds.to_json()) _write_token_metadata(config.token_path, provider="google_photos", scopes=SCOPES) @@ -767,11 +795,8 @@ def parse_args() -> argparse.Namespace: ) parser.add_argument("--test", action="store_true", help="Render to /tmp/photos-shuffle-test.png instead of framebuffer") parser.add_argument("--debug", action="store_true", help="Enable verbose debug logs") - parser.add_argument( - "--check-config", - action="store_true", - help="Validate config (including credential-source precedence) and exit", - ) + parser.add_argument("--check-config", action="store_true", help="Validate env configuration and exit") + parser.add_argument("--auth-only", action="store_true", help="Run OAuth/token setup only and exit") parser.add_argument("--smoke-list-fetch", action="store_true", help="Run paginated list fetch smoke check and exit") parser.add_argument( "--force-token-path-reuse", @@ -812,6 +837,11 @@ def main() -> int: print("[photos-shuffle.py] Configuration check passed.") return 0 + if args.auth_only: + authenticate(config, log) + print("[photos-shuffle.py] Authentication check passed.") + return 0 + source_image: Path | None = None try: source_image = choose_online_image(config, log, force_token_path_reuse=args.force_token_path_reuse) diff --git a/scripts/piholestats_v1.1.py b/scripts/piholestats_v1.1.py index 014ecf3..2f225a3 100644 --- a/scripts/piholestats_v1.1.py +++ b/scripts/piholestats_v1.1.py @@ -4,7 +4,7 @@ # Version 1.1 - Introducing dark mode # LEGACY: kept for compatibility/manual use; canonical night service uses piholestats_v1.2.py -import os, sys, time, json, urllib.request, urllib.parse, mmap, struct, argparse, ssl, errno +import os, sys, time, json, urllib.request, urllib.parse, urllib.error, mmap, struct, argparse, ssl, errno from pathlib import Path from datetime import datetime from PIL import Image, ImageDraw, ImageFont @@ -23,6 +23,7 @@ PIHOLE_CA_BUNDLE = "" PIHOLE_PASSWORD = "" PIHOLE_API_TOKEN = "" +PIHOLE_AUTH_MODE = "" TITLE = "Pi-hole" # Colours COL_BG = (0, 0, 0) @@ -84,6 +85,19 @@ def _parse_verify_tls(value: str) -> str: return normalized +def _host_requires_explicit_scheme(raw_host: str) -> bool: + candidate = raw_host.strip() + if not candidate: + return False + parsed = urllib.parse.urlsplit(candidate if "://" in candidate else f"//{candidate}") + if parsed.scheme: + return False + hostname = (parsed.hostname or candidate.split("/")[0].split(":")[0]).strip("[]").lower() + if not hostname: + return False + return hostname not in {"localhost", "::1"} and not hostname.startswith("127.") + + def validate_hour_bounds(hour: int) -> None: if hour < 0 or hour > 23: raise ValueError(f"hour must be in range 0-23, got {hour}") @@ -138,7 +152,7 @@ def record(name: str, *, default=None, required=False, validator=None): scheme = record("PIHOLE_SCHEME", default="", validator=_parse_scheme) verify_tls = record("PIHOLE_VERIFY_TLS", default="auto", validator=_parse_verify_tls) ca_bundle = record("PIHOLE_CA_BUNDLE", default="") - password = record("PIHOLE_PASSWORD", required=True) + password = record("PIHOLE_PASSWORD", default="") api_token = record("PIHOLE_API_TOKEN", default="") refresh_secs = record("REFRESH_SECS", default=REFRESH_SECS, validator=_parse_int) request_timeout = record("PIHOLE_TIMEOUT", default=REQUEST_TIMEOUT, validator=_parse_float) @@ -150,6 +164,16 @@ def record(name: str, *, default=None, required=False, validator=None): errors.append("PIHOLE_TIMEOUT is invalid: must be greater than 0") if ca_bundle and not Path(str(ca_bundle)).is_file(): errors.append("PIHOLE_CA_BUNDLE is invalid: file does not exist") + if _host_requires_explicit_scheme(str(host)) and not str(scheme): + errors.append( + "Remote PIHOLE_HOST requires an explicit scheme: set PIHOLE_SCHEME=http|https or include http:// / https:// in PIHOLE_HOST" + ) + + auth_mode = _detect_auth_mode(str(password), str(api_token)) + if auth_mode is None: + errors.append( + "Auth configuration is invalid: set PIHOLE_PASSWORD for v6 session auth or PIHOLE_API_TOKEN for legacy token auth" + ) if errors: return None, errors @@ -162,6 +186,7 @@ def record(name: str, *, default=None, required=False, validator=None): "pihole_ca_bundle": str(ca_bundle), "pihole_password": str(password), "pihole_api_token": str(api_token), + "pihole_auth_mode": auth_mode, "request_timeout": float(request_timeout), "refresh_secs": int(refresh_secs), "active_hours": active_hours if isinstance(active_hours, tuple) else ACTIVE_HOURS, @@ -178,7 +203,7 @@ def parse_args(): def apply_config(config: dict[str, object], output_image: str = "") -> None: global FBDEV, PIHOLE_HOST, PIHOLE_SCHEME, PIHOLE_VERIFY_TLS, PIHOLE_CA_BUNDLE - global PIHOLE_PASSWORD, PIHOLE_API_TOKEN, REFRESH_SECS, ACTIVE_HOURS, BASE_URL + global PIHOLE_PASSWORD, PIHOLE_API_TOKEN, PIHOLE_AUTH_MODE, REFRESH_SECS, ACTIVE_HOURS, BASE_URL global REQUEST_TIMEOUT, REQUEST_TLS_VERIFY, OUTPUT_IMAGE FBDEV = str(config["fbdev"]) PIHOLE_HOST = str(config["pihole_host"]) @@ -187,6 +212,7 @@ def apply_config(config: dict[str, object], output_image: str = "") -> None: PIHOLE_CA_BUNDLE = str(config["pihole_ca_bundle"]) PIHOLE_PASSWORD = str(config["pihole_password"]) PIHOLE_API_TOKEN = str(config["pihole_api_token"]) + PIHOLE_AUTH_MODE = str(config["pihole_auth_mode"]) REQUEST_TIMEOUT = float(config["request_timeout"]) REFRESH_SECS = int(config["refresh_secs"]) ACTIVE_HOURS = config["active_hours"] @@ -365,25 +391,54 @@ def _normalize_host(raw_host: str, preferred_scheme: str = "") -> str: BASE_URL = _normalize_host(PIHOLE_HOST, preferred_scheme=PIHOLE_SCHEME) + +def _detect_auth_mode(password: str, api_token: str) -> str | None: + if password: + return "v6-session" + if api_token: + return "legacy-token" + return None + + +def _auth_failure(msg: str) -> RuntimeError: + return RuntimeError(f"AUTH_FAILURE: {msg}") + + +def _transport_failure(msg: str) -> RuntimeError: + return RuntimeError(f"TRANSPORT_FAILURE: {msg}") + + def _auth_get_sid(): global _SID, _SID_EXP if not PIHOLE_PASSWORD: - raise RuntimeError("Missing PIHOLE_PASSWORD") - js = _http_json(f"{BASE_URL}/api/auth", method="POST", - body={"password": PIHOLE_PASSWORD}, timeout=4) + raise _auth_failure("PIHOLE_PASSWORD is not configured") + try: + js = _http_json(f"{BASE_URL}/api/auth", method="POST", + body={"password": PIHOLE_PASSWORD}, timeout=4) + except urllib.error.HTTPError as exc: + if exc.code in {401, 403}: + raise _auth_failure("v6 session login rejected (check PIHOLE_PASSWORD)") from exc + raise _transport_failure(f"v6 auth HTTP error {exc.code}") from exc + except urllib.error.URLError as exc: + raise _transport_failure(f"v6 auth transport error: {exc.reason}") from exc sess = js.get("session", {}) if not sess.get("valid", False): - raise RuntimeError("Auth failed") + raise _auth_failure("v6 session response invalid (check PIHOLE_PASSWORD)") _SID = sess["sid"] _SID_EXP = time.time() + int(sess.get("validity", 1800)) - 10 return _SID + def _ensure_sid(): if _SID and time.time() < _SID_EXP: return _SID return _auth_get_sid() + def fetch_pihole(): + if PIHOLE_AUTH_MODE == "legacy-token": + return _fetch_legacy_summary() + try: sid = _ensure_sid() url = f"{BASE_URL}/api/stats/summary?sid=" + urllib.parse.quote(sid, safe="") @@ -400,25 +455,130 @@ def fetch_pihole(): "total": total, "blocked": blocked, "percent": percent, - "ok": True + "ok": True, + "status": "OK", } - except Exception: - pass + except Exception as exc: + v6_error = exc + primary_summary = _exception_summary(v6_error, "V6") + if not PIHOLE_API_TOKEN: + return { + "total": 0, + "blocked": 0, + "percent": 0.0, + "ok": False, + "status": _status_from_exception(v6_error, "AUTH ONLY"), + "failure": _failure_from_exception(v6_error, source="v6"), + } + + legacy = _fetch_legacy_summary() + if legacy["ok"]: + legacy["status"] = "LEGACY" + return legacy + + fallback_summary = legacy.get("failure", {}).get("summary") or legacy.get("status", "LEGACY FAIL") + print( + f"[piholestats_v1.1.py] Summary fetch failed. primary={primary_summary}; fallback={fallback_summary}", + file=sys.stderr, + ) + return { + "total": 0, + "blocked": 0, + "percent": 0.0, + "ok": False, + "status": f"{_status_from_exception(v6_error, 'V6')} / {legacy.get('status', 'LEGACY FAIL')}", + "failure": { + "reason": _failure_reason_from_exception(v6_error), + "summary": f"primary={primary_summary}; fallback={fallback_summary}", + "source": "v6+legacy", + "primary": primary_summary, + "fallback": fallback_summary, + }, + } + +def _fetch_legacy_summary(): try: - params = {"summaryRaw": ""} - if PIHOLE_API_TOKEN: - params["auth"] = PIHOLE_API_TOKEN + params = {"summaryRaw": "", "auth": PIHOLE_API_TOKEN} query = urllib.parse.urlencode(params) legacy = _http_json(f"{BASE_URL}/admin/api.php?{query}", timeout=4) + if str(legacy.get("status", "")).lower() == "unauthorized": + raise _auth_failure("legacy token rejected (check PIHOLE_API_TOKEN)") return { "total": int(legacy.get("dns_queries_today", 0)), "blocked": int(legacy.get("ads_blocked_today", 0)), "percent": float(legacy.get("ads_percentage_today", 0.0)), "ok": True, + "status": "OK", } - except Exception: - return {"total":0,"blocked":0,"percent":0.0,"ok":False} + except urllib.error.HTTPError as exc: + if exc.code in {401, 403}: + failure_exc = _auth_failure("legacy token rejected (check PIHOLE_API_TOKEN)") + message = _status_from_exception(failure_exc, "LEGACY") + else: + failure_exc = _transport_failure(f"legacy HTTP error {exc.code}") + message = _status_from_exception(failure_exc, "LEGACY") + return { + "total":0, + "blocked":0, + "percent":0.0, + "ok":False, + "status": message, + "failure": _failure_from_exception(failure_exc, source="legacy"), + } + except urllib.error.URLError as exc: + failure_exc = _transport_failure(f"legacy transport error: {exc.reason}") + message = _status_from_exception(failure_exc, "LEGACY") + return { + "total":0, + "blocked":0, + "percent":0.0, + "ok":False, + "status": message, + "failure": _failure_from_exception(failure_exc, source="legacy"), + } + except Exception as exc: + return { + "total":0, + "blocked":0, + "percent":0.0, + "ok":False, + "status": _status_from_exception(exc, "LEGACY"), + "failure": _failure_from_exception(exc, source="legacy"), + } + + +def _status_from_exception(exc: Exception, label: str) -> str: + message = str(exc) + if message.startswith("AUTH_FAILURE:"): + return f"{label} AUTH FAIL" + if message.startswith("TRANSPORT_FAILURE:"): + return f"{label} NET FAIL" + return f"{label} ERROR" + + +def _failure_reason_from_exception(exc: Exception) -> str: + message = str(exc) + if message.startswith("AUTH_FAILURE:"): + return "auth_failed" + if message.startswith("TRANSPORT_FAILURE:"): + lowered = message.lower() + if "timed out" in lowered or "timeout" in lowered: + return "network_timeout" + return "network_error" + return "unknown_error" + + +def _exception_summary(exc: Exception, label: str) -> str: + return f"{label} {_status_from_exception(exc, '').strip()} ({exc})" + + +def _failure_from_exception(exc: Exception, source: str) -> dict[str, str]: + return { + "reason": _failure_reason_from_exception(exc), + "summary": _exception_summary(exc, source.upper()), + "source": source, + } # ---------- rendering ---------- def draw_degree_circle(d, x, y, r, colour): diff --git a/scripts/piholestats_v1.2.py b/scripts/piholestats_v1.2.py index 004634a..d0a227c 100644 --- a/scripts/piholestats_v1.2.py +++ b/scripts/piholestats_v1.2.py @@ -82,6 +82,19 @@ def _parse_verify_tls(value: str) -> str: return normalized +def _host_requires_explicit_scheme(raw_host: str) -> bool: + candidate = raw_host.strip() + if not candidate: + return False + parsed = urllib.parse.urlsplit(candidate if "://" in candidate else f"//{candidate}") + if parsed.scheme: + return False + hostname = (parsed.hostname or candidate.split("/")[0].split(":")[0]).strip("[]").lower() + if not hostname: + return False + return hostname not in {"localhost", "::1"} and not hostname.startswith("127.") + + def validate_hour_bounds(hour: int) -> None: if hour < 0 or hour > 23: raise ValueError(f"hour must be in range 0-23, got {hour}") @@ -148,6 +161,10 @@ def record(name: str, *, default=None, required=False, validator=None): errors.append("PIHOLE_TIMEOUT is invalid: must be greater than 0") if ca_bundle and not Path(str(ca_bundle)).is_file(): errors.append("PIHOLE_CA_BUNDLE is invalid: file does not exist") + if _host_requires_explicit_scheme(str(host)) and not str(scheme): + errors.append( + "Remote PIHOLE_HOST requires an explicit scheme: set PIHOLE_SCHEME=http|https or include http:// / https:// in PIHOLE_HOST" + ) auth_mode = _detect_auth_mode(str(password), str(api_token)) if auth_mode is None: