Merge branch 'debug' into patch-1

This commit is contained in:
gyc123456-1
2025-03-07 22:39:30 +08:00
committed by GitHub
37 changed files with 2871 additions and 1608 deletions

View File

@@ -14,7 +14,13 @@ from nonebot.rule import to_me
from .bot import chat_bot
from .emoji_manager import emoji_manager
import time
from ..utils.statistic import LLMStatistics
# 创建LLM统计实例
llm_stats = LLMStatistics("llm_statistics.txt")
# 添加标志变量
_message_manager_started = False
# 获取驱动器
driver = get_driver()
@@ -55,6 +61,10 @@ scheduler = require("nonebot_plugin_apscheduler").scheduler
@driver.on_startup
async def start_background_tasks():
"""启动后台任务"""
# 启动LLM统计
llm_stats.start()
print("\033[1;32m[初始化]\033[0m LLM统计功能已启动")
# 只启动表情包管理任务
asyncio.create_task(emoji_manager.start_periodic_check(interval_MINS=global_config.EMOJI_CHECK_INTERVAL))
await bot_schedule.initialize()
@@ -70,18 +80,20 @@ async def init_relationships():
@driver.on_bot_connect
async def _(bot: Bot):
"""Bot连接成功时的处理"""
global _message_manager_started
print(f"\033[1;38;5;208m-----------{global_config.BOT_NICKNAME}成功连接!-----------\033[0m")
await willing_manager.ensure_started()
message_sender.set_bot(bot)
print("\033[1;38;5;208m-----------消息发送器已启动!-----------\033[0m")
asyncio.create_task(message_manager.start_processor())
print("\033[1;38;5;208m-----------消息处理器已启动!-----------\033[0m")
if not _message_manager_started:
asyncio.create_task(message_manager.start_processor())
_message_manager_started = True
print("\033[1;38;5;208m-----------消息处理器已启动!-----------\033[0m")
asyncio.create_task(emoji_manager._periodic_scan(interval_MINS=global_config.EMOJI_REGISTER_INTERVAL))
print("\033[1;38;5;208m-----------开始偷表情包!-----------\033[0m")
# 启动消息发送控制任务
@group_msg.handle()
async def _(bot: Bot, event: GroupMessageEvent, state: T_State):
@@ -90,7 +102,7 @@ async def _(bot: Bot, event: GroupMessageEvent, state: T_State):
# 添加build_memory定时任务
@scheduler.scheduled_job("interval", seconds=global_config.build_memory_interval, id="build_memory")
async def build_memory_task():
"""30秒执行一次记忆构建"""
"""build_memory_interval秒执行一次记忆构建"""
print("\033[1;32m[记忆构建]\033[0m -------------------------------------------开始构建记忆-------------------------------------------")
start_time = time.time()
await hippocampus.operation_build_memory(chat_size=20)
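
# The _message_manager_started flag above keeps a reconnecting bot from spawning a second
# processor task. A minimal, standalone sketch of that idempotent-start pattern (the names
# below are illustrative stand-ins, not the project's real objects):
import asyncio

_processor_started = False  # plays the role of _message_manager_started

async def start_processor() -> None:
    """Stand-in for message_manager.start_processor(): a long-lived background loop."""
    while True:
        await asyncio.sleep(1)

async def on_bot_connect() -> None:
    """Called on every (re)connect; the guard ensures only one processor task is ever created."""
    global _processor_started
    if not _processor_started:
        asyncio.create_task(start_processor())
        _processor_started = True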

View File

@@ -15,7 +15,7 @@ from .message import Message_Thinking # 导入 Message_Thinking 类
from .relationship_manager import relationship_manager
from .willing_manager import willing_manager # 导入意愿管理器
from .utils import is_mentioned_bot_in_txt, calculate_typing_time
from ..memory_system.memory import memory_graph
from ..memory_system.memory import memory_graph,hippocampus
from loguru import logger
class ChatBot:
@@ -58,6 +58,7 @@ class ChatBot:
plain_text=event.get_plaintext(),
reply_message=event.reply,
)
await message.initialize()
# 过滤词
for word in global_config.ban_words:
@@ -70,24 +71,12 @@ class ChatBot:
topic=await topic_identifier.identify_topic_llm(message.processed_plain_text)
# topic1 = topic_identifier.identify_topic_jieba(message.processed_plain_text)
# topic2 = await topic_identifier.identify_topic_llm(message.processed_plain_text)
# topic3 = topic_identifier.identify_topic_snownlp(message.processed_plain_text)
logger.info(f"\033[1;32m[主题识别]\033[0m 使用{global_config.topic_extract}主题: {topic}")
all_num = 0
interested_num = 0
if topic:
for current_topic in topic:
all_num += 1
first_layer_items, second_layer_items = memory_graph.get_related_item(current_topic, depth=2)
if first_layer_items:
interested_num += 1
print(f"\033[1;32m[前额叶]\033[0m 对|{current_topic}|有印象")
interested_rate = interested_num / all_num if all_num > 0 else 0
# topic=await topic_identifier.identify_topic_llm(message.processed_plain_text)
topic = ''
interested_rate = 0
interested_rate = await hippocampus.memory_activate_value(message.processed_plain_text)/100
print(f"\033[1;32m[记忆激活]\033[0m 对{message.processed_plain_text}的激活度:---------------------------------------{interested_rate}\n")
# logger.info(f"\033[1;32m[主题识别]\033[0m 使用{global_config.topic_extract}主题: {topic}")
await self.storage.store_message(message, topic[0] if topic else None)
@@ -119,14 +108,9 @@ class ChatBot:
willing_manager.change_reply_willing_sent(thinking_message.group_id)
response, emotion = await self.gpt.generate_response(message)
# if response is None:
# thinking_message.interupt=True
response,raw_content = await self.gpt.generate_response(message)
if response:
# print(f"\033[1;32m[思考结束]\033[0m 思考结束,已得到回复,开始回复")
# 找到并删除对应的thinking消息
container = message_manager.get_container(event.group_id)
thinking_message = None
# 找到message,删除
@@ -134,7 +118,7 @@ class ChatBot:
if isinstance(msg, Message_Thinking) and msg.message_id == think_id:
thinking_message = msg
container.messages.remove(msg)
print(f"\033[1;32m[思考消息删除]\033[0m 已找到思考消息对象,开始删除")
# print(f"\033[1;32m[思考消息删除]\033[0m 已找到思考消息对象,开始删除")
break
#记录开始思考的时间,避免从思考到回复的时间太久
@@ -144,6 +128,7 @@ class ChatBot:
accu_typing_time = 0
# print(f"\033[1;32m[开始回复]\033[0m 开始将回复1载入发送容器")
mark_head = False
for msg in response:
# print(f"\033[1;32m[回复内容]\033[0m {msg}")
#通过时间改变时间戳
@@ -164,15 +149,20 @@ class ChatBot:
thinking_start_time=thinking_start_time, #记录了思考开始的时间
reply_message_id=message.message_id
)
await bot_message.initialize()
if not mark_head:
bot_message.is_head = True
mark_head = True
message_set.add_message(bot_message)
#message_set 可以直接加入 message_manager
print(f"\033[1;32m[回复]\033[0m 将回复载入发送容器")
# print(f"\033[1;32m[回复]\033[0m 将回复载入发送容器")
message_manager.add_message(message_set)
bot_response_time = tinking_time_point
if random() < global_config.emoji_chance:
emoji_path = await emoji_manager.get_emoji_for_emotion(emotion)
emoji_path,discription = await emoji_manager.get_emoji_for_text(response)
if emoji_path:
emoji_cq = CQCode.create_emoji_cq(emoji_path)
@@ -188,6 +178,7 @@ class ChatBot:
raw_message=emoji_cq,
plain_text=emoji_cq,
processed_plain_text=emoji_cq,
detailed_plain_text=discription,
user_nickname=global_config.BOT_NICKNAME,
group_name=message.group_name,
time=bot_response_time,
@@ -196,9 +187,16 @@ class ChatBot:
thinking_start_time=thinking_start_time,
# reply_message_id=message.message_id
)
await bot_message.initialize()
message_manager.add_message(bot_message)
emotion = await self.gpt._get_emotion_tags(raw_content)
print(f"'{response}' 获取到的情感标签为:{emotion}")
valuedict={
'happy':0.5,'angry':-1,'sad':-0.5,'surprised':0.5,'disgusted':-1.5,'fearful':-0.25,'neutral':0.25
}
await relationship_manager.update_relationship_value(message.user_id, relationship_value=valuedict[emotion[0]])
willing_manager.change_reply_willing_after_sent(event.group_id)
# willing_manager.change_reply_willing_after_sent(event.group_id)
# 创建全局ChatBot实例
chat_bot = ChatBot()
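
# The valuedict lookup above raises KeyError (or IndexError on an empty tag list) if
# _get_emotion_tags ever returns a label outside the seven expected ones. A small defensive
# sketch of the same mapping; the fallback to 'neutral' is an assumption, not project behaviour:
EMOTION_VALUES = {
    'happy': 0.5, 'angry': -1, 'sad': -0.5, 'surprised': 0.5,
    'disgusted': -1.5, 'fearful': -0.25, 'neutral': 0.25,
}

def emotion_to_relationship_delta(emotion_tags: list) -> float:
    """Map the first recognised emotion tag to a relationship delta; unknown or empty input counts as neutral."""
    tag = emotion_tags[0] if emotion_tags else 'neutral'
    return EMOTION_VALUES.get(tag, EMOTION_VALUES['neutral'])

# emotion_to_relationship_delta(['angry']) -> -1; emotion_to_relationship_delta([]) -> 0.25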

View File

@@ -30,23 +30,26 @@ class BotConfig:
forget_memory_interval: int = 300 # 记忆遗忘间隔(秒)
EMOJI_CHECK_INTERVAL: int = 120 # 表情包检查间隔(分钟)
EMOJI_REGISTER_INTERVAL: int = 10 # 表情包注册间隔(分钟)
EMOJI_SAVE: bool = True # 偷表情包
EMOJI_CHECK: bool = False #是否开启过滤
EMOJI_CHECK_PROMPT: str = "符合公序良俗" # 表情包过滤要求
ban_words = set()
max_response_length: int = 1024 # 最大回复长度
# 模型配置
llm_reasoning: Dict[str, str] = field(default_factory=lambda: {})
llm_reasoning_minor: Dict[str, str] = field(default_factory=lambda: {})
llm_normal: Dict[str, str] = field(default_factory=lambda: {})
llm_normal_minor: Dict[str, str] = field(default_factory=lambda: {})
llm_topic_judge: Dict[str, str] = field(default_factory=lambda: {})
llm_summary_by_topic: Dict[str, str] = field(default_factory=lambda: {})
llm_emotion_judge: Dict[str, str] = field(default_factory=lambda: {})
embedding: Dict[str, str] = field(default_factory=lambda: {})
vlm: Dict[str, str] = field(default_factory=lambda: {})
moderation: Dict[str, str] = field(default_factory=lambda: {})
# 主题提取配置
topic_extract: str = 'snownlp' # 只支持jieba,snownlp,llm
llm_topic_extract: Dict[str, str] = field(default_factory=lambda: {})
API_USING: str = "siliconflow" # 使用的API
API_PAID: bool = False # 是否使用付费API
MODEL_R1_PROBABILITY: float = 0.8 # R1模型概率
MODEL_V3_PROBABILITY: float = 0.1 # V3模型概率
MODEL_R1_DISTILL_PROBABILITY: float = 0.1 # R1蒸馏模型概率
@@ -93,6 +96,9 @@ class BotConfig:
emoji_config = toml_dict["emoji"]
config.EMOJI_CHECK_INTERVAL = emoji_config.get("check_interval", config.EMOJI_CHECK_INTERVAL)
config.EMOJI_REGISTER_INTERVAL = emoji_config.get("register_interval", config.EMOJI_REGISTER_INTERVAL)
config.EMOJI_CHECK_PROMPT = emoji_config.get('check_prompt',config.EMOJI_CHECK_PROMPT)
config.EMOJI_SAVE = emoji_config.get('auto_save',config.EMOJI_SAVE)
config.EMOJI_CHECK = emoji_config.get('enable_check',config.EMOJI_CHECK)
if "cq_code" in toml_dict:
cq_code_config = toml_dict["cq_code"]
@@ -110,8 +116,7 @@ class BotConfig:
config.MODEL_R1_PROBABILITY = response_config.get("model_r1_probability", config.MODEL_R1_PROBABILITY)
config.MODEL_V3_PROBABILITY = response_config.get("model_v3_probability", config.MODEL_V3_PROBABILITY)
config.MODEL_R1_DISTILL_PROBABILITY = response_config.get("model_r1_distill_probability", config.MODEL_R1_DISTILL_PROBABILITY)
config.API_USING = response_config.get("api_using", config.API_USING)
config.API_PAID = response_config.get("api_paid", config.API_PAID)
config.max_response_length = response_config.get("max_response_length", config.max_response_length)
# 加载模型配置
if "model" in toml_dict:
@@ -125,10 +130,18 @@ class BotConfig:
if "llm_normal" in model_config:
config.llm_normal = model_config["llm_normal"]
config.llm_topic_extract = config.llm_normal
if "llm_normal_minor" in model_config:
config.llm_normal_minor = model_config["llm_normal_minor"]
if "llm_topic_judge" in model_config:
config.llm_topic_judge = model_config["llm_topic_judge"]
if "llm_summary_by_topic" in model_config:
config.llm_summary_by_topic = model_config["llm_summary_by_topic"]
if "llm_emotion_judge" in model_config:
config.llm_emotion_judge = model_config["llm_emotion_judge"]
if "vlm" in model_config:
config.vlm = model_config["vlm"]
@@ -136,14 +149,8 @@ class BotConfig:
if "embedding" in model_config:
config.embedding = model_config["embedding"]
if 'topic' in toml_dict:
topic_config=toml_dict['topic']
if 'topic_extract' in topic_config:
config.topic_extract=topic_config.get('topic_extract',config.topic_extract)
logger.info(f"载入自定义主题提取为{config.topic_extract}")
if config.topic_extract=='llm' and 'llm_topic' in topic_config:
config.llm_topic_extract=topic_config['llm_topic']
logger.info(f"载入自定义主题提取模型为{config.llm_topic_extract['name']}")
if "moderation" in model_config:
config.moderation = model_config["moderation"]
# 消息配置
if "message" in toml_dict:
@@ -178,13 +185,13 @@ class BotConfig:
bot_config_floder_path = BotConfig.get_config_dir()
print(f"正在品鉴配置文件目录: {bot_config_floder_path}")
bot_config_path = os.path.join(bot_config_floder_path, "bot_config_dev.toml")
if not os.path.exists(bot_config_path):
bot_config_path = os.path.join(bot_config_floder_path, "bot_config.toml")
if os.path.exists(bot_config_path):
# 如果开发环境配置文件不存在,则使用默认配置文件
bot_config_path = os.path.join(bot_config_floder_path, "bot_config.toml")
print(f"异常的新鲜,异常的美味: {bot_config_path}")
logger.info("使用bot配置文件")
else:
logger.info("已找到开发bot配置文件")
logger.info("没有找到美味")
global_config = BotConfig.load_config(config_path=bot_config_path)
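
# The loader above keeps the dataclass defaults and only overrides a field when its key exists
# in the TOML file. A minimal sketch of that ".get(key, current_value)" pattern using the
# standard-library tomllib (the real project may use another TOML parser; the section layout
# below is illustrative):
import tomllib  # Python 3.11+; earlier versions can use the third-party "tomli" package
from dataclasses import dataclass

@dataclass
class EmojiSettings:
    check_interval: int = 120
    register_interval: int = 10
    enable_check: bool = False

def load_emoji_settings(path: str) -> EmojiSettings:
    cfg = EmojiSettings()
    with open(path, "rb") as f:
        data = tomllib.load(f)
    emoji = data.get("emoji", {})
    # .get(key, current_value) keeps the default whenever the key is missing
    cfg.check_interval = emoji.get("check_interval", cfg.check_interval)
    cfg.register_interval = emoji.get("register_interval", cfg.register_interval)
    cfg.enable_check = emoji.get("enable_check", cfg.enable_check)
    return cfg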

View File

@@ -10,12 +10,12 @@ from nonebot.adapters.onebot.v11 import Bot
from .config import global_config
import time
import asyncio
from .utils_image import storage_image,storage_emoji
from .utils_image import storage_image, storage_emoji
from .utils_user import get_user_nickname
from ..models.utils_model import LLM_request
from .mapper import emojimapper
#解析各种CQ码
#包含CQ码类
# 解析各种CQ码
# 包含CQ码类
import urllib3
from urllib3.util import create_urllib3_context
from nonebot import get_driver
@@ -28,6 +28,7 @@ ctx = create_urllib3_context()
ctx.load_default_certs()
ctx.set_ciphers("AES128-GCM-SHA256")
class TencentSSLAdapter(requests.adapters.HTTPAdapter):
def __init__(self, ssl_context=None, **kwargs):
self.ssl_context = ssl_context
@@ -38,6 +39,7 @@ class TencentSSLAdapter(requests.adapters.HTTPAdapter):
num_pools=connections, maxsize=maxsize,
block=block, ssl_context=self.ssl_context)
@dataclass
class CQCode:
"""
@@ -65,15 +67,15 @@ class CQCode:
"""初始化LLM实例"""
self._llm = LLM_request(model=global_config.vlm, temperature=0.4, max_tokens=300)
def translate(self):
async def translate(self):
"""根据CQ码类型进行相应的翻译处理"""
if self.type == 'text':
self.translated_plain_text = self.params.get('text', '')
elif self.type == 'image':
if self.params.get('sub_type') == '0':
self.translated_plain_text = self.translate_image()
self.translated_plain_text = await self.translate_image()
else:
self.translated_plain_text = self.translate_emoji()
self.translated_plain_text = await self.translate_emoji()
elif self.type == 'at':
user_nickname = get_user_nickname(self.params.get('qq', ''))
if user_nickname:
@@ -81,13 +83,13 @@ class CQCode:
else:
self.translated_plain_text = f"@某人"
elif self.type == 'reply':
self.translated_plain_text = self.translate_reply()
self.translated_plain_text = await self.translate_reply()
elif self.type == 'face':
face_id = self.params.get('id', '')
# self.translated_plain_text = f"[表情{face_id}]"
self.translated_plain_text = f"[{emojimapper.get(int(face_id), "表情")}]"
elif self.type == 'forward':
self.translated_plain_text = self.translate_forward()
self.translated_plain_text = await self.translate_forward()
else:
self.translated_plain_text = f"[{self.type}]"
@@ -134,7 +136,7 @@ class CQCode:
# 腾讯服务器特殊状态码处理
if response.status_code == 400 and 'multimedia.nt.qq.com.cn' in url:
return None
if response.status_code != 200:
raise requests.exceptions.HTTPError(f"HTTP {response.status_code}")
@@ -158,8 +160,8 @@ class CQCode:
return None
return None
def translate_emoji(self) -> str:
async def translate_emoji(self) -> str:
"""处理表情包类型的CQ码"""
if 'url' not in self.params:
return '[表情包]'
@@ -168,50 +170,51 @@ class CQCode:
# 将 base64 字符串转换为字节类型
image_bytes = base64.b64decode(base64_str)
storage_emoji(image_bytes)
return self.get_emoji_description(base64_str)
return await self.get_emoji_description(base64_str)
else:
return '[表情包]'
def translate_image(self) -> str:
async def translate_image(self) -> str:
"""处理图片类型的CQ码区分普通图片和表情包"""
#没有url直接返回默认文本
# 没有url直接返回默认文本
if 'url' not in self.params:
return '[图片]'
base64_str = self.get_img()
if base64_str:
image_bytes = base64.b64decode(base64_str)
storage_image(image_bytes)
return self.get_image_description(base64_str)
return await self.get_image_description(base64_str)
else:
return '[图片]'
def get_emoji_description(self, image_base64: str) -> str:
async def get_emoji_description(self, image_base64: str) -> str:
"""调用AI接口获取表情包描述"""
try:
prompt = "这是一个表情包请用简短的中文描述这个表情包传达的情感和含义。最多20个字。"
description, _ = self._llm.generate_response_for_image_sync(prompt, image_base64)
# description, _ = self._llm.generate_response_for_image_sync(prompt, image_base64)
description, _ = await self._llm.generate_response_for_image(prompt, image_base64)
return f"[表情包:{description}]"
except Exception as e:
print(f"\033[1;31m[错误]\033[0m AI接口调用失败: {str(e)}")
return "[表情包]"
def get_image_description(self, image_base64: str) -> str:
async def get_image_description(self, image_base64: str) -> str:
"""调用AI接口获取普通图片描述"""
try:
prompt = "请用中文描述这张图片的内容。如果有文字请把文字都描述出来。并尝试猜测这个图片的含义。最多200个字。"
description, _ = self._llm.generate_response_for_image_sync(prompt, image_base64)
# description, _ = self._llm.generate_response_for_image_sync(prompt, image_base64)
description, _ = await self._llm.generate_response_for_image(prompt, image_base64)
return f"[图片:{description}]"
except Exception as e:
print(f"\033[1;31m[错误]\033[0m AI接口调用失败: {str(e)}")
return "[图片]"
def translate_forward(self) -> str:
async def translate_forward(self) -> str:
"""处理转发消息"""
try:
if 'content' not in self.params:
return '[转发消息]'
# 解析content内容需要先反转义
content = self.unescape(self.params['content'])
# print(f"\033[1;34m[调试信息]\033[0m 转发消息内容: {content}")
@@ -222,17 +225,17 @@ class CQCode:
except ValueError as e:
print(f"\033[1;31m[错误]\033[0m 解析转发消息内容失败: {str(e)}")
return '[转发消息]'
# 处理每条消息
formatted_messages = []
for msg in messages:
sender = msg.get('sender', {})
nickname = sender.get('card') or sender.get('nickname', '未知用户')
# 获取消息内容并使用Message类处理
raw_message = msg.get('raw_message', '')
message_array = msg.get('message', [])
if message_array and isinstance(message_array, list):
# 检查是否包含嵌套的转发消息
for message_part in message_array:
@@ -250,6 +253,7 @@ class CQCode:
plain_text=raw_message,
group_id=msg.get('group_id', 0)
)
await message_obj.initialize()
content = message_obj.processed_plain_text
else:
content = '[空消息]'
@@ -264,23 +268,24 @@ class CQCode:
plain_text=raw_message,
group_id=msg.get('group_id', 0)
)
await message_obj.initialize()
content = message_obj.processed_plain_text
else:
content = '[空消息]'
formatted_msg = f"{nickname}: {content}"
formatted_messages.append(formatted_msg)
# 合并所有消息
combined_messages = '\n'.join(formatted_messages)
print(f"\033[1;34m[调试信息]\033[0m 合并后的转发消息: {combined_messages}")
return f"[转发消息:\n{combined_messages}]"
except Exception as e:
print(f"\033[1;31m[错误]\033[0m 处理转发消息失败: {str(e)}")
return '[转发消息]'
def translate_reply(self) -> str:
async def translate_reply(self) -> str:
"""处理回复类型的CQ码"""
# 创建Message对象
@@ -288,7 +293,7 @@ class CQCode:
        if self.reply_message is None:
# print(f"\033[1;31m[错误]\033[0m 回复消息为空")
return '[回复某人消息]'
if self.reply_message.sender.user_id:
message_obj = Message(
user_id=self.reply_message.sender.user_id,
@@ -296,6 +301,7 @@ class CQCode:
raw_message=str(self.reply_message.message),
group_id=self.group_id
)
await message_obj.initialize()
if message_obj.user_id == global_config.BOT_QQ:
return f"[回复 {global_config.BOT_NICKNAME} 的消息: {message_obj.processed_plain_text}]"
else:
@@ -309,9 +315,9 @@ class CQCode:
def unescape(text: str) -> str:
"""反转义CQ码中的特殊字符"""
return text.replace('&#44;', ',') \
.replace('&#91;', '[') \
.replace('&#93;', ']') \
.replace('&amp;', '&')
.replace('&#91;', '[') \
.replace('&#93;', ']') \
.replace('&amp;', '&')
@staticmethod
def create_emoji_cq(file_path: str) -> str:
@@ -326,15 +332,16 @@ class CQCode:
abs_path = os.path.abspath(file_path)
# 转义特殊字符
escaped_path = abs_path.replace('&', '&amp;') \
.replace('[', '&#91;') \
.replace(']', '&#93;') \
.replace(',', '&#44;')
.replace('[', '&#91;') \
.replace(']', '&#93;') \
.replace(',', '&#44;')
# 生成CQ码设置sub_type=1表示这是表情包
return f"[CQ:image,file=file:///{escaped_path},sub_type=1]"
class CQCode_tool:
@staticmethod
def cq_from_dict_to_class(cq_code: Dict, reply: Optional[Dict] = None) -> CQCode:
async def cq_from_dict_to_class(cq_code: Dict, reply: Optional[Dict] = None) -> CQCode:
"""
将CQ码字典转换为CQCode对象
@@ -353,7 +360,7 @@ class CQCode_tool:
params['text'] = cq_code.get('data', {}).get('text', '')
else:
params = cq_code.get('data', {})
instance = CQCode(
type=cq_type,
params=params,
@@ -361,11 +368,11 @@ class CQCode_tool:
user_id=0,
reply_message=reply
)
# 进行翻译处理
instance.translate()
await instance.translate()
return instance
@staticmethod
def create_reply_cq(message_id: int) -> str:
"""
@@ -376,6 +383,6 @@ class CQCode_tool:
回复CQ码字符串
"""
return f"[CQ:reply,id={message_id}]"
cq_code_tool = CQCode_tool()

View File

@@ -14,10 +14,14 @@ import asyncio
import time
from PIL import Image
import io
from loguru import logger
import traceback
from nonebot import get_driver
from ..chat.config import global_config
from ..models.utils_model import LLM_request
from ..chat.utils_image import image_path_to_base64
from ..chat.utils import get_embedding
driver = get_driver()
config = driver.config
@@ -27,16 +31,6 @@ class EmojiManager:
_instance = None
EMOJI_DIR = "data/emoji" # 表情包存储目录
EMOTION_KEYWORDS = {
'happy': ['开心', '快乐', '高兴', '欢喜', '', '喜悦', '兴奋', '愉快', '', ''],
'angry': ['生气', '愤怒', '恼火', '不爽', '火大', '', '气愤', '恼怒', '发火', '不满'],
'sad': ['伤心', '难过', '悲伤', '痛苦', '', '忧伤', '悲痛', '哀伤', '委屈', '失落'],
'surprised': ['惊讶', '震惊', '吃惊', '意外', '', '诧异', '惊奇', '惊喜', '不敢相信', '目瞪口呆'],
'disgusted': ['恶心', '讨厌', '厌恶', '反感', '嫌弃', '', '嫌恶', '憎恶', '不喜欢', ''],
'fearful': ['害怕', '恐惧', '惊恐', '担心', '', '惊吓', '惊慌', '畏惧', '胆怯', ''],
'neutral': ['普通', '一般', '还行', '正常', '平静', '平淡', '一般般', '凑合', '还好', '就这样']
}
def __new__(cls):
if cls._instance is None:
cls._instance = super().__new__(cls)
@@ -47,7 +41,8 @@ class EmojiManager:
def __init__(self):
self.db = Database.get_instance()
self._scan_task = None
self.llm = LLM_request(model=global_config.vlm, temperature=0.3, max_tokens=50)
self.vlm = LLM_request(model=global_config.vlm, temperature=0.3, max_tokens=1000)
self.llm_emotion_judge = LLM_request(model=global_config.llm_normal_minor, max_tokens=60,temperature=0.8) #更高的温度更少的token后续可以根据情绪来调整温度
def _ensure_emoji_dir(self):
"""确保表情存储目录存在"""
@@ -64,7 +59,7 @@ class EmojiManager:
# 启动时执行一次完整性检查
self.check_emoji_file_integrity()
except Exception as e:
print(f"\033[1;31m[错误]\033[0m 初始化表情管理器失败: {str(e)}")
logger.error(f"初始化表情管理器失败: {str(e)}")
def _ensure_db(self):
"""确保数据库已初始化"""
@@ -74,9 +69,20 @@ class EmojiManager:
raise RuntimeError("EmojiManager not initialized")
def _ensure_emoji_collection(self):
"""确保emoji集合存在并创建索引"""
"""确保emoji集合存在并创建索引
这个函数用于确保MongoDB数据库中存在emoji集合,并创建必要的索引。
索引的作用是加快数据库查询速度:
- embedding字段的2dsphere索引: 用于加速向量相似度搜索,帮助快速找到相似的表情包
- tags字段的普通索引: 加快按标签搜索表情包的速度
- filename字段的唯一索引: 确保文件名不重复,同时加快按文件名查找的速度
没有索引的话,数据库每次查询都需要扫描全部数据,建立索引后可以大大提高查询效率。
"""
if 'emoji' not in self.db.db.list_collection_names():
self.db.db.create_collection('emoji')
self.db.db.emoji.create_index([('embedding', '2dsphere')])
self.db.db.emoji.create_index([('tags', 1)])
self.db.db.emoji.create_index([('filename', 1)], unique=True)
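
# Standalone illustration of the collection/index setup above, using pymongo directly
# (connection URI and database name are placeholders). Note that 2dsphere is a geospatial
# index type; the similarity search in this file is actually computed client-side in Python
# over all stored embeddings.
from pymongo import MongoClient

client = MongoClient("mongodb://localhost:27017")  # placeholder URI
db = client["maibot"]                              # placeholder database name

if "emoji" not in db.list_collection_names():
    db.create_collection("emoji")

db.emoji.create_index([("embedding", "2dsphere")])     # as created above
db.emoji.create_index([("tags", 1)])                   # speeds up tag lookups
db.emoji.create_index([("filename", 1)], unique=True)  # one record per file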
@@ -89,228 +95,128 @@ class EmojiManager:
{'$inc': {'usage_count': 1}}
)
except Exception as e:
print(f"\033[1;31m[错误]\033[0m 记录表情使用失败: {str(e)}")
logger.error(f"记录表情使用失败: {str(e)}")
async def _get_emotion_from_text(self, text: str) -> List[str]:
"""从文本中识别情感关键词
Args:
text: 输入文本
Returns:
List[str]: 匹配到的情感标签列表
"""
try:
prompt = f'分析这段文本:"{text}",从"happy,angry,sad,surprised,disgusted,fearful,neutral"中选出最匹配的1个情感标签。只需要返回标签不要输出其他任何内容。'
content, _ = await self.llm.generate_response(prompt)
emotion = content.strip().lower()
if emotion in self.EMOTION_KEYWORDS:
print(f"\033[1;32m[成功]\033[0m 识别到的情感: {emotion}")
return [emotion]
return ['neutral']
except Exception as e:
print(f"\033[1;31m[错误]\033[0m 情感分析失败: {str(e)}")
return ['neutral']
async def get_emoji_for_emotion(self, emotion_tag: str) -> Optional[str]:
try:
self._ensure_db()
# 构建查询条件:标签匹配任一情感
query = {'tags': {'$in': emotion_tag}}
# print(f"\033[1;34m[调试]\033[0m 表情查询条件: {query}")
try:
# 随机获取一个匹配的表情
emoji = self.db.db.emoji.aggregate([
{'$match': query},
{'$sample': {'size': 1}}
]).next()
print(f"\033[1;32m[成功]\033[0m 找到匹配的表情")
if emoji and 'path' in emoji:
# 更新使用次数
self.db.db.emoji.update_one(
{'_id': emoji['_id']},
{'$inc': {'usage_count': 1}}
)
return emoji['path']
except StopIteration:
# 如果没有匹配的表情,从所有表情中随机选择一个
print(f"\033[1;33m[提示]\033[0m 未找到匹配的表情,随机选择一个")
try:
emoji = self.db.db.emoji.aggregate([
{'$sample': {'size': 1}}
]).next()
if emoji and 'path' in emoji:
# 更新使用次数
self.db.db.emoji.update_one(
{'_id': emoji['_id']},
{'$inc': {'usage_count': 1}}
)
return emoji['path']
except StopIteration:
print(f"\033[1;31m[错误]\033[0m 数据库中没有任何表情")
return None
return None
except Exception as e:
print(f"\033[1;31m[错误]\033[0m 获取表情包失败: {str(e)}")
return None
async def get_emoji_for_text(self, text: str) -> Optional[str]:
"""根据文本内容获取相关表情包
Args:
text: 输入文本
Returns:
Optional[str]: 表情包文件路径如果没有找到则返回None
可不可以通过 配置文件中的指令 来自定义使用表情包的逻辑?
我觉得可行
"""
try:
self._ensure_db()
# 获取情感标签
emotions = await self._get_emotion_from_text(text)
print(""+ str(text) + " 获取到的情感标签为:" + str(emotions))
if not emotions:
return None
# 构建查询条件:标签匹配任一情感
query = {'tags': {'$in': emotions}}
print(f"\033[1;34m[调试]\033[0m 表情查询条件: {query}")
print(f"\033[1;34m[调试]\033[0m 匹配到的情感: {emotions}")
# 获取文本的embedding
text_for_search= await self._get_kimoji_for_text(text)
if not text_for_search:
logger.error("无法获取文本的情绪")
return None
text_embedding = await get_embedding(text_for_search)
if not text_embedding:
logger.error("无法获取文本的embedding")
return None
try:
# 随机获取一个匹配的表情
emoji = self.db.db.emoji.aggregate([
{'$match': query},
{'$sample': {'size': 1}}
]).next()
print(f"\033[1;32m[成功]\033[0m 找到匹配的表情")
if emoji and 'path' in emoji:
# 获取所有表情
all_emojis = list(self.db.db.emoji.find({}, {'_id': 1, 'path': 1, 'embedding': 1, 'discription': 1}))
if not all_emojis:
logger.warning("数据库中没有任何表情包")
return None
# 计算余弦相似度并排序
def cosine_similarity(v1, v2):
if not v1 or not v2:
return 0
dot_product = sum(a * b for a, b in zip(v1, v2))
norm_v1 = sum(a * a for a in v1) ** 0.5
norm_v2 = sum(b * b for b in v2) ** 0.5
if norm_v1 == 0 or norm_v2 == 0:
return 0
return dot_product / (norm_v1 * norm_v2)
# 计算所有表情包与输入文本的相似度
emoji_similarities = [
(emoji, cosine_similarity(text_embedding, emoji.get('embedding', [])))
for emoji in all_emojis
]
# 按相似度降序排序
emoji_similarities.sort(key=lambda x: x[1], reverse=True)
# 获取前3个最相似的表情包
top_3_emojis = emoji_similarities[:3]
if not top_3_emojis:
logger.warning("未找到匹配的表情包")
return None
# 从前3个中随机选择一个
selected_emoji, similarity = random.choice(top_3_emojis)
if selected_emoji and 'path' in selected_emoji:
# 更新使用次数
self.db.db.emoji.update_one(
{'_id': emoji['_id']},
{'_id': selected_emoji['_id']},
{'$inc': {'usage_count': 1}}
)
return emoji['path']
except StopIteration:
# 如果没有匹配的表情,从所有表情中随机选择一个
print(f"\033[1;33m[提示]\033[0m 未找到匹配的表情,随机选择一个")
try:
emoji = self.db.db.emoji.aggregate([
{'$sample': {'size': 1}}
]).next()
if emoji and 'path' in emoji:
# 更新使用次数
self.db.db.emoji.update_one(
{'_id': emoji['_id']},
{'$inc': {'usage_count': 1}}
)
return emoji['path']
except StopIteration:
print(f"\033[1;31m[错误]\033[0m 数据库中没有任何表情")
return None
logger.success(f"找到匹配的表情包: {selected_emoji.get('discription', '无描述')} (相似度: {similarity:.4f})")
# 稍微改一下文本描述,不然容易产生幻觉,描述已经包含 表情包 了
return selected_emoji['path'],"[ %s ]" % selected_emoji.get('discription', '无描述')
except Exception as search_error:
logger.error(f"搜索表情包失败: {str(search_error)}")
return None
return None
except Exception as e:
print(f"\033[1;31m[错误]\033[0m 获取表情包失败: {str(e)}")
logger.error(f"获取表情包失败: {str(e)}")
return None
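
# Condensed, pure-Python sketch of the selection logic above: embed the text, score every stored
# emoji by cosine similarity, keep the top 3 and pick one at random so replies do not always
# reuse the single best match (records below are made up; no database involved):
import random

def _cosine(v1, v2):
    if not v1 or not v2:
        return 0.0
    dot = sum(a * b for a, b in zip(v1, v2))
    n1 = sum(a * a for a in v1) ** 0.5
    n2 = sum(b * b for b in v2) ** 0.5
    return dot / (n1 * n2) if n1 and n2 else 0.0

def pick_emoji(text_embedding, emojis, top_k=3):
    """emojis: list of dicts with 'path' and 'embedding' keys (illustrative schema)."""
    scored = sorted(
        ((e, _cosine(text_embedding, e.get('embedding', []))) for e in emojis),
        key=lambda pair: pair[1],
        reverse=True,
    )
    if not scored:
        return None
    chosen, similarity = random.choice(scored[:top_k])
    return chosen['path'], similarity

print(pick_emoji([1, 0, 0], [
    {'path': 'a.gif', 'embedding': [1, 0, 0]},
    {'path': 'b.gif', 'embedding': [0.9, 0.1, 0]},
    {'path': 'c.gif', 'embedding': [0, 1, 0]},
]))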
async def _get_emoji_tag(self, image_base64: str) -> str:
async def _get_emoji_discription(self, image_base64: str) -> str:
"""获取表情包的标签"""
try:
prompt = '这是一个表情包,请从"happy", "angry", "sad", "surprised", "disgusted", "fearful", "neutral"中选出1个情感标签。只输出标签不要输出其他任何内容只输出情感标签就好'
prompt = '这是一个表情包,使用中文简洁的描述一下表情包的内容和表情包所表达的情感'
content, _ = await self.llm.generate_response_for_image(prompt, image_base64)
tag_result = content.strip().lower()
valid_tags = ["happy", "angry", "sad", "surprised", "disgusted", "fearful", "neutral"]
for tag_match in valid_tags:
if tag_match in tag_result or tag_match == tag_result:
return tag_match
print(f"\033[1;33m[警告]\033[0m 无效的标签: {tag_result}, 跳过")
content, _ = await self.vlm.generate_response_for_image(prompt, image_base64)
logger.debug(f"输出描述: {content}")
return content
except Exception as e:
print(f"\033[1;31m[错误]\033[0m 获取标签失败: {str(e)}")
return "skip"
print(f"\033[1;32m[调试信息]\033[0m 使用默认标签: neutral")
return "skip" # 默认标签
async def _compress_image(self, image_path: str, target_size: int = 0.8 * 1024 * 1024) -> Optional[str]:
"""压缩图片并返回base64编码
Args:
image_path: 图片文件路径
target_size: 目标文件大小字节默认0.8MB
Returns:
Optional[str]: 成功返回base64编码的图片数据失败返回None
"""
try:
file_size = os.path.getsize(image_path)
if file_size <= target_size:
# 如果文件已经小于目标大小直接读取并返回base64
with open(image_path, 'rb') as f:
return base64.b64encode(f.read()).decode('utf-8')
# 打开图片
with Image.open(image_path) as img:
# 获取原始尺寸
original_width, original_height = img.size
# 计算缩放比例
scale = min(1.0, (target_size / file_size) ** 0.5)
# 计算新的尺寸
new_width = int(original_width * scale)
new_height = int(original_height * scale)
# 创建内存缓冲区
output_buffer = io.BytesIO()
# 如果是GIF处理所有帧
if getattr(img, "is_animated", False):
frames = []
for frame_idx in range(img.n_frames):
img.seek(frame_idx)
new_frame = img.copy()
new_frame = new_frame.resize((new_width, new_height), Image.Resampling.LANCZOS)
frames.append(new_frame)
# 保存到缓冲区
frames[0].save(
output_buffer,
format='GIF',
save_all=True,
append_images=frames[1:],
optimize=True,
duration=img.info.get('duration', 100),
loop=img.info.get('loop', 0)
)
else:
# 处理静态图片
resized_img = img.resize((new_width, new_height), Image.Resampling.LANCZOS)
# 保存到缓冲区,保持原始格式
if img.format == 'PNG' and img.mode in ('RGBA', 'LA'):
resized_img.save(output_buffer, format='PNG', optimize=True)
else:
resized_img.save(output_buffer, format='JPEG', quality=95, optimize=True)
# 获取压缩后的数据并转换为base64
compressed_data = output_buffer.getvalue()
print(f"\033[1;32m[成功]\033[0m 压缩图片: {os.path.basename(image_path)} ({original_width}x{original_height} -> {new_width}x{new_height})")
return base64.b64encode(compressed_data).decode('utf-8')
except Exception as e:
print(f"\033[1;31m[错误]\033[0m 压缩图片失败: {os.path.basename(image_path)}, 错误: {str(e)}")
logger.error(f"获取标签失败: {str(e)}")
return None
async def _check_emoji(self, image_base64: str) -> str:
try:
prompt = f'这是一个表情包,请回答这个表情包是否满足\"{global_config.EMOJI_CHECK_PROMPT}\"的要求,是则回答是,否则回答否,不要出现任何其他内容'
content, _ = await self.vlm.generate_response_for_image(prompt, image_base64)
logger.debug(f"输出描述: {content}")
return content
except Exception as e:
logger.error(f"获取标签失败: {str(e)}")
return None
async def _get_kimoji_for_text(self, text:str):
try:
prompt = f'这是{global_config.BOT_NICKNAME}将要发送的消息内容:\n{text}\n若要为其配上表情包,请你输出这个表情包应该表达怎样的情感,应该给人什么样的感觉,不要太简洁也不要太长,注意不要输出任何对消息内容的分析内容,只输出\"一种什么样的感觉\"中间的形容词部分。'
content, _ = await self.llm_emotion_judge.generate_response_async(prompt)
logger.info(f"输出描述: {content}")
return content
except Exception as e:
logger.error(f"获取标签失败: {str(e)}")
return None
async def scan_new_emojis(self):
"""扫描新的表情包"""
try:
@@ -329,34 +235,43 @@ class EmojiManager:
continue
# 压缩图片并获取base64编码
image_base64 = await self._compress_image(image_path)
image_base64 = image_path_to_base64(image_path)
if image_base64 is None:
os.remove(image_path)
continue
# 获取表情包的情感标签
tag = await self._get_emoji_tag(image_base64)
if not tag == "skip":
# 获取表情包的描述
discription = await self._get_emoji_discription(image_base64)
if global_config.EMOJI_CHECK:
check = await self._check_emoji(image_base64)
                        if '是' not in check:
os.remove(image_path)
logger.info(f"描述: {discription}")
logger.info(f"其不满足过滤规则,被剔除 {check}")
continue
logger.info(f"check通过 {check}")
embedding = await get_embedding(discription)
if discription is not None:
# 准备数据库记录
emoji_record = {
'filename': filename,
'path': image_path,
'tags': [tag],
'embedding':embedding,
'discription': discription,
'timestamp': int(time.time())
}
# 保存到数据库
self.db.db['emoji'].insert_one(emoji_record)
print(f"\033[1;32m[成功]\033[0m 注册新表情包: {filename}")
print(f"标签: {tag}")
logger.success(f"注册新表情包: {filename}")
logger.info(f"描述: {discription}")
else:
print(f"\033[1;33m[警告]\033[0m 跳过表情包: {filename}")
logger.warning(f"跳过表情包: {filename}")
except Exception as e:
print(f"\033[1;31m[错误]\033[0m 扫描表情包失败: {str(e)}")
import traceback
print(traceback.format_exc())
logger.error(f"扫描表情包失败: {str(e)}")
logger.error(traceback.format_exc())
async def _periodic_scan(self, interval_MINS: int = 10):
"""定期扫描新表情包"""
while True:
@@ -364,6 +279,7 @@ class EmojiManager:
await self.scan_new_emojis()
await asyncio.sleep(interval_MINS * 60) # 每600秒扫描一次
def check_emoji_file_integrity(self):
"""检查表情包文件完整性
如果文件已被删除,则从数据库中移除对应记录
@@ -378,44 +294,42 @@ class EmojiManager:
for emoji in all_emojis:
try:
if 'path' not in emoji:
print(f"\033[1;33m[提示]\033[0m 发现无效记录缺少path字段ID: {emoji.get('_id', 'unknown')}")
logger.warning(f"发现无效记录缺少path字段ID: {emoji.get('_id', 'unknown')}")
self.db.db.emoji.delete_one({'_id': emoji['_id']})
removed_count += 1
continue
if 'embedding' not in emoji:
logger.warning(f"发现过时记录缺少embedding字段ID: {emoji.get('_id', 'unknown')}")
self.db.db.emoji.delete_one({'_id': emoji['_id']})
removed_count += 1
continue
# 检查文件是否存在
if not os.path.exists(emoji['path']):
print(f"\033[1;33m[提示]\033[0m 表情包文件已被删除: {emoji['path']}")
logger.warning(f"表情包文件已被删除: {emoji['path']}")
# 从数据库中删除记录
result = self.db.db.emoji.delete_one({'_id': emoji['_id']})
if result.deleted_count > 0:
print(f"\033[1;32m[成功]\033[0m 成功删除数据库记录: {emoji['_id']}")
logger.success(f"成功删除数据库记录: {emoji['_id']}")
removed_count += 1
else:
print(f"\033[1;31m[错误]\033[0m 删除数据库记录失败: {emoji['_id']}")
logger.error(f"删除数据库记录失败: {emoji['_id']}")
except Exception as item_error:
print(f"\033[1;31m[错误]\033[0m 处理表情包记录时出错: {str(item_error)}")
logger.error(f"处理表情包记录时出错: {str(item_error)}")
continue
# 验证清理结果
remaining_count = self.db.db.emoji.count_documents({})
if removed_count > 0:
print(f"\033[1;32m[成功]\033[0m 已清理 {removed_count} 个失效的表情包记录")
print(f"\033[1;34m[统计]\033[0m 清理前总数: {total_count} | 清理后总数: {remaining_count}")
# print(f"\033[1;34m[统计]\033[0m 应删除数量: {removed_count} | 实际删除数量: {total_count - remaining_count}")
# 执行数据库压缩
try:
self.db.db.command({"compact": "emoji"})
print(f"\033[1;32m[成功]\033[0m 数据库集合压缩完成")
except Exception as compact_error:
print(f"\033[1;31m[错误]\033[0m 数据库压缩失败: {str(compact_error)}")
logger.success(f"已清理 {removed_count} 个失效的表情包记录")
logger.info(f"清理前总数: {total_count} | 清理后总数: {remaining_count}")
else:
print(f"\033[1;36m[表情包]\033[0m 已检查 {total_count} 个表情包记录")
logger.info(f"已检查 {total_count} 个表情包记录")
except Exception as e:
print(f"\033[1;31m[错误]\033[0m 检查表情包完整性失败: {str(e)}")
import traceback
print(f"\033[1;31m[错误追踪]\033[0m\n{traceback.format_exc()}")
logger.error(f"检查表情包完整性失败: {str(e)}")
logger.error(traceback.format_exc())
async def start_periodic_check(self, interval_MINS: int = 120):
while True:

View File

@@ -24,6 +24,7 @@ class ResponseGenerator:
self.model_r1 = LLM_request(model=global_config.llm_reasoning, temperature=0.7,max_tokens=1000)
self.model_v3 = LLM_request(model=global_config.llm_normal, temperature=0.7,max_tokens=1000)
self.model_r1_distill = LLM_request(model=global_config.llm_reasoning_minor, temperature=0.7,max_tokens=1000)
self.model_v25 = LLM_request(model=global_config.llm_normal_minor, temperature=0.7,max_tokens=1000)
self.db = Database.get_instance()
self.current_model_type = 'r1' # 默认使用 R1
@@ -44,19 +45,15 @@ class ResponseGenerator:
print(f"+++++++++++++++++{global_config.BOT_NICKNAME}{self.current_model_type}思考中+++++++++++++++++")
model_response = await self._generate_response_with_model(message, current_model)
raw_content=model_response
if model_response:
print(f'{global_config.BOT_NICKNAME}的回复是:{model_response}')
model_response, emotion = await self._process_response(model_response)
model_response = await self._process_response(model_response)
if model_response:
print(f"'{model_response}' 获取到的情感标签为:{emotion}")
valuedict={
'happy':0.5,'angry':-1,'sad':-0.5,'surprised':0.5,'disgusted':-1.5,'fearful':-0.25,'neutral':0.25
}
await relationship_manager.update_relationship_value(message.user_id, relationship_value=valuedict[emotion[0]])
return model_response, emotion
return None, []
return model_response ,raw_content
return None,raw_content
async def _generate_response_with_model(self, message: Message, model: LLM_request) -> Optional[str]:
"""使用指定的模型生成回复"""
@@ -67,10 +64,11 @@ class ResponseGenerator:
# 获取关系值
relationship_value = relationship_manager.get_relationship(message.user_id).relationship_value if relationship_manager.get_relationship(message.user_id) else 0.0
if relationship_value != 0.0:
print(f"\033[1;32m[关系管理]\033[0m 回复中_当前关系值: {relationship_value}")
# print(f"\033[1;32m[关系管理]\033[0m 回复中_当前关系值: {relationship_value}")
pass
# 构建prompt
prompt, prompt_check = prompt_builder._build_prompt(
prompt, prompt_check = await prompt_builder._build_prompt(
message_txt=message.processed_plain_text,
sender_name=sender_name,
relationship_value=relationship_value,
@@ -142,7 +140,7 @@ class ResponseGenerator:
内容:{content}
输出:
'''
content, _ = await self.model_v3.generate_response(prompt)
content, _ = await self.model_v25.generate_response(prompt)
content=content.strip()
if content in ['happy','angry','sad','surprised','disgusted','fearful','neutral']:
return [content]
@@ -158,10 +156,9 @@ class ResponseGenerator:
if not content:
return None, []
emotion_tags = await self._get_emotion_tags(content)
processed_response = process_llm_response(content)
return processed_response, emotion_tags
return processed_response
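
# _get_emotion_tags above trusts the model to answer with exactly one of seven labels.
# A small sketch of a stricter normalisation step; the fallback to 'neutral' is an assumption,
# not something this file currently does:
VALID_EMOTIONS = {'happy', 'angry', 'sad', 'surprised', 'disgusted', 'fearful', 'neutral'}

def normalize_emotion(raw: str) -> list:
    """Strip and lowercase the model output; fall back to 'neutral' on anything unexpected."""
    tag = (raw or '').strip().lower()
    return [tag] if tag in VALID_EMOTIONS else ['neutral']

# normalize_emotion(' Happy\n') -> ['happy']; normalize_emotion('开心') -> ['neutral']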
class InitiativeMessageGenerate:

View File

@@ -27,58 +27,66 @@ class Message:
"""消息数据类"""
message_id: int = None
time: float = None
group_id: int = None
group_name: str = None # 群名称
group_name: str = None # 群名称
user_id: int = None
user_nickname: str = None # 用户昵称
user_cardname: str=None # 用户群昵称
raw_message: str = None # 原始消息包含未解析的cq码
plain_text: str = None # 纯文本
user_cardname: str = None # 用户群昵称
raw_message: str = None # 原始消息包含未解析的cq码
plain_text: str = None # 纯文本
reply_message: Dict = None # 存储 回复的 源消息
# 延迟初始化字段
_initialized: bool = False
message_segments: List[Dict] = None # 存储解析后的消息片段
processed_plain_text: str = None # 用于存储处理后的plain_text
detailed_plain_text: str = None # 用于存储详细可读文本
reply_message: Dict = None # 存储 回复的 源消息
is_emoji: bool = False # 是否是表情包
has_emoji: bool = False # 是否包含表情包
translate_cq: bool = True # 是否翻译cq码
def __post_init__(self):
if self.time is None:
self.time = int(time.time())
if not self.group_name:
self.group_name = get_groupname(self.group_id)
if not self.user_nickname:
self.user_nickname = get_user_nickname(self.user_id)
if not self.user_cardname:
self.user_cardname=get_user_cardname(self.user_id)
if not self.processed_plain_text:
if self.raw_message:
self.message_segments = self.parse_message_segments(str(self.raw_message))
# 状态标志
is_emoji: bool = False
has_emoji: bool = False
translate_cq: bool = True
async def initialize(self):
"""显式异步初始化方法(必须调用)"""
if self._initialized:
return
# 异步获取补充信息
self.group_name = self.group_name or get_groupname(self.group_id)
self.user_nickname = self.user_nickname or get_user_nickname(self.user_id)
self.user_cardname = self.user_cardname or get_user_cardname(self.user_id)
# 消息解析
if self.raw_message:
if not isinstance(self,Message_Sending):
self.message_segments = await self.parse_message_segments(self.raw_message)
self.processed_plain_text = ' '.join(
seg.translated_plain_text
for seg in self.message_segments
)
#将详细翻译为详细可读文本
# 构建详细文本
if self.time is None:
self.time = int(time.time())
time_str = time.strftime("%m-%d %H:%M:%S", time.localtime(self.time))
try:
name = f"{self.user_nickname}(ta的昵称:{self.user_cardname},ta的id:{self.user_id})"
except:
name = self.user_nickname or f"用户{self.user_id}"
content = self.processed_plain_text
self.detailed_plain_text = f"[{time_str}] {name}: {content}\n"
name = (
f"{self.user_nickname}(ta的昵称:{self.user_cardname},ta的id:{self.user_id})"
if self.user_cardname
else f"{self.user_nickname or f'用户{self.user_id}'}"
)
if isinstance(self,Message_Sending) and self.is_emoji:
self.detailed_plain_text = f"[{time_str}] {name}: {self.detailed_plain_text}\n"
else:
self.detailed_plain_text = f"[{time_str}] {name}: {self.processed_plain_text}\n"
self._initialized = True
def parse_message_segments(self, message: str) -> List[CQCode]:
async def parse_message_segments(self, message: str) -> List[CQCode]:
"""
将消息解析为片段列表包括纯文本和CQ码
返回的列表中每个元素都是字典,包含:
@@ -136,7 +144,7 @@ class Message:
#翻译作为字典的CQ码
for _code_item in cq_code_dict_list:
message_obj = cq_code_tool.cq_from_dict_to_class(_code_item,reply = self.reply_message)
message_obj = await cq_code_tool.cq_from_dict_to_class(_code_item,reply = self.reply_message)
trans_list.append(message_obj)
return trans_list
@@ -169,6 +177,8 @@ class Message_Sending(Message):
reply_message_id: int = None # 存储 回复的 源消息ID
is_head: bool = False # 是否是头部消息
def update_thinking_time(self):
self.thinking_time = round(time.time(), 2) - self.thinking_start_time
return self.thinking_time
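
# The change above splits Message construction in two: the dataclass constructor only stores raw
# fields, and "await message.initialize()" does the async work (CQ-code translation, nickname
# lookups). Forgetting the second step leaves processed_plain_text as None. A minimal standalone
# sketch of the same two-phase pattern with illustrative fields:
import asyncio
import time
from dataclasses import dataclass

@dataclass
class ParsedMessage:
    raw: str
    time: float = None
    text: str = None            # filled in by initialize()
    _initialized: bool = False

    async def initialize(self) -> None:
        """Must be awaited after construction; safe to call more than once."""
        if self._initialized:
            return
        self.time = self.time or time.time()
        self.text = await self._translate(self.raw)   # stands in for parse_message_segments()
        self._initialized = True

    async def _translate(self, raw: str) -> str:
        await asyncio.sleep(0)  # placeholder for async CQ-code / nickname lookups
        return raw.strip()

async def _demo():
    msg = ParsedMessage(raw=" hello ")
    await msg.initialize()
    print(msg.text)  # "hello"

asyncio.run(_demo())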

View File

@@ -103,7 +103,7 @@ class MessageContainer:
def add_message(self, message: Union[Message_Thinking, Message_Sending]) -> None:
"""添加消息到队列"""
print(f"\033[1;32m[添加消息]\033[0m 添加消息到对应群")
# print(f"\033[1;32m[添加消息]\033[0m 添加消息到对应群")
if isinstance(message, MessageSet):
for single_message in message.messages:
self.messages.append(single_message)
@@ -156,26 +156,21 @@ class MessageManager:
#最早的对象,可能是思考消息,也可能是发送消息
message_earliest = container.get_earliest_message() #一个message_thinking or message_sending
#一个月后删了
if not message_earliest:
print(f"\033[1;34m[BUG如果出现这个说明有BUG3月4日留]\033[0m ")
return
#如果是思考消息
if isinstance(message_earliest, Message_Thinking):
#优先等待这条消息
message_earliest.update_thinking_time()
thinking_time = message_earliest.thinking_time
print(f"\033[1;34m[调试]\033[0m 消息正在思考中,已思考{int(thinking_time)}")
if thinking_time % 10 == 0:
print(f"\033[1;34m[调试]\033[0m 消息正在思考中,已思考{int(thinking_time)}")
else:# 如果不是message_thinking就只能是message_sending
print(f"\033[1;34m[调试]\033[0m 消息'{message_earliest.processed_plain_text}'正在发送中")
#直接发,等什么呢
if message_earliest.update_thinking_time() < 30:
await message_sender.send_group_message(group_id, message_earliest.processed_plain_text, auto_escape=False)
else:
if message_earliest.is_head and message_earliest.update_thinking_time() >30:
await message_sender.send_group_message(group_id, message_earliest.processed_plain_text, auto_escape=False, reply_message_id=message_earliest.reply_message_id)
#移除消息
else:
await message_sender.send_group_message(group_id, message_earliest.processed_plain_text, auto_escape=False)
#移除消息
if message_earliest.is_emoji:
message_earliest.processed_plain_text = "[表情包]"
await self.storage.store_message(message_earliest, None)
@@ -192,10 +187,11 @@ class MessageManager:
try:
#发送
if msg.update_thinking_time() < 30:
await message_sender.send_group_message(group_id, msg.processed_plain_text, auto_escape=False)
else:
if msg.is_head and msg.update_thinking_time() >30:
await message_sender.send_group_message(group_id, msg.processed_plain_text, auto_escape=False, reply_message_id=msg.reply_message_id)
else:
await message_sender.send_group_message(group_id, msg.processed_plain_text, auto_escape=False)
#如果是表情包,则替换为"[表情包]"
if msg.is_emoji:

View File

@@ -6,9 +6,11 @@ from .utils import get_embedding, combine_messages, get_recent_group_detailed_pl
from ...common.database import Database
from .config import global_config
from .topic_identifier import topic_identifier
from ..memory_system.memory import memory_graph
from ..memory_system.memory import memory_graph,hippocampus
from random import choice
import numpy as np
import jieba
from collections import Counter
class PromptBuilder:
def __init__(self):
@@ -16,7 +18,9 @@ class PromptBuilder:
self.activate_messages = ''
self.db = Database.get_instance()
def _build_prompt(self,
async def _build_prompt(self,
message_txt: str,
sender_name: str = "某人",
relationship_value: float = 0.0,
@@ -31,60 +35,7 @@ class PromptBuilder:
Returns:
str: 构建好的prompt
"""
memory_prompt = ''
start_time = time.time() # 记录开始时间
# topic = await topic_identifier.identify_topic_llm(message_txt)
topic = topic_identifier.identify_topic_snownlp(message_txt)
# print(f"\033[1;32m[pb主题识别]\033[0m 主题: {topic}")
all_first_layer_items = [] # 存储所有第一层记忆
all_second_layer_items = {} # 用字典存储每个topic的第二层记忆
overlapping_second_layer = set() # 存储重叠的第二层记忆
if topic:
# 遍历所有topic
for current_topic in topic:
first_layer_items, second_layer_items = memory_graph.get_related_item(current_topic, depth=2)
# if first_layer_items:
# print(f"\033[1;32m[前额叶]\033[0m 主题 '{current_topic}' 的第一层记忆: {first_layer_items}")
# 记录第一层数据
all_first_layer_items.extend(first_layer_items)
# 记录第二层数据
all_second_layer_items[current_topic] = second_layer_items
# 检查是否有重叠的第二层数据
for other_topic, other_second_layer in all_second_layer_items.items():
if other_topic != current_topic:
# 找到重叠的记忆
overlap = set(second_layer_items) & set(other_second_layer)
if overlap:
# print(f"\033[1;32m[前额叶]\033[0m 发现主题 '{current_topic}' 和 '{other_topic}' 有共同的第二层记忆: {overlap}")
overlapping_second_layer.update(overlap)
selected_first_layer = random.sample(all_first_layer_items, min(2, len(all_first_layer_items))) if all_first_layer_items else []
selected_second_layer = random.sample(list(overlapping_second_layer), min(2, len(overlapping_second_layer))) if overlapping_second_layer else []
# 合并并去重
all_memories = list(set(selected_first_layer + selected_second_layer))
if all_memories:
print(f"\033[1;32m[前额叶]\033[0m 合并所有需要的记忆: {all_memories}")
random_item = " ".join(all_memories)
memory_prompt = f"看到这些聊天,你想起来{random_item}\n"
else:
memory_prompt = "" # 如果没有记忆,则返回空字符串
end_time = time.time() # 记录结束时间
print(f"\033[1;32m[回忆耗时]\033[0m 耗时: {(end_time - start_time):.3f}") # 输出耗时
"""
#先禁用关系
if 0 > 30:
relation_prompt = "关系特别特别好,你很喜欢喜欢他"
@@ -109,25 +60,52 @@ class PromptBuilder:
prompt_info = ''
promt_info_prompt = ''
prompt_info = self.get_prompt_info(message_txt,threshold=0.5)
prompt_info = await self.get_prompt_info(message_txt,threshold=0.5)
if prompt_info:
prompt_info = f'''\n----------------------------------------------------\n你有以下这些[知识]\n{prompt_info}\n请你记住上面的[知识],之后可能会用到\n----------------------------------------------------\n'''
# promt_info_prompt = '你有一些[知识],在上面可以参考。'
end_time = time.time()
print(f"\033[1;32m[知识检索]\033[0m 耗时: {(end_time - start_time):.3f}")
# print(f"\033[1;34m[调试]\033[0m 获取知识库内容结果: {prompt_info}")
# print(f"\033[1;34m[调试信息]\033[0m 正在构建聊天上下文")
# 获取聊天上下文
chat_talking_prompt = ''
if group_id:
chat_talking_prompt = get_recent_group_detailed_plain_text(self.db, group_id, limit=global_config.MAX_CONTEXT_SIZE,combine = True)
chat_talking_prompt = f"以下是群里正在聊天的内容:\n{chat_talking_prompt}"
# print(f"\033[1;34m[调试]\033[0m 已从数据库获取群 {group_id} 的消息记录:{chat_talking_prompt}")
# 使用新的记忆获取方法
memory_prompt = ''
start_time = time.time()
# 调用 hippocampus 的 get_relevant_memories 方法
relevant_memories = await hippocampus.get_relevant_memories(
text=message_txt,
max_topics=5,
similarity_threshold=0.4,
max_memory_num=5
)
if relevant_memories:
# 格式化记忆内容
memory_items = []
for memory in relevant_memories:
memory_items.append(f"关于「{memory['topic']}」的记忆:{memory['content']}")
memory_prompt = f"看到这些聊天,你想起来:\n" + "\n".join(memory_items) + "\n"
# 打印调试信息
print("\n\033[1;32m[记忆检索]\033[0m 找到以下相关记忆:")
for memory in relevant_memories:
print(f"- 主题「{memory['topic']}」[相似度: {memory['similarity']:.2f}]: {memory['content']}")
end_time = time.time()
print(f"\033[1;32m[回忆耗时]\033[0m 耗时: {(end_time - start_time):.3f}")
#激活prompt构建
activate_prompt = ''
activate_prompt = f"以上是群里正在进行的聊天,{memory_prompt} 现在昵称为 '{sender_name}' 的用户说的:{message_txt}。引起了你的注意,你和他{relation_prompt},你想要{relation_prompt_2}"
@@ -162,29 +140,19 @@ class PromptBuilder:
if random.random() < 0.01:
prompt_ger += '你喜欢用文言文'
#额外信息要求
extra_info = '''但是记得回复平淡一些,简短一些,尤其注意在没明确提到时不要过多提及自身的背景, 记住不要输出多余内容(包括前后缀,冒号和引号,括号,表情等),只需要输出回复内容就好,不要输出其他任何内容'''
extra_info = '''但是记得回复平淡一些,简短一些,尤其注意在没明确提到时不要过多提及自身的背景, 不要直接回复别人发的表情包,记住不要输出多余内容(包括前后缀,冒号和引号,括号,表情等),只需要输出回复内容就好,不要输出其他任何内容'''
#合并prompt
prompt = ""
prompt += f"{prompt_info}\n"
prompt += f"{prompt_date}\n"
prompt += f"{chat_talking_prompt}\n"
# prompt += f"{memory_prompt}\n"
# prompt += f"{activate_prompt}\n"
prompt += f"{prompt_personality}\n"
prompt += f"{prompt_ger}\n"
prompt += f"{extra_info}\n"
'''读空气prompt处理'''
activate_prompt_check=f"以上是群里正在进行的聊天,昵称为 '{sender_name}' 的用户说的:{message_txt}。引起了你的注意,你和他{relation_prompt},你想要{relation_prompt_2},但是这不一定是合适的时机,请你决定是否要回应这条消息。"
prompt_personality_check = ''
extra_check_info=f"请注意把握群里的聊天内容的基础上,综合群内的氛围,例如,和{global_config.BOT_NICKNAME}相关的话题要积极回复,如果是at自己的消息一定要回复如果自己正在和别人聊天一定要回复其他话题如果合适搭话也可以回复如果认为应该回复请输出yes否则输出no请注意是决定是否需要回复而不是编写回复内容除了yes和no不要输出任何回复内容。"
@@ -247,10 +215,10 @@ class PromptBuilder:
return prompt_for_initiative
def get_prompt_info(self,message:str,threshold:float):
async def get_prompt_info(self,message:str,threshold:float):
related_info = ''
print(f"\033[1;34m[调试]\033[0m 获取知识库内容,元消息:{message[:30]}...,消息长度: {len(message)}")
embedding = get_embedding(message)
embedding = await get_embedding(message)
related_info += self.get_info_from_db(embedding,threshold=threshold)
return related_info
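
# get_prompt_info above embeds the incoming message and asks the database for knowledge entries
# whose similarity clears a threshold; get_info_from_db itself is not shown in this diff. A
# hypothetical in-memory stand-in for that lookup, just to make the flow concrete (the knowledge
# store, its contents and the threshold handling here are assumptions):
import numpy as np

def _cosine(v1, v2):
    v1, v2 = np.asarray(v1, dtype=float), np.asarray(v2, dtype=float)
    denom = np.linalg.norm(v1) * np.linalg.norm(v2)
    return float(v1 @ v2 / denom) if denom else 0.0

KNOWLEDGE = [  # hypothetical (embedding, text) pairs
    ([1.0, 0.0], "The bot keeps replies short and plain."),
    ([0.0, 1.0], "Emoji images are stored under data/emoji."),
]

def get_info_from_memory(query_embedding, threshold: float = 0.5) -> str:
    """Return newline-joined knowledge entries whose similarity clears the threshold."""
    hits = [text for emb, text in KNOWLEDGE if _cosine(query_embedding, emb) >= threshold]
    return "\n".join(hits)

print(get_info_from_memory([0.9, 0.1]))  # only the first entry clears the 0.5 threshold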

View File

@@ -4,7 +4,6 @@ from .message import Message
import jieba
from nonebot import get_driver
from .config import global_config
from snownlp import SnowNLP
from ..models.utils_model import LLM_request
driver = get_driver()
@@ -12,10 +11,8 @@ config = driver.config
class TopicIdentifier:
def __init__(self):
self.llm_client = LLM_request(model=global_config.llm_topic_extract)
self.select=global_config.topic_extract
self.llm_topic_judge = LLM_request(model=global_config.llm_topic_judge)
async def identify_topic_llm(self, text: str) -> Optional[List[str]]:
"""识别消息主题,返回主题列表"""
@@ -26,7 +23,7 @@ class TopicIdentifier:
消息内容:{text}"""
# 使用 LLM_request 类进行请求
topic, _ = await self.llm_client.generate_response(prompt)
topic, _ = await self.llm_topic_judge.generate_response(prompt)
if not topic:
print(f"\033[1;31m[错误]\033[0m LLM API 返回为空")
@@ -42,25 +39,4 @@ class TopicIdentifier:
print(f"\033[1;32m[主题识别]\033[0m 主题: {topic_list}")
return topic_list if topic_list else None
def identify_topic_snownlp(self, text: str) -> Optional[List[str]]:
"""使用 SnowNLP 进行主题识别
Args:
text (str): 需要识别主题的文本
Returns:
Optional[List[str]]: 返回识别出的主题关键词列表,如果无法识别则返回 None
"""
if not text or len(text.strip()) == 0:
return None
try:
s = SnowNLP(text)
# 提取前3个关键词作为主题
keywords = s.keywords(5)
return keywords if keywords else None
except Exception as e:
print(f"\033[1;31m[错误]\033[0m SnowNLP 处理失败: {str(e)}")
return None
topic_identifier = TopicIdentifier()

View File

@@ -11,6 +11,9 @@ from collections import Counter
import math
from nonebot import get_driver
from ..models.utils_model import LLM_request
import aiohttp
import jieba
from ..utils.typo_generator import ChineseTypoGenerator
driver = get_driver()
config = driver.config
@@ -30,16 +33,18 @@ def combine_messages(messages: List[Message]) -> str:
time_str = time.strftime("%m-%d %H:%M:%S", time.localtime(message.time))
name = message.user_nickname or f"用户{message.user_id}"
content = message.processed_plain_text or message.plain_text
result += f"[{time_str}] {name}: {content}\n"
return result
def db_message_to_str (message_dict: Dict) -> str:
def db_message_to_str(message_dict: Dict) -> str:
print(f"message_dict: {message_dict}")
time_str = time.strftime("%m-%d %H:%M:%S", time.localtime(message_dict["time"]))
try:
name="[(%s)%s]%s" % (message_dict['user_id'],message_dict.get("user_nickname", ""),message_dict.get("user_cardname", ""))
name = "[(%s)%s]%s" % (
message_dict['user_id'], message_dict.get("user_nickname", ""), message_dict.get("user_cardname", ""))
except:
name = message_dict.get("user_nickname", "") or f"用户{message_dict['user_id']}"
content = message_dict.get("processed_plain_text", "")
@@ -56,6 +61,7 @@ def is_mentioned_bot_in_message(message: Message) -> bool:
return True
return False
def is_mentioned_bot_in_txt(message: str) -> bool:
"""检查消息是否提到了机器人"""
keywords = [global_config.BOT_NICKNAME]
@@ -64,10 +70,13 @@ def is_mentioned_bot_in_txt(message: str) -> bool:
return True
return False
def get_embedding(text):
async def get_embedding(text):
"""获取文本的embedding向量"""
llm = LLM_request(model=global_config.embedding)
return llm.get_embedding_sync(text)
# return llm.get_embedding_sync(text)
return await llm.get_embedding(text)
def cosine_similarity(v1, v2):
dot_product = np.dot(v1, v2)
@@ -75,52 +84,55 @@ def cosine_similarity(v1, v2):
norm2 = np.linalg.norm(v2)
return dot_product / (norm1 * norm2)
def calculate_information_content(text):
"""计算文本的信息量(熵)"""
char_count = Counter(text)
total_chars = len(text)
entropy = 0
for count in char_count.values():
probability = count / total_chars
entropy -= probability * math.log2(probability)
return entropy
def get_cloest_chat_from_db(db, length: int, timestamp: str):
"""从数据库中获取最接近指定时间戳的聊天记录,并记录读取次数"""
chat_text = ''
closest_record = db.db.messages.find_one({"time": {"$lte": timestamp}}, sort=[('time', -1)])
if closest_record and closest_record.get('memorized', 0) < 4:
if closest_record and closest_record.get('memorized', 0) < 4:
closest_time = closest_record['time']
group_id = closest_record['group_id'] # 获取groupid
# 获取该时间戳之后的length条消息且groupid相同
chat_records = list(db.db.messages.find(
{"time": {"$gt": closest_time}, "group_id": group_id}
).sort('time', 1).limit(length))
# 更新每条消息的memorized属性
for record in chat_records:
# 检查当前记录的memorized值
current_memorized = record.get('memorized', 0)
if current_memorized > 3:
if current_memorized > 3:
# print(f"消息已读取3次跳过")
return ''
# 更新memorized值
db.db.messages.update_one(
{"_id": record["_id"]},
{"$set": {"memorized": current_memorized + 1}}
)
chat_text += record["detailed_plain_text"]
return chat_text
print(f"消息已读取3次跳过")
# print(f"消息已读取3次跳过")
return ''
def get_recent_group_messages(db, group_id: int, limit: int = 12) -> list:
async def get_recent_group_messages(db, group_id: int, limit: int = 12) -> list:
"""从数据库获取群组最近的消息记录
Args:
@@ -132,7 +144,7 @@ def get_recent_group_messages(db, group_id: int, limit: int = 12) -> list:
list: Message对象列表按时间正序排列
"""
# 从数据库获取最近消息
# 从数据库获取最近消息
recent_messages = list(db.db.messages.find(
{"group_id": group_id},
# {
@@ -147,7 +159,7 @@ def get_recent_group_messages(db, group_id: int, limit: int = 12) -> list:
if not recent_messages:
return []
# 转换为 Message对象列表
from .message import Message
message_objects = []
@@ -162,16 +174,18 @@ def get_recent_group_messages(db, group_id: int, limit: int = 12) -> list:
processed_plain_text=msg_data.get("processed_text", ""),
group_id=group_id
)
await msg.initialize()
message_objects.append(msg)
except KeyError:
print("[WARNING] 数据库中存在无效的消息")
continue
# 按时间正序排列
message_objects.reverse()
return message_objects
def get_recent_group_detailed_plain_text(db, group_id: int, limit: int = 12,combine = False):
def get_recent_group_detailed_plain_text(db, group_id: int, limit: int = 12, combine=False):
recent_messages = list(db.db.messages.find(
{"group_id": group_id},
{
@@ -185,16 +199,16 @@ def get_recent_group_detailed_plain_text(db, group_id: int, limit: int = 12,comb
if not recent_messages:
return []
message_detailed_plain_text = ''
message_detailed_plain_text_list = []
# 反转消息列表,使最新的消息在最后
recent_messages.reverse()
if combine:
for msg_db_data in recent_messages:
message_detailed_plain_text+=str(msg_db_data["detailed_plain_text"])
message_detailed_plain_text += str(msg_db_data["detailed_plain_text"])
return message_detailed_plain_text
else:
for msg_db_data in recent_messages:
@@ -202,7 +216,6 @@ def get_recent_group_detailed_plain_text(db, group_id: int, limit: int = 12,comb
return message_detailed_plain_text_list
def split_into_sentences_w_remove_punctuation(text: str) -> List[str]:
"""将文本分割成句子,但保持书名号中的内容完整
Args:
@@ -222,30 +235,30 @@ def split_into_sentences_w_remove_punctuation(text: str) -> List[str]:
split_strength = 0.7
else:
split_strength = 0.9
#先移除换行符
# 先移除换行符
# print(f"split_strength: {split_strength}")
# print(f"处理前的文本: {text}")
# 统一将英文逗号转换为中文逗号
    text = text.replace(',', '，')
text = text.replace('\n', ' ')
# print(f"处理前的文本: {text}")
text_no_1 = ''
for letter in text:
# print(f"当前字符: {letter}")
if letter in ['!','','?','']:
if letter in ['!', '', '?', '']:
# print(f"当前字符: {letter}, 随机数: {random.random()}")
if random.random() < split_strength:
letter = ''
if letter in ['','']:
if letter in ['', '']:
# print(f"当前字符: {letter}, 随机数: {random.random()}")
if random.random() < 1 - split_strength:
letter = ''
text_no_1 += letter
# 对每个逗号单独判断是否分割
sentences = [text_no_1]
new_sentences = []
@@ -274,84 +287,16 @@ def split_into_sentences_w_remove_punctuation(text: str) -> List[str]:
sentences_done = []
for sentence in sentences:
sentence = sentence.rstrip(',')
if random.random() < split_strength*0.5:
if random.random() < split_strength * 0.5:
            sentence = sentence.replace('，', '').replace(',', '')
elif random.random() < split_strength:
            sentence = sentence.replace('，', ' ').replace(',', ' ')
sentences_done.append(sentence)
print(f"处理后的句子: {sentences_done}")
return sentences_done
# 常见的错别字映射
TYPO_DICT = {
'': '地得',
'': '咯啦勒',
'': '嘛麻',
'': '八把罢',
'': '',
'': '再在',
'': '',
'': '',
'': '沃窝喔',
'': '泥尼拟',
'': '它她塔祂',
'': '',
'': '阿哇',
'': '呐捏',
'': '豆读毒',
'': '',
'': '回汇',
'': '趣取曲',
'': '作坐',
'': '相像',
'': '说税睡',
'': '砍堪刊',
'': '来莱赖',
'': '号毫豪',
'': '给既继',
'': '锅果裹',
'': '',
'': '位未',
'': '甚深伸',
'': '末麽嘛',
'': '话花划',
'': '织直值',
'': '',
'': '听停挺',
'': '见件建',
'': '觉脚搅',
'': '得德锝',
'': '着找招',
'': '向象想',
'': '等灯登',
'': '谢写卸',
'': '对队',
'': '里理鲤',
'': '啦拉喇',
'': '吃持迟',
'': '哦喔噢',
'': '呀压',
'': '',
'': '太抬台',
'': '',
'': '',
'': '以已',
'': '因应',
'': '啥沙傻',
'': '行型形',
'': '哈蛤铪',
'': '嘿黑嗨',
'': '嗯恩摁',
'': '哎爱埃',
'': '呜屋污',
'': '喂位未',
'': '嘛麻马',
'': '嗨害亥',
'': '哇娃蛙',
'': '咦意易',
'': '嘻西希'
}
def random_remove_punctuation(text: str) -> str:
"""随机处理标点符号,模拟人类打字习惯
@@ -364,7 +309,7 @@ def random_remove_punctuation(text: str) -> str:
"""
result = ''
text_len = len(text)
for i, char in enumerate(text):
        if char == '。' and i == text_len - 1: # 结尾的句号
if random.random() > 0.4: # 80%概率删除结尾句号
@@ -379,32 +324,30 @@ def random_remove_punctuation(text: str) -> str:
result += char
return result
def add_typos(text: str) -> str:
TYPO_RATE = 0.02 # 控制错别字出现的概率(2%)
result = ""
for char in text:
if char in TYPO_DICT and random.random() < TYPO_RATE:
# 从可能的错别字中随机选择一个
typos = TYPO_DICT[char]
result += random.choice(typos)
else:
result += char
return result
def process_llm_response(text: str) -> List[str]:
# processed_response = process_text_with_typos(content)
if len(text) > 200:
print(f"回复过长 ({len(text)} 字符),返回默认回复")
return ['懒得说']
if len(text) > 300:
print(f"回复过长 ({len(text)} 字符),返回默认回复")
return ['懒得说']
# 处理长消息
sentences = split_into_sentences_w_remove_punctuation(add_typos(text))
typo_generator = ChineseTypoGenerator(
error_rate=0.03,
min_freq=7,
tone_error_rate=0.2,
word_replace_rate=0.02
)
typoed_text = typo_generator.create_typo_sentence(text)[0]
sentences = split_into_sentences_w_remove_punctuation(typoed_text)
# 检查分割后的消息数量是否过多超过3条
if len(sentences) > 3:
if len(sentences) > 4:
print(f"分割后消息数量过多 ({len(sentences)} 条),返回默认回复")
return [f'{global_config.BOT_NICKNAME}不知道哦']
return sentences
def calculate_typing_time(input_string: str, chinese_time: float = 0.2, english_time: float = 0.1) -> float:
"""
计算输入字符串所需的时间,中文和英文字符有不同的输入时间
@@ -417,7 +360,46 @@ def calculate_typing_time(input_string: str, chinese_time: float = 0.2, english_
if '\u4e00' <= char <= '\u9fff': # 判断是否为中文字符
total_time += chinese_time
else: # 其他字符(如英文)
total_time += english_time
total_time += english_time
return total_time
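
# Worked example for calculate_typing_time (defaults: 0.2 s per CJK character, 0.1 s otherwise):
#   calculate_typing_time("你好ok")  -> 2 * 0.2 + 2 * 0.1 ≈ 0.6 s
#   calculate_typing_time("hello")   -> 5 * 0.1 = 0.5 s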
def cosine_similarity(v1, v2):
"""计算余弦相似度"""
dot_product = np.dot(v1, v2)
norm1 = np.linalg.norm(v1)
norm2 = np.linalg.norm(v2)
if norm1 == 0 or norm2 == 0:
return 0
return dot_product / (norm1 * norm2)
def text_to_vector(text):
"""将文本转换为词频向量"""
# 分词
words = jieba.lcut(text)
# 统计词频
word_freq = Counter(words)
return word_freq
def find_similar_topics_simple(text: str, topics: list, top_k: int = 5) -> list:
"""使用简单的余弦相似度计算文本相似度"""
# 将输入文本转换为词频向量
text_vector = text_to_vector(text)
# 计算每个主题的相似度
similarities = []
for topic in topics:
topic_vector = text_to_vector(topic)
# 获取所有唯一词
all_words = set(text_vector.keys()) | set(topic_vector.keys())
# 构建向量
v1 = [text_vector.get(word, 0) for word in all_words]
v2 = [topic_vector.get(word, 0) for word in all_words]
# 计算相似度
similarity = cosine_similarity(v1, v2)
similarities.append((topic, similarity))
# 按相似度降序排序并返回前k个
return sorted(similarities, key=lambda x: x[1], reverse=True)[:top_k]
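
# Quick usage sketch of the bag-of-words matcher above (toy topics; scores depend on how jieba
# segments the input). Note that utils.py now defines cosine_similarity twice; the later,
# zero-norm-safe definition is the one bound at import time.
for topic, score in find_similar_topics_simple("今天天气真好", ["天气", "编程", "午饭吃什么"], top_k=2):
    print(topic, round(score, 2))
# Expected shape: the weather topic ranks first with a positive score, the unrelated topic scores 0.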

View File

@@ -4,6 +4,7 @@ import hashlib
import time
import os
from ...common.database import Database
from ..chat.config import global_config
import zlib # 用于 CRC32
import base64
from nonebot import get_driver
@@ -143,6 +144,8 @@ def storage_emoji(image_data: bytes) -> bytes:
Returns:
bytes: 原始图片数据
"""
if not global_config.EMOJI_SAVE:
return image_data
try:
# 使用 CRC32 计算哈希值
hash_value = format(zlib.crc32(image_data) & 0xFFFFFFFF, 'x')
@@ -227,7 +230,7 @@ def compress_base64_image_by_scale(base64_data: str, target_size: int = 0.8 * 10
image_data = base64.b64decode(base64_data)
# 如果已经小于目标大小,直接返回原图
if len(image_data) <= target_size:
if len(image_data) <= 2*1024*1024:
return base64_data
# 将字节数据转换为图片对象
@@ -252,7 +255,7 @@ def compress_base64_image_by_scale(base64_data: str, target_size: int = 0.8 * 10
for frame_idx in range(img.n_frames):
img.seek(frame_idx)
new_frame = img.copy()
new_frame = new_frame.resize((new_width, new_height), Image.Resampling.LANCZOS)
new_frame = new_frame.resize((new_width//2, new_height//2), Image.Resampling.LANCZOS) # 动图折上折
frames.append(new_frame)
# 保存到缓冲区
@@ -286,4 +289,19 @@ def compress_base64_image_by_scale(base64_data: str, target_size: int = 0.8 * 10
logger.error(f"压缩图片失败: {str(e)}")
import traceback
logger.error(traceback.format_exc())
return base64_data
return base64_data
def image_path_to_base64(image_path: str) -> str:
"""将图片路径转换为base64编码
Args:
image_path: 图片文件路径
Returns:
str: base64编码的图片数据
"""
try:
with open(image_path, 'rb') as f:
image_data = f.read()
return base64.b64encode(image_data).decode('utf-8')
except Exception as e:
logger.error(f"读取图片失败: {image_path}, 错误: {str(e)}")
return None
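
# Brief usage sketch of the helper above (the path is a placeholder):
import base64

b64 = image_path_to_base64("data/emoji/example.png")  # placeholder path
if b64 is not None:
    raw = base64.b64decode(b64)  # decodes back to the original bytes fed to the VLM / storage
    print(len(raw), "bytes")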

View File

@@ -34,16 +34,16 @@ class WillingManager:
print(f"被重复提及, 当前意愿: {current_willing}")
if is_emoji:
current_willing *= 0.15
current_willing *= 0.1
print(f"表情包, 当前意愿: {current_willing}")
if interested_rate > 0.65:
if interested_rate > 0.4:
print(f"兴趣度: {interested_rate}, 当前意愿: {current_willing}")
current_willing += interested_rate-0.6
current_willing += interested_rate-0.1
self.group_reply_willing[group_id] = min(current_willing, 3.0)
reply_probability = max((current_willing - 0.55) * 1.9, 0)
reply_probability = max((current_willing - 0.45) * 2, 0)
if group_id not in config.talk_allowed_groups:
current_willing = 0
reply_probability = 0
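
# With the new constants, reply probability is max((willing - 0.45) * 2, 0) and the stored
# willingness is capped at 3.0. A standalone sketch of how the visible factors combine
# (the starting willingness value is an assumption used only to illustrate the formula):
def reply_probability(willing: float, is_emoji: bool, interested_rate: float) -> float:
    if is_emoji:
        willing *= 0.1                    # emoji-only messages are mostly ignored
    if interested_rate > 0.4:
        willing += interested_rate - 0.1  # strong memory activation raises willingness
    return max((willing - 0.45) * 2, 0)

# e.g. starting willingness 0.5, text message, activation 0.6:
#   willing = 0.5 + (0.6 - 0.1) = 1.0  ->  probability = (1.0 - 0.45) * 2 = 1.1, i.e. always reply
print(reply_probability(0.5, False, 0.6))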