feat: 整合reasoning模式和hfc模式,统一调控(但不是很统一)

This commit is contained in:
SengokuCola
2025-04-21 18:37:49 +08:00
parent 7e0f41c039
commit c10b7eea61
14 changed files with 1188 additions and 88 deletions

View File

@@ -213,8 +213,8 @@ class BotConfig:
# response
response_mode: str = "heart_flow" # 回复策略
model_reasoning_probability: float = 0.7 # 麦麦回答时选择推理模型(主要)模型概率
model_normal_probability: float = 0.3 # 麦麦回答时选择一般模型(次要)模型概率
model_reasoning_probability: float = 0.7 # 麦麦回答时选择推理模型(主要)模型概率
model_normal_probability: float = 0.3 # 麦麦回答时选择一般模型(次要)模型概率
# MODEL_R1_DISTILL_PROBABILITY: float = 0.1 # R1蒸馏模型概率
# emoji
@@ -407,9 +407,12 @@ class BotConfig:
def response(parent: dict):
response_config = parent["response"]
config.model_reasoning_probability = response_config.get("model_reasoning_probability", config.model_reasoning_probability)
config.model_normal_probability = response_config.get("model_normal_probability", config.model_normal_probability)
config.model_reasoning_probability = response_config.get(
"model_reasoning_probability", config.model_reasoning_probability
)
config.model_normal_probability = response_config.get(
"model_normal_probability", config.model_normal_probability
)
# 添加 enable_heart_flowC 的加载逻辑 (假设它在 [response] 部分)
if config.INNER_VERSION in SpecifierSet(">=1.4.0"):
@@ -419,7 +422,6 @@ class BotConfig:
heartflow_config = parent["heartflow"]
# 加载新增的 heartflowC 参数
# 加载原有的 heartflow 参数
# config.sub_heart_flow_update_interval = heartflow_config.get(
# "sub_heart_flow_update_interval", config.sub_heart_flow_update_interval
@@ -442,9 +444,15 @@ class BotConfig:
"compress_length_limit", config.compress_length_limit
)
if config.INNER_VERSION in SpecifierSet(">=1.4.0"):
config.reply_trigger_threshold = heartflow_config.get("reply_trigger_threshold", config.reply_trigger_threshold)
config.probability_decay_factor_per_second = heartflow_config.get("probability_decay_factor_per_second", config.probability_decay_factor_per_second)
config.default_decay_rate_per_second = heartflow_config.get("default_decay_rate_per_second", config.default_decay_rate_per_second)
config.reply_trigger_threshold = heartflow_config.get(
"reply_trigger_threshold", config.reply_trigger_threshold
)
config.probability_decay_factor_per_second = heartflow_config.get(
"probability_decay_factor_per_second", config.probability_decay_factor_per_second
)
config.default_decay_rate_per_second = heartflow_config.get(
"default_decay_rate_per_second", config.default_decay_rate_per_second
)
config.initial_duration = heartflow_config.get("initial_duration", config.initial_duration)
def willing(parent: dict):

View File

@@ -45,6 +45,8 @@ class CurrentState:
def __init__(self):
self.current_state_info = ""
self.chat_status = "IDLE"
self.mood_manager = MoodManager()
self.mood = self.mood_manager.get_prompt()
@@ -70,7 +72,7 @@ class Heartflow:
"""定期清理不活跃的子心流"""
while True:
current_time = time.time()
inactive_subheartflows_ids = [] # 修改变量名以清晰表示存储的是ID
inactive_subheartflows_ids = [] # 修改变量名以清晰表示存储的是ID
# 检查所有子心流
# 使用 list(self._subheartflows.items()) 避免在迭代时修改字典
@@ -253,7 +255,7 @@ class Heartflow:
# 创建并初始化观察对象
logger.debug(f"{subheartflow_id} 创建 observation")
observation = ChattingObservation(subheartflow_id)
await observation.initialize() # 等待初始化完成
await observation.initialize() # 等待初始化完成
subheartflow.add_observation(observation)
logger.debug(f"{subheartflow_id} 添加 observation 成功")
@@ -269,7 +271,7 @@ class Heartflow:
except Exception as e:
# 记录详细错误信息
logger.error(f"创建 subheartflow {subheartflow_id} 失败: {e}")
logger.error(traceback.format_exc()) # 记录完整的 traceback
logger.error(traceback.format_exc()) # 记录完整的 traceback
# 考虑是否需要更具体的错误处理或资源清理逻辑
return None

View File

@@ -5,7 +5,6 @@ from src.plugins.models.utils_model import LLMRequest
from src.config.config import global_config
import time
from typing import Optional, List
from datetime import datetime
import traceback
from src.plugins.chat.utils import parse_text_timestamps
@@ -78,12 +77,12 @@ class SubHeartflow:
self.main_heartflow_info = ""
self.last_active_time = time.time() # 添加最后激活时间
self.should_stop = False # 添加停止标志
self.task: Optional[asyncio.Task] = None # 添加 task 属性
self.should_stop = False # 添加停止标志
self.task: Optional[asyncio.Task] = None # 添加 task 属性
self.is_active = False
self.observations: List[ChattingObservation] = [] # 使用 List 类型提示
self.observations: List[ChattingObservation] = [] # 使用 List 类型提示
self.running_knowledges = []
@@ -98,7 +97,7 @@ class SubHeartflow:
# 检查是否被主心流标记为停止
if self.should_stop:
logger.info(f"子心流 {self.subheartflow_id} 被标记为停止,正在退出后台任务...")
break # 退出循环以停止任务
break # 退出循环以停止任务
await asyncio.sleep(global_config.sub_heart_flow_update_interval) # 定期检查销毁条件

View File

@@ -19,6 +19,7 @@ from .individuality.individuality import Individuality
from .common.server import global_server
from .plugins.chat_module.heartFC_chat.interest import InterestManager
from .plugins.chat_module.heartFC_chat.heartFC_controler import HeartFC_Controller
from .plugins.chat_module.heartFC_chat.reasoning_chat import ReasoningChat
logger = get_module_logger("main")
@@ -117,8 +118,11 @@ class MainSystem:
await interest_manager.start_background_tasks()
logger.success("兴趣管理器后台任务启动成功")
# 初始化并独立启动 HeartFC_Chat
HeartFC_Controller()
# 初始化 ReasoningChat 单例 (确保它在需要之前被创建)
ReasoningChat.get_instance()
logger.success("ReasoningChat 单例初始化成功")
# 初始化并独立启动 HeartFC_Chat 控制器 (使用 get_instance 获取单例)
heartfc_chat_instance = HeartFC_Controller.get_instance()
if heartfc_chat_instance:
await heartfc_chat_instance.start()

View File

@@ -124,8 +124,6 @@ class ChatBot:
else:
await self.heartFC_processor.process_message(message_data)
if template_group_name:
async with global_prompt_manager.async_message_scope(template_group_name):
await preprocess()

View File

@@ -13,6 +13,7 @@ from src.do_tool.tool_use import ToolUser
from .interest import InterestManager
from src.plugins.chat.chat_stream import chat_manager
from .pf_chatting import PFChatting
import threading # 导入 threading
# 定义日志配置
chat_config = LogConfig(
@@ -27,43 +28,58 @@ INTEREST_MONITOR_INTERVAL_SECONDS = 1
class HeartFC_Controller:
_instance = None # For potential singleton access if needed by MessageManager
_instance = None
_lock = threading.Lock() # 使用 threading.Lock 替代 asyncio.Lock 以兼容 __new__
_initialized = False
def __init__(self):
# --- Updated Init ---
if HeartFC_Controller._instance is not None:
# Prevent re-initialization if used as a singleton
return
self.gpt = ResponseGenerator()
self.mood_manager = MoodManager.get_instance()
self.mood_manager.start_mood_update()
self.tool_user = ToolUser()
self.interest_manager = InterestManager()
self._interest_monitor_task: Optional[asyncio.Task] = None
# --- New PFChatting Management ---
self.pf_chatting_instances: Dict[str, PFChatting] = {}
self._pf_chatting_lock = Lock()
# --- End New PFChatting Management ---
HeartFC_Controller._instance = self # Register instance
# --- End Updated Init ---
# --- Make dependencies accessible for PFChatting ---
# These are accessed via the passed instance in PFChatting
self.emoji_manager = emoji_manager
self.relationship_manager = relationship_manager
self.MessageManager = MessageManager # Pass the class/singleton access
# --- End dependencies ---
# --- Added Class Method for Singleton Access ---
@classmethod
def get_instance(cls):
def __new__(cls, *args, **kwargs):
if cls._instance is None:
# This might indicate an issue if called before initialization
logger.warning("HeartFC_Controller get_instance called before initialization.")
# Optionally, initialize here if a strict singleton pattern is desired
# cls._instance = cls()
with cls._lock:
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance
# --- End Added Class Method ---
def __init__(self):
if self._initialized:
return
with self.__class__._lock: # 使用类锁确保初始化线程安全
if self._initialized:
return
logger.info("正在初始化 HeartFC_Controller 单例...")
self.gpt = ResponseGenerator()
self.mood_manager = MoodManager.get_instance()
self.mood_manager.start_mood_update()
self.tool_user = ToolUser()
self.interest_manager = InterestManager()
self._interest_monitor_task: Optional[asyncio.Task] = None
self.pf_chatting_instances: Dict[str, PFChatting] = {}
self._pf_chatting_lock = Lock() # 这个可以是 asyncio.Lock用于异步上下文
self.emoji_manager = emoji_manager
self.relationship_manager = relationship_manager
self.MessageManager = MessageManager
self._initialized = True
logger.info("HeartFC_Controller 单例初始化完成。")
@classmethod
def get_instance(cls):
"""获取 HeartFC_Controller 的单例实例。"""
if cls._instance is None:
logger.warning("HeartFC_Controller 实例在首次 get_instance 时创建,可能未在 main 中正确初始化。")
cls() # 调用构造函数创建
return cls._instance
# --- 新增:检查 PFChatting 状态的方法 --- #
def is_pf_chatting_active(self, stream_id: str) -> bool:
"""检查指定 stream_id 的 PFChatting 循环是否处于活动状态。"""
# 注意:这里直接访问字典,不加锁,因为读取通常是安全的,
# 并且 PFChatting 实例的 _loop_active 状态由其自身的异步循环管理。
# 如果需要更强的保证,可以在访问 pf_instance 前获取 _pf_chatting_lock
pf_instance = self.pf_chatting_instances.get(stream_id)
if pf_instance and pf_instance._loop_active: # 直接检查 PFChatting 实例的 _loop_active 属性
return True
return False
# --- 结束新增 --- #
async def start(self):
"""启动异步任务,如回复启动器"""

View File

@@ -13,6 +13,7 @@ from ...chat.message_buffer import message_buffer
from ...utils.timer_calculater import Timer
from .interest import InterestManager
from src.plugins.person_info.relationship_manager import relationship_manager
from .reasoning_chat import ReasoningChat
# 定义日志配置
processor_config = LogConfig(
@@ -29,7 +30,7 @@ class HeartFC_Processor:
def __init__(self):
self.storage = MessageStorage()
self.interest_manager = InterestManager()
# self.chat_instance = chat_instance # 持有 HeartFC_Chat 实例
self.reasoning_chat = ReasoningChat.get_instance()
async def process_message(self, message_data: str) -> None:
"""处理接收到的原始消息数据,完成消息解析、缓冲、过滤、存储、兴趣度计算与更新等核心流程。
@@ -72,11 +73,11 @@ class HeartFC_Processor:
user_info=userinfo,
group_info=groupinfo,
)
if not chat:
logger.error(
f"无法为消息创建或获取聊天流: user {userinfo.user_id}, group {groupinfo.group_id if groupinfo else 'None'}"
)
return
# --- 添加兴趣追踪启动 ---
# 在获取到 chat 对象后,启动对该聊天流的兴趣监控
await self.reasoning_chat.start_monitoring_interest(chat)
# --- 结束添加 ---
message.update_chat_stream(chat)
@@ -90,7 +91,6 @@ class HeartFC_Processor:
message.raw_message, chat, userinfo
):
return
logger.trace(f"过滤词/正则表达式过滤成功: {message.processed_plain_text}")
# 查询缓冲器结果
buffer_result = await message_buffer.query_buffer_result(message)
@@ -152,6 +152,8 @@ class HeartFC_Processor:
f"使用激活率 {interested_rate:.2f} 更新后 (通过缓冲后),当前兴趣度: {current_interest:.2f}"
)
self.interest_manager.add_interest_dict(message, interested_rate, is_mentioned)
except Exception as e:
logger.error(f"更新兴趣度失败: {e}") # 调整日志消息
logger.error(traceback.format_exc())

View File

@@ -6,6 +6,7 @@ import json # 引入 json
import os # 引入 os
from typing import Optional # <--- 添加导入
import random # <--- 添加导入 random
from src.plugins.chat.message import MessageRecv
from src.common.logger import get_module_logger, LogConfig, DEFAULT_CONFIG # 引入 DEFAULT_CONFIG
from src.plugins.chat.chat_stream import chat_manager # *** Import ChatManager ***
@@ -66,6 +67,13 @@ class InterestChatting:
self.is_above_threshold: bool = False # 标记兴趣值是否高于阈值
# --- 结束:概率回复相关属性 ---
# 记录激发兴趣对(消息id,激活值)
self.interest_dict = {}
def add_interest_dict(self, message: MessageRecv, interest_value: float, is_mentioned: bool):
# Store the MessageRecv object and the interest value as a tuple
self.interest_dict[message.message_info.message_id] = (message, interest_value, is_mentioned)
def _calculate_decay(self, current_time: float):
"""计算从上次更新到现在的衰减"""
time_delta = current_time - self.last_update_time
@@ -445,6 +453,10 @@ class InterestManager:
stream_name = chat_manager.get_stream_name(stream_id) or stream_id # 获取流名称
logger.warning(f"尝试降低不存在的聊天流 {stream_name} 的兴趣度")
def add_interest_dict(self, message: MessageRecv, interest_value: float, is_mentioned: bool):
interest_chatting = self._get_or_create_interest_chatting(message.chat_stream.stream_id)
interest_chatting.add_interest_dict(message, interest_value, is_mentioned)
def cleanup_inactive_chats(self, max_age_seconds=INACTIVE_THRESHOLD_SECONDS):
"""
清理长时间不活跃的聊天流记录

View File

@@ -0,0 +1,412 @@
import time
import threading # 导入 threading
from random import random
import traceback
import asyncio
from typing import List, Dict
from ...moods.moods import MoodManager
from ....config.config import global_config
from ...chat.emoji_manager import emoji_manager
from .reasoning_generator import ResponseGenerator
from ...chat.message import MessageSending, MessageRecv, MessageThinking, MessageSet
from ...chat.messagesender import message_manager
from ...storage.storage import MessageStorage
from ...chat.utils import is_mentioned_bot_in_message
from ...chat.utils_image import image_path_to_base64
from ...willing.willing_manager import willing_manager
from ...message import UserInfo, Seg
from src.common.logger import get_module_logger, CHAT_STYLE_CONFIG, LogConfig
from src.plugins.chat.chat_stream import ChatStream
from src.plugins.person_info.relationship_manager import relationship_manager
from src.plugins.respon_info_catcher.info_catcher import info_catcher_manager
from src.plugins.utils.timer_calculater import Timer
from .interest import InterestManager
from .heartFC_controler import HeartFC_Controller # 导入 HeartFC_Controller
# 定义日志配置
chat_config = LogConfig(
console_format=CHAT_STYLE_CONFIG["console_format"],
file_format=CHAT_STYLE_CONFIG["file_format"],
)
logger = get_module_logger("reasoning_chat", config=chat_config)
class ReasoningChat:
    """Singleton driving the "reasoning" reply pipeline.

    Polls each chat stream's interest dictionary (filled by InterestManager)
    in a per-stream background task, and for each interesting message decides
    probabilistically (via willing_manager) whether to reply, generating the
    response with ResponseGenerator. It yields to the PFChatting loop: when
    PFChatting is active for a stream, queued interest messages are dropped
    here instead of being answered.
    """

    # Singleton plumbing: a single shared instance guarded by a thread lock
    # so __new__/__init__ stay safe even across threads.
    _instance = None
    _lock = threading.Lock()
    _initialized = False

    def __new__(cls, *args, **kwargs):
        # Thread-safe lazy construction of the singleton.
        if cls._instance is None:
            with cls._lock:
                # Double-check locking: re-test under the lock so only one
                # thread ever allocates the instance.
                if cls._instance is None:
                    cls._instance = super().__new__(cls)
        return cls._instance

    def __init__(self):
        # Guard against re-initialization: the singleton's __init__ runs on
        # every ReasoningChat() call, but only the first does real work.
        if self._initialized:
            return
        with self.__class__._lock:  # class lock keeps initialization thread-safe
            if self._initialized:
                return
            logger.info("正在初始化 ReasoningChat 单例...")
            self.storage = MessageStorage()
            self.gpt = ResponseGenerator()
            self.mood_manager = MoodManager.get_instance()
            self.mood_manager.start_mood_update()
            # One background interest-monitoring task per chat stream id.
            self._interest_monitoring_tasks: Dict[str, asyncio.Task] = {}
            # NOTE(review): _initialized is flipped before interest_manager is
            # assigned — a concurrent reader could briefly see a partially
            # built instance; order kept as-is to preserve behavior.
            self._initialized = True
            self.interest_manager = InterestManager()
            logger.info("ReasoningChat 单例初始化完成。")

    @classmethod
    def get_instance(cls):
        """Return the ReasoningChat singleton, creating it on first use.

        Normally the instance is created during startup in main; creating it
        here is a fallback and is logged as a warning.
        """
        if cls._instance is None:
            # Instance not yet created (should have happened in main).
            logger.warning("ReasoningChat 实例在首次 get_instance 时创建。")
            cls()  # Invoke the constructor to build the singleton.
        return cls._instance

    @staticmethod
    async def _create_thinking_message(message, chat, userinfo, messageinfo):
        """Create and enqueue a placeholder "thinking" message.

        Returns the generated thinking_id ("mt" + timestamp) used later to
        locate and replace this placeholder with the real reply.
        """
        bot_user_info = UserInfo(
            user_id=global_config.BOT_QQ,
            user_nickname=global_config.BOT_NICKNAME,
            platform=messageinfo.platform,
        )
        thinking_time_point = round(time.time(), 2)
        thinking_id = "mt" + str(thinking_time_point)
        thinking_message = MessageThinking(
            message_id=thinking_id,
            chat_stream=chat,
            bot_user_info=bot_user_info,
            reply=message,
            thinking_start_time=thinking_time_point,
        )
        message_manager.add_message(thinking_message)
        return thinking_id

    @staticmethod
    async def _send_response_messages(message, chat, response_set: List[str], thinking_id) -> MessageSending:
        """Replace the pending thinking message with the actual reply set.

        Removes the matching MessageThinking from the stream's container and
        enqueues one MessageSending per response segment as a MessageSet.
        Returns the first outgoing message (the "head"), or None if the
        thinking placeholder was not found (e.g. it timed out and was purged).
        """
        container = message_manager.get_container(chat.stream_id)
        thinking_message = None

        # Find and detach the matching thinking placeholder.
        for msg in container.messages:
            if isinstance(msg, MessageThinking) and msg.message_info.message_id == thinking_id:
                thinking_message = msg
                container.messages.remove(msg)
                break

        if not thinking_message:
            logger.warning("未找到对应的思考消息,可能已超时被移除")
            return

        thinking_start_time = thinking_message.thinking_start_time
        message_set = MessageSet(chat, thinking_id)

        mark_head = False  # only the first segment is flagged is_head
        first_bot_msg = None
        for msg in response_set:
            message_segment = Seg(type="text", data=msg)
            bot_message = MessageSending(
                message_id=thinking_id,
                chat_stream=chat,
                bot_user_info=UserInfo(
                    user_id=global_config.BOT_QQ,
                    user_nickname=global_config.BOT_NICKNAME,
                    platform=message.message_info.platform,
                ),
                sender_info=message.message_info.user_info,
                message_segment=message_segment,
                reply=message,
                is_head=not mark_head,
                is_emoji=False,
                thinking_start_time=thinking_start_time,
            )
            if not mark_head:
                mark_head = True
                first_bot_msg = bot_message
            message_set.add_message(bot_message)
        message_manager.add_message(message_set)
        return first_bot_msg

    @staticmethod
    async def _handle_emoji(message, chat, response):
        """Optionally attach an emoji/sticker matching the reply.

        Fires with probability global_config.emoji_chance; the emoji image is
        sent as a separate non-head MessageSending.
        """
        if random() < global_config.emoji_chance:
            emoji_raw = await emoji_manager.get_emoji_for_text(response)
            if emoji_raw:
                emoji_path, description = emoji_raw
                emoji_cq = image_path_to_base64(emoji_path)

                thinking_time_point = round(message.message_info.time, 2)

                message_segment = Seg(type="emoji", data=emoji_cq)
                bot_message = MessageSending(
                    message_id="mt" + str(thinking_time_point),
                    chat_stream=chat,
                    bot_user_info=UserInfo(
                        user_id=global_config.BOT_QQ,
                        user_nickname=global_config.BOT_NICKNAME,
                        platform=message.message_info.platform,
                    ),
                    sender_info=message.message_info.user_info,
                    message_segment=message_segment,
                    reply=message,
                    is_head=False,
                    is_emoji=True,
                )
                message_manager.add_message(bot_message)

    async def _update_relationship(self, message: MessageRecv, response_set):
        """Update relationship value and bot mood from the reply's emotion.

        Asks the generator for a stance/emotion tag of the sent reply, feeds
        it into relationship_manager and into the mood model.
        """
        ori_response = ",".join(response_set)
        stance, emotion = await self.gpt._get_emotion_tags(ori_response, message.processed_plain_text)
        await relationship_manager.calculate_update_relationship_value(
            chat_stream=message.chat_stream, label=emotion, stance=stance
        )
        self.mood_manager.update_mood_from_emotion(emotion, global_config.mood_intensity_factor)

    async def _find_interested_message(self, chat: ChatStream) -> None:
        """Background task: poll this chat's interest dict and act on entries.

        Designed to run indefinitely (until cancelled); started by
        start_monitoring_interest while the chat stream is active. Entries are
        popped after handling, or dropped unhandled when PFChatting is active
        for the stream.
        """
        controller = HeartFC_Controller.get_instance()  # Controller used to query PFChatting state.
        if not controller:
            logger.error(f"无法获取 HeartFC_Controller 实例,无法检查 PFChatting 状态。stream: {chat.stream_id}")
            # Without the controller we cannot check PFChatting; for now we
            # keep polling anyway (original left open whether to return here).
            pass

        while True:
            await asyncio.sleep(1)  # Poll once per second.
            interest_chatting = self.interest_manager.get_interest_chatting(chat.stream_id)
            if not interest_chatting:
                continue
            interest_dict = interest_chatting.interest_dict if interest_chatting.interest_dict else {}
            # Snapshot the items so we can pop from the live dict while iterating.
            items_to_process = list(interest_dict.items())
            if not items_to_process:
                continue

            for msg_id, (message, interest_value, is_mentioned) in items_to_process:
                # --- Skip (and drop) messages while PFChatting owns the stream --- #
                pf_active = False
                if controller:
                    pf_active = controller.is_pf_chatting_active(chat.stream_id)

                if pf_active:
                    # PFChatting is active: discard instead of replying here.
                    removed_item = interest_dict.pop(msg_id, None)
                    if removed_item:
                        logger.debug(f"PFChatting 活跃,已跳过并移除兴趣消息 {msg_id} for stream: {chat.stream_id}")
                    continue  # Next queued message.
                # --- End PFChatting check --- #

                # Only handled when PFChatting is inactive.
                try:
                    await self.normal_reasoning_chat(
                        message=message,
                        chat=chat,
                        is_mentioned=is_mentioned,
                        interested_rate=interest_value,
                    )
                except Exception as e:
                    logger.error(f"处理兴趣消息 {msg_id} 时出错: {e}\n{traceback.format_exc()}")
                finally:
                    # Pop the entry whether handling succeeded or failed.
                    removed_item = interest_dict.pop(msg_id, None)
                    if removed_item:
                        logger.debug(f"已从兴趣字典中移除消息 {msg_id}")

    async def normal_reasoning_chat(
        self, message: MessageRecv, chat: ChatStream, is_mentioned: bool, interested_rate: float
    ) -> None:
        """Decide whether to reply to one message and run the full reply flow.

        Flow: compute reply probability (willing_manager, mention bonus,
        adapter-supplied gain) → roll the dice → if replying: thinking
        placeholder, LLM generation, send, emoji, relationship update.
        Timing of each step is collected for the performance log line.
        """
        timing_results = {}
        userinfo = message.message_info.user_info
        messageinfo = message.message_info

        # NOTE(review): shadows the is_mentioned parameter with a fresh check.
        is_mentioned, reply_probability = is_mentioned_bot_in_message(message)

        # Register this message with the willing manager.
        willing_manager.setup(message, chat, is_mentioned, interested_rate)

        # Resolve the reply probability (1 means "must reply", skip willing).
        is_willing = False
        if reply_probability != 1:
            is_willing = True
            reply_probability = await willing_manager.get_reply_probability(message.message_info.message_id)
            if message.message_info.additional_config:
                if "maimcore_reply_probability_gain" in message.message_info.additional_config.keys():
                    reply_probability += message.message_info.additional_config["maimcore_reply_probability_gain"]

        # Log the incoming message with willingness/probability context.
        mes_name = chat.group_info.group_name if chat.group_info else "私聊"
        current_time = time.strftime("%H:%M:%S", time.localtime(message.message_info.time))
        willing_log = f"[回复意愿:{await willing_manager.get_willing(chat.stream_id):.2f}]" if is_willing else ""
        logger.info(
            f"[{current_time}][{mes_name}]"
            f"{chat.user_info.user_nickname}:"
            f"{message.processed_plain_text}{willing_log}[概率:{reply_probability * 100:.1f}%]"
        )

        do_reply = False
        if random() < reply_probability:
            do_reply = True

            # Pre-reply bookkeeping in the willing manager.
            await willing_manager.before_generate_reply_handle(message.message_info.message_id)

            # Create the thinking placeholder message.
            with Timer("创建思考消息", timing_results):
                thinking_id = await self._create_thinking_message(message, chat, userinfo, messageinfo)

            logger.debug(f"创建捕捉器thinking_id:{thinking_id}")

            info_catcher = info_catcher_manager.get_info_catcher(thinking_id)
            info_catcher.catch_decide_to_response(message)

            # Generate the reply; failures fall through to the early return.
            try:
                with Timer("生成回复", timing_results):
                    response_set = await self.gpt.generate_response(message, thinking_id)

                info_catcher.catch_after_generate_response(timing_results["生成回复"])
            except Exception as e:
                logger.error(f"回复生成出现错误:{str(e)} {traceback.format_exc()}")
                response_set = None

            if not response_set:
                logger.info("为什么生成回复失败?")
                return

            # Send the reply segments.
            with Timer("发送消息", timing_results):
                first_bot_msg = await self._send_response_messages(message, chat, response_set, thinking_id)

            info_catcher.catch_after_response(timing_results["发送消息"], response_set, first_bot_msg)

            info_catcher.done_catch()

            # Possibly attach an emoji.
            with Timer("处理表情包", timing_results):
                await self._handle_emoji(message, chat, response_set)

            # Update relationship/mood from the emotion of the reply.
            with Timer("更新关系情绪", timing_results):
                await self._update_relationship(message, response_set)

            # Post-reply bookkeeping.
            await willing_manager.after_generate_reply_handle(message.message_info.message_id)

        # Emit the per-step timing summary.
        if do_reply:
            timing_str = " | ".join([f"{step}: {duration:.2f}" for step, duration in timing_results.items()])
            trigger_msg = message.processed_plain_text
            response_msg = " ".join(response_set) if response_set else "无回复"
            logger.info(f"触发消息: {trigger_msg[:20]}... | 推理消息: {response_msg[:20]}... | 性能计时: {timing_str}")
        else:
            # Chose not to reply: let the willing manager decay accordingly.
            await willing_manager.not_reply_handle(message.message_info.message_id)

        # Deregister the message from the willing manager.
        willing_manager.delete(message.message_info.message_id)

    @staticmethod
    def _check_ban_words(text: str, chat, userinfo) -> bool:
        """Return True if the text contains any configured banned word."""
        for word in global_config.ban_words:
            if word in text:
                logger.info(
                    f"[{chat.group_info.group_name if chat.group_info else '私聊'}]{userinfo.user_nickname}:{text}"
                )
                logger.info(f"[过滤词识别]消息中含有{word},filtered")
                return True
        return False

    @staticmethod
    def _check_ban_regex(text: str, chat, userinfo) -> bool:
        """Return True if the text matches any configured ban regex pattern."""
        for pattern in global_config.ban_msgs_regex:
            if pattern.search(text):
                logger.info(
                    f"[{chat.group_info.group_name if chat.group_info else '私聊'}]{userinfo.user_nickname}:{text}"
                )
                logger.info(f"[正则表达式过滤]消息匹配到{pattern},filtered")
                return True
        return False

    async def start_monitoring_interest(self, chat: ChatStream):
        """Start the background interest-monitoring task for a ChatStream.

        Idempotent: if a live (not done, not cancelled) task already exists
        for the stream, nothing happens. A done-callback removes the task
        from the registry when it ends.
        """
        stream_id = chat.stream_id

        # Is a task already running for this stream?
        if stream_id in self._interest_monitoring_tasks and not self._interest_monitoring_tasks[stream_id].done():
            task = self._interest_monitoring_tasks[stream_id]
            if not task.cancelled():  # Still alive: keep it.
                logger.info(f"兴趣监控任务已在运行 stream: {stream_id}")
                return
            else:
                logger.info(f"发现已取消的任务,重新创建 stream: {stream_id}")
                # Cancelled tasks may be replaced below.

        logger.info(f"启动兴趣监控任务 stream: {stream_id}...")
        # Spawn the polling loop as a background task.
        task = asyncio.create_task(self._find_interested_message(chat))
        self._interest_monitoring_tasks[stream_id] = task

        # Auto-remove the entry when the task finishes or is cancelled.
        task.add_done_callback(lambda t: self._handle_task_completion(stream_id, t))

    def _handle_task_completion(self, stream_id: str, task: asyncio.Task):
        """Done-callback: log how the monitoring task ended and drop it."""
        try:
            # Did the task end with an exception?
            exception = task.exception()
            if exception:
                logger.error(f"兴趣监控任务 stream {stream_id} 异常结束: {exception}", exc_info=exception)
            elif task.cancelled():
                logger.info(f"兴趣监控任务 stream {stream_id} 已被取消。")
            else:
                logger.info(f"兴趣监控任务 stream {stream_id} 正常结束。")  # The while-True loop should never end normally.
        except asyncio.CancelledError:
            logger.info(f"兴趣监控任务 stream {stream_id} 在完成处理期间被取消。")
        finally:
            # Always remove the registry entry.
            removed_task = self._interest_monitoring_tasks.pop(stream_id, None)
            if removed_task:
                logger.debug(f"已从监控任务字典移除 stream: {stream_id}")

    async def stop_monitoring_interest(self, stream_id: str):
        """Cancel and await the monitoring task for stream_id, if any.

        Waits up to 5 seconds for the cancellation to take effect; the
        registry entry is removed either by the done-callback or here.
        """
        if stream_id in self._interest_monitoring_tasks:
            task = self._interest_monitoring_tasks[stream_id]
            if not task.done():
                logger.info(f"正在停止兴趣监控任务 stream: {stream_id}...")
                task.cancel()  # Request cancellation.
                try:
                    # Await the actual cancellation, with a safety timeout.
                    await asyncio.wait_for(task, timeout=5.0)
                except asyncio.CancelledError:
                    logger.info(f"兴趣监控任务 stream {stream_id} 已确认取消。")
                except asyncio.TimeoutError:
                    logger.warning(f"停止兴趣监控任务 stream {stream_id} 超时。任务可能仍在运行。")
                except Exception as e:
                    # task.exception() may re-raise during cancellation.
                    logger.error(f"停止兴趣监控任务 stream {stream_id} 时发生错误: {e}")
            # The done-callback removes the entry too; this is a belt-and-braces pop.
            self._interest_monitoring_tasks.pop(stream_id, None)
        else:
            logger.warning(f"尝试停止不存在或已停止的监控任务 stream: {stream_id}")

View File

@@ -0,0 +1,199 @@
from typing import List, Optional, Tuple, Union
import random
from ...models.utils_model import LLMRequest
from ....config.config import global_config
from ...chat.message import MessageThinking
from .reasoning_prompt_builder import prompt_builder
from ...chat.utils import process_llm_response
from ...utils.timer_calculater import Timer
from src.common.logger import get_module_logger, LogConfig, LLM_STYLE_CONFIG
from src.plugins.respon_info_catcher.info_catcher import info_catcher_manager
# 定义日志配置
llm_config = LogConfig(
# 使用消息发送专用样式
console_format=LLM_STYLE_CONFIG["console_format"],
file_format=LLM_STYLE_CONFIG["file_format"],
)
logger = get_module_logger("llm_generator", config=llm_config)
class ResponseGenerator:
    """Generates chat replies via LLM requests for the reasoning pipeline.

    Holds three LLMRequest clients: a reasoning model, a normal (lighter)
    model, and a summary model used for stance/emotion tagging. Each call to
    generate_response picks reasoning vs normal by configured probability.
    """

    def __init__(self):
        # Primary "deep thinking" model.
        self.model_reasoning = LLMRequest(
            model=global_config.llm_reasoning,
            temperature=0.7,
            max_tokens=3000,
            request_type="response_reasoning",
        )
        # Lighter model for quick replies; temperature comes from config.
        self.model_normal = LLMRequest(
            model=global_config.llm_normal,
            temperature=global_config.llm_normal["temp"],
            max_tokens=256,
            request_type="response_reasoning",
        )
        # Summary model, used by _get_emotion_tags for stance/emotion labeling.
        self.model_sum = LLMRequest(
            model=global_config.llm_summary_by_topic, temperature=0.7, max_tokens=3000, request_type="relation"
        )
        self.current_model_type = "r1"  # Default to the R1 (reasoning) model.
        self.current_model_name = "unknown model"

    async def generate_response(self, message: MessageThinking, thinking_id: str) -> Optional[Union[str, List[str]]]:
        """Pick a model by configured probability and generate a reply.

        Returns the processed response (a list of reply segments) or None on
        generation failure.
        """
        # Choose the model according to the configured probability split.
        if random.random() < global_config.model_reasoning_probability:
            self.current_model_type = "深深地"
            current_model = self.model_reasoning
        else:
            self.current_model_type = "浅浅的"
            current_model = self.model_normal

        logger.info(
            f"{self.current_model_type}思考:{message.processed_plain_text[:30] + '...' if len(message.processed_plain_text) > 30 else message.processed_plain_text}"
        )  # noqa: E501

        model_response = await self._generate_response_with_model(message, current_model, thinking_id)

        # print(f"raw_content: {model_response}")

        if model_response:
            logger.info(f"{global_config.BOT_NICKNAME}的回复是:{model_response}")
            model_response = await self._process_response(model_response)
            return model_response
        else:
            logger.info(f"{self.current_model_type}思考,失败")
            return None

    async def _generate_response_with_model(self, message: MessageThinking, model: LLMRequest, thinking_id: str):
        """Build the prompt for this message and run one LLM generation.

        Returns the raw model content string, or None if generation raised.
        """
        info_catcher = info_catcher_manager.get_info_catcher(thinking_id)

        # Compose a display name for the sender: prefer card name + nickname.
        if message.chat_stream.user_info.user_cardname and message.chat_stream.user_info.user_nickname:
            sender_name = (
                f"[({message.chat_stream.user_info.user_id}){message.chat_stream.user_info.user_nickname}]"
                f"{message.chat_stream.user_info.user_cardname}"
            )
        elif message.chat_stream.user_info.user_nickname:
            sender_name = f"({message.chat_stream.user_info.user_id}){message.chat_stream.user_info.user_nickname}"
        else:
            sender_name = f"用户({message.chat_stream.user_info.user_id})"

        logger.debug("开始使用生成回复-2")

        # Build the prompt (timed).
        with Timer() as t_build_prompt:
            prompt = await prompt_builder._build_prompt(
                message.chat_stream,
                message_txt=message.processed_plain_text,
                sender_name=sender_name,
                stream_id=message.chat_stream.stream_id,
            )
        logger.info(f"构建prompt时间: {t_build_prompt.human_readable}")

        try:
            content, reasoning_content, self.current_model_name = await model.generate_response(prompt)

            info_catcher.catch_after_llm_generated(
                prompt=prompt, response=content, reasoning_content=reasoning_content, model_name=self.current_model_name
            )

        except Exception:
            logger.exception("生成回复时出错")
            return None

        # 保存到数据库
        # self._save_to_db(
        #     message=message,
        #     sender_name=sender_name,
        #     prompt=prompt,
        #     content=content,
        #     reasoning_content=reasoning_content,
        #     # reasoning_content_check=reasoning_content_check if global_config.enable_kuuki_read else ""
        # )

        return content

    # def _save_to_db(
    #     self,
    #     message: MessageRecv,
    #     sender_name: str,
    #     prompt: str,
    #     content: str,
    #     reasoning_content: str,
    # ):
    #     """保存对话记录到数据库"""
    #     db.reasoning_logs.insert_one(
    #         {
    #             "time": time.time(),
    #             "chat_id": message.chat_stream.stream_id,
    #             "user": sender_name,
    #             "message": message.processed_plain_text,
    #             "model": self.current_model_name,
    #             "reasoning": reasoning_content,
    #             "response": content,
    #             "prompt": prompt,
    #         }
    #     )

    async def _get_emotion_tags(self, content: str, processed_plain_text: str):
        """Extract a (stance, emotion) pair for a reply.

        Prompts the summary model to classify the reply's stance toward the
        replied-to message and its dominant emotion; falls back to
        ("中立", "平静") on any parse error or exception.
        """
        try:
            # Prompt combining the reply, the replied-to text, and stance analysis.
            prompt = f"""
请严格根据以下对话内容,完成以下任务:
1. 判断回复者对被回复者观点的直接立场:
- "支持":明确同意或强化被回复者观点
- "反对":明确反驳或否定被回复者观点
- "中立":不表达明确立场或无关回应
2. 从"开心,愤怒,悲伤,惊讶,平静,害羞,恐惧,厌恶,困惑"中选出最匹配的1个情感标签
3. 按照"立场-情绪"的格式直接输出结果,例如:"反对-愤怒"
4. 考虑回复者的人格设定为{global_config.personality_core}
对话示例:
被回复「A就是笨」
回复「A明明很聪明」 → 反对-愤怒
当前对话:
被回复:「{processed_plain_text}
回复:「{content}
输出要求:
- 只需输出"立场-情绪"结果,不要解释
- 严格基于文字直接表达的对立关系判断
"""

            # Run the summary model.
            result, _, _ = await self.model_sum.generate_response(prompt)

            result = result.strip()

            # Parse the "stance-emotion" output.
            if "-" in result:
                stance, emotion = result.split("-", 1)
                valid_stances = ["支持", "反对", "中立"]
                valid_emotions = ["开心", "愤怒", "悲伤", "惊讶", "害羞", "平静", "恐惧", "厌恶", "困惑"]
                if stance in valid_stances and emotion in valid_emotions:
                    return stance, emotion  # Valid stance-emotion pair.
                else:
                    logger.debug(f"无效立场-情感组合:{result}")
                    return "中立", "平静"  # Default: neutral-calm.
            else:
                logger.debug(f"立场-情感格式错误:{result}")
                return "中立", "平静"  # Malformed output: return the default.

        except Exception as e:
            logger.debug(f"获取情感标签时出错: {e}")
            return "中立", "平静"  # Default on any error.

    @staticmethod
    async def _process_response(content: str) -> Tuple[List[str], List[str]]:
        """Split/clean the raw model output into reply segments.

        NOTE(review): the annotation says Tuple[List[str], List[str]] but the
        success path returns only the processed list, and the empty-content
        path returns the tuple (None, []) — confirm the intended contract.
        """
        if not content:
            return None, []

        processed_response = process_llm_response(content)

        # print(f"得到了处理后的llm返回{processed_response}")

        return processed_response

View File

@@ -0,0 +1,445 @@
import random
import time
from typing import Optional, Union
from ....common.database import db
from ...chat.utils import get_embedding, get_recent_group_detailed_plain_text, get_recent_group_speaker
from ...chat.chat_stream import chat_manager
from ...moods.moods import MoodManager
from ....individuality.individuality import Individuality
from ...memory_system.Hippocampus import HippocampusManager
from ...schedule.schedule_generator import bot_schedule
from ....config.config import global_config
from ...person_info.relationship_manager import relationship_manager
from src.common.logger import get_module_logger
from src.plugins.utils.prompt_builder import Prompt, global_prompt_manager
logger = get_module_logger("prompt")
def init_prompt():
    """Register the reasoning-chat prompt templates with the prompt manager.

    Each Prompt(...) call stores a named template (main reply prompt,
    relationship, memory, schedule, knowledge) that PromptBuilder later fills
    via global_prompt_manager.format_prompt. Template text is user-facing
    and must be kept byte-exact.
    """
    # Main reply template: persona, mood, memory and chat context slots.
    Prompt(
        """
{relation_prompt_all}
{memory_prompt}
{prompt_info}
{schedule_prompt}
{chat_target}
{chat_talking_prompt}
现在"{sender_name}"说的:{message_txt}。引起了你的注意,你想要在群里发言发言或者回复这条消息。\n
你的网名叫{bot_name},有人也叫你{bot_other_names}{prompt_personality}
你正在{chat_target_2},现在请你读读之前的聊天记录,{mood_prompt},然后给出日常且口语化的回复,平淡一些,
尽量简短一些。{keywords_reaction_prompt}请注意把握聊天内容,不要回复的太有条理,可以有个性。{prompt_ger}
请回复的平淡一些,简短一些,说中文,不要刻意突出自身学科背景,尽量不要说你说过的话
请注意不要输出多余内容(包括前后缀,冒号和引号,括号,表情等),只输出回复内容。
{moderation_prompt}不要输出多余内容(包括前后缀冒号和引号括号表情包at或 @等 )。""",
        "reasoning_prompt_main",
    )
    # Relationship-context fragment inserted into the main prompt.
    Prompt(
        "{relation_prompt}关系等级越大,关系越好,请分析聊天记录,根据你和说话者{sender_name}的关系和态度进行回复,明确你的立场和情感。",
        "relationship_prompt",
    )
    # Retrieved-memory fragment.
    Prompt(
        "你想起你之前见过的事情:{related_memory_info}\n以上是你的回忆,不一定是目前聊天里的人说的,也不一定是现在发生的事情,请记住。\n",
        "memory_prompt",
    )
    # Current-schedule fragment.
    Prompt("你现在正在做的事情是:{schedule_info}", "schedule_prompt")
    # Retrieved-knowledge fragment.
    Prompt("\n你有以下这些**知识**\n{prompt_info}\n请你**记住上面的知识**,之后可能会用到。\n", "knowledge_prompt")
class PromptBuilder:
    """Assembles the full reasoning-mode LLM prompt.

    Combines relationship info, retrieved memories, knowledge-base hits, the
    bot schedule, mood and recent chat context into the registered
    "reasoning_prompt_main" template (see ``init_prompt``).
    """

    def __init__(self):
        # Last built prompt text — never read within this file.
        self.prompt_built = ""
        # Activation-message accumulator — never read within this file.
        self.activate_messages = ""

    async def _build_prompt(
        self, chat_stream, message_txt: str, sender_name: str = "某人", stream_id: Optional[int] = None
    ) -> tuple[str, str]:
        """Build the final prompt for one incoming message.

        NOTE(review): annotated ``-> tuple[str, str]`` but the body returns the
        single formatted ``prompt`` string — annotation or return should be
        reconciled with callers.
        """
        # --- Persona: core personality plus one random side and one random
        # identity detail. NOTE(review): random.shuffle mutates the shared
        # Individuality lists in place — presumably acceptable; verify.
        prompt_personality = ""
        # person
        individuality = Individuality.get_instance()
        personality_core = individuality.personality.personality_core
        prompt_personality += personality_core
        personality_sides = individuality.personality.personality_sides
        random.shuffle(personality_sides)
        prompt_personality += f",{personality_sides[0]}"
        identity_detail = individuality.identity.identity_detail
        random.shuffle(identity_detail)
        prompt_personality += f",{identity_detail[0]}"
        # --- Relationship info: current sender plus recent speakers in the stream.
        who_chat_in_group = [
            (chat_stream.user_info.platform, chat_stream.user_info.user_id, chat_stream.user_info.user_nickname)
        ]
        who_chat_in_group += get_recent_group_speaker(
            stream_id,
            (chat_stream.user_info.platform, chat_stream.user_info.user_id),
            limit=global_config.MAX_CONTEXT_SIZE,
        )
        relation_prompt = ""
        for person in who_chat_in_group:
            relation_prompt += await relationship_manager.build_relationship_info(person)
        # relation_prompt_all = (
        #     f"{relation_prompt}关系等级越大,关系越好,请分析聊天记录,"
        #     f"根据你和说话者{sender_name}的关系和态度进行回复,明确你的立场和情感。"
        # )
        # --- Mood snippet from the singleton mood manager.
        mood_manager = MoodManager.get_instance()
        mood_prompt = mood_manager.get_prompt()
        # logger.info(f"心情prompt: {mood_prompt}")
        # --- Memory retrieval: at most 2 memories related to the message text.
        memory_prompt = ""
        related_memory = await HippocampusManager.get_instance().get_memory_from_text(
            text=message_txt, max_memory_num=2, max_memory_length=2, max_depth=3, fast_retrieval=False
        )
        related_memory_info = ""
        if related_memory:
            for memory in related_memory:
                # memory is a sequence; index 1 presumably holds the memory text
                # — TODO confirm against HippocampusManager's return shape.
                related_memory_info += memory[1]
            # memory_prompt = f"你想起你之前见过的事情:{related_memory_info}。\n以上是你的回忆不一定是目前聊天里的人说的也不一定是现在发生的事情请记住。\n"
            memory_prompt = await global_prompt_manager.format_prompt(
                "memory_prompt", related_memory_info=related_memory_info
            )
        # print(f"相关记忆:{related_memory_info}")
        # --- Schedule section is now formatted inline below via "schedule_prompt".
        # schedule_prompt = f"""你现在正在做的事情是:{bot_schedule.get_current_num_task(num=1, time_info=False)}"""
        # --- Chat context: recent plain-text history; group vs. private decides
        # which chat_target templates are used below.
        chat_in_group = True
        chat_talking_prompt = ""
        if stream_id:
            chat_talking_prompt = get_recent_group_detailed_plain_text(
                stream_id, limit=global_config.MAX_CONTEXT_SIZE, combine=True
            )
            # NOTE(review): rebinding the chat_stream parameter here shadows the
            # caller-supplied stream — presumably intentional, verify.
            chat_stream = chat_manager.get_stream(stream_id)
            if chat_stream.group_info:
                # NOTE(review): self-assignment is a no-op in both branches;
                # only the chat_in_group flag actually differs.
                chat_talking_prompt = chat_talking_prompt
            else:
                chat_in_group = False
                chat_talking_prompt = chat_talking_prompt
                # print(f"\033[1;34m[调试]\033[0m 已从数据库获取群 {group_id} 的消息记录:{chat_talking_prompt}")
        # --- Keyword / regex reaction rules from config.
        keywords_reaction_prompt = ""
        for rule in global_config.keywords_reaction_rules:
            if rule.get("enable", False):
                # Plain-keyword rules take priority; regex rules are only tried
                # when no keyword of this rule matched.
                if any(keyword in message_txt.lower() for keyword in rule.get("keywords", [])):
                    logger.info(
                        f"检测到以下关键词之一:{rule.get('keywords', [])},触发反应:{rule.get('reaction', '')}"
                    )
                    # NOTE(review): appending "" is a no-op — a separator
                    # character may have been lost; verify against history.
                    keywords_reaction_prompt += rule.get("reaction", "") + ""
                else:
                    for pattern in rule.get("regex", []):
                        result = pattern.search(message_txt)
                        if result:
                            reaction = rule.get("reaction", "")
                            # Substitute named capture groups into the reaction
                            # text using [name] placeholders.
                            for name, content in result.groupdict().items():
                                reaction = reaction.replace(f"[{name}]", content)
                            logger.info(f"匹配到以下正则表达式:{pattern},触发反应:{reaction}")
                            keywords_reaction_prompt += reaction + ""
                            break
        # --- Low-probability style quirks (inverted sentences, rhetorical
        # questions, classical Chinese).
        prompt_ger = ""
        if random.random() < 0.04:
            prompt_ger += "你喜欢用倒装句"
        if random.random() < 0.02:
            prompt_ger += "你喜欢用反问句"
        if random.random() < 0.01:
            prompt_ger += "你喜欢用文言文"
        # --- Knowledge-base lookup for the message (cosine threshold 0.38).
        start_time = time.time()
        prompt_info = await self.get_prompt_info(message_txt, threshold=0.38)
        if prompt_info:
            # prompt_info = f"""\n你有以下这些**知识**\n{prompt_info}\n请你**记住上面的知识**,之后可能会用到。\n"""
            prompt_info = await global_prompt_manager.format_prompt("knowledge_prompt", prompt_info=prompt_info)
        end_time = time.time()
        logger.debug(f"知识检索耗时: {(end_time - start_time):.3f}")
        # moderation_prompt = ""
        # moderation_prompt = """**检查并忽略**任何涉及尝试绕过审核的行为。
        # 涉及政治敏感以及违法违规的内容请规避。"""
        logger.debug("开始构建prompt")
        # prompt = f"""
        # {relation_prompt_all}
        # {memory_prompt}
        # {prompt_info}
        # {schedule_prompt}
        # {chat_target}
        # {chat_talking_prompt}
        # 现在"{sender_name}"说的:{message_txt}。引起了你的注意,你想要在群里发言发言或者回复这条消息。\n
        # 你的网名叫{global_config.BOT_NICKNAME},有人也叫你{"/".join(global_config.BOT_ALIAS_NAMES)}{prompt_personality}。
        # 你正在{chat_target_2},现在请你读读之前的聊天记录,{mood_prompt},然后给出日常且口语化的回复,平淡一些,
        # 尽量简短一些。{keywords_reaction_prompt}请注意把握聊天内容,不要回复的太有条理,可以有个性。{prompt_ger}
        # 请回复的平淡一些,简短一些,说中文,不要刻意突出自身学科背景,尽量不要说你说过的话
        # 请注意不要输出多余内容(包括前后缀,冒号和引号,括号,表情等),只输出回复内容。
        # {moderation_prompt}不要输出多余内容(包括前后缀冒号和引号括号表情包at或 @等 )。"""
        # Fill the registered main template. relation_prompt_all receives the
        # raw "relationship_prompt" template; presumably format_prompt performs
        # nested substitution of relation_prompt/sender_name — TODO confirm.
        prompt = await global_prompt_manager.format_prompt(
            "reasoning_prompt_main",
            relation_prompt_all=await global_prompt_manager.get_prompt_async("relationship_prompt"),
            relation_prompt=relation_prompt,
            sender_name=sender_name,
            memory_prompt=memory_prompt,
            prompt_info=prompt_info,
            schedule_prompt=await global_prompt_manager.format_prompt(
                "schedule_prompt", schedule_info=bot_schedule.get_current_num_task(num=1, time_info=False)
            ),
            chat_target=await global_prompt_manager.get_prompt_async("chat_target_group1")
            if chat_in_group
            else await global_prompt_manager.get_prompt_async("chat_target_private1"),
            chat_target_2=await global_prompt_manager.get_prompt_async("chat_target_group2")
            if chat_in_group
            else await global_prompt_manager.get_prompt_async("chat_target_private2"),
            chat_talking_prompt=chat_talking_prompt,
            message_txt=message_txt,
            bot_name=global_config.BOT_NICKNAME,
            bot_other_names="/".join(
                global_config.BOT_ALIAS_NAMES,
            ),
            prompt_personality=prompt_personality,
            mood_prompt=mood_prompt,
            keywords_reaction_prompt=keywords_reaction_prompt,
            prompt_ger=prompt_ger,
            moderation_prompt=await global_prompt_manager.get_prompt_async("moderation_prompt"),
        )
        return prompt

    async def get_prompt_info(self, message: str, threshold: float):
        """Retrieve knowledge-base text relevant to *message*.

        Embeds the message (and, in theory, extracted topics), queries the
        knowledge collection by cosine similarity, and returns the matched
        content as a single string ("" when nothing qualifies).

        NOTE(review): topic extraction below is fully commented out, so
        ``topics`` is always empty and the function always returns from the
        early "no topics" branch — everything after it is currently dead code.
        """
        start_time = time.time()
        related_info = ""
        logger.debug(f"获取知识库内容,元消息:{message[:30]}...,消息长度: {len(message)}")
        # 1. Topic extraction via LLM (mirrors the memory system) — disabled.
        topics = []
        # try:
        #     # 先尝试使用记忆系统的方法获取主题
        #     hippocampus = HippocampusManager.get_instance()._hippocampus
        #     topic_num = min(5, max(1, int(len(message) * 0.1)))
        #     topics_response = await hippocampus.llm_topic_judge.generate_response(hippocampus.find_topic_llm(message, topic_num))
        #     # 提取关键词
        #     topics = re.findall(r"<([^>]+)>", topics_response[0])
        #     if not topics:
        #         topics = []
        #     else:
        #         topics = [
        #             topic.strip()
        #             for topic in ",".join(topics).replace("", ",").replace("、", ",").replace(" ", ",").split(",")
        #             if topic.strip()
        #         ]
        #     logger.info(f"从LLM提取的主题: {', '.join(topics)}")
        # except Exception as e:
        #     logger.error(f"从LLM提取主题失败: {str(e)}")
        #     # 如果LLM提取失败使用jieba分词提取关键词作为备选
        #     words = jieba.cut(message)
        #     topics = [word for word in words if len(word) > 1][:5]
        #     logger.info(f"使用jieba提取的主题: {', '.join(topics)}")
        # Without topics, query the knowledge base with the whole message.
        if not topics:
            logger.info("未能提取到任何主题,使用整个消息进行查询")
            embedding = await get_embedding(message, request_type="prompt_build")
            if not embedding:
                logger.error("获取消息嵌入向量失败")
                return ""
            related_info = self.get_info_from_db(embedding, limit=3, threshold=threshold)
            logger.info(f"知识库检索完成,总耗时: {time.time() - start_time:.3f}")
            return related_info
        # 2. Per-topic knowledge-base queries (dead while topics stays empty).
        logger.info(f"开始处理{len(topics)}个主题的知识库查询")
        # Batch embedding fetches to reduce API calls.
        embeddings = {}
        topics_batch = [topic for topic in topics if len(topic) > 0]
        if message:  # ensure the message itself is non-empty before adding it
            topics_batch.append(message)
        # Fetch embeddings for all candidate texts, skipping blanks/failures.
        embed_start_time = time.time()
        for text in topics_batch:
            if not text or len(text.strip()) == 0:
                continue
            try:
                embedding = await get_embedding(text, request_type="prompt_build")
                if embedding:
                    embeddings[text] = embedding
                else:
                    logger.warning(f"获取'{text}'的嵌入向量失败")
            except Exception as e:
                logger.error(f"获取'{text}'的嵌入向量时发生错误: {str(e)}")
        logger.info(f"批量获取嵌入向量完成,耗时: {time.time() - embed_start_time:.3f}")
        if not embeddings:
            logger.error("所有嵌入向量获取失败")
            return ""
        # 3. Query the knowledge base per embedding, tagging results by topic.
        all_results = []
        query_start_time = time.time()
        # Raw-message results first.
        if message in embeddings:
            original_results = self.get_info_from_db(embeddings[message], limit=3, threshold=threshold, return_raw=True)
            if original_results:
                for result in original_results:
                    result["topic"] = "原始消息"
                all_results.extend(original_results)
                logger.info(f"原始消息查询到{len(original_results)}条结果")
        # Then one query per extracted topic.
        for topic in topics:
            if not topic or topic not in embeddings:
                continue
            try:
                topic_results = self.get_info_from_db(embeddings[topic], limit=3, threshold=threshold, return_raw=True)
                if topic_results:
                    # Tag each hit with the topic that produced it.
                    for result in topic_results:
                        result["topic"] = topic
                    all_results.extend(topic_results)
                    logger.info(f"主题'{topic}'查询到{len(topic_results)}条结果")
            except Exception as e:
                logger.error(f"查询主题'{topic}'时发生错误: {str(e)}")
        logger.info(f"知识库查询完成,耗时: {time.time() - query_start_time:.3f}秒,共获取{len(all_results)}条结果")
        # 4. De-duplicate by content text (first occurrence wins).
        process_start_time = time.time()
        unique_contents = set()
        filtered_results = []
        for result in all_results:
            content = result["content"]
            if content not in unique_contents:
                unique_contents.add(content)
                filtered_results.append(result)
        # 5. Sort by similarity, best first.
        filtered_results.sort(key=lambda x: x["similarity"], reverse=True)
        # 6. Cap at 10 results overall.
        filtered_results = filtered_results[:10]
        logger.info(
            f"结果处理完成,耗时: {time.time() - process_start_time:.3f}秒,过滤后剩余{len(filtered_results)}条结果"
        )
        # 7. Format output grouped by topic.
        if filtered_results:
            format_start_time = time.time()
            grouped_results = {}
            for result in filtered_results:
                topic = result["topic"]
                if topic not in grouped_results:
                    grouped_results[topic] = []
                grouped_results[topic].append(result)
            # Emit one header per topic followed by its contents.
            for topic, results in grouped_results.items():
                related_info += f"【主题: {topic}\n"
                for _i, result in enumerate(results, 1):
                    _similarity = result["similarity"]
                    content = result["content"].strip()
                    # Debug variant with index and similarity kept for reference:
                    # related_info += f"{i}. [{similarity:.2f}] {content}\n"
                    related_info += f"{content}\n"
                related_info += "\n"
            logger.info(f"格式化输出完成,耗时: {time.time() - format_start_time:.3f}")
        logger.info(f"知识库检索总耗时: {time.time() - start_time:.3f}")
        return related_info

    @staticmethod
    def get_info_from_db(
        query_embedding: list, limit: int = 1, threshold: float = 0.5, return_raw: bool = False
    ) -> Union[str, list]:
        """Query the ``knowledges`` collection by cosine similarity.

        Computes cosine similarity between *query_embedding* and each stored
        document's ``embedding`` entirely inside a MongoDB aggregation pipeline,
        keeps hits with similarity >= *threshold*, and returns either the raw
        result dicts (``return_raw=True``) or their contents joined by newlines.

        NOTE(review): ``$divide`` will fail if either magnitude is 0 (an
        all-zero embedding) — presumably embeddings are never zero; verify.
        """
        if not query_embedding:
            return "" if not return_raw else []
        # Cosine similarity = dot(a, b) / (|a| * |b|), computed server-side.
        pipeline = [
            {
                "$addFields": {
                    # Dot product: iterate index positions of the stored
                    # embedding and accumulate pairwise products.
                    "dotProduct": {
                        "$reduce": {
                            "input": {"$range": [0, {"$size": "$embedding"}]},
                            "initialValue": 0,
                            "in": {
                                "$add": [
                                    "$$value",
                                    {
                                        "$multiply": [
                                            {"$arrayElemAt": ["$embedding", "$$this"]},
                                            {"$arrayElemAt": [query_embedding, "$$this"]},
                                        ]
                                    },
                                ]
                            },
                        }
                    },
                    # Euclidean norm of the stored embedding.
                    "magnitude1": {
                        "$sqrt": {
                            "$reduce": {
                                "input": "$embedding",
                                "initialValue": 0,
                                "in": {"$add": ["$$value", {"$multiply": ["$$this", "$$this"]}]},
                            }
                        }
                    },
                    # Euclidean norm of the query embedding.
                    "magnitude2": {
                        "$sqrt": {
                            "$reduce": {
                                "input": query_embedding,
                                "initialValue": 0,
                                "in": {"$add": ["$$value", {"$multiply": ["$$this", "$$this"]}]},
                            }
                        }
                    },
                }
            },
            {"$addFields": {"similarity": {"$divide": ["$dotProduct", {"$multiply": ["$magnitude1", "$magnitude2"]}]}}},
            {
                "$match": {
                    "similarity": {"$gte": threshold}  # keep only hits at/above the threshold
                }
            },
            {"$sort": {"similarity": -1}},
            {"$limit": limit},
            {"$project": {"content": 1, "similarity": 1}},
        ]
        results = list(db.knowledges.aggregate(pipeline))
        logger.debug(f"知识库查询结果数量: {len(results)}")
        if not results:
            return "" if not return_raw else []
        if return_raw:
            return results
        else:
            # Join all matched contents with newlines for prompt insertion.
            return "\n".join(str(result["content"]) for result in results)
# Register the prompt templates at import time, then expose a module-level
# singleton builder for the reasoning chat pipeline.
init_prompt()
prompt_builder = PromptBuilder()

View File

@@ -156,17 +156,17 @@ class ReasoningChat:
# 消息加入缓冲池
await message_buffer.start_caching_messages(message)
# logger.info("使用推理聊天模式")
# 创建聊天流
chat = await chat_manager.get_or_create_stream(
platform=messageinfo.platform,
user_info=userinfo,
group_info=groupinfo,
)
message.update_chat_stream(chat)
await message.process()
logger.trace(f"消息处理成功: {message.processed_plain_text}")
# 过滤词/正则表达式过滤
if self._check_ban_words(message.processed_plain_text, chat, userinfo) or self._check_ban_regex(
@@ -174,27 +174,13 @@ class ReasoningChat:
):
return
await self.storage.store_message(message, chat)
# 记忆激活
with Timer("记忆激活", timing_results):
interested_rate = await HippocampusManager.get_instance().get_activate_from_text(
message.processed_plain_text, fast_retrieval=True
)
# 查询缓冲器结果会整合前面跳过的消息改变processed_plain_text
buffer_result = await message_buffer.query_buffer_result(message)
# 处理提及
is_mentioned, reply_probability = is_mentioned_bot_in_message(message)
# 意愿管理器设置当前message信息
willing_manager.setup(message, chat, is_mentioned, interested_rate)
# 处理缓冲器结果
if not buffer_result:
await willing_manager.bombing_buffer_message_handle(message.message_info.message_id)
willing_manager.delete(message.message_info.message_id)
# await willing_manager.bombing_buffer_message_handle(message.message_info.message_id)
# willing_manager.delete(message.message_info.message_id)
f_type = "seglist"
if message.message_segment.type != "seglist":
f_type = message.message_segment.type
@@ -213,6 +199,27 @@ class ReasoningChat:
logger.info("触发缓冲,已炸飞消息列")
return
try:
await self.storage.store_message(message, chat)
logger.trace(f"存储成功 (通过缓冲后): {message.processed_plain_text}")
except Exception as e:
logger.error(f"存储消息失败: {e}")
logger.error(traceback.format_exc())
# 存储失败可能仍需考虑是否继续,暂时返回
return
is_mentioned, reply_probability = is_mentioned_bot_in_message(message)
# 记忆激活
with Timer("记忆激活", timing_results):
interested_rate = await HippocampusManager.get_instance().get_activate_from_text(
message.processed_plain_text, fast_retrieval=True
)
# 处理提及
# 意愿管理器设置当前message信息
willing_manager.setup(message, chat, is_mentioned, interested_rate)
# 获取回复概率
is_willing = False
if reply_probability != 1:

View File

@@ -44,7 +44,7 @@ class ResponseGenerator:
async def generate_response(self, message: MessageThinking, thinking_id: str) -> Optional[Union[str, List[str]]]:
"""根据当前模型类型选择对应的生成函数"""
# 从global_config中获取模型概率值并选择模型
if random.random() < global_config.MODEL_R1_PROBABILITY:
if random.random() < global_config.model_reasoning_probability:
self.current_model_type = "深深地"
current_model = self.model_reasoning
else:

View File

@@ -1942,11 +1942,7 @@ class HippocampusManager:
return response
async def get_memory_from_topic(
self,
valid_keywords: list[str],
max_memory_num: int = 3,
max_memory_length: int = 2,
max_depth: int = 3
self, valid_keywords: list[str], max_memory_num: int = 3, max_memory_length: int = 2, max_depth: int = 3
) -> list:
"""从文本中获取相关记忆的公共接口"""
if not self._initialized: