diff --git a/services/analysis/ml_analyzer.py b/services/analysis/ml_analyzer.py index 0f7f3f38..c2fdacfc 100644 --- a/services/analysis/ml_analyzer.py +++ b/services/analysis/ml_analyzer.py @@ -441,10 +441,12 @@ async def replay_memory(self, group_id: str, new_messages: List[Dict[str, Any]], await self.temporary_persona_updater.apply_comprehensive_update_to_system_prompt( group_id, insights_data ) - logger.info(f"成功将强化学习结果集成到system_prompt: {group_id}") + logger.info( + f"强化学习结果已提交 WebUI 人格审查: {group_id}" + ) except Exception as e: - logger.error(f"集成强化学习结果到system_prompt失败: {e}") + logger.error(f"提交强化学习结果到人格审查失败: {e}") # 这里可以将 refined_data 传递给 PersonaUpdater 进行人格更新 diff --git a/services/analysis/multidimensional_analyzer.py b/services/analysis/multidimensional_analyzer.py index 3011b159..9fc73731 100644 --- a/services/analysis/multidimensional_analyzer.py +++ b/services/analysis/multidimensional_analyzer.py @@ -570,10 +570,12 @@ async def analyze_message_context(self, event: AstrMessageEvent, message_text: s await self.temporary_persona_updater.apply_comprehensive_update_to_system_prompt( group_id, update_data ) - logger.info(f"成功将多维度分析结果集成到system_prompt: {group_id}") + logger.info( + f"多维度分析结果已提交 WebUI 人格审查: {group_id}" + ) except Exception as e: - logger.error(f"集成分析结果到system_prompt失败: {e}") + logger.error(f"提交分析结果到人格审查失败: {e}") # 这里不抛出异常,让分析结果正常返回 return analysis_result diff --git a/services/learning/realtime_processor.py b/services/learning/realtime_processor.py index 104da6c5..b9bc81f8 100644 --- a/services/learning/realtime_processor.py +++ b/services/learning/realtime_processor.py @@ -257,9 +257,6 @@ async def _process_expression_style_learning( group_id, limit=5 ) if learned_patterns: - await self._apply_style_to_prompt_temporarily( - group_id, learned_patterns - ) few_shots_content = ( await self._dialog_analyzer.generate_few_shots_dialog( group_id, message_data_list @@ -270,70 +267,22 @@ async def _process_expression_style_learning( group_id, learned_patterns, few_shots_content ) logger.info( - f"群组 {group_id} 表达风格学习结果已临时应用到prompt," - "并已提交人格审查" + f"群组 {group_id} 表达风格学习结果已提交人格审查," + "等待批准后再写入 begin_dialogs" ) else: logger.info( - f"群组 {group_id} 表达风格学习结果已临时应用到prompt" + f"群组 {group_id} 表达风格学习成功," + "但未生成可审查的 few-shot 示例,跳过人格写入" ) except Exception as e: logger.error(f"处理表达风格学习结果失败: {e}") self._learning_stats.style_updates += 1 - if self.update_system_prompt_callback: - await self.update_system_prompt_callback(group_id) - logger.info( - f"群组 {group_id} 表达风格学习结果已应用到system_prompt" - ) - except Exception as e: logger.error(f"群组 {group_id} 表达风格学习处理失败: {e}") - # Temporary style application — injected as begin_dialogs example conversations - - async def _apply_style_to_prompt_temporarily( - self, group_id: str, learned_patterns: List[Any] - ) -> None: - """Inject learned style patterns as begin_dialogs example conversations.""" - try: - if not learned_patterns: - return - - dialog_pairs: list = [] - for pattern in learned_patterns[:5]: - situation = ( - pattern.situation - if hasattr(pattern, "situation") - else pattern.get("situation", "") - ) - expression = ( - pattern.expression - if hasattr(pattern, "expression") - else pattern.get("expression", "") - ) - if situation and expression: - dialog_pairs.append((situation, expression)) - - if not dialog_pairs: - return - - success = await self._temporary_persona_updater.apply_style_as_begin_dialogs( - group_id, dialog_pairs - ) - - if success: - logger.info( - f"群组 {group_id} 风格示例已注入 begin_dialogs," - f"包含 {len(dialog_pairs)} 组示例对话" - ) - else: - logger.warning(f"群组 {group_id} begin_dialogs 风格注入失败") - - except Exception as e: - logger.error(f"追加 few-shot 风格示例失败: {e}") - # Helpers @staticmethod diff --git a/services/persona/temporary_persona_updater.py b/services/persona/temporary_persona_updater.py index 82f88134..be7ad509 100644 --- a/services/persona/temporary_persona_updater.py +++ b/services/persona/temporary_persona_updater.py @@ -5,6 +5,7 @@ import json import time import asyncio +import hashlib from typing import Dict, List, Optional, Any, Tuple from datetime import datetime, timedelta @@ -12,6 +13,7 @@ from astrbot.api.star import Context from ...config import PluginConfig +from ...constants import UPDATE_TYPE_PROGRESSIVE_PERSONA_LEARNING from ...core.interfaces import IPersonaUpdater, IPersonaBackupManager @@ -1268,9 +1270,138 @@ async def _apply_context_awareness_update_to_system_prompt(self, group_id: str, logger.error(f"直接更新上下文感知到system prompt失败: {e}") return False + @staticmethod + def _format_review_value(value: Any) -> str: + """Convert runtime context values to readable review text.""" + if value is None: + return "" + if isinstance(value, str): + return value.strip() + if isinstance(value, (list, tuple, set)): + return "、".join(str(item).strip() for item in value if str(item).strip()) + if isinstance(value, dict): + return json.dumps(value, ensure_ascii=False, sort_keys=True) + return str(value).strip() + + @staticmethod + def _append_text_block(base_text: str, appended_text: str) -> str: + base = (base_text or '').strip() + appended = (appended_text or '').strip() + if base and appended: + return f"{base}\n\n{appended}" + return appended or base + + def _build_comprehensive_review_content(self, update_data: dict) -> str: + timestamp = datetime.now().strftime('%Y-%m-%d %H:%M') + sections: List[str] = [] + + def add_section(title: str, fields: List[Tuple[str, Any]], instruction: str) -> None: + lines = [f"【{title} - {timestamp}】"] + for label, value in fields: + formatted = self._format_review_value(value) + if formatted: + lines.append(f"• {label}: {formatted}") + if len(lines) == 1: + return + lines.append(instruction) + sections.append("\n".join(lines)) + + if 'mood' in update_data: + add_section( + '当前情绪状态', + [('情绪描述', update_data.get('mood'))], + '• 请根据此情绪调整回复的语气和风格,保持语言与心情对应', + ) + + relationship_info = update_data.get('social_relationship') + if isinstance(relationship_info, dict): + add_section( + '当前社交关系状态', + [ + ('用户关系', relationship_info.get('user_relationships')), + ('群体氛围', relationship_info.get('group_atmosphere')), + ('互动风格', relationship_info.get('interaction_style')), + ], + '• 请根据当前社交关系状态调整回复方式和互动风格', + ) + + profile_info = update_data.get('user_profile') + if isinstance(profile_info, dict): + add_section( + '当前用户档案', + [ + ('用户偏好', profile_info.get('preferences')), + ('兴趣爱好', profile_info.get('interests')), + ('沟通风格', profile_info.get('communication_style')), + ('性格特征', profile_info.get('personality_traits')), + ], + '• 请根据用户档案信息调整回复内容和方式以更好地适应用户', + ) + + insights_info = update_data.get('learning_insights') + if isinstance(insights_info, dict): + add_section( + '当前学习洞察', + [ + ('交互模式', insights_info.get('interaction_patterns')), + ('改进建议', insights_info.get('improvement_suggestions')), + ('有效策略', insights_info.get('effective_strategies')), + ('学习重点', insights_info.get('learning_focus')), + ], + '• 请根据学习洞察调整回复策略和改进交互质量', + ) + + context_info = update_data.get('context_awareness') + if isinstance(context_info, dict): + add_section( + '当前上下文状态', + [ + ('当前话题', context_info.get('current_topic')), + ('对话状态', context_info.get('conversation_state')), + ('近期关注', context_info.get('recent_focus')), + ('对话流向', context_info.get('dialogue_flow')), + ], + '• 请根据当前上下文状态保持话题连贯性并提供相关回复', + ) + + return "\n\n".join(sections).strip() + + @staticmethod + def _runtime_context_signature(update_data: dict) -> str: + normalized = json.dumps( + update_data or {}, + ensure_ascii=False, + sort_keys=True, + default=str, + ) + return hashlib.sha256(normalized.encode('utf-8')).hexdigest() + + async def _has_pending_runtime_context_review( + self, group_id: str, signature: str, proposed_content: str + ) -> bool: + if not self.db_manager or not hasattr(self.db_manager, 'get_pending_persona_learning_reviews'): + return False + try: + pending_reviews = await self.db_manager.get_pending_persona_learning_reviews() + for review in pending_reviews or []: + if review.get('group_id') != group_id: + continue + metadata = review.get('metadata') or {} + if metadata.get('runtime_context_review'): + return True + if review.get('proposed_content') == proposed_content: + return True + except Exception as exc: + logger.debug(f"检查运行时上下文审查重复项失败: {exc}") + return False + async def apply_comprehensive_update_to_system_prompt(self, group_id: str, update_data: dict) -> bool: """ - 综合应用多种类型的增量更新到 system prompt + 将多种运行时上下文更新提交到 WebUI 人格审查队列。 + + 这里沿用 MaiBot 的 learn-before-upsert 思路:自动学习阶段只生成 + 待审查候选,不直接修改 AstrBot 人格。用户在 WebUI 批准后才会 + 通过 PersonaReviewService 追加到 system_prompt。 Args: group_id: 群组ID @@ -1280,44 +1411,63 @@ async def apply_comprehensive_update_to_system_prompt(self, group_id: str, updat bool: 是否应用成功 """ try: - success_count = 0 - total_updates = 0 - - # 应用情绪更新 - if 'mood' in update_data: - total_updates += 1 - if await self._apply_mood_update_to_system_prompt(group_id, update_data['mood']): - success_count += 1 - - # 应用社交关系更新 - if 'social_relationship' in update_data: - total_updates += 1 - if await self._apply_social_relationship_update_to_system_prompt(group_id, update_data['social_relationship']): - success_count += 1 - - # 应用用户档案更新 - if 'user_profile' in update_data: - total_updates += 1 - if await self._apply_user_profile_update_to_system_prompt(group_id, update_data['user_profile']): - success_count += 1 - - # 应用学习洞察更新 - if 'learning_insights' in update_data: - total_updates += 1 - if await self._apply_learning_insights_update_to_system_prompt(group_id, update_data['learning_insights']): - success_count += 1 - - # 应用上下文感知更新 - if 'context_awareness' in update_data: - total_updates += 1 - if await self._apply_context_awareness_update_to_system_prompt(group_id, update_data['context_awareness']): - success_count += 1 - - logger.info(f"综合更新完成: {success_count}/{total_updates} 项更新成功应用") - return success_count > 0 + proposed_content = self._build_comprehensive_review_content(update_data or {}) + if not proposed_content: + logger.debug(f"群组 {group_id} 运行时上下文为空,跳过人格审查创建") + return False + + signature = self._runtime_context_signature(update_data or {}) + if await self._has_pending_runtime_context_review( + group_id, signature, proposed_content + ): + logger.info(f"群组 {group_id} 已存在运行时上下文待审查记录,跳过重复创建") + return True + + original_prompt = '' + try: + persona = await self._get_framework_persona(group_id) + if persona: + original_prompt = persona.get('prompt', '') or persona.get('system_prompt', '') + except Exception as exc: + logger.debug(f"获取当前人格用于审查快照失败: {exc}") + + new_content = self._append_text_block(original_prompt, proposed_content) + metadata = { + 'runtime_context_review': True, + 'runtime_context_signature': signature, + 'update_categories': list((update_data or {}).keys()), + 'raw_update_data': update_data or {}, + 'application_mode': 'append_system_prompt', + 'review_flow': 'maibot_web_review_before_apply', + } + + if not self.db_manager or not hasattr(self.db_manager, 'add_persona_learning_review'): + logger.warning("数据库管理器不可用,无法创建运行时上下文人格审查记录") + return False + + review_id = await self.db_manager.add_persona_learning_review( + group_id=group_id, + proposed_content=proposed_content, + learning_source=UPDATE_TYPE_PROGRESSIVE_PERSONA_LEARNING, + confidence_score=0.7, + raw_analysis="运行时上下文候选已提交 WebUI 审查,批准后才会写入人格", + metadata=metadata, + original_content=original_prompt, + new_content=new_content, + ) + + if review_id: + logger.info( + f"群组 {group_id} 运行时上下文已提交人格审查 " + f"(ID: {review_id}),等待批准后再写入 system_prompt" + ) + return True + + logger.warning(f"群组 {group_id} 运行时上下文审查记录创建失败") + return False except Exception as e: - logger.error(f"综合应用增量更新失败: {e}") + logger.error(f"提交运行时上下文人格审查失败: {e}") return False async def _create_mood_backup_persona(self, group_id: str, mood_type: str) -> bool: diff --git a/tests/unit/test_learning_chain_regressions.py b/tests/unit/test_learning_chain_regressions.py index 15d3f8a0..4fa4a891 100644 --- a/tests/unit/test_learning_chain_regressions.py +++ b/tests/unit/test_learning_chain_regressions.py @@ -45,6 +45,9 @@ filter_learning_messages, should_ignore_learning_sample, ) +from self_learning_EterU.services.persona.temporary_persona_updater import ( + TemporaryPersonaUpdater, +) from self_learning_EterU.services.state.enhanced_interaction import ( EnhancedInteractionService, ) @@ -879,6 +882,124 @@ async def test_realtime_expression_learning_is_batch_gated(): assert learner.trigger_learning_for_group.await_count == 2 +@pytest.mark.unit +@pytest.mark.asyncio +async def test_realtime_expression_learning_creates_review_without_persona_write(): + config = SimpleNamespace( + message_min_length=1, + message_max_length=500, + enable_expression_patterns=True, + expression_learning_trigger_messages=1, + ) + collector = SimpleNamespace( + get_statistics=AsyncMock(return_value={"raw_messages": 10}) + ) + pattern = SimpleNamespace( + situation="用户问候", + expression="元气回应", + to_dict=lambda: {"situation": "用户问候", "expression": "元气回应"}, + ) + learner = SimpleNamespace( + trigger_learning_for_group=AsyncMock(return_value=True), + get_expression_patterns=AsyncMock(return_value=[pattern]), + ) + factory_manager = SimpleNamespace( + get_component_factory=lambda: SimpleNamespace( + create_expression_pattern_learner=lambda: learner + ) + ) + db_manager = SimpleNamespace( + get_recent_raw_messages=AsyncMock( + return_value=[ + { + "id": idx, + "sender_id": f"user-{idx}", + "sender_name": f"User {idx}", + "message": f"这是第{idx}条足够长的表达学习消息", + "timestamp": time.time() + idx, + "platform": "test", + } + for idx in range(1, 6) + ] + ) + ) + dialog_analyzer = SimpleNamespace( + generate_few_shots_dialog=AsyncMock(return_value="A: 你好\nB: 我来了"), + create_style_learning_review_request=AsyncMock(), + ) + temporary_persona_updater = SimpleNamespace( + apply_style_as_begin_dialogs=AsyncMock(), + ) + update_callback = AsyncMock() + + processor = RealtimeProcessor( + plugin_config=config, + message_collector=collector, + multidimensional_analyzer=SimpleNamespace(), + persona_manager=SimpleNamespace(), + temporary_persona_updater=temporary_persona_updater, + dialog_analyzer=dialog_analyzer, + learning_stats=SimpleNamespace(style_updates=0), + factory_manager=factory_manager, + db_manager=db_manager, + ) + processor.update_system_prompt_callback = update_callback + + await processor.process_expression_learning( + "group-a", + "这是用于表达学习审查的消息", + "user-a", + ) + + dialog_analyzer.create_style_learning_review_request.assert_awaited_once() + temporary_persona_updater.apply_style_as_begin_dialogs.assert_not_awaited() + update_callback.assert_not_awaited() + + +@pytest.mark.unit +@pytest.mark.asyncio +async def test_runtime_context_updates_are_queued_for_review_without_persona_write(): + updater = TemporaryPersonaUpdater.__new__(TemporaryPersonaUpdater) + updater.db_manager = SimpleNamespace( + get_pending_persona_learning_reviews=AsyncMock(return_value=[]), + add_persona_learning_review=AsyncMock(return_value=99), + ) + updater._get_framework_persona = AsyncMock( + return_value={"name": "default", "prompt": "Base prompt"} + ) + updater._update_framework_persona = AsyncMock() + + update_data = { + "learning_insights": { + "interaction_patterns": "喜欢短句互动", + "improvement_suggestions": "少把记忆主题写进固定人格", + "effective_strategies": "先审查再应用", + "learning_focus": "记忆重放学习", + }, + "context_awareness": { + "current_topic": "MaiBot 记忆主题", + "recent_focus": "人格污染", + "dialogue_flow": "话题相关度: 0.8", + }, + } + + success = await updater.apply_comprehensive_update_to_system_prompt( + "group-a", + update_data, + ) + + assert success is True + updater._update_framework_persona.assert_not_awaited() + updater.db_manager.add_persona_learning_review.assert_awaited_once() + review_kwargs = updater.db_manager.add_persona_learning_review.await_args.kwargs + assert review_kwargs["group_id"] == "group-a" + assert review_kwargs["original_content"] == "Base prompt" + assert review_kwargs["new_content"].startswith("Base prompt\n\n") + assert "MaiBot 记忆主题" in review_kwargs["proposed_content"] + assert review_kwargs["metadata"]["runtime_context_review"] is True + assert review_kwargs["metadata"]["review_flow"] == "maibot_web_review_before_apply" + + @pytest.mark.unit @pytest.mark.asyncio async def test_expression_pattern_save_handles_duplicate_existing_rows(tmp_path):