From 9ba1b32e053ecfa2772136983ca0f40172671883 Mon Sep 17 00:00:00 2001 From: Thomas Mason Date: Wed, 23 Apr 2025 22:25:26 +0100 Subject: [PATCH 1/8] updates to external_metadata.py for async and adding httpx dependancy --- intg-androidtv/external_metadata.py | 67 +++++++++-------------------- requirements.txt | 3 +- 2 files changed, 22 insertions(+), 48 deletions(-) diff --git a/intg-androidtv/external_metadata.py b/intg-androidtv/external_metadata.py index 8d55425..1166b9c 100644 --- a/intg-androidtv/external_metadata.py +++ b/intg-androidtv/external_metadata.py @@ -5,6 +5,7 @@ :license: MPL-2.0, see LICENSE for more details. """ +import asyncio import base64 import json import logging @@ -15,7 +16,7 @@ from urllib.parse import urlparse import google_play_scraper -import requests +import httpx from PIL import Image from PIL.Image import Resampling from pychromecast.controllers.media import MediaImage @@ -74,32 +75,26 @@ def _save_cache(cache: Dict[str, Dict[str, str]]) -> None: # Metadata Fetch -def _download_and_resize_icon(url: str, package_id: str) -> str | None: +async def _download_and_resize_icon_async(url: str, package_id: str) -> str | None: try: - response = requests.get(url, timeout=10) - response.raise_for_status() - - img = Image.open(BytesIO(response.content)) - img = img.resize(ICON_SIZE, Resampling.LANCZOS) + async with httpx.AsyncClient(timeout=10) as client: + response = await client.get(url) + response.raise_for_status() + img = Image.open(BytesIO(response.content)) + img = img.resize(ICON_SIZE, Resampling.LANCZOS) - icon_path = _get_icon_path(package_id) - img.save(icon_path, format="PNG") - return str(icon_path) + icon_path = _get_icon_path(package_id) + img.save(icon_path, format="PNG") + return str(icon_path) except Exception as e: _LOG.warning("Failed to fetch icon for %s: %s", package_id, e) return None def encode_icon_to_data_uri(icon_path: str) -> str: - """ - Encode an image from a local file path or remote URL. - - Returns a base64-encoded PNG data URI. - """ if isinstance(icon_path, MediaImage): icon_path = icon_path.url - # Already a base64 data URI if isinstance(icon_path, str) and icon_path.startswith("data:image"): return icon_path @@ -111,10 +106,9 @@ def encode_icon_to_data_uri(icon_path: str) -> str: else: with open(icon_path, "rb") as f: img = Image.open(f) - img.load() # Ensure the image is fully loaded before the file is closed + img.load() img = img.convert("RGBA") - buffer = BytesIO() img.save(buffer, format="PNG") encoded = base64.b64encode(buffer.getvalue()).decode("utf-8") @@ -130,53 +124,32 @@ def _is_url(path: str) -> bool: return parsed.scheme in ("http", "https") -def _fetch_google_play_metadata(package_id: str) -> Dict[str, str] | None: +async def _fetch_google_play_metadata_async(package_id: str) -> Dict[str, str] | None: try: - app = google_play_scraper.app(package_id) - + app = await asyncio.to_thread(google_play_scraper.app, package_id) name = app["title"] icon_url = app["icon"] - icon_path = _download_and_resize_icon(icon_url, package_id) - + icon_path = await _download_and_resize_icon_async(icon_url, package_id) return {"name": name, "icon": icon_path or ""} - except Exception as e: _LOG.warning("Google Play metadata fetch failed for %s: %s", package_id, e) return None -def get_app_metadata(package_id: str) -> Dict[str, str]: - """ - Fetch metadata for a mobile application specified by the package ID. - - The metadata includes the application name and its icon encoded as a data URI. - If metadata is found in a locally cached source, it is fetched from the cache. - Otherwise, metadata is retrieved from external sources such as Google Play. - - :param package_id: The unique package identifier for the application. - :type package_id: str - :return: A dictionary containing the application's metadata. The dictionary - includes the 'name' of the application and the 'icon', which is the - application's icon encoded as a data URI. If no metadata is found, - it returns the package ID as the name and an empty string as the icon. - :rtype: Dict[str, str] - """ +async def get_app_metadata_async(package_id: str) -> Dict[str, str]: cache = _load_cache() if package_id in cache: icon_path = cache[package_id].get("icon") icon_data_uri = encode_icon_to_data_uri(icon_path) if icon_path else "" return {"name": cache[package_id]["name"], "icon": icon_data_uri} - # Try Google Play - metadata = _fetch_google_play_metadata(package_id) - # if not metadata: - # Additional Fallback option for the future maybe APKPure or another source - # metadata = fetch_fallback_metadata(package_id) - + metadata = await _fetch_google_play_metadata_async(package_id) if metadata: cache[package_id] = metadata _save_cache(cache) - icon_data_uri = encode_icon_to_data_uri(metadata["icon"]) if metadata["icon"] else "" + icon_data_uri = ( + encode_icon_to_data_uri(metadata["icon"]) if metadata["icon"] else "" + ) return {"name": metadata["name"], "icon": icon_data_uri} return {"name": package_id, "icon": ""} diff --git a/requirements.txt b/requirements.txt index 734f367..e0afc6d 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,4 +4,5 @@ pyee~=13.0.0 google_play_scraper==1.2.7 pillow>=11.2.1 requests>=2.32 -pychromecast~=14.0.7 \ No newline at end of file +pychromecast~=14.0.7 +httpx \ No newline at end of file From a558e1072c2ec5ebb363b8cf07c460a066a4b79f Mon Sep 17 00:00:00 2001 From: Thomas Mason Date: Wed, 23 Apr 2025 23:20:50 +0100 Subject: [PATCH 2/8] fixes bugs in 64 ands 63 --- intg-androidtv/driver.py | 44 ++++-- intg-androidtv/external_metadata.py | 124 +++++++++++----- intg-androidtv/tv.py | 214 ++++++++++++++++++++-------- 3 files changed, 276 insertions(+), 106 deletions(-) diff --git a/intg-androidtv/driver.py b/intg-androidtv/driver.py index 771eac6..53a3a8f 100644 --- a/intg-androidtv/driver.py +++ b/intg-androidtv/driver.py @@ -39,7 +39,9 @@ async def on_connect(): """When the UCR2 connects, all configured Android TV devices are getting connected.""" _LOG.debug("Client connect command: connecting device(s)") - await api.set_device_state(ucapi.DeviceStates.CONNECTED) # just to make sure the device state is set + await api.set_device_state( + ucapi.DeviceStates.CONNECTED + ) # just to make sure the device state is set for atv in _configured_android_tvs.values(): # start background task _LOOP.create_task(atv.connect()) @@ -95,7 +97,9 @@ async def on_subscribe_entities(entity_ids) -> None: state = media_player.States.UNAVAILABLE else: state = media_player.States.ON if atv.is_on else media_player.States.OFF - api.configured_entities.update_attributes(entity_id, {media_player.Attributes.STATE: state}) + api.configured_entities.update_attributes( + entity_id, {media_player.Attributes.STATE: state} + ) _LOOP.create_task(atv.connect()) continue @@ -103,7 +107,9 @@ async def on_subscribe_entities(entity_ids) -> None: if device: _add_configured_android_tv(device) else: - _LOG.error("Failed to subscribe entity %s: no Android TV instance found", entity_id) + _LOG.error( + "Failed to subscribe entity %s: no Android TV instance found", entity_id + ) @api.listens_to(ucapi.Events.UNSUBSCRIBE_ENTITIES) @@ -144,7 +150,9 @@ async def media_player_cmd_handler( android_tv = _configured_android_tvs[atv_id] - _LOG.info("[%s] command: %s %s", android_tv.log_id, cmd_id, params if params else "") + _LOG.info( + "[%s] command: %s %s", android_tv.log_id, cmd_id, params if params else "" + ) if cmd_id == media_player.Commands.ON: return await android_tv.turn_on() @@ -178,8 +186,12 @@ async def handle_connected(identifier: str): config.devices.update(device) # TODO is this the correct state? - api.configured_entities.update_attributes(identifier, {media_player.Attributes.STATE: media_player.States.STANDBY}) - await api.set_device_state(ucapi.DeviceStates.CONNECTED) # just to make sure the device state is set + api.configured_entities.update_attributes( + identifier, {media_player.Attributes.STATE: media_player.States.STANDBY} + ) + await api.set_device_state( + ucapi.DeviceStates.CONNECTED + ) # just to make sure the device state is set async def handle_disconnected(identifier: str): @@ -237,7 +249,9 @@ async def handle_android_tv_update(atv_id: str, update: dict[str, Any]) -> None: log_upd = copy(update) if MediaAttr.MEDIA_IMAGE_URL in log_upd: log_upd[MediaAttr.MEDIA_IMAGE_URL] = "***" - _LOG.debug("[%s] device update: %s", device.name if device else atv_id, log_upd) + _LOG.debug( + "[%s] device update: %s", device.name if device else atv_id, log_upd + ) old_state = ( configured_entity.attributes[MediaAttr.STATE] @@ -290,7 +304,9 @@ async def handle_android_tv_update(atv_id: str, update: dict[str, Any]) -> None: def _add_configured_android_tv(device: config.AtvDevice, connect: bool = True) -> None: - profile = device_profile.match(device.manufacturer, device.model, device.use_chromecast) + profile = device_profile.match( + device.manufacturer, device.model, device.use_chromecast + ) # the device should not yet be configured, but better be safe if device.id in _configured_android_tvs: @@ -308,7 +324,9 @@ def _add_configured_android_tv(device: config.AtvDevice, connect: bool = True) - android_tv.events.on(tv.Events.DISCONNECTED, handle_disconnected) android_tv.events.on(tv.Events.AUTH_ERROR, handle_authentication_error) android_tv.events.on(tv.Events.UPDATE, handle_android_tv_update) - android_tv.events.on(tv.Events.IP_ADDRESS_CHANGED, handle_android_tv_address_change) + android_tv.events.on( + tv.Events.IP_ADDRESS_CHANGED, handle_android_tv_address_change + ) _configured_android_tvs[device.id] = android_tv _LOG.info( @@ -378,7 +396,9 @@ def on_device_added(device: config.AtvDevice) -> None: def on_device_removed(device: config.AtvDevice | None) -> None: """Handle a removed device in the configuration.""" if device is None: - _LOG.debug("Configuration cleared, disconnecting & removing all configured Android TV instances") + _LOG.debug( + "Configuration cleared, disconnecting & removing all configured Android TV instances" + ) for atv in _configured_android_tvs.values(): atv.disconnect() atv.events.remove_all_listeners() @@ -419,7 +439,9 @@ async def main(): device_profile.load(profile_path) # load paired devices - config.devices = config.Devices(api.config_dir_path, on_device_added, on_device_removed) + config.devices = config.Devices( + api.config_dir_path, on_device_added, on_device_removed + ) # best effort migration (if required): network might not be available during startup if config.devices.migration_required(): await config.devices.migrate() diff --git a/intg-androidtv/external_metadata.py b/intg-androidtv/external_metadata.py index 1166b9c..1d168a3 100644 --- a/intg-androidtv/external_metadata.py +++ b/intg-androidtv/external_metadata.py @@ -33,27 +33,34 @@ def _get_cache_root() -> Path: config_home = Path(os.environ.get("UC_DATA_HOME", "./data")) cache_root = config_home / CACHE_ROOT cache_root.mkdir(parents=True, exist_ok=True) + _LOG.debug("Cache root path: %s", cache_root) return cache_root def _get_metadata_dir() -> Path: metadata_dir = _get_cache_root() metadata_dir.mkdir(parents=True, exist_ok=True) + _LOG.debug("Metadata directory path: %s", metadata_dir) return metadata_dir def _get_icon_dir() -> Path: icon_dir = _get_cache_root() / ICON_SUBDIR icon_dir.mkdir(parents=True, exist_ok=True) + _LOG.debug("Icon directory path: %s", icon_dir) return icon_dir def _get_metadata_file_path() -> Path: - return _get_metadata_dir() / "app_metadata.json" + path = _get_metadata_dir() / "app_metadata.json" + _LOG.debug("Metadata file path: %s", path) + return path def _get_icon_path(package_id: str) -> Path: - return _get_icon_dir() / f"{package_id}.png" + path = _get_icon_dir() / f"{package_id}.png" + _LOG.debug("Icon file path for %s: %s", package_id, path) + return path # Cache Management @@ -62,9 +69,13 @@ def _load_cache() -> Dict[str, Dict[str, str]]: if path.exists(): try: with open(path, "r", encoding="utf-8") as f: - return json.load(f) - except Exception: + cache = json.load(f) + _LOG.debug("Loaded metadata cache with %d entries", len(cache)) + return cache + except Exception as e: + _LOG.warning("Failed to load metadata cache: %s", e) return {} + _LOG.debug("Metadata cache file does not exist") return {} @@ -72,50 +83,79 @@ def _save_cache(cache: Dict[str, Dict[str, str]]) -> None: path = _get_metadata_file_path() with open(path, "w", encoding="utf-8") as f: json.dump(cache, f, indent=2) + _LOG.debug("Saved metadata cache with %d entries", len(cache)) # Metadata Fetch -async def _download_and_resize_icon_async(url: str, package_id: str) -> str | None: +async def _download_and_resize_icon(url: str, package_id: str) -> str | None: + _LOG.debug("Downloading and resizing icon for %s from %s", package_id, url) try: - async with httpx.AsyncClient(timeout=10) as client: - response = await client.get(url) + async with httpx.AsyncClient() as client: + response = await client.get(url, timeout=10) response.raise_for_status() - img = Image.open(BytesIO(response.content)) - img = img.resize(ICON_SIZE, Resampling.LANCZOS) + img_bytes = BytesIO(response.content) + + def resize_image() -> str: + img = Image.open(img_bytes) + img = img.resize(ICON_SIZE, Resampling.LANCZOS) icon_path = _get_icon_path(package_id) img.save(icon_path, format="PNG") - return str(icon_path) + _LOG.debug("Saved resized icon to %s", icon_path) + return icon_path.name + + filename = await asyncio.to_thread(resize_image) + return filename + except Exception as e: _LOG.warning("Failed to fetch icon for %s: %s", package_id, e) return None -def encode_icon_to_data_uri(icon_path: str) -> str: - if isinstance(icon_path, MediaImage): - icon_path = icon_path.url +async def encode_icon_to_data_uri(icon_name: str) -> str: + _LOG.debug("Encoding icon to data URI: %s", icon_name) + if isinstance(icon_name, MediaImage): + icon_name = icon_name.url - if isinstance(icon_path, str) and icon_path.startswith("data:image"): - return icon_path + if isinstance(icon_name, str) and icon_name.startswith("data:image"): + _LOG.debug("Icon is already a data URI") + return icon_name try: - if _is_url(icon_path): - response = requests.get(icon_path, timeout=10) - response.raise_for_status() - img = Image.open(BytesIO(response.content)) + if _is_url(icon_name): + async with httpx.AsyncClient() as client: + response = await client.get(icon_name, timeout=10) + response.raise_for_status() + img_bytes = BytesIO(response.content) + + def encode_image() -> str: + img = Image.open(img_bytes) + img = img.convert("RGBA") + buffer = BytesIO() + img.save(buffer, format="PNG") + encoded = base64.b64encode(buffer.getvalue()).decode("utf-8") + return f"data:image/png;base64,{encoded}" + + return await asyncio.to_thread(encode_image) else: - with open(icon_path, "rb") as f: - img = Image.open(f) - img.load() - img = img.convert("RGBA") - buffer = BytesIO() - img.save(buffer, format="PNG") - encoded = base64.b64encode(buffer.getvalue()).decode("utf-8") - return f"data:image/png;base64,{encoded}" + def load_and_encode() -> str: + icon_path = _get_icon_dir() / icon_name + if not icon_path.exists(): + raise FileNotFoundError(f"Icon not found: {icon_path}") + with open(icon_path, "rb") as f: + img = Image.open(f) + img.load() + img = img.convert("RGBA") + buffer = BytesIO() + img.save(buffer, format="PNG") + encoded = base64.b64encode(buffer.getvalue()).decode("utf-8") + return f"data:image/png;base64,{encoded}" + + return await asyncio.to_thread(load_and_encode) except Exception as e: - _LOG.warning("Failed to encode icon to base64 for %s: %s", icon_path, e) + _LOG.warning("Failed to encode icon to base64 for %s: %s", icon_name, e) return "" @@ -124,32 +164,44 @@ def _is_url(path: str) -> bool: return parsed.scheme in ("http", "https") -async def _fetch_google_play_metadata_async(package_id: str) -> Dict[str, str] | None: +async def _fetch_google_play_metadata(package_id: str) -> Dict[str, str] | None: + _LOG.debug("Fetching metadata for %s from Google Play", package_id) try: app = await asyncio.to_thread(google_play_scraper.app, package_id) + name = app["title"] icon_url = app["icon"] - icon_path = await _download_and_resize_icon_async(icon_url, package_id) - return {"name": name, "icon": icon_path or ""} + icon_name = await _download_and_resize_icon(icon_url, package_id) + + _LOG.debug( + "Fetched metadata for %s: name='%s', icon='%s'", package_id, name, icon_name + ) + return {"name": name, "icon": icon_name or ""} + except Exception as e: _LOG.warning("Google Play metadata fetch failed for %s: %s", package_id, e) return None -async def get_app_metadata_async(package_id: str) -> Dict[str, str]: +async def get_app_metadata(package_id: str) -> Dict[str, str]: + _LOG.debug("Getting app metadata for %s", package_id) cache = _load_cache() if package_id in cache: - icon_path = cache[package_id].get("icon") - icon_data_uri = encode_icon_to_data_uri(icon_path) if icon_path else "" + _LOG.debug("Cache hit for %s", package_id) + icon_name = cache[package_id].get("icon") + icon_data_uri = await encode_icon_to_data_uri(icon_name) if icon_name else "" return {"name": cache[package_id]["name"], "icon": icon_data_uri} - metadata = await _fetch_google_play_metadata_async(package_id) + _LOG.debug("Cache miss for %s", package_id) + metadata = await _fetch_google_play_metadata(package_id) + if metadata: cache[package_id] = metadata _save_cache(cache) icon_data_uri = ( - encode_icon_to_data_uri(metadata["icon"]) if metadata["icon"] else "" + await encode_icon_to_data_uri(metadata["icon"]) if metadata["icon"] else "" ) return {"name": metadata["name"], "icon": icon_data_uri} + _LOG.debug("Falling back to default metadata for %s", package_id) return {"name": package_id, "icon": ""} diff --git a/intg-androidtv/tv.py b/intg-androidtv/tv.py index 2493d74..26884dd 100644 --- a/intg-androidtv/tv.py +++ b/intg-androidtv/tv.py @@ -55,9 +55,7 @@ _LOG = logging.getLogger(__name__) CONNECTION_TIMEOUT: float = 10.0 -"""Android TV device connection timeout in seconds.""" BACKOFF_MAX: int = 30 -"""Maximum backoff duration in seconds.""" MIN_RECONNECT_DELAY: float = 0.5 BACKOFF_FACTOR: float = 1.5 @@ -118,7 +116,9 @@ class DeviceState(IntEnum): # https://github.com/home-assistant/core/blob/fd1f0b0efeb5231d3ee23d1cb2a10cdeff7c23f1/homeassistant/components/denonavr/media_player.py def async_handle_atvlib_errors( func: Callable[Concatenate[_AndroidTvT, _P], Awaitable[ucapi.StatusCodes | None]], -) -> Callable[Concatenate[_AndroidTvT, _P], Coroutine[Any, Any, ucapi.StatusCodes | None]]: +) -> Callable[ + Concatenate[_AndroidTvT, _P], Coroutine[Any, Any, ucapi.StatusCodes | None] +]: """Log errors occurred when calling an Android TV device. Decorates methods of AndroidTv class. @@ -127,21 +127,32 @@ def async_handle_atvlib_errors( """ @wraps(func) - async def wrapper(self: _AndroidTvT, *args: _P.args, **kwargs: _P.kwargs) -> ucapi.StatusCodes: + async def wrapper( + self: _AndroidTvT, *args: _P.args, **kwargs: _P.kwargs + ) -> ucapi.StatusCodes: try: # use the same exceptions as the func is throwing (e.g. AndroidTVRemote.send_key_command) state = self.state if state != DeviceState.CONNECTED: - if state in (DeviceState.DISCONNECTED, DeviceState.CONNECTING) or self.is_on is None: + if ( + state in (DeviceState.DISCONNECTED, DeviceState.CONNECTING) + or self.is_on is None + ): raise ConnectionClosed("Disconnected from device") if state in (DeviceState.AUTH_ERROR, DeviceState.PAIRING_ERROR): - raise InvalidAuth("Invalid authentication, device requires to be paired again") + raise InvalidAuth( + "Invalid authentication, device requires to be paired again" + ) raise CannotConnect(f"Device connection not active (state={state})") # workaround for "swallowed commands" since _atv.send_key_command doesn't provide a result # pylint: disable=W0212 if ( - not (self._atv and self._atv._remote_message_protocol and self._atv._remote_message_protocol.transport) + not ( + self._atv + and self._atv._remote_message_protocol + and self._atv._remote_message_protocol.transport + ) or self._atv._remote_message_protocol.transport.is_closing() ): _LOG.warning( @@ -162,7 +173,9 @@ async def wrapper(self: _AndroidTvT, *args: _P.args, **kwargs: _P.kwargs) -> uca _LOG.error("[%s] Cannot send command: %s", self.log_id, ex) return ucapi.StatusCodes.CONFLICT except ValueError as ex: - _LOG.error("[%s] Cannot send command, invalid key_code: %s", self.log_id, ex) + _LOG.error( + "[%s] Cannot send command, invalid key_code: %s", self.log_id, ex + ) return ucapi.StatusCodes.BAD_REQUEST return wrapper @@ -243,7 +256,9 @@ async def init(self, max_timeout: int | None = None) -> bool: :return: True if connected or connecting, False if timeout occurred. """ if self._state in (DeviceState.INITIALIZING, DeviceState.CONNECTING): - _LOG.debug("[%s] Skipping init task: connection task already running", self.log_id) + _LOG.debug( + "[%s] Skipping init task: connection task already running", self.log_id + ) return True self._state = DeviceState.INITIALIZING @@ -378,7 +393,9 @@ async def start_pairing(self) -> ucapi.StatusCodes: return ucapi.StatusCodes.SERVICE_UNAVAILABLE except InvalidAuth as ex: self._state = DeviceState.AUTH_ERROR - _LOG.error("[%s] Authentication error in start pairing: %s", self.log_id, ex) + _LOG.error( + "[%s] Authentication error in start pairing: %s", self.log_id, ex + ) return ucapi.StatusCodes.UNAUTHORIZED async def finish_pairing(self, pin: str) -> ucapi.StatusCodes: @@ -449,7 +466,9 @@ async def connect(self, max_timeout: int | None = None) -> bool: self._reconnect_delay = MIN_RECONNECT_DELAY except InvalidAuth: self._state = DeviceState.AUTH_ERROR - _LOG.error("[%s] Invalid authentication for %s", self.log_id, self._identifier) + _LOG.error( + "[%s] Invalid authentication for %s", self.log_id, self._identifier + ) self.events.emit(Events.AUTH_ERROR, self._identifier) break except (CannotConnect, ConnectionClosed, asyncio.TimeoutError) as ex: @@ -520,7 +539,9 @@ def _chromecast_connect(self): try: self._chromecast.register_status_listener(self) - self._chromecast.socket_client.media_controller.register_status_listener(self) + self._chromecast.socket_client.media_controller.register_status_listener( + self + ) self._chromecast.register_connection_listener(self) _LOG.info("[%s] Chromecast connecting", self.log_id) self._chromecast.wait(timeout=CONNECTION_TIMEOUT) @@ -590,21 +611,18 @@ def disconnect(self) -> None: self.events.emit(Events.DISCONNECTED, self._identifier) # Callbacks - def _apply_current_app_metadata(self, current_app: str) -> dict: + async def _apply_current_app_metadata(self, current_app: str) -> dict: update = {} - # Track state of data sources offline_name = None offline_match = None external_name = None external_icon = None - # Try offline ID mapping first if current_app in apps.IdMappings: offline_name = apps.IdMappings[current_app] self._media_app = offline_name - # Try fuzzy offline name matching if ID mapping failed if not offline_name: for query, name in apps.NameMatching.items(): if query in current_app: @@ -612,65 +630,78 @@ def _apply_current_app_metadata(self, current_app: str) -> dict: self._media_app = name break - # Try external metadata - metadata = get_app_metadata(current_app) if self._device_config.use_external_metadata else None + metadata = ( + await get_app_metadata(current_app) + if self._device_config.use_external_metadata + else None + ) if metadata: external_name = metadata.get("name") external_icon = metadata.get("icon") if external_name: self._media_app = external_name - # Determine final name/title to use name_to_use = offline_name or offline_match or external_name or current_app update[MediaAttr.SOURCE] = name_to_use update[MediaAttr.MEDIA_TITLE] = name_to_use - # Determine which icon to use icon_to_use = None if self._device_config.use_external_metadata and self._use_app_url: if external_icon: - icon_to_use = encode_icon_to_data_uri(external_icon) + icon_to_use = external_icon else: - icon_to_use = "" # Explicitly clear if expected but missing + icon_to_use = "" elif self._media_image_url: - icon_to_use = encode_icon_to_data_uri(self._media_image_url) + icon_to_use = await encode_icon_to_data_uri(self._media_image_url) if icon_to_use is not None: update[MediaAttr.MEDIA_IMAGE_URL] = icon_to_use - # Special case handling for Android TV system apps if current_app in ("com.google.android.tvlauncher", "com.android.systemui"): update[MediaAttr.STATE] = media_player.States.ON.value update[MediaAttr.MEDIA_TITLE] = "Android TV Home" update[MediaAttr.SOURCE] = "Android TV Home" - update[MediaAttr.MEDIA_IMAGE_URL] = encode_icon_to_data_uri("data/external_cache/icons/androidtv.png") + update[MediaAttr.MEDIA_IMAGE_URL] = await encode_icon_to_data_uri( + "androidtv.png" + ) elif current_app == "com.google.android.backdrop": update[MediaAttr.STATE] = media_player.States.STANDBY.value update[MediaAttr.MEDIA_TITLE] = "" - update[MediaAttr.MEDIA_IMAGE_URL] = encode_icon_to_data_uri("data/external_cache/icons/androidtv.png") + update[MediaAttr.MEDIA_IMAGE_URL] = await encode_icon_to_data_uri( + "androidtv.png" + ) else: update[MediaAttr.STATE] = media_player.States.PLAYING.value + update[MediaAttr.MEDIA_IMAGE_URL] = await encode_icon_to_data_uri( + "androidtv.png" + ) + + _LOG.debug("%s", update) return update def _is_on_updated(self, is_on: bool) -> None: - """Notify that the Android TV power state is updated.""" + asyncio.create_task(self._handle_is_on_updated(is_on)) + + async def _handle_is_on_updated(self, is_on: bool): _LOG.info("[%s] is on: %s", self.log_id, is_on) current_app = self._atv.current_app or "" if is_on: self._chromecast_connect() - update = self._apply_current_app_metadata(current_app) + update = await self._apply_current_app_metadata(current_app) update[MediaAttr.STATE] = media_player.States.ON.value - else: - update = self._apply_current_app_metadata(current_app) + update = await self._apply_current_app_metadata(current_app) update[MediaAttr.STATE] = media_player.States.OFF.value + self.events.emit(Events.UPDATE, self._identifier, update) def _current_app_updated(self, current_app: str) -> None: - """Notify that the current app on Android TV is updated.""" + asyncio.create_task(self._handle_current_app_updated(current_app)) + + async def _handle_current_app_updated(self, current_app: str): _LOG.debug("[%s] current_app: %s", self.log_id, current_app) - update = self._apply_current_app_metadata(current_app) + update = await self._apply_current_app_metadata(current_app) self.events.emit(Events.UPDATE, self._identifier, update) def _volume_info_updated(self, volume_info: dict[str, str | bool]) -> None: @@ -684,7 +715,9 @@ def _is_available_updated(self, is_available: bool): """Notify that the Android TV is ready to receive commands or is unavailable.""" _LOG.info("[%s] is_available: %s", self.log_id, is_available) self._state = DeviceState.CONNECTED if is_available else DeviceState.CONNECTING - self.events.emit(Events.CONNECTED if is_available else Events.DISCONNECTED, self.identifier) + self.events.emit( + Events.CONNECTED if is_available else Events.DISCONNECTED, self.identifier + ) def _update_app_list(self) -> None: update = {} @@ -757,7 +790,9 @@ async def select_source(self, source: str) -> ucapi.StatusCodes: return await self._launch_app(source) @async_handle_atvlib_errors - async def _send_command(self, keycode: int | str, action: KeyPress = KeyPress.SHORT) -> ucapi.StatusCodes: + async def _send_command( + self, keycode: int | str, action: KeyPress = KeyPress.SHORT + ) -> ucapi.StatusCodes: """ Send a key press to Android TV. @@ -812,75 +847,118 @@ async def _switch_input(self, source: str) -> ucapi.StatusCodes: def new_connection_status(self, status: ConnectionStatus) -> None: """Receive new connection status event from Google cast.""" - _LOG.debug("[%s] Received Chromecast connection status : %s", self.log_id, status) + _LOG.debug( + "[%s] Received Chromecast connection status : %s", self.log_id, status + ) if status.status == "CONNECTED": _LOG.debug("[%s] Chromecast connected", self.log_id) def new_media_status(self, status: MediaStatus) -> None: """Receive new media status event from Google cast.""" - # For debugging, too verbose - # _LOG.debug("[%s] Update from Chromecast info : %s", self.log_id, status) + if not self._loop or not self._loop.is_running(): + _LOG.warning( + "[%s] No running event loop for handling new media status", self.log_id + ) + return + + try: + asyncio.run_coroutine_threadsafe( + self._handle_new_media_status(status), self._loop + ) + except Exception as e: + _LOG.error( + "[%s] Failed to schedule media status handler: %s", self.log_id, e + ) + + async def _handle_new_media_status(self, status: MediaStatus): update = {} + if ( status.player_state - and GOOGLE_CAST_MEDIA_STATES_MAP.get(status.player_state, media_player.States.PLAYING) != self._player_state + and GOOGLE_CAST_MEDIA_STATES_MAP.get( + status.player_state, media_player.States.PLAYING + ) + != self._player_state ): - # PLAYING, PAUSED, IDLE - self._player_state = GOOGLE_CAST_MEDIA_STATES_MAP.get(status.player_state, media_player.States.PLAYING) + self._player_state = GOOGLE_CAST_MEDIA_STATES_MAP.get( + status.player_state, media_player.States.PLAYING + ) self._last_update_position_time = 0 update[MediaAttr.STATE] = self._player_state + if status.album_name != self._media_album: - self._media_album = status.album_name if status.album_name else "" + self._media_album = status.album_name or "" update[MediaAttr.MEDIA_ALBUM] = self._media_album + if status.artist != self._media_artist: - self._media_artist = status.artist if status.artist else "" + self._media_artist = status.artist or "" update[MediaAttr.MEDIA_ARTIST] = self._media_artist + if status.title != self._media_title: current_title = self.media_title - self._media_title = status.title if status.title else "" + self._media_title = status.title or "" if current_title != self.media_title: - _LOG.debug("[%s] Chromecast Media info updated : %s", self.log_id, status) + _LOG.debug( + "[%s] Chromecast Media info updated : %s", self.log_id, status + ) update[MediaAttr.MEDIA_TITLE] = self.media_title + current_time = int(status.current_time) if status.current_time else 0 duration = int(status.duration) if status.duration else 0 - chanded_duration = False + changed_duration = False + if duration != self._media_duration: self._media_duration = duration update[MediaAttr.MEDIA_DURATION] = self._media_duration - chanded_duration = True - # Update position every 30 seconds - if chanded_duration or ( - current_time != self._media_position and self._last_update_position_time + 30 < time.time() + changed_duration = True + + if changed_duration or ( + current_time != self._media_position + and self._last_update_position_time + 30 < time.time() ): self._media_position = current_time update[MediaAttr.MEDIA_POSITION] = self._media_position update[MediaAttr.MEDIA_DURATION] = self._media_duration self._last_update_position_time = time.time() + if ( status.metadata_type - and GOOGLE_CAST_MEDIA_TYPES_MAP.get(status.metadata_type, MediaType.VIDEO) != self._media_type + and GOOGLE_CAST_MEDIA_TYPES_MAP.get(status.metadata_type, MediaType.VIDEO) + != self._media_type ): - self._media_type = GOOGLE_CAST_MEDIA_TYPES_MAP.get(self._media_type, MediaType.VIDEO) + self._media_type = GOOGLE_CAST_MEDIA_TYPES_MAP.get( + status.metadata_type, MediaType.VIDEO + ) update[MediaAttr.MEDIA_TYPE] = self._media_type - if status.images and len(status.images) > 0 and status.images[0].url != self._media_image_url: + if ( + status.images + and len(status.images) > 0 + and status.images[0].url != self._media_image_url + ): self._media_image_url = status.images[0].url - update[MediaAttr.MEDIA_IMAGE_URL] = encode_icon_to_data_uri(self._media_image_url) + update[MediaAttr.MEDIA_IMAGE_URL] = await encode_icon_to_data_uri( + self._media_image_url + ) self._use_app_url = False else: self._media_image_url = None if self._device_config.use_external_metadata: self._use_app_url = True if self._app_image_url: - update[MediaAttr.MEDIA_IMAGE_URL] = encode_icon_to_data_uri(self._app_image_url) + update[MediaAttr.MEDIA_IMAGE_URL] = await encode_icon_to_data_uri( + self._app_image_url + ) if update: - # filter media_image_url property if _LOG.isEnabledFor(logging.DEBUG): log_upd = copy(update) if MediaAttr.MEDIA_IMAGE_URL in log_upd: log_upd[MediaAttr.MEDIA_IMAGE_URL] = "***" - _LOG.debug("[%s] Update remote with Chromecast info : %s", self.log_id, log_upd) + _LOG.debug( + "[%s] Update remote with Chromecast info : %s", self.log_id, log_upd + ) + self.events.emit(Events.UPDATE, self._identifier, update) def load_media_failed(self, queue_item_id: int, error_code: int) -> None: @@ -889,21 +967,39 @@ def load_media_failed(self, queue_item_id: int, error_code: int) -> None: def new_cast_status(self, status: CastStatus) -> None: """Receive new cast event from Google cast.""" _LOG.debug("[%s] Received Chromecast cast status : %s", self.log_id, status) + + if not self._loop or not self._loop.is_running(): + _LOG.warning("[%s] No running event loop for cast status", self.log_id) + return + + try: + asyncio.run_coroutine_threadsafe( + self._handle_new_cast_status(status), self._loop + ) + except Exception as e: + _LOG.error( + "[%s] Failed to schedule cast status handler: %s", self.log_id, e + ) + + async def _handle_new_cast_status(self, status: CastStatus): current_title = self.media_title if status.display_name: self._media_app = status.display_name if current_title != self.media_title: update = {MediaAttr.MEDIA_TITLE: self.media_title} - - _LOG.debug("[%s] Update remote with Chromecast info : %s", self.log_id, update) + _LOG.debug( + "[%s] Update remote with Chromecast info : %s", self.log_id, update + ) self.events.emit(Events.UPDATE, self._identifier, update) async def media_seek(self, position: float) -> ucapi.StatusCodes: """Seek the media at the given position.""" try: if self._chromecast: - self._chromecast.media_controller.seek(position, timeout=CONNECTION_TIMEOUT) + self._chromecast.media_controller.seek( + position, timeout=CONNECTION_TIMEOUT + ) return ucapi.StatusCodes.OK except Exception as ex: _LOG.error("[%s] Chromecast error seeking command : %s", self.log_id, ex) From 15df3faa5fadd2fa57225c9b7ee4383e497b10ea Mon Sep 17 00:00:00 2001 From: Thomas Mason Date: Wed, 23 Apr 2025 23:21:44 +0100 Subject: [PATCH 3/8] update default metadata --- data/external_cache/app_metadata.json | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/data/external_cache/app_metadata.json b/data/external_cache/app_metadata.json index 5dfb8e9..74f9550 100644 --- a/data/external_cache/app_metadata.json +++ b/data/external_cache/app_metadata.json @@ -1,10 +1,14 @@ { + "com.android.systemui": { + "name": "Android TV", + "icon": "androidtv.png" + }, "com.google.android.tvlauncher": { - "name": "Android TV Home", - "icon": "data/external_cache/icons/androidtv.png" + "name": "Android TV", + "icon": "androidtv.png" }, "com.google.android.backdrop": { "name": "Backdrop Daydream", - "icon": "data/external_cache/icons/androidtv.png" + "icon": "androidtv.png" } } \ No newline at end of file From b20e50075fd5403154a18ee7fdd34945ab3ab08d Mon Sep 17 00:00:00 2001 From: Thomas Mason Date: Wed, 23 Apr 2025 23:47:02 +0100 Subject: [PATCH 4/8] further tweaks --- intg-androidtv/tv.py | 29 ++++++++++++++++++----------- 1 file changed, 18 insertions(+), 11 deletions(-) diff --git a/intg-androidtv/tv.py b/intg-androidtv/tv.py index 26884dd..57fd67b 100644 --- a/intg-androidtv/tv.py +++ b/intg-androidtv/tv.py @@ -640,13 +640,18 @@ async def _apply_current_app_metadata(self, current_app: str) -> dict: external_icon = metadata.get("icon") if external_name: self._media_app = external_name + if external_icon: + self._app_image_url = external_icon + + _LOG.debug("%s", metadata) name_to_use = offline_name or offline_match or external_name or current_app update[MediaAttr.SOURCE] = name_to_use - update[MediaAttr.MEDIA_TITLE] = name_to_use + if not self._media_title and not self._media_image_url: + update[MediaAttr.MEDIA_TITLE] = name_to_use icon_to_use = None - if self._device_config.use_external_metadata and self._use_app_url: + if self._device_config.use_external_metadata or self._use_app_url: if external_icon: icon_to_use = external_icon else: @@ -654,29 +659,31 @@ async def _apply_current_app_metadata(self, current_app: str) -> dict: elif self._media_image_url: icon_to_use = await encode_icon_to_data_uri(self._media_image_url) - if icon_to_use is not None: - update[MediaAttr.MEDIA_IMAGE_URL] = icon_to_use - if current_app in ("com.google.android.tvlauncher", "com.android.systemui"): update[MediaAttr.STATE] = media_player.States.ON.value - update[MediaAttr.MEDIA_TITLE] = "Android TV Home" - update[MediaAttr.SOURCE] = "Android TV Home" + update[MediaAttr.MEDIA_TITLE] = "Android TV" + update[MediaAttr.SOURCE] = "Android TV" update[MediaAttr.MEDIA_IMAGE_URL] = await encode_icon_to_data_uri( "androidtv.png" ) + elif current_app == "com.google.android.backdrop": update[MediaAttr.STATE] = media_player.States.STANDBY.value update[MediaAttr.MEDIA_TITLE] = "" update[MediaAttr.MEDIA_IMAGE_URL] = await encode_icon_to_data_uri( "androidtv.png" ) + else: update[MediaAttr.STATE] = media_player.States.PLAYING.value - update[MediaAttr.MEDIA_IMAGE_URL] = await encode_icon_to_data_uri( - "androidtv.png" - ) + if not icon_to_use: + update[MediaAttr.MEDIA_IMAGE_URL] = await encode_icon_to_data_uri( + "androidtv.png" + ) + else: + update[MediaAttr.MEDIA_IMAGE_URL] = icon_to_use - _LOG.debug("%s", update) + # _LOG.debug("%s", update) return update From 74e8fd4450e4e39617f897fb9b882c0abaa5e349 Mon Sep 17 00:00:00 2001 From: Thomas Mason Date: Wed, 23 Apr 2025 23:52:18 +0100 Subject: [PATCH 5/8] further tweaks --- intg-androidtv/tv.py | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/intg-androidtv/tv.py b/intg-androidtv/tv.py index 57fd67b..9a25ebc 100644 --- a/intg-androidtv/tv.py +++ b/intg-androidtv/tv.py @@ -676,12 +676,14 @@ async def _apply_current_app_metadata(self, current_app: str) -> dict: else: update[MediaAttr.STATE] = media_player.States.PLAYING.value - if not icon_to_use: - update[MediaAttr.MEDIA_IMAGE_URL] = await encode_icon_to_data_uri( - "androidtv.png" - ) - else: - update[MediaAttr.MEDIA_IMAGE_URL] = icon_to_use + # Skip applying app icon if media image from cast is present + if not self._media_image_url: + if not icon_to_use: + update[MediaAttr.MEDIA_IMAGE_URL] = await encode_icon_to_data_uri( + "androidtv.png" + ) + else: + update[MediaAttr.MEDIA_IMAGE_URL] = icon_to_use # _LOG.debug("%s", update) From a15c3434bdcd8fb95b025c3353d0f7f6d3347014 Mon Sep 17 00:00:00 2001 From: Markus Zehnder Date: Fri, 25 Apr 2025 15:54:17 +0200 Subject: [PATCH 6/8] linting --- intg-androidtv/driver.py | 44 +++------ intg-androidtv/external_metadata.py | 59 +++++++----- intg-androidtv/tv.py | 142 +++++++--------------------- requirements.txt | 2 +- 4 files changed, 83 insertions(+), 164 deletions(-) diff --git a/intg-androidtv/driver.py b/intg-androidtv/driver.py index 53a3a8f..771eac6 100644 --- a/intg-androidtv/driver.py +++ b/intg-androidtv/driver.py @@ -39,9 +39,7 @@ async def on_connect(): """When the UCR2 connects, all configured Android TV devices are getting connected.""" _LOG.debug("Client connect command: connecting device(s)") - await api.set_device_state( - ucapi.DeviceStates.CONNECTED - ) # just to make sure the device state is set + await api.set_device_state(ucapi.DeviceStates.CONNECTED) # just to make sure the device state is set for atv in _configured_android_tvs.values(): # start background task _LOOP.create_task(atv.connect()) @@ -97,9 +95,7 @@ async def on_subscribe_entities(entity_ids) -> None: state = media_player.States.UNAVAILABLE else: state = media_player.States.ON if atv.is_on else media_player.States.OFF - api.configured_entities.update_attributes( - entity_id, {media_player.Attributes.STATE: state} - ) + api.configured_entities.update_attributes(entity_id, {media_player.Attributes.STATE: state}) _LOOP.create_task(atv.connect()) continue @@ -107,9 +103,7 @@ async def on_subscribe_entities(entity_ids) -> None: if device: _add_configured_android_tv(device) else: - _LOG.error( - "Failed to subscribe entity %s: no Android TV instance found", entity_id - ) + _LOG.error("Failed to subscribe entity %s: no Android TV instance found", entity_id) @api.listens_to(ucapi.Events.UNSUBSCRIBE_ENTITIES) @@ -150,9 +144,7 @@ async def media_player_cmd_handler( android_tv = _configured_android_tvs[atv_id] - _LOG.info( - "[%s] command: %s %s", android_tv.log_id, cmd_id, params if params else "" - ) + _LOG.info("[%s] command: %s %s", android_tv.log_id, cmd_id, params if params else "") if cmd_id == media_player.Commands.ON: return await android_tv.turn_on() @@ -186,12 +178,8 @@ async def handle_connected(identifier: str): config.devices.update(device) # TODO is this the correct state? - api.configured_entities.update_attributes( - identifier, {media_player.Attributes.STATE: media_player.States.STANDBY} - ) - await api.set_device_state( - ucapi.DeviceStates.CONNECTED - ) # just to make sure the device state is set + api.configured_entities.update_attributes(identifier, {media_player.Attributes.STATE: media_player.States.STANDBY}) + await api.set_device_state(ucapi.DeviceStates.CONNECTED) # just to make sure the device state is set async def handle_disconnected(identifier: str): @@ -249,9 +237,7 @@ async def handle_android_tv_update(atv_id: str, update: dict[str, Any]) -> None: log_upd = copy(update) if MediaAttr.MEDIA_IMAGE_URL in log_upd: log_upd[MediaAttr.MEDIA_IMAGE_URL] = "***" - _LOG.debug( - "[%s] device update: %s", device.name if device else atv_id, log_upd - ) + _LOG.debug("[%s] device update: %s", device.name if device else atv_id, log_upd) old_state = ( configured_entity.attributes[MediaAttr.STATE] @@ -304,9 +290,7 @@ async def handle_android_tv_update(atv_id: str, update: dict[str, Any]) -> None: def _add_configured_android_tv(device: config.AtvDevice, connect: bool = True) -> None: - profile = device_profile.match( - device.manufacturer, device.model, device.use_chromecast - ) + profile = device_profile.match(device.manufacturer, device.model, device.use_chromecast) # the device should not yet be configured, but better be safe if device.id in _configured_android_tvs: @@ -324,9 +308,7 @@ def _add_configured_android_tv(device: config.AtvDevice, connect: bool = True) - android_tv.events.on(tv.Events.DISCONNECTED, handle_disconnected) android_tv.events.on(tv.Events.AUTH_ERROR, handle_authentication_error) android_tv.events.on(tv.Events.UPDATE, handle_android_tv_update) - android_tv.events.on( - tv.Events.IP_ADDRESS_CHANGED, handle_android_tv_address_change - ) + android_tv.events.on(tv.Events.IP_ADDRESS_CHANGED, handle_android_tv_address_change) _configured_android_tvs[device.id] = android_tv _LOG.info( @@ -396,9 +378,7 @@ def on_device_added(device: config.AtvDevice) -> None: def on_device_removed(device: config.AtvDevice | None) -> None: """Handle a removed device in the configuration.""" if device is None: - _LOG.debug( - "Configuration cleared, disconnecting & removing all configured Android TV instances" - ) + _LOG.debug("Configuration cleared, disconnecting & removing all configured Android TV instances") for atv in _configured_android_tvs.values(): atv.disconnect() atv.events.remove_all_listeners() @@ -439,9 +419,7 @@ async def main(): device_profile.load(profile_path) # load paired devices - config.devices = config.Devices( - api.config_dir_path, on_device_added, on_device_removed - ) + config.devices = config.Devices(api.config_dir_path, on_device_added, on_device_removed) # best effort migration (if required): network might not be available during startup if config.devices.migration_required(): await config.devices.migrate() diff --git a/intg-androidtv/external_metadata.py b/intg-androidtv/external_metadata.py index 1d168a3..1339bd0 100644 --- a/intg-androidtv/external_metadata.py +++ b/intg-androidtv/external_metadata.py @@ -113,6 +113,11 @@ def resize_image() -> str: async def encode_icon_to_data_uri(icon_name: str) -> str: + """ + Encode an image from a local file path or remote URL. + + Returns a base64-encoded PNG data URI. + """ _LOG.debug("Encoding icon to data URI: %s", icon_name) if isinstance(icon_name, MediaImage): icon_name = icon_name.url @@ -137,22 +142,21 @@ def encode_image() -> str: return f"data:image/png;base64,{encoded}" return await asyncio.to_thread(encode_image) - else: - - def load_and_encode() -> str: - icon_path = _get_icon_dir() / icon_name - if not icon_path.exists(): - raise FileNotFoundError(f"Icon not found: {icon_path}") - with open(icon_path, "rb") as f: - img = Image.open(f) - img.load() - img = img.convert("RGBA") - buffer = BytesIO() - img.save(buffer, format="PNG") - encoded = base64.b64encode(buffer.getvalue()).decode("utf-8") - return f"data:image/png;base64,{encoded}" - - return await asyncio.to_thread(load_and_encode) + + def load_and_encode() -> str: + icon_path = _get_icon_dir() / icon_name + if not icon_path.exists(): + raise FileNotFoundError(f"Icon not found: {icon_path}") + with open(icon_path, "rb") as f: + img = Image.open(f) + img.load() + img = img.convert("RGBA") + buffer = BytesIO() + img.save(buffer, format="PNG") + encoded = base64.b64encode(buffer.getvalue()).decode("utf-8") + return f"data:image/png;base64,{encoded}" + + return await asyncio.to_thread(load_and_encode) except Exception as e: _LOG.warning("Failed to encode icon to base64 for %s: %s", icon_name, e) @@ -173,9 +177,7 @@ async def _fetch_google_play_metadata(package_id: str) -> Dict[str, str] | None: icon_url = app["icon"] icon_name = await _download_and_resize_icon(icon_url, package_id) - _LOG.debug( - "Fetched metadata for %s: name='%s', icon='%s'", package_id, name, icon_name - ) + _LOG.debug("Fetched metadata for %s: name='%s', icon='%s'", package_id, name, icon_name) return {"name": name, "icon": icon_name or ""} except Exception as e: @@ -184,6 +186,21 @@ async def _fetch_google_play_metadata(package_id: str) -> Dict[str, str] | None: async def get_app_metadata(package_id: str) -> Dict[str, str]: + """ + Fetch metadata for a mobile application specified by the package ID. + + The metadata includes the application name and its icon encoded as a data URI. + If metadata is found in a locally cached source, it is fetched from the cache. + Otherwise, metadata is retrieved from external sources such as Google Play. + + :param package_id: The unique package identifier for the application. + :type package_id: str + :return: A dictionary containing the application's metadata. The dictionary + includes the 'name' of the application and the 'icon', which is the + application's icon encoded as a data URI. If no metadata is found, + it returns the package ID as the name and an empty string as the icon. + :rtype: Dict[str, str] + """ _LOG.debug("Getting app metadata for %s", package_id) cache = _load_cache() if package_id in cache: @@ -198,9 +215,7 @@ async def get_app_metadata(package_id: str) -> Dict[str, str]: if metadata: cache[package_id] = metadata _save_cache(cache) - icon_data_uri = ( - await encode_icon_to_data_uri(metadata["icon"]) if metadata["icon"] else "" - ) + icon_data_uri = await encode_icon_to_data_uri(metadata["icon"]) if metadata["icon"] else "" return {"name": metadata["name"], "icon": icon_data_uri} _LOG.debug("Falling back to default metadata for %s", package_id) diff --git a/intg-androidtv/tv.py b/intg-androidtv/tv.py index 9a25ebc..1c1d857 100644 --- a/intg-androidtv/tv.py +++ b/intg-androidtv/tv.py @@ -116,9 +116,7 @@ class DeviceState(IntEnum): # https://github.com/home-assistant/core/blob/fd1f0b0efeb5231d3ee23d1cb2a10cdeff7c23f1/homeassistant/components/denonavr/media_player.py def async_handle_atvlib_errors( func: Callable[Concatenate[_AndroidTvT, _P], Awaitable[ucapi.StatusCodes | None]], -) -> Callable[ - Concatenate[_AndroidTvT, _P], Coroutine[Any, Any, ucapi.StatusCodes | None] -]: +) -> Callable[Concatenate[_AndroidTvT, _P], Coroutine[Any, Any, ucapi.StatusCodes | None]]: """Log errors occurred when calling an Android TV device. Decorates methods of AndroidTv class. @@ -127,32 +125,21 @@ def async_handle_atvlib_errors( """ @wraps(func) - async def wrapper( - self: _AndroidTvT, *args: _P.args, **kwargs: _P.kwargs - ) -> ucapi.StatusCodes: + async def wrapper(self: _AndroidTvT, *args: _P.args, **kwargs: _P.kwargs) -> ucapi.StatusCodes: try: # use the same exceptions as the func is throwing (e.g. AndroidTVRemote.send_key_command) state = self.state if state != DeviceState.CONNECTED: - if ( - state in (DeviceState.DISCONNECTED, DeviceState.CONNECTING) - or self.is_on is None - ): + if state in (DeviceState.DISCONNECTED, DeviceState.CONNECTING) or self.is_on is None: raise ConnectionClosed("Disconnected from device") if state in (DeviceState.AUTH_ERROR, DeviceState.PAIRING_ERROR): - raise InvalidAuth( - "Invalid authentication, device requires to be paired again" - ) + raise InvalidAuth("Invalid authentication, device requires to be paired again") raise CannotConnect(f"Device connection not active (state={state})") # workaround for "swallowed commands" since _atv.send_key_command doesn't provide a result # pylint: disable=W0212 if ( - not ( - self._atv - and self._atv._remote_message_protocol - and self._atv._remote_message_protocol.transport - ) + not (self._atv and self._atv._remote_message_protocol and self._atv._remote_message_protocol.transport) or self._atv._remote_message_protocol.transport.is_closing() ): _LOG.warning( @@ -173,9 +160,7 @@ async def wrapper( _LOG.error("[%s] Cannot send command: %s", self.log_id, ex) return ucapi.StatusCodes.CONFLICT except ValueError as ex: - _LOG.error( - "[%s] Cannot send command, invalid key_code: %s", self.log_id, ex - ) + _LOG.error("[%s] Cannot send command, invalid key_code: %s", self.log_id, ex) return ucapi.StatusCodes.BAD_REQUEST return wrapper @@ -256,9 +241,7 @@ async def init(self, max_timeout: int | None = None) -> bool: :return: True if connected or connecting, False if timeout occurred. """ if self._state in (DeviceState.INITIALIZING, DeviceState.CONNECTING): - _LOG.debug( - "[%s] Skipping init task: connection task already running", self.log_id - ) + _LOG.debug("[%s] Skipping init task: connection task already running", self.log_id) return True self._state = DeviceState.INITIALIZING @@ -393,9 +376,7 @@ async def start_pairing(self) -> ucapi.StatusCodes: return ucapi.StatusCodes.SERVICE_UNAVAILABLE except InvalidAuth as ex: self._state = DeviceState.AUTH_ERROR - _LOG.error( - "[%s] Authentication error in start pairing: %s", self.log_id, ex - ) + _LOG.error("[%s] Authentication error in start pairing: %s", self.log_id, ex) return ucapi.StatusCodes.UNAUTHORIZED async def finish_pairing(self, pin: str) -> ucapi.StatusCodes: @@ -466,9 +447,7 @@ async def connect(self, max_timeout: int | None = None) -> bool: self._reconnect_delay = MIN_RECONNECT_DELAY except InvalidAuth: self._state = DeviceState.AUTH_ERROR - _LOG.error( - "[%s] Invalid authentication for %s", self.log_id, self._identifier - ) + _LOG.error("[%s] Invalid authentication for %s", self.log_id, self._identifier) self.events.emit(Events.AUTH_ERROR, self._identifier) break except (CannotConnect, ConnectionClosed, asyncio.TimeoutError) as ex: @@ -539,9 +518,7 @@ def _chromecast_connect(self): try: self._chromecast.register_status_listener(self) - self._chromecast.socket_client.media_controller.register_status_listener( - self - ) + self._chromecast.socket_client.media_controller.register_status_listener(self) self._chromecast.register_connection_listener(self) _LOG.info("[%s] Chromecast connecting", self.log_id) self._chromecast.wait(timeout=CONNECTION_TIMEOUT) @@ -630,11 +607,7 @@ async def _apply_current_app_metadata(self, current_app: str) -> dict: self._media_app = name break - metadata = ( - await get_app_metadata(current_app) - if self._device_config.use_external_metadata - else None - ) + metadata = await get_app_metadata(current_app) if self._device_config.use_external_metadata else None if metadata: external_name = metadata.get("name") external_icon = metadata.get("icon") @@ -663,25 +636,19 @@ async def _apply_current_app_metadata(self, current_app: str) -> dict: update[MediaAttr.STATE] = media_player.States.ON.value update[MediaAttr.MEDIA_TITLE] = "Android TV" update[MediaAttr.SOURCE] = "Android TV" - update[MediaAttr.MEDIA_IMAGE_URL] = await encode_icon_to_data_uri( - "androidtv.png" - ) + update[MediaAttr.MEDIA_IMAGE_URL] = await encode_icon_to_data_uri("androidtv.png") elif current_app == "com.google.android.backdrop": update[MediaAttr.STATE] = media_player.States.STANDBY.value update[MediaAttr.MEDIA_TITLE] = "" - update[MediaAttr.MEDIA_IMAGE_URL] = await encode_icon_to_data_uri( - "androidtv.png" - ) + update[MediaAttr.MEDIA_IMAGE_URL] = await encode_icon_to_data_uri("androidtv.png") else: update[MediaAttr.STATE] = media_player.States.PLAYING.value # Skip applying app icon if media image from cast is present if not self._media_image_url: if not icon_to_use: - update[MediaAttr.MEDIA_IMAGE_URL] = await encode_icon_to_data_uri( - "androidtv.png" - ) + update[MediaAttr.MEDIA_IMAGE_URL] = await encode_icon_to_data_uri("androidtv.png") else: update[MediaAttr.MEDIA_IMAGE_URL] = icon_to_use @@ -724,9 +691,7 @@ def _is_available_updated(self, is_available: bool): """Notify that the Android TV is ready to receive commands or is unavailable.""" _LOG.info("[%s] is_available: %s", self.log_id, is_available) self._state = DeviceState.CONNECTED if is_available else DeviceState.CONNECTING - self.events.emit( - Events.CONNECTED if is_available else Events.DISCONNECTED, self.identifier - ) + self.events.emit(Events.CONNECTED if is_available else Events.DISCONNECTED, self.identifier) def _update_app_list(self) -> None: update = {} @@ -799,9 +764,7 @@ async def select_source(self, source: str) -> ucapi.StatusCodes: return await self._launch_app(source) @async_handle_atvlib_errors - async def _send_command( - self, keycode: int | str, action: KeyPress = KeyPress.SHORT - ) -> ucapi.StatusCodes: + async def _send_command(self, keycode: int | str, action: KeyPress = KeyPress.SHORT) -> ucapi.StatusCodes: """ Send a key press to Android TV. @@ -856,42 +819,29 @@ async def _switch_input(self, source: str) -> ucapi.StatusCodes: def new_connection_status(self, status: ConnectionStatus) -> None: """Receive new connection status event from Google cast.""" - _LOG.debug( - "[%s] Received Chromecast connection status : %s", self.log_id, status - ) + _LOG.debug("[%s] Received Chromecast connection status : %s", self.log_id, status) if status.status == "CONNECTED": _LOG.debug("[%s] Chromecast connected", self.log_id) def new_media_status(self, status: MediaStatus) -> None: """Receive new media status event from Google cast.""" if not self._loop or not self._loop.is_running(): - _LOG.warning( - "[%s] No running event loop for handling new media status", self.log_id - ) + _LOG.warning("[%s] No running event loop for handling new media status", self.log_id) return try: - asyncio.run_coroutine_threadsafe( - self._handle_new_media_status(status), self._loop - ) + asyncio.run_coroutine_threadsafe(self._handle_new_media_status(status), self._loop) except Exception as e: - _LOG.error( - "[%s] Failed to schedule media status handler: %s", self.log_id, e - ) + _LOG.error("[%s] Failed to schedule media status handler: %s", self.log_id, e) async def _handle_new_media_status(self, status: MediaStatus): update = {} if ( status.player_state - and GOOGLE_CAST_MEDIA_STATES_MAP.get( - status.player_state, media_player.States.PLAYING - ) - != self._player_state + and GOOGLE_CAST_MEDIA_STATES_MAP.get(status.player_state, media_player.States.PLAYING) != self._player_state ): - self._player_state = GOOGLE_CAST_MEDIA_STATES_MAP.get( - status.player_state, media_player.States.PLAYING - ) + self._player_state = GOOGLE_CAST_MEDIA_STATES_MAP.get(status.player_state, media_player.States.PLAYING) self._last_update_position_time = 0 update[MediaAttr.STATE] = self._player_state @@ -907,9 +857,7 @@ async def _handle_new_media_status(self, status: MediaStatus): current_title = self.media_title self._media_title = status.title or "" if current_title != self.media_title: - _LOG.debug( - "[%s] Chromecast Media info updated : %s", self.log_id, status - ) + _LOG.debug("[%s] Chromecast Media info updated : %s", self.log_id, status) update[MediaAttr.MEDIA_TITLE] = self.media_title current_time = int(status.current_time) if status.current_time else 0 @@ -922,8 +870,7 @@ async def _handle_new_media_status(self, status: MediaStatus): changed_duration = True if changed_duration or ( - current_time != self._media_position - and self._last_update_position_time + 30 < time.time() + current_time != self._media_position and self._last_update_position_time + 30 < time.time() ): self._media_position = current_time update[MediaAttr.MEDIA_POSITION] = self._media_position @@ -932,41 +879,28 @@ async def _handle_new_media_status(self, status: MediaStatus): if ( status.metadata_type - and GOOGLE_CAST_MEDIA_TYPES_MAP.get(status.metadata_type, MediaType.VIDEO) - != self._media_type + and GOOGLE_CAST_MEDIA_TYPES_MAP.get(status.metadata_type, MediaType.VIDEO) != self._media_type ): - self._media_type = GOOGLE_CAST_MEDIA_TYPES_MAP.get( - status.metadata_type, MediaType.VIDEO - ) + self._media_type = GOOGLE_CAST_MEDIA_TYPES_MAP.get(status.metadata_type, MediaType.VIDEO) update[MediaAttr.MEDIA_TYPE] = self._media_type - if ( - status.images - and len(status.images) > 0 - and status.images[0].url != self._media_image_url - ): + if status.images and len(status.images) > 0 and status.images[0].url != self._media_image_url: self._media_image_url = status.images[0].url - update[MediaAttr.MEDIA_IMAGE_URL] = await encode_icon_to_data_uri( - self._media_image_url - ) + update[MediaAttr.MEDIA_IMAGE_URL] = await encode_icon_to_data_uri(self._media_image_url) self._use_app_url = False else: self._media_image_url = None if self._device_config.use_external_metadata: self._use_app_url = True if self._app_image_url: - update[MediaAttr.MEDIA_IMAGE_URL] = await encode_icon_to_data_uri( - self._app_image_url - ) + update[MediaAttr.MEDIA_IMAGE_URL] = await encode_icon_to_data_uri(self._app_image_url) if update: if _LOG.isEnabledFor(logging.DEBUG): log_upd = copy(update) if MediaAttr.MEDIA_IMAGE_URL in log_upd: log_upd[MediaAttr.MEDIA_IMAGE_URL] = "***" - _LOG.debug( - "[%s] Update remote with Chromecast info : %s", self.log_id, log_upd - ) + _LOG.debug("[%s] Update remote with Chromecast info : %s", self.log_id, log_upd) self.events.emit(Events.UPDATE, self._identifier, update) @@ -982,13 +916,9 @@ def new_cast_status(self, status: CastStatus) -> None: return try: - asyncio.run_coroutine_threadsafe( - self._handle_new_cast_status(status), self._loop - ) + asyncio.run_coroutine_threadsafe(self._handle_new_cast_status(status), self._loop) except Exception as e: - _LOG.error( - "[%s] Failed to schedule cast status handler: %s", self.log_id, e - ) + _LOG.error("[%s] Failed to schedule cast status handler: %s", self.log_id, e) async def _handle_new_cast_status(self, status: CastStatus): current_title = self.media_title @@ -997,18 +927,14 @@ async def _handle_new_cast_status(self, status: CastStatus): if current_title != self.media_title: update = {MediaAttr.MEDIA_TITLE: self.media_title} - _LOG.debug( - "[%s] Update remote with Chromecast info : %s", self.log_id, update - ) + _LOG.debug("[%s] Update remote with Chromecast info : %s", self.log_id, update) self.events.emit(Events.UPDATE, self._identifier, update) async def media_seek(self, position: float) -> ucapi.StatusCodes: """Seek the media at the given position.""" try: if self._chromecast: - self._chromecast.media_controller.seek( - position, timeout=CONNECTION_TIMEOUT - ) + self._chromecast.media_controller.seek(position, timeout=CONNECTION_TIMEOUT) return ucapi.StatusCodes.OK except Exception as ex: _LOG.error("[%s] Chromecast error seeking command : %s", self.log_id, ex) diff --git a/requirements.txt b/requirements.txt index e0afc6d..6138722 100644 --- a/requirements.txt +++ b/requirements.txt @@ -5,4 +5,4 @@ google_play_scraper==1.2.7 pillow>=11.2.1 requests>=2.32 pychromecast~=14.0.7 -httpx \ No newline at end of file +httpx~=0.28.1 \ No newline at end of file From cb392b4dddd1cd608926f5c61e8d07014547dd28 Mon Sep 17 00:00:00 2001 From: Markus Zehnder Date: Fri, 25 Apr 2025 16:20:39 +0200 Subject: [PATCH 7/8] revert removed comments, reduce logging --- .gitignore | 1 + intg-androidtv/external_metadata.py | 5 ----- intg-androidtv/tv.py | 16 ++++++++++++++-- 3 files changed, 15 insertions(+), 7 deletions(-) diff --git a/.gitignore b/.gitignore index f6b156c..e7e8a0b 100644 --- a/.gitignore +++ b/.gitignore @@ -105,3 +105,4 @@ config.json *.pem licenses.json tools/licenses.md +/data/external_cache/icons/ diff --git a/intg-androidtv/external_metadata.py b/intg-androidtv/external_metadata.py index 1339bd0..fa703ba 100644 --- a/intg-androidtv/external_metadata.py +++ b/intg-androidtv/external_metadata.py @@ -33,33 +33,28 @@ def _get_cache_root() -> Path: config_home = Path(os.environ.get("UC_DATA_HOME", "./data")) cache_root = config_home / CACHE_ROOT cache_root.mkdir(parents=True, exist_ok=True) - _LOG.debug("Cache root path: %s", cache_root) return cache_root def _get_metadata_dir() -> Path: metadata_dir = _get_cache_root() metadata_dir.mkdir(parents=True, exist_ok=True) - _LOG.debug("Metadata directory path: %s", metadata_dir) return metadata_dir def _get_icon_dir() -> Path: icon_dir = _get_cache_root() / ICON_SUBDIR icon_dir.mkdir(parents=True, exist_ok=True) - _LOG.debug("Icon directory path: %s", icon_dir) return icon_dir def _get_metadata_file_path() -> Path: path = _get_metadata_dir() / "app_metadata.json" - _LOG.debug("Metadata file path: %s", path) return path def _get_icon_path(package_id: str) -> Path: path = _get_icon_dir() / f"{package_id}.png" - _LOG.debug("Icon file path for %s: %s", package_id, path) return path diff --git a/intg-androidtv/tv.py b/intg-androidtv/tv.py index 1c1d857..29599fc 100644 --- a/intg-androidtv/tv.py +++ b/intg-androidtv/tv.py @@ -55,7 +55,9 @@ _LOG = logging.getLogger(__name__) CONNECTION_TIMEOUT: float = 10.0 +"""Android TV device connection timeout in seconds.""" BACKOFF_MAX: int = 30 +"""Maximum backoff duration in seconds.""" MIN_RECONNECT_DELAY: float = 0.5 BACKOFF_FACTOR: float = 1.5 @@ -591,15 +593,18 @@ def disconnect(self) -> None: async def _apply_current_app_metadata(self, current_app: str) -> dict: update = {} + # Track state of data sources offline_name = None offline_match = None external_name = None external_icon = None + # Try offline ID mapping first if current_app in apps.IdMappings: offline_name = apps.IdMappings[current_app] self._media_app = offline_name + # Try fuzzy offline name matching if ID mapping failed if not offline_name: for query, name in apps.NameMatching.items(): if query in current_app: @@ -607,6 +612,7 @@ async def _apply_current_app_metadata(self, current_app: str) -> dict: self._media_app = name break + # Try external metadata metadata = await get_app_metadata(current_app) if self._device_config.use_external_metadata else None if metadata: external_name = metadata.get("name") @@ -618,11 +624,13 @@ async def _apply_current_app_metadata(self, current_app: str) -> dict: _LOG.debug("%s", metadata) + # Determine final name/title to use name_to_use = offline_name or offline_match or external_name or current_app update[MediaAttr.SOURCE] = name_to_use if not self._media_title and not self._media_image_url: update[MediaAttr.MEDIA_TITLE] = name_to_use + # Determine which icon to use icon_to_use = None if self._device_config.use_external_metadata or self._use_app_url: if external_icon: @@ -632,17 +640,16 @@ async def _apply_current_app_metadata(self, current_app: str) -> dict: elif self._media_image_url: icon_to_use = await encode_icon_to_data_uri(self._media_image_url) + # Special case handling for Android TV system apps if current_app in ("com.google.android.tvlauncher", "com.android.systemui"): update[MediaAttr.STATE] = media_player.States.ON.value update[MediaAttr.MEDIA_TITLE] = "Android TV" update[MediaAttr.SOURCE] = "Android TV" update[MediaAttr.MEDIA_IMAGE_URL] = await encode_icon_to_data_uri("androidtv.png") - elif current_app == "com.google.android.backdrop": update[MediaAttr.STATE] = media_player.States.STANDBY.value update[MediaAttr.MEDIA_TITLE] = "" update[MediaAttr.MEDIA_IMAGE_URL] = await encode_icon_to_data_uri("androidtv.png") - else: update[MediaAttr.STATE] = media_player.States.PLAYING.value # Skip applying app icon if media image from cast is present @@ -657,6 +664,7 @@ async def _apply_current_app_metadata(self, current_app: str) -> dict: return update def _is_on_updated(self, is_on: bool) -> None: + """Notify that the Android TV power state is updated.""" asyncio.create_task(self._handle_is_on_updated(is_on)) async def _handle_is_on_updated(self, is_on: bool): @@ -673,6 +681,7 @@ async def _handle_is_on_updated(self, is_on: bool): self.events.emit(Events.UPDATE, self._identifier, update) def _current_app_updated(self, current_app: str) -> None: + """Notify that the current app on Android TV is updated.""" asyncio.create_task(self._handle_current_app_updated(current_app)) async def _handle_current_app_updated(self, current_app: str): @@ -841,6 +850,7 @@ async def _handle_new_media_status(self, status: MediaStatus): status.player_state and GOOGLE_CAST_MEDIA_STATES_MAP.get(status.player_state, media_player.States.PLAYING) != self._player_state ): + # PLAYING, PAUSED, IDLE self._player_state = GOOGLE_CAST_MEDIA_STATES_MAP.get(status.player_state, media_player.States.PLAYING) self._last_update_position_time = 0 update[MediaAttr.STATE] = self._player_state @@ -869,6 +879,7 @@ async def _handle_new_media_status(self, status: MediaStatus): update[MediaAttr.MEDIA_DURATION] = self._media_duration changed_duration = True + # Update position every 30 seconds if changed_duration or ( current_time != self._media_position and self._last_update_position_time + 30 < time.time() ): @@ -896,6 +907,7 @@ async def _handle_new_media_status(self, status: MediaStatus): update[MediaAttr.MEDIA_IMAGE_URL] = await encode_icon_to_data_uri(self._app_image_url) if update: + # filter media_image_url property if _LOG.isEnabledFor(logging.DEBUG): log_upd = copy(update) if MediaAttr.MEDIA_IMAGE_URL in log_upd: From 86f3b86c9757c4a73f5511477162eea181d29c11 Mon Sep 17 00:00:00 2001 From: Markus Zehnder Date: Fri, 25 Apr 2025 16:31:29 +0200 Subject: [PATCH 8/8] cosmetics --- intg-androidtv/external_metadata.py | 6 ++---- intg-androidtv/tv.py | 4 +--- 2 files changed, 3 insertions(+), 7 deletions(-) diff --git a/intg-androidtv/external_metadata.py b/intg-androidtv/external_metadata.py index fa703ba..4e7b4a4 100644 --- a/intg-androidtv/external_metadata.py +++ b/intg-androidtv/external_metadata.py @@ -49,13 +49,11 @@ def _get_icon_dir() -> Path: def _get_metadata_file_path() -> Path: - path = _get_metadata_dir() / "app_metadata.json" - return path + return _get_metadata_dir() / "app_metadata.json" def _get_icon_path(package_id: str) -> Path: - path = _get_icon_dir() / f"{package_id}.png" - return path + return _get_icon_dir() / f"{package_id}.png" # Cache Management diff --git a/intg-androidtv/tv.py b/intg-androidtv/tv.py index 29599fc..198bca5 100644 --- a/intg-androidtv/tv.py +++ b/intg-androidtv/tv.py @@ -622,7 +622,7 @@ async def _apply_current_app_metadata(self, current_app: str) -> dict: if external_icon: self._app_image_url = external_icon - _LOG.debug("%s", metadata) + _LOG.debug("App metadata: %s", metadata) # Determine final name/title to use name_to_use = offline_name or offline_match or external_name or current_app @@ -659,8 +659,6 @@ async def _apply_current_app_metadata(self, current_app: str) -> dict: else: update[MediaAttr.MEDIA_IMAGE_URL] = icon_to_use - # _LOG.debug("%s", update) - return update def _is_on_updated(self, is_on: bool) -> None: