ai哥说的对

This commit is contained in:
Bakadax
2025-04-28 18:46:29 +08:00
parent ef24f013de
commit d07e2669c8
2 changed files with 244 additions and 125 deletions

View File

@@ -70,7 +70,7 @@ class Conversation:
self.chat_observer = ChatObserver.get_instance(self.stream_id, self.private_name) self.chat_observer = ChatObserver.get_instance(self.stream_id, self.private_name)
self.chat_observer.start() self.chat_observer.start()
self.observation_info = ObservationInfo(self.private_name) self.observation_info = ObservationInfo(self.private_name)
self.observation_info.bind_to_chat_observer(self.chat_observer, self.private_name) self.observation_info.bind_to_chat_observer(self.chat_observer)
# print(self.chat_observer.get_cached_messages(limit=) # print(self.chat_observer.get_cached_messages(limit=)
self.conversation_info = ConversationInfo() self.conversation_info = ConversationInfo()

View File

@@ -1,13 +1,12 @@
# Programmable Friendly Conversationalist
# Prefrontal cortex
from typing import List, Optional, Dict, Any, Set from typing import List, Optional, Dict, Any, Set
from maim_message import UserInfo from maim_message import UserInfo
import time import time
from dataclasses import dataclass, field from dataclasses import dataclass, field
from src.common.logger import get_module_logger from src.common.logger import get_module_logger
from .chat_observer import ChatObserver from .chat_observer import ChatObserver
from .chat_states import NotificationHandler, NotificationType from .chat_states import NotificationHandler, NotificationType, Notification
from src.plugins.utils.chat_message_builder import build_readable_messages from src.plugins.utils.chat_message_builder import build_readable_messages
import traceback # 导入 traceback 用于调试
logger = get_module_logger("observation_info") logger = get_module_logger("observation_info")
@@ -20,183 +19,283 @@ class ObservationInfoHandler(NotificationHandler):
Args: Args:
observation_info: 要更新的ObservationInfo实例 observation_info: 要更新的ObservationInfo实例
private_name: 私聊对象的名称,用于日志记录
""" """
self.observation_info = observation_info self.observation_info = observation_info
# 将 private_name 存储在 handler 实例中
self.private_name = private_name self.private_name = private_name
async def handle_notification(self, notification): async def handle_notification(self, notification: Notification): # 添加类型提示
# 获取通知类型和数据 # 获取通知类型和数据
notification_type = notification.type notification_type = notification.type
data = notification.data data = notification.data
if notification_type == NotificationType.NEW_MESSAGE: try: # 添加错误处理块
# 处理新消息通知 if notification_type == NotificationType.NEW_MESSAGE:
logger.debug(f"[私聊][{self.private_name}]收到新消息通知data: {data}") # 处理新消息通知
message_id = data.get("message_id") # logger.debug(f"[私聊][{self.private_name}]收到新消息通知data: {data}") # 可以在需要时取消注释
processed_plain_text = data.get("processed_plain_text") message_id = data.get("message_id")
detailed_plain_text = data.get("detailed_plain_text") processed_plain_text = data.get("processed_plain_text")
user_info = data.get("user_info") detailed_plain_text = data.get("detailed_plain_text")
time_value = data.get("time") user_info_dict = data.get("user_info") # 先获取字典
time_value = data.get("time")
message = { # 确保 user_info 是字典类型再创建 UserInfo 对象
"message_id": message_id, user_info = None
"processed_plain_text": processed_plain_text, if isinstance(user_info_dict, dict):
"detailed_plain_text": detailed_plain_text, try:
"user_info": user_info, user_info = UserInfo.from_dict(user_info_dict)
"time": time_value, except Exception as e:
} logger.error(f"[私聊][{self.private_name}]从字典创建 UserInfo 时出错: {e}, 字典内容: {user_info_dict}")
# 可以选择在这里返回或记录错误,避免后续代码出错
return
elif user_info_dict is not None:
logger.warning(f"[私聊][{self.private_name}]收到的 user_info 不是预期的字典类型: {type(user_info_dict)}")
# 根据需要处理非字典情况,这里暂时返回
return
self.observation_info.update_from_message(message)
elif notification_type == NotificationType.COLD_CHAT: message = {
# 处理冷场通知 "message_id": message_id,
is_cold = data.get("is_cold", False) "processed_plain_text": processed_plain_text,
self.observation_info.update_cold_chat_status(is_cold, time.time()) "detailed_plain_text": detailed_plain_text,
"user_info": user_info_dict, # 存储原始字典或 UserInfo 对象,取决于你的 update_from_message 如何处理
"time": time_value,
}
# 传递 UserInfo 对象(如果成功创建)或原始字典
await self.observation_info.update_from_message(message, user_info) # 修改:传递 user_info 对象
elif notification_type == NotificationType.ACTIVE_CHAT:
# 处理活跃通知
is_active = data.get("is_active", False)
self.observation_info.is_cold = not is_active
elif notification_type == NotificationType.BOT_SPEAKING: elif notification_type == NotificationType.COLD_CHAT:
# 处理机器人说话通知 # 处理冷场通知
self.observation_info.is_typing = False is_cold = data.get("is_cold", False)
self.observation_info.last_bot_speak_time = time.time() await self.observation_info.update_cold_chat_status(is_cold, time.time()) # 修改:改为 await 调用
elif notification_type == NotificationType.USER_SPEAKING: elif notification_type == NotificationType.ACTIVE_CHAT:
# 处理用户说话通知 # 处理活跃通知 (通常由 COLD_CHAT 的反向状态处理)
self.observation_info.is_typing = False is_active = data.get("is_active", False)
self.observation_info.last_user_speak_time = time.time() self.observation_info.is_cold = not is_active
elif notification_type == NotificationType.MESSAGE_DELETED: elif notification_type == NotificationType.BOT_SPEAKING:
# 处理消息删除通知 # 处理机器人说话通知 (按需实现)
message_id = data.get("message_id") self.observation_info.is_typing = False
self.observation_info.unprocessed_messages = [ self.observation_info.last_bot_speak_time = time.time()
msg for msg in self.observation_info.unprocessed_messages if msg.get("message_id") != message_id
]
elif notification_type == NotificationType.USER_JOINED: elif notification_type == NotificationType.USER_SPEAKING:
# 处理用户加入通知 # 处理用户说话通知
user_id = data.get("user_id") self.observation_info.is_typing = False
if user_id: self.observation_info.last_user_speak_time = time.time()
self.observation_info.active_users.add(user_id)
elif notification_type == NotificationType.USER_LEFT: elif notification_type == NotificationType.MESSAGE_DELETED:
# 处理用户离开通知 # 处理消息删除通知
user_id = data.get("user_id") message_id = data.get("message_id")
if user_id: # 从 unprocessed_messages 中移除被删除的消息
self.observation_info.active_users.discard(user_id) original_count = len(self.observation_info.unprocessed_messages)
self.observation_info.unprocessed_messages = [
msg for msg in self.observation_info.unprocessed_messages if msg.get("message_id") != message_id
]
if len(self.observation_info.unprocessed_messages) < original_count:
logger.info(f"[私聊][{self.private_name}]移除了未处理的消息 (ID: {message_id})")
elif notification_type == NotificationType.ERROR:
# 处理错误通知 elif notification_type == NotificationType.USER_JOINED:
error_msg = data.get("error", "") # 处理用户加入通知 (如果适用私聊场景)
logger.error(f"[私聊][{self.private_name}]收到错误通知: {error_msg}") user_id = data.get("user_id")
if user_id:
self.observation_info.active_users.add(str(user_id)) # 确保是字符串
elif notification_type == NotificationType.USER_LEFT:
# 处理用户离开通知 (如果适用私聊场景)
user_id = data.get("user_id")
if user_id:
self.observation_info.active_users.discard(str(user_id)) # 确保是字符串
elif notification_type == NotificationType.ERROR:
# 处理错误通知
error_msg = data.get("error", "未提供错误信息")
logger.error(f"[私聊][{self.private_name}]收到错误通知: {error_msg}")
except Exception as e:
logger.error(f"[私聊][{self.private_name}]处理通知时发生错误: {e}")
logger.error(traceback.format_exc()) # 打印详细堆栈信息
@dataclass @dataclass
class ObservationInfo: class ObservationInfo:
"""决策信息类用于收集和管理来自chat_observer的通知信息""" """决策信息类用于收集和管理来自chat_observer的通知信息"""
# --- 修改:添加 private_name 字段 ---
private_name: str = field(init=True) # 让 dataclass 的 __init__ 接收 private_name
# data_list # data_list
chat_history: List[str] = field(default_factory=list) chat_history: List[Dict[str, Any]] = field(default_factory=list) # 修改:明确类型为 Dict
chat_history_str: str = "" chat_history_str: str = ""
unprocessed_messages: List[Dict[str, Any]] = field(default_factory=list) unprocessed_messages: List[Dict[str, Any]] = field(default_factory=list) # 修改:明确类型为 Dict
active_users: Set[str] = field(default_factory=set) active_users: Set[str] = field(default_factory=set)
# data # data
last_bot_speak_time: Optional[float] = None last_bot_speak_time: Optional[float] = None
last_user_speak_time: Optional[float] = None last_user_speak_time: Optional[float] = None
last_message_time: Optional[float] = None last_message_time: Optional[float] = None
# 添加 last_message_id
last_message_id: Optional[str] = None
last_message_content: str = "" last_message_content: str = ""
last_message_sender: Optional[str] = None last_message_sender: Optional[str] = None
bot_id: Optional[str] = None bot_id: Optional[str] = None
chat_history_count: int = 0 chat_history_count: int = 0
new_messages_count: int = 0 new_messages_count: int = 0
cold_chat_duration: float = 0.0 cold_chat_start_time: Optional[float] = None # 用于计算冷场持续时间
cold_chat_duration: float = 0.0 # 缓存计算结果
# state # state
is_typing: bool = False is_typing: bool = False # 可能表示对方正在输入
has_unread_messages: bool = False # has_unread_messages: bool = False # 这个状态可以通过 new_messages_count > 0 判断
is_cold_chat: bool = False is_cold_chat: bool = False
changed: bool = False changed: bool = False # 用于标记状态是否有变化,以便外部模块决定是否重新规划
# #spec # #spec (暂时注释掉,如果不需要)
# meta_plan_trigger: bool = False # meta_plan_trigger: bool = False
def __post_init__(self, private_name: str = None): # --- 修改:移除 __post_init__ 的参数 ---
"""初始化后创建handler""" def __post_init__(self):
self.chat_observer = None """初始化后创建handler并进行必要的设置"""
self.handler = ObservationInfoHandler(self, private_name) self.chat_observer: Optional[ChatObserver] = None # 添加类型提示
self.private_name = private_name self.handler = ObservationInfoHandler(self, self.private_name)
def bind_to_chat_observer(self, chat_observer: ChatObserver, private_name: str): def bind_to_chat_observer(self, chat_observer: ChatObserver):
"""绑定到指定的chat_observer """绑定到指定的chat_observer
Args: Args:
stream_id: 聊天流ID chat_observer: 要绑定的 ChatObserver 实例
""" """
if self.chat_observer:
logger.warning(f"[私聊][{self.private_name}]尝试重复绑定 ChatObserver")
return
self.chat_observer = chat_observer self.chat_observer = chat_observer
self.chat_observer.notification_manager.register_handler( try:
target="observation_info", notification_type=NotificationType.NEW_MESSAGE, handler=self.handler # 注册关心的通知类型
) self.chat_observer.notification_manager.register_handler(
self.chat_observer.notification_manager.register_handler( target="observation_info", notification_type=NotificationType.NEW_MESSAGE, handler=self.handler
target="observation_info", notification_type=NotificationType.COLD_CHAT, handler=self.handler )
) self.chat_observer.notification_manager.register_handler(
target="observation_info", notification_type=NotificationType.COLD_CHAT, handler=self.handler
)
# 可以根据需要注册更多通知类型
# self.chat_observer.notification_manager.register_handler(
# target="observation_info", notification_type=NotificationType.MESSAGE_DELETED, handler=self.handler
# )
logger.info(f"[私聊][{self.private_name}]成功绑定到 ChatObserver")
except Exception as e:
logger.error(f"[私聊][{self.private_name}]绑定到 ChatObserver 时出错: {e}")
self.chat_observer = None # 绑定失败,重置
def unbind_from_chat_observer(self): def unbind_from_chat_observer(self):
"""解除与chat_observer的绑定""" """解除与chat_observer的绑定"""
if self.chat_observer: if self.chat_observer and hasattr(self.chat_observer, 'notification_manager'): # 增加检查
self.chat_observer.notification_manager.unregister_handler( try:
target="observation_info", notification_type=NotificationType.NEW_MESSAGE, handler=self.handler self.chat_observer.notification_manager.unregister_handler(
) target="observation_info", notification_type=NotificationType.NEW_MESSAGE, handler=self.handler
self.chat_observer.notification_manager.unregister_handler( )
target="observation_info", notification_type=NotificationType.COLD_CHAT, handler=self.handler self.chat_observer.notification_manager.unregister_handler(
) target="observation_info", notification_type=NotificationType.COLD_CHAT, handler=self.handler
self.chat_observer = None )
# 如果注册了其他类型,也要在这里注销
# self.chat_observer.notification_manager.unregister_handler(
# target="observation_info", notification_type=NotificationType.MESSAGE_DELETED, handler=self.handler
# )
logger.info(f"[私聊][{self.private_name}]成功从 ChatObserver 解绑")
except Exception as e:
logger.error(f"[私聊][{self.private_name}]从 ChatObserver 解绑时出错: {e}")
finally: # 确保 chat_observer 被重置
self.chat_observer = None
else:
logger.warning(f"[私聊][{self.private_name}]尝试解绑时 ChatObserver 不存在或无效")
def update_from_message(self, message: Dict[str, Any]):
# 修改update_from_message 接收 UserInfo 对象
async def update_from_message(self, message: Dict[str, Any], user_info: Optional[UserInfo]):
"""从消息更新信息 """从消息更新信息
Args: Args:
message: 消息数据 message: 消息数据字典
user_info: 解析后的 UserInfo 对象 (可能为 None)
""" """
# logger.debug(f"[私聊][{self.private_name}]更新信息from_message: {message}") message_time = message.get("time")
self.last_message_time = message["time"] message_id = message.get("message_id")
self.last_message_id = message["message_id"] processed_text = message.get("processed_plain_text", "")
self.last_message_content = message.get("processed_plain_text", "") # 只有在新消息到达时才更新 last_message 相关信息
if message_time and message_time > (self.last_message_time or 0):
self.last_message_time = message_time
self.last_message_id = message_id
self.last_message_content = processed_text
# 重置冷场计时器
self.is_cold_chat = False
self.cold_chat_start_time = None
self.cold_chat_duration = 0.0
user_info = UserInfo.from_dict(message.get("user_info", {}))
self.last_message_sender = user_info.user_id
if user_info.user_id == self.bot_id: if user_info:
self.last_bot_speak_time = message["time"] sender_id = str(user_info.user_id) # 确保是字符串
self.last_message_sender = sender_id
# 更新发言时间
if sender_id == self.bot_id:
self.last_bot_speak_time = message_time
else:
self.last_user_speak_time = message_time
self.active_users.add(sender_id) # 用户发言则认为其活跃
else:
logger.warning(f"[私聊][{self.private_name}]处理消息更新时缺少有效的 UserInfo 对象, message_id: {message_id}")
self.last_message_sender = None # 发送者未知
# 将原始消息字典添加到未处理列表
self.unprocessed_messages.append(message)
self.new_messages_count = len(self.unprocessed_messages) # 直接用列表长度
# logger.debug(f"[私聊][{self.private_name}]消息更新: last_time={self.last_message_time}, new_count={self.new_messages_count}")
self.update_changed() # 标记状态已改变
else: else:
self.last_user_speak_time = message["time"] # 如果消息时间戳不是最新的,可能不需要处理,或者记录一个警告
self.active_users.add(user_info.user_id) pass
# logger.warning(f"[私聊][{self.private_name}]收到过时或无效时间戳的消息: ID={message_id}, time={message_time}")
self.new_messages_count += 1
self.unprocessed_messages.append(message)
self.update_changed()
def update_changed(self): def update_changed(self):
"""更新changed状态""" """标记状态已改变,并重置标记"""
# logger.debug(f"[私聊][{self.private_name}]状态标记为已改变 (changed=True)")
self.changed = True self.changed = True
def update_cold_chat_status(self, is_cold: bool, current_time: float):
async def update_cold_chat_status(self, is_cold: bool, current_time: float):
"""更新冷场状态 """更新冷场状态
Args: Args:
is_cold: 是否冷场 is_cold: 是否处于冷场状态
current_time: 当前时间 current_time: 当前时间
""" """
self.is_cold_chat = is_cold if is_cold != self.is_cold_chat: # 仅在状态变化时更新
if is_cold and self.last_message_time: self.is_cold_chat = is_cold
self.cold_chat_duration = current_time - self.last_message_time if is_cold:
# 进入冷场状态
self.cold_chat_start_time = self.last_message_time or current_time # 从最后消息时间开始算,或从当前时间开始
logger.info(f"[私聊][{self.private_name}]进入冷场状态,开始时间: {self.cold_chat_start_time}")
else:
# 结束冷场状态
if self.cold_chat_start_time:
self.cold_chat_duration = current_time - self.cold_chat_start_time
logger.info(f"[私聊][{self.private_name}]结束冷场状态,持续时间: {self.cold_chat_duration:.2f}")
self.cold_chat_start_time = None # 重置开始时间
self.update_changed() # 状态变化,标记改变
# 即使状态没变,如果是冷场状态,也更新持续时间
if self.is_cold_chat and self.cold_chat_start_time:
self.cold_chat_duration = current_time - self.cold_chat_start_time
def get_active_duration(self) -> float: def get_active_duration(self) -> float:
"""获取当前活跃时长 """获取当前活跃时长 (距离最后一条消息的时间)
Returns: Returns:
float: 最后一条消息到现在的时长(秒) float: 最后一条消息到现在的时长(秒)
@@ -206,7 +305,7 @@ class ObservationInfo:
return time.time() - self.last_message_time return time.time() - self.last_message_time
def get_user_response_time(self) -> Optional[float]: def get_user_response_time(self) -> Optional[float]:
"""获取用户响应时间 """获取用户最后响应时间 (距离用户最后发言的时间)
Returns: Returns:
Optional[float]: 用户最后发言到现在的时长如果没有用户发言则返回None Optional[float]: 用户最后发言到现在的时长如果没有用户发言则返回None
@@ -216,7 +315,7 @@ class ObservationInfo:
return time.time() - self.last_user_speak_time return time.time() - self.last_user_speak_time
def get_bot_response_time(self) -> Optional[float]: def get_bot_response_time(self) -> Optional[float]:
"""获取机器人响应时间 """获取机器人最后响应时间 (距离机器人最后发言的时间)
Returns: Returns:
Optional[float]: 机器人最后发言到现在的时长如果没有机器人发言则返回None Optional[float]: 机器人最后发言到现在的时长如果没有机器人发言则返回None
@@ -226,19 +325,39 @@ class ObservationInfo:
return time.time() - self.last_bot_speak_time return time.time() - self.last_bot_speak_time
async def clear_unprocessed_messages(self): async def clear_unprocessed_messages(self):
"""清空未处理消息列表""" """未处理消息移入历史记录,并更新相关状态"""
# 将未处理消息添加到历史记录中 if not self.unprocessed_messages:
for message in self.unprocessed_messages: return # 没有未处理消息,直接返回
self.chat_history.append(message)
self.chat_history_str = await build_readable_messages( # logger.debug(f"[私聊][{self.private_name}]处理 {len(self.unprocessed_messages)} 条未处理消息...")
self.chat_history[-20:] if len(self.chat_history) > 20 else self.chat_history, # 将未处理消息添加到历史记录中 (确保历史记录有长度限制,避免无限增长)
replace_bot_name=True, max_history_len = 100 # 示例最多保留100条历史记录
merge_messages=False, self.chat_history.extend(self.unprocessed_messages)
timestamp_mode="relative", if len(self.chat_history) > max_history_len:
read_mark=0.0, self.chat_history = self.chat_history[-max_history_len:]
)
# 清空未处理消息列表 # 更新历史记录字符串 (只使用最近一部分生成例如20条)
self.has_unread_messages = False history_slice_for_str = self.chat_history[-20:]
try:
self.chat_history_str = await build_readable_messages(
history_slice_for_str,
replace_bot_name=True,
merge_messages=False,
timestamp_mode="relative",
read_mark=0.0, # read_mark 可能需要根据逻辑调整
)
except Exception as e:
logger.error(f"[私聊][{self.private_name}]构建聊天记录字符串时出错: {e}")
self.chat_history_str = "[构建聊天记录出错]" # 提供错误提示
# 清空未处理消息列表和计数
cleared_count = len(self.unprocessed_messages)
self.unprocessed_messages.clear() self.unprocessed_messages.clear()
self.chat_history_count = len(self.chat_history)
self.new_messages_count = 0 self.new_messages_count = 0
# self.has_unread_messages = False # 这个状态可以通过 new_messages_count 判断
self.chat_history_count = len(self.chat_history) # 更新历史记录总数
# logger.debug(f"[私聊][{self.private_name}]已处理 {cleared_count} 条消息,当前历史记录 {self.chat_history_count} 条。")
self.update_changed() # 状态改变