refactor(chat): 迁移数据库操作为异步模式并修复相关调用
将同步数据库操作全面迁移为异步模式,主要涉及: - 将 `with get_db_session()` 改为 `async with get_db_session()` - 修复相关异步调用链,确保 await 正确传递 - 优化消息管理器、上下文管理器等核心组件的异步处理 - 移除同步的 person_id 获取方法,避免协程对象传递问题 修复 deepcopy 在 StreamContext 中的序列化问题,跳过不可序列化的 asyncio.Task 对象 删除无用的测试文件和废弃的插件清单文件
This commit is contained in:
@@ -389,94 +389,105 @@ class ChatStream:
|
||||
from sqlalchemy import select, desc
|
||||
import asyncio
|
||||
|
||||
async def _load_messages():
|
||||
def _db_query():
|
||||
with get_db_session() as session:
|
||||
# 查询该stream_id的最近20条消息
|
||||
async def _load_history_messages_async():
|
||||
"""异步加载并转换历史消息到 stream_context(在事件循环中运行)。"""
|
||||
try:
|
||||
async with get_db_session() as session:
|
||||
stmt = (
|
||||
select(Messages)
|
||||
.where(Messages.chat_info_stream_id == self.stream_id)
|
||||
.order_by(desc(Messages.time))
|
||||
.limit(global_config.chat.max_context_size)
|
||||
)
|
||||
result = session.execute(stmt)
|
||||
results = result.scalars().all()
|
||||
return results
|
||||
result = await session.execute(stmt)
|
||||
db_messages = result.scalars().all()
|
||||
|
||||
# 在线程中执行数据库查询
|
||||
db_messages = await asyncio.to_thread(_db_query)
|
||||
# 转换为DatabaseMessages对象并添加到StreamContext
|
||||
for db_msg in db_messages:
|
||||
try:
|
||||
import orjson
|
||||
|
||||
# 转换为DatabaseMessages对象并添加到StreamContext
|
||||
for db_msg in db_messages:
|
||||
actions = None
|
||||
if db_msg.actions:
|
||||
try:
|
||||
actions = orjson.loads(db_msg.actions)
|
||||
except (orjson.JSONDecodeError, TypeError):
|
||||
actions = None
|
||||
|
||||
db_message = DatabaseMessages(
|
||||
message_id=db_msg.message_id,
|
||||
time=db_msg.time,
|
||||
chat_id=db_msg.chat_id,
|
||||
reply_to=db_msg.reply_to,
|
||||
interest_value=db_msg.interest_value,
|
||||
key_words=db_msg.key_words,
|
||||
key_words_lite=db_msg.key_words_lite,
|
||||
is_mentioned=db_msg.is_mentioned,
|
||||
processed_plain_text=db_msg.processed_plain_text,
|
||||
display_message=db_msg.display_message,
|
||||
priority_mode=db_msg.priority_mode,
|
||||
priority_info=db_msg.priority_info,
|
||||
additional_config=db_msg.additional_config,
|
||||
is_emoji=db_msg.is_emoji,
|
||||
is_picid=db_msg.is_picid,
|
||||
is_command=db_msg.is_command,
|
||||
is_notify=db_msg.is_notify,
|
||||
user_id=db_msg.user_id,
|
||||
user_nickname=db_msg.user_nickname,
|
||||
user_cardname=db_msg.user_cardname,
|
||||
user_platform=db_msg.user_platform,
|
||||
chat_info_group_id=db_msg.chat_info_group_id,
|
||||
chat_info_group_name=db_msg.chat_info_group_name,
|
||||
chat_info_group_platform=db_msg.chat_info_group_platform,
|
||||
chat_info_user_id=db_msg.chat_info_user_id,
|
||||
chat_info_user_nickname=db_msg.chat_info_user_nickname,
|
||||
chat_info_user_cardname=db_msg.chat_info_user_cardname,
|
||||
chat_info_user_platform=db_msg.chat_info_user_platform,
|
||||
chat_info_stream_id=db_msg.chat_info_stream_id,
|
||||
chat_info_platform=db_msg.chat_info_platform,
|
||||
chat_info_create_time=db_msg.chat_info_create_time,
|
||||
chat_info_last_active_time=db_msg.chat_info_last_active_time,
|
||||
actions=actions,
|
||||
should_reply=getattr(db_msg, "should_reply", False) or False,
|
||||
)
|
||||
|
||||
logger.debug(
|
||||
f"加载历史消息 {db_message.message_id} - interest_value: {db_message.interest_value}"
|
||||
)
|
||||
|
||||
db_message.is_read = True
|
||||
self.stream_context.history_messages.append(db_message)
|
||||
|
||||
except Exception as e:
|
||||
logger.warning(f"转换消息 {getattr(db_msg, 'message_id', '<unknown>')} 失败: {e}")
|
||||
continue
|
||||
|
||||
if self.stream_context.history_messages:
|
||||
logger.info(
|
||||
f"已从数据库加载 {len(self.stream_context.history_messages)} 条历史消息到聊天流 {self.stream_id}"
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
logger.warning(f"异步加载历史消息失败: {e}")
|
||||
|
||||
# 在已有事件循环中,避免调用 asyncio.run()
|
||||
try:
|
||||
loop = asyncio.get_running_loop()
|
||||
except RuntimeError:
|
||||
# 没有运行的事件循环,安全地运行并等待完成
|
||||
asyncio.run(_load_history_messages_async())
|
||||
else:
|
||||
# 如果事件循环正在运行,在后台创建任务
|
||||
if loop.is_running():
|
||||
try:
|
||||
# 从SQLAlchemy模型转换为DatabaseMessages数据模型
|
||||
import orjson
|
||||
|
||||
# 解析actions字段(JSON格式)
|
||||
actions = None
|
||||
if db_msg.actions:
|
||||
try:
|
||||
actions = orjson.loads(db_msg.actions)
|
||||
except (orjson.JSONDecodeError, TypeError):
|
||||
actions = None
|
||||
|
||||
db_message = DatabaseMessages(
|
||||
message_id=db_msg.message_id,
|
||||
time=db_msg.time,
|
||||
chat_id=db_msg.chat_id,
|
||||
reply_to=db_msg.reply_to,
|
||||
interest_value=db_msg.interest_value,
|
||||
key_words=db_msg.key_words,
|
||||
key_words_lite=db_msg.key_words_lite,
|
||||
is_mentioned=db_msg.is_mentioned,
|
||||
processed_plain_text=db_msg.processed_plain_text,
|
||||
display_message=db_msg.display_message,
|
||||
priority_mode=db_msg.priority_mode,
|
||||
priority_info=db_msg.priority_info,
|
||||
additional_config=db_msg.additional_config,
|
||||
is_emoji=db_msg.is_emoji,
|
||||
is_picid=db_msg.is_picid,
|
||||
is_command=db_msg.is_command,
|
||||
is_notify=db_msg.is_notify,
|
||||
user_id=db_msg.user_id,
|
||||
user_nickname=db_msg.user_nickname,
|
||||
user_cardname=db_msg.user_cardname,
|
||||
user_platform=db_msg.user_platform,
|
||||
chat_info_group_id=db_msg.chat_info_group_id,
|
||||
chat_info_group_name=db_msg.chat_info_group_name,
|
||||
chat_info_group_platform=db_msg.chat_info_group_platform,
|
||||
chat_info_user_id=db_msg.chat_info_user_id,
|
||||
chat_info_user_nickname=db_msg.chat_info_user_nickname,
|
||||
chat_info_user_cardname=db_msg.chat_info_user_cardname,
|
||||
chat_info_user_platform=db_msg.chat_info_user_platform,
|
||||
chat_info_stream_id=db_msg.chat_info_stream_id,
|
||||
chat_info_platform=db_msg.chat_info_platform,
|
||||
chat_info_create_time=db_msg.chat_info_create_time,
|
||||
chat_info_last_active_time=db_msg.chat_info_last_active_time,
|
||||
actions=actions,
|
||||
should_reply=getattr(db_msg, "should_reply", False) or False,
|
||||
)
|
||||
|
||||
# 添加调试日志:检查从数据库加载的interest_value
|
||||
logger.debug(
|
||||
f"加载历史消息 {db_message.message_id} - interest_value: {db_message.interest_value}"
|
||||
)
|
||||
|
||||
# 标记为已读并添加到历史消息
|
||||
db_message.is_read = True
|
||||
self.stream_context.history_messages.append(db_message)
|
||||
|
||||
asyncio.create_task(_load_history_messages_async())
|
||||
except Exception as e:
|
||||
logger.warning(f"转换消息 {db_msg.message_id} 失败: {e}")
|
||||
continue
|
||||
|
||||
if self.stream_context.history_messages:
|
||||
logger.info(
|
||||
f"已从数据库加载 {len(self.stream_context.history_messages)} 条历史消息到聊天流 {self.stream_id}"
|
||||
)
|
||||
|
||||
# 创建任务来加载历史消息
|
||||
asyncio.create_task(_load_messages())
|
||||
# 如果无法创建任务,退回到阻塞运行
|
||||
logger.warning(f"无法在事件循环中创建后台任务,尝试阻塞运行: {e}")
|
||||
asyncio.run(_load_history_messages_async())
|
||||
else:
|
||||
# loop 存在但未运行,使用 asyncio.run
|
||||
asyncio.run(_load_history_messages_async())
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"加载历史消息失败: {e}")
|
||||
@@ -498,7 +509,7 @@ class ChatManager:
|
||||
self.streams: Dict[str, ChatStream] = {} # stream_id -> ChatStream
|
||||
self.last_messages: Dict[str, "MessageRecv"] = {} # stream_id -> last_message
|
||||
# try:
|
||||
# with get_db_session() as session:
|
||||
# async with get_db_session() as session:
|
||||
# db.connect(reuse_if_open=True)
|
||||
# # 确保 ChatStreams 表存在
|
||||
# session.execute(text("CREATE TABLE IF NOT EXISTS chat_streams (stream_id TEXT PRIMARY KEY, platform TEXT, create_time REAL, last_active_time REAL, user_platform TEXT, user_id TEXT, user_nickname TEXT, user_cardname TEXT, group_platform TEXT, group_id TEXT, group_name TEXT)"))
|
||||
|
||||
Reference in New Issue
Block a user