Files
Mofox-Core/src/heart_flow/sub_heartflow.py
2025-04-22 16:42:45 +00:00

516 lines
23 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from .observation import Observation, ChattingObservation
import asyncio
from src.plugins.moods.moods import MoodManager
from src.plugins.models.utils_model import LLMRequest
from src.config.config import global_config
import time
from typing import Optional, List, Dict, Callable, TYPE_CHECKING
import traceback
from src.plugins.chat.utils import parse_text_timestamps
import enum
from src.common.logger import get_module_logger, LogConfig, SUB_HEARTFLOW_STYLE_CONFIG # noqa: E402
from src.individuality.individuality import Individuality
import random
from src.plugins.person_info.relationship_manager import relationship_manager
from ..plugins.utils.prompt_builder import Prompt, global_prompt_manager
from src.plugins.chat.message import MessageRecv
from src.plugins.chat.chat_stream import chat_manager
import math
from src.plugins.heartFC_chat.heartFC_chat import HeartFChatting
# Type hinting for circular dependency
if TYPE_CHECKING:
from .heartflow import Heartflow # Import Heartflow for type hinting
from .sub_heartflow import ChatState # Keep ChatState here too?
from src.plugins.heartFC_chat.heartFC_chat import HeartFChatting # <-- Add for type hint
# 定义常量 (从 interest.py 移动过来)
MAX_INTEREST = 15.0
subheartflow_config = LogConfig(
# 使用海马体专用样式
console_format=SUB_HEARTFLOW_STYLE_CONFIG["console_format"],
file_format=SUB_HEARTFLOW_STYLE_CONFIG["file_format"],
)
logger = get_module_logger("subheartflow", config=subheartflow_config)
interest_log_config = LogConfig(
console_format=SUB_HEARTFLOW_STYLE_CONFIG["console_format"],
file_format=SUB_HEARTFLOW_STYLE_CONFIG["file_format"],
)
interest_logger = get_module_logger("InterestChatting", config=interest_log_config)
def init_prompt():
prompt = ""
# prompt += f"麦麦的总体想法是:{self.main_heartflow_info}\n\n"
prompt += "{extra_info}\n"
# prompt += "{prompt_schedule}\n"
# prompt += "{relation_prompt_all}\n"
prompt += "{prompt_personality}\n"
prompt += "刚刚你的想法是:\n我是{bot_name},我想,{current_thinking_info}\n"
prompt += "-----------------------------------\n"
prompt += "现在是{time_now}你正在上网和qq群里的网友们聊天群里正在聊的话题是\n{chat_observe_info}\n"
prompt += "\n你现在{mood_info}\n"
# prompt += "你注意到{sender_name}刚刚说:{message_txt}\n"
prompt += "现在请你根据刚刚的想法继续思考,思考时可以想想如何对群聊内容进行回复,要不要对群里的话题进行回复,关注新话题,可以适当转换话题,大家正在说的话才是聊天的主题。\n"
prompt += "回复的要求是:平淡一些,简短一些,说中文,如果你要回复,最好只回复一个人的一个话题\n"
prompt += "请注意不要输出多余内容(包括前后缀,冒号和引号,括号, 表情,等),不要带有括号和动作描写。不要回复自己的发言,尽量不要说你说过的话。"
prompt += "现在请你{hf_do_next},不要分点输出,生成内心想法,文字不要浮夸"
Prompt(prompt, "sub_heartflow_prompt_before")
class ChatState(enum.Enum):
ABSENT = "不参与"
CHAT = "闲聊"
FOCUSED = "专注"
class ChatStateInfo:
def __init__(self):
self.willing = 0
self.chat_status: ChatState = ChatState.ABSENT
self.mood_manager = MoodManager()
self.mood = self.mood_manager.get_prompt()
base_reply_probability = 0.05
probability_increase_rate_per_second = 0.08
max_reply_probability = 1
class InterestChatting:
def __init__(
self,
decay_rate=global_config.default_decay_rate_per_second,
max_interest=MAX_INTEREST,
trigger_threshold=global_config.reply_trigger_threshold,
base_reply_probability=base_reply_probability,
increase_rate=probability_increase_rate_per_second,
decay_factor=global_config.probability_decay_factor_per_second,
max_probability=max_reply_probability,
state_change_callback: Optional[Callable[[ChatState], None]] = None,
):
self.interest_level: float = 0.0
self.last_update_time: float = time.time()
self.decay_rate_per_second: float = decay_rate
self.max_interest: float = max_interest
self.last_interaction_time: float = self.last_update_time
self.trigger_threshold: float = trigger_threshold
self.base_reply_probability: float = base_reply_probability
self.probability_increase_rate: float = increase_rate
self.probability_decay_factor: float = decay_factor
self.max_reply_probability: float = max_probability
self.current_reply_probability: float = 0.0
self.is_above_threshold: bool = False
self.state_change_callback = state_change_callback
self.interest_dict: Dict[str, tuple[MessageRecv, float, bool]] = {}
def add_interest_dict(self, message: MessageRecv, interest_value: float, is_mentioned: bool):
self.interest_dict[message.message_info.message_id] = (message, interest_value, is_mentioned)
self.last_interaction_time = time.time()
def _calculate_decay(self, current_time: float):
time_delta = current_time - self.last_update_time
if time_delta > 0:
old_interest = self.interest_level
if self.interest_level < 1e-9:
self.interest_level = 0.0
else:
if self.decay_rate_per_second <= 0:
interest_logger.warning(
f"InterestChatting encountered non-positive decay rate: {self.decay_rate_per_second}. Setting interest to 0."
)
self.interest_level = 0.0
elif self.interest_level < 0:
interest_logger.warning(
f"InterestChatting encountered negative interest level: {self.interest_level}. Setting interest to 0."
)
self.interest_level = 0.0
else:
try:
decay_factor = math.pow(self.decay_rate_per_second, time_delta)
self.interest_level *= decay_factor
except ValueError as e:
interest_logger.error(
f"Math error during decay calculation: {e}. Rate: {self.decay_rate_per_second}, Delta: {time_delta}, Level: {self.interest_level}. Setting interest to 0."
)
self.interest_level = 0.0
if old_interest != self.interest_level:
self.last_update_time = current_time
def _update_reply_probability(self, current_time: float):
time_delta = current_time - self.last_update_time
if time_delta <= 0:
return
currently_above = self.interest_level >= self.trigger_threshold
previous_is_above = self.is_above_threshold
if currently_above:
if not self.is_above_threshold:
self.current_reply_probability = self.base_reply_probability
interest_logger.debug(
f"兴趣跨过阈值 ({self.trigger_threshold}). 概率重置为基础值: {self.base_reply_probability:.4f}"
)
else:
increase_amount = self.probability_increase_rate * time_delta
self.current_reply_probability += increase_amount
self.current_reply_probability = min(self.current_reply_probability, self.max_reply_probability)
else:
if previous_is_above:
if self.state_change_callback:
try:
self.state_change_callback(ChatState.ABSENT)
except Exception as e:
interest_logger.error(f"Error calling state_change_callback for ABSENT: {e}")
if 0 < self.probability_decay_factor < 1:
decay_multiplier = math.pow(self.probability_decay_factor, time_delta)
self.current_reply_probability *= decay_multiplier
if self.current_reply_probability < 1e-6:
self.current_reply_probability = 0.0
elif self.probability_decay_factor <= 0:
if self.current_reply_probability > 0:
interest_logger.warning(f"无效的衰减因子 ({self.probability_decay_factor}). 设置概率为0.")
self.current_reply_probability = 0.0
self.current_reply_probability = max(self.current_reply_probability, 0.0)
self.is_above_threshold = currently_above
def increase_interest(self, current_time: float, value: float):
self._update_reply_probability(current_time)
self._calculate_decay(current_time)
self.interest_level += value
self.interest_level = min(self.interest_level, self.max_interest)
self.last_update_time = current_time
self.last_interaction_time = current_time
def decrease_interest(self, current_time: float, value: float):
self._update_reply_probability(current_time)
self.interest_level -= value
self.interest_level = max(self.interest_level, 0.0)
self.last_update_time = current_time
self.last_interaction_time = current_time
def get_interest(self) -> float:
current_time = time.time()
self._update_reply_probability(current_time)
self._calculate_decay(current_time)
self.last_update_time = current_time
return self.interest_level
def get_state(self) -> dict:
interest = self.get_interest()
return {
"interest_level": round(interest, 2),
"last_update_time": self.last_update_time,
"current_reply_probability": round(self.current_reply_probability, 4),
"is_above_threshold": self.is_above_threshold,
"last_interaction_time": self.last_interaction_time,
}
def should_evaluate_reply(self) -> bool:
current_time = time.time()
self._update_reply_probability(current_time)
if self.current_reply_probability > 0:
trigger = random.random() < self.current_reply_probability
return trigger
else:
return False
class SubHeartflow:
def __init__(self, subheartflow_id, parent_heartflow: "Heartflow"):
"""子心流初始化函数
Args:
subheartflow_id: 子心流唯一标识符
parent_heartflow: 父级心流实例
"""
# 基础属性
self.subheartflow_id = subheartflow_id
self.parent_heartflow = parent_heartflow
self.bot_name = global_config.BOT_NICKNAME # 机器人昵称
# 思维状态相关
self.current_mind = "你什么也没想" # 当前想法
self.past_mind = [] # 历史想法记录
self.main_heartflow_info = "" # 主心流信息
# 聊天状态管理
self.chat_state: ChatStateInfo = ChatStateInfo() # 聊天状态信息
self.interest_chatting = InterestChatting(state_change_callback=self.set_chat_state) # 兴趣聊天系统
# 活动状态管理
self.last_active_time = time.time() # 最后活跃时间
self.is_active = False # 是否活跃标志
self.should_stop = False # 停止标志
self.task: Optional[asyncio.Task] = None # 后台任务
self.heart_fc_instance: Optional["HeartFChatting"] = None # <-- Add instance variable
# 观察和知识系统
self.observations: List[ChattingObservation] = [] # 观察列表
self.running_knowledges = [] # 运行中的知识
# LLM模型配置
self.llm_model = LLMRequest(
model=global_config.llm_sub_heartflow,
temperature=global_config.llm_sub_heartflow["temp"],
max_tokens=800,
request_type="sub_heart_flow",
)
async def set_chat_state(self, new_state: "ChatState"):
"""更新sub_heartflow的聊天状态并管理 HeartFChatting 实例"""
current_state = self.chat_state.chat_status
if current_state == new_state:
logger.trace(f"[{self.subheartflow_id}] State already {current_state.value}, no change.")
return # No change needed
log_prefix = f"[{chat_manager.get_stream_name(self.subheartflow_id) or self.subheartflow_id}]"
current_mai_state = self.parent_heartflow.current_state.mai_status
# --- Entering CHAT state ---
if new_state == ChatState.CHAT:
normal_limit = current_mai_state.get_normal_chat_max_num()
current_chat_count = self.parent_heartflow.count_subflows_by_state(ChatState.CHAT)
if current_chat_count >= normal_limit:
logger.debug(
f"{log_prefix} 拒绝从 {current_state.value} 转换到 CHAT。原因CHAT 状态已达上限 ({normal_limit})。当前数量: {current_chat_count}"
)
return # Block the state transition
else:
logger.debug(
f"{log_prefix} 允许从 {current_state.value} 转换到 CHAT (上限: {normal_limit}, 当前: {current_chat_count})"
)
# If transitioning out of FOCUSED, shut down HeartFChatting first
if current_state == ChatState.FOCUSED and self.heart_fc_instance:
logger.info(f"{log_prefix} 从 FOCUSED 转换到 CHAT正在关闭 HeartFChatting...")
await self.heart_fc_instance.shutdown()
self.heart_fc_instance = None
# --- Entering FOCUSED state ---
elif new_state == ChatState.FOCUSED:
focused_limit = current_mai_state.get_focused_chat_max_num()
current_focused_count = self.parent_heartflow.count_subflows_by_state(ChatState.FOCUSED)
if current_focused_count >= focused_limit:
logger.debug(
f"{log_prefix} 拒绝从 {current_state.value} 转换到 FOCUSED。原因FOCUSED 状态已达上限 ({focused_limit})。当前数量: {current_focused_count}"
)
return # Block the state transition
else:
logger.debug(
f"{log_prefix} 允许从 {current_state.value} 转换到 FOCUSED (上限: {focused_limit}, 当前: {current_focused_count})"
)
if not self.heart_fc_instance:
logger.info(f"{log_prefix} 状态转为 FOCUSED创建并初始化 HeartFChatting 实例...")
try:
self.heart_fc_instance = HeartFChatting(
chat_id=self.subheartflow_id,
gpt_instance=self.parent_heartflow.gpt_instance,
tool_user_instance=self.parent_heartflow.tool_user_instance,
emoji_manager_instance=self.parent_heartflow.emoji_manager_instance,
)
# Initialize and potentially start the loop via add_time
if await self.heart_fc_instance._initialize():
# Give it an initial time boost to start the loop
await self.heart_fc_instance.add_time()
logger.info(f"{log_prefix} HeartFChatting 实例已创建并启动。")
else:
logger.error(
f"{log_prefix} HeartFChatting 实例初始化失败,状态回滚到 {current_state.value}"
)
self.heart_fc_instance = None
return # Prevent state change if HeartFChatting fails to init
except Exception as e:
logger.error(f"{log_prefix} 创建 HeartFChatting 实例时出错: {e}")
logger.error(traceback.format_exc())
self.heart_fc_instance = None
return # Prevent state change on error
else:
logger.warning(f"{log_prefix} 尝试进入 FOCUSED 状态,但 HeartFChatting 实例已存在。")
# --- Entering ABSENT state (or any state other than FOCUSED) ---
elif current_state == ChatState.FOCUSED and self.heart_fc_instance:
logger.info(f"{log_prefix} 从 FOCUSED 转换到 {new_state.value},正在关闭 HeartFChatting...")
await self.heart_fc_instance.shutdown()
self.heart_fc_instance = None
# --- Update state and timestamp if transition is allowed --- # 更新状态必须放在所有检查和操作之后
self.chat_state.chat_status = new_state
self.last_active_time = time.time()
logger.info(f"{log_prefix} 聊天状态从 {current_state.value} 变更为 {new_state.value}")
async def subheartflow_start_working(self):
while True:
if self.should_stop:
logger.info(f"子心流 {self.subheartflow_id} 被标记为停止,正在退出后台任务...")
break
# await asyncio.sleep(global_config.sub_heart_flow_update_interval)
await asyncio.sleep(10)
async def ensure_observed(self):
observation = self._get_primary_observation()
if observation:
try:
await observation.observe()
logger.trace(f"[{self.subheartflow_id}] Observation updated before thinking.")
except Exception as e:
logger.error(f"[{self.subheartflow_id}] Error during pre-thinking observation: {e}")
logger.error(traceback.format_exc())
async def do_thinking_before_reply(
self,
extra_info: str,
obs_id: list[str] = None,
):
self.last_active_time = time.time()
current_thinking_info = self.current_mind
mood_info = self.chat_state.mood
observation = self._get_primary_observation()
chat_observe_info = ""
if obs_id:
try:
chat_observe_info = observation.get_observe_info(obs_id)
logger.debug(f"[{self.subheartflow_id}] Using specific observation IDs: {obs_id}")
except Exception as e:
logger.error(
f"[{self.subheartflow_id}] Error getting observe info with IDs {obs_id}: {e}. Falling back."
)
chat_observe_info = observation.get_observe_info()
else:
chat_observe_info = observation.get_observe_info()
# logger.debug(f"[{self.subheartflow_id}] Using default observation info.")
extra_info_prompt = ""
if extra_info:
for tool_name, tool_data in extra_info.items():
extra_info_prompt += f"{tool_name} 相关信息:\n"
for item in tool_data:
extra_info_prompt += f"- {item['name']}: {item['content']}\n"
else:
extra_info_prompt = "无工具信息。\n"
individuality = Individuality.get_instance()
prompt_personality = f"你的名字是{self.bot_name},你"
prompt_personality += individuality.personality.personality_core
if individuality.personality.personality_sides:
random_side = random.choice(individuality.personality.personality_sides)
prompt_personality += f"{random_side}"
if individuality.identity.identity_detail:
random_detail = random.choice(individuality.identity.identity_detail)
prompt_personality += f"{random_detail}"
time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
local_random = random.Random()
current_minute = int(time.strftime("%M"))
local_random.seed(current_minute)
hf_options = [
("继续生成你在这个聊天中的想法,在原来想法的基础上继续思考", 0.7),
("生成你在这个聊天中的想法,在原来的想法上尝试新的话题", 0.1),
("生成你在这个聊天中的想法,不要太深入", 0.1),
("继续生成你在这个聊天中的想法,进行深入思考", 0.1),
]
hf_do_next = local_random.choices(
[option[0] for option in hf_options], weights=[option[1] for option in hf_options], k=1
)[0]
prompt = (await global_prompt_manager.get_prompt_async("sub_heartflow_prompt_before")).format(
extra_info=extra_info_prompt,
prompt_personality=prompt_personality,
bot_name=self.bot_name,
current_thinking_info=current_thinking_info,
time_now=time_now,
chat_observe_info=chat_observe_info,
mood_info=mood_info,
hf_do_next=hf_do_next,
)
prompt = await relationship_manager.convert_all_person_sign_to_person_name(prompt)
prompt = parse_text_timestamps(prompt, mode="lite")
logger.debug(f"[{self.subheartflow_id}] 心流思考prompt:\n{prompt}\n")
try:
response, reasoning_content = await self.llm_model.generate_response_async(prompt)
logger.debug(f"[{self.subheartflow_id}] 心流思考结果:\n{response}\n")
if not response:
response = "(不知道该想些什么...)"
logger.warning(f"[{self.subheartflow_id}] LLM 返回空结果,思考失败。")
except Exception as e:
logger.error(f"[{self.subheartflow_id}] 内心独白获取失败: {e}")
response = "(思考时发生错误...)"
self.update_current_mind(response)
return self.current_mind, self.past_mind
def update_current_mind(self, response):
self.past_mind.append(self.current_mind)
self.current_mind = response
def add_observation(self, observation: Observation):
for existing_obs in self.observations:
if existing_obs.observe_id == observation.observe_id:
return
self.observations.append(observation)
def remove_observation(self, observation: Observation):
if observation in self.observations:
self.observations.remove(observation)
def get_all_observations(self) -> list[Observation]:
return self.observations
def clear_observations(self):
self.observations.clear()
def _get_primary_observation(self) -> Optional[ChattingObservation]:
if self.observations and isinstance(self.observations[0], ChattingObservation):
return self.observations[0]
logger.warning(f"SubHeartflow {self.subheartflow_id} 没有找到有效的 ChattingObservation")
return None
def get_interest_state(self) -> dict:
return self.interest_chatting.get_state()
def get_interest_level(self) -> float:
return self.interest_chatting.get_interest()
def should_evaluate_reply(self) -> bool:
return self.interest_chatting.should_evaluate_reply()
def add_interest_dict_entry(self, message: MessageRecv, interest_value: float, is_mentioned: bool):
self.interest_chatting.add_interest_dict(message, interest_value, is_mentioned)
def get_interest_dict(self) -> Dict[str, tuple[MessageRecv, float, bool]]:
return self.interest_chatting.interest_dict
init_prompt()