feat(chat): 实现批量动作存储并优化消息处理流程
新增批量动作记录存储功能,提升数据库写入性能。重构消息预处理逻辑,改进兴趣度计算和同步机制,优化用户信息和群组信息处理。添加配置选项控制批量存储开关,更新相关模板配置。
This commit is contained in:
@@ -1,5 +1,6 @@
|
||||
import os
|
||||
import re
|
||||
import time
|
||||
import traceback
|
||||
from typing import Any
|
||||
|
||||
@@ -468,55 +469,110 @@ class ChatBot:
|
||||
template_group_name = None
|
||||
|
||||
async def preprocess():
|
||||
# 存储消息到数据库
|
||||
from .storage import MessageStorage
|
||||
|
||||
try:
|
||||
await MessageStorage.store_message(message, message.chat_stream)
|
||||
logger.debug(f"消息已存储到数据库: {message.message_info.message_id}")
|
||||
except Exception as e:
|
||||
logger.error(f"存储消息到数据库失败: {e}")
|
||||
traceback.print_exc()
|
||||
|
||||
# 使用消息管理器处理消息(保持原有功能)
|
||||
from src.common.data_models.database_data_model import DatabaseMessages
|
||||
|
||||
message_info = message.message_info
|
||||
msg_user_info = getattr(message_info, "user_info", None)
|
||||
stream_user_info = getattr(message.chat_stream, "user_info", None)
|
||||
group_info = getattr(message.chat_stream, "group_info", None)
|
||||
|
||||
message_id = message_info.message_id or ""
|
||||
message_time = message_info.time if message_info.time is not None else time.time()
|
||||
is_mentioned = None
|
||||
if isinstance(message.is_mentioned, bool):
|
||||
is_mentioned = message.is_mentioned
|
||||
elif isinstance(message.is_mentioned, (int, float)):
|
||||
is_mentioned = message.is_mentioned != 0
|
||||
|
||||
user_id = ""
|
||||
user_nickname = ""
|
||||
user_cardname = None
|
||||
user_platform = ""
|
||||
if msg_user_info:
|
||||
user_id = str(getattr(msg_user_info, "user_id", "") or "")
|
||||
user_nickname = getattr(msg_user_info, "user_nickname", "") or ""
|
||||
user_cardname = getattr(msg_user_info, "user_cardname", None)
|
||||
user_platform = getattr(msg_user_info, "platform", "") or ""
|
||||
elif stream_user_info:
|
||||
user_id = str(getattr(stream_user_info, "user_id", "") or "")
|
||||
user_nickname = getattr(stream_user_info, "user_nickname", "") or ""
|
||||
user_cardname = getattr(stream_user_info, "user_cardname", None)
|
||||
user_platform = getattr(stream_user_info, "platform", "") or ""
|
||||
|
||||
chat_user_id = str(getattr(stream_user_info, "user_id", "") or "")
|
||||
chat_user_nickname = getattr(stream_user_info, "user_nickname", "") or ""
|
||||
chat_user_cardname = getattr(stream_user_info, "user_cardname", None)
|
||||
chat_user_platform = getattr(stream_user_info, "platform", "") or ""
|
||||
|
||||
group_id = getattr(group_info, "group_id", None)
|
||||
group_name = getattr(group_info, "group_name", None)
|
||||
group_platform = getattr(group_info, "platform", None)
|
||||
|
||||
# 创建数据库消息对象
|
||||
db_message = DatabaseMessages(
|
||||
message_id=message.message_info.message_id,
|
||||
time=message.message_info.time,
|
||||
message_id=message_id,
|
||||
time=float(message_time),
|
||||
chat_id=message.chat_stream.stream_id,
|
||||
processed_plain_text=message.processed_plain_text,
|
||||
display_message=message.processed_plain_text,
|
||||
is_mentioned=message.is_mentioned,
|
||||
is_at=message.is_at,
|
||||
is_emoji=message.is_emoji,
|
||||
is_picid=message.is_picid,
|
||||
is_command=message.is_command,
|
||||
is_notify=message.is_notify,
|
||||
user_id=message.message_info.user_info.user_id,
|
||||
user_nickname=message.message_info.user_info.user_nickname,
|
||||
user_cardname=message.message_info.user_info.user_cardname,
|
||||
user_platform=message.message_info.user_info.platform,
|
||||
is_mentioned=is_mentioned,
|
||||
is_at=bool(message.is_at) if message.is_at is not None else None,
|
||||
is_emoji=bool(message.is_emoji),
|
||||
is_picid=bool(message.is_picid),
|
||||
is_command=bool(message.is_command),
|
||||
is_notify=bool(message.is_notify),
|
||||
user_id=user_id,
|
||||
user_nickname=user_nickname,
|
||||
user_cardname=user_cardname,
|
||||
user_platform=user_platform,
|
||||
chat_info_stream_id=message.chat_stream.stream_id,
|
||||
chat_info_platform=message.chat_stream.platform,
|
||||
chat_info_create_time=message.chat_stream.create_time,
|
||||
chat_info_last_active_time=message.chat_stream.last_active_time,
|
||||
chat_info_user_id=message.chat_stream.user_info.user_id,
|
||||
chat_info_user_nickname=message.chat_stream.user_info.user_nickname,
|
||||
chat_info_user_cardname=message.chat_stream.user_info.user_cardname,
|
||||
chat_info_user_platform=message.chat_stream.user_info.platform,
|
||||
chat_info_create_time=float(message.chat_stream.create_time),
|
||||
chat_info_last_active_time=float(message.chat_stream.last_active_time),
|
||||
chat_info_user_id=chat_user_id,
|
||||
chat_info_user_nickname=chat_user_nickname,
|
||||
chat_info_user_cardname=chat_user_cardname,
|
||||
chat_info_user_platform=chat_user_platform,
|
||||
chat_info_group_id=group_id,
|
||||
chat_info_group_name=group_name,
|
||||
chat_info_group_platform=group_platform,
|
||||
)
|
||||
|
||||
# 如果是群聊,添加群组信息
|
||||
if message.chat_stream.group_info:
|
||||
db_message.chat_info_group_id = message.chat_stream.group_info.group_id
|
||||
db_message.chat_info_group_name = message.chat_stream.group_info.group_name
|
||||
db_message.chat_info_group_platform = message.chat_stream.group_info.platform
|
||||
# 兼容历史逻辑:显式设置群聊相关属性,便于后续逻辑通过 hasattr 判断
|
||||
if group_info:
|
||||
setattr(db_message, "chat_info_group_id", group_id)
|
||||
setattr(db_message, "chat_info_group_name", group_name)
|
||||
setattr(db_message, "chat_info_group_platform", group_platform)
|
||||
else:
|
||||
setattr(db_message, "chat_info_group_id", None)
|
||||
setattr(db_message, "chat_info_group_name", None)
|
||||
setattr(db_message, "chat_info_group_platform", None)
|
||||
|
||||
# 添加消息到消息管理器
|
||||
await message_manager.add_message(message.chat_stream.stream_id, db_message)
|
||||
logger.debug(f"消息已添加到消息管理器: {message.chat_stream.stream_id}")
|
||||
# 先交给消息管理器处理,计算兴趣度等衍生数据
|
||||
try:
|
||||
await message_manager.add_message(message.chat_stream.stream_id, db_message)
|
||||
logger.debug(f"消息已添加到消息管理器: {message.chat_stream.stream_id}")
|
||||
except Exception as e:
|
||||
logger.error(f"消息添加到消息管理器失败: {e}")
|
||||
|
||||
# 将兴趣度结果同步回原始消息,便于后续流程使用
|
||||
message.interest_value = getattr(db_message, "interest_value", getattr(message, "interest_value", 0.0))
|
||||
setattr(message, "should_reply", getattr(db_message, "should_reply", getattr(message, "should_reply", False)))
|
||||
setattr(message, "should_act", getattr(db_message, "should_act", getattr(message, "should_act", False)))
|
||||
|
||||
# 存储消息到数据库,只进行一次写入
|
||||
try:
|
||||
await MessageStorage.store_message(message, message.chat_stream)
|
||||
logger.debug(
|
||||
"消息已存储到数据库: %s (interest=%.3f, should_reply=%s, should_act=%s)",
|
||||
message.message_info.message_id,
|
||||
getattr(message, "interest_value", -1.0),
|
||||
getattr(message, "should_reply", None),
|
||||
getattr(message, "should_act", None),
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(f"存储消息到数据库失败: {e}")
|
||||
traceback.print_exc()
|
||||
|
||||
if template_group_name:
|
||||
async with global_prompt_manager.async_message_scope(template_group_name):
|
||||
|
||||
@@ -33,6 +33,10 @@ class ChatterActionManager:
|
||||
self._using_actions = component_registry.get_default_actions()
|
||||
|
||||
self.log_prefix: str = "ChatterActionManager"
|
||||
# 批量存储支持
|
||||
self._batch_storage_enabled = False
|
||||
self._pending_actions = []
|
||||
self._current_chat_id = None
|
||||
|
||||
# === 执行Action方法 ===
|
||||
|
||||
@@ -184,16 +188,26 @@ class ChatterActionManager:
|
||||
reason = reasoning or "选择不回复"
|
||||
logger.info(f"{log_prefix} 选择不回复,原因: {reason}")
|
||||
|
||||
# 存储no_reply信息到数据库
|
||||
asyncio.create_task(database_api.store_action_info(
|
||||
chat_stream=chat_stream,
|
||||
action_build_into_prompt=False,
|
||||
action_prompt_display=reason,
|
||||
action_done=True,
|
||||
thinking_id=thinking_id,
|
||||
action_data={"reason": reason},
|
||||
action_name="no_reply",
|
||||
))
|
||||
# 存储no_reply信息到数据库(支持批量存储)
|
||||
if self._batch_storage_enabled:
|
||||
self.add_action_to_batch(
|
||||
action_name="no_reply",
|
||||
action_data={"reason": reason},
|
||||
thinking_id=thinking_id or "",
|
||||
action_done=True,
|
||||
action_build_into_prompt=False,
|
||||
action_prompt_display=reason
|
||||
)
|
||||
else:
|
||||
asyncio.create_task(database_api.store_action_info(
|
||||
chat_stream=chat_stream,
|
||||
action_build_into_prompt=False,
|
||||
action_prompt_display=reason,
|
||||
action_done=True,
|
||||
thinking_id=thinking_id,
|
||||
action_data={"reason": reason},
|
||||
action_name="no_reply",
|
||||
))
|
||||
|
||||
# 自动清空所有未读消息
|
||||
asyncio.create_task(self._clear_all_unread_messages(chat_stream.stream_id, "no_reply"))
|
||||
@@ -474,16 +488,26 @@ class ChatterActionManager:
|
||||
person_name = await person_info_manager.get_value(person_id, "person_name")
|
||||
action_prompt_display = f"你对{person_name}进行了回复:{reply_text}"
|
||||
|
||||
# 存储动作信息到数据库
|
||||
await database_api.store_action_info(
|
||||
chat_stream=chat_stream,
|
||||
action_build_into_prompt=False,
|
||||
action_prompt_display=action_prompt_display,
|
||||
action_done=True,
|
||||
thinking_id=thinking_id,
|
||||
action_data={"reply_text": reply_text},
|
||||
action_name="reply",
|
||||
)
|
||||
# 存储动作信息到数据库(支持批量存储)
|
||||
if self._batch_storage_enabled:
|
||||
self.add_action_to_batch(
|
||||
action_name="reply",
|
||||
action_data={"reply_text": reply_text},
|
||||
thinking_id=thinking_id or "",
|
||||
action_done=True,
|
||||
action_build_into_prompt=False,
|
||||
action_prompt_display=action_prompt_display
|
||||
)
|
||||
else:
|
||||
await database_api.store_action_info(
|
||||
chat_stream=chat_stream,
|
||||
action_build_into_prompt=False,
|
||||
action_prompt_display=action_prompt_display,
|
||||
action_done=True,
|
||||
thinking_id=thinking_id,
|
||||
action_data={"reply_text": reply_text},
|
||||
action_name="reply",
|
||||
)
|
||||
|
||||
# 构建循环信息
|
||||
loop_info: dict[str, Any] = {
|
||||
@@ -579,3 +603,71 @@ class ChatterActionManager:
|
||||
)
|
||||
|
||||
return reply_text
|
||||
|
||||
def enable_batch_storage(self, chat_id: str):
|
||||
"""启用批量存储模式"""
|
||||
self._batch_storage_enabled = True
|
||||
self._current_chat_id = chat_id
|
||||
self._pending_actions.clear()
|
||||
logger.debug(f"已启用批量存储模式,chat_id: {chat_id}")
|
||||
|
||||
def disable_batch_storage(self):
|
||||
"""禁用批量存储模式"""
|
||||
self._batch_storage_enabled = False
|
||||
self._current_chat_id = None
|
||||
logger.debug("已禁用批量存储模式")
|
||||
|
||||
def add_action_to_batch(self, action_name: str, action_data: dict, thinking_id: str = "",
|
||||
action_done: bool = True, action_build_into_prompt: bool = False,
|
||||
action_prompt_display: str = ""):
|
||||
"""添加动作到批量存储列表"""
|
||||
if not self._batch_storage_enabled:
|
||||
return False
|
||||
|
||||
action_record = {
|
||||
"action_name": action_name,
|
||||
"action_data": action_data,
|
||||
"thinking_id": thinking_id,
|
||||
"action_done": action_done,
|
||||
"action_build_into_prompt": action_build_into_prompt,
|
||||
"action_prompt_display": action_prompt_display,
|
||||
"timestamp": time.time()
|
||||
}
|
||||
self._pending_actions.append(action_record)
|
||||
logger.debug(f"已添加动作到批量存储列表: {action_name} (当前待处理: {len(self._pending_actions)} 个)")
|
||||
return True
|
||||
|
||||
async def flush_batch_storage(self, chat_stream):
|
||||
"""批量存储所有待处理的动作记录"""
|
||||
if not self._pending_actions:
|
||||
logger.debug("没有待处理的动作需要批量存储")
|
||||
return
|
||||
|
||||
try:
|
||||
logger.info(f"开始批量存储 {len(self._pending_actions)} 个动作记录")
|
||||
|
||||
# 批量存储所有动作
|
||||
stored_count = 0
|
||||
for action_data in self._pending_actions:
|
||||
try:
|
||||
result = await database_api.store_action_info(
|
||||
chat_stream=chat_stream,
|
||||
action_name=action_data.get("action_name", ""),
|
||||
action_data=action_data.get("action_data", {}),
|
||||
action_done=action_data.get("action_done", True),
|
||||
action_build_into_prompt=action_data.get("action_build_into_prompt", False),
|
||||
action_prompt_display=action_data.get("action_prompt_display", ""),
|
||||
thinking_id=action_data.get("thinking_id", "")
|
||||
)
|
||||
if result:
|
||||
stored_count += 1
|
||||
except Exception as e:
|
||||
logger.error(f"存储单个动作记录失败: {e}")
|
||||
|
||||
logger.info(f"批量存储完成: 成功存储 {stored_count}/{len(self._pending_actions)} 个动作记录")
|
||||
|
||||
# 清空待处理列表
|
||||
self._pending_actions.clear()
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"批量存储动作记录时发生错误: {e}")
|
||||
|
||||
Reference in New Issue
Block a user