diff --git a/src/chat/affinity_flow/chatter.py b/src/chat/affinity_flow/chatter.py index 31e3d89be..e31fd6d60 100644 --- a/src/chat/affinity_flow/chatter.py +++ b/src/chat/affinity_flow/chatter.py @@ -54,7 +54,7 @@ class AffinityFlowChatter: """ try: # 使用增强版规划器处理消息 - actions, target_message = await self.planner.plan(mode=ChatMode.FOCUS, use_enhanced=True) + actions, target_message = await self.planner.plan(mode=ChatMode.FOCUS) self.stats["plans_created"] += 1 # 执行动作(如果规划器返回了动作) @@ -66,7 +66,7 @@ class AffinityFlowChatter: # 更新统计 self.stats["messages_processed"] += 1 self.stats["actions_executed"] += execution_result.get("executed_count", 0) - self.stats["successful_executions"] += 1 # 假设成功 + self.stats["successful_executions"] += 1 # TODO:假设成功 self.last_activity_time = time.time() result = { diff --git a/src/chat/affinity_flow/interest_scoring.py b/src/chat/affinity_flow/interest_scoring.py index d96c555f6..5515952d1 100644 --- a/src/chat/affinity_flow/interest_scoring.py +++ b/src/chat/affinity_flow/interest_scoring.py @@ -50,7 +50,8 @@ class InterestScoringSystem: def calculate_interest_scores(self, messages: List[DatabaseMessages], bot_nickname: str) -> List[InterestScore]: """计算消息的兴趣度评分""" scores = [] - user_messages = [msg for msg in messages if msg.role == "user"] + # 通过 user_id 判断是否是用户消息(非机器人发送的消息) + user_messages = [msg for msg in messages if str(msg.user_info.user_id) != str(global_config.bot.qq_account)] for msg in user_messages: score = self._calculate_single_message_score(msg, bot_nickname) @@ -61,16 +62,16 @@ class InterestScoringSystem: def _calculate_single_message_score(self, message: DatabaseMessages, bot_nickname: str) -> InterestScore: """计算单条消息的兴趣度评分""" # 1. 计算兴趣匹配度 - interest_match_score = self._calculate_interest_match_score(message.content) + interest_match_score = self._calculate_interest_match_score(message.processed_plain_text) # 2. 计算关系分 - relationship_score = self._calculate_relationship_score(message.user_id) + relationship_score = self._calculate_relationship_score(message.user_info.user_id) # 3. 计算提及分数 - mentioned_score = self._calculate_mentioned_score(message.content, bot_nickname) + mentioned_score = self._calculate_mentioned_score(message.processed_plain_text, bot_nickname) # 4. 计算时间因子 - time_factor_score = self._calculate_time_factor_score(message.timestamp) + time_factor_score = self._calculate_time_factor_score(message.time) # 5. 计算总分 total_score = ( diff --git a/src/chat/planner_actions/action_manager.py b/src/chat/planner_actions/action_manager.py index 23755e42d..b6150fdd8 100644 --- a/src/chat/planner_actions/action_manager.py +++ b/src/chat/planner_actions/action_manager.py @@ -1,10 +1,19 @@ -from typing import Dict, Optional, Type +import asyncio +import traceback +import time +import random +from typing import Dict, Optional, Type, Any, Tuple -from src.chat.message_receive.chat_stream import ChatStream + +from src.chat.utils.timer_calculator import Timer +from src.person_info.person_info import get_person_info_manager +from src.chat.message_receive.chat_stream import ChatStream, get_chat_manager from src.common.logger import get_logger +from src.config.config import global_config from src.plugin_system.core.component_registry import component_registry from src.plugin_system.base.component_types import ComponentType, ActionInfo from src.plugin_system.base.base_action import BaseAction +from src.plugin_system.apis import generator_api, database_api, send_api, message_api logger = get_logger("action_manager") @@ -25,6 +34,8 @@ class ActionManager: # 初始化时将默认动作加载到使用中的动作 self._using_actions = component_registry.get_default_actions() + self.log_prefix: str = "ActionManager" + # === 执行Action方法 === @staticmethod @@ -124,3 +135,340 @@ class ActionManager: actions_to_restore = list(self._using_actions.keys()) self._using_actions = component_registry.get_default_actions() logger.debug(f"恢复动作集: 从 {actions_to_restore} 恢复到默认动作集 {list(self._using_actions.keys())}") + + async def execute_action( + self, + action_name: str, + chat_id: str, + target_message: Optional[dict] = None, + reasoning: str = "", + action_data: Optional[dict] = None, + thinking_id: Optional[str] = None, + log_prefix: str = "", + ) -> Any: + """ + 执行单个动作的通用函数 + + Args: + action_name: 动作名称 + chat_id: 聊天id + target_message: 目标消息 + reasoning: 执行理由 + action_data: 动作数据 + thinking_id: 思考ID + log_prefix: 日志前缀 + + Returns: + 执行结果 + """ + try: + # 通过chat_id获取chat_stream + chat_manager = get_chat_manager() + chat_stream = chat_manager.get_stream(chat_id) + + if not chat_stream: + logger.error(f"{log_prefix} 无法找到chat_id对应的chat_stream: {chat_id}") + return {"action_type": action_name, "success": False, "reply_text": "", "error": "chat_stream not found"} + + if action_name == "no_action": + return {"action_type": "no_action", "success": True, "reply_text": "", "command": ""} + + if action_name == "no_reply": + # 直接处理no_reply逻辑,不再通过动作系统 + reason = reasoning or "选择不回复" + logger.info(f"{log_prefix} 选择不回复,原因: {reason}") + + # 存储no_reply信息到数据库 + await database_api.store_action_info( + chat_stream=chat_stream, + action_build_into_prompt=False, + action_prompt_display=reason, + action_done=True, + thinking_id=thinking_id, + action_data={"reason": reason}, + action_name="no_reply", + ) + return {"action_type": "no_reply", "success": True, "reply_text": "", "command": ""} + + elif action_name != "reply" and action_name != "no_action": + # 执行普通动作 + success, reply_text, command = await self._handle_action( + chat_stream, + action_name, + reasoning, + action_data or {}, + {}, # cycle_timers + thinking_id, + target_message, + ) + return { + "action_type": action_name, + "success": success, + "reply_text": reply_text, + "command": command, + } + else: + # 生成回复 + try: + success, response_set, _ = await generator_api.generate_reply( + chat_stream=chat_stream, + reply_message=target_message, + available_actions=self.get_using_actions(), + enable_tool=global_config.tool.enable_tool, + request_type="chat.replyer", + from_plugin=False, + ) + if not success or not response_set: + logger.info( + f"对 {target_message.get('processed_plain_text') if target_message else '未知消息'} 的回复生成失败" + ) + return {"action_type": "reply", "success": False, "reply_text": "", "loop_info": None} + except asyncio.CancelledError: + logger.debug(f"{log_prefix} 并行执行:回复生成任务已被取消") + return {"action_type": "reply", "success": False, "reply_text": "", "loop_info": None} + + # 发送并存储回复 + loop_info, reply_text, cycle_timers_reply = await self._send_and_store_reply( + chat_stream, + response_set, + asyncio.get_event_loop().time(), + target_message, + {}, # cycle_timers + thinking_id, + [], # actions + ) + return {"action_type": "reply", "success": True, "reply_text": reply_text, "loop_info": loop_info} + + except Exception as e: + logger.error(f"{log_prefix} 执行动作时出错: {e}") + logger.error(f"{log_prefix} 错误信息: {traceback.format_exc()}") + return { + "action_type": action_name, + "success": False, + "reply_text": "", + "loop_info": None, + "error": str(e), + } + + async def _handle_action( + self, chat_stream, action, reasoning, action_data, cycle_timers, thinking_id, action_message + ) -> tuple[bool, str, str]: + """ + 处理具体的动作执行 + + Args: + chat_stream: ChatStream实例 + action: 动作名称 + reasoning: 执行理由 + action_data: 动作数据 + cycle_timers: 循环计时器 + thinking_id: 思考ID + action_message: 动作消息 + + Returns: + tuple: (执行是否成功, 回复文本, 命令文本) + + 功能说明: + - 创建对应的动作处理器 + - 执行动作并捕获异常 + - 返回执行结果供上级方法整合 + """ + if not chat_stream: + return False, "", "" + try: + # 创建动作处理器 + action_handler = self.create_action( + action_name=action, + action_data=action_data, + reasoning=reasoning, + cycle_timers=cycle_timers, + thinking_id=thinking_id, + chat_stream=chat_stream, + log_prefix=self.log_prefix, + action_message=action_message, + ) + if not action_handler: + # 动作处理器创建失败,尝试回退机制 + logger.warning(f"{self.log_prefix} 创建动作处理器失败: {action},尝试回退方案") + + # 获取当前可用的动作 + available_actions = self.get_using_actions() + fallback_action = None + + # 回退优先级:reply > 第一个可用动作 + if "reply" in available_actions: + fallback_action = "reply" + elif available_actions: + fallback_action = list(available_actions.keys())[0] + + if fallback_action and fallback_action != action: + logger.info(f"{self.log_prefix} 使用回退动作: {fallback_action}") + action_handler = self.create_action( + action_name=fallback_action, + action_data=action_data, + reasoning=f"原动作'{action}'不可用,自动回退。{reasoning}", + cycle_timers=cycle_timers, + thinking_id=thinking_id, + chat_stream=chat_stream, + log_prefix=self.log_prefix, + action_message=action_message, + ) + + if not action_handler: + logger.error(f"{self.log_prefix} 回退方案也失败,无法创建任何动作处理器") + return False, "", "" + + # 执行动作 + success, reply_text = await action_handler.handle_action() + return success, reply_text, "" + except Exception as e: + logger.error(f"{self.log_prefix} 处理{action}时出错: {e}") + traceback.print_exc() + return False, "", "" + + async def _send_and_store_reply( + self, + chat_stream: ChatStream, + response_set, + loop_start_time, + action_message, + cycle_timers: Dict[str, float], + thinking_id, + actions, + ) -> Tuple[Dict[str, Any], str, Dict[str, float]]: + """ + 发送并存储回复信息 + + Args: + chat_stream: ChatStream实例 + response_set: 回复内容集合 + loop_start_time: 循环开始时间 + action_message: 动作消息 + cycle_timers: 循环计时器 + thinking_id: 思考ID + actions: 动作列表 + + Returns: + Tuple[Dict[str, Any], str, Dict[str, float]]: 循环信息, 回复文本, 循环计时器 + """ + # 发送回复 + with Timer("回复发送", cycle_timers): + reply_text = await self.send_response(chat_stream, response_set, loop_start_time, action_message) + + # 存储reply action信息 + person_info_manager = get_person_info_manager() + + # 获取 platform,如果不存在则从 chat_stream 获取,如果还是 None 则使用默认值 + platform = action_message.get("chat_info_platform") + if platform is None: + platform = getattr(chat_stream, "platform", "unknown") + + # 获取用户信息并生成回复提示 + person_id = person_info_manager.get_person_id( + platform, + action_message.get("user_id", ""), + ) + person_name = await person_info_manager.get_value(person_id, "person_name") + action_prompt_display = f"你对{person_name}进行了回复:{reply_text}" + + # 存储动作信息到数据库 + await database_api.store_action_info( + chat_stream=chat_stream, + action_build_into_prompt=False, + action_prompt_display=action_prompt_display, + action_done=True, + thinking_id=thinking_id, + action_data={"reply_text": reply_text}, + action_name="reply", + ) + + # 构建循环信息 + loop_info: Dict[str, Any] = { + "loop_plan_info": { + "action_result": actions, + }, + "loop_action_info": { + "action_taken": True, + "reply_text": reply_text, + "command": "", + "taken_time": time.time(), + }, + } + + return loop_info, reply_text, cycle_timers + + async def send_response(self, chat_stream, reply_set, thinking_start_time, message_data) -> str: + """ + 发送回复内容的具体实现 + + Args: + chat_stream: ChatStream实例 + reply_set: 回复内容集合,包含多个回复段 + reply_to: 回复目标 + thinking_start_time: 思考开始时间 + message_data: 消息数据 + + Returns: + str: 完整的回复文本 + + 功能说明: + - 检查是否有新消息需要回复 + - 处理主动思考的"沉默"决定 + - 根据消息数量决定是否添加回复引用 + - 逐段发送回复内容,支持打字效果 + - 正确处理元组格式的回复段 + """ + current_time = time.time() + # 计算新消息数量 + new_message_count = message_api.count_new_messages( + chat_id=chat_stream.stream_id, start_time=thinking_start_time, end_time=current_time + ) + + # 根据新消息数量决定是否需要引用回复 + need_reply = new_message_count >= random.randint(2, 4) + + reply_text = "" + is_proactive_thinking = (message_data.get("message_type") == "proactive_thinking") if message_data else True + + first_replied = False + for reply_seg in reply_set: + # 调试日志:验证reply_seg的格式 + logger.debug(f"Processing reply_seg type: {type(reply_seg)}, content: {reply_seg}") + + # 修正:正确处理元组格式 (格式为: (type, content)) + if isinstance(reply_seg, tuple) and len(reply_seg) >= 2: + _, data = reply_seg + else: + # 向下兼容:如果已经是字符串,则直接使用 + data = str(reply_seg) + + if isinstance(data, list): + data = "".join(map(str, data)) + reply_text += data + + # 如果是主动思考且内容为“沉默”,则不发送 + if is_proactive_thinking and data.strip() == "沉默": + logger.info(f"{self.log_prefix} 主动思考决定保持沉默,不发送消息") + continue + + # 发送第一段回复 + if not first_replied: + await send_api.text_to_stream( + text=data, + stream_id=chat_stream.stream_id, + reply_to_message=message_data, + set_reply=need_reply, + typing=False, + ) + first_replied = True + else: + # 发送后续回复 + sent_message = await send_api.text_to_stream( + text=data, + stream_id=chat_stream.stream_id, + reply_to_message=None, + set_reply=False, + typing=True, + ) + + return reply_text \ No newline at end of file diff --git a/src/chat/planner_actions/plan_generator.py b/src/chat/planner_actions/plan_generator.py index ec0a11691..60fa4cbfa 100644 --- a/src/chat/planner_actions/plan_generator.py +++ b/src/chat/planner_actions/plan_generator.py @@ -9,7 +9,7 @@ from src.chat.utils.utils import get_chat_type_and_target_info from src.common.data_models.database_data_model import DatabaseMessages from src.common.data_models.info_data_model import Plan, TargetPersonInfo from src.config.config import global_config -from src.plugin_system.base.component_types import ActionInfo, ChatMode, ComponentType +from src.plugin_system.base.component_types import ActionActivationType, ActionInfo, ChatMode, ChatType, ComponentType from src.plugin_system.core.component_registry import component_registry @@ -95,6 +95,30 @@ class PlanGenerator: if action_name in all_registered_actions: current_available_actions[action_name] = all_registered_actions[action_name] + reply_info = ActionInfo( + name="reply", + component_type=ComponentType.ACTION, + description="系统级动作:选择回复消息的决策", + action_parameters={ + "content": "回复的文本内容", + "reply_to_message_id": "要回复的消息ID" + }, + action_require=[ + "你想要闲聊或者随便附和", + "当用户提到你或艾特你时", + "当需要回答用户的问题时", + "当你想参与对话时", + "当用户分享有趣的内容时" + ], + activation_type=ActionActivationType.ALWAYS, + activation_keywords=[], + associated_types=["text", "reply"], + plugin_name="SYSTEM", + enabled=True, + parallel_action=False, + mode_enable=ChatMode.ALL, + chat_type_allow=ChatType.ALL, + ) no_reply_info = ActionInfo( name="no_reply", component_type=ComponentType.ACTION, @@ -106,5 +130,5 @@ class PlanGenerator: parallel_action=False, ) current_available_actions["no_reply"] = no_reply_info - + current_available_actions["reply"] = reply_info return current_available_actions \ No newline at end of file diff --git a/src/chat/planner_actions/planner.py b/src/chat/planner_actions/planner.py index f89b9846c..28cac9439 100644 --- a/src/chat/planner_actions/planner.py +++ b/src/chat/planner_actions/planner.py @@ -18,12 +18,10 @@ from src.config.config import global_config from src.plugin_system.base.component_types import ChatMode import src.chat.planner_actions.planner_prompts #noga # noqa: F401 # 导入提示词模块以确保其被初始化 +from src.chat.planner_actions import planner_prompts #noqa logger = get_logger("planner") - - - class ActionPlanner: """ 增强版ActionPlanner,集成兴趣度评分和用户关系追踪机制。 @@ -67,7 +65,7 @@ class ActionPlanner: "other_actions_executed": 0, } - async def plan(self, mode: ChatMode = ChatMode.FOCUS, use_enhanced: bool = True) -> Tuple[List[Dict], Optional[Dict]]: + async def plan(self, mode: ChatMode = ChatMode.FOCUS) -> Tuple[List[Dict], Optional[Dict]]: """ 执行完整的增强版规划流程。 @@ -83,10 +81,8 @@ class ActionPlanner: try: self.planner_stats["total_plans"] += 1 - if use_enhanced: - return await self._enhanced_plan_flow(mode) - else: - return await self._standard_plan_flow(mode) + return await self._enhanced_plan_flow(mode) + except Exception as e: logger.error(f"规划流程出错: {e}") @@ -117,17 +113,19 @@ class ActionPlanner: self.interest_scoring.record_reply_action(False) else: self.interest_scoring.record_reply_action(True) - # 4. 筛选 Plan filtered_plan = await self.filter.filter(initial_plan) - # 5. 执行 Plan - await self._execute_plan_with_tracking(filtered_plan) + # 5. 使用 PlanExecutor 执行 Plan + execution_result = await self.executor.execute(filtered_plan) - # 6. 检查关系更新 + # 6. 根据执行结果更新统计信息 + self._update_stats_from_execution_result(execution_result) + + # 7. 检查关系更新 await self.relationship_tracker.check_and_update_relationships() - # 7. 返回结果 + # 8. 返回结果 return self._build_return_result(filtered_plan) except Exception as e: @@ -135,60 +133,54 @@ class ActionPlanner: self.planner_stats["failed_plans"] += 1 return [], None - async def _standard_plan_flow(self, mode: ChatMode) -> Tuple[List[Dict], Optional[Dict]]: - """执行标准规划流程""" - try: - # 1. 生成初始 Plan - initial_plan = await self.generator.generate(mode) - - # 2. 筛选 Plan - filtered_plan = await self.filter.filter(initial_plan) - - # 3. 执行 Plan - await self._execute_plan_with_tracking(filtered_plan) - - # 4. 返回结果 - return self._build_return_result(filtered_plan) - except Exception as e: - logger.error(f"标准规划流程出错: {e}") + logger.error(f"增强版规划流程出错: {e}") self.planner_stats["failed_plans"] += 1 return [], None - - async def _execute_plan_with_tracking(self, plan: Plan): - """执行Plan并追踪用户关系""" - if not plan.decided_actions: + + def _update_stats_from_execution_result(self, execution_result: Dict[str, any]): + """根据执行结果更新规划器统计""" + if not execution_result: return + + executed_count = execution_result.get("executed_count", 0) + successful_count = execution_result.get("successful_count", 0) + + # 更新成功执行计数 + self.planner_stats["successful_plans"] += successful_count + + # 统计回复动作和其他动作 + reply_count = 0 + other_count = 0 + + for result in execution_result.get("results", []): + action_type = result.get("action_type", "") + if action_type in ["reply", "proactive_reply"]: + reply_count += 1 + else: + other_count += 1 + + self.planner_stats["replies_generated"] += reply_count + self.planner_stats["other_actions_executed"] += other_count - for action_info in plan.decided_actions: - if action_info.action_type in ["reply", "proactive_reply"] and action_info.action_message: - # 记录用户交互 - self.relationship_tracker.add_interaction( - user_id=action_info.action_message.user_id, - user_name=action_info.action_message.user_nickname or action_info.action_message.user_id, - user_message=action_info.action_message.content, - bot_reply="Bot回复内容", # 这里需要实际的回复内容 - reply_timestamp=time.time() - ) + def _build_return_result(self, plan: Plan) -> Tuple[List[Dict], Optional[Dict]]: + """构建返回结果""" + final_actions = plan.decided_actions or [] + final_target_message = next( + (act.action_message for act in final_actions if act.action_message), None + ) - # 执行动作 - try: - await self.action_manager.execute_action( - action_name=action_info.action_type, - chat_id=self.chat_id, - target_message=action_info.action_message, - reasoning=action_info.reasoning, - action_data=action_info.action_data or {}, - ) + final_actions_dict = [asdict(act) for act in final_actions] - self.planner_stats["successful_plans"] += 1 - if action_info.action_type in ["reply", "proactive_reply"]: - self.planner_stats["replies_generated"] += 1 - else: - self.planner_stats["other_actions_executed"] += 1 + if final_target_message: + if hasattr(final_target_message, '__dataclass_fields__'): + final_target_message_dict = asdict(final_target_message) + else: + final_target_message_dict = final_target_message + else: + final_target_message_dict = None - except Exception as e: - logger.error(f"执行动作失败: {action_info.action_type}, 错误: {e}") + return final_actions_dict, final_target_message_dict def _build_return_result(self, plan: Plan) -> Tuple[List[Dict], Optional[Dict]]: """构建返回结果""" diff --git a/src/chat/planner_actions/planner_prompts.py b/src/chat/planner_actions/planner_prompts.py index d527655c8..36ecce527 100644 --- a/src/chat/planner_actions/planner_prompts.py +++ b/src/chat/planner_actions/planner_prompts.py @@ -60,18 +60,6 @@ def init_prompts(): {no_action_block} -动作:reply -动作描述:参与聊天回复,发送文本进行表达 -- 你想要闲聊或者随便附和 -- {mentioned_bonus} -- 如果你刚刚进行了回复,不要对同一个话题重复回应 -- 不要回复自己发送的消息 -{{ - "action": "reply", - "target_message_id": "触发action的消息id", - "reason": "在这里详细记录你的内心思考过程。例如:‘用户看起来很开心,我想回复一些积极的内容,分享这份喜悦。’" -}} - {action_options_text}