diff --git a/src/chat/focus_chat/info_processors/relationship_processor.py b/src/chat/focus_chat/info_processors/relationship_processor.py index c0db8df19..0436b5e50 100644 --- a/src/chat/focus_chat/info_processors/relationship_processor.py +++ b/src/chat/focus_chat/info_processors/relationship_processor.py @@ -19,6 +19,11 @@ import json import asyncio from src.chat.utils.chat_message_builder import get_raw_msg_by_timestamp_with_chat +# 配置常量:是否启用小模型即时信息提取 +# 开启时:使用小模型并行即时提取,速度更快,但精度可能略低 +# 关闭时:使用原来的异步模式,精度更高但速度较慢 +ENABLE_INSTANT_INFO_EXTRACTION = True + logger = get_logger("processor") @@ -96,6 +101,13 @@ class RelationshipProcessor(BaseProcessor): model=global_config.model.relation, request_type="focus.relationship", ) + + # 小模型用于即时信息提取 + if ENABLE_INSTANT_INFO_EXTRACTION: + self.instant_llm_model = LLMRequest( + model=global_config.model.utils_small, + request_type="focus.relationship.instant", + ) name = chat_manager.get_stream_name(self.subheartflow_id) self.log_prefix = f"[{name}] " @@ -151,9 +163,9 @@ class RelationshipProcessor(BaseProcessor): # 根据消息数量和时间设置不同的触发条件 should_trigger = ( message_count >= 50 or # 50条消息必定满足 - (message_count >= 35 and time_elapsed >= 600) or # 35条且10分钟 - (message_count >= 25 and time_elapsed >= 1800) or # 25条且30分钟 - (message_count >= 10 and time_elapsed >= 3600) # 10条且1小时 + (message_count >= 35 and time_elapsed >= 300) or # 35 messages and 5 minutes + (message_count >= 25 and time_elapsed >= 900) or # 25 messages and 15 minutes + (message_count >= 10 and time_elapsed >= 2000) # 10 messages and ~33 minutes (2000 s) ) if should_trigger: @@ -219,6 +231,10 @@ class RelationshipProcessor(BaseProcessor): print(f"content: {content}") content_json = json.loads(repair_json(content)) + # 收集即时提取任务 + instant_tasks = [] + async_tasks = [] + for person_name, info_type in content_json.items(): person_id = person_info_manager.get_person_id_by_person_name(person_name) if person_id: @@ -233,6 +249,7 @@ class RelationshipProcessor(BaseProcessor): self.info_fetching_cache.pop(0) else: 
logger.warning(f"{self.log_prefix} 未找到用户 {person_name} 的ID,跳过调取信息。") + continue logger.info(f"{self.log_prefix} 调取用户 {person_name} 的 {info_type} 信息。") @@ -244,7 +261,22 @@ class RelationshipProcessor(BaseProcessor): "start_time": time.time(), "rounds": 0 }) - asyncio.create_task(self.fetch_person_info(person_id, [info_type], start_time=time.time())) + + if ENABLE_INSTANT_INFO_EXTRACTION: + # 收集即时提取任务 + instant_tasks.append((person_id, info_type, time.time())) + else: + # 使用原来的异步模式 + async_tasks.append(asyncio.create_task(self.fetch_person_info(person_id, [info_type], start_time=time.time()))) + + # 执行即时提取任务 + if ENABLE_INSTANT_INFO_EXTRACTION and instant_tasks: + await self._execute_instant_extraction_batch(instant_tasks) + + # 启动异步任务(如果不是即时模式) + if async_tasks: + # 异步任务不需要等待完成 + pass else: logger.warning(f"{self.log_prefix} LLM返回空结果,关系识别失败。") @@ -265,38 +297,137 @@ class RelationshipProcessor(BaseProcessor): info_content = self.info_fetched_cache[person_id][info_type]["info"] person_infos_str += f"[{info_type}]:{info_content};" else: - person_infos_str += f"你不了解{person_name}有关[{info_type}]的信息,不要胡乱回答;" + person_infos_str += f"你不了解{person_name}有关[{info_type}]的信息,不要胡乱回答,你可以直接说你不知道,或者你忘记了;" if person_infos_str: persons_infos_str += f"你对 {person_name} 的了解:{person_infos_str}\n" - # 处理正在调取但还没有结果的项目 - pending_info_dict = {} - for record in self.info_fetching_cache: - if not record["forget"]: - current_time = time.time() - # 只处理不超过2分钟的调取请求,避免过期请求一直显示 - if current_time - record["start_time"] <= 120: # 10分钟内的请求 - person_id = record["person_id"] - person_name = record["person_name"] - info_type = record["info_type"] - - # 检查是否已经在info_fetched_cache中有结果 - if (person_id in self.info_fetched_cache and - info_type in self.info_fetched_cache[person_id]): - continue - - # 按人物组织正在调取的信息 - if person_name not in pending_info_dict: - pending_info_dict[person_name] = [] - pending_info_dict[person_name].append(info_type) - - # 添加正在调取的信息到返回字符串 - for person_name, info_types in 
pending_info_dict.items(): - info_types_str = "、".join(info_types) - persons_infos_str += f"你正在识图回忆有关 {person_name} 的 {info_types_str} 信息,稍等一下再回答...\n" + # 处理正在调取但还没有结果的项目(只在非即时提取模式下显示) + if not ENABLE_INSTANT_INFO_EXTRACTION: + pending_info_dict = {} + for record in self.info_fetching_cache: + if not record["forget"]: + current_time = time.time() + # 只处理不超过2分钟的调取请求,避免过期请求一直显示 + if current_time - record["start_time"] <= 120: # only requests started within the last 2 minutes (120 s) + person_id = record["person_id"] + person_name = record["person_name"] + info_type = record["info_type"] + + # 检查是否已经在info_fetched_cache中有结果 + if (person_id in self.info_fetched_cache and + info_type in self.info_fetched_cache[person_id]): + continue + + # 按人物组织正在调取的信息 + if person_name not in pending_info_dict: + pending_info_dict[person_name] = [] + pending_info_dict[person_name].append(info_type) + + # 添加正在调取的信息到返回字符串 + for person_name, info_types in pending_info_dict.items(): + info_types_str = "、".join(info_types) + persons_infos_str += f"你正在识图回忆有关 {person_name} 的 {info_types_str} 信息,稍等一下再回答...\n" return persons_infos_str + async def _execute_instant_extraction_batch(self, instant_tasks: list): + """ + 批量执行即时提取任务 + """ + if not instant_tasks: + return + + logger.info(f"{self.log_prefix} [即时提取] 开始批量提取 {len(instant_tasks)} 个信息") + + # 创建所有提取任务 + extraction_tasks = [] + for person_id, info_type, start_time in instant_tasks: + # 检查缓存中是否已存在且未过期的信息 + if (person_id in self.info_fetched_cache and + info_type in self.info_fetched_cache[person_id]): + logger.info(f"{self.log_prefix} 用户 {person_id} 的 {info_type} 信息已存在且未过期,跳过调取。") + continue + + task = asyncio.create_task(self._fetch_single_info_instant(person_id, info_type, start_time)) + extraction_tasks.append(task) + + # 并行执行所有提取任务并等待完成 + if extraction_tasks: + await asyncio.gather(*extraction_tasks, return_exceptions=True) + logger.info(f"{self.log_prefix} [即时提取] 批量提取完成") + + + async def _fetch_single_info_instant(self, person_id: str, info_type: str, start_time: float): + """ + 
使用小模型提取单个信息类型 + """ + nickname_str = ",".join(global_config.bot.alias_names) + name_block = f"你的名字是{global_config.bot.nickname},你的昵称有{nickname_str},有人也会用这些昵称称呼你。" + + person_name = await person_info_manager.get_value(person_id, "person_name") + + person_impression = await person_info_manager.get_value(person_id, "impression") + if not person_impression: + impression_block = "你对ta没有什么深刻的印象" + else: + impression_block = f"{person_impression}" + + points = await person_info_manager.get_value(person_id, "points") + if points: + points_text = "\n".join([ + f"{point[2]}:{point[0]}" + for point in points + ]) + else: + points_text = "你不记得ta最近发生了什么" + + prompt = (await global_prompt_manager.get_prompt_async("fetch_info_prompt")).format( + name_block=name_block, + info_type=info_type, + person_impression=impression_block, + person_name=person_name, + info_json_str=f'"{info_type}": "信息内容"', + points_text=points_text, + ) + + try: + # 使用小模型进行即时提取 + content, _ = await self.instant_llm_model.generate_response_async(prompt=prompt) + + logger.info(f"{self.log_prefix} [即时提取] {person_name} 的 {info_type} 结果: {content}") + + if content: + content_json = json.loads(repair_json(content)) + if info_type in content_json: + info_content = content_json[info_type] + if info_content != "none" and info_content: + if person_id not in self.info_fetched_cache: + self.info_fetched_cache[person_id] = {} + self.info_fetched_cache[person_id][info_type] = { + "info": info_content, + "ttl": 8, # 小模型提取的信息TTL稍短 + "start_time": start_time, + "person_name": person_name, + "unknow": False, + } + logger.info(f"{self.log_prefix} [即时提取] 成功获取 {person_name} 的 {info_type}: {info_content}") + else: + if person_id not in self.info_fetched_cache: + self.info_fetched_cache[person_id] = {} + self.info_fetched_cache[person_id][info_type] = { + "info": "unknow", + "ttl": 8, + "start_time": start_time, + "person_name": person_name, + "unknow": True, + } + logger.info(f"{self.log_prefix} [即时提取] {person_name} 的 
{info_type} 信息不明确") + else: + logger.warning(f"{self.log_prefix} [即时提取] 小模型返回空结果,获取 {person_name} 的 {info_type} 信息失败。") + except Exception as e: + logger.error(f"{self.log_prefix} [即时提取] 执行小模型请求获取用户信息时出错: {e}") + logger.error(traceback.format_exc()) + async def fetch_person_info(self, person_id: str, info_types: list[str], start_time: float): """ 获取某个人的信息 diff --git a/src/chat/focus_chat/info_processors/self_processor.py b/src/chat/focus_chat/info_processors/self_processor.py index 36dc3c950..f21a1d3b1 100644 --- a/src/chat/focus_chat/info_processors/self_processor.py +++ b/src/chat/focus_chat/info_processors/self_processor.py @@ -133,6 +133,9 @@ class SelfProcessor(BaseProcessor): name_block = f"你的名字是{global_config.bot.nickname},你的昵称有{nickname_str},有人也会用这些昵称称呼你。" personality_block = individuality.get_personality_prompt(x_person=2, level=2) + + + identity_block = individuality.get_identity_prompt(x_person=2, level=2) prompt = (await global_prompt_manager.get_prompt_async("indentify_prompt")).format(