Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
9ba1b32
updates to external_metadata.py for async and adding httpx dependancy
thomasm789 Apr 23, 2025
a558e10
fixes bugs in 64 ands 63
thomasm789 Apr 23, 2025
15df3fa
update default metadata
thomasm789 Apr 23, 2025
b20e500
further tweaks
thomasm789 Apr 23, 2025
74e8fd4
further tweaks
thomasm789 Apr 23, 2025
4709da4
adb poc
thomasm789 Apr 23, 2025
27ec593
update
thomasm789 Apr 24, 2025
1e2c497
merge from main
thomasm789 Apr 24, 2025
a7f53d2
update adb attempt
thomasm789 Apr 29, 2025
fe13f34
Merge remote-tracking branch 'origin/main' into android-adb
thomasm789 Apr 29, 2025
3ef5ba7
updates
thomasm789 Apr 29, 2025
17c2690
update
thomasm789 Apr 29, 2025
95ecaaf
lint
thomasm789 Apr 29, 2025
ced371e
isort
thomasm789 Apr 29, 2025
9573f72
update setupflow to allow for application list selection and if adb a…
thomasm789 Apr 30, 2025
ab015f7
testing
thomasm789 Apr 30, 2025
4ead16b
final commit - adding in apps configurator + adb + cleaning up adb ke…
thomasm789 Apr 30, 2025
fdefbfb
using idmapping if friendly names are already known + fixes
thomasm789 Apr 30, 2025
ca21a78
fixes
thomasm789 Apr 30, 2025
b0ae053
linting
thomasm789 Apr 30, 2025
2b621da
adding in requirements.txt
thomasm789 Apr 30, 2025
3739dd7
clean up
thomasm789 Apr 30, 2025
a799181
clean up
thomasm789 Apr 30, 2025
e35a2f8
replacing cache_root with config _get_data_root
thomasm789 Apr 30, 2025
f57eb1e
linting
thomasm789 Apr 30, 2025
447086e
adding send_text method
thomasm789 May 1, 2025
5044cd3
adding in simple commands for text entry via androidremote2
thomasm789 May 1, 2025
b76d49b
clean up
thomasm789 May 1, 2025
1a01857
add text backspace support
thomasm789 May 1, 2025
ca2c5d4
updating androidtv pem location to go into /certs folder
thomasm789 May 1, 2025
0f0f851
adding in enter
thomasm789 May 1, 2025
9d2525a
clean up app list logic - remove duplicates and sort alpha and make f…
thomasm789 May 2, 2025
0405df2
clean up app list logic
thomasm789 May 2, 2025
00de323
clean up app list logic
thomasm789 May 2, 2025
431d54c
Merge branch 'send-text'
thomasm789 May 5, 2025
e05a1dc
adding further external metadata to pull youtube artwork
thomasm789 May 5, 2025
4af1e62
adding further external metadata to pull youtube artwork
thomasm789 May 5, 2025
e655722
testing
thomasm789 May 6, 2025
9d89387
unified media handling
thomasm789 May 6, 2025
c7c2887
tweaks
thomasm789 May 6, 2025
ca4cf4b
tweaks
thomasm789 May 6, 2025
7e0d892
tweaks
thomasm789 May 6, 2025
9e10d46
update
thomasm789 May 14, 2025
7da4420
update
thomasm789 May 14, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,7 @@ pillow>=11.2.1
requests>=2.32
pychromecast~=14.0.7
httpx~=0.28.1
sanitize-filename~=1.2.0
sanitize-filename~=1.2.0
adb_shell==0.4.4
async_timeout~=5.0.1
simple-justwatch-python-api>=0.16
109 changes: 109 additions & 0 deletions src/adb_tv.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
"""
This module provides utilities for interacting with Android TVs via ADB (Android Debug Bridge).
It includes functions for managing ADB keys, connecting to devices, retrieving installed apps,
and verifying device authorization.
"""

import asyncio
import os
from pathlib import Path
from typing import Dict, Optional

from adb_shell.adb_device_async import AdbDeviceTcpAsync
from adb_shell.auth.keygen import keygen
from adb_shell.auth.sign_pythonrsa import PythonRSASigner

ADB_CERTS_DIR = Path(os.environ.get("UC_CONFIG_HOME", "./config")) / "certs"
ADB_CERTS_DIR.mkdir(parents=True, exist_ok=True)


def get_adb_key_paths(device_id: str) -> tuple[Path, Path]:
"""
Return the paths to the private and public ADB keys for a given device.

Args:
device_id (str): The unique identifier for the device.

Returns:
tuple[Path, Path]: Paths to the private and public key files.
"""
priv = ADB_CERTS_DIR / f"adb_{device_id}"
pub = ADB_CERTS_DIR / f"adb_{device_id}.pub"
return priv, pub


def load_or_generate_adb_keys(device_id: str) -> PythonRSASigner:
"""
Ensure ADB RSA keys exist for the device and return the signer.

Args:
device_id (str): The unique identifier for the device.

Returns:
PythonRSASigner: The signer object for ADB authentication.
"""
priv_path, pub_path = get_adb_key_paths(device_id)

if not priv_path.exists() or not pub_path.exists():
keygen(str(priv_path))

with open(priv_path) as f:
priv = f.read()
with open(pub_path) as f:
pub = f.read()
return PythonRSASigner(pub, priv)


async def adb_connect(device_id: str, host: str, port: int = 5555) -> Optional[AdbDeviceTcpAsync]:
"""
Connect to an Android device via ADB.

Args:
device_id (str): The unique identifier for the device.
host (str): The IP address or hostname of the device.
port (int, optional): The port number for the ADB connection. Defaults to 5555.

Returns:
Optional[AdbDeviceTcpAsync]: The connected ADB device object, or None if the connection fails.
"""
signer = load_or_generate_adb_keys(device_id)
device = AdbDeviceTcpAsync(host, port, default_transport_timeout_s=9.0)

try:
await device.connect(rsa_keys=[signer], auth_timeout_s=20)
return device
except Exception as e:
print(f"ADB connection failed to {host}:{port} — {e}")
return None


async def get_installed_apps(device: AdbDeviceTcpAsync) -> Dict[str, Dict[str, str]]:
"""
Retrieve a list of installed non-system apps in a structured format.

Args:
device (AdbDeviceTcpAsync): The connected ADB device.

Returns:
Dict[str, Dict[str, str]]: A dictionary of app package names and their metadata.
"""
output = await device.shell("pm list packages -3 -e")
packages = sorted(line.replace("package:", "").strip() for line in output.splitlines())
return {package: {"url": f"market://launch?id={package}"} for package in packages}


async def is_authorised(device: AdbDeviceTcpAsync) -> bool:
"""
Check if the connected device is authorized for ADB communication.

Args:
device (AdbDeviceTcpAsync): The connected ADB device.

Returns:
bool: True if the device is authorized, False otherwise.
"""
try:
result = await device.shell("echo ADB_OK")
return "ADB_OK" in result
except Exception:
return False
26 changes: 22 additions & 4 deletions src/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,27 @@
import logging
import os
from dataclasses import dataclass
from pathlib import Path
from typing import Iterator

_LOG = logging.getLogger(__name__)

_CFG_FILENAME = "config.json"


# Paths
def _get_config_root() -> Path:
config_home = Path(os.environ.get("UC_CONFIG_HOME", "./config"))
config_home.mkdir(parents=True, exist_ok=True)
return config_home


def _get_data_root() -> Path:
data_home = Path(os.environ.get("UC_DATA_HOME", "./data"))
data_home.mkdir(parents=True, exist_ok=True)
return data_home


@dataclass
class AtvDevice:
"""Android TV device configuration."""
Expand All @@ -38,6 +52,8 @@ class AtvDevice:
"""Enable External Metadata."""
use_chromecast: bool = False
"""Enable Chromecast features."""
use_adb: bool = False
"""Enable ADB features."""


class _EnhancedJSONEncoder(json.JSONEncoder):
Expand Down Expand Up @@ -133,24 +149,25 @@ def update(self, atv: AtvDevice) -> bool:
item.auth_error = atv.auth_error
item.use_external_metadata = atv.use_external_metadata
item.use_chromecast = atv.use_chromecast
item.use_adb = atv.use_adb
return self.store()
return False

def default_certfile(self) -> str:
"""Return the default certificate file for initializing a device."""
return os.path.join(self._data_path, "androidtv_remote_cert.pem")
return os.path.join(self._data_path + "/certs", "androidtv_remote_cert.pem")

def default_keyfile(self) -> str:
"""Return the default key file for initializing a device."""
return os.path.join(self._data_path, "androidtv_remote_key.pem")
return os.path.join(self._data_path + "/certs", "androidtv_remote_key.pem")

def certfile(self, atv_id: str) -> str:
"""Return the certificate file of the device."""
return os.path.join(self._data_path, f"androidtv_{atv_id}_remote_cert.pem")
return os.path.join(self._data_path + "/certs", f"androidtv_{atv_id}_remote_cert.pem")

def keyfile(self, atv_id: str) -> str:
"""Return the key file of the device."""
return os.path.join(self._data_path, f"androidtv_{atv_id}_remote_key.pem")
return os.path.join(self._data_path + "/certs", f"androidtv_{atv_id}_remote_key.pem")

def remove(self, atv_id: str) -> bool:
"""Remove the given device configuration."""
Expand Down Expand Up @@ -236,6 +253,7 @@ def load(self) -> bool:
item.get("auth_error", False),
item.get("use_external_metadata", False),
item.get("use_chromecast", False),
item.get("use_adb", False),
)
self._config.append(atv)
return True
Expand Down
2 changes: 1 addition & 1 deletion src/driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,7 @@ async def main():
logging.getLogger("setup_flow").setLevel(level)
logging.getLogger("androidtvremote2").setLevel(level)
logging.getLogger("external_metadata").setLevel(level)
# logging.getLogger("pychromecast").setLevel(level)
logging.getLogger("pychromecast").setLevel(level)

profile_path = os.path.join(api.config_dir_path, "profiles")
device_profile.load(profile_path)
Expand Down
114 changes: 96 additions & 18 deletions src/external_metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,17 @@
from io import BytesIO
from pathlib import Path
from typing import Dict
from urllib.parse import urlparse
from urllib.parse import urlparse, quote
import re

import google_play_scraper
import httpx
from PIL import Image
from PIL.Image import Resampling
from pychromecast.controllers.media import MediaImage
from sanitize_filename import sanitize
from config import _get_config_root, _get_data_root
from simplejustwatchapi import justwatch

_LOG = logging.getLogger(__name__)

Expand All @@ -30,27 +33,14 @@


# Paths
def _get_config_root() -> Path:
config_home = Path(os.environ.get("UC_CONFIG_HOME", "./config"))
config_home.mkdir(parents=True, exist_ok=True)
return config_home


def _get_cache_root() -> Path:
data_home = Path(os.environ.get("UC_DATA_HOME", "./data"))
cache_root = data_home / CACHE_ROOT
cache_root.mkdir(parents=True, exist_ok=True)
return cache_root


def _get_metadata_dir() -> Path:
metadata_dir = _get_cache_root()
metadata_dir = _get_data_root() / CACHE_ROOT
metadata_dir.mkdir(parents=True, exist_ok=True)
return metadata_dir


def _get_icon_dir() -> Path:
icon_dir = _get_cache_root() / ICON_SUBDIR
icon_dir = _get_data_root() / CACHE_ROOT / ICON_SUBDIR
icon_dir.mkdir(parents=True, exist_ok=True)
return icon_dir

Expand All @@ -68,7 +58,6 @@ def _get_icon_path(icon_name: str) -> Path:
return _get_config_root() / "icons" / sanitize(icon_name[9:])
return _get_icon_dir() / sanitize(icon_name)


# Cache Management
def _load_cache() -> Dict[str, Dict[str, str]]:
path = _get_metadata_file_path()
Expand Down Expand Up @@ -228,4 +217,93 @@ async def get_app_metadata(package_id: str) -> Dict[str, str]:
return {"name": metadata["name"], "icon": icon_data_uri}

_LOG.debug("Falling back to default metadata for %s", package_id)
return {"name": package_id, "icon": ""}
return {"name": "", "icon": ""}


async def youtube_search(query: str, limit: int = 1):
url = f"https://www.youtube.com/results?search_query={quote(query)}"
headers = {
"User-Agent": "Mozilla/5.0"
}

with httpx.Client(headers=headers, timeout=10) as client:
response = client.get(url)
html = response.text

# Extract the ytInitialData JSON
match = re.search(r"var ytInitialData = ({.*?});</script>", html)
if not match:
raise RuntimeError("Could not find ytInitialData in the page")

data = json.loads(match.group(1))

try:
items = (
data["contents"]["twoColumnSearchResultsRenderer"]
["primaryContents"]["sectionListRenderer"]
["contents"][0]["itemSectionRenderer"]["contents"]
)
except (KeyError, IndexError):
raise RuntimeError("Could not parse YouTube data structure")

for item in items:
if "videoRenderer" in item:
video = item["videoRenderer"]
video_id = video.get("videoId")

return f"https://img.youtube.com/vi/{video_id}/0.jpg"

return None

async def search_poster_justwatch(query: str, country: str = "GB", limit: int = 1) -> list[dict]:
"""Search for poster images using JustWatch API."""

response = justwatch.search(query, country, 'en', count=1, best_only=True)

if not response[0].poster:
return None
else:
poster_url = response[0].poster

if poster_url:
return poster_url

return None

async def get_best_artwork(title: str, artist: str = None, current_package: str = None) -> Dict[str, str] | bool:
_LOG.debug("Resolving best artwork for title='%s', artist='%s', current_package='%s'", title, artist, current_package)

search_query = f"{title} - {artist}" if artist else title

if current_package in ["com.google.android.youtube.tv", "com.liskovsoft.videomanager", "com.teamsmart.videomanager.tv"]:

_LOG.debug("YouTube detected. Searching for artwork.")

youtube = await youtube_search(search_query)

if youtube:
_LOG.debug("Artwork result:\n%s", json.dumps(youtube, indent=2))
return youtube
else:
_LOG.debug("No artwork found from YouTube search.")

else:

_LOG.debug("Non-YouTube package detected. Searching for artwork.")
justwatch = await search_poster_justwatch(search_query)

if justwatch:
_LOG.debug("Artwork result:\n%s", json.dumps(justwatch, indent=2))
return justwatch
else:
_LOG.debug("No artwork found from JustWatch search.")

_LOG.debug("No artwork source applicable. Returning False.")
return None


# async def test():
# posters = await get_best_artwork("Episode 1", "Breaking Bad", "com.plexapp.android")
# print(posters)
#
# asyncio.run(test())
19 changes: 19 additions & 0 deletions src/profiles.py
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,25 @@ def match(self, manufacturer: str, model: str, use_chromecast: bool) -> Profile:
select_profile = copy.copy(select_profile)
select_profile.features.extend(CHROMECAST_FEATURES)

import string
for char in string.ascii_uppercase + " ":
# Use 'SPACE' as the key name for the space character
key = "SPACE" if char == " " else char
command_name = f"TEXT_{key}"

# Append to simple_commands list
select_profile.simple_commands.append(command_name)

# Map the command: space gets ' ', others get lowercase letter
command_value = " " if char == " " else char.lower()
select_profile.command_map[command_name] = Command(command_value, 'TEXT')

select_profile.simple_commands.append('TEXT_BACKSPACE')
select_profile.command_map['TEXT_BACKSPACE'] = Command('DEL', KeyPress.SHORT)

select_profile.simple_commands.append('TEXT_ENTER')
select_profile.command_map['TEXT_ENTER'] = Command('ENTER', KeyPress.SHORT)

return select_profile


Expand Down
Loading