-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathbot.py
More file actions
2288 lines (2072 loc) · 85.6 KB
/
bot.py
File metadata and controls
2288 lines (2072 loc) · 85.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#!/usr/bin/env python3
import asyncio
import logging
import os
import re
import shutil
import subprocess
import tempfile
import time
from pathlib import Path
from typing import Any, Optional
from memory_store import (
ConversationMemoryStore,
SharedMemoryJournal,
build_instant_memory_snapshot,
render_memory_search_digest,
render_recent_memory_digest,
)
from openai import OpenAI
from routing import classify_group_message_semantics, classify_task
from task_registry import TaskRegistry
from telegram import Update
from telegram.constants import ChatAction
from telegram.error import BadRequest
from telegram.ext import Application, CommandHandler, ContextTypes, MessageHandler, TypeHandler, filters
from xhs_adapter import detect_xhs_text_intent, parse_xhs_command_args
# ---------------------------------------------------------------------------
# Runtime configuration. Everything below is read from environment variables
# once, at import time. The int(...) conversions raise ValueError during import
# if the corresponding variable is set to a non-numeric value.
# ---------------------------------------------------------------------------
# Directory containing this file; default home for the log file and SQLite files.
BASE_DIR = Path(__file__).resolve().parent
LOG_FILE = BASE_DIR / "bot.log"
# Workspace root; used to build the path hints attached to delegated tasks.
OPENCLAW_WORKSPACE_DIR = Path(
os.getenv("OPENCLAW_WORKSPACE_DIR", str(Path.home() / ".openclaw" / "workspace"))
).expanduser()
BOT_TOKEN = os.getenv("TELEGRAM_BOT_TOKEN", "").strip()
# validate_env() only accepts "codex" or "openai" here.
BACKEND = os.getenv("BACKEND", "codex").strip().lower()
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "").strip()
OPENAI_MODEL = os.getenv("OPENAI_MODEL", "gpt-4o-mini").strip()
# Settings for the codex backend. CODEX_BIN is resolved by resolve_codex_bin();
# the remaining CODEX_* values are consumed outside this part of the file.
CODEX_BIN = os.getenv("CODEX_BIN", "codex").strip()
CODEX_MODEL = os.getenv("CODEX_MODEL", "").strip()
CODEX_REASONING_EFFORT = os.getenv("CODEX_REASONING_EFFORT", "").strip()
CODEX_WORKDIR = os.getenv("CODEX_WORKDIR", str(Path.home())).strip()
CODEX_SANDBOX_MODE = os.getenv("CODEX_SANDBOX_MODE", "read-only").strip()
CODEX_TIMEOUT_SECS = int(os.getenv("CODEX_TIMEOUT_SECS", "180"))
# GROUP_CODEX_* values default to their CODEX_* counterparts when unset.
GROUP_CODEX_MODEL = os.getenv("GROUP_CODEX_MODEL", CODEX_MODEL or "gpt-5.4-mini").strip()
GROUP_CODEX_WORKDIR = os.getenv("GROUP_CODEX_WORKDIR", CODEX_WORKDIR).strip()
GROUP_CODEX_TIMEOUT_SECS = int(os.getenv("GROUP_CODEX_TIMEOUT_SECS", str(CODEX_TIMEOUT_SECS)))
GROUP_CODEX_REASONING_EFFORT = os.getenv(
"GROUP_CODEX_REASONING_EFFORT",
CODEX_REASONING_EFFORT or "medium",
).strip()
SYSTEM_PROMPT = os.getenv("SYSTEM_PROMPT", "你是一个简洁、可靠、友好的中文助手。").strip()
# Comma-separated Telegram user ids; non-numeric entries are dropped silently.
# An empty set means every non-bot user is allowed (see is_allowed_user).
ALLOWED_USER_IDS = {
int(user_id.strip())
for user_id in os.getenv("ALLOWED_USER_IDS", "").split(",")
if user_id.strip().isdigit()
}
MAX_INPUT_CHARS = int(os.getenv("MAX_INPUT_CHARS", "8000"))
MAX_HISTORY_MESSAGES = int(os.getenv("MAX_HISTORY_MESSAGES", "8"))
# Passed as keep_messages to ConversationMemoryStore below.
PERSISTED_HISTORY_LIMIT = int(os.getenv("PERSISTED_HISTORY_LIMIT", "24"))
INSTANT_MEMORY_OWN_LIMIT = int(os.getenv("INSTANT_MEMORY_OWN_LIMIT", "6"))
INSTANT_MEMORY_SHARED_LIMIT = int(os.getenv("INSTANT_MEMORY_SHARED_LIMIT", "6"))
MEMORY_DB_PATH = os.getenv("MEMORY_DB_PATH", str(BASE_DIR / "bot-memory.sqlite3")).strip()
MEMORY_AGENT_ROLE = os.getenv("MEMORY_AGENT_ROLE", BACKEND or "bot").strip().lower() or "bot"
# Boolean flags: only "1", "true", "yes" and "on" (case-insensitive) count as true.
ALLOW_GROUP_CHAT = os.getenv("ALLOW_GROUP_CHAT", "true").strip().lower() in {"1", "true", "yes", "on"}
GROUP_TASK_CLAIM_ENABLED = os.getenv("GROUP_TASK_CLAIM_ENABLED", "false").strip().lower() in {
"1",
"true",
"yes",
"on",
}
GROUP_TASK_DB_PATH = os.getenv("GROUP_TASK_DB_PATH", str(BASE_DIR / "group-tasks.sqlite3")).strip()
# validate_env() requires codex / claude / gemini when GROUP_TASK_CLAIM_ENABLED is on.
GROUP_WORKER_ROLE = os.getenv("GROUP_WORKER_ROLE", "codex").strip().lower()
GROUP_POLL_INTERVAL_SECS = int(os.getenv("GROUP_POLL_INTERVAL_SECS", "5"))
GROUP_PROGRESS_DELAY_SECS = int(os.getenv("GROUP_PROGRESS_DELAY_SECS", "45"))
GROUP_TASK_STALE_SECS = int(os.getenv("GROUP_TASK_STALE_SECS", "120"))
ENABLE_SHARED_MEMORY_LOG = os.getenv("ENABLE_SHARED_MEMORY_LOG", "true").strip().lower() in {
"1",
"true",
"yes",
"on",
}
SHARED_MEMORY_DIR = os.getenv(
"SHARED_MEMORY_DIR",
str(BASE_DIR / "shared-memory"),
).strip()
# Passed as timezone_name to SharedMemoryJournal below.
MEMORY_TIMEZONE = os.getenv("MEMORY_TIMEZONE", "Asia/Shanghai").strip()
DANGEROUS_ACTION_CONFIRMATION = os.getenv("DANGEROUS_ACTION_CONFIRMATION", "true").strip().lower() in {
"1",
"true",
"yes",
"on",
}
# validate_env() only accepts "destructive-or-batch" or "strict".
DANGEROUS_ACTION_POLICY = os.getenv("DANGEROUS_ACTION_POLICY", "destructive-or-batch").strip().lower()
PENDING_CONFIRM_TTL_SECS = int(os.getenv("PENDING_CONFIRM_TTL_SECS", "600"))
# NOTE(review): presumably a marker meaning "send nothing"; its consumer is outside this view.
NO_REPLY_SENTINEL = "[[NO_REPLY]]"
# Username interpolated into the /help text to redirect Xiaohongshu requests.
STANDALONE_XHS_BOT_USERNAME = "ukbossxiaohongshubot"
# Substrings that mark a group message as a role assignment for this bot
# (checked by is_group_role_assignment against the lower-cased text).
ROLE_ASSIGNMENT_KEYWORDS = [
"职责",
"角色定位",
"定位",
"你负责",
"你的主要职责",
"你的职责",
"以后你负责",
"只负责",
"专门负责",
]
# Regexes for "how is it going?" style questions (used by is_private_progress_query).
PRIVATE_PROGRESS_QUERY_PATTERNS = [
r"在制作了吗",
r"在做了吗",
r"还在做吗",
r"还在制作吗",
r"做完了吗",
r"完成了吗",
r"进度(如何|怎么样|怎样)?",
r"做到哪了",
r"好了没",
r"有进展吗",
]
# Substrings that must appear before mentions in a reply are treated as
# hand-off targets (see extract_handoff_targets).
HANDOFF_KEYWORDS = [
"帮",
"协助",
"审核",
"检查",
"排查",
"处理",
"接手",
"优化",
"修复",
"review",
"看一下",
"发给",
"交给",
"让",
"请",
]
# NOTE(review): not referenced in this part of the file; presumably keywords that
# route a group message into the codex task queue. Confirm against the consumer.
CODEX_GROUP_QUEUE_KEYWORDS = [
"安装",
"install",
"开发",
"编写",
"创建",
"新增",
"接入",
"配置",
"部署",
"修复",
"修改",
"重构",
"实现",
"升级",
"排查",
"调试",
"debug",
"运行",
"执行",
"搭建",
]
# Substrings that signal "the first mentioned bot should coordinate the others"
# (checked by resolve_primary_group_bot_mention).
PRIMARY_GROUP_ROUTING_KEYWORDS = [
"发给",
"交给",
"转给",
"转交给",
"转发给",
"让他",
"让她",
"让其",
"协助",
"配合",
"接手",
"复核",
"复查",
"审核",
"检查",
"审一下",
"帮他",
"帮你",
"帮忙",
]
# Task category used by create_delegation_return_task.
DELEGATION_RETURN_CATEGORY = "delegation-return"
# (regex, human-readable label) pairs describing destructive requests.
# NOTE(review): the code that evaluates these patterns is outside this view.
DESTRUCTIVE_PATTERNS = [
(r"\brm\b", "shell 删除命令"),
(r"\bdel\b", "删除命令"),
(r"\bunlink\b", "删除命令"),
(r"\bgit\s+reset\b", "git 重置"),
(r"\bgit\s+clean\b", "git 清理"),
(r"\breboot\b", "系统重启"),
(r"\bshutdown\b", "系统关机"),
(r"\bmkfs\b", "磁盘格式化"),
(r"\bdrop\s+table\b", "数据库删除"),
(r"删除", "删除操作"),
(r"移除", "删除操作"),
(r"清空", "清空内容"),
(r"重置", "重置操作"),
(r"格式化", "格式化操作"),
]
def is_private_progress_query(text: str) -> bool:
    """Tell whether *text* looks like a "how is it going?" progress question.

    The text is stripped and lower-cased, then searched with every regex in
    ``PRIVATE_PROGRESS_QUERY_PATTERNS``. ``None`` or blank input yields False.
    """
    candidate = (text or "").strip().lower()
    if candidate:
        for pattern in PRIVATE_PROGRESS_QUERY_PATTERNS:
            if re.search(pattern, candidate):
                return True
    return False
# (regex, label) pairs. NOTE(review): judging by the name these presumably apply
# only when DANGEROUS_ACTION_POLICY is "strict"; the consumer is outside this view.
STRICT_ONLY_PATTERNS = [
(r"\bchmod\b", "权限变更"),
(r"\bchown\b", "所有者变更"),
(r"\bkill\b", "进程终止"),
(r"\bpkill\b", "进程终止"),
(r"\blaunchctl\s+unload\b", "服务卸载"),
(r"卸载", "卸载操作"),
(r"杀掉", "进程终止"),
]
# (regex, label) pairs describing batch / directory-wide / wildcard operations.
# NOTE(review): the code that evaluates these patterns is outside this view.
BATCH_PATTERNS = [
(r"批量", "批量操作"),
(r"所有文件", "批量操作"),
(r"全部文件", "批量操作"),
(r"整个目录", "目录级操作"),
(r"整个文件夹", "目录级操作"),
(r"整个仓库", "仓库级操作"),
(r"递归", "递归操作"),
(r"\bfind\b.*-exec\b", "批量命令"),
(r"\bxargs\b", "批量命令"),
(r"\bfor\b.+\bin\b", "批量命令"),
(r"\bwhile\b", "批量命令"),
(r"\*\.", "通配符批量操作"),
(r"/\*", "通配符批量操作"),
]
# Module-level singletons, constructed at import time from the settings above.
MEMORY_STORE = ConversationMemoryStore(MEMORY_DB_PATH, keep_messages=PERSISTED_HISTORY_LIMIT)
SHARED_MEMORY_JOURNAL = SharedMemoryJournal(SHARED_MEMORY_DIR, timezone_name=MEMORY_TIMEZONE)
def configure_logging() -> None:
    """Configure root logging to ``LOG_FILE`` and the console, and mute HTTP client loggers.

    The file handler is opened as UTF-8 explicitly: the bot logs Chinese text, and
    the platform default encoding is not guaranteed to be able to represent it.
    The ``httpx`` and ``httpcore`` loggers are stripped of handlers, detached from
    the root logger and disabled.
    """
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s %(levelname)s %(name)s %(message)s",
        handlers=[logging.FileHandler(LOG_FILE, encoding="utf-8"), logging.StreamHandler()],
    )
    for noisy_name in ("httpx", "httpcore"):
        noisy_logger = logging.getLogger(noisy_name)
        noisy_logger.handlers = []
        noisy_logger.propagate = False
        noisy_logger.disabled = True
def validate_env() -> None:
    """Check the module-level configuration and fail fast on bad values.

    Raises:
        RuntimeError: if a required variable is missing (all missing names are
            reported together), or if BACKEND, DANGEROUS_ACTION_POLICY or
            GROUP_WORKER_ROLE holds an unsupported value.
    """
    # Each entry is (variable name, "is it missing?"). The codex binary is only
    # resolved when the codex backend is selected.
    requirements = (
        ("TELEGRAM_BOT_TOKEN", not BOT_TOKEN),
        ("OPENAI_API_KEY", BACKEND == "openai" and not OPENAI_API_KEY),
        ("CODEX_BIN", BACKEND == "codex" and not resolve_codex_bin()),
    )
    missing = [name for name, is_missing in requirements if is_missing]
    if missing:
        raise RuntimeError(f"Missing required env vars: {', '.join(missing)}")

    if BACKEND not in {"codex", "openai"}:
        raise RuntimeError("BACKEND must be either 'codex' or 'openai'")

    if DANGEROUS_ACTION_POLICY not in {"destructive-or-batch", "strict"}:
        raise RuntimeError("DANGEROUS_ACTION_POLICY must be 'destructive-or-batch' or 'strict'")

    worker_role_ok = GROUP_WORKER_ROLE in {"codex", "claude", "gemini"}
    if GROUP_TASK_CLAIM_ENABLED and not worker_role_ok:
        raise RuntimeError("GROUP_WORKER_ROLE must be one of: codex, claude, gemini")
def resolve_codex_bin() -> str:
    """Return a usable path for ``CODEX_BIN``, or ``""`` when none is found.

    An absolute ``CODEX_BIN`` is returned as-is if it exists on disk; any other
    non-empty value is looked up on ``PATH`` with ``shutil.which``.
    """
    configured = CODEX_BIN
    if not configured:
        return ""
    if not os.path.isabs(configured):
        return shutil.which(configured) or ""
    if os.path.exists(configured):
        return configured
    return ""
def is_allowed_user(update: Update) -> bool:
    """Decide whether the sender of *update* may use the bot.

    * A non-bot user is allowed when ``ALLOWED_USER_IDS`` is empty or contains
      the user's id.
    * Otherwise, a group-like chat message whose ``sender_chat`` is the chat
      itself is allowed (logged as an anonymous group sender).
    * Everything else is rejected; rejections in group-like chats are logged.
    """
    user = update.effective_user
    if user and not getattr(user, "is_bot", False):
        return not ALLOWED_USER_IDS or user.id in ALLOWED_USER_IDS

    chat = update.effective_chat
    sender_chat = getattr(update.effective_message, "sender_chat", None)
    in_group = bool(chat and is_group_like_chat(chat.type))

    if in_group and sender_chat and sender_chat.id == chat.id:
        logging.info(
            "Allowing anonymous group sender chat_id=%s sender_chat_id=%s",
            getattr(chat, "id", None),
            getattr(sender_chat, "id", None),
        )
        return True

    if in_group:
        logging.info(
            "Rejected group sender chat_id=%s user_id=%s sender_chat_id=%s",
            getattr(chat, "id", None),
            getattr(user, "id", None) if user else None,
            getattr(sender_chat, "id", None) if sender_chat else None,
        )
    return False
def summarize_text(text: str, limit: int = 80) -> str:
    """Flatten *text* to one line and cut it to *limit* characters.

    Newlines become spaces and surrounding whitespace is stripped. When the
    flattened text is longer than *limit*, the first *limit* characters are
    returned followed by ``"..."``.
    """
    flat = text.replace("\n", " ").strip()
    return flat if len(flat) <= limit else f"{flat[:limit]}..."
def is_group_like_chat(chat_type: Optional[str]) -> bool:
    """Return True for the chat types "group", "supergroup" and "channel"."""
    group_like_types = {"group", "supergroup", "channel"}
    return chat_type in group_like_types
def extract_message_mentions(message: Any) -> set[str]:
    """Collect every @username mentioned in *message*, lower-cased and without "@".

    Two sources are merged: entities of type "mention" (from both ``entities``
    and ``caption_entities``, resolved through ``message.parse_entity``), and a
    regex scan of the text or caption for ``@name`` tokens of five or more
    characters. An entity that fails to parse is skipped. A falsy *message*
    yields an empty set.
    """
    found: set[str] = set()
    if not message:
        return found

    all_entities = [
        *(getattr(message, "entities", None) or []),
        *(getattr(message, "caption_entities", None) or []),
    ]
    for entity in all_entities:
        if str(getattr(entity, "type", "")).lower() != "mention":
            continue
        try:
            parsed = message.parse_entity(entity)
        except Exception:
            # Best effort: the regex scan below still sees the raw text.
            continue
        handle = parsed.strip().lower().lstrip("@")
        if handle:
            found.add(handle)

    body = (getattr(message, "text", None) or getattr(message, "caption", None) or "").strip()
    found.update(name.lower() for name in re.findall(r"@([A-Za-z0-9_]{5,})", body))
    return found
def extract_ordered_message_mentions(message: Any) -> list[str]:
    """Return the @usernames in the text or caption, in order of first appearance.

    Names are lower-cased, de-duplicated, and must be at least five characters
    of ``[A-Za-z0-9_]``. The leading "@" is not included.
    """
    body = (getattr(message, "text", None) or getattr(message, "caption", None) or "").strip()
    handles = (name.lower() for name in re.findall(r"@([A-Za-z0-9_]{5,})", body))
    # dict keys keep insertion order, which gives an ordered de-duplication.
    return list(dict.fromkeys(handles))
def resolve_primary_group_bot_mention(message: Any) -> str:
    """Return the first-mentioned bot when the message asks it to route work to another bot.

    A primary bot exists only when at least two mentions ending in "bot" are
    present and the lower-cased text shows a routing hint: either a keyword from
    ``PRIMARY_GROUP_ROUTING_KEYWORDS`` or the "@xbot ... <verb> @ybot" pattern.
    Returns ``""`` otherwise.
    """
    bot_handles = [
        handle
        for handle in extract_ordered_message_mentions(message)
        if handle.endswith("bot")
    ]
    if len(bot_handles) < 2:
        return ""

    lowered = (getattr(message, "text", None) or getattr(message, "caption", None) or "").strip().lower()
    routed = any(keyword in lowered for keyword in PRIMARY_GROUP_ROUTING_KEYWORDS)
    if not routed:
        routed = bool(
            re.search(
                r"@[a-z0-9_]{5,}bot.*(?:让|交给|发给|转给|协助|配合|检查|审核|解决)\s+@[a-z0-9_]{5,}bot",
                lowered,
            )
        )
    return bot_handles[0] if routed else ""
def resolve_user_delegation_targets(message: Any, self_username: str) -> list[str]:
    """List the other bots this bot should delegate to, in mention order.

    Returns an empty list unless *self_username* is the primary bot mention of
    the message. Otherwise returns every distinct mention ending in "bot" other
    than *self_username*.
    """
    primary = resolve_primary_group_bot_mention(message)
    if not primary or primary != self_username:
        return []

    handles = (mention.lower() for mention in extract_ordered_message_mentions(message))
    others = (
        handle
        for handle in handles
        if handle.endswith("bot") and handle != self_username
    )
    return list(dict.fromkeys(others))
def build_group_delegation_ack_text(target_mentions: list[str]) -> str:
    """Build the acknowledgement sent to the user when work is delegated.

    Mentions are rendered as "@name" joined by "、". A single target gets the
    more specific wording; any other count gets the generic wording.
    """
    rendered = "、".join(f"@{mention}" for mention in target_mentions)
    if len(target_mentions) != 1:
        return f"收到,这条我先接住。我会先协调 {rendered} 协助处理,等结果回来后我再统一向你汇报。"
    single_target_parts = (
        "收到,这条我先接住。",
        f"我会先把相关脚本和数据链路整理后交给 {rendered} 协助处理,",
        "等他回我结果后我再统一向你汇报。",
    )
    return "".join(single_target_parts)
def build_history_transcript(history: list[dict[str, str]], limit: int = 8) -> str:
    """Render the last *limit* history items as a plain-text transcript.

    Each item becomes ``"[role]\\n<content>"``, where role is "assistant" or
    "user" (anything that is not "assistant" counts as "user") and the content
    is shortened to 240 characters. Items are separated by a blank line.

    A non-positive *limit* yields an empty transcript. Slicing with
    ``history[-0:]`` would otherwise return the entire history, and a negative
    limit would drop the oldest items instead of bounding the tail.
    """
    if limit <= 0:
        return ""
    rendered: list[str] = []
    for item in history[-limit:]:
        role = "assistant" if item["role"] == "assistant" else "user"
        rendered.append(f"[{role}]\n{summarize_text(item['content'], limit=240)}")
    return "\n\n".join(rendered)
def parse_route_pair(route_reason: str, prefix: str) -> tuple[str, str]:
    """Split a ``"<prefix><left>-><right>"`` route reason into ``(left, right)``.

    Both sides are stripped and lower-cased. Returns ``("", "")`` when
    *route_reason* does not start with *prefix* or has no "->" after it.
    """
    if not route_reason.startswith(prefix):
        return "", ""
    source, arrow, target = route_reason[len(prefix):].partition("->")
    if not arrow:
        return "", ""
    return source.strip().lower(), target.strip().lower()
def extract_original_user_text(payload: str) -> str:
    """Pull the original user message out of a task payload.

    The first line starting with the "原始用户消息:" marker wins; the text after
    the first colon is returned stripped. Without such a line, a 180-character
    summary of the whole payload is returned instead.
    """
    marker_line = next(
        (line for line in payload.splitlines() if line.startswith("原始用户消息:")),
        None,
    )
    if marker_line is None:
        return summarize_text(payload, limit=180)
    return marker_line.split(":", 1)[1].strip()
def build_delegation_return_payload(
    requester_role: str,
    worker_role: str,
    original_user_text: str,
    worker_result: str,
) -> str:
    """Compose the prompt asking *requester_role* to report a worker's result to the user.

    The prompt carries the original request, the worker's result shortened to
    1200 characters, and fixed instructions about language, the required
    "路径状态" section and the output format.
    """
    condensed_result = summarize_text(worker_result, limit=1200)
    sections = [
        f"你现在作为 Telegram 群里的 {requester_role},需要向用户做最终汇报。\n",
        f"用户最初的要求:{original_user_text}\n",
        f"{worker_role} 的处理结果:{condensed_result}\n\n",
        "请直接用中文面向用户回复,说明对方已经协助完成了什么、当前最新情况是什么、还有没有剩余问题。",
        "默认使用简体中文,不要夹带英文结论句;专有名词、路径、仓库名、代币代码可保留原文。",
        "请保留“路径状态”小节,逐条列出每个已检查路径及其状态。",
        "如果当前没有拿到明确路径,也要明确写“暂未返回明确路径”。",
        "不要解释内部任务系统。",
        "不要写思考过程,不要写 I will、我将、接下来、先去、正在、准备。",
        "如果协作结果是超时或失败,就直接如实说明当前还没修好,并给出下一步最小行动。",
        # The "\\n" sequences are literal backslash-n text inside the format hint.
        "建议格式:结论:...\\n路径状态:\\n- <路径> | <状态> | <说明>\\n下一步:...",
        "只输出最终要发到群里的成品回复。",
    ]
    return "".join(sections)
def resolve_role_display_name(role: str) -> str:
    """Map an internal role id to the name shown to users.

    Known roles (case-insensitive) get their display name. An unknown role is
    returned unchanged. An empty or ``None`` role falls back to "协作 bot".
    The ``None`` case used to raise ``AttributeError`` on ``role.lower()``
    before the fallback could apply.
    """
    display_names = {
        "openclaw": "OpenClaw",
        "codex": "Codex",
        "claude": "Claude Code",
        "gemini": "Gemini",
    }
    key = (role or "").lower()
    return display_names.get(key, role or "协作 bot")
def extract_worker_result(payload: str, worker_role: str) -> str:
    """Return the worker's result text embedded in a delegation-return payload.

    Looks for the first line starting with ``"<worker_role> 的处理结果:"`` and
    returns the remainder of that line, stripped. Returns ``""`` when no such
    line exists.

    The remainder is taken by slicing off the matched prefix. Splitting on the
    first colon would cut in the wrong place whenever *worker_role* itself
    contains a colon.
    """
    prefix = f"{worker_role} 的处理结果:"
    for line in payload.splitlines():
        if line.startswith(prefix):
            return line[len(prefix):].strip()
    return ""
def extract_path_status_lines(text: str) -> list[str]:
    """Pick the lines of *text* that look like they mention a file path.

    Each line is stripped, then any leading bullet characters (``-``, ``*``,
    ``•``, space) are removed. A line qualifies when it contains "/" or a token
    ending in a known source/config extension. Duplicates are dropped. At most
    eight lines are returned, each as ``"- <line>"`` shortened to 220 characters.
    """
    file_token = re.compile(r"\b[\w./-]+\.(py|sh|ts|js|jsx|tsx|json|md|yaml|yml|toml)\b")
    # dict keys give an ordered de-duplication of qualifying lines.
    unique_entries: dict[str, None] = {}
    for raw_line in text.splitlines():
        entry = raw_line.strip().lstrip("-*• ").strip()
        if not entry:
            continue
        if "/" in entry or file_token.search(entry):
            unique_entries.setdefault(entry, None)
    return [f"- {summarize_text(entry, limit=220)}" for entry in list(unique_entries)[:8]]
def looks_like_planning_text(text: str) -> bool:
    """Return True when *text* is blank or opens with a "here is what I will do" phrase.

    Only the first line of the stripped text is inspected. English openers are
    matched case-insensitively; Chinese openers are matched as written.
    """
    english_openers = (
        "i will",
        "i'll",
        "let me",
        "i am going to",
        "i need to",
        "i should",
        "i can",
    )
    chinese_openers = (
        "我将",
        "接下来",
        "我会先",
        "我先",
        "先去",
        "正在",
        "准备",
        "先看",
        "先检查",
        "先确认",
    )
    body = text.strip()
    if not body:
        return True
    opener = body.splitlines()[0].strip()
    if opener.lower().startswith(english_openers):
        return True
    return opener.startswith(chinese_openers)
def build_delegation_return_fallback(worker_role: str, worker_result: str) -> str:
    """Build a canned user-facing report from a worker result.

    The report names the worker, quotes the result (shortened to 1200
    characters) and lists the path-like lines found in it. When the result
    contains a failure marker, the "not finished yet" wording with a next step
    is used. The output is capped at 4000 characters.
    """
    worker_display = resolve_role_display_name(worker_role)
    condensed = summarize_text(worker_result, limit=1200)
    path_lines = extract_path_status_lines(worker_result)
    path_block = "\n".join(path_lines) if path_lines else "- 暂未返回明确路径"

    failure_markers = ("超时", "失败", "报错", "异常", "未完成", "还没", "没有", "未能")
    failed = any(marker in worker_result for marker in failure_markers)
    if failed:
        report = (
            f"我先同步一下最新情况:{worker_display} 这次已经介入协助,但当前还没有完全收口。"
            f"目前结果是:{condensed}\n\n"
            f"路径状态:\n{path_block}\n\n"
            "下一步:这项检查现在还没完全修好;我会继续让他先核对数据源和脚本入口,再尽快给你准确结论。"
        )
    else:
        report = (
            f"我先同步一下最新情况:{worker_display} 已经协助完成这轮检查。"
            f"当前结论是:{condensed}\n\n"
            f"路径状态:\n{path_block}"
        )
    return report[:4000]
def should_bypass_openclaw_return_task(source_role: str, target_role: str) -> bool:
    """Return True when the route is from "openclaw" to this process's GROUP_WORKER_ROLE."""
    if source_role != "openclaw":
        return False
    return target_role == GROUP_WORKER_ROLE
def normalize_delegation_return_output(result_text: str, payload: str, route_reason: str) -> str:
    """Make sure a delegation-return reply is a finished report, not a plan.

    If the stripped *result_text* does not look like planning text, it is
    returned capped at 4000 characters. Otherwise the worker result is
    recovered from *payload* (falling back to the text itself) and a canned
    report is built from it.
    """
    cleaned = result_text.strip()
    if not looks_like_planning_text(cleaned):
        return cleaned[:4000]

    # The route reason has the form "delegation-return:<worker>-><requester>".
    worker_role, _requester_role = parse_route_pair(route_reason, "delegation-return:")
    recovered_result = extract_worker_result(payload, worker_role) or cleaned
    logging.info(
        "Normalizing delegation-return output role=%s route=%s because runner returned planning text",
        GROUP_WORKER_ROLE,
        route_reason,
    )
    return build_delegation_return_fallback(worker_role, recovered_result)
def create_delegation_return_task(
    registry: TaskRegistry,
    *,
    requester_role: str,
    worker_role: str,
    chat_id: str,
    source_user_id: str,
    original_user_text: str,
    worker_result: str,
) -> int:
    """Queue a task asking *requester_role* to report *worker_role*'s result to the user.

    The task has category ``DELEGATION_RETURN_CATEGORY``, an empty source
    message id, route reason ``"delegation-return:<worker>-><requester>"`` and
    is restricted to the requester. Returns whatever ``registry.create_task``
    returns.
    """
    report_prompt = build_delegation_return_payload(
        requester_role,
        worker_role,
        original_user_text,
        worker_result,
    )
    route = f"delegation-return:{worker_role}->{requester_role}"
    return registry.create_task(
        source_chat_id=chat_id,
        source_message_id="",
        source_user_id=source_user_id,
        source_text=report_prompt,
        category=DELEGATION_RETURN_CATEGORY,
        route_reason=route,
        allowed_agents=[requester_role],
    )
def parse_openclaw_breakdown_steps(payload: str) -> list[dict[str, str]]:
    """Parse the numbered step list that follows the "OpenClaw 拆分结果" marker.

    Each step line has the form ``"<n>. <owner>:<title> - <goal>"``. Lines that
    do not match, or whose owner is not a known agent name, are skipped. Both
    the ASCII colon and the full-width colon (U+FF1A) are accepted after the
    marker and after the owner. The previous character class listed the same
    colon twice, so one of the two variants was silently dropped.

    Returns:
        A list of ``{"owner", "title", "goal"}`` dicts in document order, with
        ``owner`` normalized to openclaw / codex / gemini / claude. Empty when
        the marker is absent.
    """
    marker_match = re.search(r"OpenClaw 拆分结果[:\uff1a]", payload)
    if marker_match is None:
        return []

    # Hoisted out of the loop: display name (lower-cased) -> internal role id.
    owner_aliases = {
        "openclaw": "openclaw",
        "codex": "codex",
        "gemini": "gemini",
        "claude code": "claude",
        "claude": "claude",
    }
    step_pattern = re.compile(r"^\d+\.\s+([^:\uff1a]+)[:\uff1a]\s*([^-]+?)\s*-\s*(.+)$")

    steps: list[dict[str, str]] = []
    for raw_line in payload[marker_match.end():].splitlines():
        match = step_pattern.match(raw_line.strip())
        if not match:
            continue
        owner_role = owner_aliases.get(match.group(1).strip().lower())
        if not owner_role:
            continue
        steps.append(
            {
                "owner": owner_role,
                "title": match.group(2).strip(),
                "goal": match.group(3).strip(),
            }
        )
    return steps
def resolve_openclaw_followup_roles_from_payload(payload: str, current_role: str) -> list[str]:
    """List the distinct roles that own steps after *current_role*'s first step.

    Returns an empty list when the payload has no parsable breakdown or when
    *current_role* owns no step.
    """
    steps = parse_openclaw_breakdown_steps(payload)
    owners = [step.get("owner") for step in steps]
    if current_role not in owners:
        return []

    remaining = steps[owners.index(current_role) + 1 :]
    later_roles = (str(step.get("owner", "")) for step in remaining)
    return list(dict.fromkeys(role for role in later_roles if role))
def build_openclaw_step_status_text(payload: str, current_role: str, current_state: str) -> str:
    """Render a "步骤状态" block with one status line per breakdown step.

    Steps before *current_role*'s first step are marked "已完成", that step gets
    *current_state*, and the rest are "待开始". If *current_role* owns no step,
    every step is "待开始". Returns ``""`` when the payload has no parsable
    breakdown.
    """
    steps = parse_openclaw_breakdown_steps(payload)
    if not steps:
        return ""

    current_index = next(
        (position for position, step in enumerate(steps) if step.get("owner") == current_role),
        -1,
    )

    def status_for(position: int) -> str:
        # With current_index == -1 neither comparison can succeed.
        if position < current_index:
            return "已完成"
        if position == current_index:
            return current_state
        return "待开始"

    lines = ["步骤状态:"]
    for position, step in enumerate(steps):
        display = resolve_role_display_name(str(step.get("owner", "")))
        title = str(step.get("title", "")).strip() or "任务执行"
        lines.append(f"- {display} | {title} | {status_for(position)}")
    return "\n".join(lines)
def build_openclaw_followup_payload(
    payload: str,
    *,
    current_role: str,
    next_role: str,
    original_user_text: str,
    worker_result: str,
) -> str:
    """Compose the task text handing the next breakdown step to *next_role*.

    The text names the previous executor, the next role, that role's step title
    and goal (with defaults when the breakdown has no step for it), and the
    previous result shortened to 1600 characters. The parsed breakdown is
    appended again under the "OpenClaw 拆分结果" marker when it has any steps.
    """
    steps = parse_openclaw_breakdown_steps(payload)
    # When one owner has several steps, the last one wins in this lookup.
    steps_by_owner = {str(step.get("owner", "")): step for step in steps}
    upcoming = steps_by_owner.get(next_role, {})
    next_title = str(upcoming.get("title", "")).strip() or "结果整理与汇报"
    next_goal = str(upcoming.get("goal", "")).strip() or "基于前序结果继续完成当前步骤。"

    plan_lines = [
        f"{number}. {resolve_role_display_name(str(step.get('owner', '')))}:{str(step.get('title', '')).strip()} - {str(step.get('goal', '')).strip()}"
        for number, step in enumerate(steps, start=1)
    ]
    breakdown_text = "\n".join(["拆分计划:", *plan_lines]) if plan_lines else ""

    parts = [
        "来自 Telegram 群聊中 openclaw 的拆分后续步骤。\n",
        f"原始用户消息:{original_user_text}\n",
        f"上一位执行者:{resolve_role_display_name(current_role)}\n",
        f"当前需要你接手的角色:{resolve_role_display_name(next_role)}\n",
        f"当前步骤:{next_title}\n",
        f"目标:{next_goal}\n",
        f"上一阶段结果:{summarize_text(worker_result, limit=1600)}\n",
    ]
    if breakdown_text:
        parts.append(f"\nOpenClaw 拆分结果:\n{breakdown_text}\n")
    parts.append(
        "\n请只完成你当前负责的这一步,并直接给出适合群聊发送的最终中文结果。"
        "不要解释内部任务系统,不要暴露思考过程。"
    )
    return "".join(parts)
def create_openclaw_followup_tasks_from_payload(
    registry: TaskRegistry,
    *,
    payload: str,
    current_role: str,
    chat_id: str,
    source_user_id: str,
    original_user_text: str,
    worker_result: str,
) -> list[tuple[str, int]]:
    """Queue a hand-off task for the next role in the breakdown, if there is one.

    Only the first follow-up role is scheduled. Returns ``[(role, task_id)]``
    for the created task, or an empty list when no role follows *current_role*.
    """
    upcoming_roles = resolve_openclaw_followup_roles_from_payload(payload, current_role)
    if not upcoming_roles:
        return []

    next_role = upcoming_roles[0]
    followup_text = build_openclaw_followup_payload(
        payload,
        current_role=current_role,
        next_role=next_role,
        original_user_text=original_user_text,
        worker_result=worker_result,
    )
    task_id = registry.create_task(
        source_chat_id=chat_id,
        source_message_id="",
        source_user_id=source_user_id,
        source_text=followup_text,
        category="bot-handoff",
        route_reason=f"user-delegation:openclaw->{next_role}",
        allowed_agents=[next_role],
    )
    return [(next_role, task_id)]
def create_user_delegation_tasks(
    registry: TaskRegistry,
    *,
    source_role: str,
    chat_id: str,
    source_message_id: str,
    source_user_id: str,
    user_text: str,
    history: list[dict[str, str]],
    target_mentions: list[str],
) -> list[tuple[str, int]]:
    """Queue one "bot-handoff" task per mentioned bot that maps to another agent role.

    Each task payload contains the user's message, fixed investigation
    instructions, up to four workspace path hints chosen by keywords in the
    message, and the last two distinct non-assistant notes from the most
    recent six history items. Mentions that do not resolve to a role, or that
    resolve to *source_role*, are skipped.

    Returns:
        ``(target_role, task_id)`` pairs in mention order.
    """
    # Recent user-side notes, de-duplicated in order; only the last two are quoted.
    recent_notes = dict.fromkeys(
        summarize_text(item["content"], limit=140)
        for item in history[-6:]
        if item.get("role") != "assistant"
    )
    context_note = "\n".join(f"- {note}" for note in list(recent_notes)[-2:])

    # Keyword-driven shortcuts to files worth checking first.
    hint_paths: list[str] = []
    if any(keyword in user_text for keyword in ("最新市场数据", "市场数据", "实时价格")):
        dashboard_dir = OPENCLAW_WORKSPACE_DIR / "trading-dashboard"
        hint_paths += [
            str(dashboard_dir / "server.js"),
            str(dashboard_dir / "trades.json"),
            str(OPENCLAW_WORKSPACE_DIR / "btc-scalp-sim" / "market.py"),
        ]
    lowered_text = user_text.lower()
    if any(keyword in lowered_text for keyword in ("daily-crypto", "github report", "异动")):
        hint_paths.append(str(OPENCLAW_WORKSPACE_DIR / "scripts" / "daily-crypto-github-report.sh"))
    unique_hints = list(dict.fromkeys(hint_paths))

    # Everything after the per-target header is identical for all targets.
    shared_tail = ""
    if unique_hints:
        shared_tail += "优先检查这些高相关路径;如果前 2-3 个路径已经足以确认问题,就不要继续扩展搜索:\n"
        shared_tail += "\n".join(f"- {path}" for path in unique_hints[:4]) + "\n"
    if context_note:
        shared_tail += f"\n最近用户补充:\n{context_note}\n"
    shared_tail += (
        "\n最终过程反馈和结果汇报一律使用简体中文;专有名词、路径、仓库名、代币代码可保留原文。"
        "\n请从对应职责出发直接接手。"
        "如果在当前轮还不能完全收口,请优先返回:已检查的路径、确认到的问题点、以及下一步最小修复。"
        "不要做无边界探索,也不要长篇复述群聊。"
    )

    created: list[tuple[str, int]] = []
    for target_mention in target_mentions:
        target_role = resolve_agent_role_from_mention(target_mention)
        if not target_role or target_role == source_role:
            continue
        header = (
            f"来自 Telegram 群聊中 {source_role} 的用户指定协作请求。\n"
            f"原始用户消息:{user_text}\n"
            f"主责 bot:{source_role}\n"
            f"协作目标:{target_role}\n"
            "目标:检查相关代码或脚本,确认是否真的使用了最新市场数据,并给出最小修复建议。\n"
            "请优先自己在工作目录内定位相关脚本/仓库,不要等待别人继续补充路径。\n"
            "请优先用 rg / rg --files 快速定位文件,不要做长时间全盘发散搜索。\n"
            "请按固定格式返回:结论、路径状态、下一步。\n"
            "路径状态里至少逐条列出本轮检查过的每个路径,例如:- /path/file.py | 已检查/未找到/待复核 | 说明。\n"
        )
        task_id = registry.create_task(
            source_chat_id=chat_id,
            source_message_id=source_message_id,
            source_user_id=source_user_id,
            source_text=header + shared_tail,
            category="bot-handoff",
            route_reason=f"user-delegation:{source_role}->{target_role}",
            allowed_agents=[target_role],
        )
        created.append((target_role, task_id))
    return created
def should_handle_group_message_for_bot(
    update: Update,
    context: ContextTypes.DEFAULT_TYPE,
) -> tuple[bool, bool]:
    """Decide whether this bot should react to a group message.

    Returns:
        ``(should_handle, mentioned_self)``:

        * ``(True, False)`` - the message mentions no bot at all.
        * ``(False, False)`` - only other bots are mentioned, or another bot is
          the primary (coordinating) mention. Both cases are logged.
        * ``(True, True)`` - this bot is mentioned and no other bot is primary.
    """
    message = update.effective_message
    bot_mentions = {
        mention for mention in extract_message_mentions(message) if mention.endswith("bot")
    }
    if not bot_mentions:
        return True, False

    self_username = (getattr(context.bot, "username", None) or "").strip().lower().lstrip("@")
    if not (self_username and self_username in bot_mentions):
        logging.info(
            "Ignoring group message for other bots chat_id=%s self=%s mentioned=%s",
            getattr(update.effective_chat, "id", None),
            self_username or "-",
            ",".join(sorted(bot_mentions)),
        )
        return False, False

    primary_mention = resolve_primary_group_bot_mention(message)
    if primary_mention and primary_mention != self_username:
        logging.info(
            "Ignoring delegated secondary mention chat_id=%s self=%s primary=%s mentioned=%s",
            getattr(update.effective_chat, "id", None),
            self_username or "-",
            primary_mention,
            ",".join(sorted(bot_mentions)),
        )
        return False, False

    return True, True
def is_group_role_assignment(text: str, mentioned_self: bool = False) -> bool:
    """Return True when a message that mentions this bot assigns it a role.

    Requires *mentioned_self* to be truthy and the stripped, lower-cased text to
    contain at least one entry of ``ROLE_ASSIGNMENT_KEYWORDS``.
    """
    lowered = text.strip().lower()
    if not lowered or not mentioned_self:
        return False
    return any(keyword in lowered for keyword in ROLE_ASSIGNMENT_KEYWORDS)
def build_group_role_note(text: str) -> str:
    """Turn a role-assignment message into a compact note.

    Removes ``@username`` tokens (five or more characters), collapses each run
    of whitespace to a single space, and caps the result at 1000 characters.
    """
    without_mentions = re.sub(r"@([A-Za-z0-9_]{5,})", "", text).strip()
    single_spaced = re.sub(r"\s+", " ", without_mentions)
    return single_spaced[:1000]
def resolve_agent_role_from_mention(mention: str) -> str:
    """Map a bot username to an agent role by substring, or ``""`` if none matches.

    Checked in order: "codex", "gemini", "claude" (case-insensitive). The first
    role whose name occurs in the username wins.
    """
    lowered = mention.lower()
    return next(
        (role for role in ("codex", "gemini", "claude") if role in lowered),
        "",
    )
def extract_handoff_targets(text: str, source_role: str) -> list[str]:
    """Find the agent roles that *text* hands work to.

    Nothing is returned unless the lower-cased text contains one of
    ``HANDOFF_KEYWORDS``. Mentions are then examined in sorted order; each one
    that resolves to a role other than *source_role* contributes that role once.
    """
    lowered = text.strip().lower()
    if not lowered:
        return []
    if not any(keyword in lowered for keyword in HANDOFF_KEYWORDS):
        return []

    mentioned = sorted(set(re.findall(r"@([A-Za-z0-9_]{5,})", text)))
    resolved = (resolve_agent_role_from_mention(handle) for handle in mentioned)
    return list(dict.fromkeys(role for role in resolved if role and role != source_role))
def build_handoff_payload(source_role: str, user_text: str, reply_text: str) -> str:
    """Compose the task text for a bot-to-bot hand-off.

    Four lines: who is asking, the original user message, the asking bot's own
    explanation, and a fixed instruction to take over and report in the group.
    """
    lines = [
        f"来自 Telegram 群聊中 {source_role} 的协作请求。",
        f"原始用户消息:{user_text}",
        f"{source_role} 的说明:{reply_text}",
        "请直接接手并在群里反馈结果。",
    ]
    return "\n".join(lines)
def enqueue_handoff_tasks(
    registry: TaskRegistry,
    *,
    chat_id: str,
    source_message_id: str,
    source_user_id: str,
    source_role: str,
    user_text: str,
    reply_text: str,
) -> list[tuple[str, int]]:
    """Queue a "bot-handoff" task for every role that *reply_text* hands work to.

    Targets come from ``extract_handoff_targets(reply_text, source_role)``.

    Returns:
        ``(target_role, task_id)`` pairs in the order the targets were found.
    """

    def queue_for(target_role: str) -> int:
        return registry.create_task(
            source_chat_id=chat_id,
            source_message_id=source_message_id,
            source_user_id=source_user_id,
            source_text=build_handoff_payload(source_role, user_text, reply_text),
            category="bot-handoff",
            route_reason=f"bot-handoff:{source_role}->{target_role}",
            allowed_agents=[target_role],
        )

    return [
        (target_role, queue_for(target_role))
        for target_role in extract_handoff_targets(reply_text, source_role)
    ]
def is_group_reply_fallback_error(exc: BadRequest) -> bool:
    """Return True when *exc* reports a closed topic or a missing reply target or thread.

    The check is a case-insensitive substring match on ``str(exc)``.
    """
    detail = str(exc).lower()
    recoverable_markers = (
        "topic_closed",
        "message to be replied not found",
        "message thread not found",
    )
    return any(marker in detail for marker in recoverable_markers)
async def send_text_response(
    update: Update,
    context: ContextTypes.DEFAULT_TYPE,
    text: str,
) -> None:
    """Send *text* (capped at 4000 characters) back to the chat of *update*.

    Outside group-like chats this replies to the message. In group-like chats
    it sends into the message's thread; if that fails with an error accepted by
    ``is_group_reply_fallback_error``, it logs a warning and sends again without
    a thread id. Any other ``BadRequest`` is re-raised. Does nothing when the
    update has no message or chat.
    """
    message, chat = update.effective_message, update.effective_chat
    if not (message and chat):
        return

    clipped = text[:4000]
    if not is_group_like_chat(getattr(chat, "type", None)):
        await message.reply_text(clipped)
        return

    thread_id = getattr(message, "message_thread_id", None)
    try:
        await context.bot.send_message(chat_id=chat.id, text=clipped, message_thread_id=thread_id)
    except BadRequest as exc:
        if not is_group_reply_fallback_error(exc):
            raise
        logging.warning(
            "Falling back to root group send chat_id=%s thread_id=%s error=%s",
            getattr(chat, "id", None),
            thread_id,
            exc,
        )
    else:
        return
    # Reached only after a recoverable thread/reply error: retry without a thread.
    await context.bot.send_message(chat_id=chat.id, text=clipped)
async def send_chunked_text_response(
    update: Update,
    context: ContextTypes.DEFAULT_TYPE,
    text: str,
) -> None:
    """Send *text* as consecutive messages of at most 3500 characters each.

    The text is stripped first; a blank text is replaced by a short
    "nothing to return" notice.
    """
    chunk_size = 3500
    body = text.strip() or "没有可返回的内容。"
    offset = 0
    while offset < len(body):
        await send_text_response(update, context, body[offset : offset + chunk_size])
        offset += chunk_size
async def log_raw_update(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Log a one-line summary of every update that arrives in a group-like chat.

    Records chat type and id, user id, sender-chat id and the first 100
    characters of the text or caption. Updates without a message or chat, and
    updates from other chat types, are ignored.
    """
    message = update.effective_message
    chat = update.effective_chat
    if not (message and chat and is_group_like_chat(chat.type)):
        return

    user = update.effective_user
    sender_chat = getattr(message, "sender_chat", None)
    user_id = getattr(user, "id", None) if user else None
    sender_chat_id = getattr(sender_chat, "id", None) if sender_chat else None
    body = (getattr(message, "text", None) or getattr(message, "caption", None) or "").strip()
    logging.info(
        "Raw update chat_type=%s chat_id=%s user_id=%s sender_chat_id=%s text=%s",
        chat.type,
        getattr(chat, "id", None),
        user_id,
        sender_chat_id,
        summarize_text(body, limit=100),
    )
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle /start: greet allowed users, tell everyone else they are not authorized."""
    if is_allowed_user(update):
        reply = "机器人已启动,直接发送消息即可。"
    else:
        reply = "未授权用户,无法使用该机器人。"
    await send_text_response(update, context, reply)
async def help_cmd(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle /help: send the command list, or a refusal to unauthorized users.

    The help text also points Xiaohongshu requests to the standalone bot and
    shows whether free group chat is on or off.
    """
    if not is_allowed_user(update):
        await send_text_response(update, context, "未授权用户,无法使用该机器人。")
        return

    group_chat_mode = "on" if ALLOW_GROUP_CHAT else "off"
    help_lines = [
        "可用命令:",
        "/start - 启动提示",
        "/help - 查看帮助",
        "/ping - 健康检查",
        "/backend - 查看当前后端",
        "/memory_recent - 查看最近记忆摘要",
        "/memory_search - 按关键词搜索记忆",
        "/reset - 清空当前聊天上下文",
        "/confirm - 确认执行待确认的危险操作",
        "/cancel - 取消待确认的危险操作",
        "/whoami - 查看你的 Telegram user_id",
        f"小红书相关请私聊 @{STANDALONE_XHS_BOT_USERNAME}",
        f"直接发送文本即可对话;群聊自由对话={group_chat_mode}。",
    ]
    await send_text_response(update, context, "\n".join(help_lines))