refactor(sleep_system): 重构睡眠系统为单例模式并优化启动逻辑

将睡眠系统的核心组件 `SleepStateManager` 从全局变量实例化改为通过 `get_sleep_state_manager()` 函数获取的单例模式。这一改变解决了模块间的循环导入问题,并提升了代码的组织性和可维护性。

主要变更:
- 引入 `get_sleep_state_manager` 函数以惰性加载方式创建和返回 `SleepStateManager` 单例。
- 将 `sleep_logic.py` 中重复的作息时间计算逻辑提取到新的 `utils.py` 模块中,以实现代码复用。
- 在 `SleepStateManager` 初始化时增加 `_refresh_sleep_state` 方法,用于在程序启动时校准睡眠状态,确保状态与当前时间一致,防止因程序重启导致的状态错乱。
- 更新所有调用点,使用新的 `get_sleep_state_manager()` 函数来访问状态管理器。
This commit is contained in:
minecraft1024a
2025-10-24 21:26:24 +08:00
parent 663e9e0a23
commit dd5770b3f0
7 changed files with 198 additions and 108 deletions

View File

@@ -20,7 +20,7 @@ from src.plugin_system.apis.chat_api import get_chat_manager
from .distribution_manager import stream_loop_manager
from .global_notice_manager import NoticeScope, global_notice_manager
from .sleep_system.state_manager import SleepState, sleep_state_manager
from .sleep_system.state_manager import SleepState, get_sleep_state_manager
if TYPE_CHECKING:
pass
@@ -150,7 +150,7 @@ class MessageManager:
"""添加消息到指定聊天流"""
# 在消息处理的最前端检查睡眠状态
if global_config.sleep_system.enable:
current_sleep_state = sleep_state_manager.get_current_state()
current_sleep_state = get_sleep_state_manager().get_current_state()
if current_sleep_state == SleepState.SLEEPING:
logger.info(f"处于 {current_sleep_state.name} 状态,消息被拦截。")
return # 直接返回,不处理消息

View File

@@ -4,8 +4,10 @@ from datetime import datetime, timedelta
from src.common.logger import get_logger
from src.config.config import global_config
from src.schedule.schedule_manager import schedule_manager
from . import utils
from .state_manager import SleepState,get_sleep_state_manager
from .state_manager import SleepState, sleep_state_manager
logger = get_logger("sleep_logic")
@@ -23,7 +25,7 @@ class SleepLogic:
检查并更新当前的睡眠状态,这是整个逻辑的入口。
由定时任务周期性调用。
"""
current_state = sleep_state_manager.get_current_state()
current_state = get_sleep_state_manager().get_current_state()
now = datetime.now()
if current_state == SleepState.AWAKE:
@@ -43,21 +45,23 @@ class SleepLogic:
"""
当状态为 AWAKE 时,检查是否应该进入睡眠。
"""
should_sleep, wake_up_time = self._should_be_sleeping(now)
from .state_manager import SleepState
should_sleep, wake_up_time = utils.should_be_sleeping(now)
if should_sleep:
logger.info("判断结果:应进入睡眠状态。")
sleep_state_manager.set_state(SleepState.SLEEPING, wake_up=wake_up_time)
get_sleep_state_manager().set_state(SleepState.SLEEPING, wake_up=wake_up_time)
def _check_should_wake_up(self, now: datetime):
"""
当状态为 SLEEPING 时,检查是否应该醒来。
这里包含了处理跨天获取日程的核心逻辑。
"""
wake_up_time = sleep_state_manager.get_wake_up_time()
from .state_manager import SleepState
wake_up_time = get_sleep_state_manager().get_wake_up_time()
# 核心逻辑:两段式检测
# 如果 state_manager 中还没有起床时间,说明是昨晚入睡,需要等待今天凌晨的新日程。
sleep_start_time = sleep_state_manager.get_sleep_start_time()
sleep_start_time = get_sleep_state_manager().get_sleep_start_time()
if not wake_up_time:
if sleep_start_time and now.date() > sleep_start_time.date():
logger.debug("当前为睡眠状态但无起床时间,尝试从新日程中解析...")
@@ -65,111 +69,24 @@ class SleepLogic:
if new_wake_up_time:
logger.info(f"成功从新日程获取到起床时间: {new_wake_up_time.strftime('%H:%M')}")
sleep_state_manager.set_wake_up_time(new_wake_up_time)
get_sleep_state_manager().set_wake_up_time(new_wake_up_time)
wake_up_time = new_wake_up_time
else:
logger.debug("未能获取到新的起床时间,继续睡眠。")
return
else:
logger.info("还没有到达第二天,继续睡眠。")
logger.info(f"尚未到苏醒时间,苏醒时间在{wake_up_time}")
if wake_up_time:
logger.info(f"尚未到苏醒时间,苏醒时间在{wake_up_time}")
if wake_up_time and now >= wake_up_time:
logger.info(f"当前时间 {now.strftime('%H:%M')} 已到达或超过预定起床时间 {wake_up_time.strftime('%H:%M')}")
sleep_state_manager.set_state(SleepState.AWAKE)
def _should_be_sleeping(self, now: datetime) -> tuple[bool, datetime | None]:
"""
判断在当前时刻,是否应该处于睡眠时间。
Returns:
元组 (是否应该睡眠, 预期的起床时间或None)
"""
sleep_config = global_config.sleep_system
if not sleep_config.enable:
return False, None
sleep_time, wake_up_time = None, None
if sleep_config.sleep_by_schedule:
sleep_time, _ = self._get_sleep_times_from_schedule(now)
if not sleep_time:
logger.debug("日程表模式开启,但未找到睡眠时间,使用固定时间作为备用。")
sleep_time, wake_up_time = self._get_fixed_sleep_times(now)
else:
sleep_time, wake_up_time = self._get_fixed_sleep_times(now)
if not sleep_time:
return False, None
# 检查当前时间是否在睡眠时间范围内
if now >= sleep_time:
# 如果起床时间是第二天(通常情况),且当前时间小于起床时间,则在睡眠范围内
if wake_up_time and wake_up_time > sleep_time and now < wake_up_time:
return True, wake_up_time
# 如果当前时间大于入睡时间,说明已经进入睡眠窗口
return True, wake_up_time
return False, None
def _get_fixed_sleep_times(self, now: datetime) -> tuple[datetime | None, datetime | None]:
"""
当使用“固定时间”模式时,从此方法计算睡眠和起床时间。
会加入配置中的随机偏移量,让作息更自然。
"""
sleep_config = global_config.sleep_system
try:
sleep_offset = random.randint(
-sleep_config.sleep_time_offset_minutes, sleep_config.sleep_time_offset_minutes
)
wake_up_offset = random.randint(
-sleep_config.wake_up_time_offset_minutes, sleep_config.wake_up_time_offset_minutes
)
sleep_t = datetime.strptime(sleep_config.fixed_sleep_time, "%H:%M").time()
wake_up_t = datetime.strptime(sleep_config.fixed_wake_up_time, "%H:%M").time()
sleep_time = datetime.combine(now.date(), sleep_t) + timedelta(minutes=sleep_offset)
# 如果起床时间比睡觉时间早,说明是第二天
wake_up_day = now.date() + timedelta(days=1) if wake_up_t < sleep_t else now.date()
wake_up_time = datetime.combine(wake_up_day, wake_up_t) + timedelta(minutes=wake_up_offset)
return sleep_time, wake_up_time
except (ValueError, TypeError) as e:
logger.error(f"解析固定睡眠时间失败: {e}")
return None, None
def _get_sleep_times_from_schedule(self, now: datetime) -> tuple[datetime | None, datetime | None]:
"""
当使用“日程表”模式时,从此方法获取睡眠时间。
实现了核心逻辑:
- 解析“今天”日程中的睡觉时间。
"""
# 阶段一:获取当天的睡觉时间
today_schedule = schedule_manager.today_schedule
sleep_time = None
if today_schedule:
for event in today_schedule:
activity = event.get("activity", "").lower()
if "sleep" in activity or "睡觉" in activity or "休息" in activity:
try:
time_range = event.get("time_range", "")
start_str, _ = time_range.split("-")
sleep_t = datetime.strptime(start_str.strip(), "%H:%M").time()
sleep_time = datetime.combine(now.date(), sleep_t)
break
except (ValueError, AttributeError):
logger.warning(f"解析日程中的睡眠时间失败: {event}")
continue
wake_up_time = None
return sleep_time, wake_up_time
get_sleep_state_manager().set_state(SleepState.AWAKE)
def _get_wakeup_times_from_schedule(self, now: datetime) -> tuple[datetime | None, datetime | None]:
"""
当使用“日程表”模式时,从此方法获取睡眠时间。
当使用“日程表”模式时,从此方法获取wakeup时间。
实现了核心逻辑:
- 解析“今天”日程中的睡觉时间。
- 解析“今天”日程中的wakeup时间。
"""
# 阶段一:获取当天的睡觉时间
today_schedule = schedule_manager.today_schedule

View File

@@ -4,6 +4,7 @@ from typing import Any
from src.common.logger import get_logger
from src.manager.local_store_manager import local_storage
from . import utils
logger = get_logger("sleep_state_manager")
@@ -46,6 +47,7 @@ class SleepStateManager:
self.state: dict[str, Any] = {}
self._default_state()
self.load_state()
self._refresh_sleep_state()
def _default_state(self):
"""
@@ -185,6 +187,72 @@ class SleepStateManager:
logger.info(f"更新预定起床时间为: {self.state['wake_up_time']}")
self.save_state()
def _refresh_sleep_state(self):
"""
程序启动时,刷新并校准当前的睡眠状态。
"""
now = datetime.now()
current_state = self.get_current_state()
# 检查并更新作息时间
# _should_be_sleeping 会综合判断固定时间和日程表,返回最终结果
_, new_wake_up_time_for_check = utils.should_be_sleeping(now)
# 我们需要的是睡眠开始时间,所以再单独获取一下
from src.config.config import global_config
if global_config.sleep_system.sleep_by_schedule:
new_sleep_time, _ = utils._get_sleep_times_from_schedule(now) # type: ignore
else:
new_sleep_time, _ = utils._get_fixed_sleep_times(now) # type: ignore
logger.info(f"新的睡眠时间:{new_sleep_time}")
stored_sleep_time = self.get_sleep_start_time()
if new_sleep_time and stored_sleep_time:
time_diff = abs(new_sleep_time - stored_sleep_time)
# 如果差异大于2小时则认为作息已更新
if time_diff > timedelta(hours=2):
logger.info(f"检测到新的睡眠时间 {new_sleep_time} 与存储的 {stored_sleep_time} 差异过大,将进行更新。")
# 更新状态以记录新的作息时间,但保持清醒
self.state["sleep_start_time"] = new_sleep_time.timestamp()
self.state["wake_up_time"] = new_wake_up_time_for_check.timestamp() if new_wake_up_time_for_check else None
self.save_state()
if current_state == SleepState.SLEEPING:
wake_up_time = self.get_wake_up_time()
if wake_up_time:
if now <= wake_up_time:
logger.info(f"启动时检测到已超过起床时间 (起床时间: {wake_up_time}),将状态强制唤醒。")
self.set_state(SleepState.AWAKE)
else:
# 如果没有起床时间,则根据睡眠开始时间判断
sleep_start_time = self.get_sleep_start_time()
if sleep_start_time:
logger.info(f"{now}-{sleep_start_time}")
if now > sleep_start_time:
logger.warning(f"当前时间 {now} 早于睡眠开始时间 {sleep_start_time}。将强制唤醒。")
self.set_state(SleepState.AWAKE)
# 如果 now >= sleep_start_time则说明正在正常睡眠无需操作
else:
# 如果连睡眠开始时间都没有,说明状态异常
logger.warning("启动时检测到睡眠状态异常:没有起床时间也没有睡眠开始时间。将强制唤醒。")
self.set_state(SleepState.AWAKE)
elif current_state == SleepState.AWAKE:
should_sleep, new_wake_up_time = utils.should_be_sleeping(now)
if should_sleep:
logger.info("启动时检测到当前时间应处于睡眠状态,但状态为清醒。将强制进入睡眠。")
self.set_state(SleepState.SLEEPING, wake_up=new_wake_up_time)
# 全局单例
sleep_state_manager = SleepStateManager()
_sleep_state_manager_instance: SleepStateManager | None = None
def get_sleep_state_manager() -> SleepStateManager:
"""
获取睡眠状态管理器的单例实例。
在首次调用时会创建实例。
"""
global _sleep_state_manager_instance
if _sleep_state_manager_instance is None:
_sleep_state_manager_instance = SleepStateManager()
return _sleep_state_manager_instance

View File

@@ -1,3 +1,5 @@
import asyncio
from src.common.logger import get_logger
from src.manager.async_task_manager import AsyncTask, async_task_manager

View File

@@ -0,0 +1,105 @@
"""
睡眠系统的工具函数模块
此模块包含了被 state_manager 和 sleep_logic 共享的、无状态的逻辑函数,
目的是为了解决循环引用的问题。
"""
import random
from datetime import datetime, timedelta
from src.common.logger import get_logger
from src.config.config import global_config
from src.schedule.schedule_manager import schedule_manager
logger = get_logger("sleep_utils")
def _get_fixed_sleep_times(now: datetime) -> tuple[datetime | None, datetime | None]:
"""
当使用“固定时间”模式时,从此方法计算睡眠和起床时间。
会加入配置中的随机偏移量,让作息更自然。
"""
sleep_config = global_config.sleep_system
try:
sleep_offset = random.randint(
-sleep_config.sleep_time_offset_minutes, sleep_config.sleep_time_offset_minutes
)
wake_up_offset = random.randint(
-sleep_config.wake_up_time_offset_minutes, sleep_config.wake_up_time_offset_minutes
)
sleep_t = datetime.strptime(sleep_config.fixed_sleep_time, "%H:%M").time()
wake_up_t = datetime.strptime(sleep_config.fixed_wake_up_time, "%H:%M").time()
sleep_time = datetime.combine(now.date(), sleep_t) + timedelta(minutes=sleep_offset)
# 如果起床时间比睡觉时间早,说明是第二天
wake_up_day = now.date() + timedelta(days=1) if wake_up_t < sleep_t else now.date()
wake_up_time = datetime.combine(wake_up_day, wake_up_t) + timedelta(minutes=wake_up_offset)
return sleep_time, wake_up_time
except (ValueError, TypeError) as e:
logger.error(f"解析固定睡眠时间失败: {e}")
return None, None
def _get_sleep_times_from_schedule(now: datetime) -> tuple[datetime | None, datetime | None]:
"""
当使用“日程表”模式时,从此方法获取睡眠时间。
实现了核心逻辑:
- 解析“今天”日程中的睡觉时间。
"""
# 阶段一:获取当天的睡觉时间
today_schedule = schedule_manager.today_schedule
sleep_time = None
if today_schedule:
for event in today_schedule:
activity = event.get("activity", "").lower()
if "sleep" in activity or "睡觉" in activity or "入睡" in activity or "进入梦乡" in activity:
try:
time_range = event.get("time_range", "")
start_str, _ = time_range.split("-")
sleep_t = datetime.strptime(start_str.strip(), "%H:%M").time()
sleep_time = datetime.combine(now.date(), sleep_t)
break
except (ValueError, AttributeError):
logger.warning(f"解析日程中的睡眠时间失败: {event}")
continue
wake_up_time = None
return sleep_time, wake_up_time
def should_be_sleeping(now: datetime) -> tuple[bool, datetime | None]:
"""
判断在当前时刻,是否应该处于睡眠时间。
Returns:
元组 (是否应该睡眠, 预期的起床时间或None)
"""
sleep_config = global_config.sleep_system
if not sleep_config.enable:
return False, None
sleep_time, wake_up_time = None, None
if sleep_config.sleep_by_schedule:
sleep_time, _ = _get_sleep_times_from_schedule(now)
if not sleep_time:
logger.debug("日程表模式开启,但未找到睡眠时间,使用固定时间作为备用。")
sleep_time, wake_up_time = _get_fixed_sleep_times(now)
else:
sleep_time, wake_up_time = _get_fixed_sleep_times(now)
if not sleep_time:
return False, None
# 检查当前时间是否在睡眠时间范围内
if now >= sleep_time:
# 如果起床时间是第二天(通常情况),且当前时间小于起床时间,则在睡眠范围内
if wake_up_time and wake_up_time > sleep_time and now < wake_up_time:
return True, wake_up_time
# 如果当前时间大于入睡时间,说明已经进入睡眠窗口
return True, wake_up_time
return False, None

View File

@@ -512,10 +512,8 @@ MoFox_Bot(第三方修改版)
logger.error(f"月度计划管理器初始化失败: {e}")
# 初始化日程管理器
if global_config.planning_system.schedule_enable:
try:
await schedule_manager.load_or_generate_today_schedule()
await schedule_manager.start_daily_schedule_generation()
await schedule_manager.initialize()
logger.info("日程表管理器初始化成功")
except Exception as e:
logger.error(f"日程表管理器初始化失败: {e}")

View File

@@ -6,7 +6,7 @@ from datetime import datetime
from maim_message import UserInfo
from src.chat.message_manager.sleep_system.state_manager import SleepState, sleep_state_manager
from src.chat.message_manager.sleep_system.state_manager import SleepState, get_sleep_state_manager
from src.chat.message_receive.chat_stream import get_chat_manager
from src.common.logger import get_logger
from src.config.config import global_config
@@ -39,7 +39,7 @@ class ColdStartTask(AsyncTask):
await asyncio.sleep(30) # 延迟以确保所有服务和聊天流已从数据库加载完毕
try:
current_state = sleep_state_manager.get_current_state()
current_state = get_sleep_state_manager().get_current_state()
if current_state == SleepState.SLEEPING:
logger.info("bot正在睡觉,跳过本次任务")
return
@@ -152,7 +152,7 @@ class ProactiveThinkingTask(AsyncTask):
# 计算下一次检查前的休眠时间
next_interval = self._get_next_interval()
try:
current_state = sleep_state_manager.get_current_state()
current_state = get_sleep_state_manager().get_current_state()
if current_state == SleepState.SLEEPING:
logger.info("bot正在睡觉,跳过本次任务")
return