diff --git a/src/plugins/PFC/conversation.py b/src/plugins/PFC/conversation.py index 756e01e55..d7ba00c8d 100644 --- a/src/plugins/PFC/conversation.py +++ b/src/plugins/PFC/conversation.py @@ -70,7 +70,7 @@ class Conversation: self.chat_observer = ChatObserver.get_instance(self.stream_id, self.private_name) self.chat_observer.start() self.observation_info = ObservationInfo(self.private_name) - self.observation_info.bind_to_chat_observer(self.chat_observer, self.private_name) + self.observation_info.bind_to_chat_observer(self.chat_observer) # print(self.chat_observer.get_cached_messages(limit=) self.conversation_info = ConversationInfo() diff --git a/src/plugins/PFC/observation_info.py b/src/plugins/PFC/observation_info.py index 92b09cc0c..29cc870ff 100644 --- a/src/plugins/PFC/observation_info.py +++ b/src/plugins/PFC/observation_info.py @@ -1,13 +1,12 @@ -# Programmable Friendly Conversationalist -# Prefrontal cortex from typing import List, Optional, Dict, Any, Set from maim_message import UserInfo import time from dataclasses import dataclass, field from src.common.logger import get_module_logger from .chat_observer import ChatObserver -from .chat_states import NotificationHandler, NotificationType +from .chat_states import NotificationHandler, NotificationType, Notification from src.plugins.utils.chat_message_builder import build_readable_messages +import traceback # 导入 traceback 用于调试 logger = get_module_logger("observation_info") @@ -20,183 +19,283 @@ class ObservationInfoHandler(NotificationHandler): Args: observation_info: 要更新的ObservationInfo实例 + private_name: 私聊对象的名称,用于日志记录 """ self.observation_info = observation_info + # 将 private_name 存储在 handler 实例中 self.private_name = private_name - async def handle_notification(self, notification): + async def handle_notification(self, notification: Notification): # 添加类型提示 # 获取通知类型和数据 notification_type = notification.type data = notification.data - if notification_type == NotificationType.NEW_MESSAGE: - # 处理新消息通知 - logger.debug(f"[私聊][{self.private_name}]收到新消息通知data: {data}") - message_id = data.get("message_id") - processed_plain_text = data.get("processed_plain_text") - detailed_plain_text = data.get("detailed_plain_text") - user_info = data.get("user_info") - time_value = data.get("time") + try: # 添加错误处理块 + if notification_type == NotificationType.NEW_MESSAGE: + # 处理新消息通知 + # logger.debug(f"[私聊][{self.private_name}]收到新消息通知data: {data}") # 可以在需要时取消注释 + message_id = data.get("message_id") + processed_plain_text = data.get("processed_plain_text") + detailed_plain_text = data.get("detailed_plain_text") + user_info_dict = data.get("user_info") # 先获取字典 + time_value = data.get("time") - message = { - "message_id": message_id, - "processed_plain_text": processed_plain_text, - "detailed_plain_text": detailed_plain_text, - "user_info": user_info, - "time": time_value, - } + # 确保 user_info 是字典类型再创建 UserInfo 对象 + user_info = None + if isinstance(user_info_dict, dict): + try: + user_info = UserInfo.from_dict(user_info_dict) + except Exception as e: + logger.error(f"[私聊][{self.private_name}]从字典创建 UserInfo 时出错: {e}, 字典内容: {user_info_dict}") + # 可以选择在这里返回或记录错误,避免后续代码出错 + return + elif user_info_dict is not None: + logger.warning(f"[私聊][{self.private_name}]收到的 user_info 不是预期的字典类型: {type(user_info_dict)}") + # 根据需要处理非字典情况,这里暂时返回 + return - self.observation_info.update_from_message(message) - elif notification_type == NotificationType.COLD_CHAT: - # 处理冷场通知 - is_cold = data.get("is_cold", False) - self.observation_info.update_cold_chat_status(is_cold, time.time()) + message = { + "message_id": message_id, + "processed_plain_text": processed_plain_text, + "detailed_plain_text": detailed_plain_text, + "user_info": user_info_dict, # 存储原始字典或 UserInfo 对象,取决于你的 update_from_message 如何处理 + "time": time_value, + } + # 传递 UserInfo 对象(如果成功创建)或原始字典 + await self.observation_info.update_from_message(message, user_info) # 修改:传递 user_info 对象 - elif notification_type == NotificationType.ACTIVE_CHAT: - # 处理活跃通知 - is_active = data.get("is_active", False) - self.observation_info.is_cold = not is_active - elif notification_type == NotificationType.BOT_SPEAKING: - # 处理机器人说话通知 - self.observation_info.is_typing = False - self.observation_info.last_bot_speak_time = time.time() + elif notification_type == NotificationType.COLD_CHAT: + # 处理冷场通知 + is_cold = data.get("is_cold", False) + await self.observation_info.update_cold_chat_status(is_cold, time.time()) # 修改:改为 await 调用 - elif notification_type == NotificationType.USER_SPEAKING: - # 处理用户说话通知 - self.observation_info.is_typing = False - self.observation_info.last_user_speak_time = time.time() + elif notification_type == NotificationType.ACTIVE_CHAT: + # 处理活跃通知 (通常由 COLD_CHAT 的反向状态处理) + is_active = data.get("is_active", False) + self.observation_info.is_cold = not is_active - elif notification_type == NotificationType.MESSAGE_DELETED: - # 处理消息删除通知 - message_id = data.get("message_id") - self.observation_info.unprocessed_messages = [ - msg for msg in self.observation_info.unprocessed_messages if msg.get("message_id") != message_id - ] + elif notification_type == NotificationType.BOT_SPEAKING: + # 处理机器人说话通知 (按需实现) + self.observation_info.is_typing = False + self.observation_info.last_bot_speak_time = time.time() - elif notification_type == NotificationType.USER_JOINED: - # 处理用户加入通知 - user_id = data.get("user_id") - if user_id: - self.observation_info.active_users.add(user_id) + elif notification_type == NotificationType.USER_SPEAKING: + # 处理用户说话通知 + self.observation_info.is_typing = False + self.observation_info.last_user_speak_time = time.time() - elif notification_type == NotificationType.USER_LEFT: - # 处理用户离开通知 - user_id = data.get("user_id") - if user_id: - self.observation_info.active_users.discard(user_id) + elif notification_type == NotificationType.MESSAGE_DELETED: + # 处理消息删除通知 + message_id = data.get("message_id") + # 从 unprocessed_messages 中移除被删除的消息 + original_count = len(self.observation_info.unprocessed_messages) + self.observation_info.unprocessed_messages = [ + msg for msg in self.observation_info.unprocessed_messages if msg.get("message_id") != message_id + ] + if len(self.observation_info.unprocessed_messages) < original_count: + logger.info(f"[私聊][{self.private_name}]移除了未处理的消息 (ID: {message_id})") - elif notification_type == NotificationType.ERROR: - # 处理错误通知 - error_msg = data.get("error", "") - logger.error(f"[私聊][{self.private_name}]收到错误通知: {error_msg}") + + elif notification_type == NotificationType.USER_JOINED: + # 处理用户加入通知 (如果适用私聊场景) + user_id = data.get("user_id") + if user_id: + self.observation_info.active_users.add(str(user_id)) # 确保是字符串 + + elif notification_type == NotificationType.USER_LEFT: + # 处理用户离开通知 (如果适用私聊场景) + user_id = data.get("user_id") + if user_id: + self.observation_info.active_users.discard(str(user_id)) # 确保是字符串 + + elif notification_type == NotificationType.ERROR: + # 处理错误通知 + error_msg = data.get("error", "未提供错误信息") + logger.error(f"[私聊][{self.private_name}]收到错误通知: {error_msg}") + + except Exception as e: + logger.error(f"[私聊][{self.private_name}]处理通知时发生错误: {e}") + logger.error(traceback.format_exc()) # 打印详细堆栈信息 @dataclass class ObservationInfo: """决策信息类,用于收集和管理来自chat_observer的通知信息""" + # --- 修改:添加 private_name 字段 --- + private_name: str = field(init=True) # 让 dataclass 的 __init__ 接收 private_name + # data_list - chat_history: List[str] = field(default_factory=list) + chat_history: List[Dict[str, Any]] = field(default_factory=list) # 修改:明确类型为 Dict chat_history_str: str = "" - unprocessed_messages: List[Dict[str, Any]] = field(default_factory=list) + unprocessed_messages: List[Dict[str, Any]] = field(default_factory=list) # 修改:明确类型为 Dict active_users: Set[str] = field(default_factory=set) # data last_bot_speak_time: Optional[float] = None last_user_speak_time: Optional[float] = None last_message_time: Optional[float] = None + # 添加 last_message_id + last_message_id: Optional[str] = None last_message_content: str = "" last_message_sender: Optional[str] = None bot_id: Optional[str] = None chat_history_count: int = 0 new_messages_count: int = 0 - cold_chat_duration: float = 0.0 + cold_chat_start_time: Optional[float] = None # 用于计算冷场持续时间 + cold_chat_duration: float = 0.0 # 缓存计算结果 # state - is_typing: bool = False - has_unread_messages: bool = False + is_typing: bool = False # 可能表示对方正在输入 + # has_unread_messages: bool = False # 这个状态可以通过 new_messages_count > 0 判断 is_cold_chat: bool = False - changed: bool = False + changed: bool = False # 用于标记状态是否有变化,以便外部模块决定是否重新规划 - # #spec + # #spec (暂时注释掉,如果不需要) # meta_plan_trigger: bool = False - def __post_init__(self, private_name: str = None): - """初始化后创建handler""" - self.chat_observer = None - self.handler = ObservationInfoHandler(self, private_name) - self.private_name = private_name + # --- 修改:移除 __post_init__ 的参数 --- + def __post_init__(self): + """初始化后创建handler并进行必要的设置""" + self.chat_observer: Optional[ChatObserver] = None # 添加类型提示 + self.handler = ObservationInfoHandler(self, self.private_name) - def bind_to_chat_observer(self, chat_observer: ChatObserver, private_name: str): + def bind_to_chat_observer(self, chat_observer: ChatObserver): """绑定到指定的chat_observer Args: - stream_id: 聊天流ID + chat_observer: 要绑定的 ChatObserver 实例 """ + if self.chat_observer: + logger.warning(f"[私聊][{self.private_name}]尝试重复绑定 ChatObserver") + return + self.chat_observer = chat_observer - self.chat_observer.notification_manager.register_handler( - target="observation_info", notification_type=NotificationType.NEW_MESSAGE, handler=self.handler - ) - self.chat_observer.notification_manager.register_handler( - target="observation_info", notification_type=NotificationType.COLD_CHAT, handler=self.handler - ) + try: + # 注册关心的通知类型 + self.chat_observer.notification_manager.register_handler( + target="observation_info", notification_type=NotificationType.NEW_MESSAGE, handler=self.handler + ) + self.chat_observer.notification_manager.register_handler( + target="observation_info", notification_type=NotificationType.COLD_CHAT, handler=self.handler + ) + # 可以根据需要注册更多通知类型 + # self.chat_observer.notification_manager.register_handler( + # target="observation_info", notification_type=NotificationType.MESSAGE_DELETED, handler=self.handler + # ) + logger.info(f"[私聊][{self.private_name}]成功绑定到 ChatObserver") + except Exception as e: + logger.error(f"[私聊][{self.private_name}]绑定到 ChatObserver 时出错: {e}") + self.chat_observer = None # 绑定失败,重置 def unbind_from_chat_observer(self): """解除与chat_observer的绑定""" - if self.chat_observer: - self.chat_observer.notification_manager.unregister_handler( - target="observation_info", notification_type=NotificationType.NEW_MESSAGE, handler=self.handler - ) - self.chat_observer.notification_manager.unregister_handler( - target="observation_info", notification_type=NotificationType.COLD_CHAT, handler=self.handler - ) - self.chat_observer = None + if self.chat_observer and hasattr(self.chat_observer, 'notification_manager'): # 增加检查 + try: + self.chat_observer.notification_manager.unregister_handler( + target="observation_info", notification_type=NotificationType.NEW_MESSAGE, handler=self.handler + ) + self.chat_observer.notification_manager.unregister_handler( + target="observation_info", notification_type=NotificationType.COLD_CHAT, handler=self.handler + ) + # 如果注册了其他类型,也要在这里注销 + # self.chat_observer.notification_manager.unregister_handler( + # target="observation_info", notification_type=NotificationType.MESSAGE_DELETED, handler=self.handler + # ) + logger.info(f"[私聊][{self.private_name}]成功从 ChatObserver 解绑") + except Exception as e: + logger.error(f"[私聊][{self.private_name}]从 ChatObserver 解绑时出错: {e}") + finally: # 确保 chat_observer 被重置 + self.chat_observer = None + else: + logger.warning(f"[私聊][{self.private_name}]尝试解绑时 ChatObserver 不存在或无效") - def update_from_message(self, message: Dict[str, Any]): + + # 修改:update_from_message 接收 UserInfo 对象 + async def update_from_message(self, message: Dict[str, Any], user_info: Optional[UserInfo]): """从消息更新信息 Args: - message: 消息数据 + message: 消息数据字典 + user_info: 解析后的 UserInfo 对象 (可能为 None) """ - # logger.debug(f"[私聊][{self.private_name}]更新信息from_message: {message}") - self.last_message_time = message["time"] - self.last_message_id = message["message_id"] + message_time = message.get("time") + message_id = message.get("message_id") + processed_text = message.get("processed_plain_text", "") - self.last_message_content = message.get("processed_plain_text", "") + # 只有在新消息到达时才更新 last_message 相关信息 + if message_time and message_time > (self.last_message_time or 0): + self.last_message_time = message_time + self.last_message_id = message_id + self.last_message_content = processed_text + # 重置冷场计时器 + self.is_cold_chat = False + self.cold_chat_start_time = None + self.cold_chat_duration = 0.0 - user_info = UserInfo.from_dict(message.get("user_info", {})) - self.last_message_sender = user_info.user_id - if user_info.user_id == self.bot_id: - self.last_bot_speak_time = message["time"] + if user_info: + sender_id = str(user_info.user_id) # 确保是字符串 + self.last_message_sender = sender_id + # 更新发言时间 + if sender_id == self.bot_id: + self.last_bot_speak_time = message_time + else: + self.last_user_speak_time = message_time + self.active_users.add(sender_id) # 用户发言则认为其活跃 + else: + logger.warning(f"[私聊][{self.private_name}]处理消息更新时缺少有效的 UserInfo 对象, message_id: {message_id}") + self.last_message_sender = None # 发送者未知 + + + # 将原始消息字典添加到未处理列表 + self.unprocessed_messages.append(message) + self.new_messages_count = len(self.unprocessed_messages) # 直接用列表长度 + + # logger.debug(f"[私聊][{self.private_name}]消息更新: last_time={self.last_message_time}, new_count={self.new_messages_count}") + self.update_changed() # 标记状态已改变 else: - self.last_user_speak_time = message["time"] - self.active_users.add(user_info.user_id) + # 如果消息时间戳不是最新的,可能不需要处理,或者记录一个警告 + pass + # logger.warning(f"[私聊][{self.private_name}]收到过时或无效时间戳的消息: ID={message_id}, time={message_time}") - self.new_messages_count += 1 - self.unprocessed_messages.append(message) - - self.update_changed() def update_changed(self): - """更新changed状态""" + """标记状态已改变,并重置标记""" + # logger.debug(f"[私聊][{self.private_name}]状态标记为已改变 (changed=True)") self.changed = True - def update_cold_chat_status(self, is_cold: bool, current_time: float): + + async def update_cold_chat_status(self, is_cold: bool, current_time: float): """更新冷场状态 Args: - is_cold: 是否冷场 - current_time: 当前时间 + is_cold: 是否处于冷场状态 + current_time: 当前时间戳 """ - self.is_cold_chat = is_cold - if is_cold and self.last_message_time: - self.cold_chat_duration = current_time - self.last_message_time + if is_cold != self.is_cold_chat: # 仅在状态变化时更新 + self.is_cold_chat = is_cold + if is_cold: + # 进入冷场状态 + self.cold_chat_start_time = self.last_message_time or current_time # 从最后消息时间开始算,或从当前时间开始 + logger.info(f"[私聊][{self.private_name}]进入冷场状态,开始时间: {self.cold_chat_start_time}") + else: + # 结束冷场状态 + if self.cold_chat_start_time: + self.cold_chat_duration = current_time - self.cold_chat_start_time + logger.info(f"[私聊][{self.private_name}]结束冷场状态,持续时间: {self.cold_chat_duration:.2f} 秒") + self.cold_chat_start_time = None # 重置开始时间 + self.update_changed() # 状态变化,标记改变 + + # 即使状态没变,如果是冷场状态,也更新持续时间 + if self.is_cold_chat and self.cold_chat_start_time: + self.cold_chat_duration = current_time - self.cold_chat_start_time + def get_active_duration(self) -> float: - """获取当前活跃时长 + """获取当前活跃时长 (距离最后一条消息的时间) Returns: float: 最后一条消息到现在的时长(秒) @@ -206,7 +305,7 @@ class ObservationInfo: return time.time() - self.last_message_time def get_user_response_time(self) -> Optional[float]: - """获取用户响应时间 + """获取用户最后响应时间 (距离用户最后发言的时间) Returns: Optional[float]: 用户最后发言到现在的时长(秒),如果没有用户发言则返回None @@ -216,7 +315,7 @@ class ObservationInfo: return time.time() - self.last_user_speak_time def get_bot_response_time(self) -> Optional[float]: - """获取机器人响应时间 + """获取机器人最后响应时间 (距离机器人最后发言的时间) Returns: Optional[float]: 机器人最后发言到现在的时长(秒),如果没有机器人发言则返回None @@ -226,19 +325,39 @@ class ObservationInfo: return time.time() - self.last_bot_speak_time async def clear_unprocessed_messages(self): - """清空未处理消息列表""" - # 将未处理消息添加到历史记录中 - for message in self.unprocessed_messages: - self.chat_history.append(message) - self.chat_history_str = await build_readable_messages( - self.chat_history[-20:] if len(self.chat_history) > 20 else self.chat_history, - replace_bot_name=True, - merge_messages=False, - timestamp_mode="relative", - read_mark=0.0, - ) - # 清空未处理消息列表 - self.has_unread_messages = False + """将未处理消息移入历史记录,并更新相关状态""" + if not self.unprocessed_messages: + return # 没有未处理消息,直接返回 + + # logger.debug(f"[私聊][{self.private_name}]处理 {len(self.unprocessed_messages)} 条未处理消息...") + # 将未处理消息添加到历史记录中 (确保历史记录有长度限制,避免无限增长) + max_history_len = 100 # 示例:最多保留100条历史记录 + self.chat_history.extend(self.unprocessed_messages) + if len(self.chat_history) > max_history_len: + self.chat_history = self.chat_history[-max_history_len:] + + # 更新历史记录字符串 (只使用最近一部分生成,例如20条) + history_slice_for_str = self.chat_history[-20:] + try: + self.chat_history_str = await build_readable_messages( + history_slice_for_str, + replace_bot_name=True, + merge_messages=False, + timestamp_mode="relative", + read_mark=0.0, # read_mark 可能需要根据逻辑调整 + ) + except Exception as e: + logger.error(f"[私聊][{self.private_name}]构建聊天记录字符串时出错: {e}") + self.chat_history_str = "[构建聊天记录出错]" # 提供错误提示 + + + # 清空未处理消息列表和计数 + cleared_count = len(self.unprocessed_messages) self.unprocessed_messages.clear() - self.chat_history_count = len(self.chat_history) self.new_messages_count = 0 + # self.has_unread_messages = False # 这个状态可以通过 new_messages_count 判断 + + self.chat_history_count = len(self.chat_history) # 更新历史记录总数 + # logger.debug(f"[私聊][{self.private_name}]已处理 {cleared_count} 条消息,当前历史记录 {self.chat_history_count} 条。") + + self.update_changed() # 状态改变 \ No newline at end of file