diff --git a/src/chat/focus_chat/heartFC_chat.py b/src/chat/focus_chat/heartFC_chat.py index 78ca00192..b3fedc4d5 100644 --- a/src/chat/focus_chat/heartFC_chat.py +++ b/src/chat/focus_chat/heartFC_chat.py @@ -139,14 +139,14 @@ class HeartFChatting: # 检查全局关系开关 if not global_config.relationship.enable_relationship: continue - + # 检查处理器特定配置,同时支持向后兼容 processor_enabled = getattr(config_processor_settings, config_key, True) - + # 向后兼容:如果旧的person_impression_processor为True,则启用两个新处理器 if not processor_enabled and getattr(config_processor_settings, "person_impression_processor", True): processor_enabled = True - + if processor_enabled: self.enabled_post_planning_processor_names.append(proc_name) else: diff --git a/src/chat/focus_chat/info_processors/real_time_info_processor.py b/src/chat/focus_chat/info_processors/real_time_info_processor.py index 6536ef6ec..a25fcf7cb 100644 --- a/src/chat/focus_chat/info_processors/real_time_info_processor.py +++ b/src/chat/focus_chat/info_processors/real_time_info_processor.py @@ -63,20 +63,20 @@ def init_real_time_info_prompts(): class RealTimeInfoProcessor(BaseProcessor): """实时信息提取处理器 - + 负责从对话中识别需要的用户信息,并从用户档案中实时提取相关信息 """ - + log_prefix = "实时信息" def __init__(self, subheartflow_id: str): super().__init__() - + self.subheartflow_id = subheartflow_id - + # 信息获取缓存:记录正在获取的信息请求 self.info_fetching_cache: List[Dict[str, any]] = [] - + # 信息结果缓存:存储已获取的信息结果,带TTL self.info_fetched_cache: Dict[str, Dict[str, any]] = {} # 结构:{person_id: {info_type: {"info": str, "ttl": int, "start_time": float, "person_name": str, "unknow": bool}}} @@ -94,6 +94,7 @@ class RealTimeInfoProcessor(BaseProcessor): ) from src.chat.message_receive.chat_stream import get_chat_manager + name = get_chat_manager().get_stream_name(self.subheartflow_id) self.log_prefix = f"[{name}] 实时信息" @@ -105,21 +106,21 @@ class RealTimeInfoProcessor(BaseProcessor): **kwargs, ) -> List[InfoBase]: """处理信息对象 - + Args: observations: 观察对象列表 action_type: 动作类型 action_data: 动作数据 - + Returns: List[InfoBase]: 处理后的结构化信息列表 """ # 清理过期的信息缓存 self._cleanup_expired_cache() - + # 执行实时信息识别和提取 relation_info_str = await self._identify_and_extract_info(observations, action_type, action_data) - + if relation_info_str: relation_info = RelationInfo() relation_info.set_relation_info(relation_info_str) @@ -144,12 +145,12 @@ class RealTimeInfoProcessor(BaseProcessor): action_data: dict = None, ) -> str: """识别并提取用户信息 - + Args: observations: 观察对象列表 action_type: 动作类型 action_data: 动作数据 - + Returns: str: 提取到的用户信息字符串 """ @@ -178,7 +179,7 @@ class RealTimeInfoProcessor(BaseProcessor): # 识别需要提取的信息类型 info_type = await self._identify_needed_info(chat_observe_info, sender, text) - + # 如果需要提取新信息,执行提取 if info_type: await self._extract_single_info(person_id, info_type, sender) @@ -188,10 +189,10 @@ class RealTimeInfoProcessor(BaseProcessor): def _parse_reply_target(self, target_message: str) -> tuple: """解析回复目标消息 - + Args: target_message: 目标消息,格式为 "用户名:消息内容" - + Returns: tuple: (发送者, 消息内容) """ @@ -213,16 +214,16 @@ class RealTimeInfoProcessor(BaseProcessor): def _extract_chat_observe_info(self, observations: List[Observation]) -> str: """从观察对象中提取聊天信息 - + Args: observations: 观察对象列表 - + Returns: str: 聊天观察信息 """ if not observations: return "" - + for observation in observations: if isinstance(observation, ChattingObservation): return observation.get_observe_info() @@ -230,12 +231,12 @@ class RealTimeInfoProcessor(BaseProcessor): async def _identify_needed_info(self, chat_observe_info: str, sender: str, text: str) -> str: """识别需要提取的信息类型 - + Args: chat_observe_info: 聊天观察信息 sender: 发送者 text: 消息内容 - + Returns: str: 需要提取的信息类型,如果不需要则返回None """ @@ -258,39 +259,41 @@ class RealTimeInfoProcessor(BaseProcessor): try: logger.debug(f"{self.log_prefix} 信息识别prompt: \n{prompt}\n") content, _ = await self.llm_model.generate_response_async(prompt=prompt) - + if content: content_json = json.loads(repair_json(content)) - + # 检查是否返回了不需要查询的标志 if "none" in content_json: logger.info(f"{self.log_prefix} LLM判断当前不需要查询任何信息:{content_json.get('none', '')}") return None - + info_type = content_json.get("info_type") if info_type: # 记录信息获取请求 - self.info_fetching_cache.append({ - "person_id": get_person_info_manager().get_person_id_by_person_name(sender), - "person_name": sender, - "info_type": info_type, - "start_time": time.time(), - "forget": False, - }) - + self.info_fetching_cache.append( + { + "person_id": get_person_info_manager().get_person_id_by_person_name(sender), + "person_name": sender, + "info_type": info_type, + "start_time": time.time(), + "forget": False, + } + ) + # 限制缓存大小 if len(self.info_fetching_cache) > 20: self.info_fetching_cache.pop(0) - + logger.info(f"{self.log_prefix} 识别到需要调取用户 {sender} 的[{info_type}]信息") return info_type else: logger.warning(f"{self.log_prefix} LLM未返回有效的info_type。响应: {content}") - + except Exception as e: logger.error(f"{self.log_prefix} 执行信息识别LLM请求时出错: {e}") logger.error(traceback.format_exc()) - + return None def _build_info_cache_block(self) -> str: @@ -314,7 +317,7 @@ class RealTimeInfoProcessor(BaseProcessor): async def _extract_single_info(self, person_id: str, info_type: str, person_name: str): """提取单个信息类型 - + Args: person_id: 用户ID info_type: 信息类型 @@ -353,7 +356,7 @@ class RealTimeInfoProcessor(BaseProcessor): try: person_impression = await person_info_manager.get_value(person_id, "impression") points = await person_info_manager.get_value(person_id, "points") - + # 构建印象信息块 if person_impression: person_impression_block = ( @@ -387,7 +390,7 @@ class RealTimeInfoProcessor(BaseProcessor): # 使用LLM提取信息 nickname_str = ",".join(global_config.bot.alias_names) name_block = f"你的名字是{global_config.bot.nickname},你的昵称有{nickname_str},有人也会用这些昵称称呼你。" - + prompt = (await global_prompt_manager.get_prompt_async("real_time_fetch_person_info_prompt")).format( name_block=name_block, info_type=info_type, @@ -426,14 +429,14 @@ class RealTimeInfoProcessor(BaseProcessor): logger.info(f"{self.log_prefix} 思考了也不知道{person_name} 的 {info_type} 信息") else: logger.warning(f"{self.log_prefix} 小模型返回空结果,获取 {person_name} 的 {info_type} 信息失败。") - + except Exception as e: logger.error(f"{self.log_prefix} 执行信息提取时出错: {e}") logger.error(traceback.format_exc()) async def _save_info_to_cache(self, person_id: str, info_type: str, info_content: str): """将提取到的信息保存到 person_info 的 info_list 字段中 - + Args: person_id: 用户ID info_type: 信息类型 @@ -476,12 +479,12 @@ class RealTimeInfoProcessor(BaseProcessor): def _organize_known_info(self) -> str: """组织已知的用户信息为字符串 - + Returns: str: 格式化的用户信息字符串 """ persons_infos_str = "" - + if self.info_fetched_cache: persons_with_known_info = [] # 有已知信息的人员 persons_with_unknown_info = [] # 有未知信息的人员 @@ -534,7 +537,7 @@ class RealTimeInfoProcessor(BaseProcessor): status_lines = [f"{self.log_prefix} 实时信息缓存状态:"] status_lines.append(f"获取请求缓存数:{len(self.info_fetching_cache)}") status_lines.append(f"结果缓存用户数:{len(self.info_fetched_cache)}") - + if self.info_fetched_cache: for person_id, info_types in self.info_fetched_cache.items(): person_name = list(info_types.values())[0]["person_name"] if info_types else person_id @@ -544,9 +547,9 @@ class RealTimeInfoProcessor(BaseProcessor): unknow = info_data["unknow"] status = "未知" if unknow else "已知" status_lines.append(f" {info_type}: {status} (TTL: {ttl})") - + return "\n".join(status_lines) # 初始化提示词 -init_real_time_info_prompts() \ No newline at end of file +init_real_time_info_prompts() diff --git a/src/chat/focus_chat/info_processors/relationship_processor.py b/src/chat/focus_chat/info_processors/relationship_processor.py index dff6d0931..5b945fdf1 100644 --- a/src/chat/focus_chat/info_processors/relationship_processor.py +++ b/src/chat/focus_chat/info_processors/relationship_processor.py @@ -1,6 +1,5 @@ from src.chat.heart_flow.observation.chatting_observation import ChattingObservation from src.chat.heart_flow.observation.observation import Observation -from src.llm_models.utils_model import LLMRequest from src.config.config import global_config import time import traceback @@ -36,10 +35,10 @@ logger = get_logger("relationship_build_processor") class RelationshipBuildProcessor(BaseProcessor): """关系构建处理器 - + 负责跟踪用户消息活动、管理消息段、触发关系构建和印象更新 """ - + log_prefix = "关系构建" def __init__(self, subheartflow_id: str): @@ -446,6 +445,7 @@ class RelationshipBuildProcessor(BaseProcessor): segments = self.person_engaged_cache[person_id] # 异步执行关系构建 import asyncio + asyncio.create_task(self.update_impression_on_segments(person_id, self.subheartflow_id, segments)) # 移除已处理的用户缓存 del self.person_engaged_cache[person_id]