重构消息存储逻辑,使用Peewee模型存储消息和撤回消息,添加时间戳处理

This commit is contained in:
墨梓柒
2025-05-15 10:24:25 +08:00
parent 224c1e3fb7
commit 2be0130d23
3 changed files with 77 additions and 33 deletions

View File

@@ -1,9 +1,10 @@
import re import re
from typing import Union from typing import Union
from ...common.database.database import db # from ...common.database.database import db # db is now Peewee's SqliteDatabase instance
from .message import MessageSending, MessageRecv from .message import MessageSending, MessageRecv
from .chat_stream import ChatStream from .chat_stream import ChatStream
from ...common.database.database_model import Messages, RecalledMessages # Import Peewee models
from src.common.logger import get_module_logger from src.common.logger import get_module_logger
logger = get_module_logger("message_storage") logger = get_module_logger("message_storage")
@@ -29,42 +30,65 @@ class MessageStorage:
else: else:
filtered_detailed_plain_text = "" filtered_detailed_plain_text = ""
message_data = { chat_info_dict = chat_stream.to_dict()
"message_id": message.message_info.message_id, user_info_dict = message.message_info.user_info.to_dict()
"time": message.message_info.time,
"chat_id": chat_stream.stream_id, # Ensure message_id is an int if the model field is IntegerField
"chat_info": chat_stream.to_dict(), try:
"user_info": message.message_info.user_info.to_dict(), msg_id = int(message.message_info.message_id)
# 使用过滤后的文本 except ValueError:
"processed_plain_text": filtered_processed_plain_text, logger.error(f"Message ID {message.message_info.message_id} is not a valid integer. Storing as 0 or consider changing model field type.")
"detailed_plain_text": filtered_detailed_plain_text, msg_id = 0 # Or handle as appropriate, e.g. skip storing, or change model field to TextField
"memorized_times": message.memorized_times,
} Messages.create(
db.messages.insert_one(message_data) message_id=msg_id,
time=float(message.message_info.time),
chat_id=chat_stream.stream_id,
# Flattened chat_info
chat_info_stream_id=chat_info_dict.get("stream_id"),
chat_info_platform=chat_info_dict.get("platform"),
chat_info_user_platform=chat_info_dict.get("user_info", {}).get("platform"),
chat_info_user_id=chat_info_dict.get("user_info", {}).get("user_id"),
chat_info_user_nickname=chat_info_dict.get("user_info", {}).get("user_nickname"),
chat_info_user_cardname=chat_info_dict.get("user_info", {}).get("user_cardname"),
chat_info_group_platform=chat_info_dict.get("group_info", {}).get("platform"),
chat_info_group_id=chat_info_dict.get("group_info", {}).get("group_id"),
chat_info_group_name=chat_info_dict.get("group_info", {}).get("group_name"),
chat_info_create_time=float(chat_info_dict.get("create_time", 0.0)),
chat_info_last_active_time=float(chat_info_dict.get("last_active_time", 0.0)),
# Flattened user_info (message sender)
user_platform=user_info_dict.get("platform"),
user_id=user_info_dict.get("user_id"),
user_nickname=user_info_dict.get("user_nickname"),
user_cardname=user_info_dict.get("user_cardname"),
# Text content
processed_plain_text=filtered_processed_plain_text,
detailed_plain_text=filtered_detailed_plain_text,
memorized_times=message.memorized_times,
)
except Exception: except Exception:
logger.exception("存储消息失败") logger.exception("存储消息失败")
@staticmethod @staticmethod
async def store_recalled_message(message_id: str, time: str, chat_stream: ChatStream) -> None: async def store_recalled_message(message_id: str, time: str, chat_stream: ChatStream) -> None:
"""存储撤回消息到数据库""" """存储撤回消息到数据库"""
if "recalled_messages" not in db.list_collection_names(): # Table creation is handled by initialize_database in database_model.py
db.create_collection("recalled_messages") try:
else: RecalledMessages.create(
try: message_id=message_id,
message_data = { time=float(time), # Assuming time is a string representing a float timestamp
"message_id": message_id, stream_id=chat_stream.stream_id,
"time": time, )
"stream_id": chat_stream.stream_id, except Exception:
} logger.exception("存储撤回消息失败")
db.recalled_messages.insert_one(message_data)
except Exception:
logger.exception("存储撤回消息失败")
@staticmethod @staticmethod
async def remove_recalled_message(time: str) -> None: async def remove_recalled_message(time: str) -> None:
"""删除撤回消息""" """删除撤回消息"""
try: try:
db.recalled_messages.delete_many({"time": {"$lt": time - 300}}) # Assuming input 'time' is a string timestamp that can be converted to float
current_time_float = float(time)
RecalledMessages.delete().where(RecalledMessages.time < (current_time_float - 300)).execute()
except Exception: except Exception:
logger.exception("删除撤回消息失败") logger.exception("删除撤回消息失败")

View File

@@ -2,6 +2,7 @@ from collections import defaultdict
from datetime import datetime, timedelta from datetime import datetime, timedelta
from typing import Any, Dict, Tuple, List from typing import Any, Dict, Tuple, List
from src.common.logger import get_module_logger from src.common.logger import get_module_logger
from src.manager.async_task_manager import AsyncTask from src.manager.async_task_manager import AsyncTask
@@ -82,8 +83,10 @@ class OnlineTimeRecordTask(AsyncTask):
else: else:
# 若没有记录,则插入新的在线时间记录 # 若没有记录,则插入新的在线时间记录
new_record = OnlineTime.create( new_record = OnlineTime.create(
timestamp=current_time.timestamp(), # 添加此行
start_timestamp=current_time, start_timestamp=current_time,
end_timestamp=extended_end_time, end_timestamp=extended_end_time,
duration=5, # 初始时长为5分钟
) )
self.record_id = new_record.id self.record_id = new_record.id
except Exception as e: except Exception as e:

View File

@@ -1,6 +1,8 @@
from peewee import Model, DoubleField, IntegerField, BooleanField, TextField, FloatField, DateTimeField from peewee import Model, DoubleField, IntegerField, BooleanField, TextField, FloatField, DateTimeField
from .database import db from .database import db
import datetime import datetime
from ..logger_manager import get_logger
logger = get_logger("database_model")
# 请在此处定义您的数据库实例。 # 请在此处定义您的数据库实例。
# 您需要取消注释并配置适合您的数据库的部分。 # 您需要取消注释并配置适合您的数据库的部分。
# 例如,对于 SQLite: # 例如,对于 SQLite:
@@ -189,7 +191,7 @@ class OnlineTime(BaseModel):
""" """
# timestamp: "$date": "2025-05-01T18:52:18.191Z" (存储为字符串) # timestamp: "$date": "2025-05-01T18:52:18.191Z" (存储为字符串)
timestamp = TextField() timestamp = TextField(default=datetime.datetime.now) # 时间戳
duration = IntegerField() # 时长,单位分钟 duration = IntegerField() # 时长,单位分钟
start_timestamp = DateTimeField(default=datetime.datetime.now) start_timestamp = DateTimeField(default=datetime.datetime.now)
end_timestamp = DateTimeField(index=True) end_timestamp = DateTimeField(index=True)
@@ -259,6 +261,19 @@ class ThinkingLog(BaseModel):
table_name = "thinking_logs" table_name = "thinking_logs"
class RecalledMessages(BaseModel):
"""
用于存储撤回消息记录的模型。
"""
message_id = TextField(index=True) # 被撤回的消息 ID
time = DoubleField() # 撤回操作发生的时间戳
stream_id = TextField() # 对应的 ChatStreams stream_id
class Meta:
table_name = "recalled_messages"
def create_tables(): def create_tables():
""" """
创建所有在模型中定义的数据库表。 创建所有在模型中定义的数据库表。
@@ -276,6 +291,7 @@ def create_tables():
PersonInfo, PersonInfo,
Knowledges, Knowledges,
ThinkingLog, ThinkingLog,
RecalledMessages, # 添加新模型
] ]
) )
@@ -295,6 +311,7 @@ def initialize_database():
PersonInfo, PersonInfo,
Knowledges, Knowledges,
ThinkingLog, ThinkingLog,
RecalledMessages, # 添加新模型
] ]
needs_creation = False needs_creation = False
@@ -302,23 +319,23 @@ def initialize_database():
with db: # 管理 table_exists 检查的连接 with db: # 管理 table_exists 检查的连接
for model in models: for model in models:
if not db.table_exists(model): if not db.table_exists(model):
print(f"'{model._meta.table_name}' 未找到。") logger.warning(f"'{model._meta.table_name}' 未找到。")
needs_creation = True needs_creation = True
break # 一个表丢失,无需进一步检查。 break # 一个表丢失,无需进一步检查。
except Exception as e: except Exception as e:
print(f"检查表是否存在时出错: {e}") logger.exception(f"检查表是否存在时出错: {e}")
# 如果检查失败(例如数据库不可用),则退出 # 如果检查失败(例如数据库不可用),则退出
return return
if needs_creation: if needs_creation:
print("正在初始化数据库:一个或多个表丢失。正在尝试创建所有定义的表...") logger.info("正在初始化数据库:一个或多个表丢失。正在尝试创建所有定义的表...")
try: try:
create_tables() # 此函数有其自己的 'with db:' 上下文管理。 create_tables() # 此函数有其自己的 'with db:' 上下文管理。
print("数据库表创建过程完成。") logger.info("数据库表创建过程完成。")
except Exception as e: except Exception as e:
print(f"创建表期间出错: {e}") logger.exception(f"创建表期间出错: {e}")
else: else:
print("所有数据库表均已存在。") logger.info("所有数据库表均已存在。")
# 模块加载时调用初始化函数 # 模块加载时调用初始化函数