From 4cb57278b13a2dab5eb26686774919cb527aceab Mon Sep 17 00:00:00 2001
From: UnCLAS-Prommer
Date: Mon, 11 Aug 2025 11:35:14 +0800
Subject: [PATCH] =?UTF-8?q?typing=E5=92=8C=E9=98=B2=E7=82=B8?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 src/chat/chat_loop/heartFC_chat.py | 181 +++++++++++++----------------
 src/chat/knowledge/qa_manager.py   |   9 +-
 src/config/config.py               |   4 +-
 3 files changed, 88 insertions(+), 106 deletions(-)

diff --git a/src/chat/chat_loop/heartFC_chat.py b/src/chat/chat_loop/heartFC_chat.py
index f416bcecb..a3b841a93 100644
--- a/src/chat/chat_loop/heartFC_chat.py
+++ b/src/chat/chat_loop/heartFC_chat.py
@@ -11,7 +11,6 @@ from src.common.logger import get_logger
 from src.chat.message_receive.chat_stream import ChatStream, get_chat_manager
 from src.chat.utils.prompt_builder import global_prompt_manager
 from src.chat.utils.timer_calculator import Timer
-from src.chat.utils.chat_message_builder import get_raw_msg_by_timestamp_with_chat
 from src.chat.planner_actions.planner import ActionPlanner
 from src.chat.planner_actions.action_modifier import ActionModifier
 from src.chat.planner_actions.action_manager import ActionManager
@@ -25,6 +24,7 @@ from src.plugin_system.apis import generator_api, send_api, message_api, databas
 from src.chat.willing.willing_manager import get_willing_manager
 from src.mais4u.mai_think import mai_thinking_manager
 from src.mais4u.constant_s4u import ENABLE_S4U
+
 # no_reply逻辑已集成到heartFC_chat.py中,不再需要导入
 from src.chat.chat_loop.hfc_utils import send_typing, stop_typing
 
@@ -90,7 +90,6 @@ class HeartFChatting:
 
         self.relationship_builder = relationship_builder_manager.get_or_create_builder(self.stream_id)
         self.expression_learner = expression_learner_manager.get_expression_learner(self.stream_id)
-
 
         self.action_manager = ActionManager()
         self.action_planner = ActionPlanner(chat_id=self.stream_id, action_manager=self.action_manager)
@@ -116,7 +115,7 @@ class HeartFChatting:
         logger.info(f"{self.log_prefix} HeartFChatting 初始化完成")
 
         self.energy_value = 5
-
+
         self.focus_energy = 1
         self.no_reply_consecutive = 0
         # 最近三次no_reply的新消息兴趣度记录
@@ -194,28 +193,27 @@ class HeartFChatting:
 
         # 获取动作类型,兼容新旧格式
         action_type = "未知动作"
-        if hasattr(self, '_current_cycle_detail') and self._current_cycle_detail:
+        if hasattr(self, "_current_cycle_detail") and self._current_cycle_detail:
             loop_plan_info = self._current_cycle_detail.loop_plan_info
             if isinstance(loop_plan_info, dict):
-                action_result = loop_plan_info.get('action_result', {})
+                action_result = loop_plan_info.get("action_result", {})
                 if isinstance(action_result, dict):
                     # 旧格式:action_result是字典
-                    action_type = action_result.get('action_type', '未知动作')
+                    action_type = action_result.get("action_type", "未知动作")
                 elif isinstance(action_result, list) and action_result:
                     # 新格式:action_result是actions列表
-                    action_type = action_result[0].get('action_type', '未知动作')
+                    action_type = action_result[0].get("action_type", "未知动作")
             elif isinstance(loop_plan_info, list) and loop_plan_info:
                 # 直接是actions列表的情况
-                action_type = loop_plan_info[0].get('action_type', '未知动作')
+                action_type = loop_plan_info[0].get("action_type", "未知动作")
 
         logger.info(
             f"{self.log_prefix} 第{self._current_cycle_detail.cycle_id}次思考,"
             f"耗时: {self._current_cycle_detail.end_time - self._current_cycle_detail.start_time:.1f}秒, "  # type: ignore
-            f"选择动作: {action_type}"
-            + (f"\n详情: {'; '.join(timer_strings)}" if timer_strings else "")
+            f"选择动作: {action_type}" + (f"\n详情: {'; '.join(timer_strings)}" if timer_strings else "")
         )
-
-    def _determine_form_type(self) -> str:
+
+    def _determine_form_type(self):
         """判断使用哪种形式的no_reply"""
         # 如果连续no_reply次数少于3次,使用waiting形式
         if self.no_reply_consecutive <= 3:
@@ -223,71 +221,73 @@ class HeartFChatting:
         else:
             # 计算最近三次记录的兴趣度总和
             total_recent_interest = sum(self.recent_interest_records)
-
+
             # 计算调整后的阈值
             adjusted_threshold = 3 / global_config.chat.get_current_talk_frequency(self.stream_id)
-
-            logger.info(f"{self.log_prefix} 最近三次兴趣度总和: {total_recent_interest:.2f}, 调整后阈值: {adjusted_threshold:.2f}")
-
+
+            logger.info(
+                f"{self.log_prefix} 最近三次兴趣度总和: {total_recent_interest:.2f}, 调整后阈值: {adjusted_threshold:.2f}"
+            )
+
             # 如果兴趣度总和小于阈值,进入breaking形式
             if total_recent_interest < adjusted_threshold:
                 logger.info(f"{self.log_prefix} 兴趣度不足,进入breaking形式")
                 self.focus_energy = random.randint(3, 6)
             else:
                 logger.info(f"{self.log_prefix} 兴趣度充足")
-                self.focus_energy = 1
-
-    async def _should_process_messages(self, new_message: List[Dict[str, Any]]) -> tuple[bool,float]:
+                self.focus_energy = 1
+
+    async def _should_process_messages(self, new_message: List[Dict[str, Any]]) -> tuple[bool, float]:
         """
         判断是否应该处理消息
-
+
         Args:
             new_message: 新消息列表
             mode: 当前聊天模式
-
+
         Returns:
             bool: 是否应该处理消息
         """
         new_message_count = len(new_message)
 
-        # talk_frequency = global_config.chat.get_current_talk_frequency(self.stream_id)
         modified_exit_count_threshold = self.focus_energy / global_config.chat.focus_value
-
+
         total_interest = 0.0
         for msg_dict in new_message:
             interest_value = msg_dict.get("interest_value", 0.0)
             if msg_dict.get("processed_plain_text", ""):
                 total_interest += interest_value
-
+
         if new_message_count >= modified_exit_count_threshold:
             # 记录兴趣度到列表
-
-
+
             self.recent_interest_records.append(total_interest)
-
+
             logger.info(
                 f"{self.log_prefix} 累计消息数量达到{new_message_count}条(>{modified_exit_count_threshold:.1f}),结束等待"
             )
-            logger.info(self.last_read_time)
-            logger.info(new_message)
-            return True,total_interest/new_message_count
+            logger.info(str(self.last_read_time))
+            logger.info(str(new_message))
+            return True, total_interest / new_message_count
 
         # 检查累计兴趣值
         if new_message_count > 0:
             # 只在兴趣值变化时输出log
             if not hasattr(self, "_last_accumulated_interest") or total_interest != self._last_accumulated_interest:
-                logger.info(f"{self.log_prefix} breaking形式当前累计兴趣值: {total_interest:.2f}, 专注度: {global_config.chat.focus_value:.1f}")
+                logger.info(
+                    f"{self.log_prefix} breaking形式当前累计兴趣值: {total_interest:.2f}, 专注度: {global_config.chat.focus_value:.1f}"
+                )
                 self._last_accumulated_interest = total_interest
-
+
             if total_interest >= 3 / global_config.chat.focus_value:
                 # 记录兴趣度到列表
                 self.recent_interest_records.append(total_interest)
-
+
                 logger.info(
                     f"{self.log_prefix} 累计兴趣值达到{total_interest:.2f}(>{3 / global_config.chat.focus_value}),结束等待"
                 )
-                return True,total_interest/new_message_count
+                return True, total_interest / new_message_count
 
         # 每10秒输出一次等待状态
         if int(time.time() - self.last_read_time) > 0 and int(time.time() - self.last_read_time) % 10 == 0:
@@ -295,29 +295,28 @@ class HeartFChatting:
                 f"{self.log_prefix} 已等待{time.time() - self.last_read_time:.0f}秒,累计{new_message_count}条消息,累计兴趣{total_interest:.1f},继续等待..."
             )
             await asyncio.sleep(0.5)
-
-        return False,0.0
+        return False, 0.0
 
     async def _loopbody(self):
         recent_messages_dict = message_api.get_messages_by_time_in_chat(
             chat_id=self.stream_id,
             start_time=self.last_read_time,
             end_time=time.time(),
-            limit = 10,
+            limit=10,
             limit_mode="latest",
             filter_mai=True,
             filter_command=True,
-            )
-
+        )
+
         # 统一的消息处理逻辑
-        should_process,interest_value = await self._should_process_messages(recent_messages_dict)
-
+        should_process, interest_value = await self._should_process_messages(recent_messages_dict)
+
         if should_process:
             # earliest_message_data = recent_messages_dict[0]
             # self.last_read_time = earliest_message_data.get("time")
             self.last_read_time = time.time()
 
-            await self._observe(interest_value = interest_value)
+            await self._observe(interest_value=interest_value)
 
         else:
             # Normal模式:消息数量不足,等待
@@ -328,12 +327,12 @@ class HeartFChatting:
 
     async def build_reply_to_str(self, message_data: dict):
         person_info_manager = get_person_info_manager()
-
+
         # 获取 platform,如果不存在则从 chat_stream 获取,如果还是 None 则使用默认值
         platform = message_data.get("chat_info_platform")
         if platform is None:
             platform = getattr(self.chat_stream, "platform", "unknown")
-
+
         person_id = person_info_manager.get_person_id(
             platform,  # type: ignore
             message_data.get("user_id"),  # type: ignore
@@ -356,12 +355,12 @@ class HeartFChatting:
 
         # 存储reply action信息
         person_info_manager = get_person_info_manager()
-
+
         # 获取 platform,如果不存在则从 chat_stream 获取,如果还是 None 则使用默认值
         platform = action_message.get("chat_info_platform")
         if platform is None:
             platform = getattr(self.chat_stream, "platform", "unknown")
-
+
         person_id = person_info_manager.get_person_id(
             platform,
             action_message.get("user_id", ""),
@@ -394,17 +393,15 @@ class HeartFChatting:
 
         return loop_info, reply_text, cycle_timers
 
-    async def _observe(self,interest_value:float = 0.0) -> bool:
-
+    async def _observe(self, interest_value: float = 0.0) -> bool:
         action_type = "no_action"
         reply_text = ""  # 初始化reply_text变量,避免UnboundLocalError
-        reply_to_str = ""  # 初始化reply_to_str变量
 
         # 根据interest_value计算概率,决定使用哪种planner模式
         # interest_value越高,越倾向于使用Normal模式
         import random
         import math
-
+
         # 使用sigmoid函数将interest_value转换为概率
         # 当interest_value为0时,概率接近0(使用Focus模式)
         # 当interest_value很高时,概率接近1(使用Normal模式)
@@ -417,16 +414,22 @@ class HeartFChatting:
             k = 2.0  # 控制曲线陡峭程度
             x0 = 1.0  # 控制曲线中心点
             return 1.0 / (1.0 + math.exp(-k * (interest_val - x0)))
-
-        normal_mode_probability = calculate_normal_mode_probability(interest_value) / global_config.chat.get_current_talk_frequency(self.stream_id)
-
+
+        normal_mode_probability = calculate_normal_mode_probability(
+            interest_value
+        ) / global_config.chat.get_current_talk_frequency(self.stream_id)
+
         # 根据概率决定使用哪种模式
         if random.random() < normal_mode_probability:
             mode = ChatMode.NORMAL
-            logger.info(f"{self.log_prefix} 基于兴趣值 {interest_value:.2f},概率 {normal_mode_probability:.2f},选择Normal planner模式")
+            logger.info(
+                f"{self.log_prefix} 基于兴趣值 {interest_value:.2f},概率 {normal_mode_probability:.2f},选择Normal planner模式"
+            )
         else:
             mode = ChatMode.FOCUS
-            logger.info(f"{self.log_prefix} 基于兴趣值 {interest_value:.2f},概率 {normal_mode_probability:.2f},选择Focus planner模式")
+            logger.info(
+                f"{self.log_prefix} 基于兴趣值 {interest_value:.2f},概率 {normal_mode_probability:.2f},选择Focus planner模式"
+            )
 
         # 创建新的循环信息
         cycle_timers, thinking_id = self.start_cycle()
@@ -463,7 +466,7 @@ class HeartFChatting:
             ):
                 return False
             with Timer("规划器", cycle_timers):
-                actions, _= await self.action_planner.plan(
+                actions, _ = await self.action_planner.plan(
                     mode=mode,
                     loop_start_time=loop_start_time,
                     available_actions=available_actions,
@@ -477,7 +480,6 @@ class HeartFChatting:
 #                     action_result.get("is_parallel", True),
 #                 )
 
-
         # 3. 并行执行所有动作
         async def execute_action(action_info):
             """执行单个动作的通用函数"""
@@ -486,7 +488,7 @@ class HeartFChatting:
                     # 直接处理no_reply逻辑,不再通过动作系统
                     reason = action_info.get("reasoning", "选择不回复")
                     logger.info(f"{self.log_prefix} 选择不回复,原因: {reason}")
-
+
                     # 存储no_reply信息到数据库
                     await database_api.store_action_info(
                         chat_stream=self.chat_stream,
@@ -497,13 +499,8 @@ class HeartFChatting:
                         action_data={"reason": reason},
                         action_name="no_reply",
                     )
-
-                    return {
-                        "action_type": "no_reply",
-                        "success": True,
-                        "reply_text": "",
-                        "command": ""
-                    }
+
+                    return {"action_type": "no_reply", "success": True, "reply_text": "", "command": ""}
                 elif action_info["action_type"] != "reply":
                     # 执行普通动作
                     with Timer("动作执行", cycle_timers):
@@ -513,19 +510,18 @@ class HeartFChatting:
                             action_info["action_data"],
                             cycle_timers,
                             thinking_id,
-                            action_info["action_message"]
+                            action_info["action_message"],
                         )
                     return {
                         "action_type": action_info["action_type"],
                         "success": success,
                         "reply_text": reply_text,
-                        "command": command
+                        "command": command,
                     }
                 else:
                     # 执行回复动作
                     reply_to_str = await self.build_reply_to_str(action_info["action_message"])
-
-
+
                     # 生成回复
                     gather_timeout = global_config.chat.thinking_timeout
                     try:
@@ -536,35 +532,20 @@ class HeartFChatting:
                                 reply_to=reply_to_str,
                                 request_type="chat.replyer",
                             ),
-                            timeout=gather_timeout
+                            timeout=gather_timeout,
                         )
                     except asyncio.TimeoutError:
                         logger.warning(
                             f"{self.log_prefix} 并行执行:回复生成超时>{global_config.chat.thinking_timeout}s,已跳过"
                         )
-                        return {
-                            "action_type": "reply",
-                            "success": False,
-                            "reply_text": "",
-                            "loop_info": None
-                        }
+                        return {"action_type": "reply", "success": False, "reply_text": "", "loop_info": None}
                     except asyncio.CancelledError:
                         logger.debug(f"{self.log_prefix} 并行执行:回复生成任务已被取消")
-                        return {
-                            "action_type": "reply",
-                            "success": False,
-                            "reply_text": "",
-                            "loop_info": None
-                        }
+                        return {"action_type": "reply", "success": False, "reply_text": "", "loop_info": None}
 
                     if not response_set:
                         logger.warning(f"{self.log_prefix} 模型超时或生成回复内容为空")
-                        return {
-                            "action_type": "reply",
-                            "success": False,
-                            "reply_text": "",
-                            "loop_info": None
-                        }
+                        return {"action_type": "reply", "success": False, "reply_text": "", "loop_info": None}
 
                     loop_info, reply_text, cycle_timers_reply = await self._send_and_store_reply(
                         response_set,
@@ -579,7 +560,7 @@ class HeartFChatting:
                         "action_type": "reply",
                         "success": True,
                         "reply_text": reply_text,
-                        "loop_info": loop_info
+                        "loop_info": loop_info,
                     }
             except Exception as e:
                 logger.error(f"{self.log_prefix} 执行动作时出错: {e}")
@@ -589,30 +570,29 @@ class HeartFChatting:
                     "success": False,
                     "reply_text": "",
                     "loop_info": None,
-                    "error": str(e)
+                    "error": str(e),
                 }
-
+
         # 创建所有动作的后台任务
         # print(actions)
-
+
         action_tasks = [asyncio.create_task(execute_action(action)) for action in actions]
-
+
         # 并行执行所有任务
         results = await asyncio.gather(*action_tasks, return_exceptions=True)
-
+
         # 处理执行结果
         reply_loop_info = None
         reply_text_from_reply = ""
         action_success = False
         action_reply_text = ""
         action_command = ""
-
-        for i, result in enumerate(results):
+
+        for result in results:
             if isinstance(result, BaseException):
                 logger.error(f"{self.log_prefix} 动作执行异常: {result}")
                 continue
-
-            action_info = actions[i]
+
             if result["action_type"] != "reply":
                 action_success = result["success"]
                 action_reply_text = result["reply_text"]
@@ -651,7 +631,6 @@ class HeartFChatting:
                 },
             }
             reply_text = action_reply_text
-
 
         if ENABLE_S4U:
             await stop_typing()
@@ -663,7 +642,7 @@ class HeartFChatting:
         # await self.willing_manager.after_generate_reply_handle(message_data.get("message_id", ""))
 
         action_type = actions[0]["action_type"] if actions else "no_action"
-
+
         # 管理no_reply计数器:当执行了非no_reply动作时,重置计数器
         if action_type != "no_reply":
             # no_reply逻辑已集成到heartFC_chat.py中,直接重置计数器
@@ -671,7 +650,7 @@ class HeartFChatting:
             self.no_reply_consecutive = 0
             logger.debug(f"{self.log_prefix} 执行了{action_type}动作,重置no_reply计数器")
             return True
-
+
         if action_type == "no_reply":
             self.no_reply_consecutive += 1
             self._determine_form_type()
diff --git a/src/chat/knowledge/qa_manager.py b/src/chat/knowledge/qa_manager.py
index 5354447af..b8b31efb4 100644
--- a/src/chat/knowledge/qa_manager.py
+++ b/src/chat/knowledge/qa_manager.py
@@ -24,7 +24,9 @@ class QAManager:
         self.kg_manager = kg_manager
         self.qa_model = LLMRequest(model_set=model_config.model_task_config.lpmm_qa, request_type="lpmm.qa")
 
-    async def process_query(self, question: str) -> Optional[Tuple[List[Tuple[str, float, float]], Optional[Dict[str, float]]]]:
+    async def process_query(
+        self, question: str
+    ) -> Optional[Tuple[List[Tuple[str, float, float]], Optional[Dict[str, float]]]]:
         """处理查询"""
 
         # 生成问题的Embedding
@@ -56,7 +58,8 @@ class QAManager:
         logger.debug(f"关系检索用时:{part_end_time - part_start_time:.5f}s")
 
         for res in relation_search_res:
-            rel_str = self.embed_manager.relation_embedding_store.store.get(res[0]).str
+            if store_item := self.embed_manager.relation_embedding_store.store.get(res[0]):
+                rel_str = store_item.str
             print(f"找到相关关系,相似度:{(res[1] * 100):.2f}% - {rel_str}")
 
         # TODO: 使用LLM过滤三元组结果
@@ -105,7 +108,7 @@ class QAManager:
         if not query_res:
             logger.debug("知识库查询结果为空,可能是知识库中没有相关内容")
             return None
-
+
         knowledge = [
             (
                 self.embed_manager.paragraphs_embedding_store.store[res[0]].str,
diff --git a/src/config/config.py b/src/config/config.py
index a9f926b58..021275514 100644
--- a/src/config/config.py
+++ b/src/config/config.py
@@ -70,8 +70,8 @@ def get_key_comment(toml_table, key):
                 return item.trivia.comment
     if hasattr(toml_table, "keys"):
         for k in toml_table.keys():
-            if isinstance(k, KeyType) and k.key == key:
-                return k.trivia.comment
+            if isinstance(k, KeyType) and k.key == key:  # type: ignore
+                return k.trivia.comment  # type: ignore
     return None
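
Note for reviewers: the `_observe` hunk above only reformats the sigmoid-based planner-mode selection; it does not change its behaviour. The following is a minimal standalone sketch of that decision rule, not part of the patch. The constants k = 2.0 and x0 = 1.0 are taken from the hunk, while `talk_frequency` is a hypothetical stand-in for `global_config.chat.get_current_talk_frequency(stream_id)`, assumed here to return a positive float.

    import math
    import random

    def calculate_normal_mode_probability(interest_val: float, k: float = 2.0, x0: float = 1.0) -> float:
        # Sigmoid mapping: low interest -> probability near 0 (Focus mode),
        # high interest -> probability near 1 (Normal mode).
        return 1.0 / (1.0 + math.exp(-k * (interest_val - x0)))

    def choose_planner_mode(interest_value: float, talk_frequency: float = 1.0) -> str:
        # talk_frequency is an assumed stand-in for the per-stream talk frequency lookup;
        # dividing by it makes Normal mode less likely when the configured frequency is high.
        normal_mode_probability = calculate_normal_mode_probability(interest_value) / talk_frequency
        return "NORMAL" if random.random() < normal_mode_probability else "FOCUS"

    if __name__ == "__main__":
        for interest in (0.0, 0.5, 1.0, 2.0):
            p = calculate_normal_mode_probability(interest)
            print(f"interest={interest:.1f} -> P(normal)={p:.2f}, mode={choose_planner_mode(interest)}")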