新增管理个人信息的person_info,将关系值并入其中,通过person_info统一管理,支持扩展

This commit is contained in:
meng_xi_pan
2025-04-02 00:11:16 +08:00
parent 9ee3f7ca10
commit 7d4e687017
4 changed files with 231 additions and 16 deletions

View File

@@ -4,7 +4,7 @@ from .plugins.utils.statistic import LLMStatistics
from .plugins.moods.moods import MoodManager
from .plugins.schedule.schedule_generator import bot_schedule
from .plugins.chat.emoji_manager import emoji_manager
from .plugins.chat.relationship_manager import relationship_manager
from .plugins.chat.person_info import person_info_manager
from .plugins.willing.willing_manager import willing_manager
from .plugins.chat.chat_stream import chat_manager
from .heart_flow.heartflow import heartflow
@@ -55,9 +55,8 @@ class MainSystem:
self.mood_manager.start_mood_update(update_interval=global_config.mood_update_interval)
logger.success("情绪管理器启动成功")
# 加载用户关系
await relationship_manager.load_all_relationships()
asyncio.create_task(relationship_manager._start_relationship_manager())
# 检查并清除person_info冗余字段
await person_info_manager.del_all_undefined_field()
# 启动愿望管理器
await willing_manager.ensure_started()

View File

@@ -158,6 +158,10 @@ class ChatBot:
else:
mes_name = "私聊"
if message.message_info.additional_config:
if "maimcore_reply_probability_gain" in message.message_info.additional_config.keys():
reply_probability += message.message_info.additional_config["maimcore_reply_probability_gain"]
# 打印收到的信息的信息
current_time = time.strftime("%H:%M:%S", time.localtime(messageinfo.time))
logger.info(
@@ -166,10 +170,6 @@ class ChatBot:
f"{message.processed_plain_text}[回复意愿:{current_willing:.2f}][概率:{reply_probability * 100:.1f}%]"
)
if message.message_info.additional_config:
if "maimcore_reply_probability_gain" in message.message_info.additional_config.keys():
reply_probability += message.message_info.additional_config["maimcore_reply_probability_gain"]
do_reply = False
# 开始组织语言
if random() < reply_probability:
@@ -191,7 +191,7 @@ class ChatBot:
timing_results["思考前脑内状态"] = timer2 - timer1
timer1 = time.time()
response_set = await self.gpt.generate_response(message)
response_set, undivided_response = await self.gpt.generate_response(message)
timer2 = time.time()
timing_results["生成回复"] = timer2 - timer1
@@ -223,6 +223,9 @@ class ChatBot:
response_msg = " ".join(response_set) if response_set else "无回复"
logger.info(f"触发消息: {trigger_msg[:20]}... | 生成消息: {response_msg[:20]}... | 性能计时: {timing_str}")
# 更新情绪和关系
await self._update_emotion_and_relationship(message, chat, undivided_response)
async def _update_using_response(self, message, response_set):
# 更新心流状态
stream_id = message.chat_stream.stream_id
@@ -310,17 +313,15 @@ class ChatBot:
)
message_manager.add_message(bot_message)
async def _update_emotion_and_relationship(self, message, chat, response, raw_content):
async def _update_emotion_and_relationship(self, message, chat, undivided_response):
"""更新情绪和关系
Args:
message: 接收到的消息
chat: 聊天流对象
response: 生成的回复
raw_content: 原始内容
undivided_response: 生成的未分割回复
"""
stance, emotion = await self.gpt._get_emotion_tags(raw_content, message.processed_plain_text)
logger.debug(f"'{response}' 立场为:{stance} 获取到的情感标签为:{emotion}")
stance, emotion = await self.gpt._get_emotion_tags(undivided_response, message.processed_plain_text)
await relationship_manager.calculate_update_relationship_value(chat_stream=chat, label=emotion, stance=stance)
self.mood_manager.update_mood_from_emotion(emotion, global_config.mood_intensity_factor)

View File

@@ -59,6 +59,7 @@ class ResponseGenerator:
current_model = self.model_normal
model_response = await self._generate_response_with_model(message, current_model)
undivided_response = model_response
# print(f"raw_content: {model_response}")
@@ -66,10 +67,10 @@ class ResponseGenerator:
logger.info(f"{global_config.BOT_NICKNAME}的回复是:{model_response}")
model_response = await self._process_response(model_response)
return model_response
return model_response, undivided_response
else:
logger.info(f"{self.current_model_type}思考,失败")
return None
return None, None
async def _generate_response_with_model(self, message: MessageThinking, model: LLM_request):
sender_name = ""

View File

@@ -0,0 +1,214 @@
from src.common.logger import get_module_logger
from ...common.database import db
import copy
import hashlib
from typing import Any, Callable, Dict, TypeVar
T = TypeVar('T') # 泛型类型
"""
PersonInfoManager 类方法功能摘要:
1. get_person_id - 根据平台和用户ID生成MD5哈希的唯一person_id
2. create_person_info - 创建新个人信息文档(自动合并默认值)
3. update_one_field - 更新单个字段值(若文档不存在则创建)
4. del_one_document - 删除指定person_id的文档
5. get_value - 获取单个字段值(返回实际值或默认值)
6. get_values - 批量获取字段值(任一字段无效则返回空字典)
7. del_all_undefined_field - 清理全集合中未定义的字段
8. get_specific_value_list - 根据指定条件返回person_id,value字典
"""
logger = get_module_logger("person_info")
person_info_default = {
"person_id" : None,
"platform" : None,
"user_id" : None,
"nickname" : None,
# "age" : 0,
"relationship_value" : 0,
# "saved" : True,
# "impression" : None,
# "gender" : Unkown,
"konw_time" : 0,
} # 个人信息的各项与默认值在此定义,以下处理会自动创建/补全每一项
class PersonInfoManager:
def __init__(self):
if "person_info" not in db.list_collection_names():
db.create_collection("person_info")
db.person_info.create_index("person_id", unique=True)
def get_person_id(self, platform:str, user_id:int):
"""获取唯一id"""
components = [platform, str(user_id)]
key = "_".join(components)
return hashlib.md5(key.encode()).hexdigest()
async def create_person_info(self, person_id:str, data:dict = {}):
"""创建一个项"""
if not person_id:
logger.debug("创建失败personid不存在")
return
_person_info_default = copy.deepcopy(person_info_default)
_person_info_default["person_id"] = person_id
if data:
for key in _person_info_default:
if key != "person_id" and key in data:
_person_info_default[key] = data[key]
db.person_info.insert_one(_person_info_default)
async def update_one_field(self, person_id:str, field_name:str, value, Data:dict = {}):
"""更新某一个字段,会补全"""
if not field_name in person_info_default.keys():
logger.debug(f"更新'{field_name}'失败,未定义的字段")
return
document = db.person_info.find_one({"person_id": person_id})
if document:
db.person_info.update_one(
{"person_id": person_id},
{"$set": {field_name: value}}
)
else:
Data[field_name] = value
logger.debug(f"更新时{person_id}不存在,已新建")
await self.create_person_info(person_id, Data)
async def del_one_document(self, person_id: str):
"""删除指定 person_id 的文档"""
if not person_id:
logger.debug("删除失败person_id 不能为空")
return
result = db.person_info.delete_one({"person_id": person_id})
if result.deleted_count > 0:
logger.debug(f"删除成功person_id={person_id}")
else:
logger.debug(f"删除失败:未找到 person_id={person_id}")
async def get_value(self, person_id: str, field_name: str):
"""获取指定person_id文档的字段值若不存在该字段则返回该字段的全局默认值"""
if not person_id:
logger.debug("get_value获取失败person_id不能为空")
return None
if field_name not in person_info_default:
logger.debug(f"get_value获取失败字段'{field_name}'未定义")
return None
document = db.person_info.find_one(
{"person_id": person_id},
{field_name: 1}
)
if document and field_name in document:
return document[field_name]
else:
logger.debug(f"获取{person_id}{field_name}失败,已返回默认值{person_info_default[field_name]}")
return person_info_default[field_name]
async def get_values(self, person_id: str, field_names: list) -> dict:
"""获取指定person_id文档的多个字段值若不存在该字段则返回该字段的全局默认值"""
if not person_id:
logger.debug("get_values获取失败person_id不能为空")
return {}
# 检查所有字段是否有效
for field in field_names:
if field not in person_info_default:
logger.debug(f"get_values获取失败字段'{field}'未定义")
return {}
# 构建查询投影(所有字段都有效才会执行到这里)
projection = {field: 1 for field in field_names}
document = db.person_info.find_one(
{"person_id": person_id},
projection
)
result = {}
for field in field_names:
result[field] = document.get(field, person_info_default[field]) if document else person_info_default[field]
return result
async def del_all_undefined_field(self):
"""删除所有项里的未定义字段"""
# 获取所有已定义的字段名
defined_fields = set(person_info_default.keys())
try:
# 遍历集合中的所有文档
for document in db.person_info.find({}):
# 找出文档中未定义的字段
undefined_fields = set(document.keys()) - defined_fields - {'_id'}
if undefined_fields:
# 构建更新操作,使用$unset删除未定义字段
update_result = db.person_info.update_one(
{'_id': document['_id']},
{'$unset': {field: 1 for field in undefined_fields}}
)
if update_result.modified_count > 0:
logger.debug(f"已清理文档 {document['_id']} 的未定义字段: {undefined_fields}")
return
except Exception as e:
logger.error(f"清理未定义字段时出错: {e}")
return
async def get_specific_value_list(
self,
field_name: str,
way: Callable[[Any], bool], # 接受任意类型值
) ->Dict[str, Any]:
"""
获取满足条件的字段值字典
Args:
field_name: 目标字段名
way: 判断函数 (value: Any) -> bool
convert_type: 强制类型转换如float/int等
Returns:
{person_id: value} | {}
Example:
# 查找所有nickname包含"admin"的用户
result = manager.specific_value_list(
"nickname",
lambda x: "admin" in x.lower()
)
"""
if field_name not in person_info_default:
logger.error(f"字段检查失败:'{field_name}'未定义")
return {}
try:
result = {}
for doc in db.person_info.find(
{field_name: {"$exists": True}},
{"person_id": 1, field_name: 1, "_id": 0}
):
try:
value = doc[field_name]
if way(value):
result[doc["person_id"]] = value
except (KeyError, TypeError, ValueError) as e:
logger.debug(f"记录{doc.get('person_id')}处理失败: {str(e)}")
continue
return result
except Exception as e:
logger.error(f"数据库查询失败: {str(e)}", exc_info=True)
return {}
person_info_manager = PersonInfoManager()