Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
474cabe
feat: implement global authentication mode and enhance vehicle initia…
DasBasti Feb 4, 2026
d44662f
Potential fix for code scanning alert no. 3: Incomplete URL substring…
DasBasti Feb 4, 2026
20dc0eb
📝 Add docstrings to `retry-international-auth`
coderabbitai[bot] Feb 4, 2026
676fb22
Update pysmarthashtag/api/authentication.py
DasBasti Feb 4, 2026
63e1d74
Update pysmarthashtag/account.py
DasBasti Feb 4, 2026
e84fa4c
Update pyproject.toml
DasBasti Feb 4, 2026
02df818
Initial plan
Copilot Feb 4, 2026
37a6d3f
Initial plan
Copilot Feb 4, 2026
2d6ffa8
Merge branch 'retry-international-auth' into coderabbitai/docstrings/…
DasBasti Feb 4, 2026
a77ee92
Merge pull request #162 from DasBasti/coderabbitai/docstrings/d44662f
DasBasti Feb 4, 2026
fc6bb02
Add unit tests for Global HMAC authentication methods
Copilot Feb 4, 2026
6580806
Fix linting issues in authentication tests
Copilot Feb 4, 2026
9576f8a
Add global authentication mode tests
Copilot Feb 4, 2026
99f796b
Address code review feedback
Copilot Feb 4, 2026
52018f1
Fix linting and formatting issues
Copilot Feb 4, 2026
1f6d6fd
Refactor to eliminate code duplication
Copilot Feb 4, 2026
1f11a27
Update pysmarthashtag/tests/test_authentication.py
DasBasti Feb 4, 2026
c4b8fd9
Use SmartAPIError and remove unnecessary API session mocks
Copilot Feb 4, 2026
621838d
Merge pull request #164 from DasBasti/copilot/sub-pr-161-again
DasBasti Feb 4, 2026
1a4d72c
Merge branch 'retry-international-auth' into copilot/sub-pr-161
DasBasti Feb 4, 2026
b3df57a
Merge pull request #163 from DasBasti/copilot/sub-pr-161
DasBasti Feb 4, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ enable_error_code = "ignore-without-code"
target-version = "py39"
line-length = 120

select = [
lint.select = [
"C", # complexity
"D", # docstrings
"E", # pycodestyle
Expand All @@ -62,7 +62,7 @@ select = [
"PGH004", # Use specific rule codes when using noqa
]

ignore = [
lint.ignore = [
"D202", # No blank lines allowed after function docstring
"D203", # 1 blank line required before class docstring
"D212", # Multi-line docstring summary should start at the first line
Expand All @@ -75,9 +75,9 @@ ignore = [
exclude = [
]

[tool.ruff.per-file-ignores]
[tool.ruff.lint.per-file-ignores]
"docs/source/conf.py" = ["D100"]
"pysamrthashtag/api/authentication.py" = ["D102", "D107"]
"pysmarthashtag/api/authentication.py" = ["D102", "D107"]

Comment thread
coderabbitai[bot] marked this conversation as resolved.
[tool.ruff.mccabe]
[tool.ruff.lint.mccabe]
max-complexity = 25
253 changes: 236 additions & 17 deletions pysmarthashtag/account.py

Large diffs are not rendered by default.

358 changes: 313 additions & 45 deletions pysmarthashtag/api/authentication.py

Large diffs are not rendered by default.

192 changes: 191 additions & 1 deletion pysmarthashtag/api/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@
import logging
import secrets
import time
import uuid
from email.utils import formatdate
from typing import Optional

_LOGGER = logging.getLogger(__name__)

Expand Down Expand Up @@ -70,5 +73,192 @@ def generate_default_header(


def create_correct_timestamp() -> str:
"""Create a correct timestamp for the request."""
"""
Generate a timestamp string representing the current time in milliseconds since the Unix epoch.

Returns:
timestamp (str): Current time in milliseconds since 1970-01-01 UTC, formatted as a decimal string.
"""
return str(int(time.time() * 1000))


def _ensure_bytes(body: Optional[object]) -> Optional[bytes]:
"""
Normalize a request body to a UTF-8 bytes object when present.

Parameters:
body (Optional[object]): The value to normalize. If `None`, no conversion is performed.

Returns:
Optional[bytes]: `None` if input is `None`; the input unchanged if already `bytes`; otherwise the UTF-8 encoding of `str(body)`.
"""
if body is None:
return None
if isinstance(body, bytes):
return body
return str(body).encode("utf-8")
Comment on lines +85 to +99

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

find . -name "utils.py" -path "*/api/*" | head -5

Repository: DasBasti/pySmartHashtag

Length of output: 96


🏁 Script executed:

cat -n pysmarthashtag/api/utils.py

Repository: DasBasti/pySmartHashtag

Length of output: 7413


🏁 Script executed:

rg "_ensure_bytes" --context 10

Repository: DasBasti/pySmartHashtag

Length of output: 2399


🏁 Script executed:

rg -i "hmac|signature|digest" -A 3 -B 3

Repository: DasBasti/pySmartHashtag

Length of output: 12603


🏁 Script executed:

rg "generate_global_header" -B 3 -A 3

Repository: DasBasti/pySmartHashtag

Length of output: 3006


🏁 Script executed:

rg -i "json\.dumps\|json\.loads" -B 2 -A 2

Repository: DasBasti/pySmartHashtag

Length of output: 49


🏁 Script executed:

fd "\.py$" -x grep -l "generate_global_header" {} \; | head -10

Repository: DasBasti/pySmartHashtag

Length of output: 163


🏁 Script executed:

cat -n pysmarthashtag/api/utils.py | sed -n '1,20p'

Repository: DasBasti/pySmartHashtag

Length of output: 706


🏁 Script executed:

rg "body_bytes" -B 5 -A 5

Repository: DasBasti/pySmartHashtag

Length of output: 905


🏁 Script executed:

rg "import json"

Repository: DasBasti/pySmartHashtag

Length of output: 291


🏁 Script executed:

fd "test.*\.py$" -x grep -l "_ensure_bytes\|generate_global_header" {} \;

Repository: DasBasti/pySmartHashtag

Length of output: 49


🏁 Script executed:

find . -name "test_*.py" -o -name "*_test.py" | xargs grep -l "ensure_bytes\|generate_global_header" 2>/dev/null || echo "No tests found"

Repository: DasBasti/pySmartHashtag

Length of output: 81


🏁 Script executed:

cat -n pysmarthashtag/api/utils.py | sed -n '147,200p'

Repository: DasBasti/pySmartHashtag

Length of output: 1963


Handle different body types explicitly in _ensure_bytes to prevent HMAC digest mismatches.

The current implementation accepts Optional[object] but uses str(body) to serialize non-bytes inputs. If callers pass dict/list objects directly (rather than pre-serialized JSON strings), str() produces Python repr syntax—not JSON—causing the MD5 digest in signature generation to mismatch the actual serialized request body. This breaks HMAC authentication silently.

All current callers pre-serialize using json.dumps(), but the permissive type hint allows misuse. Explicitly handle each type: strings encode directly, dict/list use json.dumps() with consistent separators, and unsupported types raise TypeError.

Suggested fix
+import json
 
 def _ensure_bytes(body: Optional[object]) -> Optional[bytes]:
     if body is None:
         return None
     if isinstance(body, bytes):
         return body
-    return str(body).encode("utf-8")
+    if isinstance(body, str):
+        return body.encode("utf-8")
+    if isinstance(body, (dict, list)):
+        return json.dumps(body, separators=(",", ":"), ensure_ascii=False).encode("utf-8")
+    raise TypeError(f"Unsupported body type for signing: {type(body)!r}")
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
def _ensure_bytes(body: Optional[object]) -> Optional[bytes]:
if body is None:
return None
if isinstance(body, bytes):
return body
return str(body).encode("utf-8")
import json
def _ensure_bytes(body: Optional[object]) -> Optional[bytes]:
if body is None:
return None
if isinstance(body, bytes):
return body
if isinstance(body, str):
return body.encode("utf-8")
if isinstance(body, (dict, list)):
return json.dumps(body, separators=(",", ":"), ensure_ascii=False).encode("utf-8")
raise TypeError(f"Unsupported body type for signing: {type(body)!r}")
🤖 Prompt for AI Agents
In `@pysmarthashtag/api/utils.py` around lines 80 - 85, The _ensure_bytes function
currently casts arbitrary objects via str(), causing non-JSON reprs and HMAC
mismatches; update _ensure_bytes to explicitly handle bytes (return as-is), str
(encode utf-8), and dict/list (serialize with json.dumps(..., separators=(",",
":")) and encode utf-8), raise a TypeError for any other types, and narrow the
type hint accordingly so callers cannot accidentally pass unsupported objects
(refer to _ensure_bytes function name to locate the change).



def _global_md5_base64(body: bytes) -> str:
"""
Return the first 24 characters of the base64-encoded MD5 digest of `body`.

Parameters:
body (bytes): Input bytes to hash.

Returns:
str: First 24 characters of the base64-encoded MD5 digest.
"""
md5_hash = hashlib.md5(body).digest()
Comment thread Dismissed
return base64.b64encode(md5_hash).decode("utf-8")[:24]


def _build_global_string_to_sign(
method: str,
path: str,
headers: dict[str, str],
content_md5: str = "",
) -> str:
"""
Construct the canonical string used to compute the HMAC-SHA256 signature for a Global API request.

The resulting newline-separated string contains, in order: HTTP method, Accept header, the provided content MD5 value, Content-Type header, Date header, all `x-ca-*` headers (each as `key:value` on its own line), and the request path. This canonical string is intended to be the message passed to the signing HMAC.

Parameters:
method (str): HTTP method (e.g., "GET", "POST").
path (str): Request path, including query string if applicable.
headers (dict[str, str]): Request headers; values for "accept", "content-type", "date", and any `x-ca-*` headers are used.
content_md5 (str): Base64-encoded MD5 of the request body when present, or an empty string if absent.

Returns:
str: The canonical string to sign with HMAC-SHA256.
"""
string_to_sign = [
method,
headers.get("accept", ""),
content_md5,
headers.get("content-type", ""),
headers.get("date", ""),
]

ca_headers = []
ca_header_names = []
for key in sorted(headers.keys()):
if key.startswith("x-ca-"):
ca_headers.append(f"{key}:{headers[key]}")
ca_header_names.append(key)

if ca_header_names:
headers["x-ca-signature-headers"] = ",".join(ca_header_names)

string_to_sign.append("\n".join(ca_headers))
string_to_sign.append(path)

return "\n".join(string_to_sign)


def _generate_global_signature(
app_secret: str,
method: str,
path: str,
headers: dict[str, str],
body: Optional[bytes] = None,
) -> str:
"""
Create the HMAC-SHA256 signature used for Global API requests.

If a request body is provided, its MD5 (base64, truncated to 24 chars) is computed and inserted into headers["content-md5"] before signing. The function builds the canonical string-to-sign from method, path, and headers, then returns the base64-encoded HMAC-SHA256 of that string using app_secret as the key.

Parameters:
headers (dict[str, str]): Request headers; this dict will be mutated to include "content-md5" when a body is provided.

Returns:
str: The base64-encoded HMAC-SHA256 signature.
"""
content_md5 = ""
if body is not None:
content_md5 = _global_md5_base64(body)
headers["content-md5"] = content_md5

string_to_sign = _build_global_string_to_sign(method, path, headers, content_md5)
signature = hmac.new(
app_secret.encode("utf-8"),
string_to_sign.encode("utf-8"),
hashlib.sha256,
).digest()
return base64.b64encode(signature).decode("utf-8")


def generate_global_header(
method: str,
path: str,
host: str,
app_key: str,
app_secret: str,
body: Optional[object] = None,
content_type: str = "application/json",
access_token: Optional[str] = None,
user_id: Optional[str] = None,
id_token: Optional[str] = None,
extra_headers: Optional[dict[str, str]] = None,
) -> dict[str, str]:
"""
Builds HTTP headers for a Global API request and signs them with HMAC-SHA256.

Assembles standard headers (date, content-type, host, user-agent, x-ca-timestamp, x-ca-nonce, x-ca-key, etc.), conditionally includes Authorization/x-smart-id/Xs-Auth-Token when provided, merges any extra_headers, and computes the `x-ca-signature` header using the provided `app_secret`.

Parameters:
method (str): HTTP method (e.g., "GET", "POST") used when computing the signature.
path (str): Request path (URI) used in the signature calculation.
host (str): Host header value for the request.
app_key (str): Application key inserted as `x-ca-key`.
app_secret (str): Secret used to compute the HMAC-SHA256 signature.
body (Optional[object]): Request body; if provided it will be converted to bytes and included in the signature computation.
content_type (str): Value for `content-type` and `accept` headers. Defaults to "application/json".
access_token (Optional[str]): If provided, added as `Authorization: Bearer <token>`.
user_id (Optional[str]): If provided, added as `x-smart-id`.
id_token (Optional[str]): If provided, added as `Xs-Auth-Token` (and `Xs-App-Ver` is set).
extra_headers (Optional[dict[str, str]]): Additional headers to merge into the final header set.

Returns:
dict[str, str]: A dictionary of HTTP headers ready to attach to the request, including the computed `x-ca-signature`.
"""
timestamp = create_correct_timestamp()
nonce = str(uuid.uuid4())
http_date = formatdate(timeval=None, localtime=False, usegmt=True)

headers = {
"date": http_date,
"x-ca-timestamp": timestamp,
"x-ca-nonce": nonce,
"x-ca-key": app_key,
"x-ca-signature-method": "HmacSHA256",
"CA_VERSION": "1",
"content-type": content_type,
"accept": content_type,
"host": host,
"user-agent": "ALIYUN-ANDROID-DEMO",
}

if access_token:
headers["Authorization"] = f"Bearer {access_token}"
if user_id:
headers["x-smart-id"] = user_id
if id_token:
headers["Xs-Auth-Token"] = id_token
headers["Xs-App-Ver"] = "1.0.8"

if extra_headers:
headers.update(extra_headers)

body_bytes = _ensure_bytes(body)
headers["x-ca-signature"] = _generate_global_signature(
app_secret,
method,
path,
headers,
body_bytes,
)

_LOGGER.debug("Constructed global request header for %s %s", method, path)
return headers
39 changes: 27 additions & 12 deletions pysmarthashtag/cli.py
100755 → 100644
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,14 @@ def environ_or_required(key):


def main_parser() -> argparse.ArgumentParser:
"""Create argument parser."""
"""
Create and return the CLI ArgumentParser and configure module logging from SMART_LOG_LEVEL.

The parser is configured with subcommands: `status`, `info`, `watch` (with `-i` interval), `climate` (with `--vin`, `--temp`, `--active`), and `seatheating` (with `--vin`, `--level`, `--temp`, `--active`). Default authentication/region arguments are added via the module helper and the parser's default `func` is set to the command dispatcher.

Returns:
argparse.ArgumentParser: A fully configured ArgumentParser for the CLI.
"""

logging_config = {
"version": 1,
Expand Down Expand Up @@ -56,6 +63,8 @@ def main_parser() -> argparse.ArgumentParser:
},
}

log_level = os.environ.get("SMART_LOG_LEVEL", "INFO").upper()
logging_config["loggers"]["pysmarthashtag"]["level"] = log_level
Comment on lines +66 to +67

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

# First, let's examine the cli.py file around lines 59-60
wc -l pysmarthashtag/cli.py

Repository: DasBasti/pySmartHashtag

Length of output: 92


🏁 Script executed:

# Read the file with context around lines 59-60
head -100 pysmarthashtag/cli.py | tail -50

Repository: DasBasti/pySmartHashtag

Length of output: 2057


🏁 Script executed:

# Search for dictConfig usage in the file
rg "dictConfig" -A 3 -B 3 pysmarthashtag/

Repository: DasBasti/pySmartHashtag

Length of output: 525


🏁 Script executed:

# Look for any existing log level validation
rg -i "log.*level|SMART_LOG_LEVEL" -B 2 -A 2 pysmarthashtag/

Repository: DasBasti/pySmartHashtag

Length of output: 6094


🏁 Script executed:

# Check Python logging behavior with invalid log levels
python3 << 'EOF'
import logging
import logging.config

# Test 1: Valid log level
try:
    config = {
        "version": 1,
        "loggers": {
            "test": {
                "level": "DEBUG"
            }
        }
    }
    logging.config.dictConfig(config)
    print("✓ Valid level (DEBUG) accepted")
except Exception as e:
    print(f"✗ Valid level rejected: {e}")

# Test 2: Invalid log level string
try:
    config = {
        "version": 1,
        "loggers": {
            "test": {
                "level": "INVALID_LEVEL"
            }
        }
    }
    logging.config.dictConfig(config)
    print("✓ Invalid level (INVALID_LEVEL) accepted (silently)")
except Exception as e:
    print(f"✗ Invalid level rejected: {type(e).__name__}: {e}")

# Test 3: Empty string
try:
    config = {
        "version": 1,
        "loggers": {
            "test": {
                "level": ""
            }
        }
    }
    logging.config.dictConfig(config)
    print("✓ Empty string accepted")
except Exception as e:
    print(f"✗ Empty string rejected: {type(e).__name__}: {e}")

# Test 4: Numeric level string
try:
    config = {
        "version": 1,
        "loggers": {
            "test": {
                "level": "20"
            }
        }
    }
    logging.config.dictConfig(config)
    print("✓ Numeric string (20) accepted")
except Exception as e:
    print(f"✗ Numeric string rejected: {type(e).__name__}: {e}")
EOF

Repository: DasBasti/pySmartHashtag

Length of output: 313


🏁 Script executed:

# Check if there's any try-catch around dictConfig in the actual code
rg -B 10 "dictConfig" pysmarthashtag/cli.py | head -30

Repository: DasBasti/pySmartHashtag

Length of output: 393


🏁 Script executed:

# Get the complete logging_config setup to understand the full context
rg -B 50 "log_level = os.environ.get" pysmarthashtag/cli.py | head -80

Repository: DasBasti/pySmartHashtag

Length of output: 1680


Validate SMART_LOG_LEVEL to prevent startup failure on invalid values.

An invalid environment value will raise ValueError when dictConfig is called. Without validation, users setting invalid log levels (e.g., SMART_LOG_LEVEL=INVALID) will crash the application at startup.

🛠️ Suggested fix
    log_level = os.environ.get("SMART_LOG_LEVEL", "INFO").upper()
+   if log_level not in {"CRITICAL", "ERROR", "WARNING", "INFO", "DEBUG", "NOTSET"}:
+       log_level = "INFO"
    logging_config["loggers"]["pysmarthashtag"]["level"] = log_level
    logging.config.dictConfig(logging_config)
🤖 Prompt for AI Agents
In `@pysmarthashtag/cli.py` around lines 59 - 60, Validate the SMART_LOG_LEVEL
value before assigning it into logging_config to avoid ValueError during
dictConfig: read and upper() the env var as you already do, check it against the
set of known logging level names (e.g., via logging._nameToLevel or by using
logging.getLevelName and confirming it maps to a numeric level), and if it's
invalid fall back to "INFO" (and emit a warning via the pysmarthashtag logger or
stderr) before setting logging_config["loggers"]["pysmarthashtag"]["level"] =
log_level; ensure this validation logic is placed where log_level is computed so
dictConfig never receives an invalid level.

logging.config.dictConfig(logging_config)

parser = argparse.ArgumentParser(description="Smart API demo")
Expand Down Expand Up @@ -165,26 +174,32 @@ async def set_seatheating(args) -> None:


def _add_default_args(parser: argparse.ArgumentParser):
"""Add the default arguments username, password to the parser."""
"""
Add standard CLI options for Smart account credentials and API region.

Adds the following arguments to the provided ArgumentParser:
- `--username` and `--password`: use values from `SMART_USERNAME` / `SMART_PASSWORD` when present; marked required if the corresponding environment variable is not set.
- `--region`: selects the Smart API region with choices `eu`, `intl`, `global`; defaults to the `SMART_REGION` environment variable or `"eu"` when not set.
"""
parser.add_argument("--username", help="Smart username", **environ_or_required("SMART_USERNAME"))
parser.add_argument("--password", help="Smart password", **environ_or_required("SMART_PASSWORD"))
parser.add_argument(
"--region",
help="Region for Smart API (eu=Europe, intl=International/Australia/Asia-Pacific)",
choices=["eu", "intl"],
help="Region for Smart API (eu=Europe, intl=Asia-Pacific, global=Australia/Israel)",
choices=["eu", "intl", "global"],
default=os.environ.get("SMART_REGION", "eu"),
)


def _get_endpoint_urls_from_args(args) -> EndpointUrls:
"""Get EndpointUrls based on region argument.

Args:
args: Parsed command line arguments containing the region.

"""
Return endpoint URLs for the SmartRegion specified in args.region.

Parameters:
args: Parsed command-line arguments with a `region` attribute ('eu', 'intl', or 'global').

Returns:
EndpointUrls configured for the specified region.

EndpointUrls: Endpoint URLs configured for the specified region.
"""
region = SmartRegion(args.region)
return get_endpoint_urls_for_region(region)
Expand All @@ -199,4 +214,4 @@ def main():


if __name__ == "__main__":
main()
main()
Loading