Files
Mofox-Core/src/chat/message_manager/distribution_manager.py
Windpicker-owo 06484245e1 refactor(chat): 优化流循环管理器的并发性能和健壮性
改进StreamLoopManager的锁机制和任务管理,添加超时控制避免死锁,使用并发操作提升性能,增强异常处理和日志记录确保系统稳定性。
2025-11-19 23:14:03 +08:00

617 lines
24 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
流循环管理器
为每个聊天流创建独立的无限循环任务,主动轮询处理消息
"""
import asyncio
import time
from typing import Any
from src.chat.chatter_manager import ChatterManager
from src.chat.energy_system import energy_manager
from src.common.logger import get_logger
from src.config.config import global_config
from src.plugin_system.apis.chat_api import get_chat_manager
logger = get_logger("stream_loop_manager")
class StreamLoopManager:
"""流循环管理器 - 每个流一个独立的无限循环任务"""
def __init__(self, max_concurrent_streams: int | None = None):
# 流循环任务管理
self.stream_loops: dict[str, asyncio.Task] = {}
self.loop_lock = asyncio.Lock()
# 统计信息
self.stats: dict[str, Any] = {
"active_streams": 0,
"total_loops": 0,
"total_process_cycles": 0,
"total_failures": 0,
"start_time": time.time(),
}
# 配置参数
self.max_concurrent_streams = max_concurrent_streams or global_config.chat.max_concurrent_distributions
# 强制分发策略
self.force_dispatch_unread_threshold: int | None = getattr(
global_config.chat, "force_dispatch_unread_threshold", 20
)
self.force_dispatch_min_interval: float = getattr(global_config.chat, "force_dispatch_min_interval", 0.1)
# Chatter管理器
self.chatter_manager: ChatterManager | None = None
# 状态控制
self.is_running = False
logger.info(f"流循环管理器初始化完成 (最大并发流数: {self.max_concurrent_streams})")
async def start(self) -> None:
"""启动流循环管理器"""
if self.is_running:
logger.warning("流循环管理器已经在运行")
return
self.is_running = True
logger.info("流循环管理器已启动")
async def stop(self) -> None:
"""停止流循环管理器"""
if not self.is_running:
return
self.is_running = False
# 取消所有流循环
try:
# 使用带超时的锁获取,避免无限等待
lock_acquired = await asyncio.wait_for(self.loop_lock.acquire(), timeout=10.0)
if not lock_acquired:
logger.error("停止管理器时获取锁超时")
else:
try:
# 创建任务列表以便并发取消
cancel_tasks = []
for stream_id, task in list(self.stream_loops.items()):
if not task.done():
task.cancel()
cancel_tasks.append((stream_id, task))
# 并发等待所有任务取消
if cancel_tasks:
logger.info(f"正在取消 {len(cancel_tasks)} 个流循环任务...")
await asyncio.gather(
*[self._wait_for_task_cancel(stream_id, task) for stream_id, task in cancel_tasks],
return_exceptions=True
)
self.stream_loops.clear()
logger.info("所有流循环已清理")
finally:
self.loop_lock.release()
except asyncio.TimeoutError:
logger.error("停止管理器时获取锁超时")
except Exception as e:
logger.error(f"停止管理器时获取锁异常: {e}")
logger.info("流循环管理器已停止")
async def start_stream_loop(self, stream_id: str, force: bool = False) -> bool:
"""启动指定流的循环任务
Args:
stream_id: 流ID
Returns:
bool: 是否成功启动
"""
# 使用更细粒度的锁策略:先检查是否需要锁,再获取锁
# 快速路径:如果流已存在,无需获取锁
if stream_id in self.stream_loops:
logger.debug(f"{stream_id} 循环已在运行")
return True
# 判断是否需要强制分发(在锁外执行,减少锁持有时间)
should_force = force or self._should_force_dispatch_for_stream(stream_id)
# 获取锁进行流循环创建
try:
# 使用带超时的锁获取,避免无限等待
lock_acquired = await asyncio.wait_for(self.loop_lock.acquire(), timeout=5.0)
if not lock_acquired:
logger.error(f"获取流循环锁超时: {stream_id}")
return False
except asyncio.TimeoutError:
logger.error(f"获取流循环锁超时: {stream_id}")
return False
except Exception as e:
logger.error(f"获取流循环锁异常: {stream_id} - {e}")
return False
try:
# 双重检查:在获取锁后再次检查流是否已存在
if stream_id in self.stream_loops:
logger.debug(f"{stream_id} 循环已在运行(双重检查)")
return True
# 检查是否超过最大并发限制
current_streams = len(self.stream_loops)
if current_streams >= self.max_concurrent_streams and not should_force:
logger.warning(
f"超过最大并发流数限制({current_streams}/{self.max_concurrent_streams}),无法启动流 {stream_id}"
)
return False
if should_force and current_streams >= self.max_concurrent_streams:
logger.warning(
f"{stream_id} 未读消息积压严重(>{self.force_dispatch_unread_threshold}),突破并发限制强制启动分发 (当前: {current_streams}/{self.max_concurrent_streams})"
)
# 检查是否有现有的分发循环,如果有则先移除
if stream_id in self.stream_loops:
logger.info(f"发现现有流循环 {stream_id},将先移除再重新创建")
existing_task = self.stream_loops[stream_id]
if not existing_task.done():
existing_task.cancel()
# 创建异步任务来等待取消完成,并添加异常处理
cancel_task = asyncio.create_task(
self._wait_for_task_cancel(stream_id, existing_task),
name=f"cancel_existing_loop_{stream_id}"
)
# 为取消任务添加异常处理,避免孤儿任务
cancel_task.add_done_callback(
lambda task: logger.debug(f"取消任务完成: {stream_id}") if not task.exception()
else logger.error(f"取消任务异常: {stream_id} - {task.exception()}")
)
# 从字典中移除
del self.stream_loops[stream_id]
current_streams -= 1 # 更新当前流数量
# 创建流循环任务
try:
task = asyncio.create_task(
self._stream_loop(stream_id),
name=f"stream_loop_{stream_id}" # 为任务添加名称,便于调试
)
self.stream_loops[stream_id] = task
self.stats["total_loops"] += 1
logger.info(f"启动流循环: {stream_id} (当前总数: {len(self.stream_loops)})")
return True
except Exception as e:
logger.error(f"创建流循环任务失败: {stream_id} - {e}")
return False
finally:
# 确保锁被释放
self.loop_lock.release()
async def stop_stream_loop(self, stream_id: str) -> bool:
"""停止指定流的循环任务
Args:
stream_id: 流ID
Returns:
bool: 是否成功停止
"""
# 快速路径:如果流不存在,无需获取锁
if stream_id not in self.stream_loops:
logger.debug(f"{stream_id} 循环不存在,无需停止")
return False
# 获取锁进行流循环停止
try:
# 使用带超时的锁获取,避免无限等待
lock_acquired = await asyncio.wait_for(self.loop_lock.acquire(), timeout=5.0)
if not lock_acquired:
logger.error(f"获取流循环锁超时: {stream_id}")
return False
except asyncio.TimeoutError:
logger.error(f"获取流循环锁超时: {stream_id}")
return False
except Exception as e:
logger.error(f"获取流循环锁异常: {stream_id} - {e}")
return False
try:
# 双重检查:在获取锁后再次检查流是否存在
if stream_id not in self.stream_loops:
logger.debug(f"{stream_id} 循环不存在(双重检查)")
return False
task = self.stream_loops[stream_id]
if not task.done():
task.cancel()
try:
# 设置取消超时,避免无限等待
await asyncio.wait_for(task, timeout=5.0)
except asyncio.CancelledError:
logger.debug(f"流循环任务已取消: {stream_id}")
except asyncio.TimeoutError:
logger.warning(f"流循环任务取消超时: {stream_id}")
except Exception as e:
logger.error(f"等待流循环任务结束时出错: {stream_id} - {e}")
del self.stream_loops[stream_id]
logger.info(f"停止流循环: {stream_id} (剩余: {len(self.stream_loops)})")
return True
finally:
# 确保锁被释放
self.loop_lock.release()
async def _stream_loop(self, stream_id: str) -> None:
"""单个流的无限循环
Args:
stream_id: 流ID
"""
logger.info(f"流循环开始: {stream_id}")
try:
while self.is_running:
try:
# 1. 获取流上下文
context = await self._get_stream_context(stream_id)
if not context:
logger.warning(f"无法获取流上下文: {stream_id}")
await asyncio.sleep(10.0)
continue
# 2. 检查是否有消息需要处理
unread_count = self._get_unread_count(context)
force_dispatch = self._needs_force_dispatch_for_context(context, unread_count)
has_messages = force_dispatch or await self._has_messages_to_process(context)
if has_messages:
if force_dispatch:
logger.info("%s 未读消息 %d 条,触发强制分发", stream_id, unread_count)
# 3. 激活chatter处理
success = await self._process_stream_messages(stream_id, context)
# 更新统计
self.stats["total_process_cycles"] += 1
if success:
logger.debug(f"流处理成功: {stream_id}")
else:
self.stats["total_failures"] += 1
logger.warning(f"流处理失败: {stream_id}")
# 4. 计算下次检查间隔
interval = await self._calculate_interval(stream_id, has_messages)
if has_messages:
updated_unread_count = self._get_unread_count(context)
if self._needs_force_dispatch_for_context(context, updated_unread_count):
interval = min(interval, max(self.force_dispatch_min_interval, 0.0))
logger.debug(
"%s 未读消息仍有 %d 条,使用加速分发间隔 %.2fs",
stream_id,
updated_unread_count,
interval,
)
# 5. sleep等待下次检查
logger.info(f"{stream_id} 等待 {interval:.2f}s")
await asyncio.sleep(interval)
except asyncio.CancelledError:
logger.info(f"流循环被取消: {stream_id}")
break
except Exception as e:
logger.error(f"流循环出错 {stream_id}: {e}", exc_info=True)
self.stats["total_failures"] += 1
await asyncio.sleep(5.0) # 错误时等待5秒再重试
finally:
# 清理循环标记
try:
# 使用带超时的锁获取,避免无限等待
lock_acquired = await asyncio.wait_for(self.loop_lock.acquire(), timeout=5.0)
if not lock_acquired:
logger.error(f"流结束时获取锁超时: {stream_id}")
else:
try:
if stream_id in self.stream_loops:
del self.stream_loops[stream_id]
logger.debug(f"清理流循环标记: {stream_id}")
finally:
self.loop_lock.release()
except asyncio.TimeoutError:
logger.error(f"流结束时获取锁超时: {stream_id}")
except Exception as e:
logger.error(f"流结束时获取锁异常: {stream_id} - {e}")
logger.info(f"流循环结束: {stream_id}")
async def _get_stream_context(self, stream_id: str) -> Any | None:
"""获取流上下文
Args:
stream_id: 流ID
Returns:
Optional[Any]: 流上下文如果不存在返回None
"""
try:
chat_manager = get_chat_manager()
chat_stream = chat_manager.get_stream(stream_id)
if chat_stream:
return chat_stream.context_manager.context
return None
except Exception as e:
logger.error(f"获取流上下文失败 {stream_id}: {e}")
return None
async def _has_messages_to_process(self, context: Any) -> bool:
"""检查是否有消息需要处理
Args:
context: 流上下文
Returns:
bool: 是否有未读消息
"""
try:
# 检查是否有未读消息
if hasattr(context, "unread_messages") and context.unread_messages:
return True
# 检查其他需要处理的条件
if hasattr(context, "has_pending_messages") and context.has_pending_messages:
return True
return False
except Exception as e:
logger.error(f"检查消息状态失败: {e}")
return False
async def _process_stream_messages(self, stream_id: str, context: Any) -> bool:
"""处理流消息
Args:
stream_id: 流ID
context: 流上下文
Returns:
bool: 是否处理成功
"""
if not self.chatter_manager:
logger.warning(f"Chatter管理器未设置: {stream_id}")
return False
try:
start_time = time.time()
# 直接调用chatter_manager处理流上下文
results = await self.chatter_manager.process_stream_context(stream_id, context)
success = results.get("success", False)
if success:
await self._refresh_focus_energy(stream_id)
process_time = time.time() - start_time
logger.debug(f"流处理成功: {stream_id} (耗时: {process_time:.2f}s)")
else:
logger.warning(f"流处理失败: {stream_id} - {results.get('error_message', '未知错误')}")
return success
except Exception as e:
logger.error(f"流处理异常: {stream_id} - {e}", exc_info=True)
return False
async def _calculate_interval(self, stream_id: str, has_messages: bool) -> float:
"""计算下次检查间隔
Args:
stream_id: 流ID
has_messages: 本次是否有消息处理
Returns:
float: 间隔时间(秒)
"""
# 基础间隔
base_interval = getattr(global_config.chat, "distribution_interval", 5.0)
# 如果没有消息,使用更长的间隔
if not has_messages:
return base_interval * 2.0 # 无消息时间隔加倍
# 尝试使用能量管理器计算间隔
try:
# 获取当前focus_energy
focus_energy = energy_manager.energy_cache.get(stream_id, (0.5, 0))[0]
# 使用能量管理器计算间隔
interval = energy_manager.get_distribution_interval(focus_energy)
logger.debug(f"{stream_id} 动态间隔: {interval:.2f}s (能量: {focus_energy:.3f})")
return interval
except Exception as e:
logger.debug(f"{stream_id} 使用默认间隔: {base_interval:.2f}s ({e})")
return base_interval
def get_queue_status(self) -> dict[str, Any]:
"""获取队列状态
Returns:
Dict[str, Any]: 队列状态信息
"""
current_time = time.time()
uptime = current_time - self.stats["start_time"] if self.is_running else 0
return {
"active_streams": len(self.stream_loops),
"total_loops": self.stats["total_loops"],
"max_concurrent": self.max_concurrent_streams,
"is_running": self.is_running,
"uptime": uptime,
"total_process_cycles": self.stats["total_process_cycles"],
"total_failures": self.stats["total_failures"],
"stats": self.stats.copy(),
}
def set_chatter_manager(self, chatter_manager: ChatterManager) -> None:
"""设置chatter管理器
Args:
chatter_manager: chatter管理器实例
"""
self.chatter_manager = chatter_manager
logger.info(f"设置chatter管理器: {chatter_manager.__class__.__name__}")
def _should_force_dispatch_for_stream(self, stream_id: str) -> bool:
if not self.force_dispatch_unread_threshold or self.force_dispatch_unread_threshold <= 0:
return False
try:
chat_manager = get_chat_manager()
chat_stream = chat_manager.get_stream(stream_id)
if not chat_stream:
return False
unread = getattr(chat_stream.context_manager.context, "unread_messages", [])
return len(unread) > self.force_dispatch_unread_threshold
except Exception as e:
logger.debug(f"检查流 {stream_id} 是否需要强制分发失败: {e}")
return False
def _get_unread_count(self, context: Any) -> int:
try:
unread_messages = getattr(context, "unread_messages", None)
if unread_messages is None:
return 0
return len(unread_messages)
except Exception:
return 0
def _needs_force_dispatch_for_context(self, context: Any, unread_count: int | None = None) -> bool:
if not self.force_dispatch_unread_threshold or self.force_dispatch_unread_threshold <= 0:
return False
count = unread_count if unread_count is not None else self._get_unread_count(context)
return count > self.force_dispatch_unread_threshold
def get_performance_summary(self) -> dict[str, Any]:
"""获取性能摘要
Returns:
Dict[str, Any]: 性能摘要
"""
current_time = time.time()
uptime = current_time - self.stats["start_time"]
# 计算吞吐量
throughput = self.stats["total_process_cycles"] / max(1, uptime / 3600) # 每小时处理次数
return {
"uptime_hours": uptime / 3600,
"active_streams": len(self.stream_loops),
"total_process_cycles": self.stats["total_process_cycles"],
"total_failures": self.stats["total_failures"],
"throughput_per_hour": throughput,
"max_concurrent_streams": self.max_concurrent_streams,
}
async def _refresh_focus_energy(self, stream_id: str) -> None:
"""分发完成后基于历史消息刷新能量值"""
try:
chat_manager = get_chat_manager()
chat_stream = chat_manager.get_stream(stream_id)
if not chat_stream:
logger.debug(f"刷新能量时未找到聊天流: {stream_id}")
return
await chat_stream.context_manager.refresh_focus_energy_from_history()
logger.debug(f"已刷新聊天流 {stream_id} 的聚焦能量")
except Exception as e:
logger.warning(f"刷新聊天流 {stream_id} 能量失败: {e}")
async def _wait_for_task_cancel(self, stream_id: str, task: asyncio.Task) -> None:
"""等待任务取消完成,带有超时控制
Args:
stream_id: 流ID
task: 要等待取消的任务
"""
try:
await asyncio.wait_for(task, timeout=5.0)
logger.debug(f"流循环任务已正常结束: {stream_id}")
except asyncio.CancelledError:
logger.debug(f"流循环任务已取消: {stream_id}")
except asyncio.TimeoutError:
logger.warning(f"流循环任务取消超时: {stream_id}")
except Exception as e:
logger.error(f"等待流循环任务结束时出错: {stream_id} - {e}")
def _force_dispatch_stream(self, stream_id: str) -> None:
"""强制分发流处理
当流的未读消息超过阈值时,强制触发分发处理
这个方法主要用于突破并发限制时的紧急处理
注意:此方法目前未被使用,相关功能已集成到 start_stream_loop 方法中
Args:
stream_id: 流ID
"""
logger.info(f"强制分发流处理: {stream_id}")
try:
# 检查是否有现有的分发循环
if stream_id in self.stream_loops:
logger.info(f"发现现有流循环 {stream_id},将先移除再重新创建")
existing_task = self.stream_loops[stream_id]
if not existing_task.done():
existing_task.cancel()
# 创建异步任务来等待取消完成,并添加异常处理
cancel_task = asyncio.create_task(
self._wait_for_task_cancel(stream_id, existing_task),
name=f"cancel_existing_loop_{stream_id}"
)
# 为取消任务添加异常处理,避免孤儿任务
cancel_task.add_done_callback(
lambda task: logger.debug(f"取消任务完成: {stream_id}") if not task.exception()
else logger.error(f"取消任务异常: {stream_id} - {task.exception()}")
)
# 从字典中移除
del self.stream_loops[stream_id]
# 获取聊天管理器和流
chat_manager = get_chat_manager()
chat_stream = chat_manager.get_stream(stream_id)
if not chat_stream:
logger.warning(f"强制分发时未找到流: {stream_id}")
return
# 获取流上下文
context = chat_stream.context_manager.context
if not context:
logger.warning(f"强制分发时未找到流上下文: {stream_id}")
return
# 检查未读消息数量
unread_count = self._get_unread_count(context)
logger.info(f"{stream_id} 当前未读消息数: {unread_count}")
# 创建新的流循环任务
new_task = asyncio.create_task(
self._stream_loop(stream_id),
name=f"force_stream_loop_{stream_id}"
)
self.stream_loops[stream_id] = new_task
self.stats["total_loops"] += 1
logger.info(f"已创建强制分发流循环: {stream_id} (当前总数: {len(self.stream_loops)})")
except Exception as e:
logger.error(f"强制分发流处理失败 {stream_id}: {e}", exc_info=True)
# 全局流循环管理器实例
stream_loop_manager = StreamLoopManager()