diff --git a/interest_monitor_gui.py b/interest_monitor_gui.py index 5b19d4808..c713cc122 100644 --- a/interest_monitor_gui.py +++ b/interest_monitor_gui.py @@ -37,24 +37,57 @@ class InterestMonitorApp: # 使用 deque 来存储有限的历史数据点 # key: stream_id, value: deque([(timestamp, interest_level), ...]) self.stream_history = {} - # key: stream_id, value: deque([(timestamp, reply_probability), ...]) # <--- 新增:存储概率历史 + # key: stream_id, value: deque([(timestamp, reply_probability), ...]) self.probability_history = {} self.stream_colors = {} # 为每个 stream 分配颜色 - self.stream_display_names = {} # *** New: Store display names (group_name) *** + self.stream_display_names = {} # 存储显示名称 (group_name) self.selected_stream_id = tk.StringVar() # 用于 Combobox 绑定 + # --- 新增:存储其他参数 --- + # 顶层信息 + self.latest_main_mind = tk.StringVar(value="N/A") + self.latest_mai_state = tk.StringVar(value="N/A") + self.latest_subflow_count = tk.IntVar(value=0) + # 子流最新状态 (key: stream_id) + self.stream_sub_minds = {} + self.stream_chat_states = {} + self.stream_threshold_status = {} + self.stream_last_active = {} + self.stream_last_interaction = {} + # 用于显示单个流详情的 StringVar + self.single_stream_sub_mind = tk.StringVar(value="想法: N/A") + self.single_stream_chat_state = tk.StringVar(value="状态: N/A") + self.single_stream_threshold = tk.StringVar(value="阈值: N/A") + self.single_stream_last_active = tk.StringVar(value="活跃: N/A") + self.single_stream_last_interaction = tk.StringVar(value="交互: N/A") + + # --- UI 元素 --- + + # --- 新增:顶部全局信息框架 --- + self.global_info_frame = ttk.Frame(root, padding="5 0 5 5") # 顶部内边距调整 + self.global_info_frame.pack(side=tk.TOP, fill=tk.X, pady=(5, 0)) # 底部外边距为0 + + ttk.Label(self.global_info_frame, text="全局状态:").pack(side=tk.LEFT, padx=(0, 10)) + ttk.Label(self.global_info_frame, textvariable=self.latest_mai_state).pack(side=tk.LEFT, padx=5) + ttk.Label(self.global_info_frame, text="想法:").pack(side=tk.LEFT, padx=(10, 0)) + ttk.Label(self.global_info_frame, textvariable=self.latest_main_mind).pack(side=tk.LEFT, padx=5) + ttk.Label(self.global_info_frame, text="子流数:").pack(side=tk.LEFT, padx=(10, 0)) + ttk.Label(self.global_info_frame, textvariable=self.latest_subflow_count).pack(side=tk.LEFT, padx=5) + + # 创建 Notebook (选项卡控件) self.notebook = ttk.Notebook(root) - self.notebook.pack(pady=10, padx=10, fill=tk.BOTH, expand=1) + # 修改:fill 和 expand,让 notebook 填充剩余空间 + self.notebook.pack(pady=(5, 0), padx=10, fill=tk.BOTH, expand=1) #顶部外边距改小 # --- 第一个选项卡:所有流 --- self.frame_all = ttk.Frame(self.notebook, padding="5 5 5 5") self.notebook.add(self.frame_all, text="所有聊天流") - # 状态标签 + # 状态标签 (移动到最底部) self.status_label = tk.Label(root, text="Initializing...", anchor="w", fg="grey") - self.status_label.pack(side=tk.BOTTOM, fill=tk.X, padx=5, pady=2) + self.status_label.pack(side=tk.BOTTOM, fill=tk.X, padx=10, pady=(0, 5)) # 调整边距 # Matplotlib 图表设置 (用于第一个选项卡) self.fig = Figure(figsize=(5, 4), dpi=100) @@ -81,6 +114,16 @@ class InterestMonitorApp: self.stream_selector.pack(side=tk.LEFT, fill=tk.X, expand=True) self.stream_selector.bind("<>", self.on_stream_selected) + # --- 新增:单个流详情显示区域 --- + self.single_stream_details_frame = ttk.Frame(self.frame_single, padding="5 5 5 0") + self.single_stream_details_frame.pack(side=tk.TOP, fill=tk.X, pady=(0, 5)) + + ttk.Label(self.single_stream_details_frame, textvariable=self.single_stream_sub_mind).pack(side=tk.LEFT, padx=5) + ttk.Label(self.single_stream_details_frame, textvariable=self.single_stream_chat_state).pack(side=tk.LEFT, padx=5) + ttk.Label(self.single_stream_details_frame, textvariable=self.single_stream_threshold).pack(side=tk.LEFT, padx=5) + ttk.Label(self.single_stream_details_frame, textvariable=self.single_stream_last_active).pack(side=tk.LEFT, padx=5) + ttk.Label(self.single_stream_details_frame, textvariable=self.single_stream_last_interaction).pack(side=tk.LEFT, padx=5) + # Matplotlib 图表设置 (用于第二个选项卡) self.fig_single = Figure(figsize=(5, 4), dpi=100) # 修改:创建两个子图,一个显示兴趣度,一个显示概率 @@ -116,6 +159,11 @@ class InterestMonitorApp: new_stream_history = {} new_stream_display_names = {} new_probability_history = {} # <--- 重置概率历史 + # --- 新增:重置其他子流状态 --- (如果需要的话,但通常覆盖即可) + # self.stream_sub_minds = {} + # self.stream_chat_states = {} + # ... 等等 ... + read_count = 0 error_count = 0 # *** Calculate the timestamp threshold for the last 30 minutes *** @@ -128,58 +176,105 @@ class InterestMonitorApp: read_count += 1 try: log_entry = json.loads(line.strip()) - timestamp = log_entry.get("timestamp") + timestamp = log_entry.get("timestamp") # 获取顶层时间戳 - # *** Add time filtering *** - if timestamp is None or float(timestamp) < time_threshold: - continue # Skip old or invalid entries - - stream_id = log_entry.get("stream_id") - interest_level = log_entry.get("interest_level") - group_name = log_entry.get( - "group_name", stream_id - ) # *** Get group_name, fallback to stream_id *** - reply_probability = log_entry.get("reply_probability") # <--- 获取概率值 - - # *** Check other required fields AFTER time filtering *** - if stream_id is None or interest_level is None: + # *** 时间过滤 *** + if timestamp is None: error_count += 1 - continue # 跳过无效行 + continue # 跳过没有时间戳的行 + try: + entry_timestamp = float(timestamp) + if entry_timestamp < time_threshold: + continue # 跳过时间过早的条目 + except (ValueError, TypeError): + error_count += 1 + continue # 跳过时间戳格式错误的行 - # 如果是第一次读到这个 stream_id,则创建 deque - if stream_id not in new_stream_history: - new_stream_history[stream_id] = deque(maxlen=MAX_HISTORY_POINTS) - new_probability_history[stream_id] = deque(maxlen=MAX_HISTORY_POINTS) # <--- 创建概率 deque - # 检查是否已有颜色,没有则分配 - if stream_id not in self.stream_colors: - self.stream_colors[stream_id] = self.get_random_color() + # --- 新增:更新顶层信息 (使用最后一个有效行的数据) --- + self.latest_main_mind.set(log_entry.get("main_mind", self.latest_main_mind.get())) # 保留旧值如果缺失 + self.latest_mai_state.set(log_entry.get("mai_state", self.latest_mai_state.get())) + self.latest_subflow_count.set(log_entry.get("subflow_count", self.latest_subflow_count.get())) - # *** Store the latest display name found for this stream_id *** - new_stream_display_names[stream_id] = group_name + # --- 修改开始:迭代 subflows --- + subflows = log_entry.get("subflows") + if not isinstance(subflows, list): # 检查 subflows 是否存在且为列表 + error_count += 1 + continue # 跳过没有 subflows 或格式无效的行 - # 添加数据点 - new_stream_history[stream_id].append((float(timestamp), float(interest_level))) - # 添加概率数据点 (如果存在) - if reply_probability is not None: + for subflow_entry in subflows: + stream_id = subflow_entry.get("stream_id") + interest_level = subflow_entry.get("interest_level") + # 获取 group_name,如果不存在则回退到 stream_id + group_name = subflow_entry.get("group_name", stream_id) + reply_probability = subflow_entry.get("reply_probability") # 获取概率值 + + # *** 检查必要的字段 *** + # 注意:时间戳已在顶层检查过 + if stream_id is None or interest_level is None: + # 这里可以选择记录子流错误,但暂时跳过 + continue # 跳过无效的 subflow 条目 + + # 确保 interest_level 可以转换为浮点数 try: - new_probability_history[stream_id].append((float(timestamp), float(reply_probability))) - except (TypeError, ValueError): - # 如果概率值无效,可以跳过或记录一个默认值,这里跳过 - pass + interest_level_float = float(interest_level) + except (ValueError, TypeError): + continue # 跳过 interest_level 无效的 subflow + + # 如果是第一次读到这个 stream_id,则创建 deque + if stream_id not in new_stream_history: + new_stream_history[stream_id] = deque(maxlen=MAX_HISTORY_POINTS) + new_probability_history[stream_id] = deque(maxlen=MAX_HISTORY_POINTS) # 创建概率 deque + # 检查是否已有颜色,没有则分配 + if stream_id not in self.stream_colors: + self.stream_colors[stream_id] = self.get_random_color() + + # *** 存储此 stream_id 最新的显示名称 *** + new_stream_display_names[stream_id] = group_name + + # --- 新增:存储其他子流信息 --- + self.stream_sub_minds[stream_id] = subflow_entry.get("sub_mind", "N/A") + self.stream_chat_states[stream_id] = subflow_entry.get("sub_chat_state", "N/A") + self.stream_threshold_status[stream_id] = subflow_entry.get("is_above_threshold", False) + self.stream_last_active[stream_id] = subflow_entry.get("last_active_time") # 存储原始时间戳 + self.stream_last_interaction[stream_id] = subflow_entry.get("last_interaction_time") # 存储原始时间戳 + + # 添加数据点 (使用顶层时间戳) + new_stream_history[stream_id].append((entry_timestamp, interest_level_float)) + + # 添加概率数据点 (如果存在且有效) + if reply_probability is not None: + try: + # 尝试将概率转换为浮点数 + probability_float = float(reply_probability) + new_probability_history[stream_id].append((entry_timestamp, probability_float)) + except (TypeError, ValueError): + # 如果概率值无效,可以跳过或记录一个默认值,这里跳过 + pass + # --- 修改结束 --- except json.JSONDecodeError: error_count += 1 # logger.warning(f"Skipping invalid JSON line: {line.strip()}") continue # 跳过无法解析的行 - except (TypeError, ValueError): - error_count += 1 - # logger.warning(f"Skipping line due to data type error ({e}): {line.strip()}") - continue # 跳过数据类型错误的行 + # except (TypeError, ValueError) as e: # 这个外层 catch 可能不再需要,因为类型错误在内部处理了 + # error_count += 1 + # # logger.warning(f"Skipping line due to data type error ({e}): {line.strip()}") + # continue # 跳过数据类型错误的行 # 读取完成后,用新数据替换旧数据 self.stream_history = new_stream_history self.stream_display_names = new_stream_display_names # *** Update display names *** self.probability_history = new_probability_history # <--- 更新概率历史 + # 清理不再存在的 stream_id 的附加信息 (可选,但保持一致性) + streams_to_remove = set(self.stream_sub_minds.keys()) - set(new_stream_history.keys()) + for sid in streams_to_remove: + self.stream_sub_minds.pop(sid, None) + self.stream_chat_states.pop(sid, None) + self.stream_threshold_status.pop(sid, None) + self.stream_last_active.pop(sid, None) + self.stream_last_interaction.pop(sid, None) + # 颜色和显示名称也应该清理,但当前逻辑是保留旧颜色 + # self.stream_colors.pop(sid, None) status_msg = f"Data loaded at {datetime.now().strftime('%H:%M:%S')}. Lines read: {read_count}." if error_count > 0: status_msg += f" Skipped {error_count} invalid lines." @@ -383,9 +478,45 @@ class InterestMonitorApp: self.ax_single_interest.set_xlim(one_hour_ago, now) # self.ax_single_probability.set_xlim(one_hour_ago, now) # sharex 会自动同步 + # --- 新增:更新单个流的详细信息标签 --- + self.update_single_stream_details(selected_sid) + # --- 新增:重新绘制画布 --- self.canvas_single.draw() + def format_timestamp(self, ts): + """辅助函数:格式化时间戳,处理 None 或无效值""" + if ts is None: + return "N/A" + try: + # 假设 ts 是 float 类型的时间戳 + dt_object = datetime.fromtimestamp(float(ts)) + return dt_object.strftime("%Y-%m-%d %H:%M:%S") + except (ValueError, TypeError): + return "Invalid Time" + + def update_single_stream_details(self, stream_id): + """更新单个流详情区域的标签内容""" + if stream_id: + sub_mind = self.stream_sub_minds.get(stream_id, "N/A") + chat_state = self.stream_chat_states.get(stream_id, "N/A") + threshold = self.stream_threshold_status.get(stream_id, False) + last_active_ts = self.stream_last_active.get(stream_id) + last_interaction_ts = self.stream_last_interaction.get(stream_id) + + self.single_stream_sub_mind.set(f"想法: {sub_mind}") + self.single_stream_chat_state.set(f"状态: {chat_state}") + self.single_stream_threshold.set(f"阈值以上: {'是' if threshold else '否'}") + self.single_stream_last_active.set(f"最后活跃: {self.format_timestamp(last_active_ts)}") + self.single_stream_last_interaction.set(f"最后交互: {self.format_timestamp(last_interaction_ts)}") + else: + # 如果没有选择流,则清空详情 + self.single_stream_sub_mind.set("想法: N/A") + self.single_stream_chat_state.set("状态: N/A") + self.single_stream_threshold.set("阈值: N/A") + self.single_stream_last_active.set("活跃: N/A") + self.single_stream_last_interaction.set("交互: N/A") + def update_display(self): """主更新循环""" try: diff --git a/src/heart_flow/background_tasks.py b/src/heart_flow/background_tasks.py index c24054bd8..d6a9869b7 100644 --- a/src/heart_flow/background_tasks.py +++ b/src/heart_flow/background_tasks.py @@ -6,7 +6,7 @@ from src.common.logger import get_module_logger, LogConfig, BACKGROUND_TASKS_STY # Need manager types for dependency injection from src.heart_flow.mai_state_manager import MaiStateManager, MaiStateInfo -from src.heart_flow.subheartflow_manager import SubHeartflowManager +from src.heart_flow.subheartflow_manager import SubHeartflowManager, ChatState from src.heart_flow.interest_logger import InterestLogger background_tasks_log_config = LogConfig( @@ -16,6 +16,9 @@ background_tasks_log_config = LogConfig( logger = get_module_logger("background_tasks", config=background_tasks_log_config) +# 新增兴趣评估间隔 +INTEREST_EVAL_INTERVAL_SECONDS = 5 + class BackgroundTaskManager: """管理 Heartflow 的后台周期性任务。""" @@ -30,6 +33,8 @@ class BackgroundTaskManager: cleanup_interval: int, log_interval: int, inactive_threshold: int, + # 新增兴趣评估间隔参数 + interest_eval_interval: int = INTEREST_EVAL_INTERVAL_SECONDS, ): self.mai_state_info = mai_state_info self.mai_state_manager = mai_state_manager @@ -41,46 +46,70 @@ class BackgroundTaskManager: self.cleanup_interval = cleanup_interval self.log_interval = log_interval self.inactive_threshold = inactive_threshold # For cleanup task + self.interest_eval_interval = interest_eval_interval # 存储兴趣评估间隔 # Task references self._state_update_task: Optional[asyncio.Task] = None self._cleanup_task: Optional[asyncio.Task] = None self._logging_task: Optional[asyncio.Task] = None + self._interest_eval_task: Optional[asyncio.Task] = None # 新增兴趣评估任务引用 self._tasks: List[Optional[asyncio.Task]] = [] # Keep track of all tasks async def start_tasks(self): - """启动所有后台任务""" - # 状态更新任务 - if self._state_update_task is None or self._state_update_task.done(): - self._state_update_task = asyncio.create_task( - self._run_state_update_cycle(self.update_interval), name="hf_state_update" - ) - self._tasks.append(self._state_update_task) - logger.debug(f"聊天状态更新任务已启动 间隔:{self.update_interval}s") - else: - logger.warning("状态更新任务已在运行") + """启动所有后台任务 + + 功能说明: + - 启动核心后台任务: 状态更新、清理、日志记录和兴趣评估 + - 每个任务启动前检查是否已在运行 + - 将任务引用保存到任务列表 + """ + + # 任务配置列表: (任务变量名, 任务函数, 任务名称, 日志级别, 额外日志信息, 任务对象引用属性名) + task_configs = [ + (self._state_update_task, + lambda: self._run_state_update_cycle(self.update_interval), + "hf_state_update", + "debug", + f"聊天状态更新任务已启动 间隔:{self.update_interval}s", + "_state_update_task"), + + (self._cleanup_task, + self._run_cleanup_cycle, + "hf_cleanup", + "info", + f"清理任务已启动 间隔:{self.cleanup_interval}s 阈值:{self.inactive_threshold}s", + "_cleanup_task"), + + (self._logging_task, + self._run_logging_cycle, + "hf_logging", + "info", + f"日志任务已启动 间隔:{self.log_interval}s", + "_logging_task"), - # 清理任务 - if self._cleanup_task is None or self._cleanup_task.done(): - self._cleanup_task = asyncio.create_task(self._run_cleanup_cycle(), name="hf_cleanup") - self._tasks.append(self._cleanup_task) - logger.info(f"清理任务已启动 间隔:{self.cleanup_interval}s 阈值:{self.inactive_threshold}s") - else: - logger.warning("清理任务已在运行") + # 新增兴趣评估任务配置 + (self._interest_eval_task, + self._run_interest_eval_cycle, + "hf_interest_eval", + "debug", # 设为debug,避免过多日志 + f"兴趣评估任务已启动 间隔:{self.interest_eval_interval}s", + "_interest_eval_task"), + ] - # 日志任务 - if self._logging_task is None or self._logging_task.done(): - self._logging_task = asyncio.create_task(self._run_logging_cycle(), name="hf_logging") - self._tasks.append(self._logging_task) - logger.info(f"日志任务已启动 间隔:{self.log_interval}s") - else: - logger.warning("日志任务已在运行") - - # # 初始状态检查 - # initial_state = self.mai_state_info.get_current_state() - # if initial_state != self.mai_state_info.mai_status.OFFLINE: - # logger.info(f"初始状态:{initial_state.value} 触发初始激活检查") - # asyncio.create_task(self.subheartflow_manager.activate_random_subflows_to_chat(initial_state)) + # 统一启动所有任务 + for task_var, task_func, task_name, log_level, log_msg, task_attr_name in task_configs: + # 检查任务变量是否存在且未完成 + current_task_var = getattr(self, task_attr_name) + if current_task_var is None or current_task_var.done(): + new_task = asyncio.create_task(task_func(), name=task_name) + setattr(self, task_attr_name, new_task) # 更新任务变量 + if new_task not in self._tasks: # 避免重复添加 + self._tasks.append(new_task) + + # 根据配置记录不同级别的日志 + getattr(logger, log_level)(log_msg) + else: + logger.warning(f"{task_name}任务已在运行") async def stop_tasks(self): """停止所有后台任务。 @@ -131,15 +160,11 @@ class BackgroundTaskManager: # 计算并执行间隔等待 elapsed = asyncio.get_event_loop().time() - start_time sleep_time = max(0, interval - elapsed) - if sleep_time < 0.1: # 任务超时处理 - logger.warning(f"任务 {task_name} 超时执行 ({elapsed:.2f}s > {interval}s)") + # if sleep_time < 0.1: # 任务超时处理, DEBUG 时可能干扰断点 + # logger.warning(f"任务 {task_name} 超时执行 ({elapsed:.2f}s > {interval}s)") await asyncio.sleep(sleep_time) - # 非离线状态时评估兴趣 - if self.mai_state_info.get_current_state() != self.mai_state_info.mai_status.OFFLINE: - await self.subheartflow_manager.evaluate_interest_and_promote() - - logger.debug(f"任务循环结束, 当前状态: {self.mai_state_info.get_current_state().value}") + logger.debug(f"任务循环结束: {task_name}") # 调整日志信息 async def _perform_state_update_work(self): """执行状态更新工作""" @@ -189,8 +214,15 @@ class BackgroundTaskManager: logger.debug("[Background Task Cleanup] Cleanup cycle finished. No inactive flows found.") async def _perform_logging_work(self): - """执行一轮兴趣日志记录。""" - await self.interest_logger.log_interest_states() + """执行一轮状态日志记录。""" + await self.interest_logger.log_all_states() + + # --- 新增兴趣评估工作函数 --- + async def _perform_interest_eval_work(self): + """执行一轮子心流兴趣评估与提升检查。""" + # 直接调用 subheartflow_manager 的方法,并传递当前状态信息 + await self.subheartflow_manager.evaluate_interest_and_promote(self.mai_state_info) + # --- 结束新增 --- # --- Specific Task Runners --- # async def _run_state_update_cycle(self, interval: int): @@ -205,5 +237,14 @@ class BackgroundTaskManager: async def _run_logging_cycle(self): await self._run_periodic_loop( - task_name="Interest Logging", interval=self.log_interval, task_func=self._perform_logging_work + task_name="State Logging", + interval=self.log_interval, + task_func=self._perform_logging_work ) + + # --- 新增兴趣评估任务运行器 --- + async def _run_interest_eval_cycle(self): + await self._run_periodic_loop( + task_name="Interest Evaluation", interval=self.interest_eval_interval, task_func=self._perform_interest_eval_work + ) + # --- 结束新增 --- diff --git a/src/heart_flow/heartflow.py b/src/heart_flow/heartflow.py index 4bf3765be..7fbc0f58a 100644 --- a/src/heart_flow/heartflow.py +++ b/src/heart_flow/heartflow.py @@ -4,7 +4,6 @@ from src.config.config import global_config from src.plugins.schedule.schedule_generator import bot_schedule from src.common.logger import get_module_logger, LogConfig, HEARTFLOW_STYLE_CONFIG from typing import Any, Optional -from src.plugins.heartFC_chat.heartFC_generator import ResponseGenerator from src.do_tool.tool_use import ToolUser from src.plugins.person_info.relationship_manager import relationship_manager # Module instance from src.heart_flow.mai_state_manager import MaiStateInfo, MaiStateManager @@ -23,7 +22,7 @@ logger = get_module_logger("heartflow", config=heartflow_config) # Task Intervals (should be in BackgroundTaskManager or config) CLEANUP_INTERVAL_SECONDS = 1200 -STATE_UPDATE_INTERVAL_SECONDS = 30 +STATE_UPDATE_INTERVAL_SECONDS = 60 # Thresholds (should be in SubHeartflowManager or config) INACTIVE_THRESHOLD_SECONDS = 1200 @@ -57,13 +56,12 @@ class Heartflow: ) # 外部依赖模块 - self.gpt_instance = ResponseGenerator() # 响应生成器 self.tool_user_instance = ToolUser() # 工具使用模块 self.relationship_manager_instance = relationship_manager # 关系管理模块 # 子系统初始化 self.mind: Mind = Mind(self.subheartflow_manager, self.llm_model) # 思考管理器 - self.interest_logger: InterestLogger = InterestLogger(self.subheartflow_manager) # 兴趣日志记录器 + self.interest_logger: InterestLogger = InterestLogger(self.subheartflow_manager, self) # 兴趣日志记录器 # 后台任务管理器 (整合所有定时任务) self.background_task_manager: BackgroundTaskManager = BackgroundTaskManager( diff --git a/src/heart_flow/interest_logger.py b/src/heart_flow/interest_logger.py index 4b7e6323e..15a08b64d 100644 --- a/src/heart_flow/interest_logger.py +++ b/src/heart_flow/interest_logger.py @@ -12,7 +12,8 @@ from src.plugins.chat.chat_stream import chat_manager if TYPE_CHECKING: from src.heart_flow.subheartflow_manager import SubHeartflowManager - from src.heart_flow.sub_heartflow import SubHeartflow # For type hint in get_interest_states + from src.heart_flow.sub_heartflow import SubHeartflow + from src.heart_flow.heartflow import Heartflow # 导入 Heartflow 类型 interest_logger_config = LogConfig( console_format=INTEREST_STYLE_CONFIG["console_format"], @@ -26,25 +27,28 @@ HISTORY_LOG_FILENAME = "interest_history.log" class InterestLogger: - """负责定期记录所有子心流的兴趣状态到日志文件。""" + """负责定期记录主心流和所有子心流的状态到日志文件。""" - def __init__(self, subheartflow_manager: "SubHeartflowManager"): + def __init__(self, subheartflow_manager: "SubHeartflowManager", heartflow: "Heartflow"): + """ + 初始化 InterestLogger。 + + Args: + subheartflow_manager: 子心流管理器实例。 + heartflow: 主心流实例,用于获取主心流状态。 + """ self.subheartflow_manager = subheartflow_manager + self.heartflow = heartflow # 存储 Heartflow 实例 self._history_log_file_path = os.path.join(LOG_DIRECTORY, HISTORY_LOG_FILENAME) self._ensure_log_directory() def _ensure_log_directory(self): """确保日志目录存在。""" - try: - os.makedirs(LOG_DIRECTORY, exist_ok=True) - logger.info(f"已确保日志目录 '{LOG_DIRECTORY}' 存在") - except OSError as e: - logger.error(f"创建日志目录 '{LOG_DIRECTORY}' 出错: {e}") + os.makedirs(LOG_DIRECTORY, exist_ok=True) + logger.info(f"已确保日志目录 '{LOG_DIRECTORY}' 存在") - async def get_all_interest_states(self) -> Dict[str, Dict]: - """并发获取所有活跃子心流的当前兴趣状态。""" - _states = {} - # Get snapshot from the manager + async def get_all_subflow_states(self) -> Dict[str, Dict]: + """并发获取所有活跃子心流的当前完整状态。""" all_flows: List["SubHeartflow"] = self.subheartflow_manager.get_all_subheartflows() tasks = [] results = {} @@ -53,12 +57,11 @@ class InterestLogger: logger.debug("未找到任何子心流状态") return results - # logger.debug(f"正在获取 {len(all_flows)} 个子心流的兴趣状态...") for subheartflow in all_flows: if self.subheartflow_manager.get_subheartflow(subheartflow.subheartflow_id): tasks.append( asyncio.create_task( - subheartflow.get_interest_state(), name=f"get_state_{subheartflow.subheartflow_id}" + subheartflow.get_full_state(), name=f"get_state_{subheartflow.subheartflow_id}" ) ) else: @@ -68,74 +71,87 @@ class InterestLogger: done, pending = await asyncio.wait(tasks, timeout=5.0) if pending: - logger.warning(f"获取兴趣状态超时,有 {len(pending)} 个任务未完成") + logger.warning(f"获取子心流状态超时,有 {len(pending)} 个任务未完成") for task in pending: task.cancel() for task in done: - try: - stream_id_str = task.get_name().split("get_state_")[-1] - stream_id = stream_id_str - except IndexError: - logger.error(f"无法从任务名 {task.get_name()} 中提取 stream_id") - continue + stream_id_str = task.get_name().split("get_state_")[-1] + stream_id = stream_id_str - try: + if task.cancelled(): + logger.warning(f"获取子心流 {stream_id} 状态的任务已取消(超时)", exc_info=False) + elif task.exception(): + exc = task.exception() + logger.warning(f"获取子心流 {stream_id} 状态出错: {exc}") + else: result = task.result() results[stream_id] = result - except asyncio.CancelledError: - logger.warning(f"获取子心流 {stream_id} 兴趣状态的任务已取消(超时)", exc_info=False) - except Exception as e: - logger.warning(f"获取子心流 {stream_id} 兴趣状态出错: {e}") - logger.trace(f"成功获取 {len(results)} 个兴趣状态") + logger.trace(f"成功获取 {len(results)} 个子心流的完整状态") return results - async def log_interest_states(self): - """获取所有子心流的兴趣状态并写入日志文件。""" - # logger.debug("开始定期记录兴趣状态...") + async def log_all_states(self): + """获取主心流状态和所有子心流的完整状态并写入日志文件。""" try: current_timestamp = time.time() - all_interest_states = await self.get_all_interest_states() - if not all_interest_states: - logger.debug("没有获取到任何兴趣状态") + main_mind = self.heartflow.current_mind + # 获取 Mai 状态名称 + mai_state_name = self.heartflow.current_state.get_current_state().name + + all_subflow_states = await self.get_all_subflow_states() + + log_entry_base = { + "timestamp": round(current_timestamp, 2), + "main_mind": main_mind, + "mai_state": mai_state_name, + "subflow_count": len(all_subflow_states), + "subflows": [] + } + + if not all_subflow_states: + logger.debug("没有获取到任何子心流状态,仅记录主心流状态") + with open(self._history_log_file_path, "a", encoding="utf-8") as f: + f.write(json.dumps(log_entry_base, ensure_ascii=False) + "\n") return - count = 0 - try: - with open(self._history_log_file_path, "a", encoding="utf-8") as f: - items_snapshot = list(all_interest_states.items()) - for stream_id, state in items_snapshot: - group_name = stream_id - try: - chat_stream = chat_manager.get_stream(stream_id) - if chat_stream and chat_stream.group_info: - group_name = chat_stream.group_info.group_name - elif chat_stream and not chat_stream.group_info: - group_name = ( - f"私聊_{chat_stream.user_info.user_nickname}" - if chat_stream.user_info - else stream_id - ) - except Exception as e: - logger.trace(f"无法获取 stream_id {stream_id} 的群组名: {e}") - pass + subflow_details = [] + items_snapshot = list(all_subflow_states.items()) + for stream_id, state in items_snapshot: + group_name = stream_id + try: + chat_stream = chat_manager.get_stream(stream_id) + if chat_stream: + if chat_stream.group_info: + group_name = chat_stream.group_info.group_name + elif chat_stream.user_info: + group_name = f"私聊_{chat_stream.user_info.user_nickname}" + except Exception as e: + logger.trace(f"无法获取 stream_id {stream_id} 的群组名: {e}") - log_entry = { - "timestamp": round(current_timestamp, 2), - "stream_id": stream_id, - "interest_level": state.get("interest_level", 0.0), - "group_name": group_name, - "reply_probability": state.get("current_reply_probability", 0.0), - "is_above_threshold": state.get("is_above_threshold", False), - } - f.write(json.dumps(log_entry, ensure_ascii=False) + "\n") - count += 1 - # logger.debug(f"成功记录 {count} 条兴趣历史到 {self._history_log_file_path}") - except IOError as e: - logger.error(f"写入兴趣历史日志到 {self._history_log_file_path} 出错: {e}") + interest_state = state.get("interest_state", {}) + subflow_entry = { + "stream_id": stream_id, + "group_name": group_name, + "sub_mind": state.get("current_mind", "未知"), + "sub_chat_state": state.get("chat_state", "未知"), + "interest_level": interest_state.get("interest_level", 0.0), + "reply_probability": interest_state.get("current_reply_probability", 0.0), + "is_above_threshold": interest_state.get("is_above_threshold", False), + "last_active_time": state.get("last_active_time", 0.0), + "last_interaction_time": interest_state.get("last_interaction_time", 0.0), + } + subflow_details.append(subflow_entry) + + log_entry_base["subflows"] = subflow_details + + with open(self._history_log_file_path, "a", encoding="utf-8") as f: + f.write(json.dumps(log_entry_base, ensure_ascii=False) + "\n") + + except IOError as e: + logger.error(f"写入状态日志到 {self._history_log_file_path} 出错: {e}") except Exception as e: - logger.error(f"定期记录兴趣历史时发生意外错误: {e}") + logger.error(f"记录状态时发生意外错误: {e}") logger.error(traceback.format_exc()) diff --git a/src/heart_flow/mai_state_manager.py b/src/heart_flow/mai_state_manager.py index 2831df388..5048ff080 100644 --- a/src/heart_flow/mai_state_manager.py +++ b/src/heart_flow/mai_state_manager.py @@ -23,9 +23,9 @@ class MaiState(enum.Enum): """ OFFLINE = "不在线" - PEEKING = "看一眼手机" + PEEKING = "看一眼" NORMAL_CHAT = "正常聊天" - FOCUSED_CHAT = "专注聊天" + FOCUSED_CHAT = "专心聊天" def get_normal_chat_max_num(self): if self == MaiState.OFFLINE: @@ -127,9 +127,9 @@ class MaiStateManager: elif current_status == MaiState.PEEKING: logger.info("当前[在窥屏],思考要不要继续聊下去......") elif current_status == MaiState.NORMAL_CHAT: - logger.info("当前在[闲聊]思考要不要继续聊下去......") + logger.info("当前在[随便看]思考要不要继续聊下去......") elif current_status == MaiState.FOCUSED_CHAT: - logger.info("当前在[激情聊天]思考要不要继续聊下去......") + logger.info("当前在[专心看]思考要不要继续聊下去......") # 1. 麦麦每分钟都有概率离线 if time_since_last_min_check >= 60: @@ -149,7 +149,7 @@ class MaiStateManager: next_state_candidate = random.choices(choices_list, weights=weights, k=1)[0] if next_state_candidate != MaiState.OFFLINE: next_state = next_state_candidate - logger.debug(f"上线!开始 {next_state.name}") + logger.debug(f"上线!开始 {next_state.value}") else: # 继续离线状态 next_state = MaiState.OFFLINE @@ -159,7 +159,7 @@ class MaiStateManager: weights = [70, 20, 10] choices_list = [MaiState.OFFLINE, MaiState.NORMAL_CHAT, MaiState.FOCUSED_CHAT] next_state = random.choices(choices_list, weights=weights, k=1)[0] - logger.debug(f"手机看完了,接下来 {next_state.name}") + logger.debug(f"手机看完了,接下来 {next_state.value}") elif current_status == MaiState.NORMAL_CHAT: if time_in_current_status >= 300: # NORMAL_CHAT 最多持续 300 秒 @@ -167,16 +167,16 @@ class MaiStateManager: choices_list = [MaiState.OFFLINE, MaiState.FOCUSED_CHAT] next_state = random.choices(choices_list, weights=weights, k=1)[0] if next_state == MaiState.FOCUSED_CHAT: - logger.debug(f"继续深入聊天, {next_state.name}") + logger.debug(f"继续深入聊天, {next_state.value}") else: - logger.debug(f"聊完了,接下来 {next_state.name}") + logger.debug(f"聊完了,接下来 {next_state.value}") elif current_status == MaiState.FOCUSED_CHAT: if time_in_current_status >= 600: # FOCUSED_CHAT 最多持续 600 秒 weights = [80, 20] choices_list = [MaiState.OFFLINE, MaiState.NORMAL_CHAT] next_state = random.choices(choices_list, weights=weights, k=1)[0] - logger.debug(f"深入聊天结束,接下来 {next_state.name}") + logger.debug(f"深入聊天结束,接下来 {next_state.value}") # 如果决定了下一个状态,且这个状态与当前状态不同,则返回下一个状态 if next_state is not None and next_state != current_status: diff --git a/src/heart_flow/sub_heartflow.py b/src/heart_flow/sub_heartflow.py index 97149d4ff..4f00b6d36 100644 --- a/src/heart_flow/sub_heartflow.py +++ b/src/heart_flow/sub_heartflow.py @@ -18,7 +18,6 @@ from src.plugins.chat.chat_stream import chat_manager import math from src.plugins.heartFC_chat.heartFC_chat import HeartFChatting from src.plugins.heartFC_chat.normal_chat import NormalChat -from src.plugins.heartFC_chat.normal_chat_generator import ResponseGenerator from src.do_tool.tool_use import ToolUser from src.heart_flow.mai_state_manager import MaiStateInfo @@ -61,16 +60,15 @@ def init_prompt(): class ChatState(enum.Enum): - ABSENT = "不参与" - CHAT = "闲聊" - FOCUSED = "专注" + ABSENT = "没在看群" + CHAT = "随便水群" + FOCUSED = "激情水群" class ChatStateInfo: def __init__(self): - self.willing = 0 - self.chat_status: ChatState = ChatState.ABSENT + self.current_state_time = 120 self.mood_manager = MoodManager() self.mood = self.mood_manager.get_prompt() @@ -106,110 +104,74 @@ class InterestChatting: self.max_reply_probability: float = max_probability self.current_reply_probability: float = 0.0 self.is_above_threshold: bool = False - self.state_change_callback = state_change_callback + self.update_task: Optional[asyncio.Task] = None + self._stop_event = asyncio.Event() self.interest_dict: Dict[str, tuple[MessageRecv, float, bool]] = {} + self.update_interval = 1.0 + self.start_updates(self.update_interval) # 初始化时启动后台更新任务 + + self.above_threshold = False + self.start_hfc_probability = 0.0 def add_interest_dict(self, message: MessageRecv, interest_value: float, is_mentioned: bool): self.interest_dict[message.message_info.message_id] = (message, interest_value, is_mentioned) self.last_interaction_time = time.time() - async def _calculate_decay(self, current_time: float): - time_delta = current_time - self.last_update_time - if time_delta > 0: - old_interest = self.interest_level - if self.interest_level < 1e-9: - self.interest_level = 0.0 - else: - if self.decay_rate_per_second <= 0: - interest_logger.warning( - f"InterestChatting encountered non-positive decay rate: {self.decay_rate_per_second}. Setting interest to 0." - ) - self.interest_level = 0.0 - elif self.interest_level < 0: - interest_logger.warning( - f"InterestChatting encountered negative interest level: {self.interest_level}. Setting interest to 0." - ) - self.interest_level = 0.0 - else: - try: - decay_factor = math.pow(self.decay_rate_per_second, time_delta) - self.interest_level *= decay_factor - except ValueError as e: - interest_logger.error( - f"Math error during decay calculation: {e}. Rate: {self.decay_rate_per_second}, Delta: {time_delta}, Level: {self.interest_level}. Setting interest to 0." - ) - self.interest_level = 0.0 - - if old_interest != self.interest_level: - self.last_update_time = current_time - - async def _update_reply_probability(self, current_time: float): - time_delta = current_time - self.last_update_time - if time_delta <= 0: + async def _calculate_decay(self): + """计算兴趣值的衰减 + + 参数: + current_time: 当前时间戳 + + 处理逻辑: + 1. 计算时间差 + 2. 处理各种异常情况(负值/零值) + 3. 正常计算衰减 + 4. 更新最后更新时间 + """ + + # 处理极小兴趣值情况 + if self.interest_level < 1e-9: + self.interest_level = 0.0 return + + # 异常情况处理 + if self.decay_rate_per_second <= 0: + interest_logger.warning(f"衰减率({self.decay_rate_per_second})无效,重置兴趣值为0") + self.interest_level = 0.0 + return + + # 正常衰减计算 + try: + decay_factor = math.pow(self.decay_rate_per_second, self.update_interval) + self.interest_level *= decay_factor + except ValueError as e: + interest_logger.error(f"衰减计算错误: {e} 参数: 衰减率={self.decay_rate_per_second} 时间差={self.update_interval} 当前兴趣={self.interest_level}") + self.interest_level = 0.0 + - currently_above = self.interest_level >= self.trigger_threshold - previous_is_above = self.is_above_threshold - - if currently_above: - if not self.is_above_threshold: - self.current_reply_probability = self.base_reply_probability - interest_logger.debug( - f"兴趣跨过阈值 ({self.trigger_threshold}). 概率重置为基础值: {self.base_reply_probability:.4f}" - ) - else: - increase_amount = self.probability_increase_rate * time_delta - self.current_reply_probability += increase_amount - - self.current_reply_probability = min(self.current_reply_probability, self.max_reply_probability) - + async def _update_reply_probability(self): + self.above_threshold = self.interest_level >= self.trigger_threshold + if self.above_threshold: + self.start_hfc_probability += 0.1 else: - if previous_is_above: - if self.state_change_callback: - try: - await self.state_change_callback(ChatState.ABSENT) - except Exception as e: - interest_logger.error(f"Error calling state_change_callback for ABSENT: {e}") - - if 0 < self.probability_decay_factor < 1: - decay_multiplier = math.pow(self.probability_decay_factor, time_delta) - self.current_reply_probability *= decay_multiplier - if self.current_reply_probability < 1e-6: - self.current_reply_probability = 0.0 - elif self.probability_decay_factor <= 0: - if self.current_reply_probability > 0: - interest_logger.warning(f"无效的衰减因子 ({self.probability_decay_factor}). 设置概率为0.") - self.current_reply_probability = 0.0 - - self.current_reply_probability = max(self.current_reply_probability, 0.0) - - self.is_above_threshold = currently_above + if self.start_hfc_probability != 0: + self.start_hfc_probability -= 0.1 async def increase_interest(self, current_time: float, value: float): - await self._update_reply_probability(current_time) - await self._calculate_decay(current_time) self.interest_level += value self.interest_level = min(self.interest_level, self.max_interest) - self.last_update_time = current_time - self.last_interaction_time = current_time async def decrease_interest(self, current_time: float, value: float): - await self._update_reply_probability(current_time) self.interest_level -= value self.interest_level = max(self.interest_level, 0.0) - self.last_update_time = current_time - self.last_interaction_time = current_time async def get_interest(self) -> float: - current_time = time.time() - await self._update_reply_probability(current_time) - await self._calculate_decay(current_time) - self.last_update_time = current_time return self.interest_level async def get_state(self) -> dict: - interest = await self.get_interest() + interest = self.interest_level # 直接使用属性值 return { "interest_level": round(interest, 2), "last_update_time": self.last_update_time, @@ -219,8 +181,6 @@ class InterestChatting: } async def should_evaluate_reply(self) -> bool: - current_time = time.time() - await self._update_reply_probability(current_time) if self.current_reply_probability > 0: trigger = random.random() < self.current_reply_probability @@ -228,6 +188,67 @@ class InterestChatting: else: return False + # --- 新增后台更新任务相关方法 --- + async def _run_update_loop(self, update_interval: float = 1.0): + """后台循环,定期更新兴趣和回复概率。""" + while not self._stop_event.is_set(): + try: + if self.interest_level != 0: + await self._calculate_decay() + + await self._update_reply_probability() + + + # 等待下一个周期或停止事件 + await asyncio.wait_for(self._stop_event.wait(), timeout=update_interval) + except asyncio.TimeoutError: + # 正常超时,继续循环 + continue + except asyncio.CancelledError: + interest_logger.info("InterestChatting 更新循环被取消。") + break + except Exception as e: + interest_logger.error(f"InterestChatting 更新循环出错: {e}") + interest_logger.error(traceback.format_exc()) + # 防止错误导致CPU飙升,稍作等待 + await asyncio.sleep(5) + interest_logger.info("InterestChatting 更新循环已停止。") + + + def start_updates(self, update_interval: float = 1.0): + """启动后台更新任务""" + if self.update_task is None or self.update_task.done(): + self._stop_event.clear() + self.update_task = asyncio.create_task(self._run_update_loop(update_interval)) + interest_logger.debug("后台兴趣更新任务已创建并启动。") + else: + interest_logger.debug("后台兴趣更新任务已在运行中。") + + + async def stop_updates(self): + """停止后台更新任务""" + if self.update_task and not self.update_task.done(): + interest_logger.info("正在停止 InterestChatting 后台更新任务...") + self._stop_event.set() # 发送停止信号 + try: + # 等待任务结束,设置超时 + await asyncio.wait_for(self.update_task, timeout=5.0) + interest_logger.info("InterestChatting 后台更新任务已成功停止。") + except asyncio.TimeoutError: + interest_logger.warning("停止 InterestChatting 后台任务超时,尝试取消...") + self.update_task.cancel() + try: + await self.update_task # 等待取消完成 + except asyncio.CancelledError: + interest_logger.info("InterestChatting 后台更新任务已被取消。") + except Exception as e: + interest_logger.error(f"停止 InterestChatting 后台任务时发生异常: {e}") + finally: + self.update_task = None + else: + interest_logger.debug("InterestChatting 后台更新任务未运行或已完成。") + # --- 结束 新增方法 --- + class SubHeartflow: def __init__(self, subheartflow_id, mai_states: MaiStateInfo): @@ -272,122 +293,163 @@ class SubHeartflow: request_type="sub_heart_flow", ) - self.gpt_instance = ResponseGenerator() # 响应生成器 - self.tool_user_instance = ToolUser() # 工具使用模块 - self.log_prefix = chat_manager.get_stream_name(self.subheartflow_id) or self.subheartflow_id + async def add_time_current_state(self, add_time: float): + self.current_state_time += add_time + + async def change_to_state_chat(self): + self.current_state_time = 120 + self._start_normal_chat() + + async def change_to_state_focused(self): + self.current_state_time = 60 + self._start_heart_fc_chat() + + async def _stop_normal_chat(self): + """停止 NormalChat 的兴趣监控""" + if self.normal_chat_instance: + logger.info(f"{self.log_prefix} 停止 NormalChat 兴趣监控...") + try: + await self.normal_chat_instance.stop_chat() # 调用 stop_chat + except Exception as e: + logger.error(f"{self.log_prefix} 停止 NormalChat 监控任务时出错: {e}") + logger.error(traceback.format_exc()) + + async def _start_normal_chat(self) -> bool: + """启动 NormalChat 实例及其兴趣监控,确保 HeartFChatting 已停止""" + await self._stop_heart_fc_chat() # 确保专注聊天已停止 + + log_prefix = self.log_prefix + try: + # 总是尝试创建或获取最新的 stream 和 interest_dict + chat_stream = chat_manager.get_stream(self.chat_id) + if not chat_stream: + logger.error(f"{log_prefix} 无法获取 chat_stream,无法启动 NormalChat。") + return False + + # 如果实例不存在或需要更新,则创建新实例 + # if not self.normal_chat_instance: # 或者总是重新创建以获取最新的 interest_dict? + self.normal_chat_instance = NormalChat(chat_stream=chat_stream, interest_dict=self.get_interest_dict()) + logger.info(f"{log_prefix} 创建或更新 NormalChat 实例。") + + logger.info(f"{log_prefix} 启动 NormalChat 兴趣监控...") + await self.normal_chat_instance.start_chat() # <--- 修正:调用 start_chat + return True + except Exception as e: + logger.error(f"{log_prefix} 启动 NormalChat 时出错: {e}") + logger.error(traceback.format_exc()) + self.normal_chat_instance = None # 启动失败,清理实例 + return False + + async def _stop_heart_fc_chat(self): + """停止并清理 HeartFChatting 实例""" + if self.heart_fc_instance: + logger.info(f"{self.log_prefix} 关闭 HeartFChatting 实例...") + try: + await self.heart_fc_instance.shutdown() + except Exception as e: + logger.error(f"{self.log_prefix} 关闭 HeartFChatting 实例时出错: {e}") + logger.error(traceback.format_exc()) + finally: + # 无论是否成功关闭,都清理引用 + self.heart_fc_instance = None + + async def _start_heart_fc_chat(self) -> bool: + """启动 HeartFChatting 实例,确保 NormalChat 已停止""" + await self._stop_normal_chat() # 确保普通聊天监控已停止 + self.clear_interest_dict() # 清理兴趣字典,准备专注聊天 + + log_prefix = self.log_prefix + if self.heart_fc_instance: + if not self.heart_fc_instance._loop_active: + logger.warning(f"{log_prefix} HeartFChatting 实例存在但未激活,尝试重新激活...") + await self.heart_fc_instance.add_time() # 尝试添加时间以激活循环 + return True # 假设 add_time 会处理激活逻辑 + else: + logger.debug(f"{log_prefix} HeartFChatting 已在运行中。") + return True # 已经在运行 + + logger.info(f"{log_prefix} 麦麦准备开始专注聊天...") + try: + self.heart_fc_instance = HeartFChatting( + chat_id=self.chat_id, + ) + if await self.heart_fc_instance._initialize(): + await self.heart_fc_instance.add_time() # 初始化成功后添加初始时间 + logger.info(f"{log_prefix} 麦麦已成功进入专注聊天模式。") + return True + else: + logger.error(f"{log_prefix} HeartFChatting 初始化失败,无法进入专注模式。") + self.heart_fc_instance = None # 初始化失败,清理实例 + return False + except Exception as e: + logger.error(f"{log_prefix} 创建或初始化 HeartFChatting 实例时出错: {e}") + logger.error(traceback.format_exc()) + self.heart_fc_instance = None # 创建或初始化异常,清理实例 + return False + async def set_chat_state(self, new_state: "ChatState", current_states_num: tuple = ()): """更新sub_heartflow的聊天状态,并管理 HeartFChatting 和 NormalChat 实例及任务""" - current_state = self.chat_state.chat_status if current_state == new_state: - logger.trace(f"{self.log_prefix} 状态已为 {current_state.value}, 无需更改。") + # logger.trace(f"{self.log_prefix} 状态已为 {current_state.value}, 无需更改。") # 减少日志噪音 return - log_prefix = self.log_prefix # 使用实例属性 + log_prefix = self.log_prefix current_mai_state = self.mai_states.get_current_state() + state_changed = False # 标记状态是否实际发生改变 # --- 状态转换逻辑 --- if new_state == ChatState.CHAT: normal_limit = current_mai_state.get_normal_chat_max_num() - current_chat_count = current_states_num[1] + current_chat_count = current_states_num[1] if len(current_states_num) > 1 else 0 - if current_chat_count >= normal_limit and current_state != ChatState.CHAT: # 仅在状态转换时检查限制 - logger.debug( - f"{log_prefix} 麦麦不能从 {current_state.value} 转换到 聊天。原因:聊不过来了 ({current_chat_count}/{normal_limit})" - ) - return # 阻止状态转换 + if current_chat_count >= normal_limit and current_state != ChatState.CHAT: + logger.debug(f"{log_prefix} 无法从 {current_state.value} 转到 聊天。原因:聊不过来了 ({current_chat_count}/{normal_limit})") + return # 阻止状态转换 else: - logger.debug(f"{log_prefix} 麦麦可以进入或保持 聊天 状态 ({current_chat_count}/{normal_limit})") - if current_state == ChatState.FOCUSED and self.heart_fc_instance: - logger.info(f"{log_prefix} 麦麦不再专注聊天,转为随便水水...") - await self.heart_fc_instance.shutdown() # 正确关闭 HeartFChatting - self.heart_fc_instance = None - - chat_stream = chat_manager.get_stream(self.chat_id) - self.normal_chat_instance = NormalChat(chat_stream=chat_stream, interest_dict=self.get_interest_dict()) - await self.normal_chat_instance.start_monitoring_interest() - # NormalChat 启动/停止逻辑将在下面处理 + logger.debug(f"{log_prefix} 准备进入或保持 聊天 状态 ({current_chat_count}/{normal_limit})") + if await self._start_normal_chat(): + logger.info(f"{log_prefix} 成功进入或保持 NormalChat 状态。") + state_changed = True + else: + logger.error(f"{log_prefix} 启动 NormalChat 失败,无法进入 CHAT 状态。") + # 考虑是否需要回滚状态或采取其他措施 + return # 启动失败,不改变状态 elif new_state == ChatState.FOCUSED: focused_limit = current_mai_state.get_focused_chat_max_num() - current_focused_count = current_states_num[2] - - if current_focused_count >= focused_limit and current_state != ChatState.FOCUSED: # 仅在状态转换时检查限制 - logger.debug( - f"{log_prefix} 麦麦不能从 {current_state.value} 转换到 专注的聊天,原因:聊不过来了。({current_focused_count}/{focused_limit})" - ) - return # 阻止状态转换 + current_focused_count = current_states_num[2] if len(current_states_num) > 2 else 0 + + if current_focused_count >= focused_limit and current_state != ChatState.FOCUSED: + logger.debug(f"{log_prefix} 无法从 {current_state.value} 转到 专注。原因:聊不过来了 ({current_focused_count}/{focused_limit})") + return # 阻止状态转换 else: - logger.debug(f"{log_prefix} 麦麦可以进入或保持 专注聊天 状态 ({current_focused_count}/{focused_limit})") - if not self.heart_fc_instance: - logger.info(f"{log_prefix} 麦麦准备开始专注聊天...") - try: - await self.normal_chat_instance.stop_monitoring_interest() - self.clear_interest_dict() - - logger.info(f"{log_prefix} 停止 NormalChat 兴趣监控成功。") - except Exception as e: - logger.error(f"{log_prefix} 停止 NormalChat 兴趣监控时出错: {e}") - logger.error(traceback.format_exc()) - try: - self.heart_fc_instance = HeartFChatting( - chat_id=self.chat_id, - gpt_instance=self.gpt_instance, - tool_user_instance=self.tool_user_instance, - ) - if await self.heart_fc_instance._initialize(): - await self.heart_fc_instance.add_time() # 初始化成功后添加初始时间 - logger.info(f"{log_prefix} 麦麦已成功进入专注聊天模式。") - else: - logger.error( - f"{log_prefix} 麦麦不能专注聊天,因为 HeartFChatting 初始化失败了,状态回滚到 {current_state.value}" - ) - self.heart_fc_instance = None - return # 阻止进入 FOCUSED 状态 - - except Exception as e: - logger.error(f"{log_prefix} 创建麦麦专注聊天实例时出错: {e}") - logger.error(traceback.format_exc()) - self.heart_fc_instance = None - return # 创建实例异常,阻止进入 FOCUSED 状态 - + logger.debug(f"{log_prefix} 准备进入或保持 专注聊天 状态 ({current_focused_count}/{focused_limit})") + if await self._start_heart_fc_chat(): + logger.info(f"{log_prefix} 成功进入或保持 HeartFChatting 状态。") + state_changed = True else: - # 已经是 FOCUSED 状态,或者 heart_fc_instance 已存在但未运行(不太可能) - if not self.heart_fc_instance._loop_active: - logger.warning(f"{log_prefix} HeartFChatting 实例存在但未激活,尝试重新激活...") - await self.heart_fc_instance.add_time() # 尝试添加时间以激活循环 - else: - logger.debug(f"{log_prefix} 麦麦已经在专注聊天中。") - # NormalChat 启动/停止逻辑将在下面处理 + logger.error(f"{log_prefix} 启动 HeartFChatting 失败,无法进入 FOCUSED 状态。") + # 启动失败,状态回滚到之前的状态或ABSENT?这里保持不改变 + return # 启动失败,不改变状态 + elif new_state == ChatState.ABSENT: - if current_state == ChatState.FOCUSED and self.heart_fc_instance: - logger.info(f"{log_prefix} 麦麦离开专注的聊天,撤退了.....") - await self.heart_fc_instance.shutdown() # 正确关闭 HeartFChatting - self.heart_fc_instance = None - # NormalChat 启动/停止逻辑将在下面处理 + logger.info(f"{log_prefix} 进入 ABSENT 状态,停止所有聊天活动...") + await self._stop_normal_chat() + await self._stop_heart_fc_chat() + state_changed = True # 总是可以成功转换到 ABSENT - # --- 更新状态和最后活动时间 (先更新状态,再根据新状态管理任务)--- - self.chat_state.chat_status = new_state - self.last_active_time = time.time() - logger.info(f"{log_prefix} 麦麦的聊天状态从 {current_state.value} 变更为 {new_state.value}") - - # --- 根据新的状态管理 NormalChat 的监控任务 --- - if self.normal_chat_instance: - try: - if new_state == ChatState.ABSENT: - logger.info(f"{log_prefix} 状态变为 ABSENT,停止 NormalChat 兴趣监控...") - await self.normal_chat_instance.stop_monitoring_interest() - else: # CHAT or FOCUSED - logger.info(f"{log_prefix} 状态变为 {new_state.value},启动或确认 NormalChat 兴趣监控...") - await self.normal_chat_instance.start_monitoring_interest() - except Exception as e: - logger.error(f"{log_prefix} 管理 NormalChat 监控任务时出错 (新状态: {new_state.value}): {e}") - logger.error(traceback.format_exc()) + # --- 更新状态和最后活动时间 --- + if state_changed: + logger.info(f"{log_prefix} 麦麦的聊天状态从 {current_state.value} 变更为 {new_state.value}") + self.chat_state.chat_status = new_state + self.last_active_time = time.time() else: - logger.warning(f"{log_prefix} NormalChat 实例不可用,无法管理其监控任务。") + # 如果因为某些原因(如启动失败)没有成功改变状态,记录一下 + logger.debug(f"{log_prefix} 尝试将状态从 {current_state.value} 变为 {new_state.value},但未成功或未执行更改。") async def subheartflow_start_working(self): """启动子心流的后台任务 @@ -399,8 +461,8 @@ class SubHeartflow: logger.info(f"{self.log_prefix} 子心流开始工作...") while not self.should_stop: - # 主循环保持简单,只做状态检查 await asyncio.sleep(30) # 30秒检查一次停止标志 + logger.info(f"{self.log_prefix} 子心流后台任务已停止。") @@ -439,7 +501,7 @@ class SubHeartflow: extra_info_prompt = "无工具信息。\n" individuality = Individuality.get_instance() - prompt_personality = f"你的名字是{individuality.bot_nickname},你" + prompt_personality = f"你的名字是{individuality.personality.bot_nickname},你" prompt_personality += individuality.personality.personality_core if individuality.personality.personality_sides: @@ -543,6 +605,16 @@ class SubHeartflow: def clear_interest_dict(self): self.interest_chatting.interest_dict.clear() + async def get_full_state(self) -> dict: + """获取子心流的完整状态,包括兴趣、思维和聊天状态。""" + interest_state = await self.get_interest_state() + return { + "interest_state": interest_state, + "current_mind": self.current_mind, + "chat_state": self.chat_state.chat_status.value, + "last_active_time": self.last_active_time, + } + async def shutdown(self): """安全地关闭子心流及其管理的任务""" if self.should_stop: @@ -552,24 +624,14 @@ class SubHeartflow: logger.info(f"{self.log_prefix} 开始关闭子心流...") self.should_stop = True # 标记为停止,让后台任务退出 - # 停止 NormalChat 监控 (保持调用,确保清理) - if self.normal_chat_instance: - logger.info(f"{self.log_prefix} 停止 NormalChat 监控任务 (Shutdown)...") - try: - await self.normal_chat_instance.stop_monitoring_interest() - except Exception as e: - logger.error(f"{self.log_prefix} 停止 NormalChat 监控任务时出错 (Shutdown): {e}") - logger.error(traceback.format_exc()) + # 使用新的停止方法 + await self._stop_normal_chat() + await self._stop_heart_fc_chat() - # 停止 HeartFChatting (如果存在且正在运行) - if self.heart_fc_instance: - logger.info(f"{self.log_prefix} 关闭 HeartFChatting 实例 (Shutdown)...") - try: - await self.heart_fc_instance.shutdown() - except Exception as e: - logger.error(f"{self.log_prefix} 关闭 HeartFChatting 实例时出错 (Shutdown): {e}") - logger.error(traceback.format_exc()) - self.heart_fc_instance = None # 清理实例引用 + # 停止兴趣更新任务 + if self.interest_chatting: + logger.info(f"{self.log_prefix} 停止兴趣系统后台任务...") + await self.interest_chatting.stop_updates() # 取消可能存在的旧后台任务 (self.task) if self.task and not self.task.done(): diff --git a/src/heart_flow/subheartflow_manager.py b/src/heart_flow/subheartflow_manager.py index 02f104bfa..e19172f45 100644 --- a/src/heart_flow/subheartflow_manager.py +++ b/src/heart_flow/subheartflow_manager.py @@ -262,42 +262,89 @@ class SubHeartflowManager: logger.info(f"[停用] 完成, 尝试停止{len(flow_ids)}个, 成功{stopped_count}个") - async def evaluate_interest_and_promote(self): - """评估CHAT状态的子心流兴趣度,满足条件则提升到FOCUSED状态""" - logger.debug("[子心流管理器] 开始兴趣评估周期...") - evaluated_count = 0 - promoted_count = 0 + async def evaluate_interest_and_promote(self, current_mai_state: MaiStateInfo): + """评估子心流兴趣度,满足条件且未达上限则提升到FOCUSED状态(基于start_hfc_probability)""" + log_prefix_manager = "[子心流管理器-兴趣评估]" + logger.debug(f"{log_prefix_manager} 开始周期... 当前状态: {current_mai_state.get_current_state().value}") + + # 获取 FOCUSED 状态的数量上限 + current_state_enum = current_mai_state.get_current_state() + focused_limit = current_state_enum.get_focused_chat_max_num() + if focused_limit <= 0: + logger.debug(f"{log_prefix_manager} 当前状态 ({current_state_enum.value}) 不允许 FOCUSED 子心流, 跳过提升检查。") + return + + # 获取当前 FOCUSED 状态的数量 (初始值) + current_focused_count = self.count_subflows_by_state(ChatState.FOCUSED) + logger.debug(f"{log_prefix_manager} 专注上限: {focused_limit}, 当前专注数: {current_focused_count}") # 使用快照安全遍历 subflows_snapshot = list(self.subheartflows.values()) - - for sub_hf in subflows_snapshot: - flow_id = sub_hf.subheartflow_id - if flow_id in self.subheartflows and self.subheartflows[flow_id].chat_state.chat_status == ChatState.CHAT: - evaluated_count += 1 + promoted_count = 0 # 记录本次提升的数量 + try: + for sub_hf in subflows_snapshot: + flow_id = sub_hf.subheartflow_id stream_name = chat_manager.get_stream_name(flow_id) or flow_id - log_prefix = f"[{stream_name}]" + log_prefix_flow = f"[{stream_name}]" - should_promote = await sub_hf.should_evaluate_reply() - if should_promote: - logger.info(f"{log_prefix} 兴趣评估触发升级: CHAT -> FOCUSED") - states_num = ( - self.count_subflows_by_state(ChatState.ABSENT), - self.count_subflows_by_state(ChatState.CHAT), - self.count_subflows_by_state(ChatState.FOCUSED), - ) - await sub_hf.set_chat_state(ChatState.FOCUSED, states_num) - if ( - self.subheartflows.get(flow_id) - and self.subheartflows[flow_id].chat_state.chat_status == ChatState.FOCUSED - ): + # 只处理 CHAT 状态的子心流 + if sub_hf.chat_state.chat_status != ChatState.CHAT: + continue + + # 检查是否满足提升概率 + should_hfc = random.random() < sub_hf.interest_chatting.start_hfc_probability + if not should_hfc: + continue + + # --- 关键检查:检查 FOCUSED 数量是否已达上限 --- + # 注意:在循环内部再次获取当前数量,因为之前的提升可能已经改变了计数 + # 使用已经记录并在循环中更新的 current_focused_count + if current_focused_count >= focused_limit: + logger.debug(f"{log_prefix_manager} {log_prefix_flow} 达到专注上限 ({current_focused_count}/{focused_limit}), 无法提升。概率={sub_hf.interest_chatting.start_hfc_probability:.2f}") + continue # 跳过这个子心流,继续检查下一个 + + # --- 执行提升 --- + # 获取当前实例以检查最新状态 (防御性编程) + current_subflow = self.subheartflows.get(flow_id) + if not current_subflow or current_subflow.chat_state.chat_status != ChatState.CHAT: + logger.warning(f"{log_prefix_manager} {log_prefix_flow} 尝试提升时状态已改变或实例消失,跳过。") + continue + + logger.info(f"{log_prefix_manager} {log_prefix_flow} 兴趣评估触发升级 (prob={sub_hf.interest_chatting.start_hfc_probability:.2f}, 上限:{focused_limit}, 当前:{current_focused_count}) -> FOCUSED") + + states_num = ( + self.count_subflows_by_state(ChatState.ABSENT), + self.count_subflows_by_state(ChatState.CHAT), # 这个值在提升前计算 + current_focused_count, # 这个值在提升前计算 + ) + + # --- 状态设置 --- + original_state = current_subflow.chat_state.chat_status # 记录原始状态 + await current_subflow.set_chat_state(ChatState.FOCUSED, states_num) + + # --- 状态验证 --- + final_subflow = self.subheartflows.get(flow_id) + if final_subflow: + final_state = final_subflow.chat_state.chat_status + if final_state == ChatState.FOCUSED: + logger.debug(f"{log_prefix_manager} {log_prefix_flow} 成功从 {original_state.value} 升级到 FOCUSED 状态") promoted_count += 1 - logger.debug(f"{log_prefix} 成功升级到FOCUSED状态") - else: - logger.info(f"{log_prefix} 升级FOCUSED可能被限制阻止") + # 提升成功后,更新当前专注计数,以便后续检查能使用最新值 + current_focused_count += 1 + elif final_state == original_state: # 状态未变 + logger.warning(f"{log_prefix_manager} {log_prefix_flow} 尝试从 {original_state.value} 升级 FOCUSED 失败,状态仍为: {final_state.value} (可能被内部逻辑阻止)") + else: # 状态变成其他了? + logger.warning(f"{log_prefix_manager} {log_prefix_flow} 尝试从 {original_state.value} 升级 FOCUSED 后状态变为 {final_state.value}") + else: # 子心流消失了? + logger.warning(f"{log_prefix_manager} {log_prefix_flow} 升级后验证时子心流 {flow_id} 消失") - if evaluated_count > 0: - logger.debug(f"[子心流管理器] 评估完成. 评估{evaluated_count}个CHAT流, 升级{promoted_count}个到FOCUSED") + except Exception as e: + logger.error(f"{log_prefix_manager} 兴趣评估周期出错: {e}", exc_info=True) + + if promoted_count > 0: + logger.info(f"{log_prefix_manager} 评估周期结束, 成功提升 {promoted_count} 个子心流到 FOCUSED。") + else: + logger.debug(f"{log_prefix_manager} 评估周期结束, 未提升任何子心流。") def count_subflows_by_state(self, state: ChatState) -> int: """统计指定状态的子心流数量 diff --git a/src/plugins/chat/bot.py b/src/plugins/chat/bot.py index c98065d11..fdb2576a2 100644 --- a/src/plugins/chat/bot.py +++ b/src/plugins/chat/bot.py @@ -3,7 +3,7 @@ from ...config.config import global_config from .message import MessageRecv from ..PFC.pfc_manager import PFCManager from .chat_stream import chat_manager -from ..chat_module.only_process.only_message_process import MessageProcessor +from .only_message_process import MessageProcessor from src.common.logger import get_module_logger, CHAT_STYLE_CONFIG, LogConfig from ..heartFC_chat.heartflow_processor import HeartFCProcessor @@ -82,6 +82,10 @@ class ChatBot: logger.debug(f"用户{userinfo.user_id}被禁止回复") return + if groupinfo.group_id not in global_config.talk_allowed_groups: + logger.debug(f"群{groupinfo.group_id}被禁止回复") + return + if message.message_info.template_info and not message.message_info.template_info.template_default: template_group_name = message.message_info.template_info.template_name template_items = message.message_info.template_info.template_items diff --git a/src/plugins/chat_module/only_process/only_message_process.py b/src/plugins/chat/only_message_process.py similarity index 100% rename from src/plugins/chat_module/only_process/only_message_process.py rename to src/plugins/chat/only_message_process.py diff --git a/src/plugins/heartFC_chat/heartFC_chat.py b/src/plugins/heartFC_chat/heartFC_chat.py index 476798f42..ffed743ab 100644 --- a/src/plugins/heartFC_chat/heartFC_chat.py +++ b/src/plugins/heartFC_chat/heartFC_chat.py @@ -13,9 +13,7 @@ from src.plugins.models.utils_model import LLMRequest from src.config.config import global_config from src.plugins.chat.utils_image import image_path_to_base64 # Local import needed after move from src.plugins.utils.timer_calculater import Timer # <--- Import Timer - -# --- Import necessary dependencies directly --- -from .heartFC_generator import ResponseGenerator # Assuming this is the type for gpt +from src.plugins.heartFC_chat.heartFC_generator import HeartFCGenerator from src.do_tool.tool_use import ToolUser from ..chat.message_sender import message_manager # <-- Import the global manager from src.plugins.chat.emoji_manager import emoji_manager @@ -77,9 +75,7 @@ class HeartFChatting: def __init__( self, - chat_id: str, - gpt_instance: ResponseGenerator, # 文本回复生成器 - tool_user_instance: ToolUser, # 工具使用实例 + chat_id: str ): """ HeartFChatting 初始化函数 @@ -97,13 +93,12 @@ class HeartFChatting: # 初始化状态控制 self._initialized = False # 是否已初始化标志 - self._init_lock = asyncio.Lock() # 初始化锁(确保只初始化一次) self._processing_lock = asyncio.Lock() # 处理锁(确保单次Plan-Replier-Sender周期) self._timer_lock = asyncio.Lock() # 计时器锁(安全更新计时器) # 依赖注入存储 - self.gpt_instance = gpt_instance # 文本回复生成器 - self.tool_user = tool_user_instance # 工具使用实例 + self.gpt_instance = HeartFCGenerator() # 文本回复生成器 + self.tool_user = ToolUser() # 工具使用实例 # LLM规划器配置 self.planner_llm = LLMRequest( @@ -117,7 +112,6 @@ class HeartFChatting: self._loop_timer: float = 0.0 # 循环剩余时间(秒) self._loop_active: bool = False # 循环是否正在运行 self._loop_task: Optional[asyncio.Task] = None # 主循环任务 - self._trigger_count_this_activation: int = 0 # 当前激活周期内的触发计数 self._initial_duration: float = INITIAL_DURATION # 首次触发增加的时间 self._last_added_duration: float = self._initial_duration # 上次增加的时间 @@ -131,35 +125,34 @@ class HeartFChatting: 懒初始化以使用提供的标识符解析chat_stream和sub_hf。 确保实例已准备好处理触发器。 """ - async with self._init_lock: - if self._initialized: - return True - log_prefix = self._get_log_prefix() # 获取前缀 - try: - self.chat_stream = chat_manager.get_stream(self.stream_id) + if self._initialized: + return True + log_prefix = self._get_log_prefix() # 获取前缀 + try: + self.chat_stream = chat_manager.get_stream(self.stream_id) - if not self.chat_stream: - logger.error(f"{log_prefix} 获取ChatStream失败。") - return False - - # <-- 在这里导入 heartflow 实例 - from src.heart_flow.heartflow import heartflow - - self.sub_hf = heartflow.get_subheartflow(self.stream_id) - if not self.sub_hf: - logger.warning(f"{log_prefix} 获取SubHeartflow失败。一些功能可能受限。") - - self._initialized = True - logger.info(f"麦麦感觉到了,激发了HeartFChatting{log_prefix} 初始化成功。") - return True - except Exception as e: - logger.error(f"{log_prefix} 初始化失败: {e}") - logger.error(traceback.format_exc()) + if not self.chat_stream: + logger.error(f"{log_prefix} 获取ChatStream失败。") return False + # <-- 在这里导入 heartflow 实例 + from src.heart_flow.heartflow import heartflow + + self.sub_hf = heartflow.get_subheartflow(self.stream_id) + if not self.sub_hf: + logger.warning(f"{log_prefix} 获取SubHeartflow失败。一些功能可能受限。") + + self._initialized = True + logger.info(f"麦麦感觉到了,激发了HeartFChatting{log_prefix} 初始化成功。") + return True + except Exception as e: + logger.error(f"{log_prefix} 初始化失败: {e}") + logger.error(traceback.format_exc()) + return False + async def add_time(self): """ - 为麦麦添加时间,麦麦有兴趣时,时间增加。 + 为麦麦添加时间,麦麦有兴趣时,固定增加15秒 """ log_prefix = self._get_log_prefix() if not self._initialized: @@ -168,45 +161,61 @@ class HeartFChatting: return async with self._timer_lock: - duration_to_add: float = 0.0 + duration_to_add: float = 15.0 # 固定增加15秒 + if not self._loop_active: # 首次触发 + logger.info(f"{log_prefix} 麦麦有兴趣! 打算聊:15s.") + else: # 循环已激活 + logger.info(f"{log_prefix} 麦麦想继续聊:15s, 还能聊: {self._loop_timer:.1f}s.") - if not self._loop_active: # First trigger for this activation cycle - duration_to_add = self._initial_duration # 使用初始值 - self._last_added_duration = duration_to_add # 更新上次增加的值 - self._trigger_count_this_activation = 1 # Start counting - logger.info( - f"{log_prefix} 麦麦有兴趣! #{self._trigger_count_this_activation}. 麦麦打算聊: {duration_to_add:.2f}s." - ) - else: # Loop is already active, apply 50% reduction - self._trigger_count_this_activation += 1 - duration_to_add = self._last_added_duration * 0.5 - if duration_to_add < 1.5: - duration_to_add = 1.5 - # Update _last_added_duration only if it's >= 0.5 to prevent it from becoming too small - self._last_added_duration = duration_to_add - logger.info( - f"{log_prefix} 麦麦兴趣增加! #{self._trigger_count_this_activation}. 想继续聊: {duration_to_add:.2f}s, 麦麦还能聊: {self._loop_timer:.1f}s." - ) - - # 添加计算出的时间 + # 添加固定时间 new_timer_value = self._loop_timer + duration_to_add - # Add max timer duration limit? e.g., max(0, min(new_timer_value, 300)) self._loop_timer = max(0, new_timer_value) - # Log less frequently, e.g., every 10 seconds or significant change? - # if self._trigger_count_this_activation % 5 == 0: - # logger.info(f"{log_prefix} 麦麦现在想聊{self._loop_timer:.1f}秒") - # Start the loop if it wasn't active and timer is positive + # 添加时间后,检查是否需要启动循环 + await self._start_loop_if_needed() + + async def start(self): + """ + 显式尝试启动 HeartFChatting 的主循环。 + 如果循环未激活且计时器 > 0,则启动循环。 + """ + log_prefix = self._get_log_prefix() + if not self._initialized: + if not await self._initialize(): + logger.error(f"{log_prefix} 无法启动循环: 初始化失败。") + return + logger.info(f"{log_prefix} 尝试显式启动循环...") + await self._start_loop_if_needed() + + async def _start_loop_if_needed(self): + """检查是否需要启动主循环,如果未激活且计时器大于0,则启动。""" + log_prefix = self._get_log_prefix() + should_start_loop = False + async with self._timer_lock: + # 检查是否满足启动条件:未激活且计时器有时间 if not self._loop_active and self._loop_timer > 0: - self._loop_active = True - if self._loop_task and not self._loop_task.done(): - logger.warning(f"{log_prefix} 发现意外的循环任务正在进行。取消它。") - self._loop_task.cancel() + should_start_loop = True + self._loop_active = True # 在锁内标记为活动,防止重复启动 - self._loop_task = asyncio.create_task(self._run_pf_loop()) - self._loop_task.add_done_callback(self._handle_loop_completion) - elif self._loop_active: - logger.trace(f"{log_prefix} 循环已经激活。计时器延长。") + if should_start_loop: + # 检查是否已有任务在运行(理论上不应该,因为 _loop_active=False) + if self._loop_task and not self._loop_task.done(): + logger.warning(f"{log_prefix} 发现之前的循环任务仍在运行(不符合预期)。取消旧任务。") + self._loop_task.cancel() + try: + # 等待旧任务确实被取消 + await asyncio.wait_for(self._loop_task, timeout=0.5) + except (asyncio.CancelledError, asyncio.TimeoutError): + pass # 忽略取消或超时错误 + self._loop_task = None # 清理旧任务引用 + + logger.info(f"{log_prefix} 计时器 > 0 且循环未激活,启动主循环...") + # 创建新的循环任务 + self._loop_task = asyncio.create_task(self._run_pf_loop()) + # 添加完成回调 + self._loop_task.add_done_callback(self._handle_loop_completion) + # else: + # logger.trace(f"{log_prefix} 不需要启动循环(已激活或计时器为0)") # 可以取消注释以进行调试 def _handle_loop_completion(self, task: asyncio.Task): """当 _run_pf_loop 任务完成时执行的回调。""" @@ -228,8 +237,6 @@ class HeartFChatting: if self._processing_lock.locked(): logger.warning(f"{log_prefix} HeartFChatting: 处理锁在循环结束时仍被锁定,强制释放。") self._processing_lock.release() - # Instance removal is now handled by SubHeartflow - # asyncio.create_task(self.heartfc_controller._remove_heartFC_chat_instance(self.stream_id)) # Removed async def _run_pf_loop(self): """ diff --git a/src/plugins/heartFC_chat/heartFC_generator.py b/src/plugins/heartFC_chat/heartFC_generator.py index 70d4109f8..28329b896 100644 --- a/src/plugins/heartFC_chat/heartFC_generator.py +++ b/src/plugins/heartFC_chat/heartFC_generator.py @@ -22,7 +22,7 @@ llm_config = LogConfig( logger = get_module_logger("llm_generator", config=llm_config) -class ResponseGenerator: +class HeartFCGenerator: def __init__(self): self.model_normal = LLMRequest( model=global_config.llm_normal, diff --git a/src/plugins/heartFC_chat/heartflow_processor.py b/src/plugins/heartFC_chat/heartflow_processor.py index 44206d8b3..30ac61c5b 100644 --- a/src/plugins/heartFC_chat/heartflow_processor.py +++ b/src/plugins/heartFC_chat/heartflow_processor.py @@ -67,11 +67,8 @@ class HeartFCProcessor: group_info=groupinfo, ) - # --- 确保 SubHeartflow 存在 --- subheartflow = await heartflow.create_subheartflow(chat.stream_id) - if not subheartflow: - logger.error(f"无法为 stream_id {chat.stream_id} 创建或获取 SubHeartflow,中止处理") - return + message.update_chat_stream(chat) @@ -137,33 +134,16 @@ class HeartFCProcessor: # --- 修改:兴趣度更新逻辑 --- # if is_mentioned: - interest_increase_on_mention = 2 + interest_increase_on_mention = 1 mentioned_boost = interest_increase_on_mention # 从配置获取提及增加值 interested_rate += mentioned_boost - logger.trace(f"消息提及机器人,额外增加兴趣 {mentioned_boost:.2f}") # 更新兴趣度 (调用 SubHeartflow 的方法) - current_interest = 0.0 # 初始化 - try: - # 获取当前时间,传递给 increase_interest - current_time = time.time() - await subheartflow.interest_chatting.increase_interest(current_time, value=interested_rate) - current_interest = await subheartflow.get_interest_level() # 获取更新后的值 + current_time = time.time() + await subheartflow.interest_chatting.increase_interest(current_time, value=interested_rate) - logger.trace( - f"使用激活率 {interested_rate:.2f} 更新后 (通过缓冲后),当前兴趣度: {current_interest:.2f} (Stream: {chat.stream_id})" - ) - - # 添加到 SubHeartflow 的 interest_dict - await subheartflow.add_interest_dict_entry(message, interested_rate, is_mentioned) - logger.trace( - f"Message {message.message_info.message_id} added to interest dict for stream {chat.stream_id}" - ) - - except Exception as e: - logger.error(f"更新兴趣度失败 (Stream: {chat.stream_id}): {e}") - logger.error(traceback.format_exc()) - # --- 结束修改 --- # + # 添加到 SubHeartflow 的 interest_dict,给normal_chat处理 + await subheartflow.add_interest_dict_entry(message, interested_rate, is_mentioned) # 打印消息接收和处理信息 mes_name = chat.group_info.group_name if chat.group_info else "私聊" @@ -172,7 +152,7 @@ class HeartFCProcessor: f"[{current_time}][{mes_name}]" f"{chat.user_info.user_nickname}:" f"{message.processed_plain_text}" - f"兴趣度: {current_interest:.2f}" + f"[兴趣度: {interested_rate:.2f}]" ) try: diff --git a/src/plugins/heartFC_chat/normal_chat.py b/src/plugins/heartFC_chat/normal_chat.py index 8de504151..4acacd6d1 100644 --- a/src/plugins/heartFC_chat/normal_chat.py +++ b/src/plugins/heartFC_chat/normal_chat.py @@ -7,7 +7,7 @@ from typing import List, Optional # 导入 Optional from ..moods.moods import MoodManager from ...config.config import global_config from ..chat.emoji_manager import emoji_manager -from .normal_chat_generator import ResponseGenerator +from .normal_chat_generator import NormalChatGenerator from ..chat.message import MessageSending, MessageRecv, MessageThinking, MessageSet from ..chat.message_sender import message_manager from ..chat.utils_image import image_path_to_base64 @@ -43,12 +43,10 @@ class NormalChat: self.interest_dict = interest_dict - logger.info(f"[{self.stream_name}] 正在初始化 NormalChat 实例...") - - self.gpt = ResponseGenerator() + self.gpt = NormalChatGenerator() self.mood_manager = MoodManager.get_instance() # MoodManager 保持单例 # 存储此实例的兴趣监控任务 - self._interest_monitoring_task: Optional[asyncio.Task] = None + self._chat_task: Optional[asyncio.Task] = None logger.info(f"[{self.stream_name}] NormalChat 实例初始化完成。") # 改为实例方法 @@ -73,7 +71,6 @@ class NormalChat: ) await message_manager.add_message(thinking_message) - return thinking_id # 改为实例方法 @@ -176,7 +173,7 @@ class NormalChat: await asyncio.sleep(1) # 每秒检查一次 # 检查任务是否已被取消 - if self._interest_monitoring_task is None or self._interest_monitoring_task.cancelled(): + if self._chat_task is None or self._chat_task.cancelled(): logger.info(f"[{self.stream_name}] 兴趣监控任务被取消或置空,退出") break @@ -233,7 +230,7 @@ class NormalChat: willing_log = f"[回复意愿:{await willing_manager.get_willing(self.stream_id):.2f}]" if is_willing else "" logger.info( f"[{current_time}][{mes_name}]" - f"{self.chat_stream.user_info.user_nickname}:" # 使用 self.chat_stream + f"{message.message_info.user_info.user_nickname}:" # 使用 self.chat_stream f"{message.processed_plain_text}{willing_log}[概率:{reply_probability * 100:.1f}%]" ) do_reply = False @@ -352,19 +349,21 @@ class NormalChat: return False # 改为实例方法, 移除 chat 参数 - async def start_monitoring_interest(self): - """为此 NormalChat 实例关联的 ChatStream 启动兴趣消息监控任务(如果尚未运行)。""" - if self._interest_monitoring_task is None or self._interest_monitoring_task.done(): - logger.info(f"[{self.stream_name}] 启动兴趣消息监控任务...") + + async def start_chat(self): + """为此 NormalChat 实例关联的 ChatStream 启动聊天任务(如果尚未运行)。""" + if self._chat_task is None or self._chat_task.done(): + logger.info(f"[{self.stream_name}] 启动聊天任务...") task = asyncio.create_task(self._find_interested_message()) task.add_done_callback(lambda t: self._handle_task_completion(t)) # 回调现在是实例方法 - self._interest_monitoring_task = task + self._chat_task = task + # 改为实例方法, 移除 stream_id 参数 def _handle_task_completion(self, task: asyncio.Task): """兴趣监控任务完成时的回调函数。""" # 检查完成的任务是否是当前实例的任务 - if task is not self._interest_monitoring_task: + if task is not self._chat_task: logger.warning(f"[{self.stream_name}] 收到一个未知或过时任务的完成回调。") return @@ -382,27 +381,26 @@ class NormalChat: logger.error(f"[{self.stream_name}] 处理任务完成回调时出错: {e}") finally: # 标记任务已完成/移除 - if self._interest_monitoring_task is task: # 再次确认是当前任务 - self._interest_monitoring_task = None - logger.debug(f"[{self.stream_name}] 兴趣监控任务已被标记为完成/移除。") + if self._chat_task is task: # 再次确认是当前任务 + self._chat_task = None + logger.debug(f"[{self.stream_name}] 聊天任务已被标记为完成/移除。") # 改为实例方法, 移除 stream_id 参数 - async def stop_monitoring_interest(self): + async def stop_chat(self): """停止当前实例的兴趣监控任务。""" - if self._interest_monitoring_task and not self._interest_monitoring_task.done(): - task = self._interest_monitoring_task - logger.info(f"[{self.stream_name}] 尝试取消兴趣监控任务。") + if self._chat_task and not self._chat_task.done(): + task = self._chat_task + logger.info(f"[{self.stream_name}] 尝试取消聊天任务。") task.cancel() try: await task # 等待任务响应取消 except asyncio.CancelledError: - logger.info(f"[{self.stream_name}] 兴趣监控任务已成功取消。") + logger.info(f"[{self.stream_name}] 聊天任务已成功取消。") except Exception as e: # 回调函数 _handle_task_completion 会处理异常日志 logger.warning(f"[{self.stream_name}] 等待监控任务取消时捕获到异常 (可能已在回调中记录): {e}") finally: # 确保任务状态更新,即使等待出错 (回调函数也会尝试更新) - if self._interest_monitoring_task is task: - self._interest_monitoring_task = None - # else: - # logger.debug(f"[{self.stream_name}] 没有正在运行的兴趣监控任务可停止。") + if self._chat_task is task: + self._chat_task = None + diff --git a/src/plugins/heartFC_chat/normal_chat_generator.py b/src/plugins/heartFC_chat/normal_chat_generator.py index ee133f440..07635baf6 100644 --- a/src/plugins/heartFC_chat/normal_chat_generator.py +++ b/src/plugins/heartFC_chat/normal_chat_generator.py @@ -19,7 +19,7 @@ llm_config = LogConfig( logger = get_module_logger("llm_generator", config=llm_config) -class ResponseGenerator: +class NormalChatGenerator: def __init__(self): self.model_reasoning = LLMRequest( model=global_config.llm_reasoning, @@ -77,8 +77,6 @@ class ResponseGenerator: sender_name = f"({message.chat_stream.user_info.user_id}){message.chat_stream.user_info.user_nickname}" else: sender_name = f"用户({message.chat_stream.user_info.user_id})" - - logger.debug("开始使用生成回复-2") # 构建prompt with Timer() as t_build_prompt: prompt = await prompt_builder.build_prompt(