feat(chat): 实现批量动作存储并优化消息处理流程

新增批量动作记录存储功能,提升数据库写入性能。重构消息预处理逻辑,改进兴趣度计算和同步机制,优化用户信息和群组信息处理。添加配置选项控制批量存储开关,更新相关模板配置。
This commit is contained in:
Windpicker-owo
2025-10-05 17:45:44 +08:00
parent a7bc1b4f20
commit dccf1cffc9
8 changed files with 307 additions and 93 deletions

View File

@@ -1,5 +1,6 @@
import os
import re
import time
import traceback
from typing import Any
@@ -472,55 +473,110 @@ class ChatBot:
template_group_name = None
async def preprocess():
# 存储消息到数据库
from .storage import MessageStorage
try:
await MessageStorage.store_message(message, message.chat_stream)
logger.debug(f"消息已存储到数据库: {message.message_info.message_id}")
except Exception as e:
logger.error(f"存储消息到数据库失败: {e}")
traceback.print_exc()
# 使用消息管理器处理消息(保持原有功能)
from src.common.data_models.database_data_model import DatabaseMessages
message_info = message.message_info
msg_user_info = getattr(message_info, "user_info", None)
stream_user_info = getattr(message.chat_stream, "user_info", None)
group_info = getattr(message.chat_stream, "group_info", None)
message_id = message_info.message_id or ""
message_time = message_info.time if message_info.time is not None else time.time()
is_mentioned = None
if isinstance(message.is_mentioned, bool):
is_mentioned = message.is_mentioned
elif isinstance(message.is_mentioned, (int, float)):
is_mentioned = message.is_mentioned != 0
user_id = ""
user_nickname = ""
user_cardname = None
user_platform = ""
if msg_user_info:
user_id = str(getattr(msg_user_info, "user_id", "") or "")
user_nickname = getattr(msg_user_info, "user_nickname", "") or ""
user_cardname = getattr(msg_user_info, "user_cardname", None)
user_platform = getattr(msg_user_info, "platform", "") or ""
elif stream_user_info:
user_id = str(getattr(stream_user_info, "user_id", "") or "")
user_nickname = getattr(stream_user_info, "user_nickname", "") or ""
user_cardname = getattr(stream_user_info, "user_cardname", None)
user_platform = getattr(stream_user_info, "platform", "") or ""
chat_user_id = str(getattr(stream_user_info, "user_id", "") or "")
chat_user_nickname = getattr(stream_user_info, "user_nickname", "") or ""
chat_user_cardname = getattr(stream_user_info, "user_cardname", None)
chat_user_platform = getattr(stream_user_info, "platform", "") or ""
group_id = getattr(group_info, "group_id", None)
group_name = getattr(group_info, "group_name", None)
group_platform = getattr(group_info, "platform", None)
# 创建数据库消息对象
db_message = DatabaseMessages(
message_id=message.message_info.message_id,
time=message.message_info.time,
message_id=message_id,
time=float(message_time),
chat_id=message.chat_stream.stream_id,
processed_plain_text=message.processed_plain_text,
display_message=message.processed_plain_text,
is_mentioned=message.is_mentioned,
is_at=message.is_at,
is_emoji=message.is_emoji,
is_picid=message.is_picid,
is_command=message.is_command,
is_notify=message.is_notify,
user_id=message.message_info.user_info.user_id,
user_nickname=message.message_info.user_info.user_nickname,
user_cardname=message.message_info.user_info.user_cardname,
user_platform=message.message_info.user_info.platform,
is_mentioned=is_mentioned,
is_at=bool(message.is_at) if message.is_at is not None else None,
is_emoji=bool(message.is_emoji),
is_picid=bool(message.is_picid),
is_command=bool(message.is_command),
is_notify=bool(message.is_notify),
user_id=user_id,
user_nickname=user_nickname,
user_cardname=user_cardname,
user_platform=user_platform,
chat_info_stream_id=message.chat_stream.stream_id,
chat_info_platform=message.chat_stream.platform,
chat_info_create_time=message.chat_stream.create_time,
chat_info_last_active_time=message.chat_stream.last_active_time,
chat_info_user_id=message.chat_stream.user_info.user_id,
chat_info_user_nickname=message.chat_stream.user_info.user_nickname,
chat_info_user_cardname=message.chat_stream.user_info.user_cardname,
chat_info_user_platform=message.chat_stream.user_info.platform,
chat_info_create_time=float(message.chat_stream.create_time),
chat_info_last_active_time=float(message.chat_stream.last_active_time),
chat_info_user_id=chat_user_id,
chat_info_user_nickname=chat_user_nickname,
chat_info_user_cardname=chat_user_cardname,
chat_info_user_platform=chat_user_platform,
chat_info_group_id=group_id,
chat_info_group_name=group_name,
chat_info_group_platform=group_platform,
)
# 如果是群聊,添加群组信息
if message.chat_stream.group_info:
db_message.chat_info_group_id = message.chat_stream.group_info.group_id
db_message.chat_info_group_name = message.chat_stream.group_info.group_name
db_message.chat_info_group_platform = message.chat_stream.group_info.platform
# 兼容历史逻辑:显式设置群聊相关属性,便于后续逻辑通过 hasattr 判断
if group_info:
setattr(db_message, "chat_info_group_id", group_id)
setattr(db_message, "chat_info_group_name", group_name)
setattr(db_message, "chat_info_group_platform", group_platform)
else:
setattr(db_message, "chat_info_group_id", None)
setattr(db_message, "chat_info_group_name", None)
setattr(db_message, "chat_info_group_platform", None)
# 添加消息到消息管理器
await message_manager.add_message(message.chat_stream.stream_id, db_message)
logger.debug(f"消息已添加到消息管理器: {message.chat_stream.stream_id}")
# 先交给消息管理器处理,计算兴趣度等衍生数据
try:
await message_manager.add_message(message.chat_stream.stream_id, db_message)
logger.debug(f"消息已添加到消息管理器: {message.chat_stream.stream_id}")
except Exception as e:
logger.error(f"消息添加到消息管理器失败: {e}")
# 将兴趣度结果同步回原始消息,便于后续流程使用
message.interest_value = getattr(db_message, "interest_value", getattr(message, "interest_value", 0.0))
setattr(message, "should_reply", getattr(db_message, "should_reply", getattr(message, "should_reply", False)))
setattr(message, "should_act", getattr(db_message, "should_act", getattr(message, "should_act", False)))
# 存储消息到数据库,只进行一次写入
try:
await MessageStorage.store_message(message, message.chat_stream)
logger.debug(
"消息已存储到数据库: %s (interest=%.3f, should_reply=%s, should_act=%s)",
message.message_info.message_id,
getattr(message, "interest_value", -1.0),
getattr(message, "should_reply", None),
getattr(message, "should_act", None),
)
except Exception as e:
logger.error(f"存储消息到数据库失败: {e}")
traceback.print_exc()
if template_group_name:
async with global_prompt_manager.async_message_scope(template_group_name):

View File

@@ -33,6 +33,10 @@ class ChatterActionManager:
self._using_actions = component_registry.get_default_actions()
self.log_prefix: str = "ChatterActionManager"
# 批量存储支持
self._batch_storage_enabled = False
self._pending_actions = []
self._current_chat_id = None
# === 执行Action方法 ===
@@ -184,16 +188,26 @@ class ChatterActionManager:
reason = reasoning or "选择不回复"
logger.info(f"{log_prefix} 选择不回复,原因: {reason}")
# 存储no_reply信息到数据库
asyncio.create_task(database_api.store_action_info(
chat_stream=chat_stream,
action_build_into_prompt=False,
action_prompt_display=reason,
action_done=True,
thinking_id=thinking_id,
action_data={"reason": reason},
action_name="no_reply",
))
# 存储no_reply信息到数据库(支持批量存储)
if self._batch_storage_enabled:
self.add_action_to_batch(
action_name="no_reply",
action_data={"reason": reason},
thinking_id=thinking_id or "",
action_done=True,
action_build_into_prompt=False,
action_prompt_display=reason
)
else:
asyncio.create_task(database_api.store_action_info(
chat_stream=chat_stream,
action_build_into_prompt=False,
action_prompt_display=reason,
action_done=True,
thinking_id=thinking_id,
action_data={"reason": reason},
action_name="no_reply",
))
# 自动清空所有未读消息
asyncio.create_task(self._clear_all_unread_messages(chat_stream.stream_id, "no_reply"))
@@ -474,16 +488,26 @@ class ChatterActionManager:
person_name = await person_info_manager.get_value(person_id, "person_name")
action_prompt_display = f"你对{person_name}进行了回复:{reply_text}"
# 存储动作信息到数据库
await database_api.store_action_info(
chat_stream=chat_stream,
action_build_into_prompt=False,
action_prompt_display=action_prompt_display,
action_done=True,
thinking_id=thinking_id,
action_data={"reply_text": reply_text},
action_name="reply",
)
# 存储动作信息到数据库(支持批量存储)
if self._batch_storage_enabled:
self.add_action_to_batch(
action_name="reply",
action_data={"reply_text": reply_text},
thinking_id=thinking_id or "",
action_done=True,
action_build_into_prompt=False,
action_prompt_display=action_prompt_display
)
else:
await database_api.store_action_info(
chat_stream=chat_stream,
action_build_into_prompt=False,
action_prompt_display=action_prompt_display,
action_done=True,
thinking_id=thinking_id,
action_data={"reply_text": reply_text},
action_name="reply",
)
# 构建循环信息
loop_info: dict[str, Any] = {
@@ -579,3 +603,71 @@ class ChatterActionManager:
)
return reply_text
def enable_batch_storage(self, chat_id: str):
"""启用批量存储模式"""
self._batch_storage_enabled = True
self._current_chat_id = chat_id
self._pending_actions.clear()
logger.debug(f"已启用批量存储模式chat_id: {chat_id}")
def disable_batch_storage(self):
"""禁用批量存储模式"""
self._batch_storage_enabled = False
self._current_chat_id = None
logger.debug("已禁用批量存储模式")
def add_action_to_batch(self, action_name: str, action_data: dict, thinking_id: str = "",
action_done: bool = True, action_build_into_prompt: bool = False,
action_prompt_display: str = ""):
"""添加动作到批量存储列表"""
if not self._batch_storage_enabled:
return False
action_record = {
"action_name": action_name,
"action_data": action_data,
"thinking_id": thinking_id,
"action_done": action_done,
"action_build_into_prompt": action_build_into_prompt,
"action_prompt_display": action_prompt_display,
"timestamp": time.time()
}
self._pending_actions.append(action_record)
logger.debug(f"已添加动作到批量存储列表: {action_name} (当前待处理: {len(self._pending_actions)} 个)")
return True
async def flush_batch_storage(self, chat_stream):
"""批量存储所有待处理的动作记录"""
if not self._pending_actions:
logger.debug("没有待处理的动作需要批量存储")
return
try:
logger.info(f"开始批量存储 {len(self._pending_actions)} 个动作记录")
# 批量存储所有动作
stored_count = 0
for action_data in self._pending_actions:
try:
result = await database_api.store_action_info(
chat_stream=chat_stream,
action_name=action_data.get("action_name", ""),
action_data=action_data.get("action_data", {}),
action_done=action_data.get("action_done", True),
action_build_into_prompt=action_data.get("action_build_into_prompt", False),
action_prompt_display=action_data.get("action_prompt_display", ""),
thinking_id=action_data.get("thinking_id", "")
)
if result:
stored_count += 1
except Exception as e:
logger.error(f"存储单个动作记录失败: {e}")
logger.info(f"批量存储完成: 成功存储 {stored_count}/{len(self._pending_actions)} 个动作记录")
# 清空待处理列表
self._pending_actions.clear()
except Exception as e:
logger.error(f"批量存储动作记录时发生错误: {e}")