Files
Mofox-Core/src/chat/chat_loop/heartFC_chat.py
minecraft1024a 7513c875d5 refactor(sleep): 将睡眠和唤醒管理逻辑重构到独立的sleep_manager模块
将原先分散在 `chat/chat_loop` 和 `schedule` 目录下的 `WakeUpManager` 和 `SleepManager` 相关逻辑,统一移动并整合到新的 `src/chat/chat_loop/sleep_manager` 模块中。

这次重构旨在提高代码的模块化和内聚性,使睡眠相关的功能更加集中和易于管理。同时更新了所有相关的导入路径,以反映新的文件结构。
2025-11-19 22:57:45 +08:00

532 lines
23 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import asyncio
import time
import traceback
import random
from typing import Optional, List, Dict, Any
from collections import deque
from src.common.logger import get_logger
from src.config.config import global_config
from src.person_info.relationship_builder_manager import relationship_builder_manager
from src.chat.express.expression_learner import expression_learner_manager
from src.schedule.schedule_manager import schedule_manager, SleepState
from src.plugin_system.apis import message_api
from .hfc_context import HfcContext
from .energy_manager import EnergyManager
from .proactive.proactive_thinker import ProactiveThinker
from .cycle_processor import CycleProcessor
from .response_handler import ResponseHandler
from .cycle_tracker import CycleTracker
from .sleep_manager.wakeup_manager import WakeUpManager
from .proactive.events import ProactiveTriggerEvent
logger = get_logger("hfc")
class HeartFChatting:
def __init__(self, chat_id: str):
"""
初始化心跳聊天管理器
Args:
chat_id: 聊天ID标识符
功能说明:
- 创建聊天上下文和所有子管理器
- 初始化循环跟踪器、响应处理器、循环处理器等核心组件
- 设置能量管理器、主动思考器和普通模式处理器
- 初始化聊天模式并记录初始化完成日志
"""
self.context = HfcContext(chat_id)
self.cycle_tracker = CycleTracker(self.context)
self.response_handler = ResponseHandler(self.context)
self.cycle_processor = CycleProcessor(self.context, self.response_handler, self.cycle_tracker)
self.energy_manager = EnergyManager(self.context)
self.proactive_thinker = ProactiveThinker(self.context, self.cycle_processor)
self.wakeup_manager = WakeUpManager(self.context)
# 将唤醒度管理器设置到上下文中
self.context.wakeup_manager = self.wakeup_manager
self.context.energy_manager = self.energy_manager
# 将HeartFChatting实例设置到上下文中以便其他组件可以调用其方法
self.context.chat_instance = self
self._loop_task: Optional[asyncio.Task] = None
self._proactive_monitor_task: Optional[asyncio.Task] = None
# 记录最近3次的兴趣度
self.recent_interest_records: deque = deque(maxlen=3)
self._initialize_chat_mode()
logger.info(f"{self.context.log_prefix} HeartFChatting 初始化完成")
def _initialize_chat_mode(self):
"""
初始化聊天模式
功能说明:
- 检测是否为群聊环境
- 根据全局配置设置强制聊天模式
- 在focus模式下设置能量值为35
- 在normal模式下设置能量值为15
- 如果是auto模式则保持默认设置
"""
is_group_chat = self.context.chat_stream.group_info is not None if self.context.chat_stream else False
if is_group_chat and global_config.chat.group_chat_mode != "auto":
self.context.energy_value = 25
async def start(self):
"""
启动心跳聊天系统
功能说明:
- 检查是否已经在运行,避免重复启动
- 初始化关系构建器和表达学习器
- 启动能量管理器和主动思考器
- 创建主聊天循环任务并设置完成回调
- 记录启动完成日志
"""
if self.context.running:
return
self.context.running = True
self.context.relationship_builder = relationship_builder_manager.get_or_create_builder(self.context.stream_id)
self.context.expression_learner = expression_learner_manager.get_expression_learner(self.context.stream_id)
# 启动主动思考监视器
if global_config.chat.enable_proactive_thinking:
self._proactive_monitor_task = asyncio.create_task(self._proactive_monitor_loop())
self._proactive_monitor_task.add_done_callback(self._handle_proactive_monitor_completion)
logger.info(f"{self.context.log_prefix} 主动思考监视器已启动")
await self.wakeup_manager.start()
self._loop_task = asyncio.create_task(self._main_chat_loop())
self._loop_task.add_done_callback(self._handle_loop_completion)
logger.info(f"{self.context.log_prefix} HeartFChatting 启动完成")
async def stop(self):
"""
停止心跳聊天系统
功能说明:
- 检查是否正在运行,避免重复停止
- 设置运行状态为False
- 停止能量管理器和主动思考器
- 取消主聊天循环任务
- 记录停止完成日志
"""
if not self.context.running:
return
self.context.running = False
# 停止主动思考监视器
if self._proactive_monitor_task and not self._proactive_monitor_task.done():
self._proactive_monitor_task.cancel()
await asyncio.sleep(0)
logger.info(f"{self.context.log_prefix} 主动思考监视器已停止")
await self.wakeup_manager.stop()
if self._loop_task and not self._loop_task.done():
self._loop_task.cancel()
await asyncio.sleep(0)
logger.info(f"{self.context.log_prefix} HeartFChatting 已停止")
def _handle_loop_completion(self, task: asyncio.Task):
"""
处理主循环任务完成
Args:
task: 完成的异步任务对象
功能说明:
- 处理任务异常完成的情况
- 区分正常停止和异常终止
- 记录相应的日志信息
- 处理取消任务的情况
"""
try:
if exception := task.exception():
logger.error(f"{self.context.log_prefix} HeartFChatting: 脱离了聊天(异常): {exception}")
logger.error(traceback.format_exc())
else:
logger.info(f"{self.context.log_prefix} HeartFChatting: 脱离了聊天 (外部停止)")
except asyncio.CancelledError:
logger.info(f"{self.context.log_prefix} HeartFChatting: 结束了聊天")
def _handle_proactive_monitor_completion(self, task: asyncio.Task):
try:
if exception := task.exception():
logger.error(f"{self.context.log_prefix} 主动思考监视器异常: {exception}")
else:
logger.info(f"{self.context.log_prefix} 主动思考监视器正常结束")
except asyncio.CancelledError:
logger.info(f"{self.context.log_prefix} 主动思考监视器被取消")
async def _proactive_monitor_loop(self):
while self.context.running:
await asyncio.sleep(15)
if not self._should_enable_proactive_thinking():
continue
current_time = time.time()
silence_duration = current_time - self.context.last_message_time
target_interval = self._get_dynamic_thinking_interval()
if silence_duration >= target_interval:
try:
formatted_time = self._format_duration(silence_duration)
event = ProactiveTriggerEvent(
source="silence_monitor",
reason=f"聊天已沉默 {formatted_time}",
metadata={"silence_duration": silence_duration},
)
await self.proactive_thinker.think(event)
self.context.last_message_time = current_time
except Exception as e:
logger.error(f"{self.context.log_prefix} 主动思考触发执行出错: {e}")
logger.error(traceback.format_exc())
def _should_enable_proactive_thinking(self) -> bool:
if not self.context.chat_stream:
return False
is_group_chat = self.context.chat_stream.group_info is not None
if is_group_chat and not global_config.chat.proactive_thinking_in_group:
return False
if not is_group_chat and not global_config.chat.proactive_thinking_in_private:
return False
stream_parts = self.context.stream_id.split(":")
current_chat_identifier = f"{stream_parts}:{stream_parts}" if len(stream_parts) >= 2 else self.context.stream_id
enable_list = getattr(
global_config.chat,
"proactive_thinking_enable_in_groups" if is_group_chat else "proactive_thinking_enable_in_private",
[],
)
return not enable_list or current_chat_identifier in enable_list
def _get_dynamic_thinking_interval(self) -> float:
try:
from src.utils.timing_utils import get_normal_distributed_interval
base_interval = global_config.chat.proactive_thinking_interval
delta_sigma = getattr(global_config.chat, "delta_sigma", 120)
if base_interval <= 0:
base_interval = abs(base_interval)
if delta_sigma < 0:
delta_sigma = abs(delta_sigma)
if base_interval == 0 and delta_sigma == 0:
return 300
if delta_sigma == 0:
return base_interval
sigma_percentage = delta_sigma / base_interval if base_interval > 0 else delta_sigma / 1000
return get_normal_distributed_interval(base_interval, sigma_percentage, 1, 86400, use_3sigma_rule=True)
except ImportError:
logger.warning(f"{self.context.log_prefix} timing_utils不可用使用固定间隔")
return max(300, abs(global_config.chat.proactive_thinking_interval))
except Exception as e:
logger.error(f"{self.context.log_prefix} 动态间隔计算出错: {e},使用固定间隔")
return max(300, abs(global_config.chat.proactive_thinking_interval))
def _format_duration(self, seconds: float) -> str:
hours = int(seconds // 3600)
minutes = int((seconds % 3600) // 60)
secs = int(seconds % 60)
parts = []
if hours > 0:
parts.append(f"{hours}小时")
if minutes > 0:
parts.append(f"{minutes}")
if secs > 0 or not parts:
parts.append(f"{secs}")
return "".join(parts)
async def _main_chat_loop(self):
"""
主聊天循环
功能说明:
- 持续运行聊天处理循环
- 只有在有新消息时才进行思考循环
- 无新消息时等待新消息到达(由主动思考系统单独处理主动发言)
- 处理取消和异常情况
- 在异常时尝试重新启动循环
"""
try:
while self.context.running:
has_new_messages = await self._loop_body()
if has_new_messages:
# 有新消息时,继续快速检查是否还有更多消息
await asyncio.sleep(1)
else:
# 无新消息时,等待较长时间再检查
# 这里只是为了定期检查系统状态,不进行思考循环
# 真正的新消息响应依赖于消息到达时的通知
await asyncio.sleep(1.0)
except asyncio.CancelledError:
logger.info(f"{self.context.log_prefix} 麦麦已关闭聊天")
except Exception:
logger.error(f"{self.context.log_prefix} 麦麦聊天意外错误将于3s后尝试重新启动")
print(traceback.format_exc())
await asyncio.sleep(3)
self._loop_task = asyncio.create_task(self._main_chat_loop())
logger.error(f"{self.context.log_prefix} 结束了当前聊天循环")
async def _loop_body(self) -> bool:
"""
单次循环体处理
Returns:
bool: 是否处理了新消息
功能说明:
- 检查是否处于睡眠模式,如果是则处理唤醒度逻辑
- 获取最近的新消息(过滤机器人自己的消息和命令)
- 只有在有新消息时才进行思考循环处理
- 更新最后消息时间和读取时间
- 根据当前聊天模式执行不同的处理逻辑
- FOCUS模式直接处理所有消息并检查退出条件
- NORMAL模式检查进入FOCUS模式的条件并通过normal_mode_handler处理消息
"""
# --- 核心状态更新 ---
await schedule_manager.update_sleep_state(self.wakeup_manager)
current_sleep_state = schedule_manager.get_current_sleep_state()
is_sleeping = current_sleep_state == SleepState.SLEEPING
is_in_insomnia = current_sleep_state == SleepState.INSOMNIA
# 核心修复:在睡眠模式(包括失眠)下获取消息时,不过滤命令消息,以确保@消息能被接收
filter_command_flag = not (is_sleeping or is_in_insomnia)
recent_messages = message_api.get_messages_by_time_in_chat(
chat_id=self.context.stream_id,
start_time=self.context.last_read_time,
end_time=time.time(),
limit=10,
limit_mode="latest",
filter_mai=True,
filter_command=filter_command_flag,
)
has_new_messages = bool(recent_messages)
new_message_count = len(recent_messages)
# 只有在有新消息时才进行思考循环处理
if has_new_messages:
self.context.last_message_time = time.time()
self.context.last_read_time = time.time()
# 处理唤醒度逻辑
if current_sleep_state in [SleepState.SLEEPING, SleepState.PREPARING_SLEEP, SleepState.INSOMNIA]:
self._handle_wakeup_messages(recent_messages)
# 再次获取最新状态,因为 handle_wakeup 可能导致状态变为 WOKEN_UP
current_sleep_state = schedule_manager.get_current_sleep_state()
if current_sleep_state == SleepState.SLEEPING:
# 只有在纯粹的 SLEEPING 状态下才跳过消息处理
return True
if current_sleep_state == SleepState.WOKEN_UP:
logger.info(f"{self.context.log_prefix} 从睡眠中被唤醒,将处理积压的消息。")
# 根据聊天模式处理新消息
should_process, interest_value = await self._should_process_messages(recent_messages)
if not should_process:
# 消息数量不足或兴趣不够,等待
await asyncio.sleep(0.5)
return True # Skip rest of the logic for this iteration
# Messages should be processed
action_type = await self.cycle_processor.observe(interest_value=interest_value)
# 管理no_reply计数器
if action_type != "no_reply":
self.recent_interest_records.clear()
self.context.no_reply_consecutive = 0
logger.debug(f"{self.context.log_prefix} 执行了{action_type}动作重置no_reply计数器")
else: # action_type == "no_reply"
self.context.no_reply_consecutive += 1
self._determine_form_type()
# 在一轮动作执行完毕后,增加睡眠压力
if self.context.energy_manager and global_config.sleep_system.enable_insomnia_system:
if action_type not in ["no_reply", "no_action"]:
self.context.energy_manager.increase_sleep_pressure()
# 如果成功观察,增加能量值并重置累积兴趣值
self.context.energy_value += 1 / global_config.chat.focus_value
# 重置累积兴趣值,因为消息已经被成功处理
self.context.breaking_accumulated_interest = 0.0
logger.info(
f"{self.context.log_prefix} 能量值增加,当前能量值:{self.context.energy_value:.1f},重置累积兴趣值"
)
# 更新上一帧的睡眠状态
self.context.was_sleeping = is_sleeping
# --- 重新入睡逻辑 ---
# 如果被吵醒了,并且在一定时间内没有新消息,则尝试重新入睡
if schedule_manager.get_current_sleep_state() == SleepState.WOKEN_UP and not has_new_messages:
re_sleep_delay = global_config.sleep_system.re_sleep_delay_minutes * 60
# 使用 last_message_time 来判断空闲时间
if time.time() - self.context.last_message_time > re_sleep_delay:
logger.info(
f"{self.context.log_prefix} 已被唤醒且超过 {re_sleep_delay / 60} 分钟无新消息,尝试重新入睡。"
)
schedule_manager.reset_sleep_state_after_wakeup()
# 保存HFC上下文状态
self.context.save_context_state()
return has_new_messages
def _handle_wakeup_messages(self, messages):
"""
处理休眠状态下的消息,累积唤醒度
Args:
messages: 消息列表
功能说明:
- 区分私聊和群聊消息
- 检查群聊消息是否艾特了机器人
- 调用唤醒度管理器累积唤醒度
- 如果达到阈值则唤醒并进入愤怒状态
"""
if not self.wakeup_manager:
return
is_private_chat = self.context.chat_stream.group_info is None if self.context.chat_stream else False
for message in messages:
is_mentioned = False
# 检查群聊消息是否艾特了机器人
if not is_private_chat:
# 最终修复:直接使用消息对象中由上游处理好的 is_mention 字段。
# 该字段在 message.py 的 MessageRecv._process_single_segment 中被设置。
if message.get("is_mentioned"):
is_mentioned = True
# 累积唤醒度
woke_up = self.wakeup_manager.add_wakeup_value(is_private_chat, is_mentioned)
if woke_up:
logger.info(f"{self.context.log_prefix} 被消息吵醒,进入愤怒状态!")
break
def _determine_form_type(self) -> str:
"""判断使用哪种形式的no_reply"""
# 检查是否启用breaking模式
if not getattr(global_config.chat, "enable_breaking_mode", False):
logger.info(f"{self.context.log_prefix} breaking模式已禁用使用waiting形式")
self.context.focus_energy = 1
return "waiting"
# 如果连续no_reply次数少于3次使用waiting形式
if self.context.no_reply_consecutive <= 3:
self.context.focus_energy = 1
return "waiting"
else:
# 使用累积兴趣值而不是最近3次的记录
total_interest = self.context.breaking_accumulated_interest
# 计算调整后的阈值
adjusted_threshold = 1 / global_config.chat.get_current_talk_frequency(self.context.stream_id)
logger.info(
f"{self.context.log_prefix} 累积兴趣值: {total_interest:.2f}, 调整后阈值: {adjusted_threshold:.2f}"
)
# 如果累积兴趣值小于阈值进入breaking形式
if total_interest < adjusted_threshold:
logger.info(f"{self.context.log_prefix} 累积兴趣度不足进入breaking形式")
self.context.focus_energy = random.randint(3, 6)
return "breaking"
else:
logger.info(f"{self.context.log_prefix} 累积兴趣度充足使用waiting形式")
self.context.focus_energy = 1
return "waiting"
async def _should_process_messages(self, new_message: List[Dict[str, Any]]) -> tuple[bool, float]:
"""
统一判断是否应该处理消息的函数
根据当前循环模式和消息内容决定是否继续处理
"""
if not new_message:
return False, 0.0
new_message_count = len(new_message)
talk_frequency = global_config.chat.get_current_talk_frequency(self.context.stream_id)
modified_exit_count_threshold = self.context.focus_energy * 0.5 / talk_frequency
modified_exit_interest_threshold = 1.5 / talk_frequency
# 计算当前批次消息的兴趣值
batch_interest = 0.0
for msg_dict in new_message:
interest_value = msg_dict.get("interest_value", 0.0)
if msg_dict.get("processed_plain_text", ""):
batch_interest += interest_value
# 在breaking形式下累积所有消息的兴趣值
if new_message_count > 0:
self.context.breaking_accumulated_interest += batch_interest
total_interest = self.context.breaking_accumulated_interest
else:
total_interest = self.context.breaking_accumulated_interest
if new_message_count >= modified_exit_count_threshold:
# 记录兴趣度到列表
self.recent_interest_records.append(total_interest)
# 重置累积兴趣值,因为已经达到了消息数量阈值
self.context.breaking_accumulated_interest = 0.0
logger.info(
f"{self.context.log_prefix} 累计消息数量达到{new_message_count}条(>{modified_exit_count_threshold:.1f}),结束等待,累积兴趣值: {total_interest:.2f}"
)
return True, total_interest / new_message_count
# 检查累计兴趣值
if new_message_count > 0:
# 只在兴趣值变化时输出log
if not hasattr(self, "_last_accumulated_interest") or total_interest != self._last_accumulated_interest:
logger.info(
f"{self.context.log_prefix} breaking形式当前累积兴趣值: {total_interest:.2f}, 专注度: {global_config.chat.focus_value:.1f}"
)
self._last_accumulated_interest = total_interest
if total_interest >= modified_exit_interest_threshold:
# 记录兴趣度到列表
self.recent_interest_records.append(total_interest)
# 重置累积兴趣值,因为已经达到了兴趣值阈值
self.context.breaking_accumulated_interest = 0.0
logger.info(
f"{self.context.log_prefix} 累计兴趣值达到{total_interest:.2f}(>{modified_exit_interest_threshold:.1f}),结束等待"
)
return True, total_interest / new_message_count
# 每10秒输出一次等待状态
if (
int(time.time() - self.context.last_read_time) > 0
and int(time.time() - self.context.last_read_time) % 10 == 0
):
logger.info(
f"{self.context.log_prefix} 已等待{time.time() - self.context.last_read_time:.0f}秒,累计{new_message_count}条消息,累积兴趣{total_interest:.1f},继续等待..."
)
await asyncio.sleep(0.5)
return False, 0.0