feat:成功融合reasoning和HFC,由主心流统一调控

This commit is contained in:
SengokuCola
2025-04-22 02:01:52 +08:00
parent 5b894f7f59
commit 55254549be
19 changed files with 708 additions and 785 deletions

16
src/MaiBot0.6roadmap.md Normal file
View File

@@ -0,0 +1,16 @@
MaiCore/MaiBot 0.6路线图 draft
0.6.3解决0.6.x版本核心问题改进功能
主要功能加入
LPMM全面替代旧知识库
采用新的HFC回复模式取代旧心流
合并推理模式和心流模式,根据麦麦自己决策回复模式
提供新的表情包系统
0.6.4:提升用户体验,交互优化
加入webui
提供麦麦 API
修复prompt建构的各种问题
修复各种bug
调整代码文件结构,重构部分落后设计

View File

@@ -28,7 +28,7 @@ logger = get_module_logger("config", config=config_config)
# 考虑到实际上配置文件中的mai_version是不会自动更新的,所以采用硬编码 # 考虑到实际上配置文件中的mai_version是不会自动更新的,所以采用硬编码
is_test = True is_test = True
mai_version_main = "0.6.3" mai_version_main = "0.6.3"
mai_version_fix = "snapshot-2" mai_version_fix = "snapshot-3"
if mai_version_fix: if mai_version_fix:
if is_test: if is_test:

Binary file not shown.

Before

Width:  |  Height:  |  Size: 59 KiB

View File

@@ -79,4 +79,16 @@ await heartflow.heartflow_start_working()
1. 子心流会在长时间不活跃后自动清理 1. 子心流会在长时间不活跃后自动清理
2. 需要合理配置更新间隔以平衡性能和响应速度 2. 需要合理配置更新间隔以平衡性能和响应速度
3. 观察系统会限制消息处理数量以避免过载 3. 观察系统会限制消息处理数量以避免过载
更新:
把聊天控制移动到心流下吧
首先心流要根据日程以及当前状况判定总体状态MaiStateInfo
然后根据每个子心流的运行情况给子心流分配聊天资源ChatStateInfoABSENT CHAT 或者 FOCUS
子心流负责根据状态进行执行
1.将interest.py进行拆分class InterestChatting 将会在 sub_heartflow中声明每个sub_heartflow都会所属一个InterestChatting
class InterestManager 将会在heartflow中声明成为heartflow的一个组件伴随heartflow产生

Binary file not shown.

Before

Width:  |  Height:  |  Size: 91 KiB

11
src/heart_flow/Update.md Normal file
View File

@@ -0,0 +1,11 @@
更新:
把聊天控制移动到心流下吧
首先心流要根据日程以及当前状况判定总体状态MaiStateInfo
然后根据每个子心流的运行情况给子心流分配聊天资源ChatStateInfoABSENT CHAT 或者 FOCUS
子心流负责根据状态进行执行
1.将interest.py进行拆分class InterestChatting 将会在 sub_heartflow中声明每个sub_heartflow都会所属一个InterestChatting
class InterestManager 将会在heartflow中声明成为heartflow的一个组件伴随heartflow产生

Binary file not shown.

Before

Width:  |  Height:  |  Size: 88 KiB

View File

@@ -5,12 +5,16 @@ from src.config.config import global_config
from src.plugins.schedule.schedule_generator import bot_schedule from src.plugins.schedule.schedule_generator import bot_schedule
from src.plugins.utils.prompt_builder import Prompt, global_prompt_manager from src.plugins.utils.prompt_builder import Prompt, global_prompt_manager
import asyncio import asyncio
from src.common.logger import get_module_logger, LogConfig, HEARTFLOW_STYLE_CONFIG # noqa: E402 from src.common.logger import get_module_logger, LogConfig, HEARTFLOW_STYLE_CONFIG # 修改
from src.individuality.individuality import Individuality from src.individuality.individuality import Individuality
import time import time
import random import random
from typing import Dict, Any, Optional from typing import Dict, Any, Optional
import traceback import traceback
import enum
import os # 新增
import json # 新增
from src.plugins.chat.chat_stream import chat_manager # 新增
heartflow_config = LogConfig( heartflow_config = LogConfig(
# 使用海马体专用样式 # 使用海马体专用样式
@@ -41,80 +45,262 @@ def init_prompt():
Prompt(prompt, "mind_summary_prompt") Prompt(prompt, "mind_summary_prompt")
class CurrentState: # --- 新增:从 interest.py 移动过来的常量 ---
LOG_DIRECTORY = "logs/interest"
HISTORY_LOG_FILENAME = "interest_history.log"
CLEANUP_INTERVAL_SECONDS = 1200 # 清理任务运行间隔 (例如20分钟) - 保持与 interest.py 一致
INACTIVE_THRESHOLD_SECONDS = 1200 # 不活跃时间阈值 (例如20分钟) - 保持与 interest.py 一致
LOG_INTERVAL_SECONDS = 3 # 日志记录间隔 (例如3秒) - 保持与 interest.py 一致
# --- 结束新增常量 ---
# 新增 ChatStatus 枚举
class MaiState(enum.Enum):
"""
聊天状态:
OFFLINE: 不在线:回复概率极低,不会进行任何聊天
PEEKING: 看一眼手机:回复概率较低,会进行一些普通聊天
NORMAL_CHAT: 正常聊天:回复概率较高,会进行一些普通聊天和少量的专注聊天
FOCUSED_CHAT: 专注聊天:回复概率极高,会进行专注聊天和少量的普通聊天
"""
OFFLINE = "不在线"
PEEKING = "看一眼手机"
NORMAL_CHAT = "正常聊天"
FOCUSED_CHAT = "专注聊天"
def get_normal_chat_max_num(self):
if self == MaiState.OFFLINE:
return 0
elif self == MaiState.PEEKING:
return 1
elif self == MaiState.NORMAL_CHAT:
return 3
elif self == MaiState.FOCUSED_CHAT:
return 2
def get_focused_chat_max_num(self):
if self == MaiState.OFFLINE:
return 0
elif self == MaiState.PEEKING:
return 0
elif self == MaiState.NORMAL_CHAT:
return 1
elif self == MaiState.FOCUSED_CHAT:
return 2
class MaiStateInfo:
def __init__(self): def __init__(self):
self.current_state_info = "" self.current_state_info = ""
self.chat_status = "IDLE" # 使用枚举类型初始化状态,默认为不在线
self.mai_status: MaiState = MaiState.OFFLINE
self.normal_chatting = []
self.focused_chatting = []
self.mood_manager = MoodManager() self.mood_manager = MoodManager()
self.mood = self.mood_manager.get_prompt() self.mood = self.mood_manager.get_prompt()
self.attendance_factor = 0
self.engagement_factor = 0
def update_current_state_info(self): def update_current_state_info(self):
self.current_state_info = self.mood_manager.get_current_mood() self.current_state_info = self.mood_manager.get_current_mood()
# 新增更新聊天状态的方法
def update_mai_status(self, new_status: MaiState):
"""更新聊天状态"""
if isinstance(new_status, MaiState):
self.mai_status = new_status
logger.info(f"麦麦状态更新为: {self.mai_status.value}")
else:
logger.warning(f"尝试设置无效的麦麦状态: {new_status}")
class Heartflow: class Heartflow:
def __init__(self): def __init__(self):
self.current_mind = "你什么也没想" self.current_mind = "你什么也没想"
self.past_mind = [] self.past_mind = []
self.current_state: CurrentState = CurrentState() self.current_state: MaiStateInfo = MaiStateInfo()
self.llm_model = LLMRequest( self.llm_model = LLMRequest(
model=global_config.llm_heartflow, temperature=0.6, max_tokens=1000, request_type="heart_flow" model=global_config.llm_heartflow, temperature=0.6, max_tokens=1000, request_type="heart_flow"
) )
self._subheartflows: Dict[Any, SubHeartflow] = {} self._subheartflows: Dict[Any, SubHeartflow] = {}
async def _cleanup_inactive_subheartflows(self): # --- 新增:日志和清理相关属性 (从 InterestManager 移动) ---
"""定期清理不活跃的子心流""" self._history_log_file_path = os.path.join(LOG_DIRECTORY, HISTORY_LOG_FILENAME)
self._ensure_log_directory() # 初始化时确保目录存在
self._cleanup_task: Optional[asyncio.Task] = None
self._logging_task: Optional[asyncio.Task] = None
# 注意:衰减任务 (_decay_task) 不再需要,衰减在 SubHeartflow 的 InterestChatting 内部处理
# --- 结束新增属性 ---
def _ensure_log_directory(self): # 新增方法 (从 InterestManager 移动)
"""确保日志目录存在"""
# 移除 try-except 块,根据用户要求
os.makedirs(LOG_DIRECTORY, exist_ok=True)
logger.info(f"Log directory '{LOG_DIRECTORY}' ensured.")
# except OSError as e:
# logger.error(f"Error creating log directory '{LOG_DIRECTORY}': {e}")
async def _periodic_cleanup_task(
self, interval_seconds: int, max_age_seconds: int
): # 新增方法 (从 InterestManager 移动和修改)
"""后台清理任务的异步函数"""
while True: while True:
current_time = time.time() await asyncio.sleep(interval_seconds)
inactive_subheartflows_ids = [] # 修改变量名以清晰表示存储的是ID logger.info(f"[Heartflow] 运行定期清理 (间隔: {interval_seconds}秒)...")
self.cleanup_inactive_subheartflows(max_age_seconds=max_age_seconds) # 调用 Heartflow 自己的清理方法
# 检查所有子心流 async def _periodic_log_task(self, interval_seconds: int): # 新增方法 (从 InterestManager 移动和修改)
# 使用 list(self._subheartflows.items()) 避免在迭代时修改字典 """后台日志记录任务的异步函数 (记录所有子心流的兴趣历史数据)"""
for subheartflow_id, subheartflow in list(self._subheartflows.items()): while True:
if ( await asyncio.sleep(interval_seconds)
current_time - subheartflow.last_active_time > global_config.sub_heart_flow_stop_time try:
): # 10分钟 = 600秒 current_timestamp = time.time()
logger.info(f"发现不活跃的子心流: {subheartflow_id}, 准备清理。") all_interest_states = self.get_all_interest_states() # 获取所有子心流的兴趣状态
# 1. 标记子心流让其后台任务停止
subheartflow.should_stop = True
# 2. 将ID添加到待清理列表
inactive_subheartflows_ids.append(subheartflow_id)
# 清理不活跃的子心流 (从字典中移除) # 以追加模式打开历史日志文件
for subheartflow_id in inactive_subheartflows_ids: # 移除 try-except IO 块,根据用户要求
if subheartflow_id in self._subheartflows: with open(self._history_log_file_path, "a", encoding="utf-8") as f:
del self._subheartflows[subheartflow_id] count = 0
logger.info(f"已从主心流移除子心流: {subheartflow_id}") # 创建 items 快照以安全迭代
else: items_snapshot = list(all_interest_states.items())
logger.warning(f"尝试移除子心流 {subheartflow_id} 时发现其已被移除。") for stream_id, state in items_snapshot:
# 从 chat_manager 获取 group_name
group_name = stream_id # 默认值
try:
chat_stream = chat_manager.get_stream(stream_id)
if chat_stream and chat_stream.group_info:
group_name = chat_stream.group_info.group_name
elif chat_stream and not chat_stream.group_info: # 处理私聊
group_name = (
f"私聊_{chat_stream.user_info.user_nickname}"
if chat_stream.user_info
else stream_id
)
except Exception:
# 不记录警告,避免刷屏,使用默认 stream_id 即可
# logger.warning(f"Could not get group name for stream_id {stream_id}: {e}")
pass # 静默处理
await asyncio.sleep(30) # 每分钟检查一次 log_entry = {
"timestamp": round(current_timestamp, 2),
"stream_id": stream_id,
"interest_level": state.get("interest_level", 0.0), # 使用 get 获取,提供默认值
"group_name": group_name,
"reply_probability": state.get("current_reply_probability", 0.0), # 使用 get 获取
"is_above_threshold": state.get("is_above_threshold", False), # 使用 get 获取
}
# 将每个条目作为单独的 JSON 行写入
f.write(json.dumps(log_entry, ensure_ascii=False) + "\n")
count += 1
# logger.debug(f"[Heartflow] Successfully appended {count} interest history entries to {self._history_log_file_path}")
async def _sub_heartflow_update(self): # except IOError as e:
# logger.error(f"[Heartflow] Error writing interest history log to {self._history_log_file_path}: {e}")
except Exception as e: # 保留对其他异常的捕获
logger.error(f"[Heartflow] Unexpected error during periodic history logging: {e}")
logger.error(traceback.format_exc()) # 记录 traceback
def get_all_interest_states(self) -> Dict[str, Dict]: # 新增方法
"""获取所有活跃子心流的当前兴趣状态"""
states = {}
# 创建副本以避免在迭代时修改字典
items_snapshot = list(self._subheartflows.items())
for stream_id, subheartflow in items_snapshot:
try:
# 从 SubHeartflow 获取其 InterestChatting 的状态
states[stream_id] = subheartflow.get_interest_state()
except Exception as e:
logger.warning(f"[Heartflow] Error getting interest state for subheartflow {stream_id}: {e}")
return states
def cleanup_inactive_subheartflows(self, max_age_seconds=INACTIVE_THRESHOLD_SECONDS): # 修改此方法以使用兴趣时间
"""
清理长时间不活跃的子心流记录 (基于兴趣交互时间)
max_age_seconds: 超过此时间未通过兴趣系统交互的将被清理
"""
current_time = time.time()
keys_to_remove = []
_initial_count = len(self._subheartflows)
# 创建副本以避免在迭代时修改字典
items_snapshot = list(self._subheartflows.items())
for subheartflow_id, subheartflow in items_snapshot:
should_remove = False
reason = ""
# 检查 InterestChatting 的最后交互时间
last_interaction = subheartflow.interest_chatting.last_interaction_time
if max_age_seconds is not None and (current_time - last_interaction) > max_age_seconds:
should_remove = True
reason = (
f"interest inactive time ({current_time - last_interaction:.0f}s) > max age ({max_age_seconds}s)"
)
if should_remove:
keys_to_remove.append(subheartflow_id)
stream_name = chat_manager.get_stream_name(subheartflow_id) or subheartflow_id # 获取流名称
logger.debug(f"[Heartflow] Marking stream {stream_name} for removal. Reason: {reason}")
# 标记子心流让其后台任务停止 (如果其后台任务还在运行)
subheartflow.should_stop = True
if keys_to_remove:
logger.info(f"[Heartflow] 清理识别到 {len(keys_to_remove)} 个不活跃的流。")
for key in keys_to_remove:
if key in self._subheartflows:
# 尝试取消子心流的后台任务
task_to_cancel = self._subheartflows[key].task
if task_to_cancel and not task_to_cancel.done():
task_to_cancel.cancel()
logger.debug(f"[Heartflow] Cancelled background task for subheartflow {key}")
# 从字典中删除
del self._subheartflows[key]
stream_name = chat_manager.get_stream_name(key) or key # 获取流名称
logger.debug(f"[Heartflow] 移除了流: {stream_name}")
final_count = len(self._subheartflows) # 直接获取当前长度
logger.info(f"[Heartflow] 清理完成。移除了 {len(keys_to_remove)} 个流。当前数量: {final_count}")
else:
# logger.info(f"[Heartflow] 清理完成。没有流符合移除条件。当前数量: {initial_count}") # 减少日志噪音
pass
async def _sub_heartflow_update(self): # 这个任务目前作用不大,可以考虑移除或赋予新职责
while True: while True:
# 检查是否存在子心流 # 检查是否存在子心流
if not self._subheartflows: if not self._subheartflows:
# logger.info("当前没有子心流,等待新的子心流创建...") # logger.info("当前没有子心流,等待新的子心流创建...")
await asyncio.sleep(30) # 每分钟检查一次是否有新的子心流 await asyncio.sleep(30) # 短暂休眠
continue continue
# await self.do_a_thinking() # 当前无实际操作,只是等待
# await asyncio.sleep(global_config.heart_flow_update_interval * 3) # 5分钟思考一次
await asyncio.sleep(300) await asyncio.sleep(300)
async def heartflow_start_working(self): async def heartflow_start_working(self):
# 启动清理任务 # 启动清理任务 (使用新的 periodic_cleanup_task)
asyncio.create_task(self._cleanup_inactive_subheartflows()) if self._cleanup_task is None or self._cleanup_task.done():
self._cleanup_task = asyncio.create_task(
self._periodic_cleanup_task(
interval_seconds=CLEANUP_INTERVAL_SECONDS,
max_age_seconds=INACTIVE_THRESHOLD_SECONDS,
)
)
logger.info(
f"[Heartflow] 已创建定期清理任务。间隔: {CLEANUP_INTERVAL_SECONDS}s, 不活跃阈值: {INACTIVE_THRESHOLD_SECONDS}s"
)
else:
logger.warning("[Heartflow] 跳过创建清理任务: 任务已在运行或存在。")
# 启动子心流更新任务 # 启动日志任务 (使用新的 periodic_log_task)
asyncio.create_task(self._sub_heartflow_update()) if self._logging_task is None or self._logging_task.done():
self._logging_task = asyncio.create_task(self._periodic_log_task(interval_seconds=LOG_INTERVAL_SECONDS))
logger.info(f"[Heartflow] 已创建定期日志任务。间隔: {LOG_INTERVAL_SECONDS}s")
else:
logger.warning("[Heartflow] 跳过创建日志任务: 任务已在运行或存在。")
# (可选) 启动旧的子心流更新任务,如果它还有用的话
# asyncio.create_task(self._sub_heartflow_update())
@staticmethod @staticmethod
async def _update_current_state(): async def _update_current_state():
@@ -133,146 +319,149 @@ class Heartflow:
prompt_personality += personality_core prompt_personality += personality_core
personality_sides = individuality.personality.personality_sides personality_sides = individuality.personality.personality_sides
random.shuffle(personality_sides) # 检查列表是否为空
prompt_personality += f",{personality_sides[0]}" if personality_sides:
random.shuffle(personality_sides)
prompt_personality += f",{personality_sides[0]}"
identity_detail = individuality.identity.identity_detail identity_detail = individuality.identity.identity_detail
random.shuffle(identity_detail) # 检查列表是否为空
prompt_personality += f",{identity_detail[0]}" if identity_detail:
random.shuffle(identity_detail)
prompt_personality += f",{identity_detail[0]}"
personality_info = prompt_personality personality_info = prompt_personality
current_thinking_info = self.current_mind current_thinking_info = self.current_mind
mood_info = self.current_state.mood mood_info = self.current_state.mood
related_memory_info = "memory" related_memory_info = "memory" # TODO: 替换为实际的记忆获取逻辑
try: try:
sub_flows_info = await self.get_all_subheartflows_minds() sub_flows_info = await self.get_all_subheartflows_minds_summary() # 修改为调用汇总方法
except Exception as e: except Exception as e:
logger.error(f"获取子心流想法失败: {e}") logger.error(f"[Heartflow] 获取子心流想法汇总失败: {e}")
return logger.error(traceback.format_exc())
sub_flows_info = "(获取子心流想法时出错)" # 提供默认值
schedule_info = bot_schedule.get_current_num_task(num=4, time_info=True) schedule_info = bot_schedule.get_current_num_task(num=4, time_info=True)
# prompt = ""
# prompt += f"你刚刚在做的事情是:{schedule_info}\n"
# prompt += f"{personality_info}\n"
# prompt += f"你想起来{related_memory_info}。"
# prompt += f"刚刚你的主要想法是{current_thinking_info}。"
# prompt += f"你还有一些小想法,因为你在参加不同的群聊天,这是你正在做的事情:{sub_flows_info}\n"
# prompt += f"你现在{mood_info}。"
# prompt += "现在你接下去继续思考,产生新的想法,但是要基于原有的主要想法,不要分点输出,"
# prompt += "输出连贯的内心独白,不要太长,但是记得结合上述的消息,关注新内容:"
prompt = (await global_prompt_manager.get_prompt_async("thinking_prompt")).format( prompt = (await global_prompt_manager.get_prompt_async("thinking_prompt")).format(
schedule_info, personality_info, related_memory_info, current_thinking_info, sub_flows_info, mood_info schedule_info=schedule_info, # 使用关键字参数确保正确格式化
personality_info=personality_info,
related_memory_info=related_memory_info,
current_thinking_info=current_thinking_info,
sub_flows_info=sub_flows_info,
mood_info=mood_info,
) )
try: try:
response, reasoning_content = await self.llm_model.generate_response_async(prompt) response, reasoning_content = await self.llm_model.generate_response_async(prompt)
if not response:
logger.warning("[Heartflow] 内心独白 LLM 返回空结果。")
response = "(暂时没什么想法...)" # 提供默认想法
self.update_current_mind(response) # 更新主心流想法
logger.info(f"麦麦的总体脑内状态:{self.current_mind}")
# 更新所有子心流的主心流信息
items_snapshot = list(self._subheartflows.items()) # 创建快照
for _, subheartflow in items_snapshot:
subheartflow.main_heartflow_info = response
except Exception as e: except Exception as e:
logger.error(f"内心独白获取失败: {e}") logger.error(f"[Heartflow] 内心独白获取失败: {e}")
return logger.error(traceback.format_exc())
self.update_current_mind(response) # 此处不返回,允许程序继续执行,但主心流想法未更新
self.current_mind = response
logger.info(f"麦麦的总体脑内状态:{self.current_mind}")
# logger.info("麦麦想了想,当前活动:")
# await bot_schedule.move_doing(self.current_mind)
for _, subheartflow in self._subheartflows.items():
subheartflow.main_heartflow_info = response
def update_current_mind(self, response): def update_current_mind(self, response):
self.past_mind.append(self.current_mind) self.past_mind.append(self.current_mind)
self.current_mind = response self.current_mind = response
async def get_all_subheartflows_minds(self): async def get_all_subheartflows_minds_summary(self): # 重命名并修改
sub_minds = "" """获取所有子心流的当前想法,并进行汇总"""
for _, subheartflow in self._subheartflows.items(): sub_minds_list = []
sub_minds += subheartflow.current_mind # 创建快照
items_snapshot = list(self._subheartflows.items())
for _, subheartflow in items_snapshot:
sub_minds_list.append(subheartflow.current_mind)
return await self.minds_summary(sub_minds) if not sub_minds_list:
return "(当前没有活跃的子心流想法)"
minds_str = "\n".join([f"- {mind}" for mind in sub_minds_list]) # 格式化为列表
# 调用 LLM 进行汇总
return await self.minds_summary(minds_str)
async def minds_summary(self, minds_str): async def minds_summary(self, minds_str):
"""使用 LLM 汇总子心流的想法字符串"""
# 开始构建prompt # 开始构建prompt
prompt_personality = "" prompt_personality = ""
# person
individuality = Individuality.get_instance() individuality = Individuality.get_instance()
prompt_personality += individuality.personality.personality_core
personality_core = individuality.personality.personality_core if individuality.personality.personality_sides:
prompt_personality += personality_core prompt_personality += f",{random.choice(individuality.personality.personality_sides)}" # 随机选一个
if individuality.identity.identity_detail:
personality_sides = individuality.personality.personality_sides prompt_personality += f",{random.choice(individuality.identity.identity_detail)}" # 随机选一个
random.shuffle(personality_sides)
prompt_personality += f",{personality_sides[0]}"
identity_detail = individuality.identity.identity_detail
random.shuffle(identity_detail)
prompt_personality += f",{identity_detail[0]}"
personality_info = prompt_personality personality_info = prompt_personality
mood_info = self.current_state.mood mood_info = self.current_state.mood
bot_name = global_config.BOT_NICKNAME # 使用全局配置中的机器人昵称
# prompt = ""
# prompt += f"{personality_info}\n"
# prompt += f"现在{global_config.BOT_NICKNAME}的想法是:{self.current_mind}\n"
# prompt += f"现在{global_config.BOT_NICKNAME}在qq群里进行聊天聊天的话题如下{minds_str}\n"
# prompt += f"你现在{mood_info}\n"
# prompt += """现在请你总结这些聊天内容,注意关注聊天内容对原有的想法的影响,输出连贯的内心独白
# 不要太长,但是记得结合上述的消息,要记得你的人设,关注新内容:"""
prompt = (await global_prompt_manager.get_prompt_async("mind_summary_prompt")).format( prompt = (await global_prompt_manager.get_prompt_async("mind_summary_prompt")).format(
personality_info, global_config.BOT_NICKNAME, self.current_mind, minds_str, mood_info personality_info=personality_info, # 使用关键字参数
bot_name=bot_name,
current_mind=self.current_mind,
minds_str=minds_str,
mood_info=mood_info,
) )
response, reasoning_content = await self.llm_model.generate_response_async(prompt) try:
response, reasoning_content = await self.llm_model.generate_response_async(prompt)
return response if not response:
logger.warning("[Heartflow] 想法汇总 LLM 返回空结果。")
return "(想法汇总失败...)"
return response
except Exception as e:
logger.error(f"[Heartflow] 想法汇总失败: {e}")
logger.error(traceback.format_exc())
return "(想法汇总时发生错误...)"
async def create_subheartflow(self, subheartflow_id: Any) -> Optional[SubHeartflow]: async def create_subheartflow(self, subheartflow_id: Any) -> Optional[SubHeartflow]:
""" """
获取或创建一个新的SubHeartflow实例。 获取或创建一个新的SubHeartflow实例。
(主要逻辑不变InterestChatting 现在在 SubHeartflow 内部创建)
如果实例已存在,则直接返回。
如果不存在,则创建实例、观察对象、启动后台任务,并返回新实例。
创建过程中发生任何错误将返回 None。
Args:
subheartflow_id: 用于标识子心流的ID (例如群聊ID)。
Returns:
对应的 SubHeartflow 实例,如果创建失败则返回 None。
""" """
# 检查是否已存在
existing_subheartflow = self._subheartflows.get(subheartflow_id) existing_subheartflow = self._subheartflows.get(subheartflow_id)
if existing_subheartflow: if existing_subheartflow:
logger.debug(f"返回已存在的 subheartflow: {subheartflow_id}") # 如果已存在,确保其 last_active_time 更新 (如果需要的话)
# existing_subheartflow.last_active_time = time.time() # 移除,活跃时间由实际操作更新
# logger.debug(f"[Heartflow] 返回已存在的 subheartflow: {subheartflow_id}")
return existing_subheartflow return existing_subheartflow
# 如果不存在,则创建新的 logger.info(f"[Heartflow] 尝试创建新的 subheartflow: {subheartflow_id}")
logger.info(f"尝试创建新的 subheartflow: {subheartflow_id}")
try: try:
# 创建 SubHeartflow它内部会创建 InterestChatting
subheartflow = SubHeartflow(subheartflow_id) subheartflow = SubHeartflow(subheartflow_id)
# 创建并初始化观察对象 # 创建并初始化观察对象
logger.debug(f"{subheartflow_id} 创建 observation") logger.debug(f"[Heartflow] {subheartflow_id} 创建 observation")
observation = ChattingObservation(subheartflow_id) observation = ChattingObservation(subheartflow_id)
await observation.initialize() # 等待初始化完成 await observation.initialize()
subheartflow.add_observation(observation) subheartflow.add_observation(observation)
logger.debug(f"{subheartflow_id} 添加 observation 成功") logger.debug(f"[Heartflow] {subheartflow_id} 添加 observation 成功")
# 创建并存储后台任务 # 创建并存储后台任务 (SubHeartflow 自己的后台任务)
subheartflow.task = asyncio.create_task(subheartflow.subheartflow_start_working()) subheartflow.task = asyncio.create_task(subheartflow.subheartflow_start_working())
logger.debug(f"{subheartflow_id} 创建后台任务成功") logger.debug(f"[Heartflow] {subheartflow_id} 创建后台任务成功")
# 添加到管理字典 # 添加到管理字典
self._subheartflows[subheartflow_id] = subheartflow self._subheartflows[subheartflow_id] = subheartflow
logger.info(f"添加 subheartflow {subheartflow_id} 成功") logger.info(f"[Heartflow] 添加 subheartflow {subheartflow_id} 成功")
return subheartflow return subheartflow
except Exception as e: except Exception as e:
# 记录详细错误信息 logger.error(f"[Heartflow] 创建 subheartflow {subheartflow_id} 失败: {e}")
logger.error(f"创建 subheartflow {subheartflow_id} 失败: {e}") logger.error(traceback.format_exc())
logger.error(traceback.format_exc()) # 记录完整的 traceback
# 考虑是否需要更具体的错误处理或资源清理逻辑
return None return None
def get_subheartflow(self, observe_chat_id: Any) -> Optional[SubHeartflow]: def get_subheartflow(self, observe_chat_id: Any) -> Optional[SubHeartflow]:

View File

@@ -4,21 +4,20 @@ from src.plugins.moods.moods import MoodManager
from src.plugins.models.utils_model import LLMRequest from src.plugins.models.utils_model import LLMRequest
from src.config.config import global_config from src.config.config import global_config
import time import time
from typing import Optional, List from typing import Optional, List, Dict
import traceback import traceback
from src.plugins.chat.utils import parse_text_timestamps from src.plugins.chat.utils import parse_text_timestamps
import enum
# from src.plugins.schedule.schedule_generator import bot_schedule
# from src.plugins.memory_system.Hippocampus import HippocampusManager
from src.common.logger import get_module_logger, LogConfig, SUB_HEARTFLOW_STYLE_CONFIG # noqa: E402 from src.common.logger import get_module_logger, LogConfig, SUB_HEARTFLOW_STYLE_CONFIG # noqa: E402
# from src.plugins.chat.utils import get_embedding
# from src.common.database import db
# from typing import Union
from src.individuality.individuality import Individuality from src.individuality.individuality import Individuality
import random import random
from src.plugins.person_info.relationship_manager import relationship_manager from src.plugins.person_info.relationship_manager import relationship_manager
from ..plugins.utils.prompt_builder import Prompt, global_prompt_manager from ..plugins.utils.prompt_builder import Prompt, global_prompt_manager
from src.plugins.chat.message import MessageRecv
import math
# 定义常量 (从 interest.py 移动过来)
MAX_INTEREST = 15.0
subheartflow_config = LogConfig( subheartflow_config = LogConfig(
# 使用海马体专用样式 # 使用海马体专用样式
@@ -27,6 +26,12 @@ subheartflow_config = LogConfig(
) )
logger = get_module_logger("subheartflow", config=subheartflow_config) logger = get_module_logger("subheartflow", config=subheartflow_config)
interest_log_config = LogConfig(
console_format=SUB_HEARTFLOW_STYLE_CONFIG["console_format"],
file_format=SUB_HEARTFLOW_STYLE_CONFIG["file_format"],
)
interest_logger = get_module_logger("InterestChatting", config=interest_log_config)
def init_prompt(): def init_prompt():
prompt = "" prompt = ""
@@ -48,16 +53,166 @@ def init_prompt():
Prompt(prompt, "sub_heartflow_prompt_before") Prompt(prompt, "sub_heartflow_prompt_before")
class CurrentState: class ChatState(enum.Enum):
ABSENT = "不参与"
CHAT = "闲聊"
FOCUSED = "专注"
class ChatStateInfo:
def __init__(self): def __init__(self):
self.willing = 0 self.willing = 0
self.current_state_info = ""
self.chat_status: ChatState = ChatState.ABSENT
self.mood_manager = MoodManager() self.mood_manager = MoodManager()
self.mood = self.mood_manager.get_prompt() self.mood = self.mood_manager.get_prompt()
def update_current_state_info(self): def update_chat_state_info(self):
self.current_state_info = self.mood_manager.get_current_mood() self.chat_state_info = self.mood_manager.get_current_mood()
base_reply_probability = 0.05
probability_increase_rate_per_second = 0.08
max_reply_probability = 1
class InterestChatting:
def __init__(
self,
decay_rate=global_config.default_decay_rate_per_second,
max_interest=MAX_INTEREST,
trigger_threshold=global_config.reply_trigger_threshold,
base_reply_probability=base_reply_probability,
increase_rate=probability_increase_rate_per_second,
decay_factor=global_config.probability_decay_factor_per_second,
max_probability=max_reply_probability,
):
self.interest_level: float = 0.0
self.last_update_time: float = time.time()
self.decay_rate_per_second: float = decay_rate
self.max_interest: float = max_interest
self.last_interaction_time: float = self.last_update_time
self.trigger_threshold: float = trigger_threshold
self.base_reply_probability: float = base_reply_probability
self.probability_increase_rate: float = increase_rate
self.probability_decay_factor: float = decay_factor
self.max_reply_probability: float = max_probability
self.current_reply_probability: float = 0.0
self.is_above_threshold: bool = False
self.interest_dict: Dict[str, tuple[MessageRecv, float, bool]] = {}
def add_interest_dict(self, message: MessageRecv, interest_value: float, is_mentioned: bool):
self.interest_dict[message.message_info.message_id] = (message, interest_value, is_mentioned)
self.last_interaction_time = time.time()
def _calculate_decay(self, current_time: float):
time_delta = current_time - self.last_update_time
if time_delta > 0:
old_interest = self.interest_level
if self.interest_level < 1e-9:
self.interest_level = 0.0
else:
if self.decay_rate_per_second <= 0:
interest_logger.warning(
f"InterestChatting encountered non-positive decay rate: {self.decay_rate_per_second}. Setting interest to 0."
)
self.interest_level = 0.0
elif self.interest_level < 0:
interest_logger.warning(
f"InterestChatting encountered negative interest level: {self.interest_level}. Setting interest to 0."
)
self.interest_level = 0.0
else:
try:
decay_factor = math.pow(self.decay_rate_per_second, time_delta)
self.interest_level *= decay_factor
except ValueError as e:
interest_logger.error(
f"Math error during decay calculation: {e}. Rate: {self.decay_rate_per_second}, Delta: {time_delta}, Level: {self.interest_level}. Setting interest to 0."
)
self.interest_level = 0.0
if old_interest != self.interest_level:
self.last_update_time = current_time
def _update_reply_probability(self, current_time: float):
time_delta = current_time - self.last_update_time
if time_delta <= 0:
return
currently_above = self.interest_level >= self.trigger_threshold
if currently_above:
if not self.is_above_threshold:
self.current_reply_probability = self.base_reply_probability
interest_logger.debug(
f"兴趣跨过阈值 ({self.trigger_threshold}). 概率重置为基础值: {self.base_reply_probability:.4f}"
)
else:
increase_amount = self.probability_increase_rate * time_delta
self.current_reply_probability += increase_amount
self.current_reply_probability = min(self.current_reply_probability, self.max_reply_probability)
else:
if 0 < self.probability_decay_factor < 1:
decay_multiplier = math.pow(self.probability_decay_factor, time_delta)
self.current_reply_probability *= decay_multiplier
if self.current_reply_probability < 1e-6:
self.current_reply_probability = 0.0
elif self.probability_decay_factor <= 0:
if self.current_reply_probability > 0:
interest_logger.warning(f"无效的衰减因子 ({self.probability_decay_factor}). 设置概率为0.")
self.current_reply_probability = 0.0
self.current_reply_probability = max(self.current_reply_probability, 0.0)
self.is_above_threshold = currently_above
def increase_interest(self, current_time: float, value: float):
self._update_reply_probability(current_time)
self._calculate_decay(current_time)
self.interest_level += value
self.interest_level = min(self.interest_level, self.max_interest)
self.last_update_time = current_time
self.last_interaction_time = current_time
def decrease_interest(self, current_time: float, value: float):
self._update_reply_probability(current_time)
self.interest_level -= value
self.interest_level = max(self.interest_level, 0.0)
self.last_update_time = current_time
self.last_interaction_time = current_time
def get_interest(self) -> float:
current_time = time.time()
self._update_reply_probability(current_time)
self._calculate_decay(current_time)
self.last_update_time = current_time
return self.interest_level
def get_state(self) -> dict:
interest = self.get_interest()
return {
"interest_level": round(interest, 2),
"last_update_time": self.last_update_time,
"current_reply_probability": round(self.current_reply_probability, 4),
"is_above_threshold": self.is_above_threshold,
"last_interaction_time": self.last_interaction_time,
}
def should_evaluate_reply(self) -> bool:
current_time = time.time()
self._update_reply_probability(current_time)
if self.current_reply_probability > 0:
trigger = random.random() < self.current_reply_probability
return trigger
else:
return False
class SubHeartflow: class SubHeartflow:
@@ -66,7 +221,10 @@ class SubHeartflow:
self.current_mind = "你什么也没想" self.current_mind = "你什么也没想"
self.past_mind = [] self.past_mind = []
self.current_state: CurrentState = CurrentState() self.chat_state: ChatStateInfo = ChatStateInfo()
self.interest_chatting = InterestChatting()
self.llm_model = LLMRequest( self.llm_model = LLMRequest(
model=global_config.llm_sub_heartflow, model=global_config.llm_sub_heartflow,
temperature=global_config.llm_sub_heartflow["temp"], temperature=global_config.llm_sub_heartflow["temp"],
@@ -123,7 +281,7 @@ class SubHeartflow:
self.last_active_time = time.time() # 更新最后激活时间戳 self.last_active_time = time.time() # 更新最后激活时间戳
current_thinking_info = self.current_mind current_thinking_info = self.current_mind
mood_info = self.current_state.mood mood_info = self.chat_state.mood
observation = self._get_primary_observation() observation = self._get_primary_observation()
# --- 获取观察信息 --- # # --- 获取观察信息 --- #
@@ -255,6 +413,26 @@ class SubHeartflow:
logger.warning(f"SubHeartflow {self.subheartflow_id} 没有找到有效的 ChattingObservation") logger.warning(f"SubHeartflow {self.subheartflow_id} 没有找到有效的 ChattingObservation")
return None return None
def get_interest_state(self) -> dict:
"""获取当前兴趣状态"""
return self.interest_chatting.get_state()
def get_interest_level(self) -> float:
"""获取当前兴趣等级"""
return self.interest_chatting.get_interest()
def should_evaluate_reply(self) -> bool:
"""判断是否应该评估回复"""
return self.interest_chatting.should_evaluate_reply()
def add_interest_dict_entry(self, message: MessageRecv, interest_value: float, is_mentioned: bool):
"""添加兴趣字典条目"""
self.interest_chatting.add_interest_dict(message, interest_value, is_mentioned)
def get_interest_dict(self) -> Dict[str, tuple[MessageRecv, float, bool]]:
"""获取兴趣字典"""
return self.interest_chatting.interest_dict
init_prompt() init_prompt()
# subheartflow = SubHeartflow() # subheartflow = SubHeartflow()

View File

@@ -17,7 +17,6 @@ from .common.logger import get_module_logger
from .plugins.remote import heartbeat_thread # noqa: F401 from .plugins.remote import heartbeat_thread # noqa: F401
from .individuality.individuality import Individuality from .individuality.individuality import Individuality
from .common.server import global_server from .common.server import global_server
from .plugins.chat_module.heartFC_chat.interest import InterestManager
from .plugins.chat_module.heartFC_chat.heartFC_controler import HeartFCController from .plugins.chat_module.heartFC_chat.heartFC_controler import HeartFCController
logger = get_module_logger("main") logger = get_module_logger("main")
@@ -112,11 +111,6 @@ class MainSystem:
asyncio.create_task(heartflow.heartflow_start_working()) asyncio.create_task(heartflow.heartflow_start_working())
logger.success("心流系统启动成功") logger.success("心流系统启动成功")
# 启动 InterestManager 的后台任务
interest_manager = InterestManager() # 获取单例
await interest_manager.start_background_tasks()
logger.success("兴趣管理器后台任务启动成功")
# 初始化并独立启动 HeartFCController # 初始化并独立启动 HeartFCController
HeartFCController() HeartFCController()
heartfc_chat_instance = HeartFCController.get_instance() heartfc_chat_instance = HeartFCController.get_instance()

View File

@@ -27,7 +27,6 @@ class ChatBot:
self.bot = None # bot 实例引用 self.bot = None # bot 实例引用
self._started = False self._started = False
self.mood_manager = MoodManager.get_instance() # 获取情绪管理器单例 self.mood_manager = MoodManager.get_instance() # 获取情绪管理器单例
self.mood_manager.start_mood_update() # 启动情绪更新
self.reasoning_chat = ReasoningChat() self.reasoning_chat = ReasoningChat()
self.heartFC_processor = HeartFCProcessor() # 新增 self.heartFC_processor = HeartFCProcessor() # 新增

View File

@@ -7,10 +7,10 @@ from ...chat.emoji_manager import emoji_manager
from .heartFC_generator import ResponseGenerator from .heartFC_generator import ResponseGenerator
from .messagesender import MessageManager from .messagesender import MessageManager
from src.heart_flow.heartflow import heartflow from src.heart_flow.heartflow import heartflow
from src.heart_flow.sub_heartflow import SubHeartflow, ChatState
from src.common.logger import get_module_logger, CHAT_STYLE_CONFIG, LogConfig from src.common.logger import get_module_logger, CHAT_STYLE_CONFIG, LogConfig
from src.plugins.person_info.relationship_manager import relationship_manager from src.plugins.person_info.relationship_manager import relationship_manager
from src.do_tool.tool_use import ToolUser from src.do_tool.tool_use import ToolUser
from .interest import InterestManager
from src.plugins.chat.chat_stream import chat_manager from src.plugins.chat.chat_stream import chat_manager
from .pf_chatting import PFChatting from .pf_chatting import PFChatting
@@ -46,34 +46,22 @@ class HeartFCController:
# 使用 _initialized 标志确保 __init__ 只执行一次 # 使用 _initialized 标志确保 __init__ 只执行一次
if self._initialized: if self._initialized:
return return
# 虽然 __new__ 保证了只有一个实例,但为了防止意外重入或多线程下的初始化竞争,
# 再次使用类锁保护初始化过程是更严谨的做法。
# 如果确定 __init__ 逻辑本身是幂等的或非关键的,可以省略这里的锁。
# 但为了保持原始逻辑的意图(防止重复初始化),这里保留检查。
with self.__class__._lock: # 确保初始化逻辑线程安全
if self._initialized: # 再次检查,防止锁等待期间其他线程已完成初始化
return
logger.info("正在初始化 HeartFCController 单例...") self.gpt = ResponseGenerator()
self.gpt = ResponseGenerator() self.mood_manager = MoodManager.get_instance()
self.mood_manager = MoodManager.get_instance() self.tool_user = ToolUser()
# 注意mood_manager 的 start_mood_update 可能需要在应用主循环启动后调用, self._interest_monitor_task: Optional[asyncio.Task] = None
# 或者确保其内部实现是安全的。这里保持原状。
self.mood_manager.start_mood_update() self.heartflow = heartflow
self.tool_user = ToolUser()
# 注意InterestManager() 可能是另一个单例或需要特定初始化。 self.pf_chatting_instances: Dict[str, PFChatting] = {}
# 假设 InterestManager() 返回的是正确配置的实例。 self._pf_chatting_lock = asyncio.Lock() # 这个是 asyncio.Lock用于异步上下文
self.interest_manager = InterestManager() self.emoji_manager = emoji_manager # 假设是全局或已初始化的实例
self._interest_monitor_task: Optional[asyncio.Task] = None self.relationship_manager = relationship_manager # 假设是全局或已初始化的实例
self.pf_chatting_instances: Dict[str, PFChatting] = {}
# _pf_chatting_lock 用于保护 pf_chatting_instances 的异步操作 self.MessageManager = MessageManager
self._pf_chatting_lock = asyncio.Lock() # 这个是 asyncio.Lock用于异步上下文 self._initialized = True
self.emoji_manager = emoji_manager # 假设是全局或已初始化的实例 logger.info("HeartFCController 单例初始化完成。")
self.relationship_manager = relationship_manager # 假设是全局或已初始化的实例
# MessageManager 可能是类本身或单例实例,根据其设计确定
self.MessageManager = MessageManager
self._initialized = True
logger.info("HeartFCController 单例初始化完成。")
@classmethod @classmethod
def get_instance(cls): def get_instance(cls):
@@ -114,7 +102,7 @@ class HeartFCController:
if self._interest_monitor_task is None or self._interest_monitor_task.done(): if self._interest_monitor_task is None or self._interest_monitor_task.done():
try: try:
loop = asyncio.get_running_loop() loop = asyncio.get_running_loop()
self._interest_monitor_task = loop.create_task(self._interest_monitor_loop()) self._interest_monitor_task = loop.create_task(self._response_control_loop())
except RuntimeError: except RuntimeError:
logger.error("创建兴趣监控任务失败:没有运行中的事件循环。") logger.error("创建兴趣监控任务失败:没有运行中的事件循环。")
raise raise
@@ -138,41 +126,41 @@ class HeartFCController:
# --- End Added PFChatting Instance Manager --- # --- End Added PFChatting Instance Manager ---
async def _interest_monitor_loop(self): # async def update_mai_Status(self):
# """后台任务,定期检查更新麦麦状态"""
# logger.info("麦麦状态更新循环开始...")
# while True:
# await asyncio.sleep(0)
# self.heartflow.update_chat_status()
async def _response_control_loop(self):
"""后台任务,定期检查兴趣度变化并触发回复""" """后台任务,定期检查兴趣度变化并触发回复"""
logger.info("兴趣监控循环开始...") logger.info("兴趣监控循环开始...")
while True: while True:
await asyncio.sleep(INTEREST_MONITOR_INTERVAL_SECONDS) await asyncio.sleep(INTEREST_MONITOR_INTERVAL_SECONDS)
try: try:
# 从心流中获取活跃流 # 从心流中获取活跃流
active_stream_ids = list(heartflow.get_all_subheartflows_streams_ids()) active_stream_ids = list(self.heartflow.get_all_subheartflows_streams_ids())
for stream_id in active_stream_ids: for stream_id in active_stream_ids:
stream_name = chat_manager.get_stream_name(stream_id) or stream_id # 获取流名称 stream_name = chat_manager.get_stream_name(stream_id) or stream_id # 获取流名称
sub_hf = heartflow.get_subheartflow(stream_id) sub_hf = self.heartflow.get_subheartflow(stream_id)
if not sub_hf: if not sub_hf:
logger.warning(f"监控循环: 无法获取活跃流 {stream_name} 的 sub_hf") logger.warning(f"监控循环: 无法获取活跃流 {stream_name} 的 sub_hf")
continue continue
should_trigger = False should_trigger_hfc = False
try: try:
interest_chatting = self.interest_manager.get_interest_chatting(stream_id) interest_chatting = sub_hf.interest_chatting
if interest_chatting: should_trigger_hfc = interest_chatting.should_evaluate_reply()
should_trigger = interest_chatting.should_evaluate_reply()
else:
logger.trace(
f"[{stream_name}] 没有找到对应的 InterestChatting 实例,跳过基于兴趣的触发检查。"
)
except Exception as e: except Exception as e:
logger.error(f"检查兴趣触发器时出错 流 {stream_name}: {e}") logger.error(f"检查兴趣触发器时出错 流 {stream_name}: {e}")
logger.error(traceback.format_exc()) logger.error(traceback.format_exc())
if should_trigger: if should_trigger_hfc:
# 启动一次麦麦聊天 # 启动一次麦麦聊天
pf_instance = await self._get_or_create_pf_chatting(stream_id) await self._trigger_hfc(sub_hf)
if pf_instance:
asyncio.create_task(pf_instance.add_time())
else:
logger.error(f"[{stream_name}] 无法获取或创建PFChatting实例。跳过触发。")
except asyncio.CancelledError: except asyncio.CancelledError:
logger.info("兴趣监控循环已取消。") logger.info("兴趣监控循环已取消。")
@@ -181,3 +169,17 @@ class HeartFCController:
logger.error(f"兴趣监控循环错误: {e}") logger.error(f"兴趣监控循环错误: {e}")
logger.error(traceback.format_exc()) logger.error(traceback.format_exc())
await asyncio.sleep(5) # 发生错误时等待 await asyncio.sleep(5) # 发生错误时等待
async def _trigger_hfc(self, sub_hf: SubHeartflow):
chat_state = sub_hf.chat_state
if chat_state == ChatState.ABSENT:
chat_state = ChatState.CHAT
elif chat_state == ChatState.CHAT:
chat_state = ChatState.FOCUSED
# 从 sub_hf 获取 stream_id
if chat_state == ChatState.FOCUSED:
stream_id = sub_hf.subheartflow_id
pf_instance = await self._get_or_create_pf_chatting(stream_id)
if pf_instance: # 确保实例成功获取或创建
asyncio.create_task(pf_instance.add_time())

View File

@@ -11,7 +11,6 @@ from src.common.logger import get_module_logger, CHAT_STYLE_CONFIG, LogConfig
from ...chat.chat_stream import chat_manager from ...chat.chat_stream import chat_manager
from ...chat.message_buffer import message_buffer from ...chat.message_buffer import message_buffer
from ...utils.timer_calculater import Timer from ...utils.timer_calculater import Timer
from .interest import InterestManager
from src.plugins.person_info.relationship_manager import relationship_manager from src.plugins.person_info.relationship_manager import relationship_manager
from .reasoning_chat import ReasoningChat from .reasoning_chat import ReasoningChat
@@ -22,14 +21,10 @@ processor_config = LogConfig(
) )
logger = get_module_logger("heartFC_processor", config=processor_config) logger = get_module_logger("heartFC_processor", config=processor_config)
# # 定义兴趣度增加触发回复的阈值 (移至 InterestManager)
# INTEREST_INCREASE_THRESHOLD = 0.5
class HeartFCProcessor: class HeartFCProcessor:
def __init__(self): def __init__(self):
self.storage = MessageStorage() self.storage = MessageStorage()
self.interest_manager = InterestManager()
self.reasoning_chat = ReasoningChat.get_instance() self.reasoning_chat = ReasoningChat.get_instance()
async def process_message(self, message_data: str) -> None: async def process_message(self, message_data: str) -> None:
@@ -74,9 +69,15 @@ class HeartFCProcessor:
group_info=groupinfo, group_info=groupinfo,
) )
# --- 添加兴趣追踪启动 --- # --- 确保 SubHeartflow 存在 ---
# 在获取到 chat 对象后,启动对该聊天流的兴趣监控 subheartflow = await heartflow.create_subheartflow(chat.stream_id)
await self.reasoning_chat.start_monitoring_interest(chat) if not subheartflow:
logger.error(f"无法为 stream_id {chat.stream_id} 创建或获取 SubHeartflow中止处理")
return
# --- 添加兴趣追踪启动 (现在移动到这里,确保 subheartflow 存在后启动) ---
# 在获取到 chat 对象和确认 subheartflow 后,启动对该聊天流的兴趣监控
await self.reasoning_chat.start_monitoring_interest(chat) # start_monitoring_interest 内部需要修改以适应
# --- 结束添加 --- # --- 结束添加 ---
message.update_chat_stream(chat) message.update_chat_stream(chat)
@@ -141,23 +142,35 @@ class HeartFCProcessor:
logger.error(f"计算记忆激活率失败: {e}") logger.error(f"计算记忆激活率失败: {e}")
logger.error(traceback.format_exc()) logger.error(traceback.format_exc())
# --- 修改:兴趣度更新逻辑 --- #
if is_mentioned: if is_mentioned:
interested_rate += 0.8 interest_increase_on_mention = 2
mentioned_boost = interest_increase_on_mention # 从配置获取提及增加值
interested_rate += mentioned_boost
logger.trace(f"消息提及机器人,额外增加兴趣 {mentioned_boost:.2f}")
# 更新兴趣度 # 更新兴趣度 (调用 SubHeartflow 的方法)
current_interest = 0.0 # 初始化
try: try:
self.interest_manager.increase_interest(chat.stream_id, value=interested_rate) # 获取当前时间,传递给 increase_interest
current_interest = self.interest_manager.get_interest(chat.stream_id) # 获取更新后的值用于日志 current_time = time.time()
subheartflow.interest_chatting.increase_interest(current_time, value=interested_rate)
current_interest = subheartflow.get_interest_level() # 获取更新后的值
logger.trace( logger.trace(
f"使用激活率 {interested_rate:.2f} 更新后 (通过缓冲后),当前兴趣度: {current_interest:.2f}" f"使用激活率 {interested_rate:.2f} 更新后 (通过缓冲后),当前兴趣度: {current_interest:.2f} (Stream: {chat.stream_id})"
) )
self.interest_manager.add_interest_dict(message, interested_rate, is_mentioned) # 添加到 SubHeartflow 的 interest_dict
subheartflow.add_interest_dict_entry(message, interested_rate, is_mentioned)
logger.trace(
f"Message {message.message_info.message_id} added to interest dict for stream {chat.stream_id}"
)
except Exception as e: except Exception as e:
logger.error(f"更新兴趣度失败: {e}") # 调整日志消息 logger.error(f"更新兴趣度失败 (Stream: {chat.stream_id}): {e}")
logger.error(traceback.format_exc()) logger.error(traceback.format_exc())
# ---- 兴趣度计算和更新结束 ---- # --- 结束修改 --- #
# 打印消息接收和处理信息 # 打印消息接收和处理信息
mes_name = chat.group_info.group_name if chat.group_info else "私聊" mes_name = chat.group_info.group_name if chat.group_info else "私聊"

View File

@@ -1,503 +0,0 @@
import time
import math
import asyncio
import threading
import json # 引入 json
import os # 引入 os
from typing import Optional # <--- 添加导入
import random # <--- 添加导入 random
from src.plugins.chat.message import MessageRecv
from src.common.logger import get_module_logger, LogConfig, DEFAULT_CONFIG # 引入 DEFAULT_CONFIG
from src.plugins.chat.chat_stream import chat_manager # *** Import ChatManager ***
# 定义日志配置 (使用 loguru 格式)
interest_log_config = LogConfig(
console_format=DEFAULT_CONFIG["console_format"], # 使用默认控制台格式
file_format=DEFAULT_CONFIG["file_format"], # 使用默认文件格式
)
logger = get_module_logger("InterestManager", config=interest_log_config)
# 定义常量
DEFAULT_DECAY_RATE_PER_SECOND = 0.98 # 每秒衰减率 (兴趣保留 99%)
MAX_INTEREST = 15.0 # 最大兴趣值
# MIN_INTEREST_THRESHOLD = 0.1 # 低于此值可能被清理 (可选)
CLEANUP_INTERVAL_SECONDS = 1200 # 清理任务运行间隔 (例如20分钟)
INACTIVE_THRESHOLD_SECONDS = 1200 # 不活跃时间阈值 (例如20分钟)
LOG_INTERVAL_SECONDS = 3 # 日志记录间隔 (例如30秒)
LOG_DIRECTORY = "logs/interest" # 日志目录
# LOG_FILENAME = "interest_log.json" # 快照日志文件名 (保留,以防其他地方用到)
HISTORY_LOG_FILENAME = "interest_history.log" # 新的历史日志文件名
# 移除阈值,将移至 HeartFC_Chat
# INTEREST_INCREASE_THRESHOLD = 0.5
# --- 新增:概率回复相关常量 ---
REPLY_TRIGGER_THRESHOLD = 3.0 # 触发概率回复的兴趣阈值 (示例值)
BASE_REPLY_PROBABILITY = 0.1 # 首次超过阈值时的基础回复概率 (示例值)
PROBABILITY_INCREASE_RATE_PER_SECOND = 0.02 # 高于阈值时,每秒概率增加量 (线性增长, 示例值)
PROBABILITY_DECAY_FACTOR_PER_SECOND = 0.2 # 低于阈值时,每秒概率衰减因子 (指数衰减, 示例值)
MAX_REPLY_PROBABILITY = 1 # 回复概率上限 (示例值)
# --- 结束:概率回复相关常量 ---
class InterestChatting:
def __init__(
self,
decay_rate=DEFAULT_DECAY_RATE_PER_SECOND,
max_interest=MAX_INTEREST,
trigger_threshold=REPLY_TRIGGER_THRESHOLD,
base_reply_probability=BASE_REPLY_PROBABILITY,
increase_rate=PROBABILITY_INCREASE_RATE_PER_SECOND,
decay_factor=PROBABILITY_DECAY_FACTOR_PER_SECOND,
max_probability=MAX_REPLY_PROBABILITY,
):
self.interest_level: float = 0.0
self.last_update_time: float = time.time() # 同时作为兴趣和概率的更新时间基准
self.decay_rate_per_second: float = decay_rate
self.max_interest: float = max_interest
self.last_interaction_time: float = self.last_update_time # 新增:最后交互时间
# --- 新增:概率回复相关属性 ---
self.trigger_threshold: float = trigger_threshold
self.base_reply_probability: float = base_reply_probability
self.probability_increase_rate: float = increase_rate
self.probability_decay_factor: float = decay_factor
self.max_reply_probability: float = max_probability
self.current_reply_probability: float = 0.0
self.is_above_threshold: bool = False # 标记兴趣值是否高于阈值
# --- 结束:概率回复相关属性 ---
# 记录激发兴趣对(消息id,激活值)
self.interest_dict = {}
def add_interest_dict(self, message: MessageRecv, interest_value: float, is_mentioned: bool):
# Store the MessageRecv object and the interest value as a tuple
self.interest_dict[message.message_info.message_id] = (message, interest_value, is_mentioned)
def _calculate_decay(self, current_time: float):
"""计算从上次更新到现在的衰减"""
time_delta = current_time - self.last_update_time
if time_delta > 0:
# 指数衰减: interest = interest * (decay_rate ^ time_delta)
# 添加处理极小兴趣值避免 math domain error
old_interest = self.interest_level
if self.interest_level < 1e-9:
self.interest_level = 0.0
else:
# 检查 decay_rate_per_second 是否为非正数,避免 math domain error
if self.decay_rate_per_second <= 0:
logger.warning(
f"InterestChatting encountered non-positive decay rate: {self.decay_rate_per_second}. Setting interest to 0."
)
self.interest_level = 0.0
# 检查 interest_level 是否为负数,虽然理论上不应发生,但以防万一
elif self.interest_level < 0:
logger.warning(
f"InterestChatting encountered negative interest level: {self.interest_level}. Setting interest to 0."
)
self.interest_level = 0.0
else:
try:
decay_factor = math.pow(self.decay_rate_per_second, time_delta)
self.interest_level *= decay_factor
except ValueError as e:
# 捕获潜在的 math domain error例如对负数开非整数次方虽然已加保护
logger.error(
f"Math error during decay calculation: {e}. Rate: {self.decay_rate_per_second}, Delta: {time_delta}, Level: {self.interest_level}. Setting interest to 0."
)
self.interest_level = 0.0
# 防止低于阈值 (如果需要)
# self.interest_level = max(self.interest_level, MIN_INTEREST_THRESHOLD)
# 只有在兴趣值发生变化时才更新时间戳
if old_interest != self.interest_level:
self.last_update_time = current_time
def _update_reply_probability(self, current_time: float):
"""根据当前兴趣是否超过阈值及时间差,更新回复概率"""
time_delta = current_time - self.last_update_time
if time_delta <= 0:
return # 时间未前进,无需更新
currently_above = self.interest_level >= self.trigger_threshold
if currently_above:
if not self.is_above_threshold:
# 刚跨过阈值,重置为基础概率
self.current_reply_probability = self.base_reply_probability
logger.debug(
f"兴趣跨过阈值 ({self.trigger_threshold}). 概率重置为基础值: {self.base_reply_probability:.4f}"
)
else:
# 持续高于阈值,线性增加概率
increase_amount = self.probability_increase_rate * time_delta
self.current_reply_probability += increase_amount
# logger.debug(f"兴趣高于阈值 ({self.trigger_threshold}) 持续 {time_delta:.2f}秒. 概率增加 {increase_amount:.4f} 到 {self.current_reply_probability:.4f}")
# 限制概率不超过最大值
self.current_reply_probability = min(self.current_reply_probability, self.max_reply_probability)
else:
if 0 < self.probability_decay_factor < 1:
decay_multiplier = math.pow(self.probability_decay_factor, time_delta)
# old_prob = self.current_reply_probability
self.current_reply_probability *= decay_multiplier
# 避免因浮点数精度问题导致概率略微大于0直接设为0
if self.current_reply_probability < 1e-6:
self.current_reply_probability = 0.0
# logger.debug(f"兴趣低于阈值 ({self.trigger_threshold}) 持续 {time_delta:.2f}秒. 概率从 {old_prob:.4f} 衰减到 {self.current_reply_probability:.4f} (因子: {self.probability_decay_factor})")
elif self.probability_decay_factor <= 0:
# 如果衰减因子无效或为0直接清零
if self.current_reply_probability > 0:
logger.warning(f"无效的衰减因子 ({self.probability_decay_factor}). 设置概率为0.")
self.current_reply_probability = 0.0
# else: decay_factor >= 1, probability will not decay or increase, which might be intended in some cases.
# 确保概率不低于0
self.current_reply_probability = max(self.current_reply_probability, 0.0)
# 更新状态标记
self.is_above_threshold = currently_above
# 更新时间戳放在调用者处,确保 interest 和 probability 基于同一点更新
def increase_interest(self, current_time: float, value: float):
"""根据传入的值增加兴趣值,并记录增加量"""
# 先更新概率和计算衰减(基于上次更新时间)
self._update_reply_probability(current_time)
self._calculate_decay(current_time)
# 应用增加
self.interest_level += value
self.interest_level = min(self.interest_level, self.max_interest) # 不超过最大值
self.last_update_time = current_time # 更新时间戳
self.last_interaction_time = current_time # 更新最后交互时间
def decrease_interest(self, current_time: float, value: float):
"""降低兴趣值并更新时间 (确保不低于0)"""
# 先更新概率(基于上次更新时间)
self._update_reply_probability(current_time)
# 注意:降低兴趣度是否需要先衰减?取决于具体逻辑,这里假设不衰减直接减
self.interest_level -= value
self.interest_level = max(self.interest_level, 0.0) # 确保不低于0
self.last_update_time = current_time # 降低也更新时间戳
self.last_interaction_time = current_time # 更新最后交互时间
def get_interest(self) -> float:
"""获取当前兴趣值 (计算衰减后)"""
# 注意:这个方法现在会触发概率和兴趣的更新
current_time = time.time()
self._update_reply_probability(current_time)
self._calculate_decay(current_time)
self.last_update_time = current_time # 更新时间戳
return self.interest_level
def get_state(self) -> dict:
"""获取当前状态字典"""
# 调用 get_interest 来确保状态已更新
interest = self.get_interest()
return {
"interest_level": round(interest, 2),
"last_update_time": self.last_update_time,
"current_reply_probability": round(self.current_reply_probability, 4), # 添加概率到状态
"is_above_threshold": self.is_above_threshold, # 添加阈值状态
"last_interaction_time": self.last_interaction_time, # 新增:添加最后交互时间到状态
# 可以选择性地暴露 last_increase_amount 给状态,方便调试
# "last_increase_amount": round(self.last_increase_amount, 2)
}
def should_evaluate_reply(self) -> bool:
"""
判断是否应该触发一次回复评估。
首先更新概率状态,然后根据当前概率进行随机判断。
"""
current_time = time.time()
# 确保概率是基于最新兴趣值计算的
self._update_reply_probability(current_time)
# 更新兴趣衰减(如果需要,取决于逻辑,这里保持和 get_interest 一致)
# self._calculate_decay(current_time)
# self.last_update_time = current_time # 更新时间戳
if self.current_reply_probability > 0:
# 只有在阈值之上且概率大于0时才有可能触发
trigger = random.random() < self.current_reply_probability
# if trigger:
# logger.info(f"回复概率评估触发! 概率: {self.current_reply_probability:.4f}, 阈值: {self.trigger_threshold}, 兴趣: {self.interest_level:.2f}")
# # 可选:触发后是否重置/降低概率?根据需要决定
# # self.current_reply_probability = self.base_reply_probability # 例如,触发后降回基础概率
# # self.current_reply_probability *= 0.5 # 例如,触发后概率减半
# else:
# logger.debug(f"回复概率评估未触发。概率: {self.current_reply_probability:.4f}")
return trigger
else:
# logger.debug(f"Reply evaluation check: Below threshold or zero probability. Probability: {self.current_reply_probability:.4f}")
return False
class InterestManager:
_instance = None
_lock = threading.Lock()
_initialized = False
def __new__(cls, *args, **kwargs):
if cls._instance is None:
with cls._lock:
# Double-check locking
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance
def __init__(self):
if not self._initialized:
with self._lock:
# 确保初始化也只执行一次
if not self._initialized:
logger.info("Initializing InterestManager singleton...")
# key: stream_id (str), value: InterestChatting instance
self.interest_dict: dict[str, InterestChatting] = {}
# 保留旧的快照文件路径变量,尽管此任务不再写入
# self._snapshot_log_file_path = os.path.join(LOG_DIRECTORY, LOG_FILENAME)
# 定义新的历史日志文件路径
self._history_log_file_path = os.path.join(LOG_DIRECTORY, HISTORY_LOG_FILENAME)
self._ensure_log_directory()
self._cleanup_task = None
self._logging_task = None # 添加日志任务变量
self._initialized = True
logger.info("InterestManager initialized.") # 修改日志消息
self._decay_task = None # 新增:衰减任务变量
def _ensure_log_directory(self):
"""确保日志目录存在"""
try:
os.makedirs(LOG_DIRECTORY, exist_ok=True)
logger.info(f"Log directory '{LOG_DIRECTORY}' ensured.")
except OSError as e:
logger.error(f"Error creating log directory '{LOG_DIRECTORY}': {e}")
async def _periodic_cleanup_task(self, interval_seconds: int, max_age_seconds: int):
"""后台清理任务的异步函数"""
while True:
await asyncio.sleep(interval_seconds)
logger.info(f"运行定期清理 (间隔: {interval_seconds}秒)...")
self.cleanup_inactive_chats(max_age_seconds=max_age_seconds)
async def _periodic_log_task(self, interval_seconds: int):
"""后台日志记录任务的异步函数 (记录历史数据,包含 group_name)"""
while True:
await asyncio.sleep(interval_seconds)
# logger.debug(f"运行定期历史记录 (间隔: {interval_seconds}秒)...")
try:
current_timestamp = time.time()
all_states = self.get_all_interest_states() # 获取当前所有状态
# 以追加模式打开历史日志文件
with open(self._history_log_file_path, "a", encoding="utf-8") as f:
count = 0
for stream_id, state in all_states.items():
# *** Get group name from ChatManager ***
group_name = stream_id # Default to stream_id
try:
# Use the imported chat_manager instance
chat_stream = chat_manager.get_stream(stream_id)
if chat_stream and chat_stream.group_info:
group_name = chat_stream.group_info.group_name
elif chat_stream and not chat_stream.group_info:
# Handle private chats - maybe use user nickname?
group_name = (
f"私聊_{chat_stream.user_info.user_nickname}"
if chat_stream.user_info
else stream_id
)
except Exception as e:
logger.warning(f"Could not get group name for stream_id {stream_id}: {e}")
# Fallback to stream_id is already handled by default value
log_entry = {
"timestamp": round(current_timestamp, 2),
"stream_id": stream_id,
"interest_level": state.get("interest_level", 0.0), # 确保有默认值
"group_name": group_name, # *** Add group_name ***
# --- 新增:记录概率相关信息 ---
"reply_probability": state.get("current_reply_probability", 0.0),
"is_above_threshold": state.get("is_above_threshold", False),
# --- 结束新增 ---
}
# 将每个条目作为单独的 JSON 行写入
f.write(json.dumps(log_entry, ensure_ascii=False) + "\n")
count += 1
# logger.debug(f"Successfully appended {count} interest history entries to {self._history_log_file_path}")
# 注意:不再写入快照文件 interest_log.json
# 如果需要快照文件,可以在这里单独写入 self._snapshot_log_file_path
# 例如:
# with open(self._snapshot_log_file_path, 'w', encoding='utf-8') as snap_f:
# json.dump(all_states, snap_f, indent=4, ensure_ascii=False)
# logger.debug(f"Successfully wrote snapshot to {self._snapshot_log_file_path}")
except IOError as e:
logger.error(f"Error writing interest history log to {self._history_log_file_path}: {e}")
except Exception as e:
logger.error(f"Unexpected error during periodic history logging: {e}")
async def _periodic_decay_task(self):
"""后台衰减任务的异步函数,每秒更新一次所有实例的衰减"""
while True:
await asyncio.sleep(1) # 每秒运行一次
current_time = time.time()
# logger.debug("Running periodic decay calculation...") # 调试日志,可能过于频繁
# 创建字典项的快照进行迭代,避免在迭代时修改字典的问题
items_snapshot = list(self.interest_dict.items())
count = 0
for stream_id, chatting in items_snapshot:
try:
# 调用 InterestChatting 实例的衰减方法
chatting._calculate_decay(current_time)
count += 1
except Exception as e:
logger.error(f"Error calculating decay for stream_id {stream_id}: {e}")
# if count > 0: # 仅在实际处理了项目时记录日志,避免空闲时刷屏
# logger.debug(f"Applied decay to {count} streams.")
async def start_background_tasks(self):
"""启动清理,启动衰减,启动记录,启动启动启动启动启动"""
if self._cleanup_task is None or self._cleanup_task.done():
self._cleanup_task = asyncio.create_task(
self._periodic_cleanup_task(
interval_seconds=CLEANUP_INTERVAL_SECONDS, max_age_seconds=INACTIVE_THRESHOLD_SECONDS
)
)
logger.info(
f"已创建定期清理任务。间隔时间: {CLEANUP_INTERVAL_SECONDS}秒, 不活跃阈值: {INACTIVE_THRESHOLD_SECONDS}"
)
else:
logger.warning("跳过创建清理任务:任务已在运行或存在。")
if self._logging_task is None or self._logging_task.done():
self._logging_task = asyncio.create_task(self._periodic_log_task(interval_seconds=LOG_INTERVAL_SECONDS))
logger.info(f"已创建定期日志任务。间隔时间: {LOG_INTERVAL_SECONDS}")
else:
logger.warning("跳过创建日志任务:任务已在运行或存在。")
# 启动新的衰减任务
if self._decay_task is None or self._decay_task.done():
self._decay_task = asyncio.create_task(self._periodic_decay_task())
logger.info("已创建定期衰减任务。间隔时间: 1秒")
else:
logger.warning("跳过创建衰减任务:任务已在运行或存在。")
def get_all_interest_states(self) -> dict[str, dict]:
"""获取所有聊天流的当前兴趣状态"""
# 不再需要 current_time, 因为 get_state 现在不接收它
states = {}
# 创建副本以避免在迭代时修改字典
items_snapshot = list(self.interest_dict.items())
for stream_id, chatting in items_snapshot:
try:
# 直接调用 get_state它会使用内部的 get_interest 获取已更新的值
states[stream_id] = chatting.get_state()
except Exception as e:
logger.warning(f"Error getting state for stream_id {stream_id}: {e}")
return states
def get_interest_chatting(self, stream_id: str) -> Optional[InterestChatting]:
"""获取指定流的 InterestChatting 实例,如果不存在则返回 None"""
return self.interest_dict.get(stream_id)
def _get_or_create_interest_chatting(self, stream_id: str) -> InterestChatting:
"""获取或创建指定流的 InterestChatting 实例 (线程安全)"""
if stream_id not in self.interest_dict:
logger.debug(f"创建兴趣流: {stream_id}")
# --- 修改:创建时传入概率相关参数 (如果需要定制化,否则使用默认值) ---
self.interest_dict[stream_id] = InterestChatting(
# decay_rate=..., max_interest=..., # 可以从配置读取
trigger_threshold=REPLY_TRIGGER_THRESHOLD, # 使用全局常量
base_reply_probability=BASE_REPLY_PROBABILITY,
increase_rate=PROBABILITY_INCREASE_RATE_PER_SECOND,
decay_factor=PROBABILITY_DECAY_FACTOR_PER_SECOND,
max_probability=MAX_REPLY_PROBABILITY,
)
# --- 结束修改 ---
# 首次创建时兴趣为 0由第一次消息的 activate rate 决定初始值
return self.interest_dict[stream_id]
def get_interest(self, stream_id: str) -> float:
"""获取指定聊天流当前的兴趣度 (值由后台任务更新)"""
# current_time = time.time() # 不再需要获取当前时间
interest_chatting = self._get_or_create_interest_chatting(stream_id)
# 直接调用修改后的 get_interest不传入时间
return interest_chatting.get_interest()
def increase_interest(self, stream_id: str, value: float):
"""当收到消息时,增加指定聊天流的兴趣度"""
current_time = time.time()
interest_chatting = self._get_or_create_interest_chatting(stream_id)
# 调用修改后的 increase_interest不再传入 message
interest_chatting.increase_interest(current_time, value)
stream_name = chat_manager.get_stream_name(stream_id) or stream_id # 获取流名称
logger.debug(
f"增加了聊天流 {stream_name} 的兴趣度 {value:.2f},当前值为 {interest_chatting.interest_level:.2f}"
) # 更新日志
def decrease_interest(self, stream_id: str, value: float):
"""降低指定聊天流的兴趣度"""
current_time = time.time()
# 尝试获取,如果不存在则不做任何事
interest_chatting = self.get_interest_chatting(stream_id)
if interest_chatting:
interest_chatting.decrease_interest(current_time, value)
stream_name = chat_manager.get_stream_name(stream_id) or stream_id # 获取流名称
logger.debug(
f"降低了聊天流 {stream_name} 的兴趣度 {value:.2f},当前值为 {interest_chatting.interest_level:.2f}"
)
else:
stream_name = chat_manager.get_stream_name(stream_id) or stream_id # 获取流名称
logger.warning(f"尝试降低不存在的聊天流 {stream_name} 的兴趣度")
def add_interest_dict(self, message: MessageRecv, interest_value: float, is_mentioned: bool):
interest_chatting = self._get_or_create_interest_chatting(message.chat_stream.stream_id)
interest_chatting.add_interest_dict(message, interest_value, is_mentioned)
def cleanup_inactive_chats(self, max_age_seconds=INACTIVE_THRESHOLD_SECONDS):
"""
清理长时间不活跃的聊天流记录
max_age_seconds: 超过此时间未更新的将被清理
"""
current_time = time.time()
keys_to_remove = []
initial_count = len(self.interest_dict)
# with self._lock: # 如果需要锁整个迭代过程
# 创建副本以避免在迭代时修改字典
items_snapshot = list(self.interest_dict.items())
for stream_id, chatting in items_snapshot:
# 先计算当前兴趣,确保是最新的
# 加锁保护 chatting 对象状态的读取和可能的修改
# with self._lock: # 如果 InterestChatting 内部操作不是原子的
last_interaction = chatting.last_interaction_time # 使用最后交互时间
should_remove = False
reason = ""
# 只有设置了 max_age_seconds 才检查时间
if (
max_age_seconds is not None and (current_time - last_interaction) > max_age_seconds
): # 使用 last_interaction
should_remove = True
reason = f"inactive time ({current_time - last_interaction:.0f}s) > max age ({max_age_seconds}s)" # 更新日志信息
if should_remove:
keys_to_remove.append(stream_id)
stream_name = chat_manager.get_stream_name(stream_id) or stream_id # 获取流名称
logger.debug(f"Marking stream {stream_name} for removal. Reason: {reason}")
if keys_to_remove:
logger.info(f"清理识别到 {len(keys_to_remove)} 个不活跃/低兴趣的流。")
# with self._lock: # 确保删除操作的原子性
for key in keys_to_remove:
# 再次检查 key 是否存在,以防万一在迭代和删除之间状态改变
if key in self.interest_dict:
del self.interest_dict[key]
stream_name = chat_manager.get_stream_name(key) or key # 获取流名称
logger.debug(f"移除了流: {stream_name}")
final_count = initial_count - len(keys_to_remove)
logger.info(f"清理完成。移除了 {len(keys_to_remove)} 个流。当前数量: {final_count}")
else:
logger.info(f"清理完成。没有流符合移除条件。当前数量: {initial_count}")

View File

@@ -753,7 +753,7 @@ class PFChatting:
# --- Generate Response with LLM --- # # --- Generate Response with LLM --- #
# Access gpt instance via controller # Access gpt instance via controller
gpt_instance = self.heartfc_controller.gpt gpt_instance = self.heartfc_controller.gpt
logger.debug(f"{log_prefix}[Replier-{thinking_id}] Calling LLM to generate response...") # logger.debug(f"{log_prefix}[Replier-{thinking_id}] Calling LLM to generate response...")
# Ensure generate_response has access to current_mind if it's crucial context # Ensure generate_response has access to current_mind if it's crucial context
response_set = await gpt_instance.generate_response( response_set = await gpt_instance.generate_response(

View File

@@ -20,8 +20,8 @@ from src.plugins.chat.chat_stream import ChatStream
from src.plugins.person_info.relationship_manager import relationship_manager from src.plugins.person_info.relationship_manager import relationship_manager
from src.plugins.respon_info_catcher.info_catcher import info_catcher_manager from src.plugins.respon_info_catcher.info_catcher import info_catcher_manager
from src.plugins.utils.timer_calculater import Timer from src.plugins.utils.timer_calculater import Timer
from .interest import InterestManager from src.heart_flow.heartflow import heartflow
from .heartFC_controler import HeartFCController # 导入 HeartFCController from .heartFC_controler import HeartFCController
# 定义日志配置 # 定义日志配置
chat_config = LogConfig( chat_config = LogConfig(
@@ -56,11 +56,9 @@ class ReasoningChat:
self.storage = MessageStorage() self.storage = MessageStorage()
self.gpt = ResponseGenerator() self.gpt = ResponseGenerator()
self.mood_manager = MoodManager.get_instance() self.mood_manager = MoodManager.get_instance()
self.mood_manager.start_mood_update()
# 用于存储每个 chat stream 的兴趣监控任务 # 用于存储每个 chat stream 的兴趣监控任务
self._interest_monitoring_tasks: Dict[str, asyncio.Task] = {} self._interest_monitoring_tasks: Dict[str, asyncio.Task] = {}
self._initialized = True self._initialized = True
self.interest_manager = InterestManager()
logger.info("ReasoningChat 单例初始化完成。") # 添加日志 logger.info("ReasoningChat 单例初始化完成。") # 添加日志
@classmethod @classmethod
@@ -182,55 +180,69 @@ class ReasoningChat:
# 此函数设计为后台任务,轮询指定 chat 的兴趣消息。 # 此函数设计为后台任务,轮询指定 chat 的兴趣消息。
# 它通常由外部代码在 chat 流活跃时启动。 # 它通常由外部代码在 chat 流活跃时启动。
controller = HeartFCController.get_instance() # 获取控制器实例 controller = HeartFCController.get_instance() # 获取控制器实例
stream_id = chat.stream_id # 获取 stream_id
if not controller: if not controller:
logger.error(f"无法获取 HeartFCController 实例,无法检查 PFChatting 状态。stream: {chat.stream_id}") logger.error(f"无法获取 HeartFCController 实例,无法检查 PFChatting 状态。stream: {stream_id}")
# 在没有控制器的情况下可能需要决定是继续处理还是完全停止?这里暂时假设继续 # 在没有控制器的情况下可能需要决定是继续处理还是完全停止?这里暂时假设继续
pass # 或者 return? pass # 或者 return?
logger.info(f"[{stream_id}] 兴趣消息监控任务启动。") # 增加启动日志
while True: while True:
await asyncio.sleep(1) # 每秒检查一次 await asyncio.sleep(1) # 每秒检查一次
interest_chatting = self.interest_manager.get_interest_chatting(chat.stream_id)
if not interest_chatting: # --- 修改:通过 heartflow 获取 subheartflow 和 interest_dict --- #
continue subheartflow = heartflow.get_subheartflow(stream_id)
interest_dict = interest_chatting.interest_dict if interest_chatting.interest_dict else {} # 检查 subheartflow 是否存在以及是否被标记停止
if not subheartflow or subheartflow.should_stop:
logger.info(f"[{stream_id}] SubHeartflow 不存在或已停止,兴趣消息监控任务退出。")
break # 退出循环,任务结束
# 从 subheartflow 获取 interest_dict
interest_dict = subheartflow.get_interest_dict()
# --- 结束修改 --- #
# 创建 items 快照进行迭代,避免在迭代时修改字典
items_to_process = list(interest_dict.items()) items_to_process = list(interest_dict.items())
if not items_to_process: if not items_to_process:
continue continue # 没有需要处理的消息,继续等待
# logger.debug(f"[{stream_id}] 发现 {len(items_to_process)} 条待处理兴趣消息。") # 调试日志
for msg_id, (message, interest_value, is_mentioned) in items_to_process: for msg_id, (message, interest_value, is_mentioned) in items_to_process:
# --- 检查 PFChatting 是否活跃 --- # # --- 检查 PFChatting 是否活跃 --- #
pf_active = False pf_active = False
if controller: if controller:
pf_active = controller.is_pf_chatting_active(chat.stream_id) pf_active = controller.is_pf_chatting_active(stream_id)
if pf_active: if pf_active:
# 如果 PFChatting 活跃,则跳过处理,直接移除消息 # 如果 PFChatting 活跃,则跳过处理,直接移除消息
removed_item = interest_dict.pop(msg_id, None) removed_item = interest_dict.pop(msg_id, None)
if removed_item: if removed_item:
logger.debug(f"PFChatting 活跃,已跳过并移除兴趣消息 {msg_id} for stream: {chat.stream_id}") logger.debug(f"[{stream_id}] PFChatting 活跃,已跳过并移除兴趣消息 {msg_id}")
continue # 处理下一条消息 continue # 处理下一条消息
# --- 结束检查 --- # # --- 结束检查 --- #
# 只有当 PFChatting 不活跃时才执行以下处理逻辑 # 只有当 PFChatting 不活跃时才执行以下处理逻辑
try: try:
# logger.debug(f"正在处理消息 {msg_id} for stream: {chat.stream_id}") # 可选调试信息 # logger.debug(f"[{stream_id}] 正在处理兴趣消息 {msg_id} (兴趣值: {interest_value:.2f})" )
await self.normal_reasoning_chat( await self.normal_reasoning_chat(
message=message, message=message,
chat=chat, chat=chat, # chat 对象仍然有效
is_mentioned=is_mentioned, is_mentioned=is_mentioned,
interested_rate=interest_value, interested_rate=interest_value, # 使用从字典获取的原始兴趣值
) )
# logger.debug(f"处理完成消息 {msg_id}") # 可选调试信息 # logger.debug(f"[{stream_id}] 处理完成消息 {msg_id}")
except Exception as e: except Exception as e:
logger.error(f"处理兴趣消息 {msg_id} 时出错: {e}\n{traceback.format_exc()}") logger.error(f"[{stream_id}] 处理兴趣消息 {msg_id} 时出错: {e}\n{traceback.format_exc()}")
finally: finally:
# 无论处理成功与否且PFChatting不活跃都尝试从原始字典中移除该消息 # 无论处理成功与否且PFChatting不活跃都尝试从原始字典中移除该消息
# 使用 pop(key, None) 避免 Key Error
removed_item = interest_dict.pop(msg_id, None) removed_item = interest_dict.pop(msg_id, None)
if removed_item: if removed_item:
logger.debug(f"已从兴趣字典中移除消息 {msg_id}") logger.debug(f"[{stream_id}] 已从兴趣字典中移除消息 {msg_id}")
async def normal_reasoning_chat( async def normal_reasoning_chat(
self, message: MessageRecv, chat: ChatStream, is_mentioned: bool, interested_rate: float self, message: MessageRecv, chat: ChatStream, is_mentioned: bool, interested_rate: float
@@ -281,7 +293,10 @@ class ReasoningChat:
# 生成回复 # 生成回复
try: try:
with Timer("生成回复", timing_results): with Timer("生成回复", timing_results):
response_set = await self.gpt.generate_response(message, thinking_id) response_set = await self.gpt.generate_response(
message=message,
thinking_id=thinking_id,
)
info_catcher.catch_after_generate_response(timing_results["生成回复"]) info_catcher.catch_after_generate_response(timing_results["生成回复"])
except Exception as e: except Exception as e:
@@ -289,23 +304,34 @@ class ReasoningChat:
response_set = None response_set = None
if not response_set: if not response_set:
logger.info("为什么生成回复失败?") logger.info(f"[{chat.stream_id}] 模型未生成回复内容")
return # 如果模型未生成回复,移除思考消息
container = message_manager.get_container(chat.stream_id)
# thinking_message = None
for msg in container.messages[:]: # Iterate over a copy
if isinstance(msg, MessageThinking) and msg.message_info.message_id == thinking_id:
# thinking_message = msg
container.messages.remove(msg)
logger.debug(f"[{chat.stream_id}] 已移除未产生回复的思考消息 {thinking_id}")
break
return # 不发送回复
# 发送消息 logger.info(f"[{chat.stream_id}] 回复内容: {response_set}")
with Timer("发送消息", timing_results):
# 发送回复
with Timer("消息发送", timing_results):
first_bot_msg = await self._send_response_messages(message, chat, response_set, thinking_id) first_bot_msg = await self._send_response_messages(message, chat, response_set, thinking_id)
info_catcher.catch_after_response(timing_results["发送消息"], response_set, first_bot_msg) info_catcher.catch_after_response(timing_results["消息发送"], response_set, first_bot_msg)
info_catcher.done_catch() info_catcher.done_catch()
# 处理表情包 # 处理表情包
with Timer("处理表情包", timing_results): with Timer("处理表情包", timing_results):
await self._handle_emoji(message, chat, response_set) await self._handle_emoji(message, chat, response_set[0])
# 更新关系情绪 # 更新关系情绪
with Timer("更新关系情绪", timing_results): with Timer("关系更新", timing_results):
await self._update_relationship(message, response_set) await self._update_relationship(message, response_set)
# 回复后处理 # 回复后处理
@@ -349,64 +375,51 @@ class ReasoningChat:
return False return False
async def start_monitoring_interest(self, chat: ChatStream): async def start_monitoring_interest(self, chat: ChatStream):
"""为指定的 ChatStream 启动后台兴趣消息监控任务。""" """为指定的 ChatStream 启动兴趣消息监控任务(如果尚未运行)"""
stream_id = chat.stream_id stream_id = chat.stream_id
# 检查任务是否已在运行 if stream_id not in self._interest_monitoring_tasks or self._interest_monitoring_tasks[stream_id].done():
if stream_id in self._interest_monitoring_tasks and not self._interest_monitoring_tasks[stream_id].done(): logger.info(f"为聊天流 {stream_id} 启动兴趣消息监控任务...")
task = self._interest_monitoring_tasks[stream_id] # 创建新任务
if not task.cancelled(): # 确保任务未被取消 task = asyncio.create_task(self._find_interested_message(chat))
logger.info(f"兴趣监控任务已在运行 stream: {stream_id}") # 添加完成回调
return task.add_done_callback(lambda t: self._handle_task_completion(stream_id, t))
else: self._interest_monitoring_tasks[stream_id] = task
logger.info(f"发现已取消的任务,重新创建 stream: {stream_id}") # else:
# 如果任务被取消了,允许重新创建 # logger.debug(f"聊天流 {stream_id} 的兴趣消息监控任务已在运行。")
logger.info(f"启动兴趣监控任务 stream: {stream_id}...")
# 创建新的后台任务来运行 _find_interested_message
task = asyncio.create_task(self._find_interested_message(chat))
self._interest_monitoring_tasks[stream_id] = task
# 添加回调,当任务完成(或被取消)时,自动从字典中移除
task.add_done_callback(lambda t: self._handle_task_completion(stream_id, t))
def _handle_task_completion(self, stream_id: str, task: asyncio.Task): def _handle_task_completion(self, stream_id: str, task: asyncio.Task):
"""处理监控任务完成的回调。""" """兴趣监控任务完成的回调函数"""
try: try:
# 检查任务是否因异常而结束 # 检查任务是否因异常而结束
exception = task.exception() exception = task.exception()
if exception: if exception:
logger.error(f"兴趣监控任务 stream {stream_id} 异常结束: {exception}", exc_info=exception) logger.error(f"聊天流 {stream_id} 的兴趣监控任务因异常结束: {exception}")
elif task.cancelled(): logger.error(traceback.format_exc()) # 记录完整的 traceback
logger.info(f"兴趣监控任务 stream {stream_id} 已被取消。")
else: else:
logger.info(f"兴趣监控任务 stream {stream_id} 正常结束。") # 理论上 while True 不会正常结束 logger.info(f"聊天流 {stream_id} 的兴趣监控任务正常结束。")
except asyncio.CancelledError: except asyncio.CancelledError:
logger.info(f"兴趣监控任务 stream {stream_id} 在完成处理期间被取消。") logger.info(f"聊天流 {stream_id} 的兴趣监控任务被取消。")
except Exception as e:
logger.error(f"处理聊天流 {stream_id} 任务完成回调时出错: {e}")
finally: finally:
# 无论如何都从字典中移除 # 从字典中移除已完成或取消的任务
removed_task = self._interest_monitoring_tasks.pop(stream_id, None) if stream_id in self._interest_monitoring_tasks:
if removed_task: del self._interest_monitoring_tasks[stream_id]
logger.debug(f"已从监控任务字典移除 stream: {stream_id}") logger.debug(f"已从监控任务字典移除 {stream_id}")
async def stop_monitoring_interest(self, stream_id: str): async def stop_monitoring_interest(self, stream_id: str):
"""停止指定 stream_id 的兴趣消息监控任务。""" """停止指定聊天流的兴趣监控任务。"""
if stream_id in self._interest_monitoring_tasks: if stream_id in self._interest_monitoring_tasks:
task = self._interest_monitoring_tasks[stream_id] task = self._interest_monitoring_tasks[stream_id]
if not task.done(): if task and not task.done():
logger.info(f"正在停止兴趣监控任务 stream: {stream_id}...") task.cancel() # 尝试取消任务
task.cancel() # 请求取消任务 logger.info(f"尝试取消聊天流 {stream_id} 的兴趣监控任务。")
try: try:
# 等待任务实际被取消(可选,提供更明确的停止) await task # 等待任务响应取消
# 设置超时以防万一
await asyncio.wait_for(task, timeout=5.0)
except asyncio.CancelledError: except asyncio.CancelledError:
logger.info(f"兴趣监控任务 stream {stream_id} 已确认取消。") logger.info(f"聊天流 {stream_id} 的兴趣监控任务已成功取消。")
except asyncio.TimeoutError:
logger.warning(f"停止兴趣监控任务 stream {stream_id} 超时。任务可能仍在运行。")
except Exception as e: except Exception as e:
# 捕获 task.exception() 可能在取消期间重新引发的错误 logger.error(f"等待聊天流 {stream_id} 监控任务取消时出现异常: {e}")
logger.error(f"停止兴趣监控任务 stream {stream_id} 时发生错误: {e}") # 在回调函数 _handle_task_completion 中移除任务
# 任务最终会由 done_callback 移除,或在这里再次确认移除 # else:
self._interest_monitoring_tasks.pop(stream_id, None) # logger.debug(f"聊天流 {stream_id} 没有正在运行的兴趣监控任务可停止。")
else:
logger.warning(f"尝试停止不存在或已停止的监控任务 stream: {stream_id}")

View File

@@ -36,7 +36,6 @@ class ReasoningChat:
self.storage = MessageStorage() self.storage = MessageStorage()
self.gpt = ResponseGenerator() self.gpt = ResponseGenerator()
self.mood_manager = MoodManager.get_instance() self.mood_manager = MoodManager.get_instance()
self.mood_manager.start_mood_update()
@staticmethod @staticmethod
async def _create_thinking_message(message, chat, userinfo, messageinfo): async def _create_thinking_message(message, chat, userinfo, messageinfo):

View File

@@ -198,9 +198,9 @@ class PersonInfoManager:
"nickname": "昵称", "nickname": "昵称",
"reason": "理由" "reason": "理由"
}""" }"""
logger.debug(f"取名提示词:{qv_name_prompt}") # logger.debug(f"取名提示词:{qv_name_prompt}")
response = await self.qv_name_llm.generate_response(qv_name_prompt) response = await self.qv_name_llm.generate_response(qv_name_prompt)
logger.debug(f"取名回复:{response}") logger.debug(f"取名提示词:{qv_name_prompt}\n取名回复:{response}")
result = self._extract_json_from_text(response[0]) result = self._extract_json_from_text(response[0])
if not result["nickname"]: if not result["nickname"]:
@@ -217,7 +217,7 @@ class PersonInfoManager:
await self.update_one_field(person_id, "name_reason", result["reason"]) await self.update_one_field(person_id, "name_reason", result["reason"])
self.person_name_list[person_id] = result["nickname"] self.person_name_list[person_id] = result["nickname"]
logger.debug(f"用户 {person_id} 的名称已更新为 {result['nickname']},原因:{result['reason']}") # logger.debug(f"用户 {person_id} 的名称已更新为 {result['nickname']},原因:{result['reason']}")
return result return result
else: else:
existing_names += f"{result['nickname']}" existing_names += f"{result['nickname']}"

View File

@@ -89,8 +89,8 @@ class RelationshipManager:
person_id = person_info_manager.get_person_id(platform, user_id) person_id = person_info_manager.get_person_id(platform, user_id)
is_qved = await person_info_manager.has_one_field(person_id, "person_name") is_qved = await person_info_manager.has_one_field(person_id, "person_name")
old_name = await person_info_manager.get_value(person_id, "person_name") old_name = await person_info_manager.get_value(person_id, "person_name")
print(f"old_name: {old_name}") # print(f"old_name: {old_name}")
print(f"is_qved: {is_qved}") # print(f"is_qved: {is_qved}")
if is_qved and old_name is not None: if is_qved and old_name is not None:
return True return True
else: else: