diff --git a/Framework/Built_In_Automation/Web/Selenium/linux_system.py b/Framework/Built_In_Automation/Web/Selenium/linux_system.py new file mode 100644 index 00000000..a9c29ddc --- /dev/null +++ b/Framework/Built_In_Automation/Web/Selenium/linux_system.py @@ -0,0 +1,241 @@ +import os +import re +import shutil +import subprocess +from pathlib import Path + + +class LinuxSystemHelper: + def __init__(self, unavailable_cache_file=None): + self.os_release = self._load_os_release() + self.unavailable_cache_file = ( + Path(unavailable_cache_file) if unavailable_cache_file else None + ) + + def _load_os_release(self): + data = {} + try: + with open("/etc/os-release", "r") as f: + for line in f: + line = line.strip() + if not line or "=" not in line: + continue + key, value = line.split("=", 1) + data[key] = value.strip().strip('"').strip("'") + except Exception: + return {} + return data + + def is_ubuntu_version_at_least(self, major, minor=0): + distro_id = self.os_release.get("ID", "").lower() + version_id = self.os_release.get("VERSION_ID", "") + + version_parts = version_id.split(".") + current_major = ( + int(version_parts[0]) + if len(version_parts) > 0 and version_parts[0].isdigit() + else 0 + ) + current_minor = ( + int(version_parts[1]) + if len(version_parts) > 1 and version_parts[1].isdigit() + else 0 + ) + + return distro_id == "ubuntu" and (current_major, current_minor) >= ( + major, + minor, + ) + + def is_gui_environment(self): + return bool(os.environ.get("DISPLAY") or os.environ.get("WAYLAND_DISPLAY")) + + def is_gnome_session(self): + desktop = os.environ.get("XDG_CURRENT_DESKTOP", "").lower() + session = os.environ.get("DESKTOP_SESSION", "").lower() + return "gnome" in desktop or "gnome" in session + + def supports_sudo_askpass(self): + if not shutil.which("sudo"): + return False + + try: + result = subprocess.run( + ["sudo", "-h"], + capture_output=True, + text=True, + check=False, + ) + help_output = f"{result.stdout}\n{result.stderr}".lower() + return "askpass" in help_output and "-a" in help_output + except Exception: + return False + + def get_privilege_escalation_command(self): + if ( + self.is_gui_environment() + and self.is_gnome_session() + and shutil.which("pkexec") + ): + return ["pkexec"], "pkexec" + + askpass = os.environ.get("SUDO_ASKPASS", "") + if ( + self.is_gui_environment() + and self.supports_sudo_askpass() + and askpass + and Path(askpass).exists() + and shutil.which("sudo") + ): + return ["sudo", "-A"], "sudo -A" + + return ["sudo"], "sudo" + + def _is_package_available(self, package): + try: + result = subprocess.run( + ["apt-cache", "policy", package], + capture_output=True, + text=True, + check=False, + ) + if result.returncode != 0: + return False + + output = f"{result.stdout}\n{result.stderr}".lower() + if "candidate:" not in output: + return False + + for line in output.splitlines(): + if line.strip().startswith("candidate:"): + candidate = line.split(":", 1)[1].strip() + return candidate != "(none)" + + return False + except Exception: + return False + + def _pick_available_package(self, package_options): + for package_name in package_options: + if self._is_package_available(package_name): + return package_name + return None + + def get_chrome_dependency_packages(self): + dependencies = [ + ["libnss3"], + ["libxss1"], + ["libappindicator3-1"], + ["fonts-liberation"], + ["libasound2t64", "libasound2"], + ["libnspr4"], + ["libx11-xcb1"], + ["libxcomposite1"], + ["libxcursor1"], + ["libxdamage1"], + ["libxi6"], + ["libxtst6"], + ["libglib2.0-0t64", "libglib2.0-0"], + ["libgtk-3-0t64", "libgtk-3-0"], + ["libgdk-pixbuf2.0-0", "libgdk-pixbuf-xlib-2.0-0"], + ["libxrandr2"], + ["libpangocairo-1.0-0"], + ["libatk1.0-0t64", "libatk1.0-0"], + ["libcairo-gobject2"], + ["xvfb"], + ["ca-certificates"], + ["libatk-bridge2.0-0t64", "libatk-bridge2.0-0"], + ["libdrm2"], + ["libxkbcommon0"], + ["lsb-release"], + ["wget"], + ["xdg-utils"], + ] + + selected_packages = [] + for package_group in dependencies: + selected = self._pick_available_package(package_group) + if selected: + selected_packages.append(selected) + else: + print( + "Warning: Could not resolve package from options: " + f"{', '.join(package_group)}" + ) + + return selected_packages + + def _is_package_installed(self, package): + try: + result = subprocess.run( + ["dpkg-query", "-W", "-f=${Status}", package], + capture_output=True, + text=True, + check=False, + ) + if result.returncode != 0: + return False + return "install ok installed" in result.stdout.lower() + except Exception: + return False + + def get_missing_packages(self, packages): + return [ + package for package in packages if not self._is_package_installed(package) + ] + + def _load_unavailable_packages(self): + if not self.unavailable_cache_file or not self.unavailable_cache_file.exists(): + return set() + + try: + with open(self.unavailable_cache_file, "r") as f: + return {line.strip() for line in f if line.strip()} + except Exception: + return set() + + def _save_unavailable_packages(self, packages): + if not self.unavailable_cache_file: + return + + try: + self.unavailable_cache_file.parent.mkdir(parents=True, exist_ok=True) + with open(self.unavailable_cache_file, "w") as f: + for package in sorted(packages): + f.write(f"{package}\n") + except Exception: + return + + def add_unavailable_packages(self, packages): + package_set = {pkg for pkg in packages if pkg} + if not package_set: + return set() + + existing = self._load_unavailable_packages() + updated = existing | package_set + newly_added = updated - existing + if newly_added: + self._save_unavailable_packages(updated) + return newly_added + + def filter_cached_unavailable_packages(self, packages): + cached = self._load_unavailable_packages() + allowed = [package for package in packages if package not in cached] + skipped = [package for package in packages if package in cached] + return allowed, skipped + + def get_cached_unavailable_packages(self): + return sorted(self._load_unavailable_packages()) + + def extract_unavailable_packages_from_apt_output(self, output): + patterns = [ + r"E:\s+Package '([^']+)' has no installation candidate", + r"E:\s+Unable to locate package\s+(\S+)", + r"Package\s+(\S+)\s+is not available, but is referred to by another package", + ] + + unavailable = set() + for pattern in patterns: + unavailable.update(re.findall(pattern, output)) + + return sorted(unavailable) diff --git a/Framework/Built_In_Automation/Web/Selenium/utils.py b/Framework/Built_In_Automation/Web/Selenium/utils.py index 3517718a..22431b76 100644 --- a/Framework/Built_In_Automation/Web/Selenium/utils.py +++ b/Framework/Built_In_Automation/Web/Selenium/utils.py @@ -20,82 +20,196 @@ from rich.progress import Progress from settings import ZEUZ_NODE_DOWNLOADS_DIR +try: + from .linux_system import LinuxSystemHelper +except ImportError: + from linux_system import LinuxSystemHelper + class ChromeForTesting: CHROME_BASE_DIR = ZEUZ_NODE_DOWNLOADS_DIR / "chrome_for_testing" CHROME_VERSIONS_DIR = CHROME_BASE_DIR / "versions" CHROME_INFO_FILE = CHROME_BASE_DIR / "info.json" + CHROME_LINUX_UNAVAILABLE_DEPS_FILE = CHROME_BASE_DIR / "unavailable_linux_deps.txt" def __init__(self): self.system = platform.system().lower() self.arch = platform.machine().lower() - + if self.system == "windows": self.platform_key = "win64" if self.arch in ("amd64", "x86_64") else "win32" elif self.system == "darwin": self.platform_key = "mac-arm64" if self.arch == "arm64" else "mac-x64" elif self.system == "linux": self.platform_key = "linux64" + self.linux_helper = LinuxSystemHelper( + unavailable_cache_file=self.CHROME_LINUX_UNAVAILABLE_DEPS_FILE + ) + self._install_linux_dependencies() else: raise OSError(f"Unsupported platform: {self.system}/{self.arch}") - + + if self.system != "linux": + self.linux_helper = None + self.CHROME_BASE_DIR.mkdir(parents=True, exist_ok=True) self.CHROME_VERSIONS_DIR.mkdir(exist_ok=True) - + if not self.CHROME_INFO_FILE.exists(): self._init_info_file() + def _install_linux_dependencies(self): + """Install Chrome dependencies for Ubuntu 24.04 and newer""" + try: + if self.linux_helper and self.linux_helper.is_ubuntu_version_at_least( + 24, 4 + ): + deps = self.linux_helper.get_chrome_dependency_packages() + if not deps: + print("Warning: No dependency packages resolved for Ubuntu.") + return + + missing_deps = self.linux_helper.get_missing_packages(deps) + if not missing_deps: + print( + "Chrome dependencies already installed. Skipping apt install." + ) + return + + install_deps, cached_unavailable = ( + self.linux_helper.filter_cached_unavailable_packages(missing_deps) + ) + if cached_unavailable: + print( + "Warning: Skipping previously unavailable packages: " + f"{', '.join(cached_unavailable)}" + ) + + if not install_deps: + print( + "All missing dependencies are marked unavailable. " + "Skipping apt install." + ) + return + + privilege_cmd, privilege_mode = ( + self.linux_helper.get_privilege_escalation_command() + ) + print( + f"Installing Chrome dependencies for Ubuntu using {privilege_mode}..." + ) + subprocess.run(privilege_cmd + ["apt-get", "update", "-qq"], check=True) + install_result = subprocess.run( + privilege_cmd + ["apt-get", "install", "-y"] + install_deps, + capture_output=True, + text=True, + check=False, + ) + install_ok = install_result.returncode == 0 + + if install_result.returncode != 0: + apt_output = f"{install_result.stdout}\n{install_result.stderr}" + unavailable_from_apt = ( + self.linux_helper.extract_unavailable_packages_from_apt_output( + apt_output + ) + ) + + if unavailable_from_apt: + newly_cached = self.linux_helper.add_unavailable_packages( + unavailable_from_apt + ) + if newly_cached: + print( + "Warning: Caching unavailable packages for future runs: " + f"{', '.join(sorted(newly_cached))}" + ) + + retry_deps = [ + package + for package in install_deps + if package not in unavailable_from_apt + ] + + if retry_deps: + retry_result = subprocess.run( + privilege_cmd + + ["apt-get", "install", "-y"] + + retry_deps, + capture_output=True, + text=True, + check=False, + ) + if retry_result.returncode != 0: + print("Warning: Could not install all dependencies.") + print( + retry_result.stderr.strip() + or retry_result.stdout.strip() + ) + else: + install_ok = True + else: + print( + "Warning: All attempted packages were unavailable in apt. " + "Continuing without interruption." + ) + else: + print("Warning: Could not install dependencies.") + print( + install_result.stderr.strip() + or install_result.stdout.strip() + ) + + if install_ok: + print("Dependencies installed successfully.") + else: + print("Dependency installation completed with warnings.") + else: + return + + except Exception as e: + print(f"Warning: Could not install dependencies: {e}") + print("You may need to install Chrome dependencies manually.") + def _init_info_file(self): - #modification here to add settings to default structure + # modification here to add settings to default structure """Initialize the info.json file with default structure""" info = { - "latest": { - "version": "", - "last_check": "" - }, + "latest": {"version": "", "last_check": ""}, "installed_versions": {}, # ex: ("132.0.6763.0" : "2025-07-02") "settings": { - "days_before_fetch": 15, # set default fetch latest after 15 days - "days_before_cleanup": 50 # set default cleanup old versions after 50 days - } + "days_before_fetch": 15, # set default fetch latest after 15 days + "days_before_cleanup": 50, # set default cleanup old versions after 50 days + }, } - with open(self.CHROME_INFO_FILE, 'w') as f: + with open(self.CHROME_INFO_FILE, "w") as f: json.dump(info, f, indent=4) - def _load_info(self): """Load the info.json content""" - #modification here to use defaults with settings + # modification here to use defaults with settings defaults = { - "latest": { - "version": "", - "last_check": "" - }, + "latest": {"version": "", "last_check": ""}, "installed_versions": {}, - "settings": { - "days_before_fetch": 15, - "days_before_cleanup": 50 - } + "settings": {"days_before_fetch": 15, "days_before_cleanup": 50}, } if not self.CHROME_INFO_FILE.exists(): return defaults - with open(self.CHROME_INFO_FILE, 'r') as f: + with open(self.CHROME_INFO_FILE, "r") as f: info = json.load(f) - #adds settings if missing + # adds settings if missing if "settings" not in info: info["settings"] = defaults["settings"] self._save_info(info) return info - - def _save_info(self, info): """Save data to info.json""" - with open(self.CHROME_INFO_FILE, 'w') as f: + with open(self.CHROME_INFO_FILE, "w") as f: json.dump(info, f, indent=4) def get_latest_version(self, channel="Stable", force_check=False): @@ -104,12 +218,12 @@ def get_latest_version(self, channel="Stable", force_check=False): latest_info = info.get("latest", {}) cached_version = latest_info.get("version", "") last_check_str = latest_info.get("last_check", "") - - #get days_before_fetch from settings + + # get days_before_fetch from settings settings = info.get("settings", {}) # Check environment variable first - env_fetch = os.environ.get('CHROME_DAYS_BEFORE_FETCH') + env_fetch = os.environ.get("CHROME_DAYS_BEFORE_FETCH") if env_fetch: days_before_fetch = int(env_fetch) print(f"Using days_before_fetch from env: {days_before_fetch}") @@ -117,82 +231,96 @@ def get_latest_version(self, channel="Stable", force_check=False): # otherwise use info.json or default days_before_fetch = settings.get("days_before_fetch", 15) - #modification here to use settings for days_before_fetch + # modification here to use settings for days_before_fetch if last_check_str and not force_check: last_check = datetime.datetime.fromisoformat(last_check_str).date() - - if (datetime.date.today() - last_check) <= timedelta(days=days_before_fetch): + + if (datetime.date.today() - last_check) <= timedelta( + days=days_before_fetch + ): return cached_version - + # Fetch from API - response = requests.get("https://googlechromelabs.github.io/chrome-for-testing/last-known-good-versions.json", verify=False) + response = requests.get( + "https://googlechromelabs.github.io/chrome-for-testing/last-known-good-versions.json", + verify=False, + ) response.raise_for_status() data = response.json() - new_version = data['channels'][channel]['version'] - + new_version = data["channels"][channel]["version"] + # Update info info["latest"] = { "version": new_version, - "last_check": datetime.date.today().isoformat() + "last_check": datetime.date.today().isoformat(), } self._save_info(info) - - return new_version + return new_version def get_download_url_for_version(self, version): """Get download URLs for specific Chrome version""" response = requests.get( "https://googlechromelabs.github.io/chrome-for-testing/known-good-versions-with-downloads.json", - verify=False + verify=False, ) response.raise_for_status() data = response.json() - - version_entry = next((v for v in data['versions'] if v['version'] == version), None) + + version_entry = next( + (v for v in data["versions"] if v["version"] == version), None + ) if not version_entry: raise Exception(f"Version {version} not found in known-good-versions") - + # Find Chrome URL chrome_entry = next( - (item for item in version_entry['downloads']['chrome'] - if item['platform'] == self.platform_key), - None + ( + item + for item in version_entry["downloads"]["chrome"] + if item["platform"] == self.platform_key + ), + None, ) - + # Find ChromeDriver URL driver_entry = next( - (item for item in version_entry['downloads']['chromedriver'] - if item['platform'] == self.platform_key), - None + ( + item + for item in version_entry["downloads"]["chromedriver"] + if item["platform"] == self.platform_key + ), + None, ) if not chrome_entry or not driver_entry: - raise Exception(f"Download URLs not found for platform: {self.platform_key}") - - return chrome_entry['url'], driver_entry['url'] + raise Exception( + f"Download URLs not found for platform: {self.platform_key}" + ) + + return chrome_entry["url"], driver_entry["url"] def is_version_installed(self, version): """Check if version is installed and binaries exist""" info = self._load_info() installed_versions = info.get("installed_versions", {}) - + # Check if version is in installed list if version not in installed_versions: return False - + # Verify binaries exist version_dir = self.CHROME_VERSIONS_DIR / version chrome_bin = self.get_chrome_binary_path(version_dir) driver_bin = self.get_driver_binary_path(version_dir) - + return chrome_bin.exists() and driver_bin.exists() def _update_installed_version_date(self, version): """Update the last used date for an installed version""" info = self._load_info() today = datetime.date.today().isoformat() - + info["installed_versions"][version] = today self._save_info(info) @@ -200,11 +328,18 @@ def get_chrome_binary_path(self, version_dir): """Get path to Chrome binary""" chrome_dir_name = f"chrome-{self.platform_key}" chrome_dir = version_dir / "chrome" - + if self.system == "windows": return chrome_dir / chrome_dir_name / "chrome.exe" elif self.system == "darwin": - return chrome_dir / chrome_dir_name / "Google Chrome for Testing.app" / "Contents" / "MacOS" / "Google Chrome for Testing" + return ( + chrome_dir + / chrome_dir_name + / "Google Chrome for Testing.app" + / "Contents" + / "MacOS" + / "Google Chrome for Testing" + ) elif self.system == "linux": return chrome_dir / chrome_dir_name / "chrome" return None @@ -213,7 +348,7 @@ def get_driver_binary_path(self, version_dir): """Get path to ChromeDriver binary""" driver_dir_name = f"chromedriver-{self.platform_key}" driver_dir = version_dir / "driver" - + if self.system == "windows": return driver_dir / driver_dir_name / "chromedriver.exe" else: @@ -224,29 +359,32 @@ def download_file(self, url, target_path, title): response = requests.get(url, stream=True, verify=False) response.raise_for_status() - total_size = int(response.headers.get('content-length', 0)) # gets total size + total_size = int(response.headers.get("content-length", 0)) # gets total size block_size = 1024 # 1 Kibibyte - - with open(target_path, "wb") as f, Progress() as progress: + with open(target_path, "wb") as f, Progress() as progress: task = progress.add_task(title, total=total_size) - for block in response.iter_content(block_size): # block size 1K + for block in response.iter_content(block_size): # block size 1K if block: - f.write(block) #writes to file - progress.update(task, advance=len(block)) # advances bar length by block size + f.write(block) # writes to file + progress.update( + task, advance=len(block) + ) # advances bar length by block size return target_path - def extract_zip(self, content, target_dir): #modification here since path is passed instead of bytes + def extract_zip( + self, content, target_dir + ): # modification here since path is passed instead of bytes """Extract ZIP content to target directory""" with zipfile.ZipFile(content) as zip_ref: for member in zip_ref.infolist(): if member.is_dir(): continue - + extracted_path = zip_ref.extract(member, target_dir) - + perm = member.external_attr >> 16 if perm: os.chmod(extracted_path, perm) @@ -255,12 +393,12 @@ def set_execute_permissions(self, version_dir): """Set execute permissions for binaries (Unix systems)""" chrome_bin = self.get_chrome_binary_path(version_dir) driver_bin = self.get_driver_binary_path(version_dir) - + if self.system in ["linux", "darwin"]: # Set permissions if chrome_bin and chrome_bin.exists(): chrome_bin.chmod(chrome_bin.stat().st_mode | stat.S_IEXEC) - + if driver_bin and driver_bin.exists(): driver_bin.chmod(driver_bin.stat().st_mode | stat.S_IEXEC) @@ -279,76 +417,79 @@ def cleanup_old_versions(self): installed_versions = info.get("installed_versions", {}) today = datetime.date.today() - #get days_before_cleanup from settings + # get days_before_cleanup from settings settings = info.get("settings", {}) - + # Check environment variable first - env_fetch = os.environ.get('CHROME_DAYS_BEFORE_CLEANUP') + env_fetch = os.environ.get("CHROME_DAYS_BEFORE_CLEANUP") if env_fetch: days_before_cleanup = int(env_fetch) print(f"Using days_before_cleanup from env: {days_before_cleanup}") else: # otherwise use info.json or default days_before_cleanup = settings.get("days_before_cleanup", 50) - - #modification here to use settings for days_before_cleanup + + # modification here to use settings for days_before_cleanup cutoff_date = today - timedelta(days=days_before_cleanup) - + versions_to_remove = [] for version, date_str in installed_versions.items(): if not date_str: continue - + last_used = datetime.date.fromisoformat(date_str) if last_used < cutoff_date: versions_to_remove.append(version) - + for version in versions_to_remove: version_dir = self.CHROME_VERSIONS_DIR / version if version_dir.exists(): print(f"Cleaning up unused CfT version: {version}") shutil.rmtree(version_dir, ignore_errors=True) del installed_versions[version] - + if versions_to_remove: info["installed_versions"] = installed_versions self._save_info(info) print(f"Removed {len(versions_to_remove)} old versions of CfT") - def install_version(self, version): """Install a specific Chrome version""" version_dir = self.CHROME_VERSIONS_DIR / version - + if version_dir.exists(): shutil.rmtree(version_dir) version_dir.mkdir(parents=True) chrome_url, driver_url = self.get_download_url_for_version(version) - + chrome_zip_path = version_dir / "chrome.zip" - + # Download and extract Chrome print(f"Downloading Chrome for Testing {version}...") - self.download_file(chrome_url, chrome_zip_path, title="Downloading Chrome") #download zip to path + self.download_file( + chrome_url, chrome_zip_path, title="Downloading Chrome" + ) # download zip to path print(f"Extracting Chrome to {version_dir / 'chrome'}...") - self.extract_zip(open(chrome_zip_path, 'rb'), version_dir / "chrome") + self.extract_zip(open(chrome_zip_path, "rb"), version_dir / "chrome") + + chrome_zip_path.unlink() # remove zip - chrome_zip_path.unlink() # remove zip - driver_zip_path = version_dir / "driver.zip" - + # Download and extract ChromeDriver print(f"Downloading ChromeDriver {version}...") - self.download_file(driver_url, driver_zip_path, title="Downloading ChromeDriver") + self.download_file( + driver_url, driver_zip_path, title="Downloading ChromeDriver" + ) print(f"Extracting ChromeDriver to {version_dir / 'driver'}...") - self.extract_zip(open(driver_zip_path, 'rb'), version_dir / "driver") - + self.extract_zip(open(driver_zip_path, "rb"), version_dir / "driver") + driver_zip_path.unlink() # Set execute permissions (Unix systems) self.set_execute_permissions(version_dir) - + # Update installed versions self._update_installed_version_date(version) print(f"\nSuccessfully installed Chrome for Testing {version}") @@ -367,9 +508,11 @@ def setup_chrome_for_testing(self, version=None, channel=None): print("Chrome for testing version must be at least: '115.0.5763.0'") return None, None if version.strip().lower() == "system": - print("Forcefully trying to use regular chrome instead of chrome for testing.") + print( + "Forcefully trying to use regular chrome instead of chrome for testing." + ) return None, None - + # Use latest version if not specified if not version: version = self.get_latest_version(channel=channel, force_check=False) @@ -379,20 +522,22 @@ def setup_chrome_for_testing(self, version=None, channel=None): # Install if not already installed if not self.is_version_installed(version): - print(f"Chrome for testing version {version} is not installed. Downloading...") + print( + f"Chrome for testing version {version} is not installed. Downloading..." + ) self.install_version(version) else: print(f"Chrome for testing version {version} already installed.") # Update last used date self._update_installed_version_date(version) - + version_dir = self.CHROME_VERSIONS_DIR / version chrome_bin = self.get_chrome_binary_path(version_dir) driver_bin = self.get_driver_binary_path(version_dir) - + if not chrome_bin.exists() or not driver_bin.exists(): raise FileNotFoundError("Required binaries not found after installation") - + return chrome_bin, driver_bin @@ -428,7 +573,6 @@ def setup_chrome_for_testing(self, version=None, channel=None): # print("Selenium test failed:", str(e)) - class ChromeExtensionDownloader: CHROME_EXTENSIONS_DIR = ZEUZ_NODE_DOWNLOADS_DIR / "chrome_extensions" CFT_INFO_FILE = ZEUZ_NODE_DOWNLOADS_DIR / "chrome_for_testing" / "info.json" @@ -437,7 +581,7 @@ class ChromeExtensionDownloader: def __init__(self, chrome_version=None): self.system = platform.system().lower() self.arch = platform.machine().lower() - + self.chrome_version = chrome_version or self._get_cft_version() self._setup_platform_info() self.output_dir = self.CHROME_EXTENSIONS_DIR @@ -447,14 +591,14 @@ def __init__(self, chrome_version=None): def _get_cft_version(self): try: if self.CFT_INFO_FILE.exists(): - with open(self.CFT_INFO_FILE, 'r') as f: + with open(self.CFT_INFO_FILE, "r") as f: cft_info = json.load(f) latest_version = cft_info.get("latest", {}).get("version", "") if latest_version: return latest_version except Exception as e: print(f"Warning: Could not read CfT info file: {e}") - + return self.DEFAULT_CHROME_VERSION def _setup_platform_info(self): @@ -466,7 +610,7 @@ def _setup_platform_info(self): self.platform_os = "Linux" else: raise OSError(f"Unsupported platform: {self.system}") - + if "arm" in self.arch or "aarch" in self.arch: self.platform_arch = "arm" elif self.arch in ("x86_64", "amd64", "x64"): @@ -487,7 +631,7 @@ def _build_download_url(self, extension_id): ("prodchannel", "unknown"), ("prodversion", self.chrome_version), ("acceptformat", "crx3"), - ("x", f"id%3D{extension_id}%26uc") + ("x", f"id%3D{extension_id}%26uc"), ] query_string = "&".join([f"{k}={v}" for k, v in params]) return f"{base_url}?{query_string}" @@ -495,79 +639,81 @@ def _build_download_url(self, extension_id): def _get_download_headers(self, extension_id): return { "User-Agent": f"Mozilla/5.0 Chrome/{self.chrome_version}", - "Referer": f"https://chrome.google.com/webstore/detail/{extension_id}" + "Referer": f"https://chrome.google.com/webstore/detail/{extension_id}", } def download_extension(self, extension_id, extract=False, keep_crx=True): - print(f"Downloading extension '{extension_id}' for Chrome {self.chrome_version}...") - + print( + f"Downloading extension '{extension_id}' for Chrome {self.chrome_version}..." + ) + # Clean up first extension_dir = self.output_dir / extension_id if extension_dir.exists(): shutil.rmtree(extension_dir) extension_dir.mkdir(parents=True, exist_ok=True) - + url = self._build_download_url(extension_id) headers = self._get_download_headers(extension_id) - + crx_path = extension_dir / f"{extension_id}.crx" - + try: req = urllib.request.Request(url, headers=headers) with urllib.request.urlopen(req) as response: - with open(crx_path, 'wb') as f: + with open(crx_path, "wb") as f: f.write(response.read()) except Exception as e: raise Exception(f"Download failed: {str(e)}") - + if crx_path.stat().st_size == 0: crx_path.unlink() raise Exception("Downloaded file is empty") - + result = { "extension_id": extension_id, "chrome_version": self.chrome_version, "crx_path": str(crx_path), "extracted_path": None, - "file_size": crx_path.stat().st_size + "file_size": crx_path.stat().st_size, } - + if extract: extracted_path = self.extract_extension(crx_path) result["extracted_path"] = str(extracted_path) - + if not keep_crx: crx_path.unlink() result["crx_path"] = None - + return result def extract_extension(self, crx_path): crx_path = Path(crx_path) extract_path = crx_path.parent / crx_path.stem - + if extract_path.exists(): shutil.rmtree(extract_path) - + try: - with open(crx_path, 'rb') as f: + with open(crx_path, "rb") as f: # Check CRX header magic = f.read(4) - - if magic == b'Cr24': # CRX v3 format + + if magic == b"Cr24": # CRX v3 format # Skip header (version + header length fields) f.read(8) - header_length = struct.unpack('