feat(person_info): 实施基于稳健 ID 的用户信息同步。本次提交重构了用户识别和信息检索系统,使其基于稳定的平台和用户 ID,不再依赖脆弱的姓名解析机制。同时引入了自动后台进程,以保持用户信息的实时更新。主要变更包括:

- 在 `PersonInfoManager` 中新增 `sync_user_info` 方法,根据 `platform` 和 `user_id` 来创建和更新用户记录。
- `ChatManager` 现在会在处理消息时触发该同步作为非阻塞后台任务,确保用户数据(如昵称)保持最新。
- 提示生成逻辑,特别是关系和上下文信息的生成,已重构为使用稳定的 `user_id`,而非从回复消息内容中解析姓名。
- `PromptParameters` 已被扩展,以在整个回复生成流程中传递 `platform` 和 `user_id`。
- 弃用依赖名称到 ID 查找的脆弱方法。
This commit is contained in:
tt-P607
2025-11-25 22:01:41 +08:00
parent 714bef7c2b
commit fd65d8c4eb
5 changed files with 216 additions and 80 deletions

View File

@@ -20,6 +20,9 @@ install(extra_lines=3)
logger = get_logger("chat_stream")
# 用于存储后台任务的集合,防止被垃圾回收
_background_tasks: set[asyncio.Task] = set()
class ChatStream:
"""聊天流对象,存储一个完整的聊天上下文"""
@@ -406,6 +409,40 @@ class ChatManager:
key = "_".join(components)
return hashlib.sha256(key.encode()).hexdigest()
async def _process_message(self, message: DatabaseMessages):
"""
[新] 在消息处理流程中加入用户信息同步。
"""
# 1. 从消息中提取用户信息
user_info = getattr(message, "user_info", None)
if not user_info:
return
platform = getattr(user_info, "platform", None)
user_id = getattr(user_info, "user_id", None)
nickname = getattr(user_info, "user_nickname", None)
cardname = getattr(user_info, "user_cardname", None)
if not platform or not user_id:
return
# 2. 异步执行用户信息同步
try:
from src.person_info.person_info import get_person_info_manager
person_info_manager = get_person_info_manager()
# 创建一个后台任务来执行同步,不阻塞当前流程
sync_task = asyncio.create_task(
person_info_manager.sync_user_info(platform, user_id, nickname, cardname)
)
# 将任务添加到集合中以防止被垃圾回收
# 可以在适当的地方(如程序关闭时)清理这个集合
_background_tasks.add(sync_task)
sync_task.add_done_callback(_background_tasks.discard)
except Exception as e:
logger.error(f"创建用户信息同步任务失败: {e}")
async def get_or_create_stream(
self, platform: str, user_info: UserInfo, group_info: GroupInfo | None = None
) -> ChatStream:
@@ -437,7 +474,10 @@ class ChatManager:
# 检查是否有最后一条消息(现在使用 DatabaseMessages
from src.common.data_models.database_data_model import DatabaseMessages
if stream_id in self.last_messages and isinstance(self.last_messages[stream_id], DatabaseMessages):
await stream.set_context(self.last_messages[stream_id])
last_message = self.last_messages[stream_id]
await stream.set_context(last_message)
# 在这里调用消息处理
await self._process_message(last_message)
else:
logger.debug(f"聊天流 {stream_id} 不在最后消息列表中,可能是新创建的或还没有消息")
return stream
@@ -634,23 +674,23 @@ class ChatManager:
except Exception as e:
logger.debug(f"批量写入器保存聊天流失败,使用原始方法: {e}")
# 尝试使用数据库批量调度器回退方案1
try:
from src.common.database.db_batch_scheduler import batch_update, get_batch_session
async with get_batch_session():
# 使用批量更新
result = await batch_update(
model_class=ChatStreams,
conditions={"stream_id": stream_data_dict["stream_id"]},
data=ChatManager._prepare_stream_data(stream_data_dict),
)
if result and result > 0:
stream.saved = True
logger.debug(f"聊天流 {stream.stream_id} 通过批量调度器保存成功")
return
except (ImportError, Exception) as e:
logger.debug(f"批量调度器保存聊天流失败,使用原始方法: {e}")
# 尝试使用数据库批量调度器回退方案1 - [已废弃]
# try:
# from src.common.database.optimization.batch_scheduler import batch_update, get_batch_session
#
# async with get_batch_session():
# # 使用批量更新
# result = await batch_update(
# model_class=ChatStreams,
# conditions={"stream_id": stream_data_dict["stream_id"]},
# data=ChatManager._prepare_stream_data(stream_data_dict),
# )
# if result and result > 0:
# stream.saved = True
# logger.debug(f"聊天流 {stream.stream_id} 通过批量调度器保存成功")
# return
# except (ImportError, Exception) as e:
# logger.debug(f"批量调度器保存聊天流失败,使用原始方法: {e}")
# 回退到原始方法(最终方案)
async def _db_save_stream_async(s_data_dict: dict):