From 474cabe31c37f1be181ef1d98f5303daf2f62dd3 Mon Sep 17 00:00:00 2001 From: Bastian Neumann Date: Wed, 4 Feb 2026 21:54:21 +0100 Subject: [PATCH 01/16] feat: implement global authentication mode and enhance vehicle initialization - Added support for global authentication mode in SmartAccount and SmartAuthentication classes. - Introduced methods for initializing vehicles from Smart Global servers. - Updated API session handling to accommodate global app requirements. - Enhanced logging and error handling for global API interactions. - Modified CLI to support new region options for global access. --- pyproject.toml | 8 +- pysmarthashtag/account.py | 128 ++++++++++- pysmarthashtag/api/authentication.py | 235 ++++++++++++++++++--- pysmarthashtag/api/utils.py | 125 +++++++++++ pysmarthashtag/cli.py | 8 +- pysmarthashtag/const.py | 55 +++++ pysmarthashtag/tests/common.py | 10 + pysmarthashtag/tests/test_endpoint_urls.py | 8 + pysmarthashtag/vehicle/vehicle.py | 16 +- 9 files changed, 548 insertions(+), 45 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index c24b9d5..166d055 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -51,7 +51,7 @@ enable_error_code = "ignore-without-code" target-version = "py39" line-length = 120 -select = [ +lint.select = [ "C", # complexity "D", # docstrings "E", # pycodestyle @@ -62,7 +62,7 @@ select = [ "PGH004", # Use specific rule codes when using noqa ] -ignore = [ +lint.ignore = [ "D202", # No blank lines allowed after function docstring "D203", # 1 blank line required before class docstring "D212", # Multi-line docstring summary should start at the first line @@ -75,9 +75,9 @@ ignore = [ exclude = [ ] -[tool.ruff.per-file-ignores] +[tool.ruff.lint.per-file-ignores] "docs/source/conf.py" = ["D100"] "pysamrthashtag/api/authentication.py" = ["D102", "D107"] -[tool.ruff.mccabe] +[tool.ruff.lint.mccabe] max-complexity = 25 diff --git a/pysmarthashtag/account.py b/pysmarthashtag/account.py index 7734a9e..8fb578f 100644 --- a/pysmarthashtag/account.py +++ b/pysmarthashtag/account.py @@ -6,11 +6,13 @@ from dataclasses import InitVar, dataclass, field from typing import Optional +import httpx + from pysmarthashtag.api import utils -from pysmarthashtag.api.authentication import SmartAuthentication +from pysmarthashtag.api.authentication import SmartAuthentication, SmartLoginClient from pysmarthashtag.api.client import SmartClient, SmartClientConfiguration from pysmarthashtag.api.log_sanitizer import sanitize_log_data -from pysmarthashtag.const import API_CARS_URL, API_SELECT_CAR_URL, EndpointUrls +from pysmarthashtag.const import API_CARS_URL, API_SELECT_CAR_URL, EndpointUrls, SmartAuthMode from pysmarthashtag.models import SmartAuthError, SmartHumanCarConnectionError, SmartTokenRefreshNecessary from pysmarthashtag.vehicle.vehicle import SmartVehicle @@ -53,6 +55,10 @@ def __post_init__(self, password, log_responses): log_responses=log_responses, ) + def _is_global_auth(self) -> bool: + """Return True when using the Global app authentication mode.""" + return self.config.authentication.auth_mode == SmartAuthMode.GLOBAL_HMAC + async def _ensure_ssl_context(self) -> None: """Ensure SSL context is created asynchronously. @@ -113,6 +119,38 @@ async def _init_vehicles(self) -> None: _LOGGER.debug("Found vehicle %s", sanitize_log_data(vehicle)) self.add_vehicle(vehicle, fetched_at) + async def _init_vehicles_global(self) -> None: + """Initialize vehicles from Smart Global servers.""" + _LOGGER.debug("Getting initial vehicle list (global)") + await self._ensure_ssl_context() + + fetched_at = datetime.datetime.now(datetime.timezone.utc) + async with SmartLoginClient(ssl_context=self.config.ssl_context) as client: + path = "/vc/vehicle/v1/ownership/list" + body = json.dumps({}) + host = httpx.URL(self.endpoint_urls.get_api_base_url()).host + headers = utils.generate_global_header( + method="POST", + path=path, + host=host, + app_key=self.endpoint_urls.get_global_app_key(), + app_secret=self.endpoint_urls.get_global_app_secret(), + body=body, + access_token=self.config.authentication.access_token, + user_id=self.config.authentication.api_user_id, + id_token=self.config.authentication.id_token, + ) + vehicles_response = await client.post( + self.endpoint_urls.get_api_base_url() + path, + headers=headers, + content=body, + ) + data = vehicles_response.json() + vehicles = data.get("result") or data.get("data") or [] + for vehicle in vehicles: + _LOGGER.debug("Found vehicle %s", sanitize_log_data(vehicle)) + self.add_vehicle(vehicle, fetched_at) + def add_vehicle(self, vehicle, fetched_at): """Add a vehicle to the account.""" self.vehicles[vehicle.get("vin")] = SmartVehicle(self, vehicle, fetched_at=fetched_at) @@ -126,7 +164,14 @@ async def get_vehicles(self, force_init: bool = False) -> None: _LOGGER.debug("Getting vehicles for account") if len(self.vehicles) == 0 or force_init: - await self._init_vehicles() + if self._is_global_auth(): + await self._init_vehicles_global() + else: + await self._init_vehicles() + + if self._is_global_auth(): + await self._update_global_vehicle_details() + return for vin, vehicle in self.vehicles.items(): _LOGGER.debug("Getting vehicle data") @@ -138,6 +183,8 @@ async def get_vehicles(self, force_init: bool = False) -> None: async def select_active_vehicle(self, vin) -> None: """Select the active vehicle.""" + if self._is_global_auth(): + return _LOGGER.debug("Selecting vehicle") data = json.dumps( { @@ -175,6 +222,8 @@ async def select_active_vehicle(self, vin) -> None: async def get_vehicle_information(self, vin) -> str: """Get information about a vehicle.""" + if self._is_global_auth(): + return await self._get_vehicle_details_global(vin) _LOGGER.debug("Getting information for vehicle") params = { "latest": True, @@ -218,6 +267,8 @@ async def get_vehicle_information(self, vin) -> str: async def get_vehicle_soc(self, vin) -> str: """Get information about a vehicle.""" + if self._is_global_auth(): + return {} _LOGGER.debug("Getting vehicle SOC") params = { "setting": "charging", @@ -259,6 +310,8 @@ async def get_vehicle_soc(self, vin) -> str: async def get_vehicle_ota_info(self, vin) -> dict: """Get information about a vehicle from OTA server.""" + if self._is_global_auth(): + return {} _LOGGER.debug("Getting OTA information for vehicle") data = {} async with SmartClient(self.config) as client: @@ -296,3 +349,72 @@ async def get_vehicle_ota_info(self, vin) -> dict: if retry > 1: raise SmartAuthError("Could not get vehicle information") return data + + async def _update_global_vehicle_details(self) -> None: + """Fetch global vehicle details and abilities.""" + for vin in list(self.vehicles.keys()): + await self._get_vehicle_details_global(vin) + await self._get_vehicle_abilities_global(vin) + + async def _get_vehicle_details_global(self, vin) -> dict: + """Get global vehicle details.""" + _LOGGER.debug("Getting global vehicle details") + await self._ensure_ssl_context() + async with SmartLoginClient(ssl_context=self.config.ssl_context) as client: + path = "/vc/vehicle/v1/vehicleCustomerInfo" + body = json.dumps({"vin": vin}) + host = httpx.URL(self.endpoint_urls.get_api_base_url()).host + headers = utils.generate_global_header( + method="POST", + path=path, + host=host, + app_key=self.endpoint_urls.get_global_app_key(), + app_secret=self.endpoint_urls.get_global_app_secret(), + body=body, + access_token=self.config.authentication.access_token, + user_id=self.config.authentication.api_user_id, + id_token=self.config.authentication.id_token, + ) + response = await client.post( + self.endpoint_urls.get_api_base_url() + path, + headers=headers, + content=body, + ) + data = response.json() + details = data.get("result") or data.get("data") or [] + if isinstance(details, list): + details = details[0] if details else {} + if details: + self.vehicles.get(vin).combine_data(details) + return details or {} + + async def _get_vehicle_abilities_global(self, vin) -> dict: + """Get global vehicle abilities.""" + _LOGGER.debug("Getting global vehicle abilities") + await self._ensure_ssl_context() + vehicle = self.vehicles.get(vin) + model_code = vehicle.data.get("modelCode") if vehicle else None + if not model_code: + return {} + async with SmartLoginClient(ssl_context=self.config.ssl_context) as client: + path = f"/vc/vehicle/v1/ability/{model_code}/{vin}" + host = httpx.URL(self.endpoint_urls.get_api_base_url()).host + headers = utils.generate_global_header( + method="GET", + path=path, + host=host, + app_key=self.endpoint_urls.get_global_app_key(), + app_secret=self.endpoint_urls.get_global_app_secret(), + access_token=self.config.authentication.access_token, + user_id=self.config.authentication.api_user_id, + id_token=self.config.authentication.id_token, + ) + response = await client.get( + self.endpoint_urls.get_api_base_url() + path, + headers=headers, + ) + data = response.json() + abilities = data.get("result") or data.get("data") or {} + if abilities and vehicle: + vehicle.data["abilities"] = abilities + return abilities or {} diff --git a/pysmarthashtag/api/authentication.py b/pysmarthashtag/api/authentication.py index 68da8e6..b7357bd 100644 --- a/pysmarthashtag/api/authentication.py +++ b/pysmarthashtag/api/authentication.py @@ -20,6 +20,7 @@ API_SESION_URL, HTTPX_TIMEOUT, EndpointUrls, + SmartAuthMode, ) from pysmarthashtag.models import SmartAPIError @@ -51,8 +52,10 @@ def __init__( self.api_access_token: Optional[str] = None self.api_refresh_token: Optional[str] = None self.api_user_id: Optional[str] = None + self.id_token: Optional[str] = None self.ssl_context: Optional[ssl.SSLContext] = ssl_context self.endpoint_urls: EndpointUrls = endpoint_urls if endpoint_urls is not None else EndpointUrls() + self.auth_mode: SmartAuthMode = self.endpoint_urls.infer_auth_mode() _LOGGER.debug("Device ID initialized") async def get_ssl_context(self) -> ssl.SSLContext: @@ -150,6 +153,7 @@ async def login(self) -> None: self.api_access_token = token_data["api_access_token"] self.api_refresh_token = token_data["api_refresh_token"] self.api_user_id = token_data["api_user_id"] + self.id_token = token_data.get("id_token") self.expires_at = token_data["expires_at"] _LOGGER.debug("Login successful") return True @@ -158,17 +162,27 @@ async def login(self) -> None: async def _refresh_access_token(self): """Refresh the access token.""" + if self.auth_mode == SmartAuthMode.GLOBAL_HMAC: + try: + return await self._refresh_access_token_global() + except (SmartAPIError, httpx.HTTPError, ValueError): + _LOGGER.debug("Refreshing access token failed. Logging in again") + return {} + try: - ssl_ctx = await self.get_ssl_context() - async with SmartLoginClient(ssl_context=ssl_ctx) as _: - _LOGGER.debug("Refreshing access token via relogin because refresh token is not implemented") - await self._login() - except SmartAPIError: + return await self._refresh_access_token_eu() + except (SmartAPIError, httpx.HTTPError, ValueError): _LOGGER.debug("Refreshing access token failed. Logging in again") return {} async def _login(self): """Login to Smart web services.""" + if self.auth_mode == SmartAuthMode.GLOBAL_HMAC: + return await self._login_global() + return await self._login_eu() + + async def _login_eu(self): + """Login to Smart web services (EU OAuth flow).""" ssl_ctx = await self.get_ssl_context() async with SmartLoginClient(ssl_context=ssl_ctx) as client: _LOGGER.info("Acquiring access token.") @@ -263,32 +277,7 @@ async def _login(self): except KeyError: raise SmartAPIError("Could not get access token from auth page") - data = json.dumps({"accessToken": access_token}).replace(" ", "") - r_api_access = await client.post( - # we do not know what type of car we have in our list so we fall back to the old API URL - self.endpoint_urls.get_api_base_url() + API_SESION_URL + "?identity_type=smart", - headers={ - **utils.generate_default_header( - self.device_id, - None, - params={ - "identity_type": "smart", - }, - method="POST", - url=API_SESION_URL, - body=data, - ) - }, - data=data, - ) - api_result = r_api_access.json() - _LOGGER.debug("API access result: %s", sanitize_log_data(api_result)) - try: - api_access_token = api_result["data"]["accessToken"] - api_refresh_token = api_result["data"]["refreshToken"] - api_user_id = api_result["data"]["userId"] - except KeyError: - raise SmartAPIError("Could not get API access token from API") + api_access_token, api_refresh_token, api_user_id = await self._get_api_session(client, access_token) return { "access_token": access_token, @@ -299,6 +288,190 @@ async def _login(self): "expires_at": expires_at, } + async def _get_api_session(self, client: "SmartLoginClient", access_token: str) -> tuple[str, str, str]: + """Exchange OAuth access token for API session tokens.""" + data = json.dumps({"accessToken": access_token}).replace(" ", "") + r_api_access = await client.post( + # we do not know what type of car we have in our list so we fall back to the old API URL + self.endpoint_urls.get_api_base_url() + API_SESION_URL + "?identity_type=smart", + headers={ + **utils.generate_default_header( + self.device_id, + None, + params={ + "identity_type": "smart", + }, + method="POST", + url=API_SESION_URL, + body=data, + ) + }, + data=data, + ) + api_result = r_api_access.json() + _LOGGER.debug("API access result: %s", sanitize_log_data(api_result)) + try: + api_access_token = api_result["data"]["accessToken"] + api_refresh_token = api_result["data"]["refreshToken"] + api_user_id = api_result["data"]["userId"] + except KeyError: + raise SmartAPIError("Could not get API access token from API") + return api_access_token, api_refresh_token, api_user_id + + async def _refresh_access_token_eu(self) -> dict: + """Refresh the EU OAuth access token.""" + if not self.refresh_token: + return {} + + ssl_ctx = await self.get_ssl_context() + async with SmartLoginClient(ssl_context=ssl_ctx) as client: + payload = { + "accessToken": "", + "refreshToken": self.refresh_token, + } + headers = { + "Content-Type": "application/json", + "User-Agent": "hello-smart/2.0.5 (Android)", + "X-API-Key": self.endpoint_urls.get_oauth_api_key(), + } + r_refresh = await client.post( + self.endpoint_urls.get_oauth_token_url(), + json=payload, + headers=headers, + ) + refresh_result = r_refresh.json() + access_token = refresh_result.get("accessToken") or refresh_result.get("access_token") + refresh_token = refresh_result.get("refreshToken") or refresh_result.get("refresh_token") + id_token = refresh_result.get("idToken") or refresh_result.get("id_token") + expires_in = refresh_result.get("expiresIn") or refresh_result.get("expires_in") + if not access_token: + return {} + + expires_at = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta( + seconds=int(expires_in) if expires_in else HTTPX_TIMEOUT * 2 + ) + api_access_token, api_refresh_token, api_user_id = await self._get_api_session(client, access_token) + return { + "access_token": access_token, + "refresh_token": refresh_token or self.refresh_token, + "api_access_token": api_access_token, + "api_refresh_token": api_refresh_token, + "api_user_id": api_user_id, + "id_token": id_token, + "expires_at": expires_at, + } + + async def _login_global(self) -> dict: + """Login to Smart Global app services (HMAC flow).""" + ssl_ctx = await self.get_ssl_context() + async with SmartLoginClient(ssl_context=ssl_ctx) as client: + _LOGGER.info("Acquiring access token (global app).") + + path = "/iam/service/api/v1/login" + payload = { + "email": self.username, + "password": self.password, + "imageSessionId": "", + "imageCode": "", + } + body = json.dumps(payload) + host = httpx.URL(self.endpoint_urls.get_api_base_url()).host + headers = utils.generate_global_header( + method="POST", + path=path, + host=host, + app_key=self.endpoint_urls.get_global_app_key(), + app_secret=self.endpoint_urls.get_global_app_secret(), + body=body, + ) + + r_login = await client.post( + self.endpoint_urls.get_api_base_url() + path, + headers=headers, + content=body, + ) + login_result = r_login.json() + _LOGGER.debug("Login result: %s", sanitize_log_data(login_result)) + data = login_result.get("data") or login_result.get("result") or {} + if not data: + message = login_result.get("message", "Unknown error") + code = login_result.get("code", "unknown") + raise SmartAPIError(f"Could not get tokens from global login: {code} {message}") + + access_token = data.get("accessToken") + refresh_token = data.get("refreshToken") + id_token = data.get("idToken") + api_user_id = data.get("userId") + expires_in = data.get("expiresIn") or data.get("expires_in") + if not access_token or not api_user_id: + raise SmartAPIError("Could not get access token from global login") + + expires_at = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta( + seconds=int(expires_in) if expires_in else HTTPX_TIMEOUT * 2 + ) + + return { + "access_token": access_token, + "refresh_token": refresh_token, + "api_access_token": access_token, + "api_refresh_token": refresh_token, + "api_user_id": api_user_id, + "id_token": id_token, + "expires_at": expires_at, + } + + async def _refresh_access_token_global(self) -> dict: + """Refresh the Global app access token.""" + if not self.refresh_token: + return {} + + ssl_ctx = await self.get_ssl_context() + async with SmartLoginClient(ssl_context=ssl_ctx) as client: + path = "/iam/service/api/v1/refresh/" + payload = {"refreshToken": self.refresh_token} + body = json.dumps(payload) + host = httpx.URL(self.endpoint_urls.get_api_base_url()).host + headers = utils.generate_global_header( + method="POST", + path=path, + host=host, + app_key=self.endpoint_urls.get_global_app_key(), + app_secret=self.endpoint_urls.get_global_app_secret(), + body=body, + ) + r_refresh = await client.post( + self.endpoint_urls.get_api_base_url() + path, + headers=headers, + content=body, + ) + refresh_result = r_refresh.json() + _LOGGER.debug("Refresh result: %s", sanitize_log_data(refresh_result)) + data = refresh_result.get("data") or refresh_result.get("result") or {} + if not data: + return {} + + access_token = data.get("accessToken") + refresh_token = data.get("refreshToken") or self.refresh_token + id_token = data.get("idToken") + api_user_id = data.get("userId") + expires_in = data.get("expiresIn") or data.get("expires_in") + if not access_token or not api_user_id: + return {} + + expires_at = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta( + seconds=int(expires_in) if expires_in else HTTPX_TIMEOUT * 2 + ) + + return { + "access_token": access_token, + "refresh_token": refresh_token, + "api_access_token": access_token, + "api_refresh_token": refresh_token, + "api_user_id": api_user_id, + "id_token": id_token, + "expires_at": expires_at, + } + class SmartLoginClient(httpx.AsyncClient): """Client to login to the Smart API.""" diff --git a/pysmarthashtag/api/utils.py b/pysmarthashtag/api/utils.py index 1e32318..42441c7 100644 --- a/pysmarthashtag/api/utils.py +++ b/pysmarthashtag/api/utils.py @@ -4,6 +4,9 @@ import logging import secrets import time +import uuid +from email.utils import formatdate +from typing import Optional _LOGGER = logging.getLogger(__name__) @@ -72,3 +75,125 @@ def generate_default_header( def create_correct_timestamp() -> str: """Create a correct timestamp for the request.""" return str(int(time.time() * 1000)) + + +def _ensure_bytes(body: Optional[object]) -> Optional[bytes]: + if body is None: + return None + if isinstance(body, bytes): + return body + return str(body).encode("utf-8") + + +def _global_md5_base64(body: bytes) -> str: + """Calculate MD5 hash and return the first 24 chars of base64 encoding.""" + md5_hash = hashlib.md5(body).digest() + return base64.b64encode(md5_hash).decode("utf-8")[:24] + + +def _build_global_string_to_sign( + method: str, + path: str, + headers: dict[str, str], + content_md5: str = "", +) -> str: + """Build the string to sign for HMAC-SHA256 Global API requests.""" + string_to_sign = [ + method, + headers.get("accept", ""), + content_md5, + headers.get("content-type", ""), + headers.get("date", ""), + ] + + ca_headers = [] + ca_header_names = [] + for key in sorted(headers.keys()): + if key.startswith("x-ca-"): + ca_headers.append(f"{key}:{headers[key]}") + ca_header_names.append(key) + + if ca_header_names: + headers["x-ca-signature-headers"] = ",".join(ca_header_names) + + string_to_sign.append("\n".join(ca_headers)) + string_to_sign.append(path) + + return "\n".join(string_to_sign) + + +def _generate_global_signature( + app_secret: str, + method: str, + path: str, + headers: dict[str, str], + body: Optional[bytes] = None, +) -> str: + """Generate HMAC-SHA256 signature for Global API requests.""" + content_md5 = "" + if body is not None: + content_md5 = _global_md5_base64(body) + headers["content-md5"] = content_md5 + + string_to_sign = _build_global_string_to_sign(method, path, headers, content_md5) + signature = hmac.new( + app_secret.encode("utf-8"), + string_to_sign.encode("utf-8"), + hashlib.sha256, + ).digest() + return base64.b64encode(signature).decode("utf-8") + + +def generate_global_header( + method: str, + path: str, + host: str, + app_key: str, + app_secret: str, + body: Optional[object] = None, + content_type: str = "application/json", + access_token: Optional[str] = None, + user_id: Optional[str] = None, + id_token: Optional[str] = None, + extra_headers: Optional[dict[str, str]] = None, +) -> dict[str, str]: + """Generate signed headers for Global app requests.""" + timestamp = create_correct_timestamp() + nonce = str(uuid.uuid4()) + http_date = formatdate(timeval=None, localtime=False, usegmt=True) + + headers = { + "date": http_date, + "x-ca-timestamp": timestamp, + "x-ca-nonce": nonce, + "x-ca-key": app_key, + "x-ca-signature-method": "HmacSHA256", + "CA_VERSION": "1", + "content-type": content_type, + "accept": content_type, + "host": host, + "user-agent": "ALIYUN-ANDROID-DEMO", + } + + if access_token: + headers["Authorization"] = f"Bearer {access_token}" + if user_id: + headers["x-smart-id"] = user_id + if id_token: + headers["Xs-Auth-Token"] = id_token + headers["Xs-App-Ver"] = "1.0.8" + + if extra_headers: + headers.update(extra_headers) + + body_bytes = _ensure_bytes(body) + headers["x-ca-signature"] = _generate_global_signature( + app_secret, + method, + path, + headers, + body_bytes, + ) + + _LOGGER.debug("Constructed global request header for %s %s", method, path) + return headers diff --git a/pysmarthashtag/cli.py b/pysmarthashtag/cli.py index 8089f89..41c7427 100755 --- a/pysmarthashtag/cli.py +++ b/pysmarthashtag/cli.py @@ -56,6 +56,8 @@ def main_parser() -> argparse.ArgumentParser: }, } + log_level = os.environ.get("SMART_LOG_LEVEL", "INFO").upper() + logging_config["loggers"]["pysmarthashtag"]["level"] = log_level logging.config.dictConfig(logging_config) parser = argparse.ArgumentParser(description="Smart API demo") @@ -170,8 +172,8 @@ def _add_default_args(parser: argparse.ArgumentParser): parser.add_argument("--password", help="Smart password", **environ_or_required("SMART_PASSWORD")) parser.add_argument( "--region", - help="Region for Smart API (eu=Europe, intl=International/Australia/Asia-Pacific)", - choices=["eu", "intl"], + help="Region for Smart API (eu=Europe, intl=Asia-Pacific, global=Australia/Israel)", + choices=["eu", "intl", "global"], default=os.environ.get("SMART_REGION", "eu"), ) @@ -180,9 +182,11 @@ def _get_endpoint_urls_from_args(args) -> EndpointUrls: """Get EndpointUrls based on region argument. Args: + ---- args: Parsed command line arguments containing the region. Returns: + ------- EndpointUrls configured for the specified region. """ diff --git a/pysmarthashtag/const.py b/pysmarthashtag/const.py index 963b721..f214df1 100644 --- a/pysmarthashtag/const.py +++ b/pysmarthashtag/const.py @@ -15,6 +15,13 @@ API_SELECT_CAR_URL = "/device-platform/user/session/update" API_TELEMATICS_URL = "/remote-control/vehicle/telematics/" +GLOBAL_API_BASE_URL = "https://sg-app-api.smart.com" +GLOBAL_APP_KEY = "204587190" +GLOBAL_APP_SECRET = "vxnzkHbpQrkKKQKmFBZlOnL780rjXLFT" + +EU_OAUTH_BASE_URL = "https://api.app-auth.srv.smart.com/v1/" +EU_OAUTH_API_KEY = "yHpsjnd9vzLq7GMowxBa" + OTA_SERVER_URL = "https://ota.srv.smart.com/" HTTPX_TIMEOUT = 30.0 @@ -31,18 +38,29 @@ class SmartRegion(str, Enum): EU = "eu" INTL = "intl" + GLOBAL = "global" + + +class SmartAuthMode(str, Enum): + """Authentication mode for Smart APIs.""" + + EU_OAUTH = "eu_oauth" + GLOBAL_HMAC = "global_hmac" def get_endpoint_urls_for_region(region: SmartRegion) -> "EndpointUrls": """Get pre-configured EndpointUrls for a specific region. Args: + ---- region: The region to get endpoint URLs for. Returns: + ------- EndpointUrls configured for the specified region. Example: + ------- >>> from pysmarthashtag.const import SmartRegion, get_endpoint_urls_for_region >>> from pysmarthashtag.account import SmartAccount >>> @@ -65,6 +83,12 @@ def get_endpoint_urls_for_region(region: SmartRegion) -> "EndpointUrls": api_base_url="https://api.ecloudap.com", api_base_url_v2="https://apiv2.ecloudap.com", ) + elif region == SmartRegion.GLOBAL: + # Global app region (Australia/Israel) - uses sg-app-api endpoints + return EndpointUrls( + api_base_url=GLOBAL_API_BASE_URL, + api_base_url_v2=GLOBAL_API_BASE_URL, + ) else: raise ValueError(f"Unknown region: {region}") @@ -84,6 +108,10 @@ class EndpointUrls: api_base_url: Optional[str] = None api_base_url_v2: Optional[str] = None ota_server_url: Optional[str] = None + oauth_base_url: Optional[str] = None + oauth_api_key: Optional[str] = None + global_app_key: Optional[str] = None + global_app_secret: Optional[str] = None def get_api_key(self) -> str: """Get the API key, using the default if not set.""" @@ -120,3 +148,30 @@ def get_api_base_url_v2(self) -> str: def get_ota_server_url(self) -> str: """Get the OTA server URL, using the default if not set.""" return self.ota_server_url if self.ota_server_url is not None else OTA_SERVER_URL + + def get_oauth_base_url(self) -> str: + """Get the OAuth base URL, using the default if not set.""" + return self.oauth_base_url if self.oauth_base_url is not None else EU_OAUTH_BASE_URL + + def get_oauth_api_key(self) -> str: + """Get the OAuth API key, using the default if not set.""" + return self.oauth_api_key if self.oauth_api_key is not None else EU_OAUTH_API_KEY + + def get_oauth_token_url(self) -> str: + """Get the OAuth token URL.""" + return f"{self.get_oauth_base_url().rstrip('/')}/token" + + def get_global_app_key(self) -> str: + """Get the Global app key, using the default if not set.""" + return self.global_app_key if self.global_app_key is not None else GLOBAL_APP_KEY + + def get_global_app_secret(self) -> str: + """Get the Global app secret, using the default if not set.""" + return self.global_app_secret if self.global_app_secret is not None else GLOBAL_APP_SECRET + + def infer_auth_mode(self) -> SmartAuthMode: + """Infer authentication mode based on endpoint URLs.""" + api_base_url = self.get_api_base_url() + if "sg-app-api.smart.com" in api_base_url: + return SmartAuthMode.GLOBAL_HMAC + return SmartAuthMode.EU_OAUTH diff --git a/pysmarthashtag/tests/common.py b/pysmarthashtag/tests/common.py index 2fa5391..95cd1ff 100644 --- a/pysmarthashtag/tests/common.py +++ b/pysmarthashtag/tests/common.py @@ -9,6 +9,7 @@ API_SELECT_CAR_URL, API_SESION_URL, AUTH_URL, + EU_OAUTH_BASE_URL, LOGIN_URL, OTA_SERVER_URL, SERVER_URL, @@ -52,6 +53,15 @@ def add_login_routes(self) -> None: json=load_response(RESPONSE_DIR / "login_result.json"), headers={"location": load_response(RESPONSE_DIR / "auth_result.url")}, ) + self.post(EU_OAUTH_BASE_URL + "token").respond( + 200, + json={ + "accessToken": "TestAccessToken", + "refreshToken": "TestRefreshToken", + "idToken": "TestIdToken", + "expiresIn": 3600, + }, + ) for base_url in [API_BASE_URL, API_BASE_URL_V2]: self.post(base_url + API_SESION_URL + "?identity_type=smart").respond( 200, diff --git a/pysmarthashtag/tests/test_endpoint_urls.py b/pysmarthashtag/tests/test_endpoint_urls.py index f3bf8a9..b712fce 100644 --- a/pysmarthashtag/tests/test_endpoint_urls.py +++ b/pysmarthashtag/tests/test_endpoint_urls.py @@ -9,6 +9,7 @@ API_BASE_URL, API_KEY, AUTH_URL, + GLOBAL_API_BASE_URL, LOGIN_URL, OTA_SERVER_URL, SERVER_URL, @@ -202,6 +203,7 @@ def test_region_enum_values(self): """Test that SmartRegion enum has expected values.""" assert SmartRegion.EU.value == "eu" assert SmartRegion.INTL.value == "intl" + assert SmartRegion.GLOBAL.value == "global" def test_account_with_eu_region(self): """Test creating SmartAccount with EU region preset.""" @@ -233,3 +235,9 @@ def test_authentication_with_intl_region(self): endpoint_urls=urls, ) assert auth.endpoint_urls.get_api_base_url() == "https://api.ecloudap.com" + + def test_region_global_returns_global_endpoints(self): + """Test that GLOBAL region returns sg-app-api endpoints.""" + urls = get_endpoint_urls_for_region(SmartRegion.GLOBAL) + assert urls.get_api_base_url() == GLOBAL_API_BASE_URL + assert urls.get_api_base_url_v2() == GLOBAL_API_BASE_URL diff --git a/pysmarthashtag/vehicle/vehicle.py b/pysmarthashtag/vehicle/vehicle.py index f632a01..842af7f 100644 --- a/pysmarthashtag/vehicle/vehicle.py +++ b/pysmarthashtag/vehicle/vehicle.py @@ -4,7 +4,7 @@ import logging from typing import Optional -from pysmarthashtag.const import API_BASE_URL, API_BASE_URL_V2 +from pysmarthashtag.const import API_BASE_URL, API_BASE_URL_V2, SmartAuthMode from pysmarthashtag.models import ValueWithUnit, get_element_from_dict_maybe from pysmarthashtag.vehicle.battery import Battery from pysmarthashtag.vehicle.climate import Climate @@ -78,17 +78,23 @@ def __init__( self.account = account self.data = {} self.combine_data(vehicle_base, vehicle_state, charging_settings, None, fetched_at) - if self.data["seriesCodeVs"].startswith("HX"): + series_code = self.data.get("seriesCodeVs") or self.data.get("modelCode") or "" + if "seriesCodeVs" not in self.data and series_code: + self.data["seriesCodeVs"] = series_code + + if getattr(self.account.config.authentication, "auth_mode", None) == SmartAuthMode.GLOBAL_HMAC: + self.base_url = self.account.endpoint_urls.get_api_base_url() + elif series_code.startswith("HX"): _LOGGER.debug("Selected Vehicle is Smart #1 use V1 API") self.base_url = API_BASE_URL - elif self.data["seriesCodeVs"].startswith("HC"): + elif series_code.startswith("HC"): _LOGGER.debug("Selected Vehicle is Smart #3 use V1 API") self.base_url = API_BASE_URL - elif self.data["seriesCodeVs"].startswith("HY"): + elif series_code.startswith("HY"): _LOGGER.debug("Selected Vehicle is Smart #5 use V2 API") self.base_url = API_BASE_URL_V2 else: - _LOGGER.warning("Unknown Series Code Prefix %s use default API", self.data["seriesCodeVs"]) + _LOGGER.warning("Unknown Series Code Prefix %s use default API", series_code) _LOGGER.debug( "Initialized vehicle %s (%s)", self.name, From d44662f201b8b3a4bb80c4a337476208b572ca78 Mon Sep 17 00:00:00 2001 From: Bastian Neumann Date: Wed, 4 Feb 2026 21:57:45 +0100 Subject: [PATCH 02/16] Potential fix for code scanning alert no. 3: Incomplete URL substring sanitization Co-authored-by: Copilot Autofix powered by AI <62310815+github-advanced-security[bot]@users.noreply.github.com> --- pysmarthashtag/const.py | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/pysmarthashtag/const.py b/pysmarthashtag/const.py index f214df1..0b4691b 100644 --- a/pysmarthashtag/const.py +++ b/pysmarthashtag/const.py @@ -169,9 +169,20 @@ def get_global_app_secret(self) -> str: """Get the Global app secret, using the default if not set.""" return self.global_app_secret if self.global_app_secret is not None else GLOBAL_APP_SECRET + def _is_global_api_base_url(self, api_base_url: str) -> bool: + """Determine if the given API base URL should use GLOBAL_HMAC auth. + + This avoids substring checks by comparing against known-good base URLs, + normalizing away a trailing slash if present. + """ + normalized = api_base_url.rstrip("/") + # Default global API base URL, plus any additional explicit variants if needed. + global_base = GLOBAL_API_BASE_URL.rstrip("/") + return normalized == global_base + def infer_auth_mode(self) -> SmartAuthMode: """Infer authentication mode based on endpoint URLs.""" api_base_url = self.get_api_base_url() - if "sg-app-api.smart.com" in api_base_url: + if self._is_global_api_base_url(api_base_url): return SmartAuthMode.GLOBAL_HMAC return SmartAuthMode.EU_OAUTH From 20dc0ebdd43b18762279308a7e511f16a778ff01 Mon Sep 17 00:00:00 2001 From: "coderabbitai[bot]" <136622811+coderabbitai[bot]@users.noreply.github.com> Date: Wed, 4 Feb 2026 21:12:22 +0000 Subject: [PATCH 03/16] =?UTF-8?q?=F0=9F=93=9D=20Add=20docstrings=20to=20`r?= =?UTF-8?q?etry-international-auth`?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Docstrings generation was requested by @DasBasti. * https://github.com/DasBasti/pySmartHashtag/pull/161#issuecomment-3849699039 The following files were modified: * `pysmarthashtag/account.py` * `pysmarthashtag/api/authentication.py` * `pysmarthashtag/api/utils.py` * `pysmarthashtag/cli.py` * `pysmarthashtag/const.py` * `pysmarthashtag/tests/common.py` * `pysmarthashtag/vehicle/vehicle.py` --- pysmarthashtag/account.py | 135 +++++++++++++++++++++++---- pysmarthashtag/api/authentication.py | 133 ++++++++++++++++++++++---- pysmarthashtag/api/utils.py | 77 +++++++++++++-- pysmarthashtag/cli.py | 35 ++++--- pysmarthashtag/const.py | 60 +++++++++--- pysmarthashtag/tests/common.py | 15 ++- pysmarthashtag/vehicle/vehicle.py | 15 ++- 7 files changed, 398 insertions(+), 72 deletions(-) mode change 100755 => 100644 pysmarthashtag/cli.py diff --git a/pysmarthashtag/account.py b/pysmarthashtag/account.py index 8fb578f..d9902d6 100644 --- a/pysmarthashtag/account.py +++ b/pysmarthashtag/account.py @@ -44,7 +44,18 @@ class SmartAccount: """Vehicles associated with the account.""" def __post_init__(self, password, log_responses): - """Initialize the account.""" + """ + Set up endpoint URLs and client configuration for the Smart account. + + If `endpoint_urls` is None, assigns a default EndpointUrls instance. + If `config` is None, creates a SmartClientConfiguration using a SmartAuthentication + initialized with the instance `username`, the provided `password`, and the + resolved `endpoint_urls`, and applies the `log_responses` flag to the configuration. + + Parameters: + password (str): Password used to construct the SmartAuthentication instance. + log_responses (bool): Whether the created configuration should log server responses. + """ # Ensure endpoint_urls is set if self.endpoint_urls is None: self.endpoint_urls = EndpointUrls() @@ -56,15 +67,19 @@ def __post_init__(self, password, log_responses): ) def _is_global_auth(self) -> bool: - """Return True when using the Global app authentication mode.""" + """ + Determine whether the account uses Global app authentication. + + Returns: + bool: `true` if the account's authentication mode equals `GLOBAL_HMAC`, `false` otherwise. + """ return self.config.authentication.auth_mode == SmartAuthMode.GLOBAL_HMAC async def _ensure_ssl_context(self) -> None: - """Ensure SSL context is created asynchronously. - - This method creates the SSL context in a thread pool executor - to avoid blocking the async event loop when httpx creates - SSL connections. + """ + Ensure the configuration and its authentication both have an SSL context. + + If the configuration has no SSL context, obtain one via config.get_ssl_context() and assign it to config.ssl_context and config.authentication.ssl_context. """ if self.config.ssl_context is None: self.config.ssl_context = await self.config.get_ssl_context() @@ -79,7 +94,11 @@ async def login(self, force_refresh: bool = False) -> None: await self.config.authentication.login() async def _init_vehicles(self) -> None: - """Initialize vehicles from Smart servers.""" + """ + Initialize and populate account vehicles from the Smart API. + + Ensures an SSL context is available, requests the account's vehicle list from the configured Smart endpoint, records the UTC fetch timestamp, and adds each returned vehicle to the account via add_vehicle (passing the fetch time). Retries the request up to three times on token-refresh errors and reattempts initialization when a human-car-connection error occurs. + """ _LOGGER.debug("Getting initial vehicle list") await self._ensure_ssl_context() @@ -120,7 +139,11 @@ async def _init_vehicles(self) -> None: self.add_vehicle(vehicle, fetched_at) async def _init_vehicles_global(self) -> None: - """Initialize vehicles from Smart Global servers.""" + """ + Fetches the account's vehicle ownership list from the Smart Global endpoints and registers each vehicle with the account. + + Each discovered vehicle is added to the account's vehicle mapping along with the timestamp when the list was fetched. + """ _LOGGER.debug("Getting initial vehicle list (global)") await self._ensure_ssl_context() @@ -152,11 +175,24 @@ async def _init_vehicles_global(self) -> None: self.add_vehicle(vehicle, fetched_at) def add_vehicle(self, vehicle, fetched_at): - """Add a vehicle to the account.""" + """ + Add a vehicle to the account's vehicle mapping. + + Parameters: + vehicle (dict): Vehicle data from the API; must include the `vin` key used as the mapping key. + fetched_at (datetime): UTC timestamp when the vehicle data was fetched. + """ self.vehicles[vehicle.get("vin")] = SmartVehicle(self, vehicle, fetched_at=fetched_at) async def get_vehicles(self, force_init: bool = False) -> None: - """Get the vehicles associated with the account.""" + """ + Load and refresh the vehicles for this Smart account and populate the account's internal vehicle mapping. + + If the account is not authenticated, perform authentication first. When called with `force_init=True` or when no vehicles are known, fetch the list of vehicles. For global-auth mode the method fetches global vehicle listings and updates global details and abilities; for non-global mode it selects each vehicle and updates its information, state-of-charge, and OTA info by merging the retrieved data into each SmartVehicle instance. + + Parameters: + force_init (bool): If True, re-initialize the vehicle list even if vehicles are already present. + """ await self._ensure_ssl_context() if self.config.authentication.api_user_id is None: await self.config.authentication.login() @@ -182,7 +218,13 @@ async def get_vehicles(self, force_init: bool = False) -> None: vehicle.combine_data(vehicle_info, charging_settings=vehicle_soc, ota_info=vehicle_ota_info) async def select_active_vehicle(self, vin) -> None: - """Select the active vehicle.""" + """ + Selects the given vehicle as the active vehicle for subsequent operations. + + This updates the remote session to mark the vehicle identified by `vin` as active. When the account is configured for global authentication, this is a no-op. The method may perform internal retries on transient token or human-car-connection errors. + Parameters: + vin (str): Vehicle Identification Number of the vehicle to select. + """ if self._is_global_auth(): return _LOGGER.debug("Selecting vehicle") @@ -221,7 +263,15 @@ async def select_active_vehicle(self, vin) -> None: break async def get_vehicle_information(self, vin) -> str: - """Get information about a vehicle.""" + """ + Fetch the latest details and status for the vehicle identified by VIN, using global endpoints when the account is configured for global authentication. + + Returns: + dict: The `data` payload from the vehicle status response containing status, basic, and more fields; empty dict if no data was retrieved. + + Raises: + SmartAuthError: If vehicle information cannot be retrieved after retrying. + """ if self._is_global_auth(): return await self._get_vehicle_details_global(vin) _LOGGER.debug("Getting information for vehicle") @@ -266,7 +316,17 @@ async def get_vehicle_information(self, vin) -> str: return data async def get_vehicle_soc(self, vin) -> str: - """Get information about a vehicle.""" + """ + Retrieve the vehicle's state-of-charge (SOC) data. + + If the account is using global authentication, this returns an empty dict. On failure after retries, raises SmartAuthError. + + Returns: + dict: SOC data payload from the vehicle response, or an empty dict when using global authentication. + + Raises: + SmartAuthError: If the SOC data could not be retrieved after retrying. + """ if self._is_global_auth(): return {} _LOGGER.debug("Getting vehicle SOC") @@ -309,7 +369,22 @@ async def get_vehicle_soc(self, vin) -> str: return data async def get_vehicle_ota_info(self, vin) -> dict: - """Get information about a vehicle from OTA server.""" + """ + Retrieve OTA version information for the specified vehicle from the OTA server. + + If the account uses global authentication, no OTA request is performed and an empty dict is returned. + + Parameters: + vin (str): Vehicle Identification Number for the vehicle to query. + + Returns: + dict: A mapping with keys: + - `target_version`: OTA target version string or `None` if not present. + - `current_version`: Current vehicle OTA version string or `None` if not present. + + Raises: + SmartAuthError: When repeated authentication/token failures prevent retrieving OTA information. + """ if self._is_global_auth(): return {} _LOGGER.debug("Getting OTA information for vehicle") @@ -351,13 +426,25 @@ async def get_vehicle_ota_info(self, vin) -> dict: return data async def _update_global_vehicle_details(self) -> None: - """Fetch global vehicle details and abilities.""" + """ + Fetch global details and abilities for each known vehicle and merge them into the corresponding SmartVehicle objects stored on the account. + + This updates each vehicle in-place with any details and capability information returned by the global service; no value is returned. + """ for vin in list(self.vehicles.keys()): await self._get_vehicle_details_global(vin) await self._get_vehicle_abilities_global(vin) async def _get_vehicle_details_global(self, vin) -> dict: - """Get global vehicle details.""" + """ + Fetch global vehicle details for the given VIN and merge them into the corresponding SmartVehicle if available. + + Parameters: + vin (str): Vehicle Identification Number to fetch details for. + + Returns: + dict: Parsed vehicle details retrieved from the global endpoint, or an empty dict if no details were returned. + """ _LOGGER.debug("Getting global vehicle details") await self._ensure_ssl_context() async with SmartLoginClient(ssl_context=self.config.ssl_context) as client: @@ -389,7 +476,17 @@ async def _get_vehicle_details_global(self, vin) -> dict: return details or {} async def _get_vehicle_abilities_global(self, vin) -> dict: - """Get global vehicle abilities.""" + """ + Retrieve global ability information for a vehicle identified by VIN. + + Parameters: + vin (str): Vehicle Identification Number to query for abilities. + + Returns: + abilities (dict): Abilities data from the global API. The vehicle's `data["abilities"]` + is updated when abilities are present. Returns an empty dict if the vehicle has no + model code or if the API provides no abilities. + """ _LOGGER.debug("Getting global vehicle abilities") await self._ensure_ssl_context() vehicle = self.vehicles.get(vin) @@ -417,4 +514,4 @@ async def _get_vehicle_abilities_global(self, vin) -> dict: abilities = data.get("result") or data.get("data") or {} if abilities and vehicle: vehicle.data["abilities"] = abilities - return abilities or {} + return abilities or {} \ No newline at end of file diff --git a/pysmarthashtag/api/authentication.py b/pysmarthashtag/api/authentication.py index b7357bd..f258b7a 100644 --- a/pysmarthashtag/api/authentication.py +++ b/pysmarthashtag/api/authentication.py @@ -42,6 +42,21 @@ def __init__( ssl_context: Optional[ssl.SSLContext] = None, endpoint_urls: Optional[EndpointUrls] = None, ): + """ + Initialize the authentication manager with credentials, optional tokens, SSL context, and endpoint configuration. + + Parameters: + username (str): Account username used for authentication. + password (str): Account password used for authentication. + access_token (Optional[datetime.datetime]): Existing OAuth access token, if available. + expires_at (Optional[datetime.datetime]): Expiration time of `access_token`; used to determine when refresh/login is needed. + refresh_token (Optional[str]): Refresh token associated with `access_token`, if available. + ssl_context (Optional[ssl.SSLContext]): Optional SSL context to use for login requests to avoid creating one lazily. + endpoint_urls (Optional[EndpointUrls]): Endpoint configuration; if omitted a default is created and the authentication mode is inferred from it. + + Behavior: + Stores provided values on the instance, generates a random device identifier, initializes internal locks and API session token placeholders, and sets `auth_mode` by inferring it from `endpoint_urls`. + """ self.username: str = username self.password: str = password self.access_token: Optional[str] = access_token @@ -59,16 +74,13 @@ def __init__( _LOGGER.debug("Device ID initialized") async def get_ssl_context(self) -> ssl.SSLContext: - """Get or create SSL context asynchronously. - - This method returns a cached SSL context if available, or creates - a new one asynchronously using the shared ssl_context module. - Thread-safe using asyncio.Lock. - - Returns - ------- - ssl.SSLContext: An SSL context for secure connections. - + """ + Obtain the SSLContext used for secure connections, creating and caching it if necessary. + + This method returns the cached SSLContext when present; otherwise it asynchronously acquires a new SSLContext and stores it for subsequent calls. The operation is safe for concurrent callers. + + Returns: + ssl.SSLContext: SSL context configured for secure HTTP connections. """ if self.ssl_context is None: # Import here to avoid circular imports @@ -138,7 +150,14 @@ async def async_auth_flow(self, request: Request) -> AsyncGenerator[Request, Res raise async def login(self) -> None: - """Login to the Smart API.""" + """ + Perform authentication with the Smart API and store retrieved tokens and expiry. + + Attempts to refresh the access token when a refresh token is available; otherwise performs a full login. On success, normalizes and stores `access_token`, `refresh_token`, `api_access_token`, `api_refresh_token`, `api_user_id`, optional `id_token`, and `expires_at` (adjusted by EXPIRES_AT_OFFSET) on the instance. + + Raises: + SmartAPIError: If required token fields are missing from the login response. + """ _LOGGER.debug("Logging in to Smart API") token_data = {} if self.refresh_token: @@ -161,7 +180,11 @@ async def login(self) -> None: raise SmartAPIError("Could not login to Smart API") async def _refresh_access_token(self): - """Refresh the access token.""" + """ + Attempt to refresh the stored access (and related) tokens using the configured authentication mode. + + Tries a mode-specific refresh (global HMAC refresh when configured, otherwise the EU refresh). If the refresh succeeds, returns a dict with refreshed token data (e.g., `access_token`, `refresh_token`, `expires_at`, and optional `id_token`); if the refresh fails, returns an empty dict to indicate a full login is required. + """ if self.auth_mode == SmartAuthMode.GLOBAL_HMAC: try: return await self._refresh_access_token_global() @@ -176,13 +199,34 @@ async def _refresh_access_token(self): return {} async def _login(self): - """Login to Smart web services.""" + """ + Selects and executes the appropriate login flow for the configured authentication mode. + + Dispatches to the global HMAC login when auth_mode is SmartAuthMode.GLOBAL_HMAC; otherwise runs the EU OAuth login flow. + + Returns: + token_data (dict): Authentication tokens and related metadata such as `access_token`, `refresh_token`, `expires_at`, and optionally `id_token` and API session fields. + """ if self.auth_mode == SmartAuthMode.GLOBAL_HMAC: return await self._login_global() return await self._login_eu() async def _login_eu(self): - """Login to Smart web services (EU OAuth flow).""" + """ + Perform the EU OAuth login flow, exchange the OAuth access token for API session tokens, and return the resulting tokens and expiry. + + Returns: + dict: Mapping with the following keys: + access_token (str): OAuth access token obtained from the authorization redirect. + refresh_token (str): OAuth refresh token obtained from the authorization redirect. + api_access_token (str): API session access token exchanged from the OAuth access token. + api_refresh_token (str): API session refresh token exchanged from the OAuth access token. + api_user_id (str): User identifier returned by the API session exchange. + expires_at (datetime.datetime): UTC timestamp when the OAuth access token expires. + + Raises: + SmartAPIError: If the login context, login token, redirect location, or access/refresh tokens cannot be obtained. + """ ssl_ctx = await self.get_ssl_context() async with SmartLoginClient(ssl_context=ssl_ctx) as client: _LOGGER.info("Acquiring access token.") @@ -289,7 +333,18 @@ async def _login_eu(self): } async def _get_api_session(self, client: "SmartLoginClient", access_token: str) -> tuple[str, str, str]: - """Exchange OAuth access token for API session tokens.""" + """ + Exchange an OAuth access token for the API session tokens and the associated user id. + + Posts the given OAuth `access_token` to the API session endpoint and returns the API-level + access token, refresh token, and user identifier obtained from the response. + + Returns: + tuple[str, str, str]: (api_access_token, api_refresh_token, api_user_id) + + Raises: + SmartAPIError: If the response does not contain the expected token or user id fields. + """ data = json.dumps({"accessToken": access_token}).replace(" ", "") r_api_access = await client.post( # we do not know what type of car we have in our list so we fall back to the old API URL @@ -319,7 +374,22 @@ async def _get_api_session(self, client: "SmartLoginClient", access_token: str) return api_access_token, api_refresh_token, api_user_id async def _refresh_access_token_eu(self) -> dict: - """Refresh the EU OAuth access token.""" + """ + Attempt to refresh the EU (OAuth) access token and exchange it for API session tokens. + + If a refresh token is available, sends a refresh request to the OAuth token URL, computes a new expiry timestamp, exchanges the returned OAuth access token for API session tokens, and returns a mapping of tokens and metadata. Returns an empty dict if no refresh token is configured or if the refresh response does not yield an access token. + + Returns: + dict: A mapping with the following keys when successful: + - "access_token" (str): The refreshed OAuth access token. + - "refresh_token" (str): The refreshed OAuth refresh token (or the previous refresh token if not provided). + - "api_access_token" (str): The exchanged API access token for subsequent API calls. + - "api_refresh_token" (str): The exchanged API refresh token for API session renewal. + - "api_user_id" (str): The user identifier returned by the API session exchange. + - "id_token" (str | None): The ID token returned by the OAuth refresh response, if present. + - "expires_at" (datetime.datetime): UTC timestamp when the OAuth access token expires. + Returns an empty dict if no refresh was possible or the refresh response lacked an access token. + """ if not self.refresh_token: return {} @@ -362,7 +432,19 @@ async def _refresh_access_token_eu(self) -> dict: } async def _login_global(self) -> dict: - """Login to Smart Global app services (HMAC flow).""" + """ + Perform the global (HMAC) login flow and return obtained tokens and related metadata. + + Returns: + dict: Mapping with the following keys: + - access_token (str): OAuth access token from the global login. + - refresh_token (str|None): Refresh token from the global login, if provided. + - api_access_token (str): API session access token (same as `access_token` for global flow). + - api_refresh_token (str|None): API session refresh token (same as `refresh_token` for global flow). + - api_user_id (str): User identifier returned by the global login. + - id_token (str|None): ID token returned by the global login, if present. + - expires_at (datetime.datetime): UTC timestamp when the access token expires. + """ ssl_ctx = await self.get_ssl_context() async with SmartLoginClient(ssl_context=ssl_ctx) as client: _LOGGER.info("Acquiring access token (global app).") @@ -421,7 +503,20 @@ async def _login_global(self) -> dict: } async def _refresh_access_token_global(self) -> dict: - """Refresh the Global app access token.""" + """ + Refresh the Global app access token and return the refreshed token set. + + Returns: + dict: Mapping with refreshed token and session fields: + - `access_token` (str): OAuth access token. + - `refresh_token` (str): OAuth refresh token (may be original if not returned). + - `api_access_token` (str): API session access token (same as `access_token`). + - `api_refresh_token` (str): API session refresh token (same as `refresh_token`). + - `api_user_id` (str): API user identifier. + - `id_token` (str | None): ID token if provided by the server. + - `expires_at` (datetime.datetime): UTC timestamp when the access token expires. + Returns an empty dict if no refresh token is available or the refresh attempt fails. + """ if not self.refresh_token: return {} @@ -569,4 +664,4 @@ def get_retry_wait_time(response: httpx.Response) -> int: retry_after = next(iter([int(i) for i in response.json().get("message", "") if i.isdigit()])) except Exception: retry_after = 2 - return math.ceil(retry_after * 2) + return math.ceil(retry_after * 2) \ No newline at end of file diff --git a/pysmarthashtag/api/utils.py b/pysmarthashtag/api/utils.py index 42441c7..97a210d 100644 --- a/pysmarthashtag/api/utils.py +++ b/pysmarthashtag/api/utils.py @@ -73,11 +73,25 @@ def generate_default_header( def create_correct_timestamp() -> str: - """Create a correct timestamp for the request.""" + """ + Generate a timestamp string representing the current time in milliseconds since the Unix epoch. + + Returns: + timestamp (str): Current time in milliseconds since 1970-01-01 UTC, formatted as a decimal string. + """ return str(int(time.time() * 1000)) def _ensure_bytes(body: Optional[object]) -> Optional[bytes]: + """ + Normalize a request body to a UTF-8 bytes object when present. + + Parameters: + body (Optional[object]): The value to normalize. If `None`, no conversion is performed. + + Returns: + Optional[bytes]: `None` if input is `None`; the input unchanged if already `bytes`; otherwise the UTF-8 encoding of `str(body)`. + """ if body is None: return None if isinstance(body, bytes): @@ -86,7 +100,15 @@ def _ensure_bytes(body: Optional[object]) -> Optional[bytes]: def _global_md5_base64(body: bytes) -> str: - """Calculate MD5 hash and return the first 24 chars of base64 encoding.""" + """ + Return the first 24 characters of the base64-encoded MD5 digest of `body`. + + Parameters: + body (bytes): Input bytes to hash. + + Returns: + str: First 24 characters of the base64-encoded MD5 digest. + """ md5_hash = hashlib.md5(body).digest() return base64.b64encode(md5_hash).decode("utf-8")[:24] @@ -97,7 +119,20 @@ def _build_global_string_to_sign( headers: dict[str, str], content_md5: str = "", ) -> str: - """Build the string to sign for HMAC-SHA256 Global API requests.""" + """ + Construct the canonical string used to compute the HMAC-SHA256 signature for a Global API request. + + The resulting newline-separated string contains, in order: HTTP method, Accept header, the provided content MD5 value, Content-Type header, Date header, all `x-ca-*` headers (each as `key:value` on its own line), and the request path. This canonical string is intended to be the message passed to the signing HMAC. + + Parameters: + method (str): HTTP method (e.g., "GET", "POST"). + path (str): Request path, including query string if applicable. + headers (dict[str, str]): Request headers; values for "accept", "content-type", "date", and any `x-ca-*` headers are used. + content_md5 (str): Base64-encoded MD5 of the request body when present, or an empty string if absent. + + Returns: + str: The canonical string to sign with HMAC-SHA256. + """ string_to_sign = [ method, headers.get("accept", ""), @@ -129,7 +164,17 @@ def _generate_global_signature( headers: dict[str, str], body: Optional[bytes] = None, ) -> str: - """Generate HMAC-SHA256 signature for Global API requests.""" + """ + Create the HMAC-SHA256 signature used for Global API requests. + + If a request body is provided, its MD5 (base64, truncated to 24 chars) is computed and inserted into headers["content-md5"] before signing. The function builds the canonical string-to-sign from method, path, and headers, then returns the base64-encoded HMAC-SHA256 of that string using app_secret as the key. + + Parameters: + headers (dict[str, str]): Request headers; this dict will be mutated to include "content-md5" when a body is provided. + + Returns: + str: The base64-encoded HMAC-SHA256 signature. + """ content_md5 = "" if body is not None: content_md5 = _global_md5_base64(body) @@ -157,7 +202,27 @@ def generate_global_header( id_token: Optional[str] = None, extra_headers: Optional[dict[str, str]] = None, ) -> dict[str, str]: - """Generate signed headers for Global app requests.""" + """ + Builds HTTP headers for a Global API request and signs them with HMAC-SHA256. + + Assembles standard headers (date, content-type, host, user-agent, x-ca-timestamp, x-ca-nonce, x-ca-key, etc.), conditionally includes Authorization/x-smart-id/Xs-Auth-Token when provided, merges any extra_headers, and computes the `x-ca-signature` header using the provided `app_secret`. + + Parameters: + method (str): HTTP method (e.g., "GET", "POST") used when computing the signature. + path (str): Request path (URI) used in the signature calculation. + host (str): Host header value for the request. + app_key (str): Application key inserted as `x-ca-key`. + app_secret (str): Secret used to compute the HMAC-SHA256 signature. + body (Optional[object]): Request body; if provided it will be converted to bytes and included in the signature computation. + content_type (str): Value for `content-type` and `accept` headers. Defaults to "application/json". + access_token (Optional[str]): If provided, added as `Authorization: Bearer `. + user_id (Optional[str]): If provided, added as `x-smart-id`. + id_token (Optional[str]): If provided, added as `Xs-Auth-Token` (and `Xs-App-Ver` is set). + extra_headers (Optional[dict[str, str]]): Additional headers to merge into the final header set. + + Returns: + dict[str, str]: A dictionary of HTTP headers ready to attach to the request, including the computed `x-ca-signature`. + """ timestamp = create_correct_timestamp() nonce = str(uuid.uuid4()) http_date = formatdate(timeval=None, localtime=False, usegmt=True) @@ -196,4 +261,4 @@ def generate_global_header( ) _LOGGER.debug("Constructed global request header for %s %s", method, path) - return headers + return headers \ No newline at end of file diff --git a/pysmarthashtag/cli.py b/pysmarthashtag/cli.py old mode 100755 new mode 100644 index 41c7427..ce1798e --- a/pysmarthashtag/cli.py +++ b/pysmarthashtag/cli.py @@ -24,7 +24,14 @@ def environ_or_required(key): def main_parser() -> argparse.ArgumentParser: - """Create argument parser.""" + """ + Create and return the CLI ArgumentParser and configure module logging from SMART_LOG_LEVEL. + + The parser is configured with subcommands: `status`, `info`, `watch` (with `-i` interval), `climate` (with `--vin`, `--temp`, `--active`), and `seatheating` (with `--vin`, `--level`, `--temp`, `--active`). Default authentication/region arguments are added via the module helper and the parser's default `func` is set to the command dispatcher. + + Returns: + argparse.ArgumentParser: A fully configured ArgumentParser for the CLI. + """ logging_config = { "version": 1, @@ -167,7 +174,13 @@ async def set_seatheating(args) -> None: def _add_default_args(parser: argparse.ArgumentParser): - """Add the default arguments username, password to the parser.""" + """ + Add standard CLI options for Smart account credentials and API region. + + Adds the following arguments to the provided ArgumentParser: + - `--username` and `--password`: use values from `SMART_USERNAME` / `SMART_PASSWORD` when present; marked required if the corresponding environment variable is not set. + - `--region`: selects the Smart API region with choices `eu`, `intl`, `global`; defaults to the `SMART_REGION` environment variable or `"eu"` when not set. + """ parser.add_argument("--username", help="Smart username", **environ_or_required("SMART_USERNAME")) parser.add_argument("--password", help="Smart password", **environ_or_required("SMART_PASSWORD")) parser.add_argument( @@ -179,16 +192,14 @@ def _add_default_args(parser: argparse.ArgumentParser): def _get_endpoint_urls_from_args(args) -> EndpointUrls: - """Get EndpointUrls based on region argument. - - Args: - ---- - args: Parsed command line arguments containing the region. - + """ + Return endpoint URLs for the SmartRegion specified in args.region. + + Parameters: + args: Parsed command-line arguments with a `region` attribute ('eu', 'intl', or 'global'). + Returns: - ------- - EndpointUrls configured for the specified region. - + EndpointUrls: Endpoint URLs configured for the specified region. """ region = SmartRegion(args.region) return get_endpoint_urls_for_region(region) @@ -203,4 +214,4 @@ def main(): if __name__ == "__main__": - main() + main() \ No newline at end of file diff --git a/pysmarthashtag/const.py b/pysmarthashtag/const.py index 0b4691b..cb82b1f 100644 --- a/pysmarthashtag/const.py +++ b/pysmarthashtag/const.py @@ -146,34 +146,65 @@ def get_api_base_url_v2(self) -> str: return self.api_base_url_v2 if self.api_base_url_v2 is not None else API_BASE_URL_V2 def get_ota_server_url(self) -> str: - """Get the OTA server URL, using the default if not set.""" + """ + Return the configured OTA server URL or the module default. + + Returns: + str: The OTA server URL; the configured `ota_server_url` when present, otherwise `OTA_SERVER_URL`. + """ return self.ota_server_url if self.ota_server_url is not None else OTA_SERVER_URL def get_oauth_base_url(self) -> str: - """Get the OAuth base URL, using the default if not set.""" + """ + Return the OAuth base URL to use for token requests. + + Returns: + The configured OAuth base URL string, or `EU_OAUTH_BASE_URL` if no override is set. + """ return self.oauth_base_url if self.oauth_base_url is not None else EU_OAUTH_BASE_URL def get_oauth_api_key(self) -> str: - """Get the OAuth API key, using the default if not set.""" + """ + Return the configured OAuth API key or the default if none is configured. + + Returns: + str: The configured OAuth API key if set, otherwise the module default `EU_OAUTH_API_KEY`. + """ return self.oauth_api_key if self.oauth_api_key is not None else EU_OAUTH_API_KEY def get_oauth_token_url(self) -> str: - """Get the OAuth token URL.""" + """ + Builds the OAuth token endpoint URL for the configured OAuth base URL. + + Returns: + The full token endpoint URL (the OAuth base URL with a single trailing '/token'). + """ return f"{self.get_oauth_base_url().rstrip('/')}/token" def get_global_app_key(self) -> str: - """Get the Global app key, using the default if not set.""" + """ + Return the configured global app key or the default. + + Returns: + global_app_key (str): The configured global app key if set, otherwise the module default GLOBAL_APP_KEY. + """ return self.global_app_key if self.global_app_key is not None else GLOBAL_APP_KEY def get_global_app_secret(self) -> str: - """Get the Global app secret, using the default if not set.""" + """ + Return the configured global app secret for this endpoint. + + Returns: + str: The global app secret set on this instance, or the module default `GLOBAL_APP_SECRET` if none is configured. + """ return self.global_app_secret if self.global_app_secret is not None else GLOBAL_APP_SECRET def _is_global_api_base_url(self, api_base_url: str) -> bool: - """Determine if the given API base URL should use GLOBAL_HMAC auth. - - This avoids substring checks by comparing against known-good base URLs, - normalizing away a trailing slash if present. + """ + Determine whether `api_base_url` matches the module's global API base URL (ignoring a trailing slash). + + Returns: + True if `api_base_url` equals `GLOBAL_API_BASE_URL` after removing a trailing slash, False otherwise. """ normalized = api_base_url.rstrip("/") # Default global API base URL, plus any additional explicit variants if needed. @@ -181,8 +212,13 @@ def _is_global_api_base_url(self, api_base_url: str) -> bool: return normalized == global_base def infer_auth_mode(self) -> SmartAuthMode: - """Infer authentication mode based on endpoint URLs.""" + """ + Selects the authentication mode to use based on the configured API base URL. + + Returns: + SmartAuthMode: `SmartAuthMode.GLOBAL_HMAC` if the configured API base URL matches the global API base URL, `SmartAuthMode.EU_OAUTH` otherwise. + """ api_base_url = self.get_api_base_url() if self._is_global_api_base_url(api_base_url): return SmartAuthMode.GLOBAL_HMAC - return SmartAuthMode.EU_OAUTH + return SmartAuthMode.EU_OAUTH \ No newline at end of file diff --git a/pysmarthashtag/tests/common.py b/pysmarthashtag/tests/common.py index 95cd1ff..66b3495 100644 --- a/pysmarthashtag/tests/common.py +++ b/pysmarthashtag/tests/common.py @@ -33,7 +33,18 @@ def __init__( # # # # # # # # # # # # # # # # # # # # # # # # def add_login_routes(self) -> None: - """Add routes for login.""" + """ + Register mocked HTTP routes for the Smart API login flow and related endpoints. + + This sets up a stateful sequence of mocked responses used by tests, including: + - initial server redirect and authentication context endpoints, + - login endpoint returning a test session token and intermediate redirect, + - OAuth token endpoint returning access/refresh/id tokens, + - SMART session and vehicle-related endpoints (for both API_BASE_URL and API_BASE_URL_V2) used to fetch car lists, select a car, retrieve vehicle status and SOC, and perform telematics/remote-control actions, + - OTA app info endpoints for test VINs. + + Responses use predefined JSON fixtures loaded from the test RESPONSE_DIR. + """ # Login context self.get(SERVER_URL).respond(302, headers={"location": load_response(RESPONSE_DIR / "auth_context.url")}) @@ -105,4 +116,4 @@ def add_login_routes(self) -> None: self.get(OTA_SERVER_URL + "app/info/TestVIN0000000002").respond( 200, json=load_response(RESPONSE_DIR / "ota_response.json"), - ) + ) \ No newline at end of file diff --git a/pysmarthashtag/vehicle/vehicle.py b/pysmarthashtag/vehicle/vehicle.py index 842af7f..178afd7 100644 --- a/pysmarthashtag/vehicle/vehicle.py +++ b/pysmarthashtag/vehicle/vehicle.py @@ -74,7 +74,18 @@ def __init__( charging_settings: Optional[dict] = None, fetched_at: Optional[datetime.datetime] = None, ) -> None: - """Initialize the vehicle.""" + """ + Create a SmartVehicle instance by storing the account and merging provided vehicle data, then derive series code and select the appropriate API base URL. + + Merges vehicle_base, vehicle_state, and charging_settings into the instance data (optionally recording fetched_at), ensures a `seriesCodeVs` value is present when derivable, and chooses the API base URL according to the account authentication mode and the vehicle series code. Logs vehicle initialization. + + Parameters: + account (SmartAccount): Account that owns the vehicle and provides configuration and endpoint URLs. + vehicle_base (dict): Base vehicle information retrieved from the primary API. + vehicle_state (Optional[dict]): Optional dynamic state payload to merge into the base data. + charging_settings (Optional[dict]): Optional charging-related settings to merge into the base data. + fetched_at (Optional[datetime.datetime]): Optional timestamp indicating when the provided data was fetched. + """ self.account = account self.data = {} self.combine_data(vehicle_base, vehicle_state, charging_settings, None, fetched_at) @@ -161,4 +172,4 @@ def _parse_data(self) -> None: self.engine_state = get_element_from_dict_maybe( self.data, "vehicleStatus", "basicVehicleStatus", "engineStatus" - ) + ) \ No newline at end of file From 676fb225d03665c97c549786be699b88ecf0671a Mon Sep 17 00:00:00 2001 From: Bastian Neumann Date: Wed, 4 Feb 2026 22:12:24 +0100 Subject: [PATCH 04/16] Update pysmarthashtag/api/authentication.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- pysmarthashtag/api/authentication.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pysmarthashtag/api/authentication.py b/pysmarthashtag/api/authentication.py index b7357bd..477fd70 100644 --- a/pysmarthashtag/api/authentication.py +++ b/pysmarthashtag/api/authentication.py @@ -348,7 +348,7 @@ async def _refresh_access_token_eu(self) -> dict: return {} expires_at = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta( - seconds=int(expires_in) if expires_in else HTTPX_TIMEOUT * 2 + seconds=int(expires_in) if expires_in else 3600 ) api_access_token, api_refresh_token, api_user_id = await self._get_api_session(client, access_token) return { From 63e1d74dc446edb6c294cdff02ec4ee93054e673 Mon Sep 17 00:00:00 2001 From: Bastian Neumann Date: Wed, 4 Feb 2026 22:12:57 +0100 Subject: [PATCH 05/16] Update pysmarthashtag/account.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- pysmarthashtag/account.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pysmarthashtag/account.py b/pysmarthashtag/account.py index 8fb578f..0abac27 100644 --- a/pysmarthashtag/account.py +++ b/pysmarthashtag/account.py @@ -220,7 +220,7 @@ async def select_active_vehicle(self, vin) -> None: continue break - async def get_vehicle_information(self, vin) -> str: + async def get_vehicle_information(self, vin) -> dict: """Get information about a vehicle.""" if self._is_global_auth(): return await self._get_vehicle_details_global(vin) From e84fa4cb47b280da7c467871e3a9114d6ef7ce77 Mon Sep 17 00:00:00 2001 From: Bastian Neumann Date: Wed, 4 Feb 2026 22:20:20 +0100 Subject: [PATCH 06/16] Update pyproject.toml Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 166d055..9db3e3b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -77,7 +77,7 @@ exclude = [ [tool.ruff.lint.per-file-ignores] "docs/source/conf.py" = ["D100"] -"pysamrthashtag/api/authentication.py" = ["D102", "D107"] +"pysmarthashtag/api/authentication.py" = ["D102", "D107"] [tool.ruff.lint.mccabe] max-complexity = 25 From 02df8181916100fb9c4fc9d5af9fe409285b67b9 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 4 Feb 2026 21:21:21 +0000 Subject: [PATCH 07/16] Initial plan From 37a6d3f8eb0d35e8801cf3720cfad1816b70bf86 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 4 Feb 2026 21:21:29 +0000 Subject: [PATCH 08/16] Initial plan From fc6bb0250d6f7a98af77c5ea78b1c35cb03eb7cf Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 4 Feb 2026 21:24:18 +0000 Subject: [PATCH 09/16] Add unit tests for Global HMAC authentication methods Co-authored-by: DasBasti <1713093+DasBasti@users.noreply.github.com> --- .../tests/replys/global_login_result.json | 11 + .../tests/replys/global_refresh_result.json | 11 + pysmarthashtag/tests/test_authentication.py | 303 ++++++++++++++++++ 3 files changed, 325 insertions(+) create mode 100644 pysmarthashtag/tests/replys/global_login_result.json create mode 100644 pysmarthashtag/tests/replys/global_refresh_result.json create mode 100644 pysmarthashtag/tests/test_authentication.py diff --git a/pysmarthashtag/tests/replys/global_login_result.json b/pysmarthashtag/tests/replys/global_login_result.json new file mode 100644 index 0000000..a290c62 --- /dev/null +++ b/pysmarthashtag/tests/replys/global_login_result.json @@ -0,0 +1,11 @@ +{ + "code": "0000", + "message": "Success", + "data": { + "accessToken": "TestGlobalAccessToken", + "refreshToken": "TestGlobalRefreshToken", + "idToken": "TestGlobalIdToken", + "userId": "global123456", + "expiresIn": 3600 + } +} diff --git a/pysmarthashtag/tests/replys/global_refresh_result.json b/pysmarthashtag/tests/replys/global_refresh_result.json new file mode 100644 index 0000000..1813f7a --- /dev/null +++ b/pysmarthashtag/tests/replys/global_refresh_result.json @@ -0,0 +1,11 @@ +{ + "code": "0000", + "message": "Success", + "data": { + "accessToken": "TestGlobalRefreshedAccessToken", + "refreshToken": "TestGlobalRefreshedRefreshToken", + "idToken": "TestGlobalRefreshedIdToken", + "userId": "global123456", + "expiresIn": 3600 + } +} diff --git a/pysmarthashtag/tests/test_authentication.py b/pysmarthashtag/tests/test_authentication.py new file mode 100644 index 0000000..355b3d0 --- /dev/null +++ b/pysmarthashtag/tests/test_authentication.py @@ -0,0 +1,303 @@ +"""Test authentication module.""" + +import datetime +import logging + +import pytest +import respx +from httpx import Response + +from pysmarthashtag.api.authentication import SmartAuthentication +from pysmarthashtag.const import ( + GLOBAL_API_BASE_URL, + GLOBAL_APP_KEY, + GLOBAL_APP_SECRET, + SmartAuthMode, + get_endpoint_urls_for_region, + SmartRegion, + EndpointUrls, +) +from pysmarthashtag.tests import RESPONSE_DIR, TEST_PASSWORD, TEST_USERNAME, load_response + +_LOGGER = logging.getLogger(__name__) + + +@pytest.mark.asyncio +async def test_login_global(): + """Test the Global HMAC login flow.""" + endpoint_urls = get_endpoint_urls_for_region(SmartRegion.GLOBAL) + + with respx.mock: + # Mock the global login endpoint + login_route = respx.post( + GLOBAL_API_BASE_URL + "/iam/service/api/v1/login" + ).mock(return_value=Response(200, json=load_response(RESPONSE_DIR / "global_login_result.json"))) + + # Mock API session endpoint + respx.post( + GLOBAL_API_BASE_URL + "/iam/service/api/v1/session?identity_type=smart" + ).mock(return_value=Response(200, json=load_response(RESPONSE_DIR / "api_access.json"))) + + auth = SmartAuthentication( + username=TEST_USERNAME, + password=TEST_PASSWORD, + endpoint_urls=endpoint_urls, + ) + + # Verify auth_mode is correctly inferred + assert auth.auth_mode == SmartAuthMode.GLOBAL_HMAC + + # Call _login_global directly + result = await auth._login_global() + + # Verify the login endpoint was called + assert login_route.called + + # Verify the result contains all expected fields + assert "access_token" in result + assert "refresh_token" in result + assert "api_access_token" in result + assert "api_refresh_token" in result + assert "api_user_id" in result + assert "id_token" in result + assert "expires_at" in result + + # Verify the token values are correct + assert result["access_token"] == "TestGlobalAccessToken" + assert result["refresh_token"] == "TestGlobalRefreshToken" + assert result["api_access_token"] == "TestGlobalAccessToken" + assert result["api_refresh_token"] == "TestGlobalRefreshToken" + assert result["api_user_id"] == "global123456" + assert result["id_token"] == "TestGlobalIdToken" + + # Verify expires_at is a datetime in the future + assert isinstance(result["expires_at"], datetime.datetime) + assert result["expires_at"] > datetime.datetime.now(datetime.timezone.utc) + + +@pytest.mark.asyncio +async def test_refresh_access_token_global(): + """Test the Global HMAC token refresh flow.""" + endpoint_urls = get_endpoint_urls_for_region(SmartRegion.GLOBAL) + + with respx.mock: + # Mock the global refresh endpoint + refresh_route = respx.post( + GLOBAL_API_BASE_URL + "/iam/service/api/v1/refresh/" + ).mock(return_value=Response(200, json=load_response(RESPONSE_DIR / "global_refresh_result.json"))) + + auth = SmartAuthentication( + username=TEST_USERNAME, + password=TEST_PASSWORD, + refresh_token="TestGlobalRefreshToken", + endpoint_urls=endpoint_urls, + ) + + # Verify auth_mode is correctly inferred + assert auth.auth_mode == SmartAuthMode.GLOBAL_HMAC + + # Call _refresh_access_token_global directly + result = await auth._refresh_access_token_global() + + # Verify the refresh endpoint was called + assert refresh_route.called + + # Verify the result contains all expected fields + assert "access_token" in result + assert "refresh_token" in result + assert "api_access_token" in result + assert "api_refresh_token" in result + assert "api_user_id" in result + assert "id_token" in result + assert "expires_at" in result + + # Verify the token values are correct + assert result["access_token"] == "TestGlobalRefreshedAccessToken" + assert result["refresh_token"] == "TestGlobalRefreshedRefreshToken" + assert result["api_access_token"] == "TestGlobalRefreshedAccessToken" + assert result["api_refresh_token"] == "TestGlobalRefreshedRefreshToken" + assert result["api_user_id"] == "global123456" + assert result["id_token"] == "TestGlobalRefreshedIdToken" + + # Verify expires_at is a datetime in the future + assert isinstance(result["expires_at"], datetime.datetime) + assert result["expires_at"] > datetime.datetime.now(datetime.timezone.utc) + + +@pytest.mark.asyncio +async def test_refresh_access_token_global_no_refresh_token(): + """Test the Global HMAC token refresh returns empty dict when no refresh token.""" + endpoint_urls = get_endpoint_urls_for_region(SmartRegion.GLOBAL) + + auth = SmartAuthentication( + username=TEST_USERNAME, + password=TEST_PASSWORD, + endpoint_urls=endpoint_urls, + ) + + # Verify auth_mode is correctly inferred + assert auth.auth_mode == SmartAuthMode.GLOBAL_HMAC + + # Call _refresh_access_token_global directly without refresh token + result = await auth._refresh_access_token_global() + + # Should return empty dict + assert result == {} + + +@pytest.mark.asyncio +async def test_auth_mode_selection_global(): + """Test that auth_mode selection triggers Global HMAC code paths.""" + endpoint_urls = get_endpoint_urls_for_region(SmartRegion.GLOBAL) + + with respx.mock: + # Mock the global login endpoint + login_route = respx.post( + GLOBAL_API_BASE_URL + "/iam/service/api/v1/login" + ).mock(return_value=Response(200, json=load_response(RESPONSE_DIR / "global_login_result.json"))) + + # Mock API session endpoint + session_route = respx.post( + GLOBAL_API_BASE_URL + "/iam/service/api/v1/session?identity_type=smart" + ).mock(return_value=Response(200, json=load_response(RESPONSE_DIR / "api_access.json"))) + + auth = SmartAuthentication( + username=TEST_USERNAME, + password=TEST_PASSWORD, + endpoint_urls=endpoint_urls, + ) + + # Verify auth_mode is correctly inferred as GLOBAL_HMAC + assert auth.auth_mode == SmartAuthMode.GLOBAL_HMAC + + # Call the main _login method which should route to _login_global + result = await auth._login() + + # Verify the global login endpoint was called (not EU OAuth) + assert login_route.called + + # Verify the result contains global tokens + assert result["access_token"] == "TestGlobalAccessToken" + assert result["api_user_id"] == "global123456" + + +@pytest.mark.asyncio +async def test_auth_mode_selection_global_refresh(): + """Test that auth_mode selection triggers Global HMAC refresh code path.""" + endpoint_urls = get_endpoint_urls_for_region(SmartRegion.GLOBAL) + + with respx.mock: + # Mock the global refresh endpoint + refresh_route = respx.post( + GLOBAL_API_BASE_URL + "/iam/service/api/v1/refresh/" + ).mock(return_value=Response(200, json=load_response(RESPONSE_DIR / "global_refresh_result.json"))) + + auth = SmartAuthentication( + username=TEST_USERNAME, + password=TEST_PASSWORD, + refresh_token="TestGlobalRefreshToken", + endpoint_urls=endpoint_urls, + ) + + # Verify auth_mode is correctly inferred as GLOBAL_HMAC + assert auth.auth_mode == SmartAuthMode.GLOBAL_HMAC + + # Call the main _refresh_access_token method which should route to _refresh_access_token_global + result = await auth._refresh_access_token() + + # Verify the global refresh endpoint was called (not EU OAuth) + assert refresh_route.called + + # Verify the result contains refreshed global tokens + assert result["access_token"] == "TestGlobalRefreshedAccessToken" + assert result["api_user_id"] == "global123456" + + +@pytest.mark.asyncio +async def test_login_global_missing_access_token(): + """Test that _login_global raises error when access token is missing.""" + endpoint_urls = get_endpoint_urls_for_region(SmartRegion.GLOBAL) + + with respx.mock: + # Mock the global login endpoint with incomplete response + respx.post( + GLOBAL_API_BASE_URL + "/iam/service/api/v1/login" + ).mock(return_value=Response(200, json={ + "code": "0000", + "message": "Success", + "data": { + "userId": "global123456", + "expiresIn": 3600 + } + })) + + auth = SmartAuthentication( + username=TEST_USERNAME, + password=TEST_PASSWORD, + endpoint_urls=endpoint_urls, + ) + + # Should raise SmartAPIError when access_token is missing + with pytest.raises(Exception) as exc_info: + await auth._login_global() + + assert "Could not get access token from global login" in str(exc_info.value) + + +@pytest.mark.asyncio +async def test_login_global_missing_user_id(): + """Test that _login_global raises error when user ID is missing.""" + endpoint_urls = get_endpoint_urls_for_region(SmartRegion.GLOBAL) + + with respx.mock: + # Mock the global login endpoint with incomplete response + respx.post( + GLOBAL_API_BASE_URL + "/iam/service/api/v1/login" + ).mock(return_value=Response(200, json={ + "code": "0000", + "message": "Success", + "data": { + "accessToken": "TestToken", + "expiresIn": 3600 + } + })) + + auth = SmartAuthentication( + username=TEST_USERNAME, + password=TEST_PASSWORD, + endpoint_urls=endpoint_urls, + ) + + # Should raise SmartAPIError when userId is missing + with pytest.raises(Exception) as exc_info: + await auth._login_global() + + assert "Could not get access token from global login" in str(exc_info.value) + + +@pytest.mark.asyncio +async def test_login_global_no_data_field(): + """Test that _login_global raises error when data field is missing.""" + endpoint_urls = get_endpoint_urls_for_region(SmartRegion.GLOBAL) + + with respx.mock: + # Mock the global login endpoint with no data field + respx.post( + GLOBAL_API_BASE_URL + "/iam/service/api/v1/login" + ).mock(return_value=Response(200, json={ + "code": "1001", + "message": "Invalid credentials" + })) + + auth = SmartAuthentication( + username=TEST_USERNAME, + password=TEST_PASSWORD, + endpoint_urls=endpoint_urls, + ) + + # Should raise SmartAPIError when data is missing + with pytest.raises(Exception) as exc_info: + await auth._login_global() + + assert "Could not get tokens from global login" in str(exc_info.value) + assert "Invalid credentials" in str(exc_info.value) From 6580806b9ddd3581d45f70ce50a42433e5ec874e Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 4 Feb 2026 21:24:55 +0000 Subject: [PATCH 10/16] Fix linting issues in authentication tests Co-authored-by: DasBasti <1713093+DasBasti@users.noreply.github.com> --- pysmarthashtag/tests/test_authentication.py | 173 +++++++++----------- 1 file changed, 79 insertions(+), 94 deletions(-) diff --git a/pysmarthashtag/tests/test_authentication.py b/pysmarthashtag/tests/test_authentication.py index 355b3d0..bc6be09 100644 --- a/pysmarthashtag/tests/test_authentication.py +++ b/pysmarthashtag/tests/test_authentication.py @@ -10,12 +10,9 @@ from pysmarthashtag.api.authentication import SmartAuthentication from pysmarthashtag.const import ( GLOBAL_API_BASE_URL, - GLOBAL_APP_KEY, - GLOBAL_APP_SECRET, SmartAuthMode, - get_endpoint_urls_for_region, SmartRegion, - EndpointUrls, + get_endpoint_urls_for_region, ) from pysmarthashtag.tests import RESPONSE_DIR, TEST_PASSWORD, TEST_USERNAME, load_response @@ -26,33 +23,33 @@ async def test_login_global(): """Test the Global HMAC login flow.""" endpoint_urls = get_endpoint_urls_for_region(SmartRegion.GLOBAL) - + with respx.mock: # Mock the global login endpoint - login_route = respx.post( - GLOBAL_API_BASE_URL + "/iam/service/api/v1/login" - ).mock(return_value=Response(200, json=load_response(RESPONSE_DIR / "global_login_result.json"))) - + login_route = respx.post(GLOBAL_API_BASE_URL + "/iam/service/api/v1/login").mock( + return_value=Response(200, json=load_response(RESPONSE_DIR / "global_login_result.json")) + ) + # Mock API session endpoint - respx.post( - GLOBAL_API_BASE_URL + "/iam/service/api/v1/session?identity_type=smart" - ).mock(return_value=Response(200, json=load_response(RESPONSE_DIR / "api_access.json"))) - + respx.post(GLOBAL_API_BASE_URL + "/iam/service/api/v1/session?identity_type=smart").mock( + return_value=Response(200, json=load_response(RESPONSE_DIR / "api_access.json")) + ) + auth = SmartAuthentication( username=TEST_USERNAME, password=TEST_PASSWORD, endpoint_urls=endpoint_urls, ) - + # Verify auth_mode is correctly inferred assert auth.auth_mode == SmartAuthMode.GLOBAL_HMAC - + # Call _login_global directly result = await auth._login_global() - + # Verify the login endpoint was called assert login_route.called - + # Verify the result contains all expected fields assert "access_token" in result assert "refresh_token" in result @@ -61,7 +58,7 @@ async def test_login_global(): assert "api_user_id" in result assert "id_token" in result assert "expires_at" in result - + # Verify the token values are correct assert result["access_token"] == "TestGlobalAccessToken" assert result["refresh_token"] == "TestGlobalRefreshToken" @@ -69,7 +66,7 @@ async def test_login_global(): assert result["api_refresh_token"] == "TestGlobalRefreshToken" assert result["api_user_id"] == "global123456" assert result["id_token"] == "TestGlobalIdToken" - + # Verify expires_at is a datetime in the future assert isinstance(result["expires_at"], datetime.datetime) assert result["expires_at"] > datetime.datetime.now(datetime.timezone.utc) @@ -79,29 +76,29 @@ async def test_login_global(): async def test_refresh_access_token_global(): """Test the Global HMAC token refresh flow.""" endpoint_urls = get_endpoint_urls_for_region(SmartRegion.GLOBAL) - + with respx.mock: # Mock the global refresh endpoint - refresh_route = respx.post( - GLOBAL_API_BASE_URL + "/iam/service/api/v1/refresh/" - ).mock(return_value=Response(200, json=load_response(RESPONSE_DIR / "global_refresh_result.json"))) - + refresh_route = respx.post(GLOBAL_API_BASE_URL + "/iam/service/api/v1/refresh/").mock( + return_value=Response(200, json=load_response(RESPONSE_DIR / "global_refresh_result.json")) + ) + auth = SmartAuthentication( username=TEST_USERNAME, password=TEST_PASSWORD, refresh_token="TestGlobalRefreshToken", endpoint_urls=endpoint_urls, ) - + # Verify auth_mode is correctly inferred assert auth.auth_mode == SmartAuthMode.GLOBAL_HMAC - + # Call _refresh_access_token_global directly result = await auth._refresh_access_token_global() - + # Verify the refresh endpoint was called assert refresh_route.called - + # Verify the result contains all expected fields assert "access_token" in result assert "refresh_token" in result @@ -110,7 +107,7 @@ async def test_refresh_access_token_global(): assert "api_user_id" in result assert "id_token" in result assert "expires_at" in result - + # Verify the token values are correct assert result["access_token"] == "TestGlobalRefreshedAccessToken" assert result["refresh_token"] == "TestGlobalRefreshedRefreshToken" @@ -118,7 +115,7 @@ async def test_refresh_access_token_global(): assert result["api_refresh_token"] == "TestGlobalRefreshedRefreshToken" assert result["api_user_id"] == "global123456" assert result["id_token"] == "TestGlobalRefreshedIdToken" - + # Verify expires_at is a datetime in the future assert isinstance(result["expires_at"], datetime.datetime) assert result["expires_at"] > datetime.datetime.now(datetime.timezone.utc) @@ -128,19 +125,19 @@ async def test_refresh_access_token_global(): async def test_refresh_access_token_global_no_refresh_token(): """Test the Global HMAC token refresh returns empty dict when no refresh token.""" endpoint_urls = get_endpoint_urls_for_region(SmartRegion.GLOBAL) - + auth = SmartAuthentication( username=TEST_USERNAME, password=TEST_PASSWORD, endpoint_urls=endpoint_urls, ) - + # Verify auth_mode is correctly inferred assert auth.auth_mode == SmartAuthMode.GLOBAL_HMAC - + # Call _refresh_access_token_global directly without refresh token result = await auth._refresh_access_token_global() - + # Should return empty dict assert result == {} @@ -149,33 +146,33 @@ async def test_refresh_access_token_global_no_refresh_token(): async def test_auth_mode_selection_global(): """Test that auth_mode selection triggers Global HMAC code paths.""" endpoint_urls = get_endpoint_urls_for_region(SmartRegion.GLOBAL) - + with respx.mock: # Mock the global login endpoint - login_route = respx.post( - GLOBAL_API_BASE_URL + "/iam/service/api/v1/login" - ).mock(return_value=Response(200, json=load_response(RESPONSE_DIR / "global_login_result.json"))) - + login_route = respx.post(GLOBAL_API_BASE_URL + "/iam/service/api/v1/login").mock( + return_value=Response(200, json=load_response(RESPONSE_DIR / "global_login_result.json")) + ) + # Mock API session endpoint - session_route = respx.post( - GLOBAL_API_BASE_URL + "/iam/service/api/v1/session?identity_type=smart" - ).mock(return_value=Response(200, json=load_response(RESPONSE_DIR / "api_access.json"))) - + respx.post(GLOBAL_API_BASE_URL + "/iam/service/api/v1/session?identity_type=smart").mock( + return_value=Response(200, json=load_response(RESPONSE_DIR / "api_access.json")) + ) + auth = SmartAuthentication( username=TEST_USERNAME, password=TEST_PASSWORD, endpoint_urls=endpoint_urls, ) - + # Verify auth_mode is correctly inferred as GLOBAL_HMAC assert auth.auth_mode == SmartAuthMode.GLOBAL_HMAC - + # Call the main _login method which should route to _login_global result = await auth._login() - + # Verify the global login endpoint was called (not EU OAuth) assert login_route.called - + # Verify the result contains global tokens assert result["access_token"] == "TestGlobalAccessToken" assert result["api_user_id"] == "global123456" @@ -185,29 +182,29 @@ async def test_auth_mode_selection_global(): async def test_auth_mode_selection_global_refresh(): """Test that auth_mode selection triggers Global HMAC refresh code path.""" endpoint_urls = get_endpoint_urls_for_region(SmartRegion.GLOBAL) - + with respx.mock: # Mock the global refresh endpoint - refresh_route = respx.post( - GLOBAL_API_BASE_URL + "/iam/service/api/v1/refresh/" - ).mock(return_value=Response(200, json=load_response(RESPONSE_DIR / "global_refresh_result.json"))) - + refresh_route = respx.post(GLOBAL_API_BASE_URL + "/iam/service/api/v1/refresh/").mock( + return_value=Response(200, json=load_response(RESPONSE_DIR / "global_refresh_result.json")) + ) + auth = SmartAuthentication( username=TEST_USERNAME, password=TEST_PASSWORD, refresh_token="TestGlobalRefreshToken", endpoint_urls=endpoint_urls, ) - + # Verify auth_mode is correctly inferred as GLOBAL_HMAC assert auth.auth_mode == SmartAuthMode.GLOBAL_HMAC - + # Call the main _refresh_access_token method which should route to _refresh_access_token_global result = await auth._refresh_access_token() - + # Verify the global refresh endpoint was called (not EU OAuth) assert refresh_route.called - + # Verify the result contains refreshed global tokens assert result["access_token"] == "TestGlobalRefreshedAccessToken" assert result["api_user_id"] == "global123456" @@ -217,30 +214,25 @@ async def test_auth_mode_selection_global_refresh(): async def test_login_global_missing_access_token(): """Test that _login_global raises error when access token is missing.""" endpoint_urls = get_endpoint_urls_for_region(SmartRegion.GLOBAL) - + with respx.mock: # Mock the global login endpoint with incomplete response - respx.post( - GLOBAL_API_BASE_URL + "/iam/service/api/v1/login" - ).mock(return_value=Response(200, json={ - "code": "0000", - "message": "Success", - "data": { - "userId": "global123456", - "expiresIn": 3600 - } - })) - + respx.post(GLOBAL_API_BASE_URL + "/iam/service/api/v1/login").mock( + return_value=Response( + 200, json={"code": "0000", "message": "Success", "data": {"userId": "global123456", "expiresIn": 3600}} + ) + ) + auth = SmartAuthentication( username=TEST_USERNAME, password=TEST_PASSWORD, endpoint_urls=endpoint_urls, ) - + # Should raise SmartAPIError when access_token is missing with pytest.raises(Exception) as exc_info: await auth._login_global() - + assert "Could not get access token from global login" in str(exc_info.value) @@ -248,30 +240,26 @@ async def test_login_global_missing_access_token(): async def test_login_global_missing_user_id(): """Test that _login_global raises error when user ID is missing.""" endpoint_urls = get_endpoint_urls_for_region(SmartRegion.GLOBAL) - + with respx.mock: # Mock the global login endpoint with incomplete response - respx.post( - GLOBAL_API_BASE_URL + "/iam/service/api/v1/login" - ).mock(return_value=Response(200, json={ - "code": "0000", - "message": "Success", - "data": { - "accessToken": "TestToken", - "expiresIn": 3600 - } - })) - + respx.post(GLOBAL_API_BASE_URL + "/iam/service/api/v1/login").mock( + return_value=Response( + 200, + json={"code": "0000", "message": "Success", "data": {"accessToken": "TestToken", "expiresIn": 3600}}, + ) + ) + auth = SmartAuthentication( username=TEST_USERNAME, password=TEST_PASSWORD, endpoint_urls=endpoint_urls, ) - + # Should raise SmartAPIError when userId is missing with pytest.raises(Exception) as exc_info: await auth._login_global() - + assert "Could not get access token from global login" in str(exc_info.value) @@ -279,25 +267,22 @@ async def test_login_global_missing_user_id(): async def test_login_global_no_data_field(): """Test that _login_global raises error when data field is missing.""" endpoint_urls = get_endpoint_urls_for_region(SmartRegion.GLOBAL) - + with respx.mock: # Mock the global login endpoint with no data field - respx.post( - GLOBAL_API_BASE_URL + "/iam/service/api/v1/login" - ).mock(return_value=Response(200, json={ - "code": "1001", - "message": "Invalid credentials" - })) - + respx.post(GLOBAL_API_BASE_URL + "/iam/service/api/v1/login").mock( + return_value=Response(200, json={"code": "1001", "message": "Invalid credentials"}) + ) + auth = SmartAuthentication( username=TEST_USERNAME, password=TEST_PASSWORD, endpoint_urls=endpoint_urls, ) - + # Should raise SmartAPIError when data is missing with pytest.raises(Exception) as exc_info: await auth._login_global() - + assert "Could not get tokens from global login" in str(exc_info.value) assert "Invalid credentials" in str(exc_info.value) From 9576f8a7100e84a42ee7d0ba85ab7c7740d11a0c Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 4 Feb 2026 21:26:46 +0000 Subject: [PATCH 11/16] Add global authentication mode tests - Created mock response files for global API endpoints - Added SmartGlobalMockRouter for global auth testing - Implemented comprehensive tests for _init_vehicles_global() and _update_global_vehicle_details() - All tests passing, no regression in existing tests Co-authored-by: DasBasti <1713093+DasBasti@users.noreply.github.com> --- pysmarthashtag/tests/common.py | 79 +++++++++++ .../replys/global_vehicle_abilities.json | 25 ++++ .../tests/replys/global_vehicle_details.json | 25 ++++ .../tests/replys/global_vehicle_details2.json | 25 ++++ .../tests/replys/global_vehicle_list.json | 22 +++ pysmarthashtag/tests/test_global_auth.py | 125 ++++++++++++++++++ 6 files changed, 301 insertions(+) create mode 100644 pysmarthashtag/tests/replys/global_vehicle_abilities.json create mode 100644 pysmarthashtag/tests/replys/global_vehicle_details.json create mode 100644 pysmarthashtag/tests/replys/global_vehicle_details2.json create mode 100644 pysmarthashtag/tests/replys/global_vehicle_list.json create mode 100644 pysmarthashtag/tests/test_global_auth.py diff --git a/pysmarthashtag/tests/common.py b/pysmarthashtag/tests/common.py index 95cd1ff..fcae2f8 100644 --- a/pysmarthashtag/tests/common.py +++ b/pysmarthashtag/tests/common.py @@ -1,5 +1,6 @@ """Fixtures for Smart tests.""" +import httpx import respx from pysmarthashtag.const import ( @@ -10,6 +11,7 @@ API_SESION_URL, AUTH_URL, EU_OAUTH_BASE_URL, + GLOBAL_API_BASE_URL, LOGIN_URL, OTA_SERVER_URL, SERVER_URL, @@ -106,3 +108,80 @@ def add_login_routes(self) -> None: 200, json=load_response(RESPONSE_DIR / "ota_response.json"), ) + + def add_global_routes(self) -> None: + """Add routes for global authentication mode.""" + # Global vehicle ownership list + self.post(GLOBAL_API_BASE_URL + "/vc/vehicle/v1/ownership/list").respond( + 200, + json=load_response(RESPONSE_DIR / "global_vehicle_list.json"), + ) + # Global vehicle details for each test VIN + for vin in ["TestVIN0000000001", "TestVIN0000000002"]: + self.post(GLOBAL_API_BASE_URL + "/vc/vehicle/v1/vehicleCustomerInfo").respond( + 200, + json=load_response(RESPONSE_DIR / "global_vehicle_details.json"), + ) + # Global vehicle abilities (need to handle dynamic model codes) + self.route(method="GET", url__regex=r"^" + GLOBAL_API_BASE_URL + r"/vc/vehicle/v1/ability/.+/.+$").respond( + 200, + json=load_response(RESPONSE_DIR / "global_vehicle_abilities.json"), + ) + + +class SmartGlobalMockRouter(respx.MockRouter): + """Stateful MockRouter for Smart Global APIs.""" + + def __init__( + self, + ) -> None: + """Initialize the SmartGlobalMockRouter with clean responses for global auth.""" + super().__init__(assert_all_called=False) + + self.add_login_routes() + self.add_global_routes() + + def add_login_routes(self) -> None: + """Add routes for login (global HMAC flow).""" + # Global login uses a different endpoint + self.post(GLOBAL_API_BASE_URL + "/iam/service/api/v1/login").respond( + 200, + json={ + "code": "0000", + "message": "success", + "data": { + "accessToken": "TestGlobalAccessToken", + "refreshToken": "TestGlobalRefreshToken", + "idToken": "TestGlobalIdToken", + "userId": "112233", + "expiresIn": 3600, + }, + }, + ) + + def add_global_routes(self) -> None: + """Add routes for global authentication mode.""" + # Global vehicle ownership list + self.post(GLOBAL_API_BASE_URL + "/vc/vehicle/v1/ownership/list").respond( + 200, + json=load_response(RESPONSE_DIR / "global_vehicle_list.json"), + ) + + # Global vehicle details - use side_effect to return different responses based on request + def vehicle_details_handler(request, route): + import json as json_mod + body = json_mod.loads(request.content) + vin = body.get("vin") + if vin == "TestVIN0000000001": + return httpx.Response(200, json=load_response(RESPONSE_DIR / "global_vehicle_details.json")) + elif vin == "TestVIN0000000002": + return httpx.Response(200, json=load_response(RESPONSE_DIR / "global_vehicle_details2.json")) + return httpx.Response(404, json={"code": "404", "message": "Vehicle not found"}) + + self.post(GLOBAL_API_BASE_URL + "/vc/vehicle/v1/vehicleCustomerInfo").mock(side_effect=vehicle_details_handler) + + # Global vehicle abilities (need to handle dynamic model codes) + self.route(method="GET", url__regex=r"^" + GLOBAL_API_BASE_URL + r"/vc/vehicle/v1/ability/.+/.+$").respond( + 200, + json=load_response(RESPONSE_DIR / "global_vehicle_abilities.json"), + ) diff --git a/pysmarthashtag/tests/replys/global_vehicle_abilities.json b/pysmarthashtag/tests/replys/global_vehicle_abilities.json new file mode 100644 index 0000000..80fcd28 --- /dev/null +++ b/pysmarthashtag/tests/replys/global_vehicle_abilities.json @@ -0,0 +1,25 @@ +{ + "code": "0000", + "message": "success", + "result": { + "remoteControl": { + "climate": true, + "lock": true, + "unlock": true, + "horn": true, + "light": true + }, + "chargingControl": { + "startCharging": true, + "stopCharging": true, + "setChargingLimit": true + }, + "vehicleStatus": { + "battery": true, + "climate": true, + "doors": true, + "location": true, + "windows": true + } + } +} diff --git a/pysmarthashtag/tests/replys/global_vehicle_details.json b/pysmarthashtag/tests/replys/global_vehicle_details.json new file mode 100644 index 0000000..083f332 --- /dev/null +++ b/pysmarthashtag/tests/replys/global_vehicle_details.json @@ -0,0 +1,25 @@ +{ + "code": "0000", + "message": "success", + "result": [ + { + "vin": "TestVIN0000000001", + "modelCode": "HX11_EUL_Premium+_RWD_000", + "modelName": "Smart #1", + "seriesName": "HX11", + "seriesCodeVs": "HX11", + "colorCode": "087", + "colorName": "SPECTRUM BLUE", + "factoryCode": "1234", + "engineNo": "ABCDEFABC", + "plateNo": "", + "iccid": "01234567890123456789", + "msisdn": "112233445566770", + "updateTime": 1706199500000, + "vehicleType": 0, + "tboxPlatform": "tsp", + "ihuPlatform": "tsp", + "proprietaryPlatform": 0 + } + ] +} diff --git a/pysmarthashtag/tests/replys/global_vehicle_details2.json b/pysmarthashtag/tests/replys/global_vehicle_details2.json new file mode 100644 index 0000000..7b5ebca --- /dev/null +++ b/pysmarthashtag/tests/replys/global_vehicle_details2.json @@ -0,0 +1,25 @@ +{ + "code": "0000", + "message": "success", + "result": [ + { + "vin": "TestVIN0000000002", + "modelCode": "HY11_EUL_Premium+_RWD_000", + "modelName": "Smart #3", + "seriesName": "HY11", + "seriesCodeVs": "HY11", + "colorCode": "087", + "colorName": "SPECTRUM BLUE", + "factoryCode": "1234", + "engineNo": "ABCDEFABC", + "plateNo": "", + "iccid": "01234567890123456789", + "msisdn": "112233445566770", + "updateTime": 1706199500000, + "vehicleType": 0, + "tboxPlatform": "tsp", + "ihuPlatform": "tsp", + "proprietaryPlatform": 0 + } + ] +} diff --git a/pysmarthashtag/tests/replys/global_vehicle_list.json b/pysmarthashtag/tests/replys/global_vehicle_list.json new file mode 100644 index 0000000..bb4c94e --- /dev/null +++ b/pysmarthashtag/tests/replys/global_vehicle_list.json @@ -0,0 +1,22 @@ +{ + "code": "0000", + "message": "success", + "result": [ + { + "vin": "TestVIN0000000001", + "modelCode": "HX11_EUL_Premium+_RWD_000", + "modelName": "Smart #1", + "seriesName": "HX11", + "colorCode": "087", + "colorName": "SPECTRUM BLUE" + }, + { + "vin": "TestVIN0000000002", + "modelCode": "HY11_EUL_Premium+_RWD_000", + "modelName": "Smart #3", + "seriesName": "HY11", + "colorCode": "087", + "colorName": "SPECTRUM BLUE" + } + ] +} diff --git a/pysmarthashtag/tests/test_global_auth.py b/pysmarthashtag/tests/test_global_auth.py new file mode 100644 index 0000000..96e10ea --- /dev/null +++ b/pysmarthashtag/tests/test_global_auth.py @@ -0,0 +1,125 @@ +"""Tests for Smart Global authentication mode.""" + +import logging + +import pytest +import respx + +from pysmarthashtag.account import SmartAccount +from pysmarthashtag.const import GLOBAL_API_BASE_URL, SmartRegion, get_endpoint_urls_for_region +from pysmarthashtag.tests import TEST_PASSWORD, TEST_USERNAME +from pysmarthashtag.tests.common import SmartGlobalMockRouter + +_LOGGER = logging.getLogger(__name__) + + +@pytest.fixture +def smart_global_fixture(request: pytest.FixtureRequest): + """Patch Smart Global API calls.""" + router = SmartGlobalMockRouter() + + with router: + yield router + + +async def prepare_global_account_with_vehicles(): + """Initialize account with global endpoints and get vehicles.""" + endpoint_urls = get_endpoint_urls_for_region(SmartRegion.GLOBAL) + account = SmartAccount(TEST_USERNAME, TEST_PASSWORD, endpoint_urls=endpoint_urls) + await account.get_vehicles() + return account + + +@pytest.mark.asyncio +async def test_global_login(smart_global_fixture: respx.Router): + """Test the login flow with global authentication.""" + endpoint_urls = get_endpoint_urls_for_region(SmartRegion.GLOBAL) + account = SmartAccount(TEST_USERNAME, TEST_PASSWORD, endpoint_urls=endpoint_urls) + await account.login() + assert account.config.authentication.access_token is not None + + +@pytest.mark.asyncio +async def test_init_vehicles_global(smart_global_fixture: respx.Router): + """Test _init_vehicles_global() correctly parses vehicles from global API.""" + account = await prepare_global_account_with_vehicles() + + # Verify that vehicles were initialized + assert account is not None + assert account.vehicles is not None + assert len(account.vehicles) == 2 + + # Verify vehicle VINs + assert "TestVIN0000000001" in account.vehicles + assert "TestVIN0000000002" in account.vehicles + + # Verify vehicle data + vehicle1 = account.vehicles["TestVIN0000000001"] + assert vehicle1.vin == "TestVIN0000000001" + assert vehicle1.data.get("modelCode") == "HX11_EUL_Premium+_RWD_000" + assert vehicle1.data.get("modelName") == "Smart #1" + + vehicle2 = account.vehicles["TestVIN0000000002"] + assert vehicle2.vin == "TestVIN0000000002" + assert vehicle2.data.get("modelCode") == "HY11_EUL_Premium+_RWD_000" + assert vehicle2.data.get("modelName") == "Smart #3" + + +@pytest.mark.asyncio +async def test_update_global_vehicle_details(smart_global_fixture: respx.Router): + """Test _update_global_vehicle_details() correctly populates combine_data and abilities.""" + account = await prepare_global_account_with_vehicles() + + # Verify that vehicles have detailed data from global API + vehicle1 = account.vehicles["TestVIN0000000001"] + + # Check that combine_data was called with details from global API + # The global_vehicle_details.json contains additional vehicle information + assert vehicle1.data is not None + assert vehicle1.data.get("vin") == "TestVIN0000000001" + + # Verify that abilities were populated from global API + assert "abilities" in vehicle1.data + abilities = vehicle1.data["abilities"] + assert abilities is not None + + # Verify expected abilities structure + assert "remoteControl" in abilities + assert "chargingControl" in abilities + assert "vehicleStatus" in abilities + + # Verify specific abilities + assert abilities["remoteControl"]["climate"] is True + assert abilities["chargingControl"]["startCharging"] is True + assert abilities["vehicleStatus"]["battery"] is True + + +@pytest.mark.asyncio +async def test_global_with_endpoint_urls(smart_global_fixture: respx.Router): + """Test global authentication using EndpointUrls directly.""" + from pysmarthashtag.const import EndpointUrls + + endpoint_urls = EndpointUrls( + api_base_url=GLOBAL_API_BASE_URL, + api_base_url_v2=GLOBAL_API_BASE_URL, + ) + account = SmartAccount(TEST_USERNAME, TEST_PASSWORD, endpoint_urls=endpoint_urls) + await account.get_vehicles() + + # Verify vehicles were loaded + assert len(account.vehicles) == 2 + assert "TestVIN0000000001" in account.vehicles + assert "TestVIN0000000002" in account.vehicles + + +@pytest.mark.asyncio +async def test_global_auth_mode_detection(smart_global_fixture: respx.Router): + """Test that global auth mode is correctly detected.""" + endpoint_urls = get_endpoint_urls_for_region(SmartRegion.GLOBAL) + account = SmartAccount(TEST_USERNAME, TEST_PASSWORD, endpoint_urls=endpoint_urls) + await account.login() + + # Verify that the account uses global HMAC authentication + from pysmarthashtag.const import SmartAuthMode + assert account.config.authentication.auth_mode == SmartAuthMode.GLOBAL_HMAC + assert account._is_global_auth() is True From 99f796b4b28de19dcd18a1da989817db04328695 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 4 Feb 2026 21:28:08 +0000 Subject: [PATCH 12/16] Address code review feedback - Move json import to module level in common.py - Fix SmartMockRouter.add_global_routes to use side_effect handler - Rename prepare_global_account_with_vehicles to create_global_account_with_vehicles Co-authored-by: DasBasti <1713093+DasBasti@users.noreply.github.com> --- pysmarthashtag/tests/common.py | 24 ++++++++++++++++-------- pysmarthashtag/tests/test_global_auth.py | 8 ++++---- 2 files changed, 20 insertions(+), 12 deletions(-) diff --git a/pysmarthashtag/tests/common.py b/pysmarthashtag/tests/common.py index fcae2f8..fe3c78c 100644 --- a/pysmarthashtag/tests/common.py +++ b/pysmarthashtag/tests/common.py @@ -1,5 +1,7 @@ """Fixtures for Smart tests.""" +import json + import httpx import respx @@ -116,12 +118,19 @@ def add_global_routes(self) -> None: 200, json=load_response(RESPONSE_DIR / "global_vehicle_list.json"), ) - # Global vehicle details for each test VIN - for vin in ["TestVIN0000000001", "TestVIN0000000002"]: - self.post(GLOBAL_API_BASE_URL + "/vc/vehicle/v1/vehicleCustomerInfo").respond( - 200, - json=load_response(RESPONSE_DIR / "global_vehicle_details.json"), - ) + + # Global vehicle details - use side_effect to return different responses based on request + def vehicle_details_handler(request, route): + body = json.loads(request.content) + vin = body.get("vin") + if vin == "TestVIN0000000001": + return httpx.Response(200, json=load_response(RESPONSE_DIR / "global_vehicle_details.json")) + elif vin == "TestVIN0000000002": + return httpx.Response(200, json=load_response(RESPONSE_DIR / "global_vehicle_details2.json")) + return httpx.Response(404, json={"code": "404", "message": "Vehicle not found"}) + + self.post(GLOBAL_API_BASE_URL + "/vc/vehicle/v1/vehicleCustomerInfo").mock(side_effect=vehicle_details_handler) + # Global vehicle abilities (need to handle dynamic model codes) self.route(method="GET", url__regex=r"^" + GLOBAL_API_BASE_URL + r"/vc/vehicle/v1/ability/.+/.+$").respond( 200, @@ -169,8 +178,7 @@ def add_global_routes(self) -> None: # Global vehicle details - use side_effect to return different responses based on request def vehicle_details_handler(request, route): - import json as json_mod - body = json_mod.loads(request.content) + body = json.loads(request.content) vin = body.get("vin") if vin == "TestVIN0000000001": return httpx.Response(200, json=load_response(RESPONSE_DIR / "global_vehicle_details.json")) diff --git a/pysmarthashtag/tests/test_global_auth.py b/pysmarthashtag/tests/test_global_auth.py index 96e10ea..4891efd 100644 --- a/pysmarthashtag/tests/test_global_auth.py +++ b/pysmarthashtag/tests/test_global_auth.py @@ -22,8 +22,8 @@ def smart_global_fixture(request: pytest.FixtureRequest): yield router -async def prepare_global_account_with_vehicles(): - """Initialize account with global endpoints and get vehicles.""" +async def create_global_account_with_vehicles(): + """Create and initialize account with global endpoints and get vehicles.""" endpoint_urls = get_endpoint_urls_for_region(SmartRegion.GLOBAL) account = SmartAccount(TEST_USERNAME, TEST_PASSWORD, endpoint_urls=endpoint_urls) await account.get_vehicles() @@ -42,7 +42,7 @@ async def test_global_login(smart_global_fixture: respx.Router): @pytest.mark.asyncio async def test_init_vehicles_global(smart_global_fixture: respx.Router): """Test _init_vehicles_global() correctly parses vehicles from global API.""" - account = await prepare_global_account_with_vehicles() + account = await create_global_account_with_vehicles() # Verify that vehicles were initialized assert account is not None @@ -68,7 +68,7 @@ async def test_init_vehicles_global(smart_global_fixture: respx.Router): @pytest.mark.asyncio async def test_update_global_vehicle_details(smart_global_fixture: respx.Router): """Test _update_global_vehicle_details() correctly populates combine_data and abilities.""" - account = await prepare_global_account_with_vehicles() + account = await create_global_account_with_vehicles() # Verify that vehicles have detailed data from global API vehicle1 = account.vehicles["TestVIN0000000001"] From 52018f107fe0a931150028b01330f9ba613a0779 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 4 Feb 2026 21:28:40 +0000 Subject: [PATCH 13/16] Fix linting and formatting issues - Remove trailing whitespace from blank lines - Apply ruff formatting Co-authored-by: DasBasti <1713093+DasBasti@users.noreply.github.com> --- pysmarthashtag/tests/common.py | 12 ++++++------ pysmarthashtag/tests/test_global_auth.py | 25 ++++++++++++------------ 2 files changed, 19 insertions(+), 18 deletions(-) diff --git a/pysmarthashtag/tests/common.py b/pysmarthashtag/tests/common.py index fe3c78c..0bf764b 100644 --- a/pysmarthashtag/tests/common.py +++ b/pysmarthashtag/tests/common.py @@ -118,7 +118,7 @@ def add_global_routes(self) -> None: 200, json=load_response(RESPONSE_DIR / "global_vehicle_list.json"), ) - + # Global vehicle details - use side_effect to return different responses based on request def vehicle_details_handler(request, route): body = json.loads(request.content) @@ -128,9 +128,9 @@ def vehicle_details_handler(request, route): elif vin == "TestVIN0000000002": return httpx.Response(200, json=load_response(RESPONSE_DIR / "global_vehicle_details2.json")) return httpx.Response(404, json={"code": "404", "message": "Vehicle not found"}) - + self.post(GLOBAL_API_BASE_URL + "/vc/vehicle/v1/vehicleCustomerInfo").mock(side_effect=vehicle_details_handler) - + # Global vehicle abilities (need to handle dynamic model codes) self.route(method="GET", url__regex=r"^" + GLOBAL_API_BASE_URL + r"/vc/vehicle/v1/ability/.+/.+$").respond( 200, @@ -175,7 +175,7 @@ def add_global_routes(self) -> None: 200, json=load_response(RESPONSE_DIR / "global_vehicle_list.json"), ) - + # Global vehicle details - use side_effect to return different responses based on request def vehicle_details_handler(request, route): body = json.loads(request.content) @@ -185,9 +185,9 @@ def vehicle_details_handler(request, route): elif vin == "TestVIN0000000002": return httpx.Response(200, json=load_response(RESPONSE_DIR / "global_vehicle_details2.json")) return httpx.Response(404, json={"code": "404", "message": "Vehicle not found"}) - + self.post(GLOBAL_API_BASE_URL + "/vc/vehicle/v1/vehicleCustomerInfo").mock(side_effect=vehicle_details_handler) - + # Global vehicle abilities (need to handle dynamic model codes) self.route(method="GET", url__regex=r"^" + GLOBAL_API_BASE_URL + r"/vc/vehicle/v1/ability/.+/.+$").respond( 200, diff --git a/pysmarthashtag/tests/test_global_auth.py b/pysmarthashtag/tests/test_global_auth.py index 4891efd..da64daf 100644 --- a/pysmarthashtag/tests/test_global_auth.py +++ b/pysmarthashtag/tests/test_global_auth.py @@ -43,22 +43,22 @@ async def test_global_login(smart_global_fixture: respx.Router): async def test_init_vehicles_global(smart_global_fixture: respx.Router): """Test _init_vehicles_global() correctly parses vehicles from global API.""" account = await create_global_account_with_vehicles() - + # Verify that vehicles were initialized assert account is not None assert account.vehicles is not None assert len(account.vehicles) == 2 - + # Verify vehicle VINs assert "TestVIN0000000001" in account.vehicles assert "TestVIN0000000002" in account.vehicles - + # Verify vehicle data vehicle1 = account.vehicles["TestVIN0000000001"] assert vehicle1.vin == "TestVIN0000000001" assert vehicle1.data.get("modelCode") == "HX11_EUL_Premium+_RWD_000" assert vehicle1.data.get("modelName") == "Smart #1" - + vehicle2 = account.vehicles["TestVIN0000000002"] assert vehicle2.vin == "TestVIN0000000002" assert vehicle2.data.get("modelCode") == "HY11_EUL_Premium+_RWD_000" @@ -69,25 +69,25 @@ async def test_init_vehicles_global(smart_global_fixture: respx.Router): async def test_update_global_vehicle_details(smart_global_fixture: respx.Router): """Test _update_global_vehicle_details() correctly populates combine_data and abilities.""" account = await create_global_account_with_vehicles() - + # Verify that vehicles have detailed data from global API vehicle1 = account.vehicles["TestVIN0000000001"] - + # Check that combine_data was called with details from global API # The global_vehicle_details.json contains additional vehicle information assert vehicle1.data is not None assert vehicle1.data.get("vin") == "TestVIN0000000001" - + # Verify that abilities were populated from global API assert "abilities" in vehicle1.data abilities = vehicle1.data["abilities"] assert abilities is not None - + # Verify expected abilities structure assert "remoteControl" in abilities assert "chargingControl" in abilities assert "vehicleStatus" in abilities - + # Verify specific abilities assert abilities["remoteControl"]["climate"] is True assert abilities["chargingControl"]["startCharging"] is True @@ -98,14 +98,14 @@ async def test_update_global_vehicle_details(smart_global_fixture: respx.Router) async def test_global_with_endpoint_urls(smart_global_fixture: respx.Router): """Test global authentication using EndpointUrls directly.""" from pysmarthashtag.const import EndpointUrls - + endpoint_urls = EndpointUrls( api_base_url=GLOBAL_API_BASE_URL, api_base_url_v2=GLOBAL_API_BASE_URL, ) account = SmartAccount(TEST_USERNAME, TEST_PASSWORD, endpoint_urls=endpoint_urls) await account.get_vehicles() - + # Verify vehicles were loaded assert len(account.vehicles) == 2 assert "TestVIN0000000001" in account.vehicles @@ -118,8 +118,9 @@ async def test_global_auth_mode_detection(smart_global_fixture: respx.Router): endpoint_urls = get_endpoint_urls_for_region(SmartRegion.GLOBAL) account = SmartAccount(TEST_USERNAME, TEST_PASSWORD, endpoint_urls=endpoint_urls) await account.login() - + # Verify that the account uses global HMAC authentication from pysmarthashtag.const import SmartAuthMode + assert account.config.authentication.auth_mode == SmartAuthMode.GLOBAL_HMAC assert account._is_global_auth() is True From 1f6d6fda6776aac33bfac7f440c4af9a0e0d335d Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 4 Feb 2026 21:30:02 +0000 Subject: [PATCH 14/16] Refactor to eliminate code duplication - Extract vehicle_details_handler to module-level _create_vehicle_details_handler function - Create shared _add_global_vehicle_routes helper function - Both SmartMockRouter and SmartGlobalMockRouter now use shared implementation Co-authored-by: DasBasti <1713093+DasBasti@users.noreply.github.com> --- pysmarthashtag/tests/common.py | 92 +++++++++++++++++----------------- 1 file changed, 46 insertions(+), 46 deletions(-) diff --git a/pysmarthashtag/tests/common.py b/pysmarthashtag/tests/common.py index 0bf764b..e9399b3 100644 --- a/pysmarthashtag/tests/common.py +++ b/pysmarthashtag/tests/common.py @@ -21,6 +21,50 @@ from pysmarthashtag.tests import RESPONSE_DIR, load_response +def _create_vehicle_details_handler(): + """Create a handler function for global vehicle details requests. + + Returns a function that returns different mock responses based on VIN in request body. + """ + + def vehicle_details_handler(request, route): + body = json.loads(request.content) + vin = body.get("vin") + if vin == "TestVIN0000000001": + return httpx.Response(200, json=load_response(RESPONSE_DIR / "global_vehicle_details.json")) + elif vin == "TestVIN0000000002": + return httpx.Response(200, json=load_response(RESPONSE_DIR / "global_vehicle_details2.json")) + return httpx.Response(404, json={"code": "404", "message": "Vehicle not found"}) + + return vehicle_details_handler + + +def _add_global_vehicle_routes(router: respx.MockRouter) -> None: + """Add global vehicle routes to the given router. + + Args: + ---- + router: The MockRouter to add routes to + + """ + # Global vehicle ownership list + router.post(GLOBAL_API_BASE_URL + "/vc/vehicle/v1/ownership/list").respond( + 200, + json=load_response(RESPONSE_DIR / "global_vehicle_list.json"), + ) + + # Global vehicle details - use side_effect to return different responses based on request + router.post(GLOBAL_API_BASE_URL + "/vc/vehicle/v1/vehicleCustomerInfo").mock( + side_effect=_create_vehicle_details_handler() + ) + + # Global vehicle abilities (need to handle dynamic model codes) + router.route(method="GET", url__regex=r"^" + GLOBAL_API_BASE_URL + r"/vc/vehicle/v1/ability/.+/.+$").respond( + 200, + json=load_response(RESPONSE_DIR / "global_vehicle_abilities.json"), + ) + + class SmartMockRouter(respx.MockRouter): """Stateful MockRouter for Smart APIs.""" @@ -113,29 +157,7 @@ def add_login_routes(self) -> None: def add_global_routes(self) -> None: """Add routes for global authentication mode.""" - # Global vehicle ownership list - self.post(GLOBAL_API_BASE_URL + "/vc/vehicle/v1/ownership/list").respond( - 200, - json=load_response(RESPONSE_DIR / "global_vehicle_list.json"), - ) - - # Global vehicle details - use side_effect to return different responses based on request - def vehicle_details_handler(request, route): - body = json.loads(request.content) - vin = body.get("vin") - if vin == "TestVIN0000000001": - return httpx.Response(200, json=load_response(RESPONSE_DIR / "global_vehicle_details.json")) - elif vin == "TestVIN0000000002": - return httpx.Response(200, json=load_response(RESPONSE_DIR / "global_vehicle_details2.json")) - return httpx.Response(404, json={"code": "404", "message": "Vehicle not found"}) - - self.post(GLOBAL_API_BASE_URL + "/vc/vehicle/v1/vehicleCustomerInfo").mock(side_effect=vehicle_details_handler) - - # Global vehicle abilities (need to handle dynamic model codes) - self.route(method="GET", url__regex=r"^" + GLOBAL_API_BASE_URL + r"/vc/vehicle/v1/ability/.+/.+$").respond( - 200, - json=load_response(RESPONSE_DIR / "global_vehicle_abilities.json"), - ) + _add_global_vehicle_routes(self) class SmartGlobalMockRouter(respx.MockRouter): @@ -170,26 +192,4 @@ def add_login_routes(self) -> None: def add_global_routes(self) -> None: """Add routes for global authentication mode.""" - # Global vehicle ownership list - self.post(GLOBAL_API_BASE_URL + "/vc/vehicle/v1/ownership/list").respond( - 200, - json=load_response(RESPONSE_DIR / "global_vehicle_list.json"), - ) - - # Global vehicle details - use side_effect to return different responses based on request - def vehicle_details_handler(request, route): - body = json.loads(request.content) - vin = body.get("vin") - if vin == "TestVIN0000000001": - return httpx.Response(200, json=load_response(RESPONSE_DIR / "global_vehicle_details.json")) - elif vin == "TestVIN0000000002": - return httpx.Response(200, json=load_response(RESPONSE_DIR / "global_vehicle_details2.json")) - return httpx.Response(404, json={"code": "404", "message": "Vehicle not found"}) - - self.post(GLOBAL_API_BASE_URL + "/vc/vehicle/v1/vehicleCustomerInfo").mock(side_effect=vehicle_details_handler) - - # Global vehicle abilities (need to handle dynamic model codes) - self.route(method="GET", url__regex=r"^" + GLOBAL_API_BASE_URL + r"/vc/vehicle/v1/ability/.+/.+$").respond( - 200, - json=load_response(RESPONSE_DIR / "global_vehicle_abilities.json"), - ) + _add_global_vehicle_routes(self) From 1f11a27a875d66accad9c76c78c28c36f7b5ef99 Mon Sep 17 00:00:00 2001 From: Bastian Neumann Date: Wed, 4 Feb 2026 22:40:22 +0100 Subject: [PATCH 15/16] Update pysmarthashtag/tests/test_authentication.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- pysmarthashtag/tests/test_authentication.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/pysmarthashtag/tests/test_authentication.py b/pysmarthashtag/tests/test_authentication.py index bc6be09..d54e885 100644 --- a/pysmarthashtag/tests/test_authentication.py +++ b/pysmarthashtag/tests/test_authentication.py @@ -30,11 +30,6 @@ async def test_login_global(): return_value=Response(200, json=load_response(RESPONSE_DIR / "global_login_result.json")) ) - # Mock API session endpoint - respx.post(GLOBAL_API_BASE_URL + "/iam/service/api/v1/session?identity_type=smart").mock( - return_value=Response(200, json=load_response(RESPONSE_DIR / "api_access.json")) - ) - auth = SmartAuthentication( username=TEST_USERNAME, password=TEST_PASSWORD, From c4b8fd93d255f6fe22536ca612b8bc4d95e75eea Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 4 Feb 2026 21:41:38 +0000 Subject: [PATCH 16/16] Use SmartAPIError and remove unnecessary API session mocks Co-authored-by: DasBasti <1713093+DasBasti@users.noreply.github.com> --- pysmarthashtag/tests/test_authentication.py | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/pysmarthashtag/tests/test_authentication.py b/pysmarthashtag/tests/test_authentication.py index d54e885..2398f65 100644 --- a/pysmarthashtag/tests/test_authentication.py +++ b/pysmarthashtag/tests/test_authentication.py @@ -14,6 +14,7 @@ SmartRegion, get_endpoint_urls_for_region, ) +from pysmarthashtag.models import SmartAPIError from pysmarthashtag.tests import RESPONSE_DIR, TEST_PASSWORD, TEST_USERNAME, load_response _LOGGER = logging.getLogger(__name__) @@ -148,11 +149,6 @@ async def test_auth_mode_selection_global(): return_value=Response(200, json=load_response(RESPONSE_DIR / "global_login_result.json")) ) - # Mock API session endpoint - respx.post(GLOBAL_API_BASE_URL + "/iam/service/api/v1/session?identity_type=smart").mock( - return_value=Response(200, json=load_response(RESPONSE_DIR / "api_access.json")) - ) - auth = SmartAuthentication( username=TEST_USERNAME, password=TEST_PASSWORD, @@ -225,7 +221,7 @@ async def test_login_global_missing_access_token(): ) # Should raise SmartAPIError when access_token is missing - with pytest.raises(Exception) as exc_info: + with pytest.raises(SmartAPIError) as exc_info: await auth._login_global() assert "Could not get access token from global login" in str(exc_info.value) @@ -252,7 +248,7 @@ async def test_login_global_missing_user_id(): ) # Should raise SmartAPIError when userId is missing - with pytest.raises(Exception) as exc_info: + with pytest.raises(SmartAPIError) as exc_info: await auth._login_global() assert "Could not get access token from global login" in str(exc_info.value) @@ -276,7 +272,7 @@ async def test_login_global_no_data_field(): ) # Should raise SmartAPIError when data is missing - with pytest.raises(Exception) as exc_info: + with pytest.raises(SmartAPIError) as exc_info: await auth._login_global() assert "Could not get tokens from global login" in str(exc_info.value)