better:更好的关系处理器

This commit is contained in:
SengokuCola
2025-06-27 00:30:45 +08:00
parent 5f67774f6a
commit a411aa3da4
9 changed files with 199 additions and 375 deletions

View File

@@ -413,7 +413,6 @@ class HeartFChatting:
"action_result": { "action_result": {
"action_type": "error", "action_type": "error",
"action_data": {}, "action_data": {},
"reasoning": f"上下文处理失败: {e}",
}, },
"observed_messages": "", "observed_messages": "",
}, },
@@ -650,143 +649,9 @@ class HeartFChatting:
return all_plan_info, processor_time_costs return all_plan_info, processor_time_costs
async def _process_post_planning_processors(self, observations: List[Observation], action_data: dict) -> dict:
"""
处理后期处理器(规划后执行的处理器)
包括:关系处理器、表达选择器、记忆激活器
参数:
observations: 观察器列表
action_data: 原始动作数据
返回:
dict: 更新后的动作数据
"""
logger.info(f"{self.log_prefix} 开始执行后期处理器")
# 创建所有后期任务
task_list = []
task_to_name_map = {}
# 添加后期处理器任务
for processor in self.post_planning_processors:
processor_name = processor.__class__.__name__
async def run_processor_with_timeout(proc=processor):
return await asyncio.wait_for(
proc.process_info(observations=observations),
timeout=global_config.focus_chat.processor_max_time,
)
task = asyncio.create_task(run_processor_with_timeout())
task_list.append(task)
task_to_name_map[task] = ("processor", processor_name)
logger.info(f"{self.log_prefix} 启动后期处理器任务: {processor_name}")
# 添加记忆激活器任务
async def run_memory_with_timeout():
return await asyncio.wait_for(
self.memory_activator.activate_memory(observations),
timeout=MEMORY_ACTIVATION_TIMEOUT,
)
memory_task = asyncio.create_task(run_memory_with_timeout())
task_list.append(memory_task)
task_to_name_map[memory_task] = ("memory", "MemoryActivator")
logger.info(f"{self.log_prefix} 启动记忆激活器任务")
# 如果没有任何后期任务,直接返回
if not task_list:
logger.info(f"{self.log_prefix} 没有启用的后期处理器或记忆激活器")
return action_data
# 等待所有任务完成
pending_tasks = set(task_list)
all_post_plan_info = []
running_memorys = []
while pending_tasks:
done, pending_tasks = await asyncio.wait(pending_tasks, return_when=asyncio.FIRST_COMPLETED)
for task in done:
task_type, task_name = task_to_name_map[task]
try:
result = await task
if task_type == "processor":
logger.info(f"{self.log_prefix} 后期处理器 {task_name} 已完成!")
if result is not None:
all_post_plan_info.extend(result)
else:
logger.warning(f"{self.log_prefix} 后期处理器 {task_name} 返回了 None")
elif task_type == "memory":
logger.info(f"{self.log_prefix} 记忆激活器已完成!")
if result is not None:
running_memorys = result
else:
logger.warning(f"{self.log_prefix} 记忆激活器返回了 None")
running_memorys = []
except asyncio.TimeoutError:
if task_type == "processor":
logger.warning(
f"{self.log_prefix} 后期处理器 {task_name} 超时(>{global_config.focus_chat.processor_max_time}s已跳过"
)
elif task_type == "memory":
logger.warning(f"{self.log_prefix} 记忆激活器超时(>{MEMORY_ACTIVATION_TIMEOUT}s已跳过")
running_memorys = []
except Exception as e:
if task_type == "processor":
logger.error(
f"{self.log_prefix} 后期处理器 {task_name} 执行失败. 错误: {e}",
exc_info=True,
)
elif task_type == "memory":
logger.error(f"{self.log_prefix} 记忆激活器执行失败. 错误: {e}", exc_info=True)
running_memorys = []
# 将后期处理器的结果整合到 action_data 中
updated_action_data = action_data.copy()
relation_info = ""
selected_expressions = []
structured_info = ""
for info in all_post_plan_info:
if isinstance(info, RelationInfo):
relation_info = info.get_processed_info()
elif isinstance(info, ExpressionSelectionInfo):
selected_expressions = info.get_expressions_for_action_data()
elif isinstance(info, StructuredInfo):
structured_info = info.get_processed_info()
if relation_info:
updated_action_data["relation_info_block"] = relation_info
if selected_expressions:
updated_action_data["selected_expressions"] = selected_expressions
if structured_info:
updated_action_data["structured_info"] = structured_info
# 特殊处理running_memorys
if running_memorys:
memory_str = "以下是当前在聊天中,你回忆起的记忆:\n"
for running_memory in running_memorys:
memory_str += f"{running_memory['content']}\n"
updated_action_data["memory_block"] = memory_str
logger.info(f"{self.log_prefix} 添加了 {len(running_memorys)} 个激活的记忆到action_data")
if all_post_plan_info or running_memorys:
logger.info(
f"{self.log_prefix} 后期处理完成,产生了 {len(all_post_plan_info)} 个信息项和 {len(running_memorys)} 个记忆"
)
return updated_action_data
async def _process_post_planning_processors_with_timing( async def _process_post_planning_processors_with_timing(
self, observations: List[Observation], action_data: dict self, observations: List[Observation], action_type: str, action_data: dict
) -> tuple[dict, dict]: ) -> tuple[dict, dict]:
""" """
处理后期处理器(规划后执行的处理器)并收集详细时间统计 处理后期处理器(规划后执行的处理器)并收集详细时间统计
@@ -794,6 +659,7 @@ class HeartFChatting:
参数: 参数:
observations: 观察器列表 observations: 观察器列表
action_type: 动作类型
action_data: 原始动作数据 action_data: 原始动作数据
返回: 返回:
@@ -815,7 +681,9 @@ class HeartFChatting:
start_time = time.time() start_time = time.time()
try: try:
result = await asyncio.wait_for( result = await asyncio.wait_for(
proc.process_info(observations=observations), proc.process_info(
observations=observations, action_type=action_type, action_data=action_data
),
timeout=global_config.focus_chat.processor_max_time, timeout=global_config.focus_chat.processor_max_time,
) )
end_time = time.time() end_time = time.time()
@@ -1074,7 +942,7 @@ class HeartFChatting:
# 记录详细的后处理器时间 # 记录详细的后处理器时间
post_start_time = time.time() post_start_time = time.time()
action_data, post_processor_time_costs = await self._process_post_planning_processors_with_timing( action_data, post_processor_time_costs = await self._process_post_planning_processors_with_timing(
self.observations, action_data self.observations, action_type, action_data
) )
post_end_time = time.time() post_end_time = time.time()
logger.info(f"{self.log_prefix} 后期处理器总耗时: {post_end_time - post_start_time:.3f}") logger.info(f"{self.log_prefix} 后期处理器总耗时: {post_end_time - post_start_time:.3f}")

View File

@@ -181,9 +181,18 @@ class HeartFCMessageReceiver:
mes_name = chat.group_info.group_name if chat.group_info else "私聊" mes_name = chat.group_info.group_name if chat.group_info else "私聊"
# current_time = time.strftime("%H:%M:%S", time.localtime(message.message_info.time)) # current_time = time.strftime("%H:%M:%S", time.localtime(message.message_info.time))
current_talk_frequency = global_config.chat.get_current_talk_frequency(chat.stream_id) current_talk_frequency = global_config.chat.get_current_talk_frequency(chat.stream_id)
logger.info(
f"[{mes_name}]{userinfo.user_nickname}:{message.processed_plain_text}[当前回复频率: {current_talk_frequency}]" # 如果消息中包含图片标识,则日志展示为图片
) import re
picid_match = re.search(r"\[picid:([^\]]+)\]", message.processed_plain_text)
if picid_match:
logger.info(
f"[{mes_name}]{userinfo.user_nickname}: [图片] [当前回复频率: {current_talk_frequency}]"
)
else:
logger.info(
f"[{mes_name}]{userinfo.user_nickname}:{message.processed_plain_text}[当前回复频率: {current_talk_frequency}]"
)
# 8. 关系处理 # 8. 关系处理
if global_config.relationship.enable_relationship: if global_config.relationship.enable_relationship:

View File

@@ -27,7 +27,13 @@ class ExpressionSelectorProcessor(BaseProcessor):
name = get_chat_manager().get_stream_name(self.subheartflow_id) name = get_chat_manager().get_stream_name(self.subheartflow_id)
self.log_prefix = f"[{name}] 表达选择器" self.log_prefix = f"[{name}] 表达选择器"
async def process_info(self, observations: List[Observation] = None, *infos) -> List[InfoBase]: async def process_info(
self,
observations: List[Observation] = None,
action_type: str = None,
action_data: dict = None,
**kwargs,
) -> List[InfoBase]:
"""处理信息对象 """处理信息对象
Args: Args:
@@ -70,9 +76,14 @@ class ExpressionSelectorProcessor(BaseProcessor):
return [] return []
try: try:
if action_type == "reply":
target_message = action_data.get("reply_to", "")
else:
target_message = ""
# LLM模式调用LLM选择5-10个然后随机选5个 # LLM模式调用LLM选择5-10个然后随机选5个
selected_expressions = await expression_selector.select_suitable_expressions_llm( selected_expressions = await expression_selector.select_suitable_expressions_llm(
self.subheartflow_id, chat_info, max_num=12, min_num=2 self.subheartflow_id, chat_info, max_num=12, min_num=2, target_message=target_message
) )
cache_size = len(selected_expressions) if selected_expressions else 0 cache_size = len(selected_expressions) if selected_expressions else 0
mode_desc = f"LLM模式已缓存{cache_size}个)" mode_desc = f"LLM模式已缓存{cache_size}个)"

View File

@@ -37,139 +37,51 @@ SEGMENT_CLEANUP_CONFIG = {
"cleanup_interval_hours": 1, # 清理间隔(小时) "cleanup_interval_hours": 1, # 清理间隔(小时)
} }
# 用于随机生成prompt示例的资源池
USER_EXAMPLE_KEYS = ["用户A", "小明", "Alice", "陈皮", "老王", "Bob", "张三", "李四"]
USER_EXAMPLE_VALUES = [
"ta的昵称",
"ta对你的态度",
"你对ta的印象",
"ta最近心情如何",
"你们的关系",
"ta的身份",
"ta的兴趣爱好",
"ta和你的共同点",
"ta的习惯",
"你们最近做的事",
"你对ta的语气",
"你们的互动方式",
"给你的第一印象",
"你们最近聊过什么",
]
BOT_EXAMPLE_VALUES = [
"身份",
"性格",
"你的原则",
"你的知识",
"你的目标",
"你的爱好",
"你最近在做什么",
"头像",
"年龄",
"性别",
"职业",
"兴趣爱好",
"习惯",
"目标",
"原则",
"知识",
"爱好",
]
logger = get_logger("processor") logger = get_logger("processor")
def _generate_random_prompt_example() -> str:
"""动态生成一个随机的、符合规则的JSON示例字符串"""
bot_nickname = global_config.bot.nickname
bot_aliases = list(global_config.bot.alias_names)
# 确定示例数量
num_user_examples = random.randint(1, 2)
num_bot_examples = random.randint(1, 2)
example_dict = {}
# 1. 生成用户提取示例
user_keys = random.sample(USER_EXAMPLE_KEYS, min(num_user_examples, len(USER_EXAMPLE_KEYS)))
user_values = random.sample(USER_EXAMPLE_VALUES, min(num_user_examples, len(USER_EXAMPLE_VALUES)))
for i in range(len(user_keys)):
example_dict[user_keys[i]] = user_values[i]
# 2. 生成bot自身示例 (使用昵称和别名避免key重复)
bot_name_pool = [bot_nickname] + bot_aliases
random.shuffle(bot_name_pool)
bot_values = random.sample(BOT_EXAMPLE_VALUES, min(num_bot_examples, len(BOT_EXAMPLE_VALUES)))
for i in range(min(num_bot_examples, len(bot_name_pool), len(bot_values))):
example_dict[bot_name_pool[i]] = bot_values[i]
# 3. 添加固定示例
example_dict["person_name"] = "其他信息"
# 随机化顺序并格式化为JSON字符串
items = list(example_dict.items())
random.shuffle(items)
shuffled_dict = dict(items)
return json.dumps(shuffled_dict, ensure_ascii=False, indent=4)
def init_prompt(): def init_prompt():
relationship_prompt = """ relationship_prompt = """
<聊天记录> <聊天记录>
{chat_observe_info} {chat_observe_info}
</聊天记录> </聊天记录>
{info_cache_block}
请不要重复调取相同的信息
{name_block} {name_block}
请你阅读聊天记录,查看是否需要调取某个人的信息,这个人可以是出现在聊天记录中的,也可以是记录中提到的人,也可以是你自己({bot_name})。 现在,你想要回复{person_name}的消息,消息内容是:{target_message}。请根据聊天记录和你要回复的消息,从你对{person_name}的了解中提取有关的信息:
你不同程度上认识群聊里的人,以及他们谈论到的人,你可以根据聊天记录,回忆起有关他们的信息,帮助你参与聊天 1.你需要提供你想要提取的信息具体是哪方面的信息例如年龄性别对ta的印象最近发生的事等等。
1.你需要提供用户名和你想要提取的信息名称类型来进行调取 2.请注意,请不要重复调取相同的信息,已经调取的信息如下:
2.请注意,提取的信息类型一定要和用户有关,不要提取无关的信息 {info_cache_block}
3.你也可以调取有关自己({bot_name})的信息 3.如果当前聊天记录中没有需要查询的信息,或者现有信息已经足够回复,请返回{{"none": "不需要查询"}}
4.如果当前聊天记录中没有需要查询的信息,或者现有信息已经足够回复,请返回{{"none": "不需要查询"}}
请以json格式输出例如 请以json格式输出例如
{example_json} {{
"info_type": "信息类型",
如果不需要查询任何信息,请输出: }}
{{"none": "不需要查询"}}
请严格按照json输出格式不要输出多余内容可以同时查询多个人的信息
请严格按照json输出格式不要输出多余内容
""" """
Prompt(relationship_prompt, "relationship_prompt") Prompt(relationship_prompt, "relationship_prompt")
fetch_info_prompt = """ fetch_info_prompt = """
{name_block} {name_block}
以下是你在之前与{person_name}的交流中,产生的对{person_name}的了解,请你从中提取用户的有关"{info_type}"的信息如果用户没有相关信息请输出none 以下是你在之前与{person_name}的交流中,产生的对{person_name}的了解:
{person_impression_block} {person_impression_block}
{points_text_block} {points_text_block}
请严格按照以下json输出格式不要输出多余内容
请从中提取用户"{person_name}"的有关"{info_type}"信息
请以json格式输出例如
{{ {{
{info_json_str} {info_json_str}
}} }}
请严格按照json输出格式不要输出多余内容
""" """
Prompt(fetch_info_prompt, "fetch_person_info_prompt") Prompt(fetch_info_prompt, "fetch_person_info_prompt")
fetch_bot_info_prompt = """
你是{nickname},你的昵称有{alias_names}
以下是你对自己的了解,请你从中提取和"{info_type}"有关的信息如果无法提取请输出none
{person_impression_block}
{points_text_block}
请严格按照以下json输出格式不要输出多余内容
{{
"{info_type}": "有关你自己的{info_type}的信息内容"
}}
"""
Prompt(fetch_bot_info_prompt, "fetch_bot_info_prompt")
class PersonImpressionpProcessor(BaseProcessor): class PersonImpressionpProcessor(BaseProcessor):
log_prefix = "关系" log_prefix = "关系"
@@ -293,7 +205,7 @@ class PersonImpressionpProcessor(BaseProcessor):
} }
segments.append(new_segment) segments.append(new_segment)
person_name = get_person_info_manager().get_value(person_id, "person_name") or person_id person_name = get_person_info_manager().get_value_sync(person_id, "person_name") or person_id
logger.info( logger.info(
f"{self.log_prefix} 眼熟用户 {person_name}{time.strftime('%H:%M:%S', time.localtime(potential_start_time))} - {time.strftime('%H:%M:%S', time.localtime(message_time))} 之间有 {new_segment['message_count']} 条消息" f"{self.log_prefix} 眼熟用户 {person_name}{time.strftime('%H:%M:%S', time.localtime(potential_start_time))} - {time.strftime('%H:%M:%S', time.localtime(message_time))} 之间有 {new_segment['message_count']} 条消息"
) )
@@ -341,7 +253,8 @@ class PersonImpressionpProcessor(BaseProcessor):
"message_count": self._count_messages_in_timerange(potential_start_time, message_time), "message_count": self._count_messages_in_timerange(potential_start_time, message_time),
} }
segments.append(new_segment) segments.append(new_segment)
person_name = get_person_info_manager().get_value(person_id, "person_name") or person_id person_info_manager = get_person_info_manager()
person_name = person_info_manager.get_value_sync(person_id, "person_name") or person_id
logger.info(f"{self.log_prefix} 重新眼熟用户 {person_name} 创建新消息段超过10条消息间隔: {new_segment}") logger.info(f"{self.log_prefix} 重新眼熟用户 {person_name} 创建新消息段超过10条消息间隔: {new_segment}")
self._save_cache() self._save_cache()
@@ -515,16 +428,26 @@ class PersonImpressionpProcessor(BaseProcessor):
# 统筹各模块协作、对外提供服务接口 # 统筹各模块协作、对外提供服务接口
# ================================ # ================================
async def process_info(self, observations: List[Observation] = None, *infos) -> List[InfoBase]: async def process_info(
self,
observations: List[Observation] = None,
action_type: str = None,
action_data: dict = None,
**kwargs,
) -> List[InfoBase]:
"""处理信息对象 """处理信息对象
Args: Args:
*infos: 可变数量的InfoBase类型的信息对象 observations: 观察对象列表
action_type: 动作类型
action_data: 动作数据
Returns: Returns:
List[InfoBase]: 处理后的结构化信息列表 List[InfoBase]: 处理后的结构化信息列表
""" """
relation_info_str = await self.relation_identify(observations) await self.build_relation(observations)
relation_info_str = await self.relation_identify(observations,action_type,action_data)
if relation_info_str: if relation_info_str:
relation_info = RelationInfo() relation_info = RelationInfo()
@@ -535,28 +458,14 @@ class PersonImpressionpProcessor(BaseProcessor):
return [relation_info] return [relation_info]
async def relation_identify( async def build_relation(self, observations: List[Observation] = None):
self, """构建关系"""
observations: List[Observation] = None,
):
"""
在回复前进行思考,生成内心想法并收集工具调用结果
"""
# 0. 执行定期清理
self._cleanup_old_segments() self._cleanup_old_segments()
# 1. 从观察信息中提取所需数据
# 需要兼容私聊
chat_observe_info = ""
current_time = time.time() current_time = time.time()
if observations: if observations:
for observation in observations: for observation in observations:
if isinstance(observation, ChattingObservation): if isinstance(observation, ChattingObservation):
chat_observe_info = observation.get_observe_info()
# latest_message_time = observation.last_observe_time
# 从聊天观察中提取用户信息并更新消息段
# 获取最新的非bot消息来更新消息段
latest_messages = get_raw_msg_by_timestamp_with_chat( latest_messages = get_raw_msg_by_timestamp_with_chat(
self.subheartflow_id, self.subheartflow_id,
self.last_processed_message_time, self.last_processed_message_time,
@@ -610,6 +519,55 @@ class PersonImpressionpProcessor(BaseProcessor):
del self.person_engaged_cache[person_id] del self.person_engaged_cache[person_id]
self._save_cache() self._save_cache()
async def relation_identify(
self,
observations: List[Observation] = None,
action_type: str = None,
action_data: dict = None,
):
"""
从人物获取信息
"""
chat_observe_info = ""
current_time = time.time()
if observations:
for observation in observations:
if isinstance(observation, ChattingObservation):
chat_observe_info = observation.get_observe_info()
# latest_message_time = observation.last_observe_time
# 从聊天观察中提取用户信息并更新消息段
# 获取最新的非bot消息来更新消息段
latest_messages = get_raw_msg_by_timestamp_with_chat(
self.subheartflow_id,
self.last_processed_message_time,
current_time,
limit=50, # 获取自上次处理后的消息
)
if latest_messages:
# 处理所有新的非bot消息
for latest_msg in latest_messages:
user_id = latest_msg.get("user_id")
platform = latest_msg.get("user_platform") or latest_msg.get("chat_info_platform")
msg_time = latest_msg.get("time", 0)
if (
user_id
and platform
and user_id != global_config.bot.qq_account
and msg_time > self.last_processed_message_time
):
from src.person_info.person_info import PersonInfoManager
person_id = PersonInfoManager.get_person_id(platform, user_id)
self._update_message_segments(person_id, msg_time)
logger.debug(
f"{self.log_prefix} 更新用户 {person_id} 的消息段,消息时间:{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(msg_time))}"
)
self.last_processed_message_time = max(self.last_processed_message_time, msg_time)
break
for person_id in list(self.info_fetched_cache.keys()): for person_id in list(self.info_fetched_cache.keys()):
for info_type in list(self.info_fetched_cache[person_id].keys()): for info_type in list(self.info_fetched_cache[person_id].keys()):
self.info_fetched_cache[person_id][info_type]["ttl"] -= 1 self.info_fetched_cache[person_id][info_type]["ttl"] -= 1
@@ -618,7 +576,37 @@ class PersonImpressionpProcessor(BaseProcessor):
if not self.info_fetched_cache[person_id]: if not self.info_fetched_cache[person_id]:
del self.info_fetched_cache[person_id] del self.info_fetched_cache[person_id]
# 5. 为需要处理的人员准备LLM prompt
if action_type != "reply":
return None
target_message = action_data.get("reply_to", "")
if ":" in target_message:
parts = target_message.split(":", 1)
elif "" in target_message:
parts = target_message.split("", 1)
else:
logger.warning(f"reply_to格式不正确: {target_message},跳过关系识别")
return None
if len(parts) != 2:
logger.warning(f"reply_to格式不正确: {target_message},跳过关系识别")
return None
sender = parts[0].strip()
text = parts[1].strip()
person_info_manager = get_person_info_manager()
person_id = person_info_manager.get_person_id_by_person_name(sender)
if not person_id:
logger.warning(f"未找到用户 {sender} 的ID跳过关系识别")
return None
nickname_str = ",".join(global_config.bot.alias_names) nickname_str = ",".join(global_config.bot.alias_names)
name_block = f"你的名字是{global_config.bot.nickname},你的昵称有{nickname_str},有人也会用这些昵称称呼你。" name_block = f"你的名字是{global_config.bot.nickname},你的昵称有{nickname_str},有人也会用这些昵称称呼你。"
@@ -638,19 +626,16 @@ class PersonImpressionpProcessor(BaseProcessor):
f"你已经调取了[{info_fetching['person_name']}]的[{info_fetching['info_type']}]信息\n" f"你已经调取了[{info_fetching['person_name']}]的[{info_fetching['info_type']}]信息\n"
) )
example_json = _generate_random_prompt_example()
prompt = (await global_prompt_manager.get_prompt_async("relationship_prompt")).format( prompt = (await global_prompt_manager.get_prompt_async("relationship_prompt")).format(
name_block=name_block,
bot_name=global_config.bot.nickname,
time_now=time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()),
chat_observe_info=chat_observe_info, chat_observe_info=chat_observe_info,
name_block=name_block,
info_cache_block=info_cache_block, info_cache_block=info_cache_block,
example_json=example_json, person_name=sender,
target_message=text,
) )
try: try:
logger.debug(f"{self.log_prefix} 人物信息prompt: \n{prompt}\n") logger.info(f"{self.log_prefix} 人物信息prompt: \n{prompt}\n")
content, _ = await self.llm_model.generate_response_async(prompt=prompt) content, _ = await self.llm_model.generate_response_async(prompt=prompt)
if content: if content:
# print(f"content: {content}") # print(f"content: {content}")
@@ -661,29 +646,12 @@ class PersonImpressionpProcessor(BaseProcessor):
logger.info(f"{self.log_prefix} LLM判断当前不需要查询任何信息{content_json.get('none', '')}") logger.info(f"{self.log_prefix} LLM判断当前不需要查询任何信息{content_json.get('none', '')}")
# 跳过新的信息提取,但仍会处理已有缓存 # 跳过新的信息提取,但仍会处理已有缓存
else: else:
# 收集即时提取任务 info_type = content_json.get("info_type")
instant_tasks = [] if info_type:
async_tasks = []
person_info_manager = get_person_info_manager()
for person_name, info_type in content_json.items():
is_bot = (
person_name == global_config.bot.nickname or person_name in global_config.bot.alias_names
)
if is_bot:
person_id = person_info_manager.get_person_id("system", "bot_id")
logger.info(f"{self.log_prefix} 检测到对bot自身({person_name})的信息查询使用特殊ID。")
else:
person_id = person_info_manager.get_person_id_by_person_name(person_name)
if not person_id:
logger.warning(f"{self.log_prefix} 未找到用户 {person_name} 的ID跳过调取信息。")
continue
self.info_fetching_cache.append( self.info_fetching_cache.append(
{ {
"person_id": person_id, "person_id": person_id,
"person_name": person_name, "person_name": sender,
"info_type": info_type, "info_type": info_type,
"start_time": time.time(), "start_time": time.time(),
"forget": False, "forget": False,
@@ -692,22 +660,12 @@ class PersonImpressionpProcessor(BaseProcessor):
if len(self.info_fetching_cache) > 20: if len(self.info_fetching_cache) > 20:
self.info_fetching_cache.pop(0) self.info_fetching_cache.pop(0)
logger.info(f"{self.log_prefix} 调取用户 {person_name} {info_type} 信息。") logger.info(f"{self.log_prefix} 调取用户 {sender}[{info_type}]信息。")
# 收集即时提取任务 # 执行信息提取
instant_tasks.append((person_id, info_type, time.time())) await self._fetch_single_info_instant(person_id, info_type, time.time())
else:
# 执行即时提取任务 logger.warning(f"{self.log_prefix} LLM did not return a valid info_type. Response: {content}")
if instant_tasks:
await self._execute_instant_extraction_batch(instant_tasks)
# 启动异步任务(如果不是即时模式)
if async_tasks:
# 异步任务不需要等待完成
pass
else:
logger.warning(f"{self.log_prefix} LLM返回空结果关系识别失败。")
except Exception as e: except Exception as e:
logger.error(f"{self.log_prefix} 执行LLM请求或处理响应时出错: {e}") logger.error(f"{self.log_prefix} 执行LLM请求或处理响应时出错: {e}")
@@ -838,31 +796,6 @@ class PersonImpressionpProcessor(BaseProcessor):
# 负责实时分析对话需求、提取用户信息、管理信息缓存 # 负责实时分析对话需求、提取用户信息、管理信息缓存
# ================================ # ================================
async def _execute_instant_extraction_batch(self, instant_tasks: list):
"""
批量执行即时提取任务
"""
if not instant_tasks:
return
logger.info(f"{self.log_prefix} [即时提取] 开始批量提取 {len(instant_tasks)} 个信息")
# 创建所有提取任务
extraction_tasks = []
for person_id, info_type, start_time in instant_tasks:
# 检查缓存中是否已存在且未过期的信息
if person_id in self.info_fetched_cache and info_type in self.info_fetched_cache[person_id]:
logger.debug(f"{self.log_prefix} 用户 {person_id}{info_type} 信息已存在且未过期,跳过调取。")
continue
task = asyncio.create_task(self._fetch_single_info_instant(person_id, info_type, start_time))
extraction_tasks.append(task)
# 并行执行所有提取任务并等待完成
if extraction_tasks:
await asyncio.gather(*extraction_tasks, return_exceptions=True)
logger.info(f"{self.log_prefix} [即时提取] 批量提取完成")
async def _fetch_single_info_instant(self, person_id: str, info_type: str, start_time: float): async def _fetch_single_info_instant(self, person_id: str, info_type: str, start_time: float):
""" """
使用小模型提取单个信息类型 使用小模型提取单个信息类型
@@ -890,7 +823,7 @@ class PersonImpressionpProcessor(BaseProcessor):
self.info_fetched_cache[person_id][info_type] = { self.info_fetched_cache[person_id][info_type] = {
"info": cached_info, "info": cached_info,
"ttl": 4, "ttl": 2,
"start_time": start_time, "start_time": start_time,
"person_name": person_name, "person_name": person_name,
"unknow": cached_info == "none", "unknow": cached_info == "none",
@@ -898,8 +831,6 @@ class PersonImpressionpProcessor(BaseProcessor):
logger.info(f"{self.log_prefix} 记得 {person_name}{info_type}: {cached_info}") logger.info(f"{self.log_prefix} 记得 {person_name}{info_type}: {cached_info}")
return return
bot_person_id = PersonInfoManager.get_person_id("system", "bot_id")
is_bot = person_id == bot_person_id
try: try:
person_name = await person_info_manager.get_value(person_id, "person_name") person_name = await person_info_manager.get_value(person_id, "person_name")
@@ -923,7 +854,7 @@ class PersonImpressionpProcessor(BaseProcessor):
self.info_fetched_cache[person_id] = {} self.info_fetched_cache[person_id] = {}
self.info_fetched_cache[person_id][info_type] = { self.info_fetched_cache[person_id][info_type] = {
"info": "none", "info": "none",
"ttl": 4, "ttl": 2,
"start_time": start_time, "start_time": start_time,
"person_name": person_name, "person_name": person_name,
"unknow": True, "unknow": True,
@@ -932,27 +863,18 @@ class PersonImpressionpProcessor(BaseProcessor):
await self._save_info_to_cache(person_id, info_type, "none") await self._save_info_to_cache(person_id, info_type, "none")
return return
if is_bot: nickname_str = ",".join(global_config.bot.alias_names)
prompt = (await global_prompt_manager.get_prompt_async("fetch_bot_info_prompt")).format( name_block = (
nickname=global_config.bot.nickname, f"你的名字是{global_config.bot.nickname},你的昵称有{nickname_str},有人也会用这些昵称称呼你。"
alias_names=",".join(global_config.bot.alias_names), )
info_type=info_type, prompt = (await global_prompt_manager.get_prompt_async("fetch_person_info_prompt")).format(
person_impression_block=person_impression_block, name_block=name_block,
points_text_block=points_text_block, info_type=info_type,
) person_impression_block=person_impression_block,
else: person_name=person_name,
nickname_str = ",".join(global_config.bot.alias_names) info_json_str=f'"{info_type}": "有关{info_type}的信息内容"',
name_block = ( points_text_block=points_text_block,
f"你的名字是{global_config.bot.nickname},你的昵称有{nickname_str},有人也会用这些昵称称呼你。" )
)
prompt = (await global_prompt_manager.get_prompt_async("fetch_person_info_prompt")).format(
name_block=name_block,
info_type=info_type,
person_impression_block=person_impression_block,
person_name=person_name,
info_json_str=f'"{info_type}": "有关{info_type}的信息内容"',
points_text_block=points_text_block,
)
except Exception: except Exception:
logger.error(traceback.format_exc()) logger.error(traceback.format_exc())
return return
@@ -972,7 +894,7 @@ class PersonImpressionpProcessor(BaseProcessor):
self.info_fetched_cache[person_id] = {} self.info_fetched_cache[person_id] = {}
self.info_fetched_cache[person_id][info_type] = { self.info_fetched_cache[person_id][info_type] = {
"info": "unknow" if is_unknown else info_content, "info": "unknow" if is_unknown else info_content,
"ttl": 8, "ttl": 3,
"start_time": start_time, "start_time": start_time,
"person_name": person_name, "person_name": person_name,
"unknow": is_unknown, "unknow": is_unknown,

View File

@@ -47,13 +47,20 @@ class ToolProcessor(BaseProcessor):
) )
self.structured_info = [] self.structured_info = []
async def process_info(self, observations: Optional[List[Observation]] = None) -> List[StructuredInfo]: async def process_info(
self,
observations: List[Observation] = None,
action_type: str = None,
action_data: dict = None,
**kwargs,
) -> List[StructuredInfo]:
"""处理信息对象 """处理信息对象
Args: Args:
observations: 可选的观察列表包含ChattingObservation和StructureObservation类型 observations: 可选的观察列表包含ChattingObservation和StructureObservation类型
running_memories: 可选的运行时记忆列表,包含字典类型的记忆信息 action_type: 动作类型
*infos: 可变数量的InfoBase类型的信息对象 action_data: 动作数据
**kwargs: 其他可选参数
Returns: Returns:
list: 处理后的结构化信息列表 list: 处理后的结构化信息列表
@@ -85,7 +92,9 @@ class ToolProcessor(BaseProcessor):
return [structured_info] return [structured_info]
async def execute_tools(self, observation: ChattingObservation): async def execute_tools(
self, observation: ChattingObservation, action_type: str = None, action_data: dict = None
):
""" """
并行执行工具,返回结构化信息 并行执行工具,返回结构化信息
@@ -95,6 +104,8 @@ class ToolProcessor(BaseProcessor):
is_group_chat: 是否为群聊默认为False is_group_chat: 是否为群聊默认为False
return_details: 是否返回详细信息默认为False return_details: 是否返回详细信息默认为False
cycle_info: 循环信息对象,可用于记录详细执行信息 cycle_info: 循环信息对象,可用于记录详细执行信息
action_type: 动作类型
action_data: 动作数据
返回: 返回:
如果return_details为False: 如果return_details为False:

View File

@@ -470,7 +470,7 @@ class ActionModifier:
response = response.strip().lower() response = response.strip().lower()
# print(base_prompt) # print(base_prompt)
print(f"LLM判定动作 {action_name}:响应='{response}'") # print(f"LLM判定动作 {action_name}:响应='{response}'")
should_activate = "" in response or "yes" in response or "true" in response should_activate = "" in response or "yes" in response or "true" in response

View File

@@ -183,7 +183,7 @@ class NormalChat:
"message_count": self._count_messages_in_timerange(potential_start_time, message_time), "message_count": self._count_messages_in_timerange(potential_start_time, message_time),
} }
segments.append(new_segment) segments.append(new_segment)
logger.info( logger.debug(
f"[{self.stream_name}] 为用户 {person_id} 创建新消息段: 时间范围 {time.strftime('%H:%M:%S', time.localtime(potential_start_time))} - {time.strftime('%H:%M:%S', time.localtime(message_time))}, 消息数: {new_segment['message_count']}" f"[{self.stream_name}] 为用户 {person_id} 创建新消息段: 时间范围 {time.strftime('%H:%M:%S', time.localtime(potential_start_time))} - {time.strftime('%H:%M:%S', time.localtime(message_time))}, 消息数: {new_segment['message_count']}"
) )
self._save_cache() self._save_cache()
@@ -230,7 +230,7 @@ class NormalChat:
"message_count": self._count_messages_in_timerange(potential_start_time, message_time), "message_count": self._count_messages_in_timerange(potential_start_time, message_time),
} }
segments.append(new_segment) segments.append(new_segment)
logger.info(f"[{self.stream_name}] 为用户 {person_id} 创建新消息段超过10条消息间隔: {new_segment}") logger.debug(f"[{self.stream_name}] 为用户 {person_id} 创建新消息段超过10条消息间隔: {new_segment}")
self._save_cache() self._save_cache()

View File

@@ -53,7 +53,9 @@ async def generate_with_model(
Tuple[bool, str, str, str]: (是否成功, 生成的内容, 推理过程, 模型名称) Tuple[bool, str, str, str]: (是否成功, 生成的内容, 推理过程, 模型名称)
""" """
try: try:
logger.info(f"[LLMAPI] 使用模型生成内容,提示词: {prompt[:200]}...") model_name = model_config.get("name")
logger.info(f"[LLMAPI] 使用模型 {model_name} 生成内容")
logger.debug(f"[LLMAPI] 完整提示词: {prompt}")
llm_request = LLMRequest(model=model_config, request_type=request_type, **kwargs) llm_request = LLMRequest(model=model_config, request_type=request_type, **kwargs)

View File

@@ -343,14 +343,15 @@ class NoReplyAction(BaseAction):
if success and response: if success and response:
response = response.strip() response = response.strip()
logger.info(f"{self.log_prefix} 模型({model_name})原始JSON响应: {response}") logger.debug(f"{self.log_prefix} 模型({model_name})原始JSON响应: {response}")
# 解析LLM的JSON响应提取判断结果和理由 # 解析LLM的JSON响应提取判断结果和理由
judge_result, reason = self._parse_llm_judge_response(response) judge_result, reason = self._parse_llm_judge_response(response)
logger.info( if judge_result:
f"{self.log_prefix} JSON解析结果 - 判断: {judge_result}, 理由: {reason}" logger.info(f"{self.log_prefix} 决定继续参与讨论,结束等待,原因: {reason}")
) else:
logger.info(f"{self.log_prefix} 决定不参与讨论,继续等待,原因: {reason}")
# 将判断结果保存到历史中 # 将判断结果保存到历史中
judge_history.append((current_time, judge_result, reason)) judge_history.append((current_time, judge_result, reason))