fix:ai哥神秘修复无法模式切换
This commit is contained in:
31
bot.py
31
bot.py
@@ -109,12 +109,33 @@ async def graceful_shutdown():
|
||||
# 停止所有异步任务
|
||||
await async_task_manager.stop_and_wait_all_tasks()
|
||||
|
||||
tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]
|
||||
for task in tasks:
|
||||
task.cancel()
|
||||
await asyncio.gather(*tasks, return_exceptions=True)
|
||||
# 获取所有剩余任务,排除当前任务
|
||||
remaining_tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]
|
||||
|
||||
if remaining_tasks:
|
||||
logger.info(f"正在取消 {len(remaining_tasks)} 个剩余任务...")
|
||||
|
||||
# 取消所有剩余任务
|
||||
for task in remaining_tasks:
|
||||
if not task.done():
|
||||
task.cancel()
|
||||
|
||||
# 等待所有任务完成,设置超时
|
||||
try:
|
||||
await asyncio.wait_for(
|
||||
asyncio.gather(*remaining_tasks, return_exceptions=True),
|
||||
timeout=15.0
|
||||
)
|
||||
logger.info("所有剩余任务已成功取消")
|
||||
except asyncio.TimeoutError:
|
||||
logger.warning("等待任务取消超时,强制继续关闭")
|
||||
except Exception as e:
|
||||
logger.error(f"等待任务取消时发生异常: {e}")
|
||||
|
||||
logger.info("麦麦优雅关闭完成")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"麦麦关闭失败: {e}")
|
||||
logger.error(f"麦麦关闭失败: {e}", exc_info=True)
|
||||
|
||||
|
||||
def check_eula():
|
||||
|
||||
@@ -320,11 +320,11 @@ async def clear_temp_emoji() -> None:
|
||||
logger.info("[清理] 完成")
|
||||
|
||||
|
||||
async def clean_unused_emojis(emoji_dir: str, emoji_objects: List["MaiEmoji"]) -> None:
|
||||
async def clean_unused_emojis(emoji_dir: str, emoji_objects: List["MaiEmoji"],removed_count:int) -> int:
|
||||
"""清理指定目录中未被 emoji_objects 追踪的表情包文件"""
|
||||
if not os.path.exists(emoji_dir):
|
||||
logger.warning(f"[清理] 目标目录不存在,跳过清理: {emoji_dir}")
|
||||
return
|
||||
return removed_count
|
||||
|
||||
try:
|
||||
# 获取内存中所有有效表情包的完整路径集合
|
||||
@@ -353,6 +353,8 @@ async def clean_unused_emojis(emoji_dir: str, emoji_objects: List["MaiEmoji"]) -
|
||||
else:
|
||||
logger.info(f"[清理] 目录 {emoji_dir} 中没有需要清理的。")
|
||||
|
||||
return removed_count + cleaned_count
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"[错误] 清理未使用表情包文件时出错 ({emoji_dir}): {str(e)}")
|
||||
|
||||
@@ -564,7 +566,7 @@ class EmojiManager:
|
||||
self.emoji_objects = [e for e in self.emoji_objects if e not in objects_to_remove]
|
||||
|
||||
# 清理 EMOJI_REGISTED_DIR 目录中未被追踪的文件
|
||||
await clean_unused_emojis(EMOJI_REGISTED_DIR, self.emoji_objects)
|
||||
removed_count = await clean_unused_emojis(EMOJI_REGISTED_DIR, self.emoji_objects,removed_count)
|
||||
|
||||
# 输出清理结果
|
||||
if removed_count > 0:
|
||||
|
||||
@@ -77,10 +77,21 @@ class SubHeartflow:
|
||||
if self.normal_chat_instance:
|
||||
logger.info(f"{self.log_prefix} 离开normal模式")
|
||||
try:
|
||||
await self.normal_chat_instance.stop_chat() # 调用 stop_chat
|
||||
logger.debug(f"{self.log_prefix} 开始调用 stop_chat()")
|
||||
# 添加超时保护,避免无限等待
|
||||
await asyncio.wait_for(self.normal_chat_instance.stop_chat(), timeout=10.0)
|
||||
logger.debug(f"{self.log_prefix} stop_chat() 调用完成")
|
||||
except asyncio.TimeoutError:
|
||||
logger.warning(f"{self.log_prefix} 停止 NormalChat 超时,强制清理")
|
||||
# 超时时强制清理
|
||||
self.normal_chat_instance = None
|
||||
except Exception as e:
|
||||
logger.error(f"{self.log_prefix} 停止 NormalChat 监控任务时出错: {e}")
|
||||
logger.error(traceback.format_exc())
|
||||
# 出错时也要清理实例
|
||||
self.normal_chat_instance = None
|
||||
finally:
|
||||
logger.debug(f"{self.log_prefix} _stop_normal_chat 完成")
|
||||
|
||||
async def _start_normal_chat(self, rewind=False) -> bool:
|
||||
"""
|
||||
|
||||
@@ -161,58 +161,86 @@ class NormalChat:
|
||||
"""
|
||||
while True:
|
||||
try:
|
||||
async with global_prompt_manager.async_message_scope(self.chat_stream.context.get_template_name()):
|
||||
await asyncio.sleep(0.5) # 每秒检查一次
|
||||
# 检查任务是否已被取消
|
||||
if self._chat_task is None or self._chat_task.cancelled():
|
||||
logger.info(f"[{self.stream_name}] 兴趣监控任务被取消或置空,退出")
|
||||
break
|
||||
# 检查任务是否已被取消 - 移动到try块最开始
|
||||
if self._chat_task is None or self._chat_task.cancelled():
|
||||
logger.info(f"[{self.stream_name}] 兴趣监控任务被取消或置空,退出")
|
||||
break
|
||||
|
||||
items_to_process = list(self.interest_dict.items())
|
||||
if not items_to_process:
|
||||
continue
|
||||
# 检查是否已停用
|
||||
if self._disabled:
|
||||
logger.info(f"[{self.stream_name}] 已停用,退出兴趣监控")
|
||||
break
|
||||
|
||||
# 并行处理兴趣消息
|
||||
async def process_single_message(msg_id, message, interest_value, is_mentioned):
|
||||
"""处理单个兴趣消息"""
|
||||
try:
|
||||
# 处理消息
|
||||
if time.time() - self.start_time > 300:
|
||||
self.adjust_reply_frequency(duration=300 / 60)
|
||||
else:
|
||||
self.adjust_reply_frequency(duration=(time.time() - self.start_time) / 60)
|
||||
await asyncio.sleep(0.5) # 每0.5秒检查一次
|
||||
|
||||
# print(self.engaging_persons)
|
||||
# 再次检查取消状态
|
||||
if self._chat_task is None or self._chat_task.cancelled() or self._disabled:
|
||||
logger.info(f"[{self.stream_name}] 检测到停止信号,退出")
|
||||
break
|
||||
|
||||
await self.normal_response(
|
||||
message=message,
|
||||
is_mentioned=is_mentioned,
|
||||
interested_rate=interest_value * self.willing_amplifier,
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
f"[{self.stream_name}] 处理兴趣消息{msg_id}时出错: {e}\n{traceback.format_exc()}"
|
||||
)
|
||||
finally:
|
||||
self.interest_dict.pop(msg_id, None)
|
||||
items_to_process = list(self.interest_dict.items())
|
||||
if not items_to_process:
|
||||
continue
|
||||
|
||||
# 创建并行任务列表
|
||||
tasks = []
|
||||
for msg_id, (message, interest_value, is_mentioned) in items_to_process:
|
||||
task = process_single_message(msg_id, message, interest_value, is_mentioned)
|
||||
tasks.append(task)
|
||||
# 使用异步上下文管理器处理消息
|
||||
try:
|
||||
async with global_prompt_manager.async_message_scope(self.chat_stream.context.get_template_name()):
|
||||
# 在上下文内部再次检查取消状态
|
||||
if self._chat_task is None or self._chat_task.cancelled() or self._disabled:
|
||||
logger.info(f"[{self.stream_name}] 在处理上下文中检测到停止信号,退出")
|
||||
break
|
||||
|
||||
# 并行执行所有任务,限制并发数量避免资源过度消耗
|
||||
if tasks:
|
||||
# 使用信号量控制并发数,最多同时处理5个消息
|
||||
semaphore = asyncio.Semaphore(5)
|
||||
# 并行处理兴趣消息
|
||||
async def process_single_message(msg_id, message, interest_value, is_mentioned):
|
||||
"""处理单个兴趣消息"""
|
||||
try:
|
||||
# 在处理每个消息前检查停止状态
|
||||
if self._disabled or (self._chat_task and self._chat_task.cancelled()):
|
||||
return
|
||||
|
||||
async def limited_process(task, sem):
|
||||
async with sem:
|
||||
await task
|
||||
# 处理消息
|
||||
if time.time() - self.start_time > 300:
|
||||
self.adjust_reply_frequency(duration=300 / 60)
|
||||
else:
|
||||
self.adjust_reply_frequency(duration=(time.time() - self.start_time) / 60)
|
||||
|
||||
await self.normal_response(
|
||||
message=message,
|
||||
is_mentioned=is_mentioned,
|
||||
interested_rate=interest_value * self.willing_amplifier,
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
f"[{self.stream_name}] 处理兴趣消息{msg_id}时出错: {e}\n{traceback.format_exc()}"
|
||||
)
|
||||
finally:
|
||||
self.interest_dict.pop(msg_id, None)
|
||||
|
||||
# 创建并行任务列表
|
||||
tasks = []
|
||||
for msg_id, (message, interest_value, is_mentioned) in items_to_process:
|
||||
task = process_single_message(msg_id, message, interest_value, is_mentioned)
|
||||
tasks.append(task)
|
||||
|
||||
# 并行执行所有任务,限制并发数量避免资源过度消耗
|
||||
if tasks:
|
||||
# 使用信号量控制并发数,最多同时处理5个消息
|
||||
semaphore = asyncio.Semaphore(5)
|
||||
|
||||
async def limited_process(task, sem):
|
||||
async with sem:
|
||||
await task
|
||||
|
||||
limited_tasks = [limited_process(task, semaphore) for task in tasks]
|
||||
await asyncio.gather(*limited_tasks, return_exceptions=True)
|
||||
|
||||
except asyncio.CancelledError:
|
||||
logger.info(f"[{self.stream_name}] 处理上下文时任务被取消")
|
||||
break
|
||||
except Exception as e:
|
||||
logger.error(f"[{self.stream_name}] 处理上下文时出错: {e}")
|
||||
await asyncio.sleep(1)
|
||||
|
||||
limited_tasks = [limited_process(task, semaphore) for task in tasks]
|
||||
await asyncio.gather(*limited_tasks, return_exceptions=True)
|
||||
except asyncio.CancelledError:
|
||||
logger.info(f"[{self.stream_name}] 兴趣监控任务被取消")
|
||||
break
|
||||
|
||||
@@ -35,14 +35,21 @@ class PromptContext:
|
||||
"""创建一个异步的临时提示模板作用域"""
|
||||
# 保存当前上下文并设置新上下文
|
||||
if context_id is not None:
|
||||
async with self._context_lock:
|
||||
if context_id not in self._context_prompts:
|
||||
self._context_prompts[context_id] = {}
|
||||
try:
|
||||
# 添加超时保护,避免长时间等待锁
|
||||
async with asyncio.wait_for(self._context_lock.acquire(), timeout=5.0):
|
||||
if context_id not in self._context_prompts:
|
||||
self._context_prompts[context_id] = {}
|
||||
self._context_lock.release()
|
||||
except asyncio.TimeoutError:
|
||||
logger.warning(f"获取上下文锁超时,context_id: {context_id}")
|
||||
# 超时时直接进入,不设置上下文
|
||||
context_id = None
|
||||
|
||||
# 保存当前协程的上下文值,不影响其他协程
|
||||
previous_context = self._current_context
|
||||
# 设置当前协程的新上下文
|
||||
token = self._current_context_var.set(context_id)
|
||||
token = self._current_context_var.set(context_id) if context_id else None
|
||||
else:
|
||||
# 如果没有提供新上下文,保持当前上下文不变
|
||||
previous_context = self._current_context
|
||||
@@ -51,12 +58,17 @@ class PromptContext:
|
||||
try:
|
||||
yield self
|
||||
finally:
|
||||
# 恢复之前的上下文
|
||||
if context_id is not None:
|
||||
if token:
|
||||
# 恢复之前的上下文,添加异常保护
|
||||
if context_id is not None and token is not None:
|
||||
try:
|
||||
self._current_context_var.reset(token)
|
||||
else:
|
||||
self._current_context = previous_context
|
||||
except Exception as e:
|
||||
logger.warning(f"恢复上下文时出错: {e}")
|
||||
# 如果reset失败,尝试直接设置
|
||||
try:
|
||||
self._current_context = previous_context
|
||||
except Exception:
|
||||
pass # 静默忽略恢复失败
|
||||
|
||||
async def get_prompt_async(self, name: str) -> Optional["Prompt"]:
|
||||
"""异步获取当前作用域中的提示模板"""
|
||||
|
||||
@@ -90,8 +90,19 @@ class AsyncTaskManager:
|
||||
async with self._lock: # 由于可能需要await等待任务完成,所以需要加异步锁
|
||||
if task.task_name in self.tasks:
|
||||
logger.warning(f"已存在名称为 '{task.task_name}' 的任务,正在尝试取消并替换")
|
||||
self.tasks[task.task_name].cancel() # 取消已存在的任务
|
||||
await self.tasks[task.task_name] # 等待任务完成
|
||||
old_task = self.tasks[task.task_name]
|
||||
old_task.cancel() # 取消已存在的任务
|
||||
|
||||
# 添加超时保护,避免无限等待
|
||||
try:
|
||||
await asyncio.wait_for(old_task, timeout=5.0)
|
||||
except asyncio.TimeoutError:
|
||||
logger.warning(f"等待任务 '{task.task_name}' 完成超时")
|
||||
except asyncio.CancelledError:
|
||||
logger.info(f"任务 '{task.task_name}' 已成功取消")
|
||||
except Exception as e:
|
||||
logger.error(f"等待任务 '{task.task_name}' 完成时发生异常: {e}")
|
||||
|
||||
logger.info(f"成功结束任务 '{task.task_name}'")
|
||||
|
||||
# 创建新任务
|
||||
@@ -123,28 +134,65 @@ class AsyncTaskManager:
|
||||
async with self._lock: # 由于可能需要await等待任务完成,所以需要加异步锁
|
||||
# 设置中止标志
|
||||
self.abort_flag.set()
|
||||
# 取消所有任务
|
||||
for name, inst in self.tasks.items():
|
||||
try:
|
||||
inst.cancel()
|
||||
except asyncio.CancelledError:
|
||||
logger.info(f"已取消任务 '{name}'")
|
||||
|
||||
# 等待所有任务完成
|
||||
for task_name, task_inst in self.tasks.items():
|
||||
# 首先收集所有任务的引用,避免在迭代过程中字典被修改
|
||||
task_items = list(self.tasks.items())
|
||||
|
||||
# 取消所有任务
|
||||
for name, inst in task_items:
|
||||
if not inst.done():
|
||||
try:
|
||||
inst.cancel()
|
||||
logger.debug(f"已请求取消任务 '{name}'")
|
||||
except Exception as e:
|
||||
logger.warning(f"取消任务 '{name}' 时发生异常: {e}")
|
||||
|
||||
# 等待所有任务完成,添加超时保护
|
||||
for task_name, task_inst in task_items:
|
||||
if not task_inst.done():
|
||||
try:
|
||||
await task_inst
|
||||
except asyncio.CancelledError: # 此处再次捕获取消异常,防止stop_all_tasks()时延迟抛出异常
|
||||
logger.info(f"任务 {task_name} 已取消")
|
||||
await asyncio.wait_for(task_inst, timeout=10.0)
|
||||
logger.debug(f"任务 '{task_name}' 已完成")
|
||||
except asyncio.TimeoutError:
|
||||
logger.warning(f"等待任务 '{task_name}' 完成超时")
|
||||
except asyncio.CancelledError:
|
||||
logger.info(f"任务 '{task_name}' 已取消")
|
||||
except Exception as e:
|
||||
logger.error(f"任务 {task_name} 执行时发生异常: {e}", ext_info=True)
|
||||
logger.error(f"任务 '{task_name}' 执行时发生异常: {e}", exc_info=True)
|
||||
|
||||
# 清空任务列表
|
||||
self.tasks.clear()
|
||||
self.abort_flag.clear()
|
||||
logger.info("所有异步任务已停止")
|
||||
|
||||
def debug_task_status(self):
|
||||
"""
|
||||
调试函数:打印所有任务的状态信息
|
||||
"""
|
||||
logger.info("=== 异步任务状态调试信息 ===")
|
||||
logger.info(f"当前管理的任务数量: {len(self.tasks)}")
|
||||
logger.info(f"中止标志状态: {self.abort_flag.is_set()}")
|
||||
|
||||
for task_name, task in self.tasks.items():
|
||||
status = []
|
||||
if task.done():
|
||||
status.append("已完成")
|
||||
if task.cancelled():
|
||||
status.append("已取消")
|
||||
elif task.exception():
|
||||
status.append(f"异常: {task.exception()}")
|
||||
else:
|
||||
status.append("正常完成")
|
||||
else:
|
||||
status.append("运行中")
|
||||
|
||||
logger.info(f"任务 '{task_name}': {', '.join(status)}")
|
||||
|
||||
# 检查所有asyncio任务
|
||||
all_tasks = asyncio.all_tasks()
|
||||
logger.info(f"当前事件循环中的所有任务数量: {len(all_tasks)}")
|
||||
logger.info("=== 调试信息结束 ===")
|
||||
|
||||
|
||||
async_task_manager = AsyncTaskManager()
|
||||
"""全局异步任务管理器实例"""
|
||||
|
||||
Reference in New Issue
Block a user