This commit is contained in:
SengokuCola
2025-04-23 15:32:24 +08:00
parent b4054544c4
commit d241e1922a
3 changed files with 50 additions and 61 deletions

View File

@@ -240,7 +240,7 @@ class SubHeartflow:
# 基础属性
self.subheartflow_id = subheartflow_id
self.chat_id = subheartflow_id
self.mai_states = mai_states
# 思维状态相关
@@ -271,11 +271,10 @@ class SubHeartflow:
max_tokens=800,
request_type="sub_heart_flow",
)
self.gpt_instance = ResponseGenerator() # 响应生成器
self.tool_user_instance = ToolUser() # 工具使用模块
self.log_prefix = chat_manager.get_stream_name(self.subheartflow_id) or self.subheartflow_id
async def set_chat_state(self, new_state: "ChatState", current_states_num: tuple = ()):
@@ -286,7 +285,7 @@ class SubHeartflow:
logger.trace(f"{self.log_prefix} 状态已为 {current_state.value}, 无需更改。")
return
log_prefix = self.log_prefix # 使用实例属性
log_prefix = self.log_prefix # 使用实例属性
current_mai_state = self.mai_states.get_current_state()
# --- 状态转换逻辑 ---
@@ -294,22 +293,21 @@ class SubHeartflow:
normal_limit = current_mai_state.get_normal_chat_max_num()
current_chat_count = current_states_num[1]
if current_chat_count >= normal_limit and current_state != ChatState.CHAT: # 仅在状态转换时检查限制
if current_chat_count >= normal_limit and current_state != ChatState.CHAT: # 仅在状态转换时检查限制
logger.debug(
f"{log_prefix} 麦麦不能从 {current_state.value} 转换到 聊天。原因:聊不过来了 ({current_chat_count}/{normal_limit})"
)
return # 阻止状态转换
return # 阻止状态转换
else:
logger.debug(f"{log_prefix} 麦麦可以进入或保持 聊天 状态 ({current_chat_count}/{normal_limit})")
if current_state == ChatState.FOCUSED and self.heart_fc_instance:
logger.info(f"{log_prefix} 麦麦不再专注聊天,转为随便水水...")
await self.heart_fc_instance.shutdown() # 正确关闭 HeartFChatting
await self.heart_fc_instance.shutdown() # 正确关闭 HeartFChatting
self.heart_fc_instance = None
chat_stream = chat_manager.get_stream(self.chat_id)
self.normal_chat_instance = NormalChat(
chat_stream=chat_stream,
interest_dict=self.get_interest_dict()
chat_stream=chat_stream, interest_dict=self.get_interest_dict()
)
await self.normal_chat_instance.start_monitoring_interest()
# NormalChat 启动/停止逻辑将在下面处理
@@ -318,11 +316,11 @@ class SubHeartflow:
focused_limit = current_mai_state.get_focused_chat_max_num()
current_focused_count = current_states_num[2]
if current_focused_count >= focused_limit and current_state != ChatState.FOCUSED: # 仅在状态转换时检查限制
if current_focused_count >= focused_limit and current_state != ChatState.FOCUSED: # 仅在状态转换时检查限制
logger.debug(
f"{log_prefix} 麦麦不能从 {current_state.value} 转换到 专注的聊天,原因:聊不过来了。({current_focused_count}/{focused_limit})"
)
return # 阻止状态转换
return # 阻止状态转换
else:
logger.debug(f"{log_prefix} 麦麦可以进入或保持 专注聊天 状态 ({current_focused_count}/{focused_limit})")
if not self.heart_fc_instance:
@@ -330,39 +328,38 @@ class SubHeartflow:
try:
await self.normal_chat_instance.stop_monitoring_interest()
self.clear_interest_dict()
logger.info(f"{log_prefix} 停止 NormalChat 兴趣监控成功。")
except Exception as e:
logger.error(f"{log_prefix} 停止 NormalChat 兴趣监控时出错: {e}")
logger.error(traceback.format_exc())
try:
self.heart_fc_instance = HeartFChatting(
chat_id=self.chat_id,
gpt_instance=self.gpt_instance,
tool_user_instance=self.tool_user_instance,
)
if await self.heart_fc_instance._initialize():
await self.heart_fc_instance.add_time() # 初始化成功后添加初始时间
await self.heart_fc_instance.add_time() # 初始化成功后添加初始时间
logger.info(f"{log_prefix} 麦麦已成功进入专注聊天模式。")
else:
logger.error(
f"{log_prefix} 麦麦不能专注聊天,因为 HeartFChatting 初始化失败了,状态回滚到 {current_state.value}"
)
self.heart_fc_instance = None
return # 阻止进入 FOCUSED 状态
return # 阻止进入 FOCUSED 状态
except Exception as e:
logger.error(f"{log_prefix} 创建麦麦专注聊天实例时出错: {e}")
logger.error(traceback.format_exc())
self.heart_fc_instance = None
return # 创建实例异常,阻止进入 FOCUSED 状态
return # 创建实例异常,阻止进入 FOCUSED 状态
else:
# 已经是 FOCUSED 状态,或者 heart_fc_instance 已存在但未运行(不太可能)
if not self.heart_fc_instance._loop_active:
logger.warning(f"{log_prefix} HeartFChatting 实例存在但未激活,尝试重新激活...")
await self.heart_fc_instance.add_time() # 尝试添加时间以激活循环
await self.heart_fc_instance.add_time() # 尝试添加时间以激活循环
else:
logger.debug(f"{log_prefix} 麦麦已经在专注聊天中。")
# NormalChat 启动/停止逻辑将在下面处理
@@ -370,7 +367,7 @@ class SubHeartflow:
elif new_state == ChatState.ABSENT:
if current_state == ChatState.FOCUSED and self.heart_fc_instance:
logger.info(f"{log_prefix} 麦麦离开专注的聊天,撤退了.....")
await self.heart_fc_instance.shutdown() # 正确关闭 HeartFChatting
await self.heart_fc_instance.shutdown() # 正确关闭 HeartFChatting
self.heart_fc_instance = None
# NormalChat 启动/停止逻辑将在下面处理
@@ -385,7 +382,7 @@ class SubHeartflow:
if new_state == ChatState.ABSENT:
logger.info(f"{log_prefix} 状态变为 ABSENT停止 NormalChat 兴趣监控...")
await self.normal_chat_instance.stop_monitoring_interest()
else: # CHAT or FOCUSED
else: # CHAT or FOCUSED
logger.info(f"{log_prefix} 状态变为 {new_state.value},启动或确认 NormalChat 兴趣监控...")
await self.normal_chat_instance.start_monitoring_interest()
except Exception as e:
@@ -394,10 +391,9 @@ class SubHeartflow:
else:
logger.warning(f"{log_prefix} NormalChat 实例不可用,无法管理其监控任务。")
async def subheartflow_start_working(self):
"""启动子心流的后台任务
功能说明:
- 负责子心流的主要后台循环
- 每30秒检查一次停止标志
@@ -410,8 +406,6 @@ class SubHeartflow:
logger.info(f"{self.log_prefix} 子心流后台任务已停止。")
async def do_thinking_before_reply(
self,
extra_info: str,
@@ -547,9 +541,10 @@ class SubHeartflow:
def get_interest_dict(self) -> Dict[str, tuple[MessageRecv, float, bool]]:
return self.interest_chatting.interest_dict
def clear_interest_dict(self):
self.interest_chatting.interest_dict.clear()
async def shutdown(self):
"""安全地关闭子心流及其管理的任务"""
if self.should_stop:
@@ -557,7 +552,7 @@ class SubHeartflow:
return
logger.info(f"{self.log_prefix} 开始关闭子心流...")
self.should_stop = True # 标记为停止,让后台任务退出
self.should_stop = True # 标记为停止,让后台任务退出
# 停止 NormalChat 监控 (保持调用,确保清理)
if self.normal_chat_instance:
@@ -576,23 +571,23 @@ class SubHeartflow:
except Exception as e:
logger.error(f"{self.log_prefix} 关闭 HeartFChatting 实例时出错 (Shutdown): {e}")
logger.error(traceback.format_exc())
self.heart_fc_instance = None # 清理实例引用
self.heart_fc_instance = None # 清理实例引用
# 取消可能存在的旧后台任务 (self.task)
if self.task and not self.task.done():
logger.info(f"{self.log_prefix} 取消子心流主任务 (Shutdown)...")
self.task.cancel()
try:
await asyncio.wait_for(self.task, timeout=1.0) # 给点时间响应取消
await asyncio.wait_for(self.task, timeout=1.0) # 给点时间响应取消
except asyncio.CancelledError:
logger.info(f"{self.log_prefix} 子心流主任务已取消 (Shutdown)。")
except asyncio.TimeoutError:
logger.warning(f"{self.log_prefix} 等待子心流主任务取消超时 (Shutdown)。")
except Exception as e:
logger.error(f"{self.log_prefix} 等待子心流主任务取消时发生错误 (Shutdown): {e}")
logger.error(f"{self.log_prefix} 等待子心流主任务取消时发生错误 (Shutdown): {e}")
self.task = None # 清理任务引用
self.chat_state.chat_status = ChatState.ABSENT # 状态重置为不参与
self.task = None # 清理任务引用
self.chat_state.chat_status = ChatState.ABSENT # 状态重置为不参与
logger.info(f"{self.log_prefix} 子心流关闭完成。")

View File

@@ -49,11 +49,11 @@ class SubHeartflowManager:
self, subheartflow_id: Any, mai_states: MaiStateInfo
) -> Optional["SubHeartflow"]:
"""获取或创建指定ID的子心流实例
Args:
subheartflow_id: 子心流唯一标识符
mai_states: 当前麦麦状态信息
Returns:
成功返回SubHeartflow实例失败返回None
"""
@@ -64,7 +64,7 @@ class SubHeartflowManager:
if subflow.should_stop:
logger.warning(f"尝试获取已停止的子心流 {subheartflow_id},正在重新激活")
subflow.should_stop = False # 重置停止标志
subflow.last_active_time = time.time() # 更新活跃时间
logger.debug(f"获取到已存在的子心流: {subheartflow_id}")
return subflow
@@ -74,18 +74,18 @@ class SubHeartflowManager:
try:
# 初始化子心流
new_subflow = SubHeartflow(subheartflow_id, mai_states)
# 添加聊天观察者
observation = ChattingObservation(chat_id=subheartflow_id)
new_subflow.add_observation(observation)
# 注册子心流
self.subheartflows[subheartflow_id] = new_subflow
logger.info(f"子心流 {subheartflow_id} 创建成功")
# 启动后台任务
asyncio.create_task(new_subflow.subheartflow_start_working())
return new_subflow
except Exception as e:
logger.error(f"创建子心流 {subheartflow_id} 失败: {e}", exc_info=True)
@@ -104,9 +104,11 @@ class SubHeartflowManager:
# 设置状态为ABSENT释放资源
if subheartflow.chat_state.chat_status != ChatState.ABSENT:
logger.debug(f"[子心流管理] 设置 {stream_name} 状态为ABSENT")
states_num =(self.count_subflows_by_state(ChatState.ABSENT),
self.count_subflows_by_state(ChatState.CHAT),
self.count_subflows_by_state(ChatState.FOCUSED))
states_num = (
self.count_subflows_by_state(ChatState.ABSENT),
self.count_subflows_by_state(ChatState.CHAT),
self.count_subflows_by_state(ChatState.FOCUSED),
)
await subheartflow.set_chat_state(ChatState.ABSENT, states_num)
else:
logger.debug(f"[子心流管理] {stream_name} 已是ABSENT状态")
@@ -223,10 +225,12 @@ class SubHeartflowManager:
continue
logger.debug(f"[激活] 正在激活子心流{stream_name}")
states_num =(self.count_subflows_by_state(ChatState.ABSENT),
self.count_subflows_by_state(ChatState.CHAT),
self.count_subflows_by_state(ChatState.FOCUSED))
states_num = (
self.count_subflows_by_state(ChatState.ABSENT),
self.count_subflows_by_state(ChatState.CHAT),
self.count_subflows_by_state(ChatState.FOCUSED),
)
await flow.set_chat_state(ChatState.CHAT, states_num)
@@ -272,9 +276,11 @@ class SubHeartflowManager:
should_promote = await sub_hf.should_evaluate_reply()
if should_promote:
logger.info(f"{log_prefix} 兴趣评估触发升级: CHAT -> FOCUSED")
states_num =(self.count_subflows_by_state(ChatState.ABSENT),
self.count_subflows_by_state(ChatState.CHAT),
self.count_subflows_by_state(ChatState.FOCUSED))
states_num = (
self.count_subflows_by_state(ChatState.ABSENT),
self.count_subflows_by_state(ChatState.CHAT),
self.count_subflows_by_state(ChatState.FOCUSED),
)
await sub_hf.set_chat_state(ChatState.FOCUSED, states_num)
if (
self.subheartflows.get(flow_id)
@@ -343,15 +349,6 @@ class SubHeartflowManager:
else:
logger.warning(f"尝试停用不存在的 SubHeartflow: {subheartflow_id}")
async def deactivate_all_subflows(self):
"""停用并移除所有子心流。"""
logger.info("正在停用所有 SubHeartflows...")
# 获取当前所有 ID避免在迭代时修改字典
all_ids = self.get_all_subheartflows_ids()
tasks = [self.deactivate_subflow(sub_id) for sub_id in all_ids]
await asyncio.gather(*tasks)
logger.info("所有 SubHeartflows 已停用。")
async def cleanup_inactive_subflows(self, inactive_threshold_seconds: int):
"""清理长时间不活跃的子心流。"""
current_time = time.time()