diff --git a/README.md b/README.md index 00eb1c0..2696ba6 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # Android TV integration for Remote Two/3 -Using [androidtvremote2](https://github.com/tronikos/androidtvremote2), [uc-integration-api](https://github.com/aitatoi/integration-python-library) and [pychromecast](https://github.com/home-assistant-libs/pychromecast). +Using [androidtvremote2](https://github.com/tronikos/androidtvremote2), [uc-integration-api](https://github.com/aitatoi/integration-python-library), [pychromecast](https://github.com/home-assistant-libs/pychromecast) and [google-play-scraper](https://github.com/JoMingyu/google-play-scraper). The integration currently supports almost all features that the androidtvremote2 library provides. Button control and ON/OFF states are supported. With the optional Google Cast support, media playing information can be @@ -16,6 +16,7 @@ Android TV devices. It can be run as an external integration for development is exposed per Android TV device to the Remote. - Device profiles allow device specific support and custom key bindings, for example double-click or long-press actions. See [command mappings](docs/command_mapping.md) for more information. +- Optional external metadata lookup using the Google Play Store ## Standalone Usage ### Setup diff --git a/data/external_cache/app_metadata.json b/data/external_cache/app_metadata.json new file mode 100644 index 0000000..5dfb8e9 --- /dev/null +++ b/data/external_cache/app_metadata.json @@ -0,0 +1,10 @@ +{ + "com.google.android.tvlauncher": { + "name": "Android TV Home", + "icon": "data/external_cache/icons/androidtv.png" + }, + "com.google.android.backdrop": { + "name": "Backdrop Daydream", + "icon": "data/external_cache/icons/androidtv.png" + } +} \ No newline at end of file diff --git a/data/external_cache/icons/androidtv.png b/data/external_cache/icons/androidtv.png new file mode 100644 index 0000000..536d6b1 Binary files /dev/null and b/data/external_cache/icons/androidtv.png differ diff --git a/intg-androidtv/apps.py b/intg-androidtv/apps.py index 0a7b5e1..64bfbbf 100644 --- a/intg-androidtv/apps.py +++ b/intg-androidtv/apps.py @@ -60,6 +60,7 @@ "tv.wuaki": "Rakuten TV", "homedia.sky.sport": "SKY", "com.teamsmart.videomanager.tv": "SmartTube", + "com.nathnetwork.supersmart": "SuperSmart", "nl.rtl.videoland.v2": "Videoland", "com.disney.disneyplus": "Disney+", "com.netflix.ninja": "Netflix", @@ -87,6 +88,7 @@ # Used to show which app is currently in the foreground (currently playing) NameMatching = { "youtube": "YouTube", + "videomanager": "YouTube", "amazonvideo": "Prime Video", "apple": "Apple TV", "plex": "Plex", diff --git a/intg-androidtv/config.py b/intg-androidtv/config.py index 464248c..b37b66f 100644 --- a/intg-androidtv/config.py +++ b/intg-androidtv/config.py @@ -34,6 +34,8 @@ class AtvDevice: """Device model name.""" auth_error: bool = False """Authentication error, device requires pairing.""" + use_external_metadata: bool = False + """Enable External Metadata.""" use_chromecast: bool = False """Enable Chromecast features.""" @@ -129,6 +131,7 @@ def update(self, atv: AtvDevice) -> bool: item.manufacturer = atv.manufacturer item.model = atv.model item.auth_error = atv.auth_error + item.use_external_metadata = atv.use_external_metadata item.use_chromecast = atv.use_chromecast return self.store() return False @@ -184,7 +187,11 @@ def clear(self) -> None: try: os.remove(file) except OSError as ex: - _LOG.error("Failed to remove certificate file %s: %s", os.path.basename(file), ex) + _LOG.error( + "Failed to remove certificate file %s: %s", + os.path.basename(file), + ex, + ) self._config = [] @@ -227,6 +234,7 @@ def load(self) -> bool: item.get("manufacturer", ""), item.get("model", ""), item.get("auth_error", False), + item.get("use_external_metadata", False), item.get("use_chromecast", False), ) self._config.append(atv) @@ -268,9 +276,9 @@ async def migrate(self) -> bool: android_tv = AndroidTv(self.certfile(item.id), self.keyfile(item.id), item) if await android_tv.init(10) and await android_tv.connect(10): if device_info := android_tv.device_info: - item.manufacturer = android_tv.device_info item.manufacturer = device_info.get("manufacturer", "") item.model = device_info.get("model", "") + item.use_external_metadata = device_info.get("use_external_metadata", False) _LOG.info( "Updating device configuration '%s' (%s) with: manufacturer=%s, model=%s", @@ -336,7 +344,11 @@ def assign_default_certs_to_device(self, atv_id: str, force: bool = False) -> bo ) os.rename(old_keyfile, new_file) except OSError as ex: - _LOG.error("Error while migrating certificate file %s: %s", os.path.basename(new_file), ex) + _LOG.error( + "Error while migrating certificate file %s: %s", + os.path.basename(new_file), + ex, + ) return False return True diff --git a/intg-androidtv/discover.py b/intg-androidtv/discover.py index 3016603..435d7de 100644 --- a/intg-androidtv/discover.py +++ b/intg-androidtv/discover.py @@ -24,7 +24,10 @@ async def android_tvs(timeout: int = 10) -> list[dict[str, str]]: discovered_android_tvs: list[dict[str, str]] = [] def on_service_state_changed( - zeroconf: Zeroconf, service_type: str, name: str, state_change: ServiceStateChange + zeroconf: Zeroconf, + service_type: str, + name: str, + state_change: ServiceStateChange, ) -> None: if state_change is not ServiceStateChange.Added: return @@ -44,7 +47,11 @@ async def display_service_info(zeroconf: Zeroconf, service_type: str, name: str) addresses = info.parsed_scoped_addresses() if addresses: - discovered_tv = {"name": name_final, "label": f"{name_final} [{addresses[0]}]", "address": addresses[0]} + discovered_tv = { + "name": name_final, + "label": f"{name_final} [{addresses[0]}]", + "address": addresses[0], + } discovered_android_tvs.append(discovered_tv) else: _LOG.debug("No info for %s", name) diff --git a/intg-androidtv/driver.py b/intg-androidtv/driver.py index aa9b0a8..771eac6 100644 --- a/intg-androidtv/driver.py +++ b/intg-androidtv/driver.py @@ -10,6 +10,7 @@ import logging import os import sys +from copy import copy from datetime import UTC, datetime from typing import Any @@ -232,7 +233,11 @@ async def handle_android_tv_update(atv_id: str, update: dict[str, Any]) -> None: if _LOG.isEnabledFor(logging.DEBUG): device = config.devices.get(atv_id) - _LOG.debug("[%s] device update: %s", device.name if device else atv_id, update) + # filter media_image_url property + log_upd = copy(update) + if MediaAttr.MEDIA_IMAGE_URL in log_upd: + log_upd[MediaAttr.MEDIA_IMAGE_URL] = "***" + _LOG.debug("[%s] device update: %s", device.name if device else atv_id, log_upd) old_state = ( configured_entity.attributes[MediaAttr.STATE] @@ -407,6 +412,7 @@ async def main(): logging.getLogger("profiles").setLevel(level) logging.getLogger("setup_flow").setLevel(level) logging.getLogger("androidtvremote2").setLevel(level) + logging.getLogger("external_metadata").setLevel(level) # logging.getLogger("pychromecast").setLevel(level) profile_path = os.path.join(api.config_dir_path, "profiles") diff --git a/intg-androidtv/external_metadata.py b/intg-androidtv/external_metadata.py new file mode 100644 index 0000000..8d55425 --- /dev/null +++ b/intg-androidtv/external_metadata.py @@ -0,0 +1,182 @@ +""" +External metadata retrieval from Google Play. + +:copyright: (c) 2025 by thomasm789, www.tmason.uk +:license: MPL-2.0, see LICENSE for more details. +""" + +import base64 +import json +import logging +import os +from io import BytesIO +from pathlib import Path +from typing import Dict +from urllib.parse import urlparse + +import google_play_scraper +import requests +from PIL import Image +from PIL.Image import Resampling +from pychromecast.controllers.media import MediaImage + +_LOG = logging.getLogger(__name__) + +CACHE_ROOT = "external_cache" +ICON_SUBDIR = "icons" +ICON_SIZE = (120, 120) + + +# Paths +def _get_cache_root() -> Path: + config_home = Path(os.environ.get("UC_DATA_HOME", "./data")) + cache_root = config_home / CACHE_ROOT + cache_root.mkdir(parents=True, exist_ok=True) + return cache_root + + +def _get_metadata_dir() -> Path: + metadata_dir = _get_cache_root() + metadata_dir.mkdir(parents=True, exist_ok=True) + return metadata_dir + + +def _get_icon_dir() -> Path: + icon_dir = _get_cache_root() / ICON_SUBDIR + icon_dir.mkdir(parents=True, exist_ok=True) + return icon_dir + + +def _get_metadata_file_path() -> Path: + return _get_metadata_dir() / "app_metadata.json" + + +def _get_icon_path(package_id: str) -> Path: + return _get_icon_dir() / f"{package_id}.png" + + +# Cache Management +def _load_cache() -> Dict[str, Dict[str, str]]: + path = _get_metadata_file_path() + if path.exists(): + try: + with open(path, "r", encoding="utf-8") as f: + return json.load(f) + except Exception: + return {} + return {} + + +def _save_cache(cache: Dict[str, Dict[str, str]]) -> None: + path = _get_metadata_file_path() + with open(path, "w", encoding="utf-8") as f: + json.dump(cache, f, indent=2) + + +# Metadata Fetch +def _download_and_resize_icon(url: str, package_id: str) -> str | None: + try: + response = requests.get(url, timeout=10) + response.raise_for_status() + + img = Image.open(BytesIO(response.content)) + img = img.resize(ICON_SIZE, Resampling.LANCZOS) + + icon_path = _get_icon_path(package_id) + img.save(icon_path, format="PNG") + return str(icon_path) + except Exception as e: + _LOG.warning("Failed to fetch icon for %s: %s", package_id, e) + return None + + +def encode_icon_to_data_uri(icon_path: str) -> str: + """ + Encode an image from a local file path or remote URL. + + Returns a base64-encoded PNG data URI. + """ + if isinstance(icon_path, MediaImage): + icon_path = icon_path.url + + # Already a base64 data URI + if isinstance(icon_path, str) and icon_path.startswith("data:image"): + return icon_path + + try: + if _is_url(icon_path): + response = requests.get(icon_path, timeout=10) + response.raise_for_status() + img = Image.open(BytesIO(response.content)) + else: + with open(icon_path, "rb") as f: + img = Image.open(f) + img.load() # Ensure the image is fully loaded before the file is closed + + img = img.convert("RGBA") + + buffer = BytesIO() + img.save(buffer, format="PNG") + encoded = base64.b64encode(buffer.getvalue()).decode("utf-8") + return f"data:image/png;base64,{encoded}" + + except Exception as e: + _LOG.warning("Failed to encode icon to base64 for %s: %s", icon_path, e) + return "" + + +def _is_url(path: str) -> bool: + parsed = urlparse(path) + return parsed.scheme in ("http", "https") + + +def _fetch_google_play_metadata(package_id: str) -> Dict[str, str] | None: + try: + app = google_play_scraper.app(package_id) + + name = app["title"] + icon_url = app["icon"] + icon_path = _download_and_resize_icon(icon_url, package_id) + + return {"name": name, "icon": icon_path or ""} + + except Exception as e: + _LOG.warning("Google Play metadata fetch failed for %s: %s", package_id, e) + return None + + +def get_app_metadata(package_id: str) -> Dict[str, str]: + """ + Fetch metadata for a mobile application specified by the package ID. + + The metadata includes the application name and its icon encoded as a data URI. + If metadata is found in a locally cached source, it is fetched from the cache. + Otherwise, metadata is retrieved from external sources such as Google Play. + + :param package_id: The unique package identifier for the application. + :type package_id: str + :return: A dictionary containing the application's metadata. The dictionary + includes the 'name' of the application and the 'icon', which is the + application's icon encoded as a data URI. If no metadata is found, + it returns the package ID as the name and an empty string as the icon. + :rtype: Dict[str, str] + """ + cache = _load_cache() + if package_id in cache: + icon_path = cache[package_id].get("icon") + icon_data_uri = encode_icon_to_data_uri(icon_path) if icon_path else "" + return {"name": cache[package_id]["name"], "icon": icon_data_uri} + + # Try Google Play + metadata = _fetch_google_play_metadata(package_id) + # if not metadata: + # Additional Fallback option for the future maybe APKPure or another source + # metadata = fetch_fallback_metadata(package_id) + + if metadata: + cache[package_id] = metadata + _save_cache(cache) + icon_data_uri = encode_icon_to_data_uri(metadata["icon"]) if metadata["icon"] else "" + return {"name": metadata["name"], "icon": icon_data_uri} + + return {"name": package_id, "icon": ""} diff --git a/intg-androidtv/setup_flow.py b/intg-androidtv/setup_flow.py index d7476d5..cc5c317 100644 --- a/intg-androidtv/setup_flow.py +++ b/intg-androidtv/setup_flow.py @@ -45,6 +45,7 @@ class SetupSteps(IntEnum): _cfg_add_device: bool = False _discovered_android_tvs: list[dict[str, str]] = [] _pairing_android_tv: tv.AndroidTv | None = None +_use_external_metadata: bool = False _reconfigured_device: AtvDevice | None = None _use_chromecast: bool = False @@ -217,7 +218,12 @@ async def handle_driver_setup(msg: DriverSetupRequest) -> RequestUserInput | Set {"en": "Configuration mode", "de": "Konfigurations-Modus"}, [ { - "field": {"dropdown": {"value": dropdown_devices[0]["id"], "items": dropdown_devices}}, + "field": { + "dropdown": { + "value": dropdown_devices[0]["id"], + "items": dropdown_devices, + } + }, "id": "choice", "label": { "en": "Configured devices", @@ -226,7 +232,12 @@ async def handle_driver_setup(msg: DriverSetupRequest) -> RequestUserInput | Set }, }, { - "field": {"dropdown": {"value": dropdown_actions[0]["id"], "items": dropdown_actions}}, + "field": { + "dropdown": { + "value": dropdown_actions[0]["id"], + "items": dropdown_actions, + } + }, "id": "action", "label": { "en": "Action", @@ -243,7 +254,9 @@ async def handle_driver_setup(msg: DriverSetupRequest) -> RequestUserInput | Set return _user_input_discovery -async def handle_configuration_mode(msg: UserDataResponse) -> RequestUserInput | SetupComplete | SetupError: +async def handle_configuration_mode( + msg: UserDataResponse, +) -> RequestUserInput | SetupComplete | SetupError: """ Process user data response in a setup process. @@ -283,6 +296,9 @@ async def handle_configuration_mode(msg: UserDataResponse) -> RequestUserInput | _setup_step = SetupSteps.RECONFIGURE _reconfigured_device = selected_device use_chromecast = selected_device.use_chromecast if selected_device.use_chromecast else False + use_external_metadata = ( + selected_device.use_external_metadata if selected_device.use_external_metadata else False + ) return RequestUserInput( { @@ -298,6 +314,14 @@ async def handle_configuration_mode(msg: UserDataResponse) -> RequestUserInput | }, "field": {"checkbox": {"value": use_chromecast}}, }, + { + "id": "external_metadata", + "label": { + "en": "Enable enhanched metadata (e.g. Friendly Application Names and Icons)", + "fr": "Activer les métadonnées améliorées (par exemple, noms et icônes d'application)", + }, + "field": {"checkbox": {"value": use_external_metadata}}, + }, ], ) case "reset": @@ -345,7 +369,11 @@ async def _handle_discovery(msg: UserDataResponse) -> RequestUserInput | SetupEr existing = config.devices.get(android_tv.identifier) if _cfg_add_device and existing and not existing.auth_error: - _LOG.info("Manually specified device '%s' %s: already configured", existing.name, android_tv.identifier) + _LOG.info( + "Manually specified device '%s' %s: already configured", + existing.name, + android_tv.identifier, + ) # no better error code at the moment return SetupError(error_type=IntegrationSetupError.OTHER) dropdown_items.append({"id": address, "label": {"en": f"{android_tv.name} [{address}]"}}) @@ -356,11 +384,16 @@ async def _handle_discovery(msg: UserDataResponse) -> RequestUserInput | SetupEr # only add new devices or configured devices requiring new pairing for discovered_tv in _discovered_android_tvs: - tv_data = {"id": discovered_tv["address"], "label": {"en": discovered_tv["label"]}} + tv_data = { + "id": discovered_tv["address"], + "label": {"en": discovered_tv["label"]}, + } existing = config.devices.get_by_name_or_address(discovered_tv["name"], discovered_tv["address"]) if _cfg_add_device and existing and not existing.auth_error: _LOG.info( - "Skipping found device '%s' %s: already configured", discovered_tv["name"], discovered_tv["address"] + "Skipping found device '%s' %s: already configured", + discovered_tv["name"], + discovered_tv["address"], ) continue dropdown_items.append(tv_data) @@ -372,10 +405,18 @@ async def _handle_discovery(msg: UserDataResponse) -> RequestUserInput | SetupEr _setup_step = SetupSteps.DEVICE_CHOICE # TODO #9 externalize language texts return RequestUserInput( - title={"en": "Please choose your Android TV", "de": "Bitte Android TV auswählen"}, + title={ + "en": "Please choose your Android TV", + "de": "Bitte Android TV auswählen", + }, settings=[ { - "field": {"dropdown": {"value": dropdown_items[0]["id"], "items": dropdown_items}}, + "field": { + "dropdown": { + "value": dropdown_items[0]["id"], + "items": dropdown_items, + } + }, "id": "choice", "label": { "en": "Choose your Android TV", @@ -391,6 +432,11 @@ async def _handle_discovery(msg: UserDataResponse) -> RequestUserInput | SetupEr }, "field": {"checkbox": {"value": False}}, }, + { + "id": "external_metadata", + "label": {"en": "Enable external metadata (e.g. Friendly Application Names and Icons)"}, + "field": {"checkbox": {"value": False}}, + }, ], ) @@ -407,8 +453,10 @@ async def handle_device_choice(msg: UserDataResponse) -> RequestUserInput | Setu global _pairing_android_tv global _use_chromecast global _setup_step + global _use_external_metadata choice = msg.input_values["choice"] + _use_external_metadata = msg.input_values.get("external_metadata", "false") == "true" _use_chromecast = msg.input_values.get("chromecast", "false") == "true" name = "" @@ -419,7 +467,15 @@ async def handle_device_choice(msg: UserDataResponse) -> RequestUserInput | Setu certfile = config.devices.default_certfile() keyfile = config.devices.default_keyfile() _pairing_android_tv = tv.AndroidTv( - certfile, keyfile, AtvDevice(address=choice, name=name, id="", use_chromecast=False) + certfile, + keyfile, + AtvDevice( + address=choice, + name=name, + id="", + use_external_metadata=False, + use_chromecast=False, + ), ) _LOG.info("Chosen Android TV: %s. Start pairing process...", choice) @@ -439,7 +495,13 @@ async def handle_device_choice(msg: UserDataResponse) -> RequestUserInput | Setu "de": "Bitte gib die auf deinem Android-Fernseher angezeigte PIN ein", "fr": "Veuillez saisir le code PIN affiché sur votre Android TV", }, - [{"field": {"text": {"value": "000000"}}, "id": "pin", "label": {"en": "Android TV PIN"}}], + [ + { + "field": {"text": {"value": "000000"}}, + "id": "pin", + "label": {"en": "Android TV PIN"}, + } + ], ) return _setup_error_from_device_state(_pairing_android_tv.state) @@ -469,12 +531,14 @@ async def handle_user_data_pin(msg: UserDataResponse) -> SetupComplete | SetupEr # Connect again to retrieve device identifier (with init()) and additional device information (with connect()) if res == ucapi.StatusCodes.OK: - _LOG.info("[%s] Pairing done, retrieving device information", _pairing_android_tv.log_id) + _LOG.info( + "[%s] Pairing done, retrieving device information", + _pairing_android_tv.log_id, + ) res = ucapi.StatusCodes.SERVER_ERROR timeout = int(tv.CONNECTION_TIMEOUT) if await _pairing_android_tv.init(timeout) and await _pairing_android_tv.connect(timeout): - device_info = _pairing_android_tv.device_info - # Now rename the certificate files so that they are unique per device (with the identifier = mac address) + device_info = _pairing_android_tv.device_info or {} if config.devices.assign_default_certs_to_device(_pairing_android_tv.identifier, True): res = ucapi.StatusCodes.OK _pairing_android_tv.disconnect() @@ -485,30 +549,29 @@ async def handle_user_data_pin(msg: UserDataResponse) -> SetupComplete | SetupEr _pairing_android_tv = None return _setup_error_from_device_state(state) - if not device_info: - device_info = {} - device = AtvDevice( id=_pairing_android_tv.identifier, name=_pairing_android_tv.name, address=_pairing_android_tv.address, + use_external_metadata=_use_external_metadata, use_chromecast=_use_chromecast, manufacturer=device_info.get("manufacturer", ""), model=device_info.get("model", ""), ) + config.devices.add_or_update(device) # triggers AndroidTv instance creation config.devices.store() # ATV device connection will be triggered with subscribe_entities request - _pairing_android_tv = None await asyncio.sleep(1) - _LOG.info("[%s] Setup successfully completed for %s", device.name, device.id) return SetupComplete() -async def _handle_device_reconfigure(msg: UserDataResponse) -> SetupComplete | SetupError: +async def _handle_device_reconfigure( + msg: UserDataResponse, +) -> SetupComplete | SetupError: """ Process reconfiguration of a registered Android TV device. diff --git a/intg-androidtv/tv.py b/intg-androidtv/tv.py index 8f34664..2493d74 100644 --- a/intg-androidtv/tv.py +++ b/intg-androidtv/tv.py @@ -11,6 +11,7 @@ import socket import time from asyncio import AbstractEventLoop, timeout +from copy import copy from enum import IntEnum from functools import wraps from typing import Any, Awaitable, Callable, Concatenate, Coroutine, ParamSpec, TypeVar @@ -26,6 +27,7 @@ ConnectionClosed, InvalidAuth, ) +from external_metadata import encode_icon_to_data_uri, get_app_metadata from profiles import KeyPress, Profile from pychromecast import CastStatus, CastStatusListener, Chromecast, RequestTimeout from pychromecast.controllers.media import ( @@ -222,6 +224,8 @@ def __init__( self._last_update_position_time: float = 0 self._media_type = METADATA_TYPE_MOVIE self._media_image_url: str | None = None + self._app_image_url: str = "" + self._use_app_url = not device_config.use_chromecast self._player_state = media_player.States.ON self._muted = False @@ -479,7 +483,11 @@ async def connect(self, max_timeout: int | None = None) -> bool: def _handle_invalid_auth() -> None: self._state = DeviceState.AUTH_ERROR - _LOG.error("[%s] Invalid authentication for %s while reconnecting", self.log_id, self._identifier) + _LOG.error( + "[%s] Invalid authentication for %s while reconnecting", + self.log_id, + self._identifier, + ) self.events.emit(Events.AUTH_ERROR, self._identifier) self._atv.keep_reconnecting(_handle_invalid_auth) @@ -518,7 +526,10 @@ def _chromecast_connect(self): self._chromecast.wait(timeout=CONNECTION_TIMEOUT) _LOG.info("[%s] Chromecast connected", self.log_id) except (RequestTimeout, RuntimeError): - _LOG.info("[%s] Device is not active or Chromecast is not supported on this devices", self.log_id) + _LOG.info( + "[%s] Device is not active or Chromecast is not supported on this devices", + self.log_id, + ) async def _handle_connection_failure(self, connect_duration: float, ex): self._connection_attempts += 1 @@ -537,17 +548,28 @@ async def _handle_connection_failure(self, connect_duration: float, ex): # try resolving IP address from device name if we keep failing to connect, maybe the IP address changed if self._connection_attempts % 10 == 0: - _LOG.debug("[%s] Start resolving IP address for %s...", self.log_id, self._identifier) + _LOG.debug( + "[%s] Start resolving IP address for %s...", + self.log_id, + self._identifier, + ) try: discovered = await discover.android_tvs() for item in discovered: if item["name"] == self._name: if self._atv.host != item["address"]: _LOG.info( - "[%s] IP address of %s changed: %s", self.log_id, self._identifier, item["address"] + "[%s] IP address of %s changed: %s", + self.log_id, + self._identifier, + item["address"], ) self._atv.host = item["address"] - self.events.emit(Events.IP_ADDRESS_CHANGED, self._identifier, self._atv.host) + self.events.emit( + Events.IP_ADDRESS_CHANGED, + self._identifier, + self._atv.host, + ) break except Exception as e: # extra safety, otherwise reconnection task is dead @@ -568,47 +590,87 @@ def disconnect(self) -> None: self.events.emit(Events.DISCONNECTED, self._identifier) # Callbacks - def _is_on_updated(self, is_on: bool) -> None: - """Notify that the Android TV power state is updated.""" - _LOG.info("[%s] is on: %s", self.log_id, is_on) + def _apply_current_app_metadata(self, current_app: str) -> dict: update = {} - if is_on: - update[MediaAttr.STATE] = media_player.States.ON.value - # Chromecast service is not accessible when the device is in standby - self._chromecast_connect() - else: - update[MediaAttr.STATE] = media_player.States.OFF.value - self.events.emit(Events.UPDATE, self._identifier, update) - def _current_app_updated(self, current_app: str) -> None: - """Notify that the current app on Android TV is updated.""" - _LOG.debug("[%s] current_app: %s", self.log_id, current_app) - update = {MediaAttr.SOURCE: current_app} - current_title = self.media_title + # Track state of data sources + offline_name = None + offline_match = None + external_name = None + external_icon = None + # Try offline ID mapping first if current_app in apps.IdMappings: - update[MediaAttr.SOURCE] = apps.IdMappings[current_app] - self._media_app = current_app - else: - for query, app in apps.NameMatching.items(): + offline_name = apps.IdMappings[current_app] + self._media_app = offline_name + + # Try fuzzy offline name matching if ID mapping failed + if not offline_name: + for query, name in apps.NameMatching.items(): if query in current_app: - update[MediaAttr.SOURCE] = app - self._media_app = app + offline_match = name + self._media_app = name break - # TODO verify "idle" apps, probably best to make them configurable + # Try external metadata + metadata = get_app_metadata(current_app) if self._device_config.use_external_metadata else None + if metadata: + external_name = metadata.get("name") + external_icon = metadata.get("icon") + if external_name: + self._media_app = external_name + + # Determine final name/title to use + name_to_use = offline_name or offline_match or external_name or current_app + update[MediaAttr.SOURCE] = name_to_use + update[MediaAttr.MEDIA_TITLE] = name_to_use + + # Determine which icon to use + icon_to_use = None + if self._device_config.use_external_metadata and self._use_app_url: + if external_icon: + icon_to_use = encode_icon_to_data_uri(external_icon) + else: + icon_to_use = "" # Explicitly clear if expected but missing + elif self._media_image_url: + icon_to_use = encode_icon_to_data_uri(self._media_image_url) + + if icon_to_use is not None: + update[MediaAttr.MEDIA_IMAGE_URL] = icon_to_use + + # Special case handling for Android TV system apps if current_app in ("com.google.android.tvlauncher", "com.android.systemui"): update[MediaAttr.STATE] = media_player.States.ON.value - if self._media_title is None: - update[MediaAttr.MEDIA_TITLE] = "" + update[MediaAttr.MEDIA_TITLE] = "Android TV Home" + update[MediaAttr.SOURCE] = "Android TV Home" + update[MediaAttr.MEDIA_IMAGE_URL] = encode_icon_to_data_uri("data/external_cache/icons/androidtv.png") + elif current_app == "com.google.android.backdrop": + update[MediaAttr.STATE] = media_player.States.STANDBY.value + update[MediaAttr.MEDIA_TITLE] = "" + update[MediaAttr.MEDIA_IMAGE_URL] = encode_icon_to_data_uri("data/external_cache/icons/androidtv.png") else: update[MediaAttr.STATE] = media_player.States.PLAYING.value - if self._media_title is None: - update[MediaAttr.MEDIA_TITLE] = update[MediaAttr.SOURCE] - if current_title != self.media_title: - update[MediaAttr.MEDIA_TITLE] = self.media_title + return update + def _is_on_updated(self, is_on: bool) -> None: + """Notify that the Android TV power state is updated.""" + _LOG.info("[%s] is on: %s", self.log_id, is_on) + current_app = self._atv.current_app or "" + if is_on: + self._chromecast_connect() + update = self._apply_current_app_metadata(current_app) + update[MediaAttr.STATE] = media_player.States.ON.value + + else: + update = self._apply_current_app_metadata(current_app) + update[MediaAttr.STATE] = media_player.States.OFF.value + self.events.emit(Events.UPDATE, self._identifier, update) + + def _current_app_updated(self, current_app: str) -> None: + """Notify that the current app on Android TV is updated.""" + _LOG.debug("[%s] current_app: %s", self.log_id, current_app) + update = self._apply_current_app_metadata(current_app) self.events.emit(Events.UPDATE, self._identifier, update) def _volume_info_updated(self, volume_info: dict[str, str | bool]) -> None: @@ -643,13 +705,21 @@ async def send_media_player_command(self, cmd_id: str) -> ucapi.StatusCodes: BAD_REQUEST if the ``cmd_id`` is unknown or not supported """ if not self._profile: - _LOG.error("[%s] Cannot send command %s: no device profile set", self.log_id, cmd_id) + _LOG.error( + "[%s] Cannot send command %s: no device profile set", + self.log_id, + cmd_id, + ) return ucapi.StatusCodes.SERVER_ERROR if command := self._profile.command(cmd_id): return await self._send_command(command.keycode, command.action) - _LOG.error("[%s] Cannot send command, unknown or unsupported command: %s", self.log_id, cmd_id) + _LOG.error( + "[%s] Cannot send command, unknown or unsupported command: %s", + self.log_id, + cmd_id, + ) return ucapi.StatusCodes.BAD_REQUEST async def turn_on(self) -> ucapi.StatusCodes: @@ -795,15 +865,22 @@ def new_media_status(self, status: MediaStatus) -> None: if status.images and len(status.images) > 0 and status.images[0].url != self._media_image_url: self._media_image_url = status.images[0].url - update[MediaAttr.MEDIA_IMAGE_URL] = self._media_image_url - elif not self._media_image_url: - self._media_image_url = None - update[MediaAttr.MEDIA_IMAGE_URL] = "" + update[MediaAttr.MEDIA_IMAGE_URL] = encode_icon_to_data_uri(self._media_image_url) + self._use_app_url = False else: - update[MediaAttr.MEDIA_IMAGE_URL] = self._media_image_url + self._media_image_url = None + if self._device_config.use_external_metadata: + self._use_app_url = True + if self._app_image_url: + update[MediaAttr.MEDIA_IMAGE_URL] = encode_icon_to_data_uri(self._app_image_url) if update: - _LOG.debug("[%s] Update remote with Chromecast info : %s", self.log_id, update) + # filter media_image_url property + if _LOG.isEnabledFor(logging.DEBUG): + log_upd = copy(update) + if MediaAttr.MEDIA_IMAGE_URL in log_upd: + log_upd[MediaAttr.MEDIA_IMAGE_URL] = "***" + _LOG.debug("[%s] Update remote with Chromecast info : %s", self.log_id, log_upd) self.events.emit(Events.UPDATE, self._identifier, update) def load_media_failed(self, queue_item_id: int, error_code: int) -> None: @@ -818,6 +895,7 @@ def new_cast_status(self, status: CastStatus) -> None: if current_title != self.media_title: update = {MediaAttr.MEDIA_TITLE: self.media_title} + _LOG.debug("[%s] Update remote with Chromecast info : %s", self.log_id, update) self.events.emit(Events.UPDATE, self._identifier, update) diff --git a/requirements.txt b/requirements.txt index 344e19f..734f367 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,7 @@ androidtvremote2==0.2.1 ucapi==0.2.0 pyee~=13.0.0 +google_play_scraper==1.2.7 +pillow>=11.2.1 +requests>=2.32 pychromecast~=14.0.7 \ No newline at end of file