From c4c0983947c48d22f0e3740a458d5657bfe3625d Mon Sep 17 00:00:00 2001 From: UnCLAS-Prommer Date: Sun, 27 Jul 2025 12:43:59 +0800 Subject: [PATCH] typing and ruff fix --- src/chat/chat_loop/heartFC_chat.py | 236 +++++++++++++++------------- src/chat/planner_actions/planner.py | 10 +- 2 files changed, 134 insertions(+), 112 deletions(-) diff --git a/src/chat/chat_loop/heartFC_chat.py b/src/chat/chat_loop/heartFC_chat.py index 5feae462a..efa8f69b9 100644 --- a/src/chat/chat_loop/heartFC_chat.py +++ b/src/chat/chat_loop/heartFC_chat.py @@ -2,7 +2,7 @@ import asyncio import time import traceback import random -from typing import List, Optional, Dict, Any +from typing import List, Optional, Dict, Any, Tuple from rich.traceback import install from src.config.config import global_config @@ -217,9 +217,11 @@ class HeartFChatting: filter_bot=True, ) if global_config.chat.focus_value != 0: - if len(new_messages_data) > 3 / pow(global_config.chat.focus_value,0.5): + if len(new_messages_data) > 3 / pow(global_config.chat.focus_value, 0.5): self.loop_mode = ChatMode.FOCUS - self.energy_value = 10 + (len(new_messages_data) / (3 / pow(global_config.chat.focus_value,0.5))) * 10 + self.energy_value = ( + 10 + (len(new_messages_data) / (3 / pow(global_config.chat.focus_value, 0.5))) * 10 + ) return True if self.energy_value >= 30: @@ -254,20 +256,21 @@ class HeartFChatting: ) person_name = await person_info_manager.get_value(person_id, "person_name") return f"{person_name}:{message_data.get('processed_plain_text')}" - + async def _send_and_store_reply( self, - response_set, - reply_to_str, - loop_start_time, - action_message, - cycle_timers, + response_set, + reply_to_str, + loop_start_time, + action_message, + cycle_timers: Dict[str, float], thinking_id, - plan_result): + plan_result, + ) -> Tuple[Dict[str, Any], str, Dict[str, float]]: with Timer("回复发送", cycle_timers): reply_text = await self._send_response(response_set, reply_to_str, loop_start_time, action_message) - # 存储reply action信息 + # 存储reply action信息 person_info_manager = get_person_info_manager() person_id = person_info_manager.get_person_id( action_message.get("chat_info_platform", ""), @@ -275,7 +278,7 @@ class HeartFChatting: ) person_name = await person_info_manager.get_value(person_id, "person_name") action_prompt_display = f"你对{person_name}进行了回复:{reply_text}" - + await database_api.store_action_info( chat_stream=self.chat_stream, action_build_into_prompt=False, @@ -285,10 +288,9 @@ class HeartFChatting: action_data={"reply_text": reply_text, "reply_to": reply_to_str}, action_name="reply", ) - - + # 构建循环信息 - loop_info = { + loop_info: Dict[str, Any] = { "loop_plan_info": { "action_result": plan_result.get("action_result", {}), }, @@ -299,8 +301,8 @@ class HeartFChatting: "taken_time": time.time(), }, } - - return loop_info, reply_text,cycle_timers + + return loop_info, reply_text, cycle_timers async def _observe(self, message_data: Optional[Dict[str, Any]] = None): # sourcery skip: hoist-statement-from-if, merge-comparisons, reintroduce-else @@ -310,12 +312,12 @@ class HeartFChatting: reply_text = "" # 初始化reply_text变量,避免UnboundLocalError gen_task = None # 初始化gen_task变量,避免UnboundLocalError reply_to_str = "" # 初始化reply_to_str变量 - + # 创建新的循环信息 cycle_timers, thinking_id = self.start_cycle() logger.info(f"{self.log_prefix} 开始第{self._cycle_counter}次思考[模式:{self.loop_mode}]") - + if ENABLE_S4U: await send_typing() @@ -337,19 +339,20 @@ class HeartFChatting: skip_planner = False if self.loop_mode == ChatMode.NORMAL: # 过滤掉reply相关的动作,检查是否还有其他动作 - non_reply_actions = {k: v for k, v in available_actions.items() - if k not in ['reply', 'no_reply', 'no_action']} - + non_reply_actions = { + k: v for k, v in available_actions.items() if k not in ["reply", "no_reply", "no_action"] + } + if not non_reply_actions: skip_planner = True logger.info(f"{self.log_prefix} Normal模式下没有可用动作,直接回复") - + # 直接设置为reply动作 action_type = "reply" reasoning = "" action_data = {"loop_start_time": loop_start_time} is_parallel = False - + # 构建plan_result用于后续处理 plan_result = { "action_result": { @@ -361,22 +364,25 @@ class HeartFChatting: }, "action_prompt": "", } - target_message = message_data - + target_message = message_data + # 如果normal模式且不跳过规划器,开始一个回复生成进程,先准备好回复(其实是和planer同时进行的) if not skip_planner: reply_to_str = await self.build_reply_to_str(message_data) - gen_task = asyncio.create_task(self._generate_response( - message_data=message_data, - available_actions=available_actions, - reply_to=reply_to_str, - request_type="chat.replyer.normal")) - + gen_task = asyncio.create_task( + self._generate_response( + message_data=message_data, + available_actions=available_actions, + reply_to=reply_to_str, + request_type="chat.replyer.normal", + ) + ) + if not skip_planner: with Timer("规划器", cycle_timers): plan_result, target_message = await self.action_planner.plan(mode=self.loop_mode) - action_result: dict = plan_result.get("action_result", {}) # type: ignore + action_result: Dict[str, Any] = plan_result.get("action_result", {}) # type: ignore action_type, action_data, reasoning, is_parallel = ( action_result.get("action_type", "error"), action_result.get("action_data", {}), @@ -386,29 +392,27 @@ class HeartFChatting: action_data["loop_start_time"] = loop_start_time - if action_type == "reply": logger.info(f"{self.log_prefix}{global_config.bot.nickname} 决定进行回复") elif is_parallel: - logger.info( - f"{self.log_prefix}{global_config.bot.nickname} 决定进行回复, 同时执行{action_type}动作" - ) + logger.info(f"{self.log_prefix}{global_config.bot.nickname} 决定进行回复, 同时执行{action_type}动作") else: # 只有在gen_task存在时才进行相关操作 - if gen_task is not None: + if gen_task: if not gen_task.done(): gen_task.cancel() logger.debug(f"{self.log_prefix} 已取消预生成的回复任务") logger.info( f"{self.log_prefix}{global_config.bot.nickname} 原本想要回复,但选择执行{action_type},不发表回复" ) - else: - content = " ".join([item[1] for item in gen_task.result() if item[0] == "text"]) + elif generation_result := gen_task.result(): + content = " ".join([item[1] for item in generation_result if item[0] == "text"]) logger.debug(f"{self.log_prefix} 预生成的回复任务已完成") logger.info( f"{self.log_prefix}{global_config.bot.nickname} 原本想要回复:{content},但选择执行{action_type},不发表回复" ) - + else: + logger.warning(f"{self.log_prefix} 预生成的回复任务未生成有效内容") action_message: Dict[str, Any] = message_data or target_message # type: ignore if action_type == "reply": @@ -417,11 +421,14 @@ class HeartFChatting: # 只有在gen_task存在时才等待 if not gen_task: reply_to_str = await self.build_reply_to_str(message_data) - gen_task = asyncio.create_task(self._generate_response( - message_data=message_data, - available_actions=available_actions, - reply_to=reply_to_str, - request_type="chat.replyer.normal")) + gen_task = asyncio.create_task( + self._generate_response( + message_data=message_data, + available_actions=available_actions, + reply_to=reply_to_str, + request_type="chat.replyer.normal", + ) + ) gather_timeout = global_config.chat.thinking_timeout try: @@ -436,56 +443,71 @@ class HeartFChatting: return False else: logger.info(f"{self.log_prefix}{global_config.bot.nickname} 决定进行回复 (focus模式)") - + # 构建reply_to字符串 reply_to_str = await self.build_reply_to_str(action_message) - + # 生成回复 with Timer("回复生成", cycle_timers): response_set = await self._generate_response( - message_data=action_message, - available_actions=available_actions, - reply_to=reply_to_str, - request_type="chat.replyer.focus") - + message_data=action_message, + available_actions=available_actions, + reply_to=reply_to_str, + request_type="chat.replyer.focus", + ) + if not response_set: logger.warning(f"{self.log_prefix}模型未生成回复内容") return False - - loop_info, reply_text,cycle_timers = await self._send_and_store_reply(response_set, reply_to_str, loop_start_time, action_message, cycle_timers, thinking_id, plan_result) - + + loop_info, reply_text, cycle_timers = await self._send_and_store_reply( + response_set, reply_to_str, loop_start_time, action_message, cycle_timers, thinking_id, plan_result + ) + return True else: # 并行执行:同时进行回复发送和动作执行 - tasks = [] - + # 先置空防止未定义错误 + background_reply_task = None + background_action_task = None # 如果是并行执行且在normal模式下,需要等待预生成的回复任务完成并发送回复 if self.loop_mode == ChatMode.NORMAL and is_parallel and gen_task: - async def handle_reply_task(): + + async def handle_reply_task() -> Tuple[Optional[Dict[str, Any]], str, Dict[str, float]]: # 等待预生成的回复任务完成 gather_timeout = global_config.chat.thinking_timeout try: response_set = await asyncio.wait_for(gen_task, timeout=gather_timeout) - + except asyncio.TimeoutError: - logger.warning(f"{self.log_prefix} 并行执行:回复生成超时>{global_config.chat.thinking_timeout}s,已跳过") + logger.warning( + f"{self.log_prefix} 并行执行:回复生成超时>{global_config.chat.thinking_timeout}s,已跳过" + ) return None, "", {} except asyncio.CancelledError: logger.debug(f"{self.log_prefix} 并行执行:回复生成任务已被取消") return None, "", {} - + if not response_set: logger.warning(f"{self.log_prefix} 模型超时或生成回复内容为空") return None, "", {} - + reply_to_str = await self.build_reply_to_str(action_message) - loop_info, reply_text, cycle_timers_reply = await self._send_and_store_reply(response_set, reply_to_str, loop_start_time, action_message, cycle_timers, thinking_id, plan_result) + loop_info, reply_text, cycle_timers_reply = await self._send_and_store_reply( + response_set, + reply_to_str, + loop_start_time, + action_message, + cycle_timers, + thinking_id, + plan_result, + ) return loop_info, reply_text, cycle_timers_reply - - # 添加回复任务到并行任务列表 - tasks.append(asyncio.create_task(handle_reply_task())) - + + # 执行回复任务并赋值到变量 + background_reply_task = asyncio.create_task(handle_reply_task()) + # 动作执行任务 async def handle_action_task(): with Timer("动作执行", cycle_timers): @@ -493,52 +515,55 @@ class HeartFChatting: action_type, reasoning, action_data, cycle_timers, thinking_id, action_message ) return success, reply_text, command - - # 添加动作执行任务到并行任务列表 - tasks.append(asyncio.create_task(handle_action_task())) - - # 并行执行所有任务 - results = await asyncio.gather(*tasks, return_exceptions=True) - - # 处理结果 + + # 执行动作任务并赋值到变量 + background_action_task = asyncio.create_task(handle_action_task()) + reply_loop_info = None reply_text_from_reply = "" action_success = False action_reply_text = "" action_command = "" - - if len(tasks) == 2: # 有回复任务和动作任务 + + # 并行执行所有任务 + if background_reply_task: + results = await asyncio.gather( + background_reply_task, background_action_task, return_exceptions=True + ) # 处理回复任务结果 reply_result = results[0] - if isinstance(reply_result, Exception): + if isinstance(reply_result, BaseException): logger.error(f"{self.log_prefix} 回复任务执行异常: {reply_result}") elif reply_result and reply_result[0] is not None: reply_loop_info, reply_text_from_reply, _ = reply_result - + # 处理动作任务结果 - action_result = results[1] - if isinstance(action_result, Exception): - logger.error(f"{self.log_prefix} 动作任务执行异常: {action_result}") + action_task_result = results[1] + if isinstance(action_task_result, BaseException): + logger.error(f"{self.log_prefix} 动作任务执行异常: {action_task_result}") else: - action_success, action_reply_text, action_command = action_result - - else: # 只有动作任务 - action_result = results[0] - if isinstance(action_result, Exception): - logger.error(f"{self.log_prefix} 动作任务执行异常: {action_result}") + action_success, action_reply_text, action_command = action_task_result + else: + results = await asyncio.gather(background_action_task, return_exceptions=True) + # 只有动作任务 + action_task_result = results[0] + if isinstance(action_task_result, BaseException): + logger.error(f"{self.log_prefix} 动作任务执行异常: {action_task_result}") else: - action_success, action_reply_text, action_command = action_result + action_success, action_reply_text, action_command = action_task_result # 构建最终的循环信息 if reply_loop_info: # 如果有回复信息,使用回复的loop_info作为基础 loop_info = reply_loop_info # 更新动作执行信息 - loop_info["loop_action_info"].update({ - "action_taken": action_success, - "command": action_command, - "taken_time": time.time(), - }) + loop_info["loop_action_info"].update( + { + "action_taken": action_success, + "command": action_command, + "taken_time": time.time(), + } + ) reply_text = reply_text_from_reply else: # 没有回复信息,构建纯动作的loop_info @@ -555,11 +580,10 @@ class HeartFChatting: } reply_text = action_reply_text - if ENABLE_S4U: await stop_typing() await mai_thinking_manager.get_mai_think(self.stream_id).do_think_after_response(reply_text) - + self.end_cycle(loop_info, cycle_timers) self.print_cycle_info(cycle_timers) @@ -603,7 +627,7 @@ class HeartFChatting: action: str, reasoning: str, action_data: dict, - cycle_timers: dict, + cycle_timers: Dict[str, float], thinking_id: str, action_message: dict, ) -> tuple[bool, str, str]: @@ -712,7 +736,11 @@ class HeartFChatting: return False async def _generate_response( - self, message_data: dict, available_actions: Optional[Dict[str, ActionInfo]], reply_to: str, request_type: str = "chat.replyer.normal" + self, + message_data: dict, + available_actions: Optional[Dict[str, ActionInfo]], + reply_to: str, + request_type: str = "chat.replyer.normal", ) -> Optional[list]: """生成普通回复""" try: @@ -734,7 +762,7 @@ class HeartFChatting: logger.error(f"{self.log_prefix}回复生成出现错误:{str(e)} {traceback.format_exc()}") return None - async def _send_response(self, reply_set, reply_to, thinking_start_time, message_data): + async def _send_response(self, reply_set, reply_to, thinking_start_time, message_data) -> str: current_time = time.time() new_message_count = message_api.count_new_messages( chat_id=self.chat_stream.stream_id, start_time=thinking_start_time, end_time=current_time @@ -746,13 +774,9 @@ class HeartFChatting: need_reply = new_message_count >= random.randint(2, 4) if need_reply: - logger.info( - f"{self.log_prefix} 从思考到回复,共有{new_message_count}条新消息,使用引用回复" - ) + logger.info(f"{self.log_prefix} 从思考到回复,共有{new_message_count}条新消息,使用引用回复") else: - logger.info( - f"{self.log_prefix} 从思考到回复,共有{new_message_count}条新消息,不使用引用回复" - ) + logger.info(f"{self.log_prefix} 从思考到回复,共有{new_message_count}条新消息,不使用引用回复") reply_text = "" first_replied = False diff --git a/src/chat/planner_actions/planner.py b/src/chat/planner_actions/planner.py index d3ea2fd74..0b26a97d0 100644 --- a/src/chat/planner_actions/planner.py +++ b/src/chat/planner_actions/planner.py @@ -211,9 +211,8 @@ class ActionPlanner: reasoning = f"Planner 内部处理错误: {outer_e}" is_parallel = False - if mode == ChatMode.NORMAL: - if action in current_available_actions: - is_parallel = current_available_actions[action].parallel_action + if mode == ChatMode.NORMAL and action in current_available_actions: + is_parallel = current_available_actions[action].parallel_action action_result = { "action_type": action, @@ -256,7 +255,7 @@ class ActionPlanner: actions_before_now = get_actions_by_timestamp_with_chat( chat_id=self.chat_id, - timestamp_start=time.time()-3600, + timestamp_start=time.time() - 3600, timestamp_end=time.time(), limit=5, ) @@ -264,7 +263,7 @@ class ActionPlanner: actions_before_now_block = build_readable_actions( actions=actions_before_now, ) - + actions_before_now_block = f"你刚刚选择并执行过的action是:\n{actions_before_now_block}" self.last_obs_time_mark = time.time() @@ -276,7 +275,6 @@ class ActionPlanner: if global_config.chat.at_bot_inevitable_reply: mentioned_bonus = "\n- 有人提到你,或者at你" - by_what = "聊天内容" target_prompt = '\n "target_message_id":"触发action的消息id"' no_action_block = f"""重要说明: