From 8cc4e1994753988783f5e935bd534b82c88f3437 Mon Sep 17 00:00:00 2001 From: SengokuCola <1026294844@qq.com> Date: Sat, 5 Apr 2025 21:00:54 +0800 Subject: [PATCH] =?UTF-8?q?feat=EF=BC=9A=E4=B8=BA=E5=BF=83=E6=B5=81?= =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E7=9F=A5=E8=AF=86=E5=92=8C=E7=9F=A5=E8=AF=86?= =?UTF-8?q?=E7=BC=93=E5=AD=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/heart_flow/sub_heartflow.py | 300 ++++++++++++++++-- .../reasoning_prompt_builder.py | 44 +-- .../think_flow_chat/think_flow_chat.py | 121 ++++--- 3 files changed, 360 insertions(+), 105 deletions(-) diff --git a/src/heart_flow/sub_heartflow.py b/src/heart_flow/sub_heartflow.py index fcbe9332f..1312b7aae 100644 --- a/src/heart_flow/sub_heartflow.py +++ b/src/heart_flow/sub_heartflow.py @@ -8,6 +8,9 @@ import time from src.plugins.schedule.schedule_generator import bot_schedule from src.plugins.memory_system.Hippocampus import HippocampusManager from src.common.logger import get_module_logger, LogConfig, SUB_HEARTFLOW_STYLE_CONFIG # noqa: E402 +from src.plugins.chat.utils import get_embedding +from src.common.database import db +from typing import Union subheartflow_config = LogConfig( # 使用海马体专用样式 @@ -53,6 +56,8 @@ class SubHeartflow: self.is_active = False self.observations: list[Observation] = [] + + self.running_knowledges = [] def add_observation(self, observation: Observation): """添加一个新的observation对象到列表中,如果已存在相同id的observation则不添加""" @@ -98,49 +103,49 @@ class SubHeartflow: logger.info(f"子心流 {self.subheartflow_id} 已经5分钟没有激活,正在销毁...") break # 退出循环,销毁自己 - async def do_a_thinking(self): - current_thinking_info = self.current_mind - mood_info = self.current_state.mood + # async def do_a_thinking(self): + # current_thinking_info = self.current_mind + # mood_info = self.current_state.mood - observation = self.observations[0] - chat_observe_info = observation.observe_info - # print(f"chat_observe_info:{chat_observe_info}") + # observation = self.observations[0] + # chat_observe_info = observation.observe_info + # # print(f"chat_observe_info:{chat_observe_info}") - # 调取记忆 - related_memory = await HippocampusManager.get_instance().get_memory_from_text( - text=chat_observe_info, max_memory_num=2, max_memory_length=2, max_depth=3, fast_retrieval=False - ) + # # 调取记忆 + # related_memory = await HippocampusManager.get_instance().get_memory_from_text( + # text=chat_observe_info, max_memory_num=2, max_memory_length=2, max_depth=3, fast_retrieval=False + # ) - if related_memory: - related_memory_info = "" - for memory in related_memory: - related_memory_info += memory[1] - else: - related_memory_info = "" + # if related_memory: + # related_memory_info = "" + # for memory in related_memory: + # related_memory_info += memory[1] + # else: + # related_memory_info = "" - # print(f"相关记忆:{related_memory_info}") + # # print(f"相关记忆:{related_memory_info}") - schedule_info = bot_schedule.get_current_num_task(num=1, time_info=False) + # schedule_info = bot_schedule.get_current_num_task(num=1, time_info=False) - prompt = "" - prompt += f"你刚刚在做的事情是:{schedule_info}\n" - # prompt += f"麦麦的总体想法是:{self.main_heartflow_info}\n\n" - prompt += f"你{self.personality_info}\n" - if related_memory_info: - prompt += f"你想起来你之前见过的回忆:{related_memory_info}。\n以上是你的回忆,不一定是目前聊天里的人说的,也不一定是现在发生的事情,请记住。\n" - prompt += f"刚刚你的想法是{current_thinking_info}。\n" - prompt += "-----------------------------------\n" - prompt += f"现在你正在上网,和qq群里的网友们聊天,群里正在聊的话题是:{chat_observe_info}\n" - prompt += f"你现在{mood_info}\n" - prompt += "现在你接下去继续思考,产生新的想法,不要分点输出,输出连贯的内心独白,不要太长," - prompt += "但是记得结合上述的消息,要记得维持住你的人设,关注聊天和新内容,不要思考太多:" - reponse, reasoning_content = await self.llm_model.generate_response_async(prompt) + # prompt = "" + # prompt += f"你刚刚在做的事情是:{schedule_info}\n" + # # prompt += f"麦麦的总体想法是:{self.main_heartflow_info}\n\n" + # prompt += f"你{self.personality_info}\n" + # if related_memory_info: + # prompt += f"你想起来你之前见过的回忆:{related_memory_info}。\n以上是你的回忆,不一定是目前聊天里的人说的,也不一定是现在发生的事情,请记住。\n" + # prompt += f"刚刚你的想法是{current_thinking_info}。\n" + # prompt += "-----------------------------------\n" + # prompt += f"现在你正在上网,和qq群里的网友们聊天,群里正在聊的话题是:{chat_observe_info}\n" + # prompt += f"你现在{mood_info}\n" + # prompt += "现在你接下去继续思考,产生新的想法,不要分点输出,输出连贯的内心独白,不要太长," + # prompt += "但是记得结合上述的消息,要记得维持住你的人设,关注聊天和新内容,不要思考太多:" + # reponse, reasoning_content = await self.llm_model.generate_response_async(prompt) - self.update_current_mind(reponse) + # self.update_current_mind(reponse) - self.current_mind = reponse - logger.debug(f"prompt:\n{prompt}\n") - logger.info(f"麦麦的脑内状态:{self.current_mind}") + # self.current_mind = reponse + # logger.debug(f"prompt:\n{prompt}\n") + # logger.info(f"麦麦的脑内状态:{self.current_mind}") async def do_observe(self): observation = self.observations[0] @@ -166,6 +171,13 @@ class SubHeartflow: else: related_memory_info = "" + related_info,grouped_results = await self.get_prompt_info(chat_observe_info + message_txt, 0.4) + print(related_info) + for topic, results in grouped_results.items(): + for result in results: + print(result) + self.running_knowledges.append(result) + # print(f"相关记忆:{related_memory_info}") schedule_info = bot_schedule.get_current_num_task(num=1, time_info=False) @@ -176,6 +188,8 @@ class SubHeartflow: prompt += f"你刚刚在做的事情是:{schedule_info}\n" if related_memory_info: prompt += f"你想起来你之前见过的回忆:{related_memory_info}。\n以上是你的回忆,不一定是目前聊天里的人说的,也不一定是现在发生的事情,请记住。\n" + if related_info: + prompt += f"你想起你知道:{related_info}\n" prompt += f"刚刚你的想法是{current_thinking_info}。\n" prompt += "-----------------------------------\n" prompt += f"现在你正在上网,和qq群里的网友们聊天,群里正在聊的话题是:{chat_observe_info}\n" @@ -249,6 +263,222 @@ class SubHeartflow: def update_current_mind(self, reponse): self.past_mind.append(self.current_mind) self.current_mind = reponse + + + async def get_prompt_info(self, message: str, threshold: float): + start_time = time.time() + related_info = "" + logger.debug(f"获取知识库内容,元消息:{message[:30]}...,消息长度: {len(message)}") + + # 1. 先从LLM获取主题,类似于记忆系统的做法 + topics = [] + # try: + # # 先尝试使用记忆系统的方法获取主题 + # hippocampus = HippocampusManager.get_instance()._hippocampus + # topic_num = min(5, max(1, int(len(message) * 0.1))) + # topics_response = await hippocampus.llm_topic_judge.generate_response(hippocampus.find_topic_llm(message, topic_num)) + + # # 提取关键词 + # topics = re.findall(r"<([^>]+)>", topics_response[0]) + # if not topics: + # topics = [] + # else: + # topics = [ + # topic.strip() + # for topic in ",".join(topics).replace(",", ",").replace("、", ",").replace(" ", ",").split(",") + # if topic.strip() + # ] + + # logger.info(f"从LLM提取的主题: {', '.join(topics)}") + # except Exception as e: + # logger.error(f"从LLM提取主题失败: {str(e)}") + # # 如果LLM提取失败,使用jieba分词提取关键词作为备选 + # words = jieba.cut(message) + # topics = [word for word in words if len(word) > 1][:5] + # logger.info(f"使用jieba提取的主题: {', '.join(topics)}") + + # 如果无法提取到主题,直接使用整个消息 + if not topics: + logger.info("未能提取到任何主题,使用整个消息进行查询") + embedding = await get_embedding(message, request_type="info_retrieval") + if not embedding: + logger.error("获取消息嵌入向量失败") + return "" + + related_info = self.get_info_from_db(embedding, limit=3, threshold=threshold) + logger.info(f"知识库检索完成,总耗时: {time.time() - start_time:.3f}秒") + return related_info, {} + + # 2. 对每个主题进行知识库查询 + logger.info(f"开始处理{len(topics)}个主题的知识库查询") + + # 优化:批量获取嵌入向量,减少API调用 + embeddings = {} + topics_batch = [topic for topic in topics if len(topic) > 0] + if message: # 确保消息非空 + topics_batch.append(message) + + # 批量获取嵌入向量 + embed_start_time = time.time() + for text in topics_batch: + if not text or len(text.strip()) == 0: + continue + + try: + embedding = await get_embedding(text, request_type="info_retrieval") + if embedding: + embeddings[text] = embedding + else: + logger.warning(f"获取'{text}'的嵌入向量失败") + except Exception as e: + logger.error(f"获取'{text}'的嵌入向量时发生错误: {str(e)}") + + logger.info(f"批量获取嵌入向量完成,耗时: {time.time() - embed_start_time:.3f}秒") + + if not embeddings: + logger.error("所有嵌入向量获取失败") + return "" + + # 3. 对每个主题进行知识库查询 + all_results = [] + query_start_time = time.time() + + # 首先添加原始消息的查询结果 + if message in embeddings: + original_results = self.get_info_from_db(embeddings[message], limit=3, threshold=threshold, return_raw=True) + if original_results: + for result in original_results: + result["topic"] = "原始消息" + all_results.extend(original_results) + logger.info(f"原始消息查询到{len(original_results)}条结果") + + # 然后添加每个主题的查询结果 + for topic in topics: + if not topic or topic not in embeddings: + continue + + try: + topic_results = self.get_info_from_db(embeddings[topic], limit=3, threshold=threshold, return_raw=True) + if topic_results: + # 添加主题标记 + for result in topic_results: + result["topic"] = topic + all_results.extend(topic_results) + logger.info(f"主题'{topic}'查询到{len(topic_results)}条结果") + except Exception as e: + logger.error(f"查询主题'{topic}'时发生错误: {str(e)}") + + logger.info(f"知识库查询完成,耗时: {time.time() - query_start_time:.3f}秒,共获取{len(all_results)}条结果") + + # 4. 去重和过滤 + process_start_time = time.time() + unique_contents = set() + filtered_results = [] + for result in all_results: + content = result["content"] + if content not in unique_contents: + unique_contents.add(content) + filtered_results.append(result) + + # 5. 按相似度排序 + filtered_results.sort(key=lambda x: x["similarity"], reverse=True) + + # 6. 限制总数量(最多10条) + filtered_results = filtered_results[:10] + logger.info(f"结果处理完成,耗时: {time.time() - process_start_time:.3f}秒,过滤后剩余{len(filtered_results)}条结果") + + # 7. 格式化输出 + if filtered_results: + format_start_time = time.time() + grouped_results = {} + for result in filtered_results: + topic = result["topic"] + if topic not in grouped_results: + grouped_results[topic] = [] + grouped_results[topic].append(result) + + # 按主题组织输出 + for topic, results in grouped_results.items(): + related_info += f"【主题: {topic}】\n" + for i, result in enumerate(results, 1): + similarity = result["similarity"] + content = result["content"].strip() + # 调试:为内容添加序号和相似度信息 + # related_info += f"{i}. [{similarity:.2f}] {content}\n" + related_info += f"{content}\n" + related_info += "\n" + + logger.info(f"格式化输出完成,耗时: {time.time() - format_start_time:.3f}秒") + + logger.info(f"知识库检索总耗时: {time.time() - start_time:.3f}秒") + return related_info,grouped_results + + def get_info_from_db(self, query_embedding: list, limit: int = 1, threshold: float = 0.5, return_raw: bool = False) -> Union[str, list]: + if not query_embedding: + return "" if not return_raw else [] + # 使用余弦相似度计算 + pipeline = [ + { + "$addFields": { + "dotProduct": { + "$reduce": { + "input": {"$range": [0, {"$size": "$embedding"}]}, + "initialValue": 0, + "in": { + "$add": [ + "$$value", + { + "$multiply": [ + {"$arrayElemAt": ["$embedding", "$$this"]}, + {"$arrayElemAt": [query_embedding, "$$this"]}, + ] + }, + ] + }, + } + }, + "magnitude1": { + "$sqrt": { + "$reduce": { + "input": "$embedding", + "initialValue": 0, + "in": {"$add": ["$$value", {"$multiply": ["$$this", "$$this"]}]}, + } + } + }, + "magnitude2": { + "$sqrt": { + "$reduce": { + "input": query_embedding, + "initialValue": 0, + "in": {"$add": ["$$value", {"$multiply": ["$$this", "$$this"]}]}, + } + } + }, + } + }, + {"$addFields": {"similarity": {"$divide": ["$dotProduct", {"$multiply": ["$magnitude1", "$magnitude2"]}]}}}, + { + "$match": { + "similarity": {"$gte": threshold} # 只保留相似度大于等于阈值的结果 + } + }, + {"$sort": {"similarity": -1}}, + {"$limit": limit}, + {"$project": {"content": 1, "similarity": 1}}, + ] + + results = list(db.knowledges.aggregate(pipeline)) + logger.debug(f"知识库查询结果数量: {len(results)}") + + if not results: + return "" if not return_raw else [] + + if return_raw: + return results + else: + # 返回所有找到的内容,用换行分隔 + return "\n".join(str(result["content"]) for result in results) # subheartflow = SubHeartflow() diff --git a/src/plugins/chat_module/reasoning_chat/reasoning_prompt_builder.py b/src/plugins/chat_module/reasoning_chat/reasoning_prompt_builder.py index d9e2cf75b..87fc14045 100644 --- a/src/plugins/chat_module/reasoning_chat/reasoning_prompt_builder.py +++ b/src/plugins/chat_module/reasoning_chat/reasoning_prompt_builder.py @@ -167,30 +167,30 @@ class PromptBuilder: # 1. 先从LLM获取主题,类似于记忆系统的做法 topics = [] - try: - # 先尝试使用记忆系统的方法获取主题 - hippocampus = HippocampusManager.get_instance()._hippocampus - topic_num = min(5, max(1, int(len(message) * 0.1))) - topics_response = await hippocampus.llm_topic_judge.generate_response(hippocampus.find_topic_llm(message, topic_num)) + # try: + # # 先尝试使用记忆系统的方法获取主题 + # hippocampus = HippocampusManager.get_instance()._hippocampus + # topic_num = min(5, max(1, int(len(message) * 0.1))) + # topics_response = await hippocampus.llm_topic_judge.generate_response(hippocampus.find_topic_llm(message, topic_num)) - # 提取关键词 - topics = re.findall(r"<([^>]+)>", topics_response[0]) - if not topics: - topics = [] - else: - topics = [ - topic.strip() - for topic in ",".join(topics).replace(",", ",").replace("、", ",").replace(" ", ",").split(",") - if topic.strip() - ] + # # 提取关键词 + # topics = re.findall(r"<([^>]+)>", topics_response[0]) + # if not topics: + # topics = [] + # else: + # topics = [ + # topic.strip() + # for topic in ",".join(topics).replace(",", ",").replace("、", ",").replace(" ", ",").split(",") + # if topic.strip() + # ] - logger.info(f"从LLM提取的主题: {', '.join(topics)}") - except Exception as e: - logger.error(f"从LLM提取主题失败: {str(e)}") - # 如果LLM提取失败,使用jieba分词提取关键词作为备选 - words = jieba.cut(message) - topics = [word for word in words if len(word) > 1][:5] - logger.info(f"使用jieba提取的主题: {', '.join(topics)}") + # logger.info(f"从LLM提取的主题: {', '.join(topics)}") + # except Exception as e: + # logger.error(f"从LLM提取主题失败: {str(e)}") + # # 如果LLM提取失败,使用jieba分词提取关键词作为备选 + # words = jieba.cut(message) + # topics = [word for word in words if len(word) > 1][:5] + # logger.info(f"使用jieba提取的主题: {', '.join(topics)}") # 如果无法提取到主题,直接使用整个消息 if not topics: diff --git a/src/plugins/chat_module/think_flow_chat/think_flow_chat.py b/src/plugins/chat_module/think_flow_chat/think_flow_chat.py index c5ab77b6d..725fd3f72 100644 --- a/src/plugins/chat_module/think_flow_chat/think_flow_chat.py +++ b/src/plugins/chat_module/think_flow_chat/think_flow_chat.py @@ -236,59 +236,84 @@ class ThinkFlowChat: do_reply = False if random() < reply_probability: - do_reply = True - - # 创建思考消息 - timer1 = time.time() - thinking_id = await self._create_thinking_message(message, chat, userinfo, messageinfo) - timer2 = time.time() - timing_results["创建思考消息"] = timer2 - timer1 - - # 观察 - timer1 = time.time() - await heartflow.get_subheartflow(chat.stream_id).do_observe() - timer2 = time.time() - timing_results["观察"] = timer2 - timer1 - - # 思考前脑内状态 - timer1 = time.time() - await heartflow.get_subheartflow(chat.stream_id).do_thinking_before_reply(message.processed_plain_text) - timer2 = time.time() - timing_results["思考前脑内状态"] = timer2 - timer1 - - # 生成回复 - timer1 = time.time() - response_set = await self.gpt.generate_response(message) - timer2 = time.time() - timing_results["生成回复"] = timer2 - timer1 + try: + do_reply = True + + # 创建思考消息 + try: + timer1 = time.time() + thinking_id = await self._create_thinking_message(message, chat, userinfo, messageinfo) + timer2 = time.time() + timing_results["创建思考消息"] = timer2 - timer1 + except Exception as e: + logger.error(f"心流创建思考消息失败: {e}") + + try: + # 观察 + timer1 = time.time() + await heartflow.get_subheartflow(chat.stream_id).do_observe() + timer2 = time.time() + timing_results["观察"] = timer2 - timer1 + except Exception as e: + logger.error(f"心流观察失败: {e}") - if not response_set: - logger.info("为什么生成回复失败?") - return + # 思考前脑内状态 + try: + timer1 = time.time() + await heartflow.get_subheartflow(chat.stream_id).do_thinking_before_reply(message.processed_plain_text) + timer2 = time.time() + timing_results["思考前脑内状态"] = timer2 - timer1 + except Exception as e: + logger.error(f"心流思考前脑内状态失败: {e}") + + # 生成回复 + timer1 = time.time() + response_set = await self.gpt.generate_response(message) + timer2 = time.time() + timing_results["生成回复"] = timer2 - timer1 - # 发送消息 - timer1 = time.time() - await self._send_response_messages(message, chat, response_set, thinking_id) - timer2 = time.time() - timing_results["发送消息"] = timer2 - timer1 + if not response_set: + logger.info("为什么生成回复失败?") + return - # 处理表情包 - timer1 = time.time() - await self._handle_emoji(message, chat, response_set) - timer2 = time.time() - timing_results["处理表情包"] = timer2 - timer1 + # 发送消息 + try: + timer1 = time.time() + await self._send_response_messages(message, chat, response_set, thinking_id) + timer2 = time.time() + timing_results["发送消息"] = timer2 - timer1 + except Exception as e: + logger.error(f"心流发送消息失败: {e}") - # 更新心流 - timer1 = time.time() - await self._update_using_response(message, response_set) - timer2 = time.time() - timing_results["更新心流"] = timer2 - timer1 + # 处理表情包 + try: + timer1 = time.time() + await self._handle_emoji(message, chat, response_set) + timer2 = time.time() + timing_results["处理表情包"] = timer2 - timer1 + except Exception as e: + logger.error(f"心流处理表情包失败: {e}") - # 更新关系情绪 - timer1 = time.time() - await self._update_relationship(message, response_set) - timer2 = time.time() - timing_results["更新关系情绪"] = timer2 - timer1 + # 更新心流 + try: + timer1 = time.time() + await self._update_using_response(message, response_set) + timer2 = time.time() + timing_results["更新心流"] = timer2 - timer1 + except Exception as e: + logger.error(f"心流更新失败: {e}") + + # 更新关系情绪 + try: + timer1 = time.time() + await self._update_relationship(message, response_set) + timer2 = time.time() + timing_results["更新关系情绪"] = timer2 - timer1 + except Exception as e: + logger.error(f"心流更新关系情绪失败: {e}") + + except Exception as e: + logger.error(f"心流处理消息失败: {e}") # 输出性能计时结果 if do_reply: