From 503d893171f756d7a62fa363d0d7a986f790e99b Mon Sep 17 00:00:00 2001 From: SengokuCola <1026294844@qq.com> Date: Wed, 23 Apr 2025 00:46:39 +0800 Subject: [PATCH] =?UTF-8?q?fix=EF=BC=9A=E4=BF=AE=E5=A4=8Dsub=5Fheartflow.p?= =?UTF-8?q?y:172:=20RuntimeWarning:=20coroutine=20'SubHeartflow.set=5Fchat?= =?UTF-8?q?=5Fstate'=20was=20never=20awaited?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/heart_flow/heartflow.py | 53 ++++++++++++++++++++++----------- src/heart_flow/sub_heartflow.py | 44 +++++++++++++-------------- 2 files changed, 58 insertions(+), 39 deletions(-) diff --git a/src/heart_flow/heartflow.py b/src/heart_flow/heartflow.py index e74364ea1..c2bc19893 100644 --- a/src/heart_flow/heartflow.py +++ b/src/heart_flow/heartflow.py @@ -193,7 +193,7 @@ class Heartflow: await asyncio.sleep(interval_seconds) try: current_timestamp = time.time() - all_interest_states = self.get_all_interest_states() # 获取所有子心流的兴趣状态 + all_interest_states = await self.get_all_interest_states() # 获取所有子心流的兴趣状态 # 以追加模式打开历史日志文件 # 移除 try-except IO 块,根据用户要求 @@ -343,15 +343,10 @@ class Heartflow: for sub_hf in subflows_snapshot: # Double-check if subflow still exists and is in CHAT state - if ( - sub_hf.subheartflow_id in self._subheartflows - and sub_hf.chat_state.chat_status == ChatState.CHAT - ): + if sub_hf.subheartflow_id in self._subheartflows and sub_hf.chat_state.chat_status == ChatState.CHAT: evaluated_count += 1 - if sub_hf.should_evaluate_reply(): - stream_name = ( - chat_manager.get_stream_name(sub_hf.subheartflow_id) or sub_hf.subheartflow_id - ) + if await sub_hf.should_evaluate_reply(): + stream_name = chat_manager.get_stream_name(sub_hf.subheartflow_id) or sub_hf.subheartflow_id log_prefix = f"[{stream_name}]" logger.info(f"{log_prefix} 兴趣概率触发,尝试将状态从 CHAT 提升到 FOCUSED") # set_chat_state handles limit checks and HeartFChatting creation internally @@ -378,18 +373,42 @@ class Heartflow: logger.info(f"当前状态:{self.current_state.mai_status.value}") - def get_all_interest_states(self) -> Dict[str, Dict]: # 新增方法 + async def get_all_interest_states(self) -> Dict[str, Dict]: # <-- Make async """获取所有活跃子心流的当前兴趣状态""" states = {} # 创建副本以避免在迭代时修改字典 - items_snapshot = list(self._subheartflows.items()) + items_snapshot = list(self._subheartflows.items()) # Make a copy for safe iteration + tasks = [] + results = {} + + # Create tasks to get state concurrently for stream_id, subheartflow in items_snapshot: - try: - # 从 SubHeartflow 获取其 InterestChatting 的状态 - states[stream_id] = subheartflow.get_interest_state() - except Exception as e: - logger.warning(f"[Heartflow] Error getting interest state for subheartflow {stream_id}: {e}") - return states + # Ensure subheartflow still exists before creating task + if stream_id in self._subheartflows: + tasks.append(asyncio.create_task(subheartflow.get_interest_state(), name=f"get_state_{stream_id}")) + else: + logger.warning(f"[Heartflow] Subheartflow {stream_id} disappeared before getting state.") + + # Wait for all tasks to complete + if tasks: + done, pending = await asyncio.wait(tasks, timeout=5.0) # Add a timeout + + if pending: + logger.warning(f"[Heartflow] Getting interest states timed out for {len(pending)} tasks.") + for task in pending: + task.cancel() + + for task in done: + stream_id = task.get_name().split("_")[-1] # Extract stream_id from task name + try: + result = task.result() + results[stream_id] = result + except asyncio.CancelledError: + logger.warning(f"[Heartflow] Task to get interest state for {stream_id} was cancelled (timeout).") + except Exception as e: + logger.warning(f"[Heartflow] Error getting interest state for subheartflow {stream_id}: {e}") + + return results def cleanup_inactive_subheartflows(self, max_age_seconds=INACTIVE_THRESHOLD_SECONDS): # 修改此方法以使用兴趣时间 """ diff --git a/src/heart_flow/sub_heartflow.py b/src/heart_flow/sub_heartflow.py index 650b51961..9bbe2f9a5 100644 --- a/src/heart_flow/sub_heartflow.py +++ b/src/heart_flow/sub_heartflow.py @@ -115,7 +115,7 @@ class InterestChatting: self.interest_dict[message.message_info.message_id] = (message, interest_value, is_mentioned) self.last_interaction_time = time.time() - def _calculate_decay(self, current_time: float): + async def _calculate_decay(self, current_time: float): time_delta = current_time - self.last_update_time if time_delta > 0: old_interest = self.interest_level @@ -145,7 +145,7 @@ class InterestChatting: if old_interest != self.interest_level: self.last_update_time = current_time - def _update_reply_probability(self, current_time: float): + async def _update_reply_probability(self, current_time: float): time_delta = current_time - self.last_update_time if time_delta <= 0: return @@ -169,7 +169,7 @@ class InterestChatting: if previous_is_above: if self.state_change_callback: try: - self.state_change_callback(ChatState.ABSENT) + await self.state_change_callback(ChatState.ABSENT) except Exception as e: interest_logger.error(f"Error calling state_change_callback for ABSENT: {e}") @@ -187,30 +187,30 @@ class InterestChatting: self.is_above_threshold = currently_above - def increase_interest(self, current_time: float, value: float): - self._update_reply_probability(current_time) - self._calculate_decay(current_time) + async def increase_interest(self, current_time: float, value: float): + await self._update_reply_probability(current_time) + await self._calculate_decay(current_time) self.interest_level += value self.interest_level = min(self.interest_level, self.max_interest) self.last_update_time = current_time self.last_interaction_time = current_time - def decrease_interest(self, current_time: float, value: float): - self._update_reply_probability(current_time) + async def decrease_interest(self, current_time: float, value: float): + await self._update_reply_probability(current_time) self.interest_level -= value self.interest_level = max(self.interest_level, 0.0) self.last_update_time = current_time self.last_interaction_time = current_time - def get_interest(self) -> float: + async def get_interest(self) -> float: current_time = time.time() - self._update_reply_probability(current_time) - self._calculate_decay(current_time) + await self._update_reply_probability(current_time) + await self._calculate_decay(current_time) self.last_update_time = current_time return self.interest_level - def get_state(self) -> dict: - interest = self.get_interest() + async def get_state(self) -> dict: + interest = await self.get_interest() return { "interest_level": round(interest, 2), "last_update_time": self.last_update_time, @@ -219,9 +219,9 @@ class InterestChatting: "last_interaction_time": self.last_interaction_time, } - def should_evaluate_reply(self) -> bool: + async def should_evaluate_reply(self) -> bool: current_time = time.time() - self._update_reply_probability(current_time) + await self._update_reply_probability(current_time) if self.current_reply_probability > 0: trigger = random.random() < self.current_reply_probability @@ -496,16 +496,16 @@ class SubHeartflow: logger.warning(f"SubHeartflow {self.subheartflow_id} 没有找到有效的 ChattingObservation") return None - def get_interest_state(self) -> dict: - return self.interest_chatting.get_state() + async def get_interest_state(self) -> dict: + return await self.interest_chatting.get_state() - def get_interest_level(self) -> float: - return self.interest_chatting.get_interest() + async def get_interest_level(self) -> float: + return await self.interest_chatting.get_interest() - def should_evaluate_reply(self) -> bool: - return self.interest_chatting.should_evaluate_reply() + async def should_evaluate_reply(self) -> bool: + return await self.interest_chatting.should_evaluate_reply() - def add_interest_dict_entry(self, message: MessageRecv, interest_value: float, is_mentioned: bool): + async def add_interest_dict_entry(self, message: MessageRecv, interest_value: float, is_mentioned: bool): self.interest_chatting.add_interest_dict(message, interest_value, is_mentioned) def get_interest_dict(self) -> Dict[str, tuple[MessageRecv, float, bool]]: