diff --git a/requirements.txt b/requirements.txt index 9b8f0b0..cf3055e 100644 --- a/requirements.txt +++ b/requirements.txt @@ -6,4 +6,7 @@ pillow>=11.2.1 requests>=2.32 pychromecast~=14.0.7 httpx~=0.28.1 -sanitize-filename~=1.2.0 \ No newline at end of file +sanitize-filename~=1.2.0 +adb_shell==0.4.4 +async_timeout~=5.0.1 +simple-justwatch-python-api>=0.16 \ No newline at end of file diff --git a/src/adb_tv.py b/src/adb_tv.py new file mode 100644 index 0000000..88e038c --- /dev/null +++ b/src/adb_tv.py @@ -0,0 +1,109 @@ +""" +This module provides utilities for interacting with Android TVs via ADB (Android Debug Bridge). +It includes functions for managing ADB keys, connecting to devices, retrieving installed apps, +and verifying device authorization. +""" + +import asyncio +import os +from pathlib import Path +from typing import Dict, Optional + +from adb_shell.adb_device_async import AdbDeviceTcpAsync +from adb_shell.auth.keygen import keygen +from adb_shell.auth.sign_pythonrsa import PythonRSASigner + +ADB_CERTS_DIR = Path(os.environ.get("UC_CONFIG_HOME", "./config")) / "certs" +ADB_CERTS_DIR.mkdir(parents=True, exist_ok=True) + + +def get_adb_key_paths(device_id: str) -> tuple[Path, Path]: + """ + Return the paths to the private and public ADB keys for a given device. + + Args: + device_id (str): The unique identifier for the device. + + Returns: + tuple[Path, Path]: Paths to the private and public key files. + """ + priv = ADB_CERTS_DIR / f"adb_{device_id}" + pub = ADB_CERTS_DIR / f"adb_{device_id}.pub" + return priv, pub + + +def load_or_generate_adb_keys(device_id: str) -> PythonRSASigner: + """ + Ensure ADB RSA keys exist for the device and return the signer. + + Args: + device_id (str): The unique identifier for the device. + + Returns: + PythonRSASigner: The signer object for ADB authentication. + """ + priv_path, pub_path = get_adb_key_paths(device_id) + + if not priv_path.exists() or not pub_path.exists(): + keygen(str(priv_path)) + + with open(priv_path) as f: + priv = f.read() + with open(pub_path) as f: + pub = f.read() + return PythonRSASigner(pub, priv) + + +async def adb_connect(device_id: str, host: str, port: int = 5555) -> Optional[AdbDeviceTcpAsync]: + """ + Connect to an Android device via ADB. + + Args: + device_id (str): The unique identifier for the device. + host (str): The IP address or hostname of the device. + port (int, optional): The port number for the ADB connection. Defaults to 5555. + + Returns: + Optional[AdbDeviceTcpAsync]: The connected ADB device object, or None if the connection fails. + """ + signer = load_or_generate_adb_keys(device_id) + device = AdbDeviceTcpAsync(host, port, default_transport_timeout_s=9.0) + + try: + await device.connect(rsa_keys=[signer], auth_timeout_s=20) + return device + except Exception as e: + print(f"ADB connection failed to {host}:{port} — {e}") + return None + + +async def get_installed_apps(device: AdbDeviceTcpAsync) -> Dict[str, Dict[str, str]]: + """ + Retrieve a list of installed non-system apps in a structured format. + + Args: + device (AdbDeviceTcpAsync): The connected ADB device. + + Returns: + Dict[str, Dict[str, str]]: A dictionary of app package names and their metadata. + """ + output = await device.shell("pm list packages -3 -e") + packages = sorted(line.replace("package:", "").strip() for line in output.splitlines()) + return {package: {"url": f"market://launch?id={package}"} for package in packages} + + +async def is_authorised(device: AdbDeviceTcpAsync) -> bool: + """ + Check if the connected device is authorized for ADB communication. + + Args: + device (AdbDeviceTcpAsync): The connected ADB device. + + Returns: + bool: True if the device is authorized, False otherwise. + """ + try: + result = await device.shell("echo ADB_OK") + return "ADB_OK" in result + except Exception: + return False diff --git a/src/config.py b/src/config.py index b37b66f..8c8baea 100644 --- a/src/config.py +++ b/src/config.py @@ -11,6 +11,7 @@ import logging import os from dataclasses import dataclass +from pathlib import Path from typing import Iterator _LOG = logging.getLogger(__name__) @@ -18,6 +19,19 @@ _CFG_FILENAME = "config.json" +# Paths +def _get_config_root() -> Path: + config_home = Path(os.environ.get("UC_CONFIG_HOME", "./config")) + config_home.mkdir(parents=True, exist_ok=True) + return config_home + + +def _get_data_root() -> Path: + data_home = Path(os.environ.get("UC_DATA_HOME", "./data")) + data_home.mkdir(parents=True, exist_ok=True) + return data_home + + @dataclass class AtvDevice: """Android TV device configuration.""" @@ -38,6 +52,8 @@ class AtvDevice: """Enable External Metadata.""" use_chromecast: bool = False """Enable Chromecast features.""" + use_adb: bool = False + """Enable ADB features.""" class _EnhancedJSONEncoder(json.JSONEncoder): @@ -133,24 +149,25 @@ def update(self, atv: AtvDevice) -> bool: item.auth_error = atv.auth_error item.use_external_metadata = atv.use_external_metadata item.use_chromecast = atv.use_chromecast + item.use_adb = atv.use_adb return self.store() return False def default_certfile(self) -> str: """Return the default certificate file for initializing a device.""" - return os.path.join(self._data_path, "androidtv_remote_cert.pem") + return os.path.join(self._data_path + "/certs", "androidtv_remote_cert.pem") def default_keyfile(self) -> str: """Return the default key file for initializing a device.""" - return os.path.join(self._data_path, "androidtv_remote_key.pem") + return os.path.join(self._data_path + "/certs", "androidtv_remote_key.pem") def certfile(self, atv_id: str) -> str: """Return the certificate file of the device.""" - return os.path.join(self._data_path, f"androidtv_{atv_id}_remote_cert.pem") + return os.path.join(self._data_path + "/certs", f"androidtv_{atv_id}_remote_cert.pem") def keyfile(self, atv_id: str) -> str: """Return the key file of the device.""" - return os.path.join(self._data_path, f"androidtv_{atv_id}_remote_key.pem") + return os.path.join(self._data_path + "/certs", f"androidtv_{atv_id}_remote_key.pem") def remove(self, atv_id: str) -> bool: """Remove the given device configuration.""" @@ -236,6 +253,7 @@ def load(self) -> bool: item.get("auth_error", False), item.get("use_external_metadata", False), item.get("use_chromecast", False), + item.get("use_adb", False), ) self._config.append(atv) return True diff --git a/src/driver.py b/src/driver.py index 50ee0e4..e80c684 100644 --- a/src/driver.py +++ b/src/driver.py @@ -409,7 +409,7 @@ async def main(): logging.getLogger("setup_flow").setLevel(level) logging.getLogger("androidtvremote2").setLevel(level) logging.getLogger("external_metadata").setLevel(level) - # logging.getLogger("pychromecast").setLevel(level) + logging.getLogger("pychromecast").setLevel(level) profile_path = os.path.join(api.config_dir_path, "profiles") device_profile.load(profile_path) diff --git a/src/external_metadata.py b/src/external_metadata.py index f2c0dd1..e4b9a2d 100644 --- a/src/external_metadata.py +++ b/src/external_metadata.py @@ -13,7 +13,8 @@ from io import BytesIO from pathlib import Path from typing import Dict -from urllib.parse import urlparse +from urllib.parse import urlparse, quote +import re import google_play_scraper import httpx @@ -21,6 +22,8 @@ from PIL.Image import Resampling from pychromecast.controllers.media import MediaImage from sanitize_filename import sanitize +from config import _get_config_root, _get_data_root +from simplejustwatchapi import justwatch _LOG = logging.getLogger(__name__) @@ -30,27 +33,14 @@ # Paths -def _get_config_root() -> Path: - config_home = Path(os.environ.get("UC_CONFIG_HOME", "./config")) - config_home.mkdir(parents=True, exist_ok=True) - return config_home - - -def _get_cache_root() -> Path: - data_home = Path(os.environ.get("UC_DATA_HOME", "./data")) - cache_root = data_home / CACHE_ROOT - cache_root.mkdir(parents=True, exist_ok=True) - return cache_root - - def _get_metadata_dir() -> Path: - metadata_dir = _get_cache_root() + metadata_dir = _get_data_root() / CACHE_ROOT metadata_dir.mkdir(parents=True, exist_ok=True) return metadata_dir def _get_icon_dir() -> Path: - icon_dir = _get_cache_root() / ICON_SUBDIR + icon_dir = _get_data_root() / CACHE_ROOT / ICON_SUBDIR icon_dir.mkdir(parents=True, exist_ok=True) return icon_dir @@ -68,7 +58,6 @@ def _get_icon_path(icon_name: str) -> Path: return _get_config_root() / "icons" / sanitize(icon_name[9:]) return _get_icon_dir() / sanitize(icon_name) - # Cache Management def _load_cache() -> Dict[str, Dict[str, str]]: path = _get_metadata_file_path() @@ -228,4 +217,93 @@ async def get_app_metadata(package_id: str) -> Dict[str, str]: return {"name": metadata["name"], "icon": icon_data_uri} _LOG.debug("Falling back to default metadata for %s", package_id) - return {"name": package_id, "icon": ""} + return {"name": "", "icon": ""} + + +async def youtube_search(query: str, limit: int = 1): + url = f"https://www.youtube.com/results?search_query={quote(query)}" + headers = { + "User-Agent": "Mozilla/5.0" + } + + with httpx.Client(headers=headers, timeout=10) as client: + response = client.get(url) + html = response.text + + # Extract the ytInitialData JSON + match = re.search(r"var ytInitialData = ({.*?});", html) + if not match: + raise RuntimeError("Could not find ytInitialData in the page") + + data = json.loads(match.group(1)) + + try: + items = ( + data["contents"]["twoColumnSearchResultsRenderer"] + ["primaryContents"]["sectionListRenderer"] + ["contents"][0]["itemSectionRenderer"]["contents"] + ) + except (KeyError, IndexError): + raise RuntimeError("Could not parse YouTube data structure") + + for item in items: + if "videoRenderer" in item: + video = item["videoRenderer"] + video_id = video.get("videoId") + + return f"https://img.youtube.com/vi/{video_id}/0.jpg" + + return None + +async def search_poster_justwatch(query: str, country: str = "GB", limit: int = 1) -> list[dict]: + """Search for poster images using JustWatch API.""" + + response = justwatch.search(query, country, 'en', count=1, best_only=True) + + if not response[0].poster: + return None + else: + poster_url = response[0].poster + + if poster_url: + return poster_url + + return None + +async def get_best_artwork(title: str, artist: str = None, current_package: str = None) -> Dict[str, str] | bool: + _LOG.debug("Resolving best artwork for title='%s', artist='%s', current_package='%s'", title, artist, current_package) + + search_query = f"{title} - {artist}" if artist else title + + if current_package in ["com.google.android.youtube.tv", "com.liskovsoft.videomanager", "com.teamsmart.videomanager.tv"]: + + _LOG.debug("YouTube detected. Searching for artwork.") + + youtube = await youtube_search(search_query) + + if youtube: + _LOG.debug("Artwork result:\n%s", json.dumps(youtube, indent=2)) + return youtube + else: + _LOG.debug("No artwork found from YouTube search.") + + else: + + _LOG.debug("Non-YouTube package detected. Searching for artwork.") + justwatch = await search_poster_justwatch(search_query) + + if justwatch: + _LOG.debug("Artwork result:\n%s", json.dumps(justwatch, indent=2)) + return justwatch + else: + _LOG.debug("No artwork found from JustWatch search.") + + _LOG.debug("No artwork source applicable. Returning False.") + return None + + +# async def test(): +# posters = await get_best_artwork("Episode 1", "Breaking Bad", "com.plexapp.android") +# print(posters) +# +# asyncio.run(test()) \ No newline at end of file diff --git a/src/profiles.py b/src/profiles.py index ac5eace..9cd51ce 100644 --- a/src/profiles.py +++ b/src/profiles.py @@ -263,6 +263,25 @@ def match(self, manufacturer: str, model: str, use_chromecast: bool) -> Profile: select_profile = copy.copy(select_profile) select_profile.features.extend(CHROMECAST_FEATURES) + import string + for char in string.ascii_uppercase + " ": + # Use 'SPACE' as the key name for the space character + key = "SPACE" if char == " " else char + command_name = f"TEXT_{key}" + + # Append to simple_commands list + select_profile.simple_commands.append(command_name) + + # Map the command: space gets ' ', others get lowercase letter + command_value = " " if char == " " else char.lower() + select_profile.command_map[command_name] = Command(command_value, 'TEXT') + + select_profile.simple_commands.append('TEXT_BACKSPACE') + select_profile.command_map['TEXT_BACKSPACE'] = Command('DEL', KeyPress.SHORT) + + select_profile.simple_commands.append('TEXT_ENTER') + select_profile.command_map['TEXT_ENTER'] = Command('ENTER', KeyPress.SHORT) + return select_profile diff --git a/src/setup_flow.py b/src/setup_flow.py index 4fff16f..9a0fbe5 100644 --- a/src/setup_flow.py +++ b/src/setup_flow.py @@ -7,7 +7,9 @@ import asyncio import logging +import os from enum import IntEnum +from pathlib import Path import ucapi from ucapi import ( @@ -25,7 +27,7 @@ import config import discover import tv -from config import AtvDevice +from config import AtvDevice, _get_config_root _LOG = logging.getLogger(__name__) @@ -38,7 +40,8 @@ class SetupSteps(IntEnum): DISCOVER = 2 DEVICE_CHOICE = 3 PAIRING_PIN = 4 - RECONFIGURE = 5 + APP_SELECTION = 5 + RECONFIGURE = 6 _setup_step = SetupSteps.INIT @@ -48,6 +51,9 @@ class SetupSteps(IntEnum): _use_external_metadata: bool = False _reconfigured_device: AtvDevice | None = None _use_chromecast: bool = False +_use_adb: bool = False +_adb_device_id: str = "" +_device_info: dict[str, str] = {} # TODO #9 externalize language texts _user_input_discovery = RequestUserInput( @@ -108,6 +114,8 @@ async def driver_setup_handler(msg: SetupDriver) -> SetupAction: return await handle_device_choice(msg) if _setup_step == SetupSteps.PAIRING_PIN and "pin" in msg.input_values: return await handle_user_data_pin(msg) + if _setup_step == SetupSteps.APP_SELECTION: + return await handle_app_selection(msg) if _setup_step == SetupSteps.RECONFIGURE: return await _handle_device_reconfigure(msg) _LOG.error("No or invalid user response was received: %s", msg) @@ -285,6 +293,16 @@ async def handle_configuration_mode( _LOG.warning("Could not remove device from configuration: %s", choice) return SetupError(error_type=IntegrationSetupError.OTHER) config.devices.store() + adb_cert_path = Path(os.environ.get("UC_CONFIG_HOME", "./config")) / "certs" / f"adb_{choice}" + adb_cert_path_pub = Path(os.environ.get("UC_CONFIG_HOME", "./config")) / "certs" / f"adb_{choice}.pub" + if adb_cert_path.exists(): + adb_cert_path.unlink() + if adb_cert_path_pub.exists(): + adb_cert_path_pub.unlink() + appslist_path = Path(os.environ.get("UC_CONFIG_HOME", "./config")) / f"appslist_{choice}.json" + if appslist_path.exists(): + appslist_path.unlink() + _LOG.info("Device removed from configuration: %s", choice) return SetupComplete() case "configure": # Reconfigure device if the identifier has changed @@ -297,9 +315,8 @@ async def handle_configuration_mode( _setup_step = SetupSteps.RECONFIGURE _reconfigured_device = selected_device use_chromecast = selected_device.use_chromecast if selected_device.use_chromecast else False - use_external_metadata = ( - selected_device.use_external_metadata if selected_device.use_external_metadata else False - ) + use_external_metadata = selected_device.use_external_metadata if selected_device.use_external_metadata else False + use_adb = selected_device.use_adb if selected_device.use_adb else False return RequestUserInput( { @@ -325,6 +342,15 @@ async def handle_configuration_mode( }, "field": {"checkbox": {"value": use_external_metadata}}, }, + { + "id": "adb", + "label": { + "en": "Preview feature: Enable ADB connection (for app list)", + "de": "Vorschaufunktion: Aktiviere ADB Verbindung (für App-Browsing)", + "fr": "Fonctionnalité en aperçu: Activer la connexion ADB (pour la navigation dans les applications)", + }, + "field": {"checkbox": {"value": use_adb}}, + }, ], ) case "reset": @@ -446,6 +472,15 @@ async def _handle_discovery(msg: UserDataResponse) -> RequestUserInput | SetupEr }, "field": {"checkbox": {"value": False}}, }, + { + "id": "adb", + "label": { + "en": "Preview feature: Enable ADB connection (for app list)", + "de": "Vorschaufunktion: Aktiviere ADB Verbindung (für App-Browsing)", + "fr": "Fonctionnalité en aperçu: Activer la connexion ADB (pour la navigation dans les applications)", + }, + "field": {"checkbox": {"value": False}}, + }, ], ) @@ -463,10 +498,12 @@ async def handle_device_choice(msg: UserDataResponse) -> RequestUserInput | Setu global _use_chromecast global _setup_step global _use_external_metadata + global _use_adb choice = msg.input_values["choice"] _use_external_metadata = msg.input_values.get("external_metadata", "false") == "true" _use_chromecast = msg.input_values.get("chromecast", "false") == "true" + _use_adb = msg.input_values.get("adb", "false") == "true" name = "" for discovered_tv in _discovered_android_tvs: @@ -484,6 +521,7 @@ async def handle_device_choice(msg: UserDataResponse) -> RequestUserInput | Setu id="", use_external_metadata=False, use_chromecast=False, + use_adb=False, ), ) _LOG.info("Chosen Android TV: %s. Start pairing process...", choice) @@ -516,7 +554,7 @@ async def handle_device_choice(msg: UserDataResponse) -> RequestUserInput | Setu return _setup_error_from_device_state(_pairing_android_tv.state) -async def handle_user_data_pin(msg: UserDataResponse) -> SetupComplete | SetupError: +async def handle_user_data_pin(msg: UserDataResponse) -> RequestUserInput | SetupComplete | SetupError: """ Process user data pairing pin response in a setup process. @@ -526,6 +564,9 @@ async def handle_user_data_pin(msg: UserDataResponse) -> SetupComplete | SetupEr :return: the setup action on how to continue: SetupComplete if a valid Android TV device was chosen. """ global _pairing_android_tv + global _setup_step + + _LOG.debug("Entered handle_user_data_pin with msg: %s", msg) if _pairing_android_tv is None: _LOG.error("Can't handle pairing pin: no device instance! Aborting setup") @@ -534,23 +575,33 @@ async def handle_user_data_pin(msg: UserDataResponse) -> SetupComplete | SetupEr _LOG.info("[%s] User has entered the PIN", _pairing_android_tv.log_id) res = await _pairing_android_tv.finish_pairing(msg.input_values["pin"]) - _pairing_android_tv.disconnect() + _LOG.debug("[%s] finish_pairing result: %s", _pairing_android_tv.log_id, res) - device_info = None + _pairing_android_tv.disconnect() + _LOG.debug("[%s] Disconnected after pairing attempt", _pairing_android_tv.log_id) - # Connect again to retrieve device identifier (with init()) and additional device information (with connect()) if res == ucapi.StatusCodes.OK: - _LOG.info( - "[%s] Pairing done, retrieving device information", - _pairing_android_tv.log_id, - ) + _LOG.info("[%s] Pairing done, retrieving device information", _pairing_android_tv.log_id) res = ucapi.StatusCodes.SERVER_ERROR timeout = int(tv.CONNECTION_TIMEOUT) - if await _pairing_android_tv.init(timeout) and await _pairing_android_tv.connect(timeout): - device_info = _pairing_android_tv.device_info or {} + _LOG.debug("[%s] Attempting to initialize and connect with timeout: %d", _pairing_android_tv.log_id, timeout) + + _device_info = None + timeout = int(tv.CONNECTION_TIMEOUT) + + if await _pairing_android_tv.init(timeout): + _LOG.debug("[%s] Initialization successful", _pairing_android_tv.log_id) + if await _pairing_android_tv.connect(timeout): + _LOG.debug("[%s] Connection successful", _pairing_android_tv.log_id) + _device_info = _pairing_android_tv.device_info or {} + _LOG.debug("[%s] Retrieved device info: %s", _pairing_android_tv.log_id, _device_info) + if config.devices.assign_default_certs_to_device(_pairing_android_tv.identifier, True): res = ucapi.StatusCodes.OK - _pairing_android_tv.disconnect() + _LOG.debug("[%s] Default certificates assigned successfully", _pairing_android_tv.log_id) + + _pairing_android_tv.disconnect() + _LOG.debug("[%s] Disconnected after retrieving device information", _pairing_android_tv.log_id) if res != ucapi.StatusCodes.OK: state = _pairing_android_tv.state @@ -558,23 +609,183 @@ async def handle_user_data_pin(msg: UserDataResponse) -> SetupComplete | SetupEr _pairing_android_tv = None return _setup_error_from_device_state(state) + if _use_adb: + _LOG.debug("ADB is enabled, proceeding with ADB setup") + + if not msg.input_values.get("adb", False): + _LOG.error("ADB setup failed: 'use_adb' not found in input values") + return SetupError() + + from adb_tv import adb_connect, get_installed_apps, is_authorised + + device_id = _pairing_android_tv.identifier + ip_address = _pairing_android_tv.address + _LOG.debug("Attempting ADB setup for device_id: %s, ip_address: %s", device_id, ip_address) + + adb_device = await adb_connect(device_id, ip_address) + if not adb_device: + return SetupError(error_type=IntegrationSetupError.AUTHORIZATION_ERROR) + + if not await is_authorised(adb_device): + return SetupError(error_type=IntegrationSetupError.AUTHORIZATION_ERROR) + + _LOG.debug("ADB authorisation confirmed") + + from apps import Apps, IdMappings + from external_metadata import get_app_metadata + + # STEP 1: Resolve canonical friendly names used in offline mappings + offline_friendly_names = set(IdMappings.values()) + offline_package_ids = set(Apps.keys()) + + if _use_adb: + adb_apps = await get_installed_apps(adb_device) + _LOG.debug("Retrieved ADB apps: %s", adb_apps) + await adb_device.close() + + # STEP 2: Remove ADB entries that duplicate an offline friendly name + filtered_adb_apps = {} + for package, details in adb_apps.items(): + if package in Apps: + continue # Already covered offline (exact match) + if package in IdMappings: + continue # Already has a mapped friendly name offline + friendly = details.get("name", "") + if friendly and friendly in IdMappings.values(): + continue # Another offline app already uses this friendly name + filtered_adb_apps[package] = details + + + # STEP 3: Merge with Apps, giving priority to offline Apps + merged_apps = {**filtered_adb_apps, **Apps} + else: + merged_apps = Apps + + _LOG.debug("Merged apps (offline preferred): %s", merged_apps) + + # STEP 4: Build app list for rendering + app_entries = [] + + for package, details in merged_apps.items(): + mapped_name = IdMappings.get(package) + static_name = details.get("name", package) + name = mapped_name or static_name + editable = False + + if _use_external_metadata and not mapped_name: + try: + metadata = await get_app_metadata(package) + if metadata.get("name"): + name = metadata["name"] + editable = True # Only editable if name came from metadata + except Exception as e: + _LOG.warning("Metadata lookup failed for %s: %s", package, e) + + is_unfriendly = name.startswith("com.") or name.count('.') >= 2 + app_entries.append((is_unfriendly, name.lower(), package, name, editable)) + + # STEP 5: Sort by friendly/unfriendly then alphabetically + app_entries.sort() + + settings = [] + + # STEP 6: Output settings based on filtered and sorted entries + for _, _, package, name, editable in app_entries: + is_adb_only = _use_adb and package in adb_apps and package not in Apps + + settings.append( + { + "id": f"{package}_enabled", + "label": { + "en": name if not is_adb_only else f"{package}", + "de": name if not is_adb_only else f"{package}", + "fr": name if not is_adb_only else f"{package}", + }, + "field": {"checkbox": {"value": False}}, + } + ) + + if is_adb_only and editable: + settings.append( + { + "id": f"{package}_name", + "label": { + "en": f"Friendly name for {package}", + "de": f"Anzeigename für {package}", + "fr": f"Nom convivial pour {package}", + }, + "field": {"text": {"value": name}}, + } + ) + + _setup_step = SetupSteps.APP_SELECTION + return RequestUserInput( + title={ + "en": "Select visible apps", + "de": "Wähle sichtbare Apps", + "fr": "Sélectionnez les applications visibles", + }, + settings=settings, + ) + + +async def handle_app_selection(msg: UserDataResponse) -> SetupComplete | SetupError: + global _pairing_android_tv + global _use_chromecast + global _use_external_metadata + global _use_adb + global _device_info + + import json + + from apps import Apps # offline apps + + selected_apps = {} + + for field_id, value in msg.input_values.items(): + if field_id.endswith("_enabled") and str(value).lower() == "true": + package = field_id.removesuffix("_enabled") + name_field = f"{package}_name" + friendly_name = msg.input_values.get(name_field, package) + + # Prefer static app URL if it exists + static_entry = Apps.get(friendly_name) or Apps.get(package) + url = static_entry["url"] if static_entry else f"market://launch?id={package}" + + selected_apps[friendly_name] = {"url": url} + + filename = f"appslist_{_pairing_android_tv.identifier}.json" + apps_file = _get_config_root() / filename + try: + apps_file.write_text(json.dumps(selected_apps, indent=2)) + _LOG.info("App selection stored: %s", selected_apps) + except Exception as e: + _LOG.error("Failed to write selected apps: %s", e) + return SetupError() + device = AtvDevice( id=_pairing_android_tv.identifier, name=_pairing_android_tv.name, address=_pairing_android_tv.address, + manufacturer=_device_info.get("manufacturer", ""), + model=_device_info.get("model", ""), use_external_metadata=_use_external_metadata, use_chromecast=_use_chromecast, - manufacturer=device_info.get("manufacturer", ""), - model=device_info.get("model", ""), + use_adb=_use_adb, ) + _LOG.debug("Created AtvDevice: %s", device) + + config.devices.add_or_update(device) + _LOG.debug("Device added/updated in configuration") - config.devices.add_or_update(device) # triggers AndroidTv instance creation config.devices.store() + _LOG.debug("Configuration stored") - # ATV device connection will be triggered with subscribe_entities request - _pairing_android_tv = None await asyncio.sleep(1) _LOG.info("[%s] Setup successfully completed for %s", device.name, device.id) + + _pairing_android_tv = None + return SetupComplete() @@ -596,10 +807,12 @@ async def _handle_device_reconfigure( use_chromecast = msg.input_values.get("chromecast", "false") == "true" use_external_metadata = msg.input_values.get("external_metadata", "false") == "true" + use_adb = msg.input_values.get("adb", "false") == "true" _LOG.debug("User has changed configuration") _reconfigured_device.use_chromecast = use_chromecast _reconfigured_device.use_external_metadata = use_external_metadata + _reconfigured_device.use_adb = use_adb config.devices.add_or_update(_reconfigured_device) # triggers ATV instance update await asyncio.sleep(1) diff --git a/src/tv.py b/src/tv.py index fa3051d..20e9309 100644 --- a/src/tv.py +++ b/src/tv.py @@ -8,6 +8,7 @@ # pylint: disable=too-many-lines import asyncio +import json import logging import os import socket @@ -49,8 +50,8 @@ import apps import discover import inputs -from config import AtvDevice -from external_metadata import encode_icon_to_data_uri, get_app_metadata +from config import AtvDevice, _get_config_root +from external_metadata import encode_icon_to_data_uri, get_app_metadata, get_best_artwork from profiles import KeyPress, Profile from util import filter_data_img_properties @@ -232,6 +233,9 @@ def __init__( self._use_app_url = not device_config.use_chromecast self._player_state = media_player.States.ON self._muted = False + self._is_chromecast_playing = False + self._chromecast_metadata_active = False + self._emit_lock = asyncio.Lock() def __del__(self): """Destructs instance, disconnect AndroidTVRemote.""" @@ -594,111 +598,18 @@ def disconnect(self) -> None: self.events.emit(Events.DISCONNECTED, self._identifier) # Callbacks - async def _apply_current_app_metadata(self, current_app: str) -> dict: - global HOMESCREEN_IMAGE - - update = {} - # one-time initialization - if HOMESCREEN_IMAGE is None: - HOMESCREEN_IMAGE = "" - HOMESCREEN_IMAGE = await encode_icon_to_data_uri("config://androidtv.png") - - # Special handling for homescreen & Android TV system apps: show pre-defined icon - homescreen_app = apps.is_homescreen_app(current_app) - if homescreen_app or apps.is_standby_app(current_app): - update[MediaAttr.SOURCE] = apps.IdMappings[current_app] - update[MediaAttr.MEDIA_TITLE] = "" - update[MediaAttr.MEDIA_IMAGE_URL] = HOMESCREEN_IMAGE - update[MediaAttr.STATE] = ( - media_player.States.ON.value if homescreen_app else media_player.States.STANDBY.value - ) - return update - - # Track state of data sources - offline_name = None - offline_match = None - external_name = None - external_icon = None - - # Try offline ID mapping first - if current_app in apps.IdMappings: - offline_name = apps.IdMappings[current_app] - self._media_app = offline_name - - # Try fuzzy offline name matching if ID mapping failed - if not offline_name: - for query, name in apps.NameMatching.items(): - if query in current_app: - offline_match = name - self._media_app = name - break - - # Try external metadata - metadata = ( - await get_app_metadata(current_app) if current_app and self._device_config.use_external_metadata else None - ) - if metadata: - if _LOG.isEnabledFor(logging.DEBUG): - _LOG.debug("App metadata: %s", filter_data_img_properties(metadata)) - external_name = metadata.get("name") - external_icon = metadata.get("icon") - if external_name: - self._media_app = external_name - if external_icon: - self._app_image_url = external_icon - - # Determine final name/title to use - name_to_use = offline_name or offline_match or external_name or current_app - # TODO why set name to both source & media title fields? - update[MediaAttr.SOURCE] = name_to_use - if not self._media_title and not self._media_image_url: - update[MediaAttr.MEDIA_TITLE] = name_to_use - - # Determine which icon to use - icon_to_use = None - if self._device_config.use_external_metadata or self._use_app_url: - if external_icon: - icon_to_use = external_icon - else: - icon_to_use = "" - elif self._media_image_url: - icon_to_use = await encode_icon_to_data_uri(self._media_image_url) - - update[MediaAttr.STATE] = media_player.States.PLAYING.value - # Skip applying app icon if media image from cast is present - if not self._media_image_url: - if not icon_to_use: - update[MediaAttr.MEDIA_IMAGE_URL] = HOMESCREEN_IMAGE - else: - update[MediaAttr.MEDIA_IMAGE_URL] = icon_to_use - - return update def _is_on_updated(self, is_on: bool) -> None: - """Notify that the Android TV power state is updated.""" - asyncio.create_task(self._handle_is_on_updated(is_on)) - - async def _handle_is_on_updated(self, is_on: bool): - _LOG.info("[%s] is on: %s", self.log_id, is_on) - current_app = self._atv.current_app or "" - if is_on: - self._chromecast_connect() - update = await self._apply_current_app_metadata(current_app) - update[MediaAttr.STATE] = media_player.States.ON.value - else: - update = await self._apply_current_app_metadata(current_app) - update[MediaAttr.STATE] = media_player.States.OFF.value - - self.events.emit(Events.UPDATE, self._identifier, update) + if not self._loop or not self._loop.is_running(): + _LOG.warning("[%s] No running event loop for power update", self.log_id) + return + asyncio.run_coroutine_threadsafe(self._update_media_status(), self._loop) def _current_app_updated(self, current_app: str) -> None: - """Notify that the current app on Android TV is updated.""" - asyncio.create_task(self._handle_current_app_updated(current_app)) - - async def _handle_current_app_updated(self, current_app: str): - _LOG.debug("[%s] current_app: %s", self.log_id, current_app) - update = await self._apply_current_app_metadata(current_app) - self.events.emit(Events.UPDATE, self._identifier, update) + if not self._loop or not self._loop.is_running(): + _LOG.warning("[%s] No running event loop for app update", self.log_id) + return + asyncio.run_coroutine_threadsafe(self._update_media_status(), self._loop) def _volume_info_updated(self, volume_info: dict[str, str | bool]) -> None: """Notify that the Android TV volume information is updated.""" @@ -716,8 +627,22 @@ def _is_available_updated(self, is_available: bool): def _update_app_list(self) -> None: update = {} source_list = [] - for app in apps.Apps: - source_list.append(app) + + filename = f"appslist_{self._identifier}.json" + apps_file = _get_config_root() / filename + + if apps_file.exists(): + try: + with apps_file.open("r", encoding="utf-8") as f: + selected_apps = json.load(f) + source_list.extend(selected_apps.keys()) + except Exception as e: + _LOG.warning("Failed to read apps list from %s: %s", apps_file, e) + else: + _LOG.info("No saved app list found for %s, falling back to default", self._identifier) + import apps # fall back to static apps + + source_list.extend(apps.Apps.keys()) update[MediaAttr.SOURCE_LIST] = source_list self.events.emit(Events.UPDATE, self._identifier, update) @@ -772,15 +697,31 @@ async def turn_off(self) -> ucapi.StatusCodes: async def select_source(self, source: str) -> ucapi.StatusCodes: """ - Select a given source, either a pre-defined app, input or by app-link/id. + Select a given source, either a user-defined app (from JSON), + an input source (KeyCode), or directly by app-link / id. :param source: the friendly source name or an app-link / id """ - if source in apps.Apps: - return await self._launch_app(apps.Apps[source]["url"]) + # Load saved apps for this device + apps_file = _get_config_root() / f"appslist_{self._identifier}.json" + apps_list = {} + + if apps_file.exists(): + try: + with apps_file.open("r", encoding="utf-8") as f: + apps_list = json.load(f) + except Exception as e: + _LOG.warning("Failed to read apps list for %s: %s", self._identifier, e) + + # Match known friendly app name + if source in apps_list: + return await self._launch_app(apps_list[source]["url"]) + + # Match input source if source in inputs.KeyCode: return await self._switch_input(source) + # Fall back to direct launch (e.g. package name or intent URI) return await self._launch_app(source) @async_handle_atvlib_errors @@ -804,6 +745,17 @@ async def _send_command(self, keycode: int | str, action: KeyPress = KeyPress.SH :return: OK if scheduled to be sent, other error code in case of an error """ # noqa + if action == 'TEXT': + # Special handling for text input + if not isinstance(keycode, str): + _LOG.error("[%s] Cannot send command, invalid key_code: %s", self.log_id, keycode) + return ucapi.StatusCodes.BAD_REQUEST + if keycode in ('DEL', 'ENTER'): + self._atv.send_key_command(keycode, 'SHORT') + else: + self._atv.send_text(keycode) + return ucapi.StatusCodes.OK + if action in (KeyPress.LONG, KeyPress.BEGIN): direction = "START_LONG" elif action == KeyPress.END: @@ -827,6 +779,12 @@ async def _launch_app(self, app: str) -> ucapi.StatusCodes: self._atv.send_launch_app_command(app) return ucapi.StatusCodes.OK + @async_handle_atvlib_errors + async def _send_text(self, text: str) -> ucapi.StatusCodes: + """Launch an app on Android TV.""" + self._atv.send_text(text) + return ucapi.StatusCodes.OK + async def _switch_input(self, source: str) -> ucapi.StatusCodes: """ TEST FUNCTION: Send a KEYCODE_TV_INPUT_* key. @@ -842,86 +800,10 @@ def new_connection_status(self, status: ConnectionStatus) -> None: _LOG.info("[%s] Received Chromecast connection status : %s", self.log_id, status) def new_media_status(self, status: MediaStatus) -> None: - """Receive new media status event from Google cast.""" if not self._loop or not self._loop.is_running(): - _LOG.warning("[%s] No running event loop for handling new media status", self.log_id) + _LOG.warning("[%s] No running event loop for Chromecast status", self.log_id) return - - try: - asyncio.run_coroutine_threadsafe(self._handle_new_media_status(status), self._loop) - except Exception as e: - _LOG.error("[%s] Failed to schedule media status handler: %s", self.log_id, e) - - async def _handle_new_media_status(self, status: MediaStatus): - update = {} - - if ( - status.player_state - and GOOGLE_CAST_MEDIA_STATES_MAP.get(status.player_state, media_player.States.PLAYING) != self._player_state - ): - # PLAYING, PAUSED, IDLE - self._player_state = GOOGLE_CAST_MEDIA_STATES_MAP.get(status.player_state, media_player.States.PLAYING) - self._last_update_position_time = 0 - update[MediaAttr.STATE] = self._player_state - - if status.album_name != self._media_album: - self._media_album = status.album_name or "" - update[MediaAttr.MEDIA_ALBUM] = self._media_album - - if status.artist != self._media_artist: - self._media_artist = status.artist or "" - update[MediaAttr.MEDIA_ARTIST] = self._media_artist - - if status.title != self._media_title: - current_title = self.media_title - self._media_title = status.title or "" - if current_title != self.media_title: - _LOG.debug("[%s] Chromecast Media info updated : %s", self.log_id, status) - update[MediaAttr.MEDIA_TITLE] = self.media_title - - current_time = int(status.current_time) if status.current_time else 0 - duration = int(status.duration) if status.duration else 0 - changed_duration = False - - if duration != self._media_duration: - self._media_duration = duration - update[MediaAttr.MEDIA_DURATION] = self._media_duration - changed_duration = True - - # Update position every 30 seconds - if changed_duration or ( - current_time != self._media_position and self._last_update_position_time + 30 < time.time() - ): - self._media_position = current_time - update[MediaAttr.MEDIA_POSITION] = self._media_position - update[MediaAttr.MEDIA_DURATION] = self._media_duration - self._last_update_position_time = time.time() - - if ( - status.metadata_type - and GOOGLE_CAST_MEDIA_TYPES_MAP.get(status.metadata_type, MediaType.VIDEO) != self._media_type - ): - self._media_type = GOOGLE_CAST_MEDIA_TYPES_MAP.get(status.metadata_type, MediaType.VIDEO) - update[MediaAttr.MEDIA_TYPE] = self._media_type - - if status.images and len(status.images) > 0 and status.images[0].url != self._media_image_url: - self._media_image_url = status.images[0].url - update[MediaAttr.MEDIA_IMAGE_URL] = await encode_icon_to_data_uri(self._media_image_url) - self._use_app_url = False - else: - self._media_image_url = None - if self._device_config.use_external_metadata: - self._use_app_url = True - if self._app_image_url: - update[MediaAttr.MEDIA_IMAGE_URL] = await encode_icon_to_data_uri(self._app_image_url) - - if update: - if _LOG.isEnabledFor(logging.DEBUG): - _LOG.debug( - "[%s] Update remote with Chromecast info : %s", self.log_id, filter_data_img_properties(update) - ) - - self.events.emit(Events.UPDATE, self._identifier, update) + asyncio.run_coroutine_threadsafe(self._update_media_status(), self._loop) def load_media_failed(self, queue_item_id: int, error_code: int) -> None: """Receive new media failed event from Google cast.""" @@ -1005,3 +887,141 @@ async def volume_set(self, volume: float | None) -> ucapi.StatusCodes: except PyChromecastError as ex: _LOG.error("[%s] Chromecast error sending command : %s", self.log_id, ex) return ucapi.StatusCodes.BAD_REQUEST + + async def _update_media_status(self): + global HOMESCREEN_IMAGE + + update = {} + + # Initialize homescreen icon once + if HOMESCREEN_IMAGE is None: + HOMESCREEN_IMAGE = await encode_icon_to_data_uri("config://androidtv.png") + + current_app = self._atv.current_app or "" + is_on = self._atv.is_on + homescreen_app = apps.is_homescreen_app(current_app) + standby_app = apps.is_standby_app(current_app) + + # Device is OFF + if not is_on: + self._chromecast_metadata_active = False + update.update({ + MediaAttr.STATE: media_player.States.OFF.value, + MediaAttr.MEDIA_TITLE: "", + MediaAttr.SOURCE: "", + MediaAttr.MEDIA_IMAGE_URL: "", + }) + async with self._emit_lock: + self.events.emit(Events.UPDATE, self._identifier, update) + return + + # Device on homescreen or standby app + if homescreen_app or standby_app: + self._chromecast_metadata_active = False + update.update({ + MediaAttr.STATE: media_player.States.ON.value if homescreen_app else media_player.States.STANDBY.value, + MediaAttr.SOURCE: "", + MediaAttr.MEDIA_TITLE: "", + MediaAttr.MEDIA_IMAGE_URL: "", + }) + async with self._emit_lock: + self.events.emit(Events.UPDATE, self._identifier, update) + return + + # Chromecast status (only if enabled) + chromecast_playing_states = ( + MEDIA_PLAYER_STATE_PLAYING, + MEDIA_PLAYER_STATE_PAUSED, + MEDIA_PLAYER_STATE_BUFFERING, + ) + + chromecast_active = False + chromecast_status = None + + if self._device_config.use_chromecast and self._chromecast: + chromecast_status = self._chromecast.media_controller.status + chromecast_active = ( + self._chromecast.socket_client.is_connected and + chromecast_status and + chromecast_status.player_state in chromecast_playing_states and + (chromecast_status.title or chromecast_status.images) + ) + + self._chromecast_metadata_active = chromecast_active + + # App metadata from offline mappings + offline_name = apps.IdMappings.get(current_app) + offline_match = next( + (name for query, name in apps.NameMatching.items() if query in current_app), None + ) + + external_name = None + external_icon = None + + # External metadata (if enabled) + if self._device_config.use_external_metadata: + metadata = await get_app_metadata(current_app) + if metadata: + external_name = metadata.get("name") + external_icon = metadata.get("icon") or HOMESCREEN_IMAGE + if _LOG.isEnabledFor(logging.DEBUG): + _LOG.debug("External app metadata: %s", filter_data_img_properties(metadata)) + + app_name = offline_name or offline_match or external_name or current_app + self._media_app = app_name + self._app_image_url = external_icon if isinstance(external_icon, str) and external_icon.strip() else HOMESCREEN_IMAGE + + update[MediaAttr.SOURCE] = app_name + + # --- Chromecast Active State Handling --- + if chromecast_active: + # Media Title from Chromecast (fallback to app_name) + self._media_title = chromecast_status.title if chromecast_status.title is not None else chromecast_status.artist or "" + update[MediaAttr.MEDIA_TITLE] = self._media_title or chromecast_status.artist + + # Media Image from Chromecast + if chromecast_status.images and chromecast_status.images[0].url: + self._media_image_url = chromecast_status.images[0].url + + # External Artwork fallback (if enabled) + elif self._device_config.use_external_metadata and chromecast_status.player_state in ["PLAYING", "PAUSED", "BUFFERING"]: + self._media_image_url = await get_best_artwork( + chromecast_status.title, chromecast_status.artist, current_app + ) or external_icon or HOMESCREEN_IMAGE + else: + self._media_image_url = external_icon + + # update[MediaAttr.MEDIA_IMAGE_URL] = await encode_icon_to_data_uri(self._media_image_url) + + update[MediaAttr.STATE] = GOOGLE_CAST_MEDIA_STATES_MAP.get( + chromecast_status.player_state, media_player.States.PLAYING + ) + + update.update({ + MediaAttr.MEDIA_ARTIST: chromecast_status.artist or "", + MediaAttr.MEDIA_ALBUM: chromecast_status.album_name or "", + MediaAttr.MEDIA_POSITION: int(chromecast_status.current_time or 0), + MediaAttr.MEDIA_DURATION: int(chromecast_status.duration or 0), + MediaAttr.SOURCE: app_name, + MediaAttr.MEDIA_IMAGE_URL: await encode_icon_to_data_uri(self._media_image_url), + }) + + + # --- Chromecast Inactive or Disabled: App-level metadata only --- + else: + self._media_title = app_name + self._media_image_url = self._app_image_url or HOMESCREEN_IMAGE + + update.update({ + MediaAttr.MEDIA_TITLE: app_name, + MediaAttr.MEDIA_IMAGE_URL: await encode_icon_to_data_uri(self._media_image_url), + MediaAttr.STATE: media_player.States.PLAYING.value, + MediaAttr.SOURCE: app_name, + MediaAttr.MEDIA_ARTIST: "", + MediaAttr.MEDIA_ALBUM: "", + MediaAttr.MEDIA_POSITION: 0, + MediaAttr.MEDIA_DURATION: 0, + }) + + async with self._emit_lock: + self.events.emit(Events.UPDATE, self._identifier, update)