diff --git a/src/plugins/chat/__init__.py b/src/plugins/chat/__init__.py index 6dde80d24..6462d7e2f 100644 --- a/src/plugins/chat/__init__.py +++ b/src/plugins/chat/__init__.py @@ -20,7 +20,7 @@ from .chat_stream import chat_manager from ..memory_system.memory import hippocampus, memory_graph from .bot import ChatBot from .message_sender import message_manager, message_sender - +from .storage import MessageStorage # 创建LLM统计实例 llm_stats = LLMStatistics("llm_statistics.txt") @@ -148,3 +148,10 @@ async def generate_schedule_task(): await bot_schedule.initialize() if not bot_schedule.enable_output: bot_schedule.print_schedule() +async def remove_recalled_message(self) -> None: + """删除撤回消息""" + try: + self.storage = MessageStorage() + self.storage.remove_recalled_message(time.time()) + except Exception: + logger.exception("删除撤回消息失败") \ No newline at end of file diff --git a/src/plugins/chat/bot.py b/src/plugins/chat/bot.py index 4cd5043b4..bd3cd3fd3 100644 --- a/src/plugins/chat/bot.py +++ b/src/plugins/chat/bot.py @@ -8,6 +8,9 @@ from nonebot.adapters.onebot.v11 import ( PrivateMessageEvent, NoticeEvent, PokeNotifyEvent, + GroupRecallNoticeEvent, + FriendRecallNoticeEvent, + ) from ..memory_system.memory import hippocampus @@ -114,6 +117,36 @@ class ChatBot: is_emoji=False, ) message_manager.add_message(bot_message) + + if isinstance(event, GroupRecallNoticeEvent) or isinstance(event, FriendRecallNoticeEvent): + user_info = UserInfo( + user_id=event.user_id, + user_nickname=get_user_nickname(event.user_id) or None, + user_cardname=get_user_cardname(event.user_id) or None, + platform="qq", + ) + + message_cq = MessageRecvCQ( + message_id=None, + user_info=user_info, + raw_message=str("[撤回了一条消息]"), + group_info=None, + reply_message=None, + platform="qq", + ) + message_json = message_cq.to_dict() + + group_info = GroupInfo(group_id=event.group_id, group_name=None, platform="qq") + + chat = await chat_manager.get_or_create_stream( + platform=user_info.platform, user_info=user_info, group_info=group_info + ) + + await self.storage.store_recalled_message(event.message_id, time.time(), chat) + message=MessageRecv(message_json) + message.update_chat_stream(chat) + await message.process() + async def handle_message(self, event: MessageEvent, bot: Bot) -> None: """处理收到的消息""" diff --git a/src/plugins/chat/message_sender.py b/src/plugins/chat/message_sender.py index 5b580f244..1ff081bd8 100644 --- a/src/plugins/chat/message_sender.py +++ b/src/plugins/chat/message_sender.py @@ -4,7 +4,7 @@ from typing import Dict, List, Optional, Union from loguru import logger from nonebot.adapters.onebot.v11 import Bot - +from ...common.database import db from .message_cq import MessageSendCQ from .message import MessageSending, MessageThinking, MessageRecv, MessageSet @@ -36,10 +36,7 @@ class Message_Sender: message_send = MessageSendCQ(data=message_json) # logger.debug(message_send.message_info,message_send.raw_message) message_preview = truncate_message(message.processed_plain_text) - if ( - message_send.message_info.group_info - and message_send.message_info.group_info.group_id - ): + if message_send.message_info.group_info and message_send.message_info.group_info.group_id: try: await self._current_bot.send_group_msg( group_id=message.message_info.group_info.group_id, @@ -74,6 +71,23 @@ class MessageContainer: self.last_send_time = 0 self.thinking_timeout = 20 # 思考超时时间(秒) + def get_recalled_messages(self) -> List[MessageSending]: + """获取所有撤回的Message_Sending对象""" + recalled_messages = [] + + for msg in self.messages: + if isinstance(msg, MessageSending): + # 检查是否撤回,对应stream_id和message_id + if ( + db.chat_streams.find({"stream_id": msg.chat_stream.stream_id}, {"message_id": msg.message_info.message_id}) + is not None + ): + recalled_messages.append(msg) + + # 按thinking_start_time排序,时间早的在前面 + recalled_messages.sort(key=lambda x: x.thinking_start_time) + return recalled_messages + def get_timeout_messages(self) -> List[MessageSending]: """获取所有超时的Message_Sending对象(思考时间超过30秒),按thinking_start_time排序""" current_time = time.time() @@ -144,9 +158,7 @@ class MessageManager: self.containers[chat_id] = MessageContainer(chat_id) return self.containers[chat_id] - def add_message( - self, message: Union[MessageThinking, MessageSending, MessageSet] - ) -> None: + def add_message(self, message: Union[MessageThinking, MessageSending, MessageSet]) -> None: chat_stream = message.chat_stream if not chat_stream: raise ValueError("无法找到对应的聊天流") @@ -173,8 +185,20 @@ class MessageManager: if thinking_time > global_config.thinking_timeout: logger.warning(f"消息思考超时({thinking_time}秒),移除该消息") container.remove_message(message_earliest) - else: + # 检查消息是否被撤回 + recalled_messages = container.get_recalled_messages() + recalled_message_ids = [msg.message_id for msg in recalled_messages] + recalled_messages_stream_id = [msg.chat_stream.stream_id for msg in recalled_messages] + + if ( + message_earliest.message_info.message_id in recalled_message_ids + and message_earliest.chat_stream.stream_id in recalled_messages_stream_id + ): + logger.info(f"消息已被撤回,移除该消息: {message_earliest.message_id}") + container.remove_message(message_earliest) + + else: if ( message_earliest.is_head and message_earliest.update_thinking_time() > 30 @@ -189,9 +213,7 @@ class MessageManager: f"\033[1;34m[调试]\033[0m 消息“{truncate_message(message_earliest.processed_plain_text)}”正在发送中" ) - await self.storage.store_message( - message_earliest, message_earliest.chat_stream, None - ) + await self.storage.store_message(message_earliest, message_earliest.chat_stream, None) container.remove_message(message_earliest) diff --git a/src/plugins/chat/storage.py b/src/plugins/chat/storage.py index ad6662f2b..acd7db89a 100644 --- a/src/plugins/chat/storage.py +++ b/src/plugins/chat/storage.py @@ -24,4 +24,28 @@ class MessageStorage: except Exception: logger.exception("存储消息失败") + async def store_recalled_message(self, message_id: str, time: str, chat_stream:ChatStream) -> None: + """存储撤回消息到数据库""" + if "recalled_messages" not in db.list_collection_names(): + db.create_collection("recalled_messages") + else: + try: + message_data = { + "message_id": message_id, + "time": time, + "stream_id":chat_stream.stream_id, + } + db.recalled_messages.insert_one(message_data) + except Exception: + logger.exception("存储撤回消息失败") + + async def remove_recalled_message(self, time: str) -> None: + """删除撤回消息""" + try: + for msg in db.recalled_messages.distinct("message_id", {"time": time}): + if msg.time < (time-300): + db.recalled_messages.delete_one({"message_id": msg.message + }) + except Exception: + logger.exception("删除撤回消息失败") # 如果需要其他存储相关的函数,可以在这里添加