diff --git a/src/chat/focus_chat/heartFC_chat.py b/src/chat/focus_chat/heartFC_chat.py index 08008bfe9..872a800a8 100644 --- a/src/chat/focus_chat/heartFC_chat.py +++ b/src/chat/focus_chat/heartFC_chat.py @@ -14,11 +14,26 @@ from src.chat.planner_actions.planner import ActionPlanner from src.chat.planner_actions.action_modifier import ActionModifier from src.chat.planner_actions.action_manager import ActionManager from src.config.config import global_config -from src.chat.focus_chat.hfc_performance_logger import HFCPerformanceLogger from src.person_info.relationship_builder_manager import relationship_builder_manager from src.chat.focus_chat.hfc_utils import CycleDetail +ERROR_LOOP_INFO = { + "loop_plan_info": { + "action_result": { + "action_type": "error", + "action_data": {}, + "reasoning": "循环处理失败", + }, + }, + "loop_action_info": { + "action_taken": False, + "reply_text": "", + "command": "", + "taken_time": time.time(), + }, +} + install(extra_lines=3) # 注释:原来的动作修改超时常量已移除,因为改为顺序执行 @@ -66,17 +81,14 @@ class HeartFChatting: self.action_planner = ActionPlanner(chat_id=self.stream_id, action_manager=self.action_manager) self.action_modifier = ActionModifier(action_manager=self.action_manager, chat_id=self.stream_id) - self._processing_lock = asyncio.Lock() - # 循环控制内部状态 - self._loop_active: bool = False # 循环是否正在运行 + self.running: bool = False self._loop_task: Optional[asyncio.Task] = None # 主循环任务 # 添加循环信息管理相关的属性 self._cycle_counter = 0 self._cycle_history: Deque[CycleDetail] = deque(maxlen=10) # 保留最近10个循环的信息 self._current_cycle_detail: Optional[CycleDetail] = None - self._shutting_down: bool = False # 关闭标志位 # 存储回调函数 self.on_stop_focus_chat = on_stop_focus_chat @@ -84,11 +96,6 @@ class HeartFChatting: self.reply_timeout_count = 0 self.plan_timeout_count = 0 - # 初始化性能记录器 - # 如果没有指定版本号,则使用全局版本管理器的版本号 - - self.performance_logger = HFCPerformanceLogger(chat_id) - logger.info( f"{self.log_prefix} HeartFChatting 初始化完成,消息疲惫阈值: {self._message_threshold}条(基于exit_focus_threshold={global_config.chat.exit_focus_threshold}计算,仅在auto模式下生效)" ) @@ -97,36 +104,23 @@ class HeartFChatting: """检查是否需要启动主循环,如果未激活则启动。""" # 如果循环已经激活,直接返回 - if self._loop_active: + if self.running: logger.debug(f"{self.log_prefix} HeartFChatting 已激活,无需重复启动") return try: # 重置消息计数器,开始新的focus会话 self.reset_message_count() - # 标记为活动状态,防止重复启动 - self._loop_active = True + self.running = True - # 检查是否已有任务在运行(理论上不应该,因为 _loop_active=False) - if self._loop_task and not self._loop_task.done(): - logger.warning(f"{self.log_prefix} 发现之前的循环任务仍在运行(不符合预期)。取消旧任务。") - self._loop_task.cancel() - try: - # 等待旧任务确实被取消 - await asyncio.wait_for(self._loop_task, timeout=5.0) - except Exception as e: - logger.warning(f"{self.log_prefix} 等待旧任务取消时出错: {e}") - self._loop_task = None # 清理旧任务引用 - - logger.debug(f"{self.log_prefix} 创建新的 HeartFChatting 主循环任务") - self._loop_task = asyncio.create_task(self._run_focus_chat()) + self._loop_task = asyncio.create_task(self._main_chat_loop()) self._loop_task.add_done_callback(self._handle_loop_completion) - logger.debug(f"{self.log_prefix} HeartFChatting 启动完成") + logger.info(f"{self.log_prefix} HeartFChatting 启动完成") except Exception as e: # 启动失败时重置状态 - self._loop_active = False + self.running = False self._loop_task = None logger.error(f"{self.log_prefix} HeartFChatting 启动失败: {e}") raise @@ -143,264 +137,151 @@ class HeartFChatting: except asyncio.CancelledError: logger.info(f"{self.log_prefix} HeartFChatting: 脱离了聊天(任务取消)") finally: - self._loop_active = False + self.running = False self._loop_task = None - if self._processing_lock.locked(): - logger.warning(f"{self.log_prefix} HeartFChatting: 处理锁在循环结束时仍被锁定,强制释放。") - self._processing_lock.release() + + def start_cycle(self): + self._cycle_counter += 1 + self._current_cycle_detail = CycleDetail(self._cycle_counter) + self._current_cycle_detail.prefix = self.log_prefix + thinking_id = "tid" + str(round(time.time(), 2)) + self._current_cycle_detail.set_thinking_id(thinking_id) + cycle_timers = {} + return cycle_timers, thinking_id + + def end_cycle(self,loop_info,cycle_timers): + self._current_cycle_detail.set_loop_info(loop_info) + self.loop_info.add_loop_info(self._current_cycle_detail) + self._current_cycle_detail.timers = cycle_timers + self._current_cycle_detail.complete_cycle() + self._cycle_history.append(self._current_cycle_detail) + + def print_cycle_info(self,cycle_timers): + # 记录循环信息和计时器结果 + timer_strings = [] + for name, elapsed in cycle_timers.items(): + formatted_time = f"{elapsed * 1000:.2f}毫秒" if elapsed < 1 else f"{elapsed:.2f}秒" + timer_strings.append(f"{name}: {formatted_time}") - async def _run_focus_chat(self): - """主循环,持续进行计划并可能回复消息,直到被外部取消。""" - try: - while True: # 主循环 - logger.debug(f"{self.log_prefix} 开始第{self._cycle_counter}次循环") - - # 检查关闭标志 - if self._shutting_down: - logger.info(f"{self.log_prefix} 检测到关闭标志,退出 Focus Chat 循环。") - break - - # 创建新的循环信息 - self._cycle_counter += 1 - self._current_cycle_detail = CycleDetail(self._cycle_counter) - self._current_cycle_detail.prefix = self.log_prefix - - # 初始化周期状态 - cycle_timers = {} - - # 执行规划和处理阶段 - try: - async with self._get_cycle_context(): - thinking_id = "tid" + str(round(time.time(), 2)) - self._current_cycle_detail.set_thinking_id(thinking_id) - - # 使用异步上下文管理器处理消息 - try: - async with global_prompt_manager.async_message_scope( - self.chat_stream.context.get_template_name() - ): - # 在上下文内部检查关闭状态 - if self._shutting_down: - logger.info(f"{self.log_prefix} 在处理上下文中检测到关闭信号,退出") - break - - logger.debug(f"模板 {self.chat_stream.context.get_template_name()}") - loop_info = await self._observe_process_plan_action_loop(cycle_timers, thinking_id) - - if loop_info["loop_action_info"]["command"] == "stop_focus_chat": - logger.info(f"{self.log_prefix} 麦麦决定停止专注聊天") - - # 如果设置了回调函数,则调用它 - if self.on_stop_focus_chat: - try: - await self.on_stop_focus_chat() - logger.info(f"{self.log_prefix} 成功调用回调函数处理停止专注聊天") - except Exception as e: - logger.error(f"{self.log_prefix} 调用停止专注聊天回调函数时出错: {e}") - logger.error(traceback.format_exc()) - break - - except asyncio.CancelledError: - logger.info(f"{self.log_prefix} 处理上下文时任务被取消") - break - except Exception as e: - logger.error(f"{self.log_prefix} 处理上下文时出错: {e}") - # 为当前循环设置错误状态,防止后续重复报错 - error_loop_info = { - "loop_plan_info": { - "action_result": { - "action_type": "error", - "action_data": {}, - }, - }, - "loop_action_info": { - "action_taken": False, - "reply_text": "", - "command": "", - "taken_time": time.time(), - }, - } - self._current_cycle_detail.set_loop_info(error_loop_info) - self._current_cycle_detail.complete_cycle() - - # 上下文处理失败,跳过当前循环 - await asyncio.sleep(1) - continue - - self._current_cycle_detail.set_loop_info(loop_info) - - self.loop_info.add_loop_info(self._current_cycle_detail) - - self._current_cycle_detail.timers = cycle_timers - - # 完成当前循环并保存历史 - self._current_cycle_detail.complete_cycle() - self._cycle_history.append(self._current_cycle_detail) - - # 记录循环信息和计时器结果 - timer_strings = [] - for name, elapsed in cycle_timers.items(): - formatted_time = f"{elapsed * 1000:.2f}毫秒" if elapsed < 1 else f"{elapsed:.2f}秒" - timer_strings.append(f"{name}: {formatted_time}") - - logger.info( - f"{self.log_prefix} 第{self._current_cycle_detail.cycle_id}次思考," - f"耗时: {self._current_cycle_detail.end_time - self._current_cycle_detail.start_time:.1f}秒, " - f"选择动作: {self._current_cycle_detail.loop_plan_info.get('action_result', {}).get('action_type', '未知动作')}" - + (f"\n详情: {'; '.join(timer_strings)}" if timer_strings else "") - ) - - # 记录性能数据 - try: - action_result = self._current_cycle_detail.loop_plan_info.get("action_result", {}) - cycle_performance_data = { - "cycle_id": self._current_cycle_detail.cycle_id, - "action_type": action_result.get("action_type", "unknown"), - "total_time": self._current_cycle_detail.end_time - self._current_cycle_detail.start_time, - "step_times": cycle_timers.copy(), - "reasoning": action_result.get("reasoning", ""), - "success": self._current_cycle_detail.loop_action_info.get("action_taken", False), - } - self.performance_logger.record_cycle(cycle_performance_data) - except Exception as perf_e: - logger.warning(f"{self.log_prefix} 记录性能数据失败: {perf_e}") - - await asyncio.sleep(global_config.focus_chat.think_interval) - - except asyncio.CancelledError: - logger.info(f"{self.log_prefix} 循环处理时任务被取消") - break - except Exception as e: - logger.error(f"{self.log_prefix} 循环处理时出错: {e}") - logger.error(traceback.format_exc()) - - # 如果_current_cycle_detail存在但未完成,为其设置错误状态 - if self._current_cycle_detail and not hasattr(self._current_cycle_detail, "end_time"): - error_loop_info = { - "loop_plan_info": { - "action_result": { - "action_type": "error", - "action_data": {}, - "reasoning": f"循环处理失败: {e}", - }, - }, - "loop_action_info": { - "action_taken": False, - "reply_text": "", - "command": "", - "taken_time": time.time(), - }, - } - try: - self._current_cycle_detail.set_loop_info(error_loop_info) - self._current_cycle_detail.complete_cycle() - except Exception as inner_e: - logger.error(f"{self.log_prefix} 设置错误状态时出错: {inner_e}") - - await asyncio.sleep(1) # 出错后等待一秒再继续 - - except asyncio.CancelledError: - # 设置了关闭标志位后被取消是正常流程 - if not self._shutting_down: - logger.warning(f"{self.log_prefix} 麦麦Focus聊天模式意外被取消") - else: - logger.info(f"{self.log_prefix} 麦麦已离开Focus聊天模式") - except Exception as e: - logger.error(f"{self.log_prefix} 麦麦Focus聊天模式意外错误: {e}") - print(traceback.format_exc()) - - @contextlib.asynccontextmanager - async def _get_cycle_context(self): - """ - 循环周期的上下文管理器 - - 用于确保资源的正确获取和释放: - 1. 获取处理锁 - 2. 执行操作 - 3. 释放锁 - """ - acquired = False - try: - await self._processing_lock.acquire() - acquired = True - yield acquired - finally: - if acquired and self._processing_lock.locked(): - self._processing_lock.release() - - async def _observe_process_plan_action_loop(self, cycle_timers: dict, thinking_id: str) -> dict: - try: - loop_start_time = time.time() - await self.loop_info.observe() - - await self.relationship_builder.build_relation() - - # 顺序执行调整动作和处理器阶段 - # 第一步:动作修改 - with Timer("动作修改", cycle_timers): - try: - # 调用完整的动作修改流程 - await self.action_modifier.modify_actions( - loop_info=self.loop_info, - mode="focus", - ) - except Exception as e: - logger.error(f"{self.log_prefix} 动作修改失败: {e}") - # 继续执行,不中断流程 - - with Timer("规划器", cycle_timers): - plan_result = await self.action_planner.plan() - - loop_plan_info = { - "action_result": plan_result.get("action_result", {}), - } - - action_type, action_data, reasoning = ( - plan_result.get("action_result", {}).get("action_type", "error"), - plan_result.get("action_result", {}).get("action_data", {}), - plan_result.get("action_result", {}).get("reasoning", "未提供理由"), + logger.info( + f"{self.log_prefix} 第{self._current_cycle_detail.cycle_id}次思考," + f"耗时: {self._current_cycle_detail.end_time - self._current_cycle_detail.start_time:.1f}秒, " + f"选择动作: {self._current_cycle_detail.loop_plan_info.get('action_result', {}).get('action_type', '未知动作')}" + + (f"\n详情: {'; '.join(timer_strings)}" if timer_strings else "") ) + - action_data["loop_start_time"] = loop_start_time + + async def _focus_mode_loopbody(self): + logger.info(f"{self.log_prefix} 开始第{self._cycle_counter}次循环") - if action_type == "reply": - action_str = "回复" - elif action_type == "no_reply": - action_str = "不回复" - else: - action_str = action_type + # 创建新的循环信息 + cycle_timers, thinking_id = self.start_cycle() - logger.debug(f"{self.log_prefix} 麦麦想要:'{action_str}',理由是:{reasoning}") + # 执行规划和处理阶段 + try: + async with global_prompt_manager.async_message_scope( + self.chat_stream.context.get_template_name() + ): - # 动作执行计时 - with Timer("动作执行", cycle_timers): - success, reply_text, command = await self._handle_action( - action_type, reasoning, action_data, cycle_timers, thinking_id + loop_start_time = time.time() + await self.loop_info.observe() + await self.relationship_builder.build_relation() + + # 第一步:动作修改 + with Timer("动作修改", cycle_timers): + try: + await self.action_modifier.modify_actions( + loop_info=self.loop_info, + mode="focus", + ) + except Exception as e: + logger.error(f"{self.log_prefix} 动作修改失败: {e}") + + with Timer("规划器", cycle_timers): + plan_result = await self.action_planner.plan() + + action_result = plan_result.get("action_result", {}) + action_type, action_data, reasoning = ( + action_result.get("action_type", "error"), + action_result.get("action_data", {}), + action_result.get("reasoning", "未提供理由"), ) - loop_action_info = { - "action_taken": success, - "reply_text": reply_text, - "command": command, - "taken_time": time.time(), + action_data["loop_start_time"] = loop_start_time + + # 动作执行计时 + with Timer("动作执行", cycle_timers): + success, reply_text, command = await self._handle_action( + action_type, reasoning, action_data, cycle_timers, thinking_id + ) + + loop_info = { + "loop_plan_info": { + "action_result": plan_result.get("action_result", {}), + }, + "loop_action_info": { + "action_taken": success, + "reply_text": reply_text, + "command": command, + "taken_time": time.time(), + }, } - loop_info = { - "loop_plan_info": loop_plan_info, - "loop_action_info": loop_action_info, - } + if loop_info["loop_action_info"]["command"] == "stop_focus_chat": + logger.info(f"{self.log_prefix} 麦麦决定停止专注聊天") + return False + #停止该聊天模式的循环 - return loop_info + self.end_cycle(loop_info,cycle_timers) + self.print_cycle_info(cycle_timers) + await asyncio.sleep(global_config.focus_chat.think_interval) + + return True + + + except asyncio.CancelledError: + logger.info(f"{self.log_prefix} focus循环任务被取消") + return False except Exception as e: - logger.error(f"{self.log_prefix} FOCUS聊天处理失败: {e}") + logger.error(f"{self.log_prefix} 循环处理时出错: {e}") logger.error(traceback.format_exc()) - return { - "loop_plan_info": { - "action_result": {"action_type": "error", "action_data": {}, "reasoning": f"处理失败: {e}"}, - }, - "loop_action_info": {"action_taken": False, "reply_text": "", "command": "", "taken_time": time.time()}, - } + + # 如果_current_cycle_detail存在但未完成,为其设置错误状态 + if self._current_cycle_detail and not hasattr(self._current_cycle_detail, "end_time"): + error_loop_info = ERROR_LOOP_INFO + try: + self._current_cycle_detail.set_loop_info(error_loop_info) + self._current_cycle_detail.complete_cycle() + except Exception as inner_e: + logger.error(f"{self.log_prefix} 设置错误状态时出错: {inner_e}") + + await asyncio.sleep(1) # 出错后等待一秒再继续\ + return False + + + async def _main_chat_loop(self): + """主循环,持续进行计划并可能回复消息,直到被外部取消。""" + try: + loop_mode = "focus" + loop_mode_loopbody = self._focus_mode_loopbody + + + while self.running: # 主循环 + success = await loop_mode_loopbody() + if not success: + break + + logger.info(f"{self.log_prefix} 麦麦已强制离开 {loop_mode} 聊天模式") + + + except asyncio.CancelledError: + # 设置了关闭标志位后被取消是正常流程 + logger.info(f"{self.log_prefix} 麦麦已强制离开 {loop_mode} 聊天模式") + except Exception as e: + logger.error(f"{self.log_prefix} 麦麦 {loop_mode} 聊天模式意外错误: {e}") + print(traceback.format_exc()) async def _handle_action( self, @@ -434,7 +315,6 @@ class HeartFChatting: thinking_id=thinking_id, chat_stream=self.chat_stream, log_prefix=self.log_prefix, - shutting_down=self._shutting_down, ) except Exception as e: logger.error(f"{self.log_prefix} 创建动作处理器时出错: {e}") @@ -453,40 +333,16 @@ class HeartFChatting: success, reply_text = result command = "" - # 检查action_data中是否有系统命令,优先使用系统命令 - if "_system_command" in action_data: - command = action_data["_system_command"] - logger.debug(f"{self.log_prefix} 从action_data中获取系统命令: {command}") - - # 新增:消息计数和疲惫检查 - if action == "reply" and success: - self._message_count += 1 - current_threshold = self._get_current_fatigue_threshold() - logger.info( - f"{self.log_prefix} 已发送第 {self._message_count} 条消息(动态阈值: {current_threshold}, exit_focus_threshold: {global_config.chat.exit_focus_threshold})" - ) - - # 检查是否达到疲惫阈值(只有在auto模式下才会自动退出) - if ( - global_config.chat.chat_mode == "auto" - and self._message_count >= current_threshold - and not self._fatigue_triggered - ): - self._fatigue_triggered = True - logger.info( - f"{self.log_prefix} [auto模式] 已发送 {self._message_count} 条消息,达到疲惫阈值 {current_threshold},麦麦感到疲惫了,准备退出专注聊天模式" + command = self._count_reply_and_exit_focus_chat(action,success) + + if reply_text == "timeout": + self.reply_timeout_count += 1 + if self.reply_timeout_count > 5: + logger.warning( + f"[{self.log_prefix} ] 连续回复超时次数过多,{global_config.chat.thinking_timeout}秒 内大模型没有返回有效内容,请检查你的api是否速度过慢或配置错误。建议不要使用推理模型,推理模型生成速度过慢。或者尝试拉高thinking_timeout参数,这可能导致回复时间过长。" ) - # 设置系统命令,在下次循环检查时触发退出 - command = "stop_focus_chat" - else: - if reply_text == "timeout": - self.reply_timeout_count += 1 - if self.reply_timeout_count > 5: - logger.warning( - f"[{self.log_prefix} ] 连续回复超时次数过多,{global_config.chat.thinking_timeout}秒 内大模型没有返回有效内容,请检查你的api是否速度过慢或配置错误。建议不要使用推理模型,推理模型生成速度过慢。或者尝试拉高thinking_timeout参数,这可能导致回复时间过长。" - ) - logger.warning(f"{self.log_prefix} 回复生成超时{global_config.chat.thinking_timeout}s,已跳过") - return False, "", "" + logger.warning(f"{self.log_prefix} 回复生成超时{global_config.chat.thinking_timeout}s,已跳过") + return False, "", "" return success, reply_text, command @@ -494,6 +350,33 @@ class HeartFChatting: logger.error(f"{self.log_prefix} 处理{action}时出错: {e}") traceback.print_exc() return False, "", "" + + def _count_reply_and_exit_focus_chat(self,action,success): + # 新增:消息计数和疲惫检查 + if action == "reply" and success: + self._message_count += 1 + current_threshold = self._get_current_fatigue_threshold() + logger.info( + f"{self.log_prefix} 已发送第 {self._message_count} 条消息(动态阈值: {current_threshold}, exit_focus_threshold: {global_config.chat.exit_focus_threshold})" + ) + + # 检查是否达到疲惫阈值(只有在auto模式下才会自动退出) + if ( + global_config.chat.chat_mode == "auto" + and self._message_count >= current_threshold + and not self._fatigue_triggered + ): + self._fatigue_triggered = True + logger.info( + f"{self.log_prefix} [auto模式] 已发送 {self._message_count} 条消息,达到疲惫阈值 {current_threshold},麦麦感到疲惫了,准备退出专注聊天模式" + ) + # 设置系统命令,在下次循环检查时触发退出 + command = "stop_focus_chat" + + return command + return "" + + def _get_current_fatigue_threshold(self) -> int: """动态获取当前的疲惫阈值,基于exit_focus_threshold配置 @@ -503,19 +386,6 @@ class HeartFChatting: """ return max(10, int(30 / global_config.chat.exit_focus_threshold)) - def get_message_count_info(self) -> dict: - """获取消息计数信息 - - Returns: - dict: 包含消息计数信息的字典 - """ - current_threshold = self._get_current_fatigue_threshold() - return { - "current_count": self._message_count, - "threshold": current_threshold, - "fatigue_triggered": self._fatigue_triggered, - "remaining": max(0, current_threshold - self._message_count), - } def reset_message_count(self): """重置消息计数器(用于重新启动focus模式时)""" @@ -526,7 +396,7 @@ class HeartFChatting: async def shutdown(self): """优雅关闭HeartFChatting实例,取消活动循环任务""" logger.info(f"{self.log_prefix} 正在关闭HeartFChatting...") - self._shutting_down = True # <-- 在开始关闭时设置标志位 + self.running = False # <-- 在开始关闭时设置标志位 # 记录最终的消息统计 if self._message_count > 0: @@ -549,34 +419,11 @@ class HeartFChatting: logger.info(f"{self.log_prefix} 没有活动的HeartFChatting循环任务") # 清理状态 - self._loop_active = False + self.running = False self._loop_task = None - if self._processing_lock.locked(): - self._processing_lock.release() - logger.warning(f"{self.log_prefix} 已释放处理锁") - - # 完成性能统计 - try: - self.performance_logger.finalize_session() - logger.info(f"{self.log_prefix} 性能统计已完成") - except Exception as e: - logger.warning(f"{self.log_prefix} 完成性能统计时出错: {e}") # 重置消息计数器,为下次启动做准备 self.reset_message_count() logger.info(f"{self.log_prefix} HeartFChatting关闭完成") - def get_cycle_history(self, last_n: Optional[int] = None) -> List[Dict[str, Any]]: - """获取循环历史记录 - - 参数: - last_n: 获取最近n个循环的信息,如果为None则获取所有历史记录 - - 返回: - List[Dict[str, Any]]: 循环历史记录列表 - """ - history = list(self._cycle_history) - if last_n is not None: - history = history[-last_n:] - return [cycle.to_dict() for cycle in history] diff --git a/src/chat/message_receive/message.py b/src/chat/message_receive/message.py index 710d2525a..8cc06573c 100644 --- a/src/chat/message_receive/message.py +++ b/src/chat/message_receive/message.py @@ -441,3 +441,55 @@ class MessageSet: def __len__(self) -> int: return len(self.messages) + + +def message_recv_from_dict(message_dict: dict) -> MessageRecv: + return MessageRecv( + + message_dict + + ) + +def message_from_db_dict(db_dict: dict) -> MessageRecv: + """从数据库字典创建MessageRecv实例""" + # 转换扁平的数据库字典为嵌套结构 + message_info_dict = { + "platform": db_dict.get("chat_info_platform"), + "message_id": db_dict.get("message_id"), + "time": db_dict.get("time"), + "group_info": { + "platform": db_dict.get("chat_info_group_platform"), + "group_id": db_dict.get("chat_info_group_id"), + "group_name": db_dict.get("chat_info_group_name"), + }, + "user_info": { + "platform": db_dict.get("user_platform"), + "user_id": db_dict.get("user_id"), + "user_nickname": db_dict.get("user_nickname"), + "user_cardname": db_dict.get("user_cardname"), + }, + } + + processed_text = db_dict.get("processed_plain_text", "") + + # 构建 MessageRecv 需要的字典 + recv_dict = { + "message_info": message_info_dict, + "message_segment": {"type": "text", "data": processed_text}, # 从纯文本重建消息段 + "raw_message": None, # 数据库中未存储原始消息 + "processed_plain_text": processed_text, + "detailed_plain_text": db_dict.get("detailed_plain_text", ""), + } + + # 创建 MessageRecv 实例 + msg = MessageRecv(recv_dict) + + # 从数据库字典中填充其他可选字段 + msg.interest_value = db_dict.get("interest_value") + msg.is_mentioned = db_dict.get("is_mentioned") + msg.priority_mode = db_dict.get("priority_mode", "interest") + msg.priority_info = db_dict.get("priority_info") + msg.is_emoji = db_dict.get("is_emoji", False) + msg.is_picid = db_dict.get("is_picid", False) + + return msg \ No newline at end of file diff --git a/src/chat/message_receive/storage.py b/src/chat/message_receive/storage.py index 998e06f21..9524a2779 100644 --- a/src/chat/message_receive/storage.py +++ b/src/chat/message_receive/storage.py @@ -38,11 +38,21 @@ class MessageStorage: else: filtered_display_message = "" interest_value = 0 + is_mentioned = False reply_to = message.reply_to + priority_mode = "" + priority_info = {} + is_emoji = False + is_picid = False else: filtered_display_message = "" interest_value = message.interest_value + is_mentioned = message.is_mentioned reply_to = "" + priority_mode = message.priority_mode + priority_info = message.priority_info + is_emoji = message.is_emoji + is_picid = message.is_picid chat_info_dict = chat_stream.to_dict() user_info_dict = message.message_info.user_info.to_dict() @@ -61,6 +71,7 @@ class MessageStorage: chat_id=chat_stream.stream_id, # Flattened chat_info reply_to=reply_to, + is_mentioned=is_mentioned, chat_info_stream_id=chat_info_dict.get("stream_id"), chat_info_platform=chat_info_dict.get("platform"), chat_info_user_platform=user_info_from_chat.get("platform"), @@ -82,6 +93,10 @@ class MessageStorage: display_message=filtered_display_message, memorized_times=message.memorized_times, interest_value=interest_value, + priority_mode=priority_mode, + priority_info=priority_info, + is_emoji=is_emoji, + is_picid=is_picid, ) except Exception: logger.exception("存储消息失败") diff --git a/src/chat/normal_chat/priority_manager.py b/src/chat/normal_chat/priority_manager.py index 9e1ef76c2..facbecd23 100644 --- a/src/chat/normal_chat/priority_manager.py +++ b/src/chat/normal_chat/priority_manager.py @@ -1,8 +1,9 @@ import time import heapq import math +import json from typing import List, Dict, Optional -from ..message_receive.message import MessageRecv + from src.common.logger import get_logger logger = get_logger("normal_chat") @@ -11,8 +12,8 @@ logger = get_logger("normal_chat") class PrioritizedMessage: """带有优先级的消息对象""" - def __init__(self, message: MessageRecv, interest_scores: List[float], is_vip: bool = False): - self.message = message + def __init__(self, message_data: dict, interest_scores: List[float], is_vip: bool = False): + self.message_data = message_data self.arrival_time = time.time() self.interest_scores = interest_scores self.is_vip = is_vip @@ -38,25 +39,28 @@ class PriorityManager: 管理消息队列,根据优先级选择消息进行处理。 """ - def __init__(self, interest_dict: Dict[str, float], normal_queue_max_size: int = 5): + def __init__(self, normal_queue_max_size: int = 5): self.vip_queue: List[PrioritizedMessage] = [] # VIP 消息队列 (最大堆) self.normal_queue: List[PrioritizedMessage] = [] # 普通消息队列 (最大堆) - self.interest_dict = interest_dict if interest_dict is not None else {} self.normal_queue_max_size = normal_queue_max_size - def _get_interest_score(self, user_id: str) -> float: - """获取用户的兴趣分,默认为1.0""" - return self.interest_dict.get("interests", {}).get(user_id, 1.0) - - def add_message(self, message: MessageRecv, interest_score: Optional[float] = None): + def add_message(self, message_data: dict, interest_score: Optional[float] = None): """ 添加新消息到合适的队列中。 """ - user_id = message.message_info.user_info.user_id - is_vip = message.priority_info.get("message_type") == "vip" if message.priority_info else False - message_priority = message.priority_info.get("message_priority", 0.0) if message.priority_info else 0.0 + user_id = message_data.get("user_id") + + priority_info_raw = message_data.get("priority_info") + priority_info = {} + if isinstance(priority_info_raw, str): + priority_info = json.loads(priority_info_raw) + elif isinstance(priority_info_raw, dict): + priority_info = priority_info_raw - p_message = PrioritizedMessage(message, [interest_score, message_priority], is_vip) + is_vip = priority_info.get("message_type") == "vip" + message_priority = priority_info.get("message_priority", 0.0) + + p_message = PrioritizedMessage(message_data, [interest_score, message_priority], is_vip) if is_vip: heapq.heappush(self.vip_queue, p_message) @@ -75,7 +79,7 @@ class PriorityManager: f"消息来自普通用户 {user_id}, 已添加到普通队列. 当前普通队列长度: {len(self.normal_queue)}" ) - def get_highest_priority_message(self) -> Optional[MessageRecv]: + def get_highest_priority_message(self) -> Optional[dict]: """ 从VIP和普通队列中获取当前最高优先级的消息。 """ @@ -93,9 +97,9 @@ class PriorityManager: normal_msg = self.normal_queue[0] if self.normal_queue else None if vip_msg: - return heapq.heappop(self.vip_queue).message + return heapq.heappop(self.vip_queue).message_data elif normal_msg: - return heapq.heappop(self.normal_queue).message + return heapq.heappop(self.normal_queue).message_data else: return None diff --git a/src/chat/normal_chat/willing/willing_manager.py b/src/chat/normal_chat/willing/willing_manager.py index 0fa701f94..8b9191f74 100644 --- a/src/chat/normal_chat/willing/willing_manager.py +++ b/src/chat/normal_chat/willing/willing_manager.py @@ -91,19 +91,19 @@ class BaseWillingManager(ABC): self.lock = asyncio.Lock() self.logger = logger - def setup(self, message: MessageRecv, chat: ChatStream, is_mentioned_bot: bool, interested_rate: float): + def setup(self, message: dict, chat: ChatStream): person_id = PersonInfoManager.get_person_id(chat.platform, chat.user_info.user_id) - self.ongoing_messages[message.message_info.message_id] = WillingInfo( + self.ongoing_messages[message.get("message_id")] = WillingInfo( message=message, chat=chat, person_info_manager=get_person_info_manager(), chat_id=chat.stream_id, person_id=person_id, group_info=chat.group_info, - is_mentioned_bot=is_mentioned_bot, - is_emoji=message.is_emoji, - is_picid=message.is_picid, - interested_rate=interested_rate, + is_mentioned_bot=message.get("is_mentioned_bot", False), + is_emoji=message.get("is_emoji", False), + is_picid=message.get("is_picid", False), + interested_rate=message.get("interested_rate", 0), ) def delete(self, message_id: str): diff --git a/src/common/database/database_model.py b/src/common/database/database_model.py index 3485fedeb..1c19dcc3f 100644 --- a/src/common/database/database_model.py +++ b/src/common/database/database_model.py @@ -130,6 +130,7 @@ class Messages(BaseModel): reply_to = TextField(null=True) interest_value = DoubleField(null=True) + is_mentioned = BooleanField(null=True) # 从 chat_info 扁平化而来的字段 chat_info_stream_id = TextField() @@ -155,6 +156,13 @@ class Messages(BaseModel): detailed_plain_text = TextField(null=True) # 详细的纯文本消息 memorized_times = IntegerField(default=0) # 被记忆的次数 + priority_mode = TextField(null=True) + priority_info = TextField(null=True) + + additional_config = TextField(null=True) + is_emoji = BooleanField(default=False) + is_picid = BooleanField(default=False) + class Meta: # database = db # 继承自 BaseModel table_name = "messages"