feat:合并自我处理器和关系处理器

This commit is contained in:
SengokuCola
2025-06-21 15:46:53 +08:00
parent fd09550af9
commit 0f5fdc2ae5
10 changed files with 327 additions and 598 deletions

View File

@@ -14,37 +14,13 @@ from src.manager.async_task_manager import AsyncTask
from src.llm_models.utils_model import LLMRequest
from src.config.config import global_config
from src.common.logger import get_logger
from src.person_info.person_info import get_person_info_manager
install(extra_lines=3)
logger = get_logger("individuality")
def init_prompt():
"""初始化用于关键词提取的prompts"""
extract_keywords_prompt = """
请分析以下对{bot_name}的描述,提取出其中的独立关键词。每个关键词应该是可以用来从某一角度概括的方面,例如:
性格,身高,喜好,外貌,身份,兴趣,爱好,习惯,等等..........
描述内容:
{personality_sides}
要求:
1. 选择关键词,对{bot_name}的某一方面进行概括
2. 用json格式输出以下是示例格式
{{
"性格":"性格开朗",
"兴趣":"喜欢唱歌",
"身份":"大学生",
}}
以上是一个例子,你可以输出多个关键词,现在请你根据描述内容进行总结{bot_name}输出json格式
请输出json格式不要输出任何解释或其他内容
"""
Prompt(extract_keywords_prompt, "extract_keywords_prompt")
class Individuality:
"""个体特征管理类"""
@@ -55,11 +31,8 @@ class Individuality:
self.express_style: PersonalityExpression = PersonalityExpression()
self.name = ""
# 关键词缓存相关
self.keyword_info_cache: dict = {} # {keyword: [info_list]}
self.fetch_info_file_path = "data/personality/fetch_info.json"
self.meta_info_file_path = "data/personality/meta_info.json"
self.bot_person_id = ""
self.meta_info_file_path = "data/personality/meta.json"
async def initialize(
self,
@@ -76,6 +49,13 @@ class Individuality:
personality_sides: 人格侧面描述
identity_detail: 身份细节描述
"""
person_info_manager = get_person_info_manager()
self.bot_person_id = person_info_manager.get_person_id("system", "bot_id")
self.name = bot_nickname
# 检查配置变化,如果变化则清空
await self._check_config_and_clear_if_changed(bot_nickname, personality_core, personality_sides, identity_detail)
# 初始化人格
self.personality = Personality.initialize(
bot_nickname=bot_nickname, personality_core=personality_core, personality_sides=personality_sides
@@ -84,13 +64,31 @@ class Individuality:
# 初始化身份
self.identity = Identity(identity_detail=identity_detail)
# 将所有人设写入impression
impression_parts = []
if personality_core:
impression_parts.append(f"核心人格: {personality_core}")
if personality_sides:
impression_parts.append(f"人格侧面: {''.join(personality_sides)}")
if identity_detail:
impression_parts.append(f"身份: {''.join(identity_detail)}")
impression_text = "".join(impression_parts)
if impression_text:
impression_text += ""
if impression_text:
update_data = {
"platform": "system",
"user_id": "bot_id",
"person_name": self.name,
"nickname": self.name,
}
await person_info_manager.update_one_field(self.bot_person_id, "impression", impression_text, data=update_data)
logger.info(f"已将完整人设更新到bot的impression中")
await self.express_style.extract_and_store_personality_expressions()
self.name = bot_nickname
# 预处理关键词和生成信息缓存
await self._preprocess_personality_keywords(personality_sides, identity_detail)
def to_dict(self) -> dict:
"""将个体特征转换为字典格式"""
return {
@@ -257,227 +255,64 @@ class Individuality:
return self.personality.neuroticism
return None
def _get_config_hash(self, personality_sides: list, identity_detail: list) -> str:
def _get_config_hash(self, bot_nickname: str, personality_core: str, personality_sides: list, identity_detail: list) -> str:
"""获取当前personality和identity配置的哈希值"""
# 将配置转换为字符串并排序,确保一致性
config_str = json.dumps(
{"personality_sides": sorted(personality_sides), "identity_detail": sorted(identity_detail)}, sort_keys=True
)
config_data = {
"nickname": bot_nickname,
"personality_core": personality_core,
"personality_sides": sorted(personality_sides),
"identity_detail": sorted(identity_detail)
}
config_str = json.dumps(config_data, sort_keys=True)
return hashlib.md5(config_str.encode("utf-8")).hexdigest()
async def _check_config_and_clear_if_changed(
self, bot_nickname: str, personality_core: str, personality_sides: list, identity_detail: list
):
"""检查配置是否发生变化如果变化则清空info_list"""
person_info_manager = get_person_info_manager()
current_hash = self._get_config_hash(bot_nickname, personality_core, personality_sides, identity_detail)
meta_info = self._load_meta_info()
stored_hash = meta_info.get("config_hash")
if current_hash != stored_hash:
logger.info("检测到人格配置发生变化,将清空原有的关键词缓存。")
# 清空数据库中的info_list
update_data = {
"platform": "system",
"user_id": "bot_id",
"person_name": self.name,
"nickname": self.name,
}
await person_info_manager.update_one_field(self.bot_person_id, "info_list", [], data=update_data)
# 更新元信息文件,重置计数器
new_meta_info = {"config_hash": current_hash}
self._save_meta_info(new_meta_info)
def _load_meta_info(self) -> dict:
"""从JSON文件中加载元信息"""
if os.path.exists(self.meta_info_file_path):
try:
with open(self.meta_info_file_path, "r", encoding="utf-8") as f:
return json.load(f)
except Exception as e:
print(f"读取meta_info文件失败: {e}")
except (json.JSONDecodeError, IOError) as e:
logger.error(f"读取meta_info文件失败: {e}, 将创建新文件。")
return {}
return {}
def _save_meta_info(self, meta_info: dict):
"""将元信息保存到JSON文件"""
try:
# 确保目录存在
os.makedirs(os.path.dirname(self.meta_info_file_path), exist_ok=True)
with open(self.meta_info_file_path, "w", encoding="utf-8") as f:
json.dump(meta_info, f, ensure_ascii=False, indent=2)
except Exception as e:
print(f"保存meta_info文件失败: {e}")
except IOError as e:
logger.error(f"保存meta_info文件失败: {e}")
def _check_config_change_and_clear(self, personality_sides: list, identity_detail: list):
"""检查配置是否发生变化如果变化则清空fetch_info.json"""
current_config_hash = self._get_config_hash(personality_sides, identity_detail)
meta_info = self._load_meta_info()
stored_config_hash = meta_info.get("config_hash", "")
if current_config_hash != stored_config_hash:
logger.info("检测到personality或identity配置发生变化清空fetch_info数据")
# 清空fetch_info文件
if os.path.exists(self.fetch_info_file_path):
try:
os.remove(self.fetch_info_file_path)
logger.info("已清空fetch_info文件")
except Exception as e:
logger.error(f"清空fetch_info文件失败: {e}")
# 更新元信息
meta_info["config_hash"] = current_config_hash
self._save_meta_info(meta_info)
logger.info("已更新配置哈希值")
def _load_fetch_info_from_file(self) -> dict:
"""从JSON文件中加载已保存的fetch_info数据"""
if os.path.exists(self.fetch_info_file_path):
try:
with open(self.fetch_info_file_path, "r", encoding="utf-8") as f:
return json.load(f)
except Exception as e:
logger.error(f"读取fetch_info文件失败: {e}")
return {}
return {}
def _save_fetch_info_to_file(self, fetch_info_data: dict):
"""将fetch_info数据保存到JSON文件"""
try:
# 确保目录存在
os.makedirs(os.path.dirname(self.fetch_info_file_path), exist_ok=True)
with open(self.fetch_info_file_path, "w", encoding="utf-8") as f:
json.dump(fetch_info_data, f, ensure_ascii=False, indent=2)
except Exception as e:
logger.error(f"保存fetch_info文件失败: {e}")
async def _preprocess_personality_keywords(self, personality_sides: list, identity_detail: list):
"""预处理personality关键词提取关键词并生成缓存"""
try:
logger.info("开始预处理personality关键词...")
# 检查配置变化
self._check_config_change_and_clear(personality_sides, identity_detail)
# 加载已有的预处理数据(如果存在)
fetch_info_data = self._load_fetch_info_from_file()
logger.info(f"加载已有数据,现有关键词数量: {len(fetch_info_data)}")
# 构建完整描述personality + identity
personality_sides_str = ""
for personality_side in personality_sides:
personality_sides_str += f"{personality_side}"
# 添加identity内容
for detail in identity_detail:
personality_sides_str += f"{detail}"
if not personality_sides_str:
logger.info("没有personality和identity配置跳过预处理")
return
# 提取关键词
extract_prompt = (await global_prompt_manager.get_prompt_async("extract_keywords_prompt")).format(
personality_sides=personality_sides_str, bot_name=self.name
)
llm_model = LLMRequest(
model=global_config.model.utils_small,
request_type="individuality.keyword_extract",
)
keywords_result, _ = await llm_model.generate_response_async(prompt=extract_prompt)
logger.info(f"LLM返回的原始关键词结果: '{keywords_result}'")
if not keywords_result or keywords_result.strip() == "none":
logger.info("未提取到有效关键词")
return
# 使用json_repair修复并解析JSON
keyword_dict = json.loads(repair_json(keywords_result))
logger.info(f"成功解析JSON格式的关键词: {keyword_dict}")
# 从字典中提取关键词列表,跳过"keywords"键
keyword_set = []
for key, _value in keyword_dict.items():
if key.lower() != "keywords" and key.strip():
keyword_set.append(key.strip())
logger.info(f"最终提取的关键词列表: {keyword_set}")
logger.info(f"共提取到 {len(keyword_set)} 个关键词")
# 处理每个关键词的信息
updated_count = 0
new_count = 0
for keyword in keyword_set:
try:
logger.info(f"正在处理关键词: '{keyword}' (长度: {len(keyword)})")
# 检查是否已存在该关键词
if keyword in fetch_info_data:
logger.info(f"关键词 '{keyword}' 已存在,将添加新信息...")
action_type = "追加"
else:
logger.info(f"正在为新关键词 '{keyword}' 生成信息...")
action_type = "新增"
fetch_info_data[keyword] = [] # 初始化为空列表
# 从JSON结果中获取关键词的信息
existing_info_from_json = keyword_dict.get(keyword, "")
if (
existing_info_from_json
and existing_info_from_json.strip()
and existing_info_from_json != keyword
):
# 如果JSON中有有效信息且不只是重复关键词本身直接使用
logger.info(f"从JSON结果中获取到关键词 '{keyword}' 的信息: '{existing_info_from_json}'")
if existing_info_from_json not in fetch_info_data[keyword]:
fetch_info_data[keyword].append(existing_info_from_json)
if action_type == "追加":
updated_count += 1
else:
new_count += 1
logger.info(f"{action_type}关键词 '{keyword}' 的信息成功")
else:
logger.info(f"关键词 '{keyword}' 的信息已存在,跳过重复添加")
else:
logger.info(f"关键词 '{keyword}' 在JSON中没有有效信息跳过")
except Exception as e:
logger.error(f"为关键词 '{keyword}' 生成信息时出错: {e}")
continue
# 保存合并后的数据到文件和内存缓存
if updated_count > 0 or new_count > 0:
self._save_fetch_info_to_file(fetch_info_data)
logger.info(
f"预处理完成,新增 {new_count} 个关键词,追加 {updated_count} 个关键词信息,总计 {len(fetch_info_data)} 个关键词"
)
else:
logger.info("预处理完成,但没有生成任何新的有效信息")
# 将数据加载到内存缓存
self.keyword_info_cache = fetch_info_data
logger.info(f"关键词缓存已加载,共 {len(self.keyword_info_cache)} 个关键词")
# 注册定时任务(延迟执行,避免阻塞初始化)
import asyncio
asyncio.create_task(self._register_keyword_update_task_delayed())
except Exception as e:
logger.error(f"预处理personality关键词时出错: {e}")
traceback.print_exc()
async def _register_keyword_update_task_delayed(self):
"""延迟注册关键词更新定时任务"""
try:
# 等待一小段时间确保系统完全初始化
import asyncio
await asyncio.sleep(5)
from src.manager.async_task_manager import async_task_manager
logger = get_logger("individuality")
# 创建定时任务
task = KeywordUpdateTask(
personality_sides=list(global_config.personality.personality_sides),
identity_detail=list(global_config.identity.identity_detail),
individuality_instance=self,
)
# 注册任务
await async_task_manager.add_task(task)
logger.info("关键词更新定时任务已注册")
except Exception as e:
logger.error(f"注册关键词更新定时任务失败: {e}")
traceback.print_exc()
def get_keyword_info(self, keyword: str) -> str:
async def get_keyword_info(self, keyword: str) -> str:
"""获取指定关键词的信息
Args:
@@ -486,13 +321,36 @@ class Individuality:
Returns:
str: 随机选择的一条信息,如果没有则返回空字符串
"""
if keyword in self.keyword_info_cache and self.keyword_info_cache[keyword]:
return random.choice(self.keyword_info_cache[keyword])
person_info_manager = get_person_info_manager()
info_list_json = await person_info_manager.get_value(self.bot_person_id, "info_list")
if info_list_json:
try:
# get_value might return a pre-deserialized list if it comes from a cache,
# or a JSON string if it comes from DB.
info_list = json.loads(info_list_json) if isinstance(info_list_json, str) else info_list_json
for item in info_list:
if isinstance(item, dict) and item.get("info_type") == keyword:
return item.get("info_content", "")
except (json.JSONDecodeError, TypeError):
logger.error(f"解析info_list失败: {info_list_json}")
return ""
return ""
def get_all_keywords(self) -> list:
async def get_all_keywords(self) -> list:
"""获取所有已缓存的关键词列表"""
return list(self.keyword_info_cache.keys())
person_info_manager = get_person_info_manager()
info_list_json = await person_info_manager.get_value(self.bot_person_id, "info_list")
keywords = []
if info_list_json:
try:
info_list = json.loads(info_list_json) if isinstance(info_list_json, str) else info_list_json
for item in info_list:
if isinstance(item, dict) and "info_type" in item:
keywords.append(item["info_type"])
except (json.JSONDecodeError, TypeError):
logger.error(f"解析info_list失败: {info_list_json}")
return keywords
individuality = None
@@ -503,66 +361,3 @@ def get_individuality():
if individuality is None:
individuality = Individuality()
return individuality
class KeywordUpdateTask(AsyncTask):
"""关键词更新定时任务"""
def __init__(self, personality_sides: list, identity_detail: list, individuality_instance):
# 调用父类构造函数
super().__init__(
task_name="keyword_update_task",
wait_before_start=3600, # 1小时后开始
run_interval=3600, # 每小时运行一次
)
self.personality_sides = personality_sides
self.identity_detail = identity_detail
self.individuality_instance = individuality_instance
# 任务控制参数
self.max_runs = 20
self.current_runs = 0
self.original_config_hash = individuality_instance._get_config_hash(personality_sides, identity_detail)
async def run(self):
"""执行任务"""
try:
from src.common.logger import get_logger
logger = get_logger("individuality.task")
# 检查是否超过最大运行次数
if self.current_runs >= self.max_runs:
logger.info(f"关键词更新任务已达到最大运行次数({self.max_runs}),停止执行")
# 设置为0间隔来停止循环任务
self.run_interval = 0
return
# 检查配置是否发生变化
current_config_hash = self.individuality_instance._get_config_hash(
self.personality_sides, self.identity_detail
)
if current_config_hash != self.original_config_hash:
logger.info("检测到personality或identity配置发生变化停止定时任务")
# 设置为0间隔来停止循环任务
self.run_interval = 0
return
self.current_runs += 1
logger.info(f"开始执行关键词更新任务 (第{self.current_runs}/{self.max_runs}次)")
# 执行关键词预处理
await self.individuality_instance._preprocess_personality_keywords(
self.personality_sides, self.identity_detail
)
logger.info(f"关键词更新任务完成 (第{self.current_runs}/{self.max_runs}次)")
except Exception as e:
logger.error(f"关键词更新任务执行失败: {e}")
traceback.print_exc()
# 初始化prompt模板
init_prompt()