Files
Mofox-Core/src/heart_flow/heartflow.py
2025-04-23 00:55:33 +08:00

898 lines
47 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from src.heart_flow.sub_heartflow import SubHeartflow, ChattingObservation, ChatState
from src.plugins.moods.moods import MoodManager
from src.plugins.models.utils_model import LLMRequest
from src.config.config import global_config
from src.plugins.schedule.schedule_generator import bot_schedule
from src.plugins.utils.prompt_builder import Prompt, global_prompt_manager
import asyncio
from src.common.logger import get_module_logger, LogConfig, HEARTFLOW_STYLE_CONFIG # 修改
from src.individuality.individuality import Individuality
import time
import random
from typing import Dict, Any, Optional, TYPE_CHECKING
import traceback
import enum
import os # 新增
import json # 新增
from src.plugins.chat.chat_stream import chat_manager # 新增
# --- Add imports for merged dependencies ---
from src.plugins.heartFC_chat.heartFC_generator import ResponseGenerator
from src.do_tool.tool_use import ToolUser
from src.plugins.chat.emoji_manager import emoji_manager # Module instance
from src.plugins.person_info.relationship_manager import relationship_manager # Module instance
# --- End imports ---
heartflow_config = LogConfig(
# 使用海马体专用样式
console_format=HEARTFLOW_STYLE_CONFIG["console_format"],
file_format=HEARTFLOW_STYLE_CONFIG["file_format"],
)
logger = get_module_logger("heartflow", config=heartflow_config)
# Type hinting for circular dependency
if TYPE_CHECKING:
from src.heart_flow.sub_heartflow import SubHeartflow, ChatState # Keep SubHeartflow here too
# from src.plugins.heartFC_chat.heartFC_controler import HeartFCController # No longer needed
def init_prompt():
prompt = ""
prompt += "你刚刚在做的事情是:{schedule_info}\n"
prompt += "{personality_info}\n"
prompt += "你想起来{related_memory_info}"
prompt += "刚刚你的主要想法是{current_thinking_info}"
prompt += "你还有一些小想法,因为你在参加不同的群聊天,这是你正在做的事情:{sub_flows_info}\n"
prompt += "你现在{mood_info}"
prompt += "现在你接下去继续思考,产生新的想法,但是要基于原有的主要想法,不要分点输出,"
prompt += "输出连贯的内心独白,不要太长,但是记得结合上述的消息,关注新内容:"
Prompt(prompt, "thinking_prompt")
prompt = ""
prompt += "{personality_info}\n"
prompt += "现在{bot_name}的想法是:{current_mind}\n"
prompt += "现在{bot_name}在qq群里进行聊天聊天的话题如下{minds_str}\n"
prompt += "你现在{mood_info}\n"
prompt += """现在请你总结这些聊天内容,注意关注聊天内容对原有的想法的影响,输出连贯的内心独白
不要太长,但是记得结合上述的消息,要记得你的人设,关注新内容:"""
Prompt(prompt, "mind_summary_prompt")
# --- 新增:从 interest.py 移动过来的常量 ---
LOG_DIRECTORY = "logs/interest"
HISTORY_LOG_FILENAME = "interest_history.log"
CLEANUP_INTERVAL_SECONDS = 1200 # 清理任务运行间隔 (例如20分钟) - 保持与 interest.py 一致
INACTIVE_THRESHOLD_SECONDS = 1200 # 不活跃时间阈值 (例如20分钟) - 保持与 interest.py 一致
LOG_INTERVAL_SECONDS = 3 # 日志记录间隔 (例如3秒) - 保持与 interest.py 一致
# --- 结束新增常量 ---
# --- 新增:状态更新常量 ---
STATE_UPDATE_INTERVAL_SECONDS = 30 # 状态更新检查间隔(秒)
FIVE_MINUTES = 1 * 60
FIFTEEN_MINUTES = 5 * 60
TWENTY_MINUTES = 10 * 60
# --- 结束新增常量 ---
# 新增 ChatStatus 枚举
class MaiState(enum.Enum):
"""
聊天状态:
OFFLINE: 不在线:回复概率极低,不会进行任何聊天
PEEKING: 看一眼手机:回复概率较低,会进行一些普通聊天
NORMAL_CHAT: 正常聊天:回复概率较高,会进行一些普通聊天和少量的专注聊天
FOCUSED_CHAT: 专注聊天:回复概率极高,会进行专注聊天和少量的普通聊天
"""
OFFLINE = "不在线"
PEEKING = "看一眼手机"
NORMAL_CHAT = "正常聊天"
FOCUSED_CHAT = "专注聊天"
def get_normal_chat_max_num(self):
if self == MaiState.OFFLINE:
return 0
elif self == MaiState.PEEKING:
return 1
elif self == MaiState.NORMAL_CHAT:
return 3
elif self == MaiState.FOCUSED_CHAT:
return 2
def get_focused_chat_max_num(self):
if self == MaiState.OFFLINE:
return 0
elif self == MaiState.PEEKING:
return 0
elif self == MaiState.NORMAL_CHAT:
return 1
elif self == MaiState.FOCUSED_CHAT:
return 2
class MaiStateInfo:
def __init__(self):
# 使用枚举类型初始化状态,默认为正常聊天
self.mai_status: MaiState = MaiState.OFFLINE
self.mai_status_history = [] # 历史状态,包含 状态,最后时间
self.last_status_change_time: float = time.time() # 新增:状态最后改变时间
self.last_5min_check_time: float = time.time() # 新增上次5分钟规则检查时间
self.normal_chatting = []
self.focused_chatting = []
self.mood_manager = MoodManager()
self.mood = self.mood_manager.get_prompt()
# 新增更新聊天状态的方法
def update_mai_status(self, new_status: MaiState):
"""更新聊天状态"""
if isinstance(new_status, MaiState) and new_status != self.mai_status: # 只有状态实际改变时才更新
self.mai_status = new_status
current_time = time.time()
self.last_status_change_time = current_time # 更新状态改变时间
self.last_5min_check_time = current_time # 重置5分钟检查计时器
# 将新状态和时间戳添加到历史记录
self.mai_status_history.append((new_status, current_time))
logger.info(f"麦麦状态更新为: {self.mai_status.value}")
elif not isinstance(new_status, MaiState):
logger.warning(f"尝试设置无效的麦麦状态: {new_status}")
# else: # 状态未改变,不处理
# pass
class Heartflow:
def __init__(self):
self.current_mind = "你什么也没想"
self.past_mind = []
self.current_state: MaiStateInfo = MaiStateInfo()
self.llm_model = LLMRequest(
model=global_config.llm_heartflow, temperature=0.6, max_tokens=1000, request_type="heart_flow"
)
self._subheartflows: Dict[Any, "SubHeartflow"] = {} # Update type hint
# --- Dependencies moved from HeartFCController ---
self.gpt_instance = ResponseGenerator()
self.mood_manager = (
MoodManager.get_instance()
) # Note: MaiStateInfo also has one, consider consolidating later if needed
self.tool_user_instance = ToolUser()
self.emoji_manager_instance = emoji_manager # Module instance
self.relationship_manager_instance = relationship_manager # Module instance
# --- End moved dependencies ---
# --- Background Task Management ---
self._history_log_file_path = os.path.join(LOG_DIRECTORY, HISTORY_LOG_FILENAME)
self._ensure_log_directory() # 初始化时确保目录存在
self._cleanup_task: Optional[asyncio.Task] = None
self._logging_task: Optional[asyncio.Task] = None
self._state_update_task: Optional[asyncio.Task] = None # 新增:状态更新任务
# 注意:衰减任务 (_decay_task) 不再需要,衰减在 SubHeartflow 的 InterestChatting 内部处理
# --- End moved dependencies ---
def _ensure_log_directory(self): # 新增方法 (从 InterestManager 移动)
"""确保日志目录存在"""
# 移除 try-except 块,根据用户要求
os.makedirs(LOG_DIRECTORY, exist_ok=True)
logger.info(f"Log directory '{LOG_DIRECTORY}' ensured.")
# except OSError as e:
# logger.error(f"Error creating log directory '{LOG_DIRECTORY}': {e}")
async def _periodic_cleanup_task(
self, interval_seconds: int, max_age_seconds: int
): # 新增方法 (从 InterestManager 移动和修改)
"""后台清理任务的异步函数"""
while True:
await asyncio.sleep(interval_seconds)
logger.info(f"[Heartflow] 运行定期清理 (间隔: {interval_seconds}秒)...")
self.cleanup_inactive_subheartflows(max_age_seconds=max_age_seconds) # 调用 Heartflow 自己的清理方法
async def _periodic_log_task(self, interval_seconds: int): # 新增方法 (从 InterestManager 移动和修改)
"""后台日志记录任务的异步函数 (记录所有子心流的兴趣历史数据)"""
while True:
await asyncio.sleep(interval_seconds)
try:
current_timestamp = time.time()
all_interest_states = await self.get_all_interest_states() # 获取所有子心流的兴趣状态
# 以追加模式打开历史日志文件
# 移除 try-except IO 块,根据用户要求
with open(self._history_log_file_path, "a", encoding="utf-8") as f:
count = 0
# 创建 items 快照以安全迭代
items_snapshot = list(all_interest_states.items())
for stream_id, state in items_snapshot:
# 从 chat_manager 获取 group_name
group_name = stream_id # 默认值
try:
chat_stream = chat_manager.get_stream(stream_id)
if chat_stream and chat_stream.group_info:
group_name = chat_stream.group_info.group_name
elif chat_stream and not chat_stream.group_info: # 处理私聊
group_name = (
f"私聊_{chat_stream.user_info.user_nickname}"
if chat_stream.user_info
else stream_id
)
except Exception:
# 不记录警告,避免刷屏,使用默认 stream_id 即可
# logger.warning(f"Could not get group name for stream_id {stream_id}: {e}")
pass # 静默处理
log_entry = {
"timestamp": round(current_timestamp, 2),
"stream_id": stream_id,
"interest_level": state.get("interest_level", 0.0), # 使用 get 获取,提供默认值
"group_name": group_name,
"reply_probability": state.get("current_reply_probability", 0.0), # 使用 get 获取
"is_above_threshold": state.get("is_above_threshold", False), # 使用 get 获取
}
# 将每个条目作为单独的 JSON 行写入
f.write(json.dumps(log_entry, ensure_ascii=False) + "\n")
count += 1
# logger.debug(f"[Heartflow] Successfully appended {count} interest history entries to {self._history_log_file_path}")
# except IOError as e:
# logger.error(f"[Heartflow] Error writing interest history log to {self._history_log_file_path}: {e}")
except Exception as e: # 保留对其他异常的捕获
logger.error(f"[Heartflow] Unexpected error during periodic history logging: {e}")
logger.error(traceback.format_exc()) # 记录 traceback
async def _periodic_state_update_task(self):
"""定期检查并更新 Mai 状态"""
while True:
await asyncio.sleep(STATE_UPDATE_INTERVAL_SECONDS)
try:
current_time = time.time()
# 获取更新前的状态
previous_status = self.current_state.mai_status
current_status = self.current_state.mai_status # 保持此行以进行后续逻辑
time_in_current_status = current_time - self.current_state.last_status_change_time
time_since_last_5min_check = current_time - self.current_state.last_5min_check_time
next_state = None # 预设下一状态为 None
# --- 状态转换逻辑 (保持不变) ---
# 1. 通用规则每5分钟检查 (对于非 OFFLINE 状态)
if time_since_last_5min_check >= FIVE_MINUTES:
self.current_state.last_5min_check_time = current_time # 重置5分钟检查计时器无论是否切换
if current_status != MaiState.OFFLINE:
if random.random() < 0.10: # 10% 概率切换到 OFFLINE
logger.debug(f"[Heartflow State] 触发5分钟规则{current_status.value} 切换到 OFFLINE")
next_state = MaiState.OFFLINE # 设置 next_state 而不是直接更新
# self.current_state.update_mai_status(MaiState.OFFLINE)
# continue # 状态已改变,进入下一轮循环
# 2. 状态持续时间规则 (仅在未被5分钟规则覆盖时执行)
if next_state is None: # 仅当5分钟规则未触发切换时检查持续时间
if current_status == MaiState.OFFLINE:
# OFFLINE 状态下检查是否已持续5分钟
if time_in_current_status >= FIVE_MINUTES:
weights = [35, 35, 30]
choices_list = [MaiState.PEEKING, MaiState.NORMAL_CHAT, MaiState.OFFLINE]
next_state_candidate = random.choices(choices_list, weights=weights, k=1)[0]
if next_state_candidate != MaiState.OFFLINE:
next_state = next_state_candidate
logger.debug(f"[Heartflow State] OFFLINE 持续时间达到,切换到 {next_state.value}")
else:
# 保持 OFFLINE重置计时器以开始新的5分钟计时
logger.debug("[Heartflow State] OFFLINE 持续时间达到,保持 OFFLINE重置计时器")
self.current_state.last_status_change_time = current_time
self.current_state.last_5min_check_time = current_time # 保持一致
# 显式将 next_state 设为 OFFLINE 以便后续处理
next_state = MaiState.OFFLINE
elif current_status == MaiState.PEEKING:
if time_in_current_status >= FIVE_MINUTES: # PEEKING 最多持续 5 分钟
weights = [50, 30, 20]
choices_list = [MaiState.OFFLINE, MaiState.NORMAL_CHAT, MaiState.FOCUSED_CHAT]
next_state = random.choices(choices_list, weights=weights, k=1)[0]
logger.debug(f"[Heartflow State] PEEKING 持续时间达到,切换到 {next_state.value}")
elif current_status == MaiState.NORMAL_CHAT:
if time_in_current_status >= FIFTEEN_MINUTES: # NORMAL_CHAT 最多持续 15 分钟
weights = [50, 50]
choices_list = [MaiState.OFFLINE, MaiState.FOCUSED_CHAT]
next_state = random.choices(choices_list, weights=weights, k=1)[0]
logger.debug(f"[Heartflow State] NORMAL_CHAT 持续时间达到,切换到 {next_state.value}")
elif current_status == MaiState.FOCUSED_CHAT:
if time_in_current_status >= TWENTY_MINUTES: # FOCUSED_CHAT 最多持续 20 分钟
weights = [80, 20]
choices_list = [MaiState.OFFLINE, MaiState.NORMAL_CHAT]
next_state = random.choices(choices_list, weights=weights, k=1)[0]
logger.debug(f"[Heartflow State] FOCUSED_CHAT 持续时间达到,切换到 {next_state.value}")
# --- 状态转换逻辑结束 ---
# --- 更新状态并执行相关操作 --- #
if next_state is not None:
# 检查状态是否真的改变了
if next_state != previous_status:
logger.info(f"[Heartflow] 准备从 {previous_status.value} 转换状态到 {next_state.value}")
self.current_state.update_mai_status(next_state)
# 在状态改变后,强制执行子心流数量限制 (保持)
await self._enforce_subheartflow_limits(next_state)
# --- 新增逻辑:根据状态转换调整子心流 --- #
if previous_status == MaiState.OFFLINE and next_state != MaiState.OFFLINE:
logger.info("[Heartflow] 主状态从 OFFLINE 激活,尝试激活子心流到 CHAT 状态。")
await self._activate_random_subflows_to_chat(next_state)
elif next_state == MaiState.OFFLINE and previous_status != MaiState.OFFLINE:
logger.info("[Heartflow] 主状态变为 OFFLINE停用所有子心流活动。")
await self._deactivate_all_subflows_on_offline()
# --- 结束新增逻辑 --- #
elif next_state == MaiState.OFFLINE and previous_status == MaiState.OFFLINE:
# 如果决定保持 OFFLINE 状态(例如,因为随机选择或持续时间规则),并且之前已经是 OFFLINE
# 确保计时器被重置 (这在上面的持续时间规则中已处理,但为了清晰再次确认)
if time_in_current_status >= FIVE_MINUTES:
# 确保计时器已在上面重置,这里无需操作,只记录日志
logger.debug("[Heartflow State] 保持 OFFLINE 状态,计时器已重置。")
pass # 无需状态转换,也无需调用激活/停用逻辑
# --- 如果没有确定 next_state (即没有触发任何切换规则) --- #
# logger.debug(f"[Heartflow State] 状态未改变,保持 {current_status.value}") # 减少日志噪音
# --- Integrated Interest Evaluation Logic (formerly in Controller loop) ---
if self.current_state.mai_status != MaiState.OFFLINE:
try:
# Use snapshot for safe iteration
subflows_snapshot = list(self._subheartflows.values())
evaluated_count = 0
promoted_count = 0
for sub_hf in subflows_snapshot:
# Double-check if subflow still exists and is in CHAT state
if sub_hf.subheartflow_id in self._subheartflows and sub_hf.chat_state.chat_status == ChatState.CHAT:
evaluated_count += 1
if await sub_hf.should_evaluate_reply():
stream_name = chat_manager.get_stream_name(sub_hf.subheartflow_id) or sub_hf.subheartflow_id
log_prefix = f"[{stream_name}]"
logger.info(f"{log_prefix} 兴趣概率触发,尝试将状态从 CHAT 提升到 FOCUSED")
# set_chat_state handles limit checks and HeartFChatting creation internally
await sub_hf.set_chat_state(ChatState.FOCUSED)
# Check if state actually changed (set_chat_state might block due to limits)
if sub_hf.chat_state.chat_status == ChatState.FOCUSED:
promoted_count += 1
# else: # No need to log every non-trigger event
# logger.trace(f"[{sub_hf.subheartflow_id}] In CHAT state, but should_evaluate_reply returned False.")
if evaluated_count > 0:
logger.debug(
f"[Heartflow Interest Eval] Evaluated {evaluated_count} CHAT flows. Promoted {promoted_count} to FOCUSED."
)
except Exception as e:
logger.error(f"[Heartflow] 兴趣评估任务出错: {e}")
logger.error(traceback.format_exc())
# --- End Integrated Interest Evaluation ---
except Exception as e:
logger.error(f"[Heartflow] 状态更新任务出错: {e}")
logger.error(traceback.format_exc())
logger.info(f"当前状态:{self.current_state.mai_status.value}")
async def get_all_interest_states(self) -> Dict[str, Dict]: # <-- Make async
"""获取所有活跃子心流的当前兴趣状态"""
states = {}
# 创建副本以避免在迭代时修改字典
items_snapshot = list(self._subheartflows.items()) # Make a copy for safe iteration
tasks = []
results = {}
# Create tasks to get state concurrently
for stream_id, subheartflow in items_snapshot:
# Ensure subheartflow still exists before creating task
if stream_id in self._subheartflows:
tasks.append(asyncio.create_task(subheartflow.get_interest_state(), name=f"get_state_{stream_id}"))
else:
logger.warning(f"[Heartflow] Subheartflow {stream_id} disappeared before getting state.")
# Wait for all tasks to complete
if tasks:
done, pending = await asyncio.wait(tasks, timeout=5.0) # Add a timeout
if pending:
logger.warning(f"[Heartflow] Getting interest states timed out for {len(pending)} tasks.")
for task in pending:
task.cancel()
for task in done:
stream_id = task.get_name().split("_")[-1] # Extract stream_id from task name
try:
result = task.result()
results[stream_id] = result
except asyncio.CancelledError:
logger.warning(f"[Heartflow] Task to get interest state for {stream_id} was cancelled (timeout).")
except Exception as e:
logger.warning(f"[Heartflow] Error getting interest state for subheartflow {stream_id}: {e}")
return results
def cleanup_inactive_subheartflows(self, max_age_seconds=INACTIVE_THRESHOLD_SECONDS): # 修改此方法以使用兴趣时间
"""
清理长时间不活跃的子心流记录 (基于兴趣交互时间)
max_age_seconds: 超过此时间未通过兴趣系统交互的将被清理
"""
current_time = time.time()
keys_to_remove = []
_initial_count = len(self._subheartflows)
# 创建副本以避免在迭代时修改字典
items_snapshot = list(self._subheartflows.items())
for subheartflow_id, subheartflow in items_snapshot:
should_remove = False
reason = ""
# 检查 InterestChatting 的最后交互时间
last_interaction = subheartflow.interest_chatting.last_interaction_time
if max_age_seconds is not None and (current_time - last_interaction) > max_age_seconds:
should_remove = True
reason = (
f"interest inactive time ({current_time - last_interaction:.0f}s) > max age ({max_age_seconds}s)"
)
if should_remove:
keys_to_remove.append(subheartflow_id)
stream_name = chat_manager.get_stream_name(subheartflow_id) or subheartflow_id # 获取流名称
logger.debug(f"[Heartflow] Marking stream {stream_name} for removal. Reason: {reason}")
# 标记子心流让其后台任务停止 (如果其后台任务还在运行)
subheartflow.should_stop = True
if keys_to_remove:
logger.info(f"[Heartflow] 清理识别到 {len(keys_to_remove)} 个不活跃的流。")
for key in keys_to_remove:
if key in self._subheartflows:
# 尝试取消子心流的后台任务
task_to_cancel = self._subheartflows[key].task
if task_to_cancel and not task_to_cancel.done():
task_to_cancel.cancel()
logger.debug(f"[Heartflow] Cancelled background task for subheartflow {key}")
# 从字典中删除
del self._subheartflows[key]
stream_name = chat_manager.get_stream_name(key) or key # 获取流名称
logger.debug(f"[Heartflow] 移除了流: {stream_name}")
final_count = len(self._subheartflows) # 直接获取当前长度
logger.info(f"[Heartflow] 清理完成。移除了 {len(keys_to_remove)} 个流。当前数量: {final_count}")
else:
# logger.info(f"[Heartflow] 清理完成。没有流符合移除条件。当前数量: {initial_count}") # 减少日志噪音
pass
async def heartflow_start_working(self):
# 启动清理任务 (使用新的 periodic_cleanup_task)
if self._cleanup_task is None or self._cleanup_task.done():
self._cleanup_task = asyncio.create_task(
self._periodic_cleanup_task(
interval_seconds=CLEANUP_INTERVAL_SECONDS,
max_age_seconds=INACTIVE_THRESHOLD_SECONDS,
)
)
logger.info(
f"[Heartflow] 已创建定期清理任务。间隔: {CLEANUP_INTERVAL_SECONDS}s, 不活跃阈值: {INACTIVE_THRESHOLD_SECONDS}s"
)
else:
logger.warning("[Heartflow] 跳过创建清理任务: 任务已在运行或存在。")
# 启动日志任务 (使用新的 periodic_log_task)
if self._logging_task is None or self._logging_task.done():
self._logging_task = asyncio.create_task(self._periodic_log_task(interval_seconds=LOG_INTERVAL_SECONDS))
logger.info(f"[Heartflow] 已创建定期日志任务。间隔: {LOG_INTERVAL_SECONDS}s")
else:
logger.warning("[Heartflow] 跳过创建日志任务: 任务已在运行或存在。")
# 新增:启动状态更新任务
if self._state_update_task is None or self._state_update_task.done():
self._state_update_task = asyncio.create_task(self._periodic_state_update_task())
logger.info(f"[Heartflow] 已创建定期状态更新任务。间隔: {STATE_UPDATE_INTERVAL_SECONDS}s")
else:
logger.warning("[Heartflow] 跳过创建状态更新任务: 任务已在运行或存在。")
# --- 新增:在启动时根据初始状态激活子心流 ---
if self.current_state.mai_status != MaiState.OFFLINE:
logger.info(f"[Heartflow] 初始状态为 {self.current_state.mai_status.value},执行初始子心流激活检查。")
# 使用 create_task 确保它不会阻塞 heartflow_start_working 的完成
# 传递当前状态给激活函数,以便它知道激活的限制
asyncio.create_task(self._activate_random_subflows_to_chat(self.current_state.mai_status))
# --- 结束新增逻辑 ---
@staticmethod
async def _update_current_state():
print("TODO")
async def do_a_thinking(self):
# logger.debug("麦麦大脑袋转起来了")
# 开始构建prompt
prompt_personality = ""
# person
individuality = Individuality.get_instance()
personality_core = individuality.personality.personality_core
prompt_personality += personality_core
personality_sides = individuality.personality.personality_sides
# 检查列表是否为空
if personality_sides:
random.shuffle(personality_sides)
prompt_personality += f",{personality_sides[0]}"
identity_detail = individuality.identity.identity_detail
# 检查列表是否为空
if identity_detail:
random.shuffle(identity_detail)
prompt_personality += f",{identity_detail[0]}"
personality_info = prompt_personality
current_thinking_info = self.current_mind
mood_info = self.current_state.mood
related_memory_info = "memory" # TODO: 替换为实际的记忆获取逻辑
try:
sub_flows_info = await self.get_all_subheartflows_minds_summary() # 修改为调用汇总方法
except Exception as e:
logger.error(f"[Heartflow] 获取子心流想法汇总失败: {e}")
logger.error(traceback.format_exc())
sub_flows_info = "(获取子心流想法时出错)" # 提供默认值
schedule_info = bot_schedule.get_current_num_task(num=4, time_info=True)
prompt = (await global_prompt_manager.get_prompt_async("thinking_prompt")).format(
schedule_info=schedule_info, # 使用关键字参数确保正确格式化
personality_info=personality_info,
related_memory_info=related_memory_info,
current_thinking_info=current_thinking_info,
sub_flows_info=sub_flows_info,
mood_info=mood_info,
)
try:
response, reasoning_content = await self.llm_model.generate_response_async(prompt)
if not response:
logger.warning("[Heartflow] 内心独白 LLM 返回空结果。")
response = "(暂时没什么想法...)" # 提供默认想法
self.update_current_mind(response) # 更新主心流想法
logger.info(f"麦麦的总体脑内状态:{self.current_mind}")
# 更新所有子心流的主心流信息
items_snapshot = list(self._subheartflows.items()) # 创建快照
for _, subheartflow in items_snapshot:
subheartflow.main_heartflow_info = response
except Exception as e:
logger.error(f"[Heartflow] 内心独白获取失败: {e}")
logger.error(traceback.format_exc())
# 此处不返回,允许程序继续执行,但主心流想法未更新
def update_current_mind(self, response):
self.past_mind.append(self.current_mind)
self.current_mind = response
async def get_all_subheartflows_minds_summary(self): # 重命名并修改
"""获取所有子心流的当前想法,并进行汇总"""
sub_minds_list = []
# 创建快照
items_snapshot = list(self._subheartflows.items())
for _, subheartflow in items_snapshot:
sub_minds_list.append(subheartflow.current_mind)
if not sub_minds_list:
return "(当前没有活跃的子心流想法)"
minds_str = "\n".join([f"- {mind}" for mind in sub_minds_list]) # 格式化为列表
# 调用 LLM 进行汇总
return await self.minds_summary(minds_str)
async def minds_summary(self, minds_str):
"""使用 LLM 汇总子心流的想法字符串"""
# 开始构建prompt
prompt_personality = ""
individuality = Individuality.get_instance()
prompt_personality += individuality.personality.personality_core
if individuality.personality.personality_sides:
prompt_personality += f",{random.choice(individuality.personality.personality_sides)}" # 随机选一个
if individuality.identity.identity_detail:
prompt_personality += f",{random.choice(individuality.identity.identity_detail)}" # 随机选一个
personality_info = prompt_personality
mood_info = self.current_state.mood
bot_name = global_config.BOT_NICKNAME # 使用全局配置中的机器人昵称
prompt = (await global_prompt_manager.get_prompt_async("mind_summary_prompt")).format(
personality_info=personality_info, # 使用关键字参数
bot_name=bot_name,
current_mind=self.current_mind,
minds_str=minds_str,
mood_info=mood_info,
)
try:
response, reasoning_content = await self.llm_model.generate_response_async(prompt)
if not response:
logger.warning("[Heartflow] 想法汇总 LLM 返回空结果。")
return "(想法汇总失败...)"
return response
except Exception as e:
logger.error(f"[Heartflow] 想法汇总失败: {e}")
logger.error(traceback.format_exc())
return "(想法汇总时发生错误...)"
# --- Add helper method to count subflows by state --- #
def count_subflows_by_state(self, target_state: "ChatState") -> int:
"""Counts the number of subheartflows currently in the specified state."""
count = 0
# Use items() directly for read-only iteration if thread safety isn't a major concern here
# Or create snapshot if modification during iteration is possible elsewhere
items_snapshot = list(self._subheartflows.items())
for _, flow in items_snapshot:
# Check if flow still exists in the main dict in case it was removed concurrently
if flow.subheartflow_id in self._subheartflows and flow.chat_state.chat_status == target_state:
count += 1
return count
# --- End helper method --- #
async def create_subheartflow(self, subheartflow_id: Any) -> Optional["SubHeartflow"]:
"""
获取或创建一个新的SubHeartflow实例。
创建本身不受限因为初始状态是ABSENT。
限制将在状态转换时检查。
"""
existing_subheartflow = self._subheartflows.get(subheartflow_id)
if existing_subheartflow:
return existing_subheartflow
# logger.info(f"[Heartflow] 尝试创建新的 subheartflow: {subheartflow_id}")
try:
subheartflow = SubHeartflow(subheartflow_id, self)
# 创建并初始化观察对象
observation = ChattingObservation(subheartflow_id)
await observation.initialize()
subheartflow.add_observation(observation)
# 创建并存储后台任务 (SubHeartflow 自己的后台任务)
subheartflow.task = asyncio.create_task(subheartflow.subheartflow_start_working())
logger.debug(f"[Heartflow] 为 {subheartflow_id} 创建后台任务成功,添加 observation 成功")
# 添加到管理字典
self._subheartflows[subheartflow_id] = subheartflow
logger.info(f"[Heartflow] 添加 subheartflow {subheartflow_id} 成功")
return subheartflow
except Exception as e:
logger.error(f"[Heartflow] 创建 subheartflow {subheartflow_id} 失败: {e}")
logger.error(traceback.format_exc())
return None
def get_subheartflow(self, observe_chat_id: Any) -> Optional["SubHeartflow"]:
"""获取指定ID的SubHeartflow实例"""
return self._subheartflows.get(observe_chat_id)
def get_all_subheartflows_streams_ids(self) -> list[Any]:
"""获取当前所有活跃的子心流的 ID 列表"""
return list(self._subheartflows.keys())
async def _stop_subheartflow(self, subheartflow_id: Any, reason: str):
"""停止并移除指定的子心流,确保 HeartFChatting 被关闭"""
if subheartflow_id in self._subheartflows:
subheartflow = self._subheartflows[subheartflow_id]
stream_name = chat_manager.get_stream_name(subheartflow_id) or subheartflow_id
logger.info(f"[Heartflow Limits] 停止子心流 {stream_name}. 原因: {reason}")
# --- 新增:在取消任务和删除前,先设置状态为 ABSENT 以关闭 HeartFChatting ---
try:
if subheartflow.chat_state.chat_status != ChatState.ABSENT:
logger.debug(f"[Heartflow Limits] 将子心流 {stream_name} 状态设置为 ABSENT 以确保资源释放...")
await subheartflow.set_chat_state(ChatState.ABSENT) # 调用异步方法
else:
logger.debug(f"[Heartflow Limits] 子心流 {stream_name} 已经是 ABSENT 状态。")
except Exception as e:
logger.error(f"[Heartflow Limits] 在停止子心流 {stream_name} 时设置状态为 ABSENT 出错: {e}")
# 即使出错,仍继续尝试停止任务和移除
# --- 结束新增逻辑 ---
# 标记停止并取消任务
subheartflow.should_stop = True
task_to_cancel = subheartflow.task
if task_to_cancel and not task_to_cancel.done():
task_to_cancel.cancel()
logger.debug(f"[Heartflow Limits] 已取消子心流 {stream_name} 的后台任务")
# TODO: Ensure controller.stop_heartFC_chat is called if needed
# This is now handled by subheartflow.set_chat_state(ChatState.ABSENT) called in _stop_subheartflow
# from src.plugins.heartFC_chat.heartFC_controler import HeartFCController # Local import to avoid cycle
# controller = HeartFCController.get_instance()
# if controller and controller.is_heartFC_chat_active(subheartflow_id):
# await controller.stop_heartFC_chat(subheartflow_id)
# 从字典移除
del self._subheartflows[subheartflow_id]
logger.debug(f"[Heartflow Limits] 已移除子心流: {stream_name}")
return True
return False
async def _enforce_subheartflow_limits(self, current_mai_state: MaiState):
"""根据当前的 MaiState 强制执行 SubHeartflow 数量限制"""
normal_limit = current_mai_state.get_normal_chat_max_num()
focused_limit = current_mai_state.get_focused_chat_max_num()
logger.debug(
f"[Heartflow Limits] 执行限制检查。当前状态: {current_mai_state.value}, Normal上限: {normal_limit}, Focused上限: {focused_limit}"
)
# 分类并统计当前 subheartflows
normal_flows = []
focused_flows = []
other_flows = [] # e.g., ABSENT
# 创建快照以安全迭代
items_snapshot = list(self._subheartflows.items())
for flow_id, flow in items_snapshot:
# 确保 flow 实例仍然存在 (避免在迭代期间被其他任务移除)
if flow_id not in self._subheartflows:
continue
if flow.chat_state.chat_status == ChatState.CHAT:
normal_flows.append((flow_id, flow.last_active_time))
elif flow.chat_state.chat_status == ChatState.FOCUSED:
focused_flows.append((flow_id, flow.last_active_time))
else:
other_flows.append((flow_id, flow.last_active_time))
logger.debug(
f"[Heartflow Limits] 当前计数 - Normal: {len(normal_flows)}, Focused: {len(focused_flows)}, Other: {len(other_flows)}"
)
stopped_count = 0
# 检查 Normal (CHAT) 限制
if len(normal_flows) > normal_limit:
excess_count = len(normal_flows) - normal_limit
logger.info(f"[Heartflow Limits] 检测到 Normal (CHAT) 状态超额 {excess_count} 个。上限: {normal_limit}")
# 按 last_active_time 升序排序 (最不活跃的在前)
normal_flows.sort(key=lambda item: item[1])
# 停止最不活跃的超额部分
for i in range(excess_count):
flow_id_to_stop = normal_flows[i][0]
if await self._stop_subheartflow(
flow_id_to_stop, f"Normal (CHAT) 状态超出上限 ({normal_limit}),停止最不活跃的实例"
):
stopped_count += 1
# 重新获取 focused_flows 列表,因为上面的停止操作可能已经改变了状态或移除了实例
focused_flows = []
items_snapshot_after_normal = list(self._subheartflows.items())
for flow_id, flow in items_snapshot_after_normal:
if flow_id not in self._subheartflows:
continue # Double check
if flow.chat_state.chat_status == ChatState.FOCUSED:
focused_flows.append((flow_id, flow.last_active_time))
# 检查 Focused (FOCUSED) 限制
if len(focused_flows) > focused_limit:
excess_count = len(focused_flows) - focused_limit
logger.info(
f"[Heartflow Limits] 检测到 Focused (FOCUSED) 状态超额 {excess_count} 个。上限: {focused_limit}"
)
# 按 last_active_time 升序排序
focused_flows.sort(key=lambda item: item[1])
# 停止最不活跃的超额部分
for i in range(excess_count):
flow_id_to_stop = focused_flows[i][0]
if await self._stop_subheartflow(
flow_id_to_stop, f"Focused (FOCUSED) 状态超出上限 ({focused_limit}),停止最不活跃的实例"
):
stopped_count += 1
if stopped_count > 0:
logger.info(
f"[Heartflow Limits] 限制执行完成,共停止了 {stopped_count} 个子心流。当前总数: {len(self._subheartflows)}"
)
else:
logger.debug(f"[Heartflow Limits] 限制检查完成,无需停止子心流。当前总数: {len(self._subheartflows)}")
# --- 新增方法 --- #
async def _activate_random_subflows_to_chat(self, new_mai_state: MaiState):
"""当主状态从 OFFLINE 激活时,随机选择子心流进入 CHAT 状态"""
limit = new_mai_state.get_normal_chat_max_num()
if limit <= 0:
logger.info("[Heartflow Activate] 当前状态不允许 CHAT 子心流,跳过激活。")
return
# 使用快照进行迭代
all_flows_snapshot = list(self._subheartflows.values())
absent_flows = [
flow
for flow in all_flows_snapshot
if flow.subheartflow_id in self._subheartflows and flow.chat_state.chat_status == ChatState.ABSENT
]
num_to_activate = min(limit, len(absent_flows))
if num_to_activate <= 0:
logger.info(f"[Heartflow Activate] 没有处于 ABSENT 状态的子心流可供激活至 CHAT (上限: {limit})。")
return
logger.info(
f"[Heartflow Activate] 将随机选择 {num_to_activate} 个 (上限 {limit}) ABSENT 子心流激活至 CHAT 状态。"
)
selected_flows = random.sample(absent_flows, num_to_activate)
activated_count = 0
for flow in selected_flows:
# 再次检查 flow 是否仍然存在且状态为 ABSENT (以防并发修改)
if (
flow.subheartflow_id in self._subheartflows
and self._subheartflows[flow.subheartflow_id].chat_state.chat_status == ChatState.ABSENT
):
stream_name = chat_manager.get_stream_name(flow.subheartflow_id) or flow.subheartflow_id
logger.debug(f"[Heartflow Activate] 正在将子心流 {stream_name} 状态设置为 CHAT。")
# 调用 set_chat_state它内部会处理日志记录
await flow.set_chat_state(ChatState.CHAT)
activated_count += 1
else:
stream_name = chat_manager.get_stream_name(flow.subheartflow_id) or flow.subheartflow_id
logger.warning(f"[Heartflow Activate] 跳过激活子心流 {stream_name},因为它不再存在或状态已改变。")
logger.info(f"[Heartflow Activate] 完成激活,成功将 {activated_count} 个子心流设置为 CHAT 状态。")
async def _deactivate_all_subflows_on_offline(self):
"""当主状态变为 OFFLINE 时,停止所有子心流的活动并设置为 ABSENT"""
logger.info("[Heartflow Deactivate] 开始停用所有子心流...")
try:
# TODO: Ensure controller.stop_heartFC_chat is called if needed
# This is now handled by subheartflow.set_chat_state(ChatState.ABSENT) called in _stop_subheartflow
# from src.plugins.heartFC_chat.heartFC_controler import HeartFCController # Local import to avoid cycle
# controller = HeartFCController.get_instance()
# if controller and controller.is_heartFC_chat_active(flow_id):
# await controller.stop_heartFC_chat(flow_id)
# 使用 ID 快照进行迭代
flow_ids_snapshot = list(self._subheartflows.keys())
deactivated_count = 0
for flow_id in flow_ids_snapshot:
subflow = self._subheartflows.get(flow_id)
if not subflow:
continue # Subflow 可能在迭代过程中被清理
stream_name = chat_manager.get_stream_name(flow_id) or flow_id
try:
# 停止相关聊天进程 (例如 pf_chat)
# TODO: 确认是否有 reason_chat 需要停止,并添加相应逻辑
# if controller:
# if controller.is_heartFC_chat_active(flow_id):
# logger.debug(f"[Heartflow Deactivate] 正在停止子心流 {stream_name} 的 heartFC_chat。")
# await controller.stop_heartFC_chat(flow_id)
# 设置状态为 ABSENT
if subflow.chat_state.chat_status != ChatState.ABSENT:
logger.debug(f"[Heartflow Deactivate] 正在将子心流 {stream_name} 状态设置为 ABSENT。")
# 调用 set_chat_state它会处理日志和状态更新
subflow.set_chat_state(ChatState.ABSENT)
deactivated_count += 1
else:
# 如果已经是 ABSENT则无需再次设置但记录一下检查
logger.trace(f"[Heartflow Deactivate] 子心流 {stream_name} 已处于 ABSENT 状态。")
except Exception as e:
logger.error(f"[Heartflow Deactivate] 停用子心流 {stream_name} 时出错: {e}")
logger.error(traceback.format_exc())
logger.info(
f"[Heartflow Deactivate] 完成停用,共将 {deactivated_count} 个子心流设置为 ABSENT 状态 (不包括已是 ABSENT 的)。"
)
except Exception as e:
logger.error(f"[Heartflow Deactivate] 停用所有子心流时出错: {e}")
logger.error(traceback.format_exc())
init_prompt()
# 创建一个全局的管理器实例
heartflow = Heartflow()