feat:将关系改为及时构建

This commit is contained in:
SengokuCola
2025-06-08 17:44:41 +08:00
parent 2df83f0bec
commit 851a559b33
8 changed files with 191 additions and 91 deletions

View File

@@ -16,6 +16,8 @@ from src.chat.focus_chat.info.relation_info import RelationInfo
from json_repair import repair_json from json_repair import repair_json
from src.person_info.person_info import person_info_manager from src.person_info.person_info import person_info_manager
import json import json
import asyncio
from src.chat.utils.chat_message_builder import get_raw_msg_by_timestamp_with_chat
logger = get_logger("processor") logger = get_logger("processor")
@@ -37,6 +39,7 @@ def init_prompt():
1. 根据聊天记录的需求,如果需要你和某个人的信息,请输出你和这个人之间精简的信息 1. 根据聊天记录的需求,如果需要你和某个人的信息,请输出你和这个人之间精简的信息
2. 如果没有特别需要提及的信息,就不用输出这个人的信息 2. 如果没有特别需要提及的信息,就不用输出这个人的信息
3. 如果有人问你对他的看法或者关系,请输出你和这个人之间的信息 3. 如果有人问你对他的看法或者关系,请输出你和这个人之间的信息
4. 你可以完全不输出任何信息,或者不输出某个人
请从这些信息中提取出你对某人的了解信息,信息提取成一串文本: 请从这些信息中提取出你对某人的了解信息,信息提取成一串文本:
@@ -58,6 +61,11 @@ class RelationshipProcessor(BaseProcessor):
super().__init__() super().__init__()
self.subheartflow_id = subheartflow_id self.subheartflow_id = subheartflow_id
self.person_cache: Dict[str, Dict[str, any]] = {} # {person_id: {"info": str, "ttl": int, "start_time": float}}
self.pending_updates: Dict[str, Dict[str, any]] = (
{}
) # {person_id: {"start_time": float, "end_time": float, "grace_period_ttl": int, "chat_id": str}}
self.grace_period_rounds = 5
self.llm_model = LLMRequest( self.llm_model = LLMRequest(
model=global_config.model.relation, model=global_config.model.relation,
@@ -91,54 +99,109 @@ class RelationshipProcessor(BaseProcessor):
return [relation_info] return [relation_info]
async def relation_identify( async def relation_identify(
self, observations: Optional[List[Observation]] = None, self,
observations: Optional[List[Observation]] = None,
): ):
""" """
在回复前进行思考,生成内心想法并收集工具调用结果 在回复前进行思考,生成内心想法并收集工具调用结果
参数:
observations: 观察信息
返回:
如果return_prompt为False:
tuple: (current_mind, past_mind) 当前想法和过去的想法列表
如果return_prompt为True:
tuple: (current_mind, past_mind, prompt) 当前想法、过去的想法列表和使用的prompt
""" """
# 0. 从观察信息中提取所需数据
person_list = []
chat_observe_info = ""
is_group_chat = False
if observations:
for observation in observations:
if isinstance(observation, ChattingObservation):
is_group_chat = observation.is_group_chat
chat_observe_info = observation.get_observe_info()
person_list = observation.person_list
break
if observations is None: # 1. 处理等待更新的条目仅检查TTL不检查是否被重提
observations = [] persons_to_update_now = [] # 等待期结束,需要立即更新的用户
for observation in observations: for person_id, data in list(self.pending_updates.items()):
if isinstance(observation, ChattingObservation): data["grace_period_ttl"] -= 1
# 获取聊天元信息 if data["grace_period_ttl"] <= 0:
is_group_chat = observation.is_group_chat persons_to_update_now.append(person_id)
chat_target_info = observation.chat_target_info
chat_target_name = "对方" # 私聊默认名称 # 触发等待期结束的更新任务
if not is_group_chat and chat_target_info: for person_id in persons_to_update_now:
# 优先使用person_name其次user_nickname最后回退到默认值 if person_id in self.pending_updates:
chat_target_name = ( update_data = self.pending_updates.pop(person_id)
chat_target_info.get("person_name") or chat_target_info.get("user_nickname") or chat_target_name logger.info(f"{self.log_prefix} 用户 {person_id} 等待期结束,开始印象更新。")
asyncio.create_task(
self.update_impression_on_cache_expiry(
person_id, update_data["chat_id"], update_data["start_time"], update_data["end_time"]
) )
# 获取聊天内容 )
chat_observe_info = observation.get_observe_info()
person_list = observation.person_list
nickname_str = "" # 2. 维护活动缓存,并将过期条目移至等待区或立即更新
for nicknames in global_config.bot.alias_names: persons_moved_to_pending = []
nickname_str += f"{nicknames}," for person_id, cache_data in self.person_cache.items():
cache_data["ttl"] -= 1
if cache_data["ttl"] <= 0:
persons_moved_to_pending.append(person_id)
for person_id in persons_moved_to_pending:
if person_id in self.person_cache:
cache_item = self.person_cache.pop(person_id)
start_time = cache_item.get("start_time")
end_time = time.time()
time_elapsed = end_time - start_time
impression_messages = get_raw_msg_by_timestamp_with_chat(self.subheartflow_id, start_time, end_time)
message_count = len(impression_messages)
if message_count > 50 or (time_elapsed > 600 and message_count > 20):
logger.info(
f"{self.log_prefix} 用户 {person_id} 缓存过期,满足立即更新条件 (消息数: {message_count}, 持续时间: {time_elapsed:.0f}s),立即更新。"
)
asyncio.create_task(
self.update_impression_on_cache_expiry(person_id, self.subheartflow_id, start_time, end_time)
)
else:
logger.info(f"{self.log_prefix} 用户 {person_id} 缓存过期,进入更新等待区。")
self.pending_updates[person_id] = {
"start_time": start_time,
"end_time": end_time,
"grace_period_ttl": self.grace_period_rounds,
"chat_id": self.subheartflow_id,
}
# 3. 准备LLM输入和直接使用缓存
if not person_list:
return ""
cached_person_info_str = ""
persons_to_process = []
person_name_list_for_llm = []
for person_id in person_list:
if person_id in self.person_cache:
logger.info(f"{self.log_prefix} 关系识别 (缓存): {person_id}")
person_name = await person_info_manager.get_value(person_id, "person_name")
info = self.person_cache[person_id]["info"]
cached_person_info_str += f"你对 {person_name} 的了解:{info}\n"
else:
# 所有不在活动缓存中的用户包括等待区的都将由LLM处理
persons_to_process.append(person_id)
person_name_list_for_llm.append(await person_info_manager.get_value(person_id, "person_name"))
# 4. 如果没有需要LLM处理的人员直接返回缓存信息
if not persons_to_process:
final_result = cached_person_info_str.strip()
if final_result:
logger.info(f"{self.log_prefix} 关系识别 (全部缓存): {final_result}")
return final_result
# 5. 为需要处理的人员准备LLM prompt
nickname_str = ",".join(global_config.bot.alias_names)
name_block = f"你的名字是{global_config.bot.nickname},你的昵称有{nickname_str},有人也会用这些昵称称呼你。" name_block = f"你的名字是{global_config.bot.nickname},你的昵称有{nickname_str},有人也会用这些昵称称呼你。"
relation_prompt_init = "你对群聊里的人的印象是:\n" if is_group_chat else "你对对方的印象是:\n"
if is_group_chat:
relation_prompt_init = "你对群聊里的人的印象是:\n"
else:
relation_prompt_init = "你对对方的印象是:\n"
relation_prompt = "" relation_prompt = ""
person_name_list = [] for person_id in persons_to_process:
for person in person_list: relation_prompt += f"{await relationship_manager.build_relationship_info(person_id, is_id=True)}\n\n"
relation_prompt += f"{await relationship_manager.build_relationship_info(person, is_id=True)}\n\n"
person_name_list.append(await person_info_manager.get_value(person, "person_name"))
if relation_prompt: if relation_prompt:
relation_prompt = relation_prompt_init + relation_prompt relation_prompt = relation_prompt_init + relation_prompt
else: else:
@@ -151,45 +214,76 @@ class RelationshipProcessor(BaseProcessor):
chat_observe_info=chat_observe_info, chat_observe_info=chat_observe_info,
) )
# print(prompt) # 6. 调用LLM并处理结果
newly_processed_info_str = ""
content = ""
try: try:
logger.info(f"{self.log_prefix} 关系识别prompt: \n{prompt}\n") logger.info(f"{self.log_prefix} 关系识别prompt: \n{prompt}\n")
content, _ = await self.llm_model.generate_response_async(prompt=prompt) content, _ = await self.llm_model.generate_response_async(prompt=prompt)
if not content: if content:
print(f"content: {content}")
content_json = json.loads(repair_json(content))
for person_name, person_info in content_json.items():
if person_name in person_name_list_for_llm:
try:
idx = person_name_list_for_llm.index(person_name)
person_id = persons_to_process[idx]
# 关键:检查此人是否在等待区,如果是,则为"唤醒"
start_time = time.time() # 新用户的默认start_time
if person_id in self.pending_updates:
logger.info(f"{self.log_prefix} 用户 {person_id} 在等待期被LLM重提重新激活缓存。")
revived_item = self.pending_updates.pop(person_id)
start_time = revived_item["start_time"]
self.person_cache[person_id] = {
"info": person_info,
"ttl": 5,
"start_time": start_time,
}
newly_processed_info_str += f"你对 {person_name} 的了解:{person_info}\n"
except (ValueError, IndexError):
continue
else:
logger.warning(f"{self.log_prefix} LLM返回空结果关系识别失败。") logger.warning(f"{self.log_prefix} LLM返回空结果关系识别失败。")
print(f"content: {content}")
content = repair_json(content)
content = json.loads(content)
person_info_str = ""
for person_name, person_info in content.items():
# print(f"person_name: {person_name}, person_info: {person_info}")
# print(f"person_list: {person_name_list}")
if person_name not in person_name_list:
continue
person_str = f"你对 {person_name} 的了解:{person_info}\n"
person_info_str += person_str
except Exception as e: except Exception as e:
# 处理总体异常
logger.error(f"{self.log_prefix} 执行LLM请求或处理响应时出错: {e}") logger.error(f"{self.log_prefix} 执行LLM请求或处理响应时出错: {e}")
logger.error(traceback.format_exc()) logger.error(traceback.format_exc())
person_info_str = "关系识别过程中出现错误" newly_processed_info_str = "关系识别过程中出现错误"
# 7. 合并缓存和新处理的信息
person_info_str = (cached_person_info_str + newly_processed_info_str).strip()
if person_info_str == "None": if person_info_str == "None":
person_info_str = "" person_info_str = ""
# 记录初步思考结果
logger.info(f"{self.log_prefix} 关系识别: {person_info_str}") logger.info(f"{self.log_prefix} 关系识别: {person_info_str}")
return person_info_str return person_info_str
async def update_impression_on_cache_expiry(
self, person_id: str, chat_id: str, start_time: float, end_time: float
):
"""
在缓存过期时,获取聊天记录并更新用户印象
"""
logger.info(f"缓存过期,开始为 {person_id} 更新印象。时间范围:{start_time} -> {end_time}")
try:
impression_messages = get_raw_msg_by_timestamp_with_chat(chat_id, start_time, end_time)
if impression_messages:
logger.info(f"{person_id} 获取到 {len(impression_messages)} 条消息用于印象更新。")
await relationship_manager.update_person_impression(
person_id=person_id, timestamp=end_time, bot_engaged_messages=impression_messages
)
else:
logger.info(f"在指定时间范围内没有找到 {person_id} 的消息,不更新印象。")
except Exception as e:
logger.error(f"{person_id} 更新印象时发生错误: {e}")
logger.error(traceback.format_exc())
init_prompt() init_prompt()

View File

@@ -20,22 +20,27 @@ logger = get_logger("processor")
def init_prompt(): def init_prompt():
indentify_prompt = """ indentify_prompt = """
<聊天记录>
{chat_observe_info}
</聊天记录>
<人格>
{name_block} {name_block}
你是一个AI但是你伪装成了一个人类不要让别人发现这一点 你是一个AI但是你伪装成了一个人类不要让别人发现这一点,你的性格是:
请参考以下人格,不要被当前聊天内容中的内容误导:
{prompt_personality} {prompt_personality}
{indentify_block} {indentify_block}
</人格>
以下是正在进行的聊天内容: 请区分聊天记录的内容和你稳定的人格,聊天记录是现在发生的事情,人格是你稳定的独特的特质。
现在是{time_now},你正在参与聊天
{chat_observe_info}
现在请你输出对自己的描述:请严格遵守以下规则 {name_block}
现在请你提取你人格的关键信息,提取成一串文本:
1. 根据聊天记录,输出与聊天记录相关的自我描述,包括人格,形象等等,对人格形象进行精简 1. 根据聊天记录,输出与聊天记录相关的自我描述,包括人格,形象等等,对人格形象进行精简
2. 思考有没有内容与你的描述相关 2. 思考有没有内容与你的描述相关
3. 如果没有明显相关内容,请输出十几个字的简短自我描述 3. 如果没有明显相关内容,请输出十几个字的简短自我描述
现在请输出你的自我描述,请注意不要输出多余内容(包括前后缀,括号()表情包at或 @等 ) 现在请输出你的自我描述,格式是:“你是.....,你.................(描述)”
请注意不要输出多余内容(包括前后缀,括号()表情包at或 @等 )
""" """
Prompt(indentify_prompt, "indentify_prompt") Prompt(indentify_prompt, "indentify_prompt")

View File

@@ -37,11 +37,13 @@ def init_prompt():
{extra_info_block} {extra_info_block}
{relation_info_block}
{time_block} {time_block}
你现在正在群里聊天,以下是群里正在进行的聊天内容: 你现在正在群里聊天,以下是群里正在进行的聊天内容:
{chat_info} {chat_info}
{relation_info_block}
以上是聊天内容,你需要了解聊天记录中的内容 以上是聊天内容,你需要了解聊天记录中的内容
@@ -605,6 +607,8 @@ class DefaultReplyer:
platform=self.chat_stream.platform, platform=self.chat_stream.platform,
) )
# await anchor_message.process()
bot_message = MessageSending( bot_message = MessageSending(
message_id=message_id, # 使用片段的唯一ID message_id=message_id, # 使用片段的唯一ID
chat_stream=self.chat_stream, chat_stream=self.chat_stream,

View File

@@ -132,13 +132,13 @@ class ChattingObservation(Observation):
# logger.debug(f"找到的锚定消息find_msg: {find_msg}") # logger.debug(f"找到的锚定消息find_msg: {find_msg}")
break break
else: else:
similarity = difflib.SequenceMatcher(None, text, message["processed_plain_text"]).ratio() similarity = difflib.SequenceMatcher(None, text, message["raw_message"]).ratio()
msg_list.append({"message": message, "similarity": similarity}) msg_list.append({"message": message, "similarity": similarity})
# logger.debug(f"对锚定消息检查message: {message['processed_plain_text']},similarity: {similarity}") # logger.debug(f"对锚定消息检查message: {message['processed_plain_text']},similarity: {similarity}")
if not find_msg: if not find_msg:
if msg_list: if msg_list:
msg_list.sort(key=lambda x: x["similarity"], reverse=True) msg_list.sort(key=lambda x: x["similarity"], reverse=True)
if msg_list[0]["similarity"] >= 0.5: # 只返回相似度大于等于0.5的消息 if msg_list[0]["similarity"] >= 0.9: # 只返回相似度大于等于0.5的消息
find_msg = msg_list[0]["message"] find_msg = msg_list[0]["message"]
else: else:
logger.debug("没有找到锚定消息,相似度低") logger.debug("没有找到锚定消息,相似度低")
@@ -191,6 +191,7 @@ class ChattingObservation(Observation):
"detailed_plain_text": find_msg.get("processed_plain_text"), "detailed_plain_text": find_msg.get("processed_plain_text"),
"processed_plain_text": find_msg.get("processed_plain_text"), "processed_plain_text": find_msg.get("processed_plain_text"),
} }
# print(f"message_dict: {message_dict}")
find_rec_msg = MessageRecv(message_dict) find_rec_msg = MessageRecv(message_dict)
# logger.debug(f"锚定消息处理后find_rec_msg: {find_rec_msg}") # logger.debug(f"锚定消息处理后find_rec_msg: {find_rec_msg}")
return find_rec_msg return find_rec_msg

View File

@@ -20,7 +20,6 @@ from .common.server import global_server, Server
from rich.traceback import install from rich.traceback import install
from .chat.focus_chat.expressors.exprssion_learner import expression_learner from .chat.focus_chat.expressors.exprssion_learner import expression_learner
from .api.main import start_api_server from .api.main import start_api_server
from .person_info.impression_update_task import impression_update_task
install(extra_lines=3) install(extra_lines=3)
@@ -60,9 +59,6 @@ class MainSystem:
# 添加遥测心跳任务 # 添加遥测心跳任务
await async_task_manager.add_task(TelemetryHeartBeatTask()) await async_task_manager.add_task(TelemetryHeartBeatTask())
# 添加印象更新任务
await async_task_manager.add_task(impression_update_task)
# 启动API服务器 # 启动API服务器
start_api_server() start_api_server()
logger.success("API服务器启动成功") logger.success("API服务器启动成功")

View File

@@ -11,12 +11,12 @@ from collections import defaultdict
logger = get_logger("relation") logger = get_logger("relation")
# 暂时弃用,改为实时更新
class ImpressionUpdateTask(AsyncTask): class ImpressionUpdateTask(AsyncTask):
def __init__(self): def __init__(self):
super().__init__( super().__init__(
task_name="impression_update", task_name="impression_update",
wait_before_start=5, wait_before_start=60,
run_interval=global_config.relationship.build_relationship_interval, run_interval=global_config.relationship.build_relationship_interval,
) )
@@ -24,10 +24,10 @@ class ImpressionUpdateTask(AsyncTask):
try: try:
# 获取最近的消息 # 获取最近的消息
current_time = int(time.time()) current_time = int(time.time())
start_time = current_time - 600 # 1小时 start_time = current_time - global_config.relationship.build_relationship_interval # 100分钟
# 获取所有消息 # 获取所有消息
messages = get_raw_msg_by_timestamp(timestamp_start=start_time, timestamp_end=current_time, limit=300) messages = get_raw_msg_by_timestamp(timestamp_start=start_time, timestamp_end=current_time)
if not messages: if not messages:
logger.info("没有找到需要处理的消息") logger.info("没有找到需要处理的消息")
@@ -45,6 +45,10 @@ class ImpressionUpdateTask(AsyncTask):
# 处理每个聊天组 # 处理每个聊天组
for chat_id, msgs in chat_messages.items(): for chat_id, msgs in chat_messages.items():
# 获取chat_stream # 获取chat_stream
if len(msgs) < 30:
logger.info(f"聊天组 {chat_id} 消息数小于30跳过处理")
continue
chat_stream = chat_manager.get_stream(chat_id) chat_stream = chat_manager.get_stream(chat_id)
if not chat_stream: if not chat_stream:
logger.warning(f"未找到聊天组 {chat_id} 的chat_stream跳过处理") logger.warning(f"未找到聊天组 {chat_id} 的chat_stream跳过处理")
@@ -168,7 +172,3 @@ class ImpressionUpdateTask(AsyncTask):
except Exception as e: except Exception as e:
logger.exception(f"更新印象任务失败: {str(e)}") logger.exception(f"更新印象任务失败: {str(e)}")
# 创建任务实例
impression_update_task = ImpressionUpdateTask()

View File

@@ -282,8 +282,8 @@ class RelationshipManager:
for original_name, mapped_name in name_mapping.items(): for original_name, mapped_name in name_mapping.items():
points = points.replace(mapped_name, original_name) points = points.replace(mapped_name, original_name)
logger.info(f"prompt: {prompt}") # logger.info(f"prompt: {prompt}")
logger.info(f"points: {points}") # logger.info(f"points: {points}")
if not points: if not points:
logger.warning(f"未能从LLM获取 {person_name} 的新印象") logger.warning(f"未能从LLM获取 {person_name} 的新印象")
@@ -296,6 +296,7 @@ class RelationshipManager:
if points_data == "none" or not points_data or points_data.get("point") == "none": if points_data == "none" or not points_data or points_data.get("point") == "none":
points_list = [] points_list = []
else: else:
logger.info(f"points_data: {points_data}")
if isinstance(points_data, dict) and "points" in points_data: if isinstance(points_data, dict) and "points" in points_data:
points_data = points_data["points"] points_data = points_data["points"]
if not isinstance(points_data, list): if not isinstance(points_data, list):

View File

@@ -41,12 +41,11 @@ identity_detail = [
[expression] [expression]
# 表达方式 # 表达方式
expression_style = "描述麦麦说话的表达风格,表达习惯,例如:(回复尽量简短一些。可以参考贴吧,知乎和微博的回复风格,回复不要浮夸,不要用夸张修辞,平淡一些。不要有额外的符号,尽量简单简短)" expression_style = "描述麦麦说话的表达风格,表达习惯,例如:(回复尽量简短一些。可以参考贴吧,知乎和微博的回复风格,回复不要浮夸,不要用夸张修辞,平淡一些。不要有额外的符号,尽量简单简短)"
enable_expression_learning = false # 是否启用表达学习,麦麦会学习人类说话风格 enable_expression_learning = false # 是否启用表达学习,麦麦会学习不同群里人类说话风格(群之间不互通)
learning_interval = 600 # 学习间隔 单位秒 learning_interval = 600 # 学习间隔 单位秒
[relationship] [relationship]
give_name = true # 麦麦是否给其他人取名,关闭后无法使用禁言功能 give_name = true # 麦麦是否给其他人取名
build_relationship_interval = 600 # 构建关系间隔 单位秒
[chat] #麦麦的聊天通用设置 [chat] #麦麦的聊天通用设置
chat_mode = "normal" # 聊天模式 —— 普通模式normal专注模式focus在普通模式和专注模式之间自动切换 chat_mode = "normal" # 聊天模式 —— 普通模式normal专注模式focus在普通模式和专注模式之间自动切换