炸飞hfc,引入afc

This commit is contained in:
Windpicker-owo
2025-09-15 17:51:49 +08:00
parent c52b4daf1a
commit 0d47e237ee
21 changed files with 46 additions and 3198 deletions

View File

@@ -0,0 +1,8 @@
"""
亲和力流模块初始化文件
提供全局的AFC管理器实例
"""
from src.chat.affinity_flow.afc_manager import afc_manager
__all__ = ['afc_manager', 'AFCManager', 'AffinityFlowChatter']

View File

@@ -136,3 +136,5 @@ class AFCManager:
if stream_id in self.affinity_flow_chatters:
self.affinity_flow_chatters[stream_id].update_interest_keywords(new_keywords)
logger.info(f"已更新聊天流 {stream_id} 的兴趣关键词: {list(new_keywords.keys())}")
afc_manager = AFCManager()

View File

@@ -1,454 +0,0 @@
import asyncio
import time
import traceback
import math
import random
from typing import Dict, Any, Tuple
from src.chat.utils.timer_calculator import Timer
from src.common.logger import get_logger
from src.config.config import global_config
from src.chat.planner_actions.planner import ActionPlanner
from src.chat.planner_actions.action_modifier import ActionModifier
from src.person_info.person_info import get_person_info_manager
from src.plugin_system.apis import database_api, generator_api
from src.plugin_system.base.component_types import ChatMode
from src.mais4u.constant_s4u import ENABLE_S4U
from src.chat.chat_loop.hfc_utils import send_typing, stop_typing
from .hfc_context import HfcContext
from .response_handler import ResponseHandler
from .cycle_tracker import CycleTracker
# 日志记录器
logger = get_logger("hfc.processor")
class CycleProcessor:
"""
循环处理器类,负责处理单次思考循环的逻辑。
"""
def __init__(self, context: HfcContext, response_handler: ResponseHandler, cycle_tracker: CycleTracker):
"""
初始化循环处理器
Args:
context: HFC聊天上下文对象包含聊天流、能量值等信息
response_handler: 响应处理器,负责生成和发送回复
cycle_tracker: 循环跟踪器,负责记录和管理每次思考循环的信息
"""
self.context = context
self.response_handler = response_handler
self.cycle_tracker = cycle_tracker
self.action_planner = ActionPlanner(chat_id=self.context.stream_id, action_manager=self.context.action_manager)
self.action_modifier = ActionModifier(
action_manager=self.context.action_manager, chat_id=self.context.stream_id
)
self.log_prefix = self.context.log_prefix
async def _send_and_store_reply(
self,
response_set,
loop_start_time,
action_message,
cycle_timers: Dict[str, float],
thinking_id,
actions,
) -> Tuple[Dict[str, Any], str, Dict[str, float]]:
"""
发送并存储回复信息
Args:
response_set: 回复内容集合
loop_start_time: 循环开始时间
action_message: 动作消息
cycle_timers: 循环计时器
thinking_id: 思考ID
actions: 动作列表
Returns:
Tuple[Dict[str, Any], str, Dict[str, float]]: 循环信息, 回复文本, 循环计时器
"""
# 发送回复
with Timer("回复发送", cycle_timers):
reply_text = await self.response_handler.send_response(response_set, loop_start_time, action_message)
# 存储reply action信息
person_info_manager = get_person_info_manager()
# 获取 platform如果不存在则从 chat_stream 获取,如果还是 None 则使用默认值
platform = action_message.get("chat_info_platform")
if platform is None:
platform = getattr(self.context.chat_stream, "platform", "unknown")
# 获取用户信息并生成回复提示
person_id = person_info_manager.get_person_id(
platform,
action_message.get("chat_info_user_id", ""),
)
person_name = await person_info_manager.get_value(person_id, "person_name")
action_prompt_display = f"你对{person_name}进行了回复:{reply_text}"
# 存储动作信息到数据库
await database_api.store_action_info(
chat_stream=self.context.chat_stream,
action_build_into_prompt=False,
action_prompt_display=action_prompt_display,
action_done=True,
thinking_id=thinking_id,
action_data={"reply_text": reply_text},
action_name="reply",
)
# 构建循环信息
loop_info: Dict[str, Any] = {
"loop_plan_info": {
"action_result": actions,
},
"loop_action_info": {
"action_taken": True,
"reply_text": reply_text,
"command": "",
"taken_time": time.time(),
},
}
return loop_info, reply_text, cycle_timers
async def observe(self, interest_value: float = 0.0) -> str:
"""
观察和处理单次思考循环的核心方法
Args:
interest_value: 兴趣值
Returns:
str: 动作类型
功能说明:
- 开始新的思考循环并记录计时
- 修改可用动作并获取动作列表
- 根据聊天模式和提及情况决定是否跳过规划器
- 执行动作规划或直接回复
- 根据动作类型分发到相应的处理方法
"""
action_type = "no_action"
reply_text = "" # 初始化reply_text变量避免UnboundLocalError
# 使用sigmoid函数将interest_value转换为概率
# 当interest_value为0时概率接近0使用Focus模式
# 当interest_value很高时概率接近1使用Normal模式
def calculate_normal_mode_probability(interest_val: float) -> float:
"""
计算普通模式的概率
Args:
interest_val: 兴趣值
Returns:
float: 概率
"""
# 使用sigmoid函数调整参数使概率分布更合理
# 当interest_value = 0时概率约为0.1
# 当interest_value = 1时概率约为0.5
# 当interest_value = 2时概率约为0.8
# 当interest_value = 3时概率约为0.95
k = 2.0 # 控制曲线陡峭程度
x0 = 1.0 # 控制曲线中心点
return 1.0 / (1.0 + math.exp(-k * (interest_val - x0)))
# 计算普通模式概率
normal_mode_probability = (
calculate_normal_mode_probability(interest_value)
* 0.5
/ global_config.chat.get_current_talk_frequency(self.context.stream_id)
)
# 根据概率决定使用哪种模式
if random.random() < normal_mode_probability:
mode = ChatMode.NORMAL
logger.info(
f"{self.log_prefix} 基于兴趣值 {interest_value:.2f},概率 {normal_mode_probability:.2f}选择Normal planner模式"
)
else:
mode = ChatMode.FOCUS
logger.info(
f"{self.log_prefix} 基于兴趣值 {interest_value:.2f},概率 {normal_mode_probability:.2f}选择Focus planner模式"
)
# 开始新的思考循环
cycle_timers, thinking_id = self.cycle_tracker.start_cycle()
logger.info(f"{self.log_prefix} 开始第{self.context.cycle_counter}次思考")
if ENABLE_S4U and self.context.chat_stream and self.context.chat_stream.user_info:
await send_typing(self.context.chat_stream.user_info.user_id)
loop_start_time = time.time()
# 第一步:动作修改
with Timer("动作修改", cycle_timers):
try:
await self.action_modifier.modify_actions()
available_actions = self.context.action_manager.get_using_actions()
except Exception as e:
logger.error(f"{self.context.log_prefix} 动作修改失败: {e}")
available_actions = {}
# 规划动作
from src.plugin_system.core.event_manager import event_manager
from src.plugin_system import EventType
result = await event_manager.trigger_event(
EventType.ON_PLAN, permission_group="SYSTEM", stream_id=self.context.chat_stream
)
if result and not result.all_continue_process():
raise UserWarning(f"插件{result.get_summary().get('stopped_handlers', '')}于规划前中断了内容生成")
with Timer("规划器", cycle_timers):
actions, _ = await self.action_planner.plan(mode=mode)
async def execute_action(action_info):
"""执行单个动作的通用函数"""
try:
if action_info["action_type"] == "no_action":
return {"action_type": "no_action", "success": True, "reply_text": "", "command": ""}
if action_info["action_type"] == "no_reply":
# 直接处理no_reply逻辑不再通过动作系统
reason = action_info.get("reasoning", "选择不回复")
logger.info(f"{self.log_prefix} 选择不回复,原因: {reason}")
# 存储no_reply信息到数据库
await database_api.store_action_info(
chat_stream=self.context.chat_stream,
action_build_into_prompt=False,
action_prompt_display=reason,
action_done=True,
thinking_id=thinking_id,
action_data={"reason": reason},
action_name="no_reply",
)
return {"action_type": "no_reply", "success": True, "reply_text": "", "command": ""}
elif action_info["action_type"] != "reply" and action_info["action_type"] != "no_action":
# 执行普通动作
with Timer("动作执行", cycle_timers):
success, reply_text, command = await self._handle_action(
action_info["action_type"],
action_info["reasoning"],
action_info["action_data"],
cycle_timers,
thinking_id,
action_info["action_message"],
)
return {
"action_type": action_info["action_type"],
"success": success,
"reply_text": reply_text,
"command": command,
}
else:
# 生成回复
try:
success, response_set, _ = await generator_api.generate_reply(
chat_stream=self.context.chat_stream,
reply_message=action_info["action_message"],
available_actions=available_actions,
enable_tool=global_config.tool.enable_tool,
request_type="chat.replyer",
from_plugin=False,
)
if not success or not response_set:
logger.info(
f"{action_info['action_message'].get('processed_plain_text')} 的回复生成失败"
)
return {"action_type": "reply", "success": False, "reply_text": "", "loop_info": None}
except asyncio.CancelledError:
logger.debug(f"{self.log_prefix} 并行执行:回复生成任务已被取消")
return {"action_type": "reply", "success": False, "reply_text": "", "loop_info": None}
# 发送并存储回复
loop_info, reply_text, cycle_timers_reply = await self._send_and_store_reply(
response_set,
loop_start_time,
action_info["action_message"],
cycle_timers,
thinking_id,
actions,
)
return {"action_type": "reply", "success": True, "reply_text": reply_text, "loop_info": loop_info}
except Exception as e:
logger.error(f"{self.log_prefix} 执行动作时出错: {e}")
logger.error(f"{self.log_prefix} 错误信息: {traceback.format_exc()}")
return {
"action_type": action_info["action_type"],
"success": False,
"reply_text": "",
"loop_info": None,
"error": str(e),
}
# 分离 reply 动作和其他动作
reply_actions = [a for a in actions if a.get("action_type") == "reply"]
other_actions = [a for a in actions if a.get("action_type") != "reply"]
reply_loop_info = None
reply_text_from_reply = ""
other_actions_results = []
# 1. 首先串行执行所有 reply 动作(通常只有一个)
if reply_actions:
logger.info(f"{self.log_prefix} 正在执行文本回复...")
for action in reply_actions:
action_message = action.get("action_message")
if not action_message:
logger.warning(f"{self.log_prefix} reply 动作缺少 action_message跳过")
continue
# 检查是否是空的DatabaseMessages对象
if hasattr(action_message, 'chat_info') and hasattr(action_message.chat_info, 'user_info'):
target_user_id = action_message.chat_info.user_info.user_id
else:
# 如果是字典格式,使用原来的方式
target_user_id = action_message.get("chat_info_user_id", "")
if not target_user_id:
logger.warning(f"{self.log_prefix} reply 动作的 action_message 缺少用户ID跳过")
continue
if target_user_id == global_config.bot.qq_account and not global_config.chat.allow_reply_self:
logger.warning("选取的reply的目标为bot自己跳过reply action")
continue
result = await execute_action(action)
if isinstance(result, Exception):
logger.error(f"{self.log_prefix} 回复动作执行异常: {result}")
continue
if result.get("success"):
reply_loop_info = result.get("loop_info")
reply_text_from_reply = result.get("reply_text", "")
else:
logger.warning(f"{self.log_prefix} 回复动作执行失败")
# 2. 然后并行执行所有其他动作
if other_actions:
logger.info(f"{self.log_prefix} 正在执行附加动作: {[a.get('action_type') for a in other_actions]}")
other_action_tasks = [asyncio.create_task(execute_action(action)) for action in other_actions]
results = await asyncio.gather(*other_action_tasks, return_exceptions=True)
for i, result in enumerate(results):
if isinstance(result, BaseException):
logger.error(f"{self.log_prefix} 附加动作执行异常: {result}")
continue
other_actions_results.append(result)
# 构建最终的循环信息
if reply_loop_info:
loop_info = reply_loop_info
# 将其他动作的结果合并到loop_info中
if "other_actions" not in loop_info["loop_action_info"]:
loop_info["loop_action_info"]["other_actions"] = []
loop_info["loop_action_info"]["other_actions"].extend(other_actions_results)
reply_text = reply_text_from_reply
else:
# 没有回复信息构建纯动作的loop_info
# 即使没有回复,也要正确处理其他动作
final_action_taken = any(res.get("success", False) for res in other_actions_results)
final_reply_text = " ".join(res.get("reply_text", "") for res in other_actions_results if res.get("reply_text"))
final_command = " ".join(res.get("command", "") for res in other_actions_results if res.get("command"))
loop_info = {
"loop_plan_info": {
"action_result": actions,
},
"loop_action_info": {
"action_taken": final_action_taken,
"reply_text": final_reply_text,
"command": final_command,
"taken_time": time.time(),
"other_actions": other_actions_results,
},
}
reply_text = final_reply_text
# 停止正在输入状态
if ENABLE_S4U:
await stop_typing()
# 结束循环
self.context.chat_instance.cycle_tracker.end_cycle(loop_info, cycle_timers)
self.context.chat_instance.cycle_tracker.print_cycle_info(cycle_timers)
action_type = actions[0]["action_type"] if actions else "no_action"
return action_type
async def _handle_action(
self, action, reasoning, action_data, cycle_timers, thinking_id, action_message
) -> tuple[bool, str, str]:
"""
处理具体的动作执行
Args:
action: 动作名称
reasoning: 执行理由
action_data: 动作数据
cycle_timers: 循环计时器
thinking_id: 思考ID
action_message: 动作消息
Returns:
tuple: (执行是否成功, 回复文本, 命令文本)
功能说明:
- 创建对应的动作处理器
- 执行动作并捕获异常
- 返回执行结果供上级方法整合
"""
if not self.context.chat_stream:
return False, "", ""
try:
# 创建动作处理器
action_handler = self.context.action_manager.create_action(
action_name=action,
action_data=action_data,
reasoning=reasoning,
cycle_timers=cycle_timers,
thinking_id=thinking_id,
chat_stream=self.context.chat_stream,
log_prefix=self.context.log_prefix,
action_message=action_message,
)
if not action_handler:
# 动作处理器创建失败,尝试回退机制
logger.warning(f"{self.context.log_prefix} 创建动作处理器失败: {action},尝试回退方案")
# 获取当前可用的动作
available_actions = self.context.action_manager.get_using_actions()
fallback_action = None
# 回退优先级reply > 第一个可用动作
if "reply" in available_actions:
fallback_action = "reply"
elif available_actions:
fallback_action = list(available_actions.keys())[0]
if fallback_action and fallback_action != action:
logger.info(f"{self.context.log_prefix} 使用回退动作: {fallback_action}")
action_handler = self.context.action_manager.create_action(
action_name=fallback_action,
action_data=action_data,
reasoning=f"原动作'{action}'不可用,自动回退。{reasoning}",
cycle_timers=cycle_timers,
thinking_id=thinking_id,
chat_stream=self.context.chat_stream,
log_prefix=self.context.log_prefix,
action_message=action_message,
)
if not action_handler:
logger.error(f"{self.context.log_prefix} 回退方案也失败,无法创建任何动作处理器")
return False, "", ""
# 执行动作
success, reply_text = await action_handler.handle_action()
return success, reply_text, ""
except Exception as e:
logger.error(f"{self.context.log_prefix} 处理{action}时出错: {e}")
traceback.print_exc()
return False, "", ""

View File

@@ -1,114 +0,0 @@
import time
from typing import Dict, Any, Tuple
from src.common.logger import get_logger
from src.chat.chat_loop.hfc_utils import CycleDetail
from .hfc_context import HfcContext
logger = get_logger("hfc")
class CycleTracker:
def __init__(self, context: HfcContext):
"""
初始化循环跟踪器
Args:
context: HFC聊天上下文对象
功能说明:
- 负责跟踪和记录每次思考循环的详细信息
- 管理循环的开始、结束和信息存储
"""
self.context = context
def start_cycle(self, is_proactive: bool = False) -> Tuple[Dict[str, float], str]:
"""
开始新的思考循环
Args:
is_proactive: 标记这个循环是否由主动思考发起
Returns:
tuple: (循环计时器字典, 思考ID字符串)
功能说明:
- 增加循环计数器
- 创建新的循环详情对象
- 生成唯一的思考ID
- 初始化循环计时器
"""
if not is_proactive:
self.context.cycle_counter += 1
cycle_id = self.context.cycle_counter if not is_proactive else f"{self.context.cycle_counter}.p"
self.context.current_cycle_detail = CycleDetail(cycle_id)
self.context.current_cycle_detail.thinking_id = f"tid{str(round(time.time(), 2))}"
cycle_timers = {}
return cycle_timers, self.context.current_cycle_detail.thinking_id
def end_cycle(self, loop_info: Dict[str, Any], cycle_timers: Dict[str, float]):
"""
结束当前思考循环
Args:
loop_info: 循环信息,包含规划和动作信息
cycle_timers: 循环计时器,记录各阶段耗时
功能说明:
- 设置循环详情的完整信息
- 将当前循环加入历史记录
- 记录计时器和结束时间
- 打印循环统计信息
"""
if self.context.current_cycle_detail:
self.context.current_cycle_detail.set_loop_info(loop_info)
self.context.history_loop.append(self.context.current_cycle_detail)
self.context.current_cycle_detail.timers = cycle_timers
self.context.current_cycle_detail.end_time = time.time()
self.print_cycle_info(cycle_timers)
def print_cycle_info(self, cycle_timers: Dict[str, float]):
"""
打印循环统计信息
Args:
cycle_timers: 循环计时器字典
功能说明:
- 格式化各阶段的耗时信息
- 计算总体循环持续时间
- 输出详细的性能统计日志
- 显示选择的动作类型
"""
if not self.context.current_cycle_detail:
return
timer_strings = []
for name, elapsed in cycle_timers.items():
formatted_time = f"{elapsed * 1000:.2f}毫秒" if elapsed < 1 else f"{elapsed:.2f}"
timer_strings.append(f"{name}: {formatted_time}")
# 获取动作类型,兼容新旧格式
# 获取动作类型
action_type = "未知动作"
if self.context.current_cycle_detail:
loop_plan_info = self.context.current_cycle_detail.loop_plan_info
actions = loop_plan_info.get("action_result")
if isinstance(actions, list) and actions:
# 从actions列表中提取所有action_type
action_types = [a.get("action_type", "未知") for a in actions]
action_type = ", ".join(action_types)
elif isinstance(actions, dict):
# 兼容旧格式
action_type = actions.get("action_type", "未知动作")
if self.context.current_cycle_detail.end_time and self.context.current_cycle_detail.start_time:
duration = self.context.current_cycle_detail.end_time - self.context.current_cycle_detail.start_time
logger.info(
f"{self.context.log_prefix}{self.context.current_cycle_detail.cycle_id}次思考,"
f"耗时: {duration:.1f}秒, "
f"选择动作: {action_type}" + (f"\n详情: {'; '.join(timer_strings)}" if timer_strings else "")
)

View File

@@ -1,162 +0,0 @@
import asyncio
import time
from typing import Optional
from src.common.logger import get_logger
from src.config.config import global_config
from .hfc_context import HfcContext
from src.chat.chat_loop.sleep_manager import sleep_manager
logger = get_logger("hfc")
class EnergyManager:
def __init__(self, context: HfcContext):
"""
初始化能量管理器
Args:
context: HFC聊天上下文对象
功能说明:
- 管理聊天机器人的能量值系统
- 根据聊天模式自动调整能量消耗
- 控制能量值的衰减和记录
"""
self.context = context
self._energy_task: Optional[asyncio.Task] = None
self.last_energy_log_time = 0
self.energy_log_interval = 90
async def start(self):
"""
启动能量管理器
功能说明:
- 检查运行状态,避免重复启动
- 创建能量循环异步任务
- 设置任务完成回调
- 记录启动日志
"""
if self.context.running and not self._energy_task:
self._energy_task = asyncio.create_task(self._energy_loop())
self._energy_task.add_done_callback(self._handle_energy_completion)
logger.info(f"{self.context.log_prefix} 能量管理器已启动")
async def stop(self):
"""
停止能量管理器
功能说明:
- 取消正在运行的能量循环任务
- 等待任务完全停止
- 记录停止日志
"""
if self._energy_task and not self._energy_task.done():
self._energy_task.cancel()
await asyncio.sleep(0)
logger.info(f"{self.context.log_prefix} 能量管理器已停止")
async def _energy_loop(self):
"""
能量与睡眠压力管理的主循环
功能说明:
- 每10秒执行一次能量更新
- 根据群聊配置设置固定的聊天模式和能量值
- 在自动模式下根据聊天模式进行能量衰减
- NORMAL模式每次衰减0.3FOCUS模式每次衰减0.6
- 确保能量值不低于0.3的最小值
"""
while self.context.running:
await asyncio.sleep(10)
if not self.context.chat_stream:
continue
# 判断当前是否为睡眠时间
is_sleeping = sleep_manager.SleepManager().is_sleeping()
if is_sleeping:
# 睡眠中:减少睡眠压力
decay_per_10s = global_config.sleep_system.sleep_pressure_decay_rate / 6
self.context.sleep_pressure -= decay_per_10s
self.context.sleep_pressure = max(self.context.sleep_pressure, 0)
self._log_sleep_pressure_change("睡眠压力释放")
self.context.save_context_state()
else:
# 清醒时:处理能量衰减
is_group_chat = self.context.chat_stream.group_info is not None
if is_group_chat:
self.context.energy_value = 25
await asyncio.sleep(12)
self.context.energy_value -= 0.5
self.context.energy_value = max(self.context.energy_value, 0.3)
self._log_energy_change("能量值衰减")
self.context.save_context_state()
def _should_log_energy(self) -> bool:
"""
判断是否应该记录能量变化日志
Returns:
bool: 如果距离上次记录超过间隔时间则返回True
功能说明:
- 控制能量日志的记录频率,避免日志过于频繁
- 默认间隔90秒记录一次详细日志
- 其他时间使用调试级别日志
"""
current_time = time.time()
if current_time - self.last_energy_log_time >= self.energy_log_interval:
self.last_energy_log_time = current_time
return True
return False
def increase_sleep_pressure(self):
"""
在执行动作后增加睡眠压力
"""
increment = global_config.sleep_system.sleep_pressure_increment
self.context.sleep_pressure += increment
self.context.sleep_pressure = min(self.context.sleep_pressure, 100.0) # 设置一个100的上限
self._log_sleep_pressure_change("执行动作,睡眠压力累积")
self.context.save_context_state()
def _log_energy_change(self, action: str, reason: str = ""):
"""
记录能量变化日志
Args:
action: 能量变化的动作描述
reason: 可选的变化原因
功能说明:
- 根据时间间隔决定使用info还是debug级别的日志
- 格式化能量值显示(保留一位小数)
- 可选择性地包含变化原因
"""
if self._should_log_energy():
log_message = f"{self.context.log_prefix} {action},当前能量值:{self.context.energy_value:.1f}"
if reason:
log_message = (
f"{self.context.log_prefix} {action}{reason},当前能量值:{self.context.energy_value:.1f}"
)
logger.info(log_message)
else:
log_message = f"{self.context.log_prefix} {action},当前能量值:{self.context.energy_value:.1f}"
if reason:
log_message = (
f"{self.context.log_prefix} {action}{reason},当前能量值:{self.context.energy_value:.1f}"
)
logger.debug(log_message)
def _log_sleep_pressure_change(self, action: str):
"""
记录睡眠压力变化日志
"""
# 使用与能量日志相同的频率控制
if self._should_log_energy():
logger.info(f"{self.context.log_prefix} {action},当前睡眠压力:{self.context.sleep_pressure:.1f}")
else:
logger.debug(f"{self.context.log_prefix} {action},当前睡眠压力:{self.context.sleep_pressure:.1f}")

View File

@@ -1,614 +0,0 @@
import asyncio
import time
import traceback
import random
from typing import Optional, List, Dict, Any
from collections import deque
from src.common.logger import get_logger
from src.config.config import global_config
from src.person_info.relationship_builder_manager import relationship_builder_manager
from src.chat.express.expression_learner import expression_learner_manager
from src.chat.chat_loop.sleep_manager.sleep_manager import SleepManager, SleepState
from src.plugin_system.apis import message_api
from .hfc_context import HfcContext
from .energy_manager import EnergyManager
from .proactive.proactive_thinker import ProactiveThinker
from .cycle_processor import CycleProcessor
from .response_handler import ResponseHandler
from .cycle_tracker import CycleTracker
from .sleep_manager.wakeup_manager import WakeUpManager
from .proactive.events import ProactiveTriggerEvent
logger = get_logger("hfc")
class HeartFChatting:
def __init__(self, chat_id: str):
"""
初始化心跳聊天管理器
Args:
chat_id: 聊天ID标识符
功能说明:
- 创建聊天上下文和所有子管理器
- 初始化循环跟踪器、响应处理器、循环处理器等核心组件
- 设置能量管理器、主动思考器和普通模式处理器
- 初始化聊天模式并记录初始化完成日志
"""
self.context = HfcContext(chat_id)
self.cycle_tracker = CycleTracker(self.context)
self.response_handler = ResponseHandler(self.context)
self.cycle_processor = CycleProcessor(self.context, self.response_handler, self.cycle_tracker)
self.energy_manager = EnergyManager(self.context)
self.proactive_thinker = ProactiveThinker(self.context, self.cycle_processor)
self.wakeup_manager = WakeUpManager(self.context)
self.sleep_manager = SleepManager()
# 将唤醒度管理器设置到上下文中
self.context.wakeup_manager = self.wakeup_manager
self.context.energy_manager = self.energy_manager
self.context.sleep_manager = self.sleep_manager
# 将HeartFChatting实例设置到上下文中以便其他组件可以调用其方法
self.context.chat_instance = self
self._loop_task: Optional[asyncio.Task] = None
self._proactive_monitor_task: Optional[asyncio.Task] = None
# 记录最近3次的兴趣度
self.recent_interest_records: deque = deque(maxlen=3)
self._initialize_chat_mode()
logger.info(f"{self.context.log_prefix} HeartFChatting 初始化完成")
def _initialize_chat_mode(self):
"""
初始化聊天模式
功能说明:
- 检测是否为群聊环境
- 根据全局配置设置强制聊天模式
- 在focus模式下设置能量值为35
- 在normal模式下设置能量值为15
- 如果是auto模式则保持默认设置
"""
is_group_chat = self.context.chat_stream.group_info is not None if self.context.chat_stream else False
if is_group_chat and global_config.chat.group_chat_mode != "auto":
self.context.energy_value = 25
async def start(self):
"""
启动心跳聊天系统
功能说明:
- 检查是否已经在运行,避免重复启动
- 初始化关系构建器和表达学习器
- 启动能量管理器和主动思考器
- 创建主聊天循环任务并设置完成回调
- 记录启动完成日志
"""
if self.context.running:
return
self.context.running = True
self.context.relationship_builder = relationship_builder_manager.get_or_create_builder(self.context.stream_id)
self.context.expression_learner = expression_learner_manager.get_expression_learner(self.context.stream_id)
# 启动主动思考监视器
if global_config.chat.enable_proactive_thinking:
self._proactive_monitor_task = asyncio.create_task(self._proactive_monitor_loop())
self._proactive_monitor_task.add_done_callback(self._handle_proactive_monitor_completion)
logger.info(f"{self.context.log_prefix} 主动思考监视器已启动")
await self.wakeup_manager.start()
self._loop_task = asyncio.create_task(self._main_chat_loop())
self._loop_task.add_done_callback(self._handle_loop_completion)
logger.info(f"{self.context.log_prefix} HeartFChatting 启动完成")
async def stop(self):
"""
停止心跳聊天系统
功能说明:
- 检查是否正在运行,避免重复停止
- 设置运行状态为False
- 停止能量管理器和主动思考器
- 取消主聊天循环任务
- 记录停止完成日志
"""
if not self.context.running:
return
self.context.running = False
# 停止主动思考监视器
if self._proactive_monitor_task and not self._proactive_monitor_task.done():
self._proactive_monitor_task.cancel()
await asyncio.sleep(0)
logger.info(f"{self.context.log_prefix} 主动思考监视器已停止")
await self.wakeup_manager.stop()
if self._loop_task and not self._loop_task.done():
self._loop_task.cancel()
await asyncio.sleep(0)
logger.info(f"{self.context.log_prefix} HeartFChatting 已停止")
def _handle_loop_completion(self, task: asyncio.Task):
"""
处理主循环任务完成
Args:
task: 完成的异步任务对象
功能说明:
- 处理任务异常完成的情况
- 区分正常停止和异常终止
- 记录相应的日志信息
- 处理取消任务的情况
"""
try:
if exception := task.exception():
logger.error(f"{self.context.log_prefix} HeartFChatting: 脱离了聊天(异常): {exception}")
logger.error(traceback.format_exc())
else:
logger.info(f"{self.context.log_prefix} HeartFChatting: 脱离了聊天 (外部停止)")
except asyncio.CancelledError:
logger.info(f"{self.context.log_prefix} HeartFChatting: 结束了聊天")
def _handle_proactive_monitor_completion(self, task: asyncio.Task):
"""
处理主动思考监视器任务完成
Args:
task: 完成的异步任务对象
功能说明:
- 处理任务异常完成的情况
- 记录任务正常结束或被取消的日志
"""
try:
if exception := task.exception():
logger.error(f"{self.context.log_prefix} 主动思考监视器异常: {exception}")
else:
logger.info(f"{self.context.log_prefix} 主动思考监视器正常结束")
except asyncio.CancelledError:
logger.info(f"{self.context.log_prefix} 主动思考监视器被取消")
async def _proactive_monitor_loop(self):
"""
主动思考监视器循环
功能说明:
- 定期检查是否需要进行主动思考
- 计算聊天沉默时间,并与动态思考间隔比较
- 当沉默时间超过阈值时,触发主动思考
- 处理思考过程中的异常
"""
while self.context.running:
await asyncio.sleep(15)
if not self._should_enable_proactive_thinking():
continue
current_time = time.time()
silence_duration = current_time - self.context.last_message_time
target_interval = self._get_dynamic_thinking_interval()
if silence_duration >= target_interval:
try:
formatted_time = self._format_duration(silence_duration)
event = ProactiveTriggerEvent(
source="silence_monitor",
reason=f"聊天已沉默 {formatted_time}",
metadata={"silence_duration": silence_duration},
)
await self.proactive_thinker.think(event)
self.context.last_message_time = current_time
except Exception as e:
logger.error(f"{self.context.log_prefix} 主动思考触发执行出错: {e}")
logger.error(traceback.format_exc())
def _should_enable_proactive_thinking(self) -> bool:
"""
判断是否应启用主动思考
Returns:
bool: 如果应启用主动思考则返回True否则返回False
功能说明:
- 检查全局配置和特定聊天设置
- 支持按群聊和私聊分别配置
- 支持白名单模式,只在特定聊天中启用
"""
if not self.context.chat_stream:
return False
is_group_chat = self.context.chat_stream.group_info is not None
if is_group_chat and not global_config.chat.proactive_thinking_in_group:
return False
if not is_group_chat and not global_config.chat.proactive_thinking_in_private:
return False
stream_parts = self.context.stream_id.split(":")
current_chat_identifier = f"{stream_parts}:{stream_parts}" if len(stream_parts) >= 2 else self.context.stream_id
enable_list = getattr(
global_config.chat,
"proactive_thinking_enable_in_groups" if is_group_chat else "proactive_thinking_enable_in_private",
[],
)
return not enable_list or current_chat_identifier in enable_list
def _get_dynamic_thinking_interval(self) -> float:
"""
获取动态思考间隔时间
Returns:
float: 思考间隔秒数
功能说明:
- 尝试从timing_utils导入正态分布间隔函数
- 根据配置计算动态间隔,增加随机性
- 在无法导入或计算出错时,回退到固定的间隔
"""
try:
from src.utils.timing_utils import get_normal_distributed_interval
base_interval = global_config.chat.proactive_thinking_interval
delta_sigma = getattr(global_config.chat, "delta_sigma", 120)
if base_interval <= 0:
base_interval = abs(base_interval)
if delta_sigma < 0:
delta_sigma = abs(delta_sigma)
if base_interval == 0 and delta_sigma == 0:
return 300
if delta_sigma == 0:
return base_interval
sigma_percentage = delta_sigma / base_interval if base_interval > 0 else delta_sigma / 1000
return get_normal_distributed_interval(base_interval, sigma_percentage, 1, 86400, use_3sigma_rule=True)
except ImportError:
logger.warning(f"{self.context.log_prefix} timing_utils不可用使用固定间隔")
return max(300, abs(global_config.chat.proactive_thinking_interval))
except Exception as e:
logger.error(f"{self.context.log_prefix} 动态间隔计算出错: {e},使用固定间隔")
return max(300, abs(global_config.chat.proactive_thinking_interval))
def _format_duration(self, seconds: float) -> str:
"""
格式化时长为可读字符串
Args:
seconds: 时长秒数
Returns:
str: 格式化后的字符串 (例如 "1小时2分3秒")
"""
hours = int(seconds // 3600)
minutes = int((seconds % 3600) // 60)
secs = int(seconds % 60)
parts = []
if hours > 0:
parts.append(f"{hours}小时")
if minutes > 0:
parts.append(f"{minutes}")
if secs > 0 or not parts:
parts.append(f"{secs}")
return "".join(parts)
async def _main_chat_loop(self):
"""
主聊天循环
功能说明:
- 持续运行聊天处理循环
- 只有在有新消息时才进行思考循环
- 无新消息时等待新消息到达(由主动思考系统单独处理主动发言)
- 处理取消和异常情况
- 在异常时尝试重新启动循环
"""
try:
while self.context.running:
has_new_messages = await self._loop_body()
if has_new_messages:
# 有新消息时,继续快速检查是否还有更多消息
await asyncio.sleep(1)
else:
# 无新消息时,等待较长时间再检查
# 这里只是为了定期检查系统状态,不进行思考循环
# 真正的新消息响应依赖于消息到达时的通知
await asyncio.sleep(1.0)
except asyncio.CancelledError:
logger.info(f"{self.context.log_prefix} 麦麦已关闭聊天")
except Exception:
logger.error(f"{self.context.log_prefix} 麦麦聊天意外错误将于3s后尝试重新启动")
print(traceback.format_exc())
await asyncio.sleep(3)
self._loop_task = asyncio.create_task(self._main_chat_loop())
logger.error(f"{self.context.log_prefix} 结束了当前聊天循环")
async def _loop_body(self) -> bool:
"""
单次循环体处理
Returns:
bool: 是否处理了新消息
功能说明:
- 检查是否处于睡眠模式,如果是则处理唤醒度逻辑
- 获取最近的新消息(过滤机器人自己的消息和命令)
- 只有在有新消息时才进行思考循环处理
- 更新最后消息时间和读取时间
- 根据当前聊天模式执行不同的处理逻辑
- FOCUS模式直接处理所有消息并检查退出条件
- NORMAL模式检查进入FOCUS模式的条件并通过normal_mode_handler处理消息
"""
# --- 核心状态更新 ---
await self.sleep_manager.update_sleep_state(self.wakeup_manager)
current_sleep_state = self.sleep_manager.get_current_sleep_state()
is_sleeping = current_sleep_state == SleepState.SLEEPING
is_in_insomnia = current_sleep_state == SleepState.INSOMNIA
# 核心修复:在睡眠模式(包括失眠)下获取消息时,不过滤命令消息,以确保@消息能被接收
filter_command_flag = not (is_sleeping or is_in_insomnia)
recent_messages = message_api.get_messages_by_time_in_chat(
chat_id=self.context.stream_id,
start_time=self.context.last_read_time,
end_time=time.time(),
limit=10,
limit_mode="latest",
filter_mai=True,
filter_command=filter_command_flag,
)
has_new_messages = bool(recent_messages)
new_message_count = len(recent_messages)
# 只有在有新消息时才进行思考循环处理
if has_new_messages:
self.context.last_message_time = time.time()
self.context.last_read_time = time.time()
# --- 专注模式安静群组检查 ---
quiet_groups = global_config.chat.focus_mode_quiet_groups
if quiet_groups and self.context.chat_stream:
is_group_chat = self.context.chat_stream.group_info is not None
if is_group_chat:
try:
platform = self.context.chat_stream.platform
group_id = self.context.chat_stream.group_info.group_id
# 兼容不同QQ适配器的平台名称
is_qq_platform = platform in ["qq", "napcat"]
current_chat_identifier = f"{platform}:{group_id}"
config_identifier_for_qq = f"qq:{group_id}"
is_in_quiet_list = (current_chat_identifier in quiet_groups or
(is_qq_platform and config_identifier_for_qq in quiet_groups))
if is_in_quiet_list:
is_mentioned_in_batch = False
for msg in recent_messages:
if msg.get("is_mentioned"):
is_mentioned_in_batch = True
break
if not is_mentioned_in_batch:
logger.info(f"{self.context.log_prefix} 在专注安静模式下,因未被提及而忽略了消息。")
return True # 消耗消息但不做回复
except Exception as e:
logger.error(f"{self.context.log_prefix} 检查专注安静群组时出错: {e}")
# 处理唤醒度逻辑
if current_sleep_state in [SleepState.SLEEPING, SleepState.PREPARING_SLEEP, SleepState.INSOMNIA]:
self._handle_wakeup_messages(recent_messages)
# 再次获取最新状态,因为 handle_wakeup 可能导致状态变为 WOKEN_UP
current_sleep_state = self.sleep_manager.get_current_sleep_state()
if current_sleep_state == SleepState.SLEEPING:
# 只有在纯粹的 SLEEPING 状态下才跳过消息处理
return True
if current_sleep_state == SleepState.WOKEN_UP:
logger.info(f"{self.context.log_prefix} 从睡眠中被唤醒,将处理积压的消息。")
# 根据聊天模式处理新消息
should_process, interest_value = await self._should_process_messages(recent_messages)
if not should_process:
# 消息数量不足或兴趣不够,等待
await asyncio.sleep(0.5)
return True # Skip rest of the logic for this iteration
# Messages should be processed
action_type = await self.cycle_processor.observe(interest_value=interest_value)
# 管理no_reply计数器
if action_type != "no_reply":
self.recent_interest_records.clear()
self.context.no_reply_consecutive = 0
logger.debug(f"{self.context.log_prefix} 执行了{action_type}动作重置no_reply计数器")
else: # action_type == "no_reply"
self.context.no_reply_consecutive += 1
self._determine_form_type()
# 在一轮动作执行完毕后,增加睡眠压力
if self.context.energy_manager and global_config.sleep_system.enable_insomnia_system:
if action_type not in ["no_reply", "no_action"]:
self.context.energy_manager.increase_sleep_pressure()
# 如果成功观察,增加能量值并重置累积兴趣值
self.context.energy_value += 1 / global_config.chat.focus_value
# 重置累积兴趣值,因为消息已经被成功处理
self.context.breaking_accumulated_interest = 0.0
logger.info(
f"{self.context.log_prefix} 能量值增加,当前能量值:{self.context.energy_value:.1f},重置累积兴趣值"
)
# 更新上一帧的睡眠状态
self.context.was_sleeping = is_sleeping
# --- 重新入睡逻辑 ---
# 如果被吵醒了,并且在一定时间内没有新消息,则尝试重新入睡
if self.sleep_manager.get_current_sleep_state() == SleepState.WOKEN_UP and not has_new_messages:
re_sleep_delay = global_config.sleep_system.re_sleep_delay_minutes * 60
# 使用 last_message_time 来判断空闲时间
if time.time() - self.context.last_message_time > re_sleep_delay:
logger.info(
f"{self.context.log_prefix} 已被唤醒且超过 {re_sleep_delay / 60} 分钟无新消息,尝试重新入睡。"
)
self.sleep_manager.reset_sleep_state_after_wakeup()
# 保存HFC上下文状态
self.context.save_context_state()
return has_new_messages
def _handle_wakeup_messages(self, messages):
"""
处理休眠状态下的消息,累积唤醒度
Args:
messages: 消息列表
功能说明:
- 区分私聊和群聊消息
- 检查群聊消息是否艾特了机器人
- 调用唤醒度管理器累积唤醒度
- 如果达到阈值则唤醒并进入愤怒状态
"""
if not self.wakeup_manager:
return
is_private_chat = self.context.chat_stream.group_info is None if self.context.chat_stream else False
for message in messages:
is_mentioned = False
# 检查群聊消息是否艾特了机器人
if not is_private_chat:
# 最终修复:直接使用消息对象中由上游处理好的 is_mention 字段。
# 该字段在 message.py 的 MessageRecv._process_single_segment 中被设置。
if message.get("is_mentioned"):
is_mentioned = True
# 累积唤醒度
woke_up = self.wakeup_manager.add_wakeup_value(is_private_chat, is_mentioned)
if woke_up:
logger.info(f"{self.context.log_prefix} 被消息吵醒,进入愤怒状态!")
break
def _determine_form_type(self) -> str:
"""判断使用哪种形式的no_reply"""
# 检查是否启用breaking模式
if not getattr(global_config.chat, "enable_breaking_mode", False):
logger.info(f"{self.context.log_prefix} breaking模式已禁用使用waiting形式")
self.context.focus_energy = 1
return "waiting"
# 如果连续no_reply次数少于3次使用waiting形式
if self.context.no_reply_consecutive <= 3:
self.context.focus_energy = 1
return "waiting"
else:
# 使用累积兴趣值而不是最近3次的记录
total_interest = self.context.breaking_accumulated_interest
# 计算调整后的阈值
adjusted_threshold = 1 / global_config.chat.get_current_talk_frequency(self.context.stream_id)
logger.info(
f"{self.context.log_prefix} 累积兴趣值: {total_interest:.2f}, 调整后阈值: {adjusted_threshold:.2f}"
)
# 如果累积兴趣值小于阈值进入breaking形式
if total_interest < adjusted_threshold:
logger.info(f"{self.context.log_prefix} 累积兴趣度不足进入breaking形式")
self.context.focus_energy = random.randint(3, 6)
return "breaking"
else:
logger.info(f"{self.context.log_prefix} 累积兴趣度充足使用waiting形式")
self.context.focus_energy = 1
return "waiting"
async def _should_process_messages(self, new_message: List[Dict[str, Any]]) -> tuple[bool, float]:
"""
统一判断是否应该处理消息的函数
根据当前循环模式和消息内容决定是否继续处理
"""
if not new_message:
return False, 0.0
new_message_count = len(new_message)
talk_frequency = global_config.chat.get_current_talk_frequency(self.context.stream_id)
modified_exit_count_threshold = self.context.focus_energy * 0.5 / talk_frequency
modified_exit_interest_threshold = 1.5 / talk_frequency
# 计算当前批次消息的兴趣值
batch_interest = 0.0
for msg_dict in new_message:
interest_value = msg_dict.get("interest_value", 0.0)
if msg_dict.get("processed_plain_text", ""):
batch_interest += interest_value
# 在breaking形式下累积所有消息的兴趣值
if new_message_count > 0:
self.context.breaking_accumulated_interest += batch_interest
total_interest = self.context.breaking_accumulated_interest
else:
total_interest = self.context.breaking_accumulated_interest
if new_message_count >= modified_exit_count_threshold:
# 记录兴趣度到列表
self.recent_interest_records.append(total_interest)
# 重置累积兴趣值,因为已经达到了消息数量阈值
self.context.breaking_accumulated_interest = 0.0
logger.info(
f"{self.context.log_prefix} 累计消息数量达到{new_message_count}条(>{modified_exit_count_threshold:.1f}),结束等待,累积兴趣值: {total_interest:.2f}"
)
return True, total_interest / new_message_count
# 检查累计兴趣值
if new_message_count > 0:
# 只在兴趣值变化时输出log
if not hasattr(self, "_last_accumulated_interest") or total_interest != self._last_accumulated_interest:
logger.info(
f"{self.context.log_prefix} breaking形式当前累积兴趣值: {total_interest:.2f}, 专注度: {global_config.chat.focus_value:.1f}"
)
self._last_accumulated_interest = total_interest
if total_interest >= modified_exit_interest_threshold:
# 记录兴趣度到列表
self.recent_interest_records.append(total_interest)
# 重置累积兴趣值,因为已经达到了兴趣值阈值
self.context.breaking_accumulated_interest = 0.0
logger.info(
f"{self.context.log_prefix} 累计兴趣值达到{total_interest:.2f}(>{modified_exit_interest_threshold:.1f}),结束等待"
)
return True, total_interest / new_message_count
# 每10秒输出一次等待状态
if (
int(time.time() - self.context.last_read_time) > 0
and int(time.time() - self.context.last_read_time) % 10 == 0
):
logger.info(
f"{self.context.log_prefix} 已等待{time.time() - self.context.last_read_time:.0f}秒,累计{new_message_count}条消息,累积兴趣{total_interest:.1f},继续等待..."
)
await asyncio.sleep(0.5)
return False, 0.0

View File

@@ -1,84 +0,0 @@
from typing import List, Optional, TYPE_CHECKING
import time
from src.chat.message_receive.chat_stream import ChatStream, get_chat_manager
from src.person_info.relationship_builder_manager import RelationshipBuilder
from src.chat.express.expression_learner import ExpressionLearner
from src.chat.planner_actions.action_manager import ActionManager
from src.chat.chat_loop.hfc_utils import CycleDetail
from src.config.config import global_config
if TYPE_CHECKING:
from .sleep_manager.wakeup_manager import WakeUpManager
from .energy_manager import EnergyManager
from .heartFC_chat import HeartFChatting
from .sleep_manager.sleep_manager import SleepManager
class HfcContext:
def __init__(self, chat_id: str):
"""
初始化HFC聊天上下文
Args:
chat_id: 聊天ID标识符
功能说明:
- 存储和管理单个聊天会话的所有状态信息
- 包含聊天流、关系构建器、表达学习器等核心组件
- 管理聊天模式、能量值、时间戳等关键状态
- 提供循环历史记录和当前循环详情的存储
- 集成唤醒度管理器,处理休眠状态下的唤醒机制
Raises:
ValueError: 如果找不到对应的聊天流
"""
self.stream_id: str = chat_id
self.chat_stream: Optional[ChatStream] = get_chat_manager().get_stream(self.stream_id)
if not self.chat_stream:
raise ValueError(f"无法找到聊天流: {self.stream_id}")
self.log_prefix = f"[{get_chat_manager().get_stream_name(self.stream_id) or self.stream_id}]"
self.relationship_builder: Optional[RelationshipBuilder] = None
self.expression_learner: Optional[ExpressionLearner] = None
self.energy_value = self.chat_stream.energy_value
self.sleep_pressure = self.chat_stream.sleep_pressure
self.was_sleeping = False # 用于检测睡眠状态的切换
self.last_message_time = time.time()
self.last_read_time = time.time() - 10
# 从聊天流恢复breaking累积兴趣值
self.breaking_accumulated_interest = getattr(self.chat_stream, "breaking_accumulated_interest", 0.0)
self.action_manager = ActionManager()
self.running: bool = False
self.history_loop: List[CycleDetail] = []
self.cycle_counter = 0
self.current_cycle_detail: Optional[CycleDetail] = None
# 唤醒度管理器 - 延迟初始化以避免循环导入
self.wakeup_manager: Optional["WakeUpManager"] = None
self.energy_manager: Optional["EnergyManager"] = None
self.sleep_manager: Optional["SleepManager"] = None
# 从聊天流获取focus_energy如果没有则使用配置文件中的值
self.focus_energy = getattr(self.chat_stream, "focus_energy", global_config.chat.focus_value)
self.no_reply_consecutive = 0
self.total_interest = 0.0
# breaking形式下的累积兴趣值
self.breaking_accumulated_interest = 0.0
# 引用HeartFChatting实例以便其他组件可以调用其方法
self.chat_instance: "HeartFChatting"
def save_context_state(self):
"""将当前状态保存到聊天流"""
if self.chat_stream:
self.chat_stream.energy_value = self.energy_value
self.chat_stream.sleep_pressure = self.sleep_pressure
self.chat_stream.focus_energy = self.focus_energy
self.chat_stream.no_reply_consecutive = self.no_reply_consecutive
self.chat_stream.breaking_accumulated_interest = self.breaking_accumulated_interest

View File

@@ -1,172 +0,0 @@
import time
from typing import Optional, Dict, Any, Union
from src.common.logger import get_logger
from src.chat.message_receive.chat_stream import get_chat_manager
from src.plugin_system.apis import send_api
from maim_message.message_base import GroupInfo
logger = get_logger("hfc")
class CycleDetail:
"""
循环信息记录类
功能说明:
- 记录单次思考循环的详细信息
- 包含循环ID、思考ID、时间戳等基本信息
- 存储循环的规划信息和动作信息
- 提供序列化和转换功能
"""
def __init__(self, cycle_id: Union[int, str]):
"""
初始化循环详情记录
Args:
cycle_id: 循环ID用于标识循环的顺序
功能说明:
- 设置循环基本标识信息
- 初始化时间戳和计时器
- 准备循环信息存储容器
"""
self.cycle_id = cycle_id
self.thinking_id = ""
self.start_time = time.time()
self.end_time: Optional[float] = None
self.timers: Dict[str, float] = {}
self.loop_plan_info: Dict[str, Any] = {}
self.loop_action_info: Dict[str, Any] = {}
def to_dict(self) -> Dict[str, Any]:
"""
将循环信息转换为字典格式
Returns:
dict: 包含所有循环信息的字典,已处理循环引用和序列化问题
功能说明:
- 递归转换复杂对象为可序列化格式
- 防止循环引用导致的无限递归
- 限制递归深度避免栈溢出
- 只保留基本数据类型和可序列化的值
"""
def convert_to_serializable(obj, depth=0, seen=None):
if seen is None:
seen = set()
# 防止递归过深
if depth > 5: # 降低递归深度限制
return str(obj)
# 防止循环引用
obj_id = id(obj)
if obj_id in seen:
return str(obj)
seen.add(obj_id)
try:
if hasattr(obj, "to_dict"):
# 对于有to_dict方法的对象直接调用其to_dict方法
return obj.to_dict()
elif isinstance(obj, dict):
# 对于字典,只保留基本类型和可序列化的值
return {
k: convert_to_serializable(v, depth + 1, seen)
for k, v in obj.items()
if isinstance(k, (str, int, float, bool))
}
elif isinstance(obj, (list, tuple)):
# 对于列表和元组,只保留可序列化的元素
return [
convert_to_serializable(item, depth + 1, seen)
for item in obj
if not isinstance(item, (dict, list, tuple))
or isinstance(item, (str, int, float, bool, type(None)))
]
elif isinstance(obj, (str, int, float, bool, type(None))):
return obj
else:
return str(obj)
finally:
seen.remove(obj_id)
return {
"cycle_id": self.cycle_id,
"start_time": self.start_time,
"end_time": self.end_time,
"timers": self.timers,
"thinking_id": self.thinking_id,
"loop_plan_info": convert_to_serializable(self.loop_plan_info),
"loop_action_info": convert_to_serializable(self.loop_action_info),
}
def set_loop_info(self, loop_info: Dict[str, Any]):
"""
设置循环信息
Args:
loop_info: 包含循环规划和动作信息的字典
功能说明:
- 从传入的循环信息中提取规划和动作信息
- 更新当前循环详情的相关字段
"""
self.loop_plan_info = loop_info["loop_plan_info"]
self.loop_action_info = loop_info["loop_action_info"]
async def send_typing(user_id):
"""
发送打字状态指示
功能说明:
- 创建内心聊天流(用于状态显示)
- 发送typing状态消息
- 不存储到消息记录中
- 用于S4U功能的视觉反馈
"""
group_info = GroupInfo(platform="amaidesu_default", group_id="114514", group_name="内心")
chat = await get_chat_manager().get_or_create_stream(
platform="amaidesu_default",
user_info=None,
group_info=group_info,
)
from plugin_system.core.event_manager import event_manager
from src.plugins.built_in.napcat_adapter_plugin.event_types import NapcatEvent
# 设置正在输入状态
await event_manager.trigger_event(NapcatEvent.PERSONAL.SET_INPUT_STATUS,user_id=user_id,event_type=1)
await send_api.custom_to_stream(
message_type="state", content="typing", stream_id=chat.stream_id, storage_message=False
)
async def stop_typing():
"""
停止打字状态指示
功能说明:
- 创建内心聊天流(用于状态显示)
- 发送stop_typing状态消息
- 不存储到消息记录中
- 结束S4U功能的视觉反馈
"""
group_info = GroupInfo(platform="amaidesu_default", group_id="114514", group_name="内心")
chat = await get_chat_manager().get_or_create_stream(
platform="amaidesu_default",
user_info=None,
group_info=group_info,
)
await send_api.custom_to_stream(
message_type="state", content="stop_typing", stream_id=chat.stream_id, storage_message=False
)

View File

@@ -1,14 +0,0 @@
from dataclasses import dataclass, field
from typing import Optional, Dict, Any
@dataclass
class ProactiveTriggerEvent:
"""
主动思考触发事件的数据类
"""
source: str # 触发源的标识,例如 "silence_monitor", "insomnia_manager"
reason: str # 触发的具体原因,例如 "聊天已沉默10分钟", "深夜emo"
metadata: Optional[Dict[str, Any]] = field(default_factory=dict) # 可选的元数据,用于传递额外信息
related_message_id: Optional[str] = None # 关联的消息ID用于加载上下文

View File

@@ -1,319 +0,0 @@
import time
import traceback
from typing import TYPE_CHECKING, Dict, Any
from src.common.logger import get_logger
from src.plugin_system.base.component_types import ChatMode
from ..hfc_context import HfcContext
from .events import ProactiveTriggerEvent
from src.plugin_system.apis import generator_api
from src.plugin_system.apis.generator_api import process_human_text
from src.schedule.schedule_manager import schedule_manager
from src.plugin_system import tool_api
from src.config.config import global_config
from src.chat.utils.chat_message_builder import get_raw_msg_before_timestamp_with_chat, build_readable_messages_with_id
from src.mood.mood_manager import mood_manager
from src.common.database.sqlalchemy_database_api import store_action_info, db_get
from src.common.database.sqlalchemy_models import Messages
if TYPE_CHECKING:
from ..cycle_processor import CycleProcessor
logger = get_logger("hfc")
class ProactiveThinker:
"""
主动思考器,负责处理和执行主动思考事件。
当接收到 ProactiveTriggerEvent 时,它会根据事件内容进行一系列决策和操作,
例如调整情绪、调用规划器生成行动,并最终可能产生一个主动的回复。
"""
def __init__(self, context: HfcContext, cycle_processor: "CycleProcessor"):
"""
初始化主动思考器。
Args:
context (HfcContext): HFC聊天上下文对象提供了当前聊天会话的所有背景信息。
cycle_processor (CycleProcessor): 循环处理器,用于执行主动思考后产生的动作。
功能说明:
- 接收并处理主动思考事件 (ProactiveTriggerEvent)。
- 在思考前根据事件类型执行预处理操作,如修改当前情绪状态。
- 调用行动规划器 (Action Planner) 来决定下一步应该做什么。
- 如果规划结果是发送消息则调用生成器API生成回复并发送。
"""
self.context = context
self.cycle_processor = cycle_processor
async def think(self, trigger_event: ProactiveTriggerEvent):
"""
主动思考的统一入口API。
这是外部触发主动思考时调用的主要方法。
Args:
trigger_event (ProactiveTriggerEvent): 描述触发上下文的事件对象,包含了思考的来源和原因。
"""
logger.info(
f"{self.context.log_prefix} 接收到主动思考事件: "
f"来源='{trigger_event.source}', 原因='{trigger_event.reason}'"
)
try:
# 步骤 1: 根据事件类型执行思考前的准备工作,例如调整情绪。
await self._prepare_for_thinking(trigger_event)
# 步骤 2: 执行核心的思考和决策逻辑。
await self._execute_proactive_thinking(trigger_event)
except Exception as e:
# 捕获并记录在思考过程中发生的任何异常。
logger.error(f"{self.context.log_prefix} 主动思考 think 方法执行异常: {e}")
logger.error(traceback.format_exc())
async def _prepare_for_thinking(self, trigger_event: ProactiveTriggerEvent):
"""
根据事件类型,在正式思考前执行准备工作。
目前主要是处理来自失眠管理器的事件,并据此调整情绪。
Args:
trigger_event (ProactiveTriggerEvent): 触发事件。
"""
# 目前只处理来自失眠管理器(insomnia_manager)的事件
if trigger_event.source != "insomnia_manager":
return
try:
# 获取当前聊天的情绪对象
mood_obj = mood_manager.get_mood_by_chat_id(self.context.stream_id)
new_mood = None
# 根据失眠的不同原因设置对应的情绪
if trigger_event.reason == "low_pressure":
new_mood = "精力过剩,毫无睡意"
elif trigger_event.reason == "random":
new_mood = "深夜emo胡思乱想"
elif trigger_event.reason == "goodnight":
new_mood = "有点困了,准备睡觉了"
# 如果成功匹配到了新的情绪,则更新情绪状态
if new_mood:
mood_obj.mood_state = new_mood
mood_obj.last_change_time = time.time()
logger.info(
f"{self.context.log_prefix}'{trigger_event.reason}'"
f"情绪状态被强制更新为: {mood_obj.mood_state}"
)
except Exception as e:
logger.error(f"{self.context.log_prefix} 设置失眠情绪时出错: {e}")
async def _execute_proactive_thinking(self, trigger_event: ProactiveTriggerEvent):
"""
执行主动思考的核心逻辑。
它会调用规划器来决定是否要采取行动,以及采取什么行动。
Args:
trigger_event (ProactiveTriggerEvent): 触发事件。
"""
try:
actions, _ = await self.cycle_processor.action_planner.plan(mode=ChatMode.PROACTIVE)
action_result = actions[0] if actions else {}
action_type = action_result.get("action_type")
if action_type == "proactive_reply":
await self._generate_proactive_content_and_send(action_result, trigger_event)
elif action_type not in ["do_nothing", "no_action"]:
await self.cycle_processor._handle_action(
action=action_result["action_type"],
reasoning=action_result.get("reasoning", ""),
action_data=action_result.get("action_data", {}),
cycle_timers={},
thinking_id="",
action_message=action_result.get("action_message")
)
else:
logger.info(f"{self.context.log_prefix} 主动思考决策: 保持沉默")
except Exception as e:
logger.error(f"{self.context.log_prefix} 主动思考执行异常: {e}")
logger.error(traceback.format_exc())
async def _generate_proactive_content_and_send(self, action_result: Dict[str, Any], trigger_event: ProactiveTriggerEvent):
"""
获取实时信息,构建最终的生成提示词,并生成和发送主动回复。
Args:
action_result (Dict[str, Any]): 规划器返回的动作结果。
trigger_event (ProactiveTriggerEvent): 触发事件。
"""
try:
topic = action_result.get("action_data", {}).get("topic", "随便聊聊")
logger.info(f"{self.context.log_prefix} 主动思考确定主题: '{topic}'")
schedule_block = "你今天没有日程安排。"
if global_config.planning_system.schedule_enable:
if current_activity := schedule_manager.get_current_activity():
schedule_block = f"你当前正在:{current_activity}"
news_block = "暂时没有获取到最新资讯。"
if trigger_event.source != "reminder_system":
# 增加搜索前决策
should_search_prompt = f"""
# 搜索决策
## 任务
判断是否有必要为了话题“{topic}”进行网络搜索。
## 判断标准
- **需要搜索**:时事新闻、知识查询、具体事件等需要外部信息的话题。
- **无需搜索**:日常关心、个人感受、延续已有对话等不需要外部信息的话题。
## 你的决策
输出`SEARCH`或`SKIP`。
"""
from src.llm_models.utils_model import LLMRequest
from src.config.config import model_config
decision_llm = LLMRequest(
model_set=model_config.model_task_config.planner,
request_type="planner"
)
decision, _ = await decision_llm.generate_response_async(prompt=should_search_prompt)
if "SEARCH" in decision:
try:
web_search_tool = tool_api.get_tool_instance("web_search")
if web_search_tool and topic:
try:
search_result_dict = await web_search_tool.execute(function_args={"keyword": topic, "max_results": 10})
if search_result_dict and not search_result_dict.get("error"):
news_block = search_result_dict.get("content", "未能提取有效资讯。")
elif search_result_dict:
logger.warning(f"{self.context.log_prefix} 网络搜索返回错误: {search_result_dict.get('error')}")
except Exception as e:
logger.error(f"{self.context.log_prefix} 网络搜索执行失败: {e}")
else:
logger.warning(f"{self.context.log_prefix} 未找到 web_search 工具实例或主题为空。")
except Exception as e:
logger.error(f"{self.context.log_prefix} 主动思考时网络搜索失败: {e}")
message_list = get_raw_msg_before_timestamp_with_chat(
chat_id=self.context.stream_id,
timestamp=time.time(),
limit=int(global_config.chat.max_context_size * 0.3),
)
chat_context_block, _ = build_readable_messages_with_id(messages=message_list)
from src.llm_models.utils_model import LLMRequest
from src.config.config import model_config
bot_name = global_config.bot.nickname
confirmation_prompt = f"""# 主动回复二次确认
## 基本信息
你的名字是{bot_name},准备主动发起关于"{topic}"的话题。
## 最近的聊天内容
{chat_context_block}
## 合理判断标准
请检查以下条件,如果**所有条件都合理**就可以回复:
1. **回应检查**:检查你({bot_name})发送的最后一条消息之后,是否有其他人发言。如果没有,则大概率应该保持沉默。
2. **话题补充**:只有当你认为准备发起的话题是对上一条无人回应消息的**有价值的补充**时,才可以在上一条消息无人回应的情况下继续发言。
3. **时间合理性**当前时间是否在深夜凌晨2点-6点这种不适合主动聊天的时段
4. **内容价值**:这个话题"{topic}"是否有意义,不是完全无关紧要的内容?
5. **重复避免**:你准备说的话题是否与你自己的上一条消息明显重复?
6. **自然性**:在当前上下文中主动提起这个话题是否自然合理?
## 输出要求
如果判断应该跳过比如上一条消息无人回应、深夜时段、无意义话题、重复内容输出SKIP_PROACTIVE_REPLY
其他情况都应该输出PROCEED_TO_REPLY
请严格按照上述格式输出,不要添加任何解释。"""
planner_llm = LLMRequest(
model_set=model_config.model_task_config.planner,
request_type="planner"
)
confirmation_result, _ = await planner_llm.generate_response_async(prompt=confirmation_prompt)
if not confirmation_result or "SKIP_PROACTIVE_REPLY" in confirmation_result:
logger.info(f"{self.context.log_prefix} 决策模型二次确认决定跳过主动回复")
return
bot_name = global_config.bot.nickname
personality = global_config.personality
identity_block = (
f"你的名字是{bot_name}\n"
f"关于你:{personality.personality_core},并且{personality.personality_side}\n"
f"你的身份是{personality.identity},平时说话风格是{personality.reply_style}"
)
mood_block = f"你现在的心情是:{mood_manager.get_mood_by_chat_id(self.context.stream_id).mood_state}"
final_prompt = f"""
## 你的角色
{identity_block}
## 你的心情
{mood_block}
## 你今天的日程安排
{schedule_block}
## 关于你准备讨论的话题"{topic}"的最新信息
{news_block}
## 最近的聊天内容
{chat_context_block}
## 任务
你现在想要主动说些什么。话题是"{topic}",但这只是一个参考方向。
根据最近的聊天内容,你可以:
- 如果是想关心朋友,就自然地询问他们的情况
- 如果想起了之前的话题,就问问后来怎么样了
- 如果有什么想分享的想法,就自然地开启话题
- 如果只是想闲聊,就随意地说些什么
## 要求
- 像真正的朋友一样,自然地表达关心或好奇
- 不要过于正式,要口语化和亲切
- 结合你的角色设定,保持温暖的风格
- 直接输出你想说的话,不要解释为什么要说
请输出一条简短、自然的主动发言。
"""
response_text = await generator_api.generate_response_custom(
chat_stream=self.context.chat_stream,
prompt=final_prompt,
request_type="chat.replyer.proactive",
)
if response_text:
response_set = process_human_text(
content=response_text,
enable_splitter=global_config.response_splitter.enable,
enable_chinese_typo=global_config.chinese_typo.enable,
)
await self.cycle_processor.response_handler.send_response(
response_set, time.time(), action_result.get("action_message")
)
await store_action_info(
chat_stream=self.context.chat_stream,
action_name="proactive_reply",
action_data={"topic": topic, "response": response_text},
action_prompt_display=f"主动发起对话: {topic}",
action_done=True,
)
else:
logger.error(f"{self.context.log_prefix} 主动思考生成回复失败。")
except Exception as e:
logger.error(f"{self.context.log_prefix} 生成主动回复内容时异常: {e}")
logger.error(traceback.format_exc())

View File

@@ -1,184 +0,0 @@
import time
import random
from typing import Dict, Any, Tuple
from src.common.logger import get_logger
from src.plugin_system.apis import send_api, message_api, database_api
from src.person_info.person_info import get_person_info_manager
from .hfc_context import HfcContext
# 导入反注入系统
# 日志记录器
logger = get_logger("hfc")
anti_injector_logger = get_logger("anti_injector")
class ResponseHandler:
"""
响应处理器类,负责生成和发送机器人的回复。
"""
def __init__(self, context: HfcContext):
"""
初始化响应处理器
Args:
context: HFC聊天上下文对象
功能说明:
- 负责生成和发送机器人的回复
- 处理回复的格式化和发送逻辑
- 管理回复状态和日志记录
"""
self.context = context
async def generate_and_send_reply(
self,
response_set,
reply_to_str,
loop_start_time,
action_message,
cycle_timers: Dict[str, float],
thinking_id,
plan_result,
) -> Tuple[Dict[str, Any], str, Dict[str, float]]:
"""
生成并发送回复的主方法
Args:
response_set: 生成的回复内容集合
reply_to_str: 回复目标字符串
loop_start_time: 循环开始时间
action_message: 动作消息数据
cycle_timers: 循环计时器
thinking_id: 思考ID
plan_result: 规划结果
Returns:
tuple: (循环信息, 回复文本, 计时器信息)
功能说明:
- 发送生成的回复内容
- 存储动作信息到数据库
- 构建并返回完整的循环信息
- 用于上级方法的状态跟踪
"""
reply_text = await self.send_response(response_set, loop_start_time, action_message)
person_info_manager = get_person_info_manager()
# 获取平台信息
platform = "default"
if self.context.chat_stream:
platform = (
action_message.get("chat_info_platform")
or action_message.get("user_platform")
or self.context.chat_stream.platform
)
# 获取用户信息并生成回复提示
user_id = action_message.get("user_id", "")
person_id = person_info_manager.get_person_id(platform, user_id)
person_name = await person_info_manager.get_value(person_id, "person_name")
action_prompt_display = f"你对{person_name}进行了回复:{reply_text}"
# 存储动作信息到数据库
await database_api.store_action_info(
chat_stream=self.context.chat_stream,
action_build_into_prompt=False,
action_prompt_display=action_prompt_display,
action_done=True,
thinking_id=thinking_id,
action_data={"reply_text": reply_text, "reply_to": reply_to_str},
action_name="reply",
)
# 构建循环信息
loop_info: Dict[str, Any] = {
"loop_plan_info": {
"action_result": plan_result.get("action_result", {}),
},
"loop_action_info": {
"action_taken": True,
"reply_text": reply_text,
"command": "",
"taken_time": time.time(),
},
}
return loop_info, reply_text, cycle_timers
async def send_response(self, reply_set, thinking_start_time, message_data) -> str:
"""
发送回复内容的具体实现
Args:
reply_set: 回复内容集合,包含多个回复段
reply_to: 回复目标
thinking_start_time: 思考开始时间
message_data: 消息数据
Returns:
str: 完整的回复文本
功能说明:
- 检查是否有新消息需要回复
- 处理主动思考的"沉默"决定
- 根据消息数量决定是否添加回复引用
- 逐段发送回复内容,支持打字效果
- 正确处理元组格式的回复段
"""
current_time = time.time()
# 计算新消息数量
new_message_count = message_api.count_new_messages(
chat_id=self.context.stream_id, start_time=thinking_start_time, end_time=current_time
)
# 根据新消息数量决定是否需要引用回复
need_reply = new_message_count >= random.randint(2, 4)
reply_text = ""
is_proactive_thinking = (message_data.get("message_type") == "proactive_thinking") if message_data else True
first_replied = False
for reply_seg in reply_set:
# 调试日志验证reply_seg的格式
logger.debug(f"Processing reply_seg type: {type(reply_seg)}, content: {reply_seg}")
# 修正:正确处理元组格式 (格式为: (type, content))
if isinstance(reply_seg, tuple) and len(reply_seg) >= 2:
_, data = reply_seg
else:
# 向下兼容:如果已经是字符串,则直接使用
data = str(reply_seg)
if isinstance(data, list):
data = "".join(map(str, data))
reply_text += data
# 如果是主动思考且内容为“沉默”,则不发送
if is_proactive_thinking and data.strip() == "沉默":
logger.info(f"{self.context.log_prefix} 主动思考决定保持沉默,不发送消息")
continue
# 发送第一段回复
if not first_replied:
await send_api.text_to_stream(
text=data,
stream_id=self.context.stream_id,
reply_to_message=message_data,
set_reply=need_reply,
typing=False,
)
first_replied = True
else:
# 发送后续回复
sent_message = await send_api.text_to_stream(
text=data,
stream_id=self.context.stream_id,
reply_to_message=None,
set_reply=False,
typing=True,
)
return reply_text

View File

@@ -1,32 +0,0 @@
from src.common.logger import get_logger
from ..hfc_context import HfcContext
logger = get_logger("notification_sender")
class NotificationSender:
@staticmethod
async def send_goodnight_notification(context: HfcContext):
"""发送晚安通知"""
try:
from ..proactive.events import ProactiveTriggerEvent
from ..proactive.proactive_thinker import ProactiveThinker
event = ProactiveTriggerEvent(source="sleep_manager", reason="goodnight")
proactive_thinker = ProactiveThinker(context, context.chat_instance.cycle_processor)
await proactive_thinker.think(event)
except Exception as e:
logger.error(f"发送晚安通知失败: {e}")
@staticmethod
async def send_insomnia_notification(context: HfcContext, reason: str):
"""发送失眠通知"""
try:
from ..proactive.events import ProactiveTriggerEvent
from ..proactive.proactive_thinker import ProactiveThinker
event = ProactiveTriggerEvent(source="sleep_manager", reason=reason)
proactive_thinker = ProactiveThinker(context, context.chat_instance.cycle_processor)
await proactive_thinker.think(event)
except Exception as e:
logger.error(f"发送失眠通知失败: {e}")

View File

@@ -1,304 +0,0 @@
import asyncio
import random
from datetime import datetime, timedelta, date
from typing import Optional, TYPE_CHECKING
from src.common.logger import get_logger
from src.config.config import global_config
from .sleep_state import SleepState, SleepStateSerializer
from .time_checker import TimeChecker
from .notification_sender import NotificationSender
if TYPE_CHECKING:
from .wakeup_manager import WakeUpManager
logger = get_logger("sleep_manager")
class SleepManager:
"""
睡眠管理器,核心组件之一,负责管理角色的睡眠周期和状态转换。
它实现了一个状态机,根据预设的时间表、睡眠压力和随机因素,
在不同的睡眠状态(如清醒、准备入睡、睡眠、失眠)之间进行切换。
"""
def __init__(self):
"""
初始化睡眠管理器。
"""
self.time_checker = TimeChecker() # 时间检查器,用于判断当前是否处于理论睡眠时间
self.last_sleep_log_time = 0 # 上次记录睡眠日志的时间戳
self.sleep_log_interval = 35 # 睡眠日志记录间隔(秒)
# --- 统一睡眠状态管理 ---
self._current_state: SleepState = SleepState.AWAKE # 当前睡眠状态
self._sleep_buffer_end_time: Optional[datetime] = None # 睡眠缓冲结束时间,用于状态转换
self._total_delayed_minutes_today: float = 0.0 # 今天总共延迟入睡的分钟数
self._last_sleep_check_date: Optional[date] = None # 上次检查睡眠状态的日期
self._last_fully_slept_log_time: float = 0 # 上次完全进入睡眠状态的时间戳
self._re_sleep_attempt_time: Optional[datetime] = None # 被吵醒后,尝试重新入睡的时间点
# 从本地存储加载上一次的睡眠状态
self._load_sleep_state()
def get_current_sleep_state(self) -> SleepState:
"""获取当前的睡眠状态。"""
return self._current_state
def is_sleeping(self) -> bool:
"""判断当前是否处于正在睡觉的状态。"""
return self._current_state == SleepState.SLEEPING
async def update_sleep_state(self, wakeup_manager: Optional["WakeUpManager"] = None):
"""
更新睡眠状态的核心方法,实现状态机的主要逻辑。
该方法会被周期性调用,以检查并更新当前的睡眠状态。
Args:
wakeup_manager (Optional["WakeUpManager"]): 唤醒管理器,用于获取睡眠压力等上下文信息。
"""
# 如果全局禁用了睡眠系统,则强制设置为清醒状态并返回
if not global_config.sleep_system.enable:
if self._current_state != SleepState.AWAKE:
logger.debug("睡眠系统禁用,强制设为 AWAKE")
self._current_state = SleepState.AWAKE
return
now = datetime.now()
today = now.date()
# 跨天处理:如果日期变化,重置每日相关的睡眠状态
if self._last_sleep_check_date != today:
logger.info(f"新的一天 ({today}),重置睡眠状态。")
self._total_delayed_minutes_today = 0
self._current_state = SleepState.AWAKE
self._sleep_buffer_end_time = None
self._last_sleep_check_date = today
self._save_sleep_state()
# 检查当前是否处于理论上的睡眠时间段
is_in_theoretical_sleep, activity = self.time_checker.is_in_theoretical_sleep_time(now.time())
# --- 状态机核心处理逻辑 ---
if self._current_state == SleepState.AWAKE:
if is_in_theoretical_sleep:
self._handle_awake_to_sleep(now, activity, wakeup_manager)
elif self._current_state == SleepState.PREPARING_SLEEP:
self._handle_preparing_sleep(now, is_in_theoretical_sleep, wakeup_manager)
elif self._current_state == SleepState.SLEEPING:
self._handle_sleeping(now, is_in_theoretical_sleep, activity, wakeup_manager)
elif self._current_state == SleepState.INSOMNIA:
self._handle_insomnia(now, is_in_theoretical_sleep)
elif self._current_state == SleepState.WOKEN_UP:
self._handle_woken_up(now, is_in_theoretical_sleep, wakeup_manager)
def _handle_awake_to_sleep(self, now: datetime, activity: Optional[str], wakeup_manager: Optional["WakeUpManager"]):
"""处理从“清醒”到“准备入睡”的状态转换。"""
if activity:
logger.info(f"进入理论休眠时间 '{activity}',开始进行睡眠决策...")
else:
logger.info("进入理论休眠时间,开始进行睡眠决策...")
if global_config.sleep_system.enable_flexible_sleep:
# --- 新的弹性睡眠逻辑 ---
if wakeup_manager:
sleep_pressure = wakeup_manager.context.sleep_pressure
pressure_threshold = global_config.sleep_system.flexible_sleep_pressure_threshold
max_delay_minutes = global_config.sleep_system.max_sleep_delay_minutes
buffer_seconds = 0
# 如果睡眠压力低于阈值,则计算延迟时间
if sleep_pressure <= pressure_threshold:
# 压力差,归一化到 (0, 1]
pressure_diff = (pressure_threshold - sleep_pressure) / pressure_threshold
# 延迟分钟数,压力越低,延迟越长
delay_minutes = int(pressure_diff * max_delay_minutes)
# 确保总延迟不超过当日最大值
remaining_delay = max_delay_minutes - self._total_delayed_minutes_today
delay_minutes = min(delay_minutes, remaining_delay)
if delay_minutes > 0:
# 增加一些随机性
buffer_seconds = random.randint(int(delay_minutes * 0.8 * 60), int(delay_minutes * 1.2 * 60))
self._total_delayed_minutes_today += buffer_seconds / 60.0
logger.info(f"睡眠压力 ({sleep_pressure:.1f}) 较低,延迟 {buffer_seconds / 60:.1f} 分钟入睡。")
else:
# 延迟额度已用完,设置一个较短的准备时间
buffer_seconds = random.randint(1 * 60, 2 * 60)
logger.info("今日延迟入睡额度已用完,进入短暂准备后入睡。")
else:
# 睡眠压力较高,设置一个较短的准备时间
buffer_seconds = random.randint(1 * 60, 2 * 60)
logger.info(f"睡眠压力 ({sleep_pressure:.1f}) 较高,将在短暂准备后入睡。")
# 发送睡前通知
if global_config.sleep_system.enable_pre_sleep_notification:
asyncio.create_task(NotificationSender.send_goodnight_notification(wakeup_manager.context))
self._sleep_buffer_end_time = now + timedelta(seconds=buffer_seconds)
self._current_state = SleepState.PREPARING_SLEEP
logger.info(f"进入准备入睡状态,将在 {buffer_seconds / 60:.1f} 分钟内入睡。")
self._save_sleep_state()
else:
# 无法获取 wakeup_manager退回旧逻辑
buffer_seconds = random.randint(1 * 60, 3 * 60)
self._sleep_buffer_end_time = now + timedelta(seconds=buffer_seconds)
self._current_state = SleepState.PREPARING_SLEEP
logger.warning("无法获取 WakeUpManager弹性睡眠采用默认1-3分钟延迟。")
self._save_sleep_state()
else:
# 非弹性睡眠模式
if wakeup_manager and global_config.sleep_system.enable_pre_sleep_notification:
asyncio.create_task(NotificationSender.send_goodnight_notification(wakeup_manager.context))
self._current_state = SleepState.SLEEPING
def _handle_preparing_sleep(self, now: datetime, is_in_theoretical_sleep: bool, wakeup_manager: Optional["WakeUpManager"]):
"""处理“准备入睡”状态下的逻辑。"""
# 如果在准备期间离开了理论睡眠时间,则取消入睡
if not is_in_theoretical_sleep:
logger.info("准备入睡期间离开理论休眠时间,取消入睡,恢复清醒。")
self._current_state = SleepState.AWAKE
self._sleep_buffer_end_time = None
self._save_sleep_state()
# 如果缓冲时间结束,则正式进入睡眠状态
elif self._sleep_buffer_end_time and now >= self._sleep_buffer_end_time:
logger.info("睡眠缓冲期结束,正式进入休眠状态。")
self._current_state = SleepState.SLEEPING
self._last_fully_slept_log_time = now.timestamp()
# 设置一个随机的延迟,用于触发“睡后失眠”检查
delay_minutes_range = global_config.sleep_system.insomnia_trigger_delay_minutes
delay_minutes = random.randint(delay_minutes_range[0], delay_minutes_range[1])
self._sleep_buffer_end_time = now + timedelta(minutes=delay_minutes)
logger.info(f"已设置睡后失眠检查,将在 {delay_minutes} 分钟后触发。")
self._save_sleep_state()
def _handle_sleeping(self, now: datetime, is_in_theoretical_sleep: bool, activity: Optional[str], wakeup_manager: Optional["WakeUpManager"]):
"""处理“正在睡觉”状态下的逻辑。"""
# 如果理论睡眠时间结束,则自然醒来
if not is_in_theoretical_sleep:
logger.info("理论休眠时间结束,自然醒来。")
self._current_state = SleepState.AWAKE
self._save_sleep_state()
# 检查是否到了触发“睡后失眠”的时间点
elif self._sleep_buffer_end_time and now >= self._sleep_buffer_end_time:
if wakeup_manager:
sleep_pressure = wakeup_manager.context.sleep_pressure
pressure_threshold = global_config.sleep_system.flexible_sleep_pressure_threshold
# 检查是否触发失眠
insomnia_reason = None
if sleep_pressure < pressure_threshold:
insomnia_reason = "low_pressure"
logger.info(f"睡眠压力 ({sleep_pressure:.1f}) 低于阈值 ({pressure_threshold}),触发睡后失眠。")
elif random.random() < getattr(global_config.sleep_system, "random_insomnia_chance", 0.1):
insomnia_reason = "random"
logger.info("随机触发失眠。")
if insomnia_reason:
self._current_state = SleepState.INSOMNIA
# 设置失眠的持续时间
duration_minutes_range = global_config.sleep_system.insomnia_duration_minutes
duration_minutes = random.randint(*duration_minutes_range)
self._sleep_buffer_end_time = now + timedelta(minutes=duration_minutes)
# 发送失眠通知
asyncio.create_task(NotificationSender.send_insomnia_notification(wakeup_manager.context, insomnia_reason))
logger.info(f"进入失眠状态 (原因: {insomnia_reason}),将持续 {duration_minutes} 分钟。")
else:
# 睡眠压力正常,不触发失眠,清除检查时间点
logger.info(f"睡眠压力 ({sleep_pressure:.1f}) 正常,未触发睡后失眠。")
self._sleep_buffer_end_time = None
self._save_sleep_state()
else:
# 定期记录睡眠日志
current_timestamp = now.timestamp()
if current_timestamp - self.last_sleep_log_time > self.sleep_log_interval and activity:
logger.info(f"当前处于休眠活动 '{activity}' 中。")
self.last_sleep_log_time = current_timestamp
def _handle_insomnia(self, now: datetime, is_in_theoretical_sleep: bool):
"""处理“失眠”状态下的逻辑。"""
# 如果离开理论睡眠时间,则失眠结束
if not is_in_theoretical_sleep:
logger.info("已离开理论休眠时间,失眠结束,恢复清醒。")
self._current_state = SleepState.AWAKE
self._sleep_buffer_end_time = None
self._save_sleep_state()
# 如果失眠持续时间已过,则恢复睡眠
elif self._sleep_buffer_end_time and now >= self._sleep_buffer_end_time:
logger.info("失眠状态持续时间已过,恢复睡眠。")
self._current_state = SleepState.SLEEPING
self._sleep_buffer_end_time = None
self._save_sleep_state()
def _handle_woken_up(self, now: datetime, is_in_theoretical_sleep: bool, wakeup_manager: Optional["WakeUpManager"]):
"""处理“被吵醒”状态下的逻辑。"""
# 如果理论睡眠时间结束,则状态自动结束
if not is_in_theoretical_sleep:
logger.info("理论休眠时间结束,被吵醒的状态自动结束。")
self._current_state = SleepState.AWAKE
self._re_sleep_attempt_time = None
self._save_sleep_state()
# 到了尝试重新入睡的时间点
elif self._re_sleep_attempt_time and now >= self._re_sleep_attempt_time:
logger.info("被吵醒后经过一段时间,尝试重新入睡...")
if wakeup_manager:
sleep_pressure = wakeup_manager.context.sleep_pressure
pressure_threshold = global_config.sleep_system.flexible_sleep_pressure_threshold
# 如果睡眠压力足够,则尝试重新入睡
if sleep_pressure >= pressure_threshold:
logger.info("睡眠压力足够,从被吵醒状态转换到准备入睡。")
buffer_seconds = random.randint(3 * 60, 8 * 60)
self._sleep_buffer_end_time = now + timedelta(seconds=buffer_seconds)
self._current_state = SleepState.PREPARING_SLEEP
self._re_sleep_attempt_time = None
else:
# 睡眠压力不足,延迟一段时间后再次尝试
delay_minutes = 15
self._re_sleep_attempt_time = now + timedelta(minutes=delay_minutes)
logger.info(
f"睡眠压力({sleep_pressure:.1f})仍然较低,暂时保持清醒,在 {delay_minutes} 分钟后再次尝试。"
)
self._save_sleep_state()
def reset_sleep_state_after_wakeup(self):
"""
当角色被用户消息等外部因素唤醒时调用此方法。
将状态强制转换为 WOKEN_UP并设置一个延迟之后会尝试重新入睡。
"""
if self._current_state in [SleepState.PREPARING_SLEEP, SleepState.SLEEPING, SleepState.INSOMNIA]:
logger.info("被唤醒,进入 WOKEN_UP 状态!")
self._current_state = SleepState.WOKEN_UP
self._sleep_buffer_end_time = None
re_sleep_delay_minutes = getattr(global_config.sleep_system, "re_sleep_delay_minutes", 10)
self._re_sleep_attempt_time = datetime.now() + timedelta(minutes=re_sleep_delay_minutes)
logger.info(f"将在 {re_sleep_delay_minutes} 分钟后尝试重新入睡。")
self._save_sleep_state()
def _save_sleep_state(self):
"""将当前所有睡眠相关的状态打包并保存到本地存储。"""
state_data = {
"_current_state": self._current_state,
"_sleep_buffer_end_time": self._sleep_buffer_end_time,
"_total_delayed_minutes_today": self._total_delayed_minutes_today,
"_last_sleep_check_date": self._last_sleep_check_date,
"_re_sleep_attempt_time": self._re_sleep_attempt_time,
}
SleepStateSerializer.save(state_data)
def _load_sleep_state(self):
"""从本地存储加载并恢复所有睡眠相关的状态。"""
state_data = SleepStateSerializer.load()
self._current_state = state_data["_current_state"]
self._sleep_buffer_end_time = state_data["_sleep_buffer_end_time"]
self._total_delayed_minutes_today = state_data["_total_delayed_minutes_today"]
self._last_sleep_check_date = state_data["_last_sleep_check_date"]
self._re_sleep_attempt_time = state_data["_re_sleep_attempt_time"]

View File

@@ -1,110 +0,0 @@
from enum import Enum, auto
from datetime import datetime
from src.common.logger import get_logger
from src.manager.local_store_manager import local_storage
logger = get_logger("sleep_state")
class SleepState(Enum):
"""
定义了角色可能处于的几种睡眠状态。
这是一个状态机,用于管理角色的睡眠周期。
"""
AWAKE = auto() # 清醒状态
INSOMNIA = auto() # 失眠状态
PREPARING_SLEEP = auto() # 准备入睡状态,一个短暂的过渡期
SLEEPING = auto() # 正在睡觉状态
WOKEN_UP = auto() # 被吵醒状态
class SleepStateSerializer:
"""
睡眠状态序列化器。
负责将内存中的睡眠状态对象持久化到本地存储如JSON文件
以及在程序启动时从本地存储中恢复状态。
这样可以确保即使程序重启,角色的睡眠状态也能得以保留。
"""
@staticmethod
def save(state_data: dict):
"""
将当前的睡眠状态数据保存到本地存储。
Args:
state_data (dict): 包含睡眠状态信息的字典。
datetime对象会被转换为时间戳Enum成员会被转换为其名称字符串。
"""
try:
# 准备要序列化的数据字典
state = {
# 保存当前状态的枚举名称
"current_state": state_data["_current_state"].name,
# 将datetime对象转换为Unix时间戳以便序列化
"sleep_buffer_end_time_ts": state_data["_sleep_buffer_end_time"].timestamp()
if state_data["_sleep_buffer_end_time"]
else None,
"total_delayed_minutes_today": state_data["_total_delayed_minutes_today"],
# 将date对象转换为ISO格式的字符串
"last_sleep_check_date_str": state_data["_last_sleep_check_date"].isoformat()
if state_data["_last_sleep_check_date"]
else None,
"re_sleep_attempt_time_ts": state_data["_re_sleep_attempt_time"].timestamp()
if state_data["_re_sleep_attempt_time"]
else None,
}
# 写入本地存储
local_storage["schedule_sleep_state"] = state
logger.debug(f"已保存睡眠状态: {state}")
except Exception as e:
logger.error(f"保存睡眠状态失败: {e}")
@staticmethod
def load() -> dict:
"""
从本地存储加载并解析睡眠状态。
Returns:
dict: 包含恢复后睡眠状态信息的字典。
如果加载失败或没有找到数据,则返回一个默认的清醒状态。
"""
# 定义一个默认的状态,以防加载失败
state_data = {
"_current_state": SleepState.AWAKE,
"_sleep_buffer_end_time": None,
"_total_delayed_minutes_today": 0,
"_last_sleep_check_date": None,
"_re_sleep_attempt_time": None,
}
try:
# 从本地存储读取数据
state = local_storage["schedule_sleep_state"]
if state and isinstance(state, dict):
# 恢复当前状态枚举
state_name = state.get("current_state")
if state_name and hasattr(SleepState, state_name):
state_data["_current_state"] = SleepState[state_name]
# 从时间戳恢复datetime对象
end_time_ts = state.get("sleep_buffer_end_time_ts")
if end_time_ts:
state_data["_sleep_buffer_end_time"] = datetime.fromtimestamp(end_time_ts)
# 恢复重新入睡尝试时间
re_sleep_ts = state.get("re_sleep_attempt_time_ts")
if re_sleep_ts:
state_data["_re_sleep_attempt_time"] = datetime.fromtimestamp(re_sleep_ts)
# 恢复今日延迟睡眠总分钟数
state_data["_total_delayed_minutes_today"] = state.get("total_delayed_minutes_today", 0)
# 从ISO格式字符串恢复date对象
date_str = state.get("last_sleep_check_date_str")
if date_str:
state_data["_last_sleep_check_date"] = datetime.fromisoformat(date_str).date()
logger.info(f"成功从本地存储加载睡眠状态: {state}")
except Exception as e:
# 如果加载过程中出现任何问题,记录警告并返回默认状态
logger.warning(f"加载睡眠状态失败,将使用默认值: {e}")
return state_data

View File

@@ -1,108 +0,0 @@
from datetime import datetime, time, timedelta
from typing import Optional, List, Dict, Any
import random
from src.common.logger import get_logger
from src.config.config import global_config
from src.schedule.schedule_manager import schedule_manager
logger = get_logger("time_checker")
class TimeChecker:
def __init__(self):
# 缓存当天的偏移量,确保一天内使用相同的偏移量
self._daily_sleep_offset: int = 0
self._daily_wake_offset: int = 0
self._offset_date = None
def _get_daily_offsets(self):
"""获取当天的睡眠和起床时间偏移量,每天生成一次"""
today = datetime.now().date()
# 如果是新的一天,重新生成偏移量
if self._offset_date != today:
sleep_offset_range = global_config.sleep_system.sleep_time_offset_minutes
wake_offset_range = global_config.sleep_system.wake_up_time_offset_minutes
# 生成 ±offset_range 范围内的随机偏移量
self._daily_sleep_offset = random.randint(-sleep_offset_range, sleep_offset_range)
self._daily_wake_offset = random.randint(-wake_offset_range, wake_offset_range)
self._offset_date = today
logger.debug(f"生成新的每日偏移量 - 睡觉时间偏移: {self._daily_sleep_offset}分钟, 起床时间偏移: {self._daily_wake_offset}分钟")
return self._daily_sleep_offset, self._daily_wake_offset
def get_today_schedule(self) -> Optional[List[Dict[str, Any]]]:
"""从全局 ScheduleManager 获取今天的日程安排。"""
return schedule_manager.today_schedule
def is_in_theoretical_sleep_time(self, now_time: time) -> tuple[bool, Optional[str]]:
if global_config.sleep_system.sleep_by_schedule:
if self.get_today_schedule():
return self._is_in_schedule_sleep_time(now_time)
else:
return self._is_in_sleep_time(now_time)
else:
return self._is_in_sleep_time(now_time)
def _is_in_schedule_sleep_time(self, now_time: time) -> tuple[bool, Optional[str]]:
"""检查当前时间是否落在日程表的任何一个睡眠活动中"""
sleep_keywords = ["休眠", "睡觉", "梦乡"]
today_schedule = self.get_today_schedule()
if today_schedule:
for event in today_schedule:
try:
activity = event.get("activity", "").strip()
time_range = event.get("time_range")
if not activity or not time_range:
continue
if any(keyword in activity for keyword in sleep_keywords):
start_str, end_str = time_range.split("-")
start_time = datetime.strptime(start_str.strip(), "%H:%M").time()
end_time = datetime.strptime(end_str.strip(), "%H:%M").time()
if start_time <= end_time: # 同一天
if start_time <= now_time < end_time:
return True, activity
else: # 跨天
if now_time >= start_time or now_time < end_time:
return True, activity
except (ValueError, KeyError, AttributeError) as e:
logger.warning(f"解析日程事件时出错: {event}, 错误: {e}")
continue
return False, None
def _is_in_sleep_time(self, now_time: time) -> tuple[bool, Optional[str]]:
"""检查当前时间是否在固定的睡眠时间内(应用偏移量)"""
try:
start_time_str = global_config.sleep_system.fixed_sleep_time
end_time_str = global_config.sleep_system.fixed_wake_up_time
# 获取当天的偏移量
sleep_offset, wake_offset = self._get_daily_offsets()
# 解析基础时间
base_start_time = datetime.strptime(start_time_str, "%H:%M")
base_end_time = datetime.strptime(end_time_str, "%H:%M")
# 应用偏移量
actual_start_time = (base_start_time + timedelta(minutes=sleep_offset)).time()
actual_end_time = (base_end_time + timedelta(minutes=wake_offset)).time()
logger.debug(f"固定睡眠时间检查 - 基础时间: {start_time_str}-{end_time_str}, "
f"偏移后时间: {actual_start_time.strftime('%H:%M')}-{actual_end_time.strftime('%H:%M')}, "
f"当前时间: {now_time.strftime('%H:%M')}")
if actual_start_time <= actual_end_time:
if actual_start_time <= now_time < actual_end_time:
return True, f"固定睡眠时间(偏移后: {actual_start_time.strftime('%H:%M')}-{actual_end_time.strftime('%H:%M')})"
else:
if now_time >= actual_start_time or now_time < actual_end_time:
return True, f"固定睡眠时间(偏移后: {actual_start_time.strftime('%H:%M')}-{actual_end_time.strftime('%H:%M')})"
except ValueError as e:
logger.error(f"固定的睡眠时间格式不正确,请使用 HH:MM 格式: {e}")
return False, None

View File

@@ -1,232 +0,0 @@
import asyncio
import time
from typing import Optional
from src.common.logger import get_logger
from src.config.config import global_config
from src.manager.local_store_manager import local_storage
from ..hfc_context import HfcContext
logger = get_logger("wakeup")
class WakeUpManager:
def __init__(self, context: HfcContext):
"""
初始化唤醒度管理器
Args:
context: HFC聊天上下文对象
功能说明:
- 管理休眠状态下的唤醒度累积
- 处理唤醒度的自然衰减
- 控制愤怒状态的持续时间
"""
self.context = context
self.wakeup_value = 0.0 # 当前唤醒度
self.is_angry = False # 是否处于愤怒状态
self.angry_start_time = 0.0 # 愤怒状态开始时间
self.last_decay_time = time.time() # 上次衰减时间
self._decay_task: Optional[asyncio.Task] = None
self.last_log_time = 0
self.log_interval = 30
# 从配置文件获取参数
sleep_config = global_config.sleep_system
self.wakeup_threshold = sleep_config.wakeup_threshold
self.private_message_increment = sleep_config.private_message_increment
self.group_mention_increment = sleep_config.group_mention_increment
self.decay_rate = sleep_config.decay_rate
self.decay_interval = sleep_config.decay_interval
self.angry_duration = sleep_config.angry_duration
self.enabled = sleep_config.enable
self.angry_prompt = sleep_config.angry_prompt
self._load_wakeup_state()
def _get_storage_key(self) -> str:
"""获取当前聊天流的本地存储键"""
return f"wakeup_manager_state_{self.context.stream_id}"
def _load_wakeup_state(self):
"""从本地存储加载状态"""
state = local_storage[self._get_storage_key()]
if state and isinstance(state, dict):
self.wakeup_value = state.get("wakeup_value", 0.0)
self.is_angry = state.get("is_angry", False)
self.angry_start_time = state.get("angry_start_time", 0.0)
logger.info(f"{self.context.log_prefix} 成功从本地存储加载唤醒状态: {state}")
else:
logger.info(f"{self.context.log_prefix} 未找到本地唤醒状态,将使用默认值初始化。")
def _save_wakeup_state(self):
"""将当前状态保存到本地存储"""
state = {
"wakeup_value": self.wakeup_value,
"is_angry": self.is_angry,
"angry_start_time": self.angry_start_time,
}
local_storage[self._get_storage_key()] = state
logger.debug(f"{self.context.log_prefix} 已将唤醒状态保存到本地存储: {state}")
async def start(self):
"""启动唤醒度管理器"""
if not self.enabled:
logger.info(f"{self.context.log_prefix} 唤醒度系统已禁用,跳过启动")
return
if not self._decay_task:
self._decay_task = asyncio.create_task(self._decay_loop())
self._decay_task.add_done_callback(self._handle_decay_completion)
logger.info(f"{self.context.log_prefix} 唤醒度管理器已启动")
async def stop(self):
"""停止唤醒度管理器"""
if self._decay_task and not self._decay_task.done():
self._decay_task.cancel()
await asyncio.sleep(0)
logger.info(f"{self.context.log_prefix} 唤醒度管理器已停止")
def _handle_decay_completion(self, task: asyncio.Task):
"""处理衰减任务完成"""
try:
if exception := task.exception():
logger.error(f"{self.context.log_prefix} 唤醒度衰减任务异常: {exception}")
else:
logger.info(f"{self.context.log_prefix} 唤醒度衰减任务正常结束")
except asyncio.CancelledError:
logger.info(f"{self.context.log_prefix} 唤醒度衰减任务被取消")
async def _decay_loop(self):
"""唤醒度衰减循环"""
while self.context.running:
await asyncio.sleep(self.decay_interval)
current_time = time.time()
# 检查愤怒状态是否过期
if self.is_angry and current_time - self.angry_start_time >= self.angry_duration:
self.is_angry = False
# 通知情绪管理系统清除愤怒状态
from src.mood.mood_manager import mood_manager
mood_manager.clear_angry_from_wakeup(self.context.stream_id)
logger.info(f"{self.context.log_prefix} 愤怒状态结束,恢复正常")
self._save_wakeup_state()
# 唤醒度自然衰减
if self.wakeup_value > 0:
old_value = self.wakeup_value
self.wakeup_value = max(0, self.wakeup_value - self.decay_rate)
if old_value != self.wakeup_value:
logger.debug(f"{self.context.log_prefix} 唤醒度衰减: {old_value:.1f} -> {self.wakeup_value:.1f}")
self._save_wakeup_state()
def add_wakeup_value(self, is_private_chat: bool, is_mentioned: bool = False) -> bool:
"""
增加唤醒度值
Args:
is_private_chat: 是否为私聊
is_mentioned: 是否被艾特(仅群聊有效)
Returns:
bool: 是否达到唤醒阈值
"""
# 如果系统未启用,直接返回
if not self.enabled:
return False
# 只有在休眠且非失眠状态下才累积唤醒度
from .sleep_state import SleepState
sleep_manager = self.context.sleep_manager
if not sleep_manager:
return False
current_sleep_state = sleep_manager.get_current_sleep_state()
if current_sleep_state != SleepState.SLEEPING:
return False
old_value = self.wakeup_value
if is_private_chat:
# 私聊每条消息都增加唤醒度
self.wakeup_value += self.private_message_increment
logger.debug(f"{self.context.log_prefix} 私聊消息增加唤醒度: +{self.private_message_increment}")
elif is_mentioned:
# 群聊只有被艾特才增加唤醒度
self.wakeup_value += self.group_mention_increment
logger.debug(f"{self.context.log_prefix} 群聊艾特增加唤醒度: +{self.group_mention_increment}")
else:
# 群聊未被艾特,不增加唤醒度
return False
current_time = time.time()
if current_time - self.last_log_time > self.log_interval:
logger.info(
f"{self.context.log_prefix} 唤醒度变化: {old_value:.1f} -> {self.wakeup_value:.1f} (阈值: {self.wakeup_threshold})"
)
self.last_log_time = current_time
else:
logger.debug(
f"{self.context.log_prefix} 唤醒度变化: {old_value:.1f} -> {self.wakeup_value:.1f} (阈值: {self.wakeup_threshold})"
)
# 检查是否达到唤醒阈值
if self.wakeup_value >= self.wakeup_threshold:
self._trigger_wakeup()
return True
self._save_wakeup_state()
return False
def _trigger_wakeup(self):
"""触发唤醒,进入愤怒状态"""
self.is_angry = True
self.angry_start_time = time.time()
self.wakeup_value = 0.0 # 重置唤醒度
self._save_wakeup_state()
# 通知情绪管理系统进入愤怒状态
from src.mood.mood_manager import mood_manager
mood_manager.set_angry_from_wakeup(self.context.stream_id)
# 通知SleepManager重置睡眠状态
if self.context.sleep_manager:
self.context.sleep_manager.reset_sleep_state_after_wakeup()
logger.info(f"{self.context.log_prefix} 唤醒度达到阈值({self.wakeup_threshold}),被吵醒进入愤怒状态!")
def get_angry_prompt_addition(self) -> str:
"""获取愤怒状态下的提示词补充"""
if self.is_angry:
return self.angry_prompt
return ""
def is_in_angry_state(self) -> bool:
"""检查是否处于愤怒状态"""
if self.is_angry:
current_time = time.time()
if current_time - self.angry_start_time >= self.angry_duration:
self.is_angry = False
# 通知情绪管理系统清除愤怒状态
from src.mood.mood_manager import mood_manager
mood_manager.clear_angry_from_wakeup(self.context.stream_id)
logger.info(f"{self.context.log_prefix} 愤怒状态自动过期")
return False
return self.is_angry
def get_status_info(self) -> dict:
"""获取当前状态信息"""
return {
"wakeup_value": self.wakeup_value,
"wakeup_threshold": self.wakeup_threshold,
"is_angry": self.is_angry,
"angry_remaining_time": max(0, self.angry_duration - (time.time() - self.angry_start_time))
if self.is_angry
else 0,
}

View File

@@ -20,9 +20,8 @@ from datetime import datetime
from typing import Dict, Optional
from src.common.logger import get_logger
from src.chat.chat_loop.proactive.events import ProactiveTriggerEvent
from src.chat.heart_flow.heartflow import heartflow
from src.chat.chat_loop.sleep_manager.sleep_manager import SleepManager
from src.chat.affinity_flow.afc_manager import afc_manager
# TODO: 需要重新实现主动思考和睡眠管理功能
from .analyzer import chat_frequency_analyzer
logger = get_logger("FrequencyBasedTrigger")
@@ -39,8 +38,8 @@ class FrequencyBasedTrigger:
一个周期性任务,根据聊天频率分析结果来触发主动思考。
"""
def __init__(self, sleep_manager: SleepManager):
self._sleep_manager = sleep_manager
def __init__(self):
# TODO: 需要重新实现睡眠管理器
self._task: Optional[asyncio.Task] = None
# 记录上次为用户触发的时间,用于冷却控制
# 格式: { "chat_id": timestamp }
@@ -53,14 +52,15 @@ class FrequencyBasedTrigger:
await asyncio.sleep(TRIGGER_CHECK_INTERVAL_SECONDS)
logger.debug("开始执行频率触发器检查...")
# 1. 检查角色是否清醒
if self._sleep_manager.is_sleeping():
logger.debug("角色正在睡眠,跳过本次频率触发检查。")
continue
# 1. TODO: 检查角色是否清醒 - 需要重新实现睡眠状态检查
# 暂时跳过睡眠检查
# if self._sleep_manager.is_sleeping():
# logger.debug("角色正在睡眠,跳过本次频率触发检查。")
# continue
# 2. 获取所有已知的聊天ID
# 【注意】这里我们假设所有 subheartflow 的 ID 就是 chat_id
all_chat_ids = list(heartflow.subheartflows.keys())
# 亲和力流系统中聊天ID直接从管理器获取
all_chat_ids = list(afc_manager.affinity_flow_chatters.keys())
if not all_chat_ids:
continue
@@ -75,25 +75,24 @@ class FrequencyBasedTrigger:
# 4. 检查当前是否是该用户的高峰聊天时间
if chat_frequency_analyzer.is_in_peak_time(chat_id, now):
sub_heartflow = await heartflow.get_or_create_subheartflow(chat_id)
if not sub_heartflow:
logger.warning(f"无法为 {chat_id} 获取或创建 sub_heartflow。")
# 5. 检查用户当前是否已有活跃的处理任务
# 亲和力流系统不直接提供循环状态,通过检查最后活动时间来判断是否忙碌
chatter = afc_manager.get_or_create_chatter(chat_id)
if not chatter:
logger.warning(f"无法为 {chat_id} 获取或创建亲和力聊天处理器。")
continue
# 5. 检查用户当前是否已有活跃的思考或回复任务
cycle_detail = sub_heartflow.heart_fc_instance.context.current_cycle_detail
if cycle_detail and not cycle_detail.end_time:
logger.debug(f"用户 {chat_id}聊天循环正忙(仍在周期 {cycle_detail.cycle_id} 中),本次不触发。")
# 检查是否在活跃状态最近1分钟内有活动
current_time = time.time()
if current_time - chatter.get_activity_time() < 60:
logger.debug(f"用户 {chat_id}亲和力处理器正忙,本次不触发。")
continue
logger.info(f"检测到用户 {chat_id} 处于聊天高峰期,且聊天循环空闲,准备触发主动思考。")
logger.info(f"检测到用户 {chat_id} 处于聊天高峰期,且处理器空闲,准备触发主动思考。")
# 6. 直接调用 proactive_thinker
event = ProactiveTriggerEvent(
source="frequency_analyzer",
reason="User is in a high-frequency chat period."
)
await sub_heartflow.heart_fc_instance.proactive_thinker.think(event)
# 6. TODO: 亲和力流系统的主动思考机制需要另行实现
# 目前先记录日志,等待后续实现
logger.info(f"用户 {chat_id} 处于高峰期,但亲和力流的主动思考功能暂未实现")
# 7. 更新触发时间,进入冷却
self._last_triggered[chat_id] = time.time()

View File

@@ -1,40 +0,0 @@
import traceback
from typing import Any, Optional, Dict
from src.common.logger import get_logger
from src.chat.heart_flow.sub_heartflow import SubHeartflow
from src.chat.message_receive.chat_stream import get_chat_manager
logger = get_logger("heartflow")
class Heartflow:
"""主心流协调器,负责初始化并协调聊天"""
def __init__(self):
self.subheartflows: Dict[Any, "SubHeartflow"] = {}
async def get_or_create_subheartflow(self, subheartflow_id: Any) -> Optional["SubHeartflow"]:
"""获取或创建一个新的SubHeartflow实例"""
if subheartflow_id in self.subheartflows:
if subflow := self.subheartflows.get(subheartflow_id):
return subflow
try:
new_subflow = SubHeartflow(subheartflow_id)
await new_subflow.initialize()
# 注册子心流
self.subheartflows[subheartflow_id] = new_subflow
heartflow_name = get_chat_manager().get_stream_name(subheartflow_id) or subheartflow_id
logger.info(f"[{heartflow_name}] 开始接收消息")
return new_subflow
except Exception as e:
logger.error(f"创建子心流 {subheartflow_id} 失败: {e}", exc_info=True)
traceback.print_exc()
return None
heartflow = Heartflow()

View File

@@ -1,180 +0,0 @@
import asyncio
import re
import math
import traceback
from datetime import datetime
from typing import Tuple, TYPE_CHECKING
from src.config.config import global_config
from src.chat.memory_system.Hippocampus import hippocampus_manager
from src.chat.message_receive.message import MessageRecv
from src.chat.message_receive.storage import MessageStorage
from src.chat.heart_flow.heartflow import heartflow
from src.chat.utils.utils import is_mentioned_bot_in_message
from src.chat.utils.timer_calculator import Timer
from src.chat.utils.chat_message_builder import replace_user_references_sync
from src.common.logger import get_logger
from src.person_info.relationship_manager import get_relationship_manager
from src.mood.mood_manager import mood_manager
if TYPE_CHECKING:
from src.chat.heart_flow.sub_heartflow import SubHeartflow
logger = get_logger("chat")
async def _process_relationship(message: MessageRecv) -> None:
"""处理用户关系逻辑
Args:
message: 消息对象,包含用户信息
"""
platform = message.message_info.platform
user_id = message.message_info.user_info.user_id # type: ignore
nickname = message.message_info.user_info.user_nickname # type: ignore
cardname = message.message_info.user_info.user_cardname or nickname # type: ignore
relationship_manager = get_relationship_manager()
is_known = await relationship_manager.is_known_some_one(platform, user_id)
if not is_known:
logger.info(f"首次认识用户: {nickname}")
await relationship_manager.first_knowing_some_one(platform, user_id, nickname, cardname) # type: ignore
async def _calculate_interest(message: MessageRecv) -> Tuple[float, bool, list[str]]:
"""计算消息的兴趣度
Args:
message: 待处理的消息对象
Returns:
Tuple[float, bool, list[str]]: (兴趣度, 是否被提及, 关键词)
"""
is_mentioned, _ = is_mentioned_bot_in_message(message)
interested_rate = 0.0
with Timer("记忆激活"):
interested_rate, keywords = await hippocampus_manager.get_activate_from_text(
message.processed_plain_text,
max_depth=4,
fast_retrieval=False,
)
message.key_words = keywords
message.key_words_lite = keywords
logger.debug(f"记忆激活率: {interested_rate:.2f}, 关键词: {keywords}")
text_len = len(message.processed_plain_text)
# 根据文本长度分布调整兴趣度,采用分段函数实现更精确的兴趣度计算
# 基于实际分布0-5字符(26.57%), 6-10字符(27.18%), 11-20字符(22.76%), 21-30字符(10.33%), 31+字符(13.86%)
if text_len == 0:
base_interest = 0.01 # 空消息最低兴趣度
elif text_len <= 5:
# 1-5字符线性增长 0.01 -> 0.03
base_interest = 0.01 + (text_len - 1) * (0.03 - 0.01) / 4
elif text_len <= 10:
# 6-10字符线性增长 0.03 -> 0.06
base_interest = 0.03 + (text_len - 5) * (0.06 - 0.03) / 5
elif text_len <= 20:
# 11-20字符线性增长 0.06 -> 0.12
base_interest = 0.06 + (text_len - 10) * (0.12 - 0.06) / 10
elif text_len <= 30:
# 21-30字符线性增长 0.12 -> 0.18
base_interest = 0.12 + (text_len - 20) * (0.18 - 0.12) / 10
elif text_len <= 50:
# 31-50字符线性增长 0.18 -> 0.22
base_interest = 0.18 + (text_len - 30) * (0.22 - 0.18) / 20
elif text_len <= 100:
# 51-100字符线性增长 0.22 -> 0.26
base_interest = 0.22 + (text_len - 50) * (0.26 - 0.22) / 50
else:
# 100+字符:对数增长 0.26 -> 0.3,增长率递减
base_interest = 0.26 + (0.3 - 0.26) * (math.log10(text_len - 99) / math.log10(901)) # 1000-99=901
# 确保在范围内
base_interest = min(max(base_interest, 0.01), 0.3)
interested_rate += base_interest
if is_mentioned:
interest_increase_on_mention = 1
interested_rate += interest_increase_on_mention
return interested_rate, is_mentioned, keywords
class HeartFCMessageReceiver:
"""心流处理器,负责处理接收到的消息并计算兴趣度"""
def __init__(self):
"""初始化心流处理器,创建消息存储实例"""
self.storage = MessageStorage()
async def process_message(self, message: MessageRecv) -> None:
"""处理接收到的原始消息数据
主要流程:
1. 消息解析与初始化
2. 消息缓冲处理
4. 过滤检查
5. 兴趣度计算
6. 关系处理
Args:
message_data: 原始消息字符串
"""
try:
# 1. 消息解析与初始化
userinfo = message.message_info.user_info
chat = message.chat_stream
# 2. 兴趣度计算与更新
interested_rate, is_mentioned, keywords = await _calculate_interest(message)
message.interest_value = interested_rate
message.is_mentioned = is_mentioned
await self.storage.store_message(message, chat)
subheartflow: SubHeartflow = await heartflow.get_or_create_subheartflow(chat.stream_id) # type: ignore
# subheartflow.add_message_to_normal_chat_cache(message, interested_rate, is_mentioned)
if global_config.mood.enable_mood:
chat_mood = mood_manager.get_mood_by_chat_id(subheartflow.chat_id)
asyncio.create_task(chat_mood.update_mood_by_message(message, interested_rate))
# 3. 日志记录
mes_name = chat.group_info.group_name if chat.group_info else "私聊"
# current_time = time.strftime("%H:%M:%S", time.localtime(message.message_info.time))
current_talk_frequency = global_config.chat.get_current_talk_frequency(chat.stream_id)
# 如果消息中包含图片标识,则将 [picid:...] 替换为 [图片]
picid_pattern = r"\[picid:([^\]]+)\]"
processed_plain_text = re.sub(picid_pattern, "[图片]", message.processed_plain_text)
# 应用用户引用格式替换,将回复<aaa:bbb>和@<aaa:bbb>格式转换为可读格式
processed_plain_text = replace_user_references_sync(
processed_plain_text,
message.message_info.platform, # type: ignore
replace_bot_name=True,
)
if keywords:
logger.info(
f"[{mes_name}]{userinfo.user_nickname}:{processed_plain_text}[兴趣度:{interested_rate:.2f}][关键词:{keywords}]"
) # type: ignore
else:
logger.info(
f"[{mes_name}]{userinfo.user_nickname}:{processed_plain_text}[兴趣度:{interested_rate:.2f}]"
) # type: ignore
logger.debug(f"[{mes_name}][当前时段回复频率: {current_talk_frequency}]")
# 4. 关系处理
if global_config.relationship.enable_relationship:
await _process_relationship(message)
except Exception as e:
logger.error(f"消息处理失败: {e}")
print(traceback.format_exc())

View File

@@ -1,41 +0,0 @@
from rich.traceback import install
from src.common.logger import get_logger
from src.chat.message_receive.chat_stream import get_chat_manager
from src.chat.chat_loop.heartFC_chat import HeartFChatting
from src.chat.utils.utils import get_chat_type_and_target_info
logger = get_logger("sub_heartflow")
install(extra_lines=3)
class SubHeartflow:
def __init__(
self,
subheartflow_id,
):
"""子心流初始化函数
Args:
subheartflow_id: 子心流唯一标识符
"""
# 基础属性,两个值是一样的
self.subheartflow_id = subheartflow_id
self.chat_id = subheartflow_id
self.is_group_chat, self.chat_target_info = get_chat_type_and_target_info(self.chat_id)
self.log_prefix = get_chat_manager().get_stream_name(self.subheartflow_id) or self.subheartflow_id
# focus模式退出冷却时间管理
self.last_focus_exit_time: float = 0 # 上次退出focus模式的时间
# 随便水群 normal_chat 和 认真水群 focus_chat 实例
# CHAT模式激活 随便水群 FOCUS模式激活 认真水群
self.heart_fc_instance: HeartFChatting = HeartFChatting(
chat_id=self.subheartflow_id,
) # 该sub_heartflow的HeartFChatting实例
async def initialize(self):
"""异步初始化方法,创建兴趣流并确定聊天类型"""
await self.heart_fc_instance.start()

View File

@@ -11,7 +11,7 @@ from src.mood.mood_manager import mood_manager # 导入情绪管理器
from src.chat.message_receive.chat_stream import get_chat_manager, ChatStream
from src.chat.message_receive.message import MessageRecv, MessageRecvS4U
from src.chat.message_receive.storage import MessageStorage
from src.chat.heart_flow.heartflow_message_processor import HeartFCMessageReceiver
from src.chat.affinity_flow.afc_manager import afc_manager
from src.chat.utils.prompt import Prompt, global_prompt_manager
from src.plugin_system.core import component_registry, event_manager, global_announcement_manager
from src.plugin_system.base import BaseCommand, EventType
@@ -73,7 +73,7 @@ class ChatBot:
self.bot = None # bot 实例引用
self._started = False
self.mood_manager = mood_manager # 获取情绪管理器单例
self.heartflow_message_receiver = HeartFCMessageReceiver() # 新增
# 亲和力流消息处理器 - 直接使用全局afc_manager
self.s4u_message_processor = S4UMessageProcessor()
@@ -399,9 +399,6 @@ class ChatBot:
# logger.debug(str(message_data))
message = MessageRecv(message_data)
if await self.handle_notice_message(message):
...
group_info = message.message_info.group_info
user_info = message.message_info.user_info
if message.message_info.additional_config:
@@ -467,7 +464,13 @@ class ChatBot:
template_group_name = None
async def preprocess():
await self.heartflow_message_receiver.process_message(message)
# 使用亲和力流系统处理消息
message_data = {
"message_info": message.message_info.__dict__,
"processed_plain_text": message.processed_plain_text,
"chat_stream": message.chat_stream.__dict__ if message.chat_stream else None
}
await afc_manager.process_message(message.chat_stream.stream_id, message_data)
if template_group_name:
async with global_prompt_manager.async_message_scope(template_group_name):