diff --git a/src/chat/message_manager/distribution_manager.py b/src/chat/message_manager/distribution_manager.py index c3496b79b..3f34d9ef1 100644 --- a/src/chat/message_manager/distribution_manager.py +++ b/src/chat/message_manager/distribution_manager.py @@ -111,10 +111,23 @@ class StreamLoopManager: logger.warning(f"无法获取流上下文: {stream_id}") return False - # 快速路径:如果流已存在,无需处理 - if context.stream_loop_task and not context.stream_loop_task.done(): + # 快速路径:如果流已存在且不是强制启动,无需处理 + if not force and context.stream_loop_task and not context.stream_loop_task.done(): logger.debug(f"流 {stream_id} 循环已在运行") return True + + # 如果是强制启动且任务仍在运行,先取消旧任务 + if force and context.stream_loop_task and not context.stream_loop_task.done(): + logger.info(f"强制启动模式:先取消现有流循环任务: {stream_id}") + old_task = context.stream_loop_task + old_task.cancel() + try: + await asyncio.wait_for(old_task, timeout=2.0) + logger.info(f"旧流循环任务已结束: {stream_id}") + except (asyncio.TimeoutError, asyncio.CancelledError): + logger.debug(f"旧流循环任务已取消或超时: {stream_id}") + except Exception as e: + logger.warning(f"等待旧任务结束时出错: {e}") # 创建流循环任务 try: diff --git a/src/chat/message_manager/global_notice_manager.py b/src/chat/message_manager/global_notice_manager.py index cfcc125ce..ce1600b13 100644 --- a/src/chat/message_manager/global_notice_manager.py +++ b/src/chat/message_manager/global_notice_manager.py @@ -205,9 +205,9 @@ class GlobalNoticeManager: # 格式化notice消息 if notice_type: - notice_line = f"[{notice_type}] {message.processed_plain_text or message.raw_message}" + notice_line = f"[{notice_type}] {message.processed_plain_text}" else: - notice_line = f"[通知] {message.processed_plain_text or message.raw_message}" + notice_line = f"[通知] {message.processed_plain_text}" # 添加时间信息(相对时间) time_diff = int(time.time() - notice.timestamp) diff --git a/src/chat/message_manager/message_manager.py b/src/chat/message_manager/message_manager.py index a06e07be0..54c74007f 100644 --- a/src/chat/message_manager/message_manager.py +++ b/src/chat/message_manager/message_manager.py @@ -275,8 +275,20 @@ class MessageManager: inactive_streams.append(stream_id) for stream_id in inactive_streams: try: + # 在使用之前重新从 chat_manager 中获取 chat_stream,避免引用未定义或过期的变量 + chat_stream = chat_manager.streams.get(stream_id) + if not chat_stream: + logger.debug(f"聊天流 {stream_id} 在清理时已不存在,跳过") + continue + await chat_stream.context_manager.clear_context() - del chat_manager.streams[stream_id] + + # 安全删除流(若已被其他地方删除则捕获) + try: + del chat_manager.streams[stream_id] + except KeyError: + logger.debug(f"删除聊天流 {stream_id} 时未找到,可能已被移除") + logger.info(f"清理不活跃聊天流: {stream_id}") except Exception as e: logger.error(f"清理聊天流 {stream_id} 失败: {e}") @@ -342,7 +354,16 @@ class MessageManager: # 取消 stream_loop_task,子任务会通过 try-catch 自动取消 try: stream_loop_task.cancel() - logger.info(f"已取消流循环任务: {chat_stream.stream_id}") + logger.info(f"已发送取消信号到流循环任务: {chat_stream.stream_id}") + + # 等待任务真正结束(设置超时避免死锁) + try: + await asyncio.wait_for(stream_loop_task, timeout=2.0) + logger.info(f"流循环任务已完全结束: {chat_stream.stream_id}") + except asyncio.TimeoutError: + logger.warning(f"等待流循环任务结束超时: {chat_stream.stream_id}") + except asyncio.CancelledError: + logger.info(f"流循环任务已被取消: {chat_stream.stream_id}") except Exception as e: logger.warning(f"取消流循环任务失败: {chat_stream.stream_id} - {e}") diff --git a/src/chat/replyer/default_generator.py b/src/chat/replyer/default_generator.py index ea5e6dffd..b7fdcf6a2 100644 --- a/src/chat/replyer/default_generator.py +++ b/src/chat/replyer/default_generator.py @@ -257,6 +257,8 @@ class DefaultReplyer: if not master_config or not master_config.enable: return "" + if not self.chat_stream.user_info: + return "" platform, user_id = self.chat_stream.platform, self.chat_stream.user_info.user_id try: if user_id: @@ -312,7 +314,7 @@ class DefaultReplyer: extra_info=extra_info, available_actions=available_actions, enable_tool=enable_tool, - reply_message=reply_message, + reply_message=DatabaseMessages(**reply_message) if isinstance(reply_message, dict) else reply_message, ) if not prompt: @@ -976,7 +978,6 @@ class DefaultReplyer: if unread_messages: unread_lines = [] for msg in unread_messages: - msg_id = msg.message_id msg_time = time.strftime("%H:%M:%S", time.localtime(msg.time)) msg_content = msg.processed_plain_text @@ -1077,7 +1078,7 @@ class DefaultReplyer: if unread_messages: unread_lines = [] for msg in unread_messages: - msg_id = msg.get("message_id", "") + msg.get("message_id", "") msg_time = time.strftime("%H:%M:%S", time.localtime(msg.get("time", time.time()))) msg_content = msg.get("processed_plain_text", "") @@ -1150,7 +1151,7 @@ class DefaultReplyer: extra_info: str = "", available_actions: dict[str, ActionInfo] | None = None, enable_tool: bool = True, - reply_message: dict[str, Any] | DatabaseMessages | None = None, + reply_message: DatabaseMessages | None = None, ) -> str: """ 构建回复器上下文 @@ -1612,17 +1613,11 @@ class DefaultReplyer: target = "(无消息内容)" # 添加情绪状态获取 + mood_prompt = "" if global_config.mood.enable_mood: chat_mood = mood_manager.get_mood_by_chat_id(chat_id) mood_prompt = chat_mood.mood_state - # 检查是否有愤怒状态的补充提示词 - angry_prompt_addition = mood_manager.get_angry_prompt_addition(chat_id) - if angry_prompt_addition: - mood_prompt = f"{mood_prompt}。{angry_prompt_addition}" - else: - mood_prompt = "" - # 从内存获取历史消息,避免重复查询数据库 from src.plugin_system.apis.chat_api import get_chat_manager @@ -1769,11 +1764,16 @@ class DefaultReplyer: platform=self.chat_stream.platform, ) - # 从 DatabaseMessages 获取 sender_info - if anchor_message: - sender_info = anchor_message.user_info - else: - sender_info = None + # 从 DatabaseMessages 获取 sender_info 并转换为 UserInfo + sender_info = None + if anchor_message and anchor_message.user_info: + db_user_info = anchor_message.user_info + sender_info = UserInfo( + platform=db_user_info.platform, + user_id=db_user_info.user_id, + user_nickname=db_user_info.user_nickname, + user_cardname=db_user_info.user_cardname, + ) return MessageSending( message_id=message_id, # 使用片段的唯一ID diff --git a/src/plugins/built_in/affinity_flow_chatter/plan_executor.py b/src/plugins/built_in/affinity_flow_chatter/plan_executor.py index 494bcc443..59bb098e6 100644 --- a/src/plugins/built_in/affinity_flow_chatter/plan_executor.py +++ b/src/plugins/built_in/affinity_flow_chatter/plan_executor.py @@ -124,11 +124,7 @@ class ChatterPlanExecutor: target_message = action_info.action_message message_id = None if target_message: - # 兼容 Pydantic 对象和字典两种情况 - if hasattr(target_message, "message_id"): - message_id = getattr(target_message, "message_id", None) - elif isinstance(target_message, dict): - message_id = target_message.get("message_id") + message_id = target_message.message_id if message_id: if message_id not in replied_message_ids: @@ -175,22 +171,10 @@ class ChatterPlanExecutor: try: logger.info(f"执行回复动作: {action_info.action_type} (原因: {action_info.reasoning})") - # 获取用户ID - 兼容对象和字典 - if action_info.action_message: - # DatabaseMessages对象情况 - user_id = action_info.action_message.user_info.user_id - if not user_id: - logger.error("在action_message里面找不到userid,无法执行回复") - return { - "action_type": action_info.action_type, - "success": False, - "error_message": "在action_message里面找不到userid", - "execution_time": 0, - "reasoning": action_info.reasoning, - "reply_content": "", - } + # 获取用户ID + user_id = action_info.action_message.user_info.user_id if action_info.action_message else None - if user_id == str(global_config.bot.qq_account): + if user_id and user_id == str(global_config.bot.qq_account): logger.warning("尝试回复自己,跳过此动作以防止死循环。") return { "action_type": action_info.action_type, @@ -217,13 +201,8 @@ class ChatterPlanExecutor: ) # 从返回结果中提取真正的回复文本 - if isinstance(execution_result, dict): - reply_content = execution_result.get("reply_text", "") - success = execution_result.get("success", False) - else: - # 兼容旧的返回值(虽然可能性不大) - reply_content = str(execution_result) if execution_result else "" - success = bool(reply_content) + reply_content = execution_result.get("reply_text", "") + success = execution_result.get("success", False) if success: logger.info(f"回复动作 '{action_info.action_type}' 执行成功。") @@ -291,6 +270,28 @@ class ChatterPlanExecutor: logger.info(f"执行其他动作: {action_info.action_type} (原因: {action_info.reasoning})") action_data = action_info.action_data or {} + + # 针对 poke_user 动作,特殊处理 + if action_info.action_type == "poke_user": + target_message = action_info.action_message + if target_message: + user_id = target_message.user_info.user_id + user_name = target_message.user_info.user_nickname + message_id = target_message.message_id + + if user_id: + action_data["user_id"] = user_id + logger.info(f"检测到戳一戳动作,目标用户ID: {user_id}") + elif user_name: + action_data["user_name"] = user_name + logger.info(f"检测到戳一戳动作,目标用户: {user_name}") + else: + logger.warning("无法从戳一戳消息中获取用户ID或昵称。") + + # 传递原始消息ID以支持引用 + if message_id: + action_data["target_message_id"] = message_id + # 构建动作参数 action_params = { "chat_id": plan.chat_id, diff --git a/src/plugins/built_in/affinity_flow_chatter/plan_filter.py b/src/plugins/built_in/affinity_flow_chatter/plan_filter.py index c8b97d180..4c7c690d6 100644 --- a/src/plugins/built_in/affinity_flow_chatter/plan_filter.py +++ b/src/plugins/built_in/affinity_flow_chatter/plan_filter.py @@ -546,7 +546,6 @@ class ChatterPlanFilter: target_message_dict = self._get_latest_message(message_id_list) if target_message_dict: - # 直接使用字典作为action_message,避免DatabaseMessages对象创建失败 target_message_obj = target_message_dict # 替换action_data中的临时ID为真实ID if "target_message_id" in action_data: @@ -561,10 +560,25 @@ class ChatterPlanFilter: action = "no_action" reasoning = f"无法找到目标消息进行回复。原始理由: {reasoning}" + # 转换为 DatabaseMessages 对象 + from src.common.data_models.database_data_model import DatabaseMessages + + action_message_obj = None if target_message_obj: - # 确保 action_message 中始终有 message_id 字段 + # 确保字典中有 message_id 字段 if "message_id" not in target_message_obj and "id" in target_message_obj: target_message_obj["message_id"] = target_message_obj["id"] + + try: + # 使用 ** 解包字典传入构造函数 + action_message_obj = DatabaseMessages(**target_message_obj) + logger.debug(f"[{action}] 成功转换目标消息为 DatabaseMessages 对象: {action_message_obj.message_id}") + except Exception as e: + logger.warning(f"[{action}] 无法将目标消息转换为 DatabaseMessages 对象: {e}", exc_info=True) + # 如果转换失败,对于必需目标消息的动作降级为 no_action + if action == "reply": + action = "no_action" + reasoning = f"目标消息转换失败: {e}。原始理由: {reasoning}" else: # 如果找不到目标消息,对于reply动作来说这是必需的,应该记录警告 if action == "reply": @@ -581,21 +595,13 @@ class ChatterPlanFilter: ): reasoning = f"LLM 返回了当前不可用的动作 '{action}'。原始理由: {reasoning}" action = "no_action" - from src.common.data_models.database_data_model import DatabaseMessages - - action_message_obj = None - if target_message_obj: - try: - action_message_obj = DatabaseMessages(**target_message_obj) - except Exception: - logger.warning("无法将目标消息转换为DatabaseMessages对象") parsed_actions.append( ActionPlannerInfo( action_type=action, reasoning=reasoning, action_data=action_data, - action_message=action_message_obj, + action_message=action_message_obj, # 使用转换后的 DatabaseMessages 对象 available_actions=plan.available_actions, ) )