实现了新的睡眠系统喵~
This commit is contained in:
committed by
Windpicker-owo
parent
8b88082031
commit
6827b740b5
56
src/plugins/built_in/sleep_system/__init__.py
Normal file
56
src/plugins/built_in/sleep_system/__init__.py
Normal file
@@ -0,0 +1,56 @@
|
||||
import logging
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
from bot.plugin import Plugin
|
||||
from bot.plugin.meta import Meta
|
||||
from bot.plugin.plugin_config import PluginConfig
|
||||
|
||||
from .config import SleepSystemConfig
|
||||
from .state_manager import StateManager
|
||||
from .sleep_logic import SleepLogic
|
||||
from .tasks import SleepCycleTask
|
||||
|
||||
# 日志配置
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# 全局任务变量
|
||||
sleep_task: Optional[SleepCycleTask] = None
|
||||
sleep_logic_instance: Optional[SleepLogic] = None
|
||||
|
||||
class SleepSystemPlugin(Plugin):
|
||||
|
||||
def on_load(self) -> None:
|
||||
global sleep_task, sleep_logic_instance
|
||||
logger.info("睡眠系统插件正在加载...")
|
||||
|
||||
# 1. 加载配置
|
||||
config = self.get_config(SleepSystemConfig)
|
||||
|
||||
# 2. 初始化状态管理器
|
||||
state_file = Path("data/sleep_system_state.json")
|
||||
state_manager = StateManager(state_file_path=state_file)
|
||||
|
||||
# 3. 初始化核心逻辑
|
||||
sleep_logic_instance = SleepLogic(config=config, state_manager=state_manager)
|
||||
|
||||
# 4. 初始化并启动定时任务
|
||||
sleep_task = SleepCycleTask(sleep_logic=sleep_logic_instance, interval_seconds=30)
|
||||
sleep_task.start()
|
||||
|
||||
logger.info("睡眠系统插件加载完成,定时任务已启动。")
|
||||
|
||||
def on_unload(self) -> None:
|
||||
global sleep_task, sleep_logic_instance
|
||||
logger.info("睡眠系统插件正在卸载...")
|
||||
if sleep_task:
|
||||
sleep_task.stop()
|
||||
sleep_logic_instance = None
|
||||
logger.info("睡眠系统插件已卸载,定时任务已停止。")
|
||||
|
||||
def get_meta(self) -> Meta:
|
||||
return Meta(
|
||||
name="睡眠系统",
|
||||
description="一个基于状态机的、行为可预测的睡眠系统。",
|
||||
author="Kilo Code",
|
||||
version="1.0.0",
|
||||
)
|
||||
9
src/plugins/built_in/sleep_system/api.py
Normal file
9
src/plugins/built_in/sleep_system/api.py
Normal file
@@ -0,0 +1,9 @@
|
||||
import asyncio
|
||||
from . import sleep_logic_instance
|
||||
|
||||
def on_message_received():
|
||||
"""
|
||||
当接收到用户消息时调用此函数,用于处理睡眠中断。
|
||||
"""
|
||||
if sleep_logic_instance:
|
||||
asyncio.create_task(sleep_logic_instance.handle_external_event())
|
||||
18
src/plugins/built_in/sleep_system/config.py
Normal file
18
src/plugins/built_in/sleep_system/config.py
Normal file
@@ -0,0 +1,18 @@
|
||||
from typing import Tuple
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
|
||||
class SleepSystemConfig(BaseModel):
|
||||
# 睡眠时间段,格式为 (时, 分)
|
||||
sleep_time: Tuple[int, int] = Field(default=(23, 0), description="每日固定的入睡时间点")
|
||||
wake_up_time: Tuple[int, int] = Field(default=(7, 0), description="每日固定的唤醒时间点")
|
||||
|
||||
# 睡前准备时间(分钟)
|
||||
prepare_sleep_duration: int = Field(default=15, ge=5, le=30, description="进入睡眠状态前的准备时间(分钟)")
|
||||
|
||||
# 失眠设置
|
||||
insomnia_probability: float = Field(default=0.1, ge=0, le=1, description="在睡眠状态下触发失眠的概率")
|
||||
insomnia_duration_minutes: Tuple[int, int] = Field(default=(10, 30), description="失眠状态的持续时间范围(分钟)")
|
||||
|
||||
# 被吵醒设置
|
||||
woken_up_cooldown_minutes: int = Field(default=10, description="被吵醒后尝试重新入睡的冷却时间(分钟)")
|
||||
2
src/plugins/built_in/sleep_system/py.typed
Normal file
2
src/plugins/built_in/sleep_system/py.typed
Normal file
@@ -0,0 +1,2 @@
|
||||
# This file is intentionally left empty.
|
||||
# It is a marker file that tells MyPy to perform type checking on this package.
|
||||
134
src/plugins/built_in/sleep_system/sleep_logic.py
Normal file
134
src/plugins/built_in/sleep_system/sleep_logic.py
Normal file
@@ -0,0 +1,134 @@
|
||||
import logging
|
||||
import random
|
||||
import time
|
||||
from datetime import datetime, time as dt_time, timedelta
|
||||
from .config import SleepSystemConfig
|
||||
from .state_manager import StateManager, SleepState
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
class SleepLogic:
|
||||
"""
|
||||
实现睡眠系统的核心状态机逻辑。
|
||||
"""
|
||||
def __init__(self, config: SleepSystemConfig, state_manager: StateManager):
|
||||
self.config = config
|
||||
self.state_manager = state_manager
|
||||
|
||||
async def update_state(self) -> None:
|
||||
"""
|
||||
核心更新函数,由定时任务调用。
|
||||
根据当前时间和状态,决定是否进行状态转换。
|
||||
"""
|
||||
current_state = await self.state_manager.get_state()
|
||||
now = datetime.now()
|
||||
|
||||
handler = getattr(self, f"_handle_{current_state.current_state.lower()}", self._handle_unknown)
|
||||
await handler(current_state, now)
|
||||
|
||||
def _is_in_sleep_time_range(self, now: datetime) -> bool:
|
||||
"""检查当前时间是否在理论睡眠时间范围内"""
|
||||
wake_up_time = dt_time(self.config.wake_up_time[0], self.config.wake_up_time[1])
|
||||
sleep_time = dt_time(self.config.sleep_time[0], self.config.sleep_time[1])
|
||||
now_time = now.time()
|
||||
|
||||
if sleep_time > wake_up_time: # 跨天睡眠
|
||||
return now_time >= sleep_time or now_time < wake_up_time
|
||||
else: # 当天睡眠
|
||||
return sleep_time <= now_time < wake_up_time
|
||||
|
||||
async def _handle_awake(self, state: SleepState, now: datetime):
|
||||
"""处理 AWAKE 状态的逻辑"""
|
||||
# 检查是否到了准备睡觉的时间
|
||||
sleep_datetime = datetime.combine(now.date(), dt_time(self.config.sleep_time[0], self.config.sleep_time[1]))
|
||||
prepare_start_time = sleep_datetime - timedelta(minutes=self.config.prepare_sleep_duration)
|
||||
|
||||
if prepare_start_time <= now < sleep_datetime:
|
||||
await self._transition_to(state, "PREPARING_SLEEP", duration_minutes=self.config.prepare_sleep_duration)
|
||||
logger.info("时间已到,进入睡前准备状态。")
|
||||
# 在这里可以触发“准备睡觉”的情绪或回复
|
||||
|
||||
async def _handle_preparing_sleep(self, state: SleepState, now: datetime):
|
||||
"""处理 PREPARING_SLEEP 状态的逻辑"""
|
||||
if state.state_end_time and now.timestamp() >= state.state_end_time:
|
||||
# 准备时间结束,进入睡眠
|
||||
if self._is_in_sleep_time_range(now):
|
||||
await self._transition_to(state, "SLEEPING")
|
||||
logger.info("准备时间结束,已进入睡眠状态。")
|
||||
else:
|
||||
await self._transition_to(state, "AWAKE")
|
||||
logger.info("准备期间离开了理论睡眠时间,返回 AWAKE 状态。")
|
||||
|
||||
async def _handle_sleeping(self, state: SleepState, now: datetime):
|
||||
"""处理 SLEEPING 状态的逻辑"""
|
||||
# 检查是否到了起床时间
|
||||
if not self._is_in_sleep_time_range(now):
|
||||
await self._transition_to(state, "AWAKE")
|
||||
logger.info("理论睡眠时间结束,已切换到 AWAKE 状态。")
|
||||
# 在这里可以触发“睡醒”的情绪
|
||||
return
|
||||
|
||||
# 根据概率随机触发失眠
|
||||
if random.random() < self.config.insomnia_probability:
|
||||
duration = random.randint(self.config.insomnia_duration_minutes[0], self.config.insomnia_duration_minutes[1])
|
||||
await self._transition_to(state, "INSOMNIA", duration_minutes=duration)
|
||||
logger.info(f"随机触发失眠,持续 {duration} 分钟。")
|
||||
# 在这里可以触发“烦躁”的情绪
|
||||
|
||||
async def _handle_insomnia(self, state: SleepState, now: datetime):
|
||||
"""处理 INSOMNIA 状态的逻辑"""
|
||||
# 检查失眠时间是否结束
|
||||
if state.state_end_time and now.timestamp() >= state.state_end_time:
|
||||
await self._transition_to(state, "SLEEPING")
|
||||
logger.info("失眠时间结束,返回睡眠状态。")
|
||||
# 如果在失眠期间就到了起床时间,直接唤醒
|
||||
elif not self._is_in_sleep_time_range(now):
|
||||
await self._transition_to(state, "AWAKE")
|
||||
logger.info("在失眠期间到达起床时间,已唤醒。")
|
||||
|
||||
async def _handle_woken_up(self, state: SleepState, now: datetime):
|
||||
"""处理 WOKEN_UP 状态的逻辑"""
|
||||
# 检查冷却时间是否结束
|
||||
if state.state_end_time and now.timestamp() >= state.state_end_time:
|
||||
if self._is_in_sleep_time_range(now):
|
||||
await self._transition_to(state, "PREPARING_SLEEP", duration_minutes=self.config.prepare_sleep_duration)
|
||||
logger.info("被吵醒冷却时间结束,尝试重新入睡。")
|
||||
else:
|
||||
await self._transition_to(state, "AWAKE")
|
||||
logger.info("被吵醒后到达起床时间,已唤醒。")
|
||||
|
||||
async def _handle_unknown(self, state: SleepState, now: datetime):
|
||||
"""处理未知状态"""
|
||||
logger.warning(f"检测到未知的睡眠状态: {state.current_state}。将重置为 AWAKE。")
|
||||
await self._transition_to(state, "AWAKE")
|
||||
|
||||
async def handle_external_event(self):
|
||||
"""处理外部事件,例如收到用户消息"""
|
||||
current_state = await self.state_manager.get_state()
|
||||
if current_state.current_state in ["SLEEPING", "INSOMNIA"]:
|
||||
await self._transition_to(current_state, "WOKEN_UP", duration_minutes=self.config.woken_up_cooldown_minutes)
|
||||
logger.info("在睡眠中被外部事件打断,进入 WOKEN_UP 状态。")
|
||||
# 在这里可以触发“起床气”情绪
|
||||
|
||||
async def _transition_to(self, old_state: SleepState, new_state_name: str, duration_minutes: int = 0):
|
||||
"""
|
||||
状态转换的统一处理函数。
|
||||
|
||||
Args:
|
||||
old_state: 转换前的状态对象。
|
||||
new_state_name: 新状态的名称。
|
||||
duration_minutes: 新状态的持续时间(分钟),如果为0则不设结束时间。
|
||||
"""
|
||||
current_timestamp = time.time()
|
||||
new_end_time = None
|
||||
if duration_minutes > 0:
|
||||
new_end_time = current_timestamp + duration_minutes * 60
|
||||
|
||||
new_state = SleepState(
|
||||
current_state=new_state_name,
|
||||
state_end_time=new_end_time,
|
||||
last_updated=current_timestamp,
|
||||
metadata=old_state.metadata # 继承 metadata
|
||||
)
|
||||
await self.state_manager.save_state(new_state)
|
||||
logger.info(f"睡眠状态已从 {old_state.current_state} 转换为 {new_state_name}。")
|
||||
83
src/plugins/built_in/sleep_system/state_manager.py
Normal file
83
src/plugins/built_in/sleep_system/state_manager.py
Normal file
@@ -0,0 +1,83 @@
|
||||
import json
|
||||
import logging
|
||||
from pathlib import Path
|
||||
from typing import Optional, Dict, Any
|
||||
from pydantic import BaseModel, Field
|
||||
import asyncio
|
||||
|
||||
# 设置日志记录
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
class SleepState(BaseModel):
|
||||
"""定义睡眠状态的数据模型"""
|
||||
current_state: str = Field(default="AWAKE", description="当前的睡眠状态")
|
||||
state_end_time: Optional[float] = Field(default=None, description="当前状态的预计结束时间戳")
|
||||
last_updated: float = Field(description="状态最后更新的时间戳")
|
||||
metadata: Dict[str, Any] = Field(default={}, description="用于存储额外状态信息的字典")
|
||||
|
||||
class StateManager:
|
||||
"""
|
||||
负责睡眠状态的持久化管理。
|
||||
将状态以 JSON 格式读/写到本地文件,以降低耦合。
|
||||
"""
|
||||
def __init__(self, state_file_path: Path):
|
||||
self.state_file_path = state_file_path
|
||||
self._state: Optional[SleepState] = None
|
||||
self._lock = asyncio.Lock()
|
||||
self._load_state()
|
||||
|
||||
def _load_state(self) -> None:
|
||||
"""从文件加载状态,如果文件不存在或为空,则创建默认状态"""
|
||||
try:
|
||||
if self.state_file_path.exists() and self.state_file_path.stat().st_size > 0:
|
||||
with open(self.state_file_path, 'r', encoding='utf-8') as f:
|
||||
data = json.load(f)
|
||||
self._state = SleepState(**data)
|
||||
logger.info(f"睡眠状态已从 {self.state_file_path} 加载。")
|
||||
else:
|
||||
self._create_default_state()
|
||||
except (json.JSONDecodeError, TypeError) as e:
|
||||
logger.warning(f"无法解析状态文件 {self.state_file_path}: {e}。将创建新的默认状态。")
|
||||
self._create_default_state()
|
||||
except Exception as e:
|
||||
logger.error(f"加载睡眠状态时发生未知错误: {e}")
|
||||
self._create_default_state()
|
||||
|
||||
def _create_default_state(self) -> None:
|
||||
"""创建一个默认的清醒状态"""
|
||||
import time
|
||||
self._state = SleepState(last_updated=time.time())
|
||||
logger.info("未找到现有状态文件,已创建默认的睡眠状态 (AWAKE)。")
|
||||
# 立即保存一次,以确保文件被创建
|
||||
asyncio.create_task(self.save_state())
|
||||
|
||||
async def get_state(self) -> SleepState:
|
||||
"""异步获取当前的状态"""
|
||||
async with self._lock:
|
||||
if self._state is None:
|
||||
self._load_state()
|
||||
# 此时 _state 必然已被 _load_state 或 _create_default_state 初始化
|
||||
assert self._state is not None, "State should be initialized here"
|
||||
return self._state.copy(deep=True)
|
||||
|
||||
async def save_state(self, new_state: Optional[SleepState] = None) -> None:
|
||||
"""
|
||||
异步保存当前状态到文件。
|
||||
如果提供了 new_state,则先更新内部状态。
|
||||
"""
|
||||
async with self._lock:
|
||||
if new_state:
|
||||
self._state = new_state
|
||||
|
||||
if self._state is None:
|
||||
logger.warning("尝试保存一个空的状态,操作已跳过。")
|
||||
return
|
||||
|
||||
try:
|
||||
# 确保目录存在
|
||||
self.state_file_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
with open(self.state_file_path, 'w', encoding='utf-8') as f:
|
||||
json.dump(self._state.dict(), f, indent=4, ensure_ascii=False)
|
||||
logger.debug(f"睡眠状态已成功保存到 {self.state_file_path}。")
|
||||
except Exception as e:
|
||||
logger.error(f"保存睡眠状态到 {self.state_file_path} 时失败: {e}")
|
||||
48
src/plugins/built_in/sleep_system/tasks.py
Normal file
48
src/plugins/built_in/sleep_system/tasks.py
Normal file
@@ -0,0 +1,48 @@
|
||||
import asyncio
|
||||
import logging
|
||||
from typing import Optional
|
||||
from .sleep_logic import SleepLogic
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
class SleepCycleTask:
|
||||
"""
|
||||
负责周期性地更新睡眠状态的后台任务。
|
||||
"""
|
||||
def __init__(self, sleep_logic: SleepLogic, interval_seconds: int = 30):
|
||||
self.sleep_logic = sleep_logic
|
||||
self.interval_seconds = interval_seconds
|
||||
self._task: Optional[asyncio.Task] = None
|
||||
self._is_running = False
|
||||
|
||||
async def _run(self):
|
||||
"""任务的内部循环"""
|
||||
logger.info("睡眠系统周期性更新任务已启动。")
|
||||
while self._is_running:
|
||||
try:
|
||||
await self.sleep_logic.update_state()
|
||||
await asyncio.sleep(self.interval_seconds)
|
||||
except asyncio.CancelledError:
|
||||
logger.info("睡眠系统任务被取消。")
|
||||
break
|
||||
except Exception as e:
|
||||
logger.error(f"睡眠系统任务在执行期间发生错误: {e}", exc_info=True)
|
||||
# 发生错误后,等待一段时间再继续,避免快速失败循环
|
||||
await asyncio.sleep(self.interval_seconds * 2)
|
||||
|
||||
def start(self):
|
||||
"""启动后台任务"""
|
||||
if not self._is_running:
|
||||
self._is_running = True
|
||||
self._task = asyncio.create_task(self._run())
|
||||
else:
|
||||
logger.warning("尝试启动一个已经在运行的睡眠系统任务。")
|
||||
|
||||
def stop(self):
|
||||
"""停止后台任务"""
|
||||
if self._is_running and self._task:
|
||||
self._is_running = False
|
||||
self._task.cancel()
|
||||
logger.info("睡眠系统周期性更新任务已请求停止。")
|
||||
else:
|
||||
logger.warning("尝试停止一个尚未启动的睡眠系统任务。")
|
||||
Reference in New Issue
Block a user