diff --git a/src/chat/normal_chat/normal_chat.py b/src/chat/normal_chat/normal_chat.py index 2b9777fba..84a8febe8 100644 --- a/src/chat/normal_chat/normal_chat.py +++ b/src/chat/normal_chat/normal_chat.py @@ -1,7 +1,7 @@ import asyncio import time -import traceback from random import random +from typing import List, Dict, Optional, Any from typing import List, Optional, Dict # 导入类型提示 import os import pickle @@ -11,6 +11,8 @@ from src.chat.heart_flow.utils_chat import get_chat_type_and_target_info from src.manager.mood_manager import mood_manager from src.chat.message_receive.chat_stream import ChatStream, get_chat_manager from src.chat.utils.timer_calculator import Timer + +from src.chat.message_receive.chat_stream import ChatStream from src.chat.utils.prompt_builder import global_prompt_manager from .normal_chat_generator import NormalChatGenerator from ..message_receive.message import MessageSending, MessageRecv, MessageThinking, MessageSet @@ -31,6 +33,8 @@ from src.chat.utils.chat_message_builder import ( get_raw_msg_before_timestamp_with_chat, num_new_messages_since, ) +from .priority_manager import PriorityManager +import traceback willing_manager = get_willing_manager() @@ -46,64 +50,57 @@ SEGMENT_CLEANUP_CONFIG = { class NormalChat: - def __init__(self, chat_stream: ChatStream, interest_dict: dict = None, on_switch_to_focus_callback=None): - """初始化 NormalChat 实例。只进行同步操作。""" + """ + 普通聊天处理类,负责处理非核心对话的聊天逻辑。 + 每个聊天(私聊或群聊)都会有一个独立的NormalChat实例。 + """ + def __init__(self, chat_stream: ChatStream): + """ + 初始化NormalChat实例。 + + Args: + chat_stream (ChatStream): 聊天流对象,包含与特定聊天相关的所有信息。 + """ self.chat_stream = chat_stream self.stream_id = chat_stream.stream_id - self.stream_name = get_chat_manager().get_stream_name(self.stream_id) or self.stream_id + self.stream_name = chat_stream.get_name() + self.willing_amplifier = 1.0 # 回复意愿放大器,动态调整 + self.enable_planner = global_config.normal_chat.get("enable_planner", False) # 是否启用planner + self.action_manager = ActionManager(chat_stream) # 初始化动作管理器 + self.action_type: Optional[str] = None # 当前动作类型 + self.is_parallel_action: bool = False # 是否是可并行动作 - # 初始化Normal Chat专用表达器 - self.expressor = NormalChatExpressor(self.chat_stream) - self.replyer = DefaultReplyer(self.chat_stream) - - # Interest dict - self.interest_dict = interest_dict - - self.is_group_chat, self.chat_target_info = get_chat_type_and_target_info(self.stream_id) - - self.willing_amplifier = 1 - self.start_time = time.time() - - # Other sync initializations - self.gpt = NormalChatGenerator() - self.mood_manager = mood_manager - self.start_time = time.time() + # 任务管理 self._chat_task: Optional[asyncio.Task] = None - self._initialized = False # Track initialization status + self._disabled = False # 停用标志 - # Planner相关初始化 - self.action_manager = ActionManager() - self.planner = NormalChatPlanner(self.stream_name, self.action_manager) - self.action_modifier = NormalChatActionModifier(self.action_manager, self.stream_id, self.stream_name) - self.enable_planner = global_config.normal_chat.enable_planner # 从配置中读取是否启用planner + # 消息段缓存,用于关系构建 + self.person_engaged_cache: Dict[str, List[Dict[str, Any]]] = {} + self.last_cleanup_time = time.time() - # 记录最近的回复内容,每项包含: {time, user_message, response, is_mentioned, is_reference_reply} - self.recent_replies = [] - self.max_replies_history = 20 # 最多保存最近20条回复记录 + # 最近回复记录 + self.recent_replies: List[Dict[str, Any]] = [] - # 新的消息段缓存结构: - # {person_id: [{"start_time": float, "end_time": float, "last_msg_time": float, "message_count": int}, ...]} - self.person_engaged_cache: Dict[str, List[Dict[str, any]]] = {} + # 新增:回复模式和优先级管理器 + self.reply_mode = global_config.chat.get_reply_mode(self.stream_id) + if self.reply_mode == "priority": + interest_dict = self.chat_stream.interest_dict or {} + self.priority_manager = PriorityManager( + interest_dict=interest_dict, + normal_queue_max_size=global_config.chat.get("priority_queue_max_size", 5), + ) + else: + self.priority_manager = None - # 持久化存储文件路径 - self.cache_file_path = os.path.join("data", "relationship", f"relationship_cache_{self.stream_id}.pkl") - - # 最后处理的消息时间,避免重复处理相同消息 - self.last_processed_message_time = 0.0 - - # 最后清理时间,用于定期清理老消息段 - self.last_cleanup_time = 0.0 - - # 添加回调函数,用于在满足条件时通知切换到focus_chat模式 - self.on_switch_to_focus_callback = on_switch_to_focus_callback - - self._disabled = False # 增加停用标志 - - # 加载持久化的缓存 - self._load_cache() - - logger.debug(f"[{self.stream_name}] NormalChat 初始化完成 (异步部分)。") + async def disable(self): + """停用 NormalChat 实例,停止所有后台任务""" + self._disabled = True + if self._chat_task and not self._chat_task.done(): + self._chat_task.cancel() + if self.reply_mode == "priority" and self._priority_chat_task and not self._priority_chat_task.done(): + self._priority_chat_task.cancel() + logger.info(f"[{self.stream_name}] NormalChat 已停用。") # ================================ # 缓存管理模块 @@ -405,6 +402,35 @@ class NormalChat: f"[{self.stream_name}] 更新用户 {person_id} 的消息段,消息时间:{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(msg_time))}" ) + async def _priority_chat_loop(self): + """ + 使用优先级队列的消息处理循环。 + """ + while not self._disabled: + try: + if not self.priority_manager.is_empty(): + # 获取最高优先级的消息 + message_to_process = self.priority_manager.get_highest_priority_message() + + if message_to_process: + logger.info( + f"[{self.stream_name}] 从队列中取出消息进行处理: User {message_to_process.message_info.user_info.user_id}, Time: {time.strftime('%H:%M:%S', time.localtime(message_to_process.message_info.time))}" + ) + # 检查是否应该回复 + async with self.chat_stream.get_process_lock(): + await self._process_chat_message(message_to_process) + + # 等待一段时间再检查队列 + await asyncio.sleep(1) + + except asyncio.CancelledError: + logger.info(f"[{self.stream_name}] 优先级聊天循环被取消。") + break + except Exception as e: + logger.error(f"[{self.stream_name}] 优先级聊天循环出现错误: {e}", exc_info=True) + # 出现错误时,等待更长时间避免频繁报错 + await asyncio.sleep(10) + # 改为实例方法 async def _create_thinking_message(self, message: MessageRecv, timestamp: Optional[float] = None) -> str: """创建思考消息""" @@ -602,15 +628,33 @@ class NormalChat: # 改为实例方法, 移除 chat 参数 async def normal_response(self, message: MessageRecv, is_mentioned: bool, interested_rate: float) -> None: - # 新增:如果已停用,直接返回 + """ + 处理接收到的消息。 + 根据回复模式,决定是立即处理还是放入优先级队列。 + """ + if self._disabled: + return + + # 根据回复模式决定行为 + if self.reply_mode == "priority": + # 优先模式下,所有消息都进入管理器 + if self.priority_manager: + self.priority_manager.add_message(message) + return + + # --- 以下为原有的 "兴趣" 模式逻辑 --- + await self._process_message(message, is_mentioned, interested_rate) + + async def _process_message(self, message: MessageRecv, is_mentioned: bool, interested_rate: float) -> None: + """ + 实际处理单条消息的逻辑,包括意愿判断、回复生成、动作执行等。 + """ if self._disabled: - logger.info(f"[{self.stream_name}] 已停用,忽略 normal_response。") return # 新增:在auto模式下检查是否需要直接切换到focus模式 if global_config.chat.chat_mode == "auto": - should_switch = await self._check_should_switch_to_focus() - if should_switch: + if await self._should_switch_to_focus(message, is_mentioned, interested_rate): logger.info(f"[{self.stream_name}] 检测到切换到focus聊天模式的条件,直接执行切换") if self.on_switch_to_focus_callback: await self.on_switch_to_focus_callback() @@ -864,8 +908,11 @@ class NormalChat: self._chat_task = None try: - logger.debug(f"[{self.stream_name}] 创建新的聊天轮询任务") - polling_task = asyncio.create_task(self._reply_interested_message()) + logger.debug(f"[{self.stream_name}] 创建新的聊天轮询任务,模式: {self.reply_mode}") + if self.reply_mode == "priority": + polling_task = asyncio.create_task(self._priority_reply_loop()) + else: # 默认或 "interest" 模式 + polling_task = asyncio.create_task(self._reply_interested_message()) # 设置回调 polling_task.add_done_callback(lambda t: self._handle_task_completion(t)) @@ -986,6 +1033,52 @@ class NormalChat: # 返回最近的limit条记录,按时间倒序排列 return sorted(self.recent_replies[-limit:], key=lambda x: x["time"], reverse=True) + async def _priority_reply_loop(self) -> None: + """ + [优先级模式] 循环获取并处理最高优先级的消息。 + """ + logger.info(f"[{self.stream_name}] 已启动优先级回复模式循环。") + try: + while not self._disabled: + if self.priority_manager is None: + logger.error(f"[{self.stream_name}] 处于优先级模式,但 priority_manager 未初始化。") + await asyncio.sleep(5) + continue + + # 动态调整回复频率 + self.adjust_reply_frequency() + + # 从优先级队列中获取消息 + highest_priority_message = self.priority_manager.get_highest_priority_message() + + if highest_priority_message: + message = highest_priority_message + logger.debug( + f"[{self.stream_name}] 从优先级队列中取出消息进行处理: {message.processed_plain_text[:30]}..." + ) + + # 复用现有的消息处理逻辑 + # 需要计算 is_mentioned 和 interested_rate + is_mentioned = message.is_mentioned + # 对于优先级模式,我们可以认为取出的消息就是我们感兴趣的 + # 或者我们可以从 priority_manager 的 PrioritizedMessage 中获取原始兴趣分 + # 这里我们先用一个较高的固定值,或者从消息本身获取 + interested_rate = 1.0 # 简化处理,或者可以传递更精确的值 + + await self._process_message(message, is_mentioned, interested_rate) + + # 处理完一条消息后可以稍微等待,避免过于频繁地连续回复 + await asyncio.sleep(global_config.chat.get("priority_post_reply_delay", 1.0)) + else: + # 如果队列为空,等待一段时间 + await asyncio.sleep(global_config.chat.get("priority_empty_queue_delay", 0.5)) + + except asyncio.CancelledError: + logger.debug(f"[{self.stream_name}] 优先级回复任务被取消。") + raise # 重新抛出异常 + except Exception as e: + logger.error(f"[{self.stream_name}] 优先级回复循环异常: {e}", exc_info=True) + def adjust_reply_frequency(self): """ 根据预设规则动态调整回复意愿(willing_amplifier)。 diff --git a/src/chat/normal_chat/priority_manager.py b/src/chat/normal_chat/priority_manager.py new file mode 100644 index 000000000..a059a96a9 --- /dev/null +++ b/src/chat/normal_chat/priority_manager.py @@ -0,0 +1,118 @@ +import time +import heapq +import math +from typing import List, Tuple, Dict, Any, Optional +from ..message_receive.message import MessageSending, MessageRecv, MessageThinking, MessageSet +from src.common.logger import get_logger + +logger = get_logger("normal_chat") + + +class PrioritizedMessage: + """带有优先级的消息对象""" + + def __init__(self, message: MessageRecv, interest_score: float, is_vip: bool = False): + self.message = message + self.arrival_time = time.time() + self.interest_score = interest_score + self.is_vip = is_vip + self.priority = self.calculate_priority() + + def calculate_priority(self, decay_rate: float = 0.01) -> float: + """ + 计算优先级分数。 + 优先级 = 兴趣分 * exp(-衰减率 * 消息年龄) + """ + age = time.time() - self.arrival_time + decay_factor = math.exp(-decay_rate * age) + priority = self.interest_score * decay_factor + return priority + + def __lt__(self, other: "PrioritizedMessage") -> bool: + """用于堆排序的比较函数,我们想要一个最大堆,所以用 >""" + return self.priority > other.priority + + +class PriorityManager: + """ + 管理消息队列,根据优先级选择消息进行处理。 + """ + + def __init__(self, interest_dict: Dict[str, float], normal_queue_max_size: int = 5): + self.vip_queue: List[PrioritizedMessage] = [] # VIP 消息队列 (最大堆) + self.normal_queue: List[PrioritizedMessage] = [] # 普通消息队列 (最大堆) + self.interest_dict = interest_dict if interest_dict is not None else {} + self.normal_queue_max_size = normal_queue_max_size + self.vip_users = self.interest_dict.get("vip_users", []) # 假设vip用户在interest_dict中指定 + + def _get_interest_score(self, user_id: str) -> float: + """获取用户的兴趣分,默认为1.0""" + return self.interest_dict.get("interests", {}).get(user_id, 1.0) + + def _is_vip(self, user_id: str) -> bool: + """检查用户是否为VIP""" + return user_id in self.vip_users + + def add_message(self, message: MessageRecv): + """ + 添加新消息到合适的队列中。 + """ + user_id = message.message_info.user_info.user_id + is_vip = self._is_vip(user_id) + interest_score = self._get_interest_score(user_id) + + p_message = PrioritizedMessage(message, interest_score, is_vip) + + if is_vip: + heapq.heappush(self.vip_queue, p_message) + logger.debug(f"消息来自VIP用户 {user_id}, 已添加到VIP队列. 当前VIP队列长度: {len(self.vip_queue)}") + else: + if len(self.normal_queue) >= self.normal_queue_max_size: + # 如果队列已满,只在消息优先级高于最低优先级消息时才添加 + if p_message.priority > self.normal_queue[0].priority: + heapq.heapreplace(self.normal_queue, p_message) + logger.debug(f"普通队列已满,但新消息优先级更高,已替换. 用户: {user_id}") + else: + logger.debug(f"普通队列已满且新消息优先级较低,已忽略. 用户: {user_id}") + else: + heapq.heappush(self.normal_queue, p_message) + logger.debug( + f"消息来自普通用户 {user_id}, 已添加到普通队列. 当前普通队列长度: {len(self.normal_queue)}" + ) + + def get_highest_priority_message(self) -> Optional[MessageRecv]: + """ + 从VIP和普通队列中获取当前最高优先级的消息。 + """ + # 更新所有消息的优先级 + for p_msg in self.vip_queue: + p_msg.priority = p_msg.calculate_priority() + for p_msg in self.normal_queue: + p_msg.priority = p_msg.calculate_priority() + + # 重建堆 + heapq.heapify(self.vip_queue) + heapq.heapify(self.normal_queue) + + vip_msg = self.vip_queue[0] if self.vip_queue else None + normal_msg = self.normal_queue[0] if self.normal_queue else None + + if vip_msg and normal_msg: + if vip_msg.priority >= normal_msg.priority: + return heapq.heappop(self.vip_queue).message + else: + return heapq.heappop(self.normal_queue).message + elif vip_msg: + return heapq.heappop(self.vip_queue).message + elif normal_msg: + return heapq.heappop(self.normal_queue).message + else: + return None + + def is_empty(self) -> bool: + """检查所有队列是否为空""" + return not self.vip_queue and not self.normal_queue + + def get_queue_status(self) -> str: + """获取队列状态信息""" + return f"VIP队列: {len(self.vip_queue)}, 普通队列: {len(self.normal_queue)}"