diff --git a/requirements.txt b/requirements.txt index 9b8f0b0..9d94bb7 100644 --- a/requirements.txt +++ b/requirements.txt @@ -6,4 +6,5 @@ pillow>=11.2.1 requests>=2.32 pychromecast~=14.0.7 httpx~=0.28.1 -sanitize-filename~=1.2.0 \ No newline at end of file +sanitize-filename~=1.2.0 +adb-shell==0.4.4 \ No newline at end of file diff --git a/src/adb_tv.py b/src/adb_tv.py new file mode 100644 index 0000000..88e038c --- /dev/null +++ b/src/adb_tv.py @@ -0,0 +1,109 @@ +""" +This module provides utilities for interacting with Android TVs via ADB (Android Debug Bridge). +It includes functions for managing ADB keys, connecting to devices, retrieving installed apps, +and verifying device authorization. +""" + +import asyncio +import os +from pathlib import Path +from typing import Dict, Optional + +from adb_shell.adb_device_async import AdbDeviceTcpAsync +from adb_shell.auth.keygen import keygen +from adb_shell.auth.sign_pythonrsa import PythonRSASigner + +ADB_CERTS_DIR = Path(os.environ.get("UC_CONFIG_HOME", "./config")) / "certs" +ADB_CERTS_DIR.mkdir(parents=True, exist_ok=True) + + +def get_adb_key_paths(device_id: str) -> tuple[Path, Path]: + """ + Return the paths to the private and public ADB keys for a given device. + + Args: + device_id (str): The unique identifier for the device. + + Returns: + tuple[Path, Path]: Paths to the private and public key files. + """ + priv = ADB_CERTS_DIR / f"adb_{device_id}" + pub = ADB_CERTS_DIR / f"adb_{device_id}.pub" + return priv, pub + + +def load_or_generate_adb_keys(device_id: str) -> PythonRSASigner: + """ + Ensure ADB RSA keys exist for the device and return the signer. + + Args: + device_id (str): The unique identifier for the device. + + Returns: + PythonRSASigner: The signer object for ADB authentication. + """ + priv_path, pub_path = get_adb_key_paths(device_id) + + if not priv_path.exists() or not pub_path.exists(): + keygen(str(priv_path)) + + with open(priv_path) as f: + priv = f.read() + with open(pub_path) as f: + pub = f.read() + return PythonRSASigner(pub, priv) + + +async def adb_connect(device_id: str, host: str, port: int = 5555) -> Optional[AdbDeviceTcpAsync]: + """ + Connect to an Android device via ADB. + + Args: + device_id (str): The unique identifier for the device. + host (str): The IP address or hostname of the device. + port (int, optional): The port number for the ADB connection. Defaults to 5555. + + Returns: + Optional[AdbDeviceTcpAsync]: The connected ADB device object, or None if the connection fails. + """ + signer = load_or_generate_adb_keys(device_id) + device = AdbDeviceTcpAsync(host, port, default_transport_timeout_s=9.0) + + try: + await device.connect(rsa_keys=[signer], auth_timeout_s=20) + return device + except Exception as e: + print(f"ADB connection failed to {host}:{port} — {e}") + return None + + +async def get_installed_apps(device: AdbDeviceTcpAsync) -> Dict[str, Dict[str, str]]: + """ + Retrieve a list of installed non-system apps in a structured format. + + Args: + device (AdbDeviceTcpAsync): The connected ADB device. + + Returns: + Dict[str, Dict[str, str]]: A dictionary of app package names and their metadata. + """ + output = await device.shell("pm list packages -3 -e") + packages = sorted(line.replace("package:", "").strip() for line in output.splitlines()) + return {package: {"url": f"market://launch?id={package}"} for package in packages} + + +async def is_authorised(device: AdbDeviceTcpAsync) -> bool: + """ + Check if the connected device is authorized for ADB communication. + + Args: + device (AdbDeviceTcpAsync): The connected ADB device. + + Returns: + bool: True if the device is authorized, False otherwise. + """ + try: + result = await device.shell("echo ADB_OK") + return "ADB_OK" in result + except Exception: + return False diff --git a/src/config.py b/src/config.py index b37b66f..8c8baea 100644 --- a/src/config.py +++ b/src/config.py @@ -11,6 +11,7 @@ import logging import os from dataclasses import dataclass +from pathlib import Path from typing import Iterator _LOG = logging.getLogger(__name__) @@ -18,6 +19,19 @@ _CFG_FILENAME = "config.json" +# Paths +def _get_config_root() -> Path: + config_home = Path(os.environ.get("UC_CONFIG_HOME", "./config")) + config_home.mkdir(parents=True, exist_ok=True) + return config_home + + +def _get_data_root() -> Path: + data_home = Path(os.environ.get("UC_DATA_HOME", "./data")) + data_home.mkdir(parents=True, exist_ok=True) + return data_home + + @dataclass class AtvDevice: """Android TV device configuration.""" @@ -38,6 +52,8 @@ class AtvDevice: """Enable External Metadata.""" use_chromecast: bool = False """Enable Chromecast features.""" + use_adb: bool = False + """Enable ADB features.""" class _EnhancedJSONEncoder(json.JSONEncoder): @@ -133,24 +149,25 @@ def update(self, atv: AtvDevice) -> bool: item.auth_error = atv.auth_error item.use_external_metadata = atv.use_external_metadata item.use_chromecast = atv.use_chromecast + item.use_adb = atv.use_adb return self.store() return False def default_certfile(self) -> str: """Return the default certificate file for initializing a device.""" - return os.path.join(self._data_path, "androidtv_remote_cert.pem") + return os.path.join(self._data_path + "/certs", "androidtv_remote_cert.pem") def default_keyfile(self) -> str: """Return the default key file for initializing a device.""" - return os.path.join(self._data_path, "androidtv_remote_key.pem") + return os.path.join(self._data_path + "/certs", "androidtv_remote_key.pem") def certfile(self, atv_id: str) -> str: """Return the certificate file of the device.""" - return os.path.join(self._data_path, f"androidtv_{atv_id}_remote_cert.pem") + return os.path.join(self._data_path + "/certs", f"androidtv_{atv_id}_remote_cert.pem") def keyfile(self, atv_id: str) -> str: """Return the key file of the device.""" - return os.path.join(self._data_path, f"androidtv_{atv_id}_remote_key.pem") + return os.path.join(self._data_path + "/certs", f"androidtv_{atv_id}_remote_key.pem") def remove(self, atv_id: str) -> bool: """Remove the given device configuration.""" @@ -236,6 +253,7 @@ def load(self) -> bool: item.get("auth_error", False), item.get("use_external_metadata", False), item.get("use_chromecast", False), + item.get("use_adb", False), ) self._config.append(atv) return True diff --git a/src/external_metadata.py b/src/external_metadata.py index f2c0dd1..b0732de 100644 --- a/src/external_metadata.py +++ b/src/external_metadata.py @@ -21,6 +21,7 @@ from PIL.Image import Resampling from pychromecast.controllers.media import MediaImage from sanitize_filename import sanitize +from config import _get_config_root, _get_data_root _LOG = logging.getLogger(__name__) @@ -30,27 +31,14 @@ # Paths -def _get_config_root() -> Path: - config_home = Path(os.environ.get("UC_CONFIG_HOME", "./config")) - config_home.mkdir(parents=True, exist_ok=True) - return config_home - - -def _get_cache_root() -> Path: - data_home = Path(os.environ.get("UC_DATA_HOME", "./data")) - cache_root = data_home / CACHE_ROOT - cache_root.mkdir(parents=True, exist_ok=True) - return cache_root - - def _get_metadata_dir() -> Path: - metadata_dir = _get_cache_root() + metadata_dir = _get_data_root() / CACHE_ROOT metadata_dir.mkdir(parents=True, exist_ok=True) return metadata_dir def _get_icon_dir() -> Path: - icon_dir = _get_cache_root() / ICON_SUBDIR + icon_dir = _get_data_root() / CACHE_ROOT / ICON_SUBDIR icon_dir.mkdir(parents=True, exist_ok=True) return icon_dir diff --git a/src/profiles.py b/src/profiles.py index ac5eace..9cd51ce 100644 --- a/src/profiles.py +++ b/src/profiles.py @@ -263,6 +263,25 @@ def match(self, manufacturer: str, model: str, use_chromecast: bool) -> Profile: select_profile = copy.copy(select_profile) select_profile.features.extend(CHROMECAST_FEATURES) + import string + for char in string.ascii_uppercase + " ": + # Use 'SPACE' as the key name for the space character + key = "SPACE" if char == " " else char + command_name = f"TEXT_{key}" + + # Append to simple_commands list + select_profile.simple_commands.append(command_name) + + # Map the command: space gets ' ', others get lowercase letter + command_value = " " if char == " " else char.lower() + select_profile.command_map[command_name] = Command(command_value, 'TEXT') + + select_profile.simple_commands.append('TEXT_BACKSPACE') + select_profile.command_map['TEXT_BACKSPACE'] = Command('DEL', KeyPress.SHORT) + + select_profile.simple_commands.append('TEXT_ENTER') + select_profile.command_map['TEXT_ENTER'] = Command('ENTER', KeyPress.SHORT) + return select_profile diff --git a/src/setup_flow.py b/src/setup_flow.py index 4fff16f..097c0a3 100644 --- a/src/setup_flow.py +++ b/src/setup_flow.py @@ -7,7 +7,9 @@ import asyncio import logging +import os from enum import IntEnum +from pathlib import Path import ucapi from ucapi import ( @@ -25,7 +27,7 @@ import config import discover import tv -from config import AtvDevice +from config import AtvDevice, _get_config_root _LOG = logging.getLogger(__name__) @@ -38,7 +40,8 @@ class SetupSteps(IntEnum): DISCOVER = 2 DEVICE_CHOICE = 3 PAIRING_PIN = 4 - RECONFIGURE = 5 + APP_SELECTION = 5 + RECONFIGURE = 6 _setup_step = SetupSteps.INIT @@ -48,6 +51,9 @@ class SetupSteps(IntEnum): _use_external_metadata: bool = False _reconfigured_device: AtvDevice | None = None _use_chromecast: bool = False +_use_adb: bool = False +_adb_device_id: str = "" +_device_info: dict[str, str] = {} # TODO #9 externalize language texts _user_input_discovery = RequestUserInput( @@ -108,6 +114,8 @@ async def driver_setup_handler(msg: SetupDriver) -> SetupAction: return await handle_device_choice(msg) if _setup_step == SetupSteps.PAIRING_PIN and "pin" in msg.input_values: return await handle_user_data_pin(msg) + if _setup_step == SetupSteps.APP_SELECTION: + return await handle_app_selection(msg) if _setup_step == SetupSteps.RECONFIGURE: return await _handle_device_reconfigure(msg) _LOG.error("No or invalid user response was received: %s", msg) @@ -285,6 +293,16 @@ async def handle_configuration_mode( _LOG.warning("Could not remove device from configuration: %s", choice) return SetupError(error_type=IntegrationSetupError.OTHER) config.devices.store() + adb_cert_path = Path(os.environ.get("UC_CONFIG_HOME", "./config")) / "certs" / f"adb_{choice}" + adb_cert_path_pub = Path(os.environ.get("UC_CONFIG_HOME", "./config")) / "certs" / f"adb_{choice}.pub" + if adb_cert_path.exists(): + adb_cert_path.unlink() + if adb_cert_path_pub.exists(): + adb_cert_path_pub.unlink() + appslist_path = Path(os.environ.get("UC_CONFIG_HOME", "./config")) / f"appslist_{choice}.json" + if appslist_path.exists(): + appslist_path.unlink() + _LOG.info("Device removed from configuration: %s", choice) return SetupComplete() case "configure": # Reconfigure device if the identifier has changed @@ -300,6 +318,7 @@ async def handle_configuration_mode( use_external_metadata = ( selected_device.use_external_metadata if selected_device.use_external_metadata else False ) + use_adb = selected_device.use_adb if selected_device.use_adb else False return RequestUserInput( { @@ -325,6 +344,15 @@ async def handle_configuration_mode( }, "field": {"checkbox": {"value": use_external_metadata}}, }, + { + "id": "adb", + "label": { + "en": "Preview feature: Enable ADB connection (for app list)", + "de": "Vorschaufunktion: Aktiviere ADB Verbindung (für App-Browsing)", + "fr": "Fonctionnalité en aperçu: Activer la connexion ADB (pour la navigation dans les applications)", + }, + "field": {"checkbox": {"value": use_adb}}, + }, ], ) case "reset": @@ -446,6 +474,15 @@ async def _handle_discovery(msg: UserDataResponse) -> RequestUserInput | SetupEr }, "field": {"checkbox": {"value": False}}, }, + { + "id": "adb", + "label": { + "en": "Preview feature: Enable ADB connection (for app list)", + "de": "Vorschaufunktion: Aktiviere ADB Verbindung (für App-Browsing)", + "fr": "Fonctionnalité en aperçu: Activer la connexion ADB (pour la navigation dans les applications)", + }, + "field": {"checkbox": {"value": False}}, + }, ], ) @@ -463,10 +500,12 @@ async def handle_device_choice(msg: UserDataResponse) -> RequestUserInput | Setu global _use_chromecast global _setup_step global _use_external_metadata + global _use_adb choice = msg.input_values["choice"] _use_external_metadata = msg.input_values.get("external_metadata", "false") == "true" _use_chromecast = msg.input_values.get("chromecast", "false") == "true" + _use_adb = msg.input_values.get("adb", "false") == "true" name = "" for discovered_tv in _discovered_android_tvs: @@ -484,6 +523,7 @@ async def handle_device_choice(msg: UserDataResponse) -> RequestUserInput | Setu id="", use_external_metadata=False, use_chromecast=False, + use_adb=False, ), ) _LOG.info("Chosen Android TV: %s. Start pairing process...", choice) @@ -516,7 +556,7 @@ async def handle_device_choice(msg: UserDataResponse) -> RequestUserInput | Setu return _setup_error_from_device_state(_pairing_android_tv.state) -async def handle_user_data_pin(msg: UserDataResponse) -> SetupComplete | SetupError: +async def handle_user_data_pin(msg: UserDataResponse) -> RequestUserInput | SetupComplete | SetupError: """ Process user data pairing pin response in a setup process. @@ -526,6 +566,9 @@ async def handle_user_data_pin(msg: UserDataResponse) -> SetupComplete | SetupEr :return: the setup action on how to continue: SetupComplete if a valid Android TV device was chosen. """ global _pairing_android_tv + global _setup_step + + _LOG.debug("Entered handle_user_data_pin with msg: %s", msg) if _pairing_android_tv is None: _LOG.error("Can't handle pairing pin: no device instance! Aborting setup") @@ -534,23 +577,33 @@ async def handle_user_data_pin(msg: UserDataResponse) -> SetupComplete | SetupEr _LOG.info("[%s] User has entered the PIN", _pairing_android_tv.log_id) res = await _pairing_android_tv.finish_pairing(msg.input_values["pin"]) - _pairing_android_tv.disconnect() + _LOG.debug("[%s] finish_pairing result: %s", _pairing_android_tv.log_id, res) - device_info = None + _pairing_android_tv.disconnect() + _LOG.debug("[%s] Disconnected after pairing attempt", _pairing_android_tv.log_id) - # Connect again to retrieve device identifier (with init()) and additional device information (with connect()) if res == ucapi.StatusCodes.OK: - _LOG.info( - "[%s] Pairing done, retrieving device information", - _pairing_android_tv.log_id, - ) + _LOG.info("[%s] Pairing done, retrieving device information", _pairing_android_tv.log_id) res = ucapi.StatusCodes.SERVER_ERROR timeout = int(tv.CONNECTION_TIMEOUT) - if await _pairing_android_tv.init(timeout) and await _pairing_android_tv.connect(timeout): - device_info = _pairing_android_tv.device_info or {} + _LOG.debug("[%s] Attempting to initialize and connect with timeout: %d", _pairing_android_tv.log_id, timeout) + + _device_info = None + timeout = int(tv.CONNECTION_TIMEOUT) + + if await _pairing_android_tv.init(timeout): + _LOG.debug("[%s] Initialization successful", _pairing_android_tv.log_id) + if await _pairing_android_tv.connect(timeout): + _LOG.debug("[%s] Connection successful", _pairing_android_tv.log_id) + _device_info = _pairing_android_tv.device_info or {} + _LOG.debug("[%s] Retrieved device info: %s", _pairing_android_tv.log_id, _device_info) + if config.devices.assign_default_certs_to_device(_pairing_android_tv.identifier, True): res = ucapi.StatusCodes.OK - _pairing_android_tv.disconnect() + _LOG.debug("[%s] Default certificates assigned successfully", _pairing_android_tv.log_id) + + _pairing_android_tv.disconnect() + _LOG.debug("[%s] Disconnected after retrieving device information", _pairing_android_tv.log_id) if res != ucapi.StatusCodes.OK: state = _pairing_android_tv.state @@ -558,23 +611,163 @@ async def handle_user_data_pin(msg: UserDataResponse) -> SetupComplete | SetupEr _pairing_android_tv = None return _setup_error_from_device_state(state) + if _use_adb: + _LOG.debug("ADB is enabled, proceeding with ADB setup") + + if not msg.input_values.get("adb", False): + _LOG.error("ADB setup failed: 'use_adb' not found in input values") + return SetupError() + + from adb_tv import adb_connect, get_installed_apps, is_authorised + + device_id = _pairing_android_tv.identifier + ip_address = _pairing_android_tv.address + _LOG.debug("Attempting ADB setup for device_id: %s, ip_address: %s", device_id, ip_address) + + adb_device = await adb_connect(device_id, ip_address) + if not adb_device: + return SetupError(error_type=IntegrationSetupError.AUTHORIZATION_ERROR) + + if not await is_authorised(adb_device): + return SetupError(error_type=IntegrationSetupError.AUTHORIZATION_ERROR) + + _LOG.debug("ADB authorisation confirmed") + + from apps import Apps, IdMappings + from external_metadata import get_app_metadata + + if _use_adb: + adb_apps = await get_installed_apps(adb_device) # dict[str, dict[str, str]] + offline_apps = {**Apps, **adb_apps} # ADB apps override Apps if same name + _LOG.debug("Retrieved ADB apps: %s", adb_apps) + await adb_device.close() + else: + offline_apps = Apps + + _LOG.debug("Retrieved offline apps: %s", offline_apps) + + settings = [] + + for package, details in sorted(offline_apps.items()): + # Start with mapped name or static name + mapped_name = IdMappings.get(package) + static_name = details.get("name", package) + name = mapped_name or static_name + + # Attempt to get external metadata if name is still just package id + if _use_external_metadata and not mapped_name: + try: + metadata = await get_app_metadata(package) + name = metadata.get("name", name) + except Exception as e: + _LOG.warning("Metadata lookup failed for %s: %s", package, e) + + if _use_adb and package in adb_apps: + # ADB-specific apps: checkbox + friendly name input + settings.append( + { + "id": f"{package}_enabled", + "label": { + "en": f"Enable {package}", + "de": f"Aktiviere {package}", + "fr": f"Activer {package}", + }, + "field": {"checkbox": {"value": False}}, + } + ) + settings.append( + { + "id": f"{package}_name", + "label": { + "en": f"Friendly name for {package}", + "de": f"Anzeigename für {package}", + "fr": f"Nom convivial pour {package}", + }, + "field": {"text": {"value": name}}, + } + ) + else: + # Predefined apps: checkbox only + settings.append( + { + "id": f"{package}_enabled", + "label": { + "en": name, + "de": name, + "fr": name, + }, + "field": {"checkbox": {"value": False}}, + } + ) + + _setup_step = SetupSteps.APP_SELECTION + return RequestUserInput( + title={ + "en": "Select visible apps", + "de": "Wähle sichtbare Apps", + "fr": "Sélectionnez les applications visibles", + }, + settings=settings, + ) + + +async def handle_app_selection(msg: UserDataResponse) -> SetupComplete | SetupError: + global _pairing_android_tv + global _use_chromecast + global _use_external_metadata + global _use_adb + global _device_info + + import json + + from apps import Apps # offline apps + + selected_apps = {} + + for field_id, value in msg.input_values.items(): + if field_id.endswith("_enabled") and str(value).lower() == "true": + package = field_id.removesuffix("_enabled") + name_field = f"{package}_name" + friendly_name = msg.input_values.get(name_field, package) + + # Prefer static app URL if it exists + static_entry = Apps.get(friendly_name) or Apps.get(package) + url = static_entry["url"] if static_entry else f"market://launch?id={package}" + + selected_apps[friendly_name] = {"url": url} + + filename = f"appslist_{_pairing_android_tv.identifier}.json" + apps_file = _get_config_root() / filename + try: + apps_file.write_text(json.dumps(selected_apps, indent=2)) + _LOG.info("App selection stored: %s", selected_apps) + except Exception as e: + _LOG.error("Failed to write selected apps: %s", e) + return SetupError() + device = AtvDevice( id=_pairing_android_tv.identifier, name=_pairing_android_tv.name, address=_pairing_android_tv.address, + manufacturer=_device_info.get("manufacturer", ""), + model=_device_info.get("model", ""), use_external_metadata=_use_external_metadata, use_chromecast=_use_chromecast, - manufacturer=device_info.get("manufacturer", ""), - model=device_info.get("model", ""), + use_adb=_use_adb, ) + _LOG.debug("Created AtvDevice: %s", device) + + config.devices.add_or_update(device) + _LOG.debug("Device added/updated in configuration") - config.devices.add_or_update(device) # triggers AndroidTv instance creation config.devices.store() + _LOG.debug("Configuration stored") - # ATV device connection will be triggered with subscribe_entities request - _pairing_android_tv = None await asyncio.sleep(1) _LOG.info("[%s] Setup successfully completed for %s", device.name, device.id) + + _pairing_android_tv = None + return SetupComplete() @@ -596,10 +789,12 @@ async def _handle_device_reconfigure( use_chromecast = msg.input_values.get("chromecast", "false") == "true" use_external_metadata = msg.input_values.get("external_metadata", "false") == "true" + use_adb = msg.input_values.get("adb", "false") == "true" _LOG.debug("User has changed configuration") _reconfigured_device.use_chromecast = use_chromecast _reconfigured_device.use_external_metadata = use_external_metadata + _reconfigured_device.use_adb = use_adb config.devices.add_or_update(_reconfigured_device) # triggers ATV instance update await asyncio.sleep(1) diff --git a/src/tv.py b/src/tv.py index fa3051d..fbd0ed4 100644 --- a/src/tv.py +++ b/src/tv.py @@ -8,6 +8,7 @@ # pylint: disable=too-many-lines import asyncio +import json import logging import os import socket @@ -49,7 +50,7 @@ import apps import discover import inputs -from config import AtvDevice +from config import AtvDevice, _get_config_root from external_metadata import encode_icon_to_data_uri, get_app_metadata from profiles import KeyPress, Profile from util import filter_data_img_properties @@ -716,8 +717,22 @@ def _is_available_updated(self, is_available: bool): def _update_app_list(self) -> None: update = {} source_list = [] - for app in apps.Apps: - source_list.append(app) + + filename = f"appslist_{self._identifier}.json" + apps_file = _get_config_root() / filename + + if apps_file.exists(): + try: + with apps_file.open("r", encoding="utf-8") as f: + selected_apps = json.load(f) + source_list.extend(selected_apps.keys()) + except Exception as e: + _LOG.warning("Failed to read apps list from %s: %s", apps_file, e) + else: + _LOG.info("No saved app list found for %s, falling back to default", self._identifier) + import apps # fall back to static apps + + source_list.extend(apps.Apps.keys()) update[MediaAttr.SOURCE_LIST] = source_list self.events.emit(Events.UPDATE, self._identifier, update) @@ -772,15 +787,31 @@ async def turn_off(self) -> ucapi.StatusCodes: async def select_source(self, source: str) -> ucapi.StatusCodes: """ - Select a given source, either a pre-defined app, input or by app-link/id. + Select a given source, either a user-defined app (from JSON), + an input source (KeyCode), or directly by app-link / id. :param source: the friendly source name or an app-link / id """ - if source in apps.Apps: - return await self._launch_app(apps.Apps[source]["url"]) + # Load saved apps for this device + apps_file = _get_config_root() / f"appslist_{self._identifier}.json" + apps_list = {} + + if apps_file.exists(): + try: + with apps_file.open("r", encoding="utf-8") as f: + apps_list = json.load(f) + except Exception as e: + _LOG.warning("Failed to read apps list for %s: %s", self._identifier, e) + + # Match known friendly app name + if source in apps_list: + return await self._launch_app(apps_list[source]["url"]) + + # Match input source if source in inputs.KeyCode: return await self._switch_input(source) + # Fall back to direct launch (e.g. package name or intent URI) return await self._launch_app(source) @async_handle_atvlib_errors @@ -804,6 +835,17 @@ async def _send_command(self, keycode: int | str, action: KeyPress = KeyPress.SH :return: OK if scheduled to be sent, other error code in case of an error """ # noqa + if action == 'TEXT': + # Special handling for text input + if not isinstance(keycode, str): + _LOG.error("[%s] Cannot send command, invalid key_code: %s", self.log_id, keycode) + return ucapi.StatusCodes.BAD_REQUEST + if keycode in ('DEL', 'ENTER'): + self._atv.send_key_command(keycode, 'SHORT') + else: + self._atv.send_text(keycode) + return ucapi.StatusCodes.OK + if action in (KeyPress.LONG, KeyPress.BEGIN): direction = "START_LONG" elif action == KeyPress.END: @@ -827,6 +869,12 @@ async def _launch_app(self, app: str) -> ucapi.StatusCodes: self._atv.send_launch_app_command(app) return ucapi.StatusCodes.OK + @async_handle_atvlib_errors + async def _send_text(self, text: str) -> ucapi.StatusCodes: + """Launch an app on Android TV.""" + self._atv.send_text(text) + return ucapi.StatusCodes.OK + async def _switch_input(self, source: str) -> ucapi.StatusCodes: """ TEST FUNCTION: Send a KEYCODE_TV_INPUT_* key.