diff --git a/pyproject.toml b/pyproject.toml index c24b9d5..9db3e3b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -51,7 +51,7 @@ enable_error_code = "ignore-without-code" target-version = "py39" line-length = 120 -select = [ +lint.select = [ "C", # complexity "D", # docstrings "E", # pycodestyle @@ -62,7 +62,7 @@ select = [ "PGH004", # Use specific rule codes when using noqa ] -ignore = [ +lint.ignore = [ "D202", # No blank lines allowed after function docstring "D203", # 1 blank line required before class docstring "D212", # Multi-line docstring summary should start at the first line @@ -75,9 +75,9 @@ ignore = [ exclude = [ ] -[tool.ruff.per-file-ignores] +[tool.ruff.lint.per-file-ignores] "docs/source/conf.py" = ["D100"] -"pysamrthashtag/api/authentication.py" = ["D102", "D107"] +"pysmarthashtag/api/authentication.py" = ["D102", "D107"] -[tool.ruff.mccabe] +[tool.ruff.lint.mccabe] max-complexity = 25 diff --git a/pysmarthashtag/account.py b/pysmarthashtag/account.py index 7734a9e..5bc38bf 100644 --- a/pysmarthashtag/account.py +++ b/pysmarthashtag/account.py @@ -6,11 +6,13 @@ from dataclasses import InitVar, dataclass, field from typing import Optional +import httpx + from pysmarthashtag.api import utils -from pysmarthashtag.api.authentication import SmartAuthentication +from pysmarthashtag.api.authentication import SmartAuthentication, SmartLoginClient from pysmarthashtag.api.client import SmartClient, SmartClientConfiguration from pysmarthashtag.api.log_sanitizer import sanitize_log_data -from pysmarthashtag.const import API_CARS_URL, API_SELECT_CAR_URL, EndpointUrls +from pysmarthashtag.const import API_CARS_URL, API_SELECT_CAR_URL, EndpointUrls, SmartAuthMode from pysmarthashtag.models import SmartAuthError, SmartHumanCarConnectionError, SmartTokenRefreshNecessary from pysmarthashtag.vehicle.vehicle import SmartVehicle @@ -42,7 +44,18 @@ class SmartAccount: """Vehicles associated with the account.""" def __post_init__(self, password, log_responses): - """Initialize the account.""" + """ + Set up endpoint URLs and client configuration for the Smart account. + + If `endpoint_urls` is None, assigns a default EndpointUrls instance. + If `config` is None, creates a SmartClientConfiguration using a SmartAuthentication + initialized with the instance `username`, the provided `password`, and the + resolved `endpoint_urls`, and applies the `log_responses` flag to the configuration. + + Parameters: + password (str): Password used to construct the SmartAuthentication instance. + log_responses (bool): Whether the created configuration should log server responses. + """ # Ensure endpoint_urls is set if self.endpoint_urls is None: self.endpoint_urls = EndpointUrls() @@ -53,12 +66,20 @@ def __post_init__(self, password, log_responses): log_responses=log_responses, ) - async def _ensure_ssl_context(self) -> None: - """Ensure SSL context is created asynchronously. + def _is_global_auth(self) -> bool: + """ + Determine whether the account uses Global app authentication. + + Returns: + bool: `true` if the account's authentication mode equals `GLOBAL_HMAC`, `false` otherwise. + """ + return self.config.authentication.auth_mode == SmartAuthMode.GLOBAL_HMAC - This method creates the SSL context in a thread pool executor - to avoid blocking the async event loop when httpx creates - SSL connections. + async def _ensure_ssl_context(self) -> None: + """ + Ensure the configuration and its authentication both have an SSL context. + + If the configuration has no SSL context, obtain one via config.get_ssl_context() and assign it to config.ssl_context and config.authentication.ssl_context. """ if self.config.ssl_context is None: self.config.ssl_context = await self.config.get_ssl_context() @@ -73,7 +94,11 @@ async def login(self, force_refresh: bool = False) -> None: await self.config.authentication.login() async def _init_vehicles(self) -> None: - """Initialize vehicles from Smart servers.""" + """ + Initialize and populate account vehicles from the Smart API. + + Ensures an SSL context is available, requests the account's vehicle list from the configured Smart endpoint, records the UTC fetch timestamp, and adds each returned vehicle to the account via add_vehicle (passing the fetch time). Retries the request up to three times on token-refresh errors and reattempts initialization when a human-car-connection error occurs. + """ _LOGGER.debug("Getting initial vehicle list") await self._ensure_ssl_context() @@ -113,12 +138,61 @@ async def _init_vehicles(self) -> None: _LOGGER.debug("Found vehicle %s", sanitize_log_data(vehicle)) self.add_vehicle(vehicle, fetched_at) + async def _init_vehicles_global(self) -> None: + """ + Fetches the account's vehicle ownership list from the Smart Global endpoints and registers each vehicle with the account. + + Each discovered vehicle is added to the account's vehicle mapping along with the timestamp when the list was fetched. + """ + _LOGGER.debug("Getting initial vehicle list (global)") + await self._ensure_ssl_context() + + fetched_at = datetime.datetime.now(datetime.timezone.utc) + async with SmartLoginClient(ssl_context=self.config.ssl_context) as client: + path = "/vc/vehicle/v1/ownership/list" + body = json.dumps({}) + host = httpx.URL(self.endpoint_urls.get_api_base_url()).host + headers = utils.generate_global_header( + method="POST", + path=path, + host=host, + app_key=self.endpoint_urls.get_global_app_key(), + app_secret=self.endpoint_urls.get_global_app_secret(), + body=body, + access_token=self.config.authentication.access_token, + user_id=self.config.authentication.api_user_id, + id_token=self.config.authentication.id_token, + ) + vehicles_response = await client.post( + self.endpoint_urls.get_api_base_url() + path, + headers=headers, + content=body, + ) + data = vehicles_response.json() + vehicles = data.get("result") or data.get("data") or [] + for vehicle in vehicles: + _LOGGER.debug("Found vehicle %s", sanitize_log_data(vehicle)) + self.add_vehicle(vehicle, fetched_at) + def add_vehicle(self, vehicle, fetched_at): - """Add a vehicle to the account.""" + """ + Add a vehicle to the account's vehicle mapping. + + Parameters: + vehicle (dict): Vehicle data from the API; must include the `vin` key used as the mapping key. + fetched_at (datetime): UTC timestamp when the vehicle data was fetched. + """ self.vehicles[vehicle.get("vin")] = SmartVehicle(self, vehicle, fetched_at=fetched_at) async def get_vehicles(self, force_init: bool = False) -> None: - """Get the vehicles associated with the account.""" + """ + Load and refresh the vehicles for this Smart account and populate the account's internal vehicle mapping. + + If the account is not authenticated, perform authentication first. When called with `force_init=True` or when no vehicles are known, fetch the list of vehicles. For global-auth mode the method fetches global vehicle listings and updates global details and abilities; for non-global mode it selects each vehicle and updates its information, state-of-charge, and OTA info by merging the retrieved data into each SmartVehicle instance. + + Parameters: + force_init (bool): If True, re-initialize the vehicle list even if vehicles are already present. + """ await self._ensure_ssl_context() if self.config.authentication.api_user_id is None: await self.config.authentication.login() @@ -126,7 +200,14 @@ async def get_vehicles(self, force_init: bool = False) -> None: _LOGGER.debug("Getting vehicles for account") if len(self.vehicles) == 0 or force_init: - await self._init_vehicles() + if self._is_global_auth(): + await self._init_vehicles_global() + else: + await self._init_vehicles() + + if self._is_global_auth(): + await self._update_global_vehicle_details() + return for vin, vehicle in self.vehicles.items(): _LOGGER.debug("Getting vehicle data") @@ -137,7 +218,15 @@ async def get_vehicles(self, force_init: bool = False) -> None: vehicle.combine_data(vehicle_info, charging_settings=vehicle_soc, ota_info=vehicle_ota_info) async def select_active_vehicle(self, vin) -> None: - """Select the active vehicle.""" + """ + Selects the given vehicle as the active vehicle for subsequent operations. + + This updates the remote session to mark the vehicle identified by `vin` as active. When the account is configured for global authentication, this is a no-op. The method may perform internal retries on transient token or human-car-connection errors. + Parameters: + vin (str): Vehicle Identification Number of the vehicle to select. + """ + if self._is_global_auth(): + return _LOGGER.debug("Selecting vehicle") data = json.dumps( { @@ -173,8 +262,18 @@ async def select_active_vehicle(self, vin) -> None: continue break - async def get_vehicle_information(self, vin) -> str: - """Get information about a vehicle.""" + async def get_vehicle_information(self, vin) -> dict: + """ + Fetch the latest details and status for the vehicle identified by VIN, using global endpoints when the account is configured for global authentication. + + Returns: + dict: The `data` payload from the vehicle status response containing status, basic, and more fields; empty dict if no data was retrieved. + + Raises: + SmartAuthError: If vehicle information cannot be retrieved after retrying. + """ + if self._is_global_auth(): + return await self._get_vehicle_details_global(vin) _LOGGER.debug("Getting information for vehicle") params = { "latest": True, @@ -217,7 +316,19 @@ async def get_vehicle_information(self, vin) -> str: return data async def get_vehicle_soc(self, vin) -> str: - """Get information about a vehicle.""" + """ + Retrieve the vehicle's state-of-charge (SOC) data. + + If the account is using global authentication, this returns an empty dict. On failure after retries, raises SmartAuthError. + + Returns: + dict: SOC data payload from the vehicle response, or an empty dict when using global authentication. + + Raises: + SmartAuthError: If the SOC data could not be retrieved after retrying. + """ + if self._is_global_auth(): + return {} _LOGGER.debug("Getting vehicle SOC") params = { "setting": "charging", @@ -258,7 +369,24 @@ async def get_vehicle_soc(self, vin) -> str: return data async def get_vehicle_ota_info(self, vin) -> dict: - """Get information about a vehicle from OTA server.""" + """ + Retrieve OTA version information for the specified vehicle from the OTA server. + + If the account uses global authentication, no OTA request is performed and an empty dict is returned. + + Parameters: + vin (str): Vehicle Identification Number for the vehicle to query. + + Returns: + dict: A mapping with keys: + - `target_version`: OTA target version string or `None` if not present. + - `current_version`: Current vehicle OTA version string or `None` if not present. + + Raises: + SmartAuthError: When repeated authentication/token failures prevent retrieving OTA information. + """ + if self._is_global_auth(): + return {} _LOGGER.debug("Getting OTA information for vehicle") data = {} async with SmartClient(self.config) as client: @@ -296,3 +424,94 @@ async def get_vehicle_ota_info(self, vin) -> dict: if retry > 1: raise SmartAuthError("Could not get vehicle information") return data + + async def _update_global_vehicle_details(self) -> None: + """ + Fetch global details and abilities for each known vehicle and merge them into the corresponding SmartVehicle objects stored on the account. + + This updates each vehicle in-place with any details and capability information returned by the global service; no value is returned. + """ + for vin in list(self.vehicles.keys()): + await self._get_vehicle_details_global(vin) + await self._get_vehicle_abilities_global(vin) + + async def _get_vehicle_details_global(self, vin) -> dict: + """ + Fetch global vehicle details for the given VIN and merge them into the corresponding SmartVehicle if available. + + Parameters: + vin (str): Vehicle Identification Number to fetch details for. + + Returns: + dict: Parsed vehicle details retrieved from the global endpoint, or an empty dict if no details were returned. + """ + _LOGGER.debug("Getting global vehicle details") + await self._ensure_ssl_context() + async with SmartLoginClient(ssl_context=self.config.ssl_context) as client: + path = "/vc/vehicle/v1/vehicleCustomerInfo" + body = json.dumps({"vin": vin}) + host = httpx.URL(self.endpoint_urls.get_api_base_url()).host + headers = utils.generate_global_header( + method="POST", + path=path, + host=host, + app_key=self.endpoint_urls.get_global_app_key(), + app_secret=self.endpoint_urls.get_global_app_secret(), + body=body, + access_token=self.config.authentication.access_token, + user_id=self.config.authentication.api_user_id, + id_token=self.config.authentication.id_token, + ) + response = await client.post( + self.endpoint_urls.get_api_base_url() + path, + headers=headers, + content=body, + ) + data = response.json() + details = data.get("result") or data.get("data") or [] + if isinstance(details, list): + details = details[0] if details else {} + if details: + self.vehicles.get(vin).combine_data(details) + return details or {} + + async def _get_vehicle_abilities_global(self, vin) -> dict: + """ + Retrieve global ability information for a vehicle identified by VIN. + + Parameters: + vin (str): Vehicle Identification Number to query for abilities. + + Returns: + abilities (dict): Abilities data from the global API. The vehicle's `data["abilities"]` + is updated when abilities are present. Returns an empty dict if the vehicle has no + model code or if the API provides no abilities. + """ + _LOGGER.debug("Getting global vehicle abilities") + await self._ensure_ssl_context() + vehicle = self.vehicles.get(vin) + model_code = vehicle.data.get("modelCode") if vehicle else None + if not model_code: + return {} + async with SmartLoginClient(ssl_context=self.config.ssl_context) as client: + path = f"/vc/vehicle/v1/ability/{model_code}/{vin}" + host = httpx.URL(self.endpoint_urls.get_api_base_url()).host + headers = utils.generate_global_header( + method="GET", + path=path, + host=host, + app_key=self.endpoint_urls.get_global_app_key(), + app_secret=self.endpoint_urls.get_global_app_secret(), + access_token=self.config.authentication.access_token, + user_id=self.config.authentication.api_user_id, + id_token=self.config.authentication.id_token, + ) + response = await client.get( + self.endpoint_urls.get_api_base_url() + path, + headers=headers, + ) + data = response.json() + abilities = data.get("result") or data.get("data") or {} + if abilities and vehicle: + vehicle.data["abilities"] = abilities + return abilities or {} \ No newline at end of file diff --git a/pysmarthashtag/api/authentication.py b/pysmarthashtag/api/authentication.py index 68da8e6..0021b04 100644 --- a/pysmarthashtag/api/authentication.py +++ b/pysmarthashtag/api/authentication.py @@ -20,6 +20,7 @@ API_SESION_URL, HTTPX_TIMEOUT, EndpointUrls, + SmartAuthMode, ) from pysmarthashtag.models import SmartAPIError @@ -41,6 +42,21 @@ def __init__( ssl_context: Optional[ssl.SSLContext] = None, endpoint_urls: Optional[EndpointUrls] = None, ): + """ + Initialize the authentication manager with credentials, optional tokens, SSL context, and endpoint configuration. + + Parameters: + username (str): Account username used for authentication. + password (str): Account password used for authentication. + access_token (Optional[datetime.datetime]): Existing OAuth access token, if available. + expires_at (Optional[datetime.datetime]): Expiration time of `access_token`; used to determine when refresh/login is needed. + refresh_token (Optional[str]): Refresh token associated with `access_token`, if available. + ssl_context (Optional[ssl.SSLContext]): Optional SSL context to use for login requests to avoid creating one lazily. + endpoint_urls (Optional[EndpointUrls]): Endpoint configuration; if omitted a default is created and the authentication mode is inferred from it. + + Behavior: + Stores provided values on the instance, generates a random device identifier, initializes internal locks and API session token placeholders, and sets `auth_mode` by inferring it from `endpoint_urls`. + """ self.username: str = username self.password: str = password self.access_token: Optional[str] = access_token @@ -51,21 +67,20 @@ def __init__( self.api_access_token: Optional[str] = None self.api_refresh_token: Optional[str] = None self.api_user_id: Optional[str] = None + self.id_token: Optional[str] = None self.ssl_context: Optional[ssl.SSLContext] = ssl_context self.endpoint_urls: EndpointUrls = endpoint_urls if endpoint_urls is not None else EndpointUrls() + self.auth_mode: SmartAuthMode = self.endpoint_urls.infer_auth_mode() _LOGGER.debug("Device ID initialized") async def get_ssl_context(self) -> ssl.SSLContext: - """Get or create SSL context asynchronously. - - This method returns a cached SSL context if available, or creates - a new one asynchronously using the shared ssl_context module. - Thread-safe using asyncio.Lock. - - Returns - ------- - ssl.SSLContext: An SSL context for secure connections. - + """ + Obtain the SSLContext used for secure connections, creating and caching it if necessary. + + This method returns the cached SSLContext when present; otherwise it asynchronously acquires a new SSLContext and stores it for subsequent calls. The operation is safe for concurrent callers. + + Returns: + ssl.SSLContext: SSL context configured for secure HTTP connections. """ if self.ssl_context is None: # Import here to avoid circular imports @@ -135,7 +150,14 @@ async def async_auth_flow(self, request: Request) -> AsyncGenerator[Request, Res raise async def login(self) -> None: - """Login to the Smart API.""" + """ + Perform authentication with the Smart API and store retrieved tokens and expiry. + + Attempts to refresh the access token when a refresh token is available; otherwise performs a full login. On success, normalizes and stores `access_token`, `refresh_token`, `api_access_token`, `api_refresh_token`, `api_user_id`, optional `id_token`, and `expires_at` (adjusted by EXPIRES_AT_OFFSET) on the instance. + + Raises: + SmartAPIError: If required token fields are missing from the login response. + """ _LOGGER.debug("Logging in to Smart API") token_data = {} if self.refresh_token: @@ -150,6 +172,7 @@ async def login(self) -> None: self.api_access_token = token_data["api_access_token"] self.api_refresh_token = token_data["api_refresh_token"] self.api_user_id = token_data["api_user_id"] + self.id_token = token_data.get("id_token") self.expires_at = token_data["expires_at"] _LOGGER.debug("Login successful") return True @@ -157,18 +180,53 @@ async def login(self) -> None: raise SmartAPIError("Could not login to Smart API") async def _refresh_access_token(self): - """Refresh the access token.""" + """ + Attempt to refresh the stored access (and related) tokens using the configured authentication mode. + + Tries a mode-specific refresh (global HMAC refresh when configured, otherwise the EU refresh). If the refresh succeeds, returns a dict with refreshed token data (e.g., `access_token`, `refresh_token`, `expires_at`, and optional `id_token`); if the refresh fails, returns an empty dict to indicate a full login is required. + """ + if self.auth_mode == SmartAuthMode.GLOBAL_HMAC: + try: + return await self._refresh_access_token_global() + except (SmartAPIError, httpx.HTTPError, ValueError): + _LOGGER.debug("Refreshing access token failed. Logging in again") + return {} + try: - ssl_ctx = await self.get_ssl_context() - async with SmartLoginClient(ssl_context=ssl_ctx) as _: - _LOGGER.debug("Refreshing access token via relogin because refresh token is not implemented") - await self._login() - except SmartAPIError: + return await self._refresh_access_token_eu() + except (SmartAPIError, httpx.HTTPError, ValueError): _LOGGER.debug("Refreshing access token failed. Logging in again") return {} async def _login(self): - """Login to Smart web services.""" + """ + Selects and executes the appropriate login flow for the configured authentication mode. + + Dispatches to the global HMAC login when auth_mode is SmartAuthMode.GLOBAL_HMAC; otherwise runs the EU OAuth login flow. + + Returns: + token_data (dict): Authentication tokens and related metadata such as `access_token`, `refresh_token`, `expires_at`, and optionally `id_token` and API session fields. + """ + if self.auth_mode == SmartAuthMode.GLOBAL_HMAC: + return await self._login_global() + return await self._login_eu() + + async def _login_eu(self): + """ + Perform the EU OAuth login flow, exchange the OAuth access token for API session tokens, and return the resulting tokens and expiry. + + Returns: + dict: Mapping with the following keys: + access_token (str): OAuth access token obtained from the authorization redirect. + refresh_token (str): OAuth refresh token obtained from the authorization redirect. + api_access_token (str): API session access token exchanged from the OAuth access token. + api_refresh_token (str): API session refresh token exchanged from the OAuth access token. + api_user_id (str): User identifier returned by the API session exchange. + expires_at (datetime.datetime): UTC timestamp when the OAuth access token expires. + + Raises: + SmartAPIError: If the login context, login token, redirect location, or access/refresh tokens cannot be obtained. + """ ssl_ctx = await self.get_ssl_context() async with SmartLoginClient(ssl_context=ssl_ctx) as client: _LOGGER.info("Acquiring access token.") @@ -263,32 +321,7 @@ async def _login(self): except KeyError: raise SmartAPIError("Could not get access token from auth page") - data = json.dumps({"accessToken": access_token}).replace(" ", "") - r_api_access = await client.post( - # we do not know what type of car we have in our list so we fall back to the old API URL - self.endpoint_urls.get_api_base_url() + API_SESION_URL + "?identity_type=smart", - headers={ - **utils.generate_default_header( - self.device_id, - None, - params={ - "identity_type": "smart", - }, - method="POST", - url=API_SESION_URL, - body=data, - ) - }, - data=data, - ) - api_result = r_api_access.json() - _LOGGER.debug("API access result: %s", sanitize_log_data(api_result)) - try: - api_access_token = api_result["data"]["accessToken"] - api_refresh_token = api_result["data"]["refreshToken"] - api_user_id = api_result["data"]["userId"] - except KeyError: - raise SmartAPIError("Could not get API access token from API") + api_access_token, api_refresh_token, api_user_id = await self._get_api_session(client, access_token) return { "access_token": access_token, @@ -299,6 +332,241 @@ async def _login(self): "expires_at": expires_at, } + async def _get_api_session(self, client: "SmartLoginClient", access_token: str) -> tuple[str, str, str]: + """ + Exchange an OAuth access token for the API session tokens and the associated user id. + + Posts the given OAuth `access_token` to the API session endpoint and returns the API-level + access token, refresh token, and user identifier obtained from the response. + + Returns: + tuple[str, str, str]: (api_access_token, api_refresh_token, api_user_id) + + Raises: + SmartAPIError: If the response does not contain the expected token or user id fields. + """ + data = json.dumps({"accessToken": access_token}).replace(" ", "") + r_api_access = await client.post( + # we do not know what type of car we have in our list so we fall back to the old API URL + self.endpoint_urls.get_api_base_url() + API_SESION_URL + "?identity_type=smart", + headers={ + **utils.generate_default_header( + self.device_id, + None, + params={ + "identity_type": "smart", + }, + method="POST", + url=API_SESION_URL, + body=data, + ) + }, + data=data, + ) + api_result = r_api_access.json() + _LOGGER.debug("API access result: %s", sanitize_log_data(api_result)) + try: + api_access_token = api_result["data"]["accessToken"] + api_refresh_token = api_result["data"]["refreshToken"] + api_user_id = api_result["data"]["userId"] + except KeyError: + raise SmartAPIError("Could not get API access token from API") + return api_access_token, api_refresh_token, api_user_id + + async def _refresh_access_token_eu(self) -> dict: + """ + Attempt to refresh the EU (OAuth) access token and exchange it for API session tokens. + + If a refresh token is available, sends a refresh request to the OAuth token URL, computes a new expiry timestamp, exchanges the returned OAuth access token for API session tokens, and returns a mapping of tokens and metadata. Returns an empty dict if no refresh token is configured or if the refresh response does not yield an access token. + + Returns: + dict: A mapping with the following keys when successful: + - "access_token" (str): The refreshed OAuth access token. + - "refresh_token" (str): The refreshed OAuth refresh token (or the previous refresh token if not provided). + - "api_access_token" (str): The exchanged API access token for subsequent API calls. + - "api_refresh_token" (str): The exchanged API refresh token for API session renewal. + - "api_user_id" (str): The user identifier returned by the API session exchange. + - "id_token" (str | None): The ID token returned by the OAuth refresh response, if present. + - "expires_at" (datetime.datetime): UTC timestamp when the OAuth access token expires. + Returns an empty dict if no refresh was possible or the refresh response lacked an access token. + """ + if not self.refresh_token: + return {} + + ssl_ctx = await self.get_ssl_context() + async with SmartLoginClient(ssl_context=ssl_ctx) as client: + payload = { + "accessToken": "", + "refreshToken": self.refresh_token, + } + headers = { + "Content-Type": "application/json", + "User-Agent": "hello-smart/2.0.5 (Android)", + "X-API-Key": self.endpoint_urls.get_oauth_api_key(), + } + r_refresh = await client.post( + self.endpoint_urls.get_oauth_token_url(), + json=payload, + headers=headers, + ) + refresh_result = r_refresh.json() + access_token = refresh_result.get("accessToken") or refresh_result.get("access_token") + refresh_token = refresh_result.get("refreshToken") or refresh_result.get("refresh_token") + id_token = refresh_result.get("idToken") or refresh_result.get("id_token") + expires_in = refresh_result.get("expiresIn") or refresh_result.get("expires_in") + if not access_token: + return {} + + expires_at = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta( + seconds=int(expires_in) if expires_in else 3600 + ) + api_access_token, api_refresh_token, api_user_id = await self._get_api_session(client, access_token) + return { + "access_token": access_token, + "refresh_token": refresh_token or self.refresh_token, + "api_access_token": api_access_token, + "api_refresh_token": api_refresh_token, + "api_user_id": api_user_id, + "id_token": id_token, + "expires_at": expires_at, + } + + async def _login_global(self) -> dict: + """ + Perform the global (HMAC) login flow and return obtained tokens and related metadata. + + Returns: + dict: Mapping with the following keys: + - access_token (str): OAuth access token from the global login. + - refresh_token (str|None): Refresh token from the global login, if provided. + - api_access_token (str): API session access token (same as `access_token` for global flow). + - api_refresh_token (str|None): API session refresh token (same as `refresh_token` for global flow). + - api_user_id (str): User identifier returned by the global login. + - id_token (str|None): ID token returned by the global login, if present. + - expires_at (datetime.datetime): UTC timestamp when the access token expires. + """ + ssl_ctx = await self.get_ssl_context() + async with SmartLoginClient(ssl_context=ssl_ctx) as client: + _LOGGER.info("Acquiring access token (global app).") + + path = "/iam/service/api/v1/login" + payload = { + "email": self.username, + "password": self.password, + "imageSessionId": "", + "imageCode": "", + } + body = json.dumps(payload) + host = httpx.URL(self.endpoint_urls.get_api_base_url()).host + headers = utils.generate_global_header( + method="POST", + path=path, + host=host, + app_key=self.endpoint_urls.get_global_app_key(), + app_secret=self.endpoint_urls.get_global_app_secret(), + body=body, + ) + + r_login = await client.post( + self.endpoint_urls.get_api_base_url() + path, + headers=headers, + content=body, + ) + login_result = r_login.json() + _LOGGER.debug("Login result: %s", sanitize_log_data(login_result)) + data = login_result.get("data") or login_result.get("result") or {} + if not data: + message = login_result.get("message", "Unknown error") + code = login_result.get("code", "unknown") + raise SmartAPIError(f"Could not get tokens from global login: {code} {message}") + + access_token = data.get("accessToken") + refresh_token = data.get("refreshToken") + id_token = data.get("idToken") + api_user_id = data.get("userId") + expires_in = data.get("expiresIn") or data.get("expires_in") + if not access_token or not api_user_id: + raise SmartAPIError("Could not get access token from global login") + + expires_at = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta( + seconds=int(expires_in) if expires_in else HTTPX_TIMEOUT * 2 + ) + + return { + "access_token": access_token, + "refresh_token": refresh_token, + "api_access_token": access_token, + "api_refresh_token": refresh_token, + "api_user_id": api_user_id, + "id_token": id_token, + "expires_at": expires_at, + } + + async def _refresh_access_token_global(self) -> dict: + """ + Refresh the Global app access token and return the refreshed token set. + + Returns: + dict: Mapping with refreshed token and session fields: + - `access_token` (str): OAuth access token. + - `refresh_token` (str): OAuth refresh token (may be original if not returned). + - `api_access_token` (str): API session access token (same as `access_token`). + - `api_refresh_token` (str): API session refresh token (same as `refresh_token`). + - `api_user_id` (str): API user identifier. + - `id_token` (str | None): ID token if provided by the server. + - `expires_at` (datetime.datetime): UTC timestamp when the access token expires. + Returns an empty dict if no refresh token is available or the refresh attempt fails. + """ + if not self.refresh_token: + return {} + + ssl_ctx = await self.get_ssl_context() + async with SmartLoginClient(ssl_context=ssl_ctx) as client: + path = "/iam/service/api/v1/refresh/" + payload = {"refreshToken": self.refresh_token} + body = json.dumps(payload) + host = httpx.URL(self.endpoint_urls.get_api_base_url()).host + headers = utils.generate_global_header( + method="POST", + path=path, + host=host, + app_key=self.endpoint_urls.get_global_app_key(), + app_secret=self.endpoint_urls.get_global_app_secret(), + body=body, + ) + r_refresh = await client.post( + self.endpoint_urls.get_api_base_url() + path, + headers=headers, + content=body, + ) + refresh_result = r_refresh.json() + _LOGGER.debug("Refresh result: %s", sanitize_log_data(refresh_result)) + data = refresh_result.get("data") or refresh_result.get("result") or {} + if not data: + return {} + + access_token = data.get("accessToken") + refresh_token = data.get("refreshToken") or self.refresh_token + id_token = data.get("idToken") + api_user_id = data.get("userId") + expires_in = data.get("expiresIn") or data.get("expires_in") + if not access_token or not api_user_id: + return {} + + expires_at = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta( + seconds=int(expires_in) if expires_in else HTTPX_TIMEOUT * 2 + ) + + return { + "access_token": access_token, + "refresh_token": refresh_token, + "api_access_token": access_token, + "api_refresh_token": refresh_token, + "api_user_id": api_user_id, + "id_token": id_token, + "expires_at": expires_at, + } + class SmartLoginClient(httpx.AsyncClient): """Client to login to the Smart API.""" @@ -396,4 +664,4 @@ def get_retry_wait_time(response: httpx.Response) -> int: retry_after = next(iter([int(i) for i in response.json().get("message", "") if i.isdigit()])) except Exception: retry_after = 2 - return math.ceil(retry_after * 2) + return math.ceil(retry_after * 2) \ No newline at end of file diff --git a/pysmarthashtag/api/utils.py b/pysmarthashtag/api/utils.py index 1e32318..97a210d 100644 --- a/pysmarthashtag/api/utils.py +++ b/pysmarthashtag/api/utils.py @@ -4,6 +4,9 @@ import logging import secrets import time +import uuid +from email.utils import formatdate +from typing import Optional _LOGGER = logging.getLogger(__name__) @@ -70,5 +73,192 @@ def generate_default_header( def create_correct_timestamp() -> str: - """Create a correct timestamp for the request.""" + """ + Generate a timestamp string representing the current time in milliseconds since the Unix epoch. + + Returns: + timestamp (str): Current time in milliseconds since 1970-01-01 UTC, formatted as a decimal string. + """ return str(int(time.time() * 1000)) + + +def _ensure_bytes(body: Optional[object]) -> Optional[bytes]: + """ + Normalize a request body to a UTF-8 bytes object when present. + + Parameters: + body (Optional[object]): The value to normalize. If `None`, no conversion is performed. + + Returns: + Optional[bytes]: `None` if input is `None`; the input unchanged if already `bytes`; otherwise the UTF-8 encoding of `str(body)`. + """ + if body is None: + return None + if isinstance(body, bytes): + return body + return str(body).encode("utf-8") + + +def _global_md5_base64(body: bytes) -> str: + """ + Return the first 24 characters of the base64-encoded MD5 digest of `body`. + + Parameters: + body (bytes): Input bytes to hash. + + Returns: + str: First 24 characters of the base64-encoded MD5 digest. + """ + md5_hash = hashlib.md5(body).digest() + return base64.b64encode(md5_hash).decode("utf-8")[:24] + + +def _build_global_string_to_sign( + method: str, + path: str, + headers: dict[str, str], + content_md5: str = "", +) -> str: + """ + Construct the canonical string used to compute the HMAC-SHA256 signature for a Global API request. + + The resulting newline-separated string contains, in order: HTTP method, Accept header, the provided content MD5 value, Content-Type header, Date header, all `x-ca-*` headers (each as `key:value` on its own line), and the request path. This canonical string is intended to be the message passed to the signing HMAC. + + Parameters: + method (str): HTTP method (e.g., "GET", "POST"). + path (str): Request path, including query string if applicable. + headers (dict[str, str]): Request headers; values for "accept", "content-type", "date", and any `x-ca-*` headers are used. + content_md5 (str): Base64-encoded MD5 of the request body when present, or an empty string if absent. + + Returns: + str: The canonical string to sign with HMAC-SHA256. + """ + string_to_sign = [ + method, + headers.get("accept", ""), + content_md5, + headers.get("content-type", ""), + headers.get("date", ""), + ] + + ca_headers = [] + ca_header_names = [] + for key in sorted(headers.keys()): + if key.startswith("x-ca-"): + ca_headers.append(f"{key}:{headers[key]}") + ca_header_names.append(key) + + if ca_header_names: + headers["x-ca-signature-headers"] = ",".join(ca_header_names) + + string_to_sign.append("\n".join(ca_headers)) + string_to_sign.append(path) + + return "\n".join(string_to_sign) + + +def _generate_global_signature( + app_secret: str, + method: str, + path: str, + headers: dict[str, str], + body: Optional[bytes] = None, +) -> str: + """ + Create the HMAC-SHA256 signature used for Global API requests. + + If a request body is provided, its MD5 (base64, truncated to 24 chars) is computed and inserted into headers["content-md5"] before signing. The function builds the canonical string-to-sign from method, path, and headers, then returns the base64-encoded HMAC-SHA256 of that string using app_secret as the key. + + Parameters: + headers (dict[str, str]): Request headers; this dict will be mutated to include "content-md5" when a body is provided. + + Returns: + str: The base64-encoded HMAC-SHA256 signature. + """ + content_md5 = "" + if body is not None: + content_md5 = _global_md5_base64(body) + headers["content-md5"] = content_md5 + + string_to_sign = _build_global_string_to_sign(method, path, headers, content_md5) + signature = hmac.new( + app_secret.encode("utf-8"), + string_to_sign.encode("utf-8"), + hashlib.sha256, + ).digest() + return base64.b64encode(signature).decode("utf-8") + + +def generate_global_header( + method: str, + path: str, + host: str, + app_key: str, + app_secret: str, + body: Optional[object] = None, + content_type: str = "application/json", + access_token: Optional[str] = None, + user_id: Optional[str] = None, + id_token: Optional[str] = None, + extra_headers: Optional[dict[str, str]] = None, +) -> dict[str, str]: + """ + Builds HTTP headers for a Global API request and signs them with HMAC-SHA256. + + Assembles standard headers (date, content-type, host, user-agent, x-ca-timestamp, x-ca-nonce, x-ca-key, etc.), conditionally includes Authorization/x-smart-id/Xs-Auth-Token when provided, merges any extra_headers, and computes the `x-ca-signature` header using the provided `app_secret`. + + Parameters: + method (str): HTTP method (e.g., "GET", "POST") used when computing the signature. + path (str): Request path (URI) used in the signature calculation. + host (str): Host header value for the request. + app_key (str): Application key inserted as `x-ca-key`. + app_secret (str): Secret used to compute the HMAC-SHA256 signature. + body (Optional[object]): Request body; if provided it will be converted to bytes and included in the signature computation. + content_type (str): Value for `content-type` and `accept` headers. Defaults to "application/json". + access_token (Optional[str]): If provided, added as `Authorization: Bearer `. + user_id (Optional[str]): If provided, added as `x-smart-id`. + id_token (Optional[str]): If provided, added as `Xs-Auth-Token` (and `Xs-App-Ver` is set). + extra_headers (Optional[dict[str, str]]): Additional headers to merge into the final header set. + + Returns: + dict[str, str]: A dictionary of HTTP headers ready to attach to the request, including the computed `x-ca-signature`. + """ + timestamp = create_correct_timestamp() + nonce = str(uuid.uuid4()) + http_date = formatdate(timeval=None, localtime=False, usegmt=True) + + headers = { + "date": http_date, + "x-ca-timestamp": timestamp, + "x-ca-nonce": nonce, + "x-ca-key": app_key, + "x-ca-signature-method": "HmacSHA256", + "CA_VERSION": "1", + "content-type": content_type, + "accept": content_type, + "host": host, + "user-agent": "ALIYUN-ANDROID-DEMO", + } + + if access_token: + headers["Authorization"] = f"Bearer {access_token}" + if user_id: + headers["x-smart-id"] = user_id + if id_token: + headers["Xs-Auth-Token"] = id_token + headers["Xs-App-Ver"] = "1.0.8" + + if extra_headers: + headers.update(extra_headers) + + body_bytes = _ensure_bytes(body) + headers["x-ca-signature"] = _generate_global_signature( + app_secret, + method, + path, + headers, + body_bytes, + ) + + _LOGGER.debug("Constructed global request header for %s %s", method, path) + return headers \ No newline at end of file diff --git a/pysmarthashtag/cli.py b/pysmarthashtag/cli.py old mode 100755 new mode 100644 index 8089f89..ce1798e --- a/pysmarthashtag/cli.py +++ b/pysmarthashtag/cli.py @@ -24,7 +24,14 @@ def environ_or_required(key): def main_parser() -> argparse.ArgumentParser: - """Create argument parser.""" + """ + Create and return the CLI ArgumentParser and configure module logging from SMART_LOG_LEVEL. + + The parser is configured with subcommands: `status`, `info`, `watch` (with `-i` interval), `climate` (with `--vin`, `--temp`, `--active`), and `seatheating` (with `--vin`, `--level`, `--temp`, `--active`). Default authentication/region arguments are added via the module helper and the parser's default `func` is set to the command dispatcher. + + Returns: + argparse.ArgumentParser: A fully configured ArgumentParser for the CLI. + """ logging_config = { "version": 1, @@ -56,6 +63,8 @@ def main_parser() -> argparse.ArgumentParser: }, } + log_level = os.environ.get("SMART_LOG_LEVEL", "INFO").upper() + logging_config["loggers"]["pysmarthashtag"]["level"] = log_level logging.config.dictConfig(logging_config) parser = argparse.ArgumentParser(description="Smart API demo") @@ -165,26 +174,32 @@ async def set_seatheating(args) -> None: def _add_default_args(parser: argparse.ArgumentParser): - """Add the default arguments username, password to the parser.""" + """ + Add standard CLI options for Smart account credentials and API region. + + Adds the following arguments to the provided ArgumentParser: + - `--username` and `--password`: use values from `SMART_USERNAME` / `SMART_PASSWORD` when present; marked required if the corresponding environment variable is not set. + - `--region`: selects the Smart API region with choices `eu`, `intl`, `global`; defaults to the `SMART_REGION` environment variable or `"eu"` when not set. + """ parser.add_argument("--username", help="Smart username", **environ_or_required("SMART_USERNAME")) parser.add_argument("--password", help="Smart password", **environ_or_required("SMART_PASSWORD")) parser.add_argument( "--region", - help="Region for Smart API (eu=Europe, intl=International/Australia/Asia-Pacific)", - choices=["eu", "intl"], + help="Region for Smart API (eu=Europe, intl=Asia-Pacific, global=Australia/Israel)", + choices=["eu", "intl", "global"], default=os.environ.get("SMART_REGION", "eu"), ) def _get_endpoint_urls_from_args(args) -> EndpointUrls: - """Get EndpointUrls based on region argument. - - Args: - args: Parsed command line arguments containing the region. - + """ + Return endpoint URLs for the SmartRegion specified in args.region. + + Parameters: + args: Parsed command-line arguments with a `region` attribute ('eu', 'intl', or 'global'). + Returns: - EndpointUrls configured for the specified region. - + EndpointUrls: Endpoint URLs configured for the specified region. """ region = SmartRegion(args.region) return get_endpoint_urls_for_region(region) @@ -199,4 +214,4 @@ def main(): if __name__ == "__main__": - main() + main() \ No newline at end of file diff --git a/pysmarthashtag/const.py b/pysmarthashtag/const.py index 963b721..cb82b1f 100644 --- a/pysmarthashtag/const.py +++ b/pysmarthashtag/const.py @@ -15,6 +15,13 @@ API_SELECT_CAR_URL = "/device-platform/user/session/update" API_TELEMATICS_URL = "/remote-control/vehicle/telematics/" +GLOBAL_API_BASE_URL = "https://sg-app-api.smart.com" +GLOBAL_APP_KEY = "204587190" +GLOBAL_APP_SECRET = "vxnzkHbpQrkKKQKmFBZlOnL780rjXLFT" + +EU_OAUTH_BASE_URL = "https://api.app-auth.srv.smart.com/v1/" +EU_OAUTH_API_KEY = "yHpsjnd9vzLq7GMowxBa" + OTA_SERVER_URL = "https://ota.srv.smart.com/" HTTPX_TIMEOUT = 30.0 @@ -31,18 +38,29 @@ class SmartRegion(str, Enum): EU = "eu" INTL = "intl" + GLOBAL = "global" + + +class SmartAuthMode(str, Enum): + """Authentication mode for Smart APIs.""" + + EU_OAUTH = "eu_oauth" + GLOBAL_HMAC = "global_hmac" def get_endpoint_urls_for_region(region: SmartRegion) -> "EndpointUrls": """Get pre-configured EndpointUrls for a specific region. Args: + ---- region: The region to get endpoint URLs for. Returns: + ------- EndpointUrls configured for the specified region. Example: + ------- >>> from pysmarthashtag.const import SmartRegion, get_endpoint_urls_for_region >>> from pysmarthashtag.account import SmartAccount >>> @@ -65,6 +83,12 @@ def get_endpoint_urls_for_region(region: SmartRegion) -> "EndpointUrls": api_base_url="https://api.ecloudap.com", api_base_url_v2="https://apiv2.ecloudap.com", ) + elif region == SmartRegion.GLOBAL: + # Global app region (Australia/Israel) - uses sg-app-api endpoints + return EndpointUrls( + api_base_url=GLOBAL_API_BASE_URL, + api_base_url_v2=GLOBAL_API_BASE_URL, + ) else: raise ValueError(f"Unknown region: {region}") @@ -84,6 +108,10 @@ class EndpointUrls: api_base_url: Optional[str] = None api_base_url_v2: Optional[str] = None ota_server_url: Optional[str] = None + oauth_base_url: Optional[str] = None + oauth_api_key: Optional[str] = None + global_app_key: Optional[str] = None + global_app_secret: Optional[str] = None def get_api_key(self) -> str: """Get the API key, using the default if not set.""" @@ -118,5 +146,79 @@ def get_api_base_url_v2(self) -> str: return self.api_base_url_v2 if self.api_base_url_v2 is not None else API_BASE_URL_V2 def get_ota_server_url(self) -> str: - """Get the OTA server URL, using the default if not set.""" + """ + Return the configured OTA server URL or the module default. + + Returns: + str: The OTA server URL; the configured `ota_server_url` when present, otherwise `OTA_SERVER_URL`. + """ return self.ota_server_url if self.ota_server_url is not None else OTA_SERVER_URL + + def get_oauth_base_url(self) -> str: + """ + Return the OAuth base URL to use for token requests. + + Returns: + The configured OAuth base URL string, or `EU_OAUTH_BASE_URL` if no override is set. + """ + return self.oauth_base_url if self.oauth_base_url is not None else EU_OAUTH_BASE_URL + + def get_oauth_api_key(self) -> str: + """ + Return the configured OAuth API key or the default if none is configured. + + Returns: + str: The configured OAuth API key if set, otherwise the module default `EU_OAUTH_API_KEY`. + """ + return self.oauth_api_key if self.oauth_api_key is not None else EU_OAUTH_API_KEY + + def get_oauth_token_url(self) -> str: + """ + Builds the OAuth token endpoint URL for the configured OAuth base URL. + + Returns: + The full token endpoint URL (the OAuth base URL with a single trailing '/token'). + """ + return f"{self.get_oauth_base_url().rstrip('/')}/token" + + def get_global_app_key(self) -> str: + """ + Return the configured global app key or the default. + + Returns: + global_app_key (str): The configured global app key if set, otherwise the module default GLOBAL_APP_KEY. + """ + return self.global_app_key if self.global_app_key is not None else GLOBAL_APP_KEY + + def get_global_app_secret(self) -> str: + """ + Return the configured global app secret for this endpoint. + + Returns: + str: The global app secret set on this instance, or the module default `GLOBAL_APP_SECRET` if none is configured. + """ + return self.global_app_secret if self.global_app_secret is not None else GLOBAL_APP_SECRET + + def _is_global_api_base_url(self, api_base_url: str) -> bool: + """ + Determine whether `api_base_url` matches the module's global API base URL (ignoring a trailing slash). + + Returns: + True if `api_base_url` equals `GLOBAL_API_BASE_URL` after removing a trailing slash, False otherwise. + """ + normalized = api_base_url.rstrip("/") + # Default global API base URL, plus any additional explicit variants if needed. + global_base = GLOBAL_API_BASE_URL.rstrip("/") + return normalized == global_base + + def infer_auth_mode(self) -> SmartAuthMode: + """ + Selects the authentication mode to use based on the configured API base URL. + + Returns: + SmartAuthMode: `SmartAuthMode.GLOBAL_HMAC` if the configured API base URL matches the global API base URL, `SmartAuthMode.EU_OAUTH` otherwise. + """ + api_base_url = self.get_api_base_url() + if self._is_global_api_base_url(api_base_url): + return SmartAuthMode.GLOBAL_HMAC + return SmartAuthMode.EU_OAUTH \ No newline at end of file diff --git a/pysmarthashtag/tests/common.py b/pysmarthashtag/tests/common.py index 2fa5391..f9fbf75 100644 --- a/pysmarthashtag/tests/common.py +++ b/pysmarthashtag/tests/common.py @@ -1,5 +1,8 @@ """Fixtures for Smart tests.""" +import json + +import httpx import respx from pysmarthashtag.const import ( @@ -9,6 +12,8 @@ API_SELECT_CAR_URL, API_SESION_URL, AUTH_URL, + EU_OAUTH_BASE_URL, + GLOBAL_API_BASE_URL, LOGIN_URL, OTA_SERVER_URL, SERVER_URL, @@ -16,6 +21,50 @@ from pysmarthashtag.tests import RESPONSE_DIR, load_response +def _create_vehicle_details_handler(): + """Create a handler function for global vehicle details requests. + + Returns a function that returns different mock responses based on VIN in request body. + """ + + def vehicle_details_handler(request, route): + body = json.loads(request.content) + vin = body.get("vin") + if vin == "TestVIN0000000001": + return httpx.Response(200, json=load_response(RESPONSE_DIR / "global_vehicle_details.json")) + elif vin == "TestVIN0000000002": + return httpx.Response(200, json=load_response(RESPONSE_DIR / "global_vehicle_details2.json")) + return httpx.Response(404, json={"code": "404", "message": "Vehicle not found"}) + + return vehicle_details_handler + + +def _add_global_vehicle_routes(router: respx.MockRouter) -> None: + """Add global vehicle routes to the given router. + + Args: + ---- + router: The MockRouter to add routes to + + """ + # Global vehicle ownership list + router.post(GLOBAL_API_BASE_URL + "/vc/vehicle/v1/ownership/list").respond( + 200, + json=load_response(RESPONSE_DIR / "global_vehicle_list.json"), + ) + + # Global vehicle details - use side_effect to return different responses based on request + router.post(GLOBAL_API_BASE_URL + "/vc/vehicle/v1/vehicleCustomerInfo").mock( + side_effect=_create_vehicle_details_handler() + ) + + # Global vehicle abilities (need to handle dynamic model codes) + router.route(method="GET", url__regex=r"^" + GLOBAL_API_BASE_URL + r"/vc/vehicle/v1/ability/.+/.+$").respond( + 200, + json=load_response(RESPONSE_DIR / "global_vehicle_abilities.json"), + ) + + class SmartMockRouter(respx.MockRouter): """Stateful MockRouter for Smart APIs.""" @@ -32,7 +81,18 @@ def __init__( # # # # # # # # # # # # # # # # # # # # # # # # def add_login_routes(self) -> None: - """Add routes for login.""" + """ + Register mocked HTTP routes for the Smart API login flow and related endpoints. + + This sets up a stateful sequence of mocked responses used by tests, including: + - initial server redirect and authentication context endpoints, + - login endpoint returning a test session token and intermediate redirect, + - OAuth token endpoint returning access/refresh/id tokens, + - SMART session and vehicle-related endpoints (for both API_BASE_URL and API_BASE_URL_V2) used to fetch car lists, select a car, retrieve vehicle status and SOC, and perform telematics/remote-control actions, + - OTA app info endpoints for test VINs. + + Responses use predefined JSON fixtures loaded from the test RESPONSE_DIR. + """ # Login context self.get(SERVER_URL).respond(302, headers={"location": load_response(RESPONSE_DIR / "auth_context.url")}) @@ -52,6 +112,15 @@ def add_login_routes(self) -> None: json=load_response(RESPONSE_DIR / "login_result.json"), headers={"location": load_response(RESPONSE_DIR / "auth_result.url")}, ) + self.post(EU_OAUTH_BASE_URL + "token").respond( + 200, + json={ + "accessToken": "TestAccessToken", + "refreshToken": "TestRefreshToken", + "idToken": "TestIdToken", + "expiresIn": 3600, + }, + ) for base_url in [API_BASE_URL, API_BASE_URL_V2]: self.post(base_url + API_SESION_URL + "?identity_type=smart").respond( 200, @@ -96,3 +165,42 @@ def add_login_routes(self) -> None: 200, json=load_response(RESPONSE_DIR / "ota_response.json"), ) + + def add_global_routes(self) -> None: + """Add routes for global authentication mode.""" + _add_global_vehicle_routes(self) + + +class SmartGlobalMockRouter(respx.MockRouter): + """Stateful MockRouter for Smart Global APIs.""" + + def __init__( + self, + ) -> None: + """Initialize the SmartGlobalMockRouter with clean responses for global auth.""" + super().__init__(assert_all_called=False) + + self.add_login_routes() + self.add_global_routes() + + def add_login_routes(self) -> None: + """Add routes for login (global HMAC flow).""" + # Global login uses a different endpoint + self.post(GLOBAL_API_BASE_URL + "/iam/service/api/v1/login").respond( + 200, + json={ + "code": "0000", + "message": "success", + "data": { + "accessToken": "TestGlobalAccessToken", + "refreshToken": "TestGlobalRefreshToken", + "idToken": "TestGlobalIdToken", + "userId": "112233", + "expiresIn": 3600, + }, + }, + ) + + def add_global_routes(self) -> None: + """Add routes for global authentication mode.""" + _add_global_vehicle_routes(self) diff --git a/pysmarthashtag/tests/replys/global_login_result.json b/pysmarthashtag/tests/replys/global_login_result.json new file mode 100644 index 0000000..a290c62 --- /dev/null +++ b/pysmarthashtag/tests/replys/global_login_result.json @@ -0,0 +1,11 @@ +{ + "code": "0000", + "message": "Success", + "data": { + "accessToken": "TestGlobalAccessToken", + "refreshToken": "TestGlobalRefreshToken", + "idToken": "TestGlobalIdToken", + "userId": "global123456", + "expiresIn": 3600 + } +} diff --git a/pysmarthashtag/tests/replys/global_refresh_result.json b/pysmarthashtag/tests/replys/global_refresh_result.json new file mode 100644 index 0000000..1813f7a --- /dev/null +++ b/pysmarthashtag/tests/replys/global_refresh_result.json @@ -0,0 +1,11 @@ +{ + "code": "0000", + "message": "Success", + "data": { + "accessToken": "TestGlobalRefreshedAccessToken", + "refreshToken": "TestGlobalRefreshedRefreshToken", + "idToken": "TestGlobalRefreshedIdToken", + "userId": "global123456", + "expiresIn": 3600 + } +} diff --git a/pysmarthashtag/tests/replys/global_vehicle_abilities.json b/pysmarthashtag/tests/replys/global_vehicle_abilities.json new file mode 100644 index 0000000..80fcd28 --- /dev/null +++ b/pysmarthashtag/tests/replys/global_vehicle_abilities.json @@ -0,0 +1,25 @@ +{ + "code": "0000", + "message": "success", + "result": { + "remoteControl": { + "climate": true, + "lock": true, + "unlock": true, + "horn": true, + "light": true + }, + "chargingControl": { + "startCharging": true, + "stopCharging": true, + "setChargingLimit": true + }, + "vehicleStatus": { + "battery": true, + "climate": true, + "doors": true, + "location": true, + "windows": true + } + } +} diff --git a/pysmarthashtag/tests/replys/global_vehicle_details.json b/pysmarthashtag/tests/replys/global_vehicle_details.json new file mode 100644 index 0000000..083f332 --- /dev/null +++ b/pysmarthashtag/tests/replys/global_vehicle_details.json @@ -0,0 +1,25 @@ +{ + "code": "0000", + "message": "success", + "result": [ + { + "vin": "TestVIN0000000001", + "modelCode": "HX11_EUL_Premium+_RWD_000", + "modelName": "Smart #1", + "seriesName": "HX11", + "seriesCodeVs": "HX11", + "colorCode": "087", + "colorName": "SPECTRUM BLUE", + "factoryCode": "1234", + "engineNo": "ABCDEFABC", + "plateNo": "", + "iccid": "01234567890123456789", + "msisdn": "112233445566770", + "updateTime": 1706199500000, + "vehicleType": 0, + "tboxPlatform": "tsp", + "ihuPlatform": "tsp", + "proprietaryPlatform": 0 + } + ] +} diff --git a/pysmarthashtag/tests/replys/global_vehicle_details2.json b/pysmarthashtag/tests/replys/global_vehicle_details2.json new file mode 100644 index 0000000..7b5ebca --- /dev/null +++ b/pysmarthashtag/tests/replys/global_vehicle_details2.json @@ -0,0 +1,25 @@ +{ + "code": "0000", + "message": "success", + "result": [ + { + "vin": "TestVIN0000000002", + "modelCode": "HY11_EUL_Premium+_RWD_000", + "modelName": "Smart #3", + "seriesName": "HY11", + "seriesCodeVs": "HY11", + "colorCode": "087", + "colorName": "SPECTRUM BLUE", + "factoryCode": "1234", + "engineNo": "ABCDEFABC", + "plateNo": "", + "iccid": "01234567890123456789", + "msisdn": "112233445566770", + "updateTime": 1706199500000, + "vehicleType": 0, + "tboxPlatform": "tsp", + "ihuPlatform": "tsp", + "proprietaryPlatform": 0 + } + ] +} diff --git a/pysmarthashtag/tests/replys/global_vehicle_list.json b/pysmarthashtag/tests/replys/global_vehicle_list.json new file mode 100644 index 0000000..bb4c94e --- /dev/null +++ b/pysmarthashtag/tests/replys/global_vehicle_list.json @@ -0,0 +1,22 @@ +{ + "code": "0000", + "message": "success", + "result": [ + { + "vin": "TestVIN0000000001", + "modelCode": "HX11_EUL_Premium+_RWD_000", + "modelName": "Smart #1", + "seriesName": "HX11", + "colorCode": "087", + "colorName": "SPECTRUM BLUE" + }, + { + "vin": "TestVIN0000000002", + "modelCode": "HY11_EUL_Premium+_RWD_000", + "modelName": "Smart #3", + "seriesName": "HY11", + "colorCode": "087", + "colorName": "SPECTRUM BLUE" + } + ] +} diff --git a/pysmarthashtag/tests/test_authentication.py b/pysmarthashtag/tests/test_authentication.py new file mode 100644 index 0000000..2398f65 --- /dev/null +++ b/pysmarthashtag/tests/test_authentication.py @@ -0,0 +1,279 @@ +"""Test authentication module.""" + +import datetime +import logging + +import pytest +import respx +from httpx import Response + +from pysmarthashtag.api.authentication import SmartAuthentication +from pysmarthashtag.const import ( + GLOBAL_API_BASE_URL, + SmartAuthMode, + SmartRegion, + get_endpoint_urls_for_region, +) +from pysmarthashtag.models import SmartAPIError +from pysmarthashtag.tests import RESPONSE_DIR, TEST_PASSWORD, TEST_USERNAME, load_response + +_LOGGER = logging.getLogger(__name__) + + +@pytest.mark.asyncio +async def test_login_global(): + """Test the Global HMAC login flow.""" + endpoint_urls = get_endpoint_urls_for_region(SmartRegion.GLOBAL) + + with respx.mock: + # Mock the global login endpoint + login_route = respx.post(GLOBAL_API_BASE_URL + "/iam/service/api/v1/login").mock( + return_value=Response(200, json=load_response(RESPONSE_DIR / "global_login_result.json")) + ) + + auth = SmartAuthentication( + username=TEST_USERNAME, + password=TEST_PASSWORD, + endpoint_urls=endpoint_urls, + ) + + # Verify auth_mode is correctly inferred + assert auth.auth_mode == SmartAuthMode.GLOBAL_HMAC + + # Call _login_global directly + result = await auth._login_global() + + # Verify the login endpoint was called + assert login_route.called + + # Verify the result contains all expected fields + assert "access_token" in result + assert "refresh_token" in result + assert "api_access_token" in result + assert "api_refresh_token" in result + assert "api_user_id" in result + assert "id_token" in result + assert "expires_at" in result + + # Verify the token values are correct + assert result["access_token"] == "TestGlobalAccessToken" + assert result["refresh_token"] == "TestGlobalRefreshToken" + assert result["api_access_token"] == "TestGlobalAccessToken" + assert result["api_refresh_token"] == "TestGlobalRefreshToken" + assert result["api_user_id"] == "global123456" + assert result["id_token"] == "TestGlobalIdToken" + + # Verify expires_at is a datetime in the future + assert isinstance(result["expires_at"], datetime.datetime) + assert result["expires_at"] > datetime.datetime.now(datetime.timezone.utc) + + +@pytest.mark.asyncio +async def test_refresh_access_token_global(): + """Test the Global HMAC token refresh flow.""" + endpoint_urls = get_endpoint_urls_for_region(SmartRegion.GLOBAL) + + with respx.mock: + # Mock the global refresh endpoint + refresh_route = respx.post(GLOBAL_API_BASE_URL + "/iam/service/api/v1/refresh/").mock( + return_value=Response(200, json=load_response(RESPONSE_DIR / "global_refresh_result.json")) + ) + + auth = SmartAuthentication( + username=TEST_USERNAME, + password=TEST_PASSWORD, + refresh_token="TestGlobalRefreshToken", + endpoint_urls=endpoint_urls, + ) + + # Verify auth_mode is correctly inferred + assert auth.auth_mode == SmartAuthMode.GLOBAL_HMAC + + # Call _refresh_access_token_global directly + result = await auth._refresh_access_token_global() + + # Verify the refresh endpoint was called + assert refresh_route.called + + # Verify the result contains all expected fields + assert "access_token" in result + assert "refresh_token" in result + assert "api_access_token" in result + assert "api_refresh_token" in result + assert "api_user_id" in result + assert "id_token" in result + assert "expires_at" in result + + # Verify the token values are correct + assert result["access_token"] == "TestGlobalRefreshedAccessToken" + assert result["refresh_token"] == "TestGlobalRefreshedRefreshToken" + assert result["api_access_token"] == "TestGlobalRefreshedAccessToken" + assert result["api_refresh_token"] == "TestGlobalRefreshedRefreshToken" + assert result["api_user_id"] == "global123456" + assert result["id_token"] == "TestGlobalRefreshedIdToken" + + # Verify expires_at is a datetime in the future + assert isinstance(result["expires_at"], datetime.datetime) + assert result["expires_at"] > datetime.datetime.now(datetime.timezone.utc) + + +@pytest.mark.asyncio +async def test_refresh_access_token_global_no_refresh_token(): + """Test the Global HMAC token refresh returns empty dict when no refresh token.""" + endpoint_urls = get_endpoint_urls_for_region(SmartRegion.GLOBAL) + + auth = SmartAuthentication( + username=TEST_USERNAME, + password=TEST_PASSWORD, + endpoint_urls=endpoint_urls, + ) + + # Verify auth_mode is correctly inferred + assert auth.auth_mode == SmartAuthMode.GLOBAL_HMAC + + # Call _refresh_access_token_global directly without refresh token + result = await auth._refresh_access_token_global() + + # Should return empty dict + assert result == {} + + +@pytest.mark.asyncio +async def test_auth_mode_selection_global(): + """Test that auth_mode selection triggers Global HMAC code paths.""" + endpoint_urls = get_endpoint_urls_for_region(SmartRegion.GLOBAL) + + with respx.mock: + # Mock the global login endpoint + login_route = respx.post(GLOBAL_API_BASE_URL + "/iam/service/api/v1/login").mock( + return_value=Response(200, json=load_response(RESPONSE_DIR / "global_login_result.json")) + ) + + auth = SmartAuthentication( + username=TEST_USERNAME, + password=TEST_PASSWORD, + endpoint_urls=endpoint_urls, + ) + + # Verify auth_mode is correctly inferred as GLOBAL_HMAC + assert auth.auth_mode == SmartAuthMode.GLOBAL_HMAC + + # Call the main _login method which should route to _login_global + result = await auth._login() + + # Verify the global login endpoint was called (not EU OAuth) + assert login_route.called + + # Verify the result contains global tokens + assert result["access_token"] == "TestGlobalAccessToken" + assert result["api_user_id"] == "global123456" + + +@pytest.mark.asyncio +async def test_auth_mode_selection_global_refresh(): + """Test that auth_mode selection triggers Global HMAC refresh code path.""" + endpoint_urls = get_endpoint_urls_for_region(SmartRegion.GLOBAL) + + with respx.mock: + # Mock the global refresh endpoint + refresh_route = respx.post(GLOBAL_API_BASE_URL + "/iam/service/api/v1/refresh/").mock( + return_value=Response(200, json=load_response(RESPONSE_DIR / "global_refresh_result.json")) + ) + + auth = SmartAuthentication( + username=TEST_USERNAME, + password=TEST_PASSWORD, + refresh_token="TestGlobalRefreshToken", + endpoint_urls=endpoint_urls, + ) + + # Verify auth_mode is correctly inferred as GLOBAL_HMAC + assert auth.auth_mode == SmartAuthMode.GLOBAL_HMAC + + # Call the main _refresh_access_token method which should route to _refresh_access_token_global + result = await auth._refresh_access_token() + + # Verify the global refresh endpoint was called (not EU OAuth) + assert refresh_route.called + + # Verify the result contains refreshed global tokens + assert result["access_token"] == "TestGlobalRefreshedAccessToken" + assert result["api_user_id"] == "global123456" + + +@pytest.mark.asyncio +async def test_login_global_missing_access_token(): + """Test that _login_global raises error when access token is missing.""" + endpoint_urls = get_endpoint_urls_for_region(SmartRegion.GLOBAL) + + with respx.mock: + # Mock the global login endpoint with incomplete response + respx.post(GLOBAL_API_BASE_URL + "/iam/service/api/v1/login").mock( + return_value=Response( + 200, json={"code": "0000", "message": "Success", "data": {"userId": "global123456", "expiresIn": 3600}} + ) + ) + + auth = SmartAuthentication( + username=TEST_USERNAME, + password=TEST_PASSWORD, + endpoint_urls=endpoint_urls, + ) + + # Should raise SmartAPIError when access_token is missing + with pytest.raises(SmartAPIError) as exc_info: + await auth._login_global() + + assert "Could not get access token from global login" in str(exc_info.value) + + +@pytest.mark.asyncio +async def test_login_global_missing_user_id(): + """Test that _login_global raises error when user ID is missing.""" + endpoint_urls = get_endpoint_urls_for_region(SmartRegion.GLOBAL) + + with respx.mock: + # Mock the global login endpoint with incomplete response + respx.post(GLOBAL_API_BASE_URL + "/iam/service/api/v1/login").mock( + return_value=Response( + 200, + json={"code": "0000", "message": "Success", "data": {"accessToken": "TestToken", "expiresIn": 3600}}, + ) + ) + + auth = SmartAuthentication( + username=TEST_USERNAME, + password=TEST_PASSWORD, + endpoint_urls=endpoint_urls, + ) + + # Should raise SmartAPIError when userId is missing + with pytest.raises(SmartAPIError) as exc_info: + await auth._login_global() + + assert "Could not get access token from global login" in str(exc_info.value) + + +@pytest.mark.asyncio +async def test_login_global_no_data_field(): + """Test that _login_global raises error when data field is missing.""" + endpoint_urls = get_endpoint_urls_for_region(SmartRegion.GLOBAL) + + with respx.mock: + # Mock the global login endpoint with no data field + respx.post(GLOBAL_API_BASE_URL + "/iam/service/api/v1/login").mock( + return_value=Response(200, json={"code": "1001", "message": "Invalid credentials"}) + ) + + auth = SmartAuthentication( + username=TEST_USERNAME, + password=TEST_PASSWORD, + endpoint_urls=endpoint_urls, + ) + + # Should raise SmartAPIError when data is missing + with pytest.raises(SmartAPIError) as exc_info: + await auth._login_global() + + assert "Could not get tokens from global login" in str(exc_info.value) + assert "Invalid credentials" in str(exc_info.value) diff --git a/pysmarthashtag/tests/test_endpoint_urls.py b/pysmarthashtag/tests/test_endpoint_urls.py index f3bf8a9..b712fce 100644 --- a/pysmarthashtag/tests/test_endpoint_urls.py +++ b/pysmarthashtag/tests/test_endpoint_urls.py @@ -9,6 +9,7 @@ API_BASE_URL, API_KEY, AUTH_URL, + GLOBAL_API_BASE_URL, LOGIN_URL, OTA_SERVER_URL, SERVER_URL, @@ -202,6 +203,7 @@ def test_region_enum_values(self): """Test that SmartRegion enum has expected values.""" assert SmartRegion.EU.value == "eu" assert SmartRegion.INTL.value == "intl" + assert SmartRegion.GLOBAL.value == "global" def test_account_with_eu_region(self): """Test creating SmartAccount with EU region preset.""" @@ -233,3 +235,9 @@ def test_authentication_with_intl_region(self): endpoint_urls=urls, ) assert auth.endpoint_urls.get_api_base_url() == "https://api.ecloudap.com" + + def test_region_global_returns_global_endpoints(self): + """Test that GLOBAL region returns sg-app-api endpoints.""" + urls = get_endpoint_urls_for_region(SmartRegion.GLOBAL) + assert urls.get_api_base_url() == GLOBAL_API_BASE_URL + assert urls.get_api_base_url_v2() == GLOBAL_API_BASE_URL diff --git a/pysmarthashtag/tests/test_global_auth.py b/pysmarthashtag/tests/test_global_auth.py new file mode 100644 index 0000000..da64daf --- /dev/null +++ b/pysmarthashtag/tests/test_global_auth.py @@ -0,0 +1,126 @@ +"""Tests for Smart Global authentication mode.""" + +import logging + +import pytest +import respx + +from pysmarthashtag.account import SmartAccount +from pysmarthashtag.const import GLOBAL_API_BASE_URL, SmartRegion, get_endpoint_urls_for_region +from pysmarthashtag.tests import TEST_PASSWORD, TEST_USERNAME +from pysmarthashtag.tests.common import SmartGlobalMockRouter + +_LOGGER = logging.getLogger(__name__) + + +@pytest.fixture +def smart_global_fixture(request: pytest.FixtureRequest): + """Patch Smart Global API calls.""" + router = SmartGlobalMockRouter() + + with router: + yield router + + +async def create_global_account_with_vehicles(): + """Create and initialize account with global endpoints and get vehicles.""" + endpoint_urls = get_endpoint_urls_for_region(SmartRegion.GLOBAL) + account = SmartAccount(TEST_USERNAME, TEST_PASSWORD, endpoint_urls=endpoint_urls) + await account.get_vehicles() + return account + + +@pytest.mark.asyncio +async def test_global_login(smart_global_fixture: respx.Router): + """Test the login flow with global authentication.""" + endpoint_urls = get_endpoint_urls_for_region(SmartRegion.GLOBAL) + account = SmartAccount(TEST_USERNAME, TEST_PASSWORD, endpoint_urls=endpoint_urls) + await account.login() + assert account.config.authentication.access_token is not None + + +@pytest.mark.asyncio +async def test_init_vehicles_global(smart_global_fixture: respx.Router): + """Test _init_vehicles_global() correctly parses vehicles from global API.""" + account = await create_global_account_with_vehicles() + + # Verify that vehicles were initialized + assert account is not None + assert account.vehicles is not None + assert len(account.vehicles) == 2 + + # Verify vehicle VINs + assert "TestVIN0000000001" in account.vehicles + assert "TestVIN0000000002" in account.vehicles + + # Verify vehicle data + vehicle1 = account.vehicles["TestVIN0000000001"] + assert vehicle1.vin == "TestVIN0000000001" + assert vehicle1.data.get("modelCode") == "HX11_EUL_Premium+_RWD_000" + assert vehicle1.data.get("modelName") == "Smart #1" + + vehicle2 = account.vehicles["TestVIN0000000002"] + assert vehicle2.vin == "TestVIN0000000002" + assert vehicle2.data.get("modelCode") == "HY11_EUL_Premium+_RWD_000" + assert vehicle2.data.get("modelName") == "Smart #3" + + +@pytest.mark.asyncio +async def test_update_global_vehicle_details(smart_global_fixture: respx.Router): + """Test _update_global_vehicle_details() correctly populates combine_data and abilities.""" + account = await create_global_account_with_vehicles() + + # Verify that vehicles have detailed data from global API + vehicle1 = account.vehicles["TestVIN0000000001"] + + # Check that combine_data was called with details from global API + # The global_vehicle_details.json contains additional vehicle information + assert vehicle1.data is not None + assert vehicle1.data.get("vin") == "TestVIN0000000001" + + # Verify that abilities were populated from global API + assert "abilities" in vehicle1.data + abilities = vehicle1.data["abilities"] + assert abilities is not None + + # Verify expected abilities structure + assert "remoteControl" in abilities + assert "chargingControl" in abilities + assert "vehicleStatus" in abilities + + # Verify specific abilities + assert abilities["remoteControl"]["climate"] is True + assert abilities["chargingControl"]["startCharging"] is True + assert abilities["vehicleStatus"]["battery"] is True + + +@pytest.mark.asyncio +async def test_global_with_endpoint_urls(smart_global_fixture: respx.Router): + """Test global authentication using EndpointUrls directly.""" + from pysmarthashtag.const import EndpointUrls + + endpoint_urls = EndpointUrls( + api_base_url=GLOBAL_API_BASE_URL, + api_base_url_v2=GLOBAL_API_BASE_URL, + ) + account = SmartAccount(TEST_USERNAME, TEST_PASSWORD, endpoint_urls=endpoint_urls) + await account.get_vehicles() + + # Verify vehicles were loaded + assert len(account.vehicles) == 2 + assert "TestVIN0000000001" in account.vehicles + assert "TestVIN0000000002" in account.vehicles + + +@pytest.mark.asyncio +async def test_global_auth_mode_detection(smart_global_fixture: respx.Router): + """Test that global auth mode is correctly detected.""" + endpoint_urls = get_endpoint_urls_for_region(SmartRegion.GLOBAL) + account = SmartAccount(TEST_USERNAME, TEST_PASSWORD, endpoint_urls=endpoint_urls) + await account.login() + + # Verify that the account uses global HMAC authentication + from pysmarthashtag.const import SmartAuthMode + + assert account.config.authentication.auth_mode == SmartAuthMode.GLOBAL_HMAC + assert account._is_global_auth() is True diff --git a/pysmarthashtag/vehicle/vehicle.py b/pysmarthashtag/vehicle/vehicle.py index f632a01..178afd7 100644 --- a/pysmarthashtag/vehicle/vehicle.py +++ b/pysmarthashtag/vehicle/vehicle.py @@ -4,7 +4,7 @@ import logging from typing import Optional -from pysmarthashtag.const import API_BASE_URL, API_BASE_URL_V2 +from pysmarthashtag.const import API_BASE_URL, API_BASE_URL_V2, SmartAuthMode from pysmarthashtag.models import ValueWithUnit, get_element_from_dict_maybe from pysmarthashtag.vehicle.battery import Battery from pysmarthashtag.vehicle.climate import Climate @@ -74,21 +74,38 @@ def __init__( charging_settings: Optional[dict] = None, fetched_at: Optional[datetime.datetime] = None, ) -> None: - """Initialize the vehicle.""" + """ + Create a SmartVehicle instance by storing the account and merging provided vehicle data, then derive series code and select the appropriate API base URL. + + Merges vehicle_base, vehicle_state, and charging_settings into the instance data (optionally recording fetched_at), ensures a `seriesCodeVs` value is present when derivable, and chooses the API base URL according to the account authentication mode and the vehicle series code. Logs vehicle initialization. + + Parameters: + account (SmartAccount): Account that owns the vehicle and provides configuration and endpoint URLs. + vehicle_base (dict): Base vehicle information retrieved from the primary API. + vehicle_state (Optional[dict]): Optional dynamic state payload to merge into the base data. + charging_settings (Optional[dict]): Optional charging-related settings to merge into the base data. + fetched_at (Optional[datetime.datetime]): Optional timestamp indicating when the provided data was fetched. + """ self.account = account self.data = {} self.combine_data(vehicle_base, vehicle_state, charging_settings, None, fetched_at) - if self.data["seriesCodeVs"].startswith("HX"): + series_code = self.data.get("seriesCodeVs") or self.data.get("modelCode") or "" + if "seriesCodeVs" not in self.data and series_code: + self.data["seriesCodeVs"] = series_code + + if getattr(self.account.config.authentication, "auth_mode", None) == SmartAuthMode.GLOBAL_HMAC: + self.base_url = self.account.endpoint_urls.get_api_base_url() + elif series_code.startswith("HX"): _LOGGER.debug("Selected Vehicle is Smart #1 use V1 API") self.base_url = API_BASE_URL - elif self.data["seriesCodeVs"].startswith("HC"): + elif series_code.startswith("HC"): _LOGGER.debug("Selected Vehicle is Smart #3 use V1 API") self.base_url = API_BASE_URL - elif self.data["seriesCodeVs"].startswith("HY"): + elif series_code.startswith("HY"): _LOGGER.debug("Selected Vehicle is Smart #5 use V2 API") self.base_url = API_BASE_URL_V2 else: - _LOGGER.warning("Unknown Series Code Prefix %s use default API", self.data["seriesCodeVs"]) + _LOGGER.warning("Unknown Series Code Prefix %s use default API", series_code) _LOGGER.debug( "Initialized vehicle %s (%s)", self.name, @@ -155,4 +172,4 @@ def _parse_data(self) -> None: self.engine_state = get_element_from_dict_maybe( self.data, "vehicleStatus", "basicVehicleStatus", "engineStatus" - ) + ) \ No newline at end of file