# 定义了来自外部世界的信息 # 外部世界可以是某个聊天 不同平台的聊天 也可以是任意媒体 from datetime import datetime from src.plugins.models.utils_model import LLMRequest from src.config.config import global_config from src.common.database import db from src.common.logger import get_module_logger import traceback logger = get_module_logger("observation") # 所有观察的基类 class Observation: def __init__(self, observe_type, observe_id): self.observe_info = "" self.observe_type = observe_type self.observe_id = observe_id self.last_observe_time = datetime.now().timestamp() # 初始化为当前时间 # 聊天观察 class ChattingObservation(Observation): def __init__(self, chat_id): super().__init__("chat", chat_id) self.chat_id = chat_id self.talking_message = [] self.talking_message_str = "" self.name = global_config.BOT_NICKNAME self.nick_name = global_config.BOT_ALIAS_NAMES self.max_now_obs_len = global_config.observation_context_size self.overlap_len = global_config.compressed_length self.mid_memorys = [] self.max_mid_memory_len = global_config.compress_length_limit self.mid_memory_info = "" self.now_message_info = "" # self._observe_lock = asyncio.Lock() # 移除锁 # 初始化时加载最近的10条消息 initial_messages_cursor = ( db.messages.find({"chat_id": self.chat_id, "time": {"$lt": self.last_observe_time}}) .sort("time", -1) # 按时间倒序 .limit(10) # 获取最多10条 ) initial_messages = list(initial_messages_cursor) initial_messages.reverse() # 恢复时间正序 self.talking_message = initial_messages # 将这些消息设为初始上下文 self.now_message_info = self.translate_message_list_to_str(self.talking_message) # 更新初始的 now_message_info self.llm_summary = LLMRequest( model=global_config.llm_observation, temperature=0.7, max_tokens=300, request_type="chat_observation" ) # 进行一次观察 返回观察结果observe_info def get_observe_info(self, ids=None): if ids: mid_memory_str = "" for id in ids: print(f"id:{id}") try: for mid_memory in self.mid_memorys: if mid_memory["id"] == id: mid_memory_by_id = mid_memory msg_str = "" for msg in mid_memory_by_id["messages"]: msg_str += f"{msg['detailed_plain_text']}" # time_diff = int((datetime.now().timestamp() - mid_memory_by_id["created_at"]) / 60) # mid_memory_str += f"距离现在{time_diff}分钟前:\n{msg_str}\n" mid_memory_str += f"{msg_str}\n" except Exception as e: logger.error(f"获取mid_memory_id失败: {e}") traceback.print_exc() # print(f"获取mid_memory_id失败: {e}") return self.now_message_info return mid_memory_str + "现在群里正在聊:\n" + self.now_message_info else: return self.now_message_info async def observe(self): # async with self._observe_lock: # 移除锁 # 查找新消息,最多获取 self.max_now_obs_len 条 new_messages_cursor = ( db.messages.find({"chat_id": self.chat_id, "time": {"$gt": self.last_observe_time}}) .sort("time", -1) # 按时间倒序排序 .limit(self.max_now_obs_len) # 限制数量 ) new_messages = list(new_messages_cursor) new_messages.reverse() # 反转列表,使消息按时间正序排列 if not new_messages: # 如果没有获取到限制数量内的较新消息,可能仍然有更早的消息,但我们只关注最近的 # 检查是否有任何新消息(即使超出限制),以决定是否更新 last_observe_time # 注意:这里的查询也可能与其他并发 observe 冲突,但锁保护了状态更新 # 由于外部已加锁,此处的并发冲突担忧不再需要 any_new_message = db.messages.find_one({"chat_id": self.chat_id, "time": {"$gt": self.last_observe_time}}) if not any_new_message: return # 确实没有新消息 # 如果有超过限制的更早的新消息,仍然需要更新时间戳,防止重复获取旧消息 # 但不将它们加入 talking_message latest_message_time_cursor = ( db.messages.find({"chat_id": self.chat_id, "time": {"$gt": self.last_observe_time}}) .sort("time", -1) .limit(1) ) latest_time_doc = next(latest_message_time_cursor, None) if latest_time_doc: # 确保只在严格大于时更新,避免因并发查询导致时间戳回退 if latest_time_doc["time"] > self.last_observe_time: self.last_observe_time = latest_time_doc["time"] return # 返回,因为我们只关心限制内的最新消息 self.last_observe_time = new_messages[-1]["time"] self.talking_message.extend(new_messages) if len(self.talking_message) > self.max_now_obs_len: try: # 使用 try...finally 仅用于可能的LLM调用错误处理 # 计算需要移除的消息数量,保留最新的 max_now_obs_len 条 messages_to_remove_count = len(self.talking_message) - self.max_now_obs_len oldest_messages = self.talking_message[:messages_to_remove_count] self.talking_message = self.talking_message[messages_to_remove_count:] # 保留后半部分,即最新的 oldest_messages_str = "\n".join( [msg["detailed_plain_text"] for msg in oldest_messages if "detailed_plain_text" in msg] ) # 增加检查 oldest_timestamps = [msg["time"] for msg in oldest_messages] # 调用 LLM 总结主题 prompt = f"请总结以下聊天记录的主题:\n{oldest_messages_str}\n主题,用一句话概括包括人物事件和主要信息,不要分点:" summary = "无法总结主题" # 默认值 try: summary_result, _ = await self.llm_summary.generate_response_async(prompt) if summary_result: # 确保结果不为空 summary = summary_result except Exception as e: logger.error(f"总结主题失败 for chat {self.chat_id}: {e}") # 保留默认总结 "无法总结主题" mid_memory = { "id": str(int(datetime.now().timestamp())), "theme": summary, "messages": oldest_messages, # 存储原始消息对象 "timestamps": oldest_timestamps, "chat_id": self.chat_id, "created_at": datetime.now().timestamp(), } # print(f"mid_memory:{mid_memory}") # 存入内存中的 mid_memorys self.mid_memorys.append(mid_memory) if len(self.mid_memorys) > self.max_mid_memory_len: self.mid_memorys.pop(0) # 移除最旧的 mid_memory_str = "之前聊天的内容概述是:\n" for mid_memory_item in self.mid_memorys: # 重命名循环变量以示区分 time_diff = int((datetime.now().timestamp() - mid_memory_item["created_at"]) / 60) mid_memory_str += ( f"距离现在{time_diff}分钟前(聊天记录id:{mid_memory_item['id']}):{mid_memory_item['theme']}\n" ) self.mid_memory_info = mid_memory_str except Exception as e: # 将异常处理移至此处以覆盖整个总结过程 logger.error(f"处理和总结旧消息时出错 for chat {self.chat_id}: {e}") traceback.print_exc() # 记录详细堆栈 # print(f"处理后self.talking_message:{self.talking_message}") now_message_str = "" # 使用 self.translate_message_list_to_str 更新当前聊天内容 now_message_str += self.translate_message_list_to_str(talking_message=self.talking_message) self.now_message_info = now_message_str logger.trace( f"Chat {self.chat_id} - 压缩早期记忆:{self.mid_memory_info}\n现在聊天内容:{self.now_message_info}" ) @staticmethod def translate_message_list_to_str(talking_message): talking_message_str = "" for message in talking_message: talking_message_str += message["detailed_plain_text"] return talking_message_str