refactor(chat): 迁移数据库操作为异步模式并修复相关调用

将同步数据库操作全面迁移为异步模式,主要涉及:
- 将 `with get_db_session()` 改为 `async with get_db_session()`
- 修复相关异步调用链,确保 await 正确传递
- 优化消息管理器、上下文管理器等核心组件的异步处理
- 移除同步的 person_id 获取方法,避免协程对象传递问题

修复 deepcopy 在 StreamContext 中的序列化问题,跳过不可序列化的 asyncio.Task 对象

删除无用的测试文件和废弃的插件清单文件
This commit is contained in:
Windpicker-owo
2025-09-28 20:40:46 +08:00
parent 9836d317b8
commit c31fd02daf
28 changed files with 526 additions and 604 deletions

View File

@@ -42,7 +42,7 @@ class SingleStreamContextManager:
self._update_access_stats()
return self.context
def add_message(self, message: DatabaseMessages, skip_energy_update: bool = False) -> bool:
async def add_message(self, message: DatabaseMessages, skip_energy_update: bool = False) -> bool:
"""添加消息到上下文
Args:
@@ -53,30 +53,21 @@ class SingleStreamContextManager:
bool: 是否成功添加
"""
try:
# 添加消息到上下文
self.context.add_message(message)
# 计算消息兴趣度
interest_value = self._calculate_message_interest(message)
interest_value = await self._calculate_message_interest(message)
message.interest_value = interest_value
# 更新统计
self.total_messages += 1
self.last_access_time = time.time()
# 更新能量和分发
if not skip_energy_update:
self._update_stream_energy()
await self._update_stream_energy()
distribution_manager.add_stream_message(self.stream_id, 1)
logger.debug(f"添加消息到单流上下文: {self.stream_id} (兴趣度: {interest_value:.3f})")
return True
except Exception as e:
logger.error(f"添加消息到单流上下文失败 {self.stream_id}: {e}", exc_info=True)
return False
def update_message(self, message_id: str, updates: Dict[str, Any]) -> bool:
async def update_message(self, message_id: str, updates: Dict[str, Any]) -> bool:
"""更新上下文中的消息
Args:
@@ -87,16 +78,11 @@ class SingleStreamContextManager:
bool: 是否成功更新
"""
try:
# 更新消息信息
self.context.update_message_info(message_id, **updates)
# 如果更新了兴趣度,重新计算能量
if "interest_value" in updates:
self._update_stream_energy()
await self._update_stream_energy()
logger.debug(f"更新单流上下文消息: {self.stream_id}/{message_id}")
return True
except Exception as e:
logger.error(f"更新单流上下文消息失败 {self.stream_id}/{message_id}: {e}", exc_info=True)
return False
@@ -164,16 +150,13 @@ class SingleStreamContextManager:
logger.error(f"标记消息已读失败 {self.stream_id}: {e}", exc_info=True)
return False
def clear_context(self) -> bool:
async def clear_context(self) -> bool:
"""清空上下文"""
try:
# 清空消息
if hasattr(self.context, "unread_messages"):
self.context.unread_messages.clear()
if hasattr(self.context, "history_messages"):
self.context.history_messages.clear()
# 重置状态
reset_attrs = ["interruption_count", "afc_threshold_adjustment", "last_check_time"]
for attr in reset_attrs:
if hasattr(self.context, attr):
@@ -181,13 +164,9 @@ class SingleStreamContextManager:
setattr(self.context, attr, 0)
else:
setattr(self.context, attr, time.time())
# 重新计算能量
self._update_stream_energy()
await self._update_stream_energy()
logger.info(f"清空单流上下文: {self.stream_id}")
return True
except Exception as e:
logger.error(f"清空单流上下文失败 {self.stream_id}: {e}", exc_info=True)
return False
@@ -249,39 +228,115 @@ class SingleStreamContextManager:
self.last_access_time = time.time()
self.access_count += 1
def _calculate_message_interest(self, message: DatabaseMessages) -> float:
"""计算消息兴趣度"""
async def _calculate_message_interest(self, message: DatabaseMessages) -> float:
"""异步实现:使用插件的异步评分器正确 await 计算兴趣度并返回分数。"""
try:
# 使用插件内部的兴趣度评分系统
try:
from src.plugins.built_in.affinity_flow_chatter.interest_scoring import chatter_interest_scoring_system
# 使用插件内部的兴趣度评分系统计算(同步方式)
from src.plugins.built_in.affinity_flow_chatter.interest_scoring import (
chatter_interest_scoring_system,
)
try:
loop = asyncio.get_event_loop()
except RuntimeError:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
interest_score = loop.run_until_complete(
chatter_interest_scoring_system._calculate_single_message_score(
interest_score = await chatter_interest_scoring_system._calculate_single_message_score(
message=message, bot_nickname=global_config.bot.nickname
)
)
interest_value = interest_score.total_score
interest_value = interest_score.total_score
logger.debug(f"使用插件内部系统计算兴趣度: {interest_value:.3f}")
return interest_value
except Exception as e:
logger.warning(f"插件内部兴趣度计算失败: {e}")
return 0.5
except Exception as e:
logger.warning(f"插件内部兴趣度计算加载失败,使用默认值: {e}")
return 0.5
except Exception as e:
logger.error(f"计算消息兴趣度失败: {e}")
return 0.5
logger.debug(f"使用插件内部系统计算兴趣度: {interest_value:.3f}")
async def _calculate_message_interest_async(self, message: DatabaseMessages) -> float:
"""异步实现:使用插件的异步评分器正确 await 计算兴趣度并返回分数。"""
try:
try:
from src.plugins.built_in.affinity_flow_chatter.interest_scoring import (
chatter_interest_scoring_system,
)
# 直接 await 插件的异步方法
try:
interest_score = await chatter_interest_scoring_system._calculate_single_message_score(
message=message, bot_nickname=global_config.bot.nickname
)
interest_value = interest_score.total_score
logger.debug(f"使用插件内部系统计算兴趣度: {interest_value:.3f}")
return interest_value
except Exception as e:
logger.warning(f"插件内部兴趣度计算失败: {e}")
return 0.5
except Exception as e:
logger.warning(f"插件内部兴趣度计算失败,使用默认值: {e}")
interest_value = 0.5 # 默认中等兴趣度
return interest_value
logger.warning(f"插件内部兴趣度计算加载失败,使用默认值: {e}")
return 0.5
except Exception as e:
logger.error(f"计算消息兴趣度失败: {e}")
return 0.5
async def add_message_async(self, message: DatabaseMessages, skip_energy_update: bool = False) -> bool:
"""异步实现的 add_message将消息添加到 context并 await 能量更新与分发。"""
try:
self.context.add_message(message)
interest_value = await self._calculate_message_interest_async(message)
message.interest_value = interest_value
self.total_messages += 1
self.last_access_time = time.time()
if not skip_energy_update:
await self._update_stream_energy()
distribution_manager.add_stream_message(self.stream_id, 1)
logger.debug(f"添加消息到单流上下文(异步): {self.stream_id} (兴趣度: {interest_value:.3f})")
return True
except Exception as e:
logger.error(f"添加消息到单流上下文失败 (async) {self.stream_id}: {e}", exc_info=True)
return False
async def update_message_async(self, message_id: str, updates: Dict[str, Any]) -> bool:
"""异步实现的 update_message更新消息并在需要时 await 能量更新。"""
try:
self.context.update_message_info(message_id, **updates)
if "interest_value" in updates:
await self._update_stream_energy()
logger.debug(f"更新单流上下文消息(异步): {self.stream_id}/{message_id}")
return True
except Exception as e:
logger.error(f"更新单流上下文消息失败 (async) {self.stream_id}/{message_id}: {e}", exc_info=True)
return False
async def clear_context_async(self) -> bool:
"""异步实现的 clear_context清空消息并 await 能量重算。"""
try:
if hasattr(self.context, "unread_messages"):
self.context.unread_messages.clear()
if hasattr(self.context, "history_messages"):
self.context.history_messages.clear()
reset_attrs = ["interruption_count", "afc_threshold_adjustment", "last_check_time"]
for attr in reset_attrs:
if hasattr(self.context, attr):
if attr in ["interruption_count", "afc_threshold_adjustment"]:
setattr(self.context, attr, 0)
else:
setattr(self.context, attr, time.time())
await self._update_stream_energy()
logger.info(f"清空单流上下文(异步): {self.stream_id}")
return True
except Exception as e:
logger.error(f"清空单流上下文失败 (async) {self.stream_id}: {e}", exc_info=True)
return False
async def _update_stream_energy(self):
"""更新流能量"""
try:
@@ -305,4 +360,4 @@ class SingleStreamContextManager:
distribution_manager.update_stream_energy(self.stream_id, energy)
except Exception as e:
logger.error(f"更新单流能量失败 {self.stream_id}: {e}")
logger.error(f"更新单流能量失败 {self.stream_id}: {e}")

View File

@@ -75,29 +75,23 @@ class MessageManager:
logger.info("消息管理器已停止")
def add_message(self, stream_id: str, message: DatabaseMessages):
async def add_message(self, stream_id: str, message: DatabaseMessages):
"""添加消息到指定聊天流"""
try:
# 通过 ChatManager 获取 ChatStream
chat_manager = get_chat_manager()
chat_stream = chat_manager.get_stream(stream_id)
if not chat_stream:
logger.warning(f"MessageManager.add_message: 聊天流 {stream_id} 不存在")
return
# 使用 ChatStream 的 context_manager 添加消息
success = chat_stream.context_manager.add_message(message)
success = await chat_stream.context_manager.add_message(message)
if success:
logger.debug(f"添加消息到聊天流 {stream_id}: {message.message_id}")
else:
logger.warning(f"添加消息到聊天流 {stream_id} 失败")
except Exception as e:
logger.error(f"添加消息到聊天流 {stream_id} 时发生错误: {e}")
def update_message(
async def update_message(
self,
stream_id: str,
message_id: str,
@@ -107,15 +101,11 @@ class MessageManager:
):
"""更新消息信息"""
try:
# 通过 ChatManager 获取 ChatStream
chat_manager = get_chat_manager()
chat_stream = chat_manager.get_stream(stream_id)
if not chat_stream:
logger.warning(f"MessageManager.update_message: 聊天流 {stream_id} 不存在")
return
# 构建更新字典
updates = {}
if interest_value is not None:
updates["interest_value"] = interest_value
@@ -123,41 +113,30 @@ class MessageManager:
updates["actions"] = actions
if should_reply is not None:
updates["should_reply"] = should_reply
# 使用 ChatStream 的 context_manager 更新消息
if updates:
success = chat_stream.context_manager.update_message(message_id, updates)
success = await chat_stream.context_manager.update_message(message_id, updates)
if success:
logger.debug(f"更新消息 {message_id} 成功")
else:
logger.warning(f"更新消息 {message_id} 失败")
except Exception as e:
logger.error(f"更新消息 {message_id} 时发生错误: {e}")
def add_action(self, stream_id: str, message_id: str, action: str):
async def add_action(self, stream_id: str, message_id: str, action: str):
"""添加动作到消息"""
try:
# 通过 ChatManager 获取 ChatStream
chat_manager = get_chat_manager()
chat_stream = chat_manager.get_stream(stream_id)
if not chat_stream:
logger.warning(f"MessageManager.add_action: 聊天流 {stream_id} 不存在")
return
# 使用 ChatStream 的 context_manager 添加动作
# 注意:这里需要根据实际的 API 调整
# 假设我们可以通过 update_message 来添加动作
success = chat_stream.context_manager.update_message(
success = await chat_stream.context_manager.update_message(
message_id, {"actions": [action]}
)
if success:
logger.debug(f"为消息 {message_id} 添加动作 {action} 成功")
else:
logger.warning(f"为消息 {message_id} 添加动作 {action} 失败")
except Exception as e:
logger.error(f"为消息 {message_id} 添加动作时发生错误: {e}")
@@ -382,36 +361,27 @@ class MessageManager:
"start_time": self.stats.start_time,
}
def cleanup_inactive_streams(self, max_inactive_hours: int = 24):
async def cleanup_inactive_streams(self, max_inactive_hours: int = 24):
"""清理不活跃的聊天流"""
try:
# 通过 ChatManager 清理不活跃的流
chat_manager = get_chat_manager()
current_time = time.time()
max_inactive_seconds = max_inactive_hours * 3600
inactive_streams = []
for stream_id, chat_stream in chat_manager.streams.items():
# 检查最后活跃时间
if current_time - chat_stream.last_active_time > max_inactive_seconds:
inactive_streams.append(stream_id)
# 清理不活跃的流
for stream_id in inactive_streams:
try:
# 清理流的内容
chat_stream.context_manager.clear_context()
# 从 ChatManager 中移除
await chat_stream.context_manager.clear_context()
del chat_manager.streams[stream_id]
logger.info(f"清理不活跃聊天流: {stream_id}")
except Exception as e:
logger.error(f"清理聊天流 {stream_id} 失败: {e}")
if inactive_streams:
logger.info(f"已清理 {len(inactive_streams)} 个不活跃聊天流")
else:
logger.debug("没有需要清理的不活跃聊天流")
except Exception as e:
logger.error(f"清理不活跃聊天流时发生错误: {e}")