-
Notifications
You must be signed in to change notification settings - Fork 20
Expand file tree
/
Copy path_arkose_pow_python.py
More file actions
185 lines (162 loc) · 7.34 KB
/
Copy path_arkose_pow_python.py
File metadata and controls
185 lines (162 loc) · 7.34 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
"""Pure-Python sequence.js 算法实现 (Agent A 逆向后的产物).
来源: _seq_analysis/transforms_python_skel.py + extract_pipeline.py.
调用方:
enc = compute_encoded(seq_js_path, seed) # 返回 base64 字符串, None=不支持(回退Node)
ft = compute_final_transform(seq_js_path, seed, nonces) # 返回 list, None=不支持
primitives = get_pipeline(seq_js_path) # 返回 list or None
未识别的文件 (~49% F2/F3 小变体, Agent E 在扩展) 返回 None → 调用方 fallback Node.
identified 文件 (~51% F1 大变体, 100% byte-exact verified) 直接返回 → PoW solve 24s → <100ms.
"""
import os, sys, json, hashlib
from pathlib import Path
# 让 _seq_analysis 里的 tools 可 import
_SEQ_DIR = Path(__file__).resolve().parent / "_seq_analysis"
if str(_SEQ_DIR) not in sys.path:
sys.path.insert(0, str(_SEQ_DIR))
# ── 全局缓存 ──
# 1. 持久缓存 (precompute_pipelines.py 离线产物): hash → pipeline (or [] = 不支持)
_DISK_CACHE_FILE = Path(__file__).resolve().parent / "_seq_pipeline_cache.json"
_DISK_CACHE = None # lazy load
_DISK_CACHE_MTIME = 0.0 # 文件 mtime, 用于热重载检测
# 2. 进程内 path → pipeline 缓存 (避免每次重算 hash + 查表)
_MEM_CACHE = {}
# 3. 重载检查节流: 每 N 秒最多 stat 一次盘 (避免每号 PoW 都 stat)
_RELOAD_CHECK_INTERVAL = 30.0 # 秒
_LAST_RELOAD_CHECK = 0.0
# 4. 重载日志独立文件 (GUI 抓不到 print)
_RELOAD_LOG = Path(__file__).resolve().parent / "_cache_reload.log"
def _load_disk_cache():
"""载入离线 hash 缓存. ★ 加热重载: 文件 mtime 变了就重新读盘 +
清空 _MEM_CACHE (因为之前残缺判定可能已被修复)."""
global _DISK_CACHE, _DISK_CACHE_MTIME, _LAST_RELOAD_CHECK, _MEM_CACHE
import time as _t
# 首次加载
if _DISK_CACHE is None:
try:
if _DISK_CACHE_FILE.exists():
_DISK_CACHE = json.load(open(_DISK_CACHE_FILE, encoding="utf-8"))
_DISK_CACHE_MTIME = _DISK_CACHE_FILE.stat().st_mtime
else:
_DISK_CACHE = {}
except Exception:
_DISK_CACHE = {}
return _DISK_CACHE
# 节流: 每 _RELOAD_CHECK_INTERVAL 秒检查一次盘上 mtime
now = _t.time()
if now - _LAST_RELOAD_CHECK < _RELOAD_CHECK_INTERVAL:
return _DISK_CACHE
_LAST_RELOAD_CHECK = now
try:
cur_mtime = _DISK_CACHE_FILE.stat().st_mtime
except Exception:
return _DISK_CACHE
if cur_mtime <= _DISK_CACHE_MTIME:
return _DISK_CACHE
# 文件更新了, 热重载 + 清 mem cache
try:
new_cache = json.load(open(_DISK_CACHE_FILE, encoding="utf-8"))
old_size = len(_DISK_CACHE)
new_size = len(new_cache)
_DISK_CACHE = new_cache
_DISK_CACHE_MTIME = cur_mtime
_MEM_CACHE.clear() # ★ 关键: 清掉之前残缺判定 (那些 hash 现在已修复)
try:
with open(_RELOAD_LOG, "a", encoding="utf-8") as fd:
fd.write(f"{_t.strftime('%Y-%m-%d %H:%M:%S')} reload {old_size}→{new_size} 条, mem_cache 清空\n")
except Exception:
pass
except Exception as e:
try:
with open(_RELOAD_LOG, "a", encoding="utf-8") as fd:
fd.write(f"{_t.strftime('%Y-%m-%d %H:%M:%S')} reload FAIL: {e}\n")
except Exception:
pass
return _DISK_CACHE
def _file_hash(path):
h = hashlib.sha256()
with open(path, "rb") as f:
h.update(f.read())
return h.hexdigest()[:16]
# Agent A/E 工具 (extract_pipeline 包内部跑 deobfuscator, 慢且产 stderr 噪音; 仅 fallback 用)
try:
from extract_pipeline import extract_pipeline as _extract
from transforms_python_skel import apply_pipeline as _apply, encoded_of as _enc_of
_AVAILABLE = True
except Exception as _e:
_AVAILABLE = False
_IMPORT_ERR = str(_e)[:200]
def get_pipeline(seq_js_path):
"""返回 primitives list. 不支持时返回 None. 优先查离线缓存, 完全避免 deobfuscator 调用."""
if not _AVAILABLE:
return None
sp = str(seq_js_path)
if sp in _MEM_CACHE:
return _MEM_CACHE[sp]
# 1) 查离线缓存 (按文件 hash)
try:
fh = _file_hash(sp)
except Exception:
_MEM_CACHE[sp] = None
return None
disk = _load_disk_cache()
if fh in disk:
prims = disk[fh]
# cache 里 [] = 已知不支持; null = 异常; list = 真 pipeline.
# ★ 2026-06-06: batch_precompute 轻量正则只识别部分原语 → cache 里有大量 1-2 步的
# "不完整" pipeline (真实是 3-4 步). 用了就算错 encoded → GPU 找不到 nonce → 整套
# 短路成骗局. 阈值: Arkose 真实 pipeline ≥ 3 步, < 3 视为不可信, 回退 Node.
# ★ 2026-06-06: 实测 3 步条目也是错的 (batch_precompute 漏抓):
# cache 存 ['reducer','lev_dist','reducer'] 但真实 ['reducer','caseswap','reducer','lev_dist'].
# 只信 4 步 (Arkose pipeline 真实长度). 3 步及以下走 Node fallback.
_MIN_LEN = int(os.environ.get("POW_PY_MIN_PIPELINE_LEN", "4"))
if isinstance(prims, list) and len(prims) >= _MIN_LEN and not any("UNKNOWN" in p for p in prims):
_MEM_CACHE[sp] = prims
return prims
_MEM_CACHE[sp] = None # 已知不支持/异常/不完整 → 别再 fallback 跑 deobfuscator
return None
# 2) cache miss: 文件没预计算 (新 sequence.js 变种). 选择: 跑 deobfuscator (慢+噪音) 或返回 None.
# 默认返回 None (让 Node fallback, 保稳); POW_PY_LAZY_EXTRACT=1 才在线 extract.
if os.environ.get("POW_PY_LAZY_EXTRACT", "0") == "1":
try:
prims = _extract(sp)
except Exception:
_MEM_CACHE[sp] = None
return None
if prims and not any("UNKNOWN" in p for p in prims):
_MEM_CACHE[sp] = prims
# 顺手写入 disk cache (但不写盘, 等下次 precompute 跑)
disk[fh] = prims
return prims
_MEM_CACHE[sp] = None
return None
def compute_encoded(seq_js_path, seed):
"""Python 实现 sequence.js 的 gpu_mode 输出 (encoded). 不支持时返回 None."""
prims = get_pipeline(seq_js_path)
if prims is None:
return None
try:
transformed = _apply(prims, seed)
return _enc_of(transformed), transformed
except Exception:
return None
def compute_final_transform(seq_js_path, transformed, nonces):
"""Python 实现 finalTransform: 在已 transformed 的 seed 后追加 nonces 字典再跑 pipeline.
transformed: compute_encoded 返回的第二个值 (PASS A 输出).
nonces: list[int], 每 split 一个."""
prims = get_pipeline(seq_js_path)
if prims is None:
return None
try:
# Arkose 协议: finalTransform = transform(transformed + [{i: str(nonce_i)}])
nonces_item = {str(i): str(n) for i, n in enumerate(nonces)}
return _apply(prims, list(transformed) + [nonces_item])
except Exception:
return None
def available():
return _AVAILABLE
def stats():
"""诊断: 返回 cache 命中数."""
return {"cache_size": len(_PIPELINE_CACHE),
"supported": sum(1 for v in _PIPELINE_CACHE.values() if v is not None),
"unsupported": sum(1 for v in _PIPELINE_CACHE.values() if v is None),
"available": _AVAILABLE}