From 77f87573a465b11f22e7aa8776e035e7e2302a04 Mon Sep 17 00:00:00 2001 From: imDMG Date: Wed, 1 Nov 2023 18:20:01 +0500 Subject: [PATCH 1/2] refactoring auth.py --- auth.py | 271 +++++++++++++++++++++++++++++++++----------------------- 1 file changed, 161 insertions(+), 110 deletions(-) diff --git a/auth.py b/auth.py index 456e521..4f0217a 100644 --- a/auth.py +++ b/auth.py @@ -1,56 +1,109 @@ import contextlib -from secrets import token_urlsafe -from httpx import Client -import requests +import json from dataclasses import dataclass +from secrets import token_urlsafe from time import time -import jwt +from typing import Callable +from urllib.parse import parse_qs, urlparse + import capmonster_python -import json +import httpx +import jwt +from loguru import logger + + +def solve_cap_monster(site_url: str, site_key: str, data: str) -> str: + cap_monster = capmonster_python.HCaptchaTask( + "" + ) # api key + cap_monster.set_user_agent( + "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 " + "(KHTML, like Gecko) Chrome/99.0.4844.82 Safari/537.36" + ) + task_id = cap_monster.create_task( + website_url=site_url, + website_key=site_key, + custom_data=data + ) + result = cap_monster.join_task_result(task_id) + return result.get("gRecaptchaResponse") + + +# def solve_2captcha(site_url: str, site_key: str, data: str) -> str: +# from twocaptcha import TwoCaptcha +# +# solver = TwoCaptcha("") +# try: +# result = solver.hcaptcha( +# sitekey=site_key, +# url=site_url, +# param1=data +# ) +# return json.loads(json.dumps(result))["code"] +# +# except Exception as e: +# print(e) + def magic_decode(string: str): with contextlib.suppress(json.JSONDecodeError): return json.loads(string) with contextlib.suppress(jwt.exceptions.DecodeError): return jwt.decode(string, options={"verify_signature": False}) - raise DecodeException + raise ValueError + @dataclass class User: - username: str = '' - password: str = '' + username: str = "" + password: str = "" + def __hash__(self): return hash(self.username) + @dataclass -class Token(): +class Token: access_token: str id_token: str expire: float created = time() + class Version: - def __init__(self): - self.versions = requests.get("https://valorant-api.com/v1/version").json()["data"] - self.valorant = self.valorant() - self.riot = self.riot() - self.sdk = self.sdk() + def __init__(self) -> None: + self.versions = httpx.get( + "https://valorant-api.com/v1/version" + ).json()["data"] - def riot(self): + @property + def riot(self) -> str: return self.versions["riotClientBuild"] - def sdk(self): - return sdk if (sdk := self.versions["riotClientVersion"].split(".")[1]) else "23.8.0.1382" - def valorant(self): + + @property + def sdk(self) -> str: + return ( + sdk if (sdk := self.versions["riotClientVersion"].split(".")[1]) + else "23.8.0.1382" + ) + + @property + def valorant(self) -> str: return self.versions["riotClientVersion"] + version = Version() + class CaptchaFlow: - def __init__(self, session, user): - self.ses = session - self.user = user + ses: httpx.Client + user: User + + def __init__(self, solver_fn: Callable[[str, str, str], str]) -> None: + self.solver_fn = solver_fn - def get_captcha_token(self): + def get_captcha_token(self) -> dict: + url = "https://authenticate.riotgames.com/api/v1/login" data = { "clientId": "riot-client", "language": "", @@ -63,11 +116,11 @@ def get_captcha_token(self): "sdkVersion": version.sdk, "type": "auth", } - url = "https://authenticate.riotgames.com/api/v1/login" - response_data = self.ses.post(url, json=data).json() - return response_data - def get_login_token(self, code: str): + return self.ses.post(url, json=data).json() + + def get_login_token(self, code: str) -> str: + url = "https://authenticate.riotgames.com/api/v1/login" data = { "riot_identity": { "captcha": f"hcaptcha {code}", @@ -78,72 +131,56 @@ def get_login_token(self, code: str): }, "type": "auth" } - url = "https://authenticate.riotgames.com/api/v1/login" response_data = self.ses.put(url, json=data).json() return response_data["success"]["login_token"] - def login_cookies(self, login_token: str): + def login_cookies(self, login_token: str) -> None: + url = "https://auth.riotgames.com/api/v1/login-token" data = { "authentication_type": "RiotAuth", "code_verifier": "", "login_token": login_token, "persist_login": False } - url = "https://auth.riotgames.com/api/v1/login-token" self.ses.post(url, json=data) - def solve_2captcha(self, data): - from twocaptcha import TwoCaptcha - sitekey = data["captcha"]["hcaptcha"]["key"] - rqdata = data["captcha"]["hcaptcha"]["data"] - solver = TwoCaptcha("") - try: - result = solver.hcaptcha( - sitekey=sitekey, - url='https://auth.riotgames.com/', - param1=rqdata - ) - return json.loads(json.dumps(result))["code"] - - except Exception as e: - print(e) - - def solve_captcha(self, data): - sitekey = data["captcha"]["hcaptcha"]["key"] - rqdata = data["captcha"]["hcaptcha"]["data"] - capmonster = capmonster_python.HCaptchaTask("") # api key - capmonster.set_user_agent("Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/99.0.4844.82 Safari/537.36") - task_id = capmonster.create_task(website_url="https://auth.riotgames.com", website_key=sitekey, custom_data=rqdata) - result = capmonster.join_task_result(task_id) - return result.get("gRecaptchaResponse") - - def captcha_flow(self): + def captcha_flow(self, session: httpx.Client, user: User) -> None: + self.ses = session + self.user = user captcha_data = self.get_captcha_token() - captcha_token = self.solve_captcha(captcha_data) + captcha_token = self.solver_fn( + "https://auth.riotgames.com", + captcha_data["captcha"]["hcaptcha"]["key"], + captcha_data["captcha"]["hcaptcha"]["data"] + ) login_token = self.get_login_token(captcha_token) self.login_cookies(login_token) + class RiotAuth: - def __init__(self, user): - self.session = self.setup_session() - self.setup_auth(self.session) - CaptchaFlow(self.session, user).captcha_flow() - self.token, self.cookies = self.get_auth_data(self.session) - self.entitlements_token = self.get_entitlement(self.session, self.token) - - def setup_session(self): - app = "rso-auth" - session = Client() - session.headers.update({ - "User-Agent": f'RiotClient/{version.riot} {app} (Windows;10;;Professional, x64)', - "Cache-Control": "no-cache", - "Accept": "application/json", - "Content-Type": "application/json" - }) - session.cookies.update({"tdid": "", "asid": "", "did": "", "clid": ""}) - return session - - def setup_auth(self, session): + __ua = f"RiotClient/{version.riot} %s (Windows;10;;Professional, x64)" + + def __init__(self, user: User, captcha: CaptchaFlow) -> None: + self.session = httpx.Client( + headers={ + "User-Agent": self.__ua % "rso-auth", + "Cache-Control": "no-cache", + "Accept": "application/json", + "Content-Type": "application/json" + }, + cookies={"tdid": "", "asid": "", "did": "", "clid": ""} + ) + logger.debug("Setup authentication") + self.setup_auth() + logger.debug("Start captcha solving") + captcha.captcha_flow(self.session, user) + logger.debug("Get Token and cookies") + self.token, self.cookies = self.get_auth_data() + logger.debug("Get entitlements") + self.entitlements_token = self.get_entitlement() + + def setup_auth(self) -> httpx.Response: + url = "https://auth.riotgames.com/api/v1/authorization" data = { "client_id": "riot-client", "nonce": token_urlsafe(16), @@ -152,50 +189,64 @@ def setup_auth(self, session): "scope": "account openid", } - url = "https://auth.riotgames.com/api/v1/authorization" - r = session.post(url, json=data) - return r - - def get_auth_data(self, session): - r = self.setup_auth(session) - cookies = dict(r.cookies) - data = r.json() - if "error" in data: raise Exception(data["error"]) - uri = data["response"]["parameters"]["uri"] - token = self.get_token(uri) - return token, cookies - - def get_token(self, uri: str) -> Token: - access_token = uri.split("access_token=")[1].split("&scope")[0] - token_id = uri.split("id_token=")[1].split("&")[0] - expires_in = uri.split("expires_in=")[1].split("&")[0] - timestamp = time() + float(expires_in) - token = Token(access_token, token_id, timestamp) - return token - - def get_entitlement(self, session, token: Token) -> str: - app = 'entitlements' + return self.session.post(url, json=data) + + def get_auth_data(self) -> tuple[Token, dict]: + r = self.setup_auth() + if "error" in (data := r.json()): + raise Exception(data["error"]) + + return self.get_token( + data["response"]["parameters"]["uri"] + ), dict(r.cookies) + + @staticmethod + def get_token(uri: str) -> Token: + query_data = parse_qs(urlparse(uri).fragment) + + return Token( + access_token=query_data["access_token"][0], + id_token=query_data["id_token"][0], + expire=time() + float(query_data["expires_in"][0]) + ) + + def get_entitlement(self) -> str: url = "https://entitlements.auth.riotgames.com/api/token/v1" headers = { "Accept-Encoding": "gzip, deflate, br", - "Authorization": f"Bearer {token.access_token}", - "User-Agent": f'RiotClient/{version.riot} {app} (Windows;10;;Professional, x64)', + "Authorization": f"Bearer {self.token.access_token}", + "User-Agent": self.__ua % "entitlements", } - r = session.post(url, headers=headers, json={}) - data = magic_decode(r.text) - return data["entitlements_token"] + r = self.session.post(url, headers=headers, json={}) + + return magic_decode(r.text)["entitlements_token"] + -def get_user_info(session, token: Token) -> str: +def get_user_info(session: httpx.Client, token: Token) -> str: headers = { "Accept-Encoding": "gzip, deflate, br", "Authorization": f"Bearer {token.access_token}", } - r = session.post("https://auth.riotgames.com/userinfo", headers=headers, json={}) + r = session.post( + "https://auth.riotgames.com/userinfo", headers=headers, json={} + ) return magic_decode(r.text) +def login_and_get_cookies(login: str, password: str) -> dict: + riot_obj = RiotAuth( + user=User(login, password), + captcha=CaptchaFlow(solve_cap_monster) + ) + return riot_obj.cookies + + if __name__ == "__main__": - user = User("username", "password") - client = RiotAuth(user) - user = get_user_info(client.session, client.token) - print(user) + # for c in login_and_get_cookies("uesrname", "password").jar: + # print(c.name, c.value) + riot_user = User("uesrname", "password") + riot_captcha = CaptchaFlow(solve_cap_monster) + client = RiotAuth(riot_user, riot_captcha) + print(client.cookies) + # riot_user = get_user_info(client.session, client.token) + # print(riot_user) From e83834ceb907f0c81f7651f3045e6d5352415a8f Mon Sep 17 00:00:00 2001 From: imDMG Date: Wed, 1 Nov 2023 18:22:21 +0500 Subject: [PATCH 2/2] Create requirements.txt --- requirements.txt | 4 ++++ 1 file changed, 4 insertions(+) create mode 100644 requirements.txt diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..ffe8fb3 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,4 @@ +capmonster_python +httpx +pyjwt +loguru