feat: 新增统一命名功能,自动替换prompt内唯一标识符,优化prompt效果

This commit is contained in:
SengokuCola
2025-04-16 01:13:18 +08:00
parent 42ccb3dccb
commit 9fc74cb066
13 changed files with 443 additions and 129 deletions

View File

@@ -6,6 +6,9 @@ from typing import Any, Callable, Dict
import datetime
import asyncio
import numpy as np
from src.plugins.models.utils_model import LLM_request
from src.plugins.config.config import global_config
from src.individuality.individuality import Individuality
import matplotlib
@@ -13,6 +16,8 @@ matplotlib.use("Agg")
import matplotlib.pyplot as plt
from pathlib import Path
import pandas as pd
import json
import re
"""
@@ -32,6 +37,8 @@ logger = get_module_logger("person_info")
person_info_default = {
"person_id": None,
"person_name": None,
"name_reason": None,
"platform": None,
"user_id": None,
"nickname": None,
@@ -48,16 +55,46 @@ person_info_default = {
class PersonInfoManager:
def __init__(self):
self.person_name_list = {}
self.qv_name_llm = LLM_request(
model=global_config.llm_normal,
max_tokens=256,
request_type="qv_name",
)
if "person_info" not in db.list_collection_names():
db.create_collection("person_info")
db.person_info.create_index("person_id", unique=True)
# 初始化时读取所有person_name
cursor = db.person_info.find(
{"person_name": {"$exists": True}},
{"person_id": 1, "person_name": 1, "_id": 0}
)
for doc in cursor:
if doc.get("person_name"):
self.person_name_list[doc["person_id"]] = doc["person_name"]
logger.debug(f"已加载 {len(self.person_name_list)} 个用户名称")
def get_person_id(self, platform: str, user_id: int):
"""获取唯一id"""
#如果platform中存在-,就截取-后面的部分
if "-" in platform:
platform = platform.split("-")[1]
components = [platform, str(user_id)]
key = "_".join(components)
return hashlib.md5(key.encode()).hexdigest()
def is_person_known(self, platform: str, user_id: int):
"""判断是否认识某人"""
person_id = self.get_person_id(platform, user_id)
document = db.person_info.find_one({"person_id": person_id})
if document:
return True
else:
return False
async def create_person_info(self, person_id: str, data: dict = None):
"""创建一个项"""
if not person_id:
@@ -88,6 +125,109 @@ class PersonInfoManager:
Data[field_name] = value
logger.debug(f"更新时{person_id}不存在,已新建")
await self.create_person_info(person_id, Data)
async def has_one_field(self, person_id: str, field_name: str):
"""判断是否存在某一个字段"""
document = db.person_info.find_one({"person_id": person_id}, {field_name: 1})
if document:
return True
else:
return False
def _extract_json_from_text(self, text: str) -> dict:
"""从文本中提取JSON数据的高容错方法"""
try:
# 尝试直接解析
return json.loads(text)
except json.JSONDecodeError:
try:
# 尝试找到JSON格式的部分
json_pattern = r'\{[^{}]*\}'
matches = re.findall(json_pattern, text)
if matches:
return json.loads(matches[0])
# 如果上面都失败了,尝试提取键值对
nickname_pattern = r'"nickname"[:\s]+"([^"]+)"'
reason_pattern = r'"reason"[:\s]+"([^"]+)"'
nickname_match = re.search(nickname_pattern, text)
reason_match = re.search(reason_pattern, text)
if nickname_match:
return {
"nickname": nickname_match.group(1),
"reason": reason_match.group(1) if reason_match else "未提供理由"
}
except Exception as e:
logger.error(f"JSON提取失败: {str(e)}")
# 如果所有方法都失败了,返回空结果
return {"nickname": "", "reason": ""}
async def qv_person_name(self, person_id: str, user_nickname: str, user_cardname: str, user_avatar: str):
"""给某个用户取名"""
if not person_id:
logger.debug("取名失败person_id不能为空")
return
old_name = await self.get_value(person_id, "person_name")
old_reason = await self.get_value(person_id, "name_reason")
max_retries = 5 # 最大重试次数
current_try = 0
existing_names = ""
while current_try < max_retries:
individuality = Individuality.get_instance()
prompt_personality = individuality.get_prompt(type="personality", x_person=2, level=1)
bot_name = individuality.personality.bot_nickname
qv_name_prompt = f"你是{bot_name},你{prompt_personality}"
qv_name_prompt += f"现在你想给一个用户取一个昵称用户是的qq昵称是{user_nickname}"
qv_name_prompt += f"用户的qq群昵称名是{user_cardname}"
if user_avatar:
qv_name_prompt += f"用户的qq头像是{user_avatar}"
if old_name:
qv_name_prompt += f"你之前叫他{old_name},是因为{old_reason}"
qv_name_prompt += "\n请根据以上用户信息想想你叫他什么比较好请最好使用用户的qq昵称可以稍作修改"
if existing_names:
qv_name_prompt += f"\n请注意,以下名称已被使用,不要使用以下昵称:{existing_names}\n"
qv_name_prompt += "请用json给出你的想法并给出理由示例如下"
qv_name_prompt += '''{
"nickname": "昵称",
"reason": "理由"
}'''
logger.debug(f"取名提示词:{qv_name_prompt}")
response = await self.qv_name_llm.generate_response(qv_name_prompt)
logger.debug(f"取名回复:{response}")
result = self._extract_json_from_text(response[0])
if not result["nickname"]:
logger.error("生成的昵称为空,重试中...")
current_try += 1
continue
# 检查生成的昵称是否已存在
if result["nickname"] not in self.person_name_list.values():
# 更新数据库和内存中的列表
await self.update_one_field(person_id, "person_name", result["nickname"])
# await self.update_one_field(person_id, "nickname", user_nickname)
# await self.update_one_field(person_id, "avatar", user_avatar)
await self.update_one_field(person_id, "name_reason", result["reason"])
self.person_name_list[person_id] = result["nickname"]
logger.debug(f"用户 {person_id} 的名称已更新为 {result['nickname']},原因:{result['reason']}")
return result
else:
existing_names += f"{result['nickname']}"
logger.debug(f"生成的昵称 {result['nickname']} 已存在,重试中...")
current_try += 1
logger.error(f"{max_retries}次尝试后仍未能生成唯一昵称")
return None
async def del_one_document(self, person_id: str):
"""删除指定 person_id 的文档"""

View File

@@ -4,6 +4,8 @@ import math
from bson.decimal128 import Decimal128
from .person_info import person_info_manager
import time
import re
import traceback
relationship_config = LogConfig(
# 使用关系专用样式
@@ -74,6 +76,61 @@ class RelationshipManager:
return mood_value * coefficient
else:
return mood_value / coefficient
async def is_known_some_one(self, platform , user_id):
"""判断是否认识某人"""
is_known = person_info_manager.is_person_known(platform, user_id)
return is_known
async def is_qved_name(self, platform , user_id):
"""判断是否认识某人"""
person_id = person_info_manager.get_person_id(platform, user_id)
is_qved = await person_info_manager.has_one_field(person_id, "person_name")
old_name = await person_info_manager.get_value(person_id, "person_name")
print(f"old_name: {old_name}")
print(f"is_qved: {is_qved}")
if is_qved and old_name != None:
return True
else:
return False
async def first_knowing_some_one(self, platform , user_id, user_nickname, user_cardname, user_avatar):
"""判断是否认识某人"""
person_id = person_info_manager.get_person_id(platform,user_id)
await person_info_manager.update_one_field(person_id, "nickname", user_nickname)
# await person_info_manager.update_one_field(person_id, "user_cardname", user_cardname)
# await person_info_manager.update_one_field(person_id, "user_avatar", user_avatar)
await person_info_manager.qv_person_name(person_id, user_nickname, user_cardname, user_avatar)
async def convert_all_person_sign_to_person_name(self,input_text:str):
"""将所有人的<platform:user_id:nickname:cardname>格式转换为person_name"""
try:
# 使用正则表达式匹配<platform:user_id:nickname:cardname>格式
all_person = person_info_manager.person_name_list
pattern = r'<([^:]+):(\d+):([^:]+):([^>]+)>'
matches = re.findall(pattern, input_text)
# 遍历匹配结果,将<platform:user_id:nickname:cardname>替换为person_name
result_text = input_text
for platform, user_id, nickname, cardname in matches:
person_id = person_info_manager.get_person_id(platform, user_id)
# 默认使用昵称作为人名
person_name = nickname.strip() if nickname.strip() else cardname.strip()
if person_id in all_person:
if all_person[person_id] != None:
person_name = all_person[person_id]
print(f"将<{platform}:{user_id}:{nickname}:{cardname}>替换为{person_name}")
result_text = result_text.replace(f"<{platform}:{user_id}:{nickname}:{cardname}>", person_name)
return result_text
except Exception as e:
logger.error(traceback.format_exc())
return input_text
async def calculate_update_relationship_value(self, chat_stream: ChatStream, label: str, stance: str) -> tuple:
"""计算并变更关系值