refactor(bot): 使用统一方法转换消息为数据库对象,简化代码逻辑

This commit is contained in:
Windpicker-owo
2025-10-31 16:02:14 +08:00
parent 155b6e9d04
commit 50260818a8
3 changed files with 136 additions and 305 deletions

View File

@@ -445,93 +445,11 @@ class ChatBot:
# 处理notice消息
notice_handled = await self.handle_notice_message(message)
if notice_handled:
# notice消息已处理需要先添加到message_manager再存储
# notice消息已处理使用统一的转换方法
try:
import time
from src.common.data_models.database_data_model import DatabaseMessages
message_info = message.message_info
msg_user_info = getattr(message_info, "user_info", None)
stream_user_info = getattr(message.chat_stream, "user_info", None)
group_info = getattr(message.chat_stream, "group_info", None)
message_id = message_info.message_id or ""
message_time = message_info.time if message_info.time is not None else time.time()
user_id = ""
user_nickname = ""
user_cardname = None
user_platform = ""
if msg_user_info:
user_id = str(getattr(msg_user_info, "user_id", "") or "")
user_nickname = getattr(msg_user_info, "user_nickname", "") or ""
user_cardname = getattr(msg_user_info, "user_cardname", None)
user_platform = getattr(msg_user_info, "platform", "") or ""
elif stream_user_info:
user_id = str(getattr(stream_user_info, "user_id", "") or "")
user_nickname = getattr(stream_user_info, "user_nickname", "") or ""
user_cardname = getattr(stream_user_info, "user_cardname", None)
user_platform = getattr(stream_user_info, "platform", "") or ""
chat_user_id = str(getattr(stream_user_info, "user_id", "") or "")
chat_user_nickname = getattr(stream_user_info, "user_nickname", "") or ""
chat_user_cardname = getattr(stream_user_info, "user_cardname", None)
chat_user_platform = getattr(stream_user_info, "platform", "") or ""
group_id = getattr(group_info, "group_id", None)
group_name = getattr(group_info, "group_name", None)
group_platform = getattr(group_info, "platform", None)
# 构建additional_config确保包含is_notice标志
import json
additional_config_dict = {
"is_notice": True,
"notice_type": message.notice_type or "unknown",
"is_public_notice": bool(message.is_public_notice),
}
# 如果message_info有additional_config合并进来
if hasattr(message_info, "additional_config") and message_info.additional_config:
if isinstance(message_info.additional_config, dict):
additional_config_dict.update(message_info.additional_config)
elif isinstance(message_info.additional_config, str):
try:
existing_config = json.loads(message_info.additional_config)
additional_config_dict.update(existing_config)
except Exception:
pass
additional_config_json = json.dumps(additional_config_dict)
# 创建数据库消息对象
db_message = DatabaseMessages(
message_id=message_id,
time=float(message_time),
chat_id=message.chat_stream.stream_id,
processed_plain_text=message.processed_plain_text,
display_message=message.processed_plain_text,
is_notify=bool(message.is_notify),
is_public_notice=bool(message.is_public_notice),
notice_type=message.notice_type,
additional_config=additional_config_json,
user_id=user_id,
user_nickname=user_nickname,
user_cardname=user_cardname,
user_platform=user_platform,
chat_info_stream_id=message.chat_stream.stream_id,
chat_info_platform=message.chat_stream.platform,
chat_info_create_time=float(message.chat_stream.create_time),
chat_info_last_active_time=float(message.chat_stream.last_active_time),
chat_info_user_id=chat_user_id,
chat_info_user_nickname=chat_user_nickname,
chat_info_user_cardname=chat_user_cardname,
chat_info_user_platform=chat_user_platform,
chat_info_group_id=group_id,
chat_info_group_name=group_name,
chat_info_group_platform=group_platform,
)
# 直接转换为 DatabaseMessages
db_message = message.to_database_message()
# 添加到message_manager这会将notice添加到全局notice管理器
await message_manager.add_message(message.chat_stream.stream_id, db_message)
logger.info(f"✅ Notice消息已添加到message_manager: type={message.notice_type}, stream={message.chat_stream.stream_id}")
@@ -589,125 +507,11 @@ class ChatBot:
template_group_name = None
async def preprocess():
import time
from src.common.data_models.database_data_model import DatabaseMessages
message_info = message.message_info
msg_user_info = getattr(message_info, "user_info", None)
stream_user_info = getattr(message.chat_stream, "user_info", None)
# 使用统一的转换方法创建数据库消息对象
db_message = message.to_database_message()
group_info = getattr(message.chat_stream, "group_info", None)
message_id = message_info.message_id or ""
message_time = message_info.time if hasattr(message_info, "time") and message_info.time is not None else time.time()
is_mentioned = None
if isinstance(message.is_mentioned, bool):
is_mentioned = message.is_mentioned
elif isinstance(message.is_mentioned, int | float):
is_mentioned = message.is_mentioned != 0
user_id = ""
user_nickname = ""
user_cardname = None
user_platform = ""
if msg_user_info:
user_id = str(getattr(msg_user_info, "user_id", "") or "")
user_nickname = getattr(msg_user_info, "user_nickname", "") or ""
user_cardname = getattr(msg_user_info, "user_cardname", None)
user_platform = getattr(msg_user_info, "platform", "") or ""
elif stream_user_info:
user_id = str(getattr(stream_user_info, "user_id", "") or "")
user_nickname = getattr(stream_user_info, "user_nickname", "") or ""
user_cardname = getattr(stream_user_info, "user_cardname", None)
user_platform = getattr(stream_user_info, "platform", "") or ""
chat_user_id = str(getattr(stream_user_info, "user_id", "") or "")
chat_user_nickname = getattr(stream_user_info, "user_nickname", "") or ""
chat_user_cardname = getattr(stream_user_info, "user_cardname", None)
chat_user_platform = getattr(stream_user_info, "platform", "") or ""
group_id = getattr(group_info, "group_id", None)
group_name = getattr(group_info, "group_name", None)
group_platform = getattr(group_info, "platform", None)
# 准备 additional_config将 format_info 嵌入其中
additional_config_str = None
try:
import orjson
additional_config_data = {}
# 首先获取adapter传递的additional_config
if hasattr(message_info, 'additional_config') and message_info.additional_config:
if isinstance(message_info.additional_config, dict):
additional_config_data = message_info.additional_config.copy()
elif isinstance(message_info.additional_config, str):
try:
additional_config_data = orjson.loads(message_info.additional_config)
except Exception as e:
logger.warning(f"无法解析 additional_config JSON: {e}")
additional_config_data = {}
# 然后添加format_info到additional_config中
if hasattr(message_info, 'format_info') and message_info.format_info:
try:
format_info_dict = message_info.format_info.to_dict()
additional_config_data["format_info"] = format_info_dict
logger.debug(f"[bot.py] 嵌入 format_info 到 additional_config: {format_info_dict}")
except Exception as e:
logger.warning(f"将 format_info 转换为字典失败: {e}")
else:
logger.warning(f"[bot.py] [问题] 消息缺少 format_info: message_id={message_id}")
# 序列化为JSON字符串
if additional_config_data:
additional_config_str = orjson.dumps(additional_config_data).decode("utf-8")
except Exception as e:
logger.error(f"准备 additional_config 失败: {e}")
# 创建数据库消息对象
db_message = DatabaseMessages(
message_id=message_id,
time=float(message_time),
chat_id=message.chat_stream.stream_id,
processed_plain_text=message.processed_plain_text,
display_message=message.processed_plain_text,
is_mentioned=is_mentioned,
is_at=bool(message.is_at) if message.is_at is not None else None,
is_emoji=bool(message.is_emoji),
is_picid=bool(message.is_picid),
is_command=bool(message.is_command),
is_notify=bool(message.is_notify),
is_public_notice=bool(message.is_public_notice),
notice_type=message.notice_type,
additional_config=additional_config_str,
user_id=user_id,
user_nickname=user_nickname,
user_cardname=user_cardname,
user_platform=user_platform,
chat_info_stream_id=message.chat_stream.stream_id,
chat_info_platform=message.chat_stream.platform,
chat_info_create_time=float(message.chat_stream.create_time),
chat_info_last_active_time=float(message.chat_stream.last_active_time),
chat_info_user_id=chat_user_id,
chat_info_user_nickname=chat_user_nickname,
chat_info_user_cardname=chat_user_cardname,
chat_info_user_platform=chat_user_platform,
chat_info_group_id=group_id,
chat_info_group_name=group_name,
chat_info_group_platform=group_platform,
)
# 兼容历史逻辑:显式设置群聊相关属性,便于后续逻辑通过 hasattr 判断
if group_info:
setattr(db_message, "chat_info_group_id", group_id)
setattr(db_message, "chat_info_group_name", group_name)
setattr(db_message, "chat_info_group_platform", group_platform)
else:
setattr(db_message, "chat_info_group_id", None)
setattr(db_message, "chat_info_group_name", None)
setattr(db_message, "chat_info_group_platform", None)
# 先交给消息管理器处理,计算兴趣度等衍生数据
try:
# 在将消息添加到管理器之前进行最终的静默检查

View File

@@ -144,6 +144,134 @@ class MessageRecv(Message):
def update_chat_stream(self, chat_stream: "ChatStream"):
self.chat_stream = chat_stream
def to_database_message(self) -> "DatabaseMessages":
"""将 MessageRecv 转换为 DatabaseMessages 对象
Returns:
DatabaseMessages: 数据库消息对象
"""
from src.common.data_models.database_data_model import DatabaseMessages
import json
import time
message_info = self.message_info
msg_user_info = getattr(message_info, "user_info", None)
stream_user_info = getattr(self.chat_stream, "user_info", None) if self.chat_stream else None
group_info = getattr(self.chat_stream, "group_info", None) if self.chat_stream else None
message_id = message_info.message_id or ""
message_time = message_info.time if hasattr(message_info, "time") and message_info.time is not None else time.time()
is_mentioned = None
if isinstance(self.is_mentioned, bool):
is_mentioned = self.is_mentioned
elif isinstance(self.is_mentioned, int | float):
is_mentioned = self.is_mentioned != 0
# 提取用户信息
user_id = ""
user_nickname = ""
user_cardname = None
user_platform = ""
if msg_user_info:
user_id = str(getattr(msg_user_info, "user_id", "") or "")
user_nickname = getattr(msg_user_info, "user_nickname", "") or ""
user_cardname = getattr(msg_user_info, "user_cardname", None)
user_platform = getattr(msg_user_info, "platform", "") or ""
elif stream_user_info:
user_id = str(getattr(stream_user_info, "user_id", "") or "")
user_nickname = getattr(stream_user_info, "user_nickname", "") or ""
user_cardname = getattr(stream_user_info, "user_cardname", None)
user_platform = getattr(stream_user_info, "platform", "") or ""
# 提取聊天流信息
chat_user_id = str(getattr(stream_user_info, "user_id", "") or "") if stream_user_info else ""
chat_user_nickname = getattr(stream_user_info, "user_nickname", "") or "" if stream_user_info else ""
chat_user_cardname = getattr(stream_user_info, "user_cardname", None) if stream_user_info else None
chat_user_platform = getattr(stream_user_info, "platform", "") or "" if stream_user_info else ""
group_id = getattr(group_info, "group_id", None) if group_info else None
group_name = getattr(group_info, "group_name", None) if group_info else None
group_platform = getattr(group_info, "platform", None) if group_info else None
# 准备 additional_config
additional_config_str = None
try:
import orjson
additional_config_data = {}
# 首先获取adapter传递的additional_config
if hasattr(message_info, 'additional_config') and message_info.additional_config:
if isinstance(message_info.additional_config, dict):
additional_config_data = message_info.additional_config.copy()
elif isinstance(message_info.additional_config, str):
try:
additional_config_data = orjson.loads(message_info.additional_config)
except Exception as e:
logger.warning(f"无法解析 additional_config JSON: {e}")
additional_config_data = {}
# 添加notice相关标志
if self.is_notify:
additional_config_data["is_notice"] = True
additional_config_data["notice_type"] = self.notice_type or "unknown"
additional_config_data["is_public_notice"] = bool(self.is_public_notice)
# 添加format_info到additional_config中
if hasattr(message_info, 'format_info') and message_info.format_info:
try:
format_info_dict = message_info.format_info.to_dict()
additional_config_data["format_info"] = format_info_dict
logger.debug(f"[message.py] 嵌入 format_info 到 additional_config: {format_info_dict}")
except Exception as e:
logger.warning(f"将 format_info 转换为字典失败: {e}")
# 序列化为JSON字符串
if additional_config_data:
additional_config_str = orjson.dumps(additional_config_data).decode("utf-8")
except Exception as e:
logger.error(f"准备 additional_config 失败: {e}")
# 创建数据库消息对象
db_message = DatabaseMessages(
message_id=message_id,
time=float(message_time),
chat_id=self.chat_stream.stream_id if self.chat_stream else "",
processed_plain_text=self.processed_plain_text,
display_message=self.processed_plain_text,
is_mentioned=is_mentioned,
is_at=bool(self.is_at) if self.is_at is not None else None,
is_emoji=bool(self.is_emoji),
is_picid=bool(self.is_picid),
is_command=bool(self.is_command),
is_notify=bool(self.is_notify),
is_public_notice=bool(self.is_public_notice),
notice_type=self.notice_type,
additional_config=additional_config_str,
user_id=user_id,
user_nickname=user_nickname,
user_cardname=user_cardname,
user_platform=user_platform,
chat_info_stream_id=self.chat_stream.stream_id if self.chat_stream else "",
chat_info_platform=self.chat_stream.platform if self.chat_stream else "",
chat_info_create_time=float(self.chat_stream.create_time) if self.chat_stream else 0.0,
chat_info_last_active_time=float(self.chat_stream.last_active_time) if self.chat_stream else 0.0,
chat_info_user_id=chat_user_id,
chat_info_user_nickname=chat_user_nickname,
chat_info_user_cardname=chat_user_cardname,
chat_info_user_platform=chat_user_platform,
chat_info_group_id=group_id,
chat_info_group_name=group_name,
chat_info_group_platform=group_platform,
)
# 同步兴趣度等衍生属性
db_message.interest_value = getattr(self, "interest_value", 0.0)
setattr(db_message, "should_reply", getattr(self, "should_reply", False))
setattr(db_message, "should_act", getattr(self, "should_act", False))
return db_message
async def process(self) -> None:
"""处理消息内容,生成纯文本和详细文本
@@ -479,64 +607,9 @@ class MessageSending(MessageProcessBase):
return self.message_info.group_info is None or self.message_info.group_info.group_id is None
@dataclass
class MessageSet:
"""消息集合类,可以存储多个发送消息"""
def __init__(self, chat_stream: "ChatStream", message_id: str):
self.chat_stream = chat_stream
self.message_id = message_id
self.messages: list[MessageSending] = []
self.time = round(time.time(), 3) # 保留3位小数
def add_message(self, message: MessageSending) -> None:
"""添加消息到集合"""
if not isinstance(message, MessageSending):
raise TypeError("MessageSet只能添加MessageSending类型的消息")
self.messages.append(message)
self.messages.sort(key=lambda x: x.message_info.time) # type: ignore
def get_message_by_index(self, index: int) -> MessageSending | None:
"""通过索引获取消息"""
return self.messages[index] if 0 <= index < len(self.messages) else None
def get_message_by_time(self, target_time: float) -> MessageSending | None:
"""获取最接近指定时间的消息"""
if not self.messages:
return None
left, right = 0, len(self.messages) - 1
while left < right:
mid = (left + right) // 2
if self.messages[mid].message_info.time < target_time: # type: ignore
left = mid + 1
else:
right = mid
return self.messages[left]
def clear_messages(self) -> None:
"""清空所有消息"""
self.messages.clear()
def remove_message(self, message: MessageSending) -> bool:
"""移除指定消息"""
if message in self.messages:
self.messages.remove(message)
return True
return False
def __str__(self) -> str:
return f"MessageSet(id={self.message_id}, count={len(self.messages)})"
def __len__(self) -> int:
return len(self.messages)
def message_recv_from_dict(message_dict: dict) -> MessageRecv:
return MessageRecv(message_dict)
def message_from_db_dict(db_dict: dict) -> MessageRecv:
"""从数据库字典创建MessageRecv实例"""
# 转换扁平的数据库字典为嵌套结构

View File

@@ -32,10 +32,8 @@ from src.common.logger import get_logger
from src.config.config import global_config, model_config
from src.individuality.individuality import get_individuality
from src.llm_models.utils_model import LLMRequest
from src.mais4u.mai_think import mai_thinking_manager
# 旧记忆系统已被移除
# 旧记忆系统已被移除
from src.mood.mood_manager import mood_manager
from src.person_info.person_info import get_person_info_manager
from src.plugin_system.apis import llm_api
@@ -1161,50 +1159,6 @@ class DefaultReplyer:
return interest_scores
def build_mai_think_context(
self,
chat_id: str,
memory_block: str,
relation_info: str,
time_block: str,
chat_target_1: str,
chat_target_2: str,
mood_prompt: str,
identity_block: str,
sender: str,
target: str,
chat_info: str,
) -> Any:
"""构建 mai_think 上下文信息
Args:
chat_id: 聊天ID
memory_block: 记忆块内容
relation_info: 关系信息
time_block: 时间块内容
chat_target_1: 聊天目标1
chat_target_2: 聊天目标2
mood_prompt: 情绪提示
identity_block: 身份块内容
sender: 发送者名称
target: 目标消息内容
chat_info: 聊天信息
Returns:
Any: mai_think 实例
"""
mai_think = mai_thinking_manager.get_mai_think(chat_id)
mai_think.memory_block = memory_block
mai_think.relation_info_block = relation_info
mai_think.time_block = time_block
mai_think.chat_target = chat_target_1
mai_think.chat_target_2 = chat_target_2
mai_think.chat_info = chat_info
mai_think.mood_state = mood_prompt
mai_think.identity = identity_block
mai_think.sender = sender
mai_think.target = target
return mai_think
async def build_prompt_reply_context(
self,