diff --git a/src/chat/emoji_system/emoji_manager.py b/src/chat/emoji_system/emoji_manager.py index 0fbe2f045..051864132 100644 --- a/src/chat/emoji_system/emoji_manager.py +++ b/src/chat/emoji_system/emoji_manager.py @@ -334,6 +334,8 @@ async def clean_unused_emojis(emoji_dir: str, emoji_objects: List["MaiEmoji"],re # 遍历指定目录中的所有文件 for file_name in os.listdir(emoji_dir): file_full_path = os.path.join(emoji_dir, file_name) + + # 确保处理的是文件而不是子目录 if not os.path.isfile(file_full_path): diff --git a/src/chat/focus_chat/heartFC_chat.py b/src/chat/focus_chat/heartFC_chat.py index 411366b47..ce4a43cba 100644 --- a/src/chat/focus_chat/heartFC_chat.py +++ b/src/chat/focus_chat/heartFC_chat.py @@ -216,26 +216,41 @@ class HeartFChatting: async def start(self): """检查是否需要启动主循环,如果未激活则启动。""" + logger.debug(f"{self.log_prefix} 开始启动 HeartFChatting") + # 如果循环已经激活,直接返回 if self._loop_active: + logger.debug(f"{self.log_prefix} HeartFChatting 已激活,无需重复启动") return - # 标记为活动状态,防止重复启动 - self._loop_active = True + try: + # 标记为活动状态,防止重复启动 + self._loop_active = True - # 检查是否已有任务在运行(理论上不应该,因为 _loop_active=False) - if self._loop_task and not self._loop_task.done(): - logger.warning(f"{self.log_prefix} 发现之前的循环任务仍在运行(不符合预期)。取消旧任务。") - self._loop_task.cancel() - try: - # 等待旧任务确实被取消 - await asyncio.wait_for(self._loop_task, timeout=0.5) - except (asyncio.CancelledError, asyncio.TimeoutError): - pass # 忽略取消或超时错误 - self._loop_task = None # 清理旧任务引用 + # 检查是否已有任务在运行(理论上不应该,因为 _loop_active=False) + if self._loop_task and not self._loop_task.done(): + logger.warning(f"{self.log_prefix} 发现之前的循环任务仍在运行(不符合预期)。取消旧任务。") + self._loop_task.cancel() + try: + # 等待旧任务确实被取消 + await asyncio.wait_for(self._loop_task, timeout=5.0) + except (asyncio.CancelledError, asyncio.TimeoutError): + pass # 忽略取消或超时错误 + except Exception as e: + logger.warning(f"{self.log_prefix} 等待旧任务取消时出错: {e}") + self._loop_task = None # 清理旧任务引用 - self._loop_task = asyncio.create_task(self._run_focus_chat()) - self._loop_task.add_done_callback(self._handle_loop_completion) + logger.debug(f"{self.log_prefix} 创建新的 HeartFChatting 主循环任务") + self._loop_task = asyncio.create_task(self._run_focus_chat()) + self._loop_task.add_done_callback(self._handle_loop_completion) + logger.debug(f"{self.log_prefix} HeartFChatting 启动完成") + + except Exception as e: + # 启动失败时重置状态 + self._loop_active = False + self._loop_task = None + logger.error(f"{self.log_prefix} HeartFChatting 启动失败: {e}") + raise def _handle_loop_completion(self, task: asyncio.Task): """当 _hfc_loop 任务完成时执行的回调。""" @@ -260,6 +275,8 @@ class HeartFChatting: try: while True: # 主循环 logger.debug(f"{self.log_prefix} 开始第{self._cycle_counter}次循环") + + # 检查关闭标志 if self._shutting_down: logger.info(f"{self.log_prefix} 检测到关闭标志,退出 Focus Chat 循环。") break @@ -274,73 +291,98 @@ class HeartFChatting: loop_cycle_start_time = time.monotonic() # 执行规划和处理阶段 - async with self._get_cycle_context(): - thinking_id = "tid" + str(round(time.time(), 2)) - self._current_cycle_detail.set_thinking_id(thinking_id) - # 主循环:思考->决策->执行 - async with global_prompt_manager.async_message_scope(self.chat_stream.context.get_template_name()): - logger.debug(f"模板 {self.chat_stream.context.get_template_name()}") - loop_info = await self._observe_process_plan_action_loop(cycle_timers, thinking_id) + try: + async with self._get_cycle_context(): + thinking_id = "tid" + str(round(time.time(), 2)) + self._current_cycle_detail.set_thinking_id(thinking_id) + + # 使用异步上下文管理器处理消息 + try: + async with global_prompt_manager.async_message_scope(self.chat_stream.context.get_template_name()): + # 在上下文内部检查关闭状态 + if self._shutting_down: + logger.info(f"{self.log_prefix} 在处理上下文中检测到关闭信号,退出") + break + + logger.debug(f"模板 {self.chat_stream.context.get_template_name()}") + loop_info = await self._observe_process_plan_action_loop(cycle_timers, thinking_id) - if loop_info["loop_action_info"]["command"] == "stop_focus_chat": - logger.info(f"{self.log_prefix} 麦麦决定停止专注聊天") - # 如果设置了回调函数,则调用它 - if self.on_stop_focus_chat: - try: - await self.on_stop_focus_chat() - logger.info(f"{self.log_prefix} 成功调用回调函数处理停止专注聊天") - except Exception as e: - logger.error(f"{self.log_prefix} 调用停止专注聊天回调函数时出错: {e}") - logger.error(traceback.format_exc()) + if loop_info["loop_action_info"]["command"] == "stop_focus_chat": + logger.info(f"{self.log_prefix} 麦麦决定停止专注聊天") + # 如果设置了回调函数,则调用它 + if self.on_stop_focus_chat: + try: + await self.on_stop_focus_chat() + logger.info(f"{self.log_prefix} 成功调用回调函数处理停止专注聊天") + except Exception as e: + logger.error(f"{self.log_prefix} 调用停止专注聊天回调函数时出错: {e}") + logger.error(traceback.format_exc()) + break + + except asyncio.CancelledError: + logger.info(f"{self.log_prefix} 处理上下文时任务被取消") break + except Exception as e: + logger.error(f"{self.log_prefix} 处理上下文时出错: {e}") + # 上下文处理失败,跳过当前循环 + await asyncio.sleep(1) + continue - self._current_cycle_detail.set_loop_info(loop_info) + self._current_cycle_detail.set_loop_info(loop_info) - # 从observations列表中获取HFCloopObservation - hfcloop_observation = next( - (obs for obs in self.observations if isinstance(obs, HFCloopObservation)), None - ) - if hfcloop_observation: - hfcloop_observation.add_loop_info(self._current_cycle_detail) - else: - logger.warning(f"{self.log_prefix} 未找到HFCloopObservation实例") + # 从observations列表中获取HFCloopObservation + hfcloop_observation = next( + (obs for obs in self.observations if isinstance(obs, HFCloopObservation)), None + ) + if hfcloop_observation: + hfcloop_observation.add_loop_info(self._current_cycle_detail) + else: + logger.warning(f"{self.log_prefix} 未找到HFCloopObservation实例") - self._current_cycle_detail.timers = cycle_timers + self._current_cycle_detail.timers = cycle_timers - # 防止循环过快消耗资源 - await _handle_cycle_delay( - loop_info["loop_action_info"]["action_taken"], loop_cycle_start_time, self.log_prefix + # 防止循环过快消耗资源 + await _handle_cycle_delay( + loop_info["loop_action_info"]["action_taken"], loop_cycle_start_time, self.log_prefix + ) + + # 完成当前循环并保存历史 + self._current_cycle_detail.complete_cycle() + self._cycle_history.append(self._current_cycle_detail) + + # 记录循环信息和计时器结果 + timer_strings = [] + for name, elapsed in cycle_timers.items(): + formatted_time = f"{elapsed * 1000:.2f}毫秒" if elapsed < 1 else f"{elapsed:.2f}秒" + timer_strings.append(f"{name}: {formatted_time}") + + # 新增:输出每个处理器的耗时 + processor_time_costs = self._current_cycle_detail.loop_processor_info.get("processor_time_costs", {}) + processor_time_strings = [] + for pname, ptime in processor_time_costs.items(): + formatted_ptime = f"{ptime * 1000:.2f}毫秒" if ptime < 1 else f"{ptime:.2f}秒" + processor_time_strings.append(f"{pname}: {formatted_ptime}") + processor_time_log = ( + ("\n各处理器耗时: " + "; ".join(processor_time_strings)) if processor_time_strings else "" ) - # 完成当前循环并保存历史 - self._current_cycle_detail.complete_cycle() - self._cycle_history.append(self._current_cycle_detail) + logger.info( + f"{self.log_prefix} 第{self._current_cycle_detail.cycle_id}次思考," + f"耗时: {self._current_cycle_detail.end_time - self._current_cycle_detail.start_time:.1f}秒, " + f"动作: {self._current_cycle_detail.loop_plan_info['action_result']['action_type']}" + + (f"\n详情: {'; '.join(timer_strings)}" if timer_strings else "") + + processor_time_log + ) - # 记录循环信息和计时器结果 - timer_strings = [] - for name, elapsed in cycle_timers.items(): - formatted_time = f"{elapsed * 1000:.2f}毫秒" if elapsed < 1 else f"{elapsed:.2f}秒" - timer_strings.append(f"{name}: {formatted_time}") - - # 新增:输出每个处理器的耗时 - processor_time_costs = self._current_cycle_detail.loop_processor_info.get("processor_time_costs", {}) - processor_time_strings = [] - for pname, ptime in processor_time_costs.items(): - formatted_ptime = f"{ptime * 1000:.2f}毫秒" if ptime < 1 else f"{ptime:.2f}秒" - processor_time_strings.append(f"{pname}: {formatted_ptime}") - processor_time_log = ( - ("\n各处理器耗时: " + "; ".join(processor_time_strings)) if processor_time_strings else "" - ) - - logger.info( - f"{self.log_prefix} 第{self._current_cycle_detail.cycle_id}次思考," - f"耗时: {self._current_cycle_detail.end_time - self._current_cycle_detail.start_time:.1f}秒, " - f"动作: {self._current_cycle_detail.loop_plan_info['action_result']['action_type']}" - + (f"\n详情: {'; '.join(timer_strings)}" if timer_strings else "") - + processor_time_log - ) - - await asyncio.sleep(global_config.focus_chat.think_interval) + await asyncio.sleep(global_config.focus_chat.think_interval) + + except asyncio.CancelledError: + logger.info(f"{self.log_prefix} 循环处理时任务被取消") + break + except Exception as e: + logger.error(f"{self.log_prefix} 循环处理时出错: {e}") + logger.error(traceback.format_exc()) + await asyncio.sleep(1) # 出错后等待一秒再继续 except asyncio.CancelledError: # 设置了关闭标志位后被取消是正常流程 diff --git a/src/chat/heart_flow/sub_heartflow.py b/src/chat/heart_flow/sub_heartflow.py index bb8e78581..661a4db96 100644 --- a/src/chat/heart_flow/sub_heartflow.py +++ b/src/chat/heart_flow/sub_heartflow.py @@ -78,19 +78,22 @@ class SubHeartflow: logger.info(f"{self.log_prefix} 离开normal模式") try: logger.debug(f"{self.log_prefix} 开始调用 stop_chat()") - # 添加超时保护,避免无限等待 - await asyncio.wait_for(self.normal_chat_instance.stop_chat(), timeout=10.0) + # 使用更短的超时时间,强制快速停止 + await asyncio.wait_for(self.normal_chat_instance.stop_chat(), timeout=3.0) logger.debug(f"{self.log_prefix} stop_chat() 调用完成") except asyncio.TimeoutError: logger.warning(f"{self.log_prefix} 停止 NormalChat 超时,强制清理") - # 超时时强制清理 + # 超时时强制清理实例 self.normal_chat_instance = None except Exception as e: logger.error(f"{self.log_prefix} 停止 NormalChat 监控任务时出错: {e}") - logger.error(traceback.format_exc()) - # 出错时也要清理实例 + # 出错时也要清理实例,避免状态不一致 self.normal_chat_instance = None finally: + # 确保实例被清理 + if self.normal_chat_instance: + logger.warning(f"{self.log_prefix} 强制清理 NormalChat 实例") + self.normal_chat_instance = None logger.debug(f"{self.log_prefix} _stop_normal_chat 完成") async def _start_normal_chat(self, rewind=False) -> bool: @@ -175,46 +178,71 @@ class SubHeartflow: async def _start_heart_fc_chat(self) -> bool: """启动 HeartFChatting 实例,确保 NormalChat 已停止""" - await self._stop_normal_chat() # 确保普通聊天监控已停止 - self.interest_dict.clear() - - log_prefix = self.log_prefix - # 如果实例已存在,检查其循环任务状态 - if self.heart_fc_instance: - # 如果任务已完成或不存在,则尝试重新启动 - if self.heart_fc_instance._loop_task is None or self.heart_fc_instance._loop_task.done(): - logger.info(f"{log_prefix} HeartFChatting 实例存在但循环未运行,尝试启动...") - try: - await self.heart_fc_instance.start() # 启动循环 - logger.info(f"{log_prefix} HeartFChatting 循环已启动。") - return True - except Exception as e: - logger.error(f"{log_prefix} 尝试启动现有 HeartFChatting 循环时出错: {e}") - logger.error(traceback.format_exc()) - return False # 启动失败 - else: - # 任务正在运行 - logger.debug(f"{log_prefix} HeartFChatting 已在运行中。") - return True # 已经在运行 - - # 如果实例不存在,则创建并启动 - logger.info(f"{log_prefix} 麦麦准备开始专注聊天...") + logger.debug(f"{self.log_prefix} 开始启动 HeartFChatting") + try: - self.heart_fc_instance = HeartFChatting( - chat_id=self.subheartflow_id, - # observations=self.observations, - on_stop_focus_chat=self._handle_stop_focus_chat_request, - ) + # 确保普通聊天监控已停止 + await self._stop_normal_chat() + self.interest_dict.clear() + + log_prefix = self.log_prefix + # 如果实例已存在,检查其循环任务状态 + if self.heart_fc_instance: + logger.debug(f"{log_prefix} HeartFChatting 实例已存在,检查状态") + # 如果任务已完成或不存在,则尝试重新启动 + if self.heart_fc_instance._loop_task is None or self.heart_fc_instance._loop_task.done(): + logger.info(f"{log_prefix} HeartFChatting 实例存在但循环未运行,尝试启动...") + try: + # 添加超时保护 + await asyncio.wait_for(self.heart_fc_instance.start(), timeout=15.0) + logger.info(f"{log_prefix} HeartFChatting 循环已启动。") + return True + except asyncio.TimeoutError: + logger.error(f"{log_prefix} 启动现有 HeartFChatting 循环超时") + # 超时时清理实例,准备重新创建 + self.heart_fc_instance = None + except Exception as e: + logger.error(f"{log_prefix} 尝试启动现有 HeartFChatting 循环时出错: {e}") + logger.error(traceback.format_exc()) + # 出错时清理实例,准备重新创建 + self.heart_fc_instance = None + else: + # 任务正在运行 + logger.debug(f"{log_prefix} HeartFChatting 已在运行中。") + return True # 已经在运行 - await self.heart_fc_instance.start() - logger.debug(f"{log_prefix} 麦麦已成功进入专注聊天模式 (新实例已启动)。") - return True + # 如果实例不存在,则创建并启动 + logger.info(f"{log_prefix} 麦麦准备开始专注聊天...") + try: + logger.debug(f"{log_prefix} 创建新的 HeartFChatting 实例") + self.heart_fc_instance = HeartFChatting( + chat_id=self.subheartflow_id, + # observations=self.observations, + on_stop_focus_chat=self._handle_stop_focus_chat_request, + ) + logger.debug(f"{log_prefix} 启动 HeartFChatting 实例") + # 添加超时保护 + await asyncio.wait_for(self.heart_fc_instance.start(), timeout=15.0) + logger.debug(f"{log_prefix} 麦麦已成功进入专注聊天模式 (新实例已启动)。") + return True + + except asyncio.TimeoutError: + logger.error(f"{log_prefix} 创建或启动新 HeartFChatting 实例超时") + self.heart_fc_instance = None # 超时时清理实例 + return False + except Exception as e: + logger.error(f"{log_prefix} 创建或启动 HeartFChatting 实例时出错: {e}") + logger.error(traceback.format_exc()) + self.heart_fc_instance = None # 创建或初始化异常,清理实例 + return False + except Exception as e: - logger.error(f"{log_prefix} 创建或启动 HeartFChatting 实例时出错: {e}") + logger.error(f"{self.log_prefix} _start_heart_fc_chat 执行时出错: {e}") logger.error(traceback.format_exc()) - self.heart_fc_instance = None # 创建或初始化异常,清理实例 return False + finally: + logger.debug(f"{self.log_prefix} _start_heart_fc_chat 完成") async def change_chat_state(self, new_state: ChatState) -> None: """ diff --git a/src/chat/normal_chat/normal_chat.py b/src/chat/normal_chat/normal_chat.py index 5d271e714..841927654 100644 --- a/src/chat/normal_chat/normal_chat.py +++ b/src/chat/normal_chat/normal_chat.py @@ -159,94 +159,119 @@ class NormalChat: 后台任务方法,轮询当前实例关联chat的兴趣消息 通常由start_monitoring_interest()启动 """ - while True: - try: - # 检查任务是否已被取消 - 移动到try块最开始 - if self._chat_task is None or self._chat_task.cancelled(): - logger.info(f"[{self.stream_name}] 兴趣监控任务被取消或置空,退出") - break - - # 检查是否已停用 + logger.debug(f"[{self.stream_name}] 兴趣监控任务开始") + + try: + while True: + # 第一层检查:立即检查取消和停用状态 if self._disabled: - logger.info(f"[{self.stream_name}] 已停用,退出兴趣监控") + logger.info(f"[{self.stream_name}] 检测到停用标志,退出兴趣监控") break - - await asyncio.sleep(0.5) # 每0.5秒检查一次 - # 再次检查取消状态 - if self._chat_task is None or self._chat_task.cancelled() or self._disabled: - logger.info(f"[{self.stream_name}] 检测到停止信号,退出") + # 检查当前任务是否已被取消 + current_task = asyncio.current_task() + if current_task and current_task.cancelled(): + logger.info(f"[{self.stream_name}] 当前任务已被取消,退出") break - items_to_process = list(self.interest_dict.items()) - if not items_to_process: - continue - - # 使用异步上下文管理器处理消息 try: - async with global_prompt_manager.async_message_scope(self.chat_stream.context.get_template_name()): - # 在上下文内部再次检查取消状态 - if self._chat_task is None or self._chat_task.cancelled() or self._disabled: - logger.info(f"[{self.stream_name}] 在处理上下文中检测到停止信号,退出") - break + # 短暂等待,让出控制权 + await asyncio.sleep(0.1) + + # 第二层检查:睡眠后再次检查状态 + if self._disabled: + logger.info(f"[{self.stream_name}] 睡眠后检测到停用标志,退出") + break - # 并行处理兴趣消息 - async def process_single_message(msg_id, message, interest_value, is_mentioned): - """处理单个兴趣消息""" - try: - # 在处理每个消息前检查停止状态 - if self._disabled or (self._chat_task and self._chat_task.cancelled()): - return + # 获取待处理消息 + items_to_process = list(self.interest_dict.items()) + if not items_to_process: + # 没有消息时继续下一轮循环 + continue - # 处理消息 - if time.time() - self.start_time > 300: - self.adjust_reply_frequency(duration=300 / 60) - else: - self.adjust_reply_frequency(duration=(time.time() - self.start_time) / 60) + # 第三层检查:在处理消息前最后检查一次 + if self._disabled: + logger.info(f"[{self.stream_name}] 处理消息前检测到停用标志,退出") + break - await self.normal_response( - message=message, - is_mentioned=is_mentioned, - interested_rate=interest_value * self.willing_amplifier, - ) - except Exception as e: - logger.error( - f"[{self.stream_name}] 处理兴趣消息{msg_id}时出错: {e}\n{traceback.format_exc()}" - ) - finally: - self.interest_dict.pop(msg_id, None) + # 使用异步上下文管理器处理消息 + try: + async with global_prompt_manager.async_message_scope(self.chat_stream.context.get_template_name()): + # 在上下文内部再次检查取消状态 + if self._disabled: + logger.info(f"[{self.stream_name}] 在处理上下文中检测到停止信号,退出") + break - # 创建并行任务列表 - tasks = [] - for msg_id, (message, interest_value, is_mentioned) in items_to_process: - task = process_single_message(msg_id, message, interest_value, is_mentioned) - tasks.append(task) + # 并行处理兴趣消息 + async def process_single_message(msg_id, message, interest_value, is_mentioned): + """处理单个兴趣消息""" + try: + # 在处理每个消息前检查停止状态 + if self._disabled: + logger.debug(f"[{self.stream_name}] 处理消息时检测到停用,跳过消息 {msg_id}") + return - # 并行执行所有任务,限制并发数量避免资源过度消耗 - if tasks: - # 使用信号量控制并发数,最多同时处理5个消息 - semaphore = asyncio.Semaphore(5) + # 处理消息 + if time.time() - self.start_time > 300: + self.adjust_reply_frequency(duration=300 / 60) + else: + self.adjust_reply_frequency(duration=(time.time() - self.start_time) / 60) - async def limited_process(task, sem): - async with sem: - await task + await self.normal_response( + message=message, + is_mentioned=is_mentioned, + interested_rate=interest_value * self.willing_amplifier, + ) + except asyncio.CancelledError: + logger.debug(f"[{self.stream_name}] 处理消息 {msg_id} 时被取消") + raise # 重新抛出取消异常 + except Exception as e: + logger.error(f"[{self.stream_name}] 处理兴趣消息{msg_id}时出错: {e}") + # 不打印完整traceback,避免日志污染 + finally: + # 无论如何都要清理消息 + self.interest_dict.pop(msg_id, None) + + # 创建并行任务列表 + tasks = [] + for msg_id, (message, interest_value, is_mentioned) in items_to_process: + task = process_single_message(msg_id, message, interest_value, is_mentioned) + tasks.append(task) + + # 并行执行所有任务,限制并发数量避免资源过度消耗 + if tasks: + # 使用信号量控制并发数,最多同时处理5个消息 + semaphore = asyncio.Semaphore(5) + + async def limited_process(task, sem): + async with sem: + await task + + limited_tasks = [limited_process(task, semaphore) for task in tasks] + await asyncio.gather(*limited_tasks, return_exceptions=True) + + except asyncio.CancelledError: + logger.info(f"[{self.stream_name}] 处理上下文时任务被取消") + break + except Exception as e: + logger.error(f"[{self.stream_name}] 处理上下文时出错: {e}") + # 出错后短暂等待,避免快速重试 + await asyncio.sleep(0.5) - limited_tasks = [limited_process(task, semaphore) for task in tasks] - await asyncio.gather(*limited_tasks, return_exceptions=True) - except asyncio.CancelledError: - logger.info(f"[{self.stream_name}] 处理上下文时任务被取消") + logger.info(f"[{self.stream_name}] 主循环中任务被取消") break except Exception as e: - logger.error(f"[{self.stream_name}] 处理上下文时出错: {e}") - await asyncio.sleep(1) + logger.error(f"[{self.stream_name}] 主循环出错: {e}") + # 出错后等待一秒再继续 + await asyncio.sleep(1.0) - except asyncio.CancelledError: - logger.info(f"[{self.stream_name}] 兴趣监控任务被取消") - break - except Exception as e: - logger.error(f"[{self.stream_name}] 兴趣监控任务出错: {e}\n{traceback.format_exc()}") - await asyncio.sleep(1) # 出错后等待一秒再继续 + except asyncio.CancelledError: + logger.info(f"[{self.stream_name}] 兴趣监控任务被取消") + except Exception as e: + logger.error(f"[{self.stream_name}] 兴趣监控任务严重错误: {e}") + finally: + logger.debug(f"[{self.stream_name}] 兴趣监控任务结束") # 改为实例方法, 移除 chat 参数 async def normal_response(self, message: MessageRecv, is_mentioned: bool, interested_rate: float) -> None: @@ -504,60 +529,112 @@ class NormalChat: # 改为实例方法, 移除 chat 参数 async def start_chat(self): - """启动聊天任务。""" # Ensure initialized before starting tasks - self._disabled = False # 启动时重置停用标志 - - if self._chat_task is None or self._chat_task.done(): - # logger.info(f"[{self.stream_name}] 开始处理兴趣消息...") - polling_task = asyncio.create_task(self._reply_interested_message()) - polling_task.add_done_callback(lambda t: self._handle_task_completion(t)) - self._chat_task = polling_task - else: + """启动聊天任务。""" + logger.debug(f"[{self.stream_name}] 开始启动聊天任务") + + # 重置停用标志 + self._disabled = False + + # 检查是否已有运行中的任务 + if self._chat_task and not self._chat_task.done(): logger.info(f"[{self.stream_name}] 聊天轮询任务已在运行中。") + return + + # 清理可能存在的已完成任务引用 + if self._chat_task and self._chat_task.done(): + self._chat_task = None + + try: + logger.debug(f"[{self.stream_name}] 创建新的聊天轮询任务") + polling_task = asyncio.create_task(self._reply_interested_message()) + + # 设置回调 + polling_task.add_done_callback(lambda t: self._handle_task_completion(t)) + + # 保存任务引用 + self._chat_task = polling_task + + logger.debug(f"[{self.stream_name}] 聊天任务启动完成") + + except Exception as e: + logger.error(f"[{self.stream_name}] 启动聊天任务失败: {e}") + self._chat_task = None + raise def _handle_task_completion(self, task: asyncio.Task): """任务完成回调处理""" - if task is not self._chat_task: - logger.warning(f"[{self.stream_name}] 收到未知任务回调") - return try: - if exc := task.exception(): - logger.error(f"[{self.stream_name}] 任务异常: {exc}") - traceback.print_exc() - except asyncio.CancelledError: - logger.debug(f"[{self.stream_name}] 任务已取消") + # 简化回调逻辑,避免复杂的异常处理 + logger.debug(f"[{self.stream_name}] 任务完成回调被调用") + + # 检查是否是我们管理的任务 + if task is not self._chat_task: + # 如果已经不是当前任务(可能在stop_chat中已被清空),直接返回 + logger.debug(f"[{self.stream_name}] 回调的任务不是当前管理的任务") + return + + # 清理任务引用 + self._chat_task = None + logger.debug(f"[{self.stream_name}] 任务引用已清理") + + # 简单记录任务状态,不进行复杂处理 + if task.cancelled(): + logger.debug(f"[{self.stream_name}] 任务已取消") + elif task.done(): + try: + # 尝试获取异常,但不抛出 + exc = task.exception() + if exc: + logger.error(f"[{self.stream_name}] 任务异常: {type(exc).__name__}: {exc}") + else: + logger.debug(f"[{self.stream_name}] 任务正常完成") + except Exception as e: + # 获取异常时也可能出错,静默处理 + logger.debug(f"[{self.stream_name}] 获取任务异常时出错: {e}") + except Exception as e: - logger.error(f"[{self.stream_name}] 回调处理错误: {e}") - finally: - if self._chat_task is task: - self._chat_task = None - logger.debug(f"[{self.stream_name}] 任务清理完成") + # 回调函数中的任何异常都要捕获,避免影响系统 + logger.error(f"[{self.stream_name}] 任务完成回调处理出错: {e}") + # 确保任务引用被清理 + self._chat_task = None # 改为实例方法, 移除 stream_id 参数 async def stop_chat(self): """停止当前实例的兴趣监控任务。""" - self._disabled = True # 停止时设置停用标志 - if self._chat_task and not self._chat_task.done(): - task = self._chat_task - logger.debug(f"[{self.stream_name}] 尝试取消normal聊天任务。") - task.cancel() - try: - # 添加超时机制,最多等待2秒 - await asyncio.wait_for(task, timeout=2.0) - except asyncio.TimeoutError: - logger.warning(f"[{self.stream_name}] 等待任务取消超时,强制结束") - except asyncio.CancelledError: - logger.info(f"[{self.stream_name}] 结束一般聊天模式。") - except Exception as e: - # 回调函数 _handle_task_completion 会处理异常日志 - logger.warning(f"[{self.stream_name}] 等待监控任务取消时捕获到异常 (可能已在回调中记录): {e}") - finally: - # 确保任务状态更新,即使等待出错 (回调函数也会尝试更新) - if self._chat_task is task: - self._chat_task = None + logger.debug(f"[{self.stream_name}] 开始停止聊天任务") + + # 立即设置停用标志,防止新任务启动 + self._disabled = True + + # 如果没有运行中的任务,直接返回 + if not self._chat_task or self._chat_task.done(): + logger.debug(f"[{self.stream_name}] 没有运行中的任务,直接完成停止") + self._chat_task = None + return + + # 保存任务引用并立即清空,避免回调中的循环引用 + task_to_cancel = self._chat_task + self._chat_task = None + + logger.debug(f"[{self.stream_name}] 取消聊天任务") + + # 尝试优雅取消任务 + task_to_cancel.cancel() + + # 不等待任务完成,让它自然结束 + # 这样可以避免等待过程中的潜在递归问题 + + # 异步清理思考消息,不阻塞当前流程 + asyncio.create_task(self._cleanup_thinking_messages_async()) + + logger.debug(f"[{self.stream_name}] 聊天任务停止完成") - # 清理所有未处理的思考消息 + async def _cleanup_thinking_messages_async(self): + """异步清理思考消息,避免阻塞主流程""" try: + # 添加短暂延迟,让任务有时间响应取消 + await asyncio.sleep(0.1) + container = await message_manager.get_container(self.stream_id) if container: # 查找并移除所有 MessageThinking 类型的消息 @@ -567,8 +644,8 @@ class NormalChat: container.messages.remove(msg) logger.info(f"[{self.stream_name}] 清理了 {len(thinking_messages)} 条未处理的思考消息。") except Exception as e: - logger.error(f"[{self.stream_name}] 清理思考消息时出错: {e}") - traceback.print_exc() + logger.error(f"[{self.stream_name}] 异步清理思考消息时出错: {e}") + # 不打印完整栈跟踪,避免日志污染 # 获取最近回复记录的方法 def get_recent_replies(self, limit: int = 10) -> List[dict]: diff --git a/test_normal_chat_stop.py b/test_normal_chat_stop.py new file mode 100644 index 000000000..743353441 --- /dev/null +++ b/test_normal_chat_stop.py @@ -0,0 +1,104 @@ +#!/usr/bin/env python3 +""" +NormalChat 启动停止测试脚本 +""" + +import asyncio +import time +import logging +from src.common.logger import get_logger + +logger = get_logger("test_normal_chat_stop") + +async def test_task_cancel_behavior(): + """测试任务取消行为""" + + class MockNormalChat: + def __init__(self): + self._disabled = False + self._chat_task = None + self.stream_name = "test_stream" + + async def mock_reply_loop(self): + """模拟回复循环""" + logger.info("模拟回复循环开始") + try: + while True: + # 检查停用标志 + if self._disabled: + logger.info("检测到停用标志,退出循环") + break + + # 模拟工作 + logger.info("模拟处理消息...") + await asyncio.sleep(0.1) + + except asyncio.CancelledError: + logger.info("模拟回复循环被取消") + raise + except Exception as e: + logger.error(f"模拟回复循环出错: {e}") + finally: + logger.info("模拟回复循环结束") + + async def start_chat(self): + """启动聊天""" + if self._chat_task and not self._chat_task.done(): + logger.info("任务已在运行") + return + + self._disabled = False + self._chat_task = asyncio.create_task(self.mock_reply_loop()) + logger.info("聊天任务已启动") + + async def stop_chat(self): + """停止聊天""" + logger.info("开始停止聊天") + + # 设置停用标志 + self._disabled = True + + if not self._chat_task or self._chat_task.done(): + logger.info("没有运行中的任务") + return + + # 保存任务引用并清空 + task_to_cancel = self._chat_task + self._chat_task = None + + # 取消任务 + task_to_cancel.cancel() + + logger.info("聊天任务停止完成") + + # 测试正常启动停止 + logger.info("=== 测试正常启动停止 ===") + chat = MockNormalChat() + + # 启动 + await chat.start_chat() + await asyncio.sleep(0.5) # 让任务运行一会 + + # 停止 + await chat.stop_chat() + await asyncio.sleep(0.1) # 让取消操作完成 + + logger.info("=== 测试完成 ===") + +async def main(): + """主函数""" + logger.info("开始 NormalChat 停止测试") + + try: + await test_task_cancel_behavior() + except Exception as e: + logger.error(f"测试失败: {e}") + import traceback + logger.error(traceback.format_exc()) + + logger.info("测试结束") + +if __name__ == "__main__": + # 设置日志级别 + logging.basicConfig(level=logging.INFO) + asyncio.run(main()) \ No newline at end of file