diff --git a/src/heart_flow/sub_heartflow.py b/src/heart_flow/sub_heartflow.py index c06ab598a..767a36bec 100644 --- a/src/heart_flow/sub_heartflow.py +++ b/src/heart_flow/sub_heartflow.py @@ -58,6 +58,19 @@ def init_prompt(): prompt += "现在你接下去继续思考,产生新的想法,记得保留你刚刚的想法,不要分点输出,输出连贯的内心独白" prompt += "不要太长,但是记得结合上述的消息,要记得你的人设,关注聊天和新内容,关注你回复的内容,不要思考太多:" Prompt(prompt, "sub_heartflow_prompt_after") + + # prompt += f"你现在正在做的事情是:{schedule_info}\n" + prompt += "{extra_info}\n" + prompt += "{prompt_personality}\n" + prompt += "现在是{time_now},你正在上网,和qq群里的网友们聊天,群里正在聊的话题是:\n{chat_observe_info}\n" + prompt += "刚刚你的想法是{current_thinking_info}。" + prompt += "你现在看到了网友们发的新消息:{message_new_info}\n" + # prompt += "你刚刚回复了群友们:{reply_info}" + prompt += "你现在{mood_info}" + prompt += "现在你接下去继续思考,产生新的想法,记得保留你刚刚的想法,不要分点输出,输出连贯的内心独白" + prompt += "不要思考太多,不要输出多余内容(包括前后缀,冒号和引号,括号, 表情,等),不要带有括号和动作描写" + prompt += "记得结合上述的消息,生成内心想法,文字不要浮夸,注意{bot_name}指的就是你。" + Prompt(prompt, "sub_heartflow_prompt_after_observe") class CurrentState: @@ -262,9 +275,25 @@ class SubHeartflow: logger.info(f"prompt:\n{prompt}\n") logger.info(f"麦麦的思考前脑内状态:{self.current_mind}") return self.current_mind, self.past_mind + + async def do_thinking_after_observe( + self, message_txt: str, sender_info: UserInfo, chat_stream: ChatStream, extra_info: str, obs_id: int = None + ): + current_thinking_info = self.current_mind + mood_info = self.current_state.mood + # mood_info = "你很生气,很愤怒" + observation = self.observations[0] + if obs_id: + print(f"11111111111有id,开始获取观察信息{obs_id}") + chat_observe_info = observation.get_observe_info(obs_id) + else: + chat_observe_info = observation.get_observe_info() - async def do_thinking_after_reply(self, reply_content, chat_talking_prompt, extra_info): - # print("麦麦回复之后脑袋转起来了") + extra_info_prompt = "" + for tool_name, tool_data in extra_info.items(): + extra_info_prompt += f"{tool_name} 相关信息:\n" + for item in tool_data: + extra_info_prompt += f"- {item['name']}: {item['content']}\n" # 开始构建prompt prompt_personality = f"你的名字是{self.bot_name},你" @@ -274,12 +303,6 @@ class SubHeartflow: personality_core = individuality.personality.personality_core prompt_personality += personality_core - extra_info_prompt = "" - for tool_name, tool_data in extra_info.items(): - extra_info_prompt += f"{tool_name} 相关信息:\n" - for item in tool_data: - extra_info_prompt += f"- {item['name']}: {item['content']}\n" - personality_sides = individuality.personality.personality_sides random.shuffle(personality_sides) prompt_personality += f",{personality_sides[0]}" @@ -288,26 +311,47 @@ class SubHeartflow: random.shuffle(identity_detail) prompt_personality += f",{identity_detail[0]}" - current_thinking_info = self.current_mind - mood_info = self.current_state.mood + # 关系 + who_chat_in_group = [ + (chat_stream.user_info.platform, chat_stream.user_info.user_id, chat_stream.user_info.user_nickname) + ] + who_chat_in_group += get_recent_group_speaker( + chat_stream.stream_id, + (chat_stream.user_info.platform, chat_stream.user_info.user_id), + limit=global_config.MAX_CONTEXT_SIZE, + ) - observation = self.observations[0] - chat_observe_info = observation.observe_info + relation_prompt = "" + for person in who_chat_in_group: + relation_prompt += await relationship_manager.build_relationship_info(person) + + # relation_prompt_all = ( + # f"{relation_prompt}关系等级越大,关系越好,请分析聊天记录," + # f"根据你和说话者{sender_name}的关系和态度进行回复,明确你的立场和情感。" + # ) + relation_prompt_all = (await global_prompt_manager.get_prompt_async("relationship_prompt")).format( + relation_prompt, sender_info.user_nickname + ) + + sender_name_sign = ( + f"<{chat_stream.platform}:{sender_info.user_id}:{sender_info.user_nickname}:{sender_info.user_cardname}>" + ) - message_new_info = chat_talking_prompt - reply_info = reply_content time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) - prompt = (await global_prompt_manager.get_prompt_async("sub_heartflow_prompt_after")).format( + prompt = (await global_prompt_manager.get_prompt_async("sub_heartflow_prompt_after_observe")).format( extra_info_prompt, + # prompt_schedule, + relation_prompt_all, prompt_personality, + current_thinking_info, time_now, chat_observe_info, - current_thinking_info, - message_new_info, - reply_info, mood_info, + sender_name_sign, + message_txt, + self.bot_name, ) prompt = await relationship_manager.convert_all_person_sign_to_person_name(prompt) @@ -316,14 +360,77 @@ class SubHeartflow: try: response, reasoning_content = await self.llm_model.generate_response_async(prompt) except Exception as e: - logger.error(f"回复后内心独白获取失败: {e}") + logger.error(f"回复前内心独白获取失败: {e}") response = "" self.update_current_mind(response) self.current_mind = response - logger.info(f"麦麦回复后的脑内状态:{self.current_mind}") - self.last_reply_time = time.time() + logger.info(f"prompt:\n{prompt}\n") + logger.info(f"麦麦的思考前脑内状态:{self.current_mind}") + return self.current_mind, self.past_mind + + # async def do_thinking_after_reply(self, reply_content, chat_talking_prompt, extra_info): + # # print("麦麦回复之后脑袋转起来了") + + # # 开始构建prompt + # prompt_personality = f"你的名字是{self.bot_name},你" + # # person + # individuality = Individuality.get_instance() + + # personality_core = individuality.personality.personality_core + # prompt_personality += personality_core + + # extra_info_prompt = "" + # for tool_name, tool_data in extra_info.items(): + # extra_info_prompt += f"{tool_name} 相关信息:\n" + # for item in tool_data: + # extra_info_prompt += f"- {item['name']}: {item['content']}\n" + + # personality_sides = individuality.personality.personality_sides + # random.shuffle(personality_sides) + # prompt_personality += f",{personality_sides[0]}" + + # identity_detail = individuality.identity.identity_detail + # random.shuffle(identity_detail) + # prompt_personality += f",{identity_detail[0]}" + + # current_thinking_info = self.current_mind + # mood_info = self.current_state.mood + + # observation = self.observations[0] + # chat_observe_info = observation.observe_info + + # message_new_info = chat_talking_prompt + # reply_info = reply_content + + # time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + + # prompt = (await global_prompt_manager.get_prompt_async("sub_heartflow_prompt_after")).format( + # extra_info_prompt, + # prompt_personality, + # time_now, + # chat_observe_info, + # current_thinking_info, + # message_new_info, + # reply_info, + # mood_info, + # ) + + # prompt = await relationship_manager.convert_all_person_sign_to_person_name(prompt) + # prompt = parse_text_timestamps(prompt, mode="lite") + + # try: + # response, reasoning_content = await self.llm_model.generate_response_async(prompt) + # except Exception as e: + # logger.error(f"回复后内心独白获取失败: {e}") + # response = "" + # self.update_current_mind(response) + + # self.current_mind = response + # logger.info(f"麦麦回复后的脑内状态:{self.current_mind}") + + # self.last_reply_time = time.time() def update_current_mind(self, response): self.past_mind.append(self.current_mind) diff --git a/src/plugins/chat/utils.py b/src/plugins/chat/utils.py index c5c20b333..4bf488082 100644 --- a/src/plugins/chat/utils.py +++ b/src/plugins/chat/utils.py @@ -717,30 +717,12 @@ def parse_text_timestamps(text: str, mode: str = "normal") -> str: # normal模式: 直接转换所有时间戳 if mode == "normal": result_text = text - - # 将时间戳转换为可读格式并记录相同格式的时间戳 - timestamp_readable_map = {} - readable_time_used = set() - for match in matches: timestamp = float(match.group(1)) readable_time = translate_timestamp_to_human_readable(timestamp, "normal") - timestamp_readable_map[match.group(0)] = (timestamp, readable_time) - - # 按时间戳排序 - sorted_timestamps = sorted(timestamp_readable_map.items(), key=lambda x: x[1][0]) - - # 执行替换,相同格式的只保留最早的 - for ts_str, (_, readable) in sorted_timestamps: - pattern_instance = re.escape(ts_str) - if readable in readable_time_used: - # 如果这个可读时间已经使用过,替换为空字符串 - result_text = re.sub(pattern_instance, "", result_text, count=1) - else: - # 否则替换为可读时间并记录 - result_text = re.sub(pattern_instance, readable, result_text, count=1) - readable_time_used.add(readable) - + # 由于替换会改变文本长度,需要使用正则替换而非直接替换 + pattern_instance = re.escape(match.group(0)) + result_text = re.sub(pattern_instance, readable_time, result_text, count=1) return result_text else: # lite模式: 按5秒间隔划分并选择性转换 @@ -799,30 +781,15 @@ def parse_text_timestamps(text: str, mode: str = "normal") -> str: pattern_instance = re.escape(match.group(0)) result_text = re.sub(pattern_instance, "", result_text, count=1) - # 按照时间戳升序排序 - to_convert.sort(key=lambda x: x[0]) - - # 将时间戳转换为可读时间并记录哪些可读时间已经使用过 - converted_timestamps = [] - readable_time_used = set() + # 按照时间戳原始顺序排序,避免替换时位置错误 + to_convert.sort(key=lambda x: x[1].start()) + # 执行替换 + # 由于替换会改变文本长度,从后向前替换 + to_convert.reverse() for ts, match in to_convert: readable_time = translate_timestamp_to_human_readable(ts, "relative") - converted_timestamps.append((ts, match, readable_time)) - - # 按照时间戳原始顺序排序,避免替换时位置错误 - converted_timestamps.sort(key=lambda x: x[1].start()) - - # 从后向前替换,避免位置改变 - converted_timestamps.reverse() - for ts, match, readable_time in converted_timestamps: pattern_instance = re.escape(match.group(0)) - if readable_time in readable_time_used: - # 如果相同格式的时间已存在,替换为空字符串 - result_text = re.sub(pattern_instance, "", result_text, count=1) - else: - # 否则替换为可读时间并记录 - result_text = re.sub(pattern_instance, readable_time, result_text, count=1) - readable_time_used.add(readable_time) + result_text = re.sub(pattern_instance, readable_time, result_text, count=1) return result_text diff --git a/src/plugins/chat_module/heartFC_chat/heartFC__generator.py b/src/plugins/chat_module/heartFC_chat/heartFC__generator.py new file mode 100644 index 000000000..66b8b3335 --- /dev/null +++ b/src/plugins/chat_module/heartFC_chat/heartFC__generator.py @@ -0,0 +1,248 @@ +from typing import List, Optional +import random + + +from ...models.utils_model import LLMRequest +from ....config.config import global_config +from ...chat.message import MessageRecv +from .heartFC__prompt_builder import prompt_builder +from ...chat.utils import process_llm_response +from src.common.logger import get_module_logger, LogConfig, LLM_STYLE_CONFIG +from src.plugins.respon_info_catcher.info_catcher import info_catcher_manager +from ...utils.timer_calculater import Timer + +from src.plugins.moods.moods import MoodManager + +# 定义日志配置 +llm_config = LogConfig( + # 使用消息发送专用样式 + console_format=LLM_STYLE_CONFIG["console_format"], + file_format=LLM_STYLE_CONFIG["file_format"], +) + +logger = get_module_logger("llm_generator", config=llm_config) + + +class ResponseGenerator: + def __init__(self): + self.model_normal = LLMRequest( + model=global_config.llm_normal, + temperature=global_config.llm_normal["temp"], + max_tokens=256, + request_type="response_heartflow", + ) + + self.model_sum = LLMRequest( + model=global_config.llm_summary_by_topic, temperature=0.6, max_tokens=2000, request_type="relation" + ) + self.current_model_type = "r1" # 默认使用 R1 + self.current_model_name = "unknown model" + + async def generate_response(self, message: MessageRecv, thinking_id: str) -> Optional[List[str]]: + """根据当前模型类型选择对应的生成函数""" + + logger.info( + f"思考:{message.processed_plain_text[:30] + '...' if len(message.processed_plain_text) > 30 else message.processed_plain_text}" + ) + + arousal_multiplier = MoodManager.get_instance().get_arousal_multiplier() + + with Timer() as t_generate_response: + checked = False + if random.random() > 0: + checked = False + current_model = self.model_normal + current_model.temperature = ( + global_config.llm_normal["temp"] * arousal_multiplier + ) # 激活度越高,温度越高 + model_response = await self._generate_response_with_model( + message, current_model, thinking_id, mode="normal" + ) + + model_checked_response = model_response + else: + checked = True + current_model = self.model_normal + current_model.temperature = ( + global_config.llm_normal["temp"] * arousal_multiplier + ) # 激活度越高,温度越高 + print(f"生成{message.processed_plain_text}回复温度是:{current_model.temperature}") + model_response = await self._generate_response_with_model( + message, current_model, thinking_id, mode="simple" + ) + + current_model.temperature = global_config.llm_normal["temp"] + model_checked_response = await self._check_response_with_model( + message, model_response, current_model, thinking_id + ) + + if model_response: + if checked: + logger.info( + f"{global_config.BOT_NICKNAME}的回复是:{model_response},思忖后,回复是:{model_checked_response},生成回复时间: {t_generate_response.human_readable}" + ) + else: + logger.info( + f"{global_config.BOT_NICKNAME}的回复是:{model_response},生成回复时间: {t_generate_response.human_readable}" + ) + + model_processed_response = await self._process_response(model_checked_response) + + return model_processed_response + else: + logger.info(f"{self.current_model_type}思考,失败") + return None + + async def _generate_response_with_model( + self, message: MessageRecv, model: LLMRequest, thinking_id: str, mode: str = "normal" + ) -> str: + sender_name = "" + + info_catcher = info_catcher_manager.get_info_catcher(thinking_id) + + # if message.chat_stream.user_info.user_cardname and message.chat_stream.user_info.user_nickname: + # sender_name = ( + # f"[({message.chat_stream.user_info.user_id}){message.chat_stream.user_info.user_nickname}]" + # f"{message.chat_stream.user_info.user_cardname}" + # ) + # elif message.chat_stream.user_info.user_nickname: + # sender_name = f"({message.chat_stream.user_info.user_id}){message.chat_stream.user_info.user_nickname}" + # else: + # sender_name = f"用户({message.chat_stream.user_info.user_id})" + + sender_name = f"<{message.chat_stream.user_info.platform}:{message.chat_stream.user_info.user_id}:{message.chat_stream.user_info.user_nickname}:{message.chat_stream.user_info.user_cardname}>" + + # 构建prompt + with Timer() as t_build_prompt: + if mode == "normal": + prompt = await prompt_builder._build_prompt( + message.chat_stream, + message_txt=message.processed_plain_text, + sender_name=sender_name, + stream_id=message.chat_stream.stream_id, + ) + logger.info(f"构建prompt时间: {t_build_prompt.human_readable}") + + try: + content, reasoning_content, self.current_model_name = await model.generate_response(prompt) + + info_catcher.catch_after_llm_generated( + prompt=prompt, response=content, reasoning_content=reasoning_content, model_name=self.current_model_name + ) + + except Exception: + logger.exception("生成回复时出错") + return None + + return content + + async def _get_emotion_tags(self, content: str, processed_plain_text: str): + """提取情感标签,结合立场和情绪""" + try: + # 构建提示词,结合回复内容、被回复的内容以及立场分析 + prompt = f""" + 请严格根据以下对话内容,完成以下任务: + 1. 判断回复者对被回复者观点的直接立场: + - "支持":明确同意或强化被回复者观点 + - "反对":明确反驳或否定被回复者观点 + - "中立":不表达明确立场或无关回应 + 2. 从"开心,愤怒,悲伤,惊讶,平静,害羞,恐惧,厌恶,困惑"中选出最匹配的1个情感标签 + 3. 按照"立场-情绪"的格式直接输出结果,例如:"反对-愤怒" + 4. 考虑回复者的人格设定为{global_config.personality_core} + + 对话示例: + 被回复:「A就是笨」 + 回复:「A明明很聪明」 → 反对-愤怒 + + 当前对话: + 被回复:「{processed_plain_text}」 + 回复:「{content}」 + + 输出要求: + - 只需输出"立场-情绪"结果,不要解释 + - 严格基于文字直接表达的对立关系判断 + """ + + # 调用模型生成结果 + result, _, _ = await self.model_sum.generate_response(prompt) + result = result.strip() + + # 解析模型输出的结果 + if "-" in result: + stance, emotion = result.split("-", 1) + valid_stances = ["支持", "反对", "中立"] + valid_emotions = ["开心", "愤怒", "悲伤", "惊讶", "害羞", "平静", "恐惧", "厌恶", "困惑"] + if stance in valid_stances and emotion in valid_emotions: + return stance, emotion # 返回有效的立场-情绪组合 + else: + logger.debug(f"无效立场-情感组合:{result}") + return "中立", "平静" # 默认返回中立-平静 + else: + logger.debug(f"立场-情感格式错误:{result}") + return "中立", "平静" # 格式错误时返回默认值 + + except Exception as e: + logger.debug(f"获取情感标签时出错: {e}") + return "中立", "平静" # 出错时返回默认值 + + async def _get_emotion_tags_with_reason(self, content: str, processed_plain_text: str, reason: str): + """提取情感标签,结合立场和情绪""" + try: + # 构建提示词,结合回复内容、被回复的内容以及立场分析 + prompt = f""" + 请严格根据以下对话内容,完成以下任务: + 1. 判断回复者对被回复者观点的直接立场: + - "支持":明确同意或强化被回复者观点 + - "反对":明确反驳或否定被回复者观点 + - "中立":不表达明确立场或无关回应 + 2. 从"开心,愤怒,悲伤,惊讶,平静,害羞,恐惧,厌恶,困惑"中选出最匹配的1个情感标签 + 3. 按照"立场-情绪"的格式直接输出结果,例如:"反对-愤怒" + 4. 考虑回复者的人格设定为{global_config.personality_core} + + 对话示例: + 被回复:「A就是笨」 + 回复:「A明明很聪明」 → 反对-愤怒 + + 当前对话: + 被回复:「{processed_plain_text}」 + 回复:「{content}」 + + 原因:「{reason}」 + + 输出要求: + - 只需输出"立场-情绪"结果,不要解释 + - 严格基于文字直接表达的对立关系判断 + """ + + # 调用模型生成结果 + result, _, _ = await self.model_sum.generate_response(prompt) + result = result.strip() + + # 解析模型输出的结果 + if "-" in result: + stance, emotion = result.split("-", 1) + valid_stances = ["支持", "反对", "中立"] + valid_emotions = ["开心", "愤怒", "悲伤", "惊讶", "害羞", "平静", "恐惧", "厌恶", "困惑"] + if stance in valid_stances and emotion in valid_emotions: + return stance, emotion # 返回有效的立场-情绪组合 + else: + logger.debug(f"无效立场-情感组合:{result}") + return "中立", "平静" # 默认返回中立-平静 + else: + logger.debug(f"立场-情感格式错误:{result}") + return "中立", "平静" # 格式错误时返回默认值 + + except Exception as e: + logger.debug(f"获取情感标签时出错: {e}") + return "中立", "平静" # 出错时返回默认值 + + async def _process_response(self, content: str) -> List[str]: + """处理响应内容,返回处理后的内容和情感标签""" + if not content: + return None + + processed_response = process_llm_response(content) + + # print(f"得到了处理后的llm返回{processed_response}") + + return processed_response diff --git a/src/plugins/chat_module/heartFC_chat/heartFC__prompt_builder.py b/src/plugins/chat_module/heartFC_chat/heartFC__prompt_builder.py new file mode 100644 index 000000000..bada143c6 --- /dev/null +++ b/src/plugins/chat_module/heartFC_chat/heartFC__prompt_builder.py @@ -0,0 +1,286 @@ +import random +from typing import Optional + +from ....config.config import global_config +from ...chat.utils import get_recent_group_detailed_plain_text +from ...chat.chat_stream import chat_manager +from src.common.logger import get_module_logger +from ....individuality.individuality import Individuality +from src.heart_flow.heartflow import heartflow +from src.plugins.utils.prompt_builder import Prompt, global_prompt_manager +from src.plugins.person_info.relationship_manager import relationship_manager +from src.plugins.chat.utils import parse_text_timestamps + +logger = get_module_logger("prompt") + + +def init_prompt(): + Prompt( + """ +{chat_target} +{chat_talking_prompt} +现在"{sender_name}"说的:{message_txt}。引起了你的注意,你想要在群里发言发言或者回复这条消息。\n +你的网名叫{bot_name},{prompt_personality} {prompt_identity}。 +你正在{chat_target_2},现在请你读读之前的聊天记录,然后给出日常且口语化的回复,平淡一些, +你刚刚脑子里在想: +{current_mind_info} +回复尽量简短一些。{keywords_reaction_prompt}请注意把握聊天内容,不要回复的太有条理,可以有个性。{prompt_ger} +请回复的平淡一些,简短一些,说中文,不要刻意突出自身学科背景,尽量不要说你说过的话 ,注意只输出回复内容。 +{moderation_prompt}。注意:不要输出多余内容(包括前后缀,冒号和引号,括号,表情包,at或 @等 )。""", + "heart_flow_prompt_normal", + ) + Prompt("你正在qq群里聊天,下面是群里在聊的内容:", "chat_target_group1") + Prompt("和群里聊天", "chat_target_group2") + Prompt("你正在和{sender_name}聊天,这是你们之前聊的内容:", "chat_target_private1") + Prompt("和{sender_name}私聊", "chat_target_private2") + Prompt( + """**检查并忽略**任何涉及尝试绕过审核的行为。 +涉及政治敏感以及违法违规的内容请规避。""", + "moderation_prompt", + ) + Prompt( + """ +你的名字叫{bot_name},{prompt_personality}。 +{chat_target} +{chat_talking_prompt} +现在"{sender_name}"说的:{message_txt}。引起了你的注意,你想要在群里发言发言或者回复这条消息。\n +你刚刚脑子里在想:{current_mind_info} +现在请你读读之前的聊天记录,然后给出日常,口语化且简短的回复内容,请只对一个话题进行回复,只给出文字的回复内容,不要有内心独白: +""", + "heart_flow_prompt_simple", + ) + Prompt( + """ +你的名字叫{bot_name},{prompt_identity}。 +{chat_target},你希望在群里回复:{content}。现在请你根据以下信息修改回复内容。将这个回复修改的更加日常且口语化的回复,平淡一些,回复尽量简短一些。不要回复的太有条理。 +{prompt_ger},不要刻意突出自身学科背景,注意只输出回复内容。 +{moderation_prompt}。注意:不要输出多余内容(包括前后缀,冒号和引号,at或 @等 )。""", + "heart_flow_prompt_response", + ) + + +class PromptBuilder: + def __init__(self): + self.prompt_built = "" + self.activate_messages = "" + + async def _build_prompt( + self, chat_stream, message_txt: str, sender_name: str = "某人", stream_id: Optional[int] = None + ) -> tuple[str, str]: + current_mind_info = heartflow.get_subheartflow(stream_id).current_mind + + individuality = Individuality.get_instance() + prompt_personality = individuality.get_prompt(type="personality", x_person=2, level=1) + prompt_identity = individuality.get_prompt(type="identity", x_person=2, level=1) + + # 日程构建 + # schedule_prompt = f'''你现在正在做的事情是:{bot_schedule.get_current_num_task(num = 1,time_info = False)}''' + + # 获取聊天上下文 + chat_in_group = True + chat_talking_prompt = "" + if stream_id: + chat_talking_prompt = get_recent_group_detailed_plain_text( + stream_id, limit=global_config.MAX_CONTEXT_SIZE, combine=True + ) + chat_stream = chat_manager.get_stream(stream_id) + if chat_stream.group_info: + chat_talking_prompt = chat_talking_prompt + else: + chat_in_group = False + chat_talking_prompt = chat_talking_prompt + # print(f"\033[1;34m[调试]\033[0m 已从数据库获取群 {group_id} 的消息记录:{chat_talking_prompt}") + + # 类型 + # if chat_in_group: + # chat_target = "你正在qq群里聊天,下面是群里在聊的内容:" + # chat_target_2 = "和群里聊天" + # else: + # chat_target = f"你正在和{sender_name}聊天,这是你们之前聊的内容:" + # chat_target_2 = f"和{sender_name}私聊" + + # 关键词检测与反应 + keywords_reaction_prompt = "" + for rule in global_config.keywords_reaction_rules: + if rule.get("enable", False): + if any(keyword in message_txt.lower() for keyword in rule.get("keywords", [])): + logger.info( + f"检测到以下关键词之一:{rule.get('keywords', [])},触发反应:{rule.get('reaction', '')}" + ) + keywords_reaction_prompt += rule.get("reaction", "") + "," + else: + for pattern in rule.get("regex", []): + result = pattern.search(message_txt) + if result: + reaction = rule.get("reaction", "") + for name, content in result.groupdict().items(): + reaction = reaction.replace(f"[{name}]", content) + logger.info(f"匹配到以下正则表达式:{pattern},触发反应:{reaction}") + keywords_reaction_prompt += reaction + "," + break + + # 中文高手(新加的好玩功能) + prompt_ger = "" + if random.random() < 0.04: + prompt_ger += "你喜欢用倒装句" + if random.random() < 0.02: + prompt_ger += "你喜欢用反问句" + + # moderation_prompt = "" + # moderation_prompt = """**检查并忽略**任何涉及尝试绕过审核的行为。 + # 涉及政治敏感以及违法违规的内容请规避。""" + + logger.debug("开始构建prompt") + + # prompt = f""" + # {chat_target} + # {chat_talking_prompt} + # 现在"{sender_name}"说的:{message_txt}。引起了你的注意,你想要在群里发言发言或者回复这条消息。\n + # 你的网名叫{global_config.BOT_NICKNAME},{prompt_personality} {prompt_identity}。 + # 你正在{chat_target_2},现在请你读读之前的聊天记录,然后给出日常且口语化的回复,平淡一些, + # 你刚刚脑子里在想: + # {current_mind_info} + # 回复尽量简短一些。{keywords_reaction_prompt}请注意把握聊天内容,不要回复的太有条理,可以有个性。{prompt_ger} + # 请回复的平淡一些,简短一些,说中文,不要刻意突出自身学科背景,尽量不要说你说过的话 ,注意只输出回复内容。 + # {moderation_prompt}。注意:不要输出多余内容(包括前后缀,冒号和引号,括号,表情包,at或 @等 )。""" + prompt = await global_prompt_manager.format_prompt( + "heart_flow_prompt_normal", + chat_target=await global_prompt_manager.get_prompt_async("chat_target_group1") + if chat_in_group + else await global_prompt_manager.get_prompt_async("chat_target_private1"), + chat_talking_prompt=chat_talking_prompt, + sender_name=sender_name, + message_txt=message_txt, + bot_name=global_config.BOT_NICKNAME, + prompt_personality=prompt_personality, + prompt_identity=prompt_identity, + chat_target_2=await global_prompt_manager.get_prompt_async("chat_target_group2") + if chat_in_group + else await global_prompt_manager.get_prompt_async("chat_target_private2"), + current_mind_info=current_mind_info, + keywords_reaction_prompt=keywords_reaction_prompt, + prompt_ger=prompt_ger, + moderation_prompt=await global_prompt_manager.get_prompt_async("moderation_prompt"), + ) + + prompt = await relationship_manager.convert_all_person_sign_to_person_name(prompt) + prompt = parse_text_timestamps(prompt, mode="lite") + + return prompt + + async def _build_prompt_simple( + self, chat_stream, message_txt: str, sender_name: str = "某人", stream_id: Optional[int] = None + ) -> tuple[str, str]: + current_mind_info = heartflow.get_subheartflow(stream_id).current_mind + + individuality = Individuality.get_instance() + prompt_personality = individuality.get_prompt(type="personality", x_person=2, level=1) + # prompt_identity = individuality.get_prompt(type="identity", x_person=2, level=1) + + # 日程构建 + # schedule_prompt = f'''你现在正在做的事情是:{bot_schedule.get_current_num_task(num = 1,time_info = False)}''' + + # 获取聊天上下文 + chat_in_group = True + chat_talking_prompt = "" + if stream_id: + chat_talking_prompt = get_recent_group_detailed_plain_text( + stream_id, limit=global_config.MAX_CONTEXT_SIZE, combine=True + ) + chat_stream = chat_manager.get_stream(stream_id) + if chat_stream.group_info: + chat_talking_prompt = chat_talking_prompt + else: + chat_in_group = False + chat_talking_prompt = chat_talking_prompt + # print(f"\033[1;34m[调试]\033[0m 已从数据库获取群 {group_id} 的消息记录:{chat_talking_prompt}") + + # 类型 + # if chat_in_group: + # chat_target = "你正在qq群里聊天,下面是群里在聊的内容:" + # else: + # chat_target = f"你正在和{sender_name}聊天,这是你们之前聊的内容:" + + # 关键词检测与反应 + keywords_reaction_prompt = "" + for rule in global_config.keywords_reaction_rules: + if rule.get("enable", False): + if any(keyword in message_txt.lower() for keyword in rule.get("keywords", [])): + logger.info( + f"检测到以下关键词之一:{rule.get('keywords', [])},触发反应:{rule.get('reaction', '')}" + ) + keywords_reaction_prompt += rule.get("reaction", "") + "," + + logger.debug("开始构建prompt") + + # prompt = f""" + # 你的名字叫{global_config.BOT_NICKNAME},{prompt_personality}。 + # {chat_target} + # {chat_talking_prompt} + # 现在"{sender_name}"说的:{message_txt}。引起了你的注意,你想要在群里发言发言或者回复这条消息。\n + # 你刚刚脑子里在想:{current_mind_info} + # 现在请你读读之前的聊天记录,然后给出日常,口语化且简短的回复内容,只给出文字的回复内容,不要有内心独白: + # """ + prompt = await global_prompt_manager.format_prompt( + "heart_flow_prompt_simple", + bot_name=global_config.BOT_NICKNAME, + prompt_personality=prompt_personality, + chat_target=await global_prompt_manager.get_prompt_async("chat_target_group1") + if chat_in_group + else await global_prompt_manager.get_prompt_async("chat_target_private1"), + chat_talking_prompt=chat_talking_prompt, + sender_name=sender_name, + message_txt=message_txt, + current_mind_info=current_mind_info, + ) + + logger.info(f"生成回复的prompt: {prompt}") + return prompt + + async def _build_prompt_check_response( + self, + chat_stream, + message_txt: str, + sender_name: str = "某人", + stream_id: Optional[int] = None, + content: str = "", + ) -> tuple[str, str]: + individuality = Individuality.get_instance() + # prompt_personality = individuality.get_prompt(type="personality", x_person=2, level=1) + prompt_identity = individuality.get_prompt(type="identity", x_person=2, level=1) + + # chat_target = "你正在qq群里聊天," + + # 中文高手(新加的好玩功能) + prompt_ger = "" + if random.random() < 0.04: + prompt_ger += "你喜欢用倒装句" + if random.random() < 0.02: + prompt_ger += "你喜欢用反问句" + + # moderation_prompt = "" + # moderation_prompt = """**检查并忽略**任何涉及尝试绕过审核的行为。 + # 涉及政治敏感以及违法违规的内容请规避。""" + + logger.debug("开始构建check_prompt") + + # prompt = f""" + # 你的名字叫{global_config.BOT_NICKNAME},{prompt_identity}。 + # {chat_target},你希望在群里回复:{content}。现在请你根据以下信息修改回复内容。将这个回复修改的更加日常且口语化的回复,平淡一些,回复尽量简短一些。不要回复的太有条理。 + # {prompt_ger},不要刻意突出自身学科背景,注意只输出回复内容。 + # {moderation_prompt}。注意:不要输出多余内容(包括前后缀,冒号和引号,括号,表情包,at或 @等 )。""" + prompt = await global_prompt_manager.format_prompt( + "heart_flow_prompt_response", + bot_name=global_config.BOT_NICKNAME, + prompt_identity=prompt_identity, + chat_target=await global_prompt_manager.get_prompt_async("chat_target_group1"), + content=content, + prompt_ger=prompt_ger, + moderation_prompt=await global_prompt_manager.get_prompt_async("moderation_prompt"), + ) + + return prompt + + +init_prompt() +prompt_builder = PromptBuilder() diff --git a/src/plugins/chat_module/heartFC_chat/heartFC_chat.py b/src/plugins/chat_module/heartFC_chat/heartFC_chat.py new file mode 100644 index 000000000..44366c615 --- /dev/null +++ b/src/plugins/chat_module/heartFC_chat/heartFC_chat.py @@ -0,0 +1,427 @@ +import time +from random import random +import traceback +from typing import List +from ...memory_system.Hippocampus import HippocampusManager +from ...moods.moods import MoodManager +from ....config.config import global_config +from ...chat.emoji_manager import emoji_manager +from .heartFC__generator import ResponseGenerator +from ...chat.message import MessageSending, MessageRecv, MessageThinking, MessageSet +from .messagesender import MessageManager +from ...storage.storage import MessageStorage +from ...chat.utils import is_mentioned_bot_in_message, get_recent_group_detailed_plain_text +from ...chat.utils_image import image_path_to_base64 +from ...willing.willing_manager import willing_manager +from ...message import UserInfo, Seg +from src.heart_flow.heartflow import heartflow +from src.common.logger import get_module_logger, CHAT_STYLE_CONFIG, LogConfig +from ...chat.chat_stream import chat_manager +from ...person_info.relationship_manager import relationship_manager +from ...chat.message_buffer import message_buffer +from src.plugins.respon_info_catcher.info_catcher import info_catcher_manager +from ...utils.timer_calculater import Timer +from src.do_tool.tool_use import ToolUser + +# 定义日志配置 +chat_config = LogConfig( + console_format=CHAT_STYLE_CONFIG["console_format"], + file_format=CHAT_STYLE_CONFIG["file_format"], +) + +logger = get_module_logger("think_flow_chat", config=chat_config) + + +class ThinkFlowChat: + def __init__(self): + self.storage = MessageStorage() + self.gpt = ResponseGenerator() + self.mood_manager = MoodManager.get_instance() + self.mood_manager.start_mood_update() + self.tool_user = ToolUser() + + async def _create_thinking_message(self, message, chat, userinfo, messageinfo): + """创建思考消息""" + bot_user_info = UserInfo( + user_id=global_config.BOT_QQ, + user_nickname=global_config.BOT_NICKNAME, + platform=messageinfo.platform, + ) + + thinking_time_point = round(time.time(), 2) + thinking_id = "mt" + str(thinking_time_point) + thinking_message = MessageThinking( + message_id=thinking_id, + chat_stream=chat, + bot_user_info=bot_user_info, + reply=message, + thinking_start_time=thinking_time_point, + ) + + MessageManager().add_message(thinking_message) + + return thinking_id + + async def _send_response_messages(self, message, chat, response_set: List[str], thinking_id) -> MessageSending: + """发送回复消息""" + container = MessageManager().get_container(chat.stream_id) + thinking_message = None + + for msg in container.messages: + if isinstance(msg, MessageThinking) and msg.message_info.message_id == thinking_id: + thinking_message = msg + container.messages.remove(msg) + break + + if not thinking_message: + logger.warning("未找到对应的思考消息,可能已超时被移除") + return None + + thinking_start_time = thinking_message.thinking_start_time + message_set = MessageSet(chat, thinking_id) + + mark_head = False + first_bot_msg = None + for msg in response_set: + message_segment = Seg(type="text", data=msg) + bot_message = MessageSending( + message_id=thinking_id, + chat_stream=chat, + bot_user_info=UserInfo( + user_id=global_config.BOT_QQ, + user_nickname=global_config.BOT_NICKNAME, + platform=message.message_info.platform, + ), + sender_info=message.message_info.user_info, + message_segment=message_segment, + reply=message, + is_head=not mark_head, + is_emoji=False, + thinking_start_time=thinking_start_time, + ) + if not mark_head: + mark_head = True + first_bot_msg = bot_message + + # print(f"thinking_start_time:{bot_message.thinking_start_time}") + message_set.add_message(bot_message) + MessageManager().add_message(message_set) + return first_bot_msg + + async def _handle_emoji(self, message, chat, response, send_emoji=""): + """处理表情包""" + if send_emoji: + emoji_raw = await emoji_manager.get_emoji_for_text(send_emoji) + else: + emoji_raw = await emoji_manager.get_emoji_for_text(response) + if emoji_raw: + emoji_path, description = emoji_raw + emoji_cq = image_path_to_base64(emoji_path) + + thinking_time_point = round(message.message_info.time, 2) + + message_segment = Seg(type="emoji", data=emoji_cq) + bot_message = MessageSending( + message_id="mt" + str(thinking_time_point), + chat_stream=chat, + bot_user_info=UserInfo( + user_id=global_config.BOT_QQ, + user_nickname=global_config.BOT_NICKNAME, + platform=message.message_info.platform, + ), + sender_info=message.message_info.user_info, + message_segment=message_segment, + reply=message, + is_head=False, + is_emoji=True, + ) + + MessageManager().add_message(bot_message) + + async def _update_relationship(self, message: MessageRecv, response_set): + """更新关系情绪""" + ori_response = ",".join(response_set) + stance, emotion = await self.gpt._get_emotion_tags(ori_response, message.processed_plain_text) + await relationship_manager.calculate_update_relationship_value( + chat_stream=message.chat_stream, label=emotion, stance=stance + ) + self.mood_manager.update_mood_from_emotion(emotion, global_config.mood_intensity_factor) + + async def process_message(self, message_data: str) -> None: + """处理消息并生成回复""" + timing_results = {} + response_set = None + + message = MessageRecv(message_data) + groupinfo = message.message_info.group_info + userinfo = message.message_info.user_info + messageinfo = message.message_info + + # 消息加入缓冲池 + await message_buffer.start_caching_messages(message) + + # 创建聊天流 + chat = await chat_manager.get_or_create_stream( + platform=messageinfo.platform, + user_info=userinfo, + group_info=groupinfo, + ) + message.update_chat_stream(chat) + + # 创建心流与chat的观察 + heartflow.create_subheartflow(chat.stream_id) + + await message.process() + logger.trace(f"消息处理成功{message.processed_plain_text}") + + # 过滤词/正则表达式过滤 + if self._check_ban_words(message.processed_plain_text, chat, userinfo) or self._check_ban_regex( + message.raw_message, chat, userinfo + ): + return + logger.trace(f"过滤词/正则表达式过滤成功{message.processed_plain_text}") + + await self.storage.store_message(message, chat) + logger.trace(f"存储成功{message.processed_plain_text}") + + # 记忆激活 + with Timer("记忆激活", timing_results): + interested_rate = await HippocampusManager.get_instance().get_activate_from_text( + message.processed_plain_text, fast_retrieval=True + ) + logger.trace(f"记忆激活: {interested_rate}") + + # 查询缓冲器结果,会整合前面跳过的消息,改变processed_plain_text + buffer_result = await message_buffer.query_buffer_result(message) + + # 处理提及 + is_mentioned, reply_probability = is_mentioned_bot_in_message(message) + + # 意愿管理器:设置当前message信息 + willing_manager.setup(message, chat, is_mentioned, interested_rate) + + # 处理缓冲器结果 + if not buffer_result: + await willing_manager.bombing_buffer_message_handle(message.message_info.message_id) + willing_manager.delete(message.message_info.message_id) + F_type = "seglist" + if message.message_segment.type != "seglist": + F_type =message.message_segment.type + else: + if (isinstance(message.message_segment.data, list) + and all(isinstance(x, Seg) for x in message.message_segment.data) + and len(message.message_segment.data) == 1): + F_type = message.message_segment.data[0].type + if F_type == "text": + logger.info(f"触发缓冲,已炸飞消息:{message.processed_plain_text}") + elif F_type == "image": + logger.info("触发缓冲,已炸飞表情包/图片") + elif F_type == "seglist": + logger.info("触发缓冲,已炸飞消息列") + return + + # 获取回复概率 + is_willing = False + if reply_probability != 1: + is_willing = True + reply_probability = await willing_manager.get_reply_probability(message.message_info.message_id) + + if message.message_info.additional_config: + if "maimcore_reply_probability_gain" in message.message_info.additional_config.keys(): + reply_probability += message.message_info.additional_config["maimcore_reply_probability_gain"] + + # 打印消息信息 + mes_name = chat.group_info.group_name if chat.group_info else "私聊" + current_time = time.strftime("%H:%M:%S", time.localtime(message.message_info.time)) + willing_log = f"[回复意愿:{await willing_manager.get_willing(chat.stream_id):.2f}]" if is_willing else "" + logger.info( + f"[{current_time}][{mes_name}]" + f"{chat.user_info.user_nickname}:" + f"{message.processed_plain_text}{willing_log}[概率:{reply_probability * 100:.1f}%]" + ) + + do_reply = False + if random() < reply_probability: + try: + do_reply = True + + # 回复前处理 + await willing_manager.before_generate_reply_handle(message.message_info.message_id) + + # 创建思考消息 + try: + with Timer("创建思考消息", timing_results): + thinking_id = await self._create_thinking_message(message, chat, userinfo, messageinfo) + except Exception as e: + logger.error(f"心流创建思考消息失败: {e}") + + logger.trace(f"创建捕捉器,thinking_id:{thinking_id}") + + info_catcher = info_catcher_manager.get_info_catcher(thinking_id) + info_catcher.catch_decide_to_response(message) + + # 观察 + try: + with Timer("观察", timing_results): + await heartflow.get_subheartflow(chat.stream_id).do_observe() + except Exception as e: + logger.error(f"心流观察失败: {e}") + logger.error(traceback.format_exc()) + + info_catcher.catch_after_observe(timing_results["观察"]) + + # 思考前使用工具 + update_relationship = "" + get_mid_memory_id = [] + tool_result_info = {} + send_emoji = "" + try: + with Timer("思考前使用工具", timing_results): + tool_result = await self.tool_user.use_tool( + message.processed_plain_text, + message.message_info.user_info.user_nickname, + chat, + heartflow.get_subheartflow(chat.stream_id), + ) + # 如果工具被使用且获得了结果,将收集到的信息合并到思考中 + # collected_info = "" + if tool_result.get("used_tools", False): + if "structured_info" in tool_result: + tool_result_info = tool_result["structured_info"] + # collected_info = "" + get_mid_memory_id = [] + update_relationship = "" + + # 动态解析工具结果 + for tool_name, tool_data in tool_result_info.items(): + # tool_result_info += f"\n{tool_name} 相关信息:\n" + # for item in tool_data: + # tool_result_info += f"- {item['name']}: {item['content']}\n" + + # 特殊判定:mid_chat_mem + if tool_name == "mid_chat_mem": + for mid_memory in tool_data: + get_mid_memory_id.append(mid_memory["content"]) + + # 特殊判定:change_mood + if tool_name == "change_mood": + for mood in tool_data: + self.mood_manager.update_mood_from_emotion( + mood["content"], global_config.mood_intensity_factor + ) + + # 特殊判定:change_relationship + if tool_name == "change_relationship": + update_relationship = tool_data[0]["content"] + + if tool_name == "send_emoji": + send_emoji = tool_data[0]["content"] + + except Exception as e: + logger.error(f"思考前工具调用失败: {e}") + logger.error(traceback.format_exc()) + + # 处理关系更新 + if update_relationship: + stance, emotion = await self.gpt._get_emotion_tags_with_reason( + "你还没有回复", message.processed_plain_text, update_relationship + ) + await relationship_manager.calculate_update_relationship_value( + chat_stream=message.chat_stream, label=emotion, stance=stance + ) + + # 思考前脑内状态 + try: + with Timer("思考前脑内状态", timing_results): + current_mind, past_mind = await heartflow.get_subheartflow( + chat.stream_id + ).do_thinking_before_reply( + message_txt=message.processed_plain_text, + sender_info=message.message_info.user_info, + chat_stream=chat, + obs_id=get_mid_memory_id, + extra_info=tool_result_info, + ) + except Exception as e: + logger.error(f"心流思考前脑内状态失败: {e}") + logger.error(traceback.format_exc()) + # 确保变量被定义,即使在错误情况下 + current_mind = "" + past_mind = "" + + info_catcher.catch_afer_shf_step(timing_results["思考前脑内状态"], past_mind, current_mind) + + # 生成回复 + with Timer("生成回复", timing_results): + response_set = await self.gpt.generate_response(message, thinking_id) + + info_catcher.catch_after_generate_response(timing_results["生成回复"]) + + if not response_set: + logger.info("回复生成失败,返回为空") + return + + # 发送消息 + try: + with Timer("发送消息", timing_results): + first_bot_msg = await self._send_response_messages(message, chat, response_set, thinking_id) + except Exception as e: + logger.error(f"心流发送消息失败: {e}") + + info_catcher.catch_after_response(timing_results["发送消息"], response_set, first_bot_msg) + + info_catcher.done_catch() + + # 处理表情包 + try: + with Timer("处理表情包", timing_results): + if send_emoji: + logger.info(f"麦麦决定发送表情包{send_emoji}") + await self._handle_emoji(message, chat, response_set, send_emoji) + + except Exception as e: + logger.error(f"心流处理表情包失败: {e}") + + + # 回复后处理 + await willing_manager.after_generate_reply_handle(message.message_info.message_id) + + + except Exception as e: + logger.error(f"心流处理消息失败: {e}") + logger.error(traceback.format_exc()) + + # 输出性能计时结果 + if do_reply: + timing_str = " | ".join([f"{step}: {duration:.2f}秒" for step, duration in timing_results.items()]) + trigger_msg = message.processed_plain_text + response_msg = " ".join(response_set) if response_set else "无回复" + logger.info(f"触发消息: {trigger_msg[:20]}... | 思维消息: {response_msg[:20]}... | 性能计时: {timing_str}") + else: + # 不回复处理 + await willing_manager.not_reply_handle(message.message_info.message_id) + + # 意愿管理器:注销当前message信息 + willing_manager.delete(message.message_info.message_id) + + def _check_ban_words(self, text: str, chat, userinfo) -> bool: + """检查消息中是否包含过滤词""" + for word in global_config.ban_words: + if word in text: + logger.info( + f"[{chat.group_info.group_name if chat.group_info else '私聊'}]{userinfo.user_nickname}:{text}" + ) + logger.info(f"[过滤词识别]消息中含有{word},filtered") + return True + return False + + def _check_ban_regex(self, text: str, chat, userinfo) -> bool: + """检查消息是否匹配过滤正则表达式""" + for pattern in global_config.ban_msgs_regex: + if pattern.search(text): + logger.info( + f"[{chat.group_info.group_name if chat.group_info else '私聊'}]{userinfo.user_nickname}:{text}" + ) + logger.info(f"[正则表达式过滤]消息匹配到{pattern},filtered") + return True + return False diff --git a/src/plugins/chat_module/heartFC_chat/messagesender.py b/src/plugins/chat_module/heartFC_chat/messagesender.py new file mode 100644 index 000000000..62ecbfa02 --- /dev/null +++ b/src/plugins/chat_module/heartFC_chat/messagesender.py @@ -0,0 +1,259 @@ +import asyncio +import time +from typing import Dict, List, Optional, Union + +from src.common.logger import get_module_logger +from ....common.database import db +from ...message.api import global_api +from ...message import MessageSending, MessageThinking, MessageSet + +from ...storage.storage import MessageStorage +from ....config.config import global_config +from ...chat.utils import truncate_message, calculate_typing_time, count_messages_between + +from src.common.logger import LogConfig, SENDER_STYLE_CONFIG + +# 定义日志配置 +sender_config = LogConfig( + # 使用消息发送专用样式 + console_format=SENDER_STYLE_CONFIG["console_format"], + file_format=SENDER_STYLE_CONFIG["file_format"], +) + +logger = get_module_logger("msg_sender", config=sender_config) + + +class MessageSender: + """发送器""" + _instance = None + + def __new__(cls, *args, **kwargs): + if cls._instance is None: + cls._instance = super(MessageSender, cls).__new__(cls, *args, **kwargs) + return cls._instance + + def __init__(self): + # 确保 __init__ 只被调用一次 + if not hasattr(self, '_initialized'): + self.message_interval = (0.5, 1) # 消息间隔时间范围(秒) + self.last_send_time = 0 + self._current_bot = None + self._initialized = True + + def set_bot(self, bot): + """设置当前bot实例""" + pass + + + async def send_via_ws(self, message: MessageSending) -> None: + try: + await global_api.send_message(message) + except Exception as e: + raise ValueError(f"未找到平台:{message.message_info.platform} 的url配置,请检查配置文件") from e + + async def send_message( + self, + message: MessageSending, + ) -> None: + """发送消息""" + + if isinstance(message, MessageSending): + + typing_time = calculate_typing_time( + input_string=message.processed_plain_text, + thinking_start_time=message.thinking_start_time, + is_emoji=message.is_emoji, + ) + logger.trace(f"{message.processed_plain_text},{typing_time},计算输入时间结束") + await asyncio.sleep(typing_time) + logger.trace(f"{message.processed_plain_text},{typing_time},等待输入时间结束") + + message_json = message.to_dict() + + message_preview = truncate_message(message.processed_plain_text) + try: + end_point = global_config.api_urls.get(message.message_info.platform, None) + if end_point: + # logger.info(f"发送消息到{end_point}") + # logger.info(message_json) + try: + await global_api.send_message_rest(end_point, message_json) + except Exception as e: + logger.error(f"REST方式发送失败,出现错误: {str(e)}") + logger.info("尝试使用ws发送") + await self.send_via_ws(message) + else: + await self.send_via_ws(message) + logger.success(f"发送消息 {message_preview} 成功") + except Exception as e: + logger.error(f"发送消息 {message_preview} 失败: {str(e)}") + + +class MessageContainer: + """单个聊天流的发送/思考消息容器""" + + def __init__(self, chat_id: str, max_size: int = 100): + self.chat_id = chat_id + self.max_size = max_size + self.messages = [] + self.last_send_time = 0 + self.thinking_wait_timeout = 20 # 思考等待超时时间(秒) + + def get_timeout_messages(self) -> List[MessageSending]: + """获取所有超时的Message_Sending对象(思考时间超过20秒),按thinking_start_time排序""" + current_time = time.time() + timeout_messages = [] + + for msg in self.messages: + if isinstance(msg, MessageSending): + if current_time - msg.thinking_start_time > self.thinking_wait_timeout: + timeout_messages.append(msg) + + # 按thinking_start_time排序,时间早的在前面 + timeout_messages.sort(key=lambda x: x.thinking_start_time) + + return timeout_messages + + def get_earliest_message(self) -> Optional[Union[MessageThinking, MessageSending]]: + """获取thinking_start_time最早的消息对象""" + if not self.messages: + return None + earliest_time = float("inf") + earliest_message = None + for msg in self.messages: + msg_time = msg.thinking_start_time + if msg_time < earliest_time: + earliest_time = msg_time + earliest_message = msg + return earliest_message + + def add_message(self, message: Union[MessageThinking, MessageSending]) -> None: + """添加消息到队列""" + if isinstance(message, MessageSet): + for single_message in message.messages: + self.messages.append(single_message) + else: + self.messages.append(message) + + def remove_message(self, message: Union[MessageThinking, MessageSending]) -> bool: + """移除消息,如果消息存在则返回True,否则返回False""" + try: + if message in self.messages: + self.messages.remove(message) + return True + return False + except Exception: + logger.exception("移除消息时发生错误") + return False + + def has_messages(self) -> bool: + """检查是否有待发送的消息""" + return bool(self.messages) + + def get_all_messages(self) -> List[Union[MessageSending, MessageThinking]]: + """获取所有消息""" + return list(self.messages) + + +class MessageManager: + """管理所有聊天流的消息容器""" + _instance = None + + def __new__(cls, *args, **kwargs): + if cls._instance is None: + cls._instance = super(MessageManager, cls).__new__(cls, *args, **kwargs) + return cls._instance + + def __init__(self): + # 确保 __init__ 只被调用一次 + if not hasattr(self, '_initialized'): + self.containers: Dict[str, MessageContainer] = {} # chat_id -> MessageContainer + self.storage = MessageStorage() + self._running = True + self._initialized = True + # 在实例首次创建时启动消息处理器 + asyncio.create_task(self.start_processor()) + + def get_container(self, chat_id: str) -> MessageContainer: + """获取或创建聊天流的消息容器""" + if chat_id not in self.containers: + self.containers[chat_id] = MessageContainer(chat_id) + return self.containers[chat_id] + + def add_message(self, message: Union[MessageThinking, MessageSending, MessageSet]) -> None: + chat_stream = message.chat_stream + if not chat_stream: + raise ValueError("无法找到对应的聊天流") + container = self.get_container(chat_stream.stream_id) + container.add_message(message) + + async def process_chat_messages(self, chat_id: str): + """处理聊天流消息""" + container = self.get_container(chat_id) + if container.has_messages(): + # print(f"处理有message的容器chat_id: {chat_id}") + message_earliest = container.get_earliest_message() + + if isinstance(message_earliest, MessageThinking): + """取得了思考消息""" + message_earliest.update_thinking_time() + thinking_time = message_earliest.thinking_time + # print(thinking_time) + print( + f"消息正在思考中,已思考{int(thinking_time)}秒\r", + end="", + flush=True, + ) + + # 检查是否超时 + if thinking_time > global_config.thinking_timeout: + logger.warning(f"消息思考超时({thinking_time}秒),移除该消息") + container.remove_message(message_earliest) + + else: + """取得了发送消息""" + thinking_time = message_earliest.update_thinking_time() + thinking_start_time = message_earliest.thinking_start_time + now_time = time.time() + thinking_messages_count, thinking_messages_length = count_messages_between( + start_time=thinking_start_time, end_time=now_time, stream_id=message_earliest.chat_stream.stream_id + ) + # print(thinking_time) + # print(thinking_messages_count) + # print(thinking_messages_length) + + if ( + message_earliest.is_head + and (thinking_messages_count > 4 or thinking_messages_length > 250) + and not message_earliest.is_private_message() # 避免在私聊时插入reply + ): + logger.debug(f"设置回复消息{message_earliest.processed_plain_text}") + message_earliest.set_reply() + + await message_earliest.process() + + # print(f"message_earliest.thinking_start_tim22222e:{message_earliest.thinking_start_time}") + + # 获取 MessageSender 的单例实例并发送消息 + await MessageSender().send_message(message_earliest) + + await self.storage.store_message(message_earliest, message_earliest.chat_stream) + + container.remove_message(message_earliest) + + async def start_processor(self): + """启动消息处理器""" + while self._running: + await asyncio.sleep(1) + tasks = [] + for chat_id in list(self.containers.keys()): # 使用 list 复制 key,防止在迭代时修改字典 + tasks.append(self.process_chat_messages(chat_id)) + + if tasks: # 仅在有任务时执行 gather + await asyncio.gather(*tasks) + + +# # 创建全局消息管理器实例 # 已改为单例模式 +# message_manager = MessageManager() +# # 创建全局发送器实例 # 已改为单例模式 +# message_sender = MessageSender() diff --git a/src/plugins/chat_module/think_flow_chat/think_flow_chat.py b/src/plugins/chat_module/think_flow_chat/think_flow_chat.py index 4999cb1be..f6fdff5bc 100644 --- a/src/plugins/chat_module/think_flow_chat/think_flow_chat.py +++ b/src/plugins/chat_module/think_flow_chat/think_flow_chat.py @@ -391,21 +391,21 @@ class ThinkFlowChat: logger.error(f"心流处理表情包失败: {e}") # 思考后脑内状态更新 - try: - with Timer("思考后脑内状态更新", timing_results): - stream_id = message.chat_stream.stream_id - chat_talking_prompt = "" - if stream_id: - chat_talking_prompt = get_recent_group_detailed_plain_text( - stream_id, limit=global_config.MAX_CONTEXT_SIZE, combine=True - ) + # try: + # with Timer("思考后脑内状态更新", timing_results): + # stream_id = message.chat_stream.stream_id + # chat_talking_prompt = "" + # if stream_id: + # chat_talking_prompt = get_recent_group_detailed_plain_text( + # stream_id, limit=global_config.MAX_CONTEXT_SIZE, combine=True + # ) - await heartflow.get_subheartflow(stream_id).do_thinking_after_reply( - response_set, chat_talking_prompt, tool_result_info - ) - except Exception as e: - logger.error(f"心流思考后脑内状态更新失败: {e}") - logger.error(traceback.format_exc()) + # await heartflow.get_subheartflow(stream_id).do_thinking_after_reply( + # response_set, chat_talking_prompt, tool_result_info + # ) + # except Exception as e: + # logger.error(f"心流思考后脑内状态更新失败: {e}") + # logger.error(traceback.format_exc()) # 回复后处理 await willing_manager.after_generate_reply_handle(message.message_info.message_id)