fix:修复sub_heartflow.py:172: RuntimeWarning: coroutine 'SubHeartflow.set_chat_state' was never awaited

This commit is contained in:
SengokuCola
2025-04-23 00:46:39 +08:00
parent aca3bdddb3
commit 503d893171
2 changed files with 58 additions and 39 deletions

View File

@@ -193,7 +193,7 @@ class Heartflow:
await asyncio.sleep(interval_seconds) await asyncio.sleep(interval_seconds)
try: try:
current_timestamp = time.time() current_timestamp = time.time()
all_interest_states = self.get_all_interest_states() # 获取所有子心流的兴趣状态 all_interest_states = await self.get_all_interest_states() # 获取所有子心流的兴趣状态
# 以追加模式打开历史日志文件 # 以追加模式打开历史日志文件
# 移除 try-except IO 块,根据用户要求 # 移除 try-except IO 块,根据用户要求
@@ -343,15 +343,10 @@ class Heartflow:
for sub_hf in subflows_snapshot: for sub_hf in subflows_snapshot:
# Double-check if subflow still exists and is in CHAT state # Double-check if subflow still exists and is in CHAT state
if ( if sub_hf.subheartflow_id in self._subheartflows and sub_hf.chat_state.chat_status == ChatState.CHAT:
sub_hf.subheartflow_id in self._subheartflows
and sub_hf.chat_state.chat_status == ChatState.CHAT
):
evaluated_count += 1 evaluated_count += 1
if sub_hf.should_evaluate_reply(): if await sub_hf.should_evaluate_reply():
stream_name = ( stream_name = chat_manager.get_stream_name(sub_hf.subheartflow_id) or sub_hf.subheartflow_id
chat_manager.get_stream_name(sub_hf.subheartflow_id) or sub_hf.subheartflow_id
)
log_prefix = f"[{stream_name}]" log_prefix = f"[{stream_name}]"
logger.info(f"{log_prefix} 兴趣概率触发,尝试将状态从 CHAT 提升到 FOCUSED") logger.info(f"{log_prefix} 兴趣概率触发,尝试将状态从 CHAT 提升到 FOCUSED")
# set_chat_state handles limit checks and HeartFChatting creation internally # set_chat_state handles limit checks and HeartFChatting creation internally
@@ -378,18 +373,42 @@ class Heartflow:
logger.info(f"当前状态:{self.current_state.mai_status.value}") logger.info(f"当前状态:{self.current_state.mai_status.value}")
def get_all_interest_states(self) -> Dict[str, Dict]: # 新增方法 async def get_all_interest_states(self) -> Dict[str, Dict]: # <-- Make async
"""获取所有活跃子心流的当前兴趣状态""" """获取所有活跃子心流的当前兴趣状态"""
states = {} states = {}
# 创建副本以避免在迭代时修改字典 # 创建副本以避免在迭代时修改字典
items_snapshot = list(self._subheartflows.items()) items_snapshot = list(self._subheartflows.items()) # Make a copy for safe iteration
tasks = []
results = {}
# Create tasks to get state concurrently
for stream_id, subheartflow in items_snapshot: for stream_id, subheartflow in items_snapshot:
# Ensure subheartflow still exists before creating task
if stream_id in self._subheartflows:
tasks.append(asyncio.create_task(subheartflow.get_interest_state(), name=f"get_state_{stream_id}"))
else:
logger.warning(f"[Heartflow] Subheartflow {stream_id} disappeared before getting state.")
# Wait for all tasks to complete
if tasks:
done, pending = await asyncio.wait(tasks, timeout=5.0) # Add a timeout
if pending:
logger.warning(f"[Heartflow] Getting interest states timed out for {len(pending)} tasks.")
for task in pending:
task.cancel()
for task in done:
stream_id = task.get_name().split("_")[-1] # Extract stream_id from task name
try: try:
# 从 SubHeartflow 获取其 InterestChatting 的状态 result = task.result()
states[stream_id] = subheartflow.get_interest_state() results[stream_id] = result
except asyncio.CancelledError:
logger.warning(f"[Heartflow] Task to get interest state for {stream_id} was cancelled (timeout).")
except Exception as e: except Exception as e:
logger.warning(f"[Heartflow] Error getting interest state for subheartflow {stream_id}: {e}") logger.warning(f"[Heartflow] Error getting interest state for subheartflow {stream_id}: {e}")
return states
return results
def cleanup_inactive_subheartflows(self, max_age_seconds=INACTIVE_THRESHOLD_SECONDS): # 修改此方法以使用兴趣时间 def cleanup_inactive_subheartflows(self, max_age_seconds=INACTIVE_THRESHOLD_SECONDS): # 修改此方法以使用兴趣时间
""" """

View File

@@ -115,7 +115,7 @@ class InterestChatting:
self.interest_dict[message.message_info.message_id] = (message, interest_value, is_mentioned) self.interest_dict[message.message_info.message_id] = (message, interest_value, is_mentioned)
self.last_interaction_time = time.time() self.last_interaction_time = time.time()
def _calculate_decay(self, current_time: float): async def _calculate_decay(self, current_time: float):
time_delta = current_time - self.last_update_time time_delta = current_time - self.last_update_time
if time_delta > 0: if time_delta > 0:
old_interest = self.interest_level old_interest = self.interest_level
@@ -145,7 +145,7 @@ class InterestChatting:
if old_interest != self.interest_level: if old_interest != self.interest_level:
self.last_update_time = current_time self.last_update_time = current_time
def _update_reply_probability(self, current_time: float): async def _update_reply_probability(self, current_time: float):
time_delta = current_time - self.last_update_time time_delta = current_time - self.last_update_time
if time_delta <= 0: if time_delta <= 0:
return return
@@ -169,7 +169,7 @@ class InterestChatting:
if previous_is_above: if previous_is_above:
if self.state_change_callback: if self.state_change_callback:
try: try:
self.state_change_callback(ChatState.ABSENT) await self.state_change_callback(ChatState.ABSENT)
except Exception as e: except Exception as e:
interest_logger.error(f"Error calling state_change_callback for ABSENT: {e}") interest_logger.error(f"Error calling state_change_callback for ABSENT: {e}")
@@ -187,30 +187,30 @@ class InterestChatting:
self.is_above_threshold = currently_above self.is_above_threshold = currently_above
def increase_interest(self, current_time: float, value: float): async def increase_interest(self, current_time: float, value: float):
self._update_reply_probability(current_time) await self._update_reply_probability(current_time)
self._calculate_decay(current_time) await self._calculate_decay(current_time)
self.interest_level += value self.interest_level += value
self.interest_level = min(self.interest_level, self.max_interest) self.interest_level = min(self.interest_level, self.max_interest)
self.last_update_time = current_time self.last_update_time = current_time
self.last_interaction_time = current_time self.last_interaction_time = current_time
def decrease_interest(self, current_time: float, value: float): async def decrease_interest(self, current_time: float, value: float):
self._update_reply_probability(current_time) await self._update_reply_probability(current_time)
self.interest_level -= value self.interest_level -= value
self.interest_level = max(self.interest_level, 0.0) self.interest_level = max(self.interest_level, 0.0)
self.last_update_time = current_time self.last_update_time = current_time
self.last_interaction_time = current_time self.last_interaction_time = current_time
def get_interest(self) -> float: async def get_interest(self) -> float:
current_time = time.time() current_time = time.time()
self._update_reply_probability(current_time) await self._update_reply_probability(current_time)
self._calculate_decay(current_time) await self._calculate_decay(current_time)
self.last_update_time = current_time self.last_update_time = current_time
return self.interest_level return self.interest_level
def get_state(self) -> dict: async def get_state(self) -> dict:
interest = self.get_interest() interest = await self.get_interest()
return { return {
"interest_level": round(interest, 2), "interest_level": round(interest, 2),
"last_update_time": self.last_update_time, "last_update_time": self.last_update_time,
@@ -219,9 +219,9 @@ class InterestChatting:
"last_interaction_time": self.last_interaction_time, "last_interaction_time": self.last_interaction_time,
} }
def should_evaluate_reply(self) -> bool: async def should_evaluate_reply(self) -> bool:
current_time = time.time() current_time = time.time()
self._update_reply_probability(current_time) await self._update_reply_probability(current_time)
if self.current_reply_probability > 0: if self.current_reply_probability > 0:
trigger = random.random() < self.current_reply_probability trigger = random.random() < self.current_reply_probability
@@ -496,16 +496,16 @@ class SubHeartflow:
logger.warning(f"SubHeartflow {self.subheartflow_id} 没有找到有效的 ChattingObservation") logger.warning(f"SubHeartflow {self.subheartflow_id} 没有找到有效的 ChattingObservation")
return None return None
def get_interest_state(self) -> dict: async def get_interest_state(self) -> dict:
return self.interest_chatting.get_state() return await self.interest_chatting.get_state()
def get_interest_level(self) -> float: async def get_interest_level(self) -> float:
return self.interest_chatting.get_interest() return await self.interest_chatting.get_interest()
def should_evaluate_reply(self) -> bool: async def should_evaluate_reply(self) -> bool:
return self.interest_chatting.should_evaluate_reply() return await self.interest_chatting.should_evaluate_reply()
def add_interest_dict_entry(self, message: MessageRecv, interest_value: float, is_mentioned: bool): async def add_interest_dict_entry(self, message: MessageRecv, interest_value: float, is_mentioned: bool):
self.interest_chatting.add_interest_dict(message, interest_value, is_mentioned) self.interest_chatting.add_interest_dict(message, interest_value, is_mentioned)
def get_interest_dict(self) -> Dict[str, tuple[MessageRecv, float, bool]]: def get_interest_dict(self) -> Dict[str, tuple[MessageRecv, float, bool]]: