diff --git a/src/chat/message_manager/distribution_manager.py b/src/chat/message_manager/distribution_manager.py index ec2de4c83..9a4800148 100644 --- a/src/chat/message_manager/distribution_manager.py +++ b/src/chat/message_manager/distribution_manager.py @@ -111,10 +111,23 @@ class StreamLoopManager: logger.warning(f"无法获取流上下文: {stream_id}") return False - # 快速路径:如果流已存在,无需处理 - if context.stream_loop_task and not context.stream_loop_task.done(): + # 快速路径:如果流已存在且不是强制启动,无需处理 + if not force and context.stream_loop_task and not context.stream_loop_task.done(): logger.debug(f"流 {stream_id} 循环已在运行") return True + + # 如果是强制启动且任务仍在运行,先取消旧任务 + if force and context.stream_loop_task and not context.stream_loop_task.done(): + logger.info(f"强制启动模式:先取消现有流循环任务: {stream_id}") + old_task = context.stream_loop_task + old_task.cancel() + try: + await asyncio.wait_for(old_task, timeout=2.0) + logger.info(f"旧流循环任务已结束: {stream_id}") + except (asyncio.TimeoutError, asyncio.CancelledError): + logger.debug(f"旧流循环任务已取消或超时: {stream_id}") + except Exception as e: + logger.warning(f"等待旧任务结束时出错: {e}") # 创建流循环任务 try: diff --git a/src/chat/message_manager/global_notice_manager.py b/src/chat/message_manager/global_notice_manager.py index cfcc125ce..ce1600b13 100644 --- a/src/chat/message_manager/global_notice_manager.py +++ b/src/chat/message_manager/global_notice_manager.py @@ -205,9 +205,9 @@ class GlobalNoticeManager: # 格式化notice消息 if notice_type: - notice_line = f"[{notice_type}] {message.processed_plain_text or message.raw_message}" + notice_line = f"[{notice_type}] {message.processed_plain_text}" else: - notice_line = f"[通知] {message.processed_plain_text or message.raw_message}" + notice_line = f"[通知] {message.processed_plain_text}" # 添加时间信息(相对时间) time_diff = int(time.time() - notice.timestamp) diff --git a/src/chat/message_manager/message_manager.py b/src/chat/message_manager/message_manager.py index b3eb66ffc..d7b8d4417 100644 --- a/src/chat/message_manager/message_manager.py +++ b/src/chat/message_manager/message_manager.py @@ -275,8 +275,20 @@ class MessageManager: inactive_streams.append(stream_id) for stream_id in inactive_streams: try: + # 在使用之前重新从 chat_manager 中获取 chat_stream,避免引用未定义或过期的变量 + chat_stream = chat_manager.streams.get(stream_id) + if not chat_stream: + logger.debug(f"聊天流 {stream_id} 在清理时已不存在,跳过") + continue + await chat_stream.context_manager.clear_context() - del chat_manager.streams[stream_id] + + # 安全删除流(若已被其他地方删除则捕获) + try: + del chat_manager.streams[stream_id] + except KeyError: + logger.debug(f"删除聊天流 {stream_id} 时未找到,可能已被移除") + logger.info(f"清理不活跃聊天流: {stream_id}") except Exception as e: logger.error(f"清理聊天流 {stream_id} 失败: {e}") @@ -342,7 +354,16 @@ class MessageManager: # 取消 stream_loop_task,子任务会通过 try-catch 自动取消 try: stream_loop_task.cancel() - logger.info(f"已取消流循环任务: {chat_stream.stream_id}") + logger.info(f"已发送取消信号到流循环任务: {chat_stream.stream_id}") + + # 等待任务真正结束(设置超时避免死锁) + try: + await asyncio.wait_for(stream_loop_task, timeout=2.0) + logger.info(f"流循环任务已完全结束: {chat_stream.stream_id}") + except asyncio.TimeoutError: + logger.warning(f"等待流循环任务结束超时: {chat_stream.stream_id}") + except asyncio.CancelledError: + logger.info(f"流循环任务已被取消: {chat_stream.stream_id}") except Exception as e: logger.warning(f"取消流循环任务失败: {chat_stream.stream_id} - {e}") diff --git a/src/plugins/built_in/affinity_flow_chatter/plan_executor.py b/src/plugins/built_in/affinity_flow_chatter/plan_executor.py index e150e7e62..fb048eada 100644 --- a/src/plugins/built_in/affinity_flow_chatter/plan_executor.py +++ b/src/plugins/built_in/affinity_flow_chatter/plan_executor.py @@ -124,11 +124,7 @@ class ChatterPlanExecutor: target_message = action_info.action_message message_id = None if target_message: - # 兼容 Pydantic 对象和字典两种情况 - if hasattr(target_message, "message_id"): - message_id = getattr(target_message, "message_id", None) - elif isinstance(target_message, dict): - message_id = target_message.get("message_id") + message_id = target_message.message_id if message_id: if message_id not in replied_message_ids: @@ -175,20 +171,10 @@ class ChatterPlanExecutor: try: logger.info(f"执行回复动作: {action_info.action_type} (原因: {action_info.reasoning})") - # 获取用户ID - 兼容对象和字典 - if hasattr(action_info.action_message, "user_info"): - # DatabaseMessages对象情况 - user_id = action_info.action_message.user_info.user_id - else: - # 字典情况(向后兼容)- 适配扁平化消息字典结构 - # 首先尝试从扁平化结构直接获取用户信息 - user_id = action_info.action_message.get("user_id") + # 获取用户ID + user_id = action_info.action_message.user_info.user_id if action_info.action_message else None - # 如果扁平化结构中没有用户信息,再尝试从嵌套的user_info获取 - if not user_id: - user_id = action_info.action_message.get("user_info", {}).get("user_id") - - if user_id == str(global_config.bot.qq_account): + if user_id and user_id == str(global_config.bot.qq_account): logger.warning("尝试回复自己,跳过此动作以防止死循环。") return { "action_type": action_info.action_type, @@ -215,13 +201,8 @@ class ChatterPlanExecutor: ) # 从返回结果中提取真正的回复文本 - if isinstance(execution_result, dict): - reply_content = execution_result.get("reply_text", "") - success = execution_result.get("success", False) - else: - # 兼容旧的返回值(虽然可能性不大) - reply_content = str(execution_result) if execution_result else "" - success = bool(reply_content) + reply_content = execution_result.get("reply_text", "") + success = execution_result.get("success", False) if success: logger.info(f"回复动作 '{action_info.action_type}' 执行成功。") @@ -294,22 +275,22 @@ class ChatterPlanExecutor: if action_info.action_type == "poke_user": target_message = action_info.action_message if target_message: - # 优先直接获取 user_id,这才是最可靠的信息 - user_id = target_message.get("user_id") + user_id = target_message.user_info.user_id + user_name = target_message.user_info.user_nickname + message_id = target_message.message_id + if user_id: action_data["user_id"] = user_id logger.info(f"检测到戳一戳动作,目标用户ID: {user_id}") + elif user_name: + action_data["user_name"] = user_name + logger.info(f"检测到戳一戳动作,目标用户: {user_name}") else: - # 如果没有 user_id,再尝试用 user_nickname 作为备用方案 - user_name = target_message.get("user_nickname") - if user_name: - action_data["user_name"] = user_name - logger.info(f"检测到戳一戳动作,目标用户: {user_name}") - else: - logger.warning("无法从戳一戳消息中获取用户ID或昵称。") + logger.warning("无法从戳一戳消息中获取用户ID或昵称。") # 传递原始消息ID以支持引用 - action_data["target_message_id"] = target_message.get("message_id") + if message_id: + action_data["target_message_id"] = message_id # 构建动作参数 action_params = { diff --git a/src/plugins/built_in/affinity_flow_chatter/plan_filter.py b/src/plugins/built_in/affinity_flow_chatter/plan_filter.py index 3013afaa4..ecc0b9f0a 100644 --- a/src/plugins/built_in/affinity_flow_chatter/plan_filter.py +++ b/src/plugins/built_in/affinity_flow_chatter/plan_filter.py @@ -544,7 +544,6 @@ class ChatterPlanFilter: target_message_dict = self._get_latest_message(message_id_list) if target_message_dict: - # 直接使用字典作为action_message,避免DatabaseMessages对象创建失败 target_message_obj = target_message_dict # 替换action_data中的临时ID为真实ID if "target_message_id" in action_data: @@ -559,10 +558,25 @@ class ChatterPlanFilter: action = "no_action" reasoning = f"无法找到目标消息进行回复。原始理由: {reasoning}" + # 转换为 DatabaseMessages 对象 + from src.common.data_models.database_data_model import DatabaseMessages + + action_message_obj = None if target_message_obj: - # 确保 action_message 中始终有 message_id 字段 + # 确保字典中有 message_id 字段 if "message_id" not in target_message_obj and "id" in target_message_obj: target_message_obj["message_id"] = target_message_obj["id"] + + try: + # 使用 ** 解包字典传入构造函数 + action_message_obj = DatabaseMessages(**target_message_obj) + logger.debug(f"[{action}] 成功转换目标消息为 DatabaseMessages 对象: {action_message_obj.message_id}") + except Exception as e: + logger.warning(f"[{action}] 无法将目标消息转换为 DatabaseMessages 对象: {e}", exc_info=True) + # 如果转换失败,对于必需目标消息的动作降级为 no_action + if action == "reply": + action = "no_action" + reasoning = f"目标消息转换失败: {e}。原始理由: {reasoning}" else: # 如果找不到目标消息,对于reply动作来说这是必需的,应该记录警告 if action == "reply": @@ -579,22 +593,13 @@ class ChatterPlanFilter: ): reasoning = f"LLM 返回了当前不可用的动作 '{action}'。原始理由: {reasoning}" action = "no_action" - #TODO:把逻辑迁移到DatabaseMessages(如果没人做下个星期我自己来) - #from src.common.data_models.database_data_model import DatabaseMessages - - #action_message_obj = None - #if target_message_obj: - #try: - #action_message_obj = DatabaseMessages(**target_message_obj) - #except Exception: - #logger.warning("无法将目标消息转换为DatabaseMessages对象") parsed_actions.append( ActionPlannerInfo( action_type=action, reasoning=reasoning, action_data=action_data, - action_message=target_message_obj, + action_message=action_message_obj, # 使用转换后的 DatabaseMessages 对象 available_actions=plan.available_actions, ) )