ruff
This commit is contained in:
committed by
Windpicker-owo
parent
83517b7178
commit
e02af208c4
@@ -68,23 +68,33 @@ class StreamLoopManager:
|
||||
|
||||
# 取消所有流循环
|
||||
try:
|
||||
# 创建任务列表以便并发取消
|
||||
cancel_tasks = []
|
||||
for stream_id, task in list(self.stream_loops.items()):
|
||||
if not task.done():
|
||||
task.cancel()
|
||||
cancel_tasks.append((stream_id, task))
|
||||
# 使用带超时的锁获取,避免无限等待
|
||||
lock_acquired = await asyncio.wait_for(self.loop_lock.acquire(), timeout=10.0)
|
||||
if not lock_acquired:
|
||||
logger.error("停止管理器时获取锁超时")
|
||||
else:
|
||||
try:
|
||||
# 创建任务列表以便并发取消
|
||||
cancel_tasks = []
|
||||
for stream_id, task in list(self.stream_loops.items()):
|
||||
if not task.done():
|
||||
task.cancel()
|
||||
cancel_tasks.append((stream_id, task))
|
||||
|
||||
# 并发等待所有任务取消
|
||||
if cancel_tasks:
|
||||
logger.info(f"正在取消 {len(cancel_tasks)} 个流循环任务...")
|
||||
await asyncio.gather(
|
||||
*[self._wait_for_task_cancel(stream_id, task) for stream_id, task in cancel_tasks],
|
||||
return_exceptions=True
|
||||
)
|
||||
# 并发等待所有任务取消
|
||||
if cancel_tasks:
|
||||
logger.info(f"正在取消 {len(cancel_tasks)} 个流循环任务...")
|
||||
await asyncio.gather(
|
||||
*[self._wait_for_task_cancel(stream_id, task) for stream_id, task in cancel_tasks],
|
||||
return_exceptions=True
|
||||
)
|
||||
|
||||
self.stream_loops.clear()
|
||||
logger.info("所有流循环已清理")
|
||||
self.stream_loops.clear()
|
||||
logger.info("所有流循环已清理")
|
||||
finally:
|
||||
self.loop_lock.release()
|
||||
except asyncio.TimeoutError:
|
||||
logger.error("停止管理器时获取锁超时")
|
||||
except Exception as e:
|
||||
logger.error(f"停止管理器时出错: {e}")
|
||||
|
||||
@@ -183,9 +193,31 @@ class StreamLoopManager:
|
||||
except Exception as e:
|
||||
logger.error(f"等待流循环任务结束时出错: {stream_id} - {e}")
|
||||
|
||||
del self.stream_loops[stream_id]
|
||||
logger.info(f"停止流循环: {stream_id} (剩余: {len(self.stream_loops)})")
|
||||
return True
|
||||
try:
|
||||
# 双重检查:在获取锁后再次检查流是否存在
|
||||
if stream_id not in self.stream_loops:
|
||||
logger.debug(f"流 {stream_id} 循环不存在(双重检查)")
|
||||
return False
|
||||
|
||||
task = self.stream_loops[stream_id]
|
||||
if not task.done():
|
||||
task.cancel()
|
||||
try:
|
||||
# 设置取消超时,避免无限等待
|
||||
await asyncio.wait_for(task, timeout=5.0)
|
||||
except asyncio.CancelledError:
|
||||
logger.debug(f"流循环任务已取消: {stream_id}")
|
||||
except asyncio.TimeoutError:
|
||||
logger.warning(f"流循环任务取消超时: {stream_id}")
|
||||
except Exception as e:
|
||||
logger.error(f"等待流循环任务结束时出错: {stream_id} - {e}")
|
||||
|
||||
del self.stream_loops[stream_id]
|
||||
logger.info(f"停止流循环: {stream_id} (剩余: {len(self.stream_loops)})")
|
||||
return True
|
||||
finally:
|
||||
# 确保锁被释放
|
||||
self.loop_lock.release()
|
||||
|
||||
async def _stream_loop(self, stream_id: str) -> None:
|
||||
"""单个流的无限循环
|
||||
@@ -499,7 +531,7 @@ class StreamLoopManager:
|
||||
stream_id: 流ID
|
||||
"""
|
||||
logger.info(f"强制分发流处理: {stream_id}")
|
||||
|
||||
|
||||
try:
|
||||
# 检查是否有现有的分发循环
|
||||
if stream_id in self.stream_loops:
|
||||
@@ -519,24 +551,24 @@ class StreamLoopManager:
|
||||
)
|
||||
# 从字典中移除
|
||||
del self.stream_loops[stream_id]
|
||||
|
||||
|
||||
# 获取聊天管理器和流
|
||||
chat_manager = get_chat_manager()
|
||||
chat_stream = chat_manager.get_stream(stream_id)
|
||||
if not chat_stream:
|
||||
logger.warning(f"强制分发时未找到流: {stream_id}")
|
||||
return
|
||||
|
||||
|
||||
# 获取流上下文
|
||||
context = chat_stream.context_manager.context
|
||||
if not context:
|
||||
logger.warning(f"强制分发时未找到流上下文: {stream_id}")
|
||||
return
|
||||
|
||||
|
||||
# 检查未读消息数量
|
||||
unread_count = self._get_unread_count(context)
|
||||
logger.info(f"流 {stream_id} 当前未读消息数: {unread_count}")
|
||||
|
||||
|
||||
# 创建新的流循环任务
|
||||
new_task = asyncio.create_task(
|
||||
self._stream_loop(stream_id),
|
||||
@@ -544,9 +576,9 @@ class StreamLoopManager:
|
||||
)
|
||||
self.stream_loops[stream_id] = new_task
|
||||
self.stats["total_loops"] += 1
|
||||
|
||||
|
||||
logger.info(f"已创建强制分发流循环: {stream_id} (当前总数: {len(self.stream_loops)})")
|
||||
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"强制分发流处理失败 {stream_id}: {e}", exc_info=True)
|
||||
|
||||
|
||||
@@ -90,7 +90,7 @@ class MessageManager:
|
||||
return
|
||||
await self._check_and_handle_interruption(chat_stream)
|
||||
await chat_stream.context_manager.add_message(message)
|
||||
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"添加消息到聊天流 {stream_id} 时发生错误: {e}")
|
||||
|
||||
|
||||
Reference in New Issue
Block a user