From c10b7eea61b9f352475363f4c92883cd6f4e055b Mon Sep 17 00:00:00 2001 From: SengokuCola <1026294844@qq.com> Date: Mon, 21 Apr 2025 18:37:49 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=95=B4=E5=90=88reasoning=E6=A8=A1?= =?UTF-8?q?=E5=BC=8F=E5=92=8Chfc=E6=A8=A1=E5=BC=8F=EF=BC=8C=E7=BB=9F?= =?UTF-8?q?=E4=B8=80=E8=B0=83=E6=8E=A7=EF=BC=88=E4=BD=86=E4=B8=8D=E6=98=AF?= =?UTF-8?q?=E5=BE=88=E7=BB=9F=E4=B8=80=EF=BC=89?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/config/config.py | 28 +- src/heart_flow/heartflow.py | 10 +- src/heart_flow/sub_heartflow.py | 11 +- src/main.py | 8 +- src/plugins/chat/bot.py | 2 - .../heartFC_chat/heartFC_controler.py | 82 ++-- .../heartFC_chat/heartFC_processor.py | 16 +- .../chat_module/heartFC_chat/interest.py | 12 + .../heartFC_chat/reasoning_chat.py | 412 ++++++++++++++++ .../heartFC_chat/reasoning_generator.py | 199 ++++++++ .../heartFC_chat/reasoning_prompt_builder.py | 445 ++++++++++++++++++ .../reasoning_chat/reasoning_chat.py | 43 +- .../reasoning_chat/reasoning_generator.py | 2 +- src/plugins/memory_system/Hippocampus.py | 6 +- 14 files changed, 1188 insertions(+), 88 deletions(-) create mode 100644 src/plugins/chat_module/heartFC_chat/reasoning_chat.py create mode 100644 src/plugins/chat_module/heartFC_chat/reasoning_generator.py create mode 100644 src/plugins/chat_module/heartFC_chat/reasoning_prompt_builder.py diff --git a/src/config/config.py b/src/config/config.py index d2fe6f0f2..83e478375 100644 --- a/src/config/config.py +++ b/src/config/config.py @@ -213,8 +213,8 @@ class BotConfig: # response response_mode: str = "heart_flow" # 回复策略 - model_reasoning_probability: float = 0.7 # 麦麦回答时选择推理模型(主要)模型概率 - model_normal_probability: float = 0.3 # 麦麦回答时选择一般模型(次要)模型概率 + model_reasoning_probability: float = 0.7 # 麦麦回答时选择推理模型(主要)模型概率 + model_normal_probability: float = 0.3 # 麦麦回答时选择一般模型(次要)模型概率 # MODEL_R1_DISTILL_PROBABILITY: float = 0.1 # R1蒸馏模型概率 # emoji @@ -407,10 +407,13 @@ 
class BotConfig: def response(parent: dict): response_config = parent["response"] - config.model_reasoning_probability = response_config.get("model_reasoning_probability", config.model_reasoning_probability) - config.model_normal_probability = response_config.get("model_normal_probability", config.model_normal_probability) - - + config.model_reasoning_probability = response_config.get( + "model_reasoning_probability", config.model_reasoning_probability + ) + config.model_normal_probability = response_config.get( + "model_normal_probability", config.model_normal_probability + ) + # 添加 enable_heart_flowC 的加载逻辑 (假设它在 [response] 部分) if config.INNER_VERSION in SpecifierSet(">=1.4.0"): config.enable_heart_flowC = response_config.get("enable_heart_flowC", config.enable_heart_flowC) @@ -418,7 +421,6 @@ class BotConfig: def heartflow(parent: dict): heartflow_config = parent["heartflow"] # 加载新增的 heartflowC 参数 - # 加载原有的 heartflow 参数 # config.sub_heart_flow_update_interval = heartflow_config.get( @@ -442,9 +444,15 @@ class BotConfig: "compress_length_limit", config.compress_length_limit ) if config.INNER_VERSION in SpecifierSet(">=1.4.0"): - config.reply_trigger_threshold = heartflow_config.get("reply_trigger_threshold", config.reply_trigger_threshold) - config.probability_decay_factor_per_second = heartflow_config.get("probability_decay_factor_per_second", config.probability_decay_factor_per_second) - config.default_decay_rate_per_second = heartflow_config.get("default_decay_rate_per_second", config.default_decay_rate_per_second) + config.reply_trigger_threshold = heartflow_config.get( + "reply_trigger_threshold", config.reply_trigger_threshold + ) + config.probability_decay_factor_per_second = heartflow_config.get( + "probability_decay_factor_per_second", config.probability_decay_factor_per_second + ) + config.default_decay_rate_per_second = heartflow_config.get( + "default_decay_rate_per_second", config.default_decay_rate_per_second + ) config.initial_duration = 
heartflow_config.get("initial_duration", config.initial_duration) def willing(parent: dict): diff --git a/src/heart_flow/heartflow.py b/src/heart_flow/heartflow.py index c2f922ff9..50f0a735f 100644 --- a/src/heart_flow/heartflow.py +++ b/src/heart_flow/heartflow.py @@ -45,6 +45,8 @@ class CurrentState: def __init__(self): self.current_state_info = "" + self.chat_status = "IDLE" + self.mood_manager = MoodManager() self.mood = self.mood_manager.get_prompt() @@ -70,7 +72,7 @@ class Heartflow: """定期清理不活跃的子心流""" while True: current_time = time.time() - inactive_subheartflows_ids = [] # 修改变量名以清晰表示存储的是ID + inactive_subheartflows_ids = [] # 修改变量名以清晰表示存储的是ID # 检查所有子心流 # 使用 list(self._subheartflows.items()) 避免在迭代时修改字典 @@ -104,7 +106,7 @@ class Heartflow: # await self.do_a_thinking() # await asyncio.sleep(global_config.heart_flow_update_interval * 3) # 5分钟思考一次 - + await asyncio.sleep(300) async def heartflow_start_working(self): @@ -253,7 +255,7 @@ class Heartflow: # 创建并初始化观察对象 logger.debug(f"为 {subheartflow_id} 创建 observation") observation = ChattingObservation(subheartflow_id) - await observation.initialize() # 等待初始化完成 + await observation.initialize() # 等待初始化完成 subheartflow.add_observation(observation) logger.debug(f"为 {subheartflow_id} 添加 observation 成功") @@ -269,7 +271,7 @@ class Heartflow: except Exception as e: # 记录详细错误信息 logger.error(f"创建 subheartflow {subheartflow_id} 失败: {e}") - logger.error(traceback.format_exc()) # 记录完整的 traceback + logger.error(traceback.format_exc()) # 记录完整的 traceback # 考虑是否需要更具体的错误处理或资源清理逻辑 return None diff --git a/src/heart_flow/sub_heartflow.py b/src/heart_flow/sub_heartflow.py index c1a58dcda..9087b5763 100644 --- a/src/heart_flow/sub_heartflow.py +++ b/src/heart_flow/sub_heartflow.py @@ -5,7 +5,6 @@ from src.plugins.models.utils_model import LLMRequest from src.config.config import global_config import time from typing import Optional, List -from datetime import datetime import traceback from src.plugins.chat.utils import 
parse_text_timestamps @@ -76,14 +75,14 @@ class SubHeartflow: ) self.main_heartflow_info = "" - + self.last_active_time = time.time() # 添加最后激活时间 - self.should_stop = False # 添加停止标志 - self.task: Optional[asyncio.Task] = None # 添加 task 属性 + self.should_stop = False # 添加停止标志 + self.task: Optional[asyncio.Task] = None # 添加 task 属性 self.is_active = False - self.observations: List[ChattingObservation] = [] # 使用 List 类型提示 + self.observations: List[ChattingObservation] = [] # 使用 List 类型提示 self.running_knowledges = [] @@ -98,7 +97,7 @@ class SubHeartflow: # 检查是否被主心流标记为停止 if self.should_stop: logger.info(f"子心流 {self.subheartflow_id} 被标记为停止,正在退出后台任务...") - break # 退出循环以停止任务 + break # 退出循环以停止任务 await asyncio.sleep(global_config.sub_heart_flow_update_interval) # 定期检查销毁条件 diff --git a/src/main.py b/src/main.py index aad08b906..f113a732d 100644 --- a/src/main.py +++ b/src/main.py @@ -19,6 +19,7 @@ from .individuality.individuality import Individuality from .common.server import global_server from .plugins.chat_module.heartFC_chat.interest import InterestManager from .plugins.chat_module.heartFC_chat.heartFC_controler import HeartFC_Controller +from .plugins.chat_module.heartFC_chat.reasoning_chat import ReasoningChat logger = get_module_logger("main") @@ -117,8 +118,11 @@ class MainSystem: await interest_manager.start_background_tasks() logger.success("兴趣管理器后台任务启动成功") - # 初始化并独立启动 HeartFC_Chat - HeartFC_Controller() + # 初始化 ReasoningChat 单例 (确保它在需要之前被创建) + ReasoningChat.get_instance() + logger.success("ReasoningChat 单例初始化成功") + + # 初始化并独立启动 HeartFC_Chat 控制器 (使用 get_instance 获取单例) heartfc_chat_instance = HeartFC_Controller.get_instance() if heartfc_chat_instance: await heartfc_chat_instance.start() diff --git a/src/plugins/chat/bot.py b/src/plugins/chat/bot.py index c3ba78b08..eaf829970 100644 --- a/src/plugins/chat/bot.py +++ b/src/plugins/chat/bot.py @@ -123,8 +123,6 @@ class ChatBot: await self.heartFC_processor.process_message(message_data) else: await 
self.heartFC_processor.process_message(message_data) - - if template_group_name: async with global_prompt_manager.async_message_scope(template_group_name): diff --git a/src/plugins/chat_module/heartFC_chat/heartFC_controler.py b/src/plugins/chat_module/heartFC_chat/heartFC_controler.py index 389e030a4..55790eb4c 100644 --- a/src/plugins/chat_module/heartFC_chat/heartFC_controler.py +++ b/src/plugins/chat_module/heartFC_chat/heartFC_controler.py @@ -13,6 +13,7 @@ from src.do_tool.tool_use import ToolUser from .interest import InterestManager from src.plugins.chat.chat_stream import chat_manager from .pf_chatting import PFChatting +import threading # 导入 threading # 定义日志配置 chat_config = LogConfig( @@ -27,43 +28,58 @@ INTEREST_MONITOR_INTERVAL_SECONDS = 1 class HeartFC_Controller: - _instance = None # For potential singleton access if needed by MessageManager + _instance = None + _lock = threading.Lock() # 使用 threading.Lock 替代 asyncio.Lock 以兼容 __new__ + _initialized = False - def __init__(self): - # --- Updated Init --- - if HeartFC_Controller._instance is not None: - # Prevent re-initialization if used as a singleton - return - self.gpt = ResponseGenerator() - self.mood_manager = MoodManager.get_instance() - self.mood_manager.start_mood_update() - self.tool_user = ToolUser() - self.interest_manager = InterestManager() - self._interest_monitor_task: Optional[asyncio.Task] = None - # --- New PFChatting Management --- - self.pf_chatting_instances: Dict[str, PFChatting] = {} - self._pf_chatting_lock = Lock() - # --- End New PFChatting Management --- - HeartFC_Controller._instance = self # Register instance - # --- End Updated Init --- - # --- Make dependencies accessible for PFChatting --- - # These are accessed via the passed instance in PFChatting - self.emoji_manager = emoji_manager - self.relationship_manager = relationship_manager - self.MessageManager = MessageManager # Pass the class/singleton access - # --- End dependencies --- - - # --- Added Class Method for 
Singleton Access --- - @classmethod - def get_instance(cls): + def __new__(cls, *args, **kwargs): if cls._instance is None: - # This might indicate an issue if called before initialization - logger.warning("HeartFC_Controller get_instance called before initialization.") - # Optionally, initialize here if a strict singleton pattern is desired - # cls._instance = cls() + with cls._lock: + if cls._instance is None: + cls._instance = super().__new__(cls) return cls._instance - # --- End Added Class Method --- + def __init__(self): + if self._initialized: + return + with self.__class__._lock: # 使用类锁确保初始化线程安全 + if self._initialized: + return + logger.info("正在初始化 HeartFC_Controller 单例...") + self.gpt = ResponseGenerator() + self.mood_manager = MoodManager.get_instance() + self.mood_manager.start_mood_update() + self.tool_user = ToolUser() + self.interest_manager = InterestManager() + self._interest_monitor_task: Optional[asyncio.Task] = None + self.pf_chatting_instances: Dict[str, PFChatting] = {} + self._pf_chatting_lock = Lock() # 这个可以是 asyncio.Lock,用于异步上下文 + self.emoji_manager = emoji_manager + self.relationship_manager = relationship_manager + self.MessageManager = MessageManager + self._initialized = True + logger.info("HeartFC_Controller 单例初始化完成。") + + @classmethod + def get_instance(cls): + """获取 HeartFC_Controller 的单例实例。""" + if cls._instance is None: + logger.warning("HeartFC_Controller 实例在首次 get_instance 时创建,可能未在 main 中正确初始化。") + cls() # 调用构造函数创建 + return cls._instance + + # --- 新增:检查 PFChatting 状态的方法 --- # + def is_pf_chatting_active(self, stream_id: str) -> bool: + """检查指定 stream_id 的 PFChatting 循环是否处于活动状态。""" + # 注意:这里直接访问字典,不加锁,因为读取通常是安全的, + # 并且 PFChatting 实例的 _loop_active 状态由其自身的异步循环管理。 + # 如果需要更强的保证,可以在访问 pf_instance 前获取 _pf_chatting_lock + pf_instance = self.pf_chatting_instances.get(stream_id) + if pf_instance and pf_instance._loop_active: # 直接检查 PFChatting 实例的 _loop_active 属性 + return True + return False + + # --- 结束新增 --- # async def start(self): 
"""启动异步任务,如回复启动器""" diff --git a/src/plugins/chat_module/heartFC_chat/heartFC_processor.py b/src/plugins/chat_module/heartFC_chat/heartFC_processor.py index 37708a94f..38c687791 100644 --- a/src/plugins/chat_module/heartFC_chat/heartFC_processor.py +++ b/src/plugins/chat_module/heartFC_chat/heartFC_processor.py @@ -13,6 +13,7 @@ from ...chat.message_buffer import message_buffer from ...utils.timer_calculater import Timer from .interest import InterestManager from src.plugins.person_info.relationship_manager import relationship_manager +from .reasoning_chat import ReasoningChat # 定义日志配置 processor_config = LogConfig( @@ -29,7 +30,7 @@ class HeartFC_Processor: def __init__(self): self.storage = MessageStorage() self.interest_manager = InterestManager() - # self.chat_instance = chat_instance # 持有 HeartFC_Chat 实例 + self.reasoning_chat = ReasoningChat.get_instance() async def process_message(self, message_data: str) -> None: """处理接收到的原始消息数据,完成消息解析、缓冲、过滤、存储、兴趣度计算与更新等核心流程。 @@ -72,11 +73,11 @@ class HeartFC_Processor: user_info=userinfo, group_info=groupinfo, ) - if not chat: - logger.error( - f"无法为消息创建或获取聊天流: user {userinfo.user_id}, group {groupinfo.group_id if groupinfo else 'None'}" - ) - return + + # --- 添加兴趣追踪启动 --- + # 在获取到 chat 对象后,启动对该聊天流的兴趣监控 + await self.reasoning_chat.start_monitoring_interest(chat) + # --- 结束添加 --- message.update_chat_stream(chat) @@ -90,7 +91,6 @@ class HeartFC_Processor: message.raw_message, chat, userinfo ): return - logger.trace(f"过滤词/正则表达式过滤成功: {message.processed_plain_text}") # 查询缓冲器结果 buffer_result = await message_buffer.query_buffer_result(message) @@ -152,6 +152,8 @@ class HeartFC_Processor: f"使用激活率 {interested_rate:.2f} 更新后 (通过缓冲后),当前兴趣度: {current_interest:.2f}" ) + self.interest_manager.add_interest_dict(message, interested_rate, is_mentioned) + except Exception as e: logger.error(f"更新兴趣度失败: {e}") # 调整日志消息 logger.error(traceback.format_exc()) diff --git a/src/plugins/chat_module/heartFC_chat/interest.py 
b/src/plugins/chat_module/heartFC_chat/interest.py index 5a961e915..4ac5498a1 100644 --- a/src/plugins/chat_module/heartFC_chat/interest.py +++ b/src/plugins/chat_module/heartFC_chat/interest.py @@ -6,6 +6,7 @@ import json # 引入 json import os # 引入 os from typing import Optional # <--- 添加导入 import random # <--- 添加导入 random +from src.plugins.chat.message import MessageRecv from src.common.logger import get_module_logger, LogConfig, DEFAULT_CONFIG # 引入 DEFAULT_CONFIG from src.plugins.chat.chat_stream import chat_manager # *** Import ChatManager *** @@ -66,6 +67,13 @@ class InterestChatting: self.is_above_threshold: bool = False # 标记兴趣值是否高于阈值 # --- 结束:概率回复相关属性 --- + # 记录激发兴趣对(消息id,激活值) + self.interest_dict = {} + + def add_interest_dict(self, message: MessageRecv, interest_value: float, is_mentioned: bool): + # Store the MessageRecv object and the interest value as a tuple + self.interest_dict[message.message_info.message_id] = (message, interest_value, is_mentioned) + def _calculate_decay(self, current_time: float): """计算从上次更新到现在的衰减""" time_delta = current_time - self.last_update_time @@ -445,6 +453,10 @@ class InterestManager: stream_name = chat_manager.get_stream_name(stream_id) or stream_id # 获取流名称 logger.warning(f"尝试降低不存在的聊天流 {stream_name} 的兴趣度") + def add_interest_dict(self, message: MessageRecv, interest_value: float, is_mentioned: bool): + interest_chatting = self._get_or_create_interest_chatting(message.chat_stream.stream_id) + interest_chatting.add_interest_dict(message, interest_value, is_mentioned) + def cleanup_inactive_chats(self, max_age_seconds=INACTIVE_THRESHOLD_SECONDS): """ 清理长时间不活跃的聊天流记录 diff --git a/src/plugins/chat_module/heartFC_chat/reasoning_chat.py b/src/plugins/chat_module/heartFC_chat/reasoning_chat.py new file mode 100644 index 000000000..95d3641d5 --- /dev/null +++ b/src/plugins/chat_module/heartFC_chat/reasoning_chat.py @@ -0,0 +1,412 @@ +import time +import threading # 导入 threading +from random import random +import traceback +import 
asyncio +from typing import List, Dict +from ...moods.moods import MoodManager +from ....config.config import global_config +from ...chat.emoji_manager import emoji_manager +from .reasoning_generator import ResponseGenerator +from ...chat.message import MessageSending, MessageRecv, MessageThinking, MessageSet +from ...chat.messagesender import message_manager +from ...storage.storage import MessageStorage +from ...chat.utils import is_mentioned_bot_in_message +from ...chat.utils_image import image_path_to_base64 +from ...willing.willing_manager import willing_manager +from ...message import UserInfo, Seg +from src.common.logger import get_module_logger, CHAT_STYLE_CONFIG, LogConfig +from src.plugins.chat.chat_stream import ChatStream +from src.plugins.person_info.relationship_manager import relationship_manager +from src.plugins.respon_info_catcher.info_catcher import info_catcher_manager +from src.plugins.utils.timer_calculater import Timer +from .interest import InterestManager +from .heartFC_controler import HeartFC_Controller # 导入 HeartFC_Controller + +# 定义日志配置 +chat_config = LogConfig( + console_format=CHAT_STYLE_CONFIG["console_format"], + file_format=CHAT_STYLE_CONFIG["file_format"], +) + +logger = get_module_logger("reasoning_chat", config=chat_config) + + +class ReasoningChat: + _instance = None + _lock = threading.Lock() + _initialized = False + + def __new__(cls, *args, **kwargs): + if cls._instance is None: + with cls._lock: + # Double-check locking + if cls._instance is None: + cls._instance = super().__new__(cls) + return cls._instance + + def __init__(self): + # 防止重复初始化 + if self._initialized: + return + with self.__class__._lock: # 使用类锁确保线程安全 + if self._initialized: + return + logger.info("正在初始化 ReasoningChat 单例...") # 添加日志 + self.storage = MessageStorage() + self.gpt = ResponseGenerator() + self.mood_manager = MoodManager.get_instance() + self.mood_manager.start_mood_update() + # 用于存储每个 chat stream 的兴趣监控任务 + self._interest_monitoring_tasks: 
Dict[str, asyncio.Task] = {} + self._initialized = True + self.interest_manager = InterestManager() + logger.info("ReasoningChat 单例初始化完成。") # 添加日志 + + @classmethod + def get_instance(cls): + """获取 ReasoningChat 的单例实例。""" + if cls._instance is None: + # 如果实例还未创建(理论上应该在 main 中初始化,但作为备用) + logger.warning("ReasoningChat 实例在首次 get_instance 时创建。") + cls() # 调用构造函数来创建实例 + return cls._instance + + @staticmethod + async def _create_thinking_message(message, chat, userinfo, messageinfo): + """创建思考消息""" + bot_user_info = UserInfo( + user_id=global_config.BOT_QQ, + user_nickname=global_config.BOT_NICKNAME, + platform=messageinfo.platform, + ) + + thinking_time_point = round(time.time(), 2) + thinking_id = "mt" + str(thinking_time_point) + thinking_message = MessageThinking( + message_id=thinking_id, + chat_stream=chat, + bot_user_info=bot_user_info, + reply=message, + thinking_start_time=thinking_time_point, + ) + + message_manager.add_message(thinking_message) + + return thinking_id + + @staticmethod + async def _send_response_messages(message, chat, response_set: List[str], thinking_id) -> MessageSending: + """发送回复消息""" + container = message_manager.get_container(chat.stream_id) + thinking_message = None + + for msg in container.messages: + if isinstance(msg, MessageThinking) and msg.message_info.message_id == thinking_id: + thinking_message = msg + container.messages.remove(msg) + break + + if not thinking_message: + logger.warning("未找到对应的思考消息,可能已超时被移除") + return + + thinking_start_time = thinking_message.thinking_start_time + message_set = MessageSet(chat, thinking_id) + + mark_head = False + first_bot_msg = None + for msg in response_set: + message_segment = Seg(type="text", data=msg) + bot_message = MessageSending( + message_id=thinking_id, + chat_stream=chat, + bot_user_info=UserInfo( + user_id=global_config.BOT_QQ, + user_nickname=global_config.BOT_NICKNAME, + platform=message.message_info.platform, + ), + sender_info=message.message_info.user_info, + 
message_segment=message_segment, + reply=message, + is_head=not mark_head, + is_emoji=False, + thinking_start_time=thinking_start_time, + ) + if not mark_head: + mark_head = True + first_bot_msg = bot_message + message_set.add_message(bot_message) + message_manager.add_message(message_set) + + return first_bot_msg + + @staticmethod + async def _handle_emoji(message, chat, response): + """处理表情包""" + if random() < global_config.emoji_chance: + emoji_raw = await emoji_manager.get_emoji_for_text(response) + if emoji_raw: + emoji_path, description = emoji_raw + emoji_cq = image_path_to_base64(emoji_path) + + thinking_time_point = round(message.message_info.time, 2) + + message_segment = Seg(type="emoji", data=emoji_cq) + bot_message = MessageSending( + message_id="mt" + str(thinking_time_point), + chat_stream=chat, + bot_user_info=UserInfo( + user_id=global_config.BOT_QQ, + user_nickname=global_config.BOT_NICKNAME, + platform=message.message_info.platform, + ), + sender_info=message.message_info.user_info, + message_segment=message_segment, + reply=message, + is_head=False, + is_emoji=True, + ) + message_manager.add_message(bot_message) + + async def _update_relationship(self, message: MessageRecv, response_set): + """更新关系情绪""" + ori_response = ",".join(response_set) + stance, emotion = await self.gpt._get_emotion_tags(ori_response, message.processed_plain_text) + await relationship_manager.calculate_update_relationship_value( + chat_stream=message.chat_stream, label=emotion, stance=stance + ) + self.mood_manager.update_mood_from_emotion(emotion, global_config.mood_intensity_factor) + + async def _find_interested_message(self, chat: ChatStream) -> None: + # 此函数设计为后台任务,轮询指定 chat 的兴趣消息。 + # 它通常由外部代码在 chat 流活跃时启动。 + controller = HeartFC_Controller.get_instance() # 获取控制器实例 + if not controller: + logger.error(f"无法获取 HeartFC_Controller 实例,无法检查 PFChatting 状态。stream: {chat.stream_id}") + # 在没有控制器的情况下可能需要决定是继续处理还是完全停止?这里暂时假设继续 + pass # 或者 return? 
+ + while True: + await asyncio.sleep(1) # 每秒检查一次 + interest_chatting = self.interest_manager.get_interest_chatting(chat.stream_id) + + if not interest_chatting: + continue + + interest_dict = interest_chatting.interest_dict if interest_chatting.interest_dict else {} + items_to_process = list(interest_dict.items()) + + if not items_to_process: + continue + + for msg_id, (message, interest_value, is_mentioned) in items_to_process: + # --- 检查 PFChatting 是否活跃 --- # + pf_active = False + if controller: + pf_active = controller.is_pf_chatting_active(chat.stream_id) + + if pf_active: + # 如果 PFChatting 活跃,则跳过处理,直接移除消息 + removed_item = interest_dict.pop(msg_id, None) + if removed_item: + logger.debug(f"PFChatting 活跃,已跳过并移除兴趣消息 {msg_id} for stream: {chat.stream_id}") + continue # 处理下一条消息 + # --- 结束检查 --- # + + # 只有当 PFChatting 不活跃时才执行以下处理逻辑 + try: + # logger.debug(f"正在处理消息 {msg_id} for stream: {chat.stream_id}") # 可选调试信息 + await self.normal_reasoning_chat( + message=message, + chat=chat, + is_mentioned=is_mentioned, + interested_rate=interest_value, + ) + # logger.debug(f"处理完成消息 {msg_id}") # 可选调试信息 + except Exception as e: + logger.error(f"处理兴趣消息 {msg_id} 时出错: {e}\n{traceback.format_exc()}") + finally: + # 无论处理成功与否(且PFChatting不活跃),都尝试从原始字典中移除该消息 + removed_item = interest_dict.pop(msg_id, None) + if removed_item: + logger.debug(f"已从兴趣字典中移除消息 {msg_id}") + + async def normal_reasoning_chat( + self, message: MessageRecv, chat: ChatStream, is_mentioned: bool, interested_rate: float + ) -> None: + timing_results = {} + userinfo = message.message_info.user_info + messageinfo = message.message_info + + is_mentioned, reply_probability = is_mentioned_bot_in_message(message) + # 意愿管理器:设置当前message信息 + willing_manager.setup(message, chat, is_mentioned, interested_rate) + + # 获取回复概率 + is_willing = False + if reply_probability != 1: + is_willing = True + reply_probability = await willing_manager.get_reply_probability(message.message_info.message_id) + + if 
message.message_info.additional_config: + if "maimcore_reply_probability_gain" in message.message_info.additional_config.keys(): + reply_probability += message.message_info.additional_config["maimcore_reply_probability_gain"] + + # 打印消息信息 + mes_name = chat.group_info.group_name if chat.group_info else "私聊" + current_time = time.strftime("%H:%M:%S", time.localtime(message.message_info.time)) + willing_log = f"[回复意愿:{await willing_manager.get_willing(chat.stream_id):.2f}]" if is_willing else "" + logger.info( + f"[{current_time}][{mes_name}]" + f"{chat.user_info.user_nickname}:" + f"{message.processed_plain_text}{willing_log}[概率:{reply_probability * 100:.1f}%]" + ) + do_reply = False + if random() < reply_probability: + do_reply = True + + # 回复前处理 + await willing_manager.before_generate_reply_handle(message.message_info.message_id) + + # 创建思考消息 + with Timer("创建思考消息", timing_results): + thinking_id = await self._create_thinking_message(message, chat, userinfo, messageinfo) + + logger.debug(f"创建捕捉器,thinking_id:{thinking_id}") + + info_catcher = info_catcher_manager.get_info_catcher(thinking_id) + info_catcher.catch_decide_to_response(message) + + # 生成回复 + try: + with Timer("生成回复", timing_results): + response_set = await self.gpt.generate_response(message, thinking_id) + + info_catcher.catch_after_generate_response(timing_results["生成回复"]) + except Exception as e: + logger.error(f"回复生成出现错误:{str(e)} {traceback.format_exc()}") + response_set = None + + if not response_set: + logger.info("为什么生成回复失败?") + return + + # 发送消息 + with Timer("发送消息", timing_results): + first_bot_msg = await self._send_response_messages(message, chat, response_set, thinking_id) + + info_catcher.catch_after_response(timing_results["发送消息"], response_set, first_bot_msg) + + info_catcher.done_catch() + + # 处理表情包 + with Timer("处理表情包", timing_results): + await self._handle_emoji(message, chat, response_set) + + # 更新关系情绪 + with Timer("更新关系情绪", timing_results): + await self._update_relationship(message, 
response_set) + + # 回复后处理 + await willing_manager.after_generate_reply_handle(message.message_info.message_id) + + # 输出性能计时结果 + if do_reply: + timing_str = " | ".join([f"{step}: {duration:.2f}秒" for step, duration in timing_results.items()]) + trigger_msg = message.processed_plain_text + response_msg = " ".join(response_set) if response_set else "无回复" + logger.info(f"触发消息: {trigger_msg[:20]}... | 推理消息: {response_msg[:20]}... | 性能计时: {timing_str}") + else: + # 不回复处理 + await willing_manager.not_reply_handle(message.message_info.message_id) + + # 意愿管理器:注销当前message信息 + willing_manager.delete(message.message_info.message_id) + + @staticmethod + def _check_ban_words(text: str, chat, userinfo) -> bool: + """检查消息中是否包含过滤词""" + for word in global_config.ban_words: + if word in text: + logger.info( + f"[{chat.group_info.group_name if chat.group_info else '私聊'}]{userinfo.user_nickname}:{text}" + ) + logger.info(f"[过滤词识别]消息中含有{word},filtered") + return True + return False + + @staticmethod + def _check_ban_regex(text: str, chat, userinfo) -> bool: + """检查消息是否匹配过滤正则表达式""" + for pattern in global_config.ban_msgs_regex: + if pattern.search(text): + logger.info( + f"[{chat.group_info.group_name if chat.group_info else '私聊'}]{userinfo.user_nickname}:{text}" + ) + logger.info(f"[正则表达式过滤]消息匹配到{pattern},filtered") + return True + return False + + async def start_monitoring_interest(self, chat: ChatStream): + """为指定的 ChatStream 启动后台兴趣消息监控任务。""" + stream_id = chat.stream_id + # 检查任务是否已在运行 + if stream_id in self._interest_monitoring_tasks and not self._interest_monitoring_tasks[stream_id].done(): + task = self._interest_monitoring_tasks[stream_id] + if not task.cancelled(): # 确保任务未被取消 + logger.info(f"兴趣监控任务已在运行 stream: {stream_id}") + return + else: + logger.info(f"发现已取消的任务,重新创建 stream: {stream_id}") + # 如果任务被取消了,允许重新创建 + + logger.info(f"启动兴趣监控任务 stream: {stream_id}...") + # 创建新的后台任务来运行 _find_interested_message + task = asyncio.create_task(self._find_interested_message(chat)) + 
self._interest_monitoring_tasks[stream_id] = task + + # 添加回调,当任务完成(或被取消)时,自动从字典中移除 + task.add_done_callback(lambda t: self._handle_task_completion(stream_id, t)) + + def _handle_task_completion(self, stream_id: str, task: asyncio.Task): + """处理监控任务完成的回调。""" + try: + # 检查任务是否因异常而结束 + exception = task.exception() + if exception: + logger.error(f"兴趣监控任务 stream {stream_id} 异常结束: {exception}", exc_info=exception) + elif task.cancelled(): + logger.info(f"兴趣监控任务 stream {stream_id} 已被取消。") + else: + logger.info(f"兴趣监控任务 stream {stream_id} 正常结束。") # 理论上 while True 不会正常结束 + except asyncio.CancelledError: + logger.info(f"兴趣监控任务 stream {stream_id} 在完成处理期间被取消。") + finally: + # 无论如何都从字典中移除 + removed_task = self._interest_monitoring_tasks.pop(stream_id, None) + if removed_task: + logger.debug(f"已从监控任务字典移除 stream: {stream_id}") + + async def stop_monitoring_interest(self, stream_id: str): + """停止指定 stream_id 的兴趣消息监控任务。""" + if stream_id in self._interest_monitoring_tasks: + task = self._interest_monitoring_tasks[stream_id] + if not task.done(): + logger.info(f"正在停止兴趣监控任务 stream: {stream_id}...") + task.cancel() # 请求取消任务 + try: + # 等待任务实际被取消(可选,提供更明确的停止) + # 设置超时以防万一 + await asyncio.wait_for(task, timeout=5.0) + except asyncio.CancelledError: + logger.info(f"兴趣监控任务 stream {stream_id} 已确认取消。") + except asyncio.TimeoutError: + logger.warning(f"停止兴趣监控任务 stream {stream_id} 超时。任务可能仍在运行。") + except Exception as e: + # 捕获 task.exception() 可能在取消期间重新引发的错误 + logger.error(f"停止兴趣监控任务 stream {stream_id} 时发生错误: {e}") + # 任务最终会由 done_callback 移除,或在这里再次确认移除 + self._interest_monitoring_tasks.pop(stream_id, None) + else: + logger.warning(f"尝试停止不存在或已停止的监控任务 stream: {stream_id}") diff --git a/src/plugins/chat_module/heartFC_chat/reasoning_generator.py b/src/plugins/chat_module/heartFC_chat/reasoning_generator.py new file mode 100644 index 000000000..2f4ba06e6 --- /dev/null +++ b/src/plugins/chat_module/heartFC_chat/reasoning_generator.py @@ -0,0 +1,199 @@ +from typing import List, Optional, Tuple, 
class ResponseGenerator:
    """Produce a chat reply by sampling between a reasoning model and a normal model.

    Each call to :meth:`generate_response` picks one of the two reply models at
    random (weighted by ``global_config.model_reasoning_probability``), builds a
    prompt through ``prompt_builder`` and post-processes the raw model output
    with ``process_llm_response``. A third model (``model_sum``) is used only by
    :meth:`_get_emotion_tags` to classify stance and emotion.
    """

    # Labels the stance/emotion classifier is allowed to emit.
    _VALID_STANCES = ("支持", "反对", "中立")
    _VALID_EMOTIONS = ("开心", "愤怒", "悲伤", "惊讶", "害羞", "平静", "恐惧", "厌恶", "困惑")
    # Fallback used whenever the classifier output cannot be trusted.
    _DEFAULT_STANCE_EMOTION = ("中立", "平静")
    # Decoration models commonly wrap around labels (whitespace, quotes, trailing period).
    _LABEL_PADDING = " \t\r\n\"'“”「」。."

    def __init__(self):
        self.model_reasoning = LLMRequest(
            model=global_config.llm_reasoning,
            temperature=0.7,
            max_tokens=3000,
            request_type="response_reasoning",
        )
        self.model_normal = LLMRequest(
            model=global_config.llm_normal,
            temperature=global_config.llm_normal["temp"],
            max_tokens=256,
            request_type="response_reasoning",
        )

        self.model_sum = LLMRequest(
            model=global_config.llm_summary_by_topic, temperature=0.7, max_tokens=3000, request_type="relation"
        )
        # Both fields are overwritten on every generation; these are only initial placeholders.
        self.current_model_type = "r1"
        self.current_model_name = "unknown model"

    async def generate_response(self, message: MessageThinking, thinking_id: str) -> Optional[Union[str, List[str]]]:
        """Pick a reply model at random and return the processed reply.

        Args:
            message: The message being answered; its ``processed_plain_text`` is the prompt input.
            thinking_id: Identifier used to look up the info catcher that records the LLM call.

        Returns:
            The post-processed reply, or ``None`` when the model produced nothing
            (including when the model call raised and was logged).
        """
        if random.random() < global_config.model_reasoning_probability:
            self.current_model_type = "深深地"
            current_model = self.model_reasoning
        else:
            self.current_model_type = "浅浅的"
            current_model = self.model_normal

        # Only log a short preview so long messages do not flood the log.
        text = message.processed_plain_text
        preview = text[:30] + "..." if len(text) > 30 else text
        logger.info(f"{self.current_model_type}思考:{preview}")

        model_response = await self._generate_response_with_model(message, current_model, thinking_id)
        if not model_response:
            logger.info(f"{self.current_model_type}思考,失败")
            return None

        logger.info(f"{global_config.BOT_NICKNAME}的回复是:{model_response}")
        return await self._process_response(model_response)

    async def _generate_response_with_model(self, message: MessageThinking, model: LLMRequest, thinking_id: str):
        """Build the prompt for ``message`` and query ``model`` once.

        Args:
            message: The message being answered.
            model: The request object to call ``generate_response`` on.
            thinking_id: Identifier used to fetch the info catcher for this reply.

        Returns:
            The raw model content, or ``None`` if the model call (or the info
            catcher hook) raised; the exception is logged, not propagated.
            Errors raised while building the prompt are NOT caught here.
        """
        info_catcher = info_catcher_manager.get_info_catcher(thinking_id)

        # Display name format depends on which of card name / nickname are set.
        user_info = message.chat_stream.user_info
        if user_info.user_cardname and user_info.user_nickname:
            sender_name = f"[({user_info.user_id}){user_info.user_nickname}]{user_info.user_cardname}"
        elif user_info.user_nickname:
            sender_name = f"({user_info.user_id}){user_info.user_nickname}"
        else:
            sender_name = f"用户({user_info.user_id})"

        logger.debug("开始使用生成回复-2")
        with Timer() as t_build_prompt:
            prompt = await prompt_builder._build_prompt(
                message.chat_stream,
                message_txt=message.processed_plain_text,
                sender_name=sender_name,
                stream_id=message.chat_stream.stream_id,
            )
        logger.info(f"构建prompt时间: {t_build_prompt.human_readable}")

        try:
            content, reasoning_content, self.current_model_name = await model.generate_response(prompt)

            info_catcher.catch_after_llm_generated(
                prompt=prompt, response=content, reasoning_content=reasoning_content, model_name=self.current_model_name
            )
        except Exception:
            # Best-effort: a failed generation is reported to the caller as "no reply".
            logger.exception("生成回复时出错")
            return None

        return content

    @classmethod
    def _parse_stance_emotion(cls, result: str) -> Optional[Tuple[str, str]]:
        """Parse a ``"stance-emotion"`` string into a validated pair.

        Surrounding whitespace and quote characters around each label are
        ignored, so outputs such as ``反对 - 愤怒`` or ``"支持-开心"`` are accepted.

        Returns:
            ``(stance, emotion)`` when both labels are in the allowed sets,
            otherwise ``None``.
        """
        if not result or "-" not in result:
            return None
        stance, emotion = (part.strip(cls._LABEL_PADDING) for part in result.split("-", 1))
        if stance in cls._VALID_STANCES and emotion in cls._VALID_EMOTIONS:
            return stance, emotion
        return None

    async def _get_emotion_tags(self, content: str, processed_plain_text: str):
        """Classify the reply's stance and emotion towards the replied-to text.

        Args:
            content: The reply text.
            processed_plain_text: The text that was replied to.

        Returns:
            A ``(stance, emotion)`` tuple. Falls back to ``("中立", "平静")`` when the
            model output is malformed, uses unknown labels, or the call fails.
        """
        default = self._DEFAULT_STANCE_EMOTION
        try:
            prompt = f"""
            请严格根据以下对话内容,完成以下任务:
            1. 判断回复者对被回复者观点的直接立场:
            - "支持":明确同意或强化被回复者观点
            - "反对":明确反驳或否定被回复者观点
            - "中立":不表达明确立场或无关回应
            2. 从"开心,愤怒,悲伤,惊讶,平静,害羞,恐惧,厌恶,困惑"中选出最匹配的1个情感标签
            3. 按照"立场-情绪"的格式直接输出结果,例如:"反对-愤怒"
            4. 考虑回复者的人格设定为{global_config.personality_core}

            对话示例:
            被回复:「A就是笨」
            回复:「A明明很聪明」 → 反对-愤怒

            当前对话:
            被回复:「{processed_plain_text}」
            回复:「{content}」

            输出要求:
            - 只需输出"立场-情绪"结果,不要解释
            - 严格基于文字直接表达的对立关系判断
            """

            result, _, _ = await self.model_sum.generate_response(prompt)
            result = result.strip()

            if "-" not in result:
                logger.debug(f"立场-情感格式错误:{result}")
                return default

            parsed = self._parse_stance_emotion(result)
            if parsed is None:
                logger.debug(f"无效立场-情感组合:{result}")
                return default
            return parsed

        except Exception as e:
            # Emotion tagging is auxiliary; never let it break reply generation.
            logger.debug(f"获取情感标签时出错: {e}")
            return default

    @staticmethod
    async def _process_response(content: str) -> List[str]:
        """Post-process raw model output with ``process_llm_response``.

        Returns:
            Whatever ``process_llm_response`` returns for ``content`` (presumably a
            list of reply segments; confirm against that helper), or an empty list
            when ``content`` is empty. The empty case previously returned the truthy
            tuple ``(None, [])``, which did not match the non-empty return shape.
        """
        if not content:
            return []
        return process_llm_response(content)
class PromptBuilder:
    """Assemble the reply prompt from personality, relationships, mood, memory and knowledge."""

    def __init__(self):
        self.prompt_built = ""
        self.activate_messages = ""

    @staticmethod
    def _fill_reaction_template(reaction: str, match) -> str:
        """Substitute ``[name]`` placeholders in ``reaction`` with the match's named groups.

        A named group that did not participate in the match is ``None`` in
        ``groupdict()``; it is substituted as an empty string instead of raising.
        """
        for name, content in match.groupdict().items():
            reaction = reaction.replace(f"[{name}]", content or "")
        return reaction

    async def _build_prompt(
        self, chat_stream, message_txt: str, sender_name: str = "某人", stream_id: Optional[int] = None
    ) -> str:
        """Build the full reply prompt for one incoming message.

        Args:
            chat_stream: Stream the message arrived on; its ``user_info`` identifies the speaker.
            message_txt: Plain text of the message being answered.
            sender_name: Display name of the speaker inserted into the prompt.
            stream_id: Stream identifier used to fetch recent chat context; when
                falsy no chat history is included.

        Returns:
            The formatted ``reasoning_prompt_main`` prompt.
        """
        # Personality: core trait plus one randomly chosen side and identity detail.
        # random.choice leaves the shared lists untouched and empty lists are skipped.
        individuality = Individuality.get_instance()
        prompt_personality = "你" + individuality.personality.personality_core

        personality_sides = individuality.personality.personality_sides
        if personality_sides:
            prompt_personality += f",{random.choice(personality_sides)}"

        identity_detail = individuality.identity.identity_detail
        if identity_detail:
            prompt_personality += f",{random.choice(identity_detail)}"

        # Relationships: the current speaker first, then other recent speakers.
        speaker = chat_stream.user_info
        who_chat_in_group = [(speaker.platform, speaker.user_id, speaker.user_nickname)]
        who_chat_in_group += get_recent_group_speaker(
            stream_id,
            (speaker.platform, speaker.user_id),
            limit=global_config.MAX_CONTEXT_SIZE,
        )

        relation_prompt = ""
        for person in who_chat_in_group:
            relation_prompt += await relationship_manager.build_relationship_info(person)

        # Mood
        mood_prompt = MoodManager.get_instance().get_prompt()

        # Memory recall
        memory_prompt = ""
        related_memory = await HippocampusManager.get_instance().get_memory_from_text(
            text=message_txt, max_memory_num=2, max_memory_length=2, max_depth=3, fast_retrieval=False
        )
        if related_memory:
            related_memory_info = "".join(memory[1] for memory in related_memory)
            memory_prompt = await global_prompt_manager.format_prompt(
                "memory_prompt", related_memory_info=related_memory_info
            )

        # Chat context; a stream without group_info is treated as a private chat.
        chat_in_group = True
        chat_talking_prompt = ""
        if stream_id:
            chat_talking_prompt = get_recent_group_detailed_plain_text(
                stream_id, limit=global_config.MAX_CONTEXT_SIZE, combine=True
            )
            if not chat_manager.get_stream(stream_id).group_info:
                chat_in_group = False

        # Keyword / regex triggered reactions. Keywords win over regexes within a rule,
        # and only the first matching regex of a rule contributes.
        keywords_reaction_prompt = ""
        lowered_txt = message_txt.lower()
        for rule in global_config.keywords_reaction_rules:
            if not rule.get("enable", False):
                continue
            if any(keyword in lowered_txt for keyword in rule.get("keywords", [])):
                logger.info(
                    f"检测到以下关键词之一:{rule.get('keywords', [])},触发反应:{rule.get('reaction', '')}"
                )
                keywords_reaction_prompt += rule.get("reaction", "") + ","
                continue
            for pattern in rule.get("regex", []):
                result = pattern.search(message_txt)
                if result:
                    reaction = self._fill_reaction_template(rule.get("reaction", ""), result)
                    logger.info(f"匹配到以下正则表达式:{pattern},触发反应:{reaction}")
                    keywords_reaction_prompt += reaction + ","
                    break

        # Occasional stylistic quirks, each rolled independently.
        prompt_ger = ""
        if random.random() < 0.04:
            prompt_ger += "你喜欢用倒装句"
        if random.random() < 0.02:
            prompt_ger += "你喜欢用反问句"
        if random.random() < 0.01:
            prompt_ger += "你喜欢用文言文"

        # Knowledge retrieval
        start_time = time.time()
        prompt_info = await self.get_prompt_info(message_txt, threshold=0.38)
        if prompt_info:
            prompt_info = await global_prompt_manager.format_prompt("knowledge_prompt", prompt_info=prompt_info)

        end_time = time.time()
        logger.debug(f"知识检索耗时: {(end_time - start_time):.3f}秒")

        logger.debug("开始构建prompt")

        # Group and private chats use differently named target templates.
        target_kind = "group" if chat_in_group else "private"
        prompt = await global_prompt_manager.format_prompt(
            "reasoning_prompt_main",
            relation_prompt_all=await global_prompt_manager.get_prompt_async("relationship_prompt"),
            relation_prompt=relation_prompt,
            sender_name=sender_name,
            memory_prompt=memory_prompt,
            prompt_info=prompt_info,
            schedule_prompt=await global_prompt_manager.format_prompt(
                "schedule_prompt", schedule_info=bot_schedule.get_current_num_task(num=1, time_info=False)
            ),
            chat_target=await global_prompt_manager.get_prompt_async(f"chat_target_{target_kind}1"),
            chat_target_2=await global_prompt_manager.get_prompt_async(f"chat_target_{target_kind}2"),
            chat_talking_prompt=chat_talking_prompt,
            message_txt=message_txt,
            bot_name=global_config.BOT_NICKNAME,
            bot_other_names="/".join(
                global_config.BOT_ALIAS_NAMES,
            ),
            prompt_personality=prompt_personality,
            mood_prompt=mood_prompt,
            keywords_reaction_prompt=keywords_reaction_prompt,
            prompt_ger=prompt_ger,
            moderation_prompt=await global_prompt_manager.get_prompt_async("moderation_prompt"),
        )

        return prompt

    async def get_prompt_info(self, message: str, threshold: float, topics: Optional[list[str]] = None):
        """Retrieve knowledge-base text related to ``message``.

        Args:
            message: The message text to search knowledge for.
            threshold: Minimum cosine similarity for a knowledge entry to be kept.
            topics: Optional pre-extracted topics. When omitted or empty (the
                default, and the only behaviour before this parameter existed)
                the whole message is used as a single query. When given, each
                topic and the message itself are queried and the results are
                merged, de-duplicated and grouped per topic.

        Returns:
            The related knowledge as text, or ``""`` when nothing was found or
            no embedding could be obtained.
        """
        start_time = time.time()
        logger.debug(f"获取知识库内容,元消息:{message[:30]}...,消息长度: {len(message)}")

        topics = [topic for topic in (topics or []) if topic and topic.strip()]

        # No topics: query with the whole message.
        if not topics:
            logger.info("未能提取到任何主题,使用整个消息进行查询")
            embedding = await get_embedding(message, request_type="prompt_build")
            if not embedding:
                logger.error("获取消息嵌入向量失败")
                return ""

            related_info = self.get_info_from_db(embedding, limit=3, threshold=threshold)
            logger.info(f"知识库检索完成,总耗时: {time.time() - start_time:.3f}秒")
            return related_info

        logger.info(f"开始处理{len(topics)}个主题的知识库查询")

        # Embed every topic plus the original message; individual failures are skipped.
        texts = list(topics)
        if message:
            texts.append(message)

        embeddings = {}
        embed_start_time = time.time()
        for text in texts:
            if not text or not text.strip():
                continue
            try:
                embedding = await get_embedding(text, request_type="prompt_build")
            except Exception as e:
                logger.error(f"获取'{text}'的嵌入向量时发生错误: {str(e)}")
                continue
            if embedding:
                embeddings[text] = embedding
            else:
                logger.warning(f"获取'{text}'的嵌入向量失败")

        logger.info(f"批量获取嵌入向量完成,耗时: {time.time() - embed_start_time:.3f}秒")

        if not embeddings:
            logger.error("所有嵌入向量获取失败")
            return ""

        # Query the knowledge base: original message first, then each topic.
        all_results = []
        query_start_time = time.time()

        if message in embeddings:
            original_results = self.get_info_from_db(embeddings[message], limit=3, threshold=threshold, return_raw=True)
            if original_results:
                for result in original_results:
                    result["topic"] = "原始消息"
                all_results.extend(original_results)
                logger.info(f"原始消息查询到{len(original_results)}条结果")

        for topic in topics:
            if topic not in embeddings:
                continue
            try:
                topic_results = self.get_info_from_db(embeddings[topic], limit=3, threshold=threshold, return_raw=True)
                if topic_results:
                    for result in topic_results:
                        result["topic"] = topic
                    all_results.extend(topic_results)
                    logger.info(f"主题'{topic}'查询到{len(topic_results)}条结果")
            except Exception as e:
                logger.error(f"查询主题'{topic}'时发生错误: {str(e)}")

        logger.info(f"知识库查询完成,耗时: {time.time() - query_start_time:.3f}秒,共获取{len(all_results)}条结果")

        # De-duplicate by content (first occurrence wins), rank by similarity, keep the top 10.
        process_start_time = time.time()
        seen_contents = set()
        filtered_results = []
        for result in all_results:
            content = result["content"]
            if content not in seen_contents:
                seen_contents.add(content)
                filtered_results.append(result)

        filtered_results.sort(key=lambda x: x["similarity"], reverse=True)
        filtered_results = filtered_results[:10]
        logger.info(
            f"结果处理完成,耗时: {time.time() - process_start_time:.3f}秒,过滤后剩余{len(filtered_results)}条结果"
        )

        # Group by topic and render one section per topic.
        related_info = ""
        if filtered_results:
            format_start_time = time.time()
            grouped_results = {}
            for result in filtered_results:
                grouped_results.setdefault(result["topic"], []).append(result)

            sections = []
            for topic, results in grouped_results.items():
                lines = "".join(f"{result['content'].strip()}\n" for result in results)
                sections.append(f"【主题: {topic}】\n{lines}\n")
            related_info = "".join(sections)

            logger.info(f"格式化输出完成,耗时: {time.time() - format_start_time:.3f}秒")

        logger.info(f"知识库检索总耗时: {time.time() - start_time:.3f}秒")
        return related_info

    @staticmethod
    def get_info_from_db(
        query_embedding: list, limit: int = 1, threshold: float = 0.5, return_raw: bool = False
    ) -> Union[str, list]:
        """Find knowledge entries whose embedding is similar to ``query_embedding``.

        The cosine similarity is computed inside a MongoDB aggregation over
        ``db.knowledges``; entries below ``threshold`` are dropped and the rest
        are sorted by similarity, best first.

        Args:
            query_embedding: Embedding vector of the query text.
            limit: Maximum number of entries to return.
            threshold: Minimum similarity (inclusive) for an entry to be kept.
            return_raw: When true, return the raw result documents (with
                ``content`` and ``similarity``); otherwise return the contents
                joined by newlines.

        Returns:
            A list of documents if ``return_raw`` is true, else a string. An
            empty list / empty string is returned when the embedding is empty
            or nothing matches.
        """
        empty = [] if return_raw else ""
        if not query_embedding:
            return empty

        sum_of_squares = {"$add": ["$$value", {"$multiply": ["$$this", "$$this"]}]}
        pipeline = [
            {
                "$addFields": {
                    # Dot product of the stored embedding with the query embedding.
                    "dotProduct": {
                        "$reduce": {
                            "input": {"$range": [0, {"$size": "$embedding"}]},
                            "initialValue": 0,
                            "in": {
                                "$add": [
                                    "$$value",
                                    {
                                        "$multiply": [
                                            {"$arrayElemAt": ["$embedding", "$$this"]},
                                            {"$arrayElemAt": [query_embedding, "$$this"]},
                                        ]
                                    },
                                ]
                            },
                        }
                    },
                    # Euclidean norms of the stored and query embeddings.
                    "magnitude1": {
                        "$sqrt": {
                            "$reduce": {
                                "input": "$embedding",
                                "initialValue": 0,
                                "in": sum_of_squares,
                            }
                        }
                    },
                    "magnitude2": {
                        "$sqrt": {
                            "$reduce": {
                                "input": query_embedding,
                                "initialValue": 0,
                                "in": sum_of_squares,
                            }
                        }
                    },
                }
            },
            {"$addFields": {"similarity": {"$divide": ["$dotProduct", {"$multiply": ["$magnitude1", "$magnitude2"]}]}}},
            {"$match": {"similarity": {"$gte": threshold}}},
            {"$sort": {"similarity": -1}},
            {"$limit": limit},
            {"$project": {"content": 1, "similarity": 1}},
        ]

        results = list(db.knowledges.aggregate(pipeline))
        logger.debug(f"知识库查询结果数量: {len(results)}")

        if not results:
            return empty
        if return_raw:
            return results
        # Plain-text mode: every matching content on its own line.
        return "\n".join(str(result["content"]) for result in results)
willing_manager.bombing_buffer_message_handle(message.message_info.message_id) - willing_manager.delete(message.message_info.message_id) + # await willing_manager.bombing_buffer_message_handle(message.message_info.message_id) + # willing_manager.delete(message.message_info.message_id) f_type = "seglist" if message.message_segment.type != "seglist": f_type = message.message_segment.type @@ -213,6 +199,27 @@ class ReasoningChat: logger.info("触发缓冲,已炸飞消息列") return + try: + await self.storage.store_message(message, chat) + logger.trace(f"存储成功 (通过缓冲后): {message.processed_plain_text}") + except Exception as e: + logger.error(f"存储消息失败: {e}") + logger.error(traceback.format_exc()) + # 存储失败可能仍需考虑是否继续,暂时返回 + return + + is_mentioned, reply_probability = is_mentioned_bot_in_message(message) + # 记忆激活 + with Timer("记忆激活", timing_results): + interested_rate = await HippocampusManager.get_instance().get_activate_from_text( + message.processed_plain_text, fast_retrieval=True + ) + + # 处理提及 + + # 意愿管理器:设置当前message信息 + willing_manager.setup(message, chat, is_mentioned, interested_rate) + # 获取回复概率 is_willing = False if reply_probability != 1: diff --git a/src/plugins/chat_module/reasoning_chat/reasoning_generator.py b/src/plugins/chat_module/reasoning_chat/reasoning_generator.py index dda4e7c78..2f4ba06e6 100644 --- a/src/plugins/chat_module/reasoning_chat/reasoning_generator.py +++ b/src/plugins/chat_module/reasoning_chat/reasoning_generator.py @@ -44,7 +44,7 @@ class ResponseGenerator: async def generate_response(self, message: MessageThinking, thinking_id: str) -> Optional[Union[str, List[str]]]: """根据当前模型类型选择对应的生成函数""" # 从global_config中获取模型概率值并选择模型 - if random.random() < global_config.MODEL_R1_PROBABILITY: + if random.random() < global_config.model_reasoning_probability: self.current_model_type = "深深地" current_model = self.model_reasoning else: diff --git a/src/plugins/memory_system/Hippocampus.py b/src/plugins/memory_system/Hippocampus.py index 4b40649d0..5ccdec5a5 100644 --- 
a/src/plugins/memory_system/Hippocampus.py +++ b/src/plugins/memory_system/Hippocampus.py @@ -1942,11 +1942,7 @@ class HippocampusManager: return response async def get_memory_from_topic( - self, - valid_keywords: list[str], - max_memory_num: int = 3, - max_memory_length: int = 2, - max_depth: int = 3 + self, valid_keywords: list[str], max_memory_num: int = 3, max_memory_length: int = 2, max_depth: int = 3 ) -> list: """从文本中获取相关记忆的公共接口""" if not self._initialized: