refactor(sleep_manager): 引入上下文对象重构睡眠与唤醒状态管理

创建了 `SleepContext` 与 `WakeUpContext` 类,用于统一封装和管理各自模块的状态数据及其持久化逻辑。

- `SleepManager` 和 `WakeUpManager` 不再直接管理零散的状态属性(如 `_current_state`, `wakeup_value`),而是通过持有一个 Context 实例来进行状态的读写和保存。
- 移除了原有的 `SleepStateSerializer` 静态类和管理器中的 `_save_state` / `_load_state` 方法,将逻辑集中到新的 Context 类中。

此次重构旨在提升代码的内聚性,实现状态管理与业务逻辑的分离,使代码结构更清晰,更易于维护和扩展。
This commit is contained in:
minecraft1024a
2025-09-24 11:39:13 +08:00
committed by Windpicker-owo
parent e170999404
commit 31f2c9f282
4 changed files with 187 additions and 222 deletions

View File

@@ -6,11 +6,11 @@ from typing import Optional, TYPE_CHECKING
from src.common.logger import get_logger
from src.config.config import global_config
from .notification_sender import NotificationSender
from .sleep_state import SleepState, SleepStateSerializer
from .sleep_state import SleepState, SleepContext
from .time_checker import TimeChecker
if TYPE_CHECKING:
pass
from .wakeup_manager import WakeUpManager
logger = get_logger("sleep_manager")
@@ -25,28 +25,19 @@ class SleepManager:
"""
初始化睡眠管理器。
"""
self.time_checker = TimeChecker() # 时间检查器,用于判断当前是否处于理论睡眠时间
self.context = SleepContext() # 睡眠上下文,管理所有状态
self.time_checker = TimeChecker() # 时间检查器
self.last_sleep_log_time = 0 # 上次记录睡眠日志的时间戳
self.sleep_log_interval = 35 # 睡眠日志记录间隔(秒)
# --- 统一睡眠状态管理 ---
self._current_state: SleepState = SleepState.AWAKE # 当前睡眠状态
self._sleep_buffer_end_time: Optional[datetime] = None # 睡眠缓冲结束时间,用于状态转换
self._total_delayed_minutes_today: float = 0.0 # 今天总共延迟入睡的分钟数
self._last_sleep_check_date: Optional[date] = None # 上次检查睡眠状态的日期
self._last_fully_slept_log_time: float = 0 # 上次完全进入睡眠状态的时间戳
self._re_sleep_attempt_time: Optional[datetime] = None # 被吵醒后,尝试重新入睡的时间点
# 从本地存储加载上一次的睡眠状态
self._load_sleep_state()
def get_current_sleep_state(self) -> SleepState:
"""获取当前的睡眠状态。"""
return self._current_state
return self.context.current_state
def is_sleeping(self) -> bool:
"""判断当前是否处于正在睡觉的状态。"""
return self._current_state == SleepState.SLEEPING
return self.context.current_state == SleepState.SLEEPING
async def update_sleep_state(self, wakeup_manager: Optional["WakeUpManager"] = None):
"""
@@ -58,41 +49,42 @@ class SleepManager:
"""
# 如果全局禁用了睡眠系统,则强制设置为清醒状态并返回
if not global_config.sleep_system.enable:
if self._current_state != SleepState.AWAKE:
if self.context.current_state != SleepState.AWAKE:
logger.debug("睡眠系统禁用,强制设为 AWAKE")
self._current_state = SleepState.AWAKE
self.context.current_state = SleepState.AWAKE
return
now = datetime.now()
today = now.date()
# 跨天处理:如果日期变化,重置每日相关的睡眠状态
if self._last_sleep_check_date != today:
if self.context.last_sleep_check_date != today:
logger.info(f"新的一天 ({today}),重置睡眠状态。")
self._total_delayed_minutes_today = 0
self._current_state = SleepState.AWAKE
self._sleep_buffer_end_time = None
self._last_sleep_check_date = today
self._save_sleep_state()
self.context.total_delayed_minutes_today = 0
self.context.current_state = SleepState.AWAKE
self.context.sleep_buffer_end_time = None
self.context.last_sleep_check_date = today
self.context.save()
# 检查当前是否处于理论上的睡眠时间段
is_in_theoretical_sleep, activity = self.time_checker.is_in_theoretical_sleep_time(now.time())
# --- 状态机核心处理逻辑 ---
if self._current_state == SleepState.AWAKE:
current_state = self.context.current_state
if current_state == SleepState.AWAKE:
if is_in_theoretical_sleep:
self._handle_awake_to_sleep(now, activity, wakeup_manager)
elif self._current_state == SleepState.PREPARING_SLEEP:
elif current_state == SleepState.PREPARING_SLEEP:
self._handle_preparing_sleep(now, is_in_theoretical_sleep, wakeup_manager)
elif self._current_state == SleepState.SLEEPING:
elif current_state == SleepState.SLEEPING:
self._handle_sleeping(now, is_in_theoretical_sleep, activity, wakeup_manager)
elif self._current_state == SleepState.INSOMNIA:
elif current_state == SleepState.INSOMNIA:
self._handle_insomnia(now, is_in_theoretical_sleep)
elif self._current_state == SleepState.WOKEN_UP:
elif current_state == SleepState.WOKEN_UP:
self._handle_woken_up(now, is_in_theoretical_sleep, wakeup_manager)
def _handle_awake_to_sleep(self, now: datetime, activity: Optional[str], wakeup_manager: Optional["WakeUpManager"]):
@@ -118,13 +110,13 @@ class SleepManager:
delay_minutes = int(pressure_diff * max_delay_minutes)
# 确保总延迟不超过当日最大值
remaining_delay = max_delay_minutes - self._total_delayed_minutes_today
remaining_delay = max_delay_minutes - self.context.total_delayed_minutes_today
delay_minutes = min(delay_minutes, remaining_delay)
if delay_minutes > 0:
# 增加一些随机性
buffer_seconds = random.randint(int(delay_minutes * 0.8 * 60), int(delay_minutes * 1.2 * 60))
self._total_delayed_minutes_today += buffer_seconds / 60.0
self.context.total_delayed_minutes_today += buffer_seconds / 60.0
logger.info(f"睡眠压力 ({sleep_pressure:.1f}) 较低,延迟 {buffer_seconds / 60:.1f} 分钟入睡。")
else:
# 延迟额度已用完,设置一个较短的准备时间
@@ -139,22 +131,22 @@ class SleepManager:
if global_config.sleep_system.enable_pre_sleep_notification:
asyncio.create_task(NotificationSender.send_goodnight_notification(wakeup_manager.context))
self._sleep_buffer_end_time = now + timedelta(seconds=buffer_seconds)
self._current_state = SleepState.PREPARING_SLEEP
self.context.sleep_buffer_end_time = now + timedelta(seconds=buffer_seconds)
self.context.current_state = SleepState.PREPARING_SLEEP
logger.info(f"进入准备入睡状态,将在 {buffer_seconds / 60:.1f} 分钟内入睡。")
self._save_sleep_state()
self.context.save()
else:
# 无法获取 wakeup_manager退回旧逻辑
buffer_seconds = random.randint(1 * 60, 3 * 60)
self._sleep_buffer_end_time = now + timedelta(seconds=buffer_seconds)
self._current_state = SleepState.PREPARING_SLEEP
self.context.sleep_buffer_end_time = now + timedelta(seconds=buffer_seconds)
self.context.current_state = SleepState.PREPARING_SLEEP
logger.warning("无法获取 WakeUpManager弹性睡眠采用默认1-3分钟延迟。")
self._save_sleep_state()
self.context.save()
else:
# 非弹性睡眠模式
if wakeup_manager and global_config.sleep_system.enable_pre_sleep_notification:
asyncio.create_task(NotificationSender.send_goodnight_notification(wakeup_manager.context))
self._current_state = SleepState.SLEEPING
self.context.current_state = SleepState.SLEEPING
def _handle_preparing_sleep(self, now: datetime, is_in_theoretical_sleep: bool, wakeup_manager: Optional["WakeUpManager"]):
@@ -162,32 +154,32 @@ class SleepManager:
# 如果在准备期间离开了理论睡眠时间,则取消入睡
if not is_in_theoretical_sleep:
logger.info("准备入睡期间离开理论休眠时间,取消入睡,恢复清醒。")
self._current_state = SleepState.AWAKE
self._sleep_buffer_end_time = None
self._save_sleep_state()
self.context.current_state = SleepState.AWAKE
self.context.sleep_buffer_end_time = None
self.context.save()
# 如果缓冲时间结束,则正式进入睡眠状态
elif self._sleep_buffer_end_time and now >= self._sleep_buffer_end_time:
elif self.context.sleep_buffer_end_time and now >= self.context.sleep_buffer_end_time:
logger.info("睡眠缓冲期结束,正式进入休眠状态。")
self._current_state = SleepState.SLEEPING
self.context.current_state = SleepState.SLEEPING
self._last_fully_slept_log_time = now.timestamp()
# 设置一个随机的延迟,用于触发“睡后失眠”检查
delay_minutes_range = global_config.sleep_system.insomnia_trigger_delay_minutes
delay_minutes = random.randint(delay_minutes_range[0], delay_minutes_range[1])
self._sleep_buffer_end_time = now + timedelta(minutes=delay_minutes)
self.context.sleep_buffer_end_time = now + timedelta(minutes=delay_minutes)
logger.info(f"已设置睡后失眠检查,将在 {delay_minutes} 分钟后触发。")
self._save_sleep_state()
self.context.save()
def _handle_sleeping(self, now: datetime, is_in_theoretical_sleep: bool, activity: Optional[str], wakeup_manager: Optional["WakeUpManager"]):
"""处理“正在睡觉”状态下的逻辑。"""
# 如果理论睡眠时间结束,则自然醒来
if not is_in_theoretical_sleep:
logger.info("理论休眠时间结束,自然醒来。")
self._current_state = SleepState.AWAKE
self._save_sleep_state()
self.context.current_state = SleepState.AWAKE
self.context.save()
# 检查是否到了触发“睡后失眠”的时间点
elif self._sleep_buffer_end_time and now >= self._sleep_buffer_end_time:
elif self.context.sleep_buffer_end_time and now >= self.context.sleep_buffer_end_time:
if wakeup_manager:
sleep_pressure = wakeup_manager.context.sleep_pressure
pressure_threshold = global_config.sleep_system.flexible_sleep_pressure_threshold
@@ -201,12 +193,12 @@ class SleepManager:
logger.info("随机触发失眠。")
if insomnia_reason:
self._current_state = SleepState.INSOMNIA
self.context.current_state = SleepState.INSOMNIA
# 设置失眠的持续时间
duration_minutes_range = global_config.sleep_system.insomnia_duration_minutes
duration_minutes = random.randint(*duration_minutes_range)
self._sleep_buffer_end_time = now + timedelta(minutes=duration_minutes)
self.context.sleep_buffer_end_time = now + timedelta(minutes=duration_minutes)
# 发送失眠通知
asyncio.create_task(NotificationSender.send_insomnia_notification(wakeup_manager.context, insomnia_reason))
@@ -214,8 +206,8 @@ class SleepManager:
else:
# 睡眠压力正常,不触发失眠,清除检查时间点
logger.info(f"睡眠压力 ({sleep_pressure:.1f}) 正常,未触发睡后失眠。")
self._sleep_buffer_end_time = None
self._save_sleep_state()
self.context.sleep_buffer_end_time = None
self.context.save()
else:
# 定期记录睡眠日志
current_timestamp = now.timestamp()
@@ -228,26 +220,26 @@ class SleepManager:
# 如果离开理论睡眠时间,则失眠结束
if not is_in_theoretical_sleep:
logger.info("已离开理论休眠时间,失眠结束,恢复清醒。")
self._current_state = SleepState.AWAKE
self._sleep_buffer_end_time = None
self._save_sleep_state()
self.context.current_state = SleepState.AWAKE
self.context.sleep_buffer_end_time = None
self.context.save()
# 如果失眠持续时间已过,则恢复睡眠
elif self._sleep_buffer_end_time and now >= self._sleep_buffer_end_time:
elif self.context.sleep_buffer_end_time and now >= self.context.sleep_buffer_end_time:
logger.info("失眠状态持续时间已过,恢复睡眠。")
self._current_state = SleepState.SLEEPING
self._sleep_buffer_end_time = None
self._save_sleep_state()
self.context.current_state = SleepState.SLEEPING
self.context.sleep_buffer_end_time = None
self.context.save()
def _handle_woken_up(self, now: datetime, is_in_theoretical_sleep: bool, wakeup_manager: Optional["WakeUpManager"]):
"""处理“被吵醒”状态下的逻辑。"""
# 如果理论睡眠时间结束,则状态自动结束
if not is_in_theoretical_sleep:
logger.info("理论休眠时间结束,被吵醒的状态自动结束。")
self._current_state = SleepState.AWAKE
self._re_sleep_attempt_time = None
self._save_sleep_state()
self.context.current_state = SleepState.AWAKE
self.context.re_sleep_attempt_time = None
self.context.save()
# 到了尝试重新入睡的时间点
elif self._re_sleep_attempt_time and now >= self._re_sleep_attempt_time:
elif self.context.re_sleep_attempt_time and now >= self.context.re_sleep_attempt_time:
logger.info("被吵醒后经过一段时间,尝试重新入睡...")
if wakeup_manager:
sleep_pressure = wakeup_manager.context.sleep_pressure
@@ -257,48 +249,28 @@ class SleepManager:
if sleep_pressure >= pressure_threshold:
logger.info("睡眠压力足够,从被吵醒状态转换到准备入睡。")
buffer_seconds = random.randint(3 * 60, 8 * 60)
self._sleep_buffer_end_time = now + timedelta(seconds=buffer_seconds)
self._current_state = SleepState.PREPARING_SLEEP
self._re_sleep_attempt_time = None
self.context.sleep_buffer_end_time = now + timedelta(seconds=buffer_seconds)
self.context.current_state = SleepState.PREPARING_SLEEP
self.context.re_sleep_attempt_time = None
else:
# 睡眠压力不足,延迟一段时间后再次尝试
delay_minutes = 15
self._re_sleep_attempt_time = now + timedelta(minutes=delay_minutes)
self.context.re_sleep_attempt_time = now + timedelta(minutes=delay_minutes)
logger.info(
f"睡眠压力({sleep_pressure:.1f})仍然较低,暂时保持清醒,在 {delay_minutes} 分钟后再次尝试。"
)
self._save_sleep_state()
self.context.save()
def reset_sleep_state_after_wakeup(self):
"""
当角色被用户消息等外部因素唤醒时调用此方法。
将状态强制转换为 WOKEN_UP并设置一个延迟之后会尝试重新入睡。
"""
if self._current_state in [SleepState.PREPARING_SLEEP, SleepState.SLEEPING, SleepState.INSOMNIA]:
if self.context.current_state in [SleepState.PREPARING_SLEEP, SleepState.SLEEPING, SleepState.INSOMNIA]:
logger.info("被唤醒,进入 WOKEN_UP 状态!")
self._current_state = SleepState.WOKEN_UP
self._sleep_buffer_end_time = None
self.context.current_state = SleepState.WOKEN_UP
self.context.sleep_buffer_end_time = None
re_sleep_delay_minutes = getattr(global_config.sleep_system, "re_sleep_delay_minutes", 10)
self._re_sleep_attempt_time = datetime.now() + timedelta(minutes=re_sleep_delay_minutes)
self.context.re_sleep_attempt_time = datetime.now() + timedelta(minutes=re_sleep_delay_minutes)
logger.info(f"将在 {re_sleep_delay_minutes} 分钟后尝试重新入睡。")
self._save_sleep_state()
def _save_sleep_state(self):
"""将当前所有睡眠相关的状态打包并保存到本地存储。"""
state_data = {
"_current_state": self._current_state,
"_sleep_buffer_end_time": self._sleep_buffer_end_time,
"_total_delayed_minutes_today": self._total_delayed_minutes_today,
"_last_sleep_check_date": self._last_sleep_check_date,
"_re_sleep_attempt_time": self._re_sleep_attempt_time,
}
SleepStateSerializer.save(state_data)
def _load_sleep_state(self):
"""从本地存储加载并恢复所有睡眠相关的状态。"""
state_data = SleepStateSerializer.load()
self._current_state = state_data["_current_state"]
self._sleep_buffer_end_time = state_data["_sleep_buffer_end_time"]
self._total_delayed_minutes_today = state_data["_total_delayed_minutes_today"]
self._last_sleep_check_date = state_data["_last_sleep_check_date"]
self._re_sleep_attempt_time = state_data["_re_sleep_attempt_time"]
self.context.save()

View File

@@ -1,5 +1,7 @@
from enum import Enum, auto
from datetime import datetime
from datetime import datetime, date
from typing import Optional
from src.common.logger import get_logger
from src.manager.local_store_manager import local_storage
@@ -19,92 +21,66 @@ class SleepState(Enum):
WOKEN_UP = auto() # 被吵醒状态
class SleepStateSerializer:
class SleepContext:
"""
睡眠状态序列化器
负责将内存中的睡眠状态对象持久化到本地存储如JSON文件
以及在程序启动时从本地存储中恢复状态。
这样可以确保即使程序重启,角色的睡眠状态也能得以保留。
睡眠上下文,负责封装和管理所有与睡眠相关的状态,并处理其持久化
"""
@staticmethod
def save(state_data: dict):
"""
将当前的睡眠状态数据保存到本地存储。
def __init__(self):
"""初始化睡眠上下文,并从本地存储加载初始状态。"""
self.current_state: SleepState = SleepState.AWAKE
self.sleep_buffer_end_time: Optional[datetime] = None
self.total_delayed_minutes_today: float = 0.0
self.last_sleep_check_date: Optional[date] = None
self.re_sleep_attempt_time: Optional[datetime] = None
self.load()
Args:
state_data (dict): 包含睡眠状态信息的字典。
datetime对象会被转换为时间戳Enum成员会被转换为其名称字符串。
"""
def save(self):
"""将当前的睡眠状态数据保存到本地存储。"""
try:
# 准备要序列化的数据字典
state = {
# 保存当前状态的枚举名称
"current_state": state_data["_current_state"].name,
# 将datetime对象转换为Unix时间戳以便序列化
"sleep_buffer_end_time_ts": state_data["_sleep_buffer_end_time"].timestamp()
if state_data["_sleep_buffer_end_time"]
"current_state": self.current_state.name,
"sleep_buffer_end_time_ts": self.sleep_buffer_end_time.timestamp()
if self.sleep_buffer_end_time
else None,
"total_delayed_minutes_today": state_data["_total_delayed_minutes_today"],
# 将date对象转换为ISO格式的字符串
"last_sleep_check_date_str": state_data["_last_sleep_check_date"].isoformat()
if state_data["_last_sleep_check_date"]
"total_delayed_minutes_today": self.total_delayed_minutes_today,
"last_sleep_check_date_str": self.last_sleep_check_date.isoformat()
if self.last_sleep_check_date
else None,
"re_sleep_attempt_time_ts": state_data["_re_sleep_attempt_time"].timestamp()
if state_data["_re_sleep_attempt_time"]
"re_sleep_attempt_time_ts": self.re_sleep_attempt_time.timestamp()
if self.re_sleep_attempt_time
else None,
}
# 写入本地存储
local_storage["schedule_sleep_state"] = state
logger.debug(f"已保存睡眠状态: {state}")
logger.debug(f"已保存睡眠上下文: {state}")
except Exception as e:
logger.error(f"保存睡眠状态失败: {e}")
logger.error(f"保存睡眠上下文失败: {e}")
@staticmethod
def load() -> dict:
"""
从本地存储加载并解析睡眠状态。
Returns:
dict: 包含恢复后睡眠状态信息的字典。
如果加载失败或没有找到数据,则返回一个默认的清醒状态。
"""
# 定义一个默认的状态,以防加载失败
state_data = {
"_current_state": SleepState.AWAKE,
"_sleep_buffer_end_time": None,
"_total_delayed_minutes_today": 0,
"_last_sleep_check_date": None,
"_re_sleep_attempt_time": None,
}
def load(self):
"""从本地存储加载并解析睡眠状态。"""
try:
# 从本地存储读取数据
state = local_storage["schedule_sleep_state"]
if state and isinstance(state, dict):
# 恢复当前状态枚举
state_name = state.get("current_state")
if state_name and hasattr(SleepState, state_name):
state_data["_current_state"] = SleepState[state_name]
if not (state and isinstance(state, dict)):
logger.info("未找到本地睡眠上下文,使用默认值。")
return
# 从时间戳恢复datetime对象
end_time_ts = state.get("sleep_buffer_end_time_ts")
if end_time_ts:
state_data["_sleep_buffer_end_time"] = datetime.fromtimestamp(end_time_ts)
state_name = state.get("current_state")
if state_name and hasattr(SleepState, state_name):
self.current_state = SleepState[state_name]
# 恢复重新入睡尝试时间
re_sleep_ts = state.get("re_sleep_attempt_time_ts")
if re_sleep_ts:
state_data["_re_sleep_attempt_time"] = datetime.fromtimestamp(re_sleep_ts)
end_time_ts = state.get("sleep_buffer_end_time_ts")
if end_time_ts:
self.sleep_buffer_end_time = datetime.fromtimestamp(end_time_ts)
# 恢复今日延迟睡眠总分钟数
state_data["_total_delayed_minutes_today"] = state.get("total_delayed_minutes_today", 0)
re_sleep_ts = state.get("re_sleep_attempt_time_ts")
if re_sleep_ts:
self.re_sleep_attempt_time = datetime.fromtimestamp(re_sleep_ts)
# 从ISO格式字符串恢复date对象
date_str = state.get("last_sleep_check_date_str")
if date_str:
state_data["_last_sleep_check_date"] = datetime.fromisoformat(date_str).date()
self.total_delayed_minutes_today = state.get("total_delayed_minutes_today", 0.0)
logger.info(f"成功从本地存储加载睡眠状态: {state}")
date_str = state.get("last_sleep_check_date_str")
if date_str:
self.last_sleep_check_date = datetime.fromisoformat(date_str).date()
logger.info(f"成功从本地存储加载睡眠上下文: {state}")
except Exception as e:
# 如果加载过程中出现任何问题,记录警告并返回默认状态
logger.warning(f"加载睡眠状态失败,将使用默认值: {e}")
return state_data
logger.warning(f"加载睡眠上下文失败,将使用默认值: {e}")

View File

@@ -0,0 +1,45 @@
import time
from src.common.logger import get_logger
from src.manager.local_store_manager import local_storage
logger = get_logger("wakeup_context")
class WakeUpContext:
"""
唤醒上下文,负责封装和管理所有与唤醒相关的状态,并处理其持久化。
"""
def __init__(self):
"""初始化唤醒上下文,并从本地存储加载初始状态。"""
self.wakeup_value: float = 0.0
self.is_angry: bool = False
self.angry_start_time: float = 0.0
self.sleep_pressure: float = 100.0 # 新增:睡眠压力
self.load()
def _get_storage_key(self) -> str:
"""获取本地存储键"""
return "global_wakeup_manager_state"
def load(self):
"""从本地存储加载状态"""
state = local_storage[self._get_storage_key()]
if state and isinstance(state, dict):
self.wakeup_value = state.get("wakeup_value", 0.0)
self.is_angry = state.get("is_angry", False)
self.angry_start_time = state.get("angry_start_time", 0.0)
self.sleep_pressure = state.get("sleep_pressure", 100.0)
logger.info(f"成功从本地存储加载唤醒上下文: {state}")
else:
logger.info("未找到本地唤醒上下文,将使用默认值初始化。")
def save(self):
"""将当前状态保存到本地存储"""
state = {
"wakeup_value": self.wakeup_value,
"is_angry": self.is_angry,
"angry_start_time": self.angry_start_time,
"sleep_pressure": self.sleep_pressure,
}
local_storage[self._get_storage_key()] = state
logger.debug(f"已将唤醒上下文保存到本地存储: {state}")

View File

@@ -4,6 +4,7 @@ from typing import Optional, TYPE_CHECKING
from src.common.logger import get_logger
from src.config.config import global_config
from src.manager.local_store_manager import local_storage
from src.chat.message_manager.sleep_manager.wakeup_context import WakeUpContext
if TYPE_CHECKING:
from .sleep_manager import SleepManager
@@ -26,10 +27,8 @@ class WakeUpManager:
- 控制愤怒状态的持续时间
"""
self.sleep_manager = sleep_manager
self.wakeup_value = 0.0 # 当前唤醒度
self.is_angry = False # 是否处于愤怒状态
self.angry_start_time = 0.0 # 愤怒状态开始时间
self.last_decay_time = time.time() # 上次衰减时间
self.context = WakeUpContext() # 使用新的上下文管理器
self.last_decay_time = time.time()
self._decay_task: Optional[asyncio.Task] = None
self.is_running = False
self.last_log_time = 0
@@ -46,33 +45,6 @@ class WakeUpManager:
self.enabled = sleep_config.enable
self.angry_prompt = sleep_config.angry_prompt
self._load_wakeup_state()
def _get_storage_key(self) -> str:
"""获取本地存储键"""
return "global_wakeup_manager_state"
def _load_wakeup_state(self):
"""从本地存储加载状态"""
state = local_storage[self._get_storage_key()]
if state and isinstance(state, dict):
self.wakeup_value = state.get("wakeup_value", 0.0)
self.is_angry = state.get("is_angry", False)
self.angry_start_time = state.get("angry_start_time", 0.0)
logger.info(f"成功从本地存储加载唤醒状态: {state}")
else:
logger.info("未找到本地唤醒状态,将使用默认值初始化。")
def _save_wakeup_state(self):
"""将当前状态保存到本地存储"""
state = {
"wakeup_value": self.wakeup_value,
"is_angry": self.is_angry,
"angry_start_time": self.angry_start_time,
}
local_storage[self._get_storage_key()] = state
logger.debug(f"已将唤醒状态保存到本地存储: {state}")
async def start(self):
"""启动唤醒度管理器"""
if not self.enabled:
@@ -111,21 +83,21 @@ class WakeUpManager:
current_time = time.time()
# 检查愤怒状态是否过期
if self.is_angry and current_time - self.angry_start_time >= self.angry_duration:
self.is_angry = False
if self.context.is_angry and current_time - self.context.angry_start_time >= self.angry_duration:
self.context.is_angry = False
# 通知情绪管理系统清除愤怒状态
from src.mood.mood_manager import mood_manager
mood_manager.clear_angry_from_wakeup("global_mood")
logger.info("愤怒状态结束,恢复正常")
self._save_wakeup_state()
self.context.save()
# 唤醒度自然衰减
if self.wakeup_value > 0:
old_value = self.wakeup_value
self.wakeup_value = max(0, self.wakeup_value - self.decay_rate)
if old_value != self.wakeup_value:
logger.debug(f"唤醒度衰减: {old_value:.1f} -> {self.wakeup_value:.1f}")
self._save_wakeup_state()
if self.context.wakeup_value > 0:
old_value = self.context.wakeup_value
self.context.wakeup_value = max(0, self.context.wakeup_value - self.decay_rate)
if old_value != self.context.wakeup_value:
logger.debug(f"唤醒度衰减: {old_value:.1f} -> {self.context.wakeup_value:.1f}")
self.context.save()
def add_wakeup_value(self, is_private_chat: bool, is_mentioned: bool = False) -> bool:
"""
@@ -149,15 +121,15 @@ class WakeUpManager:
if current_sleep_state != SleepState.SLEEPING:
return False
old_value = self.wakeup_value
old_value = self.context.wakeup_value
if is_private_chat:
# 私聊每条消息都增加唤醒度
self.wakeup_value += self.private_message_increment
self.context.wakeup_value += self.private_message_increment
logger.debug(f"私聊消息增加唤醒度: +{self.private_message_increment}")
elif is_mentioned:
# 群聊只有被艾特才增加唤醒度
self.wakeup_value += self.group_mention_increment
self.context.wakeup_value += self.group_mention_increment
logger.debug(f"群聊艾特增加唤醒度: +{self.group_mention_increment}")
else:
# 群聊未被艾特,不增加唤醒度
@@ -166,29 +138,29 @@ class WakeUpManager:
current_time = time.time()
if current_time - self.last_log_time > self.log_interval:
logger.info(
f"唤醒度变化: {old_value:.1f} -> {self.wakeup_value:.1f} (阈值: {self.wakeup_threshold})"
f"唤醒度变化: {old_value:.1f} -> {self.context.wakeup_value:.1f} (阈值: {self.wakeup_threshold})"
)
self.last_log_time = current_time
else:
logger.debug(
f"唤醒度变化: {old_value:.1f} -> {self.wakeup_value:.1f} (阈值: {self.wakeup_threshold})"
f"唤醒度变化: {old_value:.1f} -> {self.context.wakeup_value:.1f} (阈值: {self.wakeup_threshold})"
)
# 检查是否达到唤醒阈值
if self.wakeup_value >= self.wakeup_threshold:
if self.context.wakeup_value >= self.wakeup_threshold:
self._trigger_wakeup()
return True
self._save_wakeup_state()
self.context.save()
return False
def _trigger_wakeup(self):
"""触发唤醒,进入愤怒状态"""
self.is_angry = True
self.angry_start_time = time.time()
self.wakeup_value = 0.0 # 重置唤醒度
self.context.is_angry = True
self.context.angry_start_time = time.time()
self.context.wakeup_value = 0.0 # 重置唤醒度
self._save_wakeup_state()
self.context.save()
# 通知情绪管理系统进入愤怒状态
from src.mood.mood_manager import mood_manager
@@ -201,30 +173,30 @@ class WakeUpManager:
def get_angry_prompt_addition(self) -> str:
"""获取愤怒状态下的提示词补充"""
if self.is_angry:
if self.context.is_angry:
return self.angry_prompt
return ""
def is_in_angry_state(self) -> bool:
"""检查是否处于愤怒状态"""
if self.is_angry:
if self.context.is_angry:
current_time = time.time()
if current_time - self.angry_start_time >= self.angry_duration:
self.is_angry = False
if current_time - self.context.angry_start_time >= self.angry_duration:
self.context.is_angry = False
# 通知情绪管理系统清除愤怒状态
from src.mood.mood_manager import mood_manager
mood_manager.clear_angry_from_wakeup("global_mood")
logger.info("愤怒状态自动过期")
return False
return self.is_angry
return self.context.is_angry
def get_status_info(self) -> dict:
"""获取当前状态信息"""
return {
"wakeup_value": self.wakeup_value,
"wakeup_value": self.context.wakeup_value,
"wakeup_threshold": self.wakeup_threshold,
"is_angry": self.is_angry,
"angry_remaining_time": max(0, self.angry_duration - (time.time() - self.angry_start_time))
if self.is_angry
"is_angry": self.context.is_angry,
"angry_remaining_time": max(0, self.angry_duration - (time.time() - self.context.angry_start_time))
if self.context.is_angry
else 0,
}