This commit is contained in:
Windpicker-owo
2025-10-24 23:22:18 +08:00
16 changed files with 197 additions and 634 deletions

View File

@@ -17,6 +17,7 @@ from src.plugin_system import (
ToolParamType,
register_plugin,
)
from src.plugin_system.base.component_types import InjectionRule,InjectionType
from src.plugin_system.base.base_event import HandlerResult
@@ -185,7 +186,7 @@ class WeatherPrompt(BasePrompt):
prompt_name = "weather_info_prompt"
prompt_description = "向Planner注入当前天气信息以丰富对话上下文。"
injection_point = "planner_prompt"
injection_rules = [InjectionRule(target_prompt="planner_prompt", injection_type=InjectionType.REPLACE, target_content="## 可用动作列表")]
async def execute(self) -> str:
# 在实际应用中这里可以调用天气API

View File

@@ -20,7 +20,6 @@ from src.plugin_system.apis.chat_api import get_chat_manager
from .distribution_manager import stream_loop_manager
from .global_notice_manager import NoticeScope, global_notice_manager
from .sleep_system.state_manager import SleepState, sleep_state_manager
if TYPE_CHECKING:
pass
@@ -148,13 +147,6 @@ class MessageManager:
async def add_message(self, stream_id: str, message: DatabaseMessages):
"""添加消息到指定聊天流"""
# 在消息处理的最前端检查睡眠状态
current_sleep_state = sleep_state_manager.get_current_state()
if current_sleep_state == SleepState.SLEEPING:
logger.info(f"处于 {current_sleep_state.name} 状态,消息被拦截。")
return # 直接返回,不处理消息
# TODO: 在这里为 WOKEN_UP_ANGRY 等未来状态添加特殊处理逻辑
try:
# 检查是否为notice消息

View File

@@ -1,195 +0,0 @@
import random
from datetime import datetime, timedelta
from src.common.logger import get_logger
from src.config.config import global_config
from src.schedule.schedule_manager import schedule_manager
from .state_manager import SleepState, sleep_state_manager
logger = get_logger("sleep_logic")
class SleepLogic:
"""
核心睡眠逻辑,睡眠系统的“大脑”
负责根据当前的配置、时间、日程表以及状态,判断是否需要切换睡眠状态。
它本身是无状态的,所有的状态都读取和写入 SleepStateManager。
"""
def check_and_update_sleep_state(self):
"""
检查并更新当前的睡眠状态,这是整个逻辑的入口。
由定时任务周期性调用。
"""
current_state = sleep_state_manager.get_current_state()
now = datetime.now()
if current_state == SleepState.AWAKE:
self._check_should_fall_asleep(now)
elif current_state == SleepState.SLEEPING:
self._check_should_wake_up(now)
elif current_state == SleepState.INSOMNIA:
# TODO: 实现失眠逻辑
# 例如:检查失眠状态是否结束,如果结束则转换回 SLEEPING
pass
elif current_state == SleepState.WOKEN_UP_ANGRY:
# TODO: 实现起床气逻辑
# 例如:检查生气状态是否结束,如果结束则转换回 SLEEPING 或 AWAKE
pass
def _check_should_fall_asleep(self, now: datetime):
"""
当状态为 AWAKE 时,检查是否应该进入睡眠。
"""
should_sleep, wake_up_time = self._should_be_sleeping(now)
if should_sleep:
logger.info("判断结果:应进入睡眠状态。")
sleep_state_manager.set_state(SleepState.SLEEPING, wake_up=wake_up_time)
def _check_should_wake_up(self, now: datetime):
"""
当状态为 SLEEPING 时,检查是否应该醒来。
这里包含了处理跨天获取日程的核心逻辑。
"""
wake_up_time = sleep_state_manager.get_wake_up_time()
# 核心逻辑:两段式检测
# 如果 state_manager 中还没有起床时间,说明是昨晚入睡,需要等待今天凌晨的新日程。
sleep_start_time = sleep_state_manager.get_sleep_start_time()
if not wake_up_time:
if sleep_start_time and now.date() > sleep_start_time.date():
logger.debug("当前为睡眠状态但无起床时间,尝试从新日程中解析...")
_, new_wake_up_time = self._get_wakeup_times_from_schedule(now)
if new_wake_up_time:
logger.info(f"成功从新日程获取到起床时间: {new_wake_up_time.strftime('%H:%M')}")
sleep_state_manager.set_wake_up_time(new_wake_up_time)
wake_up_time = new_wake_up_time
else:
logger.debug("未能获取到新的起床时间,继续睡眠。")
return
else:
logger.info("还没有到达第二天,继续睡眠。")
logger.info(f"尚未到苏醒时间,苏醒时间在{wake_up_time}")
if wake_up_time and now >= wake_up_time:
logger.info(f"当前时间 {now.strftime('%H:%M')} 已到达或超过预定起床时间 {wake_up_time.strftime('%H:%M')}")
sleep_state_manager.set_state(SleepState.AWAKE)
def _should_be_sleeping(self, now: datetime) -> tuple[bool, datetime | None]:
"""
判断在当前时刻,是否应该处于睡眠时间。
Returns:
元组 (是否应该睡眠, 预期的起床时间或None)
"""
sleep_config = global_config.sleep_system
if not sleep_config.enable:
return False, None
sleep_time, wake_up_time = None, None
if sleep_config.sleep_by_schedule:
sleep_time, _ = self._get_sleep_times_from_schedule(now)
if not sleep_time:
logger.debug("日程表模式开启,但未找到睡眠时间,使用固定时间作为备用。")
sleep_time, wake_up_time = self._get_fixed_sleep_times(now)
else:
sleep_time, wake_up_time = self._get_fixed_sleep_times(now)
if not sleep_time:
return False, None
# 检查当前时间是否在睡眠时间范围内
if now >= sleep_time:
# 如果起床时间是第二天(通常情况),且当前时间小于起床时间,则在睡眠范围内
if wake_up_time and wake_up_time > sleep_time and now < wake_up_time:
return True, wake_up_time
# 如果当前时间大于入睡时间,说明已经进入睡眠窗口
return True, wake_up_time
return False, None
def _get_fixed_sleep_times(self, now: datetime) -> tuple[datetime | None, datetime | None]:
"""
当使用“固定时间”模式时,从此方法计算睡眠和起床时间。
会加入配置中的随机偏移量,让作息更自然。
"""
sleep_config = global_config.sleep_system
try:
sleep_offset = random.randint(
-sleep_config.sleep_time_offset_minutes, sleep_config.sleep_time_offset_minutes
)
wake_up_offset = random.randint(
-sleep_config.wake_up_time_offset_minutes, sleep_config.wake_up_time_offset_minutes
)
sleep_t = datetime.strptime(sleep_config.fixed_sleep_time, "%H:%M").time()
wake_up_t = datetime.strptime(sleep_config.fixed_wake_up_time, "%H:%M").time()
sleep_time = datetime.combine(now.date(), sleep_t) + timedelta(minutes=sleep_offset)
# 如果起床时间比睡觉时间早,说明是第二天
wake_up_day = now.date() + timedelta(days=1) if wake_up_t < sleep_t else now.date()
wake_up_time = datetime.combine(wake_up_day, wake_up_t) + timedelta(minutes=wake_up_offset)
return sleep_time, wake_up_time
except (ValueError, TypeError) as e:
logger.error(f"解析固定睡眠时间失败: {e}")
return None, None
def _get_sleep_times_from_schedule(self, now: datetime) -> tuple[datetime | None, datetime | None]:
"""
当使用“日程表”模式时,从此方法获取睡眠时间。
实现了核心逻辑:
- 解析“今天”日程中的睡觉时间。
"""
# 阶段一:获取当天的睡觉时间
today_schedule = schedule_manager.today_schedule
sleep_time = None
if today_schedule:
for event in today_schedule:
activity = event.get("activity", "").lower()
if "sleep" in activity or "睡觉" in activity or "休息" in activity:
try:
time_range = event.get("time_range", "")
start_str, _ = time_range.split("-")
sleep_t = datetime.strptime(start_str.strip(), "%H:%M").time()
sleep_time = datetime.combine(now.date(), sleep_t)
break
except (ValueError, AttributeError):
logger.warning(f"解析日程中的睡眠时间失败: {event}")
continue
wake_up_time = None
return sleep_time, wake_up_time
def _get_wakeup_times_from_schedule(self, now: datetime) -> tuple[datetime | None, datetime | None]:
"""
当使用“日程表”模式时,从此方法获取睡眠时间。
实现了核心逻辑:
- 解析“今天”日程中的睡觉时间。
"""
# 阶段一:获取当天的睡觉时间
today_schedule = schedule_manager.today_schedule
wake_up_time = None
if today_schedule:
for event in today_schedule:
activity = event.get("activity", "").lower()
if "wake_up" in activity or "醒来" in activity or "起床" in activity:
try:
time_range = event.get("time_range", "")
start_str, _ = time_range.split("-")
sleep_t = datetime.strptime(start_str.strip(), "%H:%M").time()
wake_up_time = datetime.combine(now.date(), sleep_t)
break
except (ValueError, AttributeError):
logger.warning(f"解析日程中的睡眠时间失败: {event}")
continue
return None, wake_up_time
# 全局单例
sleep_logic = SleepLogic()

View File

@@ -1,190 +0,0 @@
import enum
from datetime import datetime, timedelta
from typing import Any
from src.common.logger import get_logger
from src.manager.local_store_manager import local_storage
logger = get_logger("sleep_state_manager")
class SleepState(enum.Enum):
"""
定义了所有可能的睡眠状态。
使用枚举可以使状态管理更加清晰和安全。
"""
AWAKE = "awake" # 清醒状态,正常活动
SLEEPING = "sleeping" # 沉睡状态,此时应拦截消息
INSOMNIA = "insomnia" # 失眠状态(为未来功能预留)
WOKEN_UP_ANGRY = "woken_up_angry" # 被吵醒后的生气状态(为未来功能预留)
class SleepStateManager:
"""
睡眠状态管理器 (单例模式)
这是整个睡眠系统的数据核心,负责:
1. 管理当前的睡眠状态(如:是否在睡觉、唤醒度等)。
2. 将状态持久化到本地JSON文件(`local_store.json`),实现重启后状态不丢失。
3. 提供统一的接口供其他模块查询和修改睡眠状态。
"""
_instance = None
_STATE_KEY = "sleep_system_state" # 在 local_store.json 中存储的键名
def __new__(cls, *args, **kwargs):
# 实现单例模式,确保全局只有一个状态管理器实例
if not cls._instance:
cls._instance = super(SleepStateManager, cls).__new__(cls, *args, **kwargs)
return cls._instance
def __init__(self):
"""
初始化状态管理器,定义状态数据结构并从本地加载历史状态。
"""
self.state: dict[str, Any] = {}
self._default_state()
self.load_state()
def _default_state(self):
"""
定义并重置为默认的“清醒”状态。
当机器人启动或从睡眠中醒来时调用。
"""
self.state = {
"state": SleepState.AWAKE.value,
"state_until": None, # 特殊状态(如生气)的自动结束时间
"sleep_start_time": None, # 本次睡眠的开始时间
"wake_up_time": None, # 预定的起床时间
"wakefulness": 0.0, # 唤醒度/清醒值,用于判断是否被吵醒
"last_checked": None, # 定时任务最后检查的时间
}
def load_state(self):
"""
程序启动时,从 local_storage 加载上一次的状态。
如果找不到历史状态,则初始化为默认状态。
"""
stored_state = local_storage[self._STATE_KEY]
if isinstance(stored_state, dict):
# 合并加载的状态,以防新增字段
self.state.update(stored_state)
# 确保 state 字段是枚举成员
if "state" in self.state and not isinstance(self.state["state"], SleepState):
try:
self.state["state"] = SleepState(self.state["state"])
except ValueError:
logger.warning(f"加载了无效的睡眠状态 '{self.state['state']}',重置为 AWAKE。")
self.state["state"] = SleepState.AWAKE
else:
self.state["state"] = SleepState.AWAKE # 兼容旧数据
logger.info(f"成功加载睡眠状态: {self.get_current_state().name}")
else:
logger.info("未找到已存储的睡眠状态,将使用默认值。")
self.save_state()
def save_state(self):
"""
将当前内存中的状态保存到 local_storage。
在保存前,会将枚举类型的 state 转换为字符串以便JSON序列化。
"""
data_to_save = self.state.copy()
# 将 state 枚举成员转换为它的值(字符串)
data_to_save["state"] = self.state["state"]
local_storage[self._STATE_KEY] = data_to_save
logger.debug(f"睡眠状态已保存: {data_to_save}")
def get_current_state(self) -> SleepState:
"""
获取当前的睡眠状态。
在返回状态前,会先检查特殊状态(如生气)是否已过期。
"""
# 检查特殊状态是否已过期
state_until_str = self.state.get("state_until")
if state_until_str:
state_until = datetime.fromisoformat(state_until_str)
if datetime.now() > state_until:
logger.info(f"特殊状态 {self.state['state'].name} 已结束,自动恢复为 SLEEPING。")
# 假设特殊状态(如生气)结束后,是恢复到普通睡眠状态
self.set_state(SleepState.SLEEPING)
return self.state["state"]
def set_state(
self,
new_state: SleepState,
duration_seconds: float | None = None,
sleep_start: datetime | None = None,
wake_up: datetime | None = None,
):
"""
核心函数:切换到新的睡眠状态,并更新相关的状态数据。
"""
current_state = self.get_current_state()
if current_state == new_state:
return # 状态未改变
logger.info(f"睡眠状态变更: {current_state.name} -> {new_state.name}")
self.state["state"] = new_state
if new_state == SleepState.AWAKE:
self._default_state() # 醒来时重置所有状态
self.state["state"] = SleepState.AWAKE # 确保状态正确
elif new_state == SleepState.SLEEPING:
self.state["sleep_start_time"] = (sleep_start or datetime.now()).isoformat()
self.state["wake_up_time"] = wake_up.isoformat() if wake_up else None
self.state["state_until"] = None # 清除特殊状态持续时间
self.state["wakefulness"] = 0.0 # 进入睡眠时清零唤醒度
elif new_state in [SleepState.WOKEN_UP_ANGRY, SleepState.INSOMNIA]:
if duration_seconds:
self.state["state_until"] = (datetime.now() + timedelta(seconds=duration_seconds)).isoformat()
else:
self.state["state_until"] = None
self.save_state()
def update_last_checked(self):
"""更新最后检查时间"""
self.state["last_checked"] = datetime.now().isoformat()
self.save_state()
def get_wake_up_time(self) -> datetime | None:
"""获取预定的起床时间,如果已设置的话。"""
wake_up_str = self.state.get("wake_up_time")
if wake_up_str:
try:
return datetime.fromisoformat(wake_up_str)
except (ValueError, TypeError):
return None
return None
def get_sleep_start_time(self) -> datetime | None:
"""获取本次睡眠的开始时间,如果已设置的话。"""
sleep_start_str = self.state.get("sleep_start_time")
if sleep_start_str:
try:
return datetime.fromisoformat(sleep_start_str)
except (ValueError, TypeError):
return None
return None
def set_wake_up_time(self, wake_up: datetime):
"""
更新起床时间。
主要用于“日程表”模式下,当第二天凌晨拿到新日程时,更新之前未知的起床时间。
"""
if self.get_current_state() == SleepState.AWAKE:
logger.warning("尝试为清醒状态设置起床时间,操作被忽略。")
return
self.state["wake_up_time"] = wake_up.isoformat()
logger.info(f"更新预定起床时间为: {self.state['wake_up_time']}")
self.save_state()
# 全局单例
sleep_state_manager = SleepStateManager()

View File

@@ -1,44 +0,0 @@
from src.common.logger import get_logger
from src.manager.async_task_manager import AsyncTask, async_task_manager
from .sleep_logic import sleep_logic
logger = get_logger("sleep_tasks")
class SleepSystemCheckTask(AsyncTask):
"""
睡眠系统周期性检查任务。
继承自 AsyncTask由 async_task_manager 统一管理。
"""
def __init__(self, run_interval: int = 60):
"""
初始化任务。
Args:
run_interval (int): 任务运行的时间间隔。默认为60秒检查一次。
"""
super().__init__(task_name="SleepSystemCheckTask", run_interval=run_interval)
async def run(self):
"""
任务的核心执行过程。
每次运行时,调用 sleep_logic 的主函数来检查和更新状态。
"""
logger.debug("睡眠系统定时任务触发,开始检查状态...")
try:
# 调用“大脑”进行一次思考和判断
sleep_logic.check_and_update_sleep_state()
except Exception as e:
logger.error(f"周期性检查睡眠状态时发生未知错误: {e}", exc_info=True)
async def start_sleep_system_tasks():
"""
启动睡眠系统的后台定时检查任务。
这个函数应该在程序启动时(例如 main.py被调用。
"""
logger.info("正在启动睡眠系统后台任务...")
check_task = SleepSystemCheckTask()
await async_task_manager.add_task(check_task)
logger.info("睡眠系统后台任务已成功启动。")

View File

@@ -130,15 +130,19 @@ class PromptManager:
# 确保我们有有效的parameters实例
params_for_injection = parameters or original_prompt.parameters
components_prefix = await prompt_component_manager.execute_components_for(
injection_point=original_prompt.name, params=params_for_injection
# 应用所有匹配的注入规则,获取修改后的模板
modified_template = await prompt_component_manager.apply_injections(
target_prompt_name=original_prompt.name,
original_template=original_prompt.template,
params=params_for_injection,
)
if components_prefix:
logger.info(f"'{name}'注入插件内容: \n{components_prefix}")
# 如果模板被修改了就创建一个新的临时Prompt实例
if modified_template != original_prompt.template:
logger.info(f"'{name}'应用了Prompt注入规则")
# 创建一个新的临时Prompt实例不进行注册
new_template = f"{components_prefix}\n\n{original_prompt.template}"
temp_prompt = Prompt(
template=new_template,
template=modified_template,
name=original_prompt.name,
parameters=original_prompt.parameters,
should_register=False, # 确保不重新注册
@@ -1079,12 +1083,12 @@ async def create_prompt_async(
# 动态注入插件内容
if name:
components_prefix = await prompt_component_manager.execute_components_for(
injection_point=name, params=final_params
modified_template = await prompt_component_manager.apply_injections(
target_prompt_name=name, original_template=template, params=final_params
)
if components_prefix:
logger.debug(f"'{name}'注入插件内容: \n{components_prefix}")
template = f"{components_prefix}\n\n{template}"
if modified_template != template:
logger.debug(f"'{name}'应用了Prompt注入规则")
template = modified_template
# 使用可能已修改的模板创建实例
prompt = create_prompt(template, name, final_params)

View File

@@ -1,9 +1,11 @@
import asyncio
import re
from typing import Type
from src.chat.utils.prompt_params import PromptParameters
from src.common.logger import get_logger
from src.plugin_system.base.base_prompt import BasePrompt
from src.plugin_system.base.component_types import ComponentType, PromptInfo
from src.plugin_system.base.component_types import ComponentType, InjectionRule, InjectionType, PromptInfo
from src.plugin_system.core.component_registry import component_registry
logger = get_logger("prompt_component_manager")
@@ -19,89 +21,143 @@ class PromptComponentManager:
3. 提供一个接口以便在构建核心Prompt时能够获取并执行所有相关的组件。
"""
def get_components_for(self, injection_point: str) -> list[type[BasePrompt]]:
def _get_rules_for(self, target_prompt_name: str) -> list[tuple[InjectionRule, Type[BasePrompt]]]:
"""
获取指定注入点的所有已注册组件类。
获取指定目标Prompt的所有注入规则及其关联的组件类。
Args:
injection_point: 目标Prompt的名称。
target_prompt_name (str): 目标 Prompt 的名称。
Returns:
list[Type[BasePrompt]]: 与该注入点关联的组件类列表
list[tuple[InjectionRule, Type[BasePrompt]]]: 一个元组列表
每个元组包含一个注入规则和其对应的 Prompt 组件类,并已根据优先级排序。
"""
# 从组件注册中心获取所有启用的Prompt组件
# 从注册表中获取所有启用的 PROMPT 类型的组件
enabled_prompts = component_registry.get_enabled_components_by_type(ComponentType.PROMPT)
matching_rules = []
matching_components: list[type[BasePrompt]] = []
# 遍历所有启用的 Prompt 组件,查找与目标 Prompt 相关的注入规则
for prompt_name, prompt_info in enabled_prompts.items():
# 确保 prompt_info 是 PromptInfo 类型
if not isinstance(prompt_info, PromptInfo):
continue
# 获取注入点信息
injection_points = prompt_info.injection_point
if isinstance(injection_points, str):
injection_points = [injection_points]
# prompt_info.injection_rules 已经经过了后向兼容处理,确保总是列表
for rule in prompt_info.injection_rules:
# 如果规则的目标是当前指定的 Prompt
if rule.target_prompt == target_prompt_name:
# 获取该规则对应的组件类
component_class = component_registry.get_component_class(prompt_name, ComponentType.PROMPT)
# 确保获取到的确实是一个 BasePrompt 的子类
if component_class and issubclass(component_class, BasePrompt):
matching_rules.append((rule, component_class))
# 检查当前注入点是否匹配
if injection_point in injection_points:
# 获取组件类
component_class = component_registry.get_component_class(prompt_name, ComponentType.PROMPT)
if component_class and issubclass(component_class, BasePrompt):
matching_components.append(component_class)
# 根据规则的优先级进行排序,数字越小,优先级越高,越先应用
matching_rules.sort(key=lambda x: x[0].priority)
return matching_rules
return matching_components
async def execute_components_for(self, injection_point: str, params: PromptParameters) -> str:
async def apply_injections(
self, target_prompt_name: str, original_template: str, params: PromptParameters
) -> str:
"""
实例化并执行指定注入点的所有组件,然后将它们的输出拼接成一个字符串
获取、实例化并执行所有相关组件,然后根据注入规则修改原始模板
这是一个三步走的过程:
1. 实例化所有需要执行的组件。
2. 并行执行它们的 `execute` 方法以获取注入内容。
3. 按照优先级顺序,将内容注入到原始模板中。
Args:
injection_point: 目标Prompt的名称。
params: 用于初始化组件的 PromptParameters 对象
target_prompt_name (str): 目标 Prompt 的名称。
original_template (str): 原始的、未经修改的 Prompt 模板字符串
params (PromptParameters): 传递给 Prompt 组件实例的参数。
Returns:
str: 所有相关组件生成的、用换行符连接的文本内容
str: 应用了所有注入规则后,修改过的 Prompt 模板字符串
"""
component_classes = self.get_components_for(injection_point)
if not component_classes:
return ""
rules_with_classes = self._get_rules_for(target_prompt_name)
# 如果没有找到任何匹配的规则,就直接返回原始模板,啥也不干
if not rules_with_classes:
return original_template
tasks = []
for component_class in component_classes:
try:
# 从注册中心获取组件信息
prompt_info = component_registry.get_component_info(
component_class.prompt_name, ComponentType.PROMPT
)
if not isinstance(prompt_info, PromptInfo):
logger.warning(f"找不到 Prompt 组件 '{component_class.prompt_name}' 的信息,无法获取插件配置")
plugin_config = {}
else:
plugin_config = component_registry.get_plugin_config(prompt_info.plugin_name)
# --- 第一步: 实例化所有需要执行的组件 ---
instance_map = {} # 存储组件实例,虽然目前没直接用,但留着总没错
tasks = [] # 存放所有需要并行执行的 execute 异步任务
components_to_execute = [] # 存放需要执行的组件类,用于后续结果映射
instance = component_class(params=params, plugin_config=plugin_config)
tasks.append(instance.execute())
except Exception as e:
logger.error(f"实例化 Prompt 组件 '{component_class.prompt_name}' 失败: {e}")
for rule, component_class in rules_with_classes:
# 如果注入类型是 REMOVE那就不需要执行组件了因为它不产生内容
if rule.injection_type != InjectionType.REMOVE:
try:
# 获取组件的元信息,主要是为了拿到插件名称来读取插件配置
prompt_info = component_registry.get_component_info(
component_class.prompt_name, ComponentType.PROMPT
)
if not isinstance(prompt_info, PromptInfo):
plugin_config = {}
else:
# 从注册表获取该组件所属插件的配置
plugin_config = component_registry.get_plugin_config(prompt_info.plugin_name)
if not tasks:
return ""
# 实例化组件,并传入参数和插件配置
instance = component_class(params=params, plugin_config=plugin_config)
instance_map[component_class.prompt_name] = instance
# 将组件的 execute 方法作为一个任务添加到列表中
tasks.append(instance.execute())
components_to_execute.append(component_class)
except Exception as e:
logger.error(f"实例化 Prompt 组件 '{component_class.prompt_name}' 失败: {e}")
# 即使失败,也添加一个立即完成的空任务,以保持与其他任务的索引同步
tasks.append(asyncio.create_task(asyncio.sleep(0, result=e))) # type: ignore
# 并行执行所有组件
# --- 第二步: 并行执行所有组件的 execute 方法 ---
# 使用 asyncio.gather 来同时运行所有任务,提高效率
results = await asyncio.gather(*tasks, return_exceptions=True)
# 创建一个从组件名到执行结果的映射,方便后续查找
result_map = {
components_to_execute[i].prompt_name: res
for i, res in enumerate(results)
if not isinstance(res, Exception) # 只包含成功的结果
}
# 单独处理并记录执行失败的组件
for i, res in enumerate(results):
if isinstance(res, Exception):
logger.error(f"执行 Prompt 组件 '{components_to_execute[i].prompt_name}' 失败: {res}")
# 过滤掉执行失败的结果和空字符串
valid_results = []
for i, result in enumerate(results):
if isinstance(result, Exception):
logger.error(f"执行 Prompt 组件 '{component_classes[i].prompt_name}' 失败: {result}")
elif result and isinstance(result, str) and result.strip():
valid_results.append(result.strip())
# --- 第三步: 按优先级顺序应用注入规则 ---
modified_template = original_template
for rule, component_class in rules_with_classes:
# 从结果映射中获取该组件生成的内容
content = result_map.get(component_class.prompt_name)
# 使用换行符拼接所有有效结果
return "\n".join(valid_results)
try:
if rule.injection_type == InjectionType.PREPEND:
if content:
modified_template = f"{content}\n{modified_template}"
elif rule.injection_type == InjectionType.APPEND:
if content:
modified_template = f"{modified_template}\n{content}"
elif rule.injection_type == InjectionType.REPLACE:
# 使用正则表达式替换目标内容
if content and rule.target_content:
modified_template = re.sub(rule.target_content, str(content), modified_template)
elif rule.injection_type == InjectionType.INSERT_AFTER:
# 在匹配到的内容后面插入
if content and rule.target_content:
# re.sub a little trick: \g<0> represents the entire matched string
replacement = f"\\g<0>\n{content}"
modified_template = re.sub(rule.target_content, replacement, modified_template)
elif rule.injection_type == InjectionType.REMOVE:
# 使用正则表达式移除目标内容
if rule.target_content:
modified_template = re.sub(rule.target_content, "", modified_template)
except re.error as e:
logger.error(
f"在为 '{component_class.prompt_name}' 应用规则时发生正则错误: {e} (pattern: '{rule.target_content}')"
)
except Exception as e:
logger.error(f"应用 Prompt 注入规则 '{rule}' 失败: {e}")
return modified_template
# 创建全局单例

View File

@@ -40,7 +40,6 @@ from src.config.official_configs import (
ProactiveThinkingConfig,
ResponsePostProcessConfig,
ResponseSplitterConfig,
SleepSystemConfig,
ToolConfig,
VideoAnalysisConfig,
VoiceConfig,
@@ -410,7 +409,6 @@ class Config(ValidatedConfigBase):
default_factory=lambda: DependencyManagementConfig(), description="依赖管理配置"
)
web_search: WebSearchConfig = Field(default_factory=lambda: WebSearchConfig(), description="网络搜索配置")
sleep_system: SleepSystemConfig = Field(default_factory=lambda: SleepSystemConfig(), description="睡眠系统配置")
planning_system: PlanningSystemConfig = Field(
default_factory=lambda: PlanningSystemConfig(), description="规划系统配置"
)

View File

@@ -593,52 +593,6 @@ class AntiPromptInjectionConfig(ValidatedConfigBase):
shield_suffix: str = Field(default=" 🛡️", description="保护后缀")
class SleepSystemConfig(ValidatedConfigBase):
"""睡眠系统配置类"""
enable: bool = Field(default=True, description="是否启用睡眠系统")
sleep_by_schedule: bool = Field(default=True, description="是否根据日程表进行睡觉")
fixed_sleep_time: str = Field(default="23:00", description="固定的睡觉时间")
fixed_wake_up_time: str = Field(default="07:00", description="固定的起床时间")
sleep_time_offset_minutes: int = Field(
default=15, ge=0, le=60, description="睡觉时间随机偏移量范围(分钟),实际睡觉时间会在±该值范围内随机"
)
wake_up_time_offset_minutes: int = Field(
default=15, ge=0, le=60, description="起床时间随机偏移量范围(分钟),实际起床时间会在±该值范围内随机"
)
wakeup_threshold: float = Field(default=15.0, ge=1.0, description="唤醒阈值,达到此值时会被唤醒")
private_message_increment: float = Field(default=3.0, ge=0.1, description="私聊消息增加的唤醒度")
group_mention_increment: float = Field(default=2.0, ge=0.1, description="群聊艾特增加的唤醒度")
decay_rate: float = Field(default=0.2, ge=0.0, description="每次衰减的唤醒度数值")
decay_interval: float = Field(default=30.0, ge=1.0, description="唤醒度衰减间隔(秒)")
angry_duration: float = Field(default=300.0, ge=10.0, description="愤怒状态持续时间(秒)")
angry_prompt: str = Field(default="你被人吵醒了非常生气,说话带着怒气", description="被吵醒后的愤怒提示词")
re_sleep_delay_minutes: int = Field(
default=5, ge=1, description="被唤醒后,如果多久没有新消息则尝试重新入睡(分钟)"
)
# --- 失眠机制相关参数 ---
enable_insomnia_system: bool = Field(default=True, description="是否启用失眠系统")
insomnia_trigger_delay_minutes: list[int] = Field(
default_factory=lambda: [30, 60], description="入睡后触发失眠判定的延迟时间范围(分钟)"
)
insomnia_duration_minutes: list[int] = Field(
default_factory=lambda: [15, 45], description="单次失眠状态的持续时间范围(分钟)"
)
insomnia_chance_pressure: float = Field(default=0.1, ge=0.0, le=1.0, description="失眠基础概率")
# --- 弹性睡眠与睡前消息 ---
enable_flexible_sleep: bool = Field(default=True, description="是否启用弹性睡眠")
flexible_sleep_pressure_threshold: float = Field(
default=40.0, description="触发弹性睡眠的睡眠压力阈值,低于该值可能延迟入睡"
)
max_sleep_delay_minutes: int = Field(default=60, description="单日最大延迟入睡分钟数")
enable_pre_sleep_notification: bool = Field(default=True, description="是否启用睡前消息")
pre_sleep_prompt: str = Field(
default="我准备睡觉了,请生成一句简短自然的晚安问候。", description="用于生成睡前消息的提示"
)
class ContextGroup(ValidatedConfigBase):
"""
上下文共享组配置

View File

@@ -13,7 +13,6 @@ from rich.traceback import install
from src.chat.emoji_system.emoji_manager import get_emoji_manager
from src.chat.memory_system.memory_manager import memory_manager
from src.chat.message_manager.sleep_system.tasks import start_sleep_system_tasks
from src.chat.message_receive.bot import chat_bot
from src.chat.message_receive.chat_stream import get_chat_manager
from src.chat.utils.statistic import OnlineTimeRecordTask, StatisticOutputTask
@@ -512,22 +511,12 @@ MoFox_Bot(第三方修改版)
logger.error(f"月度计划管理器初始化失败: {e}")
# 初始化日程管理器
if global_config.planning_system.schedule_enable:
try:
await schedule_manager.load_or_generate_today_schedule()
await schedule_manager.start_daily_schedule_generation()
await schedule_manager.initialize()
logger.info("日程表管理器初始化成功")
except Exception as e:
logger.error(f"日程表管理器初始化失败: {e}")
# 初始化睡眠系统
if global_config.sleep_system.enable:
try:
await start_sleep_system_tasks()
logger.info("睡眠系统初始化成功")
except Exception as e:
logger.error(f"睡眠系统初始化失败: {e}")
def _safe_init(self, component_name: str, init_func) -> callable:
"""安全初始化组件,捕获异常"""

View File

@@ -3,7 +3,7 @@ from typing import Any
from src.chat.utils.prompt_params import PromptParameters
from src.common.logger import get_logger
from src.plugin_system.base.component_types import ComponentType, PromptInfo
from src.plugin_system.base.component_types import ComponentType, InjectionRule, PromptInfo
logger = get_logger("base_prompt")
@@ -16,7 +16,7 @@ class BasePrompt(ABC):
子类可以通过类属性定义其行为:
- prompt_name: Prompt组件的唯一名称。
- injection_point: 指定要注入的目标Prompt名称或名称列表
- injection_rules: 定义注入规则的列表。
"""
prompt_name: str = ""
@@ -24,11 +24,15 @@ class BasePrompt(ABC):
prompt_description: str = ""
"""Prompt组件的描述"""
# 定义此组件希望注入到哪个或哪些核心Prompt中
# 可以是一个字符串(单个目标)或字符串列表(多个目标)
# 例如: "planner_prompt" 或 ["s4u_style_prompt", "normal_style_prompt"]
injection_point: str | list[str] = ""
"""要注入的目标Prompt名称或列表"""
# 定义此组件希望如何注入到核心Prompt中
# 是一个 InjectionRule 对象的列表,可以实现复杂的注入逻辑
# 例如: [InjectionRule(target_prompt="planner_prompt", injection_type=InjectionType.APPEND, priority=50)]
injection_rules: list[InjectionRule] = []
"""定义注入规则的列表"""
# 旧的注入点定义,用于向后兼容。如果定义了这个,它将被自动转换为 injection_rules。
injection_point: str | list[str] | None = None
"""[已废弃] 要注入的目标Prompt名称或列表请使用 injection_rules"""
def __init__(self, params: PromptParameters, plugin_config: dict | None = None):
"""初始化Prompt组件
@@ -87,9 +91,11 @@ class BasePrompt(ABC):
if not cls.prompt_name:
raise ValueError("Prompt组件必须定义 'prompt_name' 类属性。")
# 同时传递新旧两种定义PromptInfo的__post_init__将处理兼容性问题
return PromptInfo(
name=cls.prompt_name,
component_type=ComponentType.PROMPT,
description=cls.prompt_description,
injection_rules=cls.injection_rules,
injection_point=cls.injection_point,
)

View File

@@ -2,6 +2,38 @@ from dataclasses import dataclass, field
from enum import Enum
from typing import Any
class InjectionType(Enum):
"""Prompt注入类型枚举"""
PREPEND = "prepend" # 在开头添加
APPEND = "append" # 在末尾添加
REPLACE = "replace" # 替换指定内容
REMOVE = "remove" # 删除指定内容
INSERT_AFTER = "insert_after" # 在指定内容之后插入
def __str__(self) -> str:
return self.value
@dataclass
class InjectionRule:
"""Prompt注入规则"""
target_prompt: str # 目标Prompt的名称
injection_type: InjectionType = InjectionType.PREPEND # 注入类型
priority: int = 100 # 优先级,数字越小越先执行
target_content: str | None = None # 用于REPLACE、REMOVE和INSERT_AFTER操作的目标内容支持正则表达式
def __post_init__(self):
if self.injection_type in [
InjectionType.REPLACE,
InjectionType.REMOVE,
InjectionType.INSERT_AFTER,
] and self.target_content is None:
raise ValueError(f"'{self.injection_type.value}'类型的注入规则必须提供 'target_content'")
from maim_message import Seg
from src.llm_models.payload_content.tool_option import ToolCall as ToolCall
@@ -271,13 +303,30 @@ class EventInfo(ComponentInfo):
class PromptInfo(ComponentInfo):
"""Prompt组件信息"""
injection_point: str | list[str] = ""
"""要注入的目标Prompt名称或列表"""
injection_rules: list[InjectionRule] = field(default_factory=list)
"""定义此组件如何注入到其他Prompt"""
# 旧的injection_point用于向后兼容
injection_point: str | list[str] | None = None
def __post_init__(self):
super().__post_init__()
self.component_type = ComponentType.PROMPT
# 向后兼容逻辑:如果定义了旧的 injection_point则自动转换为新的 injection_rules
if self.injection_point:
if not self.injection_rules: # 仅当rules为空时转换
points = []
if isinstance(self.injection_point, str):
points.append(self.injection_point)
elif isinstance(self.injection_point, list):
points = self.injection_point
for point in points:
self.injection_rules.append(InjectionRule(target_prompt=point))
# 转换后可以清空旧字段,避免混淆
self.injection_point = None
@dataclass
class PluginInfo:

View File

@@ -60,7 +60,7 @@ class ChatterPlanFilter:
prompt, used_message_id_list = await self._build_prompt(plan)
plan.llm_prompt = prompt
if global_config.debug.show_prompt:
logger.debug(f"规划器原始提示词:{prompt}")
logger.info(f"规划器原始提示词:{prompt}") #叫你不要改你耳朵聋吗😡😡😡😡😡
llm_content, _ = await self.planner_llm.generate_response_async(prompt=prompt)

View File

@@ -162,16 +162,6 @@ class MessageHandler:
)
logger.debug(f"原始消息内容: {raw_message.get('message', [])}")
# 检查是否包含@或video消息段
message_segments = raw_message.get("message", [])
if message_segments:
for i, seg in enumerate(message_segments):
seg_type = seg.get("type")
if seg_type in ["at", "video"]:
logger.info(f"检测到 {seg_type.upper()} 消息段 [{i}]: {seg}")
elif seg_type not in ["text", "face", "image"]:
logger.warning(f"检测到特殊消息段 [{i}]: type={seg_type}, data={seg.get('data', {})}")
message_type: str = raw_message.get("message_type")
message_id: int = raw_message.get("message_id")
# message_time: int = raw_message.get("time")

View File

@@ -6,7 +6,6 @@ from datetime import datetime
from maim_message import UserInfo
from src.chat.message_manager.sleep_system.state_manager import SleepState, sleep_state_manager
from src.chat.message_receive.chat_stream import get_chat_manager
from src.common.logger import get_logger
from src.config.config import global_config
@@ -39,10 +38,6 @@ class ColdStartTask(AsyncTask):
await asyncio.sleep(30) # 延迟以确保所有服务和聊天流已从数据库加载完毕
try:
current_state = sleep_state_manager.get_current_state()
if current_state == SleepState.SLEEPING:
logger.info("bot正在睡觉,跳过本次任务")
return
logger.info("【冷启动】开始扫描白名单,唤醒沉睡的聊天流...")
# 【修复】增加对私聊总开关的判断
@@ -152,10 +147,6 @@ class ProactiveThinkingTask(AsyncTask):
# 计算下一次检查前的休眠时间
next_interval = self._get_next_interval()
try:
current_state = sleep_state_manager.get_current_state()
if current_state == SleepState.SLEEPING:
logger.info("bot正在睡觉,跳过本次任务")
return
logger.debug(f"【日常唤醒】下一次检查将在 {next_interval:.2f} 秒后进行。")
await asyncio.sleep(next_interval)

View File

@@ -1,5 +1,5 @@
[inner]
version = "7.3.5"
version = "7.4.5"
#----以下是给开发人员阅读的如果你只是部署了MoFox-Bot不需要阅读----
#如果你想要修改配置文件请递增version的值
@@ -485,44 +485,6 @@ searxng_api_keys = []# SearXNG 实例 API 密钥列表
enabled_engines = ["ddg"] # 启用的搜索引擎列表,可选: "exa", "tavily", "ddg","bing", "metaso"
search_strategy = "single" # 搜索策略: "single"(使用第一个可用引擎), "parallel"(并行使用所有启用的引擎), "fallback"(按顺序尝试,失败则尝试下一个)
[sleep_system]
enable = false #"是否启用睡眠系统"
sleep_by_schedule = true #"是否根据日程表进行睡觉"
fixed_sleep_time = "23:00" #"固定的睡觉时间"
fixed_wake_up_time = "07:00" #"固定的起床时间"
sleep_time_offset_minutes = 15 #"睡觉时间随机偏移量范围(分钟),实际睡觉时间会在±该值范围内随机"
wake_up_time_offset_minutes = 15 #"起床时间随机偏移量范围(分钟),实际起床时间会在±该值范围内随机"
wakeup_threshold = 15.0 #唤醒阈值,达到此值时会被唤醒"
private_message_increment = 3.0 #"私聊消息增加的唤醒度"
group_mention_increment = 2.0 #"群聊艾特增加的唤醒度"
decay_rate = 0.2 #"每次衰减的唤醒度数值"
decay_interval = 30.0 #"唤醒度衰减间隔(秒)"
angry_duration = 300.0 #"愤怒状态持续时间(秒)"
angry_prompt = "你被人吵醒了非常生气,说话带着怒气" # "被吵醒后的愤怒提示词"
re_sleep_delay_minutes = 5 # "被唤醒后,如果多久没有新消息则尝试重新入睡(分钟)"
# --- 失眠机制相关参数 ---
enable_insomnia_system = false # 是否启用失眠系统
# 失眠概率 (0.0 to 1.0)
insomnia_chance_pressure = 0.1
# --- 弹性睡眠与睡前消息 ---
# 是否启用弹性睡眠。启用后AI不会到点立刻入睡而是会根据睡眠压力增加5-10分钟的缓冲并可能因为压力不足而推迟睡眠。
enable_flexible_sleep = false
# 触发弹性睡眠的睡眠压力阈值。当AI的睡眠压力低于此值时可能会推迟入睡。
flexible_sleep_pressure_threshold = 40.0
# 每日最大可推迟入睡的总分钟数。
max_sleep_delay_minutes = 60
# 是否在进入“准备入睡”状态时发送一条消息通知。
enable_pre_sleep_notification = false
# 用于生成睡前消息的提示。AI会根据这个提示生成一句晚安问候。
pre_sleep_prompt = "我准备睡觉了,请生成一句简短自然的晚安问候。"
insomnia_duration_minutes = [30, 60] # 单次失眠状态的持续时间范围(分钟)
# --- 睡后失眠 ---
# 入睡后,经过一段延迟后触发失眠判定的延迟时间(分钟),设置为范围以增加随机性
insomnia_trigger_delay_minutes = [15, 45]
[cross_context] # 跨群聊/私聊上下文共享配置
# 这是总开关,用于一键启用或禁用此功能
enable = true