diff --git a/.gitmodules b/.gitmodules new file mode 100644 index 0000000000..3ffde9e6d0 --- /dev/null +++ b/.gitmodules @@ -0,0 +1,3 @@ +[submodule "manager"] + path = manager + url = https://github.com/wzdnzd/proxy-manager.git diff --git a/manager b/manager new file mode 160000 index 0000000000..e3d93e51e7 --- /dev/null +++ b/manager @@ -0,0 +1 @@ +Subproject commit e3d93e51e762d269546e299490ff114031272e0b diff --git a/subscribe/airport.py b/subscribe/airport.py index b2b6d39b92..e10e444a4f 100644 --- a/subscribe/airport.py +++ b/subscribe/airport.py @@ -540,7 +540,7 @@ def parse( url=self.sub, headers=headers, retry=retry, - timeout=30, + timeout=120, trace=trace, interval=1, max_size=15 * 1024 * 1024, @@ -777,6 +777,7 @@ def clean_text(document: str) -> str: f"{artifact}.yaml", "clash", True, + True, ignore, ) if not success: diff --git a/subscribe/clash.py b/subscribe/clash.py index 193559f585..2a53629b4e 100644 --- a/subscribe/clash.py +++ b/subscribe/clash.py @@ -521,6 +521,20 @@ def verify(item: dict, mihomo: bool = True) -> bool: elif item["type"] == "vless": authentication = "uuid" + + # see: https://github.com/MetaCubeX/mihomo/blob/Alpha/transport/vless/encryption/factory.go#L12 + encryption = utils.trim(item.get("encryption", "")) + if encryption not in ["", "none"]: + parts = encryption.split(".") + + # Must be: mlkem768x25519plus..<...>.<...> (len >= 4) + if ( + len(parts) < 4 + or parts[0] != "mlkem768x25519plus" + or parts[1] not in ("native", "xorpub", "random") + ): + return False + network = utils.trim(item.get("network", "tcp")) # mihomo: https://wiki.metacubex.one/config/proxies/vless/#network diff --git a/subscribe/location.py b/subscribe/location.py index 1a00668397..0ac38cbae6 100644 --- a/subscribe/location.py +++ b/subscribe/location.py @@ -13,6 +13,8 @@ import sys import time import urllib +import urllib.parse +import urllib.request from collections import defaultdict from dataclasses import dataclass @@ -739,6 +741,23 @@ def make_proxy_request( logger.warning("No port provided for proxy") return False, {} + def _build_headers(url: str) -> dict: + result = urllib.parse.urlparse(url) + base = f"{result.scheme}://{result.netloc}" if result.scheme and result.netloc else "" + + headers = { + "User-Agent": utils.USER_AGENT, + "Accept": "application/json, text/plain, */*", + "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8", + "Cache-Control": "no-cache", + "Pragma": "no-cache", + "Connection": "close", + "Referer": f"{base}/" if base else url, + "Origin": base if base else url, + } + + return headers + # Configure the proxy for the request proxy_url = f"http://127.0.0.1:{port}" proxies_config = {"http": proxy_url, "https": proxy_url} @@ -746,16 +765,12 @@ def make_proxy_request( # Configure proxy handler proxy_handler = urllib.request.ProxyHandler(proxies_config) - # Build opener with proxy handler - opener = urllib.request.build_opener(proxy_handler) + # Build opener with proxy handler and custom SSL context. + # Using explicit Request(headers=...) is more stable than opener.addheaders for proxy HTTPS requests. + opener = urllib.request.build_opener(proxy_handler, urllib.request.HTTPSHandler(context=utils.CTX)) + default_headers = _build_headers(url) if headers and isinstance(headers, dict): - opener.addheaders = [(k, v) for k, v in headers.items() if k] - else: - opener.addheaders = [ - ("User-Agent", utils.USER_AGENT), - ("Accept", "application/json"), - ("Connection", "close"), - ] + default_headers.update({k: v for k, v in headers.items() if k and v is not None}) # Try to get response with retry and backoff attempt, success, data = 0, False, None @@ -767,7 +782,8 @@ def make_proxy_request( time.sleep(wait_time) # Make request - response = opener.open(url, timeout=timeout) + request = urllib.request.Request(url=url, headers=default_headers, method="GET") + response = opener.open(request, timeout=timeout) if response.getcode() == 200: content = response.read().decode("utf-8") data = json.loads(content) if deserialize else content @@ -802,10 +818,10 @@ def get_ipv4(port: int, max_retries: int = 5) -> str: # Online API services for IP location LOCATION_API_SERVICES = [ {"url": "https://ipinfo.io", "country_key": "country"}, + {"url": "https://api.ip2location.io", "country_key": "country_code"}, {"url": "https://ipapi.co/json/", "country_key": "country_code"}, {"url": "https://ipwho.is", "country_key": "country_code"}, - {"url": "https://freeipapi.com/api/json", "country_key": "countryCode"}, - {"url": "https://api.country.is", "country_key": "country"}, + {"url": "https://free.freeipapi.com/api/json", "country_key": "countryCode"}, {"url": "https://api.ip.sb/geoip", "country_key": "country_code"}, ] @@ -818,7 +834,7 @@ def random_delay(min_delay: float = 0.01, max_delay: float = 0.5): time.sleep(random.uniform(min_delay, max_delay)) -def check_residential(proxy: dict, port: int, api_key: str = "", use_ipinfo: bool = True) -> ProxyQueryResult: +def check_residential(proxy: dict, port: int, api_key: str = "", ip_library: str = "iplark") -> ProxyQueryResult: """ Check if a proxy is residential by making a request through it @@ -826,39 +842,93 @@ def check_residential(proxy: dict, port: int, api_key: str = "", use_ipinfo: boo proxy: The proxy information dict port: The port of the proxy api_key: Optional API key for ipapi.is. Uses free tier if not provided - use_ipinfo: Whether to use ipinfo.io instead of ipapi.is, defaults to True + ip_library: IP query provider, supported: iplark/ipinfo/ipapi (default: iplark) Returns: ProxyQueryResult: Complete proxy query result """ - def _get_ipapi_url(key: str = "") -> str: - url, key = "https://api.ipapi.is", utils.trim(key) - if key: - url += f"?key={key}" - return url - - def _get_ipinfo_url(port: int, name: str) -> str: - # First, get the IP address - success, content = make_proxy_request( - port=port, - url="https://ipinfo.io/ip", - max_retries=2, - timeout=15, - deserialize=False, - ) - if not success or not content: - logger.warning(f"Failed to get IP from ipinfo.io for proxy {name}") - return "" - - # Extract IP from response - ip = utils.trim(content) - if not ip: - logger.warning(f"Invalid IP address from ipinfo.io for proxy {name}") - return "" + def _build_url(provider: str, port: int, name: str, api_key: str) -> str: + if provider == "ipinfo": + # First, get the IP address + success, content = make_proxy_request( + port=port, + url="https://ipinfo.io/ip", + max_retries=2, + timeout=15, + deserialize=False, + ) + if not success or not content: + logger.warning(f"Failed to get IP from ipinfo.io for proxy {name}") + return "" + + # Extract IP from response + ip = utils.trim(content) + if not ip: + logger.warning(f"Invalid IP address from ipinfo.io for proxy {name}") + return "" + + # Now get detailed information using the IP + return f"https://ipinfo.io/widget/demo/{ip}" + elif provider == "ipapi": + url, key = "https://api.ipapi.is", utils.trim(api_key) + if key: + url += f"?key={key}" + return url + elif provider == "ippure": + return "https://my.ippure.com/v1/info" + + return "https://iplark.com/ipapi/public/ipinfo" + + def _get_providers(preferred: str) -> list[str]: + candidates = ["iplark", "ipinfo", "ippure", "ipapi"] + + library = utils.trim(preferred).lower() + if library not in candidates: + library = "iplark" + + providers = [library] + for item in candidates: + if item not in providers: + providers.append(item) + + return providers + + def _extract_data(provider: str, response: dict) -> tuple[dict, str, str, str]: + data, country_code, company_type, asn_type = {}, "", "", "" + + if provider == "ipinfo": + data = response.get("data", {}) if isinstance(response, dict) else {} + country_code = data.get("country", "") + company_type = data.get("company", {}).get("type", "") + asn_type = data.get("asn", {}).get("type", "") + elif provider == "ipapi": + data = response if isinstance(response, dict) else {} + country_code = data.get("location", {}).get("country_code", "") + company_type = data.get("company", {}).get("type", "") + asn_type = data.get("asn", {}).get("type", "") + elif provider == "ippure": + data = response if isinstance(response, dict) else {} + country_code = data.get("countryCode", "") + + flag = data.get("isResidential", False) + if flag: + company_type, asn_type = "isp", "isp" + else: + company_type, asn_type = "hosting", "hosting" + else: + data = response if isinstance(response, dict) else {} + country_code = data.get("country_code", "") + + node_type = utils.trim(data.get("type", "")).lower() + if node_type == "isp": + company_type, asn_type = "isp", "isp" + elif node_type == "business": + company_type, asn_type = "business", "business" + else: + company_type, asn_type = "hosting", "hosting" - # Now get detailed information using the IP - return f"https://ipinfo.io/widget/demo/{ip}" + return data, utils.trim(country_code).upper(), utils.trim(company_type).lower(), utils.trim(asn_type).lower() name = proxy.get("name", "") result = ProxyInfo(name=name) @@ -871,32 +941,36 @@ def _get_ipinfo_url(port: int, name: str) -> str: random_delay() try: - url = "" - if use_ipinfo: - url = _get_ipinfo_url(port=port, name=name) + providers = _get_providers(ip_library) + success, response, provider = False, None, "" + + for idx, item in enumerate(providers): + url = _build_url(provider=item, port=port, name=name, api_key=api_key) + if not url: + continue - if not url: - url = _get_ipapi_url(key=api_key) - use_ipinfo = False + # Call API for IP information through the proxy + success, response = make_proxy_request(port=port, url=url, max_retries=2, timeout=12) + if success: + provider = item + break - # Call API for IP information through the proxy - success, response = make_proxy_request(port=port, url=url, max_retries=2, timeout=12) + if idx < len(providers) - 1: + fallback = providers[idx + 1] + logger.warning(f"Failed to query {url} for proxy {name}, provider={item}, trying fallback: {fallback}") + else: + logger.warning(f"Failed to query {url} for proxy {name}, provider={item}") # Parse data from response if success: try: - data = response.get("data", {}) if use_ipinfo else response - - # Extract country code from data - if use_ipinfo: - country_code = data.get("country", "") - else: - country_code = data.get("location", {}).get("country_code", "") + data, country_code, company_type, asn_type = _extract_data(provider, response) - result.country = ISO_TO_CHINESE.get(country_code, "") if country_code else "" + if country_code: + result.country = ISO_TO_CHINESE.get(country_code, "") - company_type = data.get("company", {}).get("type", "") - asn_type = data.get("asn", {}).get("type", "") + if not result.country: + result.country = utils.trim(data.get("country_zh", "") or data.get("country", "")) # Check if it's residential (both company and asn type should be "isp") if company_type == "isp" and asn_type == "isp": @@ -905,9 +979,9 @@ def _get_ipinfo_url(port: int, name: str) -> str: result.ip_type = "business" except Exception as e: - logger.error(f"Error parsing {url} response for proxy {name}: {str(e)}") + logger.error(f"Error parsing response for proxy {name}: {str(e)}") else: - logger.warning(f"Failed to query {url} for proxy {name}") + logger.warning(f"Failed to query residential info for proxy {name} with providers: {providers}") # Determine if query was successful flag = result.country != "" or result.ip_type != "" diff --git a/subscribe/process.py b/subscribe/process.py index cc98014986..95399bfe8a 100644 --- a/subscribe/process.py +++ b/subscribe/process.py @@ -62,6 +62,7 @@ def load_configs( only_check: bool = False, num_threads: int = 0, display: bool = True, + retry: int = 3, ) -> ProcessConfig: def parse_config(config: dict) -> None: tasks.extend(config.get("domains", [])) @@ -273,7 +274,7 @@ def verify(storage: dict, groups: dict) -> bool: url, ): headers = {"User-Agent": utils.USER_AGENT, "Referer": url} - content = utils.http_get(url=url, headers=headers) + content = utils.http_get(url=url, headers=headers, retry=max(retry, 1), timeout=120) if not content: logger.error(f"cannot fetch config from remote, url: {utils.hide(url=url)}") else: @@ -509,6 +510,7 @@ def aggregate(args: argparse.Namespace) -> None: clash_bin, subconverter_bin = executable.which_bin() display = not args.invisible + retry = min(max(1, args.retry), 10) # parse config server = utils.trim(args.server) or os.environ.get("SUBSCRIBE_CONF", "").strip() @@ -517,11 +519,11 @@ def aggregate(args: argparse.Namespace) -> None: only_check=args.check, num_threads=args.num, display=display, + retry=retry, ) storages = process_config.storage or {} pushtool = push.get_instance(config=push.PushConfig.from_dict(storages)) - retry = min(max(1, args.retry), 10) # generate tasks tasks, groups, sites = assign( diff --git a/subscribe/subconverter.py b/subscribe/subconverter.py index fa4b552ad0..d7fa3bc07a 100644 --- a/subscribe/subconverter.py +++ b/subscribe/subconverter.py @@ -103,7 +103,7 @@ def generate_conf( lines.extend(["emoji=false", "add_emoji=false"]) if ignore_exclude: - lines.append("exclude=流量|过期|剩余|时间|Expire|Traffic") + lines.append("exclude=[到过]期|Expire|Traffic|剩余流量|时间|官网|产品|联系") lines.append("\n") content = "\n".join(lines) diff --git a/subscribe/utils.py b/subscribe/utils.py index 957731a715..75f8962acb 100644 --- a/subscribe/utils.py +++ b/subscribe/utils.py @@ -35,7 +35,7 @@ CTX.verify_mode = ssl.CERT_NONE USER_AGENT = ( - "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36" + "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/145.0.0.0 Safari/537.36" ) diff --git a/tools/socks-checker.py b/tools/socks-checker.py index c7bbb26388..9217fb0f42 100644 --- a/tools/socks-checker.py +++ b/tools/socks-checker.py @@ -276,6 +276,32 @@ } +def country_flag_emoji(country_code: str) -> str: + if not country_code or len(country_code) != 2: + return "" + + code = country_code.upper() + if not code.isalpha(): + return "" + + return chr(0x1F1E6 + ord(code[0]) - ord("A")) + chr(0x1F1E6 + ord(code[1]) - ord("A")) + + +def country_name_zh(country_code: str) -> str: + if not country_code: + return "" + + return COUNTRY_NAME_ZH.get(country_code.upper(), "") + + +def short_company_name(value: str) -> str: + if not value: + return "UNKNOWN" + + parts = [part for part in re.split(r"[\s,\.\-_@;:]+", value.strip()) if part] + return parts[0].upper() if parts else "UNKNOWN" + + @dataclass class ProxyInfo: protocol: str @@ -309,6 +335,301 @@ def from_proxy(cls, proxy_info: ProxyInfo) -> "TestResult": ) +@dataclass +class IpLookupResult: + ip: Optional[str] + data: Optional[Dict] + error: Optional[str] = None + + +class IpLibrary: + name: str = "" + + async def lookup( + self, session: aiohttp.ClientSession, proxy_info: ProxyInfo, retries: int, timeout: int + ) -> IpLookupResult: + raise NotImplementedError + + def build_remark(self, data: Dict, include_asn_name: bool) -> str: + raise NotImplementedError + + async def _make_request( + self, session: aiohttp.ClientSession, url: str, retries: int, timeout: int + ) -> Tuple[Optional[Dict], Optional[str]]: + error = None + for attempt in range(1, retries + 1): + try: + async with session.get(url, timeout=aiohttp.ClientTimeout(total=timeout)) as response: + if response.status == 200: + data = await response.json() + if isinstance(data, dict): + return data, None + + error = "Invalid JSON response" + else: + error = f"HTTP {response.status}" + except asyncio.TimeoutError: + error = "Timeout" + except Exception as e: + error = str(e)[:100] + + if attempt < retries: + await asyncio.sleep(attempt) + + return None, error + + async def _query( + self, + session: aiohttp.ClientSession, + proxy_info: ProxyInfo, + retries: int, + timeout: int, + source: str, + fetcher, + ) -> IpLookupResult: + data, error = await fetcher(session, retries, timeout) + if not data: + host = "" if not proxy_info else proxy_info.host + return IpLookupResult(None, None, error or f"Failed to get IP info from {source}, host: {host}") + + return self._verify(data, source) + + @staticmethod + def _verify(data: Dict, source: str) -> IpLookupResult: + address = (data.get("ip") or "").strip() + if not address: + return IpLookupResult(None, None, f"Invalid IP from {source}") + + try: + ipaddress.ip_address(address) + except ValueError: + return IpLookupResult(None, None, f"Invalid IP from {source}, ip: {address}") + + return IpLookupResult(address, data, None) + + @staticmethod + def _format_remark( + country_code: str, + country: str, + label: str, + include_asn_name: bool, + company_name: str, + detail: str = "", + ) -> str: + flag = country_flag_emoji(country_code) + base = f"{flag} {country}{label}".strip() + + if include_asn_name and company_name: + if detail: + return f"{base} [{company_name}::{detail}]".strip() + + return f"{base} [{company_name}]".strip() + + return base + + +class IpinfoLibrary(IpLibrary): + name = "ipinfo" + + async def lookup( + self, session: aiohttp.ClientSession, proxy_info: ProxyInfo, retries: int, timeout: int + ) -> IpLookupResult: + host = proxy_info.host if proxy_info else "" + address = await self._resolve_ip(session, host, retries, timeout) + if not address: + return IpLookupResult(None, None, f"Failed to get IP from ipinfo.io/ip, host: {host}") + + data, error = await self._fetch_ipinfo(session, address, retries, timeout) + if not data: + return IpLookupResult(address, None, error or f"Failed to get IP info from ipinfo.io, ip: {address}") + + return IpLookupResult(address, data, None) + + def build_remark(self, data: Dict, include_asn_name: bool) -> str: + country_code = (data.get("country") or "").upper() + flag = country_flag_emoji(country_code) + + asn_info = data.get("asn", {}) or {} + company_info = data.get("company", {}) or {} + asn_type = (asn_info.get("type") or "").lower() + company_type = (company_info.get("type") or "").lower() + + asn_name = (asn_info.get("domain") or "").strip() + if not asn_name or re.match(r"^as\d+\.", asn_name, flags=re.I): + asn_name = (asn_info.get("name") or "").strip() + + company_name = short_company_name(asn_name) + + if asn_type == "isp" and company_type == "isp": + label = "家宽" + elif asn_type == "isp" or company_type == "isp": + label = "商宽" + elif asn_type == "edu" or company_type == "edu": + label = "教育" + else: + label = "" + + country = country_name_zh(country_code) or country_code or "未知" + base = f"{flag} {country}{label}".strip() + if include_asn_name and company_name: + return f"{base} [{company_name}]".strip() + + return base + + @staticmethod + def _is_ipv4(host: str) -> bool: + if not host: + return False + try: + return isinstance(ipaddress.ip_address(host), ipaddress.IPv4Address) + except ValueError: + return False + + async def _resolve_ip(self, session: aiohttp.ClientSession, host: str, retries: int, timeout: int) -> Optional[str]: + if self._is_ipv4(host): + return host + + url = "https://ipinfo.io/ip" + for attempt in range(1, retries + 1): + try: + async with session.get(url, timeout=aiohttp.ClientTimeout(total=timeout)) as response: + if response.status == 200: + text = (await response.text()).strip() + try: + ipaddress.ip_address(text) + return text + except ValueError: + pass + except asyncio.TimeoutError: + pass + except Exception: + pass + + if attempt < retries: + await asyncio.sleep(attempt) + + return None + + async def _fetch_ipinfo( + self, session: aiohttp.ClientSession, address: str, retries: int, timeout: int + ) -> Tuple[Optional[Dict], Optional[str]]: + url = f"https://ipinfo.io/widget/demo/{address}" + data, error = await self._make_request(session, url, retries, timeout) + if not data: + return None, error + + return data.get("data", data), None + + +class IppureLibrary(IpLibrary): + name = "ippure" + + async def lookup( + self, session: aiohttp.ClientSession, proxy_info: ProxyInfo, retries: int, timeout: int + ) -> IpLookupResult: + return await self._query( + session=session, + proxy_info=proxy_info, + retries=retries, + timeout=timeout, + source=self.name, + fetcher=self._fetch_ippure, + ) + + def build_remark(self, data: Dict, include_asn_name: bool) -> str: + residential = data.get("isResidential") + label = "家宽" if residential is True else "" + + country_code = (data.get("countryCode") or "").upper() + country = country_name_zh(country_code) or (data.get("country") or "未知") + + company_name = short_company_name(data.get("asOrganization") or "") + score = str(data.get("fraudScore")).zfill(3) if "fraudScore" in data else "NUL" + + return self._format_remark( + country_code=country_code, + country=country, + label=label, + include_asn_name=include_asn_name, + company_name=company_name, + detail=score, + ) + + async def _fetch_ippure( + self, session: aiohttp.ClientSession, retries: int, timeout: int + ) -> Tuple[Optional[Dict], Optional[str]]: + url = "https://my.ippure.com/v1/info" + return await self._make_request(session, url, retries, timeout) + + +class IPLarkLibrary(IpLibrary): + name = "iplark" + + async def lookup( + self, session: aiohttp.ClientSession, proxy_info: ProxyInfo, retries: int, timeout: int + ) -> IpLookupResult: + return await self._query( + session=session, + proxy_info=proxy_info, + retries=retries, + timeout=timeout, + source=self.name, + fetcher=self._fetch_iplark, + ) + + def build_remark(self, data: Dict, include_asn_name: bool) -> str: + node_type = (data.get("type") or "").strip().lower() + if node_type == "isp": + label = "家宽" + elif node_type == "business": + label = "商宽" + elif node_type == "education": + label = "教育" + else: + label = "" + + country_code = (data.get("country_code") or "").upper() + country = country_name_zh(country_code) or (data.get("country_zh") or data.get("country") or "未知") + + # asn = str(data.get("asn") or "").strip() + # detail = f"AS{asn}" if asn else "NUL" + detail = "" + + company_name = short_company_name(data.get("organization") or "") + + return self._format_remark( + country_code=country_code, + country=country, + label=label, + include_asn_name=include_asn_name, + company_name=company_name, + detail=detail, + ) + + async def _fetch_iplark( + self, session: aiohttp.ClientSession, retries: int, timeout: int + ) -> Tuple[Optional[Dict], Optional[str]]: + url = "https://iplark.com/ipapi/public/ipinfo" + return await self._make_request(session, url, retries, timeout) + + +IP_LIBRARIES = { + "iplark": IPLarkLibrary, + "ipinfo": IpinfoLibrary, + "ippure": IppureLibrary, +} + + +def get_ip_library(name: str) -> IpLibrary: + key = (name or "iplark").strip().lower() + library = IP_LIBRARIES.get(key) + if not library: + supported = ", ".join(sorted(IP_LIBRARIES.keys())) + raise ValueError(f"Unsupported ip library: {name}. Supported: {supported}") + + return library() + + class ProxyChecker: def __init__( self, @@ -316,6 +637,7 @@ def __init__( format_pattern: Optional[str] = None, default_port: int = 1080, include_asn_name: bool = False, + ip_library: str = "iplark", ): """ 初始化代理检测器 @@ -332,6 +654,7 @@ def __init__( self.format_pattern = format_pattern self.default_port = default_port self.include_asn_name = include_asn_name + self.ip_library = get_ip_library(ip_library) self.results: List[TestResult] = [] self.summary: Optional[Dict[str, float]] = None @@ -504,19 +827,14 @@ async def test_proxy(self, proxy_info: ProxyInfo, retries: int = 3) -> TestResul try: connector = ProxyConnector.from_url(proxy_url) async with aiohttp.ClientSession(connector=connector) as session: - ip_address = await self._resolve_ip_with_proxy(session, proxy_info.host, retries) - if not ip_address: - result.error = "Failed to get IP from ipinfo.io/ip" - return result - - ip_data, ip_error = await self._fetch_ipinfo(session, ip_address, retries) - if not ip_data: - result.error = ip_error or "Failed to get IP info from ipinfo.io" + lookup = await self.ip_library.lookup(session, proxy_info, retries, self.timeout) + if not lookup.ip or not lookup.data: + result.error = lookup.error or f"Failed to get IP info from {self.ip_library.name}" return result - remark = self._build_remark_from_ipinfo(ip_data) + remark = self.ip_library.build_remark(lookup.data, self.include_asn_name) result.remark = remark - result.ip = ip_address + result.ip = lookup.ip result.status = "success" result.response_time = round((datetime.now() - start_time).total_seconds(), 2) result.error = None @@ -531,77 +849,6 @@ async def test_proxy(self, proxy_info: ProxyInfo, retries: int = 3) -> TestResul result.error = str(e)[:100] return result - def _is_ipv4(self, host: str) -> bool: - if not host: - return False - try: - return isinstance(ipaddress.ip_address(host), ipaddress.IPv4Address) - except ValueError: - return False - - async def _resolve_ip_with_proxy(self, session: aiohttp.ClientSession, host: str, retries: int) -> Optional[str]: - if self._is_ipv4(host): - return host - - url = "https://ipinfo.io/ip" - for attempt in range(1, retries + 1): - try: - async with session.get(url, timeout=aiohttp.ClientTimeout(total=self.timeout)) as response: - if response.status == 200: - text = (await response.text()).strip() - try: - ipaddress.ip_address(text) - return text - except ValueError: - pass - except asyncio.TimeoutError: - pass - except Exception: - pass - - if attempt < retries: - await asyncio.sleep(attempt) - - return None - - async def _fetch_ipinfo( - self, session: aiohttp.ClientSession, ip_address: str, retries: int - ) -> Tuple[Optional[Dict], Optional[str]]: - url = f"https://ipinfo.io/widget/demo/{ip_address}" - last_error = None - for attempt in range(1, retries + 1): - try: - async with session.get(url, timeout=aiohttp.ClientTimeout(total=self.timeout)) as response: - if response.status == 200: - data = await response.json() - if isinstance(data, dict): - return data.get("data", data), None - last_error = "Invalid JSON response" - else: - last_error = f"HTTP {response.status}" - except asyncio.TimeoutError: - last_error = "Timeout" - except Exception as e: - last_error = str(e)[:100] - - if attempt < retries: - await asyncio.sleep(attempt) - - return None, last_error - - def _country_flag_emoji(self, country_code: str) -> str: - if not country_code or len(country_code) != 2: - return "" - code = country_code.upper() - if not code.isalpha(): - return "" - return chr(0x1F1E6 + ord(code[0]) - ord("A")) + chr(0x1F1E6 + ord(code[1]) - ord("A")) - - def _country_name_zh(self, country_code: str) -> str: - if not country_code: - return "" - return COUNTRY_NAME_ZH.get(country_code.upper(), "") - def _format_standard(self, proxy_info: ProxyInfo, remark: str) -> str: auth = "" if proxy_info.username or proxy_info.password: @@ -629,40 +876,6 @@ def _format_yaml_line(self, result: TestResult) -> str: parts.append(f"password: {self._yaml_quote(result.password)}") return " - {" + ", ".join(parts) + "}" - def _build_remark_from_ipinfo(self, ip_data: Dict) -> str: - country_code = (ip_data.get("country") or "").upper() - flag = self._country_flag_emoji(country_code) - country_display = self._country_name_zh(country_code) or "未知" - - asn_info = ip_data.get("asn", {}) or {} - company_info = ip_data.get("company", {}) or {} - asn_type = (asn_info.get("type") or "").lower() - company_type = (company_info.get("type") or "").lower() - - asn_name = (asn_info.get("domain") or "").strip() - if not asn_name: - asn_name = (asn_info.get("name") or "").strip() - - if asn_name: - parts = [p for p in re.split(r"[\s,\.]+", asn_name) if p] - company_name = parts[0].upper() if parts else "UNKNOWN" - else: - company_name = "UNKNOWN" - - if asn_type == "isp" and company_type == "isp": - label = "家宽" - elif asn_type == "isp" or company_type == "isp": - label = "商宽" - elif asn_type == "edu" or company_type == "edu": - label = "教育" - else: - label = "" - - base = f"{flag} {country_display}{label}".strip() - if self.include_asn_name and company_name: - return f"{base} [{company_name}]".strip() - return base - def _convert(self, input_file: str, output_file: str, output_format: str, digits: int = 2) -> None: proxies = read_proxies(input_file) if not proxies: @@ -765,7 +978,7 @@ async def check_proxies( output_handle = None if not output_file: - output_file = f'{output_format}.{"txt" if output_format == "v2ray" else "yaml"}' + output_file = f'{output_format}-{self.ip_library}.{"txt" if output_format == "v2ray" else "yaml"}' output_handle = open(output_file, "w", encoding="utf-8") if output_format == "clash": @@ -797,12 +1010,22 @@ async def test_with_semaphore(proxy_info): # 实时输出结果 status_icon = "✓" if result.status == "success" else "✗" if result.status == "success": - print(f"{status_icon} {result.original[:60]}... | {result.response_time}s | IP: {result.ip}") + print( + f"{status_icon} {result.original[:60]}... | {result.response_time}s | Export IP: {result.ip}".encode( + "utf-8", errors="ignore" + ).decode( + "utf-8" + ) + ) if write_queue: line = self._format_yaml_line(result) if output_format == "clash" else result.proxy await write_queue.put(line + "\n") else: - print(f"{status_icon} {result.original[:60]}... | {result.error}") + print( + f"{status_icon} {result.original[:60]}... | {result.error}".encode( + "utf-8", errors="ignore" + ).decode("utf-8") + ) async with stats_lock: if result.status == "success": @@ -1089,6 +1312,14 @@ async def main(): help="在备注中追加 ASN 名称 (默认不追加)", ) + parser.add_argument( + "--ip-library", + dest="ip_library", + choices=sorted(IP_LIBRARIES.keys()), + default="iplark", + help="IP地址数据库服务商: iplark、ipinfo 或 ippure (默认: iplark)", + ) + args = parser.parse_args() # 获取代理列表 @@ -1116,6 +1347,7 @@ async def main(): format_pattern=args.format_pattern, default_port=args.default_port, include_asn_name=args.include_asn_name, + ip_library=args.ip_library, ) await checker.check_proxies(