diff --git a/.env.example b/.env.example index 1467200..6d91915 100644 --- a/.env.example +++ b/.env.example @@ -54,9 +54,25 @@ REFRESH_SECS=3 # Cross-midnight windows are supported, e.g. 22,7 means 22:00 through 07:59. ACTIVE_HOURS=22,7 +# --- Local / Drive-backed photos (preferred for photos-shuffle.py) --- +# Preferred primary source for shuffled photos. +LOCAL_PHOTOS_DIR=~/zero2dash/photos +# Optional: Google Drive folder ID used by scripts/drive-sync.py to populate LOCAL_PHOTOS_DIR. +GOOGLE_DRIVE_FOLDER_ID= +# Optional: service account JSON used by scripts/drive-sync.py. +GOOGLE_DRIVE_SERVICE_ACCOUNT_JSON=~/zero2dash/drive-service-account.json +# Optional sync state path for scripts/drive-sync.py +GOOGLE_DRIVE_SYNC_STATE_PATH=~/zero2dash/cache/drive_sync_state.json +# Optional resize state path for scripts/photo-resize.py +PHOTO_RESIZE_STATE_PATH=~/zero2dash/cache/photo_resize_state.json +# Optional resize script path used by scripts/drive-sync.py after downloads +PHOTO_RESIZE_SCRIPT=~/zero2dash/scripts/photo-resize.py + # --- Google Photos (photos-shuffle.py) --- -# Required: Album ID to pull shuffled photos from. -GOOGLE_PHOTOS_ALBUM_ID=replace-with-google-photos-album-id +# Optional: app-created album ID for Google Photos fallback only. +# Normal personal/shared Google Photos albums are no longer suitable; use LOCAL_PHOTOS_DIR, +# optionally populated by Google Drive sync. +GOOGLE_PHOTOS_ALBUM_ID= # OAuth credentials are required by one of these methods: # Use a Desktop OAuth client. If the Google app is in testing, add your account as a consent-screen test user. # 1) Existing client secrets file (default path shown), OR @@ -81,3 +97,5 @@ LOGO_PATH=/images/goo-photos-icon.png # OAUTH_OPEN_BROWSER default: 0 (false) # Loopback OAuth only: complete sign-in on the same machine, or use SSH port forwarding for headless Pi setup. OAUTH_OPEN_BROWSER=0 + + diff --git a/.gitignore b/.gitignore index 6926248..29862e5 100644 --- a/.gitignore +++ b/.gitignore @@ -2,3 +2,8 @@ __pycache__/ *.pyc .env .env.local +photos/* +!photos/.gitkeep +cache/drive_sync_state.json +cache/photo_resize_state.json + diff --git a/README.md b/README.md index f157303..7fd357d 100644 --- a/README.md +++ b/README.md @@ -109,6 +109,13 @@ Google OAuth notes: - If the Google consent screen is in testing, add your account as a test user. - `calendash-api.py` defaults `GOOGLE_TOKEN_PATH` to `token.json` relative to `/opt/zero2dash` under systemd; `photos-shuffle.py` must keep using a separate `GOOGLE_TOKEN_PATH_PHOTOS`. +Drive-backed photos notes: + +- `scripts/photos-shuffle.py` now treats `LOCAL_PHOTOS_DIR` as the primary source. +- Use `scripts/drive-sync.py` to populate that directory from a shared Google Drive folder. +- `scripts/photo-resize.py` proportionally reduces changed images to 50% before they are reused locally. +- Normal personal/shared Google Photos albums are no longer a reliable headless source; if you still configure `GOOGLE_PHOTOS_ALBUM_ID`, treat it as an app-created-album fallback only. + ## Run via systemd Install and enable canonical units: @@ -138,12 +145,33 @@ journalctl -u pihole-display-dark.service -n 50 --no-pager Use `python3 scripts/photos-shuffle.py --check-config` to validate the configuration and print the credential source that will be used. +## Drive-backed photo sync + +Use a shared Google Drive folder when you want remote photo management without depending on the now-hobbled Google Photos album API. + +Required configuration: + +- `LOCAL_PHOTOS_DIR` +- `GOOGLE_DRIVE_FOLDER_ID` +- `GOOGLE_DRIVE_SERVICE_ACCOUNT_JSON` + +Recommended workflow: + +```sh +python3 scripts/drive-sync.py +python3 scripts/photos-shuffle.py --test +``` + +`drive-sync.py` downloads images from the shared Drive folder into `LOCAL_PHOTOS_DIR` and then runs `photo-resize.py`, which shrinks new or changed images to 50% of their original width and height before reuse. + ## Notes -- `display_rotator.py` excludes `piholestats_v1.2.py` by default so day mode and night mode stay distinct. +- `display_rotator.py` excludes `piholestats_v1.2.py`, `calendash-api.py`, `_config.py`, `drive-sync.py`, and `photo-resize.py` by default so helper scripts do not end up in the day rotator. - Static image scripts (for example `tram-info.py`, `weather-dash.py`, `calendash-img.py`) are rotator-friendly page scripts, not systemd service units by themselves. ### Framebuffer overrides in systemd Both canonical service units now set `FB_DEVICE=/dev/fb1` by default and load `/opt/zero2dash/.env` afterward, so setting `FB_DEVICE` in `.env` overrides the unit default without editing unit files. + + diff --git a/display_rotator.py b/display_rotator.py index d59c1dd..099ab7d 100644 --- a/display_rotator.py +++ b/display_rotator.py @@ -30,7 +30,13 @@ DEFAULT_PAGES_DIR = "scripts" DEFAULT_PAGE_GLOB = "*.py" -DEFAULT_EXCLUDE_PATTERNS = ["piholestats_v1.2.py", "calendash-api.py"] +DEFAULT_EXCLUDE_PATTERNS = [ + "piholestats_v1.2.py", + "calendash-api.py", + "_config.py", + "drive-sync.py", + "photo-resize.py", +] DEFAULT_INCLUDE_PATTERNS: list[str] = [] DEFAULT_ROTATE_SECS = 30 SHUTDOWN_WAIT_SECS = 5 @@ -1012,3 +1018,4 @@ def request_stop(signum: int, _frame: object) -> None: if __name__ == "__main__": raise SystemExit(main()) + diff --git a/photos/.gitkeep b/photos/.gitkeep new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/photos/.gitkeep @@ -0,0 +1 @@ + diff --git a/pr-body.md b/pr-body.md new file mode 100644 index 0000000..4854fcb --- /dev/null +++ b/pr-body.md @@ -0,0 +1,10 @@ +## Summary +- require an explicit scheme for remote Pi-hole hosts and validate TLS/timeout-related configuration +- improve Pi-hole auth handling in `piholestats_v1.3.py`, including v6-session vs legacy-token mode detection and clearer auth/transport failure reporting +- tighten Google Calendar and Photos OAuth handling around loopback redirects, Desktop OAuth clients, and token scope parsing +- add a Photos `--auth-only` mode and improve token-path separation from the calendar flow +- update `.env.example` and `README.md` to document the revised Pi-hole and Google OAuth setup + +## Testing +- not run from this Codex environment + diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..ed596c5 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,10 @@ +# Runtime Python dependencies for zero2dash scripts. +# System tools such as git, gh, systemd, and framebuffer drivers are not pip packages. + +Pillow>=10.0 +python-dotenv>=1.0 +pytz>=2024.1 +google-auth>=2.0 +google-auth-oauthlib>=1.2 +google-auth-httplib2>=0.2 +google-api-python-client>=2.0 diff --git a/scripts/calendash-img.py b/scripts/calendash-img.py index 07c7178..8b4cee3 100755 --- a/scripts/calendash-img.py +++ b/scripts/calendash-img.py @@ -18,7 +18,7 @@ TOUCH_DEVICE_DEFAULT = os.environ.get("TOUCH_DEVICE", "/dev/input/event0") WIDTH_DEFAULT = int(os.environ.get("FB_WIDTH", "320")) HEIGHT_DEFAULT = int(os.environ.get("FB_HEIGHT", "240")) -DEFAULT_IMAGE = Path(__file__).resolve().parent.parent / "images" / "calendash" / "output.png" +DEFAULT_IMAGE = Path(__file__).resolve().parent.parent / "images" / "calendash.png" INPUT_EVENT_STRUCT = struct.Struct("llHHI") EV_KEY = 0x01 BTN_TOUCH = 0x14A @@ -41,7 +41,7 @@ def rgb888_to_rgb565(image: Image.Image) -> bytes: def load_frame(image_path: Path, width: int, height: int) -> Image.Image: if not image_path.exists(): - raise FileNotFoundError(f"Background image not found: {image_path}") + raise FileNotFoundError(f"Calendar image not found: {image_path}") return Image.open(image_path).convert("RGB").resize((width, height), RESAMPLING_LANCZOS) @@ -154,3 +154,4 @@ def main() -> int: if __name__ == "__main__": raise SystemExit(main()) + diff --git a/scripts/drive-sync.py b/scripts/drive-sync.py new file mode 100644 index 0000000..0706a73 --- /dev/null +++ b/scripts/drive-sync.py @@ -0,0 +1,259 @@ +#!/usr/bin/env python3 +"""Sync image files from a shared Google Drive folder into LOCAL_PHOTOS_DIR.""" + +from __future__ import annotations + +import argparse +import json +import mimetypes +import re +import subprocess +import sys +from dataclasses import dataclass +from pathlib import Path +from typing import Any +from urllib.error import HTTPError +from urllib.parse import urlencode +from urllib.request import Request as UrlRequest, urlopen + +from dotenv import load_dotenv +from google.auth.transport.requests import Request +from google.oauth2 import service_account + +from _config import get_env, report_validation_errors + +DEFAULT_ROOT = Path("~/zero2dash").expanduser() +DRIVE_SCOPES = ["https://www.googleapis.com/auth/drive.readonly"] +DEFAULT_STATE_PATH = DEFAULT_ROOT / "cache" / "drive_sync_state.json" +DEFAULT_RESIZE_SCRIPT = DEFAULT_ROOT / "scripts" / "photo-resize.py" +ALLOWED_SUFFIXES = {".jpg", ".jpeg", ".png", ".webp"} + + +@dataclass +class Config: + folder_id: str + service_account_json: Path + local_photos_dir: Path + state_path: Path + resize_script: Path + skip_resize: bool + + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser(description="Sync a shared Google Drive folder into LOCAL_PHOTOS_DIR.") + parser.add_argument("--check-config", action="store_true", help="Validate configuration and exit") + parser.add_argument("--skip-resize", action="store_true", help="Skip the follow-up resize step") + return parser.parse_args() + + +def validate_config(skip_resize: bool) -> tuple[Config | None, list[str]]: + errors: list[str] = [] + + def record(name: str, *, default: Any = None, required: bool = False) -> Any: + try: + return get_env(name, default=default, required=required) + except ValueError as exc: + errors.append(str(exc)) + return default + + folder_id = str(record("GOOGLE_DRIVE_FOLDER_ID", required=True)) + service_account_raw = str(record("GOOGLE_DRIVE_SERVICE_ACCOUNT_JSON", default=str(DEFAULT_ROOT / "drive-service-account.json"))) + local_photos_raw = str(record("LOCAL_PHOTOS_DIR", default=str(DEFAULT_ROOT / "photos"))) + state_raw = str(record("GOOGLE_DRIVE_SYNC_STATE_PATH", default=str(DEFAULT_STATE_PATH))) + resize_raw = str(record("PHOTO_RESIZE_SCRIPT", default=str(DEFAULT_RESIZE_SCRIPT))) + + service_account_json = Path(service_account_raw).expanduser() + if not service_account_json.is_file(): + errors.append(f"GOOGLE_DRIVE_SERVICE_ACCOUNT_JSON not found: {service_account_json}") + + config = Config( + folder_id=folder_id, + service_account_json=service_account_json, + local_photos_dir=Path(local_photos_raw).expanduser(), + state_path=Path(state_raw).expanduser(), + resize_script=Path(resize_raw).expanduser(), + skip_resize=skip_resize, + ) + + if errors: + return None, errors + return config, [] + + +def load_config(skip_resize: bool) -> Config: + load_dotenv(DEFAULT_ROOT / ".env") + config, errors = validate_config(skip_resize) + if errors: + report_validation_errors("drive-sync.py", errors) + raise ValueError("Invalid configuration") + assert config is not None + return config + + +def load_state(path: Path) -> dict[str, dict[str, str]]: + if not path.exists(): + return {} + try: + data = json.loads(path.read_text(encoding="utf-8")) + except Exception: + return {} + return data if isinstance(data, dict) else {} + + +def save_state(path: Path, state: dict[str, dict[str, str]]) -> None: + path.parent.mkdir(parents=True, exist_ok=True) + path.write_text(json.dumps(state, indent=2, sort_keys=True), encoding="utf-8") + + +def build_credentials(config: Config): + creds = service_account.Credentials.from_service_account_file( + str(config.service_account_json), + scopes=DRIVE_SCOPES, + ) + creds.refresh(Request()) + return creds + + +def drive_json(creds, endpoint: str, params: dict[str, str]) -> dict[str, Any]: + creds.refresh(Request()) + url = f"https://www.googleapis.com/drive/v3/{endpoint}?{urlencode(params)}" + req = UrlRequest(url, headers={"Authorization": f"Bearer {creds.token}"}) + try: + with urlopen(req, timeout=30) as response: # nosec B310 + payload = json.loads(response.read().decode("utf-8")) + except HTTPError as exc: + body = exc.read().decode("utf-8", "replace") + raise RuntimeError(f"Google Drive API request failed ({exc.code} {exc.reason}): {body}") from exc + if not isinstance(payload, dict): + raise ValueError("Google Drive API response must be a JSON object") + return payload + + +def list_folder_images(creds, folder_id: str) -> list[dict[str, str]]: + images: list[dict[str, str]] = [] + page_token = "" + while True: + params = { + "q": f"'{folder_id}' in parents and trashed = false and mimeType contains 'image/'", + "fields": "nextPageToken,files(id,name,mimeType,modifiedTime)", + "pageSize": "1000", + "supportsAllDrives": "true", + "includeItemsFromAllDrives": "true", + } + if page_token: + params["pageToken"] = page_token + payload = drive_json(creds, "files", params) + files = payload.get("files", []) + if not isinstance(files, list): + raise ValueError("Google Drive API response field 'files' must be a list") + for item in files: + if isinstance(item, dict): + images.append({ + "id": str(item.get("id") or ""), + "name": str(item.get("name") or "image"), + "mimeType": str(item.get("mimeType") or "image/jpeg"), + "modifiedTime": str(item.get("modifiedTime") or ""), + }) + page_token = str(payload.get("nextPageToken") or "") + if not page_token: + break + return [item for item in images if item["id"]] + + +def safe_stem(name: str) -> str: + stem = Path(name).stem or "image" + return re.sub(r"[^A-Za-z0-9._-]+", "-", stem).strip("-._") or "image" + + +def suffix_for_item(item: dict[str, str]) -> str: + suffix = Path(item["name"]).suffix.lower() + if suffix in ALLOWED_SUFFIXES: + return suffix + guessed = (mimetypes.guess_extension(item["mimeType"]) or ".jpg").lower() + return ".jpg" if guessed == ".jpe" else guessed + + +def local_path_for_item(local_dir: Path, item: dict[str, str]) -> Path: + return local_dir / f"drive-{item['id']}-{safe_stem(item['name'])}{suffix_for_item(item)}" + + +def download_file(creds, file_id: str, target: Path) -> None: + creds.refresh(Request()) + url = f"https://www.googleapis.com/drive/v3/files/{file_id}?alt=media&supportsAllDrives=true" + req = UrlRequest(url, headers={"Authorization": f"Bearer {creds.token}"}) + with urlopen(req, timeout=60) as response: # nosec B310 + data = response.read() + target.parent.mkdir(parents=True, exist_ok=True) + target.write_bytes(data) + + +def run_resize(config: Config) -> None: + if config.skip_resize: + print("[drive-sync.py] Resize step skipped (--skip-resize).") + return + if not config.resize_script.is_file(): + print(f"[drive-sync.py] Resize script not found: {config.resize_script}; skipping resize step.") + return + result = subprocess.run([sys.executable, str(config.resize_script)], check=False) + if result.returncode != 0: + raise RuntimeError(f"Resize step failed with exit code {result.returncode}") + + +def sync_drive(config: Config) -> None: + creds = build_credentials(config) + items = list_folder_images(creds, config.folder_id) + config.local_photos_dir.mkdir(parents=True, exist_ok=True) + + state = load_state(config.state_path) + next_state: dict[str, dict[str, str]] = {} + downloaded = 0 + skipped = 0 + removed = 0 + + for item in items: + target = local_path_for_item(config.local_photos_dir, item) + previous = state.get(item["id"], {}) + previous_name = previous.get("local_name", "") + previous_target = config.local_photos_dir / previous_name if previous_name else None + if previous_target and previous_target != target and previous_target.exists() and previous_target.name.startswith("drive-"): + previous_target.unlink() + if previous.get("modifiedTime") == item["modifiedTime"] and target.exists(): + skipped += 1 + else: + download_file(creds, item["id"], target) + downloaded += 1 + next_state[item["id"]] = { + "modifiedTime": item["modifiedTime"], + "local_name": target.name, + } + + stale_ids = set(state) - set(next_state) + for stale_id in stale_ids: + stale_name = state.get(stale_id, {}).get("local_name", "") + stale_path = config.local_photos_dir / stale_name if stale_name else None + if stale_path and stale_path.exists() and stale_path.name.startswith("drive-"): + stale_path.unlink() + removed += 1 + + save_state(config.state_path, next_state) + print(f"[drive-sync.py] Synced {len(items)} images: downloaded={downloaded}, skipped={skipped}, removed={removed}.") + run_resize(config) + + +def main() -> int: + args = parse_args() + try: + config = load_config(skip_resize=args.skip_resize) + except ValueError: + return 1 + + if args.check_config: + print("[drive-sync.py] Configuration check passed.") + return 0 + + sync_drive(config) + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/scripts/photo-resize.py b/scripts/photo-resize.py new file mode 100644 index 0000000..de078d2 --- /dev/null +++ b/scripts/photo-resize.py @@ -0,0 +1,139 @@ +#!/usr/bin/env python3 +"""Resize changed images in LOCAL_PHOTOS_DIR to 50% of their current dimensions.""" + +from __future__ import annotations + +import argparse +import json +from dataclasses import dataclass +from pathlib import Path +from typing import Any + +from dotenv import load_dotenv +from PIL import Image, ImageOps + +from _config import get_env, report_validation_errors + +DEFAULT_ROOT = Path("~/zero2dash").expanduser() +DEFAULT_STATE_PATH = DEFAULT_ROOT / "cache" / "photo_resize_state.json" +ALLOWED_SUFFIXES = {".jpg", ".jpeg", ".png", ".webp"} +RESIZE_SCALE = 0.5 + + +@dataclass +class Config: + local_photos_dir: Path + state_path: Path + + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser(description="Resize changed photos in LOCAL_PHOTOS_DIR by 50%.") + parser.add_argument("--check-config", action="store_true", help="Validate configuration and exit") + return parser.parse_args() + + +def validate_config() -> tuple[Config | None, list[str]]: + errors: list[str] = [] + + def record(name: str, *, default: Any = None, required: bool = False) -> Any: + try: + return get_env(name, default=default, required=required) + except ValueError as exc: + errors.append(str(exc)) + return default + + local_photos_raw = str(record("LOCAL_PHOTOS_DIR", default=str(DEFAULT_ROOT / "photos"))) + state_raw = str(record("PHOTO_RESIZE_STATE_PATH", default=str(DEFAULT_STATE_PATH))) + + config = Config( + local_photos_dir=Path(local_photos_raw).expanduser(), + state_path=Path(state_raw).expanduser(), + ) + if errors: + return None, errors + return config, [] + + +def load_state(path: Path) -> dict[str, dict[str, int]]: + if not path.exists(): + return {} + try: + data = json.loads(path.read_text(encoding="utf-8")) + except Exception: + return {} + return data if isinstance(data, dict) else {} + + +def save_state(path: Path, state: dict[str, dict[str, int]]) -> None: + path.parent.mkdir(parents=True, exist_ok=True) + path.write_text(json.dumps(state, indent=2, sort_keys=True), encoding="utf-8") + + +def iter_images(local_photos_dir: Path) -> list[Path]: + if not local_photos_dir.exists(): + return [] + return sorted( + path + for path in local_photos_dir.rglob("*") + if path.is_file() and path.suffix.lower() in ALLOWED_SUFFIXES + ) + + +def resize_in_place(path: Path) -> None: + with Image.open(path) as raw: + image = ImageOps.exif_transpose(raw) + if image.mode not in {"RGB", "RGBA"}: + image = image.convert("RGB") + width = max(1, int(round(image.width * RESIZE_SCALE))) + height = max(1, int(round(image.height * RESIZE_SCALE))) + resized = image.resize((width, height), Image.Resampling.LANCZOS) + save_kwargs: dict[str, Any] = {} + suffix = path.suffix.lower() + if suffix in {".jpg", ".jpeg"}: + if resized.mode != "RGB": + resized = resized.convert("RGB") + save_kwargs = {"quality": 90, "optimize": True} + elif suffix == ".png": + save_kwargs = {"optimize": True} + resized.save(path, **save_kwargs) + + +def main() -> int: + args = parse_args() + load_dotenv(DEFAULT_ROOT / ".env") + config, errors = validate_config() + if errors: + report_validation_errors("photo-resize.py", errors) + return 1 + assert config is not None + + if args.check_config: + print("[photo-resize.py] Configuration check passed.") + return 0 + + config.local_photos_dir.mkdir(parents=True, exist_ok=True) + state = load_state(config.state_path) + next_state: dict[str, dict[str, int]] = {} + processed = 0 + skipped = 0 + + for image_path in iter_images(config.local_photos_dir): + stat = image_path.stat() + key = str(image_path.relative_to(config.local_photos_dir)) + current = {"size": stat.st_size, "mtime_ns": stat.st_mtime_ns} + if state.get(key) == current: + next_state[key] = current + skipped += 1 + continue + resize_in_place(image_path) + updated = image_path.stat() + next_state[key] = {"size": updated.st_size, "mtime_ns": updated.st_mtime_ns} + processed += 1 + + save_state(config.state_path, next_state) + print(f"[photo-resize.py] Processed={processed}, skipped={skipped}, total={processed + skipped}.") + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/scripts/photos-shuffle.py b/scripts/photos-shuffle.py index c04f547..89af9af 100755 --- a/scripts/photos-shuffle.py +++ b/scripts/photos-shuffle.py @@ -2,15 +2,14 @@ """ photos-shuffle.py -One-shot Google Photos album renderer for a 320x240 framebuffer page. +One-shot photo renderer for a 320x240 framebuffer page. Requirements (pip): Pillow, python-dotenv, google-auth, google-auth-oauthlib, google-api-python-client. Configuration (.env): -- Required: GOOGLE_PHOTOS_ALBUM_ID -- Optional: GOOGLE_PHOTOS_CLIENT_SECRETS_PATH, GOOGLE_TOKEN_PATH_PHOTOS, FB_DEVICE, WIDTH, - HEIGHT, CACHE_DIR, FALLBACK_IMAGE, LOGO_PATH +- Optional: LOCAL_PHOTOS_DIR, GOOGLE_PHOTOS_ALBUM_ID, GOOGLE_PHOTOS_CLIENT_SECRETS_PATH, + GOOGLE_TOKEN_PATH_PHOTOS, FB_DEVICE, WIDTH, HEIGHT, CACHE_DIR, FALLBACK_IMAGE, LOGO_PATH - Optional OAuth alternative: GOOGLE_PHOTOS_CLIENT_ID and GOOGLE_PHOTOS_CLIENT_SECRET (used when GOOGLE_PHOTOS_CLIENT_SECRETS_PATH file is not present). @@ -26,33 +25,27 @@ script, or use SSH port forwarding to forward the callback port from the Pi. - Use a Desktop OAuth client. If the Google app is in testing, add your account as a test user before first run. +- Since 31 March 2025, Google Photos Library API read access is limited to app-created + albums and media items. Personal/shared albums should use LOCAL_PHOTOS_DIR, + optionally populated by Google Drive sync. Fallback: +- Preferred source is LOCAL_PHOTOS_DIR (default: ~/zero2dash/photos). If it is empty, the + script can still try Google Photos when configured, then CACHE_DIR, then FALLBACK_IMAGE. - Ensure local fallback image exists at ~/zero2dash/images/photos-fallback.png - (or override with FALLBACK_IMAGE). If online/offline fetch fails, fallback is - rendered through the same processing pipeline. - -Credential precedence: -1) GOOGLE_PHOTOS_CLIENT_SECRETS_PATH file (from env/.env; defaults to - ~/zero2dash/client_secret.json). -2) GOOGLE_PHOTOS_CLIENT_ID + GOOGLE_PHOTOS_CLIENT_SECRET env/.env values. -3) GOOGLE_CLIENT_ID + GOOGLE_CLIENT_SECRET env/.env values (legacy fallback). + (or override with FALLBACK_IMAGE). """ from __future__ import annotations import argparse -import hashlib import json import os import random -import socket -import tempfile -import time from dataclasses import dataclass from pathlib import Path from typing import Any -from urllib.parse import parse_qsl, urlencode, urlsplit, urlunsplit +from urllib.error import HTTPError from _config import get_env, report_validation_errors @@ -60,26 +53,19 @@ from google.auth.transport.requests import Request from google.oauth2.credentials import Credentials from google_auth_oauthlib.flow import InstalledAppFlow -from googleapiclient.discovery import build from PIL import Image, ImageEnhance -SCOPES = ["https://www.googleapis.com/auth/photoslibrary.readonly"] +SCOPES = ["https://www.googleapis.com/auth/photoslibrary.readonly.appcreateddata"] DEFAULT_ROOT = Path("~/zero2dash").expanduser() TEST_OUTPUT = Path("/tmp/photos-shuffle-test.png") LOGO_WIDTH_RATIO = 0.14 LOGO_PADDING_RATIO = 0.03 BRIGHTNESS_FACTOR = 0.75 -TOKEN_METADATA_SUFFIX = ".meta.json" - -CREDENTIAL_SOURCES_CHECKLIST = [ - "GOOGLE_PHOTOS_CLIENT_SECRETS_PATH file (env/.env; default: ~/zero2dash/client_secret.json)", - "GOOGLE_PHOTOS_CLIENT_ID + GOOGLE_PHOTOS_CLIENT_SECRET (env/.env)", - "GOOGLE_CLIENT_ID + GOOGLE_CLIENT_SECRET (legacy env/.env fallback)", -] @dataclass class Config: + local_photos_dir: Path album_id: str client_secrets_path: Path client_id: str @@ -91,87 +77,8 @@ class Config: cache_dir: Path fallback_image: Path logo_path: Path - logo_attempted_paths: list[Path] oauth_port: int oauth_open_browser: bool - max_online_attempts: int - - -def _scope_fingerprint(scopes: list[str]) -> str: - canonical = "\n".join(sorted(set(scopes))) - return hashlib.sha256(canonical.encode("utf-8")).hexdigest() - - -def _token_metadata_path(token_path: Path) -> Path: - return token_path.with_name(f"{token_path.name}{TOKEN_METADATA_SUFFIX}") - - -def _write_token_metadata(token_path: Path, provider: str, scopes: list[str]) -> None: - metadata_path = _token_metadata_path(token_path) - payload = { - "provider": provider, - "scopes": sorted(set(scopes)), - "scope_fingerprint": _scope_fingerprint(scopes), - } - metadata_path.parent.mkdir(parents=True, exist_ok=True) - metadata_path.write_text(json.dumps(payload, indent=2) + "\n", encoding="utf-8") - os.chmod(metadata_path, 0o600) - - -def _calendar_token_path() -> Path: - return Path(os.getenv("GOOGLE_TOKEN_PATH", "token.json")).expanduser().resolve() - - -def _preflight_token_path_guard(token_path: Path, force_token_path_reuse: bool) -> None: - calendar_token_path = _calendar_token_path() - if token_path.resolve() != calendar_token_path: - return - if force_token_path_reuse: - print( - "[warning] GOOGLE_TOKEN_PATH_PHOTOS matches GOOGLE_TOKEN_PATH " - f"({token_path.resolve()}), but proceeding due to --force-token-path-reuse." - ) - return - raise ValueError( - "Refusing to reuse a single token path for photos and calendar scripts. " - "Set GOOGLE_TOKEN_PATH_PHOTOS and GOOGLE_TOKEN_PATH to different files, " - "or pass --force-token-path-reuse to override intentionally." - ) - - -def _verify_token_metadata(token_path: Path, force_token_path_reuse: bool) -> None: - metadata_path = _token_metadata_path(token_path) - if not metadata_path.exists(): - return - - try: - metadata_payload = json.loads(metadata_path.read_text(encoding="utf-8")) - except Exception as exc: - print(f"[warning] Ignoring unreadable token metadata at {metadata_path.resolve()} ({exc}).") - return - - expected_provider = "google_photos" - expected_fingerprint = _scope_fingerprint(SCOPES) - provider = str(metadata_payload.get("provider", "")).strip() - scope_fingerprint = str(metadata_payload.get("scope_fingerprint", "")).strip() - - if provider == expected_provider and scope_fingerprint == expected_fingerprint: - return - - mismatch_message = ( - f"Token metadata mismatch for {token_path.resolve()} (found provider={provider or 'unknown'}, " - f"scope_fingerprint={scope_fingerprint or 'unknown'}; expected provider={expected_provider})." - ) - remediation = ( - "Remediation: set GOOGLE_TOKEN_PATH_PHOTOS to a dedicated photos token file and " - "set GOOGLE_TOKEN_PATH to a separate calendar token file. " - "If shared token usage is intentional, rerun with --force-token-path-reuse." - ) - - if force_token_path_reuse: - print(f"[warning] {mismatch_message} {remediation}") - return - raise ValueError(f"{mismatch_message} {remediation}") def _normalize_scopes(raw_scopes: Any) -> set[str]: @@ -248,41 +155,7 @@ def _as_bool(value: str) -> bool: return value.strip().lower() in {"1", "true", "yes", "on"} -def format_credentials_checklist() -> str: - return "Credential sources checked (in order): " + " ; ".join( - f"{index}) {source}" for index, source in enumerate(CREDENTIAL_SOURCES_CHECKLIST, start=1) - ) - - -def resolve_repo_or_script_relative_path(path_raw: str) -> tuple[Path, list[Path]]: - expanded = Path(path_raw).expanduser() - if expanded.is_absolute(): - return expanded, [expanded] - - repo_candidate = (REPO_ROOT / expanded).resolve() - script_candidate = (SCRIPT_DIR / expanded).resolve() - attempted = [repo_candidate] - if script_candidate != repo_candidate: - attempted.append(script_candidate) - - for candidate in attempted: - if candidate.exists(): - return candidate, attempted - return repo_candidate, attempted - - -def selected_credential_source(config: Config) -> str: - if config.client_secrets_path.exists(): - return f"GOOGLE_PHOTOS_CLIENT_SECRETS_PATH file ({config.client_secrets_path})" - if config.client_id and config.client_secret: - photos_scoped = bool(os.getenv("GOOGLE_PHOTOS_CLIENT_ID") and os.getenv("GOOGLE_PHOTOS_CLIENT_SECRET")) - if photos_scoped: - return "GOOGLE_PHOTOS_CLIENT_ID + GOOGLE_PHOTOS_CLIENT_SECRET" - return "GOOGLE_CLIENT_ID + GOOGLE_CLIENT_SECRET (legacy fallback)" - return "none" - - -def validate_config(*, force_token_path_reuse: bool = False) -> tuple[Config | None, list[str]]: +def validate_config() -> tuple[Config | None, list[str]]: errors: list[str] = [] def record(name: str, *, default: Any = None, required: bool = False, validator: Any = None) -> Any: @@ -292,7 +165,8 @@ def record(name: str, *, default: Any = None, required: bool = False, validator: errors.append(str(exc)) return default - album_id = record("GOOGLE_PHOTOS_ALBUM_ID", required=True) + local_photos_raw = record("LOCAL_PHOTOS_DIR", default=str(DEFAULT_ROOT / "photos")) + album_id = record("GOOGLE_PHOTOS_ALBUM_ID", default="") client_secrets_raw = record("GOOGLE_PHOTOS_CLIENT_SECRETS_PATH", default=str(DEFAULT_ROOT / "client_secret.json")) client_id = record("GOOGLE_PHOTOS_CLIENT_ID") or record("GOOGLE_CLIENT_ID") or "" @@ -304,16 +178,9 @@ def record(name: str, *, default: Any = None, required: bool = False, validator: height = record("HEIGHT", default=240, validator=lambda v: _as_int("HEIGHT", v)) cache_raw = record("CACHE_DIR", default=str(DEFAULT_ROOT / "cache" / "google_photos")) fallback_raw = record("FALLBACK_IMAGE", default=str(DEFAULT_ROOT / "images" / "photos-fallback.png")) - logo_override_raw = os.getenv("LOGO_PATH") - logo_raw = logo_override_raw if logo_override_raw is not None else str(DEFAULT_LOGO_RELATIVE_PATH) - logo_path, logo_attempted_paths = resolve_repo_or_script_relative_path(str(logo_raw)) + logo_raw = record("LOGO_PATH", default="/images/goo-photos-icon.png") oauth_port = record("OAUTH_PORT", default=8080, validator=lambda v: _as_int("OAUTH_PORT", v)) oauth_open_browser = record("OAUTH_OPEN_BROWSER", default=False, validator=_as_bool) - max_online_attempts = record( - "GOOGLE_PHOTOS_MAX_ATTEMPTS", - default=0, - validator=lambda v: _as_int("GOOGLE_PHOTOS_MAX_ATTEMPTS", v), - ) if isinstance(width, int) and width <= 0: errors.append("WIDTH is invalid: must be greater than 0") @@ -321,14 +188,13 @@ def record(name: str, *, default: Any = None, required: bool = False, validator: errors.append("HEIGHT is invalid: must be greater than 0") if isinstance(oauth_port, int) and oauth_port <= 0: errors.append("OAUTH_PORT is invalid: must be greater than 0") - if isinstance(max_online_attempts, int) and max_online_attempts < 0: - errors.append("GOOGLE_PHOTOS_MAX_ATTEMPTS is invalid: must be >= 0") fallback_image = Path(str(fallback_raw)).expanduser() if not fallback_image.exists(): errors.append(f"FALLBACK_IMAGE not found: {fallback_image}") config = Config( + local_photos_dir=Path(str(local_photos_raw)).expanduser(), album_id=str(album_id), client_secrets_path=Path(str(client_secrets_raw)).expanduser(), client_id=str(client_id), @@ -339,27 +205,23 @@ def record(name: str, *, default: Any = None, required: bool = False, validator: height=int(height), cache_dir=Path(str(cache_raw)).expanduser(), fallback_image=fallback_image, - logo_path=logo_path, - logo_attempted_paths=logo_attempted_paths, + logo_path=Path(str(logo_raw)).expanduser(), oauth_port=int(oauth_port), oauth_open_browser=bool(oauth_open_browser), - max_online_attempts=int(max_online_attempts), ) - calendar_token_path = _calendar_token_path() - if config.token_path.resolve() == calendar_token_path and not force_token_path_reuse: + calendar_default_token = (DEFAULT_ROOT / "token.json").resolve() + if config.album_id and config.token_path.resolve() == calendar_default_token: errors.append( - "GOOGLE_TOKEN_PATH_PHOTOS points to the same file as GOOGLE_TOKEN_PATH, which is unsafe for " - "cross-script token reuse. Set GOOGLE_TOKEN_PATH_PHOTOS and GOOGLE_TOKEN_PATH to different files, " - "or use --force-token-path-reuse to override intentionally." + "GOOGLE_TOKEN_PATH_PHOTOS points to token.json, which is reserved for calendash-api.py. " + "Use a separate photos token path (default: ~/zero2dash/token_photos.json)." ) - if not config.client_secrets_path.exists() and not (config.client_id and config.client_secret): + if config.album_id and not config.client_secrets_path.exists() and not (config.client_id and config.client_secret): errors.append( f"Google Photos OAuth credentials are required: set GOOGLE_PHOTOS_CLIENT_SECRETS_PATH to an existing file " f"or provide GOOGLE_PHOTOS_CLIENT_ID + GOOGLE_PHOTOS_CLIENT_SECRET (checked path: {config.client_secrets_path})." ) - errors.append(format_credentials_checklist()) if errors: return None, errors @@ -376,11 +238,11 @@ def load_config() -> Config: return config -def authenticate(config: Config, log: Log, *, force_token_path_reuse: bool = False) -> Credentials: +def authenticate(config: Config, log: Log) -> Credentials: creds: Credentials | None = None calendar_default_token = (DEFAULT_ROOT / "token.json").resolve() - if config.token_path.resolve() == calendar_default_token: + if config.album_id and config.token_path.resolve() == calendar_default_token: raise ValueError( "GOOGLE_TOKEN_PATH_PHOTOS points to token.json, which is reserved for calendash-api.py. " "Use a separate photos token path (default: ~/zero2dash/token_photos.json)." @@ -395,7 +257,7 @@ def authenticate(config: Config, log: Log, *, force_token_path_reuse: bool = Fal if payload and not _is_token_compatible_with_photos(payload): log.info( - f"Token at {config.token_path} does not include Google Photos scope; re-authenticating" + f"Token at {config.token_path} does not include required Google Photos scope {SCOPES[0]}; re-authenticating" ) else: try: @@ -411,8 +273,6 @@ def authenticate(config: Config, log: Log, *, force_token_path_reuse: bool = Fal log.debug("Refreshing existing Google OAuth token") try: creds.refresh(Request()) - _write_token_atomically(config.token_path, creds.to_json()) - log.info(f"Google OAuth token refreshed and saved to {config.token_path}") except Exception as exc: log.info(f"Token refresh failed ({exc}); starting OAuth flow") creds = None @@ -453,28 +313,93 @@ def authenticate(config: Config, log: Log, *, force_token_path_reuse: bool = Fal log.info(message) raise RuntimeError("Loopback OAuth setup failed") from exc - _write_token_atomically(config.token_path, creds.to_json()) - _write_token_metadata(config.token_path, provider="google_photos", scopes=SCOPES) + config.token_path.parent.mkdir(parents=True, exist_ok=True) + config.token_path.write_text(creds.to_json(), encoding="utf-8") log.debug(f"Saved OAuth token to {config.token_path}") return creds -def _write_token_atomically(path: Path, content: str) -> None: - path.parent.mkdir(parents=True, exist_ok=True) - with tempfile.NamedTemporaryFile("w", encoding="utf-8", dir=path.parent, delete=False) as tmp_file: - tmp_file.write(content) - tmp_file.flush() - os.fsync(tmp_file.fileno()) - temp_path = Path(tmp_file.name) - temp_path.replace(path) +def _photos_api_json(creds: Credentials, endpoint: str, body: dict[str, Any]) -> dict[str, Any]: + from urllib.request import Request as UrlRequest, urlopen + + url = f"https://photoslibrary.googleapis.com/v1/{endpoint}" + payload = json.dumps(body).encode("utf-8") + request = UrlRequest( + url, + data=payload, + headers={ + "Authorization": f"Bearer {creds.token}", + "Content-Type": "application/json", + }, + method="POST", + ) + try: + with urlopen(request, timeout=20) as response: # nosec B310 + data = json.loads(response.read().decode("utf-8")) + except HTTPError as exc: + detail = _describe_google_api_error(exc) + if detail: + raise RuntimeError(detail) from exc + raise + if not isinstance(data, dict): + raise ValueError("Google Photos API response must be a JSON object") + return data + + +def _describe_google_api_error(exc: HTTPError) -> str: + try: + raw_body = exc.read() + except Exception: + raw_body = b"" + + body_text = raw_body.decode("utf-8", "replace").strip() if raw_body else "" + try: + payload = json.loads(body_text) if body_text else {} + except json.JSONDecodeError: + payload = {} + + error = payload.get("error") if isinstance(payload, dict) else None + if not isinstance(error, dict): + return f"Google Photos API request failed ({exc.code} {exc.reason})" + + message = str(error.get("message") or f"Google Photos API request failed ({exc.code} {exc.reason})").strip() + lowered = message.lower() + if "insufficient authentication scopes" in lowered: + return ( + "Google Photos Library API removed photoslibrary.readonly on 31 March 2025. " + "This script now only works with app-created albums/media using " + "photoslibrary.readonly.appcreateddata. Re-authorise with the new scope, " + "or populate CACHE_DIR from another source for personal/shared albums." + ) + + details = error.get("details") + if not isinstance(details, list): + return message + + for detail in details: + if not isinstance(detail, dict): + continue + metadata = detail.get("metadata") + if detail.get("reason") == "SERVICE_DISABLED" and isinstance(metadata, dict): + activation_url = metadata.get("activationUrl") + consumer = metadata.get("consumer") + if activation_url and consumer: + return f"{message} Enable Photos Library API for {consumer}: {activation_url}" + if activation_url: + return f"{message} Enable Photos Library API here: {activation_url}" + + return message def list_album_images(creds: Credentials, album_id: str, log: Log) -> list[dict[str, Any]]: - service = build("photoslibrary", "v1", credentials=creds, cache_discovery=False) - return _list_album_images_from_service(service, album_id, log) + return _list_album_images(lambda body: _photos_api_json(creds, "mediaItems:search", body), album_id, log) def _list_album_images_from_service(service: Any, album_id: str, log: Log) -> list[dict[str, Any]]: + return _list_album_images(lambda body: service.mediaItems().search(body=body).execute(), album_id, log) + + +def _list_album_images(fetch_page: Any, album_id: str, log: Log) -> list[dict[str, Any]]: if not album_id: raise ValueError("album_id is required") @@ -488,7 +413,7 @@ def _list_album_images_from_service(service: Any, album_id: str, log: Log) -> li if page_token: body["pageToken"] = page_token - response = service.mediaItems().search(body=body).execute() + response = fetch_page(body) if not isinstance(response, dict): raise ValueError("Google Photos API response must be a JSON object") @@ -514,7 +439,6 @@ def _list_album_images_from_service(service: Any, album_id: str, log: Log) -> li log.debug(f"Album returned {len(images)} image media items across {page_count} pages") return images - def smoke_check_list_fetch(log: Log) -> None: class _Request: def __init__(self, response: dict[str, Any]): @@ -588,58 +512,20 @@ def cache_path_for_item(cache_dir: Path, item: dict[str, Any]) -> Path: def download_to_cache(creds: Credentials, item: dict[str, Any], cache_path: Path, config: Config, log: Log) -> None: - from urllib.error import HTTPError, URLError base_url = item.get("baseUrl") if not base_url: raise ValueError("mediaItem missing baseUrl") sized_url = f"{base_url}=w{config.width * 2}-h{config.height * 2}" - split_url = urlsplit(sized_url) - query = dict(parse_qsl(split_url.query, keep_blank_values=True)) - if creds.token and "access_token" not in query: - query["access_token"] = creds.token - media_url = urlunsplit(split_url._replace(query=urlencode(query))) - from urllib.request import Request as UrlRequest, urlopen - max_attempts = 3 - for attempt in range(1, max_attempts + 1): - req = UrlRequest(media_url, headers={"Authorization": f"Bearer {creds.token}"}) - try: - with urlopen(req, timeout=20) as resp: # nosec B310 - status_code = getattr(resp, "status", 200) - if not 200 <= status_code < 300: - raise MediaDownloadPermanentError(f"HTTP {status_code} for media download") - data = resp.read() - cache_path.parent.mkdir(parents=True, exist_ok=True) - cache_path.write_bytes(data) - log.debug(f"Cached image: {cache_path}") - return - except HTTPError as exc: - status_code = exc.code - if 500 <= status_code < 600: - if attempt == max_attempts: - raise MediaDownloadTransientError( - f"Transient HTTP {status_code} after {max_attempts} attempts" - ) from exc - time.sleep(0.5 * attempt) - continue - raise MediaDownloadPermanentError(f"HTTP {status_code} for media download") from exc - except (TimeoutError, socket.timeout) as exc: - if attempt == max_attempts: - raise MediaDownloadTransientError(f"Timed out after {max_attempts} attempts") from exc - time.sleep(0.5 * attempt) - continue - except URLError as exc: - reason = getattr(exc, "reason", None) - if isinstance(reason, (TimeoutError, socket.timeout)): - if attempt == max_attempts: - raise MediaDownloadTransientError(f"Timed out after {max_attempts} attempts") from exc - time.sleep(0.5 * attempt) - continue - raise MediaDownloadPermanentError(f"URL error during media download: {exc}") from exc + req = UrlRequest(sized_url, headers={"Authorization": f"Bearer {creds.token}"}) + with urlopen(req, timeout=20) as resp: # nosec B310 + data = resp.read() - raise MediaDownloadTransientError(f"Media download failed after {max_attempts} attempts") + cache_path.parent.mkdir(parents=True, exist_ok=True) + cache_path.write_bytes(data) + log.debug(f"Cached image: {cache_path}") def list_cached_images(cache_dir: Path) -> list[Path]: @@ -709,69 +595,40 @@ def write_framebuffer(img: Image.Image, fb_device: str, width: int, height: int) fb.write(payload) -def choose_online_image(config: Config, log: Log, *, force_token_path_reuse: bool = False) -> Path: - creds = authenticate(config, log, force_token_path_reuse=force_token_path_reuse) +def choose_local_image(config: Config, log: Log) -> Path: + local_images = list_cached_images(config.local_photos_dir) + if not local_images: + raise RuntimeError(f"Local photos directory empty: {config.local_photos_dir}") + chosen = random.choice(local_images) + log.info(f"LOCAL: selected {chosen.name}") + return chosen + +def choose_online_image(config: Config, log: Log) -> Path: + if not config.album_id: + raise RuntimeError("GOOGLE_PHOTOS_ALBUM_ID not configured") + + creds = authenticate(config, log) items = list_album_images(creds, config.album_id, log) if not items: raise RuntimeError("No image media items found in album") random.shuffle(items) - attempt_limit = len(items) if config.max_online_attempts == 0 else min(len(items), config.max_online_attempts) - - failures_by_item: dict[str, str] = {} - outcomes: dict[str, int] = {"cache_hit": 0, "download_success": 0, "download_failure": 0} - attempted_count = 0 - for item in items: - if attempted_count >= attempt_limit: - break - media_id = item.get("id", "unknown") - if media_id in failures_by_item: - continue - cache_path = cache_path_for_item(config.cache_dir, item) - attempted_count += 1 - if cache_path.exists() and cache_path.stat().st_size > 0: - outcomes["cache_hit"] += 1 log.info(f"ONLINE: cache hit for {media_id}") - log.info( - "ONLINE summary: " - f"attempted={attempted_count}/{attempt_limit}, " - f"cache_hit={outcomes['cache_hit']}, " - f"download_success={outcomes['download_success']}, " - f"download_failure={outcomes['download_failure']}" - ) return cache_path log.info(f"ONLINE: cache miss for {media_id}; downloading") try: download_to_cache(creds, item, cache_path, config, log) - outcomes["download_success"] += 1 - log.info( - "ONLINE summary: " - f"attempted={attempted_count}/{attempt_limit}, " - f"cache_hit={outcomes['cache_hit']}, " - f"download_success={outcomes['download_success']}, " - f"download_failure={outcomes['download_failure']}" - ) return cache_path except Exception as exc: - outcomes["download_failure"] += 1 - failures_by_item[str(media_id)] = str(exc) log.debug(f"Download failed for {media_id}: {exc}") continue - failure_summary = ", ".join(f"{media_id}={reason}" for media_id, reason in failures_by_item.items()) or "none" - log.info( - "ONLINE summary: " - f"attempted={attempted_count}/{attempt_limit}, " - f"cache_hit={outcomes['cache_hit']}, " - f"download_success={outcomes['download_success']}, " - f"download_failure={outcomes['download_failure']}" - ) - raise RuntimeError(f"Unable to fetch any album image after {attempted_count} attempts; failures: {failure_summary}") + raise RuntimeError("Unable to fetch any album image") def choose_offline_image(config: Config, log: Log) -> Path: @@ -784,25 +641,12 @@ def choose_offline_image(config: Config, log: Log) -> Path: def parse_args() -> argparse.Namespace: - parser = argparse.ArgumentParser( - description="Render one random Google Photos album image to framebuffer.", - epilog=( - "Credential source precedence: " - "1) GOOGLE_PHOTOS_CLIENT_SECRETS_PATH file, " - "2) GOOGLE_PHOTOS_CLIENT_ID + GOOGLE_PHOTOS_CLIENT_SECRET, " - "3) GOOGLE_CLIENT_ID + GOOGLE_CLIENT_SECRET (legacy fallback)." - ), - ) + parser = argparse.ArgumentParser(description="Render one random Google Photos album image to framebuffer.") parser.add_argument("--test", action="store_true", help="Render to /tmp/photos-shuffle-test.png instead of framebuffer") parser.add_argument("--debug", action="store_true", help="Enable verbose debug logs") parser.add_argument("--check-config", action="store_true", help="Validate env configuration and exit") parser.add_argument("--auth-only", action="store_true", help="Run OAuth/token setup only and exit") parser.add_argument("--smoke-list-fetch", action="store_true", help="Run paginated list fetch smoke check and exit") - parser.add_argument( - "--force-token-path-reuse", - action="store_true", - help="Allow GOOGLE_TOKEN_PATH_PHOTOS to match GOOGLE_TOKEN_PATH and bypass token metadata mismatch checks.", - ) return parser.parse_args() @@ -819,39 +663,38 @@ def main() -> int: return 1 load_dotenv(DEFAULT_ROOT / ".env") - config, errors = validate_config(force_token_path_reuse=args.force_token_path_reuse) + config, errors = validate_config() if errors: report_validation_errors("photos-shuffle.py", errors) return 1 assert config is not None - try: - _preflight_token_path_guard(config.token_path, args.force_token_path_reuse) - _verify_token_metadata(config.token_path, args.force_token_path_reuse) - except ValueError as exc: - print(exc) - return 1 - if args.check_config: - print(f"[photos-shuffle.py] Credential source: {selected_credential_source(config)}") print("[photos-shuffle.py] Configuration check passed.") return 0 if args.auth_only: + if not config.album_id: + print("[photos-shuffle.py] No Google Photos album configured; auth check skipped.") + return 0 authenticate(config, log) print("[photos-shuffle.py] Authentication check passed.") return 0 source_image: Path | None = None try: - source_image = choose_online_image(config, log, force_token_path_reuse=args.force_token_path_reuse) - except Exception as exc: - log.info(f"Online unavailable ({exc}); trying offline cache") + source_image = choose_local_image(config, log) + except Exception as local_exc: + log.info(f"Local photos unavailable ({local_exc}); trying online source") try: - source_image = choose_offline_image(config, log) - except Exception as off_exc: - log.info(f"Offline cache unavailable ({off_exc}); using fallback image") - source_image = config.fallback_image + source_image = choose_online_image(config, log) + except Exception as exc: + log.info(f"Online unavailable ({exc}); trying offline cache") + try: + source_image = choose_offline_image(config, log) + except Exception as off_exc: + log.info(f"Offline cache unavailable ({off_exc}); using fallback image") + source_image = config.fallback_image try: frame = composite_frame(source_image, config.logo_path, config.width, config.height) @@ -886,3 +729,9 @@ def main() -> int: if __name__ == "__main__": raise SystemExit(main()) + + + + + + diff --git a/scripts/piholestats_v1.2.py b/scripts/piholestats_v1.2.py index d0a227c..a91276d 100644 --- a/scripts/piholestats_v1.2.py +++ b/scripts/piholestats_v1.2.py @@ -7,9 +7,12 @@ from pathlib import Path from datetime import datetime from PIL import Image, ImageDraw, ImageFont +from dotenv import load_dotenv from _config import get_env, report_validation_errors +DEFAULT_ROOT = Path('~/zero2dash').expanduser() + # -------- CONFIG -------- FBDEV = "/dev/fb1" W, H = 320, 240 @@ -646,6 +649,7 @@ def main(): print(f"[piholestats_v1.2.py] Self checks passed.") return 0 + load_dotenv(DEFAULT_ROOT / '.env') config, errors = validate_config() if errors: report_validation_errors("piholestats_v1.2.py", errors) @@ -700,3 +704,5 @@ def main(): raise SystemExit(main()) except KeyboardInterrupt: pass + + diff --git a/scripts/piholestats_v1.1.py b/scripts/piholestats_v1.3.py similarity index 87% rename from scripts/piholestats_v1.1.py rename to scripts/piholestats_v1.3.py index 2f225a3..217a01f 100644 --- a/scripts/piholestats_v1.1.py +++ b/scripts/piholestats_v1.3.py @@ -1,21 +1,23 @@ #!/usr/bin/env python3 # Pi-hole TFT Dashboard -> direct framebuffer RGB565 (no X, no SDL) # v6 auth handled elsewhere; this file only renders and calls API -# Version 1.1 - Introducing dark mode -# LEGACY: kept for compatibility/manual use; canonical night service uses piholestats_v1.2.py +# Version 1.3 - Always-on stats display +# Manual always-on variant; canonical night service uses piholestats_v1.2.py import os, sys, time, json, urllib.request, urllib.parse, urllib.error, mmap, struct, argparse, ssl, errno from pathlib import Path from datetime import datetime from PIL import Image, ImageDraw, ImageFont +from dotenv import load_dotenv from _config import get_env, report_validation_errors +DEFAULT_ROOT = Path('~/zero2dash').expanduser() + # -------- CONFIG -------- FBDEV = "/dev/fb1" W, H = 320, 240 REFRESH_SECS = 3 -ACTIVE_HOURS = (7, 22) PIHOLE_HOST = "127.0.0.1" PIHOLE_SCHEME = "" @@ -51,17 +53,6 @@ def _parse_int(value: str) -> int: raise ValueError(f"expected integer, got {value!r}") from exc -def _parse_active_hours(value: str) -> tuple[int, int]: - parts = [part.strip() for part in value.split(",")] - if len(parts) != 2: - raise ValueError("expected format start,end (e.g. 22,7)") - try: - start_hour, end_hour = int(parts[0]), int(parts[1]) - except ValueError as exc: - raise ValueError(f"expected integers in start,end, got {value!r}") from exc - for hour in (start_hour, end_hour): - validate_hour_bounds(hour) - return start_hour, end_hour def _parse_float(value: str) -> float: @@ -98,43 +89,10 @@ def _host_requires_explicit_scheme(raw_host: str) -> bool: return hostname not in {"localhost", "::1"} and not hostname.startswith("127.") -def validate_hour_bounds(hour: int) -> None: - if hour < 0 or hour > 23: - raise ValueError(f"hour must be in range 0-23, got {hour}") - - -def is_hour_active(now_hour: int, start: int, end: int) -> bool: - validate_hour_bounds(now_hour) - validate_hour_bounds(start) - validate_hour_bounds(end) - if start <= end: - return start <= now_hour <= end - return now_hour >= start or now_hour <= end - - -def run_self_checks() -> None: - # Non-wrapping range. - assert is_hour_active(8, 8, 17) - assert is_hour_active(17, 8, 17) - assert not is_hour_active(7, 8, 17) - # Cross-midnight range. - assert is_hour_active(23, 22, 7) - assert is_hour_active(3, 22, 7) - assert not is_hour_active(12, 22, 7) - # Single-hour range. - assert is_hour_active(0, 0, 0) - assert not is_hour_active(1, 0, 0) - # Late-night to early-morning range. - assert is_hour_active(23, 23, 2) - assert is_hour_active(1, 23, 2) - assert not is_hour_active(10, 23, 2) - - for invalid_hour in (-1, 24): - try: - is_hour_active(invalid_hour, 8, 17) - raise AssertionError("expected ValueError for out-of-range hour") - except ValueError: - pass + + + + def validate_config() -> tuple[dict[str, object] | None, list[str]]: @@ -156,7 +114,6 @@ def record(name: str, *, default=None, required=False, validator=None): api_token = record("PIHOLE_API_TOKEN", default="") refresh_secs = record("REFRESH_SECS", default=REFRESH_SECS, validator=_parse_int) request_timeout = record("PIHOLE_TIMEOUT", default=REQUEST_TIMEOUT, validator=_parse_float) - active_hours = record("ACTIVE_HOURS", default=f"{ACTIVE_HOURS[0]},{ACTIVE_HOURS[1]}", validator=_parse_active_hours) if isinstance(refresh_secs, int) and refresh_secs < 1: errors.append("REFRESH_SECS is invalid: must be greater than or equal to 1") @@ -189,21 +146,19 @@ def record(name: str, *, default=None, required=False, validator=None): "pihole_auth_mode": auth_mode, "request_timeout": float(request_timeout), "refresh_secs": int(refresh_secs), - "active_hours": active_hours if isinstance(active_hours, tuple) else ACTIVE_HOURS, }, [] def parse_args(): parser = argparse.ArgumentParser(description="Pi-hole framebuffer dashboard") parser.add_argument("--check-config", action="store_true", help="Validate env configuration and exit") - parser.add_argument("--self-test", action="store_true", help="Run active-hours self checks and exit") parser.add_argument("--output-image", default="", help="Write rendered frame to a PNG file instead of framebuffer") return parser.parse_args() def apply_config(config: dict[str, object], output_image: str = "") -> None: global FBDEV, PIHOLE_HOST, PIHOLE_SCHEME, PIHOLE_VERIFY_TLS, PIHOLE_CA_BUNDLE - global PIHOLE_PASSWORD, PIHOLE_API_TOKEN, PIHOLE_AUTH_MODE, REFRESH_SECS, ACTIVE_HOURS, BASE_URL + global PIHOLE_PASSWORD, PIHOLE_API_TOKEN, PIHOLE_AUTH_MODE, REFRESH_SECS, BASE_URL global REQUEST_TIMEOUT, REQUEST_TLS_VERIFY, OUTPUT_IMAGE FBDEV = str(config["fbdev"]) PIHOLE_HOST = str(config["pihole_host"]) @@ -215,7 +170,6 @@ def apply_config(config: dict[str, object], output_image: str = "") -> None: PIHOLE_AUTH_MODE = str(config["pihole_auth_mode"]) REQUEST_TIMEOUT = float(config["request_timeout"]) REFRESH_SECS = int(config["refresh_secs"]) - ACTIVE_HOURS = config["active_hours"] OUTPUT_IMAGE = output_image.strip() or str(get_env("OUTPUT_IMAGE", default="")).strip() BASE_URL = _normalize_host(PIHOLE_HOST, preferred_scheme=PIHOLE_SCHEME) REQUEST_TLS_VERIFY = _resolve_tls_verify(BASE_URL, PIHOLE_VERIFY_TLS, PIHOLE_CA_BUNDLE) @@ -478,7 +432,7 @@ def fetch_pihole(): fallback_summary = legacy.get("failure", {}).get("summary") or legacy.get("status", "LEGACY FAIL") print( - f"[piholestats_v1.1.py] Summary fetch failed. primary={primary_summary}; fallback={fallback_summary}", + f"[piholestats_v1.3.py] Summary fetch failed. primary={primary_summary}; fallback={fallback_summary}", file=sys.stderr, ) return { @@ -601,7 +555,7 @@ def draw_temp_value(d, rect, temp_c, font, colour): d.text((x, y), num, font=font, fill=colour) -def draw_frame(stats, temp_c, uptime, active): +def draw_frame(stats, temp_c, uptime): img = Image.new("RGB", (W, H), COL_BG) d = ImageDraw.Draw(img) big = load_font(28, True) @@ -626,12 +580,6 @@ def tile(x, y, color, title, value, val_font, value_is_temp=False): tw, th = text_size(d, value, val_font) d.text((r[0]+(r[2]-tw)//2, r[1]+(r[3]-th)//2), value, font=val_font, fill=COL_TXT) - if not active: - d.rounded_rectangle([margin, margin, W-margin, H-margin], radius=12, fill=(20,20,20)) - msg = f"{TITLE}: Sleeping" - tw, th = text_size(d, msg, mid) - d.text(((W-tw)//2,(H-th)//2), msg, font=mid, fill=(180,180,180)) - return img total = stats["total"] blocked = stats["blocked"] @@ -658,19 +606,16 @@ def tile(x, y, color, title, value, val_font, value_is_temp=False): def main(): args = parse_args() - if args.self_test: - run_self_checks() - print(f"[piholestats_v1.1.py] Self checks passed.") - return 0 + load_dotenv(DEFAULT_ROOT / '.env') config, errors = validate_config() if errors: - report_validation_errors("piholestats_v1.1.py", errors) + report_validation_errors("piholestats_v1.3.py", errors) return 1 assert config is not None if args.check_config: - print("[piholestats_v1.1.py] Configuration check passed.") + print("[piholestats_v1.3.py] Configuration check passed.") return 0 apply_config(config, output_image=args.output_image) @@ -686,8 +631,6 @@ def main(): pass while True: - hr = time.localtime().tm_hour - active = is_hour_active(hr, ACTIVE_HOURS[0], ACTIVE_HOURS[1]) s = fetch_pihole() if s["ok"]: @@ -696,10 +639,10 @@ def main(): temp_c = read_temp_c() uptime = read_uptime_str() - frame = draw_frame(cached, temp_c, uptime, active) + frame = draw_frame(cached, temp_c, uptime) fb_write(frame) if OUTPUT_IMAGE: - print(f"[piholestats_v1.1.py] Rendered test frame to {OUTPUT_IMAGE}.") + print(f"[piholestats_v1.3.py] Rendered test frame to {OUTPUT_IMAGE}.") return 0 time.sleep(REFRESH_SECS) @@ -710,3 +653,7 @@ def main(): raise SystemExit(main()) except KeyboardInterrupt: pass + + + +