fix:使用即时提取relation值

This commit is contained in:
SengokuCola
2025-06-09 19:31:59 +08:00
parent 95cb24c11d
commit 03a3be18aa
2 changed files with 164 additions and 30 deletions

View File

@@ -19,6 +19,11 @@ import json
import asyncio import asyncio
from src.chat.utils.chat_message_builder import get_raw_msg_by_timestamp_with_chat from src.chat.utils.chat_message_builder import get_raw_msg_by_timestamp_with_chat
# 配置常量:是否启用小模型即时信息提取
# 开启时:使用小模型并行即时提取,速度更快,但精度可能略低
# 关闭时:使用原来的异步模式,精度更高但速度较慢
ENABLE_INSTANT_INFO_EXTRACTION = True
logger = get_logger("processor") logger = get_logger("processor")
@@ -96,6 +101,13 @@ class RelationshipProcessor(BaseProcessor):
model=global_config.model.relation, model=global_config.model.relation,
request_type="focus.relationship", request_type="focus.relationship",
) )
# 小模型用于即时信息提取
if ENABLE_INSTANT_INFO_EXTRACTION:
self.instant_llm_model = LLMRequest(
model=global_config.model.utils_small,
request_type="focus.relationship.instant",
)
name = chat_manager.get_stream_name(self.subheartflow_id) name = chat_manager.get_stream_name(self.subheartflow_id)
self.log_prefix = f"[{name}] " self.log_prefix = f"[{name}] "
@@ -151,9 +163,9 @@ class RelationshipProcessor(BaseProcessor):
# 根据消息数量和时间设置不同的触发条件 # 根据消息数量和时间设置不同的触发条件
should_trigger = ( should_trigger = (
message_count >= 50 or # 50条消息必定满足 message_count >= 50 or # 50条消息必定满足
(message_count >= 35 and time_elapsed >= 600) or # 35条且10分钟 (message_count >= 35 and time_elapsed >= 300) or # 35条且10分钟
(message_count >= 25 and time_elapsed >= 1800) or # 25条且30分钟 (message_count >= 25 and time_elapsed >= 900) or # 25条且30分钟
(message_count >= 10 and time_elapsed >= 3600) # 10条且1小时 (message_count >= 10 and time_elapsed >= 2000) # 10条且1小时
) )
if should_trigger: if should_trigger:
@@ -219,6 +231,10 @@ class RelationshipProcessor(BaseProcessor):
print(f"content: {content}") print(f"content: {content}")
content_json = json.loads(repair_json(content)) content_json = json.loads(repair_json(content))
# 收集即时提取任务
instant_tasks = []
async_tasks = []
for person_name, info_type in content_json.items(): for person_name, info_type in content_json.items():
person_id = person_info_manager.get_person_id_by_person_name(person_name) person_id = person_info_manager.get_person_id_by_person_name(person_name)
if person_id: if person_id:
@@ -233,6 +249,7 @@ class RelationshipProcessor(BaseProcessor):
self.info_fetching_cache.pop(0) self.info_fetching_cache.pop(0)
else: else:
logger.warning(f"{self.log_prefix} 未找到用户 {person_name} 的ID跳过调取信息。") logger.warning(f"{self.log_prefix} 未找到用户 {person_name} 的ID跳过调取信息。")
continue
logger.info(f"{self.log_prefix} 调取用户 {person_name}{info_type} 信息。") logger.info(f"{self.log_prefix} 调取用户 {person_name}{info_type} 信息。")
@@ -244,7 +261,22 @@ class RelationshipProcessor(BaseProcessor):
"start_time": time.time(), "start_time": time.time(),
"rounds": 0 "rounds": 0
}) })
asyncio.create_task(self.fetch_person_info(person_id, [info_type], start_time=time.time()))
if ENABLE_INSTANT_INFO_EXTRACTION:
# 收集即时提取任务
instant_tasks.append((person_id, info_type, time.time()))
else:
# 使用原来的异步模式
async_tasks.append(asyncio.create_task(self.fetch_person_info(person_id, [info_type], start_time=time.time())))
# 执行即时提取任务
if ENABLE_INSTANT_INFO_EXTRACTION and instant_tasks:
await self._execute_instant_extraction_batch(instant_tasks)
# 启动异步任务(如果不是即时模式)
if async_tasks:
# 异步任务不需要等待完成
pass
else: else:
logger.warning(f"{self.log_prefix} LLM返回空结果关系识别失败。") logger.warning(f"{self.log_prefix} LLM返回空结果关系识别失败。")
@@ -265,38 +297,137 @@ class RelationshipProcessor(BaseProcessor):
info_content = self.info_fetched_cache[person_id][info_type]["info"] info_content = self.info_fetched_cache[person_id][info_type]["info"]
person_infos_str += f"[{info_type}]{info_content}" person_infos_str += f"[{info_type}]{info_content}"
else: else:
person_infos_str += f"你不了解{person_name}有关[{info_type}]的信息,不要胡乱回答;" person_infos_str += f"你不了解{person_name}有关[{info_type}]的信息,不要胡乱回答,你可以直接说你不知道,或者你忘记了"
if person_infos_str: if person_infos_str:
persons_infos_str += f"你对 {person_name} 的了解:{person_infos_str}\n" persons_infos_str += f"你对 {person_name} 的了解:{person_infos_str}\n"
# 处理正在调取但还没有结果的项目 # 处理正在调取但还没有结果的项目(只在非即时提取模式下显示)
pending_info_dict = {} if not ENABLE_INSTANT_INFO_EXTRACTION:
for record in self.info_fetching_cache: pending_info_dict = {}
if not record["forget"]: for record in self.info_fetching_cache:
current_time = time.time() if not record["forget"]:
# 只处理不超过2分钟的调取请求避免过期请求一直显示 current_time = time.time()
if current_time - record["start_time"] <= 120: # 10分钟内的请求 # 只处理不超过2分钟的调取请求避免过期请求一直显示
person_id = record["person_id"] if current_time - record["start_time"] <= 120: # 10分钟内的请求
person_name = record["person_name"] person_id = record["person_id"]
info_type = record["info_type"] person_name = record["person_name"]
info_type = record["info_type"]
# 检查是否已经在info_fetched_cache中有结果
if (person_id in self.info_fetched_cache and # 检查是否已经在info_fetched_cache中有结果
info_type in self.info_fetched_cache[person_id]): if (person_id in self.info_fetched_cache and
continue info_type in self.info_fetched_cache[person_id]):
continue
# 按人物组织正在调取的信息
if person_name not in pending_info_dict: # 按人物组织正在调取的信息
pending_info_dict[person_name] = [] if person_name not in pending_info_dict:
pending_info_dict[person_name].append(info_type) pending_info_dict[person_name] = []
pending_info_dict[person_name].append(info_type)
# 添加正在调取的信息到返回字符串
for person_name, info_types in pending_info_dict.items(): # 添加正在调取的信息到返回字符串
info_types_str = "".join(info_types) for person_name, info_types in pending_info_dict.items():
persons_infos_str += f"你正在识图回忆有关 {person_name}{info_types_str} 信息,稍等一下再回答...\n" info_types_str = "".join(info_types)
persons_infos_str += f"你正在识图回忆有关 {person_name}{info_types_str} 信息,稍等一下再回答...\n"
return persons_infos_str return persons_infos_str
async def _execute_instant_extraction_batch(self, instant_tasks: list):
"""
批量执行即时提取任务
"""
if not instant_tasks:
return
logger.info(f"{self.log_prefix} [即时提取] 开始批量提取 {len(instant_tasks)} 个信息")
# 创建所有提取任务
extraction_tasks = []
for person_id, info_type, start_time in instant_tasks:
# 检查缓存中是否已存在且未过期的信息
if (person_id in self.info_fetched_cache and
info_type in self.info_fetched_cache[person_id]):
logger.info(f"{self.log_prefix} 用户 {person_id}{info_type} 信息已存在且未过期,跳过调取。")
continue
task = asyncio.create_task(self._fetch_single_info_instant(person_id, info_type, start_time))
extraction_tasks.append(task)
# 并行执行所有提取任务并等待完成
if extraction_tasks:
await asyncio.gather(*extraction_tasks, return_exceptions=True)
logger.info(f"{self.log_prefix} [即时提取] 批量提取完成")
async def _fetch_single_info_instant(self, person_id: str, info_type: str, start_time: float):
"""
使用小模型提取单个信息类型
"""
nickname_str = ",".join(global_config.bot.alias_names)
name_block = f"你的名字是{global_config.bot.nickname},你的昵称有{nickname_str},有人也会用这些昵称称呼你。"
person_name = await person_info_manager.get_value(person_id, "person_name")
person_impression = await person_info_manager.get_value(person_id, "impression")
if not person_impression:
impression_block = "你对ta没有什么深刻的印象"
else:
impression_block = f"{person_impression}"
points = await person_info_manager.get_value(person_id, "points")
if points:
points_text = "\n".join([
f"{point[2]}:{point[0]}"
for point in points
])
else:
points_text = "你不记得ta最近发生了什么"
prompt = (await global_prompt_manager.get_prompt_async("fetch_info_prompt")).format(
name_block=name_block,
info_type=info_type,
person_impression=impression_block,
person_name=person_name,
info_json_str=f'"{info_type}": "信息内容"',
points_text=points_text,
)
try:
# 使用小模型进行即时提取
content, _ = await self.instant_llm_model.generate_response_async(prompt=prompt)
logger.info(f"{self.log_prefix} [即时提取] {person_name}{info_type} 结果: {content}")
if content:
content_json = json.loads(repair_json(content))
if info_type in content_json:
info_content = content_json[info_type]
if info_content != "none" and info_content:
if person_id not in self.info_fetched_cache:
self.info_fetched_cache[person_id] = {}
self.info_fetched_cache[person_id][info_type] = {
"info": info_content,
"ttl": 8, # 小模型提取的信息TTL稍短
"start_time": start_time,
"person_name": person_name,
"unknow": False,
}
logger.info(f"{self.log_prefix} [即时提取] 成功获取 {person_name}{info_type}: {info_content}")
else:
if person_id not in self.info_fetched_cache:
self.info_fetched_cache[person_id] = {}
self.info_fetched_cache[person_id][info_type] = {
"info": "unknow",
"ttl": 8,
"start_time": start_time,
"person_name": person_name,
"unknow": True,
}
logger.info(f"{self.log_prefix} [即时提取] {person_name}{info_type} 信息不明确")
else:
logger.warning(f"{self.log_prefix} [即时提取] 小模型返回空结果,获取 {person_name}{info_type} 信息失败。")
except Exception as e:
logger.error(f"{self.log_prefix} [即时提取] 执行小模型请求获取用户信息时出错: {e}")
logger.error(traceback.format_exc())
async def fetch_person_info(self, person_id: str, info_types: list[str], start_time: float): async def fetch_person_info(self, person_id: str, info_types: list[str], start_time: float):
""" """
获取某个人的信息 获取某个人的信息

View File

@@ -133,6 +133,9 @@ class SelfProcessor(BaseProcessor):
name_block = f"你的名字是{global_config.bot.nickname},你的昵称有{nickname_str},有人也会用这些昵称称呼你。" name_block = f"你的名字是{global_config.bot.nickname},你的昵称有{nickname_str},有人也会用这些昵称称呼你。"
personality_block = individuality.get_personality_prompt(x_person=2, level=2) personality_block = individuality.get_personality_prompt(x_person=2, level=2)
identity_block = individuality.get_identity_prompt(x_person=2, level=2) identity_block = individuality.get_identity_prompt(x_person=2, level=2)
prompt = (await global_prompt_manager.get_prompt_async("indentify_prompt")).format( prompt = (await global_prompt_manager.get_prompt_async("indentify_prompt")).format(