forked from kjqwer/astrbot_plugin_sy
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtools.py
More file actions
558 lines (473 loc) · 27.5 KB
/
tools.py
File metadata and controls
558 lines (473 loc) · 27.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
import datetime
from typing import Union
from astrbot.api.event import AstrMessageEvent
from astrbot.api.star import Context
from astrbot.api import logger
from .utils import parse_datetime_for_llm, save_reminder_data, check_reminder_limit
class ReminderTools:
def __init__(self, star_instance):
self.star = star_instance
self.context = star_instance.context
self.reminder_data = star_instance.reminder_data
self.data_file = star_instance.data_file
self.scheduler_manager = star_instance.scheduler_manager
self.unique_session = star_instance.unique_session
# 延迟初始化统一处理器,避免循环依赖
self._processor = None
@property
def processor(self):
"""懒加载统一处理器实例"""
if self._processor is None:
from .command_utils import UnifiedCommandProcessor
self._processor = UnifiedCommandProcessor(self.star)
return self._processor
def get_session_id(self, msg_origin, creator_id=None):
"""
根据会话隔离设置,获取正确的会话ID
Args:
msg_origin: 原始会话ID
creator_id: 创建者ID
Returns:
str: 处理后的会话ID
"""
if not self.unique_session:
return msg_origin
# 如果启用了会话隔离,并且有创建者ID,则在会话ID中添加用户标识
if creator_id and ":" in msg_origin:
# 在群聊环境中添加用户ID
if (":GroupMessage:" in msg_origin or
"@chatroom" in msg_origin or
":ChannelMessage:" in msg_origin):
# 分割会话ID并在末尾添加用户标识
parts = msg_origin.rsplit(":", 1)
if len(parts) == 2:
return f"{parts[0]}:{parts[1]}_{creator_id}"
return msg_origin
async def set_reminder(self, event: Union[AstrMessageEvent, Context], text: str, datetime_str: str, user_name: str = "用户", repeat: str = None, holiday_type: str = None, group_id: str = None):
'''设置一个提醒
Args:
text(string): 提醒内容
datetime_str(string): 提醒时间,格式为 %Y-%m-%d %H:%M
user_name(string): 提醒对象名称,默认为"用户"
repeat(string): 重复类型,可选值:daily(每天),weekly(每周),monthly(每月),yearly(每年),none(不重复)
holiday_type(string): 可选,节假日类型:workday(仅工作日执行),holiday(仅法定节假日执行)
group_id(string): 可选,指定群聊ID,用于在特定群聊中设置提醒
'''
# 权限检查
if hasattr(event, 'get_sender_id'):
from .utils import check_permission_and_return_error
error_msg = check_permission_and_return_error(event, self.star.whitelist)
if error_msg:
return error_msg
try:
logger.info(f"set_reminder被调用: text='{text}', datetime_str='{datetime_str}', user_name='{user_name}', repeat='{repeat}', holiday_type='{holiday_type}', group_id='{group_id}'")
# 如果是Context类型,无法使用统一处理器,使用原有逻辑
if isinstance(event, Context):
logger.info("使用Context模式的legacy方法")
return await self._legacy_set_reminder(event, text, datetime_str, user_name, repeat, holiday_type)
# 为LLM工具使用简单的时间解析
try:
parsed_datetime_str = parse_datetime_for_llm(datetime_str)
logger.info(f"时间解析成功: '{datetime_str}' -> '{parsed_datetime_str}'")
except ValueError as e:
logger.error(f"时间解析失败: {e}")
return str(e)
# 使用统一处理器处理提醒
logger.info("开始调用统一处理器")
result_message = None
async for result in self.processor.process_add_item(
event, 'reminder', text, parsed_datetime_str, None, repeat, holiday_type, group_id, time_already_parsed=True
):
logger.info(f"收到统一处理器结果: {type(result)}")
# event.plain_result() 返回的是MessageEventResult对象
# 使用get_plain_text()方法提取纯文本消息
if hasattr(result, 'get_plain_text'):
result_message = result.get_plain_text()
logger.info(f"提取的消息 (get_plain_text): '{result_message}'")
elif hasattr(result, 'chain') and result.chain:
# 手动提取链中的文本
from astrbot.core.message.components import Plain
texts = [comp.text for comp in result.chain if isinstance(comp, Plain)]
result_message = " ".join(texts)
logger.info(f"提取的消息 (chain): '{result_message}'")
else:
result_message = str(result)
logger.info(f"提取的消息 (str): '{result_message}'")
break # 只获取第一个结果
logger.info(f"最终返回消息: '{result_message}'")
return result_message or "设置提醒成功"
except Exception as e:
return f"设置提醒时出错:{str(e)}"
async def set_task(self, event: Union[AstrMessageEvent, Context], text: str, datetime_str: str, repeat: str = None, holiday_type: str = None, group_id: str = None):
'''设置一个任务,到时间后会让AI执行该任务
Args:
text(string): 任务内容,AI将执行的操作
datetime_str(string): 任务执行时间,格式为 %Y-%m-%d %H:%M
repeat(string): 重复类型,可选值:daily(每天),weekly(每周),monthly(每月),yearly(每年),none(不重复)
holiday_type(string): 可选,节假日类型:workday(仅工作日执行),holiday(仅法定节假日执行)
group_id(string): 可选,指定群聊ID,用于在特定群聊中设置任务
'''
# 权限检查
if hasattr(event, 'get_sender_id'):
from .utils import check_permission_and_return_error
error_msg = check_permission_and_return_error(event, self.star.whitelist)
if error_msg:
return error_msg
try:
# 如果是Context类型,无法使用统一处理器,使用原有逻辑
if isinstance(event, Context):
return await self._legacy_set_task(event, text, datetime_str, repeat, holiday_type)
# 为LLM工具使用简单的时间解析
try:
parsed_datetime_str = parse_datetime_for_llm(datetime_str)
except ValueError as e:
return str(e)
# 使用统一处理器处理任务
result_message = None
async for result in self.processor.process_add_item(
event, 'task', text, parsed_datetime_str, None, repeat, holiday_type, group_id, time_already_parsed=True
):
# event.plain_result() 返回的是MessageEventResult对象
# 使用get_plain_text()方法提取纯文本消息
if hasattr(result, 'get_plain_text'):
result_message = result.get_plain_text()
elif hasattr(result, 'chain') and result.chain:
# 手动提取链中的文本
from astrbot.core.message.components import Plain
texts = [comp.text for comp in result.chain if isinstance(comp, Plain)]
result_message = " ".join(texts)
else:
result_message = str(result)
break # 只获取第一个结果
return result_message or "设置任务成功"
except Exception as e:
return f"设置任务时出错:{str(e)}"
async def _legacy_set_reminder(self, event: Context, text: str, datetime_str: str, user_name: str = "用户", repeat: str = None, holiday_type: str = None):
'''兼容Context模式的设置提醒方法(原有逻辑)'''
try:
msg_origin = self.context.get_event_queue()._queue[0].session_id
creator_id = None # Context 模式下无法获取创建者ID
creator_name = None
# 使用兼容性处理器确保key存在
actual_key = self.star.compatibility_handler.ensure_key_exists(msg_origin)
# 检查提醒数量限制
can_create, error_msg = check_reminder_limit(
self.reminder_data,
actual_key,
self.star.max_reminders_per_user,
self.unique_session,
creator_id
)
if not can_create:
return error_msg
# 处理重复类型和节假日类型的组合
final_repeat = repeat or "none"
if repeat and holiday_type:
final_repeat = f"{repeat}_{holiday_type}"
reminder = {
"text": text,
"datetime": datetime_str,
"user_name": user_name,
"repeat": final_repeat,
"creator_id": creator_id,
"creator_name": creator_name, # 添加创建者昵称
"is_task": False # 标记为提醒,不是任务
}
self.reminder_data[actual_key].append(reminder)
# 解析时间
dt = datetime.datetime.strptime(datetime_str, "%Y-%m-%d %H:%M")
# 设置定时任务并保存任务ID
job_id = self.scheduler_manager.add_job(actual_key, reminder, dt)
reminder["job_id"] = job_id # 保存任务ID到提醒数据中
await save_reminder_data(self.data_file, self.reminder_data)
# 构建提示信息
repeat_str = ""
if repeat == "daily" and not holiday_type:
repeat_str = ",每天重复"
elif repeat == "daily" and holiday_type == "workday":
repeat_str = ",每个工作日重复(法定节假日不触发)"
elif repeat == "daily" and holiday_type == "holiday":
repeat_str = ",每个法定节假日重复"
elif repeat == "weekly" and not holiday_type:
repeat_str = ",每周重复"
elif repeat == "weekly" and holiday_type == "workday":
repeat_str = ",每周的这一天重复,但仅工作日触发"
elif repeat == "weekly" and holiday_type == "holiday":
repeat_str = ",每周的这一天重复,但仅法定节假日触发"
elif repeat == "monthly" and not holiday_type:
repeat_str = ",每月重复"
elif repeat == "monthly" and holiday_type == "workday":
repeat_str = ",每月的这一天重复,但仅工作日触发"
elif repeat == "monthly" and holiday_type == "holiday":
repeat_str = ",每月的这一天重复,但仅法定节假日触发"
elif repeat == "yearly" and not holiday_type:
repeat_str = ",每年重复"
elif repeat == "yearly" and holiday_type == "workday":
repeat_str = ",每年的这一天重复,但仅工作日触发"
elif repeat == "yearly" and holiday_type == "holiday":
repeat_str = ",每年的这一天重复,但仅法定节假日触发"
return f"已设置提醒:\n内容: {text}\n时间: {datetime_str}{repeat_str}\n\n使用 /rmd ls 查看所有提醒"
except Exception as e:
return f"设置提醒时出错:{str(e)}"
async def _legacy_set_task(self, event: Context, text: str, datetime_str: str, repeat: str = None, holiday_type: str = None):
'''兼容Context模式的设置任务方法(原有逻辑)'''
try:
msg_origin = self.context.get_event_queue()._queue[0].session_id
creator_id = None # Context 模式下无法获取创建者ID
creator_name = None
# 使用兼容性处理器确保key存在
actual_key = self.star.compatibility_handler.ensure_key_exists(msg_origin)
# 检查提醒数量限制
can_create, error_msg = check_reminder_limit(
self.reminder_data,
actual_key,
self.star.max_reminders_per_user,
self.unique_session,
creator_id
)
if not can_create:
return error_msg
# 处理重复类型和节假日类型的组合
final_repeat = repeat or "none"
if repeat and holiday_type:
final_repeat = f"{repeat}_{holiday_type}"
task = {
"text": text,
"datetime": datetime_str,
"user_name": "用户", # 任务模式下不需要特别指定用户名
"repeat": final_repeat,
"creator_id": creator_id,
"creator_name": creator_name, # 添加创建者昵称
"is_task": True # 标记为任务,不是提醒
}
self.reminder_data[actual_key].append(task)
# 解析时间
dt = datetime.datetime.strptime(datetime_str, "%Y-%m-%d %H:%M")
# 设置定时任务并保存任务ID
job_id = self.scheduler_manager.add_job(actual_key, task, dt)
task["job_id"] = job_id # 保存任务ID到任务数据中
await save_reminder_data(self.data_file, self.reminder_data)
# 构建提示信息
repeat_str = ""
if repeat == "daily" and not holiday_type:
repeat_str = ",每天重复"
elif repeat == "daily" and holiday_type == "workday":
repeat_str = ",每个工作日重复(法定节假日不触发)"
elif repeat == "daily" and holiday_type == "holiday":
repeat_str = ",每个法定节假日重复"
elif repeat == "weekly" and not holiday_type:
repeat_str = ",每周重复"
elif repeat == "weekly" and holiday_type == "workday":
repeat_str = ",每周的这一天重复,但仅工作日触发"
elif repeat == "weekly" and holiday_type == "holiday":
repeat_str = ",每周的这一天重复,但仅法定节假日触发"
elif repeat == "monthly" and not holiday_type:
repeat_str = ",每月重复"
elif repeat == "monthly" and holiday_type == "workday":
repeat_str = ",每月的这一天重复,但仅工作日触发"
elif repeat == "monthly" and holiday_type == "holiday":
repeat_str = ",每月的这一天重复,但仅法定节假日触发"
elif repeat == "yearly" and not holiday_type:
repeat_str = ",每年重复"
elif repeat == "yearly" and holiday_type == "workday":
repeat_str = ",每年的这一天重复,但仅工作日触发"
elif repeat == "yearly" and holiday_type == "holiday":
repeat_str = ",每年的这一天重复,但仅法定节假日触发"
return f"已设置任务:\n内容: {text}\n时间: {datetime_str}{repeat_str}\n\n使用 /rmd ls 查看所有任务"
except Exception as e:
return f"设置任务时出错:{str(e)}"
async def delete_reminder(self, event: Union[AstrMessageEvent, Context],
content: str = None, # 任务内容关键词
time: str = None, # 具体时间点 HH:MM
weekday: str = None, # 星期 mon,tue,wed,thu,fri,sat,sun
repeat_type: str = None, # 重复类型 daily,weekly,monthly,yearly
date: str = None, # 具体日期 YYYY-MM-DD
all: str = None, # 是否删除所有 "yes"/"no"
task_only: str = "no", # 是否只删除任务
reminder_only: str = "no", # 是否只删除提醒
group_id: str = None # 可选,指定群聊ID
):
'''删除符合条件的提醒或者任务,可组合多个条件进行精确筛选
Args:
content(string): 可选,提醒或者任务内容包含的关键词
time(string): 可选,具体时间点,格式为 HH:MM,如 "08:00"
weekday(string): 可选,星期几,可选值:mon,tue,wed,thu,fri,sat,sun
repeat_type(string): 可选,重复类型,可选值:daily,weekly,monthly,yearly
date(string): 可选,具体日期,格式为 YYYY-MM-DD,如 "2024-02-09"
all(string): 可选,是否删除所有提醒,可选值:yes/no,默认no
task_only(string): 可选,是否只删除任务,可选值:yes/no,默认no
reminder_only(string): 可选,是否只删除提醒,可选值:yes/no,默认no
group_id(string): 可选,指定群聊ID,用于删除特定群聊中的提醒或任务
'''
# 权限检查
if hasattr(event, 'get_sender_id'):
from .utils import check_permission_and_return_error
error_msg = check_permission_and_return_error(event, self.star.whitelist)
if error_msg:
return error_msg
try:
if isinstance(event, Context):
msg_origin = self.context.get_event_queue()._queue[0].session_id
creator_id = None
else:
creator_id = event.get_sender_id()
if group_id:
# 远程群聊操作 - 构建远程会话ID
from .command_utils import SessionHelper
msg_origin = SessionHelper.build_remote_session_id(event, group_id, self.unique_session)
else:
# 本地操作
raw_msg_origin = event.unified_msg_origin
# 使用会话隔离功能获取会话ID
msg_origin = self.get_session_id(raw_msg_origin, creator_id)
# 调试信息:打印所有调度任务
logger.info("Current jobs in scheduler:")
for job in self.scheduler_manager.scheduler.get_jobs():
logger.info(f"Job ID: {job.id}, Next run: {job.next_run_time}, Args: {job.args}")
# 使用兼容性处理器获取提醒列表
reminders = self.star.compatibility_handler.get_reminders(msg_origin)
actual_key = self.star.compatibility_handler.get_actual_key(msg_origin)
if not reminders:
location_desc = f"群聊 {group_id} 中" if group_id else ""
return f"当前{location_desc}没有任何提醒或任务。"
# 用于存储要删除的任务索引
to_delete = []
# 验证星期格式
week_map = {
'mon': 0, 'tue': 1, 'wed': 2, 'thu': 3,
'fri': 4, 'sat': 5, 'sun': 6
}
if weekday and weekday.lower() not in week_map:
return "星期格式错误,可选值:mon,tue,wed,thu,fri,sat,sun"
# 验证重复类型
repeat_types = ["daily", "weekly", "monthly", "yearly"]
if repeat_type and repeat_type.lower() not in repeat_types:
return "重复类型错误,可选值:daily,weekly,monthly,yearly"
for i, reminder in enumerate(reminders):
dt = datetime.datetime.strptime(reminder["datetime"], "%Y-%m-%d %H:%M")
# 检查是否只删除任务或只删除提醒
is_task_only = task_only and task_only.lower() == "yes"
is_reminder_only = reminder_only and reminder_only.lower() == "yes"
if is_task_only and not reminder.get("is_task", False):
continue
if is_reminder_only and reminder.get("is_task", False):
continue
# 如果指定删除所有,直接添加
if all and all.lower() == "yes":
to_delete.append(i)
continue
# 检查各个条件,所有指定的条件都必须满足
match = True
# 检查内容
if content and content not in reminder["text"]:
match = False
# 检查时间点
if time:
reminder_time = dt.strftime("%H:%M")
if reminder_time != time:
match = False
# 检查星期
if weekday:
if reminder.get("repeat") == "weekly":
# 对于每周重复的任务,检查是否在指定星期执行
if dt.weekday() != week_map[weekday.lower()]:
match = False
else:
# 对于非每周重复的任务,检查日期是否落在指定星期
if dt.weekday() != week_map[weekday.lower()]:
match = False
# 检查重复类型
if repeat_type and reminder.get("repeat") != repeat_type.lower():
match = False
# 检查具体日期
if date:
reminder_date = dt.strftime("%Y-%m-%d")
if reminder_date != date:
match = False
# 如果所有条件都满足,添加到删除列表
if match:
to_delete.append(i)
if not to_delete:
conditions = []
if content:
conditions.append(f"内容包含{content}")
if time:
conditions.append(f"时间为{time}")
if weekday:
conditions.append(f"在{weekday}")
if repeat_type:
conditions.append(f"重复类型为{repeat_type}")
if date:
conditions.append(f"日期为{date}")
if task_only:
conditions.append("仅任务")
if reminder_only:
conditions.append("仅提醒")
location_desc = f"群聊 {group_id} 中" if group_id else ""
return f"没有在{location_desc}找到符合条件的提醒或任务:{', '.join(conditions)}"
# 从后往前删除,避免索引变化
deleted_reminders = []
for i in sorted(to_delete, reverse=True):
reminder = reminders[i]
# 调试信息:打印正在删除的任务
logger.info(f"Attempting to delete {'task' if reminder.get('is_task', False) else 'reminder'}: {reminder}")
# 尝试删除调度任务 - 优先使用保存的任务ID
job_found = False
# 如果有保存的任务ID,直接删除
if reminder.get('job_id'):
try:
self.scheduler_manager.remove_job(reminder['job_id'])
logger.info(f"Successfully removed job by stored ID: {reminder['job_id']}")
job_found = True
except Exception as e:
logger.warning(f"Failed to remove job by stored ID {reminder['job_id']}: {str(e)}")
# 如果直接删除失败,则通过内容匹配删除
if not job_found:
for job in self.scheduler_manager.scheduler.get_jobs():
if job.id.startswith(f"reminder_") and len(job.args) >= 2:
try:
# 检查任务参数中的提醒内容是否匹配
job_session_id = job.args[0]
job_reminder = job.args[1]
if (job_session_id == actual_key and
isinstance(job_reminder, dict) and
job_reminder.get('text') == reminder['text'] and
job_reminder.get('datetime') == reminder['datetime']):
self.scheduler_manager.remove_job(job.id)
logger.info(f"Successfully removed job by content match: {job.id}")
job_found = True
break
except Exception as e:
logger.error(f"Error checking job {job.id}: {str(e)}")
if not job_found:
logger.warning(f"No matching job found for removed item: {reminder.get('text', 'unknown')}")
deleted_reminders.append(reminder)
reminders.pop(i)
# 更新数据
self.reminder_data[actual_key] = reminders
await save_reminder_data(self.data_file, self.reminder_data)
# 调试信息:打印剩余的调度任务
logger.info("Remaining jobs in scheduler:")
for job in self.scheduler_manager.scheduler.get_jobs():
logger.info(f"Job ID: {job.id}, Next run: {job.next_run_time}, Args: {job.args}")
# 生成删除报告
location_desc = f"群聊 {group_id} 中的" if group_id else ""
if len(deleted_reminders) == 1:
item_type = "任务" if deleted_reminders[0].get("is_task", False) else "提醒"
return f"已删除{location_desc}{item_type}:{deleted_reminders[0]['text']}"
else:
tasks = []
reminders_list = []
for r in deleted_reminders:
if r.get("is_task", False):
tasks.append(f"- {r['text']}")
else:
reminders_list.append(f"- {r['text']}")
result = f"已删除{location_desc} {len(deleted_reminders)} 个项目:"
if tasks:
result += f"\n\n任务({len(tasks)}):\n" + "\n".join(tasks)
if reminders_list:
result += f"\n\n提醒({len(reminders_list)}):\n" + "\n".join(reminders_list)
return result
except Exception as e:
return f"删除提醒或任务时出错:{str(e)}"