Merge branch 'dev' into mofox-bus
@@ -19,6 +19,9 @@ install(extra_lines=3)

logger = get_logger("chat_stream")

# Set used to store background tasks, preventing them from being garbage-collected
_background_tasks: set[asyncio.Task] = set()


class ChatStream:
    """A chat stream object that stores a complete chat context."""
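
A note on the _background_tasks set introduced above: asyncio.create_task keeps only a weak
reference to the tasks it schedules, so a fire-and-forget task with no other reference can be
garbage-collected before it finishes; the set plus add_done_callback(discard) is the usual fix.
Below is a minimal, self-contained sketch of the same pattern (the spawn helper and the _sync
coroutine are illustrative names, not part of this repository):

import asyncio

_tasks: set[asyncio.Task] = set()

def spawn(coro) -> asyncio.Task:
    """Schedule a fire-and-forget coroutine while keeping a strong reference to it."""
    task = asyncio.create_task(coro)
    _tasks.add(task)                            # strong reference: prevents premature GC
    task.add_done_callback(_tasks.discard)      # drop the reference once the task settles
    return task

async def _sync(user_id: str) -> None:
    await asyncio.sleep(0.1)                    # stands in for the real sync work
    print(f"synced {user_id}")

async def main() -> None:
    spawn(_sync("42"))
    await asyncio.sleep(0.5)                    # give the background task time to finish

asyncio.run(main())
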
@@ -390,6 +393,40 @@ class ChatManager:
        key = "_".join(components)
        return hashlib.sha256(key.encode()).hexdigest()

    async def _process_message(self, message: DatabaseMessages):
        """
        [New] Add user info synchronization to the message processing flow.
        """
        # 1. Extract the user info from the message
        user_info = getattr(message, "user_info", None)
        if not user_info:
            return

        platform = getattr(user_info, "platform", None)
        user_id = getattr(user_info, "user_id", None)
        nickname = getattr(user_info, "user_nickname", None)
        cardname = getattr(user_info, "user_cardname", None)

        if not platform or not user_id:
            return

        # 2. Run the user info synchronization asynchronously
        try:
            from src.person_info.person_info import get_person_info_manager
            person_info_manager = get_person_info_manager()

            # Create a background task for the sync so the current flow is not blocked
            sync_task = asyncio.create_task(
                person_info_manager.sync_user_info(platform, user_id, nickname, cardname)
            )
            # Add the task to the set to prevent it from being garbage-collected
            # The set can be cleaned up at a suitable point (e.g. on shutdown)
            _background_tasks.add(sync_task)
            sync_task.add_done_callback(_background_tasks.discard)

        except Exception as e:
            logger.error(f"Failed to create the user info sync task: {e}")

    async def get_or_create_stream(
        self, platform: str, user_info: DatabaseUserInfo, group_info: DatabaseGroupInfo | None = None
    ) -> ChatStream:
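
The comment inside _process_message notes that _background_tasks can be cleaned up at a suitable
point such as shutdown; the diff itself does not add such a hook. A hedged sketch of what one
could look like (shutdown_background_tasks is a hypothetical name, and the caller would pass the
module-level _background_tasks set from the first hunk):

import asyncio

async def shutdown_background_tasks(tasks: set[asyncio.Task]) -> None:
    """Cancel still-pending tasks and wait for all of them to settle."""
    pending = [t for t in tasks if not t.done()]
    for task in pending:
        task.cancel()
    # return_exceptions=True keeps one failing task from hiding the others
    await asyncio.gather(*pending, return_exceptions=True)
    tasks.clear()

# e.g. inside the application's shutdown hook:
# await shutdown_background_tasks(_background_tasks)
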
@@ -571,23 +608,23 @@ class ChatManager:
        except Exception as e:
            logger.debug(f"Batch writer failed to save the chat stream, falling back to the original method: {e}")

        # Try the database batch scheduler (fallback option 1)
        try:
            from src.common.database.db_batch_scheduler import batch_update, get_batch_session

            async with get_batch_session():
                # Use a batch update
                result = await batch_update(
                    model_class=ChatStreams,
                    conditions={"stream_id": stream_data_dict["stream_id"]},
                    data=ChatManager._prepare_stream_data(stream_data_dict),
                )
                if result and result > 0:
                    stream.saved = True
                    logger.debug(f"Chat stream {stream.stream_id} saved successfully via the batch scheduler")
                    return
        except (ImportError, Exception) as e:
            logger.debug(f"Batch scheduler failed to save the chat stream, falling back to the original method: {e}")
        # Try the database batch scheduler (fallback option 1) - [deprecated]
        # try:
        #     from src.common.database.optimization.batch_scheduler import batch_update, get_batch_session
        #
        #     async with get_batch_session():
        #         # Use a batch update
        #         result = await batch_update(
        #             model_class=ChatStreams,
        #             conditions={"stream_id": stream_data_dict["stream_id"]},
        #             data=ChatManager._prepare_stream_data(stream_data_dict),
        #         )
        #         if result and result > 0:
        #             stream.saved = True
        #             logger.debug(f"Chat stream {stream.stream_id} saved successfully via the batch scheduler")
        #             return
        # except (ImportError, Exception) as e:
        #     logger.debug(f"Batch scheduler failed to save the chat stream, falling back to the original method: {e}")

        # Fall back to the original method (final fallback)
        async def _db_save_stream_async(s_data_dict: dict):
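
The save path in this hunk is a layered fallback: the batch writer is tried first, then the batch
scheduler (now commented out as deprecated), and finally the original per-stream save. The same
shape can be written as a loop over save strategies; the following is only an illustrative sketch
with hypothetical names (save_with_fallbacks, _direct_save), not the repository's actual API:

import asyncio
import logging
from collections.abc import Awaitable, Callable

logger = logging.getLogger("chat_stream")

async def save_with_fallbacks(
    data: dict,
    strategies: list[tuple[str, Callable[[dict], Awaitable[bool]]]],
) -> bool:
    """Try each save strategy in order and return True on the first success."""
    for name, strategy in strategies:
        try:
            if await strategy(data):
                logger.debug("saved via %s", name)
                return True
        except Exception as exc:  # fall through to the next strategy
            logger.debug("%s failed, trying next strategy: %s", name, exc)
    return False

async def _direct_save(data: dict) -> bool:     # stands in for the "original method"
    await asyncio.sleep(0)
    return True

asyncio.run(save_with_fallbacks({"stream_id": "s1"}, [("direct save", _direct_save)]))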