diff --git a/src/chat/chat_loop/proactive/events.py b/src/chat/chat_loop/proactive/events.py index c273afef1..89a3bc7bb 100644 --- a/src/chat/chat_loop/proactive/events.py +++ b/src/chat/chat_loop/proactive/events.py @@ -11,3 +11,4 @@ class ProactiveTriggerEvent: source: str # 触发源的标识,例如 "silence_monitor", "insomnia_manager" reason: str # 触发的具体原因,例如 "聊天已沉默10分钟", "深夜emo" metadata: Optional[Dict[str, Any]] = field(default_factory=dict) # 可选的元数据,用于传递额外信息 + related_message_id: Optional[str] = None # 关联的消息ID,用于加载上下文 diff --git a/src/chat/chat_loop/proactive/proactive_thinker.py b/src/chat/chat_loop/proactive/proactive_thinker.py index 9303607b3..7c736b51a 100644 --- a/src/chat/chat_loop/proactive/proactive_thinker.py +++ b/src/chat/chat_loop/proactive/proactive_thinker.py @@ -1,6 +1,7 @@ import time import traceback import orjson +import re from typing import TYPE_CHECKING, Dict, Any from src.common.logger import get_logger @@ -15,7 +16,8 @@ from src.plugin_system.base.component_types import ComponentType from src.config.config import global_config from src.chat.utils.chat_message_builder import get_raw_msg_before_timestamp_with_chat, build_readable_messages_with_id from src.mood.mood_manager import mood_manager -from src.common.database.sqlalchemy_database_api import store_action_info +from src.common.database.sqlalchemy_database_api import store_action_info, db_get +from src.common.database.sqlalchemy_models import Messages if TYPE_CHECKING: from ..cycle_processor import CycleProcessor @@ -118,85 +120,156 @@ class ProactiveThinker: trigger_event (ProactiveTriggerEvent): 触发事件。 """ try: - # 调用规划器的 PROACTIVE 模式,让其决定下一步的行动 - actions, _ = await self.cycle_processor.action_planner.plan(mode=ChatMode.PROACTIVE) + # 如果是提醒事件,跳过规划器,直接构建默认动作 + if trigger_event.source == "reminder_system": + # 1. 获取原始消息上下文 + action_message = {} + if trigger_event.related_message_id: + # 直接将从数据库获取的完整消息记录作为 action_message + action_message = await db_get( + Messages, {"message_id": trigger_event.related_message_id}, single_result=True + ) or {} - # 通常只关心规划出的第一个动作 - action_result = actions[0] if actions else {} + # 2. 智能确定@对象 + reason_text = trigger_event.reason.replace("定时提醒:", "").strip() + user_name_match = re.search(r"艾特一下(\S+)", reason_text) + + if user_name_match: + user_name = user_name_match.group(1) + at_message = reason_text.replace(f"艾特一下{user_name}", "").strip() + elif action_message.get("user_nickname"): + user_name = action_message.get("user_nickname") + at_message = reason_text + else: + user_name = "我" + at_message = reason_text - action_type = action_result.get("action_type") + # 3. 构建动作 + action_result = { + "action_type": "at_user", + "reasoning": "执行定时提醒", + "action_data": { + "user_name": user_name, + "at_message": at_message or "时间到啦!" + }, + "action_message": action_message + } + + # 4. 执行或回退 + try: + success, _, _ = await self.cycle_processor._handle_action( + action=action_result["action_type"], + reasoning=action_result["reasoning"], + action_data=action_result["action_data"], + cycle_timers={}, + thinking_id="", + action_message=action_result["action_message"] + ) + if not success: + raise Exception("at_user action failed") + except Exception: + logger.warning(f"{self.context.log_prefix} at_user动作执行失败,回退到proactive_reply") + fallback_action = { + "action_type": "proactive_reply", + "action_data": {"topic": trigger_event.reason}, + "action_message": action_message + } + await self._generate_proactive_content_and_send(fallback_action, trigger_event) - if action_type == "proactive_reply": - await self._generate_proactive_content_and_send(action_result) - elif action_type != "do_nothing": - logger.warning(f"{self.context.log_prefix} 主动思考返回了未知的动作类型: {action_type}") else: - # 如果规划结果是“什么都不做”,则记录日志 - logger.info(f"{self.context.log_prefix} 主动思考决策: 保持沉默") + # 对于其他来源的主动思考,正常调用规划器 + actions, _ = await self.cycle_processor.action_planner.plan(mode=ChatMode.PROACTIVE) + action_result = actions[0] if actions else {} + action_type = action_result.get("action_type") + if action_type == "proactive_reply": + await self._generate_proactive_content_and_send(action_result, trigger_event) + elif action_type not in ["do_nothing", "no_action"]: + await self.cycle_processor._handle_action( + action=action_result["action_type"], + reasoning=action_result.get("reasoning", ""), + action_data=action_result.get("action_data", {}), + cycle_timers={}, + thinking_id="", + action_message=action_result.get("action_message") + ) + else: + logger.info(f"{self.context.log_prefix} 主动思考决策: 保持沉默") + except Exception as e: logger.error(f"{self.context.log_prefix} 主动思考执行异常: {e}") logger.error(traceback.format_exc()) - async def _generate_proactive_content_and_send(self, action_result: Dict[str, Any]): + async def _get_reminder_context(self, message_id: str) -> str: + """获取提醒消息的上下文""" + try: + # 只获取那一条消息 + message_record = await db_get(Messages, {"message_id": message_id}, single_result=True) + if message_record: + # 使用 build_readable_messages_with_id 来格式化单条消息 + chat_context_block, _ = build_readable_messages_with_id(messages=[message_record]) + return chat_context_block + return "无法加载相关的聊天记录。" + except Exception as e: + logger.error(f"{self.context.log_prefix} 获取提醒上下文失败: {e}") + return "无法加载相关的聊天记录。" + + async def _generate_proactive_content_and_send(self, action_result: Dict[str, Any], trigger_event: ProactiveTriggerEvent): """ 获取实时信息,构建最终的生成提示词,并生成和发送主动回复。 Args: action_result (Dict[str, Any]): 规划器返回的动作结果。 + trigger_event (ProactiveTriggerEvent): 触发事件。 """ try: topic = action_result.get("action_data", {}).get("topic", "随便聊聊") logger.info(f"{self.context.log_prefix} 主动思考确定主题: '{topic}'") - # 1. 获取日程信息 schedule_block = "你今天没有日程安排。" if global_config.planning_system.schedule_enable: if current_activity := schedule_manager.get_current_activity(): schedule_block = f"你当前正在:{current_activity}。" - # 2. 网络搜索 news_block = "暂时没有获取到最新资讯。" - try: - web_search_tool = tool_api.get_tool_instance("web_search") - if web_search_tool: - # 检查工具的execute方法签名,使用正确的参数名 - try: - search_result_dict = await web_search_tool.execute(search_query=topic, max_results=10) - except TypeError: - # 如果search_query不工作,尝试其他可能的参数名 + if trigger_event.source != "reminder_system": + try: + web_search_tool = tool_api.get_tool_instance("web_search") + if web_search_tool: try: - search_result_dict = await web_search_tool.execute(keyword=topic, max_results=10) + search_result_dict = await web_search_tool.execute(search_query=topic, max_results=10) except TypeError: - # 跳过网络搜索,避免影响主动思考 - logger.warning(f"{self.context.log_prefix} 网络搜索工具参数不匹配,跳过搜索") - news_block = "跳过网络搜索。" - search_result_dict = None - - if search_result_dict and not search_result_dict.get("error"): - news_block = search_result_dict.get("content", "未能提取有效资讯。") - elif search_result_dict: - logger.warning(f"{self.context.log_prefix} 网络搜索返回错误: {search_result_dict.get('error')}") - else: - logger.warning(f"{self.context.log_prefix} 未找到 web_search 工具实例。") - except Exception as e: - logger.error(f"{self.context.log_prefix} 主动思考时网络搜索失败: {e}") + try: + search_result_dict = await web_search_tool.execute(keyword=topic, max_results=10) + except TypeError: + logger.warning(f"{self.context.log_prefix} 网络搜索工具参数不匹配,跳过搜索") + news_block = "跳过网络搜索。" + search_result_dict = None + + if search_result_dict and not search_result_dict.get("error"): + news_block = search_result_dict.get("content", "未能提取有效资讯。") + elif search_result_dict: + logger.warning(f"{self.context.log_prefix} 网络搜索返回错误: {search_result_dict.get('error')}") + else: + logger.warning(f"{self.context.log_prefix} 未找到 web_search 工具实例。") + except Exception as e: + logger.error(f"{self.context.log_prefix} 主动思考时网络搜索失败: {e}") - # 3. 获取最新的聊天上下文 - message_list = get_raw_msg_before_timestamp_with_chat( - chat_id=self.context.stream_id, - timestamp=time.time(), - limit=int(global_config.chat.max_context_size * 0.3), - ) - chat_context_block, _ = build_readable_messages_with_id(messages=message_list) + if trigger_event.source == "reminder_system" and trigger_event.related_message_id: + chat_context_block = await self._get_reminder_context(trigger_event.related_message_id) + else: + message_list = get_raw_msg_before_timestamp_with_chat( + chat_id=self.context.stream_id, + timestamp=time.time(), + limit=int(global_config.chat.max_context_size * 0.3), + ) + chat_context_block, _ = build_readable_messages_with_id(messages=message_list) - # 4. 使用决策模型进行二次确认(节省珍贵的回复模型调用) from src.llm_models.utils_model import LLMRequest from src.config.config import model_config bot_name = global_config.bot.nickname - # 构建二次确认提示词 confirmation_prompt = f"""# 主动回复二次确认 ## 基本信息 @@ -219,7 +292,6 @@ class ProactiveThinker: 请严格按照上述格式输出,不要添加任何解释。""" - # 使用决策模型进行二次确认 planner_llm = LLMRequest( model_set=model_config.model_task_config.planner, request_type="planner" @@ -227,12 +299,10 @@ class ProactiveThinker: confirmation_result, _ = await planner_llm.generate_response_async(prompt=confirmation_prompt) - # 检查二次确认结果 if not confirmation_result or "SKIP_PROACTIVE_REPLY" in confirmation_result: logger.info(f"{self.context.log_prefix} 决策模型二次确认决定跳过主动回复") return - # 5. 只有通过二次确认才调用珍贵的回复模型 bot_name = global_config.bot.nickname personality = global_config.personality identity_block = ( diff --git a/src/chat/heart_flow/heartflow_message_processor.py b/src/chat/heart_flow/heartflow_message_processor.py index df3b92aeb..a0d18c44c 100644 --- a/src/chat/heart_flow/heartflow_message_processor.py +++ b/src/chat/heart_flow/heartflow_message_processor.py @@ -199,7 +199,8 @@ class HeartFCMessageReceiver: "chat_id": chat.stream_id, "content": reminder_event.content, "confidence": reminder_event.confidence, - "created_at": datetime.now().isoformat() + "created_at": datetime.now().isoformat(), + "original_message_id": message.message_info.message_id } success = await event_scheduler.schedule_event( diff --git a/src/chat/planner_actions/planner.py b/src/chat/planner_actions/planner.py index 49871b78a..2a96f77b4 100644 --- a/src/chat/planner_actions/planner.py +++ b/src/chat/planner_actions/planner.py @@ -484,6 +484,7 @@ class ActionPlanner: mode: ChatMode = ChatMode.FOCUS, loop_start_time: float = 0.0, available_actions: Optional[Dict[str, ActionInfo]] = None, + pseudo_message: Optional[str] = None, ) -> Tuple[List[Dict[str, Any]], Optional[Dict[str, Any]]]: """ [注释] "大脑"规划器。 @@ -513,6 +514,8 @@ class ActionPlanner: truncate=False, show_actions=False, ) + if pseudo_message: + chat_content_block_short += f"\n[m99] 刚刚, 用户: {pseudo_message}" self.last_obs_time_mark = time.time() is_group_chat, chat_target_info, current_available_actions = self.get_necessary_info() @@ -522,8 +525,8 @@ class ActionPlanner: # --- 2. 启动小脑并行思考 --- all_sub_planner_results: List[Dict[str, Any]] = [] - # PROACTIVE模式下禁用小脑,避免与大脑的主动思考决策冲突 - if mode != ChatMode.PROACTIVE: + # PROACTIVE模式下,只有在有伪消息(来自提醒)时才激活小脑 + if mode != ChatMode.PROACTIVE or pseudo_message: try: sub_planner_actions: Dict[str, ActionInfo] = {} for action_name, action_info in available_actions.items(): @@ -607,40 +610,45 @@ class ActionPlanner: action, reasoning = "no_reply", f"大脑处理错误: {e}" # --- 4. 整合大脑和小脑的决策 --- - # 如果是私聊且开启了强制回复,则将no_reply强制改为reply - if not is_group_chat and global_config.chat.force_reply_private and action == "no_reply": - action = "reply" - reasoning = "私聊强制回复" - logger.info(f"{self.log_prefix}私聊强制回复已触发,将动作从 'no_reply' 修改为 'reply'") - - is_parallel = True - for info in all_sub_planner_results: - action_type = info.get("action_type") - if action_type and action_type not in ["no_action", "no_reply"]: - action_info = available_actions.get(action_type) - if action_info and not action_info.parallel_action: - is_parallel = False - break - - action_data["loop_start_time"] = loop_start_time - final_actions: List[Dict[str, Any]] = [] - - if is_parallel: - logger.info(f"{self.log_prefix}决策模式: 大脑与小脑并行") - if action not in ["no_action", "no_reply"]: - final_actions.append( - { - "action_type": action, - "reasoning": reasoning, - "action_data": action_data, - "action_message": target_message, - "available_actions": available_actions, - } - ) - final_actions.extend(all_sub_planner_results) + # 特殊规则:如果是提醒任务,且大脑决定do_nothing,则忽略大脑,采用小脑的决策 + if mode == ChatMode.PROACTIVE and pseudo_message and action == "do_nothing": + logger.info(f"{self.log_prefix}提醒任务触发,大脑决策为do_nothing,忽略大脑并采用小脑决策") + final_actions = all_sub_planner_results else: - logger.info(f"{self.log_prefix}决策模式: 小脑优先 (检测到非并行action)") - final_actions.extend(all_sub_planner_results) + # 如果是私聊且开启了强制回复,则将no_reply强制改为reply + if not is_group_chat and global_config.chat.force_reply_private and action == "no_reply": + action = "reply" + reasoning = "私聊强制回复" + logger.info(f"{self.log_prefix}私聊强制回复已触发,将动作从 'no_reply' 修改为 'reply'") + + is_parallel = True + for info in all_sub_planner_results: + action_type = info.get("action_type") + if action_type and action_type not in ["no_action", "no_reply"]: + action_info = available_actions.get(action_type) + if action_info and not action_info.parallel_action: + is_parallel = False + break + + action_data["loop_start_time"] = loop_start_time + final_actions: List[Dict[str, Any]] = [] + + if is_parallel: + logger.info(f"{self.log_prefix}决策模式: 大脑与小脑并行") + if action not in ["no_action", "no_reply"]: + final_actions.append( + { + "action_type": action, + "reasoning": reasoning, + "action_data": action_data, + "action_message": target_message, + "available_actions": available_actions, + } + ) + final_actions.extend(all_sub_planner_results) + else: + logger.info(f"{self.log_prefix}决策模式: 小脑优先 (检测到非并行action)") + final_actions.extend(all_sub_planner_results) final_actions = self._filter_no_actions(final_actions)