ref:彻底合并normal和focus,完全基于planner决定target message

This commit is contained in:
SengokuCola
2025-08-11 00:20:08 +08:00
parent 3804124df8
commit a247be0a04
2 changed files with 215 additions and 286 deletions

View File

@@ -90,10 +90,7 @@ class HeartFChatting:
self.relationship_builder = relationship_builder_manager.get_or_create_builder(self.stream_id)
self.expression_learner = expression_learner_manager.get_expression_learner(self.stream_id)
self.loop_mode = ChatMode.NORMAL # 初始循环模式为普通模式
self.last_action = "no_action"
self.action_manager = ActionManager()
self.action_planner = ActionPlanner(chat_id=self.stream_id, action_manager=self.action_manager)
@@ -184,13 +181,9 @@ class HeartFChatting:
async def _energy_loop(self):
while self.running:
await asyncio.sleep(10)
if self.loop_mode == ChatMode.NORMAL:
self.energy_value -= 0.3
self.energy_value = max(self.energy_value, 0.3)
if self.loop_mode == ChatMode.FOCUS:
self.energy_value -= 0.6
self.energy_value = max(self.energy_value, 0.3)
await asyncio.sleep(12)
self.energy_value -= 0.5
self.energy_value = max(self.energy_value, 0.3)
def print_cycle_info(self, cycle_timers):
# 记录循环信息和计时器结果
@@ -199,10 +192,26 @@ class HeartFChatting:
formatted_time = f"{elapsed * 1000:.2f}毫秒" if elapsed < 1 else f"{elapsed:.2f}"
timer_strings.append(f"{name}: {formatted_time}")
# 获取动作类型,兼容新旧格式
action_type = "未知动作"
if hasattr(self, '_current_cycle_detail') and self._current_cycle_detail:
loop_plan_info = self._current_cycle_detail.loop_plan_info
if isinstance(loop_plan_info, dict):
action_result = loop_plan_info.get('action_result', {})
if isinstance(action_result, dict):
# 旧格式action_result是字典
action_type = action_result.get('action_type', '未知动作')
elif isinstance(action_result, list) and action_result:
# 新格式action_result是actions列表
action_type = action_result[0].get('action_type', '未知动作')
elif isinstance(loop_plan_info, list) and loop_plan_info:
# 直接是actions列表的情况
action_type = loop_plan_info[0].get('action_type', '未知动作')
logger.info(
f"{self.log_prefix}{self._current_cycle_detail.cycle_id}次思考,"
f"耗时: {self._current_cycle_detail.end_time - self._current_cycle_detail.start_time:.1f}秒, " # type: ignore
f"选择动作: {self._current_cycle_detail.loop_plan_info.get('action_result', {}).get('action_type', '未知动作')}"
f"选择动作: {action_type}"
+ (f"\n详情: {'; '.join(timer_strings)}" if timer_strings else "")
)
@@ -228,7 +237,7 @@ class HeartFChatting:
logger.info(f"{self.log_prefix} 兴趣度充足")
self.focus_energy = 1
async def _should_process_messages(self, new_message: List[Dict[str, Any]], mode: ChatMode) -> bool:
async def _should_process_messages(self, new_message: List[Dict[str, Any]]) -> tuple[bool,float]:
"""
判断是否应该处理消息
@@ -241,61 +250,56 @@ class HeartFChatting:
"""
new_message_count = len(new_message)
if mode == ChatMode.NORMAL:
# Normal模式简单的消息数量判断
return new_message_count >= self.focus_energy
talk_frequency = global_config.chat.get_current_talk_frequency(self.stream_id)
modified_exit_count_threshold = self.focus_energy / talk_frequency
if new_message_count >= modified_exit_count_threshold:
# 记录兴趣度到列表
total_interest = 0.0
for msg_dict in new_message:
interest_value = msg_dict.get("interest_value", 0.0)
if msg_dict.get("processed_plain_text", ""):
total_interest += interest_value
elif mode == ChatMode.FOCUS:
# Focus模式原有的breaking形式no_reply逻辑
talk_frequency = global_config.chat.get_current_talk_frequency(self.stream_id)
modified_exit_count_threshold = self.focus_energy / talk_frequency
self.recent_interest_records.append(total_interest)
if new_message_count >= modified_exit_count_threshold:
logger.info(
f"{self.log_prefix} 累计消息数量达到{new_message_count}条(>{modified_exit_count_threshold}),结束等待"
)
return True,total_interest/new_message_count
# 检查累计兴趣值
if new_message_count > 0:
accumulated_interest = 0.0
for msg_dict in new_message:
text = msg_dict.get("processed_plain_text", "")
interest_value = msg_dict.get("interest_value", 0.0)
if text:
accumulated_interest += interest_value
# 只在兴趣值变化时输出log
if not hasattr(self, "_last_accumulated_interest") or accumulated_interest != self._last_accumulated_interest:
logger.info(f"{self.log_prefix} breaking形式当前累计兴趣值: {accumulated_interest:.2f}, 当前聊天频率: {talk_frequency:.2f}")
self._last_accumulated_interest = accumulated_interest
if accumulated_interest >= 3 / talk_frequency:
# 记录兴趣度到列表
total_interest = 0.0
for msg_dict in new_message:
interest_value = msg_dict.get("interest_value", 0.0)
if msg_dict.get("processed_plain_text", ""):
total_interest += interest_value
self.recent_interest_records.append(total_interest)
self.recent_interest_records.append(accumulated_interest)
logger.info(
f"{self.log_prefix} 累计消息数量达到{new_message_count}(>{modified_exit_count_threshold}),结束等待"
f"{self.log_prefix} 累计兴趣值达到{accumulated_interest:.2f}(>{5 / talk_frequency}),结束等待"
)
return True
return True,accumulated_interest/new_message_count
# 检查累计兴趣值
if new_message_count > 0:
accumulated_interest = 0.0
for msg_dict in new_message:
text = msg_dict.get("processed_plain_text", "")
interest_value = msg_dict.get("interest_value", 0.0)
if text:
accumulated_interest += interest_value
# 只在兴趣值变化时输出log
if not hasattr(self, "_last_accumulated_interest") or accumulated_interest != self._last_accumulated_interest:
logger.info(f"{self.log_prefix} breaking形式当前累计兴趣值: {accumulated_interest:.2f}, 当前聊天频率: {talk_frequency:.2f}")
self._last_accumulated_interest = accumulated_interest
if accumulated_interest >= 3 / talk_frequency:
# 记录兴趣度到列表
self.recent_interest_records.append(accumulated_interest)
logger.info(
f"{self.log_prefix} 累计兴趣值达到{accumulated_interest:.2f}(>{5 / talk_frequency}),结束等待"
)
return True
# 每10秒输出一次等待状态
if int(time.time() - self.last_read_time) > 0 and int(time.time() - self.last_read_time) % 10 == 0:
logger.info(
f"{self.log_prefix} 已等待{time.time() - self.last_read_time:.0f}秒,累计{new_message_count}条消息,继续等待..."
)
await asyncio.sleep(0.5)
return False
# 每10秒输出一次等待状态
if int(time.time() - self.last_read_time) > 0 and int(time.time() - self.last_read_time) % 10 == 0:
logger.info(
f"{self.log_prefix} 已等待{time.time() - self.last_read_time:.0f}秒,累计{new_message_count}条消息,继续等待..."
)
await asyncio.sleep(0.5)
return False,0.0
async def _loopbody(self):
@@ -307,69 +311,33 @@ class HeartFChatting:
limit_mode="latest",
filter_mai=True,
filter_command=True,
)
# 先进行focus判定
if self.loop_mode == ChatMode.FOCUS:
if self.energy_value <= 1:
logger.info(f"{self.log_prefix} 能量值过低进入normal模式")
self.energy_value = 1
self.loop_mode = ChatMode.NORMAL
return True
elif self.loop_mode == ChatMode.NORMAL:
if global_config.chat.focus_value != 0 and self.energy_value >= 30:
self.loop_mode = ChatMode.FOCUS
return True
)
# 统一的消息处理逻辑
should_process = await self._should_process_messages(recent_messages_dict, self.loop_mode)
should_process,interest_value = await self._should_process_messages(recent_messages_dict)
if self.loop_mode == ChatMode.FOCUS:
# Focus模式处理
if self.last_action == "no_reply" and not should_process:
# 需要继续等待
self.energy_value -= 0.3 / global_config.chat.focus_value
logger.info(f"{self.log_prefix} 能量值减少,当前能量值:{self.energy_value:.1f}")
await asyncio.sleep(0.5)
return True
if should_process:
# Focus模式设置last_read_time并执行observe
self.last_read_time = time.time()
if await self._observe():
self.energy_value += 1 / global_config.chat.focus_value
logger.info(f"{self.log_prefix} 能量值增加,当前能量值:{self.energy_value:.1f}")
return True
elif self.loop_mode == ChatMode.NORMAL:
# Normal模式处理
if should_process:
# Normal模式设置last_read_time为最早消息的时间并调用normal_response
earliest_message_data = recent_messages_dict[0]
self.last_read_time = earliest_message_data.get("time")
if_think = await self.normal_response(earliest_message_data)
if if_think:
factor = max(global_config.chat.focus_value, 0.1)
self.energy_value *= 1.1 * pow(factor, 0.5)
logger.info(f"{self.log_prefix} 进行了思考,能量值按倍数增加,当前能量值:{self.energy_value:.1f}")
else:
self.energy_value += 0.1 * global_config.chat.focus_value
logger.debug(f"{self.log_prefix} 没有进行思考,能量值线性增加,当前能量值:{self.energy_value:.1f}")
if should_process:
earliest_message_data = recent_messages_dict[0]
self.last_read_time = earliest_message_data.get("time")
await self._observe(interest_value = interest_value)
logger.debug(f"{self.log_prefix} 当前能量值:{self.energy_value:.1f}")
return True
else:
# Normal模式消息数量不足等待
await asyncio.sleep(0.5)
return True
else:
# Normal模式消息数量不足等待
await asyncio.sleep(0.5)
return True
return True
async def build_reply_to_str(self, message_data: dict):
person_info_manager = get_person_info_manager()
# 获取 platform如果不存在则从 chat_stream 获取,如果还是 None 则使用默认值
platform = message_data.get("chat_info_platform")
if platform is None:
platform = getattr(self.chat_stream, "platform", "unknown")
person_id = person_info_manager.get_person_id(
message_data.get("chat_info_platform"), # type: ignore
platform, # type: ignore
message_data.get("user_id"), # type: ignore
)
person_name = await person_info_manager.get_value(person_id, "person_name")
@@ -383,15 +351,21 @@ class HeartFChatting:
action_message,
cycle_timers: Dict[str, float],
thinking_id,
plan_result,
actions,
) -> Tuple[Dict[str, Any], str, Dict[str, float]]:
with Timer("回复发送", cycle_timers):
reply_text = await self._send_response(response_set, reply_to_str, loop_start_time, action_message)
# 存储reply action信息
person_info_manager = get_person_info_manager()
# 获取 platform如果不存在则从 chat_stream 获取,如果还是 None 则使用默认值
platform = action_message.get("chat_info_platform")
if platform is None:
platform = getattr(self.chat_stream, "platform", "unknown")
person_id = person_info_manager.get_person_id(
action_message.get("chat_info_platform", ""),
platform,
action_message.get("user_id", ""),
)
person_name = await person_info_manager.get_value(person_id, "person_name")
@@ -410,7 +384,7 @@ class HeartFChatting:
# 构建循环信息
loop_info: Dict[str, Any] = {
"loop_plan_info": {
"action_result": plan_result.get("action_result", {}),
"action_result": actions,
},
"loop_action_info": {
"action_taken": True,
@@ -422,17 +396,44 @@ class HeartFChatting:
return loop_info, reply_text, cycle_timers
async def _observe(self, message_data: Optional[Dict[str, Any]] = None) -> bool:
if not message_data:
message_data = {}
async def _observe(self,interest_value:float = 0.0) -> bool:
action_type = "no_action"
reply_text = "" # 初始化reply_text变量避免UnboundLocalError
reply_to_str = "" # 初始化reply_to_str变量
# 根据interest_value计算概率决定使用哪种planner模式
# interest_value越高越倾向于使用Normal模式
import random
import math
# 使用sigmoid函数将interest_value转换为概率
# 当interest_value为0时概率接近0使用Focus模式
# 当interest_value很高时概率接近1使用Normal模式
def calculate_normal_mode_probability(interest_val: float) -> float:
# 使用sigmoid函数调整参数使概率分布更合理
# 当interest_value = 0时概率约为0.1
# 当interest_value = 1时概率约为0.5
# 当interest_value = 2时概率约为0.8
# 当interest_value = 3时概率约为0.95
k = 2.0 # 控制曲线陡峭程度
x0 = 1.0 # 控制曲线中心点
return 1.0 / (1.0 + math.exp(-k * (interest_val - x0)))
normal_mode_probability = calculate_normal_mode_probability(interest_value)
# 根据概率决定使用哪种模式
if random.random() < normal_mode_probability:
mode = ChatMode.NORMAL
logger.info(f"{self.log_prefix} 基于兴趣值 {interest_value:.2f},概率 {normal_mode_probability:.2f}选择Normal planner模式")
else:
mode = ChatMode.FOCUS
logger.info(f"{self.log_prefix} 基于兴趣值 {interest_value:.2f},概率 {normal_mode_probability:.2f}选择Focus planner模式")
# 创建新的循环信息
cycle_timers, thinking_id = self.start_cycle()
logger.info(f"{self.log_prefix} 开始第{self._cycle_counter}次思考[模式:{self.loop_mode}]")
logger.info(f"{self.log_prefix} 开始第{self._cycle_counter}次思考")
if ENABLE_S4U:
await send_typing()
@@ -452,82 +453,32 @@ class HeartFChatting:
except Exception as e:
logger.error(f"{self.log_prefix} 动作修改失败: {e}")
# 检查是否在normal模式下没有可用动作除了reply相关动作
skip_planner = False
if self.loop_mode == ChatMode.NORMAL:
# 过滤掉reply相关的动作检查是否还有其他动作
non_reply_actions = {
k: v for k, v in available_actions.items() if k not in ["reply", "no_reply", "no_action"]
}
if not non_reply_actions:
skip_planner = True
logger.info(f"{self.log_prefix} Normal模式下没有可用动作直接回复")
# 直接设置为reply动作
action_type = "reply"
reasoning = ""
action_data = {"loop_start_time": loop_start_time}
is_parallel = False
# 构建plan_result用于后续处理
plan_result = {
"action_result": {
"action_type": action_type,
"action_data": action_data,
"reasoning": reasoning,
"timestamp": time.time(),
"is_parallel": is_parallel,
},
"action_prompt": "",
}
target_message = message_data
if not skip_planner:
planner_info = self.action_planner.get_necessary_info()
prompt_info = await self.action_planner.build_planner_prompt(
is_group_chat=planner_info[0],
chat_target_info=planner_info[1],
current_available_actions=planner_info[2],
)
if not await events_manager.handle_mai_events(
EventType.ON_PLAN, None, prompt_info[0], None, self.chat_stream.stream_id
):
return False
with Timer("规划器", cycle_timers):
plan_result, target_message = await self.action_planner.plan(mode=self.loop_mode)
action_result: Dict[str, Any] = plan_result.get("action_result", {}) # type: ignore
action_type, action_data, reasoning, is_parallel = (
action_result.get("action_type", "error"),
action_result.get("action_data", {}),
action_result.get("reasoning", "未提供理由"),
action_result.get("is_parallel", True),
# 执行planner
planner_info = self.action_planner.get_necessary_info()
prompt_info = await self.action_planner.build_planner_prompt(
is_group_chat=planner_info[0],
chat_target_info=planner_info[1],
current_available_actions=planner_info[2],
)
if not await events_manager.handle_mai_events(
EventType.ON_PLAN, None, prompt_info[0], None, self.chat_stream.stream_id
):
return False
with Timer("规划器", cycle_timers):
actions, _= await self.action_planner.plan(
mode=mode,
loop_start_time=loop_start_time,
available_actions=available_actions,
)
action_data["loop_start_time"] = loop_start_time
# action_result: Dict[str, Any] = plan_result.get("action_result", {}) # type: ignore
# action_type, action_data, reasoning, is_parallel = (
# action_result.get("action_type", "error"),
# action_result.get("action_data", {}),
# action_result.get("reasoning", "未提供理由"),
# action_result.get("is_parallel", True),
# )
action_message = message_data or target_message
# 重构后的动作处理逻辑:先汇总所有动作,然后并行执行
actions = []
# 1. 添加Planner取得的动作
actions.append({
"action_type": action_type,
"reasoning": reasoning,
"action_data": action_data,
"action_message": action_message,
"available_actions": available_actions # 添加这个字段
})
# 2. 如果不是reply动作且需要并行执行额外添加reply动作
if action_type != "reply" and is_parallel:
actions.append({
"action_type": "reply",
"action_message": action_message,
"available_actions": available_actions
})
# 3. 并行执行所有动作
async def execute_action(action_info):
@@ -575,7 +526,7 @@ class HeartFChatting:
else:
# 执行回复动作
reply_to_str = await self.build_reply_to_str(action_info["action_message"])
request_type = "chat.replyer"
# 生成回复
gather_timeout = global_config.chat.thinking_timeout
@@ -585,7 +536,7 @@ class HeartFChatting:
message_data=action_info["action_message"],
available_actions=action_info["available_actions"],
reply_to=reply_to_str,
request_type=request_type,
request_type="chat.replyer",
),
timeout=gather_timeout
)
@@ -624,7 +575,7 @@ class HeartFChatting:
action_info["action_message"],
cycle_timers,
thinking_id,
plan_result,
actions,
)
return {
"action_type": "reply",
@@ -634,6 +585,7 @@ class HeartFChatting:
}
except Exception as e:
logger.error(f"{self.log_prefix} 执行动作时出错: {e}")
logger.error(f"{self.log_prefix} 错误信息: {traceback.format_exc()}")
return {
"action_type": action_info["action_type"],
"success": False,
@@ -643,6 +595,8 @@ class HeartFChatting:
}
# 创建所有动作的后台任务
print(actions)
action_tasks = [asyncio.create_task(execute_action(action)) for action in actions]
# 并行执行所有任务
@@ -689,7 +643,7 @@ class HeartFChatting:
# 没有回复信息构建纯动作的loop_info
loop_info = {
"loop_plan_info": {
"action_result": plan_result.get("action_result", {}),
"action_result": actions,
},
"loop_action_info": {
"action_taken": action_success,
@@ -700,7 +654,6 @@ class HeartFChatting:
}
reply_text = action_reply_text
self.last_action = action_type
if ENABLE_S4U:
await stop_typing()
@@ -709,21 +662,17 @@ class HeartFChatting:
self.end_cycle(loop_info, cycle_timers)
self.print_cycle_info(cycle_timers)
if self.loop_mode == ChatMode.NORMAL:
await self.willing_manager.after_generate_reply_handle(message_data.get("message_id", ""))
# await self.willing_manager.after_generate_reply_handle(message_data.get("message_id", ""))
action_type = actions[0]["action_type"] if actions else "no_action"
# 管理no_reply计数器当执行了非no_reply动作时重置计数器
if action_type != "no_reply" and action_type != "no_action":
if action_type != "no_reply":
# no_reply逻辑已集成到heartFC_chat.py中直接重置计数器
self.recent_interest_records.clear()
self.no_reply_consecutive = 0
logger.info(f"{self.log_prefix} 执行了{action_type}动作重置no_reply计数器")
logger.debug(f"{self.log_prefix} 执行了{action_type}动作重置no_reply计数器")
return True
elif action_type == "no_action":
# 当执行回复动作时也重置no_reply计数
self.recent_interest_records.clear()
self.no_reply_consecutive = 0
logger.info(f"{self.log_prefix} 执行了回复动作重置no_reply计数器")
if action_type == "no_reply":
self.no_reply_consecutive += 1
@@ -815,54 +764,6 @@ class HeartFChatting:
traceback.print_exc()
return False, "", ""
async def normal_response(self, message_data: dict) -> bool:
"""
处理接收到的消息。
"兴趣"模式下,判断是否回复并生成内容。
"""
interested_rate = message_data.get("interest_value") or 0.0
self.willing_manager.setup(message_data, self.chat_stream)
reply_probability = await self.willing_manager.get_reply_probability(message_data.get("message_id", ""))
talk_frequency = -1.00
if reply_probability < 1: # 简化逻辑,如果未提及 (reply_probability 为 0),则获取意愿概率
additional_config = message_data.get("additional_config", {})
if additional_config and "maimcore_reply_probability_gain" in additional_config:
reply_probability += additional_config["maimcore_reply_probability_gain"]
reply_probability = min(max(reply_probability, 0), 1) # 确保概率在 0-1 之间
talk_frequency = global_config.chat.get_current_talk_frequency(self.stream_id)
reply_probability = talk_frequency * reply_probability
# 处理表情包
if message_data.get("is_emoji") or message_data.get("is_picid"):
reply_probability = 0
# 打印消息信息
mes_name = self.chat_stream.group_info.group_name if self.chat_stream.group_info else "私聊"
# logger.info(f"[{mes_name}] 当前聊天频率: {talk_frequency:.2f},兴趣值: {interested_rate:.2f},回复概率: {reply_probability * 100:.1f}%")
if reply_probability > 0.05:
logger.info(
f"[{mes_name}]"
f"{message_data.get('user_nickname')}:"
f"{message_data.get('processed_plain_text')}[兴趣:{interested_rate:.2f}][回复概率:{reply_probability * 100:.1f}%]"
)
if random.random() < reply_probability:
await self.willing_manager.before_generate_reply_handle(message_data.get("message_id", ""))
await self._observe(message_data=message_data)
return True
# 意愿管理器注销当前message信息 (无论是否回复,只要处理过就删除)
self.willing_manager.delete(message_data.get("message_id", ""))
return False
async def _generate_response(
self,
message_data: dict,
@@ -904,8 +805,6 @@ class HeartFChatting:
if need_reply:
logger.info(f"{self.log_prefix} 从思考到回复,共有{new_message_count}条新消息,使用引用回复")
else:
logger.info(f"{self.log_prefix} 从思考到回复,共有{new_message_count}条新消息,不使用引用回复")
reply_text = ""
first_replied = False