Merge remote-tracking branch 'upstream/debug'

This commit is contained in:
tcmofashi
2025-03-03 09:02:25 +08:00
26 changed files with 623 additions and 718 deletions

View File

@@ -52,6 +52,7 @@ async def start_background_tasks():
"""启动后台任务"""
# 只启动表情包管理任务
asyncio.create_task(emoji_manager.start_periodic_check(interval_MINS=global_config.EMOJI_CHECK_INTERVAL))
await bot_schedule.initialize()
bot_schedule.print_schedule()
@driver.on_startup
@@ -90,7 +91,7 @@ async def monitor_relationships():
async def build_memory_task():
"""每30秒执行一次记忆构建"""
print("\033[1;32m[记忆构建]\033[0m 开始构建记忆...")
await hippocampus.build_memory(chat_size=12)
await hippocampus.build_memory(chat_size=30)
print("\033[1;32m[记忆构建]\033[0m 记忆构建完成")

View File

@@ -2,7 +2,7 @@ from nonebot.adapters.onebot.v11 import GroupMessageEvent, Message as EventMessa
from .message import Message,MessageSet
from .config import BotConfig, global_config
from .storage import MessageStorage
from .llm_generator import LLMResponseGenerator
from .llm_generator import ResponseGenerator
from .message_stream import MessageStream, MessageStreamContainer
from .topic_identifier import topic_identifier
from random import random, choice
@@ -20,7 +20,7 @@ from ..memory_system.memory import memory_graph
class ChatBot:
def __init__(self):
self.storage = MessageStorage()
self.gpt = LLMResponseGenerator()
self.gpt = ResponseGenerator()
self.bot = None # bot 实例引用
self._started = False

View File

@@ -1,4 +1,4 @@
from dataclasses import dataclass
from dataclasses import dataclass, field
from typing import Dict, Any, Optional, Set
import os
from nonebot.log import logger, default_format
@@ -32,7 +32,15 @@ class BotConfig:
EMOJI_CHECK_INTERVAL: int = 120 # 表情包检查间隔(分钟)
EMOJI_REGISTER_INTERVAL: int = 10 # 表情包注册间隔(分钟)
# 模型配置
llm_reasoning: Dict[str, str] = field(default_factory=lambda: {})
llm_reasoning_minor: Dict[str, str] = field(default_factory=lambda: {})
llm_normal: Dict[str, str] = field(default_factory=lambda: {})
llm_normal_minor: Dict[str, str] = field(default_factory=lambda: {})
vlm: Dict[str, str] = field(default_factory=lambda: {})
API_USING: str = "siliconflow" # 使用的API
API_PAID: bool = False # 是否使用付费API
MODEL_R1_PROBABILITY: float = 0.8 # R1模型概率
MODEL_V3_PROBABILITY: float = 0.1 # V3模型概率
MODEL_R1_DISTILL_PROBABILITY: float = 0.1 # R1蒸馏模型概率
@@ -48,20 +56,19 @@ class BotConfig:
PROMPT_SCHEDULE_GEN="一个曾经学习地质,现在学习心理学和脑科学的女大学生喜欢刷qq贴吧知乎和小红书"
@staticmethod
def get_default_config_path() -> str:
"""获取默认配置文件路径"""
def get_config_dir() -> str:
"""获取配置文件目录"""
current_dir = os.path.dirname(os.path.abspath(__file__))
root_dir = os.path.abspath(os.path.join(current_dir, '..', '..', '..'))
config_dir = os.path.join(root_dir, 'config')
return os.path.join(config_dir, 'bot_config.toml')
if not os.path.exists(config_dir):
os.makedirs(config_dir)
return config_dir
@classmethod
def load_config(cls, config_path: str = None) -> "BotConfig":
"""从TOML配置文件加载配置"""
if config_path is None:
config_path = cls.get_default_config_path()
logger.info(f"使用默认配置文件路径: {config_path}")
config = cls()
if os.path.exists(config_path):
with open(config_path, "rb") as f:
@@ -89,6 +96,26 @@ class BotConfig:
config.MODEL_V3_PROBABILITY = response_config.get("model_v3_probability", config.MODEL_V3_PROBABILITY)
config.MODEL_R1_DISTILL_PROBABILITY = response_config.get("model_r1_distill_probability", config.MODEL_R1_DISTILL_PROBABILITY)
config.API_USING = response_config.get("api_using", config.API_USING)
config.API_PAID = response_config.get("api_paid", config.API_PAID)
# 加载模型配置
if "model" in toml_dict:
model_config = toml_dict["model"]
if "llm_reasoning" in model_config:
config.llm_reasoning = model_config["llm_reasoning"]
if "llm_reasoning_minor" in model_config:
config.llm_reasoning_minor = model_config["llm_reasoning_minor"]
if "llm_normal" in model_config:
config.llm_normal = model_config["llm_normal"]
if "llm_normal_minor" in model_config:
config.llm_normal_minor = model_config["llm_normal_minor"]
if "vlm" in model_config:
config.vlm = model_config["vlm"]
# 消息配置
if "message" in toml_dict:
@@ -125,12 +152,21 @@ class BotConfig:
return config
# 获取配置文件路径
bot_config_path = BotConfig.get_default_config_path()
config_dir = os.path.dirname(bot_config_path)
logger.info(f"尝试从 {bot_config_path} 加载机器人配置")
bot_config_floder_path = BotConfig.get_config_dir()
print(f"正在品鉴配置文件目录: {bot_config_floder_path}")
bot_config_path = os.path.join(bot_config_floder_path, "bot_config_dev.toml")
if not os.path.exists(bot_config_path):
# 如果开发环境配置文件不存在,则使用默认配置文件
bot_config_path = os.path.join(bot_config_floder_path, "bot_config.toml")
logger.info("使用默认配置文件")
else:
logger.info("已找到开发环境配置文件")
global_config = BotConfig.load_config(config_path=bot_config_path)
@dataclass
class LLMConfig:
"""机器人配置类"""
@@ -151,3 +187,4 @@ llm_config.DEEP_SEEK_BASE_URL = config.deep_seek_base_url
if not global_config.enable_advance_output:
# logger.remove()
pass

View File

@@ -12,6 +12,7 @@ import time
import asyncio
from .utils_image import storage_image,storage_emoji
from .utils_user import get_user_nickname
from ..models.utils_model import LLM_request
#解析各种CQ码
#包含CQ码类
import urllib3
@@ -57,6 +58,11 @@ class CQCode:
translated_plain_text: Optional[str] = None
reply_message: Dict = None # 存储回复消息
image_base64: Optional[str] = None
_llm: Optional[LLM_request] = None
def __post_init__(self):
"""初始化LLM实例"""
self._llm = LLM_request(model=global_config.vlm, temperature=0.4, max_tokens=300)
def translate(self):
"""根据CQ码类型进行相应的翻译处理"""
@@ -161,7 +167,7 @@ class CQCode:
# 将 base64 字符串转换为字节类型
image_bytes = base64.b64decode(base64_str)
storage_emoji(image_bytes)
return self.get_image_description(base64_str)
return self.get_emoji_description(base64_str)
else:
return '[表情包]'
@@ -181,93 +187,23 @@ class CQCode:
def get_emoji_description(self, image_base64: str) -> str:
"""调用AI接口获取表情包描述"""
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {config.siliconflow_key}"
}
payload = {
"model": "deepseek-ai/deepseek-vl2",
"messages": [
{
"role": "user",
"content": [
{
"type": "text",
"text": "这是一个表情包请用简短的中文描述这个表情包传达的情感和含义。最多20个字。"
},
{
"type": "image_url",
"image_url": {
"url": f"data:image/jpeg;base64,{image_base64}"
}
}
]
}
],
"max_tokens": 50,
"temperature": 0.4
}
response = requests.post(
f"{config.siliconflow_base_url}chat/completions",
headers=headers,
json=payload,
timeout=30
)
if response.status_code == 200:
result_json = response.json()
if "choices" in result_json and len(result_json["choices"]) > 0:
description = result_json["choices"][0]["message"]["content"]
return f"[表情包:{description}]"
raise ValueError(f"AI接口调用失败: {response.text}")
try:
prompt = "这是一个表情包请用简短的中文描述这个表情包传达的情感和含义。最多20个字。"
description, _ = self._llm.generate_response_for_image_sync(prompt, image_base64)
return f"[表情包:{description}]"
except Exception as e:
print(f"\033[1;31m[错误]\033[0m AI接口调用失败: {str(e)}")
return "[表情包]"
def get_image_description(self, image_base64: str) -> str:
"""调用AI接口获取普通图片描述"""
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {config.siliconflow_key}"
}
payload = {
"model": "deepseek-ai/deepseek-vl2",
"messages": [
{
"role": "user",
"content": [
{
"type": "text",
"text": "请用中文描述这张图片的内容。如果有文字请把文字都描述出来。并尝试猜测这个图片的含义。最多200个字。"
},
{
"type": "image_url",
"image_url": {
"url": f"data:image/jpeg;base64,{image_base64}"
}
}
]
}
],
"max_tokens": 300,
"temperature": 0.6
}
response = requests.post(
f"{config.siliconflow_base_url}chat/completions",
headers=headers,
json=payload,
timeout=30
)
if response.status_code == 200:
result_json = response.json()
if "choices" in result_json and len(result_json["choices"]) > 0:
description = result_json["choices"][0]["message"]["content"]
return f"[图片:{description}]"
raise ValueError(f"AI接口调用失败: {response.text}")
try:
prompt = "请用中文描述这张图片的内容。如果有文字请把文字都描述出来。并尝试猜测这个图片的含义。最多200个字。"
description, _ = self._llm.generate_response_for_image_sync(prompt, image_base64)
return f"[图片:{description}]"
except Exception as e:
print(f"\033[1;31m[错误]\033[0m AI接口调用失败: {str(e)}")
return "[图片]"
def translate_forward(self) -> str:
"""处理转发消息"""
@@ -349,7 +285,7 @@ class CQCode:
# 创建Message对象
from .message import Message
if self.reply_message == None:
print(f"\033[1;31m[错误]\033[0m 回复消息为空")
# print(f"\033[1;31m[错误]\033[0m 回复消息为空")
return '[回复某人消息]'
if self.reply_message.sender.user_id:

View File

@@ -14,6 +14,8 @@ import asyncio
import time
from nonebot import get_driver
from ..chat.config import global_config
from ..models.utils_model import LLM_request
driver = get_driver()
config = driver.config
@@ -43,6 +45,7 @@ class EmojiManager:
def __init__(self):
self.db = Database.get_instance()
self._scan_task = None
self.llm = LLM_request(model=global_config.vlm, temperature=0.3, max_tokens=50)
def _ensure_emoji_dir(self):
"""确保表情存储目录存在"""
@@ -87,55 +90,23 @@ class EmojiManager:
print(f"\033[1;31m[错误]\033[0m 记录表情使用失败: {str(e)}")
async def _get_emotion_from_text(self, text: str) -> List[str]:
"""从文本中识别情感关键词使用DeepSeek API进行分析
"""从文本中识别情感关键词
Args:
text: 输入文本
Returns:
List[str]: 匹配到的情感标签列表
"""
try:
# 准备请求数据
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {config.siliconflow_key}"
}
prompt = f'分析这段文本:"{text}",从"happy,angry,sad,surprised,disgusted,fearful,neutral"中选出最匹配的1个情感标签。只需要返回标签不要输出其他任何内容。'
payload = {
"model": "deepseek-ai/DeepSeek-V3",
"messages": [
{
"role": "user",
"content": [
{
"type": "text",
"text": f'分析这段文本:"{text}",从"happy,angry,sad,surprised,disgusted,fearful,neutral"中选出最匹配的1个情感标签。只需要返回标签不要输出其他任何内容。'
}
]
}
],
"max_tokens": 50,
"temperature": 0.3
}
content, _ = await self.llm.generate_response(prompt)
emotion = content.strip().lower()
async with aiohttp.ClientSession() as session:
async with session.post(
f"{config.siliconflow_base_url}chat/completions",
headers=headers,
json=payload
) as response:
if response.status != 200:
print(f"\033[1;31m[错误]\033[0m API请求失败: {await response.text()}")
return ['neutral']
result = json.loads(await response.text())
if "choices" in result and len(result["choices"]) > 0:
emotion = result["choices"][0]["message"]["content"].strip().lower()
# 确保返回的标签是有效的
if emotion in self.EMOTION_KEYWORDS:
print(f"\033[1;32m[成功]\033[0m 识别到的情感: {emotion}")
return [emotion] # 返回单个情感标签的列表
if emotion in self.EMOTION_KEYWORDS:
print(f"\033[1;32m[成功]\033[0m 识别到的情感: {emotion}")
return [emotion]
return ['neutral'] # 如果无法识别情感返回neutral
return ['neutral']
except Exception as e:
print(f"\033[1;31m[错误]\033[0m 情感分析失败: {str(e)}")
@@ -250,52 +221,20 @@ class EmojiManager:
async def _get_emoji_tag(self, image_base64: str) -> str:
"""获取表情包的标签"""
async with aiohttp.ClientSession() as session:
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {config.siliconflow_key}"
}
try:
prompt = '这是一个表情包,请从"happy", "angry", "sad", "surprised", "disgusted", "fearful", "neutral"中选出1个情感标签。只输出标签不要输出其他任何内容只输出情感标签就好'
payload = {
"model": "deepseek-ai/deepseek-vl2",
"messages": [
{
"role": "user",
"content": [
{
"type": "text",
"text": '这是一个表情包,请从"happy", "angry", "sad", "surprised", "disgusted", "fearful", "neutral"中选出1个情感标签。只输出标签不要输出其他任何内容只输出情感标签就好'
},
{
"type": "image_url",
"image_url": {
"url": f"data:image/jpeg;base64,{image_base64}"
}
}
]
}
],
"max_tokens": 60,
"temperature": 0.3
}
content, _ = await self.llm.generate_response_for_image(prompt, image_base64)
tag_result = content.strip().lower()
async with session.post(
f"{config.siliconflow_base_url}chat/completions",
headers=headers,
json=payload
) as response:
if response.status == 200:
result = await response.json()
if "choices" in result and len(result["choices"]) > 0:
tag_result = result["choices"][0]["message"]["content"].strip().lower()
valid_tags = ["happy", "angry", "sad", "surprised", "disgusted", "fearful", "neutral"]
for tag_match in valid_tags:
if tag_match in tag_result or tag_match == tag_result:
return tag_match
print(f"\033[1;33m[警告]\033[0m 无效的标签: {tag_match}, 跳过")
else:
print(f"\033[1;31m[错误]\033[0m 获取标签失败, 状态码: {response.status}")
valid_tags = ["happy", "angry", "sad", "surprised", "disgusted", "fearful", "neutral"]
for tag_match in valid_tags:
if tag_match in tag_result or tag_match == tag_result:
return tag_match
print(f"\033[1;33m[警告]\033[0m 无效的标签: {tag_result}, 跳过")
except Exception as e:
print(f"\033[1;31m[错误]\033[0m 获取标签失败: {str(e)}")
print(f"\033[1;32m[调试信息]\033[0m 使用默认标签: neutral")
return "skip" # 默认标签

View File

@@ -13,274 +13,120 @@ from .prompt_builder import prompt_builder
from .config import global_config
from .utils import process_llm_response
from nonebot import get_driver
from ..models.utils_model import LLM_request
driver = get_driver()
config = driver.config
class LLMResponseGenerator:
class ResponseGenerator:
def __init__(self):
if global_config.API_USING == "siliconflow":
self.client = OpenAI(
api_key=config.siliconflow_key,
base_url=config.siliconflow_base_url
)
elif global_config.API_USING == "deepseek":
self.client = OpenAI(
api_key=config.deep_seek_key,
base_url=config.deep_seek_base_url
)
self.model_r1 = LLM_request(model=global_config.llm_reasoning, temperature=0.7)
self.model_v3 = LLM_request(model=global_config.llm_normal, temperature=0.7)
self.model_r1_distill = LLM_request(model=global_config.llm_reasoning_minor, temperature=0.7)
self.db = Database.get_instance()
# 当前使用的模型类型
self.current_model_type = 'r1' # 默认使用 R1
async def generate_response(self, message: Message) -> Optional[Union[str, List[str]]]:
"""根据当前模型类型选择对应的生成函数"""
# 从global_config中获取模型概率值
model_r1_probability = global_config.MODEL_R1_PROBABILITY
model_v3_probability = global_config.MODEL_V3_PROBABILITY
model_r1_distill_probability = global_config.MODEL_R1_DISTILL_PROBABILITY
# 生成随机数并根据概率选择模型
# 从global_config中获取模型概率值并选择模型
rand = random.random()
if rand < model_r1_probability:
if rand < global_config.MODEL_R1_PROBABILITY:
self.current_model_type = 'r1'
elif rand < model_r1_probability + model_v3_probability:
current_model = self.model_r1
elif rand < global_config.MODEL_R1_PROBABILITY + global_config.MODEL_V3_PROBABILITY:
self.current_model_type = 'v3'
current_model = self.model_v3
else:
self.current_model_type = 'r1_distill' # 默认使用 R1-Distill
self.current_model_type = 'r1_distill'
current_model = self.model_r1_distill
print(f"+++++++++++++++++{global_config.BOT_NICKNAME}{self.current_model_type}思考中+++++++++++++++++")
if self.current_model_type == 'r1':
model_response = await self._generate_r1_response(message)
elif self.current_model_type == 'v3':
model_response = await self._generate_v3_response(message)
else:
model_response = await self._generate_r1_distill_response(message)
# 打印情感标签
print(f'{global_config.BOT_NICKNAME}的回复是:{model_response}')
model_response, emotion = await self._process_response(model_response)
model_response = await self._generate_response_with_model(message, current_model)
if model_response:
print(f"'{model_response}' 获取到的情感标签为{emotion}")
valuedict={
print(f'{global_config.BOT_NICKNAME}的回复是{model_response}')
model_response, emotion = await self._process_response(model_response)
if model_response:
print(f"'{model_response}' 获取到的情感标签为:{emotion}")
valuedict={
'happy':0.5,'angry':-1,'sad':-0.5,'surprised':0.5,'disgusted':-1.5,'fearful':-0.25,'neutral':0.25
}
await relationship_manager.update_relationship_value(message.user_id, relationship_value=valuedict[emotion[0]])
}
await relationship_manager.update_relationship_value(message.user_id, relationship_value=valuedict[emotion[0]])
return model_response, emotion
return None, []
return model_response, emotion
async def _generate_base_response(
self,
message: Message,
model_name: str,
model_params: Optional[Dict[str, Any]] = None
) -> Optional[str]:
async def _generate_response_with_model(self, message: Message, model: LLM_request) -> Optional[str]:
"""使用指定的模型生成回复"""
sender_name = message.user_nickname or f"用户{message.user_id}"
if message.user_cardname:
sender_name=f"[({message.user_id}){message.user_nickname}]{message.user_cardname}"
# 获取关系值
if relationship_manager.get_relationship(message.user_id):
relationship_value = relationship_manager.get_relationship(message.user_id).relationship_value
relationship_value = relationship_manager.get_relationship(message.user_id).relationship_value if relationship_manager.get_relationship(message.user_id) else 0.0
if relationship_value != 0.0:
print(f"\033[1;32m[关系管理]\033[0m 回复中_当前关系值: {relationship_value}")
else:
relationship_value = 0.0
''' 构建prompt '''
prompt,prompt_check = prompt_builder._build_prompt(
# 构建prompt
prompt, prompt_check = prompt_builder._build_prompt(
message_txt=message.processed_plain_text,
sender_name=sender_name,
relationship_value=relationship_value,
group_id=message.group_id
)
# 设置默认参数
default_params = {
"model": model_name,
"messages": [{"role": "user", "content": prompt}],
"stream": False,
"max_tokens": 2048,
"temperature": 0.7
}
default_params_check = {
"model": "deepseek-ai/DeepSeek-R1-Distill-Qwen-32B",
"messages": [{"role": "user", "content": prompt_check}],
"stream": False,
"max_tokens": 2048,
"temperature": 0.7
}
default_params_check = {
"model": "deepseek-ai/DeepSeek-R1-Distill-Qwen-32B",
"messages": [{"role": "user", "content": prompt_check}],
"stream": False,
"max_tokens": 1024,
"temperature": 0.7
}
default_params_check = {
"model": "deepseek-ai/DeepSeek-R1-Distill-Qwen-32B",
"messages": [{"role": "user", "content": prompt_check}],
"stream": False,
"max_tokens": 1024,
"temperature": 0.7
}
# 更新参数
if model_params:
default_params.update(model_params)
def create_completion():
return self.client.chat.completions.create(**default_params)
def create_completion_check():
return self.client.chat.completions.create(**default_params_check)
loop = asyncio.get_event_loop()
# 读空气模块
air = 0
reasoning_content_check=''
content_check=''
if global_config.enable_kuuki_read:
response_check = await loop.run_in_executor(None, create_completion_check)
if response_check:
reasoning_content_check = ""
if hasattr(response_check.choices[0].message, "reasoning"):
reasoning_content_check = response_check.choices[0].message.reasoning or reasoning_content_check
elif hasattr(response_check.choices[0].message, "reasoning_content"):
reasoning_content_check = response_check.choices[0].message.reasoning_content or reasoning_content_check
content_check = response_check.choices[0].message.content
print(f"\033[1;32m[读空气]\033[0m 读空气结果为{content_check}")
if 'yes' not in content_check.lower():
air = 1
#稀释读空气的判定
if air == 1 and random.random() < 0.3:
self.db.db.reasoning_logs.insert_one({
'time': time.time(),
'group_id': message.group_id,
'user': sender_name,
'message': message.processed_plain_text,
'model': model_name,
'reasoning_check': reasoning_content_check,
'response_check': content_check,
'reasoning': "",
'response': "",
'prompt': prompt,
'prompt_check': prompt_check,
'model_params': default_params
})
return None
content_check, reasoning_content_check = await self.model_v3.generate_response(prompt_check)
print(f"\033[1;32m[读空气]\033[0m 读空气结果为{content_check}")
if 'yes' not in content_check.lower() and random.random() < 0.3:
self._save_to_db(
message=message,
sender_name=sender_name,
prompt=prompt,
prompt_check=prompt_check,
content="",
content_check=content_check,
reasoning_content="",
reasoning_content_check=reasoning_content_check
)
return None
response = await loop.run_in_executor(None, create_completion)
# 生成回复
content, reasoning_content = await model.generate_response(prompt)
# 检查响应内容
if not response:
print("请求未返回任何内容")
return None
if not response.choices or not response.choices[0].message.content:
print("请求返回的内容无效:", response)
return None
content = response.choices[0].message.content
# 获取推理内容
reasoning_content = ""
if hasattr(response.choices[0].message, "reasoning"):
reasoning_content = response.choices[0].message.reasoning or reasoning_content
elif hasattr(response.choices[0].message, "reasoning_content"):
reasoning_content = response.choices[0].message.reasoning_content or reasoning_content
# 保存到数据库
self._save_to_db(
message=message,
sender_name=sender_name,
prompt=prompt,
prompt_check=prompt_check,
content=content,
content_check=content_check if global_config.enable_kuuki_read else "",
reasoning_content=reasoning_content,
reasoning_content_check=reasoning_content_check if global_config.enable_kuuki_read else ""
)
return content
def _save_to_db(self, message: Message, sender_name: str, prompt: str, prompt_check: str,
content: str, content_check: str, reasoning_content: str, reasoning_content_check: str):
"""保存对话记录到数据库"""
self.db.db.reasoning_logs.insert_one({
'time': time.time(),
'group_id': message.group_id,
'user': sender_name,
'message': message.processed_plain_text,
'model': model_name,
'model': self.current_model_type,
'reasoning_check': reasoning_content_check,
'response_check': content_check,
'reasoning': reasoning_content,
'response': content,
'prompt': prompt,
'prompt_check': prompt_check,
'model_params': default_params
'prompt_check': prompt_check
})
return content
async def _generate_r1_response(self, message: Message) -> Optional[str]:
"""使用 DeepSeek-R1 模型生成回复"""
if global_config.API_USING == "deepseek":
return await self._generate_base_response(
message,
"deepseek-reasoner",
{"temperature": 0.7, "max_tokens": 2048}
)
else:
return await self._generate_base_response(
message,
"Pro/deepseek-ai/DeepSeek-R1",
{"temperature": 0.7, "max_tokens": 2048}
)
async def _generate_v3_response(self, message: Message) -> Optional[str]:
"""使用 DeepSeek-V3 模型生成回复"""
if global_config.API_USING == "deepseek":
return await self._generate_base_response(
message,
"deepseek-chat",
{"temperature": 0.8, "max_tokens": 2048}
)
else:
return await self._generate_base_response(
message,
"Pro/deepseek-ai/DeepSeek-V3",
{"temperature": 0.8, "max_tokens": 2048}
)
async def _generate_r1_distill_response(self, message: Message) -> Optional[str]:
"""使用 DeepSeek-R1-Distill-Qwen-32B 模型生成回复"""
return await self._generate_base_response(
message,
"deepseek-ai/DeepSeek-R1-Distill-Qwen-32B",
{"temperature": 0.7, "max_tokens": 2048}
)
async def _get_group_chat_context(self, message: Message) -> str:
"""获取群聊上下文"""
recent_messages = self.db.db.messages.find(
{"group_id": message.group_id}
).sort("time", -1).limit(15)
messages_list = list(recent_messages)[::-1]
group_chat = ""
for msg_dict in messages_list:
time_str = time.strftime("%m-%d %H:%M:%S", time.localtime(msg_dict['time']))
display_name = msg_dict.get('user_nickname', f"用户{msg_dict['user_id']}")
cardname = msg_dict.get('user_cardname', '')
display_name = f"[({msg_dict['user_id']}){display_name}]{cardname}" if cardname!='' else display_name
content = msg_dict.get('processed_plain_text', msg_dict['plain_text'])
group_chat += f"[{time_str}] {display_name}: {content}\n"
return group_chat
async def _get_emotion_tags(self, content: str) -> List[str]:
"""提取情感标签"""
@@ -291,33 +137,12 @@ class LLMResponseGenerator:
输出:
'''
messages = [{"role": "user", "content": prompt}]
loop = asyncio.get_event_loop()
if global_config.API_USING == "deepseek":
model = "deepseek-chat"
else:
model = "Pro/deepseek-ai/DeepSeek-V3"
create_completion = partial(
self.client.chat.completions.create,
model=model,
messages=messages,
stream=False,
max_tokens=30,
temperature=0.6
)
response = await loop.run_in_executor(None, create_completion)
if response.choices[0].message.content:
# 确保返回的是列表格式
emotion_tag = response.choices[0].message.content.strip()
return [emotion_tag] # 将单个标签包装成列表返回
return ["neutral"] # 如果无法获取情感标签,返回默认值
content, _ = await self.model_v3.generate_response(prompt)
return [content.strip()] if content else ["neutral"]
except Exception as e:
print(f"获取情感标签时出错: {e}")
return ["neutral"] # 发生错误时返回默认值
return ["neutral"]
async def _process_response(self, content: str) -> Tuple[List[str], List[str]]:
"""处理响应内容,返回处理后的内容和情感标签"""
@@ -325,10 +150,6 @@ class LLMResponseGenerator:
return None, []
emotion_tags = await self._get_emotion_tags(content)
processed_response = process_llm_response(content)
return processed_response, emotion_tags
# 创建全局实例
llm_response = LLMResponseGenerator()
return processed_response, emotion_tags

View File

@@ -72,12 +72,15 @@ class PromptBuilder:
# print(f"\033[1;32m[前额叶]\033[0m 合并所有需要的记忆2: {list(overlapping_second_layer)}")
# 使用集合去重
all_memories = list(set(all_first_layer_items) | set(overlapping_second_layer))
# 从每个来源随机选择2条记忆如果有的话
selected_first_layer = random.sample(all_first_layer_items, min(2, len(all_first_layer_items))) if all_first_layer_items else []
selected_second_layer = random.sample(list(overlapping_second_layer), min(2, len(overlapping_second_layer))) if overlapping_second_layer else []
# 合并并去重
all_memories = list(set(selected_first_layer + selected_second_layer))
if all_memories:
print(f"\033[1;32m[前额叶]\033[0m 合并所有需要的记忆: {all_memories}")
if all_memories: # 只在列表非空时选择随机项
random_item = choice(all_memories)
random_item = " ".join(all_memories)
memory_prompt = f"看到这些聊天,你想起来{random_item}\n"
else:
memory_prompt = "" # 如果没有记忆,则返回空字符串
@@ -150,7 +153,7 @@ class PromptBuilder:
if personality_choice < 4/6: # 第一种人格
prompt_personality = f'''{activate_prompt}你的网名叫{global_config.BOT_NICKNAME}{personality[0]},{promt_info_prompt},
现在请你给出日常且口语化的回复,平淡一些,尽量简短一些。{is_bot_prompt}
请注意把握群里的聊天内容,不要回复的太有条理,可以有个性。'''
请注意把握群里的聊天内容,不要刻意突出自身学科背景,不要回复的太有条理,可以有个性。'''
elif personality_choice < 1: # 第二种人格
prompt_personality = f'''{activate_prompt}你的网名叫{global_config.BOT_NICKNAME}{personality[1]}{promt_info_prompt},

View File

@@ -3,6 +3,7 @@ from openai import OpenAI
from .message import Message
import jieba
from nonebot import get_driver
from .config import global_config
driver = get_driver()
config = driver.config
@@ -24,7 +25,7 @@ class TopicIdentifier:
消息内容:{text}"""
response = self.client.chat.completions.create(
model="Pro/deepseek-ai/DeepSeek-V3",
model=global_config.SILICONFLOW_MODEL_V3,
messages=[{"role": "user", "content": prompt}],
temperature=0.8,
max_tokens=10