feat(message-manager): 改进流生命周期管理和消息对象处理

-通过适当的任务取消为流循环添加强制重启功能
-通过更安全的删除和适当的任务终止来增强流清理
-改进亲和流聊天插件中的消息对象转换
-用DatabaseMessages对象替换基于字典的消息处理
-为任务取消添加超时处理,以防止死锁
-简化计划执行中的用户ID提取和消息ID处理
This commit is contained in:
Windpicker-owo
2025-10-31 21:27:11 +08:00
parent 655b4f20c6
commit ce03ced355
5 changed files with 73 additions and 53 deletions

View File

@@ -111,10 +111,23 @@ class StreamLoopManager:
logger.warning(f"无法获取流上下文: {stream_id}")
return False
# 快速路径:如果流已存在,无需处理
if context.stream_loop_task and not context.stream_loop_task.done():
# 快速路径:如果流已存在且不是强制启动,无需处理
if not force and context.stream_loop_task and not context.stream_loop_task.done():
logger.debug(f"{stream_id} 循环已在运行")
return True
# 如果是强制启动且任务仍在运行,先取消旧任务
if force and context.stream_loop_task and not context.stream_loop_task.done():
logger.info(f"强制启动模式:先取消现有流循环任务: {stream_id}")
old_task = context.stream_loop_task
old_task.cancel()
try:
await asyncio.wait_for(old_task, timeout=2.0)
logger.info(f"旧流循环任务已结束: {stream_id}")
except (asyncio.TimeoutError, asyncio.CancelledError):
logger.debug(f"旧流循环任务已取消或超时: {stream_id}")
except Exception as e:
logger.warning(f"等待旧任务结束时出错: {e}")
# 创建流循环任务
try:

View File

@@ -205,9 +205,9 @@ class GlobalNoticeManager:
# 格式化notice消息
if notice_type:
notice_line = f"[{notice_type}] {message.processed_plain_text or message.raw_message}"
notice_line = f"[{notice_type}] {message.processed_plain_text}"
else:
notice_line = f"[通知] {message.processed_plain_text or message.raw_message}"
notice_line = f"[通知] {message.processed_plain_text}"
# 添加时间信息(相对时间)
time_diff = int(time.time() - notice.timestamp)

View File

@@ -275,8 +275,20 @@ class MessageManager:
inactive_streams.append(stream_id)
for stream_id in inactive_streams:
try:
# 在使用之前重新从 chat_manager 中获取 chat_stream避免引用未定义或过期的变量
chat_stream = chat_manager.streams.get(stream_id)
if not chat_stream:
logger.debug(f"聊天流 {stream_id} 在清理时已不存在,跳过")
continue
await chat_stream.context_manager.clear_context()
del chat_manager.streams[stream_id]
# 安全删除流(若已被其他地方删除则捕获)
try:
del chat_manager.streams[stream_id]
except KeyError:
logger.debug(f"删除聊天流 {stream_id} 时未找到,可能已被移除")
logger.info(f"清理不活跃聊天流: {stream_id}")
except Exception as e:
logger.error(f"清理聊天流 {stream_id} 失败: {e}")
@@ -342,7 +354,16 @@ class MessageManager:
# 取消 stream_loop_task子任务会通过 try-catch 自动取消
try:
stream_loop_task.cancel()
logger.info(f"取消流循环任务: {chat_stream.stream_id}")
logger.info(f"发送取消信号到流循环任务: {chat_stream.stream_id}")
# 等待任务真正结束(设置超时避免死锁)
try:
await asyncio.wait_for(stream_loop_task, timeout=2.0)
logger.info(f"流循环任务已完全结束: {chat_stream.stream_id}")
except asyncio.TimeoutError:
logger.warning(f"等待流循环任务结束超时: {chat_stream.stream_id}")
except asyncio.CancelledError:
logger.info(f"流循环任务已被取消: {chat_stream.stream_id}")
except Exception as e:
logger.warning(f"取消流循环任务失败: {chat_stream.stream_id} - {e}")