feat: pfc Lite (heartFC) preliminarily usable in group chats

This commit is contained in:
SengokuCola
2025-04-17 23:43:41 +08:00
parent 7cfb09badc
commit 920aa5ed84
10 changed files with 985 additions and 121 deletions

View File

@@ -93,6 +93,10 @@ class InterestMonitorApp:
# --- 初始化和启动刷新 ---
self.update_display() # 首次加载并开始刷新循环
def on_stream_selected(self, event=None):
"""当 Combobox 选择改变时调用,更新单个流的图表"""
self.update_single_stream_plot()
def get_random_color(self):
"""生成随机颜色用于区分线条"""
return "#{:06x}".format(random.randint(0, 0xFFFFFF))
@@ -305,11 +309,82 @@ class InterestMonitorApp:
self.ax_single_probability.xaxis.set_major_formatter(mdates.DateFormatter('%H:%M:%S'))
selected_name = self.selected_stream_id.get()
selected_sid = None
# --- 新增:根据选中的名称找到 stream_id ---
if selected_name:
for sid, name in self.stream_display_names.items():
if name == selected_name:
selected_sid = sid
break
all_times = [] # 用于确定 X 轴范围
# --- 新增:绘制兴趣度图 ---
if selected_sid and selected_sid in self.stream_history and self.stream_history[selected_sid]:
history = self.stream_history[selected_sid]
timestamps, interests = zip(*history)
try:
mpl_dates = [datetime.fromtimestamp(ts) for ts in timestamps]
all_times.extend(mpl_dates)
self.ax_single_interest.plot(
mpl_dates,
interests,
color=self.stream_colors.get(selected_sid, 'blue'),
marker='.',
markersize=3,
linestyle='-',
linewidth=1
)
except ValueError as e:
print(f"Skipping interest plot for {selected_sid} due to invalid timestamp: {e}")
# --- 新增:绘制概率图 ---
if selected_sid and selected_sid in self.probability_history and self.probability_history[selected_sid]:
prob_history = self.probability_history[selected_sid]
prob_timestamps, probabilities = zip(*prob_history)
try:
prob_mpl_dates = [datetime.fromtimestamp(ts) for ts in prob_timestamps]
# 注意:概率图的时间点可能与兴趣度不同,也需要加入 all_times
all_times.extend(prob_mpl_dates)
self.ax_single_probability.plot(
prob_mpl_dates,
probabilities,
color=self.stream_colors.get(selected_sid, 'green'), # 可以用不同颜色
marker='.',
markersize=3,
linestyle='-',
linewidth=1
)
except ValueError as e:
print(f"Skipping probability plot for {selected_sid} due to invalid timestamp: {e}")
# --- 新增:调整 X 轴范围和格式 ---
if all_times:
min_time = min(all_times)
max_time = max(all_times)
# 设置共享的 X 轴范围
self.ax_single_interest.set_xlim(min_time, max_time)
# self.ax_single_probability.set_xlim(min_time, max_time) # sharex 会自动同步
# 自动格式化X轴标签 (应用到共享轴的最后一个子图上通常即可)
self.fig_single.autofmt_xdate()
else:
# 如果没有数据,设置一个默认的时间范围
now = datetime.now()
one_hour_ago = now - timedelta(hours=1)
self.ax_single_interest.set_xlim(one_hour_ago, now)
# self.ax_single_probability.set_xlim(one_hour_ago, now) # sharex 会自动同步
# --- 新增:重新绘制画布 ---
self.canvas_single.draw()
def update_display(self):
"""主更新循环"""
try:
self.load_and_update_history() # 从文件加载数据并更新内部状态
self.update_plot() # 根据内部状态更新图表
# *** 修改:分别调用两个图表的更新方法 ***
self.update_all_streams_plot() # 更新所有流的图表
self.update_single_stream_plot() # 更新单个流的图表
except Exception as e:
# 提供更详细的错误信息
import traceback

View File

@@ -165,7 +165,7 @@ class ToolUser:
tool_calls_str = ""
for tool_call in tool_calls:
tool_calls_str += f"{tool_call['function']['name']}\n"
logger.info(f"根据:\n{prompt}\n模型请求调用{len(tool_calls)}个工具: {tool_calls_str}")
logger.info(f"根据:\n{prompt[0:100]}...\n模型请求调用{len(tool_calls)}个工具: {tool_calls_str}")
tool_results = []
structured_info = {} # 动态生成键

View File

@@ -244,6 +244,10 @@ class Heartflow:
"""获取指定ID的SubHeartflow实例"""
return self._subheartflows.get(observe_chat_id)
def get_all_subheartflows_streams_ids(self) -> list[Any]:
"""获取当前所有活跃的子心流的 ID 列表"""
return list(self._subheartflows.keys())
init_prompt()
# 创建一个全局的管理器实例

View File

@@ -37,13 +37,13 @@ def init_prompt():
# prompt += f"麦麦的总体想法是:{self.main_heartflow_info}\n\n"
prompt += "{extra_info}\n"
# prompt += "{prompt_schedule}\n"
prompt += "{relation_prompt_all}\n"
# prompt += "{relation_prompt_all}\n"
prompt += "{prompt_personality}\n"
prompt += "刚刚你的想法是{current_thinking_info}。可以适当转换话题\n"
prompt += "-----------------------------------\n"
prompt += "现在是{time_now}你正在上网和qq群里的网友们聊天群里正在聊的话题是\n{chat_observe_info}\n"
prompt += "你现在{mood_info}\n"
prompt += "你注意到{sender_name}刚刚说:{message_txt}\n"
# prompt += "你注意到{sender_name}刚刚说:{message_txt}\n"
prompt += "现在你接下去继续思考,产生新的想法,不要分点输出,输出连贯的内心独白"
prompt += "思考时可以想想如何对群聊内容进行回复。回复的要求是:平淡一些,简短一些,说中文,尽量不要说你说过的话。如果你要回复,最好只回复一个人的一个话题\n"
prompt += "请注意不要输出多余内容(包括前后缀,冒号和引号,括号, 表情,等),不要带有括号和动作描写"
@@ -199,7 +199,7 @@ class SubHeartflow:
logger.error(f"[{self.subheartflow_id}] do_observe called but no valid observation found.")
async def do_thinking_before_reply(
self, message_txt: str, sender_info: UserInfo, chat_stream: ChatStream, extra_info: str, obs_id: list[str] = None # 修改 obs_id 类型为 list[str]
self, chat_stream: ChatStream, extra_info: str, obs_id: list[str] = None # 修改 obs_id 类型为 list[str]
):
async with self._thinking_lock: # 获取思考锁
# --- 在思考前确保观察已执行 --- #
@@ -246,45 +246,45 @@ class SubHeartflow:
identity_detail = individuality.identity.identity_detail
if identity_detail: random.shuffle(identity_detail); prompt_personality += f",{identity_detail[0]}"
who_chat_in_group = [
(chat_stream.platform, sender_info.user_id, sender_info.user_nickname) # 先添加当前发送者
]
# 获取最近发言者,排除当前发送者,避免重复
recent_speakers = get_recent_group_speaker(
chat_stream.stream_id,
(chat_stream.platform, sender_info.user_id),
limit=global_config.MAX_CONTEXT_SIZE -1 # 减去当前发送者
)
who_chat_in_group.extend(recent_speakers)
# who_chat_in_group = [
# (chat_stream.platform, sender_info.user_id, sender_info.user_nickname) # 先添加当前发送者
# ]
# # 获取最近发言者,排除当前发送者,避免重复
# recent_speakers = get_recent_group_speaker(
# chat_stream.stream_id,
# (chat_stream.platform, sender_info.user_id),
# limit=global_config.MAX_CONTEXT_SIZE -1 # 减去当前发送者
# )
# who_chat_in_group.extend(recent_speakers)
relation_prompt = ""
unique_speakers = set() # 确保人物信息不重复
for person_tuple in who_chat_in_group:
person_key = (person_tuple[0], person_tuple[1]) # 使用 platform+id 作为唯一标识
if person_key not in unique_speakers:
relation_prompt += await relationship_manager.build_relationship_info(person_tuple)
unique_speakers.add(person_key)
# relation_prompt = ""
# unique_speakers = set() # 确保人物信息不重复
# for person_tuple in who_chat_in_group:
# person_key = (person_tuple[0], person_tuple[1]) # 使用 platform+id 作为唯一标识
# if person_key not in unique_speakers:
# relation_prompt += await relationship_manager.build_relationship_info(person_tuple)
# unique_speakers.add(person_key)
relation_prompt_all = (await global_prompt_manager.get_prompt_async("relationship_prompt")).format(
relation_prompt, sender_info.user_nickname
)
# relation_prompt_all = (await global_prompt_manager.get_prompt_async("relationship_prompt")).format(
# relation_prompt, sender_info.user_nickname
# )
sender_name_sign = (
f"<{chat_stream.platform}:{sender_info.user_id}:{sender_info.user_nickname}:{sender_info.user_cardname or 'NoCard'}>"
)
# sender_name_sign = (
# f"<{chat_stream.platform}:{sender_info.user_id}:{sender_info.user_nickname}:{sender_info.user_cardname or 'NoCard'}>"
# )
time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
prompt = (await global_prompt_manager.get_prompt_async("sub_heartflow_prompt_before")).format(
extra_info=extra_info_prompt,
relation_prompt_all=relation_prompt_all,
# relation_prompt_all=relation_prompt_all,
prompt_personality=prompt_personality,
current_thinking_info=current_thinking_info,
time_now=time_now,
chat_observe_info=chat_observe_info,
mood_info=mood_info,
sender_name=sender_name_sign,
message_txt=message_txt,
# sender_name=sender_name_sign,
# message_txt=message_txt,
bot_name=self.bot_name,
)

View File

@@ -38,7 +38,7 @@ class ResponseGenerator:
self.current_model_type = "r1" # 默认使用 R1
self.current_model_name = "unknown model"
async def generate_response(self, message: MessageRecv, thinking_id: str) -> Optional[List[str]]:
async def generate_response(self, message: MessageRecv, thinking_id: str,) -> Optional[List[str]]:
"""根据当前模型类型选择对应的生成函数"""
logger.info(

View File

@@ -1,8 +1,9 @@
import time
from random import random
import traceback
from typing import List, Optional
from typing import List, Optional, Dict
import asyncio
from asyncio import Lock
from ...moods.moods import MoodManager
from ....config.config import global_config
from ...chat.emoji_manager import emoji_manager
@@ -19,7 +20,8 @@ from ...utils.timer_calculater import Timer
from src.do_tool.tool_use import ToolUser
from .interest import InterestManager, InterestChatting
from src.plugins.chat.chat_stream import chat_manager
from src.plugins.chat.message import MessageInfo
from src.plugins.chat.message import BaseMessageInfo
from .pf_chatting import PFChatting
# 定义日志配置
chat_config = LogConfig(
@@ -33,13 +35,32 @@ logger = get_module_logger("heartFC_chat", config=chat_config)
INTEREST_MONITOR_INTERVAL_SECONDS = 1
class HeartFC_Chat:
_instance = None # For potential singleton access if needed by MessageManager
def __init__(self):
# --- Updated Init ---
if HeartFC_Chat._instance is not None:
# Prevent re-initialization if used as a singleton
return
self.logger = logger # Make logger accessible via self
self.gpt = ResponseGenerator()
self.mood_manager = MoodManager.get_instance()
self.mood_manager.start_mood_update()
self.tool_user = ToolUser()
self.interest_manager = InterestManager()
self._interest_monitor_task: Optional[asyncio.Task] = None
# --- New PFChatting Management ---
self.pf_chatting_instances: Dict[str, PFChatting] = {}
self._pf_chatting_lock = Lock()
# --- End New PFChatting Management ---
HeartFC_Chat._instance = self # Register instance
# --- End Updated Init ---
# --- Added Class Method for Singleton Access ---
@classmethod
def get_instance(cls):
return cls._instance
# --- End Added Class Method ---
async def start(self):
"""启动异步任务,如兴趣监控器"""
@@ -61,14 +82,29 @@ class HeartFC_Chat:
else:
logger.warning("跳过兴趣监控任务创建:任务已存在或正在运行。")
# --- Added PFChatting Instance Manager ---
async def _get_or_create_pf_chatting(self, stream_id: str) -> Optional[PFChatting]:
"""获取现有PFChatting实例或创建新实例。"""
async with self._pf_chatting_lock:
if stream_id not in self.pf_chatting_instances:
self.logger.info(f"为流 {stream_id} 创建新的PFChatting实例")
# 传递 self (HeartFC_Chat 实例) 进行依赖注入
instance = PFChatting(stream_id, self)
# 执行异步初始化
if not await instance._initialize():
self.logger.error(f"为流 {stream_id} 初始化PFChatting失败")
return None
self.pf_chatting_instances[stream_id] = instance
return self.pf_chatting_instances[stream_id]
# --- End Added PFChatting Instance Manager ---
async def _interest_monitor_loop(self):
"""后台任务,定期检查兴趣度变化并触发回复"""
logger.info("兴趣监控循环开始...")
while True:
await asyncio.sleep(INTEREST_MONITOR_INTERVAL_SECONDS)
try:
# --- 修改:遍历 SubHeartflow 并检查触发器 ---
active_stream_ids = list(heartflow.get_all_subheartflows_streams_ids()) # 需要 heartflow 提供此方法
active_stream_ids = list(heartflow.get_all_subheartflows_streams_ids())
logger.trace(f"检查 {len(active_stream_ids)} 个活跃流是否足以开启心流对话...")
for stream_id in active_stream_ids:
@@ -77,26 +113,28 @@ class HeartFC_Chat:
logger.warning(f"监控循环: 无法获取活跃流 {stream_id} 的 sub_hf")
continue
# --- 获取 Observation 和消息列表 --- #
observation = sub_hf._get_primary_observation()
if not observation:
logger.warning(f"[{stream_id}] SubHeartflow 没有在观察,无法检查触发器。")
continue
observed_messages = observation.talking_message # 获取消息字典列表
# --- 结束获取 --- #
should_trigger = False
try:
# check_reply_trigger 可以选择性地接收 observed_messages 作为参数
should_trigger = await sub_hf.check_reply_trigger() # 目前 check_reply_trigger 还不处理这个
interest_chatting = self.interest_manager.get_interest_chatting(stream_id)
if interest_chatting:
should_trigger = interest_chatting.should_evaluate_reply()
if should_trigger:
logger.info(f"[{stream_id}] 基于兴趣概率决定启动交流模式 (概率: {interest_chatting.current_reply_probability:.4f})。")
else:
logger.trace(f"[{stream_id}] 没有找到对应的 InterestChatting 实例,跳过基于兴趣的触发检查。")
except Exception as e:
logger.error(f"错误调用 check_reply_trigger{stream_id}: {e}")
logger.error(f"检查兴趣触发器时出错{stream_id}: {e}")
logger.error(traceback.format_exc())
if should_trigger:
logger.info(f"[{stream_id}] SubHeartflow 决定开启心流对话。")
# 调用修改后的处理函数,传递 stream_id 和 observed_messages
asyncio.create_task(self._process_triggered_reply(stream_id, observed_messages))
logger.info(f"[{stream_id}] 触发条件满足, 委托给PFChatting.")
# --- 修改: 获取 PFChatting 实例并调用 add_time (无参数,时间由内部衰减逻辑决定) ---
pf_instance = await self._get_or_create_pf_chatting(stream_id)
if pf_instance:
# 调用 add_time 启动或延长循环,时间由 PFChatting 内部决定
asyncio.create_task(pf_instance.add_time())
else:
logger.error(f"[{stream_id}] 无法获取或创建PFChatting实例。跳过触发。")
except asyncio.CancelledError:
@@ -107,32 +145,6 @@ class HeartFC_Chat:
logger.error(traceback.format_exc())
await asyncio.sleep(5) # 发生错误时等待
async def _process_triggered_reply(self, stream_id: str, observed_messages: List[dict]):
"""Helper coroutine to handle the processing of a triggered reply based on SubHeartflow trigger."""
try:
logger.info(f"[{stream_id}] SubHeartflow 触发回复...")
# 调用修改后的 trigger_reply_generation
await self.trigger_reply_generation(stream_id, observed_messages)
# --- 调整兴趣降低逻辑 ---
# 这里的兴趣降低可能不再适用,或者需要基于不同的逻辑
# 例如,回复后可以将 SubHeartflow 的某种"回复意愿"状态重置
# 暂时注释掉,或根据需要调整
# chatting_instance = self.interest_manager.get_interest_chatting(stream_id)
# if chatting_instance:
# decrease_value = chatting_instance.trigger_threshold / 2 # 使用实例的阈值
# self.interest_manager.decrease_interest(stream_id, value=decrease_value)
# post_trigger_interest = self.interest_manager.get_interest(stream_id) # 获取更新后的兴趣
# logger.info(f"[{stream_id}] Interest decreased by {decrease_value:.2f} (InstanceThreshold/2) after processing triggered reply. Current interest: {post_trigger_interest:.2f}")
# else:
# logger.warning(f"[{stream_id}] Could not find InterestChatting instance after reply processing to decrease interest.")
logger.debug(f"[{stream_id}] Reply processing finished. (Interest decrease logic needs review).")
except Exception as e:
logger.error(f"Error processing SubHeartflow-triggered reply for stream_id {stream_id}: {e}") # 更新日志信息
logger.error(traceback.format_exc())
# --- 结束修改 ---
async def _create_thinking_message(self, anchor_message: Optional[MessageRecv]):
"""创建思考消息 (尝试锚定到 anchor_message)"""
if not anchor_message or not anchor_message.chat_stream:
@@ -270,7 +282,7 @@ class HeartFC_Chat:
sub_hf = None
anchor_message: Optional[MessageRecv] = None # <--- 重命名,用于锚定回复的消息对象
userinfo: Optional[UserInfo] = None
messageinfo: Optional[MessageInfo] = None
messageinfo: Optional[BaseMessageInfo] = None
timing_results = {}
current_mind = None
@@ -295,33 +307,58 @@ class HeartFC_Chat:
logger.error(traceback.format_exc())
return
# --- 2. 尝试从 observed_messages 重建最后一条消息作为锚点 --- #
# --- 2. 尝试从 observed_messages 重建最后一条消息作为锚点, 失败则创建占位符 --- #
try:
with Timer("获取最后消息锚点", timing_results):
with Timer("获取或创建锚点消息", timing_results):
reconstruction_failed = False
if observed_messages:
last_msg_dict = observed_messages[-1] # 直接从传入列表获取最后一条
# 尝试从字典重建 MessageRecv 对象(可能需要调整 MessageRecv 的构造方式或创建一个辅助函数)
# 这是一个简化示例,假设 MessageRecv 可以从字典初始化
# 你可能需要根据 MessageRecv 的实际 __init__ 来调整
try:
anchor_message = MessageRecv(last_msg_dict) # 假设 MessageRecv 支持从字典创建
last_msg_dict = observed_messages[-1]
logger.debug(f"[{stream_id}] Attempting to reconstruct MessageRecv from last observed message.")
anchor_message = MessageRecv(last_msg_dict, chat_stream=chat)
if not (anchor_message and anchor_message.message_info and anchor_message.message_info.message_id and anchor_message.message_info.user_info):
raise ValueError("Reconstructed MessageRecv missing essential info.")
userinfo = anchor_message.message_info.user_info
messageinfo = anchor_message.message_info
logger.debug(f"[{stream_id}] 获取到最后消息作为锚点: ID={messageinfo.message_id}, Sender={userinfo.user_nickname}")
except Exception as e_msg:
logger.error(f"[{stream_id}] 从字典重建最后消息 MessageRecv 失败: {e_msg}. 字典: {last_msg_dict}")
anchor_message = None # 重置以表示失败
logger.debug(f"[{stream_id}] Successfully reconstructed anchor message: ID={messageinfo.message_id}, Sender={userinfo.user_nickname}")
except Exception as e_reconstruct:
logger.warning(f"[{stream_id}] Reconstructing MessageRecv from observed message failed: {e_reconstruct}. Will create placeholder.")
reconstruction_failed = True
else:
logger.warning(f"[{stream_id}] 无法从 Observation 获取最后消息锚点。")
except Exception as e:
logger.error(f"[{stream_id}] 获取最后消息锚点时出错: {e}")
logger.error(traceback.format_exc())
# 即使没有锚点,也可能继续尝试生成非回复性消息,取决于后续逻辑
logger.warning(f"[{stream_id}] observed_messages is empty. Will create placeholder anchor message.")
reconstruction_failed = True # Treat empty observed_messages as a failure to reconstruct
# --- 3. 检查是否能继续 (需要思考消息锚点) ---
if not anchor_message:
logger.warning(f"[{stream_id}] 没有有效的消息锚点,无法创建思考消息和发送回复。取消回复生成。")
return
# 如果重建失败或 observed_messages 为空,创建占位符
if reconstruction_failed:
placeholder_id = f"mid_{int(time.time() * 1000)}" # 使用毫秒时间戳增加唯一性
placeholder_user = UserInfo(user_id="system_trigger", user_nickname="系统触发")
placeholder_msg_info = BaseMessageInfo(
message_id=placeholder_id,
platform=chat.platform,
group_info=chat.group_info,
user_info=placeholder_user,
time=time.time()
# 其他 BaseMessageInfo 可能需要的字段设为默认值或 None
)
# 创建 MessageRecv 实例,注意它需要消息字典结构,我们创建一个最小化的
placeholder_msg_dict = {
"message_info": placeholder_msg_info.to_dict(),
"processed_plain_text": "", # 提供空文本
"raw_message": "",
"time": placeholder_msg_info.time,
}
# 先只用字典创建实例
anchor_message = MessageRecv(placeholder_msg_dict)
# 然后调用方法更新 chat_stream
anchor_message.update_chat_stream(chat)
userinfo = anchor_message.message_info.user_info
messageinfo = anchor_message.message_info
logger.info(f"[{stream_id}] Created placeholder anchor message: ID={messageinfo.message_id}, Sender={userinfo.user_nickname}")
except Exception as e:
logger.error(f"[{stream_id}] 获取或创建锚点消息时出错: {e}")
logger.error(traceback.format_exc())
anchor_message = None # 确保出错时 anchor_message 为 None
# --- 4. 检查并发思考限制 (使用 anchor_message 简化获取) ---
try:
@@ -399,6 +436,7 @@ class HeartFC_Chat:
with Timer("生成内心想法(SubHF)", timing_results):
# 不再传递 message_txt 和 sender_info, SubHeartflow 应基于其内部观察
current_mind, past_mind = await sub_hf.do_thinking_before_reply(
# sender_info=userinfo,
chat_stream=chat,
extra_info=tool_result_info,
obs_id=get_mid_memory_id,
@@ -415,7 +453,8 @@ class HeartFC_Chat:
# --- 9. 调用 ResponseGenerator 生成回复 (使用 anchor_message 和 current_mind) ---
try:
with Timer("生成最终回复(GPT)", timing_results):
response_set = await self.gpt.generate_response(anchor_message, thinking_id, current_mind=current_mind)
# response_set = await self.gpt.generate_response(anchor_message, thinking_id, current_mind=current_mind)
response_set = await self.gpt.generate_response(anchor_message, thinking_id)
except Exception as e:
logger.error(f"[{stream_id}] GPT 生成回复失败: {e}")
logger.error(traceback.format_exc())

View File

@@ -20,11 +20,11 @@ logger = get_module_logger("InterestManager", config=interest_log_config)
# 定义常量
DEFAULT_DECAY_RATE_PER_SECOND = 0.95 # 每秒衰减率 (兴趣保留 95%)
MAX_INTEREST = 10.0 # 最大兴趣值
MIN_INTEREST_THRESHOLD = 0.1 # 低于此值可能被清理 (可选)
DEFAULT_DECAY_RATE_PER_SECOND = 0.98 # 每秒衰减率 (兴趣保留 98%)
MAX_INTEREST = 15.0 # 最大兴趣值
# MIN_INTEREST_THRESHOLD = 0.1 # 低于此值可能被清理 (可选)
CLEANUP_INTERVAL_SECONDS = 3600 # 清理任务运行间隔 (例如1小时)
INACTIVE_THRESHOLD_SECONDS = 3600 * 24 # 不活跃时间阈值 (例如1天)
INACTIVE_THRESHOLD_SECONDS = 3600 # 不活跃时间阈值 (例如1小时)
LOG_INTERVAL_SECONDS = 3 # 日志记录间隔 (例如30秒)
LOG_DIRECTORY = "logs/interest" # 日志目录
LOG_FILENAME = "interest_log.json" # 快照日志文件名 (保留,以防其他地方用到)
@@ -33,11 +33,11 @@ HISTORY_LOG_FILENAME = "interest_history.log" # 新的历史日志文件名
# INTEREST_INCREASE_THRESHOLD = 0.5
# --- 新增:概率回复相关常量 ---
REPLY_TRIGGER_THRESHOLD = 5.0 # 触发概率回复的兴趣阈值 (示例值)
REPLY_TRIGGER_THRESHOLD = 3.0 # 触发概率回复的兴趣阈值 (示例值)
BASE_REPLY_PROBABILITY = 0.05 # 首次超过阈值时的基础回复概率 (示例值)
PROBABILITY_INCREASE_RATE_PER_SECOND = 0.02 # 高于阈值时,每秒概率增加量 (线性增长, 示例值)
PROBABILITY_DECAY_FACTOR_PER_SECOND = 0.3 # 低于阈值时,每秒概率衰减因子 (指数衰减, 示例值)
MAX_REPLY_PROBABILITY = 0.95 # 回复概率上限 (示例值)
MAX_REPLY_PROBABILITY = 1 # 回复概率上限 (示例值)
# --- 结束:概率回复相关常量 ---
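# A minimal, purely illustrative sketch of how the constants above are intended to
# interact, based on their comments (not part of this module): the probability jumps
# to the base value when interest first crosses the threshold, grows linearly while
# it stays above, and decays exponentially once it drops below.
import math

def simulate_reply_probability(interest_series, dt=1.0, threshold=3.0, base_p=0.05,
                               increase_rate=0.02, decay_factor=0.3, max_p=1.0):
    p, above, out = 0.0, False, []
    for interest in interest_series:
        if interest >= threshold:
            p = base_p if not above else min(p + increase_rate * dt, max_p)
            above = True
        else:
            p *= math.pow(decay_factor, dt)  # per-second exponential decay
            if p < 1e-6:
                p = 0.0
            above = False
        out.append(round(p, 4))
    return out

# simulate_reply_probability([4, 4, 4, 2, 2]) -> [0.05, 0.07, 0.09, 0.027, 0.0081]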
class InterestChatting:
@@ -117,15 +117,15 @@ class InterestChatting:
# 持续高于阈值,线性增加概率
increase_amount = self.probability_increase_rate * time_delta
self.current_reply_probability += increase_amount
logger.debug(f"兴趣高于阈值 ({self.trigger_threshold}) 持续 {time_delta:.2f}秒. 概率增加 {increase_amount:.4f}{self.current_reply_probability:.4f}")
# logger.debug(f"兴趣高于阈值 ({self.trigger_threshold}) 持续 {time_delta:.2f}秒. 概率增加 {increase_amount:.4f} 到 {self.current_reply_probability:.4f}")
# 限制概率不超过最大值
self.current_reply_probability = min(self.current_reply_probability, self.max_reply_probability)
else: # 低于阈值
if self.is_above_threshold:
# 刚低于阈值,开始衰减
logger.debug(f"兴趣低于阈值 ({self.trigger_threshold}). 概率衰减开始于 {self.current_reply_probability:.4f}")
# if self.is_above_threshold:
# # 刚低于阈值,开始衰减
# logger.debug(f"兴趣低于阈值 ({self.trigger_threshold}). 概率衰减开始于 {self.current_reply_probability:.4f}")
# else: # 持续低于阈值,继续衰减
# pass # 不需要特殊处理
@@ -133,12 +133,12 @@ class InterestChatting:
# 检查 decay_factor 是否有效
if 0 < self.probability_decay_factor < 1:
decay_multiplier = math.pow(self.probability_decay_factor, time_delta)
old_prob = self.current_reply_probability
# old_prob = self.current_reply_probability
self.current_reply_probability *= decay_multiplier
# 避免因浮点数精度问题导致概率略微大于0直接设为0
if self.current_reply_probability < 1e-6:
self.current_reply_probability = 0.0
logger.debug(f"兴趣低于阈值 ({self.trigger_threshold}) 持续 {time_delta:.2f}秒. 概率从 {old_prob:.4f} 衰减到 {self.current_reply_probability:.4f} (因子: {self.probability_decay_factor})")
# logger.debug(f"兴趣低于阈值 ({self.trigger_threshold}) 持续 {time_delta:.2f}秒. 概率从 {old_prob:.4f} 衰减到 {self.current_reply_probability:.4f} (因子: {self.probability_decay_factor})")
elif self.probability_decay_factor <= 0:
# 如果衰减因子无效或为0直接清零
if self.current_reply_probability > 0:
@@ -212,19 +212,19 @@ class InterestChatting:
# 确保概率是基于最新兴趣值计算的
self._update_reply_probability(current_time)
# 更新兴趣衰减(如果需要,取决于逻辑,这里保持和 get_interest 一致)
self._calculate_decay(current_time)
self.last_update_time = current_time # 更新时间戳
# self._calculate_decay(current_time)
# self.last_update_time = current_time # 更新时间戳
if self.is_above_threshold and self.current_reply_probability > 0:
if self.current_reply_probability > 0:
# 只有在阈值之上且概率大于0时才有可能触发
trigger = random.random() < self.current_reply_probability
if trigger:
logger.info(f"Reply evaluation triggered! Probability: {self.current_reply_probability:.4f}, Threshold: {self.trigger_threshold}, Interest: {self.interest_level:.2f}")
logger.info(f"回复概率评估触发! 概率: {self.current_reply_probability:.4f}, 阈值: {self.trigger_threshold}, 兴趣: {self.interest_level:.2f}")
# 可选:触发后是否重置/降低概率?根据需要决定
# self.current_reply_probability = self.base_reply_probability # 例如,触发后降回基础概率
# self.current_reply_probability *= 0.5 # 例如,触发后概率减半
else:
logger.debug(f"Reply evaluation NOT triggered. Probability: {self.current_reply_probability:.4f}, Random value: {trigger + 1e-9:.4f}") # 打印随机值用于调试
logger.debug(f"回复概率评估未触发。概率: {self.current_reply_probability:.4f}")
return trigger
else:
# logger.debug(f"Reply evaluation check: Below threshold or zero probability. Probability: {self.current_reply_probability:.4f}")
@@ -271,12 +271,12 @@ class InterestManager:
except OSError as e:
logger.error(f"Error creating log directory '{LOG_DIRECTORY}': {e}")
async def _periodic_cleanup_task(self, interval_seconds: int, threshold: float, max_age_seconds: int):
async def _periodic_cleanup_task(self, interval_seconds: int, max_age_seconds: int):
"""后台清理任务的异步函数"""
while True:
await asyncio.sleep(interval_seconds)
logger.info(f"运行定期清理 (间隔: {interval_seconds}秒)...")
self.cleanup_inactive_chats(threshold=threshold, max_age_seconds=max_age_seconds)
self.cleanup_inactive_chats(max_age_seconds=max_age_seconds)
async def _periodic_log_task(self, interval_seconds: int):
"""后台日志记录任务的异步函数 (记录历史数据,包含 group_name)"""
@@ -318,7 +318,7 @@ class InterestManager:
# 将每个条目作为单独的 JSON 行写入
f.write(json.dumps(log_entry, ensure_ascii=False) + '\n')
count += 1
logger.debug(f"Successfully appended {count} interest history entries to {self._history_log_file_path}")
# logger.debug(f"Successfully appended {count} interest history entries to {self._history_log_file_path}")
# 注意:不再写入快照文件 interest_log.json
# 如果需要快照文件,可以在这里单独写入 self._snapshot_log_file_path
@@ -358,7 +358,6 @@ class InterestManager:
self._cleanup_task = asyncio.create_task(
self._periodic_cleanup_task(
interval_seconds=CLEANUP_INTERVAL_SECONDS,
threshold=MIN_INTEREST_THRESHOLD,
max_age_seconds=INACTIVE_THRESHOLD_SECONDS
)
)
@@ -449,10 +448,9 @@ class InterestManager:
else:
logger.warning(f"尝试降低不存在的聊天流 {stream_id} 的兴趣度")
def cleanup_inactive_chats(self, threshold=MIN_INTEREST_THRESHOLD, max_age_seconds=INACTIVE_THRESHOLD_SECONDS):
def cleanup_inactive_chats(self, max_age_seconds=INACTIVE_THRESHOLD_SECONDS):
"""
清理长时间不活跃的聊天流记录
threshold: 低于此兴趣度的将被清理
max_age_seconds: 超过此时间未更新的将被清理
"""
current_time = time.time()

View File

@@ -0,0 +1,726 @@
import asyncio
import time
import traceback
from typing import List, Optional, Dict, Any, Deque, Union, TYPE_CHECKING
from collections import deque
import json
from ....config.config import global_config
from ...chat.message import MessageRecv, BaseMessageInfo, MessageThinking, MessageSending
from ...chat.chat_stream import ChatStream
from ...message import UserInfo
from src.heart_flow.heartflow import heartflow, SubHeartflow
from src.plugins.chat.chat_stream import chat_manager
from .messagesender import MessageManager
from src.common.logger import get_module_logger, LogConfig, DEFAULT_CONFIG # 引入 DEFAULT_CONFIG
from src.plugins.models.utils_model import LLMRequest
from src.individuality.individuality import Individuality
# 定义日志配置 (使用 loguru 格式)
interest_log_config = LogConfig(
console_format=DEFAULT_CONFIG["console_format"], # 使用默认控制台格式
file_format=DEFAULT_CONFIG["file_format"] # 使用默认文件格式
)
logger = get_module_logger("PFChattingLoop", config=interest_log_config) # Logger Name Changed
# Forward declaration for type hinting
if TYPE_CHECKING:
from .heartFC_chat import HeartFC_Chat
PLANNER_TOOL_DEFINITION = [
{
"type": "function",
"function": {
"name": "decide_reply_action",
"description": "根据当前聊天内容和上下文,决定机器人是否应该回复以及如何回复。",
"parameters": {
"type": "object",
"properties": {
"action": {
"type": "string",
"enum": ["no_reply", "text_reply", "emoji_reply"],
"description": "决定采取的行动:'no_reply'(不回复), 'text_reply'(文本回复) 或 'emoji_reply'(表情回复)。"
},
"reasoning": {
"type": "string",
"description": "做出此决定的简要理由。"
},
"emoji_query": {
"type": "string",
"description": '如果行动是\'emoji_reply\',则指定表情的主题或概念(例如,"开心""困惑")。仅在需要表情回复时提供。'
}
},
"required": ["action", "reasoning"] # 强制要求提供行动和理由
}
}
}
]
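# Illustrative example (assumed payload, not captured from a real run): the arguments
# string inside tool_call["function"]["arguments"] that the planner later parses with
# json.loads would look roughly like this.
import json

example_arguments = '{"action": "emoji_reply", "reasoning": "话题轻松,适合用表情回应", "emoji_query": "开心"}'
decision = json.loads(example_arguments)
assert decision["action"] in ("no_reply", "text_reply", "emoji_reply")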
class PFChatting:
"""
Manages a continuous Plan-Filter-Check (now Plan-Replier-Sender) loop
for generating replies within a specific chat stream, controlled by a timer.
The loop runs as long as the timer > 0.
"""
def __init__(self, chat_id: str, heartfc_chat_instance: 'HeartFC_Chat'):
"""
初始化PFChatting实例。
Args:
chat_id: The identifier for the chat stream (e.g., stream_id).
heartfc_chat_instance: 访问共享资源和方法的主HeartFC_Chat实例。
"""
self.heartfc_chat = heartfc_chat_instance # 访问logger, gpt, tool_user, _send_response_messages等。
self.stream_id: str = chat_id
self.chat_stream: Optional[ChatStream] = None
self.sub_hf: Optional[SubHeartflow] = None
self._initialized = False
self._init_lock = asyncio.Lock() # Ensure initialization happens only once
self._processing_lock = asyncio.Lock() # 确保只有一个 Plan-Replier-Sender 周期在运行
self._timer_lock = asyncio.Lock() # 用于安全更新计时器
self.planner_llm = LLMRequest(
model=global_config.llm_normal,
temperature=global_config.llm_normal["temp"],
max_tokens=1000,
request_type="action_planning"
)
# Internal state for loop control
self._loop_timer: float = 0.0 # Remaining time for the loop in seconds
self._loop_active: bool = False # Is the loop currently running?
self._loop_task: Optional[asyncio.Task] = None # Stores the main loop task
self._trigger_count_this_activation: int = 0 # Counts triggers within an active period
# Removed pending_replies as processing is now serial within the loop
# self.pending_replies: Dict[str, PendingReply] = {}
async def _initialize(self) -> bool:
"""
Lazy initialization to resolve chat_stream and sub_hf using the provided identifier.
Ensures the instance is ready to handle triggers.
"""
async with self._init_lock:
if self._initialized:
return True
try:
self.chat_stream = chat_manager.get_stream(self.stream_id)
if not self.chat_stream:
logger.error(f"PFChatting-{self.stream_id} 获取ChatStream失败。")
return False
# 子心流(SubHeartflow)可能初始不存在但后续会被创建
# 在需要它的方法中应优雅处理其可能缺失的情况
self.sub_hf = heartflow.get_subheartflow(self.stream_id)
if not self.sub_hf:
logger.warning(f"PFChatting-{self.stream_id} 获取SubHeartflow失败。一些功能可能受限。")
# 决定是否继续初始化。目前允许初始化。
self._initialized = True
logger.info(f"PFChatting-{self.stream_id} 初始化成功。")
return True
except Exception as e:
logger.error(f"PFChatting-{self.stream_id} 初始化失败: {e}")
logger.error(traceback.format_exc())
return False
async def add_time(self):
"""
Adds time to the loop timer with decay and starts the loop if it's not active.
Called externally (e.g., by HeartFC_Chat) to trigger or extend activity.
Durations: 1st trigger = 10s, 2nd = 5s, 3rd+ = 2s.
"""
if not self._initialized:
if not await self._initialize():
logger.error(f"PFChatting-{self.stream_id} 无法添加时间: 未初始化。")
return
async with self._timer_lock:
duration_to_add: float = 0.0
if not self._loop_active: # First trigger for this activation cycle
duration_to_add = 10.0
self._trigger_count_this_activation = 1 # Start counting for this activation
logger.info(f"[{self.stream_id}] First trigger in activation. Adding {duration_to_add:.1f}s.")
else: # Loop is already active, apply decay
self._trigger_count_this_activation += 1
if self._trigger_count_this_activation == 2:
duration_to_add = 5.0
logger.info(f"[{self.stream_id}] 2nd trigger in activation. Adding {duration_to_add:.1f}s.")
else: # 3rd trigger or more
duration_to_add = 2.0
logger.info(f"[{self.stream_id}] {self._trigger_count_this_activation}rd/+ trigger in activation. Adding {duration_to_add:.1f}s.")
new_timer_value = self._loop_timer + duration_to_add
self._loop_timer = max(0, new_timer_value) # Ensure timer doesn't go negative conceptually
logger.info(f"[{self.stream_id}] Timer is now {self._loop_timer:.1f}s.")
if not self._loop_active and self._loop_timer > 0:
logger.info(f"[{self.stream_id}] Timer > 0 and loop not active. Starting PF loop.")
self._loop_active = True
# Cancel previous task just in case (shouldn't happen if logic is correct)
if self._loop_task and not self._loop_task.done():
logger.warning(f"[{self.stream_id}] Found existing loop task unexpectedly during start. Cancelling it.")
self._loop_task.cancel()
self._loop_task = asyncio.create_task(self._run_pf_loop())
# Add callback to reset state if loop finishes or errors out
self._loop_task.add_done_callback(self._handle_loop_completion)
elif self._loop_active:
logger.debug(f"[{self.stream_id}] Loop already active. Timer extended.")
def _handle_loop_completion(self, task: asyncio.Task):
"""Callback executed when the _run_pf_loop task finishes."""
try:
# Check if the task raised an exception
exception = task.exception()
if exception:
logger.error(f"[{self.stream_id}] PF loop task completed with error: {exception}")
logger.error(traceback.format_exc())
else:
logger.info(f"[{self.stream_id}] PF loop task completed normally (timer likely expired or cancelled).")
except asyncio.CancelledError:
logger.info(f"[{self.stream_id}] PF loop task was cancelled.")
finally:
# Reset state regardless of how the task finished
self._loop_active = False
self._loop_task = None
# Ensure lock is released if the loop somehow exited while holding it
if self._processing_lock.locked():
logger.warning(f"[{self.stream_id}] Releasing processing lock after loop task completion.")
self._processing_lock.release()
logger.info(f"[{self.stream_id}] Loop state reset.")
async def _run_pf_loop(self):
"""
主循环,当计时器>0时持续进行计划并可能回复消息
管理每个循环周期的处理锁
"""
logger.info(f"[{self.stream_id}] 开始执行PF循环")
try:
while True:
# 使用计时器锁安全地检查当前计时器值
async with self._timer_lock:
current_timer = self._loop_timer
if current_timer <= 0:
logger.info(f"[{self.stream_id}] 计时器为零或负数({current_timer:.1f}秒)退出PF循环")
break # 退出条件:计时器到期
# 记录循环开始时间
loop_cycle_start_time = time.monotonic()
# 标记本周期是否执行了操作
action_taken_this_cycle = False
# 获取处理锁,确保每个计划-回复-发送周期独占执行
acquired_lock = False
try:
await self._processing_lock.acquire()
acquired_lock = True
logger.debug(f"[{self.stream_id}] 循环获取到处理锁")
# --- Planner ---
# Planner decides action, reasoning, emoji_query, etc.
planner_result = await self._planner() # Modify planner to return decision dict
action = planner_result.get("action", "error")
reasoning = planner_result.get("reasoning", "Planner did not provide reasoning.")
emoji_query = planner_result.get("emoji_query", "")
current_mind = planner_result.get("current_mind", "[Mind unavailable]")
send_emoji_from_tools = planner_result.get("send_emoji_from_tools", "")
observed_messages = planner_result.get("observed_messages", []) # Planner needs to return this
if action == "text_reply":
logger.info(f"[{self.stream_id}] 计划循环决定: 回复文本.")
action_taken_this_cycle = True
# --- 回复器 ---
anchor_message = await self._get_anchor_message(observed_messages)
if not anchor_message:
logger.error(f"[{self.stream_id}] 循环: 无法获取锚点消息用于回复. 跳过周期.")
else:
thinking_id = await self.heartfc_chat._create_thinking_message(anchor_message)
if not thinking_id:
logger.error(f"[{self.stream_id}] 循环: 无法创建思考ID. 跳过周期.")
else:
replier_result = None
try:
# 直接 await 回复器工作
replier_result = await self._replier_work(
observed_messages=observed_messages,
anchor_message=anchor_message,
thinking_id=thinking_id,
current_mind=current_mind,
send_emoji=send_emoji_from_tools
)
except Exception as e_replier:
logger.error(f"[{self.stream_id}] 循环: 回复器工作失败: {e_replier}")
self._cleanup_thinking_message(thinking_id) # 清理思考消息
# 继续循环, 视为非操作周期
if replier_result:
# --- Sender ---
try:
await self._sender(thinking_id, anchor_message, replier_result)
logger.info(f"[{self.stream_id}] 循环: 发送器完成成功.")
except Exception as e_sender:
logger.error(f"[{self.stream_id}] 循环: 发送器失败: {e_sender}")
self._cleanup_thinking_message(thinking_id) # 确保发送失败时清理
# 继续循环, 视为非操作周期
else:
# Replier failed to produce result
logger.warning(f"[{self.stream_id}] 循环: 回复器未产生结果. 跳过发送.")
self._cleanup_thinking_message(thinking_id) # 清理思考消息
elif action == "emoji_reply":
logger.info(f"[{self.stream_id}] 计划循环决定: 回复表情 ('{emoji_query}').")
action_taken_this_cycle = True
anchor = await self._get_anchor_message(observed_messages)
if anchor:
try:
await self.heartfc_chat._handle_emoji(anchor, [], emoji_query)
except Exception as e_emoji:
logger.error(f"[{self.stream_id}] 循环: 发送表情失败: {e_emoji}")
else:
logger.warning(f"[{self.stream_id}] 循环: 无法发送表情, 无法获取锚点.")
elif action == "no_reply":
logger.info(f"[{self.stream_id}] 计划循环决定: 不回复. 原因: {reasoning}")
# Do nothing else, action_taken_this_cycle remains False
elif action == "error":
logger.error(f"[{self.stream_id}] 计划循环返回错误或失败. 原因: {reasoning}")
# 视为非操作周期
else: # Unknown action
logger.warning(f"[{self.stream_id}] 计划循环返回未知动作: {action}. 视为不回复.")
# 视为非操作周期
except Exception as e_cycle:
# Catch errors occurring within the locked section (e.g., planner crash)
logger.error(f"[{self.stream_id}] 循环周期执行时发生错误: {e_cycle}")
logger.error(traceback.format_exc())
# Ensure lock is released if an error occurs before the finally block
if acquired_lock and self._processing_lock.locked():
self._processing_lock.release()
acquired_lock = False # 防止在 finally 块中重复释放
logger.warning(f"[{self.stream_id}] 由于循环周期中的错误释放了处理锁.")
finally:
# Ensure the lock is always released after a cycle
if acquired_lock:
self._processing_lock.release()
logger.debug(f"[{self.stream_id}] 循环释放了处理锁.")
# --- Timer Decrement ---
cycle_duration = time.monotonic() - loop_cycle_start_time
async with self._timer_lock:
self._loop_timer -= cycle_duration
logger.debug(f"[{self.stream_id}] 循环周期耗时 {cycle_duration:.2f}s. 计时器剩余: {self._loop_timer:.1f}s.")
# --- Delay ---
# Add a small delay, especially if no action was taken, to prevent busy-waiting
try:
if not action_taken_this_cycle and cycle_duration < 1.5:
# If nothing happened and cycle was fast, wait a bit longer
await asyncio.sleep(1.5 - cycle_duration)
elif cycle_duration < 0.2: # Minimum delay even if action was taken
await asyncio.sleep(0.2)
except asyncio.CancelledError:
logger.info(f"[{self.stream_id}] Sleep interrupted, likely loop cancellation.")
break # Exit loop if cancelled during sleep
except asyncio.CancelledError:
logger.info(f"[{self.stream_id}] PF loop task received cancellation request.")
except Exception as e_loop_outer:
# Catch errors outside the main cycle lock (should be rare)
logger.error(f"[{self.stream_id}] PF loop encountered unexpected outer error: {e_loop_outer}")
logger.error(traceback.format_exc())
finally:
# Reset trigger count when loop finishes
async with self._timer_lock:
self._trigger_count_this_activation = 0
logger.debug(f"[{self.stream_id}] Trigger count reset to 0 as loop finishes.")
logger.info(f"[{self.stream_id}] PF loop finished execution run.")
# State reset (_loop_active, _loop_task) is handled by _handle_loop_completion callback
async def _planner(self) -> Dict[str, Any]:
"""
规划器 (Planner): 使用LLM根据上下文决定是否和如何回复。
Returns a dictionary containing the decision and context.
{'action': str, 'reasoning': str, 'emoji_query': str, 'current_mind': str,
'send_emoji_from_tools': str, 'observed_messages': List[dict]}
"""
observed_messages: List[dict] = []
tool_result_info = {}
get_mid_memory_id = []
send_emoji_from_tools = "" # Renamed for clarity
current_mind: Optional[str] = None
# --- 获取最新的观察信息 ---
try:
if self.sub_hf and self.sub_hf._get_primary_observation():
observation = self.sub_hf._get_primary_observation()
logger.debug(f"[{self.stream_id}][Planner] 调用 observation.observe()...")
await observation.observe() # 主动观察以获取最新消息
observed_messages = observation.talking_message # 获取更新后的消息列表
logger.debug(f"[{self.stream_id}][Planner] 获取到 {len(observed_messages)} 条观察消息。")
else:
logger.warning(f"[{self.stream_id}][Planner] 无法获取 SubHeartflow 或 Observation 来获取消息。")
except Exception as e:
logger.error(f"[{self.stream_id}][Planner] 获取观察信息时出错: {e}")
logger.error(traceback.format_exc())
# --- 结束获取观察信息 ---
# --- (Moved from _replier_work) 1. 思考前使用工具 ---
try:
observation_context_text = ""
if observed_messages:
context_texts = [msg.get('detailed_plain_text', '') for msg in observed_messages if msg.get('detailed_plain_text')]
observation_context_text = "\n".join(context_texts)
logger.debug(f"[{self.stream_id}][Planner] Context for tools: {observation_context_text[:100]}...")
if observation_context_text and self.sub_hf:
# Ensure SubHeartflow exists for tool use context
tool_result = await self.heartfc_chat.tool_user.use_tool(
message_txt=observation_context_text,
chat_stream=self.chat_stream,
sub_heartflow=self.sub_hf
)
if tool_result.get("used_tools", False):
tool_result_info = tool_result.get("structured_info", {})
logger.debug(f"[{self.stream_id}][Planner] Tool results: {tool_result_info}")
if "mid_chat_mem" in tool_result_info:
get_mid_memory_id = [mem["content"] for mem in tool_result_info["mid_chat_mem"] if "content" in mem]
if "send_emoji" in tool_result_info and tool_result_info["send_emoji"]:
send_emoji_from_tools = tool_result_info["send_emoji"][0].get("content", "") # Use renamed var
elif not self.sub_hf:
logger.warning(f"[{self.stream_id}][Planner] Skipping tool use because SubHeartflow is not available.")
except Exception as e_tool:
logger.error(f"[PFChatting-{self.stream_id}][Planner] Tool use failed: {e_tool}")
# Continue even if tool use fails
# --- 结束工具使用 ---
# 心流思考然后plan
try:
if self.sub_hf:
# Ensure arguments match the current do_thinking_before_reply signature
current_mind, past_mind = await self.sub_hf.do_thinking_before_reply(
chat_stream=self.chat_stream,
extra_info=tool_result_info,
obs_id=get_mid_memory_id,
)
logger.info(f"[{self.stream_id}][Planner] SubHeartflow thought: {current_mind}")
else:
logger.warning(f"[{self.stream_id}][Planner] Skipping SubHeartflow thinking because it is not available.")
current_mind = "[心流思考不可用]" # Set a default/indicator value
except Exception as e_shf:
logger.error(f"[PFChatting-{self.stream_id}][Planner] SubHeartflow thinking failed: {e_shf}")
logger.error(traceback.format_exc())
current_mind = "[心流思考出错]"
# --- 使用 LLM 进行决策 ---
action = "no_reply" # Default action
emoji_query = ""
reasoning = "默认决策或获取决策失败"
llm_error = False # Flag for LLM failure
try:
# 构建提示 (Now includes current_mind)
prompt = self._build_planner_prompt(observed_messages, current_mind)
logger.trace(f"[{self.stream_id}][Planner] Prompt: {prompt}")
# 准备 LLM 请求 Payload
payload = {
"model": self.planner_llm.model_name,
"messages": [{"role": "user", "content": prompt}],
"tools": PLANNER_TOOL_DEFINITION,
"tool_choice": {"type": "function", "function": {"name": "decide_reply_action"}}, # 强制调用此工具
}
logger.debug(f"[{self.stream_id}][Planner] 发送 Planner LLM 请求...")
# 调用 LLM
response = await self.planner_llm._execute_request(
endpoint="/chat/completions", payload=payload, prompt=prompt
)
# 解析 LLM 响应
if len(response) == 3: # 期望返回 content, reasoning_content, tool_calls
_, _, tool_calls = response
if tool_calls and isinstance(tool_calls, list) and len(tool_calls) > 0:
# 通常强制调用后只会有一个 tool_call
tool_call = tool_calls[0]
if tool_call.get("type") == "function" and tool_call.get("function", {}).get("name") == "decide_reply_action":
try:
arguments = json.loads(tool_call["function"]["arguments"])
action = arguments.get("action", "no_reply")
reasoning = arguments.get("reasoning", "未提供理由")
if action == "emoji_reply":
# Planner's decision overrides tool's emoji if action is emoji_reply
emoji_query = arguments.get("emoji_query", send_emoji_from_tools) # Use tool emoji as default if planner asks for emoji
logger.info(f"[{self.stream_id}][Planner] LLM 决策: {action}, 理由: {reasoning}, EmojiQuery: '{emoji_query}'")
except json.JSONDecodeError as json_e:
logger.error(f"[{self.stream_id}][Planner] 解析工具参数失败: {json_e}. Arguments: {tool_call['function'].get('arguments')}")
action = "error"; reasoning = "工具参数解析失败"; llm_error = True
except Exception as parse_e:
logger.error(f"[{self.stream_id}][Planner] 处理工具参数时出错: {parse_e}")
action = "error"; reasoning = "处理工具参数时出错"; llm_error = True
else:
logger.warning(f"[{self.stream_id}][Planner] LLM 未按预期调用 'decide_reply_action' 工具。Tool calls: {tool_calls}")
action = "error"; reasoning = "LLM未调用预期工具"; llm_error = True
else:
logger.warning(f"[{self.stream_id}][Planner] LLM 响应中未包含有效的工具调用。Tool calls: {tool_calls}")
action = "error"; reasoning = "LLM响应无工具调用"; llm_error = True
else:
logger.warning(f"[{self.stream_id}][Planner] LLM 未返回预期的工具调用响应。Response parts: {len(response)}")
action = "error"; reasoning = "LLM响应格式错误"; llm_error = True
except Exception as llm_e:
logger.error(f"[{self.stream_id}][Planner] Planner LLM 调用失败: {llm_e}")
logger.error(traceback.format_exc())
action = "error"; reasoning = f"LLM 调用失败: {llm_e}"; llm_error = True
# --- 返回决策结果 ---
# Note: Lock release is handled by the loop now
return {
"action": action,
"reasoning": reasoning,
"emoji_query": emoji_query, # Specific query if action is emoji_reply
"current_mind": current_mind,
"send_emoji_from_tools": send_emoji_from_tools, # Emoji suggested by pre-thinking tools
"observed_messages": observed_messages,
"llm_error": llm_error # Indicate if LLM decision process failed
}
async def _get_anchor_message(self, observed_messages: List[dict]) -> Optional[MessageRecv]:
"""
重构观察到的最后一条消息作为回复的锚点,
如果重构失败或观察为空,则创建一个占位符。
"""
if not self.chat_stream:
logger.error(f"[PFChatting-{self.stream_id}] 无法获取锚点消息: ChatStream 不可用.")
return None
try:
last_msg_dict = None
if observed_messages:
last_msg_dict = observed_messages[-1]
if last_msg_dict:
try:
# Attempt reconstruction from the last observed message dictionary
anchor_message = MessageRecv(last_msg_dict, chat_stream=self.chat_stream)
# Basic validation
if not (anchor_message and anchor_message.message_info and anchor_message.message_info.message_id and anchor_message.message_info.user_info):
raise ValueError("重构的 MessageRecv 缺少必要信息.")
logger.debug(f"[{self.stream_id}] 重构的锚点消息: ID={anchor_message.message_info.message_id}")
return anchor_message
except Exception as e_reconstruct:
logger.warning(f"[{self.stream_id}] 从观察到的消息重构 MessageRecv 失败: {e_reconstruct}. 创建占位符.")
else:
logger.warning(f"[{self.stream_id}] observed_messages 为空. 创建占位符锚点消息.")
# --- Create Placeholder ---
placeholder_id = f"mid_pf_{int(time.time() * 1000)}"
placeholder_user = UserInfo(user_id="system_trigger", user_nickname="System Trigger", platform=self.chat_stream.platform)
placeholder_msg_info = BaseMessageInfo(
message_id=placeholder_id,
platform=self.chat_stream.platform,
group_info=self.chat_stream.group_info,
user_info=placeholder_user,
time=time.time()
)
placeholder_msg_dict = {
"message_info": placeholder_msg_info.to_dict(),
"processed_plain_text": "[System Trigger Context]", # Placeholder text
"raw_message": "",
"time": placeholder_msg_info.time,
}
anchor_message = MessageRecv(placeholder_msg_dict)
anchor_message.update_chat_stream(self.chat_stream) # Associate with the stream
logger.info(f"[{self.stream_id}] Created placeholder anchor message: ID={anchor_message.message_info.message_id}")
return anchor_message
except Exception as e:
logger.error(f"[PFChatting-{self.stream_id}] Error getting/creating anchor message: {e}")
logger.error(traceback.format_exc())
return None
def _cleanup_thinking_message(self, thinking_id: str):
"""Safely removes the thinking message."""
try:
container = MessageManager().get_container(self.stream_id)
container.remove_message(thinking_id, msg_type=MessageThinking)
logger.debug(f"[{self.stream_id}] Cleaned up thinking message {thinking_id}.")
except Exception as e:
logger.error(f"[{self.stream_id}] Error cleaning up thinking message {thinking_id}: {e}")
async def _sender(self, thinking_id: str, anchor_message: MessageRecv, replier_result: Dict[str, Any]):
"""
发送器 (Sender): 使用HeartFC_Chat的方法发送生成的回复。
被 _run_pf_loop 直接调用和 await。
也处理相关的操作,如发送表情和更新关系。
Raises exception on failure to signal the loop.
"""
# replier_result should contain 'response_set' and 'send_emoji'
response_set = replier_result.get("response_set")
send_emoji = replier_result.get("send_emoji", "") # Emoji determined by tools, passed via replier
if not response_set:
logger.error(f"[PFChatting-{self.stream_id}][Sender-{thinking_id}] Called with empty response_set.")
# Clean up thinking message before raising error
self._cleanup_thinking_message(thinking_id)
raise ValueError("Sender called with no response_set") # Signal failure to loop
first_bot_msg: Optional[MessageSending] = None
send_success = False
try:
# --- Send the main text response ---
logger.debug(f"[{self.stream_id}][Sender-{thinking_id}] Sending response messages...")
# This call implicitly handles replacing the MessageThinking with MessageSending/MessageSet
first_bot_msg = await self.heartfc_chat._send_response_messages(anchor_message, response_set, thinking_id)
if first_bot_msg:
send_success = True # Mark success
logger.info(f"[PFChatting-{self.stream_id}][Sender-{thinking_id}] Successfully sent reply.")
# --- Handle associated emoji (if determined by tools) ---
if send_emoji:
logger.info(f"[PFChatting-{self.stream_id}][Sender-{thinking_id}] Sending associated emoji: {send_emoji}")
try:
# Use first_bot_msg as anchor if available, otherwise fallback to original anchor
emoji_anchor = first_bot_msg if first_bot_msg else anchor_message
await self.heartfc_chat._handle_emoji(emoji_anchor, response_set, send_emoji)
except Exception as e_emoji:
logger.error(f"[PFChatting-{self.stream_id}][Sender-{thinking_id}] Failed to send associated emoji: {e_emoji}")
# Log error but don't fail the whole send process for emoji failure
# --- Update relationship ---
try:
await self.heartfc_chat._update_relationship(anchor_message, response_set)
logger.debug(f"[PFChatting-{self.stream_id}][Sender-{thinking_id}] Updated relationship.")
except Exception as e_rel:
logger.error(f"[PFChatting-{self.stream_id}][Sender-{thinking_id}] Failed to update relationship: {e_rel}")
# Log error but don't fail the whole send process for relationship update failure
else:
# Sending failed (e.g., _send_response_messages found thinking message already gone)
send_success = False
logger.warning(f"[PFChatting-{self.stream_id}][Sender-{thinking_id}] Failed to send reply (maybe thinking message expired or was removed?).")
# No need to clean up thinking message here, _send_response_messages implies it's gone or handled
raise RuntimeError("Sending reply failed, _send_response_messages returned None.") # Signal failure
except Exception as e:
# Catch potential errors during sending or post-send actions
logger.error(f"[PFChatting-{self.stream_id}][Sender-{thinking_id}] Error during sending process: {e}")
logger.error(traceback.format_exc())
# Ensure thinking message is cleaned up if send failed mid-way and wasn't handled
if not send_success:
self._cleanup_thinking_message(thinking_id)
raise # Re-raise the exception to signal failure to the loop
# No finally block needed for lock management
async def shutdown(self):
"""
Gracefully shuts down the PFChatting instance by cancelling the active loop task.
"""
logger.info(f"[{self.stream_id}] Shutting down PFChatting...")
if self._loop_task and not self._loop_task.done():
logger.info(f"[{self.stream_id}] Cancelling active PF loop task.")
self._loop_task.cancel()
try:
# Wait briefly for the task to acknowledge cancellation
await asyncio.wait_for(self._loop_task, timeout=5.0)
except asyncio.CancelledError:
logger.info(f"[{self.stream_id}] PF loop task cancelled successfully.")
except asyncio.TimeoutError:
logger.warning(f"[{self.stream_id}] Timeout waiting for PF loop task cancellation.")
except Exception as e:
logger.error(f"[{self.stream_id}] Error during loop task cancellation: {e}")
else:
logger.info(f"[{self.stream_id}] No active PF loop task found to cancel.")
# Ensure loop state is reset even if task wasn't running or cancellation failed
self._loop_active = False
self._loop_task = None
# Double-check lock state (should be released by loop completion/cancellation handler)
if self._processing_lock.locked():
logger.warning(f"[{self.stream_id}] Releasing processing lock during shutdown.")
self._processing_lock.release()
logger.info(f"[{self.stream_id}] PFChatting shutdown complete.")
def _build_planner_prompt(self, observed_messages: List[dict], current_mind: Optional[str]) -> str:
"""构建 Planner LLM 的提示词 (现在包含 current_mind)"""
prompt = "你是一个聊天机器人助手,正在决定是否以及如何回应当前的聊天。\n"
prompt += f"你的名字是 {global_config.BOT_NICKNAME}\n"
# Add current mind state if available
if current_mind:
prompt += f"\n你当前的内部想法是:\n---\n{current_mind}\n---\n\n"
else:
prompt += "\n你当前没有特别的内部想法。\n"
if observed_messages:
context_text = "\n".join([msg.get('detailed_plain_text', '') for msg in observed_messages if msg.get('detailed_plain_text')])
prompt += "观察到的最新聊天内容如下:\n---\n"
prompt += context_text[:1500] # Limit context length
prompt += "\n---\n"
else:
prompt += "当前没有观察到新的聊天内容。\n"
prompt += "\n请结合你的内部想法和观察到的聊天内容,分析情况并使用 'decide_reply_action' 工具来决定你的最终行动。\n"
prompt += "决策依据:\n"
prompt += "1. 如果聊天内容无聊、与你无关、或者你的内部想法认为不适合回复,选择 'no_reply'\n"
prompt += "2. 如果聊天内容值得回应,且适合用文字表达(参考你的内部想法),选择 'text_reply'\n"
prompt += "3. 如果聊天内容或你的内部想法适合用一个表情来回应,选择 'emoji_reply' 并提供表情主题 'emoji_query'\n"
prompt += "必须调用 'decide_reply_action' 工具并提供 'action''reasoning'"
return prompt
# --- 回复器 (Replier) 的定义 --- #
async def _replier_work(self, observed_messages: List[dict], anchor_message: MessageRecv, thinking_id: str, current_mind: Optional[str], send_emoji: str) -> Optional[Dict[str, Any]]:
"""
回复器 (Replier): 核心逻辑用于生成回复。
被 _run_pf_loop 直接调用和 await。
Returns dict with 'response_set' and 'send_emoji' or None on failure.
"""
response_set: Optional[List[str]] = None
try:
# --- Tool Use and SubHF Thinking are now in _planner ---
# --- Generate Response with LLM ---
logger.debug(f"[{self.stream_id}][Replier-{thinking_id}] Calling LLM to generate response...")
# 注意:实际的生成调用是在 self.heartfc_chat.gpt.generate_response 中
response_set = await self.heartfc_chat.gpt.generate_response(
anchor_message,
thinking_id
# current_mind 不再直接传递给 gpt.generate_response
# 因为 generate_response 内部会通过 thinking_id 或其他方式获取所需上下文
)
if not response_set:
logger.warning(f"[{self.stream_id}][Replier-{thinking_id}] LLM生成了一个空回复集。")
return None # Indicate failure
# --- 准备并返回结果 ---
logger.info(f"[{self.stream_id}][Replier-{thinking_id}] 成功生成了回复集: {' '.join(response_set)[:50]}...")
return {
"response_set": response_set,
"send_emoji": send_emoji, # Pass through the emoji determined earlier (usually by tools)
}
except Exception as e:
logger.error(f"[PFChatting-{self.stream_id}][Replier-{thinking_id}] Unexpected error in replier_work: {e}")
logger.error(traceback.format_exc())
return None # Indicate failure

View File

@@ -0,0 +1,22 @@
Write a new class called PFChatting.
The class is initialized with a chat_stream or a stream_id.
It holds the corresponding sub_heartflow and a chat_stream.
PFChatting has the following components:
Planner: decides whether to reply at all. Based on the observe content in the sub_heartflow it can choose not to reply, reply with text, or reply with an emoji/sticker; this can be implemented with LLM tool calling.
Replier: generates a reply from the available information; most of this code will be largely identical to trigger_reply_generation(stream_id, observed_messages).
(Several repliers (0-3) may run concurrently, each producing a different reply based on the planner's decision at a different moment.)
Checker: since generating a reply takes time, the checker verifies whether a reply is still appropriate after new messages have arrived; if it is, the reply is handed to the sender.
If a message has already been sent, the remaining replies must also take that message into account when checked, to avoid sending near-duplicate content.
Sender: sends the reply to the chat; its body does not need to be reimplemented in PFChatting, just reuse the existing self._send_response_messages(anchor_message, response_set, thinking_id).
When _process_triggered_reply(self, stream_id: str, observed_messages: List[dict]) fires, it no longer performs a standalone reply by itself.
Open questions:
1. Does each PFChatting correspond to exactly one chat_stream, and is it unique? (fix)
2. observe_text is currently passed in as a plain str; should it be a list of message dicts instead? (fix)
3. How should replies that fail the check be handled? (discard them for now)
4. How should similarity be compared? (see the sketch below)
5. How should the planner be written? This part can probably be left out for now.
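A possible sketch for question 4, assuming a plain string-similarity check built on Python's standard difflib (only an illustration of the idea, not a decided approach; the 0.8 threshold is a guess):

from difflib import SequenceMatcher

def too_similar(pending_reply: str, already_sent: str, threshold: float = 0.8) -> bool:
    # The checker could drop a pending reply whose similarity to an
    # already-sent message meets or exceeds the threshold.
    return SequenceMatcher(None, pending_reply, already_sent).ratio() >= threshold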