Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitmodules
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
[submodule "manager"]
path = manager
url = https://github.com/wzdnzd/proxy-manager.git
1 change: 1 addition & 0 deletions manager
Submodule manager added at e3d93e
3 changes: 2 additions & 1 deletion subscribe/airport.py
Original file line number Diff line number Diff line change
Expand Up @@ -540,7 +540,7 @@ def parse(
url=self.sub,
headers=headers,
retry=retry,
timeout=30,
timeout=120,
trace=trace,
interval=1,
max_size=15 * 1024 * 1024,
Expand Down Expand Up @@ -777,6 +777,7 @@ def clean_text(document: str) -> str:
f"{artifact}.yaml",
"clash",
True,
True,
ignore,
)
if not success:
Expand Down
14 changes: 14 additions & 0 deletions subscribe/clash.py
Original file line number Diff line number Diff line change
Expand Up @@ -521,6 +521,20 @@ def verify(item: dict, mihomo: bool = True) -> bool:

elif item["type"] == "vless":
authentication = "uuid"

# see: https://github.com/MetaCubeX/mihomo/blob/Alpha/transport/vless/encryption/factory.go#L12
encryption = utils.trim(item.get("encryption", ""))
if encryption not in ["", "none"]:
parts = encryption.split(".")

# Must be: mlkem768x25519plus.<mode>.<...>.<...> (len >= 4)
if (
len(parts) < 4
or parts[0] != "mlkem768x25519plus"
or parts[1] not in ("native", "xorpub", "random")
):
return False

network = utils.trim(item.get("network", "tcp"))

# mihomo: https://wiki.metacubex.one/config/proxies/vless/#network
Expand Down
194 changes: 134 additions & 60 deletions subscribe/location.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
import sys
import time
import urllib
import urllib.parse
import urllib.request
from collections import defaultdict
from dataclasses import dataclass

Expand Down Expand Up @@ -739,23 +741,36 @@ def make_proxy_request(
logger.warning("No port provided for proxy")
return False, {}

def _build_headers(url: str) -> dict:
    """Build browser-like default request headers, deriving Referer/Origin from *url*.

    Falls back to the full *url* for Referer/Origin when it has no scheme or host.
    """
    parsed = urllib.parse.urlsplit(url)
    origin = f"{parsed.scheme}://{parsed.netloc}" if parsed.scheme and parsed.netloc else ""

    return {
        "User-Agent": utils.USER_AGENT,
        "Accept": "application/json, text/plain, */*",
        "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
        "Cache-Control": "no-cache",
        "Pragma": "no-cache",
        "Connection": "close",
        "Referer": f"{origin}/" if origin else url,
        "Origin": origin if origin else url,
    }

# Configure the proxy for the request
proxy_url = f"http://127.0.0.1:{port}"
proxies_config = {"http": proxy_url, "https": proxy_url}

# Configure proxy handler
proxy_handler = urllib.request.ProxyHandler(proxies_config)

# Build opener with proxy handler
opener = urllib.request.build_opener(proxy_handler)
# Build opener with proxy handler and custom SSL context.
# Using explicit Request(headers=...) is more stable than opener.addheaders for proxy HTTPS requests.
opener = urllib.request.build_opener(proxy_handler, urllib.request.HTTPSHandler(context=utils.CTX))
default_headers = _build_headers(url)
if headers and isinstance(headers, dict):
opener.addheaders = [(k, v) for k, v in headers.items() if k]
else:
opener.addheaders = [
("User-Agent", utils.USER_AGENT),
("Accept", "application/json"),
("Connection", "close"),
]
default_headers.update({k: v for k, v in headers.items() if k and v is not None})

# Try to get response with retry and backoff
attempt, success, data = 0, False, None
Expand All @@ -767,7 +782,8 @@ def make_proxy_request(
time.sleep(wait_time)

# Make request
response = opener.open(url, timeout=timeout)
request = urllib.request.Request(url=url, headers=default_headers, method="GET")
response = opener.open(request, timeout=timeout)
if response.getcode() == 200:
content = response.read().decode("utf-8")
data = json.loads(content) if deserialize else content
Expand Down Expand Up @@ -802,10 +818,10 @@ def get_ipv4(port: int, max_retries: int = 5) -> str:
# Online API services for IP location
LOCATION_API_SERVICES = [
{"url": "https://ipinfo.io", "country_key": "country"},
{"url": "https://api.ip2location.io", "country_key": "country_code"},
{"url": "https://ipapi.co/json/", "country_key": "country_code"},
{"url": "https://ipwho.is", "country_key": "country_code"},
{"url": "https://freeipapi.com/api/json", "country_key": "countryCode"},
{"url": "https://api.country.is", "country_key": "country"},
{"url": "https://free.freeipapi.com/api/json", "country_key": "countryCode"},
{"url": "https://api.ip.sb/geoip", "country_key": "country_code"},
]

Expand All @@ -818,47 +834,101 @@ def random_delay(min_delay: float = 0.01, max_delay: float = 0.5):
time.sleep(random.uniform(min_delay, max_delay))


def check_residential(proxy: dict, port: int, api_key: str = "", use_ipinfo: bool = True) -> ProxyQueryResult:
def check_residential(proxy: dict, port: int, api_key: str = "", ip_library: str = "iplark") -> ProxyQueryResult:
"""
Check if a proxy is residential by making a request through it

Args:
proxy: The proxy information dict
port: The port of the proxy
api_key: Optional API key for ipapi.is. Uses free tier if not provided
use_ipinfo: Whether to use ipinfo.io instead of ipapi.is, defaults to True
        ip_library: IP query provider, supported: iplark/ipinfo/ippure/ipapi (default: iplark)

Returns:
ProxyQueryResult: Complete proxy query result
"""

def _get_ipapi_url(key: str = "") -> str:
url, key = "https://api.ipapi.is", utils.trim(key)
if key:
url += f"?key={key}"
return url

def _get_ipinfo_url(port: int, name: str) -> str:
# First, get the IP address
success, content = make_proxy_request(
port=port,
url="https://ipinfo.io/ip",
max_retries=2,
timeout=15,
deserialize=False,
)
if not success or not content:
logger.warning(f"Failed to get IP from ipinfo.io for proxy {name}")
return ""

# Extract IP from response
ip = utils.trim(content)
if not ip:
logger.warning(f"Invalid IP address from ipinfo.io for proxy {name}")
return ""
def _build_url(provider: str, port: int, name: str, api_key: str) -> str:
if provider == "ipinfo":
# First, get the IP address
success, content = make_proxy_request(
port=port,
url="https://ipinfo.io/ip",
max_retries=2,
timeout=15,
deserialize=False,
)
if not success or not content:
logger.warning(f"Failed to get IP from ipinfo.io for proxy {name}")
return ""

# Extract IP from response
ip = utils.trim(content)
if not ip:
logger.warning(f"Invalid IP address from ipinfo.io for proxy {name}")
return ""

# Now get detailed information using the IP
return f"https://ipinfo.io/widget/demo/{ip}"
elif provider == "ipapi":
url, key = "https://api.ipapi.is", utils.trim(api_key)
if key:
url += f"?key={key}"
return url
elif provider == "ippure":
return "https://my.ippure.com/v1/info"

return "https://iplark.com/ipapi/public/ipinfo"

def _get_providers(preferred: str) -> list[str]:
    """Return all known providers ordered with *preferred* first.

    The preferred name is normalized (trimmed, lowercased); anything not in
    the known set falls back to "iplark". The remaining providers keep their
    canonical order so lookups fall back deterministically.
    """
    candidates = ["iplark", "ipinfo", "ippure", "ipapi"]

    choice = utils.trim(preferred).lower()
    if choice not in candidates:
        choice = "iplark"

    return [choice] + [item for item in candidates if item != choice]

def _extract_data(provider: str, response: dict) -> tuple[dict, str, str, str]:
    """Normalize a provider-specific response into a common tuple.

    Args:
        provider: Which API produced *response* ("ipinfo", "ipapi", "ippure",
            or anything else for the default iplark schema).
        response: Deserialized JSON payload from the provider.

    Returns:
        (data, country_code, company_type, asn_type) where country_code is
        upper-cased and the type fields are lower-cased trimmed strings.
    """
    data, country_code, company_type, asn_type = {}, "", "", ""

    if provider == "ipinfo":
        data = response.get("data", {}) if isinstance(response, dict) else {}
        # Providers may return null or a non-dict for sub-objects; guard with
        # `or {}` / isinstance so a malformed payload yields empty fields
        # instead of raising AttributeError.
        if not isinstance(data, dict):
            data = {}
        country_code = data.get("country", "")
        company_type = (data.get("company") or {}).get("type", "")
        asn_type = (data.get("asn") or {}).get("type", "")
    elif provider == "ipapi":
        data = response if isinstance(response, dict) else {}
        country_code = (data.get("location") or {}).get("country_code", "")
        company_type = (data.get("company") or {}).get("type", "")
        asn_type = (data.get("asn") or {}).get("type", "")
    elif provider == "ippure":
        data = response if isinstance(response, dict) else {}
        country_code = data.get("countryCode", "")

        # ippure exposes a boolean instead of company/asn types; map it onto
        # the common isp/hosting vocabulary used by the residential check.
        flag = data.get("isResidential", False)
        if flag:
            company_type, asn_type = "isp", "isp"
        else:
            company_type, asn_type = "hosting", "hosting"
    else:
        data = response if isinstance(response, dict) else {}
        country_code = data.get("country_code", "")

        node_type = utils.trim(data.get("type", "")).lower()
        if node_type == "isp":
            company_type, asn_type = "isp", "isp"
        elif node_type == "business":
            company_type, asn_type = "business", "business"
        else:
            company_type, asn_type = "hosting", "hosting"

    return data, utils.trim(country_code).upper(), utils.trim(company_type).lower(), utils.trim(asn_type).lower()

name = proxy.get("name", "")
result = ProxyInfo(name=name)
Expand All @@ -871,32 +941,36 @@ def _get_ipinfo_url(port: int, name: str) -> str:
random_delay()

try:
url = ""
if use_ipinfo:
url = _get_ipinfo_url(port=port, name=name)
providers = _get_providers(ip_library)
success, response, provider = False, None, ""

for idx, item in enumerate(providers):
url = _build_url(provider=item, port=port, name=name, api_key=api_key)
if not url:
continue

if not url:
url = _get_ipapi_url(key=api_key)
use_ipinfo = False
# Call API for IP information through the proxy
success, response = make_proxy_request(port=port, url=url, max_retries=2, timeout=12)
if success:
provider = item
break

# Call API for IP information through the proxy
success, response = make_proxy_request(port=port, url=url, max_retries=2, timeout=12)
if idx < len(providers) - 1:
fallback = providers[idx + 1]
logger.warning(f"Failed to query {url} for proxy {name}, provider={item}, trying fallback: {fallback}")
else:
logger.warning(f"Failed to query {url} for proxy {name}, provider={item}")

# Parse data from response
if success:
try:
data = response.get("data", {}) if use_ipinfo else response

# Extract country code from data
if use_ipinfo:
country_code = data.get("country", "")
else:
country_code = data.get("location", {}).get("country_code", "")
data, country_code, company_type, asn_type = _extract_data(provider, response)

result.country = ISO_TO_CHINESE.get(country_code, "") if country_code else ""
if country_code:
result.country = ISO_TO_CHINESE.get(country_code, "")

company_type = data.get("company", {}).get("type", "")
asn_type = data.get("asn", {}).get("type", "")
if not result.country:
result.country = utils.trim(data.get("country_zh", "") or data.get("country", ""))

# Check if it's residential (both company and asn type should be "isp")
if company_type == "isp" and asn_type == "isp":
Expand All @@ -905,9 +979,9 @@ def _get_ipinfo_url(port: int, name: str) -> str:
result.ip_type = "business"

except Exception as e:
logger.error(f"Error parsing {url} response for proxy {name}: {str(e)}")
logger.error(f"Error parsing response for proxy {name}: {str(e)}")
else:
logger.warning(f"Failed to query {url} for proxy {name}")
logger.warning(f"Failed to query residential info for proxy {name} with providers: {providers}")

# Determine if query was successful
flag = result.country != "" or result.ip_type != ""
Expand Down
6 changes: 4 additions & 2 deletions subscribe/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ def load_configs(
only_check: bool = False,
num_threads: int = 0,
display: bool = True,
retry: int = 3,
) -> ProcessConfig:
def parse_config(config: dict) -> None:
tasks.extend(config.get("domains", []))
Expand Down Expand Up @@ -273,7 +274,7 @@ def verify(storage: dict, groups: dict) -> bool:
url,
):
headers = {"User-Agent": utils.USER_AGENT, "Referer": url}
content = utils.http_get(url=url, headers=headers)
content = utils.http_get(url=url, headers=headers, retry=max(retry, 1), timeout=120)
if not content:
logger.error(f"cannot fetch config from remote, url: {utils.hide(url=url)}")
else:
Expand Down Expand Up @@ -509,6 +510,7 @@ def aggregate(args: argparse.Namespace) -> None:

clash_bin, subconverter_bin = executable.which_bin()
display = not args.invisible
retry = min(max(1, args.retry), 10)

# parse config
server = utils.trim(args.server) or os.environ.get("SUBSCRIBE_CONF", "").strip()
Expand All @@ -517,11 +519,11 @@ def aggregate(args: argparse.Namespace) -> None:
only_check=args.check,
num_threads=args.num,
display=display,
retry=retry,
)

storages = process_config.storage or {}
pushtool = push.get_instance(config=push.PushConfig.from_dict(storages))
retry = min(max(1, args.retry), 10)

# generate tasks
tasks, groups, sites = assign(
Expand Down
2 changes: 1 addition & 1 deletion subscribe/subconverter.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ def generate_conf(
lines.extend(["emoji=false", "add_emoji=false"])

if ignore_exclude:
lines.append("exclude=流量|过期|剩余|时间|Expire|Traffic")
lines.append("exclude=[到过]期|Expire|Traffic|剩余流量|时间|官网|产品|联系")

lines.append("\n")
content = "\n".join(lines)
Expand Down
2 changes: 1 addition & 1 deletion subscribe/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
CTX.verify_mode = ssl.CERT_NONE

USER_AGENT = (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36"
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/145.0.0.0 Safari/537.36"
)


Expand Down
Loading