refactor(chat): 优化流循环管理和数据库性能

移除StreamLoopManager中的锁机制,简化并发流处理逻辑
- 删除loop_lock,减少锁竞争和超时问题
- 优化流启动、停止和清理流程
- 增强错误处理和日志记录

增强数据库操作性能
- 集成数据库批量调度器和连接池管理器
- 优化ChatStream保存机制,支持批量更新
- 改进数据库会话管理,提高并发性能

清理和优化代码结构
- 移除affinity_chatter中的重复方法
- 改进prompt表达习惯格式化
- 完善系统启动和清理流程
This commit is contained in: (branch name not captured)
Author: Windpicker-owo
Date: 2025-10-03 13:56:58 +08:00
Parent: 2fc8e26d3c
Commit: f8aa149c39
10 changed files with 974 additions and 214 deletions

View File

@@ -23,7 +23,6 @@ class StreamLoopManager:
def __init__(self, max_concurrent_streams: int | None = None):
# 流循环任务管理
self.stream_loops: dict[str, asyncio.Task] = {}
self.loop_lock = asyncio.Lock()
# 统计信息
self.stats: dict[str, Any] = {
@@ -69,35 +68,25 @@ class StreamLoopManager:
# 取消所有流循环
try:
# 使用带超时的锁获取,避免无限等待
lock_acquired = await asyncio.wait_for(self.loop_lock.acquire(), timeout=10.0)
if not lock_acquired:
logger.error("停止管理器时获取锁超时")
else:
try:
# 创建任务列表以便并发取消
cancel_tasks = []
for stream_id, task in list(self.stream_loops.items()):
if not task.done():
task.cancel()
cancel_tasks.append((stream_id, task))
# 并发等待所有任务取消
if cancel_tasks:
logger.info(f"正在取消 {len(cancel_tasks)} 个流循环任务...")
await asyncio.gather(
*[self._wait_for_task_cancel(stream_id, task) for stream_id, task in cancel_tasks],
return_exceptions=True
)
self.stream_loops.clear()
logger.info("所有流循环已清理")
finally:
self.loop_lock.release()
except asyncio.TimeoutError:
logger.error("停止管理器时获取锁超时")
# 创建任务列表以便并发取消
cancel_tasks = []
for stream_id, task in list(self.stream_loops.items()):
if not task.done():
task.cancel()
cancel_tasks.append((stream_id, task))
# 并发等待所有任务取消
if cancel_tasks:
logger.info(f"正在取消 {len(cancel_tasks)} 个流循环任务...")
await asyncio.gather(
*[self._wait_for_task_cancel(stream_id, task) for stream_id, task in cancel_tasks],
return_exceptions=True
)
self.stream_loops.clear()
logger.info("所有流循环已清理")
except Exception as e:
logger.error(f"停止管理器时获取锁异常: {e}")
logger.error(f"停止管理器时出错: {e}")
logger.info("流循环管理器已停止")
@@ -106,88 +95,66 @@ class StreamLoopManager:
Args:
stream_id: 流ID
force: 是否强制启动
Returns:
bool: 是否成功启动
"""
# 使用更细粒度的锁策略:先检查是否需要锁,再获取锁
# 快速路径:如果流已存在,无需获取锁
# 快速路径:如果流已存在,无需处理
if stream_id in self.stream_loops:
logger.debug(f"{stream_id} 循环已在运行")
return True
# 判断是否需要强制分发(在锁外执行,减少锁持有时间)
# 判断是否需要强制分发
should_force = force or self._should_force_dispatch_for_stream(stream_id)
# 获取锁进行流循环创建
try:
# 使用带超时的锁获取,避免无限等待
lock_acquired = await asyncio.wait_for(self.loop_lock.acquire(), timeout=5.0)
if not lock_acquired:
logger.error(f"获取流循环锁超时: {stream_id}")
return False
except asyncio.TimeoutError:
logger.error(f"获取流循环锁超时: {stream_id}")
return False
except Exception as e:
logger.error(f"获取流循环锁异常: {stream_id} - {e}")
# 检查是否超过最大并发限制
current_streams = len(self.stream_loops)
if current_streams >= self.max_concurrent_streams and not should_force:
logger.warning(
f"超过最大并发流数限制({current_streams}/{self.max_concurrent_streams}),无法启动流 {stream_id}"
)
return False
try:
# 双重检查:在获取锁后再次检查流是否已存在
# 处理强制分发情况
if should_force and current_streams >= self.max_concurrent_streams:
logger.warning(
f"{stream_id} 未读消息积压严重(>{self.force_dispatch_unread_threshold}),突破并发限制强制启动分发 (当前: {current_streams}/{self.max_concurrent_streams})"
)
# 检查是否有现有的分发循环,如果有则先移除
if stream_id in self.stream_loops:
logger.debug(f" {stream_id} 循环已在运行(双重检查)")
return True
logger.info(f"发现现有流循环 {stream_id},将先移除再重新创建")
existing_task = self.stream_loops[stream_id]
if not existing_task.done():
existing_task.cancel()
# 创建异步任务来等待取消完成,并添加异常处理
cancel_task = asyncio.create_task(
self._wait_for_task_cancel(stream_id, existing_task),
name=f"cancel_existing_loop_{stream_id}"
)
# 为取消任务添加异常处理,避免孤儿任务
cancel_task.add_done_callback(
lambda task: logger.debug(f"取消任务完成: {stream_id}") if not task.exception()
else logger.error(f"取消任务异常: {stream_id} - {task.exception()}")
)
# 从字典中移除
del self.stream_loops[stream_id]
current_streams -= 1 # 更新当前流数量
# 检查是否超过最大并发限制
current_streams = len(self.stream_loops)
if current_streams >= self.max_concurrent_streams and not should_force:
logger.warning(
f"超过最大并发流数限制({current_streams}/{self.max_concurrent_streams}),无法启动流 {stream_id}"
)
return False
# 创建流循环任务
try:
task = asyncio.create_task(
self._stream_loop(stream_id),
name=f"stream_loop_{stream_id}" # 为任务添加名称,便于调试
)
self.stream_loops[stream_id] = task
self.stats["total_loops"] += 1
if should_force and current_streams >= self.max_concurrent_streams:
logger.warning(
f"{stream_id} 未读消息积压严重(>{self.force_dispatch_unread_threshold}),突破并发限制强制启动分发 (当前: {current_streams}/{self.max_concurrent_streams})"
)
# 检查是否有现有的分发循环,如果有则先移除
if stream_id in self.stream_loops:
logger.info(f"发现现有流循环 {stream_id},将先移除再重新创建")
existing_task = self.stream_loops[stream_id]
if not existing_task.done():
existing_task.cancel()
# 创建异步任务来等待取消完成,并添加异常处理
cancel_task = asyncio.create_task(
self._wait_for_task_cancel(stream_id, existing_task),
name=f"cancel_existing_loop_{stream_id}"
)
# 为取消任务添加异常处理,避免孤儿任务
cancel_task.add_done_callback(
lambda task: logger.debug(f"取消任务完成: {stream_id}") if not task.exception()
else logger.error(f"取消任务异常: {stream_id} - {task.exception()}")
)
# 从字典中移除
del self.stream_loops[stream_id]
current_streams -= 1 # 更新当前流数量
# 创建流循环任务
try:
task = asyncio.create_task(
self._stream_loop(stream_id),
name=f"stream_loop_{stream_id}" # 为任务添加名称,便于调试
)
self.stream_loops[stream_id] = task
self.stats["total_loops"] += 1
logger.info(f"启动流循环: {stream_id} (当前总数: {len(self.stream_loops)})")
return True
except Exception as e:
logger.error(f"创建流循环任务失败: {stream_id} - {e}")
return False
finally:
# 确保锁被释放
self.loop_lock.release()
logger.info(f"启动流循环: {stream_id} (当前总数: {len(self.stream_loops)})")
return True
except Exception as e:
logger.error(f"创建流循环任务失败: {stream_id} - {e}")
return False
async def stop_stream_loop(self, stream_id: str) -> bool:
"""停止指定流的循环任务
@@ -198,50 +165,27 @@ class StreamLoopManager:
Returns:
bool: 是否成功停止
"""
# 快速路径:如果流不存在,无需获取锁
# 快速路径:如果流不存在,无需处理
if stream_id not in self.stream_loops:
logger.debug(f"{stream_id} 循环不存在,无需停止")
return False
# 获取锁进行流循环停止
try:
# 使用带超时的锁获取,避免无限等待
lock_acquired = await asyncio.wait_for(self.loop_lock.acquire(), timeout=5.0)
if not lock_acquired:
logger.error(f"获取流循环锁超时: {stream_id}")
return False
except asyncio.TimeoutError:
logger.error(f"获取流循环锁超时: {stream_id}")
return False
except Exception as e:
logger.error(f"获取流循环锁异常: {stream_id} - {e}")
return False
task = self.stream_loops[stream_id]
if not task.done():
task.cancel()
try:
# 设置取消超时,避免无限等待
await asyncio.wait_for(task, timeout=5.0)
except asyncio.CancelledError:
logger.debug(f"流循环任务已取消: {stream_id}")
except asyncio.TimeoutError:
logger.warning(f"流循环任务取消超时: {stream_id}")
except Exception as e:
logger.error(f"等待流循环任务结束时出错: {stream_id} - {e}")
try:
# 双重检查:在获取锁后再次检查流是否存在
if stream_id not in self.stream_loops:
logger.debug(f"{stream_id} 循环不存在(双重检查)")
return False
task = self.stream_loops[stream_id]
if not task.done():
task.cancel()
try:
# 设置取消超时,避免无限等待
await asyncio.wait_for(task, timeout=5.0)
except asyncio.CancelledError:
logger.debug(f"流循环任务已取消: {stream_id}")
except asyncio.TimeoutError:
logger.warning(f"流循环任务取消超时: {stream_id}")
except Exception as e:
logger.error(f"等待流循环任务结束时出错: {stream_id} - {e}")
del self.stream_loops[stream_id]
logger.info(f"停止流循环: {stream_id} (剩余: {len(self.stream_loops)})")
return True
finally:
# 确保锁被释放
self.loop_lock.release()
del self.stream_loops[stream_id]
logger.info(f"停止流循环: {stream_id} (剩余: {len(self.stream_loops)})")
return True
async def _stream_loop(self, stream_id: str) -> None:
"""单个流的无限循环
@@ -309,22 +253,9 @@ class StreamLoopManager:
finally:
# 清理循环标记
try:
# 使用带超时的锁获取,避免无限等待
lock_acquired = await asyncio.wait_for(self.loop_lock.acquire(), timeout=5.0)
if not lock_acquired:
logger.error(f"流结束时获取锁超时: {stream_id}")
else:
try:
if stream_id in self.stream_loops:
del self.stream_loops[stream_id]
logger.debug(f"清理流循环标记: {stream_id}")
finally:
self.loop_lock.release()
except asyncio.TimeoutError:
logger.error(f"流结束时获取锁超时: {stream_id}")
except Exception as e:
logger.error(f"流结束时获取锁异常: {stream_id} - {e}")
if stream_id in self.stream_loops:
del self.stream_loops[stream_id]
logger.debug(f"清理流循环标记: {stream_id}")
logger.info(f"流循环结束: {stream_id}")

View File

@@ -601,6 +601,37 @@ class ChatManager:
else:
return None
@staticmethod
def _prepare_stream_data(stream_data_dict: dict) -> dict:
    """Flatten a ChatStream's dict form into the flat column mapping used for DB persistence.

    Args:
        stream_data_dict: Dict produced by ``ChatStream.to_dict()``; may contain
            nested ``user_info`` / ``group_info`` dicts, either of which may be absent.

    Returns:
        dict: Flat mapping of column names to values. Missing optional fields fall
        back to the defaults below; ``last_interaction_time`` defaults to "now".
    """
    user = stream_data_dict.get("user_info")
    group = stream_data_dict.get("group_info")
    get = stream_data_dict.get

    # NOTE(review): when user_info is absent, user_cardname falls back to None while
    # every other user/group field falls back to "" — asymmetry preserved deliberately;
    # confirm against the ChatStreams column's nullability before normalizing.
    if user:
        user_fields = {
            "user_platform": user["platform"],
            "user_id": user["user_id"],
            "user_nickname": user["user_nickname"],
            "user_cardname": user.get("user_cardname", ""),
        }
    else:
        user_fields = {"user_platform": "", "user_id": "", "user_nickname": "", "user_cardname": None}

    group_fields = (
        {"group_platform": group["platform"], "group_id": group["group_id"], "group_name": group["group_name"]}
        if group
        else {"group_platform": "", "group_id": "", "group_name": ""}
    )

    return {
        "platform": stream_data_dict["platform"],
        "create_time": stream_data_dict["create_time"],
        "last_active_time": stream_data_dict["last_active_time"],
        **user_fields,
        **group_fields,
        "energy_value": get("energy_value", 5.0),
        "sleep_pressure": get("sleep_pressure", 0.0),
        "focus_energy": get("focus_energy", 0.5),
        # dynamic interest-system fields
        "base_interest_energy": get("base_interest_energy", 0.5),
        "message_interest_total": get("message_interest_total", 0.0),
        "message_count": get("message_count", 0),
        "action_count": get("action_count", 0),
        "reply_count": get("reply_count", 0),
        "last_interaction_time": get("last_interaction_time", time.time()),
        "consecutive_no_reply": get("consecutive_no_reply", 0),
        "interruption_count": get("interruption_count", 0),
    }
@staticmethod
async def _save_stream(stream: ChatStream):
"""保存聊天流到数据库"""
@@ -608,6 +639,25 @@ class ChatManager:
return
stream_data_dict = stream.to_dict()
# 尝试使用数据库批量调度器
try:
from src.common.database.db_batch_scheduler import batch_update, get_batch_session
async with get_batch_session() as scheduler:
# 使用批量更新
result = await batch_update(
model_class=ChatStreams,
conditions={"stream_id": stream_data_dict["stream_id"]},
data=ChatManager._prepare_stream_data(stream_data_dict)
)
if result and result > 0:
stream.saved = True
logger.debug(f"聊天流 {stream.stream_id} 通过批量调度器保存成功")
return
except (ImportError, Exception) as e:
logger.debug(f"批量调度器保存聊天流失败,使用原始方法: {e}")
# 回退到原始方法
async def _db_save_stream_async(s_data_dict: dict):
async with get_db_session() as session:
user_info_d = s_data_dict.get("user_info")

View File

@@ -293,14 +293,14 @@ class DefaultReplyer:
try:
# 构建 Prompt
with Timer("构建Prompt", {}): # 内部计时器,可选保留
prompt,selected_expressions = await self.build_prompt_reply_context(
prompt = await asyncio.create_task(self.build_prompt_reply_context(
reply_to=reply_to,
extra_info=extra_info,
available_actions=available_actions,
choosen_actions=choosen_actions,
enable_tool=enable_tool,
reply_message=reply_message,
read_mark=read_mark,
)
))
if not prompt:
logger.warning("构建prompt失败跳过回复生成")
@@ -591,7 +591,7 @@ class DefaultReplyer:
# 获取记忆系统实例
memory_system = get_memory_system()
# 检索相关记忆
# 使用统一记忆系统检索相关记忆
enhanced_memories = await memory_system.retrieve_relevant_memories(
query=target, user_id=memory_user_id, scope_id=stream.stream_id, context=memory_context, limit=10
)

View File

@@ -526,8 +526,20 @@ class Prompt:
# 构建表达习惯块
if selected_expressions:
style_habits_str = "\n".join([f"- {expr}" for expr in selected_expressions])
expression_habits_block = f"- 你可以参考以下的语言习惯,当情景合适就使用,但不要生硬使用,以合理的方式结合到你的回复中:\n{style_habits_str}"
# 格式化表达方式,提取关键信息
formatted_expressions = []
for expr in selected_expressions:
if isinstance(expr, dict):
situation = expr.get("situation", "")
style = expr.get("style", "")
if situation and style:
formatted_expressions.append(f"- {situation}{style}")
if formatted_expressions:
style_habits_str = "\n".join(formatted_expressions)
expression_habits_block = f"你可以参考以下的语言习惯,当情景合适就使用,但不要生硬使用,以合理的方式结合到你的回复中:\n{style_habits_str}"
else:
expression_habits_block = ""
else:
expression_habits_block = ""