From e4b9b9e5d1dd21a62247b458c7fa5dc618885f51 Mon Sep 17 00:00:00 2001 From: SengokuCola <1026294844@qq.com> Date: Sun, 15 Jun 2025 01:14:07 +0800 Subject: [PATCH 1/6] =?UTF-8?q?fix=EF=BC=9A=E5=8F=82=E6=95=B0=E8=B0=83?= =?UTF-8?q?=E6=95=B4=EF=BC=8C=E6=8F=90=E9=82=A3=E5=AE=B6=E9=85=8D=E7=BD=AE?= =?UTF-8?q?=E9=A1=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/chat/focus_chat/expressors/default_expressor.py | 4 ++-- src/chat/normal_chat/normal_chat.py | 4 ++-- src/chat/normal_chat/normal_chat_action_modifier.py | 2 +- src/config/official_configs.py | 3 +++ template/bot_config_template.toml | 3 ++- 5 files changed, 10 insertions(+), 6 deletions(-) diff --git a/src/chat/focus_chat/expressors/default_expressor.py b/src/chat/focus_chat/expressors/default_expressor.py index 816ff7347..9be412c83 100644 --- a/src/chat/focus_chat/expressors/default_expressor.py +++ b/src/chat/focus_chat/expressors/default_expressor.py @@ -28,7 +28,7 @@ logger = get_logger("expressor") def init_prompt(): Prompt( """ -你可以参考以下的语言习惯,如果情景合适就使用,不要盲目使用,不要生硬使用,而是结合到表达中: +你可以参考你的以下的语言习惯,如果情景合适就使用,不要盲目使用,不要生硬使用,而是结合到表达中: {style_habbits} 你现在正在群里聊天,以下是群里正在进行的聊天内容: @@ -38,7 +38,7 @@ def init_prompt(): {chat_target} 你的名字是{bot_name},{prompt_personality},在这聊天中,"{target_message}"引起了你的注意,对这句话,你想表达:{in_mind_reply},原因是:{reason}。你现在要思考怎么回复 -你需要使用合适的语法和句法,参考聊天内容,组织一条日常且口语化的回复。 +你需要使用合适的语法和句法,参考聊天内容,组织一条日常且口语化的回复。请你修改你想表达的原句,符合你的表达风格和语言习惯 请你根据情景使用以下句法: {grammar_habbits} {config_expression_style},你可以完全重组回复,保留最基本的表达含义就好,但重组后保持语意通顺。 diff --git a/src/chat/normal_chat/normal_chat.py b/src/chat/normal_chat/normal_chat.py index 0cb7f9cc8..7c23492bc 100644 --- a/src/chat/normal_chat/normal_chat.py +++ b/src/chat/normal_chat/normal_chat.py @@ -750,14 +750,14 @@ class NormalChat: reply_ratio = reply_count / total_messages if total_messages > 0 else 0 # 使用对数函数让低比率时概率上升更快:log(1 + ratio * k) / log(1 + k) + base # k=7时,0.05比率对应约0.4概率,0.1比率对应约0.6概率,0.2比率对应约0.8概率 - k_reply = 7 + k_reply = 7 * global_config.normal_chat.relation_frequency base_reply_prob = 0.1 # 基础概率10% reply_build_probability = (math.log(1 + reply_ratio * k_reply) / math.log(1 + k_reply)) * 0.9 + base_reply_prob if reply_ratio > 0 else base_reply_prob # 计算接收概率(receive_count的影响) receive_ratio = receive_count / total_messages if total_messages > 0 else 0 # 接收概率使用更温和的对数曲线,最大0.5,基础0.08 - k_receive = 6 + k_receive = 6 * global_config.normal_chat.relation_frequency base_receive_prob = 0.08 # 基础概率8% receive_build_probability = (math.log(1 + receive_ratio * k_receive) / math.log(1 + k_receive)) * 0.42 + base_receive_prob if receive_ratio > 0 else base_receive_prob diff --git a/src/chat/normal_chat/normal_chat_action_modifier.py b/src/chat/normal_chat/normal_chat_action_modifier.py index f2f9ee1a1..38d02e95a 100644 --- a/src/chat/normal_chat/normal_chat_action_modifier.py +++ b/src/chat/normal_chat/normal_chat_action_modifier.py @@ -309,7 +309,7 @@ class NormalChatActionModifier: if container: thinking_count = sum(1 for msg in container.messages if isinstance(msg, MessageThinking)) print(f"thinking_count: {thinking_count}") - if thinking_count >= 4 / global_config.chat.auto_focus_threshold: # 如果堆积超过3条思考消息 + if thinking_count >= 4 * global_config.chat.auto_focus_threshold: # 如果堆积超过3条思考消息 logger.debug(f"{self.log_prefix} 检测到思考消息堆积({thinking_count}条),切换到focus模式") return True diff --git a/src/config/official_configs.py b/src/config/official_configs.py index a98180d57..6984073f6 100644 --- a/src/config/official_configs.py +++ b/src/config/official_configs.py @@ -57,6 +57,9 @@ class RelationshipConfig(ConfigBase): build_relationship_interval: int = 600 """构建关系间隔 单位秒,如果为0则不构建关系""" + + relation_frequency: int = 1 + """关系频率,麦麦构建关系的速度,仅在normal_chat模式下有效""" @dataclass diff --git a/template/bot_config_template.toml b/template/bot_config_template.toml index ead9ce7a6..8c93cfa41 100644 --- a/template/bot_config_template.toml +++ b/template/bot_config_template.toml @@ -1,5 +1,5 @@ [inner] -version = "2.20.0" +version = "2.21.0" #----以下是给开发人员阅读的,如果你只是部署了麦麦,不需要阅读---- #如果你想要修改配置文件,请在修改后将version的值进行变更 @@ -46,6 +46,7 @@ learning_interval = 600 # 学习间隔 单位秒 [relationship] enable_relationship = true # 是否启用关系系统 +relation_frequency = 1 # 关系频率,麦麦构建关系的速度,仅在normal_chat模式下有效 [chat] #麦麦的聊天通用设置 chat_mode = "normal" # 聊天模式 —— 普通模式:normal,专注模式:focus,在普通模式和专注模式之间自动切换 From 225a0f50aa40a2197b593774badb185ccef57507 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" Date: Sat, 14 Jun 2025 17:14:53 +0000 Subject: [PATCH 2/6] =?UTF-8?q?=F0=9F=A4=96=20=E8=87=AA=E5=8A=A8=E6=A0=BC?= =?UTF-8?q?=E5=BC=8F=E5=8C=96=E4=BB=A3=E7=A0=81=20[skip=20ci]?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/config/official_configs.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/config/official_configs.py b/src/config/official_configs.py index 6984073f6..56913e2b5 100644 --- a/src/config/official_configs.py +++ b/src/config/official_configs.py @@ -57,7 +57,7 @@ class RelationshipConfig(ConfigBase): build_relationship_interval: int = 600 """构建关系间隔 单位秒,如果为0则不构建关系""" - + relation_frequency: int = 1 """关系频率,麦麦构建关系的速度,仅在normal_chat模式下有效""" From 327580dbecfc8bc6277886940fce54d7af82f0c8 Mon Sep 17 00:00:00 2001 From: SengokuCola <1026294844@qq.com> Date: Sun, 15 Jun 2025 01:27:54 +0800 Subject: [PATCH 3/6] =?UTF-8?q?fix:ai=E5=93=A5=E7=A5=9E=E7=A7=98=E4=BF=AE?= =?UTF-8?q?=E5=A4=8D=E6=97=A0=E6=B3=95=E6=A8=A1=E5=BC=8F=E5=88=87=E6=8D=A2?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- bot.py | 31 +++++-- src/chat/emoji_system/emoji_manager.py | 8 +- src/chat/heart_flow/sub_heartflow.py | 13 ++- src/chat/normal_chat/normal_chat.py | 116 +++++++++++++++---------- src/chat/utils/prompt_builder.py | 30 +++++-- src/manager/async_task_manager.py | 74 +++++++++++++--- 6 files changed, 197 insertions(+), 75 deletions(-) diff --git a/bot.py b/bot.py index 7a4d1ed17..f38ca8ec0 100644 --- a/bot.py +++ b/bot.py @@ -109,12 +109,33 @@ async def graceful_shutdown(): # 停止所有异步任务 await async_task_manager.stop_and_wait_all_tasks() - tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()] - for task in tasks: - task.cancel() - await asyncio.gather(*tasks, return_exceptions=True) + # 获取所有剩余任务,排除当前任务 + remaining_tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()] + + if remaining_tasks: + logger.info(f"正在取消 {len(remaining_tasks)} 个剩余任务...") + + # 取消所有剩余任务 + for task in remaining_tasks: + if not task.done(): + task.cancel() + + # 等待所有任务完成,设置超时 + try: + await asyncio.wait_for( + asyncio.gather(*remaining_tasks, return_exceptions=True), + timeout=15.0 + ) + logger.info("所有剩余任务已成功取消") + except asyncio.TimeoutError: + logger.warning("等待任务取消超时,强制继续关闭") + except Exception as e: + logger.error(f"等待任务取消时发生异常: {e}") + + logger.info("麦麦优雅关闭完成") + except Exception as e: - logger.error(f"麦麦关闭失败: {e}") + logger.error(f"麦麦关闭失败: {e}", exc_info=True) def check_eula(): diff --git a/src/chat/emoji_system/emoji_manager.py b/src/chat/emoji_system/emoji_manager.py index bd160135c..0fbe2f045 100644 --- a/src/chat/emoji_system/emoji_manager.py +++ b/src/chat/emoji_system/emoji_manager.py @@ -320,11 +320,11 @@ async def clear_temp_emoji() -> None: logger.info("[清理] 完成") -async def clean_unused_emojis(emoji_dir: str, emoji_objects: List["MaiEmoji"]) -> None: +async def clean_unused_emojis(emoji_dir: str, emoji_objects: List["MaiEmoji"],removed_count:int) -> int: """清理指定目录中未被 emoji_objects 追踪的表情包文件""" if not os.path.exists(emoji_dir): logger.warning(f"[清理] 目标目录不存在,跳过清理: {emoji_dir}") - return + return removed_count try: # 获取内存中所有有效表情包的完整路径集合 @@ -352,6 +352,8 @@ async def clean_unused_emojis(emoji_dir: str, emoji_objects: List["MaiEmoji"]) - logger.info(f"[清理] 在目录 {emoji_dir} 中清理了 {cleaned_count} 个破损表情包。") else: logger.info(f"[清理] 目录 {emoji_dir} 中没有需要清理的。") + + return removed_count + cleaned_count except Exception as e: logger.error(f"[错误] 清理未使用表情包文件时出错 ({emoji_dir}): {str(e)}") @@ -564,7 +566,7 @@ class EmojiManager: self.emoji_objects = [e for e in self.emoji_objects if e not in objects_to_remove] # 清理 EMOJI_REGISTED_DIR 目录中未被追踪的文件 - await clean_unused_emojis(EMOJI_REGISTED_DIR, self.emoji_objects) + removed_count = await clean_unused_emojis(EMOJI_REGISTED_DIR, self.emoji_objects,removed_count) # 输出清理结果 if removed_count > 0: diff --git a/src/chat/heart_flow/sub_heartflow.py b/src/chat/heart_flow/sub_heartflow.py index da7825c72..bb8e78581 100644 --- a/src/chat/heart_flow/sub_heartflow.py +++ b/src/chat/heart_flow/sub_heartflow.py @@ -77,10 +77,21 @@ class SubHeartflow: if self.normal_chat_instance: logger.info(f"{self.log_prefix} 离开normal模式") try: - await self.normal_chat_instance.stop_chat() # 调用 stop_chat + logger.debug(f"{self.log_prefix} 开始调用 stop_chat()") + # 添加超时保护,避免无限等待 + await asyncio.wait_for(self.normal_chat_instance.stop_chat(), timeout=10.0) + logger.debug(f"{self.log_prefix} stop_chat() 调用完成") + except asyncio.TimeoutError: + logger.warning(f"{self.log_prefix} 停止 NormalChat 超时,强制清理") + # 超时时强制清理 + self.normal_chat_instance = None except Exception as e: logger.error(f"{self.log_prefix} 停止 NormalChat 监控任务时出错: {e}") logger.error(traceback.format_exc()) + # 出错时也要清理实例 + self.normal_chat_instance = None + finally: + logger.debug(f"{self.log_prefix} _stop_normal_chat 完成") async def _start_normal_chat(self, rewind=False) -> bool: """ diff --git a/src/chat/normal_chat/normal_chat.py b/src/chat/normal_chat/normal_chat.py index 33824dcdb..5d271e714 100644 --- a/src/chat/normal_chat/normal_chat.py +++ b/src/chat/normal_chat/normal_chat.py @@ -161,58 +161,86 @@ class NormalChat: """ while True: try: - async with global_prompt_manager.async_message_scope(self.chat_stream.context.get_template_name()): - await asyncio.sleep(0.5) # 每秒检查一次 - # 检查任务是否已被取消 - if self._chat_task is None or self._chat_task.cancelled(): - logger.info(f"[{self.stream_name}] 兴趣监控任务被取消或置空,退出") - break + # 检查任务是否已被取消 - 移动到try块最开始 + if self._chat_task is None or self._chat_task.cancelled(): + logger.info(f"[{self.stream_name}] 兴趣监控任务被取消或置空,退出") + break + + # 检查是否已停用 + if self._disabled: + logger.info(f"[{self.stream_name}] 已停用,退出兴趣监控") + break - items_to_process = list(self.interest_dict.items()) - if not items_to_process: - continue + await asyncio.sleep(0.5) # 每0.5秒检查一次 + + # 再次检查取消状态 + if self._chat_task is None or self._chat_task.cancelled() or self._disabled: + logger.info(f"[{self.stream_name}] 检测到停止信号,退出") + break - # 并行处理兴趣消息 - async def process_single_message(msg_id, message, interest_value, is_mentioned): - """处理单个兴趣消息""" - try: - # 处理消息 - if time.time() - self.start_time > 300: - self.adjust_reply_frequency(duration=300 / 60) - else: - self.adjust_reply_frequency(duration=(time.time() - self.start_time) / 60) + items_to_process = list(self.interest_dict.items()) + if not items_to_process: + continue - # print(self.engaging_persons) + # 使用异步上下文管理器处理消息 + try: + async with global_prompt_manager.async_message_scope(self.chat_stream.context.get_template_name()): + # 在上下文内部再次检查取消状态 + if self._chat_task is None or self._chat_task.cancelled() or self._disabled: + logger.info(f"[{self.stream_name}] 在处理上下文中检测到停止信号,退出") + break - await self.normal_response( - message=message, - is_mentioned=is_mentioned, - interested_rate=interest_value * self.willing_amplifier, - ) - except Exception as e: - logger.error( - f"[{self.stream_name}] 处理兴趣消息{msg_id}时出错: {e}\n{traceback.format_exc()}" - ) - finally: - self.interest_dict.pop(msg_id, None) + # 并行处理兴趣消息 + async def process_single_message(msg_id, message, interest_value, is_mentioned): + """处理单个兴趣消息""" + try: + # 在处理每个消息前检查停止状态 + if self._disabled or (self._chat_task and self._chat_task.cancelled()): + return - # 创建并行任务列表 - tasks = [] - for msg_id, (message, interest_value, is_mentioned) in items_to_process: - task = process_single_message(msg_id, message, interest_value, is_mentioned) - tasks.append(task) + # 处理消息 + if time.time() - self.start_time > 300: + self.adjust_reply_frequency(duration=300 / 60) + else: + self.adjust_reply_frequency(duration=(time.time() - self.start_time) / 60) - # 并行执行所有任务,限制并发数量避免资源过度消耗 - if tasks: - # 使用信号量控制并发数,最多同时处理5个消息 - semaphore = asyncio.Semaphore(5) + await self.normal_response( + message=message, + is_mentioned=is_mentioned, + interested_rate=interest_value * self.willing_amplifier, + ) + except Exception as e: + logger.error( + f"[{self.stream_name}] 处理兴趣消息{msg_id}时出错: {e}\n{traceback.format_exc()}" + ) + finally: + self.interest_dict.pop(msg_id, None) - async def limited_process(task, sem): - async with sem: - await task + # 创建并行任务列表 + tasks = [] + for msg_id, (message, interest_value, is_mentioned) in items_to_process: + task = process_single_message(msg_id, message, interest_value, is_mentioned) + tasks.append(task) + + # 并行执行所有任务,限制并发数量避免资源过度消耗 + if tasks: + # 使用信号量控制并发数,最多同时处理5个消息 + semaphore = asyncio.Semaphore(5) + + async def limited_process(task, sem): + async with sem: + await task + + limited_tasks = [limited_process(task, semaphore) for task in tasks] + await asyncio.gather(*limited_tasks, return_exceptions=True) + + except asyncio.CancelledError: + logger.info(f"[{self.stream_name}] 处理上下文时任务被取消") + break + except Exception as e: + logger.error(f"[{self.stream_name}] 处理上下文时出错: {e}") + await asyncio.sleep(1) - limited_tasks = [limited_process(task, semaphore) for task in tasks] - await asyncio.gather(*limited_tasks, return_exceptions=True) except asyncio.CancelledError: logger.info(f"[{self.stream_name}] 兴趣监控任务被取消") break diff --git a/src/chat/utils/prompt_builder.py b/src/chat/utils/prompt_builder.py index cc9ee3e64..ebd0c0500 100644 --- a/src/chat/utils/prompt_builder.py +++ b/src/chat/utils/prompt_builder.py @@ -35,14 +35,21 @@ class PromptContext: """创建一个异步的临时提示模板作用域""" # 保存当前上下文并设置新上下文 if context_id is not None: - async with self._context_lock: - if context_id not in self._context_prompts: - self._context_prompts[context_id] = {} + try: + # 添加超时保护,避免长时间等待锁 + async with asyncio.wait_for(self._context_lock.acquire(), timeout=5.0): + if context_id not in self._context_prompts: + self._context_prompts[context_id] = {} + self._context_lock.release() + except asyncio.TimeoutError: + logger.warning(f"获取上下文锁超时,context_id: {context_id}") + # 超时时直接进入,不设置上下文 + context_id = None # 保存当前协程的上下文值,不影响其他协程 previous_context = self._current_context # 设置当前协程的新上下文 - token = self._current_context_var.set(context_id) + token = self._current_context_var.set(context_id) if context_id else None else: # 如果没有提供新上下文,保持当前上下文不变 previous_context = self._current_context @@ -51,12 +58,17 @@ class PromptContext: try: yield self finally: - # 恢复之前的上下文 - if context_id is not None: - if token: + # 恢复之前的上下文,添加异常保护 + if context_id is not None and token is not None: + try: self._current_context_var.reset(token) - else: - self._current_context = previous_context + except Exception as e: + logger.warning(f"恢复上下文时出错: {e}") + # 如果reset失败,尝试直接设置 + try: + self._current_context = previous_context + except Exception: + pass # 静默忽略恢复失败 async def get_prompt_async(self, name: str) -> Optional["Prompt"]: """异步获取当前作用域中的提示模板""" diff --git a/src/manager/async_task_manager.py b/src/manager/async_task_manager.py index 0a6a5e45f..7f7e29e05 100644 --- a/src/manager/async_task_manager.py +++ b/src/manager/async_task_manager.py @@ -90,8 +90,19 @@ class AsyncTaskManager: async with self._lock: # 由于可能需要await等待任务完成,所以需要加异步锁 if task.task_name in self.tasks: logger.warning(f"已存在名称为 '{task.task_name}' 的任务,正在尝试取消并替换") - self.tasks[task.task_name].cancel() # 取消已存在的任务 - await self.tasks[task.task_name] # 等待任务完成 + old_task = self.tasks[task.task_name] + old_task.cancel() # 取消已存在的任务 + + # 添加超时保护,避免无限等待 + try: + await asyncio.wait_for(old_task, timeout=5.0) + except asyncio.TimeoutError: + logger.warning(f"等待任务 '{task.task_name}' 完成超时") + except asyncio.CancelledError: + logger.info(f"任务 '{task.task_name}' 已成功取消") + except Exception as e: + logger.error(f"等待任务 '{task.task_name}' 完成时发生异常: {e}") + logger.info(f"成功结束任务 '{task.task_name}'") # 创建新任务 @@ -123,28 +134,65 @@ class AsyncTaskManager: async with self._lock: # 由于可能需要await等待任务完成,所以需要加异步锁 # 设置中止标志 self.abort_flag.set() + + # 首先收集所有任务的引用,避免在迭代过程中字典被修改 + task_items = list(self.tasks.items()) + # 取消所有任务 - for name, inst in self.tasks.items(): - try: - inst.cancel() - except asyncio.CancelledError: - logger.info(f"已取消任务 '{name}'") + for name, inst in task_items: + if not inst.done(): + try: + inst.cancel() + logger.debug(f"已请求取消任务 '{name}'") + except Exception as e: + logger.warning(f"取消任务 '{name}' 时发生异常: {e}") - # 等待所有任务完成 - for task_name, task_inst in self.tasks.items(): + # 等待所有任务完成,添加超时保护 + for task_name, task_inst in task_items: if not task_inst.done(): try: - await task_inst - except asyncio.CancelledError: # 此处再次捕获取消异常,防止stop_all_tasks()时延迟抛出异常 - logger.info(f"任务 {task_name} 已取消") + await asyncio.wait_for(task_inst, timeout=10.0) + logger.debug(f"任务 '{task_name}' 已完成") + except asyncio.TimeoutError: + logger.warning(f"等待任务 '{task_name}' 完成超时") + except asyncio.CancelledError: + logger.info(f"任务 '{task_name}' 已取消") except Exception as e: - logger.error(f"任务 {task_name} 执行时发生异常: {e}", ext_info=True) + logger.error(f"任务 '{task_name}' 执行时发生异常: {e}", exc_info=True) # 清空任务列表 self.tasks.clear() self.abort_flag.clear() logger.info("所有异步任务已停止") + def debug_task_status(self): + """ + 调试函数:打印所有任务的状态信息 + """ + logger.info("=== 异步任务状态调试信息 ===") + logger.info(f"当前管理的任务数量: {len(self.tasks)}") + logger.info(f"中止标志状态: {self.abort_flag.is_set()}") + + for task_name, task in self.tasks.items(): + status = [] + if task.done(): + status.append("已完成") + if task.cancelled(): + status.append("已取消") + elif task.exception(): + status.append(f"异常: {task.exception()}") + else: + status.append("正常完成") + else: + status.append("运行中") + + logger.info(f"任务 '{task_name}': {', '.join(status)}") + + # 检查所有asyncio任务 + all_tasks = asyncio.all_tasks() + logger.info(f"当前事件循环中的所有任务数量: {len(all_tasks)}") + logger.info("=== 调试信息结束 ===") + async_task_manager = AsyncTaskManager() """全局异步任务管理器实例""" From 86be17405ad1b62fc44a5c1b8c4c3316e5435f40 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" Date: Sat, 14 Jun 2025 17:28:38 +0000 Subject: [PATCH 4/6] =?UTF-8?q?=F0=9F=A4=96=20=E8=87=AA=E5=8A=A8=E6=A0=BC?= =?UTF-8?q?=E5=BC=8F=E5=8C=96=E4=BB=A3=E7=A0=81=20[skip=20ci]?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- bot.py | 15 ++++++--------- src/chat/emoji_system/emoji_manager.py | 6 +++--- src/chat/normal_chat/normal_chat.py | 6 +++--- src/manager/async_task_manager.py | 14 +++++++------- 4 files changed, 19 insertions(+), 22 deletions(-) diff --git a/bot.py b/bot.py index f38ca8ec0..bdf140fbd 100644 --- a/bot.py +++ b/bot.py @@ -111,29 +111,26 @@ async def graceful_shutdown(): # 获取所有剩余任务,排除当前任务 remaining_tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()] - + if remaining_tasks: logger.info(f"正在取消 {len(remaining_tasks)} 个剩余任务...") - + # 取消所有剩余任务 for task in remaining_tasks: if not task.done(): task.cancel() - + # 等待所有任务完成,设置超时 try: - await asyncio.wait_for( - asyncio.gather(*remaining_tasks, return_exceptions=True), - timeout=15.0 - ) + await asyncio.wait_for(asyncio.gather(*remaining_tasks, return_exceptions=True), timeout=15.0) logger.info("所有剩余任务已成功取消") except asyncio.TimeoutError: logger.warning("等待任务取消超时,强制继续关闭") except Exception as e: logger.error(f"等待任务取消时发生异常: {e}") - + logger.info("麦麦优雅关闭完成") - + except Exception as e: logger.error(f"麦麦关闭失败: {e}", exc_info=True) diff --git a/src/chat/emoji_system/emoji_manager.py b/src/chat/emoji_system/emoji_manager.py index 0fbe2f045..56461d8b7 100644 --- a/src/chat/emoji_system/emoji_manager.py +++ b/src/chat/emoji_system/emoji_manager.py @@ -320,7 +320,7 @@ async def clear_temp_emoji() -> None: logger.info("[清理] 完成") -async def clean_unused_emojis(emoji_dir: str, emoji_objects: List["MaiEmoji"],removed_count:int) -> int: +async def clean_unused_emojis(emoji_dir: str, emoji_objects: List["MaiEmoji"], removed_count: int) -> int: """清理指定目录中未被 emoji_objects 追踪的表情包文件""" if not os.path.exists(emoji_dir): logger.warning(f"[清理] 目标目录不存在,跳过清理: {emoji_dir}") @@ -352,7 +352,7 @@ async def clean_unused_emojis(emoji_dir: str, emoji_objects: List["MaiEmoji"],re logger.info(f"[清理] 在目录 {emoji_dir} 中清理了 {cleaned_count} 个破损表情包。") else: logger.info(f"[清理] 目录 {emoji_dir} 中没有需要清理的。") - + return removed_count + cleaned_count except Exception as e: @@ -566,7 +566,7 @@ class EmojiManager: self.emoji_objects = [e for e in self.emoji_objects if e not in objects_to_remove] # 清理 EMOJI_REGISTED_DIR 目录中未被追踪的文件 - removed_count = await clean_unused_emojis(EMOJI_REGISTED_DIR, self.emoji_objects,removed_count) + removed_count = await clean_unused_emojis(EMOJI_REGISTED_DIR, self.emoji_objects, removed_count) # 输出清理结果 if removed_count > 0: diff --git a/src/chat/normal_chat/normal_chat.py b/src/chat/normal_chat/normal_chat.py index 5d271e714..8cfea7a25 100644 --- a/src/chat/normal_chat/normal_chat.py +++ b/src/chat/normal_chat/normal_chat.py @@ -165,14 +165,14 @@ class NormalChat: if self._chat_task is None or self._chat_task.cancelled(): logger.info(f"[{self.stream_name}] 兴趣监控任务被取消或置空,退出") break - + # 检查是否已停用 if self._disabled: logger.info(f"[{self.stream_name}] 已停用,退出兴趣监控") break await asyncio.sleep(0.5) # 每0.5秒检查一次 - + # 再次检查取消状态 if self._chat_task is None or self._chat_task.cancelled() or self._disabled: logger.info(f"[{self.stream_name}] 检测到停止信号,退出") @@ -233,7 +233,7 @@ class NormalChat: limited_tasks = [limited_process(task, semaphore) for task in tasks] await asyncio.gather(*limited_tasks, return_exceptions=True) - + except asyncio.CancelledError: logger.info(f"[{self.stream_name}] 处理上下文时任务被取消") break diff --git a/src/manager/async_task_manager.py b/src/manager/async_task_manager.py index 7f7e29e05..1e1e9132f 100644 --- a/src/manager/async_task_manager.py +++ b/src/manager/async_task_manager.py @@ -92,7 +92,7 @@ class AsyncTaskManager: logger.warning(f"已存在名称为 '{task.task_name}' 的任务,正在尝试取消并替换") old_task = self.tasks[task.task_name] old_task.cancel() # 取消已存在的任务 - + # 添加超时保护,避免无限等待 try: await asyncio.wait_for(old_task, timeout=5.0) @@ -102,7 +102,7 @@ class AsyncTaskManager: logger.info(f"任务 '{task.task_name}' 已成功取消") except Exception as e: logger.error(f"等待任务 '{task.task_name}' 完成时发生异常: {e}") - + logger.info(f"成功结束任务 '{task.task_name}'") # 创建新任务 @@ -134,10 +134,10 @@ class AsyncTaskManager: async with self._lock: # 由于可能需要await等待任务完成,所以需要加异步锁 # 设置中止标志 self.abort_flag.set() - + # 首先收集所有任务的引用,避免在迭代过程中字典被修改 task_items = list(self.tasks.items()) - + # 取消所有任务 for name, inst in task_items: if not inst.done(): @@ -172,7 +172,7 @@ class AsyncTaskManager: logger.info("=== 异步任务状态调试信息 ===") logger.info(f"当前管理的任务数量: {len(self.tasks)}") logger.info(f"中止标志状态: {self.abort_flag.is_set()}") - + for task_name, task in self.tasks.items(): status = [] if task.done(): @@ -185,9 +185,9 @@ class AsyncTaskManager: status.append("正常完成") else: status.append("运行中") - + logger.info(f"任务 '{task_name}': {', '.join(status)}") - + # 检查所有asyncio任务 all_tasks = asyncio.all_tasks() logger.info(f"当前事件循环中的所有任务数量: {len(all_tasks)}") From fe19406187e2d3091d809f68311cf64ee022d716 Mon Sep 17 00:00:00 2001 From: SengokuCola <1026294844@qq.com> Date: Sun, 15 Jun 2025 01:44:41 +0800 Subject: [PATCH 5/6] =?UTF-8?q?fix=EF=BC=9A=E5=86=8D=E6=AC=A1=E5=B0=9D?= =?UTF-8?q?=E8=AF=95=E4=BF=AE=E5=A4=8D=E6=A8=A1=E5=BC=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/chat/emoji_system/emoji_manager.py | 2 + src/chat/focus_chat/heartFC_chat.py | 186 +++++++++------ src/chat/heart_flow/sub_heartflow.py | 106 +++++---- src/chat/normal_chat/normal_chat.py | 307 ++++++++++++++++--------- test_normal_chat_stop.py | 104 +++++++++ 5 files changed, 479 insertions(+), 226 deletions(-) create mode 100644 test_normal_chat_stop.py diff --git a/src/chat/emoji_system/emoji_manager.py b/src/chat/emoji_system/emoji_manager.py index 0fbe2f045..051864132 100644 --- a/src/chat/emoji_system/emoji_manager.py +++ b/src/chat/emoji_system/emoji_manager.py @@ -334,6 +334,8 @@ async def clean_unused_emojis(emoji_dir: str, emoji_objects: List["MaiEmoji"],re # 遍历指定目录中的所有文件 for file_name in os.listdir(emoji_dir): file_full_path = os.path.join(emoji_dir, file_name) + + # 确保处理的是文件而不是子目录 if not os.path.isfile(file_full_path): diff --git a/src/chat/focus_chat/heartFC_chat.py b/src/chat/focus_chat/heartFC_chat.py index 411366b47..ce4a43cba 100644 --- a/src/chat/focus_chat/heartFC_chat.py +++ b/src/chat/focus_chat/heartFC_chat.py @@ -216,26 +216,41 @@ class HeartFChatting: async def start(self): """检查是否需要启动主循环,如果未激活则启动。""" + logger.debug(f"{self.log_prefix} 开始启动 HeartFChatting") + # 如果循环已经激活,直接返回 if self._loop_active: + logger.debug(f"{self.log_prefix} HeartFChatting 已激活,无需重复启动") return - # 标记为活动状态,防止重复启动 - self._loop_active = True + try: + # 标记为活动状态,防止重复启动 + self._loop_active = True - # 检查是否已有任务在运行(理论上不应该,因为 _loop_active=False) - if self._loop_task and not self._loop_task.done(): - logger.warning(f"{self.log_prefix} 发现之前的循环任务仍在运行(不符合预期)。取消旧任务。") - self._loop_task.cancel() - try: - # 等待旧任务确实被取消 - await asyncio.wait_for(self._loop_task, timeout=0.5) - except (asyncio.CancelledError, asyncio.TimeoutError): - pass # 忽略取消或超时错误 - self._loop_task = None # 清理旧任务引用 + # 检查是否已有任务在运行(理论上不应该,因为 _loop_active=False) + if self._loop_task and not self._loop_task.done(): + logger.warning(f"{self.log_prefix} 发现之前的循环任务仍在运行(不符合预期)。取消旧任务。") + self._loop_task.cancel() + try: + # 等待旧任务确实被取消 + await asyncio.wait_for(self._loop_task, timeout=5.0) + except (asyncio.CancelledError, asyncio.TimeoutError): + pass # 忽略取消或超时错误 + except Exception as e: + logger.warning(f"{self.log_prefix} 等待旧任务取消时出错: {e}") + self._loop_task = None # 清理旧任务引用 - self._loop_task = asyncio.create_task(self._run_focus_chat()) - self._loop_task.add_done_callback(self._handle_loop_completion) + logger.debug(f"{self.log_prefix} 创建新的 HeartFChatting 主循环任务") + self._loop_task = asyncio.create_task(self._run_focus_chat()) + self._loop_task.add_done_callback(self._handle_loop_completion) + logger.debug(f"{self.log_prefix} HeartFChatting 启动完成") + + except Exception as e: + # 启动失败时重置状态 + self._loop_active = False + self._loop_task = None + logger.error(f"{self.log_prefix} HeartFChatting 启动失败: {e}") + raise def _handle_loop_completion(self, task: asyncio.Task): """当 _hfc_loop 任务完成时执行的回调。""" @@ -260,6 +275,8 @@ class HeartFChatting: try: while True: # 主循环 logger.debug(f"{self.log_prefix} 开始第{self._cycle_counter}次循环") + + # 检查关闭标志 if self._shutting_down: logger.info(f"{self.log_prefix} 检测到关闭标志,退出 Focus Chat 循环。") break @@ -274,73 +291,98 @@ class HeartFChatting: loop_cycle_start_time = time.monotonic() # 执行规划和处理阶段 - async with self._get_cycle_context(): - thinking_id = "tid" + str(round(time.time(), 2)) - self._current_cycle_detail.set_thinking_id(thinking_id) - # 主循环:思考->决策->执行 - async with global_prompt_manager.async_message_scope(self.chat_stream.context.get_template_name()): - logger.debug(f"模板 {self.chat_stream.context.get_template_name()}") - loop_info = await self._observe_process_plan_action_loop(cycle_timers, thinking_id) + try: + async with self._get_cycle_context(): + thinking_id = "tid" + str(round(time.time(), 2)) + self._current_cycle_detail.set_thinking_id(thinking_id) + + # 使用异步上下文管理器处理消息 + try: + async with global_prompt_manager.async_message_scope(self.chat_stream.context.get_template_name()): + # 在上下文内部检查关闭状态 + if self._shutting_down: + logger.info(f"{self.log_prefix} 在处理上下文中检测到关闭信号,退出") + break + + logger.debug(f"模板 {self.chat_stream.context.get_template_name()}") + loop_info = await self._observe_process_plan_action_loop(cycle_timers, thinking_id) - if loop_info["loop_action_info"]["command"] == "stop_focus_chat": - logger.info(f"{self.log_prefix} 麦麦决定停止专注聊天") - # 如果设置了回调函数,则调用它 - if self.on_stop_focus_chat: - try: - await self.on_stop_focus_chat() - logger.info(f"{self.log_prefix} 成功调用回调函数处理停止专注聊天") - except Exception as e: - logger.error(f"{self.log_prefix} 调用停止专注聊天回调函数时出错: {e}") - logger.error(traceback.format_exc()) + if loop_info["loop_action_info"]["command"] == "stop_focus_chat": + logger.info(f"{self.log_prefix} 麦麦决定停止专注聊天") + # 如果设置了回调函数,则调用它 + if self.on_stop_focus_chat: + try: + await self.on_stop_focus_chat() + logger.info(f"{self.log_prefix} 成功调用回调函数处理停止专注聊天") + except Exception as e: + logger.error(f"{self.log_prefix} 调用停止专注聊天回调函数时出错: {e}") + logger.error(traceback.format_exc()) + break + + except asyncio.CancelledError: + logger.info(f"{self.log_prefix} 处理上下文时任务被取消") break + except Exception as e: + logger.error(f"{self.log_prefix} 处理上下文时出错: {e}") + # 上下文处理失败,跳过当前循环 + await asyncio.sleep(1) + continue - self._current_cycle_detail.set_loop_info(loop_info) + self._current_cycle_detail.set_loop_info(loop_info) - # 从observations列表中获取HFCloopObservation - hfcloop_observation = next( - (obs for obs in self.observations if isinstance(obs, HFCloopObservation)), None - ) - if hfcloop_observation: - hfcloop_observation.add_loop_info(self._current_cycle_detail) - else: - logger.warning(f"{self.log_prefix} 未找到HFCloopObservation实例") + # 从observations列表中获取HFCloopObservation + hfcloop_observation = next( + (obs for obs in self.observations if isinstance(obs, HFCloopObservation)), None + ) + if hfcloop_observation: + hfcloop_observation.add_loop_info(self._current_cycle_detail) + else: + logger.warning(f"{self.log_prefix} 未找到HFCloopObservation实例") - self._current_cycle_detail.timers = cycle_timers + self._current_cycle_detail.timers = cycle_timers - # 防止循环过快消耗资源 - await _handle_cycle_delay( - loop_info["loop_action_info"]["action_taken"], loop_cycle_start_time, self.log_prefix + # 防止循环过快消耗资源 + await _handle_cycle_delay( + loop_info["loop_action_info"]["action_taken"], loop_cycle_start_time, self.log_prefix + ) + + # 完成当前循环并保存历史 + self._current_cycle_detail.complete_cycle() + self._cycle_history.append(self._current_cycle_detail) + + # 记录循环信息和计时器结果 + timer_strings = [] + for name, elapsed in cycle_timers.items(): + formatted_time = f"{elapsed * 1000:.2f}毫秒" if elapsed < 1 else f"{elapsed:.2f}秒" + timer_strings.append(f"{name}: {formatted_time}") + + # 新增:输出每个处理器的耗时 + processor_time_costs = self._current_cycle_detail.loop_processor_info.get("processor_time_costs", {}) + processor_time_strings = [] + for pname, ptime in processor_time_costs.items(): + formatted_ptime = f"{ptime * 1000:.2f}毫秒" if ptime < 1 else f"{ptime:.2f}秒" + processor_time_strings.append(f"{pname}: {formatted_ptime}") + processor_time_log = ( + ("\n各处理器耗时: " + "; ".join(processor_time_strings)) if processor_time_strings else "" ) - # 完成当前循环并保存历史 - self._current_cycle_detail.complete_cycle() - self._cycle_history.append(self._current_cycle_detail) + logger.info( + f"{self.log_prefix} 第{self._current_cycle_detail.cycle_id}次思考," + f"耗时: {self._current_cycle_detail.end_time - self._current_cycle_detail.start_time:.1f}秒, " + f"动作: {self._current_cycle_detail.loop_plan_info['action_result']['action_type']}" + + (f"\n详情: {'; '.join(timer_strings)}" if timer_strings else "") + + processor_time_log + ) - # 记录循环信息和计时器结果 - timer_strings = [] - for name, elapsed in cycle_timers.items(): - formatted_time = f"{elapsed * 1000:.2f}毫秒" if elapsed < 1 else f"{elapsed:.2f}秒" - timer_strings.append(f"{name}: {formatted_time}") - - # 新增:输出每个处理器的耗时 - processor_time_costs = self._current_cycle_detail.loop_processor_info.get("processor_time_costs", {}) - processor_time_strings = [] - for pname, ptime in processor_time_costs.items(): - formatted_ptime = f"{ptime * 1000:.2f}毫秒" if ptime < 1 else f"{ptime:.2f}秒" - processor_time_strings.append(f"{pname}: {formatted_ptime}") - processor_time_log = ( - ("\n各处理器耗时: " + "; ".join(processor_time_strings)) if processor_time_strings else "" - ) - - logger.info( - f"{self.log_prefix} 第{self._current_cycle_detail.cycle_id}次思考," - f"耗时: {self._current_cycle_detail.end_time - self._current_cycle_detail.start_time:.1f}秒, " - f"动作: {self._current_cycle_detail.loop_plan_info['action_result']['action_type']}" - + (f"\n详情: {'; '.join(timer_strings)}" if timer_strings else "") - + processor_time_log - ) - - await asyncio.sleep(global_config.focus_chat.think_interval) + await asyncio.sleep(global_config.focus_chat.think_interval) + + except asyncio.CancelledError: + logger.info(f"{self.log_prefix} 循环处理时任务被取消") + break + except Exception as e: + logger.error(f"{self.log_prefix} 循环处理时出错: {e}") + logger.error(traceback.format_exc()) + await asyncio.sleep(1) # 出错后等待一秒再继续 except asyncio.CancelledError: # 设置了关闭标志位后被取消是正常流程 diff --git a/src/chat/heart_flow/sub_heartflow.py b/src/chat/heart_flow/sub_heartflow.py index bb8e78581..661a4db96 100644 --- a/src/chat/heart_flow/sub_heartflow.py +++ b/src/chat/heart_flow/sub_heartflow.py @@ -78,19 +78,22 @@ class SubHeartflow: logger.info(f"{self.log_prefix} 离开normal模式") try: logger.debug(f"{self.log_prefix} 开始调用 stop_chat()") - # 添加超时保护,避免无限等待 - await asyncio.wait_for(self.normal_chat_instance.stop_chat(), timeout=10.0) + # 使用更短的超时时间,强制快速停止 + await asyncio.wait_for(self.normal_chat_instance.stop_chat(), timeout=3.0) logger.debug(f"{self.log_prefix} stop_chat() 调用完成") except asyncio.TimeoutError: logger.warning(f"{self.log_prefix} 停止 NormalChat 超时,强制清理") - # 超时时强制清理 + # 超时时强制清理实例 self.normal_chat_instance = None except Exception as e: logger.error(f"{self.log_prefix} 停止 NormalChat 监控任务时出错: {e}") - logger.error(traceback.format_exc()) - # 出错时也要清理实例 + # 出错时也要清理实例,避免状态不一致 self.normal_chat_instance = None finally: + # 确保实例被清理 + if self.normal_chat_instance: + logger.warning(f"{self.log_prefix} 强制清理 NormalChat 实例") + self.normal_chat_instance = None logger.debug(f"{self.log_prefix} _stop_normal_chat 完成") async def _start_normal_chat(self, rewind=False) -> bool: @@ -175,46 +178,71 @@ class SubHeartflow: async def _start_heart_fc_chat(self) -> bool: """启动 HeartFChatting 实例,确保 NormalChat 已停止""" - await self._stop_normal_chat() # 确保普通聊天监控已停止 - self.interest_dict.clear() - - log_prefix = self.log_prefix - # 如果实例已存在,检查其循环任务状态 - if self.heart_fc_instance: - # 如果任务已完成或不存在,则尝试重新启动 - if self.heart_fc_instance._loop_task is None or self.heart_fc_instance._loop_task.done(): - logger.info(f"{log_prefix} HeartFChatting 实例存在但循环未运行,尝试启动...") - try: - await self.heart_fc_instance.start() # 启动循环 - logger.info(f"{log_prefix} HeartFChatting 循环已启动。") - return True - except Exception as e: - logger.error(f"{log_prefix} 尝试启动现有 HeartFChatting 循环时出错: {e}") - logger.error(traceback.format_exc()) - return False # 启动失败 - else: - # 任务正在运行 - logger.debug(f"{log_prefix} HeartFChatting 已在运行中。") - return True # 已经在运行 - - # 如果实例不存在,则创建并启动 - logger.info(f"{log_prefix} 麦麦准备开始专注聊天...") + logger.debug(f"{self.log_prefix} 开始启动 HeartFChatting") + try: - self.heart_fc_instance = HeartFChatting( - chat_id=self.subheartflow_id, - # observations=self.observations, - on_stop_focus_chat=self._handle_stop_focus_chat_request, - ) + # 确保普通聊天监控已停止 + await self._stop_normal_chat() + self.interest_dict.clear() + + log_prefix = self.log_prefix + # 如果实例已存在,检查其循环任务状态 + if self.heart_fc_instance: + logger.debug(f"{log_prefix} HeartFChatting 实例已存在,检查状态") + # 如果任务已完成或不存在,则尝试重新启动 + if self.heart_fc_instance._loop_task is None or self.heart_fc_instance._loop_task.done(): + logger.info(f"{log_prefix} HeartFChatting 实例存在但循环未运行,尝试启动...") + try: + # 添加超时保护 + await asyncio.wait_for(self.heart_fc_instance.start(), timeout=15.0) + logger.info(f"{log_prefix} HeartFChatting 循环已启动。") + return True + except asyncio.TimeoutError: + logger.error(f"{log_prefix} 启动现有 HeartFChatting 循环超时") + # 超时时清理实例,准备重新创建 + self.heart_fc_instance = None + except Exception as e: + logger.error(f"{log_prefix} 尝试启动现有 HeartFChatting 循环时出错: {e}") + logger.error(traceback.format_exc()) + # 出错时清理实例,准备重新创建 + self.heart_fc_instance = None + else: + # 任务正在运行 + logger.debug(f"{log_prefix} HeartFChatting 已在运行中。") + return True # 已经在运行 - await self.heart_fc_instance.start() - logger.debug(f"{log_prefix} 麦麦已成功进入专注聊天模式 (新实例已启动)。") - return True + # 如果实例不存在,则创建并启动 + logger.info(f"{log_prefix} 麦麦准备开始专注聊天...") + try: + logger.debug(f"{log_prefix} 创建新的 HeartFChatting 实例") + self.heart_fc_instance = HeartFChatting( + chat_id=self.subheartflow_id, + # observations=self.observations, + on_stop_focus_chat=self._handle_stop_focus_chat_request, + ) + logger.debug(f"{log_prefix} 启动 HeartFChatting 实例") + # 添加超时保护 + await asyncio.wait_for(self.heart_fc_instance.start(), timeout=15.0) + logger.debug(f"{log_prefix} 麦麦已成功进入专注聊天模式 (新实例已启动)。") + return True + + except asyncio.TimeoutError: + logger.error(f"{log_prefix} 创建或启动新 HeartFChatting 实例超时") + self.heart_fc_instance = None # 超时时清理实例 + return False + except Exception as e: + logger.error(f"{log_prefix} 创建或启动 HeartFChatting 实例时出错: {e}") + logger.error(traceback.format_exc()) + self.heart_fc_instance = None # 创建或初始化异常,清理实例 + return False + except Exception as e: - logger.error(f"{log_prefix} 创建或启动 HeartFChatting 实例时出错: {e}") + logger.error(f"{self.log_prefix} _start_heart_fc_chat 执行时出错: {e}") logger.error(traceback.format_exc()) - self.heart_fc_instance = None # 创建或初始化异常,清理实例 return False + finally: + logger.debug(f"{self.log_prefix} _start_heart_fc_chat 完成") async def change_chat_state(self, new_state: ChatState) -> None: """ diff --git a/src/chat/normal_chat/normal_chat.py b/src/chat/normal_chat/normal_chat.py index 5d271e714..841927654 100644 --- a/src/chat/normal_chat/normal_chat.py +++ b/src/chat/normal_chat/normal_chat.py @@ -159,94 +159,119 @@ class NormalChat: 后台任务方法,轮询当前实例关联chat的兴趣消息 通常由start_monitoring_interest()启动 """ - while True: - try: - # 检查任务是否已被取消 - 移动到try块最开始 - if self._chat_task is None or self._chat_task.cancelled(): - logger.info(f"[{self.stream_name}] 兴趣监控任务被取消或置空,退出") - break - - # 检查是否已停用 + logger.debug(f"[{self.stream_name}] 兴趣监控任务开始") + + try: + while True: + # 第一层检查:立即检查取消和停用状态 if self._disabled: - logger.info(f"[{self.stream_name}] 已停用,退出兴趣监控") + logger.info(f"[{self.stream_name}] 检测到停用标志,退出兴趣监控") break - - await asyncio.sleep(0.5) # 每0.5秒检查一次 - # 再次检查取消状态 - if self._chat_task is None or self._chat_task.cancelled() or self._disabled: - logger.info(f"[{self.stream_name}] 检测到停止信号,退出") + # 检查当前任务是否已被取消 + current_task = asyncio.current_task() + if current_task and current_task.cancelled(): + logger.info(f"[{self.stream_name}] 当前任务已被取消,退出") break - items_to_process = list(self.interest_dict.items()) - if not items_to_process: - continue - - # 使用异步上下文管理器处理消息 try: - async with global_prompt_manager.async_message_scope(self.chat_stream.context.get_template_name()): - # 在上下文内部再次检查取消状态 - if self._chat_task is None or self._chat_task.cancelled() or self._disabled: - logger.info(f"[{self.stream_name}] 在处理上下文中检测到停止信号,退出") - break + # 短暂等待,让出控制权 + await asyncio.sleep(0.1) + + # 第二层检查:睡眠后再次检查状态 + if self._disabled: + logger.info(f"[{self.stream_name}] 睡眠后检测到停用标志,退出") + break - # 并行处理兴趣消息 - async def process_single_message(msg_id, message, interest_value, is_mentioned): - """处理单个兴趣消息""" - try: - # 在处理每个消息前检查停止状态 - if self._disabled or (self._chat_task and self._chat_task.cancelled()): - return + # 获取待处理消息 + items_to_process = list(self.interest_dict.items()) + if not items_to_process: + # 没有消息时继续下一轮循环 + continue - # 处理消息 - if time.time() - self.start_time > 300: - self.adjust_reply_frequency(duration=300 / 60) - else: - self.adjust_reply_frequency(duration=(time.time() - self.start_time) / 60) + # 第三层检查:在处理消息前最后检查一次 + if self._disabled: + logger.info(f"[{self.stream_name}] 处理消息前检测到停用标志,退出") + break - await self.normal_response( - message=message, - is_mentioned=is_mentioned, - interested_rate=interest_value * self.willing_amplifier, - ) - except Exception as e: - logger.error( - f"[{self.stream_name}] 处理兴趣消息{msg_id}时出错: {e}\n{traceback.format_exc()}" - ) - finally: - self.interest_dict.pop(msg_id, None) + # 使用异步上下文管理器处理消息 + try: + async with global_prompt_manager.async_message_scope(self.chat_stream.context.get_template_name()): + # 在上下文内部再次检查取消状态 + if self._disabled: + logger.info(f"[{self.stream_name}] 在处理上下文中检测到停止信号,退出") + break - # 创建并行任务列表 - tasks = [] - for msg_id, (message, interest_value, is_mentioned) in items_to_process: - task = process_single_message(msg_id, message, interest_value, is_mentioned) - tasks.append(task) + # 并行处理兴趣消息 + async def process_single_message(msg_id, message, interest_value, is_mentioned): + """处理单个兴趣消息""" + try: + # 在处理每个消息前检查停止状态 + if self._disabled: + logger.debug(f"[{self.stream_name}] 处理消息时检测到停用,跳过消息 {msg_id}") + return - # 并行执行所有任务,限制并发数量避免资源过度消耗 - if tasks: - # 使用信号量控制并发数,最多同时处理5个消息 - semaphore = asyncio.Semaphore(5) + # 处理消息 + if time.time() - self.start_time > 300: + self.adjust_reply_frequency(duration=300 / 60) + else: + self.adjust_reply_frequency(duration=(time.time() - self.start_time) / 60) - async def limited_process(task, sem): - async with sem: - await task + await self.normal_response( + message=message, + is_mentioned=is_mentioned, + interested_rate=interest_value * self.willing_amplifier, + ) + except asyncio.CancelledError: + logger.debug(f"[{self.stream_name}] 处理消息 {msg_id} 时被取消") + raise # 重新抛出取消异常 + except Exception as e: + logger.error(f"[{self.stream_name}] 处理兴趣消息{msg_id}时出错: {e}") + # 不打印完整traceback,避免日志污染 + finally: + # 无论如何都要清理消息 + self.interest_dict.pop(msg_id, None) + + # 创建并行任务列表 + tasks = [] + for msg_id, (message, interest_value, is_mentioned) in items_to_process: + task = process_single_message(msg_id, message, interest_value, is_mentioned) + tasks.append(task) + + # 并行执行所有任务,限制并发数量避免资源过度消耗 + if tasks: + # 使用信号量控制并发数,最多同时处理5个消息 + semaphore = asyncio.Semaphore(5) + + async def limited_process(task, sem): + async with sem: + await task + + limited_tasks = [limited_process(task, semaphore) for task in tasks] + await asyncio.gather(*limited_tasks, return_exceptions=True) + + except asyncio.CancelledError: + logger.info(f"[{self.stream_name}] 处理上下文时任务被取消") + break + except Exception as e: + logger.error(f"[{self.stream_name}] 处理上下文时出错: {e}") + # 出错后短暂等待,避免快速重试 + await asyncio.sleep(0.5) - limited_tasks = [limited_process(task, semaphore) for task in tasks] - await asyncio.gather(*limited_tasks, return_exceptions=True) - except asyncio.CancelledError: - logger.info(f"[{self.stream_name}] 处理上下文时任务被取消") + logger.info(f"[{self.stream_name}] 主循环中任务被取消") break except Exception as e: - logger.error(f"[{self.stream_name}] 处理上下文时出错: {e}") - await asyncio.sleep(1) + logger.error(f"[{self.stream_name}] 主循环出错: {e}") + # 出错后等待一秒再继续 + await asyncio.sleep(1.0) - except asyncio.CancelledError: - logger.info(f"[{self.stream_name}] 兴趣监控任务被取消") - break - except Exception as e: - logger.error(f"[{self.stream_name}] 兴趣监控任务出错: {e}\n{traceback.format_exc()}") - await asyncio.sleep(1) # 出错后等待一秒再继续 + except asyncio.CancelledError: + logger.info(f"[{self.stream_name}] 兴趣监控任务被取消") + except Exception as e: + logger.error(f"[{self.stream_name}] 兴趣监控任务严重错误: {e}") + finally: + logger.debug(f"[{self.stream_name}] 兴趣监控任务结束") # 改为实例方法, 移除 chat 参数 async def normal_response(self, message: MessageRecv, is_mentioned: bool, interested_rate: float) -> None: @@ -504,60 +529,112 @@ class NormalChat: # 改为实例方法, 移除 chat 参数 async def start_chat(self): - """启动聊天任务。""" # Ensure initialized before starting tasks - self._disabled = False # 启动时重置停用标志 - - if self._chat_task is None or self._chat_task.done(): - # logger.info(f"[{self.stream_name}] 开始处理兴趣消息...") - polling_task = asyncio.create_task(self._reply_interested_message()) - polling_task.add_done_callback(lambda t: self._handle_task_completion(t)) - self._chat_task = polling_task - else: + """启动聊天任务。""" + logger.debug(f"[{self.stream_name}] 开始启动聊天任务") + + # 重置停用标志 + self._disabled = False + + # 检查是否已有运行中的任务 + if self._chat_task and not self._chat_task.done(): logger.info(f"[{self.stream_name}] 聊天轮询任务已在运行中。") + return + + # 清理可能存在的已完成任务引用 + if self._chat_task and self._chat_task.done(): + self._chat_task = None + + try: + logger.debug(f"[{self.stream_name}] 创建新的聊天轮询任务") + polling_task = asyncio.create_task(self._reply_interested_message()) + + # 设置回调 + polling_task.add_done_callback(lambda t: self._handle_task_completion(t)) + + # 保存任务引用 + self._chat_task = polling_task + + logger.debug(f"[{self.stream_name}] 聊天任务启动完成") + + except Exception as e: + logger.error(f"[{self.stream_name}] 启动聊天任务失败: {e}") + self._chat_task = None + raise def _handle_task_completion(self, task: asyncio.Task): """任务完成回调处理""" - if task is not self._chat_task: - logger.warning(f"[{self.stream_name}] 收到未知任务回调") - return try: - if exc := task.exception(): - logger.error(f"[{self.stream_name}] 任务异常: {exc}") - traceback.print_exc() - except asyncio.CancelledError: - logger.debug(f"[{self.stream_name}] 任务已取消") + # 简化回调逻辑,避免复杂的异常处理 + logger.debug(f"[{self.stream_name}] 任务完成回调被调用") + + # 检查是否是我们管理的任务 + if task is not self._chat_task: + # 如果已经不是当前任务(可能在stop_chat中已被清空),直接返回 + logger.debug(f"[{self.stream_name}] 回调的任务不是当前管理的任务") + return + + # 清理任务引用 + self._chat_task = None + logger.debug(f"[{self.stream_name}] 任务引用已清理") + + # 简单记录任务状态,不进行复杂处理 + if task.cancelled(): + logger.debug(f"[{self.stream_name}] 任务已取消") + elif task.done(): + try: + # 尝试获取异常,但不抛出 + exc = task.exception() + if exc: + logger.error(f"[{self.stream_name}] 任务异常: {type(exc).__name__}: {exc}") + else: + logger.debug(f"[{self.stream_name}] 任务正常完成") + except Exception as e: + # 获取异常时也可能出错,静默处理 + logger.debug(f"[{self.stream_name}] 获取任务异常时出错: {e}") + except Exception as e: - logger.error(f"[{self.stream_name}] 回调处理错误: {e}") - finally: - if self._chat_task is task: - self._chat_task = None - logger.debug(f"[{self.stream_name}] 任务清理完成") + # 回调函数中的任何异常都要捕获,避免影响系统 + logger.error(f"[{self.stream_name}] 任务完成回调处理出错: {e}") + # 确保任务引用被清理 + self._chat_task = None # 改为实例方法, 移除 stream_id 参数 async def stop_chat(self): """停止当前实例的兴趣监控任务。""" - self._disabled = True # 停止时设置停用标志 - if self._chat_task and not self._chat_task.done(): - task = self._chat_task - logger.debug(f"[{self.stream_name}] 尝试取消normal聊天任务。") - task.cancel() - try: - # 添加超时机制,最多等待2秒 - await asyncio.wait_for(task, timeout=2.0) - except asyncio.TimeoutError: - logger.warning(f"[{self.stream_name}] 等待任务取消超时,强制结束") - except asyncio.CancelledError: - logger.info(f"[{self.stream_name}] 结束一般聊天模式。") - except Exception as e: - # 回调函数 _handle_task_completion 会处理异常日志 - logger.warning(f"[{self.stream_name}] 等待监控任务取消时捕获到异常 (可能已在回调中记录): {e}") - finally: - # 确保任务状态更新,即使等待出错 (回调函数也会尝试更新) - if self._chat_task is task: - self._chat_task = None + logger.debug(f"[{self.stream_name}] 开始停止聊天任务") + + # 立即设置停用标志,防止新任务启动 + self._disabled = True + + # 如果没有运行中的任务,直接返回 + if not self._chat_task or self._chat_task.done(): + logger.debug(f"[{self.stream_name}] 没有运行中的任务,直接完成停止") + self._chat_task = None + return + + # 保存任务引用并立即清空,避免回调中的循环引用 + task_to_cancel = self._chat_task + self._chat_task = None + + logger.debug(f"[{self.stream_name}] 取消聊天任务") + + # 尝试优雅取消任务 + task_to_cancel.cancel() + + # 不等待任务完成,让它自然结束 + # 这样可以避免等待过程中的潜在递归问题 + + # 异步清理思考消息,不阻塞当前流程 + asyncio.create_task(self._cleanup_thinking_messages_async()) + + logger.debug(f"[{self.stream_name}] 聊天任务停止完成") - # 清理所有未处理的思考消息 + async def _cleanup_thinking_messages_async(self): + """异步清理思考消息,避免阻塞主流程""" try: + # 添加短暂延迟,让任务有时间响应取消 + await asyncio.sleep(0.1) + container = await message_manager.get_container(self.stream_id) if container: # 查找并移除所有 MessageThinking 类型的消息 @@ -567,8 +644,8 @@ class NormalChat: container.messages.remove(msg) logger.info(f"[{self.stream_name}] 清理了 {len(thinking_messages)} 条未处理的思考消息。") except Exception as e: - logger.error(f"[{self.stream_name}] 清理思考消息时出错: {e}") - traceback.print_exc() + logger.error(f"[{self.stream_name}] 异步清理思考消息时出错: {e}") + # 不打印完整栈跟踪,避免日志污染 # 获取最近回复记录的方法 def get_recent_replies(self, limit: int = 10) -> List[dict]: diff --git a/test_normal_chat_stop.py b/test_normal_chat_stop.py new file mode 100644 index 000000000..743353441 --- /dev/null +++ b/test_normal_chat_stop.py @@ -0,0 +1,104 @@ +#!/usr/bin/env python3 +""" +NormalChat 启动停止测试脚本 +""" + +import asyncio +import time +import logging +from src.common.logger import get_logger + +logger = get_logger("test_normal_chat_stop") + +async def test_task_cancel_behavior(): + """测试任务取消行为""" + + class MockNormalChat: + def __init__(self): + self._disabled = False + self._chat_task = None + self.stream_name = "test_stream" + + async def mock_reply_loop(self): + """模拟回复循环""" + logger.info("模拟回复循环开始") + try: + while True: + # 检查停用标志 + if self._disabled: + logger.info("检测到停用标志,退出循环") + break + + # 模拟工作 + logger.info("模拟处理消息...") + await asyncio.sleep(0.1) + + except asyncio.CancelledError: + logger.info("模拟回复循环被取消") + raise + except Exception as e: + logger.error(f"模拟回复循环出错: {e}") + finally: + logger.info("模拟回复循环结束") + + async def start_chat(self): + """启动聊天""" + if self._chat_task and not self._chat_task.done(): + logger.info("任务已在运行") + return + + self._disabled = False + self._chat_task = asyncio.create_task(self.mock_reply_loop()) + logger.info("聊天任务已启动") + + async def stop_chat(self): + """停止聊天""" + logger.info("开始停止聊天") + + # 设置停用标志 + self._disabled = True + + if not self._chat_task or self._chat_task.done(): + logger.info("没有运行中的任务") + return + + # 保存任务引用并清空 + task_to_cancel = self._chat_task + self._chat_task = None + + # 取消任务 + task_to_cancel.cancel() + + logger.info("聊天任务停止完成") + + # 测试正常启动停止 + logger.info("=== 测试正常启动停止 ===") + chat = MockNormalChat() + + # 启动 + await chat.start_chat() + await asyncio.sleep(0.5) # 让任务运行一会 + + # 停止 + await chat.stop_chat() + await asyncio.sleep(0.1) # 让取消操作完成 + + logger.info("=== 测试完成 ===") + +async def main(): + """主函数""" + logger.info("开始 NormalChat 停止测试") + + try: + await test_task_cancel_behavior() + except Exception as e: + logger.error(f"测试失败: {e}") + import traceback + logger.error(traceback.format_exc()) + + logger.info("测试结束") + +if __name__ == "__main__": + # 设置日志级别 + logging.basicConfig(level=logging.INFO) + asyncio.run(main()) \ No newline at end of file From b9c3cd8ab39b96eea516a64445f2f7039b6c2b9a Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" Date: Sat, 14 Jun 2025 17:45:19 +0000 Subject: [PATCH 6/6] =?UTF-8?q?=F0=9F=A4=96=20=E8=87=AA=E5=8A=A8=E6=A0=BC?= =?UTF-8?q?=E5=BC=8F=E5=8C=96=E4=BB=A3=E7=A0=81=20[skip=20ci]?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/chat/emoji_system/emoji_manager.py | 2 - src/chat/focus_chat/heartFC_chat.py | 22 ++++++----- src/chat/heart_flow/sub_heartflow.py | 6 +-- src/chat/normal_chat/normal_chat.py | 54 +++++++++++++------------- test_normal_chat_stop.py | 43 ++++++++++---------- 5 files changed, 67 insertions(+), 60 deletions(-) diff --git a/src/chat/emoji_system/emoji_manager.py b/src/chat/emoji_system/emoji_manager.py index 26440c1c9..56461d8b7 100644 --- a/src/chat/emoji_system/emoji_manager.py +++ b/src/chat/emoji_system/emoji_manager.py @@ -334,8 +334,6 @@ async def clean_unused_emojis(emoji_dir: str, emoji_objects: List["MaiEmoji"], r # 遍历指定目录中的所有文件 for file_name in os.listdir(emoji_dir): file_full_path = os.path.join(emoji_dir, file_name) - - # 确保处理的是文件而不是子目录 if not os.path.isfile(file_full_path): diff --git a/src/chat/focus_chat/heartFC_chat.py b/src/chat/focus_chat/heartFC_chat.py index ce4a43cba..d48ec465b 100644 --- a/src/chat/focus_chat/heartFC_chat.py +++ b/src/chat/focus_chat/heartFC_chat.py @@ -217,7 +217,7 @@ class HeartFChatting: async def start(self): """检查是否需要启动主循环,如果未激活则启动。""" logger.debug(f"{self.log_prefix} 开始启动 HeartFChatting") - + # 如果循环已经激活,直接返回 if self._loop_active: logger.debug(f"{self.log_prefix} HeartFChatting 已激活,无需重复启动") @@ -244,7 +244,7 @@ class HeartFChatting: self._loop_task = asyncio.create_task(self._run_focus_chat()) self._loop_task.add_done_callback(self._handle_loop_completion) logger.debug(f"{self.log_prefix} HeartFChatting 启动完成") - + except Exception as e: # 启动失败时重置状态 self._loop_active = False @@ -275,7 +275,7 @@ class HeartFChatting: try: while True: # 主循环 logger.debug(f"{self.log_prefix} 开始第{self._cycle_counter}次循环") - + # 检查关闭标志 if self._shutting_down: logger.info(f"{self.log_prefix} 检测到关闭标志,退出 Focus Chat 循环。") @@ -295,15 +295,17 @@ class HeartFChatting: async with self._get_cycle_context(): thinking_id = "tid" + str(round(time.time(), 2)) self._current_cycle_detail.set_thinking_id(thinking_id) - + # 使用异步上下文管理器处理消息 try: - async with global_prompt_manager.async_message_scope(self.chat_stream.context.get_template_name()): + async with global_prompt_manager.async_message_scope( + self.chat_stream.context.get_template_name() + ): # 在上下文内部检查关闭状态 if self._shutting_down: logger.info(f"{self.log_prefix} 在处理上下文中检测到关闭信号,退出") break - + logger.debug(f"模板 {self.chat_stream.context.get_template_name()}") loop_info = await self._observe_process_plan_action_loop(cycle_timers, thinking_id) @@ -318,7 +320,7 @@ class HeartFChatting: logger.error(f"{self.log_prefix} 调用停止专注聊天回调函数时出错: {e}") logger.error(traceback.format_exc()) break - + except asyncio.CancelledError: logger.info(f"{self.log_prefix} 处理上下文时任务被取消") break @@ -357,7 +359,9 @@ class HeartFChatting: timer_strings.append(f"{name}: {formatted_time}") # 新增:输出每个处理器的耗时 - processor_time_costs = self._current_cycle_detail.loop_processor_info.get("processor_time_costs", {}) + processor_time_costs = self._current_cycle_detail.loop_processor_info.get( + "processor_time_costs", {} + ) processor_time_strings = [] for pname, ptime in processor_time_costs.items(): formatted_ptime = f"{ptime * 1000:.2f}毫秒" if ptime < 1 else f"{ptime:.2f}秒" @@ -375,7 +379,7 @@ class HeartFChatting: ) await asyncio.sleep(global_config.focus_chat.think_interval) - + except asyncio.CancelledError: logger.info(f"{self.log_prefix} 循环处理时任务被取消") break diff --git a/src/chat/heart_flow/sub_heartflow.py b/src/chat/heart_flow/sub_heartflow.py index 661a4db96..b78fba3ec 100644 --- a/src/chat/heart_flow/sub_heartflow.py +++ b/src/chat/heart_flow/sub_heartflow.py @@ -179,12 +179,12 @@ class SubHeartflow: async def _start_heart_fc_chat(self) -> bool: """启动 HeartFChatting 实例,确保 NormalChat 已停止""" logger.debug(f"{self.log_prefix} 开始启动 HeartFChatting") - + try: # 确保普通聊天监控已停止 await self._stop_normal_chat() self.interest_dict.clear() - + log_prefix = self.log_prefix # 如果实例已存在,检查其循环任务状态 if self.heart_fc_instance: @@ -236,7 +236,7 @@ class SubHeartflow: logger.error(traceback.format_exc()) self.heart_fc_instance = None # 创建或初始化异常,清理实例 return False - + except Exception as e: logger.error(f"{self.log_prefix} _start_heart_fc_chat 执行时出错: {e}") logger.error(traceback.format_exc()) diff --git a/src/chat/normal_chat/normal_chat.py b/src/chat/normal_chat/normal_chat.py index 841927654..d7bd0b840 100644 --- a/src/chat/normal_chat/normal_chat.py +++ b/src/chat/normal_chat/normal_chat.py @@ -160,14 +160,14 @@ class NormalChat: 通常由start_monitoring_interest()启动 """ logger.debug(f"[{self.stream_name}] 兴趣监控任务开始") - + try: while True: # 第一层检查:立即检查取消和停用状态 if self._disabled: logger.info(f"[{self.stream_name}] 检测到停用标志,退出兴趣监控") break - + # 检查当前任务是否已被取消 current_task = asyncio.current_task() if current_task and current_task.cancelled(): @@ -177,7 +177,7 @@ class NormalChat: try: # 短暂等待,让出控制权 await asyncio.sleep(0.1) - + # 第二层检查:睡眠后再次检查状态 if self._disabled: logger.info(f"[{self.stream_name}] 睡眠后检测到停用标志,退出") @@ -196,7 +196,9 @@ class NormalChat: # 使用异步上下文管理器处理消息 try: - async with global_prompt_manager.async_message_scope(self.chat_stream.context.get_template_name()): + async with global_prompt_manager.async_message_scope( + self.chat_stream.context.get_template_name() + ): # 在上下文内部再次检查取消状态 if self._disabled: logger.info(f"[{self.stream_name}] 在处理上下文中检测到停止信号,退出") @@ -249,7 +251,7 @@ class NormalChat: limited_tasks = [limited_process(task, semaphore) for task in tasks] await asyncio.gather(*limited_tasks, return_exceptions=True) - + except asyncio.CancelledError: logger.info(f"[{self.stream_name}] 处理上下文时任务被取消") break @@ -531,31 +533,31 @@ class NormalChat: async def start_chat(self): """启动聊天任务。""" logger.debug(f"[{self.stream_name}] 开始启动聊天任务") - + # 重置停用标志 self._disabled = False - + # 检查是否已有运行中的任务 if self._chat_task and not self._chat_task.done(): logger.info(f"[{self.stream_name}] 聊天轮询任务已在运行中。") return - + # 清理可能存在的已完成任务引用 if self._chat_task and self._chat_task.done(): self._chat_task = None - + try: logger.debug(f"[{self.stream_name}] 创建新的聊天轮询任务") polling_task = asyncio.create_task(self._reply_interested_message()) - + # 设置回调 polling_task.add_done_callback(lambda t: self._handle_task_completion(t)) - + # 保存任务引用 self._chat_task = polling_task - + logger.debug(f"[{self.stream_name}] 聊天任务启动完成") - + except Exception as e: logger.error(f"[{self.stream_name}] 启动聊天任务失败: {e}") self._chat_task = None @@ -566,17 +568,17 @@ class NormalChat: try: # 简化回调逻辑,避免复杂的异常处理 logger.debug(f"[{self.stream_name}] 任务完成回调被调用") - + # 检查是否是我们管理的任务 if task is not self._chat_task: # 如果已经不是当前任务(可能在stop_chat中已被清空),直接返回 logger.debug(f"[{self.stream_name}] 回调的任务不是当前管理的任务") return - + # 清理任务引用 self._chat_task = None logger.debug(f"[{self.stream_name}] 任务引用已清理") - + # 简单记录任务状态,不进行复杂处理 if task.cancelled(): logger.debug(f"[{self.stream_name}] 任务已取消") @@ -591,7 +593,7 @@ class NormalChat: except Exception as e: # 获取异常时也可能出错,静默处理 logger.debug(f"[{self.stream_name}] 获取任务异常时出错: {e}") - + except Exception as e: # 回调函数中的任何异常都要捕获,避免影响系统 logger.error(f"[{self.stream_name}] 任务完成回调处理出错: {e}") @@ -602,31 +604,31 @@ class NormalChat: async def stop_chat(self): """停止当前实例的兴趣监控任务。""" logger.debug(f"[{self.stream_name}] 开始停止聊天任务") - + # 立即设置停用标志,防止新任务启动 self._disabled = True - + # 如果没有运行中的任务,直接返回 if not self._chat_task or self._chat_task.done(): logger.debug(f"[{self.stream_name}] 没有运行中的任务,直接完成停止") self._chat_task = None return - + # 保存任务引用并立即清空,避免回调中的循环引用 task_to_cancel = self._chat_task self._chat_task = None - + logger.debug(f"[{self.stream_name}] 取消聊天任务") - + # 尝试优雅取消任务 task_to_cancel.cancel() - + # 不等待任务完成,让它自然结束 # 这样可以避免等待过程中的潜在递归问题 - + # 异步清理思考消息,不阻塞当前流程 asyncio.create_task(self._cleanup_thinking_messages_async()) - + logger.debug(f"[{self.stream_name}] 聊天任务停止完成") async def _cleanup_thinking_messages_async(self): @@ -634,7 +636,7 @@ class NormalChat: try: # 添加短暂延迟,让任务有时间响应取消 await asyncio.sleep(0.1) - + container = await message_manager.get_container(self.stream_id) if container: # 查找并移除所有 MessageThinking 类型的消息 diff --git a/test_normal_chat_stop.py b/test_normal_chat_stop.py index 743353441..1a08565ae 100644 --- a/test_normal_chat_stop.py +++ b/test_normal_chat_stop.py @@ -4,21 +4,21 @@ NormalChat 启动停止测试脚本 """ import asyncio -import time import logging from src.common.logger import get_logger logger = get_logger("test_normal_chat_stop") + async def test_task_cancel_behavior(): """测试任务取消行为""" - + class MockNormalChat: def __init__(self): self._disabled = False self._chat_task = None self.stream_name = "test_stream" - + async def mock_reply_loop(self): """模拟回复循环""" logger.info("模拟回复循环开始") @@ -28,11 +28,11 @@ async def test_task_cancel_behavior(): if self._disabled: logger.info("检测到停用标志,退出循环") break - + # 模拟工作 logger.info("模拟处理消息...") await asyncio.sleep(0.1) - + except asyncio.CancelledError: logger.info("模拟回复循环被取消") raise @@ -40,65 +40,68 @@ async def test_task_cancel_behavior(): logger.error(f"模拟回复循环出错: {e}") finally: logger.info("模拟回复循环结束") - + async def start_chat(self): """启动聊天""" if self._chat_task and not self._chat_task.done(): logger.info("任务已在运行") return - + self._disabled = False self._chat_task = asyncio.create_task(self.mock_reply_loop()) logger.info("聊天任务已启动") - + async def stop_chat(self): """停止聊天""" logger.info("开始停止聊天") - + # 设置停用标志 self._disabled = True - + if not self._chat_task or self._chat_task.done(): logger.info("没有运行中的任务") return - + # 保存任务引用并清空 task_to_cancel = self._chat_task self._chat_task = None - + # 取消任务 task_to_cancel.cancel() - + logger.info("聊天任务停止完成") - + # 测试正常启动停止 logger.info("=== 测试正常启动停止 ===") chat = MockNormalChat() - + # 启动 await chat.start_chat() await asyncio.sleep(0.5) # 让任务运行一会 - + # 停止 await chat.stop_chat() await asyncio.sleep(0.1) # 让取消操作完成 - + logger.info("=== 测试完成 ===") + async def main(): """主函数""" logger.info("开始 NormalChat 停止测试") - + try: await test_task_cancel_behavior() except Exception as e: logger.error(f"测试失败: {e}") import traceback + logger.error(traceback.format_exc()) - + logger.info("测试结束") + if __name__ == "__main__": # 设置日志级别 logging.basicConfig(level=logging.INFO) - asyncio.run(main()) \ No newline at end of file + asyncio.run(main())