From e3480e989e28cceb8961d5e9f8080839a8c5023a Mon Sep 17 00:00:00 2001 From: tcmofashi Date: Sat, 28 Jun 2025 18:41:44 +0800 Subject: [PATCH 1/4] =?UTF-8?q?feat:=20=E5=A2=9E=E5=8A=A0priority=E6=A8=A1?= =?UTF-8?q?=E5=BC=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/chat/normal_chat/normal_chat.py | 205 ++++++++++++++++------- src/chat/normal_chat/priority_manager.py | 118 +++++++++++++ 2 files changed, 267 insertions(+), 56 deletions(-) create mode 100644 src/chat/normal_chat/priority_manager.py diff --git a/src/chat/normal_chat/normal_chat.py b/src/chat/normal_chat/normal_chat.py index 2b9777fba..84a8febe8 100644 --- a/src/chat/normal_chat/normal_chat.py +++ b/src/chat/normal_chat/normal_chat.py @@ -1,7 +1,7 @@ import asyncio import time -import traceback from random import random +from typing import List, Dict, Optional, Any from typing import List, Optional, Dict # 导入类型提示 import os import pickle @@ -11,6 +11,8 @@ from src.chat.heart_flow.utils_chat import get_chat_type_and_target_info from src.manager.mood_manager import mood_manager from src.chat.message_receive.chat_stream import ChatStream, get_chat_manager from src.chat.utils.timer_calculator import Timer + +from src.chat.message_receive.chat_stream import ChatStream from src.chat.utils.prompt_builder import global_prompt_manager from .normal_chat_generator import NormalChatGenerator from ..message_receive.message import MessageSending, MessageRecv, MessageThinking, MessageSet @@ -31,6 +33,8 @@ from src.chat.utils.chat_message_builder import ( get_raw_msg_before_timestamp_with_chat, num_new_messages_since, ) +from .priority_manager import PriorityManager +import traceback willing_manager = get_willing_manager() @@ -46,64 +50,57 @@ SEGMENT_CLEANUP_CONFIG = { class NormalChat: - def __init__(self, chat_stream: ChatStream, interest_dict: dict = None, on_switch_to_focus_callback=None): - """初始化 NormalChat 实例。只进行同步操作。""" + """ + 普通聊天处理类,负责处理非核心对话的聊天逻辑。 + 每个聊天(私聊或群聊)都会有一个独立的NormalChat实例。 + """ + def __init__(self, chat_stream: ChatStream): + """ + 初始化NormalChat实例。 + + Args: + chat_stream (ChatStream): 聊天流对象,包含与特定聊天相关的所有信息。 + """ self.chat_stream = chat_stream self.stream_id = chat_stream.stream_id - self.stream_name = get_chat_manager().get_stream_name(self.stream_id) or self.stream_id + self.stream_name = chat_stream.get_name() + self.willing_amplifier = 1.0 # 回复意愿放大器,动态调整 + self.enable_planner = global_config.normal_chat.get("enable_planner", False) # 是否启用planner + self.action_manager = ActionManager(chat_stream) # 初始化动作管理器 + self.action_type: Optional[str] = None # 当前动作类型 + self.is_parallel_action: bool = False # 是否是可并行动作 - # 初始化Normal Chat专用表达器 - self.expressor = NormalChatExpressor(self.chat_stream) - self.replyer = DefaultReplyer(self.chat_stream) - - # Interest dict - self.interest_dict = interest_dict - - self.is_group_chat, self.chat_target_info = get_chat_type_and_target_info(self.stream_id) - - self.willing_amplifier = 1 - self.start_time = time.time() - - # Other sync initializations - self.gpt = NormalChatGenerator() - self.mood_manager = mood_manager - self.start_time = time.time() + # 任务管理 self._chat_task: Optional[asyncio.Task] = None - self._initialized = False # Track initialization status + self._disabled = False # 停用标志 - # Planner相关初始化 - self.action_manager = ActionManager() - self.planner = NormalChatPlanner(self.stream_name, self.action_manager) - self.action_modifier = NormalChatActionModifier(self.action_manager, self.stream_id, self.stream_name) - self.enable_planner = global_config.normal_chat.enable_planner # 从配置中读取是否启用planner + # 消息段缓存,用于关系构建 + self.person_engaged_cache: Dict[str, List[Dict[str, Any]]] = {} + self.last_cleanup_time = time.time() - # 记录最近的回复内容,每项包含: {time, user_message, response, is_mentioned, is_reference_reply} - self.recent_replies = [] - self.max_replies_history = 20 # 最多保存最近20条回复记录 + # 最近回复记录 + self.recent_replies: List[Dict[str, Any]] = [] - # 新的消息段缓存结构: - # {person_id: [{"start_time": float, "end_time": float, "last_msg_time": float, "message_count": int}, ...]} - self.person_engaged_cache: Dict[str, List[Dict[str, any]]] = {} + # 新增:回复模式和优先级管理器 + self.reply_mode = global_config.chat.get_reply_mode(self.stream_id) + if self.reply_mode == "priority": + interest_dict = self.chat_stream.interest_dict or {} + self.priority_manager = PriorityManager( + interest_dict=interest_dict, + normal_queue_max_size=global_config.chat.get("priority_queue_max_size", 5), + ) + else: + self.priority_manager = None - # 持久化存储文件路径 - self.cache_file_path = os.path.join("data", "relationship", f"relationship_cache_{self.stream_id}.pkl") - - # 最后处理的消息时间,避免重复处理相同消息 - self.last_processed_message_time = 0.0 - - # 最后清理时间,用于定期清理老消息段 - self.last_cleanup_time = 0.0 - - # 添加回调函数,用于在满足条件时通知切换到focus_chat模式 - self.on_switch_to_focus_callback = on_switch_to_focus_callback - - self._disabled = False # 增加停用标志 - - # 加载持久化的缓存 - self._load_cache() - - logger.debug(f"[{self.stream_name}] NormalChat 初始化完成 (异步部分)。") + async def disable(self): + """停用 NormalChat 实例,停止所有后台任务""" + self._disabled = True + if self._chat_task and not self._chat_task.done(): + self._chat_task.cancel() + if self.reply_mode == "priority" and self._priority_chat_task and not self._priority_chat_task.done(): + self._priority_chat_task.cancel() + logger.info(f"[{self.stream_name}] NormalChat 已停用。") # ================================ # 缓存管理模块 @@ -405,6 +402,35 @@ class NormalChat: f"[{self.stream_name}] 更新用户 {person_id} 的消息段,消息时间:{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(msg_time))}" ) + async def _priority_chat_loop(self): + """ + 使用优先级队列的消息处理循环。 + """ + while not self._disabled: + try: + if not self.priority_manager.is_empty(): + # 获取最高优先级的消息 + message_to_process = self.priority_manager.get_highest_priority_message() + + if message_to_process: + logger.info( + f"[{self.stream_name}] 从队列中取出消息进行处理: User {message_to_process.message_info.user_info.user_id}, Time: {time.strftime('%H:%M:%S', time.localtime(message_to_process.message_info.time))}" + ) + # 检查是否应该回复 + async with self.chat_stream.get_process_lock(): + await self._process_chat_message(message_to_process) + + # 等待一段时间再检查队列 + await asyncio.sleep(1) + + except asyncio.CancelledError: + logger.info(f"[{self.stream_name}] 优先级聊天循环被取消。") + break + except Exception as e: + logger.error(f"[{self.stream_name}] 优先级聊天循环出现错误: {e}", exc_info=True) + # 出现错误时,等待更长时间避免频繁报错 + await asyncio.sleep(10) + # 改为实例方法 async def _create_thinking_message(self, message: MessageRecv, timestamp: Optional[float] = None) -> str: """创建思考消息""" @@ -602,15 +628,33 @@ class NormalChat: # 改为实例方法, 移除 chat 参数 async def normal_response(self, message: MessageRecv, is_mentioned: bool, interested_rate: float) -> None: - # 新增:如果已停用,直接返回 + """ + 处理接收到的消息。 + 根据回复模式,决定是立即处理还是放入优先级队列。 + """ + if self._disabled: + return + + # 根据回复模式决定行为 + if self.reply_mode == "priority": + # 优先模式下,所有消息都进入管理器 + if self.priority_manager: + self.priority_manager.add_message(message) + return + + # --- 以下为原有的 "兴趣" 模式逻辑 --- + await self._process_message(message, is_mentioned, interested_rate) + + async def _process_message(self, message: MessageRecv, is_mentioned: bool, interested_rate: float) -> None: + """ + 实际处理单条消息的逻辑,包括意愿判断、回复生成、动作执行等。 + """ if self._disabled: - logger.info(f"[{self.stream_name}] 已停用,忽略 normal_response。") return # 新增:在auto模式下检查是否需要直接切换到focus模式 if global_config.chat.chat_mode == "auto": - should_switch = await self._check_should_switch_to_focus() - if should_switch: + if await self._should_switch_to_focus(message, is_mentioned, interested_rate): logger.info(f"[{self.stream_name}] 检测到切换到focus聊天模式的条件,直接执行切换") if self.on_switch_to_focus_callback: await self.on_switch_to_focus_callback() @@ -864,8 +908,11 @@ class NormalChat: self._chat_task = None try: - logger.debug(f"[{self.stream_name}] 创建新的聊天轮询任务") - polling_task = asyncio.create_task(self._reply_interested_message()) + logger.debug(f"[{self.stream_name}] 创建新的聊天轮询任务,模式: {self.reply_mode}") + if self.reply_mode == "priority": + polling_task = asyncio.create_task(self._priority_reply_loop()) + else: # 默认或 "interest" 模式 + polling_task = asyncio.create_task(self._reply_interested_message()) # 设置回调 polling_task.add_done_callback(lambda t: self._handle_task_completion(t)) @@ -986,6 +1033,52 @@ class NormalChat: # 返回最近的limit条记录,按时间倒序排列 return sorted(self.recent_replies[-limit:], key=lambda x: x["time"], reverse=True) + async def _priority_reply_loop(self) -> None: + """ + [优先级模式] 循环获取并处理最高优先级的消息。 + """ + logger.info(f"[{self.stream_name}] 已启动优先级回复模式循环。") + try: + while not self._disabled: + if self.priority_manager is None: + logger.error(f"[{self.stream_name}] 处于优先级模式,但 priority_manager 未初始化。") + await asyncio.sleep(5) + continue + + # 动态调整回复频率 + self.adjust_reply_frequency() + + # 从优先级队列中获取消息 + highest_priority_message = self.priority_manager.get_highest_priority_message() + + if highest_priority_message: + message = highest_priority_message + logger.debug( + f"[{self.stream_name}] 从优先级队列中取出消息进行处理: {message.processed_plain_text[:30]}..." + ) + + # 复用现有的消息处理逻辑 + # 需要计算 is_mentioned 和 interested_rate + is_mentioned = message.is_mentioned + # 对于优先级模式,我们可以认为取出的消息就是我们感兴趣的 + # 或者我们可以从 priority_manager 的 PrioritizedMessage 中获取原始兴趣分 + # 这里我们先用一个较高的固定值,或者从消息本身获取 + interested_rate = 1.0 # 简化处理,或者可以传递更精确的值 + + await self._process_message(message, is_mentioned, interested_rate) + + # 处理完一条消息后可以稍微等待,避免过于频繁地连续回复 + await asyncio.sleep(global_config.chat.get("priority_post_reply_delay", 1.0)) + else: + # 如果队列为空,等待一段时间 + await asyncio.sleep(global_config.chat.get("priority_empty_queue_delay", 0.5)) + + except asyncio.CancelledError: + logger.debug(f"[{self.stream_name}] 优先级回复任务被取消。") + raise # 重新抛出异常 + except Exception as e: + logger.error(f"[{self.stream_name}] 优先级回复循环异常: {e}", exc_info=True) + def adjust_reply_frequency(self): """ 根据预设规则动态调整回复意愿(willing_amplifier)。 diff --git a/src/chat/normal_chat/priority_manager.py b/src/chat/normal_chat/priority_manager.py new file mode 100644 index 000000000..a059a96a9 --- /dev/null +++ b/src/chat/normal_chat/priority_manager.py @@ -0,0 +1,118 @@ +import time +import heapq +import math +from typing import List, Tuple, Dict, Any, Optional +from ..message_receive.message import MessageSending, MessageRecv, MessageThinking, MessageSet +from src.common.logger import get_logger + +logger = get_logger("normal_chat") + + +class PrioritizedMessage: + """带有优先级的消息对象""" + + def __init__(self, message: MessageRecv, interest_score: float, is_vip: bool = False): + self.message = message + self.arrival_time = time.time() + self.interest_score = interest_score + self.is_vip = is_vip + self.priority = self.calculate_priority() + + def calculate_priority(self, decay_rate: float = 0.01) -> float: + """ + 计算优先级分数。 + 优先级 = 兴趣分 * exp(-衰减率 * 消息年龄) + """ + age = time.time() - self.arrival_time + decay_factor = math.exp(-decay_rate * age) + priority = self.interest_score * decay_factor + return priority + + def __lt__(self, other: "PrioritizedMessage") -> bool: + """用于堆排序的比较函数,我们想要一个最大堆,所以用 >""" + return self.priority > other.priority + + +class PriorityManager: + """ + 管理消息队列,根据优先级选择消息进行处理。 + """ + + def __init__(self, interest_dict: Dict[str, float], normal_queue_max_size: int = 5): + self.vip_queue: List[PrioritizedMessage] = [] # VIP 消息队列 (最大堆) + self.normal_queue: List[PrioritizedMessage] = [] # 普通消息队列 (最大堆) + self.interest_dict = interest_dict if interest_dict is not None else {} + self.normal_queue_max_size = normal_queue_max_size + self.vip_users = self.interest_dict.get("vip_users", []) # 假设vip用户在interest_dict中指定 + + def _get_interest_score(self, user_id: str) -> float: + """获取用户的兴趣分,默认为1.0""" + return self.interest_dict.get("interests", {}).get(user_id, 1.0) + + def _is_vip(self, user_id: str) -> bool: + """检查用户是否为VIP""" + return user_id in self.vip_users + + def add_message(self, message: MessageRecv): + """ + 添加新消息到合适的队列中。 + """ + user_id = message.message_info.user_info.user_id + is_vip = self._is_vip(user_id) + interest_score = self._get_interest_score(user_id) + + p_message = PrioritizedMessage(message, interest_score, is_vip) + + if is_vip: + heapq.heappush(self.vip_queue, p_message) + logger.debug(f"消息来自VIP用户 {user_id}, 已添加到VIP队列. 当前VIP队列长度: {len(self.vip_queue)}") + else: + if len(self.normal_queue) >= self.normal_queue_max_size: + # 如果队列已满,只在消息优先级高于最低优先级消息时才添加 + if p_message.priority > self.normal_queue[0].priority: + heapq.heapreplace(self.normal_queue, p_message) + logger.debug(f"普通队列已满,但新消息优先级更高,已替换. 用户: {user_id}") + else: + logger.debug(f"普通队列已满且新消息优先级较低,已忽略. 用户: {user_id}") + else: + heapq.heappush(self.normal_queue, p_message) + logger.debug( + f"消息来自普通用户 {user_id}, 已添加到普通队列. 当前普通队列长度: {len(self.normal_queue)}" + ) + + def get_highest_priority_message(self) -> Optional[MessageRecv]: + """ + 从VIP和普通队列中获取当前最高优先级的消息。 + """ + # 更新所有消息的优先级 + for p_msg in self.vip_queue: + p_msg.priority = p_msg.calculate_priority() + for p_msg in self.normal_queue: + p_msg.priority = p_msg.calculate_priority() + + # 重建堆 + heapq.heapify(self.vip_queue) + heapq.heapify(self.normal_queue) + + vip_msg = self.vip_queue[0] if self.vip_queue else None + normal_msg = self.normal_queue[0] if self.normal_queue else None + + if vip_msg and normal_msg: + if vip_msg.priority >= normal_msg.priority: + return heapq.heappop(self.vip_queue).message + else: + return heapq.heappop(self.normal_queue).message + elif vip_msg: + return heapq.heappop(self.vip_queue).message + elif normal_msg: + return heapq.heappop(self.normal_queue).message + else: + return None + + def is_empty(self) -> bool: + """检查所有队列是否为空""" + return not self.vip_queue and not self.normal_queue + + def get_queue_status(self) -> str: + """获取队列状态信息""" + return f"VIP队列: {len(self.vip_queue)}, 普通队列: {len(self.normal_queue)}" From c7fc6e57ff2a3a2d63df33b6b6bebbcd35690c23 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" Date: Sat, 28 Jun 2025 10:42:03 +0000 Subject: [PATCH 2/4] =?UTF-8?q?=F0=9F=A4=96=20=E8=87=AA=E5=8A=A8=E6=A0=BC?= =?UTF-8?q?=E5=BC=8F=E5=8C=96=E4=BB=A3=E7=A0=81=20[skip=20ci]?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/chat/normal_chat/normal_chat.py | 11 +---------- src/chat/normal_chat/priority_manager.py | 4 ++-- 2 files changed, 3 insertions(+), 12 deletions(-) diff --git a/src/chat/normal_chat/normal_chat.py b/src/chat/normal_chat/normal_chat.py index 84a8febe8..b11669654 100644 --- a/src/chat/normal_chat/normal_chat.py +++ b/src/chat/normal_chat/normal_chat.py @@ -2,29 +2,20 @@ import asyncio import time from random import random from typing import List, Dict, Optional, Any -from typing import List, Optional, Dict # 导入类型提示 import os import pickle from maim_message import UserInfo, Seg from src.common.logger import get_logger -from src.chat.heart_flow.utils_chat import get_chat_type_and_target_info -from src.manager.mood_manager import mood_manager -from src.chat.message_receive.chat_stream import ChatStream, get_chat_manager +from src.chat.message_receive.chat_stream import ChatStream from src.chat.utils.timer_calculator import Timer -from src.chat.message_receive.chat_stream import ChatStream from src.chat.utils.prompt_builder import global_prompt_manager -from .normal_chat_generator import NormalChatGenerator from ..message_receive.message import MessageSending, MessageRecv, MessageThinking, MessageSet from src.chat.message_receive.message_sender import message_manager from src.chat.normal_chat.willing.willing_manager import get_willing_manager from src.chat.normal_chat.normal_chat_utils import get_recent_message_stats from src.config.config import global_config from src.chat.focus_chat.planners.action_manager import ActionManager -from src.chat.normal_chat.normal_chat_planner import NormalChatPlanner -from src.chat.normal_chat.normal_chat_action_modifier import NormalChatActionModifier -from src.chat.normal_chat.normal_chat_expressor import NormalChatExpressor -from src.chat.replyer.default_generator import DefaultReplyer from src.person_info.person_info import PersonInfoManager from src.person_info.relationship_manager import get_relationship_manager from src.chat.utils.chat_message_builder import ( diff --git a/src/chat/normal_chat/priority_manager.py b/src/chat/normal_chat/priority_manager.py index a059a96a9..07112dcb2 100644 --- a/src/chat/normal_chat/priority_manager.py +++ b/src/chat/normal_chat/priority_manager.py @@ -1,8 +1,8 @@ import time import heapq import math -from typing import List, Tuple, Dict, Any, Optional -from ..message_receive.message import MessageSending, MessageRecv, MessageThinking, MessageSet +from typing import List, Dict, Optional +from ..message_receive.message import MessageRecv from src.common.logger import get_logger logger = get_logger("normal_chat") From 97ab4a242e5f735225d51da123deb3e51e6bdd53 Mon Sep 17 00:00:00 2001 From: tcmofashi Date: Tue, 1 Jul 2025 10:26:29 +0800 Subject: [PATCH 3/4] =?UTF-8?q?feat:=20=E5=A2=9E=E5=8A=A0=E9=80=82?= =?UTF-8?q?=E7=94=A8=E4=BA=8E=E7=9B=B4=E6=92=AD=E7=AD=89=E5=9C=BA=E6=99=AF?= =?UTF-8?q?=E7=9A=84=E6=96=B0=E5=9B=9E=E5=A4=8D=E7=AD=96=E7=95=A5=EF=BC=8C?= =?UTF-8?q?=E5=9C=A8ada=E5=8F=91=E9=80=81=E7=89=B9=E5=AE=9A=E6=B6=88?= =?UTF-8?q?=E6=81=AF=E6=AE=B5=E7=9A=84=E6=83=85=E5=86=B5=E4=B8=8B=E5=8F=AF?= =?UTF-8?q?=E4=BB=A5=E6=8C=89=E7=85=A7=E4=BC=98=E5=85=88=E5=BA=A6=E5=90=8C?= =?UTF-8?q?=E4=B8=80=E6=97=B6=E9=97=B4=E5=8F=AA=E5=9B=9E=E5=A4=8D=E4=B8=80?= =?UTF-8?q?=E4=BA=BA?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/chat/message_receive/chat_stream.py | 10 + src/chat/message_receive/message.py | 24 +- src/chat/normal_chat/normal_chat.py | 524 ++++++++++++----------- src/chat/normal_chat/priority_manager.py | 26 +- 4 files changed, 323 insertions(+), 261 deletions(-) diff --git a/src/chat/message_receive/chat_stream.py b/src/chat/message_receive/chat_stream.py index 55d296db9..a82acc413 100644 --- a/src/chat/message_receive/chat_stream.py +++ b/src/chat/message_receive/chat_stream.py @@ -47,6 +47,16 @@ class ChatMessageContext: return False return True + def get_priority_mode(self) -> str: + """获取优先级模式""" + return self.message.priority_mode + + def get_priority_info(self) -> Optional[dict]: + """获取优先级信息""" + if hasattr(self.message, "priority_info") and self.message.priority_info: + return self.message.priority_info + return None + class ChatStream: """聊天流对象,存储一个完整的聊天上下文""" diff --git a/src/chat/message_receive/message.py b/src/chat/message_receive/message.py index 5798eb512..1c8f7789e 100644 --- a/src/chat/message_receive/message.py +++ b/src/chat/message_receive/message.py @@ -108,6 +108,9 @@ class MessageRecv(Message): self.detailed_plain_text = message_dict.get("detailed_plain_text", "") self.is_emoji = False self.is_picid = False + self.is_mentioned = 0.0 + self.priority_mode = "interest" + self.priority_info = None def update_chat_stream(self, chat_stream: "ChatStream"): self.chat_stream = chat_stream @@ -146,8 +149,27 @@ class MessageRecv(Message): if isinstance(segment.data, str): return await get_image_manager().get_emoji_description(segment.data) return "[发了一个表情包,网卡了加载不出来]" + elif segment.type == "mention_bot": + self.is_mentioned = float(segment.data) + return "" + elif segment.type == "set_priority_mode": + # 处理设置优先级模式的消息段 + if isinstance(segment.data, str): + self.priority_mode = segment.data + return "" + elif segment.type == "priority_info": + if isinstance(segment.data, dict): + # 处理优先级信息 + self.priority_info = segment.data + """ + { + 'message_type': 'vip', # vip or normal + 'message_priority': 1.0, # 优先级,大为优先,float + } + """ + return "" else: - return f"[{segment.type}:{str(segment.data)}]" + return "" except Exception as e: logger.error(f"处理消息段失败: {str(e)}, 类型: {segment.type}, 数据: {segment.data}") return f"[处理失败的{segment.type}消息]" diff --git a/src/chat/normal_chat/normal_chat.py b/src/chat/normal_chat/normal_chat.py index b11669654..9c3144cc4 100644 --- a/src/chat/normal_chat/normal_chat.py +++ b/src/chat/normal_chat/normal_chat.py @@ -6,7 +6,7 @@ import os import pickle from maim_message import UserInfo, Seg from src.common.logger import get_logger -from src.chat.message_receive.chat_stream import ChatStream +from src.chat.message_receive.chat_stream import ChatStream, get_chat_manager from src.chat.utils.timer_calculator import Timer from src.chat.utils.prompt_builder import global_prompt_manager @@ -27,6 +27,15 @@ from src.chat.utils.chat_message_builder import ( from .priority_manager import PriorityManager import traceback +from .normal_chat_generator import NormalChatGenerator +from src.chat.normal_chat.normal_chat_expressor import NormalChatExpressor +from src.chat.replyer.default_generator import DefaultReplyer +from src.chat.normal_chat.normal_chat_planner import NormalChatPlanner +from src.chat.normal_chat.normal_chat_action_modifier import NormalChatActionModifier + +from src.chat.heart_flow.utils_chat import get_chat_type_and_target_info +from src.manager.mood_manager import mood_manager + willing_manager = get_willing_manager() logger = get_logger("normal_chat") @@ -46,7 +55,7 @@ class NormalChat: 每个聊天(私聊或群聊)都会有一个独立的NormalChat实例。 """ - def __init__(self, chat_stream: ChatStream): + def __init__(self, chat_stream: ChatStream, interest_dict: dict = None, on_switch_to_focus_callback=None): """ 初始化NormalChat实例。 @@ -55,10 +64,61 @@ class NormalChat: """ self.chat_stream = chat_stream self.stream_id = chat_stream.stream_id - self.stream_name = chat_stream.get_name() - self.willing_amplifier = 1.0 # 回复意愿放大器,动态调整 - self.enable_planner = global_config.normal_chat.get("enable_planner", False) # 是否启用planner - self.action_manager = ActionManager(chat_stream) # 初始化动作管理器 + + self.stream_name = get_chat_manager().get_stream_name(self.stream_id) or self.stream_id + + # 初始化Normal Chat专用表达器 + self.expressor = NormalChatExpressor(self.chat_stream) + self.replyer = DefaultReplyer(self.chat_stream) + + # Interest dict + self.interest_dict = interest_dict + + self.is_group_chat, self.chat_target_info = get_chat_type_and_target_info(self.stream_id) + + self.willing_amplifier = 1 + self.start_time = time.time() + + # Other sync initializations + self.gpt = NormalChatGenerator() + self.mood_manager = mood_manager + self.start_time = time.time() + + self._initialized = False # Track initialization status + + # Planner相关初始化 + self.action_manager = ActionManager() + self.planner = NormalChatPlanner(self.stream_name, self.action_manager) + self.action_modifier = NormalChatActionModifier(self.action_manager, self.stream_id, self.stream_name) + self.enable_planner = global_config.normal_chat.enable_planner # 从配置中读取是否启用planner + + # 记录最近的回复内容,每项包含: {time, user_message, response, is_mentioned, is_reference_reply} + self.recent_replies = [] + self.max_replies_history = 20 # 最多保存最近20条回复记录 + + # 新的消息段缓存结构: + # {person_id: [{"start_time": float, "end_time": float, "last_msg_time": float, "message_count": int}, ...]} + self.person_engaged_cache: Dict[str, List[Dict[str, any]]] = {} + + # 持久化存储文件路径 + self.cache_file_path = os.path.join("data", "relationship", f"relationship_cache_{self.stream_id}.pkl") + + # 最后处理的消息时间,避免重复处理相同消息 + self.last_processed_message_time = 0.0 + + # 最后清理时间,用于定期清理老消息段 + self.last_cleanup_time = 0.0 + + # 添加回调函数,用于在满足条件时通知切换到focus_chat模式 + self.on_switch_to_focus_callback = on_switch_to_focus_callback + + self._disabled = False # 增加停用标志 + + # 加载持久化的缓存 + self._load_cache() + + logger.debug(f"[{self.stream_name}] NormalChat 初始化完成 (异步部分)。") + self.action_type: Optional[str] = None # 当前动作类型 self.is_parallel_action: bool = False # 是否是可并行动作 @@ -66,20 +126,15 @@ class NormalChat: self._chat_task: Optional[asyncio.Task] = None self._disabled = False # 停用标志 - # 消息段缓存,用于关系构建 - self.person_engaged_cache: Dict[str, List[Dict[str, Any]]] = {} - self.last_cleanup_time = time.time() - - # 最近回复记录 - self.recent_replies: List[Dict[str, Any]] = [] + self.on_switch_to_focus_callback = on_switch_to_focus_callback # 新增:回复模式和优先级管理器 - self.reply_mode = global_config.chat.get_reply_mode(self.stream_id) + self.reply_mode = self.chat_stream.context.get_priority_mode() if self.reply_mode == "priority": - interest_dict = self.chat_stream.interest_dict or {} + interest_dict = interest_dict or {} self.priority_manager = PriorityManager( interest_dict=interest_dict, - normal_queue_max_size=global_config.chat.get("priority_queue_max_size", 5), + normal_queue_max_size=5, ) else: self.priority_manager = None @@ -393,6 +448,29 @@ class NormalChat: f"[{self.stream_name}] 更新用户 {person_id} 的消息段,消息时间:{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(msg_time))}" ) + async def _priority_chat_loop_add_message(self): + while not self._disabled: + try: + ids = list(self.interest_dict.keys()) + for msg_id in ids: + message, interest_value, _ = self.interest_dict[msg_id] + if not self._disabled: + # 更新消息段信息 + self._update_user_message_segments(message) + + # 添加消息到优先级管理器 + if self.priority_manager: + self.priority_manager.add_message(message, interest_value) + self.interest_dict.pop(msg_id, None) + except Exception as e: + logger.error( + f"[{self.stream_name}] 优先级聊天循环添加消息时出现错误: {traceback.format_exc()}", exc_info=True + ) + print(traceback.format_exc()) + # 出现错误时,等待一段时间再重试 + raise + await asyncio.sleep(0.1) + async def _priority_chat_loop(self): """ 使用优先级队列的消息处理循环。 @@ -401,15 +479,22 @@ class NormalChat: try: if not self.priority_manager.is_empty(): # 获取最高优先级的消息 - message_to_process = self.priority_manager.get_highest_priority_message() + message = self.priority_manager.get_highest_priority_message() - if message_to_process: + if message: logger.info( - f"[{self.stream_name}] 从队列中取出消息进行处理: User {message_to_process.message_info.user_info.user_id}, Time: {time.strftime('%H:%M:%S', time.localtime(message_to_process.message_info.time))}" + f"[{self.stream_name}] 从队列中取出消息进行处理: User {message.message_info.user_info.user_id}, Time: {time.strftime('%H:%M:%S', time.localtime(message.message_info.time))}" ) - # 检查是否应该回复 - async with self.chat_stream.get_process_lock(): - await self._process_chat_message(message_to_process) + # 执行定期清理 + self._cleanup_old_segments() + + # 更新消息段信息 + self._update_user_message_segments(message) + + # 检查是否有用户满足关系构建条件 + asyncio.create_task(self._check_relation_building_conditions()) + + await self.reply_one_message(message) # 等待一段时间再检查队列 await asyncio.sleep(1) @@ -418,7 +503,7 @@ class NormalChat: logger.info(f"[{self.stream_name}] 优先级聊天循环被取消。") break except Exception as e: - logger.error(f"[{self.stream_name}] 优先级聊天循环出现错误: {e}", exc_info=True) + logger.error(f"[{self.stream_name}] 优先级聊天循环出现错误: {traceback.format_exc()}", exc_info=True) # 出现错误时,等待更长时间避免频繁报错 await asyncio.sleep(10) @@ -645,7 +730,7 @@ class NormalChat: # 新增:在auto模式下检查是否需要直接切换到focus模式 if global_config.chat.chat_mode == "auto": - if await self._should_switch_to_focus(message, is_mentioned, interested_rate): + if await self._check_should_switch_to_focus(): logger.info(f"[{self.stream_name}] 检测到切换到focus聊天模式的条件,直接执行切换") if self.on_switch_to_focus_callback: await self.on_switch_to_focus_callback() @@ -695,176 +780,10 @@ class NormalChat: do_reply = False response_set = None # 初始化 response_set if random() < reply_probability: - do_reply = True - - # 回复前处理 - await willing_manager.before_generate_reply_handle(message.message_info.message_id) - - thinking_id = await self._create_thinking_message(message) - - # 如果启用planner,预先修改可用actions(避免在并行任务中重复调用) - available_actions = None - if self.enable_planner: - try: - await self.action_modifier.modify_actions_for_normal_chat( - self.chat_stream, self.recent_replies, message.processed_plain_text - ) - available_actions = self.action_manager.get_using_actions_for_mode("normal") - except Exception as e: - logger.warning(f"[{self.stream_name}] 获取available_actions失败: {e}") - available_actions = None - - # 定义并行执行的任务 - async def generate_normal_response(): - """生成普通回复""" - try: - return await self.gpt.generate_response( - message=message, - thinking_id=thinking_id, - enable_planner=self.enable_planner, - available_actions=available_actions, - ) - except Exception as e: - logger.error(f"[{self.stream_name}] 回复生成出现错误:{str(e)} {traceback.format_exc()}") - return None - - async def plan_and_execute_actions(): - """规划和执行额外动作""" - if not self.enable_planner: - logger.debug(f"[{self.stream_name}] Planner未启用,跳过动作规划") - return None - - try: - # 获取发送者名称(动作修改已在并行执行前完成) - sender_name = self._get_sender_name(message) - - no_action = { - "action_result": { - "action_type": "no_action", - "action_data": {}, - "reasoning": "规划器初始化默认", - "is_parallel": True, - }, - "chat_context": "", - "action_prompt": "", - } - - # 检查是否应该跳过规划 - if self.action_modifier.should_skip_planning(): - logger.debug(f"[{self.stream_name}] 没有可用动作,跳过规划") - self.action_type = "no_action" - return no_action - - # 执行规划 - plan_result = await self.planner.plan(message, sender_name) - action_type = plan_result["action_result"]["action_type"] - action_data = plan_result["action_result"]["action_data"] - reasoning = plan_result["action_result"]["reasoning"] - is_parallel = plan_result["action_result"].get("is_parallel", False) - - logger.info( - f"[{self.stream_name}] Planner决策: {action_type}, 理由: {reasoning}, 并行执行: {is_parallel}" - ) - self.action_type = action_type # 更新实例属性 - self.is_parallel_action = is_parallel # 新增:保存并行执行标志 - - # 如果规划器决定不执行任何动作 - if action_type == "no_action": - logger.debug(f"[{self.stream_name}] Planner决定不执行任何额外动作") - return no_action - - # 执行额外的动作(不影响回复生成) - action_result = await self._execute_action(action_type, action_data, message, thinking_id) - if action_result is not None: - logger.info(f"[{self.stream_name}] 额外动作 {action_type} 执行完成") - else: - logger.warning(f"[{self.stream_name}] 额外动作 {action_type} 执行失败") - - return { - "action_type": action_type, - "action_data": action_data, - "reasoning": reasoning, - "is_parallel": is_parallel, - } - - except Exception as e: - logger.error(f"[{self.stream_name}] Planner执行失败: {e}") - return no_action - - # 并行执行回复生成和动作规划 - self.action_type = None # 初始化动作类型 - self.is_parallel_action = False # 初始化并行动作标志 - with Timer("并行生成回复和规划", timing_results): - response_set, plan_result = await asyncio.gather( - generate_normal_response(), plan_and_execute_actions(), return_exceptions=True - ) - - # 处理生成回复的结果 - if isinstance(response_set, Exception): - logger.error(f"[{self.stream_name}] 回复生成异常: {response_set}") - response_set = None - - # 处理规划结果(可选,不影响回复) - if isinstance(plan_result, Exception): - logger.error(f"[{self.stream_name}] 动作规划异常: {plan_result}") - elif plan_result: - logger.debug(f"[{self.stream_name}] 额外动作处理完成: {self.action_type}") - - if not response_set or ( - self.enable_planner and self.action_type not in ["no_action"] and not self.is_parallel_action - ): - if not response_set: - logger.info(f"[{self.stream_name}] 模型未生成回复内容") - elif self.enable_planner and self.action_type not in ["no_action"] and not self.is_parallel_action: - logger.info(f"[{self.stream_name}] 模型选择其他动作(非并行动作)") - # 如果模型未生成回复,移除思考消息 - container = await message_manager.get_container(self.stream_id) # 使用 self.stream_id - for msg in container.messages[:]: - if isinstance(msg, MessageThinking) and msg.message_info.message_id == thinking_id: - container.messages.remove(msg) - logger.debug(f"[{self.stream_name}] 已移除未产生回复的思考消息 {thinking_id}") - break - # 需要在此处也调用 not_reply_handle 和 delete 吗? - # 如果是因为模型没回复,也算是一种 "未回复" - await willing_manager.not_reply_handle(message.message_info.message_id) - willing_manager.delete(message.message_info.message_id) - return # 不执行后续步骤 - - # logger.info(f"[{self.stream_name}] 回复内容: {response_set}") - - if self._disabled: - logger.info(f"[{self.stream_name}] 已停用,忽略 normal_response。") - return - - # 发送回复 (不再需要传入 chat) - with Timer("消息发送", timing_results): - first_bot_msg = await self._add_messages_to_manager(message, response_set, thinking_id) - - # 检查 first_bot_msg 是否为 None (例如思考消息已被移除的情况) - if first_bot_msg: - # 消息段已在接收消息时更新,这里不需要额外处理 - - # 记录回复信息到最近回复列表中 - reply_info = { - "time": time.time(), - "user_message": message.processed_plain_text, - "user_info": { - "user_id": message.message_info.user_info.user_id, - "user_nickname": message.message_info.user_info.user_nickname, - }, - "response": response_set, - "is_mentioned": is_mentioned, - "is_reference_reply": message.reply is not None, # 判断是否为引用回复 - "timing": {k: round(v, 2) for k, v in timing_results.items()}, - } - self.recent_replies.append(reply_info) - # 保持最近回复历史在限定数量内 - if len(self.recent_replies) > self.max_replies_history: - self.recent_replies = self.recent_replies[-self.max_replies_history :] - - # 回复后处理 - await willing_manager.after_generate_reply_handle(message.message_info.message_id) - + with Timer("获取回复", timing_results): + await willing_manager.before_generate_reply_handle(message.message_info.message_id) + do_reply = await self.reply_one_message(message) + response_set = do_reply if do_reply else None # 输出性能计时结果 if do_reply and response_set: # 确保 response_set 不是 None timing_str = " | ".join([f"{step}: {duration:.2f}秒" for step, duration in timing_results.items()]) @@ -873,6 +792,7 @@ class NormalChat: logger.info( f"[{self.stream_name}]回复消息: {trigger_msg[:30]}... | 回复内容: {response_msg[:30]}... | 计时: {timing_str}" ) + await willing_manager.after_generate_reply_handle(message.message_info.message_id) elif not do_reply: # 不回复处理 await willing_manager.not_reply_handle(message.message_info.message_id) @@ -880,6 +800,167 @@ class NormalChat: # 意愿管理器:注销当前message信息 (无论是否回复,只要处理过就删除) willing_manager.delete(message.message_info.message_id) + async def reply_one_message(self, message: MessageRecv) -> None: + # 回复前处理 + thinking_id = await self._create_thinking_message(message) + + # 如果启用planner,预先修改可用actions(避免在并行任务中重复调用) + available_actions = None + if self.enable_planner: + try: + await self.action_modifier.modify_actions_for_normal_chat( + self.chat_stream, self.recent_replies, message.processed_plain_text + ) + available_actions = self.action_manager.get_using_actions_for_mode("normal") + except Exception as e: + logger.warning(f"[{self.stream_name}] 获取available_actions失败: {e}") + available_actions = None + + # 定义并行执行的任务 + async def generate_normal_response(): + """生成普通回复""" + try: + return await self.gpt.generate_response( + message=message, + thinking_id=thinking_id, + enable_planner=self.enable_planner, + available_actions=available_actions, + ) + except Exception as e: + logger.error(f"[{self.stream_name}] 回复生成出现错误:{str(e)} {traceback.format_exc()}") + return None + + async def plan_and_execute_actions(): + """规划和执行额外动作""" + if not self.enable_planner: + logger.debug(f"[{self.stream_name}] Planner未启用,跳过动作规划") + return None + + try: + # 获取发送者名称(动作修改已在并行执行前完成) + sender_name = self._get_sender_name(message) + + no_action = { + "action_result": { + "action_type": "no_action", + "action_data": {}, + "reasoning": "规划器初始化默认", + "is_parallel": True, + }, + "chat_context": "", + "action_prompt": "", + } + + # 检查是否应该跳过规划 + if self.action_modifier.should_skip_planning(): + logger.debug(f"[{self.stream_name}] 没有可用动作,跳过规划") + self.action_type = "no_action" + return no_action + + # 执行规划 + plan_result = await self.planner.plan(message, sender_name) + action_type = plan_result["action_result"]["action_type"] + action_data = plan_result["action_result"]["action_data"] + reasoning = plan_result["action_result"]["reasoning"] + is_parallel = plan_result["action_result"].get("is_parallel", False) + + logger.info( + f"[{self.stream_name}] Planner决策: {action_type}, 理由: {reasoning}, 并行执行: {is_parallel}" + ) + self.action_type = action_type # 更新实例属性 + self.is_parallel_action = is_parallel # 新增:保存并行执行标志 + + # 如果规划器决定不执行任何动作 + if action_type == "no_action": + logger.debug(f"[{self.stream_name}] Planner决定不执行任何额外动作") + return no_action + + # 执行额外的动作(不影响回复生成) + action_result = await self._execute_action(action_type, action_data, message, thinking_id) + if action_result is not None: + logger.info(f"[{self.stream_name}] 额外动作 {action_type} 执行完成") + else: + logger.warning(f"[{self.stream_name}] 额外动作 {action_type} 执行失败") + + return { + "action_type": action_type, + "action_data": action_data, + "reasoning": reasoning, + "is_parallel": is_parallel, + } + + except Exception as e: + logger.error(f"[{self.stream_name}] Planner执行失败: {e}") + return no_action + + # 并行执行回复生成和动作规划 + self.action_type = None # 初始化动作类型 + self.is_parallel_action = False # 初始化并行动作标志 + response_set, plan_result = await asyncio.gather( + generate_normal_response(), plan_and_execute_actions(), return_exceptions=True + ) + + # 处理生成回复的结果 + if isinstance(response_set, Exception): + logger.error(f"[{self.stream_name}] 回复生成异常: {response_set}") + response_set = None + + # 处理规划结果(可选,不影响回复) + if isinstance(plan_result, Exception): + logger.error(f"[{self.stream_name}] 动作规划异常: {plan_result}") + elif plan_result: + logger.debug(f"[{self.stream_name}] 额外动作处理完成: {self.action_type}") + + if not response_set or ( + self.enable_planner and self.action_type not in ["no_action"] and not self.is_parallel_action + ): + if not response_set: + logger.info(f"[{self.stream_name}] 模型未生成回复内容") + elif self.enable_planner and self.action_type not in ["no_action"] and not self.is_parallel_action: + logger.info(f"[{self.stream_name}] 模型选择其他动作(非并行动作)") + # 如果模型未生成回复,移除思考消息 + container = await message_manager.get_container(self.stream_id) # 使用 self.stream_id + for msg in container.messages[:]: + if isinstance(msg, MessageThinking) and msg.message_info.message_id == thinking_id: + container.messages.remove(msg) + logger.debug(f"[{self.stream_name}] 已移除未产生回复的思考消息 {thinking_id}") + break + # 需要在此处也调用 not_reply_handle 和 delete 吗? + # 如果是因为模型没回复,也算是一种 "未回复" + return False + + # logger.info(f"[{self.stream_name}] 回复内容: {response_set}") + + if self._disabled: + logger.info(f"[{self.stream_name}] 已停用,忽略 normal_response。") + return False + + # 发送回复 (不再需要传入 chat) + first_bot_msg = await self._add_messages_to_manager(message, response_set, thinking_id) + + # 检查 first_bot_msg 是否为 None (例如思考消息已被移除的情况) + if first_bot_msg: + # 消息段已在接收消息时更新,这里不需要额外处理 + + # 记录回复信息到最近回复列表中 + reply_info = { + "time": time.time(), + "user_message": message.processed_plain_text, + "user_info": { + "user_id": message.message_info.user_info.user_id, + "user_nickname": message.message_info.user_info.user_nickname, + }, + "response": response_set, + # "is_mentioned": is_mentioned, + "is_reference_reply": message.reply is not None, # 判断是否为引用回复 + # "timing": {k: round(v, 2) for k, v in timing_results.items()}, + } + self.recent_replies.append(reply_info) + # 保持最近回复历史在限定数量内 + if len(self.recent_replies) > self.max_replies_history: + self.recent_replies = self.recent_replies[-self.max_replies_history :] + return response_set if response_set else False + # 改为实例方法, 移除 chat 参数 async def start_chat(self): @@ -899,9 +980,14 @@ class NormalChat: self._chat_task = None try: - logger.debug(f"[{self.stream_name}] 创建新的聊天轮询任务,模式: {self.reply_mode}") + logger.info(f"[{self.stream_name}] 创建新的聊天轮询任务,模式: {self.reply_mode}") if self.reply_mode == "priority": - polling_task = asyncio.create_task(self._priority_reply_loop()) + polling_task_send = asyncio.create_task(self._priority_chat_loop()) + polling_task_recv = asyncio.create_task(self._priority_chat_loop_add_message()) + print("555") + polling_task = asyncio.gather(polling_task_send, polling_task_recv) + print("666") + else: # 默认或 "interest" 模式 polling_task = asyncio.create_task(self._reply_interested_message()) @@ -942,7 +1028,7 @@ class NormalChat: # 尝试获取异常,但不抛出 exc = task.exception() if exc: - logger.error(f"[{self.stream_name}] 任务异常: {type(exc).__name__}: {exc}") + logger.error(f"[{self.stream_name}] 任务异常: {type(exc).__name__}: {exc}", exc_info=exc) else: logger.debug(f"[{self.stream_name}] 任务正常完成") except Exception as e: @@ -1024,52 +1110,6 @@ class NormalChat: # 返回最近的limit条记录,按时间倒序排列 return sorted(self.recent_replies[-limit:], key=lambda x: x["time"], reverse=True) - async def _priority_reply_loop(self) -> None: - """ - [优先级模式] 循环获取并处理最高优先级的消息。 - """ - logger.info(f"[{self.stream_name}] 已启动优先级回复模式循环。") - try: - while not self._disabled: - if self.priority_manager is None: - logger.error(f"[{self.stream_name}] 处于优先级模式,但 priority_manager 未初始化。") - await asyncio.sleep(5) - continue - - # 动态调整回复频率 - self.adjust_reply_frequency() - - # 从优先级队列中获取消息 - highest_priority_message = self.priority_manager.get_highest_priority_message() - - if highest_priority_message: - message = highest_priority_message - logger.debug( - f"[{self.stream_name}] 从优先级队列中取出消息进行处理: {message.processed_plain_text[:30]}..." - ) - - # 复用现有的消息处理逻辑 - # 需要计算 is_mentioned 和 interested_rate - is_mentioned = message.is_mentioned - # 对于优先级模式,我们可以认为取出的消息就是我们感兴趣的 - # 或者我们可以从 priority_manager 的 PrioritizedMessage 中获取原始兴趣分 - # 这里我们先用一个较高的固定值,或者从消息本身获取 - interested_rate = 1.0 # 简化处理,或者可以传递更精确的值 - - await self._process_message(message, is_mentioned, interested_rate) - - # 处理完一条消息后可以稍微等待,避免过于频繁地连续回复 - await asyncio.sleep(global_config.chat.get("priority_post_reply_delay", 1.0)) - else: - # 如果队列为空,等待一段时间 - await asyncio.sleep(global_config.chat.get("priority_empty_queue_delay", 0.5)) - - except asyncio.CancelledError: - logger.debug(f"[{self.stream_name}] 优先级回复任务被取消。") - raise # 重新抛出异常 - except Exception as e: - logger.error(f"[{self.stream_name}] 优先级回复循环异常: {e}", exc_info=True) - def adjust_reply_frequency(self): """ 根据预设规则动态调整回复意愿(willing_amplifier)。 diff --git a/src/chat/normal_chat/priority_manager.py b/src/chat/normal_chat/priority_manager.py index 07112dcb2..9e1ef76c2 100644 --- a/src/chat/normal_chat/priority_manager.py +++ b/src/chat/normal_chat/priority_manager.py @@ -11,10 +11,10 @@ logger = get_logger("normal_chat") class PrioritizedMessage: """带有优先级的消息对象""" - def __init__(self, message: MessageRecv, interest_score: float, is_vip: bool = False): + def __init__(self, message: MessageRecv, interest_scores: List[float], is_vip: bool = False): self.message = message self.arrival_time = time.time() - self.interest_score = interest_score + self.interest_scores = interest_scores self.is_vip = is_vip self.priority = self.calculate_priority() @@ -25,7 +25,7 @@ class PrioritizedMessage: """ age = time.time() - self.arrival_time decay_factor = math.exp(-decay_rate * age) - priority = self.interest_score * decay_factor + priority = sum(self.interest_scores) + decay_factor return priority def __lt__(self, other: "PrioritizedMessage") -> bool: @@ -43,25 +43,20 @@ class PriorityManager: self.normal_queue: List[PrioritizedMessage] = [] # 普通消息队列 (最大堆) self.interest_dict = interest_dict if interest_dict is not None else {} self.normal_queue_max_size = normal_queue_max_size - self.vip_users = self.interest_dict.get("vip_users", []) # 假设vip用户在interest_dict中指定 def _get_interest_score(self, user_id: str) -> float: """获取用户的兴趣分,默认为1.0""" return self.interest_dict.get("interests", {}).get(user_id, 1.0) - def _is_vip(self, user_id: str) -> bool: - """检查用户是否为VIP""" - return user_id in self.vip_users - - def add_message(self, message: MessageRecv): + def add_message(self, message: MessageRecv, interest_score: Optional[float] = None): """ 添加新消息到合适的队列中。 """ user_id = message.message_info.user_info.user_id - is_vip = self._is_vip(user_id) - interest_score = self._get_interest_score(user_id) + is_vip = message.priority_info.get("message_type") == "vip" if message.priority_info else False + message_priority = message.priority_info.get("message_priority", 0.0) if message.priority_info else 0.0 - p_message = PrioritizedMessage(message, interest_score, is_vip) + p_message = PrioritizedMessage(message, [interest_score, message_priority], is_vip) if is_vip: heapq.heappush(self.vip_queue, p_message) @@ -97,12 +92,7 @@ class PriorityManager: vip_msg = self.vip_queue[0] if self.vip_queue else None normal_msg = self.normal_queue[0] if self.normal_queue else None - if vip_msg and normal_msg: - if vip_msg.priority >= normal_msg.priority: - return heapq.heappop(self.vip_queue).message - else: - return heapq.heappop(self.normal_queue).message - elif vip_msg: + if vip_msg: return heapq.heappop(self.vip_queue).message elif normal_msg: return heapq.heappop(self.normal_queue).message From dde41b7d4ca348b34ad238686e3339e056a3b68c Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" Date: Tue, 1 Jul 2025 02:26:46 +0000 Subject: [PATCH 4/4] =?UTF-8?q?=F0=9F=A4=96=20=E8=87=AA=E5=8A=A8=E6=A0=BC?= =?UTF-8?q?=E5=BC=8F=E5=8C=96=E4=BB=A3=E7=A0=81=20[skip=20ci]?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/chat/normal_chat/normal_chat.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/chat/normal_chat/normal_chat.py b/src/chat/normal_chat/normal_chat.py index 9c3144cc4..6c285f21d 100644 --- a/src/chat/normal_chat/normal_chat.py +++ b/src/chat/normal_chat/normal_chat.py @@ -1,7 +1,7 @@ import asyncio import time from random import random -from typing import List, Dict, Optional, Any +from typing import List, Dict, Optional import os import pickle from maim_message import UserInfo, Seg @@ -462,7 +462,7 @@ class NormalChat: if self.priority_manager: self.priority_manager.add_message(message, interest_value) self.interest_dict.pop(msg_id, None) - except Exception as e: + except Exception: logger.error( f"[{self.stream_name}] 优先级聊天循环添加消息时出现错误: {traceback.format_exc()}", exc_info=True ) @@ -502,7 +502,7 @@ class NormalChat: except asyncio.CancelledError: logger.info(f"[{self.stream_name}] 优先级聊天循环被取消。") break - except Exception as e: + except Exception: logger.error(f"[{self.stream_name}] 优先级聊天循环出现错误: {traceback.format_exc()}", exc_info=True) # 出现错误时,等待更长时间避免频繁报错 await asyncio.sleep(10)