diff --git a/src/plugins/PFC/action_planner.py b/src/plugins/PFC/action_planner.py index 6eb67b7d4..372474ac0 100644 --- a/src/plugins/PFC/action_planner.py +++ b/src/plugins/PFC/action_planner.py @@ -66,14 +66,14 @@ class ActionPlanner: chat_history_list = observation_info.chat_history chat_history_text = "" for msg in chat_history_list: - chat_history_text += f"{msg}\n" + chat_history_text += f"{msg.get('detailed_plain_text', '')}\n" if observation_info.new_messages_count > 0: new_messages_list = observation_info.unprocessed_messages chat_history_text += f"有{observation_info.new_messages_count}条新消息:\n" for msg in new_messages_list: - chat_history_text += f"{msg}\n" + chat_history_text += f"{msg.get('detailed_plain_text', '')}\n" observation_info.clear_unprocessed_messages() diff --git a/src/plugins/PFC/chat_observer.py b/src/plugins/PFC/chat_observer.py index b9f704917..a766e3b4a 100644 --- a/src/plugins/PFC/chat_observer.py +++ b/src/plugins/PFC/chat_observer.py @@ -18,38 +18,36 @@ class ChatObserver: _instances: Dict[str, "ChatObserver"] = {} @classmethod - def get_instance(cls, stream_id: str, message_storage: Optional[MessageStorage] = None) -> "ChatObserver": + def get_instance(cls, stream_id: str) -> "ChatObserver": """获取或创建观察器实例 Args: stream_id: 聊天流ID - message_storage: 消息存储实现,如果为None则使用MongoDB实现 Returns: ChatObserver: 观察器实例 """ if stream_id not in cls._instances: - cls._instances[stream_id] = cls(stream_id, message_storage) + cls._instances[stream_id] = cls(stream_id) return cls._instances[stream_id] - def __init__(self, stream_id: str, message_storage: Optional[MessageStorage] = None): + def __init__(self, stream_id: str): """初始化观察器 Args: stream_id: 聊天流ID - message_storage: 消息存储实现,如果为None则使用MongoDB实现 """ if stream_id in self._instances: raise RuntimeError(f"ChatObserver for {stream_id} already exists. Use get_instance() instead.") self.stream_id = stream_id - self.message_storage = message_storage or MongoDBMessageStorage() + self.message_storage = MongoDBMessageStorage() # self.last_user_speak_time: Optional[float] = None # 对方上次发言时间 # self.last_bot_speak_time: Optional[float] = None # 机器人上次发言时间 # self.last_check_time: float = time.time() # 上次查看聊天记录时间 - self.last_message_read: Optional[str] = None # 最后读取的消息ID - self.last_message_time: Optional[float] = None # 最后一条消息的时间戳 + self.last_message_read: Optional[Dict[str, Any]] = None # 最后读取的消息ID + self.last_message_time: float = time.time() self.waiting_start_time: float = time.time() # 等待开始时间,初始化为当前时间 @@ -133,12 +131,6 @@ class ChatObserver: notification = create_cold_chat_notification(sender="chat_observer", target="pfc", is_cold=is_cold) await self.notification_manager.send_notification(notification) - async def get_new_message(self) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]: - """获取上一次观察的时间点后的新消息,插入到历史记录中,并返回新消息和历史记录两个对象""" - messages = await self.message_storage.get_messages_after(self.stream_id, self.last_message_read) - for message in messages: - await self._add_message_to_history(message) - return messages, self.message_history def new_message_after(self, time_point: float) -> bool: """判断是否在指定时间点后有新消息 @@ -200,12 +192,13 @@ class ChatObserver: Returns: List[Dict[str, Any]]: 新消息列表 """ - new_messages = await self.message_storage.get_messages_after(self.stream_id, self.last_message_read) + new_messages = await self.message_storage.get_messages_after(self.stream_id, self.last_message_time) if new_messages: - self.last_message_read = new_messages[-1]["message_id"] + self.last_message_read = new_messages[-1] + self.last_message_time = new_messages[-1]["time"] - print(f"获取111111111122222222新消息: {new_messages}") + print(f"获取数据库中找到的新消息: {new_messages}") return new_messages @@ -267,6 +260,7 @@ class ChatObserver: except Exception as e: logger.error(f"更新循环出错: {e}") + logger.error(traceback.format_exc()) self._update_complete.set() # 即使出错也要设置完成事件 def trigger_update(self): diff --git a/src/plugins/PFC/message_storage.py b/src/plugins/PFC/message_storage.py index 88f409641..75bab6edd 100644 --- a/src/plugins/PFC/message_storage.py +++ b/src/plugins/PFC/message_storage.py @@ -1,18 +1,18 @@ from abc import ABC, abstractmethod from typing import List, Dict, Any, Optional from src.common.database import db - +import time class MessageStorage(ABC): """消息存储接口""" @abstractmethod - async def get_messages_after(self, chat_id: str, message_id: Optional[str] = None) -> List[Dict[str, Any]]: + async def get_messages_after(self, chat_id: str, message: Dict[str, Any]) -> List[Dict[str, Any]]: """获取指定消息ID之后的所有消息 Args: chat_id: 聊天ID - message_id: 消息ID,如果为None则获取所有消息 + message: 消息 Returns: List[Dict[str, Any]]: 消息列表 @@ -53,14 +53,11 @@ class MongoDBMessageStorage(MessageStorage): def __init__(self): self.db = db - async def get_messages_after(self, chat_id: str, message_id: Optional[str] = None) -> List[Dict[str, Any]]: + async def get_messages_after(self, chat_id: str, message_time: float) -> List[Dict[str, Any]]: query = {"chat_id": chat_id} + print(f"storage_check_message: {message_time}") - if message_id: - # 获取ID大于message_id的消息 - last_message = self.db.messages.find_one({"message_id": message_id}) - if last_message: - query["time"] = {"$gt": last_message["time"]} + query["time"] = {"$gt": message_time} return list(self.db.messages.find(query).sort("time", 1)) diff --git a/src/plugins/PFC/observation_info.py b/src/plugins/PFC/observation_info.py index 947c3205d..01f619dc3 100644 --- a/src/plugins/PFC/observation_info.py +++ b/src/plugins/PFC/observation_info.py @@ -132,11 +132,6 @@ class ObservationInfo: stream_id: 聊天流ID """ self.chat_observer = chat_observer - print(f"1919810----------------------绑定-----------------------------") - print(self.chat_observer) - print(f"1919810--------------------绑定-----------------------------") - print(self.chat_observer.notification_manager) - print(f"1919810-------------------绑定-----------------------------") self.chat_observer.notification_manager.register_handler( target="observation_info", notification_type=NotificationType.NEW_MESSAGE, handler=self.handler ) @@ -144,9 +139,6 @@ class ObservationInfo: target="observation_info", notification_type=NotificationType.COLD_CHAT, handler=self.handler ) print("1919810------------------------绑定-----------------------------") - print(f"1919810--------------------绑定-----------------------------") - print(self.chat_observer.notification_manager) - print(f"1919810-------------------绑定-----------------------------") def unbind_from_chat_observer(self): """解除与chat_observer的绑定""" @@ -235,10 +227,10 @@ class ObservationInfo: """清空未处理消息列表""" # 将未处理消息添加到历史记录中 for message in self.unprocessed_messages: - if "processed_plain_text" in message: - self.chat_history.append(message["processed_plain_text"]) + self.chat_history.append(message) # 清空未处理消息列表 self.has_unread_messages = False self.unprocessed_messages.clear() + self.chat_history_count = len(self.chat_history) self.new_messages_count = 0