diff --git a/src/chat/affinity_flow/__init__.py b/src/chat/affinity_flow/__init__.py new file mode 100644 index 000000000..ae0f33fec --- /dev/null +++ b/src/chat/affinity_flow/__init__.py @@ -0,0 +1,8 @@ +""" +亲和力流模块初始化文件 +提供全局的AFC管理器实例 +""" + +from src.chat.affinity_flow.afc_manager import afc_manager + +__all__ = ['afc_manager', 'AFCManager', 'AffinityFlowChatter'] \ No newline at end of file diff --git a/src/chat/affinity_flow/afc_manager.py b/src/chat/affinity_flow/afc_manager.py index 07f3b4311..38c50816e 100644 --- a/src/chat/affinity_flow/afc_manager.py +++ b/src/chat/affinity_flow/afc_manager.py @@ -135,4 +135,6 @@ class AFCManager: """更新兴趣关键词""" if stream_id in self.affinity_flow_chatters: self.affinity_flow_chatters[stream_id].update_interest_keywords(new_keywords) - logger.info(f"已更新聊天流 {stream_id} 的兴趣关键词: {list(new_keywords.keys())}") \ No newline at end of file + logger.info(f"已更新聊天流 {stream_id} 的兴趣关键词: {list(new_keywords.keys())}") + +afc_manager = AFCManager() \ No newline at end of file diff --git a/src/chat/chat_loop/cycle_processor.py b/src/chat/chat_loop/cycle_processor.py deleted file mode 100644 index 7dcf34bd3..000000000 --- a/src/chat/chat_loop/cycle_processor.py +++ /dev/null @@ -1,454 +0,0 @@ -import asyncio -import time -import traceback -import math -import random -from typing import Dict, Any, Tuple - -from src.chat.utils.timer_calculator import Timer -from src.common.logger import get_logger -from src.config.config import global_config -from src.chat.planner_actions.planner import ActionPlanner -from src.chat.planner_actions.action_modifier import ActionModifier -from src.person_info.person_info import get_person_info_manager -from src.plugin_system.apis import database_api, generator_api -from src.plugin_system.base.component_types import ChatMode -from src.mais4u.constant_s4u import ENABLE_S4U -from src.chat.chat_loop.hfc_utils import send_typing, stop_typing -from .hfc_context import HfcContext -from .response_handler import ResponseHandler -from .cycle_tracker import CycleTracker - -# 日志记录器 -logger = get_logger("hfc.processor") - - -class CycleProcessor: - """ - 循环处理器类,负责处理单次思考循环的逻辑。 - """ - def __init__(self, context: HfcContext, response_handler: ResponseHandler, cycle_tracker: CycleTracker): - """ - 初始化循环处理器 - - Args: - context: HFC聊天上下文对象,包含聊天流、能量值等信息 - response_handler: 响应处理器,负责生成和发送回复 - cycle_tracker: 循环跟踪器,负责记录和管理每次思考循环的信息 - """ - self.context = context - self.response_handler = response_handler - self.cycle_tracker = cycle_tracker - self.action_planner = ActionPlanner(chat_id=self.context.stream_id, action_manager=self.context.action_manager) - self.action_modifier = ActionModifier( - action_manager=self.context.action_manager, chat_id=self.context.stream_id - ) - - self.log_prefix = self.context.log_prefix - - async def _send_and_store_reply( - self, - response_set, - loop_start_time, - action_message, - cycle_timers: Dict[str, float], - thinking_id, - actions, - ) -> Tuple[Dict[str, Any], str, Dict[str, float]]: - """ - 发送并存储回复信息 - - Args: - response_set: 回复内容集合 - loop_start_time: 循环开始时间 - action_message: 动作消息 - cycle_timers: 循环计时器 - thinking_id: 思考ID - actions: 动作列表 - - Returns: - Tuple[Dict[str, Any], str, Dict[str, float]]: 循环信息, 回复文本, 循环计时器 - """ - # 发送回复 - with Timer("回复发送", cycle_timers): - reply_text = await self.response_handler.send_response(response_set, loop_start_time, action_message) - - # 存储reply action信息 - person_info_manager = get_person_info_manager() - - # 获取 platform,如果不存在则从 chat_stream 获取,如果还是 None 则使用默认值 - platform = action_message.get("chat_info_platform") - if platform is None: - platform = getattr(self.context.chat_stream, "platform", "unknown") - - # 获取用户信息并生成回复提示 - person_id = person_info_manager.get_person_id( - platform, - action_message.get("chat_info_user_id", ""), - ) - person_name = await person_info_manager.get_value(person_id, "person_name") - action_prompt_display = f"你对{person_name}进行了回复:{reply_text}" - - # 存储动作信息到数据库 - await database_api.store_action_info( - chat_stream=self.context.chat_stream, - action_build_into_prompt=False, - action_prompt_display=action_prompt_display, - action_done=True, - thinking_id=thinking_id, - action_data={"reply_text": reply_text}, - action_name="reply", - ) - - # 构建循环信息 - loop_info: Dict[str, Any] = { - "loop_plan_info": { - "action_result": actions, - }, - "loop_action_info": { - "action_taken": True, - "reply_text": reply_text, - "command": "", - "taken_time": time.time(), - }, - } - - return loop_info, reply_text, cycle_timers - - async def observe(self, interest_value: float = 0.0) -> str: - """ - 观察和处理单次思考循环的核心方法 - - Args: - interest_value: 兴趣值 - - Returns: - str: 动作类型 - - 功能说明: - - 开始新的思考循环并记录计时 - - 修改可用动作并获取动作列表 - - 根据聊天模式和提及情况决定是否跳过规划器 - - 执行动作规划或直接回复 - - 根据动作类型分发到相应的处理方法 - """ - action_type = "no_action" - reply_text = "" # 初始化reply_text变量,避免UnboundLocalError - - # 使用sigmoid函数将interest_value转换为概率 - # 当interest_value为0时,概率接近0(使用Focus模式) - # 当interest_value很高时,概率接近1(使用Normal模式) - def calculate_normal_mode_probability(interest_val: float) -> float: - """ - 计算普通模式的概率 - - Args: - interest_val: 兴趣值 - - Returns: - float: 概率 - """ - # 使用sigmoid函数,调整参数使概率分布更合理 - # 当interest_value = 0时,概率约为0.1 - # 当interest_value = 1时,概率约为0.5 - # 当interest_value = 2时,概率约为0.8 - # 当interest_value = 3时,概率约为0.95 - k = 2.0 # 控制曲线陡峭程度 - x0 = 1.0 # 控制曲线中心点 - return 1.0 / (1.0 + math.exp(-k * (interest_val - x0))) - - # 计算普通模式概率 - normal_mode_probability = ( - calculate_normal_mode_probability(interest_value) - * 0.5 - / global_config.chat.get_current_talk_frequency(self.context.stream_id) - ) - - # 根据概率决定使用哪种模式 - if random.random() < normal_mode_probability: - mode = ChatMode.NORMAL - logger.info( - f"{self.log_prefix} 基于兴趣值 {interest_value:.2f},概率 {normal_mode_probability:.2f},选择Normal planner模式" - ) - else: - mode = ChatMode.FOCUS - logger.info( - f"{self.log_prefix} 基于兴趣值 {interest_value:.2f},概率 {normal_mode_probability:.2f},选择Focus planner模式" - ) - - # 开始新的思考循环 - cycle_timers, thinking_id = self.cycle_tracker.start_cycle() - logger.info(f"{self.log_prefix} 开始第{self.context.cycle_counter}次思考") - - if ENABLE_S4U and self.context.chat_stream and self.context.chat_stream.user_info: - await send_typing(self.context.chat_stream.user_info.user_id) - - loop_start_time = time.time() - - # 第一步:动作修改 - with Timer("动作修改", cycle_timers): - try: - await self.action_modifier.modify_actions() - available_actions = self.context.action_manager.get_using_actions() - except Exception as e: - logger.error(f"{self.context.log_prefix} 动作修改失败: {e}") - available_actions = {} - - # 规划动作 - from src.plugin_system.core.event_manager import event_manager - from src.plugin_system import EventType - - result = await event_manager.trigger_event( - EventType.ON_PLAN, permission_group="SYSTEM", stream_id=self.context.chat_stream - ) - if result and not result.all_continue_process(): - raise UserWarning(f"插件{result.get_summary().get('stopped_handlers', '')}于规划前中断了内容生成") - with Timer("规划器", cycle_timers): - actions, _ = await self.action_planner.plan(mode=mode) - - async def execute_action(action_info): - """执行单个动作的通用函数""" - try: - if action_info["action_type"] == "no_action": - return {"action_type": "no_action", "success": True, "reply_text": "", "command": ""} - if action_info["action_type"] == "no_reply": - # 直接处理no_reply逻辑,不再通过动作系统 - reason = action_info.get("reasoning", "选择不回复") - logger.info(f"{self.log_prefix} 选择不回复,原因: {reason}") - - # 存储no_reply信息到数据库 - await database_api.store_action_info( - chat_stream=self.context.chat_stream, - action_build_into_prompt=False, - action_prompt_display=reason, - action_done=True, - thinking_id=thinking_id, - action_data={"reason": reason}, - action_name="no_reply", - ) - - return {"action_type": "no_reply", "success": True, "reply_text": "", "command": ""} - elif action_info["action_type"] != "reply" and action_info["action_type"] != "no_action": - # 执行普通动作 - with Timer("动作执行", cycle_timers): - success, reply_text, command = await self._handle_action( - action_info["action_type"], - action_info["reasoning"], - action_info["action_data"], - cycle_timers, - thinking_id, - action_info["action_message"], - ) - return { - "action_type": action_info["action_type"], - "success": success, - "reply_text": reply_text, - "command": command, - } - else: - # 生成回复 - try: - success, response_set, _ = await generator_api.generate_reply( - chat_stream=self.context.chat_stream, - reply_message=action_info["action_message"], - available_actions=available_actions, - enable_tool=global_config.tool.enable_tool, - request_type="chat.replyer", - from_plugin=False, - ) - if not success or not response_set: - logger.info( - f"对 {action_info['action_message'].get('processed_plain_text')} 的回复生成失败" - ) - return {"action_type": "reply", "success": False, "reply_text": "", "loop_info": None} - except asyncio.CancelledError: - logger.debug(f"{self.log_prefix} 并行执行:回复生成任务已被取消") - return {"action_type": "reply", "success": False, "reply_text": "", "loop_info": None} - - # 发送并存储回复 - loop_info, reply_text, cycle_timers_reply = await self._send_and_store_reply( - response_set, - loop_start_time, - action_info["action_message"], - cycle_timers, - thinking_id, - actions, - ) - return {"action_type": "reply", "success": True, "reply_text": reply_text, "loop_info": loop_info} - except Exception as e: - logger.error(f"{self.log_prefix} 执行动作时出错: {e}") - logger.error(f"{self.log_prefix} 错误信息: {traceback.format_exc()}") - return { - "action_type": action_info["action_type"], - "success": False, - "reply_text": "", - "loop_info": None, - "error": str(e), - } - - # 分离 reply 动作和其他动作 - reply_actions = [a for a in actions if a.get("action_type") == "reply"] - other_actions = [a for a in actions if a.get("action_type") != "reply"] - - reply_loop_info = None - reply_text_from_reply = "" - other_actions_results = [] - - # 1. 首先串行执行所有 reply 动作(通常只有一个) - if reply_actions: - logger.info(f"{self.log_prefix} 正在执行文本回复...") - for action in reply_actions: - action_message = action.get("action_message") - if not action_message: - logger.warning(f"{self.log_prefix} reply 动作缺少 action_message,跳过") - continue - - # 检查是否是空的DatabaseMessages对象 - if hasattr(action_message, 'chat_info') and hasattr(action_message.chat_info, 'user_info'): - target_user_id = action_message.chat_info.user_info.user_id - else: - # 如果是字典格式,使用原来的方式 - target_user_id = action_message.get("chat_info_user_id", "") - - if not target_user_id: - logger.warning(f"{self.log_prefix} reply 动作的 action_message 缺少用户ID,跳过") - continue - - if target_user_id == global_config.bot.qq_account and not global_config.chat.allow_reply_self: - logger.warning("选取的reply的目标为bot自己,跳过reply action") - continue - result = await execute_action(action) - if isinstance(result, Exception): - logger.error(f"{self.log_prefix} 回复动作执行异常: {result}") - continue - if result.get("success"): - reply_loop_info = result.get("loop_info") - reply_text_from_reply = result.get("reply_text", "") - else: - logger.warning(f"{self.log_prefix} 回复动作执行失败") - - # 2. 然后并行执行所有其他动作 - if other_actions: - logger.info(f"{self.log_prefix} 正在执行附加动作: {[a.get('action_type') for a in other_actions]}") - other_action_tasks = [asyncio.create_task(execute_action(action)) for action in other_actions] - results = await asyncio.gather(*other_action_tasks, return_exceptions=True) - for i, result in enumerate(results): - if isinstance(result, BaseException): - logger.error(f"{self.log_prefix} 附加动作执行异常: {result}") - continue - other_actions_results.append(result) - - # 构建最终的循环信息 - if reply_loop_info: - loop_info = reply_loop_info - # 将其他动作的结果合并到loop_info中 - if "other_actions" not in loop_info["loop_action_info"]: - loop_info["loop_action_info"]["other_actions"] = [] - loop_info["loop_action_info"]["other_actions"].extend(other_actions_results) - reply_text = reply_text_from_reply - else: - # 没有回复信息,构建纯动作的loop_info - # 即使没有回复,也要正确处理其他动作 - final_action_taken = any(res.get("success", False) for res in other_actions_results) - final_reply_text = " ".join(res.get("reply_text", "") for res in other_actions_results if res.get("reply_text")) - final_command = " ".join(res.get("command", "") for res in other_actions_results if res.get("command")) - - loop_info = { - "loop_plan_info": { - "action_result": actions, - }, - "loop_action_info": { - "action_taken": final_action_taken, - "reply_text": final_reply_text, - "command": final_command, - "taken_time": time.time(), - "other_actions": other_actions_results, - }, - } - reply_text = final_reply_text - - # 停止正在输入状态 - if ENABLE_S4U: - await stop_typing() - - # 结束循环 - self.context.chat_instance.cycle_tracker.end_cycle(loop_info, cycle_timers) - self.context.chat_instance.cycle_tracker.print_cycle_info(cycle_timers) - - action_type = actions[0]["action_type"] if actions else "no_action" - return action_type - - async def _handle_action( - self, action, reasoning, action_data, cycle_timers, thinking_id, action_message - ) -> tuple[bool, str, str]: - """ - 处理具体的动作执行 - - Args: - action: 动作名称 - reasoning: 执行理由 - action_data: 动作数据 - cycle_timers: 循环计时器 - thinking_id: 思考ID - action_message: 动作消息 - - Returns: - tuple: (执行是否成功, 回复文本, 命令文本) - - 功能说明: - - 创建对应的动作处理器 - - 执行动作并捕获异常 - - 返回执行结果供上级方法整合 - """ - if not self.context.chat_stream: - return False, "", "" - try: - # 创建动作处理器 - action_handler = self.context.action_manager.create_action( - action_name=action, - action_data=action_data, - reasoning=reasoning, - cycle_timers=cycle_timers, - thinking_id=thinking_id, - chat_stream=self.context.chat_stream, - log_prefix=self.context.log_prefix, - action_message=action_message, - ) - if not action_handler: - # 动作处理器创建失败,尝试回退机制 - logger.warning(f"{self.context.log_prefix} 创建动作处理器失败: {action},尝试回退方案") - - # 获取当前可用的动作 - available_actions = self.context.action_manager.get_using_actions() - fallback_action = None - - # 回退优先级:reply > 第一个可用动作 - if "reply" in available_actions: - fallback_action = "reply" - elif available_actions: - fallback_action = list(available_actions.keys())[0] - - if fallback_action and fallback_action != action: - logger.info(f"{self.context.log_prefix} 使用回退动作: {fallback_action}") - action_handler = self.context.action_manager.create_action( - action_name=fallback_action, - action_data=action_data, - reasoning=f"原动作'{action}'不可用,自动回退。{reasoning}", - cycle_timers=cycle_timers, - thinking_id=thinking_id, - chat_stream=self.context.chat_stream, - log_prefix=self.context.log_prefix, - action_message=action_message, - ) - - if not action_handler: - logger.error(f"{self.context.log_prefix} 回退方案也失败,无法创建任何动作处理器") - return False, "", "" - - # 执行动作 - success, reply_text = await action_handler.handle_action() - return success, reply_text, "" - except Exception as e: - logger.error(f"{self.context.log_prefix} 处理{action}时出错: {e}") - traceback.print_exc() - return False, "", "" diff --git a/src/chat/chat_loop/cycle_tracker.py b/src/chat/chat_loop/cycle_tracker.py deleted file mode 100644 index 1f45c4caf..000000000 --- a/src/chat/chat_loop/cycle_tracker.py +++ /dev/null @@ -1,114 +0,0 @@ -import time -from typing import Dict, Any, Tuple - -from src.common.logger import get_logger -from src.chat.chat_loop.hfc_utils import CycleDetail -from .hfc_context import HfcContext - -logger = get_logger("hfc") - - -class CycleTracker: - def __init__(self, context: HfcContext): - """ - 初始化循环跟踪器 - - Args: - context: HFC聊天上下文对象 - - 功能说明: - - 负责跟踪和记录每次思考循环的详细信息 - - 管理循环的开始、结束和信息存储 - """ - self.context = context - - def start_cycle(self, is_proactive: bool = False) -> Tuple[Dict[str, float], str]: - """ - 开始新的思考循环 - - Args: - is_proactive: 标记这个循环是否由主动思考发起 - - Returns: - tuple: (循环计时器字典, 思考ID字符串) - - 功能说明: - - 增加循环计数器 - - 创建新的循环详情对象 - - 生成唯一的思考ID - - 初始化循环计时器 - """ - if not is_proactive: - self.context.cycle_counter += 1 - - cycle_id = self.context.cycle_counter if not is_proactive else f"{self.context.cycle_counter}.p" - self.context.current_cycle_detail = CycleDetail(cycle_id) - self.context.current_cycle_detail.thinking_id = f"tid{str(round(time.time(), 2))}" - cycle_timers = {} - return cycle_timers, self.context.current_cycle_detail.thinking_id - - def end_cycle(self, loop_info: Dict[str, Any], cycle_timers: Dict[str, float]): - """ - 结束当前思考循环 - - Args: - loop_info: 循环信息,包含规划和动作信息 - cycle_timers: 循环计时器,记录各阶段耗时 - - 功能说明: - - 设置循环详情的完整信息 - - 将当前循环加入历史记录 - - 记录计时器和结束时间 - - 打印循环统计信息 - """ - if self.context.current_cycle_detail: - self.context.current_cycle_detail.set_loop_info(loop_info) - self.context.history_loop.append(self.context.current_cycle_detail) - self.context.current_cycle_detail.timers = cycle_timers - self.context.current_cycle_detail.end_time = time.time() - self.print_cycle_info(cycle_timers) - - def print_cycle_info(self, cycle_timers: Dict[str, float]): - """ - 打印循环统计信息 - - Args: - cycle_timers: 循环计时器字典 - - 功能说明: - - 格式化各阶段的耗时信息 - - 计算总体循环持续时间 - - 输出详细的性能统计日志 - - 显示选择的动作类型 - """ - if not self.context.current_cycle_detail: - return - - timer_strings = [] - for name, elapsed in cycle_timers.items(): - formatted_time = f"{elapsed * 1000:.2f}毫秒" if elapsed < 1 else f"{elapsed:.2f}秒" - timer_strings.append(f"{name}: {formatted_time}") - - # 获取动作类型,兼容新旧格式 - # 获取动作类型 - action_type = "未知动作" - if self.context.current_cycle_detail: - loop_plan_info = self.context.current_cycle_detail.loop_plan_info - actions = loop_plan_info.get("action_result") - - if isinstance(actions, list) and actions: - # 从actions列表中提取所有action_type - action_types = [a.get("action_type", "未知") for a in actions] - action_type = ", ".join(action_types) - elif isinstance(actions, dict): - # 兼容旧格式 - action_type = actions.get("action_type", "未知动作") - - - if self.context.current_cycle_detail.end_time and self.context.current_cycle_detail.start_time: - duration = self.context.current_cycle_detail.end_time - self.context.current_cycle_detail.start_time - logger.info( - f"{self.context.log_prefix} 第{self.context.current_cycle_detail.cycle_id}次思考," - f"耗时: {duration:.1f}秒, " - f"选择动作: {action_type}" + (f"\n详情: {'; '.join(timer_strings)}" if timer_strings else "") - ) diff --git a/src/chat/chat_loop/energy_manager.py b/src/chat/chat_loop/energy_manager.py deleted file mode 100644 index 2eb7e7265..000000000 --- a/src/chat/chat_loop/energy_manager.py +++ /dev/null @@ -1,162 +0,0 @@ -import asyncio -import time -from typing import Optional -from src.common.logger import get_logger -from src.config.config import global_config -from .hfc_context import HfcContext -from src.chat.chat_loop.sleep_manager import sleep_manager -logger = get_logger("hfc") - - -class EnergyManager: - def __init__(self, context: HfcContext): - """ - 初始化能量管理器 - - Args: - context: HFC聊天上下文对象 - - 功能说明: - - 管理聊天机器人的能量值系统 - - 根据聊天模式自动调整能量消耗 - - 控制能量值的衰减和记录 - """ - self.context = context - self._energy_task: Optional[asyncio.Task] = None - self.last_energy_log_time = 0 - self.energy_log_interval = 90 - - async def start(self): - """ - 启动能量管理器 - - 功能说明: - - 检查运行状态,避免重复启动 - - 创建能量循环异步任务 - - 设置任务完成回调 - - 记录启动日志 - """ - if self.context.running and not self._energy_task: - self._energy_task = asyncio.create_task(self._energy_loop()) - self._energy_task.add_done_callback(self._handle_energy_completion) - logger.info(f"{self.context.log_prefix} 能量管理器已启动") - - async def stop(self): - """ - 停止能量管理器 - - 功能说明: - - 取消正在运行的能量循环任务 - - 等待任务完全停止 - - 记录停止日志 - """ - if self._energy_task and not self._energy_task.done(): - self._energy_task.cancel() - await asyncio.sleep(0) - logger.info(f"{self.context.log_prefix} 能量管理器已停止") - - async def _energy_loop(self): - """ - 能量与睡眠压力管理的主循环 - - 功能说明: - - 每10秒执行一次能量更新 - - 根据群聊配置设置固定的聊天模式和能量值 - - 在自动模式下根据聊天模式进行能量衰减 - - NORMAL模式每次衰减0.3,FOCUS模式每次衰减0.6 - - 确保能量值不低于0.3的最小值 - """ - while self.context.running: - await asyncio.sleep(10) - - if not self.context.chat_stream: - continue - - # 判断当前是否为睡眠时间 - is_sleeping = sleep_manager.SleepManager().is_sleeping() - - if is_sleeping: - # 睡眠中:减少睡眠压力 - decay_per_10s = global_config.sleep_system.sleep_pressure_decay_rate / 6 - self.context.sleep_pressure -= decay_per_10s - self.context.sleep_pressure = max(self.context.sleep_pressure, 0) - self._log_sleep_pressure_change("睡眠压力释放") - self.context.save_context_state() - else: - # 清醒时:处理能量衰减 - is_group_chat = self.context.chat_stream.group_info is not None - if is_group_chat: - self.context.energy_value = 25 - - await asyncio.sleep(12) - self.context.energy_value -= 0.5 - self.context.energy_value = max(self.context.energy_value, 0.3) - - self._log_energy_change("能量值衰减") - self.context.save_context_state() - - def _should_log_energy(self) -> bool: - """ - 判断是否应该记录能量变化日志 - - Returns: - bool: 如果距离上次记录超过间隔时间则返回True - - 功能说明: - - 控制能量日志的记录频率,避免日志过于频繁 - - 默认间隔90秒记录一次详细日志 - - 其他时间使用调试级别日志 - """ - current_time = time.time() - if current_time - self.last_energy_log_time >= self.energy_log_interval: - self.last_energy_log_time = current_time - return True - return False - - def increase_sleep_pressure(self): - """ - 在执行动作后增加睡眠压力 - """ - increment = global_config.sleep_system.sleep_pressure_increment - self.context.sleep_pressure += increment - self.context.sleep_pressure = min(self.context.sleep_pressure, 100.0) # 设置一个100的上限 - self._log_sleep_pressure_change("执行动作,睡眠压力累积") - self.context.save_context_state() - - def _log_energy_change(self, action: str, reason: str = ""): - """ - 记录能量变化日志 - - Args: - action: 能量变化的动作描述 - reason: 可选的变化原因 - - 功能说明: - - 根据时间间隔决定使用info还是debug级别的日志 - - 格式化能量值显示(保留一位小数) - - 可选择性地包含变化原因 - """ - if self._should_log_energy(): - log_message = f"{self.context.log_prefix} {action},当前能量值:{self.context.energy_value:.1f}" - if reason: - log_message = ( - f"{self.context.log_prefix} {action},{reason},当前能量值:{self.context.energy_value:.1f}" - ) - logger.info(log_message) - else: - log_message = f"{self.context.log_prefix} {action},当前能量值:{self.context.energy_value:.1f}" - if reason: - log_message = ( - f"{self.context.log_prefix} {action},{reason},当前能量值:{self.context.energy_value:.1f}" - ) - logger.debug(log_message) - - def _log_sleep_pressure_change(self, action: str): - """ - 记录睡眠压力变化日志 - """ - # 使用与能量日志相同的频率控制 - if self._should_log_energy(): - logger.info(f"{self.context.log_prefix} {action},当前睡眠压力:{self.context.sleep_pressure:.1f}") - else: - logger.debug(f"{self.context.log_prefix} {action},当前睡眠压力:{self.context.sleep_pressure:.1f}") diff --git a/src/chat/chat_loop/heartFC_chat.py b/src/chat/chat_loop/heartFC_chat.py deleted file mode 100644 index 6f63cff1b..000000000 --- a/src/chat/chat_loop/heartFC_chat.py +++ /dev/null @@ -1,614 +0,0 @@ -import asyncio -import time -import traceback -import random -from typing import Optional, List, Dict, Any -from collections import deque - -from src.common.logger import get_logger -from src.config.config import global_config -from src.person_info.relationship_builder_manager import relationship_builder_manager -from src.chat.express.expression_learner import expression_learner_manager -from src.chat.chat_loop.sleep_manager.sleep_manager import SleepManager, SleepState -from src.plugin_system.apis import message_api - -from .hfc_context import HfcContext -from .energy_manager import EnergyManager -from .proactive.proactive_thinker import ProactiveThinker -from .cycle_processor import CycleProcessor -from .response_handler import ResponseHandler -from .cycle_tracker import CycleTracker -from .sleep_manager.wakeup_manager import WakeUpManager -from .proactive.events import ProactiveTriggerEvent - -logger = get_logger("hfc") - - -class HeartFChatting: - def __init__(self, chat_id: str): - """ - 初始化心跳聊天管理器 - - Args: - chat_id: 聊天ID标识符 - - 功能说明: - - 创建聊天上下文和所有子管理器 - - 初始化循环跟踪器、响应处理器、循环处理器等核心组件 - - 设置能量管理器、主动思考器和普通模式处理器 - - 初始化聊天模式并记录初始化完成日志 - """ - self.context = HfcContext(chat_id) - - self.cycle_tracker = CycleTracker(self.context) - self.response_handler = ResponseHandler(self.context) - self.cycle_processor = CycleProcessor(self.context, self.response_handler, self.cycle_tracker) - self.energy_manager = EnergyManager(self.context) - self.proactive_thinker = ProactiveThinker(self.context, self.cycle_processor) - self.wakeup_manager = WakeUpManager(self.context) - self.sleep_manager = SleepManager() - - # 将唤醒度管理器设置到上下文中 - self.context.wakeup_manager = self.wakeup_manager - self.context.energy_manager = self.energy_manager - self.context.sleep_manager = self.sleep_manager - # 将HeartFChatting实例设置到上下文中,以便其他组件可以调用其方法 - self.context.chat_instance = self - - self._loop_task: Optional[asyncio.Task] = None - self._proactive_monitor_task: Optional[asyncio.Task] = None - - # 记录最近3次的兴趣度 - self.recent_interest_records: deque = deque(maxlen=3) - self._initialize_chat_mode() - logger.info(f"{self.context.log_prefix} HeartFChatting 初始化完成") - - def _initialize_chat_mode(self): - """ - 初始化聊天模式 - - 功能说明: - - 检测是否为群聊环境 - - 根据全局配置设置强制聊天模式 - - 在focus模式下设置能量值为35 - - 在normal模式下设置能量值为15 - - 如果是auto模式则保持默认设置 - """ - is_group_chat = self.context.chat_stream.group_info is not None if self.context.chat_stream else False - if is_group_chat and global_config.chat.group_chat_mode != "auto": - self.context.energy_value = 25 - - async def start(self): - """ - 启动心跳聊天系统 - - 功能说明: - - 检查是否已经在运行,避免重复启动 - - 初始化关系构建器和表达学习器 - - 启动能量管理器和主动思考器 - - 创建主聊天循环任务并设置完成回调 - - 记录启动完成日志 - """ - if self.context.running: - return - self.context.running = True - - self.context.relationship_builder = relationship_builder_manager.get_or_create_builder(self.context.stream_id) - self.context.expression_learner = expression_learner_manager.get_expression_learner(self.context.stream_id) - - # 启动主动思考监视器 - if global_config.chat.enable_proactive_thinking: - self._proactive_monitor_task = asyncio.create_task(self._proactive_monitor_loop()) - self._proactive_monitor_task.add_done_callback(self._handle_proactive_monitor_completion) - logger.info(f"{self.context.log_prefix} 主动思考监视器已启动") - - await self.wakeup_manager.start() - - self._loop_task = asyncio.create_task(self._main_chat_loop()) - self._loop_task.add_done_callback(self._handle_loop_completion) - logger.info(f"{self.context.log_prefix} HeartFChatting 启动完成") - - async def stop(self): - """ - 停止心跳聊天系统 - - 功能说明: - - 检查是否正在运行,避免重复停止 - - 设置运行状态为False - - 停止能量管理器和主动思考器 - - 取消主聊天循环任务 - - 记录停止完成日志 - """ - if not self.context.running: - return - self.context.running = False - - # 停止主动思考监视器 - if self._proactive_monitor_task and not self._proactive_monitor_task.done(): - self._proactive_monitor_task.cancel() - await asyncio.sleep(0) - logger.info(f"{self.context.log_prefix} 主动思考监视器已停止") - - await self.wakeup_manager.stop() - - if self._loop_task and not self._loop_task.done(): - self._loop_task.cancel() - await asyncio.sleep(0) - logger.info(f"{self.context.log_prefix} HeartFChatting 已停止") - - def _handle_loop_completion(self, task: asyncio.Task): - """ - 处理主循环任务完成 - - Args: - task: 完成的异步任务对象 - - 功能说明: - - 处理任务异常完成的情况 - - 区分正常停止和异常终止 - - 记录相应的日志信息 - - 处理取消任务的情况 - """ - try: - if exception := task.exception(): - logger.error(f"{self.context.log_prefix} HeartFChatting: 脱离了聊天(异常): {exception}") - logger.error(traceback.format_exc()) - else: - logger.info(f"{self.context.log_prefix} HeartFChatting: 脱离了聊天 (外部停止)") - except asyncio.CancelledError: - logger.info(f"{self.context.log_prefix} HeartFChatting: 结束了聊天") - - def _handle_proactive_monitor_completion(self, task: asyncio.Task): - """ - 处理主动思考监视器任务完成 - - Args: - task: 完成的异步任务对象 - - 功能说明: - - 处理任务异常完成的情况 - - 记录任务正常结束或被取消的日志 - """ - try: - if exception := task.exception(): - logger.error(f"{self.context.log_prefix} 主动思考监视器异常: {exception}") - else: - logger.info(f"{self.context.log_prefix} 主动思考监视器正常结束") - except asyncio.CancelledError: - logger.info(f"{self.context.log_prefix} 主动思考监视器被取消") - - async def _proactive_monitor_loop(self): - """ - 主动思考监视器循环 - - 功能说明: - - 定期检查是否需要进行主动思考 - - 计算聊天沉默时间,并与动态思考间隔比较 - - 当沉默时间超过阈值时,触发主动思考 - - 处理思考过程中的异常 - """ - while self.context.running: - await asyncio.sleep(15) - - if not self._should_enable_proactive_thinking(): - continue - - current_time = time.time() - silence_duration = current_time - self.context.last_message_time - target_interval = self._get_dynamic_thinking_interval() - - if silence_duration >= target_interval: - try: - formatted_time = self._format_duration(silence_duration) - event = ProactiveTriggerEvent( - source="silence_monitor", - reason=f"聊天已沉默 {formatted_time}", - metadata={"silence_duration": silence_duration}, - ) - await self.proactive_thinker.think(event) - self.context.last_message_time = current_time - except Exception as e: - logger.error(f"{self.context.log_prefix} 主动思考触发执行出错: {e}") - logger.error(traceback.format_exc()) - - def _should_enable_proactive_thinking(self) -> bool: - """ - 判断是否应启用主动思考 - - Returns: - bool: 如果应启用主动思考则返回True,否则返回False - - 功能说明: - - 检查全局配置和特定聊天设置 - - 支持按群聊和私聊分别配置 - - 支持白名单模式,只在特定聊天中启用 - """ - if not self.context.chat_stream: - return False - - is_group_chat = self.context.chat_stream.group_info is not None - - if is_group_chat and not global_config.chat.proactive_thinking_in_group: - return False - if not is_group_chat and not global_config.chat.proactive_thinking_in_private: - return False - - stream_parts = self.context.stream_id.split(":") - current_chat_identifier = f"{stream_parts}:{stream_parts}" if len(stream_parts) >= 2 else self.context.stream_id - - enable_list = getattr( - global_config.chat, - "proactive_thinking_enable_in_groups" if is_group_chat else "proactive_thinking_enable_in_private", - [], - ) - return not enable_list or current_chat_identifier in enable_list - - def _get_dynamic_thinking_interval(self) -> float: - """ - 获取动态思考间隔时间 - - Returns: - float: 思考间隔秒数 - - 功能说明: - - 尝试从timing_utils导入正态分布间隔函数 - - 根据配置计算动态间隔,增加随机性 - - 在无法导入或计算出错时,回退到固定的间隔 - """ - try: - from src.utils.timing_utils import get_normal_distributed_interval - - base_interval = global_config.chat.proactive_thinking_interval - delta_sigma = getattr(global_config.chat, "delta_sigma", 120) - - if base_interval <= 0: - base_interval = abs(base_interval) - if delta_sigma < 0: - delta_sigma = abs(delta_sigma) - - if base_interval == 0 and delta_sigma == 0: - return 300 - if delta_sigma == 0: - return base_interval - - sigma_percentage = delta_sigma / base_interval if base_interval > 0 else delta_sigma / 1000 - return get_normal_distributed_interval(base_interval, sigma_percentage, 1, 86400, use_3sigma_rule=True) - - except ImportError: - logger.warning(f"{self.context.log_prefix} timing_utils不可用,使用固定间隔") - return max(300, abs(global_config.chat.proactive_thinking_interval)) - except Exception as e: - logger.error(f"{self.context.log_prefix} 动态间隔计算出错: {e},使用固定间隔") - return max(300, abs(global_config.chat.proactive_thinking_interval)) - - def _format_duration(self, seconds: float) -> str: - """ - 格式化时长为可读字符串 - - Args: - seconds: 时长秒数 - - Returns: - str: 格式化后的字符串 (例如 "1小时2分3秒") - """ - hours = int(seconds // 3600) - minutes = int((seconds % 3600) // 60) - secs = int(seconds % 60) - parts = [] - if hours > 0: - parts.append(f"{hours}小时") - if minutes > 0: - parts.append(f"{minutes}分") - if secs > 0 or not parts: - parts.append(f"{secs}秒") - return "".join(parts) - - async def _main_chat_loop(self): - """ - 主聊天循环 - - 功能说明: - - 持续运行聊天处理循环 - - 只有在有新消息时才进行思考循环 - - 无新消息时等待新消息到达(由主动思考系统单独处理主动发言) - - 处理取消和异常情况 - - 在异常时尝试重新启动循环 - """ - try: - while self.context.running: - has_new_messages = await self._loop_body() - - if has_new_messages: - # 有新消息时,继续快速检查是否还有更多消息 - await asyncio.sleep(1) - else: - # 无新消息时,等待较长时间再检查 - # 这里只是为了定期检查系统状态,不进行思考循环 - # 真正的新消息响应依赖于消息到达时的通知 - await asyncio.sleep(1.0) - - except asyncio.CancelledError: - logger.info(f"{self.context.log_prefix} 麦麦已关闭聊天") - except Exception: - logger.error(f"{self.context.log_prefix} 麦麦聊天意外错误,将于3s后尝试重新启动") - print(traceback.format_exc()) - await asyncio.sleep(3) - self._loop_task = asyncio.create_task(self._main_chat_loop()) - logger.error(f"{self.context.log_prefix} 结束了当前聊天循环") - - async def _loop_body(self) -> bool: - """ - 单次循环体处理 - - Returns: - bool: 是否处理了新消息 - - 功能说明: - - 检查是否处于睡眠模式,如果是则处理唤醒度逻辑 - - 获取最近的新消息(过滤机器人自己的消息和命令) - - 只有在有新消息时才进行思考循环处理 - - 更新最后消息时间和读取时间 - - 根据当前聊天模式执行不同的处理逻辑 - - FOCUS模式:直接处理所有消息并检查退出条件 - - NORMAL模式:检查进入FOCUS模式的条件,并通过normal_mode_handler处理消息 - """ - # --- 核心状态更新 --- - await self.sleep_manager.update_sleep_state(self.wakeup_manager) - current_sleep_state = self.sleep_manager.get_current_sleep_state() - is_sleeping = current_sleep_state == SleepState.SLEEPING - is_in_insomnia = current_sleep_state == SleepState.INSOMNIA - - # 核心修复:在睡眠模式(包括失眠)下获取消息时,不过滤命令消息,以确保@消息能被接收 - filter_command_flag = not (is_sleeping or is_in_insomnia) - - recent_messages = message_api.get_messages_by_time_in_chat( - chat_id=self.context.stream_id, - start_time=self.context.last_read_time, - end_time=time.time(), - limit=10, - limit_mode="latest", - filter_mai=True, - filter_command=filter_command_flag, - ) - - has_new_messages = bool(recent_messages) - new_message_count = len(recent_messages) - - # 只有在有新消息时才进行思考循环处理 - if has_new_messages: - self.context.last_message_time = time.time() - self.context.last_read_time = time.time() - - # --- 专注模式安静群组检查 --- - quiet_groups = global_config.chat.focus_mode_quiet_groups - if quiet_groups and self.context.chat_stream: - is_group_chat = self.context.chat_stream.group_info is not None - if is_group_chat: - try: - platform = self.context.chat_stream.platform - group_id = self.context.chat_stream.group_info.group_id - - # 兼容不同QQ适配器的平台名称 - is_qq_platform = platform in ["qq", "napcat"] - - current_chat_identifier = f"{platform}:{group_id}" - config_identifier_for_qq = f"qq:{group_id}" - - is_in_quiet_list = (current_chat_identifier in quiet_groups or - (is_qq_platform and config_identifier_for_qq in quiet_groups)) - - if is_in_quiet_list: - is_mentioned_in_batch = False - for msg in recent_messages: - if msg.get("is_mentioned"): - is_mentioned_in_batch = True - break - - if not is_mentioned_in_batch: - logger.info(f"{self.context.log_prefix} 在专注安静模式下,因未被提及而忽略了消息。") - return True # 消耗消息但不做回复 - except Exception as e: - logger.error(f"{self.context.log_prefix} 检查专注安静群组时出错: {e}") - - # 处理唤醒度逻辑 - if current_sleep_state in [SleepState.SLEEPING, SleepState.PREPARING_SLEEP, SleepState.INSOMNIA]: - self._handle_wakeup_messages(recent_messages) - - # 再次获取最新状态,因为 handle_wakeup 可能导致状态变为 WOKEN_UP - current_sleep_state = self.sleep_manager.get_current_sleep_state() - - if current_sleep_state == SleepState.SLEEPING: - # 只有在纯粹的 SLEEPING 状态下才跳过消息处理 - return True - - if current_sleep_state == SleepState.WOKEN_UP: - logger.info(f"{self.context.log_prefix} 从睡眠中被唤醒,将处理积压的消息。") - - # 根据聊天模式处理新消息 - should_process, interest_value = await self._should_process_messages(recent_messages) - if not should_process: - # 消息数量不足或兴趣不够,等待 - await asyncio.sleep(0.5) - return True # Skip rest of the logic for this iteration - - # Messages should be processed - action_type = await self.cycle_processor.observe(interest_value=interest_value) - - # 管理no_reply计数器 - if action_type != "no_reply": - self.recent_interest_records.clear() - self.context.no_reply_consecutive = 0 - logger.debug(f"{self.context.log_prefix} 执行了{action_type}动作,重置no_reply计数器") - else: # action_type == "no_reply" - self.context.no_reply_consecutive += 1 - self._determine_form_type() - - # 在一轮动作执行完毕后,增加睡眠压力 - if self.context.energy_manager and global_config.sleep_system.enable_insomnia_system: - if action_type not in ["no_reply", "no_action"]: - self.context.energy_manager.increase_sleep_pressure() - - # 如果成功观察,增加能量值并重置累积兴趣值 - self.context.energy_value += 1 / global_config.chat.focus_value - # 重置累积兴趣值,因为消息已经被成功处理 - self.context.breaking_accumulated_interest = 0.0 - logger.info( - f"{self.context.log_prefix} 能量值增加,当前能量值:{self.context.energy_value:.1f},重置累积兴趣值" - ) - - # 更新上一帧的睡眠状态 - self.context.was_sleeping = is_sleeping - - # --- 重新入睡逻辑 --- - # 如果被吵醒了,并且在一定时间内没有新消息,则尝试重新入睡 - if self.sleep_manager.get_current_sleep_state() == SleepState.WOKEN_UP and not has_new_messages: - re_sleep_delay = global_config.sleep_system.re_sleep_delay_minutes * 60 - # 使用 last_message_time 来判断空闲时间 - if time.time() - self.context.last_message_time > re_sleep_delay: - logger.info( - f"{self.context.log_prefix} 已被唤醒且超过 {re_sleep_delay / 60} 分钟无新消息,尝试重新入睡。" - ) - self.sleep_manager.reset_sleep_state_after_wakeup() - - # 保存HFC上下文状态 - self.context.save_context_state() - - return has_new_messages - - def _handle_wakeup_messages(self, messages): - """ - 处理休眠状态下的消息,累积唤醒度 - - Args: - messages: 消息列表 - - 功能说明: - - 区分私聊和群聊消息 - - 检查群聊消息是否艾特了机器人 - - 调用唤醒度管理器累积唤醒度 - - 如果达到阈值则唤醒并进入愤怒状态 - """ - if not self.wakeup_manager: - return - - is_private_chat = self.context.chat_stream.group_info is None if self.context.chat_stream else False - - for message in messages: - is_mentioned = False - - # 检查群聊消息是否艾特了机器人 - if not is_private_chat: - # 最终修复:直接使用消息对象中由上游处理好的 is_mention 字段。 - # 该字段在 message.py 的 MessageRecv._process_single_segment 中被设置。 - if message.get("is_mentioned"): - is_mentioned = True - - # 累积唤醒度 - woke_up = self.wakeup_manager.add_wakeup_value(is_private_chat, is_mentioned) - - if woke_up: - logger.info(f"{self.context.log_prefix} 被消息吵醒,进入愤怒状态!") - break - - def _determine_form_type(self) -> str: - """判断使用哪种形式的no_reply""" - # 检查是否启用breaking模式 - if not getattr(global_config.chat, "enable_breaking_mode", False): - logger.info(f"{self.context.log_prefix} breaking模式已禁用,使用waiting形式") - self.context.focus_energy = 1 - return "waiting" - - # 如果连续no_reply次数少于3次,使用waiting形式 - if self.context.no_reply_consecutive <= 3: - self.context.focus_energy = 1 - return "waiting" - else: - # 使用累积兴趣值而不是最近3次的记录 - total_interest = self.context.breaking_accumulated_interest - - # 计算调整后的阈值 - adjusted_threshold = 1 / global_config.chat.get_current_talk_frequency(self.context.stream_id) - - logger.info( - f"{self.context.log_prefix} 累积兴趣值: {total_interest:.2f}, 调整后阈值: {adjusted_threshold:.2f}" - ) - - # 如果累积兴趣值小于阈值,进入breaking形式 - if total_interest < adjusted_threshold: - logger.info(f"{self.context.log_prefix} 累积兴趣度不足,进入breaking形式") - self.context.focus_energy = random.randint(3, 6) - return "breaking" - else: - logger.info(f"{self.context.log_prefix} 累积兴趣度充足,使用waiting形式") - self.context.focus_energy = 1 - return "waiting" - - async def _should_process_messages(self, new_message: List[Dict[str, Any]]) -> tuple[bool, float]: - """ - 统一判断是否应该处理消息的函数 - 根据当前循环模式和消息内容决定是否继续处理 - """ - if not new_message: - return False, 0.0 - - new_message_count = len(new_message) - - talk_frequency = global_config.chat.get_current_talk_frequency(self.context.stream_id) - - modified_exit_count_threshold = self.context.focus_energy * 0.5 / talk_frequency - modified_exit_interest_threshold = 1.5 / talk_frequency - - # 计算当前批次消息的兴趣值 - batch_interest = 0.0 - for msg_dict in new_message: - interest_value = msg_dict.get("interest_value", 0.0) - if msg_dict.get("processed_plain_text", ""): - batch_interest += interest_value - - # 在breaking形式下累积所有消息的兴趣值 - if new_message_count > 0: - self.context.breaking_accumulated_interest += batch_interest - total_interest = self.context.breaking_accumulated_interest - else: - total_interest = self.context.breaking_accumulated_interest - - if new_message_count >= modified_exit_count_threshold: - # 记录兴趣度到列表 - self.recent_interest_records.append(total_interest) - # 重置累积兴趣值,因为已经达到了消息数量阈值 - self.context.breaking_accumulated_interest = 0.0 - - logger.info( - f"{self.context.log_prefix} 累计消息数量达到{new_message_count}条(>{modified_exit_count_threshold:.1f}),结束等待,累积兴趣值: {total_interest:.2f}" - ) - return True, total_interest / new_message_count - - # 检查累计兴趣值 - if new_message_count > 0: - # 只在兴趣值变化时输出log - if not hasattr(self, "_last_accumulated_interest") or total_interest != self._last_accumulated_interest: - logger.info( - f"{self.context.log_prefix} breaking形式当前累积兴趣值: {total_interest:.2f}, 专注度: {global_config.chat.focus_value:.1f}" - ) - self._last_accumulated_interest = total_interest - if total_interest >= modified_exit_interest_threshold: - # 记录兴趣度到列表 - self.recent_interest_records.append(total_interest) - # 重置累积兴趣值,因为已经达到了兴趣值阈值 - self.context.breaking_accumulated_interest = 0.0 - logger.info( - f"{self.context.log_prefix} 累计兴趣值达到{total_interest:.2f}(>{modified_exit_interest_threshold:.1f}),结束等待" - ) - return True, total_interest / new_message_count - - # 每10秒输出一次等待状态 - if ( - int(time.time() - self.context.last_read_time) > 0 - and int(time.time() - self.context.last_read_time) % 10 == 0 - ): - logger.info( - f"{self.context.log_prefix} 已等待{time.time() - self.context.last_read_time:.0f}秒,累计{new_message_count}条消息,累积兴趣{total_interest:.1f},继续等待..." - ) - await asyncio.sleep(0.5) - - return False, 0.0 diff --git a/src/chat/chat_loop/hfc_context.py b/src/chat/chat_loop/hfc_context.py deleted file mode 100644 index fe5d283ae..000000000 --- a/src/chat/chat_loop/hfc_context.py +++ /dev/null @@ -1,84 +0,0 @@ -from typing import List, Optional, TYPE_CHECKING -import time -from src.chat.message_receive.chat_stream import ChatStream, get_chat_manager -from src.person_info.relationship_builder_manager import RelationshipBuilder -from src.chat.express.expression_learner import ExpressionLearner -from src.chat.planner_actions.action_manager import ActionManager -from src.chat.chat_loop.hfc_utils import CycleDetail -from src.config.config import global_config - -if TYPE_CHECKING: - from .sleep_manager.wakeup_manager import WakeUpManager - from .energy_manager import EnergyManager - from .heartFC_chat import HeartFChatting - from .sleep_manager.sleep_manager import SleepManager - - -class HfcContext: - def __init__(self, chat_id: str): - """ - 初始化HFC聊天上下文 - - Args: - chat_id: 聊天ID标识符 - - 功能说明: - - 存储和管理单个聊天会话的所有状态信息 - - 包含聊天流、关系构建器、表达学习器等核心组件 - - 管理聊天模式、能量值、时间戳等关键状态 - - 提供循环历史记录和当前循环详情的存储 - - 集成唤醒度管理器,处理休眠状态下的唤醒机制 - - Raises: - ValueError: 如果找不到对应的聊天流 - """ - self.stream_id: str = chat_id - self.chat_stream: Optional[ChatStream] = get_chat_manager().get_stream(self.stream_id) - if not self.chat_stream: - raise ValueError(f"无法找到聊天流: {self.stream_id}") - - self.log_prefix = f"[{get_chat_manager().get_stream_name(self.stream_id) or self.stream_id}]" - - self.relationship_builder: Optional[RelationshipBuilder] = None - self.expression_learner: Optional[ExpressionLearner] = None - - self.energy_value = self.chat_stream.energy_value - self.sleep_pressure = self.chat_stream.sleep_pressure - self.was_sleeping = False # 用于检测睡眠状态的切换 - - self.last_message_time = time.time() - self.last_read_time = time.time() - 10 - - # 从聊天流恢复breaking累积兴趣值 - self.breaking_accumulated_interest = getattr(self.chat_stream, "breaking_accumulated_interest", 0.0) - - self.action_manager = ActionManager() - - self.running: bool = False - - self.history_loop: List[CycleDetail] = [] - self.cycle_counter = 0 - self.current_cycle_detail: Optional[CycleDetail] = None - - # 唤醒度管理器 - 延迟初始化以避免循环导入 - self.wakeup_manager: Optional["WakeUpManager"] = None - self.energy_manager: Optional["EnergyManager"] = None - self.sleep_manager: Optional["SleepManager"] = None - - # 从聊天流获取focus_energy,如果没有则使用配置文件中的值 - self.focus_energy = getattr(self.chat_stream, "focus_energy", global_config.chat.focus_value) - self.no_reply_consecutive = 0 - self.total_interest = 0.0 - # breaking形式下的累积兴趣值 - self.breaking_accumulated_interest = 0.0 - # 引用HeartFChatting实例,以便其他组件可以调用其方法 - self.chat_instance: "HeartFChatting" - - def save_context_state(self): - """将当前状态保存到聊天流""" - if self.chat_stream: - self.chat_stream.energy_value = self.energy_value - self.chat_stream.sleep_pressure = self.sleep_pressure - self.chat_stream.focus_energy = self.focus_energy - self.chat_stream.no_reply_consecutive = self.no_reply_consecutive - self.chat_stream.breaking_accumulated_interest = self.breaking_accumulated_interest diff --git a/src/chat/chat_loop/hfc_utils.py b/src/chat/chat_loop/hfc_utils.py deleted file mode 100644 index 32d31fd52..000000000 --- a/src/chat/chat_loop/hfc_utils.py +++ /dev/null @@ -1,172 +0,0 @@ -import time -from typing import Optional, Dict, Any, Union - -from src.common.logger import get_logger -from src.chat.message_receive.chat_stream import get_chat_manager -from src.plugin_system.apis import send_api -from maim_message.message_base import GroupInfo - - -logger = get_logger("hfc") - - -class CycleDetail: - """ - 循环信息记录类 - - 功能说明: - - 记录单次思考循环的详细信息 - - 包含循环ID、思考ID、时间戳等基本信息 - - 存储循环的规划信息和动作信息 - - 提供序列化和转换功能 - """ - - def __init__(self, cycle_id: Union[int, str]): - """ - 初始化循环详情记录 - - Args: - cycle_id: 循环ID,用于标识循环的顺序 - - 功能说明: - - 设置循环基本标识信息 - - 初始化时间戳和计时器 - - 准备循环信息存储容器 - """ - self.cycle_id = cycle_id - self.thinking_id = "" - self.start_time = time.time() - self.end_time: Optional[float] = None - self.timers: Dict[str, float] = {} - - self.loop_plan_info: Dict[str, Any] = {} - self.loop_action_info: Dict[str, Any] = {} - - def to_dict(self) -> Dict[str, Any]: - """ - 将循环信息转换为字典格式 - - Returns: - dict: 包含所有循环信息的字典,已处理循环引用和序列化问题 - - 功能说明: - - 递归转换复杂对象为可序列化格式 - - 防止循环引用导致的无限递归 - - 限制递归深度避免栈溢出 - - 只保留基本数据类型和可序列化的值 - """ - - def convert_to_serializable(obj, depth=0, seen=None): - if seen is None: - seen = set() - - # 防止递归过深 - if depth > 5: # 降低递归深度限制 - return str(obj) - - # 防止循环引用 - obj_id = id(obj) - if obj_id in seen: - return str(obj) - seen.add(obj_id) - - try: - if hasattr(obj, "to_dict"): - # 对于有to_dict方法的对象,直接调用其to_dict方法 - return obj.to_dict() - elif isinstance(obj, dict): - # 对于字典,只保留基本类型和可序列化的值 - return { - k: convert_to_serializable(v, depth + 1, seen) - for k, v in obj.items() - if isinstance(k, (str, int, float, bool)) - } - elif isinstance(obj, (list, tuple)): - # 对于列表和元组,只保留可序列化的元素 - return [ - convert_to_serializable(item, depth + 1, seen) - for item in obj - if not isinstance(item, (dict, list, tuple)) - or isinstance(item, (str, int, float, bool, type(None))) - ] - elif isinstance(obj, (str, int, float, bool, type(None))): - return obj - else: - return str(obj) - finally: - seen.remove(obj_id) - - return { - "cycle_id": self.cycle_id, - "start_time": self.start_time, - "end_time": self.end_time, - "timers": self.timers, - "thinking_id": self.thinking_id, - "loop_plan_info": convert_to_serializable(self.loop_plan_info), - "loop_action_info": convert_to_serializable(self.loop_action_info), - } - - def set_loop_info(self, loop_info: Dict[str, Any]): - """ - 设置循环信息 - - Args: - loop_info: 包含循环规划和动作信息的字典 - - 功能说明: - - 从传入的循环信息中提取规划和动作信息 - - 更新当前循环详情的相关字段 - """ - self.loop_plan_info = loop_info["loop_plan_info"] - self.loop_action_info = loop_info["loop_action_info"] - - -async def send_typing(user_id): - """ - 发送打字状态指示 - - 功能说明: - - 创建内心聊天流(用于状态显示) - - 发送typing状态消息 - - 不存储到消息记录中 - - 用于S4U功能的视觉反馈 - """ - group_info = GroupInfo(platform="amaidesu_default", group_id="114514", group_name="内心") - - chat = await get_chat_manager().get_or_create_stream( - platform="amaidesu_default", - user_info=None, - group_info=group_info, - ) - - from plugin_system.core.event_manager import event_manager - from src.plugins.built_in.napcat_adapter_plugin.event_types import NapcatEvent - # 设置正在输入状态 - await event_manager.trigger_event(NapcatEvent.PERSONAL.SET_INPUT_STATUS,user_id=user_id,event_type=1) - - await send_api.custom_to_stream( - message_type="state", content="typing", stream_id=chat.stream_id, storage_message=False - ) - - -async def stop_typing(): - """ - 停止打字状态指示 - - 功能说明: - - 创建内心聊天流(用于状态显示) - - 发送stop_typing状态消息 - - 不存储到消息记录中 - - 结束S4U功能的视觉反馈 - """ - group_info = GroupInfo(platform="amaidesu_default", group_id="114514", group_name="内心") - - chat = await get_chat_manager().get_or_create_stream( - platform="amaidesu_default", - user_info=None, - group_info=group_info, - ) - - await send_api.custom_to_stream( - message_type="state", content="stop_typing", stream_id=chat.stream_id, storage_message=False - ) diff --git a/src/chat/chat_loop/proactive/events.py b/src/chat/chat_loop/proactive/events.py deleted file mode 100644 index 89a3bc7bb..000000000 --- a/src/chat/chat_loop/proactive/events.py +++ /dev/null @@ -1,14 +0,0 @@ -from dataclasses import dataclass, field -from typing import Optional, Dict, Any - - -@dataclass -class ProactiveTriggerEvent: - """ - 主动思考触发事件的数据类 - """ - - source: str # 触发源的标识,例如 "silence_monitor", "insomnia_manager" - reason: str # 触发的具体原因,例如 "聊天已沉默10分钟", "深夜emo" - metadata: Optional[Dict[str, Any]] = field(default_factory=dict) # 可选的元数据,用于传递额外信息 - related_message_id: Optional[str] = None # 关联的消息ID,用于加载上下文 diff --git a/src/chat/chat_loop/proactive/proactive_thinker.py b/src/chat/chat_loop/proactive/proactive_thinker.py deleted file mode 100644 index d1716e75e..000000000 --- a/src/chat/chat_loop/proactive/proactive_thinker.py +++ /dev/null @@ -1,319 +0,0 @@ -import time -import traceback -from typing import TYPE_CHECKING, Dict, Any - -from src.common.logger import get_logger -from src.plugin_system.base.component_types import ChatMode -from ..hfc_context import HfcContext -from .events import ProactiveTriggerEvent -from src.plugin_system.apis import generator_api -from src.plugin_system.apis.generator_api import process_human_text -from src.schedule.schedule_manager import schedule_manager -from src.plugin_system import tool_api -from src.config.config import global_config -from src.chat.utils.chat_message_builder import get_raw_msg_before_timestamp_with_chat, build_readable_messages_with_id -from src.mood.mood_manager import mood_manager -from src.common.database.sqlalchemy_database_api import store_action_info, db_get -from src.common.database.sqlalchemy_models import Messages - -if TYPE_CHECKING: - from ..cycle_processor import CycleProcessor - -logger = get_logger("hfc") - - -class ProactiveThinker: - """ - 主动思考器,负责处理和执行主动思考事件。 - 当接收到 ProactiveTriggerEvent 时,它会根据事件内容进行一系列决策和操作, - 例如调整情绪、调用规划器生成行动,并最终可能产生一个主动的回复。 - """ - - def __init__(self, context: HfcContext, cycle_processor: "CycleProcessor"): - """ - 初始化主动思考器。 - - Args: - context (HfcContext): HFC聊天上下文对象,提供了当前聊天会话的所有背景信息。 - cycle_processor (CycleProcessor): 循环处理器,用于执行主动思考后产生的动作。 - - 功能说明: - - 接收并处理主动思考事件 (ProactiveTriggerEvent)。 - - 在思考前根据事件类型执行预处理操作,如修改当前情绪状态。 - - 调用行动规划器 (Action Planner) 来决定下一步应该做什么。 - - 如果规划结果是发送消息,则调用生成器API生成回复并发送。 - """ - self.context = context - self.cycle_processor = cycle_processor - - async def think(self, trigger_event: ProactiveTriggerEvent): - """ - 主动思考的统一入口API。 - 这是外部触发主动思考时调用的主要方法。 - - Args: - trigger_event (ProactiveTriggerEvent): 描述触发上下文的事件对象,包含了思考的来源和原因。 - """ - logger.info( - f"{self.context.log_prefix} 接收到主动思考事件: " - f"来源='{trigger_event.source}', 原因='{trigger_event.reason}'" - ) - - try: - # 步骤 1: 根据事件类型执行思考前的准备工作,例如调整情绪。 - await self._prepare_for_thinking(trigger_event) - - # 步骤 2: 执行核心的思考和决策逻辑。 - await self._execute_proactive_thinking(trigger_event) - - except Exception as e: - # 捕获并记录在思考过程中发生的任何异常。 - logger.error(f"{self.context.log_prefix} 主动思考 think 方法执行异常: {e}") - logger.error(traceback.format_exc()) - - async def _prepare_for_thinking(self, trigger_event: ProactiveTriggerEvent): - """ - 根据事件类型,在正式思考前执行准备工作。 - 目前主要是处理来自失眠管理器的事件,并据此调整情绪。 - - Args: - trigger_event (ProactiveTriggerEvent): 触发事件。 - """ - # 目前只处理来自失眠管理器(insomnia_manager)的事件 - if trigger_event.source != "insomnia_manager": - return - - try: - # 获取当前聊天的情绪对象 - mood_obj = mood_manager.get_mood_by_chat_id(self.context.stream_id) - new_mood = None - - # 根据失眠的不同原因设置对应的情绪 - if trigger_event.reason == "low_pressure": - new_mood = "精力过剩,毫无睡意" - elif trigger_event.reason == "random": - new_mood = "深夜emo,胡思乱想" - elif trigger_event.reason == "goodnight": - new_mood = "有点困了,准备睡觉了" - - # 如果成功匹配到了新的情绪,则更新情绪状态 - if new_mood: - mood_obj.mood_state = new_mood - mood_obj.last_change_time = time.time() - logger.info( - f"{self.context.log_prefix} 因 '{trigger_event.reason}'," - f"情绪状态被强制更新为: {mood_obj.mood_state}" - ) - - except Exception as e: - logger.error(f"{self.context.log_prefix} 设置失眠情绪时出错: {e}") - - async def _execute_proactive_thinking(self, trigger_event: ProactiveTriggerEvent): - """ - 执行主动思考的核心逻辑。 - 它会调用规划器来决定是否要采取行动,以及采取什么行动。 - - Args: - trigger_event (ProactiveTriggerEvent): 触发事件。 - """ - try: - actions, _ = await self.cycle_processor.action_planner.plan(mode=ChatMode.PROACTIVE) - action_result = actions[0] if actions else {} - action_type = action_result.get("action_type") - - if action_type == "proactive_reply": - await self._generate_proactive_content_and_send(action_result, trigger_event) - elif action_type not in ["do_nothing", "no_action"]: - await self.cycle_processor._handle_action( - action=action_result["action_type"], - reasoning=action_result.get("reasoning", ""), - action_data=action_result.get("action_data", {}), - cycle_timers={}, - thinking_id="", - action_message=action_result.get("action_message") - ) - else: - logger.info(f"{self.context.log_prefix} 主动思考决策: 保持沉默") - - except Exception as e: - logger.error(f"{self.context.log_prefix} 主动思考执行异常: {e}") - logger.error(traceback.format_exc()) - - - async def _generate_proactive_content_and_send(self, action_result: Dict[str, Any], trigger_event: ProactiveTriggerEvent): - """ - 获取实时信息,构建最终的生成提示词,并生成和发送主动回复。 - - Args: - action_result (Dict[str, Any]): 规划器返回的动作结果。 - trigger_event (ProactiveTriggerEvent): 触发事件。 - """ - try: - topic = action_result.get("action_data", {}).get("topic", "随便聊聊") - logger.info(f"{self.context.log_prefix} 主动思考确定主题: '{topic}'") - - schedule_block = "你今天没有日程安排。" - if global_config.planning_system.schedule_enable: - if current_activity := schedule_manager.get_current_activity(): - schedule_block = f"你当前正在:{current_activity}。" - - news_block = "暂时没有获取到最新资讯。" - if trigger_event.source != "reminder_system": - # 增加搜索前决策 - should_search_prompt = f""" -# 搜索决策 - -## 任务 -判断是否有必要为了话题“{topic}”进行网络搜索。 - -## 判断标准 -- **需要搜索**:时事新闻、知识查询、具体事件等需要外部信息的话题。 -- **无需搜索**:日常关心、个人感受、延续已有对话等不需要外部信息的话题。 - -## 你的决策 -输出`SEARCH`或`SKIP`。 -""" - from src.llm_models.utils_model import LLMRequest - from src.config.config import model_config - - decision_llm = LLMRequest( - model_set=model_config.model_task_config.planner, - request_type="planner" - ) - - decision, _ = await decision_llm.generate_response_async(prompt=should_search_prompt) - - if "SEARCH" in decision: - try: - web_search_tool = tool_api.get_tool_instance("web_search") - if web_search_tool and topic: - try: - search_result_dict = await web_search_tool.execute(function_args={"keyword": topic, "max_results": 10}) - if search_result_dict and not search_result_dict.get("error"): - news_block = search_result_dict.get("content", "未能提取有效资讯。") - elif search_result_dict: - logger.warning(f"{self.context.log_prefix} 网络搜索返回错误: {search_result_dict.get('error')}") - except Exception as e: - logger.error(f"{self.context.log_prefix} 网络搜索执行失败: {e}") - else: - logger.warning(f"{self.context.log_prefix} 未找到 web_search 工具实例或主题为空。") - except Exception as e: - logger.error(f"{self.context.log_prefix} 主动思考时网络搜索失败: {e}") - message_list = get_raw_msg_before_timestamp_with_chat( - chat_id=self.context.stream_id, - timestamp=time.time(), - limit=int(global_config.chat.max_context_size * 0.3), - ) - chat_context_block, _ = build_readable_messages_with_id(messages=message_list) - - from src.llm_models.utils_model import LLMRequest - from src.config.config import model_config - - bot_name = global_config.bot.nickname - - confirmation_prompt = f"""# 主动回复二次确认 - -## 基本信息 -你的名字是{bot_name},准备主动发起关于"{topic}"的话题。 - -## 最近的聊天内容 -{chat_context_block} - -## 合理判断标准 -请检查以下条件,如果**所有条件都合理**就可以回复: - -1. **回应检查**:检查你({bot_name})发送的最后一条消息之后,是否有其他人发言。如果没有,则大概率应该保持沉默。 -2. **话题补充**:只有当你认为准备发起的话题是对上一条无人回应消息的**有价值的补充**时,才可以在上一条消息无人回应的情况下继续发言。 -3. **时间合理性**:当前时间是否在深夜(凌晨2点-6点)这种不适合主动聊天的时段? -4. **内容价值**:这个话题"{topic}"是否有意义,不是完全无关紧要的内容? -5. **重复避免**:你准备说的话题是否与你自己的上一条消息明显重复? -6. **自然性**:在当前上下文中主动提起这个话题是否自然合理? - -## 输出要求 -如果判断应该跳过(比如上一条消息无人回应、深夜时段、无意义话题、重复内容),输出:SKIP_PROACTIVE_REPLY -其他情况都应该输出:PROCEED_TO_REPLY - -请严格按照上述格式输出,不要添加任何解释。""" - - planner_llm = LLMRequest( - model_set=model_config.model_task_config.planner, - request_type="planner" - ) - - confirmation_result, _ = await planner_llm.generate_response_async(prompt=confirmation_prompt) - - if not confirmation_result or "SKIP_PROACTIVE_REPLY" in confirmation_result: - logger.info(f"{self.context.log_prefix} 决策模型二次确认决定跳过主动回复") - return - - bot_name = global_config.bot.nickname - personality = global_config.personality - identity_block = ( - f"你的名字是{bot_name}。\n" - f"关于你:{personality.personality_core},并且{personality.personality_side}。\n" - f"你的身份是{personality.identity},平时说话风格是{personality.reply_style}。" - ) - mood_block = f"你现在的心情是:{mood_manager.get_mood_by_chat_id(self.context.stream_id).mood_state}" - - final_prompt = f""" -## 你的角色 -{identity_block} - -## 你的心情 -{mood_block} - -## 你今天的日程安排 -{schedule_block} - -## 关于你准备讨论的话题"{topic}"的最新信息 -{news_block} - -## 最近的聊天内容 -{chat_context_block} - -## 任务 -你现在想要主动说些什么。话题是"{topic}",但这只是一个参考方向。 - -根据最近的聊天内容,你可以: -- 如果是想关心朋友,就自然地询问他们的情况 -- 如果想起了之前的话题,就问问后来怎么样了 -- 如果有什么想分享的想法,就自然地开启话题 -- 如果只是想闲聊,就随意地说些什么 - -## 要求 -- 像真正的朋友一样,自然地表达关心或好奇 -- 不要过于正式,要口语化和亲切 -- 结合你的角色设定,保持温暖的风格 -- 直接输出你想说的话,不要解释为什么要说 - -请输出一条简短、自然的主动发言。 -""" - - response_text = await generator_api.generate_response_custom( - chat_stream=self.context.chat_stream, - prompt=final_prompt, - request_type="chat.replyer.proactive", - ) - - if response_text: - response_set = process_human_text( - content=response_text, - enable_splitter=global_config.response_splitter.enable, - enable_chinese_typo=global_config.chinese_typo.enable, - ) - await self.cycle_processor.response_handler.send_response( - response_set, time.time(), action_result.get("action_message") - ) - await store_action_info( - chat_stream=self.context.chat_stream, - action_name="proactive_reply", - action_data={"topic": topic, "response": response_text}, - action_prompt_display=f"主动发起对话: {topic}", - action_done=True, - ) - else: - logger.error(f"{self.context.log_prefix} 主动思考生成回复失败。") - - except Exception as e: - logger.error(f"{self.context.log_prefix} 生成主动回复内容时异常: {e}") - logger.error(traceback.format_exc()) diff --git a/src/chat/chat_loop/response_handler.py b/src/chat/chat_loop/response_handler.py deleted file mode 100644 index 9859c76c3..000000000 --- a/src/chat/chat_loop/response_handler.py +++ /dev/null @@ -1,184 +0,0 @@ -import time -import random -from typing import Dict, Any, Tuple - -from src.common.logger import get_logger -from src.plugin_system.apis import send_api, message_api, database_api -from src.person_info.person_info import get_person_info_manager -from .hfc_context import HfcContext - -# 导入反注入系统 - -# 日志记录器 -logger = get_logger("hfc") -anti_injector_logger = get_logger("anti_injector") - - -class ResponseHandler: - """ - 响应处理器类,负责生成和发送机器人的回复。 - """ - def __init__(self, context: HfcContext): - """ - 初始化响应处理器 - - Args: - context: HFC聊天上下文对象 - - 功能说明: - - 负责生成和发送机器人的回复 - - 处理回复的格式化和发送逻辑 - - 管理回复状态和日志记录 - """ - self.context = context - - async def generate_and_send_reply( - self, - response_set, - reply_to_str, - loop_start_time, - action_message, - cycle_timers: Dict[str, float], - thinking_id, - plan_result, - ) -> Tuple[Dict[str, Any], str, Dict[str, float]]: - """ - 生成并发送回复的主方法 - - Args: - response_set: 生成的回复内容集合 - reply_to_str: 回复目标字符串 - loop_start_time: 循环开始时间 - action_message: 动作消息数据 - cycle_timers: 循环计时器 - thinking_id: 思考ID - plan_result: 规划结果 - - Returns: - tuple: (循环信息, 回复文本, 计时器信息) - - 功能说明: - - 发送生成的回复内容 - - 存储动作信息到数据库 - - 构建并返回完整的循环信息 - - 用于上级方法的状态跟踪 - """ - reply_text = await self.send_response(response_set, loop_start_time, action_message) - - person_info_manager = get_person_info_manager() - - # 获取平台信息 - platform = "default" - if self.context.chat_stream: - platform = ( - action_message.get("chat_info_platform") - or action_message.get("user_platform") - or self.context.chat_stream.platform - ) - - # 获取用户信息并生成回复提示 - user_id = action_message.get("user_id", "") - person_id = person_info_manager.get_person_id(platform, user_id) - person_name = await person_info_manager.get_value(person_id, "person_name") - action_prompt_display = f"你对{person_name}进行了回复:{reply_text}" - - # 存储动作信息到数据库 - await database_api.store_action_info( - chat_stream=self.context.chat_stream, - action_build_into_prompt=False, - action_prompt_display=action_prompt_display, - action_done=True, - thinking_id=thinking_id, - action_data={"reply_text": reply_text, "reply_to": reply_to_str}, - action_name="reply", - ) - - # 构建循环信息 - loop_info: Dict[str, Any] = { - "loop_plan_info": { - "action_result": plan_result.get("action_result", {}), - }, - "loop_action_info": { - "action_taken": True, - "reply_text": reply_text, - "command": "", - "taken_time": time.time(), - }, - } - - return loop_info, reply_text, cycle_timers - - async def send_response(self, reply_set, thinking_start_time, message_data) -> str: - """ - 发送回复内容的具体实现 - - Args: - reply_set: 回复内容集合,包含多个回复段 - reply_to: 回复目标 - thinking_start_time: 思考开始时间 - message_data: 消息数据 - - Returns: - str: 完整的回复文本 - - 功能说明: - - 检查是否有新消息需要回复 - - 处理主动思考的"沉默"决定 - - 根据消息数量决定是否添加回复引用 - - 逐段发送回复内容,支持打字效果 - - 正确处理元组格式的回复段 - """ - current_time = time.time() - # 计算新消息数量 - new_message_count = message_api.count_new_messages( - chat_id=self.context.stream_id, start_time=thinking_start_time, end_time=current_time - ) - - # 根据新消息数量决定是否需要引用回复 - need_reply = new_message_count >= random.randint(2, 4) - - reply_text = "" - is_proactive_thinking = (message_data.get("message_type") == "proactive_thinking") if message_data else True - - first_replied = False - for reply_seg in reply_set: - # 调试日志:验证reply_seg的格式 - logger.debug(f"Processing reply_seg type: {type(reply_seg)}, content: {reply_seg}") - - # 修正:正确处理元组格式 (格式为: (type, content)) - if isinstance(reply_seg, tuple) and len(reply_seg) >= 2: - _, data = reply_seg - else: - # 向下兼容:如果已经是字符串,则直接使用 - data = str(reply_seg) - - if isinstance(data, list): - data = "".join(map(str, data)) - reply_text += data - - # 如果是主动思考且内容为“沉默”,则不发送 - if is_proactive_thinking and data.strip() == "沉默": - logger.info(f"{self.context.log_prefix} 主动思考决定保持沉默,不发送消息") - continue - - # 发送第一段回复 - if not first_replied: - await send_api.text_to_stream( - text=data, - stream_id=self.context.stream_id, - reply_to_message=message_data, - set_reply=need_reply, - typing=False, - ) - first_replied = True - else: - # 发送后续回复 - sent_message = await send_api.text_to_stream( - text=data, - stream_id=self.context.stream_id, - reply_to_message=None, - set_reply=False, - typing=True, - ) - - return reply_text diff --git a/src/chat/chat_loop/sleep_manager/notification_sender.py b/src/chat/chat_loop/sleep_manager/notification_sender.py deleted file mode 100644 index 95ee304e9..000000000 --- a/src/chat/chat_loop/sleep_manager/notification_sender.py +++ /dev/null @@ -1,32 +0,0 @@ -from src.common.logger import get_logger -from ..hfc_context import HfcContext - -logger = get_logger("notification_sender") - - -class NotificationSender: - @staticmethod - async def send_goodnight_notification(context: HfcContext): - """发送晚安通知""" - try: - from ..proactive.events import ProactiveTriggerEvent - from ..proactive.proactive_thinker import ProactiveThinker - - event = ProactiveTriggerEvent(source="sleep_manager", reason="goodnight") - proactive_thinker = ProactiveThinker(context, context.chat_instance.cycle_processor) - await proactive_thinker.think(event) - except Exception as e: - logger.error(f"发送晚安通知失败: {e}") - - @staticmethod - async def send_insomnia_notification(context: HfcContext, reason: str): - """发送失眠通知""" - try: - from ..proactive.events import ProactiveTriggerEvent - from ..proactive.proactive_thinker import ProactiveThinker - - event = ProactiveTriggerEvent(source="sleep_manager", reason=reason) - proactive_thinker = ProactiveThinker(context, context.chat_instance.cycle_processor) - await proactive_thinker.think(event) - except Exception as e: - logger.error(f"发送失眠通知失败: {e}") \ No newline at end of file diff --git a/src/chat/chat_loop/sleep_manager/sleep_manager.py b/src/chat/chat_loop/sleep_manager/sleep_manager.py deleted file mode 100644 index 677555aef..000000000 --- a/src/chat/chat_loop/sleep_manager/sleep_manager.py +++ /dev/null @@ -1,304 +0,0 @@ -import asyncio -import random -from datetime import datetime, timedelta, date -from typing import Optional, TYPE_CHECKING - -from src.common.logger import get_logger -from src.config.config import global_config -from .sleep_state import SleepState, SleepStateSerializer -from .time_checker import TimeChecker -from .notification_sender import NotificationSender - -if TYPE_CHECKING: - from .wakeup_manager import WakeUpManager - -logger = get_logger("sleep_manager") - - -class SleepManager: - """ - 睡眠管理器,核心组件之一,负责管理角色的睡眠周期和状态转换。 - 它实现了一个状态机,根据预设的时间表、睡眠压力和随机因素, - 在不同的睡眠状态(如清醒、准备入睡、睡眠、失眠)之间进行切换。 - """ - def __init__(self): - """ - 初始化睡眠管理器。 - """ - self.time_checker = TimeChecker() # 时间检查器,用于判断当前是否处于理论睡眠时间 - self.last_sleep_log_time = 0 # 上次记录睡眠日志的时间戳 - self.sleep_log_interval = 35 # 睡眠日志记录间隔(秒) - - # --- 统一睡眠状态管理 --- - self._current_state: SleepState = SleepState.AWAKE # 当前睡眠状态 - self._sleep_buffer_end_time: Optional[datetime] = None # 睡眠缓冲结束时间,用于状态转换 - self._total_delayed_minutes_today: float = 0.0 # 今天总共延迟入睡的分钟数 - self._last_sleep_check_date: Optional[date] = None # 上次检查睡眠状态的日期 - self._last_fully_slept_log_time: float = 0 # 上次完全进入睡眠状态的时间戳 - self._re_sleep_attempt_time: Optional[datetime] = None # 被吵醒后,尝试重新入睡的时间点 - - # 从本地存储加载上一次的睡眠状态 - self._load_sleep_state() - - def get_current_sleep_state(self) -> SleepState: - """获取当前的睡眠状态。""" - return self._current_state - - def is_sleeping(self) -> bool: - """判断当前是否处于正在睡觉的状态。""" - return self._current_state == SleepState.SLEEPING - - async def update_sleep_state(self, wakeup_manager: Optional["WakeUpManager"] = None): - """ - 更新睡眠状态的核心方法,实现状态机的主要逻辑。 - 该方法会被周期性调用,以检查并更新当前的睡眠状态。 - - Args: - wakeup_manager (Optional["WakeUpManager"]): 唤醒管理器,用于获取睡眠压力等上下文信息。 - """ - # 如果全局禁用了睡眠系统,则强制设置为清醒状态并返回 - if not global_config.sleep_system.enable: - if self._current_state != SleepState.AWAKE: - logger.debug("睡眠系统禁用,强制设为 AWAKE") - self._current_state = SleepState.AWAKE - return - - now = datetime.now() - today = now.date() - - # 跨天处理:如果日期变化,重置每日相关的睡眠状态 - if self._last_sleep_check_date != today: - logger.info(f"新的一天 ({today}),重置睡眠状态。") - self._total_delayed_minutes_today = 0 - self._current_state = SleepState.AWAKE - self._sleep_buffer_end_time = None - self._last_sleep_check_date = today - self._save_sleep_state() - - # 检查当前是否处于理论上的睡眠时间段 - is_in_theoretical_sleep, activity = self.time_checker.is_in_theoretical_sleep_time(now.time()) - - # --- 状态机核心处理逻辑 --- - if self._current_state == SleepState.AWAKE: - if is_in_theoretical_sleep: - self._handle_awake_to_sleep(now, activity, wakeup_manager) - - elif self._current_state == SleepState.PREPARING_SLEEP: - self._handle_preparing_sleep(now, is_in_theoretical_sleep, wakeup_manager) - - elif self._current_state == SleepState.SLEEPING: - self._handle_sleeping(now, is_in_theoretical_sleep, activity, wakeup_manager) - - elif self._current_state == SleepState.INSOMNIA: - self._handle_insomnia(now, is_in_theoretical_sleep) - - elif self._current_state == SleepState.WOKEN_UP: - self._handle_woken_up(now, is_in_theoretical_sleep, wakeup_manager) - - def _handle_awake_to_sleep(self, now: datetime, activity: Optional[str], wakeup_manager: Optional["WakeUpManager"]): - """处理从“清醒”到“准备入睡”的状态转换。""" - if activity: - logger.info(f"进入理论休眠时间 '{activity}',开始进行睡眠决策...") - else: - logger.info("进入理论休眠时间,开始进行睡眠决策...") - - if global_config.sleep_system.enable_flexible_sleep: - # --- 新的弹性睡眠逻辑 --- - if wakeup_manager: - sleep_pressure = wakeup_manager.context.sleep_pressure - pressure_threshold = global_config.sleep_system.flexible_sleep_pressure_threshold - max_delay_minutes = global_config.sleep_system.max_sleep_delay_minutes - - buffer_seconds = 0 - # 如果睡眠压力低于阈值,则计算延迟时间 - if sleep_pressure <= pressure_threshold: - # 压力差,归一化到 (0, 1] - pressure_diff = (pressure_threshold - sleep_pressure) / pressure_threshold - # 延迟分钟数,压力越低,延迟越长 - delay_minutes = int(pressure_diff * max_delay_minutes) - - # 确保总延迟不超过当日最大值 - remaining_delay = max_delay_minutes - self._total_delayed_minutes_today - delay_minutes = min(delay_minutes, remaining_delay) - - if delay_minutes > 0: - # 增加一些随机性 - buffer_seconds = random.randint(int(delay_minutes * 0.8 * 60), int(delay_minutes * 1.2 * 60)) - self._total_delayed_minutes_today += buffer_seconds / 60.0 - logger.info(f"睡眠压力 ({sleep_pressure:.1f}) 较低,延迟 {buffer_seconds / 60:.1f} 分钟入睡。") - else: - # 延迟额度已用完,设置一个较短的准备时间 - buffer_seconds = random.randint(1 * 60, 2 * 60) - logger.info("今日延迟入睡额度已用完,进入短暂准备后入睡。") - else: - # 睡眠压力较高,设置一个较短的准备时间 - buffer_seconds = random.randint(1 * 60, 2 * 60) - logger.info(f"睡眠压力 ({sleep_pressure:.1f}) 较高,将在短暂准备后入睡。") - - # 发送睡前通知 - if global_config.sleep_system.enable_pre_sleep_notification: - asyncio.create_task(NotificationSender.send_goodnight_notification(wakeup_manager.context)) - - self._sleep_buffer_end_time = now + timedelta(seconds=buffer_seconds) - self._current_state = SleepState.PREPARING_SLEEP - logger.info(f"进入准备入睡状态,将在 {buffer_seconds / 60:.1f} 分钟内入睡。") - self._save_sleep_state() - else: - # 无法获取 wakeup_manager,退回旧逻辑 - buffer_seconds = random.randint(1 * 60, 3 * 60) - self._sleep_buffer_end_time = now + timedelta(seconds=buffer_seconds) - self._current_state = SleepState.PREPARING_SLEEP - logger.warning("无法获取 WakeUpManager,弹性睡眠采用默认1-3分钟延迟。") - self._save_sleep_state() - else: - # 非弹性睡眠模式 - if wakeup_manager and global_config.sleep_system.enable_pre_sleep_notification: - asyncio.create_task(NotificationSender.send_goodnight_notification(wakeup_manager.context)) - self._current_state = SleepState.SLEEPING - - - def _handle_preparing_sleep(self, now: datetime, is_in_theoretical_sleep: bool, wakeup_manager: Optional["WakeUpManager"]): - """处理“准备入睡”状态下的逻辑。""" - # 如果在准备期间离开了理论睡眠时间,则取消入睡 - if not is_in_theoretical_sleep: - logger.info("准备入睡期间离开理论休眠时间,取消入睡,恢复清醒。") - self._current_state = SleepState.AWAKE - self._sleep_buffer_end_time = None - self._save_sleep_state() - # 如果缓冲时间结束,则正式进入睡眠状态 - elif self._sleep_buffer_end_time and now >= self._sleep_buffer_end_time: - logger.info("睡眠缓冲期结束,正式进入休眠状态。") - self._current_state = SleepState.SLEEPING - self._last_fully_slept_log_time = now.timestamp() - - # 设置一个随机的延迟,用于触发“睡后失眠”检查 - delay_minutes_range = global_config.sleep_system.insomnia_trigger_delay_minutes - delay_minutes = random.randint(delay_minutes_range[0], delay_minutes_range[1]) - self._sleep_buffer_end_time = now + timedelta(minutes=delay_minutes) - logger.info(f"已设置睡后失眠检查,将在 {delay_minutes} 分钟后触发。") - - self._save_sleep_state() - - def _handle_sleeping(self, now: datetime, is_in_theoretical_sleep: bool, activity: Optional[str], wakeup_manager: Optional["WakeUpManager"]): - """处理“正在睡觉”状态下的逻辑。""" - # 如果理论睡眠时间结束,则自然醒来 - if not is_in_theoretical_sleep: - logger.info("理论休眠时间结束,自然醒来。") - self._current_state = SleepState.AWAKE - self._save_sleep_state() - # 检查是否到了触发“睡后失眠”的时间点 - elif self._sleep_buffer_end_time and now >= self._sleep_buffer_end_time: - if wakeup_manager: - sleep_pressure = wakeup_manager.context.sleep_pressure - pressure_threshold = global_config.sleep_system.flexible_sleep_pressure_threshold - # 检查是否触发失眠 - insomnia_reason = None - if sleep_pressure < pressure_threshold: - insomnia_reason = "low_pressure" - logger.info(f"睡眠压力 ({sleep_pressure:.1f}) 低于阈值 ({pressure_threshold}),触发睡后失眠。") - elif random.random() < getattr(global_config.sleep_system, "random_insomnia_chance", 0.1): - insomnia_reason = "random" - logger.info("随机触发失眠。") - - if insomnia_reason: - self._current_state = SleepState.INSOMNIA - - # 设置失眠的持续时间 - duration_minutes_range = global_config.sleep_system.insomnia_duration_minutes - duration_minutes = random.randint(*duration_minutes_range) - self._sleep_buffer_end_time = now + timedelta(minutes=duration_minutes) - - # 发送失眠通知 - asyncio.create_task(NotificationSender.send_insomnia_notification(wakeup_manager.context, insomnia_reason)) - logger.info(f"进入失眠状态 (原因: {insomnia_reason}),将持续 {duration_minutes} 分钟。") - else: - # 睡眠压力正常,不触发失眠,清除检查时间点 - logger.info(f"睡眠压力 ({sleep_pressure:.1f}) 正常,未触发睡后失眠。") - self._sleep_buffer_end_time = None - self._save_sleep_state() - else: - # 定期记录睡眠日志 - current_timestamp = now.timestamp() - if current_timestamp - self.last_sleep_log_time > self.sleep_log_interval and activity: - logger.info(f"当前处于休眠活动 '{activity}' 中。") - self.last_sleep_log_time = current_timestamp - - def _handle_insomnia(self, now: datetime, is_in_theoretical_sleep: bool): - """处理“失眠”状态下的逻辑。""" - # 如果离开理论睡眠时间,则失眠结束 - if not is_in_theoretical_sleep: - logger.info("已离开理论休眠时间,失眠结束,恢复清醒。") - self._current_state = SleepState.AWAKE - self._sleep_buffer_end_time = None - self._save_sleep_state() - # 如果失眠持续时间已过,则恢复睡眠 - elif self._sleep_buffer_end_time and now >= self._sleep_buffer_end_time: - logger.info("失眠状态持续时间已过,恢复睡眠。") - self._current_state = SleepState.SLEEPING - self._sleep_buffer_end_time = None - self._save_sleep_state() - - def _handle_woken_up(self, now: datetime, is_in_theoretical_sleep: bool, wakeup_manager: Optional["WakeUpManager"]): - """处理“被吵醒”状态下的逻辑。""" - # 如果理论睡眠时间结束,则状态自动结束 - if not is_in_theoretical_sleep: - logger.info("理论休眠时间结束,被吵醒的状态自动结束。") - self._current_state = SleepState.AWAKE - self._re_sleep_attempt_time = None - self._save_sleep_state() - # 到了尝试重新入睡的时间点 - elif self._re_sleep_attempt_time and now >= self._re_sleep_attempt_time: - logger.info("被吵醒后经过一段时间,尝试重新入睡...") - if wakeup_manager: - sleep_pressure = wakeup_manager.context.sleep_pressure - pressure_threshold = global_config.sleep_system.flexible_sleep_pressure_threshold - - # 如果睡眠压力足够,则尝试重新入睡 - if sleep_pressure >= pressure_threshold: - logger.info("睡眠压力足够,从被吵醒状态转换到准备入睡。") - buffer_seconds = random.randint(3 * 60, 8 * 60) - self._sleep_buffer_end_time = now + timedelta(seconds=buffer_seconds) - self._current_state = SleepState.PREPARING_SLEEP - self._re_sleep_attempt_time = None - else: - # 睡眠压力不足,延迟一段时间后再次尝试 - delay_minutes = 15 - self._re_sleep_attempt_time = now + timedelta(minutes=delay_minutes) - logger.info( - f"睡眠压力({sleep_pressure:.1f})仍然较低,暂时保持清醒,在 {delay_minutes} 分钟后再次尝试。" - ) - self._save_sleep_state() - - def reset_sleep_state_after_wakeup(self): - """ - 当角色被用户消息等外部因素唤醒时调用此方法。 - 将状态强制转换为 WOKEN_UP,并设置一个延迟,之后会尝试重新入睡。 - """ - if self._current_state in [SleepState.PREPARING_SLEEP, SleepState.SLEEPING, SleepState.INSOMNIA]: - logger.info("被唤醒,进入 WOKEN_UP 状态!") - self._current_state = SleepState.WOKEN_UP - self._sleep_buffer_end_time = None - re_sleep_delay_minutes = getattr(global_config.sleep_system, "re_sleep_delay_minutes", 10) - self._re_sleep_attempt_time = datetime.now() + timedelta(minutes=re_sleep_delay_minutes) - logger.info(f"将在 {re_sleep_delay_minutes} 分钟后尝试重新入睡。") - self._save_sleep_state() - - def _save_sleep_state(self): - """将当前所有睡眠相关的状态打包并保存到本地存储。""" - state_data = { - "_current_state": self._current_state, - "_sleep_buffer_end_time": self._sleep_buffer_end_time, - "_total_delayed_minutes_today": self._total_delayed_minutes_today, - "_last_sleep_check_date": self._last_sleep_check_date, - "_re_sleep_attempt_time": self._re_sleep_attempt_time, - } - SleepStateSerializer.save(state_data) - - def _load_sleep_state(self): - """从本地存储加载并恢复所有睡眠相关的状态。""" - state_data = SleepStateSerializer.load() - self._current_state = state_data["_current_state"] - self._sleep_buffer_end_time = state_data["_sleep_buffer_end_time"] - self._total_delayed_minutes_today = state_data["_total_delayed_minutes_today"] - self._last_sleep_check_date = state_data["_last_sleep_check_date"] - self._re_sleep_attempt_time = state_data["_re_sleep_attempt_time"] diff --git a/src/chat/chat_loop/sleep_manager/sleep_state.py b/src/chat/chat_loop/sleep_manager/sleep_state.py deleted file mode 100644 index 624521ea0..000000000 --- a/src/chat/chat_loop/sleep_manager/sleep_state.py +++ /dev/null @@ -1,110 +0,0 @@ -from enum import Enum, auto -from datetime import datetime -from src.common.logger import get_logger -from src.manager.local_store_manager import local_storage - -logger = get_logger("sleep_state") - - -class SleepState(Enum): - """ - 定义了角色可能处于的几种睡眠状态。 - 这是一个状态机,用于管理角色的睡眠周期。 - """ - - AWAKE = auto() # 清醒状态 - INSOMNIA = auto() # 失眠状态 - PREPARING_SLEEP = auto() # 准备入睡状态,一个短暂的过渡期 - SLEEPING = auto() # 正在睡觉状态 - WOKEN_UP = auto() # 被吵醒状态 - - -class SleepStateSerializer: - """ - 睡眠状态序列化器。 - 负责将内存中的睡眠状态对象持久化到本地存储(如JSON文件), - 以及在程序启动时从本地存储中恢复状态。 - 这样可以确保即使程序重启,角色的睡眠状态也能得以保留。 - """ - @staticmethod - def save(state_data: dict): - """ - 将当前的睡眠状态数据保存到本地存储。 - - Args: - state_data (dict): 包含睡眠状态信息的字典。 - datetime对象会被转换为时间戳,Enum成员会被转换为其名称字符串。 - """ - try: - # 准备要序列化的数据字典 - state = { - # 保存当前状态的枚举名称 - "current_state": state_data["_current_state"].name, - # 将datetime对象转换为Unix时间戳以便序列化 - "sleep_buffer_end_time_ts": state_data["_sleep_buffer_end_time"].timestamp() - if state_data["_sleep_buffer_end_time"] - else None, - "total_delayed_minutes_today": state_data["_total_delayed_minutes_today"], - # 将date对象转换为ISO格式的字符串 - "last_sleep_check_date_str": state_data["_last_sleep_check_date"].isoformat() - if state_data["_last_sleep_check_date"] - else None, - "re_sleep_attempt_time_ts": state_data["_re_sleep_attempt_time"].timestamp() - if state_data["_re_sleep_attempt_time"] - else None, - } - # 写入本地存储 - local_storage["schedule_sleep_state"] = state - logger.debug(f"已保存睡眠状态: {state}") - except Exception as e: - logger.error(f"保存睡眠状态失败: {e}") - - @staticmethod - def load() -> dict: - """ - 从本地存储加载并解析睡眠状态。 - - Returns: - dict: 包含恢复后睡眠状态信息的字典。 - 如果加载失败或没有找到数据,则返回一个默认的清醒状态。 - """ - # 定义一个默认的状态,以防加载失败 - state_data = { - "_current_state": SleepState.AWAKE, - "_sleep_buffer_end_time": None, - "_total_delayed_minutes_today": 0, - "_last_sleep_check_date": None, - "_re_sleep_attempt_time": None, - } - try: - # 从本地存储读取数据 - state = local_storage["schedule_sleep_state"] - if state and isinstance(state, dict): - # 恢复当前状态枚举 - state_name = state.get("current_state") - if state_name and hasattr(SleepState, state_name): - state_data["_current_state"] = SleepState[state_name] - - # 从时间戳恢复datetime对象 - end_time_ts = state.get("sleep_buffer_end_time_ts") - if end_time_ts: - state_data["_sleep_buffer_end_time"] = datetime.fromtimestamp(end_time_ts) - - # 恢复重新入睡尝试时间 - re_sleep_ts = state.get("re_sleep_attempt_time_ts") - if re_sleep_ts: - state_data["_re_sleep_attempt_time"] = datetime.fromtimestamp(re_sleep_ts) - - # 恢复今日延迟睡眠总分钟数 - state_data["_total_delayed_minutes_today"] = state.get("total_delayed_minutes_today", 0) - - # 从ISO格式字符串恢复date对象 - date_str = state.get("last_sleep_check_date_str") - if date_str: - state_data["_last_sleep_check_date"] = datetime.fromisoformat(date_str).date() - - logger.info(f"成功从本地存储加载睡眠状态: {state}") - except Exception as e: - # 如果加载过程中出现任何问题,记录警告并返回默认状态 - logger.warning(f"加载睡眠状态失败,将使用默认值: {e}") - return state_data \ No newline at end of file diff --git a/src/chat/chat_loop/sleep_manager/time_checker.py b/src/chat/chat_loop/sleep_manager/time_checker.py deleted file mode 100644 index cbe3d45e8..000000000 --- a/src/chat/chat_loop/sleep_manager/time_checker.py +++ /dev/null @@ -1,108 +0,0 @@ -from datetime import datetime, time, timedelta -from typing import Optional, List, Dict, Any -import random - -from src.common.logger import get_logger -from src.config.config import global_config -from src.schedule.schedule_manager import schedule_manager - -logger = get_logger("time_checker") - - -class TimeChecker: - def __init__(self): - # 缓存当天的偏移量,确保一天内使用相同的偏移量 - self._daily_sleep_offset: int = 0 - self._daily_wake_offset: int = 0 - self._offset_date = None - - def _get_daily_offsets(self): - """获取当天的睡眠和起床时间偏移量,每天生成一次""" - today = datetime.now().date() - - # 如果是新的一天,重新生成偏移量 - if self._offset_date != today: - sleep_offset_range = global_config.sleep_system.sleep_time_offset_minutes - wake_offset_range = global_config.sleep_system.wake_up_time_offset_minutes - - # 生成 ±offset_range 范围内的随机偏移量 - self._daily_sleep_offset = random.randint(-sleep_offset_range, sleep_offset_range) - self._daily_wake_offset = random.randint(-wake_offset_range, wake_offset_range) - self._offset_date = today - - logger.debug(f"生成新的每日偏移量 - 睡觉时间偏移: {self._daily_sleep_offset}分钟, 起床时间偏移: {self._daily_wake_offset}分钟") - - return self._daily_sleep_offset, self._daily_wake_offset - - def get_today_schedule(self) -> Optional[List[Dict[str, Any]]]: - """从全局 ScheduleManager 获取今天的日程安排。""" - return schedule_manager.today_schedule - - def is_in_theoretical_sleep_time(self, now_time: time) -> tuple[bool, Optional[str]]: - if global_config.sleep_system.sleep_by_schedule: - if self.get_today_schedule(): - return self._is_in_schedule_sleep_time(now_time) - else: - return self._is_in_sleep_time(now_time) - else: - return self._is_in_sleep_time(now_time) - - def _is_in_schedule_sleep_time(self, now_time: time) -> tuple[bool, Optional[str]]: - """检查当前时间是否落在日程表的任何一个睡眠活动中""" - sleep_keywords = ["休眠", "睡觉", "梦乡"] - today_schedule = self.get_today_schedule() - if today_schedule: - for event in today_schedule: - try: - activity = event.get("activity", "").strip() - time_range = event.get("time_range") - - if not activity or not time_range: - continue - - if any(keyword in activity for keyword in sleep_keywords): - start_str, end_str = time_range.split("-") - start_time = datetime.strptime(start_str.strip(), "%H:%M").time() - end_time = datetime.strptime(end_str.strip(), "%H:%M").time() - - if start_time <= end_time: # 同一天 - if start_time <= now_time < end_time: - return True, activity - else: # 跨天 - if now_time >= start_time or now_time < end_time: - return True, activity - except (ValueError, KeyError, AttributeError) as e: - logger.warning(f"解析日程事件时出错: {event}, 错误: {e}") - continue - return False, None - - def _is_in_sleep_time(self, now_time: time) -> tuple[bool, Optional[str]]: - """检查当前时间是否在固定的睡眠时间内(应用偏移量)""" - try: - start_time_str = global_config.sleep_system.fixed_sleep_time - end_time_str = global_config.sleep_system.fixed_wake_up_time - - # 获取当天的偏移量 - sleep_offset, wake_offset = self._get_daily_offsets() - - # 解析基础时间 - base_start_time = datetime.strptime(start_time_str, "%H:%M") - base_end_time = datetime.strptime(end_time_str, "%H:%M") - - # 应用偏移量 - actual_start_time = (base_start_time + timedelta(minutes=sleep_offset)).time() - actual_end_time = (base_end_time + timedelta(minutes=wake_offset)).time() - - logger.debug(f"固定睡眠时间检查 - 基础时间: {start_time_str}-{end_time_str}, " - f"偏移后时间: {actual_start_time.strftime('%H:%M')}-{actual_end_time.strftime('%H:%M')}, " - f"当前时间: {now_time.strftime('%H:%M')}") - - if actual_start_time <= actual_end_time: - if actual_start_time <= now_time < actual_end_time: - return True, f"固定睡眠时间(偏移后: {actual_start_time.strftime('%H:%M')}-{actual_end_time.strftime('%H:%M')})" - else: - if now_time >= actual_start_time or now_time < actual_end_time: - return True, f"固定睡眠时间(偏移后: {actual_start_time.strftime('%H:%M')}-{actual_end_time.strftime('%H:%M')})" - except ValueError as e: - logger.error(f"固定的睡眠时间格式不正确,请使用 HH:MM 格式: {e}") - return False, None \ No newline at end of file diff --git a/src/chat/chat_loop/sleep_manager/wakeup_manager.py b/src/chat/chat_loop/sleep_manager/wakeup_manager.py deleted file mode 100644 index 28c91dd3d..000000000 --- a/src/chat/chat_loop/sleep_manager/wakeup_manager.py +++ /dev/null @@ -1,232 +0,0 @@ -import asyncio -import time -from typing import Optional -from src.common.logger import get_logger -from src.config.config import global_config -from src.manager.local_store_manager import local_storage -from ..hfc_context import HfcContext - -logger = get_logger("wakeup") - - -class WakeUpManager: - def __init__(self, context: HfcContext): - """ - 初始化唤醒度管理器 - - Args: - context: HFC聊天上下文对象 - - 功能说明: - - 管理休眠状态下的唤醒度累积 - - 处理唤醒度的自然衰减 - - 控制愤怒状态的持续时间 - """ - self.context = context - self.wakeup_value = 0.0 # 当前唤醒度 - self.is_angry = False # 是否处于愤怒状态 - self.angry_start_time = 0.0 # 愤怒状态开始时间 - self.last_decay_time = time.time() # 上次衰减时间 - self._decay_task: Optional[asyncio.Task] = None - self.last_log_time = 0 - self.log_interval = 30 - - # 从配置文件获取参数 - sleep_config = global_config.sleep_system - self.wakeup_threshold = sleep_config.wakeup_threshold - self.private_message_increment = sleep_config.private_message_increment - self.group_mention_increment = sleep_config.group_mention_increment - self.decay_rate = sleep_config.decay_rate - self.decay_interval = sleep_config.decay_interval - self.angry_duration = sleep_config.angry_duration - self.enabled = sleep_config.enable - self.angry_prompt = sleep_config.angry_prompt - - self._load_wakeup_state() - - def _get_storage_key(self) -> str: - """获取当前聊天流的本地存储键""" - return f"wakeup_manager_state_{self.context.stream_id}" - - def _load_wakeup_state(self): - """从本地存储加载状态""" - state = local_storage[self._get_storage_key()] - if state and isinstance(state, dict): - self.wakeup_value = state.get("wakeup_value", 0.0) - self.is_angry = state.get("is_angry", False) - self.angry_start_time = state.get("angry_start_time", 0.0) - logger.info(f"{self.context.log_prefix} 成功从本地存储加载唤醒状态: {state}") - else: - logger.info(f"{self.context.log_prefix} 未找到本地唤醒状态,将使用默认值初始化。") - - def _save_wakeup_state(self): - """将当前状态保存到本地存储""" - state = { - "wakeup_value": self.wakeup_value, - "is_angry": self.is_angry, - "angry_start_time": self.angry_start_time, - } - local_storage[self._get_storage_key()] = state - logger.debug(f"{self.context.log_prefix} 已将唤醒状态保存到本地存储: {state}") - - async def start(self): - """启动唤醒度管理器""" - if not self.enabled: - logger.info(f"{self.context.log_prefix} 唤醒度系统已禁用,跳过启动") - return - - if not self._decay_task: - self._decay_task = asyncio.create_task(self._decay_loop()) - self._decay_task.add_done_callback(self._handle_decay_completion) - logger.info(f"{self.context.log_prefix} 唤醒度管理器已启动") - - async def stop(self): - """停止唤醒度管理器""" - if self._decay_task and not self._decay_task.done(): - self._decay_task.cancel() - await asyncio.sleep(0) - logger.info(f"{self.context.log_prefix} 唤醒度管理器已停止") - - def _handle_decay_completion(self, task: asyncio.Task): - """处理衰减任务完成""" - try: - if exception := task.exception(): - logger.error(f"{self.context.log_prefix} 唤醒度衰减任务异常: {exception}") - else: - logger.info(f"{self.context.log_prefix} 唤醒度衰减任务正常结束") - except asyncio.CancelledError: - logger.info(f"{self.context.log_prefix} 唤醒度衰减任务被取消") - - async def _decay_loop(self): - """唤醒度衰减循环""" - while self.context.running: - await asyncio.sleep(self.decay_interval) - - current_time = time.time() - - # 检查愤怒状态是否过期 - if self.is_angry and current_time - self.angry_start_time >= self.angry_duration: - self.is_angry = False - # 通知情绪管理系统清除愤怒状态 - from src.mood.mood_manager import mood_manager - - mood_manager.clear_angry_from_wakeup(self.context.stream_id) - logger.info(f"{self.context.log_prefix} 愤怒状态结束,恢复正常") - self._save_wakeup_state() - - # 唤醒度自然衰减 - if self.wakeup_value > 0: - old_value = self.wakeup_value - self.wakeup_value = max(0, self.wakeup_value - self.decay_rate) - if old_value != self.wakeup_value: - logger.debug(f"{self.context.log_prefix} 唤醒度衰减: {old_value:.1f} -> {self.wakeup_value:.1f}") - self._save_wakeup_state() - - def add_wakeup_value(self, is_private_chat: bool, is_mentioned: bool = False) -> bool: - """ - 增加唤醒度值 - - Args: - is_private_chat: 是否为私聊 - is_mentioned: 是否被艾特(仅群聊有效) - - Returns: - bool: 是否达到唤醒阈值 - """ - # 如果系统未启用,直接返回 - if not self.enabled: - return False - - # 只有在休眠且非失眠状态下才累积唤醒度 - from .sleep_state import SleepState - - sleep_manager = self.context.sleep_manager - if not sleep_manager: - return False - - current_sleep_state = sleep_manager.get_current_sleep_state() - if current_sleep_state != SleepState.SLEEPING: - return False - - old_value = self.wakeup_value - - if is_private_chat: - # 私聊每条消息都增加唤醒度 - self.wakeup_value += self.private_message_increment - logger.debug(f"{self.context.log_prefix} 私聊消息增加唤醒度: +{self.private_message_increment}") - elif is_mentioned: - # 群聊只有被艾特才增加唤醒度 - self.wakeup_value += self.group_mention_increment - logger.debug(f"{self.context.log_prefix} 群聊艾特增加唤醒度: +{self.group_mention_increment}") - else: - # 群聊未被艾特,不增加唤醒度 - return False - - current_time = time.time() - if current_time - self.last_log_time > self.log_interval: - logger.info( - f"{self.context.log_prefix} 唤醒度变化: {old_value:.1f} -> {self.wakeup_value:.1f} (阈值: {self.wakeup_threshold})" - ) - self.last_log_time = current_time - else: - logger.debug( - f"{self.context.log_prefix} 唤醒度变化: {old_value:.1f} -> {self.wakeup_value:.1f} (阈值: {self.wakeup_threshold})" - ) - - # 检查是否达到唤醒阈值 - if self.wakeup_value >= self.wakeup_threshold: - self._trigger_wakeup() - return True - - self._save_wakeup_state() - return False - - def _trigger_wakeup(self): - """触发唤醒,进入愤怒状态""" - self.is_angry = True - self.angry_start_time = time.time() - self.wakeup_value = 0.0 # 重置唤醒度 - - self._save_wakeup_state() - - # 通知情绪管理系统进入愤怒状态 - from src.mood.mood_manager import mood_manager - - mood_manager.set_angry_from_wakeup(self.context.stream_id) - - # 通知SleepManager重置睡眠状态 - if self.context.sleep_manager: - self.context.sleep_manager.reset_sleep_state_after_wakeup() - - logger.info(f"{self.context.log_prefix} 唤醒度达到阈值({self.wakeup_threshold}),被吵醒进入愤怒状态!") - - def get_angry_prompt_addition(self) -> str: - """获取愤怒状态下的提示词补充""" - if self.is_angry: - return self.angry_prompt - return "" - - def is_in_angry_state(self) -> bool: - """检查是否处于愤怒状态""" - if self.is_angry: - current_time = time.time() - if current_time - self.angry_start_time >= self.angry_duration: - self.is_angry = False - # 通知情绪管理系统清除愤怒状态 - from src.mood.mood_manager import mood_manager - - mood_manager.clear_angry_from_wakeup(self.context.stream_id) - logger.info(f"{self.context.log_prefix} 愤怒状态自动过期") - return False - return self.is_angry - - def get_status_info(self) -> dict: - """获取当前状态信息""" - return { - "wakeup_value": self.wakeup_value, - "wakeup_threshold": self.wakeup_threshold, - "is_angry": self.is_angry, - "angry_remaining_time": max(0, self.angry_duration - (time.time() - self.angry_start_time)) - if self.is_angry - else 0, - } diff --git a/src/chat/frequency_analyzer/trigger.py b/src/chat/frequency_analyzer/trigger.py index d62547306..1558c923a 100644 --- a/src/chat/frequency_analyzer/trigger.py +++ b/src/chat/frequency_analyzer/trigger.py @@ -20,9 +20,8 @@ from datetime import datetime from typing import Dict, Optional from src.common.logger import get_logger -from src.chat.chat_loop.proactive.events import ProactiveTriggerEvent -from src.chat.heart_flow.heartflow import heartflow -from src.chat.chat_loop.sleep_manager.sleep_manager import SleepManager +from src.chat.affinity_flow.afc_manager import afc_manager +# TODO: 需要重新实现主动思考和睡眠管理功能 from .analyzer import chat_frequency_analyzer logger = get_logger("FrequencyBasedTrigger") @@ -39,8 +38,8 @@ class FrequencyBasedTrigger: 一个周期性任务,根据聊天频率分析结果来触发主动思考。 """ - def __init__(self, sleep_manager: SleepManager): - self._sleep_manager = sleep_manager + def __init__(self): + # TODO: 需要重新实现睡眠管理器 self._task: Optional[asyncio.Task] = None # 记录上次为用户触发的时间,用于冷却控制 # 格式: { "chat_id": timestamp } @@ -53,14 +52,15 @@ class FrequencyBasedTrigger: await asyncio.sleep(TRIGGER_CHECK_INTERVAL_SECONDS) logger.debug("开始执行频率触发器检查...") - # 1. 检查角色是否清醒 - if self._sleep_manager.is_sleeping(): - logger.debug("角色正在睡眠,跳过本次频率触发检查。") - continue + # 1. TODO: 检查角色是否清醒 - 需要重新实现睡眠状态检查 + # 暂时跳过睡眠检查 + # if self._sleep_manager.is_sleeping(): + # logger.debug("角色正在睡眠,跳过本次频率触发检查。") + # continue # 2. 获取所有已知的聊天ID - # 【注意】这里我们假设所有 subheartflow 的 ID 就是 chat_id - all_chat_ids = list(heartflow.subheartflows.keys()) + # 亲和力流系统中聊天ID直接从管理器获取 + all_chat_ids = list(afc_manager.affinity_flow_chatters.keys()) if not all_chat_ids: continue @@ -75,25 +75,24 @@ class FrequencyBasedTrigger: # 4. 检查当前是否是该用户的高峰聊天时间 if chat_frequency_analyzer.is_in_peak_time(chat_id, now): - sub_heartflow = await heartflow.get_or_create_subheartflow(chat_id) - if not sub_heartflow: - logger.warning(f"无法为 {chat_id} 获取或创建 sub_heartflow。") + # 5. 检查用户当前是否已有活跃的处理任务 + # 亲和力流系统不直接提供循环状态,通过检查最后活动时间来判断是否忙碌 + chatter = afc_manager.get_or_create_chatter(chat_id) + if not chatter: + logger.warning(f"无法为 {chat_id} 获取或创建亲和力聊天处理器。") continue - # 5. 检查用户当前是否已有活跃的思考或回复任务 - cycle_detail = sub_heartflow.heart_fc_instance.context.current_cycle_detail - if cycle_detail and not cycle_detail.end_time: - logger.debug(f"用户 {chat_id} 的聊天循环正忙(仍在周期 {cycle_detail.cycle_id} 中),本次不触发。") + # 检查是否在活跃状态(最近1分钟内有活动) + current_time = time.time() + if current_time - chatter.get_activity_time() < 60: + logger.debug(f"用户 {chat_id} 的亲和力处理器正忙,本次不触发。") continue - - logger.info(f"检测到用户 {chat_id} 处于聊天高峰期,且聊天循环空闲,准备触发主动思考。") + + logger.info(f"检测到用户 {chat_id} 处于聊天高峰期,且处理器空闲,准备触发主动思考。") - # 6. 直接调用 proactive_thinker - event = ProactiveTriggerEvent( - source="frequency_analyzer", - reason="User is in a high-frequency chat period." - ) - await sub_heartflow.heart_fc_instance.proactive_thinker.think(event) + # 6. TODO: 亲和力流系统的主动思考机制需要另行实现 + # 目前先记录日志,等待后续实现 + logger.info(f"用户 {chat_id} 处于高峰期,但亲和力流的主动思考功能暂未实现") # 7. 更新触发时间,进入冷却 self._last_triggered[chat_id] = time.time() diff --git a/src/chat/heart_flow/heartflow.py b/src/chat/heart_flow/heartflow.py deleted file mode 100644 index 111b37e64..000000000 --- a/src/chat/heart_flow/heartflow.py +++ /dev/null @@ -1,40 +0,0 @@ -import traceback -from typing import Any, Optional, Dict - -from src.common.logger import get_logger -from src.chat.heart_flow.sub_heartflow import SubHeartflow -from src.chat.message_receive.chat_stream import get_chat_manager - -logger = get_logger("heartflow") - - -class Heartflow: - """主心流协调器,负责初始化并协调聊天""" - - def __init__(self): - self.subheartflows: Dict[Any, "SubHeartflow"] = {} - - async def get_or_create_subheartflow(self, subheartflow_id: Any) -> Optional["SubHeartflow"]: - """获取或创建一个新的SubHeartflow实例""" - if subheartflow_id in self.subheartflows: - if subflow := self.subheartflows.get(subheartflow_id): - return subflow - - try: - new_subflow = SubHeartflow(subheartflow_id) - - await new_subflow.initialize() - - # 注册子心流 - self.subheartflows[subheartflow_id] = new_subflow - heartflow_name = get_chat_manager().get_stream_name(subheartflow_id) or subheartflow_id - logger.info(f"[{heartflow_name}] 开始接收消息") - - return new_subflow - except Exception as e: - logger.error(f"创建子心流 {subheartflow_id} 失败: {e}", exc_info=True) - traceback.print_exc() - return None - - -heartflow = Heartflow() diff --git a/src/chat/heart_flow/heartflow_message_processor.py b/src/chat/heart_flow/heartflow_message_processor.py deleted file mode 100644 index c68df532c..000000000 --- a/src/chat/heart_flow/heartflow_message_processor.py +++ /dev/null @@ -1,180 +0,0 @@ -import asyncio -import re -import math -import traceback -from datetime import datetime - -from typing import Tuple, TYPE_CHECKING - -from src.config.config import global_config -from src.chat.memory_system.Hippocampus import hippocampus_manager -from src.chat.message_receive.message import MessageRecv -from src.chat.message_receive.storage import MessageStorage -from src.chat.heart_flow.heartflow import heartflow -from src.chat.utils.utils import is_mentioned_bot_in_message -from src.chat.utils.timer_calculator import Timer -from src.chat.utils.chat_message_builder import replace_user_references_sync -from src.common.logger import get_logger -from src.person_info.relationship_manager import get_relationship_manager -from src.mood.mood_manager import mood_manager - -if TYPE_CHECKING: - from src.chat.heart_flow.sub_heartflow import SubHeartflow - -logger = get_logger("chat") - - -async def _process_relationship(message: MessageRecv) -> None: - """处理用户关系逻辑 - - Args: - message: 消息对象,包含用户信息 - """ - platform = message.message_info.platform - user_id = message.message_info.user_info.user_id # type: ignore - nickname = message.message_info.user_info.user_nickname # type: ignore - cardname = message.message_info.user_info.user_cardname or nickname # type: ignore - - relationship_manager = get_relationship_manager() - is_known = await relationship_manager.is_known_some_one(platform, user_id) - - if not is_known: - logger.info(f"首次认识用户: {nickname}") - await relationship_manager.first_knowing_some_one(platform, user_id, nickname, cardname) # type: ignore - - -async def _calculate_interest(message: MessageRecv) -> Tuple[float, bool, list[str]]: - """计算消息的兴趣度 - - Args: - message: 待处理的消息对象 - - Returns: - Tuple[float, bool, list[str]]: (兴趣度, 是否被提及, 关键词) - """ - is_mentioned, _ = is_mentioned_bot_in_message(message) - interested_rate = 0.0 - - with Timer("记忆激活"): - interested_rate, keywords = await hippocampus_manager.get_activate_from_text( - message.processed_plain_text, - max_depth=4, - fast_retrieval=False, - ) - message.key_words = keywords - message.key_words_lite = keywords - logger.debug(f"记忆激活率: {interested_rate:.2f}, 关键词: {keywords}") - - text_len = len(message.processed_plain_text) - # 根据文本长度分布调整兴趣度,采用分段函数实现更精确的兴趣度计算 - # 基于实际分布:0-5字符(26.57%), 6-10字符(27.18%), 11-20字符(22.76%), 21-30字符(10.33%), 31+字符(13.86%) - - if text_len == 0: - base_interest = 0.01 # 空消息最低兴趣度 - elif text_len <= 5: - # 1-5字符:线性增长 0.01 -> 0.03 - base_interest = 0.01 + (text_len - 1) * (0.03 - 0.01) / 4 - elif text_len <= 10: - # 6-10字符:线性增长 0.03 -> 0.06 - base_interest = 0.03 + (text_len - 5) * (0.06 - 0.03) / 5 - elif text_len <= 20: - # 11-20字符:线性增长 0.06 -> 0.12 - base_interest = 0.06 + (text_len - 10) * (0.12 - 0.06) / 10 - elif text_len <= 30: - # 21-30字符:线性增长 0.12 -> 0.18 - base_interest = 0.12 + (text_len - 20) * (0.18 - 0.12) / 10 - elif text_len <= 50: - # 31-50字符:线性增长 0.18 -> 0.22 - base_interest = 0.18 + (text_len - 30) * (0.22 - 0.18) / 20 - elif text_len <= 100: - # 51-100字符:线性增长 0.22 -> 0.26 - base_interest = 0.22 + (text_len - 50) * (0.26 - 0.22) / 50 - else: - # 100+字符:对数增长 0.26 -> 0.3,增长率递减 - base_interest = 0.26 + (0.3 - 0.26) * (math.log10(text_len - 99) / math.log10(901)) # 1000-99=901 - - # 确保在范围内 - base_interest = min(max(base_interest, 0.01), 0.3) - - interested_rate += base_interest - - if is_mentioned: - interest_increase_on_mention = 1 - interested_rate += interest_increase_on_mention - - return interested_rate, is_mentioned, keywords - - -class HeartFCMessageReceiver: - """心流处理器,负责处理接收到的消息并计算兴趣度""" - - def __init__(self): - """初始化心流处理器,创建消息存储实例""" - self.storage = MessageStorage() - - async def process_message(self, message: MessageRecv) -> None: - """处理接收到的原始消息数据 - - 主要流程: - 1. 消息解析与初始化 - 2. 消息缓冲处理 - 4. 过滤检查 - 5. 兴趣度计算 - 6. 关系处理 - - Args: - message_data: 原始消息字符串 - """ - try: - # 1. 消息解析与初始化 - userinfo = message.message_info.user_info - chat = message.chat_stream - - # 2. 兴趣度计算与更新 - interested_rate, is_mentioned, keywords = await _calculate_interest(message) - message.interest_value = interested_rate - message.is_mentioned = is_mentioned - - await self.storage.store_message(message, chat) - - subheartflow: SubHeartflow = await heartflow.get_or_create_subheartflow(chat.stream_id) # type: ignore - - # subheartflow.add_message_to_normal_chat_cache(message, interested_rate, is_mentioned) - if global_config.mood.enable_mood: - chat_mood = mood_manager.get_mood_by_chat_id(subheartflow.chat_id) - asyncio.create_task(chat_mood.update_mood_by_message(message, interested_rate)) - - # 3. 日志记录 - mes_name = chat.group_info.group_name if chat.group_info else "私聊" - # current_time = time.strftime("%H:%M:%S", time.localtime(message.message_info.time)) - current_talk_frequency = global_config.chat.get_current_talk_frequency(chat.stream_id) - - # 如果消息中包含图片标识,则将 [picid:...] 替换为 [图片] - picid_pattern = r"\[picid:([^\]]+)\]" - processed_plain_text = re.sub(picid_pattern, "[图片]", message.processed_plain_text) - - # 应用用户引用格式替换,将回复和@格式转换为可读格式 - processed_plain_text = replace_user_references_sync( - processed_plain_text, - message.message_info.platform, # type: ignore - replace_bot_name=True, - ) - - if keywords: - logger.info( - f"[{mes_name}]{userinfo.user_nickname}:{processed_plain_text}[兴趣度:{interested_rate:.2f}][关键词:{keywords}]" - ) # type: ignore - else: - logger.info( - f"[{mes_name}]{userinfo.user_nickname}:{processed_plain_text}[兴趣度:{interested_rate:.2f}]" - ) # type: ignore - - logger.debug(f"[{mes_name}][当前时段回复频率: {current_talk_frequency}]") - - # 4. 关系处理 - if global_config.relationship.enable_relationship: - await _process_relationship(message) - - except Exception as e: - logger.error(f"消息处理失败: {e}") - print(traceback.format_exc()) diff --git a/src/chat/heart_flow/sub_heartflow.py b/src/chat/heart_flow/sub_heartflow.py deleted file mode 100644 index 275a25a57..000000000 --- a/src/chat/heart_flow/sub_heartflow.py +++ /dev/null @@ -1,41 +0,0 @@ -from rich.traceback import install - -from src.common.logger import get_logger -from src.chat.message_receive.chat_stream import get_chat_manager -from src.chat.chat_loop.heartFC_chat import HeartFChatting -from src.chat.utils.utils import get_chat_type_and_target_info - -logger = get_logger("sub_heartflow") - -install(extra_lines=3) - - -class SubHeartflow: - def __init__( - self, - subheartflow_id, - ): - """子心流初始化函数 - - Args: - subheartflow_id: 子心流唯一标识符 - """ - # 基础属性,两个值是一样的 - self.subheartflow_id = subheartflow_id - self.chat_id = subheartflow_id - - self.is_group_chat, self.chat_target_info = get_chat_type_and_target_info(self.chat_id) - self.log_prefix = get_chat_manager().get_stream_name(self.subheartflow_id) or self.subheartflow_id - - # focus模式退出冷却时间管理 - self.last_focus_exit_time: float = 0 # 上次退出focus模式的时间 - - # 随便水群 normal_chat 和 认真水群 focus_chat 实例 - # CHAT模式激活 随便水群 FOCUS模式激活 认真水群 - self.heart_fc_instance: HeartFChatting = HeartFChatting( - chat_id=self.subheartflow_id, - ) # 该sub_heartflow的HeartFChatting实例 - - async def initialize(self): - """异步初始化方法,创建兴趣流并确定聊天类型""" - await self.heart_fc_instance.start() diff --git a/src/chat/message_receive/bot.py b/src/chat/message_receive/bot.py index 67c56be2a..6e593035c 100644 --- a/src/chat/message_receive/bot.py +++ b/src/chat/message_receive/bot.py @@ -11,7 +11,7 @@ from src.mood.mood_manager import mood_manager # 导入情绪管理器 from src.chat.message_receive.chat_stream import get_chat_manager, ChatStream from src.chat.message_receive.message import MessageRecv, MessageRecvS4U from src.chat.message_receive.storage import MessageStorage -from src.chat.heart_flow.heartflow_message_processor import HeartFCMessageReceiver +from src.chat.affinity_flow.afc_manager import afc_manager from src.chat.utils.prompt import Prompt, global_prompt_manager from src.plugin_system.core import component_registry, event_manager, global_announcement_manager from src.plugin_system.base import BaseCommand, EventType @@ -73,7 +73,7 @@ class ChatBot: self.bot = None # bot 实例引用 self._started = False self.mood_manager = mood_manager # 获取情绪管理器单例 - self.heartflow_message_receiver = HeartFCMessageReceiver() # 新增 + # 亲和力流消息处理器 - 直接使用全局afc_manager self.s4u_message_processor = S4UMessageProcessor() @@ -398,10 +398,7 @@ class ChatBot: # print(message_data) # logger.debug(str(message_data)) message = MessageRecv(message_data) - - if await self.handle_notice_message(message): - ... - + group_info = message.message_info.group_info user_info = message.message_info.user_info if message.message_info.additional_config: @@ -467,7 +464,13 @@ class ChatBot: template_group_name = None async def preprocess(): - await self.heartflow_message_receiver.process_message(message) + # 使用亲和力流系统处理消息 + message_data = { + "message_info": message.message_info.__dict__, + "processed_plain_text": message.processed_plain_text, + "chat_stream": message.chat_stream.__dict__ if message.chat_stream else None + } + await afc_manager.process_message(message.chat_stream.stream_id, message_data) if template_group_name: async with global_prompt_manager.async_message_scope(template_group_name):