refactor(chat): 优化异步任务处理和消息管理逻辑

- 使用asyncio.create_task替代await调用,提升并发性能
- 简化流管理器的槽位获取逻辑,移除回退方案
- 重构上下文管理器的消息添加和更新机制
- 移除StreamContext中的冗余方法,保持数据模型的简洁性
- 优化兴趣度评分系统的更新流程,减少阻塞操作

这些改动主要关注性能优化和代码结构简化,不涉及功能变更。
This commit is contained in:
Windpicker-owo
2025-10-05 15:17:30 +08:00
parent 5edf50705c
commit 23e523c7b9
9 changed files with 92 additions and 255 deletions

View File

@@ -23,8 +23,6 @@ class StreamLoopManager:
def __init__(self, max_concurrent_streams: int | None = None):
# 流循环任务管理
self.stream_loops: dict[str, asyncio.Task] = {}
# 跟踪流使用的管理器类型
self.stream_management_type: dict[str, str] = {} # stream_id -> "adaptive" or "fallback"
# 统计信息
self.stats: dict[str, Any] = {
@@ -115,7 +113,6 @@ class StreamLoopManager:
return True
# 使用自适应流管理器获取槽位
use_adaptive = False
try:
from src.chat.message_manager.adaptive_stream_manager import get_adaptive_stream_manager
adaptive_manager = get_adaptive_stream_manager()
@@ -132,21 +129,14 @@ class StreamLoopManager:
)
if slot_acquired:
use_adaptive = True
logger.debug(f"成功获取流处理槽位: {stream_id} (优先级: {priority.name})")
else:
logger.debug(f"自适应管理器拒绝槽位请求: {stream_id},尝试回退方案")
else:
logger.debug("自适应管理器未运行,使用原始方法")
logger.debug("自适应管理器未运行")
except Exception as e:
logger.debug(f"自适应管理器获取槽位失败,使用原始方法: {e}")
# 如果自适应管理器失败或未运行,使用回退方案
if not use_adaptive:
if not await self._fallback_acquire_slot(stream_id, force):
logger.debug(f"回退方案也失败: {stream_id}")
return False
logger.debug(f"自适应管理器获取槽位失败: {e}")
# 创建流循环任务
try:
@@ -155,68 +145,22 @@ class StreamLoopManager:
name=f"stream_loop_{stream_id}"
)
self.stream_loops[stream_id] = loop_task
# 记录管理器类型
self.stream_management_type[stream_id] = "adaptive" if use_adaptive else "fallback"
# 更新统计信息
self.stats["active_streams"] += 1
self.stats["total_loops"] += 1
logger.info(f"启动流循环任务: {stream_id} (管理器: {'adaptive' if use_adaptive else 'fallback'})")
logger.info(f"启动流循环任务: {stream_id}")
return True
except Exception as e:
logger.error(f"启动流循环任务失败 {stream_id}: {e}")
# 释放槽位
if use_adaptive:
try:
from src.chat.message_manager.adaptive_stream_manager import get_adaptive_stream_manager
adaptive_manager = get_adaptive_stream_manager()
adaptive_manager.release_stream_slot(stream_id)
except:
pass
from src.chat.message_manager.adaptive_stream_manager import get_adaptive_stream_manager
adaptive_manager = get_adaptive_stream_manager()
adaptive_manager.release_stream_slot(stream_id)
return False
async def _fallback_acquire_slot(self, stream_id: str, force: bool) -> bool:
"""回退方案:获取槽位(原始方法)"""
# 判断是否需要强制分发
should_force = force or await self._should_force_dispatch_for_stream(stream_id)
# 检查是否超过最大并发限制
current_streams = len(self.stream_loops)
if current_streams >= self.max_concurrent_streams and not should_force:
logger.warning(
f"超过最大并发流数限制({current_streams}/{self.max_concurrent_streams}),无法启动流 {stream_id}"
)
return False
# 处理强制分发情况
if should_force and current_streams >= self.max_concurrent_streams:
logger.warning(
f"{stream_id} 未读消息积压严重(>{self.force_dispatch_unread_threshold}),突破并发限制强制启动分发 (当前: {current_streams}/{self.max_concurrent_streams})"
)
# 检查是否有现有的分发循环,如果有则先移除
if stream_id in self.stream_loops:
logger.info(f"发现现有流循环 {stream_id},将先移除再重新创建")
existing_task = self.stream_loops[stream_id]
if not existing_task.done():
existing_task.cancel()
# 创建异步任务来等待取消完成,并添加异常处理
cancel_task = asyncio.create_task(
self._wait_for_task_cancel(stream_id, existing_task),
name=f"cancel_existing_loop_{stream_id}"
)
# 为取消任务添加异常处理,避免孤儿任务
cancel_task.add_done_callback(
lambda task: logger.debug(f"取消任务完成: {stream_id}") if not task.exception()
else logger.error(f"取消任务异常: {stream_id} - {task.exception()}")
)
# 从字典中移除
del self.stream_loops[stream_id]
current_streams -= 1 # 更新当前流数量
return True
def _determine_stream_priority(self, stream_id: str) -> "StreamPriority":
"""确定流优先级"""
try:
@@ -237,20 +181,6 @@ class StreamLoopManager:
from src.chat.message_manager.adaptive_stream_manager import StreamPriority
return StreamPriority.NORMAL
# 创建流循环任务
try:
task = asyncio.create_task(
self._stream_loop(stream_id),
name=f"stream_loop_{stream_id}" # 为任务添加名称,便于调试
)
self.stream_loops[stream_id] = task
self.stats["total_loops"] += 1
logger.info(f"启动流循环: {stream_id} (当前总数: {len(self.stream_loops)})")
return True
except Exception as e:
logger.error(f"创建流循环任务失败: {stream_id} - {e}")
return False
async def stop_stream_loop(self, stream_id: str) -> bool:
"""停止指定流的循环任务
@@ -342,17 +272,6 @@ class StreamLoopManager:
# 4. 计算下次检查间隔
interval = await self._calculate_interval(stream_id, has_messages)
if has_messages:
updated_unread_count = self._get_unread_count(context)
if self._needs_force_dispatch_for_context(context, updated_unread_count):
interval = min(interval, max(self.force_dispatch_min_interval, 0.0))
logger.debug(
"%s 未读消息仍有 %d 条,使用加速分发间隔 %.2fs",
stream_id,
updated_unread_count,
interval,
)
# 5. sleep等待下次检查
logger.info(f"{stream_id} 等待 {interval:.2f}s")
await asyncio.sleep(interval)
@@ -378,23 +297,14 @@ class StreamLoopManager:
del self.stream_loops[stream_id]
logger.debug(f"清理流循环标记: {stream_id}")
# 根据管理器类型释放相应的槽位
management_type = self.stream_management_type.get(stream_id, "fallback")
if management_type == "adaptive":
# 释放自适应管理器的槽位
try:
from src.chat.message_manager.adaptive_stream_manager import get_adaptive_stream_manager
adaptive_manager = get_adaptive_stream_manager()
adaptive_manager.release_stream_slot(stream_id)
logger.debug(f"释放自适应流处理槽位: {stream_id}")
except Exception as e:
logger.debug(f"释放自适应流处理槽位失败: {e}")
else:
logger.debug(f"{stream_id} 使用回退方案,无需释放自适应槽位")
# 清理管理器类型记录
if stream_id in self.stream_management_type:
del self.stream_management_type[stream_id]
# 释放自适应管理器的槽位
try:
from src.chat.message_manager.adaptive_stream_manager import get_adaptive_stream_manager
adaptive_manager = get_adaptive_stream_manager()
adaptive_manager.release_stream_slot(stream_id)
logger.debug(f"释放自适应流处理槽位: {stream_id}")
except Exception as e:
logger.debug(f"释放自适应流处理槽位失败: {e}")
logger.info(f"流循环结束: {stream_id}")
@@ -417,7 +327,7 @@ class StreamLoopManager:
logger.error(f"获取流上下文失败 {stream_id}: {e}")
return None
async def _has_messages_to_process(self, context: Any) -> bool:
async def _has_messages_to_process(self, context: StreamContext) -> bool:
"""检查是否有消息需要处理
Args:
@@ -464,7 +374,7 @@ class StreamLoopManager:
success = results.get("success", False)
if success:
await self._refresh_focus_energy(stream_id)
asyncio.create_task(self._refresh_focus_energy(stream_id))
process_time = time.time() - start_time
logger.debug(f"流处理成功: {stream_id} (耗时: {process_time:.2f}s)")
else:
@@ -553,16 +463,16 @@ class StreamLoopManager:
logger.debug(f"检查流 {stream_id} 是否需要强制分发失败: {e}")
return False
def _get_unread_count(self, context: Any) -> int:
def _get_unread_count(self, context: StreamContext) -> int:
try:
unread_messages = getattr(context, "unread_messages", None)
unread_messages = context.unread_messages
if unread_messages is None:
return 0
return len(unread_messages)
except Exception:
return 0
def _needs_force_dispatch_for_context(self, context: Any, unread_count: int | None = None) -> bool:
def _needs_force_dispatch_for_context(self, context: StreamContext, unread_count: int | None = None) -> bool:
if not self.force_dispatch_unread_threshold or self.force_dispatch_unread_threshold <= 0:
return False