refactor(schedule): 将睡眠状态管理逻辑重构并迁移到SleepManager

将原先分散在 `ScheduleManager` 中的睡眠状态机逻辑(包括状态判断、转换、持久化等)抽取并封装到一个新的 `SleepManager` 类中。

这次重构的主要目的如下:
- **职责分离**: `ScheduleManager` 的核心职责是管理日程的生成和查询,而睡眠状态的管理是一个独立的、复杂的逻辑单元。将其分离可以使两个类的职责更单一、代码更清晰。
- **可维护性**: 将所有与睡眠相关的状态和逻辑集中到 `SleepManager` 中,使得未来对睡眠功能的修改和扩展更加容易,减少了对 `ScheduleManager` 的影响。
- **代码简化**: `ScheduleManager` 不再需要管理内部的睡眠状态变量(如 `_current_state`, `_sleep_buffer_end_time` 等),而是通过委托 `sleep_manager` 实例来处理,简化了其内部实现。

相应的,`HfcContext` 中冗余的睡眠相关状态(如 `is_in_insomnia`)也被移除,统一由 `SleepManager` 管理。其他模块(如 `HeartFChatting`, `WakeUpManager`)对睡眠状态的调用也已更新为通过 `schedule_manager.sleep_manager` 或其代理方法进行。
This commit is contained in:
minecraft1024a
2025-08-29 18:17:16 +08:00
committed by Windpicker-owo
parent 181a4f9d59
commit e85fb08c09
5 changed files with 425 additions and 359 deletions

View File

@@ -197,7 +197,7 @@ class HeartFChatting:
- NORMAL模式检查进入FOCUS模式的条件并通过normal_mode_handler处理消息 - NORMAL模式检查进入FOCUS模式的条件并通过normal_mode_handler处理消息
""" """
# --- 核心状态更新 --- # --- 核心状态更新 ---
await schedule_manager._update_sleep_state(self.wakeup_manager) await schedule_manager.update_sleep_state(self.wakeup_manager)
current_sleep_state = schedule_manager.get_current_sleep_state() current_sleep_state = schedule_manager.get_current_sleep_state()
is_sleeping = current_sleep_state == SleepState.SLEEPING is_sleeping = current_sleep_state == SleepState.SLEEPING
is_in_insomnia = current_sleep_state == SleepState.INSOMNIA is_in_insomnia = current_sleep_state == SleepState.INSOMNIA
@@ -235,7 +235,6 @@ class HeartFChatting:
if current_sleep_state == SleepState.WOKEN_UP: if current_sleep_state == SleepState.WOKEN_UP:
logger.info(f"{self.context.log_prefix} 从睡眠中被唤醒,将处理积压的消息。") logger.info(f"{self.context.log_prefix} 从睡眠中被唤醒,将处理积压的消息。")
self.context.last_wakeup_time = time.time()
# 根据聊天模式处理新消息 # 根据聊天模式处理新消息
if self.context.loop_mode == ChatMode.FOCUS: if self.context.loop_mode == ChatMode.FOCUS:
@@ -258,12 +257,12 @@ class HeartFChatting:
# --- 重新入睡逻辑 --- # --- 重新入睡逻辑 ---
# 如果被吵醒了,并且在一定时间内没有新消息,则尝试重新入睡 # 如果被吵醒了,并且在一定时间内没有新消息,则尝试重新入睡
if schedule_manager._is_woken_up and not has_new_messages: if schedule_manager.get_current_sleep_state() == SleepState.WOKEN_UP and not has_new_messages:
re_sleep_delay = global_config.sleep_system.re_sleep_delay_minutes * 60 re_sleep_delay = global_config.sleep_system.re_sleep_delay_minutes * 60
# 使用 last_message_time 来判断空闲时间 # 使用 last_message_time 来判断空闲时间
if time.time() - self.context.last_message_time > re_sleep_delay: if time.time() - self.context.last_message_time > re_sleep_delay:
logger.info(f"{self.context.log_prefix} 已被唤醒且超过 {re_sleep_delay / 60} 分钟无新消息,尝试重新入睡。") logger.info(f"{self.context.log_prefix} 已被唤醒且超过 {re_sleep_delay / 60} 分钟无新消息,尝试重新入睡。")
schedule_manager.reset_wakeup_state() schedule_manager.reset_sleep_state_after_wakeup()
# 保存HFC上下文状态 # 保存HFC上下文状态
self.context.save_context_state() self.context.save_context_state()

View File

@@ -46,11 +46,6 @@ class HfcContext:
self.sleep_pressure = 0.0 self.sleep_pressure = 0.0
self.was_sleeping = False # 用于检测睡眠状态的切换 self.was_sleeping = False # 用于检测睡眠状态的切换
# 失眠状态
self.is_in_insomnia: bool = False
self.insomnia_end_time: float = 0.0
self.last_wakeup_time: float = 0.0 # 被吵醒的时间
self.last_message_time = time.time() self.last_message_time = time.time()
self.last_read_time = time.time() - 10 self.last_read_time = time.time() - 10
@@ -78,8 +73,6 @@ class HfcContext:
if state and isinstance(state, dict): if state and isinstance(state, dict):
self.energy_value = state.get("energy_value", 5.0) self.energy_value = state.get("energy_value", 5.0)
self.sleep_pressure = state.get("sleep_pressure", 0.0) self.sleep_pressure = state.get("sleep_pressure", 0.0)
self.is_in_insomnia = state.get("is_in_insomnia", False)
self.insomnia_end_time = state.get("insomnia_end_time", 0.0)
logger = get_logger("hfc_context") logger = get_logger("hfc_context")
logger.info(f"{self.log_prefix} 成功从本地存储加载HFC上下文状态: {state}") logger.info(f"{self.log_prefix} 成功从本地存储加载HFC上下文状态: {state}")
else: else:
@@ -91,9 +84,6 @@ class HfcContext:
state = { state = {
"energy_value": self.energy_value, "energy_value": self.energy_value,
"sleep_pressure": self.sleep_pressure, "sleep_pressure": self.sleep_pressure,
"is_in_insomnia": self.is_in_insomnia,
"insomnia_end_time": self.insomnia_end_time,
"last_wakeup_time": self.last_wakeup_time,
} }
local_storage[self._get_storage_key()] = state local_storage[self._get_storage_key()] = state
logger = get_logger("hfc_context") logger = get_logger("hfc_context")

View File

@@ -137,7 +137,9 @@ class WakeUpManager:
# 只有在休眠且非失眠状态下才累积唤醒度 # 只有在休眠且非失眠状态下才累积唤醒度
from src.schedule.schedule_manager import schedule_manager from src.schedule.schedule_manager import schedule_manager
if not schedule_manager.is_sleeping() or self.context.is_in_insomnia: from src.schedule.sleep_manager import SleepState
current_sleep_state = schedule_manager.get_current_sleep_state()
if current_sleep_state != SleepState.SLEEPING:
return False return False
old_value = self.wakeup_value old_value = self.wakeup_value

View File

@@ -1,8 +1,7 @@
import orjson import orjson
import asyncio import asyncio
import random import random
from datetime import datetime, time, timedelta from datetime import datetime, time, timedelta, date
from enum import Enum, auto
from typing import Optional, List, Dict, Any, TYPE_CHECKING from typing import Optional, List, Dict, Any, TYPE_CHECKING
from lunar_python import Lunar from lunar_python import Lunar
from pydantic import BaseModel, ValidationError, validator from pydantic import BaseModel, ValidationError, validator
@@ -10,15 +9,14 @@ from pydantic import BaseModel, ValidationError, validator
from src.common.database.sqlalchemy_models import Schedule, get_db_session from src.common.database.sqlalchemy_models import Schedule, get_db_session
from src.common.database.monthly_plan_db import ( from src.common.database.monthly_plan_db import (
get_smart_plans_for_daily_schedule, get_smart_plans_for_daily_schedule,
update_plan_usage # 保留兼容性 update_plan_usage, # 保留兼容性
) )
from src.config.config import global_config, model_config from src.config.config import global_config, model_config
from src.llm_models.utils_model import LLMRequest from src.llm_models.utils_model import LLMRequest
from src.common.logger import get_logger from src.common.logger import get_logger
from json_repair import repair_json from json_repair import repair_json
from src.manager.async_task_manager import AsyncTask, async_task_manager from src.manager.async_task_manager import AsyncTask, async_task_manager
from src.manager.local_store_manager import local_storage from .sleep_manager import SleepManager, SleepState
from src.plugin_system.apis import send_api, generator_api
if TYPE_CHECKING: if TYPE_CHECKING:
from src.chat.chat_loop.wakeup_manager import WakeUpManager from src.chat.chat_loop.wakeup_manager import WakeUpManager
@@ -35,81 +33,85 @@ DEFAULT_SCHEDULE_GUIDELINES = """
另外,请保证充足的休眠时间来处理和整合一天的数据。 另外,请保证充足的休眠时间来处理和整合一天的数据。
""" """
class ScheduleItem(BaseModel): class ScheduleItem(BaseModel):
"""单个日程项的Pydantic模型""" """单个日程项的Pydantic模型"""
time_range: str time_range: str
activity: str activity: str
@validator('time_range') @validator("time_range")
def validate_time_range(cls, v): def validate_time_range(cls, v):
"""验证时间范围格式""" """验证时间范围格式"""
if not v or '-' not in v: if not v or "-" not in v:
raise ValueError("时间范围必须包含'-'分隔符") raise ValueError("时间范围必须包含'-'分隔符")
try: try:
start_str, end_str = v.split('-', 1) start_str, end_str = v.split("-", 1)
start_str = start_str.strip() start_str = start_str.strip()
end_str = end_str.strip() end_str = end_str.strip()
# 验证时间格式 # 验证时间格式
datetime.strptime(start_str, "%H:%M") datetime.strptime(start_str, "%H:%M")
datetime.strptime(end_str, "%H:%M") datetime.strptime(end_str, "%H:%M")
return v return v
except ValueError as e: except ValueError as e:
raise ValueError(f"时间格式无效应为HH:MM-HH:MM格式: {e}") from e raise ValueError(f"时间格式无效应为HH:MM-HH:MM格式: {e}") from e
@validator('activity') @validator("activity")
def validate_activity(cls, v): def validate_activity(cls, v):
"""验证活动描述""" """验证活动描述"""
if not v or not v.strip(): if not v or not v.strip():
raise ValueError("活动描述不能为空") raise ValueError("活动描述不能为空")
return v.strip() return v.strip()
class ScheduleData(BaseModel): class ScheduleData(BaseModel):
"""完整日程数据的Pydantic模型""" """完整日程数据的Pydantic模型"""
schedule: List[ScheduleItem] schedule: List[ScheduleItem]
@validator('schedule') @validator("schedule")
def validate_schedule_completeness(cls, v): def validate_schedule_completeness(cls, v):
"""验证日程是否覆盖24小时""" """验证日程是否覆盖24小时"""
if not v: if not v:
raise ValueError("日程不能为空") raise ValueError("日程不能为空")
# 收集所有时间段 # 收集所有时间段
time_ranges = [] time_ranges = []
for item in v: for item in v:
try: try:
start_str, end_str = item.time_range.split('-', 1) start_str, end_str = item.time_range.split("-", 1)
start_time = datetime.strptime(start_str.strip(), "%H:%M").time() start_time = datetime.strptime(start_str.strip(), "%H:%M").time()
end_time = datetime.strptime(end_str.strip(), "%H:%M").time() end_time = datetime.strptime(end_str.strip(), "%H:%M").time()
time_ranges.append((start_time, end_time)) time_ranges.append((start_time, end_time))
except ValueError: except ValueError:
continue continue
# 检查是否覆盖24小时 # 检查是否覆盖24小时
if not cls._check_24_hour_coverage(time_ranges): if not cls._check_24_hour_coverage(time_ranges):
raise ValueError("日程必须覆盖完整的24小时") raise ValueError("日程必须覆盖完整的24小时")
return v return v
@staticmethod @staticmethod
def _check_24_hour_coverage(time_ranges: List[tuple]) -> bool: def _check_24_hour_coverage(time_ranges: List[tuple]) -> bool:
"""检查时间段是否覆盖24小时""" """检查时间段是否覆盖24小时"""
if not time_ranges: if not time_ranges:
return False return False
# 将时间转换为分钟数进行计算 # 将时间转换为分钟数进行计算
def time_to_minutes(t: time) -> int: def time_to_minutes(t: time) -> int:
return t.hour * 60 + t.minute return t.hour * 60 + t.minute
# 创建覆盖情况数组 (1440分钟 = 24小时) # 创建覆盖情况数组 (1440分钟 = 24小时)
covered = [False] * 1440 covered = [False] * 1440
for start_time, end_time in time_ranges: for start_time, end_time in time_ranges:
start_min = time_to_minutes(start_time) start_min = time_to_minutes(start_time)
end_min = time_to_minutes(end_time) end_min = time_to_minutes(end_time)
if start_min <= end_min: if start_min <= end_min:
# 同一天内的时间段 # 同一天内的时间段
for i in range(start_min, end_min): for i in range(start_min, end_min):
@@ -121,17 +123,10 @@ class ScheduleData(BaseModel):
covered[i] = True covered[i] = True
for i in range(0, end_min): for i in range(0, end_min):
covered[i] = True covered[i] = True
# 检查是否所有分钟都被覆盖 # 检查是否所有分钟都被覆盖
return all(covered) return all(covered)
class SleepState(Enum):
"""睡眠状态枚举"""
AWAKE = auto() # 完全清醒
INSOMNIA = auto() # 失眠(在理论睡眠时间内保持清醒)
PREPARING_SLEEP = auto() # 准备入睡(缓冲期)
SLEEPING = auto() # 正在休眠
WOKEN_UP = auto() # 被吵醒
class ScheduleManager: class ScheduleManager:
def __init__(self): def __init__(self):
@@ -139,18 +134,8 @@ class ScheduleManager:
self.llm = LLMRequest(model_set=model_config.model_task_config.schedule_generator, request_type="schedule") self.llm = LLMRequest(model_set=model_config.model_task_config.schedule_generator, request_type="schedule")
self.max_retries = -1 # 无限重试,直到成功生成标准日程表 self.max_retries = -1 # 无限重试,直到成功生成标准日程表
self.daily_task_started = False self.daily_task_started = False
self.last_sleep_log_time = 0
self.sleep_log_interval = 35 # 日志记录间隔,单位秒
self.schedule_generation_running = False # 防止重复生成任务 self.schedule_generation_running = False # 防止重复生成任务
self.sleep_manager = SleepManager(self)
# --- 统一睡眠状态管理 ---
self._current_state: SleepState = SleepState.AWAKE
self._sleep_buffer_end_time: Optional[datetime] = None
self._total_delayed_minutes_today: int = 0
self._last_sleep_check_date: Optional[datetime.date] = None
self._last_fully_slept_log_time: float = 0
self._load_sleep_state()
async def start_daily_schedule_generation(self): async def start_daily_schedule_generation(self):
"""启动每日零点自动生成新日程的任务""" """启动每日零点自动生成新日程的任务"""
@@ -175,10 +160,10 @@ class ScheduleManager:
schedule_record = session.query(Schedule).filter(Schedule.date == today_str).first() schedule_record = session.query(Schedule).filter(Schedule.date == today_str).first()
if schedule_record: if schedule_record:
logger.info(f"从数据库加载今天的日程 ({today_str})。") logger.info(f"从数据库加载今天的日程 ({today_str})。")
try: try:
schedule_data = orjson.loads(str(schedule_record.schedule_data)) schedule_data = orjson.loads(str(schedule_record.schedule_data))
# 使用Pydantic验证日程数据 # 使用Pydantic验证日程数据
if self._validate_schedule_with_pydantic(schedule_data): if self._validate_schedule_with_pydantic(schedule_data):
self.today_schedule = schedule_data self.today_schedule = schedule_data
@@ -207,15 +192,15 @@ class ScheduleManager:
if self.schedule_generation_running: if self.schedule_generation_running:
logger.info("日程生成任务已在运行中,跳过重复启动") logger.info("日程生成任务已在运行中,跳过重复启动")
return return
# 创建异步任务进行日程生成,不阻塞主程序 # 创建异步任务进行日程生成,不阻塞主程序
asyncio.create_task(self._async_generate_and_save_schedule()) asyncio.create_task(self._async_generate_and_save_schedule())
logger.info("已启动异步日程生成任务") logger.info("已启动异步日程生成任务")
async def _async_generate_and_save_schedule(self): async def _async_generate_and_save_schedule(self):
"""异步生成并保存日程的内部方法""" """异步生成并保存日程的内部方法"""
self.schedule_generation_running = True self.schedule_generation_running = True
try: try:
now = datetime.now() now = datetime.now()
today_str = now.strftime("%Y-%m-%d") today_str = now.strftime("%Y-%m-%d")
@@ -227,7 +212,7 @@ class ScheduleManager:
festivals = lunar.getFestivals() festivals = lunar.getFestivals()
other_festivals = lunar.getOtherFestivals() other_festivals = lunar.getOtherFestivals()
all_festivals = festivals + other_festivals all_festivals = festivals + other_festivals
festival_block = "" festival_block = ""
if all_festivals: if all_festivals:
festival_text = "".join(all_festivals) festival_text = "".join(all_festivals)
@@ -238,33 +223,28 @@ class ScheduleManager:
used_plan_ids = [] used_plan_ids = []
if global_config.monthly_plan_system and global_config.monthly_plan_system.enable: if global_config.monthly_plan_system and global_config.monthly_plan_system.enable:
# 使用新的智能抽取逻辑 # 使用新的智能抽取逻辑
avoid_days = getattr(global_config.monthly_plan_system, 'avoid_repetition_days', 7) avoid_days = getattr(global_config.monthly_plan_system, "avoid_repetition_days", 7)
# 使用新的智能抽取逻辑 # 使用新的智能抽取逻辑
avoid_days = getattr(global_config.monthly_plan_system, 'avoid_repetition_days', 7) avoid_days = getattr(global_config.monthly_plan_system, "avoid_repetition_days", 7)
sampled_plans = get_smart_plans_for_daily_schedule( sampled_plans = get_smart_plans_for_daily_schedule(
current_month_str, current_month_str, max_count=3, avoid_days=avoid_days
max_count=3,
avoid_days=avoid_days
) )
# 如果计划耗尽,则触发补充生成 # 如果计划耗尽,则触发补充生成
if not sampled_plans: if not sampled_plans:
logger.info("可用的月度计划已耗尽或不足,尝试进行补充生成...") logger.info("可用的月度计划已耗尽或不足,尝试进行补充生成...")
from mmc.src.schedule.monthly_plan_manager import monthly_plan_manager from mmc.src.schedule.monthly_plan_manager import monthly_plan_manager
success = await monthly_plan_manager.generate_monthly_plans(current_month_str) success = await monthly_plan_manager.generate_monthly_plans(current_month_str)
if success: if success:
logger.info("补充生成完成,重新抽取月度计划...") logger.info("补充生成完成,重新抽取月度计划...")
sampled_plans = get_smart_plans_for_daily_schedule( sampled_plans = get_smart_plans_for_daily_schedule(
current_month_str, current_month_str, max_count=3, avoid_days=avoid_days
max_count=3,
avoid_days=avoid_days
) )
else: else:
logger.warning("月度计划补充生成失败。") logger.warning("月度计划补充生成失败。")
if sampled_plans: if sampled_plans:
used_plan_ids = [plan.id for plan in sampled_plans] # SQLAlchemy 对象的 id 属性会自动返回实际值
plan_texts = "\n".join([f"- {plan.plan_text}" for plan in sampled_plans]) plan_texts = "\n".join([f"- {plan.plan_text}" for plan in sampled_plans])
monthly_plans_block = f""" monthly_plans_block = f"""
**我这个月的一些小目标/计划 (请在今天的日程中适当体现)**: **我这个月的一些小目标/计划 (请在今天的日程中适当体现)**:
@@ -304,33 +284,33 @@ class ScheduleManager:
请你扮演我以我的身份和口吻为我生成一份完整的24小时日程表。 请你扮演我以我的身份和口吻为我生成一份完整的24小时日程表。
""" """
# 无限重试直到生成成功的标准日程表 # 无限重试直到生成成功的标准日程表
attempt = 0 attempt = 0
while True: while True:
attempt += 1 attempt += 1
try: try:
logger.info(f"正在生成日程 (第 {attempt} 次尝试)") logger.info(f"正在生成日程 (第 {attempt} 次尝试)")
# 构建当前尝试的prompt增加压力提示 # 构建当前尝试的prompt增加压力提示
prompt = base_prompt prompt = base_prompt
if attempt > 1: if attempt > 1:
failure_hint = f""" failure_hint = f"""
**重要提醒 (第{attempt}次尝试)**: **重要提醒 (第{attempt}次尝试)**:
- 前面{attempt-1}次生成都失败了请务必严格按照要求生成完整的24小时日程 - 前面{attempt - 1}次生成都失败了请务必严格按照要求生成完整的24小时日程
- 确保JSON格式正确所有时间段连续覆盖24小时 - 确保JSON格式正确所有时间段连续覆盖24小时
- 时间格式必须为HH:MM-HH:MM不能有时间间隙或重叠 - 时间格式必须为HH:MM-HH:MM不能有时间间隙或重叠
- 不要输出任何解释文字只输出纯JSON数组 - 不要输出任何解释文字只输出纯JSON数组
- 确保输出完整,不要被截断 - 确保输出完整,不要被截断
""" """
prompt += failure_hint prompt += failure_hint
response, _ = await self.llm.generate_response_async(prompt) response, _ = await self.llm.generate_response_async(prompt)
# 尝试解析和验证JSON项目内置的反截断机制会自动处理截断问题 # 尝试解析和验证JSON项目内置的反截断机制会自动处理截断问题
schedule_data = orjson.loads(repair_json(response)) schedule_data = orjson.loads(repair_json(response))
# 使用Pydantic验证生成的日程数据 # 使用Pydantic验证生成的日程数据
if self._validate_schedule_with_pydantic(schedule_data): if self._validate_schedule_with_pydantic(schedule_data):
# 验证通过,保存到数据库 # 验证通过,保存到数据库
@@ -339,46 +319,49 @@ class ScheduleManager:
existing_schedule = session.query(Schedule).filter(Schedule.date == today_str).first() existing_schedule = session.query(Schedule).filter(Schedule.date == today_str).first()
if existing_schedule: if existing_schedule:
# 更新现有日程 # 更新现有日程
session.query(Schedule).filter(Schedule.date == today_str).update({ session.query(Schedule).filter(Schedule.date == today_str).update(
Schedule.schedule_data: orjson.dumps(schedule_data).decode('utf-8'), {
Schedule.updated_at: datetime.now() Schedule.schedule_data: orjson.dumps(schedule_data).decode("utf-8"),
}) Schedule.updated_at: datetime.now(),
}
)
else: else:
# 创建新日程 # 创建新日程
new_schedule = Schedule( new_schedule = Schedule(
date=today_str, date=today_str, schedule_data=orjson.dumps(schedule_data).decode("utf-8")
schedule_data=orjson.dumps(schedule_data).decode('utf-8')
) )
session.add(new_schedule) session.add(new_schedule)
session.commit() session.commit()
# 美化输出 # 美化输出
schedule_str = f"✅ 经过 {attempt} 次尝试,成功生成并保存今天的日程 ({today_str})\n" schedule_str = f"✅ 经过 {attempt} 次尝试,成功生成并保存今天的日程 ({today_str})\n"
for item in schedule_data: for item in schedule_data:
schedule_str += f" - {item.get('time_range', '未知时间')}: {item.get('activity', '未知活动')}\n" schedule_str += (
f" - {item.get('time_range', '未知时间')}: {item.get('activity', '未知活动')}\n"
)
logger.info(schedule_str) logger.info(schedule_str)
self.today_schedule = schedule_data self.today_schedule = schedule_data
# 成功生成日程后,更新使用过的月度计划的统计信息 # 成功生成日程后,更新使用过的月度计划的统计信息
if used_plan_ids and global_config.monthly_plan_system: if used_plan_ids and global_config.monthly_plan_system:
logger.info(f"更新使用过的月度计划 {used_plan_ids} 的统计信息。") logger.info(f"更新使用过的月度计划 {used_plan_ids} 的统计信息。")
update_plan_usage(used_plan_ids, today_str) # type: ignore update_plan_usage(used_plan_ids, today_str) # type: ignore
# 成功生成,退出无限循环 # 成功生成,退出无限循环
break break
else: else:
logger.warning(f"{attempt} 次生成的日程验证失败,继续重试...") logger.warning(f"{attempt} 次生成的日程验证失败,继续重试...")
# 添加短暂延迟,避免过于频繁的请求 # 添加短暂延迟,避免过于频繁的请求
await asyncio.sleep(2) await asyncio.sleep(2)
except Exception as e: except Exception as e:
logger.error(f"{attempt} 次生成日程失败: {e}") logger.error(f"{attempt} 次生成日程失败: {e}")
logger.info("继续重试...") logger.info("继续重试...")
# 添加短暂延迟,避免过于频繁的请求 # 添加短暂延迟,避免过于频繁的请求
await asyncio.sleep(3) await asyncio.sleep(3)
finally: finally:
self.schedule_generation_running = False self.schedule_generation_running = False
logger.info("日程生成任务结束") logger.info("日程生成任务结束")
@@ -396,12 +379,12 @@ class ScheduleManager:
try: try:
time_range = event.get("time_range") time_range = event.get("time_range")
activity = event.get("activity") activity = event.get("activity")
if not time_range or not activity: if not time_range or not activity:
logger.warning(f"日程事件缺少必要字段: {event}") logger.warning(f"日程事件缺少必要字段: {event}")
continue continue
start_str, end_str = time_range.split('-') start_str, end_str = time_range.split("-")
start_time = datetime.strptime(start_str.strip(), "%H:%M").time() start_time = datetime.strptime(start_str.strip(), "%H:%M").time()
end_time = datetime.strptime(end_str.strip(), "%H:%M").time() end_time = datetime.strptime(end_str.strip(), "%H:%M").time()
@@ -418,262 +401,19 @@ class ScheduleManager:
def get_current_sleep_state(self) -> SleepState: def get_current_sleep_state(self) -> SleepState:
"""获取当前的睡眠状态""" """获取当前的睡眠状态"""
return self._current_state return self.sleep_manager.get_current_sleep_state()
def is_sleeping(self) -> bool: def is_sleeping(self) -> bool:
"""检查当前是否处于正式休眠状态""" """检查当前是否处于正式休眠状态"""
return self._current_state == SleepState.SLEEPING return self.sleep_manager.is_sleeping()
async def _update_sleep_state(self, wakeup_manager: Optional["WakeUpManager"] = None): async def update_sleep_state(self, wakeup_manager: Optional["WakeUpManager"] = None):
""" """更新睡眠状态"""
核心状态机:根据当前情况更新睡眠状态 await self.sleep_manager.update_sleep_state(wakeup_manager)
"""
# --- 基础检查 ---
if not global_config.sleep_system.enable or not self.today_schedule:
if self._current_state != SleepState.AWAKE:
logger.debug("睡眠系统禁用或无日程,强制设为 AWAKE")
self._current_state = SleepState.AWAKE
return
now = datetime.now()
today = now.date()
# --- 每日状态重置 ---
if self._last_sleep_check_date != today:
logger.info(f"新的一天 ({today}),重置睡眠状态为 AWAKE。")
self._total_delayed_minutes_today = 0
self._current_state = SleepState.AWAKE
self._sleep_buffer_end_time = None
self._last_sleep_check_date = today
self._save_sleep_state()
# --- 判断当前是否为理论上的睡眠时间 ---
is_in_theoretical_sleep, activity = self._is_in_theoretical_sleep_time(now.time())
# ===================================
# 状态机核心逻辑
# ===================================
# 状态:清醒 (AWAKE)
if self._current_state == SleepState.AWAKE:
if is_in_theoretical_sleep:
logger.info(f"进入理论休眠时间 '{activity}',开始进行睡眠决策...")
# --- 合并后的失眠与弹性睡眠决策逻辑 ---
sleep_pressure = wakeup_manager.context.sleep_pressure if wakeup_manager else 999
pressure_threshold = global_config.sleep_system.flexible_sleep_pressure_threshold
# 决策1因睡眠压力低而延迟入睡原弹性睡眠
if sleep_pressure < pressure_threshold and self._total_delayed_minutes_today < global_config.sleep_system.max_sleep_delay_minutes:
delay_minutes = 15
self._total_delayed_minutes_today += delay_minutes
self._sleep_buffer_end_time = now + timedelta(minutes=delay_minutes)
self._current_state = SleepState.INSOMNIA
logger.info(f"睡眠压力 ({sleep_pressure:.1f}) 低于阈值 ({pressure_threshold}),进入失眠状态,延迟入睡 {delay_minutes} 分钟。")
# 发送睡前通知
if global_config.sleep_system.enable_pre_sleep_notification:
asyncio.create_task(self._send_pre_sleep_notification())
# 决策2进入正常的入睡准备流程
else:
buffer_seconds = random.randint(5 * 60, 10 * 60)
self._sleep_buffer_end_time = now + timedelta(seconds=buffer_seconds)
self._current_state = SleepState.PREPARING_SLEEP
logger.info(f"睡眠压力正常或已达今日最大延迟,进入准备入睡状态,将在 {buffer_seconds / 60:.1f} 分钟内入睡。")
# 发送睡前通知
if global_config.sleep_system.enable_pre_sleep_notification:
asyncio.create_task(self._send_pre_sleep_notification())
self._save_sleep_state()
# 状态:失眠 (INSOMNIA)
elif self._current_state == SleepState.INSOMNIA:
if not is_in_theoretical_sleep:
logger.info("已离开理论休眠时间,失眠结束,恢复清醒。")
self._current_state = SleepState.AWAKE
self._save_sleep_state()
# TODO: 添加从失眠到准备入睡的转换逻辑
# 状态:准备入睡 (PREPARING_SLEEP)
elif self._current_state == SleepState.PREPARING_SLEEP:
if not is_in_theoretical_sleep:
logger.info("准备入睡期间离开理论休眠时间,取消入睡,恢复清醒。")
self._current_state = SleepState.AWAKE
self._sleep_buffer_end_time = None
self._save_sleep_state()
elif self._sleep_buffer_end_time and now >= self._sleep_buffer_end_time:
logger.info("睡眠缓冲期结束,正式进入休眠状态。")
self._current_state = SleepState.SLEEPING
self._last_fully_slept_log_time = now.timestamp()
self._save_sleep_state()
# 状态:休眠中 (SLEEPING)
elif self._current_state == SleepState.SLEEPING:
if not is_in_theoretical_sleep:
logger.info("理论休眠时间结束,自然醒来。")
self._current_state = SleepState.AWAKE
self._save_sleep_state()
else:
# 记录日志
current_timestamp = now.timestamp()
if current_timestamp - self.last_sleep_log_time > self.sleep_log_interval:
logger.info(f"当前处于休眠活动 '{activity}' 中。")
self.last_sleep_log_time = current_timestamp
# 状态:被吵醒 (WOKEN_UP)
elif self._current_state == SleepState.WOKEN_UP:
if not is_in_theoretical_sleep:
logger.info("理论休眠时间结束,被吵醒的状态自动结束。")
self._current_state = SleepState.AWAKE
self._save_sleep_state()
# TODO: 添加重新入睡的逻辑
def reset_sleep_state_after_wakeup(self): def reset_sleep_state_after_wakeup(self):
"""被唤醒后,将状态切换到 WOKEN_UP""" """被唤醒后,将状态切换到 WOKEN_UP"""
if self._current_state in [SleepState.PREPARING_SLEEP, SleepState.SLEEPING, SleepState.INSOMNIA]: self.sleep_manager.reset_sleep_state_after_wakeup()
logger.info("被唤醒,进入 WOKEN_UP 状态!")
self._current_state = SleepState.WOKEN_UP
self._sleep_buffer_end_time = None
self._save_sleep_state()
def _is_in_theoretical_sleep_time(self, now_time: time) -> (bool, Optional[str]):
"""检查当前时间是否落在日程表的任何一个睡眠活动中"""
sleep_keywords = ["休眠", "睡觉", "梦乡"]
if self.today_schedule:
for event in self.today_schedule:
try:
activity = event.get("activity", "").strip()
time_range = event.get("time_range")
if not activity or not time_range:
continue
if any(keyword in activity for keyword in sleep_keywords):
start_str, end_str = time_range.split('-')
start_time = datetime.strptime(start_str.strip(), "%H:%M").time()
end_time = datetime.strptime(end_str.strip(), "%H:%M").time()
if start_time <= end_time: # 同一天
if start_time <= now_time < end_time:
return True, activity
else: # 跨天
if now_time >= start_time or now_time < end_time:
return True, activity
except (ValueError, KeyError, AttributeError) as e:
logger.warning(f"解析日程事件时出错: {event}, 错误: {e}")
continue
return False, None
async def _send_pre_sleep_notification(self):
"""异步生成并发送睡前通知"""
try:
groups = global_config.sleep_system.pre_sleep_notification_groups
prompt = global_config.sleep_system.pre_sleep_prompt
if not groups:
logger.info("未配置睡前通知的群组,跳过发送。")
return
if not prompt:
logger.warning("睡前通知的prompt为空跳过发送。")
return
# 为防止消息风暴,稍微延迟一下
await asyncio.sleep(random.uniform(5, 15))
for group_id_str in groups:
try:
# 格式 "platform:group_id"
parts = group_id_str.split(":")
if len(parts) != 2:
logger.warning(f"无效的群组ID格式: {group_id_str}")
continue
platform, group_id = parts
# 使用与 ChatStream.get_stream_id 相同的逻辑生成 stream_id
import hashlib
key = "_".join([platform, group_id])
stream_id = hashlib.md5(key.encode()).hexdigest()
logger.info(f"正在为群组 {group_id_str} (Stream ID: {stream_id}) 生成睡前消息...")
# 调用 generator_api 生成回复
success, reply_set, _ = await generator_api.generate_reply(
chat_id=stream_id,
extra_info=prompt,
request_type="schedule.pre_sleep_notification"
)
if success and reply_set:
# 提取文本内容并发送
reply_text = "".join([content for msg_type, content in reply_set if msg_type == "text"])
if reply_text:
logger.info(f"向群组 {group_id_str} 发送睡前消息: {reply_text}")
await send_api.text_to_stream(text=reply_text, stream_id=stream_id)
else:
logger.warning(f"为群组 {group_id_str} 生成的回复内容为空。")
else:
logger.error(f"为群组 {group_id_str} 生成睡前消息失败。")
await asyncio.sleep(random.uniform(2, 5)) # 避免发送过快
except Exception as e:
logger.error(f"向群组 {group_id_str} 发送睡前消息失败: {e}")
except Exception as e:
logger.error(f"发送睡前通知任务失败: {e}")
def _save_sleep_state(self):
"""将当前弹性睡眠状态保存到本地存储"""
try:
state = {
"is_preparing_sleep": self._is_preparing_sleep,
"sleep_buffer_end_time_ts": self._sleep_buffer_end_time.timestamp() if self._sleep_buffer_end_time else None,
"total_delayed_minutes_today": self._total_delayed_minutes_today,
"last_sleep_check_date_str": self._last_sleep_check_date.isoformat() if self._last_sleep_check_date else None,
"is_in_voluntary_delay": self._is_in_voluntary_delay,
"is_woken_up": self._is_woken_up,
}
local_storage["schedule_sleep_state"] = state
logger.debug(f"已保存睡眠状态: {state}")
except Exception as e:
logger.error(f"保存睡眠状态失败: {e}")
def _load_sleep_state(self):
"""从本地存储加载弹性睡眠状态"""
try:
state = local_storage["schedule_sleep_state"]
if state and isinstance(state, dict):
self._is_preparing_sleep = state.get("is_preparing_sleep", False)
end_time_ts = state.get("sleep_buffer_end_time_ts")
if end_time_ts:
self._sleep_buffer_end_time = datetime.fromtimestamp(end_time_ts)
self._total_delayed_minutes_today = state.get("total_delayed_minutes_today", 0)
self._is_in_voluntary_delay = state.get("is_in_voluntary_delay", False)
self._is_woken_up = state.get("is_woken_up", False)
date_str = state.get("last_sleep_check_date_str")
if date_str:
self._last_sleep_check_date = datetime.fromisoformat(date_str).date()
logger.info(f"成功从本地存储加载睡眠状态: {state}")
except Exception as e:
logger.warning(f"加载睡眠状态失败,将使用默认值: {e}")
def reset_wakeup_state(self):
"""重置被唤醒的状态,允许重新尝试入睡"""
if self._is_woken_up:
logger.info("重置唤醒状态,将重新尝试入睡。")
self._is_woken_up = False
self._is_preparing_sleep = False # 允许重新进入弹性睡眠判断
self._sleep_buffer_end_time = None
self._save_sleep_state()
def _validate_schedule_with_pydantic(self, schedule_data) -> bool: def _validate_schedule_with_pydantic(self, schedule_data) -> bool:
"""使用Pydantic验证日程数据格式和完整性""" """使用Pydantic验证日程数据格式和完整性"""
@@ -694,22 +434,21 @@ class ScheduleManager:
if not isinstance(schedule_data, list): if not isinstance(schedule_data, list):
logger.warning("日程数据不是列表格式") logger.warning("日程数据不是列表格式")
return False return False
for item in schedule_data: for item in schedule_data:
if not isinstance(item, dict): if not isinstance(item, dict):
logger.warning(f"日程项不是字典格式: {item}") logger.warning(f"日程项不是字典格式: {item}")
return False return False
if 'time_range' not in item or 'activity' not in item: if "time_range" not in item or "activity" not in item:
logger.warning(f"日程项缺少必要字段 (time_range 或 activity): {item}") logger.warning(f"日程项缺少必要字段 (time_range 或 activity): {item}")
return False return False
if not isinstance(item['time_range'], str) or not isinstance(item['activity'], str): if not isinstance(item["time_range"], str) or not isinstance(item["activity"], str):
logger.warning(f"日程项字段类型不正确: {item}") logger.warning(f"日程项字段类型不正确: {item}")
return False return False
return True
return True
class DailyScheduleGenerationTask(AsyncTask): class DailyScheduleGenerationTask(AsyncTask):
@@ -728,15 +467,17 @@ class DailyScheduleGenerationTask(AsyncTask):
midnight = datetime.combine(tomorrow, time.min) midnight = datetime.combine(tomorrow, time.min)
sleep_seconds = (midnight - now).total_seconds() sleep_seconds = (midnight - now).total_seconds()
logger.info(f"下一次日程生成任务将在 {sleep_seconds:.2f} 秒后运行 (北京时间 {midnight.strftime('%Y-%m-%d %H:%M:%S')})") logger.info(
f"下一次日程生成任务将在 {sleep_seconds:.2f} 秒后运行 (北京时间 {midnight.strftime('%Y-%m-%d %H:%M:%S')})"
)
# 2. 等待直到零点 # 2. 等待直到零点
await asyncio.sleep(sleep_seconds) await asyncio.sleep(sleep_seconds)
# 3. 执行异步日程生成 # 3. 执行异步日程生成
logger.info("到达每日零点,开始异步生成新的一天日程...") logger.info("到达每日零点,开始异步生成新的一天日程...")
await self.schedule_manager.generate_and_save_schedule() await self.schedule_manager.generate_and_save_schedule()
except asyncio.CancelledError: except asyncio.CancelledError:
logger.info("每日日程生成任务被取消。") logger.info("每日日程生成任务被取消。")
break break
@@ -746,4 +487,4 @@ class DailyScheduleGenerationTask(AsyncTask):
await asyncio.sleep(300) await asyncio.sleep(300)
schedule_manager = ScheduleManager() schedule_manager = ScheduleManager()

View File

@@ -0,0 +1,334 @@
import asyncio
import random
from datetime import datetime, timedelta, date, time
from enum import Enum, auto
from typing import Optional, TYPE_CHECKING
from src.common.logger import get_logger
from src.config.config import global_config
from src.manager.local_store_manager import local_storage
from src.plugin_system.apis import send_api, generator_api
if TYPE_CHECKING:
from src.chat.chat_loop.wakeup_manager import WakeUpManager
logger = get_logger("sleep_manager")
class SleepState(Enum):
"""睡眠状态枚举"""
AWAKE = auto() # 完全清醒
INSOMNIA = auto() # 失眠(在理论睡眠时间内保持清醒)
PREPARING_SLEEP = auto() # 准备入睡(缓冲期)
SLEEPING = auto() # 正在休眠
WOKEN_UP = auto() # 被吵醒
class SleepManager:
def __init__(self, schedule_manager):
self.schedule_manager = schedule_manager
self.last_sleep_log_time = 0
self.sleep_log_interval = 35 # 日志记录间隔,单位秒
# --- 统一睡眠状态管理 ---
self._current_state: SleepState = SleepState.AWAKE
self._sleep_buffer_end_time: Optional[datetime] = None
self._total_delayed_minutes_today: int = 0
self._last_sleep_check_date: Optional[date] = None
self._last_fully_slept_log_time: float = 0
self._re_sleep_attempt_time: Optional[datetime] = None # 新增:重新入睡的尝试时间
self._load_sleep_state()
def get_current_sleep_state(self) -> SleepState:
"""获取当前的睡眠状态"""
return self._current_state
def is_sleeping(self) -> bool:
"""检查当前是否处于正式休眠状态"""
return self._current_state == SleepState.SLEEPING
async def update_sleep_state(self, wakeup_manager: Optional["WakeUpManager"] = None):
"""
核心状态机:根据当前情况更新睡眠状态
"""
# --- 基础检查 ---
if not global_config.sleep_system.enable or not self.schedule_manager.today_schedule:
if self._current_state != SleepState.AWAKE:
logger.debug("睡眠系统禁用或无日程,强制设为 AWAKE")
self._current_state = SleepState.AWAKE
return
now = datetime.now()
today = now.date()
# --- 每日状态重置 ---
if self._last_sleep_check_date != today:
logger.info(f"新的一天 ({today}),重置睡眠状态为 AWAKE。")
self._total_delayed_minutes_today = 0
self._current_state = SleepState.AWAKE
self._sleep_buffer_end_time = None
self._last_sleep_check_date = today
self._save_sleep_state()
# --- 判断当前是否为理论上的睡眠时间 ---
is_in_theoretical_sleep, activity = self._is_in_theoretical_sleep_time(now.time())
# ===================================
# 状态机核心逻辑
# ===================================
# 状态:清醒 (AWAKE)
if self._current_state == SleepState.AWAKE:
if is_in_theoretical_sleep:
logger.info(f"进入理论休眠时间 '{activity}',开始进行睡眠决策...")
# --- 合并后的失眠与弹性睡眠决策逻辑 ---
sleep_pressure = wakeup_manager.context.sleep_pressure if wakeup_manager else 999
pressure_threshold = global_config.sleep_system.flexible_sleep_pressure_threshold
# 决策1因睡眠压力低而延迟入睡原弹性睡眠
if sleep_pressure < pressure_threshold and self._total_delayed_minutes_today < global_config.sleep_system.max_sleep_delay_minutes:
delay_minutes = 15
self._total_delayed_minutes_today += delay_minutes
self._sleep_buffer_end_time = now + timedelta(minutes=delay_minutes)
self._current_state = SleepState.INSOMNIA
logger.info(f"睡眠压力 ({sleep_pressure:.1f}) 低于阈值 ({pressure_threshold}),进入失眠状态,延迟入睡 {delay_minutes} 分钟。")
# 发送睡前通知
if global_config.sleep_system.enable_pre_sleep_notification:
asyncio.create_task(self._send_pre_sleep_notification())
# 决策2进入正常的入睡准备流程
else:
buffer_seconds = random.randint(5 * 60, 10 * 60)
self._sleep_buffer_end_time = now + timedelta(seconds=buffer_seconds)
self._current_state = SleepState.PREPARING_SLEEP
logger.info(f"睡眠压力正常或已达今日最大延迟,进入准备入睡状态,将在 {buffer_seconds / 60:.1f} 分钟内入睡。")
# 发送睡前通知
if global_config.sleep_system.enable_pre_sleep_notification:
asyncio.create_task(self._send_pre_sleep_notification())
self._save_sleep_state()
# 状态:失眠 (INSOMNIA)
elif self._current_state == SleepState.INSOMNIA:
if not is_in_theoretical_sleep:
logger.info("已离开理论休眠时间,失眠结束,恢复清醒。")
self._current_state = SleepState.AWAKE
self._save_sleep_state()
elif self._sleep_buffer_end_time and now >= self._sleep_buffer_end_time:
logger.info("失眠状态下的延迟时间已过,重新评估是否入睡...")
sleep_pressure = wakeup_manager.context.sleep_pressure if wakeup_manager else 999
pressure_threshold = global_config.sleep_system.flexible_sleep_pressure_threshold
if sleep_pressure >= pressure_threshold or self._total_delayed_minutes_today >= global_config.sleep_system.max_sleep_delay_minutes:
logger.info("睡眠压力足够或已达最大延迟,从失眠状态转换到准备入睡。")
buffer_seconds = random.randint(5 * 60, 10 * 60)
self._sleep_buffer_end_time = now + timedelta(seconds=buffer_seconds)
self._current_state = SleepState.PREPARING_SLEEP
else:
logger.info(f"睡眠压力({sleep_pressure:.1f})仍然较低再延迟15分钟。")
delay_minutes = 15
self._total_delayed_minutes_today += delay_minutes
self._sleep_buffer_end_time = now + timedelta(minutes=delay_minutes)
self._save_sleep_state()
# 状态:准备入睡 (PREPARING_SLEEP)
elif self._current_state == SleepState.PREPARING_SLEEP:
if not is_in_theoretical_sleep:
logger.info("准备入睡期间离开理论休眠时间,取消入睡,恢复清醒。")
self._current_state = SleepState.AWAKE
self._sleep_buffer_end_time = None
self._save_sleep_state()
elif self._sleep_buffer_end_time and now >= self._sleep_buffer_end_time:
logger.info("睡眠缓冲期结束,正式进入休眠状态。")
self._current_state = SleepState.SLEEPING
self._last_fully_slept_log_time = now.timestamp()
self._save_sleep_state()
# 状态:休眠中 (SLEEPING)
elif self._current_state == SleepState.SLEEPING:
if not is_in_theoretical_sleep:
logger.info("理论休眠时间结束,自然醒来。")
self._current_state = SleepState.AWAKE
self._save_sleep_state()
else:
# 记录日志
current_timestamp = now.timestamp()
if current_timestamp - self.last_sleep_log_time > self.sleep_log_interval:
logger.info(f"当前处于休眠活动 '{activity}' 中。")
self.last_sleep_log_time = current_timestamp
# 状态:被吵醒 (WOKEN_UP)
elif self._current_state == SleepState.WOKEN_UP:
if not is_in_theoretical_sleep:
logger.info("理论休眠时间结束,被吵醒的状态自动结束。")
self._current_state = SleepState.AWAKE
self._re_sleep_attempt_time = None
self._save_sleep_state()
elif self._re_sleep_attempt_time and now >= self._re_sleep_attempt_time:
logger.info("被吵醒后经过一段时间,尝试重新入睡...")
sleep_pressure = wakeup_manager.context.sleep_pressure if wakeup_manager else 999
pressure_threshold = global_config.sleep_system.flexible_sleep_pressure_threshold
if sleep_pressure >= pressure_threshold:
logger.info("睡眠压力足够,从被吵醒状态转换到准备入睡。")
buffer_seconds = random.randint(3 * 60, 8 * 60) # 重新入睡的缓冲期可以短一些
self._sleep_buffer_end_time = now + timedelta(seconds=buffer_seconds)
self._current_state = SleepState.PREPARING_SLEEP
self._re_sleep_attempt_time = None
else:
delay_minutes = 15
self._re_sleep_attempt_time = now + timedelta(minutes=delay_minutes)
logger.info(f"睡眠压力({sleep_pressure:.1f})仍然较低,暂时保持清醒,在 {delay_minutes} 分钟后再次尝试。")
self._save_sleep_state()
def reset_sleep_state_after_wakeup(self):
"""被唤醒后,将状态切换到 WOKEN_UP"""
if self._current_state in [SleepState.PREPARING_SLEEP, SleepState.SLEEPING, SleepState.INSOMNIA]:
logger.info("被唤醒,进入 WOKEN_UP 状态!")
self._current_state = SleepState.WOKEN_UP
self._sleep_buffer_end_time = None
# 设置一个延迟,之后再尝试重新入睡
re_sleep_delay_minutes = getattr(global_config.sleep_system, 're_sleep_delay_minutes', 10)
self._re_sleep_attempt_time = datetime.now() + timedelta(minutes=re_sleep_delay_minutes)
logger.info(f"将在 {re_sleep_delay_minutes} 分钟后尝试重新入睡。")
self._save_sleep_state()
def _is_in_theoretical_sleep_time(self, now_time: time) -> tuple[bool, Optional[str]]:
"""检查当前时间是否落在日程表的任何一个睡眠活动中"""
sleep_keywords = ["休眠", "睡觉", "梦乡"]
if self.schedule_manager.today_schedule:
for event in self.schedule_manager.today_schedule:
try:
activity = event.get("activity", "").strip()
time_range = event.get("time_range")
if not activity or not time_range:
continue
if any(keyword in activity for keyword in sleep_keywords):
start_str, end_str = time_range.split('-')
start_time = datetime.strptime(start_str.strip(), "%H:%M").time()
end_time = datetime.strptime(end_str.strip(), "%H:%M").time()
if start_time <= end_time: # 同一天
if start_time <= now_time < end_time:
return True, activity
else: # 跨天
if now_time >= start_time or now_time < end_time:
return True, activity
except (ValueError, KeyError, AttributeError) as e:
logger.warning(f"解析日程事件时出错: {event}, 错误: {e}")
continue
return False, None
async def _send_pre_sleep_notification(self):
"""异步生成并发送睡前通知"""
try:
groups = global_config.sleep_system.pre_sleep_notification_groups
prompt = global_config.sleep_system.pre_sleep_prompt
if not groups:
logger.info("未配置睡前通知的群组,跳过发送。")
return
if not prompt:
logger.warning("睡前通知的prompt为空跳过发送。")
return
# 为防止消息风暴,稍微延迟一下
await asyncio.sleep(random.uniform(5, 15))
for group_id_str in groups:
try:
# 格式 "platform:group_id"
parts = group_id_str.split(":")
if len(parts) != 2:
logger.warning(f"无效的群组ID格式: {group_id_str}")
continue
platform, group_id = parts
# 使用与 ChatStream.get_stream_id 相同的逻辑生成 stream_id
import hashlib
key = "_".join([platform, group_id])
stream_id = hashlib.md5(key.encode()).hexdigest()
logger.info(f"正在为群组 {group_id_str} (Stream ID: {stream_id}) 生成睡前消息...")
# 调用 generator_api 生成回复
success, reply_set, _ = await generator_api.generate_reply(
chat_id=stream_id,
extra_info=prompt,
request_type="schedule.pre_sleep_notification"
)
if success and reply_set:
# 提取文本内容并发送
reply_text = "".join([content for msg_type, content in reply_set if msg_type == "text"])
if reply_text:
logger.info(f"向群组 {group_id_str} 发送睡前消息: {reply_text}")
await send_api.text_to_stream(text=reply_text, stream_id=stream_id)
else:
logger.warning(f"为群组 {group_id_str} 生成的回复内容为空。")
else:
logger.error(f"为群组 {group_id_str} 生成睡前消息失败。")
await asyncio.sleep(random.uniform(2, 5)) # 避免发送过快
except Exception as e:
logger.error(f"向群组 {group_id_str} 发送睡前消息失败: {e}")
except Exception as e:
logger.error(f"发送睡前通知任务失败: {e}")
def _save_sleep_state(self):
"""将当前睡眠状态保存到本地存储"""
try:
state = {
"current_state": self._current_state.name,
"sleep_buffer_end_time_ts": self._sleep_buffer_end_time.timestamp() if self._sleep_buffer_end_time else None,
"total_delayed_minutes_today": self._total_delayed_minutes_today,
"last_sleep_check_date_str": self._last_sleep_check_date.isoformat() if self._last_sleep_check_date else None,
"re_sleep_attempt_time_ts": self._re_sleep_attempt_time.timestamp() if self._re_sleep_attempt_time else None,
}
local_storage["schedule_sleep_state"] = state
logger.debug(f"已保存睡眠状态: {state}")
except Exception as e:
logger.error(f"保存睡眠状态失败: {e}")
def _load_sleep_state(self):
"""从本地存储加载睡眠状态"""
try:
state = local_storage["schedule_sleep_state"]
if state and isinstance(state, dict):
state_name = state.get("current_state")
if state_name and hasattr(SleepState, state_name):
self._current_state = SleepState[state_name]
end_time_ts = state.get("sleep_buffer_end_time_ts")
if end_time_ts:
self._sleep_buffer_end_time = datetime.fromtimestamp(end_time_ts)
re_sleep_ts = state.get("re_sleep_attempt_time_ts")
if re_sleep_ts:
self._re_sleep_attempt_time = datetime.fromtimestamp(re_sleep_ts)
self._total_delayed_minutes_today = state.get("total_delayed_minutes_today", 0)
date_str = state.get("last_sleep_check_date_str")
if date_str:
self._last_sleep_check_date = datetime.fromisoformat(date_str).date()
logger.info(f"成功从本地存储加载睡眠状态: {state}")
except Exception as e:
logger.warning(f"加载睡眠状态失败,将使用默认值: {e}")