diff --git a/src/chat/message_receive/chat_stream.py b/src/chat/message_receive/chat_stream.py index 55d296db9..a82acc413 100644 --- a/src/chat/message_receive/chat_stream.py +++ b/src/chat/message_receive/chat_stream.py @@ -47,6 +47,16 @@ class ChatMessageContext: return False return True + def get_priority_mode(self) -> str: + """获取优先级模式""" + return self.message.priority_mode + + def get_priority_info(self) -> Optional[dict]: + """获取优先级信息""" + if hasattr(self.message, "priority_info") and self.message.priority_info: + return self.message.priority_info + return None + class ChatStream: """聊天流对象,存储一个完整的聊天上下文""" diff --git a/src/chat/message_receive/message.py b/src/chat/message_receive/message.py index 5798eb512..1c8f7789e 100644 --- a/src/chat/message_receive/message.py +++ b/src/chat/message_receive/message.py @@ -108,6 +108,9 @@ class MessageRecv(Message): self.detailed_plain_text = message_dict.get("detailed_plain_text", "") self.is_emoji = False self.is_picid = False + self.is_mentioned = 0.0 + self.priority_mode = "interest" + self.priority_info = None def update_chat_stream(self, chat_stream: "ChatStream"): self.chat_stream = chat_stream @@ -146,8 +149,27 @@ class MessageRecv(Message): if isinstance(segment.data, str): return await get_image_manager().get_emoji_description(segment.data) return "[发了一个表情包,网卡了加载不出来]" + elif segment.type == "mention_bot": + self.is_mentioned = float(segment.data) + return "" + elif segment.type == "set_priority_mode": + # 处理设置优先级模式的消息段 + if isinstance(segment.data, str): + self.priority_mode = segment.data + return "" + elif segment.type == "priority_info": + if isinstance(segment.data, dict): + # 处理优先级信息 + self.priority_info = segment.data + """ + { + 'message_type': 'vip', # vip or normal + 'message_priority': 1.0, # 优先级,大为优先,float + } + """ + return "" else: - return f"[{segment.type}:{str(segment.data)}]" + return "" except Exception as e: logger.error(f"处理消息段失败: {str(e)}, 类型: {segment.type}, 数据: {segment.data}") return f"[处理失败的{segment.type}消息]" diff --git a/src/chat/normal_chat/normal_chat.py b/src/chat/normal_chat/normal_chat.py index b11669654..9c3144cc4 100644 --- a/src/chat/normal_chat/normal_chat.py +++ b/src/chat/normal_chat/normal_chat.py @@ -6,7 +6,7 @@ import os import pickle from maim_message import UserInfo, Seg from src.common.logger import get_logger -from src.chat.message_receive.chat_stream import ChatStream +from src.chat.message_receive.chat_stream import ChatStream, get_chat_manager from src.chat.utils.timer_calculator import Timer from src.chat.utils.prompt_builder import global_prompt_manager @@ -27,6 +27,15 @@ from src.chat.utils.chat_message_builder import ( from .priority_manager import PriorityManager import traceback +from .normal_chat_generator import NormalChatGenerator +from src.chat.normal_chat.normal_chat_expressor import NormalChatExpressor +from src.chat.replyer.default_generator import DefaultReplyer +from src.chat.normal_chat.normal_chat_planner import NormalChatPlanner +from src.chat.normal_chat.normal_chat_action_modifier import NormalChatActionModifier + +from src.chat.heart_flow.utils_chat import get_chat_type_and_target_info +from src.manager.mood_manager import mood_manager + willing_manager = get_willing_manager() logger = get_logger("normal_chat") @@ -46,7 +55,7 @@ class NormalChat: 每个聊天(私聊或群聊)都会有一个独立的NormalChat实例。 """ - def __init__(self, chat_stream: ChatStream): + def __init__(self, chat_stream: ChatStream, interest_dict: dict = None, on_switch_to_focus_callback=None): """ 初始化NormalChat实例。 @@ -55,10 +64,61 @@ class NormalChat: """ self.chat_stream = chat_stream self.stream_id = chat_stream.stream_id - self.stream_name = chat_stream.get_name() - self.willing_amplifier = 1.0 # 回复意愿放大器,动态调整 - self.enable_planner = global_config.normal_chat.get("enable_planner", False) # 是否启用planner - self.action_manager = ActionManager(chat_stream) # 初始化动作管理器 + + self.stream_name = get_chat_manager().get_stream_name(self.stream_id) or self.stream_id + + # 初始化Normal Chat专用表达器 + self.expressor = NormalChatExpressor(self.chat_stream) + self.replyer = DefaultReplyer(self.chat_stream) + + # Interest dict + self.interest_dict = interest_dict + + self.is_group_chat, self.chat_target_info = get_chat_type_and_target_info(self.stream_id) + + self.willing_amplifier = 1 + self.start_time = time.time() + + # Other sync initializations + self.gpt = NormalChatGenerator() + self.mood_manager = mood_manager + self.start_time = time.time() + + self._initialized = False # Track initialization status + + # Planner相关初始化 + self.action_manager = ActionManager() + self.planner = NormalChatPlanner(self.stream_name, self.action_manager) + self.action_modifier = NormalChatActionModifier(self.action_manager, self.stream_id, self.stream_name) + self.enable_planner = global_config.normal_chat.enable_planner # 从配置中读取是否启用planner + + # 记录最近的回复内容,每项包含: {time, user_message, response, is_mentioned, is_reference_reply} + self.recent_replies = [] + self.max_replies_history = 20 # 最多保存最近20条回复记录 + + # 新的消息段缓存结构: + # {person_id: [{"start_time": float, "end_time": float, "last_msg_time": float, "message_count": int}, ...]} + self.person_engaged_cache: Dict[str, List[Dict[str, any]]] = {} + + # 持久化存储文件路径 + self.cache_file_path = os.path.join("data", "relationship", f"relationship_cache_{self.stream_id}.pkl") + + # 最后处理的消息时间,避免重复处理相同消息 + self.last_processed_message_time = 0.0 + + # 最后清理时间,用于定期清理老消息段 + self.last_cleanup_time = 0.0 + + # 添加回调函数,用于在满足条件时通知切换到focus_chat模式 + self.on_switch_to_focus_callback = on_switch_to_focus_callback + + self._disabled = False # 增加停用标志 + + # 加载持久化的缓存 + self._load_cache() + + logger.debug(f"[{self.stream_name}] NormalChat 初始化完成 (异步部分)。") + self.action_type: Optional[str] = None # 当前动作类型 self.is_parallel_action: bool = False # 是否是可并行动作 @@ -66,20 +126,15 @@ class NormalChat: self._chat_task: Optional[asyncio.Task] = None self._disabled = False # 停用标志 - # 消息段缓存,用于关系构建 - self.person_engaged_cache: Dict[str, List[Dict[str, Any]]] = {} - self.last_cleanup_time = time.time() - - # 最近回复记录 - self.recent_replies: List[Dict[str, Any]] = [] + self.on_switch_to_focus_callback = on_switch_to_focus_callback # 新增:回复模式和优先级管理器 - self.reply_mode = global_config.chat.get_reply_mode(self.stream_id) + self.reply_mode = self.chat_stream.context.get_priority_mode() if self.reply_mode == "priority": - interest_dict = self.chat_stream.interest_dict or {} + interest_dict = interest_dict or {} self.priority_manager = PriorityManager( interest_dict=interest_dict, - normal_queue_max_size=global_config.chat.get("priority_queue_max_size", 5), + normal_queue_max_size=5, ) else: self.priority_manager = None @@ -393,6 +448,29 @@ class NormalChat: f"[{self.stream_name}] 更新用户 {person_id} 的消息段,消息时间:{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(msg_time))}" ) + async def _priority_chat_loop_add_message(self): + while not self._disabled: + try: + ids = list(self.interest_dict.keys()) + for msg_id in ids: + message, interest_value, _ = self.interest_dict[msg_id] + if not self._disabled: + # 更新消息段信息 + self._update_user_message_segments(message) + + # 添加消息到优先级管理器 + if self.priority_manager: + self.priority_manager.add_message(message, interest_value) + self.interest_dict.pop(msg_id, None) + except Exception as e: + logger.error( + f"[{self.stream_name}] 优先级聊天循环添加消息时出现错误: {traceback.format_exc()}", exc_info=True + ) + print(traceback.format_exc()) + # 出现错误时,等待一段时间再重试 + raise + await asyncio.sleep(0.1) + async def _priority_chat_loop(self): """ 使用优先级队列的消息处理循环。 @@ -401,15 +479,22 @@ class NormalChat: try: if not self.priority_manager.is_empty(): # 获取最高优先级的消息 - message_to_process = self.priority_manager.get_highest_priority_message() + message = self.priority_manager.get_highest_priority_message() - if message_to_process: + if message: logger.info( - f"[{self.stream_name}] 从队列中取出消息进行处理: User {message_to_process.message_info.user_info.user_id}, Time: {time.strftime('%H:%M:%S', time.localtime(message_to_process.message_info.time))}" + f"[{self.stream_name}] 从队列中取出消息进行处理: User {message.message_info.user_info.user_id}, Time: {time.strftime('%H:%M:%S', time.localtime(message.message_info.time))}" ) - # 检查是否应该回复 - async with self.chat_stream.get_process_lock(): - await self._process_chat_message(message_to_process) + # 执行定期清理 + self._cleanup_old_segments() + + # 更新消息段信息 + self._update_user_message_segments(message) + + # 检查是否有用户满足关系构建条件 + asyncio.create_task(self._check_relation_building_conditions()) + + await self.reply_one_message(message) # 等待一段时间再检查队列 await asyncio.sleep(1) @@ -418,7 +503,7 @@ class NormalChat: logger.info(f"[{self.stream_name}] 优先级聊天循环被取消。") break except Exception as e: - logger.error(f"[{self.stream_name}] 优先级聊天循环出现错误: {e}", exc_info=True) + logger.error(f"[{self.stream_name}] 优先级聊天循环出现错误: {traceback.format_exc()}", exc_info=True) # 出现错误时,等待更长时间避免频繁报错 await asyncio.sleep(10) @@ -645,7 +730,7 @@ class NormalChat: # 新增:在auto模式下检查是否需要直接切换到focus模式 if global_config.chat.chat_mode == "auto": - if await self._should_switch_to_focus(message, is_mentioned, interested_rate): + if await self._check_should_switch_to_focus(): logger.info(f"[{self.stream_name}] 检测到切换到focus聊天模式的条件,直接执行切换") if self.on_switch_to_focus_callback: await self.on_switch_to_focus_callback() @@ -695,176 +780,10 @@ class NormalChat: do_reply = False response_set = None # 初始化 response_set if random() < reply_probability: - do_reply = True - - # 回复前处理 - await willing_manager.before_generate_reply_handle(message.message_info.message_id) - - thinking_id = await self._create_thinking_message(message) - - # 如果启用planner,预先修改可用actions(避免在并行任务中重复调用) - available_actions = None - if self.enable_planner: - try: - await self.action_modifier.modify_actions_for_normal_chat( - self.chat_stream, self.recent_replies, message.processed_plain_text - ) - available_actions = self.action_manager.get_using_actions_for_mode("normal") - except Exception as e: - logger.warning(f"[{self.stream_name}] 获取available_actions失败: {e}") - available_actions = None - - # 定义并行执行的任务 - async def generate_normal_response(): - """生成普通回复""" - try: - return await self.gpt.generate_response( - message=message, - thinking_id=thinking_id, - enable_planner=self.enable_planner, - available_actions=available_actions, - ) - except Exception as e: - logger.error(f"[{self.stream_name}] 回复生成出现错误:{str(e)} {traceback.format_exc()}") - return None - - async def plan_and_execute_actions(): - """规划和执行额外动作""" - if not self.enable_planner: - logger.debug(f"[{self.stream_name}] Planner未启用,跳过动作规划") - return None - - try: - # 获取发送者名称(动作修改已在并行执行前完成) - sender_name = self._get_sender_name(message) - - no_action = { - "action_result": { - "action_type": "no_action", - "action_data": {}, - "reasoning": "规划器初始化默认", - "is_parallel": True, - }, - "chat_context": "", - "action_prompt": "", - } - - # 检查是否应该跳过规划 - if self.action_modifier.should_skip_planning(): - logger.debug(f"[{self.stream_name}] 没有可用动作,跳过规划") - self.action_type = "no_action" - return no_action - - # 执行规划 - plan_result = await self.planner.plan(message, sender_name) - action_type = plan_result["action_result"]["action_type"] - action_data = plan_result["action_result"]["action_data"] - reasoning = plan_result["action_result"]["reasoning"] - is_parallel = plan_result["action_result"].get("is_parallel", False) - - logger.info( - f"[{self.stream_name}] Planner决策: {action_type}, 理由: {reasoning}, 并行执行: {is_parallel}" - ) - self.action_type = action_type # 更新实例属性 - self.is_parallel_action = is_parallel # 新增:保存并行执行标志 - - # 如果规划器决定不执行任何动作 - if action_type == "no_action": - logger.debug(f"[{self.stream_name}] Planner决定不执行任何额外动作") - return no_action - - # 执行额外的动作(不影响回复生成) - action_result = await self._execute_action(action_type, action_data, message, thinking_id) - if action_result is not None: - logger.info(f"[{self.stream_name}] 额外动作 {action_type} 执行完成") - else: - logger.warning(f"[{self.stream_name}] 额外动作 {action_type} 执行失败") - - return { - "action_type": action_type, - "action_data": action_data, - "reasoning": reasoning, - "is_parallel": is_parallel, - } - - except Exception as e: - logger.error(f"[{self.stream_name}] Planner执行失败: {e}") - return no_action - - # 并行执行回复生成和动作规划 - self.action_type = None # 初始化动作类型 - self.is_parallel_action = False # 初始化并行动作标志 - with Timer("并行生成回复和规划", timing_results): - response_set, plan_result = await asyncio.gather( - generate_normal_response(), plan_and_execute_actions(), return_exceptions=True - ) - - # 处理生成回复的结果 - if isinstance(response_set, Exception): - logger.error(f"[{self.stream_name}] 回复生成异常: {response_set}") - response_set = None - - # 处理规划结果(可选,不影响回复) - if isinstance(plan_result, Exception): - logger.error(f"[{self.stream_name}] 动作规划异常: {plan_result}") - elif plan_result: - logger.debug(f"[{self.stream_name}] 额外动作处理完成: {self.action_type}") - - if not response_set or ( - self.enable_planner and self.action_type not in ["no_action"] and not self.is_parallel_action - ): - if not response_set: - logger.info(f"[{self.stream_name}] 模型未生成回复内容") - elif self.enable_planner and self.action_type not in ["no_action"] and not self.is_parallel_action: - logger.info(f"[{self.stream_name}] 模型选择其他动作(非并行动作)") - # 如果模型未生成回复,移除思考消息 - container = await message_manager.get_container(self.stream_id) # 使用 self.stream_id - for msg in container.messages[:]: - if isinstance(msg, MessageThinking) and msg.message_info.message_id == thinking_id: - container.messages.remove(msg) - logger.debug(f"[{self.stream_name}] 已移除未产生回复的思考消息 {thinking_id}") - break - # 需要在此处也调用 not_reply_handle 和 delete 吗? - # 如果是因为模型没回复,也算是一种 "未回复" - await willing_manager.not_reply_handle(message.message_info.message_id) - willing_manager.delete(message.message_info.message_id) - return # 不执行后续步骤 - - # logger.info(f"[{self.stream_name}] 回复内容: {response_set}") - - if self._disabled: - logger.info(f"[{self.stream_name}] 已停用,忽略 normal_response。") - return - - # 发送回复 (不再需要传入 chat) - with Timer("消息发送", timing_results): - first_bot_msg = await self._add_messages_to_manager(message, response_set, thinking_id) - - # 检查 first_bot_msg 是否为 None (例如思考消息已被移除的情况) - if first_bot_msg: - # 消息段已在接收消息时更新,这里不需要额外处理 - - # 记录回复信息到最近回复列表中 - reply_info = { - "time": time.time(), - "user_message": message.processed_plain_text, - "user_info": { - "user_id": message.message_info.user_info.user_id, - "user_nickname": message.message_info.user_info.user_nickname, - }, - "response": response_set, - "is_mentioned": is_mentioned, - "is_reference_reply": message.reply is not None, # 判断是否为引用回复 - "timing": {k: round(v, 2) for k, v in timing_results.items()}, - } - self.recent_replies.append(reply_info) - # 保持最近回复历史在限定数量内 - if len(self.recent_replies) > self.max_replies_history: - self.recent_replies = self.recent_replies[-self.max_replies_history :] - - # 回复后处理 - await willing_manager.after_generate_reply_handle(message.message_info.message_id) - + with Timer("获取回复", timing_results): + await willing_manager.before_generate_reply_handle(message.message_info.message_id) + do_reply = await self.reply_one_message(message) + response_set = do_reply if do_reply else None # 输出性能计时结果 if do_reply and response_set: # 确保 response_set 不是 None timing_str = " | ".join([f"{step}: {duration:.2f}秒" for step, duration in timing_results.items()]) @@ -873,6 +792,7 @@ class NormalChat: logger.info( f"[{self.stream_name}]回复消息: {trigger_msg[:30]}... | 回复内容: {response_msg[:30]}... | 计时: {timing_str}" ) + await willing_manager.after_generate_reply_handle(message.message_info.message_id) elif not do_reply: # 不回复处理 await willing_manager.not_reply_handle(message.message_info.message_id) @@ -880,6 +800,167 @@ class NormalChat: # 意愿管理器:注销当前message信息 (无论是否回复,只要处理过就删除) willing_manager.delete(message.message_info.message_id) + async def reply_one_message(self, message: MessageRecv) -> None: + # 回复前处理 + thinking_id = await self._create_thinking_message(message) + + # 如果启用planner,预先修改可用actions(避免在并行任务中重复调用) + available_actions = None + if self.enable_planner: + try: + await self.action_modifier.modify_actions_for_normal_chat( + self.chat_stream, self.recent_replies, message.processed_plain_text + ) + available_actions = self.action_manager.get_using_actions_for_mode("normal") + except Exception as e: + logger.warning(f"[{self.stream_name}] 获取available_actions失败: {e}") + available_actions = None + + # 定义并行执行的任务 + async def generate_normal_response(): + """生成普通回复""" + try: + return await self.gpt.generate_response( + message=message, + thinking_id=thinking_id, + enable_planner=self.enable_planner, + available_actions=available_actions, + ) + except Exception as e: + logger.error(f"[{self.stream_name}] 回复生成出现错误:{str(e)} {traceback.format_exc()}") + return None + + async def plan_and_execute_actions(): + """规划和执行额外动作""" + if not self.enable_planner: + logger.debug(f"[{self.stream_name}] Planner未启用,跳过动作规划") + return None + + try: + # 获取发送者名称(动作修改已在并行执行前完成) + sender_name = self._get_sender_name(message) + + no_action = { + "action_result": { + "action_type": "no_action", + "action_data": {}, + "reasoning": "规划器初始化默认", + "is_parallel": True, + }, + "chat_context": "", + "action_prompt": "", + } + + # 检查是否应该跳过规划 + if self.action_modifier.should_skip_planning(): + logger.debug(f"[{self.stream_name}] 没有可用动作,跳过规划") + self.action_type = "no_action" + return no_action + + # 执行规划 + plan_result = await self.planner.plan(message, sender_name) + action_type = plan_result["action_result"]["action_type"] + action_data = plan_result["action_result"]["action_data"] + reasoning = plan_result["action_result"]["reasoning"] + is_parallel = plan_result["action_result"].get("is_parallel", False) + + logger.info( + f"[{self.stream_name}] Planner决策: {action_type}, 理由: {reasoning}, 并行执行: {is_parallel}" + ) + self.action_type = action_type # 更新实例属性 + self.is_parallel_action = is_parallel # 新增:保存并行执行标志 + + # 如果规划器决定不执行任何动作 + if action_type == "no_action": + logger.debug(f"[{self.stream_name}] Planner决定不执行任何额外动作") + return no_action + + # 执行额外的动作(不影响回复生成) + action_result = await self._execute_action(action_type, action_data, message, thinking_id) + if action_result is not None: + logger.info(f"[{self.stream_name}] 额外动作 {action_type} 执行完成") + else: + logger.warning(f"[{self.stream_name}] 额外动作 {action_type} 执行失败") + + return { + "action_type": action_type, + "action_data": action_data, + "reasoning": reasoning, + "is_parallel": is_parallel, + } + + except Exception as e: + logger.error(f"[{self.stream_name}] Planner执行失败: {e}") + return no_action + + # 并行执行回复生成和动作规划 + self.action_type = None # 初始化动作类型 + self.is_parallel_action = False # 初始化并行动作标志 + response_set, plan_result = await asyncio.gather( + generate_normal_response(), plan_and_execute_actions(), return_exceptions=True + ) + + # 处理生成回复的结果 + if isinstance(response_set, Exception): + logger.error(f"[{self.stream_name}] 回复生成异常: {response_set}") + response_set = None + + # 处理规划结果(可选,不影响回复) + if isinstance(plan_result, Exception): + logger.error(f"[{self.stream_name}] 动作规划异常: {plan_result}") + elif plan_result: + logger.debug(f"[{self.stream_name}] 额外动作处理完成: {self.action_type}") + + if not response_set or ( + self.enable_planner and self.action_type not in ["no_action"] and not self.is_parallel_action + ): + if not response_set: + logger.info(f"[{self.stream_name}] 模型未生成回复内容") + elif self.enable_planner and self.action_type not in ["no_action"] and not self.is_parallel_action: + logger.info(f"[{self.stream_name}] 模型选择其他动作(非并行动作)") + # 如果模型未生成回复,移除思考消息 + container = await message_manager.get_container(self.stream_id) # 使用 self.stream_id + for msg in container.messages[:]: + if isinstance(msg, MessageThinking) and msg.message_info.message_id == thinking_id: + container.messages.remove(msg) + logger.debug(f"[{self.stream_name}] 已移除未产生回复的思考消息 {thinking_id}") + break + # 需要在此处也调用 not_reply_handle 和 delete 吗? + # 如果是因为模型没回复,也算是一种 "未回复" + return False + + # logger.info(f"[{self.stream_name}] 回复内容: {response_set}") + + if self._disabled: + logger.info(f"[{self.stream_name}] 已停用,忽略 normal_response。") + return False + + # 发送回复 (不再需要传入 chat) + first_bot_msg = await self._add_messages_to_manager(message, response_set, thinking_id) + + # 检查 first_bot_msg 是否为 None (例如思考消息已被移除的情况) + if first_bot_msg: + # 消息段已在接收消息时更新,这里不需要额外处理 + + # 记录回复信息到最近回复列表中 + reply_info = { + "time": time.time(), + "user_message": message.processed_plain_text, + "user_info": { + "user_id": message.message_info.user_info.user_id, + "user_nickname": message.message_info.user_info.user_nickname, + }, + "response": response_set, + # "is_mentioned": is_mentioned, + "is_reference_reply": message.reply is not None, # 判断是否为引用回复 + # "timing": {k: round(v, 2) for k, v in timing_results.items()}, + } + self.recent_replies.append(reply_info) + # 保持最近回复历史在限定数量内 + if len(self.recent_replies) > self.max_replies_history: + self.recent_replies = self.recent_replies[-self.max_replies_history :] + return response_set if response_set else False + # 改为实例方法, 移除 chat 参数 async def start_chat(self): @@ -899,9 +980,14 @@ class NormalChat: self._chat_task = None try: - logger.debug(f"[{self.stream_name}] 创建新的聊天轮询任务,模式: {self.reply_mode}") + logger.info(f"[{self.stream_name}] 创建新的聊天轮询任务,模式: {self.reply_mode}") if self.reply_mode == "priority": - polling_task = asyncio.create_task(self._priority_reply_loop()) + polling_task_send = asyncio.create_task(self._priority_chat_loop()) + polling_task_recv = asyncio.create_task(self._priority_chat_loop_add_message()) + print("555") + polling_task = asyncio.gather(polling_task_send, polling_task_recv) + print("666") + else: # 默认或 "interest" 模式 polling_task = asyncio.create_task(self._reply_interested_message()) @@ -942,7 +1028,7 @@ class NormalChat: # 尝试获取异常,但不抛出 exc = task.exception() if exc: - logger.error(f"[{self.stream_name}] 任务异常: {type(exc).__name__}: {exc}") + logger.error(f"[{self.stream_name}] 任务异常: {type(exc).__name__}: {exc}", exc_info=exc) else: logger.debug(f"[{self.stream_name}] 任务正常完成") except Exception as e: @@ -1024,52 +1110,6 @@ class NormalChat: # 返回最近的limit条记录,按时间倒序排列 return sorted(self.recent_replies[-limit:], key=lambda x: x["time"], reverse=True) - async def _priority_reply_loop(self) -> None: - """ - [优先级模式] 循环获取并处理最高优先级的消息。 - """ - logger.info(f"[{self.stream_name}] 已启动优先级回复模式循环。") - try: - while not self._disabled: - if self.priority_manager is None: - logger.error(f"[{self.stream_name}] 处于优先级模式,但 priority_manager 未初始化。") - await asyncio.sleep(5) - continue - - # 动态调整回复频率 - self.adjust_reply_frequency() - - # 从优先级队列中获取消息 - highest_priority_message = self.priority_manager.get_highest_priority_message() - - if highest_priority_message: - message = highest_priority_message - logger.debug( - f"[{self.stream_name}] 从优先级队列中取出消息进行处理: {message.processed_plain_text[:30]}..." - ) - - # 复用现有的消息处理逻辑 - # 需要计算 is_mentioned 和 interested_rate - is_mentioned = message.is_mentioned - # 对于优先级模式,我们可以认为取出的消息就是我们感兴趣的 - # 或者我们可以从 priority_manager 的 PrioritizedMessage 中获取原始兴趣分 - # 这里我们先用一个较高的固定值,或者从消息本身获取 - interested_rate = 1.0 # 简化处理,或者可以传递更精确的值 - - await self._process_message(message, is_mentioned, interested_rate) - - # 处理完一条消息后可以稍微等待,避免过于频繁地连续回复 - await asyncio.sleep(global_config.chat.get("priority_post_reply_delay", 1.0)) - else: - # 如果队列为空,等待一段时间 - await asyncio.sleep(global_config.chat.get("priority_empty_queue_delay", 0.5)) - - except asyncio.CancelledError: - logger.debug(f"[{self.stream_name}] 优先级回复任务被取消。") - raise # 重新抛出异常 - except Exception as e: - logger.error(f"[{self.stream_name}] 优先级回复循环异常: {e}", exc_info=True) - def adjust_reply_frequency(self): """ 根据预设规则动态调整回复意愿(willing_amplifier)。 diff --git a/src/chat/normal_chat/priority_manager.py b/src/chat/normal_chat/priority_manager.py index 07112dcb2..9e1ef76c2 100644 --- a/src/chat/normal_chat/priority_manager.py +++ b/src/chat/normal_chat/priority_manager.py @@ -11,10 +11,10 @@ logger = get_logger("normal_chat") class PrioritizedMessage: """带有优先级的消息对象""" - def __init__(self, message: MessageRecv, interest_score: float, is_vip: bool = False): + def __init__(self, message: MessageRecv, interest_scores: List[float], is_vip: bool = False): self.message = message self.arrival_time = time.time() - self.interest_score = interest_score + self.interest_scores = interest_scores self.is_vip = is_vip self.priority = self.calculate_priority() @@ -25,7 +25,7 @@ class PrioritizedMessage: """ age = time.time() - self.arrival_time decay_factor = math.exp(-decay_rate * age) - priority = self.interest_score * decay_factor + priority = sum(self.interest_scores) + decay_factor return priority def __lt__(self, other: "PrioritizedMessage") -> bool: @@ -43,25 +43,20 @@ class PriorityManager: self.normal_queue: List[PrioritizedMessage] = [] # 普通消息队列 (最大堆) self.interest_dict = interest_dict if interest_dict is not None else {} self.normal_queue_max_size = normal_queue_max_size - self.vip_users = self.interest_dict.get("vip_users", []) # 假设vip用户在interest_dict中指定 def _get_interest_score(self, user_id: str) -> float: """获取用户的兴趣分,默认为1.0""" return self.interest_dict.get("interests", {}).get(user_id, 1.0) - def _is_vip(self, user_id: str) -> bool: - """检查用户是否为VIP""" - return user_id in self.vip_users - - def add_message(self, message: MessageRecv): + def add_message(self, message: MessageRecv, interest_score: Optional[float] = None): """ 添加新消息到合适的队列中。 """ user_id = message.message_info.user_info.user_id - is_vip = self._is_vip(user_id) - interest_score = self._get_interest_score(user_id) + is_vip = message.priority_info.get("message_type") == "vip" if message.priority_info else False + message_priority = message.priority_info.get("message_priority", 0.0) if message.priority_info else 0.0 - p_message = PrioritizedMessage(message, interest_score, is_vip) + p_message = PrioritizedMessage(message, [interest_score, message_priority], is_vip) if is_vip: heapq.heappush(self.vip_queue, p_message) @@ -97,12 +92,7 @@ class PriorityManager: vip_msg = self.vip_queue[0] if self.vip_queue else None normal_msg = self.normal_queue[0] if self.normal_queue else None - if vip_msg and normal_msg: - if vip_msg.priority >= normal_msg.priority: - return heapq.heappop(self.vip_queue).message - else: - return heapq.heappop(self.normal_queue).message - elif vip_msg: + if vip_msg: return heapq.heappop(self.vip_queue).message elif normal_msg: return heapq.heappop(self.normal_queue).message