From 4a05b76ab1af1285ca62a2066971aed75ebd1d11 Mon Sep 17 00:00:00 2001 From: Christoph Langer Date: Sat, 28 Mar 2026 20:39:18 +0100 Subject: [PATCH] Improve TR log in handling --- pyproject.toml | 1 - pytr/account.py | 85 +++++--------- pytr/api.py | 250 +++++++++++++---------------------------- pytr/awswaf/webgl.json | 4 - pytr/main.py | 8 -- 5 files changed, 111 insertions(+), 237 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index b4a7057..14ec22b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -27,7 +27,6 @@ dependencies = [ "coloredlogs", "cryptography", "curl_cffi", - "ecdsa", "packaging", "pathvalidate", "pygments", diff --git a/pytr/account.py b/pytr/account.py index abbf0bb..4911f4b 100644 --- a/pytr/account.py +++ b/pytr/account.py @@ -5,8 +5,8 @@ from pygments import formatters, highlight, lexers -from pytr.api import CREDENTIALS_FILE, TradeRepublicApi -from pytr.utils import get_logger +from .api import BASE_DIR, CREDENTIALS_FILE, TradeRepublicApi +from .utils import get_logger def get_settings(tr): @@ -18,9 +18,8 @@ def get_settings(tr): return formatted_json -def login(phone_no=None, pin=None, web=True, store_credentials=False, waf_token=None): +def login(phone_no=None, pin=None, store_credentials=False, waf_token=None): """ - If web is true, use web login method, else simulate app login. Handle credentials parameters and store to credentials file if requested. If no parameters are set but are needed then ask for input """ @@ -28,22 +27,18 @@ def login(phone_no=None, pin=None, web=True, store_credentials=False, waf_token= save_cookies = True if phone_no is None and CREDENTIALS_FILE.is_file(): - log.info("Found credentials file") with open(CREDENTIALS_FILE) as f: lines = f.readlines() phone_no = lines[0].strip() pin = lines[1].strip() phone_no_masked = phone_no[:-8] + "********" pin_masked = len(pin) * "*" - log.info(f"Phone: {phone_no_masked}, PIN: {pin_masked}") + log.info(f"Using credentials from file {CREDENTIALS_FILE}. Phone: {phone_no_masked}, PIN: {pin_masked}") else: - CREDENTIALS_FILE.parent.mkdir(parents=True, exist_ok=True) + BASE_DIR.mkdir(parents=True, exist_ok=True) if phone_no is None: - log.info("Credentials file not found") print("Please enter your TradeRepublic phone number in the format +4912345678:") phone_no = input() - else: - log.info("Phone number provided as argument") if pin is None: print("Please enter your TradeRepublic pin:") @@ -53,56 +48,36 @@ def login(phone_no=None, pin=None, web=True, store_credentials=False, waf_token= with open(CREDENTIALS_FILE, "w") as f: f.writelines([phone_no + "\n", pin + "\n"]) - log.info(f"Saved credentials in {CREDENTIALS_FILE}") + log.info(f"Storing credentials/cookies in {BASE_DIR}") else: save_cookies = False tr = TradeRepublicApi(phone_no=phone_no, pin=pin, save_cookies=save_cookies, waf_token=waf_token) - if web: - # Use same login as app.traderepublic.com - if tr.resume_websession(): - log.info("Web session resumed") - else: - try: - countdown = tr.initiate_weblogin() - except ValueError as e: - log.fatal(str(e)) - sys.exit(1) - request_time = time.time() - print("Enter the code you received to your mobile app as a notification.") - print(f"Enter nothing if you want to receive the (same) code as SMS. (Countdown: {countdown})") - code = input("Code: ") - if code == "": - countdown = countdown - (time.time() - request_time) - for remaining in range(int(countdown)): - print( - f"Need to wait {int(countdown - remaining)} seconds before requesting SMS...", - end="\r", - ) - time.sleep(1) - print() - tr.resend_weblogin() - code = input("SMS requested. Enter the confirmation code:") - tr.complete_weblogin(code) - else: - # Try to login. Ask for device reset if needed + # Use same login as app.traderepublic.com + if not tr.resume_websession(): try: - tr.login() - except (KeyError, AttributeError): - # old keyfile or no keyfile - print("Error logging in. Reset device? (y)") - confirmation = input() - if confirmation == "y": - tr.initiate_device_reset() - print("You should have received a SMS with a token. Please type it in:") - token = input() - tr.complete_device_reset(token) - print("Reset done") - else: - print("Cancelling reset") - sys.exit(1) + countdown = tr.initiate_weblogin() + except ValueError as e: + log.fatal(str(e)) + sys.exit(1) + request_time = time.time() + print("Enter the code you received to your mobile app as a notification.") + print(f"Enter nothing if you want to receive the (same) code as SMS. (Countdown: {countdown})") + code = input("Code: ") + if code == "": + countdown = countdown - (time.time() - request_time) + for remaining in range(int(countdown)): + print( + f"Need to wait {int(countdown - remaining)} seconds before requesting SMS...", + end="\r", + ) + time.sleep(1) + print() + tr.resend_weblogin() + code = input("SMS requested. Enter the confirmation code:") + tr.complete_weblogin(code) + log.info("Logged in.") - log.info("Logged in") - # log.debug(get_settings(tr)) + log.debug(get_settings(tr)) return tr diff --git a/pytr/api.py b/pytr/api.py index b50ec0c..1a8cbc0 100644 --- a/pytr/api.py +++ b/pytr/api.py @@ -21,8 +21,6 @@ # SOFTWARE. import asyncio -import base64 -import hashlib import json import pathlib import re @@ -37,8 +35,6 @@ import requests import websockets from curl_cffi import requests as cffi_requests -from ecdsa import NIST256p, SigningKey # type: ignore[import-untyped] -from ecdsa.util import sigencode_der # type: ignore[import-untyped] from pytr.awswaf.aws import AwsWaf from pytr.utils import get_logger @@ -46,24 +42,18 @@ home = pathlib.Path.home() BASE_DIR = home / ".pytr" CREDENTIALS_FILE = BASE_DIR / "credentials" -KEY_FILE = BASE_DIR / "keyfile.pem" COOKIES_FILE = BASE_DIR / "cookies.txt" class TradeRepublicApi: - _default_headers = {"User-Agent": "TradeRepublic/Android 30/App Version 1.1.5534"} - _default_headers_web = { + _default_headers = { "User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/99.0.4844.74 Safari/537.36" } _host = "https://api.traderepublic.com" _waf_login_url = "https://app.traderepublic.com/login" - _weblogin = False - _refresh_token = None - _session_token = None - _session_token_expires_at = None _process_id = None - _web_session_token_expires_at = 0 + _session_token_expires_at = 0 _ws = None _lock = asyncio.Lock() @@ -74,24 +64,10 @@ class TradeRepublicApi: _credentials_file = CREDENTIALS_FILE _cookies_file = COOKIES_FILE - @property - def session_token(self): - if not self._refresh_token: - self.login() - elif self._refresh_token and time.time() > self._session_token_expires_at: - self.refresh_access_token() - return self._session_token - - @session_token.setter - def session_token(self, val): - self._session_token_expires_at = time.time() + 290 - self._session_token = val - def __init__( self, phone_no=None, pin=None, - keyfile=None, locale="de", save_cookies=False, credentials_file=None, @@ -101,6 +77,7 @@ def __init__( self.log = get_logger(__name__) self._locale = locale self._save_cookies = save_cookies + self._waf_token = waf_token self._credentials_file = pathlib.Path(credentials_file) if credentials_file else CREDENTIALS_FILE @@ -118,101 +95,16 @@ def __init__( self._cookies_file = pathlib.Path(cookies_file) if cookies_file else BASE_DIR / f"cookies.{self.phone_no}.txt" - self.keyfile = keyfile if keyfile else KEY_FILE - try: - with open(self.keyfile, "rb") as f: - self.sk = SigningKey.from_pem(f.read(), hashfunc=hashlib.sha512) - except FileNotFoundError: - pass - self._websession = requests.Session() - self._websession.headers = self._default_headers_web + self._websession.headers = self._default_headers if self._save_cookies: self._websession.cookies = MozillaCookieJar(self._cookies_file) - self._waf_token = waf_token or self._fetch_waf_token() - if self._waf_token: - self._set_waf_cookie(self._waf_token) - - def initiate_device_reset(self): - self.sk = SigningKey.generate(curve=NIST256p, hashfunc=hashlib.sha512) - - r = requests.post( - f"{self._host}/api/v1/auth/account/reset/device", - json={"phoneNumber": self.phone_no, "pin": self.pin}, - headers=self._default_headers, - ) - - self._process_id = r.json()["processId"] - - def complete_device_reset(self, token): - if not self._process_id and not self.sk: - raise ValueError("Initiate Device Reset first.") - - pubkey_bytes = self.sk.get_verifying_key().to_string("uncompressed") - pubkey_string = base64.b64encode(pubkey_bytes).decode("ascii") - - r = requests.post( - f"{self._host}/api/v1/auth/account/reset/device/{self._process_id}/key", - json={"code": token, "deviceKey": pubkey_string}, - headers=self._default_headers, - ) - if r.status_code == 200: - with open(self.keyfile, "wb") as f: - f.write(self.sk.to_pem()) - - def login(self): - self.log.info("Logging in") - r = self._sign_request( - "/api/v1/auth/login", - payload={"phoneNumber": self.phone_no, "pin": self.pin}, - ) - self._refresh_token = r.json()["refreshToken"] - self.session_token = r.json()["sessionToken"] - - def refresh_access_token(self): - self.log.info("Refreshing access token") - r = self._sign_request("/api/v1/auth/session", method="GET") - self.session_token = r.json()["sessionToken"] - self.save_websession() - - def _sign_request(self, url_path, payload=None, method="POST"): - ts = int(time.time() * 1000) - payload_string = json.dumps(payload) if payload else "" - signature_payload = f"{ts}.{payload_string}" - signature = self.sk.sign( - bytes(signature_payload, "utf-8"), - hashfunc=hashlib.sha512, - sigencode=sigencode_der, - ) - signature_string = base64.b64encode(signature).decode("ascii") - - headers = self._default_headers.copy() - headers["X-Zeta-Timestamp"] = str(ts) - headers["X-Zeta-Signature"] = signature_string - headers["Content-Type"] = "application/json" - - if url_path == "/api/v1/auth/login": - pass - elif url_path == "/api/v1/auth/session": - headers["Authorization"] = f"Bearer {self._refresh_token}" - elif self.session_token: - headers["Authorization"] = f"Bearer {self.session_token}" - - if self._waf_token and url_path.startswith("/api/v1/auth/"): - headers["Cookie"] = f"aws-waf-token={self._waf_token}" - - return requests.request( - method=method, - url=f"{self._host}{url_path}", - data=payload_string, - headers=headers, - ) - def _fetch_waf_token(self): - """Fetch AWS WAF token by solving the challenge. Requires pytr[waf] extras.""" + """Get AWS WAF token by solving the challenge.""" try: + self.log.info("Retrieving AWS WAF token...") session = cffi_requests.Session(impersonate="chrome") response = session.get(self._waf_login_url) m = re.search(r'src="(https://[^"]+/challenge\.js)"', response.text) @@ -223,10 +115,10 @@ def _fetch_waf_token(self): waf_endpoint = challenge_js_url.split("https://", 1)[1].rsplit("/challenge.js", 1)[0] challenge_js = session.get(challenge_js_url).text token = AwsWaf(waf_endpoint, "app.traderepublic.com", challenge_js)() - self.log.info("AWS WAF token obtained automatically") + self.log.info("AWS WAF token obtained.") return token - except Exception as e: - self.log.warning(f"Failed to fetch WAF token automatically: {e}") + except Exception: + self.log.warning("Failed to get AWS WAF token", exc_info=True) return None def _set_waf_cookie(self, token: str): @@ -252,11 +144,20 @@ def _set_waf_cookie(self, token: str): self._websession.cookies.set_cookie(cookie) def initiate_weblogin(self): + self.log.info("Initiating web login...") + + if not self._waf_token: + self._waf_token = self._fetch_waf_token() + self._set_waf_cookie(self._waf_token) + r = self._websession.post( f"{self._host}/api/v1/auth/web/login", json={"phoneNumber": self.phone_no, "pin": self.pin}, ) + self.log.debug(f"Web login returned: {r.status_code}") + r.raise_for_status() j = r.json() + self.log.debug(f"Web login data: {json.dumps(j, indent=4)}") try: self._process_id = j["processId"] except KeyError: @@ -281,7 +182,6 @@ def complete_weblogin(self, verify_code): r = self._websession.post(f"{self._host}/api/v1/auth/web/login/{self._process_id}/{verify_code}") r.raise_for_status() self.save_websession() - self._weblogin = True def save_websession(self): # Saves session cookies too (expirydate=0). @@ -298,31 +198,33 @@ def resume_websession(self): Use saved cookie file to resume web session return success """ - if self._save_cookies is False: + self.log.debug("Called resume_websession.") + + # Only attempt to resume if cookies are used and a cookie file exists. + if self._save_cookies is False or not self._cookies_file.exists(): return False - # Only attempt to load if the cookie file exists. - if self._cookies_file.exists(): - # Loads session cookies too (expirydate=0). - self._websession.cookies.load(ignore_discard=True, ignore_expires=True) - self._weblogin = True - # Re-apply fresh WAF token over any stale one from the cookie file - if self._waf_token: - self._set_waf_cookie(self._waf_token) - try: - self.settings() - except requests.exceptions.HTTPError: - self._weblogin = False - return False - else: - return True - return False + self.log.info("Trying to resume websession...") + + # Loads session cookies too (expirydate=0). + self._websession.cookies.load(ignore_discard=True, ignore_expires=True) + + try: + self.log.debug("Calling settings...") + self.settings() + except requests.exceptions.HTTPError: + self.log.info("Resuming websession failed.") + self.log.debug("Error calling tr.settings().", exc_info=True) + return False + + self.log.info("Websession resumed.") + return True def _web_request(self, url_path, payload=None, method="GET"): - if self._web_session_token_expires_at < time.time(): + if self._session_token_expires_at < time.time(): r = self._websession.get(f"{self._host}/api/v1/auth/web/session") r.raise_for_status() - self._web_session_token_expires_at = time.time() + 290 + self._session_token_expires_at = time.time() + 290 return self._websession.request(method=method, url=f"{self._host}{url_path}", data=payload) async def _get_ws(self): @@ -335,22 +237,21 @@ async def _get_ws(self): connection_message = {"locale": self._locale} connect_id = 21 - if self._weblogin: - # authenticate with cookies, set different connection message and connect ID - cookie_str = "" - for cookie in self._websession.cookies: - if cookie.domain.endswith("traderepublic.com"): - cookie_str += f"{cookie.name}={cookie.value}; " - extra_headers = {"Cookie": cookie_str.rstrip("; ")} - - connection_message = { - "locale": self._locale, - "platformId": "webtrading", - "platformVersion": "chrome - 94.0.4606", - "clientId": "app.traderepublic.com", - "clientVersion": "5582", - } - connect_id = 31 + # authenticate with cookies, set different connection message and connect ID + cookie_str = "" + for cookie in self._websession.cookies: + if cookie.domain.endswith("traderepublic.com"): + cookie_str += f"{cookie.name}={cookie.value}; " + extra_headers = {"Cookie": cookie_str.rstrip("; ")} + + connection_message = { + "locale": self._locale, + "platformId": "webtrading", + "platformVersion": "chrome - 94.0.4606", + "clientId": "app.traderepublic.com", + "clientVersion": "5582", + } + connect_id = 31 self._ws = await websockets.connect( "wss://api.traderepublic.com", ssl=ssl_context, additional_headers=extra_headers @@ -361,7 +262,7 @@ async def _get_ws(self): if not response == "connected": raise ValueError(f"Connection Error: {response}") - self.log.info("Connected to websocket...") + self.log.info("Connected.") return self._ws @@ -385,8 +286,6 @@ async def subscribe(self, payload): self.subscriptions[subscription_id] = payload payload_with_token = payload.copy() - if not self._weblogin: - payload_with_token["token"] = self.session_token await ws.send(f"sub {subscription_id} {json.dumps(payload_with_token)}") return subscription_id @@ -805,31 +704,44 @@ async def unsubscribe_news(self, isin): return await self.subscribe({"type": "unsubscribeNews", "instrumentId": isin}) def payout(self, amount): - return self._sign_request("/api/v1/payout", {"amount": amount}).json() + return requests.request( + method="POST", + url=f"{self._host}/api/v1/payout", + data={"amount": amount}, + headers=self._default_headers, + ).json() def confirm_payout(self, process_id, code): - r = self._sign_request(f"/api/v1/payout/{process_id}/code", {"code": code}) + r = requests.request( + method="POST", + url=f"{self._host}/api/v1/payout/{process_id}/code", + data={"code": code}, + headers=self._default_headers, + ) + if r.status_code != 200: raise ValueError(f"Payout failed with response {r.text!r}") def settings(self): - if self._weblogin: - r = self._web_request("/api/v2/auth/account") - else: - r = self._sign_request("/api/v1/auth/account", method="GET") + r = self._web_request("/api/v2/auth/account") r.raise_for_status() return r.json() def order_cost(self, isin, exchange, order_mode, order_type, size, sell_fractions): - url = ( - f"/api/v1/user/costtransparency?instrumentId={isin}&exchangeId={exchange}" - f"&mode={order_mode}&type={order_type}&size={size}&sellFractions={sell_fractions}" - ) - return self._sign_request(url, method="GET").text + return requests.request( + method="GET", + url=f"{self._host}/api/v1/user/costtransparency?instrumentId={isin}&exchangeId={exchange}&mode={order_mode}&type={order_type}&size={size}&sellFractions={sell_fractions}", + data=None, + headers=self._default_headers, + ).text def savings_plan_cost(self, isin, amount, interval): - url = f"/api/v1/user/savingsplancosttransparency?instrumentId={isin}&amount={amount}&interval={interval}" - return self._sign_request(url, method="GET").text + return requests.request( + method="GET", + url=f"{self._host}/api/v1/user/savingsplancosttransparency?instrumentId={isin}&amount={amount}&interval={interval}", + data=None, + headers=self._default_headers, + ).text def __getattr__(self, name): if name[:9] == "blocking_": diff --git a/pytr/awswaf/webgl.json b/pytr/awswaf/webgl.json index c41b080..eab6ae3 100644 --- a/pytr/awswaf/webgl.json +++ b/pytr/awswaf/webgl.json @@ -1,8 +1,4 @@ [ - { - "note": "Vendored from https://github.com/xKiian/awswaf", - "license": "MIT" - }, { "webgl_unmasked_renderer": "ANGLE (Apple, ANGLE Metal Renderer: Apple M2 Pro, Unspecified Version)", "webgl": [ diff --git a/pytr/main.py b/pytr/main.py index a4a4324..1d2dadb 100644 --- a/pytr/main.py +++ b/pytr/main.py @@ -75,7 +75,6 @@ def formatter(prog): # parent subparser with common login arguments parser_login_args = argparse.ArgumentParser(add_help=False) - parser_login_args.add_argument("--applogin", help="Use app login instead of web login", action="store_true") parser_login_args.add_argument("-n", "--phone_no", help="TradeRepublic phone number (international format)") parser_login_args.add_argument("-p", "--pin", help="TradeRepublic pin") parser_login_args.add_argument( @@ -444,7 +443,6 @@ def main(): login( phone_no=args.phone_no, pin=args.pin, - web=not args.applogin, store_credentials=args.store_credentials, waf_token=args.waf_token, ) @@ -453,7 +451,6 @@ def main(): login( phone_no=args.phone_no, pin=args.pin, - web=not args.applogin, store_credentials=args.store_credentials, waf_token=args.waf_token, ), @@ -470,7 +467,6 @@ def main(): login( phone_no=args.phone_no, pin=args.pin, - web=not args.applogin, store_credentials=args.store_credentials, waf_token=args.waf_token, ), @@ -481,7 +477,6 @@ def main(): login( phone_no=args.phone_no, pin=args.pin, - web=not args.applogin, store_credentials=args.store_credentials, waf_token=args.waf_token, ), @@ -511,7 +506,6 @@ def main(): login( phone_no=args.phone_no, pin=args.pin, - web=not args.applogin, store_credentials=args.store_credentials, waf_token=args.waf_token, ), @@ -546,7 +540,6 @@ def main(): login( phone_no=args.phone_no, pin=args.pin, - web=not args.applogin, store_credentials=args.store_credentials, waf_token=args.waf_token, ), @@ -562,7 +555,6 @@ def main(): login( phone_no=args.phone_no, pin=args.pin, - web=not args.applogin, store_credentials=args.store_credentials, waf_token=args.waf_token, ),