This commit is contained in:
Windpicker-owo
2025-10-17 20:16:27 +08:00
17 changed files with 468 additions and 385 deletions

View File

@@ -338,9 +338,7 @@ class SingleStreamContextManager:
# 只有在第一次添加消息时才检测聊天类型,避免后续消息改变类型
if len(self.context.unread_messages) == 1: # 只有这条消息
# 如果消息包含群组信息,则为群聊
if hasattr(message, "chat_info_group_id") and message.chat_info_group_id:
self.context.chat_type = ChatType.GROUP
elif hasattr(message, "chat_info_group_name") and message.chat_info_group_name:
if message.chat_info.group_info:
self.context.chat_type = ChatType.GROUP
else:
self.context.chat_type = ChatType.PRIVATE

View File

@@ -17,11 +17,10 @@ from src.common.data_models.message_manager_data_model import MessageManagerStat
from src.common.logger import get_logger
from src.config.config import global_config
from src.plugin_system.apis.chat_api import get_chat_manager
from src.plugins.built_in.sleep_system.api import on_message_received
from .distribution_manager import stream_loop_manager
# from .sleep_manager.sleep_manager import SleepManager
# from .sleep_manager.wakeup_manager import WakeUpManager
from .sleep_system.state_manager import SleepState, sleep_state_manager
if TYPE_CHECKING:
pass
@@ -44,10 +43,6 @@ class MessageManager:
self.action_manager = ChatterActionManager()
self.chatter_manager = ChatterManager(self.action_manager)
# 初始化睡眠和唤醒管理器 (已被新插件取代)
# self.sleep_manager = SleepManager()
# self.wakeup_manager = WakeUpManager(self.sleep_manager)
# 消息缓存系统 - 直接集成到消息管理器
self.message_caches: Dict[str, deque] = defaultdict(deque) # 每个流的消息缓存
self.stream_processing_status: Dict[str, bool] = defaultdict(bool) # 流的处理状态
@@ -94,8 +89,8 @@ class MessageManager:
except Exception as e:
logger.error(f"启动自适应流管理器失败: {e}")
# 启动睡眠和唤醒管理器 (已被新插件取代)
# await self.wakeup_manager.start()
# 启动睡眠和唤醒管理器
# 睡眠系统的定时任务启动移至 main.py
# 启动流循环管理器并设置chatter_manager
await stream_loop_manager.start()
@@ -142,8 +137,6 @@ class MessageManager:
except Exception as e:
logger.error(f"停止自适应流管理器失败: {e}")
# 停止睡眠和唤醒管理器 (已被新插件取代)
# await self.wakeup_manager.stop()
# 停止流循环管理器
await stream_loop_manager.stop()
@@ -152,10 +145,15 @@ class MessageManager:
async def add_message(self, stream_id: str, message: DatabaseMessages):
"""添加消息到指定聊天流"""
try:
# 触发睡眠系统外部事件
on_message_received()
# 在消息处理的最前端检查睡眠状态
current_sleep_state = sleep_state_manager.get_current_state()
if current_sleep_state == SleepState.SLEEPING:
logger.info(f"处于 {current_sleep_state.name} 状态,消息被拦截。")
return # 直接返回,不处理消息
# TODO: 在这里为 WOKEN_UP_ANGRY 等未来状态添加特殊处理逻辑
try:
chat_manager = get_chat_manager()
chat_stream = await chat_manager.get_stream(stream_id)
if not chat_stream:
@@ -339,14 +337,9 @@ class MessageManager:
inactive_streams.append(stream_id)
for stream_id in inactive_streams:
try:
# 修复: 直接通过 stream_id 获取 chat_stream避免潜在的未绑定问题
inactive_stream = chat_manager.streams.get(stream_id)
if inactive_stream:
await inactive_stream.context_manager.clear_context()
del chat_manager.streams[stream_id]
logger.info(f"清理不活跃聊天流: {stream_id}")
else:
logger.warning(f"尝试清理一个不存在的流: {stream_id}")
await chat_stream.context_manager.clear_context()
del chat_manager.streams[stream_id]
logger.info(f"清理不活跃聊天流: {stream_id}")
except Exception as e:
logger.error(f"清理聊天流 {stream_id} 失败: {e}")
if inactive_streams:

View File

@@ -0,0 +1,195 @@
from datetime import datetime, time, timedelta
import random
from typing import Optional, Tuple
from src.common.logger import get_logger
from src.config.config import global_config
from src.schedule.schedule_manager import schedule_manager
from .state_manager import SleepState, sleep_state_manager
logger = get_logger("sleep_logic")
class SleepLogic:
"""
核心睡眠逻辑,睡眠系统的“大脑”
负责根据当前的配置、时间、日程表以及状态,判断是否需要切换睡眠状态。
它本身是无状态的,所有的状态都读取和写入 SleepStateManager。
"""
def check_and_update_sleep_state(self):
"""
检查并更新当前的睡眠状态,这是整个逻辑的入口。
由定时任务周期性调用。
"""
current_state = sleep_state_manager.get_current_state()
now = datetime.now()
if current_state == SleepState.AWAKE:
self._check_should_fall_asleep(now)
elif current_state == SleepState.SLEEPING:
self._check_should_wake_up(now)
elif current_state == SleepState.INSOMNIA:
# TODO: 实现失眠逻辑
# 例如:检查失眠状态是否结束,如果结束则转换回 SLEEPING
pass
elif current_state == SleepState.WOKEN_UP_ANGRY:
# TODO: 实现起床气逻辑
# 例如:检查生气状态是否结束,如果结束则转换回 SLEEPING 或 AWAKE
pass
def _check_should_fall_asleep(self, now: datetime):
"""
当状态为 AWAKE 时,检查是否应该进入睡眠。
"""
should_sleep, wake_up_time = self._should_be_sleeping(now)
if should_sleep:
logger.info("判断结果:应进入睡眠状态。")
sleep_state_manager.set_state(SleepState.SLEEPING, wake_up=wake_up_time)
def _check_should_wake_up(self, now: datetime):
"""
当状态为 SLEEPING 时,检查是否应该醒来。
这里包含了处理跨天获取日程的核心逻辑。
"""
wake_up_time = sleep_state_manager.get_wake_up_time()
# 核心逻辑:两段式检测
# 如果 state_manager 中还没有起床时间,说明是昨晚入睡,需要等待今天凌晨的新日程。
sleep_start_time = sleep_state_manager.get_sleep_start_time()
if not wake_up_time:
if sleep_start_time and now.date() > sleep_start_time.date():
logger.debug("当前为睡眠状态但无起床时间,尝试从新日程中解析...")
_, new_wake_up_time = self._get_wakeup_times_from_schedule(now)
if new_wake_up_time:
logger.info(f"成功从新日程获取到起床时间: {new_wake_up_time.strftime('%H:%M')}")
sleep_state_manager.set_wake_up_time(new_wake_up_time)
wake_up_time = new_wake_up_time
else:
logger.debug("未能获取到新的起床时间,继续睡眠。")
return
else:
logger.info("还没有到达第二天,继续睡眠。")
logger.info(f"尚未到苏醒时间,苏醒时间在{wake_up_time}")
if wake_up_time and now >= wake_up_time:
logger.info(f"当前时间 {now.strftime('%H:%M')} 已到达或超过预定起床时间 {wake_up_time.strftime('%H:%M')}")
sleep_state_manager.set_state(SleepState.AWAKE)
def _should_be_sleeping(self, now: datetime) -> Tuple[bool, Optional[datetime]]:
"""
判断在当前时刻,是否应该处于睡眠时间。
Returns:
元组 (是否应该睡眠, 预期的起床时间或None)
"""
sleep_config = global_config.sleep_system
if not sleep_config.enable:
return False, None
sleep_time, wake_up_time = None, None
if sleep_config.sleep_by_schedule:
sleep_time, _ = self._get_sleep_times_from_schedule(now)
if not sleep_time:
logger.debug("日程表模式开启,但未找到睡眠时间,使用固定时间作为备用。")
sleep_time, wake_up_time = self._get_fixed_sleep_times(now)
else:
sleep_time, wake_up_time = self._get_fixed_sleep_times(now)
if not sleep_time:
return False, None
# 检查当前时间是否在睡眠时间范围内
if now >= sleep_time:
# 如果起床时间是第二天(通常情况),且当前时间小于起床时间,则在睡眠范围内
if wake_up_time and wake_up_time > sleep_time and now < wake_up_time:
return True, wake_up_time
# 如果当前时间大于入睡时间,说明已经进入睡眠窗口
return True, wake_up_time
return False, None
def _get_fixed_sleep_times(self, now: datetime) -> Tuple[Optional[datetime], Optional[datetime]]:
"""
当使用“固定时间”模式时,从此方法计算睡眠和起床时间。
会加入配置中的随机偏移量,让作息更自然。
"""
sleep_config = global_config.sleep_system
try:
sleep_offset = random.randint(
-sleep_config.sleep_time_offset_minutes, sleep_config.sleep_time_offset_minutes
)
wake_up_offset = random.randint(
-sleep_config.wake_up_time_offset_minutes, sleep_config.wake_up_time_offset_minutes
)
sleep_t = datetime.strptime(sleep_config.fixed_sleep_time, "%H:%M").time()
wake_up_t = datetime.strptime(sleep_config.fixed_wake_up_time, "%H:%M").time()
sleep_time = datetime.combine(now.date(), sleep_t) + timedelta(minutes=sleep_offset)
# 如果起床时间比睡觉时间早,说明是第二天
wake_up_day = now.date() + timedelta(days=1) if wake_up_t < sleep_t else now.date()
wake_up_time = datetime.combine(wake_up_day, wake_up_t) + timedelta(minutes=wake_up_offset)
return sleep_time, wake_up_time
except (ValueError, TypeError) as e:
logger.error(f"解析固定睡眠时间失败: {e}")
return None, None
def _get_sleep_times_from_schedule(self, now: datetime) -> Tuple[Optional[datetime], Optional[datetime]]:
"""
当使用“日程表”模式时,从此方法获取睡眠时间。
实现了核心逻辑:
- 解析“今天”日程中的睡觉时间。
"""
# 阶段一:获取当天的睡觉时间
today_schedule = schedule_manager.today_schedule
sleep_time = None
if today_schedule:
for event in today_schedule:
activity = event.get("activity", "").lower()
if "sleep" in activity or "睡觉" in activity or "休息" in activity:
try:
time_range = event.get("time_range", "")
start_str, _ = time_range.split("-")
sleep_t = datetime.strptime(start_str.strip(), "%H:%M").time()
sleep_time = datetime.combine(now.date(), sleep_t)
break
except (ValueError, AttributeError):
logger.warning(f"解析日程中的睡眠时间失败: {event}")
continue
wake_up_time = None
return sleep_time, wake_up_time
def _get_wakeup_times_from_schedule(self, now: datetime) -> Tuple[Optional[datetime], Optional[datetime]]:
"""
当使用“日程表”模式时,从此方法获取睡眠时间。
实现了核心逻辑:
- 解析“今天”日程中的睡觉时间。
"""
# 阶段一:获取当天的睡觉时间
today_schedule = schedule_manager.today_schedule
wake_up_time = None
if today_schedule:
for event in today_schedule:
activity = event.get("activity", "").lower()
if "wake_up" in activity or "醒来" in activity or "起床" in activity:
try:
time_range = event.get("time_range", "")
start_str, _ = time_range.split("-")
sleep_t = datetime.strptime(start_str.strip(), "%H:%M").time()
wake_up_time = datetime.combine(now.date(), sleep_t)
break
except (ValueError, AttributeError):
logger.warning(f"解析日程中的睡眠时间失败: {event}")
continue
return None, wake_up_time
# 全局单例
sleep_logic = SleepLogic()

View File

@@ -0,0 +1,190 @@
import enum
from datetime import datetime, timedelta
from typing import Any, Dict, Optional
from src.common.logger import get_logger
from src.manager.local_store_manager import local_storage
logger = get_logger("sleep_state_manager")
class SleepState(enum.Enum):
"""
定义了所有可能的睡眠状态。
使用枚举可以使状态管理更加清晰和安全。
"""
AWAKE = "awake" # 清醒状态,正常活动
SLEEPING = "sleeping" # 沉睡状态,此时应拦截消息
INSOMNIA = "insomnia" # 失眠状态(为未来功能预留)
WOKEN_UP_ANGRY = "woken_up_angry" # 被吵醒后的生气状态(为未来功能预留)
class SleepStateManager:
"""
睡眠状态管理器 (单例模式)
这是整个睡眠系统的数据核心,负责:
1. 管理当前的睡眠状态(如:是否在睡觉、唤醒度等)。
2. 将状态持久化到本地JSON文件(`local_store.json`),实现重启后状态不丢失。
3. 提供统一的接口供其他模块查询和修改睡眠状态。
"""
_instance = None
_STATE_KEY = "sleep_system_state" # 在 local_store.json 中存储的键名
def __new__(cls, *args, **kwargs):
# 实现单例模式,确保全局只有一个状态管理器实例
if not cls._instance:
cls._instance = super(SleepStateManager, cls).__new__(cls, *args, **kwargs)
return cls._instance
def __init__(self):
"""
初始化状态管理器,定义状态数据结构并从本地加载历史状态。
"""
self.state: Dict[str, Any] = {}
self._default_state()
self.load_state()
def _default_state(self):
"""
定义并重置为默认的“清醒”状态。
当机器人启动或从睡眠中醒来时调用。
"""
self.state = {
"state": SleepState.AWAKE.value,
"state_until": None, # 特殊状态(如生气)的自动结束时间
"sleep_start_time": None, # 本次睡眠的开始时间
"wake_up_time": None, # 预定的起床时间
"wakefulness": 0.0, # 唤醒度/清醒值,用于判断是否被吵醒
"last_checked": None, # 定时任务最后检查的时间
}
def load_state(self):
"""
程序启动时,从 local_storage 加载上一次的状态。
如果找不到历史状态,则初始化为默认状态。
"""
stored_state = local_storage[self._STATE_KEY]
if isinstance(stored_state, dict):
# 合并加载的状态,以防新增字段
self.state.update(stored_state)
# 确保 state 字段是枚举成员
if "state" in self.state and not isinstance(self.state["state"], SleepState):
try:
self.state["state"] = SleepState(self.state["state"])
except ValueError:
logger.warning(f"加载了无效的睡眠状态 '{self.state['state']}',重置为 AWAKE。")
self.state["state"] = SleepState.AWAKE
else:
self.state["state"] = SleepState.AWAKE # 兼容旧数据
logger.info(f"成功加载睡眠状态: {self.get_current_state().name}")
else:
logger.info("未找到已存储的睡眠状态,将使用默认值。")
self.save_state()
def save_state(self):
"""
将当前内存中的状态保存到 local_storage。
在保存前,会将枚举类型的 state 转换为字符串以便JSON序列化。
"""
data_to_save = self.state.copy()
# 将 state 枚举成员转换为它的值(字符串)
data_to_save["state"] = self.state["state"]
local_storage[self._STATE_KEY] = data_to_save
logger.debug(f"睡眠状态已保存: {data_to_save}")
def get_current_state(self) -> SleepState:
"""
获取当前的睡眠状态。
在返回状态前,会先检查特殊状态(如生气)是否已过期。
"""
# 检查特殊状态是否已过期
state_until_str = self.state.get("state_until")
if state_until_str:
state_until = datetime.fromisoformat(state_until_str)
if datetime.now() > state_until:
logger.info(f"特殊状态 {self.state['state'].name} 已结束,自动恢复为 SLEEPING。")
# 假设特殊状态(如生气)结束后,是恢复到普通睡眠状态
self.set_state(SleepState.SLEEPING)
return self.state["state"]
def set_state(
self,
new_state: SleepState,
duration_seconds: Optional[float] = None,
sleep_start: Optional[datetime] = None,
wake_up: Optional[datetime] = None,
):
"""
核心函数:切换到新的睡眠状态,并更新相关的状态数据。
"""
current_state = self.get_current_state()
if current_state == new_state:
return # 状态未改变
logger.info(f"睡眠状态变更: {current_state.name} -> {new_state.name}")
self.state["state"] = new_state
if new_state == SleepState.AWAKE:
self._default_state() # 醒来时重置所有状态
self.state["state"] = SleepState.AWAKE # 确保状态正确
elif new_state == SleepState.SLEEPING:
self.state["sleep_start_time"] = (sleep_start or datetime.now()).isoformat()
self.state["wake_up_time"] = wake_up.isoformat() if wake_up else None
self.state["state_until"] = None # 清除特殊状态持续时间
self.state["wakefulness"] = 0.0 # 进入睡眠时清零唤醒度
elif new_state in [SleepState.WOKEN_UP_ANGRY, SleepState.INSOMNIA]:
if duration_seconds:
self.state["state_until"] = (datetime.now() + timedelta(seconds=duration_seconds)).isoformat()
else:
self.state["state_until"] = None
self.save_state()
def update_last_checked(self):
"""更新最后检查时间"""
self.state["last_checked"] = datetime.now().isoformat()
self.save_state()
def get_wake_up_time(self) -> Optional[datetime]:
"""获取预定的起床时间,如果已设置的话。"""
wake_up_str = self.state.get("wake_up_time")
if wake_up_str:
try:
return datetime.fromisoformat(wake_up_str)
except (ValueError, TypeError):
return None
return None
def get_sleep_start_time(self) -> Optional[datetime]:
"""获取本次睡眠的开始时间,如果已设置的话。"""
sleep_start_str = self.state.get("sleep_start_time")
if sleep_start_str:
try:
return datetime.fromisoformat(sleep_start_str)
except (ValueError, TypeError):
return None
return None
def set_wake_up_time(self, wake_up: datetime):
"""
更新起床时间。
主要用于“日程表”模式下,当第二天凌晨拿到新日程时,更新之前未知的起床时间。
"""
if self.get_current_state() == SleepState.AWAKE:
logger.warning("尝试为清醒状态设置起床时间,操作被忽略。")
return
self.state["wake_up_time"] = wake_up.isoformat()
logger.info(f"更新预定起床时间为: {self.state['wake_up_time']}")
self.save_state()
# 全局单例
sleep_state_manager = SleepStateManager()

View File

@@ -0,0 +1,43 @@
from src.common.logger import get_logger
from src.manager.async_task_manager import AsyncTask, async_task_manager
from .sleep_logic import sleep_logic
logger = get_logger("sleep_tasks")
class SleepSystemCheckTask(AsyncTask):
"""
睡眠系统周期性检查任务。
继承自 AsyncTask由 async_task_manager 统一管理。
"""
def __init__(self, run_interval: int = 60):
"""
初始化任务。
Args:
run_interval (int): 任务运行的时间间隔。默认为60秒检查一次。
"""
super().__init__(task_name="SleepSystemCheckTask", run_interval=run_interval)
async def run(self):
"""
任务的核心执行过程。
每次运行时,调用 sleep_logic 的主函数来检查和更新状态。
"""
logger.debug("睡眠系统定时任务触发,开始检查状态...")
try:
# 调用“大脑”进行一次思考和判断
sleep_logic.check_and_update_sleep_state()
except Exception as e:
logger.error(f"周期性检查睡眠状态时发生未知错误: {e}", exc_info=True)
async def start_sleep_system_tasks():
"""
启动睡眠系统的后台定时检查任务。
这个函数应该在程序启动时(例如 main.py被调用。
"""
logger.info("正在启动睡眠系统后台任务...")
check_task = SleepSystemCheckTask()
await async_task_manager.add_task(check_task)
logger.info("睡眠系统后台任务已成功启动。")

View File

@@ -584,6 +584,8 @@ class ChatBot:
if global_config.mood.enable_mood:
# 获取兴趣度用于情绪更新
interest_rate = getattr(message, "interest_value", 0.0)
if interest_rate is None:
interest_rate = 0.0
logger.debug(f"开始更新情绪状态,兴趣度: {interest_rate:.2f}")
# 获取当前聊天的情绪对象并更新情绪状态

View File

@@ -610,12 +610,7 @@ class SleepSystemConfig(ValidatedConfigBase):
insomnia_duration_minutes: list[int] = Field(
default_factory=lambda: [15, 45], description="单次失眠状态的持续时间范围(分钟)"
)
sleep_pressure_threshold: float = Field(default=30.0, description="触发“压力不足型失眠”的睡眠压力阈值")
deep_sleep_threshold: float = Field(default=80.0, description="进入“深度睡眠”的睡眠压力阈值")
insomnia_chance_low_pressure: float = Field(default=0.6, ge=0.0, le=1.0, description="压力不足时的失眠基础概率")
insomnia_chance_normal_pressure: float = Field(default=0.1, ge=0.0, le=1.0, description="压力正常时的失眠基础概率")
sleep_pressure_increment: float = Field(default=1.5, ge=0.0, description="每次AI执行动作后增加的睡眠压力值")
sleep_pressure_decay_rate: float = Field(default=1.5, ge=0.0, description="睡眠时,每分钟衰减的睡眠压力值")
insomnia_chance_pressure: float = Field(default=0.1, ge=0.0, le=1.0, description="失眠基础概率")
# --- 弹性睡眠与睡前消息 ---
enable_flexible_sleep: bool = Field(default=True, description="是否启用弹性睡眠")

View File

@@ -29,6 +29,7 @@ from src.plugin_system.core.event_manager import event_manager
from src.plugin_system.core.plugin_manager import plugin_manager
from src.schedule.monthly_plan_manager import monthly_plan_manager
from src.schedule.schedule_manager import schedule_manager
from src.chat.message_manager.sleep_system.tasks import start_sleep_system_tasks
# 插件系统现在使用统一的插件加载器
install(extra_lines=3)
@@ -519,6 +520,14 @@ MoFox_Bot(第三方修改版)
except Exception as e:
logger.error(f"日程表管理器初始化失败: {e}")
# 初始化睡眠系统
if global_config.sleep_system.enable:
try:
await start_sleep_system_tasks()
logger.info("睡眠系统初始化成功")
except Exception as e:
logger.error(f"睡眠系统初始化失败: {e}")
def _safe_init(self, component_name: str, init_func) -> callable:
"""安全初始化组件,捕获异常"""

View File

@@ -13,6 +13,7 @@ from src.manager.async_task_manager import AsyncTask, async_task_manager
from src.plugin_system import BaseEventHandler, EventType
from src.plugin_system.apis import chat_api, message_api, person_api
from src.plugin_system.base.base_event import HandlerResult
from src.chat.message_manager.sleep_system.state_manager import SleepState, sleep_state_manager
from .proactive_thinker_executor import ProactiveThinkerExecutor
@@ -38,6 +39,10 @@ class ColdStartTask(AsyncTask):
await asyncio.sleep(30) # 延迟以确保所有服务和聊天流已从数据库加载完毕
try:
current_state = sleep_state_manager.get_current_state()
if current_state == SleepState.SLEEPING:
logger.info("bot正在睡觉,跳过本次任务")
return
logger.info("【冷启动】开始扫描白名单,唤醒沉睡的聊天流...")
# 【修复】增加对私聊总开关的判断
@@ -147,6 +152,10 @@ class ProactiveThinkingTask(AsyncTask):
# 计算下一次检查前的休眠时间
next_interval = self._get_next_interval()
try:
current_state = sleep_state_manager.get_current_state()
if current_state == SleepState.SLEEPING:
logger.info("bot正在睡觉,跳过本次任务")
return
logger.debug(f"【日常唤醒】下一次检查将在 {next_interval:.2f} 秒后进行。")
await asyncio.sleep(next_interval)

View File

@@ -1,49 +0,0 @@
import logging
from pathlib import Path
from typing import Optional
from src.plugin_system import BasePlugin, register_plugin
from .config import SleepSystemConfig
from .state_manager import StateManager
from .sleep_logic import SleepLogic
from .tasks import SleepCycleTask
# 日志配置
logger = logging.getLogger(__name__)
# 全局任务变量
sleep_task: Optional[SleepCycleTask] = None
sleep_logic_instance: Optional[SleepLogic] = None
@register_plugin
class SleepSystemPlugin(BasePlugin):
plugin_name: str = "sleep_system"
def on_load(self) -> None:
global sleep_task, sleep_logic_instance
logger.info("睡眠系统插件正在加载...")
# 1. 加载配置
config = self.get_config(self.plugin_name, SleepSystemConfig)
# 2. 初始化状态管理器
state_file = Path(f"data/{self.plugin_name}_state.json")
state_manager = StateManager(state_file_path=state_file)
# 3. 初始化核心逻辑
sleep_logic_instance = SleepLogic(config=config, state_manager=state_manager)
# 4. 初始化并启动定时任务
sleep_task = SleepCycleTask(sleep_logic=sleep_logic_instance, interval_seconds=30)
sleep_task.start()
logger.info("睡眠系统插件加载完成,定时任务已启动。")
def on_unload(self) -> None:
global sleep_task, sleep_logic_instance
logger.info("睡眠系统插件正在卸载...")
if sleep_task:
sleep_task.stop()
sleep_logic_instance = None
logger.info("睡眠系统插件已卸载,定时任务已停止。")

View File

@@ -1,9 +0,0 @@
import asyncio
from . import sleep_logic_instance
def on_message_received():
"""
当接收到用户消息时调用此函数,用于处理睡眠中断。
"""
if sleep_logic_instance:
asyncio.create_task(sleep_logic_instance.handle_external_event())

View File

@@ -1,18 +0,0 @@
from typing import Tuple
from pydantic import BaseModel, Field
class SleepSystemConfig(BaseModel):
# 睡眠时间段,格式为 (时, 分)
sleep_time: Tuple[int, int] = Field(default=(23, 0), description="每日固定的入睡时间点")
wake_up_time: Tuple[int, int] = Field(default=(7, 0), description="每日固定的唤醒时间点")
# 睡前准备时间(分钟)
prepare_sleep_duration: int = Field(default=15, ge=5, le=30, description="进入睡眠状态前的准备时间(分钟)")
# 失眠设置
insomnia_probability: float = Field(default=0.1, ge=0, le=1, description="在睡眠状态下触发失眠的概率")
insomnia_duration_minutes: Tuple[int, int] = Field(default=(10, 30), description="失眠状态的持续时间范围(分钟)")
# 被吵醒设置
woken_up_cooldown_minutes: int = Field(default=10, description="被吵醒后尝试重新入睡的冷却时间(分钟)")

View File

@@ -1,2 +0,0 @@
# This file is intentionally left empty.
# It is a marker file that tells MyPy to perform type checking on this package.

View File

@@ -1,134 +0,0 @@
import logging
import random
import time
from datetime import datetime, time as dt_time, timedelta
from .config import SleepSystemConfig
from .state_manager import StateManager, SleepState
logger = logging.getLogger(__name__)
class SleepLogic:
"""
实现睡眠系统的核心状态机逻辑。
"""
def __init__(self, config: SleepSystemConfig, state_manager: StateManager):
self.config = config
self.state_manager = state_manager
async def update_state(self) -> None:
"""
核心更新函数,由定时任务调用。
根据当前时间和状态,决定是否进行状态转换。
"""
current_state = await self.state_manager.get_state()
now = datetime.now()
handler = getattr(self, f"_handle_{current_state.current_state.lower()}", self._handle_unknown)
await handler(current_state, now)
def _is_in_sleep_time_range(self, now: datetime) -> bool:
"""检查当前时间是否在理论睡眠时间范围内"""
wake_up_time = dt_time(self.config.wake_up_time[0], self.config.wake_up_time[1])
sleep_time = dt_time(self.config.sleep_time[0], self.config.sleep_time[1])
now_time = now.time()
if sleep_time > wake_up_time: # 跨天睡眠
return now_time >= sleep_time or now_time < wake_up_time
else: # 当天睡眠
return sleep_time <= now_time < wake_up_time
async def _handle_awake(self, state: SleepState, now: datetime):
"""处理 AWAKE 状态的逻辑"""
# 检查是否到了准备睡觉的时间
sleep_datetime = datetime.combine(now.date(), dt_time(self.config.sleep_time[0], self.config.sleep_time[1]))
prepare_start_time = sleep_datetime - timedelta(minutes=self.config.prepare_sleep_duration)
if prepare_start_time <= now < sleep_datetime:
await self._transition_to(state, "PREPARING_SLEEP", duration_minutes=self.config.prepare_sleep_duration)
logger.info("时间已到,进入睡前准备状态。")
# 在这里可以触发“准备睡觉”的情绪或回复
async def _handle_preparing_sleep(self, state: SleepState, now: datetime):
"""处理 PREPARING_SLEEP 状态的逻辑"""
if state.state_end_time and now.timestamp() >= state.state_end_time:
# 准备时间结束,进入睡眠
if self._is_in_sleep_time_range(now):
await self._transition_to(state, "SLEEPING")
logger.info("准备时间结束,已进入睡眠状态。")
else:
await self._transition_to(state, "AWAKE")
logger.info("准备期间离开了理论睡眠时间,返回 AWAKE 状态。")
async def _handle_sleeping(self, state: SleepState, now: datetime):
"""处理 SLEEPING 状态的逻辑"""
# 检查是否到了起床时间
if not self._is_in_sleep_time_range(now):
await self._transition_to(state, "AWAKE")
logger.info("理论睡眠时间结束,已切换到 AWAKE 状态。")
# 在这里可以触发“睡醒”的情绪
return
# 根据概率随机触发失眠
if random.random() < self.config.insomnia_probability:
duration = random.randint(self.config.insomnia_duration_minutes[0], self.config.insomnia_duration_minutes[1])
await self._transition_to(state, "INSOMNIA", duration_minutes=duration)
logger.info(f"随机触发失眠,持续 {duration} 分钟。")
# 在这里可以触发“烦躁”的情绪
async def _handle_insomnia(self, state: SleepState, now: datetime):
"""处理 INSOMNIA 状态的逻辑"""
# 检查失眠时间是否结束
if state.state_end_time and now.timestamp() >= state.state_end_time:
await self._transition_to(state, "SLEEPING")
logger.info("失眠时间结束,返回睡眠状态。")
# 如果在失眠期间就到了起床时间,直接唤醒
elif not self._is_in_sleep_time_range(now):
await self._transition_to(state, "AWAKE")
logger.info("在失眠期间到达起床时间,已唤醒。")
async def _handle_woken_up(self, state: SleepState, now: datetime):
"""处理 WOKEN_UP 状态的逻辑"""
# 检查冷却时间是否结束
if state.state_end_time and now.timestamp() >= state.state_end_time:
if self._is_in_sleep_time_range(now):
await self._transition_to(state, "PREPARING_SLEEP", duration_minutes=self.config.prepare_sleep_duration)
logger.info("被吵醒冷却时间结束,尝试重新入睡。")
else:
await self._transition_to(state, "AWAKE")
logger.info("被吵醒后到达起床时间,已唤醒。")
async def _handle_unknown(self, state: SleepState, now: datetime):
"""处理未知状态"""
logger.warning(f"检测到未知的睡眠状态: {state.current_state}。将重置为 AWAKE。")
await self._transition_to(state, "AWAKE")
async def handle_external_event(self):
"""处理外部事件,例如收到用户消息"""
current_state = await self.state_manager.get_state()
if current_state.current_state in ["SLEEPING", "INSOMNIA"]:
await self._transition_to(current_state, "WOKEN_UP", duration_minutes=self.config.woken_up_cooldown_minutes)
logger.info("在睡眠中被外部事件打断,进入 WOKEN_UP 状态。")
# 在这里可以触发“起床气”情绪
async def _transition_to(self, old_state: SleepState, new_state_name: str, duration_minutes: int = 0):
"""
状态转换的统一处理函数。
Args:
old_state: 转换前的状态对象。
new_state_name: 新状态的名称。
duration_minutes: 新状态的持续时间分钟如果为0则不设结束时间。
"""
current_timestamp = time.time()
new_end_time = None
if duration_minutes > 0:
new_end_time = current_timestamp + duration_minutes * 60
new_state = SleepState(
current_state=new_state_name,
state_end_time=new_end_time,
last_updated=current_timestamp,
metadata=old_state.metadata # 继承 metadata
)
await self.state_manager.save_state(new_state)
logger.info(f"睡眠状态已从 {old_state.current_state} 转换为 {new_state_name}")

View File

@@ -1,83 +0,0 @@
import json
import logging
from pathlib import Path
from typing import Optional, Dict, Any
from pydantic import BaseModel, Field
import asyncio
# 设置日志记录
logger = logging.getLogger(__name__)
class SleepState(BaseModel):
"""定义睡眠状态的数据模型"""
current_state: str = Field(default="AWAKE", description="当前的睡眠状态")
state_end_time: Optional[float] = Field(default=None, description="当前状态的预计结束时间戳")
last_updated: float = Field(description="状态最后更新的时间戳")
metadata: Dict[str, Any] = Field(default={}, description="用于存储额外状态信息的字典")
class StateManager:
"""
负责睡眠状态的持久化管理。
将状态以 JSON 格式读/写到本地文件,以降低耦合。
"""
def __init__(self, state_file_path: Path):
self.state_file_path = state_file_path
self._state: Optional[SleepState] = None
self._lock = asyncio.Lock()
self._load_state()
def _load_state(self) -> None:
"""从文件加载状态,如果文件不存在或为空,则创建默认状态"""
try:
if self.state_file_path.exists() and self.state_file_path.stat().st_size > 0:
with open(self.state_file_path, 'r', encoding='utf-8') as f:
data = json.load(f)
self._state = SleepState(**data)
logger.info(f"睡眠状态已从 {self.state_file_path} 加载。")
else:
self._create_default_state()
except (json.JSONDecodeError, TypeError) as e:
logger.warning(f"无法解析状态文件 {self.state_file_path}: {e}。将创建新的默认状态。")
self._create_default_state()
except Exception as e:
logger.error(f"加载睡眠状态时发生未知错误: {e}")
self._create_default_state()
def _create_default_state(self) -> None:
"""创建一个默认的清醒状态"""
import time
self._state = SleepState(last_updated=time.time())
logger.info("未找到现有状态文件,已创建默认的睡眠状态 (AWAKE)。")
# 立即保存一次,以确保文件被创建
asyncio.create_task(self.save_state())
async def get_state(self) -> SleepState:
"""异步获取当前的状态"""
async with self._lock:
if self._state is None:
self._load_state()
# 此时 _state 必然已被 _load_state 或 _create_default_state 初始化
assert self._state is not None, "State should be initialized here"
return self._state.copy(deep=True)
async def save_state(self, new_state: Optional[SleepState] = None) -> None:
"""
异步保存当前状态到文件。
如果提供了 new_state则先更新内部状态。
"""
async with self._lock:
if new_state:
self._state = new_state
if self._state is None:
logger.warning("尝试保存一个空的状态,操作已跳过。")
return
try:
# 确保目录存在
self.state_file_path.parent.mkdir(parents=True, exist_ok=True)
with open(self.state_file_path, 'w', encoding='utf-8') as f:
json.dump(self._state.dict(), f, indent=4, ensure_ascii=False)
logger.debug(f"睡眠状态已成功保存到 {self.state_file_path}")
except Exception as e:
logger.error(f"保存睡眠状态到 {self.state_file_path} 时失败: {e}")

View File

@@ -1,48 +0,0 @@
import asyncio
import logging
from typing import Optional
from .sleep_logic import SleepLogic
logger = logging.getLogger(__name__)
class SleepCycleTask:
"""
负责周期性地更新睡眠状态的后台任务。
"""
def __init__(self, sleep_logic: SleepLogic, interval_seconds: int = 30):
self.sleep_logic = sleep_logic
self.interval_seconds = interval_seconds
self._task: Optional[asyncio.Task] = None
self._is_running = False
async def _run(self):
"""任务的内部循环"""
logger.info("睡眠系统周期性更新任务已启动。")
while self._is_running:
try:
await self.sleep_logic.update_state()
await asyncio.sleep(self.interval_seconds)
except asyncio.CancelledError:
logger.info("睡眠系统任务被取消。")
break
except Exception as e:
logger.error(f"睡眠系统任务在执行期间发生错误: {e}", exc_info=True)
# 发生错误后,等待一段时间再继续,避免快速失败循环
await asyncio.sleep(self.interval_seconds * 2)
def start(self):
"""启动后台任务"""
if not self._is_running:
self._is_running = True
self._task = asyncio.create_task(self._run())
else:
logger.warning("尝试启动一个已经在运行的睡眠系统任务。")
def stop(self):
"""停止后台任务"""
if self._is_running and self._task:
self._is_running = False
self._task.cancel()
logger.info("睡眠系统周期性更新任务已请求停止。")
else:
logger.warning("尝试停止一个尚未启动的睡眠系统任务。")

View File

@@ -1,5 +1,5 @@
[inner]
version = "7.3.0"
version = "7.3.1"
#----以下是给开发人员阅读的如果你只是部署了MoFox-Bot不需要阅读----
#如果你想要修改配置文件请递增version的值
@@ -490,16 +490,8 @@ re_sleep_delay_minutes = 5 # "被唤醒后,如果多久没有新消息则尝
# --- 失眠机制相关参数 ---
enable_insomnia_system = false # 是否启用失眠系统
# 触发“压力不足型失眠”的睡眠压力阈值
sleep_pressure_threshold = 30.0
# 进入“深度睡眠”的睡眠压力阈值
deep_sleep_threshold = 80.0
# 压力正常时的失眠基础概率 (0.0 to 1.0)
insomnia_chance_normal_pressure = 0.1
# 每次AI执行动作后增加的睡眠压力值
sleep_pressure_increment = 1.5
# 睡眠时,每分钟衰减的睡眠压力值
sleep_pressure_decay_rate = 1.5
# 失眠概率 (0.0 to 1.0)
insomnia_chance_pressure = 0.1
# --- 弹性睡眠与睡前消息 ---
# 是否启用弹性睡眠。启用后AI不会到点立刻入睡而是会根据睡眠压力增加5-10分钟的缓冲并可能因为压力不足而推迟睡眠。