Revert "实现了新的睡眠系统喵~",我看了不如我回去自己写一个()

This reverts commit e1dde64fc6.
This commit is contained in:
minecraft1024a
2025-10-17 18:44:38 +08:00
committed by Windpicker-owo
parent dc3c6a235c
commit 0510ce9d55
14 changed files with 804 additions and 364 deletions

View File

@@ -1,49 +0,0 @@
import logging
from pathlib import Path
from typing import Optional
from src.plugin_system import BasePlugin, register_plugin
from .config import SleepSystemConfig
from .state_manager import StateManager
from .sleep_logic import SleepLogic
from .tasks import SleepCycleTask
# 日志配置
logger = logging.getLogger(__name__)
# 全局任务变量
sleep_task: Optional[SleepCycleTask] = None
sleep_logic_instance: Optional[SleepLogic] = None
@register_plugin
class SleepSystemPlugin(BasePlugin):
plugin_name: str = "sleep_system"
def on_load(self) -> None:
global sleep_task, sleep_logic_instance
logger.info("睡眠系统插件正在加载...")
# 1. 加载配置
config = self.get_config(self.plugin_name, SleepSystemConfig)
# 2. 初始化状态管理器
state_file = Path(f"data/{self.plugin_name}_state.json")
state_manager = StateManager(state_file_path=state_file)
# 3. 初始化核心逻辑
sleep_logic_instance = SleepLogic(config=config, state_manager=state_manager)
# 4. 初始化并启动定时任务
sleep_task = SleepCycleTask(sleep_logic=sleep_logic_instance, interval_seconds=30)
sleep_task.start()
logger.info("睡眠系统插件加载完成,定时任务已启动。")
def on_unload(self) -> None:
global sleep_task, sleep_logic_instance
logger.info("睡眠系统插件正在卸载...")
if sleep_task:
sleep_task.stop()
sleep_logic_instance = None
logger.info("睡眠系统插件已卸载,定时任务已停止。")

View File

@@ -1,9 +0,0 @@
import asyncio
from . import sleep_logic_instance
def on_message_received():
"""
当接收到用户消息时调用此函数,用于处理睡眠中断。
"""
if sleep_logic_instance:
asyncio.create_task(sleep_logic_instance.handle_external_event())

View File

@@ -1,18 +0,0 @@
from typing import Tuple
from pydantic import BaseModel, Field
class SleepSystemConfig(BaseModel):
# 睡眠时间段,格式为 (时, 分)
sleep_time: Tuple[int, int] = Field(default=(23, 0), description="每日固定的入睡时间点")
wake_up_time: Tuple[int, int] = Field(default=(7, 0), description="每日固定的唤醒时间点")
# 睡前准备时间(分钟)
prepare_sleep_duration: int = Field(default=15, ge=5, le=30, description="进入睡眠状态前的准备时间(分钟)")
# 失眠设置
insomnia_probability: float = Field(default=0.1, ge=0, le=1, description="在睡眠状态下触发失眠的概率")
insomnia_duration_minutes: Tuple[int, int] = Field(default=(10, 30), description="失眠状态的持续时间范围(分钟)")
# 被吵醒设置
woken_up_cooldown_minutes: int = Field(default=10, description="被吵醒后尝试重新入睡的冷却时间(分钟)")

View File

@@ -1,2 +0,0 @@
# This file is intentionally left empty.
# It is a marker file that tells MyPy to perform type checking on this package.

View File

@@ -1,134 +0,0 @@
import logging
import random
import time
from datetime import datetime, time as dt_time, timedelta
from .config import SleepSystemConfig
from .state_manager import StateManager, SleepState
logger = logging.getLogger(__name__)
class SleepLogic:
"""
实现睡眠系统的核心状态机逻辑。
"""
def __init__(self, config: SleepSystemConfig, state_manager: StateManager):
self.config = config
self.state_manager = state_manager
async def update_state(self) -> None:
"""
核心更新函数,由定时任务调用。
根据当前时间和状态,决定是否进行状态转换。
"""
current_state = await self.state_manager.get_state()
now = datetime.now()
handler = getattr(self, f"_handle_{current_state.current_state.lower()}", self._handle_unknown)
await handler(current_state, now)
def _is_in_sleep_time_range(self, now: datetime) -> bool:
"""检查当前时间是否在理论睡眠时间范围内"""
wake_up_time = dt_time(self.config.wake_up_time[0], self.config.wake_up_time[1])
sleep_time = dt_time(self.config.sleep_time[0], self.config.sleep_time[1])
now_time = now.time()
if sleep_time > wake_up_time: # 跨天睡眠
return now_time >= sleep_time or now_time < wake_up_time
else: # 当天睡眠
return sleep_time <= now_time < wake_up_time
async def _handle_awake(self, state: SleepState, now: datetime):
"""处理 AWAKE 状态的逻辑"""
# 检查是否到了准备睡觉的时间
sleep_datetime = datetime.combine(now.date(), dt_time(self.config.sleep_time[0], self.config.sleep_time[1]))
prepare_start_time = sleep_datetime - timedelta(minutes=self.config.prepare_sleep_duration)
if prepare_start_time <= now < sleep_datetime:
await self._transition_to(state, "PREPARING_SLEEP", duration_minutes=self.config.prepare_sleep_duration)
logger.info("时间已到,进入睡前准备状态。")
# 在这里可以触发“准备睡觉”的情绪或回复
async def _handle_preparing_sleep(self, state: SleepState, now: datetime):
"""处理 PREPARING_SLEEP 状态的逻辑"""
if state.state_end_time and now.timestamp() >= state.state_end_time:
# 准备时间结束,进入睡眠
if self._is_in_sleep_time_range(now):
await self._transition_to(state, "SLEEPING")
logger.info("准备时间结束,已进入睡眠状态。")
else:
await self._transition_to(state, "AWAKE")
logger.info("准备期间离开了理论睡眠时间,返回 AWAKE 状态。")
async def _handle_sleeping(self, state: SleepState, now: datetime):
"""处理 SLEEPING 状态的逻辑"""
# 检查是否到了起床时间
if not self._is_in_sleep_time_range(now):
await self._transition_to(state, "AWAKE")
logger.info("理论睡眠时间结束,已切换到 AWAKE 状态。")
# 在这里可以触发“睡醒”的情绪
return
# 根据概率随机触发失眠
if random.random() < self.config.insomnia_probability:
duration = random.randint(self.config.insomnia_duration_minutes[0], self.config.insomnia_duration_minutes[1])
await self._transition_to(state, "INSOMNIA", duration_minutes=duration)
logger.info(f"随机触发失眠,持续 {duration} 分钟。")
# 在这里可以触发“烦躁”的情绪
async def _handle_insomnia(self, state: SleepState, now: datetime):
"""处理 INSOMNIA 状态的逻辑"""
# 检查失眠时间是否结束
if state.state_end_time and now.timestamp() >= state.state_end_time:
await self._transition_to(state, "SLEEPING")
logger.info("失眠时间结束,返回睡眠状态。")
# 如果在失眠期间就到了起床时间,直接唤醒
elif not self._is_in_sleep_time_range(now):
await self._transition_to(state, "AWAKE")
logger.info("在失眠期间到达起床时间,已唤醒。")
async def _handle_woken_up(self, state: SleepState, now: datetime):
"""处理 WOKEN_UP 状态的逻辑"""
# 检查冷却时间是否结束
if state.state_end_time and now.timestamp() >= state.state_end_time:
if self._is_in_sleep_time_range(now):
await self._transition_to(state, "PREPARING_SLEEP", duration_minutes=self.config.prepare_sleep_duration)
logger.info("被吵醒冷却时间结束,尝试重新入睡。")
else:
await self._transition_to(state, "AWAKE")
logger.info("被吵醒后到达起床时间,已唤醒。")
async def _handle_unknown(self, state: SleepState, now: datetime):
"""处理未知状态"""
logger.warning(f"检测到未知的睡眠状态: {state.current_state}。将重置为 AWAKE。")
await self._transition_to(state, "AWAKE")
async def handle_external_event(self):
"""处理外部事件,例如收到用户消息"""
current_state = await self.state_manager.get_state()
if current_state.current_state in ["SLEEPING", "INSOMNIA"]:
await self._transition_to(current_state, "WOKEN_UP", duration_minutes=self.config.woken_up_cooldown_minutes)
logger.info("在睡眠中被外部事件打断,进入 WOKEN_UP 状态。")
# 在这里可以触发“起床气”情绪
async def _transition_to(self, old_state: SleepState, new_state_name: str, duration_minutes: int = 0):
"""
状态转换的统一处理函数。
Args:
old_state: 转换前的状态对象。
new_state_name: 新状态的名称。
duration_minutes: 新状态的持续时间分钟如果为0则不设结束时间。
"""
current_timestamp = time.time()
new_end_time = None
if duration_minutes > 0:
new_end_time = current_timestamp + duration_minutes * 60
new_state = SleepState(
current_state=new_state_name,
state_end_time=new_end_time,
last_updated=current_timestamp,
metadata=old_state.metadata # 继承 metadata
)
await self.state_manager.save_state(new_state)
logger.info(f"睡眠状态已从 {old_state.current_state} 转换为 {new_state_name}")

View File

@@ -1,83 +0,0 @@
import json
import logging
from pathlib import Path
from typing import Optional, Dict, Any
from pydantic import BaseModel, Field
import asyncio
# 设置日志记录
logger = logging.getLogger(__name__)
class SleepState(BaseModel):
"""定义睡眠状态的数据模型"""
current_state: str = Field(default="AWAKE", description="当前的睡眠状态")
state_end_time: Optional[float] = Field(default=None, description="当前状态的预计结束时间戳")
last_updated: float = Field(description="状态最后更新的时间戳")
metadata: Dict[str, Any] = Field(default={}, description="用于存储额外状态信息的字典")
class StateManager:
"""
负责睡眠状态的持久化管理。
将状态以 JSON 格式读/写到本地文件,以降低耦合。
"""
def __init__(self, state_file_path: Path):
self.state_file_path = state_file_path
self._state: Optional[SleepState] = None
self._lock = asyncio.Lock()
self._load_state()
def _load_state(self) -> None:
"""从文件加载状态,如果文件不存在或为空,则创建默认状态"""
try:
if self.state_file_path.exists() and self.state_file_path.stat().st_size > 0:
with open(self.state_file_path, 'r', encoding='utf-8') as f:
data = json.load(f)
self._state = SleepState(**data)
logger.info(f"睡眠状态已从 {self.state_file_path} 加载。")
else:
self._create_default_state()
except (json.JSONDecodeError, TypeError) as e:
logger.warning(f"无法解析状态文件 {self.state_file_path}: {e}。将创建新的默认状态。")
self._create_default_state()
except Exception as e:
logger.error(f"加载睡眠状态时发生未知错误: {e}")
self._create_default_state()
def _create_default_state(self) -> None:
"""创建一个默认的清醒状态"""
import time
self._state = SleepState(last_updated=time.time())
logger.info("未找到现有状态文件,已创建默认的睡眠状态 (AWAKE)。")
# 立即保存一次,以确保文件被创建
asyncio.create_task(self.save_state())
async def get_state(self) -> SleepState:
"""异步获取当前的状态"""
async with self._lock:
if self._state is None:
self._load_state()
# 此时 _state 必然已被 _load_state 或 _create_default_state 初始化
assert self._state is not None, "State should be initialized here"
return self._state.copy(deep=True)
async def save_state(self, new_state: Optional[SleepState] = None) -> None:
"""
异步保存当前状态到文件。
如果提供了 new_state则先更新内部状态。
"""
async with self._lock:
if new_state:
self._state = new_state
if self._state is None:
logger.warning("尝试保存一个空的状态,操作已跳过。")
return
try:
# 确保目录存在
self.state_file_path.parent.mkdir(parents=True, exist_ok=True)
with open(self.state_file_path, 'w', encoding='utf-8') as f:
json.dump(self._state.dict(), f, indent=4, ensure_ascii=False)
logger.debug(f"睡眠状态已成功保存到 {self.state_file_path}")
except Exception as e:
logger.error(f"保存睡眠状态到 {self.state_file_path} 时失败: {e}")

View File

@@ -1,48 +0,0 @@
import asyncio
import logging
from typing import Optional
from .sleep_logic import SleepLogic
logger = logging.getLogger(__name__)
class SleepCycleTask:
"""
负责周期性地更新睡眠状态的后台任务。
"""
def __init__(self, sleep_logic: SleepLogic, interval_seconds: int = 30):
self.sleep_logic = sleep_logic
self.interval_seconds = interval_seconds
self._task: Optional[asyncio.Task] = None
self._is_running = False
async def _run(self):
"""任务的内部循环"""
logger.info("睡眠系统周期性更新任务已启动。")
while self._is_running:
try:
await self.sleep_logic.update_state()
await asyncio.sleep(self.interval_seconds)
except asyncio.CancelledError:
logger.info("睡眠系统任务被取消。")
break
except Exception as e:
logger.error(f"睡眠系统任务在执行期间发生错误: {e}", exc_info=True)
# 发生错误后,等待一段时间再继续,避免快速失败循环
await asyncio.sleep(self.interval_seconds * 2)
def start(self):
"""启动后台任务"""
if not self._is_running:
self._is_running = True
self._task = asyncio.create_task(self._run())
else:
logger.warning("尝试启动一个已经在运行的睡眠系统任务。")
def stop(self):
"""停止后台任务"""
if self._is_running and self._task:
self._is_running = False
self._task.cancel()
logger.info("睡眠系统周期性更新任务已请求停止。")
else:
logger.warning("尝试停止一个尚未启动的睡眠系统任务。")