feat:进一步合并normal和focus模式,移除interest_dict(附带其他合理性修改)

This commit is contained in:
SengokuCola
2025-07-11 21:51:30 +08:00
parent 3b9d656645
commit 0cdf53fb85
9 changed files with 245 additions and 570 deletions

View File

@@ -1,3 +1,4 @@
import traceback
from src.chat.heart_flow.sub_heartflow import SubHeartflow, ChatState
from src.common.logger import get_logger
from typing import Any, Optional
@@ -30,11 +31,12 @@ class Heartflow:
# 注册子心流
self.subheartflows[subheartflow_id] = new_subflow
heartflow_name = get_chat_manager().get_stream_name(subheartflow_id) or subheartflow_id
logger.debug(f"[{heartflow_name}] 开始接收消息")
logger.info(f"[{heartflow_name}] 开始接收消息")
return new_subflow
except Exception as e:
logger.error(f"创建子心流 {subheartflow_id} 失败: {e}", exc_info=True)
traceback.print_exc()
return None
async def force_change_subheartflow_status(self, subheartflow_id: str, status: ChatState) -> None:

View File

@@ -108,13 +108,14 @@ class HeartFCMessageReceiver:
interested_rate, is_mentioned = await _calculate_interest(message)
message.interest_value = interested_rate
message.is_mentioned = is_mentioned
await self.storage.store_message(message, chat)
subheartflow = await heartflow.get_or_create_subheartflow(chat.stream_id)
message.update_chat_stream(chat)
subheartflow.add_message_to_normal_chat_cache(message, interested_rate, is_mentioned)
# subheartflow.add_message_to_normal_chat_cache(message, interested_rate, is_mentioned)
chat_mood = mood_manager.get_mood_by_chat_id(subheartflow.chat_id)
asyncio.create_task(chat_mood.update_mood_by_message(message, interested_rate))

View File

@@ -39,16 +39,21 @@ class SubHeartflow:
self.is_group_chat, self.chat_target_info = get_chat_type_and_target_info(self.chat_id)
self.log_prefix = get_chat_manager().get_stream_name(self.subheartflow_id) or self.subheartflow_id
# 兴趣消息集合
self.interest_dict: Dict[str, tuple[MessageRecv, float, bool]] = {}
# focus模式退出冷却时间管理
self.last_focus_exit_time: float = 0 # 上次退出focus模式的时间
# 随便水群 normal_chat 和 认真水群 focus_chat 实例
# CHAT模式激活 随便水群 FOCUS模式激活 认真水群
self.heart_fc_instance: Optional[HeartFChatting] = None # 该sub_heartflow的HeartFChatting实例
self.normal_chat_instance: Optional[NormalChat] = None # 该sub_heartflow的NormalChat实例
self.heart_fc_instance: Optional[HeartFChatting] = HeartFChatting(
chat_id=self.subheartflow_id,
on_stop_focus_chat=self._handle_stop_focus_chat_request,
) # 该sub_heartflow的HeartFChatting实例
self.normal_chat_instance: Optional[NormalChat] = NormalChat(
chat_stream=get_chat_manager().get_stream(self.chat_id),
on_switch_to_focus_callback=self._handle_switch_to_focus_request,
get_cooldown_progress_callback=self.get_cooldown_progress,
) # 该sub_heartflow的NormalChat实例
async def initialize(self):
"""异步初始化方法,创建兴趣流并确定聊天类型"""
@@ -79,10 +84,6 @@ class SubHeartflow:
# 使用更短的超时时间,强制快速停止
await asyncio.wait_for(self.normal_chat_instance.stop_chat(), timeout=3.0)
logger.debug(f"{self.log_prefix} stop_chat() 调用完成")
except asyncio.TimeoutError:
logger.warning(f"{self.log_prefix} 停止 NormalChat 超时,强制清理")
# 超时时强制清理实例
self.normal_chat_instance = None
except Exception as e:
logger.error(f"{self.log_prefix} 停止 NormalChat 监控任务时出错: {e}")
# 出错时也要清理实例,避免状态不一致
@@ -93,8 +94,10 @@ class SubHeartflow:
logger.warning(f"{self.log_prefix} 强制清理 NormalChat 实例")
self.normal_chat_instance = None
logger.debug(f"{self.log_prefix} _stop_normal_chat 完成")
else:
logger.info(f"{self.log_prefix} 没有normal聊天实例无需停止normal聊天")
async def _start_normal_chat(self, rewind=False) -> bool:
async def _start_normal_chat(self) -> bool:
"""
启动 NormalChat 实例,并进行异步初始化。
进入 CHAT 状态时使用。
@@ -102,30 +105,23 @@ class SubHeartflow:
"""
await self._stop_heart_fc_chat() # 确保 专注聊天已停止
self.interest_dict.clear()
log_prefix = self.log_prefix
try:
# 获取聊天流并创建 NormalChat 实例 (同步部分)
chat_stream = get_chat_manager().get_stream(self.chat_id)
if not chat_stream:
logger.error(f"{log_prefix} 无法获取 chat_stream无法启动 NormalChat。")
return False
# 在 rewind 为 True 或 NormalChat 实例尚未创建时,创建新实例
if rewind or not self.normal_chat_instance:
# 在 NormalChat 实例尚未创建时,创建新实例
if not self.normal_chat_instance:
# 提供回调函数用于接收需要切换到focus模式的通知
self.normal_chat_instance = NormalChat(
chat_stream=chat_stream,
interest_dict=self.interest_dict,
on_switch_to_focus_callback=self._handle_switch_to_focus_request,
get_cooldown_progress_callback=self.get_cooldown_progress,
)
logger.info(f"{log_prefix} 开始普通聊天,随便水群...")
logger.info(f"[{self.log_prefix}] 开始普通聊天")
await self.normal_chat_instance.start_chat() # start_chat now ensures init is called again if needed
return True
except Exception as e:
logger.error(f"{log_prefix} 启动 NormalChat 或其初始化时出错: {e}")
logger.error(f"[{self.log_prefix}] 启动 NormalChat 或其初始化时出错: {e}")
logger.error(traceback.format_exc())
self.normal_chat_instance = None # 启动/初始化失败,清理实例
return False
@@ -173,68 +169,36 @@ class SubHeartflow:
async def _stop_heart_fc_chat(self):
"""停止并清理 HeartFChatting 实例"""
if self.heart_fc_instance:
logger.debug(f"{self.log_prefix} 结束专注聊天...")
if self.heart_fc_instance.running:
logger.info(f"{self.log_prefix} 结束专注聊天...")
try:
await self.heart_fc_instance.shutdown()
except Exception as e:
logger.error(f"{self.log_prefix} 关闭 HeartFChatting 实例时出错: {e}")
logger.error(traceback.format_exc())
finally:
# 无论是否成功关闭,都清理引用
self.heart_fc_instance = None
else:
logger.info(f"{self.log_prefix} 没有专注聊天实例,无需停止专注聊天")
async def _start_heart_fc_chat(self) -> bool:
"""启动 HeartFChatting 实例,确保 NormalChat 已停止"""
logger.debug(f"{self.log_prefix} 开始启动 HeartFChatting")
try:
# 确保普通聊天监控已停止
await self._stop_normal_chat()
self.interest_dict.clear()
log_prefix = self.log_prefix
# 如果实例已存在,检查其循环任务状态
if self.heart_fc_instance:
logger.debug(f"{log_prefix} HeartFChatting 实例已存在,检查状态")
# 如果任务已完成或不存在,则尝试重新启动
if self.heart_fc_instance._loop_task is None or self.heart_fc_instance._loop_task.done():
logger.info(f"{log_prefix} HeartFChatting 实例存在但循环未运行,尝试启动...")
try:
# 添加超时保护
await asyncio.wait_for(self.heart_fc_instance.start(), timeout=15.0)
logger.info(f"{log_prefix} HeartFChatting 循环已启动。")
return True
except Exception as e:
logger.error(f"{log_prefix} 尝试启动现有 HeartFChatting 循环时出错: {e}")
logger.error(traceback.format_exc())
# 出错时清理实例,准备重新创建
self.heart_fc_instance = None
else:
# 任务正在运行
logger.debug(f"{log_prefix} HeartFChatting 已在运行中。")
return True # 已经在运行
# 如果实例不存在,则创建并启动
logger.info(f"{log_prefix} 麦麦准备开始专注聊天...")
try:
logger.debug(f"{log_prefix} 创建新的 HeartFChatting 实例")
self.heart_fc_instance = HeartFChatting(
chat_id=self.subheartflow_id,
on_stop_focus_chat=self._handle_stop_focus_chat_request,
)
logger.debug(f"{log_prefix} 启动 HeartFChatting 实例")
# 添加超时保护
await asyncio.wait_for(self.heart_fc_instance.start(), timeout=15.0)
logger.debug(f"{log_prefix} 麦麦已成功进入专注聊天模式 (新实例已启动)。")
return True
except Exception as e:
logger.error(f"{log_prefix} 创建或启动 HeartFChatting 实例时出错: {e}")
logger.error(traceback.format_exc())
self.heart_fc_instance = None # 创建或初始化异常,清理实例
return False
# 如果任务已完成或不存在,则尝试重新启动
if self.heart_fc_instance._loop_task is None or self.heart_fc_instance._loop_task.done():
logger.info(f"{self.log_prefix} HeartFChatting 实例存在但循环未运行,尝试启动...")
try:
# 添加超时保护
await asyncio.wait_for(self.heart_fc_instance.start(), timeout=15.0)
logger.info(f"{self.log_prefix} HeartFChatting 循环已启动。")
return True
except Exception as e:
logger.error(f"{self.log_prefix} 尝试启动现有 HeartFChatting 循环时出错: {e}")
logger.error(traceback.format_exc())
# 出错时清理实例,准备重新创建
self.heart_fc_instance = None
else:
# 任务正在运行
logger.debug(f"{self.log_prefix} HeartFChatting 已在运行中。")
return True # 已经在运行
except Exception as e:
logger.error(f"{self.log_prefix} _start_heart_fc_chat 执行时出错: {e}")
@@ -248,39 +212,36 @@ class SubHeartflow:
"""
current_state = self.chat_state.chat_status
state_changed = False
log_prefix = f"[{self.log_prefix}]"
if new_state == ChatState.NORMAL:
logger.debug(f"{log_prefix} 准备进入 normal聊天 状态")
if await self._start_normal_chat():
logger.debug(f"{log_prefix} 成功进入或保持 NormalChat 状态。")
state_changed = True
else:
logger.error(f"{log_prefix} 启动 NormalChat 失败,无法进入 CHAT 状态。")
# 启动失败时,保持当前状态
if self.normal_chat_instance.running:
logger.info(f"{self.log_prefix} 当前状态已经为normal")
return
else:
if await self._start_normal_chat():
logger.debug(f"{self.log_prefix} 成功进入或保持 NormalChat 状态。")
state_changed = True
else:
logger.error(f"{self.log_prefix} 启动 NormalChat 失败,无法进入 CHAT 状态。")
return
elif new_state == ChatState.FOCUSED:
logger.debug(f"{log_prefix} 准备进入 focus聊天 状态")
if self.heart_fc_instance.running:
logger.info(f"{self.log_prefix} 当前状态已经为focused")
return
if await self._start_heart_fc_chat():
logger.debug(f"{log_prefix} 成功进入或保持 HeartFChatting 状态。")
logger.debug(f"{self.log_prefix} 成功进入或保持 HeartFChatting 状态。")
state_changed = True
else:
logger.error(f"{log_prefix} 启动 HeartFChatting 失败,无法进入 FOCUSED 状态。")
logger.error(f"{self.log_prefix} 启动 HeartFChatting 失败,无法进入 FOCUSED 状态。")
# 启动失败时,保持当前状态
return
elif new_state == ChatState.ABSENT:
logger.info(f"{log_prefix} 进入 ABSENT 状态,停止所有聊天活动...")
self.interest_dict.clear()
await self._stop_normal_chat()
await self._stop_heart_fc_chat()
state_changed = True
# --- 记录focus模式退出时间 ---
if state_changed and current_state == ChatState.FOCUSED and new_state != ChatState.FOCUSED:
self.last_focus_exit_time = time.time()
logger.debug(f"{log_prefix} 记录focus模式退出时间: {self.last_focus_exit_time}")
logger.debug(f"{self.log_prefix} 记录focus模式退出时间: {self.last_focus_exit_time}")
# --- 更新状态和最后活动时间 ---
if state_changed:
@@ -292,7 +253,7 @@ class SubHeartflow:
self.chat_state_changed_time = time.time()
else:
logger.debug(
f"{log_prefix} 尝试将状态从 {current_state.value} 变为 {new_state.value},但未成功或未执行更改。"
f"{self.log_prefix} 尝试将状态从 {current_state.value} 变为 {new_state.value},但未成功或未执行更改。"
)
def add_message_to_normal_chat_cache(self, message: MessageRecv, interest_value: float, is_mentioned: bool):