diff --git a/src/do_tool/tool_can_use/change_mood.py b/src/do_tool/not_used/change_mood.py similarity index 100% rename from src/do_tool/tool_can_use/change_mood.py rename to src/do_tool/not_used/change_mood.py diff --git a/src/do_tool/tool_can_use/change_relationship.py b/src/do_tool/not_used/change_relationship.py similarity index 100% rename from src/do_tool/tool_can_use/change_relationship.py rename to src/do_tool/not_used/change_relationship.py diff --git a/src/heart_flow/observation.py b/src/heart_flow/observation.py index 00213f3f1..89ee7bb90 100644 --- a/src/heart_flow/observation.py +++ b/src/heart_flow/observation.py @@ -6,6 +6,7 @@ from src.config.config import global_config from src.common.database import db from src.common.logger import get_module_logger import traceback +import asyncio logger = get_module_logger("observation") @@ -38,7 +39,7 @@ class ChattingObservation(Observation): self.mid_memory_info = "" self.now_message_info = "" - self.updating_old = False + self._observe_lock = asyncio.Lock() # 添加锁 self.llm_summary = LLMRequest( model=global_config.llm_observation, temperature=0.7, max_tokens=300, request_type="chat_observation" @@ -72,75 +73,108 @@ class ChattingObservation(Observation): return self.now_message_info async def observe(self): - # 查找新消息 - new_messages = list( - db.messages.find({"chat_id": self.chat_id, "time": {"$gt": self.last_observe_time}}).sort("time", 1) - ) # 按时间正序排列 + async with self._observe_lock: # 获取锁 + # 查找新消息,最多获取 self.max_now_obs_len 条 + new_messages_cursor = ( + db.messages.find({"chat_id": self.chat_id, "time": {"$gt": self.last_observe_time}}) + .sort("time", -1) # 按时间倒序排序 + .limit(self.max_now_obs_len) # 限制数量 + ) + new_messages = list(new_messages_cursor) + new_messages.reverse() # 反转列表,使消息按时间正序排列 - if not new_messages: - return self.observe_info # 没有新消息,返回上次观察结果 + if not new_messages: + # 如果没有获取到限制数量内的较新消息,可能仍然有更早的消息,但我们只关注最近的 + # 检查是否有任何新消息(即使超出限制),以决定是否更新 last_observe_time + # 注意:这里的查询也可能与其他并发 observe 冲突,但锁保护了状态更新 + any_new_message = db.messages.find_one({"chat_id": self.chat_id, "time": {"$gt": self.last_observe_time}}) + if not any_new_message: + return # 确实没有新消息 - self.last_observe_time = new_messages[-1]["time"] + # 如果有超过限制的更早的新消息,仍然需要更新时间戳,防止重复获取旧消息 + # 但不将它们加入 talking_message + latest_message_time_cursor = db.messages.find({"chat_id": self.chat_id, "time": {"$gt": self.last_observe_time}}).sort("time", -1).limit(1) + latest_time_doc = next(latest_message_time_cursor, None) + if latest_time_doc: + # 确保只在严格大于时更新,避免因并发查询导致时间戳回退 + if latest_time_doc["time"] > self.last_observe_time: + self.last_observe_time = latest_time_doc["time"] + return # 返回,因为我们只关心限制内的最新消息 - self.talking_message.extend(new_messages) + # 在持有锁的情况下,再次过滤,确保只处理真正新的消息 + # 防止处理在等待锁期间已被其他协程处理的消息 + truly_new_messages = [msg for msg in new_messages if msg["time"] > self.last_observe_time] - # 将新消息转换为字符串格式 - new_messages_str = "" - for msg in new_messages: - if "detailed_plain_text" in msg: - new_messages_str += f"{msg['detailed_plain_text']}" + if not truly_new_messages: + logger.debug(f"Chat {self.chat_id}: Fetched messages, but already processed by another concurrent observe call.") + return # 所有获取的消息都已被处理 - # print(f"new_messages_str:{new_messages_str}") + # 如果获取到了 truly_new_messages (在限制内且时间戳大于上次记录) + self.last_observe_time = truly_new_messages[-1]["time"] # 更新时间戳为获取到的最新消息的时间 - # 将新消息添加到talking_message,同时保持列表长度不超过20条 + self.talking_message.extend(truly_new_messages) - if len(self.talking_message) > self.max_now_obs_len and not self.updating_old: - self.updating_old = True - # 计算需要保留的消息数量 - keep_messages_count = self.max_now_obs_len - self.overlap_len - # 提取所有超出保留数量的最老消息 - oldest_messages = self.talking_message[:-keep_messages_count] - self.talking_message = self.talking_message[-keep_messages_count:] - oldest_messages_str = "\n".join([msg["detailed_plain_text"] for msg in oldest_messages]) - oldest_timestamps = [msg["time"] for msg in oldest_messages] + # 将新消息转换为字符串格式 (此变量似乎未使用,暂时注释掉) + # new_messages_str = "" + # for msg in truly_new_messages: + # if "detailed_plain_text" in msg: + # new_messages_str += f"{msg['detailed_plain_text']}" - # 调用 LLM 总结主题 - prompt = f"请总结以下聊天记录的主题:\n{oldest_messages_str}\n主题,用一句话概括包括人物事件和主要信息,不要分点:" - try: - summary, _ = await self.llm_summary.generate_response_async(prompt) - except Exception as e: - print(f"总结主题失败: {e}") - summary = "无法总结主题" + # print(f"new_messages_str:{new_messages_str}") - mid_memory = { - "id": str(int(datetime.now().timestamp())), - "theme": summary, - "messages": oldest_messages, - "timestamps": oldest_timestamps, - "chat_id": self.chat_id, - "created_at": datetime.now().timestamp(), - } - # print(f"mid_memory:{mid_memory}") - # 存入内存中的 mid_memorys - self.mid_memorys.append(mid_memory) - if len(self.mid_memorys) > self.max_mid_memory_len: - self.mid_memorys.pop(0) + # 锁保证了这部分逻辑的原子性 + if len(self.talking_message) > self.max_now_obs_len: + try: # 使用 try...finally 仅用于可能的LLM调用错误处理 + # 计算需要移除的消息数量,保留最新的 max_now_obs_len 条 + messages_to_remove_count = len(self.talking_message) - self.max_now_obs_len + oldest_messages = self.talking_message[:messages_to_remove_count] + self.talking_message = self.talking_message[messages_to_remove_count:] # 保留后半部分,即最新的 + oldest_messages_str = "\n".join([msg["detailed_plain_text"] for msg in oldest_messages if "detailed_plain_text" in msg]) # 增加检查 + oldest_timestamps = [msg["time"] for msg in oldest_messages] - mid_memory_str = "之前聊天的内容概括是:\n" - for mid_memory in self.mid_memorys: - time_diff = int((datetime.now().timestamp() - mid_memory["created_at"]) / 60) - mid_memory_str += f"距离现在{time_diff}分钟前(聊天记录id:{mid_memory['id']}):{mid_memory['theme']}\n" - self.mid_memory_info = mid_memory_str + # 调用 LLM 总结主题 + prompt = f"请总结以下聊天记录的主题:\n{oldest_messages_str}\n主题,用一句话概括包括人物事件和主要信息,不要分点:" + summary = "无法总结主题" # 默认值 + try: + summary_result, _ = await self.llm_summary.generate_response_async(prompt) + if summary_result: # 确保结果不为空 + summary = summary_result + except Exception as e: + logger.error(f"总结主题失败 for chat {self.chat_id}: {e}") + # 保留默认总结 "无法总结主题" - self.updating_old = False + mid_memory = { + "id": str(int(datetime.now().timestamp())), + "theme": summary, + "messages": oldest_messages, # 存储原始消息对象 + "timestamps": oldest_timestamps, + "chat_id": self.chat_id, + "created_at": datetime.now().timestamp(), + } + # print(f"mid_memory:{mid_memory}") + # 存入内存中的 mid_memorys + self.mid_memorys.append(mid_memory) + if len(self.mid_memorys) > self.max_mid_memory_len: + self.mid_memorys.pop(0) # 移除最旧的 - # print(f"处理后self.talking_message:{self.talking_message}") + mid_memory_str = "之前聊天的内容概括是:\n" + for mid_memory_item in self.mid_memorys: # 重命名循环变量以示区分 + time_diff = int((datetime.now().timestamp() - mid_memory_item["created_at"]) / 60) + mid_memory_str += f"距离现在{time_diff}分钟前(聊天记录id:{mid_memory_item['id']}):{mid_memory_item['theme']}\n" + self.mid_memory_info = mid_memory_str + except Exception as e: # 将异常处理移至此处以覆盖整个总结过程 + logger.error(f"处理和总结旧消息时出错 for chat {self.chat_id}: {e}") + traceback.print_exc() # 记录详细堆栈 - now_message_str = "" - now_message_str += self.translate_message_list_to_str(talking_message=self.talking_message) - self.now_message_info = now_message_str + # print(f"处理后self.talking_message:{self.talking_message}") - logger.debug(f"压缩早期记忆:{self.mid_memory_info}\n现在聊天内容:{self.now_message_info}") + now_message_str = "" + # 使用 self.translate_message_list_to_str 更新当前聊天内容 + now_message_str += self.translate_message_list_to_str(talking_message=self.talking_message) + self.now_message_info = now_message_str + + logger.debug(f"Chat {self.chat_id} - 压缩早期记忆:{self.mid_memory_info}\n现在聊天内容:{self.now_message_info}") + # 锁在退出 async with 块时自动释放 async def update_talking_summary(self, new_messages_str): prompt = ""