-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathgroup_bot.py
More file actions
4590 lines (4187 loc) · 172 KB
/
group_bot.py
File metadata and controls
4590 lines (4187 loc) · 172 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#!/usr/bin/env python3
import asyncio
import contextlib
from dataclasses import replace
import json
import logging
import os
import re
import signal
import subprocess
import time
from pathlib import Path
from typing import Any, Dict, List, Optional
from memory_store import (
ConversationMemoryStore,
LongTermMemoryWriter,
SharedMemoryJournal,
build_instant_memory_snapshot,
render_memory_search_digest,
render_recent_memory_digest,
)
from telegram import InlineKeyboardButton, InlineKeyboardMarkup, Update
from telegram.constants import ChatAction
from telegram.error import BadRequest
from telegram.ext import Application, CallbackQueryHandler, CommandHandler, ContextTypes, MessageHandler, TypeHandler, filters
from routing import classify_group_message_semantics, classify_task, format_allowed_agents
from runners import RunnerConfig, run_task
from task_registry import TaskRegistry
# Resolve bundled resources (DBs, logs, scripts) relative to this file.
BASE_DIR = Path(__file__).resolve().parent
# Root workspace directory shared with the OpenClaw agent runtime.
OPENCLAW_WORKSPACE_DIR = Path(
    os.getenv("OPENCLAW_WORKSPACE_DIR", str(Path.home() / ".openclaw" / "workspace"))
).expanduser()
# Telegram credentials and this bot's identity within the multi-bot group.
BOT_TOKEN = os.getenv("TELEGRAM_BOT_TOKEN", "").strip()
BOT_ROLE = os.getenv("BOT_ROLE", "").strip().lower()
BOT_MODE = os.getenv("BOT_MODE", "worker").strip().lower()
BOT_DISPLAY_NAME = os.getenv("BOT_DISPLAY_NAME", BOT_ROLE or "agent").strip()
# SQLite task queue shared between the bots, plus a per-role log file.
GROUP_TASK_DB_PATH = os.getenv("GROUP_TASK_DB_PATH", str(BASE_DIR / "group-tasks.sqlite3")).strip()
LOG_FILE = Path(os.getenv("GROUP_LOG_FILE", str(BASE_DIR / f"group-{BOT_ROLE or 'agent'}.log")).strip())
# Polling / progress-notification / timeout knobs, all in seconds.
POLL_INTERVAL_SECS = int(os.getenv("POLL_INTERVAL_SECS", "5"))
PROGRESS_DELAY_SECS = int(os.getenv("PROGRESS_DELAY_SECS", "45"))
DIRECT_GROUP_PROGRESS_DELAY_SECS = int(os.getenv("DIRECT_GROUP_PROGRESS_DELAY_SECS", "30"))
DIRECT_GROUP_FALLBACK_TIMEOUT_SECS = int(os.getenv("DIRECT_GROUP_FALLBACK_TIMEOUT_SECS", "120"))
DIRECT_PRIVATE_PROGRESS_DELAY_SECS = int(os.getenv("DIRECT_PRIVATE_PROGRESS_DELAY_SECS", "20"))
DIRECT_PRIVATE_FALLBACK_TIMEOUT_SECS = int(os.getenv("DIRECT_PRIVATE_FALLBACK_TIMEOUT_SECS", "120"))
TASK_STALE_CLAIM_SECS = int(os.getenv("TASK_STALE_CLAIM_SECS", "120"))
# Working directory and command name used when running tasks.
WORKDIR = os.getenv("WORKDIR", str(Path.home() / ".openclaw" / "workspace")).strip()
TASK_COMMAND = os.getenv("TASK_COMMAND", "task").strip()
# Feature flags; any of the usual truthy spellings enables them.
ALLOW_GROUP_CHAT = os.getenv("ALLOW_GROUP_CHAT", "true").strip().lower() in {"1", "true", "yes", "on"}
ALLOW_DM_TASKS = os.getenv("ALLOW_DM_TASKS", "true").strip().lower() in {"1", "true", "yes", "on"}
ENABLE_DIRECT_PRIVATE_TASKS = os.getenv("ENABLE_DIRECT_PRIVATE_TASKS", "true").strip().lower() in {
    "1",
    "true",
    "yes",
    "on",
}
# How private-chat requests are executed: direct run, queued task, hybrid, or manual.
PRIVATE_TASK_MODE = os.getenv("PRIVATE_TASK_MODE", "direct").strip().lower()
# Context/history sizes; the openclaw dispatcher keeps more context than workers.
MAX_CONTEXT_MESSAGES = int(os.getenv("MAX_CONTEXT_MESSAGES", "16" if BOT_ROLE == "openclaw" else "8"))
PERSISTED_HISTORY_LIMIT = int(os.getenv("PERSISTED_HISTORY_LIMIT", "80" if BOT_ROLE == "openclaw" else "24"))
INSTANT_MEMORY_OWN_LIMIT = int(os.getenv("INSTANT_MEMORY_OWN_LIMIT", "6"))
INSTANT_MEMORY_SHARED_LIMIT = int(os.getenv("INSTANT_MEMORY_SHARED_LIMIT", "6"))
# Conversation-memory database and the optional shared-memory journal.
MEMORY_DB_PATH = os.getenv("MEMORY_DB_PATH", str(BASE_DIR / "bot-memory.sqlite3")).strip()
ENABLE_SHARED_MEMORY_LOG = os.getenv("ENABLE_SHARED_MEMORY_LOG", "true").strip().lower() in {
    "1",
    "true",
    "yes",
    "on",
}
SHARED_MEMORY_DIR = os.getenv(
    "SHARED_MEMORY_DIR",
    str(BASE_DIR / "shared-memory"),
).strip()
MEMORY_TIMEZONE = os.getenv("MEMORY_TIMEZONE", "Asia/Shanghai").strip()
# Long-term memory writes are gated behind an explicit opt-in and a shell script.
ALLOW_LONG_TERM_MEMORY_WRITE = os.getenv("ALLOW_LONG_TERM_MEMORY_WRITE", "false").strip().lower() in {
    "1",
    "true",
    "yes",
    "on",
}
LONG_TERM_MEMORY_SCRIPT_PATH = os.getenv(
    "LONG_TERM_MEMORY_SCRIPT_PATH",
    str(BASE_DIR / "scripts" / "shared-memory-write.sh"),
).strip()
# Runner backend selection; private direct chats may override the backend/workdir.
RUNNER_BACKEND = os.getenv("RUNNER_BACKEND", "").strip()
PRIVATE_DIRECT_RUNNER_BACKEND = os.getenv("PRIVATE_DIRECT_RUNNER_BACKEND", "").strip()
PRIVATE_DIRECT_OPENCLAW_AGENT_ID = os.getenv("PRIVATE_DIRECT_OPENCLAW_AGENT_ID", "main").strip()
PRIVATE_DIRECT_WORKDIR = os.getenv("PRIVATE_DIRECT_WORKDIR", "").strip()
RUNNER_TIMEOUT_SECS = int(os.getenv("RUNNER_TIMEOUT_SECS", "900"))
# Whitelisted Telegram user ids; non-numeric entries are silently dropped.
ALLOWED_USER_IDS = {
    int(user_id.strip())
    for user_id in os.getenv("ALLOWED_USER_IDS", "").split(",")
    if user_id.strip().isdigit()
}
# Codex CLI settings.  NOTE(review): "CODEx_BIN" (lowercase x) looks like a
# typo for CODEX_BIN, but the name is used consistently elsewhere in this
# file, so it is intentionally left unchanged here.
CODEx_BIN = os.getenv("CODEX_BIN", "codex").strip()
CODEX_MODEL = os.getenv("CODEX_MODEL", "gpt-5.4-mini").strip()
CODEX_REASONING_EFFORT = os.getenv("CODEX_REASONING_EFFORT", "medium").strip()
CODEX_SANDBOX_MODE = os.getenv("CODEX_SANDBOX_MODE", "workspace-write").strip()
# Claude Code CLI settings.
CLAUDE_BIN = os.getenv("CLAUDE_BIN", "claude").strip()
CLAUDE_MODEL = os.getenv("CLAUDE_MODEL", "").strip()
CLAUDE_PERMISSION_MODE = os.getenv("CLAUDE_PERMISSION_MODE", "acceptEdits").strip()
# Gemini CLI settings.
GEMINI_BIN = os.getenv("GEMINI_BIN", "gemini").strip()
GEMINI_MODEL = os.getenv("GEMINI_MODEL", "").strip()
GEMINI_APPROVAL_MODE = os.getenv("GEMINI_APPROVAL_MODE", "auto_edit").strip()
# OpenClaw CLI settings and the default targets for its internal router.
OPENCLAW_BIN = os.getenv("OPENCLAW_BIN", "openclaw").strip()
OPENCLAW_AGENT_ID = os.getenv("OPENCLAW_AGENT_ID", BOT_ROLE).strip()
OPENCLAW_ROUTER_CODING_AGENT = os.getenv("OPENCLAW_ROUTER_CODING_AGENT", "codex").strip()
OPENCLAW_ROUTER_DOCS_AGENT = os.getenv("OPENCLAW_ROUTER_DOCS_AGENT", "gemini").strip()
OPENCLAW_ROUTER_DEFAULT_AGENT = os.getenv("OPENCLAW_ROUTER_DEFAULT_AGENT", "codex").strip()
# Locations of the latest daily-crypto digest artifacts (JSON and plain text).
DAILY_CRYPTO_LATEST_DIGEST_JSON_PATH = os.getenv(
    "DAILY_CRYPTO_LATEST_DIGEST_JSON_PATH",
    str(Path.home() / ".openclaw" / "workspace" / "reports" / "daily-crypto" / "latest.digest.json"),
).strip()
DAILY_CRYPTO_LATEST_DIGEST_TEXT_PATH = os.getenv(
    "DAILY_CRYPTO_LATEST_DIGEST_TEXT_PATH",
    str(Path.home() / ".openclaw" / "workspace" / "reports" / "daily-crypto" / "latest.digest.txt"),
).strip()
# Callback-data prefix for daily-report inline keyboard buttons.
DAILY_REPORT_CALLBACK_PREFIX = "daily-report:"
# Sentinel a runner can return to mean "send nothing to the chat".
NO_REPLY_SENTINEL = "[[NO_REPLY]]"
# Runtime market-monitor cadence and thresholds.  NOTE(review): the *_MOVE
# values appear to be percentage moves over the lookback windows — confirm
# against the monitor implementation.
RUNTIME_MONITOR_INTERVAL_SECS = 60
RUNTIME_MONITOR_LOOKBACK_MINUTES = 1
RUNTIME_MONITOR_MIN_LOOKBACK_MOVE = 3.0
RUNTIME_MONITOR_SECONDARY_LOOKBACK_MINUTES = 5
RUNTIME_MONITOR_SECONDARY_MIN_LOOKBACK_MOVE = 10.0
RUNTIME_MONITOR_ALERT_MODE = "dual-rise"
# Tenbagger screener script and its report output directory.
TENBAGGER_SCRIPT_PATH = Path.home() / ".openclaw" / "workspace" / "scripts" / "fetch_high_low_ratio.py"
TENBAGGER_OUTPUT_DIR = Path.home() / ".openclaw" / "workspace" / "reports" / "binance-tenbagger"
# Phrases that assign or redefine a bot's responsibilities in the group.
ROLE_ASSIGNMENT_KEYWORDS = [
    "职责",
    "角色定位",
    "定位",
    "你负责",
    "你的主要职责",
    "你的职责",
    "以后你负责",
    "只负责",
    "专门负责",
]
# Verbs/phrases suggesting a message hands work off to another party.
HANDOFF_KEYWORDS = [
    "帮",
    "协助",
    "审核",
    "检查",
    "排查",
    "处理",
    "接手",
    "优化",
    "修复",
    "review",
    "看一下",
    "发给",
    "交给",
    "让",
    "请",
]
# Routing hints checked when a group message mentions two or more bots.
PRIMARY_GROUP_ROUTING_KEYWORDS = [
    "发给",
    "交给",
    "转给",
    "转交给",
    "转发给",
    "让他",
    "让她",
    "让其",
    "协助",
    "配合",
    "接手",
    "复核",
    "复查",
    "审核",
    "检查",
    "审一下",
    "帮他",
    "帮你",
    "帮忙",
]
# Task category marking a worker result being returned to its delegator.
DELEGATION_RETURN_CATEGORY = "delegation-return"
# Markers that force dispatch to a specific agent (hashtags + Chinese phrases).
EXPLICIT_DISPATCH_MARKERS = [
    "#codex",
    "#claude",
    "#claudecode",
    "#gemini",
    "#dispatch",
    "#派单",
    "交给codex",
    "交给claude",
    "交给gemini",
    "让codex",
    "让claude",
    "让gemini",
    "派给codex",
    "派给claude",
    "派给gemini",
    "分配给codex",
    "分配给claude",
    "分配给gemini",
    "分派给codex",
    "分派给claude",
    "分派给gemini",
    "请分派",
    "帮我分派",
    "请派单",
    "帮我派单",
    "请分配给合适",
    "交给合适的bot",
    "交给对应bot",
]
# Service labels per agent role, used to probe whether an agent is online.
AGENT_SERVICE_LABELS = {
    "openclaw": ["com.ukgorclawbot.telegram-group-openclaw"],
    "gemini": ["com.ukgorclawbot.telegram-group-gemini"],
    "claude": ["com.ukgorclawbot.telegram-group-claude"],
    "codex": ["com.ukgorclawbot.telegram-openai-bot", "com.ukgorclawbot.telegram-group-codex"],
}
def configure_logging() -> None:
    """Route INFO-level logs to both LOG_FILE and stderr; silence httpx/httpcore."""
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s %(levelname)s %(name)s %(message)s",
        handlers=[logging.FileHandler(LOG_FILE), logging.StreamHandler()],
    )
    # These HTTP client libraries log every request; shut them off entirely.
    for noisy_name in ("httpx", "httpcore"):
        noisy_logger = logging.getLogger(noisy_name)
        noisy_logger.handlers = []
        noisy_logger.propagate = False
        noisy_logger.disabled = True
def validate_env() -> None:
    """Raise RuntimeError for the first missing/invalid configuration value."""
    checks = (
        (not BOT_TOKEN, "Missing TELEGRAM_BOT_TOKEN"),
        (
            BOT_ROLE not in {"openclaw", "codex", "claude", "gemini"},
            "BOT_ROLE must be one of: openclaw, codex, claude, gemini",
        ),
        (BOT_MODE not in {"dispatcher", "worker"}, "BOT_MODE must be dispatcher or worker"),
        (
            PRIVATE_TASK_MODE not in {"direct", "queue", "hybrid", "manual"},
            "PRIVATE_TASK_MODE must be direct, queue, hybrid, or manual",
        ),
        (BOT_MODE == "worker" and not RUNNER_BACKEND, "Worker bot requires RUNNER_BACKEND"),
    )
    for failed, message in checks:
        if failed:
            raise RuntimeError(message)
def get_registry(app: Application) -> TaskRegistry:
    """Fetch the shared task registry stored on the application instance."""
    registry: TaskRegistry = app.bot_data["registry"]
    return registry
def get_memory_store(app: Application) -> ConversationMemoryStore:
    """Fetch the conversation memory store stored on the application instance."""
    store: ConversationMemoryStore = app.bot_data["memory_store"]
    return store
def get_instant_memory_snapshot(app: Application, chat_id: str) -> str:
    """Render the instant-memory digest for this bot role in the given chat."""
    store = get_memory_store(app)
    return build_instant_memory_snapshot(
        store,
        bot_role=BOT_ROLE,
        chat_id=chat_id,
        own_limit=INSTANT_MEMORY_OWN_LIMIT,
        shared_limit=INSTANT_MEMORY_SHARED_LIMIT,
    )
def get_shared_journal(app: Application) -> SharedMemoryJournal:
    """Fetch the shared-memory journal stored on the application instance."""
    journal: SharedMemoryJournal = app.bot_data["shared_memory_journal"]
    return journal
def get_long_term_writer(app: Application) -> LongTermMemoryWriter:
    """Fetch the long-term memory writer stored on the application instance."""
    writer: LongTermMemoryWriter = app.bot_data["long_term_memory_writer"]
    return writer
def mirror_group_result_to_openclaw_memory(
    memory_store: ConversationMemoryStore,
    *,
    chat_id: str,
    user_id: str,
    content: str,
) -> None:
    """Copy a worker bot's group result into OpenClaw's conversation memory.

    No-op when this bot *is* openclaw (it already owns that memory) or when
    the content is empty after trimming.
    """
    trimmed = content.strip()
    if not trimmed or BOT_ROLE == "openclaw":
        return
    entry = f"[群聊结果 · {BOT_DISPLAY_NAME}]\n{trimmed}"
    memory_store.append_message("openclaw", chat_id, user_id, "assistant", entry)
def get_runner_config() -> RunnerConfig:
    """Assemble the RunnerConfig for this bot from module-level env settings."""
    settings: Dict[str, Any] = {
        "backend": RUNNER_BACKEND,
        "workdir": WORKDIR,
        "timeout_secs": RUNNER_TIMEOUT_SECS,
        "codex_bin": CODEx_BIN,
        "codex_model": CODEX_MODEL,
        "codex_reasoning_effort": CODEX_REASONING_EFFORT,
        "codex_sandbox": CODEX_SANDBOX_MODE,
        "claude_bin": CLAUDE_BIN,
        "claude_model": CLAUDE_MODEL,
        "claude_permission_mode": CLAUDE_PERMISSION_MODE,
        "gemini_bin": GEMINI_BIN,
        "gemini_model": GEMINI_MODEL,
        "gemini_approval_mode": GEMINI_APPROVAL_MODE,
        "openclaw_bin": OPENCLAW_BIN,
        "openclaw_agent_id": OPENCLAW_AGENT_ID,
        "openclaw_router_coding_agent": OPENCLAW_ROUTER_CODING_AGENT,
        "openclaw_router_docs_agent": OPENCLAW_ROUTER_DOCS_AGENT,
        "openclaw_router_default_agent": OPENCLAW_ROUTER_DEFAULT_AGENT,
    }
    return RunnerConfig(**settings)
async def notify_long_running_task(
    app: Application,
    *,
    task_id: int,
    chat_id: str,
    route_reason: str,
) -> None:
    """After PROGRESS_DELAY_SECS, post a progress note for a still-claimed task.

    Exits silently when the task no longer exists or has left the "claimed"
    state.  The wording depends on whether the task was delegated by another
    bot and whether the result should bypass the OpenClaw return hop.
    """
    await asyncio.sleep(PROGRESS_DELAY_SECS)
    task = get_registry(app).get_task(task_id)
    # Only ping the chat while the task is actually still being worked on.
    if not task or str(task.get("status", "")) != "claimed":
        return
    delegation_source_role, delegation_target_role = parse_route_pair(route_reason, "user-delegation:")
    if delegation_source_role and delegation_target_role == BOT_ROLE:
        if should_bypass_openclaw_return_task(delegation_source_role, delegation_target_role):
            # Delegated by OpenClaw directly to this bot: report progress
            # directly, with results mirrored into OpenClaw memory.
            text = (
                f"[{BOT_DISPLAY_NAME}] 任务 #{task_id} 仍在处理中,"
                "我会继续直接同步进展,并把关键结果写回 OpenClaw 记忆。"
            )
        else:
            # Delegated by another bot: promise to hand the result back to
            # the delegator for consolidated reporting.
            text = (
                f"[{BOT_DISPLAY_NAME}] 任务 #{task_id} 仍在检查相关路径和代码链路,"
                f"完成后我会先把结果交回 {resolve_role_display_name(delegation_source_role)} 统一汇总。"
            )
    else:
        text = f"[{BOT_DISPLAY_NAME}] 任务 #{task_id} 仍在处理中,我会在完成后继续同步结果。"
    await app.bot.send_message(chat_id=chat_id, text=text)
async def send_group_chat_text(
    app: Application,
    *,
    chat_id: int,
    text: str,
    thread_id: Optional[int] = None,
) -> None:
    """Send text to a group, retrying at the chat root if the thread is invalid.

    The text is capped at 4000 characters.  Only BadRequest errors recognized
    by is_group_reply_fallback_error trigger the root-level retry; anything
    else propagates.
    """
    capped = text[:4000]
    try:
        await app.bot.send_message(chat_id=chat_id, text=capped, message_thread_id=thread_id)
    except BadRequest as exc:
        if not is_group_reply_fallback_error(exc):
            raise
        logging.warning(
            "Falling back to root group send role=%s chat_id=%s thread_id=%s error=%s",
            BOT_ROLE,
            chat_id,
            thread_id,
            exc,
        )
        # Retry without a thread target, i.e. post at the chat root.
        await app.bot.send_message(chat_id=chat_id, text=capped)
async def notify_long_running_direct_group_task(
    app: Application,
    *,
    chat_id: int,
    thread_id: Optional[int],
    message_semantics: str = "task",
) -> None:
    """After a delay, post a "still working" note for a direct group request."""
    await asyncio.sleep(DIRECT_GROUP_PROGRESS_DELAY_SECS)
    # Casual chatter gets a short note; task-like requests a fuller one.
    if message_semantics == "casual":
        progress_text = f"[{BOT_DISPLAY_NAME}] 我在想一下,马上回你。"
    else:
        progress_text = (
            f"[{BOT_DISPLAY_NAME}] 这条需求还在处理中,"
            "我正在整理关键信息和执行步骤,稍后继续直接回复你。"
        )
    await send_group_chat_text(app, chat_id=chat_id, thread_id=thread_id, text=progress_text)
async def notify_long_running_direct_private_task(
    app: Application,
    *,
    chat_id: int,
) -> None:
    """After a delay, post a "still working" note to a direct private chat."""
    await asyncio.sleep(DIRECT_PRIVATE_PROGRESS_DELAY_SECS)
    progress_text = f"[{BOT_DISPLAY_NAME}] 这条我还在处理;如果直连继续卡住,我会自动切到任务链继续跑。"
    await app.bot.send_message(chat_id=chat_id, text=progress_text)
def get_private_runner_config() -> RunnerConfig:
    """Derive the runner config for direct private chats from the base config."""
    cfg = get_runner_config()
    is_openclaw_dispatcher = BOT_ROLE == "openclaw" and BOT_MODE == "dispatcher"
    # Explicit override wins; the openclaw dispatcher otherwise runs its agent.
    if PRIVATE_DIRECT_RUNNER_BACKEND:
        cfg.backend = PRIVATE_DIRECT_RUNNER_BACKEND
    elif is_openclaw_dispatcher:
        cfg.backend = "openclaw_agent"
    if PRIVATE_DIRECT_WORKDIR:
        cfg.workdir = PRIVATE_DIRECT_WORKDIR
    if cfg.backend == "openclaw_agent" and is_openclaw_dispatcher:
        cfg.openclaw_agent_id = PRIVATE_DIRECT_OPENCLAW_AGENT_ID or "main"
    return cfg
def summarize_text(text: str, limit: int = 120) -> str:
    """Collapse newlines to spaces and truncate to `limit` chars, adding '...' when cut."""
    flattened = text.replace("\n", " ").strip()
    return flattened if len(flattened) <= limit else flattened[:limit] + "..."
def is_group_like_chat(chat_type: Optional[str]) -> bool:
    """Return True for multi-user chat types (group, supergroup, channel)."""
    return chat_type in ("group", "supergroup", "channel")
def extract_message_mentions(message: Any) -> set[str]:
    """Collect lowercase @-mention usernames from entities and from raw text.

    Combines Telegram "mention" entities with a regex scan of the text/caption
    (usernames of 5+ word characters).  Returns an empty set for falsy input.
    """
    found: set[str] = set()
    if not message:
        return found
    all_entities = list(getattr(message, "entities", None) or [])
    all_entities += getattr(message, "caption_entities", None) or []
    for ent in all_entities:
        if str(getattr(ent, "type", "")).lower() != "mention":
            continue
        try:
            parsed = message.parse_entity(ent)
        except Exception:
            # Best-effort: a malformed entity never blocks the other mentions.
            continue
        cleaned = parsed.strip().lower().lstrip("@")
        if cleaned:
            found.add(cleaned)
    body = (getattr(message, "text", None) or getattr(message, "caption", None) or "").strip()
    found.update(match.lower() for match in re.findall(r"@([A-Za-z0-9_]{5,})", body))
    return found
def extract_ordered_message_mentions(message: Any) -> List[str]:
    """Return @-mentions from the text/caption, lowercased, deduped, in order."""
    body = (getattr(message, "text", None) or getattr(message, "caption", None) or "").strip()
    ordered_handles: List[str] = []
    for handle in re.findall(r"@([A-Za-z0-9_]{5,})", body):
        lowered = handle.lower()
        if lowered not in ordered_handles:
            ordered_handles.append(lowered)
    return ordered_handles
def resolve_primary_group_bot_mention(message: Any) -> str:
    """Return the first-mentioned bot when the message routes work between bots.

    Requires at least two @...bot mentions plus a routing hint (keyword or a
    "bot ... verb ... bot" pattern); otherwise returns ''.
    """
    body = (getattr(message, "text", None) or getattr(message, "caption", None) or "").strip().lower()
    bot_mentions = [m for m in extract_ordered_message_mentions(message) if m.endswith("bot")]
    if len(bot_mentions) < 2:
        return ""
    # Keyword scan first; the regex fallback only runs when no keyword hits.
    hinted = any(keyword in body for keyword in PRIMARY_GROUP_ROUTING_KEYWORDS) or bool(
        re.search(
            r"@[a-z0-9_]{5,}bot.*(?:让|交给|发给|转给|协助|配合|检查|审核|解决)\s+@[a-z0-9_]{5,}bot",
            body,
        )
    )
    return bot_mentions[0] if hinted else ""
def resolve_user_delegation_targets(message: Any, self_username: str) -> List[str]:
    """List the other bots to delegate to, when this bot is the primary mention."""
    primary = resolve_primary_group_bot_mention(message)
    if not primary or primary != self_username:
        return []
    delegates: List[str] = []
    for handle in extract_ordered_message_mentions(message):
        lowered = handle.lower()
        if lowered.endswith("bot") and lowered != self_username and lowered not in delegates:
            delegates.append(lowered)
    return delegates
def build_group_delegation_ack_text(target_mentions: List[str]) -> str:
    """Ack text when the user explicitly @-mentions helper bots to involve."""
    mention_list = "、".join(f"@{mention}" for mention in target_mentions)
    if len(target_mentions) != 1:
        return f"收到,这条我先接住。我会先协调 {mention_list} 协助处理,等结果回来后我再统一向你汇报。"
    return (
        f"收到,这条我先接住。"
        f"我会先把相关脚本和数据链路整理后交给 {mention_list} 协助处理,"
        "等他回我结果后我再统一向你汇报。"
    )
def build_role_delegation_ack_text(target_roles: List[str]) -> str:
    """Ack text for delegating according to configured role responsibilities."""
    rendered_roles = "、".join(resolve_role_display_name(role) for role in target_roles)
    if len(target_roles) != 1:
        return f"收到,这条我先接住。按当前群职责,这部分我会协调 {rendered_roles} 继续处理,等结果回来后我再统一向你汇报。"
    return (
        "收到,这条我先接住。"
        f"按当前群职责,这部分我会交给 {rendered_roles} 继续处理,"
        "等他回我结果后我再统一向你汇报。"
    )
def build_openclaw_task_breakdown(text: str, target_roles: Optional[List[str]] = None) -> List[Dict[str, str]]:
    """Split a user request into ordered steps, each owned by one worker role.

    Returns a list of {"owner", "title", "goal"} dicts.  Preferred owners come
    from `target_roles` when given, otherwise from the routing classifier's
    allowed agents.  Guarantees at least one step via a fallback.
    """
    route = classify_task(text)
    lowered = text.lower()
    # Only worker roles may own a step; openclaw itself never appears here.
    preferred_roles = [role for role in (target_roles or []) if role in {"codex", "claude", "gemini"}]
    if not preferred_roles:
        preferred_roles = [
            role
            for role in route.get("allowed_agents", [])
            if str(role) in {"codex", "claude", "gemini"}
        ]
    preferred_roles = [str(role) for role in preferred_roles]
    # Heuristic: does the request ask for a report/summary?
    needs_summary = any(
        keyword in lowered
        for keyword in (
            "汇报",
            "总结",
            "报告",
            "整理",
            "分析",
            "结论",
            "晨报",
            "日报",
            "市场情况",
            "跟踪",
        )
    )
    # Heuristic: does the request ask for research/translation/data gathering?
    needs_research = any(
        keyword in lowered
        for keyword in (
            "资料",
            "情报",
            "搜索",
            "搜集",
            "翻译",
            "润色",
            "数据",
        )
    )
    steps: List[Dict[str, str]] = []
    seen_roles: set[str] = set()
    # Gemini is usable when explicitly preferred or when its service is online.
    can_use_gemini = "gemini" in preferred_roles or is_agent_service_online("gemini")

    def add_step(role: str, title: str, goal: str) -> None:
        # Each role owns at most one step in the plan.
        normalized_role = str(role)
        if normalized_role in seen_roles:
            return
        seen_roles.add(normalized_role)
        steps.append(
            {
                "owner": normalized_role,
                "title": title,
                "goal": goal,
            }
        )

    docs_only = str(route.get("category", "")) == "docs"
    # First preferred technical role (codex/claude), if any.
    tech_role = next((role for role in preferred_roles if role in {"codex", "claude"}), "")
    if docs_only:
        # Documentation-style requests go straight to Gemini (single step).
        if can_use_gemini or not preferred_roles:
            add_step(
                "gemini",
                "资料整理与分析",
                "围绕用户目标搜集资料、读取现成脚本或现有结果,并输出中文结论。",
            )
        return steps
    if tech_role:
        add_step(
            tech_role,
            "技术执行",
            "围绕用户需求完成开发、脚本、排障或配置落地,并给出路径状态与验证结果。",
        )
    # Optional summarization step after (or instead of) the technical step.
    if can_use_gemini and (needs_summary or needs_research or tech_role):
        add_step(
            "gemini",
            "结果整理与中文汇报",
            "基于执行结果整理资料、分析数据,并输出适合群聊查看的中文结论。",
        )
    if not steps:
        # Guarantee at least one step so the dispatcher always has a plan.
        fallback_role = preferred_roles[0] if preferred_roles else "gemini"
        fallback_title = "资料整理与分析" if fallback_role == "gemini" else "技术执行"
        fallback_goal = (
            "围绕用户目标搜集资料、读取现成脚本或现有结果,并输出中文结论。"
            if fallback_role == "gemini"
            else "围绕用户需求完成开发、脚本、排障或配置落地,并给出路径状态与验证结果。"
        )
        add_step(fallback_role, fallback_title, fallback_goal)
    return steps
def render_openclaw_task_breakdown(breakdown: List[Dict[str, str]]) -> str:
    """Format breakdown steps as a numbered Chinese plan; '' when no steps."""
    if not breakdown:
        return ""
    rendered = ["拆分计划:"]
    for position, step in enumerate(breakdown, start=1):
        owner_name = resolve_role_display_name(str(step.get("owner", "")))
        step_title = str(step.get("title", "")).strip() or "任务执行"
        step_goal = str(step.get("goal", "")).strip() or "按职责继续处理。"
        rendered.append(f"{position}. {owner_name}:{step_title} - {step_goal}")
    return "\n".join(rendered)
def build_openclaw_dispatch_ack_text(text: str, target_roles: List[str]) -> str:
    """Ack message describing how OpenClaw will split and dispatch the task."""
    plan_text = render_openclaw_task_breakdown(build_openclaw_task_breakdown(text, target_roles))
    if not plan_text:
        return "收到,我先拆一下这条任务,然后按职责继续分派。"
    return (
        "收到,我先拆一下这条任务。\n"
        f"{plan_text}\n"
        "接下来:我会按这个拆分继续分派,并把关键进展写回 OpenClaw 记忆。"
    )
def inject_openclaw_breakdown_into_payload(payload: str, text: str, target_roles: List[str]) -> str:
    """Append the rendered breakdown to a worker payload; unchanged when no plan."""
    plan_text = render_openclaw_task_breakdown(build_openclaw_task_breakdown(text, target_roles))
    if not plan_text:
        return payload
    return (
        f"{payload}\n\n"
        "OpenClaw 拆分结果:\n"
        f"{plan_text}\n"
        "请优先按这个拆分理解当前任务;如果你只负责其中一段,就聚焦自己负责的那一步。"
    )
def parse_openclaw_breakdown_steps(payload: str) -> List[Dict[str, str]]:
    """Parse the 'OpenClaw 拆分结果:' numbered lines back into step dicts.

    Each matching line "N. owner:title - goal" becomes {"owner","title","goal"};
    lines whose owner name is unknown are skipped.  [] when the marker is absent.
    """
    marker = "OpenClaw 拆分结果:"
    _, found, tail = payload.partition(marker)
    if not found:
        return []
    # Display names (possibly rendered) mapped back to internal role ids.
    owner_map = {
        "openclaw": "openclaw",
        "codex": "codex",
        "gemini": "gemini",
        "claude code": "claude",
        "claude": "claude",
    }
    step_pattern = re.compile(r"^\d+\.\s+([^::]+)[::]\s*([^-]+?)\s*-\s*(.+)$")
    parsed: List[Dict[str, str]] = []
    for raw_line in tail.splitlines():
        hit = step_pattern.match(raw_line.strip())
        if not hit:
            continue
        role = owner_map.get(hit.group(1).strip().lower())
        if role:
            parsed.append(
                {"owner": role, "title": hit.group(2).strip(), "goal": hit.group(3).strip()}
            )
    return parsed
def resolve_openclaw_followup_roles_from_payload(payload: str, current_role: str) -> List[str]:
    """Owners of the steps after current_role's step that are currently online.

    Returns [] when there is no breakdown or current_role owns no step.
    """
    steps = parse_openclaw_breakdown_steps(payload)
    position = next(
        (idx for idx, step in enumerate(steps) if step.get("owner") == current_role),
        None,
    )
    if position is None:
        return []
    followups: List[str] = []
    for later_step in steps[position + 1 :]:
        owner = str(later_step.get("owner", ""))
        if owner and owner not in followups and is_agent_service_online(owner):
            followups.append(owner)
    return followups
def build_openclaw_step_status_text(payload: str, current_role: str, current_state: str) -> str:
    """Render per-step status lines relative to current_role's plan position.

    Steps before the current one are marked done, the current one gets
    `current_state`, later ones are pending.  '' when there is no breakdown.
    """
    steps = parse_openclaw_breakdown_steps(payload)
    if not steps:
        return ""
    position = next(
        (idx for idx, step in enumerate(steps) if step.get("owner") == current_role),
        -1,
    )
    rows = ["步骤状态:"]
    for idx, step in enumerate(steps):
        display = resolve_role_display_name(str(step.get("owner", "")))
        title = str(step.get("title", "")).strip() or "任务执行"
        if idx < position:
            state = "已完成"
        elif idx == position and position >= 0:
            state = current_state
        else:
            state = "待开始"
        rows.append(f"- {display} | {title} | {state}")
    return "\n".join(rows)
def build_openclaw_followup_payload(
    payload: str,
    *,
    current_role: str,
    next_role: str,
    original_user_text: str,
    worker_result: str,
) -> str:
    """Build the task text handed to the next role in an OpenClaw breakdown.

    Embeds the original user request, the previous worker's result, the next
    step's title/goal, and the full plan.  Marks the step mode FINAL_SUMMARY
    when no online roles remain after `next_role`, FOLLOWUP otherwise.
    """
    steps = parse_openclaw_breakdown_steps(payload)
    step_lookup = {str(step.get("owner", "")): step for step in steps}
    # Final step iff nothing (online) comes after the role we are handing to.
    next_roles_after_current = resolve_openclaw_followup_roles_from_payload(payload, next_role)
    is_final_step = not next_roles_after_current
    current_display = resolve_role_display_name(current_role)
    next_display = resolve_role_display_name(next_role)
    next_step = step_lookup.get(next_role, {})
    next_title = str(next_step.get("title", "")).strip() or "结果整理与汇报"
    next_goal = str(next_step.get("goal", "")).strip() or "基于前序结果继续完成当前步骤。"
    breakdown_text = render_openclaw_task_breakdown(steps) if steps else ""
    payload_text = (
        "来自 Telegram 群聊中 openclaw 的拆分后续步骤。\n"
        f"OpenClaw 步骤模式:{'FINAL_SUMMARY' if is_final_step else 'FOLLOWUP'}\n"
        f"原始用户消息:{original_user_text}\n"
        f"上一位执行者:{current_display}\n"
        f"当前需要你接手的角色:{next_display}\n"
        f"当前步骤:{next_title}\n"
        f"目标:{next_goal}\n"
        f"上一阶段结果:{summarize_text(worker_result, limit=1600)}\n"
    )
    if breakdown_text:
        payload_text += f"\nOpenClaw 拆分结果:\n{breakdown_text}\n"
    if is_final_step:
        # Final step: summarize for the group, no further delegation.
        payload_text += (
            "\n这是 OpenClaw 拆分中的最后一步。"
            "请直接基于上一阶段结果完成中文汇总并面向群聊回复,"
            "不要继续委派给其他 bot,除非用户在后续新消息里明确要求。"
            "不要解释内部任务系统,不要暴露思考过程。"
        )
    else:
        payload_text += (
            "\n请只完成你当前负责的这一步,并直接给出适合群聊发送的最终中文结果。"
            "不要解释内部任务系统,不要暴露思考过程。"
        )
    return payload_text
def create_openclaw_followup_tasks_from_payload(
    registry: TaskRegistry,
    *,
    payload: str,
    current_role: str,
    chat_id: str,
    source_user_id: str,
    original_user_text: str,
    worker_result: str,
) -> List[tuple[str, int]]:
    """Create at most one follow-up task for the next plan step's role.

    Returns the created (role, task_id) pairs (empty when no follow-up role).
    """
    created: List[tuple[str, int]] = []
    # Only the immediate next role gets a task; later steps chain from it.
    for role in resolve_openclaw_followup_roles_from_payload(payload, current_role)[:1]:
        followup_text = build_openclaw_followup_payload(
            payload,
            current_role=current_role,
            next_role=role,
            original_user_text=original_user_text,
            worker_result=worker_result,
        )
        new_task_id = registry.create_task(
            source_chat_id=chat_id,
            source_message_id="",
            source_user_id=source_user_id,
            source_text=followup_text,
            category="bot-handoff",
            route_reason=f"user-delegation:openclaw->{role}",
            allowed_agents=[role],
        )
        created.append((role, new_task_id))
    return created
def should_bypass_openclaw_return_task(source_role: str, target_role: str) -> bool:
    """True when OpenClaw delegated straight to this bot, so no return hop is needed."""
    if source_role != "openclaw":
        return False
    return target_role == BOT_ROLE
def is_openclaw_final_summary_payload(payload: str) -> bool:
    """Detect the FINAL_SUMMARY step-mode marker inside a follow-up payload."""
    marker = "OpenClaw 步骤模式:FINAL_SUMMARY"
    return payload.find(marker) != -1
def resolve_openclaw_followup_roles(
    memory_store: ConversationMemoryStore,
    *,
    chat_id: str,
    user_text: str,
    reply_text: str,
) -> List[str]:
    """Pick technical roles (codex/claude) to continue the work.

    Explicit, online handoff targets found in the reply win; otherwise fall
    back to the technical auto-delegation heuristic, deduped in order.
    """
    explicit = [
        role
        for role in extract_handoff_targets(reply_text, BOT_ROLE)
        if role in {"codex", "claude"} and is_agent_service_online(role)
    ]
    if explicit:
        return explicit
    ordered: List[str] = []
    for candidate in resolve_technical_auto_delegation_roles(
        memory_store,
        chat_id=chat_id,
        text=user_text,
    ):
        if candidate in {"codex", "claude"} and candidate not in ordered:
            ordered.append(candidate)
    return ordered
def build_history_transcript(history: List[Dict[str, str]], limit: int = 8) -> str:
    """Render the last `limit` history items as "[role]\\ncontent" paragraphs.

    Any role other than "assistant" is rendered as "user"; content is
    summarized to 240 characters.
    """
    paragraphs = [
        f"[{'assistant' if entry['role'] == 'assistant' else 'user'}]\n"
        f"{summarize_text(entry['content'], limit=240)}"
        for entry in history[-limit:]
    ]
    return "\n\n".join(paragraphs)
def parse_route_pair(route_reason: str, prefix: str) -> tuple[str, str]:
    """Split "prefix<src>-><dst>" into lowercase (src, dst); ('', '') if malformed."""
    if not route_reason.startswith(prefix):
        return "", ""
    remainder = route_reason[len(prefix):]
    src, arrow, dst = remainder.partition("->")
    if not arrow:
        return "", ""
    return src.strip().lower(), dst.strip().lower()
def extract_original_user_text(payload: str) -> str:
    """Return the text after the "原始用户消息:" header line in a payload.

    Falls back to a 180-char summary of the whole payload when no header
    line is present.

    Bug fix: the previous implementation matched the full-width colon ":"
    in startswith but then split on the ASCII colon ":" — raising IndexError
    when the line contained no ASCII colon, and truncating at the wrong
    position when the user text itself contained one.  Slicing off the
    matched prefix avoids both failure modes.
    """
    prefix = "原始用户消息:"
    for line in payload.splitlines():
        if line.startswith(prefix):
            return line[len(prefix):].strip()
    return summarize_text(payload, limit=180)
def build_delegation_return_payload(
    requester_role: str,
    worker_role: str,
    original_user_text: str,
    worker_result: str,
) -> str:
    """Build the prompt the requester bot uses to compose its final user report.

    Embeds the original request and the worker's result (summarized to 1200
    chars) followed by strict Chinese formatting instructions for the reply.
    """
    return (
        f"你现在作为 Telegram 群里的 {requester_role},需要向用户做最终汇报。\n"
        f"用户最初的要求:{original_user_text}\n"
        f"{worker_role} 的处理结果:{summarize_text(worker_result, limit=1200)}\n\n"
        "请直接用中文面向用户回复,说明对方已经协助完成了什么、当前最新情况是什么、还有没有剩余问题。"
        "默认使用简体中文,不要夹带英文结论句;专有名词、路径、仓库名、代币代码可保留原文。"
        "请保留“路径状态”小节,逐条列出每个已检查路径及其状态。"
        "如果当前没有拿到明确路径,也要明确写“暂未返回明确路径”。"
        "不要解释内部任务系统。"
        "不要写思考过程,不要写 I will、我将、接下来、先去、正在、准备。"
        "如果协作结果是超时或失败,就直接如实说明当前还没修好,并给出下一步最小行动。"
        "建议格式:结论:...\\n路径状态:\\n- <路径> | <状态> | <说明>\\n下一步:..."
        "只输出最终要发到群里的成品回复。"
    )
def resolve_role_display_name(role: str) -> str:
    """Map an internal role id to its human-facing display name.

    Unknown non-empty roles are returned unchanged; empty input gets a
    generic Chinese placeholder.
    """
    display_names = {
        "openclaw": "OpenClaw",
        "codex": "Codex",
        "claude": "Claude Code",
        "gemini": "Gemini",
    }
    fallback = role if role else "协作 bot"
    return display_names.get(role.lower(), fallback)
def extract_worker_result(payload: str, worker_role: str) -> str:
    """Return the text after the "<worker_role> 的处理结果:" line, or '' if absent.

    Bug fix: the previous implementation matched the full-width colon ":"
    in the prefix but split the line on the ASCII colon ":" — raising
    IndexError when the result contained no ASCII colon, and cutting the
    result at the wrong position when it did.  Slicing off the matched
    prefix avoids both failure modes.
    """
    prefix = f"{worker_role} 的处理结果:"
    for line in payload.splitlines():
        if line.startswith(prefix):
            return line[len(prefix):].strip()
    return ""
def extract_path_status_lines(text: str) -> List[str]:
    """Pick up to 8 unique lines that mention a path or known source-file name.

    Lines are stripped of list markers, deduped, summarized to 220 chars,
    and returned re-bulleted with a leading "- ".
    """
    file_name_pattern = re.compile(r"\b[\w./-]+\.(py|sh|ts|js|jsx|tsx|json|md|yaml|yml|toml)\b")
    picked: List[str] = []
    already: set[str] = set()
    for raw_line in text.splitlines():
        entry = raw_line.strip().lstrip("-*• ").strip()
        if not entry or entry in already:
            continue
        # Keep only lines that look like they reference a filesystem path.
        if "/" not in entry and not file_name_pattern.search(entry):
            continue
        already.add(entry)
        picked.append(f"- {summarize_text(entry, limit=220)}")
        if len(picked) == 8:
            break
    return picked
def looks_like_planning_text(text: str) -> bool:
    """True when text is empty or its first line opens with a planning phrase.

    Used to reject "I will..." / "我将..." style meta output instead of a
    finished answer.
    """
    body = text.strip()
    if not body:
        return True
    head = body.splitlines()[0].strip()
    english_markers = (
        "i will",
        "i'll",
        "let me",
        "i am going to",
        "i need to",
        "i should",
        "i can",
    )
    chinese_markers = (
        "我将",
        "接下来",
        "我会先",
        "我先",
        "先去",
        "正在",
        "准备",
        "先看",
        "先检查",
        "先确认",
    )
    if head.lower().startswith(english_markers):
        return True
    return head.startswith(chinese_markers)
def line_looks_like_meta_reply(line: str) -> bool:
    """True when a single line opens with a plan/analysis phrase, not a result."""
    body = line.strip()
    if not body:
        return False
    english_markers = (
        "i will",
        "i'll",
        "let me",
        "here's my plan",
        "plan:",
        "thinking:",
        "analysis:",
        "first,",
    )
    chinese_markers = (
        "我将",
        "接下来",
        "我会先",
        "我先",
        "先去",
        "正在",
        "准备",
        "思路",
        "思路:",
        "分析:",
        "计划:",
        "让我先",
        "我来先",
    )
    if body.lower().startswith(english_markers):
        return True
    return body.startswith(chinese_markers)
def compact_casual_reply(text: str, limit: int = 120) -> str:
    """Keep at most two sentences (or about `limit` chars) of a casual reply.

    Whitespace is collapsed first; the kept portion is finally truncated with
    summarize_text.
    """
    collapsed = re.sub(r"\s+", " ", text).strip()
    if not collapsed:
        return collapsed
    sentences = [s.strip() for s in re.split(r"(?<=[。!?!?])\s*", collapsed) if s.strip()]
    kept: List[str] = []
    for sentence in sentences:
        kept.append(sentence)
        # Stop once two sentences are kept or the preview hits the limit.
        if len(kept) >= 2 or len("".join(kept)) >= limit:
            break
    shortened = "".join(kept) if kept else collapsed
    return summarize_text(shortened, limit=limit)